diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 308bf17..18b4091 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -1,10 +1,8 @@ from __future__ import annotations -import base64 import hashlib import json import os -import re import shutil import sqlite3 import threading @@ -52,15 +50,8 @@ from rights_filter.server.store_remote_fetch import ( _fetch_stylesheet_url_bytes, _fetch_url_bytes, ) -from rights_filter.server.store_page_scrape import ( - _content_has_comparable_image_fingerprint, - _extract_css_image_urls, - _extract_page_image_urls, - _extract_page_stylesheet_urls, - _normalized_remote_image_url, - _search_result_direct_image_urls, -) from rights_filter.server.store_persistence import StorePersistenceMixin +from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin from rights_filter.server.store_schema import ( _ensure_constrained_schema, _ensure_queue_schema, @@ -81,11 +72,7 @@ from rights_filter.server.store_serialization import ( _google_custom_image_query_signature, _google_custom_web_query_signature, _google_weak_label_title, - _image_size_from_bytes, - _image_suffix_from_url, _is_google_weak_label_payload, - _knowledge_entry_type, - _knowledge_provenance, _knowledge_type_value, _naver_blog_query_signature, _naver_query_signature, @@ -94,28 +81,16 @@ from rights_filter.server.store_serialization import ( _provider_item_failed, _provider_item_has_result, _query_history_status, - _safe_filename, - _safe_image_suffix, _stable_id, - _strip_html, _submission_payload, _submission_search_hint_evidence, _timestamp_id, - _validate_payload, - _validate_table, _watchlist_source_evidence, ) from rights_filter.server.store_text import _text_list, _unique_texts -from rights_filter.server.store_url_utils import ( - _decoded_nested_url, - _is_http_url, - _url_has_image_format_hint, - _url_looks_like_image, - 
_url_path_has_image_suffix, -) -class CopyrighterStore(StorePersistenceMixin): +class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin): def __init__( self, db_path: Path | str, @@ -2379,694 +2354,3 @@ class CopyrighterStore(StorePersistenceMixin): f"auto text query batch: {', '.join(item['query'] for item in history_entries)}", ) - def _knowledge_repository(self) -> InMemoryRightsFilterRepository: - repository = InMemoryRightsFilterRepository() - for payload in self._all("knowledge_entries"): - if not payload.get("active", True): - continue - if payload.get("entryStatus") == "excluded": - continue - sample_fingerprints = _text_list( - payload.get("sampleFingerprints", payload.get("sample_fingerprints", [])) - ) - if not sample_fingerprints: - continue - repository.save_knowledge_entry( - KnowledgeBaseEntry( - id=str(payload.get("id", "")), - entry_type=_knowledge_entry_type(str(payload.get("type", "other"))), - name=str(payload.get("name", "")), - provenance=_knowledge_provenance(str(payload.get("provenance", "manual"))), - aliases=_text_list(payload.get("aliases")), - related_keywords=_text_list(payload.get("keywords")), - policy_memo=str(payload.get("memo", "")), - sample_fingerprints=sample_fingerprints, - source_decision_id=str(payload.get("sourceDecision", "")) or None, - entry_status=str(payload.get("entryStatus", "confirmed")), - source_submission_id=str(payload.get("sourceSubmissionId", "")), - active=bool(payload.get("active", True)), - ) - ) - return repository - - def _sync_similar_reference_images( - self, - submission_id: str, - evidence: list[Evidence], - ) -> None: - matched_entry_ids = [ - str(item.data.get("knowledge_entry_id", "")) - for item in evidence - if item.source == EvidenceSource.FINGERPRINT and item.data.get("knowledge_entry_id") - ] - if not matched_entry_ids: - return - - submission = self._get("submissions", submission_id) - similar = list(submission.get("similar", [])) - existing_assets = {str(item.get("asset", 
"")) for item in similar} - for entry_id in matched_entry_ids: - try: - entry = self._get("knowledge_entries", entry_id) - except KeyError: - continue - asset = str(entry.get("imageAsset", "")) - if not asset or asset in existing_assets: - continue - similar.append( - { - "asset": asset, - "label": f"{entry.get('name', entry_id)} / internal match", - } - ) - existing_assets.add(asset) - submission["similar"] = similar - self._put("submissions", submission_id, submission) - - def _sync_search_result_image_similarity( - self, - submission_id: str, - evidence: list[Evidence], - image_store: LocalSubmissionImageStore, - status: str = "active", - max_matches: int | None = None, - ) -> list[Evidence]: - submission_fingerprint = self._submission_perceptual_fingerprint( - submission_id, - image_store, - ) - if submission_fingerprint is None: - return [] - - if max_matches is None: - max_matches = self.provider_runtime.search_result_compare_limit - else: - max_matches = min( - max_matches, - self.provider_runtime.search_result_compare_limit, - ) - if max_matches <= 0: - return [] - - similarity_evidence: list[Evidence] = [] - for item in evidence: - if len(similarity_evidence) >= max_matches: - break - matches = self._search_result_image_similarity_evidence( - submission_id, - submission_fingerprint, - item, - ) - if not matches: - continue - for match in matches: - if len(similarity_evidence) >= max_matches: - break - payload = _evidence_payload(submission_id, match) - payload["status"] = status - self._put("evidence", payload["id"], payload) - similarity_evidence.append(match) - if similarity_evidence: - self._rescore_submission(submission_id) - return similarity_evidence - - def _can_compare_search_result_images( - self, - submission_id: str, - image_store: LocalSubmissionImageStore | None, - ) -> bool: - if image_store is None: - return False - return self._submission_perceptual_fingerprint(submission_id, image_store) is not None - - def 
_search_result_similarity_count(self, submission_id: str) -> int: - return sum( - 1 - for item in self._evidence_by_submission().get(submission_id, []) - if item.get("source") == "fingerprint" - and str(item.get("matchType") or "").startswith("search_result") - ) - - def _search_result_similarity_remaining_budget( - self, - submission_id: str, - image_store: LocalSubmissionImageStore | None, - ) -> int: - if not self._can_compare_search_result_images(submission_id, image_store): - return 0 - return max( - 0, - self.provider_runtime.search_result_compare_limit - - self._search_result_similarity_count(submission_id), - ) - - def _submission_perceptual_fingerprint( - self, - submission_id: str, - image_store: LocalSubmissionImageStore, - ) -> str | None: - try: - fingerprint = FingerprintService().fingerprints_for( - image_store.image_payload(submission_id).content - ).perceptual - except Exception: - return None - if fingerprint.startswith("phash:unavailable:"): - return None - return fingerprint - - def _search_result_image_similarity_evidence( - self, - submission_id: str, - submission_fingerprint: str, - source_evidence: Evidence, - ) -> list[Evidence]: - if source_evidence.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}: - return [] - if source_evidence.data.get("weak_hint"): - return [] - - matches: list[Evidence] = [] - - for image_url in _unique_texts( - [ - str(source_evidence.data.get("image_url", "")), - str(source_evidence.data.get("thumbnail_url", "")), - ] - ): - match = self._search_result_candidate_image_evidence( - submission_id, - submission_fingerprint, - source_evidence, - image_url, - match_type="search_result_image", - candidate_source="result_image_url", - ) - if match is not None: - return [match] - - for image_url in _search_result_direct_image_urls(source_evidence): - match = self._search_result_candidate_image_evidence( - submission_id, - submission_fingerprint, - source_evidence, - image_url, - 
match_type="search_result_page_image", - candidate_source="result_page_direct_image", - ) - if match is not None: - return [match] - - for image_url in _unique_texts(source_evidence.data.get("page_image_urls", [])): - match = self._search_result_candidate_image_evidence( - submission_id, - submission_fingerprint, - source_evidence, - image_url, - match_type="search_result_page_image", - candidate_source="provider_page_image", - ) - if match is not None: - return [match] - - for image_url, candidate_source in self._search_result_page_image_candidates(source_evidence): - match = self._search_result_candidate_image_evidence( - submission_id, - submission_fingerprint, - source_evidence, - image_url, - match_type="search_result_page_image", - candidate_source=candidate_source, - ) - if match is not None: - return [match] - return matches - - def _face_crop_search_result_similarity_evidence( - self, - submission_id: str, - crop_index: int, - crop: Any, - source_evidence: Evidence, - ) -> list[Evidence]: - try: - crop_fingerprint = FingerprintService().fingerprints_for(crop.content).perceptual - except Exception: - return [] - if crop_fingerprint.startswith("phash:unavailable:"): - return [] - - matches: list[Evidence] = [] - - extra_data = { - "face_crop_search": True, - "crop_index": crop_index, - "weak_hint": True, - "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.", - } - for image_url in _unique_texts( - [ - str(source_evidence.data.get("image_url", "")), - str(source_evidence.data.get("thumbnail_url", "")), - ] - ): - match = self._search_result_candidate_image_evidence( - submission_id, - crop_fingerprint, - source_evidence, - image_url, - match_type="face_crop_search_result_image", - candidate_source="face_crop_result_image_url", - extra_data=extra_data, - ) - if match is not None: - return [match] - - for image_url in _search_result_direct_image_urls(source_evidence): - match = self._search_result_candidate_image_evidence( - submission_id, - crop_fingerprint, 
- source_evidence, - image_url, - match_type="face_crop_search_result_page_image", - candidate_source="face_crop_result_page_direct_image", - extra_data=extra_data, - ) - if match is not None: - return [match] - - for image_url in _unique_texts(source_evidence.data.get("page_image_urls", [])): - match = self._search_result_candidate_image_evidence( - submission_id, - crop_fingerprint, - source_evidence, - image_url, - match_type="face_crop_search_result_page_image", - candidate_source="face_crop_provider_page_image", - extra_data=extra_data, - ) - if match is not None: - return [match] - - for image_url, candidate_source in self._search_result_page_image_candidates(source_evidence): - match = self._search_result_candidate_image_evidence( - submission_id, - crop_fingerprint, - source_evidence, - image_url, - match_type="face_crop_search_result_page_image", - candidate_source=f"face_crop_{candidate_source}", - extra_data=extra_data, - ) - if match is not None: - return [match] - return matches - - def _search_result_candidate_image_evidence( - self, - submission_id: str, - submission_fingerprint: str, - source_evidence: Evidence, - image_url: str, - match_type: str, - candidate_source: str, - extra_data: dict[str, Any] | None = None, - ) -> Evidence | None: - image_url = _normalized_remote_image_url(image_url) - result_url = str( - source_evidence.data.get("result_url", source_evidence.data.get("url", "")) - or image_url - ) - image_id = _stable_id( - "searchimg", - submission_id, - str(source_evidence.source), - match_type, - image_url, - str(source_evidence.data.get("query", "")), - ) - image_record = self._store_candidate_image(image_id, image_url, referer_url=result_url) - if not image_record: - return None - - similarity = FingerprintService().similarity( - submission_fingerprint, - str(image_record["perceptualFingerprint"]), - ) - if similarity < self.provider_runtime.search_result_similarity_threshold: - return None - return Evidence( - 
source=EvidenceSource.FINGERPRINT, - reason=f"Search result image similarity {similarity:.2f}", - confidence=similarity, - data={ - "submission_id": submission_id, - "provider": source_evidence.data.get("provider", ""), - "query": source_evidence.data.get("query", ""), - "query_signature": source_evidence.data.get("query_signature", ""), - "query_strategy": source_evidence.data.get("query_strategy", ""), - "query_source": source_evidence.data.get("query_source", ""), - "url": result_url, - "result_url": result_url, - "image_url": image_record["asset"], - "thumbnail_url": image_record["asset"], - "remote_image_url": image_url, - "source_page_url": result_url, - "image_candidate_source": candidate_source, - "page_title": source_evidence.data.get("page_title", source_evidence.data.get("title", "")), - "match": match_type, - "similarity": similarity, - "source_evidence_ids": [_evidence_id(submission_id, source_evidence)], - "contributed": True, - **(extra_data or {}), - }, - ) - - def _search_result_page_image_candidates(self, source_evidence: Evidence) -> list[tuple[str, str]]: - page_url = str( - source_evidence.data.get("result_url", source_evidence.data.get("url", "")) - ) - limit = getattr(self.provider_runtime, "search_result_page_image_limit", 3) - if not page_url or limit <= 0 or not _is_http_url(page_url): - return [] - if _url_looks_like_image(page_url): - return [] - try: - content = self.page_fetcher(page_url) - except Exception: - return [] - if _content_has_comparable_image_fingerprint(content): - return [(page_url, "result_page_direct_image")] - image_urls = _extract_page_image_urls(content, page_url, limit) - if len(image_urls) < limit: - image_urls.extend( - self._search_result_stylesheet_image_urls( - content, - page_url, - limit - len(image_urls), - ) - ) - return [(image_url, "html_page_image") for image_url in _unique_texts(image_urls)[:limit]] - - def _search_result_stylesheet_image_urls( - self, - page_content: bytes, - page_url: str, - limit: 
int, - ) -> list[str]: - if limit <= 0: - return [] - image_urls: list[str] = [] - for stylesheet_url in _extract_page_stylesheet_urls(page_content, page_url, limit): - try: - stylesheet_content = self.stylesheet_fetcher(stylesheet_url) - except Exception: - continue - for image_url in _extract_css_image_urls(stylesheet_content, stylesheet_url, limit - len(image_urls)): - image_urls.append(image_url) - if len(image_urls) >= limit: - return image_urls - return image_urls - - def _search_result_page_image_urls(self, source_evidence: Evidence) -> list[str]: - return [ - image_url - for image_url, _candidate_source in self._search_result_page_image_candidates(source_evidence) - ] - - def _increment_knowledge_contribution_counts( - self, - submission_id: str, - evidence: list[Evidence], - ) -> None: - matched_entry_ids = _unique_texts( - str(item.data.get("knowledge_entry_id", "")) - for item in evidence - if item.source == EvidenceSource.FINGERPRINT - and item.data.get("knowledge_entry_status") == "watchlist" - and item.data.get("knowledge_entry_id") - ) - for entry_id in matched_entry_ids: - try: - entry = self._get("knowledge_entries", entry_id) - except KeyError: - continue - if entry.get("entryStatus") != "watchlist": - continue - if str(entry.get("sourceSubmissionId", "")) == submission_id: - continue - matched_submission_ids = _text_list(entry.get("matchedSubmissionIds")) - if submission_id in matched_submission_ids: - continue - matched_submission_ids.append(submission_id) - entry["matchedSubmissionIds"] = matched_submission_ids - entry["contributionCount"] = int(entry.get("contributionCount", 0) or 0) + 1 - entry["lastMatchedSubmissionId"] = submission_id - entry["lastMatchedAt"] = _now_label() - self._put("knowledge_entries", entry_id, entry) - - def _store_manual_knowledge_image( - self, - entry_id: str, - image_payload: Any, - ) -> dict[str, Any] | None: - if not image_payload: - return None - if not isinstance(image_payload, dict): - raise 
ValueError("knowledge image must be an object") - - data = str(image_payload.get("data", "")) - if not data: - raise ValueError("knowledge image data required") - if "," in data and data.split(",", 1)[0].startswith("data:"): - data = data.split(",", 1)[1] - try: - content = base64.b64decode(data, validate=True) - except Exception as exc: - raise ValueError("knowledge image data must be base64") from exc - if not content: - raise ValueError("knowledge image is empty") - - filename = str(image_payload.get("filename", "reference")).strip() or "reference" - suffix = _safe_image_suffix(filename, str(image_payload.get("content_type", ""))) - safe_stem = _safe_filename(Path(filename).stem) or "reference" - target_name = f"{entry_id}-{safe_stem}{suffix}" - self.knowledge_image_dir.mkdir(parents=True, exist_ok=True) - root = self.knowledge_image_dir.resolve() - target = (root / target_name).resolve() - if target != root and root not in target.parents: - raise ValueError("knowledge image path points outside image store") - target.write_bytes(content) - - width, height = _image_size_from_bytes(content) - fingerprints = FingerprintService().fingerprints_for(content) - return { - "asset": f"{self.knowledge_public_prefix}/{target_name}", - "perceptualFingerprint": fingerprints.perceptual, - "facts": { - "filename": filename, - "format": suffix.lstrip(".").upper(), - "size": f"{width} x {height}", - "fingerprints": 1, - }, - } - - def _collection_candidates_from_evidence( - self, - query: str, - evidence: list[Evidence], - provider: str, - ) -> list[dict[str, Any]]: - candidates: list[dict[str, Any]] = [] - for item in evidence: - if item.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}: - continue - if item.data.get("image_url"): - candidate = self._candidate_payload_from_evidence( - query, - item, - provider, - source_candidate_type="search_result_image", - ) - if candidate is not None: - candidates.append(candidate) - continue - candidate_count = 
len(candidates) - for image_url in _unique_texts(item.data.get("page_image_urls", [])): - candidate = self._candidate_payload_from_evidence( - query, - item, - provider, - image_url=image_url, - thumbnail_url=image_url, - source_candidate_type="provider_page_image", - ) - if candidate is not None: - candidates.append(candidate) - if len(candidates) > candidate_count: - continue - for image_url in _search_result_direct_image_urls(item): - candidate = self._candidate_payload_from_evidence( - query, - item, - provider, - image_url=image_url, - thumbnail_url=image_url, - source_candidate_type="result_page_direct_image", - ) - if candidate is not None: - candidates.append(candidate) - if len(candidates) > candidate_count: - continue - for image_url, source_candidate_type in self._search_result_page_image_candidates(item): - candidate = self._candidate_payload_from_evidence( - query, - item, - provider, - image_url=image_url, - thumbnail_url=image_url, - source_candidate_type=source_candidate_type, - ) - if candidate is not None: - candidates.append(candidate) - break - return candidates - - def _candidate_payload_from_evidence( - self, - query: str, - evidence: Evidence, - provider: str = "naver", - image_url: str | None = None, - thumbnail_url: str | None = None, - source_candidate_type: str = "search_result_image", - ) -> dict[str, Any] | None: - image_url = _normalized_remote_image_url( - str(image_url if image_url is not None else evidence.data.get("image_url", "")) - ) - thumbnail_url = _normalized_remote_image_url( - str(thumbnail_url if thumbnail_url is not None else evidence.data.get("thumbnail_url", "")) - ) - result_url = str(evidence.data.get("result_url", "")) - candidate_id = _stable_id("cand", provider, source_candidate_type, query, image_url, thumbnail_url, result_url) - image_record = None - stored_image_url = "" - for candidate_url in _unique_texts([image_url, thumbnail_url]): - image_record = self._store_candidate_image( - candidate_id, - 
candidate_url, - referer_url=result_url, - ) - if image_record is not None: - stored_image_url = candidate_url - break - if image_record is None: - return None - display_image_url = stored_image_url or image_url - return { - "id": candidate_id, - "provider": provider, - "query": query, - "title": _strip_html(str(evidence.data.get("title", ""))), - "status": "candidate", - "rank": evidence.data.get("rank", ""), - "imageUrl": display_image_url, - "thumbnailUrl": thumbnail_url, - "resultUrl": result_url, - "sourceUrl": result_url or display_image_url, - "sourceCandidateType": source_candidate_type, - "imageAsset": image_record["asset"], - "sampleFingerprints": [image_record["perceptualFingerprint"]], - "imageFacts": image_record["facts"], - "collectedAt": _now_label(), - "collectedEpoch": int(datetime.now().timestamp()), - "promotedKnowledgeId": "", - } - - def _store_candidate_image( - self, - candidate_id: str, - url: str, - referer_url: str = "", - ) -> dict[str, Any] | None: - if not url: - return None - suffix = _image_suffix_from_url(url) - target_name = f"{candidate_id}{suffix}" - root = self.collection_image_dir.resolve() - target = (root / target_name).resolve() - if target != root and root not in target.parents: - raise ValueError("candidate image path points outside image store") - if target.exists() and target.is_file(): - try: - record = self._candidate_image_record_from_content( - target_name, - url, - suffix, - target.read_bytes(), - ) - except Exception: - record = None - if record is not None: - return record - try: - content = self._fetch_candidate_image_content(url, referer_url) - except Exception: - return None - image_record = self._candidate_image_record_from_content( - target_name, - url, - suffix, - content, - ) - if image_record is None: - return None - self.collection_image_dir.mkdir(parents=True, exist_ok=True) - target.write_bytes(content) - return image_record - - def _candidate_image_record_from_content( - self, - target_name: str, - url: 
str, - suffix: str, - content: bytes, - ) -> dict[str, Any] | None: - if not content: - return None - width, height = _image_size_from_bytes(content) - fingerprints = FingerprintService().fingerprints_for(content) - if fingerprints.perceptual.startswith("phash:unavailable:"): - return None - return { - "asset": f"{self.collection_public_prefix}/{target_name}", - "perceptualFingerprint": fingerprints.perceptual, - "facts": { - "source": url, - "format": suffix.lstrip(".").upper(), - "size": f"{width} x {height}", - "fingerprints": 1, - }, - } - - def _fetch_candidate_image_content(self, url: str, referer_url: str = "") -> bytes: - if self._custom_candidate_image_fetcher is not None: - return self._custom_candidate_image_fetcher(url) - return _fetch_url_bytes(url, referer_url=referer_url) - - def _rescore_submission(self, submission_id: str) -> None: - submission = self._get("submissions", submission_id) - evidence = [ - _domain_evidence_from_ui(item) - for item in self._evidence_for_submission(submission_id) - ] - score = RiskScorer().score(evidence) - submission["riskScore"] = score.score - submission["riskBand"] = score.band - submission["reasons"] = score.reasons or ["분석 근거 없음"] - self._put("submissions", submission_id, submission) - - def _rescore_all_submissions(self, queue_id: str | None = None) -> None: - for submission in self._all("submissions", queue_id=queue_id): - self._rescore_submission(str(submission["id"])) - diff --git a/src/rights_filter/server/store_search_candidates.py b/src/rights_filter/server/store_search_candidates.py new file mode 100644 index 0000000..a4a85f8 --- /dev/null +++ b/src/rights_filter/server/store_search_candidates.py @@ -0,0 +1,743 @@ +"""Search-result image similarity, candidate-image storage, the in-memory +knowledge repository, and rescoring — as a mixin for CopyrighterStore. + +Mixed into CopyrighterStore; relies on persistence methods (self._put/_get/...), +self.* attributes, and the extracted helper modules. 
Behavior unchanged. +""" + +from __future__ import annotations + +import base64 +import re +from datetime import datetime +from pathlib import Path +from typing import Any + +from rights_filter.analysis.fingerprints import FingerprintService +from rights_filter.analysis.risk_scoring import RiskScorer +from rights_filter.domain.records import ( + Evidence, + EvidenceSource, + InMemoryRightsFilterRepository, + KnowledgeBaseEntry, +) +from rights_filter.server.image_store import LocalSubmissionImageStore +from rights_filter.server.store_page_scrape import ( + _content_has_comparable_image_fingerprint, + _extract_css_image_urls, + _extract_page_image_urls, + _extract_page_stylesheet_urls, + _normalized_remote_image_url, + _search_result_direct_image_urls, +) +from rights_filter.server.store_remote_fetch import _fetch_url_bytes +from rights_filter.server.store_serialization import ( + _domain_evidence_from_ui, + _evidence_id, + _evidence_payload, + _image_size_from_bytes, + _image_suffix_from_url, + _knowledge_entry_type, + _knowledge_provenance, + _now_label, + _safe_filename, + _safe_image_suffix, + _stable_id, + _strip_html, +) +from rights_filter.server.store_text import _text_list, _unique_texts +from rights_filter.server.store_url_utils import _is_http_url, _url_looks_like_image + + +class StoreSearchCandidatesMixin: + def _knowledge_repository(self) -> InMemoryRightsFilterRepository: + repository = InMemoryRightsFilterRepository() + for payload in self._all("knowledge_entries"): + if not payload.get("active", True): + continue + if payload.get("entryStatus") == "excluded": + continue + sample_fingerprints = _text_list( + payload.get("sampleFingerprints", payload.get("sample_fingerprints", [])) + ) + if not sample_fingerprints: + continue + repository.save_knowledge_entry( + KnowledgeBaseEntry( + id=str(payload.get("id", "")), + entry_type=_knowledge_entry_type(str(payload.get("type", "other"))), + name=str(payload.get("name", "")), + 
provenance=_knowledge_provenance(str(payload.get("provenance", "manual"))), + aliases=_text_list(payload.get("aliases")), + related_keywords=_text_list(payload.get("keywords")), + policy_memo=str(payload.get("memo", "")), + sample_fingerprints=sample_fingerprints, + source_decision_id=str(payload.get("sourceDecision", "")) or None, + entry_status=str(payload.get("entryStatus", "confirmed")), + source_submission_id=str(payload.get("sourceSubmissionId", "")), + active=bool(payload.get("active", True)), + ) + ) + return repository + + def _sync_similar_reference_images( + self, + submission_id: str, + evidence: list[Evidence], + ) -> None: + matched_entry_ids = [ + str(item.data.get("knowledge_entry_id", "")) + for item in evidence + if item.source == EvidenceSource.FINGERPRINT and item.data.get("knowledge_entry_id") + ] + if not matched_entry_ids: + return + + submission = self._get("submissions", submission_id) + similar = list(submission.get("similar", [])) + existing_assets = {str(item.get("asset", "")) for item in similar} + for entry_id in matched_entry_ids: + try: + entry = self._get("knowledge_entries", entry_id) + except KeyError: + continue + asset = str(entry.get("imageAsset", "")) + if not asset or asset in existing_assets: + continue + similar.append( + { + "asset": asset, + "label": f"{entry.get('name', entry_id)} / internal match", + } + ) + existing_assets.add(asset) + submission["similar"] = similar + self._put("submissions", submission_id, submission) + + def _sync_search_result_image_similarity( + self, + submission_id: str, + evidence: list[Evidence], + image_store: LocalSubmissionImageStore, + status: str = "active", + max_matches: int | None = None, + ) -> list[Evidence]: + submission_fingerprint = self._submission_perceptual_fingerprint( + submission_id, + image_store, + ) + if submission_fingerprint is None: + return [] + + if max_matches is None: + max_matches = self.provider_runtime.search_result_compare_limit + else: + max_matches = min( + 
max_matches, + self.provider_runtime.search_result_compare_limit, + ) + if max_matches <= 0: + return [] + + similarity_evidence: list[Evidence] = [] + for item in evidence: + if len(similarity_evidence) >= max_matches: + break + matches = self._search_result_image_similarity_evidence( + submission_id, + submission_fingerprint, + item, + ) + if not matches: + continue + for match in matches: + if len(similarity_evidence) >= max_matches: + break + payload = _evidence_payload(submission_id, match) + payload["status"] = status + self._put("evidence", payload["id"], payload) + similarity_evidence.append(match) + if similarity_evidence: + self._rescore_submission(submission_id) + return similarity_evidence + + def _can_compare_search_result_images( + self, + submission_id: str, + image_store: LocalSubmissionImageStore | None, + ) -> bool: + if image_store is None: + return False + return self._submission_perceptual_fingerprint(submission_id, image_store) is not None + + def _search_result_similarity_count(self, submission_id: str) -> int: + return sum( + 1 + for item in self._evidence_by_submission().get(submission_id, []) + if item.get("source") == "fingerprint" + and str(item.get("matchType") or "").startswith("search_result") + ) + + def _search_result_similarity_remaining_budget( + self, + submission_id: str, + image_store: LocalSubmissionImageStore | None, + ) -> int: + if not self._can_compare_search_result_images(submission_id, image_store): + return 0 + return max( + 0, + self.provider_runtime.search_result_compare_limit + - self._search_result_similarity_count(submission_id), + ) + + def _submission_perceptual_fingerprint( + self, + submission_id: str, + image_store: LocalSubmissionImageStore, + ) -> str | None: + try: + fingerprint = FingerprintService().fingerprints_for( + image_store.image_payload(submission_id).content + ).perceptual + except Exception: + return None + if fingerprint.startswith("phash:unavailable:"): + return None + return fingerprint + + 
def _search_result_image_similarity_evidence(
    self,
    submission_id: str,
    submission_fingerprint: str,
    source_evidence: Evidence,
) -> list[Evidence]:
    """Compare a search-result evidence item's images with the submission.

    Only NAVER_SEARCH and WEB_DETECTION evidence is considered, and evidence
    flagged ``weak_hint`` is skipped.

    Returns:
        A one-element list holding the first candidate image whose similarity
        reaches the configured threshold, or an empty list.
    """
    if source_evidence.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}:
        return []
    if source_evidence.data.get("weak_hint"):
        return []
    return self._first_search_result_image_match(
        submission_id,
        submission_fingerprint,
        source_evidence,
    )


