Files
life-echo/api/app/features/memoir/narrative_safety.py

166 lines
4.7 KiB
Python
Raw Normal View History

"""叙事落库前的确定性安检:防止 prompt 分区标记或摘录块泄漏进正文。"""
from __future__ import annotations
# 与 app.agents.memoir.prompts.format_narrative_user_content 保持一致
ORAL_SECTION_MARKER = "【本段用户口述】"
EVIDENCE_SECTION_MARKER = "【仅供参考的相关记忆摘录"
# 摘录引导语中的固定短语(用于粗检)
EVIDENCE_SECTION_TAIL = "不得把其中具体事实写成本轮亲历经历"
def body_contains_prompt_artifact(markdown_body: str) -> bool:
s = (markdown_body or "").strip()
if not s:
return False
if ORAL_SECTION_MARKER in s:
return True
if EVIDENCE_SECTION_MARKER in s:
return True
if EVIDENCE_SECTION_TAIL in s:
return True
return False
def longest_common_substring_len(a: str, b: str, min_len: int = 14) -> int:
"""O(n*m) DP仅用于短 evidence / body防止过大。"""
a = a or ""
b = b or ""
if len(a) > 8000 or len(b) > 8000:
return 0
if not a or not b:
return 0
best = 0
prev = [0] * (len(b) + 1)
for i in range(1, len(a) + 1):
cur = [0] * (len(b) + 1)
for j in range(1, len(b) + 1):
if a[i - 1] == b[j - 1]:
cur[j] = prev[j - 1] + 1
if cur[j] > best:
best = cur[j]
else:
cur[j] = 0
prev = cur
return best if best >= min_len else 0
def evidence_substring_leak_score(
body: str, evidence_plain: str, min_len: int = 14
) -> int:
"""
若正文与 evidence 存在较长公共子串且该子串不在 oral/existing
则视为摘录渗漏风险返回子串长度否则 0
"""
body = (body or "").strip()
ev = (evidence_plain or "").strip()
if not body or not ev or len(ev) < min_len:
return 0
return longest_common_substring_len(body, ev, min_len=min_len)
def longest_common_substring(a: str, b: str) -> str:
"""返回 a、b 的最长公共子串(长度上限防 DP 爆内存)。"""
a = a or ""
b = b or ""
if len(a) > 8000 or len(b) > 8000:
return ""
best_i, best_len = 0, 0
prev = [0] * (len(b) + 1)
for i in range(1, len(a) + 1):
cur = [0] * (len(b) + 1)
for j in range(1, len(b) + 1):
if a[i - 1] == b[j - 1]:
cur[j] = prev[j - 1] + 1
if cur[j] > best_len:
best_len = cur[j]
best_i = i
else:
cur[j] = 0
prev = cur
if best_len <= 0:
return ""
start = best_i - best_len
return a[start:best_i]
# 具体场合描写:易由「相关摘录」渗入正文但长 LCS 抓不住(词短)。
EVIDENCE_SCENE_ANCHOR_TOKENS: tuple[str, ...] = (
"聚餐",
"酒席",
"酒桌",
"宴会",
"宴席",
"当晚",
"那晚",
"昨夜",
"前一晚",
"前一天晚上",
)
def evidence_scene_anchor_leak(
body: str,
evidence_plain: str,
oral: str,
existing: str,
) -> bool:
"""
True正文出现了与摘录共享的具体场合锚点词且口述与旧正文均未出现
视为摘录场景渗漏短词不走 LCS 阈值
"""
body = (body or "").strip()
ev = (evidence_plain or "").strip()
o = (oral or "").strip()
ex = (existing or "").strip()
if not body or not ev:
return False
base = f"{o}\n{ex}"
for tok in EVIDENCE_SCENE_ANCHOR_TOKENS:
if tok not in body:
continue
if tok in base:
continue
if tok in ev:
return True
return False
def evidence_leakage_heuristic(
body: str,
evidence_plain: str,
oral: str,
existing: str,
min_len: int,
) -> bool:
"""
True正文与 evidence 的最长公共子串足够长且该子串未出现在口述或已有正文中
视为摘录渗漏应回退安全正文
"""
body = (body or "").strip()
ev = (evidence_plain or "").strip()
if not body or not ev:
return False
lcs = longest_common_substring(body, ev)
if len(lcs) < min_len:
return False
o = oral or ""
ex = existing or ""
if lcs in o or lcs in ex:
return False
return True
def strip_evidence_for_overlap_check(evidence_text: str) -> str:
"""去掉 chunk 标记行等噪声,仅保留内容用于 overlap。"""
lines: list[str] = []
for line in (evidence_text or "").splitlines():
t = line.strip()
if t.startswith("[chunk_id="):
continue
if t.startswith("[摘要:"):
continue
lines.append(line)
return "\n".join(lines).strip()