Files
life-echo/api/scripts/extract_sql_to_user_md.py

589 lines
22 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
One-off: read life-echo pg_dump COPY data, emit one markdown per user
(conversations + segments + stories / story_versions + memoir chapters).
Run from api/ (recommended, matches uv venv):
uv run python scripts/extract_sql_to_user_md.py
uv run python scripts/extract_sql_to_user_md.py ./tests/data/dump.sql
uv run python scripts/extract_sql_to_user_md.py --all ./tests/data/
uv run python scripts/extract_sql_to_user_md.py --only 15366015180 ./backups/dev.sql
Only users listed in EXPORT_USER_KEYS (below) are exported by default.
`--only` may be repeated; it overrides EXPORT_USER_KEYS for that run.
Use --all to export everyone. Use () for EXPORT_USER_KEYS to mean all.
Users are skipped only when they have no conversations, stories, or chapters in the dump.
Sessions that have no segment rows are omitted (no empty `### 会话` stubs).
If you pass a directory, the newest *.sql inside is used.
Default SQL path: <repo>/api/backups/life_echo_20260313_182756.sql
Output: api/tests/user_exports/<safe_name>_<uuid>.md
"""
from __future__ import annotations
import re
import sys
from collections import defaultdict
from pathlib import Path
# This file lives at <repo>/api/scripts/…, so parents[1] is the api/ directory.
API_ROOT = Path(__file__).resolve().parents[1]
# Dump that is read when no SQL path is passed on the command line.
DEFAULT_SQL = API_ROOT / "backups/life_echo_20260313_182756.sql"
# Directory that receives one markdown file per exported user.
OUT_DIR = API_ROOT / "tests" / "user_exports"
# --- Edit here: only users matching any entry are exported (full user id or
# phone number; a phone number may be written as digits only) ---
# Set to the empty tuple () to disable filtering and export everyone
# (same effect as the --all command-line flag).
EXPORT_USER_KEYS: tuple[str, ...] = (
    "1ade609c-567a-450b-b8fb-776aaba3c2b3",
    "5e51151a-cb46-4c5c-ad5d-dae9c58ca243",
    "e27fcd97-fefa-43b8-a7a3-3ecd49ebf5f0",
    "15366015180",
)
def _digits_only(s: str) -> str:
return "".join(c for c in s if c.isdigit())
def user_matches_export_keys(u: dict, keys: tuple[str, ...]) -> bool:
    """Tell whether user *u* is selected by any entry of *keys*.

    An empty *keys* selects every user. Otherwise a key selects the user when,
    after stripping surrounding whitespace, it equals the user id
    (case-insensitive), or its digits equal the digits of the user's phone
    (both non-empty), or it equals the stored phone string verbatim.
    Blank keys are ignored.
    """
    if not keys:
        return True

    user_id = u["id"]
    stored_phone = u.get("phone") or ""
    phone_digits = _digits_only(stored_phone)

    for candidate in (entry.strip() for entry in keys):
        if not candidate:
            continue
        id_hit = candidate.lower() == user_id.lower()
        key_digits = _digits_only(candidate)
        digits_hit = bool(key_digits) and bool(phone_digits) and key_digits == phone_digits
        if id_hit or digits_hit or candidate == stored_phone:
            return True
    return False
def unescape_pg_text(s: str) -> str:
    """Decode one field of PostgreSQL COPY text-format data.

    The NULL marker ``\\N`` becomes the empty string. Backslash escapes are
    decoded in a single left-to-right pass, so an escaped backslash followed
    by a letter (``\\\\n`` in the dump, meaning a literal backslash and an
    ``n``) is not mistaken for a newline escape, which is what chained
    ``str.replace`` calls would do.

    Decoded escapes: ``\\n``, ``\\t``, ``\\r``, ``\\b``, ``\\f``, ``\\v`` and
    ``\\\\``. Any other backslash sequence is left untouched.
    """
    if s == r"\N":
        return ""
    if "\\" not in s:
        # Fast path: nothing to decode.
        return s

    escapes = {
        "n": "\n",
        "t": "\t",
        "r": "\r",
        "b": "\b",
        "f": "\f",
        "v": "\v",
        "\\": "\\",
    }

    def _decode(match: re.Match) -> str:
        # Unknown escapes keep their original two characters.
        return escapes.get(match.group(1), match.group(0))

    # Each match consumes the backslash AND the following character, so the
    # second backslash of an escaped pair can never start a new escape.
    return re.sub(r"\\(.)", _decode, s, flags=re.DOTALL)
def extract_copy_block(text: str, table: str) -> list[str]:
    """Return the data lines of the first ``COPY public.<table> ...`` block.

    The block body runs from the line after the COPY header up to, but not
    including, the end-of-data line consisting solely of a backslash and a
    period. Whitespace-only lines are dropped.

    Returns an empty list when the table has no COPY block, when the block has
    no rows, or when no end-of-data line follows the header.
    """
    # Anchor the marker at a line start so that the same words appearing
    # inside a data field cannot be mistaken for the COPY statement.
    header = re.compile(rf"^COPY public\.{re.escape(table)} ", re.MULTILINE).search(text)
    if header is None:
        return []
    header_end = text.find("\n", header.start())
    if header_end == -1:
        return []
    body_start = header_end + 1

    # Search for a whole-line terminator starting AT body_start: for an empty
    # table the terminator is the very first body line, and skipping past it
    # would swallow everything up to the next table's terminator.
    terminator = re.compile(r"^\\\.\r?$", re.MULTILINE).search(text, body_start)
    if terminator is None:
        return []

    body = text[body_start:terminator.start()]
    return [ln for ln in body.split("\n") if ln.strip()]
def parse_users(lines: list[str]) -> dict[str, dict]:
    """Build ``{user_id: user_dict}`` from tab-separated COPY rows.

    Rows with fewer than 14 fields are skipped. Field 0 is taken as the id,
    field 1 as the phone, field 3 as the email and field 5 as the nickname.
    The NULL marker ``\\N`` is stored as an empty string for phone, email and
    nickname alike, so a NULL phone never leaks a literal ``\\N`` into
    labels, filenames or key matching.
    """

    def _blank_if_null(raw: str) -> str:
        return "" if raw == r"\N" else raw

    users: dict[str, dict] = {}
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 14:
            continue
        uid = parts[0]
        users[uid] = {
            "id": uid,
            "phone": _blank_if_null(parts[1]),
            "nickname": _blank_if_null(parts[5]),
            "email": _blank_if_null(parts[3]),
        }
    return users
def parse_conversations(lines: list[str]) -> list[dict]:
    """Turn tab-separated conversation rows into dicts.

    Rows with fewer than 9 fields are ignored. ``ended_at`` maps the NULL
    marker to ``""``; ``summary``, ``current_topic`` and
    ``conversation_stage`` do the same and are otherwise unescaped.
    """

    def decoded(raw: str) -> str:
        # NULL marker -> empty string, anything else goes through the unescaper.
        return "" if raw == r"\N" else unescape_pg_text(raw)

    records: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        if len(cols) < 9:
            continue
        ended = cols[3]
        record = {
            "id": cols[0],
            "user_id": cols[1],
            "started_at": cols[2],
            "ended_at": "" if ended == r"\N" else ended,
            "duration_seconds": cols[4],
            "summary": decoded(cols[5]),
            "status": cols[6],
            "current_topic": decoded(cols[7]),
            "conversation_stage": decoded(cols[8]),
        }
        records.append(record)
    return records
def parse_segments(lines: list[str]) -> list[dict]:
    """Turn tab-separated segment rows into dicts.

    Rows with fewer than 8 fields are ignored. Fields 0-2 are id,
    conversation id and audio URL. The remaining fields are read from the
    right-hand side, with two layouts:

    * 12 or more fields: created_at, processed, topic_category and
      agent_response sit at offsets -7, -6, -5 and -4; the transcript is
      everything from field 3 up to (excluding) offset -8.
    * fewer than 12 fields (legacy): the same four values are the last four
      fields and the transcript is everything between field 3 and them.
    """

    def blank_if_null(raw: str) -> str:
        return "" if raw == r"\N" else raw

    records: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        width = len(cols)
        if width < 8:
            continue

        if width >= 12:
            text_stop, created_i, processed_i, topic_i, reply_i = -8, -7, -6, -5, -4
        else:
            text_stop, created_i, processed_i, topic_i, reply_i = -4, -4, -3, -2, -1

        raw_reply = cols[reply_i]
        records.append(
            {
                "id": cols[0],
                "conversation_id": cols[1],
                "audio_url": blank_if_null(cols[2]),
                "transcript_text": unescape_pg_text("\t".join(cols[3:text_stop])),
                "created_at": cols[created_i],
                "processed": cols[processed_i],
                "topic_category": blank_if_null(cols[topic_i]),
                "agent_response": (
                    "" if raw_reply == r"\N" else unescape_pg_text(raw_reply)
                ),
            }
        )
    return records
def parse_stories(lines: list[str]) -> list[dict]:
    """Turn tab-separated story rows into dicts.

    Expected column order: id, user_id, title, stage, story_type, summary,
    canonical_markdown, then ten trailing columns (time_start, time_end,
    people_refs, place_refs, tag_refs, status, confidence,
    current_version_id, created_at, updated_at). Rows with fewer than 17
    fields are ignored. The trailing ten are read from the right so that
    canonical_markdown may span several tab-separated pieces.
    """
    nullable_tail = (
        "time_start",
        "time_end",
        "people_refs",
        "place_refs",
        "tag_refs",
        "status",
        "confidence",
        "current_version_id",
    )

    def blank_if_null(raw: str) -> str:
        return "" if raw == r"\N" else raw

    records: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        if len(cols) < 17:
            continue

        raw_summary = cols[5]
        record = {
            "id": cols[0],
            "user_id": cols[1],
            "title": cols[2],
            "stage": blank_if_null(cols[3]),
            "story_type": blank_if_null(cols[4]),
            "summary": "" if raw_summary == r"\N" else unescape_pg_text(raw_summary),
            "canonical_markdown": unescape_pg_text("\t".join(cols[6:-10])),
        }
        # Offsets -10 .. -3 are the eight nullable trailing columns, in order.
        for name, raw in zip(nullable_tail, cols[-10:-2]):
            record[name] = blank_if_null(raw)
        record["created_at"] = cols[-2]
        record["updated_at"] = cols[-1]
        records.append(record)
    return records
def parse_story_versions(lines: list[str]) -> list[dict]:
    """Turn tab-separated story_versions rows into dicts.

    Expected column order: id, story_id, version_no, markdown_snapshot,
    change_summary, actor_type, source_type, parent_version_id, prompt_meta,
    created_at. Rows with fewer than 10 fields are ignored. The last six
    columns are read from the right so that markdown_snapshot may span
    several tab-separated pieces. A non-integer version_no is stored as 0.
    """

    def blank_if_null(raw: str) -> str:
        return "" if raw == r"\N" else raw

    records: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        if len(cols) < 10:
            continue

        try:
            version_no = int(cols[2])
        except ValueError:
            version_no = 0

        raw_change, actor, source, parent, prompt_meta, created_at = cols[-6:]
        records.append(
            {
                "id": cols[0],
                "story_id": cols[1],
                "version_no": version_no,
                "markdown_snapshot": unescape_pg_text("\t".join(cols[3:-6])),
                "change_summary": (
                    "" if raw_change == r"\N" else unescape_pg_text(raw_change)
                ),
                "actor_type": blank_if_null(actor),
                "source_type": blank_if_null(source),
                "parent_version_id": blank_if_null(parent),
                "prompt_meta": blank_if_null(prompt_meta),
                "created_at": created_at,
            }
        )
    return records
def parse_chapters(lines: list[str]) -> list[dict]:
    """Turn tab-separated chapter rows into dicts.

    Rows with fewer than 12 fields are ignored. Fields 0-2 are id, user_id
    and title; the last eight fields are order_index, status, images,
    updated_at, category, is_new, source_segments and is_active. Everything
    in between is joined back with tabs and unescaped as the content.
    order_index becomes an int when it is all digits, otherwise 0; a NULL
    category becomes "".
    """
    records: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        if len(cols) < 12:
            continue

        (
            order_index,
            status,
            images,
            updated_at,
            category,
            is_new,
            source_segments,
            is_active,
        ) = cols[-8:]

        records.append(
            {
                "id": cols[0],
                "user_id": cols[1],
                "title": cols[2],
                "content": unescape_pg_text("\t".join(cols[3:-8])),
                "order_index": int(order_index) if order_index.isdigit() else 0,
                "status": status,
                "images": images,
                "updated_at": updated_at,
                "category": "" if category == r"\N" else category,
                "is_new": is_new,
                "source_segments": source_segments,
                "is_active": is_active,
            }
        )
    return records
def safe_filename(s: str) -> str:
    """Reduce *s* to a filename-friendly token of at most 80 characters.

    Every run of characters outside word characters, CJK ideographs, ``.``
    and ``-`` collapses to a single underscore; leading and trailing
    underscores are removed; an empty result falls back to ``"user"``.
    """
    collapsed = re.sub(r"[^\w\u4e00-\u9fff.-]+", "_", s.strip())
    trimmed = collapsed.strip("_")
    if not trimmed:
        trimmed = "user"
    return trimmed[:80]
def resolve_sql_arg(raw: str | None) -> Path:
    """Map the optional command-line path to the SQL dump that will be read.

    ``None`` selects DEFAULT_SQL. Otherwise ``~`` is expanded, a relative
    path is taken relative to the current working directory, and the result
    is resolved. If it is a directory, the ``*.sql`` file in it with the
    latest modification time is returned; the process exits with a message
    when the directory holds no ``*.sql`` file.
    """
    if raw is None:
        return DEFAULT_SQL

    target = Path(raw).expanduser()
    anchored = target if target.is_absolute() else Path.cwd() / target
    resolved = anchored.resolve()
    if not resolved.is_dir():
        return resolved

    newest = max(
        resolved.glob("*.sql"),
        key=lambda candidate: candidate.stat().st_mtime,
        default=None,
    )
    if newest is None:
        sys.exit(f"No *.sql files in directory: {resolved}")
    return newest
def _parse_cli_args(argv: list[str]) -> tuple[Path | None, bool, tuple[str, ...]]:
"""Returns (sql_path_arg or None, export_all, only_keys or ())."""
export_all = False
only_keys: list[str] = []
positional: list[str] = []
i = 0
while i < len(argv):
a = argv[i]
if a == "--all":
export_all = True
i += 1
elif a == "--only":
if i + 1 >= len(argv):
sys.exit("--only requires a value (user id or phone)")
only_keys.append(argv[i + 1])
i += 2
else:
positional.append(a)
i += 1
if len(positional) > 1:
sys.exit("Extra arguments: pass at most one SQL path or directory.")
sql_arg = positional[0] if positional else None
return sql_arg, export_all, tuple(only_keys)
def _group_rows(rows: list[dict], key: str) -> dict[str, list[dict]]:
    """Bucket *rows* by the value stored under *key*, keeping input order."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        grouped[row[key]].append(row)
    return grouped


