Synchronous API Reference
Reference for the synchronous NanaSQLite class.
NanaSQLite
class NanaSQLite(db_path: str, table: str = 'data', bulk_load: bool = False, optimize: bool = True, cache_size_mb: int = 64, strict_sql_validation: bool = True, validator: Any | None = None, coerce: bool = False, v2_mode: bool = False, v2_config: V2Config | None = None, **kwargs: Any)
APSW SQLite-backed dict wrapper with Security and Connection Enhancements (v1.2.0).
Internally maintains a Python dict and synchronizes with SQLite during operations. In v1.2.0, enhanced dynamic SQL validation, ReDoS protection, and strict connection management are introduced.
Parameter
| Parameter | Type | Description |
|---|---|---|
| db_path | str | |
| table | str | |
| bulk_load | bool | |
| optimize | bool | |
| cache_size_mb | int | |
| strict_sql_validation | bool | |
| validator | `Any \| None` | |
| coerce | bool | True の場合、validkit-py の自動変換(コアース)機能を有効にする。 |
| v2_mode | bool | |
| v2_config | `V2Config \| None` | |
Constructor
Core Methods
close
def close() -> None
table
def table(table_name: str, cache_strategy: CacheType | Literal['unbounded', 'lru', 'ttl'] | None = None, cache_size: int | None = None, cache_ttl: float | None = None, cache_persistence_ttl: bool | None = None, validator: Any | None | EllipsisType = Ellipsis, coerce: bool | EllipsisType = Ellipsis, hooks: list[NanaHook] | None | EllipsisType = Ellipsis, v2_enable_metrics: bool | EllipsisType = Ellipsis, memory_first: bool | EllipsisType = Ellipsis)
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | str | |
| cache_strategy | `CacheType \| Literal['unbounded', 'lru', 'ttl'] \| None` | |
| cache_size | `int \| None` | |
| cache_ttl | `float \| None` | |
| cache_persistence_ttl | `bool \| None` | |
| validator | `Any \| None \| EllipsisType` | |
| coerce | `bool \| EllipsisType` | |
| hooks | `list[NanaHook] \| None \| EllipsisType` | |
| v2_enable_metrics | `bool \| EllipsisType` | |
| memory_first | `bool \| EllipsisType` | |
Raises
Example
from validkit import v
with NanaSQLite("app.db", table="main") as main_db:
    users_schema = {"name": v.str(), "age": v.int()}
    users_db = main_db.table("users", validator=users_schema)
    products_db = main_db.table("products")
    users_db["user1"] = {"name": "Alice", "age": 30}
    products_db["prod1"] = {"name": "Laptop"}
Dictionary Interface
__getitem__
def __getitem__(key: str) -> Any
__setitem__
def __setitem__(key: str, value: Any) -> None
__delitem__
def __delitem__(key: str) -> None
__contains__
def __contains__(key: str) -> bool
__len__
def __len__() -> int
__iter__
def __iter__() -> Iterator[str]
for key in dict
keys
def keys() -> list
values
def values() -> list
items
def items() -> list
get
def get(key: str, default: Any = None) -> Any
dict.get(key, default)
pop
def pop(key: str, *args) -> Any
dict.pop(key[, default])
update
def update(mapping: dict | None = None, **kwargs) -> None
clear
def clear() -> None
setdefault
def setdefault(key: str, default: Any = None) -> Any
dict.setdefault(key, default)
to_dict
def to_dict() -> dict
copy
def copy() -> dict
clear_cache
def clear_cache() -> None
Data Management
get_fresh
def get_fresh(key: str, default: Any = None) -> Any
execute()でDBを直接変更した後などに使用。
通常のget()よりオーバーヘッドがあるため、
Parameter
| Parameter | Type | Description |
|---|---|---|
key | str |
Returns
Example
db.execute("UPDATE data SET value = ? WHERE key = ?", ('"new"', "key"))
value = db.get_fresh("key") # DBから最新値を取得
batch_get
def batch_get(keys: list[str]) -> dict[str, Any]
1回の SELECT IN (...) クエリで複数のキーをDBから取得する。
Parameter
| Parameter | Type | Description |
|---|---|---|
keys | list[str] |
Returns
Type: dict[str, Any]
Example
results = db.batch_get(["user1", "user2", "user3"])
print(results) # {"user1": {...}, "user2": {...}}
flush
def flush(wait: bool = False) -> None
[v2 Feature] Explicitly flush the v2 engine's background buffer and queue to SQLite. If v2_mode is False, this operates as a no-op.
get_dlq
def get_dlq() -> list[dict[str, Any]]
retry_dlq
def retry_dlq() -> None
clear_dlq
def clear_dlq() -> None
get_v2_metrics
def get_v2_metrics() -> dict[str, Any]
load_all
def load_all() -> None
refresh
def refresh(key: str | None = None) -> None
is_cached
def is_cached(key: str) -> bool
batch_update
def batch_update(mapping: dict[str, Any]) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
mapping | dict[str, Any] |
Returns
None
Example
db.batch_update({"key1": "value1", "key2": "value2", ...})
batch_update_partial
def batch_update_partial(mapping: dict[str, Any]) -> dict[str, str]
batch_update() のアトミック契約は維持したまま、各キーを個別に準備し、
Parameter
| Parameter | Type | Description |
|---|---|---|
mapping | dict[str, Any] |
Returns
Type: dict[str, str]
Example
failed = db.batch_update_partial({"ok": 1, "bad": object()})
print(failed)
batch_delete
def batch_delete(keys: list[str]) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
keys | list[str] |
Returns
None
Transaction Control
begin_transaction
def begin_transaction() -> None
Note:
Raises
Example
db.begin_transaction()
try:
    db.sql_insert("users", {"name": "Alice"})
    db.sql_insert("users", {"name": "Bob"})
    db.commit()
