#!/usr/bin/env python3
"""
One-off: read life-echo pg_dump COPY data, emit one markdown per user
(conversations + segments + stories / story_versions + memoir chapters).

Run from api/ (recommended, matches uv venv):
  uv run python scripts/extract_sql_to_user_md.py
  uv run python scripts/extract_sql_to_user_md.py ./tests/data/dump.sql
  uv run python scripts/extract_sql_to_user_md.py --all ./tests/data/
  uv run python scripts/extract_sql_to_user_md.py --only 15366015180 ./backups/dev.sql

Only users listed in EXPORT_USER_KEYS (below) are exported by default.
`--only` may be repeated; it overrides EXPORT_USER_KEYS for that run.
Use --all to export everyone. Use () for EXPORT_USER_KEYS to mean “all”.

Users are skipped only when they have no conversations, stories, or chapters
in the dump. Sessions that have no segments rows are omitted (no empty
“### 会话” stubs).

If you pass a directory, the newest *.sql inside is used.
Default SQL path: <api root>/backups/life_echo_20260313_182756.sql
Output: <api root>/tests/user_exports/<label>_<user_id>.md
  (label is the nickname, else the phone, else the first 8 chars of the id)
"""

from __future__ import annotations

import re
import sys
from collections import defaultdict
from pathlib import Path

# This file lives at <api root>/scripts/…, so parents[1] is the api/ directory.
API_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SQL = API_ROOT / "backups/life_echo_20260313_182756.sql"
OUT_DIR = API_ROOT / "tests" / "user_exports"

# --- Edit here: only users matching any entry are exported (full user id or
# phone number; a phone number may be written as digits only). ---
# Set to the empty tuple () to disable filtering and export everyone
# (same effect as the --all command-line flag).
EXPORT_USER_KEYS: tuple[str, ...] = (
    "1ade609c-567a-450b-b8fb-776aaba3c2b3",
    "5e51151a-cb46-4c5c-ad5d-dae9c58ca243",
    "e27fcd97-fefa-43b8-a7a3-3ecd49ebf5f0",
    "15366015180",
)

# How a SQL NULL is spelled in COPY text format.
_PG_NULL = r"\N"

# Backslash escapes decoded by unescape_pg_text, keyed by the escaped character.
_PG_ESCAPES = {
    "\\": "\\",
    "b": "\b",
    "f": "\f",
    "n": "\n",
    "r": "\r",
    "t": "\t",
    "v": "\v",
}
_PG_ESCAPE_RE = re.compile(r"\\([\\bfnrtv])")


def _digits_only(s: str) -> str:
    """Return only the decimal-digit characters of *s*, in order."""
    return "".join(c for c in s if c.isdigit())


def _null_to_empty(field: str) -> str:
    """Map the COPY NULL marker to "", leaving any other raw field untouched."""
    return "" if field == _PG_NULL else field


def user_matches_export_keys(u: dict, keys: tuple[str, ...]) -> bool:
    """Match full UUID (case-insensitive) or phone (exact, after stripping non-digits).

    An empty *keys* tuple means "no filter" and matches every user.
    Blank keys are ignored.
    """
    if not keys:
        return True
    uid = u["id"]
    phone = u.get("phone") or ""
    phone_d = _digits_only(phone)
    for raw in keys:
        key = raw.strip()
        if not key:
            continue
        if key.lower() == uid.lower():
            return True
        key_d = _digits_only(key)
        # key_d being non-empty guarantees phone_d is non-empty when they match.
        if key_d and key_d == phone_d:
            return True
        if key == phone:
            return True
    return False


def unescape_pg_text(s: str) -> str:
    """Decode one COPY text-format field; the NULL marker becomes "".

    Escapes are decoded in a single left-to-right pass so that an escaped
    backslash followed by a letter (``\\\\n`` in the dump, meaning a literal
    backslash and ``n``) is not mistaken for a newline escape. Sequences
    other than ``\\\\ \\b \\f \\n \\r \\t \\v`` are left as they are.
    """
    if s == _PG_NULL:
        return ""
    if "\\" not in s:
        return s
    return _PG_ESCAPE_RE.sub(lambda m: _PG_ESCAPES[m.group(1)], s)


