"""Manually triggered eval-console judging (Zhipu / DeepSeek).

Does not write ``eval_runs``; Playground conversation scores are written to the
``conversations`` table.
"""

from __future__ import annotations

import copy
import re
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.dependencies import (
    EvalJudgeProvider,
    build_eval_judge_llm_spec,
)
from app.core.logging import get_logger
from app.features.conversation import repo as conversation_repo
from app.features.evaluation.errors import (
    EvaluationBadRequestError,
    EvaluationNotFoundError,
)
from app.features.evaluation.conversation_compare_summary import (
    build_conversation_compare_summary,
)
from app.features.evaluation.eval_trace_service import EvalTraceService
from app.features.evaluation.judge_schemas import ConversationJudgeOutput
from app.features.evaluation.judge_service import (
    EvalJudgeService,
    eval_judge_compare_bundle_caps,
    eval_judge_conversation_transcript_max_chars_for_context,
)
from app.features.evaluation.schemas import MemoirSectionBaselineOut
from app.features.evaluation.session_catalog_service import SessionCatalogService
from app.features.evaluation.transcript_for_judge import (
    assistant_text_for_eval_display,
    format_eval_turn_block,
    format_export_turns_with_labels,
    format_session_messages_with_turn_labels,
    pair_session_messages_to_turns,
)
from app.features.evaluation.user_export_fixtures import read_user_export_fixture
from app.features.memoir.repo import get_chapters_for_memoir_list
from app.features.story.repo import get_stories_for_user

logger = get_logger(__name__)

# Character cap applied by ``_clip_md_for_judge`` when no explicit cap is given.
_MAX_JUDGE_MARKDOWN_CHARS = 20_000
# Upper bounds on how many chapters / stories are processed per request.
_MAX_EVAL_CHAPTERS = 30
_MAX_EVAL_STORIES = 40
# Only the trailing N characters of the accumulated prior transcript are sent
# with each per-turn judgment.
_PRIOR_TRANSCRIPT_MAX_CHARS = 8000
# User-facing message returned when no judge LLM could be built.
_JUDGE_CONFIG_HINT = (
    "评审未配置:智谱需 eval_judge_api_key 或 zhipu_api_key;"
    "DeepSeek 需 deepseek_api_key(或 llm_api_key)"
)


def _make_eval_judge(
    judge_provider: EvalJudgeProvider,
    judge_model: str | None,
) -> tuple[EvalJudgeService | None, str]:
    """Build the judge service for the requested provider/model.

    Returns:
        ``(service, resolved_model)``; ``(None, "")`` when
        ``build_eval_judge_llm_spec`` yields no spec or a spec without an LLM.
    """
    spec = build_eval_judge_llm_spec(judge_provider, judge_model)
    if not spec or not spec.llm:
        return None, ""
    return (
        EvalJudgeService(
            spec.llm,
            context_window_tokens=spec.context_window_tokens,
        ),
        spec.resolved_model,
    )


def _strip_baseline_judge_errors(errs: list[Any]) -> list[str]:
    """Return ``errs`` as strings with blank and baseline-failure entries removed.

    An entry is dropped when it is ``None``/blank, contains the baseline failure
    message or ``baseline_glm5_failed:``, or starts with ``baseline_glm5:``.
    """
    out: list[str] = []
    for e in errs:
        s = str(e) if e is not None else ""
        if not s.strip():
            continue
        if (
            "基准整体打分失败" in s
            or s.startswith("baseline_glm5:")
            or "baseline_glm5_failed:" in s
        ):
            continue
        out.append(s)
    return out


