1543 lines
57 KiB
Python
1543 lines
57 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import math
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import numpy as np
|
||
|
||
from app.logging_config import format_json_pretty
|
||
from app.settings import Settings
|
||
from app.state import MeasureSnapshot
|
||
from app.subprocess_run import run_subprocess_with_log
|
||
from loguru import logger
|
||
|
||
|
||
def _py_exe(settings: Settings) -> str:
|
||
return settings.python_fish_measure or sys.executable
|
||
|
||
|
||
def _predict_weigth_from_svo2_extra_args(settings: Settings) -> List[str]:
|
||
"""构建 predict_weigth_from_svo2.py 的额外参数。
|
||
|
||
所有控制参数统一从 .env / Settings 读取,保持单一配置源。
|
||
"""
|
||
out: List[str] = [
|
||
"--conf",
|
||
str(settings.measure_yolo_conf),
|
||
]
|
||
# 点云过滤参数(默认与 FishMeasure run_predict_from_svo2_fish9.sh 对齐)
|
||
if settings.measure_filter_pointcloud:
|
||
out.append("--filter-pointcloud")
|
||
if settings.measure_use_density_filter:
|
||
out.append("--use-density-filter")
|
||
if settings.measure_use_pointcloud_classifier:
|
||
cls_path = settings.measure_pointcloud_classifier
|
||
if cls_path and Path(cls_path).is_file():
|
||
out.append("--use-pointcloud-classifier")
|
||
out.extend(["--pointcloud-classifier", str(Path(cls_path).resolve())])
|
||
out.extend(
|
||
[
|
||
"--pointcloud-classifier-threshold",
|
||
str(settings.measure_pointcloud_classifier_threshold),
|
||
]
|
||
)
|
||
else:
|
||
logger.warning(
|
||
"[FishMeasure] pointcloud classifier enabled but model not found: {}",
|
||
cls_path,
|
||
)
|
||
if settings.measure_use_flatness_filter:
|
||
out.append("--use-flatness-filter")
|
||
out.extend(["--flatness-threshold", str(settings.measure_flatness_threshold)])
|
||
if settings.predict_show_large_labels_at_top_right:
|
||
out.append("--show-large-labels-at-top-right")
|
||
# PLY 复用控制:由 .env 中的 MEASURE_REUSE_EXISTING_CLOUDS 决定
|
||
if not settings.measure_reuse_existing_clouds:
|
||
out.append("--no-reuse-existing-clouds")
|
||
|
||
# 体重聚合规则
|
||
out.extend(["--weight-top-k", str(settings.measure_weight_top_k)])
|
||
if settings.measure_weight_top_by_length:
|
||
out.append("--weight-top-by-length")
|
||
else:
|
||
out.append("--no-weight-top-by-length")
|
||
out.extend(["--weight-length-switch-mm", str(settings.measure_weight_length_switch_mm)])
|
||
out.extend(["--weight-max-length-mm", str(settings.measure_weight_max_length_mm)])
|
||
out.extend(["--weight-min-length-width-ratio", str(settings.measure_weight_min_length_width_ratio)])
|
||
if settings.measure_weight_average_all_after_filter:
|
||
out.append("--weight-average-all-after-filter")
|
||
else:
|
||
out.append("--no-weight-average-all-after-filter")
|
||
out.extend(["--weight-average-all-fallback-max-if-mean-over-g", str(settings.measure_weight_avg_all_fallback_max_g)])
|
||
out.extend(["--weight-mean-pool-fallback-max-if-over-g", str(settings.measure_weight_mean_pool_fallback_max_g)])
|
||
if settings.measure_weight_remove_outliers:
|
||
out.append("--weight-remove-outliers")
|
||
out.extend(["--weight-outlier-method", settings.measure_weight_outlier_method])
|
||
|
||
return out
|
||
|
||
|
||
def _resolve_output_root(settings: Settings, output_root: Optional[Path] = None) -> Path:
|
||
"""``output_root`` 优先;否则 ``settings.measure_output_root``。"""
|
||
root = output_root if output_root is not None else settings.measure_output_root
|
||
root.mkdir(parents=True, exist_ok=True)
|
||
return root
|
||
|
||
|
||
def run_measure_subprocess(
    svo_path: Path, settings: Settings, *, output_root: Optional[Path] = None
) -> None:
    """Run the FishMeasure prediction script on a single SVO file.

    Args:
        svo_path: SVO recording to process; passed to the script as an
            absolute path via ``--svo``.
        settings: Supplies the FishMeasure root, the interpreter, the SAM
            device and the extra CLI switches.
        output_root: Optional output directory; falls back to
            ``settings.measure_output_root``.

    Raises:
        FileNotFoundError: ``predict_weigth_from_svo2.py`` does not exist
            under ``settings.fish_measure_root``.
        RuntimeError: The subprocess returned a non-zero exit status.  The
            message contains the last 4000 characters of its captured stdout.
    """
    measure_root = settings.fish_measure_root
    script = measure_root / "predict_weigth_from_svo2.py"
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishMeasure script: {script}")

    out_dir = _resolve_output_root(settings, output_root)

    argv = [
        _py_exe(settings),
        str(script),
        "--svo",
        str(svo_path.resolve()),
        "--save-output",
        str(out_dir.resolve()),
        "--sam-device",
        settings.sam_device,
        *_predict_weigth_from_svo2_extra_args(settings),
    ]

    completed = run_subprocess_with_log(
        argv,
        cwd=str(measure_root),
        env=os.environ.copy(),
        log_name="FishMeasure",
        stream_to_logger=False,
    )
    if completed.returncode == 0:
        return

    captured = completed.stdout or ""
    raise RuntimeError(
        f"predict_weigth_from_svo2.py failed ({completed.returncode}): {captured[-4000:]}"
    )
|
||
|
||
|
||
def run_measure_batch_subprocess(
    svo_paths: List[Path], settings: Settings, *, output_root: Optional[Path] = None
) -> None:
    """Run FishMeasure over several SVO files, using batch mode when possible.

    Behaviour by case:

    * one path: delegates to ``run_measure_subprocess``;
    * paths spread over several directories: logs a warning and processes
      each file individually;
    * otherwise: invokes the script once with ``--batch-svo-folder`` pointing
      at the common parent directory.

    The original notes describe batch mode as following the same logic as
    ``test_dgcnn.sh --batch-root`` / ``run_predict_from_svo2_fish*.sh``:
    generate point clouds for all SVOs, then predict weight on the merged set
    (top-K applied to the merged clouds).

    NOTE(review): only the folder is passed on the command line, not the
    individual paths; presumably the script picks up every SVO in that
    folder. Confirm when ``svo_paths`` is a subset of the folder.

    Args:
        svo_paths: SVO recordings to process.
        settings: Application settings.
        output_root: Output directory, intended to be per-fish (for example
            ``measure_output_root/fish1``).  When omitted,
            ``settings.measure_output_root`` is used; the original notes
            discourage that because all fish would share it.

    Raises:
        ValueError: ``svo_paths`` is empty.
        FileNotFoundError: The FishMeasure script is missing.
        RuntimeError: The batch subprocess returned a non-zero exit status.
    """
    if not svo_paths:
        raise ValueError("No SVO files provided for batch processing")

    measure_root = settings.fish_measure_root
    script = measure_root / "predict_weigth_from_svo2.py"
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishMeasure script: {script}")

    out_dir = _resolve_output_root(settings, output_root)

    if len(svo_paths) == 1:
        return run_measure_subprocess(svo_paths[0], settings, output_root=out_dir)

    folders = {path.parent for path in svo_paths}
    if len(folders) > 1:
        # Batch mode takes a single folder, so mixed locations are handled
        # one file at a time.
        logger.warning(
            "[FishMeasure] SVO files are in different directories, falling back to individual processing"
        )
        for path in svo_paths:
            run_measure_subprocess(path, settings, output_root=out_dir)
        return

    batch_folder = svo_paths[0].parent

    argv = [
        _py_exe(settings),
        str(script),
        "--batch-svo-folder",
        str(batch_folder.resolve()),
        "--save-output",
        str(out_dir.resolve()),
        "--sam-device",
        settings.sam_device,
        *_predict_weigth_from_svo2_extra_args(settings),
    ]

    logger.info(
        "[FishMeasure] Batch processing {} SVOs from {} -> {}: {}",
        len(svo_paths),
        batch_folder.name,
        out_dir,
        ", ".join(path.name for path in svo_paths),
    )

    completed = run_subprocess_with_log(
        argv,
        cwd=str(measure_root),
        env=os.environ.copy(),
        log_name="FishMeasure",
        stream_to_logger=False,
    )
    if completed.returncode == 0:
        return

    captured = completed.stdout or ""
    raise RuntimeError(
        f"predict_weigth_from_svo2.py batch mode failed ({completed.returncode}): {captured[-4000:]}"
    )
