24 KiB
name, overview, todos, isProject
| name | overview | todos | isProject | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 架构重构计划 | 将现有 Life Echo API 从混合分层结构重构为 feature-first flat 架构,引入 ports/adapters 实现 provider-agnostic 基础设施抽象,集成 Alembic 做 schema 变更治理,统一日志到 loguru。 |
|
false |
Life Echo API 架构重构计划
一、现状诊断
当前项目位于 api/ 下,采用混合分层结构:
api/
main.py
routers/ # 16 个路由文件,部分含大量业务逻辑
services/ # 混合了业务服务和外部 SDK 调用
database/ # 单文件 models.py(9 个模型)+ database.py
agents/ # AI agent 层(conversation, memory, memoir)
middleware/ # 仅 auth.py
payment/ # 唯一较完整的 feature 模块
tasks/ # Celery 后台任务
migrations/ # 手写 SQL,无版本管理
核心问题:
- 路由层过胖,直接操作 DB 和调用外部服务
- 外部服务耦合:SMS(腾讯云)、存储(腾讯 COS)、TTS(OpenAI)直接写死 SDK
- 无统一配置类,
os.getenv()散落各处 - 无 Alembic,手写 SQL 迁移无版本追踪
- 标准库 logging,无结构化日志
- 无全局异常处理
- WebSocket 路由 ~1000 行,职责过重
已有较好抽象的部分(可复用/迁移):
payment/:已有 service facade + 双 providerservices/llm_service.py:通过 LangChain 已做 provider 切换- ASR:已有 provider 切换机制
二、目标目录结构
api/
app/
main.py # 仅做组装:include_router, middleware, lifespan
core/
config.py # 统一 BaseSettings(pydantic-settings)
db.py # Base, engine, session factory, get_async_db
logging.py # loguru 配置
errors.py # 全局异常体系 + exception handlers
security.py # JWT 签发/校验、密码哈希
middleware.py # CORS, request_id, logging middleware
dependencies.py # 全局共享依赖(get_current_user, port DI factory)
pagination.py # 通用分页
features/
auth/
router.py # 注册、登录、SMS 登录、刷新、登出
schemas.py
service.py # 认证业务逻辑
models.py # RefreshToken, SmsVerificationCode
repo.py
deps.py # feature 特有依赖(如 sms_port 注入)
conversation/
router.py # 对话 CRUD
schemas.py
service.py # 对话编排(调用 agent、quota 检查等)
models.py # Conversation, Segment
repo.py
ws/ # WebSocket 子包(从 1067 行 websocket.py 拆分)
router.py # WebSocket endpoint,仅做连接生命周期
connection_manager.py # 连接注册/注销、active_connections 字典
message_types.py # MessageType 枚举 + 消息序列化
pipeline.py # 消息分发管道:audio_segment -> ASR -> agent -> TTS -> 响应
profile_collector.py # 资料收集模式:缺失字段检测、提取、回填
quota_guard.py # 配额校验 hook(从 routers/quota 独立)
memoir/
router.py # books, chapters, memoir_state 合并
schemas.py
service.py # 回忆录编排(章节生成、状态流转)
models.py # Book, Chapter, MemoirState
repo.py
processor.py # 从 agents/memoir_processor.py 迁入
payment/
router.py
schemas.py
service.py # 统一支付门面(已有良好抽象,迁移即可)
models.py # Order
repo.py
deps.py
user/
router.py # profile, subscription, feedback
schemas.py
service.py
models.py # User
repo.py
quota/
service.py # PLAN_QUOTAS 配置 + check_can_send/check_can_organize
schemas.py # QuotaCheckResponse
router.py # GET /api/quota/check
deps.py # get_quota_service 注入
plan/
router.py
schemas.py
service.py
content/
router.py # faq, legal, home(轻量静态内容)
schemas.py
agents/
conversation_agent.py
memory_agent.py
prompts/
conversation_prompts.py
memory_prompts.py
profile_prompts.py
ports/ # 能力接口定义(Protocol / ABC)
sms.py # SmsSender protocol
llm.py # LLMProvider protocol
image_gen.py # ImageGenerator protocol
storage.py # ObjectStorage protocol
tts.py # TTSProvider protocol
asr.py # ASRProvider protocol
adapters/ # 具体 provider 实现
sms/
tencent.py
llm/
deepseek.py # 基于现有 llm_service.py
image_gen/
liblib.py
storage/
tencent_cos.py
tts/
openai_tts.py
asr/
whisper_local.py
tencent_asr.py
payment/
wechat.py
alipay.py
tasks/
celery_app.py
memoir_tasks.py
alembic/
env.py
versions/
alembic.ini
tests/
...
目录结构修订说明
修订 1:core/db.py 显式列出。 存放 Base = declarative_base()、engine/session factory、get_async_db 生成器。所有 feature models 共享此 Base。
修订 2:core/response.py 移除。 统一响应包装只用于错误响应(在 core/errors.py 的 exception handler 中处理)。成功响应直接返回 Pydantic model / FileResponse / 原始 dict,不强制 {code, data, message} 包装,避免对 FastAPI schema 自动生成、文件下载、支付回调、WebSocket 造成额外负担。
修订 3:auth models 与 user models 分离。 RefreshToken 和 SmsVerificationCode 归入 features/auth/models.py,因为它们是 identity/session 资产,认证流程 (auth/service.py) 是唯一强依赖方。User 模型留在 features/user/models.py,auth 通过 import user models 引用 User(允许的依赖方向:auth -> user)。
修订 4:WebSocket 从单个 ws_handler.py 拆为 ws/ 子包。 当前 routers/websocket.py 有 1067 行,同时承担连接管理、消息类型定义、音频分段处理、ASR 转写、配额校验、用户资料收集、Agent 编排、DB 落库、Celery 任务提交。仅改名为 ws_handler.py 会原样搬运复杂度。拆分为 6 个文件,每个职责单一。
修订 5:quota 提升为独立 feature。 当前 routers/quota.py 包含 PLAN_QUOTAS 配置、get_segment_count、get_chapter_count、check_can_send_message、check_can_submit_organize,被 conversation(WebSocket line 628)、memoir(任务提交前 line 1050)、plan 共同依赖。放在任何单个 feature 内都会造成反向依赖。独立为 features/quota/,其他 feature 的 service 通过注入 QuotaService 调用。
三、事务边界与 Session 管理
3.1 现状问题
当前存在两种矛盾的事务策略:
get_async_db()在请求结束时自动 commit(database.py line 76),但 router 内部又有显式await db.commit()(如 auth.py line 194),导致双重 commit 或语义不清。- Celery 任务直接用同步
SessionLocal()(memoir_tasks.py line 18),手动管理 session 生命周期,无统一规则。 - WebSocket handler 用
async for db in get_async_db()获取独立 session(websocket.py line 276),绕过 FastAPI 依赖注入。
3.2 新规则
核心原则:repo 不提交,service 统一管理事务边界。
core/db.py中的get_async_db()改为"干净 session":只负责创建和关闭,不自动 commit/rollback。- 事务控制权交给 service 层:service 方法内显式
await db.commit()或使用async with db.begin():块。 repo.py只做查询和db.add()/db.delete(),绝不调用commit()或flush()。- router 层不接触 session,通过
Depends注入 service(service 内部持有 session)。
Celery 任务:在 tasks/ 中定义 get_sync_db() context manager,同样遵循"service commit"规则。任务函数调用 service 的同步版本或直接在任务函数中管理事务。
WebSocket:ws/pipeline.py 中每个消息处理周期获取独立 session(通过 async with AsyncSessionLocal() as session:),处理完成后由 pipeline 显式 commit。
# core/db.py - 新的 session 生成器
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
# features/auth/service.py - service 管理事务
class AuthService:
def __init__(self, db: AsyncSession, sms: SmsSender):
self._db = db
self._sms = sms
async def register(self, request: RegisterRequest) -> TokenResponse:
user = User(...)
self._db.add(user)
refresh_token = RefreshToken(...)
self._db.add(refresh_token)
await self._db.commit()
await self._db.refresh(user)
return TokenResponse(...)
四、依赖方向规则
用以下有向图描述允许的依赖方向(箭头 = "依赖于"):
graph TD
Router["features/*/router.py"] --> Service["features/*/service.py"]
Service --> Repo["features/*/repo.py"]
Service --> Ports["ports/*"]
Service --> QuotaSvc["features/quota/service.py"]
Repo --> Models["features/*/models.py"]
Adapters["adapters/*"] -.->|"实现"| Ports
Core["core/*"] --> Nothing["(无外部依赖)"]
Router --> Core
Service --> Core
Tasks["tasks/*"] --> Service
Agents["agents/*"] --> Ports
WsPipeline["conversation/ws/pipeline.py"] --> Service
WsPipeline --> Ports
硬性禁止:
router.py不得直接 import adapter 或操作 DB sessionservice.py不得 import 具体 adapter,只依赖 port protocolports/不得 importadapters/(方向反转)- feature 之间不得互相 import router;跨 feature 调用通过 service 注入
repo.py不得调用commit()/rollback()/flush()
四、Provider-Agnostic Ports 设计
4.1 核心可移植契约(Core Portable Contract)
每个 port 定义为 typing.Protocol,仅包含业务真正依赖的最小稳定能力:
SMS (ports/sms.py):
send_verification_code(phone: str, code: str) -> bool
LLM (ports/llm.py):
complete(messages: list[Message], temperature: float, ...) -> strstream(messages: list[Message], ...) -> AsyncIterator[str]
ImageGen (ports/image_gen.py):
generate(prompt: str, size: ImageSize, ...) -> ImageResultcheck_status(task_id: str) -> TaskStatus
Storage (ports/storage.py):
upload(key: str, data: bytes, content_type: str) -> strget_url(key: str, expires: int) -> strdelete(key: str) -> None
TTS (ports/tts.py):
synthesize(text: str, voice: str) -> bytes
ASR (ports/asr.py):
transcribe(audio: bytes, format: str) -> str
5.2 Provider Extensions 规则
绝对禁止 service 直接拿 adapter 扩展方法。 这会打穿 port 边界,让 service 再次耦合到具体 provider。
处理厂商特有能力的正确方式:
- 方案 A(首选):扩充现有 port。 如果能力具备跨 provider 通用性(如 image_gen 的 inpaint),在 port protocol 上新增方法,各 adapter 按能力实现或抛
NotImplementedError。 - 方案 B:定义第二个 port。 如果能力确实只属于某个 provider 的特殊功能(如 Liblib 的模板管理),定义一个独立的窄 port(如
ImageTemplateManager),只在需要的 feature deps 中注入。service 依赖的仍然是 port protocol,而非 adapter 类。 - 方案 C:adapter 内部自行消化。 厂商特有的配置项(如 Liblib template UUID、COS 生命周期策略)作为 adapter 构造参数传入,不暴露给 service。adapter 在实现 port 方法时自行使用这些配置。
判断标准:如果 service 代码中出现了 from adapters.xxx import ...,说明边界被打穿了。
4.3 DI 装配
在 core/dependencies.py 中根据 config.py 的配置选择 adapter:
def get_sms_sender() -> SmsSender:
if settings.sms_provider == "tencent":
return TencentSmsSender(settings.tencent_sms)
raise ValueError(f"Unknown SMS provider: {settings.sms_provider}")
feature 的 deps.py 或 router 通过 Depends(get_sms_sender) 注入。
五、Alembic 集成与 Schema 变更治理
5.1 初始化
- 在
api/根目录执行alembic init alembic - 修改
alembic/env.py,导入所有 feature 的models.py到统一Base.metadata alembic.ini中sqlalchemy.url读取DATABASE_URL
6.2 初始基线(不依赖 autogenerate)
当前 schema 来源是 Base.metadata.create_all() + 9 个手写 SQL 迁移文件,两者可能已经出现漂移。直接 autogenerate 大概率产出不准确的 diff。
正确步骤:
- 在 staging/dev 环境连接真实数据库,用
pg_dump --schema-only导出当前实际 schema 作为基准参照。 - 手写一个空的 Alembic revision 作为 baseline:
alembic revision -m "baseline_empty",upgrade/downgrade 均为 pass。 - 对现有数据库执行
alembic stamp head,将当前库标记为已在 head。 - 此后才开始用 autogenerate 生成增量迁移,每次 review 时对照
pg_dump确认无遗漏。 - 将
migrations/目录下的 9 个手写 SQL 归档到migrations_legacy/,不再使用。
5.3 变更治理规则
- 所有 schema 变更必须通过 Alembic migration 合入,禁止直接改 DB
autogenerate只做草稿,PR 中必须人工 review- 破坏性变更(删列、改类型)采用 expand/contract:先加新列 -> 双写 -> 迁移数据 -> 切读路径 -> 删旧列
- data migration 和 schema migration 分开写
- 每次部署前校验
alembic current==alembic heads main.py移除Base.metadata.create_all(),改为部署流程执行alembic upgrade head
6.4 Model 拆分
将 database/models.py 的 9 个模型拆入对应 feature:
User->features/user/models.pyRefreshToken,SmsVerificationCode->features/auth/models.py(identity/session 资产,auth 是唯一强依赖方)Conversation,Segment->features/conversation/models.pyBook,Chapter,MemoirState->features/memoir/models.pyOrder->features/payment/models.py
所有模型共享同一个 Base = declarative_base()(放 core/db.py),Alembic env.py 统一 import 所有 feature models。
跨 feature model 引用规则:auth/models.py 中的 RefreshToken.user_id 外键指向 users.id(表名级引用),不需要 import User 类。如需 ORM relationship,通过字符串引用 relationship("User", ...)。
六、日志统一化(loguru)
6.1 配置
在 core/logging.py 中:
- 移除标准库
logging.basicConfig - 配置 loguru:JSON 格式输出、按级别分文件、rotation/retention
- 拦截标准库 logging(
InterceptHandler),使 uvicorn / sqlalchemy / celery 日志统一走 loguru
6.2 Request Logging Middleware
新增 middleware,每个请求注入 request_id(UUID),通过 loguru 的 contextualize 绑定到所有日志:
with logger.contextualize(request_id=request_id, user_id=user_id):
response = await call_next(request)
6.3 Provider 调用日志
在每个 adapter 中统一记录:provider 名称、操作、耗时、成功/失败、错误分类。
七、全局异常处理
在 core/errors.py 中定义异常体系:
class AppError(Exception):
status_code: int
error_code: str
message: str
class NotFoundError(AppError): ...
class AuthenticationError(AppError): ...
class ProviderError(AppError): ...
在 main.py 注册全局 exception handler,统一返回格式:
{"error_code": "PROVIDER_ERROR", "message": "...", "request_id": "..."}
八、统一配置
将散落的 os.getenv() 收归 core/config.py,使用 pydantic-settings BaseSettings:
class Settings(BaseSettings):
database_url: str
redis_url: str
secret_key: str
sms_provider: str = "tencent"
storage_provider: str = "tencent_cos"
image_gen_provider: str = "liblib"
# 嵌套配置
tencent_sms: TencentSmsConfig
tencent_cos: TencentCosConfig
...
model_config = SettingsConfigDict(env_file=".env", env_nested_delimiter="__")
九、迁移执行策略
Phase 0 - 基础设施搭建(无功能变更)
- 迁移到 uv:
uv init,将requirements.txt的依赖迁入pyproject.toml的[project.dependencies],删除requirements.txt,生成uv.lock并纳入版本控制 - 创建
app/core/骨架:config, db, logging, errors, security, dependencies, middleware - 重写
get_async_db()去掉自动 commit,定义get_sync_db()context manager 供 Celery 使用 - 初始化 Alembic:手写空 baseline revision +
stamp head(不用 autogenerate) - 集成 loguru,配置 InterceptHandler 拦截标准库 logging
- 添加 request_id middleware
- 定义全局异常体系(仅统一错误响应格式)
验证点:uv sync --dev 能安装全部依赖,应用能以新 core 启动,所有现有 API 不受影响。注意 get_async_db() 的 commit 语义变更需要同步修改所有现有 router 中缺少显式 commit 的写操作。
Phase 1 - Ports 定义 + Adapters 迁移
- 定义 6 个 port protocol(sms, llm, image_gen, storage, tts, asr)
- 将现有 SDK 代码封装为 adapter,实现对应 protocol
- 在
core/dependencies.py中注册 DI factory - payment adapter 从
payment/直接迁移(已有好的抽象)
验证点:所有 adapter 通过 port protocol 类型检查,现有功能不变。
Phase 2 - Feature 模块重构(逐个 feature)
按依赖关系从外到内逐个迁移,建议顺序:
content(faq, legal, home)— 最简单,验证 feature 骨架plan— 轻量,少依赖quota— 提升为独立 feature,定义QuotaServiceuser— 拆出 User modelauth— 拆出 RefreshToken/SmsVerificationCode models,依赖 user models + sms portpayment— 迁移现有 payment/ 模块memoir— 合并 books/chapters/memoir_state,迁移 processorconversation— 最复杂,包含 WebSocket 深度拆分(见下)
每个 feature 迁移步骤:
- 创建
models.py(从全局 models.py 拆出) - 创建
repo.py(从 router 中提取 DB 操作,禁止 commit) - 创建
service.py(从 router 中提取业务逻辑,负责事务提交) - 创建
schemas.py(收集 Pydantic schema) - 精简
router.py(只保留 HTTP 边界:参数解析、响应状态码、调用 service) - 更新
main.py的include_router
conversation WebSocket 拆分详细步骤:
当前 routers/websocket.py(1067 行)需拆为 features/conversation/ws/ 子包:
message_types.py:提取MessageType枚举(当前 line 32-45)+ 消息构造工具函数connection_manager.py:提取ConnectionManager类(当前 line 49-120),只保留连接注册/注销/发送,移除 agent/runner 实例持有profile_collector.py:提取_get_missing_profile_fields、_get_filled_profile_fields、_apply_extracted_profile(当前 line 854-898)和资料收集模式分支(line 917-952)quota_guard.py:封装配额校验逻辑(当前 line 627-637),调用quota/service.pypipeline.py:核心消息处理管道,编排 ASR -> 落库 -> Agent -> TTS -> 响应,依赖上述模块 + ports(asr, tts, llm)router.py:WebSocket endpoint,仅做连接生命周期管理(accept, 主循环, disconnect),将每条消息委托给 pipeline
验证点:每迁移一个 feature,运行测试确保该 feature API 行为不变。
Phase 3 - 清理与加固
- 删除旧
routers/,services/,database/models.py,middleware/,migrations/ - 删除
main.py中的init_db()/create_all() - 更新 Dockerfile:改用 uv 安装依赖(
COPY pyproject.toml uv.lock ./+uv sync --frozen --no-dev),或用uv export --no-dev > requirements.txt导出后继续用 pip - 更新 docker-compose、deploy.sh 中的路径和启动命令
- 更新 Celery worker 启动命令(
uv run celery ...) - 更新所有测试 import 路径
- 补充缺失的测试
十一、硬性规则(写入项目规范)
**common//utils//shared//helpers/禁止创建**:只有 truly cross-cutting 能力进core/,业务 DTO/校验/错误码归 feature- router 禁止操作 DB:所有数据访问通过 repo,所有业务逻辑通过 service
- service 禁止 import adapter:只依赖 port protocol,通过 DI 注入。代码中出现
from adapters.xxx import ...即为违规 - feature 间禁止 import router:跨 feature 调用通过 service 注入
- 所有 schema 变更走 Alembic:禁止直接 DDL
- 新增 provider 必须实现 port protocol:不得在 feature 中直接调用 SDK
- 事务边界:repo 不提交,service 管事务:
repo.py只做add/delete/query,commit/rollback由 service 或 UoW 统一执行。get_async_db()不自动 commit - port 边界不可打穿:service 需要厂商增强能力时,必须扩充 port 或定义第二个窄 port,禁止直接引用 adapter 扩展方法
- quota 是独立 feature:conversation、memoir、payment 如需配额检查,通过注入
QuotaService,不得直接 import quota 内部函数 - 成功响应不强制包装:
core/errors.py只统一错误响应格式{error_code, message, request_id},成功响应直接返回 Pydantic model / FileResponse / 原始结构