refactor: extract SQLite persistence primitives into StorePersistenceMixin

Move connection/transaction management and the generic _put/_get/_all row access
plus evidence-read helpers into a mixin; CopyrighterStore now inherits it. Methods
rely on the host class's self.db_path / self._write_lock. Behavior-preserving.
sqlite_store.py 3368 -> 3072 lines.
This commit is contained in:
유창욱 2026-06-20 21:46:01 +09:00
parent 3be7b016ce
commit 40501e13f1
2 changed files with 315 additions and 298 deletions

View file

@ -2,14 +2,12 @@ from __future__ import annotations
import base64
import hashlib
import html
import json
import os
import re
import shutil
import sqlite3
import threading
from contextlib import contextmanager
from dataclasses import replace
from datetime import datetime
from pathlib import Path
@ -62,6 +60,7 @@ from rights_filter.server.store_page_scrape import (
_normalized_remote_image_url,
_search_result_direct_image_urls,
)
from rights_filter.server.store_persistence import StorePersistenceMixin
from rights_filter.server.store_schema import (
_ensure_constrained_schema,
_ensure_queue_schema,
@ -116,7 +115,7 @@ from rights_filter.server.store_url_utils import (
)
class CopyrighterStore:
class CopyrighterStore(StorePersistenceMixin):
def __init__(
self,
db_path: Path | str,
@ -3071,298 +3070,3 @@ class CopyrighterStore:
for submission in self._all("submissions", queue_id=queue_id):
self._rescore_submission(str(submission["id"]))
def _count(self, table: str) -> int:
_validate_table(table)
with self._connect() as conn:
return int(conn.execute(f"select count(*) from {table}").fetchone()[0])
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.execute("pragma foreign_keys = on")
# The server is a ThreadingHTTPServer: concurrent operator requests open
# their own connection. Without a busy timeout the second writer raises
# "database is locked" immediately; wait for the lock instead.
conn.execute("pragma busy_timeout = 5000")
conn.row_factory = sqlite3.Row
return conn
@contextmanager
def _conn_ctx(self, conn: sqlite3.Connection | None = None):
# When a caller passes an open connection (inside _transaction), reuse it
# and let the caller own commit/rollback. Otherwise behave exactly like
# `with self._connect() as conn:` — sqlite3 commits on success / rolls
# back on error.
if conn is not None:
yield conn
return
with self._connect() as owned:
yield owned
@contextmanager
def _transaction(self):
# A single connection + single commit for multi-step writes that must be
# atomic (e.g. a state change and its audit event), so a mid-sequence
# failure leaves no partial state.
conn = self._connect()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _put(
self,
table: str,
id_value: str,
payload: dict[str, Any],
*,
conn: sqlite3.Connection | None = None,
) -> None:
_validate_payload(table, id_value, payload)
payload_json = json.dumps(payload, ensure_ascii=False)
with self._conn_ctx(conn) as conn:
if table == "submissions":
conn.execute(
"""
insert into submissions
(id, title, risk_score, risk_band, decision_status, submitted_epoch, queue_id, payload)
values (?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
title = excluded.title,
risk_score = excluded.risk_score,
risk_band = excluded.risk_band,
decision_status = excluded.decision_status,
submitted_epoch = excluded.submitted_epoch,
queue_id = excluded.queue_id,
payload = excluded.payload
""",
(
id_value,
str(payload.get("title", "")),
int(payload.get("riskScore", 0) or 0),
str(payload.get("riskBand", "low")),
str(payload.get("decisionStatus", "unreviewed")),
int(payload.get("submittedEpoch", 0) or 0),
str(payload.get("queueId", "")),
payload_json,
),
)
return
if table == "evidence":
conn.execute(
"""
insert into evidence
(id, submission_id, source, confidence, status, contributed, payload)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
submission_id = excluded.submission_id,
source = excluded.source,
confidence = excluded.confidence,
status = excluded.status,
contributed = excluded.contributed,
payload = excluded.payload
""",
(
id_value,
str(payload["submission_id"]),
str(payload.get("source", "")),
# Clamp to the column's CHECK (confidence >= 0 and <= 1).
# Some sources (e.g. Google Vision web-entity relevance
# scores) are unbounded and would raise IntegrityError.
max(0.0, min(1.0, float(payload.get("confidence", 0) or 0))),
str(payload.get("status", "active")),
1 if bool(payload.get("contributed", True)) else 0,
payload_json,
),
)
return
if table == "providers":
conn.execute(
"""
insert into providers
(id, name, enabled, usage, quota, payload)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set
name = excluded.name,
enabled = excluded.enabled,
usage = excluded.usage,
quota = excluded.quota,
payload = excluded.payload
""",
(
id_value,
str(payload.get("name", "")),
1 if bool(payload.get("enabled", False)) else 0,
int(payload.get("usage", 0) or 0),
int(payload.get("quota", 0) or 0),
payload_json,
),
)
return
if table == "knowledge_entries":
conn.execute(
"""
insert into knowledge_entries
(id, name, entry_type, entry_status, active, source_submission_id, payload)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
name = excluded.name,
entry_type = excluded.entry_type,
entry_status = excluded.entry_status,
active = excluded.active,
source_submission_id = excluded.source_submission_id,
payload = excluded.payload
""",
(
id_value,
str(payload.get("name", "")),
str(payload.get("type", "other")),
str(payload.get("entryStatus", "confirmed")),
1 if bool(payload.get("active", True)) else 0,
str(payload.get("sourceSubmissionId", "")),
payload_json,
),
)
return
if table == "collection_candidates":
conn.execute(
"""
insert into collection_candidates
(id, provider, query, status, collected_epoch, payload)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set
provider = excluded.provider,
query = excluded.query,
status = excluded.status,
collected_epoch = excluded.collected_epoch,
payload = excluded.payload
""",
(
id_value,
str(payload.get("provider", "")),
str(payload.get("query", "")),
str(payload.get("status", "candidate")),
int(payload.get("collectedEpoch", 0) or 0),
payload_json,
),
)
return
if table == "corrections":
conn.execute(
"""
insert into corrections
(id, decision_id, status, payload)
values (?, ?, ?, ?)
on conflict(id) do update set
decision_id = excluded.decision_id,
status = excluded.status,
payload = excluded.payload
""",
(
id_value,
str(payload.get("decisionId", payload.get("decision_id", ""))),
str(payload.get("status", "")),
payload_json,
),
)
return
conn.execute(
f"insert or replace into {table} (id, payload) values (?, ?)",
(id_value, payload_json),
)
def _get(
self,
table: str,
id_value: str,
*,
conn: sqlite3.Connection | None = None,
) -> dict[str, Any]:
_validate_table(table)
with self._conn_ctx(conn) as conn:
row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone()
if row is None:
raise KeyError(id_value)
return json.loads(row["payload"])
def _apply_provider_usage_delta(
self, provider_id: str, count: int, payload: dict[str, Any]
) -> None:
# Re-read current usage under the write lock and apply the delta so
# concurrent provider calls don't lose increments. The rest of `payload`
# (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for
# status fields.
with self._write_lock:
current = self._get("providers", provider_id)
payload["usage"] = int(current.get("usage", 0) or 0) + int(count)
self._put("providers", provider_id, payload)
def _clear_collection_candidates(self) -> None:
with self._connect() as conn:
conn.execute("delete from collection_candidates")
def _all(self, table: str, queue_id: str | None = None) -> list[dict[str, Any]]:
_validate_table(table)
with self._connect() as conn:
if table == "submissions" and queue_id is not None:
rows = conn.execute(
"select payload from submissions where queue_id = ? order by id",
(queue_id,),
).fetchall()
else:
rows = conn.execute(f"select payload from {table} order by id").fetchall()
values = [json.loads(row["payload"]) for row in rows]
if table == "providers":
order = {"internal": 0, "naver": 1, "google": 2, "google_search": 3, "llm": 4}
values.sort(key=lambda item: order.get(item["id"], 99))
if table == "collection_candidates":
values.sort(key=lambda item: int(item.get("collectedEpoch", 0)), reverse=True)
return values
def _evidence_rows(self, queue_id: str | None = None):
with self._connect() as conn:
if queue_id is None:
rows = conn.execute("select submission_id, payload from evidence order by id").fetchall()
else:
rows = conn.execute(
"""
select e.submission_id, e.payload
from evidence e
join submissions s on s.id = e.submission_id
where s.queue_id = ?
order by e.id
""",
(queue_id,),
).fetchall()
return rows
def _evidence_for_submission(self, submission_id: str) -> list[dict[str, Any]]:
# Read only this submission's evidence. _rescore_submission previously
# called _evidence_by_submission(), which read and JSON-parsed the ENTIRE
# evidence table on every single-submission rescore (a hot path inside the
# seed / auto-search / similarity loops).
with self._connect() as conn:
rows = conn.execute(
"select payload from evidence where submission_id = ? order by id",
(submission_id,),
).fetchall()
evidence: list[dict[str, Any]] = []
for row in rows:
payload = json.loads(row["payload"])
payload.pop("submission_id", None)
evidence.append(payload)
return evidence
def _evidence_by_submission(self, queue_id: str | None = None) -> dict[str, list[dict[str, Any]]]:
rows = self._evidence_rows(queue_id=queue_id)
grouped: dict[str, list[dict[str, Any]]] = {}
for row in rows:
payload = json.loads(row["payload"])
payload.pop("submission_id", None)
grouped.setdefault(row["submission_id"], []).append(payload)
return grouped

View file

@ -0,0 +1,313 @@
"""SQLite persistence primitives for CopyrighterStore, as a mixin.
Connection/transaction management and the generic _put/_get/_all row access plus
the evidence-read helpers. Mixed into CopyrighterStore; methods rely on the host
class providing `self.db_path` and `self._write_lock`. Behavior unchanged.
"""
from __future__ import annotations
import json
import sqlite3
from contextlib import contextmanager
from typing import Any
from rights_filter.server.store_serialization import _validate_payload, _validate_table
class StorePersistenceMixin:
def _count(self, table: str) -> int:
_validate_table(table)
with self._connect() as conn:
return int(conn.execute(f"select count(*) from {table}").fetchone()[0])
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.execute("pragma foreign_keys = on")
# The server is a ThreadingHTTPServer: concurrent operator requests open
# their own connection. Without a busy timeout the second writer raises
# "database is locked" immediately; wait for the lock instead.
conn.execute("pragma busy_timeout = 5000")
conn.row_factory = sqlite3.Row
return conn
@contextmanager
def _conn_ctx(self, conn: sqlite3.Connection | None = None):
# When a caller passes an open connection (inside _transaction), reuse it
# and let the caller own commit/rollback. Otherwise behave exactly like
# `with self._connect() as conn:` — sqlite3 commits on success / rolls
# back on error.
if conn is not None:
yield conn
return
with self._connect() as owned:
yield owned
@contextmanager
def _transaction(self):
# A single connection + single commit for multi-step writes that must be
# atomic (e.g. a state change and its audit event), so a mid-sequence
# failure leaves no partial state.
conn = self._connect()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _put(
self,
table: str,
id_value: str,
payload: dict[str, Any],
*,
conn: sqlite3.Connection | None = None,
) -> None:
_validate_payload(table, id_value, payload)
payload_json = json.dumps(payload, ensure_ascii=False)
with self._conn_ctx(conn) as conn:
if table == "submissions":
conn.execute(
"""
insert into submissions
(id, title, risk_score, risk_band, decision_status, submitted_epoch, queue_id, payload)
values (?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
title = excluded.title,
risk_score = excluded.risk_score,
risk_band = excluded.risk_band,
decision_status = excluded.decision_status,
submitted_epoch = excluded.submitted_epoch,
queue_id = excluded.queue_id,
payload = excluded.payload
""",
(
id_value,
str(payload.get("title", "")),
int(payload.get("riskScore", 0) or 0),
str(payload.get("riskBand", "low")),
str(payload.get("decisionStatus", "unreviewed")),
int(payload.get("submittedEpoch", 0) or 0),
str(payload.get("queueId", "")),
payload_json,
),
)
return
if table == "evidence":
conn.execute(
"""
insert into evidence
(id, submission_id, source, confidence, status, contributed, payload)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
submission_id = excluded.submission_id,
source = excluded.source,
confidence = excluded.confidence,
status = excluded.status,
contributed = excluded.contributed,
payload = excluded.payload
""",
(
id_value,
str(payload["submission_id"]),
str(payload.get("source", "")),
# Clamp to the column's CHECK (confidence >= 0 and <= 1).
# Some sources (e.g. Google Vision web-entity relevance
# scores) are unbounded and would raise IntegrityError.
max(0.0, min(1.0, float(payload.get("confidence", 0) or 0))),
str(payload.get("status", "active")),
1 if bool(payload.get("contributed", True)) else 0,
payload_json,
),
)
return
if table == "providers":
conn.execute(
"""
insert into providers
(id, name, enabled, usage, quota, payload)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set
name = excluded.name,
enabled = excluded.enabled,
usage = excluded.usage,
quota = excluded.quota,
payload = excluded.payload
""",
(
id_value,
str(payload.get("name", "")),
1 if bool(payload.get("enabled", False)) else 0,
int(payload.get("usage", 0) or 0),
int(payload.get("quota", 0) or 0),
payload_json,
),
)
return
if table == "knowledge_entries":
conn.execute(
"""
insert into knowledge_entries
(id, name, entry_type, entry_status, active, source_submission_id, payload)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
name = excluded.name,
entry_type = excluded.entry_type,
entry_status = excluded.entry_status,
active = excluded.active,
source_submission_id = excluded.source_submission_id,
payload = excluded.payload
""",
(
id_value,
str(payload.get("name", "")),
str(payload.get("type", "other")),
str(payload.get("entryStatus", "confirmed")),
1 if bool(payload.get("active", True)) else 0,
str(payload.get("sourceSubmissionId", "")),
payload_json,
),
)
return
if table == "collection_candidates":
conn.execute(
"""
insert into collection_candidates
(id, provider, query, status, collected_epoch, payload)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set
provider = excluded.provider,
query = excluded.query,
status = excluded.status,
collected_epoch = excluded.collected_epoch,
payload = excluded.payload
""",
(
id_value,
str(payload.get("provider", "")),
str(payload.get("query", "")),
str(payload.get("status", "candidate")),
int(payload.get("collectedEpoch", 0) or 0),
payload_json,
),
)
return
if table == "corrections":
conn.execute(
"""
insert into corrections
(id, decision_id, status, payload)
values (?, ?, ?, ?)
on conflict(id) do update set
decision_id = excluded.decision_id,
status = excluded.status,
payload = excluded.payload
""",
(
id_value,
str(payload.get("decisionId", payload.get("decision_id", ""))),
str(payload.get("status", "")),
payload_json,
),
)
return
conn.execute(
f"insert or replace into {table} (id, payload) values (?, ?)",
(id_value, payload_json),
)
def _get(
self,
table: str,
id_value: str,
*,
conn: sqlite3.Connection | None = None,
) -> dict[str, Any]:
_validate_table(table)
with self._conn_ctx(conn) as conn:
row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone()
if row is None:
raise KeyError(id_value)
return json.loads(row["payload"])
def _apply_provider_usage_delta(
self, provider_id: str, count: int, payload: dict[str, Any]
) -> None:
# Re-read current usage under the write lock and apply the delta so
# concurrent provider calls don't lose increments. The rest of `payload`
# (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for
# status fields.
with self._write_lock:
current = self._get("providers", provider_id)
payload["usage"] = int(current.get("usage", 0) or 0) + int(count)
self._put("providers", provider_id, payload)
def _clear_collection_candidates(self) -> None:
with self._connect() as conn:
conn.execute("delete from collection_candidates")
def _all(self, table: str, queue_id: str | None = None) -> list[dict[str, Any]]:
_validate_table(table)
with self._connect() as conn:
if table == "submissions" and queue_id is not None:
rows = conn.execute(
"select payload from submissions where queue_id = ? order by id",
(queue_id,),
).fetchall()
else:
rows = conn.execute(f"select payload from {table} order by id").fetchall()
values = [json.loads(row["payload"]) for row in rows]
if table == "providers":
order = {"internal": 0, "naver": 1, "google": 2, "google_search": 3, "llm": 4}
values.sort(key=lambda item: order.get(item["id"], 99))
if table == "collection_candidates":
values.sort(key=lambda item: int(item.get("collectedEpoch", 0)), reverse=True)
return values
def _evidence_rows(self, queue_id: str | None = None):
with self._connect() as conn:
if queue_id is None:
rows = conn.execute("select submission_id, payload from evidence order by id").fetchall()
else:
rows = conn.execute(
"""
select e.submission_id, e.payload
from evidence e
join submissions s on s.id = e.submission_id
where s.queue_id = ?
order by e.id
""",
(queue_id,),
).fetchall()
return rows
def _evidence_for_submission(self, submission_id: str) -> list[dict[str, Any]]:
# Read only this submission's evidence. _rescore_submission previously
# called _evidence_by_submission(), which read and JSON-parsed the ENTIRE
# evidence table on every single-submission rescore (a hot path inside the
# seed / auto-search / similarity loops).
with self._connect() as conn:
rows = conn.execute(
"select payload from evidence where submission_id = ? order by id",
(submission_id,),
).fetchall()
evidence: list[dict[str, Any]] = []
for row in rows:
payload = json.loads(row["payload"])
payload.pop("submission_id", None)
evidence.append(payload)
return evidence
def _evidence_by_submission(self, queue_id: str | None = None) -> dict[str, list[dict[str, Any]]]:
rows = self._evidence_rows(queue_id=queue_id)
grouped: dict[str, list[dict[str, Any]]] = {}
for row in rows:
payload = json.loads(row["payload"])
payload.pop("submission_id", None)
grouped.setdefault(row["submission_id"], []).append(payload)
return grouped