refactor: extract search-result similarity and candidate storage into mixin

Move the search-result image similarity, candidate-image storage, in-memory
knowledge repository, and rescoring methods into StoreSearchCandidatesMixin;
CopyrighterStore inherits it. Drop now-unused imports. sqlite_store.py 3072 ->
2358 lines (5333 -> 2358 overall, -56%). Behavior-preserving.
This commit is contained in:
유창욱 2026-06-20 21:58:10 +09:00
parent 40501e13f1
commit 8e0a8c307d
2 changed files with 745 additions and 718 deletions

View file

@ -1,10 +1,8 @@
from __future__ import annotations
import base64
import hashlib
import json
import os
import re
import shutil
import sqlite3
import threading
@ -52,15 +50,8 @@ from rights_filter.server.store_remote_fetch import (
_fetch_stylesheet_url_bytes,
_fetch_url_bytes,
)
from rights_filter.server.store_page_scrape import (
_content_has_comparable_image_fingerprint,
_extract_css_image_urls,
_extract_page_image_urls,
_extract_page_stylesheet_urls,
_normalized_remote_image_url,
_search_result_direct_image_urls,
)
from rights_filter.server.store_persistence import StorePersistenceMixin
from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin
from rights_filter.server.store_schema import (
_ensure_constrained_schema,
_ensure_queue_schema,
@ -81,11 +72,7 @@ from rights_filter.server.store_serialization import (
_google_custom_image_query_signature,
_google_custom_web_query_signature,
_google_weak_label_title,
_image_size_from_bytes,
_image_suffix_from_url,
_is_google_weak_label_payload,
_knowledge_entry_type,
_knowledge_provenance,
_knowledge_type_value,
_naver_blog_query_signature,
_naver_query_signature,
@ -94,28 +81,16 @@ from rights_filter.server.store_serialization import (
_provider_item_failed,
_provider_item_has_result,
_query_history_status,
_safe_filename,
_safe_image_suffix,
_stable_id,
_strip_html,
_submission_payload,
_submission_search_hint_evidence,
_timestamp_id,
_validate_payload,
_validate_table,
_watchlist_source_evidence,
)
from rights_filter.server.store_text import _text_list, _unique_texts
from rights_filter.server.store_url_utils import (
_decoded_nested_url,
_is_http_url,
_url_has_image_format_hint,
_url_looks_like_image,
_url_path_has_image_suffix,
)
class CopyrighterStore(StorePersistenceMixin):
class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin):
def __init__(
self,
db_path: Path | str,
@ -2379,694 +2354,3 @@ class CopyrighterStore(StorePersistenceMixin):
f"auto text query batch: {', '.join(item['query'] for item in history_entries)}",
)
def _knowledge_repository(self) -> InMemoryRightsFilterRepository:
    """Load the usable stored knowledge entries into a fresh in-memory repository.

    A stored entry is skipped when its ``active`` flag is falsy, when its
    ``entryStatus`` equals ``"excluded"``, or when it has no sample
    fingerprints (``sampleFingerprints``, or ``sample_fingerprints`` when
    that key is absent).
    """
    repo = InMemoryRightsFilterRepository()
    for row in self._all("knowledge_entries"):
        is_active = row.get("active", True)
        if not is_active or row.get("entryStatus") == "excluded":
            continue
        fingerprints = _text_list(
            row.get("sampleFingerprints", row.get("sample_fingerprints", []))
        )
        if not fingerprints:
            continue
        entry = KnowledgeBaseEntry(
            id=str(row.get("id", "")),
            entry_type=_knowledge_entry_type(str(row.get("type", "other"))),
            name=str(row.get("name", "")),
            provenance=_knowledge_provenance(str(row.get("provenance", "manual"))),
            aliases=_text_list(row.get("aliases")),
            related_keywords=_text_list(row.get("keywords")),
            policy_memo=str(row.get("memo", "")),
            sample_fingerprints=fingerprints,
            # An empty source decision is stored as None rather than "".
            source_decision_id=str(row.get("sourceDecision", "")) or None,
            entry_status=str(row.get("entryStatus", "confirmed")),
            source_submission_id=str(row.get("sourceSubmissionId", "")),
            active=bool(is_active),
        )
        repo.save_knowledge_entry(entry)
    return repo
def _sync_similar_reference_images(
    self,
    submission_id: str,
    evidence: list[Evidence],
) -> None:
    """Append reference images of matched knowledge entries to a submission.

    Only FINGERPRINT evidence carrying a ``knowledge_entry_id`` is
    considered. Each matched entry's ``imageAsset`` is added to the
    submission's ``similar`` list unless that asset is already listed;
    entries that cannot be loaded (``KeyError``) are ignored. Nothing is
    read or written when no evidence item qualifies.
    """
    entry_ids: list[str] = []
    for item in evidence:
        if item.source != EvidenceSource.FINGERPRINT:
            continue
        if item.data.get("knowledge_entry_id"):
            entry_ids.append(str(item.data.get("knowledge_entry_id", "")))
    if not entry_ids:
        return

    submission = self._get("submissions", submission_id)
    similar = list(submission.get("similar", []))
    known_assets = {str(existing.get("asset", "")) for existing in similar}
    for entry_id in entry_ids:
        try:
            entry = self._get("knowledge_entries", entry_id)
        except KeyError:
            continue
        asset = str(entry.get("imageAsset", ""))
        if asset and asset not in known_assets:
            label = f"{entry.get('name', entry_id)} / internal match"
            similar.append({"asset": asset, "label": label})
            known_assets.add(asset)
    submission["similar"] = similar
    self._put("submissions", submission_id, submission)
def _sync_search_result_image_similarity(
    self,
    submission_id: str,
    evidence: list[Evidence],
    image_store: LocalSubmissionImageStore,
    status: str = "active",
    max_matches: int | None = None,
) -> list[Evidence]:
    """Compare search-result images with the submission and store the matches.

    Returns the similarity evidence that was persisted (each stored under
    the ``evidence`` table with the given ``status``). The number of stored
    matches is capped by ``provider_runtime.search_result_compare_limit``
    and, when given, by ``max_matches``. The submission is rescored only if
    at least one match was stored. Returns an empty list when the
    submission has no usable perceptual fingerprint or the cap is <= 0.
    """
    fingerprint = self._submission_perceptual_fingerprint(submission_id, image_store)
    if fingerprint is None:
        return []

    runtime_limit = self.provider_runtime.search_result_compare_limit
    budget = runtime_limit if max_matches is None else min(max_matches, runtime_limit)
    if budget <= 0:
        return []

    stored: list[Evidence] = []
    for source_item in evidence:
        if len(stored) >= budget:
            break
        found = self._search_result_image_similarity_evidence(
            submission_id,
            fingerprint,
            source_item,
        )
        if not found:
            continue
        for match in found:
            if len(stored) >= budget:
                break
            record = _evidence_payload(submission_id, match)
            record["status"] = status
            self._put("evidence", record["id"], record)
            stored.append(match)

    if stored:
        self._rescore_submission(submission_id)
    return stored
def _can_compare_search_result_images(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> bool:
if image_store is None:
return False
return self._submission_perceptual_fingerprint(submission_id, image_store) is not None
def _search_result_similarity_count(self, submission_id: str) -> int:
return sum(
1
for item in self._evidence_by_submission().get(submission_id, [])
if item.get("source") == "fingerprint"
and str(item.get("matchType") or "").startswith("search_result")
)
def _search_result_similarity_remaining_budget(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> int:
if not self._can_compare_search_result_images(submission_id, image_store):
return 0
return max(
0,
self.provider_runtime.search_result_compare_limit
- self._search_result_similarity_count(submission_id),
)
def _submission_perceptual_fingerprint(
    self,
    submission_id: str,
    image_store: LocalSubmissionImageStore,
) -> str | None:
    """Return the submission image's perceptual fingerprint, or None if unusable.

    None is returned when loading or fingerprinting the image raises, or
    when the fingerprint starts with the ``phash:unavailable:`` marker.
    """
    try:
        service = FingerprintService()
        content = image_store.image_payload(submission_id).content
        perceptual = service.fingerprints_for(content).perceptual
    except Exception:
        # Best effort: any failure simply means "no comparable fingerprint".
        return None
    return None if perceptual.startswith("phash:unavailable:") else perceptual
def _search_result_image_similarity_evidence(
    self,
    submission_id: str,
    submission_fingerprint: str,
    source_evidence: Evidence,
) -> list[Evidence]:
    """Return at most one similarity match for a single search-result evidence item.

    Only NAVER_SEARCH and WEB_DETECTION evidence without a truthy
    ``weak_hint`` is considered. Candidate images are tried in this order,
    stopping at the first one that produces a match:

    1. the item's ``image_url`` / ``thumbnail_url``,
    2. URLs from ``_search_result_direct_image_urls``,
    3. the item's ``page_image_urls``,
    4. candidates from ``_search_result_page_image_candidates``.

    Returns a one-element list on a match, otherwise an empty list.
    """
    comparable_sources = {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}
    if source_evidence.source not in comparable_sources:
        return []
    if source_evidence.data.get("weak_hint"):
        return []

    def candidate_stream():
        # Lazy on purpose: a later stage is only evaluated after every
        # candidate of the earlier stages failed to match.
        direct_fields = [
            str(source_evidence.data.get("image_url", "")),
            str(source_evidence.data.get("thumbnail_url", "")),
        ]
        for url in _unique_texts(direct_fields):
            yield url, "search_result_image", "result_image_url"
        for url in _search_result_direct_image_urls(source_evidence):
            yield url, "search_result_page_image", "result_page_direct_image"
        for url in _unique_texts(source_evidence.data.get("page_image_urls", [])):
            yield url, "search_result_page_image", "provider_page_image"
        for url, origin in self._search_result_page_image_candidates(source_evidence):
            yield url, "search_result_page_image", origin

    for url, kind, origin in candidate_stream():
        match = self._search_result_candidate_image_evidence(
            submission_id,
            submission_fingerprint,
            source_evidence,
            url,
            match_type=kind,
            candidate_source=origin,
        )
        if match is not None:
            return [match]
    return []
def _face_crop_search_result_similarity_evidence(
    self,
    submission_id: str,
    crop_index: int,
    crop: Any,
    source_evidence: Evidence,
) -> list[Evidence]:
    """Return at most one similarity match between a face crop and a search result.

    The crop's perceptual fingerprint is computed from ``crop.content``; an
    empty list is returned when that fails or the fingerprint is marked
    ``phash:unavailable:``. Candidate images are tried in the same order as
    for whole-image comparison (result image/thumbnail, direct image URLs,
    provider page images, scraped page candidates), stopping at the first
    match. Every match is tagged as a weak face-crop hint via extra data.
    """
    try:
        crop_fingerprint = FingerprintService().fingerprints_for(crop.content).perceptual
    except Exception:
        return []
    if crop_fingerprint.startswith("phash:unavailable:"):
        return []

    extra_data = {
        "face_crop_search": True,
        "crop_index": crop_index,
        "weak_hint": True,
        "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.",
    }

    def candidate_stream():
        # Lazy on purpose: a later stage is only evaluated after every
        # candidate of the earlier stages failed to match.
        direct_fields = [
            str(source_evidence.data.get("image_url", "")),
            str(source_evidence.data.get("thumbnail_url", "")),
        ]
        for url in _unique_texts(direct_fields):
            yield url, "face_crop_search_result_image", "face_crop_result_image_url"
        for url in _search_result_direct_image_urls(source_evidence):
            yield url, "face_crop_search_result_page_image", "face_crop_result_page_direct_image"
        for url in _unique_texts(source_evidence.data.get("page_image_urls", [])):
            yield url, "face_crop_search_result_page_image", "face_crop_provider_page_image"
        for url, origin in self._search_result_page_image_candidates(source_evidence):
            yield url, "face_crop_search_result_page_image", f"face_crop_{origin}"

    for url, kind, origin in candidate_stream():
        match = self._search_result_candidate_image_evidence(
            submission_id,
            crop_fingerprint,
            source_evidence,
            url,
            match_type=kind,
            candidate_source=origin,
            extra_data=extra_data,
        )
        if match is not None:
            return [match]
    return []
def _search_result_candidate_image_evidence(
    self,
    submission_id: str,
    submission_fingerprint: str,
    source_evidence: Evidence,
    image_url: str,
    match_type: str,
    candidate_source: str,
    extra_data: dict[str, Any] | None = None,
) -> Evidence | None:
    """Store one candidate image and return FINGERPRINT evidence if it is similar enough.

    Returns None when the image cannot be stored/fingerprinted or when its
    similarity to ``submission_fingerprint`` is below
    ``provider_runtime.search_result_similarity_threshold``. Keys in
    ``extra_data`` are merged last and therefore override the built-in
    evidence data keys.
    """
    source_data = source_evidence.data
    remote_url = _normalized_remote_image_url(image_url)
    # Fall back to the image URL itself when the result has no page URL.
    result_url = str(
        source_data.get("result_url", source_data.get("url", "")) or remote_url
    )
    image_id = _stable_id(
        "searchimg",
        submission_id,
        str(source_evidence.source),
        match_type,
        remote_url,
        str(source_data.get("query", "")),
    )
    record = self._store_candidate_image(image_id, remote_url, referer_url=result_url)
    if not record:
        return None

    score = FingerprintService().similarity(
        submission_fingerprint,
        str(record["perceptualFingerprint"]),
    )
    if score < self.provider_runtime.search_result_similarity_threshold:
        return None

    details = {
        "submission_id": submission_id,
        "provider": source_data.get("provider", ""),
        "query": source_data.get("query", ""),
        "query_signature": source_data.get("query_signature", ""),
        "query_strategy": source_data.get("query_strategy", ""),
        "query_source": source_data.get("query_source", ""),
        "url": result_url,
        "result_url": result_url,
        "image_url": record["asset"],
        "thumbnail_url": record["asset"],
        "remote_image_url": remote_url,
        "source_page_url": result_url,
        "image_candidate_source": candidate_source,
        "page_title": source_data.get("page_title", source_data.get("title", "")),
        "match": match_type,
        "similarity": score,
        "source_evidence_ids": [_evidence_id(submission_id, source_evidence)],
        "contributed": True,
    }
    if extra_data:
        details.update(extra_data)
    return Evidence(
        source=EvidenceSource.FINGERPRINT,
        reason=f"Search result image similarity {score:.2f}",
        confidence=score,
        data=details,
    )
def _search_result_page_image_candidates(self, source_evidence: Evidence) -> list[tuple[str, str]]:
    """Fetch the result page and return ``(image_url, candidate_source)`` pairs.

    Returns an empty list when there is no HTTP page URL, the configured
    ``search_result_page_image_limit`` (default 3) is <= 0, the URL itself
    looks like an image, or fetching the page raises. If the fetched
    content is itself a comparable image, the page URL is returned as a
    single ``result_page_direct_image`` candidate. Otherwise up to
    ``limit`` image URLs extracted from the page (topped up from its
    stylesheets) are returned as ``html_page_image`` candidates.
    """
    data = source_evidence.data
    page_url = str(data.get("result_url", data.get("url", "")))
    limit = getattr(self.provider_runtime, "search_result_page_image_limit", 3)
    unusable = not page_url or limit <= 0 or not _is_http_url(page_url)
    if unusable or _url_looks_like_image(page_url):
        return []
    try:
        content = self.page_fetcher(page_url)
    except Exception:
        return []
    if _content_has_comparable_image_fingerprint(content):
        return [(page_url, "result_page_direct_image")]

    found = _extract_page_image_urls(content, page_url, limit)
    missing = limit - len(found)
    if missing > 0:
        found.extend(self._search_result_stylesheet_image_urls(content, page_url, missing))
    return [(url, "html_page_image") for url in _unique_texts(found)[:limit]]
def _search_result_stylesheet_image_urls(
    self,
    page_content: bytes,
    page_url: str,
    limit: int,
) -> list[str]:
    """Collect up to ``limit`` image URLs referenced by the page's stylesheets.

    Stylesheets whose fetch raises are skipped. Collection stops as soon
    as ``limit`` URLs have been gathered.
    """
    if limit <= 0:
        return []
    collected: list[str] = []
    for sheet_url in _extract_page_stylesheet_urls(page_content, page_url, limit):
        try:
            css = self.stylesheet_fetcher(sheet_url)
        except Exception:
            continue
        still_needed = limit - len(collected)
        for url in _extract_css_image_urls(css, sheet_url, still_needed):
            collected.append(url)
            if len(collected) >= limit:
                return collected
    return collected
def _search_result_page_image_urls(self, source_evidence: Evidence) -> list[str]:
return [
image_url
for image_url, _candidate_source in self._search_result_page_image_candidates(source_evidence)
]
def _increment_knowledge_contribution_counts(
    self,
    submission_id: str,
    evidence: list[Evidence],
) -> None:
    """Record that this submission matched watchlist knowledge entries.

    For every distinct watchlist entry referenced by FINGERPRINT evidence,
    the submission id is appended to ``matchedSubmissionIds`` and
    ``contributionCount`` is incremented. An entry is left untouched when
    it cannot be loaded, is no longer a watchlist entry, originated from
    this same submission, or already lists this submission.
    """
    watchlist_ids = _unique_texts(
        str(item.data.get("knowledge_entry_id", ""))
        for item in evidence
        if item.source == EvidenceSource.FINGERPRINT
        and item.data.get("knowledge_entry_status") == "watchlist"
        and item.data.get("knowledge_entry_id")
    )
    for entry_id in watchlist_ids:
        try:
            entry = self._get("knowledge_entries", entry_id)
        except KeyError:
            continue
        still_watchlist = entry.get("entryStatus") == "watchlist"
        if not still_watchlist or str(entry.get("sourceSubmissionId", "")) == submission_id:
            continue
        seen_submissions = _text_list(entry.get("matchedSubmissionIds"))
        if submission_id in seen_submissions:
            # Already counted once for this submission.
            continue
        seen_submissions.append(submission_id)
        previous_count = int(entry.get("contributionCount", 0) or 0)
        entry["matchedSubmissionIds"] = seen_submissions
        entry["contributionCount"] = previous_count + 1
        entry["lastMatchedSubmissionId"] = submission_id
        entry["lastMatchedAt"] = _now_label()
        self._put("knowledge_entries", entry_id, entry)
def _store_manual_knowledge_image(
self,
entry_id: str,
image_payload: Any,
) -> dict[str, Any] | None:
if not image_payload:
return None
if not isinstance(image_payload, dict):
raise ValueError("knowledge image must be an object")
data = str(image_payload.get("data", ""))
if not data:
raise ValueError("knowledge image data required")
if "," in data and data.split(",", 1)[0].startswith("data:"):
data = data.split(",", 1)[1]
try:
content = base64.b64decode(data, validate=True)
except Exception as exc:
raise ValueError("knowledge image data must be base64") from exc
if not content:
raise ValueError("knowledge image is empty")
filename = str(image_payload.get("filename", "reference")).strip() or "reference"
suffix = _safe_image_suffix(filename, str(image_payload.get("content_type", "")))
safe_stem = _safe_filename(Path(filename).stem) or "reference"
target_name = f"{entry_id}-{safe_stem}{suffix}"
self.knowledge_image_dir.mkdir(parents=True, exist_ok=True)
root = self.knowledge_image_dir.resolve()
target = (root / target_name).resolve()
if target != root and root not in target.parents:
raise ValueError("knowledge image path points outside image store")
target.write_bytes(content)
width, height = _image_size_from_bytes(content)
fingerprints = FingerprintService().fingerprints_for(content)
return {
"asset": f"{self.knowledge_public_prefix}/{target_name}",
"perceptualFingerprint": fingerprints.perceptual,
"facts": {
"filename": filename,
"format": suffix.lstrip(".").upper(),
"size": f"{width} x {height}",
"fingerprints": 1,
},
}
def _collection_candidates_from_evidence(
    self,
    query: str,
    evidence: list[Evidence],
    provider: str,
) -> list[dict[str, Any]]:
    """Turn search evidence into collection candidate payloads.

    Only NAVER_SEARCH and WEB_DETECTION evidence is used. For each item the
    first stage that yields at least one candidate wins:

    1. the item's own ``image_url`` (one candidate),
    2. every usable URL in ``page_image_urls``,
    3. every usable URL from ``_search_result_direct_image_urls``,
    4. the first usable candidate scraped from the result page.
    """
    searchable_sources = {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}

    def build(item: Evidence, url: str, kind: str) -> dict[str, Any] | None:
        # Candidate for an explicit image URL, used as its own thumbnail.
        return self._candidate_payload_from_evidence(
            query,
            item,
            provider,
            image_url=url,
            thumbnail_url=url,
            source_candidate_type=kind,
        )

    collected: list[dict[str, Any]] = []
    for item in evidence:
        if item.source not in searchable_sources:
            continue

        if item.data.get("image_url"):
            primary = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                source_candidate_type="search_result_image",
            )
            if primary is not None:
                collected.append(primary)
                continue

        provider_hits = []
        for url in _unique_texts(item.data.get("page_image_urls", [])):
            built = build(item, url, "provider_page_image")
            if built is not None:
                provider_hits.append(built)
        if provider_hits:
            collected.extend(provider_hits)
            continue

        direct_hits = []
        for url in _search_result_direct_image_urls(item):
            built = build(item, url, "result_page_direct_image")
            if built is not None:
                direct_hits.append(built)
        if direct_hits:
            collected.extend(direct_hits)
            continue

        # Last resort: scrape the result page and keep only the first usable image.
        for url, kind in self._search_result_page_image_candidates(item):
            built = build(item, url, kind)
            if built is not None:
                collected.append(built)
                break
    return collected
def _candidate_payload_from_evidence(
    self,
    query: str,
    evidence: Evidence,
    provider: str = "naver",
    image_url: str | None = None,
    thumbnail_url: str | None = None,
    source_candidate_type: str = "search_result_image",
) -> dict[str, Any] | None:
    """Build a collection candidate payload for one evidence item.

    ``image_url`` / ``thumbnail_url`` default to the evidence's own values
    when not given. The image URL is tried first, then the thumbnail; the
    first one that can be stored provides the asset and fingerprint.
    Returns None when neither can be stored.
    """
    data = evidence.data
    image_url = _normalized_remote_image_url(
        str(data.get("image_url", "") if image_url is None else image_url)
    )
    thumbnail_url = _normalized_remote_image_url(
        str(data.get("thumbnail_url", "") if thumbnail_url is None else thumbnail_url)
    )
    result_url = str(data.get("result_url", ""))
    candidate_id = _stable_id(
        "cand",
        provider,
        source_candidate_type,
        query,
        image_url,
        thumbnail_url,
        result_url,
    )

    stored_url = ""
    for url in _unique_texts([image_url, thumbnail_url]):
        record = self._store_candidate_image(candidate_id, url, referer_url=result_url)
        if record is not None:
            stored_url = url
            break
    else:
        # Nothing could be downloaded and fingerprinted.
        return None

    shown_url = stored_url or image_url
    return {
        "id": candidate_id,
        "provider": provider,
        "query": query,
        "title": _strip_html(str(data.get("title", ""))),
        "status": "candidate",
        "rank": data.get("rank", ""),
        "imageUrl": shown_url,
        "thumbnailUrl": thumbnail_url,
        "resultUrl": result_url,
        "sourceUrl": result_url or shown_url,
        "sourceCandidateType": source_candidate_type,
        "imageAsset": record["asset"],
        "sampleFingerprints": [record["perceptualFingerprint"]],
        "imageFacts": record["facts"],
        "collectedAt": _now_label(),
        "collectedEpoch": int(datetime.now().timestamp()),
        "promotedKnowledgeId": "",
    }
def _store_candidate_image(
    self,
    candidate_id: str,
    url: str,
    referer_url: str = "",
) -> dict[str, Any] | None:
    """Return an image record for ``url``, downloading and saving it if needed.

    A file already present at the target path is reused when it still
    yields a usable record. Otherwise the image is fetched; None is
    returned when ``url`` is empty, the fetch raises, or the content has no
    usable fingerprint. The file is only written after a record was built.

    Raises:
        ValueError: if the target path resolves outside the collection
            image directory.
    """
    if not url:
        return None
    suffix = _image_suffix_from_url(url)
    file_name = f"{candidate_id}{suffix}"
    store_root = self.collection_image_dir.resolve()
    destination = (store_root / file_name).resolve()
    inside_store = destination == store_root or store_root in destination.parents
    if not inside_store:
        raise ValueError("candidate image path points outside image store")

    if destination.exists() and destination.is_file():
        try:
            cached = self._candidate_image_record_from_content(
                file_name,
                url,
                suffix,
                destination.read_bytes(),
            )
        except Exception:
            # Unreadable or corrupt cache file: fall through to a fresh fetch.
            cached = None
        if cached is not None:
            return cached

    try:
        content = self._fetch_candidate_image_content(url, referer_url)
    except Exception:
        return None
    fresh = self._candidate_image_record_from_content(file_name, url, suffix, content)
    if fresh is not None:
        self.collection_image_dir.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(content)
    return fresh
def _candidate_image_record_from_content(
    self,
    target_name: str,
    url: str,
    suffix: str,
    content: bytes,
) -> dict[str, Any] | None:
    """Describe candidate image bytes, or return None if they are unusable.

    None is returned for empty content and when the perceptual fingerprint
    starts with the ``phash:unavailable:`` marker.
    """
    if not content:
        return None
    dimensions = _image_size_from_bytes(content)
    width, height = dimensions
    perceptual = FingerprintService().fingerprints_for(content).perceptual
    if perceptual.startswith("phash:unavailable:"):
        return None
    facts = {
        "source": url,
        "format": suffix.lstrip(".").upper(),
        "size": f"{width} x {height}",
        "fingerprints": 1,
    }
    return {
        "asset": f"{self.collection_public_prefix}/{target_name}",
        "perceptualFingerprint": perceptual,
        "facts": facts,
    }
def _fetch_candidate_image_content(self, url: str, referer_url: str = "") -> bytes:
if self._custom_candidate_image_fetcher is not None:
return self._custom_candidate_image_fetcher(url)
return _fetch_url_bytes(url, referer_url=referer_url)
def _rescore_submission(self, submission_id: str) -> None:
    """Recompute and persist a submission's risk score from its stored evidence.

    Writes ``riskScore``, ``riskBand`` and ``reasons`` back to the
    submission; a placeholder reason is used when the scorer returns none.
    """
    submission = self._get("submissions", submission_id)
    stored_rows = self._evidence_for_submission(submission_id)
    domain_evidence = [_domain_evidence_from_ui(row) for row in stored_rows]
    result = RiskScorer().score(domain_evidence)
    submission["riskScore"] = result.score
    submission["riskBand"] = result.band
    submission["reasons"] = result.reasons if result.reasons else ["분석 근거 없음"]
    self._put("submissions", submission_id, submission)
def _rescore_all_submissions(self, queue_id: str | None = None) -> None:
for submission in self._all("submissions", queue_id=queue_id):
self._rescore_submission(str(submission["id"]))

View file

@ -0,0 +1,743 @@
"""Search-result image similarity, candidate-image storage, the in-memory
knowledge repository, and rescoring as a mixin for CopyrighterStore.
Mixed into CopyrighterStore; relies on persistence methods (self._put/_get/...),
self.* attributes, and the extracted helper modules. Behavior unchanged.
"""
from __future__ import annotations
import base64
import re
from datetime import datetime
from pathlib import Path
from typing import Any
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.risk_scoring import RiskScorer
from rights_filter.domain.records import (
Evidence,
EvidenceSource,
InMemoryRightsFilterRepository,
KnowledgeBaseEntry,
)
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_page_scrape import (
_content_has_comparable_image_fingerprint,
_extract_css_image_urls,
_extract_page_image_urls,
_extract_page_stylesheet_urls,
_normalized_remote_image_url,
_search_result_direct_image_urls,
)
from rights_filter.server.store_remote_fetch import _fetch_url_bytes
from rights_filter.server.store_serialization import (
_domain_evidence_from_ui,
_evidence_id,
_evidence_payload,
_image_size_from_bytes,
_image_suffix_from_url,
_knowledge_entry_type,
_knowledge_provenance,
_now_label,
_safe_filename,
_safe_image_suffix,
_stable_id,
_strip_html,
)
from rights_filter.server.store_text import _text_list, _unique_texts
from rights_filter.server.store_url_utils import _is_http_url, _url_looks_like_image
class StoreSearchCandidatesMixin:
def _knowledge_repository(self) -> InMemoryRightsFilterRepository:
    """Load the usable stored knowledge entries into a fresh in-memory repository.

    A stored entry is skipped when its ``active`` flag is falsy, when its
    ``entryStatus`` equals ``"excluded"``, or when it has no sample
    fingerprints (``sampleFingerprints``, or ``sample_fingerprints`` when
    that key is absent).
    """
    repo = InMemoryRightsFilterRepository()
    for row in self._all("knowledge_entries"):
        is_active = row.get("active", True)
        if not is_active or row.get("entryStatus") == "excluded":
            continue
        fingerprints = _text_list(
            row.get("sampleFingerprints", row.get("sample_fingerprints", []))
        )
        if not fingerprints:
            continue
        entry = KnowledgeBaseEntry(
            id=str(row.get("id", "")),
            entry_type=_knowledge_entry_type(str(row.get("type", "other"))),
            name=str(row.get("name", "")),
            provenance=_knowledge_provenance(str(row.get("provenance", "manual"))),
            aliases=_text_list(row.get("aliases")),
            related_keywords=_text_list(row.get("keywords")),
            policy_memo=str(row.get("memo", "")),
            sample_fingerprints=fingerprints,
            # An empty source decision is stored as None rather than "".
            source_decision_id=str(row.get("sourceDecision", "")) or None,
            entry_status=str(row.get("entryStatus", "confirmed")),
            source_submission_id=str(row.get("sourceSubmissionId", "")),
            active=bool(is_active),
        )
        repo.save_knowledge_entry(entry)
    return repo
def _sync_similar_reference_images(
    self,
    submission_id: str,
    evidence: list[Evidence],
) -> None:
    """Append reference images of matched knowledge entries to a submission.

    Only FINGERPRINT evidence carrying a ``knowledge_entry_id`` is
    considered. Each matched entry's ``imageAsset`` is added to the
    submission's ``similar`` list unless that asset is already listed;
    entries that cannot be loaded (``KeyError``) are ignored. Nothing is
    read or written when no evidence item qualifies.
    """
    entry_ids: list[str] = []
    for item in evidence:
        if item.source != EvidenceSource.FINGERPRINT:
            continue
        if item.data.get("knowledge_entry_id"):
            entry_ids.append(str(item.data.get("knowledge_entry_id", "")))
    if not entry_ids:
        return

    submission = self._get("submissions", submission_id)
    similar = list(submission.get("similar", []))
    known_assets = {str(existing.get("asset", "")) for existing in similar}
    for entry_id in entry_ids:
        try:
            entry = self._get("knowledge_entries", entry_id)
        except KeyError:
            continue
        asset = str(entry.get("imageAsset", ""))
        if asset and asset not in known_assets:
            label = f"{entry.get('name', entry_id)} / internal match"
            similar.append({"asset": asset, "label": label})
            known_assets.add(asset)
    submission["similar"] = similar
    self._put("submissions", submission_id, submission)
def _sync_search_result_image_similarity(
    self,
    submission_id: str,
    evidence: list[Evidence],
    image_store: LocalSubmissionImageStore,
    status: str = "active",
    max_matches: int | None = None,
) -> list[Evidence]:
    """Compare search-result images with the submission and store the matches.

    Returns the similarity evidence that was persisted (each stored under
    the ``evidence`` table with the given ``status``). The number of stored
    matches is capped by ``provider_runtime.search_result_compare_limit``
    and, when given, by ``max_matches``. The submission is rescored only if
    at least one match was stored. Returns an empty list when the
    submission has no usable perceptual fingerprint or the cap is <= 0.
    """
    fingerprint = self._submission_perceptual_fingerprint(submission_id, image_store)
    if fingerprint is None:
        return []

    runtime_limit = self.provider_runtime.search_result_compare_limit
    budget = runtime_limit if max_matches is None else min(max_matches, runtime_limit)
    if budget <= 0:
        return []

    stored: list[Evidence] = []
    for source_item in evidence:
        if len(stored) >= budget:
            break
        found = self._search_result_image_similarity_evidence(
            submission_id,
            fingerprint,
            source_item,
        )
        if not found:
            continue
        for match in found:
            if len(stored) >= budget:
                break
            record = _evidence_payload(submission_id, match)
            record["status"] = status
            self._put("evidence", record["id"], record)
            stored.append(match)

    if stored:
        self._rescore_submission(submission_id)
    return stored
def _can_compare_search_result_images(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> bool:
if image_store is None:
return False
return self._submission_perceptual_fingerprint(submission_id, image_store) is not None
def _search_result_similarity_count(self, submission_id: str) -> int:
return sum(
1
for item in self._evidence_by_submission().get(submission_id, [])
if item.get("source") == "fingerprint"
and str(item.get("matchType") or "").startswith("search_result")
)
def _search_result_similarity_remaining_budget(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> int:
if not self._can_compare_search_result_images(submission_id, image_store):
return 0
return max(
0,
self.provider_runtime.search_result_compare_limit
- self._search_result_similarity_count(submission_id),
)
def _submission_perceptual_fingerprint(
    self,
    submission_id: str,
    image_store: LocalSubmissionImageStore,
) -> str | None:
    """Return the submission image's perceptual fingerprint, or None if unusable.

    None is returned when loading or fingerprinting the image raises, or
    when the fingerprint starts with the ``phash:unavailable:`` marker.
    """
    try:
        service = FingerprintService()
        content = image_store.image_payload(submission_id).content
        perceptual = service.fingerprints_for(content).perceptual
    except Exception:
        # Best effort: any failure simply means "no comparable fingerprint".
        return None
    return None if perceptual.startswith("phash:unavailable:") else perceptual
def _search_result_image_similarity_evidence(
    self,
    submission_id: str,
    submission_fingerprint: str,
    source_evidence: Evidence,
) -> list[Evidence]:
    """Return at most one similarity match for a single search-result evidence item.

    Only NAVER_SEARCH and WEB_DETECTION evidence without a truthy
    ``weak_hint`` is considered. Candidate images are tried in this order,
    stopping at the first one that produces a match:

    1. the item's ``image_url`` / ``thumbnail_url``,
    2. URLs from ``_search_result_direct_image_urls``,
    3. the item's ``page_image_urls``,
    4. candidates from ``_search_result_page_image_candidates``.

    Returns a one-element list on a match, otherwise an empty list.
    """
    comparable_sources = {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}
    if source_evidence.source not in comparable_sources:
        return []
    if source_evidence.data.get("weak_hint"):
        return []

    def candidate_stream():
        # Lazy on purpose: a later stage is only evaluated after every
        # candidate of the earlier stages failed to match.
        direct_fields = [
            str(source_evidence.data.get("image_url", "")),
            str(source_evidence.data.get("thumbnail_url", "")),
        ]
        for url in _unique_texts(direct_fields):
            yield url, "search_result_image", "result_image_url"
        for url in _search_result_direct_image_urls(source_evidence):
            yield url, "search_result_page_image", "result_page_direct_image"
        for url in _unique_texts(source_evidence.data.get("page_image_urls", [])):
            yield url, "search_result_page_image", "provider_page_image"
        for url, origin in self._search_result_page_image_candidates(source_evidence):
            yield url, "search_result_page_image", origin

    for url, kind, origin in candidate_stream():
        match = self._search_result_candidate_image_evidence(
            submission_id,
            submission_fingerprint,
            source_evidence,
            url,
            match_type=kind,
            candidate_source=origin,
        )
        if match is not None:
            return [match]
    return []
def _face_crop_search_result_similarity_evidence(
    self,
    submission_id: str,
    crop_index: int,
    crop: Any,
    source_evidence: Evidence,
) -> list[Evidence]:
    """Return at most one similarity match between a face crop and a search result.

    The crop's perceptual fingerprint is computed from ``crop.content``; an
    empty list is returned when that fails or the fingerprint is marked
    ``phash:unavailable:``. Candidate images are tried in the same order as
    for whole-image comparison (result image/thumbnail, direct image URLs,
    provider page images, scraped page candidates), stopping at the first
    match. Every match is tagged as a weak face-crop hint via extra data.
    """
    try:
        crop_fingerprint = FingerprintService().fingerprints_for(crop.content).perceptual
    except Exception:
        return []
    if crop_fingerprint.startswith("phash:unavailable:"):
        return []

    extra_data = {
        "face_crop_search": True,
        "crop_index": crop_index,
        "weak_hint": True,
        "privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.",
    }

    def candidate_stream():
        # Lazy on purpose: a later stage is only evaluated after every
        # candidate of the earlier stages failed to match.
        direct_fields = [
            str(source_evidence.data.get("image_url", "")),
            str(source_evidence.data.get("thumbnail_url", "")),
        ]
        for url in _unique_texts(direct_fields):
            yield url, "face_crop_search_result_image", "face_crop_result_image_url"
        for url in _search_result_direct_image_urls(source_evidence):
            yield url, "face_crop_search_result_page_image", "face_crop_result_page_direct_image"
        for url in _unique_texts(source_evidence.data.get("page_image_urls", [])):
            yield url, "face_crop_search_result_page_image", "face_crop_provider_page_image"
        for url, origin in self._search_result_page_image_candidates(source_evidence):
            yield url, "face_crop_search_result_page_image", f"face_crop_{origin}"

    for url, kind, origin in candidate_stream():
        match = self._search_result_candidate_image_evidence(
            submission_id,
            crop_fingerprint,
            source_evidence,
            url,
            match_type=kind,
            candidate_source=origin,
            extra_data=extra_data,
        )
        if match is not None:
            return [match]
    return []
def _search_result_candidate_image_evidence(
    self,
    submission_id: str,
    submission_fingerprint: str,
    source_evidence: Evidence,
    image_url: str,
    match_type: str,
    candidate_source: str,
    extra_data: dict[str, Any] | None = None,
) -> Evidence | None:
    """Store one candidate image and compare it with a submission fingerprint.

    Args:
        submission_id: Submission being analysed.
        submission_fingerprint: Perceptual fingerprint to compare against.
        source_evidence: Search-result evidence the candidate image came from.
        image_url: Remote URL of the candidate image (normalized before use).
        match_type: Value stored under the ``match`` key of the result.
        candidate_source: Value stored under ``image_candidate_source``.
        extra_data: Optional entries merged last into the evidence data, so
            they override the default keys on collision.

    Returns:
        FINGERPRINT evidence when the stored image reaches
        ``provider_runtime.search_result_similarity_threshold``; ``None`` when
        the image could not be stored or the similarity is below the threshold.
    """
    image_url = _normalized_remote_image_url(image_url)
    source_data = source_evidence.data
    # Fall back to the image URL itself when the evidence names no page URL.
    result_url = str(
        source_data.get("result_url", source_data.get("url", "")) or image_url
    )
    image_id = _stable_id(
        "searchimg",
        submission_id,
        str(source_evidence.source),
        match_type,
        image_url,
        str(source_data.get("query", "")),
    )
    stored = self._store_candidate_image(image_id, image_url, referer_url=result_url)
    if not stored:
        return None

    similarity = FingerprintService().similarity(
        submission_fingerprint,
        str(stored["perceptualFingerprint"]),
    )
    if similarity < self.provider_runtime.search_result_similarity_threshold:
        return None

    payload = {
        "submission_id": submission_id,
        "provider": source_data.get("provider", ""),
        "query": source_data.get("query", ""),
        "query_signature": source_data.get("query_signature", ""),
        "query_strategy": source_data.get("query_strategy", ""),
        "query_source": source_data.get("query_source", ""),
        "url": result_url,
        "result_url": result_url,
        # Both image fields point at the locally stored asset; the original
        # remote location is kept separately under "remote_image_url".
        "image_url": stored["asset"],
        "thumbnail_url": stored["asset"],
        "remote_image_url": image_url,
        "source_page_url": result_url,
        "image_candidate_source": candidate_source,
        "page_title": source_data.get("page_title", source_data.get("title", "")),
        "match": match_type,
        "similarity": similarity,
        "source_evidence_ids": [_evidence_id(submission_id, source_evidence)],
        "contributed": True,
    }
    if extra_data:
        payload.update(extra_data)
    return Evidence(
        source=EvidenceSource.FINGERPRINT,
        reason=f"Search result image similarity {similarity:.2f}",
        confidence=similarity,
        data=payload,
    )
def _search_result_page_image_candidates(self, source_evidence: Evidence) -> list[tuple[str, str]]:
page_url = str(
source_evidence.data.get("result_url", source_evidence.data.get("url", ""))
)
limit = getattr(self.provider_runtime, "search_result_page_image_limit", 3)
if not page_url or limit <= 0 or not _is_http_url(page_url):
return []
if _url_looks_like_image(page_url):
return []
try:
content = self.page_fetcher(page_url)
except Exception:
return []
if _content_has_comparable_image_fingerprint(content):
return [(page_url, "result_page_direct_image")]
image_urls = _extract_page_image_urls(content, page_url, limit)
if len(image_urls) < limit:
image_urls.extend(
self._search_result_stylesheet_image_urls(
content,
page_url,
limit - len(image_urls),
)
)
return [(image_url, "html_page_image") for image_url in _unique_texts(image_urls)[:limit]]
def _search_result_stylesheet_image_urls(
self,
page_content: bytes,
page_url: str,
limit: int,
) -> list[str]:
if limit <= 0:
return []
image_urls: list[str] = []
for stylesheet_url in _extract_page_stylesheet_urls(page_content, page_url, limit):
try:
stylesheet_content = self.stylesheet_fetcher(stylesheet_url)
except Exception:
continue
for image_url in _extract_css_image_urls(stylesheet_content, stylesheet_url, limit - len(image_urls)):
image_urls.append(image_url)
if len(image_urls) >= limit:
return image_urls
return image_urls
def _search_result_page_image_urls(self, source_evidence: Evidence) -> list[str]:
return [
image_url
for image_url, _candidate_source in self._search_result_page_image_candidates(source_evidence)
]
def _increment_knowledge_contribution_counts(
    self,
    submission_id: str,
    evidence: list[Evidence],
) -> None:
    """Credit watchlist knowledge entries that matched this submission.

    Only FINGERPRINT evidence whose data marks a ``watchlist`` knowledge entry
    is considered. Each referenced entry is counted at most once per
    submission; entries that are missing, no longer on the watchlist, or that
    originate from this very submission are left untouched.
    """
    watchlist_entry_ids = (
        str(item.data.get("knowledge_entry_id", ""))
        for item in evidence
        if item.source == EvidenceSource.FINGERPRINT
        and item.data.get("knowledge_entry_status") == "watchlist"
        and item.data.get("knowledge_entry_id")
    )
    for entry_id in _unique_texts(watchlist_entry_ids):
        try:
            entry = self._get("knowledge_entries", entry_id)
        except KeyError:
            continue
        is_watchlist = entry.get("entryStatus") == "watchlist"
        if not is_watchlist or str(entry.get("sourceSubmissionId", "")) == submission_id:
            continue
        credited = _text_list(entry.get("matchedSubmissionIds"))
        if submission_id in credited:
            # Already counted for this submission; keep the update idempotent.
            continue
        credited.append(submission_id)
        previous_count = int(entry.get("contributionCount", 0) or 0)
        entry["matchedSubmissionIds"] = credited
        entry["contributionCount"] = previous_count + 1
        entry["lastMatchedSubmissionId"] = submission_id
        entry["lastMatchedAt"] = _now_label()
        self._put("knowledge_entries", entry_id, entry)
def _store_manual_knowledge_image(
self,
entry_id: str,
image_payload: Any,
) -> dict[str, Any] | None:
if not image_payload:
return None
if not isinstance(image_payload, dict):
raise ValueError("knowledge image must be an object")
data = str(image_payload.get("data", ""))
if not data:
raise ValueError("knowledge image data required")
if "," in data and data.split(",", 1)[0].startswith("data:"):
data = data.split(",", 1)[1]
try:
content = base64.b64decode(data, validate=True)
except Exception as exc:
raise ValueError("knowledge image data must be base64") from exc
if not content:
raise ValueError("knowledge image is empty")
filename = str(image_payload.get("filename", "reference")).strip() or "reference"
suffix = _safe_image_suffix(filename, str(image_payload.get("content_type", "")))
safe_stem = _safe_filename(Path(filename).stem) or "reference"
target_name = f"{entry_id}-{safe_stem}{suffix}"
self.knowledge_image_dir.mkdir(parents=True, exist_ok=True)
root = self.knowledge_image_dir.resolve()
target = (root / target_name).resolve()
if target != root and root not in target.parents:
raise ValueError("knowledge image path points outside image store")
target.write_bytes(content)
width, height = _image_size_from_bytes(content)
fingerprints = FingerprintService().fingerprints_for(content)
return {
"asset": f"{self.knowledge_public_prefix}/{target_name}",
"perceptualFingerprint": fingerprints.perceptual,
"facts": {
"filename": filename,
"format": suffix.lstrip(".").upper(),
"size": f"{width} x {height}",
"fingerprints": 1,
},
}
def _collection_candidates_from_evidence(
    self,
    query: str,
    evidence: list[Evidence],
    provider: str,
) -> list[dict[str, Any]]:
    """Turn search evidence into stored collection-candidate payloads.

    Only NAVER_SEARCH and WEB_DETECTION evidence is considered. For each item
    the image sources are tried in a fixed order, and a later source is only
    consulted when the earlier ones produced no candidate for that item:

    1. the item's own ``image_url`` (``search_result_image``);
    2. every URL in the item's ``page_image_urls`` (``provider_page_image``);
    3. URLs from ``_search_result_direct_image_urls``
       (``result_page_direct_image``);
    4. candidates from ``_search_result_page_image_candidates``, stopping at
       the first one that can be stored.

    Args:
        query: Search query the evidence was collected for.
        evidence: Evidence items to convert.
        provider: Provider label passed through to each candidate payload.

    Returns:
        Candidate payloads in the order they were produced.
    """
    candidates: list[dict[str, Any]] = []
    for item in evidence:
        if item.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}:
            continue
        if item.data.get("image_url"):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                source_candidate_type="search_result_image",
            )
            if candidate is not None:
                candidates.append(candidate)
                # The result's own image was stored; no fallback needed.
                continue
        # Remember the size so the fallbacks below can tell whether they
        # added anything for this particular item.
        candidate_count = len(candidates)
        for image_url in _unique_texts(item.data.get("page_image_urls", [])):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                image_url=image_url,
                thumbnail_url=image_url,
                source_candidate_type="provider_page_image",
            )
            if candidate is not None:
                candidates.append(candidate)
        if len(candidates) > candidate_count:
            continue
        for image_url in _search_result_direct_image_urls(item):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                image_url=image_url,
                thumbnail_url=image_url,
                source_candidate_type="result_page_direct_image",
            )
            if candidate is not None:
                candidates.append(candidate)
        if len(candidates) > candidate_count:
            continue
        # Last resort: candidates discovered from the result page itself.
        for image_url, source_candidate_type in self._search_result_page_image_candidates(item):
            candidate = self._candidate_payload_from_evidence(
                query,
                item,
                provider,
                image_url=image_url,
                thumbnail_url=image_url,
                source_candidate_type=source_candidate_type,
            )
            if candidate is not None:
                candidates.append(candidate)
                # One page-derived candidate per item is enough.
                break
    return candidates
