Compare commits
2 Commits
98a00caafc
...
57f95fc74d
| Author | SHA1 | Date | |
|---|---|---|---|
| 57f95fc74d | |||
| 66882233b0 |
+2562
-9
File diff suppressed because one or more lines are too long
+61
-5
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
import json
|
||||
@@ -78,7 +79,7 @@ def get_file_info(input_file):
|
||||
cmd = [
|
||||
'ffprobe',
|
||||
'-v', 'error',
|
||||
'-show_entries', 'format=duration,size:stream=codec_name,width,height,r_frame_rate,channels,channel_layout',
|
||||
'-show_entries', 'format=duration,size:stream=codec_type,codec_name,width,height,r_frame_rate,channels,channel_layout',
|
||||
'-of', 'json',
|
||||
input_file
|
||||
]
|
||||
@@ -102,6 +103,18 @@ def get_audio_labels(input_file):
|
||||
labels.append(title)
|
||||
return labels
|
||||
|
||||
def parse_fps(r_frame_rate):
    """Parse an ffprobe ``r_frame_rate`` value into frames per second.

    Accepts a rational such as ``'60/1'`` or ``'30000/1001'`` as well as a
    plain number such as ``'25'``.

    Args:
        r_frame_rate: The raw value from the probe output (any type; it is
            converted with ``str``).

    Returns:
        The frame rate as a float, or ``None`` when the value is empty,
        malformed, or a rational with a zero denominator (e.g. ``'0/0'``).
    """
    if not r_frame_rate:
        return None
    numerator, slash, denominator = str(r_frame_rate).strip().partition('/')
    try:
        if not slash:
            return float(numerator)
        divisor = float(denominator)
        if divisor == 0:
            # A rate of N/0 is undefined; do not pretend it is N fps.
            return None
        return float(numerator) / divisor
    except ValueError:
        return None
