refactor: extract enrichment internals into StoreEnrichmentMixin

Move provider-state sync, local face evidence/crop sync + retention purge,
internal/Google re-analysis, face-crop web detection, and the auto Google/Naver
search drivers into a mixin; CopyrighterStore inherits it. Drop now-unused
imports. Tests now also patch HeuristicFacePersonDetector on store_enrichment
(face detection moved there). sqlite_store.py 2358 -> 1598 lines (-70% overall).
This commit is contained in:
유창욱 2026-06-20 22:18:38 +09:00
parent 8e0a8c307d
commit 3bc07d94c3
4 changed files with 802 additions and 760 deletions

View file

@ -15,9 +15,7 @@ from urllib.parse import urlparse
from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector
from rights_filter.analysis.fingerprints import FingerprintService from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.internal_analyzer import InternalAnalyzer from rights_filter.analysis.internal_analyzer import InternalAnalyzer
from rights_filter.analysis.preprocessing import build_external_derivative, build_face_crop_derivatives
from rights_filter.analysis.risk_scoring import RiskScorer from rights_filter.analysis.risk_scoring import RiskScorer
from rights_filter.analysis.search_query_generation import SearchQueryGenerator
from rights_filter.analysis.search_result_promoter import SearchResultPromoter from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import ( from rights_filter.domain.records import (
Evidence, Evidence,
@ -50,6 +48,7 @@ from rights_filter.server.store_remote_fetch import (
_fetch_stylesheet_url_bytes, _fetch_stylesheet_url_bytes,
_fetch_url_bytes, _fetch_url_bytes,
) )
from rights_filter.server.store_enrichment import StoreEnrichmentMixin
from rights_filter.server.store_persistence import StorePersistenceMixin from rights_filter.server.store_persistence import StorePersistenceMixin
from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin
from rights_filter.server.store_schema import ( from rights_filter.server.store_schema import (
@ -64,23 +63,10 @@ from rights_filter.server.store_serialization import (
_evidence_id, _evidence_id,
_evidence_matches_provider, _evidence_matches_provider,
_evidence_payload, _evidence_payload,
_existing_google_custom_query_signatures,
_existing_naver_query_signatures,
_external_provider_ids,
_external_provider_state_for_submission,
_face_crop_web_evidence,
_google_custom_image_query_signature,
_google_custom_web_query_signature,
_google_weak_label_title,
_is_google_weak_label_payload,
_knowledge_type_value, _knowledge_type_value,
_naver_blog_query_signature,
_naver_query_signature,
_naver_web_query_signature,
_now_label, _now_label,
_provider_item_failed, _provider_item_failed,
_provider_item_has_result, _provider_item_has_result,
_query_history_status,
_stable_id, _stable_id,
_submission_payload, _submission_payload,
_submission_search_hint_evidence, _submission_search_hint_evidence,
@ -90,7 +76,7 @@ from rights_filter.server.store_serialization import (
from rights_filter.server.store_text import _text_list, _unique_texts from rights_filter.server.store_text import _text_list, _unique_texts
class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin): class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin, StoreEnrichmentMixin):
def __init__( def __init__(
self, self,
db_path: Path | str, db_path: Path | str,
@ -1610,747 +1596,3 @@ class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin):
), ),
) )
def _sync_provider_payloads(self) -> None:
existing = {provider["id"]: provider for provider in self._all("providers")}
for provider_id, payload in self.provider_runtime.provider_payloads.items():
merged = {
**payload,
"usage": existing.get(provider_id, {}).get("usage", payload["usage"]),
}
self._put("providers", provider_id, merged)
self._sync_submission_provider_state()
def _sync_submission_provider_state(self, queue_id: str | None = None) -> None:
provider_payloads = {provider["id"]: provider for provider in self._all("providers")}
evidence_by_submission = self._evidence_by_submission(queue_id=queue_id)
for submission in self._all("submissions", queue_id=queue_id):
provider_state = submission.get("providerState", {})
evidence = evidence_by_submission.get(str(submission["id"]), [])
provider_state["internal"] = "ok"
for provider_id in _external_provider_ids(provider_payloads):
provider_state[provider_id] = _external_provider_state_for_submission(
provider_payloads,
provider_id,
submission,
evidence,
)
submission["providerState"] = provider_state
self._put("submissions", submission["id"], submission)
def _normalize_saved_google_weak_labels_and_rescore(self, queue_id: str | None = None) -> None:
changed_submissions: set[str] = set()
with self._connect() as conn:
rows = conn.execute(
"""
select id, submission_id, payload
from evidence
""".strip() + (
" where submission_id in (select id from submissions where queue_id = ?)"
if queue_id is not None
else ""
),
(queue_id,) if queue_id is not None else (),
).fetchall()
for row in rows:
payload = json.loads(row["payload"])
if _is_google_weak_label_payload(payload):
payload["title"] = _google_weak_label_title(str(payload.get("title", "")))
payload["confidence"] = 0.0
payload["contributed"] = False
payload["status"] = "weak"
self._put("evidence", row["id"], payload)
changed_submissions.add(str(row["submission_id"]))
for submission_id in changed_submissions:
self._rescore_submission(submission_id)
def _refresh_existing_local_face_evidence(
self,
image_store: LocalSubmissionImageStore,
*,
queue_id: str,
) -> None:
existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)}
if not existing_submission_ids:
return
evidence_by_submission = self._evidence_by_submission(queue_id=queue_id)
detector = HeuristicFacePersonDetector()
for record in image_store.submission_records():
submission_id = str(record["id"])
if submission_id not in existing_submission_ids:
continue
if any(
item.get("source") == "face"
for item in evidence_by_submission.get(submission_id, [])
):
continue
signal = detector.detect(image_store.image_payload(submission_id))
if not signal.present:
continue
evidence = Evidence(
source=EvidenceSource.FACE_PERSON,
reason="Face/person detected",
confidence=0.8,
data={
"face_count": signal.face_count,
"person_count": signal.person_count,
},
)
self._put(
"evidence",
_evidence_id(submission_id, evidence),
_evidence_payload(submission_id, evidence),
)
self._rescore_submission(submission_id)
def _sync_face_crops(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> None:
if image_store is None:
return
try:
original = image_store.image_payload(submission_id)
except Exception:
return
signal = HeuristicFacePersonDetector().detect(original)
crop_dir = self.face_crop_image_dir / submission_id
if crop_dir.exists():
for stale in crop_dir.glob("crop-*.jpg"):
try:
stale.unlink()
except OSError:
continue
face_crops: list[dict[str, Any]] = []
for index, box in enumerate(signal.face_boxes[:3], start=1):
crops = build_face_crop_derivatives(original, [box])
if not crops:
continue
try:
crop_dir.mkdir(parents=True, exist_ok=True)
crop_path = crop_dir / f"crop-{index}.jpg"
crop_path.write_bytes(crops[0].content)
except OSError:
continue
face_crops.append(
{
"index": index,
"url": f"{self.face_crop_public_prefix}/{submission_id}/crop-{index}.jpg",
"box": [int(value) for value in box],
}
)
submission = self._get("submissions", submission_id)
submission["faceCrops"] = face_crops
self._put("submissions", submission_id, submission)
def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int:
# Delete biometric face-crop files older than the retention window and
# drop their references from the owning submission, with an audit trail.
retention_days = int(self.face_crop_retention_days)
if retention_days <= 0 or not self.face_crop_image_dir.exists():
return 0
now = now_epoch if now_epoch is not None else datetime.now().timestamp()
cutoff = now - retention_days * 86400
purged = 0
with self._write_lock:
for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()):
submission_id = crop_dir.name
removed: list[str] = []
for crop_file in crop_dir.glob("crop-*.jpg"):
try:
if crop_file.stat().st_mtime >= cutoff:
continue
crop_file.unlink()
except OSError:
continue
removed.append(crop_file.name)
purged += 1
if not removed:
continue
change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)"
try:
submission = self._get("submissions", submission_id)
except KeyError:
self.add_audit_event("system", "Face crop purged", submission_id, change)
continue
remaining = [
crop
for crop in submission.get("faceCrops", [])
if f"crop-{crop.get('index')}.jpg" not in removed
]
with self._transaction() as conn:
submission["faceCrops"] = remaining
self._put("submissions", submission_id, submission, conn=conn)
self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn)
return purged
def _refresh_existing_submission_file_facts(
self,
image_store: LocalSubmissionImageStore,
*,
queue_id: str,
) -> None:
existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)}
for record in image_store.submission_records():
submission_id = str(record["id"])
if submission_id not in existing_submission_ids:
continue
submission = self._get("submissions", submission_id)
submission["asset"] = record["asset"]
submission["fileFacts"] = {
**submission.get("fileFacts", {}),
"size": f"{record.get('width', 1)} x {record.get('height', 1)}",
"format": record.get("format", "FILE"),
}
self._put("submissions", submission_id, submission)
def _rerun_internal_analysis(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> list[Evidence]:
repository = self._knowledge_repository()
analyzer = InternalAnalyzer(
repository,
FingerprintService(),
HeuristicFacePersonDetector(),
)
domain_evidence = analyzer.analyze(
submission_id,
image_store.image_payload(submission_id),
)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "rerun"
self._put("evidence", payload["id"], payload)
self._sync_similar_reference_images(submission_id, domain_evidence)
self._increment_knowledge_contribution_counts(submission_id, domain_evidence)
self._rescore_submission(submission_id)
self._sync_face_crops(submission_id, image_store)
return domain_evidence
def _rerun_google_image_search(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> list[Evidence]:
if self.provider_runtime.google_adapter is None:
return []
original_image = image_store.image_payload(submission_id)
derivative = build_external_derivative(original_image)
domain_evidence = self.provider_runtime.google_adapter.detect(
submission_id,
derivative,
self.provider_runtime.external_policy,
)
call_count = 1
face_crop_evidence, face_crop_calls = self._google_face_crop_web_detection(
submission_id,
original_image,
)
call_count += face_crop_calls
domain_evidence.extend(face_crop_evidence)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "rerun" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="rerun",
)
google_provider = self._get("providers", "google")
if any(
item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED}
for item in domain_evidence
):
google_provider["lastFailure"] = domain_evidence[0].reason if domain_evidence else "Google image search failed"
else:
google_provider["lastSuccess"] = _now_label()
google_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("google", call_count, google_provider)
self.add_audit_event(
"rights.ops",
"Provider called",
f"google / {submission_id}",
"image web detection rerun",
)
self._rescore_submission(submission_id)
return domain_evidence
def _google_face_crop_web_detection(
self,
submission_id: str,
original_image: Any,
) -> tuple[list[Evidence], int]:
if self.provider_runtime.google_adapter is None:
return [], 0
if not self.provider_runtime.face_crop_web_detection_enabled:
return [], 0
submission = self._get("submissions", submission_id)
stored_crops = submission.get("faceCrops")
if stored_crops is None:
signal = HeuristicFacePersonDetector().detect(original_image)
indexed_boxes = [
(index, box)
for index, box in enumerate(signal.face_boxes[:3], start=1)
]
else:
# 저장된 faceCrops의 index를 그대로 사용해 증거의 crop_index가
# 워크벤치 썸네일 번호와 항상 일치하게 한다(중간 박스 크롭 실패 시
# 재열거하면 번호가 어긋난다).
indexed_boxes = [
(int(item.get("index", 0)), tuple(int(value) for value in item.get("box", [])))
for item in stored_crops
if len(item.get("box", [])) == 4
]
if not indexed_boxes:
return [], 0
evidence: list[Evidence] = []
call_count = 0
for crop_index, box in indexed_boxes:
crops = build_face_crop_derivatives(original_image, [box])
if not crops:
continue
crop = crops[0]
call_count += 1
crop_evidence = self.provider_runtime.google_adapter.detect(
f"{submission_id}:face-crop-{crop_index}",
crop,
self.provider_runtime.external_policy,
)
for item in crop_evidence:
face_web_evidence = _face_crop_web_evidence(submission_id, crop_index, item)
evidence.append(face_web_evidence)
crop_matches = self._face_crop_search_result_similarity_evidence(
submission_id,
crop_index,
crop,
face_web_evidence,
)
if crop_matches:
evidence.extend(crop_matches)
if call_count:
# Governance: personal (biometric) images left the system to a
# third-party API. The external call itself is permitted, but the
# transmission must be auditable.
self.add_audit_event(
"system",
"Personal image sent to external provider",
submission_id,
f"{call_count} face crop(s) -> Google Vision web detection",
)
return evidence, call_count
def _auto_google_custom_search(
self,
submission_id: str,
source_evidence: list[Evidence],
image_store: LocalSubmissionImageStore | None = None,
) -> None:
if self.provider_runtime.google_custom_search_adapter is None:
return
if self.provider_runtime.auto_google_custom_query_limit <= 0:
return
query_plan = SearchQueryGenerator().plan(
source_evidence,
[],
max_queries=self.provider_runtime.auto_google_custom_query_limit,
)
if not query_plan:
return
existing_signatures = _existing_google_custom_query_signatures(
self._evidence_by_submission().get(submission_id, [])
)
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
all_domain_evidence: list[Evidence] = []
history_entries: list[dict[str, Any]] = []
external_call_count = 0
for planned in query_plan:
query = planned.query
image_signature = _google_custom_image_query_signature(query)
if image_signature in existing_signatures:
continue
existing_signatures.add(image_signature)
image_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in image_evidence):
external_call_count += 1
image_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in image_evidence
]
all_domain_evidence.extend(image_evidence)
for item in image_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images and remaining_matches > 0:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
image_evidence,
image_store,
status="auto",
max_matches=remaining_matches,
)
all_domain_evidence.extend(similarity_evidence)
history_entries.append(
{
"provider": "google_search",
"query": query,
"status": _query_history_status(image_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(image_evidence),
}
)
web_signature = _google_custom_web_query_signature(query)
if web_signature in existing_signatures:
continue
existing_signatures.add(web_signature)
web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
external_call_count += 1
web_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in web_evidence
]
all_domain_evidence.extend(web_evidence)
for item in web_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
all_domain_evidence.extend(
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="auto",
max_matches=self._search_result_similarity_remaining_budget(
submission_id,
image_store,
),
)
)
history_entries.append(
{
"provider": "google_search",
"query": query,
"status": _query_history_status(web_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(web_evidence),
}
)
if not history_entries:
return
submission = self._get("submissions", submission_id)
submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])]
self._put("submissions", submission_id, submission)
google_search_provider = self._get("providers", "google_search")
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence
):
google_search_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Google custom search failed"
else:
google_search_provider["lastSuccess"] = _now_label()
google_search_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider)
self.add_audit_event(
"system",
"Provider called",
f"google_search / {submission_id}",
f"auto custom search query batch: {', '.join(item['query'] for item in history_entries)}",
)
def _auto_naver_search(
self,
submission_id: str,
source_evidence: list[Evidence],
image_store: LocalSubmissionImageStore | None = None,
) -> None:
if self.provider_runtime.naver_adapter is None:
return
if self.provider_runtime.auto_naver_query_limit <= 0:
return
query_plan = SearchQueryGenerator().plan(
source_evidence,
[],
max_queries=self.provider_runtime.auto_naver_query_limit,
)
if not query_plan:
return
existing_signatures = _existing_naver_query_signatures(
self._evidence_by_submission().get(submission_id, [])
)
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
all_domain_evidence: list[Evidence] = []
history_entries: list[dict[str, Any]] = []
external_call_count = 0
blog_query_count = 0
web_query_count = 0
promoter = SearchResultPromoter()
for planned in query_plan:
query = planned.query
signature = _naver_query_signature(query)
if signature in existing_signatures:
continue
existing_signatures.add(signature)
domain_evidence = self.provider_runtime.naver_adapter.search(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence):
external_call_count += 1
domain_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in domain_evidence
]
domain_evidence = promoter.promote(domain_evidence)
all_domain_evidence.extend(domain_evidence)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images and remaining_matches > 0:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="auto",
max_matches=remaining_matches,
)
all_domain_evidence.extend(similarity_evidence)
history_entries.append(
{
"provider": "naver",
"query": query,
"status": _query_history_status(domain_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(domain_evidence),
}
)
page_similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if blog_query_count < self.provider_runtime.auto_naver_blog_query_limit:
blog_signature = _naver_blog_query_signature(query)
if blog_signature in existing_signatures:
continue
existing_signatures.add(blog_signature)
blog_query_count += 1
page_evidence = self.provider_runtime.naver_adapter.search_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
external_call_count += 1
page_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in page_evidence
]
page_evidence = promoter.promote(page_evidence)
all_domain_evidence.extend(page_evidence)
for item in page_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
page_similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
page_evidence,
image_store,
status="auto",
max_matches=remaining_matches if can_compare_search_images else 0,
)
all_domain_evidence.extend(page_similarity_evidence)
history_entries.append(
{
"provider": "naver_blog",
"query": query,
"status": _query_history_status(page_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(page_evidence),
}
)
if web_query_count < self.provider_runtime.auto_naver_web_query_limit:
web_signature = _naver_web_query_signature(query)
if web_signature in existing_signatures:
continue
existing_signatures.add(web_signature)
web_query_count += 1
web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
external_call_count += 1
web_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in web_evidence
]
web_evidence = promoter.promote(web_evidence)
all_domain_evidence.extend(web_evidence)
for item in web_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
all_domain_evidence.extend(
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="auto",
max_matches=(
self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images
else 0
),
)
)
history_entries.append(
{
"provider": "naver_web",
"query": query,
"status": _query_history_status(web_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(web_evidence),
}
)
if not history_entries:
return
submission = self._get("submissions", submission_id)
submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])]
self._put("submissions", submission_id, submission)
naver_provider = self._get("providers", "naver")
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence
):
naver_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Naver search failed"
else:
naver_provider["lastSuccess"] = _now_label()
naver_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("naver", external_call_count, naver_provider)
self.add_audit_event(
"system",
"Provider called",
f"naver / {submission_id}",
f"auto text query batch: {', '.join(item['query'] for item in history_entries)}",
)

