feat(api+app): 对话阶段化、回忆录流水线与客户端会话体验

- DB: segments 用户输入文本(Alembic 0002)
- Chat: 阶段检测/阶段提示/回复限制,编排与访谈/画像 prompts 调整
- Memoir: 忠实度检查 agent,叙事与分类等链路更新
- Core: agent 日志、Alembic 启动、LangChain/日志/配置等
- Story: time_hints;Memory 检索与相关测试
- Expo: 助手头像、会话页与消息拆分、实时会话与文案/i18n
- Docs/scripts/tests: 迁移脚本、LLM JSON/记忆检索文档、新增单测
This commit is contained in:
Kevin
2026-03-26 12:13:36 +08:00
parent 49b089354c
commit a3f61fcc0f
94 changed files with 3332 additions and 672 deletions

View File

@@ -0,0 +1,751 @@
#!/usr/bin/env python3
"""
将「旧版 pg_dump 全量 SQL」恢复到的数据库中的数据迁移到当前 Alembic schema 的目标库。
旧 schema 来源:仅含 books / chapters / conversations / memoir_states / orders /
refresh_tokens / segments / sms_verification_codes / users见仓库内历史备份
用法(推荐):
1) 在 Postgres 上单独建一个库,只用于承载旧备份(表结构保持 dump 原样)::
createdb life_echo_legacy
psql -d life_echo_legacy -f api/backups/life_echo_20260313_182756.sql
2) 目标库已执行 ``alembic upgrade head``(含 pgvector 与当前 ORM 表)。
3) 运行::
cd api && uv run python scripts/migrate_legacy_to_current.py \\
--legacy-url postgresql://postgres:postgres@localhost:5432/life_echo_legacy \\
--target-url postgresql://postgres:postgres@localhost:5432/life_echo
说明:
- 不会创建 stories / memory_* / conversation_messages 等旧库中不存在的表数据;
- chapterscontent → canonical_markdown按 user_id 关联该用户唯一 book 填 book_id若无书则为 NULL
- segmentstranscript_text → user_input_text新列 audio_duration_seconds / tts_audio_urls 置 NULL
- conversations补 last_message_at / deleted_at均为 NULL
- 若 images JSON 非空且可解析为 URL 列表,会写入 memoir_imagesorder_index 递增)。
冲突策略:默认对主键 id 做 UPSERT旧数据覆盖目标同 id 行)。可用 --on-conflict skip 跳过已存在主键。
若目标库已有用户且手机号与某条 legacy 用户冲突(同号不同 id会自动跳过该 legacy 用户及其 books/chapters/
conversations 等关联行,避免违反 ``users.phone`` 唯一约束。新生产库一般为空库,不会触发。
"""
from __future__ import annotations
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import Any, Literal
_ROOT = Path(__file__).resolve().parents[1]
if str(_ROOT) not in sys.path:
sys.path.insert(0, str(_ROOT))
from psycopg import Connection, connect
from psycopg.rows import dict_row
from psycopg.types.json import Json
from app.core.logging import get_logger
logger = get_logger(__name__)
OnConflict = Literal["upsert", "skip"]
def _open(url: str) -> Connection:
return connect(url, autocommit=False)
def _legacy_user_ids_skipped_for_phone(
legacy: Connection, target: Connection
) -> set[str]:
"""目标库已占用某手机号且 id 与 legacy 不一致时,不能插入该 legacy 用户。"""
with target.cursor() as cur:
cur.execute("SELECT phone, id FROM users")
phone_owner = {row[0]: row[1] for row in cur.fetchall()}
skipped: set[str] = set()
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT id, phone FROM users")
for r in cur.fetchall():
owner = phone_owner.get(r["phone"])
if owner is not None and owner != r["id"]:
skipped.add(r["id"])
logger.warning(
"skip legacy user {} phone={} (target user id={})",
r["id"],
r["phone"],
owner,
)
return skipped
def migrate_users(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM users ORDER BY id")
rows = [r for r in cur.fetchall() if r["id"] not in skip_user_ids]
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
phone = EXCLUDED.phone,
password_hash = EXCLUDED.password_hash,
email = EXCLUDED.email,
openid = EXCLUDED.openid,
nickname = EXCLUDED.nickname,
avatar_url = EXCLUDED.avatar_url,
subscription_type = EXCLUDED.subscription_type,
subscription_expires_at = EXCLUDED.subscription_expires_at,
created_at = EXCLUDED.created_at,
birth_year = EXCLUDED.birth_year,
birth_place = EXCLUDED.birth_place,
grew_up_place = EXCLUDED.grew_up_place,
occupation = EXCLUDED.occupation"""
)
sql = f"""
INSERT INTO users (
id, phone, password_hash, email, openid, nickname, avatar_url,
subscription_type, subscription_expires_at, created_at,
birth_year, birth_place, grew_up_place, occupation
) VALUES (
%(id)s, %(phone)s, %(password_hash)s, %(email)s, %(openid)s, %(nickname)s,
%(avatar_url)s, %(subscription_type)s, %(subscription_expires_at)s, %(created_at)s,
%(birth_year)s, %(birth_place)s, %(grew_up_place)s, %(occupation)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, rows)
return len(rows)
def migrate_books(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM books ORDER BY id")
rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
title = EXCLUDED.title,
total_pages = EXCLUDED.total_pages,
total_words = EXCLUDED.total_words,
cover_image_url = EXCLUDED.cover_image_url,
updated_at = EXCLUDED.updated_at,
has_update = EXCLUDED.has_update,
last_update_chapter_id = EXCLUDED.last_update_chapter_id"""
)
sql = f"""
INSERT INTO books (
id, user_id, title, total_pages, total_words, cover_image_url,
updated_at, has_update, last_update_chapter_id
) VALUES (
%(id)s, %(user_id)s, %(title)s, %(total_pages)s, %(total_words)s,
%(cover_image_url)s, %(updated_at)s, %(has_update)s, %(last_update_chapter_id)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, rows)
return len(rows)
def migrate_memoir_states(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM memoir_states ORDER BY id")
rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
stage_order = EXCLUDED.stage_order,
current_stage = EXCLUDED.current_stage,
covered_stages = EXCLUDED.covered_stages,
slots = EXCLUDED.slots,
updated_at = EXCLUDED.updated_at"""
)
sql = f"""
INSERT INTO memoir_states (
id, user_id, stage_order, current_stage, covered_stages, slots, updated_at
) VALUES (
%(id)s, %(user_id)s, %(stage_order)s, %(current_stage)s,
%(covered_stages)s, %(slots)s, %(updated_at)s
) {conflict}
"""
out: list[dict[str, Any]] = []
for r in rows:
out.append(
{
"id": r["id"],
"user_id": r["user_id"],
"stage_order": Json(r["stage_order"])
if r.get("stage_order") is not None
else None,
"current_stage": r.get("current_stage"),
"covered_stages": Json(r["covered_stages"])
if r.get("covered_stages") is not None
else None,
"slots": Json(r["slots"]),
"updated_at": r.get("updated_at"),
}
)
with target.cursor() as cur:
cur.executemany(sql, out)
return len(out)
def migrate_conversations(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM conversations ORDER BY id")
rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
started_at = EXCLUDED.started_at,
last_message_at = EXCLUDED.last_message_at,
ended_at = EXCLUDED.ended_at,
duration_seconds = EXCLUDED.duration_seconds,
summary = EXCLUDED.summary,
status = EXCLUDED.status,
current_topic = EXCLUDED.current_topic,
conversation_stage = EXCLUDED.conversation_stage,
deleted_at = EXCLUDED.deleted_at"""
)
out: list[dict[str, Any]] = []
for r in rows:
out.append(
{
"id": r["id"],
"user_id": r["user_id"],
"started_at": r["started_at"],
"last_message_at": None,
"ended_at": r["ended_at"],
"duration_seconds": r.get("duration_seconds") or 0,
"summary": r["summary"],
"status": r.get("status") or "active",
"current_topic": r["current_topic"],
"conversation_stage": r["conversation_stage"],
"deleted_at": None,
}
)
sql = f"""
INSERT INTO conversations (
id, user_id, started_at, last_message_at, ended_at, duration_seconds,
summary, status, current_topic, conversation_stage, deleted_at
) VALUES (
%(id)s, %(user_id)s, %(started_at)s, %(last_message_at)s, %(ended_at)s,
%(duration_seconds)s, %(summary)s, %(status)s, %(current_topic)s,
%(conversation_stage)s, %(deleted_at)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, out)
return len(out)
def migrate_segments(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT id, user_id FROM conversations")
conv_user = {r["id"]: r["user_id"] for r in cur.fetchall()}
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM segments ORDER BY id")
rows = []
for r in cur.fetchall():
uid = conv_user.get(r["conversation_id"])
if uid is None or uid in skip_user_ids:
continue
rows.append(r)
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
conversation_id = EXCLUDED.conversation_id,
audio_url = EXCLUDED.audio_url,
user_input_text = EXCLUDED.user_input_text,
audio_duration_seconds = EXCLUDED.audio_duration_seconds,
created_at = EXCLUDED.created_at,
processed = EXCLUDED.processed,
topic_category = EXCLUDED.topic_category,
agent_response = EXCLUDED.agent_response,
tts_audio_urls = EXCLUDED.tts_audio_urls"""
)
out: list[dict[str, Any]] = []
for r in rows:
text_val = r.get("transcript_text")
if text_val is None:
text_val = ""
out.append(
{
"id": r["id"],
"conversation_id": r["conversation_id"],
"audio_url": r["audio_url"],
"user_input_text": text_val,
"audio_duration_seconds": None,
"created_at": r["created_at"],
"processed": r.get("processed")
if r.get("processed") is not None
else False,
"topic_category": r["topic_category"],
"agent_response": r["agent_response"],
"tts_audio_urls": None,
}
)
sql = f"""
INSERT INTO segments (
id, conversation_id, audio_url, user_input_text, audio_duration_seconds,
created_at, processed, topic_category, agent_response, tts_audio_urls
) VALUES (
%(id)s, %(conversation_id)s, %(audio_url)s, %(user_input_text)s,
%(audio_duration_seconds)s, %(created_at)s, %(processed)s, %(topic_category)s,
%(agent_response)s, %(tts_audio_urls)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, out)
return len(out)
def migrate_orders(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM orders ORDER BY id")
rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
plan_id = EXCLUDED.plan_id,
plan_name = EXCLUDED.plan_name,
amount = EXCLUDED.amount,
currency = EXCLUDED.currency,
payment_method = EXCLUDED.payment_method,
status = EXCLUDED.status,
trade_no = EXCLUDED.trade_no,
paid_at = EXCLUDED.paid_at,
created_at = EXCLUDED.created_at,
expired_at = EXCLUDED.expired_at"""
)
sql = f"""
INSERT INTO orders (
id, user_id, plan_id, plan_name, amount, currency, payment_method,
status, trade_no, paid_at, created_at, expired_at
) VALUES (
%(id)s, %(user_id)s, %(plan_id)s, %(plan_name)s, %(amount)s, %(currency)s,
%(payment_method)s, %(status)s, %(trade_no)s, %(paid_at)s, %(created_at)s,
%(expired_at)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, rows)
return len(rows)
def migrate_refresh_tokens(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM refresh_tokens ORDER BY id")
rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
token = EXCLUDED.token,
expires_at = EXCLUDED.expires_at,
created_at = EXCLUDED.created_at,
is_revoked = EXCLUDED.is_revoked,
device_info = EXCLUDED.device_info"""
)
out: list[dict[str, Any]] = []
for r in rows:
out.append(
{
"id": r["id"],
"user_id": r["user_id"],
"token": r["token"],
"expires_at": r["expires_at"],
"created_at": r["created_at"],
"is_revoked": r.get("is_revoked")
if r.get("is_revoked") is not None
else False,
"device_info": r.get("device_info"),
}
)
sql = f"""
INSERT INTO refresh_tokens (
id, user_id, token, expires_at, created_at, is_revoked, device_info
) VALUES (
%(id)s, %(user_id)s, %(token)s, %(expires_at)s, %(created_at)s,
%(is_revoked)s, %(device_info)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, out)
return len(out)
def migrate_sms(legacy: Connection, target: Connection, on_conflict: OnConflict) -> int:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM sms_verification_codes ORDER BY id")
rows = cur.fetchall()
if not rows:
return 0
conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
phone = EXCLUDED.phone,
code = EXCLUDED.code,
purpose = EXCLUDED.purpose,
is_used = EXCLUDED.is_used,
is_expired = EXCLUDED.is_expired,
expires_at = EXCLUDED.expires_at,
created_at = EXCLUDED.created_at,
verified_at = EXCLUDED.verified_at,
ip_address = EXCLUDED.ip_address"""
)
sql = f"""
INSERT INTO sms_verification_codes (
id, phone, code, purpose, is_used, is_expired, expires_at,
created_at, verified_at, ip_address
) VALUES (
%(id)s, %(phone)s, %(code)s, %(purpose)s, %(is_used)s, %(is_expired)s,
%(expires_at)s, %(created_at)s, %(verified_at)s, %(ip_address)s
) {conflict}
"""
with target.cursor() as cur:
cur.executemany(sql, rows)
return len(rows)
def _book_id_for_user(target: Connection, user_id: str) -> str | None:
with target.cursor() as cur:
cur.execute(
"""
SELECT id FROM books
WHERE user_id = %s
ORDER BY updated_at DESC NULLS LAST
LIMIT 1
""",
(user_id,),
)
row = cur.fetchone()
return row[0] if row else None
def _parse_images_for_memoir(images_val: Any) -> list[str]:
if images_val is None:
return []
if isinstance(images_val, str):
try:
images_val = json.loads(images_val)
except json.JSONDecodeError:
return []
if not isinstance(images_val, list):
return []
urls: list[str] = []
for item in images_val:
if isinstance(item, str) and item.startswith(("http://", "https://")):
urls.append(item)
elif isinstance(item, dict):
u = item.get("url") or item.get("image_url") or item.get("src")
if isinstance(u, str) and u.startswith(("http://", "https://")):
urls.append(u)
return urls
def migrate_chapters_and_images(
legacy: Connection,
target: Connection,
on_conflict: OnConflict,
skip_user_ids: set[str],
) -> tuple[int, int]:
with legacy.cursor(row_factory=dict_row) as cur:
cur.execute("SELECT * FROM chapters ORDER BY id")
rows = [r for r in cur.fetchall() if r["user_id"] not in skip_user_ids]
if not rows:
return 0, 0
ch_conflict = (
"ON CONFLICT (id) DO NOTHING"
if on_conflict == "skip"
else """ON CONFLICT (id) DO UPDATE SET
user_id = EXCLUDED.user_id,
book_id = EXCLUDED.book_id,
title = EXCLUDED.title,
category = EXCLUDED.category,
order_index = EXCLUDED.order_index,
summary = EXCLUDED.summary,
canonical_markdown = EXCLUDED.canonical_markdown,
status = EXCLUDED.status,
cover_asset_id = EXCLUDED.cover_asset_id,
current_version_id = EXCLUDED.current_version_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
is_new = EXCLUDED.is_new,
is_active = EXCLUDED.is_active,
source_segments = EXCLUDED.source_segments,
markdown_compose_dirty = EXCLUDED.markdown_compose_dirty,
markdown_composed_at = EXCLUDED.markdown_composed_at,
reading_segments_json = EXCLUDED.reading_segments_json"""
)
insert_ch = f"""
INSERT INTO chapters (
id, user_id, book_id, title, category, order_index, summary,
canonical_markdown, status, cover_asset_id, current_version_id,
created_at, updated_at, is_new, is_active, source_segments,
markdown_compose_dirty, markdown_composed_at, reading_segments_json
) VALUES (
%(id)s, %(user_id)s, %(book_id)s, %(title)s, %(category)s, %(order_index)s,
%(summary)s, %(canonical_markdown)s, %(status)s, %(cover_asset_id)s,
%(current_version_id)s, %(created_at)s, %(updated_at)s, %(is_new)s,
%(is_active)s, %(source_segments)s, %(markdown_compose_dirty)s,
%(markdown_composed_at)s, %(reading_segments_json)s
) {ch_conflict}
"""
img_inserted = 0
chapter_payloads: list[dict[str, Any]] = []
for r in rows:
uid = r["user_id"]
bid = _book_id_for_user(target, uid)
ts = r.get("updated_at")
chapter_payloads.append(
{
"id": r["id"],
"user_id": uid,
"book_id": bid,
"title": r["title"],
"category": r.get("category"),
"order_index": r["order_index"],
"summary": None,
"canonical_markdown": r.get("content"),
"status": r.get("status") or "draft",
"cover_asset_id": None,
"current_version_id": None,
"created_at": ts,
"updated_at": ts,
"is_new": r.get("is_new") if r.get("is_new") is not None else True,
"is_active": r.get("is_active")
if r.get("is_active") is not None
else True,
"source_segments": Json(r["source_segments"])
if r.get("source_segments") is not None
else None,
"markdown_compose_dirty": False,
"markdown_composed_at": None,
"reading_segments_json": None,
}
)
with target.cursor() as cur:
cur.executemany(insert_ch, chapter_payloads)
if on_conflict == "skip":
img_conflict = "ON CONFLICT (id) DO NOTHING"
else:
img_conflict = """ON CONFLICT (id) DO UPDATE SET
chapter_id = EXCLUDED.chapter_id,
order_index = EXCLUDED.order_index,
url = EXCLUDED.url,
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at"""
insert_img = f"""
INSERT INTO memoir_images (
id, chapter_id, order_index, placeholder, description, status,
prompt, url, storage_key, provider, style, size, error,
retryable, created_at, updated_at
) VALUES (
%(id)s, %(chapter_id)s, %(order_index)s, %(placeholder)s,
%(description)s, %(status)s, %(prompt)s, %(url)s, %(storage_key)s,
%(provider)s, %(style)s, %(size)s, %(error)s, %(retryable)s,
%(created_at)s, %(updated_at)s
) {img_conflict}
"""
for r in rows:
urls = _parse_images_for_memoir(r.get("images"))
if not urls:
continue
for i, url in enumerate(urls):
mid = str(
uuid.uuid5(
uuid.NAMESPACE_URL,
f"memoir_image:{r['id']}:{i}:{url}",
)
)
cur.execute(
insert_img,
{
"id": mid,
"chapter_id": r["id"],
"order_index": i,
"placeholder": None,
"description": None,
"status": "completed",
"prompt": None,
"url": url,
"storage_key": None,
"provider": None,
"style": None,
"size": None,
"error": None,
"retryable": None,
"created_at": r.get("updated_at"),
"updated_at": r.get("updated_at"),
},
)
img_inserted += 1
return len(chapter_payloads), img_inserted
def main() -> None:
p = argparse.ArgumentParser(
description="Migrate legacy Life Echo DB into current schema."
)
p.add_argument(
"--legacy-url",
required=True,
help="PostgreSQL URL to DB restored from old pg_dump (old schema only).",
)
p.add_argument(
"--target-url",
required=True,
help="PostgreSQL URL to current DB (alembic upgrade head).",
)
p.add_argument(
"--on-conflict",
choices=("upsert", "skip"),
default="upsert",
help="upsert: overwrite same id; skip: keep existing rows",
)
p.add_argument(
"--dry-run",
action="store_true",
help="Connect and print row counts only; no writes.",
)
args = p.parse_args()
on_conflict: OnConflict = args.on_conflict # type: ignore[assignment]
legacy = _open(args.legacy_url)
target = _open(args.target_url)
try:
if args.dry_run:
with legacy.cursor(row_factory=dict_row) as cur:
for t in (
"users",
"books",
"chapters",
"conversations",
"segments",
"memoir_states",
"orders",
"refresh_tokens",
"sms_verification_codes",
):
cur.execute(f"SELECT COUNT(*) AS c FROM {t}")
c = cur.fetchone()["c"]
logger.info("legacy {} rows={}", t, c)
logger.info("dry-run done")
return
skip_users = _legacy_user_ids_skipped_for_phone(legacy, target)
if skip_users:
logger.info(
"skip {} legacy users due to phone already owned in target",
len(skip_users),
)
n_users = migrate_users(legacy, target, on_conflict, skip_users)
n_books = migrate_books(legacy, target, on_conflict, skip_users)
n_memoir = migrate_memoir_states(legacy, target, on_conflict, skip_users)
n_conv = migrate_conversations(legacy, target, on_conflict, skip_users)
n_seg = migrate_segments(legacy, target, on_conflict, skip_users)
n_ord = migrate_orders(legacy, target, on_conflict, skip_users)
n_rt = migrate_refresh_tokens(legacy, target, on_conflict, skip_users)
n_sms = migrate_sms(legacy, target, on_conflict)
n_ch, n_img = migrate_chapters_and_images(
legacy, target, on_conflict, skip_users
)
target.commit()
logger.info(
"migration committed: users={} books={} memoir_states={} "
"conversations={} segments={} orders={} refresh_tokens={} "
"sms={} chapters={} memoir_images={}",
n_users,
n_books,
n_memoir,
n_conv,
n_seg,
n_ord,
n_rt,
n_sms,
n_ch,
n_img,
)
except Exception:
target.rollback()
logger.exception("migration failed; target rolled back")
raise
finally:
legacy.close()
target.close()
if __name__ == "__main__":
main()