同期 API リファレンス
NanaSQLiteクラスの同期メソッド一覧です。
NanaSQLite
class NanaSQLite(db_path: str, table: str = 'data', bulk_load: bool = False, optimize: bool = True, cache_size_mb: int = 64, strict_sql_validation: bool = True, validator: Any | None = None, coerce: bool = False, v2_mode: bool = False, v2_config: V2Config | None = None, **kwargs: Any)(APSW SQLiteをバックエンドとした、セキュリティ・接続管理強化版の辞書型ラッパー (v1.2.0))
内部でPython dictを保持し、操作時にSQLiteとの同期を行います。 v1.2.0では、動的SQLのバリデーション強化、ReDoS対策、および厳格な接続管理が導入されています。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
db_path | str | SQLiteデータベースファイルのパス |
table | str | 使用するテーブル名 (デフォルト: "data") |
bulk_load | bool | Trueの場合、初期化時に全データをメモリに読み込む |
optimize | bool | Trueの場合、WALモードなど高速化設定を適用 |
cache_size_mb | int | SQLiteキャッシュサイズ(MB)、デフォルト64MB |
strict_sql_validation | bool | Trueの場合、未許可の関数等を含むクエリを拒否 |
validator | `Any | None` |
coerce | bool | True の場合、validkit-py の自動変換(コアース)機能を有効にする。 |
v2_mode | bool | True の場合、新アーキテクチャ(バックグラウンド非同期書き込み)を有効化 |
v2_config | `V2Config | None` |
コンストラクタ
コアメソッド
close
def close() -> Noneデータベース接続を閉じる
注意: table()メソッドで作成されたインスタンスは接続を共有しているため、 接続の所有者(最初に作成されたインスタンス)のみが接続を閉じます。
例外
- NanaSQLiteTransactionError: トランザクション中にクローズを試みた場合
table
def table(table_name: str, cache_strategy: CacheType | Literal['unbounded', 'lru', 'ttl'] | None = None, cache_size: int | None = None, cache_ttl: float | None = None, cache_persistence_ttl: bool | None = None, validator: Any | None | EllipsisType = Ellipsis, coerce: bool | EllipsisType = Ellipsis, hooks: list[NanaHook] | None | EllipsisType = Ellipsis, v2_enable_metrics: bool | EllipsisType = Ellipsis, memory_first: bool | EllipsisType = Ellipsis)新しいインスタンスを作成しますが、SQLite接続とロックは共有します。 これにより、複数のテーブルインスタンスが同じ接続を使用して スレッドセーフに動作します。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
cache_strategy | `CacheType | Literal[unbounded, lru, ttl] |
cache_size | `int | None` |
cache_ttl | `float | None` |
cache_persistence_ttl | `bool | None` |
validator | `Any | None |
coerce | `bool | EllipsisType` |
hooks | `list[NanaHook] | None |
v2_enable_metrics | `bool | EllipsisType` |
memory_first | `bool | EllipsisType` |
例外
- NanaSQLiteConnectionError: 接続が閉じられている場合
使用例
from validkit import v
with NanaSQLite("app.db", table="main") as main_db:
users_schema = {"name": v.str(), "age": v.int()}
users_db = main_db.table("users", validator=users_schema)
products_db = main_db.table("products")
users_db["user1"] = {"name": "Alice", "age": 30}
products_db["prod1"] = {"name": "Laptop"}辞書インターフェース
__getitem__
def __getitem__(key: str) -> Anydict[key] - 遅延ロード後、メモリから取得
__setitem__
def __setitem__(key: str, value: Any) -> Nonedict[key] = value - 即時書き込み + メモリ更新
__delitem__
def __delitem__(key: str) -> Nonedel dict[key] - 即時削除
__contains__
def __contains__(key: str) -> boolkey in dict - キーの存在確認
軽量な SELECT 1 ... LIMIT 1 クエリで存在確認を行う。 値の読み込みは getitem の _ensure_cached に委譲する。
__len__
def __len__() -> intlen(dict) - DBの実際の件数を返す
__iter__
def __iter__() -> Iterator[str]keys
def keys() -> list全キーを取得(DBから)
values
def values() -> list全値を取得(一括ロードしてからメモリから)
items
def items() -> list全アイテムを取得(一括ロードしてからメモリから)
get
def get(key: str, default: Any = None) -> Anypop
def pop(key: str, *args) -> Anyupdate
def update(mapping: dict | None = None, **kwargs) -> Nonedict.update(mapping) - 一括更新
clear
def clear() -> Nonedict.clear() - 全削除
setdefault
def setdefault(key: str, default: Any = None) -> Anyto_dict
def to_dict() -> dict全データをPython dictとして取得
copy
def copy() -> dict浅いコピーを作成(標準dictを返す)
clear_cache
def clear_cache() -> Noneメモリキャッシュをクリア
DBのデータは削除せず、メモリ上のキャッシュのみ破棄します。
データ管理
get_fresh
def get_fresh(key: str, default: Any = None) -> AnyDBから直接読み込み、キャッシュを更新して値を返す
キャッシュをバイパスしてDBから最新の値を取得する。 execute()でDBを直接変更した後などに使用。
通常のget()よりオーバーヘッドがあるため、 キャッシュとDBの不整合が想定される場合のみ使用推奨。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
default | キーが存在しない場合のデフォルト値 |
戻り値
DBから取得した最新の値(存在しない場合はdefault)
使用例
db.execute("UPDATE data SET value = ? WHERE key = ?", ('"new"', "key"))
value = db.get_fresh("key") # DBから最新値を取得batch_get
def batch_get(keys: list[str]) -> dict[str, Any]複数のキーを一度に取得(効率的な一括ロード)
1回の SELECT IN (...) クエリで複数のキーをDBから取得する。 取得した値は自動的にキャッシュに保存される。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
keys | list[str] | 取得するキーのリスト |
戻り値
Type: dict[str, Any]
取得に成功したキーと値の dict
使用例
results = db.batch_get(["user1", "user2", "user3"])
print(results) # {"user1": {...}, "user2": {...}}flush
def flush(wait: bool = False) -> Noneget_dlq
def get_dlq() -> list[dict[str, Any]][v2 Feature] 現在のデッドレターキュー(DLQ)の内容を取得します。 v2モードが無効な場合は空のリストを返します。
retry_dlq
def retry_dlq() -> None[v2 Feature] デッドレターキュー(DLQ)内の全アイテムを再試行キューに戻します。 v2モードが無効な場合は何もしません。
clear_dlq
def clear_dlq() -> None[v2 Feature] デッドレターキュー(DLQ)の内容をクリアします。 v2モードが無効な場合は何もしません。
get_v2_metrics
def get_v2_metrics() -> dict[str, Any][v2 Feature] 現在のメトリクス情報を取得します(有効な場合)。 v2モード自体またはメトリクスが無効な場合は空の辞書を返します。
load_all
def load_all() -> None一括読み込み: 全データをメモリに展開
refresh
def refresh(key: str | None = None) -> Noneキャッシュを更新(DBから再読み込み)
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | `str | None` |
is_cached
def is_cached(key: str) -> boolキーがキャッシュ済みかどうか
batch_update
def batch_update(mapping: dict[str, Any]) -> None一括書き込み(トランザクション + executemany使用で超高速)
大量のデータを一度に書き込む場合、通常のupdateより10-100倍高速。 v1.0.3rc5でexecutemanyによる最適化を追加。 v1.3.4b2より、validkit バリデーター設定時は全値を事前に検証する。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | dict[str, Any] | 書き込むキーと値のdict |
戻り値
使用例
db.batch_update({"key1": "value1", "key2": "value2", ...})batch_update_partial
def batch_update_partial(mapping: dict[str, Any]) -> dict[str, str]一括書き込み(部分成功モード)
batch_update() のアトミック契約は維持したまま、各キーを個別に準備し、 バリデーションまたはシリアライズに失敗したキーだけをスキップして残りを書き込む。 返り値は、拒否されたキーとその理由の辞書。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
mapping | dict[str, Any] | 書き込むキーと値のdict |
戻り値
Type: dict[str, str]
拒否されたキー -> エラーメッセージ のdict
使用例
failed = db.batch_update_partial({"ok": 1, "bad": object()})
print(failed)batch_delete
def batch_delete(keys: list[str]) -> None一括削除(トランザクション + executemany使用で高速)
v1.0.3rc5でexecutemanyによる最適化を追加。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
keys | list[str] | 削除するキーのリスト |
トランザクション制御
begin_transaction
def begin_transaction() -> Noneトランザクションを開始
Note: SQLiteはネストされたトランザクションをサポートしていません。 既にトランザクション中の場合、NanaSQLiteTransactionErrorが発生します。
例外
- NanaSQLiteTransactionError: 既にトランザクション中の場合
- NanaSQLiteConnectionError: 接続が閉じられている場合
- NanaSQLiteDatabaseError: トランザクション開始に失敗した場合
使用例
db.begin_transaction()
try:
db.sql_insert("users", {"name": "Alice"})
db.sql_insert("users", {"name": "Bob"})
db.commit()
except:
db.rollback()commit
def commit() -> Noneトランザクションをコミット
例外
- NanaSQLiteTransactionError: トランザクション外でコミットを試みた場合
- NanaSQLiteConnectionError: 接続が閉じられている場合
- NanaSQLiteDatabaseError: コミットに失敗した場合
rollback
def rollback() -> Noneトランザクションをロールバック
例外
- NanaSQLiteTransactionError: トランザクション外でロールバックを試みた場合
- NanaSQLiteConnectionError: 接続が閉じられている場合
- NanaSQLiteDatabaseError: ロールバックに失敗した場合
in_transaction
def in_transaction() -> bool現在トランザクション中かどうかを返す
戻り値
Type: bool
bool: トランザクション中の場合True
使用例
db.begin_transaction()
print(db.in_transaction()) # True
db.commit()
print(db.in_transaction()) # Falsetransaction
def transaction()トランザクションのコンテキストマネージャ
コンテキストマネージャ内で例外が発生しない場合は自動的にコミット、 例外が発生した場合は自動的にロールバックします。
例外
- NanaSQLiteTransactionError: 既にトランザクション中の場合
使用例
with db.transaction():
db.sql_insert("users", {"name": "Alice"})
db.sql_insert("users", {"name": "Bob"})
# 自動的にコミット、例外時はロールバックSQLラッパー (CRUD)
sql_insert
def sql_insert(table_name: str, data: dict) -> intdictから直接INSERT
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data | dict | カラム名と値のdict |
戻り値
Type: int
挿入されたROWID
使用例
rowid = db.sql_insert("users", {
"name": "Alice",
"email": "[email protected]",
"age": 25
})sql_update
def sql_update(table_name: str, data: dict, where: str, parameters: tuple | None = None) -> intdictとwhere条件でUPDATE
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data | dict | 更新するカラム名と値のdict |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: int
更新された行数
使用例
count = db.sql_update("users",
{"age": 26, "status": "active"},
"name = ?",
("Alice",)
)sql_delete
def sql_delete(table_name: str, where: str, parameters: tuple | None = None) -> intwhere条件でDELETE
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: int
削除された行数
使用例
count = db.sql_delete("users", "age < ?", (18,))upsert
def upsert(table_name: str | Any = None, data: Any = None, conflict_columns: list[str] | None = None) -> int | NoneINSERT OR REPLACE の簡易版(upsert) v2モードが有効で、キー/値のペアとして呼び出された場合はバックグラウンドキューに送られます。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | Any` |
data | カラム名と値のdict、または第1引数がキー名の場合は値 | |
conflict_columns | `list[str] | None` |
戻り値
Type: int | None
挿入/更新されたROWID。v2モードでのキー/値ペア指定時はNone。
使用例
# テーブル指定(標準)
db.upsert("users", {"id": 1, "name": "Alice", "age": 25})
# キー/値指定 (v2互換)
db.upsert("user:1", {"name": "Nana"})クエリ
query
def query(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]シンプルなSELECTクエリを実行
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
columns | `list[str] | None` |
where | `str | None` |
parameters | `tuple | None` |
order_by | `str | None` |
limit | `int | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: list[dict]
結果のリスト(各行はdict)
使用例
# デフォルトテーブルから全データ取得
results = db.query() # 条件付き検索
results = db.query(
table_name="users",
columns=["id", "name", "email"],
where="age > ?",
parameters=(20,),
order_by="name ASC",
limit=10
)count
def count(table_name: str | None = None, where: str | None = None, parameters: tuple | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> intレコード数を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
where | `str | None` |
parameters | `tuple | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
使用例
total = db.count("users")
adults = db.count("users", "age >= ?", (18,))exists
def exists(table_name: str, where: str, parameters: tuple | None = None) -> boolレコードの存在確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
where | str | WHERE句の条件 |
parameters | `tuple | None` |
戻り値
Type: bool
存在する場合True
使用例
if db.exists("users", "email = ?", ("[email protected]",)):
print("User exists")query_with_pagination
def query_with_pagination(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, offset: int | None = None, group_by: str | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]拡張されたクエリ(offset、group_by対応)
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
columns | `list[str] | None` |
where | `str | None` |
parameters | `tuple | None` |
order_by | `str | None` |
limit | `int | None` |
offset | `int | None` |
group_by | `str | None` |
strict_sql_validation | `bool | None` |
allowed_sql_functions | `list[str] | None` |
forbidden_sql_functions | `list[str] | None` |
override_allowed | bool | Trueの場合、インスタンス許可設定を無視 |
戻り値
Type: list[dict]
結果のリスト
使用例
# ページネーション
page2 = db.query_with_pagination("users",
limit=10, offset=10, order_by="id ASC") # グループ集計
stats = db.query_with_pagination("orders",
columns=["user_id", "COUNT(*) as order_count"],
group_by="user_id"
)直接SQL実行
execute
def execute(sql: str, parameters: tuple | None = None) -> apsw.CursorSQLを直接実行
任意のSQL文を実行できる。SELECT、INSERT、UPDATE、DELETEなど。 パラメータバインディングをサポート(SQLインジェクション対策)。
このメソッドで直接デフォルトテーブル(data)を操作した場合、
内部キャッシュ(_data)と不整合が発生する可能性があります。
キャッシュを更新するには `refresh()` を呼び出してください。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: apsw.Cursor
APSWのCursorオブジェクト(結果の取得に使用)
例外
- NanaSQLiteConnectionError: 接続が閉じられている場合
- NanaSQLiteDatabaseError: SQL実行エラー
使用例
cursor = db.execute("SELECT * FROM data WHERE key LIKE ?", ("user%",))
for row in cursor:
print(row)# キャッシュ更新が必要な場合:
db.execute("UPDATE data SET value = ? WHERE key = ?", ('"new"', "key"))
db.refresh("key") # キャッシュを更新execute_many
def execute_many(sql: str, parameters_list: list[tuple]) -> NoneSQLを複数のパラメータで一括実行
同じSQL文を複数のパラメータセットで実行(トランザクション使用)。 大量のINSERTやUPDATEを高速に実行できる。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters_list | list[tuple] | パラメータのリスト |
使用例
db.execute_many(
"INSERT OR REPLACE INTO custom (id, name) VALUES (?, ?)",
[(1, "Alice"), (2, "Bob"), (3, "Charlie")]
)fetch_one
def fetch_one(sql: str, parameters: tuple | None = None) -> tuple | NoneSQLを実行して1行取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: tuple | None
1行の結果(tuple)、結果がない場合はNone
使用例
row = db.fetch_one("SELECT value FROM data WHERE key = ?", ("user",))
print(row[0])fetch_all
def fetch_all(sql: str, parameters: tuple | None = None) -> list[tuple]SQLを実行して全行取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
sql | str | 実行するSQL文 |
parameters | `tuple | None` |
戻り値
Type: list[tuple]
全行の結果(tupleのリスト)
使用例
rows = db.fetch_all("SELECT key, value FROM data WHERE key LIKE ?", ("user%",))
for key, value in rows:
print(key, value)スキーマ管理
create_table
def create_table(table_name: str, columns: dict, if_not_exists: bool = True, primary_key: str | None = None) -> Noneテーブルを作成
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
columns | dict | カラム定義のdict(カラム名: SQL型) |
if_not_exists | bool | Trueの場合、存在しない場合のみ作成 |
primary_key | `str | None` |
使用例
db.create_table("users", {
"id": "INTEGER PRIMARY KEY",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE",
"age": "INTEGER"
})
db.create_table("posts", {
"id": "INTEGER",
"title": "TEXT",
"content": "TEXT"
}, primary_key="id")create_index
def create_index(index_name: str, table_name: str, columns: list[str], unique: bool = False, if_not_exists: bool = True) -> Noneインデックスを作成
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
index_name | str | インデックス名 |
table_name | str | テーブル名 |
columns | list[str] | インデックスを作成するカラムのリスト |
unique | bool | Trueの場合、ユニークインデックスを作成 |
if_not_exists | bool | Trueの場合、存在しない場合のみ作成 |
使用例
db.create_index("idx_users_email", "users", ["email"], unique=True)
db.create_index("idx_posts_user", "posts", ["user_id", "created_at"])table_exists
def table_exists(table_name: str) -> boolテーブルの存在確認
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
戻り値
Type: bool
存在する場合True、しない場合False
使用例
if db.table_exists("users"):
print("users table exists")list_tables
def list_tables() -> list[str]データベース内の全テーブル一覧を取得
戻り値
Type: list[str]
テーブル名のリスト
使用例
tables = db.list_tables()
print(tables) # ['data', 'users', 'posts']drop_table
def drop_table(table_name: str, if_exists: bool = True) -> Noneテーブルを削除
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
if_exists | bool | Trueの場合、存在する場合のみ削除(エラーを防ぐ) |
使用例
db.drop_table("old_table")
db.drop_table("temp", if_exists=True)drop_index
def drop_index(index_name: str, if_exists: bool = True) -> Noneインデックスを削除
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
index_name | str | インデックス名 |
if_exists | bool | Trueの場合、存在する場合のみ削除 |
使用例
db.drop_index("idx_users_email")alter_table_add_column
def alter_table_add_column(table_name: str, column_name: str, column_type: str, default: Any = None) -> None既存テーブルにカラムを追加
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
column_name | str | カラム名 |
column_type | str | カラムの型(SQL型) |
default | デフォルト値(Noneの場合は指定なし) |
使用例
db.alter_table_add_column("users", "phone", "TEXT")
db.alter_table_add_column("users", "status", "TEXT", default="'active'")get_table_schema
def get_table_schema(table_name: str | None = None) -> list[dict]テーブル構造を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
戻り値
Type: list[dict]
カラム情報のリスト(各カラムはdict)
使用例
schema = db.get_table_schema("users")
for col in schema:
print(f"{col['name']}: {col['type']}")list_indexes
def list_indexes(table_name: str | None = None) -> list[dict]インデックス一覧を取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | `str | None` |
戻り値
Type: list[dict]
インデックス情報のリスト
使用例
indexes = db.list_indexes("users")
for idx in indexes:
print(f"{idx['name']}: {idx['columns']}")ユーティリティ関数
vacuum
def vacuum() -> Noneデータベースを最適化(VACUUM実行)
削除されたレコードの領域を回収し、データベースファイルを最適化。
使用例
db.vacuum()get_db_size
def get_db_size() -> intデータベースファイルのサイズを取得(バイト単位)
戻り値
Type: int
データベースファイルのサイズ
使用例
size = db.get_db_size()
print(f"DB size: {size / 1024 / 1024:.2f} MB")get_last_insert_rowid
def get_last_insert_rowid() -> int最後に挿入されたROWIDを取得
戻り値
Type: int
最後に挿入されたROWID
使用例
db.sql_insert("users", {"name": "Alice"})
rowid = db.get_last_insert_rowid()pragma
def pragma(pragma_name: str, value: Any = None) -> AnyPRAGMA設定の取得/設定
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
pragma_name | str | PRAGMA名 |
value | 設定値(Noneの場合は取得のみ) |
戻り値
valueがNoneの場合は現在の値、そうでない場合はNone
使用例
# 取得
mode = db.pragma("journal_mode") # 設定
db.pragma("foreign_keys", 1)バックアップ & リストア
backup
def backup(dest_path: str) -> Noneデータベースをファイルにバックアップする
APSW の SQLite バックアップ API を使用して、現在の DB 全体を dest_path に書き出します。 SQLite のトランザクション機構により、他の SQLite 接続が同時に読み書きしていても データの整合性を保ったままバックアップできます。 NanaSQLite の内部ロックはバックアップ中に保持しないため、同一プロセス内の 他の NanaSQLite 操作をブロックしません。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
dest_path | str | バックアップ先ファイルパス |
例外
- NanaSQLiteClosedError: 接続が閉じられている場合
- NanaSQLiteValidationError: dest_path が現在のDBファイルと同一の場合(自己コピー防止)、または
- dest_path がインメモリDB文字列(':memory:' など)の場合(永続化されないため)
- NanaSQLiteDatabaseError: バックアップ中にエラーが発生した場合
- NanaSQLiteLockError: lock_timeout 設定によりロック取得に失敗した場合
restore
def restore(src_path: str) -> Noneバックアップファイルからデータベースをリストアする
現在の接続を一時的に閉じ、src_path のファイルを DB パスにコピーし、 stale な WAL/SHM/journal サイドカーファイル(-wal/-shm/-journal)を 削除してから再接続します。 リストア後はメモリキャッシュがクリアされ、DB の内容が反映されます。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
src_path | str | リストア元バックアップファイルパス |
例外
- NanaSQLiteClosedError: 接続が閉じられている場合
- NanaSQLiteConnectionError: 接続を所有していない (table() で取得した) インスタンスから呼ばれた場合
- NanaSQLiteValidationError: 現在のDBがインメモリDB(':memory:' など)の場合(ファイル置換が不可能なため)
- NanaSQLiteTransactionError: トランザクション中に呼ばれた場合
- NanaSQLiteDatabaseError: リストア中にエラーが発生した場合
- NanaSQLiteLockError: lock_timeout 設定によりロック取得に失敗した場合
Pydantic サポート
set_model
def set_model(key: str, model: Any) -> NonePydanticモデルを保存
Pydanticモデル(BaseModelを継承したクラス)をシリアライズして保存。 model_dump()メソッドを使用してdictに変換し、モデルのクラス情報も保存。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 保存するキー |
model | Pydanticモデルのインスタンス |
使用例
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
user = User(name="Nana", age=20)
db.set_model("user", user)get_model
def get_model(key: str, model_class: type | None = None) -> AnyPydanticモデルを取得
保存されたPydanticモデルをデシリアライズして復元。 model_classが指定されていない場合は、保存時のクラス情報を使用。
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
key | str | 取得するキー |
model_class | `type | None` |
戻り値
Pydanticモデルのインスタンス
使用例
user = db.get_model("user", User)
print(user.name) # "Nana"その他のメソッド
add_hook
def add_hook(hook: NanaHook) -> None引数名
| 引数名 | 型 | 説明 |
|---|---|---|
hook | NanaHook | Instantiated hook (e.g., CheckHook, UniqueHook, PydanticHook). |
popitem
def popitem()export_table_to_dict
def export_table_to_dict(table_name: str) -> list[dict]テーブル全体をdictのリストとして取得
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
戻り値
Type: list[dict]
全レコードのリスト
使用例
all_users = db.export_table_to_dict("users")import_from_dict_list
def import_from_dict_list(table_name: str, data_list: list[dict]) -> intdictのリストからテーブルに一括挿入
引数名
| 引数名 | 型 | 説明 |
|---|---|---|
table_name | str | テーブル名 |
data_list | list[dict] | 挿入するデータのリスト |
戻り値
Type: int
挿入された行数
使用例
users = [
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30}
]
count = db.import_from_dict_list("users", users)get_by_path
def get_by_path(key: str, path: str)パスから値を取得
例外
- NanaSQLiteDatabaseError: 暗号化モードが有効な場合
使用例
# 例: JSON列から特定のフィールドを取得
db.get_by_path("Alice", "$.age")
db.get_by_path("Bob", "$.score")