bl_info = {
    "name": "Multi-Headless Instance Renderer | HoloMARI Platform",
    "author": "HP Park",
    "version": (1, 6, 5),
    "blender": (2, 93, 0),
    # Was mojibake ("â–¸"): the UTF-8 bytes of U+25B8 decoded as cp1252.
    "location": "Render Properties ▸ Multi-Instance Frames",
    "description": "Spawns one or more headless Blender workers per GPU to maximize available compute resources.",
    "category": "Render",
}

import bpy
import os
import sys
import re
import shutil
import tempfile
import time
import threading
import queue
import subprocess
import signal
import socket
import json
import random
import addon_utils
import string
import shlex
import hashlib
from pathlib import Path
from collections import Counter, deque

ADDON_KEY = "multi_instance_render"
_KM_ITEMS = []
_MANAGER = None

IS_WIN = (os.name == "nt")
IS_MAC = (sys.platform == "darwin")

VIDEO_FORMATS = {"FFMPEG", "AVI_JPEG", "AVI_RAW", "FRAME_SERVER"}


# ----------------------- helpers -----------------------
def _log(msg):
    """Print *msg* to stdout with the ``[MGPU]`` prefix."""
    print(f"[MGPU] {msg}")


def _manager_has_active_workers(manager):
    """Return True if any entry of ``manager.workers`` has a truthy ``running``.

    Every attribute access is guarded: a worker whose ``running`` raises is
    skipped, and a failure while iterating yields False.
    """
    if not manager:
        return False
    try:
        for w in getattr(manager, "workers", []):
            try:
                if w.running:
                    return True
            except Exception:
                continue
    except Exception:
        pass
    return False


def _cleanup_stale_manager():
    """Stop and forget the global manager when it has no running workers."""
    global _MANAGER
    if _MANAGER and not _manager_has_active_workers(_MANAGER):
        try:
            _MANAGER.stop()
        except Exception:
            # Best effort: the reference is dropped even if stop() fails.
            pass
        _MANAGER = None


def _cycles_prefs():
    """Return the Cycles add-on preferences object, or None if unavailable."""
    try:
        return bpy.context.preferences.addons["cycles"].preferences
    except Exception:
        return None


def _current_compute_type():
    """Return the Cycles ``compute_device_type``, defaulting to ``"CUDA"``."""
    cp = _cycles_prefs()
    if not cp:
        return "CUDA"
    return getattr(cp, "compute_device_type", "CUDA") or "CUDA"


def _cycles_cpu_device_selected():
    """Return True if a Cycles device of type CPU has its ``use`` flag set."""
    cp = _cycles_prefs()
    if not cp:
        return False
    try:
        cp.refresh_devices()
    except Exception:
        pass
    for d in getattr(cp, "devices", []):
        try:
            if str(getattr(d, "type", "") or "").upper() == "CPU" and bool(getattr(d, "use", False)):
                return True
        except Exception:
            continue
    return False


def _fmt_bytes(n):
    """Format a byte count with one decimal and a binary unit (B..TiB).

    Returns ``"?"`` when *n* cannot be compared/divided as a number, or when
    it is still >= 1024 after scaling to TiB.
    """
    try:
        for unit in ["B", "KiB", "MiB", "GiB", "TiB"]:
            if n < 1024:
                return f"{n:.1f}{unit}"
            n /= 1024.0
    except Exception:
        pass
    return "?"


def _median(values):
    """Return the median of the strictly positive numeric entries of *values*.

    Entries that cannot be converted with ``float()`` and entries <= 0 are
    ignored. Returns None when nothing usable remains.
    """
    vals = []
    for v in (values or []):
        try:
            fv = float(v)
            if fv > 0:
                vals.append(fv)
        except Exception:
            pass
    if not vals:
        return None
    vals.sort()
    n = len(vals)
    m = n // 2
    if n % 2 == 1:
        return vals[m]
    return (vals[m - 1] + vals[m]) * 0.5


# Render-time guard tuning tables, keyed by tier name and read through
# _rendertime_guard_profile(). The meaning of each key is defined by the
# consumers of the profile, which are outside this section of the file.
_RENDERTIME_GUARD_PROFILES = {
    "OFF": {
        "enabled": False,
        "warmup_completed_jobs": 1,
        "warmup_per_worker_jobs": 1,
        "periodic_recycle_enabled": False,
        "periodic_recycle_points": [],
    },
    "CONSERVATIVE": {
        "enabled": True,
        "warmup_completed_jobs": 1,
        "warmup_per_worker_jobs": 1,
        "min_samples_soft": 6,
        "soft_mult": 3.8,
        "soft_min_s": 150.0,
        "hard_mult": 7.0,
        "hard_min_s": 420.0,
        "progress_stall_s": 120.0,
        "hedge_grace_s": 90.0,
        "hedge_max_per_job": 1,
        "restart_max_per_job": 1,
        "worker_restart_cooldown_s": 300.0,
        "worker_restart_budget": 1,
        "worker_restart_window_frames": 20,
        "global_restart_limit": 2,
        "global_restart_window_s": 180.0,
        "single_worker_min_stall_s": 240.0,
        "min_baseline_s": 20.0,
        "periodic_recycle_enabled": False,
        "periodic_recycle_points": [0.25, 0.50, 0.75],
        "periodic_recycle_min_completed_jobs": 32,
    },
    "BALANCED": {
        "enabled": True,
        "warmup_completed_jobs": 1,
        "warmup_per_worker_jobs": 1,
        "min_samples_soft": 4,
        "soft_mult": 3.0,
        "soft_min_s": 90.0,
        "hard_mult": 5.5,
        "hard_min_s": 300.0,
        "progress_stall_s": 90.0,
        "hedge_grace_s": 60.0,
        "hedge_max_per_job": 1,
        "restart_max_per_job": 1,
        "worker_restart_cooldown_s": 180.0,
        "worker_restart_budget": 2,
        "worker_restart_window_frames": 20,
        "global_restart_limit": 3,
        "global_restart_window_s": 150.0,
        "single_worker_min_stall_s": 180.0,
        "min_baseline_s": 20.0,
        "periodic_recycle_enabled": False,
        "periodic_recycle_points": [0.25, 0.50, 0.75],
        "periodic_recycle_min_completed_jobs": 32,
    },
    "AGGRESSIVE": {
        "enabled": True,
        "warmup_completed_jobs": 1,
        "warmup_per_worker_jobs": 1,
        "min_samples_soft": 2,
        "soft_mult": 1.6,
        "soft_min_s": 35.0,
        "hard_mult": 2.1,
        "hard_min_s": 95.0,
        "progress_stall_s": 35.0,
        "hedge_grace_s": 20.0,
        "hedge_max_per_job": 1,
        "restart_max_per_job": 1,
        "worker_restart_cooldown_s": 90.0,
        "worker_restart_budget": 3,
        "worker_restart_window_frames": 20,
        "global_restart_limit": 8,
        "global_restart_window_s": 180.0,
        "single_worker_min_stall_s": 100.0,
        "min_baseline_s": 20.0,
        "periodic_recycle_enabled": True,
        "periodic_recycle_points": [0.25, 0.50, 0.75],
        "periodic_recycle_min_completed_jobs": 32,
    },
}


def _rendertime_guard_profile(tier: str):
    """Return a shallow copy of the guard profile for *tier*.

    *tier* is upper-cased; an empty or unknown tier uses the ``AGGRESSIVE``
    table. The returned dict gets an extra ``"tier"`` entry holding the
    upper-cased requested name (even when the fallback table was used).
    """
    key = str(tier or "AGGRESSIVE").upper()
    base = _RENDERTIME_GUARD_PROFILES.get(key) or _RENDERTIME_GUARD_PROFILES["AGGRESSIVE"]
    out = dict(base)
    out["tier"] = key
    return out


def _classify_launch_exception(exc: Exception):
    """Map a process-launch exception to a coarse reason code.

    The classification is a case-insensitive substring match on ``str(exc)``;
    ``"PROCESS_START_EXCEPTION"`` is returned when nothing matches.
    """
    s = str(exc or "")
    low = s.lower()
    if ("1455" in low) or ("paging file" in low) or ("not enough memory" in low):
        return "SYSTEM_RAM_OR_COMMIT_EXHAUSTED"
    if ("access is denied" in low) or ("permission denied" in low):
        return "ACCESS_DENIED"
    if ("file not found" in low) or ("no such file" in low):
        return "BINARY_OR_PATH_NOT_FOUND"
    if ("is not recognized" in low):
        return "BINARY_NOT_EXECUTABLE"
    return "PROCESS_START_EXCEPTION"


def _classify_runtime_exit_reason(last_line: str, returncode):
    """Map a worker's last output line and return code to a reason code.

    Text matches on *last_line* (case-insensitive) take precedence over the
    return-code checks. ``"EXIT_BEFORE_HANDSHAKE"`` is the default.
    """
    rc = "" if returncode is None else str(returncode)
    ll = (last_line or "").lower()
    # The GPU-specific phrases contain "out of memory", so they must be tested
    # before the generic check or EXIT_GPU_VRAM_OOM can never be returned.
    if ("cuda error out of memory" in ll) or ("optix error out of memory" in ll):
        return "EXIT_GPU_VRAM_OOM"
    if ("out of memory" in ll) or ("not enough memory" in ll):
        return "EXIT_OUT_OF_MEMORY"
    if ("failed to create" in ll and "context" in ll):
        return "EXIT_GPU_CONTEXT_INIT_FAILED"
    if rc == "-1073741819":
        return "EXIT_ACCESS_VIOLATION"
    if rc == "-1073740791":
        return "EXIT_STACK_BUFFER_OVERRUN"
    return "EXIT_BEFORE_HANDSHAKE"


def _mgpu_is_video(scene):
    """Return True if the scene's image settings describe a video output.

    True when ``file_format`` is in ``VIDEO_FORMATS`` or ``media_type`` equals
    ``"VIDEO"``; any access error yields False.
    """
    try:
        img = scene.render.image_settings
        fmt = str(getattr(img, "file_format", "") or "").upper()
        media = str(getattr(img, "media_type", "") or "").upper()
        return (fmt in VIDEO_FORMATS or media == "VIDEO")
    except Exception:
        return False


def _mgpu_video_output_path(scene):
    """Return the absolute form of ``scene.render.filepath`` ("" on error)."""
    try:
        return bpy.path.abspath(scene.render.filepath)
    except Exception:
        return ""


def _mgpu_first_frame_path(scene):
    """Return the absolute output path of the scene's first frame.

    Falls back to the absolute ``render.filepath`` if ``frame_path`` fails,
    and to "" if that fails as well.
    """
    try:
        return bpy.path.abspath(scene.render.frame_path(frame=scene.frame_start))
    except Exception:
        try:
            return bpy.path.abspath(scene.render.filepath)
        except Exception:
            return ""


def _mgpu_dir_has_entries(path):
    """Return True if *path* is a directory containing at least one entry."""
    try:
        if not os.path.isdir(path):
            return False
        with os.scandir(path) as it:
            for _entry in it:
                return True
    except Exception:
        return False
    return False


def _mgpu_sequence_exists(first_frame_path):
    """Return True if the output directory already holds frames of the sequence.

    The file name of *first_frame_path* is split at its trailing frame number
    into a prefix and an optional extension; any directory entry that starts
    with the prefix and (when an extension exists) ends with the extension
    counts as an existing frame. Returns False when the name carries no
    trailing number, the directory is missing, or an error occurs.
    """
    try:
        dir_path = os.path.dirname(first_frame_path)
        if not os.path.isdir(dir_path):
            return False
        base = os.path.basename(first_frame_path)
        # Fixed: the pattern used doubled backslashes inside a raw string
        # (r"(\\d+)..."), which matches a literal backslash followed by "d"
        # and therefore never matched a frame number.
        m = re.search(r"(\d+)(\.[^.]+)?$", base)
        if not m:
            return False
        prefix = base[:m.start(1)]
        suffix = m.group(2) or ""
        for name in os.listdir(dir_path):
            if not name.startswith(prefix):
                continue
            if suffix and not name.endswith(suffix):
                continue
            return True
    except Exception:
        return False
    return False


def _mgpu_video_temp_dir_for(scene, use_target_dir=True):
    """Return the temp frame directory path for the scene's video output.

    With *use_target_dir* and a non-empty output directory the result is
    ``<output dir>/<safe name>_TEMP``; otherwise it is a folder under the
    system temp directory whose name includes a short hash of the output path.
    The directory is not created here.
    """
    out_path = _mgpu_video_output_path(scene) or "mgpu_video"
    base = os.path.splitext(os.path.basename(out_path))[0] or "render"
    safe = re.sub(r"[^A-Za-z0-9._-]+", "_", base)
    # md5 is only used to derive a short, stable folder suffix.
    h = hashlib.md5(out_path.encode("utf-8", "ignore")).hexdigest()[:8]
    if use_target_dir:
        out_dir = os.path.dirname(out_path)
        if out_dir:
            return os.path.join(out_dir, f"{safe}_TEMP")
    return os.path.join(tempfile.gettempdir(), f"mgpu_frames_{safe}_{h}")


def _mgpu_overwrite_warnings(scene, is_video, temp_dir=None):
    """Collect human-readable warnings about outputs that already exist.

    For video output the target file is checked; otherwise the first frame
    file and the frame sequence are checked. A non-empty *temp_dir* adds a
    further warning. Returns a list of strings (possibly empty).
    """
    warnings = []
    if is_video:
        out_path = _mgpu_video_output_path(scene)
        if out_path and os.path.exists(out_path):
            warnings.append(f"Output file exists: {out_path}")
    else:
        first_frame = _mgpu_first_frame_path(scene)
        if first_frame and os.path.exists(first_frame):
            warnings.append(f"Output frame exists: {first_frame}")
        if first_frame and _mgpu_sequence_exists(first_frame):
            warnings.append(f"Existing frame files detected in: {os.path.dirname(first_frame)}")
    if temp_dir and _mgpu_dir_has_entries(temp_dir):
        warnings.append(f"Temp frame folder has files: {temp_dir}")
    return warnings


def _mgpu_has_mari_addon():
    """Return True if the MARI add-on is found and reported as enabled.

    The add-on is identified by ``addon_prefix == "mari"`` or by a
    ``bl_info["name"]`` equal to "mari advanced" (case-insensitive).
    """
    mari_mod = None
    try:
        for mod in addon_utils.modules():
            if getattr(mod, "addon_prefix", None) == "mari":
                mari_mod = mod
                break
            bi = getattr(mod, "bl_info", {}) or {}
            if (bi.get("name") or "").strip().lower() == "mari advanced":
                mari_mod = mod
                break
    except Exception:
        pass
    if not mari_mod:
        return False
    try:
        name = getattr(mari_mod, "__name__", None)
        if name:
            _loaded, enabled = addon_utils.check(name)
            return bool(enabled)
    except Exception:
        pass
    return False


def _mgpu_enabled_addons_snapshot():
    """Capture add-ons currently enabled in this Blender session.

    Returns a list of dicts, one per unique module name, each with a
    ``"module"`` key and, when a source file could be resolved, ``"file"``
    (absolute path) and ``"is_package"`` (True if the file is ``__init__.py``).
    """
    records = []
    names = []
    try:
        prefs_addons = getattr(bpy.context.preferences, "addons", None)
        if prefs_addons is not None:
            names.extend(list(prefs_addons.keys()))
    except Exception:
        pass
    try:
        for meta in addon_utils.modules():
            mod_name = getattr(meta, "__name__", None)
            if not mod_name:
                continue
            enabled = False
            try:
                state = addon_utils.check(mod_name)
                if isinstance(state, tuple):
                    enabled = any(bool(v) for v in state)
                else:
                    enabled = bool(state)
            except Exception:
                enabled = False
            if enabled:
                names.append(mod_name)
    except Exception:
        pass
    seen = set()
    for n in names:
        if not n or n in seen:
            continue
        seen.add(n)
        src = ""
        is_pkg = False
        try:
            # Prefer the imported module; fall back to the add-on metadata.
            mod = sys.modules.get(n)
            if mod is None:
                for meta in addon_utils.modules():
                    if getattr(meta, "__name__", None) == n:
                        mod = meta
                        break
            src = str(getattr(mod, "__file__", "") or "")
            if src:
                src = os.path.abspath(src)
            is_pkg = os.path.basename(src).lower() == "__init__.py"
        except Exception:
            src = ""
            is_pkg = False
        rec = {"module": n}
        if src:
            rec["file"] = src
            rec["is_package"] = bool(is_pkg)
        records.append(rec)
    return records


def _mgpu_enabled_addon_module_names(records):
    """Return the unique, non-empty module names from snapshot *records*.

    Each entry may be a plain string or a dict with a ``"module"`` key; other
    entry types are ignored. First-seen order is preserved.
    """
    names = []
    seen = set()
    for entry in (records or []):
        if isinstance(entry, str):
            mod_name = str(entry or "")
        elif isinstance(entry, dict):
            mod_name = str(entry.get("module") or "")
        else:
            mod_name = ""
        if not mod_name or mod_name in seen:
            continue
        seen.add(mod_name)
        names.append(mod_name)
    return names


def _proc_rss_bytes():
    """Return a resident-memory figure for this process in bytes, or None.

    Windows: ``WorkingSetSize`` from ``GetProcessMemoryInfo`` (None if the
    call reports failure or raises). Elsewhere: ``ru_maxrss`` from
    ``resource.getrusage``, used as-is on macOS and multiplied by 1024 on
    other platforms.
    """
    if IS_WIN:
        try:
            import ctypes, ctypes.wintypes as wt

            class PROCESS_MEMORY_COUNTERS(ctypes.Structure):
                _fields_ = [
                    ("cb", wt.DWORD),
                    ("PageFaultCount", wt.DWORD),
                    ("PeakWorkingSetSize", ctypes.c_size_t),
                    ("WorkingSetSize", ctypes.c_size_t),
                    ("QuotaPeakPagedPoolUsage", ctypes.c_size_t),
                    ("QuotaPagedPoolUsage", ctypes.c_size_t),
                    ("QuotaPeakNonPagedPoolUsage", ctypes.c_size_t),
                    ("QuotaNonPagedPoolUsage", ctypes.c_size_t),
                    ("PagefileUsage", ctypes.c_size_t),
                    ("PeakPagefileUsage", ctypes.c_size_t),
                ]

            GetProcessMemoryInfo = ctypes.windll.psapi.GetProcessMemoryInfo
            GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess
            h = GetCurrentProcess()
            counters = PROCESS_MEMORY_COUNTERS()
            counters.cb = ctypes.sizeof(PROCESS_MEMORY_COUNTERS)
            if GetProcessMemoryInfo(h, ctypes.byref(counters), counters.cb):
                return int(counters.WorkingSetSize)
        except Exception:
            return None
    else:
        try:
            import resource
            r = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            if IS_MAC:
                return int(r)
            return int(r * 1024)
        except Exception:
            return None


def _sys_mem_available_bytes():
    """Return the available physical memory in bytes, or None if unknown.

    Windows uses ``GlobalMemoryStatusEx``; other platforms use ``psutil`` when
    it can be imported.
    """
    if IS_WIN:
        try:
            import ctypes, ctypes.wintypes as wt

            class MEMORYSTATUSEX(ctypes.Structure):
                _fields_ = [
                    ("dwLength", wt.DWORD),
                    ("dwMemoryLoad", wt.DWORD),
                    ("ullTotalPhys", ctypes.c_ulonglong),
                    ("ullAvailPhys", ctypes.c_ulonglong),
                    ("ullTotalPageFile", ctypes.c_ulonglong),
                    ("ullAvailPageFile", ctypes.c_ulonglong),
                    ("ullTotalVirtual", ctypes.c_ulonglong),
                    ("ullAvailVirtual", ctypes.c_ulonglong),
                    ("ullAvailExtendedVirtual", ctypes.c_ulonglong),
                ]

            stat = MEMORYSTATUSEX()
            stat.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
            ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(stat))
            return int(stat.ullAvailPhys)
        except Exception:
            return None
    try:
        import psutil
        return int(psutil.virtual_memory().available)
    except Exception:
        return None


# ----------------------- GPU detection -----------------------
def _normalize_gpu_name(n: str) -> str:
    """Strip "(Display...)" suffixes and vendor words; collapse whitespace."""
    n = (n or "")
    n = re.sub(r"\s*\(Display.*?\)", "", n)
    n = n.replace("NVIDIA", "").replace("GeForce", "").strip()
    return re.sub(r"\s+", " ", n)


def _normalize_pci_bus_id(raw: str) -> str:
    """Normalize a PCI address to ``dddddddd:bb:dd.f`` (lower-case hex).

    A missing domain becomes eight zeros, a shorter one is left-padded to
    eight digits, and a missing function becomes ``0``. Returns "" when no
    address can be found in *raw*.
    """
    s = (str(raw or "").strip().lower())
    if not s:
        return ""
    m = re.search(r"([0-9a-f]{4,8})?:?([0-9a-f]{1,2}):([0-9a-f]{1,2})(?:\.([0-7]))?", s)
    if not m:
        return ""
    dom = (m.group(1) or "00000000")
    if len(dom) == 4:
        dom = "0000" + dom
    elif len(dom) < 8:
        dom = dom.rjust(8, "0")
    bus = m.group(2).rjust(2, "0")
    dev = m.group(3).rjust(2, "0")
    fn = m.group(4) or "0"
    return f"{dom}:{bus}:{dev}.{fn}"


# The return annotation is quoted on purpose: an unquoted ``str | None`` is
# evaluated at definition time and raises TypeError on Python < 3.10, which
# includes the interpreter bundled with the minimum Blender in bl_info.
def _extract_pci_bus_id_from_dev(dev) -> "str | None":
    """Return the normalized PCI bus id found in ``dev.id`` or ``dev.name``.

    ``id`` is searched before ``name``. Returns None when neither field
    contains a ``domain:bus:device[.function]`` pattern.
    """
    pat = re.compile(r"(?:^|[_\s:])([0-9A-Fa-f]{4,8}:[0-9A-Fa-f]{1,2}:[0-9A-Fa-f]{1,2}(?:\.[0-7])?)")
    for field in (getattr(dev, "id", ""), getattr(dev, "name", "")):
        m = pat.search(str(field) or "")
        if m:
            norm = _normalize_pci_bus_id(m.group(1))
            if norm:
                return norm
    return None


def _win_query_nvidia_smi_detailed():
    """Query ``nvidia-smi`` for the physical GPU inventory (Windows only).

    Returns a list of ``{"index", "uuid", "bus", "name"}`` dicts, or None on
    non-Windows platforms and on any failure (tool missing, parse error, ...).
    """
    if not IS_WIN:
        return None
    try:
        out = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=index,uuid,pci.bus_id,name", "--format=csv,noheader"],
            encoding="utf-8", errors="ignore"
        )
        phys = []
        for line in out.strip().splitlines():
            parts = [p.strip() for p in line.split(",")]
            if len(parts) >= 4:
                idx = int(parts[0])
                uuid = parts[1]
                bus = _normalize_pci_bus_id(parts[2]) or parts[2].lower()
                # Re-join in case the device name itself contains commas.
                name = ",".join(parts[3:]).strip()
                phys.append({"index": idx, "uuid": uuid, "bus": bus, "name": name})
        return phys
    except Exception:
        return None


def _dev_key(name: str, bus: str) -> tuple:
    """Return a comparison key: by bus id when available, else by name."""
    bus = _normalize_pci_bus_id(bus)
    if bus:
        return ("bus", bus)
    return ("name", _normalize_gpu_name(name))


# LEGACY: broad scan (often contains 'ghost' GPU)
def _detect_gpu_devices_legacy(selected_only=False):
    """List Cycles devices of the active backend, mapped to physical indices.

    Returns ``(index, name, backend, selected, bus)`` tuples. When the
    nvidia-smi inventory is available, devices are matched first by PCI bus
    and then by normalized name; unmatched devices get index 999 and are
    sorted last. Without an inventory the enumeration order is the index.
    """
    cp = _cycles_prefs()
    if not cp:
        return []
    backend = getattr(cp, "compute_device_type", None)
    try:
        cp.refresh_devices()
    except Exception:
        pass
    devs = []
    for d in getattr(cp, "devices", []):
        if getattr(d, "type", "") != backend:
            continue
        sel = bool(getattr(d, "use", False))
        if selected_only and not sel:
            continue
        devs.append(d)
    if not devs:
        return []
    phys = _win_query_nvidia_smi_detailed()
    out = []
    if phys:
        bus_to_idx = {g["bus"]: g["index"] for g in phys}
        used = set()
        matched_dev_ids = set()
        # Pass 1: bus
        for d in devs:
            bus = _extract_pci_bus_id_from_dev(d)
            if bus and bus in bus_to_idx:
                idx = bus_to_idx[bus]
                used.add(idx)
                out.append((idx, getattr(d, "name", "?"), backend, bool(getattr(d, "use", False)), bus))
                matched_dev_ids.add(id(d))
        # Pass 2: name fallback
        for d in devs:
            if id(d) in matched_dev_ids:
                continue
            dn = _normalize_gpu_name(getattr(d, "name", ""))
            cand = None
            for g in phys:
                if g["index"] in used:
                    continue
                if _normalize_gpu_name(g["name"]) == dn:
                    cand = g
                    break
            if cand:
                out.append((cand["index"], getattr(d, "name", "?"), backend, bool(getattr(d, "use", False)), cand["bus"]))
                used.add(cand["index"])
            else:
                out.append((999, getattr(d, "name", "?"), backend, bool(getattr(d, "use", False)), _extract_pci_bus_id_from_dev(d) or ""))
        out.sort(key=lambda t: (t[0] == 999, t[0]))
        return out
    return [
        (i, getattr(d, "name", "?"), backend, bool(getattr(d, "use", False)), _extract_pci_bus_id_from_dev(d) or "")
        for i, d in enumerate(devs)
    ]


