refactor: extract store constants and remaining domain helpers

Move shared constants + _bounded_int_env into store_constants (a leaf module),
and the remaining module-level domain helpers (validation, query signatures,
search-hint evidence, watchlist selection, knowledge type/provenance) into
store_serialization. sqlite_store.py is now the CopyrighterStore class plus thin
imports: 3613 -> 3368 lines (5333 -> 3368 overall, -37%). All behavior-preserving.
This commit is contained in:
유창욱 2026-06-20 21:38:03 +09:00
parent 8e53139029
commit 3be7b016ce
3 changed files with 314 additions and 273 deletions

View file

@ -28,7 +28,6 @@ from rights_filter.domain.records import (
EvidenceSource, EvidenceSource,
InMemoryRightsFilterRepository, InMemoryRightsFilterRepository,
KnowledgeBaseEntry, KnowledgeBaseEntry,
KnowledgeEntryType,
) )
from rights_filter.integrations.cloud_vision_web_detection import ( from rights_filter.integrations.cloud_vision_web_detection import (
CloudVisionWebDetectionAdapter, CloudVisionWebDetectionAdapter,
@ -38,6 +37,18 @@ from rights_filter.integrations.env_clients import ProviderRuntime, build_provid
from rights_filter.integrations.external_policy import ExternalApiPolicy from rights_filter.integrations.external_policy import ExternalApiPolicy
from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage from rights_filter.jobs.batch_analyzer import BatchAnalyzer, SubmissionImage
from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES from rights_filter.server.image_store import LocalSubmissionImageStore, SUPPORTED_IMAGE_SUFFIXES
from rights_filter.server.store_constants import (
DEFAULT_COVERAGE_GOOD_THRESHOLD,
DEFAULT_COVERAGE_WARN_THRESHOLD,
DEFAULT_FACE_CROP_RETENTION_DAYS,
DEFAULT_QUERY_COVERAGE_GOOD_THRESHOLD,
DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD,
EVIDENCE_OPERATOR_STATUSES,
MAX_COVERAGE_THRESHOLD,
MIN_COVERAGE_THRESHOLD,
NON_CONTRIBUTING_OPERATOR_STATUSES,
_bounded_int_env,
)
from rights_filter.server.store_remote_fetch import ( from rights_filter.server.store_remote_fetch import (
_fetch_page_url_bytes, _fetch_page_url_bytes,
_fetch_stylesheet_url_bytes, _fetch_stylesheet_url_bytes,
@ -58,27 +69,42 @@ from rights_filter.server.store_schema import (
_ensure_typed_columns, _ensure_typed_columns,
) )
from rights_filter.server.store_serialization import ( from rights_filter.server.store_serialization import (
_default_evidence_contribution,
_domain_evidence_from_ui, _domain_evidence_from_ui,
_evidence_id, _evidence_id,
_evidence_matches_provider, _evidence_matches_provider,
_evidence_payload, _evidence_payload,
_existing_google_custom_query_signatures,
_existing_naver_query_signatures,
_external_provider_ids, _external_provider_ids,
_external_provider_state_for_submission, _external_provider_state_for_submission,
_face_crop_web_evidence, _face_crop_web_evidence,
_google_custom_image_query_signature,
_google_custom_web_query_signature,
_google_weak_label_title, _google_weak_label_title,
_image_size_from_bytes, _image_size_from_bytes,
_image_suffix_from_url, _image_suffix_from_url,
_is_google_weak_label_payload, _is_google_weak_label_payload,
_knowledge_entry_type,
_knowledge_provenance, _knowledge_provenance,
_knowledge_type_value,
_naver_blog_query_signature,
_naver_query_signature,
_naver_web_query_signature,
_now_label, _now_label,
_provider_item_failed, _provider_item_failed,
_provider_item_has_result, _provider_item_has_result,
_query_history_status,
_safe_filename, _safe_filename,
_safe_image_suffix, _safe_image_suffix,
_stable_id, _stable_id,
_strip_html, _strip_html,
_submission_payload, _submission_payload,
_submission_search_hint_evidence,
_timestamp_id, _timestamp_id,
_validate_payload,
_validate_table,
_watchlist_source_evidence,
) )
from rights_filter.server.store_text import _text_list, _unique_texts from rights_filter.server.store_text import _text_list, _unique_texts
from rights_filter.server.store_url_utils import ( from rights_filter.server.store_url_utils import (
@ -90,116 +116,6 @@ from rights_filter.server.store_url_utils import (
) )
# --- Evidence operator review -------------------------------------------------
# Operator verdict keys mapped to Korean labels (presumably shown in the
# operator UI - confirm against the front end).
EVIDENCE_OPERATOR_STATUSES = {
    "used_for_judgment": "판단에 사용",
    "irrelevant": "무관",
    "false_positive": "오탐",
    "pending": "보류",
}
# Verdicts that exclude an evidence item from watchlist source selection
# (see `_watchlist_source_evidence`).
NON_CONTRIBUTING_OPERATOR_STATUSES = {"irrelevant", "false_positive"}

# --- Submissions and queues ---------------------------------------------------
SUBMISSION_FINAL_STATUSES = {"approved", "rejected", "corrected"}
QUEUE_ID_PLACEHOLDER = ""

# --- Storage layout -----------------------------------------------------------
# Table names accepted by `_validate_table`; anything else raises ValueError.
STORE_TABLES = {
    "submissions", "evidence", "providers", "knowledge_entries",
    "collection_candidates", "corrections", "audit_events", "submission_queues",
}

# --- Allowed enumerated values ------------------------------------------------
RISK_BANDS = ("low", "medium", "high", "failed", "pending")
DECISION_STATUSES = ("unreviewed", "held", "rejected", "approved", "corrected")
EVIDENCE_STATUSES = (
    "active", "auto", "manual", "queued", "rerun", "weak",
    "used_for_judgment", "irrelevant", "false_positive", "pending",
)
KNOWLEDGE_STATUSES = ("confirmed", "watchlist", "excluded")
COLLECTION_STATUSES = ("candidate", "promoted")

# --- Coverage thresholds and retention ----------------------------------------
# Defaults plus the inclusive bounds; presumably fed to `_bounded_int_env` when
# reading overrides - confirm at the call sites.
DEFAULT_COVERAGE_GOOD_THRESHOLD = 70
DEFAULT_COVERAGE_WARN_THRESHOLD = 40
DEFAULT_QUERY_COVERAGE_GOOD_THRESHOLD = 70
DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40
MIN_COVERAGE_THRESHOLD = 0
MAX_COVERAGE_THRESHOLD = 100
DEFAULT_FACE_CROP_RETENTION_DAYS = 90

# --- Payload validation -------------------------------------------------------
# Keys that `_validate_payload` requires per table. Tables without an entry
# here (e.g. "audit_events") have no required keys.
PAYLOAD_REQUIRED_FIELDS = {
    "submissions": {
        "id", "title", "asset", "riskScore", "riskBand",
        "decisionStatus", "providerState", "fileFacts", "evidence",
    },
    "evidence": {"id", "source", "title", "confidence", "status", "submission_id"},
    "providers": {
        "id", "name", "enabled", "usage", "quota", "lastSuccess", "lastFailure",
    },
    "knowledge_entries": {
        "id", "name", "type", "provenance", "active", "entryStatus",
        "sampleFingerprints",
    },
    "collection_candidates": {
        "id", "provider", "query", "title", "status", "sourceUrl",
        "collectedEpoch", "sampleFingerprints",
    },
    "corrections": {"id"},
}
def _bounded_int_env(value: str | None, default: int, minimum: int, maximum: int) -> int:
try:
parsed = int(value) if value is not None else default
except (TypeError, ValueError):
return default
if parsed < minimum:
return minimum
if parsed > maximum:
return maximum
return parsed
class CopyrighterStore: class CopyrighterStore:
def __init__( def __init__(
self, self,
@ -3450,164 +3366,3 @@ class CopyrighterStore:
return grouped return grouped
def _validate_table(table: str) -> None:
    """Reject table names that are not listed in ``STORE_TABLES``.

    Raises:
        ValueError: If ``table`` is not a known store table.
    """
    if table in STORE_TABLES:
        return
    raise ValueError(f"unsupported store table: {table}")
def _validate_payload(table: str, id_value: str, payload: dict[str, Any]) -> None:
    """Check that ``payload`` is storable in ``table`` under ``id_value``.

    The checks run in order: the table must be known, the payload must be a
    dict, every key listed for the table in ``PAYLOAD_REQUIRED_FIELDS`` must be
    present, and (except for ``audit_events``) a payload ``id`` must equal the
    storage id when both are compared as strings.

    Raises:
        ValueError: On the first check that fails.
    """
    _validate_table(table)
    if not isinstance(payload, dict):
        raise ValueError(f"{table} payload must be an object")

    required_fields = PAYLOAD_REQUIRED_FIELDS.get(table, set())
    absent = [name for name in required_fields if name not in payload]
    if absent:
        absent.sort()  # deterministic error message
        raise ValueError(f"{table} payload missing required fields: {', '.join(absent)}")

    # audit_events are exempt from the id consistency check.
    if table == "audit_events" or "id" not in payload:
        return
    if str(payload["id"]) != str(id_value):
        raise ValueError(f"{table} payload id does not match storage id")
def _existing_naver_query_signatures(evidence: list[dict[str, Any]]) -> set[str]:
return {
str(item.get("querySignature") or _naver_query_signature(str(item.get("query", ""))))
for item in evidence
if item.get("query") and item.get("source") in {"naver", "failure"}
}
def _existing_google_custom_query_signatures(evidence: list[dict[str, Any]]) -> set[str]:
return {
str(item.get("querySignature") or "")
for item in evidence
if item.get("query")
and item.get("source") in {"google", "failure"}
and str(item.get("domain", "")) == "google_custom_search"
and item.get("querySignature")
}
def _google_custom_image_query_signature(query: str) -> str:
return "google-custom-image:" + " ".join(query.lower().split())
def _google_custom_web_query_signature(query: str) -> str:
return "google-custom-web:" + " ".join(query.lower().split())
def _naver_query_signature(query: str) -> str:
return "naver:" + " ".join(query.lower().split())
def _naver_blog_query_signature(query: str) -> str:
return "naver-blog:" + " ".join(query.lower().split())
def _naver_web_query_signature(query: str) -> str:
return "naver-web:" + " ".join(query.lower().split())
def _submission_search_hint_evidence(record: dict[str, Any]) -> list[Evidence]:
    """Derive local search-hint evidence from a submission record.

    Yields at most two hints, in this order: one from the stripped ``title``,
    and one from the file stem of ``file`` (or ``asset`` when the ``file`` key
    is absent), provided the stem is non-empty and differs from the title.
    """
    # NOTE(review): a key that is present with value None is stringified to
    # "None" and becomes a hint - confirm callers never store null here.
    title = str(record.get("title", "")).strip()
    raw_file = str(record.get("file", record.get("asset", ""))).strip()
    # The value may be a URL or a plain path; only its path component's stem is used.
    stem = Path(urlparse(raw_file).path).stem if raw_file else ""

    queries: list[tuple[str, str]] = []
    if title:
        queries.append((title, "title"))
    if stem and stem != title:
        queries.append((stem, "file"))
    return [_local_query_hint_evidence(text, origin) for text, origin in queries]
def _local_query_hint_evidence(query: str, hint_source: str) -> Evidence:
    """Build a zero-confidence ``Evidence`` flagged as a local search hint.

    Args:
        query: The hint text, stored under ``data["query"]``.
        hint_source: Where the hint came from, stored under ``data["hint_source"]``.
    """
    hint_data = {
        "local_query_hint": True,
        "query": query,
        "hint_source": hint_source,
    }
    return Evidence(
        source=EvidenceSource.FINGERPRINT,
        reason="Local submission search hint",
        confidence=0.0,
        data=hint_data,
    )
def _query_history_status(evidence: list[Evidence]) -> str:
if any(item.source == EvidenceSource.ENRICHMENT_FAILURE for item in evidence):
return "failed"
if evidence and all(item.source == EvidenceSource.SEARCH_SKIPPED for item in evidence):
return "skipped"
return "auto"
def _default_evidence_contribution(payload: dict[str, Any]) -> bool:
source = str(payload.get("source", ""))
if source in {"llm", "failure"}:
return False
if bool(payload.get("faceCropSearch", False)):
return False
if _is_google_weak_label_payload(payload):
return False
return True
def _watchlist_source_evidence(evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Pick up to five evidence items to back a watchlist entry.

    Items without a truthy ``id`` are never selected. Selection falls through
    three tiers and returns the first non-empty one, sorted by
    ``_sorted_evidence_for_watchlist``:

    1. Items whose ``operatorStatus`` is ``"used_for_judgment"``.
    2. Items not marked with a non-contributing operator status, excluding
       ``llm``/``failure`` sources and the "Image fingerprints generated" item.
    3. Any item not marked with a non-contributing operator status.
    """
    limit = 5

    def top(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
        return _sorted_evidence_for_watchlist(items)[:limit]

    with_id = [item for item in evidence if item.get("id")]

    judged = [item for item in with_id if item.get("operatorStatus") == "used_for_judgment"]
    if judged:
        return top(judged)

    eligible = [
        item
        for item in with_id
        if item.get("operatorStatus") not in NON_CONTRIBUTING_OPERATOR_STATUSES
    ]
    preferred = [
        item
        for item in eligible
        if item.get("source") not in {"llm", "failure"}
        and item.get("title") != "Image fingerprints generated"
    ]
    # Fall back to every eligible item only when the stricter tier is empty.
    return top(preferred or eligible)
def _sorted_evidence_for_watchlist(evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(
evidence,
key=lambda item: (
1 if item.get("contributed", True) else 0,
float(item.get("confidence", 0) or 0),
str(item.get("retrievedAt", "")),
),
reverse=True,
)
def _knowledge_type_value(value: str) -> str:
normalized = value.strip() or "other"
aliases = {
"public_figure": "celebrity",
"rejected_reference": "rejected_image",
}
return aliases.get(normalized, normalized)
def _knowledge_entry_type(value: str) -> KnowledgeEntryType:
    """Convert a type string to ``KnowledgeEntryType``.

    The string is normalized with ``_knowledge_type_value`` first. Values the
    enum rejects with ``ValueError`` map to ``KnowledgeEntryType.OTHER``.
    """
    canonical = _knowledge_type_value(value)
    try:
        entry_type = KnowledgeEntryType(canonical)
    except ValueError:
        entry_type = KnowledgeEntryType.OTHER
    return entry_type

View file

@ -0,0 +1,113 @@
"""Shared constants and small config helpers for the SQLite store and its
extracted submodules. Leaf module (no internal imports) so any store module can
depend on it without import cycles. Extracted from sqlite_store.py unchanged.
"""
from __future__ import annotations
# --- Evidence operator review -------------------------------------------------
# Operator verdict keys mapped to Korean labels (presumably shown in the
# operator UI - confirm against the front end).
EVIDENCE_OPERATOR_STATUSES = {
    "used_for_judgment": "판단에 사용",
    "irrelevant": "무관",
    "false_positive": "오탐",
    "pending": "보류",
}
# Verdicts that exclude an evidence item from watchlist source selection
# (see `_watchlist_source_evidence` in store_serialization).
NON_CONTRIBUTING_OPERATOR_STATUSES = {"irrelevant", "false_positive"}

# --- Submissions and queues ---------------------------------------------------
SUBMISSION_FINAL_STATUSES = {"approved", "rejected", "corrected"}
QUEUE_ID_PLACEHOLDER = ""

# --- Storage layout -----------------------------------------------------------
# Table names accepted by `_validate_table`; anything else raises ValueError.
STORE_TABLES = {
    "submissions", "evidence", "providers", "knowledge_entries",
    "collection_candidates", "corrections", "audit_events", "submission_queues",
}

# --- Allowed enumerated values ------------------------------------------------
RISK_BANDS = ("low", "medium", "high", "failed", "pending")
DECISION_STATUSES = ("unreviewed", "held", "rejected", "approved", "corrected")
EVIDENCE_STATUSES = (
    "active", "auto", "manual", "queued", "rerun", "weak",
    "used_for_judgment", "irrelevant", "false_positive", "pending",
)
KNOWLEDGE_STATUSES = ("confirmed", "watchlist", "excluded")
COLLECTION_STATUSES = ("candidate", "promoted")

# --- Coverage thresholds and retention ----------------------------------------
# Defaults plus the inclusive bounds; presumably fed to `_bounded_int_env` when
# reading overrides - confirm at the call sites.
DEFAULT_COVERAGE_GOOD_THRESHOLD = 70
DEFAULT_COVERAGE_WARN_THRESHOLD = 40
DEFAULT_QUERY_COVERAGE_GOOD_THRESHOLD = 70
DEFAULT_QUERY_COVERAGE_WARN_THRESHOLD = 40
MIN_COVERAGE_THRESHOLD = 0
MAX_COVERAGE_THRESHOLD = 100
DEFAULT_FACE_CROP_RETENTION_DAYS = 90

# --- Payload validation -------------------------------------------------------
# Keys that `_validate_payload` requires per table. Tables without an entry
# here (e.g. "audit_events") have no required keys.
PAYLOAD_REQUIRED_FIELDS = {
    "submissions": {
        "id", "title", "asset", "riskScore", "riskBand",
        "decisionStatus", "providerState", "fileFacts", "evidence",
    },
    "evidence": {"id", "source", "title", "confidence", "status", "submission_id"},
    "providers": {
        "id", "name", "enabled", "usage", "quota", "lastSuccess", "lastFailure",
    },
    "knowledge_entries": {
        "id", "name", "type", "provenance", "active", "entryStatus",
        "sampleFingerprints",
    },
    "collection_candidates": {
        "id", "provider", "query", "title", "status", "sourceUrl",
        "collectedEpoch", "sampleFingerprints",
    },
    "corrections": {"id"},
}
def _bounded_int_env(value: str | None, default: int, minimum: int, maximum: int) -> int:
try:
parsed = int(value) if value is not None else default
except (TypeError, ValueError):
return default
if parsed < minimum:
return minimum
if parsed > maximum:
return maximum
return parsed

View file

@ -18,8 +18,18 @@ from pathlib import Path
from typing import Any from typing import Any
from urllib.parse import urlparse from urllib.parse import urlparse
from rights_filter.domain.records import Evidence, EvidenceSource, KnowledgeProvenance from rights_filter.domain.records import (
Evidence,
EvidenceSource,
KnowledgeEntryType,
KnowledgeProvenance,
)
from rights_filter.server.image_store import SUPPORTED_IMAGE_SUFFIXES from rights_filter.server.image_store import SUPPORTED_IMAGE_SUFFIXES
from rights_filter.server.store_constants import (
NON_CONTRIBUTING_OPERATOR_STATUSES,
PAYLOAD_REQUIRED_FIELDS,
STORE_TABLES,
)
from rights_filter.server.store_text import _text_list, _unique_texts from rights_filter.server.store_text import _text_list, _unique_texts
@ -422,3 +432,166 @@ def _label_to_epoch(value: str) -> int:
def _timestamp_id() -> str: def _timestamp_id() -> str:
return datetime.now().strftime("%Y%m%d%H%M%S%f") return datetime.now().strftime("%Y%m%d%H%M%S%f")
def _validate_table(table: str) -> None:
    """Reject table names that are not listed in ``STORE_TABLES``.

    Raises:
        ValueError: If ``table`` is not a known store table.
    """
    if table in STORE_TABLES:
        return
    raise ValueError(f"unsupported store table: {table}")
def _validate_payload(table: str, id_value: str, payload: dict[str, Any]) -> None:
    """Check that ``payload`` is storable in ``table`` under ``id_value``.

    The checks run in order: the table must be known, the payload must be a
    dict, every key listed for the table in ``PAYLOAD_REQUIRED_FIELDS`` must be
    present, and (except for ``audit_events``) a payload ``id`` must equal the
    storage id when both are compared as strings.

    Raises:
        ValueError: On the first check that fails.
    """
    _validate_table(table)
    if not isinstance(payload, dict):
        raise ValueError(f"{table} payload must be an object")

    required_fields = PAYLOAD_REQUIRED_FIELDS.get(table, set())
    absent = [name for name in required_fields if name not in payload]
    if absent:
        absent.sort()  # deterministic error message
        raise ValueError(f"{table} payload missing required fields: {', '.join(absent)}")

    # audit_events are exempt from the id consistency check.
    if table == "audit_events" or "id" not in payload:
        return
    if str(payload["id"]) != str(id_value):
        raise ValueError(f"{table} payload id does not match storage id")
def _existing_naver_query_signatures(evidence: list[dict[str, Any]]) -> set[str]:
return {
str(item.get("querySignature") or _naver_query_signature(str(item.get("query", ""))))
for item in evidence
if item.get("query") and item.get("source") in {"naver", "failure"}
}
def _existing_google_custom_query_signatures(evidence: list[dict[str, Any]]) -> set[str]:
return {
str(item.get("querySignature") or "")
for item in evidence
if item.get("query")
and item.get("source") in {"google", "failure"}
and str(item.get("domain", "")) == "google_custom_search"
and item.get("querySignature")
}
def _google_custom_image_query_signature(query: str) -> str:
return "google-custom-image:" + " ".join(query.lower().split())
def _google_custom_web_query_signature(query: str) -> str:
return "google-custom-web:" + " ".join(query.lower().split())
def _naver_query_signature(query: str) -> str:
return "naver:" + " ".join(query.lower().split())
def _naver_blog_query_signature(query: str) -> str:
return "naver-blog:" + " ".join(query.lower().split())
def _naver_web_query_signature(query: str) -> str:
return "naver-web:" + " ".join(query.lower().split())
def _submission_search_hint_evidence(record: dict[str, Any]) -> list[Evidence]:
    """Derive local search-hint evidence from a submission record.

    Yields at most two hints, in this order: one from the stripped ``title``,
    and one from the file stem of ``file`` (or ``asset`` when the ``file`` key
    is absent), provided the stem is non-empty and differs from the title.
    """
    # NOTE(review): a key that is present with value None is stringified to
    # "None" and becomes a hint - confirm callers never store null here.
    title = str(record.get("title", "")).strip()
    raw_file = str(record.get("file", record.get("asset", ""))).strip()
    # The value may be a URL or a plain path; only its path component's stem is used.
    stem = Path(urlparse(raw_file).path).stem if raw_file else ""

    queries: list[tuple[str, str]] = []
    if title:
        queries.append((title, "title"))
    if stem and stem != title:
        queries.append((stem, "file"))
    return [_local_query_hint_evidence(text, origin) for text, origin in queries]
def _local_query_hint_evidence(query: str, hint_source: str) -> Evidence:
    """Build a zero-confidence ``Evidence`` flagged as a local search hint.

    Args:
        query: The hint text, stored under ``data["query"]``.
        hint_source: Where the hint came from, stored under ``data["hint_source"]``.
    """
    hint_data = {
        "local_query_hint": True,
        "query": query,
        "hint_source": hint_source,
    }
    return Evidence(
        source=EvidenceSource.FINGERPRINT,
        reason="Local submission search hint",
        confidence=0.0,
        data=hint_data,
    )
def _query_history_status(evidence: list[Evidence]) -> str:
if any(item.source == EvidenceSource.ENRICHMENT_FAILURE for item in evidence):
return "failed"
if evidence and all(item.source == EvidenceSource.SEARCH_SKIPPED for item in evidence):
return "skipped"
return "auto"
def _default_evidence_contribution(payload: dict[str, Any]) -> bool:
source = str(payload.get("source", ""))
if source in {"llm", "failure"}:
return False
if bool(payload.get("faceCropSearch", False)):
return False
if _is_google_weak_label_payload(payload):
return False
return True
def _watchlist_source_evidence(evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Pick up to five evidence items to back a watchlist entry.

    Items without a truthy ``id`` are never selected. Selection falls through
    three tiers and returns the first non-empty one, sorted by
    ``_sorted_evidence_for_watchlist``:

    1. Items whose ``operatorStatus`` is ``"used_for_judgment"``.
    2. Items not marked with a non-contributing operator status, excluding
       ``llm``/``failure`` sources and the "Image fingerprints generated" item.
    3. Any item not marked with a non-contributing operator status.
    """
    limit = 5

    def top(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
        return _sorted_evidence_for_watchlist(items)[:limit]

    with_id = [item for item in evidence if item.get("id")]

    judged = [item for item in with_id if item.get("operatorStatus") == "used_for_judgment"]
    if judged:
        return top(judged)

    eligible = [
        item
        for item in with_id
        if item.get("operatorStatus") not in NON_CONTRIBUTING_OPERATOR_STATUSES
    ]
    preferred = [
        item
        for item in eligible
        if item.get("source") not in {"llm", "failure"}
        and item.get("title") != "Image fingerprints generated"
    ]
    # Fall back to every eligible item only when the stricter tier is empty.
    return top(preferred or eligible)
def _sorted_evidence_for_watchlist(evidence: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(
evidence,
key=lambda item: (
1 if item.get("contributed", True) else 0,
float(item.get("confidence", 0) or 0),
str(item.get("retrievedAt", "")),
),
reverse=True,
)
def _knowledge_type_value(value: str) -> str:
normalized = value.strip() or "other"
aliases = {
"public_figure": "celebrity",
"rejected_reference": "rejected_image",
}
return aliases.get(normalized, normalized)
def _knowledge_entry_type(value: str) -> KnowledgeEntryType:
    """Convert a type string to ``KnowledgeEntryType``.

    The string is normalized with ``_knowledge_type_value`` first. Values the
    enum rejects with ``ValueError`` map to ``KnowledgeEntryType.OTHER``.
    """
    canonical = _knowledge_type_value(value)
    try:
        entry_type = KnowledgeEntryType(canonical)
    except ValueError:
        entry_type = KnowledgeEntryType.OTHER
    return entry_type