Files
life-echo/api/scripts/extract_sql_to_user_md.py

589 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
One-off exporter: read a life-echo pg_dump (plain SQL with COPY ... FROM stdin
blocks) and write one markdown file per user containing conversations +
segments, stories / story_versions, and memoir chapters.

Run from api/ (recommended; matches the uv venv):
    uv run python scripts/extract_sql_to_user_md.py
    uv run python scripts/extract_sql_to_user_md.py ./tests/data/dump.sql
    uv run python scripts/extract_sql_to_user_md.py --all ./tests/data/
    uv run python scripts/extract_sql_to_user_md.py --only 15366015180 ./backups/dev.sql

User selection:
- By default only users matching EXPORT_USER_KEYS (below) are exported. A key
  matches a user's full id (case-insensitive) or phone number (exact, or equal
  after stripping non-digit characters).
- `--only KEY` may be repeated; it overrides EXPORT_USER_KEYS for that run.
- `--all` exports everyone and cannot be combined with `--only`. Setting
  EXPORT_USER_KEYS to () also means "all".
- Selected users are skipped only when they have no conversations, stories,
  or chapters in the dump.
- Conversations that have no segment rows are omitted (no empty "### 会话"
  stubs).

Input: one positional SQL path; relative paths are resolved against the
current directory. If it is a directory, the most recently modified *.sql
inside it is used. Without an argument the default is
<repo>/api/backups/life_echo_20260313_182756.sql.

