refactor: extract text helpers and HTML/CSS image-scraping from sqlite_store

Move the pure text helpers (_text_list, _unique_texts) into store_text and the
~950-line page/CSS/JSON/srcset image-URL extraction (the _PageImageParser and
its helpers) into store_page_scrape. Both moves are behavior-preserving; store_page_scrape
depends only on stdlib + url/text helpers + FingerprintService + domain Evidence (no store coupling).
sqlite_store.py 4955 -> 3992 lines.
This commit is contained in:
유창욱 2026-06-20 21:10:22 +09:00
parent bd35cf6f3f
commit e3bc99e6b9
4 changed files with 1015 additions and 974 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,976 @@
"""HTML/CSS/JSON image-URL extraction for external search-result pages.
Extracted from sqlite_store.py (the ~950-line "URL scraping" responsibility the
architecture review flagged as not belonging in a persistence store). Pure
parsing/normalization of fetched page content; behavior unchanged. Depends only
on stdlib, the URL/text helpers, FingerprintService, and domain Evidence — never on the store class.
"""
from __future__ import annotations
import html
import json
import re
from html.parser import HTMLParser
from typing import Any
from urllib.parse import parse_qsl, urljoin, urlparse
from rights_filter.analysis.fingerprints import FingerprintService
from rights_filter.domain.records import Evidence
from rights_filter.server.store_text import _unique_texts
from rights_filter.server.store_url_utils import (
_decoded_nested_url,
_is_http_url,
_url_looks_like_image,
)
class _PageImageParser(HTMLParser):
    """Collect image and stylesheet references while parsing an HTML document.

    After ``feed()`` the parser exposes three lists of raw (un-normalized)
    references, in document order:

    * ``priority_urls`` - meta/link/source declarations, poster attributes,
      script-embedded image URLs and ``<img>`` tags judged likely primary.
    * ``image_urls`` - every other image candidate (inline CSS, data-*
      attributes, image-looking anchors, remaining ``<img>`` tags).
    * ``stylesheet_urls`` - ``href`` values of stylesheet ``<link>`` tags.

    When ``parse_noscript`` is true, text captured inside ``<noscript>`` is
    unescaped and re-parsed by a nested parser that has it disabled.
    """

    # data-* attributes treated as known image carriers; probed in this order.
    _KNOWN_IMAGE_DATA_ATTRS = (
        "data-bg",
        "data-bg-url",
        "data-background",
        "data-background-image",
        "data-background-url",
        "data-lazy-background",
        "data-image",
        "data-image-src",
        "data-image-url",
        "data-img",
        "data-img-src",
        "data-img-url",
        "data-src",
        "data-src-large",
        "data-fallback-src",
        "data-full",
        "data-full-src",
        "data-full-image",
        "data-full-url",
        "data-hires",
        "data-highres",
        "data-large",
        "data-large-src",
        "data-large-image",
        "data-large-url",
        "data-original",
        "data-original-image",
        "data-lazy-src",
        "data-original-src",
        "data-original-url",
        "data-photo",
        "data-photo-url",
        "data-thumb",
        "data-thumb-url",
        "data-thumbnail",
        "data-thumbnail-url",
        "data-zoom-image",
    )
    # <meta> property/name/itemprop values whose content is an image URL.
    _META_IMAGE_KEYS = frozenset(
        {
            "image",
            "contenturl",
            "thumbnail",
            "thumbnailurl",
            "og:image",
            "og:image:url",
            "og:image:secure_url",
            "twitter:image",
            "twitter:image:src",
            "twitter:image:url",
        }
    )
    # <script type=...> values parsed as generic JSON.
    _JSON_SCRIPT_TYPES = frozenset(
        {"application/json", "application/problem+json", "application/activity+json"}
    )
    # <script type=...> values scanned as JavaScript source.
    _JAVASCRIPT_SCRIPT_TYPES = frozenset(
        {"", "text/javascript", "application/javascript", "module"}
    )

    def __init__(self, parse_noscript: bool = True) -> None:
        super().__init__()
        self.parse_noscript = parse_noscript
        self.priority_urls: list[str] = []
        self.image_urls: list[str] = []
        self.stylesheet_urls: list[str] = []
        # Buffers for element bodies that are analysed once the element closes.
        self._script_chunks: list[str] = []
        self._collecting_script_type = ""
        self._style_chunks: list[str] = []
        self._collecting_style = False
        self._noscript_chunks: list[str] = []
        self._collecting_noscript = False

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Record image references carried by the start tag's attributes."""
        attributes = {key.lower(): (raw or "") for key, raw in attrs}
        element = tag.lower()

        # Elements whose *body* is analysed later: start buffering and stop.
        if element == "noscript" and self.parse_noscript:
            self._noscript_chunks = []
            self._collecting_noscript = True
            return
        if element == "style":
            self._style_chunks = []
            self._collecting_style = True
            return
        if element == "script":
            declared_type = attributes.get("type", "").lower()
            if "ld+json" in declared_type:
                mode = "json_ld"
            elif declared_type in self._JSON_SCRIPT_TYPES:
                mode = "json"
            elif declared_type in self._JAVASCRIPT_SCRIPT_TYPES:
                mode = "javascript"
            else:
                mode = None
            if mode is not None:
                self._collecting_script_type = mode
                self._script_chunks = []
            return

        # Attribute checks that apply to every remaining element.
        inline_style = attributes.get("style")
        if inline_style:
            self.image_urls.extend(_css_url_image_urls(inline_style))
        poster = attributes.get("poster")
        if poster and _looks_like_image_reference(poster):
            self.priority_urls.append(poster)
        for known_name in self._KNOWN_IMAGE_DATA_ATTRS:
            known_value = attributes.get(known_name)
            if known_value:
                self.image_urls.extend(
                    _data_attribute_image_urls(known_name, known_value, known_image_attr=True)
                )
        for attr_name, attr_value in attributes.items():
            self.image_urls.extend(_data_attribute_image_urls(attr_name, attr_value))
        href = attributes.get("href")
        if element in {"a", "area"} and href and _looks_like_image_reference(href):
            self.image_urls.append(href)

        # Element-specific handling.
        if element == "meta":
            meta_key = (
                attributes.get("property")
                or attributes.get("name")
                or attributes.get("itemprop")
                or ""
            ).lower()
            if meta_key in self._META_IMAGE_KEYS:
                self.priority_urls.append(attributes.get("content", ""))
            return
        if element == "link":
            rel = attributes.get("rel", "").lower()
            rel_key = rel.replace("-", "_")
            as_value = attributes.get("as", "").lower()
            if "stylesheet" in rel and href:
                self.stylesheet_urls.append(href)
            preloads_image = as_value == "image" and any(
                token in rel for token in ("preload", "prefetch")
            )
            if "image_src" in rel_key or preloads_image:
                if href:
                    self.priority_urls.append(href)
                for srcset_name in ("imagesrcset", "image-srcset"):
                    srcset_value = attributes.get(srcset_name)
                    if srcset_value:
                        self.priority_urls.extend(_srcset_image_urls(srcset_value))
            return
        if element == "source":
            for srcset_name in ("srcset", "data-original-srcset", "data-lazy-srcset", "data-srcset"):
                srcset_value = attributes.get(srcset_name)
                if srcset_value:
                    self.priority_urls.extend(_srcset_image_urls(srcset_value))
            return
        if element in {"img", "amp-img", "amp-anim"}:
            if _is_likely_primary_image_attrs(attributes):
                bucket = self.priority_urls
            else:
                bucket = self.image_urls
            for srcset_name in ("data-original-srcset", "data-lazy-srcset", "data-srcset", "srcset"):
                srcset_value = attributes.get(srcset_name)
                if srcset_value:
                    bucket.extend(_srcset_image_urls(srcset_value))
            for src_name in ("data-original", "data-original-src", "data-lazy-src", "data-src", "src"):
                src_value = attributes.get(src_name)
                if src_value:
                    bucket.append(src_value)

    def handle_data(self, data: str) -> None:
        """Append text to every buffer that is currently collecting."""
        for active, sink in (
            (self._collecting_script_type, self._script_chunks),
            (self._collecting_style, self._style_chunks),
            (self._collecting_noscript, self._noscript_chunks),
        ):
            if active:
                sink.append(data)

    def handle_endtag(self, tag: str) -> None:
        """Analyse the buffered body of a closing noscript/style/script element."""
        element = tag.lower()

        if element == "noscript" and self._collecting_noscript:
            self._collecting_noscript = False
            # Re-parse the captured text as HTML; nested noscript is disabled.
            nested = _PageImageParser(parse_noscript=False)
            nested.feed(html.unescape("".join(self._noscript_chunks)))
            self.priority_urls.extend(nested.priority_urls)
            self.image_urls.extend(nested.image_urls)
            self.stylesheet_urls.extend(nested.stylesheet_urls)
            self._noscript_chunks = []
            return

        if element == "style" and self._collecting_style:
            self._collecting_style = False
            self.image_urls.extend(_css_url_image_urls("".join(self._style_chunks)))
            self._style_chunks = []
            return

        if element == "script" and self._collecting_script_type:
            mode, self._collecting_script_type = self._collecting_script_type, ""
            source = "".join(self._script_chunks)
            self._script_chunks = []
            if mode == "json_ld":
                self.priority_urls.extend(_json_ld_image_urls(source))
            elif mode == "json":
                self.priority_urls.extend(_json_script_image_urls(source))
            elif mode == "javascript":
                self.priority_urls.extend(_javascript_image_urls(source))
