Files
life-echo/api/app/agents/chat/reply_limits.py
Kevin ccdc4e4277 feat(i18n): persist language preference and thread through chat, memoir, TTS
- Add users.language_preference (Alembic 0018, default zh); capture at signup/SMS
  only; expose on auth and profile APIs
- Lite English prompts for chat and memoir; localized stage labels and agent
  names (Life Echo / 岁月知己)
- Tencent TTS: language-aware synthesis, ModelType=1 for 501004, English chunking
- WebSocket pipeline: emit all AGENT_RESPONSE segments when TTS cancels; INFO logs
  for tts_this_turn and TTS decisions; on-demand TTS logging
- Expo: device language on auth, i18n tiers/agent name, [SPLIT] streaming UX fixes
- Tests for migration, prompts, pipeline, router tts_this_turn, reply segments

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 16:16:49 +08:00

175 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""访谈/资料追问:回复条数与单条字数硬限制(不靠长 prompt"""
from __future__ import annotations
import re
# Zero-width characters (ZWSP / ZWNJ / ZWJ / BOM). Per the original note, LLMs
# occasionally inject these around [SPLIT]; they are removed before splitting.
_ZERO_WIDTH_RE = re.compile(r"[\u200B-\u200D\uFEFF]")
# Lenient marker pattern: matches [SPLIT] / [ SPLIT ] / [split] and similar
# (case-insensitive, optional whitespace inside the brackets). Per the original
# note it is meant to stay aligned with the client's `message-split.ts`.
# Full-width brackets are folded to ASCII by _normalize_split_markers first and
# only then matched by this pattern.
SPLIT_MARKER_RE = re.compile(r"\[\s*SPLIT\s*\]", re.IGNORECASE)
def _normalize_split_markers(text: str) -> str:
    """Fold common ``[SPLIT]`` spelling variants into a form ``SPLIT_MARKER_RE`` matches.

    Two clean-ups are applied to the whole string:

    - zero-width characters (ZWSP / ZWNJ / ZWJ / BOM) are deleted;
    - full-width brackets (U+FF3B / U+FF3D and U+3010 / U+3011) are replaced
      with ASCII ``[`` / ``]``.

    Letter case and whitespace inside the brackets are not touched here; the
    lenient ``SPLIT_MARKER_RE`` pattern absorbs those. Falsy input is returned
    unchanged.
    """
    if not text:
        return text
    without_zero_width = _ZERO_WIDTH_RE.sub("", text)
    # One translate pass replaces the four chained ``str.replace`` calls.
    bracket_table = {0xFF3B: "[", 0xFF3D: "]", 0x3010: "[", 0x3011: "]"}
    return without_zero_width.translate(bracket_table)
def strip_markdown_for_chat(text: str) -> str:
    """Strip the Markdown a model occasionally emits, leaving plain chat text.

    This is a fixed sequence of simple substitutions, not a Markdown parser;
    predictability is preferred over completeness. Line breaks are preserved,
    and no step targets a bare ``[SPLIT]`` marker (one immediately followed by
    ``(...)`` would still be treated as a link).

    Args:
        text: Raw model output. Falsy values are returned unchanged.

    Returns:
        The text with fences, link/image syntax, heading and list prefixes,
        bold / strikethrough / emphasis markers, backticks and ``---`` rules
        removed.
    """
    if not text:
        return text
    s = text
    # Fenced code blocks (with an optional language tag on the opening line):
    # keep the inner text, drop the fences.
    s = re.sub(
        r"```(?:[^\n`]*)\n([\s\S]*?)```",
        r"\1",
        s,
        flags=re.MULTILINE,
    )
    s = s.replace("```", "")
    # Image ![alt](url) -> alt; link [label](url) -> label.
    s = re.sub(r"!\[([^\]]*)\]\([^)]*\)", r"\1", s)
    s = re.sub(r"\[([^\]]*)\]\([^)]*\)", r"\1", s)
    # ATX headings.
    s = re.sub(r"(?m)^#{1,6}\s+", "", s)
    # Unordered list prefix: "-", "*" or "+" at line start followed by
    # whitespace. The leading indent is limited to horizontal whitespace
    # ([^\S\n]); a plain \s* would also consume the preceding blank line and
    # silently merge two paragraphs.
    s = re.sub(r"(?m)^[^\S\n]*[-*+]\s+", "", s)
    # Ordered list prefix "<digits>. " at line start; same newline-safe indent.
    s = re.sub(r"(?m)^[^\S\n]*\d+\.\s+", "", s)
    # Bold / strikethrough markers.
    s = s.replace("**", "").replace("__", "")
    s = s.replace("~~", "")
    # Inline backticks.
    s = s.replace("`", "")
    # Remaining single-marker emphasis, *word* or _word_, never across lines.
    s = re.sub(r"(?<![*])\*([^*\n]+)\*(?![*])", r"\1", s)
    s = re.sub(r"(?<![_])_([^_\n]+)_(?![_])", r"\1", s)
    # Horizontal rules.
    s = re.sub(r"(?m)^\s*---+\s*$", "", s)
    return s
def strip_parenthetical_asides_for_chat(text: str) -> str:
    """Remove parenthetical asides, both full-width and ASCII, from a reply.

    Every parenthesised span is deleted, stage directions and factual notes
    alike; the original docstring describes this as a deliberate product
    trade-off. Removal repeats until the text stops changing, so a pair
    exposed by an earlier deletion is removed as well. Runs of spaces or tabs
    left behind are collapsed to one space and the result is trimmed.

    Per the original docstring this should run after
    ``strip_markdown_for_chat``, so the ``(...)`` of Markdown links has
    already been handled.

    Args:
        text: Reply text. Falsy values are returned unchanged.

    Returns:
        The text without parenthesised spans.
    """
    if not text:
        return text
    s = text
    prev = None
    while prev != s:
        prev = s
        # Full-width parentheses, written as escapes (U+FF08 / U+FF09) so the
        # pattern survives tooling that mangles ambiguous Unicode characters.
        s = re.sub(r"\uff08[^\uff09]*\uff09", "", s)
        # ASCII parentheses.
        s = re.sub(r"\([^)]*\)", "", s)
    s = re.sub(r"[ \t]{2,}", " ", s)
    return s.strip()
def strip_leading_en_period_ack_for_chat(text: str) -> str:
    """Drop a stiff leading "嗯。" acknowledgement (possibly repeated) from a reply.

    Only the very start of the string is affected; an "嗯。" later in the text
    is kept. One or more "嗯" must be followed by at least one period, which
    may be the ideographic full stop (U+3002), the full-width period (U+FF0E)
    or ASCII ``.``. A leading "嗯" with no period after it is left alone.

    Args:
        text: Reply text; ``None`` is treated as the empty string.

    Returns:
        The trimmed text with any leading acknowledgement removed.
    """
    s = (text or "").strip()
    if not s:
        return s
    # A character class is used instead of an alternation so no branch can be
    # empty; an empty branch would let a bare "嗯" match without any period.
    without_ack = re.sub(r"^(?:嗯+[。\uff0e.]+\s*)+", "", s)
    return without_ack.strip()
def _strip_acks_drop_empty(pieces: list[str]) -> list[str]:
    """Strip the leading acknowledgement from each piece and drop empty results."""
    kept: list[str] = []
    for piece in pieces:
        cleaned = strip_leading_en_period_ack_for_chat(piece)
        if cleaned:
            kept.append(cleaned)
    return kept


def segments_from_llm_response(
    response_text: str,
    *,
    max_segments: int = 3,
    min_paragraph_chars: int = 12,
) -> list[str]:
    """Split a model reply into chat segments.

    The text is first passed through ``strip_markdown_for_chat`` and
    ``strip_parenthetical_asides_for_chat``. Splitting then proceeds in order:

    1. Split on ``[SPLIT]`` markers, tolerating case differences, inner
       whitespace, full-width brackets and zero-width characters. If this
       yields more than one non-empty piece, those pieces are the result.
    2. Otherwise, if the single piece contains blank-line-separated
       paragraphs and at least two of them are ``min_paragraph_chars`` long
       or more, those paragraphs are the result.
    3. Otherwise the single piece is returned as one segment.

    Each piece has its leading "嗯。" acknowledgement stripped. If nothing is
    left after cleaning (including a reply that held only markers), ``[]`` is
    returned, so a literal marker or an empty string is never emitted.

    Args:
        response_text: Raw model output; ``None`` is treated as empty.
        max_segments: Upper bound on the number of returned segments.
        min_paragraph_chars: Minimum length for a paragraph to count in the
            blank-line fallback.

    Returns:
        At most ``max_segments`` non-empty segments, possibly an empty list.
    """
    text = strip_markdown_for_chat((response_text or "").strip())
    text = strip_parenthetical_asides_for_chat(text)
    if not text:
        return []
    normalized = _normalize_split_markers(text)

    primary = _strip_acks_drop_empty(SPLIT_MARKER_RE.split(normalized))
    if not primary:
        # Every piece was empty after cleaning. Falling back to the unsplit
        # text here would put the literal marker back into the output.
        return []
    if len(primary) > 1:
        return primary[:max_segments]

    blob = primary[0]
    if "\n" not in blob:
        return [blob]

    paras = _strip_acks_drop_empty(re.split(r"\n\s*\n+", blob))
    if len(paras) < 2:
        return [blob]
    # Short paragraphs are not worth a bubble of their own; if fewer than two
    # long-enough ones remain, the whole blob stays one segment.
    paras = [p for p in paras if len(p) >= min_paragraph_chars]
    if len(paras) < 2:
        return [blob]
    return paras[:max_segments]
def nonempty_segments_or_fallback(
    segments: list[str],
    *,
    fallback: str,
) -> list[str]:
    """Drop blank segments, substituting a single fallback when none remain.

    Segments that are ``None``, empty or whitespace-only are discarded; the
    surviving segments are returned as given, not trimmed. If none survive,
    the trimmed ``fallback`` is returned as a one-element list. If that is
    blank too, the result is ``[""]``, so the list is never empty.
    """
    kept = []
    for segment in segments:
        if segment and segment.strip():
            kept.append(segment)
    if kept:
        return kept

    trimmed_fallback = (fallback or "").strip()
    if trimmed_fallback:
        return [trimmed_fallback]
    return [""]
def truncate_chat_segments(
    segments: list[str],
    *,
    max_segments: int,
    max_chars_per_segment: int,
) -> list[str]:
    """Keep the first ``max_segments`` segments and cap each one's length.

    Length is measured in characters (code points), not bytes, so CJK text is
    counted per character. Each segment is trimmed; blank ones are skipped,
    although they still count towards the ``max_segments`` window. A segment
    longer than the cap is cut and ends with a one-character ellipsis
    (U+2026), which is included in the cap.

    Args:
        segments: Candidate segments; ``None`` entries are treated as empty.
        max_segments: Number of leading entries of ``segments`` to consider.
        max_chars_per_segment: Maximum characters per returned segment.

    Returns:
        The trimmed, possibly truncated, non-empty segments.
    """
    if not segments:
        return []
    out: list[str] = []
    for raw in segments[:max_segments]:
        s = (raw or "").strip()
        if not s:
            continue
        if len(s) > max_chars_per_segment:
            # One character is reserved for the ellipsis. The bound is clamped
            # at 0 so a cap below 1 cannot become a negative slice index.
            keep = max(max_chars_per_segment - 1, 0)
            s = s[:keep].rstrip() + "\u2026"
        out.append(s)
    return out