refactor: extract remote-fetch and schema modules from sqlite_store

Move the pure network layer (image/page/stylesheet fetchers, SSRF guard,
redirect-validating opener) into store_remote_fetch, and the DDL/typed-column/
constraint-migration helpers into store_schema. Both are behavior-preserving
relocations imported back into sqlite_store; tests repoint their fetch
monkeypatches to store_remote_fetch. sqlite_store.py 5333 -> 4948 lines.
This commit is contained in:
유창욱 2026-06-20 20:50:29 +09:00
parent e66f9d5001
commit da917755dd
4 changed files with 401 additions and 365 deletions

View file

@ -3,13 +3,11 @@ from __future__ import annotations
import base64 import base64
import hashlib import hashlib
import html import html
import ipaddress
import json import json
import mimetypes import mimetypes
import os import os
import re import re
import shutil import shutil
import socket
import sqlite3 import sqlite3
import threading import threading
from html.parser import HTMLParser from html.parser import HTMLParser
@ -19,8 +17,7 @@ from datetime import datetime
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from urllib.parse import parse_qsl, unquote, urljoin, urlparse from urllib.parse import parse_qsl, urljoin, urlparse
from urllib.request import HTTPRedirectHandler, Request, build_opener
from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector
from rights_filter.analysis.fingerprints import FingerprintService from rights_filter.analysis.fingerprints import FingerprintService
@ -45,6 +42,17 @@ from rights_filter.integrations.env_clients import ProviderRuntime, build_provid
from rights_filter.integrations.external_policy import ExternalApiPolicy from rights_filter.integrations.external_policy import ExternalApiPolicy
from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage
from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES
from rights_filter.server.store_remote_fetch import (
_fetch_page_url_bytes,
_fetch_stylesheet_url_bytes,
_fetch_url_bytes,
)
from rights_filter.server.store_schema import (
_ensure_constrained_schema,
_ensure_queue_schema,
_ensure_schema_version,
_ensure_typed_columns,
)
from rights_filter.server.store_url_utils import ( from rights_filter.server.store_url_utils import (
_decoded_nested_url, _decoded_nested_url,
_is_http_url, _is_http_url,
@ -73,7 +81,6 @@ STORE_TABLES = {
"audit_events", "audit_events",
"submission_queues", "submission_queues",
} }
SCHEMA_VERSION = 2
RISK_BANDS = ("low", "medium", "high", "failed", "pending") RISK_BANDS = ("low", "medium", "high", "failed", "pending")
DECISION_STATUSES = ("unreviewed", "held", "rejected", "approved", "corrected") DECISION_STATUSES = ("unreviewed", "held", "rejected", "approved", "corrected")
EVIDENCE_STATUSES = ( EVIDENCE_STATUSES = (
@ -97,90 +104,6 @@ DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40
MIN_COVERAGE_THRESHOLD = 0 MIN_COVERAGE_THRESHOLD = 0
MAX_COVERAGE_THRESHOLD = 100 MAX_COVERAGE_THRESHOLD = 100
DEFAULT_FACE_CROP_RETENTION_DAYS = 90 DEFAULT_FACE_CROP_RETENTION_DAYS = 90
CONSTRAINED_TABLE_SCHEMAS = {
"submissions": """
create table {table} (
id text primary key,
title text not null default '',
risk_score integer not null default 0 check (risk_score >= 0 and risk_score <= 100),
risk_band text not null default 'low' check (risk_band in ('low', 'medium', 'high', 'failed', 'pending')),
decision_status text not null default 'unreviewed' check (decision_status in ('unreviewed', 'held', 'rejected', 'approved', 'corrected')),
submitted_epoch integer not null default 0 check (submitted_epoch >= 0),
queue_id text not null default '',
payload text not null check (json_valid(payload))
)
""",
"evidence": """
create table {table} (
id text primary key,
submission_id text not null,
source text not null default '',
confidence real not null default 0 check (confidence >= 0 and confidence <= 1),
status text not null default 'active' check (status in ('active', 'auto', 'manual', 'queued', 'rerun', 'weak', 'used_for_judgment', 'irrelevant', 'false_positive', 'pending')),
contributed integer not null default 1 check (contributed in (0, 1)),
payload text not null check (json_valid(payload)),
foreign key (submission_id) references submissions(id) on delete cascade
)
""",
"providers": """
create table {table} (
id text primary key,
name text not null default '',
enabled integer not null default 0 check (enabled in (0, 1)),
usage integer not null default 0 check (usage >= 0),
quota integer not null default 0 check (quota >= 0),
payload text not null check (json_valid(payload))
)
""",
"knowledge_entries": """
create table {table} (
id text primary key,
name text not null default '',
entry_type text not null default 'other',
entry_status text not null default 'confirmed' check (entry_status in ('confirmed', 'watchlist', 'excluded')),
active integer not null default 1 check (active in (0, 1)),
source_submission_id text not null default '',
payload text not null check (json_valid(payload))
)
""",
"collection_candidates": """
create table {table} (
id text primary key,
provider text not null default '',
query text not null default '',
status text not null default 'candidate' check (status in ('candidate', 'promoted')),
collected_epoch integer not null default 0 check (collected_epoch >= 0),
payload text not null check (json_valid(payload))
)
""",
"corrections": """
create table {table} (
id text primary key,
decision_id text not null default '',
status text not null default '',
payload text not null check (json_valid(payload))
)
""",
"audit_events": """
create table {table} (
id integer primary key autoincrement,
timestamp text not null default '',
actor text not null default '',
event text not null default '',
object_ref text not null default '',
payload text not null check (json_valid(payload))
)
""",
}
LEGACY_REBUILD_ORDER = (
"submissions",
"providers",
"knowledge_entries",
"collection_candidates",
"corrections",
"audit_events",
"evidence",
)
PAYLOAD_REQUIRED_FIELDS = { PAYLOAD_REQUIRED_FIELDS = {
"submissions": { "submissions": {
"id", "id",
@ -233,167 +156,8 @@ PAYLOAD_REQUIRED_FIELDS = {
"id", "id",
}, },
} }
TYPED_COLUMNS = {
"submissions": [
("title", "text not null default ''"),
("risk_score", "integer not null default 0"),
("risk_band", "text not null default 'low'"),
("decision_status", "text not null default 'unreviewed'"),
("submitted_epoch", "integer not null default 0"),
("queue_id", "text not null default ''"),
],
"evidence": [
("source", "text not null default ''"),
("confidence", "real not null default 0"),
("status", "text not null default 'active'"),
("contributed", "integer not null default 1"),
],
"providers": [
("name", "text not null default ''"),
("enabled", "integer not null default 0"),
("usage", "integer not null default 0"),
("quota", "integer not null default 0"),
],
"knowledge_entries": [
("name", "text not null default ''"),
("entry_type", "text not null default 'other'"),
("entry_status", "text not null default 'confirmed'"),
("active", "integer not null default 1"),
("source_submission_id", "text not null default ''"),
],
"collection_candidates": [
("provider", "text not null default ''"),
("query", "text not null default ''"),
("status", "text not null default 'candidate'"),
("collected_epoch", "integer not null default 0"),
],
"corrections": [
("decision_id", "text not null default ''"),
("status", "text not null default ''"),
],
"audit_events": [
("timestamp", "text not null default ''"),
("actor", "text not null default ''"),
("event", "text not null default ''"),
("object_ref", "text not null default ''"),
],
}
def _ensure_typed_columns(conn: sqlite3.Connection) -> None:
for table, columns in TYPED_COLUMNS.items():
existing = {
str(row["name"])
for row in conn.execute(f"pragma table_info({table})").fetchall()
}
for name, definition in columns:
if name not in existing:
conn.execute(f"alter table {table} add column {name} {definition}")
def _ensure_schema_version(conn: sqlite3.Connection) -> None:
current = int(conn.execute("pragma user_version").fetchone()[0])
if current < SCHEMA_VERSION:
conn.execute(f"pragma user_version = {SCHEMA_VERSION}")
def _ensure_queue_schema(conn: sqlite3.Connection) -> None:
conn.execute(
"""
create table if not exists submission_queues (
id text primary key,
folder_path text not null unique,
label text not null default '',
is_active integer not null default 0 check (is_active in (0, 1)),
created_at text not null default '',
created_epoch integer not null default 0 check (created_epoch >= 0),
last_imported_epoch integer not null default 0 check (last_imported_epoch >= 0),
last_imported_at text not null default ''
)
"""
)
conn.execute(
"create index if not exists idx_submission_queues_active on submission_queues(is_active, id)"
)
def _drop_stale_rebuild_temp_tables(conn: sqlite3.Connection) -> None:
rows = conn.execute(
"select name from sqlite_master where type = 'table' and name like '\\_\\_copyrighter\\_%\\_new' escape '\\'"
).fetchall()
for row in rows:
conn.execute(f"drop table if exists {row['name']}")
def _ensure_constrained_schema(conn: sqlite3.Connection) -> None:
pending = [
table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table)
]
if not pending:
return
# Disable foreign-key enforcement for the duration of the rebuilds. The
# rebuild drops each parent table before its children are recreated, and a
# `drop table submissions` while foreign keys are ON would fire evidence's
# `on delete cascade` and silently wipe every evidence row. Toggling FKs off
# (the procedure SQLite documents for constraint-changing migrations) must
# happen with no transaction pending, so commit first and restore on exit.
conn.commit()
# Clean up any half-built temp table left by a prior crashed/failed run so we
# retry cleanly instead of tripping on an orphan.
_drop_stale_rebuild_temp_tables(conn)
conn.execute("pragma foreign_keys = off")
try:
for table in pending:
_rebuild_constrained_table(conn, table)
conn.commit()
except Exception:
# The copy step raises before the original table is dropped (e.g. legacy
# rows violate the new CHECK), so the source data is intact; remove the
# orphaned temp table so the next startup can retry.
_drop_stale_rebuild_temp_tables(conn)
conn.commit()
raise
finally:
conn.execute("pragma foreign_keys = on")
def _table_needs_constraint_rebuild(conn: sqlite3.Connection, table: str) -> bool:
row = conn.execute(
"select sql from sqlite_master where type = 'table' and name = ?",
(table,),
).fetchone()
if row is None:
return False
sql = str(row["sql"] or "").lower()
if "json_valid(payload)" not in sql:
return True
if table == "evidence" and "foreign key" not in sql:
return True
if table == "submissions" and "risk_band in" not in sql:
return True
return False
def _rebuild_constrained_table(conn: sqlite3.Connection, table: str) -> None:
schema = CONSTRAINED_TABLE_SCHEMAS[table]
temp_table = f"__copyrighter_{table}_new"
conn.execute(f"drop table if exists {temp_table}")
conn.execute(schema.format(table=temp_table))
source_columns = _table_columns(conn, table)
target_columns = _table_columns(conn, temp_table)
copied_columns = [column for column in target_columns if column in source_columns]
if copied_columns:
column_sql = ", ".join(copied_columns)
conn.execute(
f"insert into {temp_table} ({column_sql}) select {column_sql} from {table}"
)
conn.execute(f"drop table {table}")
conn.execute(f"alter table {temp_table} rename to {table}")
def _table_columns(conn: sqlite3.Connection, table: str) -> list[str]:
return [str(row["name"]) for row in conn.execute(f"pragma table_info({table})").fetchall()]
def _bounded_int_env(value: str | None, default: int, minimum: int, maximum: int) -> int: def _bounded_int_env(value: str | None, default: int, minimum: int, maximum: int) -> int:
@ -3892,117 +3656,6 @@ def _image_suffix_from_url(url: str) -> str:
return suffix return suffix
CANDIDATE_IMAGE_FETCH_LIMIT = 8_000_000
CANDIDATE_IMAGE_FETCH_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0 Safari/537.36 CopyrighterCandidateCollector/0.1"
),
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
"Sec-Fetch-Dest": "image",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
}
PAGE_FETCH_HEADERS = {
"User-Agent": CANDIDATE_IMAGE_FETCH_HEADERS["User-Agent"],
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/*,*/*;q=0.8",
"Accept-Language": CANDIDATE_IMAGE_FETCH_HEADERS["Accept-Language"],
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
}
STYLESHEET_FETCH_HEADERS = {
"User-Agent": CANDIDATE_IMAGE_FETCH_HEADERS["User-Agent"],
"Accept": "text/css,*/*;q=0.8",
"Accept-Language": CANDIDATE_IMAGE_FETCH_HEADERS["Accept-Language"],
"Sec-Fetch-Dest": "style",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
}
def _fetch_url_bytes(
url: str,
timeout: int = 10,
limit: int = CANDIDATE_IMAGE_FETCH_LIMIT,
referer_url: str = "",
) -> bytes:
headers = _headers_with_referer(CANDIDATE_IMAGE_FETCH_HEADERS, referer_url)
return _fetch_url_bytes_with_headers(url, headers, timeout, limit)
def _fetch_page_url_bytes(url: str, timeout: int = 10, limit: int = CANDIDATE_IMAGE_FETCH_LIMIT) -> bytes:
return _fetch_url_bytes_with_headers(url, PAGE_FETCH_HEADERS, timeout, limit)
def _fetch_stylesheet_url_bytes(url: str, timeout: int = 10, limit: int = CANDIDATE_IMAGE_FETCH_LIMIT) -> bytes:
return _fetch_url_bytes_with_headers(url, STYLESHEET_FETCH_HEADERS, timeout, limit)
def _assert_public_http_url(url: str) -> None:
# SSRF guard: candidate image / page / stylesheet URLs come from external
# search-result content (attacker-influenceable), so refuse to fetch
# anything that resolves to a non-public address (loopback, RFC1918,
# link-local cloud-metadata 169.254.169.254, etc.).
parsed = urlparse(url)
if parsed.scheme not in {"http", "https"} or not parsed.hostname:
raise ValueError("only public http(s) URLs may be fetched")
port = parsed.port or (443 if parsed.scheme == "https" else 80)
try:
infos = socket.getaddrinfo(parsed.hostname, port, proto=socket.IPPROTO_TCP)
except OSError as exc:
raise ValueError("could not resolve fetch host") from exc
for *_, sockaddr in infos:
ip = ipaddress.ip_address(sockaddr[0])
if (
ip.is_private
or ip.is_loopback
or ip.is_link_local
or ip.is_reserved
or ip.is_multicast
or ip.is_unspecified
):
raise ValueError("refusing to fetch internal address")
class _ValidatingRedirectHandler(HTTPRedirectHandler):
# Re-run the SSRF guard on every redirect hop so a public URL cannot bounce
# to an internal address.
def redirect_request(self, req, fp, code, msg, headers, newurl):
_assert_public_http_url(newurl)
return super().redirect_request(req, fp, code, msg, headers, newurl)
_SSRF_SAFE_OPENER = build_opener(_ValidatingRedirectHandler())
def _open_url(request: Request, timeout: int):
return _SSRF_SAFE_OPENER.open(request, timeout=timeout)
def _fetch_url_bytes_with_headers(
url: str,
headers: dict[str, str],
timeout: int,
limit: int,
) -> bytes:
_assert_public_http_url(url)
request = Request(url, headers=headers)
with _open_url(request, timeout) as response:
data = response.read(limit + 1)
if len(data) > limit:
raise ValueError("candidate image exceeds size limit")
return data
def _headers_with_referer(headers: dict[str, str], referer_url: str) -> dict[str, str]:
if not referer_url or not _is_http_url(referer_url):
return headers
return {**headers, "Referer": referer_url}
class _PageImageParser(HTMLParser): class _PageImageParser(HTMLParser):
def __init__(self, parse_noscript: bool = True) -> None: def __init__(self, parse_noscript: bool = True) -> None:
super().__init__() super().__init__()

View file

@ -0,0 +1,125 @@
"""Remote fetch layer for candidate images, pages, and stylesheets.
Extracted from sqlite_store.py. This is the pure network boundary (HTTP fetch
+ SSRF guard); HTML/CSS parsing of the fetched content stays in the store. Tests
monkeypatch _open_url / _assert_public_http_url / _fetch_url_bytes here.
"""
from __future__ import annotations
import ipaddress
import socket
from urllib.parse import urlparse
from urllib.request import HTTPRedirectHandler, Request, build_opener
from rights_filter.server.store_url_utils import _is_http_url
# Default upper bound, in bytes, on a fetched response body; used as the default
# `limit` argument of the fetch helpers in this module.
CANDIDATE_IMAGE_FETCH_LIMIT = 8_000_000

# Values shared by all three header sets below.
_FETCH_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0 Safari/537.36 CopyrighterCandidateCollector/0.1"
)
_FETCH_ACCEPT_LANGUAGE = "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7"