def _srcset_image_urls(value: str) -> list[str]:
    """Return the URLs of a ``srcset`` value, best descriptor first.

    Candidates are ranked by descending descriptor score; candidates with an
    equal score keep their original relative order.
    """
    ranked: list[tuple[float, int, str]] = []
    for position, chunk in enumerate(_split_srcset_candidates(str(value))):
        tokens = chunk.split()
        if not tokens:
            # Blank candidate (e.g. a trailing comma).
            continue
        descriptor = tokens[1] if len(tokens) > 1 else ""
        ranked.append((_srcset_descriptor_score(descriptor), position, tokens[0]))
    ranked.sort(key=lambda entry: (-entry[0], entry[1]))
    return [entry[2] for entry in ranked]
def _split_srcset_candidates(value: str) -> list[str]:
    """Split a ``srcset`` string into raw candidate strings.

    A comma only separates candidates when the text after it begins a new
    URL-like candidate, so commas inside a URL do not split it.
    """
    pieces: list[str] = []
    piece_start = 0
    comma_at = value.find(",")
    while comma_at != -1:
        if _starts_srcset_candidate(value[comma_at + 1 :].lstrip()):
            pieces.append(value[piece_start:comma_at])
            piece_start = comma_at + 1
        comma_at = value.find(",", comma_at + 1)
    pieces.append(value[piece_start:])
    return pieces