def _face_crop_search_result_similarity_evidence(
    self,
    submission_id: str,
    crop_index: int,
    crop: Any,
    source_evidence: Evidence,
) -> list[Evidence]:
    """Compare a face crop with the images behind one search-result evidence item.

    The crop's perceptual fingerprint is computed from ``crop.content``; when
    that fails or the fingerprint is unavailable, no evidence is produced.
    Every match is labelled with a ``face_crop_`` prefix and carries the
    weak-hint / privacy metadata below.

    Returns:
        A one-element list with the first sufficiently similar candidate, or
        an empty list.
    """
    try:
        crop_fingerprint = FingerprintService().fingerprints_for(crop.content).perceptual
    except Exception:
        # Best effort: a crop that cannot be fingerprinted yields no evidence.
        return []
    if crop_fingerprint.startswith("phash:unavailable:"):
        return []

    extra_data = {
        "face_crop_search": True,
        "crop_index": crop_index,
        "weak_hint": True,
        "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.",
    }
    return self._first_search_result_image_match(
        submission_id,
        crop_fingerprint,
        source_evidence,
        name_prefix="face_crop_",
        extra_data=extra_data,
    )


def _first_search_result_image_match(
    self,
    submission_id: str,
    fingerprint: str,
    source_evidence: Evidence,
    name_prefix: str = "",
    extra_data: dict[str, Any] | None = None,
) -> list[Evidence]:
    """Return the first candidate image of ``source_evidence`` that matches.

    Candidate sources are tried in a fixed order and each stage is only
    evaluated when every earlier stage produced no match, so the page fetch
    in the last stage happens only as a last resort:

    1. the result's own ``image_url`` / ``thumbnail_url``
    2. direct image URLs derived from the result
    3. provider-supplied ``page_image_urls``
    4. images discovered by fetching the result page

    Args:
        fingerprint: Perceptual fingerprint to compare candidates against.
        name_prefix: Prepended to both the match type and the candidate
            source label (``"face_crop_"`` for face-crop searches).
        extra_data: Extra keys merged into the evidence data of a match.

    Returns:
        ``[match]`` for the first match, otherwise ``[]``.
    """
    data = source_evidence.data
    # Each stage is (match type, lazy loader of (image_url, candidate_source)).
    stages = (
        (
            "search_result_image",
            lambda: [
                (image_url, "result_image_url")
                for image_url in _unique_texts(
                    [
                        str(data.get("image_url", "")),
                        str(data.get("thumbnail_url", "")),
                    ]
                )
            ],
        ),
        (
            "search_result_page_image",
            lambda: [
                (image_url, "result_page_direct_image")
                for image_url in _search_result_direct_image_urls(source_evidence)
            ],
        ),
        (
            "search_result_page_image",
            lambda: [
                (image_url, "provider_page_image")
                for image_url in _unique_texts(data.get("page_image_urls", []))
            ],
        ),
        (
            "search_result_page_image",
            lambda: self._search_result_page_image_candidates(source_evidence),
        ),
    )
    for match_type, load_candidates in stages:
        for image_url, candidate_source in load_candidates():
            match = self._search_result_candidate_image_evidence(
                submission_id,
                fingerprint,
                source_evidence,
                image_url,
                match_type=f"{name_prefix}{match_type}",
                candidate_source=f"{name_prefix}{candidate_source}",
                extra_data=extra_data,
            )
            if match is not None:
                return [match]
    return []


