refactor: extract operator operations into StoreOperationsMixin

Move knowledge-entry lifecycle, rerun/auto/manual search drivers, LLM summary
management, manual knowledge registration, and keyword-candidate collection/
promotion into a mixin; CopyrighterStore inherits it. Drop now-unused imports;
point the rollback test at store_serialization._stable_id. sqlite_store.py shrinks
from 1598 to 874 lines in this commit (5333 -> 874 overall, -84%).
This commit is contained in:
유창욱 2026-06-20 22:28:30 +09:00
parent 3bc07d94c3
commit b575d2ee06
3 changed files with 771 additions and 734 deletions

View file

@ -16,7 +16,6 @@ from rights_filter.analysis.face_person_detection import HeuristicFacePersonDete
from rights_filter.analysis.fingerprints import FingerprintService from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.internal_analyzer import InternalAnalyzer from rights_filter.analysis.internal_analyzer import InternalAnalyzer
from rights_filter.analysis.risk_scoring import RiskScorer from rights_filter.analysis.risk_scoring import RiskScorer
from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import ( from rights_filter.domain.records import (
Evidence, Evidence,
EvidenceSource, EvidenceSource,
@ -30,7 +29,7 @@ from rights_filter.integrations.cloud_vision_web_detection import (
from rights_filter.integrations.env_clients import ProviderRuntime, build_provider_runtime from rights_filter.integrations.env_clients import ProviderRuntime, build_provider_runtime
from rights_filter.integrations.external_policy import ExternalApiPolicy from rights_filter.integrations.external_policy import ExternalApiPolicy
from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage
from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_constants import ( from rights_filter.server.store_constants import (
DEFAULT_COVERAGE_GOOD_THRESHOLD, DEFAULT_COVERAGE_GOOD_THRESHOLD,
DEFAULT_COVERAGE_WARN_THRESHOLD, DEFAULT_COVERAGE_WARN_THRESHOLD,
@ -49,6 +48,7 @@ from rights_filter.server.store_remote_fetch import (
_fetch_url_bytes, _fetch_url_bytes,
) )
from rights_filter.server.store_enrichment import StoreEnrichmentMixin from rights_filter.server.store_enrichment import StoreEnrichmentMixin
from rights_filter.server.store_operations import StoreOperationsMixin
from rights_filter.server.store_persistence import StorePersistenceMixin from rights_filter.server.store_persistence import StorePersistenceMixin
from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin
from rights_filter.server.store_schema import ( from rights_filter.server.store_schema import (
@ -59,24 +59,23 @@ from rights_filter.server.store_schema import (
) )
from rights_filter.server.store_serialization import ( from rights_filter.server.store_serialization import (
_default_evidence_contribution, _default_evidence_contribution,
_domain_evidence_from_ui,
_evidence_id, _evidence_id,
_evidence_matches_provider, _evidence_matches_provider,
_evidence_payload, _evidence_payload,
_knowledge_type_value,
_now_label, _now_label,
_provider_item_failed, _provider_item_failed,
_provider_item_has_result, _provider_item_has_result,
_stable_id,
_submission_payload, _submission_payload,
_submission_search_hint_evidence, _submission_search_hint_evidence,
_timestamp_id,
_watchlist_source_evidence,
) )
from rights_filter.server.store_text import _text_list, _unique_texts
class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin, StoreEnrichmentMixin): class CopyrighterStore(
StorePersistenceMixin,
StoreSearchCandidatesMixin,
StoreEnrichmentMixin,
StoreOperationsMixin,
):
def __init__( def __init__(
self, self,
db_path: Path | str, db_path: Path | str,
@ -774,730 +773,6 @@ class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin, StoreE
) )
return self.review(submission_id) return self.review(submission_id)
def promote_watchlist_entry(self, entry_id: str) -> dict[str, Any]:
    """Promote a watchlist knowledge entry to a confirmed, active entry.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If the entry's ``entryStatus`` is not ``"watchlist"``.
    """
    record = self._get("knowledge_entries", entry_id)
    status = record.get("entryStatus")
    if status != "watchlist":
        raise ValueError("knowledge entry is not a watchlist candidate")

    # Flip the entry to confirmed and clear any earlier exclusion reason.
    record.update(
        {
            "entryStatus": "confirmed",
            "active": True,
            "excludedReason": "",
            "confirmedAt": _now_label(),
            "confirmedBy": "rights.ops",
        }
    )
    self._put("knowledge_entries", entry_id, record)

    audit_target = str(record.get("name", entry_id))
    self.add_audit_event(
        "rights.ops",
        "Watchlist candidate promoted",
        audit_target,
        "promoted into confirmed reference DB",
    )
    return self.bootstrap()
def exclude_watchlist_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]:
    """Mark a watchlist or confirmed knowledge entry as excluded and inactive.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        reason: Operator-supplied reason; a default text is stored when it
            is blank.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If ``entryStatus`` is neither ``"watchlist"`` nor
            ``"confirmed"``.
    """
    record = self._get("knowledge_entries", entry_id)
    if record.get("entryStatus") not in {"watchlist", "confirmed"}:
        raise ValueError("knowledge entry cannot be excluded")

    exclusion_reason = reason.strip() or "오탐 또는 무관 후보"
    record.update(
        {
            "entryStatus": "excluded",
            "active": False,
            "excludedReason": exclusion_reason,
            "excludedAt": _now_label(),
            "excludedBy": "rights.ops",
        }
    )
    self._put("knowledge_entries", entry_id, record)

    # The audit detail is the same reason text that was stored on the entry.
    self.add_audit_event(
        "rights.ops",
        "Watchlist candidate excluded",
        str(record.get("name", entry_id)),
        exclusion_reason,
    )
    return self.bootstrap()
def update_knowledge_entry(self, entry_id: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Update the editable text fields of a knowledge entry.

    Only ``aliases``, ``keywords`` and ``memo`` are considered, and only when
    the key is present in ``payload``. The audit event records the previous
    and new values of the changed fields as JSON.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        payload: Partial update; keys other than the three above are ignored.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If ``payload`` contains none of the editable keys.
    """
    entry = self._get("knowledge_entries", entry_id)
    updates: dict[str, Any] = {}
    for list_field in ("aliases", "keywords"):
        if list_field in payload:
            updates[list_field] = _text_list(payload.get(list_field))
    if "memo" in payload:
        raw_memo = payload.get("memo")
        # A null memo clears the field; str(None) would store the text "None".
        updates["memo"] = "" if raw_memo is None else str(raw_memo).strip()
    if not updates:
        raise ValueError("aliases, keywords, memo 중 수정할 값이 필요합니다")

    # Capture the old values before they are overwritten, for the audit trail.
    before = {key: entry.get(key) for key in updates}
    entry.update(updates)
    self._put("knowledge_entries", entry_id, entry)
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry updated",
        str(entry.get("name", entry_id)),
        f"{json.dumps(before, ensure_ascii=False)} -> {json.dumps(updates, ensure_ascii=False)}",
    )
    return self.bootstrap()
def deactivate_knowledge_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]:
    """Deactivate a confirmed, currently active knowledge entry.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        reason: Optional operator note; stored stripped. The audit event uses
            a default text when it is blank.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If the entry is not confirmed (a missing ``entryStatus``
            counts as confirmed) or is already inactive.
    """
    record = self._get("knowledge_entries", entry_id)
    status = record.get("entryStatus", "confirmed")
    if status != "confirmed":
        raise ValueError("확정 DB 항목만 비활성화할 수 있습니다")
    is_active = record.get("active", False)
    if not is_active:
        raise ValueError("이미 비활성 상태입니다")

    record.update(
        {
            "active": False,
            "deactivatedAt": _now_label(),
            "deactivatedBy": "rights.ops",
            "deactivatedReason": reason.strip(),
        }
    )
    self._put("knowledge_entries", entry_id, record)

    audit_detail = record["deactivatedReason"] or "운영자 비활성화"
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry deactivated",
        str(record.get("name", entry_id)),
        audit_detail,
    )
    return self.bootstrap()
def reactivate_knowledge_entry(self, entry_id: str, reason: str) -> dict[str, Any]:
    """Reactivate a confirmed knowledge entry that is currently inactive.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        reason: Mandatory operator note; must be non-blank after stripping.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If ``reason`` is blank, the entry is not confirmed (a
            missing ``entryStatus`` counts as confirmed), or the entry is
            already active.
    """
    note = reason.strip()
    if not note:
        raise ValueError("재활성에는 사유 메모가 필요합니다")

    record = self._get("knowledge_entries", entry_id)
    status = record.get("entryStatus", "confirmed")
    if status != "confirmed":
        raise ValueError("확정 DB 항목만 재활성화할 수 있습니다")
    if record.get("active", False):
        raise ValueError("이미 활성 상태입니다")

    record.update(
        {
            "active": True,
            "reactivatedAt": _now_label(),
            "reactivatedBy": "rights.ops",
            "reactivatedReason": note,
        }
    )
    self._put("knowledge_entries", entry_id, record)
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry reactivated",
        str(record.get("name", entry_id)),
        note,
    )
    return self.bootstrap()
def _create_or_update_watchlist_entry(
    self,
    submission_id: str,
    decision: str,
    memo: str,
    image_store: LocalSubmissionImageStore | None,
    *,
    conn: sqlite3.Connection | None = None,
) -> None:
    """Upsert the automatic watchlist knowledge entry for a decided submission.

    The entry id is derived deterministically from ``submission_id``, so a
    repeated decision overwrites the same row. Fields already on an existing
    row are kept unless this method sets them again.

    Args:
        submission_id: Submission the decision was made on.
        decision: Decision status; ``"held"`` selects the hold default memo,
            anything else the reject default memo.
        memo: Operator memo; the default memo is used when it is blank.
        image_store: Source for the sample fingerprint, or ``None``.
        conn: Optional connection forwarded to the ``_get``/``_put`` calls.
    """
    submission = self._get("submissions", submission_id, conn=conn)
    source_evidence = _watchlist_source_evidence(
        self._evidence_by_submission().get(submission_id, [])
    )
    evidence_ids = [
        str(item.get("id", "")) for item in source_evidence if item.get("id")
    ]
    fresh_fingerprints = self._watchlist_fingerprints(submission_id, image_store)

    entry_id = _stable_id("kb-watchlist", submission_id)
    try:
        previous = self._get("knowledge_entries", entry_id, conn=conn)
    except KeyError:
        previous = {}

    # Keywords: up to three submission reasons plus up to three evidence titles.
    reason_texts = [str(reason) for reason in submission.get("reasons", [])[:3]]
    title_texts = [str(item.get("title", "")) for item in source_evidence[:3]]
    keywords = _unique_texts(reason_texts + title_texts)

    entry_name = submission.get("derivedPreview", {}).get("entryName")
    if not entry_name:
        entry_name = f"{submission.get('title', submission_id)} / {submission_id}"

    if decision == "held":
        default_memo = "보류 판정으로 자동 생성"
    else:
        default_memo = "반려 판정으로 자동 생성"

    entry = dict(previous)
    entry.update(
        {
            "id": entry_id,
            "name": entry_name,
            "type": "rejected_image",
            "aliases": _unique_texts([submission_id, str(submission.get("title", ""))]),
            "keywords": keywords,
            "memo": memo.strip() or default_memo,
            "provenance": "automatic",
            "active": True,
            "entryStatus": "watchlist",
            "originDecisionStatus": decision,
            "sourceDecision": f"DEC-{submission_id}",
            "sourceSubmissionId": submission_id,
            "sourceEvidenceIds": evidence_ids,
            # Keep previously stored fingerprints when none could be computed now.
            "sampleFingerprints": fresh_fingerprints
            or _text_list(previous.get("sampleFingerprints")),
            "imageAsset": str(submission.get("asset", "")),
            "imageFacts": submission.get("fileFacts", {}),
            "contributionCount": int(previous.get("contributionCount", 0) or 0),
            "matchedSubmissionIds": _text_list(previous.get("matchedSubmissionIds")),
            "lastOriginDecisionAt": _now_label(),
        }
    )
    self._put("knowledge_entries", entry_id, entry, conn=conn)
def _watchlist_fingerprints(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> list[str]:
if image_store is None:
return []
try:
fingerprints = FingerprintService().fingerprints_for(
image_store.image_payload(submission_id).content
)
except Exception:
return []
return [fingerprints.perceptual]
def rerun_enrichment(
    self,
    submission_id: str,
    image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
    """Re-run enrichment for one submission and store a before/after diff.

    Writes a "rerun requested" marker evidence row, re-runs the analysis and
    search steps (only when ``image_store`` is given), refreshes the LLM
    summary, rescores, and finally stores ``lastRerunDiff`` on the submission
    with the score change and the added/removed evidence ids.

    Args:
        submission_id: Submission to re-analyse.
        image_store: Image source; when ``None`` the internal analysis and
            the provider searches are skipped.

    Returns:
        Whatever ``self.review(submission_id)`` returns.
    """
    submission = self._get("submissions", submission_id)
    # Snapshot score and evidence before anything is re-run, for the diff below.
    score_before = int(submission.get("riskScore", 0) or 0)
    evidence_before = {
        str(item.get("id", "")): item
        for item in self._evidence_by_submission().get(submission_id, [])
    }
    submission["lastAnalysis"] = _now_label()
    self._put("submissions", submission_id, submission)
    # Marker evidence row recording that a rerun was requested; its id carries
    # a timestamp, so every rerun adds a new marker.
    evidence = {
        "id": f"ev-{submission_id}-rerun-{_timestamp_id()}",
        "group": "internal",
        "source": "fingerprint",
        "title": "재분석 요청이 접수됨",
        "confidence": 0,
        "query": "",
        "domain": "internal",
        "url": "",
        "retrievedAt": _now_label(),
        "contributed": False,
        "sourceEvidenceIds": [],
        "status": "queued",
    }
    self._put("evidence", evidence["id"], {**evidence, "submission_id": submission_id})
    if image_store is not None:
        self._rerun_internal_analysis(submission_id, image_store)
        google_evidence = self._rerun_google_image_search(submission_id, image_store)
        # The text searches are seeded from the fresh image-search evidence
        # plus the submission's own search hints.
        query_source_evidence = [
            *google_evidence,
            *_submission_search_hint_evidence(submission),
        ]
        self._auto_naver_search(submission_id, query_source_evidence, image_store)
        self._auto_google_custom_search(submission_id, query_source_evidence, image_store)
    self._ensure_llm_summary(submission_id)
    self.add_audit_event("rights.ops", "Analysis run created", submission_id, "operator rerun")
    self._rescore_submission(submission_id)
    self._sync_submission_provider_state()
    evidence_after = {
        str(item.get("id", "")): item
        for item in self._evidence_for_submission(submission_id)
    }
    rerun_marker_prefix = f"ev-{submission_id}-rerun-"
    # The LLM summary is deleted and regenerated on every rerun, so its id
    # always changes (the timestamped marker id ends up in the summary's
    # source_evidence_ids). Including it in the diff would report an
    # added+removed pair every time even when nothing changed, so it is
    # excluded from the diff.
    added_ids = [
        evidence_id
        for evidence_id in evidence_after
        if evidence_id not in evidence_before
        and not evidence_id.startswith(rerun_marker_prefix)
        and str(evidence_after[evidence_id].get("source", "")) != "llm"
    ]
    removed_items = [
        evidence_before[evidence_id]
        for evidence_id in evidence_before
        if evidence_id not in evidence_after
        and str(evidence_before[evidence_id].get("source", "")) != "llm"
    ]
    # Re-read the submission: rescoring above may have changed the stored row.
    refreshed = self._get("submissions", submission_id)
    refreshed["lastRerunDiff"] = {
        "at": _now_label(),
        "scoreBefore": score_before,
        "scoreAfter": int(refreshed.get("riskScore", 0) or 0),
        "addedEvidenceIds": added_ids,
        "removedEvidenceIds": [str(item.get("id", "")) for item in removed_items],
        "removedSummaries": [
            {"source": str(item.get("source", "")), "reason": str(item.get("title", ""))}
            for item in removed_items
        ],
    }
    self._put("submissions", submission_id, refreshed)
    return self.review(submission_id)
def run_auto_search(
    self,
    submission_id: str,
    image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
    """Run the automatic text searches for a submission on operator request.

    Stored evidence from selected sources, plus the submission's search
    hints, seeds the Naver and Google custom searches. Afterwards the LLM
    summary is refreshed and the submission is rescored.

    Args:
        submission_id: Submission to search for.
        image_store: Forwarded unchanged to both search drivers.

    Returns:
        Whatever ``self.review(submission_id)`` returns.
    """
    record = self._get("submissions", submission_id)
    record["lastAnalysis"] = _now_label()
    self._put("submissions", submission_id, record)

    seed_sources = {"google", "naver", "face", "fingerprint", "llm", "failure"}
    stored_evidence = self._evidence_by_submission().get(submission_id, [])
    seed_evidence = []
    for item in stored_evidence:
        if item.get("source") in seed_sources:
            seed_evidence.append(_domain_evidence_from_ui(item))
    seed_evidence.extend(_submission_search_hint_evidence(record))

    self._auto_naver_search(submission_id, seed_evidence, image_store)
    self._auto_google_custom_search(submission_id, seed_evidence, image_store)
    self._ensure_llm_summary(submission_id)
    self._rescore_submission(submission_id)
    self._sync_submission_provider_state()

    audit_target = f"auto-search / {submission_id}"
    self.add_audit_event(
        "rights.ops",
        "Provider called",
        audit_target,
        "operator request for auto text search",
    )
    return self.review(submission_id)
def manual_search(
    self,
    submission_id: str,
    provider: str,
    query: str,
    image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
    """Run an operator-entered text query against one search provider.

    Every returned evidence item is stored with status ``"manual"``, the
    query is put at the front of the submission's ``queryHistory``, and the
    provider's success/failure state and usage are updated.

    Args:
        submission_id: Submission the query is run for.
        provider: Provider key looked up in the ``providers`` table.
        query: Free-text query.
        image_store: Image source; rescoring runs only when it is given.

    Returns:
        Whatever ``self.review(submission_id)`` returns.

    Raises:
        ValueError: If the provider is disabled, or ``_manual_search_evidence``
            rejects the provider.
    """
    submission = self._get("submissions", submission_id)
    provider_payload = self._get("providers", provider)
    if not provider_payload["enabled"]:
        raise ValueError(f"{provider} provider disabled")

    domain_evidence, provider_call_count = self._manual_search_evidence(
        submission_id,
        provider,
        query,
        image_store,
    )
    for item in domain_evidence:
        evidence = _evidence_payload(submission_id, item)
        evidence["status"] = "manual"
        self._put("evidence", evidence["id"], evidence)

    # Newest query first.
    submission.setdefault("queryHistory", []).insert(
        0,
        {
            "provider": provider,
            "query": query,
            "status": "manual",
            "timestamp": _now_label(),
            "count": len(domain_evidence),
        },
    )
    self._put("submissions", submission_id, submission)

    # Report the reason of the item that actually failed. The first evidence
    # item may be a successful result when only a later fallback search
    # failed, so indexing [0] could record an unrelated reason.
    failure_sources = {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
    first_failure = next(
        (item for item in domain_evidence if item.source in failure_sources),
        None,
    )
    if first_failure is not None:
        provider_payload["lastFailure"] = first_failure.reason or f"{provider} search failed"
    else:
        provider_payload["lastSuccess"] = _now_label()
        provider_payload["lastFailure"] = "없음"
    self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)

    self._ensure_llm_summary(submission_id)
    if image_store is not None:
        self._rescore_submission(submission_id)
    self._sync_submission_provider_state()
    self.add_audit_event(
        "rights.ops",
        "Provider called",
        f"{provider} / {submission_id}",
        f"manual text query: {query}",
    )
    return self.review(submission_id)
def _ensure_llm_summaries_for_existing_source_evidence(self, queue_id: str | None = None) -> None:
for submission in self._all("submissions", queue_id=queue_id):
self._ensure_llm_summary(str(submission["id"]), only_if_missing=True)
def _ensure_llm_summary(self, submission_id: str, *, only_if_missing: bool = False) -> bool:
if self.provider_runtime.llm_assistant is None:
return False
llm_provider = self._get("providers", "llm")
if not llm_provider.get("enabled"):
return False
evidence_payloads = self._evidence_by_submission().get(submission_id, [])
if only_if_missing and any(
_evidence_matches_provider(item, "llm") and _provider_item_has_result(item)
for item in evidence_payloads
):
return False
source_evidence = [
_domain_evidence_from_ui(item)
for item in evidence_payloads
if item.get("source") in {"fingerprint", "face", "google", "naver"}
]
if not source_evidence:
return False
llm_evidence = self.provider_runtime.llm_assistant.summarize(
submission_id,
source_evidence,
)
self._delete_llm_summary_evidence(submission_id)
self._put(
"evidence",
_evidence_id(submission_id, llm_evidence),
_evidence_payload(submission_id, llm_evidence),
)
if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE:
llm_provider["lastFailure"] = llm_evidence.reason
else:
llm_provider["lastSuccess"] = _now_label()
llm_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("llm", 1, llm_provider)
return True
def _delete_llm_summary_evidence(self, submission_id: str) -> None:
    """Delete a submission's stored LLM summary evidence rows.

    Removes rows whose ``source`` is ``'llm'`` and also ``'failure'`` rows
    whose payload title starts with ``LLM assistance failed``.

    Args:
        submission_id: Submission whose LLM evidence is removed.
    """
    with self._connect() as conn:
        # submission_id is bound as a parameter, never interpolated.
        conn.execute(
            """
            delete from evidence
            where submission_id = ?
            and (
            source = 'llm'
            or (
            source = 'failure'
            and json_extract(payload, '$.title') like 'LLM assistance failed%'
            )
            )
            """,
            (submission_id,),
        )
def _manual_search_evidence(
    self,
    submission_id: str,
    provider: str,
    query: str,
    image_store: LocalSubmissionImageStore | None,
) -> tuple[list[Evidence], int]:
    """Run a manual query against one provider, with fallback searches.

    The primary search runs first. When result images can be compared and the
    previous step produced no similarity evidence, further searches are tried
    (Naver: pages, then web pages; Google: web pages).

    Args:
        submission_id: Submission the query is run for.
        provider: ``"naver"`` or ``"google_search"``.
        query: Free-text query.
        image_store: Image source used for similarity comparison, or ``None``.

    Returns:
        ``(domain_evidence, call_count)``. ``call_count`` counts each search
        that returned at least one item whose source is not
        ``EvidenceSource.SEARCH_SKIPPED``.

    Raises:
        ValueError: If the provider's adapter is not configured, or the
            provider is neither of the two supported ones.
    """
    if provider == "naver":
        if self.provider_runtime.naver_adapter is None:
            raise ValueError(f"{provider} provider not connected")
        promoter = SearchResultPromoter()
        can_compare_search_images = self._can_compare_search_result_images(
            submission_id,
            image_store,
        )
        domain_evidence = self.provider_runtime.naver_adapter.search(
            submission_id,
            query,
            self.provider_runtime.search_policy,
        )
        domain_evidence = promoter.promote(domain_evidence)
        call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
        similarity_evidence: list[Evidence] = []
        if can_compare_search_images and image_store is not None:
            similarity_evidence = self._sync_search_result_image_similarity(
                submission_id,
                domain_evidence,
                image_store,
                status="manual",
            )
        page_similarity_evidence: list[Evidence] = []
        # First fallback: page search, only when the primary search produced
        # no similarity evidence.
        if can_compare_search_images and image_store is not None and not similarity_evidence:
            page_evidence = self.provider_runtime.naver_adapter.search_pages(
                submission_id,
                query,
                self.provider_runtime.search_policy,
            )
            if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
                call_count += 1
            page_evidence = promoter.promote(page_evidence)
            domain_evidence.extend(page_evidence)
            page_similarity_evidence = self._sync_search_result_image_similarity(
                submission_id,
                page_evidence,
                image_store,
                status="manual",
            )
        # Second fallback: web-page search, only when neither earlier step
        # produced similarity evidence.
        if (
            can_compare_search_images
            and image_store is not None
            and not similarity_evidence
            and not page_similarity_evidence
        ):
            web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
                submission_id,
                query,
                self.provider_runtime.search_policy,
            )
            if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
                call_count += 1
            web_evidence = promoter.promote(web_evidence)
            domain_evidence.extend(web_evidence)
            self._sync_search_result_image_similarity(
                submission_id,
                web_evidence,
                image_store,
                status="manual",
            )
        return domain_evidence, call_count
    if provider == "google_search":
        if self.provider_runtime.google_custom_search_adapter is None:
            raise ValueError(f"{provider} provider not connected")
        can_compare_search_images = self._can_compare_search_result_images(
            submission_id,
            image_store,
        )
        # Unlike the Naver branch, these results are not passed through
        # SearchResultPromoter.
        domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
            submission_id,
            query,
            self.provider_runtime.google_custom_search_policy,
        )
        call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
        similarity_evidence: list[Evidence] = []
        if can_compare_search_images and image_store is not None:
            similarity_evidence = self._sync_search_result_image_similarity(
                submission_id,
                domain_evidence,
                image_store,
                status="manual",
            )
        # Fallback: web-page search when the image search produced no
        # similarity evidence.
        if can_compare_search_images and image_store is not None and not similarity_evidence:
            web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
                submission_id,
                query,
                self.provider_runtime.google_custom_search_policy,
            )
            if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
                call_count += 1
            domain_evidence.extend(web_evidence)
            self._sync_search_result_image_similarity(
                submission_id,
                web_evidence,
                image_store,
                status="manual",
            )
        return domain_evidence, call_count
    # Any other provider key is reported the same way as a missing adapter.
    raise ValueError(f"{provider} provider not connected")
def register_manual_knowledge_entry(self, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a confirmed knowledge entry from operator-entered data.

    Args:
        payload: Entry fields. ``name`` is required; ``type``, ``aliases``,
            ``keywords``, ``memo`` and ``image`` are optional.

    Returns:
        The stored entry dict (not the bootstrap payload).

    Raises:
        ValueError: If ``name`` is missing or blank.
    """
    name = str(payload.get("name", "")).strip()
    if not name:
        raise ValueError("knowledge name required")

    # Timestamp-based id: every call creates a new entry.
    entry_id = f"kb-manual-{_timestamp_id()}"
    image_record = self._store_manual_knowledge_image(entry_id, payload.get("image"))

    entry = {
        "id": entry_id,
        "name": name,
        "type": _knowledge_type_value(str(payload.get("type", "other"))),
        "aliases": _text_list(payload.get("aliases")),
        "keywords": _text_list(payload.get("keywords")),
        "memo": str(payload.get("memo", "")).strip(),
        "provenance": "manual",
        "active": True,
        "entryStatus": "confirmed",
        "sourceDecision": "",
    }
    if image_record:
        entry["sampleFingerprints"] = [image_record["perceptualFingerprint"]]
        entry["imageAsset"] = image_record["asset"]
        entry["imageFacts"] = image_record["facts"]
        audit_detail = "manual image reference"
    else:
        entry["sampleFingerprints"] = []
        entry["imageAsset"] = ""
        entry["imageFacts"] = {}
        audit_detail = "manual text reference"

    self._put("knowledge_entries", entry_id, entry)
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry manually created",
        name,
        audit_detail,
    )
    return entry
def collect_keyword_candidates(self, query: str, provider: str = "naver") -> dict[str, Any]:
    """Collect keyword candidates for a query from one search provider.

    Existing collection candidates are cleared first. The provider's searches
    are then tried in order (Naver: search, pages, web pages; Google: images,
    web pages), and each fallback runs only while nothing has been collected.

    Args:
        query: Collection query; surrounding whitespace is stripped.
        provider: ``"naver"`` (default) or ``"google_search"``.

    Returns:
        The ``self.bootstrap()`` payload with an extra ``"collected"`` count.

    Raises:
        ValueError: If the query is blank, the provider is unsupported or
            disabled, or its adapter is not configured.
    """
    query = query.strip()
    if not query:
        raise ValueError("collection query required")
    if provider not in {"naver", "google_search"}:
        raise ValueError(f"{provider} provider not supported for candidate collection")

    self._sync_provider_payloads()
    provider_payload = self._get("providers", provider)
    if not provider_payload["enabled"]:
        raise ValueError(f"{provider} provider disabled")
    if provider == "naver" and self.provider_runtime.naver_adapter is None:
        raise ValueError("naver provider not connected")
    if provider == "google_search" and self.provider_runtime.google_custom_search_adapter is None:
        raise ValueError("google_search provider not connected")

    self._clear_collection_candidates()

    if provider == "google_search":
        adapter = self.provider_runtime.google_custom_search_adapter
        policy = self.provider_runtime.google_custom_search_policy
        search_methods = ("search_images", "search_web_pages")
    else:
        adapter = self.provider_runtime.naver_adapter
        policy = self.provider_runtime.search_policy
        search_methods = ("search", "search_pages", "search_web_pages")

    domain_evidence: list[Any] = []
    collected = 0
    provider_call_count = 0
    for method_name in search_methods:
        if collected:
            # Fallback searches run only while nothing has been collected.
            break
        batch = getattr(adapter, method_name)("candidate-collection", query, policy)
        # A search counts as a provider call unless every item was skipped.
        if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in batch):
            provider_call_count += 1
        domain_evidence.extend(batch)
        for candidate in self._collection_candidates_from_evidence(query, batch, provider):
            self._put("collection_candidates", candidate["id"], candidate)
            collected += 1

    # Report the reason of the item that actually failed. The first evidence
    # item may be a successful result when only a later fallback failed.
    failure_sources = {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
    first_failure = next(
        (item for item in domain_evidence if item.source in failure_sources),
        None,
    )
    if first_failure is not None:
        provider_payload["lastFailure"] = first_failure.reason or f"{provider} collection failed"
    else:
        provider_payload["lastSuccess"] = _now_label()
        provider_payload["lastFailure"] = "없음"
    self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)

    self.add_audit_event(
        "rights.ops",
        "Keyword candidates collected",
        provider,
        f"{query} · {collected} candidates",
    )
    payload = self.bootstrap()
    payload["collected"] = collected
    return payload
def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Promote one collected candidate into a confirmed knowledge entry.

    Runs under ``self._write_lock``. The knowledge entry, the candidate's
    promoted status, and the audit event are written on a single
    ``self._transaction()`` connection.

    Args:
        candidate_id: Key of the row in ``collection_candidates``.
        payload: Optional overrides: ``name``, ``memo``, ``type``,
            ``aliases``, ``keywords``. Missing values fall back to the
            candidate's title/query.

    Returns:
        Whatever ``self.bootstrap()`` returns.
    """
    with self._write_lock:
        candidate = self._get("collection_candidates", candidate_id)
        # Idempotent: a double-click / retry must not create a second
        # confirmed knowledge entry for the same candidate.
        if candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId"):
            return self.bootstrap()
        # Deterministic id so a racing retry upserts the same row instead of
        # minting a new timestamped id.
        entry_id = _stable_id("kb-candidate", candidate_id)
        query = str(candidate.get("query", ""))
        # Name preference: explicit payload name, then candidate title, then query.
        name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query
        memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}"
        entry = {
            "id": entry_id,
            "name": name,
            "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))),
            "aliases": _text_list(payload.get("aliases")) or [query],
            "keywords": _text_list(payload.get("keywords")) or [query],
            "memo": memo,
            "provenance": "manual",
            "active": True,
            "entryStatus": "confirmed",
            "sourceDecision": "",
            "sourceCandidate": candidate_id,
            "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")),
            "imageAsset": str(candidate.get("imageAsset", "")),
            "imageFacts": candidate.get("imageFacts", {}),
        }
        # All three writes share one transaction connection.
        with self._transaction() as conn:
            self._put("knowledge_entries", entry_id, entry, conn=conn)
            candidate["status"] = "promoted"
            candidate["promotedKnowledgeId"] = entry_id
            self._put("collection_candidates", candidate_id, candidate, conn=conn)
            self.add_audit_event(
                "rights.ops", "Knowledge entry manually created", name,
                f"promoted candidate {candidate_id}", conn=conn,
            )
        return self.bootstrap()
def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]:
    """Promote several collected candidates into one confirmed knowledge entry.

    The selected candidates' sample fingerprints, queries and image assets
    are merged into a single new entry, and every candidate is marked as
    promoted to it. As in ``promote_collection_candidate``, the reads and
    writes run under ``self._write_lock`` and the writes share one
    ``self._transaction()`` connection, so a mid-way failure cannot leave the
    entry stored while the candidates are still unmarked.

    Args:
        payload: Must carry ``candidate_ids`` (or ``candidateIds``). Optional
            overrides: ``name``, ``memo``, ``type``, ``aliases``, ``keywords``.

    Returns:
        Whatever ``self.bootstrap()`` returns.

    Raises:
        ValueError: If no candidate ids are given, or none of the selected
            candidates has a sample fingerprint.
    """
    candidate_ids = _unique_texts(
        _text_list(payload.get("candidate_ids", payload.get("candidateIds")))
    )
    if not candidate_ids:
        raise ValueError("candidate_ids required")

    with self._write_lock:
        candidates = [
            self._get("collection_candidates", candidate_id)
            for candidate_id in candidate_ids
        ]
        sample_fingerprints = _unique_texts(
            fingerprint
            for candidate in candidates
            for fingerprint in _text_list(candidate.get("sampleFingerprints"))
        )
        if not sample_fingerprints:
            raise ValueError("selected candidates have no sample fingerprints")

        # Timestamp-based id (unchanged): each bulk promotion creates a new entry.
        entry_id = f"kb-candidate-{_timestamp_id()}"
        queries = _unique_texts(str(candidate.get("query", "")) for candidate in candidates)
        image_assets = _unique_texts(
            str(candidate.get("imageAsset", "")) for candidate in candidates
        )
        image_facts = [
            candidate.get("imageFacts", {})
            for candidate in candidates
            if isinstance(candidate.get("imageFacts", {}), dict) and candidate.get("imageFacts", {})
        ]
        # First non-blank title or query among the candidates, else a fixed label.
        fallback_name = next(
            (
                str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip()
                for candidate in candidates
                if str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip()
            ),
            "Collected reference",
        )
        name = str(payload.get("name", "")).strip() or fallback_name
        memo = (
            str(payload.get("memo", "")).strip()
            or f"Promoted from collected candidates: {', '.join(queries)}"
        )
        entry = {
            "id": entry_id,
            "name": name,
            "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))),
            "aliases": _text_list(payload.get("aliases")) or queries,
            "keywords": _text_list(payload.get("keywords")) or queries,
            "memo": memo,
            "provenance": "manual",
            "active": True,
            "entryStatus": "confirmed",
            "sourceDecision": "",
            "sourceCandidate": candidate_ids[0] if len(candidate_ids) == 1 else "",
            "sourceCandidates": candidate_ids,
            "sampleFingerprints": sample_fingerprints,
            "imageAsset": image_assets[0] if image_assets else "",
            "imageAssets": image_assets,
            "imageFacts": {
                "samples": len(candidates),
                "queries": queries,
                "items": image_facts,
            },
        }

        # One transaction for the entry, every candidate row and the audit event.
        with self._transaction() as conn:
            self._put("knowledge_entries", entry_id, entry, conn=conn)
            for candidate in candidates:
                candidate["status"] = "promoted"
                candidate["promotedKnowledgeId"] = entry_id
                self._put("collection_candidates", str(candidate["id"]), candidate, conn=conn)
            self.add_audit_event(
                "rights.ops",
                "Knowledge entry manually created",
                name,
                f"promoted {len(candidates)} collected candidates",
                conn=conn,
            )
        return self.bootstrap()
def providers(self) -> list[dict[str, Any]]: def providers(self) -> list[dict[str, Any]]:
return self._all("providers") return self._all("providers")

View file

@ -0,0 +1,761 @@
"""Operator-facing operations for CopyrighterStore, as a mixin.
Knowledge-entry lifecycle (watchlist promote/exclude, update/deactivate/
reactivate), rerun/auto/manual search drivers, LLM summary management, manual
knowledge registration, and keyword-candidate collection/promotion. Mixed into
CopyrighterStore; relies on persistence + enrichment + search-candidate methods
and self.* attributes provided by the host class. Behavior unchanged.
"""
from __future__ import annotations
import json
import re
import sqlite3
from typing import Any
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_serialization import (
_domain_evidence_from_ui,
_evidence_id,
_evidence_matches_provider,
_evidence_payload,
_knowledge_type_value,
_now_label,
_provider_item_has_result,
_stable_id,
_submission_search_hint_evidence,
_timestamp_id,
_watchlist_source_evidence,
)
from rights_filter.server.store_text import _text_list, _unique_texts
class StoreOperationsMixin:
def promote_watchlist_entry(self, entry_id: str) -> dict[str, Any]:
    """Promote a watchlist knowledge entry to a confirmed, active entry.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If the entry's ``entryStatus`` is not ``"watchlist"``.
    """
    record = self._get("knowledge_entries", entry_id)
    status = record.get("entryStatus")
    if status != "watchlist":
        raise ValueError("knowledge entry is not a watchlist candidate")

    # Flip the entry to confirmed and clear any earlier exclusion reason.
    record.update(
        {
            "entryStatus": "confirmed",
            "active": True,
            "excludedReason": "",
            "confirmedAt": _now_label(),
            "confirmedBy": "rights.ops",
        }
    )
    self._put("knowledge_entries", entry_id, record)

    audit_target = str(record.get("name", entry_id))
    self.add_audit_event(
        "rights.ops",
        "Watchlist candidate promoted",
        audit_target,
        "promoted into confirmed reference DB",
    )
    return self.bootstrap()
def exclude_watchlist_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]:
    """Mark a watchlist or confirmed knowledge entry as excluded and inactive.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        reason: Operator-supplied reason; a default text is stored when it
            is blank.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If ``entryStatus`` is neither ``"watchlist"`` nor
            ``"confirmed"``.
    """
    record = self._get("knowledge_entries", entry_id)
    if record.get("entryStatus") not in {"watchlist", "confirmed"}:
        raise ValueError("knowledge entry cannot be excluded")

    exclusion_reason = reason.strip() or "오탐 또는 무관 후보"
    record.update(
        {
            "entryStatus": "excluded",
            "active": False,
            "excludedReason": exclusion_reason,
            "excludedAt": _now_label(),
            "excludedBy": "rights.ops",
        }
    )
    self._put("knowledge_entries", entry_id, record)

    # The audit detail is the same reason text that was stored on the entry.
    self.add_audit_event(
        "rights.ops",
        "Watchlist candidate excluded",
        str(record.get("name", entry_id)),
        exclusion_reason,
    )
    return self.bootstrap()
def update_knowledge_entry(self, entry_id: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Apply operator edits to a knowledge entry's aliases, keywords or memo.

    Only the keys present in ``payload`` are changed. The audit event records
    the previous and new values of the changed fields as JSON.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        payload: May contain ``"aliases"``, ``"keywords"`` and/or ``"memo"``.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If ``payload`` contains none of the editable keys.
    """
    entry = self._get("knowledge_entries", entry_id)
    updates: dict[str, Any] = {}
    for list_field in ("aliases", "keywords"):
        if list_field in payload:
            updates[list_field] = _text_list(payload.get(list_field))
    if "memo" in payload:
        memo = payload.get("memo")
        # An explicit null memo means "clear it"; str(None) would otherwise
        # store the literal text "None".
        updates["memo"] = "" if memo is None else str(memo).strip()
    if not updates:
        raise ValueError("aliases, keywords, memo 중 수정할 값이 필요합니다")
    before = {key: entry.get(key) for key in updates}
    entry.update(updates)
    self._put("knowledge_entries", entry_id, entry)
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry updated",
        str(entry.get("name", entry_id)),
        f"{json.dumps(before, ensure_ascii=False)} -> {json.dumps(updates, ensure_ascii=False)}",
    )
    return self.bootstrap()
def deactivate_knowledge_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]:
    """Switch an active confirmed knowledge entry to inactive.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        reason: Optional operator note; stored stripped, and replaced by a
            default text in the audit event when blank.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If the entry is not confirmed (a missing ``entryStatus``
            counts as confirmed) or is already inactive.
    """
    record = self._get("knowledge_entries", entry_id)
    status = record.get("entryStatus", "confirmed")
    if status != "confirmed":
        raise ValueError("확정 DB 항목만 비활성화할 수 있습니다")
    is_active = record.get("active", False)
    if not is_active:
        raise ValueError("이미 비활성 상태입니다")
    note = reason.strip()
    record.update(
        {
            "active": False,
            "deactivatedAt": _now_label(),
            "deactivatedBy": "rights.ops",
            "deactivatedReason": note,
        }
    )
    self._put("knowledge_entries", entry_id, record)
    display_name = str(record.get("name", entry_id))
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry deactivated",
        display_name,
        note or "운영자 비활성화",
    )
    return self.bootstrap()
