非同期 API リファレンス
AsyncNanaSQLiteクラスの非同期メソッド一覧です。
AsyncNanaSQLite
class AsyncNanaSQLite(db_path: str, table: str = 'data', bulk_load: bool = False, optimize: bool = True, cache_size_mb: int = 64, max_workers: int = 5, strict_sql_validation: bool = True, validator: Any | None = None, coerce: bool = False, v2_mode: bool = False, v2_config: V2Config | None = None, **kwargs: Any) -> None(最適化されたスレッドプールを使用するNanaSQLiteの非同期ラッパー)
データベース操作はすべて専用のスレッドプール内で実行され、非同期イベントループのブロックを防ぎます。 これにより、FastAPIやaiohttpなどの非同期アプリケーションで安全に使用できます。
高負荷なシナリオにおいて最適な並行性とパフォーマンスを実現するため、 カスタマイズ可能なスレッドプールを使用しています。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
db_path | str | SQLiteデータベースファイルのパス |
table | str | 使用するテーブル名 (デフォルト: "data") |
bulk_load | bool | Trueの場合、初期化時に全データをメモリに読み込む |
optimize | bool | Trueの場合、WALモードなど高速化設定を適用 |
cache_size_mb | int | SQLiteキャッシュサイズ(MB)、デフォルト64MB |
max_workers | int | スレッドプール内の最大ワーカー数(デフォルト: 5) |
strict_sql_validation | bool | Trueの場合、未許可の関数等を含むクエリを拒否 (v1.2.0) |
validator | `Any | None` |
coerce | bool | |
v2_mode | bool | Trueの場合、新しいV2エンジンの試験的機能を使用する |
v2_config | `V2Config | None` |
使用例
async with AsyncNanaSQLite("mydata.db") as db:
await db.aset("config", {"theme": "dark"})
config = await db.aget("config")
print(config) # 高負荷環境向けの設定
async with AsyncNanaSQLite("mydata.db", max_workers=10) as db:
# 並行処理が多い場合に最適化
results = await asyncio.gather(*[db.aget(f"key_{i}") for i in range(100)])コンストラクタ
コアメソッド
close
def close() -> None非同期でデータベース接続を閉じる
スレッドプールエグゼキューターもシャットダウンします。
使用例
await db.close()table
def table(table_name: str, validator: Any | None | EllipsisType = Ellipsis, coerce: bool | EllipsisType = Ellipsis, hooks: list[NanaHook] | None | EllipsisType = Ellipsis) -> AsyncNanaSQLite非同期でサブテーブルのAsyncNanaSQLiteインスタンスを取得
既に初期化済みの親インスタンスから呼ばれることを想定しています。 接続とエグゼキューターは親インスタンスと共有されます。
⚠️ 重要な注意事項:
- 同じテーブルに対して複数のインスタンスを作成しないでください 各インスタンスは独立したキャッシュを持つため、キャッシュ不整合が発生します
- 推奨: テーブルインスタンスを変数に保存して再利用してください
非推奨: sub2 = await db.table("users") # キャッシュ不整合の原因
推奨: # users_dbを使い回す
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | 取得するサブテーブル名 |
validator | `Any | None |
coerce | `bool | EllipsisType` |
hooks | `list[NanaHook] | None |
戻り値
Type: AsyncNanaSQLite
AsyncNanaSQLite: 指定したテーブルを操作するAsyncNanaSQLiteインスタンス
使用例
async with AsyncNanaSQLite("mydata.db", table="main") as db:
users_db = await db.table("users")
products_db = await db.table("products")
await users_db.aset("user1", {"name": "Alice"})
await products_db.aset("prod1", {"name": "Laptop"})辞書インターフェース
get
def get(key: str, default: Any = None) -> Any非同期でキーの値を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
default | キーが存在しない場合のデフォルト値 |
戻り値
キーの値(存在しない場合はdefault)
使用例
user = await db.aget("user")
config = await db.aget("config", {})keys
def keys() -> list[str]非同期で全キーを取得
戻り値
Type: list[str]
全キーのリスト
使用例
keys = await db.akeys()values
def values() -> list[Any]非同期で全値を取得
戻り値
Type: list[Any]
全値のリスト
使用例
values = await db.avalues()items
def items() -> list[tuple[str, Any]]非同期で全アイテムを取得
戻り値
Type: list[tuple[str, Any]]
全アイテムのリスト(キーと値のタプル)
使用例
items = await db.aitems()to_dict
def to_dict() -> dict非同期で全データをPython dictとして取得
戻り値
Type: dict
全データを含むdict
使用例
data = await db.to_dict()copy
def copy() -> dict非同期で浅いコピーを作成
戻り値
Type: dict
全データのコピー
使用例
data_copy = await db.copy()clear_cache
def clear_cache() -> Noneaclear_cache のエイリアス
データ管理
aflush
def aflush(wait: bool = False) -> None[v2 Feature] 非同期でv2エンジンのバックグラウンドバッファとキューをSQLiteに明示的にフラッシュする。 v2_modeがFalseの場合は何もしない。
使用例
await db.aflush(wait=True)flush
def flush(wait: bool = False) -> None[v2 Feature] 非同期でv2エンジンのバックグラウンドバッファとキューをSQLiteに明示的にフラッシュする。 v2_modeがFalseの場合は何もしない。
使用例
await db.aflush(wait=True)aget_dlq
def aget_dlq() -> list[dict[str, Any]][v2 Feature] 非同期でデッドレターキュー(DLQ)の内容を取得します。 v2モードが無効な場合は空のリストを返します。
使用例
failed = await db.aget_dlq()get_dlq
def get_dlq() -> list[dict[str, Any]][v2 Feature] 非同期でデッドレターキュー(DLQ)の内容を取得します。 v2モードが無効な場合は空のリストを返します。
使用例
failed = await db.aget_dlq()aretry_dlq
def aretry_dlq() -> None[v2 Feature] 非同期でDLQ内の全アイテムを再試行キューに戻します。 v2モードが無効な場合は何もしません。
使用例
await db.aretry_dlq()retry_dlq
def retry_dlq() -> None[v2 Feature] 非同期でDLQ内の全アイテムを再試行キューに戻します。 v2モードが無効な場合は何もしません。
使用例
await db.aretry_dlq()aclear_dlq
def aclear_dlq() -> None[v2 Feature] 非同期でDLQの内容をクリアします。 v2モードが無効な場合は何もしません。
使用例
await db.aclear_dlq()clear_dlq
def clear_dlq() -> None[v2 Feature] 非同期でDLQの内容をクリアします。 v2モードが無効な場合は何もしません。
使用例
await db.aclear_dlq()aget_v2_metrics
def aget_v2_metrics() -> dict[str, Any][v2 Feature] 非同期でメトリクス情報を取得します( v2_enable_metrics=True 時のみ有効)。 v2モード自体またはメトリクスが無効な場合は空の辞書を返します。
使用例
metrics = await db.aget_v2_metrics()
print(metrics["flush_count"])get_v2_metrics
def get_v2_metrics() -> dict[str, Any][v2 Feature] 非同期でメトリクス情報を取得します( v2_enable_metrics=True 時のみ有効)。 v2モード自体またはメトリクスが無効な場合は空の辞書を返します。
使用例
metrics = await db.aget_v2_metrics()
print(metrics["flush_count"])load_all
def load_all() -> None非同期で全データを一括ロード
使用例
await db.load_all()refresh
def refresh(key: str | None = None) -> None非同期でキャッシュを更新
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | `str | None` |
使用例
await db.refresh("user")
await db.refresh() # 全キャッシュ更新is_cached
def is_cached(key: str) -> bool非同期でキーがキャッシュ済みか確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 確認するキー |
戻り値
Type: bool
キャッシュ済みの場合True
使用例
cached = await db.is_cached("user")batch_update
def batch_update(mapping: dict[str, Any]) -> None非同期で一括書き込み(高速)
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | dict[str, Any] | 書き込むキーと値のdict |
使用例
await db.batch_update({
"key1": "value1",
"key2": "value2",
"key3": {"nested": "data"}
})batch_update_partial
def batch_update_partial(mapping: dict[str, Any]) -> dict[str, str]非同期で一括書き込み(部分成功モード)
バリデーションまたはシリアライズに失敗したキーだけを拒否し、 正常なキーは一括で保存する。返り値は拒否されたキーと理由の辞書。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | dict[str, Any] | 書き込むキーと値のdict |
戻り値
Type: dict[str, str]
拒否されたキー -> エラーメッセージ のdict
batch_delete
def batch_delete(keys: list[str]) -> None非同期で一括削除(高速)
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
keys | list[str] | 削除するキーのリスト |
使用例
await db.batch_delete(["key1", "key2", "key3"])get_fresh
def get_fresh(key: str, default: Any = None) -> Any非同期でDBから直接読み込み、キャッシュを更新
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
default | キーが存在しない場合のデフォルト値 |
戻り値
DBから取得した最新の値
使用例
value = await db.get_fresh("key")トランザクション制御
begin_transaction
def begin_transaction() -> None非同期でトランザクションを開始
使用例
await db.begin_transaction()
try:
await db.sql_insert("users", {"name": "Alice"})
await db.sql_insert("users", {"name": "Bob"})
await db.commit()
except:
await db.rollback()commit
def commit() -> None非同期でトランザクションをコミット
使用例
await db.commit()rollback
def rollback() -> None非同期でトランザクションをロールバック
使用例
await db.rollback()in_transaction
def in_transaction() -> bool非同期でトランザクション状態を確認
戻り値
Type: bool
bool: トランザクション中の場合True
使用例
status = await db.in_transaction()
print(f"In transaction: {status}")transaction
def transaction()非同期トランザクションのコンテキストマネージャ
使用例
async with db.transaction():
await db.sql_insert("users", {"name": "Alice"})
await db.sql_insert("users", {"name": "Bob"})
# 自動的にコミット、例外時はロールバックSQLラッパー (CRUD)
upsert
def upsert(table_name: str | None = None, data: dict[str, Any] | Any | None = None, *, conflict_columns: list[str] | None = None) -> None非同期でUPSERT(存在すれば更新、なければ挿入)を実行します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
data | `dict[str, Any] | Any |
conflict_columns | `list[str] | None` |
使用例
await db.aupsert("users", {"id": 1, "name": "Alice"})
await db.aupsert("user:1", {"name": "Nana"})sql_insert
def sql_insert(table_name: str, data: dict) -> int非同期でdictから直接INSERT
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data | dict | カラム名と値のdict |
戻り値
Type: int
挿入されたROWID
使用例
rowid = await db.sql_insert("users", {
"name": "Alice",
"email": "[email protected]",
"age": 25
})sql_update
def sql_update(table_name: str, data: dict, where: str, parameters: tuple | None = None) -> int非同期でdictとwhere条件でUPDATE
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data | dict | 更新するカラム名と値のdict |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: int
更新された行数
使用例
count = await db.sql_update("users",
{"age": 26, "status": "active"},
"name = ?",
("Alice",)
)sql_delete
def sql_delete(table_name: str, where: str, parameters: tuple | None = None) -> int非同期でwhere条件でDELETE
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: int
削除された行数
使用例
count = await db.sql_delete("users", "age < ?", (18,))クエリ
query
def query(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]非同期でSELECTクエリを実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
columns | `list[str] | None` |
where | `str | None` |
parameters | `tuple | None` |
order_by | `str | None` |
limit | `int | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: list[dict]
結果のリスト(各行はdict)
使用例
results = await db.query(
table_name="users",
columns=["id", "name", "email"],
where="age > ?",
parameters=(20,),
order_by="name ASC",
limit=10
)query_with_pagination
def query_with_pagination(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, offset: int | None = None, group_by: str | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]非同期で拡張されたクエリを実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
columns | `list[str] | None` |
where | `str | None` |
parameters | `tuple | None` |
order_by | `str | None` |
limit | `int | None` |
offset | `int | None` |
group_by | `str | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: list[dict]
結果のリスト(各行はdict)
使用例
results = await db.query_with_pagination(
table_name="users",
columns=["id", "name", "email"],
where="age > ?",
parameters=(20,),
order_by="name ASC",
limit=10,
offset=0
)count
def count(table_name: str | None = None, where: str | None = None, parameters: tuple | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> int非同期でレコード数を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
where | `str | None` |
parameters | `tuple | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: int
レコード数
使用例
count = await db.count("users", "age < ?", (18,))直接SQL実行
execute
def execute(sql: str, parameters: tuple | None = None) -> Any非同期でSQLを直接実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
APSWのCursorオブジェクト
使用例
cursor = await db.execute("SELECT * FROM data WHERE key LIKE ?", ("user%",))execute_many
def execute_many(sql: str, parameters_list: list[tuple]) -> None非同期でSQLを複数のパラメータで一括実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters_list | list[tuple] | パラメータのリスト |
使用例
await db.execute_many(
"INSERT OR REPLACE INTO custom (id, name) VALUES (?, ?)",
[(1, "Alice"), (2, "Bob"), (3, "Charlie")]
)fetch_one
def fetch_one(sql: str, parameters: tuple | None = None) -> tuple | None非同期でSQLを実行して1行取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: tuple | None
1行の結果(tuple)
使用例
row = await db.fetch_one("SELECT value FROM data WHERE key = ?", ("user",))fetch_all
def fetch_all(sql: str, parameters: tuple | None = None) -> list[tuple]非同期でSQLを実行して全行取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: list[tuple]
全行の結果(tupleのリスト)
使用例
rows = await db.fetch_all("SELECT key, value FROM data WHERE key LIKE ?", ("user%",))スキーマ管理
create_table
def create_table(table_name: str, columns: dict, if_not_exists: bool = True, primary_key: str | None = None) -> None非同期でテーブルを作成
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
columns | dict | カラム定義のdict |
if_not_exists | bool | Trueの場合、存在しない場合のみ作成 |
primary_key | `str | None` |
使用例
await db.create_table("users", {
"id": "INTEGER PRIMARY KEY",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE"
})create_index
def create_index(index_name: str, table_name: str, columns: list[str], unique: bool = False, if_not_exists: bool = True) -> None非同期でインデックスを作成
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
index_name | str | インデックス名 |
table_name | str | テーブル名 |
columns | list[str] | インデックスを作成するカラムのリスト |
unique | bool | Trueの場合、ユニークインデックスを作成 |
if_not_exists | bool | Trueの場合、存在しない場合のみ作成 |
使用例
await db.create_index("idx_users_email", "users", ["email"], unique=True)table_exists
def table_exists(table_name: str) -> bool非同期でテーブルの存在確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
戻り値
Type: bool
存在する場合True
使用例
exists = await db.table_exists("users")list_tables
def list_tables() -> list[str]非同期でデータベース内の全テーブル一覧を取得
戻り値
Type: list[str]
テーブル名のリスト
使用例
tables = await db.list_tables()drop_table
def drop_table(table_name: str, if_exists: bool = True) -> None非同期でテーブルを削除
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
if_exists | bool | Trueの場合、存在する場合のみ削除 |
使用例
await db.drop_table("old_table")drop_index
def drop_index(index_name: str, if_exists: bool = True) -> None非同期でインデックスを削除
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
index_name | str | インデックス名 |
if_exists | bool | Trueの場合、存在する場合のみ削除 |
使用例
await db.drop_index("idx_users_email")ユーティリティ関数
vacuum
def vacuum() -> None非同期でデータベースを最適化(VACUUM実行)
使用例
await db.vacuum()Pydantic サポート
set_model
def set_model(key: str, model: Any) -> None非同期でPydanticモデルを保存
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 保存するキー |
model | Pydanticモデルのインスタンス |
使用例
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
user = User(name="Nana", age=20)
await db.set_model("user", user)get_model
def get_model(key: str, model_class: type = None) -> Any非同期でPydanticモデルを取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
model_class | type | Pydanticモデルのクラス |
戻り値
Pydanticモデルのインスタンス
使用例
user = await db.get_model("user", User)その他のメソッド
add_hook
def add_hook(hook: NanaHook) -> None引数名
| 引数名 | 型 | 説明 |
|---|---|---|
hook | NanaHook | Any object implementing the NanaHook protocol. |
aget
def aget(key: str, default: Any = None) -> Any非同期でキーの値を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
default | キーが存在しない場合のデフォルト値 |
戻り値
キーの値(存在しない場合はdefault)
使用例
user = await db.aget("user")
config = await db.aget("config", {})aset
def aset(key: str, value: Any) -> None非同期でキーに値を設定
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 設定するキー |
value | 設定する値 |
使用例
await db.aset("user", {"name": "Nana", "age": 20})adelete
def adelete(key: str) -> None非同期でキーを削除
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 削除するキー |
例外
- KeyError: キーが存在しない場合
使用例
await db.adelete("old_data")acontains
def acontains(key: str) -> bool非同期でキーの存在確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 確認するキー |
戻り値
Type: bool
キーが存在する場合True
使用例
if await db.acontains("user"):
print("User exists")contains
def contains(key: str) -> bool非同期でキーの存在確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 確認するキー |
戻り値
Type: bool
キーが存在する場合True
使用例
if await db.acontains("user"):
print("User exists")alen
def alen() -> int非同期でデータベースの件数を取得
戻り値
Type: int
データベース内のキーの数
使用例
count = await db.alen()akeys
def akeys() -> list[str]非同期で全キーを取得
戻り値
Type: list[str]
全キーのリスト
使用例
keys = await db.akeys()avalues
def avalues() -> list[Any]非同期で全値を取得
戻り値
Type: list[Any]
全値のリスト
使用例
values = await db.avalues()aitems
def aitems() -> list[tuple[str, Any]]非同期で全アイテムを取得
戻り値
Type: list[tuple[str, Any]]
全アイテムのリスト(キーと値のタプル)
使用例
items = await db.aitems()apop
def apop(key: str, *args) -> Any非同期でキーを削除して値を返す
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 削除するキー *args: デフォルト値(オプション) |
戻り値
削除されたキーの値
使用例
value = await db.apop("temp_data")
value = await db.apop("maybe_missing", "default")aupdate
def aupdate(mapping: dict | None = None, **kwargs) -> None非同期で複数のキーを更新
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | `dict | None` |
使用例
await db.aupdate({"key1": "value1", "key2": "value2"})
await db.aupdate(key3="value3", key4="value4")aclear
def aclear() -> None非同期で全データを削除
使用例
await db.aclear()asetdefault
def asetdefault(key: str, default: Any = None) -> Any非同期でキーが存在しない場合のみ値を設定
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | キー |
default | デフォルト値 |
戻り値
キーの値(既存または新規設定した値)
使用例
value = await db.asetdefault("config", {})aload_all
def aload_all() -> None非同期で全データを一括ロード
使用例
await db.load_all()arefresh
def arefresh(key: str | None = None) -> None非同期でキャッシュを更新
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | `str | None` |
使用例
await db.refresh("user")
await db.refresh() # 全キャッシュ更新ais_cached
def ais_cached(key: str) -> bool非同期でキーがキャッシュ済みか確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 確認するキー |
戻り値
Type: bool
キャッシュ済みの場合True
使用例
cached = await db.is_cached("user")abatch_update
def abatch_update(mapping: dict[str, Any]) -> None非同期で一括書き込み(高速)
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | dict[str, Any] | 書き込むキーと値のdict |
使用例
await db.batch_update({
"key1": "value1",
"key2": "value2",
"key3": {"nested": "data"}
})abatch_update_partial
def abatch_update_partial(mapping: dict[str, Any]) -> dict[str, str]非同期で一括書き込み(部分成功モード)
バリデーションまたはシリアライズに失敗したキーだけを拒否し、 正常なキーは一括で保存する。返り値は拒否されたキーと理由の辞書。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | dict[str, Any] | 書き込むキーと値のdict |
戻り値
Type: dict[str, str]
拒否されたキー -> エラーメッセージ のdict
abatch_delete
def abatch_delete(keys: list[str]) -> None非同期で一括削除(高速)
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
keys | list[str] | 削除するキーのリスト |
使用例
await db.batch_delete(["key1", "key2", "key3"])ato_dict
def ato_dict() -> dict非同期で全データをPython dictとして取得
戻り値
Type: dict
全データを含むdict
使用例
data = await db.to_dict()acopy
def acopy() -> dict非同期で浅いコピーを作成
戻り値
Type: dict
全データのコピー
使用例
data_copy = await db.copy()aget_fresh
def aget_fresh(key: str, default: Any = None) -> Any非同期でDBから直接読み込み、キャッシュを更新
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
default | キーが存在しない場合のデフォルト値 |
戻り値
DBから取得した最新の値
使用例
value = await db.get_fresh("key")abatch_get
def abatch_get(keys: list[str]) -> dict[str, Any]非同期で複数のキーを一度に取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
keys | list[str] | 取得するキーのリスト |
戻り値
Type: dict[str, Any]
取得に成功したキーと値の dict
使用例
results = await db.abatch_get(["key1", "key2"])aset_model
def aset_model(key: str, model: Any) -> None非同期でPydanticモデルを保存
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 保存するキー |
model | Pydanticモデルのインスタンス |
使用例
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
user = User(name="Nana", age=20)
await db.set_model("user", user)aget_model
def aget_model(key: str, model_class: type = None) -> Any非同期でPydanticモデルを取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
model_class | type | Pydanticモデルのクラス |
戻り値
Pydanticモデルのインスタンス
使用例
user = await db.get_model("user", User)aexecute
def aexecute(sql: str, parameters: tuple | None = None) -> Any非同期でSQLを直接実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
APSWのCursorオブジェクト
使用例
cursor = await db.execute("SELECT * FROM data WHERE key LIKE ?", ("user%",))aexecute_many
def aexecute_many(sql: str, parameters_list: list[tuple]) -> None非同期でSQLを複数のパラメータで一括実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters_list | list[tuple] | パラメータのリスト |
使用例
await db.execute_many(
"INSERT OR REPLACE INTO custom (id, name) VALUES (?, ?)",
[(1, "Alice"), (2, "Bob"), (3, "Charlie")]
)afetch_one
def afetch_one(sql: str, parameters: tuple | None = None) -> tuple | None非同期でSQLを実行して1行取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: tuple | None
1行の結果(tuple)
使用例
row = await db.fetch_one("SELECT value FROM data WHERE key = ?", ("user",))afetch_all
def afetch_all(sql: str, parameters: tuple | None = None) -> list[tuple]非同期でSQLを実行して全行取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: list[tuple]
全行の結果(tupleのリスト)
使用例
rows = await db.fetch_all("SELECT key, value FROM data WHERE key LIKE ?", ("user%",))acreate_table
def acreate_table(table_name: str, columns: dict, if_not_exists: bool = True, primary_key: str | None = None) -> None非同期でテーブルを作成
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
columns | dict | カラム定義のdict |
if_not_exists | bool | Trueの場合、存在しない場合のみ作成 |
primary_key | `str | None` |
使用例
await db.create_table("users", {
"id": "INTEGER PRIMARY KEY",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE"
})acreate_index
def acreate_index(index_name: str, table_name: str, columns: list[str], unique: bool = False, if_not_exists: bool = True) -> None非同期でインデックスを作成
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
index_name | str | インデックス名 |
table_name | str | テーブル名 |
columns | list[str] | インデックスを作成するカラムのリスト |
unique | bool | Trueの場合、ユニークインデックスを作成 |
if_not_exists | bool | Trueの場合、存在しない場合のみ作成 |
使用例
await db.create_index("idx_users_email", "users", ["email"], unique=True)aquery
def aquery(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]非同期でSELECTクエリを実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
columns | `list[str] | None` |
where | `str | None` |
parameters | `tuple | None` |
order_by | `str | None` |
limit | `int | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: list[dict]
結果のリスト(各行はdict)
使用例
results = await db.query(
table_name="users",
columns=["id", "name", "email"],
where="age > ?",
parameters=(20,),
order_by="name ASC",
limit=10
)aquery_with_pagination
def aquery_with_pagination(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, offset: int | None = None, group_by: str | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]非同期で拡張されたクエリを実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
columns | `list[str] | None` |
where | `str | None` |
parameters | `tuple | None` |
order_by | `str | None` |
limit | `int | None` |
offset | `int | None` |
group_by | `str | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: list[dict]
結果のリスト(各行はdict)
使用例
results = await db.query_with_pagination(
table_name="users",
columns=["id", "name", "email"],
where="age > ?",
parameters=(20,),
order_by="name ASC",
limit=10,
offset=0
)atable_exists
def atable_exists(table_name: str) -> bool非同期でテーブルの存在確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
戻り値
Type: bool
存在する場合True
使用例
exists = await db.table_exists("users")alist_tables
def alist_tables() -> list[str]非同期でデータベース内の全テーブル一覧を取得
戻り値
Type: list[str]
テーブル名のリスト
使用例
tables = await db.list_tables()adrop_table
def adrop_table(table_name: str, if_exists: bool = True) -> None非同期でテーブルを削除
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
if_exists | bool | Trueの場合、存在する場合のみ削除 |
使用例
await db.drop_table("old_table")asql_insert
def asql_insert(table_name: str, data: dict) -> int非同期でdictから直接INSERT
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data | dict | カラム名と値のdict |
戻り値
Type: int
挿入されたROWID
使用例
rowid = await db.sql_insert("users", {
"name": "Alice",
"email": "[email protected]",
"age": 25
})asql_update
def asql_update(table_name: str, data: dict, where: str, parameters: tuple | None = None) -> int非同期でdictとwhere条件でUPDATE
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data | dict | 更新するカラム名と値のdict |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: int
更新された行数
使用例
count = await db.sql_update("users",
{"age": 26, "status": "active"},
"name = ?",
("Alice",)
)asql_delete
def asql_delete(table_name: str, where: str, parameters: tuple | None = None) -> int非同期でwhere条件でDELETE
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: int
削除された行数
使用例
count = await db.sql_delete("users", "age < ?", (18,))acount
def acount(table_name: str | None = None, where: str | None = None, parameters: tuple | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> int非同期でレコード数を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
where | `str | None` |
parameters | `tuple | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: int
レコード数
使用例
count = await db.count("users", "age < ?", (18,))avacuum
def avacuum() -> None非同期でデータベースを最適化(VACUUM実行)
使用例
await db.vacuum()aclear_cache
def aclear_cache() -> Noneメモリキャッシュをクリア (非同期)
DBのデータは削除せず、メモリ上のキャッシュのみ破棄します。
atable
def atable(table_name: str, validator: Any | None | EllipsisType = Ellipsis, coerce: bool | EllipsisType = Ellipsis, hooks: list[NanaHook] | None | EllipsisType = Ellipsis) -> AsyncNanaSQLite非同期でサブテーブルのAsyncNanaSQLiteインスタンスを取得
既に初期化済みの親インスタンスから呼ばれることを想定しています。 接続とエグゼキューターは親インスタンスと共有されます。
⚠️ 重要な注意事項:
- 同じテーブルに対して複数のインスタンスを作成しないでください 各インスタンスは独立したキャッシュを持つため、キャッシュ不整合が発生します
- 推奨: テーブルインスタンスを変数に保存して再利用してください
非推奨: sub2 = await db.table("users") # キャッシュ不整合の原因
推奨: # users_dbを使い回す
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | 取得するサブテーブル名 |
validator | `Any | None |
coerce | `bool | EllipsisType` |
hooks | `list[NanaHook] | None |
戻り値
Type: AsyncNanaSQLite
AsyncNanaSQLite: 指定したテーブルを操作するAsyncNanaSQLiteインスタンス
使用例
async with AsyncNanaSQLite("mydata.db", table="main") as db:
users_db = await db.table("users")
products_db = await db.table("products")
await users_db.aset("user1", {"name": "Alice"})
await products_db.aset("prod1", {"name": "Laptop"})abackup
def abackup(target_path: str) -> None非同期でデータベースを指定のパスにバックアップします。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
target_path | str | バックアップ先のファイルパス |
arestore
def arestore(source_path: str) -> None非同期で指定のパスからデータベースをリストアします。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
source_path | str | リストア元のファイルパス |
apragma
def apragma(pragma_name: str, value: Any = None) -> Any非同期で PRAGMA を実行します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
pragma_name | str | PRAGMA名 |
value | 設定する値(Noneの場合は取得) |
戻り値
PRAGMAの結果
aget_table_schema
def aget_table_schema(table_name: str | None = None) -> list[dict]非同期でテーブルのスキーマ情報を取得します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
戻り値
Type: list[dict]
スキーマ情報のリスト
alist_indexes
def alist_indexes(table_name: str | None = None) -> list[str]非同期でテーブルのインデックス一覧を取得します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
戻り値
Type: list[str]
インデックス名のリスト
aalter_table_add_column
def aalter_table_add_column(table_name: str, column_name: str, column_type: str) -> None非同期でテーブルにカラムを追加します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
column_name | str | カラム名 |
column_type | str | カラムの型定義 |
aupsert
def aupsert(table_name: str | Any = None, data: Any = None, conflict_columns: list[str] | None = None) -> int | None非同期で UPSERT 操作を実行します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | Any` |
data | カラム名と値のdict、または第1引数がキー名の場合は値 | |
conflict_columns | `list[str] | None` |
戻り値
Type: int | None
挿入/更新されたROWID。キー/値ペア指定時はNone。