Output: api/tests/user_exports/<safe_name>_<uuid>.md, where <safe_name> is the
sanitized nickname, else the phone, else the first 8 chars of the user id.
The default input and output locations are resolved relative to this script,
not the current directory.
"""
from __future__ import annotations
import re
import sys
from collections import defaultdict
from pathlib import Path
# Locations are anchored to this script (<repo>/api/scripts/<this file>), so
# they do not depend on the current working directory.
API_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SQL = API_ROOT / "backups" / "life_echo_20260313_182756.sql"
OUT_DIR = API_ROOT / "tests" / "user_exports"

# Edit here: export only users matching at least one entry -- either a full
# user id or a phone number (writing just the digits is enough).
# An empty tuple () disables filtering and exports everyone, which has the
# same effect as passing --all on the command line.
EXPORT_USER_KEYS: tuple[str, ...] = (
    "1ade609c-567a-450b-b8fb-776aaba3c2b3",
    "5e51151a-cb46-4c5c-ad5d-dae9c58ca243",
    "e27fcd97-fefa-43b8-a7a3-3ecd49ebf5f0",
    "15366015180",
)
def _digits_only(s: str) -> str:
return "".join(c for c in s if c.isdigit())
def user_matches_export_keys(u: dict, keys: tuple[str, ...]) -> bool:
    """Tell whether user ``u`` is selected by any of ``keys``.

    An empty ``keys`` selects everyone. Each key is stripped; blank keys are
    ignored. A key selects the user when it equals ``u["id"]`` ignoring case,
    when its digits equal the (non-empty) digits of ``u["phone"]``, or when it
    equals the phone string exactly.
    """
    if not keys:
        return True

    def digits(text: str) -> str:
        return "".join(ch for ch in text if ch.isdigit())

    uid = u["id"]
    phone = u.get("phone") or ""
    phone_digits = digits(phone)

    def selects(key: str) -> bool:
        if key.lower() == uid.lower():
            return True
        # Guard on phone_digits so a key never matches a user without a phone.
        if phone_digits and digits(key) == phone_digits:
            return True
        return key == phone

    return any(selects(key) for key in map(str.strip, keys) if key)
# One backslash escape as written by PostgreSQL's COPY text format: hex
# (\xh / \xhh), octal (\o .. \ooo), or any single other character.
_PG_COPY_ESCAPE = re.compile(r"\\(x[0-9A-Fa-f]{1,2}|[0-7]{1,3}|.)", re.DOTALL)
_PG_COPY_LETTER_ESCAPES = {
    "b": "\b",
    "f": "\f",
    "n": "\n",
    "r": "\r",
    "t": "\t",
    "v": "\v",
}


def _decode_pg_escape(match: re.Match) -> str:
    """Translate the body of one matched backslash escape into its character."""
    body = match.group(1)
    if body in _PG_COPY_LETTER_ESCAPES:
        return _PG_COPY_LETTER_ESCAPES[body]
    if len(body) > 1 and body[0] == "x":
        return chr(int(body[1:], 16))
    if body[0] in "01234567":
        return chr(int(body, 8))
    # Any other backslashed character (including "\\" itself) stands for itself.
    return body


def unescape_pg_text(s: str) -> str:
    r"""Decode one field of PostgreSQL COPY text format.

    A field that is exactly ``\N`` (SQL NULL) becomes "". Backslash escapes
    are decoded in a single left-to-right pass, so ``\\n`` (an escaped
    backslash followed by ``n``) yields a literal backslash and ``n`` rather
    than a newline -- chained ``str.replace`` calls cannot get this right.

    Handles ``\b \f \n \r \t \v``, octal ``\ooo`` and hex ``\xhh`` (mapped to
    the code point of the byte value, which is exact for ASCII), and treats
    any other backslashed character as itself.
    """
    if s == r"\N":
        return ""
    if "\\" not in s:
        return s
    return _PG_COPY_ESCAPE.sub(_decode_pg_escape, s)
def extract_copy_block(text: str, table: str) -> list[str]:
    r"""Return the data lines of the first ``COPY public.<table> ... FROM stdin;`` block.

    The block ends at the first line consisting of ``\.``. Blank lines are
    dropped. Returns [] when the table has no COPY block, the header line is
    not newline-terminated, the block is unterminated, or the table is empty.
    """
    # Trailing space keeps e.g. "stories" from matching "story_versions".
    marker = f"COPY public.{table} "
    start = text.find(marker)
    if start == -1:
        return []
    header_end = text.find("\n", start)
    if header_end == -1:
        return []
    # Search from the header's own newline (not the line after it) so that a
    # terminator on the very first body line -- an empty table -- is found,
    # instead of running on into the next table's terminator.
    end = text.find("\n\\.\n", header_end)
    if end == -1:
        # Terminator on the last line of the file, without a trailing newline.
        end = text.find("\n\\.", header_end)
        if end == -1:
            return []
    chunk = text[header_end + 1 : end]
    return [ln for ln in chunk.split("\n") if ln.strip()]
def parse_users(lines: list[str]) -> dict[str, dict]:
    """Index ``users`` COPY rows by user id.

    Reads id (column 0), phone (column 1), email (column 3) and nickname
    (column 5); rows with fewer than 14 fields are skipped. SQL NULL in
    phone, email or nickname becomes "", so a missing phone never shows up as
    a literal backslash-N in labels, filenames or key matching.

    Returns:
        {user_id: {"id", "phone", "nickname", "email"}}
    """
    null = r"\N"
    users: dict[str, dict] = {}
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 14:
            continue
        uid = parts[0]
        phone, email, nickname = (
            "" if raw == null else raw for raw in (parts[1], parts[3], parts[5])
        )
        users[uid] = {
            "id": uid,
            "phone": phone,
            "nickname": nickname,
            "email": email,
        }
    return users
def parse_conversations(lines: list[str]) -> list[dict]:
    """Turn ``conversations`` COPY rows into dicts; rows under 9 fields are skipped.

    Columns read by position: id, user_id, started_at, ended_at,
    duration_seconds, summary, status, current_topic, conversation_stage.
    Text columns (summary, current_topic, conversation_stage) are unescaped;
    NULL ended_at and NULL text columns become "".
    """
    null = r"\N"

    def text_or_blank(raw: str) -> str:
        return "" if raw == null else unescape_pg_text(raw)

    rows: list[dict] = []
    for line in lines:
        fields = line.split("\t")
        if len(fields) >= 9:
            rows.append(
                {
                    "id": fields[0],
                    "user_id": fields[1],
                    "started_at": fields[2],
                    "ended_at": "" if fields[3] == null else fields[3],
                    "duration_seconds": fields[4],
                    "summary": text_or_blank(fields[5]),
                    "status": fields[6],
                    "current_topic": text_or_blank(fields[7]),
                    "conversation_stage": text_or_blank(fields[8]),
                }
            )
    return rows
def parse_segments(lines: list[str]) -> list[dict]:
    """Turn ``segments`` COPY rows into dicts; rows under 8 fields are skipped.

    Two layouts are recognised, both unpacked from the right:
    - legacy (8-11 fields): id, conversation_id, audio_url, transcript...,
      created_at, processed, topic_category, agent_response;
    - current (12+ fields, see the Segment model): same leading columns, but
      transcript ends 8 fields from the right; the field at -8 and the last
      three fields are not exported.
    """
    null = r"\N"
    parsed: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        if len(cols) < 8:
            continue
        if len(cols) >= 12:
            transcript_stop, anchor = -8, -7
        else:
            transcript_stop, anchor = -4, -4
        # created_at, processed, topic_category, agent_response are consecutive
        # starting at `anchor` in both layouts.
        created_at = cols[anchor]
        processed = cols[anchor + 1]
        topic_raw = cols[anchor + 2]
        agent_raw = cols[anchor + 3]
        parsed.append(
            {
                "id": cols[0],
                "conversation_id": cols[1],
                "audio_url": "" if cols[2] == null else cols[2],
                "transcript_text": unescape_pg_text("\t".join(cols[3:transcript_stop])),
                "created_at": created_at,
                "processed": processed,
                "topic_category": "" if topic_raw == null else topic_raw,
                "agent_response": "" if agent_raw == null else unescape_pg_text(agent_raw),
            }
        )
    return parsed
def parse_stories(lines: list[str]) -> list[dict]:
    """Turn ``stories`` COPY rows into dicts; rows under 17 fields are skipped.

    Expected columns: id, user_id, title, stage, story_type, summary,
    canonical_markdown, time_start, time_end, people_refs, place_refs,
    tag_refs, status, confidence, current_version_id, created_at, updated_at.
    canonical_markdown is rebuilt from every field between column 6 and the
    last ten, so the trailing columns are always unpacked from the right.

    title, summary and canonical_markdown are COPY-unescaped (previously the
    title was not, so escaped characters in titles showed up raw); other
    nullable columns map SQL NULL to "".
    """
    null = r"\N"
    tail = 10  # time_start through updated_at

    def opt(raw: str) -> str:
        return "" if raw == null else raw

    out: list[dict] = []
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 7 + tail:
            continue
        (
            time_start,
            time_end,
            people_refs,
            place_refs,
            tag_refs,
            status,
            confidence,
            current_version_id,
            created_at,
            updated_at,
        ) = parts[-tail:]
        out.append(
            {
                "id": parts[0],
                "user_id": parts[1],
                "title": unescape_pg_text(parts[2]),
                "stage": opt(parts[3]),
                "story_type": opt(parts[4]),
                "summary": unescape_pg_text(parts[5]) if parts[5] != null else "",
                "canonical_markdown": unescape_pg_text("\t".join(parts[6:-tail])),
                "time_start": opt(time_start),
                "time_end": opt(time_end),
                "people_refs": opt(people_refs),
                "place_refs": opt(place_refs),
                "tag_refs": opt(tag_refs),
                "status": opt(status),
                "confidence": opt(confidence),
                "current_version_id": opt(current_version_id),
                "created_at": created_at,
                "updated_at": updated_at,
            }
        )
    return out
def parse_story_versions(lines: list[str]) -> list[dict]:
    """Turn ``story_versions`` COPY rows into dicts; rows under 10 fields are skipped.

    Columns: id, story_id, version_no, markdown_snapshot, change_summary,
    actor_type, source_type, parent_version_id, prompt_meta, created_at.
    markdown_snapshot is everything between column 3 and the last six fields.
    A non-integer version_no becomes 0; nullable columns map SQL NULL to "".
    """
    null = r"\N"

    def opt(raw: str) -> str:
        return "" if raw == null else raw

    versions: list[dict] = []
    for row in lines:
        cols = row.split("\t")
        if len(cols) < 10:
            continue
        try:
            number = int(cols[2])
        except ValueError:
            number = 0
        change_raw, actor, source, parent, meta, created = cols[-6:]
        versions.append(
            {
                "id": cols[0],
                "story_id": cols[1],
                "version_no": number,
                "markdown_snapshot": unescape_pg_text("\t".join(cols[3:-6])),
                "change_summary": "" if change_raw == null else unescape_pg_text(change_raw),
                "actor_type": opt(actor),
                "source_type": opt(source),
                "parent_version_id": opt(parent),
                "prompt_meta": opt(meta),
                "created_at": created,
            }
        )
    return versions
def parse_chapters(lines: list[str]) -> list[dict]:
    """Turn ``chapters`` COPY rows into dicts; rows under 12 fields are skipped.

    Columns: id, user_id, title, content, order_index, status, images,
    updated_at, category, is_new, source_segments, is_active. content is
    everything between column 3 and the last eight fields, so the trailing
    columns are unpacked from the right.

    title and content are COPY-unescaped. order_index is parsed with int()
    (negative values are kept); anything int() rejects, such as SQL NULL,
    falls back to 0. The previous ``isdigit()`` pre-check turned negatives
    into 0 and still crashed on digit-like characters such as "²".
    """
    out: list[dict] = []
    for ln in lines:
        parts = ln.split("\t")
        if len(parts) < 12:
            continue
        (
            order_index,
            status,
            images,
            updated_at,
            category,
            is_new,
            source_segments,
            is_active,
        ) = parts[-8:]
        try:
            order = int(order_index)
        except ValueError:
            order = 0
        out.append(
            {
                "id": parts[0],
                "user_id": parts[1],
                "title": unescape_pg_text(parts[2]),
                "content": unescape_pg_text("\t".join(parts[3:-8])),
                "order_index": order,
                "status": status,
                "images": images,
                "updated_at": updated_at,
                "category": category if category != r"\N" else "",
                "is_new": is_new,
                "source_segments": source_segments,
                "is_active": is_active,
            }
        )
    return out
def safe_filename(s: str) -> str:
    """Make ``s`` usable as a filename stem.

    Runs of characters other than word chars, CJK ideographs, "." and "-" are
    collapsed to "_", surrounding "_" are trimmed, an empty result becomes
    "user", and the result is capped at 80 characters.
    """
    cleaned = re.sub(r"[^\w\u4e00-\u9fff.-]+", "_", s.strip()).strip("_")
    if not cleaned:
        cleaned = "user"
    return cleaned[:80]
def resolve_sql_arg(raw: str | None) -> Path:
    """Turn the optional CLI path argument into a concrete SQL file path.

    ``None`` yields DEFAULT_SQL. Otherwise ``~`` is expanded, relative paths
    are taken against the current directory, and the result is resolved. A
    directory yields its most recently modified ``*.sql`` (exits with a
    message when it has none); any other path is returned as-is.
    """
    if raw is None:
        return DEFAULT_SQL
    target = Path(raw).expanduser()
    target = (target if target.is_absolute() else Path.cwd() / target).resolve()
    if not target.is_dir():
        return target
    # max() returns the first of equally-new files, same as a stable reverse sort.
    newest = max(target.glob("*.sql"), key=lambda f: f.stat().st_mtime, default=None)
    if newest is None:
        sys.exit(f"No *.sql files in directory: {target}")
    return newest
def _parse_cli_args(argv: list[str]) -> tuple[Path | None, bool, tuple[str, ...]]:
"""Returns (sql_path_arg or None, export_all, only_keys or ())."""
export_all = False
only_keys: list[str] = []
positional: list[str] = []
i = 0
while i < len(argv):
a = argv[i]
if a == "--all":
export_all = True
i += 1
elif a == "--only":
if i + 1 >= len(argv):
sys.exit("--only requires a value (user id or phone)")
only_keys.append(argv[i + 1])
i += 2
else:
positional.append(a)
i += 1
if len(positional) > 1:
sys.exit("Extra arguments: pass at most one SQL path or directory.")
sql_arg = positional[0] if positional else None
return sql_arg, export_all, tuple(only_keys)
def _group_by(rows: list[dict], key: str) -> defaultdict[str, list[dict]]:
    """Bucket ``rows`` into lists keyed by ``row[key]``, preserving input order."""
    grouped: defaultdict[str, list[dict]] = defaultdict(list)
    for row in rows:
        grouped[row[key]].append(row)
    return grouped


def _render_conversations(
    out: list[str], convs: list[dict], seg_by_conv: dict[str, list[dict]]
) -> None:
    """Append one user's conversation section (user turns + AI replies) to ``out``.

    Conversations are ordered by started_at and segments by created_at, both
    compared as raw dump strings. Conversations without segments are omitted.
    """
    out += ["## 对话记录(用户 + AI)", ""]
    if not convs:
        out += ["(dump 中该用户无 conversations)", ""]
        return
    with_segs = [
        cv
        for cv in sorted(convs, key=lambda c: c["started_at"])
        if seg_by_conv.get(cv["id"])
    ]
    if not with_segs:
        out += ["(有会话但无 segments 轮次,或尚未落库)", ""]
        return
    for cv in with_segs:
        out += [f"### 会话 `{cv['id']}`", "", f"- 开始: {cv['started_at']}"]
        if cv.get("conversation_stage"):
            out.append(f"- 阶段: {cv['conversation_stage']}")
        out.append("")
        segs = sorted(seg_by_conv[cv["id"]], key=lambda s: s["created_at"])
        for turn, seg in enumerate(segs, 1):
            # Keep the turn number and timestamp apart; jammed together they
            # read as a single number ("轮次 12026-...").
            out += [f"#### 轮次 {turn} ({seg['created_at']})", ""]
            if seg.get("audio_url"):
                out += [f"- **音频:** `{seg['audio_url']}`", ""]
            out += [
                "**用户:**",
                "",
                seg["transcript_text"] or "(空)",
                "",
                "**AI:**",
                "",
                seg["agent_response"] or "(无回复)",
                "",
            ]


def _render_stories(
    out: list[str], stories: list[dict], versions_by_story: dict[str, list[dict]]
) -> None:
    """Append one user's stories section, each story followed by its version history."""
    out += ["## 人生故事(stories)", ""]
    if not stories:
        out += ["(无 story 行;表未进 dump 或用户暂无数据)", ""]
        return
    ordered = sorted(stories, key=lambda s: (s["updated_at"], s["created_at"], s["title"]))
    for st in ordered:
        out += [f"### {st['title']}", ""]
        meta_bits = [f"`{st['id']}`", f"status={st['status'] or '-'}"]
        if st.get("stage"):
            meta_bits.append(f"stage={st['stage']}")
        if st.get("story_type"):
            meta_bits.append(f"type={st['story_type']}")
        out.append("- " + " | ".join(meta_bits))
        if st.get("time_start") or st.get("time_end"):
            out.append(f"- **时间:** {st['time_start'] or '?'} ~ {st['time_end'] or '?'}")
        if st.get("summary"):
            out += ["", "**摘要:**", "", st["summary"]]
        refs = [(name, st.get(name)) for name in ("people_refs", "place_refs", "tag_refs")]
        if any(value for _, value in refs):
            out.append("")
            # Reference columns are shown truncated to 200 characters.
            out += [f"- {name}: `{value[:200]}`" for name, value in refs if value]
        out += [
            "",
            "**当前正文(canonical_markdown):**",
            "",
            st["canonical_markdown"] or "(空)",
            "",
        ]
        versions = sorted(versions_by_story.get(st["id"], []), key=lambda v: v["version_no"])
        if versions:
            out += ["#### 版本历史(story_versions)", ""]
        for v in versions:
            origin = (
                f" ({v['actor_type']}/{v['source_type']})"
                if v["actor_type"] or v["source_type"]
                else ""
            )
            out.append(f"- **v{v['version_no']}** `{v['id']}` @ {v['created_at']}{origin}")
            if v.get("change_summary"):
                out.append(f" - 变更说明: {v['change_summary']}")
            out += ["", v["markdown_snapshot"] or "(空快照)", ""]