def _render_conversations(
    user_convs: list[dict], seg_by_conv: dict[str, list[dict]]
) -> list[str]:
    """Render the conversations section: one block per session that has segments."""
    out: list[str] = ["## 对话记录(用户 + AI)", ""]
    if not user_convs:
        out += ["dump 中该用户无 conversations", ""]
        return out

    # Sessions without any segment rows are left out entirely.
    convs_with_segs = [cv for cv in user_convs if seg_by_conv.get(cv["id"])]
    if not convs_with_segs:
        out += ["(有会话但无 segments 轮次,或尚未落库)", ""]
        return out

    for cv in convs_with_segs:
        segs = sorted(seg_by_conv[cv["id"]], key=lambda s: s["created_at"])
        out += [f"### 会话 `{cv['id']}`", "", f"- 开始: {cv['started_at']}"]
        if cv.get("conversation_stage"):
            out.append(f"- 阶段: {cv['conversation_stage']}")
        out.append("")
        for i, seg in enumerate(segs, 1):
            # Keep the turn number and the timestamp visibly apart.
            out += [f"#### 轮次 {i} ({seg['created_at']})", ""]
            if seg.get("audio_url"):
                out += [f"- **音频:** `{seg['audio_url']}`", ""]
            out += [
                "**用户:**",
                "",
                seg["transcript_text"] or "(空)",
                "",
                "**AI:**",
                "",
                seg["agent_response"] or "(无回复)",
                "",
            ]
    return out