def _build_fetch_headers(accept: str, dest: str, mode: str) -> dict[str, str]:
    """Return one request-header set; only Accept and Sec-Fetch-Dest/Mode vary."""
    return {
        "User-Agent": _FETCH_USER_AGENT,
        "Accept": accept,
        "Accept-Language": _FETCH_ACCEPT_LANGUAGE,
        "Sec-Fetch-Dest": dest,
        "Sec-Fetch-Mode": mode,
        "Sec-Fetch-Site": "cross-site",
    }


# Headers sent when fetching a candidate image.
CANDIDATE_IMAGE_FETCH_HEADERS = _build_fetch_headers(
    "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
    "image",
    "no-cors",
)
# Headers sent when fetching an HTML page.
PAGE_FETCH_HEADERS = _build_fetch_headers(
    "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/*,*/*;q=0.8",
    "document",
    "navigate",
)
# Headers sent when fetching a stylesheet.
STYLESHEET_FETCH_HEADERS = _build_fetch_headers(
    "text/css,*/*;q=0.8",
    "style",
    "no-cors",
)
def _fetch_url_bytes(
    url: str,
    timeout: int = 10,
    limit: int = CANDIDATE_IMAGE_FETCH_LIMIT,
    referer_url: str = "",
) -> bytes:
    """Fetch ``url`` with the candidate-image request headers and return its bytes.

    ``referer_url`` is handed to ``_headers_with_referer`` together with
    ``CANDIDATE_IMAGE_FETCH_HEADERS``; the resulting headers, ``timeout`` and
    ``limit`` are forwarded unchanged to ``_fetch_url_bytes_with_headers``.
    """
    return _fetch_url_bytes_with_headers(
        url,
        _headers_with_referer(CANDIDATE_IMAGE_FETCH_HEADERS, referer_url),
        timeout,
        limit,
    )
