Files
2026-04-16 14:53:01 +08:00

182 lines
5.7 KiB
Python

from __future__ import annotations

import asyncio
import logging
from pathlib import Path

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, Response

from app.compat import to_thread
from app.db import (
    health_snapshot_deliverable,
    measure_snapshot_deliverable,
    save_health_snapshot,
    save_measure_snapshot,
)
from app.deps import require_ingest_auth
from app.services import action as action_svc
from app.services import measure as measure_svc
from app.services.sessions import (
    finalize_rename,
    new_session_dir,
    partial_path,
    write_chunk,
)
from app.settings import Settings, get_settings
from app.state import app_state
# Router for the ingest endpoints: every route declared on it is served under
# the "/api/v1/ingest" prefix and carries the "ingest" tag.
router = APIRouter(prefix="/api/v1/ingest", tags=["ingest"])
async def _measure_job_serial(svo_path: Path, settings: Settings) -> None:
    """Run the full measure pipeline for one SVO recording, one job at a time.

    Jobs are serialized by ``app_state.measure_lock``. While a job is running
    ``app_state.measure_status`` is ``"running"``; it ends as ``"idle"`` on
    success or ``"error"`` if any step raised. When the resulting snapshot
    passes ``measure_snapshot_deliverable`` it is stored through
    ``save_measure_snapshot`` with the resolved recording path as its source.

    Exceptions are never propagated to the caller (the job is scheduled as a
    background task), but they are logged with their traceback so a failed
    job can be diagnosed.

    Args:
        svo_path: Path of the recording to measure.
        settings: Settings forwarded to the measure service and the snapshot
            store.
    """
    async with app_state.measure_lock:
        app_state.measure_status = "running"

        def _run():
            # Handed to to_thread below; measure_thread_lock is held around
            # the whole run_full_measure call.
            with app_state.measure_thread_lock:
                return measure_svc.run_full_measure(svo_path, settings)

        try:
            snap = await to_thread(_run)
            if measure_snapshot_deliverable(snap):
                save_measure_snapshot(
                    settings, snap, source_path=str(svo_path.resolve())
                )
        except Exception:
            app_state.measure_status = "error"
            # Previously the exception was dropped here, leaving only the
            # "error" status behind; keep the traceback in the log.
            logging.getLogger(__name__).exception(
                "Measure job failed for %s", svo_path
            )
        else:
            app_state.measure_status = "idle"
async def _action_job_serial(mp4_path: Path, settings: Settings) -> None:
    """Run action inference over one MP4 clip, slice by slice, one job at a time.

    Jobs are serialized by ``app_state.action_lock``. While a job is running
    ``app_state.action_status`` is ``"running"``; it ends as ``"idle"`` on
    success or ``"error"`` if any step raised. The clip is first split with
    ``action_svc.prepare_action_slices``; each slice is then passed to
    ``action_svc.run_single_slice_inference`` and, when the returned snapshot
    passes ``health_snapshot_deliverable``, saved immediately with
    ``"<resolved clip path>#slice<i>"`` as its source.

    Exceptions are never propagated to the caller (the job is scheduled as a
    background task), but they are logged with their traceback so a failed
    job can be diagnosed. Snapshots saved before the failure are kept.

    Args:
        mp4_path: Path of the clip to analyse.
        settings: Settings forwarded to the action service and the snapshot
            store.
    """
    async with app_state.action_lock:
        app_state.action_status = "running"
        try:
            slice_files, duration = await to_thread(
                action_svc.prepare_action_slices, mp4_path, settings
            )
            total_slices = len(slice_files)
            for i, slice_file in enumerate(slice_files):
                snap = await to_thread(
                    action_svc.run_single_slice_inference,
                    slice_file,
                    i,
                    total_slices,
                    duration,
                    mp4_path.name,
                    settings,
                )
                if health_snapshot_deliverable(snap):
                    slice_source = f"{mp4_path.resolve()}#slice{i}"
                    save_health_snapshot(settings, snap, source_path=slice_source)
        except Exception:
            app_state.action_status = "error"
            # Previously the exception was dropped here, leaving only the
            # "error" status behind; keep the traceback in the log.
            logging.getLogger(__name__).exception(
                "Action job failed for %s", mp4_path
            )
        else:
            app_state.action_status = "idle"
@router.post("/svo/session")
async def create_svo_session(
    settings: Settings = Depends(get_settings),
    _: None = Depends(require_ingest_auth),
):
    """Open a new SVO upload session.

    Returns:
        A mapping with ``session_id`` and ``upload_path`` (the session
        directory rendered as a string).
    """
    tmp_root = settings.stream_tmp_dir
    # The temp root may not exist yet on first use.
    tmp_root.mkdir(parents=True, exist_ok=True)
    new_id, session_dir = new_session_dir(tmp_root)
    return {"session_id": new_id, "upload_path": str(session_dir)}
