147 lines
5.1 KiB
Python

# The following comment should be removed at some point in the future.
# mypy: disallow-untyped-defs=False
from __future__ import absolute_import
import logging
import sys
import textwrap
from collections import OrderedDict
from pip._vendor import pkg_resources
from pip._vendor.packaging.version import parse as parse_version
# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import
from pip._vendor.six.moves import xmlrpc_client # type: ignore
from pip._internal.cli.base_command import Command
from pip._internal.cli.req_command import SessionCommandMixin
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.models.index import PyPI
from pip._internal.network.xmlrpc import PipXmlrpcTransport
from pip._internal.utils.compat import get_terminal_size
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import write_output
logger = logging.getLogger(__name__)
class SearchCommand(Command, SessionCommandMixin):
"""Search for PyPI packages whose name or summary contains <query>."""
usage = """
%prog [options] <query>"""
ignore_require_venv = True
def __init__(self, *args, **kw):
super(SearchCommand, self).__init__(*args, **kw)
self.cmd_opts.add_option(
'-i', '--index',
dest='index',
metavar='URL',
default=PyPI.pypi_url,
help='Base URL of Python Package Index (default %default)')
self.parser.insert_option_group(0, self.cmd_opts)
def run(self, options, args):
if not args:
raise CommandError('Missing required argument (search query).')
query = args
pypi_hits = self.search(query, options)
hits = transform_hits(pypi_hits)
terminal_width = None
if sys.stdout.isatty():
terminal_width = get_terminal_size()[0]
print_results(hits, terminal_width=terminal_width)
if pypi_hits:
return SUCCESS
return NO_MATCHES_FOUND
def search(self, query, options):
index_url = options.index
session = self.get_default_session(options)
transport = PipXmlrpcTransport(index_url, session)
pypi = xmlrpc_client.ServerProxy(index_url, transport)
hits = pypi.search({'name': query, 'summary': query}, 'or')
return hits
def transform_hits(hits):
"""
The list from pypi is really a list of versions. We want a list of
packages with the list of versions stored inline. This converts the
list from pypi into one we can use.
"""
packages = OrderedDict()
for hit in hits:
name = hit['name']
summary = hit['summary']
version = hit['version']
if name not in packages.keys():
packages[name] = {
'name': name,
'summary': summary,
'versions': [version],
}
else:
packages[name]['versions'].append(version)
# if this is the highest version, replace summary and score
if version == highest_version(packages[name]['versions']):
packages[name]['summary'] = summary
return list(packages.values())
def print_results(hits, name_column_width=None, terminal_width=None):
if not hits:
return
if name_column_width is None:
name_column_width = max([
len(hit['name']) + len(highest_version(hit.get('versions', ['-'])))
for hit in hits
]) + 4
installed_packages = [p.project_name for p in pkg_resources.working_set]
for hit in hits:
name = hit['name']
summary = hit['summary'] or ''
latest = highest_version(hit.get('versions', ['-']))
if terminal_width is not None:
target_width = terminal_width - name_column_width - 5
if target_width > 10:
# wrap and indent summary to fit terminal
summary = textwrap.wrap(summary, target_width)
summary = ('\n' + ' ' * (name_column_width + 3)).join(summary)
line = '{name_latest:{name_column_width}} - {summary}'.format(
name_latest='{name} ({latest})'.format(**locals()),
**locals())
try:
write_output(line)
if name in installed_packages:
dist = pkg_resources.get_distribution(name)
with indent_log():
if dist.version == latest:
write_output('INSTALLED: %s (latest)', dist.version)
else:
write_output('INSTALLED: %s', dist.version)
if parse_version(latest).pre:
write_output('LATEST: %s (pre-release; install'
' with "pip install --pre")', latest)
else:
write_output('LATEST: %s', latest)
except UnicodeEncodeError:
pass
def highest_version(versions):
return max(versions, key=parse_version)