fix: PII retention, write-race serialization, and correctness fixes

Governance: purge biometric face crops past a retention window (env
COPYRIGHTER_FACE_CROP_RETENTION_DAYS, default 90d) with an audit trail, run
at startup and reload; audit personal-image transmission to external Vision.
Concurrency: a process write lock + atomic provider-usage delta stop lost
counter updates; candidate promotion is idempotent (deterministic id + status
guard); seeding is serialized. Correctness: skip LLM summarize when a summary
already exists; constraint migration cleans orphan temp tables on failure.
Add provider-readiness startup log. Tests pin all of the above plus risk-band
boundaries (29/30/69/70, 100 cap) and media path-traversal guards.
This commit is contained in:
유창욱 2026-06-20 18:44:08 +09:00
parent 1abb1107a2
commit 7f5799e5e1
6 changed files with 377 additions and 41 deletions

View file

@ -90,10 +90,13 @@ class EvidenceEnricher:
EvidenceSource.FINGERPRINT, EvidenceSource.FINGERPRINT,
} }
] ]
llm_summary = self.llm_assistant.summarize(submission_id, source_evidence) # Only invoke the LLM when no summary exists yet. Previously summarize()
if llm_summary.source == EvidenceSource.ENRICHMENT_FAILURE: # ran on every rerun and its result was discarded (and its failure
summary.summary_failures += 1 # counted) when a summary was already present.
if not _has_existing_llm_summary(run.evidence): if not _has_existing_llm_summary(run.evidence):
llm_summary = self.llm_assistant.summarize(submission_id, source_evidence)
if llm_summary.source == EvidenceSource.ENRICHMENT_FAILURE:
summary.summary_failures += 1
new_evidence.append(llm_summary) new_evidence.append(llm_summary)
for item in new_evidence: for item in new_evidence:

View file

@ -11,6 +11,18 @@ from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.server.sqlite_store import CopyrighterStore
def _provider_readiness(runtime) -> str:
state = {
"Naver search": runtime.naver_adapter is not None,
"Google Vision web detection": runtime.google_adapter is not None,
"Google custom search": runtime.google_custom_search_adapter is not None,
"LLM (Ollama)": runtime.llm_assistant is not None,
}
return ", ".join(
f"{name}: {'ready' if ok else 'disabled (no credentials)'}" for name, ok in state.items()
)
def main() -> None: def main() -> None:
parser = argparse.ArgumentParser(description="Run the local Copyrighter API server.") parser = argparse.ArgumentParser(description="Run the local Copyrighter API server.")
parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--host", default="127.0.0.1")
@ -23,9 +35,11 @@ def main() -> None:
load_env_file(Path(args.env), os.environ) load_env_file(Path(args.env), os.environ)
provider_runtime = build_provider_runtime(os.environ) provider_runtime = build_provider_runtime(os.environ)
print("Provider readiness: " + _provider_readiness(provider_runtime))
image_store = LocalSubmissionImageStore(Path(args.images)) image_store = LocalSubmissionImageStore(Path(args.images))
store = CopyrighterStore(Path(args.db), provider_runtime=provider_runtime) store = CopyrighterStore(Path(args.db), provider_runtime=provider_runtime)
store.initialize() store.initialize()
store.purge_expired_face_crops()
auth_token = os.environ.get("COPYRIGHTER_AUTH_TOKEN", "").strip() or None auth_token = os.environ.get("COPYRIGHTER_AUTH_TOKEN", "").strip() or None
if auth_token is None: if auth_token is None:

View file