def _detect_gpu_devices_strict(selected_only=True):
    """List Cycles devices of the active backend in a single matching pass.

    Same tuple layout as the legacy scan, but each device is resolved by bus
    and, failing that, by name in one pass, and the ``selected`` field is
    always reported as True.
    """
    cp = _cycles_prefs()
    if not cp:
        return []
    backend = getattr(cp, "compute_device_type", None)
    try:
        cp.refresh_devices()
    except Exception:
        pass
    devs = []
    for d in getattr(cp, "devices", []):
        if getattr(d, "type", "") != backend:
            continue
        sel = bool(getattr(d, "use", False))
        if selected_only and not sel:
            continue
        devs.append(d)
    if not devs:
        return []
    phys = _win_query_nvidia_smi_detailed()
    out = []
    if phys:
        bus_to_idx = {g["bus"]: g["index"] for g in phys}
        used = set()
        for d in devs:
            bus = _extract_pci_bus_id_from_dev(d)
            if bus and bus in bus_to_idx:
                idx = bus_to_idx[bus]
                used.add(idx)
                out.append((idx, getattr(d, "name", "?"), backend, True, bus))
            else:
                dn = _normalize_gpu_name(getattr(d, "name", ""))
                cand = None
                for g in phys:
                    if g["index"] in used:
                        continue
                    if _normalize_gpu_name(g["name"]) == dn:
                        cand = g
                        break
                if cand:
                    used.add(cand["index"])
                    out.append((cand["index"], getattr(d, "name", "?"), backend, True, cand["bus"]))
                else:
                    out.append((999, getattr(d, "name", "?"), backend, True, _extract_pci_bus_id_from_dev(d) or ""))
        out.sort(key=lambda t: (t[0] == 999, t[0]))
        return out
    return [
        (i, getattr(d, "name", "?"), backend, True, _extract_pci_bus_id_from_dev(d) or "")
        for i, d in enumerate(devs)
    ]


def _multiset_subtract(primary_list, subtract_list):
    """Return *primary_list* minus *subtract_list*, counting duplicates.

    Rows are compared by ``_dev_key(name, bus)``; each row of *subtract_list*
    cancels at most one matching row of *primary_list*.
    """
    sub_counts = Counter(_dev_key(name, bus) for (_i, name, _t, _sel, bus) in subtract_list)
    result = []
    for tpl in primary_list:
        (_i, name, _t, _sel, bus) = tpl
        k = _dev_key(name, bus)
        if sub_counts[k] > 0:
            sub_counts[k] -= 1
        else:
            result.append(tpl)
    return result


def _dedupe_selection_by_bus(rows):
    """Collapse rows that share a PCI bus id, keeping the best-scoring one.

    Rows without a recognizable bus id (or with an unexpected shape) pass
    through unchanged. Among rows on the same bus, a row scores higher when
    its name matches the nvidia-smi name for that bus (+4), when it is
    selected (+2) and when its index is not the 999 placeholder (+1). The
    first row wins ties and the position of the first occurrence is kept.
    """
    rows = list(rows or [])
    if not rows:
        return rows
    phys = _win_query_nvidia_smi_detailed() or []
    phys_name_by_bus = {
        _normalize_pci_bus_id(g.get("bus")): _normalize_gpu_name(g.get("name"))
        for g in phys
        if _normalize_pci_bus_id(g.get("bus"))
    }

    def _score(_row, _norm):
        try:
            _idx, _name, _backend, _selected, _bus = _row
        except Exception:
            return -999
        s = 0
        if bool(_selected):
            s += 2
        if _idx != 999:
            s += 1
        if _norm and _normalize_gpu_name(_name) == _norm:
            s += 4
        return s

    out = []
    bus_pos = {}
    for row in rows:
        try:
            idx, name, backend, selected, bus = row
        except Exception:
            out.append(row)
            continue
        nbus = _normalize_pci_bus_id(bus)
        if not nbus:
            out.append(row)
            continue
        phys_norm = phys_name_by_bus.get(nbus, "")
        if nbus not in bus_pos:
            bus_pos[nbus] = len(out)
            out.append((idx, name, backend, selected, nbus))
            continue
        pos = bus_pos[nbus]
        prev = out[pos]
        if _score((idx, name, backend, selected, nbus), phys_norm) > _score(prev, phys_norm):
            out[pos] = (idx, name, backend, selected, nbus)
    return out


def _detect_gpu_devices_final_from_lists(mode: str, legacy, strict):
    """Combine precomputed legacy/strict device lists according to *mode*.

    Any mode other than ``LEGACY_ONLY``, ``STRICT_ONLY`` and
    ``LEGACY_MINUS_STRICT`` is treated as strict-minus-legacy. The result is
    always de-duplicated by bus.
    """
    legacy = list(legacy or [])
    strict = list(strict or [])
    if mode == "LEGACY_ONLY":
        return _dedupe_selection_by_bus(legacy)
    if mode == "STRICT_ONLY":
        return _dedupe_selection_by_bus(strict)
    if mode == "LEGACY_MINUS_STRICT":
        return _dedupe_selection_by_bus(_multiset_subtract(legacy, strict))
    final = _multiset_subtract(strict, legacy)
    # Safety: never silently drop explicitly selected strict GPUs.
    if strict and len(final) < len(strict):
        return _dedupe_selection_by_bus(strict)
    return _dedupe_selection_by_bus(final)


def _detect_gpu_devices_final(mode: str = "STRICT_MINUS_LEGACY"):
    """Detect GPUs by running the strict and legacy scans and combining them.

    mode:
      - 'STRICT_MINUS_LEGACY' -> strict(all) minus legacy(ghost) [default]
      - 'LEGACY_MINUS_STRICT' -> legacy minus strict
      - 'STRICT_ONLY'         -> just strict
      - 'LEGACY_ONLY'         -> just legacy
    """
    strict = _detect_gpu_devices_strict(selected_only=True)
    legacy = _detect_gpu_devices_legacy(selected_only=False)
    return _detect_gpu_devices_final_from_lists(mode, legacy, strict)


def _cycles_device_snapshot():
    """Return ``{"backend", "rows"}`` describing every Cycles device.

    Each row holds the device's ``name``, ``type``, ``use`` flag, extracted
    ``bus`` id ("" if none) and ``id`` string. Without Cycles preferences the
    backend is None and the row list is empty.
    """
    cp = _cycles_prefs()
    if not cp:
        return {"backend": None, "rows": []}
    backend = getattr(cp, "compute_device_type", None)
    try:
        cp.refresh_devices()
    except Exception:
        pass
    rows = []
    for d in getattr(cp, "devices", []):
        rows.append({
            "name": getattr(d, "name", "?"),
            "type": getattr(d, "type", "?"),
            "use": bool(getattr(d, "use", False)),
            "bus": _extract_pci_bus_id_from_dev(d) or "",
            "id": str(getattr(d, "id", "") or ""),
        })
    return {"backend": backend, "rows": rows}


# -------- map selection to physical UUIDs --------
def _map_selection_to_uuids(sel_tuples):
    """Resolve selection tuples to physical GPUs from the nvidia-smi inventory.

    Each ``(index, name, type, selected, bus)`` tuple is matched by bus id,
    then by physical index, then by normalized name; a physical GPU is never
    assigned twice. Returns one dict per tuple with ``index``, ``name``,
    ``cycles_name``, ``bus``, ``uuid`` and ``phys_index``. Unresolved entries
    have ``uuid`` and ``phys_index`` set to None. Without an inventory the
    input index is used as ``phys_index`` (None for the 999 placeholder).
    """
    phys = _win_query_nvidia_smi_detailed()
    if not phys:
        out = []
        for (idx, name, _t, _sel, bus) in sel_tuples:
            out.append({
                "index": idx,
                "name": name,
                "cycles_name": name,
                "bus": _normalize_pci_bus_id(bus) or bus or "",
                "uuid": None,
                "phys_index": idx if idx != 999 else None
            })
        return out
    bus_map = {g["bus"]: g for g in phys}
    name_buckets = {}
    for g in phys:
        name_buckets.setdefault(_normalize_gpu_name(g["name"]), []).append(g)
    used_ids = set()
    out = []
    for (idx, name, _t, _sel, bus) in sel_tuples:
        bus = _normalize_pci_bus_id(bus) or (bus or "")
        g = None
        if bus and bus in bus_map and bus_map[bus]["uuid"] not in used_ids:
            g = bus_map[bus]
        if (g is None) and (idx is not None) and isinstance(idx, int):
            for cand in phys:
                if cand["index"] == idx and cand["uuid"] not in used_ids:
                    g = cand
                    break
        if g is None:
            nb = name_buckets.get(_normalize_gpu_name(name), [])
            for cand in nb:
                if cand["uuid"] not in used_ids:
                    g = cand
                    break
        if g:
            used_ids.add(g["uuid"])
            out.append({
                "index": g["index"],
                # Prefer physical inventory naming from nvidia-smi.
                "name": g.get("name") or name,
                "cycles_name": name,
                "bus": g["bus"],
                "uuid": g["uuid"],
                "phys_index": g["index"],
            })
        else:
            out.append({
                "index": idx,
                "name": name,
                "cycles_name": name,
                "bus": bus or "",
                "uuid": None,
                "phys_index": None
            })
    return out


def _filter_known_mapped_gpus(mapped):
    """Hide unresolved mapped entries (phys_index=None => shown as '?').

    Returns ``(keep, dropped)``: entries with a ``phys_index`` and entries
    without one, each in input order.
    """
    keep = []
    dropped = []
    for m in list(mapped or []):
        if m.get("phys_index") is None:
            dropped.append(m)
        else:
            keep.append(m)
    return keep, dropped


# NOTE(review): the banner text below is reproduced verbatim from the source as
# received (a single line); restore the art's original line breaks if known.
BANNER_ASCII = r""" ▒▒▒▒▒▒▒▒▒▒▒ ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ ▒▌▄ ███ ▄▐▒ ░█░█░███░█░░███░█████░███░███░█░ ▒▌█▌███▐█▐▒ ░█░█░█░█░█░░█░█░█░█░█░█░█░█░█░█░ ▒▌█▌███▐█▐▒ ░███░█░█░█░░█░█░█░█░█░███░██░░█░ ▒▌█▌███▐█▐▒ ░█░█░█░█░█░░█░█░█░█░█░█░█░█░█░█░ ▒▌█▌███▐█▐▒ ░█░█░███░██░███░█░█░█░█░█░█░█░█░ ▒▌▀ ███ ▀▐▒ ░▒░▒░▒▒▒░▒▒░▒▒▒░▒░▒░▒░▒░▒░▒░▒░▒░ ▒▒▒▒▒▒▒▒▒▒▒ BLENDER MULTI-INSTANCE RENDERER [[Provided By HoloMARI - Holographic Media Creators Platform]] Check our Holographic image rendering/sharing at: holomari.com """

# Manager-side diagnostics console banner (line 3 labels this as manager console).
BANNER_MANAGER_ASCII = "\n\nRENDER MANAGER CONSOLE\n\n" + BANNER_ASCII.lstrip("\n") _WORKER_BANNER_REPEAT_EVERY_LINES = 30 _WORKER_BANNER_REPEAT_TEXT = BANNER_ASCII.lstrip("\n").rstrip("\n") + "\n" # ----------------------- child script ----------------------- _CHILD_SCRIPT_SRC = r""" import bpy, sys, json, socket, os, re, time, traceback try: import addon_utils except Exception: addon_utils = None # Make all prints flush immediately so the parent can time frames try: sys.stdout.reconfigure(line_buffering=True, write_through=True) except Exception: pass HOST = "127.0.0.1" args = sys.argv[sys.argv.index("--")+1:] if "--" in sys.argv else [] def _argval(flag, default=None): if flag in args: i = args.index(flag) return args[i+1] if i+1 < len(args) else default return default PORT = int(_argval("--mgpu-port", "0")) TOKEN = _argval("--mgpu-token", "") TAG = _argval("--mgpu-tag", "worker") DEVICE = _argval("--mgpu-device", None) FALLBACK_DEVICE = _argval("--mgpu-fallback-device", "") TARGET_GPU_BUS = _argval("--mgpu-gpu-bus", "") or "" TARGET_GPU_NAME = _argval("--mgpu-gpu-name", "") or "" THREADS = int(_argval("--mgpu-threads", "0") or "0") USECPU = int(_argval("--mgpu-usecpu", "0") or "0") DENOISE_GPU = int(_argval("--mgpu-denoise-gpu", "1") or "1") PERSIST = int(_argval("--mgpu-persistent", "1") or "1") MODE = _argval("--mgpu-mode", "FRAMES") SRC_DIR = _argval("--src-dir", "") or "" SEQ_DIR = _argval("--mgpu-seq-dir", "") or "" SEQ_FMT = (_argval("--mgpu-seq-format", "PNG") or "PNG").upper() SEQ_EXT = (_argval("--mgpu-seq-ext", ".png") or ".png").strip() or ".png" if not SEQ_EXT.startswith("."): SEQ_EXT = "." 
+ SEQ_EXT PRECHECKED_EXISTING = int(_argval("--mgpu-prechecked-existing", "0") or "0") ADDONS_FILE = _argval("--mgpu-enabled-addons-file", "") or "" MARI_PATH = _argval("--mari-path", "") def _normalize_gpu_name(n): n = str(n or "") n = n.replace("NVIDIA", "").replace("GeForce", "").strip() n = re.sub(r"\s+", " ", n) return n.lower() def _normalize_pci_bus_id(raw): s = (str(raw or "").strip().lower()) if not s: return "" m = re.search(r"([0-9a-f]{4,8})?:?([0-9a-f]{1,2}):([0-9a-f]{1,2})(?:\.([0-7]))?", s) if not m: return "" dom = (m.group(1) or "00000000") if len(dom) == 4: dom = "0000" + dom elif len(dom) < 8: dom = dom.rjust(8, "0") bus = m.group(2).rjust(2, "0") dev = m.group(3).rjust(2, "0") fn = m.group(4) or "0" return f"{dom}:{bus}:{dev}.{fn}" def _extract_pci_bus_id_from_dev(dev): pat = re.compile(r"([0-9A-Fa-f]{4,8}:[0-9A-Fa-f]{1,2}:[0-9A-Fa-f]{1,2}(?:\.[0-7])?)") for field in (getattr(dev, "id", ""), getattr(dev, "name", "")): m = pat.search(str(field) or "") if m: norm = _normalize_pci_bus_id(m.group(1)) if norm: return norm return "" def _addon_record_fields(entry): if isinstance(entry, str): return entry, "", False if isinstance(entry, dict): return ( str(entry.get("module") or ""), str(entry.get("file") or ""), bool(entry.get("is_package", False)), ) return "", "", False def _load_addon_from_source(mod_name, src_path, is_package=False): try: import importlib.util if not mod_name or not src_path: return False src_path = os.path.abspath(src_path) if os.path.isdir(src_path): is_package = True init_path = os.path.join(src_path, "__init__.py") if not os.path.isfile(init_path): return False spec = importlib.util.spec_from_file_location( mod_name, init_path, submodule_search_locations=[src_path] ) elif is_package or os.path.basename(src_path).lower() == "__init__.py": pkg_dir = os.path.dirname(src_path) spec = importlib.util.spec_from_file_location( mod_name, src_path, submodule_search_locations=[pkg_dir] ) else: spec = 
importlib.util.spec_from_file_location(mod_name, src_path) if not spec or not spec.loader: return False mod = sys.modules.get(mod_name) if mod is None: mod = importlib.util.module_from_spec(spec) sys.modules[mod_name] = mod spec.loader.exec_module(mod) if hasattr(mod, "register"): try: mod.register() except Exception: pass return True except Exception: return False def _enable_parent_addons(): activated = [] if not ADDONS_FILE or not addon_utils: return activated try: with open(ADDONS_FILE, "r", encoding="utf-8") as fp: payload = json.load(fp) mods = payload.get("addons", []) if isinstance(payload, dict) else payload if not isinstance(mods, (list, tuple)): mods = [] req = len(mods) ok = 0 fail = 0 loaded_from_source = 0 already_enabled = 0 for entry in mods: mod_name, src_path, is_package = _addon_record_fields(entry) if not mod_name: continue try: st = addon_utils.check(mod_name) if isinstance(st, tuple) and any(bool(v) for v in st): ok += 1 already_enabled += 1 activated.append(mod_name) continue except Exception: pass try: addon_utils.enable(mod_name, default_set=False, persistent=False) ok += 1 activated.append(mod_name) except Exception: if _load_addon_from_source(mod_name, src_path, is_package=is_package): ok += 1 loaded_from_source += 1 activated.append(mod_name) else: fail += 1 print(f"[MGPU-CHILD] Add-on sync: requested={req} enabled={ok} already_enabled={already_enabled} loaded_from_source={loaded_from_source} failed={fail}") sys.stdout.flush() except Exception as e: print(f"[MGPU-CHILD] WARNING: addon sync failed: {e}") sys.stdout.flush() return activated _SYNCED_ADDON_MODULES = _enable_parent_addons() try: ops = dir(bpy.ops.mari) print("[MGPU-CHILD] bpy.ops.mari ->", ", ".join(ops)) except Exception: print("[MGPU-CHILD] bpy.ops.mari namespace missing") if MARI_PATH: try: if MARI_PATH not in sys.path: sys.path.insert(0, MARI_PATH) try: import holo_mari_addon as _hma except Exception: import importlib.util p = os.path.join(MARI_PATH, "__init__.py") spec = 
importlib.util.spec_from_file_location("holo_mari_addon", p) _hma = importlib.util.module_from_spec(spec) sys.modules["holo_mari_addon"] = _hma spec.loader.exec_module(_hma) if hasattr(_hma, "register"): _hma.register() _SYNCED_ADDON_MODULES.append("holo_mari_addon") print("[MGPU-CHILD] Loaded MARI addon from path:", MARI_PATH); sys.stdout.flush() except Exception as e: print("[MGPU-CHILD] ERROR loading MARI addon path:", e); sys.stdout.flush() def _ensure_threads(): try: if THREADS and THREADS > 0: try: bpy.context.preferences.system.threads = THREADS except Exception: pass except Exception: pass _ensure_threads() def _scene_override_kwargs(scn): kw = {} if scn is None: return kw kw["scene"] = scn try: vls = getattr(scn, "view_layers", None) if vls: active_name = "" try: active_name = str(getattr(bpy.context.view_layer, "name", "") or "") except Exception: active_name = "" if active_name and active_name in vls: kw["view_layer"] = vls[active_name] else: kw["view_layer"] = vls[0] except Exception: pass return kw def _run_with_scene_override(scn, fn): kw = _scene_override_kwargs(scn) if kw: try: ctx = bpy.context.temp_override(**kw) except Exception: ctx = None if ctx is not None: with ctx: return fn() win = getattr(bpy.context, "window", None) prev_scene = None try: if win and scn is not None: prev_scene = win.scene win.scene = scn except Exception: prev_scene = None try: return fn() finally: try: if win and prev_scene is not None: win.scene = prev_scene except Exception: pass def _handler_matches_modules(handler, modules): hmod = str(getattr(handler, "__module__", "") or "") if not hmod: return False for mod_name in (modules or []): mod_name = str(mod_name or "") if mod_name and (hmod == mod_name or hmod.startswith(mod_name + ".")): return True return False def _replay_addon_load_post_handlers(modules): mods = [str(m or "") for m in (modules or []) if str(m or "")] if not mods: return try: handlers = list(getattr(bpy.app.handlers, "load_post", []) or []) except 
Exception: handlers = [] if not handlers: return filepath = bpy.data.filepath or "" called = 0 failed = 0 for handler in handlers: if not callable(handler) or not _handler_matches_modules(handler, mods): continue try: handler(filepath) called += 1 except TypeError: try: handler() called += 1 except Exception as e: failed += 1 print(f"[MGPU-CHILD] WARNING: load_post replay failed for {getattr(handler, '__name__', '')}: {e}") sys.stdout.flush() except Exception as e: failed += 1 print(f"[MGPU-CHILD] WARNING: load_post replay failed for {getattr(handler, '__name__', '')}: {e}") sys.stdout.flush() if called or failed: print(f"[MGPU-CHILD] load_post replay: called={called} failed={failed} modules={len(mods)}") sys.stdout.flush() def _replay_addon_scene_handlers(handler_name, scn, modules): mods = [str(m or "") for m in (modules or []) if str(m or "")] if not scn or not mods: return try: handlers = list(getattr(bpy.app.handlers, handler_name, []) or []) except Exception: handlers = [] for handler in handlers: if not callable(handler) or not _handler_matches_modules(handler, mods): continue try: _run_with_scene_override(scn, lambda _h=handler: _h(scn)) except TypeError: try: _run_with_scene_override(scn, lambda _h=handler: _h()) except Exception as e: print(f"[MGPU-CHILD] WARNING: {handler_name} replay failed for {getattr(handler, '__name__', '')}: {e}") sys.stdout.flush() except Exception as e: print(f"[MGPU-CHILD] WARNING: {handler_name} replay failed for {getattr(handler, '__name__', '')}: {e}") sys.stdout.flush() def _force_scene_refresh(scn, frame=None): if scn is None: return try: target = int(scn.frame_current if frame is None else frame) except Exception: try: target = int(scn.frame_current) except Exception: target = 0 try: cur = int(scn.frame_current) except Exception: cur = target def _apply(): try: if cur == target: alt = target - 1 try: start = int(getattr(scn, "frame_start", target)) end = int(getattr(scn, "frame_end", target)) if alt < start and (target + 
1) <= end: alt = target + 1 if alt != target and start <= alt <= end: try: scn.frame_set(alt, subframe=0.0) except TypeError: scn.frame_set(alt) except Exception: pass try: scn.frame_set(target, subframe=0.0) except TypeError: scn.frame_set(target) except Exception: pass try: bpy.context.view_layer.update() except Exception: pass try: deps = bpy.context.evaluated_depsgraph_get() upd = getattr(deps, "update", None) if callable(upd): upd() except Exception: pass try: _replay_addon_scene_handlers("frame_change_post", scn, _SYNCED_ADDON_MODULES) except Exception: pass _run_with_scene_override(scn, _apply) _replay_addon_load_post_handlers(_SYNCED_ADDON_MODULES) try: scn0 = bpy.context.scene if scn0: _force_scene_refresh(scn0, scn0.frame_current) except Exception: pass # Cycles device setup: # - GPU workers must stay on GPU (no silent CPU fallback). # - If OptiX cannot enable devices, try CUDA as a fallback backend. def _configure_cycles_devices(): try: prefs = bpy.context.preferences.addons['cycles'].preferences except Exception as e: print(f"[MGPU-CHILD] WARNING: Cycles preferences unavailable: {e}") sys.stdout.flush() # For GPU workers, missing Cycles prefs must be treated as failure to # prevent silent CPU rendering. 
return bool(USECPU) scn = bpy.context.scene if USECPU: try: scn.cycles.device = 'CPU' except Exception: pass try: prefs.refresh_devices() except Exception: pass cpu_enabled = 0 for d in getattr(prefs, "devices", []): try: dtype = str(getattr(d, "type", "") or "").upper() if dtype == "CPU": d.use = True cpu_enabled += 1 else: d.use = False except Exception: pass print(f"[MGPU-CHILD] Cycles device setup: mode=CPU cpu_enabled={cpu_enabled}") sys.stdout.flush() return (cpu_enabled > 0) # GPU worker path try: scn.cycles.device = 'GPU' except Exception: pass wanted = str(DEVICE or "").upper() fallback = str(FALLBACK_DEVICE or "").upper() target_bus = _normalize_pci_bus_id(TARGET_GPU_BUS) target_name_norm = _normalize_gpu_name(TARGET_GPU_NAME) attempts = [] if wanted: attempts.append(wanted) if fallback and fallback not in attempts: attempts.append(fallback) if not attempts: attempts = ["CUDA"] for backend in attempts: try: prefs.compute_device_type = backend except Exception as e: print(f"[MGPU-CHILD] Cycles backend set failed for {backend}: {e}") sys.stdout.flush() continue try: prefs.refresh_devices() except Exception as e: print(f"[MGPU-CHILD] Cycles refresh failed for {backend}: {e}") sys.stdout.flush() continue candidates = [] for d in getattr(prefs, "devices", []): try: dtype = str(getattr(d, "type", "") or "").upper() if dtype == "CPU": d.use = False continue if dtype != backend: d.use = False continue bus = _extract_pci_bus_id_from_dev(d) name = str(getattr(d, "name", dtype)) candidates.append((d, bus, name, dtype)) except Exception: pass selected = [] # Primary selector: PCI bus id (stable and unique per physical GPU). if target_bus: for (d, bus, name, _dtype) in candidates: use_this = bool(bus and bus == target_bus) d.use = use_this if use_this: selected.append((bus, name)) # Secondary selector: normalized device name (only if bus match failed). 
def _enforce_cycles_scene_device(scn):
    '''Pin a Cycles scene to this worker's device class.

    Sets scn.cycles.device to 'CPU' when USECPU is truthy, otherwise 'GPU'.
    Scenes that are missing or use another render engine are left untouched,
    and any error is swallowed (best-effort).
    '''
    try:
        if scn and scn.render.engine == "CYCLES":
            if USECPU:
                scn.cycles.device = 'CPU'
            else:
                scn.cycles.device = 'GPU'
    except Exception:
        pass