def _starts_srcset_candidate(value: str) -> bool:
    """Return whether the first whitespace-delimited token of *value* is URL-like."""
    tokens = str(value).split(None, 1)
    if not tokens:
        return False
    head = tokens[0]
    return _is_urlish_reference(head) or _is_scheme_less_remote_image_url(head)
def _srcset_descriptor_score(value: str) -> float:
descriptor = value.strip().lower()
if descriptor.endswith("w"):
try:
return float(descriptor[:-1])
except ValueError:
return 0.0
if descriptor.endswith("x"):
try:
return float(descriptor[:-1]) * 1000
except ValueError:
return 0.0
return 0.0
def _is_generic_data_image_attr(name: str, value: str) -> bool:
    """Return whether a data-* attribute yields at least one image URL."""
    extracted = _data_attribute_image_urls(name, value)
    return len(extracted) > 0
def _data_attribute_image_urls(
    name: str,
    value: str,
    *,
    known_image_attr: bool = False,
) -> list[str]:
    """Extract image URL candidates from a single ``data-*`` attribute.

    Non-``data-`` attributes, empty values and ``data:`` URIs yield nothing.
    Srcset-named attributes are parsed as srcset; URL-like values are returned
    as-is; otherwise the value is treated as JSON when the attribute is a
    known/image-named one or the value looks like JSON.
    """
    normalized_name = str(name).lower().replace("-", "_").replace(":", "_")
    payload = html.unescape(str(value).strip())

    if not normalized_name.startswith("data_"):
        return []
    if not payload or payload.lower().startswith("data:"):
        return []
    if _is_srcset_attr_name(normalized_name):
        return _srcset_image_urls(payload)
    if _looks_like_image_reference(payload) or _is_urlish_reference(payload):
        return [payload]

    # Plain "url"/"src"-style JSON keys are only trusted for image attributes.
    plain_keys_allowed = known_image_attr or _is_image_data_attr_name(normalized_name)
    if not plain_keys_allowed and not _looks_like_json_attribute_text(payload):
        return []
    return _json_attribute_image_urls(payload, allow_plain_url_keys=plain_keys_allowed)