except:
    db.rollback()
commit
def commit() -> None
rollback
def rollback() -> None
in_transaction
def in_transaction() -> bool
Returns
Type: bool
Example
db.begin_transaction()
print(db.in_transaction()) # True
db.commit()
print(db.in_transaction()) # False
transaction
def transaction()
Raises
Example
with db.transaction():
    db.sql_insert("users", {"name": "Alice"})
    db.sql_insert("users", {"name": "Bob"})
# 自動的にコミット、例外時はロールバック
SQL Wrapper (CRUD)
sql_insert
def sql_insert(table_name: str, data: dict) -> int
Parameter
| Parameter | Type | Description |
|---|---|---|
table_name | str | |
data | dict |
Returns
Type: int
Example
rowid = db.sql_insert("users", {
    "name": "Alice",
    "email": "alice@example.com",
    "age": 25
})
sql_update
def sql_update(table_name: str, data: dict, where: str, parameters: tuple | None = None) -> int
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | str | |
| data | dict | |
| where | str | |
| parameters | `tuple \| None` | |
Returns
Type: int
Example
count = db.sql_update("users",
    {"age": 26, "status": "active"},
    "name = ?",
    ("Alice",)
)
sql_delete
def sql_delete(table_name: str, where: str, parameters: tuple | None = None) -> int
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | str | |
| where | str | |
| parameters | `tuple \| None` | |
Returns
Type: int
Example
count = db.sql_delete("users", "age < ?", (18,))
upsert
def upsert(table_name: str | Any = None, data: Any = None, conflict_columns: list[str] | None = None) -> int | None
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | `str \| Any` | |
| conflict_columns | `list[str] \| None` | |
Returns
Type: int | None
Example
# テーブル指定(標準)
db.upsert("users", {"id": 1, "name": "Alice", "age": 25})
# キー/値指定 (v2互換)
db.upsert("user:1", {"name": "Nana"})
Query
query
def query(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | `str \| None` | |
| columns | `list[str] \| None` | |
| where | `str \| None` | |
| parameters | `tuple \| None` | |
| order_by | `str \| None` | |
| limit | `int \| None` | |
| strict_sql_validation | `bool \| None` | |
| allowed_sql_functions | `list[str] \| None` | |
| forbidden_sql_functions | `list[str] \| None` | |
| override_allowed | bool | |
Returns
Type: list[dict]
Example
# デフォルトテーブルから全データ取得
results = db.query()
# 条件付き検索
results = db.query(
    table_name="users",
    columns=["id", "name", "email"],
    where="age > ?",
    parameters=(20,),
    order_by="name ASC",
    limit=10
)
count
def count(table_name: str | None = None, where: str | None = None, parameters: tuple | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> int
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | `str \| None` | |
| where | `str \| None` | |
| parameters | `tuple \| None` | |
| strict_sql_validation | `bool \| None` | |
| allowed_sql_functions | `list[str] \| None` | |
| forbidden_sql_functions | `list[str] \| None` | |
| override_allowed | bool | |
Example
total = db.count("users")
adults = db.count("users", "age >= ?", (18,))
exists
def exists(table_name: str, where: str, parameters: tuple | None = None) -> bool
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | str | |
| where | str | |
| parameters | `tuple \| None` | |
Returns
Type: bool
Example
if db.exists("users", "email = ?", ("alice@example.com",)):
    print("User exists")
query_with_pagination
def query_with_pagination(table_name: str | None = None, columns: list[str] | None = None, where: str | None = None, parameters: tuple | None = None, order_by: str | None = None, limit: int | None = None, offset: int | None = None, group_by: str | None = None, strict_sql_validation: bool | None = None, allowed_sql_functions: list[str] | None = None, forbidden_sql_functions: list[str] | None = None, override_allowed: bool = False) -> list[dict]
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | `str \| None` | |
| columns | `list[str] \| None` | |
| where | `str \| None` | |
| parameters | `tuple \| None` | |
| order_by | `str \| None` | |
| limit | `int \| None` | |
| offset | `int \| None` | |
| group_by | `str \| None` | |
| strict_sql_validation | `bool \| None` | |
| allowed_sql_functions | `list[str] \| None` | |
| forbidden_sql_functions | `list[str] \| None` | |
| override_allowed | bool | |
Returns
Type: list[dict]
Example
# ページネーション
page2 = db.query_with_pagination("users",
    limit=10, offset=10, order_by="id ASC")
# グループ集計
stats = db.query_with_pagination("orders",
    columns=["user_id", "COUNT(*) as order_count"],
    group_by="user_id"
)
Direct SQL Execution
execute
def execute(sql: str, parameters: tuple | None = None) -> apsw.Cursor
.. warning:: キャッシュを更新するには refresh() を呼び出してください。
Parameter
| Parameter | Type | Description |
|---|---|---|
| sql | str | |
| parameters | `tuple \| None` | |
Returns
Type: apsw.Cursor
Raises
Example
cursor = db.execute("SELECT * FROM data WHERE key LIKE ?", ("user%",))
for row in cursor:
    print(row)
db.execute("UPDATE data SET value = ? WHERE key = ?", ('"new"', "key"))
db.refresh("key") # キャッシュを更新
execute_many
def execute_many(sql: str, parameters_list: list[tuple]) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
sql | str | |
parameters_list | list[tuple] |
Example
db.execute_many(
    "INSERT OR REPLACE INTO custom (id, name) VALUES (?, ?)",
    [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
)
fetch_one
def fetch_one(sql: str, parameters: tuple | None = None) -> tuple | None
Parameter
| Parameter | Type | Description |
|---|---|---|
| sql | str | |
| parameters | `tuple \| None` | |
Returns
Type: tuple | None
Example
row = db.fetch_one("SELECT value FROM data WHERE key = ?", ("user",))
print(row[0])
fetch_all
def fetch_all(sql: str, parameters: tuple | None = None) -> list[tuple]
Parameter
| Parameter | Type | Description |
|---|---|---|
| sql | str | |
| parameters | `tuple \| None` | |
Returns
Type: list[tuple]
Example
rows = db.fetch_all("SELECT key, value FROM data WHERE key LIKE ?", ("user%",))
for key, value in rows:
    print(key, value)
Schema Management
create_table
def create_table(table_name: str, columns: dict, if_not_exists: bool = True, primary_key: str | None = None) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | str | |
| columns | dict | |
| if_not_exists | bool | |
| primary_key | `str \| None` | |
Example
db.create_table("users", {
    "id": "INTEGER PRIMARY KEY",
    "name": "TEXT NOT NULL",
    "email": "TEXT UNIQUE",
    "age": "INTEGER"
})
db.create_table("posts", {
    "id": "INTEGER",
    "title": "TEXT",
    "content": "TEXT"
}, primary_key="id")
create_index
def create_index(index_name: str, table_name: str, columns: list[str], unique: bool = False, if_not_exists: bool = True) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
index_name | str | |
table_name | str | |
columns | list[str] | |
unique | bool | |
if_not_exists | bool |
Example
db.create_index("idx_users_email", "users", ["email"], unique=True)
db.create_index("idx_posts_user", "posts", ["user_id", "created_at"])
table_exists
def table_exists(table_name: str) -> bool
Parameter
| Parameter | Type | Description |
|---|---|---|
table_name | str |
Returns
Type: bool
Example
if db.table_exists("users"):
    print("users table exists")
list_tables
def list_tables() -> list[str]
Returns
Type: list[str]
Example
tables = db.list_tables()
print(tables) # ['data', 'users', 'posts']
drop_table
def drop_table(table_name: str, if_exists: bool = True) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
table_name | str | |
if_exists | bool |
Example
db.drop_table("old_table")
db.drop_table("temp", if_exists=True)
drop_index
def drop_index(index_name: str, if_exists: bool = True) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
index_name | str | |
if_exists | bool |
Example
db.drop_index("idx_users_email")
alter_table_add_column
def alter_table_add_column(table_name: str, column_name: str, column_type: str, default: Any = None) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
table_name | str | |
column_name | str | |
column_type | str |
Example
db.alter_table_add_column("users", "phone", "TEXT")
db.alter_table_add_column("users", "status", "TEXT", default="'active'")
get_table_schema
def get_table_schema(table_name: str | None = None) -> list[dict]
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | `str \| None` | |
Returns
Type: list[dict]
Example
schema = db.get_table_schema("users")
for col in schema:
    print(f"{col['name']}: {col['type']}")
list_indexes
def list_indexes(table_name: str | None = None) -> list[dict]
Parameter
| Parameter | Type | Description |
|---|---|---|
| table_name | `str \| None` | |
Returns
Type: list[dict]
Example
indexes = db.list_indexes("users")
for idx in indexes:
    print(f"{idx['name']}: {idx['columns']}")
Utility Functions
vacuum
def vacuum() -> None
Example
db.vacuum()
get_db_size
def get_db_size() -> int
Returns
Type: int
Example
size = db.get_db_size()
print(f"DB size: {size / 1024 / 1024:.2f} MB")
get_last_insert_rowid
def get_last_insert_rowid() -> int
Returns
Type: int
Example
db.sql_insert("users", {"name": "Alice"})
rowid = db.get_last_insert_rowid()
pragma
def pragma(pragma_name: str, value: Any = None) -> Any
Parameter
| Parameter | Type | Description |
|---|---|---|
pragma_name | str |
Returns
Example
# 取得
mode = db.pragma("journal_mode")
# 設定
db.pragma("foreign_keys", 1)
Backup & Restore
backup
def backup(dest_path: str) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
dest_path | str |
restore
def restore(src_path: str) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
src_path | str |
Pydantic Support
set_model
def set_model(key: str, model: Any) -> None
Parameter
| Parameter | Type | Description |
|---|---|---|
key | str |
Example
from pydantic import BaseModel
class User(BaseModel):
    name: str
    age: int
user = User(name="Nana", age=20)
db.set_model("user", user)
get_model
def get_model(key: str, model_class: type | None = None) -> Any
Parameter
| Parameter | Type | Description |
|---|---|---|
| key | str | |
| model_class | `type \| None` | |
Returns
Example
user = db.get_model("user", User)
print(user.name) # "Nana"
Other Methods
add_hook
def add_hook(hook: NanaHook) -> None
[v1.5.0 Feature] Add a hook/constraint to intercept read, write, and delete operations.
Parameter
| Parameter | Type | Description |
|---|---|---|
hook | NanaHook | Instantiated hook (e.g., CheckHook, UniqueHook, PydanticHook). |
popitem
def popitem()
D.popitem() -> (k, v), remove and return some (key, value) pair as a 2-tuple; but raise KeyError if D is empty.
export_table_to_dict
def export_table_to_dict(table_name: str) -> list[dict]
Parameter
| Parameter | Type | Description |
|---|---|---|
table_name | str |
Returns
Type: list[dict]
Example
all_users = db.export_table_to_dict("users")
import_from_dict_list
def import_from_dict_list(table_name: str, data_list: list[dict]) -> int
Parameter
| Parameter | Type | Description |
|---|---|---|
table_name | str | |
data_list | list[dict] |
Returns
Type: int
Example
users = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30}
]
count = db.import_from_dict_list("users", users)
get_by_path
def get_by_path(key: str, path: str)
:param key:
:param path:
:return:
Raises
Example
# 例: JSON列から特定のフィールドを取得
db.get_by_path("Alice", "$.age")
db.get_by_path("Bob", "$.score")