def _cycles_policy_ok(require_gpu):
    '''Check the current Cycles state against the worker's device policy.

    GPU policy: scene device is "GPU", at least one GPU device is enabled and
    no CPU device is enabled. CPU policy: scene device is "CPU" and a CPU
    device is enabled.

    Returns (ok, report) where report is the dict from _cycles_runtime_report().
    '''
    report = _cycles_runtime_report()
    scene_device = report.get("scene_device")
    if require_gpu:
        compliant = (
            scene_device == "GPU"
            and int(report.get("gpu_enabled", 0) or 0) > 0
            and not report.get("cpu_enabled")
        )
    else:
        compliant = scene_device == "CPU" and bool(report.get("cpu_enabled"))
    return compliant, report


def _ensure_cycles_policy(require_gpu, phase):
    '''Verify the device policy, attempting one repair pass if it is violated.

    phase is only used to label the log line. Returns (ok, report); a
    violation that survives the repair is logged as GPU_POLICY_VIOLATION.
    '''
    compliant, report = _cycles_policy_ok(require_gpu)
    if compliant:
        return True, report

    def _best_effort(step):
        try:
            step()
        except Exception:
            pass

    # Re-apply device preferences and the scene device, then check again.
    _best_effort(lambda: _configure_cycles_devices())
    _best_effort(lambda: _enforce_cycles_scene_device(bpy.context.scene))

    compliant, report = _cycles_policy_ok(require_gpu)
    if compliant:
        message = f"[MGPU-CHILD] Cycles policy recovered at {phase}: {report}"
    else:
        message = f"[MGPU-CHILD] GPU_POLICY_VIOLATION at {phase}: {report}"
    print(message)
    sys.stdout.flush()
    return bool(compliant), report


def _set_enum_if_valid(owner, prop_name, value):
    '''Assign an enum property only when value is one of its identifiers.

    Looks the property up in owner.bl_rna.properties. Returns True when the
    value was assigned, False when the property is missing, the value is not
    offered, or anything raised.
    '''
    try:
        rna_prop = owner.bl_rna.properties.get(prop_name)
        if rna_prop:
            allowed = [item.identifier for item in rna_prop.enum_items]
            if value in allowed:
                setattr(owner, prop_name, value)
                return True
    except Exception:
        pass
    return False
bpy.context.scene except Exception: return if not scn or scn.render.engine != "CYCLES": return if not bool(DENOISE_GPU): print("[MGPU-CHILD] Cycles denoise policy: disabled by add-on setting.") sys.stdout.flush() return if bool(USECPU): print("[MGPU-CHILD] Cycles denoise policy: CPU worker, leaving denoiser unchanged.") sys.stdout.flush() return changed = [] try: c = scn.cycles except Exception: c = None if c is not None: try: if bool(getattr(c, "use_denoising", False)): if _set_enum_if_valid(c, "denoiser", "OPTIX"): changed.append("scene.cycles.denoiser=OPTIX") except Exception: pass try: if bool(getattr(c, "use_preview_denoising", False)): if _set_enum_if_valid(c, "preview_denoiser", "OPTIX"): changed.append("scene.cycles.preview_denoiser=OPTIX") except Exception: pass try: for vl in list(getattr(scn, "view_layers", []) or []): vc = getattr(vl, "cycles", None) if not vc: continue if bool(getattr(vc, "use_denoising", False)): if _set_enum_if_valid(vc, "denoiser", "OPTIX"): changed.append(f"view_layer[{vl.name}].cycles.denoiser=OPTIX") except Exception: pass if changed: print(f"[MGPU-CHILD] Cycles denoise policy: GPU denoiser configured ({'; '.join(changed)}).") else: print("[MGPU-CHILD] Cycles denoise policy: no active denoiser properties changed.") sys.stdout.flush() try: if bpy.context.scene and bpy.context.scene.render.engine == "CYCLES": _configure_cycles_denoiser() print(f"[MGPU-CHILD] Cycles runtime report(init): {_cycles_runtime_report()}"); sys.stdout.flush() except Exception: pass try: bpy.context.scene.render.use_persistent_data = bool(PERSIST) except Exception: pass # If we are building a video in the parent, workers render a temp image sequence. 
try: if MODE == "FRAMES" and SEQ_DIR: os.makedirs(SEQ_DIR, exist_ok=True) scn = bpy.context.scene try: scn.render.image_settings.color_mode = "RGBA" except Exception: pass try: if hasattr(scn.render.image_settings, "media_type"): scn.render.image_settings.media_type = "IMAGE" except Exception: pass scn.render.filepath = os.path.join(SEQ_DIR, "frame_") try: scn.render.use_file_extension = True except Exception: pass except Exception as e: print(f"[MGPU-CHILD] WARNING: sequence bootstrap setup failed: {e}"); sys.stdout.flush() # --- Rebase MARI output folder to original .blend directory --- try: scn = bpy.context.scene prop = getattr(scn, "mari_props", None) if prop: raw = getattr(prop, "render_settings_filepath", "") or "" name = getattr(prop, "render_settings_name", "") or "" rebased = raw # If Blender-style relative path ("//..."), rebase against SRC_DIR if raw.startswith("//") and SRC_DIR: rebased = os.path.normpath(os.path.join(SRC_DIR, raw[2:])) else: # Resolve any other path using Blender's abspath (will be absolute already) rebased = bpy.path.abspath(raw) # Ensure a trailing separator if not rebased.endswith(os.sep): rebased += os.sep # Persist back so ALL operators (including bpy.ops.mari.render_one) use the corrected absolute path prop.render_settings_filepath = rebased print(f"[MGPU-CHILD] Rebased MARI output dir to: {rebased} (name='{name}')"); sys.stdout.flush() except Exception as e: print("[MGPU-CHILD] WARNING: Could not rebase MARI output path:", e); sys.stdout.flush() def _mari_ext_from_settings(scn): # Match MARI add-on's extension mapping so filenames match everywhere ff = scn.render.image_settings.file_format.lower() if ff == "ffmpeg": # MARI uses ffmpeg.format to decide container (e.g. 
def _fix_mari_still_output_name(scn, H, V):
    '''Make a MARI still end up as "<name>_H<H>_V<V>.<ext>" in its output folder.

    Scans "<render_settings_filepath>/<render_settings_name>/" for files with
    the expected extension whose stem is the expected stem followed by three
    or more digits. If the un-numbered target already exists and is non-empty
    the numbered files are deleted; otherwise the most recently modified one
    is renamed to the target and the remaining ones are deleted.

    Best-effort: returns None and swallows every error.
    '''
    def _discard(paths):
        for stale in paths:
            try:
                os.remove(stale)
            except Exception:
                pass

    try:
        props = getattr(scn, "mari_props", None)
        if not props:
            return
        out_dir = bpy.path.abspath(getattr(props, "render_settings_filepath", ""))
        label = (getattr(props, "render_settings_name", "") or "").strip()
        if not out_dir or not label:
            return
        if not out_dir.endswith(os.sep):
            out_dir += os.sep
        root = os.path.join(out_dir, label)
        if not os.path.isdir(root):
            return
        ext = (_mari_ext_from_settings(scn) or "").lower().lstrip(".")
        if not ext:
            return

        stem = f"{label}_H{int(H)}_V{int(V)}"
        wanted = os.path.join(root, f"{stem}.{ext}")

        # Collect "<stem><digits>.<ext>" files (at least three digits).
        numbered = []
        for entry in os.listdir(root):
            entry_path = os.path.join(root, entry)
            if not os.path.isfile(entry_path):
                continue
            entry_stem, entry_ext = os.path.splitext(entry)
            if entry_ext.lower().lstrip(".") != ext or not entry_stem.startswith(stem):
                continue
            digits = entry_stem[len(stem):]
            if len(digits) >= 3 and digits.isdigit():
                numbered.append(entry_path)

        if os.path.isfile(wanted) and os.path.getsize(wanted) > 0:
            _discard(numbered)
            return
        if not numbered:
            return
        newest, *older = sorted(numbered, key=os.path.getmtime, reverse=True)
        os.replace(newest, wanted)
        _discard(older)
    except Exception:
        pass
import weakref

# Bytes that arrived after a message terminator, kept per socket so the next
# jrecv() call starts with them instead of losing them.
_JRECV_LEFTOVER = weakref.WeakKeyDictionary()


def jsend(sock, obj):
    '''Send obj to the peer as one newline-terminated JSON line.'''
    sock.sendall((json.dumps(obj) + "\n").encode("utf-8", "ignore"))


def jrecv(sock):
    '''Receive one newline-terminated JSON message from sock and decode it.

    Data received past the first newline (for example a second message that
    arrived in the same recv() chunk) is buffered per socket and consumed by
    the next call rather than discarded.

    Raises ConnectionError when the peer closes before a full line arrived.
    '''
    try:
        buf = _JRECV_LEFTOVER.pop(sock, b"")
    except TypeError:
        # Object cannot be weak-referenced; fall back to unbuffered reads.
        buf = b""
    while b"\n" not in buf:
        chunk = sock.recv(4096)
        if not chunk:
            raise ConnectionError("server closed")
        buf += chunk
    line, rest = buf.split(b"\n", 1)
    if rest:
        try:
            _JRECV_LEFTOVER[sock] = rest
        except TypeError:
            pass
    return json.loads(line.decode("utf-8", "ignore"))


def _proj_bar(done, total, width=30):
    '''Build a text progress bar.

    Returns (bar, percent): bar is "[" + width cells + "]" filled with "#" in
    proportion to done/total (capped at 100%). A non-positive or unparsable
    total yields an empty bar of the requested width and 0.0.
    '''
    try:
        width = max(0, int(width))
    except Exception:
        width = 30
    empty_bar = "[" + ("-" * width) + "]"
    try:
        total = int(total)
        done = max(0, int(done))
        if total <= 0:
            return empty_bar, 0.0
        ratio = min(1.0, done / float(total))
        filled = int(round(width * ratio))
        return "[" + ("#" * filled) + ("-" * (width - filled)) + "]", ratio * 100.0
    except Exception:
        return empty_bar, 0.0


def _proj_print(H, V, elapsed, glb):
    '''Print one [MGPU-PROJ] progress line for a finished H/V job.

    glb supplies "proj_total" and "proj_done" (jobs finished before this one);
    the printed count is proj_done + 1, capped at proj_total when that is > 0.
    '''
    def _label(value):
        try:
            return int(value)
        except Exception:
            return value

    total = int(glb.get("proj_total") or 0)
    done_before = int(glb.get("proj_done") or 0)
    done_now = min(total, done_before + 1) if total > 0 else (done_before + 1)
    bar, pct = _proj_bar(done_now, total)
    h = _label(H)
    v = _label(V)
    print(f"[MGPU-PROJ] H{h}_V{v} | {float(elapsed):.2f}s | {bar} {pct:.1f}% ({done_now}/{total})")
    sys.stdout.flush()
def _scene_expected_image_size(scn):
    '''Return the (width, height) in pixels a render of scn should have.

    Uses integer truncation of resolution * percentage / 100, which is how
    Blender derives the final render size; rounding instead gives a size one
    pixel too large for odd products (e.g. 1923 px at 50%), so a valid frame
    would never match.

    Returns None when the values are unavailable or not positive.
    '''
    try:
        render = scn.render
        pct = int(getattr(render, "resolution_percentage", 100) or 100)
        w = (int(render.resolution_x) * pct) // 100
        h = (int(render.resolution_y) * pct) // 100
        if w > 0 and h > 0:
            return (w, h)
    except Exception:
        pass
    return None


def _is_valid_render_output(path, expected_size=None):
    '''Return True when path is a non-empty image file Blender can load.

    When expected_size is given, the loaded image must have exactly that
    (width, height). The temporary image datablock is always removed again.
    '''
    try:
        if os.path.getsize(path) <= 0:
            return False
    except Exception:
        return False
    img = None
    try:
        img = bpy.data.images.load(path, check_existing=False)
        size = getattr(img, "size", None)
        if not (size and size[0] > 0 and size[1] > 0):
            return False
        if expected_size:
            try:
                exp_w = int(expected_size[0])
                exp_h = int(expected_size[1])
                return int(size[0]) == exp_w and int(size[1]) == exp_h
            except Exception:
                return False
        return True
    except Exception:
        return False
    finally:
        if img is not None:
            try:
                bpy.data.images.remove(img)
            except Exception:
                pass


def _render_meta(rendered=False, skipped=False, elapsed=0.0):
    '''Build the per-job "meta" dict reported back to the parent scheduler.

    elapsed is coerced to float; unparsable values become 0.0.
    '''
    try:
        elapsed = float(elapsed or 0.0)
    except Exception:
        elapsed = 0.0
    return {
        "rendered": bool(rendered),
        "skipped": bool(skipped),
        "elapsed": elapsed,
    }


# Scene custom-property key holding the outcome of the last mari.render_one call.
_MARI_RENDER_ONE_STATUS_KEY = "_mari_render_one_status"


def _mari_read_render_status(scn):
    '''Read the status stored under _MARI_RENDER_ONE_STATUS_KEY on the scene.

    Returns {"status", "rendered", "skipped"}; status is the upper-cased
    string (empty when missing or unreadable).
    NOTE(review): presumably written by the MARI add-on's render_one
    operator -- confirm against that add-on.
    '''
    raw = ""
    try:
        raw = str(scn.get(_MARI_RENDER_ONE_STATUS_KEY, "") or "").upper()
    except Exception:
        raw = ""
    return {
        "status": raw,
        "rendered": raw == "RENDERED",
        "skipped": raw == "SKIPPED",
    }


# Set once the temp-frame format cannot be applied to the scene's render
# settings; frames are then saved from the Render Result image directly.
_SEQ_DIRECT_SAVE = False
# Ensures the direct-save fallback warning is printed only once.
_SEQ_DIRECT_SAVE_LOGGED = False
def _save_render_result_to_file(out_path, file_format="PNG"):
    '''Save the Render Result image to out_path in the given file format.

    The image's filepath_raw and file_format are temporarily redirected and
    are restored afterwards, including when saving raises, so a failed save
    does not leave the image pointing at the temp frame path.

    Raises RuntimeError when no Render Result image exists; errors from
    img.save() propagate to the caller.
    '''
    img = _render_result_image()
    if img is None:
        raise RuntimeError("Render Result image not found after frame render.")

    prev_raw = getattr(img, "filepath_raw", "")
    prev_fmt = getattr(img, "file_format", "PNG")

    try:
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
    except Exception:
        pass

    try:
        try:
            img.filepath_raw = out_path
        except Exception:
            img.filepath = out_path
        try:
            img.file_format = str(file_format or "PNG").upper()
        except Exception:
            # Requested format not accepted by the image; fall back to PNG.
            img.file_format = "PNG"
        img.save()
    finally:
        try:
            img.filepath_raw = prev_raw
        except Exception:
            pass
        try:
            img.file_format = prev_fmt
        except Exception:
            pass
_force_scene_refresh(main_scene, frame) use_direct_save = bool(_SEQ_DIRECT_SAVE) if not use_direct_save: try: img.file_format = SEQ_FMT img.color_mode = "RGBA" img.color_depth = "16" img.compression = 0 try: if hasattr(img, "media_type"): img.media_type = "IMAGE" except Exception: pass except Exception as fmt_err: use_direct_save = True _SEQ_DIRECT_SAVE = True if not _SEQ_DIRECT_SAVE_LOGGED: print( f"[MGPU-CHILD] {TAG} WARN: temp frame format {SEQ_FMT} unavailable on scene render settings; " f"using Render Result direct-save fallback ({fmt_err})" ) sys.stdout.flush() _SEQ_DIRECT_SAVE_LOGGED = True main_scene.render.filepath = os.path.splitext(out_path)[0] try: main_scene.render.use_file_extension = True except Exception: pass if use_direct_save: _run_with_scene_override( main_scene, lambda: bpy.ops.render.render(write_still=False, animation=False, use_viewport=False), ) _save_render_result_to_file(out_path, file_format=SEQ_FMT) return {'FINISHED'} return _run_with_scene_override( main_scene, lambda: bpy.ops.render.render(write_still=True, animation=False, use_viewport=False), ) finally: main_scene.render.filepath = prev_fp try: main_scene.render.use_file_extension = prev_use_ext except Exception: pass try: img.file_format = prev_fmt except Exception: pass try: img.color_mode = prev_mode except Exception: pass try: img.color_depth = prev_depth except Exception: pass try: img.compression = prev_comp except Exception: pass try: if hasattr(img, "media_type") and prev_media is not None: img.media_type = prev_media except Exception: pass def render_frame(n): scn = bpy.context.scene expected_size = _scene_expected_image_size(scn) if scn.render.engine == "CYCLES": _enforce_cycles_scene_device(scn) if not _CYCLES_READY: return False, "Cycles device setup failed (no eligible GPU/CPU device configured for this worker).", _render_meta() ok_policy, rep = _ensure_cycles_policy(require_gpu=(not bool(USECPU)), phase=f"pre-frame-{n}") if not ok_policy: return False, 
f"GPU_POLICY_VIOLATION pre-frame-{n}: {rep}", _render_meta() _force_scene_refresh(scn, n) if SEQ_DIR: out_path = os.path.join(SEQ_DIR, f"frame_{n:04d}{SEQ_EXT}") if (not PRECHECKED_EXISTING) and (not getattr(scn.render, "use_overwrite", True)): try: if os.path.exists(out_path) and _is_valid_render_output(out_path, expected_size=expected_size): start = time.time() print(f"[MGPU-CHILD] {TAG} start frame {n} -> {out_path}"); sys.stdout.flush() elapsed = time.time() - start print(f"[MGPU-CHILD] {TAG} finished frame {n} ({elapsed:.2f}s) -> {out_path}"); sys.stdout.flush() return True, "Skipped existing frame (overwrite disabled)", _render_meta(rendered=False, skipped=True, elapsed=elapsed) except Exception: pass try: os.makedirs(os.path.dirname(out_path), exist_ok=True) except Exception: pass if getattr(scn.render, "use_placeholder", False): try: if not os.path.exists(out_path): with open(out_path, "wb"): pass except Exception: pass try: start = time.time() print(f"[MGPU-CHILD] {TAG} start frame {n} -> {out_path}"); sys.stdout.flush() _render_frame_via_sandbox(scn, out_path, n) ok = os.path.exists(out_path) and os.path.getsize(out_path) > 0 elapsed = time.time() - start if ok and scn.render.engine == "CYCLES": ok_policy, rep = _ensure_cycles_policy(require_gpu=(not bool(USECPU)), phase=f"post-frame-{n}") if not ok_policy: return False, f"GPU_POLICY_VIOLATION post-frame-{n}: {rep}", _render_meta(rendered=True, elapsed=elapsed) if ok: print(f"[MGPU-CHILD] {TAG} finished frame {n} ({elapsed:.2f}s) -> {out_path}"); sys.stdout.flush() return True, "", _render_meta(rendered=True, elapsed=elapsed) else: print(f"[MGPU-CHILD] {TAG} MISSING frame {n} ({elapsed:.2f}s) -> {out_path}"); sys.stdout.flush() return False, f"Rendered file missing or empty: {out_path}", _render_meta(elapsed=elapsed) except Exception as e: print(f"[MGPU-CHILD] {TAG} ERROR frame {n}: {e}"); sys.stdout.flush() return False, str(e), _render_meta() prev_fp = scn.render.filepath prev_use_ext = 
getattr(scn.render, "use_file_extension", True) out_path = _safe_out_path(scn, n) if (not PRECHECKED_EXISTING) and (not getattr(scn.render, "use_overwrite", True)): try: if os.path.exists(out_path) and _is_valid_render_output(out_path, expected_size=expected_size): start = time.time() print(f"[MGPU-CHILD] {TAG} start frame {n} -> {out_path}"); sys.stdout.flush() elapsed = time.time() - start print(f"[MGPU-CHILD] {TAG} finished frame {n} ({elapsed:.2f}s) -> {out_path}"); sys.stdout.flush() return True, "Skipped existing frame (overwrite disabled)", _render_meta(rendered=False, skipped=True, elapsed=elapsed) except Exception: pass try: os.makedirs(os.path.dirname(out_path), exist_ok=True) except Exception: pass if getattr(scn.render, "use_placeholder", False): try: if not os.path.exists(out_path): with open(out_path, "wb"): pass except Exception: pass try: scn.render.filepath = out_path try: scn.render.use_file_extension = False # out_path already has extension except Exception: pass start = time.time() print(f"[MGPU-CHILD] {TAG} start frame {n} -> {out_path}"); sys.stdout.flush() _run_with_scene_override( scn, lambda: bpy.ops.render.render(animation=False, write_still=True, use_viewport=False), ) ok = os.path.exists(out_path) and os.path.getsize(out_path) > 0 elapsed = time.time() - start if ok and scn.render.engine == "CYCLES": ok_policy, rep = _ensure_cycles_policy(require_gpu=(not bool(USECPU)), phase=f"post-frame-{n}") if not ok_policy: return False, f"GPU_POLICY_VIOLATION post-frame-{n}: {rep}", _render_meta(rendered=True, elapsed=elapsed) if ok: print(f"[MGPU-CHILD] {TAG} finished frame {n} ({elapsed:.2f}s) -> {out_path}"); sys.stdout.flush() return True, "", _render_meta(rendered=True, elapsed=elapsed) else: print(f"[MGPU-CHILD] {TAG} MISSING frame {n} ({elapsed:.2f}s) -> {out_path}"); sys.stdout.flush() return False, f"Rendered file missing or empty: {out_path}", _render_meta(elapsed=elapsed) except Exception as e: print(f"[MGPU-CHILD] {TAG} ERROR frame {n}: 
def _ensure_mari_enabled():
    '''Return True when the MARI add-on can be assumed enabled in this process.

    A truthy MARI_PATH short-circuits to True. Otherwise the first add-on
    whose bl_info name contains "mari" (case-insensitive) is enabled through
    addon_utils. Returns False when none is found or anything raises.
    '''
    if MARI_PATH:
        return True
    try:
        import addon_utils
        for module in addon_utils.modules():
            info = getattr(module, "bl_info", {}) or {}
            if "mari" not in (info.get("name") or "").lower():
                continue
            addon_utils.enable(module.__name__, default_set=True, persistent=True)
            return True
    except Exception:
        return False
    return False


def _mari_prop_mode_id(value):
    '''Map a mode value to the identifier stored on mari_props.frame.

    "FRAME" (any case) maps to "FRAME"; everything else maps to "CRICLE".
    NOTE(review): the "CRICLE" spelling is reproduced as-is; presumably it
    matches the MARI add-on's enum identifier -- confirm before changing.
    '''
    return "FRAME" if str(value or "").upper() == "FRAME" else "CRICLE"