def _is_image_data_attr_name(attr_name: str) -> bool:
image_tokens = (
"avatar",
"background",
"bg",
"image",
"img",
"photo",
"picture",
"poster",
"thumb",
"thumbnail",
)
return any(token in attr_name for token in image_tokens)
def _json_attribute_image_urls(
    value: str,
    *,
    allow_plain_url_keys: bool = True,
) -> list[str]:
    """Extract de-duplicated image URL candidates from a JSON attribute value.

    With ``allow_plain_url_keys`` the whole document is walked in "trusted"
    mode, where strings under URL-value or image-like keys are candidates.
    Without it, only sub-trees under image-like keys are trusted. Srcset keys
    are always parsed as srcset. Invalid JSON yields an empty list.
    """
    text = _json_attribute_text(value)
    if not text:
        return []
    try:
        document = json.loads(text)
    except Exception:
        return []

    found: list[str] = []

    def harvest(node: Any, trusted: bool) -> None:
        if isinstance(node, list):
            for element in node:
                harvest(element, trusted)
        elif isinstance(node, dict):
            for raw_key, child in node.items():
                key = str(raw_key)
                if _is_srcset_key(key):
                    found.extend(_srcset_image_urls(str(child)))
                elif trusted:
                    # Scalars under unrelated keys are skipped; containers are
                    # always descended into.
                    if (
                        _is_json_url_value_key(key)
                        or _is_likely_json_image_key(key)
                        or isinstance(child, (dict, list))
                    ):
                        harvest(child, True)
                elif _is_likely_json_image_key(key):
                    harvest(child, True)
                else:
                    harvest(child, False)
        elif trusted and isinstance(node, str):
            found.extend(_json_image_string_candidates(node))

    harvest(document, bool(allow_plain_url_keys))
    return _unique_texts(found)
def _looks_like_json_attribute_text(value: str) -> bool:
    """Return whether *value* (possibly URL-encoded) starts like a JSON document."""
    return _json_attribute_text(value) != ""
def _json_attribute_text(value: str) -> str:
    """Return the JSON-looking text of an attribute value, or ``""``.

    The HTML-unescaped value is used when it starts with ``{`` or ``[``;
    otherwise its nested-URL-decoded form is tried against the same test.
    """
    openers = ("{", "[")
    candidate = html.unescape(str(value).strip())
    if candidate.startswith(openers):
        return candidate
    nested = _decoded_nested_url(candidate)
    return nested if nested and nested.startswith(openers) else ""
def _is_srcset_attr_name(name: str) -> bool:
normalized = str(name).lower().replace("-", "_").replace(":", "_")
return "srcset" in normalized or "src_set" in normalized
def _is_urlish_reference(value: str) -> bool:
    """Return whether *value* is an http(s) URL, a path-like reference, or image-looking."""
    text = str(value).strip()
    if _is_http_url(text):
        return True
    if text.startswith(("/", "//", "./", "../")):
        return True
    return _url_looks_like_image(text)
def _json_ld_image_urls(script_content: str) -> list[str]:
try:
document = json.loads(script_content)
except Exception:
return []
urls: list[str] = []
def collect_image_value(value: Any) -> None:
if isinstance(value, str):
urls.append(value)
return
if isinstance(value, list):
for item in value:
collect_image_value(item)
return
if isinstance(value, dict):
for key in ("contentUrl", "url", "thumbnailUrl"):
if key in value:
collect_image_value(value[key])
def visit(value: Any) -> None:
if isinstance(value, list):
for item in value:
visit(item)
return
if not isinstance(value, dict):
return
for key, child in value.items():
if str(key).lower() in {
"image",
"thumbnail",
"thumbnailurl",
"contenturl",
"primaryimageofpage",
"associatedmedia",
}:
collect_image_value(child)
else:
visit(child)
visit(document)
return urls
def _json_script_image_urls(script_content: str) -> list[str]:
    """Extract image URL candidates from a generic JSON script body.

    The document is searched for image-like keys; the sub-tree under such a
    key is walked in "trusted" mode, where strings under URL-value or
    image-like keys become candidates. Srcset keys are parsed as srcset
    wherever they occur. Unparsable content yields an empty list; results
    are not de-duplicated.
    """
    try:
        payload = json.loads(script_content)
    except Exception:
        return []

    found: list[str] = []

    def harvest(node: Any, trusted: bool) -> None:
        if isinstance(node, list):
            for element in node:
                harvest(element, trusted)
        elif isinstance(node, dict):
            for raw_key, child in node.items():
                key = str(raw_key)
                if _is_srcset_key(key):
                    found.extend(_srcset_image_urls(str(child)))
                elif trusted:
                    # Scalars under unrelated keys are skipped; containers are
                    # always descended into.
                    if (
                        _is_json_url_value_key(key)
                        or _is_likely_json_image_key(key)
                        or isinstance(child, (dict, list))
                    ):
                        harvest(child, True)
                elif _is_likely_json_image_key(key):
                    harvest(child, True)
                else:
                    harvest(child, False)
        elif trusted and isinstance(node, str):
            found.extend(_json_image_string_candidates(node))

    harvest(payload, False)
    return found