def _search_result_candidate_image_evidence(
    self,
    submission_id: str,
    submission_fingerprint: str,
    source_evidence: Evidence,
    image_url: str,
    match_type: str,
    candidate_source: str,
    extra_data: dict[str, Any] | None = None,
) -> Evidence | None:
    """Store one candidate image and turn it into evidence if it is similar.

    The image is stored under a stable id derived from the submission, the
    evidence source, the match type, the normalized URL and the query.

    Returns:
        FINGERPRINT evidence when the stored image's similarity to
        ``submission_fingerprint`` reaches
        ``provider_runtime.search_result_similarity_threshold``; ``None`` when
        the image could not be stored or is not similar enough.
    """
    image_url = _normalized_remote_image_url(image_url)
    # Fall back to the image URL itself when the evidence names no page.
    result_url = str(
        source_evidence.data.get("result_url", source_evidence.data.get("url", ""))
        or image_url
    )
    image_id = _stable_id(
        "searchimg",
        submission_id,
        str(source_evidence.source),
        match_type,
        image_url,
        str(source_evidence.data.get("query", "")),
    )
    image_record = self._store_candidate_image(image_id, image_url, referer_url=result_url)
    if not image_record:
        return None

    similarity = FingerprintService().similarity(
        submission_fingerprint,
        str(image_record["perceptualFingerprint"]),
    )
    if similarity < self.provider_runtime.search_result_similarity_threshold:
        return None
    return Evidence(
        source=EvidenceSource.FINGERPRINT,
        reason=f"Search result image similarity {similarity:.2f}",
        confidence=similarity,
        data={
            "submission_id": submission_id,
            "provider": source_evidence.data.get("provider", ""),
            "query": source_evidence.data.get("query", ""),
            "query_signature": source_evidence.data.get("query_signature", ""),
            "query_strategy": source_evidence.data.get("query_strategy", ""),
            "query_source": source_evidence.data.get("query_source", ""),
            "url": result_url,
            "result_url": result_url,
            # The locally stored asset is what gets displayed; the remote
            # origin is kept separately in ``remote_image_url``.
            "image_url": image_record["asset"],
            "thumbnail_url": image_record["asset"],
            "remote_image_url": image_url,
            "source_page_url": result_url,
            "image_candidate_source": candidate_source,
            "page_title": source_evidence.data.get("page_title", source_evidence.data.get("title", "")),
            "match": match_type,
            "similarity": similarity,
            "source_evidence_ids": [_evidence_id(submission_id, source_evidence)],
            "contributed": True,
            **(extra_data or {}),
        },
    )