def reactivate_knowledge_entry(self, entry_id: str, reason: str) -> dict[str, Any]:
    """Switch an inactive confirmed knowledge entry back to active.

    Args:
        entry_id: Key of the row in ``knowledge_entries``.
        reason: Mandatory operator note; must be non-blank after stripping.

    Returns:
        Whatever ``self.bootstrap()`` returns after the update.

    Raises:
        ValueError: If ``reason`` is blank, the entry is not confirmed (a
            missing ``entryStatus`` counts as confirmed), or it is already
            active.
    """
    note = reason.strip()
    # The reason is validated before the entry is even loaded.
    if not note:
        raise ValueError("재활성에는 사유 메모가 필요합니다")
    record = self._get("knowledge_entries", entry_id)
    status = record.get("entryStatus", "confirmed")
    if status != "confirmed":
        raise ValueError("확정 DB 항목만 재활성화할 수 있습니다")
    if record.get("active", False):
        raise ValueError("이미 활성 상태입니다")
    record.update(
        {
            "active": True,
            "reactivatedAt": _now_label(),
            "reactivatedBy": "rights.ops",
            "reactivatedReason": note,
        }
    )
    self._put("knowledge_entries", entry_id, record)
    display_name = str(record.get("name", entry_id))
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry reactivated",
        display_name,
        note,
    )
    return self.bootstrap()
