fix: SQLite concurrency safety and atomic decision writes

Enable WAL + busy_timeout in _connect (ThreadingHTTPServer concurrent
operators no longer hit 'database is locked'), add a _transaction helper and
thread an optional conn through _put/_get/add_audit_event so record_decision
commits its status change, watchlist entry, and audit event atomically.
This commit is contained in:
유창욱 2026-06-20 18:19:08 +09:00
parent e9a15e8110
commit 20a6f55408
2 changed files with 122 additions and 21 deletions

View file

@ -10,6 +10,7 @@ import re
import shutil import shutil
import sqlite3 import sqlite3
from html.parser import HTMLParser from html.parser import HTMLParser
from contextlib import contextmanager
from dataclasses import replace from dataclasses import replace
from datetime import datetime from datetime import datetime
from io import BytesIO from io import BytesIO
@ -439,6 +440,10 @@ class CopyrighterStore:
def initialize(self) -> None: def initialize(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True) self.db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn: with self._connect() as conn:
# WAL lets readers proceed during a write (operators browsing while a
# rerun writes) instead of blocking on the rollback-journal lock.
# Persistent per DB file, so set once at init.
conn.execute("pragma journal_mode = wal")
conn.executescript( conn.executescript(
""" """
create table if not exists submissions ( create table if not exists submissions (
@ -1001,19 +1006,21 @@ class CopyrighterStore:
if decision == "rejected" and not memo.strip(): if decision == "rejected" and not memo.strip():
raise ValueError("reject decision requires memo") raise ValueError("reject decision requires memo")
submission = self._get("submissions", submission_id) with self._transaction() as conn:
submission = self._get("submissions", submission_id, conn=conn)
submission["decisionStatus"] = decision submission["decisionStatus"] = decision
submission["applicantStatus"] = "승인 대기" if decision == "approved" else "반려 예정" if decision == "rejected" else "검토 중" submission["applicantStatus"] = "승인 대기" if decision == "approved" else "반려 예정" if decision == "rejected" else "검토 중"
self._put("submissions", submission_id, submission) self._put("submissions", submission_id, submission, conn=conn)
if decision in {"held", "rejected"}: if decision in {"held", "rejected"}:
self._create_or_update_watchlist_entry(submission_id, decision, memo, image_store) self._create_or_update_watchlist_entry(submission_id, decision, memo, image_store, conn=conn)
self.add_audit_event( self.add_audit_event(
actor="rights.ops", actor="rights.ops",
event="Operator decision created", event="Operator decision created",
object_id=submission_id, object_id=submission_id,
change=f"{decision} · {memo or 'memo optional'}", change=f"{decision} · {memo or 'memo optional'}",
conn=conn,
) )
return self.review(submission_id) return self.review(submission_id)
@ -1154,8 +1161,10 @@ class CopyrighterStore:
decision: str, decision: str,
memo: str, memo: str,
image_store: LocalSubmissionImageStore | None, image_store: LocalSubmissionImageStore | None,
*,
conn: sqlite3.Connection | None = None,
) -> None: ) -> None:
submission = self._get("submissions", submission_id) submission = self._get("submissions", submission_id, conn=conn)
evidence = self._evidence_by_submission().get(submission_id, []) evidence = self._evidence_by_submission().get(submission_id, [])
selected_evidence = _watchlist_source_evidence(evidence) selected_evidence = _watchlist_source_evidence(evidence)
selected_evidence_ids = [str(item.get("id", "")) for item in selected_evidence if item.get("id")] selected_evidence_ids = [str(item.get("id", "")) for item in selected_evidence if item.get("id")]
@ -1163,7 +1172,7 @@ class CopyrighterStore:
entry_id = _stable_id("kb-watchlist", submission_id) entry_id = _stable_id("kb-watchlist", submission_id)
try: try:
existing = self._get("knowledge_entries", entry_id) existing = self._get("knowledge_entries", entry_id, conn=conn)
except KeyError: except KeyError:
existing = {} existing = {}
@ -1195,7 +1204,7 @@ class CopyrighterStore:
"matchedSubmissionIds": _text_list(existing.get("matchedSubmissionIds")), "matchedSubmissionIds": _text_list(existing.get("matchedSubmissionIds")),
"lastOriginDecisionAt": _now_label(), "lastOriginDecisionAt": _now_label(),
} }
self._put("knowledge_entries", entry_id, entry) self._put("knowledge_entries", entry_id, entry, conn=conn)
def _watchlist_fingerprints( def _watchlist_fingerprints(
self, self,
@ -1830,7 +1839,15 @@ class CopyrighterStore:
raise ValueError("face crop media path points outside image store") raise ValueError("face crop media path points outside image store")
return path return path
def add_audit_event(self, actor: str, event: str, object_id: str, change: str) -> None: def add_audit_event(
self,
actor: str,
event: str,
object_id: str,
change: str,
*,
conn: sqlite3.Connection | None = None,
) -> None:
payload = { payload = {
"timestamp": _now_label(), "timestamp": _now_label(),
"actor": actor, "actor": actor,
@ -1838,7 +1855,7 @@ class CopyrighterStore:
"object": object_id, "object": object_id,
"change": change, "change": change,
} }
with self._connect() as conn: with self._conn_ctx(conn) as conn:
conn.execute( conn.execute(
""" """
insert into audit_events (timestamp, actor, event, object_ref, payload) insert into audit_events (timestamp, actor, event, object_ref, payload)
@ -3248,13 +3265,51 @@ class CopyrighterStore:
def _connect(self) -> sqlite3.Connection: def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path) conn = sqlite3.connect(self.db_path)
conn.execute("pragma foreign_keys = on") conn.execute("pragma foreign_keys = on")
# The server is a ThreadingHTTPServer: concurrent operator requests open
# their own connection. Without a busy timeout the second writer raises
# "database is locked" immediately; wait for the lock instead.
conn.execute("pragma busy_timeout = 5000")
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return conn return conn
def _put(self, table: str, id_value: str, payload: dict[str, Any]) -> None: @contextmanager
def _conn_ctx(self, conn: sqlite3.Connection | None = None):
# When a caller passes an open connection (inside _transaction), reuse it
# and let the caller own commit/rollback. Otherwise behave exactly like
# `with self._connect() as conn:` — sqlite3 commits on success / rolls
# back on error.
if conn is not None:
yield conn
return
with self._connect() as owned:
yield owned
@contextmanager
def _transaction(self):
# A single connection + single commit for multi-step writes that must be
# atomic (e.g. a state change and its audit event), so a mid-sequence
# failure leaves no partial state.
conn = self._connect()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _put(
self,
table: str,
id_value: str,
payload: dict[str, Any],
*,
conn: sqlite3.Connection | None = None,
) -> None:
_validate_payload(table, id_value, payload) _validate_payload(table, id_value, payload)
payload_json = json.dumps(payload, ensure_ascii=False) payload_json = json.dumps(payload, ensure_ascii=False)
with self._connect() as conn: with self._conn_ctx(conn) as conn:
if table == "submissions": if table == "submissions":
conn.execute( conn.execute(
""" """
@ -3405,9 +3460,15 @@ class CopyrighterStore:
(id_value, payload_json), (id_value, payload_json),
) )
def _get(self, table: str, id_value: str) -> dict[str, Any]: def _get(
self,
table: str,
id_value: str,
*,
conn: sqlite3.Connection | None = None,
) -> dict[str, Any]:
_validate_table(table) _validate_table(table)
with self._connect() as conn: with self._conn_ctx(conn) as conn:
row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone() row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone()
if row is None: if row is None:
raise KeyError(id_value) raise KeyError(id_value)

View file

@ -7989,3 +7989,43 @@ def test_sqlite_store_runs_auto_search_for_existing_submission(tmp_path: Path):
history_providers = {item["provider"] for item in review["queryHistory"]} history_providers = {item["provider"] for item in review["queryHistory"]}
assert "naver" in history_providers assert "naver" in history_providers
assert "google_search" in history_providers assert "google_search" in history_providers
def test_connect_uses_wal_and_busy_timeout(tmp_path: Path):
store = CopyrighterStore(tmp_path / "c.sqlite3")
store.initialize()
with store._connect() as conn:
assert conn.execute("pragma journal_mode").fetchone()[0].lower() == "wal"
assert int(conn.execute("pragma busy_timeout").fetchone()[0]) >= 5000
def test_record_decision_rolls_back_when_audit_fails(tmp_path: Path, monkeypatch):
store = CopyrighterStore(tmp_path / "c.sqlite3")
store.initialize()
store._put(
"submissions",
"SUB-1",
{
"id": "SUB-1",
"title": "t",
"asset": "",
"riskScore": 0,
"riskBand": "low",
"decisionStatus": "unreviewed",
"providerState": {},
"fileFacts": {},
"evidence": [],
},
)
def boom(*args, **kwargs):
raise RuntimeError("audit down")
monkeypatch.setattr(store, "add_audit_event", boom)
with pytest.raises(RuntimeError):
store.record_decision("SUB-1", "held", "memo")
# The status change must roll back atomically with the failed audit write.
assert store._get("submissions", "SUB-1")["decisionStatus"] == "unreviewed"
with pytest.raises(KeyError):
store._get("knowledge_entries", sqlite_store_module._stable_id("kb-watchlist", "SUB-1"))