def _render_stories(
    user_stories: list[dict], versions_by_story: dict[str, list[dict]]
) -> list[str]:
    """Render the stories section, each story followed by its version history."""
    out: list[str] = ["## 人生故事stories", ""]
    if not user_stories:
        out += ["(无 story 行;表未进 dump 或用户暂无数据)", ""]
        return out

    for st in user_stories:
        out += [f"### {st['title']}", ""]
        meta_bits = [f"`{st['id']}`", f"status={st['status'] or ''}"]
        if st.get("stage"):
            meta_bits.append(f"stage={st['stage']}")
        if st.get("story_type"):
            meta_bits.append(f"type={st['story_type']}")
        out.append("- " + " | ".join(meta_bits))

        if st.get("time_start") or st.get("time_end"):
            # Separate the two bounds and mark a missing one explicitly.
            out.append(
                f"- **时间:** {st['time_start'] or '?'} ~ {st['time_end'] or '?'}"
            )
        if st.get("summary"):
            out += ["", "**摘要:**", "", st["summary"]]

        ref_fields = ("people_refs", "place_refs", "tag_refs")
        if any(st.get(field) for field in ref_fields):
            out.append("")
            for field in ref_fields:
                if st.get(field):
                    # Reference payloads are cut to 200 characters.
                    out.append(f"- {field}: `{st[field][:200]}`")

        out += [
            "",
            "**当前正文canonical_markdown:**",
            "",
            st["canonical_markdown"] or "(空)",
            "",
        ]

        vers = sorted(
            versions_by_story.get(st["id"], []), key=lambda v: v["version_no"]
        )
        if vers:
            out += ["#### 版本历史story_versions", ""]
            for v in vers:
                header = f"- **v{v['version_no']}** `{v['id']}` @ {v['created_at']}"
                if v["actor_type"] or v["source_type"]:
                    # Parenthesised so it does not run into the timestamp.
                    header += f" ({v['actor_type']}/{v['source_type']})"
                out.append(header)
                if v.get("change_summary"):
                    out.append(f" - 变更说明: {v['change_summary']}")
                out += ["", v["markdown_snapshot"] or "(空快照)", ""]
    return out


