Compare commits

..

No commits in common. "71a6da10a35d4ce7a844926b1fd42fc9d59f6fa9" and "3be7b016ce9db47b52ce58bd0c74a2074aa2992f" have entirely different histories.

19 changed files with 2791 additions and 3258 deletions

1
.gitignore vendored
View file

@ -22,7 +22,6 @@ node_modules/
# Runtime data, databases, logs, generated artifacts
data/
outputs/
artifacts/
*.sqlite3
*.sqlite3-journal
*.log

View file

@ -1,78 +0,0 @@
# POSA Copyrighter
이미지 저작권 위험을 자동 분석하고, 운영자가 콘솔에서 근거를 확인해 **승인 / 보류 / 반려**를 판정하는 폐쇄망(air-gapped) 운영 도구입니다.
- **분석 서버**: 제출 이미지를 분석해 위험도·근거를 산출하고 SQLite에 저장
- **운영자 콘솔**: 심사 큐, 근거 검토, 판정, 지식 DB, 감사 로그를 제공하는 단일 페이지 웹 GUI
- **외부 근거 검색**: Naver 검색, Google Vision/Custom Search, 로컬 LLM(Ollama) 연동(선택)
## 요구 사항
- Python 3.13
- 런타임 서드파티 의존성은 이미지 처리 라이브러리뿐입니다(지연 로딩): `numpy`, `opencv-python-headless`, `pillow`
> **폐쇄망(air-gapped) 원칙**: 모든 의존성·자산은 빌드/스테이징 머신에서 미리 받아 **오프라인 휠**로 설치합니다. 대상 서버는 설치·실행 시 인터넷에 접근하지 않습니다. 자세한 절차는 [`docs/operations/offline-install.md`](docs/operations/offline-install.md)를 참고하세요.
## 설치
```bash
# 개발 머신 (온라인) — 의존성 설치
pip install -r requirements.txt # 런타임
pip install -r requirements-dev.txt # 테스트 포함
# 폐쇄망 대상 — 오프라인 휠로 설치 (예시)
pip install --no-index --find-links wheelhouse -r requirements.txt
```
`.env.example`를 복사해 `.env`를 만들고 필요한 값(인증 토큰, 검색 API 키 등)을 채웁니다. 키를 비워 두면 해당 외부 검색은 자동으로 비활성화됩니다.
```bash
cp .env.example .env
```
## 실행
```bash
python run_copyrighter_server.py
# 또는
python -m rights_filter.server --host 127.0.0.1 --port 9500
```
주요 옵션: `--host`, `--port`, `--db`(SQLite 경로), `--images`(제출 이미지 폴더), `--static`(운영자 GUI 경로), `--env`(.env 경로).
기본값으로 실행하면 `http://127.0.0.1:9500` 에서 운영자 콘솔이 열립니다.
> 보안: `COPYRIGHTER_AUTH_TOKEN`이 비어 있으면 데이터 라우트가 **인증 없이** 열립니다. 단일 호스트 개발 환경 외에는 반드시 토큰을 설정하세요.
## 프로젝트 구조
```
src/rights_filter/ 분석·서버·통합·도메인 로직 (Python)
server/ http.server 기반 API + 정적 파일 서빙, SQLite 저장소
analysis/ 이미지 분석·위험도 산출
integrations/ Naver / Google / LLM 어댑터
web/operator-gui/ 운영자 콘솔 (정적 SPA: index.html, app.js, styles.css)
tests/ pytest 테스트
operator_gui/ 운영자 GUI 정적 검증 + Playwright E2E
docs/ 기획·설계·운영 문서
```
## 테스트
```bash
# 전체
python -m pytest
# 운영자 GUI E2E (Playwright)
playwright install chromium # 빌드 머신에서 1회 — 브라우저 바이너리는 오프라인 번들에 포함
python -m pytest tests/operator_gui/
```
- 브라우저(E2E) 테스트는 Playwright/Chromium이 없으면 **자동으로 건너뜁니다**(`importorskip` + graceful skip).
- 운영자 워크벤치는 `tests/operator_gui/pages/`의 Page Object(`OperatorWorkbench`)와 `conftest.py`의 공용 픽스처를 사용합니다.
## 문서
- [프로젝트 소개 및 기술 구현](docs/project-introduction-and-technical-implementation.md)
- [운영 워크리스트](docs/operations/copyrighter-operation-worklist.md)
- [오프라인 설치 절차](docs/operations/offline-install.md)

View file

@ -530,32 +530,21 @@ All HIGH/MEDIUM/LOW findings implemented, tested, and committed on branch
- Phase 8 (frontend safeUrl/apiJson/onerror) → `f8aa10f`; protocol-relative
URL fix after security review → `7317bfb`.
### Phase 9 (god-file split) — complete
Behavior-preserving, each step gated on the full suite (400 passed). The
5119-line `sqlite_store.py` god file is now **724 lines** (the `CopyrighterStore`
facade: `__init__`, `initialize`, seed, bootstrap, `_search_coverage`, review,
decision, providers, media, audit) composed from focused modules + mixins:
### Phase 9 (god-file split) — partial
Done as behavior-preserving module extractions (suite-gated), `sqlite_store.py`
5333 → ~4955 lines:
- `store_url_utils.py` (URL helpers) → `e66f9d5`
- `store_remote_fetch.py` (fetch + SSRF + opener) and `store_schema.py`
(DDL/typed-columns/constraint migration) → `da91775`
Helper modules (stateless): `store_url_utils` (52), `store_text` (27),
`store_constants` (113), `store_remote_fetch` (125, fetch + SSRF opener),
`store_schema` (257, DDL/migration), `store_serialization` (597, payload/domain
helpers), `store_page_scrape` (976, HTML/CSS/JSON image-URL extraction).
Class mixins (composed into `CopyrighterStore` via inheritance):
`StorePersistenceMixin` (313, connection/transaction/_put/_get),
`StoreQueueMixin` (170), `StoreSearchCandidatesMixin` (743, similarity +
candidate storage + rescore), `StoreEnrichmentMixin` (789, provider sync + face
+ rerun + auto-search), `StoreOperationsMixin` (761, knowledge lifecycle +
manual/rerun search + collection).
All modules are <800 lines except `store_page_scrape` (976) a single cohesive
HTML/CSS/JSON/srcset image-URL parser whose helpers are tightly coupled
(parser ↔ css ↔ json ↔ srcset predicates); splitting it further would fragment
one responsibility, so it is intentionally left whole. Tests that monkeypatch
`HeuristicFacePersonDetector` were updated to also patch `store_enrichment`
(where face detection now runs). Cross-file URL-helper dedup (Task 28) is
intentionally NOT done — the integration adapters' suffix policy diverges from
the store's (`.svg`).
Remaining (in progress / follow-up): the `CopyrighterStore` class methods are
the bulk and need class-level decomposition (mixin modules), which first
requires extracting the shared module-level helpers (serialization/text/id +
the 670-line `_PageImageParser` + css/page extraction, currently coupled to
`_normalized_image_url`/`_unique_texts`) into shared modules to avoid circular
imports. NOT a pure no-op like the prior extractions; gate each step on the
full suite. Cross-file URL-helper dedup (Task 28) is intentionally NOT done —
the integration adapters' suffix policy diverges from the store's (`.svg`).
## Deferred (explicitly out of this plan, log as follow-ups)
- Full `GovernancePolicyRegistry` role enforcement on every read/serve path (Task 2 + Task 9 cover the high-impact subset).

View file

@ -3,8 +3,3 @@
pytest==8.4.2
pytest-cov==6.0.0
pytest-timeout==2.4.0
# Operator-GUI browser (E2E) tests. The Chromium binary is NOT pip-installed:
# on the build/staging host run `playwright install chromium`, then bundle the
# browser cache (PLAYWRIGHT_BROWSERS_PATH) into the offline artifact for the
# air-gapped target. Browser tests skip cleanly when Chromium is absent.
playwright==1.49.1

File diff suppressed because it is too large Load diff

View file