def _is_likely_json_image_key(key: str) -> bool:
normalized = key.lower().replace("-", "_")
return (
"image" in normalized
or "thumbnail" in normalized
or _is_srcset_key(normalized)
or normalized in {
"asset_url",
"content_url",
"contenturl",
"media_url",
"mediaurl",
"photo",
"photo_url",
"poster",
"poster_url",
"avatar",
"avatar_url",
}
)
def _is_json_url_value_key(key: str) -> bool:
normalized = re.sub(r"(?<!^)(?=[A-Z])", "_", str(key)).lower().replace("-", "_")
normalized = normalized.replace("__", "_")
return normalized in {
"content_url",
"contenturl",
"download_url",
"file_url",
"href",
"original_url",
"public_url",
"secure_url",
"src",
"thumbnail_url",
"thumbnailurl",
"url",
}
def _json_image_string_candidates(value: str) -> list[str]:
    """Return a JSON string value as a single-element candidate list, or ``[]``.

    The nested-URL-decoded form is preferred when it differs from the raw
    text and is itself image-looking or URL-like; otherwise the raw text is
    tested.
    """
    raw = str(value).strip()
    decoded = _decoded_nested_url(raw)
    options = [decoded, raw] if decoded != raw else [raw]
    for option in options:
        if _looks_like_image_reference(option) or _is_urlish_reference(option):
            return [option]
    return []
def _is_srcset_key(key: str) -> bool:
normalized = str(key).lower().replace("-", "_")
return normalized in {
"image_src_set",
"image_srcset",
"imagesrcset",
"photo_src_set",
"photo_srcset",
"picture_src_set",
"picture_srcset",
"src_set",
"srcset",
"thumbnail_src_set",
"thumbnail_srcset",
}
def _javascript_image_urls(script_content: str) -> list[str]:
    """Scan JavaScript source for string values assigned to image-like keys.

    Three patterns are applied in order: srcset-named keys (value parsed as
    srcset), image-named keys with a string value, and image-named keys whose
    object literal carries a ``url``/``src``/``contentUrl``/``thumbnailUrl``
    string. Non-srcset values are kept only when they look like an image
    reference. The result is de-duplicated, preserving first-seen order.
    """
    image_key = r"[\w$:-]*(?:image|thumbnail|photo|avatar|poster|picture)[\w$:-]*"
    srcset_key = r"[\w$:-]*(?:srcset|src-set)[\w$:-]*"
    srcset_assignment = re.compile(
        rf"""(?is)["']?({srcset_key})["']?\s*[:=]\s*["']([^"']+)["']"""
    )
    direct_assignment = re.compile(
        rf"""(?is)["']?({image_key})["']?\s*[:=]\s*["']([^"']+)["']"""
    )
    nested_assignment = re.compile(
        rf"""(?is)["']?({image_key})["']?\s*[:=]\s*\{{[^{{}}]{{0,500}}?["'](?:url|src|contentUrl|thumbnailUrl)["']\s*:\s*["']([^"']+)["']"""
    )

    found: list[str] = []
    for hit in srcset_assignment.finditer(script_content):
        found.extend(_srcset_image_urls(_decode_javascript_string(hit.group(2))))
    for assignment in (direct_assignment, nested_assignment):
        for hit in assignment.finditer(script_content):
            decoded = _decode_javascript_string(hit.group(2))
            if _looks_like_image_reference(decoded):
                found.append(decoded)
    return _unique_texts(found)
def _decode_javascript_string(value: str) -> str:
text = value.replace("\\/", "/")
if "\\u" not in text and "\\x" not in text:
return text
def _replace_escape(match: re.Match[str]) -> str:
try:
return chr(int(match.group(1) or match.group(2), 16))
except (TypeError, ValueError):
return match.group(0)
# Decode only explicit \uXXXX / \xXX escapes. The previous
# bytes(text, "utf-8").decode("unicode_escape") reinterpreted real UTF-8
# bytes as Latin-1, silently corrupting literal non-ASCII (e.g. Korean) URLs.
return re.sub(r"\\u([0-9a-fA-F]{4})|\\x([0-9a-fA-F]{2})", _replace_escape, text)
def _looks_like_image_reference(value: str) -> bool:
    """Return whether *value* points at an image, directly or via a wrapper URL.

    Empty strings and ``data:`` URIs are rejected.
    """
    text = value.strip()
    if text == "" or text.lower().startswith("data:"):
        return False
    if _unwrapped_image_url(text) or _relative_wrapped_image_url(text):
        return True
    return _url_looks_like_image(text)
