You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Enso-Bot/venv/Lib/site-packages/aiohttp/web_urldispatcher.py

1136 lines
38 KiB
Python

4 years ago
import abc
import asyncio
import base64
import hashlib
4 years ago
import inspect
import keyword
import os
import re
import warnings
from contextlib import contextmanager
from functools import wraps
4 years ago
from pathlib import Path
from types import MappingProxyType
from typing import ( # noqa
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Container,
Dict,
Generator,
Iterable,
Iterator,
List,
Mapping,
Optional,
Set,
Sized,
Tuple,
Type,
Union,
cast,
)
from yarl import URL
4 years ago
from . import hdrs
from .abc import AbstractMatchInfo, AbstractRouter, AbstractView
from .helpers import DEBUG
from .http import HttpVersion11
from .typedefs import PathLike
from .web_exceptions import (
HTTPException,
HTTPExpectationFailed,
HTTPForbidden,
HTTPMethodNotAllowed,
HTTPNotFound,
)
from .web_fileresponse import FileResponse
from .web_request import Request
from .web_response import Response, StreamResponse
from .web_routedef import AbstractRouteDef
4 years ago
# Names exported by ``from <module> import *``.
__all__ = (
    'UrlDispatcher',
    'UrlMappingMatchInfo',
    'AbstractResource',
    'Resource',
    'PlainResource',
    'DynamicResource',
    'AbstractRoute',
    'ResourceRoute',
    'StaticResource',
    'View',
)
4 years ago
# ``Application`` is only imported while type checking; presumably this avoids
# a runtime import cycle with ``web_app`` -- TODO confirm.
if TYPE_CHECKING:  # pragma: no cover
    from .web_app import Application  # noqa
    # Type checkers see the match-info base class as a str -> str mapping.
    BaseDict = Dict[str, str]
else:
    # At runtime the base class is the plain builtin dict.
    BaseDict = dict
4 years ago
# A method name is a non-empty run of ASCII alphanumerics and the punctuation
# characters listed in the character class.
HTTP_METHOD_RE = re.compile(r"^[0-9A-Za-z!#\$%&'\*\+\-\.\^_`\|~]+$")
# Captures one ``{...}`` placeholder whose first character is a letter or
# underscore; one level of nested braces is tolerated inside it.  Because the
# pattern is a capturing group, ``ROUTE_RE.split()`` keeps the placeholders.
ROUTE_RE = re.compile(r'(\{[_a-zA-Z][^{}]*(?:\{[^{}]*\}[^{}]*)*\})')
# Regex-escaped form of the path separator, compared against compiled patterns.
PATH_SEP = re.escape('/')
4 years ago
# Request handler: called with a Request, awaits to a StreamResponse.
_WebHandler = Callable[[Request], Awaitable[StreamResponse]]
# "Expect" header handler: called with a Request, awaits to None.
_ExpectHandler = Callable[[Request], Awaitable[None]]
# Result of a resource lookup: (match info or None, set of method names).
_Resolve = Tuple[Optional[AbstractMatchInfo], Set[str]]
4 years ago
class AbstractResource(Sized, Iterable['AbstractRoute']):
    """Interface implemented by every resource stored in the router.

    A resource is sized, iterable over its routes and may carry a name.
    """

    def __init__(self, *, name: Optional[str]=None) -> None:
        self._name = name

    @property
    def name(self) -> Optional[str]:
        """The name passed to the constructor, or None."""
        return self._name

    @property
    @abc.abstractmethod
    def canonical(self) -> str:
        """Expose the canonical path of the resource.

        For example '/foo/bar/{name}'
        """

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, **kwargs: str) -> URL:
        """Build a URL for the resource from the supplied parameters."""

    @abc.abstractmethod  # pragma: no branch
    async def resolve(self, request: Request) -> _Resolve:
        """Try to resolve *request* against the resource.

        Return a (UrlMappingMatchInfo, allowed_methods) pair.
        """

    @abc.abstractmethod
    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to the URLs handled by the resource.

        Needed to support sub-applications.
        """

    @abc.abstractmethod
    def get_info(self) -> Dict[str, Any]:
        """Return a dict with extra details useful for introspection."""

    def freeze(self) -> None:
        """Do nothing by default; subclasses may override."""

    @abc.abstractmethod
    def raw_match(self, path: str) -> bool:
        """Tell whether *path* matches the resource without processing it."""
4 years ago
class AbstractRoute(abc.ABC):
    """Binding of an HTTP method to a handler, optionally tied to a resource."""

    def __init__(self, method: str,
                 handler: Union[_WebHandler, Type[AbstractView]], *,
                 expect_handler: _ExpectHandler=None,
                 resource: AbstractResource=None) -> None:
        """Validate the arguments and normalise the handler.

        The method name is upper-cased and checked against HTTP_METHOD_RE.
        Coroutine functions and AbstractView subclasses are stored as given;
        generator functions are stored as given after a DeprecationWarning;
        any other callable is wrapped in a coroutine function after a
        DeprecationWarning.

        Raises ValueError when the method name is not acceptable.
        """
        if expect_handler is None:
            expect_handler = _default_expect_handler

        assert asyncio.iscoroutinefunction(expect_handler), \
            'Coroutine is expected, got {!r}'.format(expect_handler)

        method = method.upper()
        if HTTP_METHOD_RE.match(method) is None:
            raise ValueError("{} is not allowed HTTP method".format(method))

        assert callable(handler), handler

        if asyncio.iscoroutinefunction(handler):
            # Already a coroutine function: nothing to adapt.
            pass
        elif inspect.isgeneratorfunction(handler):
            warnings.warn("Bare generators are deprecated, "
                          "use @coroutine wrapper", DeprecationWarning)
        elif isinstance(handler, type) and issubclass(handler, AbstractView):
            # Class-based views are stored untouched.
            pass
        else:
            warnings.warn("Bare functions are deprecated, "
                          "use async ones", DeprecationWarning)

            plain_handler = handler

            @wraps(plain_handler)
            async def handler_wrapper(request: Request) -> StreamResponse:
                # The plain callable may still hand back a coroutine object.
                outcome = plain_handler(request)
                if asyncio.iscoroutine(outcome):
                    return await outcome
                return outcome  # type: ignore

            handler = handler_wrapper

        self._method = method
        self._handler = handler
        self._expect_handler = expect_handler
        self._resource = resource

    @property
    def method(self) -> str:
        """Upper-cased HTTP method name of the route."""
        return self._method

    @property
    def handler(self) -> _WebHandler:
        """Handler stored for the route (possibly the async wrapper)."""
        return self._handler

    @property
    @abc.abstractmethod
    def name(self) -> Optional[str]:
        """Optional name of the route; always the same as the resource name."""

    @property
    def resource(self) -> Optional[AbstractResource]:
        """Resource passed to the constructor, or None."""
        return self._resource

    @abc.abstractmethod
    def get_info(self) -> Dict[str, Any]:
        """Return a dict with extra details useful for introspection."""

    @abc.abstractmethod  # pragma: no branch
    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Build a URL for the route from the supplied parameters."""

    async def handle_expect_header(self, request: Request) -> None:
        """Delegate to the configured expect handler."""
        await self._expect_handler(request)
