Compare commits

...

5 Commits

Author SHA1 Message Date
Raincloud 7b48b709ce cap concat output 2026-02-16 00:53:48 -07:00
Raincloud 70684f5644 swap back to nvenc/HEVC and redefine hosts accordingly 2026-02-16 00:37:04 -07:00
Raincloud dc7b224005 adjust stopping behavior 2026-02-16 00:09:06 -07:00
Raincloud a76bb1e1e0 ffmpeg dist working with av1 2026-02-15 23:52:47 -07:00
Raincloud 0f29c0c457 ffmpeg dist setup 2026-02-15 17:26:55 -07:00
5 changed files with 16932 additions and 191 deletions
+5
View File
@@ -7,6 +7,7 @@ logs/*
# Encoded files
input/*
output/*
tmp/*
# Video files
*.mp4
*.mkv
@@ -19,3 +20,7 @@ output/*
*.ts
*.m2ts
*.mts
# Python cache files
__pycache__/
*.pyc
File diff suppressed because one or more lines are too long
+97 -9
View File
@@ -1,6 +1,8 @@
import hashlib
import os
import re
import subprocess
import sys
from pathlib import Path
import json
import logging
@@ -8,6 +10,35 @@ from datetime import datetime
import shutil
import time
# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux.
# Workers = (ssh_host, gpu_index). Unraid (GuiltsCurse, Godzilla) excluded; RenderScrap has 2 GPUs.
DISTRIBUTED_WORKERS_DEFAULT = [
("Pyro", 0),
("RenderScrap", 0),
("RenderScrap", 1),
("PostIrony", 0),
]
DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v hevc_nvenc -preset p7 -tune hq -rc vbr -rc-lookahead 32 -spatial-aq 1 -aq-strength 15 -cq 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -an"
DISTRIBUTED_SEGMENT_SECONDS = 60
def _parse_workers_env(s):
"""Parse DISTRIBUTED_WORKERS e.g. 'Pyro:0,RenderScrap:0,RenderScrap:1,PostIrony:0' -> [(host, gpu_id), ...]."""
out = []
for part in (s or "").strip().split(","):
part = part.strip()
if not part:
continue
if ":" in part:
host, gpu = part.rsplit(":", 1)
try:
out.append((host.strip(), int(gpu.strip())))
except ValueError:
pass
else:
out.append((part, 0))
return out
# ANSI color codes
class Colors:
PURPLE = '\033[95m'
@@ -334,26 +365,83 @@ def encode_dvr(input_file, output_dir, gpu):
safe_log_error(f"Unexpected error encoding {input_path}: {type(e).__name__}: {e}",
f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}")
def encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=60, remote_args=None, concat_args="-c:a copy", probe_host=None, probe_path=None, remote_ffmpeg_path=None):
"""Encode one file using ffmpeg_distributed (split -> farm -> concat). workers = [(host, gpu_id), ...].
Segment temp dirs go under script dir/tmp/. If probe_host and probe_path are set, ffprobe runs there (faster when input is on NAS)."""
input_path = Path(input_file).resolve()
output_path = (Path(output_dir) / f"{input_path.stem}{input_path.suffix}").resolve()
if output_path.exists():
safe_log_info(f"Skipping {input_path} - output already exists: {output_path}")
print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
return
remote_args = remote_args or os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT)
probe_host = probe_host or os.environ.get("PROBE_HOST")
if probe_path is None and probe_host and os.environ.get("PROBE_PATH_PREFIX"):
prefix = os.environ.get("PROBE_PATH_PREFIX", "").rstrip("/")
probe_path = f"{prefix}/{input_path.name}"
script_dir = Path(__file__).resolve().parent
tmp_base = script_dir / "tmp"
tmp_base.mkdir(exist_ok=True)
path_for_hash = os.path.abspath(os.path.expanduser(str(input_path)))
segment_hash = hashlib.md5(path_for_hash.encode()).hexdigest()
tmp_dir = str(tmp_base / f"ffmpeg_segments_{segment_hash}")
cwd = os.getcwd()
try:
os.chdir(output_dir)
from ffmpeg_distributed import encode as distributed_encode
safe_log_info(f"Distributed encode: {input_path} -> {output_path} (workers: {workers})")
print(f"{Colors.BLUE}Distributed encode (HEVC): {input_path.name}{Colors.ENDC}")
remote_ffmpeg = remote_ffmpeg_path or os.environ.get("DISTRIBUTED_REMOTE_FFMPEG_PATH")
distributed_encode(
workers,
str(input_path),
str(output_path),
segment_seconds=segment_seconds,
remote_args=remote_args,
concat_args=concat_args,
tmp_dir=tmp_dir,
probe_host=probe_host,
probe_path=probe_path,
remote_ffmpeg_path=remote_ffmpeg,
)
if output_path.exists():
safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}")
else:
safe_log_error("Distributed encode did not produce output", f"{Colors.RED}Distributed encode did not produce output{Colors.ENDC}")
except Exception as e:
safe_log_error(f"Distributed encode failed: {e}", f"{Colors.RED}Distributed encode failed: {e}{Colors.ENDC}")
finally:
os.chdir(cwd)
if __name__ == "__main__":
# Get GPU selection
gpu = get_gpu_selection()
safe_log_info(f"Selected GPU: {gpu}")
if sys.platform == "win32":
print(f"{Colors.YELLOW}Distributed mode uses select.poll() and may fail on Windows; use WSL or Linux for best results.{Colors.ENDC}")
input_dir = "input"
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
# Get list of files to process
workers_str = os.environ.get("DISTRIBUTED_WORKERS")
if workers_str:
workers = _parse_workers_env(workers_str)
else:
workers = DISTRIBUTED_WORKERS_DEFAULT
workers_desc = ", ".join(f"{h}:gpu{g}" for h, g in workers)
print(f"{Colors.BLUE}Using workers: {workers_desc}{Colors.ENDC}")
safe_log_info(f"Distributed mode; workers: {workers}")
files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))]
total_files = len(files)
if total_files == 0:
safe_log_info("No files to process in input directory", f"{Colors.YELLOW}No files to process in input directory{Colors.ENDC}")
else:
safe_log_info(f"Found {total_files} files to process", f"{Colors.BLUE}Found {total_files} files to process{Colors.ENDC}")
for i, file in enumerate(files, 1):
input_file = os.path.join(input_dir, file)
safe_log_info(f"Processing file {i}/{total_files}: {file}")
print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}")
encode_dvr(input_file, output_dir, gpu)
encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS)
+411 -120
View File
@@ -7,10 +7,10 @@ from glob import glob
from os import mkdir, unlink, listdir, environ
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
from threading import Thread
from time import sleep
from typing import List, NamedTuple, Callable, Union
from time import sleep, time
from typing import List, NamedTuple, Callable, Union, Tuple
from signal import signal, SIGINT
from sys import exit, stderr
from sys import exit, stderr, stdin, platform as sys_platform
from shlex import split, join
from hashlib import md5
from time import strptime
@@ -30,10 +30,114 @@ def _popen(args, **kwargs):
kwargs['universal_newlines'] = True
return Popen(args, **kwargs)
def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]:
"""Run ffprobe to get keyframe timestamps (seconds). Uses -skip_frame nokey so only keyframes are read (fast).
If probe_host and probe_path are set, run ffprobe there via ssh so the file is read at local disk speed."""
cmd = [
'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-skip_frame', 'nokey',
'-show_entries', 'frame=pts_time', '-of', 'csv=p=0',
probe_path if (probe_host and probe_path) else input_file
]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, err = proc.communicate()
if proc.returncode != 0 and err:
dprint('ffprobe keyframes:', err)
out = out or ''
times = []
for line in out.strip().splitlines():
line = line.strip().split('=')[-1] if '=' in line else line.strip()
if not line:
continue
try:
times.append(float(line))
except ValueError:
pass
return sorted(times)
def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
"""Get container duration in seconds via ffprobe (header-only, fast). If probe_host set, run there."""
path = probe_path if (probe_host and probe_path) else input_file
cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, _ = proc.communicate()
out = (out or '').strip()
try:
if out:
return float(out)
except ValueError:
pass
cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', path]
if probe_host and probe_path:
cmd = ['ssh', probe_host, join(cmd)]
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, _ = proc.communicate()
out = (out or '').strip()
try:
if out:
return float(out)
except ValueError:
pass
return 0.0
def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
"""Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py)."""
proc = Popen(
['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', segment_path],
stdout=PIPE, stderr=PIPE, universal_newlines=True
)
out, _ = proc.communicate()
if proc.returncode != 0:
return False
out = (out or '').strip()
try:
duration = float(out)
except ValueError:
return False
return abs(duration - expected_duration_sec) <= tolerance_sec
def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:
"""Build (start_sec, end_sec) segments at keyframe boundaries. When keyframe probe fails, use fixed segment_seconds."""
if total_duration <= 0:
return []
if not keyframe_times or len(keyframe_times) == 1:
# No keyframes: split at fixed intervals so we still get multiple segments
segments = []
start = 0.0
while start < total_duration:
end = min(start + segment_seconds, total_duration)
segments.append((start, end))
start = end
return segments
segments = []
i = 0
while i < len(keyframe_times):
start = keyframe_times[i]
end_target = start + segment_seconds
j = i + 1
while j < len(keyframe_times) and keyframe_times[j] < end_target:
j += 1
if j < len(keyframe_times):
end = keyframe_times[j]
else:
end = total_duration
segments.append((start, end))
i = j
return segments
class Task(NamedTuple):
input_file: str
start_sec: float
duration_sec: float
output_file: str
ffmpeg_args: List[str] = []
ffmpeg_args: List[str]
class FFMPEGProc:
_duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
@@ -43,7 +147,7 @@ class FFMPEGProc:
def _match_to_sec(match):
return int(match.group('time_h'))*3600+int(match.group('time_m'))*60+float(match.group('time_s'))
def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None):
def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None, binary_io=False, echo_stderr=False):
self._cmd = cmd
self._update_callback = update_callback
self._should_stop = False
@@ -51,43 +155,90 @@ class FFMPEGProc:
self._duration = None
self._stdin = stdin
self._stdout = stdout
self._binary_io = binary_io
self._echo_stderr = echo_stderr
self.stderr = ''
def stop(self):
self._should_stop = True
if getattr(self, '_proc', None) is not None and self._proc.poll() is None:
try:
self._proc.terminate()
except OSError:
pass
def _read_stderr_loop(self, stderr_lines: list):
"""Read stderr in a loop (used on Windows where select.poll is unavailable)."""
while True:
raw = self._proc.stderr.readline()
if not raw and self._proc.poll() is not None:
break
if raw:
line = raw.decode(errors='replace') if self._binary_io else raw
stderr_lines.append(line)
if self._echo_stderr:
tqdm.write(line.rstrip(), file=stderr)
match = self._progress_re.match(line)
if match and self._update_callback:
self._update_callback(
int(match.group('frame')),
int(match.group('fps')),
self._match_to_sec(match),
self._duration,
float(match.group('speed')),
)
elif self._duration is None:
dm = self._duration_re.match(line)
if dm:
self._duration = self._match_to_sec(dm)
def run(self):
self._proc = Popen(self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout, universal_newlines=True)
poll = select.poll()
poll.register(self._proc.stderr)
while self._proc.poll() is None and not self._should_stop:
if not poll.poll(1):
sleep(0.1)
continue
sleep(0.001)
line = self._proc.stderr.readline()
match = self._progress_re.match(line)
if not match:
self.stderr += line
if match and self._update_callback:
self._update_callback(
int(match.group('frame')),
int(match.group('fps')),
self._match_to_sec(match),
self._duration,
float(match.group('speed'))
)
elif self._duration is None:
match = self._duration_re.match(line)
if match:
self._duration = self._match_to_sec(match)
try:
out, err = self._proc.communicate(timeout=1)
self.stderr += err
except TimeoutExpired as ex:
pass
self._proc = Popen(
self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout,
universal_newlines=not self._binary_io
)
if getattr(select, 'poll', None):
poll = select.poll()
poll.register(self._proc.stderr)
while self._proc.poll() is None and not self._should_stop:
if not poll.poll(1):
sleep(0.1)
continue
sleep(0.001)
raw = self._proc.stderr.readline()
line = raw.decode(errors='replace') if self._binary_io and raw else (raw or '')
if self._echo_stderr and line:
tqdm.write(line.rstrip(), file=stderr)
match = self._progress_re.match(line)
if not match:
self.stderr += line
if match and self._update_callback:
self._update_callback(
int(match.group('frame')),
int(match.group('fps')),
self._match_to_sec(match),
self._duration,
float(match.group('speed')),
)
elif self._duration is None:
match = self._duration_re.match(line)
if match:
self._duration = self._match_to_sec(match)
try:
_, err = self._proc.communicate(timeout=1)
if err and self._binary_io:
err = err.decode(errors='replace') if isinstance(err, bytes) else err
self.stderr += err or ''
except TimeoutExpired:
pass
else:
stderr_lines = []
reader = Thread(target=self._read_stderr_loop, args=(stderr_lines,), daemon=True)
reader.start()
while self._proc.poll() is None and not self._should_stop:
sleep(0.2)
reader.join(timeout=2)
self.stderr = ''.join(stderr_lines)
return self._proc.returncode
class TqdmAbsolute(tqdm):
@@ -102,155 +253,293 @@ class TqdmAbsolute(tqdm):
def update(self, to):
super().update(to - self.n) # will also set self.n = b * bsize
HOST_COLORS = ['\033[94m', '\033[92m', '\033[93m', '\033[95m', '\033[96m', '\033[91m'] # blue, green, yellow, magenta, cyan, red
RESET = '\033[0m'
class TaskThread(Thread):
def __init__(self, host: str, task_queue: SimpleQueue, bar_pos):
def __init__(self, host: str, gpu_id: int, source_file: str, task_queue: SimpleQueue, bar_pos: int, remote_ffmpeg_path: str = None):
super().__init__()
self._should_stop = False
self._host = host
self._gpu_id = gpu_id
self._host_desc = f"{host}:gpu{gpu_id}"
self._bar_pos = bar_pos
self._remote_ffmpeg_path = remote_ffmpeg_path
self._source_file = source_file
self._task_queue = task_queue
self._ffmpeg = None
self._bar = TqdmAbsolute(desc=host, position=bar_pos)
self._bar = TqdmAbsolute(desc=self._host_desc, position=bar_pos)
self._current_file = None
def _host_tag(self):
c = HOST_COLORS[self._bar_pos % len(HOST_COLORS)]
return f'{c}{self._host_desc}{RESET}'
def stop(self):
self._should_stop = True
if getattr(self, '_reader_proc', None) is not None and self._reader_proc.poll() is None:
try:
self._reader_proc.terminate()
except OSError:
pass
if self._ffmpeg:
self._ffmpeg.stop()
def run(self):
def upd(frames, fps, time, duration, speed):
last_log = [0.0] # mutable for progress heartbeat
def upd(frames, fps, t, duration, speed):
self._bar.total = duration or 999
self._bar.desc = self._host + ': ' + self._current_file
self._bar.update(time)
self._bar.desc = self._host_desc + ': ' + (self._current_file or '')
self._bar.update(t)
if duration and duration > 0 and (time() - last_log[0]) >= 30:
tqdm.write(f' {self._host_tag()}: {self._current_file} {t:.0f}s / {duration:.0f}s ({speed:.1f}x)', file=stderr)
stderr.flush()
last_log[0] = time()
try:
while not self._should_stop:
task = self._task_queue.get(False)
self._current_file = basename(task.input_file)
with open(task.input_file, 'r') as infile, open(task.output_file, 'w') as outfile:
ffmpeg_cmd = [
'nice', '-n10', 'ionice', '-c3',
'ffmpeg', '-f', 'matroska', '-i', 'pipe:',
*task.ffmpeg_args,
'-f', 'matroska', 'pipe:'
]
if self._host != 'localhost':
ffmpeg_cmd = ['ssh', self._host, join(ffmpeg_cmd)]
self._ffmpeg = FFMPEGProc(ffmpeg_cmd, stdin=infile, stdout=outfile, update_callback=upd)
self._current_file = basename(task.output_file)
tqdm.write(f' {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s)', file=stderr)
stderr.flush()
reader_cmd = [
'ffmpeg', '-ss', str(task.start_sec), '-t', str(task.duration_sec),
'-i', self._source_file, '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
]
ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
encoder_cmd = [
ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
'-gpu', str(self._gpu_id),
*task.ffmpeg_args,
'-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov', 'pipe:1'
]
if self._host == 'localhost' and sys_platform != 'win32':
encoder_cmd = ['nice', '-n10', 'ionice', '-c3'] + encoder_cmd
if self._host != 'localhost':
encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]
ret = self._ffmpeg.run()
if ret != 0:
tqdm.write(f'task for {self._current_file} failed on host {self._host}', file=stderr)
tqdm.write(self._ffmpeg.stderr, file=stderr)
self._task_queue.put(task)
self._reader_proc = Popen(reader_cmd, stdout=PIPE, stderr=DEVNULL)
ret = -1
try:
with open(task.output_file, 'wb') as outfile:
self._ffmpeg = FFMPEGProc(
encoder_cmd, stdin=self._reader_proc.stdout, stdout=outfile,
update_callback=upd, binary_io=True
)
ret = self._ffmpeg.run()
finally:
rp = getattr(self, '_reader_proc', None)
if rp is not None:
try:
rp.wait(timeout=2)
except TimeoutExpired:
rp.terminate()
rp.wait(timeout=5)
self._reader_proc = None
if ret != 0:
try:
if isfile(task.output_file):
unlink(task.output_file)
except OSError:
pass
print(f' {self._host_tag()}: FAILED {self._current_file}', file=stderr, flush=True)
if self._ffmpeg.stderr:
print(self._ffmpeg.stderr, file=stderr, end='', flush=True)
self._task_queue.put(task)
else:
tqdm.write(f' {self._host_tag()}: done {self._current_file}', file=stderr)
stderr.flush()
except Empty:
pass
self._bar.close()
def encode(hosts: List[str], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False):
def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False, probe_host: str = None, probe_path: str = None, remote_ffmpeg_path: str = None):
input_file = abspath(expanduser(input_file))
output_file = abspath(expanduser(output_file))
tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
tmp_in = f'{tmp_dir}/in'
tmp_out = f'{tmp_dir}/out'
try:
mkdir(tmp_dir)
mkdir(tmp_in)
mkdir(tmp_out)
except FileExistsError:
if not resume:
raise
pass # previous job: resume and re-queue failed segments
# skip splitting on resume
if len(listdir(tmp_in)) == 0 or not resume:
cv = ['copy'] if copy_input else ['libx264', '-crf', '0', '-preset', 'ultrafast', '-bf', '0']
with TqdmAbsolute(desc="splitting input file") as bar:
def upd(frames, fps, time, duration, speed):
bar.total = duration
bar.update(time)
ffmpeg = FFMPEGProc([
'ffmpeg', '-i', expanduser(input_file),
'-an', '-sn',
'-c:v', *cv,
'-f', 'segment', '-reset_timestamps', '1', '-segment_time', str(segment_seconds) + 's',
tmp_in + '/%08d.mkv'
],
update_callback=upd
)
ret = ffmpeg.run()
if ret != 0:
tqdm.write(ffmpeg.stderr, file=stderr)
return
verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes')
tqdm.write('[1/4] Probing keyframes and duration...', file=stderr)
stderr.flush()
keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
segments = _build_segments(keyframe_times, segment_seconds, total_duration)
if not segments:
tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
return
MIN_SEGMENT_BYTES = 1024 # smaller = failed/corrupt segment, re-encode
DURATION_TOLERANCE_SEC = 0.1 # same as check_files.py
task_queue = SimpleQueue()
for f in sorted(glob(tmp_in+'/*')):
output_segment = tmp_out+f'/{basename(f)}'
# skip already encoded segments
if not isfile(output_segment):
task_queue.put(Task(f, output_segment, split(remote_args)))
removed = 0
for i, (start_sec, end_sec) in enumerate(segments):
duration_sec = end_sec - start_sec
output_path = f'{tmp_dir}/{i:08d}.mp4'
if isfile(output_path):
try:
if getsize(output_path) < MIN_SEGMENT_BYTES:
unlink(output_path)
removed += 1
elif not _segment_valid(output_path, duration_sec, DURATION_TOLERANCE_SEC):
unlink(output_path)
removed += 1
else:
continue
except OSError:
pass
task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args)))
threads = [TaskThread(host, task_queue, pos) for pos,host in enumerate(hosts,0)]
n_tasks = task_queue.qsize()
if removed:
tqdm.write(f'[2/4] Removed {removed} bad segment(s) (ffprobe failed or duration mismatch), re-queued.', file=stderr)
if n_tasks == 0:
tqdm.write('All segments already done (resume).', file=stderr)
else:
tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)
stderr.flush()
dprint(f'Segments: {len(segments)} total, {n_tasks} tasks queued')
def sigint(sig, stack):
print('Got SIGINT, stopping...')
tqdm.write(f'[3/4] Encoding segments on {len(workers)} worker(s)...', file=stderr)
stderr.flush()
threads = [TaskThread(host, gpu_id, input_file, task_queue, pos, remote_ffmpeg_path) for pos, (host, gpu_id) in enumerate(workers, 0)]
def stop_all():
tqdm.write('Stopping all workers (killing ffmpeg/SSH on each host)...', file=stderr)
stderr.flush()
for thread in threads:
thread.stop()
for thread in threads:
thread.join()
thread.join(timeout=5)
exit(1)
signal(SIGINT, sigint)
stop_file = abspath('.encode_stop')
def stdin_stop_listener():
try:
while True:
line = stdin.readline()
if not line:
break
if line.strip().lower() == 'stop':
stop_all()
break
except (OSError, EOFError):
pass
def stop_file_poller():
try:
while True:
sleep(1.5)
if isfile(stop_file):
try:
unlink(stop_file)
except OSError:
pass
stop_all()
break
except OSError:
pass
def sigint(sig, stack):
stop_all()
try:
signal(SIGINT, sigint)
except (ValueError, OSError):
pass # SIGINT not available on this platform
if getattr(stdin, 'isatty', lambda: False)():
tqdm.write("To cancel: type 'stop' + Enter, or create .encode_stop (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
else:
tqdm.write("To cancel: create .encode_stop in current dir (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
stderr.flush()
if getattr(stdin, 'isatty', lambda: False)():
Thread(target=stdin_stop_listener, daemon=True).start()
Thread(target=stop_file_poller, daemon=True).start()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
try:
for thread in threads:
thread.join()
except KeyboardInterrupt:
stop_all()
with open('output_segments.txt', 'w') as f:
f.write('\n'.join([f"file '{file}'" for file in sorted(glob(tmp_out+'/*'))]))
list_path = f'{tmp_dir}/output_segments.txt'
with open(list_path, 'w') as f:
f.write('\n'.join([f"file '{fpath}'" for fpath in sorted(glob(f'{tmp_dir}/*.mp4'))]))
tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
concat_extra = ['-stats_period', '5'] if verbose else []
concat_cmd = [
'ffmpeg', *concat_extra, '-i', input_file,
'-f', 'concat', '-safe', '0', '-i', list_path,
'-map_metadata', '0:g',
'-map', '1:v', '-map', '0:a?', '-map', '0:s?',
'-c:v', 'copy', '-c:s', 'copy',
*split(concat_args),
'-t', str(total_duration),
'-y', output_file
]
tqdm.write('Concat ffmpeg: ' + ' '.join(join([x]) for x in concat_cmd), file=stderr)
with TqdmAbsolute(desc='concatenating output segments') as bar:
def upd(frames, fps, time, duration, speed):
bar.total = duration
bar.update(time)
ffmpeg = FFMPEGProc([
'ffmpeg', '-i', input_file,
'-f', 'concat', '-safe', '0', '-i', 'output_segments.txt',
'-map_metadata', '0:g',
'-map', '1:v',
'-map', '0:a?',
'-map', '0:s?',
'-c:v', 'copy',
'-c:s', 'copy',
*split(concat_args),
'-y', output_file
],
update_callback=upd
)
ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
if ffmpeg.run() != 0:
tqdm.write(ffmpeg.stderr, file=stderr)
return
unlink('output_segments.txt')
unlink(list_path)
if not keep_tmp:
rmtree(tmp_dir)
def _parse_workers(host_specs):
"""Parse list of 'host' or 'host:gpu' into [(host, gpu_id), ...]."""
workers = []
for spec in host_specs or []:
spec = (spec or "").strip()
if not spec:
continue
if ":" in spec:
host, gpu = spec.rsplit(":", 1)
try:
workers.append((host.strip(), int(gpu.strip())))
except ValueError:
workers.append((spec, 0))
else:
workers.append((spec, 0))
return workers
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Splits a file into segments and processes them on multiple hosts in parallel using ffmpeg over SSH.')
parser = argparse.ArgumentParser(description='Splits a file into segments and processes them on multiple workers (host:gpu) in parallel using ffmpeg over SSH.')
parser.add_argument('input_file', help='File to encode.')
parser.add_argument('output_file', help='Path to encoded output file.')
parser.add_argument('remote_args', help='Arguments to pass to the remote ffmpeg instances. For example: "-c:v libx264 -crf 23 -preset fast"')
parser.add_argument('concat_args', default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata. Mainly useful for audio encoding options, or "-an" to get rid of it.')
parser.add_argument('remote_args', help='Arguments to pass to the remote ffmpeg instances (e.g. NVENC: -c:v hevc_nvenc -preset p7 ...). -gpu is added per worker.')
parser.add_argument('concat_args', default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata.')
parser.add_argument('-s', '--segment-length', type=float, default=10, help='Segment length in seconds.')
parser.add_argument('-H', '--host', action='append', help='SSH hostname(s) to encode on. Use "localhost" to include the machine you\'re running this from. Can include username.', required=True)
parser.add_argument('-H', '--host', action='append', help='Worker as host or host:gpu (e.g. -H Pyro:0 -H RenderScrap:0 -H RenderScrap:1).', required=True)
parser.add_argument('-k', '--keep-tmp', action='store_true', help='Keep temporary segment files instead of deleting them on successful exit.')
parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
parser.add_argument('-P', '--probe-host', default=None, help='SSH host to run ffprobe on (file must be at --probe-path there). Speeds up [1/4] when input is on slow/NAS path.')
parser.add_argument('--probe-path', default=None, help='Path to input file as seen on --probe-host (required if -P set).')
args = parser.parse_args()
workers = _parse_workers(args.host)
if not workers:
parser.error('At least one worker required (e.g. -H Pyro:0 -H RenderScrap:0)')
encode(
args.host,
workers,
args.input_file,
args.output_file,
segment_seconds=args.segment_length,
@@ -259,5 +548,7 @@ if __name__ == '__main__':
tmp_dir=args.tmp_dir,
keep_tmp=args.keep_tmp,
resume=args.resume,
copy_input=args.copy_input
copy_input=args.copy_input,
probe_host=args.probe_host,
probe_path=args.probe_path
)
+2
View File
@@ -0,0 +1,2 @@
# For distributed mode (encode_VOD_pyro.py -d): ffmpeg_distributed uses tqdm
tqdm>=4.0.0