def _relative_wrapped_image_url(value: str) -> str:
parsed = urlparse(value)
if parsed.scheme or parsed.netloc:
return ""
for key, raw_value in parse_qsl(parsed.query, keep_blank_values=False):
key_text = key.lower().replace("-", "_")
if key_text not in {
"imgurl",
"imageurl",
"image_url",
"mediaurl",
"media_url",
"contenturl",
"content_url",
"photo",
"photo_url",
"src",
"source",
"image",
"img",
"url",
"u",
}:
continue
candidate = _decoded_nested_url(raw_value)
if candidate.startswith("/") or _url_looks_like_image(candidate):
return candidate
return ""
def _is_likely_primary_image_attrs(attr: dict[str, str]) -> bool:
text = " ".join(
str(attr.get(name, ""))
for name in (
"alt",
"aria-label",
"class",
"data-image-type",
"data-role",
"id",
"itemprop",
"src",
"data-src",
"data-original",
"data-lazy-src",
"data-original-src",
)
).casefold()
negative_tokens = (
"advert",
"avatar",
"badge",
"banner",
"button",
"emoji",
"favicon",
"icon",
"logo",
"sprite",
"tracking",
)
if any(token in text for token in negative_tokens):
return False
positive_tokens = (
"article",
"cover",
"full",
"hero",
"main",
"official",
"photo",
"picture",
"portrait",
"primary",
"profile",
"representative",
"thumbnail",
)
if any(token in text for token in positive_tokens):
return True
width = _numeric_attr(attr.get("width", ""))
height = _numeric_attr(attr.get("height", ""))
loading = attr.get("loading", "").casefold()
fetchpriority = attr.get("fetchpriority", "").casefold()
return (
width >= 300
and height >= 300
and (fetchpriority == "high" or loading != "lazy")
)
def _numeric_attr(value: str) -> int:
match = re.search(r"\d+", str(value))
if not match:
return 0
try:
return int(match.group(0))
except ValueError:
return 0
def _css_url_image_urls(style: str) -> list[str]:
    """Return the de-duplicated URLs referenced by CSS text.

    ``image-set()`` candidates come first (best descriptor first), followed
    by every non-empty ``url(...)`` target in source order.
    """
    url_function = re.compile(r"url\(\s*(['\"]?)(.*?)\1\s*\)", flags=re.IGNORECASE)
    plain_targets: list[str] = []
    for hit in url_function.finditer(style):
        target = hit.group(2).strip()
        if target:
            plain_targets.append(target)
    return _unique_texts(_css_image_set_urls(style) + plain_targets)
def _css_image_set_urls(style: str) -> list[str]:
    """Return URLs from every ``image-set()`` in *style*, best descriptor first.

    Candidates with an equal score keep their order of appearance.
    """
    ranked: list[tuple[float, int, str]] = []
    for body in _css_image_set_bodies(style):
        for piece in _split_top_level_commas(body):
            url, descriptor = _css_image_set_candidate(piece)
            if url:
                # len(ranked) is the running appearance index of kept candidates.
                ranked.append((_css_image_set_descriptor_score(descriptor), len(ranked), url))
    ranked.sort(key=lambda entry: (-entry[0], entry[1]))
    return [entry[2] for entry in ranked]
