Files
life-echo/api/app/features/evaluation/judge_manual_service.py
Kevin 78b61c076e feat(eval): Playground GLM 评分落库并可恢复
在 conversations 表增加 playground_conversation_judge_json,流式/非流式对话评审结束后写入最近一次快照(整体分、逐轮分、对比文案、错误与基线文件名等)。新增只读 GET 供前端按会话拉取;评测台 Playground 切换会话时自动恢复,并提示基线是否和当时一致。
2026-04-08 16:51:08 +08:00

571 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""手动触发 GLM-5 评审(不写 eval_runsPlayground 对话评分写入 conversations 表)。"""
from __future__ import annotations
import re
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.dependencies import get_eval_judge_langchain_llm
from app.core.logging import get_logger
from app.features.conversation import repo as conversation_repo
from app.features.evaluation.errors import (
EvaluationBadRequestError,
EvaluationNotFoundError,
)
from app.features.evaluation.eval_trace_service import EvalTraceService
from app.features.evaluation.judge_service import EvalJudgeService
from app.features.evaluation.schemas import MemoirSectionBaselineOut
from app.features.evaluation.session_catalog_service import SessionCatalogService
from app.features.evaluation.transcript_for_judge import (
assistant_text_for_eval_display,
format_eval_turn_block,
format_export_turns_with_labels,
format_session_messages_with_turn_labels,
pair_session_messages_to_turns,
)
from app.features.evaluation.user_export_fixtures import read_user_export_fixture
from app.features.memoir.repo import get_chapters_for_memoir_list
from app.features.story.repo import get_stories_for_user
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Default character cap used by `_clip_md_for_judge` before markdown is sent to the judge.
_MAX_JUDGE_MARKDOWN_CHARS = 20_000
# At most this many chapters (from the front of the list) are judged / snapshotted per user.
_MAX_EVAL_CHAPTERS = 30
# At most this many stories (from the front of the list) are judged / snapshotted per user.
_MAX_EVAL_STORIES = 40
# Per-turn judging keeps only the trailing this-many characters of the prior transcript.
_PRIOR_TRANSCRIPT_MAX_CHARS = 8000
async def _iter_turn_judgments_for_turns(
judge: EvalJudgeService,
turns: list[tuple[str, str]],
*,
sse_event: str,
) -> AsyncIterator[dict[str, Any]]:
"""与 `execute_eval_run` 相同的逐轮 prior 截断与块累积。"""
prior_blocks: list[str] = []
for idx, (u_raw, ai_raw) in enumerate(turns):
u = (u_raw or "").strip()
reply = assistant_text_for_eval_display(str(ai_raw))
prior = "\n\n".join(prior_blocks)
if len(prior) > _PRIOR_TRANSCRIPT_MAX_CHARS:
prior = prior[-_PRIOR_TRANSCRIPT_MAX_CHARS:]
tj = await judge.judge_turn(
prior_transcript=prior,
user_utterance=u,
assistant_reply=reply,
turn_index_0=idx,
)
yield {
"event": sse_event,
"turn_index": idx,
"ok": tj is not None,
"judge": tj.model_dump() if tj else None,
}
prior_blocks.append(format_eval_turn_block(idx, u, reply))
def _clip_md_for_judge(text: str, max_chars: int = _MAX_JUDGE_MARKDOWN_CHARS) -> str:
    """Strip ``text`` and cap it at ``max_chars`` characters.

    Text within the limit is returned stripped; longer text is cut to its
    first ``max_chars`` characters and followed by a truncation marker.
    A falsy ``text`` is treated as the empty string.
    """
    cleaned = (text or "").strip()
    if len(cleaned) > max_chars:
        return cleaned[:max_chars] + "\n\n…(已截断供评审)"
    return cleaned
async def _conversation_transcript_for_manual(
    db: AsyncSession, conversation_id: str
) -> str:
    """Load a conversation's messages and format them with turn labels."""
    return format_session_messages_with_turn_labels(
        await conversation_repo.get_conversation_messages(conversation_id, db)
    )
