"""Tencent COS adapter — implements ObjectStorage port.""" from qcloud_cos import CosConfig, CosS3Client from qcloud_cos.cos_exception import CosClientError, CosServiceError from app.core.logging import get_logger logger = get_logger(__name__) class TencentCosStorage: def __init__( self, secret_id: str, secret_key: str, region: str, bucket: str, base_url: str = "", token: str = "", ): self._bucket = bucket self._base_url = ( base_url or f"https://{bucket}.cos.{region}.myqcloud.com" ).rstrip("/") config = CosConfig( Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme="https", ) self._client = CosS3Client(config) def upload(self, key: str, data: bytes, content_type: str) -> str: self._client.put_object( Bucket=self._bucket, Body=data, Key=key, ContentType=content_type, ) return f"{self._base_url}/{key}" def get_url(self, key: str, expires: int = 3600) -> str: return self._client.get_presigned_download_url( Bucket=self._bucket, Key=key, Expired=expires, ) def delete(self, key: str) -> None: try: self._client.delete_object(Bucket=self._bucket, Key=key) except (CosClientError, CosServiceError) as e: logger.warning("COS delete failed: key={}, error={}", key, e)