4 years ago
class UrlMappingMatchInfo(BaseDict, AbstractMatchInfo):
    """Matched path variables (as a dict) together with the matched route.

    Also tracks the stack of applications added through add_app() and the
    application currently being processed.
    """

    def __init__(self, match_dict: Dict[str, str], route: AbstractRoute):
        super().__init__(match_dict)
        self._route = route
        self._apps = []  # type: List[Application]
        self._current_app = None  # type: Optional[Application]
        self._frozen = False

    @property
    def handler(self) -> _WebHandler:
        """Handler of the matched route."""
        return self._route.handler

    @property
    def route(self) -> AbstractRoute:
        """The matched route."""
        return self._route

    @property
    def expect_handler(self) -> _ExpectHandler:
        """Bound ``handle_expect_header`` of the matched route."""
        return self._route.handle_expect_header

    @property
    def http_exception(self) -> Optional[HTTPException]:
        """Always None for a successful match."""
        return None

    def get_info(self) -> Dict[str, str]:
        """Return the introspection info of the matched route."""
        return self._route.get_info()

    @property
    def apps(self) -> Tuple['Application', ...]:
        """Snapshot of the application stack as a tuple."""
        return tuple(self._apps)

    def add_app(self, app: 'Application') -> None:
        """Put *app* at the front of the application stack.

        The first application added also becomes the current one.
        Raises RuntimeError once freeze() has been called.
        """
        if self._frozen:
            raise RuntimeError("Cannot change apps stack after .freeze() call")
        if self._current_app is None:
            self._current_app = app
        self._apps.insert(0, app)

    @property
    def current_app(self) -> 'Application':
        """Application currently being processed; asserts that one is set."""
        current = self._current_app
        assert current is not None
        return current

    @contextmanager
    def set_current_app(self,
                        app: 'Application') -> Generator[None, None, None]:
        """Temporarily make *app* the current application.

        The previous value is restored on exit, even after an exception.
        When DEBUG is on, *app* must already be in the application stack.
        """
        if DEBUG and app not in self._apps:  # pragma: no cover
            raise RuntimeError(
                "Expected one of the following apps {!r}, got {!r}"
                .format(self._apps, app))
        previous = self._current_app
        self._current_app = app
        try:
            yield
        finally:
            self._current_app = previous

    def freeze(self) -> None:
        """Forbid further add_app() calls."""
        self._frozen = True

    def __repr__(self) -> str:
        dict_repr = super().__repr__()
        return "<MatchInfo {}: {}>".format(dict_repr, self._route)
class MatchInfoError(UrlMappingMatchInfo):
    """Match info for a failed lookup; carries the HTTP exception to raise."""

    def __init__(self, http_exception: HTTPException) -> None:
        self._exception = http_exception
        # No path variables; the route is a SystemRoute built around the
        # same exception.
        error_route = SystemRoute(http_exception)
        super().__init__({}, error_route)

    @property
    def http_exception(self) -> HTTPException:
        """The exception given to the constructor."""
        return self._exception

    def __repr__(self) -> str:
        exc = self._exception
        return "<MatchInfoError {}: {}>".format(exc.status, exc.reason)
async def _default_expect_handler(request: Request) -> None:
    """Default handler for Expect header.

    For HTTP/1.1 requests, send "100 Continue" to the client when the
    Expect header equals "100-continue" (case-insensitively).

    Raises HTTPExpectationFailed for any other value of the header,
    including a missing one.  Requests with another HTTP version are
    left untouched.
    """
    # Default to '' so that an absent header is reported as an unknown
    # expectation instead of failing with AttributeError on None.lower().
    expect = request.headers.get(hdrs.EXPECT, "")
    if request.version == HttpVersion11:
        if expect.lower() == "100-continue":
            await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n")
        else:
            raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
