purge distributed
The result will never be good because the encoder will be starting fresh every single chunk (minute), so the only solution is either larger chunk sizes, which somewhat defeats the purpose, or single-worker-only jobs, which we've already been doing. Tough.
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -1,3 +0,0 @@
|
||||
@echo off
|
||||
python encode_VOD_pyro.py
|
||||
pause
|
||||
@@ -1,447 +0,0 @@
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import shutil
|
||||
import time
|
||||
|
||||
# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux.
# Workers = (ssh_host, gpu_index). Unraid (GuiltsCurse, Godzilla) excluded; RenderScrap has 2 GPUs.
DISTRIBUTED_WORKERS_DEFAULT = [
    ("Pyro", 0),
    ("RenderScrap", 0),
    ("RenderScrap", 1),
    ("PostIrony", 0),
]
# Per-segment ffmpeg arguments run on each remote worker: NVENC HEVC, high-quality
# VBR capped at 9 Mbit/s; -an drops audio in the segment encodes.
DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v hevc_nvenc -preset p7 -tune hq -rc vbr -rc-lookahead 32 -spatial-aq 1 -aq-strength 15 -cq 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -an"
# Target chunk length (seconds) for distributed splitting.
DISTRIBUTED_SEGMENT_SECONDS = 60
|
||||
|
||||
|
||||
def _parse_workers_env(s):
|
||||
"""Parse DISTRIBUTED_WORKERS e.g. 'Pyro:0,RenderScrap:0,RenderScrap:1,PostIrony:0' -> [(host, gpu_id), ...]."""
|
||||
out = []
|
||||
for part in (s or "").strip().split(","):
|
||||
part = part.strip()
|
||||
if not part:
|
||||
continue
|
||||
if ":" in part:
|
||||
host, gpu = part.rsplit(":", 1)
|
||||
try:
|
||||
out.append((host.strip(), int(gpu.strip())))
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
out.append((part, 0))
|
||||
return out
|
||||
|
||||
# ANSI color codes
class Colors:
    """ANSI terminal escape codes used for colored console output."""
    PURPLE = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    ENDC = '\033[0m'  # reset back to default terminal attributes
|
||||
|
||||
def get_gpu_selection():
    """Prompt on the console until the user picks a GPU slot; returns '0', '1' or '2' as a string."""
    valid_choices = ('0', '1', '2')
    while True:
        print(f"\n{Colors.BLUE}Select GPU slot:{Colors.ENDC}")
        print("0 - First GPU")
        print("1 - Second GPU")
        print("2 - Third GPU")
        choice = input(f"{Colors.YELLOW}Enter GPU number (0-2):{Colors.ENDC} ").strip()
        if choice in valid_choices:
            return choice
        print(f"{Colors.RED}Invalid selection. Please try again.{Colors.ENDC}")
|
||||
|
||||
# Custom file handler that silently handles I/O errors (for network shares)
class SafeFileHandler(logging.FileHandler):
    """File handler that silently handles I/O errors during flush"""

    def flush(self):
        """Override flush to silently handle I/O errors"""
        try:
            super().flush()
        except Exception:
            # Swallow everything: OSError/IOError from flaky network shares
            # as well as any other flush-time failure.
            pass

    def emit(self, record):
        """Override emit to handle errors gracefully"""
        try:
            super().emit(record)
        except Exception:
            # Route all emission failures through handleError (a no-op here)
            # so logging never raises into the caller.
            self.handleError(record)

    def handleError(self, record):
        """Override to prevent error messages from being printed"""
        # Intentionally suppress the "--- Logging error ---" dump.
        pass
|
||||
|
||||
# Set up logging
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# One timestamped log file per run, e.g. logs/encode_20240101_120000.log
log_file = os.path.join(log_dir, f"encode_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

# Configure logging with custom handler that handles network share errors
handler = SafeFileHandler(log_file, mode='w', encoding='utf-8')
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# Root logger at INFO, writing only to the safe file handler
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Remove default handlers to avoid duplicate output
logger.handlers = [handler]
|
||||
|
||||
def get_file_info(input_file):
    """Probe *input_file* with ffprobe and return the parsed JSON (format duration/size plus per-stream fields)."""
    probe_cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries',
        'format=duration,size:stream=codec_type,codec_name,width,height,r_frame_rate,channels,channel_layout',
        '-of', 'json',
        input_file,
    ]
    completed = subprocess.run(probe_cmd, capture_output=True, text=True)
    return json.loads(completed.stdout)
|
||||
|
||||
def get_audio_labels(input_file):
    """Return one entry per audio stream: the stream's 'title' tag, or None when untagged."""
    probe_cmd = [
        'ffprobe',
        '-v', 'error',
        '-select_streams', 'a',
        '-show_entries', 'stream=index:stream_tags=title',
        '-of', 'json',
        input_file,
    ]
    completed = subprocess.run(probe_cmd, capture_output=True, text=True)
    parsed = json.loads(completed.stdout)
    # Preserve stream order; missing titles become None
    return [stream.get('tags', {}).get('title', None) for stream in parsed.get('streams', [])]
|
||||
|
||||
def parse_fps(r_frame_rate):
    """Parse ffprobe r_frame_rate e.g. '60/1' or '30000/1001' to float.

    Returns None for empty input or any malformed value. Bug fix: the original
    evaluated float(parts[1]) in the `if` condition outside the try block, so a
    non-numeric denominator such as 'a/b' raised an uncaught ValueError.
    """
    if not r_frame_rate:
        return None
    parts = str(r_frame_rate).strip().split('/')
    try:
        if len(parts) == 2 and float(parts[1]) != 0:
            return float(parts[0]) / float(parts[1])
        # Single value, or zero denominator: fall back to the numerator alone
        return float(parts[0])
    except (ValueError, IndexError):
        return None
|
||||
|
||||
def format_size(size_bytes):
    """Human-readable byte count: walks B -> KB -> MB -> GB, falling back to TB."""
    value = size_bytes
    for unit in ('B', 'KB', 'MB', 'GB'):
        if value < 1024:
            return f"{value:.2f} {unit}"
        value = value / 1024
    return f"{value:.2f} TB"
|
||||
|
||||
def safe_log_info(message, print_msg=None):
    """Safely log info message, ensuring console output even if logging fails"""
    try:
        logging.info(message)
    except (OSError, IOError) as e:
        # Logging failed (likely network share issue) - fall back to console
        if print_msg is not None:
            print(print_msg)
            print(f"{Colors.YELLOW}[Logging failed: {e}]{Colors.ENDC}")
        else:
            print(f"{Colors.YELLOW}[Logging failed: {e}] {message}{Colors.ENDC}")
        return
    except Exception as e:
        # Any other logging error - same console fallback, different label
        if print_msg is not None:
            print(print_msg)
            print(f"{Colors.YELLOW}[Logging error: {e}]{Colors.ENDC}")
        else:
            print(f"{Colors.YELLOW}[Logging error: {e}] {message}{Colors.ENDC}")
        return

    # Logging succeeded: still echo to console when a console message was given
    if print_msg is not None:
        print(print_msg)