def _candidate_payload_from_evidence(
    self,
    query: str,
    evidence: Evidence,
    provider: str = "naver",
    image_url: str | None = None,
    thumbnail_url: str | None = None,
    source_candidate_type: str = "search_result_image",
) -> dict[str, Any] | None:
    """Store the image behind one evidence item and build its candidate payload.

    Args:
        query: Search query the evidence was collected for.
        evidence: Search evidence supplying title, rank and URLs.
        provider: Provider label stored in the payload.
        image_url: Overrides the evidence's ``image_url`` when not ``None``.
        thumbnail_url: Overrides the evidence's ``thumbnail_url`` when not
            ``None``.
        source_candidate_type: Label describing where the image URL came from.

    Returns:
        The candidate payload, or ``None`` when neither the image URL nor the
        thumbnail URL could be stored.
    """
    if image_url is None:
        image_url = evidence.data.get("image_url", "")
    image_url = _normalized_remote_image_url(str(image_url))
    if thumbnail_url is None:
        thumbnail_url = evidence.data.get("thumbnail_url", "")
    thumbnail_url = _normalized_remote_image_url(str(thumbnail_url))

    result_url = str(evidence.data.get("result_url", ""))
    candidate_id = _stable_id(
        "cand",
        provider,
        source_candidate_type,
        query,
        image_url,
        thumbnail_url,
        result_url,
    )

    # Try the full image first and fall back to the thumbnail; the else
    # branch runs only when no URL could be stored at all.
    for stored_image_url in _unique_texts([image_url, thumbnail_url]):
        image_record = self._store_candidate_image(
            candidate_id,
            stored_image_url,
            referer_url=result_url,
        )
        if image_record is not None:
            break
    else:
        return None

    display_image_url = stored_image_url or image_url
    return {
        "id": candidate_id,
        "provider": provider,
        "query": query,
        "title": _strip_html(str(evidence.data.get("title", ""))),
        "status": "candidate",
        "rank": evidence.data.get("rank", ""),
        "imageUrl": display_image_url,
        "thumbnailUrl": thumbnail_url,
        "resultUrl": result_url,
        "sourceUrl": result_url or display_image_url,
        "sourceCandidateType": source_candidate_type,
        "imageAsset": image_record["asset"],
        "sampleFingerprints": [image_record["perceptualFingerprint"]],
        "imageFacts": image_record["facts"],
        "collectedAt": _now_label(),
        "collectedEpoch": int(datetime.now().timestamp()),
        "promotedKnowledgeId": "",
    }