|
||
|
||
|
||
def _summary_entry_matches_svo(item: Dict[str, Any], svo_path: Path) -> bool:
|
||
stem = svo_path.stem
|
||
resolved = str(svo_path.resolve())
|
||
svo_key = item.get("svo")
|
||
if svo_key:
|
||
try:
|
||
if Path(str(svo_key)).resolve() == svo_path.resolve():
|
||
return True
|
||
except OSError:
|
||
pass
|
||
if str(svo_key) == resolved:
|
||
return True
|
||
if item.get("svo_name") == stem:
|
||
return True
|
||
return False
|
||
|
||
|
||
def _load_weight_json(
|
||
svo_path: Path, settings: Settings, *, output_root: Optional[Path] = None
|
||
) -> Dict[str, Any]:
|
||
"""读取 FishMeasure 合并结果。优先 per-SVO 的 weight_prediction.json;否则从 weight_predictions_summary.json 取匹配项(predict 脚本在权重步失败时仍 exit 0 只写 summary)。"""
|
||
stem = svo_path.stem
|
||
root = output_root if output_root is not None else settings.measure_output_root
|
||
candidate = root / stem / "weight_prediction.json"
|
||
if candidate.is_file():
|
||
with open(candidate, encoding="utf-8") as f:
|
||
return json.load(f)
|
||
|
||
summary_path = root / "weight_predictions_summary.json"
|
||
if summary_path.is_file():
|
||
with open(summary_path, encoding="utf-8") as f:
|
||
summary_list: Any = json.load(f)
|
||
if isinstance(summary_list, list):
|
||
for item in reversed(summary_list):
|
||
if not isinstance(item, dict):
|
||
continue
|
||
if not _summary_entry_matches_svo(item, svo_path):
|
||
continue
|
||
err = item.get("error")
|
||
if err:
|
||
raise RuntimeError(
|
||
f"FishMeasure 权重步骤失败({svo_path.name}): {err}"
|
||
)
|
||
if item.get("per_cloud") or item.get("per_file") or item.get(
|
||
"dgcnn_summary"
|
||
):
|
||
return item
|
||
break
|
||
|
||
combined_path = root / "weight_prediction.json"
|
||
if combined_path.is_file():
|
||
with open(combined_path, encoding="utf-8") as f:
|
||
combined: Any = json.load(f)
|
||
if isinstance(combined, dict) and combined.get("combined"):
|
||
names = combined.get("svo_names") or []
|
||
if stem in names:
|
||
return combined
|
||
|
||
raise FileNotFoundError(
|
||
f"未找到测量结果 JSON:{candidate}(且 summary 中无本条 SVO 的成功记录)"
|
||
)
|
||
|
||
|
||
_TID_RE = re.compile(r"_tid(\d+)")
|
||
|
||
|
||
def _parse_tid_from_ply_name(name: str) -> Optional[int]:
|
||
"""与 FishMeasure/fish_video_weight_evaluation._parse_tid_from_ply_name 一致。"""
|
||
m = _TID_RE.search(name)
|
||
return int(m.group(1)) if m else None
|
||
|
||
|
||
def _safe_media_prefix(stem: str) -> str:
|
||
s = re.sub(r"[^\w.\-]+", "_", stem, flags=re.UNICODE).strip("._") or "svo"
|
||
return s[:120]
|
||
|
||
|
||
def _derive_id_from_folder_name(folder_name: str) -> str:
|
||
"""从文件夹名称派生 id:去除 'fish' 前缀/后缀(不区分大小写)。"""
|
||
# 去除不区分大小写的 "fish" 前缀或后缀
|
||
s = folder_name.strip()
|
||
s = re.sub(r'^fish[_\-]?', '', s, flags=re.IGNORECASE)
|
||
s = re.sub(r'[_\-]?fish$', '', s, flags=re.IGNORECASE)
|
||
return s or folder_name
|
||
|
||
|
||
def _final_pred_g_for_compare_summary(summary: Dict[str, Any]) -> Optional[float]:
|
||
"""与 ``test_dgcnn_weight_estimator._final_pred_g_for_compare`` 一致(仅读 summary)。
|
||
|
||
优先 ``pred_weight_g``,否则 ``avg_predicted_weight_g`` — 与 batch 日志 ``pred=… g`` 同源。
|
||
"""
|
||
pg = summary.get("pred_weight_g")
|
||
if pg is not None:
|
||
try:
|
||
v = float(pg)
|
||
if math.isfinite(v):
|
||
return v
|
||
except (TypeError, ValueError):
|
||
pass
|
||
ag = summary.get("avg_predicted_weight_g")
|
||
if ag is not None:
|
||
try:
|
||
v = float(ag)
|
||
if math.isfinite(v):
|
||
return v
|
||
except (TypeError, ValueError):
|
||
pass
|
||
return None
|
||
|
||
|
||
def _final_pred_weight_g_from_weight_json(data: Dict[str, Any]) -> Optional[float]:
    """Return the final predicted weight in grams for a weight JSON payload.

    The summary (``dgcnn_summary`` or ``weight_summary``) is consulted first.
    When it yields nothing, the same keys are read from the root of ``data``
    (the original note calls these the merge-JSON copies).  The original note
    says the result equals the ``pred=`` value of the test_dgcnn / batch log.
    """
    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}
    from_summary = _final_pred_g_for_compare_summary(summary)
    if from_summary is not None:
        return from_summary

    root_level: Dict[str, Any] = {
        key: data[key]
        for key in ("pred_weight_g", "avg_predicted_weight_g")
        if key in data
    }
    if not root_level:
        return None
    return _final_pred_g_for_compare_summary(root_level)
|
||
|
||
|
||
def _mean_column_g_for_log_summary(summary: Dict[str, Any]) -> Optional[float]:
|
||
"""与 test_dgcnn_weight_estimator._mean_column_g_for_log 一致(batch 日志第一列 avg)。"""
|
||
v = summary.get("mean_all_pred_g_after_filters")
|
||
if v is not None:
|
||
try:
|
||
f = float(v)
|
||
if math.isfinite(f):
|
||
return f
|
||
except (TypeError, ValueError):
|
||
pass
|
||
ag = summary.get("avg_predicted_weight_g")
|
||
if ag is not None:
|
||
try:
|
||
f = float(ag)
|
||
if math.isfinite(f):
|
||
return f
|
||
except (TypeError, ValueError):
|
||
pass
|
||
return None
|
||
|
||
|
||
def _summary_top_label_for_log(
|
||
summary: Dict[str, Any], top_k: int, fallback_by_length: bool
|
||
) -> str:
|
||
"""与 ``test_dgcnn_weight_estimator._summary_top_label`` 一致。"""
|
||
if summary.get("average_all_after_filter"):
|
||
return "mean of all after filters"
|
||
eff = summary.get("effective_top_by_length")
|
||
if eff is None:
|
||
eff = fallback_by_length
|
||
return f"top{top_k} by length" if eff else f"top{top_k} by pred"
|
||
|
||
|
||
def _build_test_dgcnn_invoke_command_line(data: Dict[str, Any], settings: Settings) -> str:
    """Reconstruct the ``test_dgcnn_weight_estimator.py`` command line as text.

    The original note says the result mirrors the argv printed by
    ``predict_weigth_from_svo2._run_test_dgcnn_weight_estimator_subprocess``.
    Values are read from ``data["dgcnn_meta"]`` first, then from the summary
    (``dgcnn_summary`` / ``weight_summary``), then from the literal defaults
    below.  Arguments are joined with single spaces and are not shell-quoted.
    """
    meta = data.get("dgcnn_meta") or {}
    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}

    estimator = settings.fish_measure_root / "weight_estimator" / "test_dgcnn_weight_estimator.py"
    interpreter = _py_exe(settings)
    checkpoint = str(meta.get("checkpoint", ""))
    device = str(meta.get("device", "cuda"))
    num_points = int(meta.get("num_points", 768))
    xyz_scale = float(meta.get("xyz_scale", 0.001))
    top_k = int(summary.get("top_k") or 5)

    output_json = str(data.get("dgcnn_output_json") or "")
    if output_json:
        recorded = Path(output_json)
        # The merged JSON may record ``dgcnn_test_output_topK[_by_length].json``
        # while the subprocess prints ``dgcnn_test_output.json``.
        if recorded.name.startswith("dgcnn_test_output_top") and recorded.suffix == ".json":
            output_json = str(recorded.parent / "dgcnn_test_output.json")

    labels = str(meta.get("labels_json", ""))
    ply_folder = str(meta.get("ply_folder") or data.get("cloud_dir") or "")
    ply_list_file = str(meta.get("ply_list_file") or data.get("ply_list_file") or "")
    top_by_length = bool(summary.get("top_by_length", True))
    length_switch = float(
        meta.get("length_switch_mm", summary.get("length_switch_threshold_mm", 319.0))
    )
    max_len = float(meta.get("max_length_mm", summary.get("max_length_mm", 400.0)))
    min_lw = float(
        meta.get("min_length_width_ratio", summary.get("min_length_width_ratio", 1.5))
    )
    lq_cv = float(meta.get("length_quality_cv_threshold_pct", 15.0))
    lq_span = float(meta.get("length_quality_max_span_mm", 130.0))

    avg_all_fb = meta.get("average_all_fallback_max_if_mean_over_g")
    if avg_all_fb is None:
        avg_all_fb = summary.get("average_all_fallback_to_max_if_mean_over_g", 400.0)
    mp_fb = meta.get("mean_pool_fallback_max_if_over_g")
    if mp_fb is None:
        mp_fb = summary.get("mean_pool_fallback_to_max_if_over_g", 440.0)

    # With no recorded output JSON, place it next to the PLY list file, else
    # next to the PLY folder, else use the bare file name.
    if not output_json:
        if ply_list_file:
            output_json = str(Path(ply_list_file).parent / "dgcnn_test_output.json")
        elif ply_folder:
            output_json = str(Path(ply_folder).parent / "dgcnn_test_output.json")
        else:
            output_json = "dgcnn_test_output.json"

    argv: List[str] = [
        interpreter,
        str(estimator),
        "--checkpoint",
        checkpoint,
        "--device",
        device,
        "--num-points",
        str(num_points),
        "--xyz-scale",
        str(xyz_scale),
        "--top-k",
        str(top_k),
        "--output-json",
        output_json,
        "--labels-json",
        labels,
    ]

    if ply_list_file:
        argv += ["--ply-list-file", ply_list_file]
    else:
        argv += ["--ply-folder", ply_folder]

    argv.append("--top-by-length" if top_by_length else "--no-top-by-length")
    argv += ["--length-switch-mm", str(length_switch)]

    outliers = summary.get("outlier_removal")
    if isinstance(outliers, dict) and outliers.get("enabled"):
        argv += [
            "--remove-outliers",
            "--outlier-method",
            str(outliers.get("method") or "iqr"),
        ]

    argv += [
        "--max-length-mm",
        str(max_len),
        "--min-length-width-ratio",
        str(min_lw),
        "--length-quality-cv-threshold-pct",
        str(lq_cv),
        "--length-quality-max-span-mm",
        str(lq_span),
    ]

    if bool(meta.get("average_all_after_filter") or summary.get("average_all_after_filter")):
        argv.append("--average-all-after-filter")
    if avg_all_fb is not None and float(avg_all_fb) > 0:
        argv += ["--average-all-fallback-max-if-mean-over-g", str(float(avg_all_fb))]
    if mp_fb is not None and float(mp_fb) > 0:
        argv += ["--mean-pool-fallback-max-if-over-g", str(float(mp_fb))]

    return " ".join(argv)
