ffmpeg dist working with av1
This commit is contained in:
File diff suppressed because one or more lines are too long
+22
-5
@@ -1,3 +1,4 @@
|
|||||||
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
@@ -9,10 +10,10 @@ from datetime import datetime
|
|||||||
import shutil
|
import shutil
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# Distributed encode defaults (AV1 CQ 0, maxrate 9000k; override via DISTRIBUTED_REMOTE_ARGS / DISTRIBUTED_HOSTS).
|
# Distributed encode defaults (AV1 libaom: crf 0, maxrate 9000k; -cpu-used 8 = faster, -threads 0 = use all cores; override via DISTRIBUTED_REMOTE_ARGS).
|
||||||
# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux.
|
# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux.
|
||||||
DISTRIBUTED_HOSTS_DEFAULT = ["PostIrony", "Pyro", "RenderScrap", "root@GuiltsCurse", "root@Godzilla"]
|
DISTRIBUTED_HOSTS_DEFAULT = ["Pyro", "RenderScrap", "root@GuiltsCurse", "PostIrony", "root@Godzilla"]
|
||||||
DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v libsvtav1 -crf 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -an"
|
DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v libaom-av1 -crf 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -cpu-used 8 -threads 0 -an"
|
||||||
DISTRIBUTED_SEGMENT_SECONDS = 60
|
DISTRIBUTED_SEGMENT_SECONDS = 60
|
||||||
|
|
||||||
# ANSI color codes
|
# ANSI color codes
|
||||||
@@ -342,8 +343,9 @@ def encode_dvr(input_file, output_dir, gpu):
|
|||||||
f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}")
|
f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}")
|
||||||
|
|
||||||
|
|
||||||
def encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=60, remote_args=None, concat_args="-c:a copy"):
|
def encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=60, remote_args=None, concat_args="-c:a copy", probe_host=None, probe_path=None, remote_ffmpeg_path=None):
|
||||||
"""Encode one file using ffmpeg_distributed (split -> farm -> concat). CWD is set to output_dir for temp files."""
|
"""Encode one file using ffmpeg_distributed (split -> farm -> concat). Segment temp dirs go under script dir/tmp/.
|
||||||
|
If probe_host and probe_path are set, ffprobe runs there (faster when input is on NAS)."""
|
||||||
input_path = Path(input_file).resolve()
|
input_path = Path(input_file).resolve()
|
||||||
output_path = (Path(output_dir) / f"{input_path.stem}{input_path.suffix}").resolve()
|
output_path = (Path(output_dir) / f"{input_path.stem}{input_path.suffix}").resolve()
|
||||||
if output_path.exists():
|
if output_path.exists():
|
||||||
@@ -351,12 +353,23 @@ def encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=60, re
|
|||||||
print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
|
print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
|
||||||
return
|
return
|
||||||
remote_args = remote_args or os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT)
|
remote_args = remote_args or os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT)
|
||||||
|
probe_host = probe_host or os.environ.get("PROBE_HOST")
|
||||||
|
if probe_path is None and probe_host and os.environ.get("PROBE_PATH_PREFIX"):
|
||||||
|
prefix = os.environ.get("PROBE_PATH_PREFIX", "").rstrip("/")
|
||||||
|
probe_path = f"{prefix}/{input_path.name}"
|
||||||
|
script_dir = Path(__file__).resolve().parent
|
||||||
|
tmp_base = script_dir / "tmp"
|
||||||
|
tmp_base.mkdir(exist_ok=True)
|
||||||
|
path_for_hash = os.path.abspath(os.path.expanduser(str(input_path)))
|
||||||
|
segment_hash = hashlib.md5(path_for_hash.encode()).hexdigest()
|
||||||
|
tmp_dir = str(tmp_base / f"ffmpeg_segments_{segment_hash}")
|
||||||
cwd = os.getcwd()
|
cwd = os.getcwd()
|
||||||
try:
|
try:
|
||||||
os.chdir(output_dir)
|
os.chdir(output_dir)
|
||||||
from ffmpeg_distributed import encode as distributed_encode
|
from ffmpeg_distributed import encode as distributed_encode
|
||||||
safe_log_info(f"Distributed encode: {input_path} -> {output_path} (hosts: {hosts})")
|
safe_log_info(f"Distributed encode: {input_path} -> {output_path} (hosts: {hosts})")
|
||||||
print(f"{Colors.BLUE}Distributed encode (AV1): {input_path.name}{Colors.ENDC}")
|
print(f"{Colors.BLUE}Distributed encode (AV1): {input_path.name}{Colors.ENDC}")
|
||||||
|
remote_ffmpeg = remote_ffmpeg_path or os.environ.get("DISTRIBUTED_REMOTE_FFMPEG_PATH")
|
||||||
distributed_encode(
|
distributed_encode(
|
||||||
hosts,
|
hosts,
|
||||||
str(input_path),
|
str(input_path),
|
||||||
@@ -364,6 +377,10 @@ def encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=60, re
|
|||||||
segment_seconds=segment_seconds,
|
segment_seconds=segment_seconds,
|
||||||
remote_args=remote_args,
|
remote_args=remote_args,
|
||||||
concat_args=concat_args,
|
concat_args=concat_args,
|
||||||
|
tmp_dir=tmp_dir,
|
||||||
|
probe_host=probe_host,
|
||||||
|
probe_path=probe_path,
|
||||||
|
remote_ffmpeg_path=remote_ffmpeg,
|
||||||
)
|
)
|
||||||
if output_path.exists():
|
if output_path.exists():
|
||||||
safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}")
|
safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}")
|
||||||
|
|||||||
+259
-82
@@ -7,10 +7,10 @@ from glob import glob
|
|||||||
from os import mkdir, unlink, listdir, environ
|
from os import mkdir, unlink, listdir, environ
|
||||||
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
|
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from time import sleep
|
from time import sleep, time
|
||||||
from typing import List, NamedTuple, Callable, Union
|
from typing import List, NamedTuple, Callable, Union, Tuple
|
||||||
from signal import signal, SIGINT
|
from signal import signal, SIGINT
|
||||||
from sys import exit, stderr
|
from sys import exit, stderr, platform as sys_platform
|
||||||
from shlex import split, join
|
from shlex import split, join
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from time import strptime
|
from time import strptime
|
||||||
@@ -30,10 +30,114 @@ def _popen(args, **kwargs):
|
|||||||
kwargs['universal_newlines'] = True
|
kwargs['universal_newlines'] = True
|
||||||
return Popen(args, **kwargs)
|
return Popen(args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]:
    """Return the sorted keyframe timestamps (seconds) of the first video stream.

    ffprobe is run with ``-skip_frame nokey`` so only keyframes are reported.
    When both ``probe_host`` and ``probe_path`` are given, the same ffprobe
    command is executed on that host through ``ssh`` against ``probe_path``;
    otherwise it runs locally against ``input_file``.

    Args:
        input_file: Path of the media file as seen by this machine.
        probe_host: Optional SSH host on which to run ffprobe.
        probe_path: Path of the same file as seen on ``probe_host``.

    Returns:
        Ascending list of finite timestamps. Empty when ffprobe (or ssh)
        cannot be started or prints nothing parseable.
    """
    remote = bool(probe_host and probe_path)
    cmd = [
        'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-skip_frame', 'nokey',
        '-show_entries', 'frame=pts_time', '-of', 'csv=p=0',
        probe_path if remote else input_file,
    ]
    if remote:
        # ssh takes one remote command string; shlex.join quotes each argument.
        cmd = ['ssh', probe_host, join(cmd)]

    try:
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
        out, err = proc.communicate()
    except OSError as exc:
        # Executable missing / not runnable: report it and return no keyframes
        # instead of crashing the whole job.
        dprint('ffprobe keyframes:', str(exc))
        return []
    if proc.returncode != 0 and err:
        dprint('ffprobe keyframes:', err)

    times = []
    for line in (out or '').splitlines():
        # A CSV row may carry extra fields or a trailing comma ("12.345,");
        # the timestamp is the first field. Also accept "key=value" output.
        field = line.strip().split(',')[0]
        if '=' in field:
            field = field.split('=')[-1]
        field = field.strip()
        if not field:
            continue
        try:
            value = float(field)
        except ValueError:
            continue  # e.g. "N/A"
        # Chained comparison is False for NaN, so this drops NaN and +/-inf.
        if float('-inf') < value < float('inf'):
            times.append(value)
    return sorted(times)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: str = None) -> float:
