#!/usr/bin/env python3
"""
Migrate the data of a database restored from a legacy full ``pg_dump`` SQL
backup into a target database that uses the current Alembic schema.

Legacy schema source: it only contains books / chapters / conversations /
memoir_states / orders / refresh_tokens / segments / sms_verification_codes /
users (see the historical backups kept in the repository).

Usage (recommended):

1) Create a separate Postgres database that only holds the old backup (the
   table structure stays exactly as dumped)::

    createdb life_echo_legacy
    psql -d life_echo_legacy -f api/backups/life_echo_20260313_182756.sql

2) The **target database** must match production: every Alembic migration of
   the current repository has been applied (``alembic upgrade head``),
   including pgvector and all tables created by the ORM. Production and
   staging both use that schema; this script and the DELETE order in
   ``_purge_target_user`` are written against the current table structure.
   If a future migration adds tables or foreign keys, the delete logic must
   be updated accordingly.

3) Run (inside the repository ``uv run python scripts/...`` also works)::

    python3 migrate_legacy_to_current.py \\
      --legacy-url postgresql://postgres:postgres@localhost:5432/life_echo_legacy \\
      --target-url postgresql://postgres:postgres@localhost:5432/life_echo

**Server with only psycopg**: this script does not depend on any other
package of the project, so the single file can be copied to a machine and
executed there::

    pip install 'psycopg[binary]'   # or python3 -m pip install --user 'psycopg[binary]'
    python3 migrate_legacy_to_current.py --legacy-url ... --target-url ...

Notes:

- No data is created for tables that do not exist in the legacy database,
  such as stories / memory_* / conversation_messages;
- chapters: content -> canonical_markdown; book_id is filled by looking up
  the user's book via user_id (the most recently updated one; NULL when the
  user has no book);
- segments: transcript_text -> user_input_text; the new columns
  audio_duration_seconds / tts_audio_urls are set to NULL;
- conversations: last_message_at / deleted_at are added (both NULL);
- when the images JSON is non-empty and can be parsed into a list of URLs,
  the URLs are written to memoir_images (order_index increasing).

Conflict strategy: by default an UPSERT on the primary key id is performed
(legacy data overwrites the target row with the same id). Use
``--on-conflict skip`` to leave rows with an existing primary key untouched.

If the target database already has a user whose phone number collides with a
legacy user (same phone, different id), ``--phone-conflict`` decides:

- ``replace_target`` (default): first delete the target-side user holding the
  phone number together with its business data, in the same order as
  ``purge_user_related_rows``, then migrate the legacy user (precondition:
  the target database has the complete Alembic head schema);
- ``skip``: skip that legacy user and its related rows (the old behaviour).

**Running the script on the host while the database lives in Docker
Compose**: `.env` commonly uses the host name `postgres`, which cannot be
resolved outside the containers. Either write the URLs as
`...@127.0.0.1:5432/...` directly, or pass `--db-host 127.0.0.1` to replace
the host name in both URLs automatically (the port is kept).
"""

from __future__ import annotations

import argparse
import json
import logging
import uuid
from typing import Any, Literal
from urllib.parse import urlparse, urlunparse

from psycopg import Connection, connect
from psycopg.rows import dict_row
from psycopg.types.json import Json

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger(__name__)

OnConflict = Literal["upsert", "skip"]
PhoneConflictMode = Literal["skip", "replace_target"]