View file

@ -0,0 +1,789 @@
"""Enrichment internals for CopyrighterStore, as a mixin.
Provider-state sync, local face evidence/crop sync + retention, internal/Google
re-analysis, face-crop web detection, and the auto Google/Naver search drivers.
Mixed into CopyrighterStore; relies on persistence methods, self.* attributes
(provider_runtime, image dirs, fetchers, _write_lock), and self.add_audit_event /
rescoring provided by the host class. Behavior unchanged.
"""
from __future__ import annotations
import json
from dataclasses import replace
from datetime import datetime
from typing import Any
from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.internal_analyzer import InternalAnalyzer
from rights_filter.analysis.preprocessing import build_external_derivative, build_face_crop_derivatives
from rights_filter.analysis.search_query_generation import SearchQueryGenerator
from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_serialization import (
_evidence_id,
_evidence_payload,
_existing_google_custom_query_signatures,
_existing_naver_query_signatures,
_external_provider_ids,
_external_provider_state_for_submission,
_face_crop_web_evidence,
_google_custom_image_query_signature,
_google_custom_web_query_signature,
_google_weak_label_title,
_is_google_weak_label_payload,
_naver_blog_query_signature,
_naver_query_signature,
_naver_web_query_signature,
_now_label,
_query_history_status,
)
class StoreEnrichmentMixin:
def _sync_provider_payloads(self) -> None:
existing = {provider["id"]: provider for provider in self._all("providers")}
for provider_id, payload in self.provider_runtime.provider_payloads.items():
merged = {
**payload,
"usage": existing.get(provider_id, {}).get("usage", payload["usage"]),
}
self._put("providers", provider_id, merged)
self._sync_submission_provider_state()
def _sync_submission_provider_state(self, queue_id: str | None = None) -> None:
provider_payloads = {provider["id"]: provider for provider in self._all("providers")}
evidence_by_submission = self._evidence_by_submission(queue_id=queue_id)
for submission in self._all("submissions", queue_id=queue_id):
provider_state = submission.get("providerState", {})
evidence = evidence_by_submission.get(str(submission["id"]), [])
provider_state["internal"] = "ok"
for provider_id in _external_provider_ids(provider_payloads):
provider_state[provider_id] = _external_provider_state_for_submission(
provider_payloads,
provider_id,
submission,
evidence,
)
submission["providerState"] = provider_state
self._put("submissions", submission["id"], submission)
def _normalize_saved_google_weak_labels_and_rescore(self, queue_id: str | None = None) -> None:
changed_submissions: set[str] = set()
with self._connect() as conn:
rows = conn.execute(
"""
select id, submission_id, payload
from evidence
""".strip() + (
" where submission_id in (select id from submissions where queue_id = ?)"
if queue_id is not None
else ""
),
(queue_id,) if queue_id is not None else (),
).fetchall()
for row in rows:
payload = json.loads(row["payload"])
if _is_google_weak_label_payload(payload):
payload["title"] = _google_weak_label_title(str(payload.get("title", "")))
payload["confidence"] = 0.0
payload["contributed"] = False
payload["status"] = "weak"
self._put("evidence", row["id"], payload)
changed_submissions.add(str(row["submission_id"]))
for submission_id in changed_submissions:
self._rescore_submission(submission_id)
def _refresh_existing_local_face_evidence(
self,
image_store: LocalSubmissionImageStore,
*,
queue_id: str,
) -> None:
existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)}
if not existing_submission_ids:
return
evidence_by_submission = self._evidence_by_submission(queue_id=queue_id)
detector = HeuristicFacePersonDetector()
for record in image_store.submission_records():
submission_id = str(record["id"])
if submission_id not in existing_submission_ids:
continue
if any(
item.get("source") == "face"
for item in evidence_by_submission.get(submission_id, [])
):
continue
signal = detector.detect(image_store.image_payload(submission_id))
if not signal.present:
continue
evidence = Evidence(
source=EvidenceSource.FACE_PERSON,
reason="Face/person detected",
confidence=0.8,
data={
"face_count": signal.face_count,
"person_count": signal.person_count,
},
)
self._put(
"evidence",
_evidence_id(submission_id, evidence),
_evidence_payload(submission_id, evidence),
)
self._rescore_submission(submission_id)
def _sync_face_crops(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> None:
if image_store is None:
return
try:
original = image_store.image_payload(submission_id)
except Exception:
return
signal = HeuristicFacePersonDetector().detect(original)
crop_dir = self.face_crop_image_dir / submission_id
if crop_dir.exists():
for stale in crop_dir.glob("crop-*.jpg"):
try:
stale.unlink()
except OSError:
continue
face_crops: list[dict[str, Any]] = []
for index, box in enumerate(signal.face_boxes[:3], start=1):
crops = build_face_crop_derivatives(original, [box])
if not crops:
continue
try:
crop_dir.mkdir(parents=True, exist_ok=True)
crop_path = crop_dir / f"crop-{index}.jpg"
crop_path.write_bytes(crops[0].content)
except OSError:
continue
face_crops.append(
{
"index": index,
"url": f"{self.face_crop_public_prefix}/{submission_id}/crop-{index}.jpg",
"box": [int(value) for value in box],
}
)
submission = self._get("submissions", submission_id)
submission["faceCrops"] = face_crops
self._put("submissions", submission_id, submission)
def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int:
# Delete biometric face-crop files older than the retention window and
# drop their references from the owning submission, with an audit trail.
retention_days = int(self.face_crop_retention_days)
if retention_days <= 0 or not self.face_crop_image_dir.exists():
return 0
now = now_epoch if now_epoch is not None else datetime.now().timestamp()
cutoff = now - retention_days * 86400
purged = 0
with self._write_lock:
for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()):
submission_id = crop_dir.name
removed: list[str] = []
for crop_file in crop_dir.glob("crop-*.jpg"):
try:
if crop_file.stat().st_mtime >= cutoff:
continue
crop_file.unlink()
except OSError:
continue
removed.append(crop_file.name)
purged += 1
if not removed:
continue
change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)"
try:
submission = self._get("submissions", submission_id)
except KeyError:
self.add_audit_event("system", "Face crop purged", submission_id, change)
continue
remaining = [
crop
for crop in submission.get("faceCrops", [])
if f"crop-{crop.get('index')}.jpg" not in removed
]
with self._transaction() as conn:
submission["faceCrops"] = remaining
self._put("submissions", submission_id, submission, conn=conn)
self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn)
return purged
def _refresh_existing_submission_file_facts(
self,
image_store: LocalSubmissionImageStore,
*,
queue_id: str,
) -> None:
existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)}
for record in image_store.submission_records():
submission_id = str(record["id"])
if submission_id not in existing_submission_ids:
continue
submission = self._get("submissions", submission_id)
submission["asset"] = record["asset"]
submission["fileFacts"] = {
**submission.get("fileFacts", {}),
"size": f"{record.get('width', 1)} x {record.get('height', 1)}",
"format": record.get("format", "FILE"),
}
self._put("submissions", submission_id, submission)
def _rerun_internal_analysis(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> list[Evidence]:
repository = self._knowledge_repository()
analyzer = InternalAnalyzer(
repository,
FingerprintService(),
HeuristicFacePersonDetector(),
)
domain_evidence = analyzer.analyze(
submission_id,
image_store.image_payload(submission_id),
)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "rerun"
self._put("evidence", payload["id"], payload)
self._sync_similar_reference_images(submission_id, domain_evidence)
self._increment_knowledge_contribution_counts(submission_id, domain_evidence)
self._rescore_submission(submission_id)
self._sync_face_crops(submission_id, image_store)
return domain_evidence
def _rerun_google_image_search(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> list[Evidence]:
if self.provider_runtime.google_adapter is None:
return []
original_image = image_store.image_payload(submission_id)
derivative = build_external_derivative(original_image)
domain_evidence = self.provider_runtime.google_adapter.detect(
submission_id,
derivative,
self.provider_runtime.external_policy,
)
call_count = 1
face_crop_evidence, face_crop_calls = self._google_face_crop_web_detection(
submission_id,
original_image,
)
call_count += face_crop_calls
domain_evidence.extend(face_crop_evidence)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "rerun" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="rerun",
)
google_provider = self._get("providers", "google")
if any(
item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED}
for item in domain_evidence
):
google_provider["lastFailure"] = domain_evidence[0].reason if domain_evidence else "Google image search failed"
else:
google_provider["lastSuccess"] = _now_label()
google_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("google", call_count, google_provider)
self.add_audit_event(
"rights.ops",
"Provider called",
f"google / {submission_id}",
"image web detection rerun",
)
self._rescore_submission(submission_id)
return domain_evidence
def _google_face_crop_web_detection(
self,
submission_id: str,
original_image: Any,
) -> tuple[list[Evidence], int]:
if self.provider_runtime.google_adapter is None:
return [], 0
if not self.provider_runtime.face_crop_web_detection_enabled:
return [], 0
submission = self._get("submissions", submission_id)
stored_crops = submission.get("faceCrops")
if stored_crops is None:
signal = HeuristicFacePersonDetector().detect(original_image)
indexed_boxes = [
(index, box)
for index, box in enumerate(signal.face_boxes[:3], start=1)
]
else:
# 저장된 faceCrops의 index를 그대로 사용해 증거의 crop_index가
# 워크벤치 썸네일 번호와 항상 일치하게 한다(중간 박스 크롭 실패 시
# 재열거하면 번호가 어긋난다).
indexed_boxes = [
(int(item.get("index", 0)), tuple(int(value) for value in item.get("box", [])))
for item in stored_crops
if len(item.get("box", [])) == 4
]
if not indexed_boxes:
return [], 0
evidence: list[Evidence] = []
call_count = 0
for crop_index, box in indexed_boxes:
crops = build_face_crop_derivatives(original_image, [box])
if not crops:
continue
crop = crops[0]
call_count += 1
crop_evidence = self.provider_runtime.google_adapter.detect(
f"{submission_id}:face-crop-{crop_index}",
crop,
self.provider_runtime.external_policy,
)
for item in crop_evidence:
face_web_evidence = _face_crop_web_evidence(submission_id, crop_index, item)
evidence.append(face_web_evidence)
crop_matches = self._face_crop_search_result_similarity_evidence(
submission_id,
crop_index,
crop,
face_web_evidence,
)
if crop_matches:
evidence.extend(crop_matches)
if call_count:
# Governance: personal (biometric) images left the system to a
# third-party API. The external call itself is permitted, but the
# transmission must be auditable.
self.add_audit_event(
"system",
"Personal image sent to external provider",
submission_id,
f"{call_count} face crop(s) -> Google Vision web detection",
)
return evidence, call_count
def _auto_google_custom_search(
self,
submission_id: str,
source_evidence: list[Evidence],
image_store: LocalSubmissionImageStore | None = None,
) -> None:
if self.provider_runtime.google_custom_search_adapter is None:
return
if self.provider_runtime.auto_google_custom_query_limit <= 0:
return
query_plan = SearchQueryGenerator().plan(
source_evidence,
[],
max_queries=self.provider_runtime.auto_google_custom_query_limit,
)
if not query_plan:
return
existing_signatures = _existing_google_custom_query_signatures(
self._evidence_by_submission().get(submission_id, [])
)
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
all_domain_evidence: list[Evidence] = []
history_entries: list[dict[str, Any]] = []
external_call_count = 0
for planned in query_plan:
query = planned.query
image_signature = _google_custom_image_query_signature(query)
if image_signature in existing_signatures:
continue
existing_signatures.add(image_signature)
image_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in image_evidence):
external_call_count += 1
image_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in image_evidence
]
all_domain_evidence.extend(image_evidence)
for item in image_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images and remaining_matches > 0:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
image_evidence,
image_store,
status="auto",
max_matches=remaining_matches,
)
all_domain_evidence.extend(similarity_evidence)
history_entries.append(
{
"provider": "google_search",
"query": query,
"status": _query_history_status(image_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(image_evidence),
}
)
web_signature = _google_custom_web_query_signature(query)
if web_signature in existing_signatures:
continue
existing_signatures.add(web_signature)
web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
external_call_count += 1
web_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in web_evidence
]
all_domain_evidence.extend(web_evidence)
for item in web_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
all_domain_evidence.extend(
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="auto",
max_matches=self._search_result_similarity_remaining_budget(
submission_id,
image_store,
),
)
)
history_entries.append(
{
"provider": "google_search",
"query": query,
"status": _query_history_status(web_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(web_evidence),
}
)
if not history_entries:
return
submission = self._get("submissions", submission_id)
submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])]
self._put("submissions", submission_id, submission)
google_search_provider = self._get("providers", "google_search")
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence
):
google_search_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Google custom search failed"
else:
google_search_provider["lastSuccess"] = _now_label()
google_search_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider)
self.add_audit_event(
"system",
"Provider called",
f"google_search / {submission_id}",
f"auto custom search query batch: {', '.join(item['query'] for item in history_entries)}",
)
def _auto_naver_search(
self,
submission_id: str,
source_evidence: list[Evidence],
image_store: LocalSubmissionImageStore | None = None,
) -> None:
if self.provider_runtime.naver_adapter is None:
return
if self.provider_runtime.auto_naver_query_limit <= 0:
return
query_plan = SearchQueryGenerator().plan(
source_evidence,
[],
max_queries=self.provider_runtime.auto_naver_query_limit,
)
if not query_plan:
return
existing_signatures = _existing_naver_query_signatures(
self._evidence_by_submission().get(submission_id, [])
)
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
all_domain_evidence: list[Evidence] = []
history_entries: list[dict[str, Any]] = []
external_call_count = 0
blog_query_count = 0
web_query_count = 0
promoter = SearchResultPromoter()
for planned in query_plan:
query = planned.query
signature = _naver_query_signature(query)
if signature in existing_signatures:
continue
existing_signatures.add(signature)
domain_evidence = self.provider_runtime.naver_adapter.search(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence):
external_call_count += 1
domain_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in domain_evidence
]
domain_evidence = promoter.promote(domain_evidence)
all_domain_evidence.extend(domain_evidence)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images and remaining_matches > 0:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="auto",
max_matches=remaining_matches,
)
all_domain_evidence.extend(similarity_evidence)
history_entries.append(
{
"provider": "naver",
"query": query,
"status": _query_history_status(domain_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(domain_evidence),
}
)
page_similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if blog_query_count < self.provider_runtime.auto_naver_blog_query_limit:
blog_signature = _naver_blog_query_signature(query)
if blog_signature in existing_signatures:
continue
existing_signatures.add(blog_signature)
blog_query_count += 1
page_evidence = self.provider_runtime.naver_adapter.search_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
external_call_count += 1
page_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in page_evidence
]
page_evidence = promoter.promote(page_evidence)
all_domain_evidence.extend(page_evidence)
for item in page_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
page_similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
page_evidence,
image_store,
status="auto",
max_matches=remaining_matches if can_compare_search_images else 0,
)
all_domain_evidence.extend(page_similarity_evidence)
history_entries.append(
{
"provider": "naver_blog",
"query": query,
"status": _query_history_status(page_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(page_evidence),
}
)
if web_query_count < self.provider_runtime.auto_naver_web_query_limit:
web_signature = _naver_web_query_signature(query)
if web_signature in existing_signatures:
continue
existing_signatures.add(web_signature)
web_query_count += 1
web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
external_call_count += 1
web_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in web_evidence
]
web_evidence = promoter.promote(web_evidence)
all_domain_evidence.extend(web_evidence)
for item in web_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
all_domain_evidence.extend(
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="auto",
max_matches=(
self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images
else 0
),
)
)
history_entries.append(
{
"provider": "naver_web",
"query": query,
"status": _query_history_status(web_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(web_evidence),
}
)
if not history_entries:
return
submission = self._get("submissions", submission_id)
submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])]
self._put("submissions", submission_id, submission)
naver_provider = self._get("providers", "naver")
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence
):
naver_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Naver search failed"
else:
naver_provider["lastSuccess"] = _now_label()
naver_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("naver", external_call_count, naver_provider)
self.add_audit_event(
"system",
"Provider called",
f"naver / {submission_id}",
f"auto text query batch: {', '.join(item['query'] for item in history_entries)}",
)