def _fetch_page_url_bytes(
    url: str,
    timeout: int = 10,
    limit: int = CANDIDATE_IMAGE_FETCH_LIMIT,
) -> bytes:
    """Fetch ``url`` using ``PAGE_FETCH_HEADERS`` and return the response bytes."""
    page_headers = PAGE_FETCH_HEADERS
    return _fetch_url_bytes_with_headers(url, page_headers, timeout, limit)
def _fetch_stylesheet_url_bytes(
    url: str,
    timeout: int = 10,
    limit: int = CANDIDATE_IMAGE_FETCH_LIMIT,
) -> bytes:
    """Fetch ``url`` using ``STYLESHEET_FETCH_HEADERS`` and return the response bytes."""
    stylesheet_headers = STYLESHEET_FETCH_HEADERS
    return _fetch_url_bytes_with_headers(url, stylesheet_headers, timeout, limit)
def _assert_public_http_url(url: str) -> None:
    """Raise ``ValueError`` unless ``url`` is http(s) and resolves only to public IPs.

    SSRF guard: candidate image / page / stylesheet URLs come from external
    search-result content (attacker-influenceable), so refuse to fetch anything
    that resolves to a non-public address (loopback, RFC1918, link-local
    cloud-metadata 169.254.169.254, shared address space 100.64.0.0/10, etc.).

    Every address returned by ``socket.getaddrinfo`` is checked; a single
    non-public result rejects the URL.

    Raises:
        ValueError: the scheme is not http/https, the host is missing, the host
            cannot be resolved, or any resolved address is not public.

    NOTE(review): the host is resolved here and resolved again when the request
    is actually opened, so this check does not pin the address that is finally
    connected to.
    """
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"} or not parsed.hostname:
        raise ValueError("only public http(s) URLs may be fetched")
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        infos = socket.getaddrinfo(parsed.hostname, port, proto=socket.IPPROTO_TCP)
    except OSError as exc:
        raise ValueError("could not resolve fetch host") from exc
    if not infos:
        # Nothing to validate means nothing was proven public; fail closed.
        raise ValueError("could not resolve fetch host")
    for *_, sockaddr in infos:
        # IPv6 results may carry a zone id ("fe80::1%eth0"); drop it so the
        # address parses on every supported Python version.
        ip = ipaddress.ip_address(str(sockaddr[0]).split("%", 1)[0])
        if (
            # `is_private` is False for 100.64.0.0/10 (shared address space),
            # so also require the address to be globally routable.
            not ip.is_global
            or ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_reserved
            or ip.is_multicast
            or ip.is_unspecified
        ):
            raise ValueError("refusing to fetch internal address")
