配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
526 lines
19 KiB
Python
526 lines
19 KiB
Python
"""评测台评审:智谱 / DeepSeek 等 OpenAI 兼容端点(结构化 JSON)。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from collections.abc import AsyncIterator
|
||
from dataclasses import dataclass
|
||
from typing import Any, Generic, TypeVar
|
||
|
||
from app.core.config import settings
|
||
from app.core.eval_judge_spec import EvalJudgeProvider
|
||
from app.core.llm_call import LLMCallError, allm_json_call
|
||
from app.core.logging import get_logger
|
||
from app.features.evaluation.judge_schemas import (
|
||
ConversationJudgeOutput,
|
||
MemoirJudgeOutput,
|
||
TurnJudgeOutput,
|
||
)
|
||
from app.features.evaluation.rubrics.conversation_v1 import (
|
||
COMPARE_CONV_STREAM_HINT,
|
||
CONV_JUDGE_INSTRUCTIONS,
|
||
TURN_JUDGE_INSTRUCTIONS,
|
||
)
|
||
from app.features.evaluation.rubrics.memoir_v1 import MEMOIR_JUDGE_INSTRUCTIONS
|
||
from app.features.evaluation.constants import eval_cfg
|
||
from app.features.memoir.constants import memoir
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
TJudgeOutput = TypeVar(
|
||
"TJudgeOutput", TurnJudgeOutput, ConversationJudgeOutput, MemoirJudgeOutput
|
||
)
|
||
|
||
_TURN_MAX = 768
|
||
_CONV_JUDGE_JSON_MAX = 2048
|
||
_CONV_HEADER = "【完整对话】(每轮以 `[Turn k]` 开头)\n\n"
|
||
_COMPARE_STREAM_MAX = 6144
|
||
|
||
|
||
def _eval_judge_prompt_char_pool_for_context(context_window_tokens: int) -> int:
|
||
"""整段请求的字符预算(由评审模型 context window 推导,保守)。"""
|
||
toks = (
|
||
int(context_window_tokens)
|
||
- eval_cfg.judge_completion_reserve_tokens
|
||
- eval_cfg.judge_prompt_budget_safety_tokens
|
||
)
|
||
toks = max(1, toks)
|
||
return max(1, int(toks / eval_cfg.judge_approx_tokens_per_char))
|
||
|
||
|
||
def _eval_judge_prompt_char_pool() -> int:
|
||
return _eval_judge_prompt_char_pool_for_context(
|
||
eval_cfg.judge_context_window_tokens
|
||
)
|
||
|
||
|
||
def eval_judge_conversation_transcript_max_chars() -> int:
|
||
"""整段对话评审:【完整对话】transcript 最大字符数(默认 GLM 上下文)。"""
|
||
if eval_cfg.judge_max_transcript_chars > 0:
|
||
return eval_cfg.judge_max_transcript_chars
|
||
overhead = len(CONV_JUDGE_INSTRUCTIONS) + len(_CONV_HEADER) + 32
|
||
return max(1, _eval_judge_prompt_char_pool() - overhead)
|
||
|
||
|
||
def eval_judge_conversation_transcript_max_chars_for_context(
|
||
context_window_tokens: int,
|
||
) -> int:
|
||
if eval_cfg.judge_max_transcript_chars > 0:
|
||
return eval_cfg.judge_max_transcript_chars
|
||
overhead = len(CONV_JUDGE_INSTRUCTIONS) + len(_CONV_HEADER) + 32
|
||
pool = _eval_judge_prompt_char_pool_for_context(context_window_tokens)
|
||
return max(1, pool - overhead)
|
||
|
||
|
||
def eval_judge_turn_prior_transcript_max_chars() -> int:
|
||
"""逐轮评审:截至上一轮的 transcript 节选上限(默认 GLM 上下文)。"""
|
||
if eval_cfg.judge_max_transcript_chars > 0:
|
||
return eval_cfg.judge_max_transcript_chars
|
||
static = len(TURN_JUDGE_INSTRUCTIONS) + 8800
|
||
return max(1, _eval_judge_prompt_char_pool() - static)
|
||
|
||
|
||
def eval_judge_turn_prior_transcript_max_chars_for_context(
|
||
context_window_tokens: int,
|
||
) -> int:
|
||
if eval_cfg.judge_max_transcript_chars > 0:
|
||
return eval_cfg.judge_max_transcript_chars
|
||
static = len(TURN_JUDGE_INSTRUCTIONS) + 8800
|
||
pool = _eval_judge_prompt_char_pool_for_context(context_window_tokens)
|
||
return max(1, pool - static)
|
||
|
||
|
||
def eval_judge_compare_transcript_each_max_chars() -> int:
|
||
"""单侧对称参考上限(默认与 eval_cfg.judge_context_window_tokens 一致)。"""
|
||
return eval_judge_compare_transcript_each_max_chars_for_context(
|
||
eval_cfg.judge_context_window_tokens
|
||
)
|
||
|
||
|
||
def eval_judge_compare_transcript_pair_total_budget_for_context(
|
||
context_window_tokens: int,
|
||
) -> int:
|
||
"""A/B 同 prompt 时,两份 transcript 合计最大字符数(已扣对比模板与双份 JSON 等开销)。"""
|
||
if eval_cfg.judge_max_compare_transcript_chars_each > 0:
|
||
return max(1, 2 * int(eval_cfg.judge_max_compare_transcript_chars_each))
|
||
pool = _eval_judge_prompt_char_pool_for_context(context_window_tokens)
|
||
return max(1, pool - int(eval_cfg.judge_compare_prompt_overhead_chars))
|
||
|
||
|
||
def eval_judge_compare_transcript_each_max_chars_for_context(
|
||
context_window_tokens: int,
|
||
) -> int:
|
||
"""单侧对称上限的参考值(auto 模式下约为合计预算的一半;供兼容与展示)。"""
|
||
if eval_cfg.judge_max_compare_transcript_chars_each > 0:
|
||
return int(eval_cfg.judge_max_compare_transcript_chars_each)
|
||
total = eval_judge_compare_transcript_pair_total_budget_for_context(
|
||
context_window_tokens
|
||
)
|
||
return max(1, total // 2)
|
||
|
||
|
||
def eval_judge_compare_bundle_caps(
|
||
context_window_tokens: int,
|
||
) -> tuple[int, int | None]:
|
||
"""返回 (compare_cap_total, per_side_cap|None),供 Playground 摘要与流式对比共用。"""
|
||
per = int(eval_cfg.judge_max_compare_transcript_chars_each or 0)
|
||
if per > 0:
|
||
return max(1, 2 * per), per
|
||
return eval_judge_compare_transcript_pair_total_budget_for_context(
|
||
context_window_tokens
|
||
), None
|
||
|
||
|
||
def trim_compare_transcript_pair(
|
||
baseline: str,
|
||
replay: str,
|
||
*,
|
||
total_max_chars: int,
|
||
per_side_max_chars: int | None = None,
|
||
) -> tuple[str, str, bool, bool]:
|
||
"""A/B 对比 prompt 用:在合计预算内尽量保留全文;仅超长时优先从较长的一侧裁尾部。
|
||
|
||
若配置了 eval_judge_max_compare_transcript_chars_each,则仍按单侧硬顶(与旧行为一致)。
|
||
"""
|
||
b = (baseline or "").strip()
|
||
r = (replay or "").strip()
|
||
if per_side_max_chars is not None and int(per_side_max_chars) > 0:
|
||
cap = int(per_side_max_chars)
|
||
return b[:cap], r[:cap], len(b) > cap, len(r) > cap
|
||
|
||
cap_total = max(1, int(total_max_chars))
|
||
if len(b) + len(r) <= cap_total:
|
||
return b, r, False, False
|
||
|
||
need_drop = len(b) + len(r) - cap_total
|
||
b2, r2 = b, r
|
||
while need_drop > 0 and (b2 or r2):
|
||
if len(b2) >= len(r2):
|
||
if b2:
|
||
b2 = b2[:-1]
|
||
need_drop -= 1
|
||
elif r2:
|
||
r2 = r2[:-1]
|
||
need_drop -= 1
|
||
else:
|
||
break
|
||
else:
|
||
if r2:
|
||
r2 = r2[:-1]
|
||
need_drop -= 1
|
||
elif b2:
|
||
b2 = b2[:-1]
|
||
need_drop -= 1
|
||
else:
|
||
break
|
||
return b2, r2, len(b) > len(b2), len(r) > len(r2)
|
||
|
||
|
||
_CONV_JUDGE_TRANSCRIPT_TRUNCATION_TAIL = (
|
||
"\n\n【评审边界——输入已为截断稿】\n"
|
||
"以上仅为全文前 {n} 个字符,其后未提供给模型。"
|
||
"对依赖长程多轮轨迹的细项(尤其 context_memory、interview_structure、跨轮重复盘问)"
|
||
"必须保守给分(倾向区间中低),并在 insufficient_evidence 写明「输入为截断稿,长程证据不足」;"
|
||
"不得臆断未展示轮次中的行为;confidence 须显著降低;禁止因未见问题而默认高分或推断后半段无缺陷。\n"
|
||
)
|
||
|
||
_TURN_PRIOR_TRUNCATION_TAIL = (
|
||
"\n\n【评审边界——上文节选已截断】\n"
|
||
"「截至上一轮」节选可能仅为更长对话的前 {n} 字;跨轮重复、长程结构若无法从节选核实,"
|
||
"须在 insufficient_evidence 说明,并对相关细项保守给分。\n"
|
||
)
|
||
|
||
_COMPARE_STREAM_PAIR_TRUNCATION_NOTE = (
|
||
"\n【评审边界】以下 A/B transcript 至少一侧为截断稿,请仅就**已展示片段**比较;"
|
||
"不得断言未展示轮次的优劣;涉及跨轮重复盘问等须明确证据范围或说不足以判断。\n"
|
||
)
|
||
|
||
|
||
def conversation_judge_transcript_excerpt(full_transcript: str, cap: int) -> str:
|
||
"""整段评审:在 cap 内截断时在正文后附加边界说明,减少「假装看了全文」的幻觉打分。"""
|
||
raw = (full_transcript or "").strip()
|
||
c = max(0, int(cap))
|
||
if len(raw) <= c:
|
||
return raw
|
||
return raw[:c] + _CONV_JUDGE_TRANSCRIPT_TRUNCATION_TAIL.format(n=c)
|
||
|
||
|
||
def turn_judge_prior_excerpt(prior_transcript: str, cap: int) -> str:
|
||
"""逐轮评审里「截至上一轮」节选;截断时附加边界说明。"""
|
||
raw = (prior_transcript or "").strip()
|
||
c = max(0, int(cap))
|
||
if len(raw) <= c:
|
||
return raw
|
||
return raw[:c] + _TURN_PRIOR_TRUNCATION_TAIL.format(n=c)
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class JudgeCallResult(Generic[TJudgeOutput]):
|
||
output: TJudgeOutput | None
|
||
error: str | None = None
|
||
|
||
|
||
def _judge_error_message(e: LLMCallError) -> str:
|
||
prefix = {
|
||
"invoke": "模型调用失败",
|
||
"decode": "JSON 解析失败",
|
||
"validation": "结果校验失败",
|
||
}.get(e.kind, "评审失败")
|
||
detail = str(e).strip()
|
||
return f"{prefix}: {detail}" if detail else prefix
|
||
|
||
|
||
def _build_memoir_judge_prompt(
|
||
*,
|
||
memoir_markdown: str,
|
||
source_transcript: str = "",
|
||
structured_evidence: str = "",
|
||
reference_memoir_markdown: str = "",
|
||
evidence_notes: str = "",
|
||
) -> str:
|
||
"""Assemble an evidence-aware memoir judging prompt."""
|
||
source = (source_transcript or "").strip()
|
||
struct = (structured_evidence or "").strip()
|
||
reference = (reference_memoir_markdown or "").strip()
|
||
notes = (evidence_notes or "").strip()
|
||
sections = [
|
||
MEMOIR_JUDGE_INSTRUCTIONS,
|
||
"",
|
||
"【证据与输入顺序】以下区块按优先级给出:"
|
||
"评审说明(若有)→ 原始访谈/对话证据(segment 绑定)→ 结构化记忆证据(chunk/fact/timeline/summary)"
|
||
"→ 参考基线(若有)→ 待评成稿。**真实性、覆盖率、可追溯性以「artifact 绑定证据闭包」为准**;"
|
||
"若证据不足,须保守打分并写 `insufficient_evidence`。",
|
||
"",
|
||
]
|
||
ev_cap = max(1, int(eval_cfg.judge_memoir_evidence_max_chars))
|
||
body_cap = max(1, int(eval_cfg.judge_memoir_body_max_chars))
|
||
if notes:
|
||
sections.extend(["【评审说明】", notes[:1200], ""])
|
||
if source:
|
||
sections.extend(["【原始访谈/对话证据】", source[:ev_cap], ""])
|
||
else:
|
||
sections.extend(
|
||
[
|
||
"【原始访谈/对话证据】",
|
||
"无可用局部对话证据。对于记忆忠实度、事实准确性、事实覆盖率、记忆可追溯性,必须保守打分,不得凭空高分。",
|
||
"",
|
||
]
|
||
)
|
||
if struct:
|
||
sections.extend(["【结构化记忆证据】", struct[:ev_cap], ""])
|
||
else:
|
||
sections.extend(
|
||
[
|
||
"【结构化记忆证据】",
|
||
"(本 artifact 未绑定或未解析到 chunk/fact/timeline/summary 证据。)",
|
||
"",
|
||
]
|
||
)
|
||
if reference:
|
||
sections.extend(["【参考基线/导出成稿】", reference[:ev_cap], ""])
|
||
sections.extend(["【当前回忆录正文】", memoir_markdown[:body_cap]])
|
||
return "\n".join(sections)
|
||
|
||
|
||
class EvalJudgeService:
|
||
def __init__(
|
||
self,
|
||
judge_llm: Any | None,
|
||
*,
|
||
context_window_tokens: int | None = None,
|
||
http_error_vendor: EvalJudgeProvider = "deepseek",
|
||
) -> None:
|
||
self._llm = judge_llm
|
||
self._http_error_vendor: EvalJudgeProvider = http_error_vendor
|
||
self._ctx_tokens = int(
|
||
context_window_tokens or eval_cfg.judge_context_window_tokens
|
||
)
|
||
|
||
def _conv_transcript_cap(self) -> int:
|
||
return eval_judge_conversation_transcript_max_chars_for_context(
|
||
self._ctx_tokens
|
||
)
|
||
|
||
def _turn_prior_cap(self) -> int:
|
||
return eval_judge_turn_prior_transcript_max_chars_for_context(self._ctx_tokens)
|
||
|
||
async def judge_turn(
|
||
self,
|
||
*,
|
||
prior_transcript: str,
|
||
user_utterance: str,
|
||
assistant_reply: str,
|
||
turn_index_0: int = 0,
|
||
) -> TurnJudgeOutput | None:
|
||
if not self._llm:
|
||
return None
|
||
t = max(0, int(turn_index_0))
|
||
prompt = f"""{TURN_JUDGE_INSTRUCTIONS}
|
||
|
||
【本轮位置】完整对话中当前轮次为 Turn {t + 1}(与下方节选及全量 transcript 的 `[Turn ...]` 编号一致)。evidence_refs.turn_index 请使用该编号。
|
||
|
||
【截至上一轮的对话节选】(含 `[Turn k]` 标签)
|
||
{turn_judge_prior_excerpt(prior_transcript, self._turn_prior_cap())}
|
||
|
||
【本轮用户】
|
||
{user_utterance[:4000]}
|
||
|
||
【本轮 AI】
|
||
{assistant_reply[:4000]}
|
||
"""
|
||
try:
|
||
return await allm_json_call(
|
||
self._llm,
|
||
prompt,
|
||
TurnJudgeOutput,
|
||
max_tokens=_TURN_MAX,
|
||
agent="EvalJudgeService.judge_turn",
|
||
http_error_vendor=self._http_error_vendor,
|
||
)
|
||
except LLMCallError as e:
|
||
logger.warning("turn judge failed: {}", e)
|
||
return None
|
||
|
||
async def judge_conversation_result(
|
||
self, *, full_transcript: str
|
||
) -> JudgeCallResult[ConversationJudgeOutput]:
|
||
if not self._llm:
|
||
return JudgeCallResult(
|
||
output=None,
|
||
error="评审模型未配置(智谱或 DeepSeek 密钥)",
|
||
)
|
||
prompt = f"""{CONV_JUDGE_INSTRUCTIONS}
|
||
|
||
【完整对话】(每轮以 `[Turn k]` 开头)
|
||
{conversation_judge_transcript_excerpt(full_transcript, self._conv_transcript_cap())}
|
||
"""
|
||
try:
|
||
out = await allm_json_call(
|
||
self._llm,
|
||
prompt,
|
||
ConversationJudgeOutput,
|
||
max_tokens=_CONV_JUDGE_JSON_MAX,
|
||
agent="EvalJudgeService.judge_conversation",
|
||
http_error_vendor=self._http_error_vendor,
|
||
)
|
||
return JudgeCallResult(output=out)
|
||
except LLMCallError as e:
|
||
error = _judge_error_message(e)
|
||
logger.warning("conversation judge failed: {}", error)
|
||
return JudgeCallResult(output=None, error=error)
|
||
|
||
async def judge_conversation(
|
||
self, *, full_transcript: str
|
||
) -> ConversationJudgeOutput | None:
|
||
result = await self.judge_conversation_result(full_transcript=full_transcript)
|
||
return result.output
|
||
|
||
async def stream_conversation_compare(
|
||
self,
|
||
*,
|
||
baseline_transcript: str,
|
||
replay_transcript: str,
|
||
baseline_judge: ConversationJudgeOutput | None,
|
||
replay_judge: ConversationJudgeOutput | None,
|
||
) -> AsyncIterator[str]:
|
||
"""流式输出中文对比与建议(非 JSON)。"""
|
||
if not self._llm:
|
||
yield "[错误] 未配置评审模型 API Key(智谱:ZHIPU_API_KEY;DeepSeek:DEEPSEEK_API_KEY)"
|
||
return
|
||
cap_total, per_side = eval_judge_compare_bundle_caps(self._ctx_tokens)
|
||
cap_single = self._conv_transcript_cap()
|
||
b_tr, r_tr, b_cmp_trunc, r_cmp_trunc = trim_compare_transcript_pair(
|
||
baseline_transcript or "",
|
||
replay_transcript or "",
|
||
total_max_chars=cap_total,
|
||
per_side_max_chars=per_side,
|
||
)
|
||
compare_pair_truncated = b_cmp_trunc or r_cmp_trunc
|
||
b_json = (
|
||
baseline_judge.model_dump_json(ensure_ascii=False)
|
||
if baseline_judge
|
||
else "null"
|
||
)
|
||
r_json = (
|
||
replay_judge.model_dump_json(ensure_ascii=False) if replay_judge else "null"
|
||
)
|
||
if baseline_judge and replay_judge:
|
||
trunc_line = (
|
||
_COMPARE_STREAM_PAIR_TRUNCATION_NOTE if compare_pair_truncated else ""
|
||
)
|
||
prompt = f"""你是访谈对话评测专家。下面给出两份对话 transcript 及各自的整体打分(JSON)。请用中文直接写正文(不要用 JSON、不要用 Markdown 代码块):
|
||
|
||
【A:导出基准对话】(历史快照:用户与当时导出的线上 AI,多轮合并为一篇)
|
||
{b_tr}
|
||
|
||
【B:本次回放/新测对话】(用户句与基准对齐,AI 为当前后端重新生成)
|
||
{r_tr}
|
||
{trunc_line}
|
||
【A 的整体评分 JSON】
|
||
{b_json}
|
||
|
||
【B 的整体评分 JSON】
|
||
{r_json}
|
||
|
||
请依次撰写:
|
||
1) 两段对话在整体体验上的主要差异(情绪承接、信息挖掘、人物建模、访谈结构、提问质量、上下文与重复盘问等);
|
||
2) B 相对 A 的优点与不足;
|
||
3) 若 B 在关键维度明显弱于 A,给出可操作的改进方向(系统提示、访谈策略、模型或温度等)。
|
||
|
||
笔调简洁、偏执行清单。"""
|
||
elif replay_judge:
|
||
r_one = conversation_judge_transcript_excerpt(
|
||
replay_transcript or "", cap_single
|
||
)
|
||
prompt = f"""{COMPARE_CONV_STREAM_HINT}
|
||
|
||
【回放/新测 transcript】
|
||
{r_one}
|
||
|
||
【整体评分 JSON】
|
||
{r_json}
|
||
"""
|
||
else:
|
||
yield "[错误] 缺少回放对话评分,无法生成建议"
|
||
return
|
||
|
||
llm = self._llm
|
||
if hasattr(llm, "bind"):
|
||
llm = llm.bind(max_tokens=_COMPARE_STREAM_MAX)
|
||
try:
|
||
from app.core.llm_telemetry import infer_provider_model, observe_astream
|
||
|
||
provider, model = infer_provider_model(llm, http_error_vendor="zhipu")
|
||
async for chunk in observe_astream(
|
||
llm,
|
||
prompt,
|
||
agent="EvalJudge.stream_conversation_compare",
|
||
provider=provider,
|
||
model=model,
|
||
):
|
||
piece = getattr(chunk, "content", None)
|
||
if piece:
|
||
yield piece
|
||
except Exception as e:
|
||
logger.warning("conversation compare stream failed: {}", e)
|
||
yield f"\n\n[流式输出中断:{e}]"
|
||
|
||
async def judge_memoir(
|
||
self,
|
||
*,
|
||
memoir_markdown: str,
|
||
source_transcript: str = "",
|
||
structured_evidence: str = "",
|
||
reference_memoir_markdown: str = "",
|
||
evidence_notes: str = "",
|
||
) -> MemoirJudgeOutput | None:
|
||
result = await self.judge_memoir_result(
|
||
memoir_markdown=memoir_markdown,
|
||
source_transcript=source_transcript,
|
||
structured_evidence=structured_evidence,
|
||
reference_memoir_markdown=reference_memoir_markdown,
|
||
evidence_notes=evidence_notes,
|
||
)
|
||
return result.output
|
||
|
||
async def judge_memoir_result(
|
||
self,
|
||
*,
|
||
memoir_markdown: str,
|
||
source_transcript: str = "",
|
||
structured_evidence: str = "",
|
||
reference_memoir_markdown: str = "",
|
||
evidence_notes: str = "",
|
||
) -> JudgeCallResult[MemoirJudgeOutput]:
|
||
if not self._llm:
|
||
return JudgeCallResult(
|
||
output=None,
|
||
error="评审模型未配置(智谱或 DeepSeek 密钥)",
|
||
)
|
||
prompt = _build_memoir_judge_prompt(
|
||
memoir_markdown=memoir_markdown,
|
||
source_transcript=source_transcript,
|
||
structured_evidence=structured_evidence,
|
||
reference_memoir_markdown=reference_memoir_markdown,
|
||
evidence_notes=evidence_notes,
|
||
)
|
||
try:
|
||
out = await allm_json_call(
|
||
self._llm,
|
||
prompt,
|
||
MemoirJudgeOutput,
|
||
max_tokens=max(
|
||
512, int(eval_cfg.judge_memoir_completion_max_tokens)
|
||
),
|
||
agent="EvalJudgeService.judge_memoir",
|
||
http_error_vendor=self._http_error_vendor,
|
||
)
|
||
return JudgeCallResult(output=out)
|
||
except LLMCallError as e:
|
||
error = _judge_error_message(e)
|
||
# 回忆录评审在 INFO 也要可见(eval-web 排障);非异常路径、不刷堆栈
|
||
logger.info(
|
||
"event=eval_memoir_judge_llm_call_failed agent=EvalJudgeService.judge_memoir msg={}",
|
||
error,
|
||
)
|
||
return JudgeCallResult(output=None, error=error)
|