View file

@ -13,6 +13,7 @@ from rights_filter.analysis.face_person_detection import FacePersonSignal
from rights_filter.server.http_app import build_server from rights_filter.server.http_app import build_server
from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server import sqlite_store as sqlite_store_module from rights_filter.server import sqlite_store as sqlite_store_module
from rights_filter.server import store_enrichment as store_enrichment_module
from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.integrations.env_clients import build_provider_runtime from rights_filter.integrations.env_clients import build_provider_runtime
@ -75,6 +76,7 @@ class OneFaceDetector:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def use_explicit_test_face_detector(monkeypatch): def use_explicit_test_face_detector(monkeypatch):
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
def _fixtures(tmp_path: Path): def _fixtures(tmp_path: Path):
@ -1233,6 +1235,7 @@ class OneFaceBoxDetector:
def test_face_crops_are_persisted_served_and_listed_in_review(tmp_path: Path, monkeypatch): def test_face_crops_are_persisted_served_and_listed_in_review(tmp_path: Path, monkeypatch):
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector())
static_dir, image_store, store = _png_fixtures(tmp_path) static_dir, image_store, store = _png_fixtures(tmp_path)
server = build_server(host="127.0.0.1", port=0, store=store, image_store=image_store, static_dir=static_dir) server = build_server(host="127.0.0.1", port=0, store=store, image_store=image_store, static_dir=static_dir)
_start(server) _start(server)

