You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Enso-Bot/venv/Lib/site-packages/config/__init__.py

786 lines
24 KiB
Python

#
# Copyright 2019-2020 by Vinay Sajip. All Rights Reserved.
#
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
import datetime
import importlib
import io
import logging
import os
import re
import sys
from .tokens import Token, SCALAR_TOKENS, WORD, BACKTICK, DOLLAR
from .parser import (
Parser,
MappingBody,
ListBody,
ASTNode,
ODict,
open_file,
RecognizerError,
ParserError,
)
# Names exported by ``from config import *``.
__all__ = ['Config', 'ConfigFormatError', 'ConfigError']
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Compatibility shim: expose ``timezone`` and ``basestring`` on both major
# Python versions so the rest of the module can use them unconditionally.
if sys.version_info[0] >= 3:
    from datetime import timezone

    basestring = str
else:

    class timezone(datetime.tzinfo):
        """Fixed-offset tzinfo used when ``datetime.timezone`` is unavailable."""

        def __init__(self, td):
            # td is the timedelta returned verbatim by utcoffset().
            self.td = td

        def utcoffset(self, dt):
            return self.td

        def dst(self, dt):  # pragma: no cover
            return datetime.timedelta(0)
# Version string of this package.
__version__ = '0.5.0.post0'
class ConfigFormatError(ParserError):
    """Raised by ``Config.load`` when the parser rejects the configuration source."""
class ConfigError(ValueError):
    """Base error for problems found while reading or evaluating a configuration."""
# Marker exception: get(key, default) catches this instead of KeyError, so a
# missing key can be told apart from a KeyError raised for other reasons.
class KeyNotFoundError(ConfigError):
    """Raised when a key or path element is absent from the configuration."""
def _parse_path(path):
    """
    Parse the string *path* into the node returned by ``Parser.primary()``.

    Raises ConfigError if the text does not start with a WORD token, if input
    remains after the primary expression, or if the parser reports a
    RecognizerError.
    """
    parser = Parser(io.StringIO(path))
    node = None
    try:
        parser.advance()
        well_formed = parser.token.kind == WORD
        if well_formed:
            node = parser.primary()
            # Anything left over after the primary makes the path invalid.
            well_formed = parser.at_end
    except RecognizerError as exc:
        raise ConfigError('Invalid path: %s: %s' % (path, exc))
    if not well_formed:
        raise ConfigError('Invalid path: %s' % path)
    return node
def _path_iterator(path):
    """
    Flatten a parsed path into a stream of steps.

    Token leaves are yielded as-is. Each binary node yields, after its
    left-hand side has been flattened, an ``(op, rhs)`` pair; for the '.'
    operator the pair carries ``rhs.value`` rather than the rhs node itself.
    Nodes that have an 'operand' entry only contribute their operand's steps.
    """

    def walk(node):
        if isinstance(node, Token):
            yield node
            return
        op = node['op']
        if 'operand' in node:
            for step in walk(node['operand']):
                yield step
            return
        for step in walk(node['lhs']):
            yield step
        if op == '.':
            yield op, node['rhs'].value
        else:
            assert op in ('[', ':')
            yield op, node['rhs']

    for item in walk(path):
        yield item
def _to_source(node):
    """
    Render a Token or parsed path node back into path text.

    Attribute steps are written as ``.name``, index steps as ``[expr]`` and
    slice steps as ``[start:stop:step]`` with absent parts left out.
    """
    if isinstance(node, Token):
        return str(node.value)
    steps = _path_iterator(node)
    # The first step is always the leading token of the path.
    pieces = [next(steps).value]
    for op, operand in steps:
        if op == '.':
            pieces.extend(('.', operand))
        elif op == ':':
            pieces.append('[')
            start, stop, step = operand
            if start is not None:
                pieces.append(_to_source(start))
            pieces.append(':')
            if stop is not None:
                pieces.append(_to_source(stop))
            if step is not None:
                pieces.extend((':', _to_source(step)))
            pieces.append(']')
        elif op == '[':
            pieces.extend(('[', _to_source(operand), ']'))
        else:  # pragma: no cover
            raise ConfigError('Unable to navigate: %s' % node)
    return ''.join(pieces)