def extract_copy_block(text: str, table: str) -> list[str]:
    """Return the data lines between ``COPY public.<table> ... FROM stdin;`` and ``\\.``.

    Returns [] when the table has no COPY block, the block is unterminated,
    or the table is empty. Whitespace-only lines are dropped.
    """
    marker = f"COPY public.{table} "
    start = text.find(marker)
    if start == -1:
        return []
    header_end = text.find("\n", start)
    if header_end == -1:
        return []
    # Search from the header's own newline: for an empty table the terminator
    # line follows the header immediately, and searching from the first data
    # character would skip it and run on into the next table's block.
    end = text.find("\n\\.\n", header_end)
    if end == -1:
        end = text.find("\n\\.", header_end)
    if end == -1:
        return []
    chunk = text[header_end + 1 : end]
    # rstrip("\r") tolerates dumps that were saved with CRLF line endings.
    return [ln.rstrip("\r") for ln in chunk.split("\n") if ln.strip()]


def parse_users(lines: list[str]) -> dict[str, dict]:
    """Parse users rows into ``{user_id: {id, phone, nickname, email}}``.

    Rows with fewer than 14 tab-separated fields are skipped. NULL phone,
    nickname and email become "".
    """
    users: dict[str, dict] = {}
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 14:
            continue
        uid = parts[0]
        users[uid] = {
            "id": uid,
            "phone": _null_to_empty(parts[1]),
            "nickname": unescape_pg_text(parts[5]),
            "email": _null_to_empty(parts[3]),
        }
    return users


def parse_conversations(lines: list[str]) -> list[dict]:
    """Parse conversations rows; rows with fewer than 9 fields are skipped."""
    out: list[dict] = []
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 9:
            continue
        out.append(
            {
                "id": parts[0],
                "user_id": parts[1],
                "started_at": parts[2],
                "ended_at": _null_to_empty(parts[3]),
                "duration_seconds": parts[4],
                "summary": unescape_pg_text(parts[5]),
                "status": parts[6],
                "current_topic": unescape_pg_text(parts[7]),
                "conversation_stage": unescape_pg_text(parts[8]),
            }
        )
    return out


def parse_segments(lines: list[str]) -> list[dict]:
    """Legacy dumps: 8 cols ending in agent_response. Current schema: 12 cols
    (see Segment model).

    Fields after the transcript are addressed from the right so that extra
    tab-separated pieces end up in the transcript.
    """
    out: list[dict] = []
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 8:
            continue
        sid, cid, audio = parts[0], parts[1], parts[2]
        if len(parts) >= 12:
            transcript = unescape_pg_text("\t".join(parts[3:-8]))
            created_at = parts[-7]
            processed = parts[-6]
            topic_category = _null_to_empty(parts[-5])
            agent_response = unescape_pg_text(parts[-4])
        else:
            agent_response = unescape_pg_text(parts[-1])
            topic_category = _null_to_empty(parts[-2])
            processed = parts[-3]
            created_at = parts[-4]
            transcript = unescape_pg_text("\t".join(parts[3:-4]))
        out.append(
            {
                "id": sid,
                "conversation_id": cid,
                "audio_url": _null_to_empty(audio),
                "transcript_text": transcript,
                "created_at": created_at,
                "processed": processed,
                "topic_category": topic_category,
                "agent_response": agent_response,
            }
        )
    return out


def parse_stories(lines: list[str]) -> list[dict]:
    """COPY columns: id, user_id, title, stage, story_type, summary,
    canonical_markdown, time_start..updated_at — 17 fields;
    canonical_markdown may contain tabs.
    """
    out: list[dict] = []
    tail = 10  # time_start through updated_at
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 17:
            continue
        out.append(
            {
                "id": parts[0],
                "user_id": parts[1],
                "title": unescape_pg_text(parts[2]),
                "stage": _null_to_empty(parts[3]),
                "story_type": _null_to_empty(parts[4]),
                "summary": unescape_pg_text(parts[5]),
                "canonical_markdown": unescape_pg_text("\t".join(parts[6:-tail])),
                "time_start": _null_to_empty(parts[-10]),
                "time_end": _null_to_empty(parts[-9]),
                "people_refs": _null_to_empty(parts[-8]),
                "place_refs": _null_to_empty(parts[-7]),
                "tag_refs": _null_to_empty(parts[-6]),
                "status": _null_to_empty(parts[-5]),
                "confidence": _null_to_empty(parts[-4]),
                "current_version_id": _null_to_empty(parts[-3]),
                "created_at": parts[-2],
                "updated_at": parts[-1],
            }
        )
    return out


