Files
life-echo/api/scripts/migrate_legacy_to_current.py
Kevin a3f61fcc0f feat(api+app): 对话阶段化、回忆录流水线与客户端会话体验
- DB: segments 用户输入文本(Alembic 0002)
- Chat: 阶段检测/阶段提示/回复限制,编排与访谈/画像 prompts 调整
- Memoir: 忠实度检查 agent,叙事与分类等链路更新
- Core: agent 日志、Alembic 启动、LangChain/日志/配置等
- Story: time_hints;Memory 检索与相关测试
- Expo: 助手头像、会话页与消息拆分、实时会话与文案/i18n
- Docs/scripts/tests: 迁移脚本、LLM JSON/记忆检索文档、新增单测
2026-03-26 12:13:36 +08:00

752 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Migrate data from a database restored from an old full ``pg_dump`` SQL backup
into a target database that uses the current Alembic schema.

Legacy schema: contains only books / chapters / conversations / memoir_states /
orders / refresh_tokens / segments / sms_verification_codes / users (see the
historical backups kept in the repository).

Usage (recommended):

1) Create a separate Postgres database that only holds the old backup (the
   table structure stays exactly as in the dump)::

       createdb life_echo_legacy
       psql -d life_echo_legacy -f api/backups/life_echo_20260313_182756.sql

2) The target database has already run ``alembic upgrade head`` (including
   pgvector and the current ORM tables).

3) Run::

       cd api && uv run python scripts/migrate_legacy_to_current.py \\
           --legacy-url postgresql://postgres:postgres@localhost:5432/life_echo_legacy \\
           --target-url postgresql://postgres:postgres@localhost:5432/life_echo

Notes:

- No rows are created for tables that do not exist in the legacy database,
  such as stories / memory_* / conversation_messages.
- chapters: content -> canonical_markdown; book_id is filled with the user's
  (single) book looked up by user_id, or NULL when the user has no book.
- segments: transcript_text -> user_input_text; the new columns
  audio_duration_seconds / tts_audio_urls are set to NULL.
- conversations: last_message_at / deleted_at are added (both NULL).
- If the images JSON is non-empty and can be parsed into a list of URLs, the
  URLs are written to memoir_images (order_index increasing).

Conflict policy: by default an UPSERT on the primary key ``id`` is performed
(legacy data overwrites the target row with the same id). Use
``--on-conflict skip`` to skip primary keys that already exist.

