diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 2c8d376..31db4a2 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -4,7 +4,6 @@ import base64 import hashlib import html import json -import mimetypes import os import re import shutil @@ -13,7 +12,6 @@ import threading from contextlib import contextmanager from dataclasses import replace from datetime import datetime -from io import BytesIO from pathlib import Path from typing import Any, Callable from urllib.parse import urlparse @@ -31,7 +29,6 @@ from rights_filter.domain.records import ( InMemoryRightsFilterRepository, KnowledgeBaseEntry, KnowledgeEntryType, - KnowledgeProvenance, ) from rights_filter.integrations.cloud_vision_web_detection import ( CloudVisionWebDetectionAdapter, @@ -60,6 +57,29 @@ from rights_filter.server.store_schema import ( _ensure_schema_version, _ensure_typed_columns, ) +from rights_filter.server.store_serialization import ( + _domain_evidence_from_ui, + _evidence_id, + _evidence_matches_provider, + _evidence_payload, + _external_provider_ids, + _external_provider_state_for_submission, + _face_crop_web_evidence, + _google_weak_label_title, + _image_size_from_bytes, + _image_suffix_from_url, + _is_google_weak_label_payload, + _knowledge_provenance, + _now_label, + _provider_item_failed, + _provider_item_has_result, + _safe_filename, + _safe_image_suffix, + _stable_id, + _strip_html, + _submission_payload, + _timestamp_id, +) from rights_filter.server.store_text import _text_list, _unique_texts from rights_filter.server.store_url_utils import ( _decoded_nested_url, @@ -3591,402 +3611,3 @@ def _knowledge_entry_type(value: str) -> KnowledgeEntryType: return KnowledgeEntryType.OTHER -def _knowledge_provenance(value: str) -> KnowledgeProvenance: - if value == "automatic": - return KnowledgeProvenance.AUTOMATIC_REJECTION - try: - return KnowledgeProvenance(value) - except ValueError: - return KnowledgeProvenance.EXTERNAL_EVIDENCE - - -def _safe_image_suffix(filename: str, content_type: str) -> str: - suffix = Path(filename).suffix.lower() - if not suffix and content_type: - suffix = mimetypes.guess_extension(content_type.split(";", 1)[0].strip()) or "" - if suffix == ".jpe": - suffix = ".jpg" - if suffix not in SUPPORTED_IMAGE_SUFFIXES: - raise ValueError("unsupported knowledge image type") - return suffix - - -def _safe_filename(value: str) -> str: - return re.sub(r"[^A-Za-z0-9_.-]+", "-", value).strip(".-") - - -def _image_size_from_bytes(content: bytes) -> tuple[int, int]: - try: - from PIL import Image - - with Image.open(BytesIO(content)) as image: - return int(image.width), int(image.height) - except Exception: - return 1, 1 - - -def _stable_id(prefix: str, *parts: str) -> str: - digest = hashlib.sha1("\x1f".join(parts).encode("utf-8")).hexdigest()[:20] - return f"{prefix}-{digest}" - - -def _strip_html(value: str) -> str: - without_tags = re.sub(r"<[^>]+>", "", value) - return " ".join(without_tags.split()) - - -def _image_suffix_from_url(url: str) -> str: - suffix = Path(urlparse(url).path).suffix.lower() - if suffix == ".jpe": - suffix = ".jpg" - if suffix not in SUPPORTED_IMAGE_SUFFIXES: - return ".jpg" - return suffix - - -def _submission_payload( - record: dict[str, Any], - score: int, - band: str, - reasons: list[str], - provider_payloads: dict[str, dict[str, Any]] | None = None, -) -> dict[str, Any]: - submission_id = record["id"] - title = record.get("title", submission_id) - submitted_at = record.get("submitted_at") or _now_label() - submitted_epoch = _label_to_epoch(submitted_at) - provider_payloads = provider_payloads or {} - provider_state = {"internal": "ok"} - for provider_id in provider_payloads: - if provider_id == "internal": - continue - provider_state[provider_id] = _external_provider_state(provider_payloads, provider_id) - return { - "id": submission_id, - "title": title, - "asset": record["asset"], - "riskScore": score, - "riskBand": band, - "submittedAt": submitted_at, - "submittedEpoch": submitted_epoch, - "lastAnalysis": _now_label(), - "applicantStatus": "검토 중", - "decisionStatus": "unreviewed", - "applicantVisible": False, - "reasons": reasons or ["분석 근거 없음"], - "providerState": provider_state, - "fileFacts": { - "size": f"{record.get('width', 1)} x {record.get('height', 1)}", - "format": record.get("format", "FILE"), - "submitted": submitted_at, - "analysis": "v1", - }, - "derivativeNote": "로컬 이미지 저장소에서 읽은 내부 검토용 이미지입니다.", - "recommendation": { - "label": "운영자 검토 필요", - "detail": "자동 분석은 참고 정보이며 최종 결정은 운영자가 기록합니다.", - }, - "derivedPreview": { - "automatic": False, - "entryName": f"{title} / {submission_id}", - "effect": "반려 시에만 지식 DB 후보로 기록됩니다.", - }, - "queryHistory": [], - "similar": [{"asset": record["asset"], "label": "local submission"}], - "evidence": [], - } - - -def _external_provider_state( - provider_payloads: dict[str, dict[str, Any]], - provider_id: str, -) -> str: - provider = provider_payloads.get(provider_id, {}) - return "pending" if provider.get("enabled") else "disabled" - - -def _external_provider_state_for_submission( - provider_payloads: dict[str, dict[str, Any]], - provider_id: str, - submission: dict[str, Any], - evidence: list[dict[str, Any]], -) -> str: - provider = provider_payloads.get(provider_id, {}) - if not provider.get("enabled"): - return "disabled" - - matching_evidence = [ - item for item in evidence if _evidence_matches_provider(item, provider_id) - ] - matching_history = [ - item - for item in submission.get("queryHistory", []) or [] - if _history_matches_provider(item, provider_id) - ] - - if any(_provider_item_failed(item) for item in matching_evidence) or any( - str(item.get("status", "")) == "failed" for item in matching_history - ): - return "failed" - if any(_provider_item_has_result(item) for item in matching_evidence) or any( - int(item.get("count", 0) or 0) > 0 and str(item.get("status", "")) in {"auto", "manual"} - for item in matching_history - ): - return "covered" - if matching_evidence or any(str(item.get("status", "")) in {"auto", "manual"} for item in matching_history): - return "empty" - return "not_run" - - -def _evidence_matches_provider(evidence: dict[str, Any], provider_id: str) -> bool: - source = str(evidence.get("source", "")) - domain = str(evidence.get("domain", "")) - if provider_id == "naver": - return source == "naver" or domain in {"naver", "naver_blog", "naver_web"} - if provider_id == "google": - return source == "google" and domain != "google_custom_search" - if provider_id == "google_search": - return domain == "google_custom_search" - if provider_id == "llm": - return source == "llm" or (source == "failure" and "LLM" in str(evidence.get("title", ""))) - return domain == provider_id - - -def _history_matches_provider(history: dict[str, Any], provider_id: str) -> bool: - provider = str(history.get("provider", "")) - if provider_id == "naver": - return provider in {"naver", "naver_blog", "naver_web"} - return provider == provider_id - - -def _provider_item_failed(evidence: dict[str, Any]) -> bool: - title = str(evidence.get("title", "")).lower() - if "returned no results" in title: - return False - if str(evidence.get("source", "")) == "failure": - return True - return "failed" in title - - -def _provider_item_has_result(evidence: dict[str, Any]) -> bool: - title = str(evidence.get("title", "")).lower() - if "returned no results" in title: - return False - if str(evidence.get("source", "")) == "failure": - return False - if str(evidence.get("matchType", "")) == "weak_label": - return False - return True - - -def _external_provider_ids(provider_payloads: dict[str, dict[str, Any]]) -> list[str]: - return [provider_id for provider_id in provider_payloads if provider_id != "internal"] - - -def _face_crop_web_evidence( - submission_id: str, - crop_index: int, - evidence: Evidence, -) -> Evidence: - data = { - **evidence.data, - "submission_id": submission_id, - "face_crop_search": True, - "crop_index": crop_index, - "weak_hint": True, - "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.", - } - return Evidence( - source=evidence.source, - reason=f"Google face crop web evidence: {evidence.reason}", - confidence=evidence.confidence, - data=data, - ) - - -def _evidence_payload(submission_id: str, evidence: Evidence) -> dict[str, Any]: - source = _ui_source(evidence.source) - result_url = str(evidence.data.get("url", evidence.data.get("result_url", ""))) - image_url = str(evidence.data.get("image_url", "")) - page_image_urls = _unique_texts(_text_list(evidence.data.get("page_image_urls", []))) - thumbnail_url = str(evidence.data.get("thumbnail_url", "")) - if not thumbnail_url and not image_url and page_image_urls: - thumbnail_url = page_image_urls[0] - page_title = str(evidence.data.get("page_title", evidence.data.get("title", ""))) - face_crop_search = bool(evidence.data.get("face_crop_search", False)) - knowledge_entry_status = str(evidence.data.get("knowledge_entry_status", "")) - return { - "id": _evidence_id(submission_id, evidence), - "group": "watchlist" if knowledge_entry_status == "watchlist" else "face_web" if face_crop_search else _ui_group(evidence.source), - "source": source, - "title": evidence.reason, - "confidence": evidence.confidence, - "query": str(evidence.data.get("query", "")), - "querySignature": str(evidence.data.get("query_signature", "")), - "queryStrategy": str(evidence.data.get("query_strategy", "")), - "querySource": str(evidence.data.get("query_source", "")), - "searchType": str(evidence.data.get("search_type", "")), - "domain": str(evidence.data.get("provider", evidence.data.get("domain", "internal"))), - "url": result_url, - "imageUrl": image_url, - "thumbnailUrl": thumbnail_url, - "pageImageUrls": page_image_urls, - "remoteImageUrl": str(evidence.data.get("remote_image_url", "")), - "sourcePageUrl": str(evidence.data.get("source_page_url", "")), - "imageCandidateSource": str(evidence.data.get("image_candidate_source", "")), - "bloggerName": str(evidence.data.get("blogger_name", "")), - "bloggerLink": str(evidence.data.get("blogger_link", "")), - "postdate": str(evidence.data.get("postdate", "")), - "pageTitle": page_title, - "matchType": str(evidence.data.get("match", "")), - "rank": evidence.data.get("rank", ""), - "providerScore": evidence.data.get("score", ""), - "faceCropSearch": face_crop_search, - "cropIndex": evidence.data.get("crop_index", ""), - "privacyNote": str(evidence.data.get("privacy_note", "")), - "knowledgeEntryId": str(evidence.data.get("knowledge_entry_id", "")), - "knowledgeEntryName": str(evidence.data.get("knowledge_name", "")), - "knowledgeEntryStatus": knowledge_entry_status, - "sourceSubmissionId": str(evidence.data.get("source_submission_id", "")), - "similarity": evidence.data.get("similarity", ""), - "retrievedAt": _now_label(), - "contributed": source not in {"llm", "failure"} and not evidence.data.get("weak_hint", False), - "sourceEvidenceIds": evidence.data.get("source_evidence_ids", []), - "status": "active", - "submission_id": submission_id, - } - - -def _domain_evidence_from_ui(payload: dict[str, Any]) -> Evidence: - title = str(payload.get("title", "")) - return Evidence( - source=_domain_source_from_ui_payload(payload), - reason=title, - confidence=float(payload.get("confidence", 0)), - data={ - "evidence_id": payload.get("id", ""), - "query": payload.get("query", ""), - "query_signature": payload.get("querySignature", ""), - "query_strategy": payload.get("queryStrategy", ""), - "query_source": payload.get("querySource", ""), - "search_type": payload.get("searchType", ""), - "domain": payload.get("domain", ""), - "url": payload.get("url", ""), - "result_url": payload.get("url", ""), - "image_url": payload.get("imageUrl", ""), - "thumbnail_url": payload.get("thumbnailUrl", ""), - "remote_image_url": payload.get("remoteImageUrl", ""), - "source_page_url": payload.get("sourcePageUrl", ""), - "image_candidate_source": payload.get("imageCandidateSource", ""), - "blogger_name": payload.get("bloggerName", ""), - "blogger_link": payload.get("bloggerLink", ""), - "postdate": payload.get("postdate", ""), - "page_title": payload.get("pageTitle", ""), - "match": payload.get("matchType", ""), - "rank": payload.get("rank", ""), - "score": payload.get("providerScore", ""), - "contributed": payload.get("contributed", True), - "status": payload.get("status", ""), - "weak_hint": bool(payload.get("faceCropSearch", False)) or _is_google_weak_label_payload(payload), - "face_crop_search": bool(payload.get("faceCropSearch", False)), - "crop_index": payload.get("cropIndex", ""), - "privacy_note": payload.get("privacyNote", ""), - "operator_status": payload.get("operatorStatus", ""), - "knowledge_entry_id": payload.get("knowledgeEntryId", ""), - "knowledge_name": payload.get("knowledgeEntryName", ""), - "knowledge_entry_status": payload.get("knowledgeEntryStatus", ""), - "source_submission_id": payload.get("sourceSubmissionId", ""), - "similarity": payload.get("similarity", ""), - }, - ) - - -def _domain_source_from_ui_payload(payload: dict[str, Any]) -> EvidenceSource: - source = str(payload.get("source", "")) - title = str(payload.get("title", "")).lower() - if source == "failure" and ( - "disabled" in title or "skipped" in title or "usage limit" in title - ): - return EvidenceSource.EXTERNAL_SKIPPED - return _domain_source_from_ui(source) - - -def _domain_source_from_ui(source: str) -> EvidenceSource: - if source == "google": - return EvidenceSource.WEB_DETECTION - if source == "naver": - return EvidenceSource.NAVER_SEARCH - if source == "face": - return EvidenceSource.FACE_PERSON - if source == "failure": - return EvidenceSource.FAILURE - if source == "llm": - return EvidenceSource.LLM_SUMMARY - return EvidenceSource.FINGERPRINT - - -def _is_google_weak_label_payload(payload: dict[str, Any]) -> bool: - title = str(payload.get("title", "")) - return ( - payload.get("source") == "google" - and not payload.get("url") - and (title.startswith("Best guess label ") or title.startswith("Google weak label ")) - ) - - -def _google_weak_label_title(title: str) -> str: - if title.startswith("Best guess label "): - return "Google weak label " + title.removeprefix("Best guess label ") - return title - - -def _evidence_id(submission_id: str, evidence: Evidence) -> str: - base = f"{submission_id}:{evidence.source}:{evidence.reason}:{json.dumps(evidence.data, sort_keys=True, default=str)}" - return "ev-" + hashlib.sha256(base.encode("utf-8")).hexdigest()[:24] - - -def _ui_source(source: EvidenceSource) -> str: - if source == EvidenceSource.WEB_DETECTION: - return "google" - if source == EvidenceSource.NAVER_SEARCH: - return "naver" - if source == EvidenceSource.LLM_SUMMARY: - return "llm" - if source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED, EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}: - return "failure" - if source == EvidenceSource.FACE_PERSON: - return "face" - return "fingerprint" - - -def _ui_group(source: EvidenceSource) -> str: - ui_source = _ui_source(source) - if ui_source in {"fingerprint", "face"}: - return "internal" - return ui_source - - -def _now_label() -> str: - return datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - -def _label_to_epoch(value: str) -> int: - # Parse the timestamp label into a Unix epoch for chronological sorting in - # the operator GUI. Falls back to "now" when the label is missing or in an - # unrecognized format (mirroring the submittedAt `or _now_label()` fallback). - text = str(value).strip() - if not text: - return int(datetime.now().timestamp()) - for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"): - try: - return int(datetime.strptime(text, fmt).timestamp()) - except ValueError: - continue - try: - return int(datetime.fromisoformat(text).timestamp()) - except ValueError: - return int(datetime.now().timestamp()) - - -def _timestamp_id() -> str: - return datetime.now().strftime("%Y%m%d%H%M%S%f") diff --git a/src/rights_filter/server/store_serialization.py b/src/rights_filter/server/store_serialization.py new file mode 100644 index 0000000..7deb31a --- /dev/null +++ b/src/rights_filter/server/store_serialization.py @@ -0,0 +1,424 @@ +"""Payload (de)serialization and small domain-mapping helpers for the store. + +Extracted from sqlite_store.py: submission/evidence payload builders, provider- +state derivation, UI<->domain evidence mapping, weak-label handling, and id/label +helpers. Pure functions over dict/domain payloads; no dependency on the store +class or its module-level constants. Behavior unchanged. +""" + +from __future__ import annotations + +import hashlib +import json +import mimetypes +import re +from datetime import datetime +from io import BytesIO +from pathlib import Path +from typing import Any +from urllib.parse import urlparse + +from rights_filter.domain.records import Evidence, EvidenceSource, KnowledgeProvenance +from rights_filter.server.image_store import SUPPORTED_IMAGE_SUFFIXES +from rights_filter.server.store_text import _text_list, _unique_texts + + +def _knowledge_provenance(value: str) -> KnowledgeProvenance: + if value == "automatic": + return KnowledgeProvenance.AUTOMATIC_REJECTION + try: + return KnowledgeProvenance(value) + except ValueError: + return KnowledgeProvenance.EXTERNAL_EVIDENCE + + +def _safe_image_suffix(filename: str, content_type: str) -> str: + suffix = Path(filename).suffix.lower() + if not suffix and content_type: + suffix = mimetypes.guess_extension(content_type.split(";", 1)[0].strip()) or "" + if suffix == ".jpe": + suffix = ".jpg" + if suffix not in SUPPORTED_IMAGE_SUFFIXES: + raise ValueError("unsupported knowledge image type") + return suffix + + +def _safe_filename(value: str) -> str: + return re.sub(r"[^A-Za-z0-9_.-]+", "-", value).strip(".-") + + +def _image_size_from_bytes(content: bytes) -> tuple[int, int]: + try: + from PIL import Image + + with Image.open(BytesIO(content)) as image: + return int(image.width), int(image.height) + except Exception: + return 1, 1 + + +def _stable_id(prefix: str, *parts: str) -> str: + digest = hashlib.sha1("\x1f".join(parts).encode("utf-8")).hexdigest()[:20] + return f"{prefix}-{digest}" + + +def _strip_html(value: str) -> str: + without_tags = re.sub(r"<[^>]+>", "", value) + return " ".join(without_tags.split()) + + +def _image_suffix_from_url(url: str) -> str: + suffix = Path(urlparse(url).path).suffix.lower() + if suffix == ".jpe": + suffix = ".jpg" + if suffix not in SUPPORTED_IMAGE_SUFFIXES: + return ".jpg" + return suffix + + +def _submission_payload( + record: dict[str, Any], + score: int, + band: str, + reasons: list[str], + provider_payloads: dict[str, dict[str, Any]] | None = None, +) -> dict[str, Any]: + submission_id = record["id"] + title = record.get("title", submission_id) + submitted_at = record.get("submitted_at") or _now_label() + submitted_epoch = _label_to_epoch(submitted_at) + provider_payloads = provider_payloads or {} + provider_state = {"internal": "ok"} + for provider_id in provider_payloads: + if provider_id == "internal": + continue + provider_state[provider_id] = _external_provider_state(provider_payloads, provider_id) + return { + "id": submission_id, + "title": title, + "asset": record["asset"], + "riskScore": score, + "riskBand": band, + "submittedAt": submitted_at, + "submittedEpoch": submitted_epoch, + "lastAnalysis": _now_label(), + "applicantStatus": "검토 중", + "decisionStatus": "unreviewed", + "applicantVisible": False, + "reasons": reasons or ["분석 근거 없음"], + "providerState": provider_state, + "fileFacts": { + "size": f"{record.get('width', 1)} x {record.get('height', 1)}", + "format": record.get("format", "FILE"), + "submitted": submitted_at, + "analysis": "v1", + }, + "derivativeNote": "로컬 이미지 저장소에서 읽은 내부 검토용 이미지입니다.", + "recommendation": { + "label": "운영자 검토 필요", + "detail": "자동 분석은 참고 정보이며 최종 결정은 운영자가 기록합니다.", + }, + "derivedPreview": { + "automatic": False, + "entryName": f"{title} / {submission_id}", + "effect": "반려 시에만 지식 DB 후보로 기록됩니다.", + }, + "queryHistory": [], + "similar": [{"asset": record["asset"], "label": "local submission"}], + "evidence": [], + } + + +def _external_provider_state( + provider_payloads: dict[str, dict[str, Any]], + provider_id: str, +) -> str: + provider = provider_payloads.get(provider_id, {}) + return "pending" if provider.get("enabled") else "disabled" + + +def _external_provider_state_for_submission( + provider_payloads: dict[str, dict[str, Any]], + provider_id: str, + submission: dict[str, Any], + evidence: list[dict[str, Any]], +) -> str: + provider = provider_payloads.get(provider_id, {}) + if not provider.get("enabled"): + return "disabled" + + matching_evidence = [ + item for item in evidence if _evidence_matches_provider(item, provider_id) + ] + matching_history = [ + item + for item in submission.get("queryHistory", []) or [] + if _history_matches_provider(item, provider_id) + ] + + if any(_provider_item_failed(item) for item in matching_evidence) or any( + str(item.get("status", "")) == "failed" for item in matching_history + ): + return "failed" + if any(_provider_item_has_result(item) for item in matching_evidence) or any( + int(item.get("count", 0) or 0) > 0 and str(item.get("status", "")) in {"auto", "manual"} + for item in matching_history + ): + return "covered" + if matching_evidence or any(str(item.get("status", "")) in {"auto", "manual"} for item in matching_history): + return "empty" + return "not_run" + + +def _evidence_matches_provider(evidence: dict[str, Any], provider_id: str) -> bool: + source = str(evidence.get("source", "")) + domain = str(evidence.get("domain", "")) + if provider_id == "naver": + return source == "naver" or domain in {"naver", "naver_blog", "naver_web"} + if provider_id == "google": + return source == "google" and domain != "google_custom_search" + if provider_id == "google_search": + return domain == "google_custom_search" + if provider_id == "llm": + return source == "llm" or (source == "failure" and "LLM" in str(evidence.get("title", ""))) + return domain == provider_id + + +def _history_matches_provider(history: dict[str, Any], provider_id: str) -> bool: + provider = str(history.get("provider", "")) + if provider_id == "naver": + return provider in {"naver", "naver_blog", "naver_web"} + return provider == provider_id + + +def _provider_item_failed(evidence: dict[str, Any]) -> bool: + title = str(evidence.get("title", "")).lower() + if "returned no results" in title: + return False + if str(evidence.get("source", "")) == "failure": + return True + return "failed" in title + + +def _provider_item_has_result(evidence: dict[str, Any]) -> bool: + title = str(evidence.get("title", "")).lower() + if "returned no results" in title: + return False + if str(evidence.get("source", "")) == "failure": + return False + if str(evidence.get("matchType", "")) == "weak_label": + return False + return True + + +def _external_provider_ids(provider_payloads: dict[str, dict[str, Any]]) -> list[str]: + return [provider_id for provider_id in provider_payloads if provider_id != "internal"] + + +def _face_crop_web_evidence( + submission_id: str, + crop_index: int, + evidence: Evidence, +) -> Evidence: + data = { + **evidence.data, + "submission_id": submission_id, + "face_crop_search": True, + "crop_index": crop_index, + "weak_hint": True, + "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.", + } + return Evidence( + source=evidence.source, + reason=f"Google face crop web evidence: {evidence.reason}", + confidence=evidence.confidence, + data=data, + ) + + +def _evidence_payload(submission_id: str, evidence: Evidence) -> dict[str, Any]: + source = _ui_source(evidence.source) + result_url = str(evidence.data.get("url", evidence.data.get("result_url", ""))) + image_url = str(evidence.data.get("image_url", "")) + page_image_urls = _unique_texts(_text_list(evidence.data.get("page_image_urls", []))) + thumbnail_url = str(evidence.data.get("thumbnail_url", "")) + if not thumbnail_url and not image_url and page_image_urls: + thumbnail_url = page_image_urls[0] + page_title = str(evidence.data.get("page_title", evidence.data.get("title", ""))) + face_crop_search = bool(evidence.data.get("face_crop_search", False)) + knowledge_entry_status = str(evidence.data.get("knowledge_entry_status", "")) + return { + "id": _evidence_id(submission_id, evidence), + "group": "watchlist" if knowledge_entry_status == "watchlist" else "face_web" if face_crop_search else _ui_group(evidence.source), + "source": source, + "title": evidence.reason, + "confidence": evidence.confidence, + "query": str(evidence.data.get("query", "")), + "querySignature": str(evidence.data.get("query_signature", "")), + "queryStrategy": str(evidence.data.get("query_strategy", "")), + "querySource": str(evidence.data.get("query_source", "")), + "searchType": str(evidence.data.get("search_type", "")), + "domain": str(evidence.data.get("provider", evidence.data.get("domain", "internal"))), + "url": result_url, + "imageUrl": image_url, + "thumbnailUrl": thumbnail_url, + "pageImageUrls": page_image_urls, + "remoteImageUrl": str(evidence.data.get("remote_image_url", "")), + "sourcePageUrl": str(evidence.data.get("source_page_url", "")), + "imageCandidateSource": str(evidence.data.get("image_candidate_source", "")), + "bloggerName": str(evidence.data.get("blogger_name", "")), + "bloggerLink": str(evidence.data.get("blogger_link", "")), + "postdate": str(evidence.data.get("postdate", "")), + "pageTitle": page_title, + "matchType": str(evidence.data.get("match", "")), + "rank": evidence.data.get("rank", ""), + "providerScore": evidence.data.get("score", ""), + "faceCropSearch": face_crop_search, + "cropIndex": evidence.data.get("crop_index", ""), + "privacyNote": str(evidence.data.get("privacy_note", "")), + "knowledgeEntryId": str(evidence.data.get("knowledge_entry_id", "")), + "knowledgeEntryName": str(evidence.data.get("knowledge_name", "")), + "knowledgeEntryStatus": knowledge_entry_status, + "sourceSubmissionId": str(evidence.data.get("source_submission_id", "")), + "similarity": evidence.data.get("similarity", ""), + "retrievedAt": _now_label(), + "contributed": source not in {"llm", "failure"} and not evidence.data.get("weak_hint", False), + "sourceEvidenceIds": evidence.data.get("source_evidence_ids", []), + "status": "active", + "submission_id": submission_id, + } + + +def _domain_evidence_from_ui(payload: dict[str, Any]) -> Evidence: + title = str(payload.get("title", "")) + return Evidence( + source=_domain_source_from_ui_payload(payload), + reason=title, + confidence=float(payload.get("confidence", 0)), + data={ + "evidence_id": payload.get("id", ""), + "query": payload.get("query", ""), + "query_signature": payload.get("querySignature", ""), + "query_strategy": payload.get("queryStrategy", ""), + "query_source": payload.get("querySource", ""), + "search_type": payload.get("searchType", ""), + "domain": payload.get("domain", ""), + "url": payload.get("url", ""), + "result_url": payload.get("url", ""), + "image_url": payload.get("imageUrl", ""), + "thumbnail_url": payload.get("thumbnailUrl", ""), + "remote_image_url": payload.get("remoteImageUrl", ""), + "source_page_url": payload.get("sourcePageUrl", ""), + "image_candidate_source": payload.get("imageCandidateSource", ""), + "blogger_name": payload.get("bloggerName", ""), + "blogger_link": payload.get("bloggerLink", ""), + "postdate": payload.get("postdate", ""), + "page_title": payload.get("pageTitle", ""), + "match": payload.get("matchType", ""), + "rank": payload.get("rank", ""), + "score": payload.get("providerScore", ""), + "contributed": payload.get("contributed", True), + "status": payload.get("status", ""), + "weak_hint": bool(payload.get("faceCropSearch", False)) or _is_google_weak_label_payload(payload), + "face_crop_search": bool(payload.get("faceCropSearch", False)), + "crop_index": payload.get("cropIndex", ""), + "privacy_note": payload.get("privacyNote", ""), + "operator_status": payload.get("operatorStatus", ""), + "knowledge_entry_id": payload.get("knowledgeEntryId", ""), + "knowledge_name": payload.get("knowledgeEntryName", ""), + "knowledge_entry_status": payload.get("knowledgeEntryStatus", ""), + "source_submission_id": payload.get("sourceSubmissionId", ""), + "similarity": payload.get("similarity", ""), + }, + ) + + +def _domain_source_from_ui_payload(payload: dict[str, Any]) -> EvidenceSource: + source = str(payload.get("source", "")) + title = str(payload.get("title", "")).lower() + if source == "failure" and ( + "disabled" in title or "skipped" in title or "usage limit" in title + ): + return EvidenceSource.EXTERNAL_SKIPPED + return _domain_source_from_ui(source) + + +def _domain_source_from_ui(source: str) -> EvidenceSource: + if source == "google": + return EvidenceSource.WEB_DETECTION + if source == "naver": + return EvidenceSource.NAVER_SEARCH + if source == "face": + return EvidenceSource.FACE_PERSON + if source == "failure": + return EvidenceSource.FAILURE + if source == "llm": + return EvidenceSource.LLM_SUMMARY + return EvidenceSource.FINGERPRINT + + +def _is_google_weak_label_payload(payload: dict[str, Any]) -> bool: + title = str(payload.get("title", "")) + return ( + payload.get("source") == "google" + and not payload.get("url") + and (title.startswith("Best guess label ") or title.startswith("Google weak label ")) + ) + + +def _google_weak_label_title(title: str) -> str: + if title.startswith("Best guess label "): + return "Google weak label " + title.removeprefix("Best guess label ") + return title + + +def _evidence_id(submission_id: str, evidence: Evidence) -> str: + base = f"{submission_id}:{evidence.source}:{evidence.reason}:{json.dumps(evidence.data, sort_keys=True, default=str)}" + return "ev-" + hashlib.sha256(base.encode("utf-8")).hexdigest()[:24] + + +def _ui_source(source: EvidenceSource) -> str: + if source == EvidenceSource.WEB_DETECTION: + return "google" + if source == EvidenceSource.NAVER_SEARCH: + return "naver" + if source == EvidenceSource.LLM_SUMMARY: + return "llm" + if source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED, EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}: + return "failure" + if source == EvidenceSource.FACE_PERSON: + return "face" + return "fingerprint" + + +def _ui_group(source: EvidenceSource) -> str: + ui_source = _ui_source(source) + if ui_source in {"fingerprint", "face"}: + return "internal" + return ui_source + + +def _now_label() -> str: + return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + +def _label_to_epoch(value: str) -> int: + # Parse the timestamp label into a Unix epoch for chronological sorting in + # the operator GUI. Falls back to "now" when the label is missing or in an + # unrecognized format (mirroring the submittedAt `or _now_label()` fallback). + text = str(value).strip() + if not text: + return int(datetime.now().timestamp()) + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"): + try: + return int(datetime.strptime(text, fmt).timestamp()) + except ValueError: + continue + try: + return int(datetime.fromisoformat(text).timestamp()) + except ValueError: + return int(datetime.now().timestamp()) + + +def _timestamp_id() -> str: + return datetime.now().strftime("%Y%m%d%H%M%S%f")