Files
FishServer/fish_api/app/media_static.py
2026-04-10 18:16:15 +08:00

181 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""托管预览视频Linux 上 mimetypes 常无法识别 .mp4Starlette 会退回 text/plain导致浏览器表现异常。
同时提供 Range 请求支持,使视频可以在浏览器中流式播放、跳转进度。
"""
from __future__ import annotations

import os
from email.utils import formatdate
from mimetypes import guess_type
from pathlib import Path
from typing import Union

from starlette.datastructures import Headers
from starlette.responses import FileResponse, Response, StreamingResponse
from starlette.staticfiles import NotModifiedResponse, StaticFiles
from starlette.types import Scope

# Type alias used in the annotations of this module for filesystem paths:
# either a plain string or an ``os.PathLike`` object that yields ``str``.
PathLike = Union[str, "os.PathLike[str]"]
def _media_type_for_file(path: PathLike) -> str:
ext = Path(path).suffix.lower()
if ext in (".mp4", ".m4v"):
return "video/mp4"
if ext == ".webm":
return "video/webm"
if ext == ".mkv":
return "video/x-matroska"
guessed = guess_type(str(path))[0]
if guessed:
return guessed
return "application/octet-stream"
def _parse_range_header(range_header: str, file_size: int) -> list[tuple[int, int]]:
"""解析 HTTP Range 头,返回 (start, end) 列表。"""
if not range_header.startswith("bytes="):
return []
ranges = []
for part in range_header[len("bytes="):].split(","):
part = part.strip()
if part.startswith("-"):
# 后缀范围:最后 N 字节
try:
suffix = int(part[1:])
if suffix > 0:
start = max(0, file_size - suffix)
ranges.append((start, file_size - 1))
except ValueError:
continue
else:
if "-" in part:
start_str, end_str = part.split("-", 1)
try:
start = int(start_str) if start_str else 0
except ValueError:
continue
try:
end = int(end_str) if end_str else file_size - 1
except ValueError:
continue
if start > end:
continue
end = min(end, file_size - 1)
ranges.append((start, end))
return ranges
class _RangeFileResponse(Response):
    """File response that honours single-range HTTP ``Range`` requests.

    Used for video playback so that browsers can stream and seek.

    Status selection:
        * exactly one usable range -> ``206`` with a ``content-range`` header;
        * exactly one range that does not fit inside the file -> ``416`` with
          ``content-range: bytes */<size>`` and an empty body;
        * no usable range, or several ranges -> ``200`` with the whole file
          (multipart range responses are not implemented).

    NOTE: the file is read with ordinary blocking ``open``/``read`` calls
    from inside the coroutine, in 64 KiB chunks.
    """

    def __init__(
        self,
        path: PathLike,
        media_type: str | None = None,
        stat_result: os.stat_result | None = None,
        headers: dict | None = None,
        range_header: str | None = None,
    ):
        """Prepare status code, headers and the byte window to send.

        Args:
            path: File to serve.
            media_type: Value for the ``content-type`` header.
            stat_result: Pre-computed ``os.stat`` result; ``os.stat(path)``
                is called when omitted.
            headers: Extra response headers. The mapping is copied, never
                mutated.
            range_header: Raw value of the request's ``Range`` header, if any.
        """
        self.path = path
        self.stat_result = os.stat(path) if stat_result is None else stat_result
        self.file_size = self.stat_result.st_size
        self.range_header = range_header

        # Work on a copy so the caller's dict is left untouched.
        headers = dict(headers or {})
        headers.setdefault("accept-ranges", "bytes")
        # Last-Modified must be an HTTP-date, not a raw float timestamp,
        # otherwise conditional-request handling cannot parse it.
        headers.setdefault(
            "last-modified", formatdate(self.stat_result.st_mtime, usegmt=True)
        )

        ranges = _parse_range_header(range_header, self.file_size) if range_header else []

        # Default: serve the whole file (no Range, unusable Range, or
        # multi-range request).
        status_code = 200
        self.start = 0
        self.end = self.file_size - 1

        if len(ranges) == 1:
            start, end = ranges[0]
            if 0 <= start <= end < self.file_size:
                status_code = 206
                self.start, self.end = start, end
                headers["content-range"] = f"bytes {start}-{end}/{self.file_size}"
            else:
                # Range does not fit inside the file: answer 416 with an
                # empty body instead of emitting a negative content-length.
                status_code = 416
                self.start, self.end = 0, -1
                headers["content-range"] = f"bytes */{self.file_size}"

        headers["content-length"] = str(self.end - self.start + 1)

        super().__init__(
            content=None,
            status_code=status_code,
            headers=headers,
            media_type=media_type,
        )

    async def __call__(self, scope: Scope, receive, send):
        """Send the response start followed by the selected byte window."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )

        # HEAD responses and 416 errors carry headers only.
        head_only = scope.get("method", "GET").upper() == "HEAD"
        if head_only or self.status_code == 416:
            await send({"type": "http.response.body", "body": b""})
            return

        chunk_size = 64 * 1024  # 64KB chunks
        remaining = self.end - self.start + 1
        # The ``with`` block lives directly in this coroutine so the file is
        # closed even when ``send`` raises part-way through.
        with open(self.path, "rb") as f:
            f.seek(self.start)
            while remaining > 0:
                chunk = f.read(min(chunk_size, remaining))
                if not chunk:
                    # File ended before the expected number of bytes.
                    break
                remaining -= len(chunk)
                await send(
                    {"type": "http.response.body", "body": chunk, "more_body": True}
                )
        await send({"type": "http.response.body", "body": b"", "more_body": False})
class MediaStaticFiles(StaticFiles):
    """``StaticFiles`` variant with explicit video Content-Types and Range support.

    Behaves like ``StaticFiles`` except that the Content-Type is chosen by
    ``_media_type_for_file`` and that Range requests for video files are
    answered by ``_RangeFileResponse``.
    """

    def file_response(
        self,
        full_path: PathLike,
        stat_result: os.stat_result,
        scope: Scope,
        status_code: int = 200,
    ) -> Response:
        """Build the response for a file that has already been located.

        Args:
            full_path: Path of the file to serve.
            stat_result: ``os.stat`` result for ``full_path``.
            scope: ASGI scope of the request; its headers are inspected.
            status_code: Status to use for a plain (non-Range) response.

        Returns:
            A ``NotModifiedResponse`` when the request's conditional headers
            match, otherwise the file response itself.
        """
        request_headers = Headers(scope=scope)
        range_header = request_headers.get("range")
        media_type = _media_type_for_file(full_path)

        # Range handling only applies to an ordinary 200 response for a video
        # file. For any other requested status the Range header is ignored so
        # that ``status_code`` is not silently replaced by 200/206.
        serve_range = (
            bool(range_header)
            and status_code == 200
            and media_type.startswith("video/")
        )
        if serve_range:
            response: Response = _RangeFileResponse(
                full_path,
                media_type=media_type,
                stat_result=stat_result,
                range_header=range_header,
            )
        else:
            response = FileResponse(
                full_path,
                status_code=status_code,
                stat_result=stat_result,
                media_type=media_type,
            )

        if self.is_not_modified(response.headers, request_headers):
            return NotModifiedResponse(response.headers)
        return response