|
||||
|
||||
def safe_log_error(message, print_msg=None):
    """Safely log error message, ensuring console output even if logging fails"""
    try:
        logging.error(message)
    except (OSError, IOError) as e:
        # Logging failed (likely network share issue) - fall back to console
        if print_msg is not None:
            print(print_msg)
            print(f"{Colors.RED}[Logging failed: {e}]{Colors.ENDC}")
        else:
            print(f"{Colors.RED}[Logging failed: {e}] {message}{Colors.ENDC}")
        return
    except Exception as e:
        # Any other logging error - same console fallback, different label
        if print_msg is not None:
            print(print_msg)
            print(f"{Colors.RED}[Logging error: {e}]{Colors.ENDC}")
        else:
            print(f"{Colors.RED}[Logging error: {e}] {message}{Colors.ENDC}")
        return

    # Logging succeeded: errors are always echoed to the console,
    # using print_msg when given, otherwise the message itself in red.
    if print_msg is not None:
        print(print_msg)
    else:
        print(f"{Colors.RED}{message}{Colors.ENDC}")
|
||||
|
||||
def encode_dvr(input_file, output_dir, gpu):
    """Re-encode one recording to HEVC via hevc_nvenc on GPU index *gpu*.

    Skips when the output already exists. Streams ffmpeg's combined
    stdout/stderr line-by-line and rewrites the speed/time/bitrate fields
    that ffmpeg reports as N/A when its stderr is piped.
    """
    input_path = Path(input_file)
    # Output keeps the original file name and extension, just under output_dir
    output_path = Path(output_dir) / f"{input_path.stem}{input_path.suffix}"

    # Get file info for logging
    file_info = get_file_info(str(input_path))
    input_size = int(file_info['format']['size'])
    duration = float(file_info['format']['duration'])

    safe_log_info(f"Processing file: {input_path}")
    safe_log_info(f"Input size: {format_size(input_size)}")
    safe_log_info(f"Duration: {duration:.2f} seconds")

    print(f"\n{Colors.BLUE}Processing file: {input_path}{Colors.ENDC}")
    print(f"Input size: {format_size(input_size)}")
    print(f"Duration: {duration:.2f} seconds")

    # Log stream information
    for i, stream in enumerate(file_info.get('streams', [])):
        # NOTE(review): crude type guess -- codec names starting with 'h'
        # (h264/hevc) are labeled Video, everything else Audio.
        stream_type = 'Video' if stream.get('codec_name', '').startswith('h') else 'Audio'
        safe_log_info(f"Stream {i} ({stream_type}):")
        for key, value in stream.items():
            safe_log_info(f"  {key}: {value}")

    # Skip if output file already exists
    if output_path.exists():
        output_size = output_path.stat().st_size
        safe_log_info(f"Skipping {input_path} - output already exists: {output_path}")
        safe_log_info(f"Output size: {format_size(output_size)}")
        print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
        return

    # Get audio labels and input FPS (for speed display when stderr is piped)
    audio_labels = get_audio_labels(str(input_path))
    safe_log_info(f"Audio labels: {audio_labels}")
    input_fps = None
    # Preferred: the first stream ffprobe marks as video
    for s in file_info.get('streams', []):
        if s.get('codec_type') == 'video':
            input_fps = parse_fps(s.get('r_frame_rate'))
            break
    if input_fps is None:
        # Fallback: first stream with r_frame_rate (e.g. codec_type not in probe)
        for s in file_info.get('streams', []):
            if 'width' in s or s.get('codec_name', '').startswith(('h', 'm')):
                input_fps = parse_fps(s.get('r_frame_rate'))
                break
    if input_fps is None:
        # Last resort: any stream that yields a positive fps
        for s in file_info.get('streams', []):
            input_fps = parse_fps(s.get('r_frame_rate'))
            if input_fps and input_fps > 0:
                break

    # FFmpeg command with NVIDIA HEVC encoder and maximum quality
    cmd = [
        'ffmpeg',
        '-v', 'info',            # Lower verbosity to reduce noise
        '-stats',                # Emit periodic stats
        '-stats_period', '1.0',  # Update stats every 1s (more stable)
        '-i', str(input_path),
        '-c:v', 'hevc_nvenc',
        '-gpu', gpu,
        '-preset', 'p7',
        '-tune', 'hq',
        '-rc', 'vbr',
        '-rc-lookahead', '32',
        '-spatial-aq', '1',
        '-aq-strength', '15',
        '-cq', '0',
        '-b:v', '9000k',
        '-maxrate', '9000k',
        '-bufsize', '18000k',
        '-c:a', 'copy',          # audio passthrough
        '-map', '0',             # keep all input streams
    ]
    # Add metadata for each audio stream if label exists
    for idx, label in enumerate(audio_labels):
        if label:
            cmd += [f'-metadata:s:a:{idx}', f'title={label}']
    cmd.append(str(output_path))

    try:
        # Run FFmpeg and capture combined output (avoid dual-pipe deadlocks on Windows)
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            bufsize=1
        )

        # Stream output line-by-line; compute speed when stderr is piped (FFmpeg shows speed=N/A)
        for line in iter(process.stdout.readline, ''):
            if not line:
                break
            text = line.strip().strip('\r')
            try:
                if text.startswith('frame=') or ' fps=' in text:
                    display = text
                    m_frame = re.search(r'frame=\s*(\d+)', text)
                    m_elapsed = re.search(r'elapsed=(\d+):(\d+):(\d+\.?\d*)', text)
                    m_size = re.search(r'size=\s*(\d+)\s*KiB', text)
                    if m_frame and m_elapsed:
                        frames = int(m_frame.group(1))
                        h, m, s = float(m_elapsed.group(1)), float(m_elapsed.group(2)), float(m_elapsed.group(3))
                        elapsed_sec = h * 3600 + m * 60 + s
                        if elapsed_sec > 0:
                            if input_fps and input_fps > 0:
                                # Encoding speed relative to realtime (x factor)
                                speed_x = (frames / input_fps) / elapsed_sec
                                display = re.sub(r'speed=N/A', f'speed={speed_x:.2f}x', text)
                                # time = output position (HH:MM:SS.ms)
                                video_sec = frames / input_fps
                                t_h = int(video_sec // 3600)
                                t_m = int((video_sec % 3600) // 60)
                                t_s = video_sec % 60
                                time_str = f'{t_h}:{t_m:02d}:{t_s:06.3f}' if t_h else f'0:{t_m:02d}:{t_s:06.3f}'
                                display = re.sub(r'time=N/A', f'time={time_str}', display)
                                # bitrate when muxer has written data (stays N/A until size > 0)
                                size_kib = int(m_size.group(1)) if m_size else 0
                                if size_kib > 0 and elapsed_sec > 0:
                                    bitrate_kbps = (size_kib * 8192) / (elapsed_sec * 1000)
                                    display = re.sub(r'bitrate=N/A', f'bitrate={bitrate_kbps:.0f}kbits/s', display)
                            else:
                                # No input fps known: show raw encoder fps instead of an x factor
                                enc_fps = frames / elapsed_sec
                                display = re.sub(r'speed=N/A', f'speed={enc_fps:.0f}fps', text)
                    safe_log_info(f"Progress: {text}", f"{Colors.PURPLE}Progress: {display}{Colors.ENDC}")
                else:
                    safe_log_info(f"FFmpeg: {text}", f"{Colors.GREEN}FFmpeg: {text}{Colors.ENDC}")
            except (OSError, IOError) as e:
                # I/O error reading from pipe - log it
                safe_log_error(f"I/O error reading FFmpeg output: {e}")
                break
            except Exception as e:
                # Unexpected error
                safe_log_error(f"Unexpected error processing FFmpeg output: {e}")

        process.wait()

        if process.returncode == 0:
            # Get output file info
            output_info = get_file_info(str(output_path))
            output_size = int(output_info['format']['size'])
            compression_ratio = input_size / output_size if output_size > 0 else 0

            safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}")
            safe_log_info(f"Output size: {format_size(output_size)}")
            safe_log_info(f"Compression ratio: {compression_ratio:.2f}x", f"Compression ratio: {compression_ratio:.2f}x")
        else:
            # Convert Windows error code to signed integer if needed
            return_code = process.returncode
            if return_code > 2147483647:  # If it's a large unsigned int, convert to signed
                return_code = return_code - 4294967296
            safe_log_error(f"FFmpeg process failed with return code {return_code}",
                           f"{Colors.RED}FFmpeg process failed with return code {return_code}{Colors.ENDC}")

    except subprocess.CalledProcessError as e:
        safe_log_error(f"Error encoding {input_path}: {e}", f"{Colors.RED}Error encoding {input_path}: {e}{Colors.ENDC}")
    except Exception as e:
        safe_log_error(f"Unexpected error encoding {input_path}: {type(e).__name__}: {e}",
                       f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}")
|
||||
|
||||
|
||||
def encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=60, remote_args=None, concat_args="-c:a copy", probe_host=None, probe_path=None, remote_ffmpeg_path=None):
    """Encode one file using ffmpeg_distributed (split -> farm -> concat). workers = [(host, gpu_id), ...].

    Segment temp dirs go under script dir/tmp/. If probe_host and probe_path are set, ffprobe runs there
    (faster when input is on NAS). Env fallbacks: DISTRIBUTED_REMOTE_ARGS, PROBE_HOST,
    PROBE_PATH_PREFIX, DISTRIBUTED_REMOTE_FFMPEG_PATH.
    """
    input_path = Path(input_file).resolve()
    output_path = (Path(output_dir) / f"{input_path.stem}{input_path.suffix}").resolve()
    # Idempotent: never re-encode an existing output
    if output_path.exists():
        safe_log_info(f"Skipping {input_path} - output already exists: {output_path}")
        print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
        return
    remote_args = remote_args or os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT)
    probe_host = probe_host or os.environ.get("PROBE_HOST")
    # Derive the remote probe path from PROBE_PATH_PREFIX when not given explicitly
    if probe_path is None and probe_host and os.environ.get("PROBE_PATH_PREFIX"):
        prefix = os.environ.get("PROBE_PATH_PREFIX", "").rstrip("/")
        probe_path = f"{prefix}/{input_path.name}"
    script_dir = Path(__file__).resolve().parent
    tmp_base = script_dir / "tmp"
    tmp_base.mkdir(exist_ok=True)
    # Hash the absolute input path so each source file gets a stable, unique segment dir
    path_for_hash = os.path.abspath(os.path.expanduser(str(input_path)))
    segment_hash = hashlib.md5(path_for_hash.encode()).hexdigest()
    tmp_dir = str(tmp_base / f"ffmpeg_segments_{segment_hash}")
    cwd = os.getcwd()
    try:
        # chdir into output_dir for the duration of the encode -- presumably so
        # any relative paths ffmpeg_distributed uses resolve there (TODO confirm)
        os.chdir(output_dir)
        from ffmpeg_distributed import encode as distributed_encode
        safe_log_info(f"Distributed encode: {input_path} -> {output_path} (workers: {workers})")
        print(f"{Colors.BLUE}Distributed encode (HEVC): {input_path.name}{Colors.ENDC}")
        remote_ffmpeg = remote_ffmpeg_path or os.environ.get("DISTRIBUTED_REMOTE_FFMPEG_PATH")
        ok = distributed_encode(
            workers,
            str(input_path),
            str(output_path),
            segment_seconds=segment_seconds,
            remote_args=remote_args,
            concat_args=concat_args,
            tmp_dir=tmp_dir,
            probe_host=probe_host,
            probe_path=probe_path,
            remote_ffmpeg_path=remote_ffmpeg,
        )
        # Success requires both a truthy return AND the output file on disk
        if ok and output_path.exists():
            safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}")
        else:
            safe_log_error("Distributed encode did not produce output (see [4/4] ERROR above)", f"{Colors.RED}Distributed encode did not produce output{Colors.ENDC}")
    except Exception as e:
        safe_log_error(f"Distributed encode failed: {e}", f"{Colors.RED}Distributed encode failed: {e}{Colors.ENDC}")
    finally:
        # Always restore the original working directory
        os.chdir(cwd)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # ffmpeg_distributed relies on select.poll(), which is unavailable on Windows
    if sys.platform == "win32":
        print(f"{Colors.YELLOW}Distributed mode uses select.poll() and may fail on Windows; use WSL or Linux for best results.{Colors.ENDC}")

    input_dir = "input"
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)

    # Worker pool: DISTRIBUTED_WORKERS env var overrides the built-in default
    workers_str = os.environ.get("DISTRIBUTED_WORKERS")
    if workers_str:
        workers = _parse_workers_env(workers_str)
    else:
        workers = DISTRIBUTED_WORKERS_DEFAULT
    workers_desc = ", ".join(f"{h}:gpu{g}" for h, g in workers)
    print(f"{Colors.BLUE}Using workers: {workers_desc}{Colors.ENDC}")
    safe_log_info(f"Distributed mode; workers: {workers}")

    # NOTE(review): endswith('.mp4') already matches '.DVR.mp4'; the tuple's
    # second entry is redundant but harmless.
    files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))]
    total_files = len(files)

    if total_files == 0:
        safe_log_info("No files to process in input directory", f"{Colors.YELLOW}No files to process in input directory{Colors.ENDC}")
    else:
        safe_log_info(f"Found {total_files} files to process", f"{Colors.BLUE}Found {total_files} files to process{Colors.ENDC}")

        # Encode sequentially; each call farms segments out to the worker pool
        for i, file in enumerate(files, 1):
            input_file = os.path.join(input_dir, file)
            safe_log_info(f"Processing file {i}/{total_files}: {file}")
            print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}")
            encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS)
