feat(voice-client): PySide6 desktop client and Windows build scripts
Add voice_confirmation_client (poll, TTS MP3 playback, mic WAV resolve), PyInstaller spec, start/build helpers, and API unit tests. Pending manual testing: end-to-end on OR workstations and packaged exe. Made-with: Cursor
This commit is contained in:
3
voice_confirmation_client/core/__init__.py
Normal file
3
voice_confirmation_client/core/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from voice_confirmation_client.core.monitor_worker import MonitorWorker
|
||||
|
||||
__all__ = ["MonitorWorker"]
|
||||
87
voice_confirmation_client/core/api.py
Normal file
87
voice_confirmation_client/core/api.py
Normal file
@@ -0,0 +1,87 @@
|
||||
"""HTTP client for pending-confirmation and resolve endpoints."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from urllib.parse import quote, urljoin
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
@dataclass
|
||||
class PendingConfirmationPayload:
|
||||
surgery_id: str
|
||||
confirmation_id: str
|
||||
prompt_text: str
|
||||
prompt_audio_mp3_base64: str
|
||||
options: list[dict[str, Any]]
|
||||
model_top1_label: str
|
||||
model_top1_confidence: float
|
||||
created_at: str
|
||||
raw: dict[str, Any]
|
||||
|
||||
|
||||
class ConfirmationApiClient:
|
||||
def __init__(self, base_url: str, timeout: float = 60.0) -> None:
|
||||
self._base = base_url.rstrip("/") + "/"
|
||||
self._timeout = timeout
|
||||
self._client = httpx.Client(timeout=timeout)
|
||||
|
||||
@property
|
||||
def base_url_normalized(self) -> str:
|
||||
return self._base
|
||||
|
||||
def close(self) -> None:
|
||||
self._client.close()
|
||||
|
||||
def _url(self, path: str) -> str:
|
||||
return urljoin(self._base, path.lstrip("/"))
|
||||
|
||||
def get_pending(self, surgery_id: str) -> tuple[int, dict[str, Any] | str]:
|
||||
url = self._url(f"client/surgeries/{surgery_id}/pending-confirmation")
|
||||
r = self._client.get(url)
|
||||
text = r.text
|
||||
if not text:
|
||||
return r.status_code, {}
|
||||
try:
|
||||
body: dict[str, Any] | str = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
body = text
|
||||
return r.status_code, body
|
||||
|
||||
def parse_pending(self, body: dict[str, Any]) -> PendingConfirmationPayload:
|
||||
return PendingConfirmationPayload(
|
||||
surgery_id=str(body.get("surgery_id", "")),
|
||||
confirmation_id=str(body["confirmation_id"]),
|
||||
prompt_text=str(body.get("prompt_text", "")),
|
||||
prompt_audio_mp3_base64=str(body.get("prompt_audio_mp3_base64", "")),
|
||||
options=list(body.get("options") or []),
|
||||
model_top1_label=str(body.get("model_top1_label", "")),
|
||||
model_top1_confidence=float(body.get("model_top1_confidence", 0.0)),
|
||||
created_at=str(body.get("created_at", "")),
|
||||
raw=body,
|
||||
)
|
||||
|
||||
def post_resolve(
|
||||
self,
|
||||
surgery_id: str,
|
||||
confirmation_id: str,
|
||||
wav_bytes: bytes,
|
||||
filename: str = "voice.wav",
|
||||
) -> tuple[int, dict[str, Any] | str]:
|
||||
cid_enc = quote(confirmation_id, safe="")
|
||||
url = self._url(
|
||||
f"client/surgeries/{surgery_id}/pending-confirmation/{cid_enc}/resolve"
|
||||
)
|
||||
files = {"audio": (filename, wav_bytes, "audio/wav")}
|
||||
r = self._client.post(url, files=files)
|
||||
text = r.text
|
||||
if not text:
|
||||
return r.status_code, {}
|
||||
try:
|
||||
body: dict[str, Any] | str = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
body = text
|
||||
return r.status_code, body
|
||||
347
voice_confirmation_client/core/monitor_worker.py
Normal file
347
voice_confirmation_client/core/monitor_worker.py
Normal file
@@ -0,0 +1,347 @@
|
||||
"""Background polling + play + record + resolve (threaded, Qt-free)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from voice_confirmation_client.core.api import ConfirmationApiClient
|
||||
from voice_confirmation_client.core.playback import play_mp3_from_base64
|
||||
from voice_confirmation_client.core.record import record_wav_16k_mono
|
||||
|
||||
|
||||
@dataclass
|
||||
class MonitorSettings:
|
||||
base_url: str = "http://127.0.0.1:38080"
|
||||
surgery_id: str = ""
|
||||
interval_sec: float = 5.0
|
||||
record_seconds: float = 8.0
|
||||
dry_run: bool = False
|
||||
hide_404_logs: bool = True
|
||||
prefer_ffmpeg_record: bool = False
|
||||
sounddevice_device: int | str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class _MutableState:
|
||||
generation: int = 0
|
||||
busy: bool = False
|
||||
spoken_cid: str | None = None
|
||||
failed_resolve_cid: str | None = None
|
||||
force_retry: bool = False
|
||||
last_payload: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class MonitorWorker:
|
||||
"""Polls pending-confirmation; on new item plays MP3, records WAV, POSTs resolve."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
on_log: Callable[[str], None] | None = None,
|
||||
on_state: Callable[[str], None] | None = None,
|
||||
on_pending: Callable[[dict[str, Any] | None], None] | None = None,
|
||||
) -> None:
|
||||
self._on_log = on_log
|
||||
self._on_state = on_state
|
||||
self._on_pending = on_pending
|
||||
self._settings = MonitorSettings()
|
||||
self._settings_lock = threading.Lock()
|
||||
self._state = _MutableState()
|
||||
self._state_lock = threading.Lock()
|
||||
self._stop = threading.Event()
|
||||
self._wake = threading.Event()
|
||||
self._monitoring = threading.Event()
|
||||
self._thread: threading.Thread | None = None
|
||||
self._api: ConfirmationApiClient | None = None
|
||||
self._api_base: str | None = None
|
||||
self._api_lock = threading.Lock()
|
||||
|
||||
def set_settings(self, **kwargs: Any) -> None:
|
||||
with self._settings_lock:
|
||||
old_sid = self._settings.surgery_id
|
||||
for k, v in kwargs.items():
|
||||
if hasattr(self._settings, k):
|
||||
setattr(self._settings, k, v)
|
||||
sid_changed = (
|
||||
"surgery_id" in kwargs and self._settings.surgery_id != old_sid
|
||||
)
|
||||
with self._state_lock:
|
||||
self._state.generation += 1
|
||||
if sid_changed:
|
||||
self._state.spoken_cid = None
|
||||
self._state.failed_resolve_cid = None
|
||||
self._state.last_payload = None
|
||||
self._state.force_retry = False
|
||||
self._emit_pending(None)
|
||||
|
||||
def start_thread(self) -> None:
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._stop.clear()
|
||||
self._thread = threading.Thread(target=self._run, name="VoiceMonitor", daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
def stop_thread(self) -> None:
|
||||
self._stop.set()
|
||||
self._wake.set()
|
||||
if self._thread:
|
||||
self._thread.join(timeout=8.0)
|
||||
self._thread = None
|
||||
with self._api_lock:
|
||||
if self._api:
|
||||
self._api.close()
|
||||
self._api = None
|
||||
self._api_base = None
|
||||
|
||||
def set_monitoring(self, active: bool) -> None:
|
||||
if active:
|
||||
self._monitoring.set()
|
||||
self._wake.set()
|
||||
else:
|
||||
self._monitoring.clear()
|
||||
with self._state_lock:
|
||||
self._state.generation += 1
|
||||
|
||||
def retry_failed(self) -> None:
|
||||
with self._state_lock:
|
||||
self._state.force_retry = True
|
||||
self._wake.set()
|
||||
|
||||
def replay_prompt_only(self) -> None:
|
||||
"""Play last pending MP3 again (GUI button); no record/upload."""
|
||||
threading.Thread(target=self._replay_prompt_job, name="ReplayPrompt", daemon=True).start()
|
||||
|
||||
def _replay_prompt_job(self) -> None:
|
||||
with self._state_lock:
|
||||
payload = self._state.last_payload
|
||||
if not payload:
|
||||
self._log("没有可重播的待确认数据")
|
||||
return
|
||||
b64 = payload.get("prompt_audio_mp3_base64") or ""
|
||||
if not b64:
|
||||
self._log("当前任务无 MP3 数据")
|
||||
return
|
||||
self._emit_state("播放话术(手动重播)…")
|
||||
try:
|
||||
play_mp3_from_base64(str(b64))
|
||||
except Exception as e:
|
||||
self._log(f"重播失败: {e}")
|
||||
finally:
|
||||
self._emit_state("待机")
|
||||
|
||||
def _log(self, msg: str) -> None:
|
||||
if self._on_log:
|
||||
self._on_log(msg)
|
||||
|
||||
def _emit_state(self, s: str) -> None:
|
||||
if self._on_state:
|
||||
self._on_state(s)
|
||||
|
||||
def _emit_pending(self, p: dict[str, Any] | None) -> None:
|
||||
if self._on_pending:
|
||||
self._on_pending(p)
|
||||
|
||||
def _get_api(self, base_url: str) -> ConfirmationApiClient:
|
||||
norm = base_url.rstrip("/") + "/"
|
||||
with self._api_lock:
|
||||
if self._api is None or self._api_base != norm:
|
||||
if self._api:
|
||||
self._api.close()
|
||||
self._api = ConfirmationApiClient(base_url)
|
||||
self._api_base = norm
|
||||
return self._api
|
||||
|
||||
def _run(self) -> None:
|
||||
while not self._stop.is_set():
|
||||
if not self._monitoring.is_set():
|
||||
time.sleep(0.15)
|
||||
continue
|
||||
|
||||
with self._settings_lock:
|
||||
cfg = MonitorSettings(
|
||||
base_url=self._settings.base_url,
|
||||
surgery_id=self._settings.surgery_id,
|
||||
interval_sec=self._settings.interval_sec,
|
||||
record_seconds=self._settings.record_seconds,
|
||||
dry_run=self._settings.dry_run,
|
||||
hide_404_logs=self._settings.hide_404_logs,
|
||||
prefer_ffmpeg_record=self._settings.prefer_ffmpeg_record,
|
||||
sounddevice_device=self._settings.sounddevice_device,
|
||||
)
|
||||
|
||||
if not re.fullmatch(r"\d{6}", cfg.surgery_id or ""):
|
||||
self._emit_state("手术号无效(需 6 位数字)")
|
||||
self._wake.wait(timeout=1.0)
|
||||
self._wake.clear()
|
||||
continue
|
||||
|
||||
api = self._get_api(cfg.base_url)
|
||||
|
||||
with self._state_lock:
|
||||
if self._state.busy:
|
||||
self._wake.wait(timeout=0.5)
|
||||
self._wake.clear()
|
||||
continue
|
||||
gen_before = self._state.generation
|
||||
|
||||
try:
|
||||
status, body = api.get_pending(cfg.surgery_id)
|
||||
except Exception as e:
|
||||
self._log(f"GET pending 失败: {e}")
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
continue
|
||||
|
||||
with self._state_lock:
|
||||
if self._state.generation != gen_before:
|
||||
continue
|
||||
if self._state.busy:
|
||||
continue
|
||||
|
||||
if status == 404:
|
||||
with self._state_lock:
|
||||
self._state.last_payload = None
|
||||
self._state.spoken_cid = None
|
||||
self._state.failed_resolve_cid = None
|
||||
self._emit_pending(None)
|
||||
if not cfg.hide_404_logs:
|
||||
self._log("暂无待确认")
|
||||
self._emit_state("轮询中(无待确认)")
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
continue
|
||||
|
||||
if status != 200 or not isinstance(body, dict):
|
||||
self._log(f"GET pending 异常 HTTP {status}: {body}")
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
continue
|
||||
|
||||
cid = str(body.get("confirmation_id") or "")
|
||||
if not cid:
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
continue
|
||||
|
||||
with self._state_lock:
|
||||
self._state.last_payload = body
|
||||
failed = self._state.failed_resolve_cid
|
||||
force = self._state.force_retry
|
||||
spoken = self._state.spoken_cid
|
||||
|
||||
if failed is not None and failed != cid:
|
||||
self._state.failed_resolve_cid = None
|
||||
self._state.force_retry = False
|
||||
failed = None
|
||||
|
||||
if failed == cid and not force:
|
||||
self._emit_pending(body)
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
continue
|
||||
|
||||
if spoken == cid and failed is None and not force:
|
||||
# Already completed pipeline for this cid without failure; server still returns same id?
|
||||
self._emit_pending(body)
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
continue
|
||||
|
||||
self._state.force_retry = False
|
||||
self._state.busy = True
|
||||
self._state.spoken_cid = cid
|
||||
|
||||
self._emit_pending(body)
|
||||
|
||||
try:
|
||||
self._pipeline_play_record_resolve(cfg, api, body, cid)
|
||||
finally:
|
||||
with self._state_lock:
|
||||
self._state.busy = False
|
||||
|
||||
self._wake.clear()
|
||||
self._wait_interval(cfg.interval_sec)
|
||||
|
||||
def _wait_interval(self, interval_sec: float) -> None:
|
||||
self._wake.wait(timeout=max(0.5, interval_sec))
|
||||
self._wake.clear()
|
||||
|
||||
def _pipeline_play_record_resolve(
|
||||
self,
|
||||
cfg: MonitorSettings,
|
||||
api: ConfirmationApiClient,
|
||||
body: dict[str, Any],
|
||||
cid: str,
|
||||
) -> None:
|
||||
gen_lock = self._state_lock
|
||||
with gen_lock:
|
||||
gen_run = self._state.generation
|
||||
|
||||
try:
|
||||
self._emit_state("播放话术…")
|
||||
play_mp3_from_base64(str(body.get("prompt_audio_mp3_base64") or ""))
|
||||
except Exception as e:
|
||||
self._log(f"播放失败: {e}")
|
||||
with gen_lock:
|
||||
self._state.failed_resolve_cid = cid
|
||||
self._emit_state("播放失败(可重试)")
|
||||
return
|
||||
|
||||
with gen_lock:
|
||||
if self._state.generation != gen_run:
|
||||
return
|
||||
|
||||
try:
|
||||
self._emit_state("录音中…")
|
||||
wav = record_wav_16k_mono(
|
||||
cfg.record_seconds,
|
||||
device=cfg.sounddevice_device,
|
||||
prefer_ffmpeg=cfg.prefer_ffmpeg_record,
|
||||
)
|
||||
except Exception as e:
|
||||
self._log(f"录音失败: {e}")
|
||||
with gen_lock:
|
||||
self._state.failed_resolve_cid = cid
|
||||
self._emit_state("录音失败(可重试)")
|
||||
return
|
||||
|
||||
with gen_lock:
|
||||
if self._state.generation != gen_run:
|
||||
return
|
||||
|
||||
if cfg.dry_run:
|
||||
self._log(f"[dry-run] 已录音 {len(wav)} 字节,跳过上传")
|
||||
with gen_lock:
|
||||
self._state.failed_resolve_cid = None
|
||||
self._state.spoken_cid = None
|
||||
self._state.generation += 1
|
||||
self._emit_state("待机(dry-run)")
|
||||
return
|
||||
|
||||
try:
|
||||
self._emit_state("上传识别…")
|
||||
st, res = api.post_resolve(cfg.surgery_id, cid, wav)
|
||||
except Exception as e:
|
||||
self._log(f"POST resolve 失败: {e}")
|
||||
with gen_lock:
|
||||
self._state.failed_resolve_cid = cid
|
||||
self._emit_state("上传失败(可重试)")
|
||||
return
|
||||
|
||||
if st == 200 and isinstance(res, dict) and res.get("status") == "accepted":
|
||||
self._log(
|
||||
f"已确认: {res.get('message', '')} "
|
||||
f"(resolved_label={res.get('resolved_label')!r})"
|
||||
)
|
||||
with gen_lock:
|
||||
self._state.failed_resolve_cid = None
|
||||
self._state.spoken_cid = None
|
||||
self._state.last_payload = None
|
||||
self._state.generation += 1
|
||||
self._emit_pending(None)
|
||||
self._emit_state("待机")
|
||||
return
|
||||
|
||||
self._log(f"resolve 未接受 HTTP {st}: {res}")
|
||||
with gen_lock:
|
||||
self._state.failed_resolve_cid = cid
|
||||
self._emit_state("解析/上传被拒(可重试)")
|
||||
47
voice_confirmation_client/core/paths.py
Normal file
47
voice_confirmation_client/core/paths.py
Normal file
@@ -0,0 +1,47 @@
|
||||
"""Resolve bundled helper binaries (ffplay/ffmpeg) next to the package or PyInstaller extract dir."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def package_root() -> Path:
|
||||
"""Directory containing `voice_confirmation_client` package."""
|
||||
return Path(__file__).resolve().parent.parent
|
||||
|
||||
|
||||
def frozen_base() -> Path | None:
|
||||
"""PyInstaller onefile/onedir: sys._MEIPASS or executable dir."""
|
||||
if getattr(sys, "frozen", False):
|
||||
meipass = getattr(sys, "_MEIPASS", None)
|
||||
if meipass:
|
||||
return Path(meipass)
|
||||
return Path(sys.executable).resolve().parent
|
||||
return None
|
||||
|
||||
|
||||
def bin_dir() -> Path:
|
||||
"""Optional `bin/` next to package (dev) or under _MEIPASS (frozen)."""
|
||||
fb = frozen_base()
|
||||
if fb is not None:
|
||||
d = fb / "voice_confirmation_bin"
|
||||
if d.is_dir():
|
||||
return d
|
||||
return package_root() / "bin"
|
||||
|
||||
|
||||
def find_ffplay() -> Path | None:
|
||||
for name in ("ffplay", "ffplay.exe"):
|
||||
p = bin_dir() / name
|
||||
if p.is_file():
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def find_ffmpeg() -> Path | None:
|
||||
for name in ("ffmpeg", "ffmpeg.exe"):
|
||||
p = bin_dir() / name
|
||||
if p.is_file():
|
||||
return p
|
||||
return None
|
||||
61
voice_confirmation_client/core/playback.py
Normal file
61
voice_confirmation_client/core/playback.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Play MP3 bytes via system player or bundled ffplay."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from voice_confirmation_client.core.paths import find_ffplay
|
||||
|
||||
|
||||
def play_mp3_from_base64(b64: str) -> None:
|
||||
raw_b64 = "".join((b64 or "").split())
|
||||
if not raw_b64:
|
||||
raise ValueError("empty prompt_audio_mp3_base64")
|
||||
data = base64.b64decode(raw_b64, validate=False)
|
||||
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
|
||||
f.write(data)
|
||||
tmp = f.name
|
||||
try:
|
||||
_play_mp3_path(Path(tmp))
|
||||
finally:
|
||||
try:
|
||||
os.unlink(tmp)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _play_mp3_path(path: Path) -> None:
|
||||
bundled = find_ffplay()
|
||||
if bundled and bundled.is_file():
|
||||
subprocess.run(
|
||||
[str(bundled), "-nodisp", "-autoexit", "-loglevel", "quiet", str(path)],
|
||||
check=True,
|
||||
timeout=600,
|
||||
)
|
||||
return
|
||||
ffplay = shutil.which("ffplay")
|
||||
if ffplay:
|
||||
subprocess.run(
|
||||
[ffplay, "-nodisp", "-autoexit", "-loglevel", "quiet", str(path)],
|
||||
check=True,
|
||||
timeout=600,
|
||||
)
|
||||
return
|
||||
if sys.platform == "darwin":
|
||||
subprocess.run(["afplay", str(path)], check=True, timeout=600)
|
||||
return
|
||||
if os.name == "nt":
|
||||
os.startfile(str(path)) # type: ignore[attr-defined]
|
||||
import time
|
||||
|
||||
time.sleep(5)
|
||||
return
|
||||
raise RuntimeError(
|
||||
"No MP3 player found. Install ffmpeg (ffplay) or run on macOS with afplay."
|
||||
)
|
||||
94
voice_confirmation_client/core/record.py
Normal file
94
voice_confirmation_client/core/record.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Record microphone to 16 kHz mono WAV (sounddevice or ffmpeg)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import wave
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
from voice_confirmation_client.core.paths import find_ffmpeg
|
||||
|
||||
|
||||
def record_wav_16k_mono(
|
||||
duration_sec: float,
|
||||
*,
|
||||
device: int | str | None = None,
|
||||
prefer_ffmpeg: bool = False,
|
||||
ffmpeg_input_args: list[str] | None = None,
|
||||
) -> bytes:
|
||||
"""Return WAV file bytes (16-bit PCM, 16 kHz, mono)."""
|
||||
if prefer_ffmpeg:
|
||||
bundled = find_ffmpeg()
|
||||
ffmpeg_bin = str(bundled) if bundled and bundled.is_file() else shutil_which_ffmpeg()
|
||||
if ffmpeg_bin:
|
||||
return _record_ffmpeg(ffmpeg_bin, duration_sec, ffmpeg_input_args)
|
||||
return _record_sounddevice(duration_sec, device=device)
|
||||
|
||||
|
||||
def shutil_which_ffmpeg() -> str | None:
|
||||
import shutil
|
||||
|
||||
return shutil.which("ffmpeg")
|
||||
|
||||
|
||||
def _record_sounddevice(duration_sec: float, device: int | str | None) -> bytes:
|
||||
import sounddevice as sd
|
||||
|
||||
samplerate = 16000
|
||||
frames = int(duration_sec * samplerate)
|
||||
kwargs: dict = {"samplerate": samplerate, "channels": 1, "dtype": "float32"}
|
||||
if device is not None and device != "":
|
||||
kwargs["device"] = device
|
||||
recording = sd.rec(frames, **kwargs)
|
||||
sd.wait()
|
||||
mono = np.clip(recording.reshape(-1), -1.0, 1.0)
|
||||
pcm = (mono * 32767.0).astype(np.int16)
|
||||
buf = io.BytesIO()
|
||||
with wave.open(buf, "wb") as wf:
|
||||
wf.setnchannels(1)
|
||||
wf.setsampwidth(2)
|
||||
wf.setframerate(samplerate)
|
||||
wf.writeframes(pcm.tobytes())
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
def default_ffmpeg_input_args() -> list[str]:
|
||||
if sys.platform == "darwin":
|
||||
return ["-f", "avfoundation", "-i", ":0"]
|
||||
if sys.platform == "win32":
|
||||
return ["-f", "dshow", "-i", "audio=Microphone"]
|
||||
return ["-f", "alsa", "-i", "default"]
|
||||
|
||||
|
||||
def _record_ffmpeg(
|
||||
ffmpeg_bin: str, duration_sec: float, ffmpeg_input_args: list[str] | None
|
||||
) -> bytes:
|
||||
input_args = ffmpeg_input_args if ffmpeg_input_args else default_ffmpeg_input_args()
|
||||
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
|
||||
out = tmp.name
|
||||
try:
|
||||
cmd = [
|
||||
ffmpeg_bin,
|
||||
"-y",
|
||||
"-loglevel",
|
||||
"error",
|
||||
*input_args,
|
||||
"-t",
|
||||
str(duration_sec),
|
||||
"-ar",
|
||||
"16000",
|
||||
"-ac",
|
||||
"1",
|
||||
"-sample_fmt",
|
||||
"s16",
|
||||
out,
|
||||
]
|
||||
subprocess.run(cmd, check=True, timeout=int(duration_sec) + 45)
|
||||
return Path(out).read_bytes()
|
||||
finally:
|
||||
Path(out).unlink(missing_ok=True)
|
||||
Reference in New Issue
Block a user