Memory compaction(近重复 chunk 软排除) - 新增 compaction 调度:Redis debounce、scheduler gate、增量游标;任务结束时 finalize,避免 gate 长期占用并处理运行期新 trigger。 - Celery memory_compaction_run:debounce 未到点则 retry;用户级 Redis 锁;成功路径更新游标并 finalize;异常时释放 scheduler gate 并 self.retry,避免静默卡死调度与瞬时失败不重试。 - compaction_service:多层判定 + canonical 打分;无 embedding 时停止前移游标(awaiting_embeddings);curation details 补全 trigger 等上下文。 - ingest_transcript_sync:同步路径尽力写入 embedding,与异步 ingest 行为对齐,避免 compaction 永远扫不到无向量 chunk。 - repo:新增 update_chunk_embedding_sync。 测试 - 扩展 test_memory_compaction:调度合并、finalize、ingest embedding、无向量游标、异常路径 gate+retry 等回归用
57 lines
1.5 KiB
Python
57 lines
1.5 KiB
Python
"""Small Redis lock helpers for background tasks."""
|
||
|
||
import threading
|
||
import uuid
|
||
from dataclasses import dataclass
|
||
|
||
import redis
|
||
|
||
from app.core.config import settings
|
||
|
||
_redis_lock_client: redis.Redis | None = None
|
||
_redis_lock_init_lock = threading.Lock()
|
||
|
||
|
||
def _get_redis_lock_client() -> redis.Redis:
|
||
"""进程内复用单个 Redis 客户端(decode_responses=False,与锁 token 字节一致)。"""
|
||
global _redis_lock_client
|
||
if _redis_lock_client is None:
|
||
with _redis_lock_init_lock:
|
||
if _redis_lock_client is None:
|
||
_redis_lock_client = redis.from_url(
|
||
settings.redis_url, decode_responses=False
|
||
)
|
||
return _redis_lock_client
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class RedisLockHandle:
|
||
key: str
|
||
token: bytes
|
||
|
||
|
||
def acquire_redis_lock(key: str, *, ttl_seconds: int) -> RedisLockHandle | None:
|
||
"""Acquire a single-owner Redis lock or return None when unavailable."""
|
||
client = _get_redis_lock_client()
|
||
token = uuid.uuid4().hex.encode("utf-8")
|
||
if not client.set(key, token, nx=True, ex=ttl_seconds):
|
||
return None
|
||
return RedisLockHandle(key=key, token=token)
|
||
|
||
|
||
def release_redis_lock(handle: RedisLockHandle | None) -> None:
|
||
"""Release the lock only if we still own it."""
|
||
if handle is None:
|
||
return
|
||
_get_redis_lock_client().eval(
|
||
"""
|
||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||
return redis.call("DEL", KEYS[1])
|
||
end
|
||
return 0
|
||
""",
|
||
1,
|
||
handle.key,
|
||
handle.token,
|
||
)
|