|
||
|
||
|
||
def format_dgcnn_weight_calculation_log(
    data: Dict[str, Any],
    *,
    settings: Settings,
    svo_display_name: str,
) -> str:
    """Render a merged ``weight_prediction.json`` payload as a text report.

    The original notes say the layout is aligned with the terminal output of
    ``test_dgcnn_weight_estimator.py`` (per-PLY lines, aggregate lines and
    the final prediction).

    Args:
        data: Parsed weight prediction JSON.
        settings: Used to rebuild the estimator command line and to locate
            the summary file when ``data`` has no ``output_dir``.
        svo_display_name: Name shown in the report header.

    Returns:
        The report, with lines joined by ``"\\n"``.
    """

    def _as_float(value: Any) -> float:
        """Convert ``value`` to float; unparsable input becomes NaN."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return float("nan")

    def _threshold_text(value: Any, spec: str) -> str:
        """Format a filter threshold, or ``"?"`` when it is unusable.

        Missing, non-numeric and non-finite thresholds all map to ``"?"`` so
        that one malformed value cannot abort the whole report.
        """
        number = _as_float(value)
        return format(number, spec) if math.isfinite(number) else "?"

    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}
    meta = data.get("dgcnn_meta") or {}

    lines: List[str] = [f"=== Weight prediction: {svo_display_name} ==="]
    if meta.get("checkpoint"):
        cmd = _build_test_dgcnn_invoke_command_line(data, settings)
        lines.append("Invoking test_dgcnn_weight_estimator.py:")
        lines.append(f" {cmd}")

    if summary.get("skipped") or data.get("skipped_weight"):
        lines.append(
            f"Skipped: {summary.get('skip_reason', 'fewer than 5 PLYs')} "
            f"(files={summary.get('num_files', 0)})"
        )
        return "\n".join(lines)

    per_file = data.get("per_cloud") or data.get("per_file") or []
    top_k = int(summary.get("top_k") or 5)
    top_label = _summary_top_label_for_log(
        summary, top_k, bool(summary.get("top_by_length", True))
    )
    lines.append(f"[Kept = {top_label}]")

    xyz_scale = float(meta.get("xyz_scale", 0.001))
    # Lengths are labelled "mm" only for the 0.001 scale, otherwise "units".
    lengths_in_mm = abs(xyz_scale - 0.001) < 1e-12

    # Effective filter thresholds: meta first, summary as fallback.
    max_length_eff = meta.get("max_length_mm")
    if max_length_eff is None:
        max_length_eff = summary.get("max_length_mm")
    min_lw_eff = meta.get("min_length_width_ratio")
    if min_lw_eff is None:
        min_lw_eff = summary.get("min_length_width_ratio")

    n_total = int(summary.get("num_files_predicted", len(per_file)))

    # --- one line per PLY ---------------------------------------------------
    for entry in per_file:
        if not isinstance(entry, dict):
            continue
        ply = Path(str(entry.get("ply", ""))).name
        grams = _as_float(entry.get("predicted_weight_g", float("nan")))
        kilos = _as_float(entry.get("predicted_weight_kg", grams / 1000.0))
        length_input = _as_float(entry.get("length_input", float("nan")))
        used = entry.get("used_for_prediction", True)
        try:
            rank = int(entry.get("rank_by_selection", 0) or 0)
        except (TypeError, ValueError):
            rank = 0

        if entry.get("filtered_by_max_length"):
            cap_s = _threshold_text(max_length_eff, ".0f")
            tag = f" [FILTERED length>{cap_s}mm]"
        elif entry.get("filtered_by_length_width_ratio"):
            thr_s = _threshold_text(min_lw_eff, ".2f")
            tag = f" [FILTERED L/W<{thr_s}]"
        elif rank > 0:
            tag = (
                f" (kept, rank {rank}/{n_total})"
                if used
                else f" (filtered, rank {rank}/{n_total})"
            )
        else:
            tag = " (kept)" if used else " (filtered)"

        if not math.isfinite(length_input):
            length_str = "nan"
        elif lengths_in_mm:
            length_str = f"{length_input:.1f} mm"
        else:
            length_str = f"{length_input:.4f} units"
        lines.append(f"{ply}: len={length_str} | {grams:.2f} g ({kilos:.4f} kg){tag}")

    # --- aggregate lines ----------------------------------------------------
    lines.append(
        f"Files: {summary.get('num_files_predicted', summary.get('num_files', len(per_file)))}"
    )

    n_cap = summary.get("num_files_filtered_by_max_length", 0) or 0
    if n_cap > 0:
        mf = _as_float(max_length_eff)
        if math.isfinite(mf):
            lines.append(
                f"Filtered by length cap (>{mf:.0f} mm): {n_cap} "
                f"(excluded from aggregation)"
            )

    n_lw = summary.get("num_files_filtered_by_length_width_ratio", 0) or 0
    if n_lw > 0:
        mlw = _as_float(min_lw_eff)
        if math.isfinite(mlw):
            lines.append(
                f"Filtered by length/width ratio (<{mlw:.2f}): {n_lw} "
                f"(excluded from aggregation)"
            )

    orm = summary.get("outlier_removal")
    if (
        isinstance(orm, dict)
        and orm.get("enabled")
        and (summary.get("num_outliers_removed", 0) or 0) > 0
    ):
        lines.append(
            f"Outliers removed: {summary['num_outliers_removed']} "
            f"(method={orm.get('method')}, field={orm.get('field')})"
        )

    mcol = _mean_column_g_for_log_summary(summary)
    topk_extra = ""
    topk_g = summary.get("avg_topk_mean_pred_g")
    if topk_g is not None:
        tg = _as_float(topk_g)
        if math.isfinite(tg):
            topk_sel = summary.get("avg_topk_mean_pred_selection") or "pred"
            by = "length" if topk_sel == "by_length" else "pred"
            topk_extra = f" | top{top_k}_avg={tg:.2f} g by {by}"
    if mcol is not None and math.isfinite(mcol):
        lines.append(
            f"Average predicted weight {top_label}: "
            f"{mcol:.2f} g, {mcol / 1000.0:.4f} kg{topk_extra}"
        )

    mx = summary.get("max_predicted_weight_g_after_filter")
    if mx is not None:
        mxv = _as_float(mx)
        if math.isfinite(mxv):
            mk = mxv / 1000.0
            lines.append(
                f"Max predicted weight (after filters): {mxv:.2f} g ({mk:.4f} kg)"
            )

    # --- files on disk ------------------------------------------------------
    out_json = data.get("dgcnn_output_json")
    if out_json:
        p = Path(str(out_json))
        by_len = bool(summary.get("top_by_length", True))
        suffix = f"_top{top_k}_by_length" if by_len else f"_top{top_k}"
        saved = p.parent / f"{p.stem}{suffix}{p.suffix}"
        # Prefer the suffixed variant; fall back to the recorded path.
        if saved.is_file():
            lines.append(f"Saved: {saved}")
        elif p.is_file():
            lines.append(f"Saved: {p}")

    out_dir_raw = data.get("output_dir")
    if out_dir_raw:
        summary_json = Path(str(out_dir_raw)).parent / "weight_predictions_summary.json"
    else:
        summary_json = settings.measure_output_root / "weight_predictions_summary.json"
    if summary_json.is_file():
        lines.append(f"Saved summary: {summary_json}")

    # --- final prediction ---------------------------------------------------
    pred_final = _final_pred_weight_g_from_weight_json(data)
    has_pred_weight = (
        data.get("pred_weight_g") is not None
        or summary.get("pred_weight_g") is not None
    )
    label = "pred_weight" if has_pred_weight else "avg_predicted_weight"
    if pred_final is not None and math.isfinite(pred_final):
        lines.append(
            f"Final predicted weight (test_dgcnn, {label}): {pred_final:.2f} g"
        )

    return "\n".join(lines)
|
||
|
||
|
||
def _write_measure_calculation_debug_file(
|
||
settings: Settings,
|
||
*,
|
||
file_stem: str,
|
||
calculation_log: str,
|
||
svo_path: Optional[Path] = None,
|
||
weight_json_path: Optional[Path] = None,
|
||
fish_id: Optional[str] = None,
|
||
pred_weight_g: Optional[float] = None,
|
||
star_confident: Optional[bool] = None,
|
||
length_mm: Optional[float] = None,
|
||
weight_data: Optional[Dict[str, Any]] = None,
|
||
extra_header_lines: Optional[List[str]] = None,
|
||
) -> None:
|
||
"""将体重推算过程写入 ``settings.measure_debug_log_dir``,便于离线对照终端输出。"""
|
||
if not settings.measure_debug_log_write:
|
||
return
|
||
root = settings.measure_debug_log_dir
|
||
try:
|
||
root.mkdir(parents=True, exist_ok=True)
|
||
except OSError as e:
|
||
logger.warning("[FishMeasure] debug log dir unavailable {}: {}", root, e)
|
||
return
|
||
safe = _safe_media_prefix(file_stem)[:96]
|
||
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||
path = root / f"{ts}_{safe}_weight_calc.txt"
|
||
hdr: List[str] = [
|
||
"# fish_api DGCNN weight calculation debug",
|
||
f"# written_at_utc: {datetime.now(timezone.utc).isoformat()}",
|
||
]
|
||
if fish_id is not None and str(fish_id).strip() != "":
|
||
hdr.append(f"# fish_id: {fish_id}")
|
||
if svo_path is not None:
|
||
try:
|
||
hdr.append(f"# svo: {svo_path.resolve()}")
|
||
except OSError:
|
||
hdr.append(f"# svo: {svo_path}")
|
||
try:
|
||
hdr.append(f"# svo_parent_folder: {svo_path.parent.name}")
|
||
except OSError:
|
||
pass
|
||
if weight_json_path is not None:
|
||
hdr.append(f"# weight_prediction_json: {weight_json_path}")
|
||
if pred_weight_g is not None and math.isfinite(float(pred_weight_g)):
|
||
hdr.append(f"# pred_weight_g: {float(pred_weight_g):.4f}")
|
||
if star_confident is not None:
|
||
hdr.append(f"# star_confident: {bool(star_confident)}")
|
||
if length_mm is not None and math.isfinite(float(length_mm)):
|
||
hdr.append(f"# length_mm: {float(length_mm):.2f}")
|
||
wd = weight_data or {}
|
||
for key in ("output_dir", "cloud_dir", "dgcnn_output_json"):
|
||
v = wd.get(key)
|
||
if v:
|
||
hdr.append(f"# {key}: {v}")
|
||
sk = wd.get("skipped_weight")
|
||
if sk is not None:
|
||
hdr.append(f"# skipped_weight: {sk}")
|
||
for key in ("num_ply_predicted", "num_clouds_used", "num_ply_on_disk"):
|
||
v = wd.get(key)
|
||
if v is not None:
|
||
hdr.append(f"# {key}: {v}")
|
||
summ = wd.get("dgcnn_summary") or wd.get("weight_summary") or {}
|
||
if isinstance(summ, dict):
|
||
pr = summ.get("pred_weight_rule")
|
||
if pr:
|
||
hdr.append(f"# pred_weight_rule: {pr}")
|
||
meta = wd.get("dgcnn_meta") or {}
|
||
if isinstance(meta, dict) and meta.get("checkpoint"):
|
||
hdr.append(f"# dgcnn_checkpoint: {meta['checkpoint']}")
|
||
if extra_header_lines:
|
||
hdr.extend(extra_header_lines)
|
||
hdr.append("# ---")
|
||
body = "\n".join(hdr) + "\n" + calculation_log + "\n"
|
||
try:
|
||
path.write_text(body, encoding="utf-8")
|
||
except OSError as e:
|
||
logger.warning("[FishMeasure] cannot write debug log {}: {}", path, e)
|
||
return
|
||
fid = fish_id if fish_id is not None and str(fish_id).strip() != "" else "?"
|
||
logger.info(
|
||
"[FishMeasure] fish_id={} calculation debug log file: {}",
|
||
fid,
|
||
path,
|
||
)
|
||
|
||
|
||
def _star_confident_like_test_dgcnn(summary: Dict[str, Any]) -> bool:
    """Decide whether the result earns the ``*`` confidence mark.

    True when the mean predicted weight exceeds 440.0 g, or when
    ``fraction_in_near_max_length_band`` is a finite number of at least 0.25.
    The original note says this matches
    ``test_dgcnn_weight_estimator._batch_topk_max_stable_star``.
    """
    mean_g = _mean_column_g_for_log_summary(summary)
    if mean_g is not None and mean_g > 440.0:
        return True

    raw_fraction = summary.get("fraction_in_near_max_length_band")
    if raw_fraction is None:
        return False
    try:
        fraction = float(raw_fraction)
    except (TypeError, ValueError):
        return False
    return math.isfinite(fraction) and fraction >= 0.25
|
||
|
||
|
||
def _length_mm_from_dgcnn_summary(
|
||
summary: Dict[str, Any], data: Dict[str, Any]
|
||
) -> Optional[float]:
|
||
"""体长:与 fish_video / batch 展示一致,优先 avg_length_input_topk。"""
|
||
for key in ("avg_length_input_topk", "avg_length_input"):
|
||
v = summary.get(key)
|
||
if v is not None:
|
||
try:
|
||
f = float(v)
|
||
if math.isfinite(f):
|
||
return f
|
||
except (TypeError, ValueError):
|
||
pass
|
||
v = data.get("avg_length_input")
|
||
if v is not None:
|
||
try:
|
||
f = float(v)
|
||
if math.isfinite(f):
|
||
return f
|
||
except (TypeError, ValueError):
|
||
pass
|
||
return None
|
||
|
||
|
||
def _length_mm_for_debug(
|
||
result: List[Dict[str, Any]],
|
||
summary: Dict[str, Any],
|
||
data: Dict[str, Any],
|
||
) -> Optional[float]:
|
||
"""调试头用体长:优先 API result 中的 length,否则与 summary 一致。"""
|
||
if result:
|
||
raw = result[0].get("length")
|
||
if raw is not None and str(raw).strip() != "":
|
||
try:
|
||
v = float(raw)
|
||
if math.isfinite(v):
|
||
return v
|
||
except (TypeError, ValueError):
|
||
pass
|
||
return _length_mm_from_dgcnn_summary(summary, data)
|
||
|
||
|
||
def _fallback_mean_length_mm_from_per_file(data: Dict[str, Any]) -> Optional[float]:
|
||
"""summary 无体长时,对 used_for_prediction 的 PLY 取 length_input 均值(仅作回退)。"""
|
||
items = data.get("per_cloud") or data.get("per_file") or []
|
||
if not isinstance(items, list):
|
||
return None
|
||
lengths: List[float] = []
|
||
for it in items:
|
||
if not isinstance(it, dict):
|
||
continue
|
||
if not it.get("used_for_prediction", True):
|
||
continue
|
||
try:
|
||
ln = float(it.get("length_input", float("nan")))
|
||
except (TypeError, ValueError):
|
||
continue
|
||
if math.isfinite(ln):
|
||
lengths.append(ln)
|
||
if not lengths:
|
||
return None
|
||
return float(np.mean(lengths))
|
||
|
||
|
||
def _result_from_weight_prediction_legacy_tid(
|
||
data: Dict[str, Any], folder_id: Optional[str]
|
||
) -> Tuple[List[Dict[str, Any]], Optional[float], bool]:
|
||
"""仅在 summary 无法给出最终 pred 时使用:按 tid 分组(旧逻辑)。"""
|
||
items = data.get("per_cloud") or data.get("per_file") or []
|
||
if not isinstance(items, list):
|
||
return [], None, False
|
||
|
||
track_predictions: Dict[int, List[Tuple[float, float]]] = {}
|
||
for it in items:
|
||
if not isinstance(it, dict):
|
||
continue
|
||
ply = it.get("ply")
|
||
if not ply:
|
||
continue
|
||
tid = _parse_tid_from_ply_name(Path(str(ply)).name)
|
||
if tid is None:
|
||
continue
|
||
if not it.get("used_for_prediction", True):
|
||
continue
|
||
try:
|
||
wg = float(it.get("predicted_weight_g", float("nan")))
|
||
except (TypeError, ValueError):
|
||
continue
|
||
if not math.isfinite(wg):
|
||
continue
|
||
try:
|
||
ln = float(it.get("length_input", float("nan")))
|
||
except (TypeError, ValueError):
|
||
ln = float("nan")
|
||
track_predictions.setdefault(tid, []).append((wg, ln))
|
||
|
||
summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}
|
||
pred_weight = _final_pred_weight_g_from_weight_json(data)
|
||
out: List[Dict[str, Any]] = []
|
||
TOP_K = 5
|
||
total_valid_clouds = 0
|
||
|
||
for tid in sorted(track_predictions.keys()):
|
||
predictions = track_predictions[tid]
|
||
if not predictions:
|
||
continue
|
||
total_valid_clouds += len(predictions)
|
||
predictions_sorted = sorted(predictions, key=lambda x: x[0], reverse=True)
|
||
top5 = predictions_sorted[: min(TOP_K, len(predictions_sorted))]
|
||
avg_weight = float(np.mean([p[0] for p in top5]))
|
||
valid_lengths = [p[1] for p in top5 if math.isfinite(p[1])]
|
||
avg_length = float(np.mean(valid_lengths)) if valid_lengths else float("nan")
|
||
if not math.isfinite(avg_length):
|
||
continue
|
||
result_id = folder_id if folder_id else str(tid)
|
||
out.append({
|
||
"id": result_id,
|
||
"type": "大黄鱼",
|
||
"weight": str(round(avg_weight)),
|
||
"length": str(round(avg_length)),
|
||
"date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
|
||
})
|
||
|
||
if pred_weight is not None and math.isfinite(pred_weight):
|
||
for item in out:
|
||
item["weight"] = str(round(pred_weight))
|
||
|
||
is_confident = _star_confident_like_test_dgcnn(summary)
|
||
if not is_confident:
|
||
is_confident = len(out) > 0 and total_valid_clouds >= 5
|
||
|
||
return out, pred_weight, is_confident
|
||
|
||
|
||
def _result_from_weight_prediction(data: Dict[str, Any], folder_id: Optional[str] = None) -> Tuple[List[Dict[str, Any]], Optional[float], bool]:
|
||
"""从 weight_prediction.json 构建 API 结果,与 ``test_dgcc`` / batch 日志对齐。
|
||
|
||
最终体重(克)与 ``test_dgcnn_weight_estimator._final_pred_g_for_compare`` 一致:
|
||
``summary['pred_weight_g']``,否则 ``summary['avg_predicted_weight_g']``(即日志里的 ``pred=… g`` / JSON 的 ``pred_weight_g``)。
|
||
|
||
体长用 ``avg_length_input_topk`` 等 summary 字段。``star`` 与 ``_batch_topk_max_stable_star`` 一致。
|
||
|
||
Args:
|
||
data: FishMeasure 输出的 JSON 数据
|
||
folder_id: 输入文件夹名称(已处理掉 "fish"),用于作为 result 中的 id
|
||
|
||
Returns:
|
||
(result_list, pred_weight, is_confident)
|
||
"""
|
||
summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}
|
||
|
||
if data.get("skipped_weight") or summary.get("skipped"):
|
||
return [], None, False
|
||
|
||
pred_weight = _final_pred_weight_g_from_weight_json(data)
|
||
is_confident = _star_confident_like_test_dgcnn(summary)
|
||
|
||
if pred_weight is not None and math.isfinite(pred_weight):
|
||
length_mm = _length_mm_from_dgcnn_summary(summary, data)
|
||
if length_mm is None or not math.isfinite(length_mm):
|
||
length_mm = _fallback_mean_length_mm_from_per_file(data)
|
||
|
||
rid = folder_id if folder_id else "1"
|
||
length_str = str(round(length_mm)) if length_mm is not None and math.isfinite(length_mm) else ""
|
||
|
||
row: Dict[str, Any] = {
|
||
"id": rid,
|
||
"type": "大黄鱼",
|
||
"weight": str(round(pred_weight)),
|
||
"length": length_str,
|
||
"date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
|
||
}
|
||
return [row], pred_weight, is_confident
|
||
|
||
return _result_from_weight_prediction_legacy_tid(data, folder_id)
|
||
|
||
|
||
def _find_preview_videos(output_dir: Path) -> Tuple[Optional[Path], Optional[Path]]:
|
||
previews = sorted(output_dir.rglob("*preview*.mp4"))
|
||
if len(previews) >= 2:
|
||
return previews[0], previews[1]
|
||
all_mp4 = sorted(output_dir.rglob("*.mp4"))
|
||
if len(all_mp4) >= 2:
|
||
return all_mp4[0], all_mp4[1]
|
||
if len(all_mp4) == 1:
|
||
return all_mp4[0], all_mp4[0]
|
||
if len(previews) == 1:
|
||
return previews[0], previews[0]
|
||
return None, None
|
||
|
||
|
||
def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool:
    """Split a side-by-side stereo video into two half-width videos.

    The source is probed with the ``ffprobe`` located next to the selected
    ffmpeg binary. The split is refused when the frame is narrower than it is
    tall or when the half width would be zero.

    Any failure (tool missing, tool timeout, non-zero exit status, unreadable
    probe output) yields ``False`` rather than an exception, so the caller can
    fall back to copying the file.

    Args:
        src: Side-by-side source video.
        left_dst: Output path for the "left" video.
        right_dst: Output path for the "right" video.

    Returns:
        True if both halves were written successfully, False otherwise.
    """
    ffmpeg_path = _get_ffmpeg_path()
    ffprobe_path = str(Path(ffmpeg_path).parent / "ffprobe")

    try:
        probe = subprocess.run(
            [
                ffprobe_path, "-v", "quiet", "-print_format", "json",
                "-show_streams", str(src),
            ],
            capture_output=True, text=True, timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # ffprobe missing / not executable / timed out.
        return False
    if probe.returncode != 0:
        return False

    try:
        streams = json.loads(probe.stdout).get("streams", [])
        vstream = next((s for s in streams if s.get("codec_type") == "video"), None)
        if vstream is None:
            return False
        w, h = int(vstream["width"]), int(vstream["height"])
    except (ValueError, KeyError, TypeError, AttributeError):
        return False

    half_w = w // 2
    if half_w < 1 or w < h:
        return False

    encoder, encoder_options, _ = _get_h264_encoder()

    # NOTE(review): left_dst receives the region starting at x=half_w (the
    # right half of the frame) and right_dst the region starting at x=0.
    # Kept exactly as before; confirm this matches the preview's layout.
    jobs = [
        (f"crop={half_w}:{h}:{half_w}:0", left_dst),
        (f"crop={half_w}:{h}:0:0", right_dst),
    ]
    for crop, dst in jobs:
        cmd = [ffmpeg_path, "-y", "-i", str(src), "-vf", crop, "-an"]
        if encoder:
            cmd.extend(["-c:v", encoder, "-pix_fmt", "yuv420p", "-movflags", "+faststart"])
            cmd.extend(encoder_options)
        else:
            # No H.264 encoder detected: let ffmpeg pick its default codec.
            cmd.extend(["-q:v", "5"])
        cmd.append(str(dst))
        try:
            # Same time budget as the direct transcode in _transcode_to_h264.
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
        except (OSError, subprocess.SubprocessError):
            return False
        if r.returncode != 0:
            return False
    return True
|
||
|
||
|
||
def _get_ffmpeg_path() -> str:
|
||
"""获取可用的 ffmpeg 路径。优先使用项目配置的 ffmpeg。"""
|
||
# 优先使用项目目录下的 ffmpeg
|
||
project_ffmpeg = Path("/home/ubuntu/projects/FishServer/tools/ffmpeg/bin/ffmpeg")
|
||
if project_ffmpeg.is_file():
|
||
return str(project_ffmpeg)
|
||
# 尝试系统路径
|
||
system_paths = ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg"]
|
||
for path in system_paths:
|
||
if Path(path).is_file():
|
||
return path
|
||
# 回退到 PATH 中的 ffmpeg
|
||
return "ffmpeg"
|
||
|
||
|
||
def _get_h264_encoder() -> tuple[str, list[str], str]:
    """Detect an available H.264 encoder in the selected ffmpeg.

    ``ffmpeg -encoders`` is queried and the first entry of the preference list
    that appears in its output is chosen. ``libx264`` (pure software) comes
    first; the original author notes that hardware encoders need a real test
    run before they can be trusted.

    Encoder names are matched as whole whitespace-separated tokens so that,
    for example, ``libx264rgb`` is not mistaken for ``libx264``.

    Returns:
        ``(encoder_name, encoder_options, ffmpeg_path)``. ``encoder_name`` is
        ``""`` and the options are empty when no encoder was detected or
        ffmpeg could not be run.
    """
    encoders_to_try = [
        ("libx264", ["-preset", "fast", "-crf", "23"]),
        ("h264_nvenc", ["-preset", "fast"]),
        ("libopenh264", []),
    ]

    ffmpeg_path = _get_ffmpeg_path()
    try:
        result = subprocess.run(
            [ffmpeg_path, "-encoders"],
            capture_output=True, text=True, timeout=10,
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # Detection is best-effort: report "no encoder" instead of raising.
        logger.debug("[FishMeasure] ffmpeg encoder detection failed: {}", str(e))
        return "", [], ffmpeg_path

    available = set((result.stdout or "").split())
    for encoder, options in encoders_to_try:
        if encoder in available:
            return encoder, options, ffmpeg_path
    return "", [], ffmpeg_path
|
||
|
||
|
||
def _get_x264_path() -> Optional[str]:
|
||
"""检测系统上是否有可用的 x264 命令行工具。"""
|
||
for path in ["/usr/bin/x264", "/usr/local/bin/x264", "x264"]:
|
||
if path == "x264":
|
||
try:
|
||
result = subprocess.run(["which", "x264"], capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0 and result.stdout.strip():
|
||
return result.stdout.strip()
|
||
except Exception:
|
||
pass
|
||
elif Path(path).is_file():
|
||
return path
|
||
return None
|
||
|
||
|
||
def _transcode_with_x264(src: Path, dst: Path) -> bool:
    """Transcode ``src`` to H.264 with the standalone ``x264`` CLI.

    Last-resort path for when none of ffmpeg's H.264 encoders is usable (for
    example a libopenh264 version mismatch). ffmpeg dumps the raw yuv420p
    frames to a temporary file, which x264 then encodes into ``dst``.

    Args:
        src: Source video.
        dst: Destination file; removed again if encoding fails.

    Returns:
        True when x264 exited successfully and ``dst`` exists, else False.
    """
    import tempfile

    x264_path = _get_x264_path()
    ffmpeg_path = _get_ffmpeg_path()

    if not x264_path:
        logger.debug("[FishMeasure] x264 not available")
        return False

    # Prefer an ffprobe shipped next to the selected ffmpeg; otherwise rely on PATH.
    ffprobe_path = "ffprobe"
    ffmpeg_file = Path(ffmpeg_path)
    if ffmpeg_file.is_absolute() and (ffmpeg_file.parent / "ffprobe").is_file():
        ffprobe_path = str(ffmpeg_file.parent / "ffprobe")

    # Probe resolution and frame rate: x264 needs both for headerless raw input.
    try:
        probe = subprocess.run(
            [ffprobe_path, "-v", "quiet", "-print_format", "json", "-show_streams", str(src)],
            capture_output=True, text=True, timeout=10,
        )
        streams = json.loads(probe.stdout).get("streams", [])
        vstream = next((s for s in streams if s.get("codec_type") == "video"), None)
        if not vstream:
            return False
        width = int(vstream["width"])
        height = int(vstream["height"])
        fps_str = vstream.get("r_frame_rate", "25/1")
        # The rate may be rational ("30/1", "30000/1001") or a plain number.
        if "/" in fps_str:
            num, den = map(int, fps_str.split("/"))
            fps = num / den if den != 0 else 25.0
        else:
            fps = float(fps_str)
        if not math.isfinite(fps) or fps <= 0:
            # A zero/invalid rate is unusable for encoding; use the same default.
            fps = 25.0
    except Exception as e:
        logger.debug("[FishMeasure] x264 probe failed: {}", str(e))
        return False

    tmp_yuv = None
    try:
        # Reserve a temporary file name for the raw frames.
        with tempfile.NamedTemporaryFile(suffix=".yuv", delete=False) as f:
            tmp_yuv = Path(f.name)

        # Step 1: dump raw yuv420p frames with ffmpeg.
        extract_cmd = [
            ffmpeg_path, "-y", "-i", str(src),
            "-f", "rawvideo",
            "-pix_fmt", "yuv420p",
            str(tmp_yuv),
        ]
        result = subprocess.run(extract_cmd, capture_output=True, text=True, timeout=300)
        if result.returncode != 0:
            logger.debug("[FishMeasure] x264: YUV extraction failed: {}", result.stderr[-200:] if result.stderr else "unknown")
            return False

        # Step 2: encode the raw frames with x264.
        encode_cmd = [
            x264_path,
            "--input-res", f"{width}x{height}",
            "--fps", str(fps),
            "--preset", "fast",
            "--crf", "23",
            "--output-csp", "i420",
            "-o", str(dst),
            str(tmp_yuv),
        ]
        result = subprocess.run(encode_cmd, capture_output=True, text=True, timeout=600)

        if result.returncode == 0 and dst.is_file():
            logger.info("[FishMeasure] x264 transcoding SUCCESS: {} ({} bytes)", dst.name, dst.stat().st_size)
            return True

        stderr = result.stderr[-300:] if result.stderr else "Unknown error"
        logger.warning("[FishMeasure] x264 transcoding FAILED: {}", stderr)
        if dst.exists():
            dst.unlink()
        return False
    except Exception as e:
        logger.warning("[FishMeasure] x264 transcoding exception: {}", str(e))
        if dst.exists():
            dst.unlink()
        return False
    finally:
        # The raw dump can be very large; always remove it.
        if tmp_yuv and tmp_yuv.exists():
            tmp_yuv.unlink()
|
||
|
||
|
||
def _transcode_fallback(src: Path, dst: Path) -> bool:
    """Fallback transcode: dump frames to JPEGs, then encode them as H.264.

    This avoids having the encoder read the original (mp4v) file directly,
    which the original author found to be a compatibility problem.

    The source frame rate is probed (best effort) and passed to the image
    sequence reader. Without it ffmpeg reads image sequences at its default
    of 25 fps, which changes the playback speed of any other source.

    Args:
        src: Source video.
        dst: Destination MP4; removed again if encoding fails.

    Returns:
        True when the encode succeeded and ``dst`` exists, else False
        (including when no ffmpeg H.264 encoder is available).
    """
    import tempfile

    encoder, encoder_options, ffmpeg_path = _get_h264_encoder()
    if not encoder:
        return False

    # Prefer an ffprobe shipped next to the selected ffmpeg; otherwise rely on PATH.
    ffprobe_path = "ffprobe"
    ffmpeg_file = Path(ffmpeg_path)
    if ffmpeg_file.is_absolute() and (ffmpeg_file.parent / "ffprobe").is_file():
        ffprobe_path = str(ffmpeg_file.parent / "ffprobe")

    # Best-effort probe of the source rate ("30/1", "30000/1001", ...).
    # On any failure the rate is simply not passed (previous behaviour).
    frame_rate: Optional[str] = None
    try:
        probe = subprocess.run(
            [ffprobe_path, "-v", "quiet", "-print_format", "json", "-show_streams", str(src)],
            capture_output=True, text=True, timeout=10,
        )
        streams = json.loads(probe.stdout).get("streams", [])
        vstream = next((s for s in streams if s.get("codec_type") == "video"), None)
        rate = str((vstream or {}).get("r_frame_rate", ""))
        num, _, den = rate.partition("/")
        if int(num) > 0 and int(den or "1") > 0:
            frame_rate = rate
    except (OSError, subprocess.SubprocessError, ValueError, TypeError, AttributeError) as e:
        logger.debug("[FishMeasure] Fallback: frame rate probe failed: {}", str(e))

    tmp_dir = tempfile.mkdtemp()
    try:
        # Step 1: extract frames as a JPEG sequence.
        frames_pattern = f"{tmp_dir}/frame_%04d.jpg"
        extract_cmd = [
            ffmpeg_path, "-y", "-i", str(src),
            "-q:v", "2",  # high quality
            frames_pattern,
        ]
        result = subprocess.run(extract_cmd, capture_output=True, text=True, timeout=60)
        if result.returncode != 0:
            logger.debug("[FishMeasure] Fallback: frame extraction failed: {}", result.stderr[-200:] if result.stderr else "unknown")
            return False

        # Step 2: encode the frames into an H.264 MP4.
        encode_cmd = [ffmpeg_path, "-y"]
        if frame_rate:
            # Input option: must precede "-i".
            encode_cmd.extend(["-framerate", frame_rate])
        encode_cmd.extend([
            "-i", frames_pattern,
            "-c:v", encoder,
            "-pix_fmt", "yuv420p",
            "-movflags", "+faststart",
            "-an",
        ])
        encode_cmd.extend(encoder_options)
        encode_cmd.append(str(dst))

        result = subprocess.run(encode_cmd, capture_output=True, text=True, timeout=120)

        if result.returncode == 0 and dst.is_file():
            logger.info("[FishMeasure] Fallback transcoding SUCCESS: {} ({} bytes)", dst.name, dst.stat().st_size)
            return True

        stderr = result.stderr[-300:] if result.stderr else "Unknown error"
        logger.warning("[FishMeasure] Fallback transcoding FAILED: {}", stderr)
        if dst.exists():
            dst.unlink()
        return False
    except Exception as e:
        logger.warning("[FishMeasure] Fallback transcoding exception: {}", str(e))
        if dst.exists():
            dst.unlink()
        return False
    finally:
        # Always remove the temporary frame directory.
        shutil.rmtree(tmp_dir, ignore_errors=True)
|
||
|
||
|
||
def _transcode_to_h264(src: Path, dst: Path) -> bool:
    """Transcode ``src`` to browser-compatible H.264 with ffmpeg.

    Strategy, in order:
      1. Direct ffmpeg transcode with the detected H.264 encoder.
      2. Frame-extraction fallback (``_transcode_fallback``).
      3. Standalone ``x264`` CLI (``_transcode_with_x264``).

    When ffmpeg has no H.264 encoder at all, only step 3 is attempted.

    A partial ``dst`` left behind by a failed direct transcode is removed
    before the fallbacks run, so a ``False`` result does not leave a
    truncated file at ``dst``.

    Args:
        src: Source video.
        dst: Destination file.

    Returns:
        True if any strategy produced ``dst``, else False.
    """
    encoder, encoder_options, ffmpeg_path = _get_h264_encoder()

    if not encoder:
        # No ffmpeg H.264 encoder available: go straight to x264.
        logger.warning("[FishMeasure] No H.264 encoder available in ffmpeg, trying x264...")
        return _transcode_with_x264(src, dst)

    cmd = [
        ffmpeg_path, "-y", "-i", str(src),
        "-c:v", encoder,
        "-pix_fmt", "yuv420p",  # widest player compatibility
        "-movflags", "+faststart",  # moov atom first, for streaming
        "-an",  # drop audio
    ]
    cmd.extend(encoder_options)
    cmd.append(str(dst))

    try:
        logger.info("[FishMeasure] Transcoding with {} using {}: {} -> {}", encoder, ffmpeg_path, src.name, dst.name)
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=300
        )
        if result.returncode == 0 and dst.is_file():
            logger.info("[FishMeasure] Transcoding SUCCESS: {} ({} bytes)", dst.name, dst.stat().st_size)
            return True
        stderr = result.stderr[-500:] if result.stderr else "Unknown error"
        logger.warning("[FishMeasure] Direct transcoding FAILED, trying fallback: {}", stderr)
    except Exception as e:
        logger.warning("[FishMeasure] Transcoding exception: {}", str(e))

    # Remove a partial output of the failed attempt. Never delete the source
    # itself if the caller passed the same path for src and dst.
    try:
        if dst.exists() and dst.resolve() != src.resolve():
            dst.unlink()
    except OSError as e:
        logger.debug("[FishMeasure] could not remove partial output {}: {}", dst.name, str(e))

    # Fallback 1: re-encode from extracted frames.
    if _transcode_fallback(src, dst):
        return True
    # Fallback 2: standalone x264.
    logger.info("[FishMeasure] Fallback failed, trying x264...")
    return _transcode_with_x264(src, dst)
|
||
|
||
|
||
def transcode_src_to_h264_dst(src: Path, dst: Path) -> bool:
    """Transcode an MP4 to H.264.

    Public entry point so that other features (the original note mentions the
    biomass above-water video) can reuse the same ffmpeg logic as FishMeasure.

    Args:
        src: Source video.
        dst: Destination file.

    Returns:
        The result of ``_transcode_to_h264(src, dst)``.
    """
    return _transcode_to_h264(src, dst)
|
||
|
||
|
||
def _publish_media(
    left: Optional[Path],
    right: Optional[Path],
    settings: Settings,
    file_prefix: str,
) -> Tuple[str, str]:
    """Publish the preview videos into ``settings.media_root`` and return URLs.

    When both sides refer to the same file, it is first treated as a
    side-by-side video and split into two halves. Otherwise (or if the split
    fails) each side is transcoded to H.264, falling back to a plain copy of
    the original when transcoding fails.

    If both sides are the same file and the split failed, the source is
    processed once and the output is copied for the other side, instead of
    transcoding the same file twice.

    Args:
        left: Left preview video, or None.
        right: Right preview video, or None.
        settings: Provides ``media_root`` and ``public_base_url``.
        file_prefix: Prefix for the published file names (sanitised with
            ``_safe_media_prefix``).

    Returns:
        ``(left_url, right_url)``; a side is ``""`` when it has no source file.
    """
    settings.media_root.mkdir(parents=True, exist_ok=True)
    safe_p = _safe_media_prefix(file_prefix)
    left_dst = settings.media_root / f"{safe_p}_left.mp4"
    right_dst = settings.media_root / f"{safe_p}_right.mp4"
    base = settings.public_base_url.rstrip("/")

    same_source = left is not None and left == right
    if same_source and left.is_file():
        if _split_sbs_video(left, left_dst, right_dst):
            return (
                f"{base}/media/{left_dst.name}",
                f"{base}/media/{right_dst.name}",
            )

    def publish(src: Optional[Path], dst: Path) -> str:
        """Transcode (or, failing that, copy) ``src`` to ``dst``; return its URL."""
        if src is None or not src.is_file():
            return ""
        if _transcode_to_h264(src, dst):
            logger.info("[FishMeasure] transcoded to H.264: {} -> {}", src.name, dst.name)
        else:
            # Transcoding failed: publish the original file unchanged.
            shutil.copy2(src, dst)
            logger.warning("[FishMeasure] copied without transcoding: {} -> {}", src.name, dst.name)
        return f"{base}/media/{dst.name}"

    vl = publish(left, left_dst)
    if same_source and vl:
        # Same input on both sides: reuse the file just produced.
        shutil.copy2(left_dst, right_dst)
        logger.info("[FishMeasure] reused published video: {} -> {}", left_dst.name, right_dst.name)
        vr = f"{base}/media/{right_dst.name}"
    else:
        vr = publish(right, right_dst)
    return vl, vr
|
||
|
||
|
||
def build_measure_snapshot(
    svo_path: Path, settings: Settings, *, output_root: Optional[Path] = None
) -> MeasureSnapshot:
    """Build a ``MeasureSnapshot`` from the FishMeasure output of one SVO.

    Loads the weight JSON, derives the result rows / predicted weight /
    confidence flag, writes the calculation debug file, publishes the preview
    videos and returns everything bundled in a snapshot.

    Args:
        svo_path: The processed SVO file. Its parent folder name is used to
            derive the fish id and its stem names the output sub-directory.
        settings: Application settings.
        output_root: Root of the FishMeasure output; defaults to
            ``settings.measure_output_root``.

    Returns:
        The populated ``MeasureSnapshot``.
    """
    root = output_root if output_root is not None else settings.measure_output_root
    data = _load_weight_json(svo_path, settings, output_root=root)
    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}

    folder_id = _derive_id_from_folder_name(svo_path.parent.name)

    result, pred_weight, is_confident = _result_from_weight_prediction(data, folder_id)

    if pred_weight is None:
        pred_weight = _final_pred_weight_g_from_weight_json(data)

    # Fallback: if parsing produced no row, still try to assemble one from the
    # summary weight (per the original note, in line with test_dgcnn).
    if not result:
        w = _final_pred_weight_g_from_weight_json(data)
        if w is not None and math.isfinite(w):
            lmm = _length_mm_from_dgcnn_summary(summary, data)
            if lmm is None or not math.isfinite(lmm):
                lmm = _fallback_mean_length_mm_from_per_file(data)
            fallback_id = folder_id if folder_id else "1"
            length_str = str(round(lmm)) if lmm is not None and math.isfinite(lmm) else ""
            result = [{
                "id": fallback_id,
                "type": "大黄鱼",
                "weight": str(round(w)),
                "length": length_str,
                "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            }]
            # NOTE(review): nesting reconstructed from a copy with flattened
            # indentation; confirm these two updates belong inside the
            # finite-weight branch.
            if pred_weight is None:
                pred_weight = w
            if not is_confident:
                is_confident = _star_confident_like_test_dgcnn(summary)

    calc_log = format_dgcnn_weight_calculation_log(
        data, settings=settings, svo_display_name=svo_path.name
    )
    logger.info("[FishMeasure] weight calculation process:\n{}", calc_log)

    default_out_dir = root / svo_path.stem
    wp_json = default_out_dir / "weight_prediction.json"
    len_mm = _length_mm_for_debug(result, summary, data)
    _write_measure_calculation_debug_file(
        settings,
        file_stem=svo_path.stem,
        calculation_log=calc_log,
        svo_path=svo_path,
        weight_json_path=wp_json if wp_json.is_file() else None,
        fish_id=str(folder_id),
        pred_weight_g=pred_weight,
        star_confident=is_confident,
        length_mm=len_mm,
        weight_data=data,
    )

    logger.info(
        "[FishMeasure] parsed {}\nresult ({} fish):\n{}\ndgcnn_summary:\n{}\npred_weight={} star={}",
        svo_path.name,
        len(result),
        format_json_pretty(result),
        format_json_pretty(summary if summary else {}),
        pred_weight,
        is_confident,
    )

    # A null/empty "output_dir" in the JSON must not break Path(); use the
    # default per-SVO directory in that case.
    out_dir = Path(data.get("output_dir") or default_out_dir)
    lv, rv = _find_preview_videos(out_dir)
    prefix = (
        f"{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}_{svo_path.stem}"
    )
    v_left, v_right = _publish_media(lv, rv, settings, prefix)
    logger.info(
        "[FishMeasure] media preview_paths={} {} | published_left={} published_right={}",
        lv,
        rv,
        v_left or "(none)",
        v_right or "(none)",
    )

    return MeasureSnapshot(
        result=result,
        video_left=v_left,
        video_right=v_right,
        updated_at=datetime.now(timezone.utc),
        raw_prediction_path=str(wp_json),
        pred=pred_weight,
        star=is_confident,
        calculation_log=calc_log,
    )
|
||
|
||
|
||
def run_full_measure(
    svo_path: Path, settings: Settings, *, output_root: Optional[Path] = None
) -> MeasureSnapshot:
    """Run FishMeasure on one SVO and build its snapshot.

    Args:
        svo_path: SVO file to process.
        settings: Application settings.
        output_root: Optional output root forwarded to both the subprocess
            run and the snapshot builder.

    Returns:
        The snapshot built from the subprocess output.
    """
    logger.info("[FishMeasure] start svo={}", svo_path.resolve())
    run_measure_subprocess(svo_path, settings, output_root=output_root)
    snap = build_measure_snapshot(svo_path, settings, output_root=output_root)
    logger.info("[FishMeasure] done svo={} result_len={}", svo_path.name, len(snap.result))
    return snap
|
||
|
||
|
||
def run_full_measure_batch(svo_paths: List[Path], settings: Settings, fish_id: str = "1") -> MeasureSnapshot:
    """Process several SVOs of one fish as a single batch.

    Per the original note, ``predict_weigth_from_svo2`` merges the point
    clouds of all SVOs into one DGCNN run (ply-list).

    The output of each fish is isolated under
    ``measure_output_root / fish{fish_id}`` (mirroring the
    ``--save-output output_weight_estimator/fish{N}`` convention of the
    ``run_predict_from_svo2_fish*.sh`` scripts) so that several fish do not
    overwrite each other's ``weight_prediction.json`` in one flat directory.

    Args:
        svo_paths: SVO files belonging to the fish.
        settings: Application settings.
        fish_id: Identifier used for the output directory and the result id.

    Returns:
        The snapshot for the fish. A single SVO is delegated to
        ``run_full_measure``.

    Raises:
        ValueError: If ``svo_paths`` is empty.
    """
    if not svo_paths:
        raise ValueError("No SVO files provided for batch processing")

    fish_output_root = settings.measure_output_root / f"fish{fish_id}"
    fish_output_root.mkdir(parents=True, exist_ok=True)

    # One SVO needs no batching: use the single-file pipeline.
    if len(svo_paths) == 1:
        return run_full_measure(svo_paths[0], settings, output_root=fish_output_root)

    svo_names = ", ".join(p.name for p in svo_paths)
    logger.info(
        "[FishMeasure] batch start: fish_id={} {} SVOs -> {}: {}",
        fish_id, len(svo_paths), fish_output_root, svo_names,
    )

    run_measure_batch_subprocess(svo_paths, settings, output_root=fish_output_root)

    snap = build_measure_snapshot_batch(
        svo_paths, settings, fish_id=fish_id, output_root=fish_output_root
    )

    logger.info(
        "[FishMeasure] batch done: fish_id={} {} SVOs, result_len={}",
        fish_id, len(svo_paths), len(snap.result),
    )
    return snap
|
||
|
||
|
||
def build_measure_snapshot_batch(
    svo_paths: List[Path],
    settings: Settings,
    fish_id: str = "1",
    *,
    output_root: Optional[Path] = None,
) -> MeasureSnapshot:
    """Build a ``MeasureSnapshot`` from the output of a batch run.

    ``output_root`` is the per-fish output directory (for example
    ``measure_output_root/fish1``). The combined ``weight_prediction.json`` is
    read from its top level; if it is missing, unreadable, empty or not a JSON
    object, the single-SVO builder is used for the first SVO instead.

    Args:
        svo_paths: SVO files of the batch.
        settings: Application settings.
        fish_id: Fish identifier. Result rows get ``int(fish_id)`` as id when
            it is numeric, otherwise ``1``.
        output_root: Per-fish output directory; defaults to
            ``settings.measure_output_root``.

    Returns:
        The populated snapshot (an empty one when ``svo_paths`` is empty).
    """
    if not svo_paths:
        return MeasureSnapshot(result=[], video_left="", video_right="")

    root = output_root if output_root is not None else settings.measure_output_root
    combined_json = root / "weight_prediction.json"
    first_svo = svo_paths[0]

    data: Dict[str, Any] = {}
    if combined_json.is_file():
        try:
            with open(combined_json, encoding="utf-8") as f:
                loaded = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("[FishMeasure] Failed to read combined weight_prediction.json: {}", e)
        else:
            # Anything but a JSON object cannot be parsed below.
            if isinstance(loaded, dict):
                data = loaded

    if not data:
        return build_measure_snapshot(first_svo, settings, output_root=root)

    # str.isdigit() also accepts characters such as "²" that int() rejects.
    try:
        fid = int(fish_id) if fish_id.isdigit() else 1
    except ValueError:
        fid = 1

    summary = data.get("dgcnn_summary") or data.get("weight_summary") or {}

    result, pred_weight, is_confident = _result_from_weight_prediction(data)

    if pred_weight is None:
        pred_weight = _final_pred_weight_g_from_weight_json(data)

    # Fallback: assemble a row from the summary weight when parsing gave none.
    if not result:
        w = _final_pred_weight_g_from_weight_json(data)
        if w is not None and math.isfinite(w):
            lmm = _length_mm_from_dgcnn_summary(summary, data)
            if lmm is None or not math.isfinite(lmm):
                lmm = _fallback_mean_length_mm_from_per_file(data)
            length_str = str(round(lmm)) if lmm is not None and math.isfinite(lmm) else ""
            result = [{
                "id": fid,
                "type": "大黄鱼",
                "weight": str(round(w)),
                "length": length_str,
                "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            }]
            # NOTE(review): nesting reconstructed from a copy with flattened
            # indentation; confirm these two updates belong inside the
            # finite-weight branch.
            if pred_weight is None:
                pred_weight = w
            if not is_confident:
                is_confident = _star_confident_like_test_dgcnn(summary)

    # Every row of a batch belongs to the same fish.
    for item in result:
        item["id"] = fid

    # Preview videos are looked up in the first SVO's output directory.
    out_dir = root / first_svo.stem
    lv, rv = _find_preview_videos(out_dir)

    prefix = f"{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}_batch_{len(svo_paths)}svos"
    v_left, v_right = _publish_media(lv, rv, settings, prefix)

    names = data.get("svo_names")
    if isinstance(names, list) and names:
        batch_title = ", ".join(str(x) for x in names)
    else:
        batch_title = ", ".join(p.name for p in svo_paths)
    calc_log = format_dgcnn_weight_calculation_log(
        data, settings=settings, svo_display_name=batch_title
    )
    logger.info("[FishMeasure] weight calculation process (batch):\n{}", calc_log)

    batch_extra: List[str] = [
        f"# batch_svo_count: {len(svo_paths)}",
        f"# batch_svos: {', '.join(p.name for p in svo_paths)}",
        f"# fish_output_root: {root}",
    ]
    len_mm_b = _length_mm_for_debug(result, summary, data)
    _write_measure_calculation_debug_file(
        settings,
        file_stem=f"fish{fish_id}_batch_{len(svo_paths)}svos",
        calculation_log=calc_log,
        svo_path=first_svo,
        weight_json_path=combined_json if combined_json.is_file() else None,
        fish_id=str(fish_id),
        pred_weight_g=pred_weight,
        star_confident=is_confident,
        length_mm=len_mm_b,
        weight_data=data,
        extra_header_lines=batch_extra,
    )

    logger.info(
        "[FishMeasure] batch parsed fish_id={} {} SVOs\nresult ({} fish):\n{}\ndgcnn_summary:\n{}\npred_weight={} star={}",
        fish_id,
        len(svo_paths),
        len(result),
        format_json_pretty(result),
        format_json_pretty(summary if summary else {}),
        pred_weight,
        is_confident,
    )

    return MeasureSnapshot(
        result=result,
        video_left=v_left,
        video_right=v_right,
        updated_at=datetime.now(timezone.utc),
        raw_prediction_path=str(combined_json) if combined_json.is_file() else None,
        pred=pred_weight,
        star=is_confident,
        calculation_log=calc_log,
    )
|