|
||||
@@ -1,686 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
from queue import SimpleQueue, Empty
|
||||
from subprocess import Popen, DEVNULL, PIPE, TimeoutExpired
|
||||
import select
|
||||
from shutil import rmtree
|
||||
from glob import glob
|
||||
from os import mkdir, unlink, listdir, environ
|
||||
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
|
||||
from threading import Thread
|
||||
from time import sleep, time
|
||||
from typing import List, NamedTuple, Callable, Union, Tuple
|
||||
from signal import signal, SIGINT
|
||||
from sys import exit, stderr, stdin, platform as sys_platform
|
||||
from shlex import split, join
|
||||
from hashlib import md5
|
||||
from time import strptime
|
||||
from tqdm import tqdm
|
||||
import re
|
||||
|
||||
# Debug logging is enabled by exporting any value for the DEBUG env var.
DEBUG = 'DEBUG' in environ


def dprint(*args, **kwargs):
    """print() only when the DEBUG environment variable is set.

    Bug fix: the original called print(*args, *kwargs), which unpacks the
    kwargs dict's KEYS as extra positional arguments; **kwargs forwards
    them as real keyword arguments (sep=, end=, file=, ...).
    """
    if DEBUG:
        print(*args, **kwargs)
|
||||
|
||||
def _popen(args, **kwargs):
    """Popen wrapper: stderr piped, stdout discarded, text mode; logs the command when DEBUG."""
    dprint(f'calling subprocess: {args}')
    kwargs.update(stderr=PIPE, stdout=DEVNULL, universal_newlines=True)
    return Popen(args, **kwargs)
