Files
life-echo/api/scripts/run_chapter_sections_migration.py

158 lines
5.9 KiB
Python
Raw Normal View History

"""
一键执行 chapter_sections 迁移先执行 SQL 建表/加列再回填数据并删列
依赖.env DATABASE_URL以及 psycopgpython-dotenv
用法 api 目录下:
python -m scripts.run_chapter_sections_migration
"""
import json
import logging
import os
import sys
import uuid
from pathlib import Path
# 仅加载 .env不导入 database避免 asyncpg 等依赖)
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from dotenv import load_dotenv
load_dotenv()
import psycopg
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)
def get_engine() -> Engine:
url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/life_echo")
return create_engine(url.replace("postgresql://", "postgresql+psycopg://"), pool_pre_ping=True)
def run_sql_migration(engine: Engine):
sql_path = Path(__file__).parent.parent / "migrations" / "add_chapter_sections.sql"
sql = sql_path.read_text(encoding="utf-8")
# 按 DO $$ ... $$; 与普通 ; 拆分,避免把 PL/pgSQL 块拆碎
stmts = []
rest = sql
while rest:
rest = rest.lstrip()
if rest.startswith("--"):
rest = rest[rest.find("\n") + 1:] if "\n" in rest else ""
continue
if rest.upper().startswith("DO "):
# 找到 $$; 或 $$ ;
i = rest.find("$$")
if i == -1:
break
j = rest.find("$$", i + 2)
if j == -1:
break
stmts.append(rest[: j + 2].strip() + ";")
rest = rest[j + 2:].lstrip().lstrip(";").lstrip()
continue
idx = rest.find(";")
if idx == -1:
break
part = rest[: idx].strip()
rest = rest[idx + 1:]
if part and not part.startswith("--"):
stmts.append(part + ";")
with engine.begin() as conn:
for i, s in enumerate(stmts):
try:
conn.execute(text(s))
logger.info(" SQL %s OK", i + 1)
except Exception as e:
if "already exists" in str(e).lower():
logger.info(" SQL %s (已存在)", i + 1)
continue
raise
logger.info("1/2 SQL 迁移完成")
def run_data_migration(engine: Engine):
from services.memoir_images.parser import split_narrative_to_sections
with engine.connect() as conn:
r = conn.execute(text("""
SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'chapters' AND column_name = 'content'
"""))
if r.fetchone() is None:
logger.info("chapters.content 已不存在,跳过数据迁移")
return
rows = conn.execute(text("""
SELECT id, content, images FROM chapters WHERE content IS NOT NULL AND trim(content) != ''
""")).fetchall()
for row in rows:
ch_id, content, images_raw = row[0], row[1], row[2]
if isinstance(images_raw, str):
try:
images = json.loads(images_raw)
except Exception:
images = []
else:
images = images_raw if isinstance(images_raw, list) else []
sections = split_narrative_to_sections(content or "")
if not sections:
section_id = str(uuid.uuid4()).replace("-", "")[:32]
conn.execute(text("""
INSERT INTO chapter_sections (id, chapter_id, order_index, content, image, updated_at)
VALUES (:id, :ch_id, 0, :content, NULL, NOW())
"""), {"id": section_id, "ch_id": ch_id, "content": (content or "").strip()})
conn.commit()
logger.info("章节 %s: 1 条 section无图", ch_id)
continue
first_cover = None
img_index = 0
for order_idx, seg in enumerate(sections):
section_id = str(uuid.uuid4()).replace("-", "")[:32]
seg_content = seg.get("content") or ""
ph = seg.get("placeholder_info")
image_json = None
if ph is not None and img_index < len(images):
image_json = json.dumps(images[img_index]) if isinstance(images[img_index], dict) else None
if first_cover is None and image_json:
first_cover = image_json
img_index += 1
conn.execute(text("""
INSERT INTO chapter_sections (id, chapter_id, order_index, content, image, updated_at)
VALUES (:id, :ch_id, :ord, :content, CAST(:img AS jsonb), NOW())
"""), {
"id": section_id,
"ch_id": ch_id,
"ord": order_idx,
"content": seg_content,
"img": image_json,
})
if first_cover:
conn.execute(
text("UPDATE chapters SET cover_image = CAST(:img AS jsonb) WHERE id = :id"),
{"img": first_cover, "id": ch_id},
)
conn.commit()
logger.info("章节 %s: %d 条 sections", ch_id, len(sections))
conn.execute(text("ALTER TABLE chapters DROP COLUMN IF EXISTS content"))
conn.execute(text("ALTER TABLE chapters DROP COLUMN IF EXISTS images"))
conn.commit()
logger.info("已删除 chapters.content 与 chapters.images")
logger.info("2/2 数据迁移完成")
if __name__ == "__main__":
logger.info("开始 chapter_sections 迁移…")
engine = get_engine()
run_sql_migration(engine)
run_data_migration(engine)
logger.info("迁移全部完成")