def _apply_mari_scene_settings(scn, glb):
    '''Copy render resolution and MARI settings from the job globals to scn.

    glb may carry render_resolution_x / _y / _percentage and a
    "mari_settings" dict. Vector settings are assigned as tuples, falling
    back to element-wise writes; scalar settings are assigned directly.
    Individual failures are ignored.
    '''
    props = getattr(scn, "mari_props", None)
    settings = dict(glb.get("mari_settings") or {})

    resolution_keys = (
        ("render_resolution_x", "resolution_x"),
        ("render_resolution_y", "resolution_y"),
        ("render_resolution_percentage", "resolution_percentage"),
    )
    try:
        # One shared try: the first failure skips the remaining resolution keys.
        for key, attr in resolution_keys:
            if key in glb:
                setattr(scn.render, attr, int(glb.get(key)))
    except Exception:
        pass

    if not props:
        return

    for name in ("frame_ratio", "frame_dimensions", "frame_center", "frame_rotation"):
        if name not in settings:
            continue
        value = settings.get(name)
        try:
            setattr(props, name, tuple(value))
        except Exception:
            # Whole-vector assignment refused; write component by component.
            try:
                components = tuple(value)
                target = getattr(props, name)
                for idx, component in zip(range(len(target)), components):
                    target[idx] = component
            except Exception:
                pass

    for name in ("render_settings_filepath", "render_settings_name", "render_settings_normalize"):
        if name not in settings:
            continue
        try:
            setattr(props, name, settings.get(name))
        except Exception:
            pass
add-on not enabled in child", _render_meta() if not (hasattr(bpy.ops, "mari") and hasattr(bpy.ops.mari, "render_one")): return False, "bpy.ops.mari.render_one unavailable", _render_meta() scn = bpy.context.scene if scn.render.engine == "CYCLES": _enforce_cycles_scene_device(scn) if not _CYCLES_READY: return False, "Cycles device setup failed (no eligible GPU/CPU device configured for this worker).", _render_meta() ok_policy, rep = _ensure_cycles_policy(require_gpu=(not bool(USECPU)), phase=f"pre-mari-{job.get('cam_name','?')}") if not ok_policy: return False, f"GPU_POLICY_VIOLATION pre-mari: {rep}", _render_meta() prop = getattr(scn, "mari_props", None) try: scn.render.use_overwrite = bool(glb.get("use_overwrite", scn.render.use_overwrite)) if hasattr(scn.render, "use_placeholder"): scn.render.use_placeholder = bool(glb.get("use_placeholder", scn.render.use_placeholder)) except Exception: pass try: _apply_mari_scene_settings(scn, glb) except Exception: pass cam_name = job.get("cam_name") cam_obj = bpy.data.objects.get(cam_name) if cam_name else None if cam_obj: scn.camera = cam_obj else: return False, f"Camera '{cam_name}' not found", _render_meta() try: bpy.context.view_layer.update() except Exception: pass mode_target = _mari_prop_mode_id(glb.get("mode")) if prop and _mari_prop_mode_id(getattr(prop, "frame", None)) != mode_target: try: prop.frame = mode_target print(f"[MGPU-CHILD] Adjusted MARI mode to {prop.frame}") except Exception: pass try: obj = bpy.context.object if obj and obj.mode != 'OBJECT': bpy.ops.object.mode_set(mode='OBJECT', toggle=False) except Exception: pass action = glb.get("action", "STILL") try: frame = int(job.get("frame", -1)) except Exception: frame = -1 try: _force_scene_refresh(scn, (frame if frame >= 0 else scn.frame_current)) except Exception: pass H = job.get("H") V = job.get("V") if prop: # Ensure per-job paths exist and set filepaths to avoid any spillover across cameras. 
_prime_mari_output_for_frame(scn, H, V, action) try: scn.render.use_file_extension = True except Exception: pass tag = TAG start_msg = f"[MGPU-CHILD] {tag} start MARI {action} H{H} V{V}" if frame >= 0: start_msg += f" f{frame}" print(start_msg + f" -> {cam_name}") st = time.time() try: scn[_MARI_RENDER_ONE_STATUS_KEY] = "" except Exception: pass try: res = _run_with_scene_override( scn, lambda: bpy.ops.mari.render_one(camera_name=cam_name, action=action, frame=frame), ) except Exception as call_err: return False, str(call_err), _render_meta() ok = (res == {'FINISHED'}) if ok and scn.render.engine == "CYCLES": ok_policy, rep = _ensure_cycles_policy( require_gpu=(not bool(USECPU)), phase=f"post-mari-{cam_name}-f{frame}" ) if not ok_policy: return False, f"GPU_POLICY_VIOLATION post-mari: {rep}", _render_meta(rendered=True, elapsed=(time.time() - st)) if ok and action == "STILL": _fix_mari_still_output_name(scn, H, V) elapsed = time.time() - st status = _mari_read_render_status(scn) fin_msg = f"[MGPU-CHILD] {tag} finished MARI {action} H{H} V{V}" if frame >= 0: fin_msg += f" f{frame}" if ok: print(fin_msg + f" ({elapsed:.2f}s) -> {cam_name}") else: print(fin_msg.replace("finished", "MISSING") + f" ({elapsed:.2f}s) -> {cam_name}") sys.stdout.flush() _proj_print(H, V, elapsed, glb) meta = _render_meta(rendered=status.get("rendered"), skipped=status.get("skipped"), elapsed=elapsed) return ok, "" if ok else "mari.render_one returned CANCELLED", meta except Exception as e: traceback.print_exc() return False, str(e), _render_meta() # connect to parent scheduler import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) _CONNECT_RETRIES = 40 for attempt in range(_CONNECT_RETRIES): try: sock.connect((HOST, PORT)) break except Exception as e: if attempt + 1 == _CONNECT_RETRIES: raise print(f"[MGPU-CHILD] {TAG} waiting for scheduler ({attempt+1}/{_CONNECT_RETRIES}): {e}") time.sleep(0.25) jsend(sock, {"hello": TAG, "token": TOKEN}) while True: jsend(sock, {"get": 
def _write_child_script(dirpath):
    """Write the embedded worker script to <dirpath>/mgpu_worker.py and return its path."""
    path = os.path.join(dirpath, "mgpu_worker.py")
    with open(path, "w", encoding="utf-8") as f:
        f.write(_CHILD_SCRIPT_SRC)
    return path


def _mgpu_scene_expected_image_size(scene):
    """Return the (width, height) in pixels a render of ``scene`` should have.

    Uses integer truncation of ``resolution * percentage / 100``, which is how
    Blender derives the final render size; rounding instead gives a size one
    pixel too large for odd products (e.g. 1923 px at 50%), so an existing
    valid frame would never match.

    Returns None when the values are unavailable or not positive.
    """
    try:
        render = scene.render
        pct = int(getattr(render, "resolution_percentage", 100) or 100)
        w = (int(render.resolution_x) * pct) // 100
        h = (int(render.resolution_y) * pct) // 100
        if w > 0 and h > 0:
            return (w, h)
    except Exception:
        pass
    return None


def _mgpu_existing_file_nonempty(path):
    """Return True when ``path`` is an existing regular file larger than zero bytes."""
    try:
        return os.path.isfile(path) and os.path.getsize(path) > 0
    except Exception:
        return False
# FFmpeg container identifier -> file extension used for MARI video output.
_MGPU_FFMPEG_CONTAINER_EXT = {
    "MPEG1": "mpeg1",
    "MPEG2": "mpeg2",
    "MPEG4": "mp4",
    "AVI": "avi",
    "QUICKTIME": "mov",
    "DV": "dv",
    "OGG": "ogg",
    "MKV": "mkv",
    "FLASH": "flv",
    "WEBM": "webm",
}

# Lower-cased image file_format identifier -> file extension for MARI output.
_MGPU_IMAGE_FORMAT_EXT = {
    "jpeg": "jpeg",
    "jpeg_2000": "jpeg",
    "iris": "rgb",
    "targa": "tga",
    "targa_raw": "tga",
    "cineon": "cin",
    "open_exr": "exr",
    "open_exr_multilayer": "exr",
    "tiff": "tif",
    "avi_jpeg": "avi",
    "avi_raw": "avi",
    "png": "png",
    "bmp": "bmp",
}


def _mgpu_scan_dir_files(dir_path):
    """Index the regular files directly inside ``dir_path``.

    Returns ``{lower-cased file name: {"path": full path, "size": bytes}}``;
    size is -1 when it cannot be read. A missing directory or any scan error
    yields an empty dict.
    """
    index = {}
    try:
        if not os.path.isdir(dir_path):
            return index
        with os.scandir(dir_path) as it:
            for entry in it:
                try:
                    if not entry.is_file():
                        continue
                except Exception:
                    continue
                try:
                    size = int(entry.stat().st_size)
                except Exception:
                    size = -1
                index[entry.name.lower()] = {"path": entry.path, "size": size}
    except Exception:
        return {}
    return index


def _mgpu_scene_frame_output_path(scene, frame):
    """Return the absolute output path Blender reports for ``frame``.

    Falls back to the absolute render filepath, then to "" on failure.
    """
    try:
        return bpy.path.abspath(scene.render.frame_path(frame=int(frame)))
    except Exception:
        try:
            return bpy.path.abspath(scene.render.filepath)
        except Exception:
            return ""


def _mgpu_video_seq_frame_path(seq_dir, frame, ext=".png"):
    """Return ``<seq_dir>/frame_<NNNN><ext>`` for a temp sequence frame.

    ``ext`` defaults to ".png" and gets a leading dot if missing. ``frame`` is
    zero-padded to four digits when it converts to int; otherwise its string
    form is used verbatim instead of raising on the numeric format spec.
    """
    ext = str(ext or ".png").strip() or ".png"
    if not ext.startswith("."):
        ext = "." + ext
    try:
        frame_label = f"{int(frame):04d}"
    except Exception:
        frame_label = str(frame)
    return os.path.join(seq_dir, f"frame_{frame_label}{ext}")


def _mgpu_mari_ext_from_scene(scene):
    """Return the file extension (no dot) for the scene's output format.

    FFmpeg output maps the container through a fixed table (default "mkv");
    image formats map through a second table and fall back to the lower-cased
    format identifier itself.
    """
    try:
        ff = str(scene.render.image_settings.file_format or "").lower()
    except Exception:
        ff = ""
    if ff == "ffmpeg":
        try:
            fmt = str(scene.render.ffmpeg.format or "")
        except Exception:
            fmt = ""
        return _MGPU_FFMPEG_CONTAINER_EXT.get(fmt, "mkv")
    return _MGPU_IMAGE_FORMAT_EXT.get(ff, ff)


def _mgpu_mari_output_root(scene):
    """Return ``(root_dir, name)`` for the scene's MARI output.

    ``root_dir`` is the absolute ``render_settings_filepath`` joined with the
    stripped ``render_settings_name``. Returns ``("", "")`` when the MARI
    properties are missing, either value is empty, or anything raises.
    """
    try:
        prop = getattr(scene, "mari_props", None)
        if not prop:
            return "", ""
        base = bpy.path.abspath(getattr(prop, "render_settings_filepath", ""))
        name = str(getattr(prop, "render_settings_name", "") or "").strip()
        if not (base and name):
            return "", ""
        return os.path.join(base, name), name
    except Exception:
        return "", ""
def _mgpu_build_video_from_sequence(scene, seq_dir, frames, final_path):
    """Encode an image sequence into a video by rendering it through the VSE.

    A copy of *scene* (renamed ``MGPU_TEMP_VSE``) is used as the work scene
    when ``scene.copy()`` succeeds; otherwise *scene* itself is used.  All
    strips already present in the work scene's sequence editor are removed,
    one image strip containing every entry of *frames* is added, and an
    animation render is run with the sequencer enabled and compositing
    disabled.  Render settings touched on the work scene are restored
    afterwards, and the temporary strip and scene copy are removed.

    Args:
        scene: Scene whose render/output settings drive the encode.
        seq_dir: Directory containing the frame images.
        frames: File names (relative to *seq_dir*); they are sorted before use.
        final_path: Target video path; its extension is stripped before it is
            assigned to ``render.filepath``.

    Raises:
        RuntimeError: If *frames* is empty, the first frame file is missing,
            or the sequence editor exposes no strip collection.
    """
    if not frames:
        raise RuntimeError("No frames provided to build video.")

    seq_dir = os.path.normpath(seq_dir)
    frames = sorted(frames)
    first_name = frames[0]
    first_path = os.path.join(seq_dir, first_name)
    if not os.path.isfile(first_path):
        raise RuntimeError(f"First frame not found: {first_path!r}")

    work_scene = scene
    created_scene = False
    try:
        work_scene = scene.copy()
        work_scene.name = "MGPU_TEMP_VSE"
        created_scene = True
    except Exception:
        work_scene = scene

    # Everything after the copy runs under this try/finally so the temporary
    # scene is removed even when the sequence-editor setup itself fails.
    try:
        old_se = work_scene.sequence_editor
        old_frame_start = work_scene.frame_start
        old_frame_end = work_scene.frame_end
        old_filepath = work_scene.render.filepath
        old_use_seq = work_scene.render.use_sequencer
        old_use_cmp = work_scene.render.use_compositing

        se = old_se if old_se is not None else work_scene.sequence_editor_create()
        strip_coll = _mgpu_get_vse_strip_collection(se)
        if strip_coll is None:
            raise RuntimeError("SequenceEditor has no strips/sequences collection.")

        # NOTE(review): when the scene copy failed this clears the strips of
        # the caller's own scene - confirm that is acceptable.
        try:
            for s in list(strip_coll):
                strip_coll.remove(s)
        except Exception:
            pass

        strip = None
        frame_count = len(frames)
        try:
            try:
                strip = strip_coll.new_image(
                    name="MGPU_TEMP_SEQ",
                    filepath=first_path,
                    channel=1,
                    frame_start=old_frame_start,
                )
            except TypeError:
                # Retry with positional arguments when the keyword form is rejected.
                strip = strip_coll.new_image("MGPU_TEMP_SEQ", first_path, 1, old_frame_start)

            directory = seq_dir + os.sep if not seq_dir.endswith(os.sep) else seq_dir
            strip.directory = directory
            if strip.elements:
                strip.elements[0].filename = first_name
            else:
                strip.elements.append(first_name)
            for name in frames[1:]:
                strip.elements.append(name)

            strip.frame_start = old_frame_start
            strip.frame_final_duration = frame_count

            work_scene.frame_start = old_frame_start
            work_scene.frame_end = old_frame_start + frame_count - 1
            work_scene.render.use_sequencer = True
            work_scene.render.use_compositing = False
            work_scene.render.filepath = os.path.splitext(final_path)[0]

            try:
                with bpy.context.temp_override(scene=work_scene, view_layer=work_scene.view_layers[0]):
                    bpy.ops.render.render(animation=True)
            except Exception:
                # Fallback: make the work scene the window's active scene
                # for the duration of the render, then switch back.
                win = bpy.context.window
                prev_scene = win.scene if win else None
                try:
                    if win:
                        win.scene = work_scene
                    bpy.ops.render.render(animation=True)
                finally:
                    if win and prev_scene:
                        win.scene = prev_scene
        finally:
            try:
                work_scene.render.filepath = old_filepath
                work_scene.frame_start = old_frame_start
                work_scene.frame_end = old_frame_end
                work_scene.render.use_sequencer = old_use_seq
                work_scene.render.use_compositing = old_use_cmp
            except Exception:
                pass
            try:
                if strip is not None and strip_coll is not None and hasattr(strip_coll, "remove"):
                    strip_coll.remove(strip)
            except Exception:
                pass
    finally:
        if created_scene:
            try:
                bpy.data.scenes.remove(work_scene)
            except Exception:
                pass
re.compile(r"^\[MGPU-CHILD\]\s+(.+?)\s+MISSING\s+MARI\s+(\S+)\s+H(-?\d+)\s+V(-?\d+)(?:\s+f(-?\d+))?\s+\(([\d.]+)s\)\s+->\s+(.+)$") def _parse_progress_fields(line: str): s_cur = s_tot = t_cur = t_tot = None m = _SAMPLE_RE.search(line) if m: try: s_cur, s_tot = int(m.group(1)), int(m.group(2)) except Exception: pass m2 = _TILE_RE.search(line) or _TILE2_RE.search(line) if m2: try: t_cur, t_tot = int(m2.group(1)), int(m2.group(2)) except Exception: pass return s_cur, s_tot, t_cur, t_tot def _progress_percent(s_cur, s_tot, t_cur, t_tot): if s_cur is not None and s_tot and s_tot > 0: return max(0.0, min(100.0, (s_cur / s_tot) * 100.0)) if t_cur is not None and t_tot and t_tot > 0: return max(0.0, min(100.0, (t_cur / t_tot) * 100.0)) return None def _progress_bar(pct, width=20): if pct is None: return "-" * width filled = max(0, min(width, int(round((pct / 100.0) * width)))) return "#" * filled + "-" * (width - filled) # ----------------------- Windows Job Object (kill children on Blender exit) ----------------------- _WS_JOB = None def _win_job_init(): if not IS_WIN: return None try: import ctypes from ctypes import wintypes as wt kernel32 = ctypes.windll.kernel32 CreateJobObjectW = kernel32.CreateJobObjectW SetInformationJobObject = kernel32.SetInformationJobObject class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure): _fields_ = [ ("PerProcessUserTimeLimit", ctypes.c_longlong), ("PerJobUserTimeLimit", ctypes.c_longlong), ("LimitFlags", wt.DWORD), ("MinimumWorkingSetSize", ctypes.c_size_t), ("MaximumWorkingSetSize", ctypes.c_size_t), ("ActiveProcessLimit", wt.DWORD), ("Affinity", wt.LPVOID), ("PriorityClass", wt.DWORD), ("SchedulingClass", wt.DWORD), ] class IO_COUNTERS(ctypes.Structure): _fields_ = [ ("ReadOperationCount", ctypes.c_ulonglong), ("WriteOperationCount", ctypes.c_ulonglong), ("OtherOperationCount", ctypes.c_ulonglong), ("ReadTransferCount", ctypes.c_ulonglong), ("WriteTransferCount", ctypes.c_ulonglong), ("OtherTransferCount", ctypes.c_ulonglong), ] 
class Worker:
    """State record for one headless render worker process.

    Holds the device identity the worker is planned for, the process and log
    handles, progress-throttling state, launch diagnostics and render-guard
    counters.  The constructor only initialises fields; it starts nothing.
    """

    def __init__(self, tag_label, gpu_uuid, phys_index, instance_index, total_instances,
                 is_cpu=False, gpu_bus="", gpu_name=""):
        """Initialise all worker fields.

        Args:
            tag_label: Device label used as the prefix of ``tag``.
            gpu_uuid: 'GPU-xxxx...' or None for CPU.
            phys_index: Physical index for display (int or None).
            instance_index: Index of this instance; part of ``tag``.
            total_instances: Number of instances planned for the same device.
            is_cpu: Whether this is a CPU worker.
            gpu_bus: Bus id text; falsy values become ``""``.
            gpu_name: Device name text; falsy values become ``""``.
        """
        self.is_cpu = bool(is_cpu)
        self.gpu_uuid = gpu_uuid  # 'GPU-xxxx...' or None for CPU
        self.phys_index = phys_index  # physical index for display (int or None)
        self.gpu_bus = str(gpu_bus or "")
        self.gpu_name = str(gpu_name or "")
        self.instance_index = instance_index
        self.total_instances = total_instances
        self.proc = None
        self.stdout_thread = None
        self.last_line = ""
        self.log_path = None
        self._log_fp = None
        self.tag = f"{tag_label}-#{instance_index}"
        # File-system-safe variant of the tag, used as the log file stem.
        safe = re.sub(r"[^A-Za-z0-9._-]+", "_", self.tag)
        if not safe:
            safe = f"worker_{instance_index}"
        self._file_tag = safe
        self._live_line_active = False
        self._last_samples = (None, None)  # remember for the final 100% line
        self.local_frames = []
        # terminal tail process (PowerShell/xterm)
        self.term_proc = None
        # progress throttling state
        self._last_emit_time = 0.0
        self._last_pct = -1.0
        self._last_s_pair = (None, None)
        self._last_t_pair = (None, None)
        # per-frame state (for richer messages)
        self.cur_frame = None
        self.cur_path = None
        self.frame_start_time = 0.0
        # launch diagnostics
        self.launch_state = "PLANNED"
        self.launch_reason = "planned"
        self.launch_detail = ""
        self.launch_attempted = False
        self.launch_ok = False
        self.launch_ts = 0.0
        self.launch_pid = None
        self.hello_received = False
        self.hello_ts = 0.0
        self.hello_timeout_reported = False
        self.exit_before_hello_reported = False
        # render-guard bookkeeping
        self.guard_last_progress_ts = 0.0
        self.guard_last_progress_sig = None
        self.guard_epoch = 0
        self.guard_restart_ts = deque(maxlen=16)
        self.guard_restart_marks = []
        self.guard_restarts_total = 0
        # Cycles backend policy state
        self.cycles_backend_override = None
        self.cycles_policy_failures = 0
        self.cycles_cpu_hint_ts = 0.0
        self.cycles_cpu_hint_line = ""
        self._banner_lines_since_repeat = 0

    @property
    def running(self):
        """True while a process is attached and ``poll()`` reports no exit code."""
        return (self.proc is not None) and (self.proc.poll() is None)

    def open_log(self, path):
        """Open this worker's log file in the directory of *path*.

        Only the directory part of *path* is used; the file is always named
        ``<file tag>.log``.  A log that is already open is closed first so
        repeated calls do not leak file handles.
        """
        self.close_log()
        dir_path = os.path.dirname(path)
        if dir_path:
            # A bare file name has no directory part; makedirs("") would raise.
            os.makedirs(dir_path, exist_ok=True)
        self.log_path = os.path.join(dir_path, f"{self._file_tag}.log")
        # UTF-8 with BOM so PS5/PS7 detect UTF-8 and render block glyphs correctly
        self._log_fp = open(self.log_path, "w", encoding="utf-8-sig", newline="")

    def alive(self) -> bool:
        """Like ``running``, but returns False instead of raising if ``poll()`` fails."""
        p = getattr(self, "proc", None)
        try:
            return (p is not None) and (p.poll() is None)
        except Exception:
            return False

    def close_log(self):
        """Close the log file if one is open; safe to call repeatedly."""
        # Drop the reference first so a closed file object is never left
        # behind for later truthiness checks or writes.
        fp, self._log_fp = self._log_fp, None
        try:
            if fp:
                fp.close()
        except Exception:
            pass