class Resource(AbstractResource):
    """Resource holding an ordered list of ResourceRoute objects.

    Subclasses supply the path matching through _match().
    """

    def __init__(self, *, name: Optional[str]=None) -> None:
        super().__init__(name=name)
        self._routes = []  # type: List[ResourceRoute]

    def add_route(self, method: str,
                  handler: Union[Type[AbstractView], _WebHandler], *,
                  expect_handler: Optional[_ExpectHandler]=None
                  ) -> 'ResourceRoute':
        """Create a route for *method* and register it on the resource.

        Raises RuntimeError when an existing route already uses *method*
        or is registered for hdrs.METH_ANY.
        """
        for existing in self._routes:
            if existing.method in (method, hdrs.METH_ANY):
                raise RuntimeError("Added route will never be executed, "
                                   "method {route.method} is already "
                                   "registered".format(route=existing))

        new_route = ResourceRoute(method, handler, self,
                                  expect_handler=expect_handler)
        self.register_route(new_route)
        return new_route

    def register_route(self, route: 'ResourceRoute') -> None:
        """Append an already constructed ResourceRoute."""
        assert isinstance(route, ResourceRoute), \
            'Instance of Route class is required, got {!r}'.format(route)
        self._routes.append(route)

    async def resolve(self, request: Request) -> _Resolve:
        """Match the request path, then pick the first route for its method.

        Returns (None, set()) when the path does not match.  Otherwise the
        second item holds the methods of the routes inspected so far, and
        the first item is the match info or None if no route fits.
        """
        allowed_methods = set()  # type: Set[str]

        match_dict = self._match(request.rel_url.raw_path)
        if match_dict is None:
            return None, allowed_methods

        for candidate in self._routes:
            candidate_method = candidate.method
            allowed_methods.add(candidate_method)
            if candidate_method in (request.method, hdrs.METH_ANY):
                return (UrlMappingMatchInfo(match_dict, candidate),
                        allowed_methods)

        return None, allowed_methods

    @abc.abstractmethod
    def _match(self, path: str) -> Optional[Dict[str, str]]:
        pass  # pragma: no cover

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._routes)

    # TODO: implement all abstract methods
4 years ago
class PlainResource(Resource):
    """Resource matched by exact string equality with a fixed path."""

    def __init__(self, path: str, *, name: Optional[str]=None) -> None:
        super().__init__(name=name)
        assert not path or path.startswith('/')
        self._path = path

    @property
    def canonical(self) -> str:
        """The stored path."""
        return self._path

    def freeze(self) -> None:
        """Replace an empty path with '/'."""
        if not self._path:
            self._path = '/'

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* (starts with '/', no trailing '/', longer than 1)."""
        assert prefix.startswith('/')
        assert not prefix.endswith('/')
        assert len(prefix) > 1
        self._path = prefix + self._path

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        # string comparison is about 10 times faster than regexp matching
        return {} if self._path == path else None

    def raw_match(self, path: str) -> bool:
        """Tell whether *path* equals the stored path."""
        return self._path == path

    def get_info(self) -> Dict[str, Any]:
        """Return ``{'path': <stored path>}``."""
        return {'path': self._path}

    def url_for(self) -> URL:  # type: ignore
        """Build a URL from the stored path, treated as already encoded."""
        return URL.build(path=self._path, encoded=True)

    def __repr__(self) -> str:
        if self.name is not None:
            name = "'" + self.name + "' "
        else:
            name = ""
        return "<PlainResource {name} {path}>".format(name=name,
                                                       path=self._path)
4 years ago
class DynamicResource(Resource):
    """Resource whose path contains ``{name}`` / ``{name:regex}`` parts.

    The path is compiled into a regular expression used for matching and
    into a format string used for building URLs.
    """

    # ``{name}`` placeholder.
    DYN = re.compile(r'\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*)\}')
    # ``{name:regex}`` placeholder.
    DYN_WITH_RE = re.compile(
        r'\{(?P<var>[_a-zA-Z][_a-zA-Z0-9]*):(?P<re>.+)\}')
    # Pattern used for a placeholder that does not supply its own regex.
    GOOD = r'[^{}/]+'

    def __init__(self, path: str, *, name: Optional[str]=None) -> None:
        """Compile *path* into a matching pattern and a URL formatter.

        Raises ValueError when a literal part contains a stray brace or
        when the resulting pattern is not a valid regular expression.
        """
        super().__init__(name=name)
        pattern = ''
        formatter = ''
        for part in ROUTE_RE.split(path):
            match = self.DYN.fullmatch(part)
            if match:
                pattern += '(?P<{}>{})'.format(match.group('var'), self.GOOD)
                formatter += '{' + match.group('var') + '}'
                continue

            match = self.DYN_WITH_RE.fullmatch(part)
            if match:
                pattern += '(?P<{var}>{re})'.format(**match.groupdict())
                formatter += '{' + match.group('var') + '}'
                continue

            if '{' in part or '}' in part:
                raise ValueError("Invalid path '{}'['{}']".format(path, part))

            # Use a dedicated local: rebinding ``path`` here would make the
            # error message above report a fragment instead of the path
            # the caller passed in.
            quoted_part = URL.build(path=part).raw_path
            formatter += quoted_part
            pattern += re.escape(quoted_part)

        try:
            compiled = re.compile(pattern)
        except re.error as exc:
            raise ValueError(
                "Bad pattern '{}': {}".format(pattern, exc)) from None
        assert compiled.pattern.startswith(PATH_SEP)
        assert formatter.startswith('/')
        self._pattern = compiled
        self._formatter = formatter

    @property
    def canonical(self) -> str:
        """The URL format string built from the path."""
        return self._formatter

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* to both the pattern and the formatter.

        *prefix* starts with '/', has no trailing '/', and is longer than 1.
        """
        assert prefix.startswith('/')
        assert not prefix.endswith('/')
        assert len(prefix) > 1
        self._pattern = re.compile(re.escape(prefix)+self._pattern.pattern)
        self._formatter = prefix + self._formatter

    def _match(self, path: str) -> Optional[Dict[str, str]]:
        match = self._pattern.fullmatch(path)
        if match is None:
            return None
        # Captured groups are fed to URL.build(..., encoded=True) and the
        # resulting ``.path`` is what ends up in the match dict.
        return {key: URL.build(path=value, encoded=True).path
                for key, value in match.groupdict().items()}

    def raw_match(self, path: str) -> bool:
        """Tell whether *path* equals the formatter string."""
        return self._formatter == path

    def get_info(self) -> Dict[str, Any]:
        """Return the formatter string and the compiled pattern."""
        return {'formatter': self._formatter,
                'pattern': self._pattern}

    def url_for(self, **parts: str) -> URL:
        """Fill the formatter with the quoted *parts* and build a URL."""
        url = self._formatter.format_map({k: URL.build(path=v).raw_path
                                          for k, v in parts.items()})
        return URL.build(path=url)

    def __repr__(self) -> str:
        name = "'" + self.name + "' " if self.name is not None else ""
        return ("<DynamicResource {name} {formatter}>"
                .format(name=name, formatter=self._formatter))