def _create_or_update_watchlist_entry(
    self,
    submission_id: str,
    decision: str,
    memo: str,
    image_store: LocalSubmissionImageStore | None,
    *,
    conn: sqlite3.Connection | None = None,
) -> None:
    """Upsert the automatic watchlist knowledge entry for a submission.

    The entry id is derived from the submission id, so repeated calls update
    the same row. Fields of an existing row are kept unless overwritten here.

    Args:
        submission_id: Submission the entry is derived from.
        decision: Decision status; ``"held"`` selects the hold default memo,
            anything else the rejection default memo.
        memo: Operator memo; a decision-specific default is used when blank.
        image_store: Optional image store used to compute a sample fingerprint.
        conn: Optional connection forwarded to the ``_get``/``_put`` calls.
    """
    submission = self._get("submissions", submission_id, conn=conn)
    submission_evidence = self._evidence_by_submission().get(submission_id, [])
    source_items = _watchlist_source_evidence(submission_evidence)
    source_ids: list[str] = []
    for item in source_items:
        if item.get("id"):
            source_ids.append(str(item.get("id", "")))
    fresh_fingerprints = self._watchlist_fingerprints(submission_id, image_store)
    entry_id = _stable_id("kb-watchlist", submission_id)
    try:
        previous = self._get("knowledge_entries", entry_id, conn=conn)
    except KeyError:
        # No earlier entry for this submission: start from an empty record.
        previous = {}

    # Keywords come from up to three submission reasons and three evidence titles.
    reason_texts = [str(reason) for reason in submission.get("reasons", [])[:3]]
    title_texts = [str(item.get("title", "")) for item in source_items[:3]]
    keywords = _unique_texts([*reason_texts, *title_texts])

    preview_name = submission.get("derivedPreview", {}).get("entryName")
    if preview_name:
        name = preview_name
    else:
        name = f"{submission.get('title', submission_id)} / {submission_id}"

    if decision == "held":
        default_memo = "보류 판정으로 자동 생성"
    else:
        default_memo = "반려 판정으로 자동 생성"

    entry = dict(previous)
    entry.update(
        {
            "id": entry_id,
            "name": name,
            "type": "rejected_image",
            "aliases": _unique_texts([submission_id, str(submission.get("title", ""))]),
            "keywords": keywords,
            "memo": memo.strip() or default_memo,
            "provenance": "automatic",
            "active": True,
            "entryStatus": "watchlist",
            "originDecisionStatus": decision,
            "sourceDecision": f"DEC-{submission_id}",
            "sourceSubmissionId": submission_id,
            "sourceEvidenceIds": source_ids,
            # Keep the previously stored fingerprints when none could be computed now.
            "sampleFingerprints": fresh_fingerprints or _text_list(previous.get("sampleFingerprints")),
            "imageAsset": str(submission.get("asset", "")),
            "imageFacts": submission.get("fileFacts", {}),
            "contributionCount": int(previous.get("contributionCount", 0) or 0),
            "matchedSubmissionIds": _text_list(previous.get("matchedSubmissionIds")),
            "lastOriginDecisionAt": _now_label(),
        }
    )
    self._put("knowledge_entries", entry_id, entry, conn=conn)