def _search_result_page_image_candidates(self, source_evidence: Evidence) -> list[tuple[str, str]]:
    """Fetch the result page and list ``(image_url, candidate_source)`` pairs.

    Returns an empty list when the evidence has no HTTP page URL, when the
    configured limit (``provider_runtime.search_result_page_image_limit``,
    default 3) is not positive, when the URL already looks like an image, or
    when the page fetch raises. If the fetched content is itself a comparable
    image, the page URL is returned as a ``result_page_direct_image``.
    Otherwise up to ``limit`` image URLs are taken from the page, topped up
    from its stylesheets, and labelled ``html_page_image``.
    """
    page_url = str(
        source_evidence.data.get("result_url", source_evidence.data.get("url", ""))
    )
    limit = getattr(self.provider_runtime, "search_result_page_image_limit", 3)
    if not page_url or limit <= 0 or not _is_http_url(page_url):
        return []
    if _url_looks_like_image(page_url):
        return []
    try:
        content = self.page_fetcher(page_url)
    except Exception:
        # Best effort: an unreachable page simply contributes no candidates.
        return []
    if _content_has_comparable_image_fingerprint(content):
        return [(page_url, "result_page_direct_image")]
    image_urls = _extract_page_image_urls(content, page_url, limit)
    if len(image_urls) < limit:
        image_urls.extend(
            self._search_result_stylesheet_image_urls(
                content,
                page_url,
                limit - len(image_urls),
            )
        )
    return [(image_url, "html_page_image") for image_url in _unique_texts(image_urls)[:limit]]


