feat/ 导出3位用户的数据用于测试AI质量

This commit is contained in:
Kevin
2026-04-03 10:29:36 +08:00
parent 07c6478742
commit 545d5a4ae0
15 changed files with 146 additions and 6783 deletions

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
One-off: read life-echo pg_dump COPY data, emit one markdown per user
(conversations + segments + memoir chapters).
Run from api/ (recommended, matches uv venv):
uv run python scripts/extract_sql_to_user_md.py
uv run python scripts/extract_sql_to_user_md.py ./tests/data/dump.sql
uv run python scripts/extract_sql_to_user_md.py --all ./tests/data/
Only users listed in EXPORT_USER_KEYS (below) are exported by default.
Use --all to export everyone. Use () for EXPORT_USER_KEYS to mean “all”.
Users with no conversation rows in the dump are skipped (no file written).
Sessions that have no segments rows are omitted (no empty “### 会话” stubs).
If you pass a directory, the newest *.sql inside is used.
Default SQL path: <repo>/api/backups/life_echo_20260313_182756.sql
Output: api/tests/user_exports/<safe_name>_<uuid>.md
"""
from __future__ import annotations
import re
import sys
from collections import defaultdict
from pathlib import Path
# This file lives at <repo>/api/scripts/…
API_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SQL = API_ROOT / "backups/life_echo_20260313_182756.sql"
OUT_DIR = API_ROOT / "tests" / "user_exports"
# --- 编辑这里:只导出匹配任意一项的用户(完整 user id 或手机号,手机号可只写数字)---
# 设为空元组 () 表示不筛选、导出全部(与命令行 --all 相同效果)。
EXPORT_USER_KEYS: tuple[str, ...] = (
"1ade609c-567a-450b-b8fb-776aaba3c2b3",
"5e51151a-cb46-4c5c-ad5d-dae9c58ca243",
"e27fcd97-fefa-43b8-a7a3-3ecd49ebf5f0",
"15366015180",
)
def _digits_only(s: str) -> str:
return "".join(c for c in s if c.isdigit())
def user_matches_export_keys(u: dict, keys: tuple[str, ...]) -> bool:
"""Match full UUID (case-insensitive) or phone (exact, after stripping non-digits)."""
if not keys:
return True
uid = u["id"]
phone = u.get("phone") or ""
phone_d = _digits_only(phone)
for raw in keys:
key = raw.strip()
if not key:
continue
if key.lower() == uid.lower():
return True
if _digits_only(key) and _digits_only(key) == phone_d and phone_d:
return True
if key == phone:
return True
return False
def unescape_pg_text(s: str) -> str:
if s == r"\N":
return ""
return (
s.replace("\\n", "\n")
.replace("\\t", "\t")
.replace("\\r", "\r")
.replace("\\\\", "\\")
)
def extract_copy_block(text: str, table: str) -> list[str]:
"""Lines between COPY public.<table> ... FROM stdin; and \\."""
marker = f"COPY public.{table} "
start = text.find(marker)
if start == -1:
return []
nl = text.find("\n", start)
if nl == -1:
return []
body_start = nl + 1
end = text.find("\n\\.\n", body_start)
if end == -1:
end = text.find("\n\\.", body_start)
if end == -1:
return []
chunk = text[body_start:end]
return [ln for ln in chunk.split("\n") if ln.strip()]
def parse_users(lines: list[str]) -> dict[str, dict]:
users: dict[str, dict] = {}
for ln in lines:
parts = ln.split("\t")
if len(parts) < 14:
continue
uid, phone = parts[0], parts[1]
nickname = parts[5] if parts[5] != r"\N" else ""
email = parts[3] if parts[3] != r"\N" else ""
users[uid] = {
"id": uid,
"phone": phone,
"nickname": nickname,
"email": email,
}
return users
def parse_conversations(lines: list[str]) -> list[dict]:
out: list[dict] = []
for ln in lines:
parts = ln.split("\t")
if len(parts) < 9:
continue
out.append(
{
"id": parts[0],
"user_id": parts[1],
"started_at": parts[2],
"ended_at": parts[3] if parts[3] != r"\N" else "",
"duration_seconds": parts[4],
"summary": unescape_pg_text(parts[5]) if parts[5] != r"\N" else "",
"status": parts[6],
"current_topic": unescape_pg_text(parts[7])
if parts[7] != r"\N"
else "",
"conversation_stage": unescape_pg_text(parts[8])
if parts[8] != r"\N"
else "",
}
)
return out
def parse_segments(lines: list[str]) -> list[dict]:
out: list[dict] = []
for ln in lines:
parts = ln.split("\t")
if len(parts) < 8:
continue
sid, cid, audio = parts[0], parts[1], parts[2]
agent_response = unescape_pg_text(parts[-1]) if parts[-1] != r"\N" else ""
topic_category = parts[-2] if parts[-2] != r"\N" else ""
processed = parts[-3]
created_at = parts[-4]
transcript = unescape_pg_text("\t".join(parts[3:-4]))
out.append(
{
"id": sid,
"conversation_id": cid,
"audio_url": audio if audio != r"\N" else "",
"transcript_text": transcript,
"created_at": created_at,
"processed": processed,
"topic_category": topic_category,
"agent_response": agent_response,
}
)
return out
def parse_chapters(lines: list[str]) -> list[dict]:
"""12 columns; content may contain tabs — unpack from the right."""
out: list[dict] = []
for ln in lines:
parts = ln.split("\t")
if len(parts) < 12:
continue
is_active = parts[-1]
source_segments = parts[-2]
is_new = parts[-3]
category = parts[-4]
updated_at = parts[-5]
images = parts[-6]
status = parts[-7]
order_index = parts[-8]
cid, uid, title = parts[0], parts[1], parts[2]
content = unescape_pg_text("\t".join(parts[3:-8]))
out.append(
{
"id": cid,
"user_id": uid,
"title": title,
"content": content,
"order_index": int(order_index) if order_index.isdigit() else 0,
"status": status,
"images": images,
"updated_at": updated_at,
"category": category if category != r"\N" else "",
"is_new": is_new,
"source_segments": source_segments,
"is_active": is_active,
}
)
return out
def safe_filename(s: str) -> str:
s = re.sub(r"[^\w\u4e00-\u9fff.-]+", "_", s.strip())
s = s.strip("_") or "user"
return s[:80]
def resolve_sql_arg(raw: str | None) -> Path:
if raw is None:
return DEFAULT_SQL
p = Path(raw).expanduser()
if not p.is_absolute():
p = Path.cwd() / p
p = p.resolve()
if p.is_dir():
candidates = sorted(
p.glob("*.sql"), key=lambda x: x.stat().st_mtime, reverse=True
)
if not candidates:
sys.exit(f"No *.sql files in directory: {p}")
return candidates[0]
return p
def _parse_cli_args(argv: list[str]) -> tuple[Path | None, bool]:
"""Returns (sql_path_arg or None, export_all)."""
export_all = False
rest: list[str] = []
for a in argv:
if a == "--all":
export_all = True
else:
rest.append(a)
sql_arg = rest[0] if rest else None
return sql_arg, export_all
def main() -> None:
sql_arg, export_all_flag = _parse_cli_args(sys.argv[1:])
sql_path = resolve_sql_arg(sql_arg)
if not sql_path.is_file():
print(f"Missing SQL file: {sql_path}")
sys.exit(1)
text = sql_path.read_text(encoding="utf-8", errors="replace")
users = parse_users(extract_copy_block(text, "users"))
conversations = parse_conversations(extract_copy_block(text, "conversations"))
segments = parse_segments(extract_copy_block(text, "segments"))
chapters = parse_chapters(extract_copy_block(text, "chapters"))
conv_by_user: dict[str, list[dict]] = defaultdict(list)
for c in conversations:
conv_by_user[c["user_id"]].append(c)
seg_by_conv: dict[str, list[dict]] = defaultdict(list)
for s in segments:
seg_by_conv[s["conversation_id"]].append(s)
chap_by_user: dict[str, list[dict]] = defaultdict(list)
for ch in chapters:
chap_by_user[ch["user_id"]].append(ch)
active_keys: tuple[str, ...] = (
() if (export_all_flag or not EXPORT_USER_KEYS) else EXPORT_USER_KEYS
)
users_to_write = {
uid: u for uid, u in users.items() if user_matches_export_keys(u, active_keys)
}
if active_keys and not users_to_write:
sample = ", ".join(
f"{u['phone']}/{u['id'][:8]}" for u in list(users.values())[:5]
)
sys.exit(
"No users matched EXPORT_USER_KEYS. "
f"Keys={active_keys!r}. Sample dump users: {sample or '(none)'}"
)
users_with_convs = {
uid: u for uid, u in users_to_write.items() if conv_by_user[uid]
}
skipped_no_conv = len(users_to_write) - len(users_with_convs)
if skipped_no_conv:
print(f"Skipped {skipped_no_conv} user(s) with no conversations")
OUT_DIR.mkdir(parents=True, exist_ok=True)
if active_keys:
print(
f"Filter: {len(active_keys)} key(s) -> {len(users_to_write)} user(s) "
f"-> {len(users_with_convs)} with conversations"
)
else:
print(f"Export all: {len(users_with_convs)} user(s) with conversations")
for uid, u in sorted(users_with_convs.items(), key=lambda x: x[1].get("phone", "")):
label = u["nickname"] or u["phone"] or uid[:8]
fname = f"{safe_filename(label)}_{uid}.md"
path = OUT_DIR / fname
lines_out: list[str] = []
lines_out.append(f"# 用户导出: {label}")
lines_out.append("")
lines_out.append(f"- **User ID:** `{uid}`")
lines_out.append(f"- **Phone:** {u['phone']}")
if u.get("email"):
lines_out.append(f"- **Email:** {u['email']}")
lines_out.append("")
lines_out.append("---")
lines_out.append("")
lines_out.append("## 对话记录(用户 + AI")
lines_out.append("")
user_convs = sorted(conv_by_user[uid], key=lambda c: c["started_at"])
convs_with_segs = [cv for cv in user_convs if seg_by_conv[cv["id"]]]
if not convs_with_segs:
lines_out.append(
"无对话轮次dump 里这些会话下没有 segments或尚未落库"
)
lines_out.append("")
else:
for cv in convs_with_segs:
segs = sorted(seg_by_conv[cv["id"]], key=lambda s: s["created_at"])
lines_out.append(f"### 会话 `{cv['id']}`")
lines_out.append("")
lines_out.append(f"- 开始: {cv['started_at']}")
if cv.get("conversation_stage"):
lines_out.append(f"- 阶段: {cv['conversation_stage']}")
lines_out.append("")
for i, seg in enumerate(segs, 1):
lines_out.append(f"#### 轮次 {i}{seg['created_at']}")
lines_out.append("")
if seg.get("audio_url"):
lines_out.append(f"- **音频:** `{seg['audio_url']}`")
lines_out.append("")
lines_out.append("**用户:**")
lines_out.append("")
lines_out.append(seg["transcript_text"] or "(空)")
lines_out.append("")
lines_out.append("**AI:**")
lines_out.append("")
lines_out.append(seg["agent_response"] or "(无回复)")
lines_out.append("")
lines_out.append("---")
lines_out.append("")
lines_out.append("## 回忆录章节(生成正文)")
lines_out.append("")
user_chapters = sorted(
chap_by_user[uid], key=lambda c: (c["order_index"], c["updated_at"])
)
if not user_chapters:
lines_out.append("(无章节)")
else:
for ch in user_chapters:
lines_out.append(f"### [{ch['order_index']}] {ch['title']}")
lines_out.append("")
lines_out.append(
f"- **ID:** `{ch['id']}` | **状态:** {ch['status']} | **分类:** {ch['category'] or ''} | **updated:** {ch['updated_at']}"
)
lines_out.append("")
lines_out.append(ch["content"])
lines_out.append("")
lines_out.append("---")
lines_out.append("")
path.write_text("\n".join(lines_out), encoding="utf-8")
print(f"Wrote {path}")
print(f"Done. {len(users_with_convs)} users -> {OUT_DIR} (source: {sql_path})")
if __name__ == "__main__":
main()