44 lines
1.3 KiB
Python
44 lines
1.3 KiB
Python
|
|
"""从 URL 解析当前环境腾讯云 COS object key(仅当 host 与配置一致时)。"""
|
|||
|
|
|
|||
|
|
from urllib.parse import urlparse
|
|||
|
|
|
|||
|
|
from app.core.config import settings
|
|||
|
|
|
|||
|
|
|
|||
|
|
def extract_cos_object_key_if_owned(url: str | None) -> str | None:
|
|||
|
|
"""
|
|||
|
|
若 url 指向 settings 中配置的 COS 域名,返回去掉前导 / 的 object key。
|
|||
|
|
非 http(s)、或 host 不匹配时返回 None。
|
|||
|
|
"""
|
|||
|
|
if not url:
|
|||
|
|
return None
|
|||
|
|
s = str(url).strip()
|
|||
|
|
if not s.startswith(("http://", "https://")):
|
|||
|
|
return None
|
|||
|
|
parsed = urlparse(s)
|
|||
|
|
host = (parsed.netloc or "").lower()
|
|||
|
|
if not host:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
candidates: list[str] = []
|
|||
|
|
bucket = (settings.tencent_cos_bucket or "").strip().lower()
|
|||
|
|
region = (settings.tencent_cos_region or "").strip().lower()
|
|||
|
|
if bucket and region:
|
|||
|
|
candidates.append(f"{bucket}.cos.{region}.myqcloud.com")
|
|||
|
|
base = (settings.tencent_cos_base_url or "").strip()
|
|||
|
|
if base:
|
|||
|
|
base_parsed = urlparse(base if "://" in base else f"https://{base}")
|
|||
|
|
bh = (base_parsed.netloc or "").lower()
|
|||
|
|
if bh:
|
|||
|
|
candidates.append(bh)
|
|||
|
|
|
|||
|
|
if not candidates:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
matched = any(host == c for c in candidates if c)
|
|||
|
|
if not matched:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
key = (parsed.path or "").lstrip("/")
|
|||
|
|
return key or None
|