class PrefixResource(AbstractResource):
    """Base for resources identified by a URL prefix."""

    def __init__(self, prefix: str, *, name: Optional[str]=None) -> None:
        # Either empty or starting with '/'; only '' and '/' may end in '/'.
        assert not prefix or prefix.startswith('/'), prefix
        assert prefix in ('', '/') or not prefix.endswith('/'), prefix
        super().__init__(name=name)
        quoted = URL.build(path=prefix)
        self._prefix = quoted.raw_path

    @property
    def canonical(self) -> str:
        """The stored prefix."""
        return self._prefix

    def add_prefix(self, prefix: str) -> None:
        """Prepend *prefix* (starts with '/', no trailing '/', longer than 1)."""
        assert prefix.startswith('/')
        assert not prefix.endswith('/')
        assert len(prefix) > 1
        self._prefix = prefix + self._prefix

    def raw_match(self, prefix: str) -> bool:
        """Always False."""
        return False

    # TODO: impl missing abstract methods
4 years ago
class StaticResource(PrefixResource):
    """Resource serving files from a directory below a URL prefix.

    GET and HEAD routes are created on construction; an OPTIONS route may
    be added later with set_options_route().
    """

    # Query-string key carrying the file hash when versioning is requested.
    VERSION_KEY = 'v'

    def __init__(self, prefix: str, directory: PathLike,
                 *, name: Optional[str]=None,
                 expect_handler: Optional[_ExpectHandler]=None,
                 chunk_size: int=256 * 1024,
                 show_index: bool=False, follow_symlinks: bool=False,
                 append_version: bool=False) -> None:
        """Resolve *directory* and set up the GET/HEAD routes.

        A leading '~' in *directory* is expanded.  Raises ValueError when
        the resolved path is missing or is not a directory.
        """
        super().__init__(prefix, name=name)
        try:
            directory = Path(directory)
            if str(directory).startswith('~'):
                directory = Path(os.path.expanduser(str(directory)))
            directory = directory.resolve()
            if not directory.is_dir():
                raise ValueError('Not a directory')
        except (FileNotFoundError, ValueError) as error:
            raise ValueError(
                "No directory exists at '{}'".format(directory)) from error
        self._directory = directory
        self._show_index = show_index
        self._chunk_size = chunk_size
        self._follow_symlinks = follow_symlinks
        self._expect_handler = expect_handler
        self._append_version = append_version

        self._routes = {'GET': ResourceRoute('GET', self._handle, self,
                                             expect_handler=expect_handler),

                        'HEAD': ResourceRoute('HEAD', self._handle, self,
                                              expect_handler=expect_handler)}

    def url_for(self, *, filename: Union[str, Path],  # type: ignore
                append_version: Optional[bool]=None) -> URL:
        """Build the URL of *filename* below the prefix.

        When *append_version* (defaulting to the constructor setting) is
        true and the file exists inside the served directory, a hash of
        the file content is added as the VERSION_KEY query parameter.
        """
        if append_version is None:
            append_version = self._append_version
        if isinstance(filename, Path):
            filename = str(filename)
        # Exactly one leading slash, whatever the caller supplied.
        relative_name = filename.lstrip('/')

        # filename is not encoded
        url = URL.build(path=self._prefix + '/' + relative_name)

        if not append_version:
            return url

        try:
            filepath = self._directory.joinpath(relative_name).resolve()
            if not self._follow_symlinks:
                filepath.relative_to(self._directory)
        except (ValueError, FileNotFoundError):
            # ValueError for case when path point to symlink
            # with follow_symlinks is False
            return url  # relatively safe
        if filepath.is_file():
            # TODO cache file content
            # with file watcher for cache invalidation
            with open(str(filepath), mode='rb') as f:
                file_bytes = f.read()
            h = self._get_file_hash(file_bytes)
            url = url.with_query({self.VERSION_KEY: h})
        return url

    @staticmethod
    def _get_file_hash(byte_array: bytes) -> str:
        """Return the URL-safe base64 text of the SHA-256 digest."""
        m = hashlib.sha256()  # todo sha256 can be configurable param
        m.update(byte_array)
        b64 = base64.urlsafe_b64encode(m.digest())
        return b64.decode('ascii')

    def get_info(self) -> Dict[str, Any]:
        """Return the served directory and the URL prefix."""
        return {'directory': self._directory,
                'prefix': self._prefix}

    def set_options_route(self, handler: _WebHandler) -> None:
        """Register *handler* for OPTIONS; RuntimeError if already set."""
        if 'OPTIONS' in self._routes:
            raise RuntimeError('OPTIONS route was set already')
        self._routes['OPTIONS'] = ResourceRoute(
            'OPTIONS', handler, self,
            expect_handler=self._expect_handler)

    async def resolve(self, request: Request) -> _Resolve:
        """Match requests whose path is the prefix or lies below it.

        Returns (None, set()) when the path is outside the prefix and
        (None, allowed_methods) when the method has no route.
        """
        path = request.rel_url.raw_path
        method = request.method
        allowed_methods = set(self._routes)
        # Require a path-segment boundary after the prefix; a bare
        # startswith() would let '/staticfoo' match the prefix '/static'
        # and produce a filename with its first character chopped off.
        if not path.startswith(self._prefix + '/') and path != self._prefix:
            return None, set()

        if method not in allowed_methods:
            return None, allowed_methods

        match_dict = {'filename': URL.build(path=path[len(self._prefix)+1:],
                                            encoded=True).path}
        return (UrlMappingMatchInfo(match_dict, self._routes[method]),
                allowed_methods)

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._routes.values())

    async def _handle(self, request: Request) -> StreamResponse:
        """Serve the file or directory index named by the matched filename.

        Raises HTTPForbidden for anchored (absolute) names, for directories
        when the index is disabled and on PermissionError while listing;
        raises HTTPNotFound when the path cannot be resolved inside the
        served directory or is neither a file nor a directory.
        """
        rel_url = request.match_info['filename']
        try:
            filename = Path(rel_url)
            if filename.anchor:
                # rel_url is an absolute name like
                # /static/\\machine_name\c$ or /static/D:\path
                # where the static dir is totally different
                raise HTTPForbidden()
            filepath = self._directory.joinpath(filename).resolve()
            if not self._follow_symlinks:
                filepath.relative_to(self._directory)
        except (ValueError, FileNotFoundError) as error:
            # relatively safe
            raise HTTPNotFound() from error
        except HTTPForbidden:
            raise
        except Exception as error:
            # perm error or other kind!
            request.app.logger.exception(error)
            raise HTTPNotFound() from error

        # on opening a dir, load its contents if allowed
        if filepath.is_dir():
            if not self._show_index:
                raise HTTPForbidden()
            try:
                return Response(text=self._directory_as_html(filepath),
                                content_type="text/html")
            except PermissionError:
                raise HTTPForbidden()
        if filepath.is_file():
            return FileResponse(filepath, chunk_size=self._chunk_size)
        raise HTTPNotFound()

    def _directory_as_html(self, filepath: Path) -> str:
        """Render an HTML index page for the directory *filepath*.

        File names come from the file system, so they are HTML-escaped
        before being placed in the page, and link targets are
        percent-quoted.
        """
        # Local import keeps this class self-contained with respect to
        # the module-level import block.
        from html import escape as html_escape

        # sanity check
        assert filepath.is_dir()

        relative_path_to_dir = filepath.relative_to(self._directory).as_posix()
        index_of = "Index of /{}".format(html_escape(relative_path_to_dir))
        h1 = "<h1>{}</h1>".format(index_of)

        index_list = []
        dir_index = filepath.iterdir()
        for _file in sorted(dir_index):
            # show file url as relative to static path
            rel_path = _file.relative_to(self._directory).as_posix()
            file_url = URL.build(path=self._prefix + '/' + rel_path).raw_path

            # if file is a directory, add '/' to the end of the name
            if _file.is_dir():
                file_name = "{}/".format(_file.name)
            else:
                file_name = _file.name

            index_list.append(
                '<li><a href="{url}">{name}</a></li>'.format(
                    url=html_escape(file_url),
                    name=html_escape(file_name))
            )
        ul = "<ul>\n{}\n</ul>".format('\n'.join(index_list))
        body = "<body>\n{}\n{}\n</body>".format(h1, ul)

        head_str = "<head>\n<title>{}</title>\n</head>".format(index_of)
        html = "<html>\n{}\n{}\n</html>".format(head_str, body)

        return html

    def __repr__(self) -> str:
        name = "'" + self.name + "'" if self.name is not None else ""
        return "<StaticResource {name} {path} -> {directory!r}>".format(
            name=name, path=self._prefix, directory=self._directory)