If the target database already has a user whose phone number collides with a
legacy user (same phone, different id), that legacy user and its related rows
(books / chapters / conversations, etc.) are skipped automatically to avoid
violating the ``users.phone`` unique constraint. A new production database is
normally empty, so this will not be triggered.
"""
from __future__ import annotations
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Literal
# Put the directory one level above this script's folder on sys.path so the
# ``app.*`` imports below resolve when the file is executed directly.
_ROOT = Path(__file__).resolve().parents[1]
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))
from psycopg import Connection, connect
from psycopg.rows import dict_row
from psycopg.types.json import Json
from app.core.logging import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Conflict strategy accepted by the migrate_* functions: "upsert" overwrites a
# target row with the same primary key, "skip" leaves the existing row alone.
OnConflict = Literal["upsert", "skip"]
def _open(url: str) -> Connection:
    """Open a psycopg connection to *url* with autocommit turned off.

    The caller is responsible for committing / rolling back and for closing
    the returned connection.
    """
    connection = connect(url, autocommit=False)
    return connection
def _legacy_user_ids_skipped_for_phone(
    legacy: Connection, target: Connection
) -> set[str]:
    """Return ids of legacy users that must not be copied into the target.

    A legacy user is skipped when the target database already contains a user
    with the same phone number but a different id: inserting it would collide
    on the phone number while the ``ON CONFLICT (id)`` clause would not apply.

    Rows whose phone is NULL are ignored on both sides. NULL values do not
    collide under a PostgreSQL unique constraint, so a missing phone must not
    cause a legacy user (and all of its dependent rows) to be dropped.

    Args:
        legacy: Connection to the database restored from the old dump.
        target: Connection to the database on the current schema.

    Returns:
        The set of legacy ``users.id`` values to skip. Each skipped user is
        also reported through a warning log line.
    """
    with target.cursor() as cur:
        cur.execute("SELECT phone, id FROM users")
        # Map phone -> owning target user id; NULL phones cannot conflict.
        phone_owner = {
            row[0]: row[1] for row in cur.fetchall() if row[0] is not None
        }
    skipped: set[str] = set()
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT id, phone FROM users")
        for r in cur.fetchall():
            if r["phone"] is None:
                # No phone on the legacy side: nothing to collide with.
                continue
            owner = phone_owner.get(r["phone"])
            if owner is not None and owner != r["id"]:
                skipped.add(r["id"])
                logger.warning(
                    "skip legacy user {} phone={} (target user id={})",
                    r["id"],
                    r["phone"],
                    owner,
                )
    return skipped
def migrate_users(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``users`` rows into the target ``users`` table.

    Args:
        legacy: Connection to the legacy database (read only here).
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps an existing row with the same id,
            anything else overwrites it with the legacy values.
        skip_user_ids: Legacy user ids that must not be copied.

    Returns:
        Number of rows handed to the INSERT (0 when nothing is left to copy).
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM users ORDER BY id")
        fetched = src.fetchall()
    payload = [user for user in fetched if user["id"] not in skip_user_ids]
    if not payload:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
phone = EXCLUDED.phone,
password_hash = EXCLUDED.password_hash,
email = EXCLUDED.email,
openid = EXCLUDED.openid,
nickname = EXCLUDED.nickname,
avatar_url = EXCLUDED.avatar_url,
subscription_type = EXCLUDED.subscription_type,
subscription_expires_at = EXCLUDED.subscription_expires_at,
created_at = EXCLUDED.created_at,
birth_year = EXCLUDED.birth_year,
birth_place = EXCLUDED.birth_place,
grew_up_place = EXCLUDED.grew_up_place,
occupation = EXCLUDED.occupation"""

    statement = f"""
INSERT INTO users (
id, phone, password_hash, email, openid, nickname, avatar_url,
subscription_type, subscription_expires_at, created_at,
birth_year, birth_place, grew_up_place, occupation
) VALUES (
%(id)s, %(phone)s, %(password_hash)s, %(email)s, %(openid)s, %(nickname)s,
%(avatar_url)s, %(subscription_type)s, %(subscription_expires_at)s, %(created_at)s,
%(birth_year)s, %(birth_place)s, %(grew_up_place)s, %(occupation)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        # The legacy dict rows are passed through unchanged as parameters.
        dst.executemany(statement, payload)
    return len(payload)
def migrate_books(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``books`` rows into the target ``books`` table.

    Books owned by a user listed in *skip_user_ids* are left out.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose books must not be copied.

    Returns:
        Number of rows handed to the INSERT.
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM books ORDER BY id")
        fetched = src.fetchall()
    payload = [book for book in fetched if book["user_id"] not in skip_user_ids]
    if not payload:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
title = EXCLUDED.title,
total_pages = EXCLUDED.total_pages,
total_words = EXCLUDED.total_words,
cover_image_url = EXCLUDED.cover_image_url,
updated_at = EXCLUDED.updated_at,
has_update = EXCLUDED.has_update,
last_update_chapter_id = EXCLUDED.last_update_chapter_id"""

    statement = f"""
INSERT INTO books (
id, user_id, title, total_pages, total_words, cover_image_url,
updated_at, has_update, last_update_chapter_id
) VALUES (
%(id)s, %(user_id)s, %(title)s, %(total_pages)s, %(total_words)s,
%(cover_image_url)s, %(updated_at)s, %(has_update)s, %(last_update_chapter_id)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        dst.executemany(statement, payload)
    return len(payload)
def migrate_memoir_states(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``memoir_states`` rows into the target table.

    ``stage_order`` and ``covered_stages`` are wrapped in ``Json`` when they
    are not None; ``slots`` is always wrapped in ``Json``.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose state must not be copied.

    Returns:
        Number of rows handed to the INSERT.
    """

    def _json_or_none(value: Any) -> Any:
        # Keep None as None (SQL NULL) instead of wrapping it.
        return None if value is None else Json(value)

    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM memoir_states ORDER BY id")
        fetched = src.fetchall()
    states = [state for state in fetched if state["user_id"] not in skip_user_ids]
    if not states:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