@ -1,791 +0,0 @@
"""Enrichment internals for CopyrighterStore, as a mixin.
Provider-state sync, local face evidence/crop sync + retention, internal/Google
re-analysis, face-crop web detection, and the auto Google/Naver search drivers.
Mixed into CopyrighterStore; relies on persistence methods, self.* attributes
(provider_runtime, image dirs, fetchers, _write_lock), and self.add_audit_event /
rescoring provided by the host class. Behavior unchanged.
"""
from __future__ import annotations
import json
from dataclasses import replace
from datetime import datetime
from typing import Any
from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.internal_analyzer import InternalAnalyzer
from rights_filter.analysis.preprocessing import build_external_derivative, build_face_crop_derivatives
from rights_filter.analysis.search_query_generation import SearchQueryGenerator
from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_serialization import (
_evidence_id,
_evidence_payload,
_existing_google_custom_query_signatures,
_existing_naver_query_signatures,
_external_provider_ids,
_external_provider_state_for_submission,
_face_crop_web_evidence,
_google_custom_image_query_signature,
_google_custom_web_query_signature,
_google_weak_label_title,
_is_google_weak_label_payload,
_naver_blog_query_signature,
_naver_query_signature,
_naver_web_query_signature,
_now_label,
_query_history_status,
)
class StoreEnrichmentMixin:
def _sync_provider_payloads(self) -> None:
existing = {provider["id"]: provider for provider in self._all("providers")}
for provider_id, payload in self.provider_runtime.provider_payloads.items():
merged = {
**payload,
"usage": existing.get(provider_id, {}).get("usage", payload["usage"]),
}
self._put("providers", provider_id, merged)
self._sync_submission_provider_state()
def _sync_submission_provider_state(self, queue_id: str | None = None) -> None:
provider_payloads = {provider["id"]: provider for provider in self._all("providers")}
evidence_by_submission = self._evidence_by_submission(queue_id=queue_id)
for submission in self._all("submissions", queue_id=queue_id):
provider_state = submission.get("providerState", {})
evidence = evidence_by_submission.get(str(submission["id"]), [])
provider_state["internal"] = "ok"
for provider_id in _external_provider_ids(provider_payloads):
provider_state[provider_id] = _external_provider_state_for_submission(
provider_payloads,
provider_id,
submission,
evidence,
)
submission["providerState"] = provider_state
self._put("submissions", submission["id"], submission)
def _normalize_saved_google_weak_labels_and_rescore(self, queue_id: str | None = None) -> None:
changed_submissions: set[str] = set()
with self._connect() as conn:
rows = conn.execute(
"""
select id, submission_id, payload
from evidence
""".strip() + (
" where submission_id in (select id from submissions where queue_id = ?)"
if queue_id is not None
else ""
),
(queue_id,) if queue_id is not None else (),
).fetchall()
for row in rows:
payload = json.loads(row["payload"])
if _is_google_weak_label_payload(payload):
payload["title"] = _google_weak_label_title(str(payload.get("title", "")))
payload["confidence"] = 0.0
payload["contributed"] = False
payload["status"] = "weak"
self._put("evidence", row["id"], payload)
changed_submissions.add(str(row["submission_id"]))
for submission_id in changed_submissions:
self._rescore_submission(submission_id)
def _refresh_existing_local_face_evidence(
self,
image_store: LocalSubmissionImageStore,
*,
queue_id: str,
) -> None:
existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)}
if not existing_submission_ids:
return
evidence_by_submission = self._evidence_by_submission(queue_id=queue_id)
detector = HeuristicFacePersonDetector()
for record in image_store.submission_records():
submission_id = str(record["id"])
if submission_id not in existing_submission_ids:
continue
if any(
item.get("source") == "face"
for item in evidence_by_submission.get(submission_id, [])
):
continue
signal = detector.detect(image_store.image_payload(submission_id))
if not signal.present:
continue
evidence = Evidence(
source=EvidenceSource.FACE_PERSON,
reason="Face/person detected",
confidence=0.8,
data={
"face_count": signal.face_count,
"person_count": signal.person_count,
},
)
self._put(
"evidence",
_evidence_id(submission_id, evidence),
_evidence_payload(submission_id, evidence),
)
self._rescore_submission(submission_id)
def _sync_face_crops(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> None:
if image_store is None:
return
try:
original = image_store.image_payload(submission_id)
except Exception:
return
signal = HeuristicFacePersonDetector().detect(original)
crop_dir = self.face_crop_image_dir / submission_id
if crop_dir.exists():
for stale in crop_dir.glob("crop-*.jpg"):
try:
stale.unlink()
except OSError:
continue
face_crops: list[dict[str, Any]] = []
for index, box in enumerate(signal.face_boxes[:3], start=1):
crops = build_face_crop_derivatives(original, [box])
if not crops:
continue
try:
crop_dir.mkdir(parents=True, exist_ok=True)
crop_path = crop_dir / f"crop-{index}.jpg"
crop_path.write_bytes(crops[0].content)
except OSError:
continue
face_crops.append(
{
"index": index,
"url": f"{self.face_crop_public_prefix}/{submission_id}/crop-{index}.jpg",
"box": [int(value) for value in box],
}
)
submission = self._get("submissions", submission_id)
submission["faceCrops"] = face_crops
self._put("submissions", submission_id, submission)
def purge_expired_face_crops(self, *, now_epoch: float | None = None) -> int:
# Delete biometric face-crop files older than the retention window and
# drop their references from the owning submission, with an audit trail.
retention_days = int(self.face_crop_retention_days)
if retention_days <= 0 or not self.face_crop_image_dir.exists():
return 0
now = now_epoch if now_epoch is not None else datetime.now().timestamp()
cutoff = now - retention_days * 86400
purged = 0
with self._write_lock:
for crop_dir in sorted(p for p in self.face_crop_image_dir.iterdir() if p.is_dir()):
submission_id = crop_dir.name
removed: list[str] = []
for crop_file in crop_dir.glob("crop-*.jpg"):
try:
if crop_file.stat().st_mtime >= cutoff:
continue
crop_file.unlink()
except OSError:
continue
removed.append(crop_file.name)
purged += 1
if not removed:
continue
change = f"retention {retention_days}d: removed {len(removed)} biometric crop(s)"
try:
submission = self._get("submissions", submission_id)
except KeyError:
self.add_audit_event("system", "Face crop purged", submission_id, change)
continue
remaining = [
crop
for crop in submission.get("faceCrops", [])
if f"crop-{crop.get('index')}.jpg" not in removed
]
with self._transaction() as conn:
submission["faceCrops"] = remaining
self._put("submissions", submission_id, submission, conn=conn)
self.add_audit_event("system", "Face crop purged", submission_id, change, conn=conn)
return purged
def _refresh_existing_submission_file_facts(
self,
image_store: LocalSubmissionImageStore,
*,
queue_id: str,
) -> None:
existing_submission_ids = {submission["id"] for submission in self._all("submissions", queue_id=queue_id)}
for record in image_store.submission_records():
submission_id = str(record["id"])
if submission_id not in existing_submission_ids:
continue
submission = self._get("submissions", submission_id)
submission["asset"] = record["asset"]
submission["fileFacts"] = {
**submission.get("fileFacts", {}),
"size": f"{record.get('width', 1)} x {record.get('height', 1)}",
"format": record.get("format", "FILE"),
}
self._put("submissions", submission_id, submission)
def _rerun_internal_analysis(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> list[Evidence]:
repository = self._knowledge_repository()
analyzer = InternalAnalyzer(
repository,
FingerprintService(),
HeuristicFacePersonDetector(),
)
domain_evidence = analyzer.analyze(
submission_id,
image_store.image_payload(submission_id),
)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "rerun"
self._put("evidence", payload["id"], payload)
self._sync_similar_reference_images(submission_id, domain_evidence)
self._increment_knowledge_contribution_counts(submission_id, domain_evidence)
self._rescore_submission(submission_id)
self._sync_face_crops(submission_id, image_store)
return domain_evidence
def _rerun_google_image_search(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> list[Evidence]:
if self.provider_runtime.google_adapter is None:
return []
original_image = image_store.image_payload(submission_id)
derivative = build_external_derivative(original_image)
domain_evidence = self.provider_runtime.google_adapter.detect(
submission_id,
derivative,
self.provider_runtime.external_policy,
)
call_count = 1
face_crop_evidence, face_crop_calls = self._google_face_crop_web_detection(
submission_id,
original_image,
)
call_count += face_crop_calls
domain_evidence.extend(face_crop_evidence)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "rerun" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="rerun",
)
google_provider = self._get("providers", "google")
if any(
item.source in {EvidenceSource.FAILURE, EvidenceSource.EXTERNAL_SKIPPED}
for item in domain_evidence
):
google_provider["lastFailure"] = domain_evidence[0].reason if domain_evidence else "Google image search failed"
else:
google_provider["lastSuccess"] = _now_label()
google_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("google", call_count, google_provider)
self.add_audit_event(
"rights.ops",
"Provider called",
f"google / {submission_id}",
"image web detection rerun",
)
self._rescore_submission(submission_id)
return domain_evidence
def _google_face_crop_web_detection(
self,
submission_id: str,
original_image: Any,
) -> tuple[list[Evidence], int]:
if self.provider_runtime.google_adapter is None:
return [], 0
if not self.provider_runtime.face_crop_web_detection_enabled:
return [], 0
submission = self._get("submissions", submission_id)
stored_crops = submission.get("faceCrops")
if stored_crops is None:
signal = HeuristicFacePersonDetector().detect(original_image)
indexed_boxes = [
(index, box)
for index, box in enumerate(signal.face_boxes[:3], start=1)
]
else:
# 저장된 faceCrops의 index를 그대로 사용해 증거의 crop_index가
# 워크벤치 썸네일 번호와 항상 일치하게 한다(중간 박스 크롭 실패 시
# 재열거하면 번호가 어긋난다).
indexed_boxes = [
(int(item.get("index", 0)), tuple(int(value) for value in item.get("box", [])))
for item in stored_crops
if len(item.get("box", [])) == 4
]
if not indexed_boxes:
return [], 0
evidence: list[Evidence] = []
call_count = 0
for crop_index, box in indexed_boxes:
crops = build_face_crop_derivatives(original_image, [box])
if not crops:
continue
crop = crops[0]
call_count += 1
crop_evidence = self.provider_runtime.google_adapter.detect(
f"{submission_id}:face-crop-{crop_index}",
crop,
self.provider_runtime.external_policy,
)
for item in crop_evidence:
face_web_evidence = _face_crop_web_evidence(submission_id, crop_index, item)
evidence.append(face_web_evidence)
crop_matches = self._face_crop_search_result_similarity_evidence(
submission_id,
crop_index,
crop,
face_web_evidence,
)
if crop_matches:
evidence.extend(crop_matches)
if call_count:
# Governance: personal (biometric) images left the system to a
# third-party API. The external call itself is permitted, but the
# transmission must be auditable.
self.add_audit_event(
"system",
"Personal image sent to external provider",
submission_id,
f"{call_count} face crop(s) -> Google Vision web detection",
)
return evidence, call_count
def _auto_google_custom_search(
self,
submission_id: str,
source_evidence: list[Evidence],
image_store: LocalSubmissionImageStore | None = None,
) -> None:
if self.provider_runtime.google_custom_search_adapter is None:
return
if self.provider_runtime.auto_google_custom_query_limit <= 0:
return
query_plan = SearchQueryGenerator().plan(
source_evidence,
[],
max_queries=self.provider_runtime.auto_google_custom_query_limit,
)
if not query_plan:
return
existing_signatures = _existing_google_custom_query_signatures(
self._evidence_by_submission().get(submission_id, [])
)
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
all_domain_evidence: list[Evidence] = []
history_entries: list[dict[str, Any]] = []
external_call_count = 0
for planned in query_plan:
query = planned.query
image_signature = _google_custom_image_query_signature(query)
if image_signature in existing_signatures:
continue
existing_signatures.add(image_signature)
image_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in image_evidence):
external_call_count += 1
image_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in image_evidence
]
all_domain_evidence.extend(image_evidence)
for item in image_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images and remaining_matches > 0:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
image_evidence,
image_store,
status="auto",
max_matches=remaining_matches,
)
all_domain_evidence.extend(similarity_evidence)
history_entries.append(
{
"provider": "google_search",
"query": query,
"status": _query_history_status(image_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(image_evidence),
}
)
web_signature = _google_custom_web_query_signature(query)
if web_signature in existing_signatures:
continue
existing_signatures.add(web_signature)
web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
external_call_count += 1
web_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in web_evidence
]
all_domain_evidence.extend(web_evidence)
for item in web_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
all_domain_evidence.extend(
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="auto",
max_matches=self._search_result_similarity_remaining_budget(
submission_id,
image_store,
),
)
)
history_entries.append(
{
"provider": "google_search",
"query": query,
"status": _query_history_status(web_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(web_evidence),
}
)
if not history_entries:
return
submission = self._get("submissions", submission_id)
submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])]
self._put("submissions", submission_id, submission)
google_search_provider = self._get("providers", "google_search")
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence
):
google_search_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Google custom search failed"
else:
google_search_provider["lastSuccess"] = _now_label()
google_search_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("google_search", external_call_count, google_search_provider)
self.add_audit_event(
"system",
"Provider called",
f"google_search / {submission_id}",
f"auto custom search query batch: {', '.join(item['query'] for item in history_entries)}",
)
def _auto_naver_search(
self,
submission_id: str,
source_evidence: list[Evidence],
image_store: LocalSubmissionImageStore | None = None,
) -> None:
if self.provider_runtime.naver_adapter is None:
return
if self.provider_runtime.auto_naver_query_limit <= 0:
return
query_plan = SearchQueryGenerator().plan(
source_evidence,
[],
max_queries=self.provider_runtime.auto_naver_query_limit,
)
if not query_plan:
return
existing_signatures = _existing_naver_query_signatures(
self._evidence_by_submission().get(submission_id, [])
)
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
all_domain_evidence: list[Evidence] = []
history_entries: list[dict[str, Any]] = []
external_call_count = 0
blog_query_count = 0
web_query_count = 0
promoter = SearchResultPromoter()
for planned in query_plan:
query = planned.query
signature = _naver_query_signature(query)
if signature in existing_signatures:
continue
existing_signatures.add(signature)
domain_evidence = self.provider_runtime.naver_adapter.search(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence):
external_call_count += 1
domain_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in domain_evidence
]
domain_evidence = promoter.promote(domain_evidence)
all_domain_evidence.extend(domain_evidence)
for item in domain_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images and remaining_matches > 0:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="auto",
max_matches=remaining_matches,
)
all_domain_evidence.extend(similarity_evidence)
history_entries.append(
{
"provider": "naver",
"query": query,
"status": _query_history_status(domain_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(domain_evidence),
}
)
page_similarity_evidence: list[Evidence] = []
remaining_matches = self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
blog_signature = _naver_blog_query_signature(query)
if (
blog_query_count < self.provider_runtime.auto_naver_blog_query_limit
and blog_signature not in existing_signatures
):
existing_signatures.add(blog_signature)
blog_query_count += 1
page_evidence = self.provider_runtime.naver_adapter.search_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
external_call_count += 1
page_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in page_evidence
]
page_evidence = promoter.promote(page_evidence)
all_domain_evidence.extend(page_evidence)
for item in page_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
page_similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
page_evidence,
image_store,
status="auto",
max_matches=remaining_matches if can_compare_search_images else 0,
)
all_domain_evidence.extend(page_similarity_evidence)
history_entries.append(
{
"provider": "naver_blog",
"query": query,
"status": _query_history_status(page_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(page_evidence),
}
)
web_signature = _naver_web_query_signature(query)
if (
web_query_count < self.provider_runtime.auto_naver_web_query_limit
and web_signature not in existing_signatures
):
existing_signatures.add(web_signature)
web_query_count += 1
web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
external_call_count += 1
web_evidence = [
replace(
item,
data={
**item.data,
"query_strategy": planned.strategy,
"query_source": planned.source,
"query_priority": planned.priority,
},
)
for item in web_evidence
]
web_evidence = promoter.promote(web_evidence)
all_domain_evidence.extend(web_evidence)
for item in web_evidence:
payload = _evidence_payload(submission_id, item)
payload["status"] = "auto" if payload["status"] == "active" else payload["status"]
self._put("evidence", payload["id"], payload)
all_domain_evidence.extend(
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="auto",
max_matches=(
self._search_result_similarity_remaining_budget(
submission_id,
image_store,
)
if can_compare_search_images
else 0
),
)
)
history_entries.append(
{
"provider": "naver_web",
"query": query,
"status": _query_history_status(web_evidence),
"strategy": planned.strategy,
"source": planned.source,
"timestamp": _now_label(),
"count": len(web_evidence),
}
)
if not history_entries:
return
submission = self._get("submissions", submission_id)
submission["queryHistory"] = [*history_entries, *submission.get("queryHistory", [])]
self._put("submissions", submission_id, submission)
naver_provider = self._get("providers", "naver")
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in all_domain_evidence
):
naver_provider["lastFailure"] = all_domain_evidence[0].reason if all_domain_evidence else "Naver search failed"
else:
naver_provider["lastSuccess"] = _now_label()
naver_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("naver", external_call_count, naver_provider)
self.add_audit_event(
"system",
"Provider called",
f"naver / {submission_id}",
f"auto text query batch: {', '.join(item['query'] for item in history_entries)}",
)

View file

