""" PEP 610 """
import json
import re

from pip._vendor import six
from pip._vendor.six.moves.urllib import parse as urllib_parse

from pip._internal.utils.typing import MYPY_CHECK_RUNNING

# The typing names below are referenced only from ``# type:`` comments in
# this module, so they are imported solely when MYPY_CHECK_RUNNING is true.
if MYPY_CHECK_RUNNING:
    from typing import (
        Any, Dict, Iterable, Optional, Type, TypeVar, Union
    )

    # Generic value type used in the _get / _get_required type comments.
    T = TypeVar("T")


# File name of the PEP 610 direct URL metadata document.
DIRECT_URL_METADATA_NAME = "direct_url.json"
# Matches a user-info string made only of environment variable placeholders:
# ``${NAME}`` or ``${NAME}:${NAME}``, anchored at both ends.
# DirectUrl._remove_auth_from_netloc leaves such user-info in place.
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")

# Names exported by a star-import of this module.
__all__ = [
    "DirectUrl",
    "DirectUrlValidationError",
    "DirInfo",
    "ArchiveInfo",
    "VcsInfo",
]


class DirectUrlValidationError(Exception):
    """Raised when direct URL data fails validation."""


def _get(d, expected_type, key, default=None):
    # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
    """Get value from dictionary and verify expected type."""
    if key not in d:
        return default
    value = d[key]
    if six.PY2 and expected_type is str:
        expected_type = six.string_types  # type: ignore
    if not isinstance(value, expected_type):
        raise DirectUrlValidationError(
            "{!r} has unexpected type for {} (expected {})".format(
                value, key, expected_type
            )
        )
    return value


def _get_required(d, expected_type, key, default=None):
    # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
    """Same lookup as ``_get``, but a None result is an error.

    Raises DirectUrlValidationError when the lookup (including the
    ``default`` fallback) yields None.
    """
    result = _get(d, expected_type, key, default)
    if result is not None:
        return result
    raise DirectUrlValidationError("{} must have a value".format(key))


def _exactly_one_of(infos):
    # type: (Iterable[Optional[InfoType]]) -> InfoType
    infos = [info for info in infos if info is not None]
    if not infos:
        raise DirectUrlValidationError(
            "missing one of archive_info, dir_info, vcs_info"
        )
    if len(infos) > 1:
        raise DirectUrlValidationError(
            "more than one of archive_info, dir_info, vcs_info"
        )
    assert infos[0] is not None
    return infos[0]


def _filter_none(**kwargs):
    # type: (Any) -> Dict[str, Any]
    """Make dict excluding None values."""
    return {k: v for k, v in kwargs.items() if v is not None}


class VcsInfo(object):
    """The ``vcs_info`` entry of a direct URL."""

    name = "vcs_info"

    def __init__(
        self,
        vcs,  # type: str
        commit_id,  # type: str
        requested_revision=None,  # type: Optional[str]
        resolved_revision=None,  # type: Optional[str]
        resolved_revision_type=None,  # type: Optional[str]
    ):
        # Required values.
        self.vcs = vcs
        self.commit_id = commit_id
        # Optional values; None entries are left out by _to_dict.
        self.requested_revision = requested_revision
        self.resolved_revision = resolved_revision
        self.resolved_revision_type = resolved_revision_type

    @classmethod
    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
        """Build an instance from a ``vcs_info`` mapping; None maps to None.

        ``vcs`` and ``commit_id`` are required strings, the three revision
        keys are optional strings. Invalid data raises
        DirectUrlValidationError.
        """
        if d is None:
            return None
        # Keys are read in a fixed order, so the first invalid one is the
        # one that gets reported.
        vcs = _get_required(d, str, "vcs")
        commit_id = _get_required(d, str, "commit_id")
        requested = _get(d, str, "requested_revision")
        resolved = _get(d, str, "resolved_revision")
        resolved_type = _get(d, str, "resolved_revision_type")
        return cls(
            vcs=vcs,
            commit_id=commit_id,
            requested_revision=requested,
            resolved_revision=resolved,
            resolved_revision_type=resolved_type,
        )

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        """Return the attributes as a dict, omitting those that are None."""
        field_names = (
            "vcs",
            "requested_revision",
            "commit_id",
            "resolved_revision",
            "resolved_revision_type",
        )
        return _filter_none(
            **{field: getattr(self, field) for field in field_names}
        )


class ArchiveInfo(object):
    """The ``archive_info`` entry of a direct URL."""

    name = "archive_info"

    def __init__(
        self,
        hash=None,  # type: Optional[str]
    ):
        self.hash = hash

    @classmethod
    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
        """Build an instance from an ``archive_info`` mapping.

        None maps to None. The optional ``hash`` key must be a string when
        present, otherwise DirectUrlValidationError is raised.
        """
        if d is not None:
            archive_hash = _get(d, str, "hash")
            return cls(hash=archive_hash)
        return None

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        """Return ``{"hash": ...}``, or an empty dict when hash is None."""
        archive_hash = self.hash
        return _filter_none(hash=archive_hash)