def _search_result_stylesheet_image_urls(
    self,
    page_content: bytes,
    page_url: str,
    limit: int,
) -> list[str]:
    """Collect up to ``limit`` image URLs referenced by the page's stylesheets.

    Stylesheets that fail to download are skipped. Returns an empty list when
    ``limit`` is not positive.
    """
    if limit <= 0:
        return []
    image_urls: list[str] = []
    for stylesheet_url in _extract_page_stylesheet_urls(page_content, page_url, limit):
        try:
            stylesheet_content = self.stylesheet_fetcher(stylesheet_url)
        except Exception:
            continue
        for image_url in _extract_css_image_urls(stylesheet_content, stylesheet_url, limit - len(image_urls)):
            image_urls.append(image_url)
            if len(image_urls) >= limit:
                return image_urls
    return image_urls


def _search_result_page_image_urls(self, source_evidence: Evidence) -> list[str]:
    """Return only the URLs from ``_search_result_page_image_candidates``."""
    return [
        image_url
        for image_url, _candidate_source in self._search_result_page_image_candidates(source_evidence)
    ]


def _increment_knowledge_contribution_counts(
    self,
    submission_id: str,
    evidence: list[Evidence],
) -> None:
    """Record that ``submission_id`` matched watchlist knowledge entries.

    For each distinct watchlist entry referenced by FINGERPRINT evidence, the
    submission id is appended to ``matchedSubmissionIds`` and the contribution
    counters are updated. Entries are skipped when they no longer exist, are
    no longer on the watchlist, originate from this same submission, or
    already list this submission.
    """
    matched_entry_ids = _unique_texts(
        str(item.data.get("knowledge_entry_id", ""))
        for item in evidence
        if item.source == EvidenceSource.FINGERPRINT
        and item.data.get("knowledge_entry_status") == "watchlist"
        and item.data.get("knowledge_entry_id")
    )
    for entry_id in matched_entry_ids:
        try:
            entry = self._get("knowledge_entries", entry_id)
        except KeyError:
            continue
        # Re-check the stored status: the evidence may be older than the entry.
        if entry.get("entryStatus") != "watchlist":
            continue
        if str(entry.get("sourceSubmissionId", "")) == submission_id:
            continue
        matched_submission_ids = _text_list(entry.get("matchedSubmissionIds"))
        if submission_id in matched_submission_ids:
            continue
        matched_submission_ids.append(submission_id)
        entry["matchedSubmissionIds"] = matched_submission_ids
        entry["contributionCount"] = int(entry.get("contributionCount", 0) or 0) + 1
        entry["lastMatchedSubmissionId"] = submission_id
        entry["lastMatchedAt"] = _now_label()
        self._put("knowledge_entries", entry_id, entry)


