Files
life-echo/api/app/features/evaluation/execution_service.py
Kevin ca8bcc8489 feat(evaluation): session catalog, user export import, and eval web UI
- Extend evaluation API: schemas, router, repo, admin and execution services
- Improve user export markdown importer; add fixtures and importer tests
- Session catalog repo/service updates; internal app wiring and docs
- Add internal-eval.sh helper; refresh app-eval-web (App, styles, Vite)
2026-04-06 13:49:28 +08:00

325 lines
11 KiB
Python

"""执行单次评测 run 与整实验(供 Celery / 内联调试)。"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.db import AsyncSessionLocal
from app.core.dependencies import get_eval_judge_langchain_llm, get_llm_provider
from app.core.logging import get_logger
from app.features.evaluation import repo as eval_repo
from app.features.evaluation.candidate_runner import (
EvalCandidateRunner,
simple_memoir_from_transcript,
)
from app.features.evaluation.gate_report_service import gate_result_to_details
from app.features.evaluation.gating_service import compute_gate
from app.features.evaluation.judge_service import EvalJudgeService
from app.features.evaluation.models import EvalCase, EvalRun, EvalVersion
# Caps that bound how much work the judges do for a single run.
_MAX_JUDGE_MARKDOWN_CHARS = 20_000  # default max body length (chars) passed to _clip_md_for_judge
_MAX_EVAL_CHAPTERS = 30  # at most this many source-user chapters are judged per run
_MAX_EVAL_STORIES = 40  # at most this many source-user stories are judged per run

logger = get_logger(__name__)
def _clip_md_for_judge(text: str, max_chars: int = _MAX_JUDGE_MARKDOWN_CHARS) -> str:
    """Strip ``text`` and cap it at ``max_chars`` characters for a judge prompt.

    ``None`` or empty input yields ``""``. When the stripped text is longer
    than ``max_chars``, it is cut and a visible truncation marker is appended
    so the judge knows the content is incomplete.
    """
    cleaned = text.strip() if text else ""
    if len(cleaned) > max_chars:
        return cleaned[:max_chars] + "\n\n…(已截断供评审)"
    return cleaned
def _composite(
conv: float | None, mem: float | None, weights: dict[str, Any] | None
) -> float:
w = weights or {}
wc = float(w.get("conversation", 0.5))
wm = float(w.get("memoir", 0.5))
c = float(conv or 0)
m = float(mem or 0)
return wc * c + wm * m
def _utterances_for_case(case: EvalCase) -> list[str]:
raw = case.user_utterances or []
return [str(u).strip() for u in raw if str(u).strip()]
async def _mark_run_failed(db: AsyncSession, run: EvalRun, message: str) -> None:
    """Record ``run`` as failed with ``message`` and commit immediately.

    Committing here makes every failure path persist its terminal status,
    even when the caller never commits the session afterwards.
    """
    await eval_repo.update_run(
        db,
        run,
        status="failed",
        error_message=message,
        completed_at=datetime.now(timezone.utc),
    )
    await db.commit()


async def _judge_turns(
    db: AsyncSession,
    judge: EvalJudgeService,
    run: EvalRun,
    utterances: list[str],
    replies: list[Any],
    latencies: list[Any],
) -> str:
    """Judge and store each replayed turn, and return the full transcript text.

    Only turns that received a reply are processed, because ``zip`` stops at
    the shorter list. Each turn row is committed as soon as it is added. A
    missing latency is stored as ``None``.
    """
    transcript_parts: list[str] = []
    prior = ""
    for idx, (utterance, reply) in enumerate(zip(utterances, replies)):
        transcript_parts.append(f"用户: {utterance}\nAI: {reply}")
        latency = latencies[idx] if idx < len(latencies) else None
        verdict = await judge.judge_turn(
            prior_transcript=prior,
            user_utterance=utterance,
            assistant_reply=reply,
        )
        await eval_repo.add_turn(
            db,
            run_id=run.id,
            turn_index=idx,
            user_utterance=utterance,
            assistant_reply=reply,
            duration_ms=latency,
            judge_scores_json=verdict.model_dump() if verdict else None,
            judge_rationale=verdict.rationale if verdict else None,
        )
        await db.commit()
        # Rolling judge context for the next turn: keep only the last 8000 chars.
        prior = (prior + f"\n用户: {utterance}\nAI: {reply}")[-8000:]
    return "\n\n".join(transcript_parts)


async def _judge_source_user_content(
    db: AsyncSession, judge: EvalJudgeService, uid: str
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Judge the source user's chapters and stories on a best-effort basis.

    Returns ``(chapter_entries, story_entries)``. Both are empty when ``uid``
    is empty. Chapters are fetched with ``active_only=True`` and stories with
    ``status="active"``. The chapter list is capped at ``_MAX_EVAL_CHAPTERS``
    and the story list at ``_MAX_EVAL_STORIES``. Items with an empty
    ``canonical_markdown`` are skipped. An exception while judging one kind is
    logged as a warning, and the entries gathered so far are kept.
    """
    chapter_entries: list[dict[str, Any]] = []
    story_entries: list[dict[str, Any]] = []
    if not uid:
        return chapter_entries, story_entries
    from app.features.memoir.repo import get_chapters_for_memoir_list
    from app.features.story.repo import get_stories_for_user

    try:
        chapters = await get_chapters_for_memoir_list(
            uid, db, active_only=True, is_new_only=None
        )
        for ch in chapters[:_MAX_EVAL_CHAPTERS]:
            body = (ch.canonical_markdown or "").strip()
            if not body:
                continue
            md = f"# 章节:{ch.title}\n\n{_clip_md_for_judge(body)}"
            cj = await judge.judge_memoir(memoir_markdown=md)
            chapter_entries.append(
                {
                    "id": ch.id,
                    "title": ch.title,
                    "order_index": ch.order_index,
                    "judge": cj.model_dump() if cj else None,
                }
            )
    except Exception as e:
        logger.warning("eval chapter judges skipped: {}", e)
    try:
        stories = await get_stories_for_user(db, uid, status="active")
        for st in stories[:_MAX_EVAL_STORIES]:
            body = (st.canonical_markdown or "").strip()
            if not body:
                continue
            md = f"# 故事:{st.title}\n\n{_clip_md_for_judge(body)}"
            sj = await judge.judge_memoir(memoir_markdown=md)
            story_entries.append(
                {
                    "id": st.id,
                    "title": st.title,
                    "stage": st.stage,
                    "judge": sj.model_dump() if sj else None,
                }
            )
    except Exception as e:
        logger.warning("eval story judges skipped: {}", e)
    return chapter_entries, story_entries


