From 6184d0f464a141c92e88bb87b922d6348f97f622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 22:35:02 +0900 Subject: [PATCH] refactor: extract submission-queue management into StoreQueueMixin Move queue id derivation, active-queue selection, legacy-submission migration, and queue bootstrap/ensure helpers into a mixin; CopyrighterStore inherits it. Drop now-unused hashlib import. sqlite_store.py 874 -> 724 lines (5119 -> 724, -86%); now under the 800-line guideline. --- src/rights_filter/server/sqlite_store.py | 153 +------------------- src/rights_filter/server/store_queue.py | 170 +++++++++++++++++++++++ 2 files changed, 172 insertions(+), 151 deletions(-) create mode 100644 src/rights_filter/server/store_queue.py diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 44f99f5..9f056e5 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -1,6 +1,5 @@ from __future__ import annotations -import hashlib import json import os import shutil @@ -50,6 +49,7 @@ from rights_filter.server.store_remote_fetch import ( from rights_filter.server.store_enrichment import StoreEnrichmentMixin from rights_filter.server.store_operations import StoreOperationsMixin from rights_filter.server.store_persistence import StorePersistenceMixin +from rights_filter.server.store_queue import StoreQueueMixin from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin from rights_filter.server.store_schema import ( _ensure_constrained_schema, @@ -72,6 +72,7 @@ from rights_filter.server.store_serialization import ( class CopyrighterStore( StorePersistenceMixin, + StoreQueueMixin, StoreSearchCandidatesMixin, StoreEnrichmentMixin, StoreOperationsMixin, @@ -233,156 +234,6 @@ class CopyrighterStore( """ ) - @staticmethod - def _queue_id_for_path(folder_path: Path) -> str: - return f"queue-{hashlib.sha1(str(folder_path).encode('utf-8')).hexdigest()[:16]}" - - @staticmethod - def _normalize_queue_folder(folder_path: Path | str) -> Path: - return Path(folder_path).resolve() - - def _set_active_queue(self, queue_id: str) -> None: - with self._connect() as conn: - conn.execute("update submission_queues set is_active = 0") - conn.execute( - "update submission_queues set is_active = 1 where id = ?", - (queue_id,), - ) - - def _queue_row_by_id(self, queue_id: str) -> dict[str, Any] | None: - with self._connect() as conn: - row = conn.execute( - """ - select id, folder_path, label, is_active, created_at, created_epoch, - last_imported_epoch, last_imported_at - from submission_queues - where id = ? - """, - (queue_id,), - ).fetchone() - if row is None: - return None - return { - "id": row["id"], - "folderPath": row["folder_path"], - "label": row["label"], - "isActive": bool(row["is_active"]), - "createdAt": row["created_at"], - "createdEpoch": row["created_epoch"], - "lastImportedAt": row["last_imported_at"], - "lastImportedEpoch": row["last_imported_epoch"], - } - - def _migrate_legacy_submissions_to_queue(self, queue_id: str, conn: sqlite3.Connection | None = None) -> None: - should_close = conn is None - if conn is None: - conn = self._connect() - try: - conn.execute( - "update submissions set queue_id = ? where queue_id = ''", - (queue_id,), - ) - if should_close: - conn.commit() - finally: - if should_close: - conn.close() - - def bootstrap_active_queue(self) -> dict[str, Any] | None: - with self._connect() as conn: - row = conn.execute( - """ - select id, folder_path, label, is_active, created_at, created_epoch, - last_imported_epoch, last_imported_at - from submission_queues - where is_active = 1 - limit 1 - """, - ).fetchone() - if row is None: - return None - return { - "id": row["id"], - "folderPath": row["folder_path"], - "label": row["label"], - "isActive": bool(row["is_active"]), - "createdAt": row["created_at"], - "createdEpoch": row["created_epoch"], - "lastImportedAt": row["last_imported_at"], - "lastImportedEpoch": row["last_imported_epoch"], - } - - def ensure_queue(self, folder_path: Path | str, label: str | None = None) -> dict[str, Any]: - folder = self._normalize_queue_folder(folder_path) - queue_id = self._queue_id_for_path(folder) - now = datetime.now() - queue_label = str(label or "").strip() or folder.name - - with self._connect() as conn: - existing = conn.execute( - "select id from submission_queues where id = ? or folder_path = ?", - (queue_id, str(folder)), - ).fetchone() - if existing is None: - conn.execute( - """ - insert into submission_queues ( - id, folder_path, label, is_active, created_at, created_epoch, last_imported_epoch, last_imported_at - ) values (?, ?, ?, 1, ?, ?, 0, '') - """, - (queue_id, str(folder), queue_label, now.isoformat(" ", "seconds"), int(now.timestamp())), - ) - else: - conn.execute( - "update submission_queues set is_active = 1, folder_path = ?, label = ? where id = ?", - (str(folder), queue_label, queue_id), - ) - conn.execute("update submission_queues set is_active = 0 where id != ?", (queue_id,)) - if self._has_queueless_submissions(conn=conn): - self._migrate_legacy_submissions_to_queue(queue_id, conn=conn) - queue_row = conn.execute( - """ - select id, folder_path, label, is_active, created_at, created_epoch, - last_imported_epoch, last_imported_at - from submission_queues - where id = ? - """, - (queue_id,), - ).fetchone() - if queue_row is None: - return self._queue_row_by_id(queue_id) - return { - "id": queue_row["id"], - "folderPath": queue_row["folder_path"], - "label": queue_row["label"], - "isActive": bool(queue_row["is_active"]), - "createdAt": queue_row["created_at"], - "createdEpoch": queue_row["created_epoch"], - "lastImportedAt": queue_row["last_imported_at"], - "lastImportedEpoch": queue_row["last_imported_epoch"], - } - - def active_submission_image_store(self, fallback_root: Path | str | None = None) -> LocalSubmissionImageStore: - queue = self.bootstrap_active_queue() - if queue is None: - if fallback_root is None: - raise ValueError("no active submission queue is configured") - return LocalSubmissionImageStore(fallback_root) - return LocalSubmissionImageStore(queue["folderPath"]) - - def _has_queueless_submissions(self, conn: sqlite3.Connection | None = None) -> bool: - should_close = conn is None - if conn is None: - conn = self._connect() - try: - count_row = conn.execute( - "select count(*) as count from submissions where queue_id = ''" - ).fetchone() - return bool(int(count_row["count"]) > 0) if count_row else False - finally: - if should_close: - conn.close() - def seed_from_image_store(self, image_store: LocalSubmissionImageStore) -> int: # Serialize seeding so two concurrent reload/import/upload requests can't # both classify the same record as new and double-run analysis, external diff --git a/src/rights_filter/server/store_queue.py b/src/rights_filter/server/store_queue.py new file mode 100644 index 0000000..923bbcc --- /dev/null +++ b/src/rights_filter/server/store_queue.py @@ -0,0 +1,170 @@ +"""Submission-queue management for CopyrighterStore, as a mixin. + +Queue id derivation, active-queue selection, legacy-submission migration, and +the active-queue bootstrap/ensure helpers. Mixed into CopyrighterStore; relies on +persistence methods and self.* attributes provided by the host class. Behavior +unchanged. +""" + +from __future__ import annotations + +import hashlib +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Any + +from rights_filter.server.image_store import LocalSubmissionImageStore + + +class StoreQueueMixin: + @staticmethod + def _queue_id_for_path(folder_path: Path) -> str: + return f"queue-{hashlib.sha1(str(folder_path).encode('utf-8')).hexdigest()[:16]}" + + @staticmethod + def _normalize_queue_folder(folder_path: Path | str) -> Path: + return Path(folder_path).resolve() + + def _set_active_queue(self, queue_id: str) -> None: + with self._connect() as conn: + conn.execute("update submission_queues set is_active = 0") + conn.execute( + "update submission_queues set is_active = 1 where id = ?", + (queue_id,), + ) + + def _queue_row_by_id(self, queue_id: str) -> dict[str, Any] | None: + with self._connect() as conn: + row = conn.execute( + """ + select id, folder_path, label, is_active, created_at, created_epoch, + last_imported_epoch, last_imported_at + from submission_queues + where id = ? + """, + (queue_id,), + ).fetchone() + if row is None: + return None + return { + "id": row["id"], + "folderPath": row["folder_path"], + "label": row["label"], + "isActive": bool(row["is_active"]), + "createdAt": row["created_at"], + "createdEpoch": row["created_epoch"], + "lastImportedAt": row["last_imported_at"], + "lastImportedEpoch": row["last_imported_epoch"], + } + + def _migrate_legacy_submissions_to_queue(self, queue_id: str, conn: sqlite3.Connection | None = None) -> None: + should_close = conn is None + if conn is None: + conn = self._connect() + try: + conn.execute( + "update submissions set queue_id = ? where queue_id = ''", + (queue_id,), + ) + if should_close: + conn.commit() + finally: + if should_close: + conn.close() + + def bootstrap_active_queue(self) -> dict[str, Any] | None: + with self._connect() as conn: + row = conn.execute( + """ + select id, folder_path, label, is_active, created_at, created_epoch, + last_imported_epoch, last_imported_at + from submission_queues + where is_active = 1 + limit 1 + """, + ).fetchone() + if row is None: + return None + return { + "id": row["id"], + "folderPath": row["folder_path"], + "label": row["label"], + "isActive": bool(row["is_active"]), + "createdAt": row["created_at"], + "createdEpoch": row["created_epoch"], + "lastImportedAt": row["last_imported_at"], + "lastImportedEpoch": row["last_imported_epoch"], + } + + def ensure_queue(self, folder_path: Path | str, label: str | None = None) -> dict[str, Any]: + folder = self._normalize_queue_folder(folder_path) + queue_id = self._queue_id_for_path(folder) + now = datetime.now() + queue_label = str(label or "").strip() or folder.name + + with self._connect() as conn: + existing = conn.execute( + "select id from submission_queues where id = ? or folder_path = ?", + (queue_id, str(folder)), + ).fetchone() + if existing is None: + conn.execute( + """ + insert into submission_queues ( + id, folder_path, label, is_active, created_at, created_epoch, last_imported_epoch, last_imported_at + ) values (?, ?, ?, 1, ?, ?, 0, '') + """, + (queue_id, str(folder), queue_label, now.isoformat(" ", "seconds"), int(now.timestamp())), + ) + else: + conn.execute( + "update submission_queues set is_active = 1, folder_path = ?, label = ? where id = ?", + (str(folder), queue_label, queue_id), + ) + conn.execute("update submission_queues set is_active = 0 where id != ?", (queue_id,)) + if self._has_queueless_submissions(conn=conn): + self._migrate_legacy_submissions_to_queue(queue_id, conn=conn) + queue_row = conn.execute( + """ + select id, folder_path, label, is_active, created_at, created_epoch, + last_imported_epoch, last_imported_at + from submission_queues + where id = ? + """, + (queue_id,), + ).fetchone() + if queue_row is None: + return self._queue_row_by_id(queue_id) + return { + "id": queue_row["id"], + "folderPath": queue_row["folder_path"], + "label": queue_row["label"], + "isActive": bool(queue_row["is_active"]), + "createdAt": queue_row["created_at"], + "createdEpoch": queue_row["created_epoch"], + "lastImportedAt": queue_row["last_imported_at"], + "lastImportedEpoch": queue_row["last_imported_epoch"], + } + + def active_submission_image_store(self, fallback_root: Path | str | None = None) -> LocalSubmissionImageStore: + queue = self.bootstrap_active_queue() + if queue is None: + if fallback_root is None: + raise ValueError("no active submission queue is configured") + return LocalSubmissionImageStore(fallback_root) + return LocalSubmissionImageStore(queue["folderPath"]) + + def _has_queueless_submissions(self, conn: sqlite3.Connection | None = None) -> bool: + should_close = conn is None + if conn is None: + conn = self._connect() + try: + count_row = conn.execute( + "select count(*) as count from submissions where queue_id = ''" + ).fetchone() + return bool(int(count_row["count"]) > 0) if count_row else False + finally: + if should_close: + conn.close() +