def _watchlist_fingerprints(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> list[str]:
if image_store is None:
return []
try:
fingerprints = FingerprintService().fingerprints_for(
image_store.image_payload(submission_id).content
)
except Exception:
return []
return [fingerprints.perceptual]
def rerun_enrichment(
    self,
    submission_id: str,
    image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
    """Re-run analysis for one submission and record what changed.

    A "queued" marker evidence row is always written. When ``image_store`` is
    given, the internal analysis, Google image search and the automatic
    Naver / Google Custom Search steps are re-run. Afterwards the LLM summary
    is refreshed, the submission is rescored, and a ``lastRerunDiff`` summary
    (score before/after, added and removed evidence ids) is stored on the
    submission.

    Args:
        submission_id: Submission to re-analyse.
        image_store: Optional image store; without it only the marker, LLM
            summary, rescoring and diff steps run.

    Returns:
        Whatever ``self.review(submission_id)`` returns.
    """
    submission = self._get("submissions", submission_id)
    # Snapshot score and evidence before anything is modified, for the diff below.
    score_before = int(submission.get("riskScore", 0) or 0)
    evidence_before = {
        str(item.get("id", "")): item
        for item in self._evidence_by_submission().get(submission_id, [])
    }
    submission["lastAnalysis"] = _now_label()
    self._put("submissions", submission_id, submission)
    # Marker row recording that a rerun was requested; its id is timestamped.
    evidence = {
        "id": f"ev-{submission_id}-rerun-{_timestamp_id()}",
        "group": "internal",
        "source": "fingerprint",
        "title": "재분석 요청이 접수됨",
        "confidence": 0,
        "query": "",
        "domain": "internal",
        "url": "",
        "retrievedAt": _now_label(),
        "contributed": False,
        "sourceEvidenceIds": [],
        "status": "queued",
    }
    self._put("evidence", evidence["id"], {**evidence, "submission_id": submission_id})
    if image_store is not None:
        self._rerun_internal_analysis(submission_id, image_store)
        google_evidence = self._rerun_google_image_search(submission_id, image_store)
        # Text searches are seeded from the fresh Google evidence plus
        # hints derived from the submission itself.
        query_source_evidence = [
            *google_evidence,
            *_submission_search_hint_evidence(submission),
        ]
        self._auto_naver_search(submission_id, query_source_evidence, image_store)
        self._auto_google_custom_search(submission_id, query_source_evidence, image_store)
    self._ensure_llm_summary(submission_id)
    self.add_audit_event("rights.ops", "Analysis run created", submission_id, "operator rerun")
    self._rescore_submission(submission_id)
    self._sync_submission_provider_state()
    evidence_after = {
        str(item.get("id", "")): item
        for item in self._evidence_for_submission(submission_id)
    }
    rerun_marker_prefix = f"ev-{submission_id}-rerun-"
    # The LLM summary is deleted and regenerated on every rerun, so its id
    # always changes (the summary's source_evidence_ids include the
    # timestamped marker id). Including it in the diff would report an
    # add + remove on every rerun even when nothing changed, so LLM rows
    # are excluded from the diff.
    added_ids = [
        evidence_id
        for evidence_id in evidence_after
        if evidence_id not in evidence_before
        and not evidence_id.startswith(rerun_marker_prefix)
        and str(evidence_after[evidence_id].get("source", "")) != "llm"
    ]
    removed_items = [
        evidence_before[evidence_id]
        for evidence_id in evidence_before
        if evidence_id not in evidence_after
        and str(evidence_before[evidence_id].get("source", "")) != "llm"
    ]
    # Re-read the submission so the diff is stored on top of the rescored record.
    refreshed = self._get("submissions", submission_id)
    refreshed["lastRerunDiff"] = {
        "at": _now_label(),
        "scoreBefore": score_before,
        "scoreAfter": int(refreshed.get("riskScore", 0) or 0),
        "addedEvidenceIds": added_ids,
        "removedEvidenceIds": [str(item.get("id", "")) for item in removed_items],
        "removedSummaries": [
            {"source": str(item.get("source", "")), "reason": str(item.get("title", ""))}
            for item in removed_items
        ],
    }
    self._put("submissions", submission_id, refreshed)
    return self.review(submission_id)
def run_auto_search(
    self,
    submission_id: str,
    image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
    """Run the automatic text searches for a submission on operator request.

    Stored evidence from selected sources, plus hints derived from the
    submission, is passed to the automatic Naver and Google Custom Search
    steps as query source evidence. The LLM summary, risk score and provider
    state are refreshed afterwards and the request is audited.

    Args:
        submission_id: Submission to search for.
        image_store: Optional image store forwarded to the search steps.

    Returns:
        Whatever ``self.review(submission_id)`` returns.
    """
    record = self._get("submissions", submission_id)
    record["lastAnalysis"] = _now_label()
    self._put("submissions", submission_id, record)

    reusable_sources = {"google", "naver", "face", "fingerprint", "llm", "failure"}
    seeds = []
    for stored in self._evidence_by_submission().get(submission_id, []):
        if stored.get("source") in reusable_sources:
            seeds.append(_domain_evidence_from_ui(stored))
    seeds.extend(_submission_search_hint_evidence(record))

    self._auto_naver_search(submission_id, seeds, image_store)
    self._auto_google_custom_search(submission_id, seeds, image_store)
    self._ensure_llm_summary(submission_id)
    self._rescore_submission(submission_id)
    self._sync_submission_provider_state()

    audit_target = f"auto-search / {submission_id}"
    self.add_audit_event(
        "rights.ops",
        "Provider called",
        audit_target,
        "operator request for auto text search",
    )
    return self.review(submission_id)
def manual_search(
    self,
    submission_id: str,
    provider: str,
    query: str,
    image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
    """Run an operator-entered text query against one search provider.

    Every returned evidence item is stored with status ``"manual"``, the
    query is prepended to the submission's ``queryHistory``, and the
    provider's last success/failure fields and usage are updated.

    Args:
        submission_id: Submission the query belongs to.
        provider: Provider key looked up in ``providers``.
        query: Text query to send.
        image_store: Optional image store; when given the submission is
            rescored after the search.

    Returns:
        Whatever ``self.review(submission_id)`` returns.

    Raises:
        ValueError: If the provider is disabled; ``_manual_search_evidence``
            may also raise it when the provider is not connected.
    """
    submission = self._get("submissions", submission_id)
    provider_payload = self._get("providers", provider)
    if not provider_payload["enabled"]:
        raise ValueError(f"{provider} provider disabled")
    domain_evidence, provider_call_count = self._manual_search_evidence(
        submission_id,
        provider,
        query,
        image_store,
    )
    for item in domain_evidence:
        evidence = _evidence_payload(submission_id, item)
        evidence["status"] = "manual"
        self._put("evidence", evidence["id"], evidence)
    # Newest query first.
    submission.setdefault("queryHistory", []).insert(
        0,
        {
            "provider": provider,
            "query": query,
            "status": "manual",
            "timestamp": _now_label(),
            "count": len(domain_evidence),
        },
    )
    self._put("submissions", submission_id, submission)
    failure_sources = {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
    # Report the reason of the item that actually failed. The first item of
    # the combined list can be a successful result from an earlier search
    # while the failure came from a later fallback search.
    failed_item = next(
        (item for item in domain_evidence if item.source in failure_sources),
        None,
    )
    if failed_item is not None:
        provider_payload["lastFailure"] = failed_item.reason or f"{provider} search failed"
    else:
        provider_payload["lastSuccess"] = _now_label()
        provider_payload["lastFailure"] = "없음"
    self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)
    self._ensure_llm_summary(submission_id)
    if image_store is not None:
        self._rescore_submission(submission_id)
    self._sync_submission_provider_state()
    self.add_audit_event("rights.ops", "Provider called", f"{provider} / {submission_id}", f"manual text query: {query}")
    return self.review(submission_id)
def _ensure_llm_summaries_for_existing_source_evidence(self, queue_id: str | None = None) -> None:
for submission in self._all("submissions", queue_id=queue_id):
self._ensure_llm_summary(str(submission["id"]), only_if_missing=True)
def _ensure_llm_summary(self, submission_id: str, *, only_if_missing: bool = False) -> bool:
if self.provider_runtime.llm_assistant is None:
return False
llm_provider = self._get("providers", "llm")
if not llm_provider.get("enabled"):
return False
evidence_payloads = self._evidence_by_submission().get(submission_id, [])
if only_if_missing and any(
_evidence_matches_provider(item, "llm") and _provider_item_has_result(item)
for item in evidence_payloads
):
return False
source_evidence = [
_domain_evidence_from_ui(item)
for item in evidence_payloads
if item.get("source") in {"fingerprint", "face", "google", "naver"}
]
if not source_evidence:
return False
llm_evidence = self.provider_runtime.llm_assistant.summarize(
submission_id,
source_evidence,
)
self._delete_llm_summary_evidence(submission_id)
self._put(
"evidence",
_evidence_id(submission_id, llm_evidence),
_evidence_payload(submission_id, llm_evidence),
)
if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE:
llm_provider["lastFailure"] = llm_evidence.reason
else:
llm_provider["lastSuccess"] = _now_label()
llm_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("llm", 1, llm_provider)
return True
def _delete_llm_summary_evidence(self, submission_id: str) -> None:
with self._connect() as conn:
conn.execute(
"""
delete from evidence
where submission_id = ?
and (
source = 'llm'
or (
source = 'failure'
and json_extract(payload, '$.title') like 'LLM assistance failed%'
)
)
""",
(submission_id,),
)
def _manual_search_evidence(
    self,
    submission_id: str,
    provider: str,
    query: str,
    image_store: LocalSubmissionImageStore | None,
) -> tuple[list[Evidence], int]:
    """Run a manual text query and return its evidence and call count.

    For ``"naver"`` the image search runs first, then the page search, then
    the web-page search; for ``"google_search"`` the image search runs first,
    then the web-page search. A fallback search runs only when result images
    can be compared and the earlier searches produced no similarity evidence.

    Args:
        submission_id: Submission the query belongs to.
        provider: ``"naver"`` or ``"google_search"``.
        query: Text query sent to the adapter.
        image_store: Optional image store used for result-image comparison.

    Returns:
        ``(evidence, call_count)``. ``evidence`` holds the search results of
        every search that ran (not the similarity evidence); ``call_count``
        counts searches that returned at least one item whose source is not
        ``SEARCH_SKIPPED``.

    Raises:
        ValueError: If the provider's adapter is not configured, or the
            provider is neither ``"naver"`` nor ``"google_search"``.
    """
    if provider == "naver":
        if self.provider_runtime.naver_adapter is None:
            raise ValueError(f"{provider} provider not connected")
        promoter = SearchResultPromoter()
        can_compare_search_images = self._can_compare_search_result_images(
            submission_id,
            image_store,
        )
        domain_evidence = self.provider_runtime.naver_adapter.search(
            submission_id,
            query,
            self.provider_runtime.search_policy,
        )
        domain_evidence = promoter.promote(domain_evidence)
        # A search counts as a provider call unless every item was skipped.
        call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
        similarity_evidence: list[Evidence] = []
        if can_compare_search_images and image_store is not None:
            similarity_evidence = self._sync_search_result_image_similarity(
                submission_id,
                domain_evidence,
                image_store,
                status="manual",
            )
        page_similarity_evidence: list[Evidence] = []
        # First fallback: page search, only when the image search yielded
        # no similarity evidence.
        if can_compare_search_images and image_store is not None and not similarity_evidence:
            page_evidence = self.provider_runtime.naver_adapter.search_pages(
                submission_id,
                query,
                self.provider_runtime.search_policy,
            )
            if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
                call_count += 1
            page_evidence = promoter.promote(page_evidence)
            domain_evidence.extend(page_evidence)
            page_similarity_evidence = self._sync_search_result_image_similarity(
                submission_id,
                page_evidence,
                image_store,
                status="manual",
            )
        # Second fallback: web-page search, only when neither earlier
        # search yielded similarity evidence.
        if (
            can_compare_search_images
            and image_store is not None
            and not similarity_evidence
            and not page_similarity_evidence
        ):
            web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
                submission_id,
                query,
                self.provider_runtime.search_policy,
            )
            if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
                call_count += 1
            web_evidence = promoter.promote(web_evidence)
            domain_evidence.extend(web_evidence)
            self._sync_search_result_image_similarity(
                submission_id,
                web_evidence,
                image_store,
                status="manual",
            )
        return domain_evidence, call_count
    if provider == "google_search":
        if self.provider_runtime.google_custom_search_adapter is None:
            raise ValueError(f"{provider} provider not connected")
        can_compare_search_images = self._can_compare_search_result_images(
            submission_id,
            image_store,
        )
        # NOTE(review): unlike the Naver branch, these results are not passed
        # through SearchResultPromoter — confirm this is intentional.
        domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
            submission_id,
            query,
            self.provider_runtime.google_custom_search_policy,
        )
        call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
        similarity_evidence: list[Evidence] = []
        if can_compare_search_images and image_store is not None:
            similarity_evidence = self._sync_search_result_image_similarity(
                submission_id,
                domain_evidence,
                image_store,
                status="manual",
            )
        # Fallback: web-page search, only when the image search yielded no
        # similarity evidence.
        if can_compare_search_images and image_store is not None and not similarity_evidence:
            web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
                submission_id,
                query,
                self.provider_runtime.google_custom_search_policy,
            )
            if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
                call_count += 1
            domain_evidence.extend(web_evidence)
            self._sync_search_result_image_similarity(
                submission_id,
                web_evidence,
                image_store,
                status="manual",
            )
        return domain_evidence, call_count
    # Any other provider name is reported with the same "not connected" message.
    raise ValueError(f"{provider} provider not connected")
def register_manual_knowledge_entry(self, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a confirmed knowledge entry from operator-supplied data.

    Args:
        payload: Must contain a non-blank ``"name"``; may contain ``"type"``,
            ``"aliases"``, ``"keywords"``, ``"memo"`` and ``"image"``.

    Returns:
        The stored knowledge entry dict.

    Raises:
        ValueError: If ``"name"`` is missing or blank.
    """
    name = str(payload.get("name", "")).strip()
    if not name:
        raise ValueError("knowledge name required")
    entry_id = f"kb-manual-{_timestamp_id()}"
    image_record = self._store_manual_knowledge_image(entry_id, payload.get("image"))
    sample_fingerprints = [image_record["perceptualFingerprint"]] if image_record else []
    entry_type = _knowledge_type_value(str(payload.get("type", "other")))
    aliases = _text_list(payload.get("aliases"))
    keywords = _text_list(payload.get("keywords"))
    memo = str(payload.get("memo", "")).strip()
    # Image-derived fields stay empty when no image record was produced.
    if image_record:
        image_asset = image_record["asset"]
        image_facts = image_record["facts"]
        audit_detail = "manual image reference"
    else:
        image_asset = ""
        image_facts = {}
        audit_detail = "manual text reference"
    entry = {
        "id": entry_id,
        "name": name,
        "type": entry_type,
        "aliases": aliases,
        "keywords": keywords,
        "memo": memo,
        "provenance": "manual",
        "active": True,
        "entryStatus": "confirmed",
        "sourceDecision": "",
        "sampleFingerprints": sample_fingerprints,
        "imageAsset": image_asset,
        "imageFacts": image_facts,
    }
    self._put("knowledge_entries", entry_id, entry)
    self.add_audit_event(
        "rights.ops",
        "Knowledge entry manually created",
        name,
        audit_detail,
    )
    return entry
def collect_keyword_candidates(self, query: str, provider: str = "naver") -> dict[str, Any]:
    """Search a provider for reference candidates and store them for review.

    Existing collection candidates are cleared first. The provider's searches
    are then tried in order until one of them yields at least one candidate:
    Naver uses ``search``, ``search_pages``, ``search_web_pages``; Google
    Custom Search uses ``search_images``, ``search_web_pages``.

    Args:
        query: Text to search for; surrounding whitespace is stripped.
        provider: ``"naver"`` or ``"google_search"``.

    Returns:
        The ``self.bootstrap()`` payload with an added ``"collected"`` count.

    Raises:
        ValueError: If the query is blank, or the provider is unsupported,
            disabled, or has no connected adapter.
    """
    query = query.strip()
    if not query:
        raise ValueError("collection query required")
    if provider not in {"naver", "google_search"}:
        raise ValueError(f"{provider} provider not supported for candidate collection")
    self._sync_provider_payloads()
    provider_payload = self._get("providers", provider)
    if not provider_payload["enabled"]:
        raise ValueError(f"{provider} provider disabled")
    runtime = self.provider_runtime
    if provider == "google_search":
        adapter = runtime.google_custom_search_adapter
        search_methods = ("search_images", "search_web_pages")
    else:
        adapter = runtime.naver_adapter
        search_methods = ("search", "search_pages", "search_web_pages")
    if adapter is None:
        raise ValueError(f"{provider} provider not connected")
    self._clear_collection_candidates()
    if provider == "google_search":
        policy = runtime.google_custom_search_policy
    else:
        policy = runtime.search_policy

    all_evidence: list[Evidence] = []
    collected = 0
    provider_call_count = 0
    for method_name in search_methods:
        # Adapter methods are looked up lazily so a fallback search is only
        # touched when it is actually needed.
        evidence = getattr(adapter, method_name)("candidate-collection", query, policy)
        # A search counts as a provider call unless every item was skipped.
        if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in evidence):
            provider_call_count += 1
        # Accumulate into our own list rather than mutating the adapter's result.
        all_evidence.extend(evidence)
        for candidate in self._collection_candidates_from_evidence(query, evidence, provider):
            self._put("collection_candidates", candidate["id"], candidate)
            collected += 1
        if collected:
            break

    failure_sources = {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
    # Report the reason of the item that actually failed, not merely the
    # first item of the combined result list.
    failed_item = next(
        (item for item in all_evidence if item.source in failure_sources),
        None,
    )
    if failed_item is not None:
        provider_payload["lastFailure"] = failed_item.reason or f"{provider} collection failed"
    else:
        provider_payload["lastSuccess"] = _now_label()
        provider_payload["lastFailure"] = "없음"
    self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)
    self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates")
    payload = self.bootstrap()
    payload["collected"] = collected
    return payload
def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Promote one collected candidate into a confirmed knowledge entry.

    Args:
        candidate_id: Key of the row in ``collection_candidates``.
        payload: Optional overrides: ``"name"``, ``"memo"``, ``"type"``,
            ``"aliases"``, ``"keywords"``.

    Returns:
        Whatever ``self.bootstrap()`` returns; also returned unchanged when
        the candidate was already promoted.
    """
    with self._write_lock:
        candidate = self._get("collection_candidates", candidate_id)
        # A repeated request (double-click, retry) for an already promoted
        # candidate must not produce a second confirmed entry.
        already_promoted = candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId")
        if already_promoted:
            return self.bootstrap()

        # The id is derived from the candidate id, so a racing retry writes
        # the same row rather than a new timestamped one.
        entry_id = _stable_id("kb-candidate", candidate_id)
        query = str(candidate.get("query", ""))
        requested_name = str(payload.get("name", "")).strip()
        candidate_title = str(candidate.get("title", "")).strip()
        name = requested_name or candidate_title or query
        requested_memo = str(payload.get("memo", "")).strip()
        memo = requested_memo or f"키워드 후보 수집에서 편입: {query}"
        entry_type = _knowledge_type_value(str(payload.get("type", "rejected_reference")))
        aliases = _text_list(payload.get("aliases")) or [query]
        keywords = _text_list(payload.get("keywords")) or [query]
        entry = {
            "id": entry_id,
            "name": name,
            "type": entry_type,
            "aliases": aliases,
            "keywords": keywords,
            "memo": memo,
            "provenance": "manual",
            "active": True,
            "entryStatus": "confirmed",
            "sourceDecision": "",
            "sourceCandidate": candidate_id,
            "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")),
            "imageAsset": str(candidate.get("imageAsset", "")),
            "imageFacts": candidate.get("imageFacts", {}),
        }
        # Entry, candidate status and audit event share one transaction.
        with self._transaction() as conn:
            self._put("knowledge_entries", entry_id, entry, conn=conn)
            candidate["status"] = "promoted"
            candidate["promotedKnowledgeId"] = entry_id
            self._put("collection_candidates", candidate_id, candidate, conn=conn)
            self.add_audit_event(
                "rights.ops",
                "Knowledge entry manually created",
                name,
                f"promoted candidate {candidate_id}",
                conn=conn,
            )
        return self.bootstrap()