def _mean_memoir_score(
    mem_out: Any,
    chapter_entries: list[dict[str, Any]],
    story_entries: list[dict[str, Any]],
) -> float | None:
    """Average every available memoir-style judge total, or return ``None`` if none exist.

    The inputs are the synthetic-memoir judge (``mem_out``) plus every
    chapter/story entry whose ``judge`` dict has a non-null ``total_score``.
    """
    scores: list[float] = []
    if mem_out is not None:
        scores.append(float(mem_out.total_score))
    for row in [*chapter_entries, *story_entries]:
        judged = row.get("judge")
        if isinstance(judged, dict) and judged.get("total_score") is not None:
            scores.append(float(judged["total_score"]))
    return sum(scores) / len(scores) if scores else None


async def _score_and_complete_run(
    db: AsyncSession,
    *,
    run: EvalRun,
    case: EvalCase,
    judge: EvalJudgeService,
    utterances: list[str],
    replies: list[Any],
    latencies: list[Any],
) -> None:
    """Run all judges over the replay output, then store scores and mark ``run`` completed."""
    full_transcript = await _judge_turns(db, judge, run, utterances, replies, latencies)
    conv_out = await judge.judge_conversation(full_transcript=full_transcript)
    conv_total = conv_out.total_score if conv_out else None
    memoir_md = simple_memoir_from_transcript(utterances, replies)
    mem_out = await judge.judge_memoir(memoir_markdown=memoir_md)
    chapter_entries, story_entries = await _judge_source_user_content(
        db, judge, (case.source_user_id or "").strip()
    )
    mem_total = _mean_memoir_score(mem_out, chapter_entries, story_entries)
    exp = await eval_repo.get_experiment(db, run.experiment_id)
    weights = exp.composite_weights_json if exp else None
    bundle: dict[str, Any] = {
        "conversation_judge": conv_out.model_dump() if conv_out else None,
        "memoir_judge": mem_out.model_dump() if mem_out else None,
        "chapters": chapter_entries,
        "stories": story_entries,
    }
    await eval_repo.update_run(
        db,
        run,
        status="completed",
        memoir_markdown=memoir_md,
        conversation_score_total=conv_total,
        memoir_score_total=mem_total,
        composite_score=_composite(conv_total, mem_total, weights),
        judge_bundle_json=bundle,
        completed_at=datetime.now(timezone.utc),
    )
    await db.commit()