|
||||
|
||||
|
||||
def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]:
    """Run ffprobe to get keyframe timestamps (seconds). Uses -skip_frame nokey so only keyframes are read (fast).

    If probe_host and probe_path are set, run ffprobe there via ssh so the file is read at local disk speed.
    Returns a sorted list of floats; unparsable lines are skipped, so a failed probe yields [].
    """
    cmd = [
        'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-skip_frame', 'nokey',
        '-show_entries', 'frame=pts_time', '-of', 'csv=p=0',
        probe_path if (probe_host and probe_path) else input_file
    ]
    if probe_host and probe_path:
        # shlex.join quotes the command safely for the remote shell
        cmd = ['ssh', probe_host, join(cmd)]
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    out, err = proc.communicate()
    if proc.returncode != 0 and err:
        dprint('ffprobe keyframes:', err)
    out = out or ''
    times = []
    for line in out.strip().splitlines():
        # Accept both bare values ('1.23') and key=value style ('pts_time=1.23') output
        line = line.strip().split('=')[-1] if '=' in line else line.strip()
        if not line:
            continue
        try:
            times.append(float(line))
        except ValueError:
            pass
    return sorted(times)
|
||||
|
||||
|
||||
def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
    """Get container duration in seconds via ffprobe (header-only, fast). If probe_host set, run there.

    Falls back to the first video stream's duration when the container header has none;
    returns 0.0 when neither probe yields a parsable number.
    """
    path = probe_path if (probe_host and probe_path) else input_file
    # First attempt: container (format) duration
    cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path]
    if probe_host and probe_path:
        cmd = ['ssh', probe_host, join(cmd)]
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    out, _ = proc.communicate()
    out = (out or '').strip()
    try:
        if out:
            return float(out)
    except ValueError:
        pass
    # Fallback: duration of the first video stream
    cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', path]
    if probe_host and probe_path:
        cmd = ['ssh', probe_host, join(cmd)]
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    out, _ = proc.communicate()
    out = (out or '').strip()
    try:
        if out:
            return float(out)
    except ValueError:
        pass
    return 0.0
|
||||
|
||||
|
||||
def _get_fps(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
    """Get video stream frame rate (e.g. 60.0) via ffprobe. If probe_host set, run there. Default 60.0 on failure."""
    target = probe_path if (probe_host and probe_path) else input_file
    probe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'csv=p=0', target]
    if probe_host and probe_path:
        probe_cmd = ['ssh', probe_host, join(probe_cmd)]
    proc = Popen(probe_cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    raw, _ = proc.communicate()
    raw = (raw or '').strip()
    if not raw:
        return 60.0
    try:
        if '/' not in raw:
            return float(raw)
        # Rational form like '30000/1001'
        num, den = raw.split('/', 1)
        den_value = float(den)
        return float(num) / den_value if den_value else 60.0
    except (ValueError, ZeroDivisionError):
        return 60.0
|
||||
|
||||
|
||||
def _probe_duration(path: str) -> float:
    """Return duration in seconds from ffprobe, or 0.0 on failure."""
    proc = Popen(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path],
        stdout=PIPE, stderr=PIPE, universal_newlines=True
    )
    out, _ = proc.communicate()
    if proc.returncode != 0:
        return 0.0
    try:
        # Empty output also lands here via ValueError from float('')
        return float((out or '').strip())
    except ValueError:
        return 0.0
|
||||
|
||||
|
||||
def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
    """Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py)."""
    actual = _probe_duration(segment_path)
    if actual <= 0:
        return False
    return abs(actual - expected_duration_sec) <= tolerance_sec
|
||||
|
||||
|
||||
def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:
|
||||
"""Build (start_sec, end_sec) segments at keyframe boundaries. When keyframe probe fails, use fixed segment_seconds."""
|
||||
if total_duration <= 0:
|
||||
return []
|
||||
if not keyframe_times or len(keyframe_times) == 1:
|
||||
# No keyframes: split at fixed intervals so we still get multiple segments
|
||||
segments = []
|
||||
start = 0.0
|
||||
while start < total_duration:
|
||||
end = min(start + segment_seconds, total_duration)
|
||||
segments.append((start, end))
|
||||
start = end
|
||||
return segments
|
||||
segments = []
|
||||
i = 0
|
||||
while i < len(keyframe_times):
|
||||
start = keyframe_times[i]
|
||||
end_target = start + segment_seconds
|
||||
j = i + 1
|
||||
while j < len(keyframe_times) and keyframe_times[j] < end_target:
|
||||
j += 1
|
||||
if j < len(keyframe_times):
|
||||
end = keyframe_times[j]
|
||||
else:
|
||||
end = total_duration
|
||||
segments.append((start, end))
|
||||
i = j
|
||||
return segments
|
||||
|
||||
|
||||
class Task(NamedTuple):
    """One encode work item: a time slice of the source plus the ffmpeg args to encode it."""
    start_sec: float        # offset into the source where this segment begins
    duration_sec: float     # length of the segment in seconds
    output_file: str        # path the encoded segment is written to
    ffmpeg_args: List[str]  # encoder arguments for this segment
    fps: float = 60.0       # source frame rate, used to compute the segment frame count
