175 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			175 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Python
		
	
	
	
import logging
 | 
						|
import shutil
 | 
						|
import sys
 | 
						|
import textwrap
 | 
						|
import xmlrpc.client
 | 
						|
from collections import OrderedDict
 | 
						|
from optparse import Values
 | 
						|
from typing import TYPE_CHECKING, Dict, List, Optional
 | 
						|
 | 
						|
from pip._vendor.packaging.version import parse as parse_version
 | 
						|
 | 
						|
from pip._internal.cli.base_command import Command
 | 
						|
from pip._internal.cli.req_command import SessionCommandMixin
 | 
						|
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
 | 
						|
from pip._internal.exceptions import CommandError
 | 
						|
from pip._internal.metadata import get_default_environment
 | 
						|
from pip._internal.models.index import PyPI
 | 
						|
from pip._internal.network.xmlrpc import PipXmlrpcTransport
 | 
						|
from pip._internal.utils.logging import indent_log
 | 
						|
from pip._internal.utils.misc import write_output
 | 
						|
 | 
						|
if TYPE_CHECKING:
    # Evaluated by static type checkers only; typing.TYPE_CHECKING is False
    # at runtime, so neither the import nor the class exists when pip runs.
    from typing import TypedDict

    class TransformedHit(TypedDict):
        # Shape of one per-package entry produced by transform_hits().
        name: str
        summary: str
        versions: List[str]


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
      %prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self) -> None:
        """Register the ``-i/--index`` option and install the option group."""
        index_option = {
            "dest": "index",
            "metavar": "URL",
            "default": PyPI.pypi_url,
            "help": "Base URL of Python Package Index (default %default)",
        }
        self.cmd_opts.add_option("-i", "--index", **index_option)

        # Position 0 puts the command-specific options first in the parser.
        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options: Values, args: List[str]) -> int:
        """
        Run the search for the query words in ``args`` and print the results.

        Returns SUCCESS when the index returned at least one hit, otherwise
        NO_MATCHES_FOUND. Raises CommandError when no query was given.
        """
        if not args:
            raise CommandError("Missing required argument (search query).")

        raw_hits = self.search(args, options)
        packages = transform_hits(raw_hits)

        # The width stays None unless stdout is a terminal; print_results()
        # only wraps summaries when it receives a width.
        width = shutil.get_terminal_size()[0] if sys.stdout.isatty() else None
        print_results(packages, terminal_width=width)

        return SUCCESS if raw_hits else NO_MATCHES_FOUND

    def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
        """
        Call the index's XML-RPC ``search`` method for ``query``.

        The query is matched against both "name" and "summary", combined
        with the "or" operator. An XML-RPC fault is re-raised as a
        CommandError carrying the fault code and fault string.
        """
        index_url = options.index
        session = self.get_default_session(options)
        transport = PipXmlrpcTransport(index_url, session)
        pypi = xmlrpc.client.ServerProxy(index_url, transport)

        search_spec = {"name": query, "summary": query}
        try:
            hits = pypi.search(search_spec, "or")
        except xmlrpc.client.Fault as fault:
            message = (
                f"XMLRPC request failed [code: {fault.faultCode}]\n"
                f"{fault.faultString}"
            )
            raise CommandError(message)
        assert isinstance(hits, list)
        return hits
 | 
						|
 | 
						|
 | 
						|
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
    """
    The list from pypi is really a list of versions. We want a list of
    packages with the list of versions stored inline. This converts the
    list from pypi into one we can use.

    Each hit must provide the "name", "summary" and "version" keys. The
    result holds one entry per distinct name, in order of first appearance,
    with every version seen for that name collected under "versions". The
    stored summary is the one belonging to the highest version seen so far.
    """
    packages: Dict[str, "TransformedHit"] = OrderedDict()
    for hit in hits:
        name = hit["name"]
        summary = hit["summary"]
        version = hit["version"]

        # A single lookup serves both the membership test and the update.
        package = packages.get(name)
        if package is None:
            packages[name] = {
                "name": name,
                "summary": summary,
                "versions": [version],
            }
            continue

        package["versions"].append(version)

        # if this is the highest version, replace summary
        if version == highest_version(package["versions"]):
            package["summary"] = summary

    return list(packages.values())
 | 
						|
 | 
						|
 | 
						|
def print_dist_installation_info(name: str, latest: str) -> None:
    """
    Report the installed version of ``name``, if it is installed.

    Looks ``name`` up in the default environment and writes nothing when no
    distribution is found. Otherwise writes an indented "INSTALLED" line,
    followed by a "LATEST" line when the installed version does not compare
    equal to ``latest``.
    """
    dist = get_default_environment().get_distribution(name)
    if dist is None:
        return

    with indent_log():
        # NOTE(review): `latest` is a str; confirm that `dist.version`
        # compares equal to a str, otherwise this branch never fires.
        if dist.version == latest:
            write_output("INSTALLED: %s (latest)", dist.version)
            return

        write_output("INSTALLED: %s", dist.version)
        if parse_version(latest).pre:
            # The newest release is a pre-release, so say how to install it.
            write_output(
                "LATEST:    %s (pre-release; install"
                " with `pip install --pre`)",
                latest,
            )
        else:
            write_output("LATEST:    %s", latest)
 | 
						|
 | 
						|
 | 
						|
def print_results(
    hits: List["TransformedHit"],
    name_column_width: Optional[int] = None,
    terminal_width: Optional[int] = None,
) -> None:
    """
    Write one "name (latest) - summary" line per hit.

    When ``name_column_width`` is None it is derived from the longest
    name/latest-version pair among ``hits``. When ``terminal_width`` is
    given and leaves more than 10 columns for the summary, the summary is
    wrapped and its continuation lines are indented. After each line, the
    installation status of the package is written as well.
    """
    if not hits:
        return

    if name_column_width is None:
        widest = max(
            len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
            for hit in hits
        )
        name_column_width = widest + 4

    for hit in hits:
        name = hit["name"]
        summary = hit["summary"] or ""
        latest = highest_version(hit.get("versions", ["-"]))

        if terminal_width is not None:
            wrap_width = terminal_width - name_column_width - 5
            if wrap_width > 10:
                # wrap and indent summary to fit terminal
                continuation = "\n" + " " * (name_column_width + 3)
                summary = continuation.join(textwrap.wrap(summary, wrap_width))

        heading = f"{name} ({latest})"
        line = f"{heading:{name_column_width}} - {summary}"
        try:
            write_output(line)
            print_dist_installation_info(name, latest)
        except UnicodeEncodeError:
            # Deliberately ignored: a hit whose output raises
            # UnicodeEncodeError is dropped and the loop moves on.
            pass
 | 
						|
 | 
						|
 | 
						|
def highest_version(versions: List[str]) -> str:
    """Return the string in ``versions`` that ranks highest by parse_version."""
    newest = max(versions, key=parse_version)
    return newest
 |