stage_order = EXCLUDED.stage_order,
current_stage = EXCLUDED.current_stage,
covered_stages = EXCLUDED.covered_stages,
slots = EXCLUDED.slots,
updated_at = EXCLUDED.updated_at"""

    statement = f"""
INSERT INTO memoir_states (
id, user_id, stage_order, current_stage, covered_stages, slots, updated_at
) VALUES (
%(id)s, %(user_id)s, %(stage_order)s, %(current_stage)s,
%(covered_stages)s, %(slots)s, %(updated_at)s
) {conflict_clause}
"""
    payload: list[dict[str, Any]] = [
        {
            "id": state["id"],
            "user_id": state["user_id"],
            "stage_order": _json_or_none(state.get("stage_order")),
            "current_stage": state.get("current_stage"),
            "covered_stages": _json_or_none(state.get("covered_stages")),
            "slots": Json(state["slots"]),
            "updated_at": state.get("updated_at"),
        }
        for state in states
    ]
    with target.cursor() as dst:
        dst.executemany(statement, payload)
    return len(payload)
def migrate_conversations(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``conversations`` rows into the target table.

    ``last_message_at`` and ``deleted_at`` are written as None. A falsy
    ``duration_seconds`` becomes 0 and a falsy ``status`` becomes "active".

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose conversations are not copied.

    Returns:
        Number of rows handed to the INSERT.
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM conversations ORDER BY id")
        fetched = src.fetchall()
    conversations = [
        conv for conv in fetched if conv["user_id"] not in skip_user_ids
    ]
    if not conversations:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
started_at = EXCLUDED.started_at,
last_message_at = EXCLUDED.last_message_at,
ended_at = EXCLUDED.ended_at,
duration_seconds = EXCLUDED.duration_seconds,
summary = EXCLUDED.summary,
status = EXCLUDED.status,
current_topic = EXCLUDED.current_topic,
conversation_stage = EXCLUDED.conversation_stage,
deleted_at = EXCLUDED.deleted_at"""

    payload: list[dict[str, Any]] = [
        {
            "id": conv["id"],
            "user_id": conv["user_id"],
            "started_at": conv["started_at"],
            "last_message_at": None,
            "ended_at": conv["ended_at"],
            "duration_seconds": conv.get("duration_seconds") or 0,
            "summary": conv["summary"],
            "status": conv.get("status") or "active",
            "current_topic": conv["current_topic"],
            "conversation_stage": conv["conversation_stage"],
            "deleted_at": None,
        }
        for conv in conversations
    ]
    statement = f"""
INSERT INTO conversations (
id, user_id, started_at, last_message_at, ended_at, duration_seconds,
summary, status, current_topic, conversation_stage, deleted_at
) VALUES (
%(id)s, %(user_id)s, %(started_at)s, %(last_message_at)s, %(ended_at)s,
%(duration_seconds)s, %(summary)s, %(status)s, %(current_topic)s,
%(conversation_stage)s, %(deleted_at)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        dst.executemany(statement, payload)
    return len(payload)
def migrate_segments(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``segments`` rows into the target table.

    A segment is copied only when its conversation exists in the legacy
    database, that conversation has a user id, and the user is not listed in
    *skip_user_ids*. ``transcript_text`` is written to ``user_input_text``
    (None becomes ""), a None ``processed`` becomes False, and
    ``audio_duration_seconds`` / ``tts_audio_urls`` are written as None.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose segments are not copied.

    Returns:
        Number of rows handed to the INSERT.
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT id, user_id FROM conversations")
        owner_by_conversation = {c["id"]: c["user_id"] for c in src.fetchall()}

    kept = []
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM segments ORDER BY id")
        for segment in src.fetchall():
            owner = owner_by_conversation.get(segment["conversation_id"])
            if owner is not None and owner not in skip_user_ids:
                kept.append(segment)
    if not kept:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
conversation_id = EXCLUDED.conversation_id,
audio_url = EXCLUDED.audio_url,
user_input_text = EXCLUDED.user_input_text,
audio_duration_seconds = EXCLUDED.audio_duration_seconds,
created_at = EXCLUDED.created_at,
processed = EXCLUDED.processed,
topic_category = EXCLUDED.topic_category,
agent_response = EXCLUDED.agent_response,
tts_audio_urls = EXCLUDED.tts_audio_urls"""

    payload: list[dict[str, Any]] = []
    for segment in kept:
        transcript = segment.get("transcript_text")
        processed = segment.get("processed")
        payload.append(
            {
                "id": segment["id"],
                "conversation_id": segment["conversation_id"],
                "audio_url": segment["audio_url"],
                "user_input_text": "" if transcript is None else transcript,
                "audio_duration_seconds": None,
                "created_at": segment["created_at"],
                "processed": False if processed is None else processed,
                "topic_category": segment["topic_category"],
                "agent_response": segment["agent_response"],
                "tts_audio_urls": None,
            }
        )

    statement = f"""
INSERT INTO segments (
id, conversation_id, audio_url, user_input_text, audio_duration_seconds,
created_at, processed, topic_category, agent_response, tts_audio_urls
) VALUES (
%(id)s, %(conversation_id)s, %(audio_url)s, %(user_input_text)s,
%(audio_duration_seconds)s, %(created_at)s, %(processed)s, %(topic_category)s,
%(agent_response)s, %(tts_audio_urls)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        dst.executemany(statement, payload)
    return len(payload)
def migrate_orders(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``orders`` rows into the target ``orders`` table.

    Orders owned by a user listed in *skip_user_ids* are left out.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose orders are not copied.

    Returns:
        Number of rows handed to the INSERT.
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM orders ORDER BY id")
        fetched = src.fetchall()
    payload = [order for order in fetched if order["user_id"] not in skip_user_ids]
    if not payload:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
plan_id = EXCLUDED.plan_id,
plan_name = EXCLUDED.plan_name,
amount = EXCLUDED.amount,
currency = EXCLUDED.currency,
payment_method = EXCLUDED.payment_method,
status = EXCLUDED.status,
trade_no = EXCLUDED.trade_no,
paid_at = EXCLUDED.paid_at,
created_at = EXCLUDED.created_at,
expired_at = EXCLUDED.expired_at"""

    statement = f"""
INSERT INTO orders (
id, user_id, plan_id, plan_name, amount, currency, payment_method,
status, trade_no, paid_at, created_at, expired_at
) VALUES (
%(id)s, %(user_id)s, %(plan_id)s, %(plan_name)s, %(amount)s, %(currency)s,
%(payment_method)s, %(status)s, %(trade_no)s, %(paid_at)s, %(created_at)s,
%(expired_at)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        dst.executemany(statement, payload)
    return len(payload)
def migrate_refresh_tokens(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> int:
    """Copy legacy ``refresh_tokens`` rows into the target table.

    A None ``is_revoked`` is written as False; tokens of users listed in
    *skip_user_ids* are left out.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose tokens are not copied.

    Returns:
        Number of rows handed to the INSERT.
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM refresh_tokens ORDER BY id")
        fetched = src.fetchall()
    tokens = [tok for tok in fetched if tok["user_id"] not in skip_user_ids]
    if not tokens:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