class PrefixedSubAppResource(PrefixResource):
    """Resource that forwards requests below a prefix to a sub-application."""

    def __init__(self, prefix: str, app: 'Application') -> None:
        super().__init__(prefix)
        self._app = app
        # Every resource of the sub-application gets the prefix as well.
        for sub_resource in app.router.resources():
            sub_resource.add_prefix(prefix)

    def add_prefix(self, prefix: str) -> None:
        """Prefix this resource and every resource of the sub-application."""
        super().add_prefix(prefix)
        for sub_resource in self._app.router.resources():
            sub_resource.add_prefix(prefix)

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Always raise RuntimeError: not supported for a sub-app root."""
        raise RuntimeError(".url_for() is not supported "
                           "by sub-application root")

    def get_info(self) -> Dict[str, Any]:
        """Return the sub-application and the prefix."""
        return {'app': self._app,
                'prefix': self._prefix}

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve through the sub-application router when the path fits.

        The path must equal the prefix or start with the prefix followed
        by '/'; otherwise (None, set()) is returned.  The allowed methods
        are taken from the match info only when it carries an
        HTTPMethodNotAllowed exception.
        """
        raw_path = request.url.raw_path
        below_prefix = raw_path.startswith(self._prefix + '/')
        if not below_prefix and raw_path != self._prefix:
            return None, set()

        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)

        error = match_info.http_exception
        if isinstance(error, HTTPMethodNotAllowed):
            methods = error.allowed_methods
        else:
            methods = set()
        return match_info, methods

    def __len__(self) -> int:
        return len(self._app.router.routes())

    def __iter__(self) -> Iterator[AbstractRoute]:
        return iter(self._app.router.routes())

    def __repr__(self) -> str:
        return "<PrefixedSubAppResource {prefix} -> {app!r}>".format(
            prefix=self._prefix, app=self._app)