def _css_image_set_bodies(style: str) -> list[str]:
bodies: list[str] = []
for match in re.finditer(r"(?:-webkit-)?image-set\s*\(", style, flags=re.IGNORECASE):
start = match.end()
depth = 1
quote = ""
escaped = False
for index in range(start, len(style)):
character = style[index]
if quote:
if escaped:
escaped = False
elif character == "\\":
escaped = True
elif character == quote:
quote = ""
continue
if character in {"'", '"'}:
quote = character
continue
if character == "(":
depth += 1
continue
if character == ")":
depth -= 1
if depth == 0:
bodies.append(style[start:index])
break
return bodies
def _split_top_level_commas(value: str) -> list[str]:
parts: list[str] = []
start = 0
depth = 0
quote = ""
escaped = False
for index, character in enumerate(value):
if quote:
if escaped:
escaped = False
elif character == "\\":
escaped = True
elif character == quote:
quote = ""
continue
if character in {"'", '"'}:
quote = character
continue
if character == "(":
depth += 1
continue
if character == ")":
depth = max(0, depth - 1)
continue
if character == "," and depth == 0:
parts.append(value[start:index].strip())
start = index + 1
parts.append(value[start:].strip())
return [part for part in parts if part]
def _css_image_set_candidate(value: str) -> tuple[str, str]:
url_match = re.search(r"url\(\s*(['\"]?)(.*?)\1\s*\)", value, flags=re.IGNORECASE)
if url_match:
return url_match.group(2).strip(), value[url_match.end() :]
quoted_match = re.search(r"""(['"])(.*?)\1""", value)
if quoted_match:
return quoted_match.group(2).strip(), value[quoted_match.end() :]
parts = value.split(None, 1)
if parts and _looks_like_image_reference(parts[0]):
return parts[0].strip(), parts[1] if len(parts) > 1 else ""
return "", ""
def _css_image_set_descriptor_score(value: str) -> float:
descriptor = value.strip().lower()
match = re.search(r"([0-9]*\.?[0-9]+)\s*(dppx|dpi|x|w)\b", descriptor)
if not match:
return 0.0
number = float(match.group(1))
unit = match.group(2)
if unit == "w":
return number
if unit == "dpi":
return (number / 96) * 1000
return number * 1000
def _extract_page_image_urls(content: bytes, base_url: str, limit: int) -> list[str]:
if limit <= 0:
return []
return [
url
for url in _page_image_references(content, base_url)[0]
if _is_http_url(url)
][:limit]
def _extract_page_stylesheet_urls(content: bytes, base_url: str, limit: int) -> list[str]:
if limit <= 0:
return []
return [
url
for url in _page_image_references(content, base_url)[1]
if _is_http_url(url)
][:limit]
def _extract_css_image_urls(content: bytes, base_url: str, limit: int) -> list[str]:
if limit <= 0:
return []
return [
url
for url in _unique_texts(
_normalized_image_url(base_url, url)
for url in _css_url_image_urls(content.decode("utf-8", errors="replace"))
)
if _is_http_url(url)
][:limit]
def _page_image_references(content: bytes, base_url: str) -> tuple[list[str], list[str]]:
    """Parse an HTML page and return ``(image_urls, stylesheet_urls)``.

    Both lists hold de-duplicated http(s) URLs resolved against *base_url*;
    image URLs list the parser's priority references before the others.
    """
    parser = _PageImageParser()
    parser.feed(content.decode("utf-8", errors="replace"))

    def resolve(references: list[str]) -> list[str]:
        # Normalize, de-duplicate, then keep only http(s) results.
        absolute = _unique_texts(
            _normalized_image_url(base_url, reference) for reference in references
        )
        return [candidate for candidate in absolute if _is_http_url(candidate)]

    image_urls = resolve(parser.priority_urls + parser.image_urls)
    stylesheet_urls = resolve(parser.stylesheet_urls)
    return image_urls, stylesheet_urls
def _content_has_comparable_image_fingerprint(content: bytes) -> bool:
    """Return whether *content* yields a usable perceptual fingerprint.

    Any failure while fingerprinting counts as "not comparable", as does a
    fingerprint starting with the ``phash:unavailable:`` marker.
    """
    try:
        fingerprints = FingerprintService().fingerprints_for(content)
        perceptual = fingerprints.perceptual
    except Exception:
        return False
    else:
        return not perceptual.startswith("phash:unavailable:")
def _search_result_direct_image_urls(source_evidence: Evidence) -> list[str]:
    """Return the image URL a search result points at directly, if any.

    Reads ``result_url`` (falling back to ``url``) from the evidence data. A
    wrapped image URL inside it wins; otherwise the URL itself is returned
    when it is an http(s) URL that looks like an image.
    """
    data = source_evidence.data
    result_url = str(data.get("result_url", data.get("url", "")))

    wrapped = _unwrapped_image_url(result_url)
    if wrapped:
        return [wrapped]
    is_direct_image = _is_http_url(result_url) and _url_looks_like_image(result_url)
    return [result_url] if is_direct_image else []
def _normalized_image_url(base_url: str, url: str) -> str:
    """Resolve a raw page reference into an absolute image URL.

    The reference is URL-decoded where that yields something usable, and
    scheme-less remote references (``cdn.example.com/a.jpg``) get an
    ``https://`` prefix. The result is joined against *base_url*, and a
    wrapper URL is replaced by the image URL it carries.

    Returns ``""`` for empty input, ``data:`` URIs, and references that
    ``urljoin`` rejects. References come from untrusted markup, and
    ``urljoin`` raises ``ValueError`` on malformed hosts such as ``//[x``.
    Returning ``""`` lets one bad reference be dropped without aborting
    extraction for the whole page.
    """
    text = _decoded_url_reference(str(url).strip())
    if not text or text.lower().startswith("data:"):
        return ""
    if _is_scheme_less_remote_image_url(text):
        text = f"https://{text.lstrip('/')}"
    try:
        normalized = urljoin(base_url, text)
    except ValueError:
        return ""
    return _unwrapped_image_url(normalized) or normalized
def _normalized_remote_image_url(url: str) -> str:
    """Normalize a remote image reference without resolving it against a base URL.

    Returns ``""`` for empty input and ``data:`` URIs. Scheme-less remote
    references get an ``https://`` prefix, and a wrapper URL is replaced by
    the image URL it carries.
    """
    reference = _decoded_url_reference(str(url).strip())
    if reference == "" or reference.lower().startswith("data:"):
        return ""
    if _is_scheme_less_remote_image_url(reference):
        reference = "https://" + reference.lstrip("/")
    unwrapped = _unwrapped_image_url(reference)
    return unwrapped if unwrapped else reference
