# ------------------------------------------------------------------------------ # The MIT License (MIT) # # Copyright (c) 2014-2019 Digital Sapphire # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # ------------------------------------------------------------------------------ import io import gzip import logging import os import re import sys from dsdev_utils.exceptions import VersionError log = logging.getLogger(__name__) # Decompress gzip data # # Args: # # data (str): Gzip data # # # Returns: # # (data): Decompressed data def gzip_decompress(data): # if isinstance(data, six.binary_type): # data = data.decode() compressed_file = io.BytesIO() compressed_file.write(data) # # Set the file's current position to the beginning # of the file so that gzip.GzipFile can read # its contents from the top. # compressed_file.seek(0) decompressed_file = gzip.GzipFile(fileobj=compressed_file, mode='rb') data = decompressed_file.read() compressed_file.close() decompressed_file.close() return data def lazy_import(func): """Decorator for declaring a lazy import. This decorator turns a function into an object that will act as a lazy importer. Whenever the object's attributes are accessed, the function is called and its return value used in place of the object. So you can declare lazy imports like this: @lazy_import def socket(): import socket return socket The name "socket" will then be bound to a transparent object proxy which will import the socket module upon first use. The syntax here is slightly more verbose than other lazy import recipes, but it's designed not to hide the actual "import" statements from tools like pyinstaller or grep. """ try: f = sys._getframe(1) except Exception: # pragma: no cover namespace = None else: namespace = f.f_locals return _LazyImport(func.__name__, func, namespace) class _LazyImport(object): """Class representing a lazy import.""" def __init__(self, name, loader, namespace=None): self._dsdev_lazy_target = _LazyImport self._dsdev_lazy_name = name self._dsdev_lazy_loader = loader self._dsdev_lazy_namespace = namespace def _dsdev_lazy_load(self): if self._dsdev_lazy_target is _LazyImport: self._dsdev_lazy_target = self._dsdev_lazy_loader() ns = self._dsdev_lazy_namespace if ns is not None: try: if ns[self._dsdev_lazy_name] is self: ns[self._dsdev_lazy_name] = self._dsdev_lazy_target except KeyError: # pragma: no cover pass def __getattribute__(self, attr): # pragma: no cover try: return object.__getattribute__(self, attr) except AttributeError: if self._dsdev_lazy_target is _LazyImport: self._dsdev_lazy_load() return getattr(self._dsdev_lazy_target, attr) def __nonzero__(self): # pragma: no cover if self._dsdev_lazy_target is _LazyImport: self._dsdev_lazy_load() return bool(self._dsdev_lazy_target) def __str__(self): # pragma: no cover return '_LazyImport: {}'.format(self._dsdev_lazy_name) # Normalizes version strings of different types. Examples # include 1.2, 1.2.1, 1.2b and 1.1.1b # # Args: # # version (str): Version number to normalizes class Version(object): v_re = re.compile(r'(?P\d+)\.(?P\d+)\.?(?P' r'\d+)?-?(?P[abehl' r'pt]+)?-?(?P\d+)?') v_re_big = re.compile(r'(?P\d+)\.(?P\d+)\.' r'(?P\d+)\.(?P\d+)' r'\.(?P\d+)') def __init__(self, version): self.original_version = version self._parse_version_str(version) self.version_str = None def _parse_version_str(self, version): count = self._quick_sanitize(version) try: # version in the form of 1.1, 1.1.1, 1.1.1-b1, 1.1.1a2 if count == 4: version_data = self._parse_parsed_version(version) else: version_data = self._parse_version(version) except AssertionError: raise VersionError('Cannot parse version') self.major = int(version_data.get('major', 0)) self.minor = int(version_data.get('minor', 0)) patch = version_data.get('patch') if patch is None: self.patch = 0 else: self.patch = int(patch) release = version_data.get('release') self.channel = 'stable' if release is None: self.release = 2 # Convert to number for easy comparison and sorting elif release in ['b', 'beta', '1']: self.release = 1 self.channel = 'beta' elif release in ['a', 'alpha', '0']: self.release = 0 self.channel = 'alpha' else: log.debug('Setting release as stable. ' 'Disregard if not prerelease') # Marking release as stable self.release = 2 release_version = version_data.get('releaseversion') if release_version is None: self.release_version = 0 else: self.release_version = int(release_version) self.version_tuple = (self.major, self.minor, self.patch, self.release, self.release_version) self.version_str = str(self.version_tuple) def _parse_version(self, version): r = self.v_re.search(version) assert r is not None return r.groupdict() def _parse_parsed_version(self, version): r = self.v_re_big.search(version) assert r is not None return r.groupdict() @staticmethod def _quick_sanitize(version): log.debug('Version str: %s', version) ext = os.path.splitext(version)[1] # Removing file extensions, to ensure count isn't # contaminated if ext == '.zip': log.debug('Removed ".zip"') version = version[:-4] elif ext == '.gz': log.debug('Removed ".tar.gz"') version = version[:-7] elif ext == '.bz2': log.debug('Removed ".tar.bz2"') version = version[:-8] count = version.count('.') # There will be 4 dots when version is passed # That was created with Version object. # 1.1 once parsed will be 1.1.0.0.0 if count not in [1, 2, 4]: msg = ('Incorrect version format. 1 or 2 dots ' 'You have {} dots'.format(count)) log.debug(msg) raise VersionError(msg) return count def __str__(self): return '.'.join(map(str, self.version_tuple)) def __repr__(self): return '{}: {}'.format(self.__class__.__name__, self.version_str) def __hash__(self): return hash(self.version_tuple) def __eq__(self, obj): return self.version_tuple == obj.version_tuple def __ne__(self, obj): return self.version_tuple != obj.version_tuple def __lt__(self, obj): return self.version_tuple < obj.version_tuple def __gt__(self, obj): return self.version_tuple > obj.version_tuple def __le__(self, obj): return self.version_tuple <= obj.version_tuple def __ge__(self, obj): return self.version_tuple >= obj.version_tuple # Provides access to dict by pass a specially made key to # the get method. Default key sep is "*". Example key would be # updates*mac*1.7.0 would access {"updates":{"mac":{"1.7.0": "hi there"}}} # and return "hi there" # # Kwargs: # # dict_ (dict): Dict you would like easy asses to. # # sep (str): Used as a delimiter between keys class EasyAccessDict(object): def __init__(self, dict_=None, sep='*'): self.sep = sep if not isinstance(dict_, dict): self.dict = {} else: self.dict = dict_ # Retrive value from internal dict. # # args: # # key (str): Key to access value # # Returns: # # (object): Value of key if found or None def get(self, key): try: layers = key.split(self.sep) value = self.dict for key in layers: value = value[key] return value except KeyError: return None except Exception: # pragma: no cover return None # Because I always forget call the get method def __call__(self, key): return self.get(key) def __str__(self): return str(self.dict)