async def execute_eval_run(
    db: AsyncSession,
    *,
    run: EvalRun,
    case: EvalCase,
    version: EvalVersion,
) -> None:
    """Replay one case against ``version``, judge the output and persist the result on ``run``.

    Status transitions written to ``run`` (each one committed):

    - ``failed`` immediately when evaluation execution is disabled, the case
      has no usable utterances, or no production LLM is configured;
    - ``running`` once work starts;
    - ``failed`` with the exception text (clipped to 2000 chars) if the replay
      or any later judging/persistence step raises;
    - ``completed`` otherwise, with conversation, memoir and composite scores
      plus the judge bundle.

    Because every path ends in a committed terminal status, an exception
    part-way through does not leave the run in ``running``, which would
    otherwise stop the experiment gate from finalizing.
    """
    if not settings.eval_execution_enabled:
        await _mark_run_failed(db, run, "EVAL_EXECUTION_ENABLED=false")
        return
    utterances = _utterances_for_case(case)
    if not utterances:
        await _mark_run_failed(db, run, "empty user_utterances")
        return
    await eval_repo.update_run(
        db,
        run,
        status="running",
        started_at=datetime.now(timezone.utc),
        error_message=None,
    )
    await db.commit()
    provider_llm = getattr(get_llm_provider(), "langchain_llm", None)
    if provider_llm is None:
        await _mark_run_failed(db, run, "生产 LLM 未配置")
        return
    judge = EvalJudgeService(get_eval_judge_langchain_llm())
    runner = EvalCandidateRunner(provider_llm)
    cfg = version.config_json if isinstance(version.config_json, dict) else None
    try:
        replies, latencies = await runner.replay_utterances(
            utterances,
            version_config=cfg,
            temperature=settings.eval_candidate_temperature,
        )
    except Exception as e:
        logger.exception("eval replay failed: {}", e)
        await _mark_run_failed(db, run, str(e)[:2000])
        return
    try:
        await _score_and_complete_run(
            db,
            run=run,
            case=case,
            judge=judge,
            utterances=utterances,
            replies=replies,
            latencies=latencies,
        )
    except Exception as e:
        # Any error after the replay (judge LLM calls, DB writes) must still
        # leave the run in a terminal state.
        logger.exception("eval judging failed: {}", e)
        await _mark_run_failed(db, run, str(e)[:2000])