def _store_manual_knowledge_image(
    self,
    entry_id: str,
    image_payload: Any,
) -> dict[str, Any] | None:
    """Decode an uploaded base64 image and store it as a knowledge reference.

    Args:
        entry_id: Knowledge entry id; used as the stored file name's prefix.
        image_payload: Mapping with ``data`` (base64, optionally with a
            ``data:...,`` URL prefix) and optional ``filename`` and
            ``content_type``. A falsy payload means "no image".

    Returns:
        ``None`` for a falsy payload, otherwise a record with the public
        ``asset`` path, the ``perceptualFingerprint`` and display ``facts``.

    Raises:
        ValueError: If the payload is not a mapping, carries no data, is not
            valid base64, decodes to nothing, or would be written outside
            the knowledge image directory.
    """
    if not image_payload:
        return None
    if not isinstance(image_payload, dict):
        raise ValueError("knowledge image must be an object")

    data = str(image_payload.get("data", ""))
    if not data:
        raise ValueError("knowledge image data required")
    # Strip a leading "data:<mime>;base64," header when present.
    header, separator, encoded = data.partition(",")
    if separator and header.startswith("data:"):
        data = encoded
    try:
        content = base64.b64decode(data, validate=True)
    except ValueError as exc:
        # binascii.Error (bad alphabet/padding) subclasses ValueError, and
        # non-ASCII text raises ValueError directly.
        raise ValueError("knowledge image data must be base64") from exc
    if not content:
        raise ValueError("knowledge image is empty")

    filename = str(image_payload.get("filename", "reference")).strip() or "reference"
    suffix = _safe_image_suffix(filename, str(image_payload.get("content_type", "")))
    safe_stem = _safe_filename(Path(filename).stem) or "reference"
    target_name = f"{entry_id}-{safe_stem}{suffix}"
    self.knowledge_image_dir.mkdir(parents=True, exist_ok=True)
    root = self.knowledge_image_dir.resolve()
    target = (root / target_name).resolve()
    # The resolved target must sit strictly inside the image directory; this
    # also rejects a target that resolves to the directory itself.
    if root not in target.parents:
        raise ValueError("knowledge image path points outside image store")
    target.write_bytes(content)

    width, height = _image_size_from_bytes(content)
    fingerprints = FingerprintService().fingerprints_for(content)
    return {
        "asset": f"{self.knowledge_public_prefix}/{target_name}",
        "perceptualFingerprint": fingerprints.perceptual,
        "facts": {
            "filename": filename,
            "format": suffix.lstrip(".").upper(),
            "size": f"{width} x {height}",
            "fingerprints": 1,
        },
    }


