getting closer with sync issues

This commit is contained in:
2026-02-16 18:32:05 -07:00
parent 7b48b709ce
commit 0201d3e8bd
3 changed files with 2145 additions and 17 deletions
+65 -14
View File
@@ -86,21 +86,46 @@ def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: s
return 0.0
def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
"""Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py)."""
def _get_fps(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
"""Get video stream frame rate (e.g. 60.0) via ffprobe. If probe_host set, run there. Default 60.0 on failure."""
path = probe_path if (probe_host and probe_path) else input_file
cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'csv=p=0', path]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, _ = proc.communicate()
out = (out or '').strip()
if not out:
return 60.0
try:
if '/' in out:
num, den = out.split('/', 1)
return float(num) / float(den) if float(den) else 60.0
return float(out)
except (ValueError, ZeroDivisionError):
return 60.0
def _probe_duration(path: str) -> float:
"""Return duration in seconds from ffprobe, or 0.0 on failure."""
proc = Popen(
['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', segment_path],
['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path],
stdout=PIPE, stderr=PIPE, universal_newlines=True
)
out, _ = proc.communicate()
if proc.returncode != 0:
return False
return 0.0
out = (out or '').strip()
try:
duration = float(out)
return float(out)
except ValueError:
return False
return abs(duration - expected_duration_sec) <= tolerance_sec
return 0.0
def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
    """Check that a segment probes successfully and has the expected length.

    Args:
        segment_path: Segment file handed to _probe_duration.
        expected_duration_sec: Duration the segment should have, in seconds.
        tolerance_sec: Largest accepted absolute difference, in seconds.

    Returns:
        True when the probed duration is positive and within tolerance_sec of
        expected_duration_sec; False otherwise.
    """
    measured = _probe_duration(segment_path)
    if measured <= 0:
        # _probe_duration reports failure as 0.0, so a non-positive value is invalid.
        return False
    deviation = abs(measured - expected_duration_sec)
    return deviation <= tolerance_sec
def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:
@@ -138,6 +163,7 @@ class Task(NamedTuple):
duration_sec: float
output_file: str
ffmpeg_args: List[str]
fps: float = 60.0
class FFMPEGProc:
_duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
@@ -302,14 +328,19 @@ class TaskThread(Thread):
self._current_file = basename(task.output_file)
tqdm.write(f' {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s)', file=stderr)
stderr.flush()
# -i then -ss then -frames:v so we send exactly N frames (avoids VFR/source timestamps giving short segments)
n_frames = round(task.duration_sec * task.fps)
reader_cmd = [
'ffmpeg', '-ss', str(task.start_sec), '-t', str(task.duration_sec),
'-i', self._source_file, '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
'ffmpeg', '-i', self._source_file,
'-ss', str(task.start_sec), '-frames:v', str(n_frames),
'-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
]
ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
# -r only: lock output to CFR; do not use -frames:v or segments get truncated when reader sends fewer frames
encoder_cmd = [
ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
'-gpu', str(self._gpu_id),
'-r', str(task.fps),
*task.ffmpeg_args,
'-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov', 'pipe:1'
]
@@ -368,11 +399,12 @@ def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, se
stderr.flush()
keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
fps = _get_fps(input_file, probe_host, probe_path)
segments = _build_segments(keyframe_times, segment_seconds, total_duration)
if not segments:
tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
return
return False
MIN_SEGMENT_BYTES = 1024 # smaller = failed/corrupt segment, re-encode
DURATION_TOLERANCE_SEC = 0.1 # same as check_files.py
@@ -393,7 +425,7 @@ def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, se
continue
except OSError:
pass
task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args)))
task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args), fps))
n_tasks = task_queue.qsize()
if removed:
@@ -472,8 +504,26 @@ def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, se
stop_all()
list_path = f'{tmp_dir}/output_segments.txt'
segment_files = sorted(glob(f'{tmp_dir}/*.mp4'))
expected_count = len(segments)
if len(segment_files) < expected_count:
tqdm.write(f'[4/4] ERROR: Only {len(segment_files)} of {expected_count} segments produced. Missing segments (check for FAILED lines above; fix or exclude failing worker and re-run).', file=stderr)
stderr.flush()
return False
with open(list_path, 'w') as f:
f.write('\n'.join([f"file '{fpath}'" for fpath in sorted(glob(f'{tmp_dir}/*.mp4'))]))
f.write('\n'.join([f"file '{fpath}'" for fpath in segment_files]))
CONCAT_DURATION_TOLERANCE = 0.1
segments_total = sum(_probe_duration(p) for p in segment_files)
duration_diff = segments_total - total_duration
if abs(duration_diff) > CONCAT_DURATION_TOLERANCE:
if duration_diff < -1.0:
tqdm.write(f'[4/4] WARNING: Segment total ({segments_total:.2f}s) is {abs(duration_diff):.2f}s shorter than source ({total_duration:.2f}s). Proceeding with concat (output capped to source length).', file=stderr)
stderr.flush()
# exit(1) # re-enable for hard fail when segment setup is wrong
tqdm.write(f'[4/4] WARNING: Segment total duration ({segments_total:.2f}s) differs from source ({total_duration:.2f}s) by {duration_diff:+.2f}s. Output will be capped to source length; it may not be a frame-accurate mirror.', file=stderr)
stderr.flush()
tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
concat_extra = ['-stats_period', '5'] if verbose else []
@@ -493,13 +543,14 @@ def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, se
bar.total = duration
bar.update(time)
ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
if ffmpeg.run() != 0:
if ffmpeg.run() != 0:
tqdm.write(ffmpeg.stderr, file=stderr)
return
return False
unlink(list_path)
if not keep_tmp:
rmtree(tmp_dir)
return True
def _parse_workers(host_specs):
"""Parse list of 'host' or 'host:gpu' into [(host, gpu_id), ...]."""