def _unwrap(o):
    """Return the plain dict/list behind a wrapper, or *o* itself otherwise."""
    if isinstance(o, DictWrapper):
        return o.as_dict()
    if isinstance(o, ListWrapper):
        return o.as_list()
    return o
# Built-in Python types listed here as "system" scalar types.
# noinspection PyUnboundLocalVariable
_SYSTEM_TYPES = (basestring, bool, int, float, datetime.datetime, datetime.date)
# The system types plus the parser's Token type.
# noinspection PyTypeChecker
_SCALAR_TYPES = _SYSTEM_TYPES + (Token,)
def _merge_dicts(target, source):
for k, v in source.items():
if k in target and isinstance(target[k], dict) and isinstance(v, Mapping):
_merge_dicts(target[k], v)
else:
target[k] = source[k]
# The negative lookahead rejects anything whose first character is a digit.
_IDENTIFIER_PATTERN = re.compile(r'^(?!\d)(\w+)$', re.U)


def is_identifier(s):
    """Return True if *s* consists only of word characters and does not start with a digit."""
    return _IDENTIFIER_PATTERN.match(s) is not None
class DictWrapper(object):
    """
    Mapping-like view over raw configuration data.

    Values are evaluated through the owning configuration each time they are
    read; the underlying data is not modified by lookups.
    """

    def __init__(self, config, data):
        self.config = config
        self._data = data

    def get(self, key, default=None):
        """Return ``self[key]``, or *default* when the key is not found."""
        try:
            value = self[key]
        except KeyNotFoundError:
            value = default
        return value

    def __getitem__(self, key):
        """
        Look up *key*, which is either a stored key / identifier or a path.

        Raises ConfigError for non-string keys and KeyNotFoundError when a
        stored-key lookup (or its evaluation) raises KeyError.
        """
        if not isinstance(key, basestring):
            raise ConfigError('string required, but found %r' % key)
        stored = self._data
        owner = self.config
        if key not in stored and not is_identifier(key):
            # Not a plain key: treat the text as a path expression.
            return owner._get_from_path(_parse_path(key))
        try:
            return owner._evaluated(stored[key])
        except KeyError:
            raise KeyNotFoundError('not found in configuration: %s' % key)

    def __len__(self):  # pragma: no cover
        return len(self._data)

    def __contains__(self, key):
        return key in self._data

    def __or__(self, other):
        """Return a new wrapper holding this mapping deep-merged with *other*."""
        assert isinstance(other, type(self))
        merged = self.as_dict()
        _merge_dicts(merged, other.as_dict())
        return type(self)(self.config, merged)

    __add__ = __or__

    def __sub__(self, other):
        """Return a new wrapper without the keys that appear in *other*."""
        assert isinstance(other, type(self))
        excluded = other._data
        remaining = {k: v for k, v in self._data.items() if k not in excluded}
        return type(self)(self.config, remaining)

    def as_dict(self):
        """Return a plain dict with every value evaluated and nested wrappers unwrapped."""
        plain = {}
        for name in self._data:
            value = self[name]
            if isinstance(value, (DictWrapper, Config)):
                value = value.as_dict()
            elif isinstance(value, ListWrapper):
                value = value.as_list()
            plain[name] = value
        return plain

    def __repr__(self):
        summary = str(', '.join(self._data.keys()))
        if len(summary) > 60:
            # Keep the representation short: 57 characters plus an ellipsis.
            summary = summary[:57] + '...'
        return '%s(%s)' % (self.__class__.__name__, summary)
class ListWrapper(object):
    """
    List-like view over raw configuration data.

    An element is evaluated through the owning configuration when it is read
    and, if evaluation produced a different object, written back in place.
    """

    def __init__(self, config, data):
        self.config = config
        self._data = data

    def __len__(self):
        return len(self._data)

    def __getitem__(self, index):
        assert isinstance(index, int)  # slices handled in Evaluator
        raw = self._data[index]
        value = self.config._evaluated(raw)
        if value is not raw:
            # Store the evaluated form so later reads see it directly.
            self._data[index] = value
        return value

    def __add__(self, other):
        """Return a new wrapper holding the concatenation of both lists."""
        assert isinstance(other, type(self))
        combined = self.as_list() + other.as_list()
        return type(self)(self.config, combined)

    def as_list(self):
        """Return a plain list with every element evaluated and nested wrappers unwrapped."""
        plain = []
        # Iteration goes through __getitem__, so each element is evaluated.
        for element in self:
            if isinstance(element, (DictWrapper, Config)):
                element = element.as_dict()
            elif isinstance(element, ListWrapper):
                element = element.as_list()
            plain.append(element)
        return plain

    def __repr__(self):
        summary = str(self.as_list())
        if len(summary) > 60:
            summary = summary[:57] + '...'
        return '%s(%s)' % (self.__class__.__name__, summary)