class AbstractRuleMatching(abc.ABC):
    """Interface of a rule that decides whether a request is accepted."""

    @abc.abstractmethod  # pragma: no branch
    async def match(self, request: Request) -> bool:
        """Tell whether *request* satisfies the rule."""

    @abc.abstractmethod  # pragma: no branch
    def get_info(self) -> Dict[str, Any]:
        """Return a dict with extra details useful for introspection."""

    @property
    @abc.abstractmethod  # pragma: no branch
    def canonical(self) -> str:
        """Return the rule as a string."""
class Domain(AbstractRuleMatching):
    """Rule matching the request's Host header against a fixed domain."""

    # One label of the host name: 1-63 characters from ``[a-z0-9-]`` that
    # neither start nor end with a dash.
    re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        super().__init__()
        self._domain = self.validation(domain)

    @property
    def canonical(self) -> str:
        """The validated domain string."""
        return self._domain

    def validation(self, domain: str) -> str:
        """Normalise and validate *domain*.

        Trailing dots are stripped and the value is lower-cased.  Returns
        the host alone when the port is 80, ``host:port`` otherwise.

        Raises TypeError for a non-str argument and ValueError for an
        empty value, a value containing '://', or a host label rejected
        by ``re_part``.
        """
        if not isinstance(domain, str):
            raise TypeError("Domain must be str")
        domain = domain.rstrip('.').lower()
        if not domain:
            raise ValueError("Domain cannot be empty")
        elif '://' in domain:
            raise ValueError("Scheme not supported")
        # Parse with a dummy scheme to split the host from the port.
        url = URL('http://' + domain)
        if not all(
                self.re_part.fullmatch(x)
                for x in url.raw_host.split(".")):  # type: ignore
            raise ValueError("Domain not valid")
        if url.port == 80:
            return url.raw_host  # type: ignore
        return '{}:{}'.format(url.raw_host, url.port)

    async def match(self, request: Request) -> bool:
        """Tell whether the Host header of *request* matches the domain.

        A missing or empty Host header yields False.
        """
        host = request.headers.get(hdrs.HOST)
        # Return a real bool; ``host and ...`` would leak None or ''.
        if not host:
            return False
        return self.match_domain(host)

    def match_domain(self, host: str) -> bool:
        """Compare the lower-cased *host* with the stored domain."""
        return host.lower() == self._domain

    def get_info(self) -> Dict[str, Any]:
        """Return ``{'domain': <stored domain>}``."""
        return {'domain': self._domain}
class MaskDomain(Domain):
    """Domain rule whose labels may contain '*' wildcards."""

    # Same as Domain.re_part, with '*' also accepted inside a label.
    re_part = re.compile(r"(?!-)[a-z\d\*-]{1,63}(?<!-)")

    def __init__(self, domain: str) -> None:
        super().__init__(domain)
        # Turn the validated domain into a regex: dots become literal and
        # every '*' becomes '.*'.
        escaped_dots = self._domain.replace('.', r'\.')
        self._mask = re.compile(escaped_dots.replace('*', '.*'))

    @property
    def canonical(self) -> str:
        """The source text of the compiled mask."""
        return self._mask.pattern

    def match_domain(self, host: str) -> bool:
        """Tell whether the mask matches the whole of *host*."""
        found = self._mask.fullmatch(host)
        return found is not None
class MatchedSubAppResource(PrefixedSubAppResource):
    """Sub-application resource selected by a rule rather than a prefix."""

    def __init__(self, rule: AbstractRuleMatching, app: 'Application') -> None:
        # The parent constructors are skipped on purpose: only the
        # AbstractResource initialiser runs and the prefix stays empty.
        AbstractResource.__init__(self)
        self._prefix = ''
        self._app = app
        self._rule = rule

    @property
    def canonical(self) -> str:
        """Canonical string of the rule."""
        return self._rule.canonical

    def get_info(self) -> Dict[str, Any]:
        """Return the sub-application and the rule."""
        return {'app': self._app,
                'rule': self._rule}

    async def resolve(self, request: Request) -> _Resolve:
        """Resolve through the sub-application when the rule accepts.

        Returns (None, set()) when the rule rejects the request.  The
        allowed methods are taken from the match info only when it carries
        an HTTPMethodNotAllowed exception.
        """
        accepted = await self._rule.match(request)
        if not accepted:
            return None, set()

        match_info = await self._app.router.resolve(request)
        match_info.add_app(self._app)

        error = match_info.http_exception
        if isinstance(error, HTTPMethodNotAllowed):
            methods = error.allowed_methods
        else:
            methods = set()
        return match_info, methods

    def __repr__(self) -> str:
        return "<MatchedSubAppResource -> {app!r}>" \
               "".format(app=self._app)