# Column lists of the target tables, in INSERT order. In upsert mode every
# column except ``id`` is overwritten unless a narrower list is given.
_USER_COLUMNS = (
    "id",
    "phone",
    "password_hash",
    "email",
    "openid",
    "nickname",
    "avatar_url",
    "subscription_type",
    "subscription_expires_at",
    "created_at",
    "birth_year",
    "birth_place",
    "grew_up_place",
    "occupation",
)
_BOOK_COLUMNS = (
    "id",
    "user_id",
    "title",
    "total_pages",
    "total_words",
    "cover_image_url",
    "updated_at",
    "has_update",
    "last_update_chapter_id",
)
_MEMOIR_STATE_COLUMNS = (
    "id",
    "user_id",
    "stage_order",
    "current_stage",
    "covered_stages",
    "slots",
    "updated_at",
)
_CONVERSATION_COLUMNS = (
    "id",
    "user_id",
    "started_at",
    "last_message_at",
    "ended_at",
    "duration_seconds",
    "summary",
    "status",
    "current_topic",
    "conversation_stage",
    "deleted_at",
)
_SEGMENT_COLUMNS = (
    "id",
    "conversation_id",
    "audio_url",
    "user_input_text",
    "audio_duration_seconds",
    "created_at",
    "processed",
    "topic_category",
    "agent_response",
    "tts_audio_urls",
)
_ORDER_COLUMNS = (
    "id",
    "user_id",
    "plan_id",
    "plan_name",
    "amount",
    "currency",
    "payment_method",
    "status",
    "trade_no",
    "paid_at",
    "created_at",
    "expired_at",
)
_REFRESH_TOKEN_COLUMNS = (
    "id",
    "user_id",
    "token",
    "expires_at",
    "created_at",
    "is_revoked",
    "device_info",
)
_SMS_COLUMNS = (
    "id",
    "phone",
    "code",
    "purpose",
    "is_used",
    "is_expired",
    "expires_at",
    "created_at",
    "verified_at",
    "ip_address",
)
_CHAPTER_COLUMNS = (
    "id",
    "user_id",
    "book_id",
    "title",
    "category",
    "order_index",
    "summary",
    "canonical_markdown",
    "status",
    "cover_asset_id",
    "current_version_id",
    "created_at",
    "updated_at",
    "is_new",
    "is_active",
    "source_segments",
    "markdown_compose_dirty",
    "markdown_composed_at",
    "reading_segments_json",
)
_MEMOIR_IMAGE_COLUMNS = (
    "id",
    "chapter_id",
    "order_index",
    "placeholder",
    "description",
    "status",
    "prompt",
    "url",
    "storage_key",
    "provider",
    "style",
    "size",
    "error",
    "retryable",
    "created_at",
    "updated_at",
)
# Only these columns are overwritten when a memoir_images row already exists.
_MEMOIR_IMAGE_UPDATE_COLUMNS = (
    "chapter_id",
    "order_index",
    "url",
    "status",
    "updated_at",
)


def _replace_url_host(url: str, new_host: str) -> str:
    """Return *url* with its host name replaced by *new_host*.

    User, password, port, path (database name), query and fragment are kept.
    URLs without a ``user@`` part are handled too, and a bracketed IPv6 host
    (``[::1]:5432``) is split at the bracket rather than at its first colon.
    Strings that ``urlparse`` sees no network location in (for example a
    libpq ``key=value`` conninfo string) are returned unchanged.
    """
    u = urlparse(url)
    if not u.netloc:
        return url
    # ``at`` is "" when there is no userinfo, so the netloc is rebuilt as-is.
    auth, at, hostport = u.netloc.rpartition("@")
    if hostport.startswith("["):
        _host, _bracket, tail = hostport.partition("]")
        has_port, port = tail.startswith(":"), tail[1:]
    else:
        _host, colon, port = hostport.partition(":")
        has_port = bool(colon)
    if ":" in new_host and not new_host.startswith("["):
        # A bare IPv6 literal must be bracketed inside a URL authority.
        new_host = f"[{new_host}]"
    new_netloc = f"{auth}{at}{new_host}"
    if has_port:
        new_netloc += f":{port}"
    return urlunparse((u.scheme, new_netloc, u.path, u.params, u.query, u.fragment))


def _open(url: str) -> Connection:
    """Open a psycopg connection to *url* with autocommit disabled."""
    return connect(url, autocommit=False)


def _fetch_dicts(conn: Connection, query: str) -> list[dict[str, Any]]:
    """Run *query* on *conn* and return every row as a dict."""
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute(query)
        return cur.fetchall()


