#!/usr/bin/env python3 """回填 chapters.evidence_bundle_json(Phase 九:历史 chapter 无快照时可批量物化)。 用法(需在已执行 Alembic 0009+ 的库上):: cd api && uv run python scripts/backfill_chapter_evidence_snapshots.py cd api && uv run python scripts/backfill_chapter_evidence_snapshots.py --user-id """ from __future__ import annotations import argparse from sqlalchemy import select from app.features.auth import models as _auth_models # noqa: F401 from app.features.conversation import models as _conv_models # noqa: F401 from app.features.memory import models as _memory_models # noqa: F401 from app.features.memoir import models as _memoir_models # noqa: F401 from app.features.payment import models as _payment_models # noqa: F401 from app.features.story import models as _story_models # noqa: F401 from app.features.user import models as _user_models # noqa: F401 from app.core.db import SessionLocal from app.features.memoir.chapter_evidence_snapshot import ( refresh_chapter_evidence_snapshot_sync, ) from app.features.memoir.models import Chapter def main() -> None: p = argparse.ArgumentParser() p.add_argument("--user-id", default="", help="仅刷新该用户的章节;默认全表") p.add_argument("--limit", type=int, default=0, help="最多处理条数,0 表示不限制") args = p.parse_args() uid = (args.user_id or "").strip() session = SessionLocal() n_ok = 0 try: stmt = select(Chapter.id) if uid: stmt = stmt.where(Chapter.user_id == uid) if args.limit > 0: stmt = stmt.limit(args.limit) ids = list(session.execute(stmt).scalars().all()) for cid in ids: if refresh_chapter_evidence_snapshot_sync(session, str(cid)): n_ok += 1 session.commit() print(f"refreshed_snapshots={n_ok} chapter_rows={len(ids)}") finally: session.close() if __name__ == "__main__": main()