def parse_story_versions(lines: list[str]) -> list[dict]:
    """COPY: id, story_id, version_no, markdown_snapshot, change_summary,
    actor_type, source_type, parent_version_id, prompt_meta, created_at —
    markdown_snapshot may contain tabs.

    A version_no that is not an integer is recorded as 0.
    """
    out: list[dict] = []
    tail = 6  # change_summary .. created_at
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 10:
            continue
        try:
            version_no = int(parts[2])
        except ValueError:
            version_no = 0
        out.append(
            {
                "id": parts[0],
                "story_id": parts[1],
                "version_no": version_no,
                "markdown_snapshot": unescape_pg_text("\t".join(parts[3:-tail])),
                "change_summary": unescape_pg_text(parts[-6]),
                "actor_type": _null_to_empty(parts[-5]),
                "source_type": _null_to_empty(parts[-4]),
                "parent_version_id": _null_to_empty(parts[-3]),
                "prompt_meta": _null_to_empty(parts[-2]),
                "created_at": parts[-1],
            }
        )
    return out


def parse_chapters(lines: list[str]) -> list[dict]:
    """12 columns; content may contain tabs — unpack from the right.

    A non-numeric order_index is recorded as 0.
    """
    out: list[dict] = []
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 12:
            continue
        order_index = parts[-8]
        out.append(
            {
                "id": parts[0],
                "user_id": parts[1],
                "title": unescape_pg_text(parts[2]),
                "content": unescape_pg_text("\t".join(parts[3:-8])),
                "order_index": int(order_index) if order_index.isdigit() else 0,
                "status": parts[-7],
                "images": parts[-6],
                "updated_at": parts[-5],
                "category": _null_to_empty(parts[-4]),
                "is_new": parts[-3],
                "source_segments": parts[-2],
                "is_active": parts[-1],
            }
        )
    return out


def safe_filename(s: str) -> str:
    """Turn *s* into a filename fragment of at most 80 characters.

    Runs of characters other than word characters, CJK ideographs, "." and
    "-" collapse to "_"; an empty result falls back to "user".
    """
    s = re.sub(r"[^\w\u4e00-\u9fff.-]+", "_", s.strip())
    s = s.strip("_") or "user"
    return s[:80]


def resolve_sql_arg(raw: str | None) -> Path:
    """Resolve the CLI path argument to the SQL file to read.

    None selects DEFAULT_SQL. A relative path is taken from the current
    working directory. A directory selects its most recently modified
    ``*.sql`` file; the process exits if it contains none.
    """
    if raw is None:
        return DEFAULT_SQL
    p = Path(raw).expanduser()
    if not p.is_absolute():
        p = Path.cwd() / p
    p = p.resolve()
    if p.is_dir():
        candidates = sorted(
            p.glob("*.sql"), key=lambda x: x.stat().st_mtime, reverse=True
        )
        if not candidates:
            sys.exit(f"No *.sql files in directory: {p}")
        return candidates[0]
    return p


def _parse_cli_args(argv: list[str]) -> tuple[str | None, bool, tuple[str, ...]]:
    """Returns (sql_path_arg or None, export_all, only_keys or ()).

    Exits the process on ``-h``/``--help`` (after printing the module
    docstring), on an unknown ``--option``, on ``--only`` without a value,
    and on more than one positional argument.
    """
    export_all = False
    only_keys: list[str] = []
    positional: list[str] = []
    i = 0
    while i < len(argv):
        a = argv[i]
        if a in ("-h", "--help"):
            print(__doc__)
            sys.exit(0)
        if a == "--all":
            export_all = True
            i += 1
        elif a == "--only":
            if i + 1 >= len(argv):
                sys.exit("--only requires a value (user id or phone)")
            only_keys.append(argv[i + 1])
            i += 2
        elif a.startswith("--"):
            # A mistyped flag must not be silently used as the SQL path.
            sys.exit(f"Unknown option: {a}")
        else:
            positional.append(a)
            i += 1
    if len(positional) > 1:
        sys.exit("Extra arguments: pass at most one SQL path or directory.")
    sql_arg = positional[0] if positional else None
    return sql_arg, export_all, tuple(only_keys)