def _store_candidate_image(
self,
candidate_id: str,
url: str,
referer_url: str = "",
) -> dict[str, Any] | None:
if not url:
return None
suffix = _image_suffix_from_url(url)
target_name = f"{candidate_id}{suffix}"
root = self.collection_image_dir.resolve()
target = (root / target_name).resolve()
if target != root and root not in target.parents:
raise ValueError("candidate image path points outside image store")
if target.exists() and target.is_file():
try:
record = self._candidate_image_record_from_content(
target_name,
url,
suffix,
target.read_bytes(),
)
except Exception:
record = None
if record is not None:
return record
try:
content = self._fetch_candidate_image_content(url, referer_url)
except Exception:
return None
image_record = self._candidate_image_record_from_content(
target_name,
url,
suffix,
content,
)
if image_record is None:
return None
self.collection_image_dir.mkdir(parents=True, exist_ok=True)
target.write_bytes(content)
return image_record
def _candidate_image_record_from_content(
self,
target_name: str,
url: str,
suffix: str,
content: bytes,
) -> dict[str, Any] | None:
if not content:
return None
width, height = _image_size_from_bytes(content)
fingerprints = FingerprintService().fingerprints_for(content)
if fingerprints.perceptual.startswith("phash:unavailable:"):
return None
return {
"asset": f"{self.collection_public_prefix}/{target_name}",
"perceptualFingerprint": fingerprints.perceptual,
"facts": {
"source": url,
"format": suffix.lstrip(".").upper(),
"size": f"{width} x {height}",
"fingerprints": 1,
},
}
def _fetch_candidate_image_content(self, url: str, referer_url: str = "") -> bytes:
if self._custom_candidate_image_fetcher is not None:
return self._custom_candidate_image_fetcher(url)
return _fetch_url_bytes(url, referer_url=referer_url)
def _rescore_submission(self, submission_id: str) -> None:
    """Recompute and persist the risk score of one submission.

    The submission's stored evidence is converted to domain evidence, scored,
    and the resulting score, band and reasons are written back.
    """
    record = self._get("submissions", submission_id)
    stored_evidence = self._evidence_for_submission(submission_id)
    domain_evidence = [_domain_evidence_from_ui(raw) for raw in stored_evidence]
    result = RiskScorer().score(domain_evidence)
    record["riskScore"] = result.score
    record["riskBand"] = result.band
    # Fallback reason shown when scoring yields none (Korean runtime string:
    # "no analysis evidence").
    record["reasons"] = result.reasons or ["분석 근거 없음"]
    self._put("submissions", submission_id, record)
def _rescore_all_submissions(self, queue_id: str | None = None) -> None:
for submission in self._all("submissions", queue_id=queue_id):
self._rescore_submission(str(submission["id"]))