"""Serve preview videos with explicit MIME types and HTTP Range support.

On Linux, ``mimetypes`` often fails to recognise ``.mp4``; Starlette then falls
back to ``text/plain`` and browsers misbehave.  This module sets the
``Content-Type`` of common video suffixes explicitly and answers ``Range``
requests so that videos can be streamed and seeked in the browser.
"""

from __future__ import annotations

import os
from email.utils import formatdate
from mimetypes import guess_type
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

from starlette.datastructures import Headers
from starlette.responses import FileResponse, Response, StreamingResponse
from starlette.staticfiles import NotModifiedResponse, StaticFiles
from starlette.types import Receive, Scope, Send

PathLike = Union[str, "os.PathLike[str]"]

# Suffixes whose media type is forced instead of being left to ``mimetypes``.
_VIDEO_MEDIA_TYPES: Dict[str, str] = {
    ".mp4": "video/mp4",
    ".m4v": "video/mp4",
    ".webm": "video/webm",
    ".mkv": "video/x-matroska",
}

# Number of bytes read from disk and sent per ASGI body message.
_CHUNK_SIZE = 64 * 1024

_RANGE_UNIT_PREFIX = "bytes="


def _media_type_for_file(path: PathLike) -> str:
    """Return the media type to advertise for *path*.

    Known video suffixes map to a fixed type; everything else goes through
    ``mimetypes.guess_type`` and falls back to ``application/octet-stream``.
    """
    forced = _VIDEO_MEDIA_TYPES.get(Path(path).suffix.lower())
    if forced is not None:
        return forced
    return guess_type(str(path))[0] or "application/octet-stream"


def _scan_range_header(
    range_header: str, file_size: int
) -> Tuple[List[Tuple[int, int]], bool]:
    """Parse a ``Range`` header value against a file of *file_size* bytes.

    Returns:
        ``(ranges, unsatisfiable)`` where ``ranges`` holds inclusive
        ``(start, end)`` byte offsets that lie inside the file, and
        ``unsatisfiable`` is True when at least one well-formed range spec
        could not be satisfied (it starts at or past the end of the file, or
        is a zero-length suffix).  Malformed specs are skipped silently.
    """
    if not range_header.startswith(_RANGE_UNIT_PREFIX):
        return [], False

    ranges: List[Tuple[int, int]] = []
    unsatisfiable = False
    last_byte = file_size - 1

    for raw_spec in range_header[len(_RANGE_UNIT_PREFIX):].split(","):
        first, dash, last = raw_spec.strip().partition("-")
        if not dash:
            continue  # no "-" at all: malformed

        if not first:
            # Suffix form "-N": the final N bytes of the file.
            try:
                suffix_length = int(last)
            except ValueError:
                continue
            if suffix_length < 0:
                continue  # e.g. "--5": malformed
            if suffix_length == 0 or file_size == 0:
                unsatisfiable = True
                continue
            ranges.append((max(0, file_size - suffix_length), last_byte))
            continue

        # "start-" or "start-end" form.
        try:
            start = int(first)
            end = int(last) if last else None
        except ValueError:
            continue
        if start < 0 or (end is not None and end < start):
            continue  # malformed
        if start >= file_size:
            # Without this check the range would be clamped to an end before
            # its start and produce a negative Content-Length.
            unsatisfiable = True
            continue
        ranges.append((start, last_byte if end is None else min(end, last_byte)))

    return ranges, unsatisfiable


def _parse_range_header(range_header: str, file_size: int) -> List[Tuple[int, int]]:
    """Parse an HTTP ``Range`` header into a list of inclusive (start, end) pairs.

    Only ranges that can actually be served from a file of *file_size* bytes
    are returned; malformed and unsatisfiable specs are dropped.
    """
    return _scan_range_header(range_header, file_size)[0]