def _group_by(rows: list[dict], key: str) -> dict[str, list[dict]]:
    """Group *rows* into lists keyed by ``row[key]``, preserving row order."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        grouped[row[key]].append(row)
    return grouped


def _conversation_lines(
    convs: list[dict], seg_by_conv: dict[str, list[dict]]
) -> list[str]:
    """Markdown lines for one user's conversations, oldest first.

    Conversations without any segment rows are omitted.
    """
    user_convs = sorted(convs, key=lambda c: c["started_at"])
    if not user_convs:
        return ["(dump 中该用户无 conversations)", ""]
    convs_with_segs = [cv for cv in user_convs if seg_by_conv.get(cv["id"])]
    if not convs_with_segs:
        return ["(有会话但无 segments 轮次,或尚未落库)", ""]

    out: list[str] = []
    for cv in convs_with_segs:
        segs = sorted(seg_by_conv[cv["id"]], key=lambda s: s["created_at"])
        out.append(f"### 会话 `{cv['id']}`")
        out.append("")
        out.append(f"- 开始: {cv['started_at']}")
        if cv.get("conversation_stage"):
            out.append(f"- 阶段: {cv['conversation_stage']}")
        out.append("")
        for i, seg in enumerate(segs, 1):
            out.append(f"#### 轮次 {i} — {seg['created_at']}")
            out.append("")
            if seg.get("audio_url"):
                out.append(f"- **音频:** `{seg['audio_url']}`")
                out.append("")
            out.append("**用户:**")
            out.append("")
            out.append(seg["transcript_text"] or "(空)")
            out.append("")
            out.append("**AI:**")
            out.append("")
            out.append(seg["agent_response"] or "(无回复)")
            out.append("")
    return out


def _story_lines(
    stories: list[dict], versions_by_story: dict[str, list[dict]]
) -> list[str]:
    """Markdown lines for one user's stories, each followed by its version history."""
    user_stories = sorted(
        stories,
        key=lambda s: (s["updated_at"], s["created_at"], s["title"]),
    )
    if not user_stories:
        return ["(无 story 行;表未进 dump 或用户暂无数据)", ""]

    out: list[str] = []
    for st in user_stories:
        out.append(f"### {st['title']}")
        out.append("")
        meta_bits = [
            f"`{st['id']}`",
            f"status={st['status'] or '—'}",
        ]
        if st.get("stage"):
            meta_bits.append(f"stage={st['stage']}")
        if st.get("story_type"):
            meta_bits.append(f"type={st['story_type']}")
        out.append("- " + " | ".join(meta_bits))
        if st.get("time_start") or st.get("time_end"):
            out.append(
                f"- **时间:** {st['time_start'] or '—'} → {st['time_end'] or '—'}"
            )
        if st.get("summary"):
            out.append("")
            out.append("**摘要:**")
            out.append("")
            out.append(st["summary"])
        if st.get("people_refs") or st.get("place_refs") or st.get("tag_refs"):
            out.append("")
            # Reference columns are truncated to keep the export readable.
            if st.get("people_refs"):
                out.append(f"- people_refs: `{st['people_refs'][:200]}`")
            if st.get("place_refs"):
                out.append(f"- place_refs: `{st['place_refs'][:200]}`")
            if st.get("tag_refs"):
                out.append(f"- tag_refs: `{st['tag_refs'][:200]}`")
        out.append("")
        out.append("**当前正文(canonical_markdown):**")
        out.append("")
        out.append(st["canonical_markdown"] or "(空)")
        out.append("")

        vers = sorted(
            versions_by_story.get(st["id"], []), key=lambda v: v["version_no"]
        )
        if vers:
            out.append("#### 版本历史(story_versions)")
            out.append("")
            for v in vers:
                origin = (
                    f" — {v['actor_type']}/{v['source_type']}"
                    if v["actor_type"] or v["source_type"]
                    else ""
                )
                out.append(
                    f"- **v{v['version_no']}** `{v['id']}` @ {v['created_at']}"
                    + origin
                )
                if v.get("change_summary"):
                    out.append(f" - 变更说明: {v['change_summary']}")
                out.append("")
                out.append(v["markdown_snapshot"] or "(空快照)")
                out.append("")
    return out


def _chapter_lines(chapters: list[dict]) -> list[str]:
    """Markdown lines for one user's memoir chapters, ordered by order_index."""
    user_chapters = sorted(
        chapters, key=lambda c: (c["order_index"], c["updated_at"])
    )
    if not user_chapters:
        return ["(无章节)", ""]

    out: list[str] = []
    for ch in user_chapters:
        out.append(f"### [{ch['order_index']}] {ch['title']}")
        out.append("")
        out.append(
            f"- **ID:** `{ch['id']}` | **状态:** {ch['status']} | **分类:** {ch['category'] or '—'} | **updated:** {ch['updated_at']}"
        )
        out.append("")
        out.append(ch["content"])
        out.append("")
        out.append("---")
        out.append("")
    return out