async def _iter_turn_judgments_for_turns(
    judge: EvalJudgeService,
    turns: list[tuple[str, str]],
    *,
    sse_event: str,
) -> AsyncIterator[dict[str, Any]]:
    """Judge each ``(user, assistant)`` turn in order and yield one event per turn.

    Per the original author's note, the prior-transcript truncation and block
    accumulation are meant to match ``execute_eval_run`` (not visible here).

    Each yielded dict has keys ``event`` (``sse_event``), ``turn_index``,
    ``ok`` and ``judge`` (``model_dump()`` of the result, or ``None``).
    """
    prior_blocks: list[str] = []
    for idx, (u_raw, ai_raw) in enumerate(turns):
        u = (u_raw or "").strip()
        reply = assistant_text_for_eval_display(str(ai_raw))
        prior = "\n\n".join(prior_blocks)
        # Keep only the most recent context when the history grows too long.
        if len(prior) > _PRIOR_TRANSCRIPT_MAX_CHARS:
            prior = prior[-_PRIOR_TRANSCRIPT_MAX_CHARS:]
        tj = await judge.judge_turn(
            prior_transcript=prior,
            user_utterance=u,
            assistant_reply=reply,
            turn_index_0=idx,
        )
        yield {
            "event": sse_event,
            "turn_index": idx,
            "ok": tj is not None,
            "judge": tj.model_dump() if tj else None,
        }
        # The current turn becomes part of the prior context for the next one.
        prior_blocks.append(format_eval_turn_block(idx, u, reply))


def _clip_md_for_judge(text: str, max_chars: int = _MAX_JUDGE_MARKDOWN_CHARS) -> str:
    """Strip ``text`` and cut it to ``max_chars``, appending a truncation marker."""
    s = (text or "").strip()
    if len(s) <= max_chars:
        return s
    return f"{s[:max_chars]}\n\n…(已截断供评审)"


async def _conversation_transcript_for_manual(
    db: AsyncSession, conversation_id: str
) -> str:
    """Load a conversation's messages and format them with turn labels."""
    rows = await conversation_repo.get_conversation_messages(conversation_id, db)
    return format_session_messages_with_turn_labels(rows)


def _normalize_title_key(title: str) -> str:
    """Normalize a title for matching.

    Strips, lowercases, removes a leading run of ``#`` plus following
    whitespace, and collapses whitespace runs to a single space.
    """
    t = (title or "").strip().lower()
    t = re.sub(r"^#+\s*", "", t)
    return re.sub(r"\s+", " ", t)


def _baseline_for_chapter_title(
    baselines: list[MemoirSectionBaselineOut],
    chapter_title: str,
    index: int,
) -> MemoirSectionBaselineOut | None:
    """Pick the baseline section for a chapter.

    Prefers the first baseline whose normalized title equals the chapter's;
    otherwise falls back to the baseline at ``index`` when that is in range.
    Returns ``None`` when neither applies.
    """
    if baselines:
        key = _normalize_title_key(chapter_title)
        for b in baselines:
            if _normalize_title_key(b.title) == key:
                return b
        if 0 <= index < len(baselines):
            return baselines[index]
    return None


