#!/usr/bin/env python3
"""Split a video at keyframe boundaries, encode the segments in parallel on
several ``host:gpu`` workers (local or over SSH) with ffmpeg, then concatenate
the encoded segments and mux them with the original audio/subtitles/metadata.
"""
from queue import SimpleQueue, Empty
from subprocess import Popen, DEVNULL, PIPE, TimeoutExpired
import select
from shutil import rmtree
from glob import glob
from os import mkdir, unlink, listdir, environ
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
from threading import Thread, Event
from time import sleep, time
from typing import List, NamedTuple, Callable, Optional, Union, Tuple
from signal import signal, SIGINT
from sys import exit, stderr, stdin, platform as sys_platform
from shlex import split, join
from hashlib import md5
from time import strptime
from tqdm import tqdm
import re

# Debug output is enabled by the mere presence of DEBUG in the environment.
DEBUG = 'DEBUG' in environ


def dprint(*args, **kwargs):
    """Print like ``print()`` but only when the DEBUG environment variable is set."""
    if DEBUG:
        # Keyword arguments (sep=, end=, file=, ...) must be forwarded as keywords.
        print(*args, **kwargs)


def _popen(args, **kwargs):
    """Start ``args`` with stderr piped, stdout discarded and text-mode pipes.

    Any ``stderr``/``stdout``/``universal_newlines`` passed by the caller is overridden.
    """
    dprint(f'calling subprocess: {args}')
    kwargs['stderr'] = PIPE
    kwargs['stdout'] = DEVNULL
    kwargs['universal_newlines'] = True
    return Popen(args, **kwargs)


def _run_ffprobe(ffprobe_args: List[str], probe_host: Optional[str] = None,
                 probe_path: Optional[str] = None) -> Tuple[int, str, str]:
    """Run ``ffprobe <ffprobe_args>`` and return ``(returncode, stdout, stderr)``.

    When both ``probe_host`` and ``probe_path`` are set the command is shell-quoted
    and executed on ``probe_host`` through ssh; otherwise it runs locally.
    """
    cmd = ['ffprobe', *ffprobe_args]
    if probe_host and probe_path:
        cmd = ['ssh', probe_host, join(cmd)]
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    out, err = proc.communicate()
    return proc.returncode, out or '', err or ''


def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]:
    """Run ffprobe to get keyframe timestamps (seconds). Uses -skip_frame nokey so only keyframes are read (fast).

    If probe_host and probe_path are set, run ffprobe there via ssh so the file is read at local disk speed.
    Lines that cannot be parsed as a float are ignored; the result is sorted ascending.
    """
    target = probe_path if (probe_host and probe_path) else input_file
    returncode, out, err = _run_ffprobe(
        ['-v', 'error', '-select_streams', 'v:0',
         '-skip_frame', 'nokey', '-show_entries', 'frame=pts_time',
         '-of', 'csv=p=0', target],
        probe_host, probe_path,
    )
    if returncode != 0 and err:
        dprint('ffprobe keyframes:', err)
    times = []
    for line in out.strip().splitlines():
        # Accept both bare values and "key=value" lines.
        value = line.strip().split('=')[-1] if '=' in line else line.strip()
        if not value:
            continue
        try:
            times.append(float(value))
        except ValueError:
            pass
    return sorted(times)