@ -1,760 +0,0 @@
"""Operator-facing operations for CopyrighterStore, as a mixin.
Knowledge-entry lifecycle (watchlist promote/exclude, update/deactivate/
reactivate), rerun/auto/manual search drivers, LLM summary management, manual
knowledge registration, and keyword-candidate collection/promotion. Mixed into
CopyrighterStore; relies on persistence + enrichment + search-candidate methods
and self.* attributes provided by the host class. Behavior unchanged.
"""
from __future__ import annotations
import json
import sqlite3
from typing import Any
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_serialization import (
_domain_evidence_from_ui,
_evidence_id,
_evidence_matches_provider,
_evidence_payload,
_knowledge_type_value,
_now_label,
_provider_item_has_result,
_stable_id,
_submission_search_hint_evidence,
_timestamp_id,
_watchlist_source_evidence,
)
from rights_filter.server.store_text import _text_list, _unique_texts
class StoreOperationsMixin:
def promote_watchlist_entry(self, entry_id: str) -> dict[str, Any]:
entry = self._get("knowledge_entries", entry_id)
if entry.get("entryStatus") != "watchlist":
raise ValueError("knowledge entry is not a watchlist candidate")
entry["entryStatus"] = "confirmed"
entry["active"] = True
entry["excludedReason"] = ""
entry["confirmedAt"] = _now_label()
entry["confirmedBy"] = "rights.ops"
self._put("knowledge_entries", entry_id, entry)
self.add_audit_event(
"rights.ops",
"Watchlist candidate promoted",
str(entry.get("name", entry_id)),
"promoted into confirmed reference DB",
)
return self.bootstrap()
def exclude_watchlist_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]:
entry = self._get("knowledge_entries", entry_id)
if entry.get("entryStatus") not in {"watchlist", "confirmed"}:
raise ValueError("knowledge entry cannot be excluded")
entry["entryStatus"] = "excluded"
entry["active"] = False
entry["excludedReason"] = reason.strip() or "오탐 또는 무관 후보"
entry["excludedAt"] = _now_label()
entry["excludedBy"] = "rights.ops"
self._put("knowledge_entries", entry_id, entry)
self.add_audit_event(
"rights.ops",
"Watchlist candidate excluded",
str(entry.get("name", entry_id)),
entry["excludedReason"],
)
return self.bootstrap()
def update_knowledge_entry(self, entry_id: str, payload: dict[str, Any]) -> dict[str, Any]:
entry = self._get("knowledge_entries", entry_id)
updates: dict[str, Any] = {}
if "aliases" in payload:
updates["aliases"] = _text_list(payload.get("aliases"))
if "keywords" in payload:
updates["keywords"] = _text_list(payload.get("keywords"))
if "memo" in payload:
updates["memo"] = str(payload.get("memo", "")).strip()
if not updates:
raise ValueError("aliases, keywords, memo 중 수정할 값이 필요합니다")
before = {key: entry.get(key) for key in updates}
entry.update(updates)
self._put("knowledge_entries", entry_id, entry)
self.add_audit_event(
"rights.ops",
"Knowledge entry updated",
str(entry.get("name", entry_id)),
f"{json.dumps(before, ensure_ascii=False)} -> {json.dumps(updates, ensure_ascii=False)}",
)
return self.bootstrap()
def deactivate_knowledge_entry(self, entry_id: str, reason: str = "") -> dict[str, Any]:
entry = self._get("knowledge_entries", entry_id)
if entry.get("entryStatus", "confirmed") != "confirmed":
raise ValueError("확정 DB 항목만 비활성화할 수 있습니다")
if not entry.get("active", False):
raise ValueError("이미 비활성 상태입니다")
entry["active"] = False
entry["deactivatedAt"] = _now_label()
entry["deactivatedBy"] = "rights.ops"
entry["deactivatedReason"] = reason.strip()
self._put("knowledge_entries", entry_id, entry)
self.add_audit_event(
"rights.ops",
"Knowledge entry deactivated",
str(entry.get("name", entry_id)),
reason.strip() or "운영자 비활성화",
)
return self.bootstrap()
def reactivate_knowledge_entry(self, entry_id: str, reason: str) -> dict[str, Any]:
if not reason.strip():
raise ValueError("재활성에는 사유 메모가 필요합니다")
entry = self._get("knowledge_entries", entry_id)
if entry.get("entryStatus", "confirmed") != "confirmed":
raise ValueError("확정 DB 항목만 재활성화할 수 있습니다")
if entry.get("active", False):
raise ValueError("이미 활성 상태입니다")
entry["active"] = True
entry["reactivatedAt"] = _now_label()
entry["reactivatedBy"] = "rights.ops"
entry["reactivatedReason"] = reason.strip()
self._put("knowledge_entries", entry_id, entry)
self.add_audit_event(
"rights.ops",
"Knowledge entry reactivated",
str(entry.get("name", entry_id)),
reason.strip(),
)
return self.bootstrap()
def _create_or_update_watchlist_entry(
self,
submission_id: str,
decision: str,
memo: str,
image_store: LocalSubmissionImageStore | None,
*,
conn: sqlite3.Connection | None = None,
) -> None:
submission = self._get("submissions", submission_id, conn=conn)
evidence = self._evidence_by_submission().get(submission_id, [])
selected_evidence = _watchlist_source_evidence(evidence)
selected_evidence_ids = [str(item.get("id", "")) for item in selected_evidence if item.get("id")]
sample_fingerprints = self._watchlist_fingerprints(submission_id, image_store)
entry_id = _stable_id("kb-watchlist", submission_id)
try:
existing = self._get("knowledge_entries", entry_id, conn=conn)
except KeyError:
existing = {}
keywords = _unique_texts(
[
*[str(item) for item in submission.get("reasons", [])[:3]],
*[str(item.get("title", "")) for item in selected_evidence[:3]],
]
)
entry = {
**existing,
"id": entry_id,
"name": submission.get("derivedPreview", {}).get("entryName") or f"{submission.get('title', submission_id)} / {submission_id}",
"type": "rejected_image",
"aliases": _unique_texts([submission_id, str(submission.get("title", ""))]),
"keywords": keywords,
"memo": memo.strip() or ("보류 판정으로 자동 생성" if decision == "held" else "반려 판정으로 자동 생성"),
"provenance": "automatic",
"active": True,
"entryStatus": "watchlist",
"originDecisionStatus": decision,
"sourceDecision": f"DEC-{submission_id}",
"sourceSubmissionId": submission_id,
"sourceEvidenceIds": selected_evidence_ids,
"sampleFingerprints": sample_fingerprints or _text_list(existing.get("sampleFingerprints")),
"imageAsset": str(submission.get("asset", "")),
"imageFacts": submission.get("fileFacts", {}),
"contributionCount": int(existing.get("contributionCount", 0) or 0),
"matchedSubmissionIds": _text_list(existing.get("matchedSubmissionIds")),
"lastOriginDecisionAt": _now_label(),
}
self._put("knowledge_entries", entry_id, entry, conn=conn)
def _watchlist_fingerprints(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> list[str]:
if image_store is None:
return []
try:
fingerprints = FingerprintService().fingerprints_for(
image_store.image_payload(submission_id).content
)
except Exception:
return []
return [fingerprints.perceptual]
def rerun_enrichment(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
submission = self._get("submissions", submission_id)
score_before = int(submission.get("riskScore", 0) or 0)
evidence_before = {
str(item.get("id", "")): item
for item in self._evidence_by_submission().get(submission_id, [])
}
submission["lastAnalysis"] = _now_label()
self._put("submissions", submission_id, submission)
evidence = {
"id": f"ev-{submission_id}-rerun-{_timestamp_id()}",
"group": "internal",
"source": "fingerprint",
"title": "재분석 요청이 접수됨",
"confidence": 0,
"query": "",
"domain": "internal",
"url": "",
"retrievedAt": _now_label(),
"contributed": False,
"sourceEvidenceIds": [],
"status": "queued",
}
self._put("evidence", evidence["id"], {**evidence, "submission_id": submission_id})
if image_store is not None:
self._rerun_internal_analysis(submission_id, image_store)
google_evidence = self._rerun_google_image_search(submission_id, image_store)
query_source_evidence = [
*google_evidence,
*_submission_search_hint_evidence(submission),
]
self._auto_naver_search(submission_id, query_source_evidence, image_store)
self._auto_google_custom_search(submission_id, query_source_evidence, image_store)
self._ensure_llm_summary(submission_id)
self.add_audit_event("rights.ops", "Analysis run created", submission_id, "operator rerun")
self._rescore_submission(submission_id)
self._sync_submission_provider_state()
evidence_after = {
str(item.get("id", "")): item
for item in self._evidence_for_submission(submission_id)
}
rerun_marker_prefix = f"ev-{submission_id}-rerun-"
# LLM 요약은 재분석마다 삭제 후 재생성되어 id가 항상 바뀌므로(요약의
# source_evidence_ids에 타임스탬프 마커 id가 섞임) diff에 포함하면
# 변경이 없어도 매번 신규+제거로 잡힌다 — diff 대상에서 제외한다.
added_ids = [
evidence_id
for evidence_id in evidence_after
if evidence_id not in evidence_before
and not evidence_id.startswith(rerun_marker_prefix)
and str(evidence_after[evidence_id].get("source", "")) != "llm"
]
removed_items = [
evidence_before[evidence_id]
for evidence_id in evidence_before
if evidence_id not in evidence_after
and str(evidence_before[evidence_id].get("source", "")) != "llm"
]
refreshed = self._get("submissions", submission_id)
refreshed["lastRerunDiff"] = {
"at": _now_label(),
"scoreBefore": score_before,
"scoreAfter": int(refreshed.get("riskScore", 0) or 0),
"addedEvidenceIds": added_ids,
"removedEvidenceIds": [str(item.get("id", "")) for item in removed_items],
"removedSummaries": [
{"source": str(item.get("source", "")), "reason": str(item.get("title", ""))}
for item in removed_items
],
}
self._put("submissions", submission_id, refreshed)
return self.review(submission_id)
def run_auto_search(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
submission = self._get("submissions", submission_id)
submission["lastAnalysis"] = _now_label()
self._put("submissions", submission_id, submission)
existing_evidence = self._evidence_by_submission().get(submission_id, [])
query_source_evidence = [
_domain_evidence_from_ui(item)
for item in existing_evidence
if item.get("source") in {"google", "naver", "face", "fingerprint", "llm", "failure"}
]
query_source_evidence.extend(_submission_search_hint_evidence(submission))
self._auto_naver_search(submission_id, query_source_evidence, image_store)
self._auto_google_custom_search(submission_id, query_source_evidence, image_store)
self._ensure_llm_summary(submission_id)
self._rescore_submission(submission_id)
self._sync_submission_provider_state()
self.add_audit_event(
"rights.ops",
"Provider called",
f"auto-search / {submission_id}",
"operator request for auto text search",
)
return self.review(submission_id)
def manual_search(
self,
submission_id: str,
provider: str,
query: str,
image_store: LocalSubmissionImageStore | None = None,
) -> dict[str, Any]:
submission = self._get("submissions", submission_id)
provider_payload = self._get("providers", provider)
if not provider_payload["enabled"]:
raise ValueError(f"{provider} provider disabled")
domain_evidence, provider_call_count = self._manual_search_evidence(
submission_id,
provider,
query,
image_store,
)
for item in domain_evidence:
evidence = _evidence_payload(submission_id, item)
evidence["status"] = "manual"
self._put("evidence", evidence["id"], evidence)
submission.setdefault("queryHistory", []).insert(
0,
{
"provider": provider,
"query": query,
"status": "manual",
"timestamp": _now_label(),
"count": len(domain_evidence),
},
)
self._put("submissions", submission_id, submission)
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in domain_evidence
):
provider_payload["lastFailure"] = domain_evidence[0].reason if domain_evidence else f"{provider} search failed"
else:
provider_payload["lastSuccess"] = _now_label()
provider_payload["lastFailure"] = "없음"
self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)
self._ensure_llm_summary(submission_id)
if image_store is not None:
self._rescore_submission(submission_id)
self._sync_submission_provider_state()
self.add_audit_event("rights.ops", "Provider called", f"{provider} / {submission_id}", f"manual text query: {query}")
return self.review(submission_id)
def _ensure_llm_summaries_for_existing_source_evidence(self, queue_id: str | None = None) -> None:
for submission in self._all("submissions", queue_id=queue_id):
self._ensure_llm_summary(str(submission["id"]), only_if_missing=True)
def _ensure_llm_summary(self, submission_id: str, *, only_if_missing: bool = False) -> bool:
if self.provider_runtime.llm_assistant is None:
return False
llm_provider = self._get("providers", "llm")
if not llm_provider.get("enabled"):
return False
evidence_payloads = self._evidence_by_submission().get(submission_id, [])
if only_if_missing and any(
_evidence_matches_provider(item, "llm") and _provider_item_has_result(item)
for item in evidence_payloads
):
return False
source_evidence = [
_domain_evidence_from_ui(item)
for item in evidence_payloads
if item.get("source") in {"fingerprint", "face", "google", "naver"}
]
if not source_evidence:
return False
llm_evidence = self.provider_runtime.llm_assistant.summarize(
submission_id,
source_evidence,
)
self._delete_llm_summary_evidence(submission_id)
self._put(
"evidence",
_evidence_id(submission_id, llm_evidence),
_evidence_payload(submission_id, llm_evidence),
)
if llm_evidence.source == EvidenceSource.ENRICHMENT_FAILURE:
llm_provider["lastFailure"] = llm_evidence.reason
else:
llm_provider["lastSuccess"] = _now_label()
llm_provider["lastFailure"] = "없음"
self._apply_provider_usage_delta("llm", 1, llm_provider)
return True
def _delete_llm_summary_evidence(self, submission_id: str) -> None:
with self._connect() as conn:
conn.execute(
"""
delete from evidence
where submission_id = ?
and (
source = 'llm'
or (
source = 'failure'
and json_extract(payload, '$.title') like 'LLM assistance failed%'
)
)
""",
(submission_id,),
)
def _manual_search_evidence(
self,
submission_id: str,
provider: str,
query: str,
image_store: LocalSubmissionImageStore | None,
) -> tuple[list[Evidence], int]:
if provider == "naver":
if self.provider_runtime.naver_adapter is None:
raise ValueError(f"{provider} provider not connected")
promoter = SearchResultPromoter()
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
domain_evidence = self.provider_runtime.naver_adapter.search(
submission_id,
query,
self.provider_runtime.search_policy,
)
domain_evidence = promoter.promote(domain_evidence)
call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
similarity_evidence: list[Evidence] = []
if can_compare_search_images and image_store is not None:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="manual",
)
page_similarity_evidence: list[Evidence] = []
if can_compare_search_images and image_store is not None and not similarity_evidence:
page_evidence = self.provider_runtime.naver_adapter.search_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
call_count += 1
page_evidence = promoter.promote(page_evidence)
domain_evidence.extend(page_evidence)
page_similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
page_evidence,
image_store,
status="manual",
)
if (
can_compare_search_images
and image_store is not None
and not similarity_evidence
and not page_similarity_evidence
):
web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
call_count += 1
web_evidence = promoter.promote(web_evidence)
domain_evidence.extend(web_evidence)
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="manual",
)
return domain_evidence, call_count
if provider == "google_search":
if self.provider_runtime.google_custom_search_adapter is None:
raise ValueError(f"{provider} provider not connected")
can_compare_search_images = self._can_compare_search_result_images(
submission_id,
image_store,
)
domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
similarity_evidence: list[Evidence] = []
if can_compare_search_images and image_store is not None:
similarity_evidence = self._sync_search_result_image_similarity(
submission_id,
domain_evidence,
image_store,
status="manual",
)
if can_compare_search_images and image_store is not None and not similarity_evidence:
web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
submission_id,
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
call_count += 1
domain_evidence.extend(web_evidence)
self._sync_search_result_image_similarity(
submission_id,
web_evidence,
image_store,
status="manual",
)
return domain_evidence, call_count
raise ValueError(f"{provider} provider not connected")
def register_manual_knowledge_entry(self, payload: dict[str, Any]) -> dict[str, Any]:
name = str(payload.get("name", "")).strip()
if not name:
raise ValueError("knowledge name required")
entry_id = f"kb-manual-{_timestamp_id()}"
image_record = self._store_manual_knowledge_image(entry_id, payload.get("image"))
sample_fingerprints = [image_record["perceptualFingerprint"]] if image_record else []
entry = {
"id": entry_id,
"name": name,
"type": _knowledge_type_value(str(payload.get("type", "other"))),
"aliases": _text_list(payload.get("aliases")),
"keywords": _text_list(payload.get("keywords")),
"memo": str(payload.get("memo", "")).strip(),
"provenance": "manual",
"active": True,
"entryStatus": "confirmed",
"sourceDecision": "",
"sampleFingerprints": sample_fingerprints,
"imageAsset": image_record["asset"] if image_record else "",
"imageFacts": image_record["facts"] if image_record else {},
}
self._put("knowledge_entries", entry_id, entry)
self.add_audit_event(
"rights.ops",
"Knowledge entry manually created",
name,
"manual image reference" if image_record else "manual text reference",
)
return entry
def collect_keyword_candidates(self, query: str, provider: str = "naver") -> dict[str, Any]:
query = query.strip()
if not query:
raise ValueError("collection query required")
if provider not in {"naver", "google_search"}:
raise ValueError(f"{provider} provider not supported for candidate collection")
self._sync_provider_payloads()
provider_payload = self._get("providers", provider)
if not provider_payload["enabled"]:
raise ValueError(f"{provider} provider disabled")
if provider == "naver" and self.provider_runtime.naver_adapter is None:
raise ValueError("naver provider not connected")
if provider == "google_search" and self.provider_runtime.google_custom_search_adapter is None:
raise ValueError("google_search provider not connected")
self._clear_collection_candidates()
if provider == "google_search":
domain_evidence = self.provider_runtime.google_custom_search_adapter.search_images(
"candidate-collection",
query,
self.provider_runtime.google_custom_search_policy,
)
else:
domain_evidence = self.provider_runtime.naver_adapter.search(
"candidate-collection",
query,
self.provider_runtime.search_policy,
)
collected = 0
provider_call_count = 1 if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in domain_evidence) else 0
for candidate in self._collection_candidates_from_evidence(query, domain_evidence, provider):
self._put("collection_candidates", candidate["id"], candidate)
collected += 1
if provider == "google_search" and collected == 0:
web_evidence = self.provider_runtime.google_custom_search_adapter.search_web_pages(
"candidate-collection",
query,
self.provider_runtime.google_custom_search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
provider_call_count += 1
domain_evidence.extend(web_evidence)
for candidate in self._collection_candidates_from_evidence(query, web_evidence, provider):
self._put("collection_candidates", candidate["id"], candidate)
collected += 1
if provider == "naver" and collected == 0:
page_evidence = self.provider_runtime.naver_adapter.search_pages(
"candidate-collection",
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in page_evidence):
provider_call_count += 1
domain_evidence.extend(page_evidence)
for candidate in self._collection_candidates_from_evidence(query, page_evidence, provider):
self._put("collection_candidates", candidate["id"], candidate)
collected += 1
if provider == "naver" and collected == 0:
web_evidence = self.provider_runtime.naver_adapter.search_web_pages(
"candidate-collection",
query,
self.provider_runtime.search_policy,
)
if any(item.source != EvidenceSource.SEARCH_SKIPPED for item in web_evidence):
provider_call_count += 1
domain_evidence.extend(web_evidence)
for candidate in self._collection_candidates_from_evidence(query, web_evidence, provider):
self._put("collection_candidates", candidate["id"], candidate)
collected += 1
if any(
item.source in {EvidenceSource.SEARCH_SKIPPED, EvidenceSource.ENRICHMENT_FAILURE}
for item in domain_evidence
):
provider_payload["lastFailure"] = domain_evidence[0].reason if domain_evidence else f"{provider} collection failed"
else:
provider_payload["lastSuccess"] = _now_label()
provider_payload["lastFailure"] = "없음"
self._apply_provider_usage_delta(provider, provider_call_count, provider_payload)
self.add_audit_event("rights.ops", "Keyword candidates collected", provider, f"{query} · {collected} candidates")
payload = self.bootstrap()
payload["collected"] = collected
return payload
def promote_collection_candidate(self, candidate_id: str, payload: dict[str, Any]) -> dict[str, Any]:
with self._write_lock:
candidate = self._get("collection_candidates", candidate_id)
# Idempotent: a double-click / retry must not create a second
# confirmed knowledge entry for the same candidate.
if candidate.get("status") == "promoted" and candidate.get("promotedKnowledgeId"):
return self.bootstrap()
# Deterministic id so a racing retry upserts the same row instead of
# minting a new timestamped id.
entry_id = _stable_id("kb-candidate", candidate_id)
query = str(candidate.get("query", ""))
name = str(payload.get("name", "")).strip() or str(candidate.get("title", "")).strip() or query
memo = str(payload.get("memo", "")).strip() or f"키워드 후보 수집에서 편입: {query}"
entry = {
"id": entry_id,
"name": name,
"type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))),
"aliases": _text_list(payload.get("aliases")) or [query],
"keywords": _text_list(payload.get("keywords")) or [query],
"memo": memo,
"provenance": "manual",
"active": True,
"entryStatus": "confirmed",
"sourceDecision": "",
"sourceCandidate": candidate_id,
"sampleFingerprints": _text_list(candidate.get("sampleFingerprints")),
"imageAsset": str(candidate.get("imageAsset", "")),
"imageFacts": candidate.get("imageFacts", {}),
}
with self._transaction() as conn:
self._put("knowledge_entries", entry_id, entry, conn=conn)
candidate["status"] = "promoted"
candidate["promotedKnowledgeId"] = entry_id
self._put("collection_candidates", candidate_id, candidate, conn=conn)
self.add_audit_event(
"rights.ops", "Knowledge entry manually created", name,
f"promoted candidate {candidate_id}", conn=conn,
)
return self.bootstrap()
def promote_collection_candidates(self, payload: dict[str, Any]) -> dict[str, Any]:
candidate_ids = _unique_texts(_text_list(payload.get("candidate_ids", payload.get("candidateIds"))))
if not candidate_ids:
raise ValueError("candidate_ids required")
candidates = [self._get("collection_candidates", candidate_id) for candidate_id in candidate_ids]
sample_fingerprints = _unique_texts(
fingerprint
for candidate in candidates
for fingerprint in _text_list(candidate.get("sampleFingerprints"))
)
if not sample_fingerprints:
raise ValueError("selected candidates have no sample fingerprints")
entry_id = f"kb-candidate-{_timestamp_id()}"
queries = _unique_texts(str(candidate.get("query", "")) for candidate in candidates)
image_assets = _unique_texts(str(candidate.get("imageAsset", "")) for candidate in candidates)
image_facts = [
candidate.get("imageFacts", {})
for candidate in candidates
if isinstance(candidate.get("imageFacts", {}), dict) and candidate.get("imageFacts", {})
]
fallback_name = next(
(
str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip()
for candidate in candidates
if str(candidate.get("title", "")).strip() or str(candidate.get("query", "")).strip()
),
"Collected reference",
)
name = str(payload.get("name", "")).strip() or fallback_name
memo = str(payload.get("memo", "")).strip() or f"Promoted from collected candidates: {', '.join(queries)}"
entry = {
"id": entry_id,
"name": name,
"type": _knowledge_type_value(str(payload.get("type", "rejected_reference"))),
"aliases": _text_list(payload.get("aliases")) or queries,
"keywords": _text_list(payload.get("keywords")) or queries,
"memo": memo,
"provenance": "manual",
"active": True,
"entryStatus": "confirmed",
"sourceDecision": "",
"sourceCandidate": candidate_ids[0] if len(candidate_ids) == 1 else "",
"sourceCandidates": candidate_ids,
"sampleFingerprints": sample_fingerprints,
"imageAsset": image_assets[0] if image_assets else "",
"imageAssets": image_assets,
"imageFacts": {
"samples": len(candidates),
"queries": queries,
"items": image_facts,
},
}
self._put("knowledge_entries", entry_id, entry)
for candidate in candidates:
candidate["status"] = "promoted"
candidate["promotedKnowledgeId"] = entry_id
self._put("collection_candidates", str(candidate["id"]), candidate)
self.add_audit_event(
"rights.ops",
"Knowledge entry manually created",
name,
f"promoted {len(candidates)} collected candidates",
)
return self.bootstrap()

View file

@ -254,6 +254,10 @@ def _srcset_descriptor_score(value: str) -> float:
return 0.0
def _is_generic_data_image_attr(name: str, value: str) -> bool:
return bool(_data_attribute_image_urls(name, value))
def _data_attribute_image_urls(
name: str,
value: str,

View file

@ -1,308 +0,0 @@
"""SQLite persistence primitives for CopyrighterStore, as a mixin.
Connection/transaction management and the generic _put/_get/_all row access plus
the evidence-read helpers. Mixed into CopyrighterStore; methods rely on the host
class providing `self.db_path` and `self._write_lock`. Behavior unchanged.
"""
from __future__ import annotations
import json
import sqlite3
from contextlib import contextmanager
from typing import Any
from rights_filter.server.store_serialization import _validate_payload, _validate_table
class StorePersistenceMixin:
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.execute("pragma foreign_keys = on")
# The server is a ThreadingHTTPServer: concurrent operator requests open
# their own connection. Without a busy timeout the second writer raises
# "database is locked" immediately; wait for the lock instead.
conn.execute("pragma busy_timeout = 5000")
conn.row_factory = sqlite3.Row
return conn
@contextmanager
def _conn_ctx(self, conn: sqlite3.Connection | None = None):
# When a caller passes an open connection (inside _transaction), reuse it
# and let the caller own commit/rollback. Otherwise behave exactly like
# `with self._connect() as conn:` — sqlite3 commits on success / rolls
# back on error.
if conn is not None:
yield conn
return
with self._connect() as owned:
yield owned
@contextmanager
def _transaction(self):
# A single connection + single commit for multi-step writes that must be
# atomic (e.g. a state change and its audit event), so a mid-sequence
# failure leaves no partial state.
conn = self._connect()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _put(
self,
table: str,
id_value: str,
payload: dict[str, Any],
*,
conn: sqlite3.Connection | None = None,
) -> None:
_validate_payload(table, id_value, payload)
payload_json = json.dumps(payload, ensure_ascii=False)
with self._conn_ctx(conn) as conn:
if table == "submissions":
conn.execute(
"""
insert into submissions
(id, title, risk_score, risk_band, decision_status, submitted_epoch, queue_id, payload)
values (?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
title = excluded.title,
risk_score = excluded.risk_score,
risk_band = excluded.risk_band,
decision_status = excluded.decision_status,
submitted_epoch = excluded.submitted_epoch,
queue_id = excluded.queue_id,
payload = excluded.payload
""",
(
id_value,
str(payload.get("title", "")),
int(payload.get("riskScore", 0) or 0),
str(payload.get("riskBand", "low")),
str(payload.get("decisionStatus", "unreviewed")),
int(payload.get("submittedEpoch", 0) or 0),
str(payload.get("queueId", "")),
payload_json,
),
)
return
if table == "evidence":
conn.execute(
"""
insert into evidence
(id, submission_id, source, confidence, status, contributed, payload)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
submission_id = excluded.submission_id,
source = excluded.source,
confidence = excluded.confidence,
status = excluded.status,
contributed = excluded.contributed,
payload = excluded.payload
""",
(
id_value,
str(payload["submission_id"]),
str(payload.get("source", "")),
# Clamp to the column's CHECK (confidence >= 0 and <= 1).
# Some sources (e.g. Google Vision web-entity relevance
# scores) are unbounded and would raise IntegrityError.
max(0.0, min(1.0, float(payload.get("confidence", 0) or 0))),
str(payload.get("status", "active")),
1 if bool(payload.get("contributed", True)) else 0,
payload_json,
),
)
return
if table == "providers":
conn.execute(
"""
insert into providers
(id, name, enabled, usage, quota, payload)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set
name = excluded.name,
enabled = excluded.enabled,
usage = excluded.usage,
quota = excluded.quota,
payload = excluded.payload
""",
(
id_value,
str(payload.get("name", "")),
1 if bool(payload.get("enabled", False)) else 0,
int(payload.get("usage", 0) or 0),
int(payload.get("quota", 0) or 0),
payload_json,
),
)
return
if table == "knowledge_entries":
conn.execute(
"""
insert into knowledge_entries
(id, name, entry_type, entry_status, active, source_submission_id, payload)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
name = excluded.name,
entry_type = excluded.entry_type,
entry_status = excluded.entry_status,
active = excluded.active,
source_submission_id = excluded.source_submission_id,
payload = excluded.payload
""",
(
id_value,
str(payload.get("name", "")),
str(payload.get("type", "other")),
str(payload.get("entryStatus", "confirmed")),
1 if bool(payload.get("active", True)) else 0,
str(payload.get("sourceSubmissionId", "")),
payload_json,
),
)
return
if table == "collection_candidates":
conn.execute(
"""
insert into collection_candidates
(id, provider, query, status, collected_epoch, payload)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set
provider = excluded.provider,
query = excluded.query,
status = excluded.status,
collected_epoch = excluded.collected_epoch,
payload = excluded.payload
""",
(
id_value,
str(payload.get("provider", "")),
str(payload.get("query", "")),
str(payload.get("status", "candidate")),
int(payload.get("collectedEpoch", 0) or 0),
payload_json,
),
)
return
if table == "corrections":
conn.execute(
"""
insert into corrections
(id, decision_id, status, payload)
values (?, ?, ?, ?)
on conflict(id) do update set
decision_id = excluded.decision_id,
status = excluded.status,
payload = excluded.payload
""",
(
id_value,
str(payload.get("decisionId", payload.get("decision_id", ""))),
str(payload.get("status", "")),
payload_json,
),
)
return
conn.execute(
f"insert or replace into {table} (id, payload) values (?, ?)",
(id_value, payload_json),
)
def _get(
self,
table: str,
id_value: str,
*,
conn: sqlite3.Connection | None = None,
) -> dict[str, Any]:
_validate_table(table)
with self._conn_ctx(conn) as conn:
row = conn.execute(f"select payload from {table} where id = ?", (id_value,)).fetchone()
if row is None:
raise KeyError(id_value)
return json.loads(row["payload"])
def _apply_provider_usage_delta(
self, provider_id: str, count: int, payload: dict[str, Any]
) -> None:
# Re-read current usage under the write lock and apply the delta so
# concurrent provider calls don't lose increments. The rest of `payload`
# (lastSuccess / lastFailure) is last-writer-wins, which is acceptable for
# status fields.
with self._write_lock:
current = self._get("providers", provider_id)
payload["usage"] = int(current.get("usage", 0) or 0) + int(count)
self._put("providers", provider_id, payload)
def _clear_collection_candidates(self) -> None:
with self._connect() as conn:
conn.execute("delete from collection_candidates")
def _all(self, table: str, queue_id: str | None = None) -> list[dict[str, Any]]:
_validate_table(table)
with self._connect() as conn:
if table == "submissions" and queue_id is not None:
rows = conn.execute(
"select payload from submissions where queue_id = ? order by id",
(queue_id,),
).fetchall()
else:
rows = conn.execute(f"select payload from {table} order by id").fetchall()
values = [json.loads(row["payload"]) for row in rows]
if table == "providers":
order = {"internal": 0, "naver": 1, "google": 2, "google_search": 3, "llm": 4}
values.sort(key=lambda item: order.get(item["id"], 99))
if table == "collection_candidates":
values.sort(key=lambda item: int(item.get("collectedEpoch", 0)), reverse=True)
return values
def _evidence_rows(self, queue_id: str | None = None):
with self._connect() as conn:
if queue_id is None:
rows = conn.execute("select submission_id, payload from evidence order by id").fetchall()
else:
rows = conn.execute(
"""
select e.submission_id, e.payload
from evidence e
join submissions s on s.id = e.submission_id
where s.queue_id = ?
order by e.id
""",
(queue_id,),
).fetchall()
return rows
def _evidence_for_submission(self, submission_id: str) -> list[dict[str, Any]]:
# Read only this submission's evidence. _rescore_submission previously
# called _evidence_by_submission(), which read and JSON-parsed the ENTIRE
# evidence table on every single-submission rescore (a hot path inside the
# seed / auto-search / similarity loops).
with self._connect() as conn:
rows = conn.execute(
"select payload from evidence where submission_id = ? order by id",
(submission_id,),
).fetchall()
evidence: list[dict[str, Any]] = []
for row in rows:
payload = json.loads(row["payload"])
payload.pop("submission_id", None)
evidence.append(payload)
return evidence
def _evidence_by_submission(self, queue_id: str | None = None) -> dict[str, list[dict[str, Any]]]:
rows = self._evidence_rows(queue_id=queue_id)
grouped: dict[str, list[dict[str, Any]]] = {}
for row in rows:
payload = json.loads(row["payload"])
payload.pop("submission_id", None)
grouped.setdefault(row["submission_id"], []).append(payload)
return grouped

View file

@ -1,162 +0,0 @@
"""Submission-queue management for CopyrighterStore, as a mixin.
Queue id derivation, active-queue selection, legacy-submission migration, and
the active-queue bootstrap/ensure helpers. Mixed into CopyrighterStore; relies on
persistence methods and self.* attributes provided by the host class. Behavior
unchanged.
"""
from __future__ import annotations
import hashlib
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any
from rights_filter.server.image_store import LocalSubmissionImageStore
class StoreQueueMixin:
@staticmethod
def _queue_id_for_path(folder_path: Path) -> str:
return f"queue-{hashlib.sha1(str(folder_path).encode('utf-8')).hexdigest()[:16]}"
@staticmethod
def _normalize_queue_folder(folder_path: Path | str) -> Path:
return Path(folder_path).resolve()
def _queue_row_by_id(self, queue_id: str) -> dict[str, Any] | None:
with self._connect() as conn:
row = conn.execute(
"""
select id, folder_path, label, is_active, created_at, created_epoch,
last_imported_epoch, last_imported_at
from submission_queues
where id = ?
""",
(queue_id,),
).fetchone()
if row is None:
return None
return {
"id": row["id"],
"folderPath": row["folder_path"],
"label": row["label"],
"isActive": bool(row["is_active"]),
"createdAt": row["created_at"],
"createdEpoch": row["created_epoch"],
"lastImportedAt": row["last_imported_at"],
"lastImportedEpoch": row["last_imported_epoch"],
}
def _migrate_legacy_submissions_to_queue(self, queue_id: str, conn: sqlite3.Connection | None = None) -> None:
should_close = conn is None
if conn is None:
conn = self._connect()
try:
conn.execute(
"update submissions set queue_id = ? where queue_id = ''",
(queue_id,),
)
if should_close:
conn.commit()
finally:
if should_close:
conn.close()
def bootstrap_active_queue(self) -> dict[str, Any] | None:
with self._connect() as conn:
row = conn.execute(
"""
select id, folder_path, label, is_active, created_at, created_epoch,
last_imported_epoch, last_imported_at
from submission_queues
where is_active = 1
limit 1
""",
).fetchone()
if row is None:
return None
return {
"id": row["id"],
"folderPath": row["folder_path"],
"label": row["label"],
"isActive": bool(row["is_active"]),
"createdAt": row["created_at"],
"createdEpoch": row["created_epoch"],
"lastImportedAt": row["last_imported_at"],
"lastImportedEpoch": row["last_imported_epoch"],
}
def ensure_queue(self, folder_path: Path | str, label: str | None = None) -> dict[str, Any]:
folder = self._normalize_queue_folder(folder_path)
queue_id = self._queue_id_for_path(folder)
now = datetime.now()
queue_label = str(label or "").strip() or folder.name
with self._connect() as conn:
existing = conn.execute(
"select id from submission_queues where id = ? or folder_path = ?",
(queue_id, str(folder)),
).fetchone()
if existing is None:
conn.execute(
"""
insert into submission_queues (
id, folder_path, label, is_active, created_at, created_epoch, last_imported_epoch, last_imported_at
) values (?, ?, ?, 1, ?, ?, 0, '')
""",
(queue_id, str(folder), queue_label, now.isoformat(" ", "seconds"), int(now.timestamp())),
)
else:
conn.execute(
"update submission_queues set is_active = 1, folder_path = ?, label = ? where id = ?",
(str(folder), queue_label, queue_id),
)
conn.execute("update submission_queues set is_active = 0 where id != ?", (queue_id,))
if self._has_queueless_submissions(conn=conn):
self._migrate_legacy_submissions_to_queue(queue_id, conn=conn)
queue_row = conn.execute(
"""
select id, folder_path, label, is_active, created_at, created_epoch,
last_imported_epoch, last_imported_at
from submission_queues
where id = ?
""",
(queue_id,),
).fetchone()
if queue_row is None:
return self._queue_row_by_id(queue_id)
return {
"id": queue_row["id"],
"folderPath": queue_row["folder_path"],
"label": queue_row["label"],
"isActive": bool(queue_row["is_active"]),
"createdAt": queue_row["created_at"],
"createdEpoch": queue_row["created_epoch"],
"lastImportedAt": queue_row["last_imported_at"],
"lastImportedEpoch": queue_row["last_imported_epoch"],
}
def active_submission_image_store(self, fallback_root: Path | str | None = None) -> LocalSubmissionImageStore:
queue = self.bootstrap_active_queue()
if queue is None:
if fallback_root is None:
raise ValueError("no active submission queue is configured")
return LocalSubmissionImageStore(fallback_root)
return LocalSubmissionImageStore(queue["folderPath"])
def _has_queueless_submissions(self, conn: sqlite3.Connection | None = None) -> bool:
should_close = conn is None
if conn is None:
conn = self._connect()
try:
count_row = conn.execute(
"select count(*) as count from submissions where queue_id = ''"
).fetchone()
return bool(int(count_row["count"]) > 0) if count_row else False
finally:
if should_close:
conn.close()

View file

@ -1,736 +0,0 @@
"""Search-result image similarity, candidate-image storage, the in-memory
knowledge repository, and rescoring as a mixin for CopyrighterStore.
Mixed into CopyrighterStore; relies on persistence methods (self._put/_get/...),
self.* attributes, and the extracted helper modules. Behavior unchanged.
"""
from __future__ import annotations
import base64
from datetime import datetime
from pathlib import Path
from typing import Any
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.analysis.risk_scoring import RiskScorer
from rights_filter.domain.records import (
Evidence,
EvidenceSource,
InMemoryRightsFilterRepository,
KnowledgeBaseEntry,
)
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.store_page_scrape import (
_content_has_comparable_image_fingerprint,
_extract_css_image_urls,
_extract_page_image_urls,
_extract_page_stylesheet_urls,
_normalized_remote_image_url,
_search_result_direct_image_urls,
)
from rights_filter.server.store_remote_fetch import _fetch_url_bytes
from rights_filter.server.store_serialization import (
_domain_evidence_from_ui,
_evidence_id,
_evidence_payload,
_image_size_from_bytes,
_image_suffix_from_url,
_knowledge_entry_type,
_knowledge_provenance,
_now_label,
_safe_filename,
_safe_image_suffix,
_stable_id,
_strip_html,
)
from rights_filter.server.store_text import _text_list, _unique_texts
from rights_filter.server.store_url_utils import _is_http_url, _url_looks_like_image
class StoreSearchCandidatesMixin:
def _knowledge_repository(self) -> InMemoryRightsFilterRepository:
repository = InMemoryRightsFilterRepository()
for payload in self._all("knowledge_entries"):
if not payload.get("active", True):
continue
if payload.get("entryStatus") == "excluded":
continue
sample_fingerprints = _text_list(
payload.get("sampleFingerprints", payload.get("sample_fingerprints", []))
)
if not sample_fingerprints:
continue
repository.save_knowledge_entry(
KnowledgeBaseEntry(
id=str(payload.get("id", "")),
entry_type=_knowledge_entry_type(str(payload.get("type", "other"))),
name=str(payload.get("name", "")),
provenance=_knowledge_provenance(str(payload.get("provenance", "manual"))),
aliases=_text_list(payload.get("aliases")),
related_keywords=_text_list(payload.get("keywords")),
policy_memo=str(payload.get("memo", "")),
sample_fingerprints=sample_fingerprints,
source_decision_id=str(payload.get("sourceDecision", "")) or None,
entry_status=str(payload.get("entryStatus", "confirmed")),
source_submission_id=str(payload.get("sourceSubmissionId", "")),
active=bool(payload.get("active", True)),
)
)
return repository
def _sync_similar_reference_images(
self,
submission_id: str,
evidence: list[Evidence],
) -> None:
matched_entry_ids = [
str(item.data.get("knowledge_entry_id", ""))
for item in evidence
if item.source == EvidenceSource.FINGERPRINT and item.data.get("knowledge_entry_id")
]
if not matched_entry_ids:
return
submission = self._get("submissions", submission_id)
similar = list(submission.get("similar", []))
existing_assets = {str(item.get("asset", "")) for item in similar}
for entry_id in matched_entry_ids:
try:
entry = self._get("knowledge_entries", entry_id)
except KeyError:
continue
asset = str(entry.get("imageAsset", ""))
if not asset or asset in existing_assets:
continue
similar.append(
{
"asset": asset,
"label": f"{entry.get('name', entry_id)} / internal match",
}
)
existing_assets.add(asset)
submission["similar"] = similar
self._put("submissions", submission_id, submission)
def _sync_search_result_image_similarity(
self,
submission_id: str,
evidence: list[Evidence],
image_store: LocalSubmissionImageStore,
status: str = "active",
max_matches: int | None = None,
) -> list[Evidence]:
submission_fingerprint = self._submission_perceptual_fingerprint(
submission_id,
image_store,
)
if submission_fingerprint is None:
return []
if max_matches is None:
max_matches = self.provider_runtime.search_result_compare_limit
else:
max_matches = min(
max_matches,
self.provider_runtime.search_result_compare_limit,
)
if max_matches <= 0:
return []
similarity_evidence: list[Evidence] = []
for item in evidence:
if len(similarity_evidence) >= max_matches:
break
matches = self._search_result_image_similarity_evidence(
submission_id,
submission_fingerprint,
item,
)
if not matches:
continue
for match in matches:
if len(similarity_evidence) >= max_matches:
break
payload = _evidence_payload(submission_id, match)
payload["status"] = status
self._put("evidence", payload["id"], payload)
similarity_evidence.append(match)
if similarity_evidence:
self._rescore_submission(submission_id)
return similarity_evidence
def _can_compare_search_result_images(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> bool:
if image_store is None:
return False
return self._submission_perceptual_fingerprint(submission_id, image_store) is not None
def _search_result_similarity_count(self, submission_id: str) -> int:
return sum(
1
for item in self._evidence_by_submission().get(submission_id, [])
if item.get("source") == "fingerprint"
and str(item.get("matchType") or "").startswith("search_result")
)
def _search_result_similarity_remaining_budget(
self,
submission_id: str,
image_store: LocalSubmissionImageStore | None,
) -> int:
if not self._can_compare_search_result_images(submission_id, image_store):
return 0
return max(
0,
self.provider_runtime.search_result_compare_limit
- self._search_result_similarity_count(submission_id),
)
def _submission_perceptual_fingerprint(
self,
submission_id: str,
image_store: LocalSubmissionImageStore,
) -> str | None:
try:
fingerprint = FingerprintService().fingerprints_for(
image_store.image_payload(submission_id).content
).perceptual
except Exception:
return None
if fingerprint.startswith("phash:unavailable:"):
return None
return fingerprint
def _search_result_image_similarity_evidence(
self,
submission_id: str,
submission_fingerprint: str,
source_evidence: Evidence,
) -> list[Evidence]:
if source_evidence.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}:
return []
if source_evidence.data.get("weak_hint"):
return []
matches: list[Evidence] = []
for image_url in _unique_texts(
[
str(source_evidence.data.get("image_url", "")),
str(source_evidence.data.get("thumbnail_url", "")),
]
):
match = self._search_result_candidate_image_evidence(
submission_id,
submission_fingerprint,
source_evidence,
image_url,
match_type="search_result_image",
candidate_source="result_image_url",
)
if match is not None:
return [match]
for image_url in _search_result_direct_image_urls(source_evidence):
match = self._search_result_candidate_image_evidence(
submission_id,
submission_fingerprint,
source_evidence,
image_url,
match_type="search_result_page_image",
candidate_source="result_page_direct_image",
)
if match is not None:
return [match]
for image_url in _unique_texts(source_evidence.data.get("page_image_urls", [])):
match = self._search_result_candidate_image_evidence(
submission_id,
submission_fingerprint,
source_evidence,
image_url,
match_type="search_result_page_image",
candidate_source="provider_page_image",
)
if match is not None:
return [match]
for image_url, candidate_source in self._search_result_page_image_candidates(source_evidence):
match = self._search_result_candidate_image_evidence(
submission_id,
submission_fingerprint,
source_evidence,
image_url,
match_type="search_result_page_image",
candidate_source=candidate_source,
)
if match is not None:
return [match]
return matches
def _face_crop_search_result_similarity_evidence(
self,
submission_id: str,
crop_index: int,
crop: Any,
source_evidence: Evidence,
) -> list[Evidence]:
try:
crop_fingerprint = FingerprintService().fingerprints_for(crop.content).perceptual
except Exception:
return []
if crop_fingerprint.startswith("phash:unavailable:"):
return []
matches: list[Evidence] = []
extra_data = {
"face_crop_search": True,
"crop_index": crop_index,
"weak_hint": True,
"privacy_note": "얼굴 영역만 웹 탐지한 참고 근거이며 동일인 판정이 아닙니다.",
}
for image_url in _unique_texts(
[
str(source_evidence.data.get("image_url", "")),
str(source_evidence.data.get("thumbnail_url", "")),
]
):
match = self._search_result_candidate_image_evidence(
submission_id,
crop_fingerprint,
source_evidence,
image_url,
match_type="face_crop_search_result_image",
candidate_source="face_crop_result_image_url",
extra_data=extra_data,
)
if match is not None:
return [match]
for image_url in _search_result_direct_image_urls(source_evidence):
match = self._search_result_candidate_image_evidence(
submission_id,
crop_fingerprint,
source_evidence,
image_url,
match_type="face_crop_search_result_page_image",
candidate_source="face_crop_result_page_direct_image",
extra_data=extra_data,
)
if match is not None:
return [match]
for image_url in _unique_texts(source_evidence.data.get("page_image_urls", [])):
match = self._search_result_candidate_image_evidence(
submission_id,
crop_fingerprint,
source_evidence,
image_url,
match_type="face_crop_search_result_page_image",
candidate_source="face_crop_provider_page_image",
extra_data=extra_data,
)
if match is not None:
return [match]
for image_url, candidate_source in self._search_result_page_image_candidates(source_evidence):
match = self._search_result_candidate_image_evidence(
submission_id,
crop_fingerprint,
source_evidence,
image_url,
match_type="face_crop_search_result_page_image",
candidate_source=f"face_crop_{candidate_source}",
extra_data=extra_data,
)
if match is not None:
return [match]
return matches
def _search_result_candidate_image_evidence(
self,
submission_id: str,
submission_fingerprint: str,
source_evidence: Evidence,
image_url: str,
match_type: str,
candidate_source: str,
extra_data: dict[str, Any] | None = None,
) -> Evidence | None:
image_url = _normalized_remote_image_url(image_url)
result_url = str(
source_evidence.data.get("result_url", source_evidence.data.get("url", ""))
or image_url
)
image_id = _stable_id(
"searchimg",
submission_id,
str(source_evidence.source),
match_type,
image_url,
str(source_evidence.data.get("query", "")),
)
image_record = self._store_candidate_image(image_id, image_url, referer_url=result_url)
if not image_record:
return None
similarity = FingerprintService().similarity(
submission_fingerprint,
str(image_record["perceptualFingerprint"]),
)
if similarity < self.provider_runtime.search_result_similarity_threshold:
return None
return Evidence(
source=EvidenceSource.FINGERPRINT,
reason=f"Search result image similarity {similarity:.2f}",
confidence=similarity,
data={
"submission_id": submission_id,
"provider": source_evidence.data.get("provider", ""),
"query": source_evidence.data.get("query", ""),
"query_signature": source_evidence.data.get("query_signature", ""),
"query_strategy": source_evidence.data.get("query_strategy", ""),
"query_source": source_evidence.data.get("query_source", ""),
"url": result_url,
"result_url": result_url,
"image_url": image_record["asset"],
"thumbnail_url": image_record["asset"],
"remote_image_url": image_url,
"source_page_url": result_url,
"image_candidate_source": candidate_source,
"page_title": source_evidence.data.get("page_title", source_evidence.data.get("title", "")),
"match": match_type,
"similarity": similarity,
"source_evidence_ids": [_evidence_id(submission_id, source_evidence)],
"contributed": True,
**(extra_data or {}),
},
)
def _search_result_page_image_candidates(self, source_evidence: Evidence) -> list[tuple[str, str]]:
page_url = str(
source_evidence.data.get("result_url", source_evidence.data.get("url", ""))
)
limit = getattr(self.provider_runtime, "search_result_page_image_limit", 3)
if not page_url or limit <= 0 or not _is_http_url(page_url):
return []
if _url_looks_like_image(page_url):
return []
try:
content = self.page_fetcher(page_url)
except Exception:
return []
if _content_has_comparable_image_fingerprint(content):
return [(page_url, "result_page_direct_image")]
image_urls = _extract_page_image_urls(content, page_url, limit)
if len(image_urls) < limit:
image_urls.extend(
self._search_result_stylesheet_image_urls(
content,
page_url,
limit - len(image_urls),
)
)
return [(image_url, "html_page_image") for image_url in _unique_texts(image_urls)[:limit]]
def _search_result_stylesheet_image_urls(
self,
page_content: bytes,
page_url: str,
limit: int,
) -> list[str]:
if limit <= 0:
return []
image_urls: list[str] = []
for stylesheet_url in _extract_page_stylesheet_urls(page_content, page_url, limit):
try:
stylesheet_content = self.stylesheet_fetcher(stylesheet_url)
except Exception:
continue
for image_url in _extract_css_image_urls(stylesheet_content, stylesheet_url, limit - len(image_urls)):
image_urls.append(image_url)
if len(image_urls) >= limit:
return image_urls
return image_urls
def _increment_knowledge_contribution_counts(
self,
submission_id: str,
evidence: list[Evidence],
) -> None:
matched_entry_ids = _unique_texts(
str(item.data.get("knowledge_entry_id", ""))
for item in evidence
if item.source == EvidenceSource.FINGERPRINT
and item.data.get("knowledge_entry_status") == "watchlist"
and item.data.get("knowledge_entry_id")
)
for entry_id in matched_entry_ids:
try:
entry = self._get("knowledge_entries", entry_id)
except KeyError:
continue
if entry.get("entryStatus") != "watchlist":
continue
if str(entry.get("sourceSubmissionId", "")) == submission_id:
continue
matched_submission_ids = _text_list(entry.get("matchedSubmissionIds"))
if submission_id in matched_submission_ids:
continue
matched_submission_ids.append(submission_id)
entry["matchedSubmissionIds"] = matched_submission_ids
entry["contributionCount"] = int(entry.get("contributionCount", 0) or 0) + 1
entry["lastMatchedSubmissionId"] = submission_id
entry["lastMatchedAt"] = _now_label()
self._put("knowledge_entries", entry_id, entry)
def _store_manual_knowledge_image(
self,
entry_id: str,
image_payload: Any,
) -> dict[str, Any] | None:
if not image_payload:
return None
if not isinstance(image_payload, dict):
raise ValueError("knowledge image must be an object")
data = str(image_payload.get("data", ""))
if not data:
raise ValueError("knowledge image data required")
if "," in data and data.split(",", 1)[0].startswith("data:"):
data = data.split(",", 1)[1]
try:
content = base64.b64decode(data, validate=True)
except Exception as exc:
raise ValueError("knowledge image data must be base64") from exc
if not content:
raise ValueError("knowledge image is empty")
filename = str(image_payload.get("filename", "reference")).strip() or "reference"
suffix = _safe_image_suffix(filename, str(image_payload.get("content_type", "")))
safe_stem = _safe_filename(Path(filename).stem) or "reference"
target_name = f"{entry_id}-{safe_stem}{suffix}"
self.knowledge_image_dir.mkdir(parents=True, exist_ok=True)
root = self.knowledge_image_dir.resolve()
target = (root / target_name).resolve()
if target != root and root not in target.parents:
raise ValueError("knowledge image path points outside image store")
target.write_bytes(content)
width, height = _image_size_from_bytes(content)
fingerprints = FingerprintService().fingerprints_for(content)
return {
"asset": f"{self.knowledge_public_prefix}/{target_name}",
"perceptualFingerprint": fingerprints.perceptual,
"facts": {
"filename": filename,
"format": suffix.lstrip(".").upper(),
"size": f"{width} x {height}",
"fingerprints": 1,
},
}
def _collection_candidates_from_evidence(
self,
query: str,
evidence: list[Evidence],
provider: str,
) -> list[dict[str, Any]]:
candidates: list[dict[str, Any]] = []
for item in evidence:
if item.source not in {EvidenceSource.NAVER_SEARCH, EvidenceSource.WEB_DETECTION}:
continue
if item.data.get("image_url"):
candidate = self._candidate_payload_from_evidence(
query,
item,
provider,
source_candidate_type="search_result_image",
)
if candidate is not None:
candidates.append(candidate)
continue
candidate_count = len(candidates)
for image_url in _unique_texts(item.data.get("page_image_urls", [])):
candidate = self._candidate_payload_from_evidence(
query,
item,
provider,
image_url=image_url,
thumbnail_url=image_url,
source_candidate_type="provider_page_image",
)
if candidate is not None:
candidates.append(candidate)
if len(candidates) > candidate_count:
continue
for image_url in _search_result_direct_image_urls(item):
candidate = self._candidate_payload_from_evidence(
query,
item,
provider,
image_url=image_url,
thumbnail_url=image_url,
source_candidate_type="result_page_direct_image",
)
if candidate is not None:
candidates.append(candidate)
if len(candidates) > candidate_count:
continue
for image_url, source_candidate_type in self._search_result_page_image_candidates(item):
candidate = self._candidate_payload_from_evidence(
query,
item,
provider,
image_url=image_url,
thumbnail_url=image_url,
source_candidate_type=source_candidate_type,
)
if candidate is not None:
candidates.append(candidate)
break
return candidates
def _candidate_payload_from_evidence(
self,
query: str,
evidence: Evidence,
provider: str = "naver",
image_url: str | None = None,
thumbnail_url: str | None = None,
source_candidate_type: str = "search_result_image",
) -> dict[str, Any] | None:
image_url = _normalized_remote_image_url(
str(image_url if image_url is not None else evidence.data.get("image_url", ""))
)
thumbnail_url = _normalized_remote_image_url(
str(thumbnail_url if thumbnail_url is not None else evidence.data.get("thumbnail_url", ""))
)
result_url = str(evidence.data.get("result_url", ""))
candidate_id = _stable_id("cand", provider, source_candidate_type, query, image_url, thumbnail_url, result_url)
image_record = None
stored_image_url = ""
for candidate_url in _unique_texts([image_url, thumbnail_url]):
image_record = self._store_candidate_image(
candidate_id,
candidate_url,
referer_url=result_url,
)
if image_record is not None:
stored_image_url = candidate_url
break
if image_record is None:
return None
display_image_url = stored_image_url or image_url
return {
"id": candidate_id,
"provider": provider,
"query": query,
"title": _strip_html(str(evidence.data.get("title", ""))),
"status": "candidate",
"rank": evidence.data.get("rank", ""),
"imageUrl": display_image_url,
"thumbnailUrl": thumbnail_url,
"resultUrl": result_url,
"sourceUrl": result_url or display_image_url,
"sourceCandidateType": source_candidate_type,
"imageAsset": image_record["asset"],
"sampleFingerprints": [image_record["perceptualFingerprint"]],
"imageFacts": image_record["facts"],
"collectedAt": _now_label(),
"collectedEpoch": int(datetime.now().timestamp()),
"promotedKnowledgeId": "",
}
def _store_candidate_image(
self,
candidate_id: str,
url: str,
referer_url: str = "",
) -> dict[str, Any] | None:
if not url:
return None
suffix = _image_suffix_from_url(url)
target_name = f"{candidate_id}{suffix}"
root = self.collection_image_dir.resolve()
target = (root / target_name).resolve()
if target != root and root not in target.parents:
raise ValueError("candidate image path points outside image store")
if target.exists() and target.is_file():
try:
record = self._candidate_image_record_from_content(
target_name,
url,
suffix,
target.read_bytes(),
)
except Exception:
record = None
if record is not None:
return record
try:
content = self._fetch_candidate_image_content(url, referer_url)
except Exception:
return None
image_record = self._candidate_image_record_from_content(
target_name,
url,
suffix,
content,
)
if image_record is None:
return None
self.collection_image_dir.mkdir(parents=True, exist_ok=True)
target.write_bytes(content)
return image_record
def _candidate_image_record_from_content(
self,
target_name: str,
url: str,
suffix: str,
content: bytes,
) -> dict[str, Any] | None:
if not content:
return None
width, height = _image_size_from_bytes(content)
fingerprints = FingerprintService().fingerprints_for(content)
if fingerprints.perceptual.startswith("phash:unavailable:"):
return None
return {
"asset": f"{self.collection_public_prefix}/{target_name}",
"perceptualFingerprint": fingerprints.perceptual,
"facts": {
"source": url,
"format": suffix.lstrip(".").upper(),
"size": f"{width} x {height}",
"fingerprints": 1,
},
}
def _fetch_candidate_image_content(self, url: str, referer_url: str = "") -> bytes:
if self._custom_candidate_image_fetcher is not None:
return self._custom_candidate_image_fetcher(url)
return _fetch_url_bytes(url, referer_url=referer_url)
def _rescore_submission(self, submission_id: str) -> None:
submission = self._get("submissions", submission_id)
evidence = [
_domain_evidence_from_ui(item)
for item in self._evidence_for_submission(submission_id)
]
score = RiskScorer().score(evidence)
submission["riskScore"] = score.score
submission["riskBand"] = score.band
submission["reasons"] = score.reasons or ["분석 근거 없음"]
self._put("submissions", submission_id, submission)
def _rescore_all_submissions(self, queue_id: str | None = None) -> None:
for submission in self._all("submissions", queue_id=queue_id):
self._rescore_submission(str(submission["id"]))

View file

@ -1 +0,0 @@
"""Operator-GUI test package (enables relative imports of shared page objects)."""

View file

@ -1,105 +0,0 @@
"""Shared fixtures for operator-GUI tests.
The browser fixtures lazily import Playwright (via ``importorskip``) and skip
when Chromium is unavailable, so the non-browser static tests in this package
still collect and run on a host without Playwright installed.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from typing import Any, Iterator
import pytest
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT / "src") not in sys.path:
sys.path.insert(0, str(ROOT / "src"))
APP_DIR = ROOT / "web" / "operator-gui"
ARTIFACT_DIR = ROOT / "artifacts" / "operator-gui-e2e"
DEFAULT_VIEWPORT = {"width": 1280, "height": 900}
@dataclass
class OperatorServer:
"""A running operator server plus the paths a spec may need to assert on."""
base_url: str
image_root: Path
store: Any
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call): # noqa: ANN001 - pytest hook signature
"""Stash each phase's report so fixtures can screenshot on failure."""
outcome = yield
setattr(item, f"rep_{outcome.get_result().when}", outcome.get_result())
@pytest.fixture
def operator_server(tmp_path: Path) -> Iterator[OperatorServer]:
"""Build and serve the operator app against a fresh, isolated SQLite store."""
from rights_filter.server.http_app import build_server
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.sqlite_store import CopyrighterStore
image_root = tmp_path / "submissions"
image_root.mkdir()
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3")
store.initialize()
server = build_server(
host="127.0.0.1",
port=0,
store=store,
image_store=LocalSubmissionImageStore(image_root),
static_dir=APP_DIR,
)
thread = Thread(target=server.serve_forever, daemon=True)
thread.start()
try:
yield OperatorServer(
base_url=f"http://127.0.0.1:{server.server_port}",
image_root=image_root,
store=store,
)
finally:
server.shutdown()
thread.join(timeout=5)
@pytest.fixture
def browser() -> Iterator[Any]:
"""Launch headless Chromium, skipping cleanly when it is unavailable."""
playwright_api = pytest.importorskip("playwright.sync_api")
with playwright_api.sync_playwright() as pw:
try:
launched = pw.chromium.launch(headless=True)
except Exception as exc: # missing browser binaries, sandbox, etc.
pytest.skip(f"Playwright Chromium is not available: {exc}")
try:
yield launched
finally:
launched.close()
@pytest.fixture
def operator_page(browser: Any, request: pytest.FixtureRequest) -> Iterator[Any]:
"""A fresh page at the default viewport; screenshots to artifacts on failure."""
page = browser.new_page(viewport=DEFAULT_VIEWPORT)
try:
yield page
finally:
report = getattr(request.node, "rep_call", None)
if report is not None and report.failed:
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
safe_name = request.node.name.replace("/", "_").replace("::", "_")[:120]
try:
page.screenshot(path=str(ARTIFACT_DIR / f"{safe_name}.png"), full_page=True)
except Exception:
pass # screenshot is best-effort diagnostics, never fail teardown
page.close()

View file

@ -1,5 +0,0 @@
"""Page objects for operator-GUI browser (Playwright) tests."""
from .workbench_page import OperatorWorkbench
__all__ = ["OperatorWorkbench"]

View file

@ -1,151 +0,0 @@
"""Page Object Model for the operator workbench single-page app.
Encapsulates the operator-GUI selectors and interactions so browser specs read
as user intent ("select case", "use suggested query") instead of raw locators.
Mirrors the existing app structure in ``web/operator-gui/index.html``.
All locator accessors return Playwright ``Locator`` objects, which auto-wait;
prefer them over arbitrary timeouts (see the e2e-testing skill guidance).
"""
from __future__ import annotations
import json
from typing import Any
# Status string the app shows after a suggested query is loaded but not yet run.
SUGGESTED_QUERY_LOADED_STATUS = "추천 쿼리를 입력했습니다. 실행 버튼을 눌러 검색하세요."
# Client-side guard message when a reject/correct decision is missing its memo.
MEMO_REQUIRED_ERROR = "반려 또는 보정 결정에는 메모가 필요합니다."
class OperatorWorkbench:
"""Drives the operator GUI for browser-level (E2E) assertions."""
def __init__(self, page: Any, base_url: str) -> None:
self.page = page
self.base_url = base_url.rstrip("/")
self.console_errors: list[str] = []
page.on("console", lambda message: self.console_errors.append(f"console:{message.type}:{message.text}"))
page.on("pageerror", lambda error: self.console_errors.append(f"pageerror:{error}"))
# -- Locators ---------------------------------------------------------
@property
def queue_body(self) -> Any:
return self.page.locator("#queue-body")
@property
def workbench_view(self) -> Any:
return self.page.locator("#workbench-view")
@property
def case_title(self) -> Any:
return self.page.locator("#case-title")
@property
def manual_query(self) -> Any:
return self.page.locator("#manual-query")
@property
def manual_query_status(self) -> Any:
return self.page.locator("#manual-query-status")
@property
def submission_image_name(self) -> Any:
return self.page.locator("#submission-image-name")
@property
def submission_import_status(self) -> Any:
return self.page.locator("#submission-import-status")
@property
def recommendation_box(self) -> Any:
"""Selected-case summary; includes the "현재 운영 결정: <label>" line."""
return self.page.locator("#recommendation-box")
@property
def decision_memo(self) -> Any:
return self.page.locator("#decision-memo")
@property
def memo_error(self) -> Any:
return self.page.locator("#memo-error")
def panel(self, name: str) -> Any:
"""Workbench panel by ``data-workbench-panel`` (e.g. ``queries``, ``evidence``)."""
return self.page.locator(f'[data-workbench-panel="{name}"]')
def case_row(self, case_id: str) -> Any:
return self.page.locator(f'#queue-body [data-select-case="{case_id}"]')
# -- Navigation / setup ----------------------------------------------
def mock_bootstrap(self, payload: dict) -> "OperatorWorkbench":
"""Stub ``/api/bootstrap`` so a spec controls the queue state.
Must be called before :meth:`goto` so the route is registered before the
app issues its first request.
"""
self.page.route(
"**/api/bootstrap",
lambda route: route.fulfill(
status=200,
content_type="application/json",
body=json.dumps(payload, ensure_ascii=False),
),
)
return self
def goto(self) -> "OperatorWorkbench":
self.page.goto(self.base_url + "/")
return self
# -- Actions ----------------------------------------------------------
def wait_for_case(self, case_id: str, **kwargs: Any) -> "OperatorWorkbench":
self.page.wait_for_selector(f'#queue-body [data-select-case="{case_id}"]', **kwargs)
return self
def select_case(self, case_id: str) -> "OperatorWorkbench":
self.page.get_by_role("button", name=case_id).click()
return self
def use_suggested_query(self, label: str) -> "OperatorWorkbench":
"""Click a suggested-query chip, which fills the manual-query field."""
self.page.get_by_role("button", name=label).click()
return self
def upload_submission(self, file_path: str) -> "OperatorWorkbench":
self.page.set_input_files("#submission-image", str(file_path))
return self
def confirm_upload(self) -> "OperatorWorkbench":
self.page.get_by_role("button", name="사진 넣기").click()
return self
def upload_and_open(self, file_path: str, case_id: str, timeout: int = 10000) -> "OperatorWorkbench":
"""Upload an image and wait until its new case is queued and the workbench opens."""
self.upload_submission(file_path)
self.confirm_upload()
self.wait_for_case(case_id, state="attached", timeout=timeout)
self.workbench_view.wait_for(state="visible", timeout=timeout)
return self
def set_decision_memo(self, text: str) -> "OperatorWorkbench":
self.decision_memo.fill(text)
return self
def decide(self, decision: str) -> "OperatorWorkbench":
"""Click a decision button by its ``data-decision`` value (approved/held/rejected)."""
self.page.locator(f'[data-decision="{decision}"]').click()
return self
def wait_for_decision_label(self, label: str, timeout: int = 10000) -> "OperatorWorkbench":
"""Wait until the recommendation box reflects a persisted decision label."""
self.page.wait_for_function(
"(label) => document.getElementById('recommendation-box')"
"?.innerText.includes('현재 운영 결정: ' + label)",
arg=label,
timeout=timeout,
)
return self

View file

@ -1,9 +1,33 @@
from __future__ import annotations
import json
from pathlib import Path
import sys
from threading import Thread
import pytest
from .pages import OperatorWorkbench
from .pages.workbench_page import SUGGESTED_QUERY_LOADED_STATUS
ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(ROOT / "src"))
APP_DIR = ROOT / "web" / "operator-gui"
from rights_filter.server.http_app import build_server
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server.sqlite_store import CopyrighterStore
def _start(server):
thread = Thread(target=server.serve_forever, daemon=True)
thread.start()
return thread
def _browser_or_skip(playwright):
try:
return playwright.chromium.launch(headless=True)
except Exception as exc:
pytest.skip(f"Playwright Chromium is not available: {exc}")
def _bootstrap_payload():
@ -75,42 +99,93 @@ def _bootstrap_payload():
}
def test_browser_smoke_suggested_query_fills_manual_query_without_running_search(operator_server, operator_page):
workbench = OperatorWorkbench(operator_page, operator_server.base_url)
workbench.mock_bootstrap(_bootstrap_payload()).goto()
def test_browser_smoke_suggested_query_fills_manual_query_without_running_search(tmp_path: Path):
playwright = pytest.importorskip("playwright.sync_api")
image_root = tmp_path / "submissions"
image_root.mkdir()
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3")
store.initialize()
server = build_server(
host="127.0.0.1",
port=0,
store=store,
image_store=LocalSubmissionImageStore(image_root),
static_dir=APP_DIR,
)
_start(server)
base = f"http://127.0.0.1:{server.server_port}"
try:
workbench.wait_for_case("SUB-SMOKE1", timeout=5000)
except Exception as exc:
body = workbench.page.locator("body").inner_text()[:1000]
pytest.fail(f"{exc}\nerrors={workbench.console_errors}\nbody={body}")
with playwright.sync_playwright() as pw:
browser = _browser_or_skip(pw)
page = browser.new_page(viewport={"width": 1280, "height": 900})
browser_errors = []
page.on("console", lambda message: browser_errors.append(f"console:{message.type}:{message.text}"))
page.on("pageerror", lambda error: browser_errors.append(f"pageerror:{error}"))
page.route(
"**/api/bootstrap",
lambda route: route.fulfill(
status=200,
content_type="application/json",
body=json.dumps(_bootstrap_payload(), ensure_ascii=False),
),
)
page.goto(base + "/")
try:
page.wait_for_selector('#queue-body [data-select-case="SUB-SMOKE1"]', timeout=5000)
except Exception as exc:
pytest.fail(f"{exc}\nerrors={browser_errors}\nbody={page.locator('body').inner_text()[:1000]}")
page.get_by_role("button", name="SUB-SMOKE1").click()
page.get_by_role("button", name="Smoke sample 저작권").click()
workbench.select_case("SUB-SMOKE1")
workbench.use_suggested_query("Smoke sample 저작권")
assert page.locator('[data-workbench-panel="queries"]').is_visible()
assert page.locator("#manual-query").input_value() == "Smoke sample 저작권"
assert page.locator("#manual-query-status").inner_text() == "추천 쿼리를 입력했습니다. 실행 버튼을 눌러 검색하세요.", browser_errors
assert workbench.panel("queries").is_visible()
assert workbench.manual_query.input_value() == "Smoke sample 저작권"
assert workbench.manual_query_status.inner_text() == SUGGESTED_QUERY_LOADED_STATUS, workbench.console_errors
browser.close()
finally:
server.shutdown()
def test_browser_uploads_image_and_selects_new_submission(operator_server, operator_page, tmp_path):
def test_browser_uploads_image_and_selects_new_submission(tmp_path: Path):
playwright = pytest.importorskip("playwright.sync_api")
image_root = tmp_path / "submissions"
image_root.mkdir()
upload_file = tmp_path / "smoke upload.svg"
upload_file.write_text("<svg xmlns='http://www.w3.org/2000/svg' width='80' height='60'></svg>", encoding="utf-8")
workbench = OperatorWorkbench(operator_page, operator_server.base_url)
workbench.goto()
workbench.upload_submission(str(upload_file))
assert workbench.submission_image_name.inner_text() == "smoke upload.svg"
workbench.confirm_upload()
workbench.wait_for_case("smoke-upload", state="attached", timeout=10000)
workbench.workbench_view.wait_for(state="visible", timeout=10000)
assert (operator_server.image_root / "images" / "smoke-upload.svg").exists()
assert (
"smoke-upload 사진이 추가되었습니다. 새 심사 건으로 바로 선택했습니다."
in workbench.submission_import_status.inner_text()
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3")
store.initialize()
server = build_server(
host="127.0.0.1",
port=0,
store=store,
image_store=LocalSubmissionImageStore(image_root),
static_dir=APP_DIR,
)
assert workbench.case_title.inner_text() == "smoke-upload · smoke-upload"
assert not workbench.console_errors
_start(server)
base = f"http://127.0.0.1:{server.server_port}"
try:
with playwright.sync_playwright() as pw:
browser = _browser_or_skip(pw)
page = browser.new_page(viewport={"width": 1280, "height": 900})
browser_errors = []
page.on("console", lambda message: browser_errors.append(f"console:{message.type}:{message.text}"))
page.on("pageerror", lambda error: browser_errors.append(f"pageerror:{error}"))
page.goto(base + "/")
page.set_input_files("#submission-image", str(upload_file))
assert page.locator("#submission-image-name").inner_text() == "smoke upload.svg"
page.get_by_role("button", name="사진 넣기").click()
page.wait_for_selector('#queue-body [data-select-case="smoke-upload"]', state="attached", timeout=10000)
page.wait_for_selector("#workbench-view", state="visible", timeout=10000)
assert (image_root / "images" / "smoke-upload.svg").exists()
assert "smoke-upload 사진이 추가되었습니다. 새 심사 건으로 바로 선택했습니다." in page.locator("#submission-import-status").inner_text()
assert page.locator("#case-title").inner_text() == "smoke-upload · smoke-upload"
assert not browser_errors
browser.close()
finally:
server.shutdown()

View file

@ -1,72 +0,0 @@
"""Operator decision flow (approve / hold / reject) end-to-end.
Runs against the real server (no bootstrap stub): each test uploads an image to
seed a genuine submission, then exercises the decision pane and asserts the
decision both renders and persists through ``refreshFromApi``.
"""
from __future__ import annotations
from pathlib import Path
from .pages import OperatorWorkbench
from .pages.workbench_page import MEMO_REQUIRED_ERROR
_SVG = "<svg xmlns='http://www.w3.org/2000/svg' width='80' height='60'></svg>"
def _open_seeded_case(workbench: OperatorWorkbench, tmp_path: Path, case_id: str = "decide-me") -> str:
upload_file = tmp_path / f"{case_id}.svg"
upload_file.write_text(_SVG, encoding="utf-8")
workbench.goto().upload_and_open(str(upload_file), case_id)
# A freshly uploaded submission starts unreviewed.
assert "현재 운영 결정: 미심사" in workbench.recommendation_box.inner_text()
return case_id
def test_reject_without_memo_is_blocked_and_not_persisted(operator_server, operator_page, tmp_path):
workbench = OperatorWorkbench(operator_page, operator_server.base_url)
_open_seeded_case(workbench, tmp_path)
workbench.decide("rejected") # 반려 requires a memo
assert workbench.memo_error.inner_text() == MEMO_REQUIRED_ERROR
# The guard fires client-side, so the decision is never sent or persisted.
assert "현재 운영 결정: 미심사" in workbench.recommendation_box.inner_text()
def test_approve_persists_without_memo(operator_server, operator_page, tmp_path):
workbench = OperatorWorkbench(operator_page, operator_server.base_url)
_open_seeded_case(workbench, tmp_path)
workbench.decide("approved") # 승인 needs no memo
workbench.wait_for_decision_label("승인")
assert workbench.memo_error.inner_text() == ""
assert not workbench.console_errors
def test_hold_persists_without_memo(operator_server, operator_page, tmp_path):
workbench = OperatorWorkbench(operator_page, operator_server.base_url)
_open_seeded_case(workbench, tmp_path)
workbench.decide("held") # 보류 needs no memo
workbench.wait_for_decision_label("보류")
assert workbench.memo_error.inner_text() == ""
assert not workbench.console_errors
def test_reject_with_memo_persists_and_clears_memo(operator_server, operator_page, tmp_path):
workbench = OperatorWorkbench(operator_page, operator_server.base_url)
_open_seeded_case(workbench, tmp_path)
workbench.set_decision_memo("저작권 침해 소지")
workbench.decide("rejected")
workbench.wait_for_decision_label("반려")
assert workbench.memo_error.inner_text() == ""
# The memo field is cleared after a successful decision.
assert workbench.decision_memo.input_value() == ""
assert not workbench.console_errors

View file

@ -13,7 +13,6 @@ from rights_filter.analysis.face_person_detection import FacePersonSignal
from rights_filter.server.http_app import build_server
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server import sqlite_store as sqlite_store_module
from rights_filter.server import store_enrichment as store_enrichment_module
from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.integrations.env_clients import build_provider_runtime
@ -76,7 +75,6 @@ class OneFaceDetector:
@pytest.fixture(autouse=True)
def use_explicit_test_face_detector(monkeypatch):
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
def _fixtures(tmp_path: Path):
@ -1235,7 +1233,6 @@ class OneFaceBoxDetector:
def test_face_crops_are_persisted_served_and_listed_in_review(tmp_path: Path, monkeypatch):
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceBoxDetector())
static_dir, image_store, store = _png_fixtures(tmp_path)
server = build_server(host="127.0.0.1", port=0, store=store, image_store=image_store, static_dir=static_dir)
_start(server)

