From 20a6f55408b5a63065c54ced90d1f217975838b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 18:19:08 +0900 Subject: [PATCH] fix: SQLite concurrency safety and atomic decision writes Enable WAL + busy_timeout in _connect (ThreadingHTTPServer concurrent operators no longer hit 'database is locked'), add a _transaction helper and thread an optional conn through _put/_get/add_audit_event so record_decision commits its status change, watchlist entry, and audit event atomically. --- src/rights_filter/server/sqlite_store.py | 103 ++++++++++++++---- .../rights_filter/server/test_sqlite_store.py | 40 +++++++ 2 files changed, 122 insertions(+), 21 deletions(-) diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index e7a5ee4..da33143 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -10,6 +10,7 @@ import re import shutil import sqlite3 from html.parser import HTMLParser +from contextlib import contextmanager from dataclasses import replace from datetime import datetime from io import BytesIO @@ -439,6 +440,10 @@ class CopyrighterStore: def initialize(self) -> None: self.db_path.parent.mkdir(parents=True, exist_ok=True) with self._connect() as conn: + # WAL lets readers proceed during a write (operators browsing while a + # rerun writes) instead of blocking on the rollback-journal lock. + # Persistent per DB file, so set once at init. 
+ conn.execute("pragma journal_mode = wal") conn.executescript( """ create table if not exists submissions ( @@ -1001,20 +1006,22 @@ class CopyrighterStore: if decision == "rejected" and not memo.strip(): raise ValueError("reject decision requires memo") - submission = self._get("submissions", submission_id) - submission["decisionStatus"] = decision - submission["applicantStatus"] = "승인 대기" if decision == "approved" else "반려 예정" if decision == "rejected" else "검토 중" - self._put("submissions", submission_id, submission) + with self._transaction() as conn: + submission = self._get("submissions", submission_id, conn=conn) + submission["decisionStatus"] = decision + submission["applicantStatus"] = "승인 대기" if decision == "approved" else "반려 예정" if decision == "rejected" else "검토 중" + self._put("submissions", submission_id, submission, conn=conn) - if decision in {"held", "rejected"}: - self._create_or_update_watchlist_entry(submission_id, decision, memo, image_store) + if decision in {"held", "rejected"}: + self._create_or_update_watchlist_entry(submission_id, decision, memo, image_store, conn=conn) - self.add_audit_event( - actor="rights.ops", - event="Operator decision created", - object_id=submission_id, - change=f"{decision} · {memo or 'memo optional'}", - ) + self.add_audit_event( + actor="rights.ops", + event="Operator decision created", + object_id=submission_id, + change=f"{decision} · {memo or 'memo optional'}", + conn=conn, + ) return self.review(submission_id) def mark_evidence_status( @@ -1154,8 +1161,10 @@ class CopyrighterStore: decision: str, memo: str, image_store: LocalSubmissionImageStore | None, + *, + conn: sqlite3.Connection | None = None, ) -> None: - submission = self._get("submissions", submission_id) + submission = self._get("submissions", submission_id, conn=conn) evidence = self._evidence_by_submission().get(submission_id, []) selected_evidence = _watchlist_source_evidence(evidence) selected_evidence_ids = [str(item.get("id", "")) for item in 
selected_evidence if item.get("id")] @@ -1163,7 +1172,7 @@ class CopyrighterStore: entry_id = _stable_id("kb-watchlist", submission_id) try: - existing = self._get("knowledge_entries", entry_id) + existing = self._get("knowledge_entries", entry_id, conn=conn) except KeyError: existing = {} @@ -1195,7 +1204,7 @@ class CopyrighterStore: "matchedSubmissionIds": _text_list(existing.get("matchedSubmissionIds")), "lastOriginDecisionAt": _now_label(), } - self._put("knowledge_entries", entry_id, entry) + self._put("knowledge_entries", entry_id, entry, conn=conn) def _watchlist_fingerprints( self, @@ -1830,7 +1839,15 @@ class CopyrighterStore: raise ValueError("face crop media path points outside image store") return path - def add_audit_event(self, actor: str, event: str, object_id: str, change: str) -> None: + def add_audit_event( + self, + actor: str, + event: str, + object_id: str, + change: str, + *, + conn: sqlite3.Connection | None = None, + ) -> None: payload = { "timestamp": _now_label(), "actor": actor, @@ -1838,7 +1855,7 @@ class CopyrighterStore: "object": object_id, "change": change, } - with self._connect() as conn: + with self._conn_ctx(conn) as conn: conn.execute( """ insert into audit_events (timestamp, actor, event, object_ref, payload) @@ -3248,13 +3265,51 @@ class CopyrighterStore: def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self.db_path) conn.execute("pragma foreign_keys = on") + # The server is a ThreadingHTTPServer: concurrent operator requests open + # their own connection. sqlite3.connect() already waits 5 s by default + # (timeout=5.0) on a locked database; this pins that wait explicitly. 
+ conn.execute("pragma busy_timeout = 5000") conn.row_factory = sqlite3.Row return conn - def _put(self, table: str, id_value: str, payload: dict[str, Any]) -> None: + @contextmanager + def _conn_ctx(self, conn: sqlite3.Connection | None = None): + # When a caller passes an open connection (inside _transaction), reuse it + # and let the caller own commit/rollback. Otherwise behave exactly like + # `with self._connect() as conn:` — sqlite3 commits on success / rolls + # back on error. + if conn is not None: + yield conn + return + with self._connect() as owned: + yield owned + + @contextmanager + def _transaction(self): + # A single connection + single commit for multi-step writes that must be + # atomic (e.g. a state change and its audit event), so a mid-sequence + # failure leaves no partial state. + conn = self._connect() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + def _put( + self, + table: str, + id_value: str, + payload: dict[str, Any], + *, + conn: sqlite3.Connection | None = None, + ) -> None: _validate_payload(table, id_value, payload) payload_json = json.dumps(payload, ensure_ascii=False) - with self._connect() as conn: + with self._conn_ctx(conn) as conn: if table == "submissions": conn.execute( """ @@ -3405,9 +3460,15 @@ class CopyrighterStore: (id_value, payload_json), ) - def _get(self, table: str, id_value: str) -> dict[str, Any]: + def _get( + self, + table: str, + id_value: str, + *, + conn: sqlite3.Connection | None = None, + ) -> dict[str, Any]: _validate_table(table) - with self._connect() as conn: + with self._conn_ctx(conn) as conn: row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone() if row is None: raise KeyError(id_value) diff --git a/tests/rights_filter/server/test_sqlite_store.py b/tests/rights_filter/server/test_sqlite_store.py index 42f9bab..f5e016c 100644 --- a/tests/rights_filter/server/test_sqlite_store.py +++ 
b/tests/rights_filter/server/test_sqlite_store.py @@ -7989,3 +7989,43 @@ def test_sqlite_store_runs_auto_search_for_existing_submission(tmp_path: Path): history_providers = {item["provider"] for item in review["queryHistory"]} assert "naver" in history_providers assert "google_search" in history_providers + + +def test_connect_uses_wal_and_busy_timeout(tmp_path: Path): + store = CopyrighterStore(tmp_path / "c.sqlite3") + store.initialize() + with store._connect() as conn: + assert conn.execute("pragma journal_mode").fetchone()[0].lower() == "wal" + assert int(conn.execute("pragma busy_timeout").fetchone()[0]) >= 5000 + + +def test_record_decision_rolls_back_when_audit_fails(tmp_path: Path, monkeypatch): + store = CopyrighterStore(tmp_path / "c.sqlite3") + store.initialize() + store._put( + "submissions", + "SUB-1", + { + "id": "SUB-1", + "title": "t", + "asset": "", + "riskScore": 0, + "riskBand": "low", + "decisionStatus": "unreviewed", + "providerState": {}, + "fileFacts": {}, + "evidence": [], + }, + ) + + def boom(*args, **kwargs): + raise RuntimeError("audit down") + + monkeypatch.setattr(store, "add_audit_event", boom) + with pytest.raises(RuntimeError): + store.record_decision("SUB-1", "held", "memo") + + # The status change must roll back atomically with the failed audit write. + assert store._get("submissions", "SUB-1")["decisionStatus"] == "unreviewed" + with pytest.raises(KeyError): + store._get("knowledge_entries", sqlite_store_module._stable_id("kb-watchlist", "SUB-1"))