diff --git a/api/app/agents/memoir/batch_phase1_prep.py b/api/app/agents/memoir/batch_phase1_prep.py index 307acc6..c26ae21 100644 --- a/api/app/agents/memoir/batch_phase1_prep.py +++ b/api/app/agents/memoir/batch_phase1_prep.py @@ -4,6 +4,7 @@ Phase1 批处理:一次 LLM 调用完成多段的抽取 + 章节分类(与 from __future__ import annotations +import math from dataclasses import dataclass from typing import Any, Dict, List @@ -107,3 +108,76 @@ def run_batch_phase1_prep( logger.warning("batch phase1 id mismatch missing={} extra={}", missing, extra) raise ValueError("batch phase1 response segment ids do not match input") return by_id + + +def _run_batch_phase1_prep_chunk_with_bisect( + segments: List[Segment], + state: MemoirStateSchema, + llm: Any, +) -> Dict[str, BatchPhase1SegmentRow]: + """单块 LLM;失败时(如输出截断)将块二等分重试直至单段。""" + try: + return run_batch_phase1_prep(segments, state, llm) + except ValueError: + if len(segments) <= 1: + raise + mid = len(segments) // 2 + if mid < 1: + raise + left = _run_batch_phase1_prep_chunk_with_bisect( + segments[:mid], state, llm + ) + right = _run_batch_phase1_prep_chunk_with_bisect( + segments[mid:], state, llm + ) + merged = {**left, **right} + expected = {str(s.id) for s in segments} + if merged.keys() != expected: + raise ValueError( + "batch phase1 chunked bisect merge: segment ids do not match input" + ) + return merged + + +def run_batch_phase1_prep_chunked( + segments: List[Segment], + state: MemoirStateSchema, + llm: Any, + *, + chunk_size: int, +) -> Dict[str, BatchPhase1SegmentRow]: + """ + 将 segments 按 chunk_size 切片多次调用 Phase1 批处理 LLM,合并 by_id。 + 单块仍失败时在块内二分回退(最后回退到单段),与 orchestrator 外层逐段回退衔接。 + """ + if not segments: + return {} + if chunk_size < 1: + chunk_size = 1 + n = len(segments) + total_chunks = max(1, math.ceil(n / chunk_size)) + merged: Dict[str, BatchPhase1SegmentRow] = {} + for i in range(0, n, chunk_size): + chunk_idx = i // chunk_size + 1 + sub = segments[i : i + chunk_size] + logger.info( + "event=batch_phase1_chunk chunk_idx={}/{} segment_count={} batch_path=chunked", + chunk_idx, + total_chunks, + len(sub), + ) + part = _run_batch_phase1_prep_chunk_with_bisect(sub, state, llm) + merged.update(part) + expected = {str(s.id) for s in segments} + if merged.keys() != expected: + missing = expected - merged.keys() + extra = merged.keys() - expected + logger.warning( + "batch phase1 chunked id mismatch missing={} extra={}", + missing, + extra, + ) + raise ValueError( + "batch phase1 chunked: merged segment ids do not match input" + ) + return merged diff --git a/api/app/agents/memoir/orchestrator.py b/api/app/agents/memoir/orchestrator.py index 7afa8a7..325b92a 100644 --- a/api/app/agents/memoir/orchestrator.py +++ b/api/app/agents/memoir/orchestrator.py @@ -12,7 +12,7 @@ from typing import Any, Callable, Dict, List, Set, Tuple from app.agents.memoir.batch_phase1_prep import ( STAGE_ALLOWED_SLOTS, - run_batch_phase1_prep, + run_batch_phase1_prep_chunked, ) from app.agents.memoir.classification_agent import ( ClassificationAgent, @@ -177,7 +177,12 @@ class MemoirOrchestrator: segment_skip_story_ids: Set[str] = set() segment_chapter_category: Dict[str, str] = {} - by_id = run_batch_phase1_prep(segments, state, classify_extract_llm) + by_id = run_batch_phase1_prep_chunked( + segments, + state, + classify_extract_llm, + chunk_size=int(settings.memoir_phase1_batch_llm_chunk_size), + ) for segment in segments: text = segment.user_input_text or "" diff --git a/api/app/core/config.py b/api/app/core/config.py index 6050cc2..83a6fb5 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -110,6 +110,8 @@ class Settings(BaseSettings): # Memoir Phase1:多 segment 一批一次 LLM 完成抽取+章节分类(失败回退逐段);单段且关时仍逐段 memoir_phase1_batch_llm_enabled: bool = True memoir_phase1_batch_llm_max_tokens: int = Field(default=4096, ge=512, le=32_768) + #: Phase1 批处理 LLM:单次请求最多包含的 segment 数(多块合并,避免 completion 顶满截断) + memoir_phase1_batch_llm_chunk_size: int = Field(default=24, ge=1, le=500) # Memoir agents:`invoke_json_object` / `llm_json_call` 的 max_tokens(原硬编码迁至配置) memoir_extraction_max_tokens: int = Field(default=1024, ge=64, le=8192) memoir_classification_max_tokens: int = Field(default=256, ge=32, le=4096) @@ -172,8 +174,8 @@ class Settings(BaseSettings): log_level: str = "INFO" # LOG_AGENT_VERBOSE:为 True 时额外输出 Agent 单行 INFO 摘要(耗时、规模),无需全局 DEBUG log_agent_verbose: bool = False - # AGENT_LOG_MAX_CHARS:DEBUG 下记录 prompt/响应预览时的最大字符数;0=不截断(完整输出) - agent_log_max_chars: int = Field(default=0, ge=0, le=50_000_000) + # AGENT_LOG_MAX_CHARS:DEBUG 下记录 prompt/响应预览时的最大字符数;0=不截断(完整输出,慎用) + agent_log_max_chars: int = Field(default=4096, ge=0, le=50_000_000) # AGENT_LOG_OMIT_SYSTEM_MESSAGE_BODY:DEBUG 下访谈/资料聊天日志省略 System 正文(仅 len+sha12) agent_log_omit_system_message_body: bool = True # AGENT_LOG_JSON_PROMPT_PREFIX_CHARS:DEBUG 下 *.prompt 总长超过下项时再跳过前 N 字符后预览(0=不跳过) @@ -182,6 +184,10 @@ class Settings(BaseSettings): agent_log_json_prompt_prefix_only_if_len_gt: int = Field( default=4000, ge=0, le=2_000_000 ) + # AGENT_LOG_PROMPT_MODE:DEBUG 下 *.prompt 记录方式 preview=截断预览 | hash_only=仅 sha12+长度(无正文) + agent_log_prompt_mode: str = Field(default="preview") + # AGENT_LOG_PROMPT_DEDUP:DEBUG 下同一 label 连续相同全文时第二条起跳过(减重复模板噪音) + agent_log_prompt_dedup: bool = False # 第三方 stdlib logging(空=自动:LOG_LEVEL 为 DEBUG/TRACE 时 Celery→INFO、httpx/httpcore→WARNING) celery_log_level: str = "" httpx_log_level: str = "" @@ -216,6 +222,25 @@ class Settings(BaseSettings): return False return True + @field_validator("agent_log_prompt_mode", mode="before") + @classmethod + def _normalize_agent_log_prompt_mode(cls, v: object) -> str: + if v is None: + return "preview" + s = str(v).strip().lower() + if s not in ("preview", "hash_only"): + return "preview" + return s + + @field_validator("agent_log_prompt_dedup", mode="before") + @classmethod + def _coerce_agent_log_prompt_dedup(cls, v: object) -> bool: + if isinstance(v, bool): + return v + if v is None: + return False + return str(v).strip().lower() in ("1", "true", "yes", "on") + # ── Misc ───────────────────────────────────────────────── enable_test_subscription: int = 0 enable_test_plan: str = "" # "1" / "true" / "yes" 为 True diff --git a/api/app/features/evaluation/memoir_readiness_service.py b/api/app/features/evaluation/memoir_readiness_service.py index 87d2927..d1f7c52 100644 --- a/api/app/features/evaluation/memoir_readiness_service.py +++ b/api/app/features/evaluation/memoir_readiness_service.py @@ -2,15 +2,23 @@ from __future__ import annotations +import time +from datetime import datetime, timezone + from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.core.db import utc_now from app.features.conversation.models import Conversation, Segment from app.features.conversation.ws.pipeline import background_runner from app.features.evaluation.errors import ( EvaluationBadRequestError, EvaluationNotFoundError, ) +from app.features.evaluation.phase1_job_timing import ( + load_phase1_job_meta, + record_phase1_job_submitted, +) from app.features.evaluation.schemas import MemoirPhase1ReadyOut, MemoirSubmitOut @@ -51,10 +59,36 @@ class MemoirReadinessService: pending = [s.id for s in rows if s.topic_category is None] ready = len(pending) == 0 + job_submitted_at_utc: datetime | None = None + elapsed_ms_since_submit: int | None = None + durations_ms: dict[str, int] = {} + meta = await load_phase1_job_meta(cid) + if meta: + raw_sub = meta.get("submitted_at_utc") + if isinstance(raw_sub, str) and raw_sub.strip(): + try: + iso = raw_sub.strip().replace("Z", "+00:00") + job_submitted_at_utc = datetime.fromisoformat(iso) + if job_submitted_at_utc.tzinfo is None: + job_submitted_at_utc = job_submitted_at_utc.replace( + tzinfo=timezone.utc + ) + now = utc_now() + elapsed_ms_since_submit = max( + 0, + int((now - job_submitted_at_utc).total_seconds() * 1000), + ) + durations_ms["since_playground_submit"] = elapsed_ms_since_submit + except ValueError: + job_submitted_at_utc = None + elapsed_ms_since_submit = None return MemoirPhase1ReadyOut( ready=ready, checked_segment_ids=ids, pending_segment_ids=pending, + job_submitted_at_utc=job_submitted_at_utc, + elapsed_ms_since_submit=elapsed_ms_since_submit, + durations_ms=durations_ms, ) async def submit_memoir_phase1_for_conversation( @@ -87,13 +121,24 @@ class MemoirReadinessService: user_id=uid, segment_ids=[], celery_task_id=None, + submitted_at_utc=None, + elapsed_ms=None, ) + t0 = time.perf_counter() task_id = await background_runner.flush_pending( uid, extra_segment_ids=segment_ids ) + elapsed_ms = max(0, int((time.perf_counter() - t0) * 1000)) + submitted_at = await record_phase1_job_submitted( + cid, + celery_task_id=task_id, + segment_count=len(segment_ids), + ) return MemoirSubmitOut( conversation_id=cid, user_id=uid, segment_ids=segment_ids, celery_task_id=task_id, + submitted_at_utc=submitted_at, + elapsed_ms=elapsed_ms, ) diff --git a/api/app/features/evaluation/phase1_job_timing.py b/api/app/features/evaluation/phase1_job_timing.py new file mode 100644 index 0000000..2ab6ebc --- /dev/null +++ b/api/app/features/evaluation/phase1_job_timing.py @@ -0,0 +1,56 @@ +"""Playground Phase1 提交时间:供 memoir-phase1-ready 轮询展示服务端等待耗时。""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any + +from app.core.logging import get_logger +from app.core.redis import redis_service + +logger = get_logger(__name__) + +TTL_SECONDS = 172800 # 48h,覆盖长队列与多次评测;下次 submit 会覆盖 + +_EVAL_PREFIX = "internal_eval:playground_phase1_job:" + + +def _redis_key(conversation_id: str) -> str: + return f"{_EVAL_PREFIX}{conversation_id}" + + +async def record_phase1_job_submitted( + conversation_id: str, + *, + celery_task_id: str | None, + segment_count: int, +) -> datetime: + now = datetime.now(timezone.utc) + payload: dict[str, Any] = { + "submitted_at_utc": now.isoformat().replace("+00:00", "Z"), + "celery_task_id": celery_task_id, + "segment_count": segment_count, + } + try: + client = await redis_service.get_client() + await client.setex( + _redis_key(conversation_id), + TTL_SECONDS, + json.dumps(payload, ensure_ascii=False), + ) + except Exception as e: + logger.warning("eval phase1 job timing redis write failed: {}", e) + return now + + +async def load_phase1_job_meta(conversation_id: str) -> dict[str, Any] | None: + try: + client = await redis_service.get_client() + raw = await client.get(_redis_key(conversation_id)) + if not raw: + return None + return json.loads(raw) + except Exception as e: + logger.warning("eval phase1 job timing redis read failed: {}", e) + return None diff --git a/api/app/features/evaluation/replay_service.py b/api/app/features/evaluation/replay_service.py index 47f2dcd..683748e 100644 --- a/api/app/features/evaluation/replay_service.py +++ b/api/app/features/evaluation/replay_service.py @@ -3,7 +3,10 @@ from __future__ import annotations import secrets +import time import uuid +from dataclasses import dataclass +from datetime import datetime from sqlalchemy.ext.asyncio import AsyncSession @@ -28,6 +31,13 @@ from app.features.user.models import User logger = get_logger(__name__) +@dataclass(frozen=True) +class ReplayServerTiming: + started_at_utc: datetime + finished_at_utc: datetime + elapsed_ms: int + + class ReplayConversationService: def __init__(self, db: AsyncSession, quota_service: QuotaService) -> None: self._db = db @@ -99,7 +109,7 @@ class ReplayConversationService: flush_memoir_after: bool, skip_memoir: bool, skip_tts: bool, - ) -> tuple[int, list[str], list[str]]: + ) -> tuple[int, list[str], list[str], ReplayServerTiming]: try: turns, _ = read_user_export_fixture(fixture_filename) except ValueError as e: @@ -109,14 +119,14 @@ class ReplayConversationService: utterances = [u.strip() for u, _ in turns if (u or "").strip()] if not utterances: raise EvaluationBadRequestError("fixture produced no user utterances") - n, segment_ids = await self.replay_utterances( + n, segment_ids, timing = await self.replay_utterances( conversation_id=conversation_id, utterances=utterances, flush_memoir_after=flush_memoir_after, skip_memoir=skip_memoir, skip_tts=skip_tts, ) - return n, utterances, segment_ids + return n, utterances, segment_ids, timing async def replay_utterances( self, @@ -126,7 +136,9 @@ class ReplayConversationService: flush_memoir_after: bool, skip_memoir: bool, skip_tts: bool, - ) -> tuple[int, list[str]]: + ) -> tuple[int, list[str], ReplayServerTiming]: + t_wall0 = time.perf_counter() + started_at_utc = utc_now() cid = (conversation_id or "").strip() if not cid: raise EvaluationBadRequestError("conversation_id is required") @@ -180,4 +192,11 @@ class ReplayConversationService: skip_memoir, skip_tts, ) - return count, segment_ids + finished_at_utc = utc_now() + elapsed_ms = max(0, int((time.perf_counter() - t_wall0) * 1000)) + timing = ReplayServerTiming( + started_at_utc=started_at_utc, + finished_at_utc=finished_at_utc, + elapsed_ms=elapsed_ms, + ) + return count, segment_ids, timing diff --git a/api/app/features/evaluation/router.py b/api/app/features/evaluation/router.py index 0f37c42..de59201 100644 --- a/api/app/features/evaluation/router.py +++ b/api/app/features/evaluation/router.py @@ -265,9 +265,10 @@ async def replay_conversation( ) try: segment_ids: list[str] = [] + timing = None if body.fixture_filename: fn = body.fixture_filename.strip() - n, echo, segment_ids = await replay.replay_fixture( + n, echo, segment_ids, timing = await replay.replay_fixture( conversation_id=body.conversation_id, fixture_filename=fn, flush_memoir_after=body.flush_memoir_after, @@ -278,7 +279,7 @@ async def replay_conversation( utt = [str(u) for u in body.user_utterances if str(u).strip()] if not utt: raise EvaluationBadRequestError("user_utterances is empty") - n, segment_ids = await replay.replay_utterances( + n, segment_ids, timing = await replay.replay_utterances( conversation_id=body.conversation_id, utterances=utt, flush_memoir_after=body.flush_memoir_after, @@ -299,6 +300,9 @@ async def replay_conversation( turns_replayed=n, utterances_echo=echo, segment_ids=segment_ids, + started_at_utc=timing.started_at_utc if timing else None, + finished_at_utc=timing.finished_at_utc if timing else None, + elapsed_ms=timing.elapsed_ms if timing else None, ) diff --git a/api/app/features/evaluation/schemas.py b/api/app/features/evaluation/schemas.py index d8e5795..7a82a86 100644 --- a/api/app/features/evaluation/schemas.py +++ b/api/app/features/evaluation/schemas.py @@ -103,12 +103,25 @@ class ReplayConversationOut(BaseModel): default_factory=list, description="本批请求创建并已走 orchestrator 的用户 segment id(顺序与落库一致)", ) + #: 服务端计量:本 HTTP 请求内回放逻辑耗时(与浏览器轮询间隔无关) + started_at_utc: datetime | None = None + finished_at_utc: datetime | None = None + elapsed_ms: int | None = Field( + default=None, + description="服务端 wall 耗时(本请求内 replay_utterances / replay_fixture)", + ) class MemoirPhase1ReadyOut(BaseModel): ready: bool checked_segment_ids: list[str] = Field(default_factory=list) pending_segment_ids: list[str] = Field(default_factory=list) + #: 最近一次 Playground memoir-submit 写入 Redis 的提交时间(无记录时为 None) + job_submitted_at_utc: datetime | None = None + #: 自 job_submitted_at_utc 至本响应生成时服务端经过的毫秒数 + elapsed_ms_since_submit: int | None = Field(default=None, ge=0) + #: 可选分步耗时(毫秒),键由服务端定义 + durations_ms: dict[str, int] = Field(default_factory=dict) class MemoirSubmitOut(BaseModel): @@ -116,6 +129,9 @@ class MemoirSubmitOut(BaseModel): user_id: str segment_ids: list[str] = Field(default_factory=list) celery_task_id: str | None = None + submitted_at_utc: datetime | None = None + #: 提交接口瞬间耗时,通常为 0;与 Phase1 Celery 执行时间无关 + elapsed_ms: int | None = Field(default=None, ge=0) class ManualJudgeConversationBody(BaseModel): diff --git a/api/tests/evaluation/test_memoir_readiness_router.py b/api/tests/evaluation/test_memoir_readiness_router.py index 0497f8c..256affd 100644 --- a/api/tests/evaluation/test_memoir_readiness_router.py +++ b/api/tests/evaluation/test_memoir_readiness_router.py @@ -1,5 +1,7 @@ """memoir-phase1-ready internal 路由(依赖注入替身)。""" +from datetime import datetime, timezone + import pytest from httpx import ASGITransport, AsyncClient @@ -54,6 +56,60 @@ async def test_memoir_phase1_ready_returns_bundle(monkeypatch: pytest.MonkeyPatc assert body["pending_segment_ids"] == [] +@pytest.mark.asyncio +async def test_memoir_phase1_ready_includes_server_elapsed_fields( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from fastapi import FastAPI + + monkeypatch.setattr( + "app.core.config.settings.internal_eval_api_key", + "secret", + raising=False, + ) + from app.features.evaluation.deps import get_memoir_readiness_service + from app.features.evaluation.router import router + + class _Fake: + async def memoir_phase1_ready_for_segments( + self, *, conversation_id: str, segment_ids: list[str] + ) -> MemoirPhase1ReadyOut: + return MemoirPhase1ReadyOut( + ready=False, + checked_segment_ids=list(segment_ids), + pending_segment_ids=["pending-1"], + job_submitted_at_utc=datetime( + 2026, 4, 9, 8, 0, 0, tzinfo=timezone.utc + ), + elapsed_ms_since_submit=12_000, + durations_ms={"since_playground_submit": 12_000}, + ) + + app = FastAPI() + app.include_router(router, prefix="/internal/api/evaluation") + + async def _override_auth(): + from app.features.evaluation.internal_auth import InternalEvalPrincipal + + return InternalEvalPrincipal() + + app.dependency_overrides[get_internal_eval_principal] = _override_auth + app.dependency_overrides[get_memoir_readiness_service] = lambda: _Fake() + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://t") as client: + r = await client.get( + "/internal/api/evaluation/sessions/cid-a/memoir-phase1-ready", + headers={"X-Internal-Eval-Key": "secret"}, + params=[("segment_ids", "s1")], + ) + assert r.status_code == 200 + body = r.json() + assert body["elapsed_ms_since_submit"] == 12_000 + assert body["durations_ms"]["since_playground_submit"] == 12_000 + assert body["job_submitted_at_utc"] is not None + + @pytest.mark.asyncio async def test_memoir_phase1_ready_404_propagates(monkeypatch: pytest.MonkeyPatch) -> None: from fastapi import FastAPI diff --git a/api/tests/evaluation/test_replay_timing_response.py b/api/tests/evaluation/test_replay_timing_response.py new file mode 100644 index 0000000..f3a193c --- /dev/null +++ b/api/tests/evaluation/test_replay_timing_response.py @@ -0,0 +1,71 @@ +"""replay/conversation 响应携带服务端 elapsed 字段。""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest +from httpx import ASGITransport, AsyncClient + +from app.features.evaluation.internal_auth import get_internal_eval_principal +from app.features.evaluation.replay_service import ReplayServerTiming + + +@pytest.mark.asyncio +async def test_replay_conversation_includes_server_elapsed_ms( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from fastapi import FastAPI + + monkeypatch.setattr( + "app.core.config.settings.internal_eval_api_key", + "secret", + raising=False, + ) + from app.features.evaluation.deps import get_replay_conversation_service + from app.features.evaluation.router import router + + t0 = datetime(2026, 4, 9, 10, 0, 0, tzinfo=timezone.utc) + t1 = datetime(2026, 4, 9, 10, 0, 1, tzinfo=timezone.utc) + + class _FakeReplay: + async def replay_utterances(self, **kwargs): + return ( + 1, + ["seg-a"], + ReplayServerTiming( + started_at_utc=t0, + finished_at_utc=t1, + elapsed_ms=150, + ), + ) + + app = FastAPI() + app.include_router(router, prefix="/internal/api/evaluation") + + async def _override_auth(): + from app.features.evaluation.internal_auth import InternalEvalPrincipal + + return InternalEvalPrincipal() + + app.dependency_overrides[get_internal_eval_principal] = _override_auth + app.dependency_overrides[get_replay_conversation_service] = lambda: _FakeReplay() + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://t") as client: + r = await client.post( + "/internal/api/evaluation/replay/conversation", + headers={"X-Internal-Eval-Key": "secret"}, + json={ + "conversation_id": "00000000-0000-0000-0000-000000000099", + "user_utterances": ["hi"], + "flush_memoir_after": False, + "skip_memoir": True, + "skip_tts": True, + }, + ) + assert r.status_code == 200 + body = r.json() + assert body["elapsed_ms"] == 150 + assert body["started_at_utc"] is not None + assert body["finished_at_utc"] is not None diff --git a/api/tests/test_batch_phase1_chunked.py b/api/tests/test_batch_phase1_chunked.py new file mode 100644 index 0000000..c927bfd --- /dev/null +++ b/api/tests/test_batch_phase1_chunked.py @@ -0,0 +1,99 @@ +"""Phase1 批处理 LLM 分块:大量 segment 时拆多次请求并合并 by_id。""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from app.agents.memoir.batch_phase1_prep import ( + BatchPhase1SegmentRow, + run_batch_phase1_prep_chunked, +) +from app.agents.state_schema import MemoirStateSchema + + +def _state() -> MemoirStateSchema: + return MemoirStateSchema( + stage_order=["childhood"], + current_stage="childhood", + covered_stages=[], + slots={}, + ) + + +def test_run_batch_phase1_prep_chunked_splits_95_into_four_calls( + monkeypatch: pytest.MonkeyPatch, +) -> None: + chunk_lengths: list[int] = [] + + def fake_prep( + segments: list, + state: MemoirStateSchema, + llm: object, + ) -> dict[str, BatchPhase1SegmentRow]: + chunk_lengths.append(len(segments)) + return { + str(s.id): BatchPhase1SegmentRow( + detected_stage="childhood", + slots={}, + chapter_category_raw="summary", + ) + for s in segments + } + + monkeypatch.setattr( + "app.agents.memoir.batch_phase1_prep.run_batch_phase1_prep", + fake_prep, + ) + segments = [ + SimpleNamespace(id=f"s{i}", user_input_text="hello") for i in range(95) + ] + by_id = run_batch_phase1_prep_chunked( + segments, + _state(), + MagicMock(), + chunk_size=24, + ) + assert len(by_id) == 95 + assert chunk_lengths == [24, 24, 24, 23] + + +def test_chunked_bisect_on_value_error(monkeypatch: pytest.MonkeyPatch) -> None: + """块内失败时二分重试,仍能拼回全量 id。""" + chunk_lengths: list[int] = [] + + def fake_prep( + segments: list, + state: MemoirStateSchema, + llm: object, + ) -> dict[str, BatchPhase1SegmentRow]: + chunk_lengths.append(len(segments)) + if len(segments) == 4: + raise ValueError("simulate length limit") + return { + str(s.id): BatchPhase1SegmentRow( + detected_stage="childhood", + slots={}, + chapter_category_raw="summary", + ) + for s in segments + } + + monkeypatch.setattr( + "app.agents.memoir.batch_phase1_prep.run_batch_phase1_prep", + fake_prep, + ) + segments = [ + SimpleNamespace(id=f"b{i}", user_input_text="x") for i in range(4) + ] + by_id = run_batch_phase1_prep_chunked( + segments, + _state(), + MagicMock(), + chunk_size=100, + ) + assert len(by_id) == 4 + assert chunk_lengths[0] == 4 + assert 2 in chunk_lengths diff --git a/api/tests/test_memoir_pipeline_optimization.py b/api/tests/test_memoir_pipeline_optimization.py index 9c8896b..a24f0c0 100644 --- a/api/tests/test_memoir_pipeline_optimization.py +++ b/api/tests/test_memoir_pipeline_optimization.py @@ -30,6 +30,7 @@ def test_phase1_batch_enabled_by_default() -> None: s = Settings() assert s.memoir_phase1_batch_llm_enabled is True + assert s.memoir_phase1_batch_llm_chunk_size >= 1 def test_quality_pass_enabled_by_default() -> None: diff --git a/api/tests/test_memoir_skip_story.py b/api/tests/test_memoir_skip_story.py index 0f81930..0ba2d1b 100644 --- a/api/tests/test_memoir_skip_story.py +++ b/api/tests/test_memoir_skip_story.py @@ -131,6 +131,8 @@ def test_prepare_batches_batch_llm_path_matches_per_segment_skip_logic( segments: list, state: MemoirStateSchema, llm: object, + *, + chunk_size: int = 24, ) -> dict: return { "mix-1": BatchPhase1SegmentRow( @@ -146,7 +148,7 @@ def test_prepare_batches_batch_llm_path_matches_per_segment_skip_logic( } monkeypatch.setattr( - "app.agents.memoir.orchestrator.run_batch_phase1_prep", + "app.agents.memoir.orchestrator.run_batch_phase1_prep_chunked", fake_batch, ) orch = MemoirOrchestrator() diff --git a/app-eval-web/src/pages/PlaygroundPage.tsx b/app-eval-web/src/pages/PlaygroundPage.tsx index ef01692..308cf22 100644 --- a/app-eval-web/src/pages/PlaygroundPage.tsx +++ b/app-eval-web/src/pages/PlaygroundPage.tsx @@ -65,11 +65,18 @@ function delay(ms: number, signal: AbortSignal): Promise { }); } +type MemoirPhase1PollBody = { + ready: boolean; + elapsed_ms_since_submit?: number | null; + durations_ms?: Record; +}; + /** 轮询直到 Phase1 写入 topic_category 或超时 / 中止 */ async function waitUntilMemoirPhase1Ready( conversationId: string, segmentIds: string[], signal: AbortSignal, + onPoll?: (body: MemoirPhase1PollBody) => void, ): Promise<{ ok: true } | { ok: false; error: string }> { if (!segmentIds.length) return { ok: true }; const deadline = Date.now() + MEMOIR_PHASE1_WAIT_MAX_MS; @@ -77,13 +84,14 @@ async function waitUntilMemoirPhase1Ready( while (Date.now() < deadline) { if (signal.aborted) return { ok: false, error: "aborted" }; - const r = await api<{ ready: boolean }>(path, { signal }); + const r = await api(path, { signal }); if (r.error === "aborted") return { ok: false, error: "aborted" }; if (!r.ok) return { ok: false, error: r.error ?? "memoir-phase1-ready 请求失败", }; + if (r.data && onPoll) onPoll(r.data); if (r.data?.ready) return { ok: true }; try { await delay(MEMOIR_PHASE1_POLL_MS, signal); @@ -168,6 +176,18 @@ export default function PlaygroundPage() { const [judgeProvider, setJudgeProvider] = useState(() => loadEvalJudgeProvider(), ); + /** 最近一次 replay/conversation 服务端耗时(毫秒) */ + const [lastReplayElapsedMs, setLastReplayElapsedMs] = useState( + null, + ); + /** 最近一次 memoir-submit HTTP 服务端耗时 */ + const [lastMemoirSubmitElapsedMs, setLastMemoirSubmitElapsedMs] = useState< + number | null + >(null); + /** 轮询 Phase1 时服务端报告的自 submit 起经过时间 */ + const [phase1WaitServerMs, setPhase1WaitServerMs] = useState( + null, + ); const replayUtterances = useMemo( () => utterancesForReplayFromTurns(fixtureTurns), @@ -422,6 +442,7 @@ export default function PlaygroundPage() { setReplayBusy(true); setReplayProgress(null); setReplayErrors([]); + setLastReplayElapsedMs(null); try { type SandboxUser = { user_id: string; @@ -516,6 +537,9 @@ export default function PlaygroundPage() { turns_replayed: number; utterances_echo: string[]; segment_ids: string[]; + elapsed_ms?: number | null; + started_at_utc?: string | null; + finished_at_utc?: string | null; }>("/internal/api/evaluation/replay/conversation", { method: "POST", signal, @@ -537,6 +561,9 @@ export default function PlaygroundPage() { setReplayErrors([msg]); return; } + if (typeof r.data?.elapsed_ms === "number") { + setLastReplayElapsedMs(r.data.elapsed_ms); + } await pullDialogue(cid, signal); const draftBase = { @@ -605,12 +632,16 @@ export default function PlaygroundPage() { memoirSubmitAbortRef.current = ac; const { signal } = ac; setMemoirSubmitBusy(true); + setLastMemoirSubmitElapsedMs(null); + setPhase1WaitServerMs(null); try { type SubmitOut = { conversation_id: string; user_id: string; segment_ids: string[]; celery_task_id: string | null; + submitted_at_utc?: string | null; + elapsed_ms?: number | null; }; const r = await api( `/internal/api/evaluation/sessions/${encodeURIComponent(cid)}/memoir-submit`, @@ -624,6 +655,9 @@ export default function PlaygroundPage() { pushNotice(r.error ?? "memoir-submit 失败", "error"); return; } + if (typeof r.data.elapsed_ms === "number") { + setLastMemoirSubmitElapsedMs(r.data.elapsed_ms); + } const ids = r.data.segment_ids ?? []; if (ids.length === 0) { pushNotice("当前会话没有待提交的 segment(可能已全部处理)", "info"); @@ -635,7 +669,16 @@ export default function PlaygroundPage() { "success", ); if (waitAfterMemoirSubmit) { - const mem = await waitUntilMemoirPhase1Ready(cid, ids, signal); + const mem = await waitUntilMemoirPhase1Ready( + cid, + ids, + signal, + (body) => { + if (typeof body.elapsed_ms_since_submit === "number") { + setPhase1WaitServerMs(body.elapsed_ms_since_submit); + } + }, + ); if (!mem.ok) { if (mem.error === "aborted") { pushNotice("已中止", "info"); @@ -1120,6 +1163,9 @@ export default function PlaygroundPage() { {dialogueUpdatedAt ? ` · ${dialogueUpdatedAt.toLocaleTimeString()}` : ""} + {lastReplayElapsedMs != null + ? ` · 上轮 replay 服务端 ${lastReplayElapsedMs}ms` + : ""} @@ -1156,6 +1202,17 @@ export default function PlaygroundPage() { 提交后轮询直到 Phase1 就绪(最长 {MEMOIR_PHASE1_WAIT_MAX_MS / 60_000} 分钟) + {(lastMemoirSubmitElapsedMs != null || phase1WaitServerMs != null) && ( +

+ 服务端计量: + {lastMemoirSubmitElapsedMs != null + ? ` memoir-submit HTTP ${lastMemoirSubmitElapsedMs}ms` + : ""} + {phase1WaitServerMs != null + ? ` · 自提交起 Celery/Phase1 ${phase1WaitServerMs}ms(轮询读数)` + : ""} +

+ )}