def _execute_many(conn: Connection, sql: str, params: list[dict[str, Any]]) -> None:
    """Execute *sql* once per parameter mapping on a fresh cursor of *conn*."""
    with conn.cursor() as cur:
        cur.executemany(sql, params)


def _upsert_sql(
    table: str,
    columns: tuple[str, ...],
    on_conflict: OnConflict,
    update_columns: tuple[str, ...] | None = None,
) -> str:
    """Build ``INSERT ... ON CONFLICT (id) ...`` with named placeholders.

    *table* and the column names are interpolated into the SQL text, so they
    must be constants from this module, never external input.

    Args:
        table: Target table name.
        columns: Columns to insert; each becomes a ``%(name)s`` placeholder.
        on_conflict: ``"skip"`` emits ``DO NOTHING``; anything else emits
            ``DO UPDATE SET col = EXCLUDED.col`` for the updated columns.
        update_columns: Columns overwritten on conflict. Defaults to every
            entry of *columns* except ``id``.
    """
    if on_conflict == "skip":
        action = "DO NOTHING"
    else:
        if update_columns is None:
            update_columns = tuple(c for c in columns if c != "id")
        assignments = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_columns)
        action = f"DO UPDATE SET {assignments}"
    column_list = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    return (
        f"INSERT INTO {table} ({column_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT (id) {action}"
    )


def _coalesce(value: Any, default: Any) -> Any:
    """Return *default* when *value* is None, otherwise *value* itself."""
    return default if value is None else value


def _json_or_none(value: Any) -> Json | None:
    """Wrap *value* for a JSON column, mapping Python None to SQL NULL."""
    return None if value is None else Json(value)


def _purge_target_user(conn: Connection, user_id: str) -> None:
    """Delete one user and its business data from the target database.

    The order is meant to match ``app.features.user.repo.purge_user_related_rows``
    so that the legacy user row can be inserted afterwards without violating
    the unique constraint on ``users.phone``.

    Assumes the target schema equals ``alembic upgrade head``. A new migration
    that introduces a table or foreign key pointing at ``users`` needs a
    matching DELETE here.
    """
    uid = user_id
    by_conversation = "SELECT id FROM conversations WHERE user_id = %s"
    with conn.cursor() as cur:
        for table in (
            "memory_facts",
            "memory_chunks",
            "memory_sources",
            "memory_summaries",
            "timeline_events",
            "memory_curation_actions",
        ):
            cur.execute(f"DELETE FROM {table} WHERE user_id = %s", (uid,))

        # Assets are only reachable through the intent tables, so collect
        # their ids before the owning stories / chapters are removed.
        cur.execute(
            """
            SELECT sii.asset_id
            FROM story_image_intents sii
            INNER JOIN stories s ON s.id = sii.story_id
            WHERE s.user_id = %s AND sii.asset_id IS NOT NULL
            """,
            (uid,),
        )
        asset_ids = [row[0] for row in cur.fetchall()]
        cur.execute(
            """
            SELECT cci.asset_id
            FROM chapter_cover_intents cci
            INNER JOIN chapters c ON c.id = cci.chapter_id
            WHERE c.user_id = %s AND cci.asset_id IS NOT NULL
            """,
            (uid,),
        )
        asset_ids.extend(row[0] for row in cur.fetchall())
        unique_assets = list({a for a in asset_ids if a})
        if unique_assets:
            # Only "%s" placeholders are interpolated; the ids stay parameters.
            ph = ",".join(["%s"] * len(unique_assets))
            cur.execute(f"DELETE FROM assets WHERE id IN ({ph})", unique_assets)

        for table in ("stories", "chapters", "books"):
            cur.execute(f"DELETE FROM {table} WHERE user_id = %s", (uid,))

        for table in ("conversation_messages", "segments"):
            cur.execute(
                f"DELETE FROM {table} WHERE conversation_id IN ({by_conversation})",
                (uid,),
            )

        for table in ("conversations", "memoir_states", "orders", "refresh_tokens"):
            cur.execute(f"DELETE FROM {table} WHERE user_id = %s", (uid,))

        cur.execute("DELETE FROM users WHERE id = %s", (uid,))