|
||||
|
||||
class FFMPEGProc:
    """Wraps one ffmpeg subprocess and parses its stderr for Duration/progress lines.

    update_callback(frame, fps, time_sec, duration_sec, speed) fires on every
    progress line. On platforms with select.poll stderr is polled inline in
    run(); otherwise (Windows) a daemon thread drains it.
    """

    # Matches the 'Duration: HH:MM:SS.ss,' header ffmpeg prints for its input
    _duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
    # Matches periodic 'frame=... fps=... time=... speed=...x' progress lines
    _progress_re = re.compile(r'frame=\s*(?P<frame>[0-9]+)\s+fps=\s*(?P<fps>[0-9]+).*time=-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9,.]+)\s+.*speed=(?P<speed>[0-9\.]+)x')

    @staticmethod
    def _match_to_sec(match):
        # Convert the named H/M/S groups of either regex into total seconds
        return int(match.group('time_h'))*3600+int(match.group('time_m'))*60+float(match.group('time_s'))

    def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None, binary_io=False, echo_stderr=False):
        self._cmd = cmd                          # command list (or string when shell=True)
        self._update_callback = update_callback  # progress sink; may be None
        self._should_stop = False                # set by stop() to break the run loop
        self._shell = shell
        self._duration = None                    # parsed lazily from the Duration: header
        self._stdin = stdin
        self._stdout = stdout
        self._binary_io = binary_io              # True -> pipes carry bytes, decode manually
        self._echo_stderr = echo_stderr          # mirror stderr lines via tqdm.write
        self.stderr = ''                         # accumulated non-progress stderr text

    def stop(self):
        """Request termination: flag the loop and terminate the child if still running."""
        self._should_stop = True
        if getattr(self, '_proc', None) is not None and self._proc.poll() is None:
            try:
                self._proc.terminate()
            except OSError:
                pass

    def _read_stderr_loop(self, stderr_lines: list):
        """Read stderr in a loop (used on Windows where select.poll is unavailable)."""
        while True:
            raw = self._proc.stderr.readline()
            if not raw and self._proc.poll() is not None:
                break  # EOF and the process has exited
            if raw:
                line = raw.decode(errors='replace') if self._binary_io else raw
                stderr_lines.append(line)
                if self._echo_stderr:
                    tqdm.write(line.rstrip(), file=stderr)
                match = self._progress_re.match(line)
                if match and self._update_callback:
                    self._update_callback(
                        int(match.group('frame')),
                        int(match.group('fps')),
                        self._match_to_sec(match),
                        self._duration,
                        float(match.group('speed')),
                    )
                elif self._duration is None:
                    dm = self._duration_re.match(line)
                    if dm:
                        self._duration = self._match_to_sec(dm)

    def run(self):
        """Start the subprocess and pump stderr until exit/stop; returns the exit code."""
        self._proc = Popen(
            self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout,
            universal_newlines=not self._binary_io
        )
        if getattr(select, 'poll', None):
            # Unix path: poll stderr so the loop also notices _should_stop promptly
            poll = select.poll()
            poll.register(self._proc.stderr)
            while self._proc.poll() is None and not self._should_stop:
                if not poll.poll(1):
                    sleep(0.1)
                    continue
                sleep(0.001)
                raw = self._proc.stderr.readline()
                line = raw.decode(errors='replace') if self._binary_io and raw else (raw or '')
                if self._echo_stderr and line:
                    tqdm.write(line.rstrip(), file=stderr)
                match = self._progress_re.match(line)
                if not match:
                    # Keep everything that is not a progress line for error reporting
                    self.stderr += line
                if match and self._update_callback:
                    self._update_callback(
                        int(match.group('frame')),
                        int(match.group('fps')),
                        self._match_to_sec(match),
                        self._duration,
                        float(match.group('speed')),
                    )
                elif self._duration is None:
                    match = self._duration_re.match(line)
                    if match:
                        self._duration = self._match_to_sec(match)
            try:
                # Drain whatever stderr remains after exit/stop
                _, err = self._proc.communicate(timeout=1)
                if err and self._binary_io:
                    err = err.decode(errors='replace') if isinstance(err, bytes) else err
                self.stderr += err or ''
            except TimeoutExpired:
                pass
        else:
            # Windows path: a background thread reads stderr while we wait
            stderr_lines = []
            reader = Thread(target=self._read_stderr_loop, args=(stderr_lines,), daemon=True)
            reader.start()
            while self._proc.poll() is None and not self._should_stop:
                sleep(0.2)
            reader.join(timeout=2)
            self.stderr = ''.join(stderr_lines)
        return self._proc.returncode
|
||||
|
||||
class TqdmAbsolute(tqdm):
    """tqdm variant whose update() takes an absolute position instead of a delta."""

    def __init__(self, *args, **kwargs):
        # Fixed bar format showing absolute n/total with one decimal place
        kwargs['bar_format'] = '{l_bar}{bar}|{n:.1f}/{total:.1f} [{elapsed}<{remaining}]'
        kwargs['dynamic_ncols'] = True
        # Idiom fix: 'x not in y' instead of 'not x in y'
        if 'total' not in kwargs:
            # Effectively "unknown" until the caller assigns .total
            kwargs['total'] = 99999999
        if 'leave' not in kwargs:
            kwargs['leave'] = False
        super().__init__(*args, **kwargs)

    def update(self, to):
        """Advance the bar to absolute position *to* (base tqdm.update takes a delta)."""
        super().update(to - self.n)  # will also set self.n = b * bsize