class DirInfo(object):
    """The ``dir_info`` entry of a direct URL."""

    name = "dir_info"

    def __init__(
        self,
        editable=False,  # type: bool
    ):
        self.editable = editable

    @classmethod
    def _from_dict(cls, d):
        # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
        """Build an instance from a ``dir_info`` mapping; None maps to None.

        ``editable`` must be a bool when present and falls back to False
        when the key is absent.
        """
        if d is not None:
            is_editable = _get_required(d, bool, "editable", default=False)
            return cls(editable=is_editable)
        return None

    def _to_dict(self):
        # type: () -> Dict[str, Any]
        """Return ``{"editable": ...}`` when editable is truthy, else ``{}``."""
        # A falsy flag is turned into None so that _filter_none drops it.
        editable = self.editable if self.editable else None
        return _filter_none(editable=editable)


# Alias for "any of the three info classes". It is referenced from
# ``# type:`` comments and only defined when MYPY_CHECK_RUNNING is true.
if MYPY_CHECK_RUNNING:
    InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]


class DirectUrl(object):
    """In-memory form of a PEP 610 direct URL document.

    Attributes:
        url: the URL as passed in; it is redacted only on serialisation
            (see ``redacted_url`` and ``to_dict``).
        info: one of ArchiveInfo, DirInfo or VcsInfo.
        subdirectory: optional subdirectory string, or None.
    """

    def __init__(
        self,
        url,  # type: str
        info,  # type: InfoType
        subdirectory=None,  # type: Optional[str]
    ):
        self.url = url
        self.info = info
        self.subdirectory = subdirectory

    def _remove_auth_from_netloc(self, netloc):
        # type: (str) -> str
        """Return ``netloc`` without its ``user:password@`` prefix.

        The prefix is kept when it is exactly ``git`` and ``info`` is a
        VcsInfo whose vcs is ``git``, or when it matches ENV_VAR_RE
        (``${VAR}`` or ``${VAR}:${VAR}``).
        """
        if "@" not in netloc:
            return netloc
        # Split on the LAST "@": a host cannot contain "@", whereas an
        # unescaped "@" may show up inside the password. Splitting on the
        # first one would leave the tail of such a password in the result.
        user_pass, netloc_no_user_pass = netloc.rsplit("@", 1)
        if (
            isinstance(self.info, VcsInfo) and
            self.info.vcs == "git" and
            user_pass == "git"
        ):
            return netloc
        if ENV_VAR_RE.match(user_pass):
            return netloc
        return netloc_no_user_pass

    @property
    def redacted_url(self):
        # type: () -> str
        """url with user:password part removed unless it is formed with
        environment variables as specified in PEP 610, or it is ``git``
        in the case of a git URL.
        """
        purl = urllib_parse.urlsplit(self.url)
        netloc = self._remove_auth_from_netloc(purl.netloc)
        # Rebuild the URL with every component unchanged except the netloc.
        surl = urllib_parse.urlunsplit(
            (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
        )
        return surl

    def validate(self):
        # type: () -> None
        """Check this object by round-tripping it through to_dict/from_dict.

        Raises DirectUrlValidationError when a field is missing or has the
        wrong type. The parsed copy is discarded.
        """
        self.from_dict(self.to_dict())

    @classmethod
    def from_dict(cls, d):
        # type: (Dict[str, Any]) -> DirectUrl
        """Build an instance from a parsed direct URL mapping.

        ``url`` is a required string, ``subdirectory`` an optional string,
        and exactly one of ``archive_info``, ``dir_info`` and ``vcs_info``
        must be present as a dict. Raises DirectUrlValidationError
        otherwise.
        """
        # Instantiate through ``cls`` (as the info classes do) so that a
        # subclass calling from_dict gets an instance of itself.
        return cls(
            url=_get_required(d, str, "url"),
            subdirectory=_get(d, str, "subdirectory"),
            info=_exactly_one_of(
                [
                    ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
                    DirInfo._from_dict(_get(d, dict, "dir_info")),
                    VcsInfo._from_dict(_get(d, dict, "vcs_info")),
                ]
            ),
        )

    def to_dict(self):
        # type: () -> Dict[str, Any]
        """Return the dict form, with the URL redacted.

        ``subdirectory`` is omitted when None; the info object is stored
        under its own ``name`` key.
        """
        res = _filter_none(
            url=self.redacted_url,
            subdirectory=self.subdirectory,
        )
        res[self.info.name] = self.info._to_dict()
        return res

    @classmethod
    def from_json(cls, s):
        # type: (str) -> DirectUrl
        """Parse the JSON text ``s`` and build an instance via from_dict."""
        return cls.from_dict(json.loads(s))

    def to_json(self):
        # type: () -> str
        """Serialise to_dict() as JSON text with sorted keys."""
        return json.dumps(self.to_dict(), sort_keys=True)