53 lines
1.5 KiB
Python
53 lines
1.5 KiB
Python
"""Tencent COS adapter — implements ObjectStorage port."""
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
from qcloud_cos import CosConfig, CosS3Client
|
|
from qcloud_cos.cos_exception import CosClientError, CosServiceError
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class TencentCosStorage:
|
|
def __init__(
|
|
self,
|
|
secret_id: str,
|
|
secret_key: str,
|
|
region: str,
|
|
bucket: str,
|
|
base_url: str = "",
|
|
token: str = "",
|
|
):
|
|
self._bucket = bucket
|
|
self._base_url = (base_url or f"https://{bucket}.cos.{region}.myqcloud.com").rstrip("/")
|
|
config = CosConfig(
|
|
Region=region,
|
|
SecretId=secret_id,
|
|
SecretKey=secret_key,
|
|
Token=token,
|
|
Scheme="https",
|
|
)
|
|
self._client = CosS3Client(config)
|
|
|
|
def upload(self, key: str, data: bytes, content_type: str) -> str:
|
|
self._client.put_object(
|
|
Bucket=self._bucket,
|
|
Body=data,
|
|
Key=key,
|
|
ContentType=content_type,
|
|
)
|
|
return f"{self._base_url}/{key}"
|
|
|
|
def get_url(self, key: str, expires: int = 3600) -> str:
|
|
return self._client.get_presigned_download_url(
|
|
Bucket=self._bucket,
|
|
Key=key,
|
|
Expired=expires,
|
|
)
|
|
|
|
def delete(self, key: str) -> None:
|
|
try:
|
|
self._client.delete_object(Bucket=self._bucket, Key=key)
|
|
except (CosClientError, CosServiceError) as e:
|
|
logger.warning("COS delete failed: key=%s, error=%s", key, e)
|