From 40501e13f1827d1f32ae0cbd0a7f46a93389351d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 21:46:01 +0900 Subject: [PATCH] refactor: extract SQLite persistence primitives into StorePersistenceMixin Move connection/transaction management and the generic _put/_get/_all row access plus evidence-read helpers into a mixin; CopyrighterStore now inherits it. Methods rely on the host class's self.db_path / self._write_lock. Behavior-preserving. sqlite_store.py 3368 -> 3072 lines. --- src/rights_filter/server/sqlite_store.py | 300 +---------------- src/rights_filter/server/store_persistence.py | 313 ++++++++++++++++++ 2 files changed, 315 insertions(+), 298 deletions(-) create mode 100644 src/rights_filter/server/store_persistence.py diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 0e4a17f..308bf17 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -2,14 +2,12 @@ from __future__ import annotations import base64 import hashlib -import html import json import os import re import shutil import sqlite3 import threading -from contextlib import contextmanager from dataclasses import replace from datetime import datetime from pathlib import Path @@ -62,6 +60,7 @@ from rights_filter.server.store_page_scrape import ( _normalized_remote_image_url, _search_result_direct_image_urls, ) +from rights_filter.server.store_persistence import StorePersistenceMixin from rights_filter.server.store_schema import ( _ensure_constrained_schema, _ensure_queue_schema, @@ -116,7 +115,7 @@ from rights_filter.server.store_url_utils import ( ) -class CopyrighterStore: +class CopyrighterStore(StorePersistenceMixin): def __init__( self, db_path: Path | str, @@ -3071,298 +3070,3 @@ class CopyrighterStore: for submission in self._all("submissions", queue_id=queue_id): self._rescore_submission(str(submission["id"])) - def _count(self, table: str) -> int: - _validate_table(table) - with self._connect() as conn: - return int(conn.execute(f"select count(*) from {table}").fetchone()[0]) - - def _connect(self) -> sqlite3.Connection: - conn = sqlite3.connect(self.db_path) - conn.execute("pragma foreign_keys = on") - # The server is a ThreadingHTTPServer: concurrent operator requests open - # their own connection. Without a busy timeout the second writer raises - # "database is locked" immediately; wait for the lock instead. - conn.execute("pragma busy_timeout = 5000") - conn.row_factory = sqlite3.Row - return conn - - @contextmanager - def _conn_ctx(self, conn: sqlite3.Connection | None = None): - # When a caller passes an open connection (inside _transaction), reuse it - # and let the caller own commit/rollback. Otherwise behave exactly like - # `with self._connect() as conn:` — sqlite3 commits on success / rolls - # back on error. - if conn is not None: - yield conn - return - with self._connect() as owned: - yield owned - - @contextmanager - def _transaction(self): - # A single connection + single commit for multi-step writes that must be - # atomic (e.g. a state change and its audit event), so a mid-sequence - # failure leaves no partial state. - conn = self._connect() - try: - yield conn - conn.commit() - except Exception: - conn.rollback() - raise - finally: - conn.close() - - def _put( - self, - table: str, - id_value: str, - payload: dict[str, Any], - *, - conn: sqlite3.Connection | None = None, - ) -> None: - _validate_payload(table, id_value, payload) - payload_json = json.dumps(payload, ensure_ascii=False) - with self._conn_ctx(conn) as conn: - if table == "submissions": - conn.execute( - """ - insert into submissions - (id, title, risk_score, risk_band, decision_status, submitted_epoch, queue_id, payload) - values (?, ?, ?, ?, ?, ?, ?, ?) - on conflict(id) do update set - title = excluded.title, - risk_score = excluded.risk_score, - risk_band = excluded.risk_band, - decision_status = excluded.decision_status, - submitted_epoch = excluded.submitted_epoch, - queue_id = excluded.queue_id, - payload = excluded.payload - """, - ( - id_value, - str(payload.get("title", "")), - int(payload.get("riskScore", 0) or 0), - str(payload.get("riskBand", "low")), - str(payload.get("decisionStatus", "unreviewed")), - int(payload.get("submittedEpoch", 0) or 0), - str(payload.get("queueId", "")), - payload_json, - ), - ) - return - if table == "evidence": - conn.execute( - """ - insert into evidence - (id, submission_id, source, confidence, status, contributed, payload) - values (?, ?, ?, ?, ?, ?, ?) - on conflict(id) do update set - submission_id = excluded.submission_id, - source = excluded.source, - confidence = excluded.confidence, - status = excluded.status, - contributed = excluded.contributed, - payload = excluded.payload - """, - ( - id_value, - str(payload["submission_id"]), - str(payload.get("source", "")), - # Clamp to the column's CHECK (confidence >= 0 and <= 1). - # Some sources (e.g. Google Vision web-entity relevance - # scores) are unbounded and would raise IntegrityError. - max(0.0, min(1.0, float(payload.get("confidence", 0) or 0))), - str(payload.get("status", "active")), - 1 if bool(payload.get("contributed", True)) else 0, - payload_json, - ), - ) - return - if table == "providers": - conn.execute( - """ - insert into providers - (id, name, enabled, usage, quota, payload) - values (?, ?, ?, ?, ?, ?) - on conflict(id) do update set - name = excluded.name, - enabled = excluded.enabled, - usage = excluded.usage, - quota = excluded.quota, - payload = excluded.payload - """, - ( - id_value, - str(payload.get("name", "")), - 1 if bool(payload.get("enabled", False)) else 0, - int(payload.get("usage", 0) or 0), - int(payload.get("quota", 0) or 0), - payload_json, - ), - ) - return - if table == "knowledge_entries": - conn.execute( - """ - insert into knowledge_entries - (id, name, entry_type, entry_status, active, source_submission_id, payload) - values (?, ?, ?, ?, ?, ?, ?) - on conflict(id) do update set - name = excluded.name, - entry_type = excluded.entry_type, - entry_status = excluded.entry_status, - active = excluded.active, - source_submission_id = excluded.source_submission_id, - payload = excluded.payload - """, - ( - id_value, - str(payload.get("name", "")), - str(payload.get("type", "other")), - str(payload.get("entryStatus", "confirmed")), - 1 if bool(payload.get("active", True)) else 0, - str(payload.get("sourceSubmissionId", "")), - payload_json, - ), - ) - return - if table == "collection_candidates": - conn.execute( - """ - insert into collection_candidates - (id, provider, query, status, collected_epoch, payload) - values (?, ?, ?, ?, ?, ?) - on conflict(id) do update set - provider = excluded.provider, - query = excluded.query, - status = excluded.status, - collected_epoch = excluded.collected_epoch, - payload = excluded.payload - """, - ( - id_value, - str(payload.get("provider", "")), - str(payload.get("query", "")), - str(payload.get("status", "candidate")), - int(payload.get("collectedEpoch", 0) or 0), - payload_json, - ), - ) - return - if table == "corrections": - conn.execute( - """ - insert into corrections - (id, decision_id, status, payload) - values (?, ?, ?, ?) - on conflict(id) do update set - decision_id = excluded.decision_id, - status = excluded.status, - payload = excluded.payload - """, - ( - id_value, - str(payload.get("decisionId", payload.get("decision_id", ""))), - str(payload.get("status", "")), - payload_json, - ), - ) - return - conn.execute( - f"insert or replace into {table} (id, payload) values (?, ?)", - (id_value, payload_json), - ) - - def _get( - self, - table: str, - id_value: str, - *, - conn: sqlite3.Connection | None = None, - ) -> dict[str, Any]: - _validate_table(table) - with self._conn_ctx(conn) as conn: - row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone() - if row is None: - raise KeyError(id_value) - return json.loads(row["payload"]) - - def _apply_provider_usage_delta( - self, provider_id: str, count: int, payload: dict[str, Any] - ) -> None: - # Re-read current usage under the write lock and apply the delta so - # concurrent provider calls don't lose increments. The rest of `payload` - # (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for - # status fields. - with self._write_lock: - current = self._get("providers", provider_id) - payload["usage"] = int(current.get("usage", 0) or 0) + int(count) - self._put("providers", provider_id, payload) - - def _clear_collection_candidates(self) -> None: - with self._connect() as conn: - conn.execute("delete from collection_candidates") - - def _all(self, table: str, queue_id: str | None = None) -> list[dict[str, Any]]: - _validate_table(table) - with self._connect() as conn: - if table == "submissions" and queue_id is not None: - rows = conn.execute( - "select payload from submissions where queue_id = ? order by id", - (queue_id,), - ).fetchall() - else: - rows = conn.execute(f"select payload from {table} order by id").fetchall() - values = [json.loads(row["payload"]) for row in rows] - if table == "providers": - order = {"internal": 0, "naver": 1, "google": 2, "google_search": 3, "llm": 4} - values.sort(key=lambda item: order.get(item["id"], 99)) - if table == "collection_candidates": - values.sort(key=lambda item: int(item.get("collectedEpoch", 0)), reverse=True) - return values - - def _evidence_rows(self, queue_id: str | None = None): - with self._connect() as conn: - if queue_id is None: - rows = conn.execute("select submission_id, payload from evidence order by id").fetchall() - else: - rows = conn.execute( - """ - select e.submission_id, e.payload - from evidence e - join submissions s on s.id = e.submission_id - where s.queue_id = ? - order by e.id - """, - (queue_id,), - ).fetchall() - return rows - - def _evidence_for_submission(self, submission_id: str) -> list[dict[str, Any]]: - # Read only this submission's evidence. _rescore_submission previously - # called _evidence_by_submission(), which read and JSON-parsed the ENTIRE - # evidence table on every single-submission rescore (a hot path inside the - # seed / auto-search / similarity loops). - with self._connect() as conn: - rows = conn.execute( - "select payload from evidence where submission_id = ? order by id", - (submission_id,), - ).fetchall() - evidence: list[dict[str, Any]] = [] - for row in rows: - payload = json.loads(row["payload"]) - payload.pop("submission_id", None) - evidence.append(payload) - return evidence - - def _evidence_by_submission(self, queue_id: str | None = None) -> dict[str, list[dict[str, Any]]]: - rows = self._evidence_rows(queue_id=queue_id) - grouped: dict[str, list[dict[str, Any]]] = {} - for row in rows: - payload = json.loads(row["payload"]) - payload.pop("submission_id", None) - grouped.setdefault(row["submission_id"], []).append(payload) - return grouped - - diff --git a/src/rights_filter/server/store_persistence.py b/src/rights_filter/server/store_persistence.py new file mode 100644 index 0000000..c09707a --- /dev/null +++ b/src/rights_filter/server/store_persistence.py @@ -0,0 +1,313 @@ +"""SQLite persistence primitives for CopyrighterStore, as a mixin. + +Connection/transaction management and the generic _put/_get/_all row access plus +the evidence-read helpers. Mixed into CopyrighterStore; methods rely on the host +class providing `self.db_path` and `self._write_lock`. Behavior unchanged. +""" + +from __future__ import annotations + +import json +import sqlite3 +from contextlib import contextmanager +from typing import Any + +from rights_filter.server.store_serialization import _validate_payload, _validate_table + + +class StorePersistenceMixin: + def _count(self, table: str) -> int: + _validate_table(table) + with self._connect() as conn: + return int(conn.execute(f"select count(*) from {table}").fetchone()[0]) + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path) + conn.execute("pragma foreign_keys = on") + # The server is a ThreadingHTTPServer: concurrent operator requests open + # their own connection. Without a busy timeout the second writer raises + # "database is locked" immediately; wait for the lock instead. + conn.execute("pragma busy_timeout = 5000") + conn.row_factory = sqlite3.Row + return conn + + @contextmanager + def _conn_ctx(self, conn: sqlite3.Connection | None = None): + # When a caller passes an open connection (inside _transaction), reuse it + # and let the caller own commit/rollback. Otherwise behave exactly like + # `with self._connect() as conn:` — sqlite3 commits on success / rolls + # back on error. + if conn is not None: + yield conn + return + with self._connect() as owned: + yield owned + + @contextmanager + def _transaction(self): + # A single connection + single commit for multi-step writes that must be + # atomic (e.g. a state change and its audit event), so a mid-sequence + # failure leaves no partial state. + conn = self._connect() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + def _put( + self, + table: str, + id_value: str, + payload: dict[str, Any], + *, + conn: sqlite3.Connection | None = None, + ) -> None: + _validate_payload(table, id_value, payload) + payload_json = json.dumps(payload, ensure_ascii=False) + with self._conn_ctx(conn) as conn: + if table == "submissions": + conn.execute( + """ + insert into submissions + (id, title, risk_score, risk_band, decision_status, submitted_epoch, queue_id, payload) + values (?, ?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + title = excluded.title, + risk_score = excluded.risk_score, + risk_band = excluded.risk_band, + decision_status = excluded.decision_status, + submitted_epoch = excluded.submitted_epoch, + queue_id = excluded.queue_id, + payload = excluded.payload + """, + ( + id_value, + str(payload.get("title", "")), + int(payload.get("riskScore", 0) or 0), + str(payload.get("riskBand", "low")), + str(payload.get("decisionStatus", "unreviewed")), + int(payload.get("submittedEpoch", 0) or 0), + str(payload.get("queueId", "")), + payload_json, + ), + ) + return + if table == "evidence": + conn.execute( + """ + insert into evidence + (id, submission_id, source, confidence, status, contributed, payload) + values (?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + submission_id = excluded.submission_id, + source = excluded.source, + confidence = excluded.confidence, + status = excluded.status, + contributed = excluded.contributed, + payload = excluded.payload + """, + ( + id_value, + str(payload["submission_id"]), + str(payload.get("source", "")), + # Clamp to the column's CHECK (confidence >= 0 and <= 1). + # Some sources (e.g. Google Vision web-entity relevance + # scores) are unbounded and would raise IntegrityError. + max(0.0, min(1.0, float(payload.get("confidence", 0) or 0))), + str(payload.get("status", "active")), + 1 if bool(payload.get("contributed", True)) else 0, + payload_json, + ), + ) + return + if table == "providers": + conn.execute( + """ + insert into providers + (id, name, enabled, usage, quota, payload) + values (?, ?, ?, ?, ?, ?) + on conflict(id) do update set + name = excluded.name, + enabled = excluded.enabled, + usage = excluded.usage, + quota = excluded.quota, + payload = excluded.payload + """, + ( + id_value, + str(payload.get("name", "")), + 1 if bool(payload.get("enabled", False)) else 0, + int(payload.get("usage", 0) or 0), + int(payload.get("quota", 0) or 0), + payload_json, + ), + ) + return + if table == "knowledge_entries": + conn.execute( + """ + insert into knowledge_entries + (id, name, entry_type, entry_status, active, source_submission_id, payload) + values (?, ?, ?, ?, ?, ?, ?) + on conflict(id) do update set + name = excluded.name, + entry_type = excluded.entry_type, + entry_status = excluded.entry_status, + active = excluded.active, + source_submission_id = excluded.source_submission_id, + payload = excluded.payload + """, + ( + id_value, + str(payload.get("name", "")), + str(payload.get("type", "other")), + str(payload.get("entryStatus", "confirmed")), + 1 if bool(payload.get("active", True)) else 0, + str(payload.get("sourceSubmissionId", "")), + payload_json, + ), + ) + return + if table == "collection_candidates": + conn.execute( + """ + insert into collection_candidates + (id, provider, query, status, collected_epoch, payload) + values (?, ?, ?, ?, ?, ?) + on conflict(id) do update set + provider = excluded.provider, + query = excluded.query, + status = excluded.status, + collected_epoch = excluded.collected_epoch, + payload = excluded.payload + """, + ( + id_value, + str(payload.get("provider", "")), + str(payload.get("query", "")), + str(payload.get("status", "candidate")), + int(payload.get("collectedEpoch", 0) or 0), + payload_json, + ), + ) + return + if table == "corrections": + conn.execute( + """ + insert into corrections + (id, decision_id, status, payload) + values (?, ?, ?, ?) + on conflict(id) do update set + decision_id = excluded.decision_id, + status = excluded.status, + payload = excluded.payload + """, + ( + id_value, + str(payload.get("decisionId", payload.get("decision_id", ""))), + str(payload.get("status", "")), + payload_json, + ), + ) + return + conn.execute( + f"insert or replace into {table} (id, payload) values (?, ?)", + (id_value, payload_json), + ) + + def _get( + self, + table: str, + id_value: str, + *, + conn: sqlite3.Connection | None = None, + ) -> dict[str, Any]: + _validate_table(table) + with self._conn_ctx(conn) as conn: + row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone() + if row is None: + raise KeyError(id_value) + return json.loads(row["payload"]) + + def _apply_provider_usage_delta( + self, provider_id: str, count: int, payload: dict[str, Any] + ) -> None: + # Re-read current usage under the write lock and apply the delta so + # concurrent provider calls don't lose increments. The rest of `payload` + # (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for + # status fields. + with self._write_lock: + current = self._get("providers", provider_id) + payload["usage"] = int(current.get("usage", 0) or 0) + int(count) + self._put("providers", provider_id, payload) + + def _clear_collection_candidates(self) -> None: + with self._connect() as conn: + conn.execute("delete from collection_candidates") + + def _all(self, table: str, queue_id: str | None = None) -> list[dict[str, Any]]: + _validate_table(table) + with self._connect() as conn: + if table == "submissions" and queue_id is not None: + rows = conn.execute( + "select payload from submissions where queue_id = ? order by id", + (queue_id,), + ).fetchall() + else: + rows = conn.execute(f"select payload from {table} order by id").fetchall() + values = [json.loads(row["payload"]) for row in rows] + if table == "providers": + order = {"internal": 0, "naver": 1, "google": 2, "google_search": 3, "llm": 4} + values.sort(key=lambda item: order.get(item["id"], 99)) + if table == "collection_candidates": + values.sort(key=lambda item: int(item.get("collectedEpoch", 0)), reverse=True) + return values + + def _evidence_rows(self, queue_id: str | None = None): + with self._connect() as conn: + if queue_id is None: + rows = conn.execute("select submission_id, payload from evidence order by id").fetchall() + else: + rows = conn.execute( + """ + select e.submission_id, e.payload + from evidence e + join submissions s on s.id = e.submission_id + where s.queue_id = ? + order by e.id + """, + (queue_id,), + ).fetchall() + return rows + + def _evidence_for_submission(self, submission_id: str) -> list[dict[str, Any]]: + # Read only this submission's evidence. _rescore_submission previously + # called _evidence_by_submission(), which read and JSON-parsed the ENTIRE + # evidence table on every single-submission rescore (a hot path inside the + # seed / auto-search / similarity loops). + with self._connect() as conn: + rows = conn.execute( + "select payload from evidence where submission_id = ? order by id", + (submission_id,), + ).fetchall() + evidence: list[dict[str, Any]] = [] + for row in rows: + payload = json.loads(row["payload"]) + payload.pop("submission_id", None) + evidence.append(payload) + return evidence + + def _evidence_by_submission(self, queue_id: str | None = None) -> dict[str, list[dict[str, Any]]]: + rows = self._evidence_rows(queue_id=queue_id) + grouped: dict[str, list[dict[str, Any]]] = {} + for row in rows: + payload = json.loads(row["payload"]) + payload.pop("submission_id", None) + grouped.setdefault(row["submission_id"], []).append(payload) + return grouped + +