class _ValidatingRedirectHandler(HTTPRedirectHandler):
    """Redirect handler that applies the SSRF guard to every redirect target."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Validate the hop before delegating, so a public URL cannot bounce the
        # fetch to an internal address; the guard raises ValueError on refusal.
        _assert_public_http_url(newurl)
        follow_up = super().redirect_request(req, fp, code, msg, headers, newurl)
        return follow_up
# Module-wide opener whose redirects are routed through _ValidatingRedirectHandler.
_SSRF_SAFE_OPENER = build_opener(_ValidatingRedirectHandler())


def _open_url(request: Request, timeout: int):
    """Open ``request`` through ``_SSRF_SAFE_OPENER`` and return the response object."""
    opener = _SSRF_SAFE_OPENER
    return opener.open(request, timeout=timeout)
def _fetch_url_bytes_with_headers(
    url: str,
    headers: dict[str, str],
    timeout: int,
    limit: int,
) -> bytes:
    """Fetch ``url`` with ``headers`` and return at most ``limit`` bytes of body.

    The URL is checked by ``_assert_public_http_url`` before any request is made.

    Raises:
        ValueError: the URL is refused by the guard, or the body is longer than
            ``limit`` bytes.
    """
    _assert_public_http_url(url)
    with _open_url(Request(url, headers=headers), timeout) as response:
        # Ask for one byte more than allowed: an oversized body is then
        # detectable without reading all of it.
        body = response.read(limit + 1)
    if len(body) <= limit:
        return body
    raise ValueError("candidate image exceeds size limit")
def _headers_with_referer(headers: dict[str, str], referer_url: str) -> dict[str, str]:
    """Return ``headers`` plus a ``Referer`` entry when ``referer_url`` is an http URL.

    When ``referer_url`` is empty or rejected by ``_is_http_url`` the very same
    ``headers`` object is returned; otherwise a new dict is built and the input
    is left unmodified.
    """
    if referer_url and _is_http_url(referer_url):
        with_referer = dict(headers)
        with_referer["Referer"] = referer_url
        return with_referer
    return headers

View file

@ -0,0 +1,257 @@
"""SQLite schema definition, typed-column upgrades, and constraint migration.
Extracted from sqlite_store.py. Pure DDL/migration helpers operating on a
passed-in connection; no dependency on the store class. Behavior unchanged.
"""
from __future__ import annotations
import sqlite3
# Schema version kept in SQLite's `pragma user_version`; `_ensure_schema_version`
# raises the stored value to this number whenever the stored value is lower.
SCHEMA_VERSION = 2
# `create table` templates for the constrained form of each store table, keyed
# by table name. The `{table}` placeholder is filled in by
# `_rebuild_constrained_table`, which formats a template with a temporary table
# name, copies the shared columns across and renames the result into place.
# Every template constrains `payload` with `json_valid(payload)`; `evidence`
# additionally declares a cascading foreign key to `submissions(id)`.
CONSTRAINED_TABLE_SCHEMAS = {
"submissions": """
create table {table} (
id text primary key,
title text not null default '',
risk_score integer not null default 0 check (risk_score >= 0 and risk_score <= 100),
risk_band text not null default 'low' check (risk_band in ('low', 'medium', 'high', 'failed', 'pending')),
decision_status text not null default 'unreviewed' check (decision_status in ('unreviewed', 'held', 'rejected', 'approved', 'corrected')),
submitted_epoch integer not null default 0 check (submitted_epoch >= 0),
queue_id text not null default '',
payload text not null check (json_valid(payload))
)
""",
"evidence": """
create table {table} (
id text primary key,
submission_id text not null,
source text not null default '',
confidence real not null default 0 check (confidence >= 0 and confidence <= 1),
status text not null default 'active' check (status in ('active', 'auto', 'manual', 'queued', 'rerun', 'weak', 'used_for_judgment', 'irrelevant', 'false_positive', 'pending')),
contributed integer not null default 1 check (contributed in (0, 1)),
payload text not null check (json_valid(payload)),
foreign key (submission_id) references submissions(id) on delete cascade
)
""",
"providers": """
create table {table} (
id text primary key,
name text not null default '',
enabled integer not null default 0 check (enabled in (0, 1)),
usage integer not null default 0 check (usage >= 0),
quota integer not null default 0 check (quota >= 0),
payload text not null check (json_valid(payload))
)
""",
"knowledge_entries": """
create table {table} (
id text primary key,
name text not null default '',
entry_type text not null default 'other',
entry_status text not null default 'confirmed' check (entry_status in ('confirmed', 'watchlist', 'excluded')),
active integer not null default 1 check (active in (0, 1)),
source_submission_id text not null default '',
payload text not null check (json_valid(payload))
)
""",
"collection_candidates": """
create table {table} (
id text primary key,
provider text not null default '',
query text not null default '',
status text not null default 'candidate' check (status in ('candidate', 'promoted')),
collected_epoch integer not null default 0 check (collected_epoch >= 0),
payload text not null check (json_valid(payload))
)
""",
"corrections": """
create table {table} (
id text primary key,
decision_id text not null default '',
status text not null default '',
payload text not null check (json_valid(payload))
)
""",
"audit_events": """
create table {table} (
id integer primary key autoincrement,
timestamp text not null default '',
actor text not null default '',
event text not null default '',
object_ref text not null default '',
payload text not null check (json_valid(payload))
)
""",
}
# Order in which `_ensure_constrained_schema` checks and rebuilds tables.
# `evidence`, whose constrained schema declares a foreign key to `submissions`,
# is listed last.
LEGACY_REBUILD_ORDER = (
    "submissions",
    "providers",
    "knowledge_entries",
    "collection_candidates",
    "corrections",
    "audit_events",
    "evidence",
)
# Per-table list of (column name, column definition) pairs.
# `_ensure_typed_columns` adds, via `alter table ... add column`, every listed
# column that `pragma table_info` does not already report for the table.
TYPED_COLUMNS = {
    "submissions": [
        ("title", "text not null default ''"),
        ("risk_score", "integer not null default 0"),
        ("risk_band", "text not null default 'low'"),
        ("decision_status", "text not null default 'unreviewed'"),
        ("submitted_epoch", "integer not null default 0"),
        ("queue_id", "text not null default ''"),
    ],
    "evidence": [
        ("source", "text not null default ''"),
        ("confidence", "real not null default 0"),
        ("status", "text not null default 'active'"),
        ("contributed", "integer not null default 1"),
    ],
    "providers": [
        ("name", "text not null default ''"),
        ("enabled", "integer not null default 0"),
        ("usage", "integer not null default 0"),
        ("quota", "integer not null default 0"),
    ],
    "knowledge_entries": [
        ("name", "text not null default ''"),
        ("entry_type", "text not null default 'other'"),
        ("entry_status", "text not null default 'confirmed'"),
        ("active", "integer not null default 1"),
        ("source_submission_id", "text not null default ''"),
    ],
    "collection_candidates": [
        ("provider", "text not null default ''"),
        ("query", "text not null default ''"),
        ("status", "text not null default 'candidate'"),
        ("collected_epoch", "integer not null default 0"),
    ],
    "corrections": [
        ("decision_id", "text not null default ''"),
        ("status", "text not null default ''"),
    ],
    "audit_events": [
        ("timestamp", "text not null default ''"),
        ("actor", "text not null default ''"),
        ("event", "text not null default ''"),
        ("object_ref", "text not null default ''"),
    ],
}
def _ensure_typed_columns(conn: sqlite3.Connection) -> None:
    """Add each column listed in ``TYPED_COLUMNS`` that its table does not have yet.

    Rows are read by column name (``row["name"]``), so ``conn`` must use a row
    factory that supports name lookup.
    """
    for table, columns in TYPED_COLUMNS.items():
        info_rows = conn.execute(f"pragma table_info({table})").fetchall()
        present = {str(info["name"]) for info in info_rows}
        missing = [(name, definition) for name, definition in columns if name not in present]
        for name, definition in missing:
            conn.execute(f"alter table {table} add column {name} {definition}")
def _ensure_schema_version(conn: sqlite3.Connection) -> None:
    """Raise ``pragma user_version`` to ``SCHEMA_VERSION`` when it is lower.

    A stored version that is already equal or higher is left untouched.
    """
    version_row = conn.execute("pragma user_version").fetchone()
    if int(version_row[0]) >= SCHEMA_VERSION:
        return
    conn.execute(f"pragma user_version = {SCHEMA_VERSION}")
def _ensure_queue_schema(conn: sqlite3.Connection) -> None:
    """Create the ``submission_queues`` table and its index when they are absent.

    Both statements use ``if not exists``, so calling this repeatedly is safe.
    """
    create_table_sql = """
