From b575d2ee060863d014ac87ce85a44c38f5d17ca3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 22:28:30 +0900 Subject: [PATCH] refactor: extract operator operations into StoreOperationsMixin Move knowledge-entry lifecycle, rerun/auto/manual search drivers, LLM summary management, manual knowledge registration, and keyword-candidate collection/ promotion into a mixin; CopyrighterStore inherits it. Drop now-unused imports; point the rollback test at store_serialization._stable_id. sqlite_store.py 1598 -> 874 lines (5333 -> 874, -84%). --- src/rights_filter/server/sqlite_store.py | 741 +---------------- src/rights_filter/server/store_operations.py | 761 ++++++++++++++++++ .../rights_filter/server/test_sqlite_store.py | 3 +- 3 files changed, 771 insertions(+), 734 deletions(-) create mode 100644 src/rights_filter/server/store_operations.py diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 3c2a0f7..44f99f5 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -16,7 +16,6 @@ from rights_filter.analysis.face_person_detection import HeuristicFacePersonDete from rights_filter.analysis.fingerprints import FingerprintService from rights_filter.analysis.internal_analyzer import InternalAnalyzer from rights_filter.analysis.risk_scoring import RiskScorer -from rights_filter.analysis.search_result_promoter import SearchResultPromoter from rights_filter.domain.records import ( Evidence, EvidenceSource, @@ -30,7 +29,7 @@ from rights_filter.integrations.cloud_vision_web_detection import ( from rights_filter.integrations.env_clients import ProviderRuntime, build_provider_runtime from rights_filter.integrations.external_policy import ExternalApiPolicy from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage -from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES +from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server.store_constants import ( DEFAULT_COVERAGE_GOOD_THRESHOLD, DEFAULT_COVERAGE_WARN_THRESHOLD, @@ -49,6 +48,7 @@ from rights_filter.server.store_remote_fetch import ( _fetch_url_bytes, ) from rights_filter.server.store_enrichment import StoreEnrichmentMixin +from rights_filter.server.store_operations import StoreOperationsMixin from rights_filter.server.store_persistence import StorePersistenceMixin from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin from rights_filter.server.store_schema import ( @@ -59,24 +59,23 @@ from rights_filter.server.store_schema import ( ) from rights_filter.server.store_serialization import ( _default_evidence_contribution, - _domain_evidence_from_ui, _evidence_id, _evidence_matches_provider, _evidence_payload, - _knowledge_type_value, _now_label, _provider_item_failed, _provider_item_has_result, - _stable_id, _submission_payload, _submission_search_hint_evidence, - _timestamp_id, - _watchlist_source_evidence, ) -from rights_filter.server.store_text import _text_list, _unique_texts -class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin, StoreEnrichmentMixin): +class CopyrighterStore( + StorePersistenceMixin, + StoreSearchCandidatesMixin, + StoreEnrichmentMixin, + StoreOperationsMixin, +): def __init__( self, db_path: Path | str, @@ -774,730 +773,6 @@ class CopyrighterStore(StorePersistenceMixin, StoreSearchCandidatesMixin, StoreE ) return self.review(submission_id) - def promote_watchlist_entry(self, entry_id: str) -> dict[str, Any]: - entry = self._get("knowledge_entries", entry_id) - if entry.get("entryStatus") != "watchlist": - raise ValueError("knowledge entry is not a watchlist candidate") - entry["entryStatus"] = "confirmed" - entry["active"] = True - entry["excludedReason"] = "" - entry["confirmedAt"] = _now_label() - entry["confirmedBy"] = "rights.ops" - self._put("knowledge_entries", entry_id, entry) - self.add_audit_event( - "rights.ops", - "Watchlist candidate promoted", - str(entry.get("name", entry_id)), - "promoted into confirmed reference DB", - ) - return self.bootstrap() - - def exclude_watchlist_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]: - entry = self._get("knowledge_entries", entry_id) - if entry.get("entryStatus") not in {"watchlist", "confirmed"}: - raise ValueError("knowledge entry cannot be excluded") - entry["entryStatus"] = "excluded" - entry["active"] = False - entry["excludedReason"] = reason.strip() or "오탐 또는 무관 후보" - entry["excludedAt"] = _now_label() - entry["excludedBy"] = "rights.ops" - self._put("knowledge_entries", entry_id, entry) - self.add_audit_event( - "rights.ops", - "Watchlist candidate excluded", - str(entry.get("name", entry_id)), - entry["excludedReason"], - ) - return self.bootstrap() - - def update_knowledge_entry(self, entry_id: str, payload: dict[str, Any]) -> dict[str, Any]: - entry = self._get("knowledge_entries", entry_id) - updates: dict[str, Any] = {} - if "aliases" in payload: - updates["aliases"] = _text_list(payload.get("aliases")) - if "keywords" in payload: - updates["keywords"] = _text_list(payload.get("keywords")) - if "memo" in payload: - updates["memo"] = str(payload.get("memo", "")).strip() - if not updates: - raise ValueError("aliases, keywords, memo 중 수정할 값이 필요합니다") - - before = {key: entry.get(key) for key in updates} - entry.update(updates) - self._put("knowledge_entries", entry_id, entry) - self.add_audit_event( - "rights.ops", - "Knowledge entry updated", - str(entry.get("name", entry_id)), - f"{json.dumps(before, ensure_ascii=False)} -> {json.dumps(updates, ensure_ascii=False)}", - ) - return self.bootstrap() - - def deactivate_knowledge_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]: - entry = self._get("knowledge_entries", entry_id) - if entry.get("entryStatus", "confirmed") != "confirmed": - raise ValueError("확정 DB 항목만 비활성화할 수 있습니다") - if not entry.get("active", False): - raise ValueError("이미 비활성 상태입니다") - entry["active"] = False - entry["deactivatedAt"] = _now_label() - entry["deactivatedBy"] = "rights.ops" - entry["deactivatedReason"] = reason.strip() - self._put("knowledge_entries", entry_id, entry) - self.add_audit_event( - "rights.ops", - "Knowledge entry deactivated", - str(entry.get("name", entry_id)), - reason.strip() or "운영자 비활성화", - ) - return self.bootstrap() - - def reactivate_knowledge_entry(self, entry_id: str, reason: str) -> dict[str, Any]: - if not reason.strip(): - raise ValueError("재활성에는 사유 메모가 필요합니다") - entry = self._get("knowledge_entries", entry_id) - if entry.get("entryStatus", "confirmed") != "confirmed": - raise ValueError("확정 DB 항목만 재활성화할 수 있습니다") - if entry.get("active", False): - raise ValueError("이미 활성 상태입니다") - entry["active"] = True - entry["reactivatedAt"] = _now_label() - entry["reactivatedBy"] = "rights.ops" - entry["reactivatedReason"] = reason.strip() - self._put("knowledge_entries", entry_id, entry) - self.add_audit_event( - "rights.ops", - "Knowledge entry reactivated", - str(entry.get("name", entry_id)), - reason.strip(), - ) - return self.bootstrap() - - def _create_or_update_watchlist_entry( - self, - submission_id: str, - decision: str, - memo: str, - image_store: LocalSubmissionImageStore | None, - *, - conn: sqlite3.Connection | None = None, - ) -> None: - submission = self._get("submissions", submission_id, conn=conn) - evidence = self._evidence_by_submission().get(submission_id, []) - selected_evidence = _watchlist_source_evidence(evidence) - selected_evidence_ids = [str(item.get("id", "")) for item in selected_evidence if item.get("id")] - sample_fingerprints = self._watchlist_fingerprints(submission_id, image_store) - - entry_id = _stable_id("kb-watchlist", submission_id) - try: - existing = self._get("knowledge_entries", entry_id, conn=conn) - except KeyError: - existing = {} - - keywords = _unique_texts( - [ - *[str(item) for item in submission.get("reasons", [])[:3]], - *[str(item.get("title", "")) for item in selected_evidence[:3]], - ] - ) - entry = { - **existing, - "id": entry_id, - "name": submission.get("derivedPreview", {}).get("entryName") or f"{submission.get('title', submission_id)} / {submission_id}", - "type": "rejected_image", - "aliases": _unique_texts([submission_id, str(submission.get("title", ""))]), - "keywords": keywords, - "memo": memo.strip() or ("보류 판정으로 자동 생성" if decision == "held" else "반려 판정으로 자동 생성"), - "provenance": "automatic", - "active": True, - "entryStatus": "watchlist", - "originDecisionStatus": decision, - "sourceDecision": f"DEC-{submission_id}", - "sourceSubmissionId": submission_id, - "sourceEvidenceIds": selected_evidence_ids, - "sampleFingerprints": sample_fingerprints or _text_list(existing.get("sampleFingerprints")), - "imageAsset": str(submission.get("asset", "")), - "imageFacts": submission.get("fileFacts", {}), - "contributionCount": int(existing.get("contributionCount", 0) or 0), - "matchedSubmissionIds": _text_list(existing.get("matchedSubmissionIds")), - "lastOriginDecisionAt": _now_label(), - } - self._put("knowledge_entries", entry_id, entry, conn=conn) - - def _watchlist_fingerprints( - self, - submission_id: str, - image_store: LocalSubmissionImageStore | None, - ) -> list[str]: - if image_store is None: - return [] - try: - fingerprints = FingerprintService().fingerprints_for( - image_store.image_payload(submission_id).content - ) - except Exception: - return [] - return [fingerprints.perceptual] - - def rerun_enrichment( - self, - submission_id: str, - image_store: LocalSubmissionImageStore | None = None, - ) -> dict[str, Any]: - submission = self._get("submissions", submission_id) - score_before = int(submission.get("riskScore", 0) or 0) - evidence_before = { - str(item.get("id", "")): item - for item in self._evidence_by_submission().get(submission_id, []) - } - submission["lastAnalysis"] = _now_label() - self._put("submissions", submission_id, submission) - evidence = { - "id": f"ev-{submission_id}-rerun-{_timestamp_id()}", - "group": "internal", - "source": "fingerprint", - "title": "재분석 요청이 접수됨", - "confidence": 0, - "query": "", - "domain": "internal", - "url": "", - "retrievedAt": _now_label(), - "contributed": False, - "sourceEvidenceIds": [], - "status": "queued", - } - self._put("evidence", evidence["id"], {**evidence, "submission_id": submission_id}) - - if image_store is not None: - self._rerun_internal_analysis(submission_id, image_store) - google_evidence = self._rerun_google_image_search(submission_id, image_store) - query_source_evidence = [ - *google_evidence, - *_submission_search_hint_evidence(submission), - ] - self._auto_naver_search(submission_id, query_source_evidence, image_store) - self._auto_google_custom_search(submission_id, query_source_evidence, image_store) - - self._ensure_llm_summary(submission_id) - - self.add_audit_event("rights.ops", "Analysis run created", submission_id, "operator rerun") - self._rescore_submission(submission_id) - self._sync_submission_provider_state() - - evidence_after = { - str(item.get("id", "")): item - for item in self._evidence_for_submission(submission_id) - } - rerun_marker_prefix = f"ev-{submission_id}-rerun-" - # LLM 요약은 재분석마다 삭제 후 재생성되어 id가 항상 바뀌므로(요약의 - # source_evidence_ids에 타임스탬프 마커 id가 섞임) diff에 포함하면 - # 변경이 없어도 매번 신규+제거로 잡힌다 — diff 대상에서 제외한다. - added_ids = [ - evidence_id - for evidence_id in evidence_after - if evidence_id not in evidence_before - and not evidence_id.startswith(rerun_marker_prefix) - and str(evidence_after[evidence_id].get("source", "")) != "llm" - ] - removed_items = [ - evidence_before[evidence_id] - for evidence_id in evidence_before - if evidence_id not in evidence_after - and str(evidence_before[evidence_id].get("source", "")) != "llm" - ] - refreshed = self._get("submissions", submission_id) - refreshed["lastRerunDiff"] = { - "at": _now_label(), - "scoreBefore": score_before, - "scoreAfter": int(refreshed.get("riskScore", 0) or 0), - "addedEvidenceIds": added_ids, - "removedEvidenceIds": [str(item.get("id", "")) for item in removed_items], - "removedSummaries": [ - {"source": str(item.get("source", "")), "reason": str(item.get("title", ""))} - for item in removed_items - ], - } - self._put("submissions", submission_id, refreshed) - return self.review(submission_id) - - def run_auto_search( - self, - submission_id: str, - image_store: LocalSubmissionImageStore | None = None, - ) -> dict[str, Any]: - submission = self._get("submissions", submission_id) - submission["lastAnalysis"] = _now_label() - self._put("submissions", submission_id, submission) - - existing_evidence = self._evidence_by_submission().get(submission_id, []) - query_source_evidence = [ - _domain_evidence_from_ui(item) - for item in existing_evidence - if item.get("source") in {"google", "naver", "face", "fingerprint", "llm", "failure"} - ] - query_source_evidence.extend(_submission_search_hint_evidence(submission)) - - self._auto_naver_search(submission_id, query_source_evidence, image_store) - self._auto_google_custom_search(submission_id, query_source_evidence, image_store) - - self._ensure_llm_summary(submission_id) - self._rescore_submission(submission_id) - self._sync_submission_provider_state() - self.add_audit_event( - "rights.ops", - "Provider called", - f"auto-search / {submission_id}", - "operator request for auto text search", - ) - return self.review(submission_id) - - def manual_search( - self, - submission_id: str, - provider: str, - query: str, - image_store: LocalSubmissionImageStore | None = None, - ) -> dict[str, Any]: - submission = self._get("submissions", submission_id) - provider_payload = self._get("providers", provider) - if not provider_payload["enabled"]: - raise ValueError(f"{provider} provider disabled") - - domain_evidence, provider_call_count = self._manual_search_evidence( - submission_id, - provider, - query, - image_store, - ) - for item in domain_evidence: - evidence = _evidence_payload(submission_id, item) - evidence["status"] = "manual" - self._put("evidence", evidence["id"], evidence) - - submission.setdefault("queryHistory", []).insert( - 0, - { - "provider": provider, - "query": query, - "status": "manual", - "timestamp": _now_label(), - "count": len(domain_evidence), - }, - ) - self._put("submissions", submission_id, submission) - if any( - item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} - for item in domain_evidence - ): - provider_payload["lastFailure"] = domain_evidence[0].reason if domain_evidence else f"{provider} search failed" - else: - provider_payload["lastSuccess"] = _now_label() - provider_payload["lastFailure"] = "없음" - self._apply_provider_usage_delta(provider, provider_call_count, provider_payload) - self._ensure_llm_summary(submission_id) - if image_store is not None: - self._rescore_submission(submission_id) - self._sync_submission_provider_state() - self.add_audit_event("rights.ops", "Provider called", f"{provider} / {submission_id}", f"manual text query: {query}") - return self.review(submission_id) - - def _ensure_llm_summaries_for_existing_source_evidence(self, queue_id: str | None = None) -> None: - for submission in self._all("submissions", queue_id=queue_id): - self._ensure_llm_summary(str(submission["id"]), only_if_missing=True) - - def _ensure_llm_summary(self, submission_id: str, *, only_if_missing: bool = False) -> bool: - if self.provider_runtime.llm_assistant is None: - return False - - llm_provider = self._get("providers", "llm") - if not llm_provider.get("enabled"): - return False - - evidence_payloads = self._evidence_by_submission().get(submission_id, []) - if only_if_missing and any( - _evidence_matches_provider(item, "llm") and _provider_item_has_result(item) - for item in evidence_payloads - ): - return False - - source_evidence = [ - _domain_evidence_from_ui(item) - for item in evidence_payloads - if item.get("source") in {"fingerprint", "face", "google", "naver"} - ] - if not source_evidence: - return False - - llm_evidence = self.provider_runtime.llm_assistant.summarize( - submission_id, - source_evidence, - ) - self._delete_llm_summary_evidence(submission_id) - self._put( - "evidence", - _evidence_id(submission_id, llm_evidence), - _evidence_payload(submission_id, llm_evidence), - ) - if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE: - llm_provider["lastFailure"] = llm_evidence.reason - else: - llm_provider["lastSuccess"] = _now_label() - llm_provider["lastFailure"] = "없음" - self._apply_provider_usage_delta("llm", 1, llm_provider) - return True - - def _delete_llm_summary_evidence(self, submission_id: str) -> None: - with self._connect() as conn: - conn.execute( - """ - delete from evidence - where submission_id = ? - and ( - source = 'llm' - or ( - source = 'failure' - and json_extract(payload, '$.title') like 'LLM assistance failed%' - ) - ) - """, - (submission_id,), - ) - - def _manual_search_evidence( - self, - submission_id: str, - provider: str, - query: str, - image_store: LocalSubmissionImageStore | None, - ) -> tuple[list[Evidence], int]: - if provider == "naver": - if self.provider_runtime.naver_adapter is None: - raise ValueError(f"{provider} provider not connected") - promoter = SearchResultPromoter() - can_compare_search_images = self._can_compare_search_result_images( - submission_id, - image_store, - ) - domain_evidence = self.provider_runtime.naver_adapter.search( - submission_id, - query, - self.provider_runtime.search_policy, - ) - domain_evidence = promoter.promote(domain_evidence) - call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0 - similarity_evidence: list[Evidence] = [] - if can_compare_search_images and image_store is not None: - similarity_evidence = self._sync_search_result_image_similarity( - submission_id, - domain_evidence, - image_store, - status="manual", - ) - page_similarity_evidence: list[Evidence] = [] - if can_compare_search_images and image_store is not None and not similarity_evidence: - page_evidence = self.provider_runtime.naver_adapter.search_pages( - submission_id, - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence): - call_count += 1 - page_evidence = promoter.promote(page_evidence) - domain_evidence.extend(page_evidence) - page_similarity_evidence = self._sync_search_result_image_similarity( - submission_id, - page_evidence, - image_store, - status="manual", - ) - if ( - can_compare_search_images - and image_store is not None - and not similarity_evidence - and not page_similarity_evidence - ): - web_evidence = self.provider_runtime.naver_adapter.search_web_pages( - submission_id, - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): - call_count += 1 - web_evidence = promoter.promote(web_evidence) - domain_evidence.extend(web_evidence) - self._sync_search_result_image_similarity( - submission_id, - web_evidence, - image_store, - status="manual", - ) - return domain_evidence, call_count - - if provider == "google_search": - if self.provider_runtime.google_custom_search_adapter is None: - raise ValueError(f"{provider} provider not connected") - can_compare_search_images = self._can_compare_search_result_images( - submission_id, - image_store, - ) - domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images( - submission_id, - query, - self.provider_runtime.google_custom_search_policy, - ) - call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0 - similarity_evidence: list[Evidence] = [] - if can_compare_search_images and image_store is not None: - similarity_evidence = self._sync_search_result_image_similarity( - submission_id, - domain_evidence, - image_store, - status="manual", - ) - if can_compare_search_images and image_store is not None and not similarity_evidence: - web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages( - submission_id, - query, - self.provider_runtime.google_custom_search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): - call_count += 1 - domain_evidence.extend(web_evidence) - self._sync_search_result_image_similarity( - submission_id, - web_evidence, - image_store, - status="manual", - ) - return domain_evidence, call_count - - raise ValueError(f"{provider} provider not connected") - - def register_manual_knowledge_entry(self, payload: dict[str, Any]) -> dict[str, Any]: - name = str(payload.get("name", "")).strip() - if not name: - raise ValueError("knowledge name required") - - entry_id = f"kb-manual-{_timestamp_id()}" - image_record = self._store_manual_knowledge_image(entry_id, payload.get("image")) - sample_fingerprints = [image_record["perceptualFingerprint"]] if image_record else [] - entry = { - "id": entry_id, - "name": name, - "type": _knowledge_type_value(str(payload.get("type", "other"))), - "aliases": _text_list(payload.get("aliases")), - "keywords": _text_list(payload.get("keywords")), - "memo": str(payload.get("memo", "")).strip(), - "provenance": "manual", - "active": True, - "entryStatus": "confirmed", - "sourceDecision": "", - "sampleFingerprints": sample_fingerprints, - "imageAsset": image_record["asset"] if image_record else "", - "imageFacts": image_record["facts"] if image_record else {}, - } - self._put("knowledge_entries", entry_id, entry) - self.add_audit_event( - "rights.ops", - "Knowledge entry manually created", - name, - "manual image reference" if image_record else "manual text reference", - ) - return entry - - def collect_keyword_candidates(self, query: str, provider: str = "naver") -> dict[str, Any]: - query = query.strip() - if not query: - raise ValueError("collection query required") - if provider not in {"naver", "google_search"}: - raise ValueError(f"{provider} provider not supported for candidate collection") - - self._sync_provider_payloads() - provider_payload = self._get("providers", provider) - if not provider_payload["enabled"]: - raise ValueError(f"{provider} provider disabled") - if provider == "naver" and self.provider_runtime.naver_adapter is None: - raise ValueError("naver provider not connected") - if provider == "google_search" and self.provider_runtime.google_custom_search_adapter is None: - raise ValueError("google_search provider not connected") - - self._clear_collection_candidates() - if provider == "google_search": - domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images( - "candidate-collection", - query, - self.provider_runtime.google_custom_search_policy, - ) - else: - domain_evidence = self.provider_runtime.naver_adapter.search( - "candidate-collection", - query, - self.provider_runtime.search_policy, - ) - collected = 0 - provider_call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0 - for candidate in self._collection_candidates_from_evidence(query, domain_evidence, provider): - self._put("collection_candidates", candidate["id"], candidate) - collected += 1 - - if provider == "google_search" and collected == 0: - web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages( - "candidate-collection", - query, - self.provider_runtime.google_custom_search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): - provider_call_count += 1 - domain_evidence.extend(web_evidence) - for candidate in self._collection_candidates_from_evidence(query, web_evidence, provider): - self._put("collection_candidates", candidate["id"], candidate) - collected += 1 - if provider == "naver" and collected == 0: - page_evidence = self.provider_runtime.naver_adapter.search_pages( - "candidate-collection", - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence): - provider_call_count += 1 - domain_evidence.extend(page_evidence) - for candidate in self._collection_candidates_from_evidence(query, page_evidence, provider): - self._put("collection_candidates", candidate["id"], candidate) - collected += 1 - if provider == "naver" and collected == 0: - web_evidence = self.provider_runtime.naver_adapter.search_web_pages( - "candidate-collection", - query, - self.provider_runtime.search_policy, - ) - if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): - provider_call_count += 1 - domain_evidence.extend(web_evidence) - for candidate in self._collection_candidates_from_evidence(query, web_evidence, provider): - self._put("collection_candidates", candidate["id"], candidate) - collected += 1 - - if any( - item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} - for item in domain_evidence - ): - provider_payload["lastFailure"] = domain_evidence[0].reason if domain_evidence else f"{provider} collection failed" - else: - provider_payload["lastSuccess"] = _now_label() - provider_payload["lastFailure"] = "없음" - self._apply_provider_usage_delta(provider, provider_call_count, provider_payload) - self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates") - payload = self.bootstrap() - payload["collected"] = collected - return payload - - def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]: - with self._write_lock: - candidate = self._get("collection_candidates", candidate_id) - # Idempotent: a double-click / retry must not create a second - # confirmed knowledge entry for the same candidate. - if candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId"): - return self.bootstrap() - # Deterministic id so a racing retry upserts the same row instead of - # minting a new timestamped id. - entry_id = _stable_id("kb-candidate", candidate_id) - query = str(candidate.get("query", "")) - name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query - memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}" - entry = { - "id": entry_id, - "name": name, - "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), - "aliases": _text_list(payload.get("aliases")) or [query], - "keywords": _text_list(payload.get("keywords")) or [query], - "memo": memo, - "provenance": "manual", - "active": True, - "entryStatus": "confirmed", - "sourceDecision": "", - "sourceCandidate": candidate_id, - "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")), - "imageAsset": str(candidate.get("imageAsset", "")), - "imageFacts": candidate.get("imageFacts", {}), - } - with self._transaction() as conn: - self._put("knowledge_entries", entry_id, entry, conn=conn) - candidate["status"] = "promoted" - candidate["promotedKnowledgeId"] = entry_id - self._put("collection_candidates", candidate_id, candidate, conn=conn) - self.add_audit_event( - "rights.ops", "Knowledge entry manually created", name, - f"promoted candidate {candidate_id}", conn=conn, - ) - return self.bootstrap() - - def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]: - candidate_ids = _unique_texts(_text_list(payload.get("candidate_ids", payload.get("candidateIds")))) - if not candidate_ids: - raise ValueError("candidate_ids required") - - candidates = [self._get("collection_candidates", candidate_id) for candidate_id in candidate_ids] - sample_fingerprints = _unique_texts( - fingerprint - for candidate in candidates - for fingerprint in _text_list(candidate.get("sampleFingerprints")) - ) - if not sample_fingerprints: - raise ValueError("selected candidates have no sample fingerprints") - - entry_id = f"kb-candidate-{_timestamp_id()}" - queries = _unique_texts(str(candidate.get("query", "")) for candidate in candidates) - image_assets = _unique_texts(str(candidate.get("imageAsset", "")) for candidate in candidates) - image_facts = [ - candidate.get("imageFacts", {}) - for candidate in candidates - if isinstance(candidate.get("imageFacts", {}), dict) and candidate.get("imageFacts", {}) - ] - fallback_name = next( - ( - str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip() - for candidate in candidates - if str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip() - ), - "Collected reference", - ) - name = str(payload.get("name", "")).strip() or fallback_name - memo = str(payload.get("memo", "")).strip() or f"Promoted from collected candidates: {', '.join(queries)}" - entry = { - "id": entry_id, - "name": name, - "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), - "aliases": _text_list(payload.get("aliases")) or queries, - "keywords": _text_list(payload.get("keywords")) or queries, - "memo": memo, - "provenance": "manual", - "active": True, - "entryStatus": "confirmed", - "sourceDecision": "", - "sourceCandidate": candidate_ids[0] if len(candidate_ids) == 1 else "", - "sourceCandidates": candidate_ids, - "sampleFingerprints": sample_fingerprints, - "imageAsset": image_assets[0] if image_assets else "", - "imageAssets": image_assets, - "imageFacts": { - "samples": len(candidates), - "queries": queries, - "items": image_facts, - }, - } - self._put("knowledge_entries", entry_id, entry) - for candidate in candidates: - candidate["status"] = "promoted" - candidate["promotedKnowledgeId"] = entry_id - self._put("collection_candidates", str(candidate["id"]), candidate) - self.add_audit_event( - "rights.ops", - "Knowledge entry manually created", - name, - f"promoted {len(candidates)} collected candidates", - ) - return self.bootstrap() - def providers(self) -> list[dict[str, Any]]: return self._all("providers") diff --git a/src/rights_filter/server/store_operations.py b/src/rights_filter/server/store_operations.py new file mode 100644 index 0000000..ccae777 --- /dev/null +++ b/src/rights_filter/server/store_operations.py @@ -0,0 +1,761 @@ +"""Operator-facing operations for CopyrighterStore, as a mixin. + +Knowledge-entry lifecycle (watchlist promote/exclude, update/deactivate/ +reactivate), rerun/auto/manual search drivers, LLM summary management, manual +knowledge registration, and keyword-candidate collection/promotion. Mixed into +CopyrighterStore; relies on persistence + enrichment + search-candidate methods +and self.* attributes provided by the host class. Behavior unchanged. +""" + +from __future__ import annotations + +import json +import re +import sqlite3 +from typing import Any + +from rights_filter.analysis.fingerprints import FingerprintService +from rights_filter.analysis.search_result_promoter import SearchResultPromoter +from rights_filter.domain.records import Evidence, EvidenceSource +from rights_filter.server.image_store import LocalSubmissionImageStore +from rights_filter.server.store_serialization import ( + _domain_evidence_from_ui, + _evidence_id, + _evidence_matches_provider, + _evidence_payload, + _knowledge_type_value, + _now_label, + _provider_item_has_result, + _stable_id, + _submission_search_hint_evidence, + _timestamp_id, + _watchlist_source_evidence, +) +from rights_filter.server.store_text import _text_list, _unique_texts + + +class StoreOperationsMixin: + def promote_watchlist_entry(self, entry_id: str) -> dict[str, Any]: + entry = self._get("knowledge_entries", entry_id) + if entry.get("entryStatus") != "watchlist": + raise ValueError("knowledge entry is not a watchlist candidate") + entry["entryStatus"] = "confirmed" + entry["active"] = True + entry["excludedReason"] = "" + entry["confirmedAt"] = _now_label() + entry["confirmedBy"] = "rights.ops" + self._put("knowledge_entries", entry_id, entry) + self.add_audit_event( + "rights.ops", + "Watchlist candidate promoted", + str(entry.get("name", entry_id)), + "promoted into confirmed reference DB", + ) + return self.bootstrap() + + def exclude_watchlist_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]: + entry = self._get("knowledge_entries", entry_id) + if entry.get("entryStatus") not in {"watchlist", "confirmed"}: + raise ValueError("knowledge entry cannot be excluded") + entry["entryStatus"] = "excluded" + entry["active"] = False + entry["excludedReason"] = reason.strip() or "오탐 또는 무관 후보" + entry["excludedAt"] = _now_label() + entry["excludedBy"] = "rights.ops" + self._put("knowledge_entries", entry_id, entry) + self.add_audit_event( + "rights.ops", + "Watchlist candidate excluded", + str(entry.get("name", entry_id)), + entry["excludedReason"], + ) + return self.bootstrap() + + def update_knowledge_entry(self, entry_id: str, payload: dict[str, Any]) -> dict[str, Any]: + entry = self._get("knowledge_entries", entry_id) + updates: dict[str, Any] = {} + if "aliases" in payload: + updates["aliases"] = _text_list(payload.get("aliases")) + if "keywords" in payload: + updates["keywords"] = _text_list(payload.get("keywords")) + if "memo" in payload: + updates["memo"] = str(payload.get("memo", "")).strip() + if not updates: + raise ValueError("aliases, keywords, memo 중 수정할 값이 필요합니다") + + before = {key: entry.get(key) for key in updates} + entry.update(updates) + self._put("knowledge_entries", entry_id, entry) + self.add_audit_event( + "rights.ops", + "Knowledge entry updated", + str(entry.get("name", entry_id)), + f"{json.dumps(before, ensure_ascii=False)} -> {json.dumps(updates, ensure_ascii=False)}", + ) + return self.bootstrap() + + def deactivate_knowledge_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]: + entry = self._get("knowledge_entries", entry_id) + if entry.get("entryStatus", "confirmed") != "confirmed": + raise ValueError("확정 DB 항목만 비활성화할 수 있습니다") + if not entry.get("active", False): + raise ValueError("이미 비활성 상태입니다") + entry["active"] = False + entry["deactivatedAt"] = _now_label() + entry["deactivatedBy"] = "rights.ops" + entry["deactivatedReason"] = reason.strip() + self._put("knowledge_entries", entry_id, entry) + self.add_audit_event( + "rights.ops", + "Knowledge entry deactivated", + str(entry.get("name", entry_id)), + reason.strip() or "운영자 비활성화", + ) + return self.bootstrap() + + def reactivate_knowledge_entry(self, entry_id: str, reason: str) -> dict[str, Any]: + if not reason.strip(): + raise ValueError("재활성에는 사유 메모가 필요합니다") + entry = self._get("knowledge_entries", entry_id) + if entry.get("entryStatus", "confirmed") != "confirmed": + raise ValueError("확정 DB 항목만 재활성화할 수 있습니다") + if entry.get("active", False): + raise ValueError("이미 활성 상태입니다") + entry["active"] = True + entry["reactivatedAt"] = _now_label() + entry["reactivatedBy"] = "rights.ops" + entry["reactivatedReason"] = reason.strip() + self._put("knowledge_entries", entry_id, entry) + self.add_audit_event( + "rights.ops", + "Knowledge entry reactivated", + str(entry.get("name", entry_id)), + reason.strip(), + ) + return self.bootstrap() + + def _create_or_update_watchlist_entry( + self, + submission_id: str, + decision: str, + memo: str, + image_store: LocalSubmissionImageStore | None, + *, + conn: sqlite3.Connection | None = None, + ) -> None: + submission = self._get("submissions", submission_id, conn=conn) + evidence = self._evidence_by_submission().get(submission_id, []) + selected_evidence = _watchlist_source_evidence(evidence) + selected_evidence_ids = [str(item.get("id", "")) for item in selected_evidence if item.get("id")] + sample_fingerprints = self._watchlist_fingerprints(submission_id, image_store) + + entry_id = _stable_id("kb-watchlist", submission_id) + try: + existing = self._get("knowledge_entries", entry_id, conn=conn) + except KeyError: + existing = {} + + keywords = _unique_texts( + [ + *[str(item) for item in submission.get("reasons", [])[:3]], + *[str(item.get("title", "")) for item in selected_evidence[:3]], + ] + ) + entry = { + **existing, + "id": entry_id, + "name": submission.get("derivedPreview", {}).get("entryName") or f"{submission.get('title', submission_id)} / {submission_id}", + "type": "rejected_image", + "aliases": _unique_texts([submission_id, str(submission.get("title", ""))]), + "keywords": keywords, + "memo": memo.strip() or ("보류 판정으로 자동 생성" if decision == "held" else "반려 판정으로 자동 생성"), + "provenance": "automatic", + "active": True, + "entryStatus": "watchlist", + "originDecisionStatus": decision, + "sourceDecision": f"DEC-{submission_id}", + "sourceSubmissionId": submission_id, + "sourceEvidenceIds": selected_evidence_ids, + "sampleFingerprints": sample_fingerprints or _text_list(existing.get("sampleFingerprints")), + "imageAsset": str(submission.get("asset", "")), + "imageFacts": submission.get("fileFacts", {}), + "contributionCount": int(existing.get("contributionCount", 0) or 0), + "matchedSubmissionIds": _text_list(existing.get("matchedSubmissionIds")), + "lastOriginDecisionAt": _now_label(), + } + self._put("knowledge_entries", entry_id, entry, conn=conn) + + def _watchlist_fingerprints( + self, + submission_id: str, + image_store: LocalSubmissionImageStore | None, + ) -> list[str]: + if image_store is None: + return [] + try: + fingerprints = FingerprintService().fingerprints_for( + image_store.image_payload(submission_id).content + ) + except Exception: + return [] + return [fingerprints.perceptual] + + def rerun_enrichment( + self, + submission_id: str, + image_store: LocalSubmissionImageStore | None = None, + ) -> dict[str, Any]: + submission = self._get("submissions", submission_id) + score_before = int(submission.get("riskScore", 0) or 0) + evidence_before = { + str(item.get("id", "")): item + for item in self._evidence_by_submission().get(submission_id, []) + } + submission["lastAnalysis"] = _now_label() + self._put("submissions", submission_id, submission) + evidence = { + "id": f"ev-{submission_id}-rerun-{_timestamp_id()}", + "group": "internal", + "source": "fingerprint", + "title": "재분석 요청이 접수됨", + "confidence": 0, + "query": "", + "domain": "internal", + "url": "", + "retrievedAt": _now_label(), + "contributed": False, + "sourceEvidenceIds": [], + "status": "queued", + } + self._put("evidence", evidence["id"], {**evidence, "submission_id": submission_id}) + + if image_store is not None: + self._rerun_internal_analysis(submission_id, image_store) + google_evidence = self._rerun_google_image_search(submission_id, image_store) + query_source_evidence = [ + *google_evidence, + *_submission_search_hint_evidence(submission), + ] + self._auto_naver_search(submission_id, query_source_evidence, image_store) + self._auto_google_custom_search(submission_id, query_source_evidence, image_store) + + self._ensure_llm_summary(submission_id) + + self.add_audit_event("rights.ops", "Analysis run created", submission_id, "operator rerun") + self._rescore_submission(submission_id) + self._sync_submission_provider_state() + + evidence_after = { + str(item.get("id", "")): item + for item in self._evidence_for_submission(submission_id) + } + rerun_marker_prefix = f"ev-{submission_id}-rerun-" + # LLM 요약은 재분석마다 삭제 후 재생성되어 id가 항상 바뀌므로(요약의 + # source_evidence_ids에 타임스탬프 마커 id가 섞임) diff에 포함하면 + # 변경이 없어도 매번 신규+제거로 잡힌다 — diff 대상에서 제외한다. + added_ids = [ + evidence_id + for evidence_id in evidence_after + if evidence_id not in evidence_before + and not evidence_id.startswith(rerun_marker_prefix) + and str(evidence_after[evidence_id].get("source", "")) != "llm" + ] + removed_items = [ + evidence_before[evidence_id] + for evidence_id in evidence_before + if evidence_id not in evidence_after + and str(evidence_before[evidence_id].get("source", "")) != "llm" + ] + refreshed = self._get("submissions", submission_id) + refreshed["lastRerunDiff"] = { + "at": _now_label(), + "scoreBefore": score_before, + "scoreAfter": int(refreshed.get("riskScore", 0) or 0), + "addedEvidenceIds": added_ids, + "removedEvidenceIds": [str(item.get("id", "")) for item in removed_items], + "removedSummaries": [ + {"source": str(item.get("source", "")), "reason": str(item.get("title", ""))} + for item in removed_items + ], + } + self._put("submissions", submission_id, refreshed) + return self.review(submission_id) + + def run_auto_search( + self, + submission_id: str, + image_store: LocalSubmissionImageStore | None = None, + ) -> dict[str, Any]: + submission = self._get("submissions", submission_id) + submission["lastAnalysis"] = _now_label() + self._put("submissions", submission_id, submission) + + existing_evidence = self._evidence_by_submission().get(submission_id, []) + query_source_evidence = [ + _domain_evidence_from_ui(item) + for item in existing_evidence + if item.get("source") in {"google", "naver", "face", "fingerprint", "llm", "failure"} + ] + query_source_evidence.extend(_submission_search_hint_evidence(submission)) + + self._auto_naver_search(submission_id, query_source_evidence, image_store) + self._auto_google_custom_search(submission_id, query_source_evidence, image_store) + + self._ensure_llm_summary(submission_id) + self._rescore_submission(submission_id) + self._sync_submission_provider_state() + self.add_audit_event( + "rights.ops", + "Provider called", + f"auto-search / {submission_id}", + "operator request for auto text search", + ) + return self.review(submission_id) + + def manual_search( + self, + submission_id: str, + provider: str, + query: str, + image_store: LocalSubmissionImageStore | None = None, + ) -> dict[str, Any]: + submission = self._get("submissions", submission_id) + provider_payload = self._get("providers", provider) + if not provider_payload["enabled"]: + raise ValueError(f"{provider} provider disabled") + + domain_evidence, provider_call_count = self._manual_search_evidence( + submission_id, + provider, + query, + image_store, + ) + for item in domain_evidence: + evidence = _evidence_payload(submission_id, item) + evidence["status"] = "manual" + self._put("evidence", evidence["id"], evidence) + + submission.setdefault("queryHistory", []).insert( + 0, + { + "provider": provider, + "query": query, + "status": "manual", + "timestamp": _now_label(), + "count": len(domain_evidence), + }, + ) + self._put("submissions", submission_id, submission) + if any( + item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} + for item in domain_evidence + ): + provider_payload["lastFailure"] = domain_evidence[0].reason if domain_evidence else f"{provider} search failed" + else: + provider_payload["lastSuccess"] = _now_label() + provider_payload["lastFailure"] = "없음" + self._apply_provider_usage_delta(provider, provider_call_count, provider_payload) + self._ensure_llm_summary(submission_id) + if image_store is not None: + self._rescore_submission(submission_id) + self._sync_submission_provider_state() + self.add_audit_event("rights.ops", "Provider called", f"{provider} / {submission_id}", f"manual text query: {query}") + return self.review(submission_id) + + def _ensure_llm_summaries_for_existing_source_evidence(self, queue_id: str | None = None) -> None: + for submission in self._all("submissions", queue_id=queue_id): + self._ensure_llm_summary(str(submission["id"]), only_if_missing=True) + + def _ensure_llm_summary(self, submission_id: str, *, only_if_missing: bool = False) -> bool: + if self.provider_runtime.llm_assistant is None: + return False + + llm_provider = self._get("providers", "llm") + if not llm_provider.get("enabled"): + return False + + evidence_payloads = self._evidence_by_submission().get(submission_id, []) + if only_if_missing and any( + _evidence_matches_provider(item, "llm") and _provider_item_has_result(item) + for item in evidence_payloads + ): + return False + + source_evidence = [ + _domain_evidence_from_ui(item) + for item in evidence_payloads + if item.get("source") in {"fingerprint", "face", "google", "naver"} + ] + if not source_evidence: + return False + + llm_evidence = self.provider_runtime.llm_assistant.summarize( + submission_id, + source_evidence, + ) + self._delete_llm_summary_evidence(submission_id) + self._put( + "evidence", + _evidence_id(submission_id, llm_evidence), + _evidence_payload(submission_id, llm_evidence), + ) + if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE: + llm_provider["lastFailure"] = llm_evidence.reason + else: + llm_provider["lastSuccess"] = _now_label() + llm_provider["lastFailure"] = "없음" + self._apply_provider_usage_delta("llm", 1, llm_provider) + return True + + def _delete_llm_summary_evidence(self, submission_id: str) -> None: + with self._connect() as conn: + conn.execute( + """ + delete from evidence + where submission_id = ? + and ( + source = 'llm' + or ( + source = 'failure' + and json_extract(payload, '$.title') like 'LLM assistance failed%' + ) + ) + """, + (submission_id,), + ) + + def _manual_search_evidence( + self, + submission_id: str, + provider: str, + query: str, + image_store: LocalSubmissionImageStore | None, + ) -> tuple[list[Evidence], int]: + if provider == "naver": + if self.provider_runtime.naver_adapter is None: + raise ValueError(f"{provider} provider not connected") + promoter = SearchResultPromoter() + can_compare_search_images = self._can_compare_search_result_images( + submission_id, + image_store, + ) + domain_evidence = self.provider_runtime.naver_adapter.search( + submission_id, + query, + self.provider_runtime.search_policy, + ) + domain_evidence = promoter.promote(domain_evidence) + call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0 + similarity_evidence: list[Evidence] = [] + if can_compare_search_images and image_store is not None: + similarity_evidence = self._sync_search_result_image_similarity( + submission_id, + domain_evidence, + image_store, + status="manual", + ) + page_similarity_evidence: list[Evidence] = [] + if can_compare_search_images and image_store is not None and not similarity_evidence: + page_evidence = self.provider_runtime.naver_adapter.search_pages( + submission_id, + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence): + call_count += 1 + page_evidence = promoter.promote(page_evidence) + domain_evidence.extend(page_evidence) + page_similarity_evidence = self._sync_search_result_image_similarity( + submission_id, + page_evidence, + image_store, + status="manual", + ) + if ( + can_compare_search_images + and image_store is not None + and not similarity_evidence + and not page_similarity_evidence + ): + web_evidence = self.provider_runtime.naver_adapter.search_web_pages( + submission_id, + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): + call_count += 1 + web_evidence = promoter.promote(web_evidence) + domain_evidence.extend(web_evidence) + self._sync_search_result_image_similarity( + submission_id, + web_evidence, + image_store, + status="manual", + ) + return domain_evidence, call_count + + if provider == "google_search": + if self.provider_runtime.google_custom_search_adapter is None: + raise ValueError(f"{provider} provider not connected") + can_compare_search_images = self._can_compare_search_result_images( + submission_id, + image_store, + ) + domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images( + submission_id, + query, + self.provider_runtime.google_custom_search_policy, + ) + call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0 + similarity_evidence: list[Evidence] = [] + if can_compare_search_images and image_store is not None: + similarity_evidence = self._sync_search_result_image_similarity( + submission_id, + domain_evidence, + image_store, + status="manual", + ) + if can_compare_search_images and image_store is not None and not similarity_evidence: + web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages( + submission_id, + query, + self.provider_runtime.google_custom_search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): + call_count += 1 + domain_evidence.extend(web_evidence) + self._sync_search_result_image_similarity( + submission_id, + web_evidence, + image_store, + status="manual", + ) + return domain_evidence, call_count + + raise ValueError(f"{provider} provider not connected") + + def register_manual_knowledge_entry(self, payload: dict[str, Any]) -> dict[str, Any]: + name = str(payload.get("name", "")).strip() + if not name: + raise ValueError("knowledge name required") + + entry_id = f"kb-manual-{_timestamp_id()}" + image_record = self._store_manual_knowledge_image(entry_id, payload.get("image")) + sample_fingerprints = [image_record["perceptualFingerprint"]] if image_record else [] + entry = { + "id": entry_id, + "name": name, + "type": _knowledge_type_value(str(payload.get("type", "other"))), + "aliases": _text_list(payload.get("aliases")), + "keywords": _text_list(payload.get("keywords")), + "memo": str(payload.get("memo", "")).strip(), + "provenance": "manual", + "active": True, + "entryStatus": "confirmed", + "sourceDecision": "", + "sampleFingerprints": sample_fingerprints, + "imageAsset": image_record["asset"] if image_record else "", + "imageFacts": image_record["facts"] if image_record else {}, + } + self._put("knowledge_entries", entry_id, entry) + self.add_audit_event( + "rights.ops", + "Knowledge entry manually created", + name, + "manual image reference" if image_record else "manual text reference", + ) + return entry + + def collect_keyword_candidates(self, query: str, provider: str = "naver") -> dict[str, Any]: + query = query.strip() + if not query: + raise ValueError("collection query required") + if provider not in {"naver", "google_search"}: + raise ValueError(f"{provider} provider not supported for candidate collection") + + self._sync_provider_payloads() + provider_payload = self._get("providers", provider) + if not provider_payload["enabled"]: + raise ValueError(f"{provider} provider disabled") + if provider == "naver" and self.provider_runtime.naver_adapter is None: + raise ValueError("naver provider not connected") + if provider == "google_search" and self.provider_runtime.google_custom_search_adapter is None: + raise ValueError("google_search provider not connected") + + self._clear_collection_candidates() + if provider == "google_search": + domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images( + "candidate-collection", + query, + self.provider_runtime.google_custom_search_policy, + ) + else: + domain_evidence = self.provider_runtime.naver_adapter.search( + "candidate-collection", + query, + self.provider_runtime.search_policy, + ) + collected = 0 + provider_call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0 + for candidate in self._collection_candidates_from_evidence(query, domain_evidence, provider): + self._put("collection_candidates", candidate["id"], candidate) + collected += 1 + + if provider == "google_search" and collected == 0: + web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages( + "candidate-collection", + query, + self.provider_runtime.google_custom_search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): + provider_call_count += 1 + domain_evidence.extend(web_evidence) + for candidate in self._collection_candidates_from_evidence(query, web_evidence, provider): + self._put("collection_candidates", candidate["id"], candidate) + collected += 1 + if provider == "naver" and collected == 0: + page_evidence = self.provider_runtime.naver_adapter.search_pages( + "candidate-collection", + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence): + provider_call_count += 1 + domain_evidence.extend(page_evidence) + for candidate in self._collection_candidates_from_evidence(query, page_evidence, provider): + self._put("collection_candidates", candidate["id"], candidate) + collected += 1 + if provider == "naver" and collected == 0: + web_evidence = self.provider_runtime.naver_adapter.search_web_pages( + "candidate-collection", + query, + self.provider_runtime.search_policy, + ) + if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence): + provider_call_count += 1 + domain_evidence.extend(web_evidence) + for candidate in self._collection_candidates_from_evidence(query, web_evidence, provider): + self._put("collection_candidates", candidate["id"], candidate) + collected += 1 + + if any( + item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE} + for item in domain_evidence + ): + provider_payload["lastFailure"] = domain_evidence[0].reason if domain_evidence else f"{provider} collection failed" + else: + provider_payload["lastSuccess"] = _now_label() + provider_payload["lastFailure"] = "없음" + self._apply_provider_usage_delta(provider, provider_call_count, provider_payload) + self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates") + payload = self.bootstrap() + payload["collected"] = collected + return payload + + def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]: + with self._write_lock: + candidate = self._get("collection_candidates", candidate_id) + # Idempotent: a double-click / retry must not create a second + # confirmed knowledge entry for the same candidate. + if candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId"): + return self.bootstrap() + # Deterministic id so a racing retry upserts the same row instead of + # minting a new timestamped id. + entry_id = _stable_id("kb-candidate", candidate_id) + query = str(candidate.get("query", "")) + name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query + memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}" + entry = { + "id": entry_id, + "name": name, + "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), + "aliases": _text_list(payload.get("aliases")) or [query], + "keywords": _text_list(payload.get("keywords")) or [query], + "memo": memo, + "provenance": "manual", + "active": True, + "entryStatus": "confirmed", + "sourceDecision": "", + "sourceCandidate": candidate_id, + "sampleFingerprints": _text_list(candidate.get("sampleFingerprints")), + "imageAsset": str(candidate.get("imageAsset", "")), + "imageFacts": candidate.get("imageFacts", {}), + } + with self._transaction() as conn: + self._put("knowledge_entries", entry_id, entry, conn=conn) + candidate["status"] = "promoted" + candidate["promotedKnowledgeId"] = entry_id + self._put("collection_candidates", candidate_id, candidate, conn=conn) + self.add_audit_event( + "rights.ops", "Knowledge entry manually created", name, + f"promoted candidate {candidate_id}", conn=conn, + ) + return self.bootstrap() + + def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]: + candidate_ids = _unique_texts(_text_list(payload.get("candidate_ids", payload.get("candidateIds")))) + if not candidate_ids: + raise ValueError("candidate_ids required") + + candidates = [self._get("collection_candidates", candidate_id) for candidate_id in candidate_ids] + sample_fingerprints = _unique_texts( + fingerprint + for candidate in candidates + for fingerprint in _text_list(candidate.get("sampleFingerprints")) + ) + if not sample_fingerprints: + raise ValueError("selected candidates have no sample fingerprints") + + entry_id = f"kb-candidate-{_timestamp_id()}" + queries = _unique_texts(str(candidate.get("query", "")) for candidate in candidates) + image_assets = _unique_texts(str(candidate.get("imageAsset", "")) for candidate in candidates) + image_facts = [ + candidate.get("imageFacts", {}) + for candidate in candidates + if isinstance(candidate.get("imageFacts", {}), dict) and candidate.get("imageFacts", {}) + ] + fallback_name = next( + ( + str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip() + for candidate in candidates + if str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip() + ), + "Collected reference", + ) + name = str(payload.get("name", "")).strip() or fallback_name + memo = str(payload.get("memo", "")).strip() or f"Promoted from collected candidates: {', '.join(queries)}" + entry = { + "id": entry_id, + "name": name, + "type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))), + "aliases": _text_list(payload.get("aliases")) or queries, + "keywords": _text_list(payload.get("keywords")) or queries, + "memo": memo, + "provenance": "manual", + "active": True, + "entryStatus": "confirmed", + "sourceDecision": "", + "sourceCandidate": candidate_ids[0] if len(candidate_ids) == 1 else "", + "sourceCandidates": candidate_ids, + "sampleFingerprints": sample_fingerprints, + "imageAsset": image_assets[0] if image_assets else "", + "imageAssets": image_assets, + "imageFacts": { + "samples": len(candidates), + "queries": queries, + "items": image_facts, + }, + } + self._put("knowledge_entries", entry_id, entry) + for candidate in candidates: + candidate["status"] = "promoted" + candidate["promotedKnowledgeId"] = entry_id + self._put("collection_candidates", str(candidate["id"]), candidate) + self.add_audit_event( + "rights.ops", + "Knowledge entry manually created", + name, + f"promoted {len(candidates)} collected candidates", + ) + return self.bootstrap() + diff --git a/tests/rights_filter/server/test_sqlite_store.py b/tests/rights_filter/server/test_sqlite_store.py index e7c41fb..a733e0d 100644 --- a/tests/rights_filter/server/test_sqlite_store.py +++ b/tests/rights_filter/server/test_sqlite_store.py @@ -14,6 +14,7 @@ from rights_filter.server.image_store import LocalSubmissionImageStore from rights_filter.server import sqlite_store as sqlite_store_module from rights_filter.server import store_enrichment as store_enrichment_module from rights_filter.server import store_remote_fetch as remote_fetch_module +from rights_filter.server import store_serialization as store_serialization_module from rights_filter.server.sqlite_store import CopyrighterStore from rights_filter.integrations.env_clients import build_provider_runtime @@ -8040,7 +8041,7 @@ def test_record_decision_rolls_back_when_audit_fails(tmp_path: Path, monkeypatch # The status change must roll back atomically with the failed audit write. assert store._get("submissions", "SUB-1")["decisionStatus"] == "unreviewed" with pytest.raises(KeyError): - store._get("knowledge_entries", sqlite_store_module._stable_id("kb-watchlist", "SUB-1")) + store._get("knowledge_entries", store_serialization_module._stable_id("kb-watchlist", "SUB-1")) @pytest.mark.parametrize(