# This code is originally sourced from the aio-libs project "async_timeout",
# under the Apache 2.0 license. You may see the original project at
# https://github.com/aio-libs/async-timeout

# It is vendored here to reduce chain-dependencies on this library, and
# modified slightly to remove some features we don't use.


import asyncio
import sys
from types import TracebackType
from typing import Any, Optional, Type  # noqa

PY_37 = sys.version_info >= (3, 7)


class timeout:
    """timeout context manager.

    Useful in cases when you want to apply timeout logic around block
    of code or in cases when asyncio.wait_for is not suitable. For example:

    >>> with timeout(0.001):
    ...     async with aiohttp.get('https://github.com') as r:
    ...         await r.text()


    timeout - value in seconds or None to disable timeout logic
    loop - asyncio compatible event loop
    """

    def __init__(
        self,
        timeout: Optional[float],
        *,
        loop: Optional[asyncio.AbstractEventLoop] = None
    ) -> None:
        self._timeout = timeout
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._task = None  # type: Optional[asyncio.Task[Any]]
        self._cancelled = False
        self._cancel_handler = None  # type: Optional[asyncio.Handle]
        self._cancel_at = None  # type: Optional[float]

    def __enter__(self) -> "timeout":
        return self._do_enter()

    def __exit__(
        self,
        exc_type: Type[BaseException],
        exc_val: BaseException,
        exc_tb: TracebackType,
    ) -> Optional[bool]:
        self._do_exit(exc_type)
        return None

    async def __aenter__(self) -> "timeout":
        return self._do_enter()

    async def __aexit__(
        self,
        exc_type: Type[BaseException],
        exc_val: BaseException,
        exc_tb: TracebackType,
    ) -> None:
        self._do_exit(exc_type)

    @property
    def expired(self) -> bool:
        return self._cancelled

    @property
    def remaining(self) -> Optional[float]:
        if self._cancel_at is not None:
            return max(self._cancel_at - self._loop.time(), 0.0)
        else:
            return None

    def _do_enter(self) -> "timeout":
        # Support Tornado 5- without timeout
        # Details: https://github.com/python/asyncio/issues/392
        if self._timeout is None:
            return self

        self._task = current_task(self._loop)
        if self._task is None:
            raise RuntimeError(
                "Timeout context manager should be used " "inside a task"
            )

        if self._timeout <= 0:
            self._loop.call_soon(self._cancel_task)
            return self

        self._cancel_at = self._loop.time() + self._timeout
        self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
        return self

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
            raise asyncio.TimeoutError
        if self._timeout is not None and self._cancel_handler is not None:
            self._cancel_handler.cancel()
            self._cancel_handler = None
        self._task = None
        return None

    def _cancel_task(self) -> None:
        if self._task is not None:
            self._task.cancel()
            self._cancelled = True


def current_task(loop: asyncio.AbstractEventLoop) -> "asyncio.Task[Any]":
    if PY_37:
        task = asyncio.current_task(loop=loop)  # type: ignore
    else:
        task = asyncio.Task.current_task(loop=loop)
    if task is None:
        # this should be removed, tokio must use register_task and family API
        if hasattr(loop, "current_task"):
            task = loop.current_task()  # type: ignore

    return task