def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]:
    """Promote several collected candidates into one confirmed knowledge entry.

    The new entry, the candidates' ``promoted`` status and the audit event
    are written in a single transaction under the write lock, as in
    ``promote_collection_candidate``, so a failure part-way through cannot
    leave an entry whose candidates are not marked as promoted.

    Args:
        payload: Must contain ``"candidate_ids"`` (or ``"candidateIds"``);
            may contain ``"name"``, ``"memo"``, ``"type"``, ``"aliases"``
            and ``"keywords"`` overrides.

    Returns:
        Whatever ``self.bootstrap()`` returns after the promotion.

    Raises:
        ValueError: If no candidate ids are given, or none of the selected
            candidates carries a sample fingerprint.
    """
    candidate_ids = _unique_texts(_text_list(payload.get("candidate_ids", payload.get("candidateIds"))))
    if not candidate_ids:
        raise ValueError("candidate_ids required")
    with self._write_lock:
        candidates = [self._get("collection_candidates", candidate_id) for candidate_id in candidate_ids]
        sample_fingerprints = _unique_texts(
            fingerprint
            for candidate in candidates
            for fingerprint in _text_list(candidate.get("sampleFingerprints"))
        )
        if not sample_fingerprints:
            raise ValueError("selected candidates have no sample fingerprints")
        entry_id = f"kb-candidate-{_timestamp_id()}"
        queries = _unique_texts(str(candidate.get("query", "")) for candidate in candidates)
        image_assets = _unique_texts(str(candidate.get("imageAsset", "")) for candidate in candidates)
        # Keep only non-empty dict-shaped image facts.
        image_facts = [
            candidate.get("imageFacts", {})
            for candidate in candidates
            if isinstance(candidate.get("imageFacts", {}), dict) and candidate.get("imageFacts", {})
        ]
        # First candidate with a non-blank title or query names the entry.
        fallback_name = next(
            (
                str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip()
                for candidate in candidates
                if str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip()
            ),
            "Collected reference",
        )
        name = str(payload.get("name", "")).strip() or fallback_name
        memo = str(payload.get("memo", "")).strip() or f"Promoted from collected candidates: {', '.join(queries)}"
        entry = {
            "id": entry_id,
            "name": name,
            "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))),
            "aliases": _text_list(payload.get("aliases")) or queries,
            "keywords": _text_list(payload.get("keywords")) or queries,
            "memo": memo,
            "provenance": "manual",
            "active": True,
            "entryStatus": "confirmed",
            "sourceDecision": "",
            "sourceCandidate": candidate_ids[0] if len(candidate_ids) == 1 else "",
            "sourceCandidates": candidate_ids,
            "sampleFingerprints": sample_fingerprints,
            "imageAsset": image_assets[0] if image_assets else "",
            "imageAssets": image_assets,
            "imageFacts": {
                "samples": len(candidates),
                "queries": queries,
                "items": image_facts,
            },
        }
        # All writes succeed or fail together.
        with self._transaction() as conn:
            self._put("knowledge_entries", entry_id, entry, conn=conn)
            for candidate in candidates:
                candidate["status"] = "promoted"
                candidate["promotedKnowledgeId"] = entry_id
                self._put("collection_candidates", str(candidate["id"]), candidate, conn=conn)
            self.add_audit_event(
                "rights.ops",
                "Knowledge entry manually created",
                name,
                f"promoted {len(candidates)} collected candidates",
                conn=conn,
            )
    return self.bootstrap()

