Files
life-echo/api/app/features/conversation/service.py
2026-03-19 14:36:40 +08:00

213 lines
7.9 KiB
Python

"""Conversation service — 对话编排(列表、创建、结束、删除、消息、整理)。"""
from app.core.logging import get_logger
import uuid
from datetime import datetime, timezone
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.features.conversation import repo
from app.features.conversation.models import Conversation, Segment
from app.features.quota.service import QuotaService
from app.tasks.memoir_tasks import process_memoir_segments
logger = get_logger(__name__)
def _datetime_to_timestamp_ms(value: datetime | None) -> int:
if value is None:
return int(datetime.now(timezone.utc).timestamp() * 1000)
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
return int(value.timestamp() * 1000)
def _message_timestamp_ms(msg: dict, fallback: datetime | None) -> int:
raw_timestamp = msg.get("timestamp")
if isinstance(raw_timestamp, (int, float)):
return int(raw_timestamp)
if isinstance(raw_timestamp, str):
try:
return int(
datetime.fromisoformat(raw_timestamp.replace("Z", "+00:00")).timestamp()
* 1000
)
except ValueError:
pass
return _datetime_to_timestamp_ms(fallback)
def _latest_message_time_ms(conversation: Conversation, history: list[dict]) -> int:
    """Pick the best available "last activity" time, in epoch milliseconds.

    Preference order: the conversation's ``last_message_at``, then the
    timestamp of the final entry in ``history``, then ``started_at``.
    """
    last_seen = conversation.last_message_at
    if not last_seen and history:
        # No recorded activity time: derive it from the newest history entry.
        return _message_timestamp_ms(history[-1], conversation.started_at)
    return _datetime_to_timestamp_ms(
        last_seen if last_seen else conversation.started_at
    )