def _normalize_title_key(title: str) -> str:
t = (title or "").strip().lower()
t = re.sub(r"^#+\s*", "", t)
return re.sub(r"\s+", " ", t)
def _baseline_for_chapter_title(
baselines: list[MemoirSectionBaselineOut],
chapter_title: str,
index: int,
) -> MemoirSectionBaselineOut | None:
if baselines:
key = _normalize_title_key(chapter_title)
for b in baselines:
if _normalize_title_key(b.title) == key:
return b
if 0 <= index < len(baselines):
return baselines[index]
return None
class EvalJudgeManualService:
    """On-demand LLM judging for conversations and memoir content.

    The conversation methods (blocking and SSE) store their latest result
    snapshot via ``conversation_repo.set_playground_conversation_judge_json``.
    The memoir methods only read data and return plain dicts.
    """

    def __init__(self, db: AsyncSession) -> None:
        self._db = db

    @staticmethod
    def _chapter_evidence_notes(
        lineage_tier: str,
        evidence_summary: str,
        truncated: bool,
        dropped: list[str],
    ) -> str:
        """Build the ``evidence_notes`` text passed to ``judge_memoir``.

        Fields are separated by ``"; "`` so adjacent values cannot run
        together. At most the first 12 dropped section names are listed,
        and ``none`` is written when there are none.
        """
        drops = ",".join(dropped[:12]) if dropped else ""
        return (
            "严格按文档打分;真实性、事实覆盖率、可追溯性以本章节绑定的证据闭包为准。"
            f" lineage_tier={lineage_tier}; evidence_summary={evidence_summary};"
            f" prompt_truncated={truncated}; dropped_sections={drops or 'none'}"
        )

    @staticmethod
    def _record_turn_judge(
        store: dict[str, Any], row: dict[str, Any], expected_event: str
    ) -> None:
        """Store ``row["judge"]`` in ``store`` under the stringified turn index.

        Rows whose ``event`` differs from ``expected_event`` or whose
        ``turn_index`` is not numeric are ignored.
        """
        if row.get("event") != expected_event:
            return
        idx = row.get("turn_index")
        if isinstance(idx, (int, float)):
            store[str(int(idx))] = row.get("judge")

    async def _persist_playground_conversation_judge(
        self, conversation_id: str, bundle: dict[str, Any]
    ) -> None:
        """Best-effort save of the judge snapshot for a conversation.

        Commits only when the repo call returns a row. Failures are logged
        and never raised to the caller. The session is rolled back after a
        failure so it stays usable for whoever uses it next.
        """
        try:
            row = await conversation_repo.set_playground_conversation_judge_json(
                conversation_id, self._db, bundle
            )
            if row is not None:
                await self._db.commit()
        except Exception:
            logger.exception(
                "persist playground_conversation_judge_json failed conversation_id={}",
                conversation_id,
            )
            try:
                # A failed flush/commit leaves the transaction unusable
                # until it is rolled back.
                await self._db.rollback()
            except Exception:
                logger.exception(
                    "rollback after failed playground judge persist failed conversation_id={}",
                    conversation_id,
                )

    async def judge_conversation(
        self,
        conversation_id: str,
        fixture_filename: str | None,
    ) -> dict[str, Any]:
        """Judge a stored conversation, optionally alongside a baseline fixture.

        Args:
            conversation_id: Conversation to judge; surrounding whitespace is ignored.
            fixture_filename: Optional baseline export fixture; blank means none.

        Returns:
            A dict with the ids, both transcripts, both judge dumps (``None``
            on failure) and the list of error strings.

        Raises:
            EvaluationBadRequestError: Blank id, no messages to judge, or a
                ``ValueError`` while reading the fixture.
            EvaluationNotFoundError: Conversation or fixture not found.
        """
        cid = (conversation_id or "").strip()
        if not cid:
            raise EvaluationBadRequestError("conversation_id is required")
        catalog = SessionCatalogService(self._db)
        dialogue = await catalog.get_session_dialogue(cid)
        if not dialogue:
            raise EvaluationNotFoundError("conversation not found")
        replay_transcript = format_session_messages_with_turn_labels(
            list(dialogue.messages)
        )
        if not replay_transcript.strip():
            raise EvaluationBadRequestError("no messages to judge")

        fn = (fixture_filename or "").strip() or None
        baseline_transcript = ""
        if fn:
            try:
                turns, _ = read_user_export_fixture(fn)
                baseline_transcript = format_export_turns_with_labels(turns)
            except ValueError as e:
                raise EvaluationBadRequestError(str(e)) from e
            except FileNotFoundError as e:
                raise EvaluationNotFoundError("fixture not found") from e

        errors: list[str] = []
        # NOTE(review): unlike the SSE path, a missing judge LLM is not
        # rejected here; presumably EvalJudgeService reports it through the
        # result's ``error`` field. Confirm before adding an explicit check.
        judge_llm = get_eval_judge_langchain_llm()
        judge = EvalJudgeService(judge_llm)

        baseline_judge_dict: dict[str, Any] | None = None
        if baseline_transcript.strip():
            baseline_result = await judge.judge_conversation_result(
                full_transcript=baseline_transcript
            )
            bj = baseline_result.output
            if bj:
                baseline_judge_dict = bj.model_dump()
            else:
                errors.append(
                    f"baseline_glm5_failed: {baseline_result.error or 'unknown error'}"
                )
        elif fn:
            # A fixture was named but produced no text to judge.
            errors.append("baseline_transcript_empty")

        replay_result = await judge.judge_conversation_result(
            full_transcript=replay_transcript
        )
        rj = replay_result.output
        replay_judge_dict = rj.model_dump() if rj else None
        if not rj:
            errors.append(
                f"replay_glm5_failed: {replay_result.error or 'unknown error'}"
            )

        # Same snapshot shape as the SSE path; turn judges and the compare
        # text are never produced by this blocking variant.
        bundle: dict[str, Any] = {
            "version": 1,
            "judged_at": datetime.now(timezone.utc).isoformat(),
            "fixture_filename": fn,
            "baseline_judge": baseline_judge_dict,
            "replay_judge": replay_judge_dict,
            "baseline_turn_judges": {},
            "replay_turn_judges": {},
            "compare_markdown": "",
            "errors": list(errors),
            "warnings": [],
            "options": {
                "include_turn_judges": False,
                "include_baseline_turn_judges": False,
            },
        }
        await self._persist_playground_conversation_judge(cid, bundle)
        return {
            "conversation_id": cid,
            "fixture_filename": fn,
            "baseline_transcript": baseline_transcript,
            "replay_transcript": replay_transcript,
            "baseline_judge": baseline_judge_dict,
            "replay_judge": replay_judge_dict,
            "errors": errors,
        }

    async def iter_conversation_judge_sse(
        self,
        conversation_id: str,
        fixture_filename: str | None,
        *,
        include_turn_judges: bool = False,
        include_baseline_turn_judges: bool = False,
    ) -> AsyncIterator[dict[str, Any]]:
        """Stream judge events for a conversation.

        Order of work: overall baseline score (when a baseline has text),
        optional per-turn baseline scores, overall replay score, optional
        per-turn replay scores, then streamed compare text and a final
        ``done`` event.

        Validation, load, fixture and config failures yield one ``error``
        event and stop without persisting. Once judging has started, the
        accumulated snapshot is persisted in a ``finally`` block, so it is
        saved whether the stream completes, fails or is closed early.

        Args:
            conversation_id: Conversation to judge; surrounding whitespace is ignored.
            fixture_filename: Optional baseline export fixture; blank means none.
            include_turn_judges: Also judge each replay turn.
            include_baseline_turn_judges: Also judge each baseline turn
                (only after the overall baseline score succeeded).

        Yields:
            Event dicts keyed by ``"event"``.
        """
        acc: dict[str, Any] = {
            "version": 1,
            "fixture_filename": None,
            "baseline_judge": None,
            "replay_judge": None,
            "baseline_turn_judges": {},
            "replay_turn_judges": {},
            "compare_markdown": "",
            "errors": [],
            "warnings": [],
            "options": {
                "include_turn_judges": include_turn_judges,
                "include_baseline_turn_judges": include_baseline_turn_judges,
            },
        }
        cid = (conversation_id or "").strip()
        if not cid:
            yield {
                "event": "error",
                "phase": "validate",
                "message": "conversation_id is required",
            }
            return
        catalog = SessionCatalogService(self._db)
        dialogue = await catalog.get_session_dialogue(cid)
        if not dialogue:
            yield {
                "event": "error",
                "phase": "load",
                "message": "conversation not found",
            }
            return
        replay_transcript = format_session_messages_with_turn_labels(
            list(dialogue.messages)
        )
        if not replay_transcript.strip():
            yield {"event": "error", "phase": "load", "message": "no messages to judge"}
            return

        fn = (fixture_filename or "").strip() or None
        baseline_transcript = ""
        export_turns: list[tuple[str, str]] | None = None
        if fn:
            try:
                turns, _ = read_user_export_fixture(fn)
                export_turns = list(turns)
                baseline_transcript = format_export_turns_with_labels(turns)
            except ValueError as e:
                yield {"event": "error", "phase": "fixture", "message": str(e)}
                return
            except FileNotFoundError:
                yield {
                    "event": "error",
                    "phase": "fixture",
                    "message": "fixture not found",
                }
                return

        judge_llm = get_eval_judge_langchain_llm()
        if not judge_llm:
            yield {
                "event": "error",
                "phase": "config",
                "message": "评审 LLM 未配置eval_judge_api_key / zhipu_api_key",
            }
            return

        acc["fixture_filename"] = fn
        has_baseline = bool(baseline_transcript.strip())
        try:
            judge = EvalJudgeService(judge_llm)
            yield {"event": "meta", "conversation_id": cid, "fixture_filename": fn}
            if not has_baseline:
                wmsg = "未提供基准 MD 或基准无文本:仅对回放对话打分并输出单侧改进建议"
                acc["warnings"].append(wmsg)
                yield {"event": "warning", "message": wmsg}

            baseline_judge = None
            if has_baseline:
                baseline_result = await judge.judge_conversation_result(
                    full_transcript=baseline_transcript
                )
                baseline_judge = baseline_result.output
                acc["baseline_judge"] = (
                    baseline_judge.model_dump() if baseline_judge else None
                )
                yield {
                    "event": "baseline_judge",
                    "ok": baseline_judge is not None,
                    "judge": acc["baseline_judge"],
                }
                if not baseline_judge:
                    err = (
                        f"基准整体打分失败:{baseline_result.error}"
                        if baseline_result.error
                        else "基准整体打分失败(密钥、限流或 JSON 解析失败,见服务端日志)"
                    )
                    acc["errors"].append(err)
                    # A baseline failure is reported, but the replay is still judged.
                    yield {
                        "event": "error",
                        "phase": "baseline_glm5",
                        "message": err,
                    }
                elif (
                    include_baseline_turn_judges
                    and export_turns
                    and baseline_judge is not None
                ):
                    yield {"event": "meta", "phase": "baseline_turn_judges_start"}
                    async for row in _iter_turn_judgments_for_turns(
                        judge,
                        export_turns,
                        sse_event="baseline_turn_judge",
                    ):
                        self._record_turn_judge(
                            acc["baseline_turn_judges"], row, "baseline_turn_judge"
                        )
                        yield row
            else:
                acc["baseline_judge"] = None
                yield {
                    "event": "baseline_judge",
                    "ok": False,
                    "skipped": True,
                    "judge": None,
                }

            replay_result = await judge.judge_conversation_result(
                full_transcript=replay_transcript
            )
            replay_judge = replay_result.output
            acc["replay_judge"] = (
                replay_judge.model_dump() if replay_judge else None
            )
            yield {
                "event": "replay_judge",
                "ok": replay_judge is not None,
                "judge": acc["replay_judge"],
            }
            if not replay_judge:
                err = (
                    f"回放对话整体 GLM-5 打分失败:{replay_result.error}"
                    if replay_result.error
                    else "回放对话整体 GLM-5 打分失败(空密钥、限流或 JSON 解析失败,见服务端日志)"
                )
                acc["errors"].append(err)
                yield {
                    "event": "error",
                    "phase": "replay_glm5",
                    "message": err,
                }
                # Without a replay score there is nothing to compare.
                yield {"event": "done"}
                return

            if include_turn_judges:
                replay_pairs = pair_session_messages_to_turns(list(dialogue.messages))
                if replay_pairs:
                    yield {"event": "meta", "phase": "replay_turn_judges_start"}
                    async for row in _iter_turn_judgments_for_turns(
                        judge,
                        replay_pairs,
                        sse_event="replay_turn_judge",
                    ):
                        self._record_turn_judge(
                            acc["replay_turn_judges"], row, "replay_turn_judge"
                        )
                        yield row

            async for piece in judge.stream_conversation_compare(
                baseline_transcript=baseline_transcript,
                replay_transcript=replay_transcript,
                baseline_judge=baseline_judge,
                replay_judge=replay_judge,
            ):
                if piece:
                    acc["compare_markdown"] += piece
                    yield {"event": "compare_delta", "text": piece}
            yield {"event": "done"}
        finally:
            # Runs on normal completion, on errors and on early close, so
            # the most recent (possibly partial) snapshot is what gets stored.
            acc["judged_at"] = datetime.now(timezone.utc).isoformat()
            await self._persist_playground_conversation_judge(cid, acc)

    async def judge_memoir_for_user(
        self,
        user_id: str,
        baseline_sections: list[MemoirSectionBaselineOut] | None,
    ) -> dict[str, Any]:
        """Judge a user's active chapters and stories against their evidence.

        Chapters and stories are processed in two independent groups. An
        exception inside a group is logged as a warning and ends that group,
        keeping the results gathered so far. Items with an empty body are
        skipped.

        Args:
            user_id: Owner of the memoir; surrounding whitespace is ignored.
            baseline_sections: Optional exported baseline sections, matched
                to chapters by title and then by position.

        Returns:
            ``{"user_id", "chapter_results", "story_results"}``.

        Raises:
            EvaluationBadRequestError: ``user_id`` is blank.
        """
        uid = (user_id or "").strip()
        if not uid:
            raise EvaluationBadRequestError("user_id is required")
        judge_llm = get_eval_judge_langchain_llm()
        judge = EvalJudgeService(judge_llm)
        baselines = list(baseline_sections or [])
        trace_svc = EvalTraceService(self._db)

        chapter_results: list[dict[str, Any]] = []
        try:
            chapters = await get_chapters_for_memoir_list(
                uid, self._db, active_only=True, is_new_only=None
            )
            for i, ch in enumerate(chapters[:_MAX_EVAL_CHAPTERS]):
                body = (ch.canonical_markdown or "").strip()
                if not body:
                    continue
                bl = _baseline_for_chapter_title(baselines, str(ch.title or ""), i)
                baseline_excerpt = ""
                if bl and (bl.body or "").strip():
                    baseline_excerpt = _clip_md_for_judge(bl.body, max_chars=6000)
                md = f"# 章节:{ch.title}\n\n"
                if baseline_excerpt:
                    md += f"## 导出基线(节选)\n\n{baseline_excerpt}\n\n"
                md += f"## 当前成稿\n\n{_clip_md_for_judge(body)}"
                cb = await trace_svc.build_chapter_bundle(uid, ch)
                formatted, cb2 = await trace_svc.format_chapter_bundle(cb)
                fm = formatted.format_meta
                cj = await judge.judge_memoir(
                    memoir_markdown=md,
                    source_transcript=formatted.source_transcript,
                    structured_evidence=formatted.structured_evidence,
                    reference_memoir_markdown=baseline_excerpt,
                    evidence_notes=self._chapter_evidence_notes(
                        cb2.lineage_tier,
                        formatted.evidence_summary,
                        fm.truncated,
                        fm.dropped_sections,
                    ),
                )
                chapter_results.append(
                    {
                        "id": ch.id,
                        "title": ch.title,
                        "order_index": ch.order_index,
                        "baseline_title": bl.title if bl else None,
                        "lineage_tier": cb2.lineage_tier,
                        "evidence_summary": formatted.evidence_summary,
                        "evidence_trace": cb2.model_dump(),
                        "format_meta": fm.model_dump(),
                        "judge": cj.model_dump() if cj else None,
                    }
                )
        except Exception as e:
            # Best-effort: keep the chapters judged so far.
            logger.warning("manual memoir chapter judges failed: {}", e)

        story_results: list[dict[str, Any]] = []
        try:
            stories = await get_stories_for_user(self._db, uid, status="active")
            for st in stories[:_MAX_EVAL_STORIES]:
                body = (st.canonical_markdown or "").strip()
                if not body:
                    continue
                md = f"# 故事:{st.title}\n\n{_clip_md_for_judge(body)}"
                sb = await trace_svc.build_story_bundle(uid, str(st.id))
                formatted, sb2 = await trace_svc.format_story_bundle(sb)
                fm = formatted.format_meta
                sj = await judge.judge_memoir(
                    memoir_markdown=md,
                    source_transcript=formatted.source_transcript,
                    structured_evidence=formatted.structured_evidence,
                    evidence_notes=self._chapter_evidence_notes(
                        sb2.lineage_tier,
                        formatted.evidence_summary,
                        fm.truncated,
                        fm.dropped_sections,
                    ),
                )
                story_results.append(
                    {
                        "id": st.id,
                        "title": st.title,
                        "stage": st.stage,
                        "lineage_tier": sb2.lineage_tier,
                        "evidence_summary": formatted.evidence_summary,
                        "evidence_trace": sb2.model_dump(),
                        "format_meta": fm.model_dump(),
                        "judge": sj.model_dump() if sj else None,
                    }
                )
        except Exception as e:
            # Best-effort: keep the stories judged so far.
            logger.warning("manual memoir story judges failed: {}", e)

        return {
            "user_id": uid,
            "chapter_results": chapter_results,
            "story_results": story_results,
        }

    async def memoir_snapshot(self, user_id: str) -> dict[str, Any]:
        """Return the user's active chapters and stories as plain dicts.

        Chapters and stories are loaded independently. A failure in either
        load is logged as a warning and leaves that list with whatever was
        collected before the failure.

        Raises:
            EvaluationBadRequestError: ``user_id`` is blank.
        """
        uid = (user_id or "").strip()
        if not uid:
            raise EvaluationBadRequestError("user_id is required")
        chapters_out: list[dict[str, Any]] = []
        stories_out: list[dict[str, Any]] = []
        try:
            chapters = await get_chapters_for_memoir_list(
                uid, self._db, active_only=True, is_new_only=None
            )
            for ch in chapters[:_MAX_EVAL_CHAPTERS]:
                chapters_out.append(
                    {
                        "id": ch.id,
                        "title": ch.title,
                        "category": ch.category,
                        "order_index": ch.order_index,
                        "canonical_markdown": ch.canonical_markdown,
                    }
                )
        except Exception as e:
            logger.warning("memoir snapshot chapters failed: {}", e)
        try:
            stories = await get_stories_for_user(self._db, uid, status="active")
            for st in stories[:_MAX_EVAL_STORIES]:
                stories_out.append(
                    {
                        "id": st.id,
                        "title": st.title,
                        "stage": st.stage,
                        "canonical_markdown": st.canonical_markdown,
                    }
                )
        except Exception as e:
            logger.warning("memoir snapshot stories failed: {}", e)
        return {
            "user_id": uid,
            "chapters": chapters_out,
            "stories": stories_out,
        }