View file

@ -14,6 +14,7 @@ from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server import sqlite_store as sqlite_store_module from rights_filter.server import sqlite_store as sqlite_store_module
from rights_filter.server import store_enrichment as store_enrichment_module from rights_filter.server import store_enrichment as store_enrichment_module
from rights_filter.server import store_remote_fetch as remote_fetch_module from rights_filter.server import store_remote_fetch as remote_fetch_module
from rights_filter.server import store_serialization as store_serialization_module
from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.integrations.env_clients import build_provider_runtime from rights_filter.integrations.env_clients import build_provider_runtime
@ -8040,7 +8041,7 @@ def test_record_decision_rolls_back_when_audit_fails(tmp_path: Path, monkeypatch
# The status change must roll back atomically with the failed audit write. # The status change must roll back atomically with the failed audit write.
assert store._get("submissions", "SUB-1")["decisionStatus"] == "unreviewed" assert store._get("submissions", "SUB-1")["decisionStatus"] == "unreviewed"
with pytest.raises(KeyError): with pytest.raises(KeyError):
store._get("knowledge_entries", sqlite_store_module._stable_id("kb-watchlist", "SUB-1")) store._get("knowledge_entries", store_serialization_module._stable_id("kb-watchlist", "SUB-1"))
@pytest.mark.parametrize( @pytest.mark.parametrize(