class ResourceRoute(AbstractRoute):
    """A route with resource"""

    def __init__(self, method: str,
                 handler: Union[_WebHandler, Type[AbstractView]],
                 resource: AbstractResource, *,
                 expect_handler: Optional[_ExpectHandler]=None) -> None:
        super().__init__(method, handler, expect_handler=expect_handler,
                         resource=resource)

    def __repr__(self) -> str:
        # The closing '>' was missing from the format string, leaving the
        # repr unbalanced, unlike the other reprs in this module.
        return "<ResourceRoute [{method}] {resource} -> {handler!r}>".format(
            method=self.method, resource=self._resource,
            handler=self.handler)

    @property
    def name(self) -> Optional[str]:
        """Name of the owning resource."""
        return self._resource.name  # type: ignore

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Construct url for route with additional params.

        Delegates to the owning resource.
        """
        return self._resource.url_for(*args, **kwargs)  # type: ignore

    def get_info(self) -> Dict[str, Any]:
        """Return the introspection info of the owning resource."""
        return self._resource.get_info()  # type: ignore
class SystemRoute(AbstractRoute):
    """Route for any method whose handler raises a stored HTTP exception."""

    def __init__(self, http_exception: HTTPException) -> None:
        super().__init__(hdrs.METH_ANY, self._handle)
        self._http_exception = http_exception

    def url_for(self, *args: str, **kwargs: str) -> URL:
        """Always raise RuntimeError: system routes have no URL."""
        raise RuntimeError(".url_for() is not allowed for SystemRoute")

    @property
    def name(self) -> Optional[str]:
        """Always None."""
        return None

    def get_info(self) -> Dict[str, Any]:
        """Return ``{'http_exception': <stored exception>}``."""
        return {'http_exception': self._http_exception}

    async def _handle(self, request: Request) -> StreamResponse:
        raise self._http_exception

    @property
    def status(self) -> int:
        """Status of the stored exception."""
        return self._http_exception.status

    @property
    def reason(self) -> str:
        """Reason of the stored exception."""
        return self._http_exception.reason

    def __repr__(self) -> str:
        return "<SystemRoute {self.status}: {self.reason}>".format(self=self)
class View(AbstractView):
    """Class-based view dispatching to the method named after the HTTP verb."""

    async def _iter(self) -> StreamResponse:
        http_method = self.request.method
        if http_method not in hdrs.METH_ALL:
            self._raise_allowed_methods()
        # Look up e.g. ``self.get`` for a GET request.
        bound = getattr(self, self.request.method.lower(), None)
        if bound is None:
            self._raise_allowed_methods()
        return await bound()

    def __await__(self) -> Generator[Any, None, StreamResponse]:
        return self._iter().__await__()

    def _raise_allowed_methods(self) -> None:
        """Raise HTTPMethodNotAllowed listing the verbs this view defines."""
        allowed_methods = {
            verb for verb in hdrs.METH_ALL if hasattr(self, verb.lower())}
        raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
class ResourcesView(Sized,
                    Iterable[AbstractResource],
                    Container[AbstractResource]):
    """Sized, iterable container view over a list of resources.

    The list is stored by reference, not copied.
    """

    def __init__(self, resources: List[AbstractResource]) -> None:
        self._resources = resources

    def __len__(self) -> int:
        return len(self._resources)

    def __iter__(self) -> Iterator[AbstractResource]:
        for resource in self._resources:
            yield resource

    def __contains__(self, resource: object) -> bool:
        return resource in self._resources
class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]):
    """Sized, iterable container view over the routes of many resources.

    The routes are collected into a new list at construction time.
    """

    def __init__(self, resources: List[AbstractResource]):
        self._routes = [
            route
            for resource in resources
            for route in resource
        ]  # type: List[AbstractRoute]

    def __len__(self) -> int:
        return len(self._routes)

    def __iter__(self) -> Iterator[AbstractRoute]:
        for route in self._routes:
            yield route

    def __contains__(self, route: object) -> bool:
        return route in self._routes
class UrlDispatcher(AbstractRouter, Mapping[str, AbstractResource]):
    """Router keeping an ordered list of resources.

    As a mapping it exposes the named resources, keyed by name.
    """

    # Separators allowed between the identifier parts of a resource name.
    NAME_SPLIT_RE = re.compile(r'[.:-]')

    def __init__(self) -> None:
        super().__init__()
        self._resources = []  # type: List[AbstractResource]
        self._named_resources = {}  # type: Dict[str, AbstractResource]

    async def resolve(self, request: Request) -> AbstractMatchInfo:
        """Return the match info of the first resource resolving *request*.

        When no resource matches, a MatchInfoError is returned instead: it
        wraps HTTPMethodNotAllowed if any resource reported allowed
        methods, HTTPNotFound otherwise.
        """
        method = request.method
        allowed_methods = set()  # type: Set[str]

        for resource in self._resources:
            match_dict, allowed = await resource.resolve(request)
            if match_dict is not None:
                return match_dict
            allowed_methods |= allowed

        if allowed_methods:
            return MatchInfoError(HTTPMethodNotAllowed(method,
                                                       allowed_methods))
        return MatchInfoError(HTTPNotFound())

    def __iter__(self) -> Iterator[str]:
        return iter(self._named_resources)

    def __len__(self) -> int:
        return len(self._named_resources)

    def __contains__(self, resource: object) -> bool:
        return resource in self._named_resources

    def __getitem__(self, name: str) -> AbstractResource:
        return self._named_resources[name]

    def resources(self) -> ResourcesView:
        """Return a view over all registered resources."""
        return ResourcesView(self._resources)

    def routes(self) -> RoutesView:
        """Return a view over the routes of all registered resources."""
        return RoutesView(self._resources)

    def named_resources(self) -> Mapping[str, AbstractResource]:
        """Return a read-only mapping of the named resources."""
        return MappingProxyType(self._named_resources)

    def register_resource(self, resource: AbstractResource) -> None:
        """Add *resource* to the router.

        Raises RuntimeError when the router is frozen and ValueError when
        the resource name is malformed or already registered.
        """
        assert isinstance(resource, AbstractResource), \
            'Instance of AbstractResource class is required, got {!r}'.format(
                resource)
        if self.frozen:
            raise RuntimeError(
                "Cannot register a resource into frozen router.")

        name = resource.name

        if name is not None:
            parts = self.NAME_SPLIT_RE.split(name)
            for part in parts:
                if not part.isidentifier() or keyword.iskeyword(part):
                    # The separators are '.', ':' and '-', so the message
                    # names the colon (it previously said "column").
                    raise ValueError('Incorrect route name {!r}, '
                                     'the name should be a sequence of '
                                     'python identifiers separated '
                                     'by dash, dot or colon'.format(name))
            if name in self._named_resources:
                raise ValueError('Duplicate {!r}, '
                                 'already handled by {!r}'
                                 .format(name, self._named_resources[name]))
            self._named_resources[name] = resource
        self._resources.append(resource)

    def add_resource(self, path: str, *,
                     name: Optional[str]=None) -> Resource:
        """Return a resource for *path*, creating one unless reusable.

        The most recently added resource is reused when its name equals
        *name* and its raw_match() accepts *path*.  Otherwise a
        PlainResource is created for a path without braces and a
        DynamicResource for any other path.

        Raises ValueError when a non-empty *path* does not start with '/'.
        """
        if path and not path.startswith('/'):
            raise ValueError("path should be started with / or be empty")
        # Reuse last added resource if path and name are the same
        if self._resources:
            resource = self._resources[-1]
            if resource.name == name and resource.raw_match(path):
                return cast(Resource, resource)
        if not ('{' in path or '}' in path or ROUTE_RE.search(path)):
            url = URL.build(path=path)
            resource = PlainResource(url.raw_path, name=name)
            self.register_resource(resource)
            return resource
        resource = DynamicResource(path, name=name)
        self.register_resource(resource)
        return resource

    def add_route(self, method: str, path: str,
                  handler: Union[_WebHandler, Type[AbstractView]],
                  *, name: Optional[str]=None,
                  expect_handler: Optional[_ExpectHandler]=None
                  ) -> AbstractRoute:
        """Add a route for *method* to the resource serving *path*."""
        resource = self.add_resource(path, name=name)
        return resource.add_route(method, handler,
                                  expect_handler=expect_handler)

    def add_static(self, prefix: str, path: PathLike, *,
                   name: Optional[str]=None,
                   expect_handler: Optional[_ExpectHandler]=None,
                   chunk_size: int=256 * 1024,
                   show_index: bool=False, follow_symlinks: bool=False,
                   append_version: bool=False) -> AbstractResource:
        """Add static files view.

        prefix - url prefix
        path - folder with files

        One trailing '/' of the prefix is removed before the
        StaticResource is created and registered.
        """
        assert prefix.startswith('/')
        if prefix.endswith('/'):
            prefix = prefix[:-1]
        resource = StaticResource(prefix, path,
                                  name=name,
                                  expect_handler=expect_handler,
                                  chunk_size=chunk_size,
                                  show_index=show_index,
                                  follow_symlinks=follow_symlinks,
                                  append_version=append_version)
        self.register_resource(resource)
        return resource

    def add_head(self, path: str, handler: _WebHandler,
                 **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method HEAD
        """
        return self.add_route(hdrs.METH_HEAD, path, handler, **kwargs)

    def add_options(self, path: str, handler: _WebHandler,
                    **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method OPTIONS
        """
        return self.add_route(hdrs.METH_OPTIONS, path, handler, **kwargs)

    def add_get(self, path: str, handler: _WebHandler, *,
                name: Optional[str]=None, allow_head: bool=True,
                **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method GET, if allow_head is true another
        route is added allowing head requests to the same endpoint
        """
        resource = self.add_resource(path, name=name)
        if allow_head:
            resource.add_route(hdrs.METH_HEAD, handler, **kwargs)
        return resource.add_route(hdrs.METH_GET, handler, **kwargs)

    def add_post(self, path: str, handler: _WebHandler,
                 **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method POST
        """
        return self.add_route(hdrs.METH_POST, path, handler, **kwargs)

    def add_put(self, path: str, handler: _WebHandler,
                **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method PUT
        """
        return self.add_route(hdrs.METH_PUT, path, handler, **kwargs)

    def add_patch(self, path: str, handler: _WebHandler,
                  **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method PATCH
        """
        return self.add_route(hdrs.METH_PATCH, path, handler, **kwargs)

    def add_delete(self, path: str, handler: _WebHandler,
                   **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with method DELETE
        """
        return self.add_route(hdrs.METH_DELETE, path, handler, **kwargs)

    def add_view(self, path: str, handler: Type[AbstractView],
                 **kwargs: Any) -> AbstractRoute:
        """
        Shortcut for add_route with ANY methods for a class-based view
        """
        return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)

    def freeze(self) -> None:
        """Freeze the router and every registered resource."""
        super().freeze()
        for resource in self._resources:
            resource.freeze()

    def add_routes(self, routes: Iterable[AbstractRouteDef]) -> None:
        """Append routes to route table.

        Parameter should be a sequence of RouteDef objects.
        """
        for route_def in routes:
            route_def.register(self)