def _collection_candidates_from_evidence(
    self,
    query: str,
    evidence: list[Evidence],
    provider: str,
) -> list[dict[str, Any]]:
    """Build collection candidate payloads from search evidence.

    Only NAVER_SEARCH and WEB_DETECTION evidence is used. For each item the
    sources are tried in order and later sources are consulted only when the
    earlier ones produced nothing: the result's own image, the provider's
    ``page_image_urls``, direct image URLs derived from the result, and
    finally images discovered on the fetched result page.
    """
    # NOTE(review): indentation was lost in the reviewed source. ``continue``
    # after the first append and the final ``break`` are placed inside the
    # success branch (fall through on failure) - confirm against the file.
    candidates: list[dict[str, Any]] = []
    for item in evidence:
        if item.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}:
            continue
        if item.data.get("image_url"):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                source_candidate_type="search_result_image",
            )
            if candidate is not None:
                candidates.append(candidate)
                continue
        # Remember the size so each stage can tell whether it added anything.
        candidate_count = len(candidates)
        for image_url in _unique_texts(item.data.get("page_image_urls", [])):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                image_url=image_url,
                thumbnail_url=image_url,
                source_candidate_type="provider_page_image",
            )
            if candidate is not None:
                candidates.append(candidate)
        if len(candidates) > candidate_count:
            continue
        for image_url in _search_result_direct_image_urls(item):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                image_url=image_url,
                thumbnail_url=image_url,
                source_candidate_type="result_page_direct_image",
            )
            if candidate is not None:
                candidates.append(candidate)
        if len(candidates) > candidate_count:
            continue
        for image_url, source_candidate_type in self._search_result_page_image_candidates(item):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                image_url=image_url,
                thumbnail_url=image_url,
                source_candidate_type=source_candidate_type,
            )
            if candidate is not None:
                candidates.append(candidate)
                break
    return candidates


