Files
GigaMux/ffmpeg_distributed.py
T
2026-02-19 07:45:42 -07:00

687 lines
31 KiB
Python

#!/usr/bin/env python3
from queue import SimpleQueue, Empty
from subprocess import Popen, DEVNULL, PIPE, TimeoutExpired
import select
from shutil import rmtree
from glob import glob
from os import mkdir, unlink, listdir, environ
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
from threading import Thread
from time import sleep, time
from typing import List, NamedTuple, Callable, Union, Tuple
from signal import signal, SIGINT
from sys import exit, stderr, stdin, platform as sys_platform
from shlex import split, join
from hashlib import md5
from time import strptime
from tqdm import tqdm
import re
DEBUG = 'DEBUG' in environ
def dprint(*args, **kwargs):
if DEBUG:
print(*args, *kwargs)
def _popen(args, **kwargs):
dprint(f'calling subprocess: {args}')
kwargs['stderr'] = PIPE
kwargs['stdout'] = DEVNULL
kwargs['universal_newlines'] = True
return Popen(args, **kwargs)
def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]:
"""Run ffprobe to get keyframe timestamps (seconds). Uses -skip_frame nokey so only keyframes are read (fast).
If probe_host and probe_path are set, run ffprobe there via ssh so the file is read at local disk speed."""
cmd = [
'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-skip_frame', 'nokey',
'-show_entries', 'frame=pts_time', '-of', 'csv=p=0',
probe_path if (probe_host and probe_path) else input_file
]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, err = proc.communicate()
if proc.returncode != 0 and err:
dprint('ffprobe keyframes:', err)
out = out or ''
times = []
for line in out.strip().splitlines():
line = line.strip().split('=')[-1] if '=' in line else line.strip()
if not line:
continue
try:
times.append(float(line))
except ValueError:
pass
return sorted(times)
def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
"""Get container duration in seconds via ffprobe (header-only, fast). If probe_host set, run there."""
path = probe_path if (probe_host and probe_path) else input_file
cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, _ = proc.communicate()
out = (out or '').strip()
try:
if out:
return float(out)
except ValueError:
pass
cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', path]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, _ = proc.communicate()
out = (out or '').strip()
try:
if out:
return float(out)
except ValueError:
pass
return 0.0
def _get_fps(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
"""Get video stream frame rate (e.g. 60.0) via ffprobe. If probe_host set, run there. Default 60.0 on failure."""
path = probe_path if (probe_host and probe_path) else input_file
cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'csv=p=0', path]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, _ = proc.communicate()
out = (out or '').strip()
if not out:
return 60.0
try:
if '/' in out:
num, den = out.split('/', 1)
return float(num) / float(den) if float(den) else 60.0
return float(out)
except (ValueError, ZeroDivisionError):
return 60.0
def _probe_duration(path: str) -> float:
"""Return duration in seconds from ffprobe, or 0.0 on failure."""
proc = Popen(
['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path],
stdout=PIPE, stderr=PIPE, universal_newlines=True
)
out, _ = proc.communicate()
if proc.returncode != 0:
return 0.0
out = (out or '').strip()
try:
return float(out)
except ValueError:
return 0.0
def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
"""Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py)."""
duration = _probe_duration(segment_path)
return duration > 0 and abs(duration - expected_duration_sec) <= tolerance_sec
def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:
"""Build (start_sec, end_sec) segments at keyframe boundaries. When keyframe probe fails, use fixed segment_seconds."""
if total_duration <= 0:
return []
if not keyframe_times or len(keyframe_times) == 1:
# No keyframes: split at fixed intervals so we still get multiple segments
segments = []
start = 0.0
while start < total_duration:
end = min(start + segment_seconds, total_duration)
segments.append((start, end))
start = end
return segments
segments = []
i = 0
while i < len(keyframe_times):
start = keyframe_times[i]
end_target = start + segment_seconds
j = i + 1
while j < len(keyframe_times) and keyframe_times[j] < end_target:
j += 1
if j < len(keyframe_times):
end = keyframe_times[j]
else:
end = total_duration
segments.append((start, end))
i = j
return segments
class Task(NamedTuple):
start_sec: float
duration_sec: float
output_file: str
ffmpeg_args: List[str]
fps: float = 60.0
class FFMPEGProc:
_duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
_progress_re = re.compile(r'frame=\s*(?P<frame>[0-9]+)\s+fps=\s*(?P<fps>[0-9]+).*time=-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9,.]+)\s+.*speed=(?P<speed>[0-9\.]+)x')
@staticmethod
def _match_to_sec(match):
return int(match.group('time_h'))*3600+int(match.group('time_m'))*60+float(match.group('time_s'))
def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None, binary_io=False, echo_stderr=False):
self._cmd = cmd
self._update_callback = update_callback
self._should_stop = False
self._shell = shell
self._duration = None
self._stdin = stdin
self._stdout = stdout
self._binary_io = binary_io
self._echo_stderr = echo_stderr
self.stderr = ''
def stop(self):
self._should_stop = True
if getattr(self, '_proc', None) is not None and self._proc.poll() is None:
try:
self._proc.terminate()
except OSError:
pass
def _read_stderr_loop(self, stderr_lines: list):
"""Read stderr in a loop (used on Windows where select.poll is unavailable)."""
while True:
raw = self._proc.stderr.readline()
if not raw and self._proc.poll() is not None:
break
if raw:
line = raw.decode(errors='replace') if self._binary_io else raw
stderr_lines.append(line)
if self._echo_stderr:
tqdm.write(line.rstrip(), file=stderr)
match = self._progress_re.match(line)
if match and self._update_callback:
self._update_callback(
int(match.group('frame')),
int(match.group('fps')),
self._match_to_sec(match),
self._duration,
float(match.group('speed')),
)
elif self._duration is None:
dm = self._duration_re.match(line)
if dm:
self._duration = self._match_to_sec(dm)
def run(self):
self._proc = Popen(
self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout,
universal_newlines=not self._binary_io
)
if getattr(select, 'poll', None):
poll = select.poll()
poll.register(self._proc.stderr)
while self._proc.poll() is None and not self._should_stop:
if not poll.poll(1):
sleep(0.1)
continue
sleep(0.001)
raw = self._proc.stderr.readline()
line = raw.decode(errors='replace') if self._binary_io and raw else (raw or '')
if self._echo_stderr and line:
tqdm.write(line.rstrip(), file=stderr)
match = self._progress_re.match(line)
if not match:
self.stderr += line
if match and self._update_callback:
self._update_callback(
int(match.group('frame')),
int(match.group('fps')),
self._match_to_sec(match),
self._duration,
float(match.group('speed')),
)
elif self._duration is None:
match = self._duration_re.match(line)
if match:
self._duration = self._match_to_sec(match)
try:
_, err = self._proc.communicate(timeout=1)
if err and self._binary_io:
err = err.decode(errors='replace') if isinstance(err, bytes) else err
self.stderr += err or ''
except TimeoutExpired:
pass
else:
stderr_lines = []
reader = Thread(target=self._read_stderr_loop, args=(stderr_lines,), daemon=True)
reader.start()
while self._proc.poll() is None and not self._should_stop:
sleep(0.2)
reader.join(timeout=2)
self.stderr = ''.join(stderr_lines)
return self._proc.returncode
class TqdmAbsolute(tqdm):
def __init__(self, *args, **kwargs):
kwargs['bar_format'] = '{l_bar}{bar}|{n:.1f}/{total:.1f} [{elapsed}<{remaining}]'
kwargs['dynamic_ncols'] = True
if not 'total' in kwargs:
kwargs['total'] = 99999999
if not 'leave' in kwargs:
kwargs['leave'] = False
super().__init__(*args, **kwargs)
def update(self, to):
super().update(to - self.n) # will also set self.n = b * bsize
HOST_COLORS = ['\033[94m', '\033[92m', '\033[93m', '\033[95m', '\033[96m', '\033[91m'] # blue, green, yellow, magenta, cyan, red
RESET = '\033[0m'
SEGMENT_STALL_TIMEOUT = int(environ.get('SEGMENT_STALL_TIMEOUT', '300')) # no encoder progress for this many sec -> kill and re-queue
class TaskThread(Thread):
def __init__(self, host: str, gpu_id: int, source_file: str, task_queue: SimpleQueue, bar_pos: int, remote_ffmpeg_path: str = None):
super().__init__()
self._should_stop = False
self._host = host
self._gpu_id = gpu_id
self._host_desc = f"{host}:gpu{gpu_id}"
self._bar_pos = bar_pos
self._remote_ffmpeg_path = remote_ffmpeg_path
self._source_file = source_file
self._task_queue = task_queue
self._ffmpeg = None
self._bar = TqdmAbsolute(desc=self._host_desc, position=bar_pos)
self._current_file = None
def _host_tag(self):
c = HOST_COLORS[self._bar_pos % len(HOST_COLORS)]
return f'{c}{self._host_desc}{RESET}'
def stop(self):
self._should_stop = True
rp = getattr(self, '_reader_proc', None)
if rp is not None and rp.poll() is None:
try:
if rp.stdin is not None:
rp.stdin.write(b'q')
rp.stdin.flush()
sleep(0.5)
except (OSError, BrokenPipeError):
pass
try:
if rp.poll() is None:
rp.terminate()
except OSError:
pass
if self._ffmpeg:
self._ffmpeg.stop()
def run(self):
last_log = [0.0] # mutable for progress heartbeat
last_progress = [time()] # for stall detection
last_encoder_t = [0.0] # last encoder progress position (sec) for stall log
segment_start_time = [0.0] # when current segment started
first_progress_logged = [False] # one-time "first frame" log per segment
verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes') or environ.get('DISTRIBUTED_DEBUG', '').lower() in ('1', 'true', 'yes')
def upd(frames, fps, t, duration, speed):
now = time()
last_progress[0] = now
last_encoder_t[0] = t
self._bar.total = duration or 999
self._bar.desc = self._host_desc + ': ' + (self._current_file or '')
self._bar.update(t)
if not first_progress_logged[0] and (frames > 0 or t > 0):
elapsed = now - segment_start_time[0]
tqdm.write(f' {self._host_tag()}: {self._current_file} first frame after {elapsed:.1f}s', file=stderr)
stderr.flush()
first_progress_logged[0] = True
if duration and duration > 0 and (now - last_log[0]) >= 30:
tqdm.write(f' {self._host_tag()}: {self._current_file} {t:.0f}s / {duration:.0f}s ({speed:.1f}x)', file=stderr)
stderr.flush()
last_log[0] = now
try:
while not self._should_stop:
task = self._task_queue.get(False)
self._current_file = basename(task.output_file)
last_progress[0] = time()
segment_start_time[0] = last_progress[0]
first_progress_logged[0] = False
last_encoder_t[0] = 0.0
n_frames = round(task.duration_sec * task.fps)
# Default fast_seek=True: -ss before -i (low RAM, quick). Set READER_FAST_SEEK=0 for frame-accurate (high RAM, slow for late segments).
fast_seek = environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no')
tqdm.write(f' {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s, n_frames={n_frames}, fast_seek={fast_seek})', file=stderr)
stderr.flush()
# READER_FAST_SEEK=1: -ss before -i (keyframe seek, low RAM, no decode from 0). Else -i then -ss (frame-accurate but decodes 0..start = high RAM for late segments).
if fast_seek:
reader_cmd = [
'ffmpeg', '-ss', str(task.start_sec), '-noaccurate_seek', '-i', self._source_file,
'-frames:v', str(n_frames), '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
]
else:
reader_cmd = [
'ffmpeg', '-i', self._source_file,
'-ss', str(task.start_sec), '-frames:v', str(n_frames),
'-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
]
ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
# -r only: lock output to CFR; do not use -frames:v or segments get truncated when reader sends fewer frames
encoder_cmd = [
ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
'-gpu', str(self._gpu_id),
'-r', str(task.fps),
*task.ffmpeg_args,
'-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov', 'pipe:1'
]
if self._host == 'localhost' and sys_platform != 'win32':
encoder_cmd = ['nice', '-n10', 'ionice', '-c3'] + encoder_cmd
if self._host != 'localhost':
encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]
reader_stderr_lines = []
if verbose:
self._reader_proc = Popen(reader_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
def read_reader_stderr():
rp = getattr(self, '_reader_proc', None)
if rp and rp.stderr:
for line in iter(rp.stderr.readline, b''):
reader_stderr_lines.append(line.decode(errors='replace').rstrip())
Thread(target=read_reader_stderr, daemon=True).start()
else:
self._reader_proc = Popen(reader_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL)
ret = -1
stall_done = [False] # watchdog sets True when it kills
def stall_watchdog():
if SEGMENT_STALL_TIMEOUT <= 0:
return
while not self._should_stop and not stall_done[0]:
sleep(60)
if stall_done[0] or self._should_stop:
break
if time() - last_progress[0] > SEGMENT_STALL_TIMEOUT:
ago = time() - last_progress[0]
enc_t = last_encoder_t[0]
tqdm.write(f' {self._host_tag()}: STALLED {self._current_file} (no progress {ago:.0f}s; last encoder at {enc_t:.1f}s into segment)', file=stderr)
if not fast_seek and task.start_sec > 60:
tqdm.write(f' {self._host_tag()}: (reader -ss {task.start_sec:.0f} after -i may still be decoding 0..{task.start_sec:.0f}s from source)', file=stderr)
tqdm.write(f' {self._host_tag()}: killing and re-queuing', file=stderr)
stderr.flush()
stall_done[0] = True
self.stop()
break
proc = getattr(self._ffmpeg, '_proc', None) if self._ffmpeg else None
if proc is not None and proc.poll() is not None:
break
watchdog = Thread(target=stall_watchdog, daemon=True)
watchdog.start()
try:
with open(task.output_file, 'wb') as outfile:
self._ffmpeg = FFMPEGProc(
encoder_cmd, stdin=self._reader_proc.stdout, stdout=outfile,
update_callback=upd, binary_io=True
)
ret = self._ffmpeg.run()
finally:
stall_done[0] = True
watchdog.join(timeout=2)
rp = getattr(self, '_reader_proc', None)
if rp is not None:
try:
rp.wait(timeout=2)
except TimeoutExpired:
rp.terminate()
rp.wait(timeout=5)
self._reader_proc = None
if ret != 0:
try:
if isfile(task.output_file):
unlink(task.output_file)
except OSError:
pass
print(f' {self._host_tag()}: FAILED {self._current_file}', file=stderr, flush=True)
if self._ffmpeg.stderr:
print(self._ffmpeg.stderr, file=stderr, end='', flush=True)
if reader_stderr_lines:
tqdm.write(f' {self._host_tag()}: Reader stderr:', file=stderr)
for line in reader_stderr_lines[-50:]: # last 50 lines
tqdm.write(f' {line}', file=stderr)
stderr.flush()
self._task_queue.put(task)
else:
tqdm.write(f' {self._host_tag()}: done {self._current_file}', file=stderr)
stderr.flush()
except Empty:
pass
self._bar.close()
def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False, probe_host: str = None, probe_path: str = None, remote_ffmpeg_path: str = None):
input_file = abspath(expanduser(input_file))
output_file = abspath(expanduser(output_file))
tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
try:
mkdir(tmp_dir)
except FileExistsError:
pass # previous job: resume and re-queue failed segments
verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes')
tqdm.write('[1/4] Probing keyframes and duration...', file=stderr)
stderr.flush()
keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
fps = _get_fps(input_file, probe_host, probe_path)
segments = _build_segments(keyframe_times, segment_seconds, total_duration)
if not segments:
tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
return False
MIN_SEGMENT_BYTES = 1024 # smaller = failed/corrupt segment, re-encode
DURATION_TOLERANCE_SEC = 0.1 # same as check_files.py
task_queue = SimpleQueue()
removed = 0
for i, (start_sec, end_sec) in enumerate(segments):
duration_sec = end_sec - start_sec
output_path = f'{tmp_dir}/{i:08d}.mp4'
if isfile(output_path):
try:
if getsize(output_path) < MIN_SEGMENT_BYTES:
unlink(output_path)
removed += 1
elif not _segment_valid(output_path, duration_sec, DURATION_TOLERANCE_SEC):
unlink(output_path)
removed += 1
else:
continue
except OSError:
pass
task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args), fps))
n_tasks = task_queue.qsize()
if removed:
tqdm.write(f'[2/4] Removed {removed} bad segment(s) (ffprobe failed or duration mismatch), re-queued.', file=stderr)
if n_tasks == 0:
tqdm.write('All segments already done (resume).', file=stderr)
else:
tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)
stderr.flush()
dprint(f'Segments: {len(segments)} total, {n_tasks} tasks queued')
tqdm.write(f'[3/4] Encoding segments on {len(workers)} worker(s)...', file=stderr)
if environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no'):
tqdm.write(' (Reader: fast seek -ss before -i; set READER_FAST_SEEK=0 for frame-accurate)', file=stderr)
stderr.flush()
threads = [TaskThread(host, gpu_id, input_file, task_queue, pos, remote_ffmpeg_path) for pos, (host, gpu_id) in enumerate(workers, 0)]
def stop_all():
tqdm.write('Stopping all workers (killing ffmpeg/SSH on each host)...', file=stderr)
stderr.flush()
for thread in threads:
thread.stop()
for thread in threads:
thread.join(timeout=5)
exit(1)
stop_file = abspath('.encode_stop')
def stdin_stop_listener():
try:
while True:
line = stdin.readline()
if not line:
break
if line.strip().lower() == 'stop':
stop_all()
break
except (OSError, EOFError):
pass
def stop_file_poller():
try:
while True:
sleep(1.5)
if isfile(stop_file):
try:
unlink(stop_file)
except OSError:
pass
stop_all()
break
except OSError:
pass
def sigint(sig, stack):
stop_all()
try:
signal(SIGINT, sigint)
except (ValueError, OSError):
pass # SIGINT not available on this platform
if getattr(stdin, 'isatty', lambda: False)():
tqdm.write("To cancel: type 'stop' + Enter, or create .encode_stop (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
else:
tqdm.write("To cancel: create .encode_stop in current dir (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
stderr.flush()
if getattr(stdin, 'isatty', lambda: False)():
Thread(target=stdin_stop_listener, daemon=True).start()
Thread(target=stop_file_poller, daemon=True).start()
for thread in threads:
thread.start()
try:
for thread in threads:
thread.join()
except KeyboardInterrupt:
stop_all()
list_path = f'{tmp_dir}/output_segments.txt'
segment_files = sorted(glob(f'{tmp_dir}/*.mp4'))
expected_count = len(segments)
if len(segment_files) < expected_count:
tqdm.write(f'[4/4] ERROR: Only {len(segment_files)} of {expected_count} segments produced. Missing segments (check for FAILED lines above; fix or exclude failing worker and re-run).', file=stderr)
stderr.flush()
return False
with open(list_path, 'w') as f:
f.write('\n'.join([f"file '{fpath}'" for fpath in segment_files]))
CONCAT_DURATION_TOLERANCE = 0.1
segments_total = sum(_probe_duration(p) for p in segment_files)
duration_diff = segments_total - total_duration
if abs(duration_diff) > CONCAT_DURATION_TOLERANCE:
if duration_diff < -1.0:
tqdm.write(f'[4/4] WARNING: Segment total ({segments_total:.2f}s) is {abs(duration_diff):.2f}s shorter than source ({total_duration:.2f}s). Proceeding with concat (output capped to source length).', file=stderr)
stderr.flush()
# exit(1) # re-enable for hard fail when segment setup is wrong
tqdm.write(f'[4/4] WARNING: Segment total duration ({segments_total:.2f}s) differs from source ({total_duration:.2f}s) by {duration_diff:+.2f}s. Output will be capped to source length; it may not be a frame-accurate mirror.', file=stderr)
stderr.flush()
tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
concat_extra = ['-stats_period', '5'] if verbose else []
concat_cmd = [
'ffmpeg', *concat_extra, '-i', input_file,
'-f', 'concat', '-safe', '0', '-i', list_path,
'-map_metadata', '0:g',
'-map', '1:v', '-map', '0:a?', '-map', '0:s?',
'-c:v', 'copy', '-c:s', 'copy',
*split(concat_args),
'-t', str(total_duration),
'-y', output_file
]
tqdm.write('Concat ffmpeg: ' + ' '.join(join([x]) for x in concat_cmd), file=stderr)
with TqdmAbsolute(desc='concatenating output segments') as bar:
def upd(frames, fps, time, duration, speed):
bar.total = duration
bar.update(time)
ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
if ffmpeg.run() != 0:
tqdm.write(ffmpeg.stderr, file=stderr)
return False
unlink(list_path)
if not keep_tmp:
rmtree(tmp_dir)
return True
def _parse_workers(host_specs):
"""Parse list of 'host' or 'host:gpu' into [(host, gpu_id), ...]."""
workers = []
for spec in host_specs or []:
spec = (spec or "").strip()
if not spec:
continue
if ":" in spec:
host, gpu = spec.rsplit(":", 1)
try:
workers.append((host.strip(), int(gpu.strip())))
except ValueError:
workers.append((spec, 0))
else:
workers.append((spec, 0))
return workers
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Splits a file into segments and processes them on multiple workers (host:gpu) in parallel using ffmpeg over SSH.')
parser.add_argument('input_file', help='File to encode.')
parser.add_argument('output_file', help='Path to encoded output file.')
parser.add_argument('remote_args', help='Arguments to pass to the remote ffmpeg instances (e.g. NVENC: -c:v hevc_nvenc -preset p7 ...). -gpu is added per worker.')
parser.add_argument('concat_args', default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata.')
parser.add_argument('-s', '--segment-length', type=float, default=10, help='Segment length in seconds.')
parser.add_argument('-H', '--host', action='append', help='Worker as host or host:gpu (e.g. -H Pyro:0 -H RenderScrap:0 -H RenderScrap:1).', required=True)
parser.add_argument('-k', '--keep-tmp', action='store_true', help='Keep temporary segment files instead of deleting them on successful exit.')
parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
parser.add_argument('-P', '--probe-host', default=None, help='SSH host to run ffprobe on (file must be at --probe-path there). Speeds up [1/4] when input is on slow/NAS path.')
parser.add_argument('--probe-path', default=None, help='Path to input file as seen on --probe-host (required if -P set).')
args = parser.parse_args()
workers = _parse_workers(args.host)
if not workers:
parser.error('At least one worker required (e.g. -H Pyro:0 -H RenderScrap:0)')
encode(
workers,
args.input_file,
args.output_file,
segment_seconds=args.segment_length,
remote_args=args.remote_args,
concat_args=args.concat_args,
tmp_dir=args.tmp_dir,
keep_tmp=args.keep_tmp,
resume=args.resume,
copy_input=args.copy_input,
probe_host=args.probe_host,
probe_path=args.probe_path
)