432 lines
12 KiB
Python
432 lines
12 KiB
Python
# SPDX-FileCopyrightText: 2021 Blender Studio Tools Authors
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import json
|
|
import math
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Set, Union, Optional, List, Dict, Any, Tuple
|
|
|
|
import bpy
|
|
|
|
from . import vars, checksqe, util
|
|
from .. import prefs, cache
|
|
from ..sqe import opsdata as sqe_opsdata
|
|
from .exception import NoImageStripAvailableException
|
|
|
|
from ..logger import LoggerFactory
|
|
|
|
logger = LoggerFactory.getLogger()
|
|
|
|
|
|
copytree_list: List[Path] = []
|
|
copytree_num_of_items: int = 0
|
|
copytree_prev_printed_len: int = 0
|
|
|
|
|
|
def copytree_verbose(src: Union[str, Path], dest: Union[str, Path], **kwargs):
|
|
_copytree_init_progress_update(Path(src))
|
|
shutil.copytree(src, dest, copy_function=_copy2_tree_progress, **kwargs)
|
|
_copytree_clear_progress_update()
|
|
|
|
|
|
def _copytree_init_progress_update(source_dir: Path):
|
|
global copytree_num_of_items
|
|
file_list = [f for f in source_dir.glob("**/*") if f.is_file()]
|
|
copytree_num_of_items = len(file_list)
|
|
|
|
|
|
def _copy2_tree_progress(src, dst):
|
|
"""
|
|
Function that can be used for copy_function
|
|
argument on shutil.copytree function.
|
|
Logs every item that is currently copied.
|
|
"""
|
|
global copytree_num_of_items
|
|
global copytree_list
|
|
global copytree_prev_printed_len
|
|
|
|
term_col_len = shutil.get_terminal_size(fallback=(0, 0))[0]
|
|
if term_col_len != 0:
|
|
delete_lines = math.ceil(copytree_prev_printed_len / term_col_len)
|
|
else:
|
|
delete_lines = 0
|
|
|
|
copytree_list.append(Path(src))
|
|
progress = round((len(copytree_list) * 100) / copytree_num_of_items)
|
|
delete_prev_line = "\033[1A\x1b[2K"
|
|
message = "Copying %s (%i%%)" % (src, progress)
|
|
logger.warn(delete_lines * delete_prev_line + message)
|
|
shutil.copy2(src, dst)
|
|
|
|
copytree_prev_printed_len = len(message)
|
|
|
|
|
|
def _copytree_clear_progress_update():
|
|
global copytree_num_of_items
|
|
global copytree_list
|
|
global copytree_prev_printed_len
|
|
|
|
copytree_num_of_items = 0
|
|
copytree_list.clear()
|
|
copytree_prev_printed_len = 0
|
|
|
|
|
|
def get_valid_cs_sequences(
|
|
context: bpy.types.Context, sequence_list: List[bpy.types.Strip] = []
|
|
) -> List[bpy.types.Strip]:
|
|
|
|
sequences: List[bpy.types.Strip] = []
|
|
|
|
if sequence_list:
|
|
sequences = sequence_list
|
|
else:
|
|
sequences = context.selected_sequences or context.scene.sequence_editor.sequences_all
|
|
|
|
if cache.project_active_get():
|
|
|
|
valid_sequences = [
|
|
s
|
|
for s in sequences
|
|
if s.type in ["MOVIE", "IMAGE"] and not s.mute and not s.kitsu.initialized
|
|
]
|
|
else:
|
|
valid_sequences = [s for s in sequences if s.type in ["MOVIE", "IMAGE"] and not s.mute]
|
|
|
|
return valid_sequences
|
|
|
|
|
|
def get_frames_root_dir(strip: bpy.types.Strip) -> Path:
|
|
# sf = shot_frames | fo = farm_output.
|
|
addon_prefs = prefs.addon_prefs_get(bpy.context)
|
|
fo_dir = get_strip_folder(strip)
|
|
sf_dir = addon_prefs.frames_root_dir / fo_dir.parent.relative_to(fo_dir.parents[3])
|
|
|
|
return sf_dir
|
|
|
|
|
|
def get_strip_folder(strip: bpy.types.Strip) -> Path:
|
|
if hasattr(strip, 'directory'):
|
|
return Path(strip.directory)
|
|
else:
|
|
return Path(strip.filepath).parent
|
|
|
|
|
|
def get_shot_previews_path(strip: bpy.types.Strip) -> Path:
|
|
# Fo > farm_output.
|
|
addon_prefs = prefs.addon_prefs_get(bpy.context)
|
|
fo_dir = get_strip_folder(strip)
|
|
shot_previews_dir = addon_prefs.shot_playblast_root_dir / fo_dir.parent.relative_to(
|
|
fo_dir.parents[3]
|
|
)
|
|
|
|
return shot_previews_dir
|
|
|
|
|
|
def get_shot_dot_task_type(path: Path):
|
|
return path.parent.name
|
|
|
|
|
|
def get_farm_output_mp4_path(strip: bpy.types.Strip) -> Path:
|
|
render_dir = get_strip_folder(strip)
|
|
return get_farm_output_mp4_path_from_folder(render_dir)
|
|
|
|
|
|
def get_farm_output_mp4_path_from_folder(render_dir: str) -> Path:
|
|
render_dir = Path(render_dir)
|
|
shot_name = render_dir.parent.name
|
|
|
|
# 070_0040_A.lighting-101-136.mp4 #farm always does .lighting not .comp
|
|
# because flamenco writes in and out frame in filename we need check the first and
|
|
# last frame in the folder
|
|
preview_seq = get_best_preview_sequence(render_dir)
|
|
|
|
mp4_filename = f"{shot_name}-{int(preview_seq[0].stem)}-{int(preview_seq[-1].stem)}.mp4"
|
|
|
|
return render_dir / mp4_filename
|
|
|
|
|
|
def get_best_preview_sequence(dir: Path) -> List[Path]:
|
|
|
|
files: List[List[Path]] = gather_files_by_suffix(
|
|
dir, output=dict, search_suffixes=[".jpg", ".png"]
|
|
)
|
|
if not files:
|
|
raise NoImageStripAvailableException(f"No preview files found in: {dir.as_posix()}")
|
|
|
|
# Select the right images sequence.
|
|
if len(files) == 1:
|
|
# If only one image sequence available take that.
|
|
preview_seq = files[list(files.keys())[0]]
|
|
|
|
# Both jpg and png available.
|
|
else:
|
|
# If same amount of frames take png.
|
|
if len(files[".jpg"]) == len(files[".png"]):
|
|
preview_seq = files[".png"]
|
|
else:
|
|
# If not, take whichever is longest.
|
|
preview_seq = [files[".jpg"], files[".png"]].sort(key=lambda x: len(x))[-1]
|
|
|
|
return preview_seq
|
|
|
|
|
|
def get_shot_frames_backup_path(strip: bpy.types.Strip) -> Path:
|
|
fs_dir = get_frames_root_dir(strip)
|
|
return fs_dir.parent / f"_backup.{fs_dir.name}"
|
|
|
|
|
|
def get_shot_frames_metadata_path(strip: bpy.types.Strip) -> Path:
|
|
fs_dir = get_frames_root_dir(strip)
|
|
return fs_dir.parent / "metadata.json"
|
|
|
|
|
|
def get_shot_previews_metadata_path(strip: bpy.types.Strip) -> Path:
|
|
fs_dir = get_shot_previews_path(strip)
|
|
return fs_dir / "metadata.json"
|
|
|
|
|
|
def load_json(path: Path) -> Any:
|
|
with open(path.as_posix(), "r") as file:
|
|
obj = json.load(file)
|
|
return obj
|
|
|
|
|
|
def save_to_json(obj: Any, path: Path) -> None:
|
|
with open(path.as_posix(), "w") as file:
|
|
json.dump(obj, file, indent=4)
|
|
|
|
|
|
def update_sequence_statuses(
|
|
context: bpy.types.Context,
|
|
) -> List[bpy.types.Strip]:
|
|
return update_is_approved(context), update_is_pushed_to_edit(context)
|
|
|
|
|
|
def update_is_approved(
|
|
context: bpy.types.Context,
|
|
) -> List[bpy.types.Strip]:
|
|
sequences = [s for s in context.scene.sequence_editor.sequences_all if s.rr.is_render]
|
|
|
|
approved_strips = []
|
|
|
|
for s in sequences:
|
|
metadata_path = get_shot_frames_metadata_path(s)
|
|
if not metadata_path.exists():
|
|
continue
|
|
json_obj = load_json(metadata_path) # TODO: prevent opening same json multi times
|
|
|
|
if Path(json_obj["source_current"]) == get_strip_folder(s):
|
|
s.rr.is_approved = True
|
|
approved_strips.append(s)
|
|
logger.info("Detected approved strip: %s", s.name)
|
|
else:
|
|
s.rr.is_approved = False
|
|
|
|
return approved_strips
|
|
|
|
|
|
def update_is_pushed_to_edit(
|
|
context: bpy.types.Context,
|
|
) -> List[bpy.types.Strip]:
|
|
sequences = [s for s in context.scene.sequence_editor.sequences_all if s.rr.is_render]
|
|
|
|
pushed_strips = []
|
|
|
|
for s in sequences:
|
|
metadata_path = get_shot_previews_metadata_path(s)
|
|
if not metadata_path.exists():
|
|
continue
|
|
|
|
json_obj = load_json(metadata_path)
|
|
|
|
valid_paths = {Path(value).parent for _key, value in json_obj.items()}
|
|
|
|
if get_strip_folder(s) in valid_paths:
|
|
s.rr.is_pushed_to_edit = True
|
|
pushed_strips.append(s)
|
|
logger.info("Detected pushed strip: %s", s.name)
|
|
else:
|
|
s.rr.is_pushed_to_edit = False
|
|
|
|
return pushed_strips
|
|
|
|
|
|
def gather_files_by_suffix(
|
|
dir: Path, output=str, search_suffixes: List[str] = [".jpg", ".png", ".exr"]
|
|
) -> Union[str, List, Dict]:
|
|
"""
|
|
Gathers files in dir that end with an extension in search_suffixes.
|
|
Supported values for output: str, list, dict
|
|
"""
|
|
|
|
files: Dict[str, List[Path]] = {}
|
|
|
|
# Gather files.
|
|
for f in dir.iterdir():
|
|
if not f.is_file():
|
|
continue
|
|
|
|
for suffix in search_suffixes:
|
|
if f.suffix == suffix:
|
|
files.setdefault(suffix, [])
|
|
files[suffix].append(f)
|
|
|
|
# Sort.
|
|
for suffix, file_list in files.items():
|
|
files[suffix] = sorted(file_list, key=lambda f: f.name)
|
|
|
|
# Return.
|
|
if output == str:
|
|
return_str = ""
|
|
for suffix, file_list in files.items():
|
|
return_str += f" | {suffix}: {len(file_list)}"
|
|
|
|
# Replace first occurence, we dont want that at the beginning.
|
|
return_str = return_str.replace(" | ", "", 1)
|
|
|
|
return return_str
|
|
|
|
elif output == dict:
|
|
return files
|
|
|
|
elif output == list:
|
|
output_list = []
|
|
for suffix, file_list in files.items():
|
|
output_list.append(file_list)
|
|
|
|
return output_list
|
|
else:
|
|
raise ValueError(
|
|
f"Supported output types are: str, dict, list. {str(output)} not implemented yet."
|
|
)
|
|
|
|
|
|
def gen_frames_found_text(dir: Path, search_suffixes: List[str] = [".jpg", ".png", ".exr"]) -> str:
|
|
files_dict = gather_files_by_suffix(dir, output=dict, search_suffixes=search_suffixes)
|
|
|
|
frames_found_text = "" # frames found text will be used in ui
|
|
for suffix, file_list in files_dict.items():
|
|
frames_found_text += f" | {suffix}: {len(file_list)}"
|
|
|
|
# Replace first occurence, we dont want that at the beginning.
|
|
frames_found_text = frames_found_text.replace(
|
|
" | ",
|
|
"",
|
|
1,
|
|
)
|
|
return frames_found_text
|
|
|
|
|
|
def is_sequence_dir(dir: Path) -> bool:
|
|
return dir.parent.name == "shots"
|
|
|
|
|
|
def is_shot_dir(dir: Path) -> bool:
|
|
return dir.parent.parent.name == "shots"
|
|
|
|
|
|
def get_shot_name_from_dir(dir: Path) -> str:
|
|
return dir.stem # 060_0010_A.lighting > 060_0010_A
|
|
|
|
|
|
def get_image_editor(context: bpy.types.Context) -> Optional[bpy.types.Area]:
|
|
image_editor = None
|
|
|
|
for area in context.screen.areas:
|
|
if area.type == "IMAGE_EDITOR":
|
|
image_editor = area
|
|
|
|
return image_editor
|
|
|
|
|
|
def get_sqe_editor(context: bpy.types.Context) -> Optional[bpy.types.Area]:
|
|
sqe_editor = None
|
|
|
|
for area in context.screen.areas:
|
|
if area.type == "SEQUENCE_EDITOR":
|
|
sqe_editor = area
|
|
|
|
return sqe_editor
|
|
|
|
|
|
def fit_frame_range_to_strips(
|
|
context: bpy.types.Context, strips: Optional[List[bpy.types.Strip]] = None
|
|
) -> Tuple[int, int]:
|
|
def get_sort_tuple(strip: bpy.types.Strip) -> Tuple[int, int]:
|
|
return (strip.frame_final_start, strip.frame_final_duration)
|
|
|
|
if not strips:
|
|
strips = context.scene.sequence_editor.sequences_all
|
|
|
|
if not strips:
|
|
return (0, 0)
|
|
|
|
strips = list(strips)
|
|
strips.sort(key=get_sort_tuple)
|
|
|
|
context.scene.frame_start = strips[0].frame_final_start
|
|
context.scene.frame_end = strips[-1].frame_final_end - 1
|
|
|
|
return (context.scene.frame_start, context.scene.frame_end)
|
|
|
|
|
|
def get_top_level_valid_strips_continious(
|
|
context: bpy.types.Context,
|
|
) -> List[bpy.types.Strip]:
|
|
|
|
sequences_tmp = get_valid_cs_sequences(
|
|
context, sequence_list=list(context.scene.sequence_editor.sequences_all)
|
|
)
|
|
|
|
sequences_tmp.sort(key=lambda s: (s.channel, s.frame_final_start), reverse=True)
|
|
sequences: List[bpy.types.Strip] = []
|
|
|
|
for strip in sequences_tmp:
|
|
|
|
occ_ranges = checksqe.get_occupied_ranges_for_strips(sequences)
|
|
s_range = range(strip.frame_final_start, strip.frame_final_end + 1)
|
|
|
|
if not checksqe.is_range_occupied(s_range, occ_ranges):
|
|
sequences.append(strip)
|
|
|
|
return sequences
|
|
|
|
|
|
def setup_color_management(context: bpy.types.Context) -> None:
|
|
if context.scene.view_settings.view_transform != 'Standard':
|
|
context.scene.view_settings.view_transform = 'Standard'
|
|
logger.info("Set view transform to: Standard")
|
|
|
|
|
|
def is_active_project() -> bool:
|
|
return bool(cache.project_active_get())
|
|
|
|
|
|
def link_strip_by_name(
|
|
context: bpy.types.Context,
|
|
strip: bpy.types.Strip,
|
|
shot_name: str,
|
|
sequence_name: str,
|
|
) -> None:
|
|
# Get seq and shot.
|
|
active_project = cache.project_active_get()
|
|
seq = active_project.get_sequence_by_name(sequence_name)
|
|
shot = active_project.get_shot_by_name(seq, shot_name)
|
|
|
|
if not shot:
|
|
logger.error("Unable to find shot %s on kitsu", shot_name)
|
|
return
|
|
|
|
sqe_opsdata.link_metadata_strip(context, shot, seq, strip)
|
|
|
|
# Log.
|
|
t = "Linked strip: %s to shot: %s with ID: %s" % (
|
|
strip.name,
|
|
shot.name,
|
|
shot.id,
|
|
)
|
|
logger.info(t)
|
|
util.redraw_ui()
|