# # Copyright 2019-2020 by Vinay Sajip. All Rights Reserved. # try: from collections.abc import Mapping except ImportError: from collections import Mapping import datetime import importlib import io import logging import os import re import sys from .tokens import Token, SCALAR_TOKENS, WORD, BACKTICK, DOLLAR from .parser import ( Parser, MappingBody, ListBody, ASTNode, ODict, open_file, RecognizerError, ParserError, ) __all__ = ['Config', 'ConfigFormatError', 'ConfigError'] logger = logging.getLogger(__name__) if sys.version_info[0] < 3: class timezone(datetime.tzinfo): def __init__(self, td): self.td = td def utcoffset(self, dt): return self.td def dst(self, dt): # pragma: no cover return datetime.timedelta(0) else: from datetime import timezone basestring = str __version__ = '0.5.0.post0' class ConfigFormatError(ParserError): pass class ConfigError(ValueError): pass # This is a marker used in get(key, defaultValue) to catch rather than KeyError class KeyNotFoundError(ConfigError): pass def _parse_path(path): p = Parser(io.StringIO(path)) try: p.advance() if p.token.kind != WORD: raise ConfigError('Invalid path: %s' % path) result = p.primary() if not p.at_end: raise ConfigError('Invalid path: %s' % path) except RecognizerError as e: raise ConfigError('Invalid path: %s: %s' % (path, e)) return result def _path_iterator(path): def visit(node): if isinstance(node, Token): yield node else: op = node['op'] if 'operand' in node: for val in visit(node['operand']): yield val else: for val in visit(node['lhs']): yield val if op == '.': yield op, node['rhs'].value else: assert op in ('[', ':') yield op, node['rhs'] for v in visit(path): yield v def _to_source(node): if isinstance(node, Token): return str(node.value) pi = _path_iterator(node) parts = [next(pi).value] for op, operand in pi: if op == '.': parts.append('.') parts.append(operand) elif op == ':': parts.append('[') start, stop, step = operand if start is not None: parts.append(_to_source(start)) parts.append(':') if stop is not None: parts.append(_to_source(stop)) if step is not None: parts.append(':') parts.append(_to_source(step)) parts.append(']') elif op == '[': parts.append('[') parts.append(_to_source(operand)) parts.append(']') else: # pragma: no cover raise ConfigError('Unable to navigate: %s' % node) return ''.join(parts) def _unwrap(o): if isinstance(o, DictWrapper): result = o.as_dict() elif isinstance(o, ListWrapper): result = o.as_list() else: result = o return result # noinspection PyUnboundLocalVariable _SYSTEM_TYPES = (basestring, bool, int, float, datetime.datetime, datetime.date) # noinspection PyTypeChecker _SCALAR_TYPES = _SYSTEM_TYPES + (Token,) def _merge_dicts(target, source): for k, v in source.items(): if k in target and isinstance(target[k], dict) and isinstance(v, Mapping): _merge_dicts(target[k], v) else: target[k] = source[k] # use negative lookahead to disallow anything starting with a digit. _IDENTIFIER_PATTERN = re.compile(r'^(?!\d)(\w+)$', re.U) def is_identifier(s): return bool(_IDENTIFIER_PATTERN.match(s)) class DictWrapper(object): def __init__(self, config, data): self.config = config self._data = data def get(self, key, default=None): try: return self[key] except KeyNotFoundError: return default def __getitem__(self, key): if not isinstance(key, basestring): raise ConfigError('string required, but found %r' % key) data = self._data config = self.config if key in data or is_identifier(key): try: result = config._evaluated(data[key]) except KeyError: raise KeyNotFoundError('not found in configuration: %s' % key) else: path = _parse_path(key) result = config._get_from_path(path) # data[key] = result return result def __len__(self): # pragma: no cover return len(self._data) def __contains__(self, key): return key in self._data def __or__(self, other): assert isinstance(other, type(self)) data = self.as_dict() _merge_dicts(data, other.as_dict()) return type(self)(self.config, data) __add__ = __or__ def __sub__(self, other): assert isinstance(other, type(self)) data = dict(self._data) for k in other._data: data.pop(k, None) return type(self)(self.config, data) def as_dict(self): result = {} for k, v in self._data.items(): v = self[k] if isinstance(v, (DictWrapper, Config)): v = v.as_dict() elif isinstance(v, ListWrapper): v = v.as_list() result[k] = v return result def __repr__(self): s = str(', '.join(self._data.keys())) if len(s) > 60: s = s[:57] + '...' return '%s(%s)' % (self.__class__.__name__, s) class ListWrapper(object): def __init__(self, config, data): self.config = config self._data = data def __len__(self): return len(self._data) def __getitem__(self, index): assert isinstance(index, int) # slices handled in Evaluator result = self._data[index] evaluated = self.config._evaluated(result) if evaluated is not result: self._data[index] = result = evaluated return result def __add__(self, other): assert isinstance(other, type(self)) return type(self)(self.config, self.as_list() + other.as_list()) def as_list(self): result = [] for item in self: if isinstance(item, (DictWrapper, Config)): item = item.as_dict() elif isinstance(item, ListWrapper): item = item.as_list() result.append(item) return result def __repr__(self): s = str(self.as_list()) if len(s) > 60: s = s[:57] + '...' return '%s(%s)' % (self.__class__.__name__, s) _ISO_DATETIME_PATTERN = re.compile( r'\d{4}-\d{2}-\d{2}(([ T])' r'((?P