182 lines
5.7 KiB
Python
182 lines
5.7 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, Response
|
|
|
|
from app.compat import to_thread
|
|
from app.db import (
|
|
health_snapshot_deliverable,
|
|
measure_snapshot_deliverable,
|
|
save_health_snapshot,
|
|
save_measure_snapshot,
|
|
)
|
|
from app.deps import require_ingest_auth
|
|
from app.services import action as action_svc
|
|
from app.services import measure as measure_svc
|
|
from app.services.sessions import (
|
|
finalize_rename,
|
|
new_session_dir,
|
|
partial_path,
|
|
write_chunk,
|
|
)
|
|
from app.settings import Settings, get_settings
|
|
from app.state import app_state
|
|
|
|
router = APIRouter(prefix="/api/v1/ingest", tags=["ingest"])
|
|
|
|
|
|
async def _measure_job_serial(svo_path: Path, settings: Settings) -> None:
|
|
async with app_state.measure_lock:
|
|
app_state.measure_status = "running"
|
|
try:
|
|
|
|
def _run():
|
|
with app_state.measure_thread_lock:
|
|
return measure_svc.run_full_measure(svo_path, settings)
|
|
|
|
snap = await to_thread(_run)
|
|
if measure_snapshot_deliverable(snap):
|
|
save_measure_snapshot(
|
|
settings, snap, source_path=str(svo_path.resolve())
|
|
)
|
|
app_state.measure_status = "idle"
|
|
except Exception:
|
|
app_state.measure_status = "error"
|
|
|
|
|
|
async def _action_job_serial(mp4_path: Path, settings: Settings) -> None:
|
|
async with app_state.action_lock:
|
|
app_state.action_status = "running"
|
|
try:
|
|
slice_files, duration = await to_thread(
|
|
action_svc.prepare_action_slices, mp4_path, settings
|
|
)
|
|
total_slices = len(slice_files)
|
|
|
|
for i, slice_file in enumerate(slice_files):
|
|
snap = await to_thread(
|
|
action_svc.run_single_slice_inference,
|
|
slice_file, i, total_slices, duration, mp4_path.name, settings,
|
|
)
|
|
if health_snapshot_deliverable(snap):
|
|
slice_source = f"{mp4_path.resolve()}#slice{i}"
|
|
save_health_snapshot(settings, snap, source_path=slice_source)
|
|
|
|
app_state.action_status = "idle"
|
|
except Exception:
|
|
app_state.action_status = "error"
|
|
|
|
|
|
@router.post("/svo/session")
|
|
async def create_svo_session(
|
|
settings: Settings = Depends(get_settings),
|
|
_: None = Depends(require_ingest_auth),
|
|
):
|
|
settings.stream_tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
sid, d = new_session_dir(settings.stream_tmp_dir)
|
|
return {"session_id": sid, "upload_path": str(d)}
|
|
|
|
|
|
@router.put("/svo/session/{session_id}")
|
|
async def append_svo_chunk(
|
|
session_id: str,
|
|
request: Request,
|
|
offset: int = 0,
|
|
settings: Settings = Depends(get_settings),
|
|
_: None = Depends(require_ingest_auth),
|
|
):
|
|
d = settings.stream_tmp_dir / session_id
|
|
if not d.is_dir():
|
|
raise HTTPException(status_code=404, detail="Unknown session")
|
|
body = await request.body()
|
|
try:
|
|
write_chunk(d, body, offset)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
p = partial_path(d)
|
|
return {"bytes_total": p.stat().st_size}
|
|
|
|
|
|
@router.post("/svo/session/{session_id}/finalize")
|
|
async def finalize_svo(
|
|
session_id: str,
|
|
background_tasks: BackgroundTasks,
|
|
settings: Settings = Depends(get_settings),
|
|
_: None = Depends(require_ingest_auth),
|
|
):
|
|
d = settings.stream_tmp_dir / session_id
|
|
if not d.is_dir():
|
|
raise HTTPException(status_code=404, detail="Unknown session")
|
|
try:
|
|
final = finalize_rename(d, "recording.svo2")
|
|
except FileNotFoundError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
|
|
async def job() -> None:
|
|
await _measure_job_serial(final, settings)
|
|
|
|
background_tasks.add_task(job)
|
|
return Response(
|
|
status_code=202,
|
|
content='{"status":"accepted","job":"fish_measure"}',
|
|
media_type="application/json",
|
|
)
|
|
|
|
|
|
@router.post("/mp4/session")
|
|
async def create_mp4_session(
|
|
settings: Settings = Depends(get_settings),
|
|
_: None = Depends(require_ingest_auth),
|
|
):
|
|
settings.stream_tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
sid, d = new_session_dir(settings.stream_tmp_dir)
|
|
return {"session_id": sid, "upload_path": str(d)}
|
|
|
|
|
|
@router.put("/mp4/session/{session_id}")
|
|
async def append_mp4_chunk(
|
|
session_id: str,
|
|
request: Request,
|
|
offset: int = 0,
|
|
settings: Settings = Depends(get_settings),
|
|
_: None = Depends(require_ingest_auth),
|
|
):
|
|
d = settings.stream_tmp_dir / session_id
|
|
if not d.is_dir():
|
|
raise HTTPException(status_code=404, detail="Unknown session")
|
|
body = await request.body()
|
|
try:
|
|
write_chunk(d, body, offset)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
p = partial_path(d)
|
|
return {"bytes_total": p.stat().st_size}
|
|
|
|
|
|
@router.post("/mp4/session/{session_id}/finalize")
|
|
async def finalize_mp4(
|
|
session_id: str,
|
|
background_tasks: BackgroundTasks,
|
|
settings: Settings = Depends(get_settings),
|
|
_: None = Depends(require_ingest_auth),
|
|
):
|
|
d = settings.stream_tmp_dir / session_id
|
|
if not d.is_dir():
|
|
raise HTTPException(status_code=404, detail="Unknown session")
|
|
try:
|
|
final = finalize_rename(d, "clip.mp4")
|
|
except FileNotFoundError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
|
|
async def job() -> None:
|
|
await _action_job_serial(final, settings)
|
|
|
|
background_tasks.add_task(job)
|
|
return Response(
|
|
status_code=202,
|
|
content='{"status":"accepted","job":"fish_action"}',
|
|
media_type="application/json",
|
|
)
|