配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
71 lines
2.1 KiB
Python
71 lines
2.1 KiB
Python
import json
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from app.core.redis import RedisService
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_message_uses_rpush_pipeline() -> None:
|
|
service = RedisService()
|
|
client = AsyncMock()
|
|
pipe = MagicMock()
|
|
pipe.execute = AsyncMock(return_value=[])
|
|
client.pipeline = MagicMock(return_value=pipe)
|
|
service._client = client
|
|
|
|
ok = await service.add_message("conv-1", "user", "hello")
|
|
|
|
assert ok is True
|
|
pipe.rpush.assert_called_once()
|
|
pipe.expire.assert_called_once_with(
|
|
"conversation:history:conv-1", service.session_ttl
|
|
)
|
|
pipe.execute.assert_awaited_once()
|
|
client.get.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_conversation_history_migrates_legacy_string() -> None:
|
|
service = RedisService()
|
|
client = AsyncMock()
|
|
legacy = [{"role": "user", "content": "hi", "messageType": "text"}]
|
|
client.exists.return_value = 1
|
|
client.type.return_value = "string"
|
|
client.get.return_value = json.dumps(legacy, ensure_ascii=False)
|
|
pipe = MagicMock()
|
|
pipe.execute = AsyncMock(return_value=[])
|
|
client.pipeline = MagicMock(return_value=pipe)
|
|
service._client = client
|
|
|
|
history = await service.get_conversation_history("conv-legacy")
|
|
|
|
assert history == legacy
|
|
pipe.execute.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_tts_updates_list_entry() -> None:
|
|
service = RedisService()
|
|
client = AsyncMock()
|
|
item = {
|
|
"role": "ai",
|
|
"content": "reply",
|
|
"messageType": "text",
|
|
"timestamp": "2026-01-01T00:00:00+00:00",
|
|
}
|
|
client.exists.return_value = 1
|
|
client.type.return_value = "list"
|
|
client.lrange.return_value = [json.dumps(item, ensure_ascii=False)]
|
|
service._client = client
|
|
|
|
ok = await service.append_tts_audio_url_to_last_ai_message(
|
|
"conv-1", "https://cdn.example/audio.mp3"
|
|
)
|
|
|
|
assert ok is True
|
|
client.lset.assert_awaited_once()
|
|
updated = json.loads(client.lset.await_args.args[2])
|
|
assert updated["ttsAudioUrls"] == ["https://cdn.example/audio.mp3"]
|