Files
life-echo/api/app/features/conversation/service.py
Kevin 786ebf8ae6 refactor(api,expo): 多智能体与会话收敛、回忆录兼容层移除、后端测试集大幅删减
- 对齐「多智能体收敛」与「回忆录 stories-first / markdown-first」方向:收紧运行时契约、
  删除过渡兼容路径与双轨逻辑,并同步更新客户端与文档。

- Chat:以 ChatOrchestrator 为实时编排入口;删除独立 conversation_agent,精简 prompts。
- Memoir:删除 memory_agent;MemoirOrchestrator、classification / story_route 与 prompts 收敛到
  prepare_batches + run_story_pipeline_for_category_batch 主链路。
- 将 agents 侧 processor 迁入 feature 层为 background_runner,并移除 features 下重复/过时
  processor 封装。

- 新增 history_store,强化「conversation_messages 为 DB 真源、Redis 为缓存」模型。
- 调整 models、repo、service、session_history;精简 WS message_types,重构 pipeline 与 router。

- 移除章节占位、整章再生等旧路径;章节列表与封面逻辑要求 story 关联;收紧 cover 资格与
  enqueue。
- helpers、repo、service、router、reading_segment_materialize、story_pipeline_sync、pdf_service
  等按 canonical markdown / cover_asset_id 收缩;删除 memoir_images/provider 等冗余。
- tasks:memoir_tasks、chapter_cover_tasks 等大幅瘦身;story_image_tasks 等与当前图片任务对齐。

- core:config、logging、redis、task_tracker 小幅调整。
- auth / user / payment / quota:路由或服务侧删减过时接口或逻辑(如 payment router 行数减少)。

- pyproject.toml、development.sh、.env.example / .env.production、README 等同步说明或变量。

- Alembic 0001_initial_schema 微调(与当前 schema 叙事一致的小改动)。

- 回忆录:types / mappers / api、章节页与 memoir 页与后端契约对齐;markdown-renderer 调整。
- 语音:删除 voice/player,voice-segment-store 相应精简。

- api/tests:删除 conftest 及绝大部分既有测试文件(websocket_baseline、conversation、memoir
  图片、PDF、SMS 等),属有意收缩/待按 backend-test-system 重建的信号。
- docs:新增多智能体收敛与移除兼容层计划摘要;更新 story-first 设计、backend-test-system、
  multi-agent-refactor-plan、实施总结等。

BREAKING CHANGE: 后端对外契约、回忆录章节字段与若干路由/任务行为已变更;大量 API 测试被移除,
  CI 若依赖这些用例需按新策略补测或调整流水线。
2026-03-22 18:10:28 +08:00