class EvalJudgeManualService:
    """Manual (on-demand) judging of conversations and memoir content."""

    def __init__(self, db: AsyncSession) -> None:
        self._db = db

    async def _persist_playground_conversation_judge(
        self, conversation_id: str, bundle: dict[str, Any]
    ) -> None:
        """Best-effort save of the judge bundle onto the conversation.

        Commits only when the repo call returns a row. Any exception is logged
        and swallowed so that a persistence failure does not fail the request.
        """
        try:
            row = await conversation_repo.set_playground_conversation_judge_json(
                conversation_id, self._db, bundle
            )
            if row is not None:
                await self._db.commit()
        except Exception:
            logger.exception(
                "persist playground_conversation_judge_json failed conversation_id={}",
                conversation_id,
            )

    async def judge_conversation(
        self,
        conversation_id: str,
        fixture_filename: str | None,
        *,
        judge_provider: EvalJudgeProvider = "zhipu",
        judge_model: str | None = None,
    ) -> dict[str, Any]:
        """Judge a replay conversation, optionally against an exported baseline.

        Args:
            conversation_id: Conversation to judge (surrounding whitespace ignored).
            fixture_filename: Optional baseline export fixture; blank means none.
            judge_provider: Provider passed to ``_make_eval_judge``.
            judge_model: Optional model override.

        Returns:
            Dict with ``conversation_id``, ``fixture_filename``, both
            transcripts, both judge dumps, ``compare_summary`` and ``errors``.
            Judge failures are reported through ``errors`` rather than raised.

        Raises:
            EvaluationBadRequestError: missing id, no messages, invalid fixture,
                or no judge configured.
            EvaluationNotFoundError: conversation or fixture not found.
        """
        cid = (conversation_id or "").strip()
        if not cid:
            raise EvaluationBadRequestError("conversation_id is required")
        catalog = SessionCatalogService(self._db)
        dialogue = await catalog.get_session_dialogue(cid)
        if not dialogue:
            raise EvaluationNotFoundError("conversation not found")
        replay_transcript = format_session_messages_with_turn_labels(
            list(dialogue.messages)
        )
        if not replay_transcript.strip():
            raise EvaluationBadRequestError("no messages to judge")

        fn = (fixture_filename or "").strip() or None
        baseline_transcript = ""
        if fn:
            try:
                turns, _ = read_user_export_fixture(fn)
                baseline_transcript = format_export_turns_with_labels(turns)
            except ValueError as e:
                raise EvaluationBadRequestError(str(e)) from e
            except FileNotFoundError as e:
                raise EvaluationNotFoundError("fixture not found") from e

        errors: list[str] = []
        judge, resolved_model = _make_eval_judge(judge_provider, judge_model)
        if not judge:
            raise EvaluationBadRequestError(_JUDGE_CONFIG_HINT)

        baseline_judge_dict: dict[str, Any] | None = None
        # ``bj`` is read unconditionally when the compare summary is built
        # below, so it must be bound even when no baseline was judged
        # (previously this path raised UnboundLocalError).
        bj = None
        if baseline_transcript.strip():
            baseline_result = await judge.judge_conversation_result(
                full_transcript=baseline_transcript
            )
            bj = baseline_result.output
            if bj:
                baseline_judge_dict = bj.model_dump()
            else:
                errors.append(
                    f"baseline_glm5_failed: {baseline_result.error or 'unknown error'}"
                )
        elif fn:
            errors.append("baseline_transcript_empty")

        replay_result = await judge.judge_conversation_result(
            full_transcript=replay_transcript
        )
        rj = replay_result.output
        replay_judge_dict = rj.model_dump() if rj else None
        if not rj:
            errors.append(
                f"replay_glm5_failed: {replay_result.error or 'unknown error'}"
            )

        # NOTE(review): reads the judge's private ``_ctx_tokens`` attribute.
        _cmp_total, _cmp_per_side = eval_judge_compare_bundle_caps(judge._ctx_tokens)
        bundle: dict[str, Any] = {
            "version": 1,
            "judged_at": datetime.now(timezone.utc).isoformat(),
            "fixture_filename": fn,
            "baseline_judge": baseline_judge_dict,
            "replay_judge": replay_judge_dict,
            "baseline_turn_judges": {},
            "replay_turn_judges": {},
            "compare_summary": build_conversation_compare_summary(
                baseline_judge=bj,
                replay_judge=rj,
                baseline_transcript=baseline_transcript,
                replay_transcript=replay_transcript,
                conv_cap=eval_judge_conversation_transcript_max_chars_for_context(
                    judge._ctx_tokens
                ),
                compare_cap_total=_cmp_total,
                compare_per_side_cap=_cmp_per_side,
                fixture_filename=fn,
            ),
            "compare_markdown": "",
            "errors": list(errors),
            "warnings": [],
            "options": {
                "include_turn_judges": False,
                "include_baseline_turn_judges": False,
                "judge_provider": judge_provider,
                "judge_model": resolved_model,
            },
        }
        await self._persist_playground_conversation_judge(cid, bundle)
        return {
            "conversation_id": cid,
            "fixture_filename": fn,
            "baseline_transcript": baseline_transcript,
            "replay_transcript": replay_transcript,
            "baseline_judge": baseline_judge_dict,
            "replay_judge": replay_judge_dict,
            "compare_summary": bundle.get("compare_summary"),
            "errors": errors,
        }

    async def iter_conversation_judge_sse(
        self,
        conversation_id: str,
        fixture_filename: str | None,
        *,
        include_turn_judges: bool = False,
        include_baseline_turn_judges: bool = False,
        judge_provider: EvalJudgeProvider = "zhipu",
        judge_model: str | None = None,
    ) -> AsyncIterator[dict[str, Any]]:
        """Yield SSE event dicts for a full conversation judging run.

        Order: overall baseline score, overall replay score, optional per-turn
        scores, then the streamed comparison/suggestions text. Validation,
        load, fixture and config problems are yielded as ``error`` events and
        end the stream before any judging starts.

        Once judging has started, the accumulated bundle is persisted in a
        ``finally`` block, so partial results are saved even when the stream
        ends early.
        """
        acc: dict[str, Any] = {
            "version": 1,
            "fixture_filename": None,
            "baseline_judge": None,
            "replay_judge": None,
            "baseline_turn_judges": {},
            "replay_turn_judges": {},
            "compare_summary": None,
            "compare_markdown": "",
            "errors": [],
            "warnings": [],
            "options": {
                "include_turn_judges": include_turn_judges,
                "include_baseline_turn_judges": include_baseline_turn_judges,
                "judge_provider": judge_provider,
                "judge_model": "",
            },
        }
        cid = (conversation_id or "").strip()
        if not cid:
            yield {
                "event": "error",
                "phase": "validate",
                "message": "conversation_id is required",
            }
            return
        catalog = SessionCatalogService(self._db)
        dialogue = await catalog.get_session_dialogue(cid)
        if not dialogue:
            yield {
                "event": "error",
                "phase": "load",
                "message": "conversation not found",
            }
            return
        replay_transcript = format_session_messages_with_turn_labels(
            list(dialogue.messages)
        )
        if not replay_transcript.strip():
            yield {"event": "error", "phase": "load", "message": "no messages to judge"}
            return

        fn = (fixture_filename or "").strip() or None
        baseline_transcript = ""
        export_turns: list[tuple[str, str]] | None = None
        if fn:
            try:
                turns, _ = read_user_export_fixture(fn)
                export_turns = list(turns)
                baseline_transcript = format_export_turns_with_labels(turns)
            except ValueError as e:
                yield {"event": "error", "phase": "fixture", "message": str(e)}
                return
            except FileNotFoundError:
                yield {
                    "event": "error",
                    "phase": "fixture",
                    "message": "fixture not found",
                }
                return

        judge, resolved_model = _make_eval_judge(judge_provider, judge_model)
        if not judge:
            yield {
                "event": "error",
                "phase": "config",
                "message": _JUDGE_CONFIG_HINT,
            }
            return
        acc["options"]["judge_model"] = resolved_model
        acc["fixture_filename"] = fn
        # NOTE(review): reads the judge's private ``_ctx_tokens`` attribute.
        _sse_cmp_total, _sse_cmp_per = eval_judge_compare_bundle_caps(judge._ctx_tokens)
        persist = True
        try:
            yield {
                "event": "meta",
                "conversation_id": cid,
                "fixture_filename": fn,
                "judge_provider": judge_provider,
                "judge_model": resolved_model,
            }
            if not baseline_transcript.strip():
                wmsg = "未提供基准 MD 或基准无文本:仅对回放对话打分并输出单侧改进建议"
                acc["warnings"].append(wmsg)
                yield {"event": "warning", "message": wmsg}

            baseline_judge = None
            if baseline_transcript.strip():
                baseline_result = await judge.judge_conversation_result(
                    full_transcript=baseline_transcript
                )
                baseline_judge = baseline_result.output
                acc["baseline_judge"] = (
                    baseline_judge.model_dump() if baseline_judge else None
                )
                yield {
                    "event": "baseline_judge",
                    "ok": baseline_judge is not None,
                    "judge": acc["baseline_judge"],
                }
                if not baseline_judge:
                    err = (
                        f"基准整体打分失败:{baseline_result.error}"
                        if baseline_result.error
                        else "基准整体打分失败(密钥、限流或 JSON 解析失败,见服务端日志)"
                    )
                    acc["errors"].append(err)
                    # A baseline failure is reported but does not stop the
                    # stream; the replay side is still judged below.
                    yield {
                        "event": "error",
                        "phase": "baseline_glm5",
                        "message": err,
                    }
                elif (
                    include_baseline_turn_judges
                    and export_turns
                    and baseline_judge is not None
                ):
                    yield {"event": "meta", "phase": "baseline_turn_judges_start"}
                    async for row in _iter_turn_judgments_for_turns(
                        judge,
                        export_turns,
                        sse_event="baseline_turn_judge",
                    ):
                        if row.get("event") == "baseline_turn_judge":
                            idx = row.get("turn_index")
                            if isinstance(idx, (int, float)):
                                acc["baseline_turn_judges"][str(int(idx))] = row.get(
                                    "judge"
                                )
                        yield row
            else:
                # No baseline text: emit an explicit "skipped" baseline event.
                acc["baseline_judge"] = None
                yield {
                    "event": "baseline_judge",
                    "ok": False,
                    "skipped": True,
                    "judge": None,
                }

            replay_result = await judge.judge_conversation_result(
                full_transcript=replay_transcript
            )
            replay_judge = replay_result.output
            acc["replay_judge"] = replay_judge.model_dump() if replay_judge else None
            acc["compare_summary"] = build_conversation_compare_summary(
                baseline_judge=baseline_judge,
                replay_judge=replay_judge,
                baseline_transcript=baseline_transcript,
                replay_transcript=replay_transcript,
                conv_cap=eval_judge_conversation_transcript_max_chars_for_context(
                    judge._ctx_tokens
                ),
                compare_cap_total=_sse_cmp_total,
                compare_per_side_cap=_sse_cmp_per,
                fixture_filename=fn,
            )
            yield {
                "event": "replay_judge",
                "ok": replay_judge is not None,
                "judge": acc["replay_judge"],
            }
            yield {"event": "compare_summary", "summary": acc["compare_summary"]}
            if not replay_judge:
                err = (
                    f"回放对话整体打分失败:{replay_result.error}"
                    if replay_result.error
                    else "回放对话整体打分失败(限流或 JSON 解析失败,见服务端日志)"
                )
                acc["errors"].append(err)
                yield {
                    "event": "error",
                    "phase": "replay_glm5",
                    "message": err,
                }
                # Without a replay score the stream ends here.
                yield {"event": "done"}
                return

            if include_turn_judges:
                replay_pairs = pair_session_messages_to_turns(list(dialogue.messages))
                if replay_pairs:
                    yield {"event": "meta", "phase": "replay_turn_judges_start"}
                    async for row in _iter_turn_judgments_for_turns(
                        judge,
                        replay_pairs,
                        sse_event="replay_turn_judge",
                    ):
                        if row.get("event") == "replay_turn_judge":
                            idx = row.get("turn_index")
                            if isinstance(idx, (int, float)):
                                acc["replay_turn_judges"][str(int(idx))] = row.get(
                                    "judge"
                                )
                        yield row

            async for piece in judge.stream_conversation_compare(
                baseline_transcript=baseline_transcript,
                replay_transcript=replay_transcript,
                baseline_judge=baseline_judge,
                replay_judge=replay_judge,
            ):
                if piece:
                    acc["compare_markdown"] += piece
                    yield {"event": "compare_delta", "text": piece}
            yield {"event": "done"}
        finally:
            # Runs on normal completion, early return and errors alike.
            if persist:
                acc["judged_at"] = datetime.now(timezone.utc).isoformat()
                await self._persist_playground_conversation_judge(cid, acc)

    async def retry_baseline_conversation_judge(
        self,
        conversation_id: str,
        fixture_filename: str | None,
        *,
        include_baseline_turn_judges: bool = False,
        judge_provider: EvalJudgeProvider = "zhipu",
        judge_model: str | None = None,
    ) -> dict[str, Any]:
        """Retry only the baseline's overall score and rebuild the comparison.

        Optionally re-runs the baseline per-turn scores as well. The replay
        score is taken from the previously saved bundle, not recomputed.

        Returns:
            On baseline failure, a dict with ``ok=False`` and the error; nothing
            is persisted on that path. On success, the updated bundle is
            persisted and returned with ``ok=True``.

        Raises:
            EvaluationBadRequestError: missing id or fixture, empty transcripts,
                no saved bundle, missing/invalid saved replay score, or no
                judge configured.
            EvaluationNotFoundError: conversation or fixture not found.
        """
        cid = (conversation_id or "").strip()
        if not cid:
            raise EvaluationBadRequestError("conversation_id is required")
        catalog = SessionCatalogService(self._db)
        dialogue = await catalog.get_session_dialogue(cid)
        if not dialogue:
            raise EvaluationNotFoundError("conversation not found")
        replay_transcript = format_session_messages_with_turn_labels(
            list(dialogue.messages)
        )
        if not replay_transcript.strip():
            raise EvaluationBadRequestError("no messages to judge")

        fn = (fixture_filename or "").strip() or None
        if not fn:
            raise EvaluationBadRequestError(
                "请选择基线 MD(fixture_filename)后再重试基准分"
            )
        try:
            turns, _ = read_user_export_fixture(fn)
            export_turns = list(turns)
            baseline_transcript = format_export_turns_with_labels(turns)
        except ValueError as e:
            raise EvaluationBadRequestError(str(e)) from e
        except FileNotFoundError:
            raise EvaluationNotFoundError("fixture not found") from None
        if not baseline_transcript.strip():
            raise EvaluationBadRequestError("baseline transcript is empty")

        prev = await catalog.get_playground_conversation_judge_json(cid)
        if not prev or not isinstance(prev, dict):
            raise EvaluationBadRequestError(
                "服务端没有已保存的评分草稿:请先跑一次「自动评分(流式)」"
                "直到回放侧打分完成,再使用本重试。"
            )
        raw_replay = prev.get("replay_judge")
        if not raw_replay or not isinstance(raw_replay, dict):
            raise EvaluationBadRequestError(
                "已保存结果中缺少回放侧整体分:请先完成流式评分中的回放打分阶段再重试基准。"
            )
        try:
            replay_model = ConversationJudgeOutput.model_validate(raw_replay)
        except Exception as e:
            raise EvaluationBadRequestError(
                "已保存的回放评分格式无效,请重新跑一次完整流式评分。"
            ) from e

        judge, resolved_model = _make_eval_judge(judge_provider, judge_model)
        if not judge:
            raise EvaluationBadRequestError(_JUDGE_CONFIG_HINT)
        # NOTE(review): reads the judge's private ``_ctx_tokens`` attribute.
        _rt_cmp_total, _rt_cmp_per = eval_judge_compare_bundle_caps(judge._ctx_tokens)

        baseline_result = await judge.judge_conversation_result(
            full_transcript=baseline_transcript
        )
        if not baseline_result.output:
            err = baseline_result.error or "unknown error"
            msg = f"基准整体打分失败:{err}"
            # Replace any stale baseline errors with the fresh one.
            errs = _strip_baseline_judge_errors(list(prev.get("errors") or []))
            errs.append(msg)
            return {
                "ok": False,
                "error": err,
                "message": msg,
                "baseline_judge": None,
                "replay_judge": raw_replay,
                "compare_summary": build_conversation_compare_summary(
                    baseline_judge=None,
                    replay_judge=replay_model,
                    baseline_transcript=baseline_transcript,
                    replay_transcript=replay_transcript,
                    conv_cap=eval_judge_conversation_transcript_max_chars_for_context(
                        judge._ctx_tokens
                    ),
                    compare_cap_total=_rt_cmp_total,
                    compare_per_side_cap=_rt_cmp_per,
                    fixture_filename=fn,
                ),
                "compare_markdown": "",
                "baseline_turn_judges": {},
                "errors": errs,
            }

        baseline_judge = baseline_result.output
        # Work on a copy so the previously loaded bundle is left untouched.
        acc: dict[str, Any] = copy.deepcopy(prev)
        acc.setdefault("version", 1)
        acc["baseline_judge"] = baseline_judge.model_dump()
        acc["fixture_filename"] = fn
        acc["errors"] = _strip_baseline_judge_errors(list(acc.get("errors") or []))
        opts = acc.setdefault("options", {})
        if isinstance(opts, dict):
            opts["judge_provider"] = judge_provider
            opts["judge_model"] = resolved_model

        if include_baseline_turn_judges and export_turns:
            acc["baseline_turn_judges"] = {}
            async for row in _iter_turn_judgments_for_turns(
                judge,
                export_turns,
                sse_event="baseline_turn_judge",
            ):
                idx = row.get("turn_index")
                # Only successful per-turn judgments are recorded here.
                if isinstance(idx, (int, float)) and row.get("judge") is not None:
                    acc["baseline_turn_judges"][str(int(idx))] = row["judge"]

        acc["compare_markdown"] = ""
        acc["compare_summary"] = build_conversation_compare_summary(
            baseline_judge=baseline_judge,
            replay_judge=replay_model,
            baseline_transcript=baseline_transcript,
            replay_transcript=replay_transcript,
            conv_cap=eval_judge_conversation_transcript_max_chars_for_context(
                judge._ctx_tokens
            ),
            compare_cap_total=_rt_cmp_total,
            compare_per_side_cap=_rt_cmp_per,
            fixture_filename=fn,
        )
        async for piece in judge.stream_conversation_compare(
            baseline_transcript=baseline_transcript,
            replay_transcript=replay_transcript,
            baseline_judge=baseline_judge,
            replay_judge=replay_model,
        ):
            if piece:
                acc["compare_markdown"] += piece

        acc["judged_at"] = datetime.now(timezone.utc).isoformat()
        await self._persist_playground_conversation_judge(cid, acc)
        return {
            "ok": True,
            "error": None,
            "message": None,
            "baseline_judge": acc["baseline_judge"],
            "replay_judge": acc.get("replay_judge"),
            "compare_summary": acc.get("compare_summary"),
            "compare_markdown": acc.get("compare_markdown") or "",
            "baseline_turn_judges": acc.get("baseline_turn_judges") or {},
            "errors": acc["errors"],
        }

    async def judge_memoir_for_user(
        self,
        user_id: str,
        baseline_sections: list[MemoirSectionBaselineOut] | None,
        *,
        judge_provider: EvalJudgeProvider = "zhipu",
        judge_model: str | None = None,
    ) -> dict[str, Any]:
        """Judge a user's active memoir chapters and stories one by one.

        Chapters and stories with an empty body are skipped. Each of the two
        loops is best-effort: an exception is logged as a warning and ends
        that loop, keeping whatever results were collected before it.

        Returns:
            Dict with ``user_id``, ``chapter_results`` and ``story_results``.

        Raises:
            EvaluationBadRequestError: missing ``user_id`` or no judge configured.
        """
        uid = (user_id or "").strip()
        if not uid:
            raise EvaluationBadRequestError("user_id is required")
        judge, _resolved = _make_eval_judge(judge_provider, judge_model)
        if not judge:
            raise EvaluationBadRequestError(_JUDGE_CONFIG_HINT)
        baselines = list(baseline_sections or [])
        trace_svc = EvalTraceService(self._db)

        def _chapter_evidence_notes(
            lineage_tier: str,
            evidence_summary: str,
            truncated: bool,
            dropped: list[str],
        ) -> str:
            # Builds the evidence-notes string passed to the judge; at most the
            # first 12 dropped section names are listed.
            drops = ",".join(dropped[:12]) if dropped else ""
            return (
                "严格按文档打分;真实性、事实覆盖率、可追溯性以本章节绑定的证据闭包为准。"
                f" lineage_tier={lineage_tier};evidence_summary={evidence_summary};"
                f" prompt_truncated={truncated};dropped_sections={drops or 'none'}"
            )

        chapter_results: list[dict[str, Any]] = []
        try:
            chapters = await get_chapters_for_memoir_list(
                uid, self._db, active_only=True, is_new_only=None
            )
            for i, ch in enumerate(chapters[:_MAX_EVAL_CHAPTERS]):
                body = (ch.canonical_markdown or "").strip()
                if not body:
                    continue
                bl = _baseline_for_chapter_title(baselines, str(ch.title or ""), i)
                baseline_excerpt = ""
                if bl and (bl.body or "").strip():
                    baseline_excerpt = _clip_md_for_judge(bl.body, max_chars=6000)
                md = f"# 章节:{ch.title}\n\n"
                if baseline_excerpt:
                    md += f"## 导出基线(节选)\n\n{baseline_excerpt}\n\n"
                md += f"## 当前成稿\n\n{_clip_md_for_judge(body)}"
                cb = await trace_svc.build_chapter_bundle(uid, ch)
                formatted, cb2 = await trace_svc.format_chapter_bundle(cb)
                fm = formatted.format_meta
                cj = await judge.judge_memoir(
                    memoir_markdown=md,
                    source_transcript=formatted.source_transcript,
                    structured_evidence=formatted.structured_evidence,
                    reference_memoir_markdown=baseline_excerpt,
                    evidence_notes=_chapter_evidence_notes(
                        cb2.lineage_tier,
                        formatted.evidence_summary,
                        fm.truncated,
                        fm.dropped_sections,
                    ),
                )
                chapter_results.append(
                    {
                        "id": ch.id,
                        "title": ch.title,
                        "order_index": ch.order_index,
                        "baseline_title": bl.title if bl else None,
                        "lineage_tier": cb2.lineage_tier,
                        "evidence_summary": formatted.evidence_summary,
                        "evidence_trace": cb2.model_dump(),
                        "format_meta": fm.model_dump(),
                        "judge": cj.model_dump() if cj else None,
                    }
                )
        except Exception as e:
            logger.warning("manual memoir chapter judges failed: {}", e)

        story_results: list[dict[str, Any]] = []
        try:
            stories = await get_stories_for_user(self._db, uid, status="active")
            for st in stories[:_MAX_EVAL_STORIES]:
                body = (st.canonical_markdown or "").strip()
                if not body:
                    continue
                md = f"# 故事:{st.title}\n\n{_clip_md_for_judge(body)}"
                sb = await trace_svc.build_story_bundle(uid, str(st.id))
                formatted, sb2 = await trace_svc.format_story_bundle(sb)
                fm = formatted.format_meta
                sj = await judge.judge_memoir(
                    memoir_markdown=md,
                    source_transcript=formatted.source_transcript,
                    structured_evidence=formatted.structured_evidence,
                    evidence_notes=_chapter_evidence_notes(
                        sb2.lineage_tier,
                        formatted.evidence_summary,
                        fm.truncated,
                        fm.dropped_sections,
                    ),
                )
                story_results.append(
                    {
                        "id": st.id,
                        "title": st.title,
                        "stage": st.stage,
                        "lineage_tier": sb2.lineage_tier,
                        "evidence_summary": formatted.evidence_summary,
                        "evidence_trace": sb2.model_dump(),
                        "format_meta": fm.model_dump(),
                        "judge": sj.model_dump() if sj else None,
                    }
                )
        except Exception as e:
            logger.warning("manual memoir story judges failed: {}", e)

        return {
            "user_id": uid,
            "chapter_results": chapter_results,
            "story_results": story_results,
        }

    async def memoir_snapshot(self, user_id: str) -> dict[str, Any]:
        """Return the user's active chapters and stories without judging them.

        Each of the two loads is best-effort: an exception is logged as a
        warning and that list keeps whatever was collected before it.

        Raises:
            EvaluationBadRequestError: missing ``user_id``.
        """
        uid = (user_id or "").strip()
        if not uid:
            raise EvaluationBadRequestError("user_id is required")
        chapters_out: list[dict[str, Any]] = []
        stories_out: list[dict[str, Any]] = []
        try:
            chapters = await get_chapters_for_memoir_list(
                uid, self._db, active_only=True, is_new_only=None
            )
            for ch in chapters[:_MAX_EVAL_CHAPTERS]:
                chapters_out.append(
                    {
                        "id": ch.id,
                        "title": ch.title,
                        "category": ch.category,
                        "order_index": ch.order_index,
                        "canonical_markdown": ch.canonical_markdown,
                    }
                )
        except Exception as e:
            logger.warning("memoir snapshot chapters failed: {}", e)
        try:
            stories = await get_stories_for_user(self._db, uid, status="active")
            for st in stories[:_MAX_EVAL_STORIES]:
                stories_out.append(
                    {
                        "id": st.id,
                        "title": st.title,
                        "stage": st.stage,
                        "canonical_markdown": st.canonical_markdown,
                    }
                )
        except Exception as e:
            logger.warning("memoir snapshot stories failed: {}", e)
        return {
            "user_id": uid,
            "chapters": chapters_out,
            "stories": stories_out,
        }