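#!/usr/bin/env python3
"""
Migrate the data in a database restored from a legacy full ``pg_dump`` SQL file into a
target database that uses the current Alembic schema.

The legacy schema contains only books / chapters / conversations / memoir_states / orders /
refresh_tokens / segments / sms_verification_codes / users (see the historical backups in the repository).

Usage (recommended):

1) Create a separate Postgres database that only holds the legacy backup (tables kept exactly as in the dump)::

       createdb life_echo_legacy
       psql -d life_echo_legacy -f api/backups/life_echo_20260313_182756.sql

2) The **target database** must match production: every Alembic migration in the current
   repository has been applied (``alembic upgrade head``), including pgvector and all tables
   created by the ORM. Production and staging databases both use this schema. This script and
   the DELETE order in ``_purge_target_user`` are written against the current table layout;
   if a future migration adds tables or foreign keys, the delete logic must be updated too.

3) Run it (inside the repository you can use ``uv run python scripts/...``)::

       python3 migrate_legacy_to_current.py \\
           --legacy-url postgresql://postgres:postgres@localhost:5432/life_echo_legacy \\
           --target-url postgresql://postgres:postgres@localhost:5432/life_echo

**Server with only psycopg installed**: the script does not depend on any other package in the
project, so you can copy this single file to the machine and run it there::

    pip install 'psycopg[binary]'  # or: python3 -m pip install --user 'psycopg[binary]'
    python3 migrate_legacy_to_current.py --legacy-url ... --target-url ...

Notes:

- No rows are created for tables that do not exist in the legacy database, such as
  stories / memory_* / conversation_messages;
- chapters: content → canonical_markdown; book_id is set to the user's book, looked up by
  user_id (the most recently updated one if there are several; NULL if the user has no book);
- segments: transcript_text → user_input_text (a NULL transcript becomes an empty string);
  the new columns audio_duration_seconds / tts_audio_urls are set to NULL;
- conversations: last_message_at / deleted_at are added (both NULL);
- if the images JSON is non-empty and parses to a list of URLs (plain strings, or objects with
  a url / image_url / src key; only http(s) URLs are kept), the images are written to
  memoir_images with an increasing order_index.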
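
The image handling in the last note can be sketched in isolation. This is a minimal standalone sketch that mirrors ``_parse_images_for_memoir`` and the ``uuid.uuid5(uuid.NAMESPACE_URL, ...)`` key built in ``migrate_chapters_and_images``; the names ``image_urls`` and ``memoir_image_rows`` are hypothetical and not part of this script. Because the id is derived only from the chapter id, the position and the URL, re-running the migration produces the same ids, so repeated runs hit ``ON CONFLICT (id)`` instead of inserting duplicate images.

```python
import json
import uuid
from typing import Any


def image_urls(images_val: Any) -> list[str]:
    # The legacy column may hold a JSON string or an already-decoded list.
    if isinstance(images_val, str):
        try:
            images_val = json.loads(images_val)
        except json.JSONDecodeError:
            return []
    if not isinstance(images_val, list):
        return []
    urls: list[str] = []
    for item in images_val:
        # Accept bare URL strings, or dicts carrying url / image_url / src.
        if isinstance(item, dict):
            item = item.get("url") or item.get("image_url") or item.get("src")
        if isinstance(item, str) and item.startswith(("http://", "https://")):
            urls.append(item)
    return urls


def memoir_image_rows(chapter_id: str, images_val: Any) -> list[dict[str, Any]]:
    # One row per kept URL; order_index is the position after filtering.
    rows = []
    for i, url in enumerate(image_urls(images_val)):
        # uuid5 is deterministic: same chapter, index and URL give the same id.
        key = f"memoir_image:{chapter_id}:{i}:{url}"
        rows.append(
            {
                "id": str(uuid.uuid5(uuid.NAMESPACE_URL, key)),
                "chapter_id": chapter_id,
                "order_index": i,
                "url": url,
            }
        )
    return rows


rows = memoir_image_rows(
    "ch-1",
    '[{"url": "https://cdn.example.com/a.jpg"}, "not-a-url", "http://x/b.png"]',
)
print([(r["order_index"], r["url"]) for r in rows])
# [(0, 'https://cdn.example.com/a.jpg'), (1, 'http://x/b.png')]
```

Entries that are not http(s) URLs are dropped before numbering, so order_index stays contiguous.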
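
Conflict strategy: by default rows are UPSERTed on the primary key ``id`` (legacy data overwrites the target row with the same id). Pass ``--on-conflict skip`` to leave rows whose primary key already exists untouched.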
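
Every ``migrate_*`` function below spells out the two variants of the ``ON CONFLICT`` clause by hand. A minimal sketch of that choice, with ``conflict_clause`` as a hypothetical helper that does not exist in this script; it assumes the column list excludes the key column ``id``.

```python
from typing import Literal, Sequence

OnConflict = Literal["upsert", "skip"]


def conflict_clause(on_conflict: OnConflict, columns: Sequence[str]) -> str:
    # "skip": keep whatever row the target already has for this id.
    if on_conflict == "skip":
        return "ON CONFLICT (id) DO NOTHING"
    # "upsert": overwrite each listed non-key column with the incoming row's value.
    # Column names are spliced into SQL, so they must be constants from code.
    assignments = ", ".join(f"{col} = EXCLUDED.{col}" for col in columns)
    return f"ON CONFLICT (id) DO UPDATE SET {assignments}"


print(conflict_clause("skip", ["phone"]))
# ON CONFLICT (id) DO NOTHING
print(conflict_clause("upsert", ["phone", "nickname"]))
# ON CONFLICT (id) DO UPDATE SET phone = EXCLUDED.phone, nickname = EXCLUDED.nickname
```

``EXCLUDED`` refers to the row that was proposed for insertion, which is why ``upsert`` lets legacy values win.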
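
If the target database already has a user whose phone number collides with a legacy user (same phone, different id), ``--phone-conflict`` decides what happens:

- ``replace_target`` (default): first delete the target user holding that phone, together with their business data, following the same order as ``purge_user_related_rows``, then migrate the legacy user (requires the target to already have the full Alembic head schema);
- ``skip``: skip that legacy user and all of their related rows (the previous behavior).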
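
The per-user decision can be sketched as a pure function, separated from the database work. This is a simplified, hypothetical sketch (``plan_phone_conflicts`` is not a function in this script): it assumes ``target_owner`` maps phone to target user id and only computes which legacy ids would be skipped and which target ids would be purged.

```python
from typing import Literal

PhoneConflictMode = Literal["skip", "replace_target"]


def plan_phone_conflicts(
    target_owner: dict[str, str],
    legacy_users: list[tuple[str, str]],
    mode: PhoneConflictMode,
) -> tuple[set[str], list[str]]:
    # Returns (legacy ids to skip, target ids to purge before inserting).
    owners = dict(target_owner)
    skipped: set[str] = set()
    purged: list[str] = []
    for legacy_id, phone in legacy_users:
        owner = owners.get(phone)
        # No collision, or the same user already exists under the same id.
        if owner is None or owner == legacy_id:
            continue
        if mode == "skip":
            skipped.add(legacy_id)
            continue
        purged.append(owner)
        # Forget the purged owner so later legacy rows cannot purge it twice.
        owners = {p: uid for p, uid in owners.items() if uid != owner}
    return skipped, purged


target = {"13800000000": "t-1", "13900000000": "u-2"}
legacy = [("u-1", "13800000000"), ("u-2", "13900000000")]
print(plan_phone_conflicts(target, legacy, "skip"))            # ({'u-1'}, [])
print(plan_phone_conflicts(target, legacy, "replace_target"))  # (set(), ['t-1'])
```

User ``u-2`` exists on both sides under the same id, so it is never treated as a conflict; the regular id-based UPSERT handles it.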
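
**Running the script on the host while the database runs in Docker Compose**: the hostname in `.env` is usually `postgres`, which does not resolve outside the containers.
Either write the URL as `...@127.0.0.1:5432/...` directly, or pass `--db-host 127.0.0.1` to replace the hostname in both URLs automatically (the port is kept).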
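
The ``--db-host`` rewrite touches only the hostname part of the URL. A standalone sketch using only ``urllib.parse``, mirroring ``_replace_url_host``; ``swap_host`` is a hypothetical name, and the sketch assumes a hostname or IPv4 address (IPv6 literals, which contain colons, are not handled).

```python
from urllib.parse import urlparse, urlunparse


def swap_host(url: str, new_host: str) -> str:
    u = urlparse(url)
    if not u.netloc:
        return url
    # Keep "user:password@" (if present) and ":port" (if present); swap only the host.
    auth, at, hostport = u.netloc.rpartition("@")
    _old_host, colon, port = hostport.partition(":")
    return urlunparse(u._replace(netloc=f"{auth}{at}{new_host}{colon}{port}"))


print(swap_host("postgresql://postgres:postgres@postgres:5432/life_echo", "127.0.0.1"))
# postgresql://postgres:postgres@127.0.0.1:5432/life_echo
print(swap_host("postgresql://postgres/life_echo", "127.0.0.1"))
# postgresql://127.0.0.1/life_echo
```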
"""

from __future__ import annotations

import argparse
import json
import logging
import uuid
from typing import Any, Literal
from urllib.parse import urlparse, urlunparse

from psycopg import Connection, connect
from psycopg.rows import dict_row
from psycopg.types.json import Json

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger(__name__)

OnConflict = Literal["upsert", "skip"]
PhoneConflictMode = Literal["skip", "replace_target"]


def _replace_url_host(url: str, new_host: str) -> str:
    """Replace the hostname in a postgresql URL with new_host (user, password, port and database are kept)."""
    u = urlparse(url)
    if not u.netloc:
        return url
    # rpartition also covers URLs without credentials: auth and sep are then empty.
    auth, sep, hostport = u.netloc.rpartition("@")
    _old_host, colon, port = hostport.partition(":")
    new_netloc = f"{auth}{sep}{new_host}{colon}{port}"
    return urlunparse((u.scheme, new_netloc, u.path, u.params, u.query, u.fragment))


def _open(url: str) -> Connection:
    return connect(url, autocommit=False)


def _purge_target_user(conn: Connection, user_id: str) -> None:
    """
    Delete a user and their business data from the target database, modelled on the order in
    app.features.user.repo.purge_user_related_rows, so that the legacy user row can then be
    inserted without violating the unique constraint on users.phone.

    Assumes the target schema matches ``alembic upgrade head`` (the normal state of the
    production database); if a new migration adds a table or foreign key pointing at
    ``users``, the corresponding DELETE must be added here.
    """
    uid = user_id
    with conn.cursor() as cur:
        cur.execute("DELETE FROM memory_facts WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM memory_chunks WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM memory_summaries WHERE user_id = %s", (uid,))
        # timeline_events.memory_source_id references memory_sources (migration 0003),
        # so timeline rows are deleted before the sources they point at.
        cur.execute("DELETE FROM timeline_events WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM memory_sources WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM memory_curation_actions WHERE user_id = %s", (uid,))

        cur.execute(
            """
            SELECT sii.asset_id FROM story_image_intents sii
            INNER JOIN stories s ON s.id = sii.story_id
            WHERE s.user_id = %s AND sii.asset_id IS NOT NULL
            """,
            (uid,),
        )
        asset_ids = [row[0] for row in cur.fetchall()]
        cur.execute(
            """
            SELECT cci.asset_id FROM chapter_cover_intents cci
            INNER JOIN chapters c ON c.id = cci.chapter_id
            WHERE c.user_id = %s AND cci.asset_id IS NOT NULL
            """,
            (uid,),
        )
        asset_ids.extend(row[0] for row in cur.fetchall())
        seen_assets = {a for a in asset_ids if a}
        if seen_assets:
            ids = list(seen_assets)
            ph = ",".join(["%s"] * len(ids))
            cur.execute(f"DELETE FROM assets WHERE id IN ({ph})", ids)

        cur.execute("DELETE FROM stories WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM chapters WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM books WHERE user_id = %s", (uid,))

        cur.execute(
            """
            DELETE FROM conversation_messages
            WHERE conversation_id IN (
                SELECT id FROM conversations WHERE user_id = %s
            )
            """,
            (uid,),
        )
        cur.execute(
            """
            DELETE FROM segments
            WHERE conversation_id IN (
                SELECT id FROM conversations WHERE user_id = %s
            )
            """,
            (uid,),
        )
        cur.execute("DELETE FROM conversations WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM memoir_states WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM orders WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM refresh_tokens WHERE user_id = %s", (uid,))
        cur.execute("DELETE FROM users WHERE id = %s", (uid,))


def _phone_conflict_legacy_skips(
    legacy: Connection,
    target: Connection,
    mode: PhoneConflictMode,
) -> set[str]:
    """Same phone, different id: ``skip`` skips the legacy user; ``replace_target`` first purges
    the target user holding that phone, then the legacy user is migrated.

    Returns the legacy user ids to skip (always empty in ``replace_target`` mode).
    """
    with target.cursor() as cur:
        # NULL phones never collide under a UNIQUE constraint, so leave them out.
        cur.execute("SELECT phone, id FROM users WHERE phone IS NOT NULL")
        phone_owner: dict[str, str] = {row[0]: row[1] for row in cur.fetchall()}
    skipped: set[str] = set()
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT id, phone FROM users ORDER BY id")
        rows = cur.fetchall()
    for r in rows:
        legacy_id = r["id"]
        phone = r["phone"]
        if phone is None:
            continue
        owner = phone_owner.get(phone)
        if owner is None or owner == legacy_id:
            continue
        if mode == "skip":
            skipped.add(legacy_id)
            logger.warning(
                "skip legacy user %s phone=%s (target user id=%s)",
                legacy_id,
                phone,
                owner,
            )
            continue
        logger.warning(
            "phone conflict: purging target user %s (phone=%s) then migrating legacy user %s",
            owner,
            phone,
            legacy_id,
        )
        _purge_target_user(target, owner)
        phone_owner = {p: uid for p, uid in phone_owner.items() if uid != owner}
    return skipped


def migrate_users(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM users ORDER BY id")
        rows = [r for r in cur.fetchall() if r["id"] not in skip_user_ids]
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            phone = EXCLUDED.phone,
            password_hash = EXCLUDED.password_hash,
            email = EXCLUDED.email,
            openid = EXCLUDED.openid,
            nickname = EXCLUDED.nickname,
            avatar_url = EXCLUDED.avatar_url,
            subscription_type = EXCLUDED.subscription_type,
            subscription_expires_at = EXCLUDED.subscription_expires_at,
            created_at = EXCLUDED.created_at,
            birth_year = EXCLUDED.birth_year,
            birth_place = EXCLUDED.birth_place,
            grew_up_place = EXCLUDED.grew_up_place,
            occupation = EXCLUDED.occupation"""
    )
    sql = f"""
        INSERT INTO users (
            id, phone, password_hash, email, openid, nickname, avatar_url,
            subscription_type, subscription_expires_at, created_at,
            birth_year, birth_place, grew_up_place, occupation
        ) VALUES (
            %(id)s, %(phone)s, %(password_hash)s, %(email)s, %(openid)s, %(nickname)s,
            %(avatar_url)s, %(subscription_type)s, %(subscription_expires_at)s, %(created_at)s,
            %(birth_year)s, %(birth_place)s, %(grew_up_place)s, %(occupation)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, rows)
    return len(rows)


def migrate_books(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM books ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            user_id = EXCLUDED.user_id,
            title = EXCLUDED.title,
            total_pages = EXCLUDED.total_pages,
            total_words = EXCLUDED.total_words,
            cover_image_url = EXCLUDED.cover_image_url,
            updated_at = EXCLUDED.updated_at,
            has_update = EXCLUDED.has_update,
            last_update_chapter_id = EXCLUDED.last_update_chapter_id"""
    )
    sql = f"""
        INSERT INTO books (
            id, user_id, title, total_pages, total_words, cover_image_url,
            updated_at, has_update, last_update_chapter_id
        ) VALUES (
            %(id)s, %(user_id)s, %(title)s, %(total_pages)s, %(total_words)s,
            %(cover_image_url)s, %(updated_at)s, %(has_update)s, %(last_update_chapter_id)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, rows)
    return len(rows)


def migrate_memoir_states(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM memoir_states ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            user_id = EXCLUDED.user_id,
            stage_order = EXCLUDED.stage_order,
            current_stage = EXCLUDED.current_stage,
            covered_stages = EXCLUDED.covered_stages,
            slots = EXCLUDED.slots,
            updated_at = EXCLUDED.updated_at"""
    )
    sql = f"""
        INSERT INTO memoir_states (
            id, user_id, stage_order, current_stage, covered_stages, slots, updated_at
        ) VALUES (
            %(id)s, %(user_id)s, %(stage_order)s, %(current_stage)s,
            %(covered_stages)s, %(slots)s, %(updated_at)s
        ) {conflict}
    """
    out: list[dict[str, Any]] = []
    for r in rows:
        out.append(
            {
                "id": r["id"],
                "user_id": r["user_id"],
                "stage_order": Json(r["stage_order"])
                if r.get("stage_order") is not None
                else None,
                "current_stage": r.get("current_stage"),
                "covered_stages": Json(r["covered_stages"])
                if r.get("covered_stages") is not None
                else None,
                "slots": Json(r["slots"]),
                "updated_at": r.get("updated_at"),
            }
        )
    with target.cursor() as cur:
        cur.executemany(sql, out)
    return len(out)


def migrate_conversations(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM conversations ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            user_id = EXCLUDED.user_id,
            started_at = EXCLUDED.started_at,
            last_message_at = EXCLUDED.last_message_at,
            ended_at = EXCLUDED.ended_at,
            duration_seconds = EXCLUDED.duration_seconds,
            summary = EXCLUDED.summary,
            status = EXCLUDED.status,
            current_topic = EXCLUDED.current_topic,
            conversation_stage = EXCLUDED.conversation_stage,
            deleted_at = EXCLUDED.deleted_at"""
    )
    out: list[dict[str, Any]] = []
    for r in rows:
        out.append(
            {
                "id": r["id"],
                "user_id": r["user_id"],
                "started_at": r["started_at"],
                "last_message_at": None,
                "ended_at": r["ended_at"],
                "duration_seconds": r.get("duration_seconds") or 0,
                "summary": r["summary"],
                "status": r.get("status") or "active",
                "current_topic": r["current_topic"],
                "conversation_stage": r["conversation_stage"],
                "deleted_at": None,
            }
        )
    sql = f"""
        INSERT INTO conversations (
            id, user_id, started_at, last_message_at, ended_at, duration_seconds,
            summary, status, current_topic, conversation_stage, deleted_at
        ) VALUES (
            %(id)s, %(user_id)s, %(started_at)s, %(last_message_at)s, %(ended_at)s,
            %(duration_seconds)s, %(summary)s, %(status)s, %(current_topic)s,
            %(conversation_stage)s, %(deleted_at)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, out)
    return len(out)


def migrate_segments(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT id, user_id FROM conversations")
        conv_user = {r["id"]: r["user_id"] for r in cur.fetchall()}
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM segments ORDER BY id")
        rows = []
        for r in cur.fetchall():
            uid = conv_user.get(r["conversation_id"])
            if uid is None or uid in skip_user_ids:
                continue
            rows.append(r)
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            conversation_id = EXCLUDED.conversation_id,
            audio_url = EXCLUDED.audio_url,
            user_input_text = EXCLUDED.user_input_text,
            audio_duration_seconds = EXCLUDED.audio_duration_seconds,
            created_at = EXCLUDED.created_at,
            processed = EXCLUDED.processed,
            topic_category = EXCLUDED.topic_category,
            agent_response = EXCLUDED.agent_response,
            tts_audio_urls = EXCLUDED.tts_audio_urls"""
    )
    out: list[dict[str, Any]] = []
    for r in rows:
        text_val = r.get("transcript_text")
        if text_val is None:
            text_val = ""
        out.append(
            {
                "id": r["id"],
                "conversation_id": r["conversation_id"],
                "audio_url": r["audio_url"],
                "user_input_text": text_val,
                "audio_duration_seconds": None,
                "created_at": r["created_at"],
                "processed": r.get("processed")
                if r.get("processed") is not None
                else False,
                "topic_category": r["topic_category"],
                "agent_response": r["agent_response"],
                "tts_audio_urls": None,
            }
        )
    sql = f"""
        INSERT INTO segments (
            id, conversation_id, audio_url, user_input_text, audio_duration_seconds,
            created_at, processed, topic_category, agent_response, tts_audio_urls
        ) VALUES (
            %(id)s, %(conversation_id)s, %(audio_url)s, %(user_input_text)s,
            %(audio_duration_seconds)s, %(created_at)s, %(processed)s, %(topic_category)s,
            %(agent_response)s, %(tts_audio_urls)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, out)
    return len(out)


def migrate_orders(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM orders ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            user_id = EXCLUDED.user_id,
            plan_id = EXCLUDED.plan_id,
            plan_name = EXCLUDED.plan_name,
            amount = EXCLUDED.amount,
            currency = EXCLUDED.currency,
            payment_method = EXCLUDED.payment_method,
            status = EXCLUDED.status,
            trade_no = EXCLUDED.trade_no,
            paid_at = EXCLUDED.paid_at,
            created_at = EXCLUDED.created_at,
            expired_at = EXCLUDED.expired_at"""
    )
    sql = f"""
        INSERT INTO orders (
            id, user_id, plan_id, plan_name, amount, currency, payment_method,
            status, trade_no, paid_at, created_at, expired_at
        ) VALUES (
            %(id)s, %(user_id)s, %(plan_id)s, %(plan_name)s, %(amount)s, %(currency)s,
            %(payment_method)s, %(status)s, %(trade_no)s, %(paid_at)s, %(created_at)s,
            %(expired_at)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, rows)
    return len(rows)


def migrate_refresh_tokens(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM refresh_tokens ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            user_id = EXCLUDED.user_id,
            token = EXCLUDED.token,
            expires_at = EXCLUDED.expires_at,
            created_at = EXCLUDED.created_at,
            is_revoked = EXCLUDED.is_revoked,
            device_info = EXCLUDED.device_info"""
    )
    out: list[dict[str, Any]] = []
    for r in rows:
        out.append(
            {
                "id": r["id"],
                "user_id": r["user_id"],
                "token": r["token"],
                "expires_at": r["expires_at"],
                "created_at": r["created_at"],
                "is_revoked": r.get("is_revoked")
                if r.get("is_revoked") is not None
                else False,
                "device_info": r.get("device_info"),
            }
        )
    sql = f"""
        INSERT INTO refresh_tokens (
            id, user_id, token, expires_at, created_at, is_revoked, device_info
        ) VALUES (
            %(id)s, %(user_id)s, %(token)s, %(expires_at)s, %(created_at)s,
            %(is_revoked)s, %(device_info)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, out)
    return len(out)


def migrate_sms(legacy: Connection, target: Connection, on_conflict: OnConflict) -> int:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM sms_verification_codes ORDER BY id")
        rows = cur.fetchall()
    if not rows:
        return 0
    conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            phone = EXCLUDED.phone,
            code = EXCLUDED.code,
            purpose = EXCLUDED.purpose,
            is_used = EXCLUDED.is_used,
            is_expired = EXCLUDED.is_expired,
            expires_at = EXCLUDED.expires_at,
            created_at = EXCLUDED.created_at,
            verified_at = EXCLUDED.verified_at,
            ip_address = EXCLUDED.ip_address"""
    )
    sql = f"""
        INSERT INTO sms_verification_codes (
            id, phone, code, purpose, is_used, is_expired, expires_at,
            created_at, verified_at, ip_address
        ) VALUES (
            %(id)s, %(phone)s, %(code)s, %(purpose)s, %(is_used)s, %(is_expired)s,
            %(expires_at)s, %(created_at)s, %(verified_at)s, %(ip_address)s
        ) {conflict}
    """
    with target.cursor() as cur:
        cur.executemany(sql, rows)
    return len(rows)


def _book_id_for_user(target: Connection, user_id: str) -> str | None:
    with target.cursor() as cur:
        cur.execute(
            """
            SELECT id FROM books
            WHERE user_id = %s
            ORDER BY updated_at DESC NULLS LAST
            LIMIT 1
            """,
            (user_id,),
        )
        row = cur.fetchone()
    return row[0] if row else None


def _parse_images_for_memoir(images_val: Any) -> list[str]:
    if images_val is None:
        return []
    if isinstance(images_val, str):
        try:
            images_val = json.loads(images_val)
        except json.JSONDecodeError:
            return []
    if not isinstance(images_val, list):
        return []
    urls: list[str] = []
    for item in images_val:
        if isinstance(item, str) and item.startswith(("http://", "https://")):
            urls.append(item)
        elif isinstance(item, dict):
            u = item.get("url") or item.get("image_url") or item.get("src")
            if isinstance(u, str) and u.startswith(("http://", "https://")):
                urls.append(u)
    return urls


def migrate_chapters_and_images(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> tuple[int, int]:
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM chapters ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0, 0
    ch_conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
            user_id = EXCLUDED.user_id,
            book_id = EXCLUDED.book_id,
            title = EXCLUDED.title,
            category = EXCLUDED.category,
            order_index = EXCLUDED.order_index,
            summary = EXCLUDED.summary,
            canonical_markdown = EXCLUDED.canonical_markdown,
            status = EXCLUDED.status,
            cover_asset_id = EXCLUDED.cover_asset_id,
            current_version_id = EXCLUDED.current_version_id,
            created_at = EXCLUDED.created_at,
            updated_at = EXCLUDED.updated_at,
            is_new = EXCLUDED.is_new,
            is_active = EXCLUDED.is_active,
            source_segments = EXCLUDED.source_segments,
            markdown_compose_dirty = EXCLUDED.markdown_compose_dirty,
            markdown_composed_at = EXCLUDED.markdown_composed_at,
            reading_segments_json = EXCLUDED.reading_segments_json"""
    )
    insert_ch = f"""
        INSERT INTO chapters (
            id, user_id, book_id, title, category, order_index, summary,
            canonical_markdown, status, cover_asset_id, current_version_id,
            created_at, updated_at, is_new, is_active, source_segments,
            markdown_compose_dirty, markdown_composed_at, reading_segments_json
        ) VALUES (
            %(id)s, %(user_id)s, %(book_id)s, %(title)s, %(category)s, %(order_index)s,
            %(summary)s, %(canonical_markdown)s, %(status)s, %(cover_asset_id)s,
            %(current_version_id)s, %(created_at)s, %(updated_at)s, %(is_new)s,
            %(is_active)s, %(source_segments)s, %(markdown_compose_dirty)s,
            %(markdown_composed_at)s, %(reading_segments_json)s
        ) {ch_conflict}
    """
    img_inserted = 0
    chapter_payloads: list[dict[str, Any]] = []
    for r in rows:
        uid = r["user_id"]
        # Books were migrated earlier in the same transaction, so they are visible here.
        bid = _book_id_for_user(target, uid)
        ts = r.get("updated_at")
        chapter_payloads.append(
            {
                "id": r["id"],
                "user_id": uid,
                "book_id": bid,
                "title": r["title"],
                "category": r.get("category"),
                "order_index": r["order_index"],
                "summary": None,
                "canonical_markdown": r.get("content"),
                "status": r.get("status") or "draft",
                "cover_asset_id": None,
                "current_version_id": None,
                "created_at": ts,
                "updated_at": ts,
                "is_new": r.get("is_new") if r.get("is_new") is not None else True,
                "is_active": r.get("is_active")
                if r.get("is_active") is not None
                else True,
                "source_segments": Json(r["source_segments"])
                if r.get("source_segments") is not None
                else None,
                "markdown_compose_dirty": False,
                "markdown_composed_at": None,
                "reading_segments_json": None,
            }
        )

    with target.cursor() as cur:
        cur.executemany(insert_ch, chapter_payloads)

        if on_conflict == "skip":
            img_conflict = "ON CONFLICT (id) DO NOTHING"
        else:
            img_conflict = """ON CONFLICT (id) DO UPDATE SET
                chapter_id = EXCLUDED.chapter_id,
                order_index = EXCLUDED.order_index,
                url = EXCLUDED.url,
                status = EXCLUDED.status,
                updated_at = EXCLUDED.updated_at"""

        insert_img = f"""
            INSERT INTO memoir_images (
                id, chapter_id, order_index, placeholder, description, status,
                prompt, url, storage_key, provider, style, size, error,
                retryable, created_at, updated_at
            ) VALUES (
                %(id)s, %(chapter_id)s, %(order_index)s, %(placeholder)s,
                %(description)s, %(status)s, %(prompt)s, %(url)s, %(storage_key)s,
                %(provider)s, %(style)s, %(size)s, %(error)s, %(retryable)s,
                %(created_at)s, %(updated_at)s
            ) {img_conflict}
        """

        # The image inserts reuse the open cursor, so they stay inside this block.
        for r in rows:
            urls = _parse_images_for_memoir(r.get("images"))
            if not urls:
                continue
            for i, url in enumerate(urls):
                # Deterministic id: re-running the migration maps the same image to the same row.
                mid = str(
                    uuid.uuid5(
                        uuid.NAMESPACE_URL,
                        f"memoir_image:{r['id']}:{i}:{url}",
                    )
                )
                cur.execute(
                    insert_img,
                    {
                        "id": mid,
                        "chapter_id": r["id"],
                        "order_index": i,
                        "placeholder": None,
                        "description": None,
                        "status": "completed",
                        "prompt": None,
                        "url": url,
                        "storage_key": None,
                        "provider": None,
                        "style": None,
                        "size": None,
                        "error": None,
                        "retryable": None,
                        "created_at": r.get("updated_at"),
                        "updated_at": r.get("updated_at"),
                    },
                )
                img_inserted += 1

    return len(chapter_payloads), img_inserted


def main() -> None:
    p = argparse.ArgumentParser(
        description="Migrate legacy Life Echo DB into current schema."
    )
    p.add_argument(
        "--legacy-url",
        required=True,
        help="PostgreSQL URL to DB restored from old pg_dump (old schema only).",
    )
    p.add_argument(
        "--target-url",
        required=True,
        help="PostgreSQL URL to current DB (alembic upgrade head).",
    )
    p.add_argument(
        "--on-conflict",
        choices=("upsert", "skip"),
        default="upsert",
        help="upsert: overwrite same id; skip: keep existing rows",
    )
    p.add_argument(
        "--phone-conflict",
        choices=("skip", "replace_target"),
        default="replace_target",
        help=(
            "same phone, different id: replace_target = purge the target user holding "
            "the phone, then migrate; skip = skip that legacy user"
        ),
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="Connect and log legacy row counts only; no writes.",
    )
    p.add_argument(
        "--db-host",
        metavar="HOST",
        default=None,
        help=(
            "Replace hostname in both URLs (e.g. 127.0.0.1 when running on the host "
            "while Postgres is published from Docker; keeps user/password/port/dbname)."
        ),
    )
    args = p.parse_args()
    on_conflict: OnConflict = args.on_conflict  # type: ignore[assignment]
    phone_conflict: PhoneConflictMode = args.phone_conflict  # type: ignore[assignment]

    legacy_url = args.legacy_url
    target_url = args.target_url
    if args.db_host:
        legacy_url = _replace_url_host(legacy_url, args.db_host)
        target_url = _replace_url_host(target_url, args.db_host)
        logger.info("using db host %s (from --db-host)", args.db_host)

    legacy = _open(legacy_url)
    target = _open(target_url)
    try:
        if args.dry_run:
            with legacy.cursor(row_factory=dict_row) as cur:
                for t in (
                    "users",
                    "books",
                    "chapters",
                    "conversations",
                    "segments",
                    "memoir_states",
                    "orders",
                    "refresh_tokens",
                    "sms_verification_codes",
                ):
                    cur.execute(f"SELECT COUNT(*) AS c FROM {t}")
                    c = cur.fetchone()["c"]
                    logger.info("legacy %s rows=%s", t, c)
            logger.info("dry-run done")
            return

        skip_users = _phone_conflict_legacy_skips(legacy, target, phone_conflict)
        if skip_users:
            logger.info(
                "skip %d legacy users due to phone conflict mode=skip",
                len(skip_users),
            )

        n_users = migrate_users(legacy, target, on_conflict, skip_users)
        n_books = migrate_books(legacy, target, on_conflict, skip_users)
        n_memoir = migrate_memoir_states(legacy, target, on_conflict, skip_users)
        n_conv = migrate_conversations(legacy, target, on_conflict, skip_users)
        n_seg = migrate_segments(legacy, target, on_conflict, skip_users)
        n_ord = migrate_orders(legacy, target, on_conflict, skip_users)
        n_rt = migrate_refresh_tokens(legacy, target, on_conflict, skip_users)
        n_sms = migrate_sms(legacy, target, on_conflict)
        n_ch, n_img = migrate_chapters_and_images(
            legacy, target, on_conflict, skip_users
        )
        target.commit()

        logger.info(
            "migration committed: users=%s books=%s memoir_states=%s "
            "conversations=%s segments=%s orders=%s refresh_tokens=%s "
            "sms=%s chapters=%s memoir_images=%s",
            n_users,
            n_books,
            n_memoir,
            n_conv,
            n_seg,
            n_ord,
            n_rt,
            n_sms,
            n_ch,
            n_img,
        )
    except Exception:
        target.rollback()
        logger.exception("migration failed; target rolled back")
        raise
    finally:
        legacy.close()
        target.close()


if __name__ == "__main__":
    main()