Files
life-echo/api/app/core/task_tracker.py
Kevin a3f61fcc0f feat(api+app): 对话阶段化、回忆录流水线与客户端会话体验
- DB: segments 用户输入文本(Alembic 0002)
- Chat: 阶段检测/阶段提示/回复限制,编排与访谈/画像 prompts 调整
- Memoir: 忠实度检查 agent,叙事与分类等链路更新
- Core: agent 日志、Alembic 启动、LangChain/日志/配置等
- Story: time_hints;Memory 检索与相关测试
- Expo: 助手头像、会话页与消息拆分、实时会话与文案/i18n
- Docs/scripts/tests: 迁移脚本、LLM JSON/记忆检索文档、新增单测
2026-03-26 12:13:36 +08:00

118 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
任务追踪服务:追踪 Celery 任务状态(从 services 迁入 core
"""
import json
from app.core.logging import get_logger
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from app.core.redis import redis_service
# Module-level logger obtained from the project logging helper for this module's name.
logger = get_logger(__name__)
class TaskTracker:
    """Task tracker that stores per-user task state in a Redis hash.

    Every user owns a single hash at ``task:user:<user_id>:tasks``. The hash
    fields are task ids and the values are JSON-encoded task records.

    All Redis-facing methods are best-effort: any exception is logged and
    reported through the return value (``False`` / ``[]``) instead of being
    raised to the caller.
    """

    KEY_PREFIX = "task:user:"
    # TTL, in seconds, applied to a user's task hash whenever it is written.
    TASK_TTL = 3600
    # Statuses that get their own counter in ``check_tasks_status``.
    _COUNTED_STATUSES = ("pending", "running", "success", "failure")
    # Statuses that mean a task has not finished yet.
    _ACTIVE_STATUSES = ("pending", "running")

    def _tasks_key(self, user_id: str) -> str:
        """Return the Redis key of the hash holding *user_id*'s tasks."""
        return f"{self.KEY_PREFIX}{user_id}:tasks"

    async def add_task(
        self, user_id: str, task_id: str, task_type: str = "memoir"
    ) -> bool:
        """Record a new task for a user with status ``"pending"``.

        Args:
            user_id: Owner of the task; selects the Redis hash.
            task_id: Hash field under which the record is stored.
            task_type: Free-form task category stored in the record.

        Returns:
            ``True`` if the record was written, ``False`` if anything failed.
        """
        try:
            client = await redis_service.get_client()
            key = self._tasks_key(user_id)
            task_info = {
                "task_id": task_id,
                "task_type": task_type,
                "status": "pending",
                "created_at": datetime.now(timezone.utc).isoformat(),
            }
            await client.hset(key, task_id, json.dumps(task_info))
            # The TTL is set on the whole hash, so each add extends the
            # lifetime of every task recorded for this user.
            await client.expire(key, self.TASK_TTL)
            logger.debug("任务已记录: user_id={}, task_id={}", user_id, task_id)
            return True
        except Exception as e:
            logger.error("记录任务失败: {}", e)
            return False

    async def update_task_status(
        self, user_id: str, task_id: str, status: str, result: Any = None
    ) -> bool:
        """Set the status (and optionally the result) of a task.

        If no record exists for *task_id*, a minimal one containing only the
        task id is created before the update is applied.

        Args:
            user_id: Owner of the task; selects the Redis hash.
            task_id: Hash field of the record to update.
            status: New status value to store.
            result: Optional JSON-serializable payload; stored under
                ``"result"`` only when it is not ``None``.

        Returns:
            ``True`` if the record was written, ``False`` if anything failed
            (including a *result* that cannot be JSON-encoded).
        """
        try:
            client = await redis_service.get_client()
            key = self._tasks_key(user_id)
            data = await client.hget(key, task_id)
            task_info = json.loads(data) if data else {"task_id": task_id}
            task_info["status"] = status
            task_info["updated_at"] = datetime.now(timezone.utc).isoformat()
            if result is not None:
                task_info["result"] = result
            await client.hset(key, task_id, json.dumps(task_info))
            # HSET recreates the hash without any TTL when the key is missing
            # or has already expired. Refresh the TTL here so a late status
            # update cannot leave a record in Redis that never expires.
            await client.expire(key, self.TASK_TTL)
            return True
        except Exception as e:
            logger.error("更新任务状态失败: {}", e)
            return False

    async def get_user_tasks(self, user_id: str) -> List[Dict]:
        """Return every task record stored for a user.

        Returns:
            The decoded task records, or ``[]`` when the user has no tasks or
            when reading/decoding fails.
        """
        try:
            client = await redis_service.get_client()
            tasks_data = await client.hgetall(self._tasks_key(user_id))
            return [json.loads(data) for data in tasks_data.values()]
        except Exception as e:
            logger.error("获取用户任务失败: {}", e)
            return []

    async def get_pending_tasks(self, user_id: str) -> List[Dict]:
        """Return the user's tasks whose status is ``pending`` or ``running``."""
        tasks = await self.get_user_tasks(user_id)
        return [t for t in tasks if t.get("status") in self._ACTIVE_STATUSES]

    async def check_tasks_status(self, user_id: str) -> Dict:
        """Summarize a user's tasks by status.

        Returns:
            A dict with ``total`` (number of records), one counter for each of
            ``pending``, ``running``, ``success`` and ``failure``, and the
            boolean ``all_completed``, which is true when there is at least
            one task and none is pending or running. Records without a status
            are counted as pending; records with any other status only
            contribute to ``total``.
        """
        tasks = await self.get_user_tasks(user_id)
        status_counts: Dict[str, Any] = {
            "total": len(tasks),
            "pending": 0,
            "running": 0,
            "success": 0,
            "failure": 0,
        }
        for task in tasks:
            status = task.get("status", "pending")
            # Compare against the fixed status tuple rather than the summary
            # dict: the dict also holds "total", and a tuple lookup does not
            # require the stored status to be hashable.
            if status in self._COUNTED_STATUSES:
                status_counts[status] += 1
        status_counts["all_completed"] = (
            status_counts["total"] > 0
            and status_counts["pending"] == 0
            and status_counts["running"] == 0
        )
        return status_counts

    async def clear_user_tasks(self, user_id: str) -> bool:
        """Delete the whole task hash of a user.

        Returns:
            ``True`` if the delete command was issued, ``False`` on failure.
        """
        try:
            client = await redis_service.get_client()
            await client.delete(self._tasks_key(user_id))
            return True
        except Exception as e:
            logger.error("清除用户任务失败: {}", e)
            return False

    async def remove_task(self, user_id: str, task_id: str) -> bool:
        """Remove a single task record from a user's task hash.

        Returns:
            ``True`` if the delete command was issued, ``False`` on failure.
        """
        try:
            client = await redis_service.get_client()
            await client.hdel(self._tasks_key(user_id), task_id)
            return True
        except Exception as e:
            logger.error("移除任务失败: {}", e)
            return False
# Module-level TaskTracker instance, created when this module is imported.
task_tracker = TaskTracker()