def _build_messages_from_history(
conversation_id: str,
history: list[dict],
fallback_timestamp: datetime | None,
) -> list[dict]:
messages: list[dict] = []
seen_audio_sessions: set[str] = set()
for idx, msg in enumerate(history):
role = msg.get("role")
message_type = msg.get("messageType", "text")
voice_session_id = msg.get("voiceSessionId")
if role == "human" and message_type == "audio" and voice_session_id:
if voice_session_id in seen_audio_sessions:
continue
seen_audio_sessions.add(voice_session_id)
messages.append(
{
"id": f"{conversation_id}_msg_{idx}",
"conversationId": conversation_id,
"content": msg.get("content", ""),
"senderType": "user" if role == "human" else "assistant",
"timestamp": _message_timestamp_ms(msg, fallback_timestamp),
"messageType": message_type,
}
)
return messages
class ConversationService:
    """Conversation use cases: list, create, read, end, delete, messages, organize.

    Conversation rows are read and written through ``repo`` and the injected
    DB session; message history is read from / cleared in Redis via
    ``app.core.redis.redis_service``. Methods that take a ``user_id`` only
    operate on conversations owned by that user and raise a 404
    ``HTTPException`` otherwise.
    """

    def __init__(self, db: AsyncSession, quota_service: QuotaService):
        """Keep the DB session and quota service used by the operations below."""
        self._db = db
        self._quota = quota_service

    async def _get_history(self, conversation_id: str) -> list[dict]:
        """Return the conversation's message history as stored in Redis."""
        # Imported at call time, as in the original code.
        # NOTE(review): presumably to avoid an import cycle -- confirm.
        from app.core.redis import redis_service

        return await redis_service.get_conversation_history(conversation_id)

    async def _clear_history(self, conversation_id: str) -> None:
        """Best-effort removal of the Redis history; failures are logged, not raised."""
        try:
            from app.core.redis import redis_service

            await redis_service.clear_conversation_history(conversation_id)
        except Exception:
            # Deliberately non-fatal, but leave a trace instead of hiding it.
            logger.warning(
                "Failed to clear conversation history: conversation_id=%s",
                conversation_id,
                exc_info=True,
            )

    async def list_for_user(self, user_id: str) -> list[dict]:
        """Build the conversation list shown to ``user_id``.

        Each item carries ``id``, ``title`` (first 30 characters of the
        summary, or a default name), ``avatarUrl``, ``latestMessagePreview``
        (first 50 characters of the newest history entry, else the summary),
        ``latestMessageTime`` (epoch ms), ``unreadCount`` and
        ``isDefaultAssistant``. A history lookup failure degrades that item to
        summary-only data rather than failing the whole listing.
        """
        conversations = await repo.get_user_conversations(user_id, self._db)
        result = []
        for conv in conversations:
            history: list[dict] = []
            try:
                history = await self._get_history(conv.id)
            except Exception:
                logger.warning(
                    "Failed to load conversation history: conversation_id=%s",
                    conv.id,
                    exc_info=True,
                )
            # "content" may be present but None; slicing None would raise.
            latest_message = (
                (history[-1].get("content") or "")[:50] if history else None
            )
            result.append(
                {
                    "id": conv.id,
                    "title": (conv.summary or "")[:30] or "岁月知己",
                    "avatarUrl": None,
                    "latestMessagePreview": latest_message or conv.summary,
                    "latestMessageTime": _latest_message_time_ms(conv, history),
                    "unreadCount": 0,
                    "isDefaultAssistant": conv.summary is None,
                }
            )
        return result

    async def create(self, user_id: str) -> dict:
        """Create and persist a new active conversation for ``user_id``.

        Returns:
            A dict with ``id``, ``user_id``, ``started_at`` (ISO 8601) and
            ``status``.
        """
        conv = Conversation(
            id=str(uuid.uuid4()),
            user_id=user_id,
            started_at=datetime.now(timezone.utc),
            status="active",
        )
        repo.add_conversation(conv, self._db)
        await self._db.commit()
        await self._db.refresh(conv)
        return {
            "id": conv.id,
            "user_id": conv.user_id,
            "started_at": conv.started_at.isoformat(),
            "status": conv.status,
        }

    async def get_or_404(self, conversation_id: str, user_id: str) -> Conversation:
        """Load a conversation owned by ``user_id``.

        Raises:
            HTTPException: 404 when the conversation does not exist or belongs
                to a different user (both cases are reported identically).
        """
        conv = await repo.get_conversation(conversation_id, self._db)
        if not conv or conv.user_id != user_id:
            raise HTTPException(status_code=404, detail="Conversation not found")
        return conv

    async def get_one(self, conversation_id: str, user_id: str) -> dict:
        """Return the detail view of a single conversation.

        Raises:
            HTTPException: 404, see ``get_or_404``.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        return {
            "id": conv.id,
            "user_id": conv.user_id,
            # Guarded like ended_at: other code in this module also allows
            # started_at to be missing.
            "started_at": conv.started_at.isoformat() if conv.started_at else None,
            "ended_at": conv.ended_at.isoformat() if conv.ended_at else None,
            "duration_seconds": conv.duration_seconds,
            "summary": conv.summary,
            "status": conv.status,
            "current_topic": conv.current_topic,
            "conversation_stage": conv.conversation_stage,
        }

    async def end(self, conversation_id: str, user_id: str) -> dict:
        """Mark a conversation as ended and record its duration.

        Returns:
            A dict with ``id``, ``status``, ``ended_at`` (ISO 8601) and
            ``duration_seconds``.

        Raises:
            HTTPException: 404, see ``get_or_404``.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        ended_at = datetime.now(timezone.utc)
        conv.status = "ended"
        conv.ended_at = ended_at

        started_at = conv.started_at
        if started_at is not None:
            if started_at.tzinfo is None:
                # A naive value cannot be subtracted from the aware
                # ``ended_at`` (TypeError); read it as UTC, as the timestamp
                # helpers in this module do.
                started_at = started_at.replace(tzinfo=timezone.utc)
            conv.duration_seconds = int((ended_at - started_at).total_seconds())

        # Build the response before committing: if the session expires
        # attributes on commit, reading them afterwards would need implicit
        # I/O, which AsyncSession does not allow.
        payload = {
            "id": conv.id,
            "status": conv.status,
            "ended_at": ended_at.isoformat(),
            "duration_seconds": conv.duration_seconds,
        }
        await self._db.commit()
        return payload

    async def delete(self, conversation_id: str, user_id: str) -> None:
        """Delete a conversation row and then its Redis history.

        Raises:
            HTTPException: 404, see ``get_or_404``.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        await self._db.delete(conv)
        await self._db.commit()
        # Clear the history only after the delete is committed, so a failed
        # commit cannot leave a surviving conversation with its messages gone.
        await self._clear_history(conversation_id)

    async def get_messages(self, conversation_id: str, user_id: str) -> list[dict]:
        """Return the conversation's messages in client format.

        Any failure while loading or converting the history is logged and
        reported as an empty list.

        Raises:
            HTTPException: 404, see ``get_or_404``.
        """
        conv = await self.get_or_404(conversation_id, user_id)
        try:
            history = await self._get_history(conversation_id)
            return _build_messages_from_history(
                conversation_id=conversation_id,
                history=history or [],
                fallback_timestamp=conv.started_at,
            )
        except Exception:
            logger.warning(
                "Failed to build messages: conversation_id=%s",
                conversation_id,
                exc_info=True,
            )
            return []

    async def organize(
        self, conversation_id: str, user_id: str, subscription_type: str
    ) -> dict:
        """Submit the conversation's segments for background memoir processing.

        Returns:
            A dict with a confirmation ``message``, the ``conversation_id`` and
            ``segments_count``.

        Raises:
            HTTPException: 404 (see ``get_or_404``); 400 when there are no
                segments to organize; 403 when the quota service refuses the
                submission (its message becomes the error detail).
        """
        conv = await self.get_or_404(conversation_id, user_id)
        segments = await repo.get_segments_for_organize(conversation_id, self._db)
        if not segments:
            raise HTTPException(status_code=400, detail="该对话没有可整理的内容")

        can_submit, quota_message = await self._quota.check_can_submit_organize(
            user_id, subscription_type
        )
        if not can_submit:
            raise HTTPException(status_code=403, detail=quota_message)

        segment_ids = [s.id for s in segments]
        # Hand the work to the task queue; this call does not wait for it.
        process_memoir_segments.delay(conv.user_id, segment_ids)
        logger.info(
            "手动触发对话整理: conversation_id=%s, segments=%s",
            conversation_id,
            len(segment_ids),
        )
        return {
            "message": "对话整理任务已提交",
            "conversation_id": conversation_id,
            "segments_count": len(segment_ids),
        }