|
||||
|
||||
def format_size(size_bytes):
|
||||
for unit in ['B', 'KB', 'MB', 'GB']:
|
||||
if size_bytes < 1024:
|
||||
@@ -193,9 +206,25 @@ def encode_dvr(input_file, output_dir, gpu):
|
||||
print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}")
|
||||
return
|
||||
|
||||
# Get audio labels
|
||||
# Get audio labels and input FPS (for speed display when stderr is piped)
|
||||
audio_labels = get_audio_labels(str(input_path))
|
||||
safe_log_info(f"Audio labels: {audio_labels}")
|
||||
input_fps = None
|
||||
for s in file_info.get('streams', []):
|
||||
if s.get('codec_type') == 'video':
|
||||
input_fps = parse_fps(s.get('r_frame_rate'))
|
||||
break
|
||||
if input_fps is None:
|
||||
# Fallback: first stream with r_frame_rate (e.g. codec_type not in probe)
|
||||
for s in file_info.get('streams', []):
|
||||
if 'width' in s or s.get('codec_name', '').startswith(('h', 'm')):
|
||||
input_fps = parse_fps(s.get('r_frame_rate'))
|
||||
break
|
||||
if input_fps is None:
|
||||
for s in file_info.get('streams', []):
|
||||
input_fps = parse_fps(s.get('r_frame_rate'))
|
||||
if input_fps and input_fps > 0:
|
||||
break
|
||||
|
||||
# FFmpeg command with NVIDIA HEVC encoder and maximum quality
|
||||
cmd = [
|
||||
@@ -235,14 +264,41 @@ def encode_dvr(input_file, output_dir, gpu):
|
||||
bufsize=1
|
||||
)
|
||||
|
||||
# Stream output line-by-line from a single pipe
|
||||
# Stream output line-by-line; compute speed when stderr is piped (FFmpeg shows speed=N/A)
|
||||
for line in iter(process.stdout.readline, ''):
|
||||
if not line:
|
||||
break
|
||||
text = line.strip()
|
||||
text = line.strip().strip('\r')
|
||||
try:
|
||||
if text.startswith('frame=') or ' fps=' in text:
|
||||
safe_log_info(f"Progress: {text}", f"{Colors.PURPLE}Progress: {text}{Colors.ENDC}")
|
||||
display = text
|
||||
m_frame = re.search(r'frame=\s*(\d+)', text)
|
||||
m_elapsed = re.search(r'elapsed=(\d+):(\d+):(\d+\.?\d*)', text)
|
||||
m_size = re.search(r'size=\s*(\d+)\s*KiB', text)
|
||||
if m_frame and m_elapsed:
|
||||
frames = int(m_frame.group(1))
|
||||
h, m, s = float(m_elapsed.group(1)), float(m_elapsed.group(2)), float(m_elapsed.group(3))
|
||||
elapsed_sec = h * 3600 + m * 60 + s
|
||||
if elapsed_sec > 0:
|
||||
if input_fps and input_fps > 0:
|
||||
speed_x = (frames / input_fps) / elapsed_sec
|
||||
display = re.sub(r'speed=N/A', f'speed={speed_x:.2f}x', text)
|
||||
# time = output position (HH:MM:SS.ms)
|
||||
video_sec = frames / input_fps
|
||||
t_h = int(video_sec // 3600)
|
||||
t_m = int((video_sec % 3600) // 60)
|
||||
t_s = video_sec % 60
|
||||
time_str = f'{t_h}:{t_m:02d}:{t_s:06.3f}' if t_h else f'0:{t_m:02d}:{t_s:06.3f}'
|
||||
display = re.sub(r'time=N/A', f'time={time_str}', display)
|
||||
# bitrate when muxer has written data (stays N/A until size > 0)
|
||||
size_kib = int(m_size.group(1)) if m_size else 0
|
||||
if size_kib > 0 and elapsed_sec > 0:
|
||||
bitrate_kbps = (size_kib * 8192) / (elapsed_sec * 1000)
|
||||
display = re.sub(r'bitrate=N/A', f'bitrate={bitrate_kbps:.0f}kbits/s', display)
|
||||
else:
|
||||
enc_fps = frames / elapsed_sec
|
||||
display = re.sub(r'speed=N/A', f'speed={enc_fps:.0f}fps', text)
|
||||
safe_log_info(f"Progress: {text}", f"{Colors.PURPLE}Progress: {display}{Colors.ENDC}")
|
||||
else:
|
||||
safe_log_info(f"FFmpeg: {text}", f"{Colors.GREEN}FFmpeg: {text}{Colors.ENDC}")
|
||||
except (OSError, IOError) as e:
|
||||
|
||||
@@ -0,0 +1,263 @@
|
||||
#!/usr/bin/env python3
|
||||
from queue import SimpleQueue, Empty
|
||||
from subprocess import Popen, DEVNULL, PIPE, TimeoutExpired
|
||||
import select
|
||||
from shutil import rmtree
|
||||
from glob import glob
|
||||
from os import mkdir, unlink, listdir, environ
|
||||
from os.path import basename, abspath, expanduser, isfile, isdir, getsize
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from typing import List, NamedTuple, Callable, Union
|
||||
from signal import signal, SIGINT
|
||||
from sys import exit, stderr
|
||||
from shlex import split, join
|
||||
from hashlib import md5
|
||||
from time import strptime
|
||||
from tqdm import tqdm
|
||||
import re
|
||||
|
||||
DEBUG = 'DEBUG' in environ
|
||||
|
||||
def dprint(*args, **kwargs):
    """Print like the built-in ``print``, but only when ``DEBUG`` is truthy.

    Positional arguments and keyword options (``sep``, ``end``, ``file``,
    ``flush``) are forwarded to ``print`` unchanged.
    """
    if DEBUG:
        # ``**kwargs`` (not ``*kwargs``): a single star would print the
        # keyword *names* as extra values and drop the options themselves.
        print(*args, **kwargs)
|
||||
|
||||
def _popen(args, **kwargs):
    """Start ``args`` as a text-mode subprocess that exposes only its stderr.

    Any caller-supplied ``stderr``, ``stdout`` or ``universal_newlines``
    option is overridden; every other keyword is handed to ``Popen`` as is.

    Returns:
        The started ``Popen`` object.
    """
    dprint(f'calling subprocess: {args}')
    forced = {'stderr': PIPE, 'stdout': DEVNULL, 'universal_newlines': True}
    return Popen(args, **{**kwargs, **forced})
|
||||
|
||||
class Task(NamedTuple):
    """One segment-encoding job: which file to read, where to write, and how."""

    # Path of the input segment.
    input_file: str
    # Path the encoded segment is written to.
    output_file: str
    # Extra ffmpeg arguments for this job. The default is an immutable empty
    # tuple: a list default would be a single object shared by every Task
    # created without this field, so appending to one would change them all.
    ffmpeg_args: Union[List[str], tuple] = ()
|
||||
|
||||
class FFMPEGProc:
    """Run one command and parse ffmpeg-style progress from its stderr.

    Lines matching the progress pattern are reported through
    ``update_callback(frame, fps, time_sec, duration_sec, speed)`` where
    ``duration_sec`` is ``None`` until a ``Duration:`` line has been seen.
    All other stderr lines are accumulated in ``self.stderr`` (one ``\\n``
    per line) so callers can show them when the command fails.
    """

    _duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
    _progress_re = re.compile(r'frame=\s*(?P<frame>[0-9]+)\s+fps=\s*(?P<fps>[0-9]+).*time=-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9,.]+)\s+.*speed=(?P<speed>[0-9\.]+)x')
    # Progress lines are terminated by a bare carriage return, log lines by
    # a newline; treat all three conventions as line breaks.
    _newline_re = re.compile(rb'\r\n|\r|\n')

    @staticmethod
    def _match_to_sec(match):
        """Convert the ``time_h``/``time_m``/``time_s`` groups to seconds."""
        # The pattern admits a decimal comma, which float() would reject.
        seconds = float(match.group('time_s').replace(',', '.'))
        return int(match.group('time_h')) * 3600 + int(match.group('time_m')) * 60 + seconds

    def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None):
        """Store the command and I/O wiring; nothing is started until ``run``.

        Args:
            cmd: Argument list (or a string when ``shell`` is true).
            shell: Passed to ``Popen``.
            stdin: File object or ``subprocess`` constant for the child's stdin.
            stdout: File object or ``subprocess`` constant for the child's stdout.
            update_callback: Optional progress callback, see the class docstring.
        """
        self._cmd = cmd
        self._update_callback = update_callback
        self._should_stop = False
        self._shell = shell
        self._duration = None
        self._stdin = stdin
        self._stdout = stdout
        self.stderr = ''

    def stop(self):
        """Ask a running ``run`` call to terminate the child and return."""
        self._should_stop = True

    def _handle_line(self, line):
        """Dispatch one decoded stderr line (without its line terminator)."""
        progress = self._progress_re.match(line)
        if progress:
            if self._update_callback:
                self._update_callback(
                    int(progress.group('frame')),
                    int(progress.group('fps')),
                    self._match_to_sec(progress),
                    self._duration,
                    float(progress.group('speed'))
                )
            return
        self.stderr += line + '\n'
        if self._duration is None:
            found = self._duration_re.match(line)
            if found:
                self._duration = self._match_to_sec(found)

    def _feed(self, data):
        """Handle every complete line in ``data``; return the unfinished tail."""
        *complete, tail = self._newline_re.split(data)
        for raw in complete:
            # Empty pieces come from blank lines or from a "\r\n" pair that
            # was split across two reads; neither carries information.
            if raw:
                self._handle_line(raw.decode('utf-8', errors='replace'))
        return tail

    def run(self):
        """Run the command to completion (or until ``stop``) and return its exit code.

        If ``stop`` was called while the child was still running, the child
        is sent SIGTERM (and SIGKILL if it has not exited within a second),
        so the return value is then the negative signal number.
        """
        # bufsize=0 makes stderr a raw, unbuffered pipe. Reading it through
        # a buffered text wrapper while polling the file descriptor lets
        # lines sit unseen in the wrapper's buffer.
        self._proc = Popen(self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout, bufsize=0)
        err_pipe = self._proc.stderr
        poller = select.poll()
        poller.register(err_pipe, select.POLLIN)
        pending = b''
        try:
            while not self._should_stop:
                exited = self._proc.poll() is not None
                # Once the child is gone, only pick up what is already in
                # the pipe instead of waiting for more.
                if poller.poll(0 if exited else 100):
                    chunk = err_pipe.read(65536)
                    if chunk:
                        pending = self._feed(pending + chunk)
                        continue
                    break  # EOF: the write end of the pipe was closed
                if exited:
                    break

            # EOF can be observed slightly before the exit status is ready.
            while not self._should_stop:
                try:
                    self._proc.wait(timeout=0.1)
                    break
                except TimeoutExpired:
                    continue

            if self._proc.poll() is None:
                # Only reachable after stop(): do not leave the child running.
                self._proc.terminate()
                try:
                    self._proc.wait(timeout=1)
                except TimeoutExpired:
                    self._proc.kill()
                    self._proc.wait()

            # Collect whatever the child wrote while shutting down.
            while poller.poll(0):
                chunk = err_pipe.read(65536)
                if not chunk:
                    break
                pending = self._feed(pending + chunk)
            if pending:
                self._handle_line(pending.decode('utf-8', errors='replace'))
        finally:
            err_pipe.close()
        return self._proc.returncode
|
||||
|
||||
class TqdmAbsolute(tqdm):
    """``tqdm`` bar that is moved to absolute positions instead of by increments."""

    def __init__(self, *args, **kwargs):
        """Create the bar with this class's fixed format and fallback options."""
        # Always enforced, even when the caller passes its own values.
        kwargs.update(
            bar_format='{l_bar}{bar}|{n:.1f}/{total:.1f} [{elapsed}<{remaining}]',
            dynamic_ncols=True,
        )
        # Only filled in when the caller did not supply them.
        kwargs.setdefault('total', 99999999)
        kwargs.setdefault('leave', False)
        super().__init__(*args, **kwargs)

    def update(self, to):
        """Advance the bar by the difference between ``to`` and the current count."""
        delta = to - self.n
        super().update(delta)
|
||||
|
||||
class TaskThread(Thread):
    """Worker thread that encodes queued segments on one host.

    Each task's input file is piped into ffmpeg's stdin and ffmpeg's stdout
    is written to the task's output file. For any host other than
    ``'localhost'`` the ffmpeg command is run through ``ssh``. A task whose
    command exits non-zero is put back on the queue.
    """

    def __init__(self, host: str, task_queue: SimpleQueue, bar_pos):
        """Bind the worker to ``host`` and ``task_queue``; ``bar_pos`` is its progress-bar row."""
        super().__init__()
        self._should_stop = False
        self._host = host
        self._task_queue = task_queue
        self._ffmpeg = None
        self._bar = TqdmAbsolute(desc=host, position=bar_pos)
        self._current_file = None

    def stop(self):
        """Stop taking new tasks and ask the running ffmpeg (if any) to stop."""
        self._should_stop = True
        if self._ffmpeg:
            self._ffmpeg.stop()

    def run(self):
        """Process tasks until the queue is empty or ``stop`` is called."""
        def upd(frames, fps, time, duration, speed):
            self._bar.total = duration or 999
            self._bar.desc = self._host + ': ' + self._current_file
            self._bar.update(time)

        try:
            while not self._should_stop:
                try:
                    task = self._task_queue.get(False)
                except Empty:
                    break

                self._current_file = basename(task.input_file)
                # The files are only handed to the child as stdin/stdout, and
                # the stream is Matroska data, so open them in binary mode.
                with open(task.input_file, 'rb') as infile, open(task.output_file, 'wb') as outfile:
                    ffmpeg_cmd = [
                        'nice', '-n10', 'ionice', '-c3',
                        'ffmpeg', '-f', 'matroska', '-i', 'pipe:',
                        *task.ffmpeg_args,
                        '-f', 'matroska', 'pipe:'
                    ]
                    if self._host != 'localhost':
                        ffmpeg_cmd = ['ssh', self._host, join(ffmpeg_cmd)]
                    self._ffmpeg = FFMPEGProc(ffmpeg_cmd, stdin=infile, stdout=outfile, update_callback=upd)
                    ret = self._ffmpeg.run()

                if ret != 0:
                    # Remove the partial output before re-queueing: a leftover
                    # file would make a later resume treat this segment as
                    # already encoded, and another worker may pick the task
                    # up again as soon as it is back on the queue.
                    try:
                        unlink(task.output_file)
                    except FileNotFoundError:
                        pass
                    except OSError as ex:
                        tqdm.write(f'could not remove partial output {task.output_file}: {ex}', file=stderr)
                    tqdm.write(f'task for {self._current_file} failed on host {self._host}', file=stderr)
                    tqdm.write(self._ffmpeg.stderr, file=stderr)
                    self._task_queue.put(task)
        finally:
            # Close the bar even if opening a file or starting ffmpeg raised.
            self._bar.close()
|
||||
|
||||
def encode(hosts: List[str], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False):
    """Split a video into segments, encode them on several hosts, and re-join them.

    Steps: (1) segment the video stream of ``input_file`` into
    ``<tmp_dir>/in``; (2) run one ``TaskThread`` per host to encode every
    segment into ``<tmp_dir>/out``; (3) concatenate the encoded segments and
    mux them with the audio, subtitles and global metadata of the original
    input into ``output_file``.

    Args:
        hosts: Host names to run workers on (``'localhost'`` runs locally).
        input_file: Source file; ``~`` is expanded.
        output_file: Destination file; ``~`` is expanded. Overwritten (``-y``).
        segment_seconds: Target segment length in seconds.
        remote_args: Shell-style string of ffmpeg arguments for the workers.
        concat_args: Shell-style string of extra arguments for the final mux.
        tmp_dir: Working directory; defaults to a name derived from the MD5
            of the absolute input path.
        keep_tmp: Keep ``tmp_dir`` after a successful run.
        resume: Reuse an existing ``tmp_dir`` and skip finished segments.
        copy_input: Segment with ``-c:v copy`` instead of a lossless re-encode.

    Raises:
        FileExistsError: If the working directory exists and ``resume`` is false.

    Returns:
        ``None``. When the split or concat step fails, its stderr is printed
        and the function returns early, leaving ``tmp_dir`` in place.
    """
    input_file = abspath(expanduser(input_file))
    output_file = abspath(expanduser(output_file))
    tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
    tmp_in = f'{tmp_dir}/in'
    tmp_out = f'{tmp_dir}/out'
    try:
        mkdir(tmp_dir)
        mkdir(tmp_in)
        mkdir(tmp_out)
    except FileExistsError:
        if not resume:
            raise

    # skip splitting on resume
    if not listdir(tmp_in) or not resume:
        cv = ['copy'] if copy_input else ['libx264', '-crf', '0', '-preset', 'ultrafast', '-bf', '0']
        with TqdmAbsolute(desc="splitting input file") as bar:
            def upd(frames, fps, time, duration, speed):
                bar.total = duration
                bar.update(time)
            ffmpeg = FFMPEGProc([
                    'ffmpeg', '-i', expanduser(input_file),
                    '-an', '-sn',
                    '-c:v', *cv,
                    '-f', 'segment', '-reset_timestamps', '1', '-segment_time', str(segment_seconds) + 's',
                    tmp_in + '/%08d.mkv'
                ],
                update_callback=upd
            )
            ret = ffmpeg.run()
            if ret != 0:
                tqdm.write(ffmpeg.stderr, file=stderr)
                return

    task_queue = SimpleQueue()
    for f in sorted(glob(tmp_in+'/*')):
        output_segment = tmp_out+f'/{basename(f)}'
        # skip already encoded segments
        if not isfile(output_segment):
            task_queue.put(Task(f, output_segment, split(remote_args)))

    threads = [TaskThread(host, task_queue, pos) for pos,host in enumerate(hosts,0)]

    def sigint(sig, stack):
        print('Got SIGINT, stopping...')
        for thread in threads:
            thread.stop()
        for thread in threads:
            thread.join()
        exit(1)

    signal(SIGINT, sigint)

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    segment_list = 'output_segments.txt'
    with open(segment_list, 'w') as f:
        # Inside a single-quoted entry of a concat list, a literal quote is
        # written as '\'' (close quote, escaped quote, reopen quote).
        f.write('\n'.join(
            "file '{}'".format(segment.replace("'", "'\\''"))
            for segment in sorted(glob(tmp_out+'/*'))
        ))

    try:
        with TqdmAbsolute(desc='concatenating output segments') as bar:
            def upd(frames, fps, time, duration, speed):
                bar.total = duration
                bar.update(time)
            ffmpeg = FFMPEGProc([
                    'ffmpeg', '-i', input_file,
                    '-f', 'concat', '-safe', '0', '-i', segment_list,
                    '-map_metadata', '0:g',
                    '-map', '1:v',
                    '-map', '0:a?',
                    '-map', '0:s?',
                    '-c:v', 'copy',
                    '-c:s', 'copy',
                    *split(concat_args),
                    '-y', output_file
                ],
                update_callback=upd
            )
            if ffmpeg.run() != 0:
                tqdm.write(ffmpeg.stderr, file=stderr)
                return
    finally:
        # Remove the list file on the failure path too, not only on success.
        unlink(segment_list)

    if not keep_tmp:
        rmtree(tmp_dir)
|
||||
|
||||
if __name__ == '__main__':
    # Command-line entry point: parse the options and hand them to encode().
    import argparse
    parser = argparse.ArgumentParser(description='Splits a file into segments and processes them on multiple hosts in parallel using ffmpeg over SSH.')
    parser.add_argument('input_file', help='File to encode.')
    parser.add_argument('output_file', help='Path to encoded output file.')
    parser.add_argument('remote_args', help='Arguments to pass to the remote ffmpeg instances. For example: "-c:v libx264 -crf 23 -preset fast"')
    # nargs='?' is what makes the default apply: a positional without it is
    # always required, so default='' alone could never take effect.
    parser.add_argument('concat_args', nargs='?', default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata. Mainly useful for audio encoding options, or "-an" to get rid of it.')
    parser.add_argument('-s', '--segment-length', type=float, default=10, help='Segment length in seconds.')
    parser.add_argument('-H', '--host', action='append', help='SSH hostname(s) to encode on. Use "localhost" to include the machine you\'re running this from. Can include username.', required=True)
    parser.add_argument('-k', '--keep-tmp', action='store_true', help='Keep temporary segment files instead of deleting them on successful exit.')
    parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
    parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
    parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
    args = parser.parse_args()
    encode(
        args.host,
        args.input_file,
        args.output_file,
        segment_seconds=args.segment_length,
        remote_args=args.remote_args,
        concat_args=args.concat_args,
        tmp_dir=args.tmp_dir,
        keep_tmp=args.keep_tmp,
        resume=args.resume,
        copy_input=args.copy_input
    )
|
||||
Reference in New Issue
Block a user