def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
    """Get container duration in seconds via ffprobe (header-only, fast). If probe_host set, run there.

    Tries ``format=duration`` first, then the first video stream's ``stream=duration``.
    Returns 0.0 when neither probe yields a parseable number.
    """
    path = probe_path if (probe_host and probe_path) else input_file
    probes = (
        ['-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path],
        ['-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', path],
    )
    for probe_args in probes:
        _, out, _ = _run_ffprobe(probe_args, probe_host, probe_path)
        out = out.strip()
        if not out:
            continue
        try:
            return float(out)
        except ValueError:
            pass
    return 0.0


def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
    """Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py)."""
    returncode, out, _ = _run_ffprobe(
        ['-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', segment_path]
    )
    if returncode != 0:
        return False
    try:
        duration = float(out.strip())
    except ValueError:
        return False
    return abs(duration - expected_duration_sec) <= tolerance_sec


def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:
    """Build (start_sec, end_sec) segments at keyframe boundaries. When keyframe probe fails, use fixed segment_seconds.

    Returns an empty list when ``total_duration`` is not positive. With fewer than
    two keyframes the range ``[0, total_duration)`` is cut into fixed-length pieces.
    Otherwise each segment starts on a keyframe and ends on the first keyframe at
    or after ``start + segment_seconds`` (or at ``total_duration`` for the last one).

    Raises:
        ValueError: fixed-interval splitting is needed but ``segment_seconds`` is
            not positive (the loop could never advance).
    """
    if total_duration <= 0:
        return []
    if not keyframe_times or len(keyframe_times) == 1:
        # No keyframes: split at fixed intervals so we still get multiple segments
        if segment_seconds <= 0:
            raise ValueError('segment_seconds must be positive')
        segments = []
        start = 0.0
        while start < total_duration:
            end = min(start + segment_seconds, total_duration)
            segments.append((start, end))
            start = end
        return segments
    segments = []
    count = len(keyframe_times)
    i = 0
    while i < count:
        start = keyframe_times[i]
        end_target = start + segment_seconds
        j = i + 1
        while j < count and keyframe_times[j] < end_target:
            j += 1
        end = keyframe_times[j] if j < count else total_duration
        # Skip degenerate pieces (duplicate keyframe timestamps, or a keyframe at or
        # past the reported duration): they would have a zero or negative length.
        if end > start:
            segments.append((start, end))
        i = j
    return segments


class Task(NamedTuple):
    """One segment to encode."""
    start_sec: float        # segment start within the source, in seconds
    duration_sec: float     # segment length in seconds
    output_file: str        # path the encoded segment is written to
    ffmpeg_args: List[str]  # extra encoder arguments, already split into argv items


class FFMPEGProc:
    """Run one ffmpeg command, parse its stderr for duration/progress and collect the remaining stderr text.

    After ``run()`` the attribute ``stderr`` holds the non-progress stderr output.
    """
    # "Duration: HH:MM:SS.xx," line printed by ffmpeg for its input.
    _duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
    # Periodic "frame= ... fps= ... time=HH:MM:SS.xx ... speed=N.Nx" status line.
    _progress_re = re.compile(r'frame=\s*(?P<frame>[0-9]+)\s+fps=\s*(?P<fps>[0-9]+).*time=-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9,.]+)\s+.*speed=(?P<speed>[0-9\.]+)x')

    @staticmethod
    def _match_to_sec(match):
        """Convert the time_h/time_m/time_s groups of ``match`` to seconds."""
        # The progress pattern admits a decimal comma; float() only accepts a dot.
        seconds = float(match.group('time_s').replace(',', '.'))
        return int(match.group('time_h')) * 3600 + int(match.group('time_m')) * 60 + seconds

    def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL,
                 update_callback: Callable[[int, int, float, float, float], None] = None,
                 binary_io=False, echo_stderr=False):
        """
        Args:
            cmd: command passed to ``Popen`` (argv list, or a string with ``shell=True``).
            shell: forwarded to ``Popen``.
            stdin, stdout: forwarded to ``Popen``.
            update_callback: called as ``(frame, fps, time_sec, duration_sec_or_None, speed)``
                for every parsed progress line.
            binary_io: open the pipes in binary mode; stderr is then decoded with
                ``errors='replace'`` before parsing.
            echo_stderr: also write every stderr line through ``tqdm.write``.
        """
        self._cmd = cmd
        self._update_callback = update_callback
        self._should_stop = False
        self._shell = shell
        self._duration = None
        self._stdin = stdin
        self._stdout = stdout
        self._binary_io = binary_io
        self._echo_stderr = echo_stderr
        self._proc = None
        self.stderr = ''

    def stop(self):
        """Ask ``run()`` to return and terminate the child process if it is still running."""
        self._should_stop = True
        proc = self._proc
        if proc is not None and proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass

    def _consume_line(self, raw) -> str:
        """Decode, optionally echo and parse one stderr line.

        Returns the text that belongs in ``self.stderr``: the decoded line, or ``''``
        when it was a progress line (those are reported through the callback only).
        """
        line = raw.decode(errors='replace') if isinstance(raw, bytes) else (raw or '')
        if self._echo_stderr and line:
            tqdm.write(line.rstrip(), file=stderr)
        progress = self._progress_re.match(line)
        if progress:
            if self._update_callback:
                self._update_callback(
                    int(progress.group('frame')),
                    int(progress.group('fps')),
                    self._match_to_sec(progress),
                    self._duration,
                    float(progress.group('speed')),
                )
            return ''
        if self._duration is None:
            duration = self._duration_re.match(line)
            if duration:
                self._duration = self._match_to_sec(duration)
        return line

    def _read_stderr_loop(self, stderr_lines: list):
        """Read stderr in a loop (used on Windows where select.poll is unavailable).

        Non-progress lines are appended to ``stderr_lines``.
        """
        while True:
            raw = self._proc.stderr.readline()
            if not raw and self._proc.poll() is not None:
                break
            if raw:
                kept = self._consume_line(raw)
                if kept:
                    stderr_lines.append(kept)

    def run(self):
        """Start the process, pump its stderr until it exits or ``stop()`` is called, return its exit code.

        The return value is ``Popen.returncode``, which is ``None`` if the process
        has not been reaped by the time this method gives up waiting.
        """
        self._proc = Popen(
            self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin,
            stdout=self._stdout, universal_newlines=not self._binary_io
        )
        if getattr(select, 'poll', None):
            poll = select.poll()
            poll.register(self._proc.stderr)
            while self._proc.poll() is None and not self._should_stop:
                if not poll.poll(1):
                    sleep(0.1)
                    continue
                sleep(0.001)
                self.stderr += self._consume_line(self._proc.stderr.readline())
            try:
                # Collect whatever is still buffered after the loop ended.
                _, err = self._proc.communicate(timeout=1)
                if isinstance(err, bytes):
                    err = err.decode(errors='replace')
                self.stderr += err or ''
            except TimeoutExpired:
                pass
        else:
            stderr_lines = []
            reader = Thread(target=self._read_stderr_loop, args=(stderr_lines,), daemon=True)
            reader.start()
            while self._proc.poll() is None and not self._should_stop:
                sleep(0.2)
            reader.join(timeout=2)
            self.stderr = ''.join(stderr_lines)
        return self._proc.returncode