View file

@ -12,9 +12,7 @@ from rights_filter.analysis.preprocessing import ImagePayload, build_face_crop_d
from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.server.image_store import LocalSubmissionImageStore
from rights_filter.server import sqlite_store as sqlite_store_module
from rights_filter.server import store_enrichment as store_enrichment_module
from rights_filter.server import store_remote_fetch as remote_fetch_module
from rights_filter.server import store_serialization as store_serialization_module
from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.integrations.env_clients import build_provider_runtime
@ -102,7 +100,6 @@ class OneFaceDetector:
@pytest.fixture(autouse=True)
def use_explicit_test_face_detector(monkeypatch):
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
def _image_store(tmp_path: Path) -> LocalSubmissionImageStore:
@ -1015,12 +1012,10 @@ def test_sqlite_store_refreshes_existing_local_face_evidence_when_reloaded(tmp_p
store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: NoFaceDetector())
store.seed_from_image_store(image_store)
assert not any(item["source"] == "face" for item in store.review("SUB-DB1")["evidence"])
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
imported = store.seed_from_image_store(image_store)
review = store.review("SUB-DB1")
@ -7420,7 +7415,6 @@ def test_sqlite_store_rerun_enrichment_runs_google_web_detection_on_face_crop_wh
store = CopyrighterStore(tmp_path / "copyrighter.sqlite3", provider_runtime=runtime)
store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: OneFaceDetector())
store.seed_from_image_store(image_store)
calls_after_seed = len(transport.calls)
@ -7499,7 +7493,6 @@ def test_sqlite_store_compares_face_crop_search_result_images_against_crop_finge
)
store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
store.seed_from_image_store(image_store)
review = store.rerun_enrichment("SUB-FACE1", image_store)
@ -7592,7 +7585,6 @@ def test_sqlite_store_compares_face_crop_search_result_page_images_against_crop_
)
store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
store.seed_from_image_store(image_store)
review = store.rerun_enrichment("SUB-FACE1", image_store)
@ -7678,7 +7670,6 @@ def test_sqlite_store_rerun_enrichment_uses_face_crop_page_title_for_naver_searc
)
store.initialize()
monkeypatch.setattr(sqlite_store_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
monkeypatch.setattr(store_enrichment_module, "HeuristicFacePersonDetector", lambda: BoxedFaceDetector())
store.seed_from_image_store(image_store)
calls_after_seed = len(transport.calls)
@ -8041,7 +8032,7 @@ def test_record_decision_rolls_back_when_audit_fails(tmp_path: Path, monkeypatch
# The status change must roll back atomically with the failed audit write.
assert store._get("submissions", "SUB-1")["decisionStatus"] == "unreviewed"
with pytest.raises(KeyError):
store._get("knowledge_entries", store_serialization_module._stable_id("kb-watchlist", "SUB-1"))
store._get("knowledge_entries", sqlite_store_module._stable_id("kb-watchlist", "SUB-1"))
@pytest.mark.parametrize(