def _phone_conflict_legacy_skips(
    legacy: Connection,
    target: Connection,
    mode: PhoneConflictMode,
) -> set[str]:
    """Resolve "same phone, different id" collisions between the databases.

    In ``skip`` mode the colliding legacy user is recorded and left out of
    the migration. In ``replace_target`` mode the target-side user holding
    the phone number is purged so the legacy user can be inserted.

    Returns:
        The ids of the legacy users that must be skipped (always empty in
        ``replace_target`` mode).
    """
    with target.cursor() as cur:
        cur.execute("SELECT phone, id FROM users")
        # A NULL phone never collides under a unique constraint, so it must
        # not be treated as a conflict key (that would purge an unrelated
        # target user).
        phone_owner: dict[str, str] = {
            row[0]: row[1] for row in cur.fetchall() if row[0] is not None
        }

    skipped: set[str] = set()
    for r in _fetch_dicts(legacy, "SELECT id, phone FROM users ORDER BY id"):
        legacy_id = r["id"]
        phone = r["phone"]
        if phone is None:
            continue
        owner = phone_owner.get(phone)
        if owner is None or owner == legacy_id:
            continue
        if mode == "skip":
            skipped.add(legacy_id)
            logger.warning(
                "skip legacy user %s phone=%s (target user id=%s)",
                legacy_id,
                phone,
                owner,
            )
            continue
        logger.warning(
            "phone conflict: purging target user %s (phone=%s) then migrating legacy user %s",
            owner,
            phone,
            legacy_id,
        )
        _purge_target_user(target, owner)
        # Each target user contributes one entry, so dropping this phone
        # removes the purged owner from the map.
        del phone_owner[phone]
    return skipped


def migrate_users(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``users`` rows to the target; return the rows submitted."""
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM users ORDER BY id")
        if r["id"] not in skip_user_ids
    ]
    if not rows:
        return 0
    # The legacy dicts are passed as-is; only the named columns are bound.
    _execute_many(target, _upsert_sql("users", _USER_COLUMNS, on_conflict), rows)
    return len(rows)


def migrate_books(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``books`` rows to the target; return the rows submitted."""
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM books ORDER BY id")
        if r["user_id"] not in skip_user_ids
    ]
    if not rows:
        return 0
    # NOTE(review): books are written before chapters; if
    # books.last_update_chapter_id has a non-deferrable FK to chapters this
    # order would fail - confirm against the target schema.
    _execute_many(target, _upsert_sql("books", _BOOK_COLUMNS, on_conflict), rows)
    return len(rows)


def migrate_memoir_states(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``memoir_states`` rows, wrapping the JSON columns."""
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM memoir_states ORDER BY id")
        if r["user_id"] not in skip_user_ids
    ]
    if not rows:
        return 0
    out: list[dict[str, Any]] = [
        {
            "id": r["id"],
            "user_id": r["user_id"],
            "stage_order": _json_or_none(r.get("stage_order")),
            "current_stage": r.get("current_stage"),
            "covered_stages": _json_or_none(r.get("covered_stages")),
            # Always wrapped, so a missing value is stored as JSON null.
            "slots": Json(r["slots"]),
            "updated_at": r.get("updated_at"),
        }
        for r in rows
    ]
    sql = _upsert_sql("memoir_states", _MEMOIR_STATE_COLUMNS, on_conflict)
    _execute_many(target, sql, out)
    return len(out)


def migrate_conversations(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``conversations`` rows, filling the new columns with NULL."""
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM conversations ORDER BY id")
        if r["user_id"] not in skip_user_ids
    ]
    if not rows:
        return 0
    out: list[dict[str, Any]] = [
        {
            "id": r["id"],
            "user_id": r["user_id"],
            "started_at": r["started_at"],
            "last_message_at": None,
            "ended_at": r["ended_at"],
            "duration_seconds": r.get("duration_seconds") or 0,
            "summary": r["summary"],
            "status": r.get("status") or "active",
            "current_topic": r["current_topic"],
            "conversation_stage": r["conversation_stage"],
            "deleted_at": None,
        }
        for r in rows
    ]
    sql = _upsert_sql("conversations", _CONVERSATION_COLUMNS, on_conflict)
    _execute_many(target, sql, out)
    return len(out)


