feat: 更新对话内容处理逻辑及新增用户回忆整理脚本

- 修改 get_narrative_prompt 函数,优化对话内容的叙述生成逻辑,确保新内容与已有内容自然衔接。
- 在 api/scripts 中新增 reprocess_user_memoir.py 脚本,用于整理用户历史对话为回忆录章节,支持远程预览和确认写入。
- 更新 .gitignore,添加对 certs/ 和 scripts/output/ 目录的忽略规则,确保不必要的文件不被跟踪。
- 在 memoir_tasks.py 中添加章节锁机制,防止并发写入同一章节,提升数据一致性和安全性。
This commit is contained in:
penghanyuan
2026-02-21 09:33:31 +01:00
parent d9e010ef70
commit 99a1660d03
5 changed files with 804 additions and 99 deletions

View File

@@ -27,6 +27,20 @@ from agents.prompts.memory_prompts import (
logger = logging.getLogger(__name__)
def _acquire_chapter_lock(user_id: str, stage: str, timeout: int = 120) -> bool:
"""获取章节分布式锁,防止并发写入同一章节"""
r = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))
lock_key = f"lock:chapter:{user_id}:{stage}"
return r.set(lock_key, "1", nx=True, ex=timeout)
def _release_chapter_lock(user_id: str, stage: str):
"""释放章节分布式锁"""
r = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))
lock_key = f"lock:chapter:{user_id}:{stage}"
r.delete(lock_key)
def _update_task_status_sync(user_id: str, task_id: str, status: str, result: Dict = None):
"""同步更新任务状态(在 Celery 任务中使用)"""
try:
@@ -208,96 +222,116 @@ def process_memoir_segments(self, user_id: str, segment_ids: List[str]):
# 生成章节内容
for stage, stage_segments in stage_to_segments.items():
segment_texts = [seg.transcript_text for seg in stage_segments]
combined_text = "\n\n".join(segment_texts)
source_ids = [seg.id for seg in stage_segments]
# 查找 active 章节(被清除的章节不继续更新,而是创建新的)
stmt_chapter = select(Chapter).where(
Chapter.user_id == user_id,
Chapter.category == stage,
Chapter.is_active == True,
)
result_chapter = db.execute(stmt_chapter)
chapter = result_chapter.scalar_one_or_none()
# 获取 slot snippets
slot_snippets = {
key: value.snippet
for key, value in (state.slots.get(stage, {}) or {}).items()
if value.snippet
}
# 生成标题和内容
title = chapter.title if chapter else f"{stage} 回忆"
existing_content = chapter.content if chapter else ""
narrative = combined_text
if llm:
try:
if not chapter:
title_prompt = get_creative_title_prompt(
stage=stage,
emotion="neutral",
slots=slot_snippets
if not _acquire_chapter_lock(user_id, stage):
logger.warning(f"章节锁竞争: user={user_id}, stage={stage}, 延迟重试")
raise self.retry(countdown=10)
try:
segment_texts = [seg.transcript_text for seg in stage_segments]
combined_text = "\n\n".join(segment_texts)
source_ids = [seg.id for seg in stage_segments]
# 查找 active 章节(被清除的章节不继续更新,而是创建新的)
stmt_chapter = select(Chapter).where(
Chapter.user_id == user_id,
Chapter.category == stage,
Chapter.is_active == True,
)
result_chapter = db.execute(stmt_chapter)
chapter = result_chapter.scalar_one_or_none()
# 获取 slot snippets
slot_snippets = {
key: value.snippet
for key, value in (state.slots.get(stage, {}) or {}).items()
if value.snippet
}
# 生成标题和内容
title = chapter.title if chapter else f"{stage} 回忆"
existing_content = chapter.content if chapter else ""
narrative = combined_text
if llm:
try:
if not chapter:
title_prompt = get_creative_title_prompt(
stage=stage,
emotion="neutral",
slots=slot_snippets
)
title_response = llm.invoke(title_prompt)
title = title_response.content.strip().strip('"')
narrative_prompt = get_narrative_prompt(
stage=stage,
slots=slot_snippets,
new_content=combined_text,
existing_content=existing_content,
)
title_response = llm.invoke(title_prompt)
title = title_response.content.strip().strip('"')
narrative_prompt = get_narrative_prompt(
stage=stage,
slots=slot_snippets,
new_content=combined_text,
existing_content=existing_content,
narrative_response = llm.invoke(narrative_prompt)
new_narrative = narrative_response.content.strip()
# 追加而非替换
if existing_content:
narrative = f"{existing_content}\n\n{new_narrative}"
else:
narrative = new_narrative
except Exception as e:
logger.warning(f"LLM 生成失败: {e}")
if existing_content:
narrative = f"{existing_content}\n\n{combined_text}"
# 安全检查:新内容不应比旧内容短
if existing_content and len(narrative) < len(existing_content) * 0.8:
logger.warning(
f"内容长度异常: existing={len(existing_content)}, "
f"new={len(narrative)}, stage={stage}. 回退为追加模式"
)
narrative_response = llm.invoke(narrative_prompt)
narrative = narrative_response.content.strip()
except Exception as e:
logger.warning(f"LLM 生成失败: {e}")
if existing_content:
narrative = f"{existing_content}\n\n{combined_text}"
# 更新或创建章节
if chapter:
chapter.content = narrative
chapter.title = title
chapter.is_new = True
chapter.source_segments = list({*(chapter.source_segments or []), *source_ids})
else:
# 根据 stage 计算正确的排序索引
calculated_order_index = STAGE_TO_ORDER.get(stage, 999)
chapter = Chapter(
id=str(uuid.uuid4()),
user_id=user_id,
title=title,
content=narrative,
order_index=calculated_order_index,
status="completed",
category=stage,
images=[],
is_new=True,
source_segments=source_ids,
)
db.add(chapter)
db.flush()
# 更新 Book
stmt_book = select(Book).where(Book.user_id == user_id).order_by(Book.updated_at.desc())
result_book = db.execute(stmt_book)
book = result_book.scalar_one_or_none()
if not book:
book = Book(
id=str(uuid.uuid4()),
user_id=user_id,
title="我的回忆录",
total_pages=0,
total_words=0,
cover_image_url=None,
)
db.add(book)
book.has_update = True
book.last_update_chapter_id = chapter.id
narrative = f"{existing_content}\n\n{combined_text}"
# 更新或创建章节
if chapter:
chapter.content = narrative
chapter.title = title
chapter.is_new = True
chapter.source_segments = list({*(chapter.source_segments or []), *source_ids})
else:
# 根据 stage 计算正确的排序索引
calculated_order_index = STAGE_TO_ORDER.get(stage, 999)
chapter = Chapter(
id=str(uuid.uuid4()),
user_id=user_id,
title=title,
content=narrative,
order_index=calculated_order_index,
status="completed",
category=stage,
images=[],
is_new=True,
source_segments=source_ids,
)
db.add(chapter)
db.flush()
# 更新 Book
stmt_book = select(Book).where(Book.user_id == user_id).order_by(Book.updated_at.desc())
result_book = db.execute(stmt_book)
book = result_book.scalar_one_or_none()
if not book:
book = Book(
id=str(uuid.uuid4()),
user_id=user_id,
title="我的回忆录",
total_pages=0,
total_words=0,
cover_image_url=None,
)
db.add(book)
book.has_update = True
book.last_update_chapter_id = chapter.id
finally:
_release_chapter_lock(user_id, stage)
# 标记段落为已处理
for seg in segments:
@@ -360,10 +394,23 @@ def generate_chapter_content(self, user_id: str, stage: str, new_content: str):
existing_content=existing_content,
)
response = llm.invoke(prompt)
narrative = response.content.strip()
new_narrative = response.content.strip()
# 追加而非替换
if existing_content:
narrative = f"{existing_content}\n\n{new_narrative}"
else:
narrative = new_narrative
else:
narrative = f"{existing_content}\n\n{new_content}" if existing_content else new_content
# 安全检查:新内容不应比旧内容短
if existing_content and len(narrative) < len(existing_content) * 0.8:
logger.warning(
f"内容长度异常: existing={len(existing_content)}, "
f"new={len(narrative)}, stage={stage}. 回退为追加模式"
)
narrative = f"{existing_content}\n\n{new_content}"
if chapter:
chapter.content = narrative
chapter.is_new = True