From e3bc99e6b9fc7fcd6f898736879469373c6769d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9C=A0=EC=B0=BD=EC=9A=B1?= Date: Sat, 20 Jun 2026 21:10:22 +0900 Subject: [PATCH] refactor: extract text helpers and HTML/CSS image-scraping from sqlite_store Move the pure text helpers (_text_list, _unique_texts) into store_text and the ~950-line page/CSS/JSON/srcset image-URL extraction (the _PageImageParser and its helpers) into store_page_scrape. Both behavior-preserving; store_page_scrape depends only on stdlib + url/text helpers + domain Evidence (no store coupling). sqlite_store.py 4955 -> 3992 lines. --- src/rights_filter/server/sqlite_store.py | 983 +----------------- src/rights_filter/server/store_page_scrape.py | 976 +++++++++++++++++ src/rights_filter/server/store_text.py | 27 + tests/rights_filter/test_review_fixes.py | 3 +- 4 files changed, 1015 insertions(+), 974 deletions(-) create mode 100644 src/rights_filter/server/store_page_scrape.py create mode 100644 src/rights_filter/server/store_text.py diff --git a/src/rights_filter/server/sqlite_store.py b/src/rights_filter/server/sqlite_store.py index 708a98d..2c8d376 100644 --- a/src/rights_filter/server/sqlite_store.py +++ b/src/rights_filter/server/sqlite_store.py @@ -10,14 +10,13 @@ import re import shutil import sqlite3 import threading -from html.parser import HTMLParser from contextlib import contextmanager from dataclasses import replace from datetime import datetime from io import BytesIO from pathlib import Path from typing import Any, Callable -from urllib.parse import parse_qsl, urljoin, urlparse +from urllib.parse import urlparse from rights_filter.analysis.face_person_detection import HeuristicFacePersonDetector from rights_filter.analysis.fingerprints import FingerprintService @@ -47,12 +46,21 @@ from rights_filter.server.store_remote_fetch import ( _fetch_stylesheet_url_bytes, _fetch_url_bytes, ) +from rights_filter.server.store_page_scrape import ( + _content_has_comparable_image_fingerprint, + 
_extract_css_image_urls, + _extract_page_image_urls, + _extract_page_stylesheet_urls, + _normalized_remote_image_url, + _search_result_direct_image_urls, +) from rights_filter.server.store_schema import ( _ensure_constrained_schema, _ensure_queue_schema, _ensure_schema_version, _ensure_typed_columns, ) +from rights_filter.server.store_text import _text_list, _unique_texts from rights_filter.server.store_url_utils import ( _decoded_nested_url, _is_http_url, @@ -3514,26 +3522,6 @@ def _query_history_status(evidence: list[Evidence]) -> str: return "auto" -def _text_list(value: Any) -> list[str]: - if value is None: - return [] - if isinstance(value, list): - return [str(item).strip() for item in value if str(item).strip()] - return [item.strip() for item in str(value).split(",") if item.strip()] - - -def _unique_texts(values: Any) -> list[str]: - seen: set[str] = set() - result: list[str] = [] - for value in values: - text = str(value).strip() - if not text or text in seen: - continue - seen.add(text) - result.append(text) - return result - - def _default_evidence_contribution(payload: dict[str, Any]) -> bool: source = str(payload.get("source", "")) if source in {"llm", "failure"}: @@ -3656,957 +3644,6 @@ def _image_suffix_from_url(url: str) -> str: return suffix -class _PageImageParser(HTMLParser): - def __init__(self, parse_noscript: bool = True) -> None: - super().__init__() - self.parse_noscript = parse_noscript - self.priority_urls: list[str] = [] - self.image_urls: list[str] = [] - self.stylesheet_urls: list[str] = [] - self._script_chunks: list[str] = [] - self._collecting_script_type = "" - self._style_chunks: list[str] = [] - self._collecting_style = False - self._noscript_chunks: list[str] = [] - self._collecting_noscript = False - - def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: - attr = {name.lower(): (value or "") for name, value in attrs} - tag_name = tag.lower() - if tag_name == "noscript" and self.parse_noscript: - 
self._collecting_noscript = True - self._noscript_chunks = [] - return - if tag_name == "style": - self._collecting_style = True - self._style_chunks = [] - return - if tag_name == "script": - script_type = attr.get("type", "").lower() - if "ld+json" in script_type: - self._collecting_script_type = "json_ld" - self._script_chunks = [] - elif script_type in {"application/json", "application/problem+json", "application/activity+json"}: - self._collecting_script_type = "json" - self._script_chunks = [] - elif script_type in {"", "text/javascript", "application/javascript", "module"}: - self._collecting_script_type = "javascript" - self._script_chunks = [] - return - if attr.get("style"): - self.image_urls.extend(_css_url_image_urls(attr["style"])) - if attr.get("poster") and _looks_like_image_reference(attr["poster"]): - self.priority_urls.append(attr["poster"]) - for name in ( - "data-bg", - "data-bg-url", - "data-background", - "data-background-image", - "data-background-url", - "data-lazy-background", - "data-image", - "data-image-src", - "data-image-url", - "data-img", - "data-img-src", - "data-img-url", - "data-src", - "data-src-large", - "data-fallback-src", - "data-full", - "data-full-src", - "data-full-image", - "data-full-url", - "data-hires", - "data-highres", - "data-large", - "data-large-src", - "data-large-image", - "data-large-url", - "data-original", - "data-original-image", - "data-lazy-src", - "data-original-src", - "data-original-url", - "data-photo", - "data-photo-url", - "data-thumb", - "data-thumb-url", - "data-thumbnail", - "data-thumbnail-url", - "data-zoom-image", - ): - if attr.get(name): - self.image_urls.extend( - _data_attribute_image_urls(name, attr[name], known_image_attr=True) - ) - for name, value in attr.items(): - self.image_urls.extend(_data_attribute_image_urls(name, value)) - if tag_name in {"a", "area"} and attr.get("href") and _looks_like_image_reference(attr["href"]): - self.image_urls.append(attr["href"]) - if tag_name == 
"meta": - key = (attr.get("property") or attr.get("name") or attr.get("itemprop") or "").lower() - if key in { - "image", - "contenturl", - "thumbnail", - "thumbnailurl", - "og:image", - "og:image:url", - "og:image:secure_url", - "twitter:image", - "twitter:image:src", - "twitter:image:url", - }: - self.priority_urls.append(attr.get("content", "")) - return - if tag_name == "link": - rel = attr.get("rel", "").lower() - rel_key = rel.replace("-", "_") - as_value = attr.get("as", "").lower() - if "stylesheet" in rel and attr.get("href"): - self.stylesheet_urls.append(attr["href"]) - if "image_src" in rel_key or (as_value == "image" and any(token in rel for token in ("preload", "prefetch"))): - if attr.get("href"): - self.priority_urls.append(attr["href"]) - for name in ("imagesrcset", "image-srcset"): - if attr.get(name): - self.priority_urls.extend(_srcset_image_urls(attr[name])) - return - if tag_name == "source": - for name in ("srcset", "data-original-srcset", "data-lazy-srcset", "data-srcset"): - if attr.get(name): - self.priority_urls.extend(_srcset_image_urls(attr[name])) - return - if tag_name in {"img", "amp-img", "amp-anim"}: - target_urls = self.priority_urls if _is_likely_primary_image_attrs(attr) else self.image_urls - for name in ("data-original-srcset", "data-lazy-srcset", "data-srcset", "srcset"): - if attr.get(name): - target_urls.extend(_srcset_image_urls(attr[name])) - for name in ("data-original", "data-original-src", "data-lazy-src", "data-src", "src"): - if attr.get(name): - target_urls.append(attr[name]) - return - - def handle_data(self, data: str) -> None: - if self._collecting_script_type: - self._script_chunks.append(data) - if self._collecting_style: - self._style_chunks.append(data) - if self._collecting_noscript: - self._noscript_chunks.append(data) - - def handle_endtag(self, tag: str) -> None: - tag_name = tag.lower() - if tag_name == "noscript" and self._collecting_noscript: - self._collecting_noscript = False - parser = 
_PageImageParser(parse_noscript=False) - parser.feed(html.unescape("".join(self._noscript_chunks))) - self.priority_urls.extend(parser.priority_urls) - self.image_urls.extend(parser.image_urls) - self.stylesheet_urls.extend(parser.stylesheet_urls) - self._noscript_chunks = [] - return - if tag_name == "style" and self._collecting_style: - self._collecting_style = False - self.image_urls.extend(_css_url_image_urls("".join(self._style_chunks))) - self._style_chunks = [] - return - if tag_name != "script" or not self._collecting_script_type: - return - script_type = self._collecting_script_type - self._collecting_script_type = "" - script_content = "".join(self._script_chunks) - self._script_chunks = [] - if script_type == "json_ld": - self.priority_urls.extend(_json_ld_image_urls(script_content)) - elif script_type == "json": - self.priority_urls.extend(_json_script_image_urls(script_content)) - elif script_type == "javascript": - self.priority_urls.extend(_javascript_image_urls(script_content)) - - -def _srcset_image_urls(value: str) -> list[str]: - candidates: list[tuple[float, int, str]] = [] - for order, raw_candidate in enumerate(_split_srcset_candidates(str(value))): - candidate = raw_candidate.strip() - if not candidate: - continue - parts = candidate.split() - url = parts[0].strip() - if not url: - continue - score = _srcset_descriptor_score(parts[1] if len(parts) > 1 else "") - candidates.append((score, order, url)) - return [ - url - for _, _, url in sorted( - candidates, - key=lambda item: (-item[0], item[1]), - ) - ] - - -def _split_srcset_candidates(value: str) -> list[str]: - candidates: list[str] = [] - start = 0 - for index, character in enumerate(value): - if character != ",": - continue - remainder = value[index + 1 :].lstrip() - if not _starts_srcset_candidate(remainder): - continue - candidates.append(value[start:index]) - start = index + 1 - candidates.append(value[start:]) - return candidates - - -def _starts_srcset_candidate(value: str) -> 
bool: - text = str(value).strip() - if not text: - return False - first_token = text.split(None, 1)[0] - return _is_urlish_reference(first_token) or _is_scheme_less_remote_image_url(first_token) - - -def _srcset_descriptor_score(value: str) -> float: - descriptor = value.strip().lower() - if descriptor.endswith("w"): - try: - return float(descriptor[:-1]) - except ValueError: - return 0.0 - if descriptor.endswith("x"): - try: - return float(descriptor[:-1]) * 1000 - except ValueError: - return 0.0 - return 0.0 - - -def _is_generic_data_image_attr(name: str, value: str) -> bool: - return bool(_data_attribute_image_urls(name, value)) - - -def _data_attribute_image_urls( - name: str, - value: str, - *, - known_image_attr: bool = False, -) -> list[str]: - attr_name = str(name).lower().replace("-", "_").replace(":", "_") - text = html.unescape(str(value).strip()) - if not attr_name.startswith("data_") or not text or text.lower().startswith("data:"): - return [] - if _is_srcset_attr_name(attr_name): - return _srcset_image_urls(text) - if _looks_like_image_reference(text): - return [text] - if _is_urlish_reference(text): - return [text] - image_named_attr = _is_image_data_attr_name(attr_name) - if not known_image_attr and not image_named_attr and not _looks_like_json_attribute_text(text): - return [] - return _json_attribute_image_urls( - text, - allow_plain_url_keys=known_image_attr or image_named_attr, - ) - - -def _is_image_data_attr_name(attr_name: str) -> bool: - image_tokens = ( - "avatar", - "background", - "bg", - "image", - "img", - "photo", - "picture", - "poster", - "thumb", - "thumbnail", - ) - return any(token in attr_name for token in image_tokens) - - -def _json_attribute_image_urls( - value: str, - *, - allow_plain_url_keys: bool = True, -) -> list[str]: - text = _json_attribute_text(value) - if not text: - return [] - try: - document = json.loads(text) - except Exception: - return [] - - urls: list[str] = [] - - def collect_likely(value: Any) -> None: - 
if isinstance(value, str): - urls.extend(_json_image_string_candidates(value)) - return - if isinstance(value, list): - for item in value: - collect_likely(item) - return - if isinstance(value, dict): - for key, child in value.items(): - if _is_srcset_key(str(key)): - urls.extend(_srcset_image_urls(str(child))) - continue - if _is_json_url_value_key(str(key)) or _is_likely_json_image_key(str(key)): - collect_likely(child) - elif isinstance(child, (dict, list)): - collect_likely(child) - - def collect_obvious(value: Any) -> None: - if isinstance(value, str): - return - if isinstance(value, list): - for item in value: - collect_obvious(item) - return - if isinstance(value, dict): - for key, child in value.items(): - if _is_srcset_key(str(key)): - urls.extend(_srcset_image_urls(str(child))) - continue - if _is_likely_json_image_key(str(key)): - collect_likely(child) - else: - collect_obvious(child) - - if allow_plain_url_keys: - collect_likely(document) - else: - collect_obvious(document) - return _unique_texts(urls) - - -def _looks_like_json_attribute_text(value: str) -> bool: - return bool(_json_attribute_text(value)) - - -def _json_attribute_text(value: str) -> str: - text = html.unescape(str(value).strip()) - if text.startswith(("{", "[")): - return text - decoded = _decoded_nested_url(text) - if decoded and decoded.startswith(("{", "[")): - return decoded - return "" - - -def _is_srcset_attr_name(name: str) -> bool: - normalized = str(name).lower().replace("-", "_").replace(":", "_") - return "srcset" in normalized or "src_set" in normalized - - -def _is_urlish_reference(value: str) -> bool: - text = str(value).strip() - return ( - _is_http_url(text) - or text.startswith(("/", "//", "./", "../")) - or _url_looks_like_image(text) - ) - - -def _json_ld_image_urls(script_content: str) -> list[str]: - try: - document = json.loads(script_content) - except Exception: - return [] - - urls: list[str] = [] - - def collect_image_value(value: Any) -> None: - if 
isinstance(value, str): - urls.append(value) - return - if isinstance(value, list): - for item in value: - collect_image_value(item) - return - if isinstance(value, dict): - for key in ("contentUrl", "url", "thumbnailUrl"): - if key in value: - collect_image_value(value[key]) - - def visit(value: Any) -> None: - if isinstance(value, list): - for item in value: - visit(item) - return - if not isinstance(value, dict): - return - for key, child in value.items(): - if str(key).lower() in { - "image", - "thumbnail", - "thumbnailurl", - "contenturl", - "primaryimageofpage", - "associatedmedia", - }: - collect_image_value(child) - else: - visit(child) - - visit(document) - return urls - - -def _json_script_image_urls(script_content: str) -> list[str]: - try: - document = json.loads(script_content) - except Exception: - return [] - - urls: list[str] = [] - - def collect(value: Any) -> None: - if isinstance(value, str): - urls.extend(_json_image_string_candidates(value)) - return - if isinstance(value, list): - for item in value: - collect(item) - return - if isinstance(value, dict): - for key, child in value.items(): - if _is_srcset_key(str(key)): - urls.extend(_srcset_image_urls(str(child))) - continue - if _is_json_url_value_key(str(key)) or _is_likely_json_image_key(str(key)): - collect(child) - elif isinstance(child, (dict, list)): - collect(child) - return - - def visit(value: Any) -> None: - if isinstance(value, list): - for item in value: - visit(item) - return - if not isinstance(value, dict): - return - for key, child in value.items(): - if _is_srcset_key(str(key)): - urls.extend(_srcset_image_urls(str(child))) - elif _is_likely_json_image_key(str(key)): - collect(child) - else: - visit(child) - - visit(document) - return urls - - -def _is_likely_json_image_key(key: str) -> bool: - normalized = key.lower().replace("-", "_") - return ( - "image" in normalized - or "thumbnail" in normalized - or _is_srcset_key(normalized) - or normalized in { - "asset_url", - 
"content_url", - "contenturl", - "media_url", - "mediaurl", - "photo", - "photo_url", - "poster", - "poster_url", - "avatar", - "avatar_url", - } - ) - - -def _is_json_url_value_key(key: str) -> bool: - normalized = re.sub(r"(? list[str]: - raw = str(value).strip() - decoded = _decoded_nested_url(raw) - if decoded != raw and (_looks_like_image_reference(decoded) or _is_urlish_reference(decoded)): - return [decoded] - if _looks_like_image_reference(raw) or _is_urlish_reference(raw): - return [raw] - return [] - - -def _is_srcset_key(key: str) -> bool: - normalized = str(key).lower().replace("-", "_") - return normalized in { - "image_src_set", - "image_srcset", - "imagesrcset", - "photo_src_set", - "photo_srcset", - "picture_src_set", - "picture_srcset", - "src_set", - "srcset", - "thumbnail_src_set", - "thumbnail_srcset", - } - - -def _javascript_image_urls(script_content: str) -> list[str]: - urls: list[str] = [] - image_key = r"[\w$:-]*(?:image|thumbnail|photo|avatar|poster|picture)[\w$:-]*" - srcset_key = r"[\w$:-]*(?:srcset|src-set)[\w$:-]*" - srcset_pattern = re.compile( - rf"""(?is)["']?({srcset_key})["']?\s*[:=]\s*["']([^"']+)["']""" - ) - key_value_pattern = re.compile( - rf"""(?is)["']?({image_key})["']?\s*[:=]\s*["']([^"']+)["']""" - ) - nested_value_pattern = re.compile( - rf"""(?is)["']?({image_key})["']?\s*[:=]\s*\{{[^{{}}]{{0,500}}?["'](?:url|src|contentUrl|thumbnailUrl)["']\s*:\s*["']([^"']+)["']""" - ) - for _key, value in srcset_pattern.findall(script_content): - urls.extend(_srcset_image_urls(_decode_javascript_string(value))) - for pattern in (key_value_pattern, nested_value_pattern): - for _key, value in pattern.findall(script_content): - candidate = _decode_javascript_string(value) - if _looks_like_image_reference(candidate): - urls.append(candidate) - return _unique_texts(urls) - - -def _decode_javascript_string(value: str) -> str: - text = value.replace("\\/", "/") - if "\\u" not in text and "\\x" not in text: - return text - - def 
_replace_escape(match: re.Match[str]) -> str: - try: - return chr(int(match.group(1) or match.group(2), 16)) - except (TypeError, ValueError): - return match.group(0) - - # Decode only explicit \uXXXX / \xXX escapes. The previous - # bytes(text, "utf-8").decode("unicode_escape") reinterpreted real UTF-8 - # bytes as Latin-1, silently corrupting literal non-ASCII (e.g. Korean) URLs. - return re.sub(r"\\u([0-9a-fA-F]{4})|\\x([0-9a-fA-F]{2})", _replace_escape, text) - - -def _looks_like_image_reference(value: str) -> bool: - text = value.strip() - if not text or text.lower().startswith("data:"): - return False - if _unwrapped_image_url(text): - return True - if _relative_wrapped_image_url(text): - return True - return _url_looks_like_image(text) - - -def _relative_wrapped_image_url(value: str) -> str: - parsed = urlparse(value) - if parsed.scheme or parsed.netloc: - return "" - for key, raw_value in parse_qsl(parsed.query, keep_blank_values=False): - key_text = key.lower().replace("-", "_") - if key_text not in { - "imgurl", - "imageurl", - "image_url", - "mediaurl", - "media_url", - "contenturl", - "content_url", - "photo", - "photo_url", - "src", - "source", - "image", - "img", - "url", - "u", - }: - continue - candidate = _decoded_nested_url(raw_value) - if candidate.startswith("/") or _url_looks_like_image(candidate): - return candidate - return "" - - -def _is_likely_primary_image_attrs(attr: dict[str, str]) -> bool: - text = " ".join( - str(attr.get(name, "")) - for name in ( - "alt", - "aria-label", - "class", - "data-image-type", - "data-role", - "id", - "itemprop", - "src", - "data-src", - "data-original", - "data-lazy-src", - "data-original-src", - ) - ).casefold() - negative_tokens = ( - "advert", - "avatar", - "badge", - "banner", - "button", - "emoji", - "favicon", - "icon", - "logo", - "sprite", - "tracking", - ) - if any(token in text for token in negative_tokens): - return False - - positive_tokens = ( - "article", - "cover", - "full", - "hero", - 
"main", - "official", - "photo", - "picture", - "portrait", - "primary", - "profile", - "representative", - "thumbnail", - ) - if any(token in text for token in positive_tokens): - return True - - width = _numeric_attr(attr.get("width", "")) - height = _numeric_attr(attr.get("height", "")) - loading = attr.get("loading", "").casefold() - fetchpriority = attr.get("fetchpriority", "").casefold() - return ( - width >= 300 - and height >= 300 - and (fetchpriority == "high" or loading != "lazy") - ) - - -def _numeric_attr(value: str) -> int: - match = re.search(r"\d+", str(value)) - if not match: - return 0 - try: - return int(match.group(0)) - except ValueError: - return 0 - - -def _css_url_image_urls(style: str) -> list[str]: - direct_urls = [ - match.group(2).strip() - for match in re.finditer(r"url\(\s*(['\"]?)(.*?)\1\s*\)", style, flags=re.IGNORECASE) - if match.group(2).strip() - ] - return _unique_texts([*_css_image_set_urls(style), *direct_urls]) - - -def _css_image_set_urls(style: str) -> list[str]: - candidates: list[tuple[float, int, str]] = [] - order = 0 - for body in _css_image_set_bodies(style): - for raw_candidate in _split_top_level_commas(body): - url, descriptor = _css_image_set_candidate(raw_candidate) - if not url: - continue - candidates.append((_css_image_set_descriptor_score(descriptor), order, url)) - order += 1 - return [ - url - for _, _, url in sorted( - candidates, - key=lambda item: (-item[0], item[1]), - ) - ] - - -def _css_image_set_bodies(style: str) -> list[str]: - bodies: list[str] = [] - for match in re.finditer(r"(?:-webkit-)?image-set\s*\(", style, flags=re.IGNORECASE): - start = match.end() - depth = 1 - quote = "" - escaped = False - for index in range(start, len(style)): - character = style[index] - if quote: - if escaped: - escaped = False - elif character == "\\": - escaped = True - elif character == quote: - quote = "" - continue - if character in {"'", '"'}: - quote = character - continue - if character == "(": - depth += 1 - 
continue - if character == ")": - depth -= 1 - if depth == 0: - bodies.append(style[start:index]) - break - return bodies - - -def _split_top_level_commas(value: str) -> list[str]: - parts: list[str] = [] - start = 0 - depth = 0 - quote = "" - escaped = False - for index, character in enumerate(value): - if quote: - if escaped: - escaped = False - elif character == "\\": - escaped = True - elif character == quote: - quote = "" - continue - if character in {"'", '"'}: - quote = character - continue - if character == "(": - depth += 1 - continue - if character == ")": - depth = max(0, depth - 1) - continue - if character == "," and depth == 0: - parts.append(value[start:index].strip()) - start = index + 1 - parts.append(value[start:].strip()) - return [part for part in parts if part] - - -def _css_image_set_candidate(value: str) -> tuple[str, str]: - url_match = re.search(r"url\(\s*(['\"]?)(.*?)\1\s*\)", value, flags=re.IGNORECASE) - if url_match: - return url_match.group(2).strip(), value[url_match.end() :] - - quoted_match = re.search(r"""(['"])(.*?)\1""", value) - if quoted_match: - return quoted_match.group(2).strip(), value[quoted_match.end() :] - - parts = value.split(None, 1) - if parts and _looks_like_image_reference(parts[0]): - return parts[0].strip(), parts[1] if len(parts) > 1 else "" - return "", "" - - -def _css_image_set_descriptor_score(value: str) -> float: - descriptor = value.strip().lower() - match = re.search(r"([0-9]*\.?[0-9]+)\s*(dppx|dpi|x|w)\b", descriptor) - if not match: - return 0.0 - number = float(match.group(1)) - unit = match.group(2) - if unit == "w": - return number - if unit == "dpi": - return (number / 96) * 1000 - return number * 1000 - - -def _extract_page_image_urls(content: bytes, base_url: str, limit: int) -> list[str]: - if limit <= 0: - return [] - return [ - url - for url in _page_image_references(content, base_url)[0] - if _is_http_url(url) - ][:limit] - - -def _extract_page_stylesheet_urls(content: bytes, base_url: str, 
limit: int) -> list[str]: - if limit <= 0: - return [] - return [ - url - for url in _page_image_references(content, base_url)[1] - if _is_http_url(url) - ][:limit] - - -def _extract_css_image_urls(content: bytes, base_url: str, limit: int) -> list[str]: - if limit <= 0: - return [] - return [ - url - for url in _unique_texts( - _normalized_image_url(base_url, url) - for url in _css_url_image_urls(content.decode("utf-8", errors="replace")) - ) - if _is_http_url(url) - ][:limit] - - -def _page_image_references(content: bytes, base_url: str) -> tuple[list[str], list[str]]: - parser = _PageImageParser() - parser.feed(content.decode("utf-8", errors="replace")) - image_urls = [ - url - for url in _unique_texts( - _normalized_image_url(base_url, url) - for url in [*parser.priority_urls, *parser.image_urls] - ) - if _is_http_url(url) - ] - stylesheet_urls = [ - url - for url in _unique_texts( - _normalized_image_url(base_url, url) - for url in parser.stylesheet_urls - ) - if _is_http_url(url) - ] - return image_urls, stylesheet_urls - - -def _content_has_comparable_image_fingerprint(content: bytes) -> bool: - try: - fingerprint = FingerprintService().fingerprints_for(content).perceptual - except Exception: - return False - return not fingerprint.startswith("phash:unavailable:") - - -def _search_result_direct_image_urls(source_evidence: Evidence) -> list[str]: - result_url = str( - source_evidence.data.get("result_url", source_evidence.data.get("url", "")) - ) - unwrapped_url = _unwrapped_image_url(result_url) - if unwrapped_url: - return [unwrapped_url] - if _is_http_url(result_url) and _url_looks_like_image(result_url): - return [result_url] - return [] - - -def _normalized_image_url(base_url: str, url: str) -> str: - text = _decoded_url_reference(str(url).strip()) - if not text or text.lower().startswith("data:"): - return "" - if _is_scheme_less_remote_image_url(text): - text = f"https://{text.lstrip('/')}" - normalized = urljoin(base_url, text) - return 
_unwrapped_image_url(normalized) or normalized - - -def _normalized_remote_image_url(url: str) -> str: - text = _decoded_url_reference(str(url).strip()) - if not text or text.lower().startswith("data:"): - return "" - if _is_scheme_less_remote_image_url(text): - text = f"https://{text.lstrip('/')}" - return _unwrapped_image_url(text) or text - - -def _unwrapped_image_url(url: str) -> str: - if not _is_http_url(url): - return "" - parsed = urlparse(url) - strong_keys = { - "imgurl", - "imageurl", - "image_url", - "mediaurl", - "media_url", - "contenturl", - "content_url", - "photo", - "photo_url", - "src", - "source", - "image", - "img", - } - weak_keys = {"url", "u", "target", "redirect", "redirect_url"} - for key, value in parse_qsl(parsed.query, keep_blank_values=False): - key_text = key.lower().replace("-", "_") - candidate = _decoded_nested_url(value) - if not candidate: - continue - if not _is_http_url(candidate): - if candidate.startswith("//"): - candidate = f"https:{candidate}" - elif _is_scheme_less_remote_image_url(candidate): - candidate = f"https://{candidate.lstrip('/')}" - elif candidate.startswith("/") or _url_looks_like_image(candidate): - candidate = urljoin(url, candidate) - else: - continue - if key_text in strong_keys: - return candidate - if key_text in weak_keys and _url_looks_like_image(candidate): - return candidate - return "" - - -def _is_scheme_less_remote_image_url(value: str) -> bool: - text = str(value).strip().lstrip("/") - if not _url_looks_like_image(text): - return False - first_segment = text.split("/", 1)[0] - if first_segment in {".", ".."} or first_segment.startswith("."): - return False - return "." 
in first_segment and " " not in first_segment - - -def _decoded_url_reference(value: str) -> str: - raw = str(value).strip() - decoded = _decoded_nested_url(raw) - if decoded == raw: - return raw - if ( - _is_http_url(decoded) - or decoded.startswith(("/", "//", "./", "../")) - or _is_scheme_less_remote_image_url(decoded) - or _url_looks_like_image(decoded) - ): - return decoded - return raw - - def _submission_payload( record: dict[str, Any], score: int, diff --git a/src/rights_filter/server/store_page_scrape.py b/src/rights_filter/server/store_page_scrape.py new file mode 100644 index 0000000..98f577b --- /dev/null +++ b/src/rights_filter/server/store_page_scrape.py @@ -0,0 +1,976 @@ +"""HTML/CSS/JSON image-URL extraction for external search-result pages. + +Extracted from sqlite_store.py (the ~950-line "URL scraping" responsibility the +architecture review flagged as not belonging in a persistence store). Pure +parsing/normalization of fetched page content; behavior unchanged. Depends only +on stdlib, the URL/text helpers, and domain Evidence — never on the store class. 
+""" + +from __future__ import annotations + +import html +import json +import re +from html.parser import HTMLParser +from typing import Any +from urllib.parse import parse_qsl, urljoin, urlparse + +from rights_filter.analysis.fingerprints import FingerprintService +from rights_filter.domain.records import Evidence +from rights_filter.server.store_text import _unique_texts +from rights_filter.server.store_url_utils import ( + _decoded_nested_url, + _is_http_url, + _url_looks_like_image, +) + + +class _PageImageParser(HTMLParser): + def __init__(self, parse_noscript: bool = True) -> None: + super().__init__() + self.parse_noscript = parse_noscript + self.priority_urls: list[str] = [] + self.image_urls: list[str] = [] + self.stylesheet_urls: list[str] = [] + self._script_chunks: list[str] = [] + self._collecting_script_type = "" + self._style_chunks: list[str] = [] + self._collecting_style = False + self._noscript_chunks: list[str] = [] + self._collecting_noscript = False + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + attr = {name.lower(): (value or "") for name, value in attrs} + tag_name = tag.lower() + if tag_name == "noscript" and self.parse_noscript: + self._collecting_noscript = True + self._noscript_chunks = [] + return + if tag_name == "style": + self._collecting_style = True + self._style_chunks = [] + return + if tag_name == "script": + script_type = attr.get("type", "").lower() + if "ld+json" in script_type: + self._collecting_script_type = "json_ld" + self._script_chunks = [] + elif script_type in {"application/json", "application/problem+json", "application/activity+json"}: + self._collecting_script_type = "json" + self._script_chunks = [] + elif script_type in {"", "text/javascript", "application/javascript", "module"}: + self._collecting_script_type = "javascript" + self._script_chunks = [] + return + if attr.get("style"): + self.image_urls.extend(_css_url_image_urls(attr["style"])) + if attr.get("poster") and 
_looks_like_image_reference(attr["poster"]): + self.priority_urls.append(attr["poster"]) + for name in ( + "data-bg", + "data-bg-url", + "data-background", + "data-background-image", + "data-background-url", + "data-lazy-background", + "data-image", + "data-image-src", + "data-image-url", + "data-img", + "data-img-src", + "data-img-url", + "data-src", + "data-src-large", + "data-fallback-src", + "data-full", + "data-full-src", + "data-full-image", + "data-full-url", + "data-hires", + "data-highres", + "data-large", + "data-large-src", + "data-large-image", + "data-large-url", + "data-original", + "data-original-image", + "data-lazy-src", + "data-original-src", + "data-original-url", + "data-photo", + "data-photo-url", + "data-thumb", + "data-thumb-url", + "data-thumbnail", + "data-thumbnail-url", + "data-zoom-image", + ): + if attr.get(name): + self.image_urls.extend( + _data_attribute_image_urls(name, attr[name], known_image_attr=True) + ) + for name, value in attr.items(): + self.image_urls.extend(_data_attribute_image_urls(name, value)) + if tag_name in {"a", "area"} and attr.get("href") and _looks_like_image_reference(attr["href"]): + self.image_urls.append(attr["href"]) + if tag_name == "meta": + key = (attr.get("property") or attr.get("name") or attr.get("itemprop") or "").lower() + if key in { + "image", + "contenturl", + "thumbnail", + "thumbnailurl", + "og:image", + "og:image:url", + "og:image:secure_url", + "twitter:image", + "twitter:image:src", + "twitter:image:url", + }: + self.priority_urls.append(attr.get("content", "")) + return + if tag_name == "link": + rel = attr.get("rel", "").lower() + rel_key = rel.replace("-", "_") + as_value = attr.get("as", "").lower() + if "stylesheet" in rel and attr.get("href"): + self.stylesheet_urls.append(attr["href"]) + if "image_src" in rel_key or (as_value == "image" and any(token in rel for token in ("preload", "prefetch"))): + if attr.get("href"): + self.priority_urls.append(attr["href"]) + for name in 
("imagesrcset", "image-srcset"): + if attr.get(name): + self.priority_urls.extend(_srcset_image_urls(attr[name])) + return + if tag_name == "source": + for name in ("srcset", "data-original-srcset", "data-lazy-srcset", "data-srcset"): + if attr.get(name): + self.priority_urls.extend(_srcset_image_urls(attr[name])) + return + if tag_name in {"img", "amp-img", "amp-anim"}: + target_urls = self.priority_urls if _is_likely_primary_image_attrs(attr) else self.image_urls + for name in ("data-original-srcset", "data-lazy-srcset", "data-srcset", "srcset"): + if attr.get(name): + target_urls.extend(_srcset_image_urls(attr[name])) + for name in ("data-original", "data-original-src", "data-lazy-src", "data-src", "src"): + if attr.get(name): + target_urls.append(attr[name]) + return + + def handle_data(self, data: str) -> None: + if self._collecting_script_type: + self._script_chunks.append(data) + if self._collecting_style: + self._style_chunks.append(data) + if self._collecting_noscript: + self._noscript_chunks.append(data) + + def handle_endtag(self, tag: str) -> None: + tag_name = tag.lower() + if tag_name == "noscript" and self._collecting_noscript: + self._collecting_noscript = False + parser = _PageImageParser(parse_noscript=False) + parser.feed(html.unescape("".join(self._noscript_chunks))) + self.priority_urls.extend(parser.priority_urls) + self.image_urls.extend(parser.image_urls) + self.stylesheet_urls.extend(parser.stylesheet_urls) + self._noscript_chunks = [] + return + if tag_name == "style" and self._collecting_style: + self._collecting_style = False + self.image_urls.extend(_css_url_image_urls("".join(self._style_chunks))) + self._style_chunks = [] + return + if tag_name != "script" or not self._collecting_script_type: + return + script_type = self._collecting_script_type + self._collecting_script_type = "" + script_content = "".join(self._script_chunks) + self._script_chunks = [] + if script_type == "json_ld": + 
self.priority_urls.extend(_json_ld_image_urls(script_content)) + elif script_type == "json": + self.priority_urls.extend(_json_script_image_urls(script_content)) + elif script_type == "javascript": + self.priority_urls.extend(_javascript_image_urls(script_content)) + + +def _srcset_image_urls(value: str) -> list[str]: + candidates: list[tuple[float, int, str]] = [] + for order, raw_candidate in enumerate(_split_srcset_candidates(str(value))): + candidate = raw_candidate.strip() + if not candidate: + continue + parts = candidate.split() + url = parts[0].strip() + if not url: + continue + score = _srcset_descriptor_score(parts[1] if len(parts) > 1 else "") + candidates.append((score, order, url)) + return [ + url + for _, _, url in sorted( + candidates, + key=lambda item: (-item[0], item[1]), + ) + ] + + +def _split_srcset_candidates(value: str) -> list[str]: + candidates: list[str] = [] + start = 0 + for index, character in enumerate(value): + if character != ",": + continue + remainder = value[index + 1 :].lstrip() + if not _starts_srcset_candidate(remainder): + continue + candidates.append(value[start:index]) + start = index + 1 + candidates.append(value[start:]) + return candidates + + +def _starts_srcset_candidate(value: str) -> bool: + text = str(value).strip() + if not text: + return False + first_token = text.split(None, 1)[0] + return _is_urlish_reference(first_token) or _is_scheme_less_remote_image_url(first_token) + + +def _srcset_descriptor_score(value: str) -> float: + descriptor = value.strip().lower() + if descriptor.endswith("w"): + try: + return float(descriptor[:-1]) + except ValueError: + return 0.0 + if descriptor.endswith("x"): + try: + return float(descriptor[:-1]) * 1000 + except ValueError: + return 0.0 + return 0.0 + + +def _is_generic_data_image_attr(name: str, value: str) -> bool: + return bool(_data_attribute_image_urls(name, value)) + + +def _data_attribute_image_urls( + name: str, + value: str, + *, + known_image_attr: bool = False, +) 
-> list[str]: + attr_name = str(name).lower().replace("-", "_").replace(":", "_") + text = html.unescape(str(value).strip()) + if not attr_name.startswith("data_") or not text or text.lower().startswith("data:"): + return [] + if _is_srcset_attr_name(attr_name): + return _srcset_image_urls(text) + if _looks_like_image_reference(text): + return [text] + if _is_urlish_reference(text): + return [text] + image_named_attr = _is_image_data_attr_name(attr_name) + if not known_image_attr and not image_named_attr and not _looks_like_json_attribute_text(text): + return [] + return _json_attribute_image_urls( + text, + allow_plain_url_keys=known_image_attr or image_named_attr, + ) + + +def _is_image_data_attr_name(attr_name: str) -> bool: + image_tokens = ( + "avatar", + "background", + "bg", + "image", + "img", + "photo", + "picture", + "poster", + "thumb", + "thumbnail", + ) + return any(token in attr_name for token in image_tokens) + + +def _json_attribute_image_urls( + value: str, + *, + allow_plain_url_keys: bool = True, +) -> list[str]: + text = _json_attribute_text(value) + if not text: + return [] + try: + document = json.loads(text) + except Exception: + return [] + + urls: list[str] = [] + + def collect_likely(value: Any) -> None: + if isinstance(value, str): + urls.extend(_json_image_string_candidates(value)) + return + if isinstance(value, list): + for item in value: + collect_likely(item) + return + if isinstance(value, dict): + for key, child in value.items(): + if _is_srcset_key(str(key)): + urls.extend(_srcset_image_urls(str(child))) + continue + if _is_json_url_value_key(str(key)) or _is_likely_json_image_key(str(key)): + collect_likely(child) + elif isinstance(child, (dict, list)): + collect_likely(child) + + def collect_obvious(value: Any) -> None: + if isinstance(value, str): + return + if isinstance(value, list): + for item in value: + collect_obvious(item) + return + if isinstance(value, dict): + for key, child in value.items(): + if 
_is_srcset_key(str(key)): + urls.extend(_srcset_image_urls(str(child))) + continue + if _is_likely_json_image_key(str(key)): + collect_likely(child) + else: + collect_obvious(child) + + if allow_plain_url_keys: + collect_likely(document) + else: + collect_obvious(document) + return _unique_texts(urls) + + +def _looks_like_json_attribute_text(value: str) -> bool: + return bool(_json_attribute_text(value)) + + +def _json_attribute_text(value: str) -> str: + text = html.unescape(str(value).strip()) + if text.startswith(("{", "[")): + return text + decoded = _decoded_nested_url(text) + if decoded and decoded.startswith(("{", "[")): + return decoded + return "" + + +def _is_srcset_attr_name(name: str) -> bool: + normalized = str(name).lower().replace("-", "_").replace(":", "_") + return "srcset" in normalized or "src_set" in normalized + + +def _is_urlish_reference(value: str) -> bool: + text = str(value).strip() + return ( + _is_http_url(text) + or text.startswith(("/", "//", "./", "../")) + or _url_looks_like_image(text) + ) + + +def _json_ld_image_urls(script_content: str) -> list[str]: + try: + document = json.loads(script_content) + except Exception: + return [] + + urls: list[str] = [] + + def collect_image_value(value: Any) -> None: + if isinstance(value, str): + urls.append(value) + return + if isinstance(value, list): + for item in value: + collect_image_value(item) + return + if isinstance(value, dict): + for key in ("contentUrl", "url", "thumbnailUrl"): + if key in value: + collect_image_value(value[key]) + + def visit(value: Any) -> None: + if isinstance(value, list): + for item in value: + visit(item) + return + if not isinstance(value, dict): + return + for key, child in value.items(): + if str(key).lower() in { + "image", + "thumbnail", + "thumbnailurl", + "contenturl", + "primaryimageofpage", + "associatedmedia", + }: + collect_image_value(child) + else: + visit(child) + + visit(document) + return urls + + +def _json_script_image_urls(script_content: 
def _is_likely_json_image_key(key: str) -> bool:
    """Tell whether a JSON object key probably labels an image reference.

    The key is lower-cased and its hyphens are turned into underscores
    before it is checked against substrings, the srcset key list and a set
    of exact names.
    """
    exact_names = {
        "asset_url",
        "content_url",
        "contenturl",
        "media_url",
        "mediaurl",
        "photo",
        "photo_url",
        "poster",
        "poster_url",
        "avatar",
        "avatar_url",
    }
    lowered = key.lower().replace("-", "_")
    if "image" in lowered or "thumbnail" in lowered:
        return True
    if _is_srcset_key(lowered):
        return True
    return lowered in exact_names
list[str]: + raw = str(value).strip() + decoded = _decoded_nested_url(raw) + if decoded != raw and (_looks_like_image_reference(decoded) or _is_urlish_reference(decoded)): + return [decoded] + if _looks_like_image_reference(raw) or _is_urlish_reference(raw): + return [raw] + return [] + + +def _is_srcset_key(key: str) -> bool: + normalized = str(key).lower().replace("-", "_") + return normalized in { + "image_src_set", + "image_srcset", + "imagesrcset", + "photo_src_set", + "photo_srcset", + "picture_src_set", + "picture_srcset", + "src_set", + "srcset", + "thumbnail_src_set", + "thumbnail_srcset", + } + + +def _javascript_image_urls(script_content: str) -> list[str]: + urls: list[str] = [] + image_key = r"[\w$:-]*(?:image|thumbnail|photo|avatar|poster|picture)[\w$:-]*" + srcset_key = r"[\w$:-]*(?:srcset|src-set)[\w$:-]*" + srcset_pattern = re.compile( + rf"""(?is)["']?({srcset_key})["']?\s*[:=]\s*["']([^"']+)["']""" + ) + key_value_pattern = re.compile( + rf"""(?is)["']?({image_key})["']?\s*[:=]\s*["']([^"']+)["']""" + ) + nested_value_pattern = re.compile( + rf"""(?is)["']?({image_key})["']?\s*[:=]\s*\{{[^{{}}]{{0,500}}?["'](?:url|src|contentUrl|thumbnailUrl)["']\s*:\s*["']([^"']+)["']""" + ) + for _key, value in srcset_pattern.findall(script_content): + urls.extend(_srcset_image_urls(_decode_javascript_string(value))) + for pattern in (key_value_pattern, nested_value_pattern): + for _key, value in pattern.findall(script_content): + candidate = _decode_javascript_string(value) + if _looks_like_image_reference(candidate): + urls.append(candidate) + return _unique_texts(urls) + + +def _decode_javascript_string(value: str) -> str: + text = value.replace("\\/", "/") + if "\\u" not in text and "\\x" not in text: + return text + + def _replace_escape(match: re.Match[str]) -> str: + try: + return chr(int(match.group(1) or match.group(2), 16)) + except (TypeError, ValueError): + return match.group(0) + + # Decode only explicit \uXXXX / \xXX escapes. 
The previous + # bytes(text, "utf-8").decode("unicode_escape") reinterpreted real UTF-8 + # bytes as Latin-1, silently corrupting literal non-ASCII (e.g. Korean) URLs. + return re.sub(r"\\u([0-9a-fA-F]{4})|\\x([0-9a-fA-F]{2})", _replace_escape, text) + + +def _looks_like_image_reference(value: str) -> bool: + text = value.strip() + if not text or text.lower().startswith("data:"): + return False + if _unwrapped_image_url(text): + return True + if _relative_wrapped_image_url(text): + return True + return _url_looks_like_image(text) + + +def _relative_wrapped_image_url(value: str) -> str: + parsed = urlparse(value) + if parsed.scheme or parsed.netloc: + return "" + for key, raw_value in parse_qsl(parsed.query, keep_blank_values=False): + key_text = key.lower().replace("-", "_") + if key_text not in { + "imgurl", + "imageurl", + "image_url", + "mediaurl", + "media_url", + "contenturl", + "content_url", + "photo", + "photo_url", + "src", + "source", + "image", + "img", + "url", + "u", + }: + continue + candidate = _decoded_nested_url(raw_value) + if candidate.startswith("/") or _url_looks_like_image(candidate): + return candidate + return "" + + +def _is_likely_primary_image_attrs(attr: dict[str, str]) -> bool: + text = " ".join( + str(attr.get(name, "")) + for name in ( + "alt", + "aria-label", + "class", + "data-image-type", + "data-role", + "id", + "itemprop", + "src", + "data-src", + "data-original", + "data-lazy-src", + "data-original-src", + ) + ).casefold() + negative_tokens = ( + "advert", + "avatar", + "badge", + "banner", + "button", + "emoji", + "favicon", + "icon", + "logo", + "sprite", + "tracking", + ) + if any(token in text for token in negative_tokens): + return False + + positive_tokens = ( + "article", + "cover", + "full", + "hero", + "main", + "official", + "photo", + "picture", + "portrait", + "primary", + "profile", + "representative", + "thumbnail", + ) + if any(token in text for token in positive_tokens): + return True + + width = 
_numeric_attr(attr.get("width", "")) + height = _numeric_attr(attr.get("height", "")) + loading = attr.get("loading", "").casefold() + fetchpriority = attr.get("fetchpriority", "").casefold() + return ( + width >= 300 + and height >= 300 + and (fetchpriority == "high" or loading != "lazy") + ) + + +def _numeric_attr(value: str) -> int: + match = re.search(r"\d+", str(value)) + if not match: + return 0 + try: + return int(match.group(0)) + except ValueError: + return 0 + + +def _css_url_image_urls(style: str) -> list[str]: + direct_urls = [ + match.group(2).strip() + for match in re.finditer(r"url\(\s*(['\"]?)(.*?)\1\s*\)", style, flags=re.IGNORECASE) + if match.group(2).strip() + ] + return _unique_texts([*_css_image_set_urls(style), *direct_urls]) + + +def _css_image_set_urls(style: str) -> list[str]: + candidates: list[tuple[float, int, str]] = [] + order = 0 + for body in _css_image_set_bodies(style): + for raw_candidate in _split_top_level_commas(body): + url, descriptor = _css_image_set_candidate(raw_candidate) + if not url: + continue + candidates.append((_css_image_set_descriptor_score(descriptor), order, url)) + order += 1 + return [ + url + for _, _, url in sorted( + candidates, + key=lambda item: (-item[0], item[1]), + ) + ] + + +def _css_image_set_bodies(style: str) -> list[str]: + bodies: list[str] = [] + for match in re.finditer(r"(?:-webkit-)?image-set\s*\(", style, flags=re.IGNORECASE): + start = match.end() + depth = 1 + quote = "" + escaped = False + for index in range(start, len(style)): + character = style[index] + if quote: + if escaped: + escaped = False + elif character == "\\": + escaped = True + elif character == quote: + quote = "" + continue + if character in {"'", '"'}: + quote = character + continue + if character == "(": + depth += 1 + continue + if character == ")": + depth -= 1 + if depth == 0: + bodies.append(style[start:index]) + break + return bodies + + +def _split_top_level_commas(value: str) -> list[str]: + parts: list[str] = [] 
+ start = 0 + depth = 0 + quote = "" + escaped = False + for index, character in enumerate(value): + if quote: + if escaped: + escaped = False + elif character == "\\": + escaped = True + elif character == quote: + quote = "" + continue + if character in {"'", '"'}: + quote = character + continue + if character == "(": + depth += 1 + continue + if character == ")": + depth = max(0, depth - 1) + continue + if character == "," and depth == 0: + parts.append(value[start:index].strip()) + start = index + 1 + parts.append(value[start:].strip()) + return [part for part in parts if part] + + +def _css_image_set_candidate(value: str) -> tuple[str, str]: + url_match = re.search(r"url\(\s*(['\"]?)(.*?)\1\s*\)", value, flags=re.IGNORECASE) + if url_match: + return url_match.group(2).strip(), value[url_match.end() :] + + quoted_match = re.search(r"""(['"])(.*?)\1""", value) + if quoted_match: + return quoted_match.group(2).strip(), value[quoted_match.end() :] + + parts = value.split(None, 1) + if parts and _looks_like_image_reference(parts[0]): + return parts[0].strip(), parts[1] if len(parts) > 1 else "" + return "", "" + + +def _css_image_set_descriptor_score(value: str) -> float: + descriptor = value.strip().lower() + match = re.search(r"([0-9]*\.?[0-9]+)\s*(dppx|dpi|x|w)\b", descriptor) + if not match: + return 0.0 + number = float(match.group(1)) + unit = match.group(2) + if unit == "w": + return number + if unit == "dpi": + return (number / 96) * 1000 + return number * 1000 + + +def _extract_page_image_urls(content: bytes, base_url: str, limit: int) -> list[str]: + if limit <= 0: + return [] + return [ + url + for url in _page_image_references(content, base_url)[0] + if _is_http_url(url) + ][:limit] + + +def _extract_page_stylesheet_urls(content: bytes, base_url: str, limit: int) -> list[str]: + if limit <= 0: + return [] + return [ + url + for url in _page_image_references(content, base_url)[1] + if _is_http_url(url) + ][:limit] + + +def _extract_css_image_urls(content: 
def _page_image_references(content: bytes, base_url: str) -> tuple[list[str], list[str]]:
    """Parse an HTML page and return ``(image_urls, stylesheet_urls)``.

    The bytes are decoded as UTF-8 with undecodable sequences replaced. Each
    reference collected by ``_PageImageParser`` is passed through
    ``_normalized_image_url`` with *base_url*, de-duplicated by
    ``_unique_texts``, and kept only when ``_is_http_url`` accepts it.
    Priority references are listed ahead of ordinary image references.
    """

    def absolute_http_urls(references: list[str]) -> list[str]:
        resolved = _unique_texts(
            _normalized_image_url(base_url, reference) for reference in references
        )
        return [candidate for candidate in resolved if _is_http_url(candidate)]

    page = _PageImageParser()
    page.feed(content.decode("utf-8", errors="replace"))
    return (
        absolute_http_urls([*page.priority_urls, *page.image_urls]),
        absolute_http_urls(page.stylesheet_urls),
    )
def _unwrapped_image_url(url: str) -> str:
    """Return the image URL carried in a query parameter of *url*, if any.

    Query parameters are scanned in order and the first one that qualifies
    wins. A parameter qualifies when its (normalized) name is a "strong"
    image key, or when it is a "weak" redirect-style key and its value
    passes ``_url_looks_like_image``.

    Args:
        url: The wrapper URL to inspect.

    Returns:
        The embedded URL made absolute, or ``""`` when *url* is rejected by
        ``_is_http_url`` or no parameter qualifies.
    """
    # Nothing to unwrap unless the wrapper itself passes _is_http_url.
    if not _is_http_url(url):
        return ""
    parsed = urlparse(url)
    # Parameter names that are trusted to carry an image: the value is
    # returned without a further image check.
    strong_keys = {
        "imgurl",
        "imageurl",
        "image_url",
        "mediaurl",
        "media_url",
        "contenturl",
        "content_url",
        "photo",
        "photo_url",
        "src",
        "source",
        "image",
        "img",
    }
    # Generic redirect-style names: only accepted when the value itself
    # looks like an image.
    weak_keys = {"url", "u", "target", "redirect", "redirect_url"}
    # keep_blank_values=False makes parse_qsl skip parameters with no value.
    for key, value in parse_qsl(parsed.query, keep_blank_values=False):
        # Normalize the name so "image-url" and "Image_URL" compare equal.
        key_text = key.lower().replace("-", "_")
        # NOTE(review): _decoded_nested_url lives in store_url_utils;
        # presumably it undoes percent-encoding of a nested URL - confirm.
        candidate = _decoded_nested_url(value)
        if not candidate:
            continue
        if not _is_http_url(candidate):
            # Try to turn a non-absolute value into an absolute URL. The
            # order matters: "//host/..." must be tested before the plain
            # "/" prefix check further down.
            if candidate.startswith("//"):
                # Protocol-relative reference: assume https.
                candidate = f"https:{candidate}"
            elif _is_scheme_less_remote_image_url(candidate):
                # "host.tld/path.jpg" style value: assume https.
                candidate = f"https://{candidate.lstrip('/')}"
            elif candidate.startswith("/") or _url_looks_like_image(candidate):
                # Path-like value: resolve it against the wrapper URL.
                candidate = urljoin(url, candidate)
            else:
                continue
        if key_text in strong_keys:
            return candidate
        if key_text in weak_keys and _url_looks_like_image(candidate):
            return candidate
    return ""
in first_segment and " " not in first_segment + + +def _decoded_url_reference(value: str) -> str: + raw = str(value).strip() + decoded = _decoded_nested_url(raw) + if decoded == raw: + return raw + if ( + _is_http_url(decoded) + or decoded.startswith(("/", "//", "./", "../")) + or _is_scheme_less_remote_image_url(decoded) + or _url_looks_like_image(decoded) + ): + return decoded + return raw diff --git a/src/rights_filter/server/store_text.py b/src/rights_filter/server/store_text.py new file mode 100644 index 0000000..e2211db --- /dev/null +++ b/src/rights_filter/server/store_text.py @@ -0,0 +1,27 @@ +"""Pure text-normalization helpers shared by the SQLite store and its +extracted submodules. Extracted from sqlite_store.py; behavior unchanged. +""" + +from __future__ import annotations + +from typing import Any + + +def _text_list(value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(item).strip() for item in value if str(item).strip()] + return [item.strip() for item in str(value).split(",") if item.strip()] + + +def _unique_texts(values: Any) -> list[str]: + seen: set[str] = set() + result: list[str] = [] + for value in values: + text = str(value).strip() + if not text or text in seen: + continue + seen.add(text) + result.append(text) + return result diff --git a/tests/rights_filter/test_review_fixes.py b/tests/rights_filter/test_review_fixes.py index 969cf0e..e30a73d 100644 --- a/tests/rights_filter/test_review_fixes.py +++ b/tests/rights_filter/test_review_fixes.py @@ -16,7 +16,8 @@ from rights_filter.analysis.search_result_promoter import SearchResultPromoter from rights_filter.domain.records import Evidence, EvidenceSource from rights_filter.integrations.naver_search import NaverSearchAdapter from rights_filter.integrations.search_policy import SearchApiPolicy -from rights_filter.server.sqlite_store import CopyrighterStore, _decode_javascript_string +from rights_filter.server.sqlite_store import CopyrighterStore 
+from rights_filter.server.store_page_scrape import _decode_javascript_string # --- #1 CRITICAL: constraint migration must not cascade-delete evidence -------