async def _finalize_experiment_gate(db: AsyncSession, experiment_id: str) -> None:
    """Close out an experiment once every one of its runs is terminal.

    Does nothing if the experiment does not exist or if any run is neither
    ``completed`` nor ``failed``. If any run failed, the experiment is marked
    failed. Otherwise the gate is computed from the regression cases and the
    runs, the verdict is upserted and the experiment is marked completed.
    Both terminal paths commit.
    """
    terminal_statuses = ("completed", "failed")
    all_runs = await eval_repo.list_runs_for_experiment(db, experiment_id)
    experiment = await eval_repo.get_experiment(db, experiment_id)
    if not experiment:
        return
    regression_cases = await eval_repo.list_cases(db, experiment.regression_set_id)
    if any(r.status not in terminal_statuses for r in all_runs):
        # Some runs are still pending/running; a later call will finalize.
        return
    if any(r.status == "failed" for r in all_runs):
        await eval_repo.update_experiment(
            db,
            experiment,
            status="failed",
            error_message="部分 run 失败",
            completed_at=datetime.now(timezone.utc),
        )
        await db.commit()
        return
    verdict = compute_gate(cases=regression_cases, runs=all_runs)
    await eval_repo.upsert_gate_verdict(
        db,
        experiment_id=experiment_id,
        passed=verdict.passed,
        mean_composite_delta=verdict.mean_delta,
        protected_regressions_json=verdict.protected_regressions,
        details_json=gate_result_to_details(verdict),
    )
    await eval_repo.update_experiment(
        db,
        experiment,
        status="completed",
        completed_at=datetime.now(timezone.utc),
    )
    await db.commit()
async def _mark_experiment_crashed(
    db: AsyncSession, experiment_id: str, message: str
) -> None:
    """Mark the experiment failed after an unexpected exception.

    The session is rolled back first, because the exception may have left a
    failed flush pending. The experiment is then re-read, which reloads its
    attributes after the rollback expired them, before it is updated and
    committed.
    """
    await db.rollback()
    exp = await eval_repo.get_experiment(db, experiment_id)
    if not exp:
        return
    await eval_repo.update_experiment(
        db,
        exp,
        status="failed",
        error_message=message,
        completed_at=datetime.now(timezone.utc),
    )
    await db.commit()


async def _run_experiment_cases(db: AsyncSession, exp: Any, experiment_id: str) -> None:
    """Execute the baseline and candidate runs for every case, then finalize the gate.

    If either referenced version is missing, the experiment is marked failed
    and nothing runs. An existing run for a (case, side) pair is reused;
    otherwise a new one is created and committed before it executes.
    """
    cases = await eval_repo.list_cases(db, exp.regression_set_id)
    base_v = await eval_repo.get_version(db, exp.baseline_version_id)
    cand_v = await eval_repo.get_version(db, exp.candidate_version_id)
    if not base_v or not cand_v:
        await eval_repo.update_experiment(
            db,
            exp,
            status="failed",
            error_message="version 不存在",
            completed_at=datetime.now(timezone.utc),
        )
        await db.commit()
        return
    for case in cases:
        for side, ver in (("baseline", base_v), ("candidate", cand_v)):
            run = await eval_repo.get_run(db, experiment_id, case.id, side)
            if not run:
                run = await eval_repo.create_run(
                    db,
                    experiment_id=experiment_id,
                    case_id=case.id,
                    side=side,
                )
                await db.commit()
            await execute_eval_run(db, run=run, case=case, version=ver)
    await _finalize_experiment_gate(db, experiment_id)


async def execute_experiment_full(experiment_id: str) -> None:
    """Run a whole experiment end to end in its own DB session.

    This is the entry point for Celery or inline debugging. Unknown
    experiment ids are ignored. The experiment is marked ``running`` first.
    If execution then raises unexpectedly, the experiment is marked
    ``failed`` (best effort) so it is not left in ``running``, and the
    original exception is re-raised to the caller.
    """
    async with AsyncSessionLocal() as db:
        exp = await eval_repo.get_experiment(db, experiment_id)
        if not exp:
            return
        await eval_repo.update_experiment(db, exp, status="running")
        await db.commit()
        try:
            await _run_experiment_cases(db, exp, experiment_id)
        except Exception as e:
            logger.exception("eval experiment {} failed: {}", experiment_id, e)
            try:
                await _mark_experiment_crashed(db, experiment_id, str(e)[:2000])
            except Exception as mark_err:
                # Do not let the bookkeeping failure mask the original error.
                logger.warning(
                    "eval experiment {} could not be marked failed: {}",
                    experiment_id,
                    mark_err,
                )
            raise