self.rt_periodic_recycle_points = sorted(set(cleaned_points)) self.rt_periodic_recycle_seen = set() self.rt_periodic_recycle_pending = {} self.video_mode = False self.video_seq_dir = None self.video_seq_format = "PNG" self.video_seq_ext = ".png" self.video_output_path = None self._forced_temp_dir = None self._preflight_existing_check_done = False self._skip_video_encode = False try: if self.job_mode == "FRAMES": img = scene.render.image_settings fmt = str(getattr(img, "file_format", "") or "").upper() media = str(getattr(img, "media_type", "") or "").upper() if fmt in VIDEO_FORMATS or media == "VIDEO": self.video_mode = True self.video_output_path = bpy.path.abspath(scene.render.filepath) except Exception: pass if self.job_mode == "FRAMES": fstart, fend, fstep = scene.frame_start, scene.frame_end, max(1, scene.frame_step) self.frames = list(range(fstart, fend + 1, fstep)) self.total_frames = len(self.frames) self.pending = list(self.frames) else: # MARI jobs can be expanded later (per-frame for ANIM) before start() self.frames = [] self.total_frames = len(self.mari_jobs) self.pending = list(self.mari_jobs) self.finished = [] self.retries = {} self.finished_set = set() self.inflight = {} self.worker_stats = {} self.total_render_time = 0.0 self.total_render_count = 0 self.rt_last_real_completion_ts = 0.0 self.temp_dir = None self.temp_blend = None self.logs_dir = None self.cancelled = False self._lock = threading.Lock() self._server_sock = None self._server_thread = None self._clients = {} self._token = ''.join(random.choice(string.ascii_letters+string.digits) for _ in range(24)) self._child_script = None self._hello_timeout_s = 20.0 self._launch_events = [] self._ram_cap_estimate = None self._ram_cap_note = "" self._enabled_addon_modules_csv = "" self._diag_log_path = None self._diag_log_fp = None self._diag_term_proc = None self._diag_term_opened = False self._diag_buffer = [] self._selection_warning = False self._legacy_detect = _detect_gpu_devices_legacy(False) 
self._strict_detect = _detect_gpu_devices_strict(True) sel = _detect_gpu_devices_final_from_lists( self.ghost_mode, self._legacy_detect, self._strict_detect ) mapped_all = _map_selection_to_uuids(sel) mapped, dropped_unknown = _filter_known_mapped_gpus(mapped_all) if dropped_unknown: msg = ( f"[MGPU-GPUSEL] INFO: Hidden {len(dropped_unknown)} unresolved GPU entry(s) " f"(index='?'). They will not be launched." ) _log(msg) self._diag_write(msg) for d in dropped_unknown: self._diag_write( f"[MGPU-GPUSEL] dropped idx={d.get('index')} bus={d.get('bus') or '-'} " f"uuid={d.get('uuid') or 'NONE'} name={d.get('name')}" ) self.workers = [] if mapped: for m in mapped: tag_label = f"GPU{m['phys_index'] if m['phys_index'] is not None else '??'}" for i in range(1, self.instances_per_gpu + 1): self.workers.append( Worker( tag_label, m["uuid"], m["phys_index"], i, self.instances_per_gpu, is_cpu=False, gpu_bus=(m.get("bus") or ""), gpu_name=(m.get("name") or "") ) ) if self.scene.render.engine == "CYCLES": if self.cpu_selected: self.workers.append(Worker("CPU", None, None, 1, 1, is_cpu=True)) _log("[MGPU-LAUNCH] Cycles CPU device is enabled; adding one dedicated CPU worker.") elif not mapped: _log("[MGPU-LAUNCH] No mapped GPU workers and Cycles CPU is disabled; CPU fallback is disabled.") else: if not mapped: self.workers.append(Worker("EEVEE", None, 0, 1, 1, is_cpu=False)) _log("No explicit GPU list for Eevee - running one worker.") if self.scene.render.engine == "CYCLES" and not self.workers: raise RuntimeError( "No Cycles workers planned. Enable at least one GPU device, or enable CPU in Cycles render devices." 
) if self.scene.render.engine == "CYCLES": _log( f"[MGPU-LAUNCH] Cycles device policy: requested_backend={self.device_mode} " f"fallback_backend={self.fallback_device_mode or 'none'} " f"cpu_selected={'YES' if self.cpu_selected else 'NO'}" ) _log(f"[MGPU-LAUNCH] Worker plan: mapped_gpus={len(mapped)} instances_per_gpu={self.instances_per_gpu} planned_workers={len(self.workers)}") for w in self.workers: dev_txt = "CPU" if w.is_cpu else f"GPU idx={w.phys_index if w.phys_index is not None else '?'} uuid={(w.gpu_uuid or 'none')[:24]}" self._record_launch_event(w, "PLANNED", "WORKER_PLANNED", dev_txt) self._log_gpu_selection_breakdown(sel, mapped) self._rebuild_dispatch_queues() if self.rt_guard_enabled: _log( f"[MGPU-GUARD] Render-time guard active: tier={self.render_guard_tier} " f"(soft={self.rt_guard_cfg.get('soft_mult')}x/{int(self.rt_guard_cfg.get('soft_min_s', 0))}s, " f"hard={self.rt_guard_cfg.get('hard_mult')}x/{int(self.rt_guard_cfg.get('hard_min_s', 0))}s, " f"warmup_worker={int(self.rt_guard_cfg.get('warmup_per_worker_jobs', 0) or 0)}, " f"warmup_global={int(self.rt_guard_cfg.get('warmup_completed_jobs', 0) or 0)})" ) if self.rt_periodic_recycle_enabled and self.rt_periodic_recycle_points: pts = ",".join(str(int(round(p * 100.0))) for p in self.rt_periodic_recycle_points) _log(f"[MGPU-GUARD] Periodic VRAM hygiene restarts enabled at progress points: {pts}%") else: _log("[MGPU-GUARD] Render-time guard is OFF.") def prepare_blend_copy(self): if self._forced_temp_dir: self.temp_dir = self._forced_temp_dir try: if os.path.isdir(self.temp_dir) and self.scene.render.use_overwrite: shutil.rmtree(self.temp_dir, ignore_errors=True) except Exception: pass os.makedirs(self.temp_dir, exist_ok=True) else: self.temp_dir = tempfile.mkdtemp(prefix="mgpu_frames_") self.logs_dir = os.path.join(self.temp_dir, "logs") os.makedirs(self.logs_dir, exist_ok=True) self._init_diag_log() if self.video_mode: self.video_seq_dir = os.path.join(self.temp_dir, "frames") 
os.makedirs(self.video_seq_dir, exist_ok=True) base = os.path.basename(bpy.data.filepath) or "untitled.blend" temp_path = os.path.join(self.temp_dir, base) bpy.ops.wm.save_as_mainfile(filepath=temp_path, copy=True) self.temp_blend = temp_path self.src_blend_dir = os.path.dirname(bpy.data.filepath) self._child_script = _write_child_script(self.temp_dir) self._enabled_addons_file = None try: enabled_addons = _mgpu_enabled_addons_snapshot() addons_file = os.path.join(self.temp_dir, "enabled_addons.json") with open(addons_file, "w", encoding="utf-8") as fp: json.dump({"addons": enabled_addons}, fp) self._enabled_addons_file = addons_file self._enabled_addon_modules_csv = ",".join(_mgpu_enabled_addon_module_names(enabled_addons)) _log(f"Captured {len(enabled_addons)} enabled add-ons for workers.") except Exception as e: _log(f"WARN: Failed to capture enabled add-ons: {e}") _log(f"Prepared temp blend: {self.temp_blend}") # Bundle the MARI add-on so child workers load the same version self._mari_dir = None if self.job_mode == "MARI": try: import importlib, inspect mari_mod = None # 1) Search installed add-ons; import each real module, then look for addon_prefix == "mari" for meta in addon_utils.modules(): name = getattr(meta, "__name__", None) if not name: continue try: mod = importlib.import_module(name) except Exception: continue # Primary signal: addon declares addon_prefix = "mari" if getattr(mod, "addon_prefix", None) == "mari": mari_mod = mod break # Fallback heuristic: any registered classes with bl_idname starting with "mari." 
try: if any( isinstance(obj, type) and getattr(obj, "bl_idname", "").startswith("mari.") for obj in mod.__dict__.values() ): mari_mod = mod break except Exception: pass if mari_mod: src = os.path.dirname(mari_mod.__file__) dst = os.path.join(self.temp_dir, "holo_mari_addon") if os.path.isdir(src): shutil.copytree(src, dst, ignore=shutil.ignore_patterns("__pycache__", "*.pyc")) else: os.makedirs(dst, exist_ok=True) shutil.copy2(mari_mod.__file__, os.path.join(dst, "__init__.py")) self._mari_dir = dst _log(f"Copied MARI addon to: {self._mari_dir}") else: _log("WARN: Could not import/locate the MARI add-on; child will not have bpy.ops.mari.*") except Exception as e: _log(f"WARN: Failed to copy MARI add-on: {e}") def _start_server(self): self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._server_sock.bind(("127.0.0.1", 0)) self._server_sock.listen(16) self._server_sock.settimeout(1.0) self._server_port = self._server_sock.getsockname()[1] self._server_thread = threading.Thread(target=self._accept_loop, daemon=True) self._server_thread.start() _log(f"Scheduler server on port {self._server_port}") def _accept_loop(self): while not self.cancelled: try: conn, _addr = self._server_sock.accept() except socket.timeout: continue except Exception: break threading.Thread(target=self._client_loop, args=(conn,), daemon=True).start() def _client_loop(self, conn): f = conn.makefile("rwb", buffering=0) tag = None def jrecv(): line = f.readline() if not line: raise ConnectionError("client closed") return json.loads(line.decode("utf-8", "ignore")) def jsend(obj): f.write((json.dumps(obj)+"\n").encode("utf-8", "ignore")); f.flush() try: hello = jrecv() tag = hello.get("hello"); token = hello.get("token") if token != self._token or not tag: try: jsend({"exit": True}) except Exception: pass f.close(); conn.close(); return with self._lock: self._clients[tag] = (conn, f, jsend, jrecv) self._mark_worker_connected(tag) while not self.cancelled: msg = jrecv() if 
msg.get("get"): unit = None send_exit = False g = None frame = None with self._lock: if self.job_mode == "MARI": unit = self._next_mari_for_tag(tag) else: unit = self._next_frame_for_tag(tag) if unit is None: send_exit = True else: self._record_inflight(tag, unit) if self.job_mode == "MARI": g = dict(self.mari_globals) g["proj_total"] = getattr(self, "total_frames", 0) g["proj_done"] = len(self.finished_set) else: frame = int(unit) if send_exit: try: jsend({"exit": True}) except Exception: pass break if self.job_mode == "MARI": jsend({"mari_job": unit, "globals": g}) else: jsend({"frame": frame}) continue if "done" in msg: with self._lock: self._handle_job_done(tag, msg) continue except Exception as e: _log(f"[client] connection end: {e}") finally: if tag: try: with self._lock: cur = self._clients.get(tag) if cur and cur[0] is conn: self._clients.pop(tag, None) except Exception: pass try: f.close() except Exception: pass try: conn.close() except Exception: pass def _worker_by_tag(self, tag): for w in self.workers: if w.tag == tag: return w return None def _cycles_backend_for_worker(self, w: Worker): if not w: return str(self.device_mode or "CUDA").upper() if w.is_cpu or self.scene.render.engine != "CYCLES": return str(self.device_mode or "CUDA").upper() override = str(getattr(w, "cycles_backend_override", "") or "").upper() if override: return override return str(self.device_mode or "CUDA").upper() def _cycles_fallback_for_worker(self, w: Worker, primary_backend: str = ""): if (not w) or w.is_cpu or self.scene.render.engine != "CYCLES": return "" primary = str(primary_backend or self._cycles_backend_for_worker(w) or "").upper() if primary == "OPTIX": return "CUDA" return "" def _handle_cycles_gpu_policy_failure(self, w: Worker, err_text: str, inflight=None): if (not w) or w.is_cpu or self.scene.render.engine != "CYCLES": return False txt = str(err_text or "") txt_up = txt.upper() trigger = ("GPU_POLICY_VIOLATION" in txt_up) or ("CYCLES DEVICE SETUP FAILED" in 
txt_up) if not trigger: return False w.cycles_policy_failures = int(getattr(w, "cycles_policy_failures", 0) or 0) + 1 current_backend = self._cycles_backend_for_worker(w) switched = False if current_backend == "OPTIX": w.cycles_backend_override = "CUDA" switched = True reason = f"cycles-gpu-policy-failure#{w.cycles_policy_failures}" if switched: reason += f" backend={current_backend}->CUDA" _log(f"[MGPU-LAUNCH] {w.tag}: GPU policy violation; switching backend {current_backend} -> CUDA and restarting worker.") else: reason += f" backend={self._cycles_backend_for_worker(w)}" _log(f"[MGPU-LAUNCH] {w.tag}: GPU policy violation persisted on backend={self._cycles_backend_for_worker(w)}; restarting worker.") self._diag_write(f"[MGPU-LAUNCH] {w.tag}: err='{txt[:220]}'") ok = self._restart_worker_same_gpu(w, reason, info=inflight) if ok: self._open_diag_terminal_if_needed() else: _log(f"[MGPU-LAUNCH] WARNING: restart failed after GPU policy violation for {w.tag}.") return ok def _init_diag_log(self): if not self.logs_dir: return try: path = os.path.join(self.logs_dir, "_launch_diagnostics.log") self._diag_log_path = path self._diag_log_fp = open(path, "w", encoding="utf-8-sig", newline="") self._diag_log_fp.write(BANNER_MANAGER_ASCII.rstrip("\n") + "\n") self._diag_log_fp.write("[MGPU-LAUNCH] Diagnostics log initialized.\n") for line in self._diag_buffer: self._diag_log_fp.write((line or "").rstrip("\n") + "\n") self._diag_buffer = [] # Flush queued events captured before logs_dir existed for evt in self._launch_events: msg = f"[MGPU-LAUNCH] {evt.get('tag','?')} {evt.get('state','')}:{evt.get('reason','')}" det = evt.get("detail") or "" if det: msg += f" | {det}" self._diag_log_fp.write(msg + "\n") self._diag_log_fp.flush() except Exception: self._diag_log_fp = None def _diag_write(self, text): try: line = (text or "").rstrip("\n") if self._diag_log_fp: self._diag_log_fp.write(line + "\n") self._diag_log_fp.flush() else: self._diag_buffer.append(line) if 
len(self._diag_buffer) > 300: self._diag_buffer = self._diag_buffer[-300:] except Exception: pass def _spawn_tail_terminal(self, log_path: str, enable_vt=False): if not log_path: return None if IS_WIN: path_ps = str(log_path).replace("'", "''") vt_block = "" if enable_vt: vt_block = ( "$c='using System; using System.Runtime.InteropServices; " "public static class VT{" "[DllImport(\"kernel32.dll\")] public static extern System.IntPtr GetStdHandle(int n); " "[DllImport(\"kernel32.dll\")] public static extern bool GetConsoleMode(System.IntPtr h, out int m); " "[DllImport(\"kernel32.dll\")] public static extern bool SetConsoleMode(System.IntPtr h, int m);" "}'; " "Add-Type -TypeDefinition $c -ErrorAction SilentlyContinue; " "$h=[VT]::GetStdHandle(-11); $m=0; [VT]::GetConsoleMode($h,[ref]$m)|Out-Null; " "[VT]::SetConsoleMode($h, ($m -bor 4)) | Out-Null; " ) cmd = ( "$ErrorActionPreference='SilentlyContinue'; " + vt_block + "try{[Console]::OutputEncoding=[Text.UTF8Encoding]::new($true)}catch{}; " "chcp 65001 | Out-Null; " "try{$raw=$Host.UI.RawUI; $raw.BackgroundColor='Black'; $raw.ForegroundColor='Red'; Clear-Host}catch{}; " f"$p='{path_ps}'; Get-Content -LiteralPath $p -Wait" ) try: proc = subprocess.Popen( ["powershell", "-NoLogo", "-NoProfile", "-Command", cmd], creationflags=subprocess.CREATE_NEW_CONSOLE ) _win_job_assign(proc) return proc except Exception: return None if IS_MAC: path_applescript = str(log_path).replace("\\", "\\\\").replace('"', '\\"') cmd_prefix = "printf '\\\\033[0;31;40m'; clear; tail -f " script = ( 'tell application "Terminal"\n' f' do script "{cmd_prefix}" & quoted form of POSIX path of "{path_applescript}"\n' 'end tell' ) try: return subprocess.Popen(["osascript", "-e", script]) except Exception: return None # Linux / BSD / other POSIX quoted = shlex.quote(str(log_path)) tail_cmd = f"printf '\\033[0;31;40m'; clear; tail -f {quoted}" candidates = [] if shutil.which("xterm"): candidates.append(["xterm", "-hold", "-bg", "black", "-fg", "red", 
"-e", "sh", "-lc", tail_cmd]) if shutil.which("x-terminal-emulator"): candidates.append(["x-terminal-emulator", "-e", "sh", "-lc", tail_cmd]) if shutil.which("gnome-terminal"): candidates.append(["gnome-terminal", "--", "sh", "-lc", tail_cmd]) if shutil.which("konsole"): candidates.append(["konsole", "--hold", "-e", "sh", "-lc", tail_cmd]) if shutil.which("xfce4-terminal"): candidates.append(["xfce4-terminal", "--hold", "--command", f"sh -lc {shlex.quote(tail_cmd)}"]) if shutil.which("mate-terminal"): candidates.append(["mate-terminal", "--", "sh", "-lc", tail_cmd]) if shutil.which("lxterminal"): candidates.append(["lxterminal", "-e", f"sh -lc {shlex.quote(tail_cmd)}"]) if shutil.which("kitty"): candidates.append(["kitty", "sh", "-lc", tail_cmd]) if shutil.which("alacritty"): candidates.append(["alacritty", "-e", "sh", "-lc", tail_cmd]) for argv in candidates: try: return subprocess.Popen(argv) except Exception: continue return None def _open_diag_terminal_if_needed(self): if self._diag_term_opened: return if (not self.open_terms) or (not self._diag_log_path): return self._diag_term_opened = True self._diag_term_proc = self._spawn_tail_terminal(self._diag_log_path, enable_vt=False) if self._diag_term_proc is None: self._diag_term_opened = False def _log_gpu_selection_breakdown(self, final_sel, mapped): mode = self.ghost_mode snap = _cycles_device_snapshot() backend = snap.get("backend") rows = list(snap.get("rows") or []) if rows: type_counts = Counter(str(r.get("type") or "?") for r in rows) cnt_text = ", ".join(f"{k}:{type_counts[k]}" for k in sorted(type_counts.keys())) sum_msg = f"[MGPU-GPUSEL] cycles backend={backend} rows={len(rows)} by_type=[{cnt_text}]" _log(sum_msg) self._diag_write(sum_msg) for i, r in enumerate(rows): self._diag_write( f"[MGPU-GPUSEL] cycles[{i}] type={r.get('type')} use={r.get('use')} " f"bus={r.get('bus') or '-'} id={r.get('id') or '-'} name={r.get('name')}" ) _log( f"[MGPU-GPUSEL] mode={mode} legacy={len(self._legacy_detect)} 
strict={len(self._strict_detect)} " f"final={len(final_sel)} mapped={len(mapped)}" ) for i, item in enumerate(final_sel): idx, name, backend, selected, bus = item msg = f"[MGPU-GPUSEL] final[{i}] idx={idx} sel={selected} backend={backend} bus={bus or '-'} name={name}" _log(msg) self._diag_write(msg) for i, m in enumerate(mapped): msg = ( f"[MGPU-GPUSEL] mapped[{i}] phys_index={m.get('phys_index')} idx={m.get('index')} " f"bus={m.get('bus') or '-'} uuid={(m.get('uuid') or 'NONE')} " f"name={m.get('name') or '-'}" f"{(' cycles_name=' + str(m.get('cycles_name'))) if m.get('cycles_name') else ''}" ) _log(msg) self._diag_write(msg) if len(final_sel) != len(mapped): msg = f"[MGPU-GPUSEL] WARNING: final({len(final_sel)}) != mapped({len(mapped)})" _log(msg) self._diag_write(msg) self._selection_warning = True if len(final_sel) < len(self._strict_detect): msg = ( f"[MGPU-GPUSEL] WARNING: strict selected GPUs ({len(self._strict_detect)}) reduced to final ({len(final_sel)}) " f"by ghost filter mode '{mode}'. STRICT safeguard should prevent launch drops." ) _log(msg) self._diag_write(msg) self._selection_warning = True missing_uuid = sum(1 for m in mapped if not m.get("uuid")) if missing_uuid > 0: msg = f"[MGPU-GPUSEL] WARNING: {missing_uuid} mapped GPU(s) have no UUID match; launch pinning may be unreliable." _log(msg) self._diag_write(msg) self._selection_warning = True phys = _win_query_nvidia_smi_detailed() or [] if phys: mapped_phys = {m.get("phys_index") for m in mapped if m.get("phys_index") is not None} backend_rows = [r for r in rows if str(r.get("type")) == str(backend)] if backend and len(backend_rows) < len(phys): msg = ( f"[MGPU-GPUSEL] WARNING: Cycles backend '{backend}' exposes {len(backend_rows)} " f"device row(s), but nvidia-smi sees {len(phys)} GPU(s)." 
) _log(msg) self._diag_write(msg) self._selection_warning = True if len(mapped_phys) < len(phys): phys_idx = {g.get("index") for g in phys if g.get("index") is not None} missing_idx = sorted([i for i in phys_idx if i not in mapped_phys]) msg = ( f"[MGPU-GPUSEL] WARNING: NVIDIA physical GPUs={len(phys)} but mapped GPUs={len(mapped_phys)}. " f"A GPU may be filtered out by backend/type mismatch or unresolved bus-id mapping." ) _log(msg) self._diag_write(msg) if missing_idx: miss_msg = f"[MGPU-GPUSEL] WARNING: unmapped NVIDIA index(es): {','.join(str(i) for i in missing_idx)}" _log(miss_msg) self._diag_write(miss_msg) self._selection_warning = True for g in phys: msg = ( f"[MGPU-GPUSEL] phys idx={g.get('index')} bus={g.get('bus')} " f"uuid={g.get('uuid')} name={g.get('name')}" ) self._diag_write(msg) def _record_launch_event(self, w: Worker, state: str, reason: str, detail: str = ""): w.launch_state = str(state or "") w.launch_reason = str(reason or "") w.launch_detail = str(detail or "") evt = {"tag": w.tag, "state": w.launch_state, "reason": w.launch_reason, "detail": w.launch_detail, "t": time.time()} self._launch_events.append(evt) msg = f"[MGPU-LAUNCH] {w.tag} {w.launch_state}: {w.launch_reason}" if w.launch_detail: msg += f" | {w.launch_detail}" _log(msg) self._diag_write(msg) try: self._emit(w, msg + "\n") except Exception: pass if w.launch_state in {"FAILED_TO_LAUNCH", "FAILED_RUNTIME", "STALLING"}: self._open_diag_terminal_if_needed() def _update_ram_capacity_note(self): rss = _proc_rss_bytes() avail = _sys_mem_available_bytes() if rss is None or avail is None: return per_child = max(int(rss * 0.8), 512 * 1024 * 1024) if per_child <= 0: return cap = max(1, int(avail // per_child)) self._ram_cap_estimate = cap planned = len(self.workers) if planned > cap: self._ram_cap_note = ( f"Planned workers={planned} exceeds rough RAM capacity={cap} " f"(RSS={_fmt_bytes(rss)}, free={_fmt_bytes(avail)}, per-worker~{_fmt_bytes(per_child)})." 
def _mark_worker_connected(self, tag):
    """Flag the worker registered under ``tag`` as connected on its first hello.

    Unknown tags and workers whose ``hello_received`` flag is already set are
    ignored, so the ``CONNECTED`` launch event is recorded only when the flag
    flips from false to true.
    """
    worker = self._worker_by_tag(tag)
    if (not worker) or worker.hello_received:
        return
    worker.hello_received = True
    worker.hello_ts = time.time()
    pid_txt = worker.launch_pid if worker.launch_pid is not None else '?'
    self._record_launch_event(worker, "CONNECTED", "WORKER_HELLO_OK", f"pid={pid_txt}")


def _check_launch_health(self):
    """Report workers that never completed the hello handshake.

    Only workers whose launch was attempted and that have a process object
    are inspected. Two one-shot conditions are reported:

    * process still running, no hello, and more than
      ``self._hello_timeout_s`` seconds since ``launch_ts``
      -> ``STALLING`` / ``NO_HANDSHAKE_TIMEOUT``;
    * process exited without a hello -> ``FAILED_RUNTIME`` with a reason
      classified from the worker's last line and the return code.
    """
    checked_at = time.time()
    for worker in self.workers:
        if not worker.launch_attempted:
            continue
        proc = worker.proc
        if not proc:
            continue
        try:
            exit_code = proc.poll()
        except Exception:
            # A process that cannot be polled is handled like a running one.
            exit_code = None

        if exit_code is None:
            overdue = (
                (not worker.hello_received)
                and worker.launch_ts
                and (checked_at - worker.launch_ts > self._hello_timeout_s)
                and (not worker.hello_timeout_reported)
            )
            if overdue:
                worker.hello_timeout_reported = True
                self._record_launch_event(
                    worker,
                    "STALLING",
                    "NO_HANDSHAKE_TIMEOUT",
                    f"waited>{int(self._hello_timeout_s)}s",
                )
        elif (not worker.hello_received) and (not worker.exit_before_hello_reported):
            worker.exit_before_hello_reported = True
            why = _classify_runtime_exit_reason(getattr(worker, "last_line", ""), exit_code)
            self._record_launch_event(
                worker,
                "FAILED_RUNTIME",
                why,
                f"returncode={exit_code} last='{(worker.last_line or '').strip()[:180]}'",
            )


def _rebuild_dispatch_queues(self):
    """Recompute every worker's ``local_frames`` from ``self.pending``.

    In ``STRIDE`` dispatch mode worker ``i`` of ``n`` receives every n-th
    pending unit starting at offset ``i``. In any other mode each worker gets
    a fresh empty list.
    """
    if self.dispatch_mode == "STRIDE":
        step = max(1, len(self.workers))
        snapshot = list(self.pending)
        for offset, worker in enumerate(self.workers):
            worker.local_frames = snapshot[offset::step]
    else:
        for worker in self.workers:
            worker.local_frames = []


def _output_spec_for_unit(self, unit):
    """Describe the output file that rendering ``unit`` is expected to produce.

    Returns a dict with the keys ``path``, ``kind`` (``"image"`` or
    ``"video"``) and ``expected_size``. Returns ``None`` for a MARI unit that
    is not a dict, or when the MARI output root, name or extension is empty.
    """
    size_hint = _mgpu_scene_expected_image_size(self.scene)

    def _image_spec(path):
        return {"path": path, "kind": "image", "expected_size": size_hint}

    if self.job_mode != "MARI":
        # Plain frame job: the unit is a frame number (kept as-is if not int-like).
        try:
            frame_no = int(unit)
        except Exception:
            frame_no = unit
        if self.video_mode and self.video_seq_dir:
            return _image_spec(
                _mgpu_video_seq_frame_path(self.video_seq_dir, frame_no, self.video_seq_ext)
            )
        return _image_spec(_mgpu_scene_frame_output_path(self.scene, frame_no))

    if not isinstance(unit, dict):
        return None
    root, name = _mgpu_mari_output_root(self.scene)
    ext = (_mgpu_mari_ext_from_scene(self.scene) or "").lower().lstrip(".")
    if not (root and name and ext):
        return None

    action = str(self.mari_globals.get("action", "STILL") or "STILL").upper()
    is_video = bool(self.mari_globals.get("is_video", False))
    h_txt = _mgpu_format_hv_label(unit.get("H"))
    v_txt = _mgpu_format_hv_label(unit.get("V"))
    stem = f"{name}_H{h_txt}_V{v_txt}"
    try:
        frame_no = int(unit.get("frame", -1))
    except Exception:
        frame_no = -1

    if action == "ANIM":
        if is_video:
            start_f = int(getattr(self.scene, "frame_start", 0))
            end_f = int(getattr(self.scene, "frame_end", 0))
            video_stem = f"{name}_{start_f:04d}-{end_f:04d}_H{h_txt}_V{v_txt}"
            return {
                "path": os.path.join(root, f"{video_stem}.{ext}"),
                "kind": "video",
                "expected_size": None,
            }
        if frame_no >= 0:
            # Image-sequence animation: one sub-folder per H/V view.
            return _image_spec(os.path.join(root, stem, f"{name}_{frame_no:04d}.{ext}"))

    return _image_spec(os.path.join(root, f"{stem}.{ext}"))
def _dispatch_pop_next_unit(self, tag, mode):
    """Pop and return the next unfinished unit for ``tag``, or ``None``.

    Shared implementation of the frame and MARI dispatch. Units whose
    ``_job_key(unit, mode)`` is already in ``self.finished_set`` are
    discarded while popping. In ``STRIDE`` mode only the ``local_frames``
    queue of the worker(s) carrying ``tag`` is consulted; otherwise units
    come from the shared ``self.pending`` list.
    """
    if self.dispatch_mode == "STRIDE":
        for w in self.workers:
            if w.tag != tag:
                continue
            local = getattr(w, "local_frames", None)
            if local is None:
                continue
            while local:
                unit = local.pop(0)
                if self._job_key(unit, mode) not in self.finished_set:
                    return unit
        return None

    while self.pending:
        unit = self.pending.pop(0)
        if self._job_key(unit, mode) not in self.finished_set:
            return unit
    return None


def _dispatch_requeue_unit(self, tag, unit, prefer_other):
    """Put ``unit`` back into the dispatch queues.

    Outside ``STRIDE`` mode the unit goes to the front of ``self.pending``.
    In ``STRIDE`` mode it is appended to one worker's ``local_frames``: with
    ``prefer_other`` the worker other than ``tag`` with the shortest queue,
    otherwise (or when there is no other worker) the worker named ``tag``.

    NOTE: in ``STRIDE`` mode, when no target worker can be found, the unit is
    not queued anywhere. This matches the previous behaviour of both
    requeue methods.
    """
    if self.dispatch_mode != "STRIDE":
        self.pending.insert(0, unit)
        return

    target = None
    if prefer_other:
        others = [w for w in self.workers if w.tag != tag]
        if others:
            target = min(others, key=lambda w: len(getattr(w, "local_frames", []) or []))
    if target is None:
        target = next((w for w in self.workers if w.tag == tag), None)
    if target:
        if getattr(target, "local_frames", None) is None:
            target.local_frames = []
        target.local_frames.append(unit)


def _next_frame_for_tag(self, tag):
    """Return the next unfinished frame for worker ``tag``, or ``None``."""
    return self._dispatch_pop_next_unit(tag, "FRAMES")


def _requeue_frame_for_tag(self, tag, frame, prefer_other=False):
    """Requeue ``frame``; see ``_dispatch_requeue_unit`` for the placement rules."""
    self._dispatch_requeue_unit(tag, frame, prefer_other)


def _next_mari_for_tag(self, tag):
    """Return the next unfinished MARI job for worker ``tag``, or ``None``.

    In ``STRIDE`` mode MARI jobs are stored in the workers' ``local_frames``
    queues, the same attribute used for plain frames.
    """
    return self._dispatch_pop_next_unit(tag, "MARI")


def _requeue_mari_for_tag(self, tag, job, prefer_other=False):
    """Requeue MARI ``job``; see ``_dispatch_requeue_unit`` for the placement rules."""
    self._dispatch_requeue_unit(tag, job, prefer_other)
def _job_label(self, job, mode=None):
    """Return a short human-readable label for ``job``, used in log messages.

    ``mode`` defaults to ``self.job_mode``. MARI dict jobs are labelled with
    their ``cam_name`` (``"?"`` when missing) plus `` f<frame>`` when a frame
    other than ``-1`` is present; other MARI jobs use ``str(job)``; frame
    jobs are labelled ``"frame <job>"``.
    """
    mode = mode or self.job_mode
    if mode != "MARI":
        return f"frame {job}"
    if not isinstance(job, dict):
        return str(job)

    cam = job.get("cam_name") or "?"
    raw_frame = job.get("frame", None)
    if raw_frame is None:
        frame_val = -1
    else:
        try:
            frame_val = int(raw_frame)
        except Exception:
            # Keep a non-numeric frame verbatim so it still shows in the label.
            frame_val = raw_frame
    if frame_val == -1:
        return cam
    return f"{cam} f{frame_val}"


def _record_inflight(self, tag, job):
    """Register ``job`` as in flight on worker ``tag`` and return its job key.

    When the worker object is found its CPU-hint fields are cleared and its
    progress timestamp/signature are reset to the assignment time. The
    ``self.inflight[tag]`` entry starts un-stolen and un-hedged and carries
    the worker's current ``guard_epoch`` (``0`` when there is no worker).
    """
    key = self._job_key(job)
    assigned_at = time.time()
    worker = self._worker_by_tag(tag)
    if worker:
        worker.cycles_cpu_hint_ts = 0.0
        worker.cycles_cpu_hint_line = ""
        worker.guard_last_progress_ts = assigned_at
        worker.guard_last_progress_sig = ("ASSIGN", str(key))

    entry = {
        "job": job,
        "key": key,
        "start": assigned_at,
        "stolen": False,
        "guard_hedged": False,
        "guard_hedge_ts": 0.0,
        "guard_last_progress_ts": assigned_at,
        "guard_progress_seen": False,
        "guard_epoch": int(getattr(worker, "guard_epoch", 0) or 0),
    }
    self.inflight[tag] = entry
    return key


def _mark_finished(self, job_key):
    """Mark ``job_key`` as finished; return ``True`` only the first time.

    A newly finished key is added to ``finished_set``, appended to
    ``finished`` and removed from ``rt_guard_job_state``.
    """
    already_done = job_key in self.finished_set
    if not already_done:
        self.finished_set.add(job_key)
        self.finished.append(job_key)
        self.rt_guard_job_state.pop(job_key, None)
    return not already_done
def _reset_worker_timing_baseline(self, tag):
    """Forget the timing statistics and render-time history of worker ``tag``.

    The worker's entry in ``worker_stats`` is removed and its history is
    replaced by an empty ``deque`` limited to 12 samples.
    """
    self.worker_stats.pop(tag, None)
    # The previous try/except repeated this same assignment in its handler,
    # so it could not recover from anything; a plain assignment is equivalent.
    self._worker_hist[tag] = deque(maxlen=12)


def _avg_for_tag(self, tag):
    """Return the average render time to assume for worker ``tag``.

    Uses the worker's own running average when it has at least one counted
    job, otherwise the global average over all counted renders, otherwise
    ``None``.
    """
    st = self.worker_stats.get(tag)
    if st and st.get("count", 0) > 0:
        return st.get("avg", 0.0)
    if self.total_render_count > 0:
        return self.total_render_time / float(self.total_render_count)
    return None


def _rt_guard_log(self, key: str, msg: str, every_s: float = 20.0, force: bool = False):
    """Log a guard message, rate-limited per ``key``.

    Unless ``force`` is set, the message is dropped when the same ``key`` was
    logged less than ``every_s`` seconds ago. Emitted messages go to the
    console log and to the diagnostics sink.
    """
    now = time.time()
    if not force:
        last = self._rt_guard_last_log.get(key, 0.0)
        if (now - last) < max(0.0, float(every_s)):
            return
    self._rt_guard_last_log[key] = now
    _log(msg)
    self._diag_write(msg)


def _alive_worker_count(self):
    """Count workers whose ``alive()`` call returns a truthy value.

    A worker whose ``alive()`` raises is counted as not alive.
    """
    n = 0
    for w in self.workers:
        try:
            if w.alive():
                n += 1
        except Exception:
            continue
    return n


def _has_other_alive_worker(self, tag):
    """Return ``True`` if any worker other than ``tag`` reports being alive."""
    for w in self.workers:
        if w.tag == tag:
            continue
        try:
            if w.alive():
                return True
        except Exception:
            continue
    return False


def _rt_pending_count(self):
    """Count queued units that are not already finished.

    In ``STRIDE`` mode the workers' ``local_frames`` queues are counted,
    otherwise ``self.pending``. A unit whose job key cannot be computed is
    counted as pending.
    """

    def _count_unfinished(units):
        total = 0
        # Iterate over a copy, as the original code did.
        for unit in list(units):
            try:
                if self._job_key(unit) in self.finished_set:
                    continue
            except Exception:
                pass
            total += 1
        return total

    if self.dispatch_mode == "STRIDE":
        return sum(
            _count_unfinished(getattr(w, "local_frames", []) or [])
            for w in self.workers
        )
    return _count_unfinished(self.pending)
def _rt_warmup_state(self, tag):
    """Report whether the render-time guard is still warming up for ``tag``.

    The guard is blocked while fewer counted renders exist than the
    configured ``warmup_completed_jobs`` (globally) or
    ``warmup_per_worker_jobs`` (for this worker). A requirement of ``0``
    never blocks. Returns a dict with ``block`` and the done/needed counters.
    """
    cfg = self.rt_guard_cfg
    need_global = int(cfg.get("warmup_completed_jobs", 0) or 0)
    need_worker = int(cfg.get("warmup_per_worker_jobs", 0) or 0)
    done_global = int(self.total_render_count or 0)
    worker_entry = self.worker_stats.get(tag) or {}
    done_worker = int(worker_entry.get("count", 0) or 0)

    waiting_global = need_global > 0 and done_global < need_global
    waiting_worker = need_worker > 0 and done_worker < need_worker
    return {
        "block": bool(waiting_global or waiting_worker),
        "done_global": done_global,
        "done_worker": done_worker,
        "need_global": need_global,
        "need_worker": need_worker,
    }


def _rt_activate_periodic_recycle_stages(self):
    """Arm every periodic recycle stage whose progress point has been reached.

    Does nothing unless the guard and periodic recycling are enabled, recycle
    points exist, ``total_frames`` is positive, at least
    ``periodic_recycle_min_completed_jobs`` renders are counted and at least
    one non-CPU worker exists. Each newly reached point (keyed by its rounded
    percentage) is remembered in ``rt_periodic_recycle_seen`` and gets the
    set of all non-CPU worker tags as its pending set.
    """
    if not (self.rt_guard_enabled and self.rt_periodic_recycle_enabled):
        return
    if not self.rt_periodic_recycle_points:
        return
    total = int(getattr(self, "total_frames", 0) or 0)
    if total <= 0:
        return
    rendered_done = int(self.total_render_count or 0)
    min_done = int(self.rt_guard_cfg.get("periodic_recycle_min_completed_jobs", 0) or 0)
    if rendered_done < max(0, min_done):
        return
    progress = float(rendered_done) / float(max(1, total))
    gpu_tags = [w.tag for w in self.workers if not getattr(w, "is_cpu", False)]
    if not gpu_tags:
        return

    for point in self.rt_periodic_recycle_points:
        mark = int(round(float(point) * 100.0))
        reached = progress >= float(point)
        if (mark in self.rt_periodic_recycle_seen) or (not reached):
            continue
        self.rt_periodic_recycle_seen.add(mark)
        self.rt_periodic_recycle_pending[mark] = set(gpu_tags)
        self._rt_guard_log(
            f"rt-periodic-activate-{mark}",
            f"[MGPU-GUARD] Activated VRAM hygiene recycle stage {mark}% "
            f"(rendered={rendered_done}/{total}, completed={len(self.finished_set)}/{total}).",
            force=True
        )


def _rt_try_periodic_recycle_after_job(self, tag):
    """After a job on ``tag`` finished, perform at most one pending recycle restart.

    Newly reached stages are armed first. Stages are then visited in sorted
    order. CPU or unknown workers are simply removed from a stage's pending
    set. For a GPU worker the restart is attempted if
    ``_rt_can_restart_worker`` allows it; otherwise the attempt is postponed
    and the method returns.

    NOTE(review): the nesting of the trailing ``return`` (after both the
    success and the failure branch) was reconstructed from a
    whitespace-collapsed source — confirm against the formatted file.
    """
    if not (self.rt_guard_enabled and self.rt_periodic_recycle_enabled):
        return
    self._rt_activate_periodic_recycle_stages()
    if not self.rt_periodic_recycle_pending:
        return
    try:
        stage_order = sorted(self.rt_periodic_recycle_pending.keys())
    except Exception:
        stage_order = list(self.rt_periodic_recycle_pending.keys())

    for stage in stage_order:
        waiting = self.rt_periodic_recycle_pending.get(stage)
        if not waiting:
            # Empty stage entry: drop it and look at the next stage.
            self.rt_periodic_recycle_pending.pop(stage, None)
            continue
        if tag not in waiting:
            continue

        worker = self._worker_by_tag(tag)
        if (not worker) or worker.is_cpu:
            waiting.discard(tag)
            if not waiting:
                self.rt_periodic_recycle_pending.pop(stage, None)
            continue

        allowed, why = self._rt_can_restart_worker(worker, time.time())
        if not allowed:
            self._rt_guard_log(
                f"rt-periodic-skip-{stage}-{tag}",
                f"[MGPU-GUARD] {tag}: periodic VRAM recycle {stage}% delayed ({why}).",
                every_s=20.0
            )
            return

        restarted = self._restart_worker_same_gpu(worker, f"periodic-vram-hygiene-{stage}%", info=None)
        if not restarted:
            self._rt_guard_log(
                f"rt-periodic-fail-{stage}-{tag}",
                f"[MGPU-GUARD] {tag}: periodic VRAM hygiene restart at {stage}% failed.",
                force=True
            )
            self._open_diag_terminal_if_needed()
            return

        waiting.discard(tag)
        self._rt_guard_log(
            f"rt-periodic-restarted-{stage}-{tag}",
            f"[MGPU-GUARD] {tag}: periodic VRAM hygiene restart at {stage}% complete.",
            force=True
        )
        self._open_diag_terminal_if_needed()
        if not waiting:
            self.rt_periodic_recycle_pending.pop(stage, None)
            self._rt_guard_log(
                f"rt-periodic-stage-done-{stage}",
                f"[MGPU-GUARD] Periodic VRAM hygiene stage {stage}% completed for all GPU workers.",
                force=True
            )
        return
def _rt_stall_restart_threshold(self, baseline, soft_th, progress_stall_s):
    """Return the no-progress duration (seconds) after which a stall counts.

    Each argument is converted with ``float``; a value that cannot be
    converted becomes ``0.0``. The result is the larger of
    ``2 * progress_stall_s`` and the smaller of the soft threshold and
    ``2.5 * baseline``, where a non-positive soft threshold or baseline is
    replaced by ``progress_stall_s``.
    """

    def _as_float(value):
        try:
            return float(value)
        except Exception:
            return 0.0

    base_s = _as_float(baseline)
    soft_s = _as_float(soft_th)
    stall_s = _as_float(progress_stall_s)

    soft_limit = soft_s if soft_s > 0.0 else stall_s
    baseline_limit = base_s * 2.5 if base_s > 0.0 else stall_s
    floor = stall_s * 2.0
    return max(floor, min(soft_limit, baseline_limit))
def _duplicate_job_for_hedge(self, tag, job):
    """Queue a copy of ``job`` for a worker other than ``tag`` when possible.

    ``None`` jobs are ignored. MARI jobs are requeued unchanged. In every
    other job mode the job is converted to ``int`` when possible and requeued
    as a frame. Both paths pass ``prefer_other=True``.
    """
    if job is None:
        return
    if self.job_mode != "MARI":
        try:
            unit = int(job)
        except Exception:
            unit = job
        self._requeue_frame_for_tag(tag, unit, prefer_other=True)
        return
    self._requeue_mari_for_tag(tag, job, prefer_other=True)
def _check_render_time_guard(self):
    """Detect slow or stalled in-flight jobs and hedge or restart their workers.

    Runs only when ``self.rt_guard_enabled`` is set, in three passes:

    1. Take a timing snapshot for every in-flight job whose worker is alive.
    2. Detect a "global wave": more than one active job and every one of
       them has shown progress before but is now stalled.
    3. For each snapshot decide whether to skip (warm-up), hedge (queue a
       duplicate of the job), or restart the worker on the same GPU, subject
       to the per-job restart cap, the single-worker delay and
       ``_rt_can_restart_worker``.
    """
    if not self.rt_guard_enabled:
        return
    cfg = self.rt_guard_cfg
    now = time.time()
    # Every setting falls back to its default when missing or falsy (0/None).
    min_samples_soft = int(cfg.get("min_samples_soft", 3) or 3)
    soft_mult = float(cfg.get("soft_mult", 2.5) or 2.5)
    soft_min = float(cfg.get("soft_min_s", 60.0) or 60.0)
    hard_mult = float(cfg.get("hard_mult", 4.5) or 4.5)
    hard_min = float(cfg.get("hard_min_s", 180.0) or 180.0)
    progress_stall_s = float(cfg.get("progress_stall_s", 60.0) or 60.0)
    hedge_grace_s = float(cfg.get("hedge_grace_s", 45.0) or 45.0)
    hedge_max_per_job = int(cfg.get("hedge_max_per_job", 1) or 1)
    restart_max_per_job = int(cfg.get("restart_max_per_job", 1) or 1)
    single_worker_min_stall = float(cfg.get("single_worker_min_stall_s", 180.0) or 180.0)

    # ---- pass 1: timing snapshot per in-flight job with a live worker ----
    snapshots = {}
    for tag, info in list(self.inflight.items()):
        w = self._worker_by_tag(tag)
        if (not w) or (not w.alive()):
            continue
        key = info.get("key")
        if not key:
            continue
        start = float(info.get("start", now) or now)
        elapsed = max(0.0, now - start)
        baseline = self._rt_baseline_for(tag)
        # Thresholds scale with the baseline but never drop below the minimums.
        soft_th = max(soft_min, baseline * soft_mult)
        hard_th = max(hard_min, baseline * hard_mult)
        # Prefer the worker's progress timestamp; fall back to the one stored
        # on the inflight entry, and never go earlier than the job start.
        progress_ts = float(getattr(w, "guard_last_progress_ts", 0.0) or 0.0)
        if progress_ts <= 0.0:
            progress_ts = float(info.get("guard_last_progress_ts", start) or start)
        progress_ts = max(progress_ts, start)
        info["guard_last_progress_ts"] = progress_ts
        no_progress_for = max(0.0, now - progress_ts)
        stall_restart_s = self._rt_stall_restart_threshold(baseline, soft_th, progress_stall_s)
        snapshots[tag] = {
            "worker": w,
            "key": key,
            "start": start,
            "elapsed": elapsed,
            "baseline": baseline,
            "soft_th": soft_th,
            "hard_th": hard_th,
            "progress_ts": progress_ts,
            "no_progress_for": no_progress_for,
            "stall_restart_s": stall_restart_s,
        }

    # ---- pass 2: global no-progress wave across all active jobs ----
    global_wave_tags = set()
    active_tags = [tag for tag, snap in snapshots.items() if snap["key"] not in self.finished_set]
    if len(active_tags) > 1 and self.total_render_count >= max(1, min_samples_soft):
        all_stalled = True
        for tag in active_tags:
            snap = snapshots[tag]
            info = self.inflight.get(tag) or {}
            # One job that never showed progress, or is not yet past its stall
            # threshold (elapsed and idle time), cancels the wave.
            if (not bool(info.get("guard_progress_seen"))) or snap["elapsed"] < snap["stall_restart_s"] or snap["no_progress_for"] < snap["stall_restart_s"]:
                all_stalled = False
                break
        if all_stalled:
            global_wave_tags = set(active_tags)
            wave_gap = min(snapshots[tag]["no_progress_for"] for tag in active_tags)
            self._rt_guard_log(
                "rt-global-stall-wave",
                f"[MGPU-GUARD] Global no-progress wave detected across {len(active_tags)} workers "
                f"(stall={wave_gap:.1f}s).",
                every_s=15.0
            )

    # ---- pass 3: per-job decision (skip / hedge / restart) ----
    for tag, info in list(self.inflight.items()):
        snap = snapshots.get(tag)
        if not snap:
            continue
        w = snap["worker"]
        key = snap["key"]
        job = info.get("job")
        elapsed = snap["elapsed"]
        baseline = snap["baseline"]
        soft_th = snap["soft_th"]
        hard_th = snap["hard_th"]
        no_progress_for = snap["no_progress_for"]
        stall_restart_s = snap["stall_restart_s"]
        progress_seen = bool(info.get("guard_progress_seen"))
        # Hedge/restart counters are tracked per job key.
        state = self.rt_guard_job_state.setdefault(key, {"hedges": 0, "restarts": 0})

        # Warm-up gate: skipped for wave members and for a tail straggler.
        warm = self._rt_warmup_state(tag)
        if warm.get("block") and tag not in global_wave_tags:
            tail_ready = self._rt_tail_straggler_ready(tag)
            if not tail_ready:
                needs = []
                if int(warm.get("need_worker", 0) or 0) > 0:
                    needs.append(f"worker={int(warm.get('done_worker', 0))}/{int(warm.get('need_worker', 0))}")
                if int(warm.get("need_global", 0) or 0) > 0:
                    needs.append(f"global={int(warm.get('done_global', 0))}/{int(warm.get('need_global', 0))}")
                detail = ", ".join(needs) if needs else "warmup"
                self._rt_guard_log(
                    f"rt-warmup-skip-{tag}",
                    f"[MGPU-GUARD] {tag}: warmup skip for {self._job_label(job)} ({detail}).",
                    every_s=20.0
                )
                continue
            self._rt_guard_log(
                f"rt-warmup-tail-{tag}",
                f"[MGPU-GUARD] {tag}: warmup override (tail-straggler) for {self._job_label(job)}.",
                every_s=20.0
            )

        restart_reason = None
        # A CPU-device hint recorded since this job started on a Cycles GPU
        # worker requests a restart; an OPTIX worker is switched to CUDA first.
        # NOTE(review): restart_reason is assumed to be set for every backend,
        # not only inside the OPTIX branch — nesting reconstructed from a
        # whitespace-collapsed source; confirm.
        if self.scene.render.engine == "CYCLES" and (not w.is_cpu):
            hint_ts = float(getattr(w, "cycles_cpu_hint_ts", 0.0) or 0.0)
            if hint_ts > 0.0 and hint_ts >= snap["start"]:
                line_hint = str(getattr(w, "cycles_cpu_hint_line", "") or "").strip()
                if self._cycles_backend_for_worker(w) == "OPTIX":
                    w.cycles_backend_override = "CUDA"
                    self._rt_guard_log(
                        f"rt-cpu-hint-switch-{tag}",
                        f"[MGPU-GUARD] {tag}: CPU hint detected; switching backend OPTIX -> CUDA.",
                        every_s=10.0
                    )
                restart_reason = f"cpu-device-hint ({line_hint[:96]})"

        # Hedge: job past the soft threshold, not yet hedged, another live
        # worker exists -> queue a duplicate and leave this worker running.
        if (key not in self.finished_set) and (restart_reason is None):
            can_hedge = (
                tag not in global_wave_tags
                and progress_seen
                and elapsed >= soft_th
                and self.total_render_count >= min_samples_soft
                and (not info.get("guard_hedged"))
                and state.get("hedges", 0) < hedge_max_per_job
                and self._has_other_alive_worker(tag)
            )
            if can_hedge:
                info["guard_hedged"] = True
                info["guard_hedge_ts"] = now
                state["hedges"] = int(state.get("hedges", 0)) + 1
                self._duplicate_job_for_hedge(tag, job)
                self._rt_guard_log(
                    f"rt-hedge-{key}",
                    f"[MGPU-GUARD] {tag}: hedge duplicate for {self._job_label(job)} "
                    f"(elapsed={elapsed:.1f}s, baseline={baseline:.1f}s, soft={soft_th:.1f}s).",
                    force=True
                )
                continue

        # The hedged copy finished elsewhere and the grace period has passed.
        # This assignment is unconditional and can replace a cpu-device-hint reason.
        if (key in self.finished_set) and info.get("guard_hedged"):
            hedge_for = max(0.0, now - float(info.get("guard_hedge_ts", now) or now))
            if hedge_for >= hedge_grace_s:
                restart_reason = f"hedged-copy-finished-elsewhere ({hedge_for:.0f}s)"
        # Hedged job that has stalled after the grace period.
        if restart_reason is None and info.get("guard_hedged"):
            hedge_for = max(0.0, now - float(info.get("guard_hedge_ts", now) or now))
            if progress_seen and hedge_for >= hedge_grace_s and no_progress_for >= stall_restart_s:
                restart_reason = f"post-hedge no-progress {no_progress_for:.0f}s"
        if restart_reason is None and tag in global_wave_tags:
            restart_reason = f"global-no-progress-wave {no_progress_for:.0f}s"
        # Past the hard threshold: stalled, hedged-and-slow, or far over time.
        # NOTE(review): the final ``elif`` is assumed to belong to the outer
        # chain (not to the inner hedge_for check) — confirm.
        if restart_reason is None and elapsed >= hard_th:
            if progress_seen and no_progress_for >= progress_stall_s:
                restart_reason = f"no-progress {no_progress_for:.0f}s"
            elif info.get("guard_hedged"):
                hedge_for = max(0.0, now - float(info.get("guard_hedge_ts", now) or now))
                if hedge_for >= hedge_grace_s:
                    restart_reason = f"post-hedge slow ({hedge_for:.0f}s)"
            elif elapsed >= (hard_th * 1.35):
                restart_reason = "hard-timeout"

        if restart_reason is None:
            continue
        # Per-job restart cap.
        if int(state.get("restarts", 0)) >= restart_max_per_job:
            self._rt_guard_log(
                f"rt-restart-cap-{key}",
                f"[MGPU-GUARD] {tag}: restart cap reached for {self._job_label(job)}; continuing without restart.",
                every_s=45.0
            )
            continue
        # With at most one live worker, wait for a longer stall before restarting.
        if self._alive_worker_count() <= 1 and no_progress_for < single_worker_min_stall:
            self._rt_guard_log(
                f"rt-single-skip-{tag}",
                f"[MGPU-GUARD] {tag}: single-worker mode, delaying restart until stall>{int(single_worker_min_stall)}s.",
                every_s=45.0
            )
            continue
        can_restart, why = self._rt_can_restart_worker(w, now)
        if not can_restart:
            self._rt_guard_log(
                f"rt-restart-skip-{tag}-{why}",
                f"[MGPU-GUARD] {tag}: restart skipped ({why}).",
                every_s=30.0
            )
            continue
        # The per-job restart counter is incremented before the attempt,
        # so a failed restart also counts against the cap.
        state["restarts"] = int(state.get("restarts", 0)) + 1
        reason = (
            f"{restart_reason}; elapsed={elapsed:.1f}s baseline={baseline:.1f}s "
            f"stall={stall_restart_s:.1f}s soft={soft_th:.1f}s hard={hard_th:.1f}s"
        )
        ok = self._restart_worker_same_gpu(w, reason, info=info)
        if ok:
            self._rt_guard_log(
                f"rt-restarted-{tag}",
                f"[MGPU-GUARD] {tag}: restarted on same GPU ({reason}).",
                force=True
            )
            self._open_diag_terminal_if_needed()
        else:
            self._rt_guard_log(
                f"rt-restart-fail-{tag}",
                f"[MGPU-GUARD] {tag}: restart failed ({reason}).",
                force=True
            )
            self._open_diag_terminal_if_needed()
def _handle_retry(self, tag, job, key=None, reason="failed", prefer_other=False):
    """Requeue ``job`` after a failure, giving up after ``self.max_retries``.

    Nothing happens for a ``None`` job or a job whose key is already
    finished. Otherwise the inflight entry for ``tag`` is removed and the
    per-key retry counter is incremented. Once the counter exceeds
    ``max_retries`` the job is marked finished instead of being requeued.
    When ``key`` is omitted it is derived from the job, using MARI mode for
    dict jobs and FRAMES mode for everything else.
    """
    if job is None:
        return
    if key is None:
        key = self._job_key(job, "MARI" if isinstance(job, dict) else "FRAMES")
    if key in self.finished_set:
        return

    self.inflight.pop(tag, None)
    attempt = self.retries.get(key, 0) + 1
    self.retries[key] = attempt

    if attempt > self.max_retries:
        _log(f"Giving up on {self._job_label(job)} after {attempt - 1} retries ({reason}).")
        self._mark_finished(key)
        return

    _log(f"Retrying {self._job_label(job)} ({reason}) attempt {attempt}/{self.max_retries}")
    if self.job_mode == "MARI":
        self._requeue_mari_for_tag(tag, job, prefer_other=prefer_other)
        return
    try:
        unit = int(job)
    except Exception:
        unit = job
    self._requeue_frame_for_tag(tag, unit, prefer_other=prefer_other)
def _write_header(self, w: Worker):
    """Emit the informational header block into worker ``w``'s log.

    The header is the ASCII banner followed by ``[MGPU-INFO]`` lines for the
    worker tag, engine/backend/device settings, the .blend name with its
    frame range, and the output directory. The worker's banner line counter
    is reset and the header is emitted without counting towards the banner
    repeat. Any exception is swallowed, so a failed header never blocks a
    launch.
    """
    try:
        scene = self.scene
        engine = scene.render.engine

        backend = self.device_mode
        if engine == "CYCLES" and not w.is_cpu:
            backend = self._cycles_backend_for_worker(w)

        if w.is_cpu:
            dev = "CPU"
        else:
            phys_txt = w.phys_index if w.phys_index is not None else '?'
            uuid_head = (w.gpu_uuid or '')[:12]
            bus_txt = (' bus=' + w.gpu_bus) if w.gpu_bus else ''
            dev = f"GPU {phys_txt} {uuid_head}{bus_txt}"

        blend_name = os.path.basename(bpy.data.filepath or "untitled.blend")
        fstart = scene.frame_start
        fend = scene.frame_end
        fstep = scene.frame_step
        try:
            first_frame_path = bpy.path.abspath(scene.render.frame_path(frame=fstart))
            out_dir = os.path.dirname(first_frame_path)
        except Exception:
            # Fall back to the raw render path when frame_path() is unavailable.
            out_dir = bpy.path.abspath(scene.render.filepath)

        denoise_txt = 'ON' if self.denoise_on_gpu else 'OFF'
        header = "".join(
            (
                BANNER_ASCII,
                "\n",
                f"[MGPU-INFO] Tag: {w.tag}\n",
                f"[MGPU-INFO] Engine: {engine} | Backend: {backend} | Device: {dev} | Threads/child: {self.threads} | Guard: {self.render_guard_tier} | DenoiseGPU: {denoise_txt}\n",
                f"[MGPU-INFO] .blend: {blend_name} | Frames: {fstart}–{fend} step {fstep}\n",
                f"[MGPU-INFO] Output dir: {out_dir}\n",
            )
        )
        w._banner_lines_since_repeat = 0
        self._emit(w, header, count_for_banner=False)
    except Exception:
        pass


def _open_terminal_tail(self, w: Worker):
    """Open a terminal tailing ``w.log_path`` when ``self.open_terms`` is set.

    The spawned process, if any, is stored on the worker as ``term_proc``.
    """
    if self.open_terms:
        tail_proc = self._spawn_tail_terminal(w.log_path, enable_vt=True)
        if tail_proc:
            w.term_proc = tail_proc
self._cycles_fallback_for_worker(w, launch_backend) cmd = [ blender_bin, "--enable-autoexec", ] if getattr(self, "_enabled_addon_modules_csv", ""): cmd += ["--addons", self._enabled_addon_modules_csv] cmd += [ "-b", self.temp_blend, "-P", self._child_script, "--", ] if self.scene.render.engine == "CYCLES": cli_dev = "CPU" if w.is_cpu else (launch_backend or self.device_mode or "CUDA") cmd += ["--cycles-device", str(cli_dev).upper()] cmd += [ "--mgpu-port", str(self._server_port), "--mgpu-token", self._token, "--mgpu-tag", w.tag, "--mgpu-device", launch_backend or self.device_mode, "--mgpu-threads", str(self.threads), "--mgpu-usecpu", "1" if w.is_cpu else "0", "--mgpu-denoise-gpu", "1" if self.denoise_on_gpu else "0", "--mgpu-persistent", "1" if self.use_persistent_data else "0", "--mgpu-mode", self.job_mode, ] if (self.scene.render.engine == "CYCLES") and (not w.is_cpu): if w.gpu_bus: cmd += ["--mgpu-gpu-bus", w.gpu_bus] if w.gpu_name: cmd += ["--mgpu-gpu-name", w.gpu_name] if (self.scene.render.engine == "CYCLES") and (not w.is_cpu) and launch_fallback: cmd += ["--mgpu-fallback-device", launch_fallback] if getattr(self, "src_blend_dir", None): cmd += ["--src-dir", self.src_blend_dir] if getattr(self, "_enabled_addons_file", None): cmd += ["--mgpu-enabled-addons-file", self._enabled_addons_file] if self.video_mode and self.job_mode == "FRAMES" and self.video_seq_dir: cmd += [ "--mgpu-seq-dir", self.video_seq_dir, "--mgpu-seq-format", self.video_seq_format, "--mgpu-seq-ext", self.video_seq_ext, ] if self._preflight_existing_check_done: cmd += ["--mgpu-prechecked-existing", "1"] # Pass MARI add-on path (so child imports & registers it) if self.job_mode == "MARI" and getattr(self, "_mari_dir", None): cmd += ["--mari-path", self._mari_dir] env = os.environ.copy() # Bind EXACT GPU via UUID if not w.is_cpu and w.gpu_uuid: env["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" env["CUDA_VISIBLE_DEVICES"] = w.gpu_uuid else: env.pop("CUDA_VISIBLE_DEVICES", None) creationflags = 
subprocess.CREATE_NEW_PROCESS_GROUP if IS_WIN else 0 try: proc = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, bufsize=1, creationflags=creationflags ) w.proc = proc w.launch_ok = True w.launch_pid = int(getattr(proc, "pid", -1)) w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("PROCESS_STARTED", w.launch_pid) _win_job_assign(proc) w.stdout_thread = threading.Thread(target=self._pump_stdout, args=(w,), daemon=True) w.stdout_thread.start() self._record_launch_event( w, "LAUNCHED", "PROCESS_STARTED", f"pid={w.launch_pid} dev={'CPU' if w.is_cpu else (w.gpu_uuid or 'no-uuid')} " f"{(' bus=' + w.gpu_bus) if ((not w.is_cpu) and w.gpu_bus) else ''} " f"{(' cli_cycles_device=' + ('CPU' if w.is_cpu else (launch_backend or self.device_mode or 'CUDA'))) if (self.scene.render.engine == 'CYCLES') else ''} " f"backend={(launch_backend or self.device_mode)}{(' fallback=' + launch_fallback) if ((not w.is_cpu) and launch_fallback) else ''} " f"denoise_gpu={'ON' if self.denoise_on_gpu else 'OFF'}" ) return True except Exception as e: w.launch_ok = False w.launch_pid = None reason = _classify_launch_exception(e) self._record_launch_event(w, "FAILED_TO_LAUNCH", reason, str(e)) return False def _emit(self, w: Worker, text: str, count_for_banner: bool = True): try: if w._log_fp: w._log_fp.write(text) w._log_fp.flush() w.last_line = text.rstrip() except Exception: return if not count_for_banner: return try: step = int(_WORKER_BANNER_REPEAT_EVERY_LINES or 0) if step <= 0: return added = int(str(text).count("\n")) if added <= 0 and str(text): added = 1 w._banner_lines_since_repeat = int(getattr(w, "_banner_lines_since_repeat", 0) or 0) + max(0, added) if w._banner_lines_since_repeat < step: return w._banner_lines_since_repeat = 0 banner = "\n" + _WORKER_BANNER_REPEAT_TEXT if w._log_fp: w._log_fp.write(banner) w._log_fp.flush() w.last_line = banner.rstrip() except Exception: pass def _pump_stdout(self, w: 
Worker): PERCENT_STEP = 5.0 # update when percentage jumps by ≥5 TIME_STEP = 2.0 # every ≥2s try: for raw in w.proc.stdout: line = raw.strip() if not line: continue if (self.scene.render.engine == "CYCLES") and (not w.is_cpu): ll = line.lower() cpu_hint = False if ("'cpu_enabled': true" in ll) or ('"cpu_enabled": true' in ll): cpu_hint = True if ("'scene_device': 'cpu'" in ll) or ('"scene_device": "cpu"' in ll): cpu_hint = True if not cpu_hint and re.search(r"\b(using|use|rendering on|fallback(?:ing)? to)\s+cpu\b", ll): cpu_hint = True if ( (not cpu_hint) and re.search(r"\bdevice\b.{0,24}\bcpu\b", ll) and ("scene_device" not in ll) and ("cpu_enabled" not in ll) ): cpu_hint = True if cpu_hint: w.cycles_cpu_hint_ts = time.time() w.cycles_cpu_hint_line = line[:220] self._emit(w, f"[MGPU-GUARD] {w.tag} CPU device hint: {w.cycles_cpu_hint_line}\n") # Frame lifecycle from child markers ms = _CHILD_START_RE.match(line) if ms: w.cur_frame = int(ms.group(2)) w.cur_path = ms.group(3) w.frame_start_time = time.time() w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("START", w.cur_frame) w._live_line_active = False w._last_pct = -1.0 w._last_emit_time = 0.0 self._emit(w, f"[MGPU-DASH] frame {w.cur_frame} preparing -> {w.cur_path}\n") continue msm = _CHILD_MARI_START_RE.match(line) if msm: action = str(msm.group(2) or "") h = msm.group(3) v = msm.group(4) frame_txt = msm.group(5) target = msm.group(6) label = f"{action} H{h} V{v}" + (f" f{frame_txt}" if frame_txt is not None else "") w.cur_frame = None w.cur_path = target w.frame_start_time = time.time() w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("MARI_START", label) w._live_line_active = False w._last_pct = -1.0 w._last_emit_time = 0.0 self._emit(w, f"[MGPU-DASH] {label} preparing -> {target}\n") continue mf = _CHILD_FIN_RE.match(line) if mf: frame = int(mf.group(2)) elapsed = float(mf.group(3)) w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("FIN", frame) s_cur, 
s_tot = w._last_samples bar = _progress_bar(100.0, 20) samples_txt = f" samples {s_tot}/{s_tot}" if (s_tot is not None) else "" prefix = "\x1b[1F\x1b[2K" if w._live_line_active else "" final_line = f"{prefix}[MGPU-PROG] {w.tag} f{frame:>4} 100.0% [{bar}]{samples_txt} {elapsed:.1f}s\n" self._emit(w, final_line) # reset w._live_line_active = False w.cur_frame = None w.cur_path = None w.frame_start_time = 0.0 continue mfm = _CHILD_MARI_FIN_RE.match(line) if mfm: action = str(mfm.group(2) or "") h = mfm.group(3) v = mfm.group(4) frame_txt = mfm.group(5) elapsed = float(mfm.group(6)) target = mfm.group(7) label = f"{action} H{h} V{v}" + (f" f{frame_txt}" if frame_txt is not None else "") w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("MARI_FIN", label) prefix = "\x1b[1F\x1b[2K" if w._live_line_active else "" self._emit(w, f"{prefix}[MGPU-PROG] {w.tag} {label} 100.0% {elapsed:.1f}s -> {target}\n") w._live_line_active = False w.cur_frame = None w.cur_path = None w.frame_start_time = 0.0 continue mm = _CHILD_MISS_RE.match(line) if mm: frame = int(mm.group(2)) elapsed = float(mm.group(3)) w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("MISS", frame) prefix = "\x1b[1F\x1b[2K" if w._live_line_active else "" self._emit(w, f"{prefix}[MGPU-FAIL] {w.tag} f{frame:>4} ({elapsed:.1f}s)\n") w._live_line_active = False w.cur_frame = None w.cur_path = None w.frame_start_time = 0.0 continue mmm = _CHILD_MARI_MISS_RE.match(line) if mmm: action = str(mmm.group(2) or "") h = mmm.group(3) v = mmm.group(4) frame_txt = mmm.group(5) elapsed = float(mmm.group(6)) target = mmm.group(7) label = f"{action} H{h} V{v}" + (f" f{frame_txt}" if frame_txt is not None else "") w.guard_last_progress_ts = time.time() w.guard_last_progress_sig = ("MARI_MISS", label) prefix = "\x1b[1F\x1b[2K" if w._live_line_active else "" self._emit(w, f"{prefix}[MGPU-FAIL] {w.tag} {label} ({elapsed:.1f}s) -> {target}\n") w._live_line_active = False w.cur_frame = None w.cur_path = 
None w.frame_start_time = 0.0 continue if line.startswith("[MGPU-CHILD]") or "ERROR" in line or "WARNING" in line or "Traceback" in line: self._emit(w, line + "\n") continue if line.startswith("[MGPU-PROJ]"): self._emit(w, line + "\n") continue s_cur, s_tot, t_cur, t_tot = _parse_progress_fields(line) pct = _progress_percent(s_cur, s_tot, t_cur, t_tot) has_progress = any(v is not None for v in (s_cur, s_tot, t_cur, t_tot, pct)) if has_progress: try: info = self.inflight.get(w.tag) if info is not None: info["guard_progress_seen"] = True except Exception: pass now = time.time() progress_sig = (s_cur, s_tot, t_cur, t_tot, (None if pct is None else int(pct))) if progress_sig != w.guard_last_progress_sig: w.guard_last_progress_sig = progress_sig w.guard_last_progress_ts = now if s_cur is not None or s_tot is not None: w._last_samples = (s_cur, s_tot) should_emit = False if pct is not None: if (pct - w._last_pct) >= PERCENT_STEP or (now - w._last_emit_time) >= TIME_STEP: should_emit = True if should_emit and pct is not None: bar = _progress_bar(pct, 20) samples_txt = f" samples {s_cur}/{s_tot}" if (s_cur is not None and s_tot) else "" elapsed = (now - w.frame_start_time) if w.frame_start_time else 0.0 fr = f"f{w.cur_frame:>4}" if (w.cur_frame is not None) else "f --" prefix = "\x1b[1F\x1b[2K" if w._live_line_active else "" out = f"{prefix}[MGPU-PROG] {w.tag} {fr} {pct:5.1f}% [{bar}]{samples_txt} {elapsed:.1f}s\n" self._emit(w, out) w._live_line_active = True w._last_pct = pct w._last_emit_time = now except Exception: pass def prepare_and_spawn(self): self._update_ram_capacity_note() if self._selection_warning: self._open_diag_terminal_if_needed() if not self.pending: msg = "[MGPU-LAUNCH] No worker launch needed; all pending outputs were resolved during preflight." 
def _print_launch_status_summary(self, title="Status"):
    """Write one launch-status line per worker to the console and the diag log.

    ``title`` only changes the heading (``"[MGPU-LAUNCH] <title> summary:"``).
    Each worker line carries its launch state/reason, the pid when one was
    recorded, whether a hello was received, and the launch detail if any.
    """
    def report(text):
        # Every summary line goes to both sinks.
        _log(text)
        self._diag_write(text)

    report(f"[MGPU-LAUNCH] {title} summary:")
    for worker in self.workers:
        pid_txt = "" if worker.launch_pid is None else f" pid={worker.launch_pid}"
        hello_txt = " hello=yes" if worker.hello_received else " hello=no"
        detail_txt = f" | {worker.launch_detail}" if worker.launch_detail else ""
        report(f"[MGPU-LAUNCH] {worker.tag}: {worker.launch_state}/{worker.launch_reason}{pid_txt}{hello_txt}{detail_txt}")


def finish(self):
    """Tear down a job and run the post-render steps.

    Order: print the final launch summary, kill processes via ``_kill_all``,
    terminate any tail terminal still running, close the worker logs, then
    - FRAMES + video mode: encode the sequence, or log that the existing
      final video was reused when ``_skip_video_encode`` is set;
    - MARI mode: package the ZIP, then remove the MARI temp directories;
    and finally call ``_cleanup_temp``. Each post-step failure is printed and
    does not stop the remaining steps.
    """
    self._print_launch_status_summary("Final")
    self._kill_all()

    for worker in self.workers:
        try:
            term = getattr(worker, "term_proc", None)
            if not term or term.poll() is not None:
                continue
            if IS_WIN:
                term.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                term.terminate()
        except Exception:
            pass

    for worker in self.workers:
        worker.close_log()

    # Build final video from temp frames (non-MARI mode only).
    try:
        if self.job_mode == "FRAMES" and self.video_mode:
            if self._skip_video_encode:
                _log(f"[MGPU] Reused existing final video: {self.video_output_path}")
            else:
                self._encode_video_from_sequence()
    except Exception as exc:
        print(f"[MGPU] Video encode failed: {exc}")

    # Package MARI media if requested.
    try:
        if self.job_mode == "MARI":
            self._package_mari_zip()
    except Exception as exc:
        print(f"[MGPU] ZIP packaging skipped/failed: {exc}")

    try:
        if self.job_mode == "MARI":
            self._cleanup_mari_temp_dirs()
    except Exception as exc:
        print(f"[MGPU] TEMP cleanup skipped/failed: {exc}")

    self._cleanup_temp()
def _collect_video_frames(self):
    """Return the rendered sequence file names in frame order.

    Lists ``self.video_seq_dir`` and keeps the entries whose lower-cased name
    ends with the lower-cased ``self.video_seq_ext``. Names are ordered
    naturally (digit runs compare as numbers), so a frame number wider than
    the zero padding, e.g. ``10000`` after ``9999``, still lands in the right
    place; for uniformly padded names this equals plain string order.

    Returns an empty list when no sequence directory is set or when it
    cannot be listed.
    """
    if not self.video_seq_dir:
        return []

    def natural_key(name):
        # re.split with a capturing group alternates text, digits, text, ...
        # so odd positions are always digit runs and types never mix.
        chunks = re.split(r"(\d+)", name)
        numeric = [int(c) if i % 2 else c for i, c in enumerate(chunks)]
        return (numeric, name)  # raw name breaks ties such as "007" vs "7"

    try:
        ext = (self.video_seq_ext or "").lower()
        files = [f for f in os.listdir(self.video_seq_dir) if f.lower().endswith(ext)]
        return sorted(files, key=natural_key)
    except Exception:
        return []


def _encode_video_from_sequence(self):
    """Encode the temp image sequence into ``self.video_output_path``.

    Returns silently when no sequence directory or output path is set.

    Raises:
        RuntimeError: when no frames were found, or fewer frames exist than
            ``self.total_frames`` (when that attribute is set and non-zero).
    """
    if not self.video_seq_dir or not self.video_output_path:
        return
    frames = self._collect_video_frames()
    if not frames:
        raise RuntimeError("No rendered frames found for video encode.")
    expected = int(getattr(self, "total_frames", 0) or 0)
    if expected and len(frames) < expected:
        raise RuntimeError(f"Missing frames ({len(frames)}/{expected}) for video encode.")
    _log(f"Encoding video from {len(frames)} frames -> {self.video_output_path}")
    _mgpu_build_video_from_sequence(self.scene, self.video_seq_dir, frames, self.video_output_path)


def start(self):
    """Validate the job, prepare the blend copy, then start server and workers.

    Raises:
        RuntimeError: when Cycles is the engine but its preferences are not
            available, when a FRAMES job has no frames, or when a MARI job
            has nothing pending. Errors raised by the preparation/spawn
            helpers propagate unchanged.
    """
    if (self.scene.render.engine == 'CYCLES') and (not _cycles_prefs()):
        raise RuntimeError("Cycles add-on is not enabled. Enable it in Preferences > Add-ons, or switch render engine to Eevee.")
    if self.job_mode == "FRAMES":
        if not self.frames:
            raise RuntimeError("No frames to render (check frame start/end).")
    else:
        if not self.pending:
            raise RuntimeError("No MARI camera jobs to render (job list empty).")

    self.prepare_blend_copy()

    # If MARI ANIM, expand camera jobs into per-frame jobs unless we're rendering video containers.
    # Explicit per-frame jobs supplied by MARI are preserved as-is so partial resumes can start immediately.
    expand_frames = (
        self.job_mode == "MARI"
        and self.mari_globals.get("action") == "ANIM"
        and not self.mari_globals.get("is_video", False)
    )
    if expand_frames:
        fstart, fend, fstep = self.scene.frame_start, self.scene.frame_end, max(1, self.scene.frame_step)
        expanded = []
        for j in self.mari_jobs:
            try:
                existing_frame = int(j.get("frame", -1))
            except Exception:
                existing_frame = -1
            if existing_frame >= 0:
                # Job already targets one frame: keep it untouched.
                expanded.append(dict(j))
                continue
            for f in range(fstart, fend + 1, fstep):
                jj = dict(j)
                jj["frame"] = int(f)
                expanded.append(jj)
        self.pending = expanded
        self.total_frames = len(expanded)

    self._preflight_existing_outputs()
    self._start_server()
    self.prepare_and_spawn()


def stop(self):
    """Cancel the job: mark it cancelled, kill processes, close logs, clean temp data."""
    self._print_launch_status_summary("Stop")
    self.cancelled = True
    self._kill_all()
    for w in self.workers:
        try:
            if getattr(w, "term_proc", None) and (w.term_proc.poll() is None):
                if IS_WIN:
                    w.term_proc.send_signal(signal.CTRL_BREAK_EVENT)
                else:
                    w.term_proc.terminate()
        except Exception:
            pass
    for w in self.workers:
        w.close_log()
    self._cleanup_temp()


def poll(self) -> bool:
    """Run one scheduler tick; return True once the job has been finished.

    Runs the guard/timeout/launch-health checks under ``self._lock``, then
    hands the in-flight job of every exited worker process to
    ``_handle_retry`` (reason ``"worker-exit"``). The return value is that of
    ``_shutdown_if_done``.
    """
    with self._lock:
        self._check_render_time_guard()
        self._check_inflight_timeouts()
        self._check_launch_health()
    for w in self.workers:
        if w.proc and (w.proc.poll() is not None):
            try:
                if w.stdout_thread:
                    # Give the reader thread a brief chance to drain the pipe.
                    w.stdout_thread.join(timeout=0.1)
            except Exception:
                pass
            with self._lock:
                info = self.inflight.get(w.tag)
                if info:
                    self._handle_retry(w.tag, info.get("job"), key=info.get("key"), reason="worker-exit", prefer_other=True)
    return self._shutdown_if_done()


def _shutdown_if_done(self) -> bool:
    """Call ``finish()`` and return True when all jobs are done and no worker is alive.

    A worker's ``alive`` may be a plain attribute/property or a callable; a
    worker without the attribute counts as not alive. A missing or ``None``
    ``total_frames`` is treated as 0.
    """
    expected = getattr(self, "total_frames", 0) or 0
    all_frames_done = len(self.finished_set) >= expected

    def is_alive(worker):
        state = getattr(worker, "alive", False)
        return bool(state() if callable(state) else state)

    if all_frames_done and not any(is_alive(w) for w in self.workers):
        self.finish()
        return True
    return False
def _cleanup_mari_temp_dirs(self):
    """Delete every ``*_TEMP`` sub-directory of the MARI output folder.

    The folder is ``<render_settings_filepath>/<render_settings_name>`` taken
    from ``scene.mari_props``. Best effort: returns silently when the
    properties are unavailable, the name is empty, the folder does not
    exist, or anything fails while scanning.
    """
    try:
        prop = self.scene.mari_props
    except Exception:
        return
    try:
        base = bpy.path.abspath(getattr(prop, "render_settings_filepath", ""))
        name = getattr(prop, "render_settings_name", "").strip()
        root = os.path.join(base, name)
        if not (name and os.path.isdir(root)):
            return
        # The context manager releases the directory handle deterministically.
        with os.scandir(root) as entries:
            for entry in entries:
                if entry.is_dir() and entry.name.upper().endswith("_TEMP"):
                    shutil.rmtree(entry.path, ignore_errors=True)
    except Exception:
        pass


def _cleanup_temp(self):
    """Close the server socket and diag log, then remove the temp directory.

    Each step is independent and best effort, so one failure never prevents
    the remaining cleanup.
    """
    try:
        if self._server_sock:
            self._server_sock.close()
    except Exception:
        pass
    try:
        if self._diag_log_fp:
            self._diag_log_fp.close()
    except Exception:
        pass
    try:
        if self.temp_dir and os.path.isdir(self.temp_dir):
            shutil.rmtree(self.temp_dir, ignore_errors=True)
    except Exception:
        pass


def _shorten_path(p, maxlen=96):
    """Return ``p`` normalized and shortened to at most ``maxlen`` characters.

    Paths that already fit are returned unchanged (after
    ``os.path.normpath``). Longer ones keep a head and a tail joined by
    ``"..."``. Values that ``normpath`` rejects are converted with ``str``.

    For ``maxlen >= 16`` the result is the same as before. Smaller limits
    used to yield a tail length of zero or less, which made the slice return
    (most of) the whole path and the result longer than the input; the head
    is now clamped so the limit always holds.
    """
    try:
        p = os.path.normpath(p)
    except Exception:
        p = str(p)
    if len(p) <= maxlen:
        return p

    ellipsis = "..."
    if maxlen <= len(ellipsis):
        # No room for head + ellipsis: plain truncation.
        return p[:max(0, maxlen)]

    budget = maxlen - len(ellipsis)
    keep = min(max(12, maxlen // 2 - 3), budget)
    tail = budget - keep
    # p[-0:] would be the whole string, so an empty tail needs its own branch.
    return p[:keep] + ellipsis + (p[-tail:] if tail > 0 else "")
Disable to use the system temp/AppData folder instead" ) ghost_filter_mode: bpy.props.EnumProperty( name="Ghost filter mode", description="How to build the GPU list (your system works best with STRICT − LEGACY).", items=[ ("STRICT_MINUS_LEGACY", "Strict − Legacy (default)", "Use strict(full) minus legacy(ghost)"), ("LEGACY_MINUS_STRICT", "Legacy − Strict", "Use broad scan then subtract strict"), ("STRICT_ONLY", "Strict only", "Use strict set only"), ("LEGACY_ONLY", "Legacy only", "Use legacy set only"), ], default="STRICT_MINUS_LEGACY" ) class MGPU_OT_render_frames(bpy.types.Operator): bl_idname = "render.multi_gpu_frames" bl_label = "Render (Multi-GPU Frames)" bl_options = {'REGISTER', 'INTERNAL'} _timer = None confirm_message: bpy.props.StringProperty(default="") forced_temp_dir: bpy.props.StringProperty(default="") def _resolve_video_temp_dir(self, context): prefs = getattr(context.window_manager, "mgpu_runtime_prefs", None) use_target_dir = True if prefs is not None: use_target_dir = bool(getattr(prefs, "use_target_dir_for_video_temp", True)) return _mgpu_video_temp_dir_for(context.scene, use_target_dir=use_target_dir) def draw(self, context): layout = self.layout lines = [l for l in (self.confirm_message or "").split("\n") if l.strip()] if not lines: layout.label(text="Overwrite existing output?") return for line in lines: layout.label(text=line) def invoke(self, context, event=None): is_video = _mgpu_is_video(context.scene) temp_dir = self._resolve_video_temp_dir(context) if is_video else None if is_video: self.forced_temp_dir = temp_dir or "" if context.scene.render.use_overwrite: warnings = _mgpu_overwrite_warnings(context.scene, is_video, temp_dir=temp_dir) if warnings: self.confirm_message = "Overwrite existing output?\n" + "\n".join(warnings) return context.window_manager.invoke_confirm(self, event) return self.execute(context) def execute(self, context): global _MANAGER _cleanup_stale_manager() if _MANAGER is not None: self.report({'ERROR'}, "Multi-GPU 
job already running.") return {'CANCELLED'} is_video = _mgpu_is_video(context.scene) if is_video: if not self.forced_temp_dir: self.forced_temp_dir = self._resolve_video_temp_dir(context) or "" self.report({'INFO'}, "Video output detected. Rendering to a temp image sequence, then encoding.") p = context.window_manager.mgpu_runtime_prefs try: mgr = MultiGPUManager( context.scene, threads=p.threads_per_process, instances_per_gpu=p.instances_per_gpu, dispatch_mode=p.dispatch_mode, max_retries=p.max_retries, open_terms=p.open_terminals, ghost_mode=p.ghost_filter_mode, use_persistent_data=p.use_persistent_data, render_guard_tier=p.render_time_guard_tier, denoise_on_gpu=p.denoise_on_gpu ) if is_video: mgr.video_mode = True mgr.video_output_path = bpy.path.abspath(context.scene.render.filepath) if self.forced_temp_dir: mgr._forced_temp_dir = self.forced_temp_dir mgr.start() except Exception as e: self.report({'ERROR'}, str(e)); return {'CANCELLED'} _MANAGER = mgr wm = context.window_manager self._timer = wm.event_timer_add(0.25, window=context.window) wm.modal_handler_add(self) _log("Multi-GPU frames started.") return {'RUNNING_MODAL'} def modal(self, context, event): global _MANAGER if _MANAGER is None: return {'CANCELLED'} if event and event.type == 'ESC': _MANAGER.stop(); _MANAGER = None try: context.window_manager.event_timer_remove(self._timer) except Exception: pass self.report({'INFO'}, "Multi-GPU cancelled.") return {'CANCELLED'} if event.type == 'TIMER': try: done = _MANAGER.poll() if done: try: context.window_manager.event_timer_remove(self._timer) except Exception: pass _MANAGER = None try: bpy.ops.wm.redraw_timer(type='DRAW_WIN_SWAP', iterations=1) except Exception: pass self.report({'INFO'}, "Multi-GPU render finished.") return {'FINISHED'} except Exception as e: _log(f"Manager error: {e}") _MANAGER.stop(); _MANAGER = None try: context.window_manager.event_timer_remove(self._timer) except Exception: pass self.report({'ERROR'}, str(e)) return {'CANCELLED'} 
# Modal operator that renders a list of MARI camera jobs with the multi-instance
# manager. Documented with comments instead of a class docstring so the
# description Blender derives for the operator stays unchanged.
#
# Properties:
#   job_json - JSON object with a "jobs" list (see the inline example).
#   mode     - "FRAME" or "CIRCLE" export layout.
#   action   - "STILL" or "ANIM".
class MGPU_OT_render_mari(bpy.types.Operator):
    bl_idname = "render.multi_gpu_mari"
    bl_label = "Render (Multi-Instance MARI)"
    bl_options = {'REGISTER', 'INTERNAL'}

    job_json: bpy.props.StringProperty(name="Jobs JSON")  # {"jobs":[{"cam_name":..., "H":..,"V":..},...]}
    mode: bpy.props.EnumProperty(items=[("FRAME","FRAME",""),("CIRCLE","CIRCLE","")], default="FRAME")
    action: bpy.props.EnumProperty(items=[("STILL","STILL",""),("ANIM","ANIM","")], default="STILL")

    _timer = None

    def _remove_timer(self, context):
        # Best-effort removal of the modal timer; safe to call more than once.
        try:
            context.window_manager.event_timer_remove(self._timer)
        except Exception:
            pass

    def invoke(self, context, event=None):
        # Validate the request, prepare the MARI output folder, export the
        # .mari3d file, then start the manager and install the modal timer.
        global _MANAGER
        _cleanup_stale_manager()
        if _MANAGER is not None:
            self.report({'ERROR'}, "Multi-Instance job already running.")
            return {'CANCELLED'}

        try:
            payload = json.loads(self.job_json or "{}")
            jobs = payload.get("jobs") or []
        except Exception as e:
            self.report({'ERROR'}, f"Bad job_json: {e}")
            return {'CANCELLED'}
        if not jobs:
            self.report({'ERROR'}, "No MARI jobs provided.")
            return {'CANCELLED'}

        # Prepare globals sent to children
        is_video = _mgpu_is_video(context.scene)
        if self.action == "STILL" and is_video:
            self.report({'ERROR'}, "Cannot render STILL directly to video. Switch to an image format or use ANIM.")
            return {'CANCELLED'}

        def floats(vec, count):
            return [float(vec[i]) for i in range(count)]

        mari_prop = getattr(context.scene, "mari_props", None)
        mari_settings = {}
        if mari_prop:
            try:
                mari_settings = {
                    "frame_ratio": floats(mari_prop.frame_ratio, 2),
                    "frame_dimensions": floats(mari_prop.frame_dimensions, 2),
                    "frame_center": floats(mari_prop.frame_center, 3),
                    "frame_rotation": floats(mari_prop.frame_rotation, 3),
                    "render_settings_filepath": bpy.path.abspath(getattr(mari_prop, "render_settings_filepath", "")),
                    "render_settings_name": str(getattr(mari_prop, "render_settings_name", "") or ""),
                    "render_settings_normalize": bool(getattr(mari_prop, "render_settings_normalize", False)),
                }
            except Exception:
                mari_settings = {}

        render = context.scene.render
        mari_globals = {
            "mode": "FRAME" if self.mode == "FRAME" else "CIRCLE",
            "action": self.action,
            "is_video": is_video,
            "use_overwrite": bool(getattr(render, "use_overwrite", True)),
            "use_placeholder": bool(getattr(render, "use_placeholder", False)),
            "render_resolution_x": int(getattr(render, "resolution_x", 0)),
            "render_resolution_y": int(getattr(render, "resolution_y", 0)),
            "render_resolution_percentage": int(getattr(render, "resolution_percentage", 100)),
            "mari_settings": mari_settings,
        }

        # Pre-export .mari3d and ensure the output folder is prepared exactly like MARI does
        try:
            export_type = "FRAME" if self.mode == "FRAME" else "CIRCLE"
            # Bound up front so the error report below can always format it.
            target_dir = ""
            try:
                mari_prop = context.scene.mari_props
                base = bpy.path.abspath(getattr(mari_prop, "render_settings_filepath", ""))
                name = getattr(mari_prop, "render_settings_name", "").strip()
                target_dir = os.path.join(base, name)
                # With an empty name target_dir IS the base folder; wiping it
                # would delete every other render stored there, so only a
                # named render folder is ever removed.
                if name and context.scene.render.use_overwrite and os.path.isdir(target_dir):
                    shutil.rmtree(target_dir)
                os.makedirs(target_dir, exist_ok=True)
                print(f"[MGPU-PARENT] Ensured MARI output folder exists: {target_dir}")
            except Exception as _e:
                self.report({'ERROR'}, f"Could not prepare MARI folder: {target_dir} ({_e})")
                return {'CANCELLED'}

            bpy.ops.mari.export_mari(action="RENDER", type=export_type, format=self.action)
            print(f"[MGPU-PARENT] Exported MARI .mari3d (type={export_type}, format={self.action})")
        except Exception as e:
            self.report({'ERROR'}, f"MARI export failed: {e}")
            return {'CANCELLED'}

        # Use same runtime prefs as frames operator
        p = context.window_manager.mgpu_runtime_prefs
        try:
            mgr = MultiGPUManager(
                context.scene,
                threads=p.threads_per_process,
                instances_per_gpu=p.instances_per_gpu,
                dispatch_mode=p.dispatch_mode,
                max_retries=p.max_retries,
                open_terms=p.open_terminals,
                use_persistent_data=p.use_persistent_data,
                job_mode="MARI",
                mari_jobs=jobs,
                mari_globals=mari_globals,
                render_guard_tier=p.render_time_guard_tier,
                denoise_on_gpu=p.denoise_on_gpu,
            )
            mgr.start()  # will prepare blend copy, write child script, start server
        except Exception as e:
            # Same handling as the frames operator: report instead of raising,
            # and never publish a manager that failed to start.
            self.report({'ERROR'}, str(e))
            return {'CANCELLED'}
        _MANAGER = mgr

        # Install modal timer for UI progress like frames op
        self._timer = context.window_manager.event_timer_add(0.3, window=context.window)
        context.window_manager.modal_handler_add(self)
        return {'RUNNING_MODAL'}

    def modal(self, context, event):
        # Poll the manager on timer ticks; ESC cancels the job.
        global _MANAGER
        if _MANAGER is None:
            # The job was cancelled elsewhere (e.g. by the cancel operator):
            # end this modal operator instead of ticking forever.
            self._remove_timer(context)
            return {'CANCELLED'}

        if event.type == 'ESC' and getattr(event, "value", 'PRESS') == 'PRESS':
            _MANAGER.stop(); _MANAGER = None
            self._remove_timer(context)
            self.report({'INFO'}, "Cancelled.")
            return {'CANCELLED'}

        if event.type == 'TIMER':
            try:
                done = _MANAGER.poll()
            except Exception as e:
                _log(f"Manager error: {e}")
                try:
                    _MANAGER.stop()
                except Exception:
                    pass
                _MANAGER = None
                self._remove_timer(context)
                self.report({'ERROR'}, str(e))
                return {'CANCELLED'}
            if done:
                self._remove_timer(context)
                _MANAGER = None
                # Export .mari3d to the same folder the single-instance flow would use
                try:
                    bpy.ops.mari.export_mari('EXEC_DEFAULT')
                    self.report({'INFO'}, "Completed (exported .mari3d).")
                except Exception as e:
                    self.report({'WARNING'}, f"Completed (but .mari3d export failed: {e})")
                return {'FINISHED'}
        return {'RUNNING_MODAL'}


# Stops the running manager, if any, and clears the module-level reference.
class MGPU_OT_cancel(bpy.types.Operator):
    bl_idname = "render.multi_gpu_frames_cancel"
    bl_label = "Cancel Multi-GPU Frames"
    bl_options = {'INTERNAL'}

    def execute(self, context):
        global _MANAGER
        if _MANAGER:
            _MANAGER.stop(); _MANAGER = None
            self.report({'INFO'}, "Multi-GPU job cancelled.")
            return {'FINISHED'}
        self.report({'INFO'}, "No Multi-GPU job running.")
        return {'CANCELLED'}


# Opens the current manager's logs directory in the platform file browser.
class MGPU_OT_open_logs(bpy.types.Operator):
    bl_idname = "render.multi_gpu_frames_open_logs"
    bl_label = "Open Logs Folder"
    bl_options = {'INTERNAL'}

    def execute(self, context):
        global _MANAGER
        if not _MANAGER or not _MANAGER.logs_dir:
            self.report({'ERROR'}, "No job/logs available.")
            return {'CANCELLED'}
        path = _MANAGER.logs_dir
        if IS_WIN:
            os.startfile(path)  # noqa
        elif IS_MAC:
            subprocess.call(["open", path])
        else:
            subprocess.call(["xdg-open", path])
        return {'FINISHED'}
"instances_per_gpu") row = box.row(align=True) row.prop(p, "use_persistent_data") row.prop(p, "open_terminals") row = box.row(align=True) row.prop(p, "denoise_on_gpu") row = box.row(align=True) row.prop(p, "use_target_dir_for_video_temp") box.label(text="Launch diagnostics: check console/logs for [MGPU-LAUNCH] reason codes.") if _mgpu_is_video(context.scene): temp_dir = _mgpu_video_temp_dir_for( context.scene, use_target_dir=bool(getattr(p, "use_target_dir_for_video_temp", True)) ) box.label(text=f"Video temp folder: {_shorten_path(temp_dir)}") box2 = layout.box() box2.label(text="GPU Detection") row = box2.row(align=True) row.alert = True row.label(text="Please Find your Correct number/listing of GPUs") row = box2.row(align=True) row.prop(p, "ghost_filter_mode", expand=True) backend = _current_compute_type() legacy = _detect_gpu_devices_legacy(False) strict = _detect_gpu_devices_strict(True) final = _detect_gpu_devices_final_from_lists(p.ghost_filter_mode, legacy, strict) box2.label(text=f"Compute backend: {backend}") box2.label(text="Note: GPU index order follows NVIDIA/nvidia-smi and may differ from Windows Task Manager numbering.") final_note = "" if (not final) and (bpy.context.scene.render.engine == "CYCLES"): final_note = " (no mapped GPU; CPU worker only if Cycles CPU device is enabled)" box2.label(text=f"Legacy:{len(legacy)} Strict:{len(strict)} Final:{len(final)}{final_note}") if len(final) < len(strict): box2.label(text="Note: Final < Strict; ghost filter may be excluding one or more GPUs.") mapped_all = _map_selection_to_uuids(final) mapped, dropped_unknown = _filter_known_mapped_gpus(mapped_all) if dropped_unknown: box2.label(text=f"Hidden unresolved GPU entries: {len(dropped_unknown)} (index '?').") if len(mapped) < len(final): box2.label(text="Note: Mapping lost devices; check [MGPU-GPUSEL] and [MGPU-LAUNCH] logs.") phys = _win_query_nvidia_smi_detailed() or [] if phys: mapped_phys = {m.get("phys_index") for m in mapped if m.get("phys_index") is not 
None} if len(mapped_phys) < len(phys): phys_idx = {g.get("index") for g in phys if g.get("index") is not None} missing_idx = sorted([i for i in phys_idx if i not in mapped_phys]) box2.label(text=f"Warning: NVIDIA physical GPUs={len(phys)} but mapped={len(mapped_phys)}.") if missing_idx: box2.label(text=f"Unmapped NVIDIA index(es): {', '.join(str(i) for i in missing_idx)}") box2.label(text="Missing GPUs can be backend/type filtered; check [MGPU-GPUSEL] logs.") for m in mapped: label = f" [GPU {m['phys_index'] if m['phys_index'] is not None else '?'}] {m.get('name') or '?'}" c_nm = str(m.get("cycles_name") or "") p_nm = str(m.get("name") or "") if c_nm and _normalize_gpu_name(c_nm) != _normalize_gpu_name(p_nm): label += f" (Cycles row: {c_nm})" box2.label(text=label) est = layout.box() rss = _proc_rss_bytes() avail = _sys_mem_available_bytes() n_workers = (len(mapped) if mapped else 1) * (p.instances_per_gpu if mapped else 1) per_child = None if rss is None else max(int(rss * 0.8), 512 * 1024 * 1024) total_need = None if (per_child is None) else per_child * n_workers msg = f"Instances planned: {n_workers} | Blender RSS: {_fmt_bytes(rss) if rss else '?'}" est.label(text=msg) if total_need is not None and avail is not None: risk = " (high risk of OOM)" if total_need > avail*0.8 else "" est.label(text=f"Estimated RAM needed: {_fmt_bytes(total_need)} | Free: {_fmt_bytes(avail)}{risk}") else: est.label(text="RAM estimate not available on this platform (ok to ignore).") if not _mgpu_has_mari_addon(): ad = layout.box() ad.label(text="Render & Share Holographic 3D Images!!") row = ad.row(align=True) row.scale_y = 1.3 row.alert = True row.operator("wm.url_open", text="holomari.com", icon='URL').url = "https://holomari.com/info/index" # ----------------------- registration ----------------------- class MGPU_RuntimePrefsReg(bpy.types.AddonPreferences): bl_idname = ADDON_KEY def draw(self, context): self.layout.label(text="Use the panel in Render Properties â–¸ Multi-GPU 
Frames.") def _add_keymap(): """Bind our operator to Ctrl+F12 (animation), and remove any old F12 binding we created.""" kc = bpy.context.window_manager.keyconfigs.addon if not kc: return for km_name in ("Screen", "Window"): km = kc.keymaps.get(km_name) if not km: continue for kmi in list(km.keymap_items): if kmi.idname == "render.multi_gpu_frames" and kmi.type == 'F12' and not kmi.ctrl: try: km.keymap_items.remove(kmi) except Exception: pass km = kc.keymaps.new(name="Screen", space_type="EMPTY", region_type='WINDOW') kmi = km.keymap_items.new("render.multi_gpu_frames", 'F12', 'PRESS', ctrl=True) _KM_ITEMS.append((km, kmi)) def _remove_keymap(): """Remove only the keymap items we added during this session.""" for km, kmi in _KM_ITEMS: try: km.keymap_items.remove(kmi) except Exception: pass _KM_ITEMS.clear() def register(): bpy.utils.register_class(MGPU_RuntimePrefs) bpy.utils.register_class(MGPU_RuntimePrefsReg) bpy.utils.register_class(MGPU_OT_render_frames) bpy.utils.register_class(MGPU_OT_cancel) bpy.utils.register_class(MGPU_OT_open_logs) bpy.utils.register_class(MGPU_PT_panel) bpy.utils.register_class(MGPU_OT_render_mari) bpy.types.WindowManager.mgpu_runtime_prefs = bpy.props.PointerProperty(type=MGPU_RuntimePrefs) _add_keymap() def unregister(): _remove_keymap() try: del bpy.types.WindowManager.mgpu_runtime_prefs except Exception: pass for cls in [ MGPU_PT_panel, MGPU_OT_open_logs, MGPU_OT_cancel, MGPU_OT_render_frames, MGPU_OT_render_mari, # <-- add this MGPU_RuntimePrefsReg, MGPU_RuntimePrefs, ]: try: bpy.utils.unregister_class(cls) except Exception: pass