|
||||||
|
"""Get container duration in seconds via ffprobe (header-only, fast). If probe_host set, run there."""
|
||||||
|
path = probe_path if (probe_host and probe_path) else input_file
|
||||||
|
cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path]
|
||||||
|
if probe_host and probe_path:
|
||||||
|
cmd = ['ssh', probe_host, join(cmd)]
|
||||||
|
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
|
||||||
|
out, _ = proc.communicate()
|
||||||
|
out = (out or '').strip()
|
||||||
|
try:
|
||||||
|
if out:
|
||||||
|
return float(out)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', path]
|
||||||
|
if probe_host and probe_path:
|
||||||
|
cmd = ['ssh', probe_host, join(cmd)]
|
||||||
|
proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
|
||||||
|
out, _ = proc.communicate()
|
||||||
|
out = (out or '').strip()
|
||||||
|
try:
|
||||||
|
if out:
|
||||||
|
return float(out)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool:
|
||||||
|
"""Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py)."""
|
||||||
|
proc = Popen(
|
||||||
|
['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', segment_path],
|
||||||
|
stdout=PIPE, stderr=PIPE, universal_newlines=True
|
||||||
|
)
|
||||||
|
out, _ = proc.communicate()
|
||||||
|
if proc.returncode != 0:
|
||||||
|
return False
|
||||||
|
out = (out or '').strip()
|
||||||
|
try:
|
||||||
|
duration = float(out)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
return abs(duration - expected_duration_sec) <= tolerance_sec
|
||||||
|
|
||||||
|
|
||||||
|
def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:
|
||||||
|
"""Build (start_sec, end_sec) segments at keyframe boundaries. When keyframe probe fails, use fixed segment_seconds."""
|
||||||
|
if total_duration <= 0:
|
||||||
|
return []
|
||||||
|
if not keyframe_times or len(keyframe_times) == 1:
|
||||||
|
# No keyframes: split at fixed intervals so we still get multiple segments
|
||||||
|
segments = []
|
||||||
|
start = 0.0
|
||||||
|
while start < total_duration:
|
||||||
|
end = min(start + segment_seconds, total_duration)
|
||||||
|
segments.append((start, end))
|
||||||
|
start = end
|
||||||
|
return segments
|
||||||
|
segments = []
|
||||||
|
i = 0
|
||||||
|
while i < len(keyframe_times):
|
||||||
|
start = keyframe_times[i]
|
||||||
|
end_target = start + segment_seconds
|
||||||
|
j = i + 1
|
||||||
|
while j < len(keyframe_times) and keyframe_times[j] < end_target:
|
||||||
|
j += 1
|
||||||
|
if j < len(keyframe_times):
|
||||||
|
end = keyframe_times[j]
|
||||||
|
else:
|
||||||
|
end = total_duration
|
||||||
|
segments.append((start, end))
|
||||||
|
i = j
|
||||||
|
return segments
|
||||||
|
|
||||||
|
|
||||||
class Task(NamedTuple):
|
class Task(NamedTuple):
|
||||||
input_file: str
|
start_sec: float
|
||||||
|
duration_sec: float
|
||||||
output_file: str
|
output_file: str
|
||||||
ffmpeg_args: List[str] = []
|
ffmpeg_args: List[str]
|
||||||
|
|
||||||
class FFMPEGProc:
|
class FFMPEGProc:
|
||||||
_duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
|
_duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
|
||||||
@@ -43,7 +147,7 @@ class FFMPEGProc:
|
|||||||
def _match_to_sec(match):
|
def _match_to_sec(match):
|
||||||
return int(match.group('time_h'))*3600+int(match.group('time_m'))*60+float(match.group('time_s'))
|
return int(match.group('time_h'))*3600+int(match.group('time_m'))*60+float(match.group('time_s'))
|
||||||
|
|
||||||
def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None):
|
def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None, binary_io=False, echo_stderr=False):
|
||||||
self._cmd = cmd
|
self._cmd = cmd
|
||||||
self._update_callback = update_callback
|
self._update_callback = update_callback
|
||||||
self._should_stop = False
|
self._should_stop = False
|
||||||
@@ -51,6 +155,8 @@ class FFMPEGProc:
|
|||||||
self._duration = None
|
self._duration = None
|
||||||
self._stdin = stdin
|
self._stdin = stdin
|
||||||
self._stdout = stdout
|
self._stdout = stdout
|
||||||
|
self._binary_io = binary_io
|
||||||
|
self._echo_stderr = echo_stderr
|
||||||
self.stderr = ''
|
self.stderr = ''
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
@@ -59,11 +165,14 @@ class FFMPEGProc:
|
|||||||
def _read_stderr_loop(self, stderr_lines: list):
|
def _read_stderr_loop(self, stderr_lines: list):
|
||||||
"""Read stderr in a loop (used on Windows where select.poll is unavailable)."""
|
"""Read stderr in a loop (used on Windows where select.poll is unavailable)."""
|
||||||
while True:
|
while True:
|
||||||
line = self._proc.stderr.readline()
|
raw = self._proc.stderr.readline()
|
||||||
if not line and self._proc.poll() is not None:
|
if not raw and self._proc.poll() is not None:
|
||||||
break
|
break
|
||||||
if line:
|
if raw:
|
||||||
|
line = raw.decode(errors='replace') if self._binary_io else raw
|
||||||
stderr_lines.append(line)
|
stderr_lines.append(line)
|
||||||
|
if self._echo_stderr:
|
||||||
|
tqdm.write(line.rstrip(), file=stderr)
|
||||||
match = self._progress_re.match(line)
|
match = self._progress_re.match(line)
|
||||||
if match and self._update_callback:
|
if match and self._update_callback:
|
||||||
self._update_callback(
|
self._update_callback(
|
||||||
@@ -79,7 +188,10 @@ class FFMPEGProc:
|
|||||||
self._duration = self._match_to_sec(dm)
|
self._duration = self._match_to_sec(dm)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self._proc = Popen(self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout, universal_newlines=True)
|
self._proc = Popen(
|
||||||
|
self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout,
|
||||||
|
universal_newlines=not self._binary_io
|
||||||
|
)
|
||||||
if getattr(select, 'poll', None):
|
if getattr(select, 'poll', None):
|
||||||
poll = select.poll()
|
poll = select.poll()
|
||||||
poll.register(self._proc.stderr)
|
poll.register(self._proc.stderr)
|
||||||
@@ -88,7 +200,10 @@ class FFMPEGProc:
|
|||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
continue
|
continue
|
||||||
sleep(0.001)
|
sleep(0.001)
|
||||||
line = self._proc.stderr.readline()
|
raw = self._proc.stderr.readline()
|
||||||
|
line = raw.decode(errors='replace') if self._binary_io and raw else (raw or '')
|
||||||
|
if self._echo_stderr and line:
|
||||||
|
tqdm.write(line.rstrip(), file=stderr)
|
||||||
match = self._progress_re.match(line)
|
match = self._progress_re.match(line)
|
||||||
if not match:
|
if not match:
|
||||||
self.stderr += line
|
self.stderr += line
|
||||||
@@ -106,6 +221,8 @@ class FFMPEGProc:
|
|||||||
self._duration = self._match_to_sec(match)
|
self._duration = self._match_to_sec(match)
|
||||||
try:
|
try:
|
||||||
_, err = self._proc.communicate(timeout=1)
|
_, err = self._proc.communicate(timeout=1)
|
||||||
|
if err and self._binary_io:
|
||||||
|
err = err.decode(errors='replace') if isinstance(err, bytes) else err
|
||||||
self.stderr += err or ''
|
self.stderr += err or ''
|
||||||
except TimeoutExpired:
|
except TimeoutExpired:
|
||||||
pass
|
pass
|
||||||
@@ -131,94 +248,150 @@ class TqdmAbsolute(tqdm):
|
|||||||
def update(self, to):
|
def update(self, to):
|
||||||
super().update(to - self.n) # will also set self.n = b * bsize
|
super().update(to - self.n) # will also set self.n = b * bsize
|
||||||
|
|
||||||
|
HOST_COLORS = ['\033[94m', '\033[92m', '\033[93m', '\033[95m', '\033[96m', '\033[91m'] # blue, green, yellow, magenta, cyan, red
|
||||||
|
RESET = '\033[0m'
|
||||||
|
|
||||||
class TaskThread(Thread):
|
class TaskThread(Thread):
|
||||||
def __init__(self, host: str, task_queue: SimpleQueue, bar_pos):
|
def __init__(self, host: str, source_file: str, task_queue: SimpleQueue, bar_pos: int, remote_ffmpeg_path: str = None):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self._should_stop = False
|
self._should_stop = False
|
||||||
self._host = host
|
self._host = host
|
||||||
|
self._bar_pos = bar_pos
|
||||||
|
self._remote_ffmpeg_path = remote_ffmpeg_path
|
||||||
|
self._source_file = source_file
|
||||||
self._task_queue = task_queue
|
self._task_queue = task_queue
|
||||||
self._ffmpeg = None
|
self._ffmpeg = None
|
||||||
self._bar = TqdmAbsolute(desc=host, position=bar_pos)
|
self._bar = TqdmAbsolute(desc=host, position=bar_pos)
|
||||||
self._current_file = None
|
self._current_file = None
|
||||||
|
|
||||||
|
def _host_tag(self):
|
||||||
|
c = HOST_COLORS[self._bar_pos % len(HOST_COLORS)]
|
||||||
|
return f'{c}{self._host}{RESET}'
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._should_stop = True
|
self._should_stop = True
|
||||||
if self._ffmpeg:
|
if self._ffmpeg:
|
||||||
self._ffmpeg.stop()
|
self._ffmpeg.stop()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
def upd(frames, fps, time, duration, speed):
|
last_log = [0.0] # mutable for progress heartbeat
|
||||||
|
def upd(frames, fps, t, duration, speed):
|
||||||
self._bar.total = duration or 999
|
self._bar.total = duration or 999
|
||||||
self._bar.desc = self._host + ': ' + self._current_file
|
self._bar.desc = self._host + ': ' + (self._current_file or '')
|
||||||
self._bar.update(time)
|
self._bar.update(t)
|
||||||
|
if duration and duration > 0 and (time() - last_log[0]) >= 30:
|
||||||
|
tqdm.write(f' {self._host_tag()}: {self._current_file} {t:.0f}s / {duration:.0f}s ({speed:.1f}x)', file=stderr)
|
||||||
|
stderr.flush()
|
||||||
|
last_log[0] = time()
|
||||||
try:
|
try:
|
||||||
while not self._should_stop:
|
while not self._should_stop:
|
||||||
task = self._task_queue.get(False)
|
task = self._task_queue.get(False)
|
||||||
|
|
||||||
self._current_file = basename(task.input_file)
|
self._current_file = basename(task.output_file)
|
||||||
with open(task.input_file, 'r') as infile, open(task.output_file, 'w') as outfile:
|
tqdm.write(f' {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s)', file=stderr)
|
||||||
ffmpeg_cmd = [
|
stderr.flush()
|
||||||
'nice', '-n10', 'ionice', '-c3',
|
reader_cmd = [
|
||||||
'ffmpeg', '-f', 'matroska', '-i', 'pipe:',
|
'ffmpeg', '-ss', str(task.start_sec), '-t', str(task.duration_sec),
|
||||||
*task.ffmpeg_args,
|
'-i', self._source_file, '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
|
||||||
'-f', 'matroska', 'pipe:'
|
]
|
||||||
]
|
ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
|
||||||
if self._host != 'localhost':
|
encoder_cmd = [
|
||||||
ffmpeg_cmd = ['ssh', self._host, join(ffmpeg_cmd)]
|
ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
|
||||||
self._ffmpeg = FFMPEGProc(ffmpeg_cmd, stdin=infile, stdout=outfile, update_callback=upd)
|
*task.ffmpeg_args,
|
||||||
|
'-f', 'mpegts', 'pipe:1'
|
||||||
|
]
|
||||||
|
if self._host == 'localhost' and sys_platform != 'win32':
|
||||||
|
encoder_cmd = ['nice', '-n10', 'ionice', '-c3'] + encoder_cmd
|
||||||
|
if self._host != 'localhost':
|
||||||
|
encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]
|
||||||
|
|
||||||
ret = self._ffmpeg.run()
|
reader_proc = Popen(reader_cmd, stdout=PIPE, stderr=DEVNULL)
|
||||||
if ret != 0:
|
ret = -1
|
||||||
tqdm.write(f'task for {self._current_file} failed on host {self._host}', file=stderr)
|
try:
|
||||||
tqdm.write(self._ffmpeg.stderr, file=stderr)
|
with open(task.output_file, 'wb') as outfile:
|
||||||
self._task_queue.put(task)
|
self._ffmpeg = FFMPEGProc(
|
||||||
|
encoder_cmd, stdin=reader_proc.stdout, stdout=outfile,
|
||||||
|
update_callback=upd, binary_io=True
|
||||||
|
)
|
||||||
|
ret = self._ffmpeg.run()
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
reader_proc.wait(timeout=2)
|
||||||
|
except TimeoutExpired:
|
||||||
|
reader_proc.terminate()
|
||||||
|
reader_proc.wait(timeout=5)
|
||||||
|
|
||||||
|
if ret != 0:
|
||||||
|
try:
|
||||||
|
if isfile(task.output_file):
|
||||||
|
unlink(task.output_file)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
print(f' {self._host_tag()}: FAILED {self._current_file}', file=stderr, flush=True)
|
||||||
|
if self._ffmpeg.stderr:
|
||||||
|
print(self._ffmpeg.stderr, file=stderr, end='', flush=True)
|
||||||
|
self._task_queue.put(task)
|
||||||
|
else:
|
||||||
|
tqdm.write(f' {self._host_tag()}: done {self._current_file}', file=stderr)
|
||||||
|
stderr.flush()
|
||||||
except Empty:
|
except Empty:
|
||||||
pass
|
pass
|
||||||
self._bar.close()
|
self._bar.close()
|
||||||
|
|
||||||
def encode(hosts: List[str], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False):
|
def encode(hosts: List[str], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False, probe_host: str = None, probe_path: str = None, remote_ffmpeg_path: str = None):
|
||||||
input_file = abspath(expanduser(input_file))
|
input_file = abspath(expanduser(input_file))
|
||||||
output_file = abspath(expanduser(output_file))
|
output_file = abspath(expanduser(output_file))
|
||||||
tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
|
tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
|
||||||
tmp_in = f'{tmp_dir}/in'
|
|
||||||
tmp_out = f'{tmp_dir}/out'
|
|
||||||
try:
|
try:
|
||||||
mkdir(tmp_dir)
|
mkdir(tmp_dir)
|
||||||
mkdir(tmp_in)
|
|
||||||
mkdir(tmp_out)
|
|
||||||
except FileExistsError:
|
except FileExistsError:
|
||||||
if not resume:
|
pass # previous job: resume and re-queue failed segments
|
||||||
raise
|
|
||||||
|
|
||||||
# skip splitting on resume
|
verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes')
|
||||||
if len(listdir(tmp_in)) == 0 or not resume:
|
tqdm.write('[1/4] Probing keyframes and duration...', file=stderr)
|
||||||
cv = ['copy'] if copy_input else ['libx264', '-crf', '0', '-preset', 'ultrafast', '-bf', '0']
|
stderr.flush()
|
||||||
with TqdmAbsolute(desc="splitting input file") as bar:
|
keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
|
||||||
def upd(frames, fps, time, duration, speed):
|
total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
|
||||||
bar.total = duration
|
segments = _build_segments(keyframe_times, segment_seconds, total_duration)
|
||||||
bar.update(time)
|
|
||||||
ffmpeg = FFMPEGProc([
|
|
||||||
'ffmpeg', '-i', expanduser(input_file),
|
|
||||||
'-an', '-sn',
|
|
||||||
'-c:v', *cv,
|
|
||||||
'-f', 'segment', '-reset_timestamps', '1', '-segment_time', str(segment_seconds) + 's',
|
|
||||||
tmp_in + '/%08d.mkv'
|
|
||||||
],
|
|
||||||
update_callback=upd
|
|
||||||
)
|
|
||||||
ret = ffmpeg.run()
|
|
||||||
if ret != 0:
|
|
||||||
tqdm.write(ffmpeg.stderr, file=stderr)
|
|
||||||
return
|
|
||||||
|
|
||||||
|
if not segments:
|
||||||
|
tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
|
||||||
|
return
|
||||||
|
|
||||||
|
MIN_SEGMENT_BYTES = 1024 # smaller = failed/corrupt segment, re-encode
|
||||||
|
DURATION_TOLERANCE_SEC = 0.1 # same as check_files.py
|
||||||
task_queue = SimpleQueue()
|
task_queue = SimpleQueue()
|
||||||
for f in sorted(glob(tmp_in+'/*')):
|
removed = 0
|
||||||
output_segment = tmp_out+f'/{basename(f)}'
|
for i, (start_sec, end_sec) in enumerate(segments):
|
||||||
# skip already encoded segments
|
duration_sec = end_sec - start_sec
|
||||||
if not isfile(output_segment):
|
output_path = f'{tmp_dir}/{i:08d}.ts'
|
||||||
task_queue.put(Task(f, output_segment, split(remote_args)))
|
if isfile(output_path):
|
||||||
|
try:
|
||||||
|
if getsize(output_path) < MIN_SEGMENT_BYTES:
|
||||||
|
unlink(output_path)
|
||||||
|
removed += 1
|
||||||
|
elif not _segment_valid(output_path, duration_sec, DURATION_TOLERANCE_SEC):
|
||||||
|
unlink(output_path)
|
||||||
|
removed += 1
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args)))
|
||||||
|
|
||||||
threads = [TaskThread(host, task_queue, pos) for pos,host in enumerate(hosts,0)]
|
n_tasks = task_queue.qsize()
|
||||||
|
if removed:
|
||||||
|
tqdm.write(f'[2/4] Removed {removed} bad segment(s) (ffprobe failed or duration mismatch), re-queued.', file=stderr)
|
||||||
|
if n_tasks == 0:
|
||||||
|
tqdm.write('All segments already done (resume).', file=stderr)
|
||||||
|
else:
|
||||||
|
tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)
|
||||||
|
stderr.flush()
|
||||||
|
dprint(f'Segments: {len(segments)} total, {n_tasks} tasks queued')
|
||||||
|
|
||||||
|
tqdm.write(f'[3/4] Encoding segments on {len(hosts)} host(s)...', file=stderr)
|
||||||
|
stderr.flush()
|
||||||
|
threads = [TaskThread(host, input_file, task_queue, pos, remote_ffmpeg_path) for pos, host in enumerate(hosts, 0)]
|
||||||
|
|
||||||
def sigint(sig, stack):
|
def sigint(sig, stack):
|
||||||
print('Got SIGINT, stopping...')
|
print('Got SIGINT, stopping...')
|
||||||
@@ -235,31 +408,31 @@ def encode(hosts: List[str], input_file: str, output_file: str, segment_seconds:
|
|||||||
for thread in threads:
|
for thread in threads:
|
||||||
thread.join()
|
thread.join()
|
||||||
|
|
||||||
with open('output_segments.txt', 'w') as f:
|
list_path = f'{tmp_dir}/output_segments.txt'
|
||||||
f.write('\n'.join([f"file '{file}'" for file in sorted(glob(tmp_out+'/*'))]))
|
with open(list_path, 'w') as f:
|
||||||
|
f.write('\n'.join([f"file '{fpath}'" for fpath in sorted(glob(f'{tmp_dir}/*.ts'))]))
|
||||||
|
|
||||||
|
tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
|
||||||
|
concat_extra = ['-stats_period', '5'] if verbose else []
|
||||||
|
concat_cmd = [
|
||||||
|
'ffmpeg', *concat_extra, '-i', input_file,
|
||||||
|
'-f', 'concat', '-safe', '0', '-i', list_path,
|
||||||
|
'-map_metadata', '0:g',
|
||||||
|
'-map', '1:v', '-map', '0:a?', '-map', '0:s?',
|
||||||
|
'-c:v', 'copy', '-c:s', 'copy',
|
||||||
|
*split(concat_args),
|
||||||
|
'-y', output_file
|
||||||
|
]
|
||||||
|
tqdm.write('Concat ffmpeg: ' + ' '.join(join([x]) for x in concat_cmd), file=stderr)
|
||||||
with TqdmAbsolute(desc='concatenating output segments') as bar:
|
with TqdmAbsolute(desc='concatenating output segments') as bar:
|
||||||
def upd(frames, fps, time, duration, speed):
|
def upd(frames, fps, time, duration, speed):
|
||||||
bar.total = duration
|
bar.total = duration
|
||||||
bar.update(time)
|
bar.update(time)
|
||||||
ffmpeg = FFMPEGProc([
|
ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
|
||||||
'ffmpeg', '-i', input_file,
|
|
||||||
'-f', 'concat', '-safe', '0', '-i', 'output_segments.txt',
|
|
||||||
'-map_metadata', '0:g',
|
|
||||||
'-map', '1:v',
|
|
||||||
'-map', '0:a?',
|
|
||||||
'-map', '0:s?',
|
|
||||||
'-c:v', 'copy',
|
|
||||||
'-c:s', 'copy',
|
|
||||||
*split(concat_args),
|
|
||||||
'-y', output_file
|
|
||||||
],
|
|
||||||
update_callback=upd
|
|
||||||
)
|
|
||||||
if ffmpeg.run() != 0:
|
if ffmpeg.run() != 0:
|
||||||
tqdm.write(ffmpeg.stderr, file=stderr)
|
tqdm.write(ffmpeg.stderr, file=stderr)
|
||||||
return
|
return
|
||||||
unlink('output_segments.txt')
|
unlink(list_path)
|
||||||
|
|
||||||
if not keep_tmp:
|
if not keep_tmp:
|
||||||
rmtree(tmp_dir)
|
rmtree(tmp_dir)
|
||||||
@@ -277,6 +450,8 @@ if __name__ == '__main__':
|
|||||||
parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
|
parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
|
||||||
parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
|
parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
|
||||||
parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
|
parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
|
||||||
|
parser.add_argument('-P', '--probe-host', default=None, help='SSH host to run ffprobe on (file must be at --probe-path there). Speeds up [1/4] when input is on slow/NAS path.')
|
||||||
|
parser.add_argument('--probe-path', default=None, help='Path to input file as seen on --probe-host (required if -P set).')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
encode(
|
encode(
|
||||||
args.host,
|
args.host,
|
||||||
@@ -288,5 +463,7 @@ if __name__ == '__main__':
|
|||||||
tmp_dir=args.tmp_dir,
|
tmp_dir=args.tmp_dir,
|
||||||
keep_tmp=args.keep_tmp,
|
keep_tmp=args.keep_tmp,
|
||||||
resume=args.resume,
|
resume=args.resume,
|
||||||
copy_input=args.copy_input
|
copy_input=args.copy_input,
|
||||||
|
probe_host=args.probe_host,
|
||||||
|
probe_path=args.probe_path
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user