|
||||
|
||||
# Per-worker console colors, indexed by bar position (cycled with % when more workers than colors)
HOST_COLORS = ['\033[94m', '\033[92m', '\033[93m', '\033[95m', '\033[96m', '\033[91m']  # blue, green, yellow, magenta, cyan, red
RESET = '\033[0m'
SEGMENT_STALL_TIMEOUT = int(environ.get('SEGMENT_STALL_TIMEOUT', '300'))  # no encoder progress for this many sec -> kill and re-queue
|
||||
|
||||
class TaskThread(Thread):
    """Worker thread bound to one (host, gpu) pair.

    Pulls segment Tasks from a shared queue and, for each one, pipes a local
    ffmpeg "reader" (stream-copies the segment's frames to an MPEG-TS on stdout)
    into an encoder ffmpeg that runs either locally or on the remote host via
    SSH.  Failed or stalled segments are re-queued so another worker can retry.
    """

    def __init__(self, host: str, gpu_id: int, source_file: str, task_queue: SimpleQueue, bar_pos: int, remote_ffmpeg_path: str = None):
        """Set up worker state; the thread does no work until start() is called.

        host             -- SSH host name, or 'localhost' to encode in-process.
        gpu_id           -- NVENC GPU index passed to the encoder via '-gpu'.
        source_file      -- path of the input file (as seen by the local reader).
        task_queue       -- shared SimpleQueue of Task objects.
        bar_pos          -- tqdm bar row; also selects this worker's log color.
        remote_ffmpeg_path -- ffmpeg binary path on the remote host (None -> 'ffmpeg').
        """
        super().__init__()
        self._should_stop = False
        self._host = host
        self._gpu_id = gpu_id
        self._host_desc = f"{host}:gpu{gpu_id}"
        self._bar_pos = bar_pos
        self._remote_ffmpeg_path = remote_ffmpeg_path
        self._source_file = source_file
        self._task_queue = task_queue
        self._ffmpeg = None  # FFMPEGProc of the currently running encoder, if any
        self._bar = TqdmAbsolute(desc=self._host_desc, position=bar_pos)
        self._current_file = None  # basename of the segment being encoded (for logs)

    def _host_tag(self):
        """Return the worker's 'host:gpuN' label wrapped in its ANSI color."""
        c = HOST_COLORS[self._bar_pos % len(HOST_COLORS)]
        return f'{c}{self._host_desc}{RESET}'

    def stop(self):
        """Request shutdown: flag the loop, nudge the reader to quit, kill the encoder.

        Sends 'q' to the reader ffmpeg's stdin first (graceful quit), then
        terminates it if still alive; finally stops the encoder process.
        Safe to call from another thread (the stall watchdog uses it).
        """
        self._should_stop = True
        rp = getattr(self, '_reader_proc', None)
        if rp is not None and rp.poll() is None:
            try:
                if rp.stdin is not None:
                    rp.stdin.write(b'q')
                    rp.stdin.flush()
                    sleep(0.5)  # give ffmpeg a moment to exit on its own
            except (OSError, BrokenPipeError):
                pass  # reader already gone / pipe closed -- fall through to terminate
            try:
                if rp.poll() is None:
                    rp.terminate()
            except OSError:
                pass
        if self._ffmpeg:
            self._ffmpeg.stop()

    def run(self):
        """Main worker loop: drain the task queue until empty or stopped.

        Per task: build reader + encoder commands, wire reader.stdout ->
        encoder.stdin, run with a stall watchdog, then on failure delete the
        partial output and re-queue the task.  Exits when the queue raises
        Empty or stop() was requested.
        """
        # Single-element lists so the nested upd()/stall_watchdog() closures can mutate them.
        last_log = [0.0]  # mutable for progress heartbeat
        last_progress = [time()]  # for stall detection
        last_encoder_t = [0.0]  # last encoder progress position (sec) for stall log
        segment_start_time = [0.0]  # when current segment started
        first_progress_logged = [False]  # one-time "first frame" log per segment
        verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes') or environ.get('DISTRIBUTED_DEBUG', '').lower() in ('1', 'true', 'yes')

        def upd(frames, fps, t, duration, speed):
            """Progress callback from FFMPEGProc: feed the bar, heartbeat, and periodic log."""
            now = time()
            last_progress[0] = now  # any callback counts as progress for the watchdog
            last_encoder_t[0] = t
            self._bar.total = duration or 999  # placeholder total until duration is known
            self._bar.desc = self._host_desc + ': ' + (self._current_file or '')
            self._bar.update(t)
            if not first_progress_logged[0] and (frames > 0 or t > 0):
                elapsed = now - segment_start_time[0]
                tqdm.write(f'  {self._host_tag()}: {self._current_file} first frame after {elapsed:.1f}s', file=stderr)
                stderr.flush()
                first_progress_logged[0] = True
            # Heartbeat line at most every 30s so long segments show life in the log.
            if duration and duration > 0 and (now - last_log[0]) >= 30:
                tqdm.write(f'  {self._host_tag()}: {self._current_file} {t:.0f}s / {duration:.0f}s ({speed:.1f}x)', file=stderr)
                stderr.flush()
                last_log[0] = now

        try:
            while not self._should_stop:
                # Non-blocking get: an empty queue ends the worker (see except Empty below).
                task = self._task_queue.get(False)

                self._current_file = basename(task.output_file)
                last_progress[0] = time()
                segment_start_time[0] = last_progress[0]
                first_progress_logged[0] = False
                last_encoder_t[0] = 0.0
                n_frames = round(task.duration_sec * task.fps)
                # Default fast_seek=True: -ss before -i (low RAM, quick). Set READER_FAST_SEEK=0 for frame-accurate (high RAM, slow for late segments).
                fast_seek = environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no')
                tqdm.write(f'  {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s, n_frames={n_frames}, fast_seek={fast_seek})', file=stderr)
                stderr.flush()
                # READER_FAST_SEEK=1: -ss before -i (keyframe seek, low RAM, no decode from 0). Else -i then -ss (frame-accurate but decodes 0..start = high RAM for late segments).
                if fast_seek:
                    reader_cmd = [
                        'ffmpeg', '-ss', str(task.start_sec), '-noaccurate_seek', '-i', self._source_file,
                        '-frames:v', str(n_frames), '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
                    ]
                else:
                    reader_cmd = [
                        'ffmpeg', '-i', self._source_file,
                        '-ss', str(task.start_sec), '-frames:v', str(n_frames),
                        '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
                    ]
                ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
                # -r only: lock output to CFR; do not use -frames:v or segments get truncated when reader sends fewer frames
                encoder_cmd = [
                    ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
                    '-gpu', str(self._gpu_id),
                    '-r', str(task.fps),
                    *task.ffmpeg_args,
                    '-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov', 'pipe:1'
                ]
                if self._host == 'localhost' and sys_platform != 'win32':
                    # Be polite to the local machine: lowest CPU + idle-class I/O priority.
                    encoder_cmd = ['nice', '-n10', 'ionice', '-c3'] + encoder_cmd
                if self._host != 'localhost':
                    # Wrap as a single shell-quoted command executed over SSH (join is shlex.join).
                    encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]

                reader_stderr_lines = []
                if verbose:
                    # Capture reader stderr on a daemon thread so the pipe never blocks the reader.
                    self._reader_proc = Popen(reader_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)

                    def read_reader_stderr():
                        rp = getattr(self, '_reader_proc', None)
                        if rp and rp.stderr:
                            for line in iter(rp.stderr.readline, b''):
                                reader_stderr_lines.append(line.decode(errors='replace').rstrip())
                    Thread(target=read_reader_stderr, daemon=True).start()
                else:
                    self._reader_proc = Popen(reader_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL)
                ret = -1
                stall_done = [False]  # watchdog sets True when it kills

                def stall_watchdog():
                    """Kill this worker's pipeline if no encoder progress for SEGMENT_STALL_TIMEOUT sec."""
                    if SEGMENT_STALL_TIMEOUT <= 0:
                        return  # watchdog disabled
                    while not self._should_stop and not stall_done[0]:
                        sleep(60)  # coarse polling is fine; timeout is minutes-scale
                        if stall_done[0] or self._should_stop:
                            break
                        if time() - last_progress[0] > SEGMENT_STALL_TIMEOUT:
                            ago = time() - last_progress[0]
                            enc_t = last_encoder_t[0]
                            tqdm.write(f'  {self._host_tag()}: STALLED {self._current_file} (no progress {ago:.0f}s; last encoder at {enc_t:.1f}s into segment)', file=stderr)
                            if not fast_seek and task.start_sec > 60:
                                tqdm.write(f'  {self._host_tag()}: (reader -ss {task.start_sec:.0f} after -i may still be decoding 0..{task.start_sec:.0f}s from source)', file=stderr)
                            tqdm.write(f'  {self._host_tag()}: killing and re-queuing', file=stderr)
                            stderr.flush()
                            stall_done[0] = True
                            # NOTE(review): stop() sets _should_stop, so this worker exits after
                            # re-queuing instead of retrying -- confirm that is the intent.
                            self.stop()
                            break
                        proc = getattr(self._ffmpeg, '_proc', None) if self._ffmpeg else None
                        if proc is not None and proc.poll() is not None:
                            break  # encoder already finished; nothing left to watch

                watchdog = Thread(target=stall_watchdog, daemon=True)
                watchdog.start()
                try:
                    with open(task.output_file, 'wb') as outfile:
                        # Pipe reader.stdout straight into the encoder; encoder output goes to the segment file.
                        self._ffmpeg = FFMPEGProc(
                            encoder_cmd, stdin=self._reader_proc.stdout, stdout=outfile,
                            update_callback=upd, binary_io=True
                        )
                        ret = self._ffmpeg.run()
                finally:
                    # Always retire the watchdog and reap the reader, even on exceptions.
                    stall_done[0] = True
                    watchdog.join(timeout=2)
                    rp = getattr(self, '_reader_proc', None)
                    if rp is not None:
                        try:
                            rp.wait(timeout=2)
                        except TimeoutExpired:
                            rp.terminate()
                            rp.wait(timeout=5)
                        self._reader_proc = None

                if ret != 0:
                    # Failed segment: remove the partial file and put the task back for retry.
                    try:
                        if isfile(task.output_file):
                            unlink(task.output_file)
                    except OSError:
                        pass
                    print(f'  {self._host_tag()}: FAILED {self._current_file}', file=stderr, flush=True)
                    if self._ffmpeg.stderr:
                        print(self._ffmpeg.stderr, file=stderr, end='', flush=True)
                    if reader_stderr_lines:
                        tqdm.write(f'  {self._host_tag()}: Reader stderr:', file=stderr)
                        for line in reader_stderr_lines[-50:]:  # last 50 lines
                            tqdm.write(f'    {line}', file=stderr)
                        stderr.flush()
                    self._task_queue.put(task)
                else:
                    tqdm.write(f'  {self._host_tag()}: done {self._current_file}', file=stderr)
                    stderr.flush()
        except Empty:
            pass  # queue drained: normal end of this worker's work
        self._bar.close()
