Files
blender-portable-repo/extensions/user_default/blenderkit/addon_updater.py
T
2026-03-17 14:30:01 -06:00

1788 lines
64 KiB
Python

# ##### BEGIN GPL LICENSE BLOCK #####
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####
# type: ignore
"""
See documentation for usage
https://github.com/CGCookie/blender-addon-updater
"""
__version__ = "1.1.1"
import errno
import fnmatch
import json
import os
import platform
import shutil
import ssl
import threading
import traceback
import urllib
import urllib.request
import zipfile
from datetime import datetime, timedelta
import addon_utils
# Blender imports, used in limited cases.
import bpy
# -----------------------------------------------------------------------------
# The main class
# -----------------------------------------------------------------------------
class SingletonUpdater:
"""Addon updater service class.
This is the singleton class to instance once and then reference where
needed throughout the addon. It implements all the interfaces for running
updates.
"""
def __init__(self):
self._engine = GithubEngine()
self._user = None
self._repo = None
self._website = None
self._current_version = None
self._subfolder_path = None
self._tags = list()
self._tag_latest = None
self._tag_names = list()
self._latest_release = None
self._use_releases = False
self._include_branches = False
self._include_branch_list = ["main"]
self._include_branch_auto_check = False
self._manual_only = False
self._version_min_update = None
self._version_max_update = None
# By default, backup current addon on update/target install.
self._backup_current = True
self._backup_ignore_patterns = None
# Set patterns the files to overwrite during an update.
self._overwrite_patterns = ["*.py", "*.pyc"]
self._remove_pre_update_patterns = list()
# By default, don't auto disable+re-enable the addon after an update,
# as this is less stable/often won't fully reload all modules anyways.
self._auto_reload_post_update = False
# Settings for the frequency of automated background checks.
self._enable_prereleases = False
self._check_interval_enabled = False
self._check_interval_months = 0
self._check_interval_days = 7
self._check_interval_hours = 0
self._check_interval_minutes = 0
# runtime variables, initial conditions
self._verbose = False
self._use_print_traces = True
self._fake_install = False
self._async_checking = False # only true when async daemon started
self._update_ready = None
self._update_link = None
self._update_version = None
self._source_zip = None
self._check_thread = None
self._select_link = None
self.skip_tag = None
# Get data from the running blender module (addon).
self._addon = __package__.lower()
self._addon_package = __package__ # Must not change.
self._updater_path = os.path.join(
os.path.dirname(__file__), self._addon + "_updater"
)
self._addon_root = os.path.dirname(__file__)
self._json = dict()
self._error = None
self._error_msg = None
self._prefiltered_tag_count = 0
# UI properties, not used within this module but still useful to have.
# to verify a valid import, in place of placeholder import
self.show_popups = True # UI uses to show popups or not.
self.invalid_updater = False
# pre-assign basic select-link function
def select_link_function(self, tag):
return tag["zipball_url"]
self._select_link = select_link_function
def print_trace(self):
"""Print handled exception details when use_print_traces is set"""
if self._use_print_traces:
traceback.print_exc()
def print_verbose(self, msg):
"""Print out a verbose logging message if verbose is true."""
if not self._verbose:
return
print("🔄 {}: ".format(self.addon) + msg)
# -------------------------------------------------------------------------
# Getters and setters
# -------------------------------------------------------------------------
@property
def addon(self):
return self._addon
@addon.setter
def addon(self, value):
self._addon = str(value)
@property
def api_url(self):
return self._engine.api_url
@api_url.setter
def api_url(self, value):
if not self.check_is_url(value):
raise ValueError("Not a valid URL: " + value)
self._engine.api_url = value
@property
def async_checking(self):
return self._async_checking
@property
def auto_reload_post_update(self):
return self._auto_reload_post_update
@auto_reload_post_update.setter
def auto_reload_post_update(self, value):
try:
self._auto_reload_post_update = bool(value)
except:
raise ValueError("auto_reload_post_update must be a boolean value")
@property
def backup_current(self):
return self._backup_current
@backup_current.setter
def backup_current(self, value):
if value is None:
self._backup_current = False
else:
self._backup_current = value
@property
def backup_ignore_patterns(self):
return self._backup_ignore_patterns
@backup_ignore_patterns.setter
def backup_ignore_patterns(self, value):
if value is None:
self._backup_ignore_patterns = None
elif not isinstance(value, list):
raise ValueError("Backup pattern must be in list format")
else:
self._backup_ignore_patterns = value
@property
def check_interval(self):
return (
self._enable_prereleases,
self._check_interval_enabled,
self._check_interval_months,
self._check_interval_days,
self._check_interval_hours,
self._check_interval_minutes,
)
@property
def current_version(self):
return self._current_version
@current_version.setter
def current_version(self, tuple_values):
if tuple_values is None:
self._current_version = None
return
elif type(tuple_values) is not tuple:
try:
tuple(tuple_values)
except:
raise ValueError("current_version must be a tuple of integers")
for i in tuple_values:
if type(i) is not int:
raise ValueError("current_version must be a tuple of integers")
self._current_version = tuple(tuple_values)
@property
def engine(self):
return self._engine.name
@engine.setter
def engine(self, value):
engine = value.lower()
if engine == "github":
self._engine = GithubEngine()
elif engine == "gitlab":
self._engine = GitlabEngine()
elif engine == "bitbucket":
self._engine = BitbucketEngine()
else:
raise ValueError("Invalid engine selection")
@property
def error(self):
return self._error
@property
def error_msg(self):
return self._error_msg
@property
def fake_install(self):
return self._fake_install
@fake_install.setter
def fake_install(self, value):
if not isinstance(value, bool):
raise ValueError("fake_install must be a boolean value")
self._fake_install = bool(value)
# not currently used
@property
def include_branch_auto_check(self):
return self._include_branch_auto_check
@include_branch_auto_check.setter
def include_branch_auto_check(self, value):
try:
self._include_branch_auto_check = bool(value)
except:
raise ValueError("include_branch_autocheck must be a boolean")
@property
def include_branch_list(self):
return self._include_branch_list
@include_branch_list.setter
def include_branch_list(self, value):
try:
if value is None:
self._include_branch_list = ["main"]
elif not isinstance(value, list) or len(value) == 0:
raise ValueError(
"include_branch_list should be a list of valid branches"
)
else:
self._include_branch_list = value
except:
raise ValueError("include_branch_list should be a list of valid branches")
@property
def include_branches(self):
return self._include_branches
@include_branches.setter
def include_branches(self, value):
try:
self._include_branches = bool(value)
except:
raise ValueError("include_branches must be a boolean value")
@property
def json(self):
if len(self._json) == 0:
self.set_updater_json()
return self._json
@property
def latest_release(self):
if self._latest_release is None:
return None
return self._latest_release
@property
def manual_only(self):
return self._manual_only
@manual_only.setter
def manual_only(self, value):
try:
self._manual_only = bool(value)
except:
raise ValueError("manual_only must be a boolean value")
@property
def overwrite_patterns(self):
return self._overwrite_patterns
@overwrite_patterns.setter
def overwrite_patterns(self, value):
if value is None:
self._overwrite_patterns = ["*.py", "*.pyc"]
elif not isinstance(value, list):
raise ValueError("overwrite_patterns needs to be in a list format")
else:
self._overwrite_patterns = value
@property
def private_token(self):
return self._engine.token
@private_token.setter
def private_token(self, value):
if value is None:
self._engine.token = None
else:
self._engine.token = str(value)
@property
def remove_pre_update_patterns(self):
return self._remove_pre_update_patterns
@remove_pre_update_patterns.setter
def remove_pre_update_patterns(self, value):
if value is None:
self._remove_pre_update_patterns = list()
elif not isinstance(value, list):
raise ValueError("remove_pre_update_patterns needs to be in a list format")
else:
self._remove_pre_update_patterns = value
@property
def repo(self):
return self._repo
@repo.setter
def repo(self, value):
try:
self._repo = str(value)
except:
raise ValueError("repo must be a string value")
@property
def select_link(self):
return self._select_link
@select_link.setter
def select_link(self, value):
# ensure it is a function assignment, with signature:
# input self, tag; returns link name
if not hasattr(value, "__call__"):
raise ValueError("select_link must be a function")
self._select_link = value
@property
def stage_path(self):
return self._updater_path
@stage_path.setter
def stage_path(self, value):
if value is None:
self.print_verbose("Aborting assigning stage_path, it's null")
return
elif value is not None and not os.path.exists(value):
try:
os.makedirs(value)
except:
self.print_verbose("Error trying to staging path")
self.print_trace()
return
self._updater_path = value
@property
def subfolder_path(self):
return self._subfolder_path
@subfolder_path.setter
def subfolder_path(self, value):
self._subfolder_path = value
@property
def tags(self):
if len(self._tags) == 0:
return list()
tag_names = list()
for tag in self._tags:
tag_names.append(tag["name"])
return tag_names
@property
def tag_latest(self):
if self._tag_latest is None:
return None
return self._tag_latest["name"]
@property
def update_link(self):
return self._update_link
@property
def update_ready(self):
return self._update_ready
@property
def update_version(self):
return self._update_version
@property
def use_releases(self):
return self._use_releases
@use_releases.setter
def use_releases(self, value):
try:
self._use_releases = bool(value)
except:
raise ValueError("use_releases must be a boolean value")
@property
def user(self):
return self._user
@user.setter
def user(self, value):
try:
self._user = str(value)
except:
raise ValueError("User must be a string value")
@property
def verbose(self):
return self._verbose
@verbose.setter
def verbose(self, value):
try:
self._verbose = bool(value)
self.print_verbose("Verbose is enabled")
except:
raise ValueError("Verbose must be a boolean value")
@property
def use_print_traces(self):
return self._use_print_traces
@use_print_traces.setter
def use_print_traces(self, value):
try:
self._use_print_traces = bool(value)
except:
raise ValueError("use_print_traces must be a boolean value")
@property
def version_max_update(self):
return self._version_max_update
@version_max_update.setter
def version_max_update(self, value):
if value is None:
self._version_max_update = None
return
if not isinstance(value, tuple):
raise ValueError("Version maximum must be a tuple")
for subvalue in value:
if type(subvalue) is not int:
raise ValueError("Version elements must be integers")
self._version_max_update = value
@property
def version_min_update(self):
return self._version_min_update
@version_min_update.setter
def version_min_update(self, value):
if value is None:
self._version_min_update = None
return
if not isinstance(value, tuple):
raise ValueError("Version minimum must be a tuple")
for subvalue in value:
if type(subvalue) != int:
raise ValueError("Version elements must be integers")
self._version_min_update = value
@property
def website(self):
return self._website
@website.setter
def website(self, value):
if not self.check_is_url(value):
raise ValueError("Not a valid URL: " + value)
self._website = value
# -------------------------------------------------------------------------
# Parameter validation related functions
# -------------------------------------------------------------------------
@staticmethod
def check_is_url(url):
if not ("http://" in url or "https://" in url):
return False
if "." not in url:
return False
return True
def _get_tag_names(self):
tag_names = list()
self.get_tags()
for tag in self._tags:
tag_names.append(tag["name"])
return tag_names
def set_check_interval(
self,
enable_prereleases=False,
enabled=False,
months=0,
days=14,
hours=0,
minutes=0,
):
"""Set the time interval between automated checks, and if enabled.
Has enabled = False as default to not check against frequency,
if enabled, default is 2 weeks.
"""
if type(enabled) is not bool:
raise ValueError("Enable must be a boolean value")
if type(months) is not int:
raise ValueError("Months must be an integer value")
if type(days) is not int:
raise ValueError("Days must be an integer value")
if type(hours) is not int:
raise ValueError("Hours must be an integer value")
if type(minutes) is not int:
raise ValueError("Minutes must be an integer value")
if not enabled:
self._check_interval_enabled = False
else:
self._check_interval_enabled = True
self._prereleases_enabled = enable_prereleases
self._check_interval_months = months
self._check_interval_days = days
self._check_interval_hours = hours
self._check_interval_minutes = minutes
def __repr__(self):
return "<Module updater from {a}>".format(a=__file__)
def __str__(self):
return "Updater, with user: {a}, repository: {b}, url: {c}".format(
a=self._user, b=self._repo, c=self.form_repo_url()
)
# -------------------------------------------------------------------------
# API-related functions
# -------------------------------------------------------------------------
def form_repo_url(self):
return self._engine.form_repo_url(self)
def form_tags_url(self):
return self._engine.form_tags_url(self)
def form_branch_url(self, branch):
return self._engine.form_branch_url(branch, self)
def get_tags(self):
request = self.form_tags_url()
self.print_verbose("Getting tags from server")
# get all tags, internet call
all_tags = self._engine.parse_tags(self.get_api(request), self)
if all_tags is not None:
self._prefiltered_tag_count = len(all_tags)
else:
self._prefiltered_tag_count = 0
all_tags = list()
# pre-process to skip tags
if self.skip_tag is not None:
self._tags = [tg for tg in all_tags if not self.skip_tag(self, tg)]
else:
self._tags = all_tags
# get additional branches too, if needed, and place in front
# Does NO checking here whether branch is valid
if self._include_branches:
temp_branches = self._include_branch_list.copy()
temp_branches.reverse()
for branch in temp_branches:
request = self.form_branch_url(branch)
include = {"name": branch.title(), "zipball_url": request}
self._tags = [include] + self._tags # append to front
if self._tags is None:
# some error occurred
self._tag_latest = None
self._tags = list()
elif self._prefiltered_tag_count == 0 and not self._include_branches:
self._tag_latest = None
if self._error is None: # if not None, could have had no internet
self._error = "No releases found"
self._error_msg = "No releases or tags found in repository"
self.print_verbose("No releases or tags found in repository")
elif self._prefiltered_tag_count == 0 and self._include_branches:
if not self._error:
self._tag_latest = self._tags[0]
branch = self._include_branch_list[0]
self.print_verbose(
"{} branch found, no releases: {}".format(branch, self._tags[0])
)
elif (
(
len(self._tags) - len(self._include_branch_list) == 0
and self._include_branches
)
or (len(self._tags) == 0 and not self._include_branches)
and self._prefiltered_tag_count > 0
):
self._tag_latest = None
self._error = "No releases available"
self._error_msg = "No versions found within compatible version range"
self.print_verbose(self._error_msg)
else:
if not self._include_branches:
self._tag_latest = self._tags[0]
self.print_verbose(
"Most recent tag found:" + str(self._tags[0]["name"])
)
else:
# Don't return branch if in list.
n = len(self._include_branch_list)
self._tag_latest = self._tags[n] # guaranteed at least len()=n+1
self.print_verbose(
"Most recent tag found:" + str(self._tags[n]["name"])
)
def get_raw(self, url):
"""All API calls to base url."""
request = urllib.request.Request(url)
try:
context = ssl._create_unverified_context()
except:
# Some blender packaged python versions don't have this, largely
# useful for local network setups otherwise minimal impact.
context = None
# Setup private request headers if appropriate.
if self._engine.token is not None:
if self._engine.name == "gitlab":
request.add_header("PRIVATE-TOKEN", self._engine.token)
else:
self.print_verbose("Tokens not setup for engine yet")
# Always set user agent.
request.add_header("User-Agent", "Python/" + str(platform.python_version()))
# Run the request.
try:
if context:
result = urllib.request.urlopen(request, context=context)
else:
result = urllib.request.urlopen(request)
except urllib.error.HTTPError as e:
if str(e.code) == "403":
self._error = "HTTP error (access denied)"
self._error_msg = str(e.code) + " - server error response"
print(self._error, self._error_msg)
else:
self._error = "HTTP error"
self._error_msg = str(e.code)
print(self._error, self._error_msg)
self.print_trace()
self._update_ready = None
except urllib.error.URLError as e:
reason = str(e.reason)
if "TLSV1_ALERT" in reason or "SSL" in reason.upper():
self._error = "Connection rejected, download manually"
self._error_msg = reason
print(self._error, self._error_msg)
else:
self._error = "URL error, check internet connection"
self._error_msg = reason
print(self._error, self._error_msg)
self.print_trace()
self._update_ready = None
return None
else:
result_string = result.read()
result.close()
return result_string.decode()
def get_api(self, url):
"""Result of all api calls, decoded into json format."""
get = None
get = self.get_raw(url)
if get is not None:
try:
return json.JSONDecoder().decode(get)
except Exception as e:
self._error = "API response has invalid JSON format"
self._error_msg = str(e.reason)
self._update_ready = None
print(self._error, self._error_msg)
self.print_trace()
return None
else:
return None
def stage_repository(self, url):
"""Create a working directory and download the new files"""
local = os.path.join(self._updater_path, "update_staging")
error = None
# Make/clear the staging folder, to ensure the folder is always clean.
self.print_verbose("Preparing staging folder for download:\n" + str(local))
if os.path.isdir(local):
try:
shutil.rmtree(local)
os.makedirs(local)
except:
error = "failed to remove existing staging directory"
self.print_trace()
else:
try:
os.makedirs(local)
except:
error = "failed to create staging directory"
self.print_trace()
if error is not None:
self.print_verbose("Error: Aborting update, " + error)
self._error = "Update aborted, staging path error"
self._error_msg = "Error: {}".format(error)
return False
if self._backup_current:
self.create_backup()
self.print_verbose("Now retrieving the new source zip")
self._source_zip = os.path.join(local, "source.zip")
self.print_verbose("Starting download update zip")
try:
request = urllib.request.Request(url)
context = ssl._create_unverified_context()
# Setup private token if appropriate.
if self._engine.token is not None:
if self._engine.name == "gitlab":
request.add_header("PRIVATE-TOKEN", self._engine.token)
else:
self.print_verbose("Tokens not setup for selected engine yet")
# Always set user agent
request.add_header("User-Agent", "Python/" + str(platform.python_version()))
self.url_retrieve(
urllib.request.urlopen(request, context=context), self._source_zip
)
# Add additional checks on file size being non-zero.
self.print_verbose("Successfully downloaded update zip")
return True
except Exception as e:
self._error = "Error retrieving download, bad link?"
self._error_msg = "Error: {}".format(e)
print("Error retrieving download, bad link?")
print("Error: {}".format(e))
self.print_trace()
return False
def create_backup(self):
"""Save a backup of the current installed addon prior to an update."""
self.print_verbose("Backing up current addon folder")
local = os.path.join(self._updater_path, "backup")
tempdest = os.path.join(
self._addon_root, os.pardir, self._addon + "_updater_backup_temp"
)
self.print_verbose("Backup destination path: " + str(local))
if os.path.isdir(local):
try:
shutil.rmtree(local)
except:
self.print_verbose(
"Failed to removed previous backup folder, continuing"
)
self.print_trace()
# Remove the temp folder.
# Shouldn't exist but could if previously interrupted.
if os.path.isdir(tempdest):
try:
shutil.rmtree(tempdest)
except:
self.print_verbose("Failed to remove existing temp folder, continuing")
self.print_trace()
# Make a full addon copy, temporarily placed outside the addon folder.
if self._backup_ignore_patterns is not None:
try:
shutil.copytree(
self._addon_root,
tempdest,
ignore=shutil.ignore_patterns(*self._backup_ignore_patterns),
)
except:
print("Failed to create backup, still attempting update.")
self.print_trace()
return
else:
try:
shutil.copytree(self._addon_root, tempdest)
except:
print("Failed to create backup, still attempting update.")
self.print_trace()
return
shutil.move(tempdest, local)
# Save the date for future reference.
now = datetime.now()
self._json["backup_date"] = "{m}-{d}-{yr}".format(
m=now.strftime("%B"), d=now.day, yr=now.year
)
self.save_updater_json()
def restore_backup(self):
"""Restore the last backed up addon version, user initiated only"""
self.print_verbose("Restoring backup, backing up current addon folder")
backuploc = os.path.join(self._updater_path, "backup")
tempdest = os.path.join(
self._addon_root, os.pardir, self._addon + "_updater_backup_temp"
)
tempdest = os.path.abspath(tempdest)
# Move instead contents back in place, instead of copy.
shutil.move(backuploc, tempdest)
shutil.rmtree(self._addon_root)
os.rename(tempdest, self._addon_root)
self._json["backup_date"] = ""
self._json["just_restored"] = True
self._json["just_updated"] = True
self.save_updater_json()
self.reload_addon()
def unpack_staged_zip(self, clean=False):
"""Unzip the downloaded file, and validate contents"""
if not os.path.isfile(self._source_zip):
self.print_verbose("Error, update zip not found")
self._error = "Install failed"
self._error_msg = "Downloaded zip not found"
return -1
# Clear the existing source folder in case previous files remain.
outdir = os.path.join(self._updater_path, "source")
try:
shutil.rmtree(outdir)
self.print_verbose("Source folder cleared")
except:
self.print_trace()
# Create parent directories if needed, would not be relevant unless
# installing addon into another location or via an addon manager.
try:
os.mkdir(outdir)
except Exception as err:
print("Error occurred while making extract dir:")
print(str(err))
self.print_trace()
self._error = "Install failed"
self._error_msg = "Failed to make extract directory"
return -1
if not os.path.isdir(outdir):
print("Failed to create source directory")
self._error = "Install failed"
self._error_msg = "Failed to create extract directory"
return -1
self.print_verbose("Begin extracting source from zip:" + str(self._source_zip))
with zipfile.ZipFile(self._source_zip, "r") as zfile:
if not zfile:
self._error = "Install failed"
self._error_msg = "Resulting file is not a zip, cannot extract"
self.print_verbose(self._error_msg)
return -1
# Now extract directly from the first subfolder (not root)
# this avoids adding the first subfolder to the path length,
# which can be too long if the download has the SHA in the name.
zsep = "/" # Not using os.sep, always the / value even on windows.
for name in zfile.namelist():
if zsep not in name:
continue
top_folder = name[: name.index(zsep) + 1]
if name == top_folder + zsep:
continue # skip top level folder
sub_path = name[name.index(zsep) + 1 :]
if name.endswith(zsep):
try:
os.mkdir(os.path.join(outdir, sub_path))
self.print_verbose(
"Extract - mkdir: " + os.path.join(outdir, sub_path)
)
except OSError as exc:
if exc.errno != errno.EEXIST:
self._error = "Install failed"
self._error_msg = "Could not create folder from zip"
self.print_trace()
return -1
else:
with open(os.path.join(outdir, sub_path), "wb") as outfile:
data = zfile.read(name)
outfile.write(data)
self.print_verbose(
"Extract - create: " + os.path.join(outdir, sub_path)
)
self.print_verbose("Extracted source")
unpath = os.path.join(self._updater_path, "source")
if not os.path.isdir(unpath):
self._error = "Install failed"
self._error_msg = "Extracted path does not exist"
print("Extracted path does not exist: ", unpath)
return -1
if self._subfolder_path:
self._subfolder_path.replace("/", os.path.sep)
self._subfolder_path.replace("\\", os.path.sep)
# Either directly in root of zip/one subfolder, or use specified path.
if not os.path.isfile(os.path.join(unpath, "__init__.py")):
dirlist = os.listdir(unpath)
if len(dirlist) > 0:
if self._subfolder_path == "" or self._subfolder_path is None:
unpath = os.path.join(unpath, dirlist[0])
else:
unpath = os.path.join(unpath, self._subfolder_path)
# Smarter check for additional sub folders for a single folder
# containing the __init__.py file.
if not os.path.isfile(os.path.join(unpath, "__init__.py")):
print("Not a valid addon found")
print("Paths:")
print(dirlist)
self._error = "Install failed"
self._error_msg = "No __init__ file found in new source"
return -1
# Merge code with the addon directory, using blender default behavior,
# plus any modifiers indicated by user (e.g. force remove/keep).
self.deep_merge_directory(self._addon_root, unpath, clean)
# Now save the json state.
# Change to True to trigger the handler on other side if allowing
# reloading within same blender session.
self._json["just_updated"] = True
self.save_updater_json()
self.reload_addon()
self._update_ready = False
return 0
def deep_merge_directory(self, base, merger, clean=False):
"""Merge folder 'merger' into 'base' without deleting existing"""
if not os.path.exists(base):
self.print_verbose("Base path does not exist:" + str(base))
return -1
elif not os.path.exists(merger):
self.print_verbose("Merger path does not exist")
return -1
# Path to be aware of and not overwrite/remove/etc.
staging_path = os.path.join(self._updater_path, "update_staging")
# If clean install is enabled, clear existing files ahead of time
# note: will not delete the update.json, update folder, staging, or
# staging but will delete all other folders/files in addon directory.
error = None
if clean:
try:
# Implement clearing of all folders/files, except the updater
# folder and updater json.
# Careful, this deletes entire subdirectories recursively...
# Make sure that base is not a high level shared folder, but
# is dedicated just to the addon itself.
self.print_verbose(
"clean=True, clearing addon folder to fresh install state"
)
# Remove root files and folders (except update folder).
files = [
f for f in os.listdir(base) if os.path.isfile(os.path.join(base, f))
]
folders = [
f for f in os.listdir(base) if os.path.isdir(os.path.join(base, f))
]
for f in files:
try:
os.remove(os.path.join(base, f))
self.print_verbose(
"Clean removing file {}".format(os.path.join(base, f))
)
except Exception as e:
print(f"Error removing file {os.path.join(base, f)}: {e}")
for f in folders:
if os.path.join(base, f) is self._updater_path:
continue
try:
shutil.rmtree(os.path.join(base, f))
self.print_verbose(
"Clean removing folder and contents {}".format(
os.path.join(base, f)
)
)
except Exception as e:
print(f"Error removing folder {os.path.join(base, f)}: {e}")
except Exception as err:
error = "failed to create clean existing addon folder"
print(error, str(err))
self.print_trace()
# Walk through the base addon folder for rules on pre-removing
# but avoid removing/altering backup and updater file.
for path, dirs, files in os.walk(base):
# Prune ie skip updater folder.
dirs[:] = [
d for d in dirs if os.path.join(path, d) not in [self._updater_path]
]
for file in files:
for pattern in self.remove_pre_update_patterns:
if fnmatch.filter([file], pattern):
try:
fl = os.path.join(path, file)
os.remove(fl)
self.print_verbose("Pre-removed file " + file)
except OSError:
print("Failed to pre-remove " + file)
self.print_trace()
# Walk through the temp addon sub folder for replacements
# this implements the overwrite rules, which apply after
# the above pre-removal rules. This also performs the
# actual file copying/replacements.
for path, dirs, files in os.walk(merger):
# Verify structure works to prune updater sub folder overwriting.
dirs[:] = [
d for d in dirs if os.path.join(path, d) not in [self._updater_path]
]
rel_path = os.path.relpath(path, merger)
dest_path = os.path.join(base, rel_path)
if not os.path.exists(dest_path):
os.makedirs(dest_path)
for file in files:
try:
# Bring in additional logic around copying/replacing.
# Blender default: overwrite .py's, don't overwrite the rest.
dest_file = os.path.join(dest_path, file)
srcFile = os.path.join(path, file)
# Decide to replace if file already exists, and copy new over.
if os.path.isfile(dest_file):
# Otherwise, check each file for overwrite pattern match.
replaced = False
for pattern in self._overwrite_patterns:
if fnmatch.filter([file], pattern):
replaced = True
break
if replaced:
os.remove(dest_file)
os.rename(srcFile, dest_file)
self.print_verbose(
"Overwrote file " + os.path.basename(dest_file)
)
else:
self.print_verbose(
"Pattern not matched to {}, not overwritten".format(
os.path.basename(dest_file)
)
)
else:
# File did not previously exist, simply move it over.
os.rename(srcFile, dest_file)
self.print_verbose("New file " + os.path.basename(dest_file))
except Exception as e:
print(f"Error replacing file {file}: {e}")
# now remove the temp staging folder and downloaded zip
try:
shutil.rmtree(staging_path)
except:
error = (
"Error: Failed to remove existing staging directory, "
"consider manually removing "
) + staging_path
self.print_verbose(error)
self.print_trace()
def reload_addon(self):
# if post_update false, skip this function
# else, unload/reload addon & trigger popup
if not self._auto_reload_post_update:
print("Restart blender to reload addon and complete update")
return
self.print_verbose("Reloading addon...")
addon_utils.modules(refresh=True)
bpy.utils.refresh_script_paths()
# not allowed in restricted context, such as register module
# toggle to refresh
if "addon_disable" in dir(bpy.ops.wm): # 2.7
bpy.ops.wm.addon_disable(module=self._addon_package)
bpy.ops.wm.addon_refresh()
bpy.ops.wm.addon_enable(module=self._addon_package)
print("2.7 reload complete")
else: # 2.8
bpy.ops.preferences.addon_disable(module=self._addon_package)
bpy.ops.preferences.addon_refresh()
bpy.ops.preferences.addon_enable(module=self._addon_package)
print("2.8 reload complete")
# -------------------------------------------------------------------------
# Other non-api functions and setups
# -------------------------------------------------------------------------
def clear_state(self):
self._update_ready = None
self._update_link = None
self._update_version = None
self._source_zip = None
self._error = None
self._error_msg = None
def url_retrieve(self, url_file, filepath):
"""Custom urlretrieve implementation"""
chunk = 1024 * 8
f = open(filepath, "wb")
while 1:
data = url_file.read(chunk)
if not data:
# print("done.")
break
f.write(data)
# print("Read %s bytes" % len(data))
f.close()
def version_tuple_from_text(self, text):
"""Convert text into a tuple of numbers (int).
Should go through string and remove all non-integers, and for any
given break split into a different section.
"""
if text is None:
return ()
segments = list()
tmp = ""
for char in str(text):
if not char.isdigit():
if len(tmp) > 0:
segments.append(int(tmp))
tmp = ""
else:
tmp += char
if len(tmp) > 0:
segments.append(int(tmp))
if len(segments) == 0:
self.print_verbose("No version strings found text: " + str(text))
if not self._include_branches:
return ()
else:
return text
return tuple(segments)
def check_for_update_async(self, callback=None):
"""Called for running check in a background thread"""
is_ready = (
self._json is not None
and "update_ready" in self._json
and self._json["version_text"] != dict()
and self._json["update_ready"]
)
if is_ready:
self._update_ready = True
self._update_link = self._json["version_text"]["link"]
self._update_version = str(self._json["version_text"]["version"])
# Cached update.
callback(True)
return
# do the check
if not self._check_interval_enabled:
return
elif self._async_checking:
self.print_verbose("Skipping async check, already started")
# already running the bg thread
elif self._update_ready is None:
print("{} updater: Running background check for update".format(self.addon))
self.start_async_check_update(False, callback)
def check_for_update_now(self, callback=None):
self._error = None
self._error_msg = None
self.print_verbose("Check update pressed, first getting current status")
if self._async_checking:
self.print_verbose("Skipping async check, already started")
return # already running the bg thread
elif self._update_ready is None:
self.start_async_check_update(True, callback)
else:
self._update_ready = None
self.start_async_check_update(True, callback)
def check_for_update(self, now=False):
"""Check for update not in a syncrhonous manner.
This function is not async, will always return in sequential fashion
but should have a parent which calls it in another thread.
"""
self.print_verbose("Checking for update function")
# clear the errors if any
self._error = None
self._error_msg = None
# avoid running again in, just return past result if found
# but if force now check, then still do it
if self._update_ready is not None and not now:
return (self._update_ready, self._update_version, self._update_link)
if self._current_version is None:
raise ValueError("current_version not yet defined")
if self._repo is None:
raise ValueError("repo not yet defined")
if self._user is None:
raise ValueError("username not yet defined")
self.set_updater_json() # self._json
if not now and not self.past_interval_timestamp():
self.print_verbose("Aborting check for updated, check interval not reached")
return (False, None, None)
# check if using tags or releases
# note that if called the first time, this will pull tags from online
if self._fake_install:
self.print_verbose("fake_install = True, setting fake version as ready")
self._update_ready = True
self._update_version = "(999,999,999)"
self._update_link = "http://127.0.0.1"
return (self._update_ready, self._update_version, self._update_link)
# Primary internet call, sets self._tags and self._tag_latest.
self.get_tags()
self._json["last_check"] = str(datetime.now())
self.save_updater_json()
# Can be () or ('master') in addition to branches, and version tag.
new_version = self.version_tuple_from_text(self.tag_latest)
if len(self._tags) == 0:
self._update_ready = False
self._update_version = None
self._update_link = None
return (False, None, None)
if not self._include_branches:
link = self.select_link(self, self._tags[0])
else:
n = len(self._include_branch_list)
if len(self._tags) == n:
# effectively means no tags found on repo
# so provide the first one as default
link = self.select_link(self, self._tags[0])
else:
link = self.select_link(self, self._tags[n])
if new_version == ():
self._update_ready = False
self._update_version = None
self._update_link = None
return (False, None, None)
elif str(new_version).lower() in self._include_branch_list:
# Handle situation where master/whichever branch is included
# however, this code effectively is not triggered now
# as new_version will only be tag names, not branch names.
if not self._include_branch_auto_check:
# Don't offer update as ready, but set the link for the
# default branch for installing.
self._update_ready = False
self._update_version = new_version
self._update_link = link
self.save_updater_json()
return (True, new_version, link)
else:
# Bypass releases and look at timestamp of last update from a
# branch compared to now, see if commit values match or not.
raise ValueError("include_branch_autocheck: NOT YET DEVELOPED")
else:
# Situation where branches not included.
if new_version > self._current_version:
self._update_ready = True
self._update_version = new_version
self._update_link = link
self.save_updater_json()
return (True, new_version, link)
# If no update, set ready to False from None to show it was checked.
self._update_ready = False
self._update_version = None
self._update_link = None
return (False, None, None)
def set_tag(self, name):
"""Assign the tag name and url to update to"""
tg = None
for tag in self._tags:
if name == tag["name"]:
tg = tag
break
if tg:
new_version = self.version_tuple_from_text(self.tag_latest)
self._update_version = new_version
self._update_link = self.select_link(self, tg)
elif self._include_branches and name in self._include_branch_list:
# scenario if reverting to a specific branch name instead of tag
tg = name
link = self.form_branch_url(tg)
self._update_version = name # this will break things
self._update_link = link
if not tg:
raise ValueError("Version tag not found: " + name)
def run_update(self, force=False, revert_tag=None, clean=False, callback=None):
"""Runs an install, update, or reversion of an addon from online source
Arguments:
force: Install assigned link, even if self.update_ready is False
revert_tag: Version to install, if none uses detected update link
clean: not used, but in future could use to totally refresh addon
callback: used to run function on update completion
"""
self._json["update_ready"] = False
self._json["ignore"] = False # clear ignore flag
self._json["version_text"] = dict()
if revert_tag is not None:
self.set_tag(revert_tag)
self._update_ready = True
# clear the errors if any
self._error = None
self._error_msg = None
self.print_verbose("Running update")
if self._fake_install:
# Change to True, to trigger the reload/"update installed" handler.
self.print_verbose("fake_install=True")
self.print_verbose("Just reloading and running any handler triggers")
self._json["just_updated"] = True
self.save_updater_json()
if self._backup_current is True:
self.create_backup()
self.reload_addon()
self._update_ready = False
res = True # fake "success" zip download flag
elif not force:
if not self._update_ready:
self.print_verbose("Update stopped, new version not ready")
if callback:
callback(
self._addon_package, "Update stopped, new version not ready"
)
return "Update stopped, new version not ready"
elif self._update_link is None:
# this shouldn't happen if update is ready
self.print_verbose("Update stopped, update link unavailable")
if callback:
callback(
self._addon_package, "Update stopped, update link unavailable"
)
return "Update stopped, update link unavailable"
if revert_tag is None:
self.print_verbose("Staging update")
else:
self.print_verbose("Staging install")
res = self.stage_repository(self._update_link)
if not res:
print("Error in staging repository: " + str(res))
if callback is not None:
callback(self._addon_package, self._error_msg)
return self._error_msg
res = self.unpack_staged_zip(clean)
if res < 0:
if callback:
callback(self._addon_package, self._error_msg)
return res
else:
if self._update_link is None:
self.print_verbose("Update stopped, could not get link")
return "Update stopped, could not get link"
self.print_verbose("Forcing update")
res = self.stage_repository(self._update_link)
if not res:
print("Error in staging repository: " + str(res))
if callback:
callback(self._addon_package, self._error_msg)
return self._error_msg
res = self.unpack_staged_zip(clean)
if res < 0:
return res
# would need to compare against other versions held in tags
# run the front-end's callback if provided
if callback:
callback(self._addon_package)
# return something meaningful, 0 means it worked
return 0
def past_interval_timestamp(self):
if self._prereleases_enabled:
self.print_verbose("Prereleases enabled. Checking for updates now!")
return True
if not self._check_interval_enabled:
return True # ie this exact feature is disabled
if "last_check" not in self._json or self._json["last_check"] == "":
return True
now = datetime.now()
last_check = datetime.strptime(self._json["last_check"], "%Y-%m-%d %H:%M:%S.%f")
offset = timedelta(
days=self._check_interval_days + 30 * self._check_interval_months,
hours=self._check_interval_hours,
minutes=self._check_interval_minutes,
)
delta = (now - offset) - last_check
if delta.total_seconds() > 0:
self.print_verbose("Time to check for updates!")
return True
self.print_verbose("Determined it's not yet time to check for updates")
return False
def get_json_path(self):
"""Returns the full path to the JSON state file used by this updater.
Will also rename old file paths to addon-specific path if found.
"""
json_path = os.path.join(
self._updater_path, "{}_updater_status.json".format(self._addon_package)
)
old_json_path = os.path.join(self._updater_path, "updater_status.json")
# Rename old file if it exists.
try:
os.rename(old_json_path, json_path)
except FileNotFoundError:
pass
except Exception as err:
print("Other OS error occurred while trying to rename old JSON")
print(err)
self.print_trace()
return json_path
def set_updater_json(self):
"""Load or initialize JSON dictionary data for updater state"""
if self._updater_path is None:
raise ValueError("updater_path is not defined")
elif not os.path.isdir(self._updater_path):
os.makedirs(self._updater_path)
jpath = self.get_json_path()
if os.path.isfile(jpath):
with open(jpath) as data_file:
self._json = json.load(data_file)
self.print_verbose("Read in JSON settings from file")
else:
self._json = {
"last_check": "",
"backup_date": "",
"update_ready": False,
"ignore": False,
"just_restored": False,
"just_updated": False,
"version_text": dict(),
}
self.save_updater_json()
def save_updater_json(self):
"""Trigger save of current json structure into file within addon"""
if self._update_ready:
if isinstance(self._update_version, tuple):
self._json["update_ready"] = True
self._json["version_text"]["link"] = self._update_link
self._json["version_text"]["version"] = self._update_version
else:
self._json["update_ready"] = False
self._json["version_text"] = dict()
else:
self._json["update_ready"] = False
self._json["version_text"] = dict()
jpath = self.get_json_path()
if not os.path.isdir(os.path.dirname(jpath)):
print(
"State error: Directory does not exist, cannot save json: ",
os.path.basename(jpath),
)
return
try:
with open(jpath, "w") as outf:
data_out = json.dumps(self._json, indent=4)
outf.write(data_out)
except:
print("Failed to open/save data to json: ", jpath)
self.print_trace()
self.print_verbose("Wrote out updater JSON settings with content:")
self.print_verbose(str(self._json))
def json_reset_postupdate(self):
self._json["just_updated"] = False
self._json["update_ready"] = False
self._json["version_text"] = dict()
self.save_updater_json()
def json_reset_restore(self):
self._json["just_restored"] = False
self._json["update_ready"] = False
self._json["version_text"] = dict()
self.save_updater_json()
self._update_ready = None # Reset so you could check update again.
def ignore_update(self):
self._json["ignore"] = True
self.save_updater_json()
# -------------------------------------------------------------------------
# ASYNC related methods
# -------------------------------------------------------------------------
def start_async_check_update(self, now=False, callback=None):
"""Start a background thread which will check for updates"""
if self._async_checking:
return
self.print_verbose("Starting background checking thread")
check_thread = threading.Thread(
target=self.async_check_update,
args=(
now,
callback,
),
)
check_thread.daemon = True
self._check_thread = check_thread
check_thread.start()
def async_check_update(self, now, callback=None):
"""Perform update check, run as target of background thread"""
self._async_checking = True
self.print_verbose("Checking for update now in background")
try:
self.check_for_update(now=now)
except Exception as exception:
print("Checking for update error:")
print(exception)
self.print_trace()
if not self._error:
self._update_ready = False
self._update_version = None
self._update_link = None
self._error = "Error occurred"
self._error_msg = "Encountered an error while checking for updates"
self._async_checking = False
self._check_thread = None
if callback:
self.print_verbose("Finished check update, doing callback")
callback(self._update_ready)
self.print_verbose("BG thread: Finished check update, no callback")
def stop_async_check_update(self):
"""Method to give impression of stopping check for update.
Currently does nothing but allows user to retry/stop blocking UI from
hitting a refresh button. This does not actually stop the thread, as it
will complete after the connection timeout regardless. If the thread
does complete with a successful response, this will be still displayed
on next UI refresh (ie no update, or update available).
"""
if self._check_thread is not None:
self.print_verbose("Thread will end in normal course.")
# however, "There is no direct kill method on a thread object."
# better to let it run its course
# self._check_thread.stop()
self._async_checking = False
self._error = None
self._error_msg = None
# -----------------------------------------------------------------------------
# Updater Engines
# -----------------------------------------------------------------------------
class BitbucketEngine:
"""Integration to Bitbucket API for git-formatted repositories"""
def __init__(self):
self.api_url = "https://api.bitbucket.org"
self.token = None
self.name = "bitbucket"
def form_repo_url(self, updater):
return "{}/2.0/repositories/{}/{}".format(
self.api_url, updater.user, updater.repo
)
def form_tags_url(self, updater):
return self.form_repo_url(updater) + "/refs/tags?sort=-name"
def form_branch_url(self, branch, updater):
return self.get_zip_url(branch, updater)
def get_zip_url(self, name, updater):
return "https://bitbucket.org/{user}/{repo}/get/{name}.zip".format(
user=updater.user, repo=updater.repo, name=name
)
def parse_tags(self, response, updater):
if response is None:
return list()
return [
{"name": tag["name"], "zipball_url": self.get_zip_url(tag["name"], updater)}
for tag in response["values"]
]
class GithubEngine:
"""Integration to Github API"""
def __init__(self):
self.api_url = "https://api.github.com"
self.token = None
self.name = "github"
def form_repo_url(self, updater):
return "{}/repos/{}/{}".format(self.api_url, updater.user, updater.repo)
def form_tags_url(self, updater):
if updater.use_releases:
return "{}/releases".format(self.form_repo_url(updater))
else:
return "{}/tags".format(self.form_repo_url(updater))
def form_branch_list_url(self, updater):
return "{}/branches".format(self.form_repo_url(updater))
def form_branch_url(self, branch, updater):
return "{}/zipball/{}".format(self.form_repo_url(updater), branch)
def parse_tags(self, response, updater):
if response is None:
return list()
return response
class GitlabEngine:
"""Integration to GitLab API"""
def __init__(self):
self.api_url = "https://gitlab.com"
self.token = None
self.name = "gitlab"
def form_repo_url(self, updater):
return "{}/api/v4/projects/{}".format(self.api_url, updater.repo)
def form_tags_url(self, updater):
return "{}/repository/tags".format(self.form_repo_url(updater))
def form_branch_list_url(self, updater):
# does not validate branch name.
return "{}/repository/branches".format(self.form_repo_url(updater))
def form_branch_url(self, branch, updater):
# Could clash with tag names and if it does, it will download TAG zip
# instead of branch zip to get direct path, would need.
return "{}/repository/archive.zip?sha={}".format(
self.form_repo_url(updater), branch
)
def get_zip_url(self, sha, updater):
return "{base}/repository/archive.zip?sha={sha}".format(
base=self.form_repo_url(updater), sha=sha
)
# def get_commit_zip(self, id, updater):
# return self.form_repo_url(updater)+"/repository/archive.zip?sha:"+id
def parse_tags(self, response, updater):
if response is None:
return list()
return [
{
"name": tag["name"],
"zipball_url": self.get_zip_url(tag["commit"]["id"], updater),
}
for tag in response
]
# -----------------------------------------------------------------------------
# The module-shared class instance,
# should be what's imported to other files
# -----------------------------------------------------------------------------
Updater = SingletonUpdater()