refactor: extract payload serialization helpers into store_serialization

Move submission/evidence payload builders, provider-state derivation, UI<->domain
evidence mapping, weak-label handling, and id/label/image helpers into
store_serialization (depends only on stdlib + domain + url/text helpers, no store
coupling). Behavior-preserving; imported back into sqlite_store. 3992 -> 3613 lines.
This commit is contained in:
유창욱 2026-06-20 21:24:58 +09:00
parent e3bc99e6b9
commit 8e53139029
2 changed files with 447 additions and 402 deletions

View file

@ -4,7 +4,6 @@ import base64
import hashlib import hashlib
import html import html
import json import json
import mimetypes
import os import os
import re import re
import shutil import shutil
@ -13,7 +12,6 @@ import threading
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import replace from dataclasses import replace
from datetime import datetime from datetime import datetime
from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from urllib.parse import urlparse from urllib.parse import urlparse
@ -31,7 +29,6 @@ from rights_filter.domain.records import (
InMemoryRightsFilterRepository, InMemoryRightsFilterRepository,
KnowledgeBaseEntry, KnowledgeBaseEntry,
KnowledgeEntryType, KnowledgeEntryType,
KnowledgeProvenance,
) )
from rights_filter.integrations.cloud_vision_web_detection import ( from rights_filter.integrations.cloud_vision_web_detection import (
CloudVisionWebDetectionAdapter, CloudVisionWebDetectionAdapter,
@ -60,6 +57,29 @@ from rights_filter.server.store_schema import (
_ensure_schema_version, _ensure_schema_version,
_ensure_typed_columns, _ensure_typed_columns,
) )
from rights_filter.server.store_serialization import (
_domain_evidence_from_ui,
_evidence_id,
_evidence_matches_provider,
_evidence_payload,
_external_provider_ids,
_external_provider_state_for_submission,
_face_crop_web_evidence,
_google_weak_label_title,
_image_size_from_bytes,
_image_suffix_from_url,
_is_google_weak_label_payload,
_knowledge_provenance,
_now_label,
_provider_item_failed,
_provider_item_has_result,
_safe_filename,
_safe_image_suffix,
_stable_id,
_strip_html,
_submission_payload,
_timestamp_id,
)
from rights_filter.server.store_text import _text_list, _unique_texts from rights_filter.server.store_text import _text_list, _unique_texts
from rights_filter.server.store_url_utils import ( from rights_filter.server.store_url_utils import (
_decoded_nested_url, _decoded_nested_url,
@ -3591,402 +3611,3 @@ def _knowledge_entry_type(value: str) -> KnowledgeEntryType:
return KnowledgeEntryType.OTHER return KnowledgeEntryType.OTHER
def _knowledge_provenance(value: str) -> KnowledgeProvenance:
    """Map a stored provenance string onto a ``KnowledgeProvenance`` member.

    ``"automatic"`` is an alias for ``AUTOMATIC_REJECTION``. Any value the
    enum does not recognise falls back to ``EXTERNAL_EVIDENCE`` instead of
    raising.
    """
    if value != "automatic":
        try:
            return KnowledgeProvenance(value)
        except ValueError:
            return KnowledgeProvenance.EXTERNAL_EVIDENCE
    return KnowledgeProvenance.AUTOMATIC_REJECTION
def _safe_image_suffix(filename: str, content_type: str) -> str:
    """Return the validated, lower-cased image suffix for an upload.

    The suffix is taken from ``filename``; when the filename has none it is
    guessed from ``content_type`` (parameters after ``;`` are ignored).

    Raises:
        ValueError: if the resulting suffix is not in
            ``SUPPORTED_IMAGE_SUFFIXES``.
    """
    extension = Path(filename).suffix.lower()
    if content_type and not extension:
        mime_type = content_type.split(";", 1)[0].strip()
        extension = mimetypes.guess_extension(mime_type) or ""
    # Normalise the ``.jpe`` spelling to ``.jpg``.
    extension = ".jpg" if extension == ".jpe" else extension
    if extension in SUPPORTED_IMAGE_SUFFIXES:
        return extension
    raise ValueError("unsupported knowledge image type")
def _safe_filename(value: str) -> str:
    """Collapse runs of characters outside ``[A-Za-z0-9_.-]`` into ``-``.

    Leading and trailing dots and dashes are then trimmed, so the result can
    be empty when ``value`` holds no permitted characters.
    """
    sanitized = re.sub(r"[^A-Za-z0-9_.-]+", "-", value)
    return sanitized.strip(".-")
def _image_size_from_bytes(content: bytes) -> tuple[int, int]:
    """Return ``(width, height)`` of the encoded image in ``content``.

    Best effort: any failure (Pillow not importable, undecodable bytes, ...)
    yields the placeholder size ``(1, 1)`` rather than an exception.
    """
    fallback = (1, 1)
    try:
        from PIL import Image

        with Image.open(BytesIO(content)) as decoded:
            width, height = decoded.width, decoded.height
            return int(width), int(height)
    except Exception:
        return fallback
def _stable_id(prefix: str, *parts: str) -> str:
    """Build a deterministic id ``"<prefix>-<20 hex chars>"`` from ``parts``.

    The parts are joined with the ASCII unit separator before hashing, so
    ``("ab", "c")`` and ``("a", "bc")`` produce different ids.
    """
    joined = "\x1f".join(parts)
    hex_digest = hashlib.sha1(joined.encode("utf-8")).hexdigest()
    return f"{prefix}-{hex_digest[:20]}"
def _strip_html(value: str) -> str:
    """Remove ``<...>`` tags from ``value`` and normalise whitespace.

    Every whitespace run collapses to a single space and both ends are
    trimmed. HTML entities are left untouched.
    """
    text_only = re.sub(r"<[^>]+>", "", value)
    words = text_only.split()
    return " ".join(words)
def _image_suffix_from_url(url: str) -> str:
    """Return the image suffix implied by the path component of ``url``.

    ``.jpe`` is normalised to ``.jpg``; a missing or unsupported suffix also
    yields ``.jpg``.
    """
    url_path = urlparse(url).path
    extension = Path(url_path).suffix.lower()
    if extension == ".jpe" or extension not in SUPPORTED_IMAGE_SUFFIXES:
        return ".jpg"
    return extension
def _submission_payload(
    record: dict[str, Any],
    score: int,
    band: str,
    reasons: list[str],
    provider_payloads: dict[str, dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Build the initial UI payload for a submission.

    Args:
        record: Submission record; ``id`` and ``asset`` are required, while
            ``title``, ``submitted_at``, ``width``, ``height`` and ``format``
            are optional.
        score: Risk score stored as ``riskScore``.
        band: Risk band stored as ``riskBand``.
        reasons: Analysis reasons; a placeholder entry is used when empty.
        provider_payloads: Provider settings keyed by provider id. Every
            provider other than ``internal`` starts as ``pending`` or
            ``disabled``.

    Returns:
        A dict in the ``unreviewed`` state with empty ``queryHistory`` and
        ``evidence`` lists.

    Raises:
        KeyError: if ``record`` lacks ``id`` or ``asset``.
    """
    submission_id = record["id"]
    title = record.get("title", submission_id)
    submitted_at = record.get("submitted_at") or _now_label()
    submitted_epoch = _label_to_epoch(submitted_at)

    providers = provider_payloads or {}
    # "internal" is always reported as ok; the others depend on their settings.
    provider_state = {"internal": "ok"}
    provider_state.update(
        (provider_id, _external_provider_state(providers, provider_id))
        for provider_id in providers
        if provider_id != "internal"
    )

    file_facts = {
        "size": f"{record.get('width', 1)} x {record.get('height', 1)}",
        "format": record.get("format", "FILE"),
        "submitted": submitted_at,
        "analysis": "v1",
    }
    recommendation = {
        "label": "운영자 검토 필요",
        "detail": "자동 분석은 참고 정보이며 최종 결정은 운영자가 기록합니다.",
    }
    derived_preview = {
        "automatic": False,
        "entryName": f"{title} / {submission_id}",
        "effect": "반려 시에만 지식 DB 후보로 기록됩니다.",
    }
    return {
        "id": submission_id,
        "title": title,
        "asset": record["asset"],
        "riskScore": score,
        "riskBand": band,
        "submittedAt": submitted_at,
        "submittedEpoch": submitted_epoch,
        "lastAnalysis": _now_label(),
        "applicantStatus": "검토 중",
        "decisionStatus": "unreviewed",
        "applicantVisible": False,
        "reasons": reasons or ["분석 근거 없음"],
        "providerState": provider_state,
        "fileFacts": file_facts,
        "derivativeNote": "로컬 이미지 저장소에서 읽은 내부 검토용 이미지입니다.",
        "recommendation": recommendation,
        "derivedPreview": derived_preview,
        "queryHistory": [],
        "similar": [{"asset": record["asset"], "label": "local submission"}],
        "evidence": [],
    }
def _external_provider_state(
    provider_payloads: dict[str, dict[str, Any]],
    provider_id: str,
) -> str:
    """Return the initial state of an external provider.

    ``"pending"`` when the provider's payload has a truthy ``enabled`` flag;
    ``"disabled"`` otherwise, including when the provider is unknown.
    """
    settings = provider_payloads.get(provider_id, {})
    if settings.get("enabled"):
        return "pending"
    return "disabled"
def _external_provider_state_for_submission(
    provider_payloads: dict[str, dict[str, Any]],
    provider_id: str,
    submission: dict[str, Any],
    evidence: list[dict[str, Any]],
) -> str:
    """Derive a provider's state from a submission's evidence and history.

    Returns, in priority order:
        ``"disabled"``: the provider is not enabled.
        ``"failed"``: matching evidence or query history records a failure.
        ``"covered"``: matching evidence holds a result, or an auto/manual
            query reported a positive count.
        ``"empty"``: the provider ran (evidence exists or an auto/manual
            query was recorded) but nothing counted as a result.
        ``"not_run"``: nothing is recorded for the provider.
    """
    if not provider_payloads.get(provider_id, {}).get("enabled"):
        return "disabled"

    ran_statuses = {"auto", "manual"}
    provider_evidence = [
        entry for entry in evidence if _evidence_matches_provider(entry, provider_id)
    ]
    provider_history = [
        entry
        for entry in submission.get("queryHistory", []) or []
        if _history_matches_provider(entry, provider_id)
    ]
    history_statuses = [str(entry.get("status", "")) for entry in provider_history]

    if any(_provider_item_failed(entry) for entry in provider_evidence):
        return "failed"
    if "failed" in history_statuses:
        return "failed"

    if any(_provider_item_has_result(entry) for entry in provider_evidence):
        return "covered"
    if any(
        int(entry.get("count", 0) or 0) > 0 and status in ran_statuses
        for entry, status in zip(provider_history, history_statuses)
    ):
        return "covered"

    if provider_evidence or any(status in ran_statuses for status in history_statuses):
        return "empty"
    return "not_run"
def _evidence_matches_provider(evidence: dict[str, Any], provider_id: str) -> bool:
    """Tell whether a UI evidence payload belongs to ``provider_id``.

    ``naver``, ``google``, ``google_search`` and ``llm`` have dedicated rules
    based on the payload's ``source``/``domain`` (and, for LLM failures, its
    title). Any other provider matches when ``domain`` equals its id.
    """
    ui_source = str(evidence.get("source", ""))
    provider_domain = str(evidence.get("domain", ""))
    is_custom_search = provider_domain == "google_custom_search"

    if provider_id == "naver":
        return ui_source == "naver" or provider_domain in {"naver", "naver_blog", "naver_web"}
    if provider_id == "google":
        # Custom-search hits are attributed to "google_search" instead.
        return ui_source == "google" and not is_custom_search
    if provider_id == "google_search":
        return is_custom_search
    if provider_id != "llm":
        return provider_domain == provider_id
    if ui_source == "llm":
        return True
    return ui_source == "failure" and "LLM" in str(evidence.get("title", ""))
def _history_matches_provider(history: dict[str, Any], provider_id: str) -> bool:
    """Tell whether a query-history entry was produced by ``provider_id``.

    For ``naver`` the ``naver_blog`` and ``naver_web`` providers count too;
    every other provider must match exactly.
    """
    recorded = str(history.get("provider", ""))
    if provider_id == "naver":
        accepted = {"naver", "naver_blog", "naver_web"}
    else:
        accepted = {provider_id}
    return recorded in accepted
def _provider_item_failed(evidence: dict[str, Any]) -> bool:
    """Tell whether a UI evidence payload records a provider failure.

    A title containing "returned no results" never counts as a failure.
    Otherwise the item failed when its ``source`` is ``failure`` or its
    title contains "failed" (case-insensitive).
    """
    lowered_title = str(evidence.get("title", "")).lower()
    if "returned no results" in lowered_title:
        return False
    return str(evidence.get("source", "")) == "failure" or "failed" in lowered_title
def _provider_item_has_result(evidence: dict[str, Any]) -> bool:
    """Tell whether a UI evidence payload counts as a real provider result.

    "Returned no results" notices, ``failure`` items and ``weak_label``
    matches do not count; everything else does.
    """
    lowered_title = str(evidence.get("title", "")).lower()
    is_no_result_notice = "returned no results" in lowered_title
    is_failure = str(evidence.get("source", "")) == "failure"
    is_weak_label = str(evidence.get("matchType", "")) == "weak_label"
    return not (is_no_result_notice or is_failure or is_weak_label)
def _external_provider_ids(provider_payloads: dict[str, dict[str, Any]]) -> list[str]:
    """Return every provider id except ``internal``, in mapping order."""
    return list(filter(lambda provider_id: provider_id != "internal", provider_payloads))
def _face_crop_web_evidence(
    submission_id: str,
    crop_index: int,
    evidence: Evidence,
) -> Evidence:
    """Wrap web evidence found for a face crop as a weak hint.

    Returns a new ``Evidence`` with the same source and confidence, a reason
    prefixed with "Google face crop web evidence: ", and the original data
    extended with the submission id, crop index, weak-hint flags and a
    privacy note. The input evidence's data dict is not modified.
    """
    face_crop_fields = {
        "submission_id": submission_id,
        "face_crop_search": True,
        "crop_index": crop_index,
        "weak_hint": True,
        "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.",
    }
    # Copy first so the face-crop fields override without touching the input.
    merged = dict(evidence.data)
    merged.update(face_crop_fields)
    return Evidence(
        source=evidence.source,
        reason=f"Google face crop web evidence: {evidence.reason}",
        confidence=evidence.confidence,
        data=merged,
    )
def _evidence_payload(submission_id: str, evidence: Evidence) -> dict[str, Any]:
    """Serialise a domain ``Evidence`` into the dict shape used by the UI.

    Notable rules:
        * ``url`` prefers ``data["url"]`` over ``data["result_url"]``.
        * ``thumbnailUrl`` falls back to the first page image only when
          neither a thumbnail nor an image URL is present.
        * ``group`` is ``watchlist`` for watchlist knowledge entries,
          ``face_web`` for face-crop searches, otherwise derived from the
          evidence source.
        * ``contributed`` is false for LLM/failure items and weak hints.
        * ``retrievedAt`` is stamped with the current time.
    """
    data = evidence.data

    def text(key: str, default: Any = "") -> str:
        """Read ``data[key]`` (or ``default``) as a string."""
        return str(data.get(key, default))

    ui_source = _ui_source(evidence.source)
    page_images = _unique_texts(_text_list(data.get("page_image_urls", [])))
    image_url = text("image_url")
    thumbnail_url = text("thumbnail_url")
    if page_images and not (thumbnail_url or image_url):
        thumbnail_url = page_images[0]

    is_face_crop = bool(data.get("face_crop_search", False))
    entry_status = text("knowledge_entry_status")
    if entry_status == "watchlist":
        group = "watchlist"
    elif is_face_crop:
        group = "face_web"
    else:
        group = _ui_group(evidence.source)

    is_weak_hint = data.get("weak_hint", False)
    return {
        "id": _evidence_id(submission_id, evidence),
        "group": group,
        "source": ui_source,
        "title": evidence.reason,
        "confidence": evidence.confidence,
        "query": text("query"),
        "querySignature": text("query_signature"),
        "queryStrategy": text("query_strategy"),
        "querySource": text("query_source"),
        "searchType": text("search_type"),
        "domain": text("provider", data.get("domain", "internal")),
        "url": text("url", data.get("result_url", "")),
        "imageUrl": image_url,
        "thumbnailUrl": thumbnail_url,
        "pageImageUrls": page_images,
        "remoteImageUrl": text("remote_image_url"),
        "sourcePageUrl": text("source_page_url"),
        "imageCandidateSource": text("image_candidate_source"),
        "bloggerName": text("blogger_name"),
        "bloggerLink": text("blogger_link"),
        "postdate": text("postdate"),
        "pageTitle": text("page_title", data.get("title", "")),
        "matchType": text("match"),
        "rank": data.get("rank", ""),
        "providerScore": data.get("score", ""),
        "faceCropSearch": is_face_crop,
        "cropIndex": data.get("crop_index", ""),
        "privacyNote": text("privacy_note"),
        "knowledgeEntryId": text("knowledge_entry_id"),
        "knowledgeEntryName": text("knowledge_name"),
        "knowledgeEntryStatus": entry_status,
        "sourceSubmissionId": text("source_submission_id"),
        "similarity": data.get("similarity", ""),
        "retrievedAt": _now_label(),
        "contributed": ui_source not in {"llm", "failure"} and not is_weak_hint,
        "sourceEvidenceIds": data.get("source_evidence_ids", []),
        "status": "active",
        "submission_id": submission_id,
    }
def _domain_evidence_from_ui(payload: dict[str, Any]) -> Evidence:
    """Rebuild a domain ``Evidence`` from a UI evidence payload.

    Args:
        payload: Evidence dict in the camelCase shape used by the UI.
            Missing keys default to ``""`` (``contributed`` to ``True``).

    Returns:
        An ``Evidence`` whose source is derived from the payload's
        ``source``/``title``, whose reason is the title, and whose data holds
        the snake_case counterparts of the UI fields. ``weak_hint`` is set
        for face-crop searches and Google weak-label payloads.

    A ``confidence`` that is present but null, blank or otherwise not
    convertible to ``float`` is treated as ``0.0`` rather than raising, so a
    single malformed payload does not abort deserialization.
    """

    def field(ui_key: str, default: Any = "") -> Any:
        """Read one UI field with its default."""
        return payload.get(ui_key, default)

    source = _domain_source_from_ui_payload(payload)
    try:
        confidence = float(payload.get("confidence", 0))
    except (TypeError, ValueError):
        # ``.get`` only supplies the default for a *missing* key; an explicit
        # null/blank value would otherwise crash the conversion.
        confidence = 0.0

    is_face_crop = bool(field("faceCropSearch", False))
    result_url = field("url")
    return Evidence(
        source=source,
        reason=str(field("title")),
        confidence=confidence,
        data={
            "evidence_id": field("id"),
            "query": field("query"),
            "query_signature": field("querySignature"),
            "query_strategy": field("queryStrategy"),
            "query_source": field("querySource"),
            "search_type": field("searchType"),
            "domain": field("domain"),
            "url": result_url,
            "result_url": result_url,
            "image_url": field("imageUrl"),
            "thumbnail_url": field("thumbnailUrl"),
            "remote_image_url": field("remoteImageUrl"),
            "source_page_url": field("sourcePageUrl"),
            "image_candidate_source": field("imageCandidateSource"),
            "blogger_name": field("bloggerName"),
            "blogger_link": field("bloggerLink"),
            "postdate": field("postdate"),
            "page_title": field("pageTitle"),
            "match": field("matchType"),
            "rank": field("rank"),
            "score": field("providerScore"),
            "contributed": field("contributed", True),
            "status": field("status"),
            "weak_hint": is_face_crop or _is_google_weak_label_payload(payload),
            "face_crop_search": is_face_crop,
            "crop_index": field("cropIndex"),
            "privacy_note": field("privacyNote"),
            "operator_status": field("operatorStatus"),
            "knowledge_entry_id": field("knowledgeEntryId"),
            "knowledge_name": field("knowledgeEntryName"),
            "knowledge_entry_status": field("knowledgeEntryStatus"),
            "source_submission_id": field("sourceSubmissionId"),
            "similarity": field("similarity"),
        },
    )
def _domain_source_from_ui_payload(payload: dict[str, Any]) -> EvidenceSource:
    """Pick the domain ``EvidenceSource`` for a UI evidence payload.

    A ``failure`` payload whose title mentions "disabled", "skipped" or
    "usage limit" (case-insensitive) maps to ``EXTERNAL_SKIPPED``. Everything
    else is mapped from the ``source`` field alone.
    """
    ui_source = str(payload.get("source", ""))
    if ui_source == "failure":
        lowered_title = str(payload.get("title", "")).lower()
        skip_markers = ("disabled", "skipped", "usage limit")
        if any(marker in lowered_title for marker in skip_markers):
            return EvidenceSource.EXTERNAL_SKIPPED
    return _domain_source_from_ui(ui_source)
def _domain_source_from_ui(source: str) -> EvidenceSource:
    """Translate a UI source label into its domain ``EvidenceSource``.

    Unknown labels map to ``EvidenceSource.FINGERPRINT``.
    """
    by_label = {
        "google": EvidenceSource.WEB_DETECTION,
        "naver": EvidenceSource.NAVER_SEARCH,
        "face": EvidenceSource.FACE_PERSON,
        "failure": EvidenceSource.FAILURE,
        "llm": EvidenceSource.LLM_SUMMARY,
    }
    return by_label.get(source, EvidenceSource.FINGERPRINT)
def _is_google_weak_label_payload(payload: dict[str, Any]) -> bool:
    """Tell whether a UI payload is a Google weak (best-guess) label.

    That is: a ``google`` item without a URL whose title starts with
    "Best guess label " or "Google weak label ".
    """
    if payload.get("source") != "google" or payload.get("url"):
        return False
    title = str(payload.get("title", ""))
    return title.startswith(("Best guess label ", "Google weak label "))
def _google_weak_label_title(title: str) -> str:
    """Rewrite a "Best guess label ..." title as "Google weak label ...".

    Titles without that prefix are returned unchanged.
    """
    source_prefix = "Best guess label "
    if not title.startswith(source_prefix):
        return title
    return "Google weak label " + title[len(source_prefix):]
def _evidence_id(submission_id: str, evidence: Evidence) -> str:
    """Return a deterministic ``"ev-<24 hex chars>"`` id for an evidence item.

    The id is a SHA-256 prefix over the submission id, the evidence source,
    its reason and its data serialised as key-sorted JSON (values JSON cannot
    encode are stringified).
    """
    serialized_data = json.dumps(evidence.data, sort_keys=True, default=str)
    fingerprint = f"{submission_id}:{evidence.source}:{evidence.reason}:{serialized_data}"
    digest = hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()
    return "ev-" + digest[:24]
def _ui_source(source: EvidenceSource) -> str:
    """Translate a domain ``EvidenceSource`` into the UI source label.

    All failure/skip variants collapse into ``"failure"``; any source without
    a dedicated label is shown as ``"fingerprint"``.
    """
    labels = {
        EvidenceSource.WEB_DETECTION: "google",
        EvidenceSource.NAVER_SEARCH: "naver",
        EvidenceSource.LLM_SUMMARY: "llm",
        EvidenceSource.FAILURE: "failure",
        EvidenceSource.EXTERNAL_SKIPPED: "failure",
        EvidenceSource.SEARCH_SKIPPED: "failure",
        EvidenceSource.ENRICHMENT_FAILURE: "failure",
        EvidenceSource.FACE_PERSON: "face",
    }
    return labels.get(source, "fingerprint")
def _ui_group(source: EvidenceSource) -> str:
    """Return the UI group for an evidence source.

    Fingerprint and face evidence share the ``"internal"`` group; every other
    source uses its UI source label as the group name.
    """
    label = _ui_source(source)
    return "internal" if label in ("fingerprint", "face") else label
def _now_label() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def _label_to_epoch(value: str) -> int:
    """Parse a timestamp label into a Unix epoch (whole seconds).

    Used for chronological sorting in the operator GUI. Accepts
    ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH:MM``, ``YYYY-MM-DD`` and, as a
    last resort, anything ``datetime.fromisoformat`` understands. A missing
    or unrecognised label falls back to the current time, mirroring the
    ``submittedAt`` fallback to ``_now_label()``.
    """

    def epoch_now() -> int:
        return int(datetime.now().timestamp())

    label = str(value).strip()
    if not label:
        return epoch_now()

    known_formats = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")
    for pattern in known_formats:
        try:
            return int(datetime.strptime(label, pattern).timestamp())
        except ValueError:
            pass
    try:
        return int(datetime.fromisoformat(label).timestamp())
    except ValueError:
        return epoch_now()
def _timestamp_id() -> str:
    """Return the current local time as a 20-digit ``YYYYMMDDHHMMSSffffff`` id."""
    return f"{datetime.now():%Y%m%d%H%M%S%f}"

View file

@ -0,0 +1,424 @@
"""Payload (de)serialization and small domain-mapping helpers for the store.
Extracted from sqlite_store.py: submission/evidence payload builders, provider-
state derivation, UI<->domain evidence mapping, weak-label handling, and
id/label/image helpers. Stateless helpers over dict/domain payloads (the
timestamp helpers read the wall clock, so not all of them are pure); no
dependency on the store class or its module-level constants. Behavior unchanged.
"""
from __future__ import annotations
import hashlib
import json
import mimetypes
import re
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from rights_filter.domain.records import Evidence, EvidenceSource, KnowledgeProvenance
from rights_filter.server.image_store import SUPPORTED_IMAGE_SUFFIXES
from rights_filter.server.store_text import _text_list, _unique_texts
def _knowledge_provenance(value: str) -> KnowledgeProvenance:
    """Map a stored provenance string onto a ``KnowledgeProvenance`` member.

    ``"automatic"`` is an alias for ``AUTOMATIC_REJECTION``. Any value the
    enum does not recognise falls back to ``EXTERNAL_EVIDENCE`` instead of
    raising.
    """
    if value != "automatic":
        try:
            return KnowledgeProvenance(value)
        except ValueError:
            return KnowledgeProvenance.EXTERNAL_EVIDENCE
    return KnowledgeProvenance.AUTOMATIC_REJECTION
def _safe_image_suffix(filename: str, content_type: str) -> str:
    """Return the validated, lower-cased image suffix for an upload.

    The suffix is taken from ``filename``; when the filename has none it is
    guessed from ``content_type`` (parameters after ``;`` are ignored).

    Raises:
        ValueError: if the resulting suffix is not in
            ``SUPPORTED_IMAGE_SUFFIXES``.
    """
    extension = Path(filename).suffix.lower()
    if content_type and not extension:
        mime_type = content_type.split(";", 1)[0].strip()
        extension = mimetypes.guess_extension(mime_type) or ""
    # Normalise the ``.jpe`` spelling to ``.jpg``.
    extension = ".jpg" if extension == ".jpe" else extension
    if extension in SUPPORTED_IMAGE_SUFFIXES:
        return extension
    raise ValueError("unsupported knowledge image type")
def _safe_filename(value: str) -> str:
    """Collapse runs of characters outside ``[A-Za-z0-9_.-]`` into ``-``.

    Leading and trailing dots and dashes are then trimmed, so the result can
    be empty when ``value`` holds no permitted characters.
    """
    sanitized = re.sub(r"[^A-Za-z0-9_.-]+", "-", value)
    return sanitized.strip(".-")
def _image_size_from_bytes(content: bytes) -> tuple[int, int]:
    """Return ``(width, height)`` of the encoded image in ``content``.

    Best effort: any failure (Pillow not importable, undecodable bytes, ...)
    yields the placeholder size ``(1, 1)`` rather than an exception.
    """
    fallback = (1, 1)
    try:
        from PIL import Image

        with Image.open(BytesIO(content)) as decoded:
            width, height = decoded.width, decoded.height
            return int(width), int(height)
    except Exception:
        return fallback
def _stable_id(prefix: str, *parts: str) -> str:
    """Build a deterministic id ``"<prefix>-<20 hex chars>"`` from ``parts``.

    The parts are joined with the ASCII unit separator before hashing, so
    ``("ab", "c")`` and ``("a", "bc")`` produce different ids.
    """
    joined = "\x1f".join(parts)
    hex_digest = hashlib.sha1(joined.encode("utf-8")).hexdigest()
    return f"{prefix}-{hex_digest[:20]}"
def _strip_html(value: str) -> str:
    """Remove ``<...>`` tags from ``value`` and normalise whitespace.

    Every whitespace run collapses to a single space and both ends are
    trimmed. HTML entities are left untouched.
    """
    text_only = re.sub(r"<[^>]+>", "", value)
    words = text_only.split()
    return " ".join(words)
def _image_suffix_from_url(url: str) -> str:
    """Return the image suffix implied by the path component of ``url``.

    ``.jpe`` is normalised to ``.jpg``; a missing or unsupported suffix also
    yields ``.jpg``.
    """
    url_path = urlparse(url).path
    extension = Path(url_path).suffix.lower()
    if extension == ".jpe" or extension not in SUPPORTED_IMAGE_SUFFIXES:
        return ".jpg"
    return extension
def _submission_payload(
    record: dict[str, Any],
    score: int,
    band: str,
    reasons: list[str],
    provider_payloads: dict[str, dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Build the initial UI payload for a submission.

    Args:
        record: Submission record; ``id`` and ``asset`` are required, while
            ``title``, ``submitted_at``, ``width``, ``height`` and ``format``
            are optional.
        score: Risk score stored as ``riskScore``.
        band: Risk band stored as ``riskBand``.
        reasons: Analysis reasons; a placeholder entry is used when empty.
        provider_payloads: Provider settings keyed by provider id. Every
            provider other than ``internal`` starts as ``pending`` or
            ``disabled``.

    Returns:
        A dict in the ``unreviewed`` state with empty ``queryHistory`` and
        ``evidence`` lists.

    Raises:
        KeyError: if ``record`` lacks ``id`` or ``asset``.
    """
    submission_id = record["id"]
    title = record.get("title", submission_id)
    submitted_at = record.get("submitted_at") or _now_label()
    submitted_epoch = _label_to_epoch(submitted_at)

    providers = provider_payloads or {}
    # "internal" is always reported as ok; the others depend on their settings.
    provider_state = {"internal": "ok"}
    provider_state.update(
        (provider_id, _external_provider_state(providers, provider_id))
        for provider_id in providers
        if provider_id != "internal"
    )

    file_facts = {
        "size": f"{record.get('width', 1)} x {record.get('height', 1)}",
        "format": record.get("format", "FILE"),
        "submitted": submitted_at,
        "analysis": "v1",
    }
    recommendation = {
        "label": "운영자 검토 필요",
        "detail": "자동 분석은 참고 정보이며 최종 결정은 운영자가 기록합니다.",
    }
    derived_preview = {
        "automatic": False,
        "entryName": f"{title} / {submission_id}",
        "effect": "반려 시에만 지식 DB 후보로 기록됩니다.",
    }
    return {
        "id": submission_id,
        "title": title,
        "asset": record["asset"],
        "riskScore": score,
        "riskBand": band,
        "submittedAt": submitted_at,
        "submittedEpoch": submitted_epoch,
        "lastAnalysis": _now_label(),
        "applicantStatus": "검토 중",
        "decisionStatus": "unreviewed",
        "applicantVisible": False,
        "reasons": reasons or ["분석 근거 없음"],
        "providerState": provider_state,
        "fileFacts": file_facts,
        "derivativeNote": "로컬 이미지 저장소에서 읽은 내부 검토용 이미지입니다.",
        "recommendation": recommendation,
        "derivedPreview": derived_preview,
        "queryHistory": [],
        "similar": [{"asset": record["asset"], "label": "local submission"}],
        "evidence": [],
    }
def _external_provider_state(
    provider_payloads: dict[str, dict[str, Any]],
    provider_id: str,
) -> str:
    """Return the initial state of an external provider.

    ``"pending"`` when the provider's payload has a truthy ``enabled`` flag;
    ``"disabled"`` otherwise, including when the provider is unknown.
    """
    settings = provider_payloads.get(provider_id, {})
    if settings.get("enabled"):
        return "pending"
    return "disabled"
def _external_provider_state_for_submission(
    provider_payloads: dict[str, dict[str, Any]],
    provider_id: str,
    submission: dict[str, Any],
    evidence: list[dict[str, Any]],
) -> str:
    """Derive a provider's state from a submission's evidence and history.

    Returns, in priority order:
        ``"disabled"``: the provider is not enabled.
        ``"failed"``: matching evidence or query history records a failure.
        ``"covered"``: matching evidence holds a result, or an auto/manual
            query reported a positive count.
        ``"empty"``: the provider ran (evidence exists or an auto/manual
            query was recorded) but nothing counted as a result.
        ``"not_run"``: nothing is recorded for the provider.
    """
    if not provider_payloads.get(provider_id, {}).get("enabled"):
        return "disabled"

    ran_statuses = {"auto", "manual"}
    provider_evidence = [
        entry for entry in evidence if _evidence_matches_provider(entry, provider_id)
    ]
    provider_history = [
        entry
        for entry in submission.get("queryHistory", []) or []
        if _history_matches_provider(entry, provider_id)
    ]
    history_statuses = [str(entry.get("status", "")) for entry in provider_history]

    if any(_provider_item_failed(entry) for entry in provider_evidence):
        return "failed"
    if "failed" in history_statuses:
        return "failed"

    if any(_provider_item_has_result(entry) for entry in provider_evidence):
        return "covered"
    if any(
        int(entry.get("count", 0) or 0) > 0 and status in ran_statuses
        for entry, status in zip(provider_history, history_statuses)
    ):
        return "covered"

    if provider_evidence or any(status in ran_statuses for status in history_statuses):
        return "empty"
    return "not_run"
def _evidence_matches_provider(evidence: dict[str, Any], provider_id: str) -> bool:
    """Tell whether a UI evidence payload belongs to ``provider_id``.

    ``naver``, ``google``, ``google_search`` and ``llm`` have dedicated rules
    based on the payload's ``source``/``domain`` (and, for LLM failures, its
    title). Any other provider matches when ``domain`` equals its id.
    """
    ui_source = str(evidence.get("source", ""))
    provider_domain = str(evidence.get("domain", ""))
    is_custom_search = provider_domain == "google_custom_search"

    if provider_id == "naver":
        return ui_source == "naver" or provider_domain in {"naver", "naver_blog", "naver_web"}
    if provider_id == "google":
        # Custom-search hits are attributed to "google_search" instead.
        return ui_source == "google" and not is_custom_search
    if provider_id == "google_search":
        return is_custom_search
    if provider_id != "llm":
        return provider_domain == provider_id
    if ui_source == "llm":
        return True
    return ui_source == "failure" and "LLM" in str(evidence.get("title", ""))
def _history_matches_provider(history: dict[str, Any], provider_id: str) -> bool:
    """Tell whether a query-history entry was produced by ``provider_id``.

    For ``naver`` the ``naver_blog`` and ``naver_web`` providers count too;
    every other provider must match exactly.
    """
    recorded = str(history.get("provider", ""))
    if provider_id == "naver":
        accepted = {"naver", "naver_blog", "naver_web"}
    else:
        accepted = {provider_id}
    return recorded in accepted
def _provider_item_failed(evidence: dict[str, Any]) -> bool:
    """Tell whether a UI evidence payload records a provider failure.

    A title containing "returned no results" never counts as a failure.
    Otherwise the item failed when its ``source`` is ``failure`` or its
    title contains "failed" (case-insensitive).
    """
    lowered_title = str(evidence.get("title", "")).lower()
    if "returned no results" in lowered_title:
        return False
    return str(evidence.get("source", "")) == "failure" or "failed" in lowered_title
def _provider_item_has_result(evidence: dict[str, Any]) -> bool:
    """Tell whether a UI evidence payload counts as a real provider result.

    "Returned no results" notices, ``failure`` items and ``weak_label``
    matches do not count; everything else does.
    """
    lowered_title = str(evidence.get("title", "")).lower()
    is_no_result_notice = "returned no results" in lowered_title
    is_failure = str(evidence.get("source", "")) == "failure"
    is_weak_label = str(evidence.get("matchType", "")) == "weak_label"
    return not (is_no_result_notice or is_failure or is_weak_label)
def _external_provider_ids(provider_payloads: dict[str, dict[str, Any]]) -> list[str]:
    """Return every provider id except ``internal``, in mapping order."""
    return list(filter(lambda provider_id: provider_id != "internal", provider_payloads))
def _face_crop_web_evidence(
    submission_id: str,
    crop_index: int,
    evidence: Evidence,
) -> Evidence:
    """Wrap web evidence found for a face crop as a weak hint.

    Returns a new ``Evidence`` with the same source and confidence, a reason
    prefixed with "Google face crop web evidence: ", and the original data
    extended with the submission id, crop index, weak-hint flags and a
    privacy note. The input evidence's data dict is not modified.
    """
    face_crop_fields = {
        "submission_id": submission_id,
        "face_crop_search": True,
        "crop_index": crop_index,
        "weak_hint": True,
        "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.",
    }
    # Copy first so the face-crop fields override without touching the input.
    merged = dict(evidence.data)
    merged.update(face_crop_fields)
    return Evidence(
        source=evidence.source,
        reason=f"Google face crop web evidence: {evidence.reason}",
        confidence=evidence.confidence,
        data=merged,
    )
def _evidence_payload(submission_id: str, evidence: Evidence) -> dict[str, Any]:
    """Serialise a domain ``Evidence`` into the dict shape used by the UI.

    Notable rules:
        * ``url`` prefers ``data["url"]`` over ``data["result_url"]``.
        * ``thumbnailUrl`` falls back to the first page image only when
          neither a thumbnail nor an image URL is present.
        * ``group`` is ``watchlist`` for watchlist knowledge entries,
          ``face_web`` for face-crop searches, otherwise derived from the
          evidence source.
        * ``contributed`` is false for LLM/failure items and weak hints.
        * ``retrievedAt`` is stamped with the current time.
    """
    data = evidence.data

    def text(key: str, default: Any = "") -> str:
        """Read ``data[key]`` (or ``default``) as a string."""
        return str(data.get(key, default))

    ui_source = _ui_source(evidence.source)
    page_images = _unique_texts(_text_list(data.get("page_image_urls", [])))
    image_url = text("image_url")
    thumbnail_url = text("thumbnail_url")
    if page_images and not (thumbnail_url or image_url):
        thumbnail_url = page_images[0]

    is_face_crop = bool(data.get("face_crop_search", False))
    entry_status = text("knowledge_entry_status")
    if entry_status == "watchlist":
        group = "watchlist"
    elif is_face_crop:
        group = "face_web"
    else:
        group = _ui_group(evidence.source)

    is_weak_hint = data.get("weak_hint", False)
    return {
        "id": _evidence_id(submission_id, evidence),
        "group": group,
        "source": ui_source,
        "title": evidence.reason,
        "confidence": evidence.confidence,
        "query": text("query"),
        "querySignature": text("query_signature"),
        "queryStrategy": text("query_strategy"),
        "querySource": text("query_source"),
        "searchType": text("search_type"),
        "domain": text("provider", data.get("domain", "internal")),
        "url": text("url", data.get("result_url", "")),
        "imageUrl": image_url,
        "thumbnailUrl": thumbnail_url,
        "pageImageUrls": page_images,
        "remoteImageUrl": text("remote_image_url"),
        "sourcePageUrl": text("source_page_url"),
        "imageCandidateSource": text("image_candidate_source"),
        "bloggerName": text("blogger_name"),
        "bloggerLink": text("blogger_link"),
        "postdate": text("postdate"),
        "pageTitle": text("page_title", data.get("title", "")),
        "matchType": text("match"),
        "rank": data.get("rank", ""),
        "providerScore": data.get("score", ""),
        "faceCropSearch": is_face_crop,
        "cropIndex": data.get("crop_index", ""),
        "privacyNote": text("privacy_note"),
        "knowledgeEntryId": text("knowledge_entry_id"),
        "knowledgeEntryName": text("knowledge_name"),
        "knowledgeEntryStatus": entry_status,
        "sourceSubmissionId": text("source_submission_id"),
        "similarity": data.get("similarity", ""),
        "retrievedAt": _now_label(),
        "contributed": ui_source not in {"llm", "failure"} and not is_weak_hint,
        "sourceEvidenceIds": data.get("source_evidence_ids", []),
        "status": "active",
        "submission_id": submission_id,
    }
def _domain_evidence_from_ui(payload: dict[str, Any]) -> Evidence:
    """Rebuild a domain ``Evidence`` from a UI evidence payload.

    Args:
        payload: Evidence dict in the camelCase shape used by the UI.
            Missing keys default to ``""`` (``contributed`` to ``True``).

    Returns:
        An ``Evidence`` whose source is derived from the payload's
        ``source``/``title``, whose reason is the title, and whose data holds
        the snake_case counterparts of the UI fields. ``weak_hint`` is set
        for face-crop searches and Google weak-label payloads.

    A ``confidence`` that is present but null, blank or otherwise not
    convertible to ``float`` is treated as ``0.0`` rather than raising, so a
    single malformed payload does not abort deserialization.
    """

    def field(ui_key: str, default: Any = "") -> Any:
        """Read one UI field with its default."""
        return payload.get(ui_key, default)

    source = _domain_source_from_ui_payload(payload)
    try:
        confidence = float(payload.get("confidence", 0))
    except (TypeError, ValueError):
        # ``.get`` only supplies the default for a *missing* key; an explicit
        # null/blank value would otherwise crash the conversion.
        confidence = 0.0

    is_face_crop = bool(field("faceCropSearch", False))
    result_url = field("url")
    return Evidence(
        source=source,
        reason=str(field("title")),
        confidence=confidence,
        data={
            "evidence_id": field("id"),
            "query": field("query"),
            "query_signature": field("querySignature"),
            "query_strategy": field("queryStrategy"),
            "query_source": field("querySource"),
            "search_type": field("searchType"),
            "domain": field("domain"),
            "url": result_url,
            "result_url": result_url,
            "image_url": field("imageUrl"),
            "thumbnail_url": field("thumbnailUrl"),
            "remote_image_url": field("remoteImageUrl"),
            "source_page_url": field("sourcePageUrl"),
            "image_candidate_source": field("imageCandidateSource"),
            "blogger_name": field("bloggerName"),
            "blogger_link": field("bloggerLink"),
            "postdate": field("postdate"),
            "page_title": field("pageTitle"),
            "match": field("matchType"),
            "rank": field("rank"),
            "score": field("providerScore"),
            "contributed": field("contributed", True),
            "status": field("status"),
            "weak_hint": is_face_crop or _is_google_weak_label_payload(payload),
            "face_crop_search": is_face_crop,
            "crop_index": field("cropIndex"),
            "privacy_note": field("privacyNote"),
            "operator_status": field("operatorStatus"),
            "knowledge_entry_id": field("knowledgeEntryId"),
            "knowledge_name": field("knowledgeEntryName"),
            "knowledge_entry_status": field("knowledgeEntryStatus"),
            "source_submission_id": field("sourceSubmissionId"),
            "similarity": field("similarity"),
        },
    )
def _domain_source_from_ui_payload(payload: dict[str, Any]) -> EvidenceSource:
    """Pick the domain ``EvidenceSource`` for a UI evidence payload.

    A ``failure`` payload whose title mentions "disabled", "skipped" or
    "usage limit" (case-insensitive) maps to ``EXTERNAL_SKIPPED``. Everything
    else is mapped from the ``source`` field alone.
    """
    ui_source = str(payload.get("source", ""))
    if ui_source == "failure":
        lowered_title = str(payload.get("title", "")).lower()
        skip_markers = ("disabled", "skipped", "usage limit")
        if any(marker in lowered_title for marker in skip_markers):
            return EvidenceSource.EXTERNAL_SKIPPED
    return _domain_source_from_ui(ui_source)
def _domain_source_from_ui(source: str) -> EvidenceSource:
    """Translate a UI source label into its domain ``EvidenceSource``.

    Unknown labels map to ``EvidenceSource.FINGERPRINT``.
    """
    by_label = {
        "google": EvidenceSource.WEB_DETECTION,
        "naver": EvidenceSource.NAVER_SEARCH,
        "face": EvidenceSource.FACE_PERSON,
        "failure": EvidenceSource.FAILURE,
        "llm": EvidenceSource.LLM_SUMMARY,
    }
    return by_label.get(source, EvidenceSource.FINGERPRINT)
def _is_google_weak_label_payload(payload: dict[str, Any]) -> bool:
    """Tell whether a UI payload is a Google weak (best-guess) label.

    That is: a ``google`` item without a URL whose title starts with
    "Best guess label " or "Google weak label ".
    """
    if payload.get("source") != "google" or payload.get("url"):
        return False
    title = str(payload.get("title", ""))
    return title.startswith(("Best guess label ", "Google weak label "))
def _google_weak_label_title(title: str) -> str:
    """Rewrite a "Best guess label ..." title as "Google weak label ...".

    Titles without that prefix are returned unchanged.
    """
    source_prefix = "Best guess label "
    if not title.startswith(source_prefix):
        return title
    return "Google weak label " + title[len(source_prefix):]
def _evidence_id(submission_id: str, evidence: Evidence) -> str:
    """Return a deterministic ``"ev-<24 hex chars>"`` id for an evidence item.

    The id is a SHA-256 prefix over the submission id, the evidence source,
    its reason and its data serialised as key-sorted JSON (values JSON cannot
    encode are stringified).
    """
    serialized_data = json.dumps(evidence.data, sort_keys=True, default=str)
    fingerprint = f"{submission_id}:{evidence.source}:{evidence.reason}:{serialized_data}"
    digest = hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()
    return "ev-" + digest[:24]
def _ui_source(source: EvidenceSource) -> str:
    """Translate a domain ``EvidenceSource`` into the UI source label.

    All failure/skip variants collapse into ``"failure"``; any source without
    a dedicated label is shown as ``"fingerprint"``.
    """
    labels = {
        EvidenceSource.WEB_DETECTION: "google",
        EvidenceSource.NAVER_SEARCH: "naver",
        EvidenceSource.LLM_SUMMARY: "llm",
        EvidenceSource.FAILURE: "failure",
        EvidenceSource.EXTERNAL_SKIPPED: "failure",
        EvidenceSource.SEARCH_SKIPPED: "failure",
        EvidenceSource.ENRICHMENT_FAILURE: "failure",
        EvidenceSource.FACE_PERSON: "face",
    }
    return labels.get(source, "fingerprint")
def _ui_group(source: EvidenceSource) -> str:
    """Return the UI group for an evidence source.

    Fingerprint and face evidence share the ``"internal"`` group; every other
    source uses its UI source label as the group name.
    """
    label = _ui_source(source)
    return "internal" if label in ("fingerprint", "face") else label
def _now_label() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def _label_to_epoch(value: str) -> int:
    """Parse a timestamp label into a Unix epoch (whole seconds).

    Used for chronological sorting in the operator GUI. Accepts
    ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH:MM``, ``YYYY-MM-DD`` and, as a
    last resort, anything ``datetime.fromisoformat`` understands. A missing
    or unrecognised label falls back to the current time, mirroring the
    ``submittedAt`` fallback to ``_now_label()``.
    """

    def epoch_now() -> int:
        return int(datetime.now().timestamp())

    label = str(value).strip()
    if not label:
        return epoch_now()

    known_formats = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d")
    for pattern in known_formats:
        try:
            return int(datetime.strptime(label, pattern).timestamp())
        except ValueError:
            pass
    try:
        return int(datetime.fromisoformat(label).timestamp())
    except ValueError:
        return epoch_now()
def _timestamp_id() -> str:
    """Return the current local time as a 20-digit ``YYYYMMDDHHMMSSffffff`` id."""
    return f"{datetime.now():%Y%m%d%H%M%S%f}"