"""托管预览视频:Linux 上 mimetypes 常无法识别 .mp4,Starlette 会退回 text/plain,导致浏览器表现异常。 同时提供 Range 请求支持,使视频可以在浏览器中流式播放、跳转进度。 """ from __future__ import annotations import os from mimetypes import guess_type from pathlib import Path from typing import Union from starlette.datastructures import Headers from starlette.responses import FileResponse, Response, StreamingResponse from starlette.staticfiles import NotModifiedResponse, StaticFiles from starlette.types import Scope PathLike = Union[str, "os.PathLike[str]"] def _media_type_for_file(path: PathLike) -> str: ext = Path(path).suffix.lower() if ext in (".mp4", ".m4v"): return "video/mp4" if ext == ".webm": return "video/webm" if ext == ".mkv": return "video/x-matroska" guessed = guess_type(str(path))[0] if guessed: return guessed return "application/octet-stream" def _parse_range_header(range_header: str, file_size: int) -> list[tuple[int, int]]: """解析 HTTP Range 头,返回 (start, end) 列表。""" if not range_header.startswith("bytes="): return [] ranges = [] for part in range_header[len("bytes="):].split(","): part = part.strip() if part.startswith("-"): # 后缀范围:最后 N 字节 try: suffix = int(part[1:]) if suffix > 0: start = max(0, file_size - suffix) ranges.append((start, file_size - 1)) except ValueError: continue else: if "-" in part: start_str, end_str = part.split("-", 1) try: start = int(start_str) if start_str else 0 except ValueError: continue try: end = int(end_str) if end_str else file_size - 1 except ValueError: continue if start > end: continue end = min(end, file_size - 1) ranges.append((start, end)) return ranges class _RangeFileResponse(Response): """支持 HTTP Range 请求的文件响应,用于视频流播放。""" def __init__( self, path: PathLike, media_type: str | None = None, stat_result: os.stat_result | None = None, headers: dict | None = None, range_header: str | None = None, ): self.path = path self.stat_result = stat_result or os.stat(path) self.file_size = self.stat_result.st_size self.range_header = range_header headers = headers or {} headers.setdefault("accept-ranges", "bytes") headers.setdefault("last-modified", str(self.stat_result.st_mtime)) # 解析 Range 请求 ranges = [] if range_header: ranges = _parse_range_header(range_header, self.file_size) if len(ranges) == 1: # 单范围请求 start, end = ranges[0] self.status_code = 206 self.start = start self.end = end headers["content-length"] = str(end - start + 1) headers["content-range"] = f"bytes {start}-{end}/{self.file_size}" elif len(ranges) > 1: # 多范围请求太复杂,返回整个文件 self.status_code = 200 self.start = 0 self.end = self.file_size - 1 headers["content-length"] = str(self.file_size) else: # 无 Range 请求,返回整个文件 self.status_code = 200 self.start = 0 self.end = self.file_size - 1 headers["content-length"] = str(self.file_size) super().__init__( content=None, status_code=self.status_code, headers=headers, media_type=media_type, ) async def __call__(self, scope: Scope, receive, send): if self.status_code == 416: await send({"type": "http.response.start", "status": 416, "headers": self.raw_headers}) await send({"type": "http.response.body", "body": b""}) return await send({"type": "http.response.start", "status": self.status_code, "headers": self.raw_headers}) chunk_size = 64 * 1024 # 64KB chunks async def file_stream(): with open(self.path, "rb") as f: f.seek(self.start) remaining = self.end - self.start + 1 while remaining > 0: to_read = min(chunk_size, remaining) chunk = f.read(to_read) if not chunk: break yield chunk remaining -= len(chunk) async for chunk in file_stream(): await send({"type": "http.response.body", "body": chunk, "more_body": True}) await send({"type": "http.response.body", "body": b"", "more_body": False}) class MediaStaticFiles(StaticFiles): """与 StaticFiles 相同,但对常见视频后缀显式设置 Content-Type,并支持 Range 请求。""" def file_response( self, full_path: PathLike, stat_result: os.stat_result, scope: Scope, status_code: int = 200, ) -> Response: request_headers = Headers(scope=scope) range_header = request_headers.get("range") media_type = _media_type_for_file(full_path) # 视频文件支持 Range 请求 if media_type.startswith("video/") and range_header: response = _RangeFileResponse( full_path, media_type=media_type, stat_result=stat_result, range_header=range_header, ) else: response = FileResponse( full_path, status_code=status_code, stat_result=stat_result, media_type=media_type, ) if self.is_not_modified(response.headers, request_headers): return NotModifiedResponse(response.headers) return response