"""执行单次评测 run 与整实验(供 Celery / 内联调试)。""" from __future__ import annotations from datetime import datetime, timezone from typing import Any from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.db import AsyncSessionLocal from app.core.dependencies import get_eval_judge_langchain_llm, get_llm_provider from app.core.logging import get_logger from app.features.evaluation import repo as eval_repo from app.features.evaluation.candidate_runner import ( EvalCandidateRunner, simple_memoir_from_transcript, ) from app.features.evaluation.gate_report_service import gate_result_to_details from app.features.evaluation.gating_service import compute_gate from app.features.evaluation.judge_service import EvalJudgeService from app.features.evaluation.models import EvalCase, EvalRun, EvalVersion logger = get_logger(__name__) def _composite( conv: float | None, mem: float | None, weights: dict[str, Any] | None ) -> float: w = weights or {} wc = float(w.get("conversation", 0.5)) wm = float(w.get("memoir", 0.5)) c = float(conv or 0) m = float(mem or 0) return wc * c + wm * m def _utterances_for_case(case: EvalCase) -> list[str]: raw = case.user_utterances or [] return [str(u).strip() for u in raw if str(u).strip()] async def execute_eval_run( db: AsyncSession, *, run: EvalRun, case: EvalCase, version: EvalVersion, ) -> None: if not settings.eval_execution_enabled: await eval_repo.update_run( db, run, status="failed", error_message="EVAL_EXECUTION_ENABLED=false", completed_at=datetime.now(timezone.utc), ) return utterances = _utterances_for_case(case) if not utterances: await eval_repo.update_run( db, run, status="failed", error_message="empty user_utterances", completed_at=datetime.now(timezone.utc), ) return await eval_repo.update_run( db, run, status="running", started_at=datetime.now(timezone.utc), error_message=None, ) await db.commit() provider_llm = getattr(get_llm_provider(), "langchain_llm", None) if provider_llm is None: await eval_repo.update_run( db, run, status="failed", error_message="生产 LLM 未配置", completed_at=datetime.now(timezone.utc), ) await db.commit() return judge_llm = get_eval_judge_langchain_llm() judge = EvalJudgeService(judge_llm) runner = EvalCandidateRunner(provider_llm) cfg = version.config_json if isinstance(version.config_json, dict) else None try: replies, latencies = await runner.replay_utterances( utterances, version_config=cfg, temperature=settings.eval_candidate_temperature, ) except Exception as e: logger.exception("eval replay failed: {}", e) await eval_repo.update_run( db, run, status="failed", error_message=str(e)[:2000], completed_at=datetime.now(timezone.utc), ) await db.commit() return transcript_parts: list[str] = [] for i, u in enumerate(utterances): if i >= len(replies): break transcript_parts.append(f"用户: {u}\nAI: {replies[i]}") prior = "" for idx, u in enumerate(utterances): if idx >= len(replies): break reply = replies[idx] lat = latencies[idx] if idx < len(latencies) else None tj = await judge.judge_turn( prior_transcript=prior, user_utterance=u, assistant_reply=reply, ) scores = tj.model_dump() if tj else None rationale = tj.rationale if tj else None await eval_repo.add_turn( db, run_id=run.id, turn_index=idx, user_utterance=u, assistant_reply=reply, duration_ms=lat, judge_scores_json=scores, judge_rationale=rationale, ) await db.commit() prior = (prior + f"\n用户: {u}\nAI: {reply}")[-8000:] full_transcript = "\n\n".join(transcript_parts) conv_out = await judge.judge_conversation(full_transcript=full_transcript) conv_total = conv_out.total_score if conv_out else None memoir_md = simple_memoir_from_transcript(utterances, replies) mem_out = await judge.judge_memoir(memoir_markdown=memoir_md) mem_total = mem_out.total_score if mem_out else None exp = await eval_repo.get_experiment(db, run.experiment_id) weights = exp.composite_weights_json if exp else None comp = _composite(conv_total, mem_total, weights) bundle: dict[str, Any] = { "conversation_judge": conv_out.model_dump() if conv_out else None, "memoir_judge": mem_out.model_dump() if mem_out else None, } await eval_repo.update_run( db, run, status="completed", memoir_markdown=memoir_md, conversation_score_total=conv_total, memoir_score_total=mem_total, composite_score=comp, judge_bundle_json=bundle, completed_at=datetime.now(timezone.utc), ) await db.commit() async def _finalize_experiment_gate(db: AsyncSession, experiment_id: str) -> None: runs = await eval_repo.list_runs_for_experiment(db, experiment_id) exp = await eval_repo.get_experiment(db, experiment_id) if not exp: return cases = await eval_repo.list_cases(db, exp.regression_set_id) incomplete = [r for r in runs if r.status not in ("completed", "failed")] if incomplete: return failed = [r for r in runs if r.status == "failed"] if failed: await eval_repo.update_experiment( db, exp, status="failed", error_message="部分 run 失败", completed_at=datetime.now(timezone.utc), ) await db.commit() return gr = compute_gate(cases=cases, runs=runs) await eval_repo.upsert_gate_verdict( db, experiment_id=experiment_id, passed=gr.passed, mean_composite_delta=gr.mean_delta, protected_regressions_json=gr.protected_regressions, details_json=gate_result_to_details(gr), ) await eval_repo.update_experiment( db, exp, status="completed", completed_at=datetime.now(timezone.utc), ) await db.commit() async def execute_experiment_full(experiment_id: str) -> None: async with AsyncSessionLocal() as db: exp = await eval_repo.get_experiment(db, experiment_id) if not exp: return await eval_repo.update_experiment(db, exp, status="running") await db.commit() cases = await eval_repo.list_cases(db, exp.regression_set_id) base_v = await eval_repo.get_version(db, exp.baseline_version_id) cand_v = await eval_repo.get_version(db, exp.candidate_version_id) if not base_v or not cand_v: await eval_repo.update_experiment( db, exp, status="failed", error_message="version 不存在", completed_at=datetime.now(timezone.utc), ) await db.commit() return for case in cases: for side, ver in ("baseline", base_v), ("candidate", cand_v): run = await eval_repo.get_run(db, experiment_id, case.id, side) if not run: run = await eval_repo.create_run( db, experiment_id=experiment_id, case_id=case.id, side=side, ) await db.commit() await execute_eval_run(db, run=run, case=case, version=ver) await _finalize_experiment_gate(db, experiment_id)