def migrate_segments(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``segments`` rows (transcript_text -> user_input_text).

    Segments whose conversation is missing from the legacy database, or whose
    conversation belongs to a skipped user, are left out.
    """
    conv_user = {
        r["id"]: r["user_id"]
        for r in _fetch_dicts(legacy, "SELECT id, user_id FROM conversations")
    }
    rows = []
    for r in _fetch_dicts(legacy, "SELECT * FROM segments ORDER BY id"):
        uid = conv_user.get(r["conversation_id"])
        if uid is None or uid in skip_user_ids:
            continue
        rows.append(r)
    if not rows:
        return 0
    out: list[dict[str, Any]] = [
        {
            "id": r["id"],
            "conversation_id": r["conversation_id"],
            "audio_url": r["audio_url"],
            "user_input_text": _coalesce(r.get("transcript_text"), ""),
            "audio_duration_seconds": None,
            "created_at": r["created_at"],
            "processed": _coalesce(r.get("processed"), False),
            "topic_category": r["topic_category"],
            "agent_response": r["agent_response"],
            "tts_audio_urls": None,
        }
        for r in rows
    ]
    _execute_many(target, _upsert_sql("segments", _SEGMENT_COLUMNS, on_conflict), out)
    return len(out)


def migrate_orders(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``orders`` rows to the target; return the rows submitted."""
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM orders ORDER BY id")
        if r["user_id"] not in skip_user_ids
    ]
    if not rows:
        return 0
    _execute_many(target, _upsert_sql("orders", _ORDER_COLUMNS, on_conflict), rows)
    return len(rows)


