NanaSQLite ベストプラクティス
本番環境でNanaSQLiteを効果的に使用するための包括的なガイドです。
目次
パフォーマンス最適化
適切なキャッシュ戦略の選択
遅延ロード(デフォルト)
# 最適: 大規模データベース、まばらなアクセスパターン
db = NanaSQLite("large.db")
# アクセス時のみデータをロード
user = db["user_123"] # 初回アクセス: DBからロード
user = db["user_123"] # 2回目のアクセス: メモリから一括ロード
# 最適: 小規模データベース(<100MB)、ほとんどのキーへの頻繁なアクセス
db = NanaSQLite("small.db", bulk_load=True)
# 起動時に全データをロード
# 以降の読み込みはすべてメモリから(超高速)判断マトリクス:
| データベースサイズ | アクセスパターン | 推奨 |
|---|---|---|
| < 10MB | 読み込み重視 | bulk_load=True |
| 10-100MB | ほとんどのキーにアクセス | bulk_load=True |
| 100MB-1GB | 一部のキーにアクセス | bulk_load=False(デフォルト) |
| > 1GB | 任意のパターン | bulk_load=False(デフォルト) |
バッチ操作の使用
❌ アンチパターン: 個別の書き込み
# 遅い: 1000個の別々のトランザクション
for i in range(1000):
db[f"user_{i}"] = {"name": f"User{i}"}✅ ベストプラクティス: バッチ書き込み
# 速い: 単一トランザクション(10-100倍高速)
users = {f"user_{i}": {"name": f"User{i}"} for i in range(1000)}
db.batch_update(users)パフォーマンス比較:
| 操作 | 個別 | バッチ | 高速化 |
|---|---|---|---|
| 100書き込み | ~200ms | ~2ms | 100倍 |
| 1000書き込み | ~2000ms | ~15ms | 133倍 |
| 10000書き込み | ~20000ms | ~150ms | 133倍 |
SQLiteキャッシュサイズの最適化
cache_size_mbパラメータはSQLiteの内部ページキャッシュ(PRAGMA cache_size)を制御し、NanaSQLiteの辞書キャッシュではありません。これはSQLiteがメモリに保持するデータベースページ数に影響し、ディスクI/Oを高速化します。
# デフォルト: 64MB SQLiteページキャッシュ(ほとんどの場合に適切)
db = NanaSQLite("data.db")
# 大規模データセット: SQLiteページキャッシュを増加
db = NanaSQLite("large.db", cache_size_mb=256)
# メモリ制約: SQLiteページキャッシュを削減
db = NanaSQLite("data.db", cache_size_mb=32)ガイドライン:
- 小規模DB(<100MB): 32-64MB SQLiteキャッシュ
- 中規模DB(100MB-1GB): 128-256MB SQLiteキャッシュ
- 大規模DB(>1GB): 256-512MB SQLiteキャッシュ
注意
このパラメータはNanaSQLiteの内部辞書キャッシュ(_data)が使用するメモリには影響しません。辞書キャッシュはロードされた値をPythonメモリに保存します。これを制御するには、bulk_load=False(デフォルト)で遅延ロードを使用してください。
自動クリーンアップのためのコンテキストマネージャ
✅ 常にコンテキストマネージャを使用
with NanaSQLite("data.db") as db:
db["key"] = "value"
# 自動的にクローズされ、リソースが解放される❌ 手動管理を避ける
db = NanaSQLite("data.db")
db["key"] = "value"
db.close() # 忘れやすい!セキュリティガイドライン
SQLインジェクションの防止
✅ パラメータバインディングを使用
# 安全: パラメータは適切にエスケープされる
results = db.query(
table_name="users",
where="name = ?",
parameters=(user_input,)
)❌ ユーザー入力を連結しない
# 危険: SQLインジェクションの脆弱性
# 絶対にやらないでください!
db.execute(f"SELECT * FROM users WHERE name = '{user_input}'")ファイルパスの検証
import os
def safe_db_path(user_input: str) -> str:
"""ディレクトリトラバーサルを防ぐためにデータベースパスを検証"""
# パス区切り文字と相対パスを削除
if ".." in user_input or "/" in user_input or "\\" in user_input:
raise ValueError("無効なデータベースパス")
# 安全なディレクトリ内にあることを確認
safe_dir = "/var/lib/myapp/databases"
return os.path.join(safe_dir, f"{user_input}.db")
# 使用方法
db_path = safe_db_path(user_provided_name)
db = NanaSQLite(db_path)機密データの保護
# 平文でシークレットを保存しない
# ❌ 悪い例
db["config"] = {
"api_key": "sk-1234567890abcdef",
"password": "mypassword123"
}
# ✅ 良い例: 機密値を暗号化
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
encrypted_api_key = cipher.encrypt(b"sk-1234567890abcdef")
db["config"] = {
"api_key": encrypted_api_key.decode(),
# bcryptを使用: import bcrypt; bcrypt.hashpw(password.encode(), bcrypt.gensalt())
"password_hash": hash_password("mypassword123") # 実際のハッシュ化関数に置き換え
}ファイルパーミッション
import os
import stat
# 制限されたパーミッションでデータベースを作成
db = NanaSQLite("secure.db")
db.close()
# ファイルパーミッションを所有者のみの読み書きに設定
os.chmod("secure.db", stat.S_IRUSR | stat.S_IWUSR)エラーハンドリング
存在しないキーの適切な処理
✅ デフォルト値付きでget()を使用
# 推奨: 例外処理が不要
value = db.get("key", default="デフォルト値")✅ 必須キーにはtry/exceptを使用
try:
value = db["required_key"]
except KeyError:
logger.error("必須設定が欠落しています")
# ValueErrorを使用するか、独自のConfigurationError例外クラスを定義してください
raise ValueError("required_keyが欠落しています")データベースエラーの処理
import apsw
import logging
logger = logging.getLogger(__name__)
try:
with NanaSQLite("data.db") as db:
db.create_table("users", {
"id": "INTEGER PRIMARY KEY",
"email": "TEXT UNIQUE"
})
db.sql_insert("users", {"email": "[email protected]"})
except apsw.Error as e:
logger.error(f"データベースエラー: {e}")
# 適切に処理(再試行、フォールバック等)挿入前のデータ検証
def save_user(db: NanaSQLite, user_data: dict) -> bool:
"""検証付きでユーザーを保存"""
# 必須フィールドの検証
required = ["name", "email", "age"]
if not all(field in user_data for field in required):
raise ValueError(f"必須フィールドが欠落: {required}")
# データ型の検証
if not isinstance(user_data["age"], int):
raise TypeError("年齢は整数である必要があります")
if user_data["age"] < 0 or user_data["age"] > 150:
raise ValueError("無効な年齢")
# 保存
db[f"user_{user_data['email']}"] = user_data
return Trueリソース管理
Webアプリケーション用の接続プーリング
FastAPIの例
from fastapi import FastAPI, Depends
from nanasqlite import AsyncNanaSQLite
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 起動: データベース接続を作成
app.state.db = AsyncNanaSQLite("app.db", max_workers=10)
yield
# シャットダウン: データベースをクローズ
await app.state.db.close()
app = FastAPI(lifespan=lifespan)
async def get_db() -> AsyncNanaSQLite:
"""データベースの依存性注入"""
return app.state.db
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncNanaSQLite = Depends(get_db)):
return await db.aget(f"user_{user_id}")メモリ管理
# 長時間実行プロセスでは、定期的に未使用のキャッシュをクリア
class CachedDB:
def __init__(self, db_path: str):
self.db = NanaSQLite(db_path)
self.access_count = 0
def get(self, key: str):
self.access_count += 1
# 10000操作ごとにキャッシュを更新
if self.access_count % 10000 == 0:
self.db.refresh() # キャッシュをクリア
return self.db.get(key)デザインパターン
関心事の分離
ひとつのデータベースファイル内で、異なるデータタイプごとにテーブルを分けることが推奨されます。.table() メソッドを使用することで、同じデータベース接続を共有しながら論理的にデータを分離できます。
# 異なる関心事に異なるテーブルを使用
class AppDatabase:
def __init__(self, db_path: str):
# メイン接続
self.db = NanaSQLite(db_path)
# サブテーブル(接続を共有するため効率的)
self.users = self.db.table("users")
self.sessions = self.db.table("sessions")
self.cache = self.db.table("cache")
self.config = self.db.table("config")
def close(self):
# 親を閉じればすべてクローズされます
self.db.close()
# 使用方法
app_db = AppDatabase("app.db")
app_db.users["alice"] = {"role": "admin"}
app_db.sessions["sess_123"] = {"user_id": "alice"}
app_db.close()リポジトリパターン
from typing import Optional, List
from dataclasses import dataclass
@dataclass
class User:
id: str
name: str
email: str
age: int
class UserRepository:
def __init__(self, db: NanaSQLite):
self.db = db
def save(self, user: User) -> None:
self.db[f"user_{user.id}"] = {
"name": user.name,
"email": user.email,
"age": user.age
}
def find_by_id(self, user_id: str) -> Optional[User]:
data = self.db.get(f"user_{user_id}")
if data:
return User(id=user_id, **data)
return None
def find_all(self) -> List[User]:
users = []
for key in self.db.keys():
if key.startswith("user_"):
user_id = key[5:] # "user_"プレフィックスを削除
data = self.db[key]
users.append(User(id=user_id, **data))
return users
# 使用方法
with NanaSQLite("app.db") as db:
repo = UserRepository(db)
user = User(id="1", name="Alice", email="[email protected]", age=30)
repo.save(user)
found = repo.find_by_id("1")
print(found.name) # Aliceデータ保護: バックアップ & リストア (v1.3.4b1以降)
定期バックアップ
backup() を使用すると、アプリケーションを停止せずに定期スナップショットを作成できます:
import schedule
import time
from nanasqlite import NanaSQLite
db = NanaSQLite("production.db")
def daily_backup():
from datetime import date
db.backup(f"backups/production_{date.today()}.db")
print(f"Backup completed: {date.today()}")
schedule.every().day.at("02:00").do(daily_backup)
while True:
schedule.run_pending()
time.sleep(1)アップグレード前のスナップショット
スキーマ移行やデータ変換の前にスナップショットを取りましょう:
with NanaSQLite("app.db") as db:
# リスキーな操作の前にスナップショットを取得
db.backup("pre_migration_snapshot.db")
# マイグレーションを実行
db.execute("ALTER TABLE data ADD COLUMN legacy TEXT")
# ... マイグレーションロジック ...エラー時のロールバック
エラーが発生した場合に restore() で既知の正常な状態に戻します:
with NanaSQLite("app.db") as db:
db.backup("pre_operation.db")
try:
perform_bulk_update(db)
except Exception as e:
print(f"Error: {e} – rolling back")
db.restore("pre_operation.db")テスト
ユニットテスト
import pytest
import tempfile
import os
@pytest.fixture
def temp_db():
"""テスト用の一時データベースを作成"""
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
yield path
os.unlink(path)
def test_basic_operations(temp_db):
with NanaSQLite(temp_db) as db:
# 書き込みのテスト
db["test_key"] = {"value": 123}
# 読み込みのテスト
assert db["test_key"] == {"value": 123}
# 削除のテスト
del db["test_key"]
assert "test_key" not in db
def test_batch_operations(temp_db):
with NanaSQLite(temp_db) as db:
# バッチ書き込みのテスト
data = {f"key_{i}": i for i in range(100)}
db.batch_update(data)
assert len(db) == 100
assert db["key_50"] == 50モック
from unittest.mock import MagicMock, patch
def test_with_mock():
mock_db = MagicMock(spec=NanaSQLite)
mock_db.get.return_value = {"name": "テストユーザー"}
# データベースを使用する関数
def get_user_name(db, user_id):
user = db.get(f"user_{user_id}")
return user["name"] if user else None
result = get_user_name(mock_db, "123")
assert result == "テストユーザー"
mock_db.get.assert_called_once_with("user_123")まとめ
重要なポイント
- ✅ 小規模で頻繁にアクセスされるデータベースには
bulk_load=Trueを使用 - ✅ 100以上の書き込みには常にバッチ操作を使用
- ✅ 自動クリーンアップのためにコンテキストマネージャ(
with文)を使用 - ✅ SQLインジェクションを防ぐためにパラメータバインディングを使用
- ✅ ユーザー入力、特にファイルパスを検証
- ✅
get()とtry/exceptでエラーを適切に処理 - ✅ 異なるテーブルで関心事を分離
- ✅ 一時データベースでテスト
- ✅ 長時間実行プロセスでメモリ使用量を監視
- ✅ 非同期フレームワーク(FastAPI、aiohttp)には非同期版を使用
避けるべき一般的な落とし穴:
- ❌ 大規模データベース(>1GB)で
bulk_load=Trueを使用 - ❌ バッチ操作の代わりに個別の書き込み
- ❌ データベースのクローズを忘れる(
with文を使用) - ❌ 文字列連結によるSQLインジェクション
- ❌ 暗号化せずに機密データを保存
- ❌ KeyError例外を無視
- ❌ ユーザー入力を検証しない
詳細な例については以下を参照してください: