integrate zipseglimit

This commit is contained in:
2026-02-23 10:26:21 -07:00
parent 2672b7bb84
commit b173331155
3 changed files with 2100 additions and 61 deletions
File diff suppressed because one or more lines are too long
+2 -1
View File
@@ -3,5 +3,6 @@
"structDir": "A:\\1 Amazon_Active_Projects\\3 ProjectStructure", "structDir": "A:\\1 Amazon_Active_Projects\\3 ProjectStructure",
"zipper": "7z", "zipper": "7z",
"compression": 0, "compression": 0,
"Max7zInst": 0 "Max7zInst": 0,
"zipsegLimit": "2G"
} }
+157 -58
View File
@@ -57,6 +57,7 @@ DEFAULT_CONFIG = {
"compressionMethod": "LZMA2", # Compression method: LZMA2 (multi-threaded), PPMd (single-threaded), BZip2, Deflate "compressionMethod": "LZMA2", # Compression method: LZMA2 (multi-threaded), PPMd (single-threaded), BZip2, Deflate
"dailyFormat": "daily_YYMMDD", "dailyFormat": "daily_YYMMDD",
"Max7zInst": 0, # Maximum concurrent 7z instances (0 = auto-calculate) "Max7zInst": 0, # Maximum concurrent 7z instances (0 = auto-calculate)
"zipsegLimit": "2G", # Max bytes per 7z segment (e.g. "2G", "2GB"); 0 or omit = no segmenting
} }
@@ -145,6 +146,42 @@ if ZIPPER_TYPE == "7z":
SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za") SEVEN_Z_EXE = shutil.which("7z") or shutil.which("7za")
def _parse_zipseg_limit(value: str | int | float | None) -> int | None:
"""Parse zipsegLimit config to bytes. Returns None if 0 or disabled."""
if value is None:
return None
if isinstance(value, (int, float)):
v = int(value)
return None if v <= 0 else v
s = str(value).strip().upper().rstrip("B")
if not s or s == "0":
return None
num_str = ""
for c in s:
if c in "0123456789.":
num_str += c
else:
break
unit = s[len(num_str):] or "B"
try:
num = float(num_str) if "." in num_str else int(num_str)
except ValueError:
return None
if num <= 0:
return None
mult = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3}
factor = mult.get(unit, 1)
return int(num * factor)
ZIPSEG_LIMIT_BYTES: int | None = None
if ZIPPER_TYPE == "7z":
raw = CONFIG.get("zipsegLimit", "2G")
ZIPSEG_LIMIT_BYTES = _parse_zipseg_limit(raw)
if ZIPSEG_LIMIT_BYTES is not None and ZIPSEG_LIMIT_BYTES < 100 * 1024 * 1024:
ZIPSEG_LIMIT_BYTES = 100 * 1024 * 1024 # min 100MB
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Sync render sequences with zipped archives.") parser = argparse.ArgumentParser(description="Sync render sequences with zipped archives.")
parser.add_argument( parser.add_argument(
@@ -658,10 +695,16 @@ def archive_path_for(seq_dir: Path) -> Path:
return ARCHIVE_ROOT / f"{rel}{suffix}" return ARCHIVE_ROOT / f"{rel}{suffix}"
def base_archive_path(zip_path: Path) -> Path:
    """Strip a numeric volume suffix from a segmented 7z path.

    path.7z.001 -> path.7z; any non-segmented path is returned unchanged.
    """
    suffix = zip_path.suffix
    is_volume = bool(suffix) and suffix[1:].isdigit() and zip_path.stem.endswith(".7z")
    return zip_path.parent / zip_path.stem if is_volume else zip_path
def sequence_dir_for(zip_path: Path) -> Path: def sequence_dir_for(zip_path: Path) -> Path:
rel = zip_path.relative_to(ARCHIVE_ROOT) base = base_archive_path(zip_path)
# Remove the archive suffix (.7z or .zip) from the end rel = base.relative_to(ARCHIVE_ROOT)
# Handle both .7z and .zip extensions
rel_str = str(rel) rel_str = str(rel)
if rel_str.endswith(".7z"): if rel_str.endswith(".7z"):
rel_str = rel_str[:-3] rel_str = rel_str[:-3]
@@ -671,7 +714,16 @@ def sequence_dir_for(zip_path: Path) -> Path:
def state_path_for(zip_path: Path) -> Path: def state_path_for(zip_path: Path) -> Path:
return zip_path.with_suffix(zip_path.suffix + STATE_SUFFIX) base = base_archive_path(zip_path)
return base.with_suffix(base.suffix + STATE_SUFFIX)
def is_archive_present(zip_path: Path) -> bool:
    """Return True when the archive exists on disk, either as a single
    file (name.7z) or as the first volume of a segmented set (name.7z.001)."""
    first_segment = zip_path.with_name(zip_path.name + ".001")
    return zip_path.exists() or first_segment.exists()
def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None = None, worker_count: int = 1, *, verbose: bool = False) -> None: def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None = None, worker_count: int = 1, *, verbose: bool = False) -> None:
@@ -692,6 +744,12 @@ def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None
old_state_path = state_path_for(old_zip_path) old_state_path = state_path_for(old_zip_path)
if old_state_path.exists(): if old_state_path.exists():
old_state_path.unlink(missing_ok=True) old_state_path.unlink(missing_ok=True)
# Remove existing single-file archive and any segments so we don't leave stale files
if zip_path.exists():
zip_path.unlink()
for seg in zip_path.parent.glob(zip_path.name + ".*"):
if seg.suffix[1:].isdigit():
seg.unlink(missing_ok=True)
# Build list of files to archive with relative paths # Build list of files to archive with relative paths
# Sort files to ensure consistent archive ordering (matches compute_state) # Sort files to ensure consistent archive ordering (matches compute_state)
@@ -735,6 +793,17 @@ def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None
f"-mx={COMPRESSION_LEVEL}", f"-mx={COMPRESSION_LEVEL}",
"-t7z", # Use 7z format, not zip "-t7z", # Use 7z format, not zip
] ]
if ZIPSEG_LIMIT_BYTES is not None:
# Segment size for 7z: -v2g etc.
if ZIPSEG_LIMIT_BYTES >= 1024**3:
v_str = f"{ZIPSEG_LIMIT_BYTES // 1024**3}g"
elif ZIPSEG_LIMIT_BYTES >= 1024**2:
v_str = f"{ZIPSEG_LIMIT_BYTES // 1024**2}m"
elif ZIPSEG_LIMIT_BYTES >= 1024:
v_str = f"{ZIPSEG_LIMIT_BYTES // 1024}k"
else:
v_str = f"{ZIPSEG_LIMIT_BYTES}b"
cmd.append(f"-v{v_str}")
# Set compression method and memory/dictionary size based on method # Set compression method and memory/dictionary size based on method
# At compression level 0, use Copy (store) method for maximum speed # At compression level 0, use Copy (store) method for maximum speed
@@ -805,11 +874,28 @@ def zip_sequence(seq_dir: Path, zip_path: Path, per_job_memory_limit: int | None
error_msg += f"\nstdout: {result.stdout.strip()}" error_msg += f"\nstdout: {result.stdout.strip()}"
raise RuntimeError(f"7z compression failed: {error_msg}") raise RuntimeError(f"7z compression failed: {error_msg}")
# Move temp zip to final location, replacing any existing file # Move temp archive to final location
if zip_path.exists(): if ZIPSEG_LIMIT_BYTES is not None:
zip_path.unlink() segments = sorted(
temp_zip.replace(zip_path) s for s in temp_zip.parent.glob(temp_zip.name + ".*")
temp_zip = None # Mark as moved so we don't delete it if s.suffix[1:].isdigit()
)
if len(segments) == 1:
# Single segment: keep legacy naming (seqname.7z) for consistency
if zip_path.exists():
zip_path.unlink()
segments[0].replace(zip_path)
else:
# Multiple segments: use .7z.001, .7z.002, ...
for seg in segments:
dest = zip_path.parent / (zip_path.name + seg.suffix)
seg.replace(dest)
temp_zip = None
else:
if zip_path.exists():
zip_path.unlink()
temp_zip.replace(zip_path)
temp_zip = None # Mark as moved so we don't delete it
finally: finally:
# Clean up temp zip if it wasn't moved # Clean up temp zip if it wasn't moved
if temp_zip and temp_zip.exists(): if temp_zip and temp_zip.exists():
@@ -874,12 +960,21 @@ def expand_sequence(zip_path: Path, seq_state: dict, *, verbose: bool = False) -
"7z extraction requested but 7z executable not found in PATH. " "7z extraction requested but 7z executable not found in PATH. "
"Please install 7z or set zipper to 'zip' in config.json" "Please install 7z or set zipper to 'zip' in config.json"
) )
# Use meta.json "segmented" so we use single .7z or first segment .7z.001
segmented = seq_state.get("segmented", False)
if segmented:
extract_path = zip_path.parent / (zip_path.name + ".001")
elif zip_path.exists():
extract_path = zip_path
else:
first_seg = zip_path.parent / (zip_path.name + ".001")
extract_path = first_seg if first_seg.exists() else zip_path # backward compat: old segmented without flag
cmd = [ cmd = [
SEVEN_Z_EXE, SEVEN_Z_EXE,
"x", "x",
"-y", "-y",
"-mtc=on", # Preserve timestamps during extraction "-mtc=on", # Preserve timestamps during extraction
str(zip_path), str(extract_path),
f"-o{target_dir}", f"-o{target_dir}",
] ]
result = subprocess.run( result = subprocess.run(
@@ -973,7 +1068,10 @@ def expand_sequence(zip_path: Path, seq_state: dict, *, verbose: bool = False) -
def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, per_job_memory_limit: int | None, worker_count: int, *, verbose: bool) -> Sequence[Path]: def process_zip(seq_dir: Path, zip_path: Path, state_path: Path, seq_state: dict, per_job_memory_limit: int | None, worker_count: int, *, verbose: bool) -> Sequence[Path]:
log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose) log("zip", f"{seq_dir} -> {zip_path}", verbose_only=True, verbose=verbose)
zip_sequence(seq_dir, zip_path, per_job_memory_limit, worker_count, verbose=verbose) zip_sequence(seq_dir, zip_path, per_job_memory_limit, worker_count, verbose=verbose)
state_path.write_text(json.dumps(seq_state, indent=2)) # Record whether archive is single file (seqname.7z) or segmented (seqname.7z.001, ...) for expand
segmented = not zip_path.exists() and (zip_path.parent / (zip_path.name + ".001")).exists()
out_state = {**seq_state, "segmented": segmented}
state_path.write_text(json.dumps(out_state, indent=2))
return (zip_path, state_path) return (zip_path, state_path)
@@ -1007,7 +1105,7 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
# Debug: log if stored state is missing # Debug: log if stored state is missing
if stored_state is None: if stored_state is None:
if zip_path.exists(): if is_archive_present(zip_path):
if verbose: if verbose:
log("scan", f"Warning: {rel} archive exists but no stored state file found at {state_path}", verbose_only=True, verbose=verbose) log("scan", f"Warning: {rel} archive exists but no stored state file found at {state_path}", verbose_only=True, verbose=verbose)
else: else:
@@ -1023,11 +1121,11 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
old_state_path = state_path_for(old_zip_path) old_state_path = state_path_for(old_zip_path)
old_stored_state = load_state(old_state_path) old_stored_state = load_state(old_state_path)
# If old .zip exists and .7z doesn't, use old .zip's state for comparison # If old .zip exists and .7z doesn't, use old .zip's state for comparison
if not zip_path.exists() and old_stored_state is not None: if not is_archive_present(zip_path) and old_stored_state is not None:
stored_state = old_stored_state stored_state = old_stored_state
# If .7z archive exists and we have stored state, do quick check before computing full state # If .7z archive exists and we have stored state, do quick check before computing full state
if zip_path.exists() and stored_state is not None: if is_archive_present(zip_path) and stored_state is not None:
# Quick check: if directory mtime is older than archive, likely unchanged # Quick check: if directory mtime is older than archive, likely unchanged
# But first verify that all files in stored state still exist (catches deletions) # But first verify that all files in stored state still exist (catches deletions)
try: try:
@@ -1045,7 +1143,8 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
# Only do mtime check if all stored files still exist # Only do mtime check if all stored files still exist
if stored_files_exist: if stored_files_exist:
dir_mtime = seq_dir.stat().st_mtime_ns dir_mtime = seq_dir.stat().st_mtime_ns
archive_mtime = zip_path.stat().st_mtime_ns archive_file = zip_path if zip_path.exists() else (zip_path.parent / (zip_path.name + ".001"))
archive_mtime = archive_file.stat().st_mtime_ns
# If directory wasn't modified since archive was created, skip state computation # If directory wasn't modified since archive was created, skip state computation
if dir_mtime <= archive_mtime: if dir_mtime <= archive_mtime:
quick_skipped += 1 quick_skipped += 1
@@ -1086,8 +1185,8 @@ def run_zip(requested_workers: int | None, *, verbose: bool) -> int:
state_skipped += 1 state_skipped += 1
if state_skipped <= 5: if state_skipped <= 5:
log("scan", f"{rel} metadata unchanged; archive up to date") log("scan", f"{rel} metadata unchanged; archive up to date")
if zip_path.exists(): if is_archive_present(zip_path):
# .7z exists and is up to date, clean up old .zip if it exists # .7z (or segments) exist and are up to date, clean up old .zip if it exists
if old_zip_path and old_zip_path.exists(): if old_zip_path and old_zip_path.exists():
old_zip_path.unlink(missing_ok=True) old_zip_path.unlink(missing_ok=True)
old_state_path = state_path_for(old_zip_path) old_state_path = state_path_for(old_zip_path)
@@ -1177,22 +1276,25 @@ def run_expand(worker_count: int, *, verbose: bool) -> int:
return 0 return 0
work_items: list[tuple[Path, dict]] = [] work_items: list[tuple[Path, dict]] = []
seen_bases: set[Path] = set()
# Look for both .zip and .7z archives # Look for .zip, .7z, and .7z.001 (segmented) so each logical archive is processed once
archive_patterns = ["*.zip", "*.7z"] archive_patterns = ["*.zip", "*.7z", "*.7z.001"]
for pattern in archive_patterns: for pattern in archive_patterns:
for zip_path in ARCHIVE_ROOT.rglob(pattern): for zip_path in ARCHIVE_ROOT.rglob(pattern):
state_path = state_path_for(zip_path) base = base_archive_path(zip_path)
if base in seen_bases:
continue
state_path = state_path_for(base)
seq_state = load_state(state_path) seq_state = load_state(state_path)
if seq_state is None: if seq_state is None:
log("expand", f"Skipping {zip_path} (missing metadata)") log("expand", f"Skipping {base} (missing metadata)")
continue continue
target_dir = sequence_dir_for(base)
target_dir = sequence_dir_for(zip_path)
if current_state(target_dir) == seq_state: if current_state(target_dir) == seq_state:
continue continue
seen_bases.add(base)
work_items.append((zip_path, seq_state)) work_items.append((base, seq_state))
if not work_items: if not work_items:
log("expand", "Working folders already match archives; nothing to expand.") log("expand", "Working folders already match archives; nothing to expand.")
@@ -1224,53 +1326,45 @@ def cleanup_orphan_archives(*, verbose: bool) -> int:
return 0 return 0
removed: list[Path] = [] removed: list[Path] = []
seen_bases: set[Path] = set()
log("zip", f"Scanning for orphan archives in {ARCHIVE_ROOT.resolve()}", verbose_only=True, verbose=verbose) log("zip", f"Scanning for orphan archives in {ARCHIVE_ROOT.resolve()}", verbose_only=True, verbose=verbose)
# Look for both .zip and .7z archives # Look for .zip, .7z, and .7z.001 (segmented) so each logical archive is processed once by base path
archive_patterns = ["*.zip", "*.7z"] archive_patterns = ["*.zip", "*.7z", "*.7z.001"]
for pattern in archive_patterns: for pattern in archive_patterns:
try: try:
for zip_path in ARCHIVE_ROOT.rglob(pattern): for zip_path in ARCHIVE_ROOT.rglob(pattern):
try: try:
# Resolve to absolute paths for consistent checking base = base_archive_path(zip_path)
zip_path_abs = zip_path.resolve() base_resolved = base.resolve()
if base_resolved in seen_bases:
# Calculate state path BEFORE checking/removing archive
state_path = state_path_for(zip_path)
state_path_abs = state_path.resolve()
# Calculate sequence directory using sequence_dir_for
# This function works with paths relative to ARCHIVE_ROOT
seq_dir = sequence_dir_for(zip_path)
seq_dir_abs = seq_dir.resolve()
# Check if sequence directory exists and is actually a directory
if seq_dir_abs.exists() and seq_dir_abs.is_dir():
log("zip", f"Archive {zip_path.relative_to(ARCHIVE_ROOT)} has matching sequence directory; keeping", verbose_only=True, verbose=verbose)
continue continue
seen_bases.add(base_resolved)
# Sequence directory doesn't exist - this is an orphan archive state_path = state_path_for(base)
rel = zip_path.relative_to(ARCHIVE_ROOT) state_path_abs = state_path.resolve()
seq_dir = sequence_dir_for(base)
seq_dir_abs = seq_dir.resolve()
if seq_dir_abs.exists() and seq_dir_abs.is_dir():
log("zip", f"Archive {base.relative_to(ARCHIVE_ROOT)} has matching sequence directory; keeping", verbose_only=True, verbose=verbose)
continue
rel = base.relative_to(ARCHIVE_ROOT)
log("zip", f"Removing orphan archive {rel}", verbose_only=False, verbose=verbose) log("zip", f"Removing orphan archive {rel}", verbose_only=False, verbose=verbose)
if base.exists():
# Remove archive file base.unlink()
if zip_path_abs.exists():
zip_path_abs.unlink()
log("zip", f"Deleted archive: {rel}", verbose_only=True, verbose=verbose) log("zip", f"Deleted archive: {rel}", verbose_only=True, verbose=verbose)
for seg in base.parent.glob(base.name + ".*"):
# Remove state file if it exists if seg.suffix[1:].isdigit():
seg.unlink(missing_ok=True)
log("zip", f"Deleted segment: {seg.relative_to(ARCHIVE_ROOT)}", verbose_only=True, verbose=verbose)
if state_path_abs.exists(): if state_path_abs.exists():
state_path_abs.unlink() state_path_abs.unlink()
state_rel = state_path.relative_to(ARCHIVE_ROOT) log("zip", f"Removed orphan metadata {state_path.relative_to(ARCHIVE_ROOT)}", verbose_only=False, verbose=verbose)
log("zip", f"Removed orphan metadata {state_rel}", verbose_only=False, verbose=verbose) removed.append(base.resolve())
removed.append(zip_path_abs)
except Exception as e: except Exception as e:
# Log error but continue processing other archives
try: try:
rel = zip_path.relative_to(ARCHIVE_ROOT) rel = zip_path.relative_to(ARCHIVE_ROOT)
except: except Exception:
rel = zip_path rel = zip_path
log("zip", f"Error processing archive {rel}: {e}", verbose_only=True, verbose=verbose) log("zip", f"Error processing archive {rel}: {e}", verbose_only=True, verbose=verbose)
log("zip", f"Traceback: {traceback.format_exc()}", verbose_only=True, verbose=verbose) log("zip", f"Traceback: {traceback.format_exc()}", verbose_only=True, verbose=verbose)
@@ -1309,7 +1403,12 @@ def main() -> int:
if ZIPPER_TYPE == "7z": if ZIPPER_TYPE == "7z":
exe = SEVEN_Z_EXE or "not found" exe = SEVEN_Z_EXE or "not found"
max_inst = MAX_7Z_INSTANCES if MAX_7Z_INSTANCES is not None else "auto" max_inst = MAX_7Z_INSTANCES if MAX_7Z_INSTANCES is not None else "auto"
log("init", f"7z executable: {exe}, Max7zInst: {max_inst}, method: {COMPRESSION_METHOD}, level: {COMPRESSION_LEVEL}") if ZIPSEG_LIMIT_BYTES:
zipseg_gb = ZIPSEG_LIMIT_BYTES / (1024**3)
zipseg = f", zipsegLimit: {zipseg_gb:.1f}GB" if zipseg_gb >= 1 else f", zipsegLimit: {ZIPSEG_LIMIT_BYTES // (1024**2)}MB"
else:
zipseg = ""
log("init", f"7z executable: {exe}, Max7zInst: {max_inst}, method: {COMPRESSION_METHOD}, level: {COMPRESSION_LEVEL}{zipseg}")
if args.mode == "expand": if args.mode == "expand":
# For expand mode, use simple CPU-based worker calculation # For expand mode, use simple CPU-based worker calculation