def _render_chapters(out: list[str], chapters: list[dict]) -> None:
    """Append one user's memoir chapters, ordered by (order_index, updated_at)."""
    out += ["## 回忆录章节(生成正文)", ""]
    if not chapters:
        out += ["(无章节)", ""]
        return
    for ch in sorted(chapters, key=lambda c: (c["order_index"], c["updated_at"])):
        out += [
            f"### [{ch['order_index']}] {ch['title']}",
            "",
            f"- **ID:** `{ch['id']}` | **状态:** {ch['status']} | "
            f"**分类:** {ch['category'] or '-'} | **updated:** {ch['updated_at']}",
            "",
            ch["content"],
            "",
        ]


def _render_user_markdown(
    uid: str,
    u: dict,
    label: str,
    conv_by_user: dict[str, list[dict]],
    seg_by_conv: dict[str, list[dict]],
    stories_by_user: dict[str, list[dict]],
    versions_by_story: dict[str, list[dict]],
    chap_by_user: dict[str, list[dict]],
) -> str:
    """Build the full markdown document for one user: header plus three sections."""
    out: list[str] = [
        f"# 用户导出: {label}",
        "",
        f"- **User ID:** `{uid}`",
        f"- **Phone:** {u['phone']}",
    ]
    if u.get("email"):
        out.append(f"- **Email:** {u['email']}")
    out += ["", "---", ""]
    _render_conversations(out, conv_by_user.get(uid, []), seg_by_conv)
    out += ["---", ""]
    _render_stories(out, stories_by_user.get(uid, []), versions_by_story)
    out += ["---", ""]
    _render_chapters(out, chap_by_user.get(uid, []))
    out += ["---", ""]
    return "\n".join(out)