def migrate_refresh_tokens(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``refresh_tokens`` rows; NULL is_revoked becomes False."""
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM refresh_tokens ORDER BY id")
        if r["user_id"] not in skip_user_ids
    ]
    if not rows:
        return 0
    out: list[dict[str, Any]] = [
        {
            "id": r["id"],
            "user_id": r["user_id"],
            "token": r["token"],
            "expires_at": r["expires_at"],
            "created_at": r["created_at"],
            "is_revoked": _coalesce(r.get("is_revoked"), False),
            "device_info": r.get("device_info"),
        }
        for r in rows
    ]
    sql = _upsert_sql("refresh_tokens", _REFRESH_TOKEN_COLUMNS, on_conflict)
    _execute_many(target, sql, out)
    return len(out)


def migrate_sms(legacy: Connection, target: Connection, on_conflict: OnConflict) -> int:
    """Copy every legacy ``sms_verification_codes`` row to the target.

    These rows carry no user id, so they are not filtered by skipped users.
    """
    rows = _fetch_dicts(legacy, "SELECT * FROM sms_verification_codes ORDER BY id")
    if not rows:
        return 0
    sql = _upsert_sql("sms_verification_codes", _SMS_COLUMNS, on_conflict)
    _execute_many(target, sql, rows)
    return len(rows)


def _book_id_for_user(target: Connection, user_id: str) -> str | None:
    """Return the id of the user's most recently updated book, or None."""
    with target.cursor() as cur:
        cur.execute(
            """
            SELECT id FROM books
            WHERE user_id = %s
            ORDER BY updated_at DESC NULLS LAST
            LIMIT 1
            """,
            (user_id,),
        )
        row = cur.fetchone()
    return row[0] if row else None


def _parse_images_for_memoir(images_val: Any) -> list[str]:
    """Extract http(s) URLs from a legacy ``chapters.images`` value.

    Accepts a list or a JSON string encoding a list. Items may be URL strings
    or dicts carrying the URL under ``url`` / ``image_url`` / ``src``.
    Anything else, including undecodable JSON, contributes nothing.
    """
    if images_val is None:
        return []
    if isinstance(images_val, str):
        try:
            images_val = json.loads(images_val)
        except json.JSONDecodeError:
            return []
    if not isinstance(images_val, list):
        return []
    schemes = ("http://", "https://")
    urls: list[str] = []
    for item in images_val:
        if isinstance(item, dict):
            candidate = item.get("url") or item.get("image_url") or item.get("src")
        else:
            candidate = item
        if isinstance(candidate, str) and candidate.startswith(schemes):
            urls.append(candidate)
    return urls


def migrate_chapters_and_images(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> tuple[int, int]:
    """Copy legacy ``chapters`` rows and derive ``memoir_images`` rows.

    ``content`` becomes ``canonical_markdown`` and ``book_id`` is resolved
    from the target database per user. Each URL found in a chapter's
    ``images`` value becomes one memoir_images row with a deterministic
    uuid5 id, so re-running the migration hits the same primary keys.

    Returns:
        ``(chapters submitted, image rows submitted)``.
    """
    rows = [
        r
        for r in _fetch_dicts(legacy, "SELECT * FROM chapters ORDER BY id")
        if r["user_id"] not in skip_user_ids
    ]
    if not rows:
        return 0, 0

    # Nothing writes to books in this function, so one lookup per user is
    # enough instead of one query per chapter.
    book_ids: dict[Any, Any] = {}
    chapter_payloads: list[dict[str, Any]] = []
    for r in rows:
        uid = r["user_id"]
        if uid not in book_ids:
            book_ids[uid] = _book_id_for_user(target, uid)
        # The legacy updated_at is used for both timestamps.
        ts = r.get("updated_at")
        chapter_payloads.append(
            {
                "id": r["id"],
                "user_id": uid,
                "book_id": book_ids[uid],
                "title": r["title"],
                "category": r.get("category"),
                "order_index": r["order_index"],
                "summary": None,
                "canonical_markdown": r.get("content"),
                "status": r.get("status") or "draft",
                "cover_asset_id": None,
                "current_version_id": None,
                "created_at": ts,
                "updated_at": ts,
                "is_new": _coalesce(r.get("is_new"), True),
                "is_active": _coalesce(r.get("is_active"), True),
                "source_segments": _json_or_none(r.get("source_segments")),
                "markdown_compose_dirty": False,
                "markdown_composed_at": None,
                "reading_segments_json": None,
            }
        )

    image_payloads: list[dict[str, Any]] = []
    for r in rows:
        for i, url in enumerate(_parse_images_for_memoir(r.get("images"))):
            mid = str(
                uuid.uuid5(
                    uuid.NAMESPACE_URL,
                    f"memoir_image:{r['id']}:{i}:{url}",
                )
            )
            image_payloads.append(
                {
                    "id": mid,
                    "chapter_id": r["id"],
                    "order_index": i,
                    "placeholder": None,
                    "description": None,
                    "status": "completed",
                    "prompt": None,
                    "url": url,
                    "storage_key": None,
                    "provider": None,
                    "style": None,
                    "size": None,
                    "error": None,
                    "retryable": None,
                    "created_at": r.get("updated_at"),
                    "updated_at": r.get("updated_at"),
                }
            )

    insert_ch = _upsert_sql("chapters", _CHAPTER_COLUMNS, on_conflict)
    insert_img = _upsert_sql(
        "memoir_images",
        _MEMOIR_IMAGE_COLUMNS,
        on_conflict,
        update_columns=_MEMOIR_IMAGE_UPDATE_COLUMNS,
    )
    with target.cursor() as cur:
        # Chapters first: the image rows reference them through chapter_id.
        cur.executemany(insert_ch, chapter_payloads)
        if image_payloads:
            cur.executemany(insert_img, image_payloads)
    return len(chapter_payloads), len(image_payloads)


def _log_legacy_counts(legacy: Connection) -> None:
    """Log the row count of every legacy table (used by ``--dry-run``)."""
    with legacy.cursor(row_factory=dict_row) as cur:
        for t in (
            "users",
            "books",
            "chapters",
            "conversations",
            "segments",
            "memoir_states",
            "orders",
            "refresh_tokens",
            "sms_verification_codes",
        ):
            cur.execute(f"SELECT COUNT(*) AS c FROM {t}")
            c = cur.fetchone()["c"]
            logger.info("legacy %s rows=%s", t, c)
    logger.info("dry-run done")


def _run_migration(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    phone_conflict: PhoneConflictMode,
) -> None:
    """Run every migration step in one target transaction and commit it."""
    skip_users = _phone_conflict_legacy_skips(legacy, target, phone_conflict)
    if skip_users:
        logger.info(
            "skip %d legacy users due to phone conflict mode=skip",
            len(skip_users),
        )

    n_users = migrate_users(legacy, target, on_conflict, skip_users)
    n_books = migrate_books(legacy, target, on_conflict, skip_users)
    n_memoir = migrate_memoir_states(legacy, target, on_conflict, skip_users)
    n_conv = migrate_conversations(legacy, target, on_conflict, skip_users)
    n_seg = migrate_segments(legacy, target, on_conflict, skip_users)
    n_ord = migrate_orders(legacy, target, on_conflict, skip_users)
    n_rt = migrate_refresh_tokens(legacy, target, on_conflict, skip_users)
    n_sms = migrate_sms(legacy, target, on_conflict)
    n_ch, n_img = migrate_chapters_and_images(
        legacy, target, on_conflict, skip_users
    )

    target.commit()
    logger.info(
        "migration committed: users=%s books=%s memoir_states=%s "
        "conversations=%s segments=%s orders=%s refresh_tokens=%s "
        "sms=%s chapters=%s memoir_images=%s",
        n_users,
        n_books,
        n_memoir,
        n_conv,
        n_seg,
        n_ord,
        n_rt,
        n_sms,
        n_ch,
        n_img,
    )


def main() -> None:
    """Parse the command line and run the migration or the dry-run."""
    p = argparse.ArgumentParser(
        description="Migrate legacy Life Echo DB into current schema."
    )
    p.add_argument(
        "--legacy-url",
        required=True,
        help="PostgreSQL URL to DB restored from old pg_dump (old schema only).",
    )
    p.add_argument(
        "--target-url",
        required=True,
        help="PostgreSQL URL to current DB (alembic upgrade head).",
    )
    p.add_argument(
        "--on-conflict",
        choices=("upsert", "skip"),
        default="upsert",
        help="upsert: overwrite same id; skip: keep existing rows",
    )
    p.add_argument(
        "--phone-conflict",
        choices=("skip", "replace_target"),
        default="replace_target",
        help="同号不同 id:replace_target=先删目标占号用户再迁入;skip=跳过该 legacy 用户",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="Connect and print row counts only; no writes.",
    )
    p.add_argument(
        "--db-host",
        metavar="HOST",
        default=None,
        help=(
            "Replace hostname in both URLs (e.g. 127.0.0.1 when running on the host "
            "while Postgres is published from Docker; keeps user/password/port/dbname)."
        ),
    )
    args = p.parse_args()
    on_conflict: OnConflict = args.on_conflict  # type: ignore[assignment]
    phone_conflict: PhoneConflictMode = args.phone_conflict  # type: ignore[assignment]

    legacy_url = args.legacy_url
    target_url = args.target_url
    if args.db_host:
        legacy_url = _replace_url_host(legacy_url, args.db_host)
        target_url = _replace_url_host(target_url, args.db_host)
        logger.info("using db host %s (from --db-host)", args.db_host)

    # Nested try/finally so the legacy connection is closed even when the
    # target connection cannot be opened.
    legacy = _open(legacy_url)
    try:
        target = _open(target_url)
        try:
            if args.dry_run:
                _log_legacy_counts(legacy)
                return
            _run_migration(legacy, target, on_conflict, phone_conflict)
        except Exception:
            target.rollback()
            logger.exception("migration failed; target rolled back")
            raise
        finally:
            target.close()
    finally:
        legacy.close()


if __name__ == "__main__":
    main()