create table if not exists submission_queues (
id text primary key,
folder_path text not null unique,
label text not null default '',
is_active integer not null default 0 check (is_active in (0, 1)),
created_at text not null default '',
created_epoch integer not null default 0 check (created_epoch >= 0),
last_imported_epoch integer not null default 0 check (last_imported_epoch >= 0),
last_imported_at text not null default ''
)
"""
    create_index_sql = (
        "create index if not exists idx_submission_queues_active on submission_queues(is_active, id)"
    )
    # Table first: the index statement refers to it.
    for statement in (create_table_sql, create_index_sql):
        conn.execute(statement)
def _drop_stale_rebuild_temp_tables(conn: sqlite3.Connection) -> None:
    """Drop every leftover ``__copyrighter_<name>_new`` rebuild table.

    The names come from ``sqlite_master``; the LIKE pattern escapes its
    underscores so that only tables literally starting with ``__copyrighter_``
    and ending with ``_new`` match.
    """
    rows = conn.execute(
        "select name from sqlite_master where type = 'table' and name like '\\_\\_copyrighter\\_%\\_new' escape '\\'"
    ).fetchall()
    for row in rows:
        # Quote the identifier (doubling embedded quotes) so a matching name
        # with spaces or other non-identifier characters cannot break the
        # statement. Positional access works with or without sqlite3.Row.
        quoted_name = '"' + str(row[0]).replace('"', '""') + '"'
        conn.execute(f"drop table if exists {quoted_name}")
def _ensure_constrained_schema(conn: sqlite3.Connection) -> None:
    """Rebuild every legacy table that lacks the constrained schema.

    Tables are examined in ``LEGACY_REBUILD_ORDER``; those flagged by
    ``_table_needs_constraint_rebuild`` are rebuilt with
    ``_rebuild_constrained_table``. Returns immediately when nothing needs a
    rebuild. Any exception raised during the rebuilds is re-raised after the
    temporary tables have been dropped.
    """
    pending = [
        table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table)
    ]
    if not pending:
        return
    # Disable foreign-key enforcement for the duration of the rebuilds. The
    # rebuild drops each parent table before its children are recreated, and a
    # `drop table submissions` while foreign keys are ON would fire evidence's
    # `on delete cascade` and silently wipe every evidence row. Toggling FKs off
    # (the procedure SQLite documents for constraint-changing migrations) must
    # happen with no transaction pending, so commit first and restore on exit.
    conn.commit()
    # Clean up any half-built temp table left by a prior crashed/failed run so we
    # retry cleanly instead of tripping on an orphan.
    _drop_stale_rebuild_temp_tables(conn)
    conn.execute("pragma foreign_keys = off")
    try:
        for table in pending:
            _rebuild_constrained_table(conn, table)
        conn.commit()
    except Exception:
        # The copy step raises before the original table is dropped (e.g. legacy
        # rows violate the new CHECK), so the source data is intact; remove the
        # orphaned temp table so the next startup can retry.
        # NOTE(review): a failure between `drop table` and the rename inside
        # `_rebuild_constrained_table` would leave the temp table as the only
        # copy of the rows, and this cleanup would drop it — presumably treated
        # as unreachable; confirm.
        _drop_stale_rebuild_temp_tables(conn)
        conn.commit()
        raise
    finally:
        # Enforcement is switched back on unconditionally, on success and failure.
        conn.execute("pragma foreign_keys = on")
def _table_needs_constraint_rebuild(conn: sqlite3.Connection, table: str) -> bool:
    """Report whether ``table`` exists but its stored DDL lacks required constraints.

    The check is a case-insensitive substring search over the ``sql`` text in
    ``sqlite_master``: every table must mention ``json_valid(payload)``,
    ``evidence`` must also mention ``foreign key`` and ``submissions`` must also
    mention ``risk_band in``. A table that does not exist never needs a rebuild.
    """
    found = conn.execute(
        "select sql from sqlite_master where type = 'table' and name = ?",
        (table,),
    ).fetchone()
    if found is None:
        return False
    ddl = str(found["sql"] or "").lower()
    required_markers = ["json_valid(payload)"]
    if table == "evidence":
        required_markers.append("foreign key")
    if table == "submissions":
        required_markers.append("risk_band in")
    return any(marker not in ddl for marker in required_markers)
def _rebuild_constrained_table(conn: sqlite3.Connection, table: str) -> None:
    """Replace ``table`` with a copy built from its ``CONSTRAINED_TABLE_SCHEMAS`` template.

    A temporary ``__copyrighter_<table>_new`` table is created from the
    template, the columns present in both tables are copied across, the original
    is dropped and the temporary table is renamed to ``table``.

    Raises:
        KeyError: ``table`` has no entry in ``CONSTRAINED_TABLE_SCHEMAS``.
    """
    template = CONSTRAINED_TABLE_SCHEMAS[table]
    staging = f"__copyrighter_{table}_new"
    conn.execute(f"drop table if exists {staging}")
    conn.execute(template.format(table=staging))
    legacy_columns = _table_columns(conn, table)
    shared_columns = [
        name for name in _table_columns(conn, staging) if name in legacy_columns
    ]
    if shared_columns:
        # Only columns that exist on both sides are copied; the rest take the
        # defaults declared in the template.
        shared_sql = ", ".join(shared_columns)
        conn.execute(
            f"insert into {staging} ({shared_sql}) select {shared_sql} from {table}"
        )
    conn.execute(f"drop table {table}")
    conn.execute(f"alter table {staging} rename to {table}")
def _table_columns(conn: sqlite3.Connection, table: str) -> list[str]:
    """Return the column names of ``table`` in ``pragma table_info`` order.

    Rows are read by name, so ``conn`` must use a row factory that supports
    ``row["name"]``. A table that does not exist yields an empty list.
    """
    column_names = []
    for info in conn.execute(f"pragma table_info({table})").fetchall():
        column_names.append(str(info["name"]))
    return column_names

View file

@ -12,6 +12,7 @@ from rights_filter.analysis.preprocessing import ImagePayload, build_face_crop_d
from rights_filter.domain.records import Evidence, EvidenceSource from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server import sqlite_store as sqlite_store_module from rights_filter.server import sqlite_store as sqlite_store_module
from rights_filter.server import store_remote_fetch as remote_fetch_module
from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.integrations.env_clients import build_provider_runtime from rights_filter.integrations.env_clients import build_provider_runtime
@ -322,8 +323,8 @@ def test_fetch_url_bytes_uses_browser_image_headers_and_room_for_real_photos(mon
captured["timeout"] = timeout captured["timeout"] = timeout
return FakeResponse() return FakeResponse()
monkeypatch.setattr(sqlite_store_module, "_open_url", fake_urlopen) monkeypatch.setattr(remote_fetch_module, "_open_url", fake_urlopen)
monkeypatch.setattr(sqlite_store_module, "_assert_public_http_url", lambda url: None) monkeypatch.setattr(remote_fetch_module, "_assert_public_http_url", lambda url: None)
content = sqlite_store_module._fetch_url_bytes("https://cdn.example.test/profile.webp") content = sqlite_store_module._fetch_url_bytes("https://cdn.example.test/profile.webp")
@ -3633,8 +3634,8 @@ def test_sqlite_store_uses_page_and_image_specific_headers_for_default_fetcher(
return FakeResponse(submitted_image) return FakeResponse(submitted_image)
raise AssertionError(f"unexpected URL fetched: {request.full_url}") raise AssertionError(f"unexpected URL fetched: {request.full_url}")
monkeypatch.setattr(sqlite_store_module, "_open_url", fake_urlopen) monkeypatch.setattr(remote_fetch_module, "_open_url", fake_urlopen)
monkeypatch.setattr(sqlite_store_module, "_assert_public_http_url", lambda url: None) monkeypatch.setattr(remote_fetch_module, "_assert_public_http_url", lambda url: None)
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime) store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime)
store.initialize() store.initialize()
@ -3722,8 +3723,8 @@ def test_sqlite_store_uses_source_page_referer_for_default_page_image_fetch(
return FakeResponse(submitted_image) return FakeResponse(submitted_image)
raise AssertionError(f"unexpected URL fetched: {request.full_url}") raise AssertionError(f"unexpected URL fetched: {request.full_url}")
monkeypatch.setattr(sqlite_store_module, "_open_url", fake_urlopen) monkeypatch.setattr(remote_fetch_module, "_open_url", fake_urlopen)
monkeypatch.setattr(sqlite_store_module, "_assert_public_http_url", lambda url: None) monkeypatch.setattr(remote_fetch_module, "_assert_public_http_url", lambda url: None)
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime) store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime)
store.initialize() store.initialize()