adjust stopping behavior

This commit is contained in:
2026-02-16 00:09:06 -07:00
parent a76bb1e1e0
commit dc7b224005
3 changed files with 1749 additions and 42 deletions
+9 -28
View File
@@ -10,10 +10,9 @@ from datetime import datetime
import shutil import shutil
import time import time
# Distributed encode defaults (AV1 libaom: crf 0, maxrate 9000k; -cpu-used 8 = faster, -threads 0 = use all cores; override via DISTRIBUTED_REMOTE_ARGS).
# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux. # Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux.
DISTRIBUTED_HOSTS_DEFAULT = ["Pyro", "RenderScrap", "root@GuiltsCurse", "PostIrony", "root@Godzilla"] DISTRIBUTED_HOSTS_DEFAULT = ["Pyro", "RenderScrap", "root@GuiltsCurse", "PostIrony", "root@Godzilla"]
DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v libaom-av1 -crf 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -cpu-used 8 -threads 0 -an" DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v libaom-av1 -crf 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -cpu-used 8 -row-mt 1 -an"
DISTRIBUTED_SEGMENT_SECONDS = 60 DISTRIBUTED_SEGMENT_SECONDS = 60
# ANSI color codes # ANSI color codes
@@ -393,36 +392,21 @@ def encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=60, re
if __name__ == "__main__": if __name__ == "__main__":
use_distributed = "--distributed" in sys.argv or "-d" in sys.argv if sys.platform == "win32":
if "--distributed" in sys.argv:
sys.argv.remove("--distributed")
if "-d" in sys.argv:
sys.argv.remove("-d")
if not use_distributed:
choice = input(f"\n{Colors.BLUE}Encode mode: [L]ocal (NVENC) / [D]istributed (farm):{Colors.ENDC} ").strip().upper() or "L"
use_distributed = choice == "D"
if use_distributed and sys.platform == "win32":
print(f"{Colors.YELLOW}Distributed mode uses select.poll() and may fail on Windows; use WSL or Linux for best results.{Colors.ENDC}") print(f"{Colors.YELLOW}Distributed mode uses select.poll() and may fail on Windows; use WSL or Linux for best results.{Colors.ENDC}")
input_dir = "input" input_dir = "input"
output_dir = "output" output_dir = "output"
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
if use_distributed: hosts_str = os.environ.get("DISTRIBUTED_HOSTS")
hosts_str = os.environ.get("DISTRIBUTED_HOSTS") if hosts_str:
if hosts_str: hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
hosts = [h.strip() for h in hosts_str.split(",") if h.strip()]
else:
hosts = DISTRIBUTED_HOSTS_DEFAULT
print(f"{Colors.BLUE}Using hosts: {', '.join(hosts)}{Colors.ENDC}")
safe_log_info(f"Distributed mode; hosts: {hosts}")
else: else:
gpu = get_gpu_selection() hosts = DISTRIBUTED_HOSTS_DEFAULT
safe_log_info(f"Selected GPU: {gpu}") print(f"{Colors.BLUE}Using hosts: {', '.join(hosts)}{Colors.ENDC}")
safe_log_info(f"Distributed mode; hosts: {hosts}")
# Get list of files to process
files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))] files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))]
total_files = len(files) total_files = len(files)
@@ -435,7 +419,4 @@ if __name__ == "__main__":
input_file = os.path.join(input_dir, file) input_file = os.path.join(input_dir, file)
safe_log_info(f"Processing file {i}/{total_files}: {file}") safe_log_info(f"Processing file {i}/{total_files}: {file}")
print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}") print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}")
if use_distributed: encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS)
encode_dvr_distributed(input_file, output_dir, hosts, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS)
else:
encode_dvr(input_file, output_dir, gpu)
+74 -14
View File
@@ -10,7 +10,7 @@ from threading import Thread
from time import sleep, time from time import sleep, time
from typing import List, NamedTuple, Callable, Union, Tuple from typing import List, NamedTuple, Callable, Union, Tuple
from signal import signal, SIGINT from signal import signal, SIGINT
from sys import exit, stderr, platform as sys_platform from sys import exit, stderr, stdin, platform as sys_platform
from shlex import split, join from shlex import split, join
from hashlib import md5 from hashlib import md5
from time import strptime from time import strptime
@@ -161,6 +161,11 @@ class FFMPEGProc:
def stop(self): def stop(self):
self._should_stop = True self._should_stop = True
if getattr(self, '_proc', None) is not None and self._proc.poll() is None:
try:
self._proc.terminate()
except OSError:
pass
def _read_stderr_loop(self, stderr_lines: list): def _read_stderr_loop(self, stderr_lines: list):
"""Read stderr in a loop (used on Windows where select.poll is unavailable).""" """Read stderr in a loop (used on Windows where select.poll is unavailable)."""
@@ -270,6 +275,11 @@ class TaskThread(Thread):
def stop(self): def stop(self):
self._should_stop = True self._should_stop = True
if getattr(self, '_reader_proc', None) is not None and self._reader_proc.poll() is None:
try:
self._reader_proc.terminate()
except OSError:
pass
if self._ffmpeg: if self._ffmpeg:
self._ffmpeg.stop() self._ffmpeg.stop()
@@ -305,21 +315,24 @@ class TaskThread(Thread):
if self._host != 'localhost': if self._host != 'localhost':
encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)] encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]
reader_proc = Popen(reader_cmd, stdout=PIPE, stderr=DEVNULL) self._reader_proc = Popen(reader_cmd, stdout=PIPE, stderr=DEVNULL)
ret = -1 ret = -1
try: try:
with open(task.output_file, 'wb') as outfile: with open(task.output_file, 'wb') as outfile:
self._ffmpeg = FFMPEGProc( self._ffmpeg = FFMPEGProc(
encoder_cmd, stdin=reader_proc.stdout, stdout=outfile, encoder_cmd, stdin=self._reader_proc.stdout, stdout=outfile,
update_callback=upd, binary_io=True update_callback=upd, binary_io=True
) )
ret = self._ffmpeg.run() ret = self._ffmpeg.run()
finally: finally:
try: rp = getattr(self, '_reader_proc', None)
reader_proc.wait(timeout=2) if rp is not None:
except TimeoutExpired: try:
reader_proc.terminate() rp.wait(timeout=2)
reader_proc.wait(timeout=5) except TimeoutExpired:
rp.terminate()
rp.wait(timeout=5)
self._reader_proc = None
if ret != 0: if ret != 0:
try: try:
@@ -393,20 +406,67 @@ def encode(hosts: List[str], input_file: str, output_file: str, segment_seconds:
stderr.flush() stderr.flush()
threads = [TaskThread(host, input_file, task_queue, pos, remote_ffmpeg_path) for pos, host in enumerate(hosts, 0)] threads = [TaskThread(host, input_file, task_queue, pos, remote_ffmpeg_path) for pos, host in enumerate(hosts, 0)]
def sigint(sig, stack): def stop_all():
print('Got SIGINT, stopping...') tqdm.write('Stopping all workers (killing ffmpeg/SSH on each host)...', file=stderr)
stderr.flush()
for thread in threads: for thread in threads:
thread.stop() thread.stop()
for thread in threads: for thread in threads:
thread.join() thread.join(timeout=5)
exit(1) exit(1)
signal(SIGINT, sigint) stop_file = abspath('.encode_stop')
def stdin_stop_listener():
try:
while True:
line = stdin.readline()
if not line:
break
if line.strip().lower() == 'stop':
stop_all()
break
except (OSError, EOFError):
pass
def stop_file_poller():
try:
while True:
sleep(1.5)
if isfile(stop_file):
try:
unlink(stop_file)
except OSError:
pass
stop_all()
break
except OSError:
pass
def sigint(sig, stack):
stop_all()
try:
signal(SIGINT, sigint)
except (ValueError, OSError):
pass # SIGINT not available on this platform
if getattr(stdin, 'isatty', lambda: False)():
tqdm.write("To cancel: type 'stop' + Enter, or create .encode_stop (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
else:
tqdm.write("To cancel: create .encode_stop in current dir (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
stderr.flush()
if getattr(stdin, 'isatty', lambda: False)():
Thread(target=stdin_stop_listener, daemon=True).start()
Thread(target=stop_file_poller, daemon=True).start()
for thread in threads: for thread in threads:
thread.start() thread.start()
for thread in threads: try:
thread.join() for thread in threads:
thread.join()
except KeyboardInterrupt:
stop_all()
list_path = f'{tmp_dir}/output_segments.txt' list_path = f'{tmp_dir}/output_segments.txt'
with open(list_path, 'w') as f: with open(list_path, 'w') as f: