Files
life-echo/api/app/adapters/storage/tencent_cos.py
Kevin 309a051038 feat: 回忆录证据血缘与内部评测可追溯,顺带对齐本地评测台与 CI
数据库与模型:新增多版迁移(章节证据快照、对话血缘、记忆事实/时间线 lineage 等),把「成稿 ↔ 对话/记忆」的溯源信息落到表结构里。
业务链路:会话与 WS、回忆录/故事流水线、记忆写入与 enrichment 等跟着接上线索与快照;新增章节证据快照与评测侧 EvalTraceService 等模块,方便组评审用的证据包。
内部评测:自动化 run 与手工 memoir 评审共用可追溯证据;rubric/ judge 相关脚本与文档有配套调整。
app-eval-web:Memoir/实验详情里能展开看证据摘要与 evidence_trace(含对话轮次 id);Vite 代理与 development.sh 注入的 API 端口与当前默认内部评测端口一致,避免改端口后页面连错服务。
工程杂项:GitHub Actions / 仓库说明有更新;各适配器与支付/配额/plan 等多处为小改动或跟随主改动的收尾;新增/扩充了?
2026-04-08 15:37:09 +08:00

55 lines
1.5 KiB
Python

"""Tencent COS adapter — implements ObjectStorage port."""
from qcloud_cos import CosConfig, CosS3Client
from qcloud_cos.cos_exception import CosClientError, CosServiceError
from app.core.logging import get_logger
logger = get_logger(__name__)
class TencentCosStorage:
def __init__(
self,
secret_id: str,
secret_key: str,
region: str,
bucket: str,
base_url: str = "",
token: str = "",
):
self._bucket = bucket
self._base_url = (
base_url or f"https://{bucket}.cos.{region}.myqcloud.com"
).rstrip("/")
config = CosConfig(
Region=region,
SecretId=secret_id,
SecretKey=secret_key,
Token=token,
Scheme="https",
)
self._client = CosS3Client(config)
def upload(self, key: str, data: bytes, content_type: str) -> str:
self._client.put_object(
Bucket=self._bucket,
Body=data,
Key=key,
ContentType=content_type,
)
return f"{self._base_url}/{key}"
def get_url(self, key: str, expires: int = 3600) -> str:
return self._client.get_presigned_download_url(
Bucket=self._bucket,
Key=key,
Expired=expires,
)
def delete(self, key: str) -> None:
try:
self._client.delete_object(Bucket=self._bucket, Key=key)
except (CosClientError, CosServiceError) as e:
logger.warning("COS delete failed: key={}, error={}", key, e)