Files
life-echo/api/app/features/evaluation/judge_service.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

526 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""评测台评审:智谱 / DeepSeek 等 OpenAI 兼容端点(结构化 JSON"""
from __future__ import annotations
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
from app.core.config import settings
from app.core.eval_judge_spec import EvalJudgeProvider
from app.core.llm_call import LLMCallError, allm_json_call
from app.core.logging import get_logger
from app.features.evaluation.judge_schemas import (
ConversationJudgeOutput,
MemoirJudgeOutput,
TurnJudgeOutput,
)
from app.features.evaluation.rubrics.conversation_v1 import (
COMPARE_CONV_STREAM_HINT,
CONV_JUDGE_INSTRUCTIONS,
TURN_JUDGE_INSTRUCTIONS,
)
from app.features.evaluation.rubrics.memoir_v1 import MEMOIR_JUDGE_INSTRUCTIONS
from app.features.evaluation.constants import eval_cfg
from app.features.memoir.constants import memoir
logger = get_logger(__name__)
TJudgeOutput = TypeVar(
"TJudgeOutput", TurnJudgeOutput, ConversationJudgeOutput, MemoirJudgeOutput
)
_TURN_MAX = 768
_CONV_JUDGE_JSON_MAX = 2048
_CONV_HEADER = "【完整对话】(每轮以 `[Turn k]` 开头)\n\n"
_COMPARE_STREAM_MAX = 6144
def _eval_judge_prompt_char_pool_for_context(context_window_tokens: int) -> int:
"""整段请求的字符预算(由评审模型 context window 推导,保守)。"""
toks = (
int(context_window_tokens)
- eval_cfg.judge_completion_reserve_tokens
- eval_cfg.judge_prompt_budget_safety_tokens
)
toks = max(1, toks)
return max(1, int(toks / eval_cfg.judge_approx_tokens_per_char))
def _eval_judge_prompt_char_pool() -> int:
return _eval_judge_prompt_char_pool_for_context(
eval_cfg.judge_context_window_tokens
)
def eval_judge_conversation_transcript_max_chars() -> int:
"""整段对话评审【完整对话】transcript 最大字符数(默认 GLM 上下文)。"""
if eval_cfg.judge_max_transcript_chars > 0:
return eval_cfg.judge_max_transcript_chars
overhead = len(CONV_JUDGE_INSTRUCTIONS) + len(_CONV_HEADER) + 32
return max(1, _eval_judge_prompt_char_pool() - overhead)
def eval_judge_conversation_transcript_max_chars_for_context(
context_window_tokens: int,
) -> int:
if eval_cfg.judge_max_transcript_chars > 0:
return eval_cfg.judge_max_transcript_chars
overhead = len(CONV_JUDGE_INSTRUCTIONS) + len(_CONV_HEADER) + 32
pool = _eval_judge_prompt_char_pool_for_context(context_window_tokens)
return max(1, pool - overhead)
def eval_judge_turn_prior_transcript_max_chars() -> int:
"""逐轮评审:截至上一轮的 transcript 节选上限(默认 GLM 上下文)。"""
if eval_cfg.judge_max_transcript_chars > 0:
return eval_cfg.judge_max_transcript_chars
static = len(TURN_JUDGE_INSTRUCTIONS) + 8800
return max(1, _eval_judge_prompt_char_pool() - static)
def eval_judge_turn_prior_transcript_max_chars_for_context(
context_window_tokens: int,
) -> int:
if eval_cfg.judge_max_transcript_chars > 0:
return eval_cfg.judge_max_transcript_chars
static = len(TURN_JUDGE_INSTRUCTIONS) + 8800
pool = _eval_judge_prompt_char_pool_for_context(context_window_tokens)
return max(1, pool - static)
def eval_judge_compare_transcript_each_max_chars() -> int:
"""单侧对称参考上限(默认与 eval_cfg.judge_context_window_tokens 一致)。"""
return eval_judge_compare_transcript_each_max_chars_for_context(
eval_cfg.judge_context_window_tokens
)
def eval_judge_compare_transcript_pair_total_budget_for_context(
context_window_tokens: int,
) -> int:
"""A/B 同 prompt 时,两份 transcript 合计最大字符数(已扣对比模板与双份 JSON 等开销)。"""
if eval_cfg.judge_max_compare_transcript_chars_each > 0:
return max(1, 2 * int(eval_cfg.judge_max_compare_transcript_chars_each))
pool = _eval_judge_prompt_char_pool_for_context(context_window_tokens)
return max(1, pool - int(eval_cfg.judge_compare_prompt_overhead_chars))
def eval_judge_compare_transcript_each_max_chars_for_context(
context_window_tokens: int,
) -> int:
"""单侧对称上限的参考值auto 模式下约为合计预算的一半;供兼容与展示)。"""
if eval_cfg.judge_max_compare_transcript_chars_each > 0:
return int(eval_cfg.judge_max_compare_transcript_chars_each)
total = eval_judge_compare_transcript_pair_total_budget_for_context(
context_window_tokens
)
return max(1, total // 2)
def eval_judge_compare_bundle_caps(
context_window_tokens: int,
) -> tuple[int, int | None]:
"""返回 (compare_cap_total, per_side_cap|None),供 Playground 摘要与流式对比共用。"""
per = int(eval_cfg.judge_max_compare_transcript_chars_each or 0)
if per > 0:
return max(1, 2 * per), per
return eval_judge_compare_transcript_pair_total_budget_for_context(
context_window_tokens
), None
def trim_compare_transcript_pair(
baseline: str,
replay: str,
*,
total_max_chars: int,
per_side_max_chars: int | None = None,
) -> tuple[str, str, bool, bool]:
"""A/B 对比 prompt 用:在合计预算内尽量保留全文;仅超长时优先从较长的一侧裁尾部。
若配置了 eval_judge_max_compare_transcript_chars_each则仍按单侧硬顶与旧行为一致
"""
b = (baseline or "").strip()
r = (replay or "").strip()
if per_side_max_chars is not None and int(per_side_max_chars) > 0:
cap = int(per_side_max_chars)
return b[:cap], r[:cap], len(b) > cap, len(r) > cap
cap_total = max(1, int(total_max_chars))
if len(b) + len(r) <= cap_total:
return b, r, False, False
need_drop = len(b) + len(r) - cap_total
b2, r2 = b, r
while need_drop > 0 and (b2 or r2):
if len(b2) >= len(r2):
if b2:
b2 = b2[:-1]
need_drop -= 1
elif r2:
r2 = r2[:-1]
need_drop -= 1
else:
break
else:
if r2:
r2 = r2[:-1]
need_drop -= 1
elif b2:
b2 = b2[:-1]
need_drop -= 1
else:
break
return b2, r2, len(b) > len(b2), len(r) > len(r2)
_CONV_JUDGE_TRANSCRIPT_TRUNCATION_TAIL = (
"\n\n【评审边界——输入已为截断稿】\n"
"以上仅为全文前 {n} 个字符,其后未提供给模型。"
"对依赖长程多轮轨迹的细项(尤其 context_memory、interview_structure、跨轮重复盘问"
"必须保守给分(倾向区间中低),并在 insufficient_evidence 写明「输入为截断稿,长程证据不足」;"
"不得臆断未展示轮次中的行为confidence 须显著降低;禁止因未见问题而默认高分或推断后半段无缺陷。\n"
)
_TURN_PRIOR_TRUNCATION_TAIL = (
"\n\n【评审边界——上文节选已截断】\n"
"「截至上一轮」节选可能仅为更长对话的前 {n} 字;跨轮重复、长程结构若无法从节选核实,"
"须在 insufficient_evidence 说明,并对相关细项保守给分。\n"
)
_COMPARE_STREAM_PAIR_TRUNCATION_NOTE = (
"\n【评审边界】以下 A/B transcript 至少一侧为截断稿,请仅就**已展示片段**比较;"
"不得断言未展示轮次的优劣;涉及跨轮重复盘问等须明确证据范围或说不足以判断。\n"
)
def conversation_judge_transcript_excerpt(full_transcript: str, cap: int) -> str:
"""整段评审:在 cap 内截断时在正文后附加边界说明,减少「假装看了全文」的幻觉打分。"""
raw = (full_transcript or "").strip()
c = max(0, int(cap))
if len(raw) <= c:
return raw
return raw[:c] + _CONV_JUDGE_TRANSCRIPT_TRUNCATION_TAIL.format(n=c)
def turn_judge_prior_excerpt(prior_transcript: str, cap: int) -> str:
"""逐轮评审里「截至上一轮」节选;截断时附加边界说明。"""
raw = (prior_transcript or "").strip()
c = max(0, int(cap))
if len(raw) <= c:
return raw
return raw[:c] + _TURN_PRIOR_TRUNCATION_TAIL.format(n=c)
@dataclass(slots=True)
class JudgeCallResult(Generic[TJudgeOutput]):
output: TJudgeOutput | None
error: str | None = None
def _judge_error_message(e: LLMCallError) -> str:
prefix = {
"invoke": "模型调用失败",
"decode": "JSON 解析失败",
"validation": "结果校验失败",
}.get(e.kind, "评审失败")
detail = str(e).strip()
return f"{prefix}: {detail}" if detail else prefix
def _build_memoir_judge_prompt(
*,
memoir_markdown: str,
source_transcript: str = "",
structured_evidence: str = "",
reference_memoir_markdown: str = "",
evidence_notes: str = "",
) -> str:
"""Assemble an evidence-aware memoir judging prompt."""
source = (source_transcript or "").strip()
struct = (structured_evidence or "").strip()
reference = (reference_memoir_markdown or "").strip()
notes = (evidence_notes or "").strip()
sections = [
MEMOIR_JUDGE_INSTRUCTIONS,
"",
"【证据与输入顺序】以下区块按优先级给出:"
"评审说明(若有)→ 原始访谈/对话证据segment 绑定)→ 结构化记忆证据chunk/fact/timeline/summary"
"→ 参考基线(若有)→ 待评成稿。**真实性、覆盖率、可追溯性以「artifact 绑定证据闭包」为准**"
"若证据不足,须保守打分并写 `insufficient_evidence`。",
"",
]
ev_cap = max(1, int(eval_cfg.judge_memoir_evidence_max_chars))
body_cap = max(1, int(eval_cfg.judge_memoir_body_max_chars))
if notes:
sections.extend(["【评审说明】", notes[:1200], ""])
if source:
sections.extend(["【原始访谈/对话证据】", source[:ev_cap], ""])
else:
sections.extend(
[
"【原始访谈/对话证据】",
"无可用局部对话证据。对于记忆忠实度、事实准确性、事实覆盖率、记忆可追溯性,必须保守打分,不得凭空高分。",
"",
]
)
if struct:
sections.extend(["【结构化记忆证据】", struct[:ev_cap], ""])
else:
sections.extend(
[
"【结构化记忆证据】",
"(本 artifact 未绑定或未解析到 chunk/fact/timeline/summary 证据。)",
"",
]
)
if reference:
sections.extend(["【参考基线/导出成稿】", reference[:ev_cap], ""])
sections.extend(["【当前回忆录正文】", memoir_markdown[:body_cap]])
return "\n".join(sections)
class EvalJudgeService:
def __init__(
self,
judge_llm: Any | None,
*,
context_window_tokens: int | None = None,
http_error_vendor: EvalJudgeProvider = "deepseek",
) -> None:
self._llm = judge_llm
self._http_error_vendor: EvalJudgeProvider = http_error_vendor
self._ctx_tokens = int(
context_window_tokens or eval_cfg.judge_context_window_tokens
)
def _conv_transcript_cap(self) -> int:
return eval_judge_conversation_transcript_max_chars_for_context(
self._ctx_tokens
)
def _turn_prior_cap(self) -> int:
return eval_judge_turn_prior_transcript_max_chars_for_context(self._ctx_tokens)
async def judge_turn(
self,
*,
prior_transcript: str,
user_utterance: str,
assistant_reply: str,
turn_index_0: int = 0,
) -> TurnJudgeOutput | None:
if not self._llm:
return None
t = max(0, int(turn_index_0))
prompt = f"""{TURN_JUDGE_INSTRUCTIONS}
【本轮位置】完整对话中当前轮次为 Turn {t + 1}(与下方节选及全量 transcript 的 `[Turn ...]` 编号一致。evidence_refs.turn_index 请使用该编号。
【截至上一轮的对话节选】(含 `[Turn k]` 标签)
{turn_judge_prior_excerpt(prior_transcript, self._turn_prior_cap())}
【本轮用户】
{user_utterance[:4000]}
【本轮 AI】
{assistant_reply[:4000]}
"""
try:
return await allm_json_call(
self._llm,
prompt,
TurnJudgeOutput,
max_tokens=_TURN_MAX,
agent="EvalJudgeService.judge_turn",
http_error_vendor=self._http_error_vendor,
)
except LLMCallError as e:
logger.warning("turn judge failed: {}", e)
return None
async def judge_conversation_result(
self, *, full_transcript: str
) -> JudgeCallResult[ConversationJudgeOutput]:
if not self._llm:
return JudgeCallResult(
output=None,
error="评审模型未配置(智谱或 DeepSeek 密钥)",
)
prompt = f"""{CONV_JUDGE_INSTRUCTIONS}
【完整对话】(每轮以 `[Turn k]` 开头)
{conversation_judge_transcript_excerpt(full_transcript, self._conv_transcript_cap())}
"""
try:
out = await allm_json_call(
self._llm,
prompt,
ConversationJudgeOutput,
max_tokens=_CONV_JUDGE_JSON_MAX,
agent="EvalJudgeService.judge_conversation",
http_error_vendor=self._http_error_vendor,
)
return JudgeCallResult(output=out)
except LLMCallError as e:
error = _judge_error_message(e)
logger.warning("conversation judge failed: {}", error)
return JudgeCallResult(output=None, error=error)
async def judge_conversation(
self, *, full_transcript: str
) -> ConversationJudgeOutput | None:
result = await self.judge_conversation_result(full_transcript=full_transcript)
return result.output
async def stream_conversation_compare(
self,
*,
baseline_transcript: str,
replay_transcript: str,
baseline_judge: ConversationJudgeOutput | None,
replay_judge: ConversationJudgeOutput | None,
) -> AsyncIterator[str]:
"""流式输出中文对比与建议(非 JSON"""
if not self._llm:
yield "[错误] 未配置评审模型 API Key智谱ZHIPU_API_KEYDeepSeekDEEPSEEK_API_KEY"
return
cap_total, per_side = eval_judge_compare_bundle_caps(self._ctx_tokens)
cap_single = self._conv_transcript_cap()
b_tr, r_tr, b_cmp_trunc, r_cmp_trunc = trim_compare_transcript_pair(
baseline_transcript or "",
replay_transcript or "",
total_max_chars=cap_total,
per_side_max_chars=per_side,
)
compare_pair_truncated = b_cmp_trunc or r_cmp_trunc
b_json = (
baseline_judge.model_dump_json(ensure_ascii=False)
if baseline_judge
else "null"
)
r_json = (
replay_judge.model_dump_json(ensure_ascii=False) if replay_judge else "null"
)
if baseline_judge and replay_judge:
trunc_line = (
_COMPARE_STREAM_PAIR_TRUNCATION_NOTE if compare_pair_truncated else ""
)
prompt = f"""你是访谈对话评测专家。下面给出两份对话 transcript 及各自的整体打分JSON。请用中文直接写正文不要用 JSON、不要用 Markdown 代码块):
【A导出基准对话】历史快照用户与当时导出的线上 AI多轮合并为一篇
{b_tr}
【B本次回放/新测对话】用户句与基准对齐AI 为当前后端重新生成)
{r_tr}
{trunc_line}
【A 的整体评分 JSON】
{b_json}
【B 的整体评分 JSON】
{r_json}
请依次撰写:
1) 两段对话在整体体验上的主要差异(情绪承接、信息挖掘、人物建模、访谈结构、提问质量、上下文与重复盘问等);
2) B 相对 A 的优点与不足;
3) 若 B 在关键维度明显弱于 A给出可操作的改进方向系统提示、访谈策略、模型或温度等
笔调简洁、偏执行清单。"""
elif replay_judge:
r_one = conversation_judge_transcript_excerpt(
replay_transcript or "", cap_single
)
prompt = f"""{COMPARE_CONV_STREAM_HINT}
【回放/新测 transcript】
{r_one}
【整体评分 JSON】
{r_json}
"""
else:
yield "[错误] 缺少回放对话评分,无法生成建议"
return
llm = self._llm
if hasattr(llm, "bind"):
llm = llm.bind(max_tokens=_COMPARE_STREAM_MAX)
try:
from app.core.llm_telemetry import infer_provider_model, observe_astream
provider, model = infer_provider_model(llm, http_error_vendor="zhipu")
async for chunk in observe_astream(
llm,
prompt,
agent="EvalJudge.stream_conversation_compare",
provider=provider,
model=model,
):
piece = getattr(chunk, "content", None)
if piece:
yield piece
except Exception as e:
logger.warning("conversation compare stream failed: {}", e)
yield f"\n\n[流式输出中断:{e}]"
async def judge_memoir(
self,
*,
memoir_markdown: str,
source_transcript: str = "",
structured_evidence: str = "",
reference_memoir_markdown: str = "",
evidence_notes: str = "",
) -> MemoirJudgeOutput | None:
result = await self.judge_memoir_result(
memoir_markdown=memoir_markdown,
source_transcript=source_transcript,
structured_evidence=structured_evidence,
reference_memoir_markdown=reference_memoir_markdown,
evidence_notes=evidence_notes,
)
return result.output
async def judge_memoir_result(
self,
*,
memoir_markdown: str,
source_transcript: str = "",
structured_evidence: str = "",
reference_memoir_markdown: str = "",
evidence_notes: str = "",
) -> JudgeCallResult[MemoirJudgeOutput]:
if not self._llm:
return JudgeCallResult(
output=None,
error="评审模型未配置(智谱或 DeepSeek 密钥)",
)
prompt = _build_memoir_judge_prompt(
memoir_markdown=memoir_markdown,
source_transcript=source_transcript,
structured_evidence=structured_evidence,
reference_memoir_markdown=reference_memoir_markdown,
evidence_notes=evidence_notes,
)
try:
out = await allm_json_call(
self._llm,
prompt,
MemoirJudgeOutput,
max_tokens=max(
512, int(eval_cfg.judge_memoir_completion_max_tokens)
),
agent="EvalJudgeService.judge_memoir",
http_error_vendor=self._http_error_vendor,
)
return JudgeCallResult(output=out)
except LLMCallError as e:
error = _judge_error_message(e)
# 回忆录评审在 INFO 也要可见eval-web 排障);非异常路径、不刷堆栈
logger.info(
"event=eval_memoir_judge_llm_call_failed agent=EvalJudgeService.judge_memoir msg={}",
error,
)
return JudgeCallResult(output=None, error=error)