refactor: extract submission-queue management into StoreQueueMixin
Move queue id derivation, active-queue selection, legacy-submission migration, and queue bootstrap/ensure helpers into a mixin; CopyrighterStore inherits it. Drop now-unused hashlib import. sqlite_store.py 874 -> 724 lines (5119 -> 724, -86%); now under the 800-line guideline.
This commit is contained in:
parent
b575d2ee06
commit
6184d0f464
2 changed files with 172 additions and 151 deletions
|
|
@ -1,6 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
|
|
@ -50,6 +49,7 @@ from rights_filter.server.store_remote_fetch import (
|
|||
from rights_filter.server.store_enrichment import StoreEnrichmentMixin
|
||||
from rights_filter.server.store_operations import StoreOperationsMixin
|
||||
from rights_filter.server.store_persistence import StorePersistenceMixin
|
||||
from rights_filter.server.store_queue import StoreQueueMixin
|
||||
from rights_filter.server.store_search_candidates import StoreSearchCandidatesMixin
|
||||
from rights_filter.server.store_schema import (
|
||||
_ensure_constrained_schema,
|
||||
|
|
@ -72,6 +72,7 @@ from rights_filter.server.store_serialization import (
|
|||
|
||||
class CopyrighterStore(
|
||||
StorePersistenceMixin,
|
||||
StoreQueueMixin,
|
||||
StoreSearchCandidatesMixin,
|
||||
StoreEnrichmentMixin,
|
||||
StoreOperationsMixin,
|
||||
|
|
@ -233,156 +234,6 @@ class CopyrighterStore(
|
|||
"""
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _queue_id_for_path(folder_path: Path) -> str:
|
||||
return f"queue-{hashlib.sha1(str(folder_path).encode('utf-8')).hexdigest()[:16]}"
|
||||
|
||||
@staticmethod
|
||||
def _normalize_queue_folder(folder_path: Path | str) -> Path:
|
||||
return Path(folder_path).resolve()
|
||||
|
||||
def _set_active_queue(self, queue_id: str) -> None:
|
||||
with self._connect() as conn:
|
||||
conn.execute("update submission_queues set is_active = 0")
|
||||
conn.execute(
|
||||
"update submission_queues set is_active = 1 where id = ?",
|
||||
(queue_id,),
|
||||
)
|
||||
|
||||
def _queue_row_by_id(self, queue_id: str) -> dict[str, Any] | None:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
select id, folder_path, label, is_active, created_at, created_epoch,
|
||||
last_imported_epoch, last_imported_at
|
||||
from submission_queues
|
||||
where id = ?
|
||||
""",
|
||||
(queue_id,),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"id": row["id"],
|
||||
"folderPath": row["folder_path"],
|
||||
"label": row["label"],
|
||||
"isActive": bool(row["is_active"]),
|
||||
"createdAt": row["created_at"],
|
||||
"createdEpoch": row["created_epoch"],
|
||||
"lastImportedAt": row["last_imported_at"],
|
||||
"lastImportedEpoch": row["last_imported_epoch"],
|
||||
}
|
||||
|
||||
def _migrate_legacy_submissions_to_queue(self, queue_id: str, conn: sqlite3.Connection | None = None) -> None:
|
||||
should_close = conn is None
|
||||
if conn is None:
|
||||
conn = self._connect()
|
||||
try:
|
||||
conn.execute(
|
||||
"update submissions set queue_id = ? where queue_id = ''",
|
||||
(queue_id,),
|
||||
)
|
||||
if should_close:
|
||||
conn.commit()
|
||||
finally:
|
||||
if should_close:
|
||||
conn.close()
|
||||
|
||||
def bootstrap_active_queue(self) -> dict[str, Any] | None:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
select id, folder_path, label, is_active, created_at, created_epoch,
|
||||
last_imported_epoch, last_imported_at
|
||||
from submission_queues
|
||||
where is_active = 1
|
||||
limit 1
|
||||
""",
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"id": row["id"],
|
||||
"folderPath": row["folder_path"],
|
||||
"label": row["label"],
|
||||
"isActive": bool(row["is_active"]),
|
||||
"createdAt": row["created_at"],
|
||||
"createdEpoch": row["created_epoch"],
|
||||
"lastImportedAt": row["last_imported_at"],
|
||||
"lastImportedEpoch": row["last_imported_epoch"],
|
||||
}
|
||||
|
||||
def ensure_queue(self, folder_path: Path | str, label: str | None = None) -> dict[str, Any]:
|
||||
folder = self._normalize_queue_folder(folder_path)
|
||||
queue_id = self._queue_id_for_path(folder)
|
||||
now = datetime.now()
|
||||
queue_label = str(label or "").strip() or folder.name
|
||||
|
||||
with self._connect() as conn:
|
||||
existing = conn.execute(
|
||||
"select id from submission_queues where id = ? or folder_path = ?",
|
||||
(queue_id, str(folder)),
|
||||
).fetchone()
|
||||
if existing is None:
|
||||
conn.execute(
|
||||
"""
|
||||
insert into submission_queues (
|
||||
id, folder_path, label, is_active, created_at, created_epoch, last_imported_epoch, last_imported_at
|
||||
) values (?, ?, ?, 1, ?, ?, 0, '')
|
||||
""",
|
||||
(queue_id, str(folder), queue_label, now.isoformat(" ", "seconds"), int(now.timestamp())),
|
||||
)
|
||||
else:
|
||||
conn.execute(
|
||||
"update submission_queues set is_active = 1, folder_path = ?, label = ? where id = ?",
|
||||
(str(folder), queue_label, queue_id),
|
||||
)
|
||||
conn.execute("update submission_queues set is_active = 0 where id != ?", (queue_id,))
|
||||
if self._has_queueless_submissions(conn=conn):
|
||||
self._migrate_legacy_submissions_to_queue(queue_id, conn=conn)
|
||||
queue_row = conn.execute(
|
||||
"""
|
||||
select id, folder_path, label, is_active, created_at, created_epoch,
|
||||
last_imported_epoch, last_imported_at
|
||||
from submission_queues
|
||||
where id = ?
|
||||
""",
|
||||
(queue_id,),
|
||||
).fetchone()
|
||||
if queue_row is None:
|
||||
return self._queue_row_by_id(queue_id)
|
||||
return {
|
||||
"id": queue_row["id"],
|
||||
"folderPath": queue_row["folder_path"],
|
||||
"label": queue_row["label"],
|
||||
"isActive": bool(queue_row["is_active"]),
|
||||
"createdAt": queue_row["created_at"],
|
||||
"createdEpoch": queue_row["created_epoch"],
|
||||
"lastImportedAt": queue_row["last_imported_at"],
|
||||
"lastImportedEpoch": queue_row["last_imported_epoch"],
|
||||
}
|
||||
|
||||
def active_submission_image_store(self, fallback_root: Path | str | None = None) -> LocalSubmissionImageStore:
|
||||
queue = self.bootstrap_active_queue()
|
||||
if queue is None:
|
||||
if fallback_root is None:
|
||||
raise ValueError("no active submission queue is configured")
|
||||
return LocalSubmissionImageStore(fallback_root)
|
||||
return LocalSubmissionImageStore(queue["folderPath"])
|
||||
|
||||
def _has_queueless_submissions(self, conn: sqlite3.Connection | None = None) -> bool:
|
||||
should_close = conn is None
|
||||
if conn is None:
|
||||
conn = self._connect()
|
||||
try:
|
||||
count_row = conn.execute(
|
||||
"select count(*) as count from submissions where queue_id = ''"
|
||||
).fetchone()
|
||||
return bool(int(count_row["count"]) > 0) if count_row else False
|
||||
finally:
|
||||
if should_close:
|
||||
conn.close()
|
||||
|
||||
def seed_from_image_store(self, image_store: LocalSubmissionImageStore) -> int:
|
||||
# Serialize seeding so two concurrent reload/import/upload requests can't
|
||||
# both classify the same record as new and double-run analysis, external
|
||||
|
|
|
|||
170
src/rights_filter/server/store_queue.py
Normal file
170
src/rights_filter/server/store_queue.py
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
"""Submission-queue management for CopyrighterStore, as a mixin.
|
||||
|
||||
Queue id derivation, active-queue selection, legacy-submission migration, and
|
||||
the active-queue bootstrap/ensure helpers. Mixed into CopyrighterStore; relies on
|
||||
persistence methods and self.* attributes provided by the host class. Behavior
|
||||
unchanged.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from rights_filter.server.image_store import LocalSubmissionImageStore
|
||||
|
||||
|
||||
class StoreQueueMixin:
|
||||
@staticmethod
|
||||
def _queue_id_for_path(folder_path: Path) -> str:
|
||||
return f"queue-{hashlib.sha1(str(folder_path).encode('utf-8')).hexdigest()[:16]}"
|
||||
|
||||
@staticmethod
|
||||
def _normalize_queue_folder(folder_path: Path | str) -> Path:
|
||||
return Path(folder_path).resolve()
|
||||
|
||||
def _set_active_queue(self, queue_id: str) -> None:
|
||||
with self._connect() as conn:
|
||||
conn.execute("update submission_queues set is_active = 0")
|
||||
conn.execute(
|
||||
"update submission_queues set is_active = 1 where id = ?",
|
||||
(queue_id,),
|
||||
)
|
||||
|
||||
def _queue_row_by_id(self, queue_id: str) -> dict[str, Any] | None:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
select id, folder_path, label, is_active, created_at, created_epoch,
|
||||
last_imported_epoch, last_imported_at
|
||||
from submission_queues
|
||||
where id = ?
|
||||
""",
|
||||
(queue_id,),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"id": row["id"],
|
||||
"folderPath": row["folder_path"],
|
||||
"label": row["label"],
|
||||
"isActive": bool(row["is_active"]),
|
||||
"createdAt": row["created_at"],
|
||||
"createdEpoch": row["created_epoch"],
|
||||
"lastImportedAt": row["last_imported_at"],
|
||||
"lastImportedEpoch": row["last_imported_epoch"],
|
||||
}
|
||||
|
||||
def _migrate_legacy_submissions_to_queue(self, queue_id: str, conn: sqlite3.Connection | None = None) -> None:
|
||||
should_close = conn is None
|
||||
if conn is None:
|
||||
conn = self._connect()
|
||||
try:
|
||||
conn.execute(
|
||||
"update submissions set queue_id = ? where queue_id = ''",
|
||||
(queue_id,),
|
||||
)
|
||||
if should_close:
|
||||
conn.commit()
|
||||
finally:
|
||||
if should_close:
|
||||
conn.close()
|
||||
|
||||
def bootstrap_active_queue(self) -> dict[str, Any] | None:
|
||||
with self._connect() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
select id, folder_path, label, is_active, created_at, created_epoch,
|
||||
last_imported_epoch, last_imported_at
|
||||
from submission_queues
|
||||
where is_active = 1
|
||||
limit 1
|
||||
""",
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"id": row["id"],
|
||||
"folderPath": row["folder_path"],
|
||||
"label": row["label"],
|
||||
"isActive": bool(row["is_active"]),
|
||||
"createdAt": row["created_at"],
|
||||
"createdEpoch": row["created_epoch"],
|
||||
"lastImportedAt": row["last_imported_at"],
|
||||
"lastImportedEpoch": row["last_imported_epoch"],
|
||||
}
|
||||
|
||||
def ensure_queue(self, folder_path: Path | str, label: str | None = None) -> dict[str, Any]:
|
||||
folder = self._normalize_queue_folder(folder_path)
|
||||
queue_id = self._queue_id_for_path(folder)
|
||||
now = datetime.now()
|
||||
queue_label = str(label or "").strip() or folder.name
|
||||
|
||||
with self._connect() as conn:
|
||||
existing = conn.execute(
|
||||
"select id from submission_queues where id = ? or folder_path = ?",
|
||||
(queue_id, str(folder)),
|
||||
).fetchone()
|
||||
if existing is None:
|
||||
conn.execute(
|
||||
"""
|
||||
insert into submission_queues (
|
||||
id, folder_path, label, is_active, created_at, created_epoch, last_imported_epoch, last_imported_at
|
||||
) values (?, ?, ?, 1, ?, ?, 0, '')
|
||||
""",
|
||||
(queue_id, str(folder), queue_label, now.isoformat(" ", "seconds"), int(now.timestamp())),
|
||||
)
|
||||
else:
|
||||
conn.execute(
|
||||
"update submission_queues set is_active = 1, folder_path = ?, label = ? where id = ?",
|
||||
(str(folder), queue_label, queue_id),
|
||||
)
|
||||
conn.execute("update submission_queues set is_active = 0 where id != ?", (queue_id,))
|
||||
if self._has_queueless_submissions(conn=conn):
|
||||
self._migrate_legacy_submissions_to_queue(queue_id, conn=conn)
|
||||
queue_row = conn.execute(
|
||||
"""
|
||||
select id, folder_path, label, is_active, created_at, created_epoch,
|
||||
last_imported_epoch, last_imported_at
|
||||
from submission_queues
|
||||
where id = ?
|
||||
""",
|
||||
(queue_id,),
|
||||
).fetchone()
|
||||
if queue_row is None:
|
||||
return self._queue_row_by_id(queue_id)
|
||||
return {
|
||||
"id": queue_row["id"],
|
||||
"folderPath": queue_row["folder_path"],
|
||||
"label": queue_row["label"],
|
||||
"isActive": bool(queue_row["is_active"]),
|
||||
"createdAt": queue_row["created_at"],
|
||||
"createdEpoch": queue_row["created_epoch"],
|
||||
"lastImportedAt": queue_row["last_imported_at"],
|
||||
"lastImportedEpoch": queue_row["last_imported_epoch"],
|
||||
}
|
||||
|
||||
def active_submission_image_store(self, fallback_root: Path | str | None = None) -> LocalSubmissionImageStore:
|
||||
queue = self.bootstrap_active_queue()
|
||||
if queue is None:
|
||||
if fallback_root is None:
|
||||
raise ValueError("no active submission queue is configured")
|
||||
return LocalSubmissionImageStore(fallback_root)
|
||||
return LocalSubmissionImageStore(queue["folderPath"])
|
||||
|
||||
def _has_queueless_submissions(self, conn: sqlite3.Connection | None = None) -> bool:
|
||||
should_close = conn is None
|
||||
if conn is None:
|
||||
conn = self._connect()
|
||||
try:
|
||||
count_row = conn.execute(
|
||||
"select count(*) as count from submissions where queue_id = ''"
|
||||
).fetchone()
|
||||
return bool(int(count_row["count"]) > 0) if count_row else False
|
||||
finally:
|
||||
if should_close:
|
||||
conn.close()
|
||||
|
||||
Loading…
Reference in a new issue