From 3bc07d94c3cc00ee7defde70fac104df39db242e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 22:18:38 +0900 Subject: [PATCH] refactor: extract enrichment internals into StoreEnrichmentMixin Move provider-state sync, local face evidence/crop sync + retention purge, internal/Google re-analysis, face-crop web detection, and the auto Google/Naver search drivers into a mixin; CopyrighterStore inherits it. Drop now-unused imports. Tests now also patch HeuristicFacePersonDetector on store_enrichment (face detection moved there). sqlite_store.py 2358 -> 1598 lines (-70% overall). --- src/rights_filter/server/sqlite_store.py | 762 +---------------- src/rights_filter/server/store_enrichment.py | 789 ++++++++++++++++++ tests/rights_filter/server/test_http_app.py | 3 + .../rights_filter/server/test_sqlite_store.py | 8 + 4 files changed, 802 insertions(+), 760 deletions(-) create mode 100644 src/rights_filter/server/store_enrichment.py diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 18b4091..3c2a0f7 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -15,9 +15,7 @@ from urllib.parse import urlparse from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector from rights_filter.analysis.fingerprints import FingerprintService from rights_filter.analysis.internal_analyzer import InternalAnalyzer -from rights_filter.analysis.preprocessing import build_external_derivative, build_face_crop_derivatives from rights_filter.analysis.risk_scoring import RiskScorer -from rights_filter.analysis.search_query_generation import SearchQueryGenerator from rights_filter.analysis.search_result_promoter import SearchResultPromoter from rights_filter.domain.records import ( Evidence, @@ -50,6 +48,7 @@ from rights_filter.server.store_remote_fetch import ( _fetch_stylesheet_url_bytes, _fetch_url_bytes, ) +from rights_filter.server.store_enrichment import StoreEnrichmentMixin from rights_filter.server.store_persistence import StorePersistenceMixin from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin from rights_filter.server.store_schema import ( @@ -64,23 +63,10 @@ from rights_filter.server.store_serialization import ( _evidence_id, _evidence_matches_provider, _evidence_payload, - _existing_google_custom_query_signatures, - _existing_naver_query_signatures, - _external_provider_ids, - _external_provider_state_for_submission, - _face_crop_web_evidence, - _google_custom_image_query_signature, - _google_custom_web_query_signature, - _google_weak_label_title, - _is_google_weak_label_payload, _knowledge_type_value, - _naver_blog_query_signature, - _naver_query_signature, - _naver_web_query_signature, _now_label, _provider_item_failed, _provider_item_has_result, - _query_history_status, _stable_id, _submission_payload, _submission_search_hint_evidence, @@ -90,7 +76,7 @@ from rights_filter.server.store_serialization import ( from rights_filter.server.store_text import _text_list, _unique_texts -class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin): +class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin, StoreEnrichmentMixin): def __init__( self, db_path: Path | str, @@ -1610,747 +1596,3 @@ class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin): ), ) - def _sync_provider_payloads(self) -> None: - existing = {provider["id"]: provider for provider in self._all("providers")} - for provider_id, payload in self.provider_runtime.provider_payloads.items(): - merged = { - **payload, - "usage": existing.get(provider_id, {}).get("usage", payload["usage"]), - } - self._put("providers", provider_id, merged) - self._sync_submission_provider_state() - - def _sync_submission_provider_state(self, queue_id: str | None = None) -> None: - provider_payloads = {provider["id"]: provider for provider in self._all("providers")} - evidence_by_submission = self._evidence_by_submission(queue_id=queue_id) - for submission in self._all("submissions", queue_id=queue_id): - provider_state = submission.get("providerState", {}) - evidence = evidence_by_submission.get(str(submission["id"]), []) - provider_state["internal"] = "ok" - for provider_id in _external_provider_ids(provider_payloads): - provider_state[provider_id] = _external_provider_state_for_submission( - provider_payloads, - provider_id, - submission, - evidence, - ) - submission["providerState"] = provider_state - self._put("submissions", submission["id"], submission) - - def _normalize_saved_google_weak_labels_and_rescore(self, queue_id: str | None = None) -> None: - changed_submissions: set[str] = set() - with self._connect() as conn: - rows = conn.execute( - """ - select id, submission_id, payload - from evidence - """.strip() + ( - " where submission_id in (select id from submissions where queue_id = ?)" - if queue_id is not None - else "" - ), - (queue_id,) if queue_id is not None else (), - ).fetchall() - - for row in rows: - payload = json.loads(row["payload"]) - if _is_google_weak_label_payload(payload): - payload["title"] = _google_weak_label_title(str(payload.get("title", ""))) - payload["confidence"] = 0.0 - payload["contributed"] = False - payload["status"] = "weak" - self._put("evidence", row["id"], payload) - changed_submissions.add(str(row["submission_id"])) - - for submission_id in changed_submissions: - self._rescore_submission(submission_id) - - def _refresh_existing_local_face_evidence( - self, - image_store: LocalSubmissionImageStore, - *, - queue_id: str, - ) -> None: - existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)} - if not existing_submission_ids: - return - - evidence_by_submission = self._evidence_by_submission(queue_id=queue_id) - detector = HeuristicFacePersonDetector() - for record in image_store.submission_records(): - submission_id = str(record["id"]) - if submission_id not in existing_submission_ids: - continue - if any( - item.get("source") == "face" - for item in evidence_by_submission.get(submission_id, []) - ): - continue - - signal = detector.detect(image_store.image_payload(submission_id)) - if not signal.present: - continue - - evidence = Evidence( - source=EvidenceSource.FACE_PERSON, - reason="Face/person detected", - confidence=0.8, - data={ - "face_count": signal.face_count, - "person_count": signal.person_count, - }, - ) - self._put( - "evidence", - _evidence_id(submission_id, evidence), - _evidence_payload(submission_id, evidence), - ) - self._rescore_submission(submission_id) - - def _sync_face_crops( - self, - submission_id: str, - image_store: LocalSubmissionImageStore | None, - ) -> None: - if image_store is None: - return - try: - original = image_store.image_payload(submission_id) - except Exception: - return - - signal = HeuristicFacePersonDetector().detect(original) - crop_dir = self.face_crop_image_dir / submission_id - if crop_dir.exists(): - for stale in crop_dir.glob("crop-*.jpg"): - try: - stale.unlink() - except OSError: - continue - - face_crops: list[dict[str, Any]] = [] - for index, box in enumerate(signal.face_boxes[:3], start=1): - crops = build_face_crop_derivatives(original, [box]) - if not crops: - continue - try: - crop_dir.mkdir(parents=True, exist_ok=True) - crop_path = crop_dir / f"crop-{index}.jpg" - crop_path.write_bytes(crops[0].content) - except OSError: - continue - face_crops.append( - { - "index": index, - "url": f"{self.face_crop_public_prefix}/{submission_id}/crop-{index}.jpg", - "box": [int(value) for value in box], - } - ) - - submission = self._get("submissions", submission_id) - submission["faceCrops"] = face_crops - self._put("submissions", submission_id, submission) - - def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int: - # Delete biometric face-crop files older than the retention window and - # drop their references from the owning submission, with an audit trail. - retention_days = int(self.face_crop_retention_days) - if retention_days <= 0 or not self.face_crop_image_dir.exists(): - return 0 - now = now_epoch if now_epoch is not None else datetime.now().timestamp() - cutoff = now - retention_days * 86400 - purged = 0 - with self._write_lock: - for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()): - submission_id = crop_dir.name - removed: list[str] = [] - for crop_file in crop_dir.glob("crop-*.jpg"): - try: - if crop_file.stat().st_mtime >= cutoff: - continue - crop_file.unlink() - except OSError: - continue - removed.append(crop_file.name) - purged += 1 - if not removed: - continue - change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)" - try: - submission = self._get("submissions", submission_id) - except KeyError: - self.add_audit_event("system", "Face crop purged", submission_id, change) - continue - remaining = [ - crop - for crop in submission.get("faceCrops", []) - if f"crop-{crop.get('index')}.jpg" not in removed - ] - with self._transaction() as conn: - submission["faceCrops"] = remaining - self._put("submissions", submission_id, submission, conn=conn) - self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn) - return purged - - def _refresh_existing_submission_file_facts( - self, - image_store: LocalSubmissionImageStore, - *, - queue_id: str, - ) -> None: - existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)} - for record in image_store.submission_records(): - submission_id = str(record["id"]) - if submission_id not in existing_submission_ids: - continue - submission = self._get("submissions", submission_id) - submission["asset"] = record["asset"] - submission["fileFacts"] = { - **submission.get("fileFacts", {}), - "size": f"{record.get('width', 1)} x {record.get('height', 1)}", - "format": record.get("format", "FILE"), - } - self._put("submissions", submission_id, submission) - - def _rerun_internal_analysis( - self, - submission_id: str, - image_store: LocalSubmissionImageStore, - ) -> list[Evidence]: - repository = self._knowledge_repository() - analyzer = InternalAnalyzer( - repository, - FingerprintService(), - HeuristicFacePersonDetector(), - ) - domain_evidence = analyzer.analyze( - submission_id, - image_store.image_payload(submission_id), - ) - for item in domain_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "rerun" - self._put("evidence", payload["id"], payload) - self._sync_similar_reference_images(submission_id, domain_evidence) - self._increment_knowledge_contribution_counts(submission_id, domain_evidence) - self._rescore_submission(submission_id) - self._sync_face_crops(submission_id, image_store) - return domain_evidence - - def _rerun_google_image_search( - self, - submission_id: str, - image_store: LocalSubmissionImageStore, - ) -> list[Evidence]: - if self.provider_runtime.google_adapter is None: - return [] - - original_image = image_store.image_payload(submission_id) - derivative = build_external_derivative(original_image) - domain_evidence = self.provider_runtime.google_adapter.detect( - submission_id, - derivative, - self.provider_runtime.external_policy, - ) - call_count = 1 - face_crop_evidence, face_crop_calls = self._google_face_crop_web_detection( - submission_id, - original_image, - ) - call_count += face_crop_calls - domain_evidence.extend(face_crop_evidence) - for item in domain_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "rerun" if payload["status"] == "active" else payload["status"] - self._put("evidence", payload["id"], payload) - self._sync_search_result_image_similarity( - submission_id, - domain_evidence, - image_store, - status="rerun", - ) - - google_provider = self._get("providers", "google") - if any( - item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED} - for item in domain_evidence - ): - google_provider["lastFailure"] = domain_evidence[0].reason if domain_evidence else "Google image search failed" - else: - google_provider["lastSuccess"] = _now_label() - google_provider["lastFailure"] = "없음" - self._apply_provider_usage_delta("google", call_count, google_provider) - self.add_audit_event( - "rights.ops", - "Provider called", - f"google / {submission_id}", - "image web detection rerun", - ) - self._rescore_submission(submission_id) - return domain_evidence - - def _google_face_crop_web_detection( - self, - submission_id: str, - original_image: Any, - ) -> tuple[list[Evidence], int]: - if self.provider_runtime.google_adapter is None: - return [], 0 - if not self.provider_runtime.face_crop_web_detection_enabled: - return [], 0 - - submission = self._get("submissions", submission_id) - stored_crops = submission.get("faceCrops") - if stored_crops is None: - signal = HeuristicFacePersonDetector().detect(original_image) - indexed_boxes = [ - (index, box) - for index, box in enumerate(signal.face_boxes[:3], start=1) - ] - else: - # 저장된 faceCrops의 index를 그대로 사용해 증거의 crop_index가 - # 워크벤치 썸네일 번호와 항상 일치하게 한다(중간 박스 크롭 실패 시 - # 재열거하면 번호가 어긋난다). - indexed_boxes = [ - (int(item.get("index", 0)), tuple(int(value) for value in item.get("box", []))) - for item in stored_crops - if len(item.get("box", [])) == 4 - ] - if not indexed_boxes: - return [], 0 - - evidence: list[Evidence] = [] - call_count = 0 - for crop_index, box in indexed_boxes: - crops = build_face_crop_derivatives(original_image, [box]) - if not crops: - continue - crop = crops[0] - call_count += 1 - crop_evidence = self.provider_runtime.google_adapter.detect( - f"{submission_id}:face-crop-{crop_index}", - crop, - self.provider_runtime.external_policy, - ) - for item in crop_evidence: - face_web_evidence = _face_crop_web_evidence(submission_id, crop_index, item) - evidence.append(face_web_evidence) - crop_matches = self._face_crop_search_result_similarity_evidence( - submission_id, - crop_index, - crop, - face_web_evidence, - ) - if crop_matches: - evidence.extend(crop_matches) - if call_count: - # Governance: personal (biometric) images left the system to a - # third-party API. The external call itself is permitted, but the - # transmission must be auditable. - self.add_audit_event( - "system", - "Personal image sent to external provider", - submission_id, - f"{call_count} face crop(s) -> Google Vision web detection", - ) - return evidence, call_count - - def _auto_google_custom_search( - self, - submission_id: str, - source_evidence: list[Evidence], - image_store: LocalSubmissionImageStore | None = None, - ) -> None: - if self.provider_runtime.google_custom_search_adapter is None: - return - if self.provider_runtime.auto_google_custom_query_limit <= 0: - return - - query_plan = SearchQueryGenerator().plan( - source_evidence, - [], - max_queries=self.provider_runtime.auto_google_custom_query_limit, - ) - if not query_plan: - return - - existing_signatures = _existing_google_custom_query_signatures( - self._evidence_by_submission().get(submission_id, []) - ) - can_compare_search_images = self._can_compare_search_result_images( - submission_id, - image_store, - ) - all_domain_evidence: list[Evidence] = [] - history_entries: list[dict[str, Any]] = [] - external_call_count = 0 - - for planned in query_plan: - query = planned.query - image_signature = _google_custom_image_query_signature(query) - if image_signature in existing_signatures: - continue - existing_signatures.add(image_signature) - - image_evidence = self.provider_runtime.google_custom_search_adapter.search_images( - submission_id, - query, - self.provider_runtime.google_custom_search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in image_evidence): - external_call_count += 1 - image_evidence = [ - replace( - item, - data={ - **item.data, - "query_strategy": planned.strategy, - "query_source": planned.source, - "query_priority": planned.priority, - }, - ) - for item in image_evidence - ] - all_domain_evidence.extend(image_evidence) - for item in image_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "auto" if payload["status"] == "active" else payload["status"] - self._put("evidence", payload["id"], payload) - - similarity_evidence: list[Evidence] = [] - remaining_matches = self._search_result_similarity_remaining_budget( - submission_id, - image_store, - ) - if can_compare_search_images and remaining_matches > 0: - similarity_evidence = self._sync_search_result_image_similarity( - submission_id, - image_evidence, - image_store, - status="auto", - max_matches=remaining_matches, - ) - all_domain_evidence.extend(similarity_evidence) - - history_entries.append( - { - "provider": "google_search", - "query": query, - "status": _query_history_status(image_evidence), - "strategy": planned.strategy, - "source": planned.source, - "timestamp": _now_label(), - "count": len(image_evidence), - } - ) - - web_signature = _google_custom_web_query_signature(query) - if web_signature in existing_signatures: - continue - existing_signatures.add(web_signature) - - web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages( - submission_id, - query, - self.provider_runtime.google_custom_search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): - external_call_count += 1 - web_evidence = [ - replace( - item, - data={ - **item.data, - "query_strategy": planned.strategy, - "query_source": planned.source, - "query_priority": planned.priority, - }, - ) - for item in web_evidence - ] - all_domain_evidence.extend(web_evidence) - for item in web_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "auto" if payload["status"] == "active" else payload["status"] - self._put("evidence", payload["id"], payload) - all_domain_evidence.extend( - self._sync_search_result_image_similarity( - submission_id, - web_evidence, - image_store, - status="auto", - max_matches=self._search_result_similarity_remaining_budget( - submission_id, - image_store, - ), - ) - ) - history_entries.append( - { - "provider": "google_search", - "query": query, - "status": _query_history_status(web_evidence), - "strategy": planned.strategy, - "source": planned.source, - "timestamp": _now_label(), - "count": len(web_evidence), - } - ) - - if not history_entries: - return - - submission = self._get("submissions", submission_id) - submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])] - self._put("submissions", submission_id, submission) - - google_search_provider = self._get("providers", "google_search") - if any( - item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} - for item in all_domain_evidence - ): - google_search_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Google custom search failed" - else: - google_search_provider["lastSuccess"] = _now_label() - google_search_provider["lastFailure"] = "없음" - self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider) - self.add_audit_event( - "system", - "Provider called", - f"google_search / {submission_id}", - f"auto custom search query batch: {', '.join(item['query'] for item in history_entries)}", - ) - - def _auto_naver_search( - self, - submission_id: str, - source_evidence: list[Evidence], - image_store: LocalSubmissionImageStore | None = None, - ) -> None: - if self.provider_runtime.naver_adapter is None: - return - if self.provider_runtime.auto_naver_query_limit <= 0: - return - - query_plan = SearchQueryGenerator().plan( - source_evidence, - [], - max_queries=self.provider_runtime.auto_naver_query_limit, - ) - if not query_plan: - return - - existing_signatures = _existing_naver_query_signatures( - self._evidence_by_submission().get(submission_id, []) - ) - can_compare_search_images = self._can_compare_search_result_images( - submission_id, - image_store, - ) - all_domain_evidence: list[Evidence] = [] - history_entries: list[dict[str, Any]] = [] - external_call_count = 0 - blog_query_count = 0 - web_query_count = 0 - promoter = SearchResultPromoter() - - for planned in query_plan: - query = planned.query - signature = _naver_query_signature(query) - if signature in existing_signatures: - continue - existing_signatures.add(signature) - - domain_evidence = self.provider_runtime.naver_adapter.search( - submission_id, - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence): - external_call_count += 1 - domain_evidence = [ - replace( - item, - data={ - **item.data, - "query_strategy": planned.strategy, - "query_source": planned.source, - "query_priority": planned.priority, - }, - ) - for item in domain_evidence - ] - domain_evidence = promoter.promote(domain_evidence) - all_domain_evidence.extend(domain_evidence) - for item in domain_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "auto" if payload["status"] == "active" else payload["status"] - self._put("evidence", payload["id"], payload) - similarity_evidence: list[Evidence] = [] - remaining_matches = self._search_result_similarity_remaining_budget( - submission_id, - image_store, - ) - if can_compare_search_images and remaining_matches > 0: - similarity_evidence = self._sync_search_result_image_similarity( - submission_id, - domain_evidence, - image_store, - status="auto", - max_matches=remaining_matches, - ) - all_domain_evidence.extend(similarity_evidence) - - history_entries.append( - { - "provider": "naver", - "query": query, - "status": _query_history_status(domain_evidence), - "strategy": planned.strategy, - "source": planned.source, - "timestamp": _now_label(), - "count": len(domain_evidence), - } - ) - - page_similarity_evidence: list[Evidence] = [] - remaining_matches = self._search_result_similarity_remaining_budget( - submission_id, - image_store, - ) - if blog_query_count < self.provider_runtime.auto_naver_blog_query_limit: - blog_signature = _naver_blog_query_signature(query) - if blog_signature in existing_signatures: - continue - existing_signatures.add(blog_signature) - blog_query_count += 1 - - page_evidence = self.provider_runtime.naver_adapter.search_pages( - submission_id, - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence): - external_call_count += 1 - page_evidence = [ - replace( - item, - data={ - **item.data, - "query_strategy": planned.strategy, - "query_source": planned.source, - "query_priority": planned.priority, - }, - ) - for item in page_evidence - ] - page_evidence = promoter.promote(page_evidence) - all_domain_evidence.extend(page_evidence) - for item in page_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "auto" if payload["status"] == "active" else payload["status"] - self._put("evidence", payload["id"], payload) - page_similarity_evidence = self._sync_search_result_image_similarity( - submission_id, - page_evidence, - image_store, - status="auto", - max_matches=remaining_matches if can_compare_search_images else 0, - ) - all_domain_evidence.extend(page_similarity_evidence) - history_entries.append( - { - "provider": "naver_blog", - "query": query, - "status": _query_history_status(page_evidence), - "strategy": planned.strategy, - "source": planned.source, - "timestamp": _now_label(), - "count": len(page_evidence), - } - ) - - if web_query_count < self.provider_runtime.auto_naver_web_query_limit: - web_signature = _naver_web_query_signature(query) - if web_signature in existing_signatures: - continue - existing_signatures.add(web_signature) - web_query_count += 1 - - web_evidence = self.provider_runtime.naver_adapter.search_web_pages( - submission_id, - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): - external_call_count += 1 - web_evidence = [ - replace( - item, - data={ - **item.data, - "query_strategy": planned.strategy, - "query_source": planned.source, - "query_priority": planned.priority, - }, - ) - for item in web_evidence - ] - web_evidence = promoter.promote(web_evidence) - all_domain_evidence.extend(web_evidence) - for item in web_evidence: - payload = _evidence_payload(submission_id, item) - payload["status"] = "auto" if payload["status"] == "active" else payload["status"] - self._put("evidence", payload["id"], payload) - all_domain_evidence.extend( - self._sync_search_result_image_similarity( - submission_id, - web_evidence, - image_store, - status="auto", - max_matches=( - self._search_result_similarity_remaining_budget( - submission_id, - image_store, - ) - if can_compare_search_images - else 0 - ), - ) - ) - history_entries.append( - { - "provider": "naver_web", - "query": query, - "status": _query_history_status(web_evidence), - "strategy": planned.strategy, - "source": planned.source, - "timestamp": _now_label(), - "count": len(web_evidence), - } - ) - - if not history_entries: - return - - submission = self._get("submissions", submission_id) - submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])] - self._put("submissions", submission_id, submission) - - naver_provider = self._get("providers", "naver") - if any( - item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} - for item in all_domain_evidence - ): - naver_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Naver search failed" - else: - naver_provider["lastSuccess"] = _now_label() - naver_provider["lastFailure"] = "없음" - self._apply_provider_usage_delta("naver", external_call_count, naver_provider) - self.add_audit_event( - "system", - "Provider called", - f"naver / {submission_id}", - f"auto text query batch: {', '.join(item['query'] for item in history_entries)}", - ) - diff --git a/src/rights_filter/server/store_enrichment.py b/src/rights_filter/server/store_enrichment.py new file mode 100644 index 0000000..d9a2663 --- /dev/null +++ b/src/rights_filter/server/store_enrichment.py @@ -0,0 +1,789 @@ +"""Enrichment internals for CopyrighterStore, as a mixin. + +Provider-state sync, local face evidence/crop sync + retention, internal/Google +re-analysis, face-crop web detection, and the auto Google/Naver search drivers. +Mixed into CopyrighterStore; relies on persistence methods, self.* attributes +(provider_runtime, image dirs, fetchers, _write_lock), and self.add_audit_event / +rescoring provided by the host class. Behavior unchanged. +""" + +from __future__ import annotations + +import json +from dataclasses import replace +from datetime import datetime +from typing import Any + +from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector +from rights_filter.analysis.fingerprints import FingerprintService +from rights_filter.analysis.internal_analyzer import InternalAnalyzer +from rights_filter.analysis.preprocessing import build_external_derivative, build_face_crop_derivatives +from rights_filter.analysis.search_query_generation import SearchQueryGenerator +from rights_filter.analysis.search_result_promoter import SearchResultPromoter +from rights_filter.domain.records import Evidence, EvidenceSource +from rights_filter.server.image_store import LocalSubmissionImageStore +from rights_filter.server.store_serialization import ( + _evidence_id, + _evidence_payload, + _existing_google_custom_query_signatures, + _existing_naver_query_signatures, + _external_provider_ids, + _external_provider_state_for_submission, + _face_crop_web_evidence, + _google_custom_image_query_signature, + _google_custom_web_query_signature, + _google_weak_label_title, + _is_google_weak_label_payload, + _naver_blog_query_signature, + _naver_query_signature, + _naver_web_query_signature, + _now_label, + _query_history_status, +) + + +class StoreEnrichmentMixin: + def _sync_provider_payloads(self) -> None: + existing = {provider["id"]: provider for provider in self._all("providers")} + for provider_id, payload in self.provider_runtime.provider_payloads.items(): + merged = { + **payload, + "usage": existing.get(provider_id, {}).get("usage", payload["usage"]), + } + self._put("providers", provider_id, merged) + self._sync_submission_provider_state() + + def _sync_submission_provider_state(self, queue_id: str | None = None) -> None: + provider_payloads = {provider["id"]: provider for provider in self._all("providers")} + evidence_by_submission = self._evidence_by_submission(queue_id=queue_id) + for submission in self._all("submissions", queue_id=queue_id): + provider_state = submission.get("providerState", {}) + evidence = evidence_by_submission.get(str(submission["id"]), []) + provider_state["internal"] = "ok" + for provider_id in _external_provider_ids(provider_payloads): + provider_state[provider_id] = _external_provider_state_for_submission( + provider_payloads, + provider_id, + submission, + evidence, + ) + submission["providerState"] = provider_state + self._put("submissions", submission["id"], submission) + + def _normalize_saved_google_weak_labels_and_rescore(self, queue_id: str | None = None) -> None: + changed_submissions: set[str] = set() + with self._connect() as conn: + rows = conn.execute( + """ + select id, submission_id, payload + from evidence + """.strip() + ( + " where submission_id in (select id from submissions where queue_id = ?)" + if queue_id is not None + else "" + ), + (queue_id,) if queue_id is not None else (), + ).fetchall() + + for row in rows: + payload = json.loads(row["payload"]) + if _is_google_weak_label_payload(payload): + payload["title"] = _google_weak_label_title(str(payload.get("title", ""))) + payload["confidence"] = 0.0 + payload["contributed"] = False + payload["status"] = "weak" + self._put("evidence", row["id"], payload) + changed_submissions.add(str(row["submission_id"])) + + for submission_id in changed_submissions: + self._rescore_submission(submission_id) + + def _refresh_existing_local_face_evidence( + self, + image_store: LocalSubmissionImageStore, + *, + queue_id: str, + ) -> None: + existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)} + if not existing_submission_ids: + return + + evidence_by_submission = self._evidence_by_submission(queue_id=queue_id) + detector = HeuristicFacePersonDetector() + for record in image_store.submission_records(): + submission_id = str(record["id"]) + if submission_id not in existing_submission_ids: + continue + if any( + item.get("source") == "face" + for item in evidence_by_submission.get(submission_id, []) + ): + continue + + signal = detector.detect(image_store.image_payload(submission_id)) + if not signal.present: + continue + + evidence = Evidence( + source=EvidenceSource.FACE_PERSON, + reason="Face/person detected", + confidence=0.8, + data={ + "face_count": signal.face_count, + "person_count": signal.person_count, + }, + ) + self._put( + "evidence", + _evidence_id(submission_id, evidence), + _evidence_payload(submission_id, evidence), + ) + self._rescore_submission(submission_id) + + def _sync_face_crops( + self, + submission_id: str, + image_store: LocalSubmissionImageStore | None, + ) -> None: + if image_store is None: + return + try: + original = image_store.image_payload(submission_id) + except Exception: + return + + signal = HeuristicFacePersonDetector().detect(original) + crop_dir = self.face_crop_image_dir / submission_id + if crop_dir.exists(): + for stale in crop_dir.glob("crop-*.jpg"): + try: + stale.unlink() + except OSError: + continue + + face_crops: list[dict[str, Any]] = [] + for index, box in enumerate(signal.face_boxes[:3], start=1): + crops = build_face_crop_derivatives(original, [box]) + if not crops: + continue + try: + crop_dir.mkdir(parents=True, exist_ok=True) + crop_path = crop_dir / f"crop-{index}.jpg" + crop_path.write_bytes(crops[0].content) + except OSError: + continue + face_crops.append( + { + "index": index, + "url": f"{self.face_crop_public_prefix}/{submission_id}/crop-{index}.jpg", + "box": [int(value) for value in box], + } + ) + + submission = self._get("submissions", submission_id) + submission["faceCrops"] = face_crops + self._put("submissions", submission_id, submission) + + def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int: + # Delete biometric face-crop files older than the retention window and + # drop their references from the owning submission, with an audit trail. + retention_days = int(self.face_crop_retention_days) + if retention_days <= 0 or not self.face_crop_image_dir.exists(): + return 0 + now = now_epoch if now_epoch is not None else datetime.now().timestamp() + cutoff = now - retention_days * 86400 + purged = 0 + with self._write_lock: + for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()): + submission_id = crop_dir.name + removed: list[str] = [] + for crop_file in crop_dir.glob("crop-*.jpg"): + try: + if crop_file.stat().st_mtime >= cutoff: + continue + crop_file.unlink() + except OSError: + continue + removed.append(crop_file.name) + purged += 1 + if not removed: + continue + change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)" + try: + submission = self._get("submissions", submission_id) + except KeyError: + self.add_audit_event("system", "Face crop purged", submission_id, change) + continue + remaining = [ + crop + for crop in submission.get("faceCrops", []) + if f"crop-{crop.get('index')}.jpg" not in removed + ] + with self._transaction() as conn: + submission["faceCrops"] = remaining + self._put("submissions", submission_id, submission, conn=conn) + self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn) + return purged + + def _refresh_existing_submission_file_facts( + self, + image_store: LocalSubmissionImageStore, + *, + queue_id: str, + ) -> None: + existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)} + for record in image_store.submission_records(): + submission_id = str(record["id"]) + if submission_id not in existing_submission_ids: + continue + submission = self._get("submissions", submission_id) + submission["asset"] = record["asset"] + submission["fileFacts"] = { + **submission.get("fileFacts", {}), + "size": f"{record.get('width', 1)} x {record.get('height', 1)}", + "format": record.get("format", "FILE"), + } + self._put("submissions", submission_id, submission) + + def _rerun_internal_analysis( + self, + submission_id: str, + image_store: LocalSubmissionImageStore, + ) -> list[Evidence]: + repository = self._knowledge_repository() + analyzer = InternalAnalyzer( + repository, + FingerprintService(), + HeuristicFacePersonDetector(), + ) + domain_evidence = analyzer.analyze( + submission_id, + image_store.image_payload(submission_id), + ) + for item in domain_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "rerun" + self._put("evidence", payload["id"], payload) + self._sync_similar_reference_images(submission_id, domain_evidence) + self._increment_knowledge_contribution_counts(submission_id, domain_evidence) + self._rescore_submission(submission_id) + self._sync_face_crops(submission_id, image_store) + return domain_evidence + + def _rerun_google_image_search( + self, + submission_id: str, + image_store: LocalSubmissionImageStore, + ) -> list[Evidence]: + if self.provider_runtime.google_adapter is None: + return [] + + original_image = image_store.image_payload(submission_id) + derivative = build_external_derivative(original_image) + domain_evidence = self.provider_runtime.google_adapter.detect( + submission_id, + derivative, + self.provider_runtime.external_policy, + ) + call_count = 1 + face_crop_evidence, face_crop_calls = self._google_face_crop_web_detection( + submission_id, + original_image, + ) + call_count += face_crop_calls + domain_evidence.extend(face_crop_evidence) + for item in domain_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "rerun" if payload["status"] == "active" else payload["status"] + self._put("evidence", payload["id"], payload) + self._sync_search_result_image_similarity( + submission_id, + domain_evidence, + image_store, + status="rerun", + ) + + google_provider = self._get("providers", "google") + if any( + item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED} + for item in domain_evidence + ): + google_provider["lastFailure"] = domain_evidence[0].reason if domain_evidence else "Google image search failed" + else: + google_provider["lastSuccess"] = _now_label() + google_provider["lastFailure"] = "없음" + self._apply_provider_usage_delta("google", call_count, google_provider) + self.add_audit_event( + "rights.ops", + "Provider called", + f"google / {submission_id}", + "image web detection rerun", + ) + self._rescore_submission(submission_id) + return domain_evidence + + def _google_face_crop_web_detection( + self, + submission_id: str, + original_image: Any, + ) -> tuple[list[Evidence], int]: + if self.provider_runtime.google_adapter is None: + return [], 0 + if not self.provider_runtime.face_crop_web_detection_enabled: + return [], 0 + + submission = self._get("submissions", submission_id) + stored_crops = submission.get("faceCrops") + if stored_crops is None: + signal = HeuristicFacePersonDetector().detect(original_image) + indexed_boxes = [ + (index, box) + for index, box in enumerate(signal.face_boxes[:3], start=1) + ] + else: + # 저장된 faceCrops의 index를 그대로 사용해 증거의 crop_index가 + # 워크벤치 썸네일 번호와 항상 일치하게 한다(중간 박스 크롭 실패 시 + # 재열거하면 번호가 어긋난다). + indexed_boxes = [ + (int(item.get("index", 0)), tuple(int(value) for value in item.get("box", []))) + for item in stored_crops + if len(item.get("box", [])) == 4 + ] + if not indexed_boxes: + return [], 0 + + evidence: list[Evidence] = [] + call_count = 0 + for crop_index, box in indexed_boxes: + crops = build_face_crop_derivatives(original_image, [box]) + if not crops: + continue + crop = crops[0] + call_count += 1 + crop_evidence = self.provider_runtime.google_adapter.detect( + f"{submission_id}:face-crop-{crop_index}", + crop, + self.provider_runtime.external_policy, + ) + for item in crop_evidence: + face_web_evidence = _face_crop_web_evidence(submission_id, crop_index, item) + evidence.append(face_web_evidence) + crop_matches = self._face_crop_search_result_similarity_evidence( + submission_id, + crop_index, + crop, + face_web_evidence, + ) + if crop_matches: + evidence.extend(crop_matches) + if call_count: + # Governance: personal (biometric) images left the system to a + # third-party API. The external call itself is permitted, but the + # transmission must be auditable. + self.add_audit_event( + "system", + "Personal image sent to external provider", + submission_id, + f"{call_count} face crop(s) -> Google Vision web detection", + ) + return evidence, call_count + + def _auto_google_custom_search( + self, + submission_id: str, + source_evidence: list[Evidence], + image_store: LocalSubmissionImageStore | None = None, + ) -> None: + if self.provider_runtime.google_custom_search_adapter is None: + return + if self.provider_runtime.auto_google_custom_query_limit <= 0: + return + + query_plan = SearchQueryGenerator().plan( + source_evidence, + [], + max_queries=self.provider_runtime.auto_google_custom_query_limit, + ) + if not query_plan: + return + + existing_signatures = _existing_google_custom_query_signatures( + self._evidence_by_submission().get(submission_id, []) + ) + can_compare_search_images = self._can_compare_search_result_images( + submission_id, + image_store, + ) + all_domain_evidence: list[Evidence] = [] + history_entries: list[dict[str, Any]] = [] + external_call_count = 0 + + for planned in query_plan: + query = planned.query + image_signature = _google_custom_image_query_signature(query) + if image_signature in existing_signatures: + continue + existing_signatures.add(image_signature) + + image_evidence = self.provider_runtime.google_custom_search_adapter.search_images( + submission_id, + query, + self.provider_runtime.google_custom_search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in image_evidence): + external_call_count += 1 + image_evidence = [ + replace( + item, + data={ + **item.data, + "query_strategy": planned.strategy, + "query_source": planned.source, + "query_priority": planned.priority, + }, + ) + for item in image_evidence + ] + all_domain_evidence.extend(image_evidence) + for item in image_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "auto" if payload["status"] == "active" else payload["status"] + self._put("evidence", payload["id"], payload) + + similarity_evidence: list[Evidence] = [] + remaining_matches = self._search_result_similarity_remaining_budget( + submission_id, + image_store, + ) + if can_compare_search_images and remaining_matches > 0: + similarity_evidence = self._sync_search_result_image_similarity( + submission_id, + image_evidence, + image_store, + status="auto", + max_matches=remaining_matches, + ) + all_domain_evidence.extend(similarity_evidence) + + history_entries.append( + { + "provider": "google_search", + "query": query, + "status": _query_history_status(image_evidence), + "strategy": planned.strategy, + "source": planned.source, + "timestamp": _now_label(), + "count": len(image_evidence), + } + ) + + web_signature = _google_custom_web_query_signature(query) + if web_signature in existing_signatures: + continue + existing_signatures.add(web_signature) + + web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages( + submission_id, + query, + self.provider_runtime.google_custom_search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): + external_call_count += 1 + web_evidence = [ + replace( + item, + data={ + **item.data, + "query_strategy": planned.strategy, + "query_source": planned.source, + "query_priority": planned.priority, + }, + ) + for item in web_evidence + ] + all_domain_evidence.extend(web_evidence) + for item in web_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "auto" if payload["status"] == "active" else payload["status"] + self._put("evidence", payload["id"], payload) + all_domain_evidence.extend( + self._sync_search_result_image_similarity( + submission_id, + web_evidence, + image_store, + status="auto", + max_matches=self._search_result_similarity_remaining_budget( + submission_id, + image_store, + ), + ) + ) + history_entries.append( + { + "provider": "google_search", + "query": query, + "status": _query_history_status(web_evidence), + "strategy": planned.strategy, + "source": planned.source, + "timestamp": _now_label(), + "count": len(web_evidence), + } + ) + + if not history_entries: + return + + submission = self._get("submissions", submission_id) + submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])] + self._put("submissions", submission_id, submission) + + google_search_provider = self._get("providers", "google_search") + if any( + item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} + for item in all_domain_evidence + ): + google_search_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Google custom search failed" + else: + google_search_provider["lastSuccess"] = _now_label() + google_search_provider["lastFailure"] = "없음" + self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider) + self.add_audit_event( + "system", + "Provider called", + f"google_search / {submission_id}", + f"auto custom search query batch: {', '.join(item['query'] for item in history_entries)}", + ) + + def _auto_naver_search( + self, + submission_id: str, + source_evidence: list[Evidence], + image_store: LocalSubmissionImageStore | None = None, + ) -> None: + if self.provider_runtime.naver_adapter is None: + return + if self.provider_runtime.auto_naver_query_limit <= 0: + return + + query_plan = SearchQueryGenerator().plan( + source_evidence, + [], + max_queries=self.provider_runtime.auto_naver_query_limit, + ) + if not query_plan: + return + + existing_signatures = _existing_naver_query_signatures( + self._evidence_by_submission().get(submission_id, []) + ) + can_compare_search_images = self._can_compare_search_result_images( + submission_id, + image_store, + ) + all_domain_evidence: list[Evidence] = [] + history_entries: list[dict[str, Any]] = [] + external_call_count = 0 + blog_query_count = 0 + web_query_count = 0 + promoter = SearchResultPromoter() + + for planned in query_plan: + query = planned.query + signature = _naver_query_signature(query) + if signature in existing_signatures: + continue + existing_signatures.add(signature) + + domain_evidence = self.provider_runtime.naver_adapter.search( + submission_id, + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence): + external_call_count += 1 + domain_evidence = [ + replace( + item, + data={ + **item.data, + "query_strategy": planned.strategy, + "query_source": planned.source, + "query_priority": planned.priority, + }, + ) + for item in domain_evidence + ] + domain_evidence = promoter.promote(domain_evidence) + all_domain_evidence.extend(domain_evidence) + for item in domain_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "auto" if payload["status"] == "active" else payload["status"] + self._put("evidence", payload["id"], payload) + similarity_evidence: list[Evidence] = [] + remaining_matches = self._search_result_similarity_remaining_budget( + submission_id, + image_store, + ) + if can_compare_search_images and remaining_matches > 0: + similarity_evidence = self._sync_search_result_image_similarity( + submission_id, + domain_evidence, + image_store, + status="auto", + max_matches=remaining_matches, + ) + all_domain_evidence.extend(similarity_evidence) + + history_entries.append( + { + "provider": "naver", + "query": query, + "status": _query_history_status(domain_evidence), + "strategy": planned.strategy, + "source": planned.source, + "timestamp": _now_label(), + "count": len(domain_evidence), + } + ) + + page_similarity_evidence: list[Evidence] = [] + remaining_matches = self._search_result_similarity_remaining_budget( + submission_id, + image_store, + ) + if blog_query_count < self.provider_runtime.auto_naver_blog_query_limit: + blog_signature = _naver_blog_query_signature(query) + if blog_signature in existing_signatures: + continue + existing_signatures.add(blog_signature) + blog_query_count += 1 + + page_evidence = self.provider_runtime.naver_adapter.search_pages( + submission_id, + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence): + external_call_count += 1 + page_evidence = [ + replace( + item, + data={ + **item.data, + "query_strategy": planned.strategy, + "query_source": planned.source, + "query_priority": planned.priority, + }, + ) + for item in page_evidence + ] + page_evidence = promoter.promote(page_evidence) + all_domain_evidence.extend(page_evidence) + for item in page_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "auto" if payload["status"] == "active" else payload["status"] + self._put("evidence", payload["id"], payload) + page_similarity_evidence = self._sync_search_result_image_similarity( + submission_id, + page_evidence, + image_store, + status="auto", + max_matches=remaining_matches if can_compare_search_images else 0, + ) + all_domain_evidence.extend(page_similarity_evidence) + history_entries.append( + { + "provider": "naver_blog", + "query": query, + "status": _query_history_status(page_evidence), + "strategy": planned.strategy, + "source": planned.source, + "timestamp": _now_label(), + "count": len(page_evidence), + } + ) + + if web_query_count < self.provider_runtime.auto_naver_web_query_limit: + web_signature = _naver_web_query_signature(query) + if web_signature in existing_signatures: + continue + existing_signatures.add(web_signature) + web_query_count += 1 + + web_evidence = self.provider_runtime.naver_adapter.search_web_pages( + submission_id, + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): + external_call_count += 1 + web_evidence = [ + replace( + item, + data={ + **item.data, + "query_strategy": planned.strategy, + "query_source": planned.source, + "query_priority": planned.priority, + }, + ) + for item in web_evidence + ] + web_evidence = promoter.promote(web_evidence) + all_domain_evidence.extend(web_evidence) + for item in web_evidence: + payload = _evidence_payload(submission_id, item) + payload["status"] = "auto" if payload["status"] == "active" else payload["status"] + self._put("evidence", payload["id"], payload) + all_domain_evidence.extend( + self._sync_search_result_image_similarity( + submission_id, + web_evidence, + image_store, + status="auto", + max_matches=( + self._search_result_similarity_remaining_budget( + submission_id, + image_store, + ) + if can_compare_search_images + else 0 + ), + ) + ) + history_entries.append( + { + "provider": "naver_web", + "query": query, + "status": _query_history_status(web_evidence), + "strategy": planned.strategy, + "source": planned.source, + "timestamp": _now_label(), + "count": len(web_evidence), + } + ) + + if not history_entries: + return + + submission = self._get("submissions", submission_id) + submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])] + self._put("submissions", submission_id, submission) + + naver_provider = self._get("providers", "naver") + if any( + item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} + for item in all_domain_evidence + ): + naver_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Naver search failed" + else: + naver_provider["lastSuccess"] = _now_label() + naver_provider["lastFailure"] = "없음" + self._apply_provider_usage_delta("naver", external_call_count, naver_provider) + self.add_audit_event( + "system", + "Provider called", + f"naver / {submission_id}", + f"auto text query batch: {', '.join(item['query'] for item in history_entries)}", + ) + diff --git a/tests/rights_filter/server/test_http_app.py b/tests/rights_filter/server/test_http_app.py index c0b7a3a..fd4af85 100644 --- a/tests/rights_filter/server/test_http_app.py +++ b/tests/rights_filter/server/test_http_app.py @@ -13,6 +13,7 @@ from rights_filter.analysis.face_person_detection import FacePersonSignal from rights_filter.server.http_app import build_server from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server import sqlite_store as sqlite_store_module +from rights_filter.server import store_enrichment as store_enrichment_module from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.integrations.env_clients import build_provider_runtime @@ -75,6 +76,7 @@ class OneFaceDetector: @pytest.fixture(autouse=True) def use_explicit_test_face_detector(monkeypatch): monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) def _fixtures(tmp_path: Path): @@ -1233,6 +1235,7 @@ class OneFaceBoxDetector: def test_face_crops_are_persisted_served_and_listed_in_review(tmp_path: Path, monkeypatch): monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector()) static_dir, image_store, store = _png_fixtures(tmp_path) server = build_server(host="127.0.0.1", port=0, store=store, image_store=image_store, static_dir=static_dir) _start(server) diff --git a/tests/rights_filter/server/test_sqlite_store.py b/tests/rights_filter/server/test_sqlite_store.py index ebd1f43..e7c41fb 100644 --- a/tests/rights_filter/server/test_sqlite_store.py +++ b/tests/rights_filter/server/test_sqlite_store.py @@ -12,6 +12,7 @@ from rights_filter.analysis.preprocessing import ImagePayload, build_face_crop_d from rights_filter.domain.records import Evidence, EvidenceSource from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server import sqlite_store as sqlite_store_module +from rights_filter.server import store_enrichment as store_enrichment_module from rights_filter.server import store_remote_fetch as remote_fetch_module from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.integrations.env_clients import build_provider_runtime @@ -100,6 +101,7 @@ class OneFaceDetector: @pytest.fixture(autouse=True) def use_explicit_test_face_detector(monkeypatch): monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) def _image_store(tmp_path: Path) -> LocalSubmissionImageStore: @@ -1012,10 +1014,12 @@ def test_sqlite_store_refreshes_existing_local_face_evidence_when_reloaded(tmp_p store.initialize() monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector()) store.seed_from_image_store(image_store) assert not any(item["source"] == "face" for item in store.review("SUB-DB1")["evidence"]) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) imported = store.seed_from_image_store(image_store) review = store.review("SUB-DB1") @@ -7415,6 +7419,7 @@ def test_sqlite_store_rerun_enrichment_runs_google_web_detection_on_face_crop_wh store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime) store.initialize() monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) store.seed_from_image_store(image_store) calls_after_seed = len(transport.calls) @@ -7493,6 +7498,7 @@ def test_sqlite_store_compares_face_crop_search_result_images_against_crop_finge ) store.initialize() monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) store.seed_from_image_store(image_store) review = store.rerun_enrichment("SUB-FACE1", image_store) @@ -7585,6 +7591,7 @@ def test_sqlite_store_compares_face_crop_search_result_page_images_against_crop_ ) store.initialize() monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) store.seed_from_image_store(image_store) review = store.rerun_enrichment("SUB-FACE1", image_store) @@ -7670,6 +7677,7 @@ def test_sqlite_store_rerun_enrichment_uses_face_crop_page_title_for_naver_searc ) store.initialize() monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) + monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) store.seed_from_image_store(image_store) calls_after_seed = len(transport.calls)