From da917755dda07378021bc4c512db4467e138358b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 20:50:29 +0900 Subject: [PATCH] refactor: extract remote-fetch and schema modules from sqlite_store Move the pure network layer (image/page/stylesheet fetchers, SSRF guard, redirect-validating opener) into store_remote_fetch, and the DDL/typed-column/ constraint-migration helpers into store_schema. Both are behavior-preserving relocations imported back into sqlite_store; tests repoint their fetch monkeypatches to store_remote_fetch. sqlite_store.py 5333 -> 4948 lines. --- src/rights_filter/server/sqlite_store.py | 371 +----------------- .../server/store_remote_fetch.py | 125 ++++++ src/rights_filter/server/store_schema.py | 257 ++++++++++++ .../rights_filter/server/test_sqlite_store.py | 13 +- 4 files changed, 401 insertions(+), 365 deletions(-) create mode 100644 src/rights_filter/server/store_remote_fetch.py create mode 100644 src/rights_filter/server/store_schema.py diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index b2d4090..708a98d 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -3,13 +3,11 @@ from __future__ import annotations import base64 import hashlib import html -import ipaddress import json import mimetypes import os import re import shutil -import socket import sqlite3 import threading from html.parser import HTMLParser @@ -19,8 +17,7 @@ from datetime import datetime from io import BytesIO from pathlib import Path from typing import Any, Callable -from urllib.parse import parse_qsl, unquote, urljoin, urlparse -from urllib.request import HTTPRedirectHandler, Request, build_opener +from urllib.parse import parse_qsl, urljoin, urlparse from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector from rights_filter.analysis.fingerprints import FingerprintService @@ -45,6 +42,17 @@ from rights_filter.integrations.env_clients import ProviderRuntime, build_provid from rights_filter.integrations.external_policy import ExternalApiPolicy from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES +from rights_filter.server.store_remote_fetch import ( + _fetch_page_url_bytes, + _fetch_stylesheet_url_bytes, + _fetch_url_bytes, +) +from rights_filter.server.store_schema import ( + _ensure_constrained_schema, + _ensure_queue_schema, + _ensure_schema_version, + _ensure_typed_columns, +) from rights_filter.server.store_url_utils import ( _decoded_nested_url, _is_http_url, @@ -73,7 +81,6 @@ STORE_TABLES = { "audit_events", "submission_queues", } -SCHEMA_VERSION = 2 RISK_BANDS = ("low", "medium", "high", "failed", "pending") DECISION_STATUSES = ("unreviewed", "held", "rejected", "approved", "corrected") EVIDENCE_STATUSES = ( @@ -97,90 +104,6 @@ DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40 MIN_COVERAGE_THRESHOLD = 0 MAX_COVERAGE_THRESHOLD = 100 DEFAULT_FACE_CROP_RETENTION_DAYS = 90 -CONSTRAINED_TABLE_SCHEMAS = { - "submissions": """ - create table {table} ( - id text primary key, - title text not null default '', - risk_score integer not null default 0 check (risk_score >= 0 and risk_score <= 100), - risk_band text not null default 'low' check (risk_band in ('low', 'medium', 'high', 'failed', 'pending')), - decision_status text not null default 'unreviewed' check (decision_status in ('unreviewed', 'held', 'rejected', 'approved', 'corrected')), - submitted_epoch integer not null default 0 check (submitted_epoch >= 0), - queue_id text not null default '', - payload text not null check (json_valid(payload)) - ) - """, - "evidence": """ - create table {table} ( - id text primary key, - submission_id text not null, - source text not null default '', - confidence real not null default 0 check (confidence >= 0 and confidence <= 1), - status text not null default 'active' check (status in ('active', 'auto', 'manual', 'queued', 'rerun', 'weak', 'used_for_judgment', 'irrelevant', 'false_positive', 'pending')), - contributed integer not null default 1 check (contributed in (0, 1)), - payload text not null check (json_valid(payload)), - foreign key (submission_id) references submissions(id) on delete cascade - ) - """, - "providers": """ - create table {table} ( - id text primary key, - name text not null default '', - enabled integer not null default 0 check (enabled in (0, 1)), - usage integer not null default 0 check (usage >= 0), - quota integer not null default 0 check (quota >= 0), - payload text not null check (json_valid(payload)) - ) - """, - "knowledge_entries": """ - create table {table} ( - id text primary key, - name text not null default '', - entry_type text not null default 'other', - entry_status text not null default 'confirmed' check (entry_status in ('confirmed', 'watchlist', 'excluded')), - active integer not null default 1 check (active in (0, 1)), - source_submission_id text not null default '', - payload text not null check (json_valid(payload)) - ) - """, - "collection_candidates": """ - create table {table} ( - id text primary key, - provider text not null default '', - query text not null default '', - status text not null default 'candidate' check (status in ('candidate', 'promoted')), - collected_epoch integer not null default 0 check (collected_epoch >= 0), - payload text not null check (json_valid(payload)) - ) - """, - "corrections": """ - create table {table} ( - id text primary key, - decision_id text not null default '', - status text not null default '', - payload text not null check (json_valid(payload)) - ) - """, - "audit_events": """ - create table {table} ( - id integer primary key autoincrement, - timestamp text not null default '', - actor text not null default '', - event text not null default '', - object_ref text not null default '', - payload text not null check (json_valid(payload)) - ) - """, -} -LEGACY_REBUILD_ORDER = ( - "submissions", - "providers", - "knowledge_entries", - "collection_candidates", - "corrections", - "audit_events", - "evidence", -) PAYLOAD_REQUIRED_FIELDS = { "submissions": { "id", @@ -233,167 +156,8 @@ PAYLOAD_REQUIRED_FIELDS = { "id", }, } -TYPED_COLUMNS = { - "submissions": [ - ("title", "text not null default ''"), - ("risk_score", "integer not null default 0"), - ("risk_band", "text not null default 'low'"), - ("decision_status", "text not null default 'unreviewed'"), - ("submitted_epoch", "integer not null default 0"), - ("queue_id", "text not null default ''"), - ], - "evidence": [ - ("source", "text not null default ''"), - ("confidence", "real not null default 0"), - ("status", "text not null default 'active'"), - ("contributed", "integer not null default 1"), - ], - "providers": [ - ("name", "text not null default ''"), - ("enabled", "integer not null default 0"), - ("usage", "integer not null default 0"), - ("quota", "integer not null default 0"), - ], - "knowledge_entries": [ - ("name", "text not null default ''"), - ("entry_type", "text not null default 'other'"), - ("entry_status", "text not null default 'confirmed'"), - ("active", "integer not null default 1"), - ("source_submission_id", "text not null default ''"), - ], - "collection_candidates": [ - ("provider", "text not null default ''"), - ("query", "text not null default ''"), - ("status", "text not null default 'candidate'"), - ("collected_epoch", "integer not null default 0"), - ], - "corrections": [ - ("decision_id", "text not null default ''"), - ("status", "text not null default ''"), - ], - "audit_events": [ - ("timestamp", "text not null default ''"), - ("actor", "text not null default ''"), - ("event", "text not null default ''"), - ("object_ref", "text not null default ''"), - ], -} -def _ensure_typed_columns(conn: sqlite3.Connection) -> None: - for table, columns in TYPED_COLUMNS.items(): - existing = { - str(row["name"]) - for row in conn.execute(f"pragma table_info({table})").fetchall() - } - for name, definition in columns: - if name not in existing: - conn.execute(f"alter table {table} add column {name} {definition}") - - -def _ensure_schema_version(conn: sqlite3.Connection) -> None: - current = int(conn.execute("pragma user_version").fetchone()[0]) - if current < SCHEMA_VERSION: - conn.execute(f"pragma user_version = {SCHEMA_VERSION}") - - -def _ensure_queue_schema(conn: sqlite3.Connection) -> None: - conn.execute( - """ - create table if not exists submission_queues ( - id text primary key, - folder_path text not null unique, - label text not null default '', - is_active integer not null default 0 check (is_active in (0, 1)), - created_at text not null default '', - created_epoch integer not null default 0 check (created_epoch >= 0), - last_imported_epoch integer not null default 0 check (last_imported_epoch >= 0), - last_imported_at text not null default '' - ) - """ - ) - conn.execute( - "create index if not exists idx_submission_queues_active on submission_queues(is_active, id)" - ) - - -def _drop_stale_rebuild_temp_tables(conn: sqlite3.Connection) -> None: - rows = conn.execute( - "select name from sqlite_master where type = 'table' and name like '\\_\\_copyrighter\\_%\\_new' escape '\\'" - ).fetchall() - for row in rows: - conn.execute(f"drop table if exists {row['name']}") - - -def _ensure_constrained_schema(conn: sqlite3.Connection) -> None: - pending = [ - table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table) - ] - if not pending: - return - # Disable foreign-key enforcement for the duration of the rebuilds. The - # rebuild drops each parent table before its children are recreated, and a - # `drop table submissions` while foreign keys are ON would fire evidence's - # `on delete cascade` and silently wipe every evidence row. Toggling FKs off - # (the procedure SQLite documents for constraint-changing migrations) must - # happen with no transaction pending, so commit first and restore on exit. - conn.commit() - # Clean up any half-built temp table left by a prior crashed/failed run so we - # retry cleanly instead of tripping on an orphan. - _drop_stale_rebuild_temp_tables(conn) - conn.execute("pragma foreign_keys = off") - try: - for table in pending: - _rebuild_constrained_table(conn, table) - conn.commit() - except Exception: - # The copy step raises before the original table is dropped (e.g. legacy - # rows violate the new CHECK), so the source data is intact; remove the - # orphaned temp table so the next startup can retry. - _drop_stale_rebuild_temp_tables(conn) - conn.commit() - raise - finally: - conn.execute("pragma foreign_keys = on") - - -def _table_needs_constraint_rebuild(conn: sqlite3.Connection, table: str) -> bool: - row = conn.execute( - "select sql from sqlite_master where type = 'table' and name = ?", - (table,), - ).fetchone() - if row is None: - return False - sql = str(row["sql"] or "").lower() - if "json_valid(payload)" not in sql: - return True - if table == "evidence" and "foreign key" not in sql: - return True - if table == "submissions" and "risk_band in" not in sql: - return True - return False - - -def _rebuild_constrained_table(conn: sqlite3.Connection, table: str) -> None: - schema = CONSTRAINED_TABLE_SCHEMAS[table] - temp_table = f"__copyrighter_{table}_new" - conn.execute(f"drop table if exists {temp_table}") - conn.execute(schema.format(table=temp_table)) - - source_columns = _table_columns(conn, table) - target_columns = _table_columns(conn, temp_table) - copied_columns = [column for column in target_columns if column in source_columns] - if copied_columns: - column_sql = ", ".join(copied_columns) - conn.execute( - f"insert into {temp_table} ({column_sql}) select {column_sql} from {table}" - ) - conn.execute(f"drop table {table}") - conn.execute(f"alter table {temp_table} rename to {table}") - - -def _table_columns(conn: sqlite3.Connection, table: str) -> list[str]: - return [str(row["name"]) for row in conn.execute(f"pragma table_info({table})").fetchall()] def _bounded_int_env(value: str | None, default: int, minimum: int, maximum: int) -> int: @@ -3892,117 +3656,6 @@ def _image_suffix_from_url(url: str) -> str: return suffix -CANDIDATE_IMAGE_FETCH_LIMIT = 8_000_000 -CANDIDATE_IMAGE_FETCH_HEADERS = { - "User-Agent": ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/124.0 Safari/537.36 CopyrighterCandidateCollector/0.1" - ), - "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", - "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7", - "Sec-Fetch-Dest": "image", - "Sec-Fetch-Mode": "no-cors", - "Sec-Fetch-Site": "cross-site", -} -PAGE_FETCH_HEADERS = { - "User-Agent": CANDIDATE_IMAGE_FETCH_HEADERS["User-Agent"], - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/*,*/*;q=0.8", - "Accept-Language": CANDIDATE_IMAGE_FETCH_HEADERS["Accept-Language"], - "Sec-Fetch-Dest": "document", - "Sec-Fetch-Mode": "navigate", - "Sec-Fetch-Site": "cross-site", -} -STYLESHEET_FETCH_HEADERS = { - "User-Agent": CANDIDATE_IMAGE_FETCH_HEADERS["User-Agent"], - "Accept": "text/css,*/*;q=0.8", - "Accept-Language": CANDIDATE_IMAGE_FETCH_HEADERS["Accept-Language"], - "Sec-Fetch-Dest": "style", - "Sec-Fetch-Mode": "no-cors", - "Sec-Fetch-Site": "cross-site", -} - - -def _fetch_url_bytes( - url: str, - timeout: int = 10, - limit: int = CANDIDATE_IMAGE_FETCH_LIMIT, - referer_url: str = "", -) -> bytes: - headers = _headers_with_referer(CANDIDATE_IMAGE_FETCH_HEADERS, referer_url) - return _fetch_url_bytes_with_headers(url, headers, timeout, limit) - - -def _fetch_page_url_bytes(url: str, timeout: int = 10, limit: int = CANDIDATE_IMAGE_FETCH_LIMIT) -> bytes: - return _fetch_url_bytes_with_headers(url, PAGE_FETCH_HEADERS, timeout, limit) - - -def _fetch_stylesheet_url_bytes(url: str, timeout: int = 10, limit: int = CANDIDATE_IMAGE_FETCH_LIMIT) -> bytes: - return _fetch_url_bytes_with_headers(url, STYLESHEET_FETCH_HEADERS, timeout, limit) - - -def _assert_public_http_url(url: str) -> None: - # SSRF guard: candidate image / page / stylesheet URLs come from external - # search-result content (attacker-influenceable), so refuse to fetch - # anything that resolves to a non-public address (loopback, RFC1918, - # link-local cloud-metadata 169.254.169.254, etc.). - parsed = urlparse(url) - if parsed.scheme not in {"http", "https"} or not parsed.hostname: - raise ValueError("only public http(s) URLs may be fetched") - port = parsed.port or (443 if parsed.scheme == "https" else 80) - try: - infos = socket.getaddrinfo(parsed.hostname, port, proto=socket.IPPROTO_TCP) - except OSError as exc: - raise ValueError("could not resolve fetch host") from exc - for *_, sockaddr in infos: - ip = ipaddress.ip_address(sockaddr[0]) - if ( - ip.is_private - or ip.is_loopback - or ip.is_link_local - or ip.is_reserved - or ip.is_multicast - or ip.is_unspecified - ): - raise ValueError("refusing to fetch internal address") - - -class _ValidatingRedirectHandler(HTTPRedirectHandler): - # Re-run the SSRF guard on every redirect hop so a public URL cannot bounce - # to an internal address. - def redirect_request(self, req, fp, code, msg, headers, newurl): - _assert_public_http_url(newurl) - return super().redirect_request(req, fp, code, msg, headers, newurl) - - -_SSRF_SAFE_OPENER = build_opener(_ValidatingRedirectHandler()) - - -def _open_url(request: Request, timeout: int): - return _SSRF_SAFE_OPENER.open(request, timeout=timeout) - - -def _fetch_url_bytes_with_headers( - url: str, - headers: dict[str, str], - timeout: int, - limit: int, -) -> bytes: - _assert_public_http_url(url) - request = Request(url, headers=headers) - with _open_url(request, timeout) as response: - data = response.read(limit + 1) - if len(data) > limit: - raise ValueError("candidate image exceeds size limit") - return data - - -def _headers_with_referer(headers: dict[str, str], referer_url: str) -> dict[str, str]: - if not referer_url or not _is_http_url(referer_url): - return headers - return {**headers, "Referer": referer_url} - - class _PageImageParser(HTMLParser): def __init__(self, parse_noscript: bool = True) -> None: super().__init__() diff --git a/src/rights_filter/server/store_remote_fetch.py b/src/rights_filter/server/store_remote_fetch.py new file mode 100644 index 0000000..30a3627 --- /dev/null +++ b/src/rights_filter/server/store_remote_fetch.py @@ -0,0 +1,125 @@ +"""Remote fetch layer for candidate images, pages, and stylesheets. + +Extracted from sqlite_store.py. This is the pure network boundary (HTTP fetch ++ SSRF guard); HTML/CSS parsing of the fetched content stays in the store. Tests +monkeypatch _open_url / _assert_public_http_url / _fetch_url_bytes here. +""" + +from __future__ import annotations + +import ipaddress +import socket +from urllib.parse import urlparse +from urllib.request import HTTPRedirectHandler, Request, build_opener + +from rights_filter.server.store_url_utils import _is_http_url + +CANDIDATE_IMAGE_FETCH_LIMIT = 8_000_000 +CANDIDATE_IMAGE_FETCH_HEADERS = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0 Safari/537.36 CopyrighterCandidateCollector/0.1" + ), + "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", + "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7", + "Sec-Fetch-Dest": "image", + "Sec-Fetch-Mode": "no-cors", + "Sec-Fetch-Site": "cross-site", +} +PAGE_FETCH_HEADERS = { + "User-Agent": CANDIDATE_IMAGE_FETCH_HEADERS["User-Agent"], + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/*,*/*;q=0.8", + "Accept-Language": CANDIDATE_IMAGE_FETCH_HEADERS["Accept-Language"], + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "cross-site", +} +STYLESHEET_FETCH_HEADERS = { + "User-Agent": CANDIDATE_IMAGE_FETCH_HEADERS["User-Agent"], + "Accept": "text/css,*/*;q=0.8", + "Accept-Language": CANDIDATE_IMAGE_FETCH_HEADERS["Accept-Language"], + "Sec-Fetch-Dest": "style", + "Sec-Fetch-Mode": "no-cors", + "Sec-Fetch-Site": "cross-site", +} + + +def _fetch_url_bytes( + url: str, + timeout: int = 10, + limit: int = CANDIDATE_IMAGE_FETCH_LIMIT, + referer_url: str = "", +) -> bytes: + headers = _headers_with_referer(CANDIDATE_IMAGE_FETCH_HEADERS, referer_url) + return _fetch_url_bytes_with_headers(url, headers, timeout, limit) + + +def _fetch_page_url_bytes(url: str, timeout: int = 10, limit: int = CANDIDATE_IMAGE_FETCH_LIMIT) -> bytes: + return _fetch_url_bytes_with_headers(url, PAGE_FETCH_HEADERS, timeout, limit) + + +def _fetch_stylesheet_url_bytes(url: str, timeout: int = 10, limit: int = CANDIDATE_IMAGE_FETCH_LIMIT) -> bytes: + return _fetch_url_bytes_with_headers(url, STYLESHEET_FETCH_HEADERS, timeout, limit) + + +def _assert_public_http_url(url: str) -> None: + # SSRF guard: candidate image / page / stylesheet URLs come from external + # search-result content (attacker-influenceable), so refuse to fetch + # anything that resolves to a non-public address (loopback, RFC1918, + # link-local cloud-metadata 169.254.169.254, etc.). + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"} or not parsed.hostname: + raise ValueError("only public http(s) URLs may be fetched") + port = parsed.port or (443 if parsed.scheme == "https" else 80) + try: + infos = socket.getaddrinfo(parsed.hostname, port, proto=socket.IPPROTO_TCP) + except OSError as exc: + raise ValueError("could not resolve fetch host") from exc + for *_, sockaddr in infos: + ip = ipaddress.ip_address(sockaddr[0]) + if ( + ip.is_private + or ip.is_loopback + or ip.is_link_local + or ip.is_reserved + or ip.is_multicast + or ip.is_unspecified + ): + raise ValueError("refusing to fetch internal address") + + +class _ValidatingRedirectHandler(HTTPRedirectHandler): + # Re-run the SSRF guard on every redirect hop so a public URL cannot bounce + # to an internal address. + def redirect_request(self, req, fp, code, msg, headers, newurl): + _assert_public_http_url(newurl) + return super().redirect_request(req, fp, code, msg, headers, newurl) + + +_SSRF_SAFE_OPENER = build_opener(_ValidatingRedirectHandler()) + + +def _open_url(request: Request, timeout: int): + return _SSRF_SAFE_OPENER.open(request, timeout=timeout) + + +def _fetch_url_bytes_with_headers( + url: str, + headers: dict[str, str], + timeout: int, + limit: int, +) -> bytes: + _assert_public_http_url(url) + request = Request(url, headers=headers) + with _open_url(request, timeout) as response: + data = response.read(limit + 1) + if len(data) > limit: + raise ValueError("candidate image exceeds size limit") + return data + + +def _headers_with_referer(headers: dict[str, str], referer_url: str) -> dict[str, str]: + if not referer_url or not _is_http_url(referer_url): + return headers + return {**headers, "Referer": referer_url} diff --git a/src/rights_filter/server/store_schema.py b/src/rights_filter/server/store_schema.py new file mode 100644 index 0000000..adf994c --- /dev/null +++ b/src/rights_filter/server/store_schema.py @@ -0,0 +1,257 @@ +"""SQLite schema definition, typed-column upgrades, and constraint migration. + +Extracted from sqlite_store.py. Pure DDL/migration helpers operating on a +passed-in connection; no dependency on the store class. Behavior unchanged. +""" + +from __future__ import annotations + +import sqlite3 + +SCHEMA_VERSION = 2 + +CONSTRAINED_TABLE_SCHEMAS = { + "submissions": """ + create table {table} ( + id text primary key, + title text not null default '', + risk_score integer not null default 0 check (risk_score >= 0 and risk_score <= 100), + risk_band text not null default 'low' check (risk_band in ('low', 'medium', 'high', 'failed', 'pending')), + decision_status text not null default 'unreviewed' check (decision_status in ('unreviewed', 'held', 'rejected', 'approved', 'corrected')), + submitted_epoch integer not null default 0 check (submitted_epoch >= 0), + queue_id text not null default '', + payload text not null check (json_valid(payload)) + ) + """, + "evidence": """ + create table {table} ( + id text primary key, + submission_id text not null, + source text not null default '', + confidence real not null default 0 check (confidence >= 0 and confidence <= 1), + status text not null default 'active' check (status in ('active', 'auto', 'manual', 'queued', 'rerun', 'weak', 'used_for_judgment', 'irrelevant', 'false_positive', 'pending')), + contributed integer not null default 1 check (contributed in (0, 1)), + payload text not null check (json_valid(payload)), + foreign key (submission_id) references submissions(id) on delete cascade + ) + """, + "providers": """ + create table {table} ( + id text primary key, + name text not null default '', + enabled integer not null default 0 check (enabled in (0, 1)), + usage integer not null default 0 check (usage >= 0), + quota integer not null default 0 check (quota >= 0), + payload text not null check (json_valid(payload)) + ) + """, + "knowledge_entries": """ + create table {table} ( + id text primary key, + name text not null default '', + entry_type text not null default 'other', + entry_status text not null default 'confirmed' check (entry_status in ('confirmed', 'watchlist', 'excluded')), + active integer not null default 1 check (active in (0, 1)), + source_submission_id text not null default '', + payload text not null check (json_valid(payload)) + ) + """, + "collection_candidates": """ + create table {table} ( + id text primary key, + provider text not null default '', + query text not null default '', + status text not null default 'candidate' check (status in ('candidate', 'promoted')), + collected_epoch integer not null default 0 check (collected_epoch >= 0), + payload text not null check (json_valid(payload)) + ) + """, + "corrections": """ + create table {table} ( + id text primary key, + decision_id text not null default '', + status text not null default '', + payload text not null check (json_valid(payload)) + ) + """, + "audit_events": """ + create table {table} ( + id integer primary key autoincrement, + timestamp text not null default '', + actor text not null default '', + event text not null default '', + object_ref text not null default '', + payload text not null check (json_valid(payload)) + ) + """, +} +LEGACY_REBUILD_ORDER = ( + "submissions", + "providers", + "knowledge_entries", + "collection_candidates", + "corrections", + "audit_events", + "evidence", +) +TYPED_COLUMNS = { + "submissions": [ + ("title", "text not null default ''"), + ("risk_score", "integer not null default 0"), + ("risk_band", "text not null default 'low'"), + ("decision_status", "text not null default 'unreviewed'"), + ("submitted_epoch", "integer not null default 0"), + ("queue_id", "text not null default ''"), + ], + "evidence": [ + ("source", "text not null default ''"), + ("confidence", "real not null default 0"), + ("status", "text not null default 'active'"), + ("contributed", "integer not null default 1"), + ], + "providers": [ + ("name", "text not null default ''"), + ("enabled", "integer not null default 0"), + ("usage", "integer not null default 0"), + ("quota", "integer not null default 0"), + ], + "knowledge_entries": [ + ("name", "text not null default ''"), + ("entry_type", "text not null default 'other'"), + ("entry_status", "text not null default 'confirmed'"), + ("active", "integer not null default 1"), + ("source_submission_id", "text not null default ''"), + ], + "collection_candidates": [ + ("provider", "text not null default ''"), + ("query", "text not null default ''"), + ("status", "text not null default 'candidate'"), + ("collected_epoch", "integer not null default 0"), + ], + "corrections": [ + ("decision_id", "text not null default ''"), + ("status", "text not null default ''"), + ], + "audit_events": [ + ("timestamp", "text not null default ''"), + ("actor", "text not null default ''"), + ("event", "text not null default ''"), + ("object_ref", "text not null default ''"), + ], +} + + +def _ensure_typed_columns(conn: sqlite3.Connection) -> None: + for table, columns in TYPED_COLUMNS.items(): + existing = { + str(row["name"]) + for row in conn.execute(f"pragma table_info({table})").fetchall() + } + for name, definition in columns: + if name not in existing: + conn.execute(f"alter table {table} add column {name} {definition}") + + +def _ensure_schema_version(conn: sqlite3.Connection) -> None: + current = int(conn.execute("pragma user_version").fetchone()[0]) + if current < SCHEMA_VERSION: + conn.execute(f"pragma user_version = {SCHEMA_VERSION}") + + +def _ensure_queue_schema(conn: sqlite3.Connection) -> None: + conn.execute( + """ + create table if not exists submission_queues ( + id text primary key, + folder_path text not null unique, + label text not null default '', + is_active integer not null default 0 check (is_active in (0, 1)), + created_at text not null default '', + created_epoch integer not null default 0 check (created_epoch >= 0), + last_imported_epoch integer not null default 0 check (last_imported_epoch >= 0), + last_imported_at text not null default '' + ) + """ + ) + conn.execute( + "create index if not exists idx_submission_queues_active on submission_queues(is_active, id)" + ) + + +def _drop_stale_rebuild_temp_tables(conn: sqlite3.Connection) -> None: + rows = conn.execute( + "select name from sqlite_master where type = 'table' and name like '\\_\\_copyrighter\\_%\\_new' escape '\\'" + ).fetchall() + for row in rows: + conn.execute(f"drop table if exists {row['name']}") + + +def _ensure_constrained_schema(conn: sqlite3.Connection) -> None: + pending = [ + table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table) + ] + if not pending: + return + # Disable foreign-key enforcement for the duration of the rebuilds. The + # rebuild drops each parent table before its children are recreated, and a + # `drop table submissions` while foreign keys are ON would fire evidence's + # `on delete cascade` and silently wipe every evidence row. Toggling FKs off + # (the procedure SQLite documents for constraint-changing migrations) must + # happen with no transaction pending, so commit first and restore on exit. + conn.commit() + # Clean up any half-built temp table left by a prior crashed/failed run so we + # retry cleanly instead of tripping on an orphan. + _drop_stale_rebuild_temp_tables(conn) + conn.execute("pragma foreign_keys = off") + try: + for table in pending: + _rebuild_constrained_table(conn, table) + conn.commit() + except Exception: + # The copy step raises before the original table is dropped (e.g. legacy + # rows violate the new CHECK), so the source data is intact; remove the + # orphaned temp table so the next startup can retry. + _drop_stale_rebuild_temp_tables(conn) + conn.commit() + raise + finally: + conn.execute("pragma foreign_keys = on") + + +def _table_needs_constraint_rebuild(conn: sqlite3.Connection, table: str) -> bool: + row = conn.execute( + "select sql from sqlite_master where type = 'table' and name = ?", + (table,), + ).fetchone() + if row is None: + return False + sql = str(row["sql"] or "").lower() + if "json_valid(payload)" not in sql: + return True + if table == "evidence" and "foreign key" not in sql: + return True + if table == "submissions" and "risk_band in" not in sql: + return True + return False + + +def _rebuild_constrained_table(conn: sqlite3.Connection, table: str) -> None: + schema = CONSTRAINED_TABLE_SCHEMAS[table] + temp_table = f"__copyrighter_{table}_new" + conn.execute(f"drop table if exists {temp_table}") + conn.execute(schema.format(table=temp_table)) + + source_columns = _table_columns(conn, table) + target_columns = _table_columns(conn, temp_table) + copied_columns = [column for column in target_columns if column in source_columns] + if copied_columns: + column_sql = ", ".join(copied_columns) + conn.execute( + f"insert into {temp_table} ({column_sql}) select {column_sql} from {table}" + ) + conn.execute(f"drop table {table}") + conn.execute(f"alter table {temp_table} rename to {table}") + + +def _table_columns(conn: sqlite3.Connection, table: str) -> list[str]: + return [str(row["name"]) for row in conn.execute(f"pragma table_info({table})").fetchall()] diff --git a/tests/rights_filter/server/test_sqlite_store.py b/tests/rights_filter/server/test_sqlite_store.py index a6cd395..ebd1f43 100644 --- a/tests/rights_filter/server/test_sqlite_store.py +++ b/tests/rights_filter/server/test_sqlite_store.py @@ -12,6 +12,7 @@ from rights_filter.analysis.preprocessing import ImagePayload, build_face_crop_d from rights_filter.domain.records import Evidence, EvidenceSource from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server import sqlite_store as sqlite_store_module +from rights_filter.server import store_remote_fetch as remote_fetch_module from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.integrations.env_clients import build_provider_runtime @@ -322,8 +323,8 @@ def test_fetch_url_bytes_uses_browser_image_headers_and_room_for_real_photos(mon captured["timeout"] = timeout return FakeResponse() - monkeypatch.setattr(sqlite_store_module, "_open_url", fake_urlopen) - monkeypatch.setattr(sqlite_store_module, "_assert_public_http_url", lambda url: None) + monkeypatch.setattr(remote_fetch_module, "_open_url", fake_urlopen) + monkeypatch.setattr(remote_fetch_module, "_assert_public_http_url", lambda url: None) content = sqlite_store_module._fetch_url_bytes("https://cdn.example.test/profile.webp") @@ -3633,8 +3634,8 @@ def test_sqlite_store_uses_page_and_image_specific_headers_for_default_fetcher( return FakeResponse(submitted_image) raise AssertionError(f"unexpected URL fetched: {request.full_url}") - monkeypatch.setattr(sqlite_store_module, "_open_url", fake_urlopen) - monkeypatch.setattr(sqlite_store_module, "_assert_public_http_url", lambda url: None) + monkeypatch.setattr(remote_fetch_module, "_open_url", fake_urlopen) + monkeypatch.setattr(remote_fetch_module, "_assert_public_http_url", lambda url: None) store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime) store.initialize() @@ -3722,8 +3723,8 @@ def test_sqlite_store_uses_source_page_referer_for_default_page_image_fetch( return FakeResponse(submitted_image) raise AssertionError(f"unexpected URL fetched: {request.full_url}") - monkeypatch.setattr(sqlite_store_module, "_open_url", fake_urlopen) - monkeypatch.setattr(sqlite_store_module, "_assert_public_http_url", lambda url: None) + monkeypatch.setattr(remote_fetch_module, "_open_url", fake_urlopen) + monkeypatch.setattr(remote_fetch_module, "_assert_public_http_url", lambda url: None) store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime) store.initialize()