Merge origin/development into main

Keep origin/main env files (api/.env.staging, app-expo/.env.*); take development branch code changes only.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Kevin
2026-05-19 15:47:21 +08:00
62 changed files with 3473 additions and 607 deletions

View File

@@ -14,6 +14,9 @@ logger = get_logger(__name__)
# 客户端再读 TTS / 拉取音频:预签名有效期(秒),与移动端会话长度匹配
TTS_PRESIGNED_EXPIRES_SEC = 86_400
# 用户头像 API 下发(与 TTS 一致)
AVATAR_PRESIGNED_EXPIRES_SEC = TTS_PRESIGNED_EXPIRES_SEC
def extract_cos_object_key_if_owned(url: str | None) -> str | None:
"""
@@ -119,3 +122,50 @@ def presign_tts_urls_for_playback(
else:
out.append(s)
return out
def avatar_url_for_api_response(stored_url: str | None) -> str | None:
"""DB 中的头像 URL本环境 COS 直链改为预签名下载 URL。"""
if stored_url is None:
return None
s = str(stored_url).strip()
if not s:
return None
key = extract_cos_object_key_if_owned(s)
if not key:
return s
if not (
(settings.tencent_cos_secret_id or "").strip()
and (settings.tencent_cos_bucket or "").strip()
):
return s
from app.core.dependencies import get_object_storage
storage = get_object_storage()
try:
return storage.get_url(key, expires=AVATAR_PRESIGNED_EXPIRES_SEC)
except Exception as exc:
logger.warning(
"presign avatar url failed, keeping original: key={} err={}",
key,
exc,
)
return s
def best_effort_delete_cos_object_for_url(url: str | None) -> None:
"""本环境 COS 对象则尽力 delete换头像 / 预设时清理)。"""
key = extract_cos_object_key_if_owned(url)
if not key:
return
if not (
(settings.tencent_cos_secret_id or "").strip()
and (settings.tencent_cos_bucket or "").strip()
):
return
from app.core.dependencies import get_object_storage
try:
get_object_storage().delete(key)
except Exception as exc:
logger.warning("delete cos avatar object failed: key={} err={}", key, exc)

View File

@@ -1,4 +1,5 @@
import io
import time
from pathlib import Path
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
@@ -6,6 +7,11 @@ from fastapi.responses import FileResponse
from PIL import Image
from app.core.config import settings
from app.core.cos_url_keys import (
avatar_url_for_api_response,
best_effort_delete_cos_object_for_url,
extract_cos_object_key_if_owned,
)
from app.core.dependencies import get_current_user
from app.core.logging import get_logger
from app.features.auth.deps import get_auth_service
@@ -45,7 +51,6 @@ router = APIRouter(
)
AVATAR_DIR = Path("uploads/avatars")
AVATAR_DIR.mkdir(parents=True, exist_ok=True)
# ── helpers ──────────────────────────────────────────────────
@@ -74,7 +79,7 @@ def _user_response(user: User) -> UserResponse:
phone=user.phone,
email=user.email,
nickname=user.nickname,
avatar_url=user.avatar_url,
avatar_url=avatar_url_for_api_response(user.avatar_url),
subscription_type=user.subscription_type,
created_at=user.created_at.isoformat(),
language_preference=lang,
@@ -277,9 +282,17 @@ async def upload_avatar(
len(file_content),
)
try:
AVATAR_DIR.mkdir(parents=True, exist_ok=True)
if not (
(settings.tencent_cos_secret_id or "").strip()
and (settings.tencent_cos_secret_key or "").strip()
and (settings.tencent_cos_bucket or "").strip()
):
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="头像存储服务未配置,请稍后再试",
)
try:
image_bytes = io.BytesIO(file_content)
image_bytes.seek(0)
@@ -325,13 +338,28 @@ async def upload_avatar(
if size > 512:
image = image.resize((512, 512), Image.Resampling.LANCZOS)
file_extension = "jpg"
filename = f"{current_user.id}.{file_extension}"
file_path = AVATAR_DIR / filename
jpeg_buffer = io.BytesIO()
image.save(jpeg_buffer, format="JPEG", quality=85, optimize=True)
jpeg_bytes = jpeg_buffer.getvalue()
image.save(file_path, "JPEG", quality=85, optimize=True)
cos_key = f"avatars/{current_user.id}.jpg"
old_url = current_user.avatar_url
old_key = extract_cos_object_key_if_owned(old_url) if old_url else None
if old_key and old_key != cos_key:
best_effort_delete_cos_object_for_url(old_url)
from app.core.dependencies import get_object_storage
storage = get_object_storage()
try:
avatar_url = storage.upload(cos_key, jpeg_bytes, "image/jpeg")
except Exception as exc:
logger.exception("COS 头像上传失败: {}", exc)
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="头像存储暂时不可用,请稍后再试",
) from exc
avatar_url = f"/api/auth/avatars/{filename}"
user = await service.update_avatar_url(current_user.id, avatar_url)
return _user_response(user)
except HTTPException:
@@ -379,7 +407,9 @@ async def set_avatar_preset(
status_code=status.HTTP_400_BAD_REQUEST,
detail="预设头像不可用",
)
avatar_url = avatar_url_for_preset_filename(filename)
best_effort_delete_cos_object_for_url(current_user.avatar_url)
avatar_url = f"{avatar_url_for_preset_filename(filename)}?v={time.time_ns()}"
try:
user = await service.update_avatar_url(current_user.id, avatar_url)
except AuthError as e:
@@ -408,6 +438,7 @@ async def get_avatar_preset(filename: str):
responses={404: {"description": "头像不存在"}},
)
async def get_avatar(filename: str):
AVATAR_DIR.mkdir(parents=True, exist_ok=True)
file_path = safe_avatar_upload_path(filename, AVATAR_DIR)
if file_path is None or not file_path.exists():
raise HTTPException(

View File

@@ -51,6 +51,10 @@ async def clear_user_demographics(db: AsyncSession, user_id: str) -> None:
)
async def clear_user_avatar_url(db: AsyncSession, user_id: str) -> None:
await db.execute(update(User).where(User.id == user_id).values(avatar_url=None))
async def collect_purge_context(
db: AsyncSession, user_id: str
) -> tuple[list[str], list[str], list[str]]:
@@ -130,6 +134,13 @@ async def collect_object_storage_keys_before_purge(
tts_urls if isinstance(tts_urls, list) else None
)
r_av = await db.execute(select(User.avatar_url).where(User.id == user_id))
avatar_url_val = r_av.scalar_one_or_none()
if avatar_url_val:
ak = extract_cos_object_key_if_owned(avatar_url_val)
if ak:
keys.add(ak)
return sorted(keys)

View File

@@ -3,6 +3,7 @@ import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from app.core.config import settings
from app.core.cos_url_keys import avatar_url_for_api_response
from app.core.dependencies import get_current_user, get_object_storage
from app.core.logging import get_logger
from app.features.user.deps import get_user_service
@@ -50,7 +51,7 @@ async def get_user_profile(
phone=current_user.phone,
email=current_user.email,
nickname=current_user.nickname,
avatar_url=current_user.avatar_url,
avatar_url=avatar_url_for_api_response(current_user.avatar_url),
subscription_type=current_user.subscription_type,
created_at=current_user.created_at.isoformat(),
birth_year=current_user.birth_year,

View File

@@ -2,6 +2,7 @@ from datetime import timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.cos_url_keys import avatar_url_for_api_response
from app.core.db import utc_now
from app.core.logging import get_logger
from app.core.redis import redis_service
@@ -32,7 +33,7 @@ def _user_to_profile(user: User) -> UserProfileResponse:
phone=user.phone,
email=user.email,
nickname=user.nickname,
avatar_url=user.avatar_url,
avatar_url=avatar_url_for_api_response(user.avatar_url),
subscription_type=user.subscription_type,
created_at=user.created_at.isoformat(),
birth_year=user.birth_year,
@@ -121,6 +122,7 @@ class UserService:
await repo.purge_user_related_rows(self._db, user_id)
await repo.clear_user_demographics(self._db, user_id)
await repo.clear_user_avatar_url(self._db, user_id)
await self._db.commit()
logger.info("用户数据 DB 行已删除、档案字段已清空并提交 user_id={}", user_id)

View File

@@ -0,0 +1,584 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
软著申报用:源码整理生成 PDFreportlab
默认扫描整个 monorepo 根目录(本脚本位于 api/scripts/,向上两级即为仓库根),
会包含后端 api、Expo 客户端 app-expo、评测 Web app-eval-web 等;也可用 --root 只扫某一子项目。
使用说明(简要):
1) 安装依赖:在 api 目录执行 uv sync本仓库已含 reportlab
2) 中文字体:可设置 CJK_MONO_FONT_PATH默认可用 STSong-LightAdobe CID。苹方/冬青等常为 CFF
ReportLab 无法直接嵌入。使用 STSong-Light 时ASCII含空格、英文代码用内置 Courier 分段绘制,
中文用 STSong避免整行用 CID 时拉丁字距错位、挤在一起。
需要退回纯 TrueType 单字体Menlo 等)时用 --no-cid。
3) 修改下方「配置区」常量:软件全称、版本号、必要时改 SOURCE_ROOT / 输出路径、字体路径、后缀与跳过目录
4) 运行(全仓默认,仅 .py / .ts(x) / .js(x) / .vue 等源码cd api && uv run python scripts/copyright_source_pdf.py
仅后端示例:... --root ../api
注意:
- 空行会从统计与输出中剔除;行首行尾以外的空白(缩进)保留。
- 总页数按 ceil(非空行总数/50) 估算;>60 页则取前 1500 行 + 后 1500 行,最终 60 页、页码 160。
- 不足 60 页时输出全部非空行;最后一页若不足 50 行,用空行补齐到 50 行。
"""
from __future__ import annotations
import argparse
import math
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import cm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.cidfonts import UnicodeCIDFont
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfgen import canvas
# 本文件位于 <repo>/api/scripts/,仓库根为其上两级目录
REPO_ROOT = Path(__file__).resolve().parents[2]
# =========================
# 配置区(按项目修改;也可用命令行覆盖部分项)
# =========================
SOFTWARE_FULL_NAME = "岁月留书"
VERSION = "V1.0.0"
# 默认整仓源码根api + 各前端/工具子项目);仅申报名某一模块时可改为子路径或 CLI --root
SOURCE_ROOT = REPO_ROOT
OUTPUT_PDF = REPO_ROOT / "copyright_source_listing.pdf"
# PDF 页眉/材料中出现的文件路径:将本机目录前缀替换为申报用单位根(如 上海华嘎科技有限公司/life-echo/...
FILE_PATH_ABSOLUTE_PREFIX = Path("/Users/kevin/Codes/hgtk")
FILE_PATH_DISPLAY_ROOT = "上海华嘎科技有限公司"
# 设为 None 时自动探测macOSMenlo.ttc / Songti.ttc 等);也可填本机 TTF/OTF 或 TTC 路径
CJK_MONO_FONT_PATH: Path | None = None
# STSong-LightCID与拉丁混排时 ReportLab 常把 ASCII 字距算错ASCII 段单独用内置 Courier。
ASCII_LATIN_FONT = "Courier"
@dataclass(frozen=True)
class RenderFonts:
"""primary中文等非 ASCIIlatin_font 非空时 U+0000U+007F 用 Courier。"""
primary: str
latin_font: str | None = None
def _segment_latin_cjk(line: str) -> list[tuple[str, str]]:
if not line:
return []
out: list[tuple[str, str]] = []
buf: list[str] = []
is_lat = ord(line[0]) < 128
for ch in line:
lat = ord(ch) < 128
if lat != is_lat:
out.append(("lat" if is_lat else "cjk", "".join(buf)))
buf = [ch]
is_lat = lat
else:
buf.append(ch)
out.append(("lat" if is_lat else "cjk", "".join(buf)))
return out
def _mixed_line_width(line: str, fonts: RenderFonts, font_size: float) -> float:
if fonts.latin_font is None:
return pdfmetrics.stringWidth(line, fonts.primary, font_size)
w = 0.0
for kind, chunk in _segment_latin_cjk(line):
fn = fonts.latin_font if kind == "lat" else fonts.primary
w += pdfmetrics.stringWidth(chunk, fn, font_size)
return w
def _draw_line(
c: canvas.Canvas,
x: float,
y: float,
line: str,
fonts: RenderFonts,
font_size: float,
) -> None:
if fonts.latin_font is None:
c.setFont(fonts.primary, font_size)
c.drawString(x, y, line)
return
xpos = x
for kind, chunk in _segment_latin_cjk(line):
if not chunk:
continue
fn = fonts.latin_font if kind == "lat" else fonts.primary
c.setFont(fn, font_size)
c.drawString(xpos, y, chunk)
xpos += pdfmetrics.stringWidth(chunk, fn, font_size)
# 仅收录「可执行/可编译」源码:本仓为 Python + TS/JS 栈;排除 .md、.json、.yaml 等配置与文档。
# 其它语言可自行向集合内追加后缀。
INCLUDE_SUFFIXES: set[str] = {
".py",
".pyi",
".ts",
".tsx",
".mts",
".cts",
".js",
".jsx",
".mjs",
".cjs",
".vue",
}
# 需要跳过的目录名(只比对路径每一段 name
SKIP_DIR_NAMES: set[str] = {
".git",
".svn",
".hg",
"node_modules",
"venv",
".venv",
"env",
".env",
".idea",
".vscode",
"__pycache__",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".next",
".nuxt",
".output",
".turbo",
".parcel-cache",
".expo",
"dist",
"build",
"target",
"out",
"coverage",
"htmlcov",
"storybook-static",
"Pods",
".gradle",
"DerivedData",
}
# 是否在文件边界插入标记行
ADD_FILE_MARKERS = True
FILE_MARKER_PREFIX = "// ===== "
LINES_PER_PAGE = 50
MAX_PAGES = 60
# 苹方在多数 macOS 上为 .ttc 内嵌 CFF 轮廓ReportLab TTFont 无法载入;仍会先尝试以下路径(以防特例)。
_PINGFANG_CANDIDATES: tuple[Path, ...] = (
Path("/System/Library/Fonts/PingFang.ttc"),
Path(
"/System/Library/Fonts/Hiragino Sans GB.ttc"
), # 冬青黑体,常与苹方同源 CFF多半失败
Path("/Library/Fonts/PingFang.ttc"),
)
def _try_register_stsong_cid() -> bool:
try:
pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light"))
return True
except Exception:
return False
def _register_stsong_cid_with_notice(*, pingfang_failed: bool) -> str:
if not _try_register_stsong_cid():
return ""
if pingfang_failed:
print(
"已载入 PDF 简体中文STSong-LightAdobe Unicode CID"
"说明:系统「苹方/冬青」等常为 OpenType-CFFReportLab 无法写入该轮廓;"
"已改用内置宋体轮廓以保证中文可见(非苹方字形,软著材料通常可接受)。",
file=sys.stderr,
)
else:
print(
"已载入 PDF 简体中文STSong-LightAdobe Unicode CID",
file=sys.stderr,
)
return "STSong-Light"
# macOSTrueType 轮廓 .ttcMenlo、部分宋体/黑体);苹方多为 CFF见上方说明。
_MACOS_TTC_CANDIDATES: tuple[tuple[Path, int], ...] = (
(Path("/System/Library/Fonts/Menlo.ttc"), 0),
(Path("/System/Library/Fonts/Supplemental/Songti.ttc"), 0),
(Path("/System/Library/Fonts/STHeiti Light.ttc"), 0),
(Path("/System/Library/Fonts/STHeiti Medium.ttc"), 0),
)
def _try_register_ttf_or_ttc(path: Path, font_name: str = "CodeCJK") -> bool:
"""将字体注册为 font_name.ttc 递增 subfontIndex 直至成功或无更多子字体。"""
if not path.is_file():
return False
if path.suffix.lower() == ".ttc":
idx = 0
while idx < 64:
try:
pdfmetrics.registerFont(TTFont(font_name, str(path), subfontIndex=idx))
return True
except Exception as e:
msg = str(e).lower()
if "bad subfontindex" in msg:
break
if "subfontindex" in msg and "not in" in msg:
break
idx += 1
return False
try:
pdfmetrics.registerFont(TTFont(font_name, str(path)))
return True
except Exception:
return False
def register_font(user_path: Path | None, *, no_cid: bool = False) -> RenderFonts:
"""选择正文字体STSong CID 时同时返回 Courier 供拉丁混排。"""
font_tt = "CodeCJK"
if user_path is not None and user_path.is_file():
if _try_register_ttf_or_ttc(user_path, font_tt):
print(f"已载入 PDF 字体:{user_path}", file=sys.stderr)
return RenderFonts(primary=font_tt)
pingfang_failed = False
if sys.platform == "darwin":
for p in _PINGFANG_CANDIDATES:
if _try_register_ttf_or_ttc(p, font_tt):
print(f"已载入 PDF 字体(苹方/平方相关):{p}", file=sys.stderr)
return RenderFonts(primary=font_tt)
pingfang_failed = any(p.is_file() for p in _PINGFANG_CANDIDATES)
if not no_cid:
cid_name = _register_stsong_cid_with_notice(pingfang_failed=pingfang_failed)
if cid_name:
return RenderFonts(primary=cid_name, latin_font=ASCII_LATIN_FONT)
if sys.platform == "darwin":
for ttc_path, sub in _MACOS_TTC_CANDIDATES:
if not ttc_path.is_file():
continue
try:
pdfmetrics.registerFont(
TTFont(font_tt, str(ttc_path), subfontIndex=sub)
)
label = "系统等宽" if "Menlo" in ttc_path.name else "系统字体"
print(
f"已载入 PDF 字体macOS TrueType{ttc_path}{label}",
file=sys.stderr,
)
return RenderFonts(primary=font_tt)
except Exception:
continue
for p in (
Path("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"),
Path("/usr/share/fonts/truetype/noto/NotoSansMonoCJK-Regular.ttf"),
):
if _try_register_ttf_or_ttc(p, font_tt):
print(f"已载入 PDF 字体Linux 常见路径):{p}", file=sys.stderr)
return RenderFonts(primary=font_tt)
if no_cid:
print(
"警告:已指定 --no-cid 且未找到可用 TrueType 轮廓字体,将使用 Courier。",
file=sys.stderr,
)
else:
print(
"警告:未找到可用的中文矢量字体,将使用 Courier中文可能无法显示",
file=sys.stderr,
)
return RenderFonts(primary="Courier")
@dataclass(frozen=True)
class ExtractResult:
lines: list[str]
pages: int
capped: bool
original_non_empty_lines: int
original_pages_ceil: int
def should_skip_dir(name: str) -> bool:
return name in SKIP_DIR_NAMES
def iter_source_files(root: Path) -> Iterable[Path]:
for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
dirnames[:] = [d for d in dirnames if not should_skip_dir(d)]
for fn in filenames:
p = Path(dirpath) / fn
if p.suffix.lower() in INCLUDE_SUFFIXES:
yield p
def path_for_pdf_listing(fp: Path) -> str:
"""将绝对路径转为 PDF 展示路径:.../hgtk/foo -> 上海华嘎科技有限公司/foo。"""
try:
resolved = fp.resolve()
base = FILE_PATH_ABSOLUTE_PREFIX.expanduser().resolve()
rel = resolved.relative_to(base)
return f"{FILE_PATH_DISPLAY_ROOT}/{rel.as_posix()}"
except ValueError:
return fp.as_posix()
def read_nonempty_lines_from_files(files: list[Path]) -> list[str]:
out: list[str] = []
for fp in files:
if ADD_FILE_MARKERS:
out.append(f"{FILE_MARKER_PREFIX}{path_for_pdf_listing(fp)} =====")
try:
text = fp.read_text(encoding="utf-8", errors="replace")
except OSError:
text = fp.read_text(encoding="latin-1", errors="replace")
for raw in text.splitlines():
line = raw.rstrip("\n")
if line.strip() == "":
continue
out.append(line)
return out
def extract_for_pdf(
lines: list[str],
*,
lines_per_page: int = LINES_PER_PAGE,
max_pages: int = MAX_PAGES,
) -> ExtractResult:
original_non_empty_lines = len(lines)
original_pages_ceil = max(1, math.ceil(original_non_empty_lines / lines_per_page))
capped = original_pages_ceil > max_pages
if capped:
first_n = lines_per_page * (max_pages // 2)
last_n = lines_per_page * (max_pages // 2)
selected = lines[:first_n] + lines[-last_n:]
else:
selected = list(lines)
pages = max(1, math.ceil(len(selected) / lines_per_page))
return ExtractResult(
lines=selected,
pages=pages,
capped=capped,
original_non_empty_lines=original_non_empty_lines,
original_pages_ceil=original_pages_ceil,
)
def pad_to_multiple(lines: list[str], lines_per_page: int) -> list[str]:
if not lines:
return [""] * lines_per_page
remainder = len(lines) % lines_per_page
if remainder == 0:
return lines
return lines + [""] * (lines_per_page - remainder)
def fit_line_to_width(
line: str, fonts: RenderFonts, font_size: float, max_width: float
) -> str:
if _mixed_line_width(line, fonts, font_size) <= max_width:
return line
lo, hi = 0, len(line)
while lo < hi:
mid = (lo + hi + 1) // 2
if _mixed_line_width(line[:mid], fonts, font_size) <= max_width:
lo = mid
else:
hi = mid - 1
ell = ""
ell_w = _mixed_line_width(ell, fonts, font_size)
while lo > 0 and _mixed_line_width(line[:lo], fonts, font_size) + ell_w > max_width:
lo -= 1
if lo <= 0:
return ell
return line[:lo] + ell
def draw_pdf(
out_path: Path,
all_lines: list[str],
*,
fonts: RenderFonts,
lines_per_page: int,
software_name: str,
version: str,
font_size: float = 8.5,
) -> None:
page_w, page_h = A4
left = 2 * cm
right = 2 * cm
top = 2 * cm
bottom = 2 * cm
sep_gap = 0.25 * cm
c = canvas.Canvas(str(out_path), pagesize=A4)
c.setTitle(f"{software_name} {version} — 源代码节选")
c.setAuthor(software_name)
header_baseline_y = page_h - top - 0.35 * cm
sep_y = header_baseline_y - 0.35 * cm
content_top = sep_y - sep_gap
content_bottom = bottom + 0.5 * cm
if lines_per_page <= 1:
line_step = max(content_top - content_bottom, 1.0)
else:
line_step = (content_top - content_bottom) / (lines_per_page - 1)
max_text_width = page_w - left - right
truncated_count = 0
n = len(all_lines)
if n % lines_per_page != 0:
raise ValueError("all_lines 长度必须是 lines_per_page 的整数倍(请先 pad")
total_pages = n // lines_per_page
for page_no in range(1, total_pages + 1):
left_text = f"{software_name} {version}"
_draw_line(c, left, header_baseline_y, left_text, fonts, font_size)
page_label = str(page_no)
page_font = fonts.latin_font or fonts.primary
pw = pdfmetrics.stringWidth(page_label, page_font, font_size)
c.setFont(page_font, font_size)
c.drawString(page_w - right - pw, header_baseline_y, page_label)
c.setLineWidth(0.3)
c.line(left, sep_y, page_w - right, sep_y)
start = (page_no - 1) * lines_per_page
page_slice = all_lines[start : start + lines_per_page]
for i, raw in enumerate(page_slice):
y = content_top - i * line_step
fitted = fit_line_to_width(raw, fonts, font_size, max_text_width)
if fitted != raw and raw:
truncated_count += 1
_draw_line(c, left, y, fitted, fonts, font_size)
c.showPage()
if truncated_count:
print(
f"提示:共有 {truncated_count} 行因超宽被截断(可减小字号或换更窄字体)。",
file=sys.stderr,
)
c.save()
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description="将源码目录整理为软著申报用 PDFreportlab",
)
p.add_argument(
"--root",
type=Path,
default=None,
help="源码根目录(默认使用脚本内 SOURCE_ROOT",
)
p.add_argument(
"--out",
type=Path,
default=None,
help="输出 PDF 路径(默认使用脚本内 OUTPUT_PDF",
)
p.add_argument(
"--font",
type=Path,
default=None,
help="CJK 等宽字体 TTF/OTF默认使用脚本内 CJK_MONO_FONT_PATH",
)
p.add_argument(
"--name",
default=None,
help="软件全称(默认脚本内 SOFTWARE_FULL_NAME",
)
p.add_argument(
"--version",
dest="version_str",
default=None,
help="版本号(默认脚本内 VERSION",
)
p.add_argument(
"--font-size",
type=float,
default=8.3,
help="正文字号(略小可减少超长行截断,默认 8.3",
)
p.add_argument(
"--no-cid",
action="store_true",
help="不使用 Adobe STSong-LightCID仅尝试 TrueType 轮廓Menlo 等),中文更易缺失",
)
return p.parse_args()
def main() -> int:
args = parse_args()
root = args.root or SOURCE_ROOT
out_pdf = args.out or OUTPUT_PDF
font_path = args.font
if font_path is None:
font_path = CJK_MONO_FONT_PATH
name = args.name or SOFTWARE_FULL_NAME
ver = args.version_str or VERSION
if not root.is_dir():
print(f"错误:源码根目录不存在或不是目录:{root}", file=sys.stderr)
return 1
files = sorted(iter_source_files(root), key=lambda p: p.as_posix())
lines = read_nonempty_lines_from_files(files)
result = extract_for_pdf(lines)
padded = pad_to_multiple(result.lines, LINES_PER_PAGE)
fonts = register_font(font_path, no_cid=args.no_cid)
out_pdf.parent.mkdir(parents=True, exist_ok=True)
draw_pdf(
out_pdf,
padded,
fonts=fonts,
lines_per_page=LINES_PER_PAGE,
software_name=name,
version=ver,
font_size=args.font_size,
)
out_pages = len(padded) // LINES_PER_PAGE
print(
"完成:",
f"源非空行 {result.original_non_empty_lines}(按 50 行/页约 {result.original_pages_ceil} 页)",
f"节选后 {len(result.lines)}",
f"输出 {out_pdf}{out_pages}",
f" capped={result.capped}",
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -4,12 +4,14 @@ from __future__ import annotations
import uuid
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from PIL import Image
from app.core.dependencies import get_current_user
from app.features.auth.deps import get_auth_service
@@ -27,6 +29,7 @@ def _mock_current_user() -> User:
u.avatar_url = None
u.subscription_type = "free"
u.created_at = datetime.now(timezone.utc)
u.language_preference = "zh"
return u
@@ -91,7 +94,6 @@ async def test_get_avatar_preset_path_traversal(preset_auth_app: FastAPI) -> Non
@pytest.mark.asyncio
async def test_set_avatar_preset_ok(preset_auth_app: FastAPI) -> None:
uid = preset_auth_app.state._fixed_user.id
transport = ASGITransport(app=preset_auth_app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
r = await ac.put(
@@ -101,11 +103,13 @@ async def test_set_avatar_preset_ok(preset_auth_app: FastAPI) -> None:
)
assert r.status_code == 200
body = r.json()
assert body["avatar_url"] == "/api/auth/avatar-presets/02.png"
url = body["avatar_url"]
assert url.startswith("/api/auth/avatar-presets/02.png")
assert "?v=" in url
svc: MagicMock = preset_auth_app.state._mock_auth_service
svc.update_avatar_url.assert_awaited_once_with(
uid, "/api/auth/avatar-presets/02.png"
)
stored = svc.update_avatar_url.await_args[0][1]
assert stored.startswith("/api/auth/avatar-presets/02.png")
assert "?v=" in stored
@pytest.mark.asyncio
@@ -168,3 +172,71 @@ async def test_get_uploaded_avatar_ok_when_stem_has_underscore(
async with AsyncClient(transport=transport, base_url="http://test") as ac:
r = await ac.get("/api/auth/avatars/user_abc_01.jpg")
assert r.status_code == 200
def _minimal_jpeg_bytes() -> bytes:
img = Image.new("RGB", (2, 2), color=(120, 80, 200))
buf = BytesIO()
img.save(buf, format="JPEG", quality=85)
return buf.getvalue()
@pytest.mark.asyncio
async def test_upload_avatar_cos_calls_storage_and_presigns(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
import app.core.dependencies as deps
from app.core.config import settings
from app.core.dependencies import get_current_user
from app.features.auth.router import router as auth_router
bucket, region = "life-test-bucket", "ap-shanghai"
uid = str(uuid.uuid4())
public = f"https://{bucket}.cos.{region}.myqcloud.com/avatars/{uid}.jpg"
for attr, val in (
("tencent_cos_secret_id", "sid"),
("tencent_cos_secret_key", "sk"),
("tencent_cos_bucket", bucket),
("tencent_cos_region", region),
("tencent_cos_base_url", f"https://{bucket}.cos.{region}.myqcloud.com"),
):
monkeypatch.setattr(settings, attr, val, raising=False)
mock_storage = MagicMock()
mock_storage.upload = MagicMock(return_value=public)
mock_storage.get_url = MagicMock(return_value="https://example.com/signed-avatar")
monkeypatch.setattr(deps, "get_object_storage", lambda: mock_storage)
fixed_user = _mock_current_user()
fixed_user.id = uid
fixed_user.avatar_url = None
async def _fake_update_avatar(u: str, url: str):
fixed_user.avatar_url = url
return fixed_user
mock_service = MagicMock(spec=AuthService)
mock_service.update_avatar_url = AsyncMock(side_effect=_fake_update_avatar)
app = FastAPI()
app.include_router(auth_router)
app.dependency_overrides[get_auth_service] = lambda: mock_service
app.dependency_overrides[get_current_user] = lambda: fixed_user
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
r = await ac.post(
"/api/auth/me/avatar",
files={"file": ("a.jpg", BytesIO(_minimal_jpeg_bytes()), "image/jpeg")},
headers={"Authorization": "Bearer x"},
)
assert r.status_code == 200
assert r.json()["avatar_url"] == "https://example.com/signed-avatar"
mock_storage.upload.assert_called_once()
assert mock_storage.upload.call_args[0][0] == f"avatars/{uid}.jpg"
mock_storage.get_url.assert_called_once()
assert mock_storage.get_url.call_args[0][0] == f"avatars/{uid}.jpg"

View File

@@ -50,8 +50,30 @@ async def test_avatar_upload_500_detail_sanitized(
) -> None:
from fastapi import FastAPI
from app.core.config import settings
import app.core.dependencies as deps
fake_user = MagicMock()
fake_user.id = "user-contract-test"
fake_user.avatar_url = None
mock_storage = MagicMock()
mock_storage.upload = MagicMock(
return_value="https://test-bucket.cos.ap-shanghai.myqcloud.com/avatars/user-contract-test/abc.jpg"
)
monkeypatch.setattr(deps, "get_object_storage", lambda: mock_storage)
monkeypatch.setattr(settings, "tencent_cos_secret_id", "sid", raising=False)
monkeypatch.setattr(settings, "tencent_cos_secret_key", "sk", raising=False)
monkeypatch.setattr(settings, "tencent_cos_bucket", "test-bucket", raising=False)
monkeypatch.setattr(
settings, "tencent_cos_region", "ap-shanghai", raising=False
)
monkeypatch.setattr(
settings,
"tencent_cos_base_url",
"https://test-bucket.cos.ap-shanghai.myqcloud.com",
raising=False,
)
class BoomAuth:
async def update_avatar_url(self, user_id: str, avatar_url: str):