class TqdmAbsolute(tqdm):
    """tqdm bar whose ``update()`` takes the absolute position instead of an increment."""

    def __init__(self, *args, **kwargs):
        kwargs['bar_format'] = '{l_bar}{bar}|{n:.1f}/{total:.1f} [{elapsed}<{remaining}]'
        kwargs['dynamic_ncols'] = True
        kwargs.setdefault('total', 99999999)
        kwargs.setdefault('leave', False)
        super().__init__(*args, **kwargs)

    def update(self, to):
        """Move the bar to absolute position ``to``."""
        super().update(to - self.n)  # will also set self.n = b * bsize


HOST_COLORS = ['\033[94m', '\033[92m', '\033[93m', '\033[95m', '\033[96m', '\033[91m']  # blue, green, yellow, magenta, cyan, red
RESET = '\033[0m'


class TaskThread(Thread):
    """Worker thread bound to one ``host:gpu`` that pulls ``Task`` items from a shared queue and encodes them."""

    def __init__(self, host: str, gpu_id: int, source_file: str, task_queue: SimpleQueue, bar_pos: int,
                 remote_ffmpeg_path: str = None):
        """
        Args:
            host: ``'localhost'`` to encode locally, anything else is used as an ssh target.
            gpu_id: value passed to the encoder's ``-gpu`` option.
            source_file: input file read by the local reader ffmpeg.
            task_queue: shared queue of ``Task`` items; failed tasks are put back.
            bar_pos: tqdm bar position; also selects the colour of the host tag.
            remote_ffmpeg_path: ffmpeg executable to use on non-localhost hosts
                (defaults to ``'ffmpeg'``).
        """
        super().__init__()
        self._should_stop = False
        self._host = host
        self._gpu_id = gpu_id
        self._host_desc = f"{host}:gpu{gpu_id}"
        self._bar_pos = bar_pos
        self._remote_ffmpeg_path = remote_ffmpeg_path
        self._source_file = source_file
        self._task_queue = task_queue
        self._ffmpeg = None
        self._reader_proc = None
        self._bar = TqdmAbsolute(desc=self._host_desc, position=bar_pos)
        self._current_file = None

    def _host_tag(self):
        """Return the ``host:gpuN`` label wrapped in this worker's ANSI colour."""
        c = HOST_COLORS[self._bar_pos % len(HOST_COLORS)]
        return f'{c}{self._host_desc}{RESET}'

    def stop(self):
        """Stop after the current task and terminate the running reader/encoder processes."""
        self._should_stop = True
        # Work on a local reference: run() resets the attribute to None when a task ends.
        reader = self._reader_proc
        if reader is not None and reader.poll() is None:
            try:
                reader.terminate()
            except OSError:
                pass
        if self._ffmpeg:
            self._ffmpeg.stop()

    def run(self):
        """Process tasks until the queue is empty or ``stop()`` was called.

        For each task a local ffmpeg stream-copies the requested time range of the
        source as MPEG-TS into a pipe, and an encoder ffmpeg (local, or on the host
        via ssh) reads that pipe and writes fragmented MP4 to the task's output
        file. A task whose encoder exits non-zero has its output removed and is put
        back on the queue.
        """
        last_log = [0.0]  # mutable for progress heartbeat

        def upd(frames, fps, t, duration, speed):
            self._bar.total = duration or 999
            self._bar.desc = self._host_desc + ': ' + (self._current_file or '')
            self._bar.update(t)
            # Besides the bar, write a plain progress line at most every 30 seconds.
            if duration and duration > 0 and (time() - last_log[0]) >= 30:
                tqdm.write(f'  {self._host_tag()}: {self._current_file} {t:.0f}s / {duration:.0f}s ({speed:.1f}x)', file=stderr)
                stderr.flush()
                last_log[0] = time()

        try:
            while not self._should_stop:
                task = self._task_queue.get(False)  # raises Empty when no work is left
                self._current_file = basename(task.output_file)
                tqdm.write(f'  {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s)', file=stderr)
                stderr.flush()
                reader_cmd = [
                    'ffmpeg', '-ss', str(task.start_sec), '-t', str(task.duration_sec),
                    '-i', self._source_file, '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
                ]
                ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
                encoder_cmd = [
                    ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
                    '-gpu', str(self._gpu_id),
                    *task.ffmpeg_args,
                    '-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov', 'pipe:1'
                ]
                if self._host == 'localhost' and sys_platform != 'win32':
                    encoder_cmd = ['nice', '-n10', 'ionice', '-c3'] + encoder_cmd
                if self._host != 'localhost':
                    encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]
                self._reader_proc = Popen(reader_cmd, stdout=PIPE, stderr=DEVNULL)
                ret = -1
                try:
                    with open(task.output_file, 'wb') as outfile:
                        self._ffmpeg = FFMPEGProc(
                            encoder_cmd, stdin=self._reader_proc.stdout, stdout=outfile,
                            update_callback=upd, binary_io=True
                        )
                        ret = self._ffmpeg.run()
                finally:
                    rp = self._reader_proc
                    if rp is not None:
                        # Drop our copy of the pipe's read end so the reader is not
                        # left blocked on a pipe nobody drains, and the fd is freed.
                        if rp.stdout is not None:
                            try:
                                rp.stdout.close()
                            except OSError:
                                pass
                        try:
                            rp.wait(timeout=2)
                        except TimeoutExpired:
                            rp.terminate()
                            rp.wait(timeout=5)
                    self._reader_proc = None
                if ret != 0:
                    try:
                        if isfile(task.output_file):
                            unlink(task.output_file)
                    except OSError:
                        pass
                    print(f'  {self._host_tag()}: FAILED {self._current_file}', file=stderr, flush=True)
                    if self._ffmpeg.stderr:
                        print(self._ffmpeg.stderr, file=stderr, end='', flush=True)
                    self._task_queue.put(task)
                else:
                    tqdm.write(f'  {self._host_tag()}: done {self._current_file}', file=stderr)
                    stderr.flush()
        except Empty:
            pass
        self._bar.close()


