ffmpeg dist setup

This commit is contained in:
2026-02-15 17:26:55 -07:00
parent 57f95fc74d
commit 0f29c0c457
5 changed files with 5206 additions and 100 deletions
+5
View File
@@ -7,6 +7,7 @@ logs/*
# Encoded files # Encoded files
input/* input/*
output/* output/*
tmp/*
# Video files # Video files
*.mp4 *.mp4
*.mkv *.mkv
@@ -19,3 +20,7 @@ output/*
*.ts *.ts
*.m2ts *.m2ts
*.mts *.mts
# Python cache files
__pycache__/
*.pyc
File diff suppressed because one or more lines are too long
+73 -8
View File
@@ -1,6 +1,7 @@
import os import os
import re import re
import subprocess import subprocess
import sys
from pathlib import Path from pathlib import Path
import json import json
import logging import logging
@@ -8,6 +9,12 @@ from datetime import datetime
import shutil import shutil
import time import time
# Distributed encode defaults (AV1 via libsvtav1, -crf 0, maxrate 9000k; override via DISTRIBUTED_REMOTE_ARGS / DISTRIBUTED_HOSTS).
# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux.
# Host list used when the DISTRIBUTED_HOSTS environment variable (comma-separated) is unset or empty.
DISTRIBUTED_HOSTS_DEFAULT = ["PostIrony", "Pyro", "RenderScrap", "root@GuiltsCurse", "root@Godzilla"]
# ffmpeg arguments used when neither the remote_args parameter nor the DISTRIBUTED_REMOTE_ARGS environment variable is set.
DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v libsvtav1 -crf 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -an"
# Value the script entry point passes as segment_seconds to encode_dvr_distributed.
DISTRIBUTED_SEGMENT_SECONDS = 60
# ANSI color codes # ANSI color codes
class Colors: class Colors:
PURPLE = '\033[95m' PURPLE = '\033[95m'
@@ -334,26 +341,84 @@ def encode_dvr(input_file, output_dir, gpu):
safe_log_error(f"Unexpected error encoding {input_path}: {type(e).__name__}: {e}", safe_log_error(f"Unexpected error encoding {input_path}: {type(e).__name__}: {e}",
f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}") f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}")
def encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=60, remote_args=None, concat_args="-c:a copy"):
    """Encode one file using ffmpeg_distributed (split -> farm -> concat).

    The process working directory is switched to ``output_dir`` while the
    encode runs (for temp files) and is always restored afterwards.

    Args:
        input_file: Path of the source video.
        output_dir: Directory that receives the result; the output keeps the
            input's file name.
        hosts: Hosts handed to ``ffmpeg_distributed.encode``.
        segment_seconds: Forwarded to the distributed encoder as
            ``segment_seconds``.
        remote_args: ffmpeg arguments for the remote encodes. When falsy, the
            ``DISTRIBUTED_REMOTE_ARGS`` environment variable is used, then
            ``DISTRIBUTED_REMOTE_ARGS_DEFAULT``.
        concat_args: Forwarded to the distributed encoder as ``concat_args``.

    Returns:
        None. Failures (including a missing ``ffmpeg_distributed`` module) are
        logged through ``safe_log_error`` rather than raised.
    """
    # Resolve both paths up front so they stay valid after the chdir below.
    input_path = Path(input_file).resolve()
    output_path = (Path(output_dir) / input_path.name).resolve()

    if output_path.exists():
        safe_log_info(f"Skipping {input_path} - output already exists: {output_path}")
        print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
        return

    remote_args = remote_args or os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT)
    original_cwd = os.getcwd()
    try:
        # Import BEFORE changing directory: a relative or empty sys.path entry
        # would otherwise be resolved against output_dir and the module would
        # not be found. Kept inside the try so an ImportError is still logged.
        from ffmpeg_distributed import encode as distributed_encode

        os.chdir(output_dir)
        safe_log_info(f"Distributed encode: {input_path} -> {output_path} (hosts: {hosts})")
        print(f"{Colors.BLUE}Distributed encode (AV1): {input_path.name}{Colors.ENDC}")
        distributed_encode(
            hosts,
            str(input_path),
            str(output_path),
            segment_seconds=segment_seconds,
            remote_args=remote_args,
            concat_args=concat_args,
        )
        if output_path.exists():
            safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}")
        else:
            safe_log_error("Distributed encode did not produce output", f"{Colors.RED}Distributed encode did not produce output{Colors.ENDC}")
    except Exception as e:
        # Top-level boundary for one file: report and let the caller move on.
        safe_log_error(f"Distributed encode failed: {e}", f"{Colors.RED}Distributed encode failed: {e}{Colors.ENDC}")
    finally:
        os.chdir(original_cwd)
if __name__ == "__main__": if __name__ == "__main__":
# Get GPU selection use_distributed = "--distributed" in sys.argv or "-d" in sys.argv
gpu = get_gpu_selection() if "--distributed" in sys.argv:
safe_log_info(f"Selected GPU: {gpu}") sys.argv.remove("--distributed")
if "-d" in sys.argv:
sys.argv.remove("-d")
if not use_distributed:
choice = input(f"\n{Colors.BLUE}Encode mode: [L]ocal (NVENC) / [D]istributed (farm):{Colors.ENDC} ").strip().upper() or "L"
use_distributed = choice == "D"
if use_distributed and sys.platform == "win32":
print(f"{Colors.YELLOW}Distributed mode uses select.poll() and may fail on Windows; use WSL or Linux for best results.{Colors.ENDC}")
input_dir = "input" input_dir = "input"
output_dir = "output" output_dir = "output"
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
if use_distributed:
hosts_str = os.environ.get("DISTRIBUTED_HOSTS")
if hosts_str:
hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
else:
hosts = DISTRIBUTED_HOSTS_DEFAULT
print(f"{Colors.BLUE}Using hosts: {', '.join(hosts)}{Colors.ENDC}")
safe_log_info(f"Distributed mode; hosts: {hosts}")
else:
gpu = get_gpu_selection()
safe_log_info(f"Selected GPU: {gpu}")
# Get list of files to process # Get list of files to process
files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))] files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))]
total_files = len(files) total_files = len(files)
if total_files == 0: if total_files == 0:
safe_log_info("No files to process in input directory", f"{Colors.YELLOW}No files to process in input directory{Colors.ENDC}") safe_log_info("No files to process in input directory", f"{Colors.YELLOW}No files to process in input directory{Colors.ENDC}")
else: else:
safe_log_info(f"Found {total_files} files to process", f"{Colors.BLUE}Found {total_files} files to process{Colors.ENDC}") safe_log_info(f"Found {total_files} files to process", f"{Colors.BLUE}Found {total_files} files to process{Colors.ENDC}")
for i, file in enumerate(files, 1): for i, file in enumerate(files, 1):
input_file = os.path.join(input_dir, file) input_file = os.path.join(input_dir, file)
safe_log_info(f"Processing file {i}/{total_files}: {file}") safe_log_info(f"Processing file {i}/{total_files}: {file}")
print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}") print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}")
encode_dvr(input_file, output_dir, gpu) if use_distributed:
encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS)
else:
encode_dvr(input_file, output_dir, gpu)
+59 -30
View File
@@ -56,38 +56,67 @@ class FFMPEGProc:
def stop(self): def stop(self):
self._should_stop = True self._should_stop = True
def _read_stderr_loop(self, stderr_lines: list):
    """Drain the child's stderr line by line until EOF.

    Used on Windows, where select.poll is unavailable; intended to run in a
    helper thread.

    Progress lines (those matching ``self._progress_re``) are reported through
    ``self._update_callback`` and are NOT stored, matching the select.poll
    code path in ``run``, which also keeps progress lines out of the captured
    stderr. Every other line is appended to ``stderr_lines``. Until a duration
    is known, non-progress lines are also checked against
    ``self._duration_re`` to fill in ``self._duration``.

    Args:
        stderr_lines: List mutated in place; receives each non-progress line.
    """
    while True:
        line = self._proc.stderr.readline()
        if not line:
            # readline() on the pipe returns an empty value only at EOF; stop
            # here instead of spinning until the process is reaped.
            break
        match = self._progress_re.match(line)
        if not match:
            stderr_lines.append(line)
        if match and self._update_callback:
            self._update_callback(
                int(match.group('frame')),
                int(match.group('fps')),
                self._match_to_sec(match),
                self._duration,
                float(match.group('speed')),
            )
        elif self._duration is None:
            dm = self._duration_re.match(line)
            if dm:
                self._duration = self._match_to_sec(dm)
def run(self): def run(self):
self._proc = Popen(self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout, universal_newlines=True) self._proc = Popen(self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout, universal_newlines=True)
poll = select.poll() if getattr(select, 'poll', None):
poll.register(self._proc.stderr) poll = select.poll()
while self._proc.poll() is None and not self._should_stop: poll.register(self._proc.stderr)
if not poll.poll(1): while self._proc.poll() is None and not self._should_stop:
sleep(0.1) if not poll.poll(1):
continue sleep(0.1)
sleep(0.001) continue
line = self._proc.stderr.readline() sleep(0.001)
match = self._progress_re.match(line) line = self._proc.stderr.readline()
if not match: match = self._progress_re.match(line)
self.stderr += line if not match:
self.stderr += line
if match and self._update_callback: if match and self._update_callback:
self._update_callback( self._update_callback(
int(match.group('frame')), int(match.group('frame')),
int(match.group('fps')), int(match.group('fps')),
self._match_to_sec(match), self._match_to_sec(match),
self._duration, self._duration,
float(match.group('speed')) float(match.group('speed')),
) )
elif self._duration is None: elif self._duration is None:
match = self._duration_re.match(line) match = self._duration_re.match(line)
if match: if match:
self._duration = self._match_to_sec(match) self._duration = self._match_to_sec(match)
try:
try: _, err = self._proc.communicate(timeout=1)
out, err = self._proc.communicate(timeout=1) self.stderr += err or ''
self.stderr += err except TimeoutExpired:
except TimeoutExpired as ex: pass
pass else:
stderr_lines = []
reader = Thread(target=self._read_stderr_loop, args=(stderr_lines,), daemon=True)
reader.start()
while self._proc.poll() is None and not self._should_stop:
sleep(0.2)
reader.join(timeout=2)
self.stderr = ''.join(stderr_lines)
return self._proc.returncode return self._proc.returncode
class TqdmAbsolute(tqdm): class TqdmAbsolute(tqdm):
+2
View File
@@ -0,0 +1,2 @@
# For distributed mode (encode_VOD_pyro.py -d): ffmpeg_distributed uses tqdm
tqdm>=4.0.0