@ -11,6 +11,7 @@ import re
import shutil import shutil
import socket import socket
import sqlite3 import sqlite3
import threading
from html.parser import HTMLParser from html.parser import HTMLParser
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import replace from dataclasses import replace
@ -88,6 +89,7 @@ DEFAULT_QUERY_COVERAGE_GOOD_THRESHOLD = 70
DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40 DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40
MIN_COVERAGE_THRESHOLD = 0 MIN_COVERAGE_THRESHOLD = 0
MAX_COVERAGE_THRESHOLD = 100 MAX_COVERAGE_THRESHOLD = 100
DEFAULT_FACE_CROP_RETENTION_DAYS = 90
CONSTRAINED_TABLE_SCHEMAS = { CONSTRAINED_TABLE_SCHEMAS = {
"submissions": """ "submissions": """
create table {table} ( create table {table} (
@ -308,6 +310,14 @@ def _ensure_queue_schema(conn: sqlite3.Connection) -> None:
) )
def _drop_stale_rebuild_temp_tables(conn: sqlite3.Connection) -> None:
rows = conn.execute(
"select name from sqlite_master where type = 'table' and name like '\\_\\_copyrighter\\_%\\_new' escape '\\'"
).fetchall()
for row in rows:
conn.execute(f"drop table if exists {row['name']}")
def _ensure_constrained_schema(conn: sqlite3.Connection) -> None: def _ensure_constrained_schema(conn: sqlite3.Connection) -> None:
pending = [ pending = [
table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table) table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table)
@ -321,11 +331,21 @@ def _ensure_constrained_schema(conn: sqlite3.Connection) -> None:
# (the procedure SQLite documents for constraint-changing migrations) must # (the procedure SQLite documents for constraint-changing migrations) must
# happen with no transaction pending, so commit first and restore on exit. # happen with no transaction pending, so commit first and restore on exit.
conn.commit() conn.commit()
# Clean up any half-built temp table left by a prior crashed/failed run so we
# retry cleanly instead of tripping on an orphan.
_drop_stale_rebuild_temp_tables(conn)
conn.execute("pragma foreign_keys = off") conn.execute("pragma foreign_keys = off")
try: try:
for table in pending: for table in pending:
_rebuild_constrained_table(conn, table) _rebuild_constrained_table(conn, table)
conn.commit() conn.commit()
except Exception:
# The copy step raises before the original table is dropped (e.g. legacy
# rows violate the new CHECK), so the source data is intact; remove the
# orphaned temp table so the next startup can retry.
_drop_stale_rebuild_temp_tables(conn)
conn.commit()
raise
finally: finally:
conn.execute("pragma foreign_keys = on") conn.execute("pragma foreign_keys = on")
@ -408,6 +428,14 @@ class CopyrighterStore:
self.collection_public_prefix = collection_public_prefix.rstrip("/") self.collection_public_prefix = collection_public_prefix.rstrip("/")
self.face_crop_image_dir = self.db_path.parent / "face-crops" self.face_crop_image_dir = self.db_path.parent / "face-crops"
self.face_crop_public_prefix = "/face-crop-media" self.face_crop_public_prefix = "/face-crop-media"
# Biometric face crops are personal data: purge them after this many days.
# 0 = keep indefinitely. Configurable via env, clamped to a sane range.
self.face_crop_retention_days = _bounded_int_env(
os.environ.get("COPYRIGHTER_FACE_CROP_RETENTION_DAYS"),
default=DEFAULT_FACE_CROP_RETENTION_DAYS,
minimum=0,
maximum=3650,
)
self.coverage_thresholds = { self.coverage_thresholds = {
"coverageGoodRate": _bounded_int_env( "coverageGoodRate": _bounded_int_env(
os.environ.get("COPYRIGHTER_COVERAGE_GOOD_THRESHOLD"), os.environ.get("COPYRIGHTER_COVERAGE_GOOD_THRESHOLD"),
@ -438,6 +466,11 @@ class CopyrighterStore:
self.candidate_image_fetcher = candidate_image_fetcher or _fetch_url_bytes self.candidate_image_fetcher = candidate_image_fetcher or _fetch_url_bytes
self.page_fetcher = candidate_image_fetcher or _fetch_page_url_bytes self.page_fetcher = candidate_image_fetcher or _fetch_page_url_bytes
self.stylesheet_fetcher = candidate_image_fetcher or _fetch_stylesheet_url_bytes self.stylesheet_fetcher = candidate_image_fetcher or _fetch_stylesheet_url_bytes
# The server is a ThreadingHTTPServer; serialize read-modify-write store
# operations (provider usage counters, candidate promotion, seeding)
# across request threads to prevent lost updates / duplicate side effects.
# Reentrant so a guarded method may call another guarded helper.
self._write_lock = threading.RLock()
def initialize(self) -> None: def initialize(self) -> None:
self.db_path.parent.mkdir(parents=True, exist_ok=True) self.db_path.parent.mkdir(parents=True, exist_ok=True)
@ -677,6 +710,14 @@ class CopyrighterStore:
conn.close() conn.close()
def seed_from_image_store(self, image_store: LocalSubmissionImageStore) -> int: def seed_from_image_store(self, image_store: LocalSubmissionImageStore) -> int:
# Serialize seeding so two concurrent reload/import/upload requests can't
# both classify the same record as new and double-run analysis, external
# searches, and audit writes for it.
with self._write_lock:
return self._seed_from_image_store_locked(image_store)
def _seed_from_image_store_locked(self, image_store: LocalSubmissionImageStore) -> int:
self.purge_expired_face_crops()
queue = self.ensure_queue(image_store.root) queue = self.ensure_queue(image_store.root)
queue_id = str(queue["id"]) queue_id = str(queue["id"])
now = datetime.now().isoformat(" ", "seconds") now = datetime.now().isoformat(" ", "seconds")
@ -1369,7 +1410,6 @@ class CopyrighterStore:
}, },
) )
self._put("submissions", submission_id, submission) self._put("submissions", submission_id, submission)
provider_payload["usage"] += provider_call_count
if any( if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in domain_evidence for item in domain_evidence
@ -1378,7 +1418,7 @@ class CopyrighterStore:
else: else:
provider_payload["lastSuccess"] = _now_label() provider_payload["lastSuccess"] = _now_label()
provider_payload["lastFailure"] = "없음" provider_payload["lastFailure"] = "없음"
self._put("providers", provider, provider_payload) self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)
self._ensure_llm_summary(submission_id) self._ensure_llm_summary(submission_id)
if image_store is not None: if image_store is not None:
self._rescore_submission(submission_id) self._rescore_submission(submission_id)
@ -1423,13 +1463,12 @@ class CopyrighterStore:
_evidence_id(submission_id, llm_evidence), _evidence_id(submission_id, llm_evidence),
_evidence_payload(submission_id, llm_evidence), _evidence_payload(submission_id, llm_evidence),
) )
llm_provider["usage"] += 1
if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE: if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE:
llm_provider["lastFailure"] = llm_evidence.reason llm_provider["lastFailure"] = llm_evidence.reason
else: else:
llm_provider["lastSuccess"] = _now_label() llm_provider["lastSuccess"] = _now_label()
llm_provider["lastFailure"] = "없음" llm_provider["lastFailure"] = "없음"
self._put("providers", "llm", llm_provider) self._apply_provider_usage_delta("llm", 1, llm_provider)
return True return True
def _delete_llm_summary_evidence(self, submission_id: str) -> None: def _delete_llm_summary_evidence(self, submission_id: str) -> None:
@ -1663,7 +1702,6 @@ class CopyrighterStore:
self._put("collection_candidates", candidate["id"], candidate) self._put("collection_candidates", candidate["id"], candidate)
collected += 1 collected += 1
provider_payload["usage"] += provider_call_count
if any( if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in domain_evidence for item in domain_evidence
@ -1672,39 +1710,50 @@ class CopyrighterStore:
else: else:
provider_payload["lastSuccess"] = _now_label() provider_payload["lastSuccess"] = _now_label()
provider_payload["lastFailure"] = "없음" provider_payload["lastFailure"] = "없음"
self._put("providers", provider, provider_payload) self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)
self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates") self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates")
payload = self.bootstrap() payload = self.bootstrap()
payload["collected"] = collected payload["collected"] = collected
return payload return payload
def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]: def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]:
candidate = self._get("collection_candidates", candidate_id) with self._write_lock:
entry_id = f"kb-candidate-{_timestamp_id()}" candidate = self._get("collection_candidates", candidate_id)
query = str(candidate.get("query", "")) # Idempotent: a double-click / retry must not create a second
name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query # confirmed knowledge entry for the same candidate.
memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}" if candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId"):
entry = { return self.bootstrap()
"id": entry_id, # Deterministic id so a racing retry upserts the same row instead of
"name": name, # minting a new timestamped id.
"type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), entry_id = _stable_id("kb-candidate", candidate_id)
"aliases": _text_list(payload.get("aliases")) or [query], query = str(candidate.get("query", ""))
"keywords": _text_list(payload.get("keywords")) or [query], name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query
"memo": memo, memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}"
"provenance": "manual", entry = {
"active": True, "id": entry_id,
"entryStatus": "confirmed", "name": name,
"sourceDecision": "", "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))),
"sourceCandidate": candidate_id, "aliases": _text_list(payload.get("aliases")) or [query],
"sampleFingerprints": _text_list(candidate.get("sampleFingerprints")), "keywords": _text_list(payload.get("keywords")) or [query],
"imageAsset": str(candidate.get("imageAsset", "")), "memo": memo,
"imageFacts": candidate.get("imageFacts", {}), "provenance": "manual",
} "active": True,
self._put("knowledge_entries", entry_id, entry) "entryStatus": "confirmed",
candidate["status"] = "promoted" "sourceDecision": "",
candidate["promotedKnowledgeId"] = entry_id "sourceCandidate": candidate_id,
self._put("collection_candidates", candidate_id, candidate) "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")),
self.add_audit_event("rights.ops", "Knowledge entry manually created", name, f"promoted candidate {candidate_id}") "imageAsset": str(candidate.get("imageAsset", "")),
"imageFacts": candidate.get("imageFacts", {}),
}
with self._transaction() as conn:
self._put("knowledge_entries", entry_id, entry, conn=conn)
candidate["status"] = "promoted"
candidate["promotedKnowledgeId"] = entry_id
self._put("collection_candidates", candidate_id, candidate, conn=conn)
self.add_audit_event(
"rights.ops", "Knowledge entry manually created", name,
f"promoted candidate {candidate_id}", conn=conn,
)
return self.bootstrap() return self.bootstrap()
def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]: def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]:
@ -2013,6 +2062,47 @@ class CopyrighterStore:
submission["faceCrops"] = face_crops submission["faceCrops"] = face_crops
self._put("submissions", submission_id, submission) self._put("submissions", submission_id, submission)
def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int:
# Delete biometric face-crop files older than the retention window and
# drop their references from the owning submission, with an audit trail.
retention_days = int(self.face_crop_retention_days)
if retention_days <= 0 or not self.face_crop_image_dir.exists():
return 0
now = now_epoch if now_epoch is not None else datetime.now().timestamp()
cutoff = now - retention_days * 86400
purged = 0
with self._write_lock:
for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()):
submission_id = crop_dir.name
removed: list[str] = []
for crop_file in crop_dir.glob("crop-*.jpg"):
try:
if crop_file.stat().st_mtime >= cutoff:
continue
crop_file.unlink()
except OSError:
continue
removed.append(crop_file.name)
purged += 1
if not removed:
continue
change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)"
try:
submission = self._get("submissions", submission_id)
except KeyError:
self.add_audit_event("system", "Face crop purged", submission_id, change)
continue
remaining = [
crop
for crop in submission.get("faceCrops", [])
if f"crop-{crop.get('index')}.jpg" not in removed
]
with self._transaction() as conn:
submission["faceCrops"] = remaining
self._put("submissions", submission_id, submission, conn=conn)
self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn)
return purged
def _refresh_existing_submission_file_facts( def _refresh_existing_submission_file_facts(
self, self,
image_store: LocalSubmissionImageStore, image_store: LocalSubmissionImageStore,
@ -2092,7 +2182,6 @@ class CopyrighterStore:
) )
google_provider = self._get("providers", "google") google_provider = self._get("providers", "google")
google_provider["usage"] += call_count
if any( if any(
item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED} item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED}
for item in domain_evidence for item in domain_evidence
@ -2101,7 +2190,7 @@ class CopyrighterStore:
else: else:
google_provider["lastSuccess"] = _now_label() google_provider["lastSuccess"] = _now_label()
google_provider["lastFailure"] = "없음" google_provider["lastFailure"] = "없음"
self._put("providers", "google", google_provider) self._apply_provider_usage_delta("google", call_count, google_provider)
self.add_audit_event( self.add_audit_event(
"rights.ops", "rights.ops",
"Provider called", "Provider called",
@ -2165,6 +2254,16 @@ class CopyrighterStore:
) )
if crop_matches: if crop_matches:
evidence.extend(crop_matches) evidence.extend(crop_matches)
if call_count:
# Governance: personal (biometric) images left the system to a
# third-party API. The external call itself is permitted, but the
# transmission must be auditable.
self.add_audit_event(
"system",
"Personal image sent to external provider",
submission_id,
f"{call_count} face crop(s) -> Google Vision web detection",
)
return evidence, call_count return evidence, call_count
def _auto_google_custom_search( def _auto_google_custom_search(
@ -2317,7 +2416,6 @@ class CopyrighterStore:
self._put("submissions", submission_id, submission) self._put("submissions", submission_id, submission)
google_search_provider = self._get("providers", "google_search") google_search_provider = self._get("providers", "google_search")
google_search_provider["usage"] += external_call_count
if any( if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence for item in all_domain_evidence
@ -2326,7 +2424,7 @@ class CopyrighterStore:
else: else:
google_search_provider["lastSuccess"] = _now_label() google_search_provider["lastSuccess"] = _now_label()
google_search_provider["lastFailure"] = "없음" google_search_provider["lastFailure"] = "없음"
self._put("providers", "google_search", google_search_provider) self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider)
self.add_audit_event( self.add_audit_event(
"system", "system",
"Provider called", "Provider called",
@ -2551,7 +2649,6 @@ class CopyrighterStore:
self._put("submissions", submission_id, submission) self._put("submissions", submission_id, submission)
naver_provider = self._get("providers", "naver") naver_provider = self._get("providers", "naver")
naver_provider["usage"] += external_call_count
if any( if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence for item in all_domain_evidence
@ -2560,7 +2657,7 @@ class CopyrighterStore:
else: else:
naver_provider["lastSuccess"] = _now_label() naver_provider["lastSuccess"] = _now_label()
naver_provider["lastFailure"] = "없음" naver_provider["lastFailure"] = "없음"
self._put("providers", "naver", naver_provider) self._apply_provider_usage_delta("naver", external_call_count, naver_provider)
self.add_audit_event( self.add_audit_event(
"system", "system",
"Provider called", "Provider called",
@ -3476,6 +3573,18 @@ class CopyrighterStore:
raise KeyError(id_value) raise KeyError(id_value)
return json.loads(row["payload"]) return json.loads(row["payload"])
def _apply_provider_usage_delta(
self, provider_id: str, count: int, payload: dict[str, Any]
) -> None:
# Re-read current usage under the write lock and apply the delta so
# concurrent provider calls don't lose increments. The rest of `payload`
# (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for
# status fields.
with self._write_lock:
current = self._get("providers", provider_id)
payload["usage"] = int(current.get("usage", 0) or 0) + int(count)
self._put("providers", provider_id, payload)
def _clear_collection_candidates(self) -> None: def _clear_collection_candidates(self) -> None:
with self._connect() as conn: with self._connect() as conn:
conn.execute("delete from collection_candidates") conn.execute("delete from collection_candidates")

View file

@ -110,3 +110,34 @@ def test_missing_analysis_run_returns_failure_summary():
assert summary.failed == 1 assert summary.failed == 1
assert "missing analysis run" in summary.failure_reasons[0] assert "missing analysis run" in summary.failure_reasons[0]
def test_enrichment_skips_llm_when_summary_already_present():
repo = InMemoryRightsFilterRepository()
run = AnalysisRun.for_submission("submission-1", "v1")
run.add_evidence(
Evidence(
source=EvidenceSource.WEB_DETECTION,
reason="Web entity matched IU",
confidence=0.9,
data={"entity": "IU", "category": "celebrity"},
)
)
run.add_evidence(
Evidence(source=EvidenceSource.LLM_SUMMARY, reason="existing summary", confidence=0.0, data={})
)
repo.save_analysis_run(run)
enricher = _enricher()
calls: list[str] = []
original = enricher.llm_assistant.summarize
def counting(submission_id, source_evidence):
calls.append(submission_id)
return original(submission_id, source_evidence)
enricher.llm_assistant.summarize = counting
enricher.enrich_latest(repo, "submission-1")
# summarize must not run when a summary already exists (result was discarded).
assert calls == []

View file

@ -177,3 +177,55 @@ def test_multiple_google_visual_similar_results_do_not_stack_into_high_risk():
assert result.band == "medium" assert result.band == "medium"
assert result.score < 70 assert result.score < 70
def _fingerprint(similarity: float, reason: str = "Prior rejected image similarity") -> Evidence:
return Evidence(
source=EvidenceSource.FINGERPRINT,
reason=reason,
confidence=similarity,
data={"similarity": similarity},
)
def test_score_of_exactly_30_is_medium_not_low():
# A single weak fingerprint contributes +30 — the medium boundary.
result = RiskScorer().score([_fingerprint(0.5)])
assert result.score == 30
assert result.band == "medium"
def test_score_just_below_70_is_medium():
# Face (+35) + weak fingerprint (+30) = 65.
result = RiskScorer().score(
[
Evidence(source=EvidenceSource.FACE_PERSON, reason="Face/person detected", confidence=0.8, data={"face_count": 1}),
_fingerprint(0.5),
]
)
assert result.score == 65
assert result.band == "medium"
def test_score_of_exactly_70_is_high_not_medium():
# Face (+35) + partial web detection (+35) = 70 — the high boundary.
result = RiskScorer().score(
[
Evidence(source=EvidenceSource.FACE_PERSON, reason="Face/person detected", confidence=0.8, data={"face_count": 1}),
Evidence(source=EvidenceSource.WEB_DETECTION, reason="Partial match", confidence=0.9, data={"match": "partial"}),
]
)
assert result.score == 70
assert result.band == "high"
def test_score_is_clamped_to_100():
# Strong fingerprint (+80) + full web detection (+45) = 125 -> clamped to 100.
result = RiskScorer().score(
[
_fingerprint(0.95),
Evidence(source=EvidenceSource.WEB_DETECTION, reason="Full match", confidence=0.9, data={"match": "full"}),
]
)
assert result.score == 100
assert result.band == "high"

View file

@ -8050,3 +8050,130 @@ def test_fetch_rejects_internal_or_non_http_hosts(url):
# SSRF guard runs before any network call, so these raise without a fetch. # SSRF guard runs before any network call, so these raise without a fetch.
with pytest.raises(ValueError): with pytest.raises(ValueError):
sqlite_store_module._fetch_url_bytes(url) sqlite_store_module._fetch_url_bytes(url)
@pytest.mark.parametrize(
"method_name",
["knowledge_media_path", "collected_media_path", "face_crop_media_path"],
)
@pytest.mark.parametrize("evil", ["../../etc/passwd", "../../../secret", "../escape.txt"])
def test_media_path_guards_reject_traversal(tmp_path: Path, method_name: str, evil: str):
store = CopyrighterStore(tmp_path / "c.sqlite3")
store.initialize()
method = getattr(store, method_name)
with pytest.raises(ValueError):
method(evil)
def test_promote_collection_candidate_is_idempotent(tmp_path: Path):
store = CopyrighterStore(tmp_path / "c.sqlite3")
store.initialize()
store._put(
"collection_candidates",
"CAND-1",
{
"id": "CAND-1",
"provider": "naver",
"query": "iu",
"title": "IU",
"status": "candidate",
"sourceUrl": "",
"collectedEpoch": 0,
"sampleFingerprints": [],
},
)
store.promote_collection_candidate("CAND-1", {})
store.promote_collection_candidate("CAND-1", {}) # double-click / retry
candidate_entries = [
entry for entry in store._all("knowledge_entries") if entry.get("sourceCandidate") == "CAND-1"
]
assert len(candidate_entries) == 1
assert store._get("collection_candidates", "CAND-1")["status"] == "promoted"
def test_constraint_migration_failure_preserves_data_and_cleans_temp(tmp_path: Path):
db_path = tmp_path / "legacy.sqlite3"
legacy = sqlite3.connect(db_path)
# Legacy submissions table without the new risk_band / json_valid constraints.
legacy.execute(
"create table submissions (id text primary key, title text, risk_score integer, "
"risk_band text, decision_status text, submitted_epoch integer, queue_id text, payload text)"
)
# A value the NEW CHECK rejects, so the rebuild copy step raises.
legacy.execute(
"insert into submissions values ('S1', 't', 0, 'weird', 'unreviewed', 0, '', '{}')"
)
legacy.commit()
legacy.close()
store = CopyrighterStore(db_path)
with pytest.raises(sqlite3.IntegrityError):
store.initialize()
check = sqlite3.connect(db_path)
try:
# Original data survives (copy failed before the original was dropped).
assert check.execute("select count(*) from submissions").fetchone()[0] == 1
# No orphaned rebuild temp table is left behind.
orphan = check.execute(
"select count(*) from sqlite_master where name like '__copyrighter_%_new'"
).fetchone()[0]
assert orphan == 0
finally:
check.close()
def test_purge_expired_face_crops_removes_old_biometric_data(tmp_path: Path):
import os
import time
store = CopyrighterStore(tmp_path / "c.sqlite3")
store.initialize()
store.face_crop_retention_days = 30
crop_dir = store.face_crop_image_dir / "SUB-1"
crop_dir.mkdir(parents=True)
crop_file = crop_dir / "crop-1.jpg"
crop_file.write_bytes(b"jpegdata")
aged = time.time() - 60 * 86400
os.utime(crop_file, (aged, aged))
store._put(
"submissions",
"SUB-1",
{
"id": "SUB-1",
"title": "t",
"asset": "",
"riskScore": 0,
"riskBand": "low",
"decisionStatus": "unreviewed",
"providerState": {},
"fileFacts": {},
"evidence": [],
"faceCrops": [{"index": 1, "url": "/face-crop-media/SUB-1/crop-1.jpg", "box": [0, 0, 1, 1]}],
},
)
purged = store.purge_expired_face_crops()
assert purged == 1
assert not crop_file.exists()
assert store._get("submissions", "SUB-1")["faceCrops"] == []
assert any(event["event"] == "Face crop purged" for event in store.audit_events())
def test_purge_expired_face_crops_keeps_recent_crops(tmp_path: Path):
store = CopyrighterStore(tmp_path / "c.sqlite3")
store.initialize()
store.face_crop_retention_days = 30
crop_dir = store.face_crop_image_dir / "SUB-2"
crop_dir.mkdir(parents=True)
crop_file = crop_dir / "crop-1.jpg"
crop_file.write_bytes(b"jpegdata") # fresh mtime
assert store.purge_expired_face_crops() == 0
assert crop_file.exists()