def _render_user_markdown(
    u: dict,
    label: str,
    convs: list[dict],
    seg_by_conv: dict[str, list[dict]],
    stories: list[dict],
    versions_by_story: dict[str, list[dict]],
    chapters: list[dict],
) -> str:
    """Build the full markdown document for one user."""
    lines_out: list[str] = [
        f"# 用户导出: {label}",
        "",
        f"- **User ID:** `{u['id']}`",
        f"- **Phone:** {u['phone']}",
    ]
    if u.get("email"):
        lines_out.append(f"- **Email:** {u['email']}")
    lines_out += ["", "---", "", "## 对话记录(用户 + AI)", ""]
    lines_out += _conversation_lines(convs, seg_by_conv)
    lines_out += ["---", "", "## 人生故事(stories)", ""]
    lines_out += _story_lines(stories, versions_by_story)
    lines_out += ["---", "", "## 回忆录章节(生成正文)", ""]
    lines_out += _chapter_lines(chapters)
    return "\n".join(lines_out)


def main() -> None:
    """Parse the dump named on the command line and write one markdown file per user."""
    sql_arg, export_all_flag, only_keys_cli = _parse_cli_args(sys.argv[1:])
    # Reject contradictory flags before doing any file I/O or parsing.
    if export_all_flag and only_keys_cli:
        sys.exit("Cannot combine --all with --only")

    sql_path = resolve_sql_arg(sql_arg)
    if not sql_path.is_file():
        print(f"Missing SQL file: {sql_path}")
        sys.exit(1)

    text = sql_path.read_text(encoding="utf-8", errors="replace")

    users = parse_users(extract_copy_block(text, "users"))
    conversations = parse_conversations(extract_copy_block(text, "conversations"))
    segments = parse_segments(extract_copy_block(text, "segments"))
    stories = parse_stories(extract_copy_block(text, "stories"))
    story_versions = parse_story_versions(extract_copy_block(text, "story_versions"))
    chapters = parse_chapters(extract_copy_block(text, "chapters"))

    conv_by_user = _group_by(conversations, "user_id")
    seg_by_conv = _group_by(segments, "conversation_id")
    stories_by_user = _group_by(stories, "user_id")
    versions_by_story = _group_by(story_versions, "story_id")
    chap_by_user = _group_by(chapters, "user_id")

    if only_keys_cli:
        active_keys = only_keys_cli
    else:
        active_keys = (
            () if (export_all_flag or not EXPORT_USER_KEYS) else EXPORT_USER_KEYS
        )

    users_to_write = {
        uid: u
        for uid, u in users.items()
        if user_matches_export_keys(u, active_keys)
    }

    if active_keys and not users_to_write:
        sample = ", ".join(
            f"{u['phone']}/{u['id'][:8]}…" for u in list(users.values())[:5]
        )
        sys.exit(
            "No users matched EXPORT_USER_KEYS. "
            f"Keys={active_keys!r}. Sample dump users: {sample or '(none)'}"
        )

    users_with_content = {
        uid: u
        for uid, u in users_to_write.items()
        if conv_by_user.get(uid) or stories_by_user.get(uid) or chap_by_user.get(uid)
    }
    skipped_empty = len(users_to_write) - len(users_with_content)
    if skipped_empty:
        print(
            f"Skipped {skipped_empty} user(s) with no conversations, stories, or chapters"
        )

    OUT_DIR.mkdir(parents=True, exist_ok=True)

    if active_keys:
        print(
            f"Filter: {len(active_keys)} key(s) -> {len(users_to_write)} user(s) "
            f"-> {len(users_with_content)} with exportable content"
        )
    else:
        print(f"Export all: {len(users_with_content)} user(s) with exportable content")

    for uid, u in sorted(
        users_with_content.items(), key=lambda x: x[1].get("phone", "")
    ):
        label = u["nickname"] or u["phone"] or uid[:8]
        # The full id in the name keeps files unique even when labels collide.
        path = OUT_DIR / f"{safe_filename(label)}_{uid}.md"
        document = _render_user_markdown(
            u,
            label,
            conv_by_user.get(uid, []),
            seg_by_conv,
            stories_by_user.get(uid, []),
            versions_by_story,
            chap_by_user.get(uid, []),
        )
        path.write_text(document, encoding="utf-8")
        print(f"Wrote {path}")

    print(f"Done. {len(users_with_content)} users -> {OUT_DIR} (source: {sql_path})")


if __name__ == "__main__":
    main()