# Matches a YYYY-MM-DD date, optionally followed by a space or 'T' and an
# HH:MM:SS time, optional fractional seconds (1-6 digits) and an optional
# signed UTC offset of the form HH:MM[:SS[.ffffff]].
_ISO_DATETIME_PATTERN = re.compile(
r'\d{4}-\d{2}-\d{2}(([ T])'
r'((?P<time>\d{2}:\d{2}:\d{2})'
r'(?P<ms>\.\d{1,6})?'
r'((?P<sign>[+-])(?P<oh>\d{2}):'
r'(?P<om>\d{2})(:(?P<os>\d{2})'
r'(?P<oms>\.\d{1,6})?)?)?))?$'
)
# One or more ASCII-letter/underscore-led words joined by dots.
_DOTTED_WORDS = r'[A-Za-z_]\w*(\.[A-Za-z_]\w*)*'
# 'dotted.words:' optionally followed by a second run of dotted words.
_COLON_OBJECT_PATTERN = re.compile('%s:(%s)?$' % (_DOTTED_WORDS, _DOTTED_WORDS))
# A bare run of dotted words.
_DOTTED_OBJECT_PATTERN = re.compile('%s$' % _DOTTED_WORDS)
# '$NAME' optionally followed by '|' and arbitrary default text.
_ENV_VALUE_PATTERN = re.compile(r'\$(\w+)(\|(.*))?$')
class Evaluator(object):
    """
    This class is used to evaluate AST nodes. A separate class for this (rather
    than putting the functionality in Config) because an evaluation context is
    sometimes required. For example, resolving references needs to keep track
    of references already seen in an evaluation, to catch circular references.
    That needs to be done per-evaluation.
    """

    # Maps an AST node's 'op' value to the name of the method that evaluates it.
    op_map = {
        '@': 'eval_at',
        '$': 'eval_reference',
        ':': 'eval_slice',
        '+': 'eval_add',
        '-': 'eval_subtract',
        '*': 'eval_multiply',
        '**': 'eval_power',
        '/': 'eval_divide',
        '//': 'eval_integer_divide',
        '%': 'eval_modulo',
        '<<': 'eval_left_shift',
        '>>': 'eval_right_shift',
        '|': 'eval_bitor',
        '&': 'eval_bitand',
        '^': 'eval_bitxor',
        'or': 'eval_logor',
        'and': 'eval_logand',
    }

    def __init__(self, config):
        self.config = config
        # id(reference node) -> node, for the references met during the
        # current path evaluation; used to detect circular references.
        self.refs_seen = {}

    def evaluate(self, node):
        """
        Evaluate *node* and return the resulting value.

        Tokens yield their scalar value, a context variable (WORD) or a
        converted backtick string; AST nodes are dispatched through op_map.
        Anything else is returned unchanged, except that MappingBody and
        ListBody results are wrapped by the owning configuration.

        Raises ConfigError for a WORD that is not in the context.
        """
        result = node
        if isinstance(node, Token):
            if node.kind in SCALAR_TOKENS:
                result = node.value
            elif node.kind == WORD:
                try:
                    result = self.config.context[node.value]
                except KeyError:
                    msg = 'Unknown variable \'%s\' at %s' % (node.value, node.start)
                    raise ConfigError(msg)
            elif node.kind == BACKTICK:
                result = self.config.convert_string(node.value)
            else:  # pragma: no cover
                raise NotImplementedError('Unable to evaluate %s' % node)
        elif isinstance(node, ASTNode):
            op = node['op']
            meth = self.op_map[op]
            result = getattr(self, meth)(node)
        if isinstance(result, (MappingBody, ListBody)):
            result = self.config._wrap(result)
        return result

    def _operands(self, node):
        """Evaluate and return the (lhs, rhs) operands of a binary node, lhs first."""
        lhs = self.evaluate(node['lhs'])
        rhs = self.evaluate(node['rhs'])
        return lhs, rhs

    def eval_at(self, node):
        """
        Evaluate an '@' include: locate, parse and return the named file.

        An absolute name is used as-is; a relative one is looked up under the
        configuration's rootdir and then under each include_path entry. A
        file whose top level is a mapping is returned as a sub-Config whose
        parent is the current configuration; anything else is returned as
        parsed.

        Raises ConfigError if the operand is not a string or no file is found.
        """
        config = self.config
        fn = config._evaluated(node['operand'])
        if not isinstance(fn, basestring):
            raise ConfigError('@ operand must be a string, but is %s' % fn)
        found = False
        location = None
        if os.path.isabs(fn):
            if os.path.isfile(fn):
                location = fn
                found = True
        else:
            location = os.path.join(config.rootdir, fn)
            if os.path.isfile(location):
                found = True
            else:
                for ip in config.include_path:
                    location = os.path.join(ip, fn)
                    if os.path.isfile(location):
                        found = True
                        break
        if not found:
            raise ConfigError('Unable to locate %s' % fn)
        with open_file(location) as f:
            result = Parser(f).container()
            if isinstance(result, MappingBody):
                # Give the sub-configuration its own path/rootdir and the
                # parent's include path, so that relative '@' includes inside
                # the included file resolve instead of failing on a rootdir
                # of None.
                cfg = Config(
                    None,
                    context=config.context,
                    cache=config._cache is not None,
                    path=location,
                    rootdir=os.path.dirname(os.path.abspath(location)),
                    include_path=config.include_path,
                )
                cfg._parent = config
                cfg._data = cfg._wrap_mapping(result)
                result = cfg
        return result

    def _get_from_path(self, path):
        """
        Walk the parsed *path* through the configuration and return the value.

        Raises ConfigError for a wrongly typed index, an out-of-range index, a
        slice applied to a non-list or a circular reference, and
        KeyNotFoundError when a key is missing.
        """

        def is_ref(n):
            return isinstance(n, ASTNode) and n['op'] == DOLLAR

        pi = _path_iterator(path)
        first = next(pi)
        config = self.config
        node = config._data[first.value]
        result = self.evaluate(node)
        # We start the evaluation with the current instance, but a path may
        # cross sub-configuration boundaries, and references must always be
        # evaluated in the context of the immediately enclosing configuration,
        # not the top-level configuration (references are relative to the
        # root of the enclosing configuration - otherwise configurations would
        # not be standalone. So whenever we cross a sub-configuration boundary,
        # the current_evaluator has to be pegged to that sub-configuration.
        current_evaluator = self
        for op, operand in pi:
            need_string = False
            if not isinstance(result, (ListWrapper, DictWrapper, Config)):
                container = result
            else:
                container = result._data
                if isinstance(result, Config):
                    current_evaluator = result._evaluator
                need_string = not isinstance(result, ListWrapper)
            sliced = False
            if isinstance(operand, tuple):
                operand = slice(*[current_evaluator.evaluate(item) for item in operand])
                sliced = True
                if not isinstance(result, ListWrapper):
                    raise ConfigError('slices can only operate on lists')
            elif op != '.':
                operand = current_evaluator.evaluate(operand)
            if need_string:
                if not isinstance(operand, basestring):
                    raise ConfigError('string required, but found %r' % operand)
            elif not isinstance(operand, (int, slice)):
                raise ConfigError(
                    'integer or slice required, but found \'%s\'' % operand
                )
            try:
                v = container[operand]  # always use indexing, never attr
            except IndexError:
                raise ConfigError('index out of range: %s' % operand)
            except KeyError:
                raise KeyNotFoundError('not found in configuration: %s' % operand)
            if is_ref(v):
                vid = id(v)
                if vid in current_evaluator.refs_seen:
                    # Report every reference seen so far, in sorted order.
                    parts = [
                        '%s %s' % (_to_source(ref), ref['operand']['lhs'].start)
                        for ref in current_evaluator.refs_seen.values()
                    ]
                    s = ', '.join(sorted(parts))
                    raise ConfigError('Circular reference: %s' % s)
                current_evaluator.refs_seen[vid] = v
            if sliced:
                v = ListBody(v)  # ListBody gets wrapped, but not list
            evaluated = current_evaluator.evaluate(v)
            if evaluated is not v:
                # Store the evaluated form back into its container.
                container[operand] = evaluated
            result = evaluated
        return result

    def eval_reference(self, node):
        """Evaluate a '$' reference by resolving its operand as a path."""
        return self._get_from_path(node['operand'])

    def eval_add(self, node):
        """Return lhs + rhs; a plain list result is re-wrapped as a ListWrapper."""
        lhs, rhs = self._operands(node)
        try:
            result = lhs + rhs
            if isinstance(result, list):
                result = ListWrapper(lhs.config, result)
            return result
        except TypeError:
            raise ConfigError('Unable to add %s to %s' % (rhs, lhs))

    def eval_subtract(self, node):
        """Return -operand for a unary node, otherwise lhs - rhs."""
        if 'operand' in node:
            # unary
            operand = self.evaluate(node['operand'])
            try:
                return -operand
            except TypeError:
                raise ConfigError('Unable to negate %s' % operand)
        lhs, rhs = self._operands(node)
        try:
            return lhs - rhs
        except TypeError:
            raise ConfigError('Unable to subtract %s from %s' % (rhs, lhs))

    def eval_multiply(self, node):
        """Return lhs * rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs * rhs
        except TypeError:
            raise ConfigError('Unable to multiply %s by %s' % (lhs, rhs))

    def eval_divide(self, node):
        """Return lhs / rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs / rhs
        except TypeError:
            raise ConfigError('Unable to divide %s by %s' % (lhs, rhs))

    def eval_integer_divide(self, node):
        """Return lhs // rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs // rhs
        except TypeError:
            raise ConfigError('Unable to integer-divide %s by %s' % (lhs, rhs))

    def eval_modulo(self, node):
        """Return lhs % rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs % rhs
        except TypeError:
            raise ConfigError('Unable to compute %s modulo %s' % (lhs, rhs))

    def eval_power(self, node):
        """Return lhs ** rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs ** rhs
        except TypeError:
            # The message names the power operation, not division.
            raise ConfigError('Unable to raise %s to the power of %s' % (lhs, rhs))

    def eval_left_shift(self, node):
        """Return lhs << rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs << rhs
        except TypeError:
            raise ConfigError('Unable to left-shift %s by %s' % (lhs, rhs))

    def eval_right_shift(self, node):
        """Return lhs >> rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs >> rhs
        except TypeError:
            raise ConfigError('Unable to right-shift %s by %s' % (lhs, rhs))

    def eval_bitor(self, node):
        """Return lhs | rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs | rhs
        except TypeError:
            raise ConfigError('Unable to bitwise-or %s and %s' % (lhs, rhs))

    def eval_bitand(self, node):
        """Return lhs & rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs & rhs
        except TypeError:
            raise ConfigError('Unable to bitwise-and %s and %s' % (lhs, rhs))

    def eval_bitxor(self, node):
        """Return lhs ^ rhs, raising ConfigError on a TypeError."""
        lhs, rhs = self._operands(node)
        try:
            return lhs ^ rhs
        except TypeError:
            raise ConfigError('Unable to bitwise-xor %s and %s' % (lhs, rhs))

    def eval_logor(self, node):
        """Return lhs if it is truthy, otherwise the evaluated rhs (short-circuit)."""
        lhs = self.evaluate(node['lhs'])
        return lhs or self.evaluate(node['rhs'])

    def eval_logand(self, node):
        """Return lhs if it is falsy, otherwise the evaluated rhs (short-circuit)."""
        lhs = self.evaluate(node['lhs'])
        return lhs and self.evaluate(node['rhs'])
def _walk_dotted_path(result, dotted_path):
    """
    Follow a chain of attribute names starting at *result*.

    *dotted_path* is either a dot-separated string or an iterable of names.
    Returns the object reached after the final ``getattr``.
    """
    if isinstance(dotted_path, basestring):
        names = dotted_path.split('.')
    else:
        names = dotted_path
    current = result
    for name in names:
        current = getattr(current, name)
    return current
def _resolve_dotted_object(s):
parts = s.split('.')
modname = parts.pop(0)
# first part must be a module/package.
result = importlib.import_module(modname)
while parts:
p = parts[0]
s = '%s.%s' % (modname, p)
try:
result = importlib.import_module(s)
parts.pop(0)
modname = s
except ImportError:
result = _walk_dotted_path(result, parts)
break
return result
def _resolve_colon_object(s):
module_name, dotted_path = s.split(':')
if module_name in sys.modules:
mod = sys.modules[module_name]
else:
mod = importlib.import_module(module_name)
if not dotted_path:
result = mod
else:
result = _walk_dotted_path(mod, dotted_path)
return result
def _default_convert_string(s):
    """
    Convert the text of a backtick string into a Python value.

    Tried in order: an ISO-style date or date-time (optionally with fractional
    seconds and a UTC offset), an environment reference ``$NAME|default``, a
    ``module:attr`` object and a dotted object path. If none applies, or
    resolving an object raises ImportError, *s* itself is returned.
    """
    iso = _ISO_DATETIME_PATTERN.match(s)
    if iso:
        fields = iso.groupdict()
        if not fields['time']:
            return datetime.datetime.strptime(iso.string, '%Y-%m-%d').date()
        stamp = '%s %s' % (iso.string[:10], fields['time'])
        moment = datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')
        if fields['ms']:
            moment = moment.replace(microsecond=int(float(fields['ms']) * 1e6))
        if fields['oh']:
            hours = int(fields['oh'], 10)
            minutes = int(fields['om'], 10)
            seconds = micros = 0
            # Seconds and fractions in the offset are only applied on 3.7+.
            if sys.version_info[:2] >= (3, 7):
                if fields['os']:
                    seconds = int(fields['os'], 10)
                if fields['oms']:
                    micros = int(float(fields['oms']) * 1e6)
            offset = datetime.timedelta(
                hours=hours, minutes=minutes, seconds=seconds, microseconds=micros
            )
            if fields['sign'] == '-':
                offset = -offset
            moment = moment.replace(tzinfo=timezone(offset))
        return moment

    env = _ENV_VALUE_PATTERN.match(s)
    if env:
        name, _, fallback = env.groups()
        return os.environ.get(name, fallback)

    try:
        if _COLON_OBJECT_PATTERN.match(s):
            return _resolve_colon_object(s)
        if _DOTTED_OBJECT_PATTERN.match(s):
            return _resolve_dotted_object(s)
    except ImportError:
        # Unresolvable object: fall through and hand back the original text.
        pass
    return s
class Config(object):
    """
    A configuration loaded from a stream or file.

    Recognised keyword arguments: ``context`` (mapping of variables available
    to expressions), ``include_path`` (directories searched for includes),
    ``no_duplicates`` (reject repeated keys, default True),
    ``strict_conversions`` (fail when a backtick string cannot be converted,
    default True), ``cache`` (remember looked-up values, default False),
    ``path``, ``rootdir`` and ``encoding`` (used when a path is given).
    """

    def __init__(self, stream_or_path, **kwargs):
        self.context = kwargs.get('context', {})
        self.include_path = kwargs.get('include_path', [])
        self.no_duplicates = kwargs.get('no_duplicates', True)
        self.strict_conversions = kwargs.get('strict_conversions', True)
        cache = kwargs.get('cache', False)
        self.path = kwargs.get('path')
        self.rootdir = kwargs.get('rootdir')
        self._can_close = False
        self._parent = self._data = self._stream = None
        # None means caching is off; a dict means it is on.
        self._cache = {} if cache else None
        self._string_converter = _default_convert_string
        self._evaluator = Evaluator(self)
        if stream_or_path:
            if isinstance(stream_or_path, basestring):
                self.load_file(stream_or_path, kwargs.get('encoding'))
            else:
                self.load(stream_or_path)

    def _wrap_mapping(self, items):
        """
        Build a DictWrapper from an iterable of (key token, value) pairs.

        Raises ConfigError on a repeated key when no_duplicates is set;
        otherwise a later value replaces an earlier one.
        """
        data = ODict()
        seen = {}
        result = DictWrapper(self, data)
        for k, v in items:
            key = k.value
            if self.no_duplicates:
                if key in seen:
                    msg = 'Duplicate key %s seen at %s (previously at %s)' % (
                        key,
                        k.start,
                        seen[key],
                    )
                    raise ConfigError(msg)
                seen[key] = k.start
            data[key] = v
        return result

    def _get_from_path(self, path):
        # convenience method; each top-level lookup starts with a clean
        # record of references seen.
        evaluator = self._evaluator
        evaluator.refs_seen.clear()
        return evaluator._get_from_path(path)

    def convert_string(self, s):
        """
        Convert a backtick string using the configured string converter.

        Raises ConfigError if the converter returned *s* unchanged and
        strict_conversions is set.
        """
        result = self._string_converter(s)
        if result is s and self.strict_conversions:
            raise ConfigError('Unable to convert string \'%s\'' % s)
        return result

    def _wrap(self, o):
        """Wrap a MappingBody or ListBody; return anything else unchanged."""
        if isinstance(o, MappingBody):
            result = self._wrap_mapping(o)
        elif isinstance(o, ListBody):
            result = ListWrapper(self, o)
        else:
            result = o
        return result

    def _evaluated(self, v):
        return self._evaluator.evaluate(v)

    def _get(self, key, default=None):
        """
        Return the (still wrapped) value for *key*, or *default* if absent.

        Only values actually found are cached. Caching the caller's default
        would make it stick: a later lookup with a different default, or an
        indexed access, would return the stale default.
        """
        cache = self._cache
        if cache is not None and key in cache:
            return cache[key]
        missing = object()  # private sentinel: None may be a real value
        result = self._data.get(key, missing)
        if result is missing:
            return default
        if cache is not None:
            cache[key] = result
        return result

    def get(self, key, default=None):
        """Return the unwrapped value for *key*, or *default* if it is absent."""
        return _unwrap(self._get(key, default))

    def __getitem__(self, key):
        cached = self._cache is not None
        if cached and key in self._cache:
            result = self._cache[key]
        else:
            result = self._data[key]
            if cached:
                self._cache[key] = result
        return _unwrap(result)

    def __contains__(self, key):
        return key in self._data

    def as_dict(self):
        """Return the whole configuration as a plain dict."""
        return self._data.as_dict()

    # for compatibility with old code base (and its tests)
    def __len__(self):
        result = 0
        if self._data is not None:
            result = len(self._data)
        return result

    def load(self, stream):
        """
        Parse *stream* and make its top-level mapping this configuration's data.

        The path defaults to the stream's ``name`` attribute; rootdir becomes
        that path's directory, or the current directory when there is no path.

        Raises ConfigFormatError for parser errors and ConfigError when the
        top level is not a mapping.
        """
        self._stream = stream
        path = self.path
        if path is None:
            path = getattr(stream, 'name', None)
        if path is None:
            rootdir = os.getcwd()
        else:
            rootdir = os.path.dirname(os.path.abspath(path))
        self.rootdir = rootdir
        self.path = path
        try:
            p = Parser(stream)
            items = p.container()
        except ParserError as pe:
            cfe = ConfigFormatError(*pe.args)
            cfe.location = pe.location
            raise cfe
        if not isinstance(items, MappingBody):
            raise ConfigError('Root configuration must be a mapping')
        self._data = self._wrap_mapping(items)
        if self._cache is not None:
            # Values cached from a previous load are no longer valid.
            self._cache.clear()

    def load_file(self, path, encoding=None):
        """Open *path* (UTF-8 unless *encoding* is given) and load it."""
        with io.open(path, encoding=encoding or 'utf-8') as f:
            self.load(f)

    def close(self):
        if self._can_close and self._stream:
            self._stream.close()

    def __getattr__(self, key):
        # __getattr__ only runs when normal lookup fails. If '_data' itself is
        # not set yet (an instance created without __init__, e.g. while being
        # copied or unpickled), reading self._data below would re-enter this
        # method without end, so report a missing attribute instead.
        if '_data' not in self.__dict__:
            raise AttributeError(key)
        import warnings

        msg = 'Attribute access is deprecated; use indexed access instead.'
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        return self._data[key]