Files
life-echo/api/app/features/memoir/asset_resolver.py
Kevin 8af37e5e8e 修复:CI 部署环境与 ref 错配、迁移碎片化、图片意图 source_span、章节物化脏版式、会话历史与本地语音不一致
新增:TTS 上传 COS 与分片、章节 reading_segments 物化与快照、markdown 清洗、会话消息 repository、语音 store 重构与相关测试
2026-03-20 16:43:02 +08:00

111 lines
3.3 KiB
Python

"""
asset:// 与旧占位符清理。
迁移与渲染共用:从正文移除 {{IMAGE:...}} / {{{{IMAGE:...}}}}。
"""
import re
from typing import Callable
# Matches legacy image placeholders in either the quadruple-brace form
# ``{{{{IMAGE:...}}}}`` or the double-brace form ``{{IMAGE:...}}``.
# The quadruple-brace alternative is listed first so that form is consumed as
# a whole instead of leaving stray outer braces behind. The payload is matched
# lazily, and DOTALL lets it span line breaks.
_PLACEHOLDER_RE = re.compile(
r"\{\{\{\{IMAGE:(.*?)\}\}\}\}|\{\{IMAGE:(.*?)\}\}",
re.DOTALL,
)
# Matches a markdown image whose target is an asset reference:
# ``![caption](asset://<asset_id>)``.
# Group 1 is the caption (may be empty, cannot contain ``]``); group 2 is the
# asset id, limited to ASCII letters, digits, ``_`` and ``-``.
_ASSET_REF_RE = re.compile(r"!\[([^\]]*)\]\(asset://([a-zA-Z0-9_-]+)\)")
def strip_legacy_image_placeholders(text: str | None) -> str:
"""移除正文中的旧 IMAGE 占位符,保留其余 markdown。"""
if not text:
return ""
return _PLACEHOLDER_RE.sub("", text).strip()
def parse_asset_refs(markdown: str) -> list[tuple[int, int, str, str]]:
    """Locate every ``![caption](asset://id)`` reference in *markdown*.

    Args:
        markdown: Markdown text; a falsy value is treated as an empty string.

    Returns:
        One ``(start, end, caption, asset_id)`` tuple per reference, in order
        of appearance. ``start``/``end`` are the match offsets in the text.
    """
    source = markdown or ""
    return [
        (match.start(), match.end(), match.group(1) or "", match.group(2) or "")
        for match in _ASSET_REF_RE.finditer(source)
    ]
def collect_asset_ids_from_markdown(markdown: str) -> list[str]:
    """Return the asset ids referenced by ``asset://`` images in *markdown*.

    Ids are returned in order of appearance and duplicates are kept. A falsy
    *markdown* is treated as an empty string.
    """
    found: list[str] = []
    for match in _ASSET_REF_RE.finditer(markdown or ""):
        asset_id = match.group(2)
        if asset_id:
            found.append(asset_id)
    return found
def collect_asset_ids_for_chapter(chapter) -> set[str]:
    """Gather every asset id a chapter refers to.

    Sources, all read with ``getattr`` so missing attributes are tolerated:

    * ``asset://`` references in the chapter's ``canonical_markdown``;
    * the chapter's ``cover_asset_id``;
    * for each entry of ``story_links`` that has a ``story``: the
      ``asset://`` references in that story's ``canonical_markdown`` and the
      ``asset_id`` of each of its ``image_intents`` whose ``intent_role`` is
      ``"primary"``.

    Args:
        chapter: Chapter-like object exposing the attributes above.

    Returns:
        The set of asset ids; non-markdown ids are converted with ``str()``.
    """
    chapter_markdown = getattr(chapter, "canonical_markdown", None) or ""
    collected: set[str] = set(collect_asset_ids_from_markdown(chapter_markdown))

    cover_id = getattr(chapter, "cover_asset_id", None)
    if cover_id:
        collected.add(str(cover_id))

    for link in getattr(chapter, "story_links", None) or []:
        story = getattr(link, "story", None)
        if story is None:
            # A link without a story contributes nothing.
            continue

        story_markdown = getattr(story, "canonical_markdown", None) or ""
        collected.update(collect_asset_ids_from_markdown(story_markdown))

        for intent in getattr(story, "image_intents", None) or []:
            # Only intents with the "primary" role are considered.
            if getattr(intent, "intent_role", None) != "primary":
                continue
            intent_asset_id = getattr(intent, "asset_id", None)
            if intent_asset_id:
                collected.add(str(intent_asset_id))

    return collected
def collect_asset_ids_for_chapters(chapters: list) -> set[str]:
    """Return the union of the asset ids of every chapter in *chapters*.

    Args:
        chapters: Chapter-like objects; ``None`` or an empty list gives an
            empty set.

    Returns:
        A new set combining the per-chapter asset ids.
    """
    per_chapter = (collect_asset_ids_for_chapter(chapter) for chapter in chapters or [])
    return set().union(*per_chapter)
def split_markdown_by_asset_refs(
    markdown: str,
    resolve_asset: Callable[[str], str | None],
) -> list[dict]:
    """Split *markdown* into alternating text and image blocks.

    The text is cut at every ``![caption](asset://id)`` reference. Text
    between references is stripped and emitted as
    ``{"type": "text", "value": ...}`` when non-empty. Each reference whose id
    resolves to a truthy URL is emitted as
    ``{"type": "image", "url": ..., "caption": ...}``. A reference that does
    not resolve produces no block at all.

    Args:
        markdown: Markdown text; a falsy value is treated as an empty string.
        resolve_asset: Maps an asset id to a URL, or a falsy value when the
            asset cannot be resolved.

    Returns:
        The blocks in document order; empty when there is nothing to emit.
    """
    source = markdown or ""
    blocks: list[dict] = []

    def push_text(fragment: str) -> None:
        # Whitespace-only fragments are skipped.
        stripped = fragment.strip()
        if stripped:
            blocks.append({"type": "text", "value": stripped})

    cursor = 0
    for start, end, caption, asset_id in parse_asset_refs(source):
        push_text(source[cursor:start])
        url = resolve_asset(asset_id) if asset_id else None
        if url:
            blocks.append({"type": "image", "url": url, "caption": caption})
        # Move past the reference whether or not it resolved.
        cursor = end

    # Remaining text after the last reference, or the whole text if none.
    push_text(source[cursor:])
    return blocks
def resolve_asset_refs_in_markdown(
markdown: str,
resolve_asset: Callable[[str], str | None],
) -> str:
if not markdown or not resolve_asset:
return markdown or ""
def repl(m):
caption, asset_id = m.group(1) or "", m.group(2) or ""
url = resolve_asset(asset_id) if asset_id else None
if url:
return f"![{caption}]({url})"
return m.group(0)
return _ASSET_REF_RE.sub(repl, markdown)