@router.put("/svo/session/{session_id}")
async def append_svo_chunk(
    session_id: str,
    request: Request,
    offset: int = 0,
    settings: Settings = Depends(get_settings),
    _: None = Depends(require_ingest_auth),
):
    """Append one chunk of raw bytes to an open SVO upload session.

    The whole request body is read and handed to ``write_chunk`` together
    with ``offset``.

    Returns:
        ``{"bytes_total": <current size in bytes of the session's partial file>}``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when
            ``write_chunk`` rejects the chunk with ``ValueError``.
    """
    # session_id comes straight from the URL. Accept only a plain single path
    # component so that ".", ".." or anything carrying a separator/drive cannot
    # address a directory other than a child of stream_tmp_dir.
    if session_id in ("", ".", "..") or Path(session_id).name != session_id:
        raise HTTPException(status_code=404, detail="Unknown session")
    session_dir = settings.stream_tmp_dir / session_id
    if not session_dir.is_dir():
        raise HTTPException(status_code=404, detail="Unknown session")
    body = await request.body()
    try:
        write_chunk(session_dir, body, offset)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"bytes_total": partial_path(session_dir).stat().st_size}
@router.post("/svo/session/{session_id}/finalize")
async def finalize_svo(
    session_id: str,
    background_tasks: BackgroundTasks,
    settings: Settings = Depends(get_settings),
    _: None = Depends(require_ingest_auth),
):
    """Finalize an SVO upload session and queue the measure job.

    The session's upload is finalized as ``recording.svo2`` via
    ``finalize_rename`` and the measure job for that file is scheduled as a
    background task; the response is sent without waiting for the job.

    Returns:
        A 202 response with body
        ``{"status":"accepted","job":"fish_measure"}``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when
            ``finalize_rename`` raises ``FileNotFoundError``.
    """
    # session_id comes straight from the URL. Accept only a plain single path
    # component so that ".", ".." or anything carrying a separator/drive cannot
    # address a directory other than a child of stream_tmp_dir.
    if session_id in ("", ".", "..") or Path(session_id).name != session_id:
        raise HTTPException(status_code=404, detail="Unknown session")
    session_dir = settings.stream_tmp_dir / session_id
    if not session_dir.is_dir():
        raise HTTPException(status_code=404, detail="Unknown session")
    try:
        final = finalize_rename(session_dir, "recording.svo2")
    except FileNotFoundError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    # add_task accepts an async callable plus its arguments, so no wrapper
    # closure is needed.
    background_tasks.add_task(_measure_job_serial, final, settings)
    return Response(
        status_code=202,
        content='{"status":"accepted","job":"fish_measure"}',
        media_type="application/json",
    )
@router.post("/mp4/session")
async def create_mp4_session(
    settings: Settings = Depends(get_settings),
    _: None = Depends(require_ingest_auth),
):
    """Open a new MP4 upload session.

    Returns:
        A mapping with ``session_id`` and ``upload_path`` (the session
        directory rendered as a string).
    """
    tmp_root = settings.stream_tmp_dir
    # The temp root may not exist yet on first use.
    tmp_root.mkdir(parents=True, exist_ok=True)
    new_id, session_dir = new_session_dir(tmp_root)
    return {"session_id": new_id, "upload_path": str(session_dir)}
@router.put("/mp4/session/{session_id}")
async def append_mp4_chunk(
    session_id: str,
    request: Request,
    offset: int = 0,
    settings: Settings = Depends(get_settings),
    _: None = Depends(require_ingest_auth),
):
    """Append one chunk of raw bytes to an open MP4 upload session.

    The whole request body is read and handed to ``write_chunk`` together
    with ``offset``.

    Returns:
        ``{"bytes_total": <current size in bytes of the session's partial file>}``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when
            ``write_chunk`` rejects the chunk with ``ValueError``.
    """
    # session_id comes straight from the URL. Accept only a plain single path
    # component so that ".", ".." or anything carrying a separator/drive cannot
    # address a directory other than a child of stream_tmp_dir.
    if session_id in ("", ".", "..") or Path(session_id).name != session_id:
        raise HTTPException(status_code=404, detail="Unknown session")
    session_dir = settings.stream_tmp_dir / session_id
    if not session_dir.is_dir():
        raise HTTPException(status_code=404, detail="Unknown session")
    body = await request.body()
    try:
        write_chunk(session_dir, body, offset)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"bytes_total": partial_path(session_dir).stat().st_size}
@router.post("/mp4/session/{session_id}/finalize")
async def finalize_mp4(
    session_id: str,
    background_tasks: BackgroundTasks,
    settings: Settings = Depends(get_settings),
    _: None = Depends(require_ingest_auth),
):
    """Finalize an MP4 upload session and queue the action job.

    The session's upload is finalized as ``clip.mp4`` via ``finalize_rename``
    and the action job for that file is scheduled as a background task; the
    response is sent without waiting for the job.

    Returns:
        A 202 response with body
        ``{"status":"accepted","job":"fish_action"}``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when
            ``finalize_rename`` raises ``FileNotFoundError``.
    """
    # session_id comes straight from the URL. Accept only a plain single path
    # component so that ".", ".." or anything carrying a separator/drive cannot
    # address a directory other than a child of stream_tmp_dir.
    if session_id in ("", ".", "..") or Path(session_id).name != session_id:
        raise HTTPException(status_code=404, detail="Unknown session")
    session_dir = settings.stream_tmp_dir / session_id
    if not session_dir.is_dir():
        raise HTTPException(status_code=404, detail="Unknown session")
    try:
        final = finalize_rename(session_dir, "clip.mp4")
    except FileNotFoundError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    # add_task accepts an async callable plus its arguments, so no wrapper
    # closure is needed.
    background_tasks.add_task(_action_job_serial, final, settings)
    return Response(
        status_code=202,
        content='{"status":"accepted","job":"fish_action"}',
        media_type="application/json",
    )