def _render_chapters(user_chapters: list[dict]) -> list[str]:
    """Render the memoir chapters section."""
    out: list[str] = ["## 回忆录章节(生成正文)", ""]
    if not user_chapters:
        out += ["(无章节)", ""]
        return out

    for ch in user_chapters:
        out += [
            f"### [{ch['order_index']}] {ch['title']}",
            "",
            f"- **ID:** `{ch['id']}` | **状态:** {ch['status']} | **分类:** {ch['category'] or ''} | **updated:** {ch['updated_at']}",
            "",
            ch["content"],
            "",
        ]
    return out


def main() -> None:
    """Read the SQL dump and write one markdown export per selected user.

    Steps: parse the command line, load the dump, parse the six COPY blocks
    (users, conversations, segments, stories, story_versions, chapters),
    select users by ``--only`` keys, else EXPORT_USER_KEYS, else everyone
    (``--all`` or an empty EXPORT_USER_KEYS), skip users that have no
    conversations, stories or chapters, and write
    ``OUT_DIR/<safe label>_<user id>.md`` for each remaining user.

    Exits via ``sys.exit`` when ``--all`` and ``--only`` are combined, when
    the SQL file is missing, or when a non-empty key filter matches no user.
    """
    sql_arg, export_all_flag, only_keys_cli = _parse_cli_args(sys.argv[1:])
    # Reject contradictory flags before doing any file I/O or parsing.
    if export_all_flag and only_keys_cli:
        sys.exit("Cannot combine --all with --only")

    sql_path = resolve_sql_arg(sql_arg)
    if not sql_path.is_file():
        print(f"Missing SQL file: {sql_path}")
        sys.exit(1)
    text = sql_path.read_text(encoding="utf-8", errors="replace")

    users = parse_users(extract_copy_block(text, "users"))
    conversations = parse_conversations(extract_copy_block(text, "conversations"))
    segments = parse_segments(extract_copy_block(text, "segments"))
    stories = parse_stories(extract_copy_block(text, "stories"))
    story_versions = parse_story_versions(extract_copy_block(text, "story_versions"))
    chapters = parse_chapters(extract_copy_block(text, "chapters"))

    conv_by_user = _group_rows(conversations, "user_id")
    seg_by_conv = _group_rows(segments, "conversation_id")
    stories_by_user = _group_rows(stories, "user_id")
    versions_by_story = _group_rows(story_versions, "story_id")
    chap_by_user = _group_rows(chapters, "user_id")

    if only_keys_cli:
        active_keys = only_keys_cli
        key_source = "--only"
    else:
        active_keys = (
            () if (export_all_flag or not EXPORT_USER_KEYS) else EXPORT_USER_KEYS
        )
        key_source = "EXPORT_USER_KEYS"

    users_to_write = {
        uid: u for uid, u in users.items() if user_matches_export_keys(u, active_keys)
    }
    if active_keys and not users_to_write:
        sample = ", ".join(
            f"{u['phone']}/{u['id'][:8]}" for u in list(users.values())[:5]
        )
        # Name the place the keys actually came from.
        sys.exit(
            f"No users matched {key_source}. "
            f"Keys={active_keys!r}. Sample dump users: {sample or '(none)'}"
        )

    users_with_content = {
        uid: u
        for uid, u in users_to_write.items()
        if conv_by_user.get(uid) or stories_by_user.get(uid) or chap_by_user.get(uid)
    }
    skipped_empty = len(users_to_write) - len(users_with_content)
    if skipped_empty:
        print(
            f"Skipped {skipped_empty} user(s) with no conversations, stories, or chapters"
        )

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    if active_keys:
        print(
            f"Filter: {len(active_keys)} key(s) -> {len(users_to_write)} user(s) "
            f"-> {len(users_with_content)} with exportable content"
        )
    else:
        print(f"Export all: {len(users_with_content)} user(s) with exportable content")

    for uid, u in sorted(
        users_with_content.items(), key=lambda item: item[1].get("phone", "")
    ):
        label = u["nickname"] or u["phone"] or uid[:8]
        path = OUT_DIR / f"{safe_filename(label)}_{uid}.md"

        lines_out: list[str] = [
            f"# 用户导出: {label}",
            "",
            f"- **User ID:** `{uid}`",
            f"- **Phone:** {u['phone']}",
        ]
        if u.get("email"):
            lines_out.append(f"- **Email:** {u['email']}")
        lines_out += ["", "---", ""]

        user_convs = sorted(conv_by_user.get(uid, []), key=lambda c: c["started_at"])
        lines_out += _render_conversations(user_convs, seg_by_conv)
        lines_out += ["---", ""]

        user_stories = sorted(
            stories_by_user.get(uid, []),
            key=lambda s: (s["updated_at"], s["created_at"], s["title"]),
        )
        lines_out += _render_stories(user_stories, versions_by_story)
        lines_out += ["---", ""]

        user_chapters = sorted(
            chap_by_user.get(uid, []),
            key=lambda c: (c["order_index"], c["updated_at"]),
        )
        lines_out += _render_chapters(user_chapters)
        lines_out += ["---", ""]

        path.write_text("\n".join(lines_out), encoding="utf-8")
        print(f"Wrote {path}")

    print(f"Done. {len(users_with_content)} users -> {OUT_DIR} (source: {sql_path})")
# Run the export only when executed as a script, not when imported.
if __name__ == "__main__":
    main()