283 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Conversation service — 对话编排(列表、创建、结束、删除、消息、整理)。"""
import uuid
from datetime import datetime, timezone
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.cos_url_keys import (
collect_cos_keys_from_conversation_history,
collect_cos_keys_from_tts_url_list,
extract_cos_object_key_if_owned,
)
from app.core.logging import get_logger
from app.core.redis import redis_service
from app.core.storage_purge import delete_object_storage_keys_best_effort
from app.features.conversation import repo
from app.features.conversation.models import Conversation
from app.features.conversation.session_history import (
conversation_messages_to_redis_history,
)
from app.features.memory import repo as memory_repo
from app.features.quota.service import QuotaService
from app.ports.storage import ObjectStorage
from app.tasks.memoir_tasks import process_memoir_segments
# Module-level logger, named after this module via the project's logging helper.
logger = get_logger(__name__)
def _datetime_to_timestamp_ms(value: datetime | None) -> int:
if value is None:
return int(datetime.now(timezone.utc).timestamp() * 1000)
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
return int(value.timestamp() * 1000)
def _message_timestamp_ms(msg: dict, fallback: datetime | None) -> int:
raw_timestamp = msg.get("timestamp")
if isinstance(raw_timestamp, (int, float)):
return int(raw_timestamp)
if isinstance(raw_timestamp, str):
try:
return int(
datetime.fromisoformat(raw_timestamp.replace("Z", "+00:00")).timestamp()
* 1000
)
except ValueError:
pass
return _datetime_to_timestamp_ms(fallback)
def _latest_message_time_ms(conversation: Conversation, history: list[dict]) -> int:
    """Pick the most recent activity time of a conversation, in epoch milliseconds.

    Precedence: the conversation's ``last_message_at``; otherwise the timestamp
    of the last history entry (with ``started_at`` as that entry's fallback);
    otherwise ``started_at`` itself.
    """
    reference = conversation.last_message_at
    if not reference:
        if history:
            return _message_timestamp_ms(history[-1], conversation.started_at)
        reference = conversation.started_at
    return _datetime_to_timestamp_ms(reference)
def _build_messages_from_history(
conversation_id: str,
history: list[dict],
fallback_timestamp: datetime | None,
) -> list[dict]:
"""Convert history entries into message dicts for the messages endpoint.

Each output item carries ``id``, ``conversationId``, ``content``,
``senderType``, ``timestamp`` and ``messageType``; ``voiceSessionId``,
``durationSeconds`` and ``ttsAudioUrls`` are added only when present.

Args:
    conversation_id: Used for ``conversationId`` and as the prefix of each ``id``.
    history: History entries (dicts with ``role``, ``content``, ... keys).
    fallback_timestamp: Used when an entry has no usable ``timestamp``.

Returns:
    The message dicts, in the same order as ``history``.
"""
messages: list[dict] = []
# Voice session ids already emitted, used to drop repeated human audio entries.
seen_audio_sessions: set[str] = set()
for idx, msg in enumerate(history):
role = msg.get("role")
message_type = msg.get("messageType", "text")
voice_session_id = msg.get("voiceSessionId")
# Human audio entries sharing a voiceSessionId: only the first one is emitted.
if role == "human" and message_type == "audio" and voice_session_id:
if voice_session_id in seen_audio_sessions:
continue
seen_audio_sessions.add(voice_session_id)
item: dict = {
# idx is the position in `history`, so skipped duplicates leave gaps in the ids.
"id": f"{conversation_id}_msg_{idx}",
"conversationId": conversation_id,
"content": msg.get("content", ""),
# Every role other than "human" is reported as "assistant".
"senderType": "user" if role == "human" else "assistant",
"timestamp": _message_timestamp_ms(msg, fallback_timestamp),
"messageType": message_type,
}
if voice_session_id and role == "human":
item["voiceSessionId"] = voice_session_id
# Only positive numeric durations are kept, truncated to whole seconds.
# NOTE(review): confirm whether this applies to every entry or only to
# human voice entries (i.e. whether it is nested under the branch above).
ds = msg.get("durationSeconds")
if isinstance(ds, (int, float)) and ds > 0:
item["durationSeconds"] = int(ds)
if role == "ai":
tts = msg.get("ttsAudioUrls")
# Non-string elements are dropped from the TTS URL list.
if isinstance(tts, list) and tts:
item["ttsAudioUrls"] = [x for x in tts if isinstance(x, str)]
messages.append(item)
return messages
class ConversationService:
    """Conversation use cases: list, create, end, soft-delete, read messages, organize.

    Message history is read from Redis first and rebuilt from the database
    ``conversation_messages`` rows when the cache is empty or unreadable.
    Cache failures are treated as best-effort: they are logged and never
    propagate to the caller.
    """

    def __init__(
        self,
        db: AsyncSession,
        quota_service: QuotaService,
        *,
        object_storage: ObjectStorage | None = None,
    ):
        """Bind the service to a DB session, a quota service and optional object storage."""
        self._db = db
        self._quota = quota_service
        self._object_storage = object_storage

    async def _clear_history(self, conversation_id: str) -> None:
        """Drop the cached history of a conversation; failures are logged, not raised."""
        try:
            await redis_service.clear_conversation_history(conversation_id)
        except Exception as exc:
            logger.warning("conversation history cache clear skipped: %s", exc)

    async def ensure_redis_history_from_db(self, conversation_id: str) -> list[dict]:
        """Return the conversation history, preferring Redis over the database.

        Used by the WS layer and ``get_messages``. When the cache is empty (or
        cannot be read) the history is rebuilt from the DB
        ``conversation_messages`` rows and written back to Redis.

        Returns:
            The history entries, or an empty list when neither source has any.
        """
        try:
            history = await redis_service.get_conversation_history(conversation_id)
        except Exception as exc:
            logger.warning("conversation history cache read skipped: %s", exc)
            history = []
        if history:
            return history

        rows = await repo.get_conversation_messages(conversation_id, self._db)
        if not rows:
            return []

        rebuilt = conversation_messages_to_redis_history(rows)
        try:
            await redis_service.set_conversation_history(conversation_id, rebuilt)
        except Exception as exc:
            # A failed write-back only costs a rebuild on the next read.
            logger.warning("conversation history cache write skipped: %s", exc)
        return rebuilt

    async def list_for_user(self, user_id: str) -> list[dict]:
        """List the user's conversations as summary dicts for the conversation list.

        A conversation whose history cannot be loaded is still listed, with
        the preview and time derived from the conversation row alone.
        """
        conversations = await repo.get_user_conversations(user_id, self._db)
        result: list[dict] = []
        for conv in conversations:
            history: list[dict] = []
            try:
                history = await self.ensure_redis_history_from_db(conv.id)
            except Exception as exc:
                logger.warning(
                    "conversation history load skipped: conversation_id=%s, error=%s",
                    conv.id,
                    exc,
                )

            # "content" may be present but None; only slice real strings.
            last_content = history[-1].get("content") if history else None
            latest_message = (
                last_content[:50] if isinstance(last_content, str) else None
            )
            result.append(
                {
                    "id": conv.id,
                    "title": (conv.summary or "")[:30] or "岁月知己",
                    "avatarUrl": None,
                    "latestMessagePreview": latest_message or conv.summary,
                    "latestMessageTime": _latest_message_time_ms(conv, history),
                    "unreadCount": 0,
                    "isDefaultAssistant": conv.summary is None,
                }
            )
        return result

    async def create(self, user_id: str) -> dict:
        """Create and persist a new active conversation for the user."""
        conv = Conversation(
            id=str(uuid.uuid4()),
            user_id=user_id,
            started_at=datetime.now(timezone.utc),
            status="active",
        )
        repo.add_conversation(conv, self._db)
        await self._db.commit()
        await self._db.refresh(conv)
        return {
            "id": conv.id,
            "user_id": conv.user_id,
            "started_at": conv.started_at.isoformat(),
            "status": conv.status,
        }

    async def get_or_404(self, conversation_id: str, user_id: str) -> Conversation:
        """Load a conversation owned by the user.

        Raises:
            HTTPException: 404 when the conversation does not exist, belongs to
                another user, or has been soft-deleted.
        """
        conv = await repo.get_conversation(conversation_id, self._db)
        if not conv or conv.user_id != user_id or conv.deleted_at is not None:
            raise HTTPException(status_code=404, detail="Conversation not found")
        return conv

    async def get_one(self, conversation_id: str, user_id: str) -> dict:
        """Return the detail dict of one conversation (404 if not accessible)."""
        conv = await self.get_or_404(conversation_id, user_id)
        return {
            "id": conv.id,
            "user_id": conv.user_id,
            "started_at": conv.started_at.isoformat(),
            "ended_at": conv.ended_at.isoformat() if conv.ended_at else None,
            "duration_seconds": conv.duration_seconds,
            "summary": conv.summary,
            "status": conv.status,
            "current_topic": conv.current_topic,
            "conversation_stage": conv.conversation_stage,
        }

    async def end(self, conversation_id: str, user_id: str) -> dict:
        """Mark a conversation as ended and record its duration in seconds."""
        conv = await self.get_or_404(conversation_id, user_id)
        ended_at = datetime.now(timezone.utc)
        conv.status = "ended"
        conv.ended_at = ended_at

        started_at = conv.started_at
        if started_at:
            if started_at.tzinfo is None:
                # Subtracting a naive datetime from an aware one raises
                # TypeError; naive values are treated as UTC, as elsewhere
                # in this module.
                started_at = started_at.replace(tzinfo=timezone.utc)
            conv.duration_seconds = int((ended_at - started_at).total_seconds())

        await self._db.commit()
        return {
            "id": conv.id,
            "status": conv.status,
            "ended_at": conv.ended_at.isoformat(),
            "duration_seconds": conv.duration_seconds,
        }

    async def delete(self, conversation_id: str, user_id: str) -> None:
        """Soft-delete a conversation and purge its stored objects (best effort).

        Object keys are collected from memory records, the cached history and
        the conversation's segments before the cache is cleared; the purge
        runs only after the soft-delete has been committed.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        cos_keys: set[str] = set(
            await memory_repo.list_storage_keys_for_conversation(
                self._db, conversation_id
            )
        )

        try:
            hist = await redis_service.get_conversation_history(conversation_id)
            cos_keys |= collect_cos_keys_from_conversation_history(hist)
        except Exception as exc:
            # Keys that only the cache knew about are not purged in this case.
            logger.warning(
                "conversation history cache read skipped during delete: %s", exc
            )

        segments = await repo.get_segments_for_conversation(conversation_id, self._db)
        for seg in segments:
            k = extract_cos_object_key_if_owned(seg.audio_url)
            if k:
                cos_keys.add(k)
            raw_tts = getattr(seg, "tts_audio_urls", None)
            if isinstance(raw_tts, list):
                cos_keys |= collect_cos_keys_from_tts_url_list(
                    [str(x) for x in raw_tts if isinstance(x, str)]
                )

        await self._clear_history(conversation_id)
        conv.deleted_at = datetime.now(timezone.utc)
        await self._db.commit()

        # NOTE(review): called without await, as in the original; confirm this
        # helper is synchronous.
        delete_object_storage_keys_best_effort(
            self._object_storage,
            sorted(cos_keys),
            log_prefix=f"conversation_soft_delete id={conversation_id}",
        )

    async def get_messages(self, conversation_id: str, user_id: str) -> list[dict]:
        """Return the conversation's messages for the client.

        Returns an empty list (and logs a warning) when the history cannot be
        loaded or converted; an inaccessible conversation still raises 404.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        try:
            history = await self.ensure_redis_history_from_db(conversation_id)
            return _build_messages_from_history(
                conversation_id=conversation_id,
                history=history,
                fallback_timestamp=conv.started_at,
            )
        except Exception as exc:
            logger.warning(
                "conversation messages unavailable: conversation_id=%s, error=%s",
                conversation_id,
                exc,
            )
            return []

    async def organize(
        self, conversation_id: str, user_id: str, subscription_type: str
    ) -> dict:
        """Submit the conversation's segments to the memoir processing task.

        Raises:
            HTTPException: 404 when the conversation is not accessible, 400
                when it has no segments to organize, 403 when the quota
                service rejects the submission.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        segments = await repo.get_segments_for_organize(conversation_id, self._db)
        if not segments:
            raise HTTPException(status_code=400, detail="该对话没有可整理的内容")

        can_submit, quota_message = await self._quota.check_can_submit_organize(
            user_id, subscription_type
        )
        if not can_submit:
            raise HTTPException(status_code=403, detail=quota_message)

        segment_ids = [s.id for s in segments]
        process_memoir_segments.delay(conv.user_id, segment_ids)
        logger.info(
            "手动触发对话整理: conversation_id=%s, segments=%s",
            conversation_id,
            len(segment_ids),
        )
        return {
            "message": "对话整理任务已提交",
            "conversation_id": conversation_id,
            "segments_count": len(segment_ids),
        }