def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False, probe_host: str = None, probe_path: str = None, remote_ffmpeg_path: str = None):
    """Distributed encode: probe, segment at keyframes, fan out to workers, concat + mux.

    Pipeline (matches the [1/4]..[4/4] log prefixes):
      1. ffprobe keyframes/duration/fps (optionally over SSH via probe_host/probe_path),
      2. queue one Task per segment, skipping segments already valid on disk,
      3. run one TaskThread per (host, gpu) worker with stop hooks (SIGINT,
         'stop' on stdin, or a '.encode_stop' file),
      4. concat the encoded segments and mux back audio/subs/metadata from the input.

    Returns True on success, False on probe/segment/concat failure.
    NOTE(review): `resume` and `copy_input` are accepted but not referenced in
    this body (resume-style behavior is always on via the on-disk segment check);
    confirm whether they are vestigial.
    """
    input_file = abspath(expanduser(input_file))
    output_file = abspath(expanduser(output_file))
    # Deterministic per-input tmp dir so a re-run of the same input resumes automatically.
    tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
    try:
        mkdir(tmp_dir)
    except FileExistsError:
        pass  # previous job: resume and re-queue failed segments

    verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes')
    tqdm.write('[1/4] Probing keyframes and duration...', file=stderr)
    stderr.flush()
    keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
    total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
    fps = _get_fps(input_file, probe_host, probe_path)
    segments = _build_segments(keyframe_times, segment_seconds, total_duration)

    if not segments:
        tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
        return False

    MIN_SEGMENT_BYTES = 1024  # smaller = failed/corrupt segment, re-encode
    DURATION_TOLERANCE_SEC = 0.1  # same as check_files.py
    task_queue = SimpleQueue()
    removed = 0
    for i, (start_sec, end_sec) in enumerate(segments):
        duration_sec = end_sec - start_sec
        output_path = f'{tmp_dir}/{i:08d}.mp4'
        if isfile(output_path):
            # Resume path: keep a segment only if it is big enough AND probes to the right duration.
            try:
                if getsize(output_path) < MIN_SEGMENT_BYTES:
                    unlink(output_path)
                    removed += 1
                elif not _segment_valid(output_path, duration_sec, DURATION_TOLERANCE_SEC):
                    unlink(output_path)
                    removed += 1
                else:
                    continue  # valid segment already on disk -- no task needed
            except OSError:
                pass  # stat/unlink raced or failed: fall through and re-queue
        task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args), fps))

    n_tasks = task_queue.qsize()
    if removed:
        tqdm.write(f'[2/4] Removed {removed} bad segment(s) (ffprobe failed or duration mismatch), re-queued.', file=stderr)
    if n_tasks == 0:
        tqdm.write('All segments already done (resume).', file=stderr)
    else:
        tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)
    stderr.flush()
    dprint(f'Segments: {len(segments)} total, {n_tasks} tasks queued')

    tqdm.write(f'[3/4] Encoding segments on {len(workers)} worker(s)...', file=stderr)
    if environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no'):
        tqdm.write('  (Reader: fast seek -ss before -i; set READER_FAST_SEEK=0 for frame-accurate)', file=stderr)
    stderr.flush()
    # One thread per (host, gpu); enumerate position doubles as tqdm bar row and color index.
    threads = [TaskThread(host, gpu_id, input_file, task_queue, pos, remote_ffmpeg_path) for pos, (host, gpu_id) in enumerate(workers, 0)]

    def stop_all():
        """Stop every worker and abort the process (shared by SIGINT/stdin/stop-file paths)."""
        tqdm.write('Stopping all workers (killing ffmpeg/SSH on each host)...', file=stderr)
        stderr.flush()
        for thread in threads:
            thread.stop()
        for thread in threads:
            thread.join(timeout=5)
        exit(1)

    stop_file = abspath('.encode_stop')

    def stdin_stop_listener():
        """Daemon thread: typing 'stop' + Enter on an interactive stdin aborts the run."""
        try:
            while True:
                line = stdin.readline()
                if not line:
                    break  # stdin closed (EOF)
                if line.strip().lower() == 'stop':
                    stop_all()
                    break
        except (OSError, EOFError):
            pass

    def stop_file_poller():
        """Daemon thread: creating .encode_stop in the CWD aborts the run (works without a TTY)."""
        try:
            while True:
                sleep(1.5)
                if isfile(stop_file):
                    try:
                        unlink(stop_file)  # consume the marker so the next run isn't aborted
                    except OSError:
                        pass
                    stop_all()
                    break
        except OSError:
            pass

    def sigint(sig, stack):
        stop_all()

    try:
        signal(SIGINT, sigint)
    except (ValueError, OSError):
        pass  # SIGINT not available on this platform

    if getattr(stdin, 'isatty', lambda: False)():
        tqdm.write("To cancel: type 'stop' + Enter, or create .encode_stop (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
    else:
        tqdm.write("To cancel: create .encode_stop in current dir (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
    stderr.flush()
    # Only read stdin when interactive; the file poller works everywhere.
    if getattr(stdin, 'isatty', lambda: False)():
        Thread(target=stdin_stop_listener, daemon=True).start()
    Thread(target=stop_file_poller, daemon=True).start()

    for thread in threads:
        thread.start()
    try:
        for thread in threads:
            thread.join()
    except KeyboardInterrupt:
        stop_all()

    list_path = f'{tmp_dir}/output_segments.txt'
    segment_files = sorted(glob(f'{tmp_dir}/*.mp4'))  # zero-padded names keep glob order == segment order
    expected_count = len(segments)
    if len(segment_files) < expected_count:
        tqdm.write(f'[4/4] ERROR: Only {len(segment_files)} of {expected_count} segments produced. Missing segments (check for FAILED lines above; fix or exclude failing worker and re-run).', file=stderr)
        stderr.flush()
        return False

    # Concat-demuxer list file: one "file 'path'" line per segment.
    with open(list_path, 'w') as f:
        f.write('\n'.join([f"file '{fpath}'" for fpath in segment_files]))

    CONCAT_DURATION_TOLERANCE = 0.1
    segments_total = sum(_probe_duration(p) for p in segment_files)
    duration_diff = segments_total - total_duration
    if abs(duration_diff) > CONCAT_DURATION_TOLERANCE:
        # NOTE(review): when diff < -1.0 BOTH warnings below print; looks intentional
        # (extra emphasis for a clearly-short output) -- confirm.
        if duration_diff < -1.0:
            tqdm.write(f'[4/4] WARNING: Segment total ({segments_total:.2f}s) is {abs(duration_diff):.2f}s shorter than source ({total_duration:.2f}s). Proceeding with concat (output capped to source length).', file=stderr)
            stderr.flush()
            # exit(1) # re-enable for hard fail when segment setup is wrong
        tqdm.write(f'[4/4] WARNING: Segment total duration ({segments_total:.2f}s) differs from source ({total_duration:.2f}s) by {duration_diff:+.2f}s. Output will be capped to source length; it may not be a frame-accurate mirror.', file=stderr)
        stderr.flush()

    tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
    concat_extra = ['-stats_period', '5'] if verbose else []
    # Input 0 = original file (audio/subs/metadata source), input 1 = concat list (video).
    concat_cmd = [
        'ffmpeg', *concat_extra, '-i', input_file,
        '-f', 'concat', '-safe', '0', '-i', list_path,
        '-map_metadata', '0:g',
        '-map', '1:v', '-map', '0:a?', '-map', '0:s?',
        '-c:v', 'copy', '-c:s', 'copy',
        *split(concat_args),
        '-t', str(total_duration),
        '-y', output_file
    ]
    tqdm.write('Concat ffmpeg: ' + ' '.join(join([x]) for x in concat_cmd), file=stderr)
    with TqdmAbsolute(desc='concatenating output segments') as bar:
        def upd(frames, fps, time, duration, speed):
            bar.total = duration
            bar.update(time)
        ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
        if ffmpeg.run() != 0:
            tqdm.write(ffmpeg.stderr, file=stderr)
            return False
    unlink(list_path)

    if not keep_tmp:
        rmtree(tmp_dir)
    return True
def _parse_workers(host_specs):
|
||||
"""Parse list of 'host' or 'host:gpu' into [(host, gpu_id), ...]."""
|
||||
workers = []
|
||||
for spec in host_specs or []:
|
||||
spec = (spec or "").strip()
|
||||
if not spec:
|
||||
continue
|
||||
if ":" in spec:
|
||||
host, gpu = spec.rsplit(":", 1)
|
||||
try:
|
||||
workers.append((host.strip(), int(gpu.strip())))
|
||||
except ValueError:
|
||||
workers.append((spec, 0))
|
||||
else:
|
||||
workers.append((spec, 0))
|
||||
return workers
|
||||
|
||||
|
||||
if __name__ == '__main__':
    from argparse import ArgumentParser

    # CLI definition as a (flags, options) table so all arguments live in one place.
    _CLI_ARGS = [
        (('input_file',), dict(help='File to encode.')),
        (('output_file',), dict(help='Path to encoded output file.')),
        (('remote_args',), dict(help='Arguments to pass to the remote ffmpeg instances (e.g. NVENC: -c:v hevc_nvenc -preset p7 ...). -gpu is added per worker.')),
        (('concat_args',), dict(default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata.')),
        (('-s', '--segment-length'), dict(type=float, default=10, help='Segment length in seconds.')),
        (('-H', '--host'), dict(action='append', required=True, help='Worker as host or host:gpu (e.g. -H Pyro:0 -H RenderScrap:0 -H RenderScrap:1).')),
        (('-k', '--keep-tmp'), dict(action='store_true', help='Keep temporary segment files instead of deleting them on successful exit.')),
        (('-r', '--resume'), dict(action='store_true', help="Don't split the input file again, keep existing segments and only process the missing ones.")),
        (('-t', '--tmp-dir'), dict(default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')),
        (('-c', '--copy-input'), dict(action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')),
        (('-P', '--probe-host'), dict(default=None, help='SSH host to run ffprobe on (file must be at --probe-path there). Speeds up [1/4] when input is on slow/NAS path.')),
        (('--probe-path',), dict(default=None, help='Path to input file as seen on --probe-host (required if -P set).')),
    ]

    parser = ArgumentParser(description='Splits a file into segments and processes them on multiple workers (host:gpu) in parallel using ffmpeg over SSH.')
    for _flags, _opts in _CLI_ARGS:
        parser.add_argument(*_flags, **_opts)
    args = parser.parse_args()

    workers = _parse_workers(args.host)
    if not workers:
        parser.error('At least one worker required (e.g. -H Pyro:0 -H RenderScrap:0)')
    encode(
        workers,
        args.input_file,
        args.output_file,
        segment_seconds=args.segment_length,
        remote_args=args.remote_args,
        concat_args=args.concat_args,
        tmp_dir=args.tmp_dir,
        keep_tmp=args.keep_tmp,
        resume=args.resume,
        copy_input=args.copy_input,
        probe_host=args.probe_host,
        probe_path=args.probe_path
    )
@@ -1,2 +0,0 @@
|
||||
# For distributed mode (encode_VOD_pyro.py -d): ffmpeg_distributed uses tqdm
|
||||
tqdm>=4.0.0
|
||||
Reference in New Issue
Block a user