token = EXCLUDED.token,
expires_at = EXCLUDED.expires_at,
created_at = EXCLUDED.created_at,
is_revoked = EXCLUDED.is_revoked,
device_info = EXCLUDED.device_info"""

    payload: list[dict[str, Any]] = []
    for tok in tokens:
        revoked = tok.get("is_revoked")
        payload.append(
            {
                "id": tok["id"],
                "user_id": tok["user_id"],
                "token": tok["token"],
                "expires_at": tok["expires_at"],
                "created_at": tok["created_at"],
                "is_revoked": False if revoked is None else revoked,
                "device_info": tok.get("device_info"),
            }
        )

    statement = f"""
INSERT INTO refresh_tokens (
id, user_id, token, expires_at, created_at, is_revoked, device_info
) VALUES (
%(id)s, %(user_id)s, %(token)s, %(expires_at)s, %(created_at)s,
%(is_revoked)s, %(device_info)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        dst.executemany(statement, payload)
    return len(payload)
def migrate_sms(legacy: Connection, target: Connection, on_conflict: OnConflict) -> int:
    """Copy every legacy ``sms_verification_codes`` row into the target table.

    No per-user filtering is applied here.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.

    Returns:
        Number of rows handed to the INSERT.
    """
    with legacy.cursor(row_factory=dict_row) as src:
        src.execute("SELECT * FROM sms_verification_codes ORDER BY id")
        codes = src.fetchall()
    if not codes:
        return 0

    if on_conflict == "skip":
        conflict_clause = "ON CONFLICT (id) DO NOTHING"
    else:
        conflict_clause = """ON CONFLICT (id) DO UPDATE SET
phone = EXCLUDED.phone,
code = EXCLUDED.code,
purpose = EXCLUDED.purpose,
is_used = EXCLUDED.is_used,
is_expired = EXCLUDED.is_expired,
expires_at = EXCLUDED.expires_at,
created_at = EXCLUDED.created_at,
verified_at = EXCLUDED.verified_at,
ip_address = EXCLUDED.ip_address"""

    statement = f"""
INSERT INTO sms_verification_codes (
id, phone, code, purpose, is_used, is_expired, expires_at,
created_at, verified_at, ip_address
) VALUES (
%(id)s, %(phone)s, %(code)s, %(purpose)s, %(is_used)s, %(is_expired)s,
%(expires_at)s, %(created_at)s, %(verified_at)s, %(ip_address)s
) {conflict_clause}
"""
    with target.cursor() as dst:
        dst.executemany(statement, codes)
    return len(codes)
def _book_id_for_user(target: Connection, user_id: str) -> str | None:
with target.cursor() as cur:
cur.execute(
"""
SELECT id FROM books
WHERE user_id = %s
ORDER BY updated_at DESC NULLS LAST
LIMIT 1
""",
(user_id,),
)
row = cur.fetchone()
return row[0] if row else None
def _parse_images_for_memoir(images_val: Any) -> list[str]:
if images_val is None:
return []
if isinstance(images_val, str):
try:
images_val = json.loads(images_val)
except json.JSONDecodeError:
return []
if not isinstance(images_val, list):
return []
urls: list[str] = []
for item in images_val:
if isinstance(item, str) and item.startswith(("http://", "https://")):
urls.append(item)
elif isinstance(item, dict):
u = item.get("url") or item.get("image_url") or item.get("src")
if isinstance(u, str) and u.startswith(("http://", "https://")):
urls.append(u)
return urls
def migrate_chapters_and_images(
    legacy: Connection,
    target: Connection,
    on_conflict: OnConflict,
    skip_user_ids: set[str],
) -> tuple[int, int]:
    """Copy legacy ``chapters`` and derive ``memoir_images`` rows from them.

    Chapter mapping: legacy ``content`` is written to ``canonical_markdown``;
    ``book_id`` is looked up in the target via ``_book_id_for_user`` (None
    when the user has no book); both ``created_at`` and ``updated_at`` take
    the legacy ``updated_at``; columns with no legacy source are written as
    None / False.

    Image mapping: every URL returned by ``_parse_images_for_memoir`` for a
    chapter's ``images`` value becomes one ``memoir_images`` row whose id is a
    deterministic UUIDv5 of chapter id, position and URL, so re-running the
    migration hits the same primary keys.

    Args:
        legacy: Connection to the legacy database.
        target: Connection to the target database; nothing is committed here.
        on_conflict: ``"skip"`` keeps existing rows, otherwise rows with the
            same id are overwritten.
        skip_user_ids: Legacy user ids whose chapters are not copied.

    Returns:
        ``(chapters, images)``: the number of chapter rows and image rows
        handed to the INSERT statements.
    """
    with legacy.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT * FROM chapters ORDER BY id")
        rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
    if not rows:
        return 0, 0

    ch_conflict = (
        "ON CONFLICT (id) DO NOTHING"
        if on_conflict == "skip"
        else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
book_id = EXCLUDED.book_id,
title = EXCLUDED.title,
category = EXCLUDED.category,
order_index = EXCLUDED.order_index,
summary = EXCLUDED.summary,
canonical_markdown = EXCLUDED.canonical_markdown,
status = EXCLUDED.status,
cover_asset_id = EXCLUDED.cover_asset_id,
current_version_id = EXCLUDED.current_version_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
is_new = EXCLUDED.is_new,
is_active = EXCLUDED.is_active,
source_segments = EXCLUDED.source_segments,
markdown_compose_dirty = EXCLUDED.markdown_compose_dirty,
markdown_composed_at = EXCLUDED.markdown_composed_at,
reading_segments_json = EXCLUDED.reading_segments_json"""
    )
    insert_ch = f"""
INSERT INTO chapters (
id, user_id, book_id, title, category, order_index, summary,
canonical_markdown, status, cover_asset_id, current_version_id,
created_at, updated_at, is_new, is_active, source_segments,
markdown_compose_dirty, markdown_composed_at, reading_segments_json
) VALUES (
%(id)s, %(user_id)s, %(book_id)s, %(title)s, %(category)s, %(order_index)s,
%(summary)s, %(canonical_markdown)s, %(status)s, %(cover_asset_id)s,
%(current_version_id)s, %(created_at)s, %(updated_at)s, %(is_new)s,
%(is_active)s, %(source_segments)s, %(markdown_compose_dirty)s,
%(markdown_composed_at)s, %(reading_segments_json)s
) {ch_conflict}
"""
    if on_conflict == "skip":
        img_conflict = "ON CONFLICT (id) DO NOTHING"
    else:
        img_conflict = """ON CONFLICT (id) DO UPDATE SET
chapter_id = EXCLUDED.chapter_id,
order_index = EXCLUDED.order_index,
url = EXCLUDED.url,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at"""
    insert_img = f"""
INSERT INTO memoir_images (
id, chapter_id, order_index, placeholder, description, status,
prompt, url, storage_key, provider, style, size, error,
retryable, created_at, updated_at
) VALUES (
%(id)s, %(chapter_id)s, %(order_index)s, %(placeholder)s,
%(description)s, %(status)s, %(prompt)s, %(url)s, %(storage_key)s,
%(provider)s, %(style)s, %(size)s, %(error)s, %(retryable)s,
%(created_at)s, %(updated_at)s
) {img_conflict}
"""

    # books is not modified in this function, so one lookup per user is
    # enough; this avoids issuing a query for every single chapter.
    book_id_by_user: dict[Any, Any] = {}
    chapter_payloads: list[dict[str, Any]] = []
    image_payloads: list[dict[str, Any]] = []
    for r in rows:
        uid = r["user_id"]
        if uid not in book_id_by_user:
            book_id_by_user[uid] = _book_id_for_user(target, uid)
        ts = r.get("updated_at")
        chapter_payloads.append(
            {
                "id": r["id"],
                "user_id": uid,
                "book_id": book_id_by_user[uid],
                "title": r["title"],
                "category": r.get("category"),
                "order_index": r["order_index"],
                "summary": None,
                "canonical_markdown": r.get("content"),
                "status": r.get("status") or "draft",
                "cover_asset_id": None,
                "current_version_id": None,
                # Both timestamps are taken from the legacy updated_at.
                "created_at": ts,
                "updated_at": ts,
                "is_new": r.get("is_new") if r.get("is_new") is not None else True,
                "is_active": r.get("is_active")
                if r.get("is_active") is not None
                else True,
                "source_segments": Json(r["source_segments"])
                if r.get("source_segments") is not None
                else None,
                "markdown_compose_dirty": False,
                "markdown_composed_at": None,
                "reading_segments_json": None,
            }
        )
        for i, url in enumerate(_parse_images_for_memoir(r.get("images"))):
            image_payloads.append(
                {
                    # Deterministic id: same chapter/position/url -> same row.
                    "id": str(
                        uuid.uuid5(
                            uuid.NAMESPACE_URL,
                            f"memoir_image:{r['id']}:{i}:{url}",
                        )
                    ),
                    "chapter_id": r["id"],
                    "order_index": i,
                    "placeholder": None,
                    "description": None,
                    "status": "completed",
                    "prompt": None,
                    "url": url,
                    "storage_key": None,
                    "provider": None,
                    "style": None,
                    "size": None,
                    "error": None,
                    "retryable": None,
                    "created_at": ts,
                    "updated_at": ts,
                }
            )

    with target.cursor() as cur:
        # Chapters go first so the image rows can reference them.
        cur.executemany(insert_ch, chapter_payloads)
        if image_payloads:
            cur.executemany(insert_img, image_payloads)
    return len(chapter_payloads), len(image_payloads)
def main() -> None:
    """Command-line entry point: migrate the legacy database into the target.

    Parses ``--legacy-url``, ``--target-url``, ``--on-conflict`` and
    ``--dry-run``, opens both connections and then either logs the legacy row
    counts (dry run) or runs every migrate_* step and commits the target once
    at the end.

    On any exception the target is rolled back, the failure is logged and the
    exception is re-raised. Both connections are always closed, including the
    legacy one when the target connection cannot be opened.
    """
    p = argparse.ArgumentParser(
        description="Migrate legacy Life Echo DB into current schema."
    )
    p.add_argument(
        "--legacy-url",
        required=True,
        help="PostgreSQL URL to DB restored from old pg_dump (old schema only).",
    )
    p.add_argument(
        "--target-url",
        required=True,
        help="PostgreSQL URL to current DB (alembic upgrade head).",
    )
    p.add_argument(
        "--on-conflict",
        choices=("upsert", "skip"),
        default="upsert",
        help="upsert: overwrite same id; skip: keep existing rows",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="Connect and print row counts only; no writes.",
    )
    args = p.parse_args()
    on_conflict: OnConflict = args.on_conflict  # type: ignore[assignment]

    legacy = _open(args.legacy_url)
    try:
        # Opened inside the outer try so that a failure here still closes
        # the legacy connection.
        target = _open(args.target_url)
        try:
            if args.dry_run:
                with legacy.cursor(row_factory=dict_row) as cur:
                    # Table names come from this fixed tuple, never from input.
                    for t in (
                        "users",
                        "books",
                        "chapters",
                        "conversations",
                        "segments",
                        "memoir_states",
                        "orders",
                        "refresh_tokens",
                        "sms_verification_codes",
                    ):
                        cur.execute(f"SELECT COUNT(*) AS c FROM {t}")
                        c = cur.fetchone()["c"]
                        logger.info("legacy {} rows={}", t, c)
                logger.info("dry-run done")
                return

            skip_users = _legacy_user_ids_skipped_for_phone(legacy, target)
            if skip_users:
                logger.info(
                    "skip {} legacy users due to phone already owned in target",
                    len(skip_users),
                )
            # Users are migrated first; chapters run last because their
            # book_id is looked up in the already migrated target books.
            n_users = migrate_users(legacy, target, on_conflict, skip_users)
            n_books = migrate_books(legacy, target, on_conflict, skip_users)
            n_memoir = migrate_memoir_states(legacy, target, on_conflict, skip_users)
            n_conv = migrate_conversations(legacy, target, on_conflict, skip_users)
            n_seg = migrate_segments(legacy, target, on_conflict, skip_users)
            n_ord = migrate_orders(legacy, target, on_conflict, skip_users)
            n_rt = migrate_refresh_tokens(legacy, target, on_conflict, skip_users)
            n_sms = migrate_sms(legacy, target, on_conflict)
            n_ch, n_img = migrate_chapters_and_images(
                legacy, target, on_conflict, skip_users
            )
            # Single commit: the whole migration is one target transaction.
            target.commit()
            logger.info(
                "migration committed: users={} books={} memoir_states={} "
                "conversations={} segments={} orders={} refresh_tokens={} "
                "sms={} chapters={} memoir_images={}",
                n_users,
                n_books,
                n_memoir,
                n_conv,
                n_seg,
                n_ord,
                n_rt,
                n_sms,
                n_ch,
                n_img,
            )
        except Exception:
            target.rollback()
            logger.exception("migration failed; target rolled back")
            raise
        finally:
            target.close()
    finally:
        legacy.close()


if __name__ == "__main__":
    main()