class _RangeFileResponse(Response):
    """File response that honours HTTP Range requests, for video streaming.

    Status selection:
      * exactly one satisfiable range -> 206 with ``Content-Range``;
      * no satisfiable range but at least one well-formed one -> 416;
      * otherwise (no/malformed header, or several ranges) -> 200 with the
        whole file.  Multipart range responses are deliberately not
        implemented.
    """

    def __init__(
        self,
        path: PathLike,
        media_type: Optional[str] = None,
        stat_result: Optional[os.stat_result] = None,
        headers: Optional[Dict] = None,
        range_header: Optional[str] = None,
    ):
        self.path = path
        self.stat_result = stat_result or os.stat(path)
        self.file_size = self.stat_result.st_size
        self.range_header = range_header

        # Copy so the caller's mapping is never mutated.
        headers = dict(headers or {})
        headers.setdefault("accept-ranges", "bytes")
        # Last-Modified must be an HTTP-date; a raw float timestamp cannot be
        # parsed back when evaluating If-Modified-Since.
        headers.setdefault(
            "last-modified", formatdate(self.stat_result.st_mtime, usegmt=True)
        )

        ranges: List[Tuple[int, int]] = []
        unsatisfiable = False
        if range_header:
            ranges, unsatisfiable = _scan_range_header(range_header, self.file_size)

        if len(ranges) == 1:
            self.start, self.end = ranges[0]
            status_code = 206
            headers["content-range"] = (
                f"bytes {self.start}-{self.end}/{self.file_size}"
            )
        elif not ranges and unsatisfiable:
            # Empty window: start=0, end=-1 gives a Content-Length of 0.
            self.start, self.end = 0, -1
            status_code = 416
            headers["content-range"] = f"bytes */{self.file_size}"
        else:
            self.start, self.end = 0, self.file_size - 1
            status_code = 200
        headers["content-length"] = str(self.end - self.start + 1)

        super().__init__(
            content=None,
            status_code=status_code,
            headers=headers,
            media_type=media_type,
        )

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Send the response start, then stream the selected byte window."""
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )

        remaining = self.end - self.start + 1
        head_only = str(scope.get("method", "GET")).upper() == "HEAD"
        if head_only or self.status_code == 416 or remaining <= 0:
            # Headers only: HEAD request, unsatisfiable range, or empty file.
            await send({"type": "http.response.body", "body": b"", "more_body": False})
            return

        # NOTE(review): these reads are synchronous and run on the event loop;
        # consider moving them to a worker thread if slow storage is expected.
        with open(self.path, "rb") as stream:
            stream.seek(self.start)
            while remaining > 0:
                chunk = stream.read(min(_CHUNK_SIZE, remaining))
                if not chunk:
                    break  # file is shorter than stat reported
                remaining -= len(chunk)
                await send(
                    {"type": "http.response.body", "body": chunk, "more_body": True}
                )
        await send({"type": "http.response.body", "body": b"", "more_body": False})


class MediaStaticFiles(StaticFiles):
    """StaticFiles variant with explicit video Content-Type and Range support."""

    def file_response(
        self,
        full_path: PathLike,
        stat_result: os.stat_result,
        scope: Scope,
        status_code: int = 200,
    ) -> Response:
        """Build the response for *full_path*.

        Video files requested with a ``Range`` header are served through
        ``_RangeFileResponse``; everything else uses ``FileResponse`` with the
        media type chosen by ``_media_type_for_file``.  A 304 response is
        returned when the request's conditional headers match.
        """
        request_headers = Headers(scope=scope)
        range_header = request_headers.get("range")
        media_type = _media_type_for_file(full_path)

        response: Response
        # Ranges only make sense for a plain 200; any other status requested
        # by the caller is passed through to FileResponse unchanged.
        if range_header and status_code == 200 and media_type.startswith("video/"):
            response = _RangeFileResponse(
                full_path,
                media_type=media_type,
                stat_result=stat_result,
                range_header=range_header,
            )
        else:
            response = FileResponse(
                full_path,
                status_code=status_code,
                stat_result=stat_result,
                media_type=media_type,
            )

        if self.is_not_modified(response.headers, request_headers):
            return NotModifiedResponse(response.headers)
        return response