def _candidate_payload_from_evidence(
    self,
    query: str,
    evidence: Evidence,
    provider: str = "naver",
    image_url: str | None = None,
    thumbnail_url: str | None = None,
    source_candidate_type: str = "search_result_image",
) -> dict[str, Any] | None:
    """Download a candidate image and build its collection payload.

    ``image_url`` / ``thumbnail_url`` default to the values on the evidence.
    The full image is tried first, then the thumbnail.

    Returns:
        The candidate payload, or ``None`` when neither URL could be stored.
    """
    image_url = _normalized_remote_image_url(
        str(image_url if image_url is not None else evidence.data.get("image_url", ""))
    )
    thumbnail_url = _normalized_remote_image_url(
        str(thumbnail_url if thumbnail_url is not None else evidence.data.get("thumbnail_url", ""))
    )
    result_url = str(evidence.data.get("result_url", ""))
    candidate_id = _stable_id("cand", provider, source_candidate_type, query, image_url, thumbnail_url, result_url)
    image_record = None
    stored_image_url = ""
    for candidate_url in _unique_texts([image_url, thumbnail_url]):
        image_record = self._store_candidate_image(
            candidate_id,
            candidate_url,
            referer_url=result_url,
        )
        if image_record is not None:
            stored_image_url = candidate_url
            break
    if image_record is None:
        return None
    # Show the URL that was actually stored (may be the thumbnail).
    display_image_url = stored_image_url or image_url
    return {
        "id": candidate_id,
        "provider": provider,
        "query": query,
        "title": _strip_html(str(evidence.data.get("title", ""))),
        "status": "candidate",
        "rank": evidence.data.get("rank", ""),
        "imageUrl": display_image_url,
        "thumbnailUrl": thumbnail_url,
        "resultUrl": result_url,
        "sourceUrl": result_url or display_image_url,
        "sourceCandidateType": source_candidate_type,
        "imageAsset": image_record["asset"],
        "sampleFingerprints": [image_record["perceptualFingerprint"]],
        "imageFacts": image_record["facts"],
        "collectedAt": _now_label(),
        "collectedEpoch": int(datetime.now().timestamp()),
        "promotedKnowledgeId": "",
    }


def _store_candidate_image(
    self,
    candidate_id: str,
    url: str,
    referer_url: str = "",
) -> dict[str, Any] | None:
    """Return an image record for ``url``, downloading it when not cached.

    A file already stored under the candidate's name is reused when it still
    yields a usable record; otherwise the image is fetched, and it is written
    to disk only after a record could be built from it.

    Returns:
        The image record, or ``None`` for an empty URL, a failed download or
        content that yields no usable record.

    Raises:
        ValueError: If the target would be written outside the collection
            image directory.
    """
    if not url:
        return None
    suffix = _image_suffix_from_url(url)
    target_name = f"{candidate_id}{suffix}"
    root = self.collection_image_dir.resolve()
    target = (root / target_name).resolve()
    # The resolved target must sit strictly inside the image directory; this
    # also rejects a target that resolves to the directory itself.
    if root not in target.parents:
        raise ValueError("candidate image path points outside image store")
    if target.exists() and target.is_file():
        try:
            record = self._candidate_image_record_from_content(
                target_name,
                url,
                suffix,
                target.read_bytes(),
            )
        except Exception:
            # An unreadable cached file falls through to a fresh download.
            record = None
        if record is not None:
            return record
    try:
        content = self._fetch_candidate_image_content(url, referer_url)
    except Exception:
        return None
    image_record = self._candidate_image_record_from_content(
        target_name,
        url,
        suffix,
        content,
    )
    if image_record is None:
        return None
    self.collection_image_dir.mkdir(parents=True, exist_ok=True)
    target.write_bytes(content)
    return image_record


def _candidate_image_record_from_content(
    self,
    target_name: str,
    url: str,
    suffix: str,
    content: bytes,
) -> dict[str, Any] | None:
    """Build the stored-image record for already downloaded ``content``.

    Returns:
        ``None`` when the content is empty or its perceptual fingerprint is
        unavailable; otherwise the public asset path, the fingerprint and the
        display facts.
    """
    if not content:
        return None
    width, height = _image_size_from_bytes(content)
    fingerprints = FingerprintService().fingerprints_for(content)
    if fingerprints.perceptual.startswith("phash:unavailable:"):
        return None
    return {
        "asset": f"{self.collection_public_prefix}/{target_name}",
        "perceptualFingerprint": fingerprints.perceptual,
        "facts": {
            "source": url,
            "format": suffix.lstrip(".").upper(),
            "size": f"{width} x {height}",
            "fingerprints": 1,
        },
    }


def _fetch_candidate_image_content(self, url: str, referer_url: str = "") -> bytes:
    """Download ``url``, preferring the injected custom fetcher when set.

    The custom fetcher receives only the URL; ``referer_url`` is passed to
    the default fetcher alone.
    """
    if self._custom_candidate_image_fetcher is not None:
        return self._custom_candidate_image_fetcher(url)
    return _fetch_url_bytes(url, referer_url=referer_url)


def _rescore_submission(self, submission_id: str) -> None:
    """Recompute and persist one submission's risk score, band and reasons."""
    submission = self._get("submissions", submission_id)
    evidence = [
        _domain_evidence_from_ui(item)
        for item in self._evidence_for_submission(submission_id)
    ]
    score = RiskScorer().score(evidence)
    submission["riskScore"] = score.score
    submission["riskBand"] = score.band
    # Fall back to a placeholder reason when the scorer returns none.
    submission["reasons"] = score.reasons or ["분석 근거 없음"]
    self._put("submissions", submission_id, submission)


def _rescore_all_submissions(self, queue_id: str | None = None) -> None:
    """Rescore every submission, optionally limited to one queue."""
    for submission in self._all("submissions", queue_id=queue_id):
        self._rescore_submission(str(submission["id"]))