View file

@ -12,6 +12,7 @@ from rights_filter.analysis.preprocessing import ImagePayload, build_face_crop_d
from rights_filter.domain.records import Evidence, EvidenceSource from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server import sqlite_store as sqlite_store_module from rights_filter.server import sqlite_store as sqlite_store_module
from rights_filter.server import store_enrichment as store_enrichment_module
from rights_filter.server import store_remote_fetch as remote_fetch_module from rights_filter.server import store_remote_fetch as remote_fetch_module
from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.integrations.env_clients import build_provider_runtime from rights_filter.integrations.env_clients import build_provider_runtime
@ -100,6 +101,7 @@ class OneFaceDetector:
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def use_explicit_test_face_detector(monkeypatch): def use_explicit_test_face_detector(monkeypatch):
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
def _image_store(tmp_path: Path) -> LocalSubmissionImageStore: def _image_store(tmp_path: Path) -> LocalSubmissionImageStore:
@ -1012,10 +1014,12 @@ def test_sqlite_store_refreshes_existing_local_face_evidence_when_reloaded(tmp_p
store.initialize() store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector())
store.seed_from_image_store(image_store) store.seed_from_image_store(image_store)
assert not any(item["source"] == "face" for item in store.review("SUB-DB1")["evidence"]) assert not any(item["source"] == "face" for item in store.review("SUB-DB1")["evidence"])
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
imported = store.seed_from_image_store(image_store) imported = store.seed_from_image_store(image_store)
review = store.review("SUB-DB1") review = store.review("SUB-DB1")
@ -7415,6 +7419,7 @@ def test_sqlite_store_rerun_enrichment_runs_google_web_detection_on_face_crop_wh
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime) store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime)
store.initialize() store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
store.seed_from_image_store(image_store) store.seed_from_image_store(image_store)
calls_after_seed = len(transport.calls) calls_after_seed = len(transport.calls)
@ -7493,6 +7498,7 @@ def test_sqlite_store_compares_face_crop_search_result_images_against_crop_finge
) )
store.initialize() store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
store.seed_from_image_store(image_store) store.seed_from_image_store(image_store)
review = store.rerun_enrichment("SUB-FACE1", image_store) review = store.rerun_enrichment("SUB-FACE1", image_store)
@ -7585,6 +7591,7 @@ def test_sqlite_store_compares_face_crop_search_result_page_images_against_crop_
) )
store.initialize() store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
store.seed_from_image_store(image_store) store.seed_from_image_store(image_store)
review = store.rerun_enrichment("SUB-FACE1", image_store) review = store.rerun_enrichment("SUB-FACE1", image_store)
@ -7670,6 +7677,7 @@ def test_sqlite_store_rerun_enrichment_uses_face_crop_page_title_for_naver_searc
) )
store.initialize() store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector()) monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
store.seed_from_image_store(image_store) store.seed_from_image_store(image_store)
calls_after_seed = len(transport.calls) calls_after_seed = len(transport.calls)