From 7f5799e5e19ac6c6822f4a6f2383e1ccd63e9e43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 18:44:08 +0900 Subject: [PATCH] fix: PII retention, write-race serialization, and correctness fixes Governance: purge biometric face crops past a retention window (env COPYRIGHTER_FACE_CROP_RETENTION_DAYS, default 90d) with an audit trail, run at startup and reload; audit personal-image transmission to external Vision. Concurrency: a process write lock + atomic provider-usage delta stop lost counter updates; candidate promotion is idempotent (deterministic id + status guard); seeding is serialized. Correctness: skip LLM summarize when a summary already exists; constraint migration cleans orphan temp tables on failure. Add provider-readiness startup log. Tests pin all of the above plus risk-band boundaries (29/30/69/70, 100 cap) and media path-traversal guards. --- .../analysis/evidence_enrichment.py | 9 +- src/rights_filter/server/__main__.py | 14 ++ src/rights_filter/server/sqlite_store.py | 185 ++++++++++++++---- .../analysis/test_evidence_enrichment.py | 31 +++ .../analysis/test_risk_scoring.py | 52 +++++ .../rights_filter/server/test_sqlite_store.py | 127 ++++++++++++ 6 files changed, 377 insertions(+), 41 deletions(-) diff --git a/src/rights_filter/analysis/evidence_enrichment.py b/src/rights_filter/analysis/evidence_enrichment.py index 988316d..21fc9db 100644 --- a/src/rights_filter/analysis/evidence_enrichment.py +++ b/src/rights_filter/analysis/evidence_enrichment.py @@ -90,10 +90,13 @@ class EvidenceEnricher: EvidenceSource.FINGERPRINT, } ] - llm_summary = self.llm_assistant.summarize(submission_id, source_evidence) - if llm_summary.source == EvidenceSource.ENRICHMENT_FAILURE: - summary.summary_failures += 1 + # Only invoke the LLM when no summary exists yet. Previously summarize() + # ran on every rerun and its result was discarded (and its failure + # counted) when a summary was already present. if not _has_existing_llm_summary(run.evidence): + llm_summary = self.llm_assistant.summarize(submission_id, source_evidence) + if llm_summary.source == EvidenceSource.ENRICHMENT_FAILURE: + summary.summary_failures += 1 new_evidence.append(llm_summary) for item in new_evidence: diff --git a/src/rights_filter/server/__main__.py b/src/rights_filter/server/__main__.py index 5fd8a90..6192655 100644 --- a/src/rights_filter/server/__main__.py +++ b/src/rights_filter/server/__main__.py @@ -11,6 +11,18 @@ from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server.sqlite_store import CopyrighterStore +def _provider_readiness(runtime) -> str: + state = { + "Naver search": runtime.naver_adapter is not None, + "Google Vision web detection": runtime.google_adapter is not None, + "Google custom search": runtime.google_custom_search_adapter is not None, + "LLM (Ollama)": runtime.llm_assistant is not None, + } + return ", ".join( + f"{name}: {'ready' if ok else 'disabled (no credentials)'}" for name, ok in state.items() + ) + + def main() -> None: parser = argparse.ArgumentParser(description="Run the local Copyrighter API server.") parser.add_argument("--host", default="127.0.0.1") @@ -23,9 +35,11 @@ def main() -> None: load_env_file(Path(args.env), os.environ) provider_runtime = build_provider_runtime(os.environ) + print("Provider readiness: " + _provider_readiness(provider_runtime)) image_store = LocalSubmissionImageStore(Path(args.images)) store = CopyrighterStore(Path(args.db), provider_runtime=provider_runtime) store.initialize() + store.purge_expired_face_crops() auth_token = os.environ.get("COPYRIGHTER_AUTH_TOKEN", "").strip() or None if auth_token is None: diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index c18123b..fdf21de 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -11,6 +11,7 @@ import re import shutil import socket import sqlite3 +import threading from html.parser import HTMLParser from contextlib import contextmanager from dataclasses import replace @@ -88,6 +89,7 @@ DEFAULT_QUERY_COVERAGE_GOOD_THRESHOLD = 70 DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40 MIN_COVERAGE_THRESHOLD = 0 MAX_COVERAGE_THRESHOLD = 100 +DEFAULT_FACE_CROP_RETENTION_DAYS = 90 CONSTRAINED_TABLE_SCHEMAS = { "submissions": """ create table {table} ( @@ -308,6 +310,14 @@ def _ensure_queue_schema(conn: sqlite3.Connection) -> None: ) +def _drop_stale_rebuild_temp_tables(conn: sqlite3.Connection) -> None: + rows = conn.execute( + "select name from sqlite_master where type = 'table' and name like '\\_\\_copyrighter\\_%\\_new' escape '\\'" + ).fetchall() + for row in rows: + conn.execute(f"drop table if exists {row['name']}") + + def _ensure_constrained_schema(conn: sqlite3.Connection) -> None: pending = [ table for table in LEGACY_REBUILD_ORDER if _table_needs_constraint_rebuild(conn, table) @@ -321,11 +331,21 @@ def _ensure_constrained_schema(conn: sqlite3.Connection) -> None: # (the procedure SQLite documents for constraint-changing migrations) must # happen with no transaction pending, so commit first and restore on exit. conn.commit() + # Clean up any half-built temp table left by a prior crashed/failed run so we + # retry cleanly instead of tripping on an orphan. + _drop_stale_rebuild_temp_tables(conn) conn.execute("pragma foreign_keys = off") try: for table in pending: _rebuild_constrained_table(conn, table) conn.commit() + except Exception: + # The copy step raises before the original table is dropped (e.g. legacy + # rows violate the new CHECK), so the source data is intact; remove the + # orphaned temp table so the next startup can retry. + _drop_stale_rebuild_temp_tables(conn) + conn.commit() + raise finally: conn.execute("pragma foreign_keys = on") @@ -408,6 +428,14 @@ class CopyrighterStore: self.collection_public_prefix = collection_public_prefix.rstrip("/") self.face_crop_image_dir = self.db_path.parent / "face-crops" self.face_crop_public_prefix = "/face-crop-media" + # Biometric face crops are personal data: purge them after this many days. + # 0 = keep indefinitely. Configurable via env, clamped to a sane range. + self.face_crop_retention_days = _bounded_int_env( + os.environ.get("COPYRIGHTER_FACE_CROP_RETENTION_DAYS"), + default=DEFAULT_FACE_CROP_RETENTION_DAYS, + minimum=0, + maximum=3650, + ) self.coverage_thresholds = { "coverageGoodRate": _bounded_int_env( os.environ.get("COPYRIGHTER_COVERAGE_GOOD_THRESHOLD"), @@ -438,6 +466,11 @@ class CopyrighterStore: self.candidate_image_fetcher = candidate_image_fetcher or _fetch_url_bytes self.page_fetcher = candidate_image_fetcher or _fetch_page_url_bytes self.stylesheet_fetcher = candidate_image_fetcher or _fetch_stylesheet_url_bytes + # The server is a ThreadingHTTPServer; serialize read-modify-write store + # operations (provider usage counters, candidate promotion, seeding) + # across request threads to prevent lost updates / duplicate side effects. + # Reentrant so a guarded method may call another guarded helper. + self._write_lock = threading.RLock() def initialize(self) -> None: self.db_path.parent.mkdir(parents=True, exist_ok=True) @@ -677,6 +710,14 @@ class CopyrighterStore: conn.close() def seed_from_image_store(self, image_store: LocalSubmissionImageStore) -> int: + # Serialize seeding so two concurrent reload/import/upload requests can't + # both classify the same record as new and double-run analysis, external + # searches, and audit writes for it. + with self._write_lock: + return self._seed_from_image_store_locked(image_store) + + def _seed_from_image_store_locked(self, image_store: LocalSubmissionImageStore) -> int: + self.purge_expired_face_crops() queue = self.ensure_queue(image_store.root) queue_id = str(queue["id"]) now = datetime.now().isoformat(" ", "seconds") @@ -1369,7 +1410,6 @@ class CopyrighterStore: }, ) self._put("submissions", submission_id, submission) - provider_payload["usage"] += provider_call_count if any( item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} for item in domain_evidence @@ -1378,7 +1418,7 @@ class CopyrighterStore: else: provider_payload["lastSuccess"] = _now_label() provider_payload["lastFailure"] = "없음" - self._put("providers", provider, provider_payload) + self._apply_provider_usage_delta(provider, provider_call_count, provider_payload) self._ensure_llm_summary(submission_id) if image_store is not None: self._rescore_submission(submission_id) @@ -1423,13 +1463,12 @@ class CopyrighterStore: _evidence_id(submission_id, llm_evidence), _evidence_payload(submission_id, llm_evidence), ) - llm_provider["usage"] += 1 if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE: llm_provider["lastFailure"] = llm_evidence.reason else: llm_provider["lastSuccess"] = _now_label() llm_provider["lastFailure"] = "없음" - self._put("providers", "llm", llm_provider) + self._apply_provider_usage_delta("llm", 1, llm_provider) return True def _delete_llm_summary_evidence(self, submission_id: str) -> None: @@ -1663,7 +1702,6 @@ class CopyrighterStore: self._put("collection_candidates", candidate["id"], candidate) collected += 1 - provider_payload["usage"] += provider_call_count if any( item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} for item in domain_evidence @@ -1672,39 +1710,50 @@ class CopyrighterStore: else: provider_payload["lastSuccess"] = _now_label() provider_payload["lastFailure"] = "없음" - self._put("providers", provider, provider_payload) + self._apply_provider_usage_delta(provider, provider_call_count, provider_payload) self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates") payload = self.bootstrap() payload["collected"] = collected return payload def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]: - candidate = self._get("collection_candidates", candidate_id) - entry_id = f"kb-candidate-{_timestamp_id()}" - query = str(candidate.get("query", "")) - name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query - memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}" - entry = { - "id": entry_id, - "name": name, - "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), - "aliases": _text_list(payload.get("aliases")) or [query], - "keywords": _text_list(payload.get("keywords")) or [query], - "memo": memo, - "provenance": "manual", - "active": True, - "entryStatus": "confirmed", - "sourceDecision": "", - "sourceCandidate": candidate_id, - "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")), - "imageAsset": str(candidate.get("imageAsset", "")), - "imageFacts": candidate.get("imageFacts", {}), - } - self._put("knowledge_entries", entry_id, entry) - candidate["status"] = "promoted" - candidate["promotedKnowledgeId"] = entry_id - self._put("collection_candidates", candidate_id, candidate) - self.add_audit_event("rights.ops", "Knowledge entry manually created", name, f"promoted candidate {candidate_id}") + with self._write_lock: + candidate = self._get("collection_candidates", candidate_id) + # Idempotent: a double-click / retry must not create a second + # confirmed knowledge entry for the same candidate. + if candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId"): + return self.bootstrap() + # Deterministic id so a racing retry upserts the same row instead of + # minting a new timestamped id. + entry_id = _stable_id("kb-candidate", candidate_id) + query = str(candidate.get("query", "")) + name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query + memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}" + entry = { + "id": entry_id, + "name": name, + "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), + "aliases": _text_list(payload.get("aliases")) or [query], + "keywords": _text_list(payload.get("keywords")) or [query], + "memo": memo, + "provenance": "manual", + "active": True, + "entryStatus": "confirmed", + "sourceDecision": "", + "sourceCandidate": candidate_id, + "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")), + "imageAsset": str(candidate.get("imageAsset", "")), + "imageFacts": candidate.get("imageFacts", {}), + } + with self._transaction() as conn: + self._put("knowledge_entries", entry_id, entry, conn=conn) + candidate["status"] = "promoted" + candidate["promotedKnowledgeId"] = entry_id + self._put("collection_candidates", candidate_id, candidate, conn=conn) + self.add_audit_event( + "rights.ops", "Knowledge entry manually created", name, + f"promoted candidate {candidate_id}", conn=conn, + ) return self.bootstrap() def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]: @@ -2013,6 +2062,47 @@ class CopyrighterStore: submission["faceCrops"] = face_crops self._put("submissions", submission_id, submission) + def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int: + # Delete biometric face-crop files older than the retention window and + # drop their references from the owning submission, with an audit trail. + retention_days = int(self.face_crop_retention_days) + if retention_days <= 0 or not self.face_crop_image_dir.exists(): + return 0 + now = now_epoch if now_epoch is not None else datetime.now().timestamp() + cutoff = now - retention_days * 86400 + purged = 0 + with self._write_lock: + for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()): + submission_id = crop_dir.name + removed: list[str] = [] + for crop_file in crop_dir.glob("crop-*.jpg"): + try: + if crop_file.stat().st_mtime >= cutoff: + continue + crop_file.unlink() + except OSError: + continue + removed.append(crop_file.name) + purged += 1 + if not removed: + continue + change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)" + try: + submission = self._get("submissions", submission_id) + except KeyError: + self.add_audit_event("system", "Face crop purged", submission_id, change) + continue + remaining = [ + crop + for crop in submission.get("faceCrops", []) + if f"crop-{crop.get('index')}.jpg" not in removed + ] + with self._transaction() as conn: + submission["faceCrops"] = remaining + self._put("submissions", submission_id, submission, conn=conn) + self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn) + return purged + def _refresh_existing_submission_file_facts( self, image_store: LocalSubmissionImageStore, @@ -2092,7 +2182,6 @@ class CopyrighterStore: ) google_provider = self._get("providers", "google") - google_provider["usage"] += call_count if any( item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED} for item in domain_evidence @@ -2101,7 +2190,7 @@ class CopyrighterStore: else: google_provider["lastSuccess"] = _now_label() google_provider["lastFailure"] = "없음" - self._put("providers", "google", google_provider) + self._apply_provider_usage_delta("google", call_count, google_provider) self.add_audit_event( "rights.ops", "Provider called", @@ -2165,6 +2254,16 @@ class CopyrighterStore: ) if crop_matches: evidence.extend(crop_matches) + if call_count: + # Governance: personal (biometric) images left the system to a + # third-party API. The external call itself is permitted, but the + # transmission must be auditable. + self.add_audit_event( + "system", + "Personal image sent to external provider", + submission_id, + f"{call_count} face crop(s) -> Google Vision web detection", + ) return evidence, call_count def _auto_google_custom_search( @@ -2317,7 +2416,6 @@ class CopyrighterStore: self._put("submissions", submission_id, submission) google_search_provider = self._get("providers", "google_search") - google_search_provider["usage"] += external_call_count if any( item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} for item in all_domain_evidence @@ -2326,7 +2424,7 @@ class CopyrighterStore: else: google_search_provider["lastSuccess"] = _now_label() google_search_provider["lastFailure"] = "없음" - self._put("providers", "google_search", google_search_provider) + self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider) self.add_audit_event( "system", "Provider called", @@ -2551,7 +2649,6 @@ class CopyrighterStore: self._put("submissions", submission_id, submission) naver_provider = self._get("providers", "naver") - naver_provider["usage"] += external_call_count if any( item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} for item in all_domain_evidence @@ -2560,7 +2657,7 @@ class CopyrighterStore: else: naver_provider["lastSuccess"] = _now_label() naver_provider["lastFailure"] = "없음" - self._put("providers", "naver", naver_provider) + self._apply_provider_usage_delta("naver", external_call_count, naver_provider) self.add_audit_event( "system", "Provider called", @@ -3476,6 +3573,18 @@ class CopyrighterStore: raise KeyError(id_value) return json.loads(row["payload"]) + def _apply_provider_usage_delta( + self, provider_id: str, count: int, payload: dict[str, Any] + ) -> None: + # Re-read current usage under the write lock and apply the delta so + # concurrent provider calls don't lose increments. The rest of `payload` + # (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for + # status fields. + with self._write_lock: + current = self._get("providers", provider_id) + payload["usage"] = int(current.get("usage", 0) or 0) + int(count) + self._put("providers", provider_id, payload) + def _clear_collection_candidates(self) -> None: with self._connect() as conn: conn.execute("delete from collection_candidates") diff --git a/tests/rights_filter/analysis/test_evidence_enrichment.py b/tests/rights_filter/analysis/test_evidence_enrichment.py index 85cebc6..5c0cb48 100644 --- a/tests/rights_filter/analysis/test_evidence_enrichment.py +++ b/tests/rights_filter/analysis/test_evidence_enrichment.py @@ -110,3 +110,34 @@ def test_missing_analysis_run_returns_failure_summary(): assert summary.failed == 1 assert "missing analysis run" in summary.failure_reasons[0] + + +def test_enrichment_skips_llm_when_summary_already_present(): + repo = InMemoryRightsFilterRepository() + run = AnalysisRun.for_submission("submission-1", "v1") + run.add_evidence( + Evidence( + source=EvidenceSource.WEB_DETECTION, + reason="Web entity matched IU", + confidence=0.9, + data={"entity": "IU", "category": "celebrity"}, + ) + ) + run.add_evidence( + Evidence(source=EvidenceSource.LLM_SUMMARY, reason="existing summary", confidence=0.0, data={}) + ) + repo.save_analysis_run(run) + + enricher = _enricher() + calls: list[str] = [] + original = enricher.llm_assistant.summarize + + def counting(submission_id, source_evidence): + calls.append(submission_id) + return original(submission_id, source_evidence) + + enricher.llm_assistant.summarize = counting + enricher.enrich_latest(repo, "submission-1") + + # summarize must not run when a summary already exists (result was discarded). + assert calls == [] diff --git a/tests/rights_filter/analysis/test_risk_scoring.py b/tests/rights_filter/analysis/test_risk_scoring.py index 2e4019f..592f114 100644 --- a/tests/rights_filter/analysis/test_risk_scoring.py +++ b/tests/rights_filter/analysis/test_risk_scoring.py @@ -177,3 +177,55 @@ def test_multiple_google_visual_similar_results_do_not_stack_into_high_risk(): assert result.band == "medium" assert result.score < 70 + + +def _fingerprint(similarity: float, reason: str = "Prior rejected image similarity") -> Evidence: + return Evidence( + source=EvidenceSource.FINGERPRINT, + reason=reason, + confidence=similarity, + data={"similarity": similarity}, + ) + + +def test_score_of_exactly_30_is_medium_not_low(): + # A single weak fingerprint contributes +30 — the medium boundary. + result = RiskScorer().score([_fingerprint(0.5)]) + assert result.score == 30 + assert result.band == "medium" + + +def test_score_just_below_70_is_medium(): + # Face (+35) + weak fingerprint (+30) = 65. + result = RiskScorer().score( + [ + Evidence(source=EvidenceSource.FACE_PERSON, reason="Face/person detected", confidence=0.8, data={"face_count": 1}), + _fingerprint(0.5), + ] + ) + assert result.score == 65 + assert result.band == "medium" + + +def test_score_of_exactly_70_is_high_not_medium(): + # Face (+35) + partial web detection (+35) = 70 — the high boundary. + result = RiskScorer().score( + [ + Evidence(source=EvidenceSource.FACE_PERSON, reason="Face/person detected", confidence=0.8, data={"face_count": 1}), + Evidence(source=EvidenceSource.WEB_DETECTION, reason="Partial match", confidence=0.9, data={"match": "partial"}), + ] + ) + assert result.score == 70 + assert result.band == "high" + + +def test_score_is_clamped_to_100(): + # Strong fingerprint (+80) + full web detection (+45) = 125 -> clamped to 100. + result = RiskScorer().score( + [ + _fingerprint(0.95), + Evidence(source=EvidenceSource.WEB_DETECTION, reason="Full match", confidence=0.9, data={"match": "full"}), + ] + ) + assert result.score == 100 + assert result.band == "high" diff --git a/tests/rights_filter/server/test_sqlite_store.py b/tests/rights_filter/server/test_sqlite_store.py index a00f062..a6cd395 100644 --- a/tests/rights_filter/server/test_sqlite_store.py +++ b/tests/rights_filter/server/test_sqlite_store.py @@ -8050,3 +8050,130 @@ def test_fetch_rejects_internal_or_non_http_hosts(url): # SSRF guard runs before any network call, so these raise without a fetch. with pytest.raises(ValueError): sqlite_store_module._fetch_url_bytes(url) + + +@pytest.mark.parametrize( + "method_name", + ["knowledge_media_path", "collected_media_path", "face_crop_media_path"], +) +@pytest.mark.parametrize("evil", ["../../etc/passwd", "../../../secret", "../escape.txt"]) +def test_media_path_guards_reject_traversal(tmp_path: Path, method_name: str, evil: str): + store = CopyrighterStore(tmp_path / "c.sqlite3") + store.initialize() + method = getattr(store, method_name) + with pytest.raises(ValueError): + method(evil) + + +def test_promote_collection_candidate_is_idempotent(tmp_path: Path): + store = CopyrighterStore(tmp_path / "c.sqlite3") + store.initialize() + store._put( + "collection_candidates", + "CAND-1", + { + "id": "CAND-1", + "provider": "naver", + "query": "iu", + "title": "IU", + "status": "candidate", + "sourceUrl": "", + "collectedEpoch": 0, + "sampleFingerprints": [], + }, + ) + + store.promote_collection_candidate("CAND-1", {}) + store.promote_collection_candidate("CAND-1", {}) # double-click / retry + + candidate_entries = [ + entry for entry in store._all("knowledge_entries") if entry.get("sourceCandidate") == "CAND-1" + ] + assert len(candidate_entries) == 1 + assert store._get("collection_candidates", "CAND-1")["status"] == "promoted" + + +def test_constraint_migration_failure_preserves_data_and_cleans_temp(tmp_path: Path): + db_path = tmp_path / "legacy.sqlite3" + legacy = sqlite3.connect(db_path) + # Legacy submissions table without the new risk_band / json_valid constraints. + legacy.execute( + "create table submissions (id text primary key, title text, risk_score integer, " + "risk_band text, decision_status text, submitted_epoch integer, queue_id text, payload text)" + ) + # A value the NEW CHECK rejects, so the rebuild copy step raises. + legacy.execute( + "insert into submissions values ('S1', 't', 0, 'weird', 'unreviewed', 0, '', '{}')" + ) + legacy.commit() + legacy.close() + + store = CopyrighterStore(db_path) + with pytest.raises(sqlite3.IntegrityError): + store.initialize() + + check = sqlite3.connect(db_path) + try: + # Original data survives (copy failed before the original was dropped). + assert check.execute("select count(*) from submissions").fetchone()[0] == 1 + # No orphaned rebuild temp table is left behind. + orphan = check.execute( + "select count(*) from sqlite_master where name like '__copyrighter_%_new'" + ).fetchone()[0] + assert orphan == 0 + finally: + check.close() + + +def test_purge_expired_face_crops_removes_old_biometric_data(tmp_path: Path): + import os + import time + + store = CopyrighterStore(tmp_path / "c.sqlite3") + store.initialize() + store.face_crop_retention_days = 30 + + crop_dir = store.face_crop_image_dir / "SUB-1" + crop_dir.mkdir(parents=True) + crop_file = crop_dir / "crop-1.jpg" + crop_file.write_bytes(b"jpegdata") + aged = time.time() - 60 * 86400 + os.utime(crop_file, (aged, aged)) + + store._put( + "submissions", + "SUB-1", + { + "id": "SUB-1", + "title": "t", + "asset": "", + "riskScore": 0, + "riskBand": "low", + "decisionStatus": "unreviewed", + "providerState": {}, + "fileFacts": {}, + "evidence": [], + "faceCrops": [{"index": 1, "url": "/face-crop-media/SUB-1/crop-1.jpg", "box": [0, 0, 1, 1]}], + }, + ) + + purged = store.purge_expired_face_crops() + + assert purged == 1 + assert not crop_file.exists() + assert store._get("submissions", "SUB-1")["faceCrops"] == [] + assert any(event["event"] == "Face crop purged" for event in store.audit_events()) + + +def test_purge_expired_face_crops_keeps_recent_crops(tmp_path: Path): + store = CopyrighterStore(tmp_path / "c.sqlite3") + store.initialize() + store.face_crop_retention_days = 30 + + crop_dir = store.face_crop_image_dir / "SUB-2" + crop_dir.mkdir(parents=True) + crop_file = crop_dir / "crop-1.jpg" + crop_file.write_bytes(b"jpegdata") # fresh mtime + + assert store.purge_expired_face_crops() == 0 + assert crop_file.exists()