def _unwrapped_image_url(url: str) -> str:
    """Return the image URL carried in an http(s) wrapper URL's query, or ``""``.

    Query parameters are scanned in order. A value under a "strong" key
    (``imgurl``, ``src``, ...) is returned as soon as it can be made
    absolute. A value under a "weak" key (``url``, ``redirect``, ...) is
    returned only when it also looks like an image.

    URLs here originate from untrusted pages. ``urlparse`` and ``urljoin``
    raise ``ValueError`` on malformed hosts (e.g. an unmatched ``[``). Such
    input is treated as "nothing to unwrap" instead of propagating.
    """
    if not _is_http_url(url):
        return ""
    try:
        parsed = urlparse(url)
    except ValueError:
        return ""
    strong_keys = {
        "imgurl",
        "imageurl",
        "image_url",
        "mediaurl",
        "media_url",
        "contenturl",
        "content_url",
        "photo",
        "photo_url",
        "src",
        "source",
        "image",
        "img",
    }
    weak_keys = {"url", "u", "target", "redirect", "redirect_url"}
    for key, value in parse_qsl(parsed.query, keep_blank_values=False):
        key_text = key.lower().replace("-", "_")
        candidate = _decoded_nested_url(value)
        if not candidate:
            continue
        if not _is_http_url(candidate):
            # Try to turn a non-absolute value into an absolute URL.
            if candidate.startswith("//"):
                candidate = f"https:{candidate}"
            elif _is_scheme_less_remote_image_url(candidate):
                candidate = f"https://{candidate.lstrip('/')}"
            elif candidate.startswith("/") or _url_looks_like_image(candidate):
                try:
                    candidate = urljoin(url, candidate)
                except ValueError:
                    # Malformed nested reference: skip this parameter.
                    continue
            else:
                continue
        if key_text in strong_keys:
            return candidate
        if key_text in weak_keys and _url_looks_like_image(candidate):
            return candidate
    return ""
def _is_scheme_less_remote_image_url(value: str) -> bool:
    """Return whether *value* looks like ``host.tld/path/image.ext`` without a scheme.

    Leading slashes are ignored. The first path segment must contain a dot,
    contain no space, and not start with a dot (which would make it a
    relative path such as ``./a.jpg`` or ``../a.jpg``).
    """
    text = str(value).strip().lstrip("/")
    if not _url_looks_like_image(text):
        return False
    host = text.partition("/")[0]
    # "." and ".." are covered by the leading-dot test.
    if host.startswith("."):
        return False
    return "." in host and " " not in host
def _decoded_url_reference(value: str) -> str:
    """Return the decoded form of a reference when decoding yields a usable URL.

    The stripped raw text is returned when decoding changes nothing, or when
    the decoded text is neither an http(s) URL, a path-like reference, a
    scheme-less remote image URL, nor image-looking.
    """
    raw = str(value).strip()
    decoded = _decoded_nested_url(raw)
    if decoded == raw:
        return raw
    usable = (
        _is_http_url(decoded)
        or decoded.startswith(("/", "//", "./", "../"))
        or _is_scheme_less_remote_image_url(decoded)
        or _url_looks_like_image(decoded)
    )
    return decoded if usable else raw

View file

@ -0,0 +1,27 @@
"""Pure text-normalization helpers shared by the SQLite store and its
extracted submodules. Extracted from sqlite_store.py; behavior unchanged.
"""
from __future__ import annotations
from typing import Any
def _text_list(value: Any) -> list[str]:
if value is None:
return []
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
return [item.strip() for item in str(value).split(",") if item.strip()]
def _unique_texts(values: Any) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
text = str(value).strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result

View file

@ -16,7 +16,8 @@ from rights_filter.analysis.search_result_promoter import SearchResultPromoter
from rights_filter.domain.records import Evidence, EvidenceSource from rights_filter.domain.records import Evidence, EvidenceSource
from rights_filter.integrations.naver_search import NaverSearchAdapter from rights_filter.integrations.naver_search import NaverSearchAdapter
from rights_filter.integrations.search_policy import SearchApiPolicy from rights_filter.integrations.search_policy import SearchApiPolicy
from rights_filter.server.sqlite_store import CopyrighterStore, _decode_javascript_string from rights_filter.server.sqlite_store import CopyrighterStore
from rights_filter.server.store_page_scrape import _decode_javascript_string
# --- #1 CRITICAL: constraint migration must not cascade-delete evidence ------- # --- #1 CRITICAL: constraint migration must not cascade-delete evidence -------