def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, segment_seconds: float = 60,
           remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False,
           copy_input=False, probe_host: str = None, probe_path: str = None, remote_ffmpeg_path: str = None):
    """Encode ``input_file`` to ``output_file`` by distributing segments over ``workers``.

    Steps: probe keyframes and duration, queue one task per segment (existing
    segment files that are at least 1024 bytes and pass the duration check are
    kept), run one ``TaskThread`` per worker, then concatenate the segments and
    mux them with the input's audio, subtitles and global metadata.

    Args:
        workers: ``(host, gpu_id)`` pairs, one worker thread each.
        input_file, output_file: paths; ``~`` is expanded and both are made absolute.
        segment_seconds: target segment length.
        remote_args: encoder ffmpeg arguments as one shell-style string.
        concat_args: extra arguments for the final concat/mux ffmpeg, shell-style string.
        tmp_dir: directory for segment files; defaults to ``ffmpeg_segments_<md5 of input path>``.
        keep_tmp: keep ``tmp_dir`` after a successful run.
        resume, copy_input: accepted for CLI compatibility; not read by this function.
        probe_host, probe_path: run the ffprobe calls on that ssh host against that path.
        remote_ffmpeg_path: ffmpeg executable used on non-localhost workers.

    Cancellation (SIGINT, typing ``stop``, or creating ``.encode_stop``) stops all
    workers and exits the process with status 1 without concatenating.
    """
    input_file = abspath(expanduser(input_file))
    output_file = abspath(expanduser(output_file))
    tmp_dir = tmp_dir or 'ffmpeg_segments_' + md5(input_file.encode()).hexdigest()
    try:
        mkdir(tmp_dir)
    except FileExistsError:
        pass  # previous job: resume and re-queue failed segments
    verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes')

    tqdm.write('[1/4] Probing keyframes and duration...', file=stderr)
    stderr.flush()
    keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
    total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
    segments = _build_segments(keyframe_times, segment_seconds, total_duration)
    if not segments:
        tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
        return

    MIN_SEGMENT_BYTES = 1024  # smaller = failed/corrupt segment, re-encode
    DURATION_TOLERANCE_SEC = 0.1  # same as check_files.py
    task_queue = SimpleQueue()
    segment_paths = []  # every expected segment file, in playback order
    removed = 0
    for i, (start_sec, end_sec) in enumerate(segments):
        duration_sec = end_sec - start_sec
        output_path = f'{tmp_dir}/{i:08d}.mp4'
        segment_paths.append(output_path)
        if isfile(output_path):
            try:
                if getsize(output_path) < MIN_SEGMENT_BYTES:
                    unlink(output_path)
                    removed += 1
                elif not _segment_valid(output_path, duration_sec, DURATION_TOLERANCE_SEC):
                    unlink(output_path)
                    removed += 1
                else:
                    continue
            except OSError:
                pass
        task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args)))
    n_tasks = task_queue.qsize()
    if removed:
        tqdm.write(f'[2/4] Removed {removed} bad segment(s) (ffprobe failed or duration mismatch), re-queued.', file=stderr)
    if n_tasks == 0:
        tqdm.write('All segments already done (resume).', file=stderr)
    else:
        tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)
    stderr.flush()
    dprint(f'Segments: {len(segments)} total, {n_tasks} tasks queued')

    tqdm.write(f'[3/4] Encoding segments on {len(workers)} worker(s)...', file=stderr)
    stderr.flush()
    threads = [TaskThread(host, gpu_id, input_file, task_queue, pos, remote_ffmpeg_path)
               for pos, (host, gpu_id) in enumerate(workers, 0)]

    # exit() only ends the calling thread when it is raised outside the main thread,
    # so cancellation is also recorded here for the main thread to act on.
    stop_requested = Event()

    def stop_all():
        stop_requested.set()
        tqdm.write('Stopping all workers (killing ffmpeg/SSH on each host)...', file=stderr)
        stderr.flush()
        for thread in threads:
            thread.stop()
        for thread in threads:
            thread.join(timeout=5)
        exit(1)

    stop_file = abspath('.encode_stop')

    def stdin_stop_listener():
        try:
            while True:
                line = stdin.readline()
                if not line:
                    break
                if line.strip().lower() == 'stop':
                    stop_all()
                    break
        except (OSError, EOFError):
            pass

    def stop_file_poller():
        try:
            while True:
                sleep(1.5)
                if isfile(stop_file):
                    try:
                        unlink(stop_file)
                    except OSError:
                        pass
                    stop_all()
                    break
        except OSError:
            pass

    def sigint(sig, stack):
        stop_all()

    try:
        signal(SIGINT, sigint)
    except (ValueError, OSError):
        pass  # SIGINT not available on this platform

    interactive = getattr(stdin, 'isatty', lambda: False)()
    if interactive:
        tqdm.write("To cancel: type 'stop' + Enter, or create .encode_stop (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
    else:
        tqdm.write("To cancel: create .encode_stop in current dir (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
    stderr.flush()
    if interactive:
        Thread(target=stdin_stop_listener, daemon=True).start()
    Thread(target=stop_file_poller, daemon=True).start()

    for thread in threads:
        thread.start()
    try:
        for thread in threads:
            thread.join()
    except KeyboardInterrupt:
        stop_all()
    if stop_requested.is_set():
        # Cancelled from a helper thread: do not concatenate a partial result.
        exit(1)

    missing = [path for path in segment_paths if not isfile(path)]
    if missing:
        tqdm.write(f'{len(missing)} of {len(segment_paths)} segment(s) missing after encoding; not concatenating. Segments kept in {tmp_dir}.', file=stderr)
        return

    list_path = f'{tmp_dir}/output_segments.txt'
    with open(list_path, 'w') as f:
        # The concat demuxer resolves relative entries against the list file's own
        # directory, so write absolute paths; a single quote inside a quoted path
        # is written as '\''.
        f.write('\n'.join(
            "file '{}'".format(abspath(path).replace("'", "'\\''")) for path in segment_paths
        ))

    tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
    concat_extra = ['-stats_period', '5'] if verbose else []
    concat_cmd = [
        'ffmpeg', *concat_extra, '-i', input_file,
        '-f', 'concat', '-safe', '0', '-i', list_path,
        '-map_metadata', '0:g',
        '-map', '1:v',
        '-map', '0:a?',
        '-map', '0:s?',
        '-c:v', 'copy',
        '-c:s', 'copy',
        *split(concat_args),
        '-y', output_file
    ]
    tqdm.write('Concat ffmpeg: ' + ' '.join(join([x]) for x in concat_cmd), file=stderr)
    with TqdmAbsolute(desc='concatenating output segments') as bar:
        def upd(frames, fps, t, duration, speed):
            bar.total = duration
            bar.update(t)
        ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
        if ffmpeg.run() != 0:
            tqdm.write(ffmpeg.stderr, file=stderr)
            return
    unlink(list_path)
    if not keep_tmp:
        rmtree(tmp_dir)


def _parse_workers(host_specs):
    """Parse list of 'host' or 'host:gpu' into [(host, gpu_id), ...].

    Blank entries are skipped. A spec without ``:`` or whose part after the last
    ``:`` is not an integer is taken as a host name with GPU 0.
    """
    workers = []
    for spec in host_specs or []:
        spec = (spec or "").strip()
        if not spec:
            continue
        if ":" in spec:
            host, gpu = spec.rsplit(":", 1)
            try:
                workers.append((host.strip(), int(gpu.strip())))
            except ValueError:
                workers.append((spec, 0))
        else:
            workers.append((spec, 0))
    return workers


def _main():
    """Command-line entry point: parse arguments and call ``encode()``."""
    import argparse
    parser = argparse.ArgumentParser(description='Splits a file into segments and processes them on multiple workers (host:gpu) in parallel using ffmpeg over SSH.')
    parser.add_argument('input_file', help='File to encode.')
    parser.add_argument('output_file', help='Path to encoded output file.')
    parser.add_argument('remote_args', help='Arguments to pass to the remote ffmpeg instances (e.g. NVENC: -c:v hevc_nvenc -preset p7 ...). -gpu is added per worker.')
    # nargs='?' is what makes the declared default reachable for a positional.
    parser.add_argument('concat_args', nargs='?', default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata.')
    parser.add_argument('-s', '--segment-length', type=float, default=10, help='Segment length in seconds.')
    parser.add_argument('-H', '--host', action='append', help='Worker as host or host:gpu (e.g. -H Pyro:0 -H RenderScrap:0 -H RenderScrap:1).', required=True)
    parser.add_argument('-k', '--keep-tmp', action='store_true', help='Keep temporary segment files instead of deleting them on successful exit.')
    parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
    parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
    parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
    parser.add_argument('-P', '--probe-host', default=None, help='SSH host to run ffprobe on (file must be at --probe-path there). Speeds up [1/4] when input is on slow/NAS path.')
    parser.add_argument('--probe-path', default=None, help='Path to input file as seen on --probe-host (required if -P set).')
    args = parser.parse_args()
    workers = _parse_workers(args.host)
    if not workers:
        parser.error('At least one worker required (e.g. -H Pyro:0 -H RenderScrap:0)')
    encode(
        workers,
        args.input_file,
        args.output_file,
        segment_seconds=args.segment_length,
        remote_args=args.remote_args,
        concat_args=args.concat_args,
        tmp_dir=args.tmp_dir,
        keep_tmp=args.keep_tmp,
        resume=args.resume,
        copy_input=args.copy_input,
        probe_host=args.probe_host,
        probe_path=args.probe_path
    )


if __name__ == '__main__':
    _main()