diff --git a/.specstory/history/2026-01-21_08-11Z-check-files-py-small-output-error-handling.md b/.specstory/history/2026-01-21_08-11Z-check-files-py-small-output-error-handling.md index 50e436e..6291739 100644 --- a/.specstory/history/2026-01-21_08-11Z-check-files-py-small-output-error-handling.md +++ b/.specstory/history/2026-01-21_08-11Z-check-files-py-small-output-error-handling.md @@ -14381,6 +14381,1356 @@ _**User**_ _**Agent (model default, mode Agent)**_ +
Thought Process + +The user is saying they get the same result - segments still stalling. Let me read the terminal to see what's happening: do we see "first frame after Xs" or not? That will tell us if the reader is producing anything or if the encoder never gets data. I need to understand the flow and what could be deterministically broken. + +Possible causes when "first frame" never appears: +1. Reader never outputs (stuck seeking/reading from NAS) +2. Reader outputs but encoder doesn't accept it (wrong format, pipe issue) +3. Encoder runs on remote host - maybe SSH or the remote encoder never gets data + +With fast_seek + -noaccurate_seek, the reader should seek to keyframe and start outputting quickly. If the source is on a network path (\\Hydra\...), the seek itself might be slow (network round-trips) or the read might block. + +Let me read the terminal to see the current output and then think about fixes. Options: +- Increase stall timeout (doesn't fix determinism) +- Run reader on a host that has fast access to the source (e.g. mount on the remote and run reader there too - but that would be a bigger change) +- Use a different approach: don't pipe; write segment to a temp file on the manager, then scp to remote and encode there? That's complex. +- Try piping raw video (decode to raw on reader, encode on remote) - huge bandwidth +- Check if the issue is that with -c:v copy and -ss before -i, the first packet might not be a keyframe (depending on container). So the encoder might receive data but not be able to decode until it gets an IDR. So it might not output "frame=" for a while. But we're copying HEVC to mpegts - the first packet after a keyframe seek should be a keyframe. Unless the keyframe detection in ffmpeg is wrong for this file. + +Another thought: when we use -ss 13080 -noaccurate_seek -i file -frames:v 3600 -c:v copy, we seek to the keyframe at or before 13080. Then we output 3600 frames. 
The output might start with a few non-IDR frames if the keyframe we landed on is not an IDR? Unlikely for HEVC. Or the mpegts muxer might not emit progress immediately. Let me check - the encoder reads mpegts from pipe. So the encoder needs to receive TS packets, demux, decode HEVC, re-encode with NVENC. So the encoder will only emit "frame=" once it has decoded and encoded a frame. So if the reader is slow to send the first packets, we'd see no progress. So the bottleneck is likely the reader not sending data within 5 minutes. + +If the source is on \\Hydra\... (SMB), then when we -ss 13080, ffmpeg has to: +1. Open the file (network) +2. Seek to position corresponding to ~13080s (might require reading index or seeking linearly?) +3. Read and output 3600 frames + +For SMB, seeking might be implemented by reading and discarding, which would be very slow for 13080s. So -noaccurate_seek might not help much if the underlying I/O is the bottleneck. So the fix might be to run the reader on a machine that has the file locally (e.g. a host that has the NAS mounted and can do fast seeks). But that would require the encode script to run the reader remotely for certain hosts, which is a bigger change. + +Simpler fix: increase SEGMENT_STALL_TIMEOUT for segments that have a large start_sec? E.g. if start_sec > 3600, use 600 or 900 seconds. That way we give the reader more time to seek into a long file over the network. Let me suggest that and also add a note in the log that for long seeks over network, consider increasing SEGMENT_STALL_TIMEOUT. + +Actually re-reading the user message: "same result" - they might mean the encode still fails (missing segments) or still stalls. Let me read the terminal to see. +
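
---

The timeout-scaling idea floated above can be sketched as follows. This is a hypothetical helper, not what was ultimately implemented; the name `stall_timeout_for` and the scaling constants are assumptions, and `SEGMENT_STALL_TIMEOUT` is the env var mentioned in the log:

```python
def stall_timeout_for(start_sec, base=300.0, extra_per_hour=300.0):
    # Give the stall watchdog more headroom the deeper the seek: a reader that
    # must decode from 0 to start_sec needs time roughly proportional to start_sec.
    return base + extra_per_hour * (start_sec / 3600.0)
```

This keeps the 300s floor for early segments while letting a segment at t=7560s wait considerably longer before being killed.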
+ +--- + + +Checking the terminal output to see what's happening: + + +--- + + +
Tool use: **read_file_v2** • Read file: c:\Users\Nathan\.cursor\projects\r-Videos-mux\terminals\1.txt + +
+
+ +--- + + +
Tool use: **read_file_v2** • Read file: c:\Users\Nathan\.cursor\projects\r-Videos-mux\terminals\1.txt + +
+
+ +--- + +
Thought Process + +I see the issue now clearly: + +1. **fast_seek=False** - The log shows `fast_seek=False` for every segment! So READER_FAST_SEEK is not set (user might have run without it this time), and we're using -i then -ss. So the reader has to decode from 0 to start_sec before outputting. For segment 00000126 (t=7560-7620s) that's 7560 seconds = 2.1 hours of decode. So "first frame after 300.0s" appears RIGHT when we killed it - the reader had just produced the first frame at 300s but the watchdog had already fired at 300s. So we're killing segments that would have succeeded if we'd waited a bit longer. + +2. **00000126**: STALLED at 300s, "last encoder at 0.0s", then "first frame after 300.0s" - so the first frame arrived at almost exactly 300s and we killed it. Same for 00000161: "first frame after 300.5s" - we killed at 300s, first frame came at 300.5s. + +So the fix is either: +A) Use READER_FAST_SEEK=1 by default so we don't have this 5+ minute wait for "first frame" on late segments. The user said "same result" - maybe they're not setting READER_FAST_SEEK? Let me make fast_seek the default. +B) Or scale the stall timeout by start_sec - e.g. allow up to (start_sec/60)*30 + 300 seconds or something so late segments get more time for the reader to decode. + +The cleanest fix is A: default to fast_seek=True so the reader doesn't need to decode from 0. That way "first frame" comes in seconds, not minutes. We might get keyframe-aligned boundaries (small overlap) but segments complete. Let me change the default so we use fast seek when READER_FAST_SEEK is not set to 0/false. So default = True, and they can set READER_FAST_SEEK=0 to get frame-accurate (high RAM, slow). +
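
---

The two seek modes under discussion differ only in where `-ss` sits relative to `-i`. A sketch of the reader command (the helper name and exact flag set are assumptions reconstructed from the log, not the script's actual code):

```python
def reader_cmd(src, start_sec, n_frames, fast_seek=True):
    # fast_seek: -ss before -i -> keyframe seek, no decode from 0 (fast, low RAM).
    # otherwise: -ss after -i  -> frame-accurate, but decodes 0..start_sec first.
    seek = ['-ss', f'{start_sec:.3f}']
    head = seek + ['-noaccurate_seek'] if fast_seek else []
    tail = [] if fast_seek else seek
    return (['ffmpeg'] + head + ['-i', src] + tail
            + ['-frames:v', str(n_frames), '-an', '-sn',
               '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'])
```

With `fast_seek=True` the first packet arrives in seconds; with it off, a segment at t=7560s decodes ~2.1 hours of video before emitting anything, which is exactly the 300s-stall failure seen in the log.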
+ +--- + + +Observing the log: +1. fast_seek=False for every segment — READER_FAST_SEEK isn't set, so the reader uses -i then -ss and decodes from 0 to start (e.g. 7560s = 2.1 hours). +2. "first frame after 300.0s" and "first frame after 300.5s" — the first frame arrived at ~300s but the watchdog had already fired and killed the process. + +Making fast seek the default so we don't need 5+ minutes for late segments. + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + + +
+Tool use: **ripgrep_raw_search** + + +Parameters: + +```json +{ + "pattern": "READER_FAST_SEEK|fast_seek", + "path": "r:\\Videos\\mux\\ffmpeg_distributed.py", + "caseInsensitive": false +} +``` +Result: + +```json +{ + "success": { + "pattern": "READER_FAST_SEEK|fast_seek", + "path": "r:\\Videos\\mux\\ffmpeg_distributed.py", + "outputMode": "content", + "workspaceResults": { + "r:\\Videos\\mux": { + "content": { + "matches": [ + { + "file": "ffmpeg_distributed.py", + "matches": [ + { + "lineNumber": 358, + "content": " fast_seek = environ.get('READER_FAST_SEEK', '').lower() in ('1', 'true', 'yes')" + }, + { + "lineNumber": 359, + "content": " tqdm.write(f' {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s, n_frames={n_frames}, fast_seek={fast_seek})', file=stderr)" + }, + { + "lineNumber": 361, + "content": " # READER_FAST_SEEK=1: -ss before -i (keyframe seek, low RAM, no decode from 0). Else -i then -ss (frame-accurate but decodes 0..start = high RAM for late segments)." + }, + { + "lineNumber": 362, + "content": " if fast_seek:" + }, + { + "lineNumber": 412, + "content": " if not fast_seek and task.start_sec > 60:" + }, + { + "lineNumber": 519, + "content": " if environ.get('READER_FAST_SEEK', '').lower() in ('1', 'true', 'yes'):" + }, + { + "lineNumber": 520, + "content": " tqdm.write(' (READER_FAST_SEEK=1: keyframe seek, low RAM; segments may be keyframe-aligned)', file=stderr)" + } + ] + } + ], + "totalLines": 7, + "totalMatchedLines": 7 + } + } + } + } +} +``` +Status: **completed** + + +
+
+ +--- + +
Thought Process + +I'll make fast_seek True by default: only treat as False when READER_FAST_SEEK is explicitly '0' or 'false' or 'no'. So: +fast_seek = environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no') + +That way default is fast seek. And update the startup message to show when we're using accurate seek (the slow path) instead. +
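
---

The proposed default flip is just an opt-out parse of the env var; a minimal sketch (the real line is inlined in ffmpeg_distributed.py):

```python
def fast_seek_enabled(env):
    # Old behavior: default off, opt in with READER_FAST_SEEK=1/true/yes.
    # New behavior: default on, opt out with READER_FAST_SEEK=0/false/no.
    return env.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no')
```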
+ +--- + + + + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +Good. Now with default fast_seek=True, the reader will use -ss before -i and "first frame" should arrive in seconds (keyframe seek is fast), so we won't hit the 300s stall. The user can set READER_FAST_SEEK=0 if they need frame-accurate boundaries and are willing to wait (and have the RAM). +
+ +--- + + +Summary of what’s going on and what was changed: + +**What’s actually happening** +- The log shows **`fast_seek=False`** on every segment, so the reader was using **`-i` then `-ss`** (decode from 0 to `start_sec`). For segment 126 (t=7560–7620s) that’s ~2.1 hours of decode before the first frame. +- You then get **“first frame after 300.0s”** and **“first frame after 300.5s”** — the first frame arrives right at the 300s stall limit, and the watchdog has already killed the process. So segments are being killed just as they would have started producing output. + +**Change made** +- **Fast seek is now the default.** The reader uses **`-ss` before `-i`** (keyframe seek, low RAM) unless you turn it off. +- To get the old “frame-accurate” behavior (high RAM, slow for late segments), set **`READER_FAST_SEEK=0`**. + +With the new default you don’t need to set anything: the reader will seek quickly and “first frame” should appear within a few seconds, so the 300s stall shouldn’t trigger. If you ever need exact frame boundaries and can afford the wait/RAM, use `set READER_FAST_SEEK=0` before running. + +--- + +_**User**_ + +The real test: disable keyframe probing, segmentation, and concat; distribute the videos to workers individually. + +--- + +_**Agent (model default, mode Agent)**_ + +
Thought Process + +The user wants to change the distributed encode flow to: +1. Disable keyframe probing +2. Disable segmentation (no splitting into 60s segments) +3. Disable concat (no merging segments at the end) +4. Distribute whole videos to workers individually - so each worker gets a full input file and encodes it to a full output file + +So instead of: 1 file → N segments → N workers encode segments → concat + mux audio +We have: M files → M workers, each worker gets 1 full file, encodes the whole thing to 1 output file. + +So the flow would be: +- List input files +- For each file, assign to a worker (round-robin or queue) +- Each worker runs: reader (or no reader - the encoder runs on remote and needs the file?) ... Actually currently the reader runs on the manager (Pyro) and streams to the remote encoder. So for "whole file" we'd have: manager runs ffmpeg -i source -an -sn -c:v copy -f mpegts pipe:1 (full file), pipes to remote encoder that does hevc_nvenc to pipe:1, and we write to output file. So one worker handles one whole file at a time. No segments, no keyframe probe, no concat. Just: for each input file, assign to next worker, that worker reads the whole file (streaming from manager) and encodes to one output file (with audio we need to get audio from somewhere - the current encoder only gets video from the pipe. So we'd need to either: (a) include audio in the reader and have the encoder mux it (encoder would need -i pipe: -i source for audio? That's messy), or (b) do a two-phase: encode video only on workers, then on manager do one final mux that takes the encoded video + original audio. So the flow could be: 1) distribute: each worker gets full video stream (no audio), encodes to temp video file. 2) manager muxes each temp video with original audio to final output. So we'd have a "concat" step that's per-file: not concat segments, but mux encoded_video + original_audio → output. That's similar to current concat but for a single segment. 
So the pipeline would be: +- Task = (input_file, output_file) per file +- Worker: reader (full file, video only) -> encoder -> segment_file (temp, video only). Then we need a mux step: ffmpeg -i segment_file -i input_file -map 0:v -map 1:a -c:v copy -c:a copy output_file. So we still have a "post" step but it's per-file mux, not concat. + +Let me re-read. "disable keyframe probing, segmentation, and concat; distribute the videos to workers individually." +- No keyframe probing +- No segmentation (so one "segment" = whole file) +- No concat (so we don't concatenate multiple segments - for whole-file distribution we'd have one segment per file, so "concat" would be trivial or we mux video+audio per file) +- Distribute videos to workers individually = each worker gets a whole video file to encode + +So I need to: +1. Add a mode or a flag that skips keyframe probing and builds one segment per file (start=0, end=total_duration). +2. When there's only one segment per file, the "concat" step is just muxing that single encoded video with the original audio (same as now, but with one segment). +3. So actually the current code might already support this if we set segment_seconds >= total_duration for each file. Then we'd get one segment per file. But we'd still do keyframe probing. So we need to either: + - Add a "no keyframe" path that builds segments as fixed-size or single segment without probing keyframes. E.g. if segment_seconds is 0 or a flag DISTRIBUTED_WHOLE_FILE=1, then segments = [(0, total_duration)] and skip keyframe probe. + - Distribute at file level: instead of one queue of segment tasks for one input file, we have a queue of (input_file, output_file) and workers pull whole files. So the task structure changes: Task = (input_file, output_file, start_sec=0, duration_sec=total_duration). We'd need total_duration per file - we could probe duration only (no keyframes). 
So: list input files; for each file probe duration (and fps); create one task (input_file, output_file, 0, duration, fps); workers pull tasks and encode the whole file (reader: -i file -frames:v N or -t duration, encoder encodes to output). Then we need a mux step per file: take the encoded video output and mux with original audio. Currently the encoder writes to tmp_dir/00000000.mp4 etc. So for whole-file we'd have one segment file per input file. The concat step currently does: concat all segments from list, mux with audio from input. So for one segment it would just mux that one segment with audio. So the concat step is the same - we'd have one segment in the list. So the minimal change is: + - Add an option (env or param) like DISTRIBUTED_WHOLE_FILE=1 or segment_seconds=0 to mean "one segment per file, no keyframe probe". + - When that's set: skip _get_keyframe_times (or pass empty keyframe list), and _build_segments with segment_seconds >= duration would give one segment (0, total_duration). So we need _build_segments to return [(0, total_duration)] when we want whole file. Easiest: when segment_seconds <= 0 or env WHOLE_FILE=1, set segments = [(0, total_duration)] and skip keyframe probe. + - Rest of flow unchanged: one task per "segment", so one task per file. Worker runs reader for 0 to duration, encoder encodes, writes to tmp/00000000.mp4. Concat step: one file in list, mux with audio. Done. + +Let me check the code flow. encode() in ffmpeg_distributed takes one input_file and one output_file. So it's already per-file. The keyframe and segment logic is inside encode(). So we need: +1. When whole-file mode: don't call _get_keyframe_times, set segments = [(0, total_duration)]. +2. segment_seconds is still used in _build_segments. For one segment we need segments = [(0, total_duration)]. So we could pass segment_seconds = 0 or a very large number to _build_segments and get one segment. 
Looking at _build_segments: if keyframe_times is empty or len==1, it uses fixed segment_seconds and splits. So we get segments like (0, 60), (60, 120), ... So to get one segment we need to either add a special case in _build_segments when segment_seconds <= 0 or >= total_duration, or pass keyframe_times = [0, total_duration] so that we get one segment (0, total_duration). Actually _build_segments with keyframe_times = [0, total_duration] would give segments between 0 and total_duration at segment_seconds intervals. So we'd get multiple segments. The only way to get one segment is: keyframe_times = [0] and then in the "no keyframes" branch it does fixed intervals. So we need a new branch: if whole_file mode, return [(0, total_duration)]. I'll add a parameter whole_file=False to encode(), and when True: skip keyframe probe, set segments = [(0, total_duration)]. +3. Call encode() from the caller (encode_VOD_pyro) - we need to pass whole_file when user wants it. So the caller could get an env DISTRIBUTED_WHOLE_FILE=1 and pass whole_file=True to encode(). And we need to pass it from encode_VOD_pyro - which currently loops over files and calls encode_dvr_distributed for each. So for each file we call distributed_encode(..., segment_seconds=..., whole_file=env or something). So encode() in ffmpeg_distributed needs to accept whole_file and then skip keyframe and use one segment. + +Let me implement. +
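
---

The single-segment branch being added can be sketched standalone like this (the real change extends `_build_segments` and `encode()`; this simplified version assumes fixed-interval splitting when keyframes are unavailable):

```python
def build_segments(total_duration, segment_seconds, whole_file=False):
    # whole_file: one segment covering the entire file -> no keyframe probe,
    # no per-segment splitting, and "concat" degenerates to a single mux.
    if whole_file:
        return [(0.0, total_duration)]
    segments, start = [], 0.0
    while start < total_duration:
        segments.append((start, min(segment_seconds, total_duration - start)))
        start += segment_seconds
    return segments
```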
+ +--- + + +Implementing whole-file distribution: one segment per file, no keyframe probing, no concat (single-segment mux only). + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\encode_VOD_pyro.py + +
+
+ +--- + +
Thought Process + +I need to understand the full flow. Currently: +- encode() is called per (input_file, output_file) +- It probes keyframes + duration, builds segments, creates task queue of segment tasks (all for that one input file), workers process segments, then concat step muxes segments + audio -> output_file + +For "whole file" distribution at the batch level: +- We have multiple input files +- We want to distribute FILES to workers, not segments. So worker 1 gets file A (whole), worker 2 gets file B (whole), etc. +- No keyframe probing, no segmentation (one "segment" = whole file), no concat (just mux the single encoded video with audio). + +So there are two ways to implement: + +Option A: Keep encode() per-file but add whole_file mode. When whole_file=True: skip keyframe probe, segments = [(0, total_duration)], one task. One worker will get that task and encode the whole file. Concat step: one segment, mux with audio. So we still use the same encode() but with a flag that makes it one segment per file. The "distribution" is still segment-based internally, but we have one segment per file. When we have multiple files, the caller (encode_VOD_pyro) loops over files and calls encode() for each. So files are processed one at a time. That's not "distribute videos to workers individually" in the sense of parallel files - we'd still do file 1 (one segment), then file 2 (one segment), etc. So we'd have 4 workers but only 1 task per file, so only 1 worker would work on each file at a time. So we'd process files sequentially (one file, one worker encodes the whole thing). That might be what the user wants - "the real test" to see if whole-file encode works without segment complexity. + +Option B: Change the architecture so we have a queue of FILES and workers pull files. So the task queue contains (input_file, output_file) for each file. Each worker pulls a file, runs reader (whole file) + encoder, writes to output (or temp then mux). 
So we'd need a new encode_multi() or change the flow so that the task queue is populated with one task per file from multiple files, and workers pull any file. That would parallelize across files: 4 workers, 10 files -> 4 files encoding at once. The user said "distribute the videos to workers individually" which could mean either (A) each video is sent as a whole to one worker, or (B) videos are distributed across workers so multiple videos are encoded in parallel. I'll go with (B) as it's the more useful "real test" - we're testing that whole-file encode works and we can run multiple files in parallel. + +So I need to: +1. Add whole_file mode to encode(): when whole_file=True, skip keyframe probe, segments = [(0, total_duration)]. Rest unchanged (one task, one worker encodes whole file, concat step muxes that one segment with audio). +2. Add a new entry point or mode that takes a list of (input_file, output_file), builds a single task queue where each task is a whole file (start=0, duration=total_duration, output_path = some temp or direct output?). Actually the current design has task = (start_sec, duration_sec, output_path, args, fps) and output_path is tmp_dir/00000000.mp4. So for whole file we'd have one task with output_path = tmp_dir/00000000.mp4. The concat step then takes that one file and muxes with audio. So the flow fits. We just need to support multiple input files in one encode() call with a shared task queue. So: + - New function encode_whole_files(workers, list of (input_file, output_file), ...) that: for each (input_file, output_file), probe duration and fps only, add Task(0, duration, tmp_path, args, fps) to queue, and we need to track which task belongs to which input/output so the concat step can mux the right segment with the right audio. Currently the concat step uses one input_file and one list of segment files. 
So we'd need to either: (1) run encode() once per file with whole_file=True (so files are processed one at a time, but each file is one segment - simple), or (2) have a multi-file encode where we have one big queue of tasks (task = input_file, output_file, 0, duration, fps, tmp_path), and after each task completes we run the mux step for that file. So the worker would not only encode but also... no, the worker just encodes to tmp_path. So we'd have multiple tmp_dirs (one per file) or one tmp_dir with multiple segment files. If one tmp_dir with multiple files: we'd have 00000000.mp4 (file1), 00000001.mp4 (file2), ... but then concat step needs to know 00000000 goes with input1->output1, 00000001 with input2->output2. So the current concat is designed for one input file producing many segments. For multi-file we'd need to run the mux step per (input, segment_file, output). So it's simpler to do Option A: keep encode() per-file, add whole_file=True. When whole_file, we have one segment, one task. So we process one file at a time (one worker gets the task). The "distribution" is just that we're not segmenting - we're sending the whole file. And we could still have 4 workers - but with one task per file, only 1 worker works at a time per file. So for 3 files we'd do file1 (worker A), file2 (worker B), file3 (worker C) in parallel if we queue all 3 files' tasks. So we need a multi-file encode that queues one task per file (each task = whole file), and workers pull from the queue. So we have tasks: [ (file1, 0, dur1, tmp1/00000000.mp4), (file2, 0, dur2, tmp2/00000000.mp4), ... ]. But then each task has a different input_file! The TaskThread has self._source_file which is set at init to one input_file. So the current design has one input file per encode() run. To support multiple files we'd need Task to include input_file, or we'd need one encode() call per file with whole_file=True and process files sequentially. 
The simplest "real test" is: one file, whole_file mode, no keyframe, one segment, one worker encodes the whole thing. So I'll add whole_file to encode() and when set we skip keyframe and use one segment. That way the user can run their batch and each file is processed as one whole-file encode (still one file at a time unless we add multi-file queue). Let me also add a way to run multi-file: if we have encode_whole_files(workers, [(in1, out1), (in2, out2), ...]) we'd need to share a queue. The trick is TaskThread has source_file fixed. So we need Task to carry input_file (and output_file for the mux step). So Task = (input_file, output_file, start_sec, duration_sec, output_path, args, fps). Then TaskThread would use task.input_file as the reader source. And we'd need a way to run the mux step per file when a task completes - but currently mux runs once at the end. So the flow would be: each worker pulls a task (input_file, output_file, 0, duration, tmp_path, ...), runs reader(input_file) -> encoder -> tmp_path. When all tasks are done, we run mux for each file: for each (input_file, output_file) we have one segment file, run ffmpeg to mux segment + input audio -> output. So we need to collect the list of (input_file, output_file, segment_path) and run mux for each. This is getting bigger. Let me stick with the minimal change: 1. Add whole_file parameter to encode(). When True: skip keyframe probe, segments = [(0, total_duration)]. One task per file. Concat step unchanged (one segment in list). 2. In encode_VOD_pyro, when env DISTRIBUTED_WHOLE_FILE=1 (or similar), pass whole_file=True and segment_seconds is ignored. So we still process one file at a time from the batch, but each file is encoded as one whole-file job (no keyframe, one segment). That's the "real test". If the user wants multi-file parallel (multiple files in flight at once), we can add that next. Let me implement the minimal version. +
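
---

With one segment per file, the concat step reduces to a per-file mux. A sketch of that command (helper name is assumed; the stream mapping follows the plan above: video from the encoded segment, audio copied from the original source):

```python
def mux_cmd(encoded_video, original_source, output_file):
    # Input 0 = encoded segment (take its video), input 1 = original (take its audio).
    return ['ffmpeg', '-y', '-i', encoded_video, '-i', original_source,
            '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'copy',
            output_file]
```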
+ +--- + + +Checking how concat uses `input_file` and the segment list: + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + +
Thought Process + +So for whole-file mode I need to: +1. Add whole_file=False to encode(). When whole_file=True: skip keyframe probe (don't call _get_keyframe_times or set keyframe_times = []), get total_duration and fps only, set segments = [(0, total_duration)]. +2. The rest (task queue, workers, concat) stays the same. Concat with one segment just muxes that one segment with audio -> output. + +I'll add the parameter and the branch at the start of encode(). Then in encode_VOD_pyro we need to pass whole_file when env DISTRIBUTED_WHOLE_FILE=1. And we need to make the batch distribute files to workers: currently the main loop in encode_VOD_pyro processes one file at a time (encode file1, then encode file2). To "distribute videos to workers individually" we could either (a) keep that loop so each file is encoded as whole-file one at a time, or (b) build a single queue of whole-file tasks (one per input file) and run one encode that processes all files - but that would require encode() to accept multiple (input, output) pairs and Task to carry input_file. So (b) is a bigger change. The user said "distribute the videos to workers individually" - I'll interpret as: each video is sent as a whole to a worker (no segmentation). So one file = one task. If we have 10 files and 4 workers, we could process 4 files at a time by having encode() accept a list of (input_file, output_file) and build tasks (input_file, output_file, 0, duration, tmp_path) for each. Then TaskThread needs to get input_file from the task. So Task = (input_file, output_file, start_sec, duration_sec, output_path, args, fps). And we need a shared task queue across files. And after all tasks done, for each file we mux its segment with its audio. So we need: +- encode_whole_files(workers, [(in1, out1), (in2, out2), ...]) +- For each (input_file, output_file): probe duration, fps. Create tmp_dir per file (or one tmp_dir with subdirs?). 
Actually we could use one tmp_dir with file index: 00000000.mp4 = file 0, 00000001.mp4 = file 1. So we need to know which segment index maps to which (input_file, output_file) for the mux step. So we have list of (input_file, output_file, duration, fps) and we create tasks (input_file, 0, duration, tmp_dir/00000000.mp4, args, fps), (input_file, 0, duration, tmp_dir/00000001.mp4, ...). So Task needs to include input_file. And we need output_file for the mux step. So Task = (input_file, output_file, start_sec, duration_sec, output_path, args, fps). Then TaskThread takes task.input_file as source. And we need a list of (input_file, output_file, segment_path) for the mux step. So when we build tasks we also build a list segment_to_output = [(tmp_path, input_file, output_file)] for each. After all tasks done, for each (tmp_path, input_file, output_file) we run ffmpeg -i input_file -f concat -i list (with one file) - map ... Actually the concat step currently: one list file with all segment paths, one input_file (for audio), one output_file. So it's designed for one input -> many segments -> one output. For many inputs we'd run the mux step per (input_file, segment_path, output_file). So the concat loop would be for (input_file, output_file, segment_path) in completed: run ffmpeg -i input_file -i segment_path -map 1:v -map 0:a -c:v copy -c:a copy output_file. So we need to track which segment belongs to which input/output. So I'll add: +1. encode(..., whole_file=False) - when True, one segment (0, total_duration), no keyframe probe. Single file only. This is the minimal "test" - one file, whole, no keyframe, no segmentation, concat is just mux one segment. +2. encode_whole_files(workers, file_list, ...) where file_list = [(input_file, output_file), ...]. This builds one task per (input_file, output_file) with Task carrying input_file and output_file. TaskThread gets source from task. We need to add input_file and output_file to Task. 
So Task = (input_file, output_file, start_sec, duration_sec, output_path, args, fps). Then we need a way to know which task produced which segment - we already have output_path in the task. So when building tasks we store mapping segment_path -> (input_file, output_file). After workers finish, for each (input_file, output_file, segment_path) run mux. So we need to store the list of (input_file, output_file, segment_path) for completed segments. Actually when a task completes successfully, the segment is at task.output_path. So we have a list of tasks. For each task we have (input_file, output_file, output_path). So after all threads join we have segment_files = glob(tmp_dir/*.mp4). But we don't know which segment file corresponds to which input/output! Unless we use a consistent naming: task for file index i goes to tmp_dir/i.mp4. So we need a list of (input_file, output_file) in order, and segment i is at tmp_dir/00000000.mp4 for i=0, etc. So we have file_list = [(in0, out0), (in1, out1), ...]. We create tasks with output_path = tmp_dir/{i:08d}.mp4. So after completion we have segment_files = sorted(glob(...)) = [tmp_dir/00000000.mp4, ...]. So segment_files[i] corresponds to file_list[i]. So we can mux segment_files[i] with file_list[i][0] (input) to file_list[i][1] (output). So we need one tmp_dir for the whole batch, and we need to pass file_list to encode_whole_files. And Task needs (input_file, start_sec, duration_sec, output_path, args, fps). We don't need output_file in Task if we have file_list and index. So Task = (input_file, start_sec, duration_sec, output_path, args, fps). And we have a list file_list = [(input_file, output_file), ...] and task for file i has output_path = tmp_dir/{i:08d}.mp4. So when building the task queue we do: for i, (input_file, output_file) in enumerate(file_list): probe duration, fps; task = Task(input_file, 0, duration, tmp_dir/i.mp4, args, fps); queue.put(task). TaskThread needs to take task.input_file as the reader source. 
So we need to add input_file to Task! Currently Task = (start_sec, duration_sec, output_file, args, fps), where "output_file" is the segment path (tmp_dir/00000000.mp4). TaskThread has self._source_file, set in __init__ from the single input_file passed to encode(), so for multi-file each task must carry its own input_file: Task = (input_file, start_sec, duration_sec, output_path, args, fps). We also need the list of (input_file, output_file) pairs for the mux step.

So: encode_whole_files(workers, file_list, ...) where file_list = [(in1, out1), (in2, out2), ...]. We create one tmp_dir for the batch. For each (input_file, output_file) we probe duration and fps and create Task(input_file, 0, duration, tmp_dir/i.mp4, args, fps). TaskThread becomes TaskThread(host, gpu_id, task_queue, bar_pos, remote_ffmpeg_path) - no source_file in init; in run(), task = queue.get() and source_file = task.input_file. So Task becomes a NamedTuple with input_file, start_sec, duration_sec, output_file (path), ffmpeg_args, fps, and we update every place that creates or uses Task.

This is getting large. Let me do the minimal version first: just add whole_file=True mode to encode() for a single file - one file, no keyframe probe, one segment, one worker, no multi-file queue yet. The user can then run their batch and each file is processed one-by-one as a whole-file encode. If they want multi-file parallel we add encode_whole_files later.
</details>
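
---

The extended task record from the plan above can be sketched as a NamedTuple with an optional per-task source (field names are assumptions mirroring the deliberation, not the script's final definitions):

```python
from typing import NamedTuple, Optional

class Task(NamedTuple):
    # input_file is None on the single-file path (worker falls back to its own
    # source); the multi-file queue sets it per task so any worker can pull any file.
    start_sec: float
    duration_sec: float
    output_path: str
    fps: float
    input_file: Optional[str] = None

def source_for(task, thread_source):
    # What TaskThread.run() would do: prefer the task's source, else the thread's.
    return task.input_file if task.input_file is not None else thread_source
```

This keeps the existing single-file encode() path working unchanged while letting a shared queue of whole-file tasks span multiple inputs.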
+ +--- + + +Implementing whole-file mode: add a `whole_file` option and, when enabled, skip keyframe probing and use a single segment (0 to duration). + + +--- + + +
Tool use: **todo_write** • Todo List + +- [-] Add whole_file param to encode(), skip keyframe probe, one segment +- [ ] Wire DISTRIBUTED_WHOLE_FILE in encode_VOD_pyro and optional multi-file queue + +
+
+ +--- + +
Thought Process + +Let me add whole_file to encode() and then add a path that distributes multiple files to workers (one task per file, tasks in shared queue, Task carries input_file so workers can encode any file). I'll need to extend Task to include optional input_file for the multi-file case. Actually for the minimal single-file whole_file mode we don't need to change Task - we just skip keyframe and set segments = [(0, total_duration)]. So encode() stays single-file, TaskThread still has _source_file from the one input_file. So only the segment-building part changes. Then for "distribute videos to workers individually" - that could mean the batch runs and each file is assigned to a worker (multiple files in flight). So we need a way to run encode with multiple files and a shared queue. I'll add: +1. whole_file param: skip keyframe, one segment. +2. New function encode_whole_files(workers, file_list, ...) that takes list of (input_file, output_file), probes each for duration/fps only, creates one Task per file. Task must carry input_file. So we need Task to have an optional input_file field, or we have a separate structure. Easiest: Task = (start_sec, duration_sec, output_path, args, fps, input_file=None). When input_file is not None, TaskThread uses it as source. So we need to add input_file to Task (optional, for multi-file). When encode() is called with one file, we don't set input_file on tasks (they use the encode() input_file). So TaskThread needs to get source from either task.input_file if set, else self._source_file. So TaskThread is created with source_file=None when we're in multi-file mode, and we pass tasks that have input_file set. So the signature of encode could be: encode(workers, input_file, output_file, ..., whole_file=False) for single file. Or encode(workers, file_list, ...) when file_list is a list of (input_file, output_file) for multi-file. So we need encode() to accept either (input_file, output_file) or file_list. 
Let me check - the caller encode_VOD_pyro has a loop over files and calls encode_dvr_distributed(input_file, output_dir, ...) for each. So it's always one file at a time. So for "distribute videos to workers individually" we could change the loop to: collect all (input_file, output_path) for files that need encoding, call encode_whole_files(workers, file_list, ...) once. Then encode_whole_files builds one task per file, each task has input_file in it, and we need TaskThread to take source from task. So we need to add input_file to Task and have TaskThread accept optional source_file (if None, use task.input_file). Let me implement step by step. First: add whole_file to encode() for single file. Second: add encode_whole_files and Task.input_file, and when DISTRIBUTED_WHOLE_FILE and multiple files, call encode_whole_files instead of looping encode(). +
+ +--- + + + + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +Good. Now I need to update the "[2/4] Queued N segments" message for whole_file to say something like "Queued 1 segment (whole file)". And add a new mode that distributes multiple files: encode_whole_files(workers, file_list, ...) and have the batch script use it when DISTRIBUTED_WHOLE_FILE=1 and we have multiple files. Actually the user said "distribute the videos to workers individually" - so they want multiple videos sent to workers (one video per worker at a time). So we need the batch to pass all files into one encode that queues one task per file and workers pull from the queue. So I need: +1. Task to carry input_file when in multi-file mode. So Task = (start_sec, duration_sec, output_file, ffmpeg_args, fps, input_file=None). Default input_file=None means use the single encode() input_file. +2. encode_whole_files(workers, file_list, tmp_base, ...) that for each (input_file, output_file) probes duration/fps, creates Task(input_file, 0, duration, tmp_dir/i.mp4, args, fps) with input_file set. We need one tmp_dir for the batch and we need to know the mapping segment index -> (input_file, output_file) for the mux step. So tmp_dir could be tmp_base / "whole_batch_" + hash of sorted paths. And we create task for file index i with output_path = tmp_dir / f"{i:08d}.mp4". So we have file_list and segment_files[i] corresponds to file_list[i]. After workers finish we run mux for each: for i, (inp, out) in enumerate(file_list): segment = tmp_dir / f"{i:08d}.mp4"; if exists, ffmpeg -i inp -i segment -map 1:v -map 0:a -c:v copy -c:a copy out. +3. TaskThread needs to get source_file from task when task has input_file, else from self._source_file. So we need to add input_file to Task. Let me check the Task definition and TaskThread. +
+ +--- + + +Adding multi-file distribution: Task carries `input_file`; add `encode_whole_files` and use it from the batch when `DISTRIBUTED_WHOLE_FILE` is set. + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + +
Thought Process + +Task is a NamedTuple. I need to add an optional input_file field. In Python 3, NamedTuple can have optional fields with default None: input_file: Optional[str] = None. Then when creating Task in the single-file encode() we don't pass input_file (so it's None). TaskThread uses self._source_file - so when task.input_file is not None, use task.input_file, else self._source_file. So TaskThread needs to get the reader source as: source = getattr(task, 'input_file', None) or self._source_file. So we need to add input_file to Task with default None. Then in the multi-file encode we'll pass input_file on each Task. And we need an encode_whole_files that takes file_list and builds the queue and runs threads - but threads need a source_file for the case when task doesn't have input_file. So when we have multi-file we pass source_file as None or we pass the first file - actually we always get source from task.input_file in multi-file mode. So let's add Task.input_file: Optional[str] = None and in TaskThread run(), source_file = getattr(task, 'input_file', None) or self._source_file. Then we need encode_whole_files(workers, file_list, ...) that creates a shared tmp_dir, for each (inp, out) probes duration/fps, creates Task(inp, 0, duration, tmp_dir/i.mp4, args, fps, input_file=inp). So Task needs to be (start_sec, duration_sec, output_file, ffmpeg_args, fps, input_file=None). So the order of fields: we have output_file as the segment path. So Task(start_sec, duration_sec, output_file, ffmpeg_args, fps, input_file=None). When we put Task we need to pass input_file=inp for multi-file. And we need to create TaskThread with source_file=None when in multi-file mode (so it always uses task.input_file). So encode_whole_files will create threads with source_file=None. Let me add input_file to Task. +
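The Task shape being worked out above can be sketched as a minimal illustration (field names and the fallback logic are taken from this discussion, not from the actual `ffmpeg_distributed.py`):

```python
from typing import NamedTuple, Optional, Tuple

class Task(NamedTuple):
    # One unit of work: encode [start_sec, start_sec + duration_sec) of some
    # source into output_file (a segment path under tmp_dir).
    start_sec: float
    duration_sec: float
    output_file: str
    ffmpeg_args: Tuple[str, ...]
    fps: float
    # Set in multi-file mode so any worker can encode any file; when None,
    # the worker falls back to its single-file source from __init__.
    input_file: Optional[str] = None

def resolve_source(task: Task, thread_source: Optional[str]) -> Optional[str]:
    # Mirrors: source_file = task.input_file if task.input_file else self._source_file
    return task.input_file if task.input_file else thread_source
```

Because the new field has a default, existing `Task(...)` call sites that don't pass `input_file` keep working unchanged.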
+ +--- + + +Adding optional `input_file` to Task and using it in TaskThread when present: + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +The string to replace was not found in the file. + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +The first replace for reader_cmd with self._source_file -> source_file didn't apply because I had already changed the context. Let me fix the reader_cmd to use source_file in both branches. +
+ +--- + + + + + +--- + +The string to replace was not found in the file. + +--- + +
Thought Process + +One branch already has source_file (the else), the if fast_seek still has self._source_file. Let me fix only the if branch. +
+ +--- + + + + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +Now I need to add encode_whole_files that: +1. Takes workers, file_list = [(input_file, output_file), ...], and other args +2. Creates one tmp_dir for the batch +3. For each (input_file, output_file): probe duration and fps (no keyframes), create Task(input_file, 0, duration, tmp_dir/i.mp4, args, fps, input_file=input_file) +4. Create TaskThread with source_file=None (or any placeholder) since task.input_file will always be set +5. Run threads +6. After join: for each file index i, mux tmp_dir/i.mp4 with file_list[i][0] (audio) -> file_list[i][1]. So we need to run N mux steps, not one concat. So the "concat" step becomes: for i, (inp, out) in enumerate(file_list): segment = tmp_dir/f"{i:08d}.mp4"; run ffmpeg -i inp -i segment -map 1:v -map 0:a? -map 0:s? -c:v copy -c:a copy -y out + +And we need to pass file_list and the mapping. So encode_whole_files(workers, file_list, remote_args, concat_args, tmp_base, probe_host, probe_path, remote_ffmpeg_path). We need to get total_duration and fps per file - probe_host/probe_path might be per-file. For simplicity we can probe on the same host for all (probe_path could be a pattern). So for each (inp, out) we have probe_path_i = probe_path or None. Let me keep it simple: probe each input_file with probe_host and optional probe_path (we might need a list or use same prefix). I'll use the same probe_host for all and probe_path = prefix + basename(inp) if prefix set. + +Actually re-reading the user message again: "disable keyframe probing, segmentation, and concat; distribute the videos to workers individually." So they want: +- No keyframe probing +- No segmentation (one segment = whole file) +- No concat - so maybe they don't even want the mux step? "distribute the videos to workers individually" could mean each worker gets a full video and produces a full output. So we'd have video-only encode on the worker and then... we need to mux with audio somewhere. 
So I'll keep the mux step but it's "per-file mux" not "concat segments". So we have one segment per file, mux that segment with original audio -> output. That's the same as current concat when there's one segment. So the current encode() with whole_file=True already does that: one segment, concat step muxes it with audio. So the only thing left is to allow multiple files to be queued so workers can work on different files in parallel. So I'll add encode_whole_files that takes file_list and runs the same pipeline but with one task per file and task.input_file set. And the "concat" step runs per-file: for each (input_file, output_file) we have one segment at tmp_dir/i.mp4, run mux. So I need to add the encode_whole_files function and change the batch script to call it when DISTRIBUTED_WHOLE_FILE=1. Let me add encode_whole_files. It will need to: +1. Build tmp_dir (one for batch) +2. file_list is list of (input_file, output_file) +3. For each (inp, out): duration = _get_duration_seconds(inp, ...), fps = _get_fps(inp, ...). Create Task(0, duration, tmp_dir/f"{i:08d}.mp4", args, fps, input_file=inp). Put in queue. +4. Create threads - but we need to pass source_file. We can pass "" or None and then in run() we use task.input_file. So TaskThread needs to accept source_file=None. So we create TaskThread(host, gpu_id, None, task_queue, ...) and in run() source_file = task.input_file or self._source_file. So when self._source_file is None we require task.input_file. Good. +5. Run threads. +6. After join: for i, (inp, out) in enumerate(file_list): segment = tmp_dir/f"{i:08d}.mp4". If not isfile(segment), skip or error. Else run mux: ffmpeg -i inp -i segment -map 1:v -map 0:a? -map 0:s? -c:v copy -c:a copy -y out. So we need a small mux function or inline. +7. Return True if all muxed, False otherwise. + +Let me implement encode_whole_files. I'll add it after encode() and before _parse_workers. 
And we need to add the optional source_file to TaskThread __init__ - it can already be None if we pass None. So we just need to make sure when source_file is None, task.input_file is set. So in run() we have source_file = (task.input_file if task.input_file else self._source_file). So if _source_file is None and task.input_file is None we'd get None. So we require in multi-file mode that every task has input_file set. Good. +
+ +--- + + +Adding `encode_whole_files` and per-file mux logic. Checking where `encode()` ends and where the concat step is: + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + +
Thought Process + +For encode_whole_files we need a different flow after threads join: +- We have file_list = [(inp0, out0), (inp1, out1), ...] and segment_files = [tmp_dir/00000000.mp4, tmp_dir/00000001.mp4, ...] in order. +- So segment_files[i] is the encoded video for file_list[i]. We mux segment_files[i] + file_list[i][0] (audio/subs) -> file_list[i][1]. + +So I'll add encode_whole_files that: +1. Takes workers, file_list, remote_args, concat_args, tmp_base (or tmp_dir), probe_host, probe_path, remote_ffmpeg_path +2. tmp_dir = tmp_base / "whole_batch_" + hash. Or we could take tmp_dir as optional and create one. +3. For each (inp, out) in file_list: probe duration and fps (no keyframe), append Task(0, duration, tmp_dir/f"{i:08d}.mp4", args, fps, input_file=abspath(inp)) +4. task_queue, threads with source_file="" (empty string so task.input_file is used) +5. Run same thread/stop/watchdog logic +6. After join: for i in range(len(file_list)): segment = tmp_dir/f"{i:08d}.mp4", (inp, out) = file_list[i]. If segment missing, error. Else ffmpeg -i inp -i segment -map 1:v -map 0:a? -map 0:s? -c:v copy -c:a copy -y out +7. Delete tmp_dir if not keep_tmp, return True if all ok + +We need to pass file_list as list of (input_file, output_file) - both absolute paths. And we need probe_host/probe_path - for multi-file, probe_path might need to be a prefix and we append basename(inp). I'll use the same probe_host for all and for probe_path we could have a function that given input_file returns the path on probe_host (e.g. prefix + basename). So encode_whole_files(workers, file_list, remote_args, concat_args, tmp_dir=None, keep_tmp=False, probe_host=None, probe_path_prefix=None, remote_ffmpeg_path=None). For each (inp, out), probe_path = (probe_path_prefix + basename(inp)) if probe_path_prefix else None. OK let me implement. I'll add it after encode() return True. +
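The queue-building step sketched above could look like this (a hypothetical helper; `probe_duration`/`probe_fps` stand in for the real `_get_duration_seconds`/`_get_fps`, and durations are assumed valid here):

```python
import queue
from pathlib import Path

def build_whole_file_queue(file_list, tmp_dir, ffmpeg_args, probe_duration, probe_fps):
    # One task per input file: whole duration, one segment at tmp_dir/{i:08d}.mp4.
    # segments_info[i] maps the i-th segment back to (input_file, output_file)
    # so the per-file mux step knows where each encoded segment belongs.
    task_queue = queue.Queue()
    segments_info = []
    for i, (input_file, output_file) in enumerate(file_list):
        duration = probe_duration(input_file)  # assumed > 0 in this sketch
        seg_path = str(Path(tmp_dir) / f"{i:08d}.mp4")
        segments_info.append((input_file, output_file, seg_path))
        task_queue.put((input_file, 0.0, duration, seg_path,
                        ffmpeg_args, probe_fps(input_file)))
    return task_queue, segments_info
```

Workers then pull whole-file tasks from the shared queue exactly as they pull segment tasks today.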
+ +--- + + +Adding `encode_whole_files`: building the task queue and shared tmp_dir, then implementing the per-file mux. + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +TaskThread(host, gpu_id, '', task_queue, ...) - we're passing source_file=''. So when we do source_file = (task.input_file if task.input_file else self._source_file), we get task.input_file (since it's set). So '' is only used if task.input_file is None which shouldn't happen in whole-files mode. Good. Now I need to wire encode_VOD_pyro to use encode_whole_files when DISTRIBUTED_WHOLE_FILE=1 and we have multiple files. So in the main block we collect list of (input_file, output_path) for files that need encoding, and if DISTRIBUTED_WHOLE_FILE and list non-empty, call encode_whole_files(workers, file_list, ...) once. Otherwise keep the current loop (encode per file). Let me read encode_VOD_pyro main and the loop. +
+ +--- + + +Wiring the batch script to use `encode_whole_files` when `DISTRIBUTED_WHOLE_FILE` is set: + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\encode_VOD_pyro.py + +
+
+ +--- + +
Thought Process + +I need to: +1. When DISTRIBUTED_WHOLE_FILE is set, collect all (input_file, output_path) that don't already have output, then call encode_whole_files once with that list. +2. Otherwise keep the current per-file loop but pass whole_file=True to encode() when DISTRIBUTED_WHOLE_FILE is set (so single-file mode still uses whole_file and no keyframe/segmentation). + +Actually re-reading the user message: "disable keyframe probing, segmentation, and concat; distribute the videos to workers individually." So they want: +- No keyframe +- No segmentation +- No concat (so per-file mux) +- Distribute videos to workers individually = multiple files, each as one task + +So the batch should: when DISTRIBUTED_WHOLE_FILE=1, build file_list of (input, output) for files that need encoding, call encode_whole_files(workers, file_list, ...). When DISTRIBUTED_WHOLE_FILE=0 or unset, keep current behavior (per-file encode with segments). Let me also support single-file whole mode: if DISTRIBUTED_WHOLE_FILE=1 and we're processing one file (current loop), we could call encode(..., whole_file=True). So we have two code paths: +- DISTRIBUTED_WHOLE_FILE=1: collect all (inp, out) without output; if list non-empty, call encode_whole_files(workers, file_list, ...). No per-file loop. +- Else: current loop, encode_dvr_distributed(..., whole_file=False by default). We could add whole_file=env in encode_dvr_distributed so that when user sets WHOLE_FILE they get single-file whole mode in the loop. But the user said "distribute the videos to workers individually" which implies multi-file. So I'll do: when DISTRIBUTED_WHOLE_FILE=1, use encode_whole_files with full file list. When not set, current loop with optional whole_file for single-file (we can add that later). 
So the batch change is: if DISTRIBUTED_WHOLE_FILE: file_list = [(join(input_dir, f), join(output_dir, f)) for f in files if not (Path(output_dir) / f).exists()]; if file_list: encode_whole_files(...); else: current for-loop. And encode_dvr_distributed when called for single-file can pass whole_file=os.environ.get('DISTRIBUTED_WHOLE_FILE') so that when we're not in batch whole-files mode but user set the env, we still do whole-file for that one file. Actually let me keep it simple: only when DISTRIBUTED_WHOLE_FILE=1 we do the batch encode_whole_files. Otherwise we do the loop and each call can pass whole_file=False (current). So we don't need to change encode_dvr_distributed for whole_file single-file unless we want. Let me just add the batch path. +
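The batch gate described above can be sketched like this (illustrative helpers only; `exists` is injected so the selection logic is testable without touching the filesystem):

```python
import os

def use_whole_file_mode() -> bool:
    # DISTRIBUTED_WHOLE_FILE=1 (or true/yes) selects the batch whole-file path;
    # anything else keeps the existing per-file segmented loop.
    return os.environ.get("DISTRIBUTED_WHOLE_FILE", "").strip().lower() in ("1", "true", "yes")

def pending_files(files, input_dir, output_dir, exists):
    # Collect (input, output) pairs whose output does not exist yet.
    pairs = []
    for f in files:
        out = f"{output_dir}/{f}"
        if not exists(out):
            pairs.append((f"{input_dir}/{f}", out))
    return pairs
```

With that, the main block becomes: if `use_whole_file_mode()` and `pending_files(...)` is non-empty, call `encode_whole_files` once; otherwise fall through to the current per-file loop.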
+ +--- + + + + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\encode_VOD_pyro.py + +
+ +
+ +--- + +
Thought Process + +encode_whole_files needs remote_args to have a default - we're passing it. I used split(remote_args or '') in encode_whole_files - so we need to get default from env in the caller, which we do (DISTRIBUTED_REMOTE_ARGS, DISTRIBUTED_REMOTE_ARGS_DEFAULT). Good. But encode_whole_files doesn't use the default for remote_args inside - it receives it from the caller. So we're good. Let me fix the encode_whole_files signature to accept optional remote_args and use a default so it's not required. Actually the caller passes it. Let me check that we're not missing something. We need to pass concat_args with -c:a copy and any extra. I passed concat_args="-c:a copy". Good. Now fix the mux command - we're using split(concat_args) which might include -c:a copy; we're also adding -c:v copy -c:s copy. So the full mux command is ffmpeg -y -i inp -i seg -map 1:v -map 0:a? -map 0:s? -map_metadata 0:g -c:v copy -c:s copy -c:a copy out. Good. One more thing: in encode_whole_files we're not skipping files that already have output - we build file_list only for files without output. Good. Let me double-check the Task creation: we need split(remote_args or ''). So when we call encode_whole_files we pass remote_args=os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT). So that's a string. We split it in encode_whole_files. Good. Let me add Path import in encode_VOD_pyro if not already there. +
+ +--- + + + + + +--- + + +
+Tool use: **ripgrep_raw_search** + + +Parameters: + +```json +{ + "pattern": "^from pathlib|^import pathlib|Path\\(", + "path": "r:\\Videos\\mux\\encode_VOD_pyro.py", + "caseInsensitive": false +} +``` +Result: + +```json +{ + "success": { + "pattern": "^from pathlib|^import pathlib|Path\\(", + "path": "r:\\Videos\\mux\\encode_VOD_pyro.py", + "outputMode": "content", + "workspaceResults": { + "r:\\Videos\\mux": { + "content": { + "matches": [ + { + "file": "encode_VOD_pyro.py", + "matches": [ + { + "lineNumber": 6, + "content": "from pathlib import Path" + }, + { + "lineNumber": 209, + "content": " input_path = Path(input_file)" + }, + { + "lineNumber": 210, + "content": " output_path = Path(output_dir) / f\"{input_path.stem}{input_path.suffix}\"" + }, + { + "lineNumber": 372, + "content": " input_path = Path(input_file).resolve()" + }, + { + "lineNumber": 373, + "content": " output_path = (Path(output_dir) / f\"{input_path.stem}{input_path.suffix}\").resolve()" + }, + { + "lineNumber": 383, + "content": " script_dir = Path(__file__).resolve().parent" + }, + { + "lineNumber": 445, + "content": " out_path = (Path(output_dir) / f).resolve()" + } + ] + } + ], + "totalLines": 7, + "totalMatchedLines": 7 + } + } + } + } +} +``` +Status: **completed** + + +
+
+ +--- + + +
+Tool use: **read_lints** + + +Parameters: + +```json +{ + "paths": [ + "r:\\Videos\\mux\\ffmpeg_distributed.py", + "r:\\Videos\\mux\\encode_VOD_pyro.py" + ] +} +``` +Result: + +```json +{ + "linterErrorsByFile": [ + { + "relativeWorkspacePath": "r:\\Videos\\mux\\ffmpeg_distributed.py" + }, + { + "relativeWorkspacePath": "r:\\Videos\\mux\\encode_VOD_pyro.py" + } + ] +} +``` +Status: **completed** + + +
+
+ +--- + +
Thought Process + +Path is already imported. Lints are clean. Let me fix the encode_whole_files mux command - we're passing concat_args to the mux. So we need to split(concat_args) in the mux_cmd. I had *split(concat_args) in the mux_cmd. So the full command is ffmpeg -y -i inp -i seg -map 1:v -map 0:a? -map 0:s? -map_metadata 0:g -c:v copy -c:s copy *split(concat_args) out. So we need to add -c:a copy if not in concat_args, or the default concat_args is "-c:a copy". Good. Let me also make sure we're not requiring the segment to exist when we're building segments_info - we only append when we add a task. So segments_info has one entry per task we put in the queue. And we create seg_path = tmp_dir/i.mp4. So when a worker finishes that task it writes to seg_path. So segment_files aren't in a list we glob - we have segments_info with the expected path. So we're good. Let me re-read encode_whole_files to ensure we're not missing the case when some tasks fail - we'd still have segments_info with (inp, out, seg_path) and we'd try to mux; if segment is missing we log and all_ok = False. Good. +
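The per-file mux command walked through above can be assembled like this (the helper name is made up; the flags are the ones listed in the discussion, and the `?` suffix makes the audio/subtitle maps optional so inputs without those streams still mux):

```python
import shlex

def build_mux_cmd(input_file, segment_file, output_file, concat_args="-c:a copy"):
    # Video comes from the encoded segment (input #1); audio, subtitles and
    # global metadata come from the original (input #0). Everything is
    # stream-copied, so this step is fast and lossless.
    return [
        "ffmpeg", "-y",
        "-i", input_file,
        "-i", segment_file,
        "-map", "1:v", "-map", "0:a?", "-map", "0:s?",
        "-map_metadata", "0:g",
        "-c:v", "copy", "-c:s", "copy",
        *shlex.split(concat_args),
        output_file,
    ]
```

With the default `concat_args`, the full command is `ffmpeg -y -i inp -i seg -map 1:v -map 0:a? -map 0:s? -map_metadata 0:g -c:v copy -c:s copy -c:a copy out`, matching the check above.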
+ +--- + + + + + +--- + + +
Tool use: **todo_write** • Todo List + +- [x] Add whole_file param to encode(), skip keyframe probe, one segment +- [x] Wire DISTRIBUTED_WHOLE_FILE in encode_VOD_pyro and optional multi-file queue + +
+
+ +--- + +
Thought Process + +I need to fix one thing: in encode_whole_files we're building segments_info only for files we add to the queue. So segments_info[i] corresponds to task index i. But the segment path is tmp_dir/f"{i:08d}.mp4" and we're iterating over file_list with enumerate - so we skip when duration <= 0. So the indices in segments_info might not match file_list indices. For example if file_list has 3 files and file 1 has duration 0, we'd have segments_info = [(in0, out0, tmp/00000000.mp4), (in2, out2, tmp/00000002.mp4)] and we'd have put 2 tasks. So the segment paths would be 00000000.mp4 and 00000001.mp4 (because i is the index in the loop 0, 1, 2 - we only append when duration > 0, but i is still 0, 1, 2). So for the second file we'd have seg_path = tmp_dir/00000001.mp4 but we skipped i=1. So we'd have segments_info = [(in0, out0, tmp/0), (in2, out2, tmp/2)] but task queue has (in0, tmp/0) and (in2, tmp/1). So the segment for the second task would be tmp/00000001.mp4 not tmp/00000002.mp4! So we need to use a task index that increments only when we add a task. Let me fix: use a counter for the segment index: seg_i = 0; for i, (inp, out) in enumerate(file_list): ...; seg_path = tmp_dir/f"{seg_i:08d}.mp4"; seg_i += 1; append; put task. So then segments_info and task queue are in sync. +
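The index-desync bug reasoned through above is easy to see in a small sketch (hypothetical helper; the point is the dedicated counter, which is the fix that follows):

```python
from pathlib import Path

def plan_segments(file_list, durations, tmp_dir):
    # durations: input_file -> probed duration. Files with duration <= 0 are
    # skipped. A dedicated counter (not the enumerate index) names the segment
    # files, so the i-th queued task always writes the path recorded in
    # segments_info[i] -- even when earlier files were skipped.
    tasks, segments_info = [], []
    seg_i = 0  # increments only when a task is actually queued
    for input_file, output_file in file_list:
        if durations.get(input_file, 0) <= 0:
            continue
        seg_path = str(Path(tmp_dir) / f"{seg_i:08d}.mp4")
        seg_i += 1
        segments_info.append((input_file, output_file, seg_path))
        tasks.append((input_file, seg_path))
    return tasks, segments_info
```

With `enumerate` indices instead of `seg_i`, skipping the second of three files would queue a task writing `00000001.mp4` while `segments_info` expected `00000002.mp4`; the counter keeps both lists aligned.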
+ +--- + + +Fixing segment path indexing when some files are skipped (duration <= 0): + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +Done. Let me give a short summary to the user. +
+ +--- + + +Summary of what’s in place: + +**1. Single-file whole-file mode (`encode()`)** +- **`whole_file=False`** (default): unchanged (keyframe probe, segments, concat). +- **`whole_file=True`**: no keyframe probe, one segment `(0, total_duration)`, one task per file, same concat/mux step (single segment + audio). + +**2. Multi-file whole-file mode (`encode_whole_files()`)** +- **No keyframe probing** – only duration (and fps) per file. +- **No segmentation** – one task per file: `(0, total_duration)`. +- **No concat** – after encode, each file is muxed on its own: `ffmpeg -i -i -map 1:v -map 0:a? ... -c:v copy -c:a copy `. +- **Distribution** – one shared task queue; each worker pulls a full-file task (reader streams whole file, encoder writes one segment). Multiple files can run in parallel (e.g. 4 workers → up to 4 files at once). + +**3. Task and worker changes** +- **Task** has optional **`input_file`**. When set, the worker uses it as the reader source. +- **TaskThread** uses **`source_file = task.input_file or self._source_file`** so both single-file and multi-file work. + +**4. Batch script (`encode_VOD_pyro`)** +- **`DISTRIBUTED_WHOLE_FILE=1`** (or `true`/`yes`): + - Builds a list of `(input_file, output_file)` for inputs that don’t already have output. + - Calls **`encode_whole_files(workers, file_list, ...)`** once. + - No per-file loop; all files are handed to workers via the shared queue. +- Otherwise the script keeps the existing per-file loop (with segmentation). + +**How to run the “real test”** +- Set **`DISTRIBUTED_WHOLE_FILE=1`** and run the batch (e.g. `encode_VOD_pyro.bat`). +- Each input is one task (no keyframes, no segmentation, no concat); workers get whole files and the script only muxes encoded video + original audio per file. + +--- + +_**User**_ + +@c:\Users\Nathan\.cursor\projects\r-Videos-mux\terminals\1.txt:462-496 why is it still probing? skip that step + +--- + +_**Agent (model default, mode Agent)**_ + +
Thought Process

The user ran the batch without DISTRIBUTED_WHOLE_FILE=1, so it's using the normal per-file loop, which still does keyframe probing ("Processing file 1/4", "Probing keyframes and duration"). They want that step skipped.

Two options: (A) always skip keyframe probing in encode() and use fixed segments only (keyframe_times = []), or (B) add an env var like DISTRIBUTED_NO_KEYFRAME_PROBE=1 to gate it. The user said "skip that step", and the keyframe probe can hang on network paths, so go with (A): skip it by default.

Conveniently, _build_segments already falls back to fixed segment_seconds intervals when keyframe_times is empty. So in encode(), when whole_file is False, just set keyframe_times = [] and call _build_segments([], segment_seconds, total_duration); we only probe total_duration and fps and never call _get_keyframe_times. That skips the probe, and segment boundaries come out at fixed intervals.
</details>
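The fixed-interval fallback that `_build_segments([], ...)` relies on can be sketched as follows (an illustration consistent with the description above; the real implementation may differ in details such as segment representation):

```python
def fixed_segments(segment_seconds, total_duration):
    # No keyframe times available: cut at fixed intervals, with the last
    # segment absorbing whatever remainder is left.
    segments = []
    start = 0.0
    while start < total_duration:
        dur = min(segment_seconds, total_duration - start)
        segments.append((start, dur))  # (start_sec, duration_sec)
        start += segment_seconds
    return segments
```

The tradeoff noted above: boundaries fall at arbitrary timestamps rather than keyframes, which is acceptable here because each worker seeks independently and the probe cost/hang on network paths is avoided.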
+ +--- + + +Skipping keyframe probing in the normal (segmented) encode path so we only probe duration and use fixed segments. + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +Now the "[2/4] Queued N segments (keyframes=0, ...)" message will still show keyframes=0 which is correct. No other references to keyframe_times need updating - we still use it in the message. Let me check if keyframe_times is used elsewhere in encode(). +
+ +--- + + + + + +--- + + +
+Tool use: **ripgrep_raw_search** + + +Parameters: + +```json +{ + "pattern": "keyframe_times", + "path": "r:\\Videos\\mux\\ffmpeg_distributed.py", + "caseInsensitive": false +} +``` +Result: + +```json +{ + "success": { + "pattern": "keyframe_times", + "path": "r:\\Videos\\mux\\ffmpeg_distributed.py", + "outputMode": "content", + "workspaceResults": { + "r:\\Videos\\mux": { + "content": { + "matches": [ + { + "file": "ffmpeg_distributed.py", + "matches": [ + { + "lineNumber": 34, + "content": "def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]:" + }, + { + "lineNumber": 131, + "content": "def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]:" + }, + { + "lineNumber": 135, + "content": " if not keyframe_times or len(keyframe_times) == 1:" + }, + { + "lineNumber": 146, + "content": " while i < len(keyframe_times):" + }, + { + "lineNumber": 147, + "content": " start = keyframe_times[i]" + }, + { + "lineNumber": 150, + "content": " while j < len(keyframe_times) and keyframe_times[j] < end_target:" + }, + { + "lineNumber": 152, + "content": " if j < len(keyframe_times):" + }, + { + "lineNumber": 153, + "content": " end = keyframe_times[j]" + }, + { + "lineNumber": 488, + "content": " keyframe_times = [] # skip keyframe probe; use fixed segment intervals" + }, + { + "lineNumber": 489, + "content": " segments = _build_segments(keyframe_times, segment_seconds, total_duration)" + }, + { + "lineNumber": 492, + "content": " tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)" + }, + { + "lineNumber": 522, + "content": " tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)" + } + ] + } + ], + "totalLines": 12, + "totalMatchedLines": 12 + } + } + } + } +} +``` +Status: **completed** + + +
+
+ +--- + +
Thought Process + +keyframe_times is only used in encode() for building segments and for the log message. We set it to [] so len(keyframe_times)=0 and _build_segments will use the fixed-interval branch. The message will say "keyframes=0" which is correct. Done. +
+ +--- + + +Update is in place: keyframe probing is no longer used in the normal encode path. + +- **[1/4]** now only runs duration (and fps) probe; the log says: `Probing duration (keyframe probe skipped, fixed segments)`. +- Segments are built with **fixed intervals** via `_build_segments([], segment_seconds, total_duration)` (same as when keyframe probe returns nothing). + +So the slow/keyframe step is skipped and you avoid the hang on the network path; segment boundaries are at fixed `segment_seconds` (e.g. 60s), not keyframes. + +--- + +_**User**_ + + @c:\Users\Nathan\.cursor\projects\r-Videos-mux\terminals\1.txt:502-743 + +--- + +_**Agent (model default, mode Agent)**_ + +
Thought Process + +The terminal shows `NameError: name 'source_file' is not defined` at line 366 in ffmpeg_distributed.py. The variable `source_file` is used in the reader_cmd but it's not defined in the scope where it's used. Looking at my earlier edit, I added `source_file = (task.input_file if task.input_file else self._source_file)` but it might have been placed after the reader_cmd block or in a different order. Let me check the file - the error says line 366 has `source_file` in the reader_cmd. So we're using source_file before it's defined. I need to ensure source_file is defined before we use it in reader_cmd. Let me read the run() method to see the order of statements. +
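The failure mode diagnosed above is easy to reproduce in isolation: a free variable that is never bound in any enclosing scope raises `NameError` the moment the line executes (names here are illustrative, not the file's exact code):

```python
def build_reader_cmd():
    # source_file is referenced but never assigned anywhere in scope,
    # so Python raises NameError at call time -- the same class of error
    # the terminal showed for ffmpeg_distributed.py.
    return ["ffmpeg", "-hide_banner", "-i", source_file]

try:
    build_reader_cmd()
except NameError as e:
    print(e)  # name 'source_file' is not defined
```

Note this is distinct from `UnboundLocalError`, which would appear instead if `source_file` were assigned *later* in the same function body; a plain `NameError` means the defining line is missing entirely, which matches the diagnosis.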
+ +--- + + +Checking the order of operations: `source_file` is used before it's defined. + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\ffmpeg_distributed.py + +
+
+ +--- + +
Thought Process + +I see - we use `source_file` in the reader_cmd (lines 366 and 371) but we never defined it in this code path. I had added `source_file = (task.input_file if task.input_file else self._source_file)` in an earlier edit but it looks like it's missing - perhaps the edit only added it in one branch or it got lost. I need to add the line that defines source_file before it's used, i.e. before the "if fast_seek:" block. So add: source_file = (task.input_file if task.input_file else self._source_file) right after last_encoder_t[0] = 0.0 or after n_frames. +
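The fix boils down to resolving the source path before any command that uses it. A minimal sketch of the fallback, with `Task` trimmed to the one relevant field and the helper name my own:

```python
from typing import NamedTuple, Optional

class Task(NamedTuple):
    start_sec: float
    duration_sec: float
    input_file: Optional[str] = None  # set only in whole-file multi-file mode

def resolve_source(task: Task, thread_source_file: str) -> str:
    # Per-task input (whole-file multi-file mode) wins; otherwise fall back
    # to the single source file the TaskThread was constructed with.
    return task.input_file if task.input_file else thread_source_file
```

In the single-file path `task.input_file` is `None`, so `resolve_source(Task(0, 60), "a.mp4")` returns `"a.mp4"`; in multi-file mode the per-task file takes precedence.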
+ +--- + + +Defining `source_file` before it's used. + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +
+ +
+ +--- + +
Thought Process + +Fixed. The variable source_file is now defined before the reader_cmd blocks that use it. Task has input_file as optional (default None), so for the normal single-file encode path task.input_file is None and source_file = self._source_file. +
+ +--- + + +The bug was that `source_file` was used in the reader command but never set in the single-file path. It’s now set right after the progress state reset: + +`source_file = (task.input_file if task.input_file else self._source_file)` + +So: +- **Single-file encode**: `task.input_file` is None → `source_file = self._source_file` (the one input file). +- **Whole-file multi-file**: `task.input_file` is set → `source_file` is that task’s input file. + +Re-run the batch and the `NameError` should be gone. + +--- + +_**User**_ + +@c:\Users\Nathan\.cursor\projects\r-Videos-mux\terminals\1.txt why is it still using segs? I thought we disabled that + +--- + +_**Agent (model default, mode Agent)**_ + +
Thought Process + +The user thought we disabled segmentation - they want to run without segments. We have two modes: + +1. **DISTRIBUTED_WHOLE_FILE=1** - Uses `encode_whole_files()` which distributes whole videos to workers (no keyframe, no segmentation, per-file mux). This is only used when the user sets the env var. + +2. **Normal mode** (no env) - Still uses the per-file loop that calls `encode_dvr_distributed()` -> `encode()` with segments. We only disabled *keyframe probing*; we did not disable segmentation. So it still builds fixed 60s segments and runs the segment-based pipeline. + +So the user expected that "disable keyframe probing, segmentation, and concat" meant the default behavior would be whole-file (one segment per file). But we implemented that as an *opt-in* via DISTRIBUTED_WHOLE_FILE=1. Without that env, it still uses the segmented flow (just with fixed intervals instead of keyframe-based). + +To make it "still using segs" go away, we have two options: +A) Make whole-file the default: when not in multi-file batch mode, call encode() with whole_file=True so each file is one segment (no keyframe, no splitting into 60s chunks). +B) Tell the user to set DISTRIBUTED_WHOLE_FILE=1 to get whole-file behavior. + +The user said "I thought we disabled that" - so they want segmentation disabled by default. I'll make the single-file encode path use whole_file=True by default (or when an env is set). Actually re-reading the original request: "The real test: disable keyframe probing, segmentation, and concat; distribute the videos to workers individually." So they wanted to disable segs for the "real test". We added DISTRIBUTED_WHOLE_FILE for that. So when they run without setting it, they're in the old mode (segs). So either we make whole-file the default for the per-file loop (each file encoded as one whole segment), or we need to tell them to set the env. 
+ +Making whole-file the default for the per-file encode would mean: in encode_dvr_distributed, pass whole_file=True to distributed_encode(). Then every file would be encoded as one segment (no 60s chunks). That would "disable" segmentation in the sense that each file is one task. So I'll add whole_file=True to the encode() call in encode_dvr_distributed. That way the normal batch (without DISTRIBUTED_WHOLE_FILE) still processes one file at a time but each file is encoded whole (one segment, no keyframe probe, concat step is just mux that one segment with audio). So "segments" would be 1 per file. +
+ +--- + + +Checking how the batch chooses between segmented and whole-file: + + +--- + + +
+Tool use: **ripgrep_raw_search** + + +Parameters: + +```json +{ + "pattern": "whole_file|encode_whole_files|encode_dvr_distributed|distributed_encode", + "path": "r:\\Videos\\mux\\encode_VOD_pyro.py", + "caseInsensitive": false +} +``` +Result: + +```json +{ + "success": { + "pattern": "whole_file|encode_whole_files|encode_dvr_distributed|distributed_encode", + "path": "r:\\Videos\\mux\\encode_VOD_pyro.py", + "outputMode": "content", + "workspaceResults": { + "r:\\Videos\\mux": { + "content": { + "matches": [ + { + "file": "encode_VOD_pyro.py", + "matches": [ + { + "lineNumber": 369, + "content": "def encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=60, remote_args=None, concat_args=\"-c:a copy\", probe_host=None, probe_path=None, remote_ffmpeg_path=None):" + }, + { + "lineNumber": 392, + "content": " from ffmpeg_distributed import encode as distributed_encode" + }, + { + "lineNumber": 396, + "content": " ok = distributed_encode(" + }, + { + "lineNumber": 437, + "content": " whole_file_mode = os.environ.get(\"DISTRIBUTED_WHOLE_FILE\", \"\").strip().lower() in (\"1\", \"true\", \"yes\")" + }, + { + "lineNumber": 441, + "content": " elif whole_file_mode:" + }, + { + "lineNumber": 452, + "content": " from ffmpeg_distributed import encode_whole_files" + }, + { + "lineNumber": 457, + "content": " ok = encode_whole_files(" + }, + { + "lineNumber": 478, + "content": " encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS) " + } + ] + } + ], + "totalLines": 8, + "totalMatchedLines": 8 + } + } + } + } +} +``` +Status: **completed** + + +
+
+ +--- + +
Thought Process + +So when the user runs without DISTRIBUTED_WHOLE_FILE, we take the else branch and call encode_dvr_distributed() for each file. That calls distributed_encode() (i.e. encode()) without whole_file=True, so we get segments. To disable segments in the normal per-file path, we should pass whole_file=True to distributed_encode() so each file is encoded as one whole segment. I'll add whole_file=True to the encode_dvr_distributed call to distributed_encode(). That way the default behavior is: one file at a time, each file encoded whole (no 60s segments). If they want the old segmented behavior they could set an env like DISTRIBUTED_SEGMENTS=1. Let me just pass whole_file=True so we "disabled" segmentation by default. +
+ +--- + + +Using the segmented path by default: whole-file runs only when DISTRIBUTED_WHOLE_FILE=1. Passing whole_file=True from the per-file loop so each file is encoded as a single segment by default. + + +--- + + +
Tool use: **read_file_v2** • Read file: r:\Videos\mux\encode_VOD_pyro.py + +
+
+ +--- + +
Thought Process + +I'll add whole_file=True to the distributed_encode() call. So by default each file is encoded as one whole segment (no 60s chunking). If the user wants the old segmented behavior they can set DISTRIBUTED_USE_SEGMENTS=1 and we pass whole_file=False. +
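The env-gated default can be sketched as follows. The truthy-string convention mirrors the existing `DISTRIBUTED_WHOLE_FILE` check already in `encode_VOD_pyro.py`; the `env_flag` helper name is mine:

```python
import os

def env_flag(name: str) -> bool:
    """Treat '1'/'true'/'yes' (any case, whitespace-tolerant) as on;
    everything else, including unset, as off."""
    return os.environ.get(name, "").strip().lower() in ("1", "true", "yes")

# Per-file path: whole-file is the default; DISTRIBUTED_USE_SEGMENTS=1 opts
# back in to the old fixed-60s segmented pipeline.
use_segments = env_flag("DISTRIBUTED_USE_SEGMENTS")
whole_file = not use_segments
```

So running the batch with no env vars gives `whole_file=True` (one task per file), and `DISTRIBUTED_USE_SEGMENTS=1` restores segmentation without touching the code.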
+ +--- + + + + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\encode_VOD_pyro.py + +
+ +
+ +--- + + + +--- + +
Thought Process + +Now by default we pass whole_file=True (since use_segments is False unless DISTRIBUTED_USE_SEGMENTS is set). So the normal per-file path will encode each file as one whole segment — no 60s chunks, no keyframe probing. If someone wants the old segmented behavior they set DISTRIBUTED_USE_SEGMENTS=1. +
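The resulting planning logic can be summarized in one hypothetical standalone helper (not the file's exact code): whole-file mode collapses each input to a single `(start, end)` task spanning the full duration, while the opt-in segmented mode keeps the fixed windows:

```python
from typing import List, Tuple

def plan_tasks(total_duration: float, segment_seconds: float,
               whole_file: bool) -> List[Tuple[float, float]]:
    """Whole-file mode: one task covering the entire file.
    Segmented mode: fixed segment_seconds windows (keyframe probe skipped)."""
    if total_duration <= 0:
        return []
    if whole_file:
        return [(0.0, total_duration)]  # one worker task per file
    tasks, start = [], 0.0
    while start < total_duration:
        end = min(start + segment_seconds, total_duration)
        tasks.append((start, end))
        start = end
    return tasks
```

With defaults, a one-hour file becomes the single task `[(0.0, 3600.0)]`; only with `DISTRIBUTED_USE_SEGMENTS=1` does it split into sixty 60-second tasks again.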
+ +--- + + +Segmentation was only off when `DISTRIBUTED_WHOLE_FILE=1` (multi-file queue). The normal per-file path was still calling `encode()` without `whole_file=True`, so it kept using fixed 60s segments. + +**Change made:** the per-file distributed encode now uses **whole-file by default**: +- `distributed_encode(..., whole_file=True)` unless you set **`DISTRIBUTED_USE_SEGMENTS=1`**. +- So each file is one segment (no 60s chunks, no keyframe probing). +- To get the old segmented behavior, set `DISTRIBUTED_USE_SEGMENTS=1`. + +Re-run without any env vars and it should run whole-file (no segs). + +--- + +_**User**_ + + @c:\Users\Nathan\.cursor\projects\r-Videos-mux\terminals\1.txt:1036-1059 + +--- + +_**Agent (model default, mode Agent)**_ + + + +--- + + + +--- + + +
+Tool use: **ripgrep_raw_search** + +Status: **loading** + + +
+
+ +--- + + + +--- + + + +--- + + +
+Tool use: **read_file_v2** + +Status: **loading** + + +
+
+ +--- + + + +--- + + + +--- + + + +
Tool use: **edit_file_v2** • Edit file: r:\Videos\mux\ffmpeg_distributed.py + +Status: **loading** + +
+ +
+ +--- + + + +--- + --- diff --git a/encode_VOD_pyro.bat b/encode_VOD_pyro.bat deleted file mode 100644 index 3fc4910..0000000 --- a/encode_VOD_pyro.bat +++ /dev/null @@ -1,3 +0,0 @@ -@echo off -python encode_VOD_pyro.py -pause \ No newline at end of file diff --git a/encode_VOD_pyro.py b/encode_VOD_pyro.py deleted file mode 100644 index fc6b684..0000000 --- a/encode_VOD_pyro.py +++ /dev/null @@ -1,447 +0,0 @@ -import hashlib -import os -import re -import subprocess -import sys -from pathlib import Path -import json -import logging -from datetime import datetime -import shutil -import time - -# Distributed mode requires tqdm and ffmpeg_distributed.py (SSH, Unix select.poll); on Windows use WSL or Linux. -# Workers = (ssh_host, gpu_index). Unraid (GuiltsCurse, Godzilla) excluded; RenderScrap has 2 GPUs. -DISTRIBUTED_WORKERS_DEFAULT = [ - ("Pyro", 0), - ("RenderScrap", 0), - ("RenderScrap", 1), - ("PostIrony", 0), -] -DISTRIBUTED_REMOTE_ARGS_DEFAULT = "-c:v hevc_nvenc -preset p7 -tune hq -rc vbr -rc-lookahead 32 -spatial-aq 1 -aq-strength 15 -cq 0 -b:v 9000k -maxrate 9000k -bufsize 18000k -an" -DISTRIBUTED_SEGMENT_SECONDS = 60 - - -def _parse_workers_env(s): - """Parse DISTRIBUTED_WORKERS e.g. 
'Pyro:0,RenderScrap:0,RenderScrap:1,PostIrony:0' -> [(host, gpu_id), ...].""" - out = [] - for part in (s or "").strip().split(","): - part = part.strip() - if not part: - continue - if ":" in part: - host, gpu = part.rsplit(":", 1) - try: - out.append((host.strip(), int(gpu.strip()))) - except ValueError: - pass - else: - out.append((part, 0)) - return out - -# ANSI color codes -class Colors: - PURPLE = '\033[95m' - BLUE = '\033[94m' - GREEN = '\033[92m' - YELLOW = '\033[93m' - RED = '\033[91m' - ENDC = '\033[0m' - -def get_gpu_selection(): - while True: - print(f"\n{Colors.BLUE}Select GPU slot:{Colors.ENDC}") - print("0 - First GPU") - print("1 - Second GPU") - print("2 - Third GPU") - gpu = input(f"{Colors.YELLOW}Enter GPU number (0-2):{Colors.ENDC} ").strip() - if gpu in ['0', '1', '2']: - return gpu - print(f"{Colors.RED}Invalid selection. Please try again.{Colors.ENDC}") - -# Custom file handler that silently handles I/O errors (for network shares) -class SafeFileHandler(logging.FileHandler): - """File handler that silently handles I/O errors during flush""" - def flush(self): - """Override flush to silently handle I/O errors""" - try: - super().flush() - except (OSError, IOError): - # Silently ignore I/O errors (network share issues) - pass - except Exception: - # Silently ignore all other errors during flush - pass - - def emit(self, record): - """Override emit to handle errors gracefully""" - try: - super().emit(record) - except (OSError, IOError): - # Silently ignore I/O errors - we'll fall back to console output - self.handleError(record) - except Exception: - # Handle other errors - self.handleError(record) - - def handleError(self, record): - """Override to prevent error messages from being printed""" - # Don't print "--- Logging error ---" messages - pass - -# Set up logging -log_dir = "logs" -os.makedirs(log_dir, exist_ok=True) -log_file = os.path.join(log_dir, f"encode_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log") - -# Configure logging with 
custom handler that handles network share errors -handler = SafeFileHandler(log_file, mode='w', encoding='utf-8') -handler.setLevel(logging.INFO) -formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') -handler.setFormatter(formatter) - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -logger.addHandler(handler) -# Remove default handlers to avoid duplicate output -logger.handlers = [handler] - -def get_file_info(input_file): - cmd = [ - 'ffprobe', - '-v', 'error', - '-show_entries', 'format=duration,size:stream=codec_type,codec_name,width,height,r_frame_rate,channels,channel_layout', - '-of', 'json', - input_file - ] - result = subprocess.run(cmd, capture_output=True, text=True) - return json.loads(result.stdout) - -def get_audio_labels(input_file): - cmd = [ - 'ffprobe', - '-v', 'error', - '-select_streams', 'a', - '-show_entries', 'stream=index:stream_tags=title', - '-of', 'json', - input_file - ] - result = subprocess.run(cmd, capture_output=True, text=True) - info = json.loads(result.stdout) - labels = [] - for stream in info.get('streams', []): - title = stream.get('tags', {}).get('title', None) - labels.append(title) - return labels - -def parse_fps(r_frame_rate): - """Parse ffprobe r_frame_rate e.g. 
'60/1' or '30000/1001' to float.""" - if not r_frame_rate: - return None - parts = str(r_frame_rate).strip().split('/') - if len(parts) == 2 and float(parts[1]) != 0: - return float(parts[0]) / float(parts[1]) - try: - return float(parts[0]) - except (ValueError, IndexError): - return None - -def format_size(size_bytes): - for unit in ['B', 'KB', 'MB', 'GB']: - if size_bytes < 1024: - return f"{size_bytes:.2f} {unit}" - size_bytes /= 1024 - return f"{size_bytes:.2f} TB" - -def safe_log_info(message, print_msg=None): - """Safely log info message, ensuring console output even if logging fails""" - try: - logging.info(message) - except (OSError, IOError) as e: - # Logging failed (likely network share issue) - print to console - if print_msg is None: - print(f"{Colors.YELLOW}[Logging failed: {e}] {message}{Colors.ENDC}") - else: - print(print_msg) - print(f"{Colors.YELLOW}[Logging failed: {e}]{Colors.ENDC}") - return - except Exception as e: - # Other logging errors - if print_msg is None: - print(f"{Colors.YELLOW}[Logging error: {e}] {message}{Colors.ENDC}") - else: - print(print_msg) - print(f"{Colors.YELLOW}[Logging error: {e}]{Colors.ENDC}") - return - - # Always print to console if message provided - if print_msg is not None: - print(print_msg) - -def safe_log_error(message, print_msg=None): - """Safely log error message, ensuring console output even if logging fails""" - try: - logging.error(message) - except (OSError, IOError) as e: - # Logging failed (likely network share issue) - print to console - if print_msg is None: - print(f"{Colors.RED}[Logging failed: {e}] {message}{Colors.ENDC}") - else: - print(print_msg) - print(f"{Colors.RED}[Logging failed: {e}]{Colors.ENDC}") - return - except Exception as e: - # Other logging errors - if print_msg is None: - print(f"{Colors.RED}[Logging error: {e}] {message}{Colors.ENDC}") - else: - print(print_msg) - print(f"{Colors.RED}[Logging error: {e}]{Colors.ENDC}") - return - - # Always print to console if message 
provided - if print_msg is not None: - print(print_msg) - else: - print(f"{Colors.RED}{message}{Colors.ENDC}") - -def encode_dvr(input_file, output_dir, gpu): - input_path = Path(input_file) - output_path = Path(output_dir) / f"{input_path.stem}{input_path.suffix}" - - # Get file info for logging - file_info = get_file_info(str(input_path)) - input_size = int(file_info['format']['size']) - duration = float(file_info['format']['duration']) - - safe_log_info(f"Processing file: {input_path}") - safe_log_info(f"Input size: {format_size(input_size)}") - safe_log_info(f"Duration: {duration:.2f} seconds") - - print(f"\n{Colors.BLUE}Processing file: {input_path}{Colors.ENDC}") - print(f"Input size: {format_size(input_size)}") - print(f"Duration: {duration:.2f} seconds") - - # Log stream information - for i, stream in enumerate(file_info.get('streams', [])): - stream_type = 'Video' if stream.get('codec_name', '').startswith('h') else 'Audio' - safe_log_info(f"Stream {i} ({stream_type}):") - for key, value in stream.items(): - safe_log_info(f" {key}: {value}") - - # Skip if output file already exists - if output_path.exists(): - output_size = output_path.stat().st_size - safe_log_info(f"Skipping {input_path} - output already exists: {output_path}") - safe_log_info(f"Output size: {format_size(output_size)}") - print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}") - return - - # Get audio labels and input FPS (for speed display when stderr is piped) - audio_labels = get_audio_labels(str(input_path)) - safe_log_info(f"Audio labels: {audio_labels}") - input_fps = None - for s in file_info.get('streams', []): - if s.get('codec_type') == 'video': - input_fps = parse_fps(s.get('r_frame_rate')) - break - if input_fps is None: - # Fallback: first stream with r_frame_rate (e.g. 
codec_type not in probe) - for s in file_info.get('streams', []): - if 'width' in s or s.get('codec_name', '').startswith(('h', 'm')): - input_fps = parse_fps(s.get('r_frame_rate')) - break - if input_fps is None: - for s in file_info.get('streams', []): - input_fps = parse_fps(s.get('r_frame_rate')) - if input_fps and input_fps > 0: - break - - # FFmpeg command with NVIDIA HEVC encoder and maximum quality - cmd = [ - 'ffmpeg', - '-v', 'info', # Lower verbosity to reduce noise - '-stats', # Emit periodic stats - '-stats_period', '1.0', # Update stats every 1s (more stable) - '-i', str(input_path), - '-c:v', 'hevc_nvenc', - '-gpu', gpu, - '-preset', 'p7', - '-tune', 'hq', - '-rc', 'vbr', - '-rc-lookahead', '32', - '-spatial-aq', '1', - '-aq-strength', '15', - '-cq', '0', - '-b:v', '9000k', - '-maxrate', '9000k', - '-bufsize', '18000k', - '-c:a', 'copy', - '-map', '0', - ] - # Add metadata for each audio stream if label exists - for idx, label in enumerate(audio_labels): - if label: - cmd += [f'-metadata:s:a:{idx}', f'title={label}'] - cmd.append(str(output_path)) - - try: - # Run FFmpeg and capture combined output (avoid dual-pipe deadlocks on Windows) - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1 - ) - - # Stream output line-by-line; compute speed when stderr is piped (FFmpeg shows speed=N/A) - for line in iter(process.stdout.readline, ''): - if not line: - break - text = line.strip().strip('\r') - try: - if text.startswith('frame=') or ' fps=' in text: - display = text - m_frame = re.search(r'frame=\s*(\d+)', text) - m_elapsed = re.search(r'elapsed=(\d+):(\d+):(\d+\.?\d*)', text) - m_size = re.search(r'size=\s*(\d+)\s*KiB', text) - if m_frame and m_elapsed: - frames = int(m_frame.group(1)) - h, m, s = float(m_elapsed.group(1)), float(m_elapsed.group(2)), float(m_elapsed.group(3)) - elapsed_sec = h * 3600 + m * 60 + s - if elapsed_sec > 0: - if input_fps and input_fps > 0: - speed_x 
= (frames / input_fps) / elapsed_sec - display = re.sub(r'speed=N/A', f'speed={speed_x:.2f}x', text) - # time = output position (HH:MM:SS.ms) - video_sec = frames / input_fps - t_h = int(video_sec // 3600) - t_m = int((video_sec % 3600) // 60) - t_s = video_sec % 60 - time_str = f'{t_h}:{t_m:02d}:{t_s:06.3f}' if t_h else f'0:{t_m:02d}:{t_s:06.3f}' - display = re.sub(r'time=N/A', f'time={time_str}', display) - # bitrate when muxer has written data (stays N/A until size > 0) - size_kib = int(m_size.group(1)) if m_size else 0 - if size_kib > 0 and elapsed_sec > 0: - bitrate_kbps = (size_kib * 8192) / (elapsed_sec * 1000) - display = re.sub(r'bitrate=N/A', f'bitrate={bitrate_kbps:.0f}kbits/s', display) - else: - enc_fps = frames / elapsed_sec - display = re.sub(r'speed=N/A', f'speed={enc_fps:.0f}fps', text) - safe_log_info(f"Progress: {text}", f"{Colors.PURPLE}Progress: {display}{Colors.ENDC}") - else: - safe_log_info(f"FFmpeg: {text}", f"{Colors.GREEN}FFmpeg: {text}{Colors.ENDC}") - except (OSError, IOError) as e: - # I/O error reading from pipe - log it - safe_log_error(f"I/O error reading FFmpeg output: {e}") - break - except Exception as e: - # Unexpected error - safe_log_error(f"Unexpected error processing FFmpeg output: {e}") - - process.wait() - - if process.returncode == 0: - # Get output file info - output_info = get_file_info(str(output_path)) - output_size = int(output_info['format']['size']) - compression_ratio = input_size / output_size if output_size > 0 else 0 - - safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}") - safe_log_info(f"Output size: {format_size(output_size)}") - safe_log_info(f"Compression ratio: {compression_ratio:.2f}x", f"Compression ratio: {compression_ratio:.2f}x") - else: - # Convert Windows error code to signed integer if needed - return_code = process.returncode - if return_code > 2147483647: # If it's a large unsigned int, convert to signed - return_code = 
return_code - 4294967296 - safe_log_error(f"FFmpeg process failed with return code {return_code}", - f"{Colors.RED}FFmpeg process failed with return code {return_code}{Colors.ENDC}") - - except subprocess.CalledProcessError as e: - safe_log_error(f"Error encoding {input_path}: {e}", f"{Colors.RED}Error encoding {input_path}: {e}{Colors.ENDC}") - except Exception as e: - safe_log_error(f"Unexpected error encoding {input_path}: {type(e).__name__}: {e}", - f"{Colors.RED}Unexpected error encoding {input_path}: {e}{Colors.ENDC}") - - -def encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=60, remote_args=None, concat_args="-c:a copy", probe_host=None, probe_path=None, remote_ffmpeg_path=None): - """Encode one file using ffmpeg_distributed (split -> farm -> concat). workers = [(host, gpu_id), ...]. - Segment temp dirs go under script dir/tmp/. If probe_host and probe_path are set, ffprobe runs there (faster when input is on NAS).""" - input_path = Path(input_file).resolve() - output_path = (Path(output_dir) / f"{input_path.stem}{input_path.suffix}").resolve() - if output_path.exists(): - safe_log_info(f"Skipping {input_path} - output already exists: {output_path}") - print(f"{Colors.YELLOW}Skipping {input_path} - output already exists{Colors.ENDC}") - return - remote_args = remote_args or os.environ.get("DISTRIBUTED_REMOTE_ARGS", DISTRIBUTED_REMOTE_ARGS_DEFAULT) - probe_host = probe_host or os.environ.get("PROBE_HOST") - if probe_path is None and probe_host and os.environ.get("PROBE_PATH_PREFIX"): - prefix = os.environ.get("PROBE_PATH_PREFIX", "").rstrip("/") - probe_path = f"{prefix}/{input_path.name}" - script_dir = Path(__file__).resolve().parent - tmp_base = script_dir / "tmp" - tmp_base.mkdir(exist_ok=True) - path_for_hash = os.path.abspath(os.path.expanduser(str(input_path))) - segment_hash = hashlib.md5(path_for_hash.encode()).hexdigest() - tmp_dir = str(tmp_base / f"ffmpeg_segments_{segment_hash}") - cwd = os.getcwd() - try: - 
os.chdir(output_dir) - from ffmpeg_distributed import encode as distributed_encode - safe_log_info(f"Distributed encode: {input_path} -> {output_path} (workers: {workers})") - print(f"{Colors.BLUE}Distributed encode (HEVC): {input_path.name}{Colors.ENDC}") - remote_ffmpeg = remote_ffmpeg_path or os.environ.get("DISTRIBUTED_REMOTE_FFMPEG_PATH") - ok = distributed_encode( - workers, - str(input_path), - str(output_path), - segment_seconds=segment_seconds, - remote_args=remote_args, - concat_args=concat_args, - tmp_dir=tmp_dir, - probe_host=probe_host, - probe_path=probe_path, - remote_ffmpeg_path=remote_ffmpeg, - ) - if ok and output_path.exists(): - safe_log_info(f"Successfully encoded: {output_path}", f"{Colors.GREEN}Successfully encoded: {output_path}{Colors.ENDC}") - else: - safe_log_error("Distributed encode did not produce output (see [4/4] ERROR above)", f"{Colors.RED}Distributed encode did not produce output{Colors.ENDC}") - except Exception as e: - safe_log_error(f"Distributed encode failed: {e}", f"{Colors.RED}Distributed encode failed: {e}{Colors.ENDC}") - finally: - os.chdir(cwd) - - -if __name__ == "__main__": - if sys.platform == "win32": - print(f"{Colors.YELLOW}Distributed mode uses select.poll() and may fail on Windows; use WSL or Linux for best results.{Colors.ENDC}") - - input_dir = "input" - output_dir = "output" - os.makedirs(output_dir, exist_ok=True) - - workers_str = os.environ.get("DISTRIBUTED_WORKERS") - if workers_str: - workers = _parse_workers_env(workers_str) - else: - workers = DISTRIBUTED_WORKERS_DEFAULT - workers_desc = ", ".join(f"{h}:gpu{g}" for h, g in workers) - print(f"{Colors.BLUE}Using workers: {workers_desc}{Colors.ENDC}") - safe_log_info(f"Distributed mode; workers: {workers}") - - files = [f for f in os.listdir(input_dir) if f.endswith(('.mp4', '.DVR.mp4'))] - total_files = len(files) - - if total_files == 0: - safe_log_info("No files to process in input directory", f"{Colors.YELLOW}No files to process in input 
directory{Colors.ENDC}") - else: - safe_log_info(f"Found {total_files} files to process", f"{Colors.BLUE}Found {total_files} files to process{Colors.ENDC}") - - for i, file in enumerate(files, 1): - input_file = os.path.join(input_dir, file) - safe_log_info(f"Processing file {i}/{total_files}: {file}") - print(f"\n{Colors.BLUE}Processing file {i}/{total_files}: {file}{Colors.ENDC}") - encode_dvr_distributed(input_file, output_dir, workers, segment_seconds=DISTRIBUTED_SEGMENT_SECONDS) \ No newline at end of file diff --git a/ffmpeg_distributed.py b/ffmpeg_distributed.py deleted file mode 100644 index 703f84e..0000000 --- a/ffmpeg_distributed.py +++ /dev/null @@ -1,686 +0,0 @@ -#!/usr/bin/env python3 -from queue import SimpleQueue, Empty -from subprocess import Popen, DEVNULL, PIPE, TimeoutExpired -import select -from shutil import rmtree -from glob import glob -from os import mkdir, unlink, listdir, environ -from os.path import basename, abspath, expanduser, isfile, isdir, getsize -from threading import Thread -from time import sleep, time -from typing import List, NamedTuple, Callable, Union, Tuple -from signal import signal, SIGINT -from sys import exit, stderr, stdin, platform as sys_platform -from shlex import split, join -from hashlib import md5 -from time import strptime -from tqdm import tqdm -import re - -DEBUG = 'DEBUG' in environ - -def dprint(*args, **kwargs): - if DEBUG: - print(*args, *kwargs) - -def _popen(args, **kwargs): - dprint(f'calling subprocess: {args}') - kwargs['stderr'] = PIPE - kwargs['stdout'] = DEVNULL - kwargs['universal_newlines'] = True - return Popen(args, **kwargs) - - -def _get_keyframe_times(input_file: str, probe_host: str = None, probe_path: str = None) -> List[float]: - """Run ffprobe to get keyframe timestamps (seconds). Uses -skip_frame nokey so only keyframes are read (fast). 
- If probe_host and probe_path are set, run ffprobe there via ssh so the file is read at local disk speed.""" - cmd = [ - 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-skip_frame', 'nokey', - '-show_entries', 'frame=pts_time', '-of', 'csv=p=0', - probe_path if (probe_host and probe_path) else input_file - ] - if probe_host and probe_path: - cmd = ['ssh', probe_host, join(cmd)] - proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) - out, err = proc.communicate() - if proc.returncode != 0 and err: - dprint('ffprobe keyframes:', err) - out = out or '' - times = [] - for line in out.strip().splitlines(): - line = line.strip().split('=')[-1] if '=' in line else line.strip() - if not line: - continue - try: - times.append(float(line)) - except ValueError: - pass - return sorted(times) - - -def _get_duration_seconds(input_file: str, probe_host: str = None, probe_path: str = None) -> float: - """Get container duration in seconds via ffprobe (header-only, fast). If probe_host set, run there.""" - path = probe_path if (probe_host and probe_path) else input_file - cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path] - if probe_host and probe_path: - cmd = ['ssh', probe_host, join(cmd)] - proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) - out, _ = proc.communicate() - out = (out or '').strip() - try: - if out: - return float(out) - except ValueError: - pass - cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=duration', '-of', 'csv=p=0', path] - if probe_host and probe_path: - cmd = ['ssh', probe_host, join(cmd)] - proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) - out, _ = proc.communicate() - out = (out or '').strip() - try: - if out: - return float(out) - except ValueError: - pass - return 0.0 - - -def _get_fps(input_file: str, probe_host: str = None, probe_path: str = None) -> float: - """Get video stream frame rate (e.g. 
60.0) via ffprobe. If probe_host set, run there. Default 60.0 on failure.""" - path = probe_path if (probe_host and probe_path) else input_file - cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'csv=p=0', path] - if probe_host and probe_path: - cmd = ['ssh', probe_host, join(cmd)] - proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True) - out, _ = proc.communicate() - out = (out or '').strip() - if not out: - return 60.0 - try: - if '/' in out: - num, den = out.split('/', 1) - return float(num) / float(den) if float(den) else 60.0 - return float(out) - except (ValueError, ZeroDivisionError): - return 60.0 - - -def _probe_duration(path: str) -> float: - """Return duration in seconds from ffprobe, or 0.0 on failure.""" - proc = Popen( - ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', path], - stdout=PIPE, stderr=PIPE, universal_newlines=True - ) - out, _ = proc.communicate() - if proc.returncode != 0: - return 0.0 - out = (out or '').strip() - try: - return float(out) - except ValueError: - return 0.0 - - -def _segment_valid(segment_path: str, expected_duration_sec: float, tolerance_sec: float = 0.1) -> bool: - """Return True if ffprobe succeeds on segment and duration matches expected within tolerance (like check_files.py).""" - duration = _probe_duration(segment_path) - return duration > 0 and abs(duration - expected_duration_sec) <= tolerance_sec - - -def _build_segments(keyframe_times: List[float], segment_seconds: float, total_duration: float) -> List[Tuple[float, float]]: - """Build (start_sec, end_sec) segments at keyframe boundaries. 
When keyframe probe fails, use fixed segment_seconds."""
-    if total_duration <= 0:
-        return []
-    if not keyframe_times or len(keyframe_times) == 1:
-        # No keyframes (or only one): split at fixed intervals so we still get multiple segments
-        segments = []
-        start = 0.0
-        while start < total_duration:
-            end = min(start + segment_seconds, total_duration)
-            segments.append((start, end))
-            start = end
-        return segments
-    segments = []
-    i = 0
-    while i < len(keyframe_times):
-        start = keyframe_times[i]
-        end_target = start + segment_seconds
-        j = i + 1
-        while j < len(keyframe_times) and keyframe_times[j] < end_target:
-            j += 1
-        if j < len(keyframe_times):
-            end = keyframe_times[j]
-        else:
-            end = total_duration
-        segments.append((start, end))
-        i = j
-    return segments
-
-
-class Task(NamedTuple):
-    start_sec: float
-    duration_sec: float
-    output_file: str
-    ffmpeg_args: List[str]
-    fps: float = 60.0
-
-class FFMPEGProc:
-    _duration_re = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
-    _progress_re = re.compile(r'frame=\s*(?P<frame>[0-9]+)\s+fps=\s*(?P<fps>[0-9]+).*time=-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9,.]+)\s+.*speed=(?P<speed>[0-9\.]+)x')
-
-    @staticmethod
-    def _match_to_sec(match):
-        return int(match.group('time_h'))*3600+int(match.group('time_m'))*60+float(match.group('time_s'))
-
-    def __init__(self, cmd: Union[list, str], shell=False, stdin=DEVNULL, stdout=DEVNULL, update_callback: Callable[[int,int,float,float,float], None] = None, binary_io=False, echo_stderr=False):
-        self._cmd = cmd
-        self._update_callback = update_callback
-        self._should_stop = False
-        self._shell = shell
-        self._duration = None
-        self._stdin = stdin
-        self._stdout = stdout
-        self._binary_io = binary_io
-        self._echo_stderr = echo_stderr
-        self.stderr = ''
-
-    def stop(self):
-        self._should_stop = True
-        if getattr(self, '_proc', None) is not None and self._proc.poll() is None:
-            try:
-                self._proc.terminate()
-            except OSError:
-                pass
-
-    def _read_stderr_loop(self, stderr_lines: list):
-        """Read stderr in a loop (used on Windows where select.poll is unavailable)."""
-        while True:
-            raw = self._proc.stderr.readline()
-            if not raw and self._proc.poll() is not None:
-                break
-            if raw:
-                line = raw.decode(errors='replace') if self._binary_io else raw
-                stderr_lines.append(line)
-                if self._echo_stderr:
-                    tqdm.write(line.rstrip(), file=stderr)
-                match = self._progress_re.match(line)
-                if match and self._update_callback:
-                    self._update_callback(
-                        int(match.group('frame')),
-                        int(match.group('fps')),
-                        self._match_to_sec(match),
-                        self._duration,
-                        float(match.group('speed')),
-                    )
-                elif self._duration is None:
-                    dm = self._duration_re.match(line)
-                    if dm:
-                        self._duration = self._match_to_sec(dm)
-
-    def run(self):
-        self._proc = Popen(
-            self._cmd, shell=self._shell, stderr=PIPE, stdin=self._stdin, stdout=self._stdout,
-            universal_newlines=not self._binary_io
-        )
-        if getattr(select, 'poll', None):
-            poll = select.poll()
-            poll.register(self._proc.stderr)
-            while self._proc.poll() is None and not self._should_stop:
-                if not poll.poll(1):
-                    sleep(0.1)
-                    continue
-                sleep(0.001)
-                raw = self._proc.stderr.readline()
-                line = raw.decode(errors='replace') if self._binary_io and raw else (raw or '')
-                if self._echo_stderr and line:
-                    tqdm.write(line.rstrip(), file=stderr)
-                match = self._progress_re.match(line)
-                if not match:
-                    self.stderr += line
-                if match and self._update_callback:
-                    self._update_callback(
-                        int(match.group('frame')),
-                        int(match.group('fps')),
-                        self._match_to_sec(match),
-                        self._duration,
-                        float(match.group('speed')),
-                    )
-                elif self._duration is None:
-                    match = self._duration_re.match(line)
-                    if match:
-                        self._duration = self._match_to_sec(match)
-            try:
-                _, err = self._proc.communicate(timeout=1)
-                if err and self._binary_io:
-                    err = err.decode(errors='replace') if isinstance(err, bytes) else err
-                self.stderr += err or ''
-            except TimeoutExpired:
-                pass
-        else:
-            stderr_lines = []
-            reader = Thread(target=self._read_stderr_loop, args=(stderr_lines,),
                            daemon=True)
-            reader.start()
-            while self._proc.poll() is None and not self._should_stop:
-                sleep(0.2)
-            reader.join(timeout=2)
-            self.stderr = ''.join(stderr_lines)
-        return self._proc.returncode
-
-class TqdmAbsolute(tqdm):
-    def __init__(self, *args, **kwargs):
-        kwargs['bar_format'] = '{l_bar}{bar}|{n:.1f}/{total:.1f} [{elapsed}<{remaining}]'
-        kwargs['dynamic_ncols'] = True
-        if 'total' not in kwargs:
-            kwargs['total'] = 99999999
-        if 'leave' not in kwargs:
-            kwargs['leave'] = False
-        super().__init__(*args, **kwargs)
-    def update(self, to):
-        super().update(to - self.n) # tqdm's update() is relative; this makes it absolute (sets self.n = to)
-
-HOST_COLORS = ['\033[94m', '\033[92m', '\033[93m', '\033[95m', '\033[96m', '\033[91m'] # blue, green, yellow, magenta, cyan, red
-RESET = '\033[0m'
-SEGMENT_STALL_TIMEOUT = int(environ.get('SEGMENT_STALL_TIMEOUT', '300')) # no encoder progress for this many sec -> kill and re-queue
-
-class TaskThread(Thread):
-    def __init__(self, host: str, gpu_id: int, source_file: str, task_queue: SimpleQueue, bar_pos: int, remote_ffmpeg_path: str = None):
-        super().__init__()
-        self._should_stop = False
-        self._host = host
-        self._gpu_id = gpu_id
-        self._host_desc = f"{host}:gpu{gpu_id}"
-        self._bar_pos = bar_pos
-        self._remote_ffmpeg_path = remote_ffmpeg_path
-        self._source_file = source_file
-        self._task_queue = task_queue
-        self._ffmpeg = None
-        self._bar = TqdmAbsolute(desc=self._host_desc, position=bar_pos)
-        self._current_file = None
-
-    def _host_tag(self):
-        c = HOST_COLORS[self._bar_pos % len(HOST_COLORS)]
-        return f'{c}{self._host_desc}{RESET}'
-
-    def stop(self):
-        self._should_stop = True
-        rp = getattr(self, '_reader_proc', None)
-        if rp is not None and rp.poll() is None:
-            try:
-                if rp.stdin is not None:
-                    rp.stdin.write(b'q')
-                    rp.stdin.flush()
-                sleep(0.5)
-            except (OSError, BrokenPipeError):
-                pass
-            try:
-                if rp.poll() is None:
-                    rp.terminate()
-            except OSError:
-                pass
-        if self._ffmpeg:
-            self._ffmpeg.stop()
-
-    def run(self):
-        last_log = [0.0] # mutable for progress heartbeat
-        last_progress = [time()] # for stall detection
-        last_encoder_t = [0.0] # last encoder progress position (sec) for stall log
-        segment_start_time = [0.0] # when current segment started
-        first_progress_logged = [False] # one-time "first frame" log per segment
-        verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes') or environ.get('DISTRIBUTED_DEBUG', '').lower() in ('1', 'true', 'yes')
-
-        def upd(frames, fps, t, duration, speed):
-            now = time()
-            last_progress[0] = now
-            last_encoder_t[0] = t
-            self._bar.total = duration or 999
-            self._bar.desc = self._host_desc + ': ' + (self._current_file or '')
-            self._bar.update(t)
-            if not first_progress_logged[0] and (frames > 0 or t > 0):
-                elapsed = now - segment_start_time[0]
-                tqdm.write(f' {self._host_tag()}: {self._current_file} first frame after {elapsed:.1f}s', file=stderr)
-                stderr.flush()
-                first_progress_logged[0] = True
-            if duration and duration > 0 and (now - last_log[0]) >= 30:
-                tqdm.write(f' {self._host_tag()}: {self._current_file} {t:.0f}s / {duration:.0f}s ({speed:.1f}x)', file=stderr)
-                stderr.flush()
-                last_log[0] = now
-        try:
-            while not self._should_stop:
-                task = self._task_queue.get(False)
-
-                self._current_file = basename(task.output_file)
-                last_progress[0] = time()
-                segment_start_time[0] = last_progress[0]
-                first_progress_logged[0] = False
-                last_encoder_t[0] = 0.0
-                n_frames = round(task.duration_sec * task.fps)
-                # Default fast_seek=True: -ss before -i (low RAM, quick). Set READER_FAST_SEEK=0 for frame-accurate (high RAM, slow for late segments).
-                fast_seek = environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no')
-                tqdm.write(f' {self._host_tag()}: starting {self._current_file} (t={task.start_sec:.0f}-{task.start_sec+task.duration_sec:.0f}s, n_frames={n_frames}, fast_seek={fast_seek})', file=stderr)
-                stderr.flush()
-                # READER_FAST_SEEK=1: -ss before -i (keyframe seek, low RAM, no decode from 0). Else -i then -ss (frame-accurate but decodes 0..start = high RAM for late segments).
-                if fast_seek:
-                    reader_cmd = [
-                        'ffmpeg', '-ss', str(task.start_sec), '-noaccurate_seek', '-i', self._source_file,
-                        '-frames:v', str(n_frames), '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
-                    ]
-                else:
-                    reader_cmd = [
-                        'ffmpeg', '-i', self._source_file,
-                        '-ss', str(task.start_sec), '-frames:v', str(n_frames),
-                        '-an', '-sn', '-c:v', 'copy', '-f', 'mpegts', 'pipe:1'
-                    ]
-                ffmpeg_bin = (self._remote_ffmpeg_path or 'ffmpeg') if self._host != 'localhost' else 'ffmpeg'
-                # -r only: lock output to CFR; do not use -frames:v or segments get truncated when reader sends fewer frames
-                encoder_cmd = [
-                    ffmpeg_bin, '-f', 'mpegts', '-i', 'pipe:',
-                    '-gpu', str(self._gpu_id),
-                    '-r', str(task.fps),
-                    *task.ffmpeg_args,
-                    '-f', 'mp4', '-movflags', 'frag_keyframe+empty_moov', 'pipe:1'
-                ]
-                if self._host == 'localhost' and sys_platform != 'win32':
-                    encoder_cmd = ['nice', '-n10', 'ionice', '-c3'] + encoder_cmd
-                if self._host != 'localhost':
-                    encoder_cmd = ['ssh', '-o', 'ConnectTimeout=15', self._host, join(encoder_cmd)]
-
-                reader_stderr_lines = []
-                if verbose:
-                    self._reader_proc = Popen(reader_cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-                    def read_reader_stderr():
-                        rp = getattr(self, '_reader_proc', None)
-                        if rp and rp.stderr:
-                            for line in iter(rp.stderr.readline, b''):
-                                reader_stderr_lines.append(line.decode(errors='replace').rstrip())
-                    Thread(target=read_reader_stderr, daemon=True).start()
-                else:
-                    self._reader_proc = Popen(reader_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL)
-                ret = -1
-                stall_done = [False] # watchdog sets True when it kills
-
-                def stall_watchdog():
-                    if SEGMENT_STALL_TIMEOUT <= 0:
-                        return
-                    while not self._should_stop and not stall_done[0]:
-                        sleep(60)
-                        if stall_done[0] or self._should_stop:
-                            break
-                        if time() - last_progress[0] > SEGMENT_STALL_TIMEOUT:
-                            ago = time() - last_progress[0]
-                            enc_t = last_encoder_t[0]
-                            tqdm.write(f' {self._host_tag()}: STALLED {self._current_file} (no progress {ago:.0f}s; last encoder at {enc_t:.1f}s into segment)', file=stderr)
-                            if not fast_seek and task.start_sec > 60:
-                                tqdm.write(f' {self._host_tag()}: (reader -ss {task.start_sec:.0f} after -i may still be decoding 0..{task.start_sec:.0f}s from source)', file=stderr)
-                            tqdm.write(f' {self._host_tag()}: killing and re-queuing', file=stderr)
-                            stderr.flush()
-                            stall_done[0] = True
-                            self.stop()
-                            break
-                        proc = getattr(self._ffmpeg, '_proc', None) if self._ffmpeg else None
-                        if proc is not None and proc.poll() is not None:
-                            break
-
-                watchdog = Thread(target=stall_watchdog, daemon=True)
-                watchdog.start()
-                try:
-                    with open(task.output_file, 'wb') as outfile:
-                        self._ffmpeg = FFMPEGProc(
-                            encoder_cmd, stdin=self._reader_proc.stdout, stdout=outfile,
-                            update_callback=upd, binary_io=True
-                        )
-                        ret = self._ffmpeg.run()
-                finally:
-                    stall_done[0] = True
-                    watchdog.join(timeout=2)
-                    rp = getattr(self, '_reader_proc', None)
-                    if rp is not None:
-                        try:
-                            rp.wait(timeout=2)
-                        except TimeoutExpired:
-                            rp.terminate()
-                            rp.wait(timeout=5)
-                    self._reader_proc = None
-
-                if ret != 0:
-                    try:
-                        if isfile(task.output_file):
-                            unlink(task.output_file)
-                    except OSError:
-                        pass
-                    print(f' {self._host_tag()}: FAILED {self._current_file}', file=stderr, flush=True)
-                    if self._ffmpeg.stderr:
-                        print(self._ffmpeg.stderr, file=stderr, end='', flush=True)
-                    if reader_stderr_lines:
-                        tqdm.write(f' {self._host_tag()}: Reader stderr:', file=stderr)
-                        for line in reader_stderr_lines[-50:]: # last 50 lines
-                            tqdm.write(f' {line}', file=stderr)
-                    stderr.flush()
-                    self._task_queue.put(task)
-                else:
-                    tqdm.write(f' {self._host_tag()}: done {self._current_file}', file=stderr)
-                    stderr.flush()
-        except Empty:
-            pass
-        self._bar.close()
-
-def encode(workers: List[Tuple[str, int]], input_file: str, output_file: str, segment_seconds: float = 60, remote_args: str = '', concat_args: str = '', tmp_dir: str = None, keep_tmp=False, resume=False, copy_input=False, probe_host: str = None, probe_path: str = None, remote_ffmpeg_path: str = None):
-    input_file = abspath(expanduser(input_file))
-    output_file = abspath(expanduser(output_file))
-    tmp_dir = tmp_dir or 'ffmpeg_segments_'+md5(input_file.encode()).hexdigest()
-    try:
-        mkdir(tmp_dir)
-    except FileExistsError:
-        pass # previous job: resume and re-queue failed segments
-
-    verbose = environ.get('VERBOSE', '').lower() in ('1', 'true', 'yes')
-    tqdm.write('[1/4] Probing keyframes and duration...', file=stderr)
-    stderr.flush()
-    keyframe_times = _get_keyframe_times(input_file, probe_host, probe_path)
-    total_duration = _get_duration_seconds(input_file, probe_host, probe_path)
-    fps = _get_fps(input_file, probe_host, probe_path)
-    segments = _build_segments(keyframe_times, segment_seconds, total_duration)
-
-    if not segments:
-        tqdm.write(f'No segments for {input_file} (keyframes={len(keyframe_times)}, duration={total_duration}). Check ffprobe.', file=stderr)
-        return False
-
-    MIN_SEGMENT_BYTES = 1024 # smaller = failed/corrupt segment, re-encode
-    DURATION_TOLERANCE_SEC = 0.1 # same as check_files.py
-    task_queue = SimpleQueue()
-    removed = 0
-    for i, (start_sec, end_sec) in enumerate(segments):
-        duration_sec = end_sec - start_sec
-        output_path = f'{tmp_dir}/{i:08d}.mp4'
-        if isfile(output_path):
-            try:
-                if getsize(output_path) < MIN_SEGMENT_BYTES:
-                    unlink(output_path)
-                    removed += 1
-                elif not _segment_valid(output_path, duration_sec, DURATION_TOLERANCE_SEC):
-                    unlink(output_path)
-                    removed += 1
-                else:
-                    continue
-            except OSError:
-                pass
-        task_queue.put(Task(start_sec, duration_sec, output_path, split(remote_args), fps))
-
-    n_tasks = task_queue.qsize()
-    if removed:
-        tqdm.write(f'[2/4] Removed {removed} bad segment(s) (ffprobe failed or duration mismatch), re-queued.', file=stderr)
-    if n_tasks == 0:
-        tqdm.write('All segments already done (resume).', file=stderr)
-    else:
-        tqdm.write(f'[2/4] Queued {n_tasks} segments (keyframes={len(keyframe_times)}, duration={total_duration:.1f}s)', file=stderr)
-    stderr.flush()
-    dprint(f'Segments: {len(segments)} total, {n_tasks} tasks queued')
-
-    tqdm.write(f'[3/4] Encoding segments on {len(workers)} worker(s)...', file=stderr)
-    if environ.get('READER_FAST_SEEK', '1').lower() not in ('0', 'false', 'no'):
-        tqdm.write(' (Reader: fast seek -ss before -i; set READER_FAST_SEEK=0 for frame-accurate)', file=stderr)
-    stderr.flush()
-    threads = [TaskThread(host, gpu_id, input_file, task_queue, pos, remote_ffmpeg_path) for pos, (host, gpu_id) in enumerate(workers, 0)]
-
-    def stop_all():
-        tqdm.write('Stopping all workers (killing ffmpeg/SSH on each host)...', file=stderr)
-        stderr.flush()
-        for thread in threads:
-            thread.stop()
-        for thread in threads:
-            thread.join(timeout=5)
-        exit(1)
-
-    stop_file = abspath('.encode_stop')
-
-    def stdin_stop_listener():
-        try:
-            while True:
-                line = stdin.readline()
-                if not line:
-                    break
-                if line.strip().lower() == 'stop':
-                    stop_all()
-                    break
-        except (OSError, EOFError):
-            pass
-
-    def stop_file_poller():
-        try:
-            while True:
-                sleep(1.5)
-                if isfile(stop_file):
-                    try:
-                        unlink(stop_file)
-                    except OSError:
-                        pass
-                    stop_all()
-                    break
-        except OSError:
-            pass
-
-    def sigint(sig, stack):
-        stop_all()
-
-    try:
-        signal(SIGINT, sigint)
-    except (ValueError, OSError):
-        pass # SIGINT not available on this platform
-
-    if getattr(stdin, 'isatty', lambda: False)():
-        tqdm.write("To cancel: type 'stop' + Enter, or create .encode_stop (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
-    else:
-        tqdm.write("To cancel: create .encode_stop in current dir (e.g. from another terminal: echo. > .encode_stop)", file=stderr)
-    stderr.flush()
-    if getattr(stdin, 'isatty', lambda: False)():
-        Thread(target=stdin_stop_listener, daemon=True).start()
-    Thread(target=stop_file_poller, daemon=True).start()
-
-    for thread in threads:
-        thread.start()
-    try:
-        for thread in threads:
-            thread.join()
-    except KeyboardInterrupt:
-        stop_all()
-
-    list_path = f'{tmp_dir}/output_segments.txt'
-    segment_files = sorted(glob(f'{tmp_dir}/*.mp4'))
-    expected_count = len(segments)
-    if len(segment_files) < expected_count:
-        tqdm.write(f'[4/4] ERROR: Only {len(segment_files)} of {expected_count} segments produced. Missing segments (check for FAILED lines above; fix or exclude failing worker and re-run).', file=stderr)
-        stderr.flush()
-        return False
-
-    with open(list_path, 'w') as f:
-        f.write('\n'.join([f"file '{fpath}'" for fpath in segment_files]))
-
-    CONCAT_DURATION_TOLERANCE = 0.1
-    segments_total = sum(_probe_duration(p) for p in segment_files)
-    duration_diff = segments_total - total_duration
-    if abs(duration_diff) > CONCAT_DURATION_TOLERANCE:
-        if duration_diff < -1.0:
-            tqdm.write(f'[4/4] WARNING: Segment total ({segments_total:.2f}s) is {abs(duration_diff):.2f}s shorter than source ({total_duration:.2f}s). Proceeding with concat (output capped to source length).', file=stderr)
-            stderr.flush()
-            # exit(1) # re-enable for hard fail when segment setup is wrong
-        tqdm.write(f'[4/4] WARNING: Segment total duration ({segments_total:.2f}s) differs from source ({total_duration:.2f}s) by {duration_diff:+.2f}s. Output will be capped to source length; it may not be a frame-accurate mirror.', file=stderr)
-        stderr.flush()
-
-    tqdm.write('[4/4] Concatenating segments and muxing with audio...', file=stderr)
-    concat_extra = ['-stats_period', '5'] if verbose else []
-    concat_cmd = [
-        'ffmpeg', *concat_extra, '-i', input_file,
-        '-f', 'concat', '-safe', '0', '-i', list_path,
-        '-map_metadata', '0:g',
-        '-map', '1:v', '-map', '0:a?', '-map', '0:s?',
-        '-c:v', 'copy', '-c:s', 'copy',
-        *split(concat_args),
-        '-t', str(total_duration),
-        '-y', output_file
-    ]
-    tqdm.write('Concat ffmpeg: ' + ' '.join(join([x]) for x in concat_cmd), file=stderr)
-    with TqdmAbsolute(desc='concatenating output segments') as bar:
-        def upd(frames, fps, time, duration, speed):
-            bar.total = duration
-            bar.update(time)
-        ffmpeg = FFMPEGProc(concat_cmd, update_callback=upd, echo_stderr=verbose)
-        if ffmpeg.run() != 0:
-            tqdm.write(ffmpeg.stderr, file=stderr)
-            return False
-    unlink(list_path)
-
-    if not keep_tmp:
-        rmtree(tmp_dir)
-    return True
-
-def _parse_workers(host_specs):
-    """Parse list of 'host' or 'host:gpu' into [(host, gpu_id), ...]."""
-    workers = []
-    for spec in host_specs or []:
-        spec = (spec or "").strip()
-        if not spec:
-            continue
-        if ":" in spec:
-            host, gpu = spec.rsplit(":", 1)
-            try:
-                workers.append((host.strip(), int(gpu.strip())))
-            except ValueError:
-                workers.append((spec, 0))
-        else:
-            workers.append((spec, 0))
-    return workers
-
-
-if __name__ == '__main__':
-    import argparse
-    parser = argparse.ArgumentParser(description='Splits a file into segments and processes them on multiple workers (host:gpu) in parallel using ffmpeg over SSH.')
-    parser.add_argument('input_file', help='File to encode.')
-    parser.add_argument('output_file', help='Path to encoded output file.')
-    parser.add_argument('remote_args', help='Arguments to pass to the remote ffmpeg instances (e.g. NVENC: -c:v hevc_nvenc -preset p7 ...). -gpu is added per worker.')
-    parser.add_argument('concat_args', default='', help='Arguments to pass to the local ffmpeg concatenating the processed video segments and muxing it with the original audio/subs/metadata.')
-    parser.add_argument('-s', '--segment-length', type=float, default=10, help='Segment length in seconds.')
-    parser.add_argument('-H', '--host', action='append', help='Worker as host or host:gpu (e.g. -H Pyro:0 -H RenderScrap:0 -H RenderScrap:1).', required=True)
-    parser.add_argument('-k', '--keep-tmp', action='store_true', help='Keep temporary segment files instead of deleting them on successful exit.')
-    parser.add_argument('-r', '--resume', action='store_true', help='Don\'t split the input file again, keep existing segments and only process the missing ones.')
-    parser.add_argument('-t', '--tmp-dir', default=None, help='Directory to use for temporary files. Should not already exist and will be deleted afterwards.')
-    parser.add_argument('-c', '--copy-input', action='store_true', help='Don\'t (losslessly) re-encode input while segmenting. Only use this if your input segments frame-perfectly with "-c:v copy" (i.e. it has no B-frames)')
-    parser.add_argument('-P', '--probe-host', default=None, help='SSH host to run ffprobe on (file must be at --probe-path there). Speeds up [1/4] when input is on slow/NAS path.')
-    parser.add_argument('--probe-path', default=None, help='Path to input file as seen on --probe-host (required if -P set).')
-    args = parser.parse_args()
-    workers = _parse_workers(args.host)
-    if not workers:
-        parser.error('At least one worker required (e.g. -H Pyro:0 -H RenderScrap:0)')
-    encode(
-        workers,
-        args.input_file,
-        args.output_file,
-        segment_seconds=args.segment_length,
-        remote_args=args.remote_args,
-        concat_args=args.concat_args,
-        tmp_dir=args.tmp_dir,
-        keep_tmp=args.keep_tmp,
-        resume=args.resume,
-        copy_input=args.copy_input,
-        probe_host=args.probe_host,
-        probe_path=args.probe_path
-    )
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 817a297..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-# For distributed mode (encode_VOD_pyro.py -d): ffmpeg_distributed uses tqdm
-tqdm>=4.0.0
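As a sanity check, the two stderr-parsing patterns in `FFMPEGProc` can be exercised standalone. This is a minimal sketch: the named groups are the ones `_match_to_sec` and the `upd` callback read back, and the two sample lines are illustrative stand-ins for ffmpeg stderr output, not captured from a real run:

```python
import re

# Same patterns as FFMPEGProc above, with the named groups written out
# (names reconstructed from _match_to_sec and the upd callback).
DURATION_RE = re.compile(r'.*Duration:\s*-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9.]+),')
PROGRESS_RE = re.compile(r'frame=\s*(?P<frame>[0-9]+)\s+fps=\s*(?P<fps>[0-9]+).*time=-?(?P<time_h>[0-9]+):(?P<time_m>[0-9]+):(?P<time_s>[0-9,.]+)\s+.*speed=(?P<speed>[0-9\.]+)x')

def to_sec(m):
    # hh:mm:ss.xx -> seconds, as in FFMPEGProc._match_to_sec
    return int(m.group('time_h')) * 3600 + int(m.group('time_m')) * 60 + float(m.group('time_s'))

# Illustrative sample lines (made up for this sketch).
d = DURATION_RE.match('  Duration: 03:38:01.02, start: 0.000000, bitrate: 6043 kb/s')
p = PROGRESS_RE.match('frame=  864 fps= 72 q=28.0 size=    4096KiB time=00:00:14.40 bitrate=2330.2kbits/s speed=1.2x')
print(to_sec(d), int(p.group('frame')), to_sec(p), float(p.group('speed')))
```

The `-?` before each timestamp tolerates the negative `time=` values ffmpeg can print at the very start of a stream; `DURATION_RE` starts with `.*` because the `Duration:` line is indented, while progress lines begin at `frame=`, so `PROGRESS_RE` can anchor there with `.match`.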