def main() -> None:
    """Parse CLI args, read the dump, and write one markdown file per selected user."""
    sql_arg, export_all_flag, only_keys_cli = _parse_cli_args(sys.argv[1:])
    # Reject conflicting flags before reading a potentially large dump.
    if export_all_flag and only_keys_cli:
        sys.exit("Cannot combine --all with --only")
    sql_path = resolve_sql_arg(sql_arg)
    if not sql_path.is_file():
        # sys.exit(str) reports on stderr and exits with status 1.
        sys.exit(f"Missing SQL file: {sql_path}")
    text = sql_path.read_text(encoding="utf-8", errors="replace")

    users = parse_users(extract_copy_block(text, "users"))
    conv_by_user = _group_by(
        parse_conversations(extract_copy_block(text, "conversations")), "user_id"
    )
    seg_by_conv = _group_by(
        parse_segments(extract_copy_block(text, "segments")), "conversation_id"
    )
    stories_by_user = _group_by(parse_stories(extract_copy_block(text, "stories")), "user_id")
    versions_by_story = _group_by(
        parse_story_versions(extract_copy_block(text, "story_versions")), "story_id"
    )
    chap_by_user = _group_by(parse_chapters(extract_copy_block(text, "chapters")), "user_id")

    # --only overrides EXPORT_USER_KEYS; --all (or an empty tuple) selects everyone.
    if only_keys_cli:
        active_keys, key_source = only_keys_cli, "--only"
    elif export_all_flag:
        active_keys, key_source = (), "--all"
    else:
        active_keys, key_source = EXPORT_USER_KEYS, "EXPORT_USER_KEYS"

    users_to_write = {
        uid: u for uid, u in users.items() if user_matches_export_keys(u, active_keys)
    }
    if active_keys and not users_to_write:
        sample = ", ".join(f"{u['phone']}/{u['id'][:8]}" for u in list(users.values())[:5])
        sys.exit(
            f"No users matched {key_source}. "
            f"Keys={active_keys!r}. Sample dump users: {sample or '(none)'}"
        )

    # .get() so the membership test does not insert empty buckets.
    users_with_content = {
        uid: u
        for uid, u in users_to_write.items()
        if conv_by_user.get(uid) or stories_by_user.get(uid) or chap_by_user.get(uid)
    }
    skipped_empty = len(users_to_write) - len(users_with_content)
    if skipped_empty:
        print(f"Skipped {skipped_empty} user(s) with no conversations, stories, or chapters")

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    if active_keys:
        print(
            f"Filter: {len(active_keys)} key(s) -> {len(users_to_write)} user(s) "
            f"-> {len(users_with_content)} with exportable content"
        )
    else:
        print(f"Export all: {len(users_with_content)} user(s) with exportable content")

    for uid, u in sorted(users_with_content.items(), key=lambda x: x[1].get("phone", "")):
        label = u["nickname"] or u["phone"] or uid[:8]
        path = OUT_DIR / f"{safe_filename(label)}_{uid}.md"
        document = _render_user_markdown(
            uid,
            u,
            label,
            conv_by_user,
            seg_by_conv,
            stories_by_user,
            versions_by_story,
            chap_by_user,
        )
        path.write_text(document, encoding="utf-8")
        print(f"Wrote {path}")
    print(f"Done. {len(users_with_content)} users -> {OUT_DIR} (source: {sql_path})")
if __name__ == "__main__":
    # Run the export only when executed as a script, not when imported.
    main()