# download.py, supplies the 'download' command.
#
# Copyright (C) 2013-2015  Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#

from __future__ import absolute_import
from __future__ import unicode_literals
from dnfpluginscore import _, logger
from dnf.cli.option_parser import OptionParser

import dnf
import dnf.cli
import dnf.exceptions
import dnf.i18n
import dnf.subject
import dnf.util
import hawkey
import itertools
import os
import shutil


@dnf.plugin.register_command
class DownloadCommand(dnf.cli.Command):

    aliases = ['download']
    summary = _('Download package to current directory')

    def __init__(self, cli):
        """Initialise the command; ``opts`` and ``parser`` start out unset."""
        super(DownloadCommand, self).__init__(cli)
        self.parser = None
        self.opts = None

    @staticmethod
    def set_argparser(parser):
        """Register the arguments understood by the 'download' command.

        The positional package specs come first, followed by the switches
        selecting what to fetch and the ones that only print URLs.
        """
        def add_flag(*names, **extra):
            # all boolean switches of this command are plain store_true flags
            parser.add_argument(*names, action='store_true', **extra)

        parser.add_argument('packages', nargs='+',
                            help=_('packages to download'))
        add_flag('--source',
                 help=_('download the src.rpm instead'))
        add_flag('--debuginfo',
                 help=_('download the -debuginfo package instead'))
        add_flag('--debugsource',
                 help=_('download the -debugsource package instead'))
        parser.add_argument(
            '--arch', '--archlist',
            dest='arches',
            default=[],
            action=OptionParser._SplitCallback,
            metavar='[arch]',
            help=_("limit  the  query to packages of given architectures."))
        add_flag('--resolve',
                 help=_('resolve and download needed dependencies'))
        add_flag('--alldeps',
                 help=_('when running with --resolve, download all dependencies '
                        '(do not exclude already installed ones)'))
        add_flag('--url', '--urls', dest='url',
                 help=_('print list of urls where the rpms '
                        'can be downloaded instead of downloading'))
        parser.add_argument(
            '--urlprotocols',
            action='append',
            choices=['http', 'https', 'rsync', 'ftp'],
            default=[],
            help=_('when running with --url, '
                   'limit to specific protocols'))

    def configure(self):
        """Declare the CLI demands, enable extra repos and pick the destdir."""
        opts = self.opts
        demands = self.cli.demands

        # the sack must be active and populated from the enabled repos
        demands.sack_activation = True
        demands.available_repos = True
        if opts.resolve and opts.alldeps:
            # --alldeps together with --resolve: do not load the system repo
            demands.load_system_repo = False

        if opts.source:
            self.base.repos.enable_source_repos()
        if opts.debuginfo or opts.debugsource:
            self.base.repos.enable_debug_repos()

        # without an explicit destdir fall back to the current directory
        self.base.conf.destdir = opts.destdir or dnf.i18n.ucd(os.getcwd())

    def run(self):
        """Collect the requested packages, then print their urls or fetch them."""
        opts = self.opts

        if opts.source or opts.debuginfo or opts.debugsource:
            # any combination of the "instead" switches may be given at once
            pkgs = []
            if opts.source:
                pkgs.extend(self._get_pkg_objs_source(opts.packages))
            if opts.debuginfo:
                pkgs.extend(self._get_pkg_objs_debuginfo(opts.packages))
            if opts.debugsource:
                pkgs.extend(self._get_pkg_objs_debugsource(opts.packages))
        else:
            pkgs = self._get_pkg_objs_rpms(opts.packages)

        if not opts.url:
            self._do_downloads(pkgs)  # download rpms
            return

        # only the urls were asked for: print them and finish
        for pkg in pkgs:
            if pkg.repoid == hawkey.CMDLINE_REPO_NAME:
                # command line repo packages do not have .remote_location
                continue
            url = pkg.remote_location(schemes=opts.urlprotocols)
            if url:
                print(url)
                continue
            msg = _("Failed to get mirror for package: %s") % pkg.name
            if self.base.conf.strict:
                raise dnf.exceptions.Error(msg)
            logger.warning(msg)

    def _do_downloads(self, pkgs):
        """
        Fetch the given packages into the configured destination directory.

        Packages with the same string representation are handled as one
        item: a command line repo package wins if there is any, otherwise
        the candidate with the lowest (repo priority, repo cost) is taken.
        Returns the sorted localPkg() paths of everything that was handled.
        """
        grouped = {}
        for pkg in pkgs:
            grouped.setdefault(str(pkg), []).append(pkg)

        remote = []
        local = []
        for candidates in grouped.values():
            from_cmdline = [cand for cand in candidates
                            if cand.repoid == hawkey.CMDLINE_REPO_NAME]
            if from_cmdline:
                local.append(from_cmdline[0])
            else:
                remote.append(min(
                    candidates,
                    key=lambda cand: (cand.repo.priority, cand.repo.cost)))

        if remote:
            self.base.download_packages(remote, self.base.output.progress)

        # command line repo packages are either local files or urls that
        # were downloaded already, so copying them over is all that is left
        for pkg in local:
            src = pkg.localPkg()
            dst = os.path.join(self.base.conf.destdir, os.path.basename(src))
            # python<3.4 shutil does not raise SameFileError, check manually
            if not (os.path.exists(dst) and os.path.samefile(src, dst)):
                shutil.copy(src, self.base.conf.destdir)

        return sorted(pkg.localPkg() for pkg in remote + local)

    def _get_pkg_objs_rpms(self, pkg_specs):
        """
        Return the packages matching pkg_specs; with --resolve the result
        also contains their dependencies.
        """
        lookup = (self._get_packages_with_deps if self.opts.resolve
                  else self._get_packages)
        return lookup(pkg_specs)

    def _get_pkg_objs_source(self, pkg_specs):
        """
        Return a set of package objects representing the source rpms of
        the packages selected by pkg_specs.
        """
        binary_pkgs = self._get_pkg_objs_rpms(pkg_specs)
        srpm_names = self._get_source_packages(binary_pkgs)
        return set(self._get_packages(srpm_names, source=True))

    def _get_pkg_objs_debuginfo(self, pkg_specs):
        """
        Return a set of package objects representing the debuginfo rpms
        of the packages selected by pkg_specs.

        For every package its own debug name is tried first and the debug
        name of its source package second; the first name with a match of
        the same epoch, version, release and arch is used.
        """
        found = set()
        available = self.base.sack.query().available()

        for pkg in self._get_packages(pkg_specs):
            for candidate in (pkg.debug_name, pkg.source_debug_name):
                matches = available.filter(
                    name=candidate,
                    epoch=int(pkg.epoch),
                    version=pkg.version,
                    release=pkg.release,
                    arch=pkg.arch
                )
                if matches:
                    found.update(matches)
                    # the remaining candidate name is not needed any more
                    break

        return found

    def _get_pkg_objs_debugsource(self, pkg_specs):
        """
        Return a set of package objects representing the debugsource rpms
        of the packages selected by pkg_specs (same epoch, version,
        release and arch as the selected package).
        """
        found = set()
        available = self.base.sack.query().available()

        for pkg in self._get_packages(pkg_specs):
            found.update(available.filter(
                name=pkg.debugsource_name,
                epoch=int(pkg.epoch),
                version=pkg.version,
                release=pkg.release,
                arch=pkg.arch
            ))

        return found

    def _get_packages(self, pkg_specs, source=False):
        """
        Return a list of the packages matching pkg_specs.

        With source=True the specs are looked up as source rpm file names.
        A spec without a match is logged; it is fatal only if the 'strict'
        configuration option is set.
        """
        make_query = self._get_query_source if source else self._get_query
        found = []
        for spec in pkg_specs:
            try:
                query = make_query(spec)
            except dnf.exceptions.PackageNotFoundError as err:
                logger.error(dnf.i18n.ucd(err))
                if self.base.conf.strict:
                    logger.error(_("Exiting due to strict setting."))
                    raise dnf.exceptions.Error(err)
            else:
                found.append(query)

        return list(itertools.chain.from_iterable(found))

    def _get_packages_with_deps(self, pkg_specs, source=False):
        """
        Get packages matching pkg_specs and the deps.

        A separate goal is solved for every matched package; whatever the
        goal lists as installs and upgrades is added to the result.

        :param pkg_specs: package specs to look up
        :param source: not used here, kept for interface compatibility
        :return: set of the matched packages plus their dependencies
        :raises dnf.exceptions.Error: when a goal cannot be resolved
        """
        pkgs = self._get_packages(pkg_specs)
        pkg_set = set(pkgs)
        for pkg in pkgs:
            goal = hawkey.Goal(self.base.sack)
            goal.install(pkg)
            if not goal.run():
                # The goal holds this single package only, so it is the one
                # to report; listing every requested package would blame
                # the ones that resolved fine.
                report = [_('Error in resolve of packages:'), str(pkg)]
                logger.error("\n    ".join(report))
                logger.error(dnf.util._format_resolve_problems(goal.problem_rules()))
                raise dnf.exceptions.Error()
            pkg_set.update(goal.list_installs())
            pkg_set.update(goal.list_upgrades())
        return pkg_set

    @staticmethod
    def _get_source_packages(pkgs):
        """
        Return a list of unique source rpm file names for the given packages.

        Packages of arch 'src' without a sourcerpm contribute a name built
        from their own name and evr; others without one are only reported.
        """
        names = set()
        for pkg in pkgs:
            srpm = pkg.sourcerpm
            if srpm:
                names.add(srpm)
                logger.debug('  --> Package : %s Source : %s',
                             str(pkg), srpm)
                continue
            if pkg.arch == 'src':
                names.add("%s-%s.src.rpm" % (pkg.name, pkg.evr))
                continue
            logger.info(_("No source rpm defined for %s"), str(pkg))
        return list(names)

    def _get_query(self, pkg_spec):
        """
        Return a query matching pkg_spec.

        A http/ftp/file/https url or an existing local '.rpm' file is
        added through add_remote_rpms; anything else is looked up among
        the available packages and optionally limited to the requested
        architectures. Raises PackageNotFoundError if nothing matches.
        """
        scheme = dnf.pycomp.urlparse.urlparse(pkg_spec)[0]
        if (scheme in ('http', 'ftp', 'file', 'https')
                or (pkg_spec.endswith('.rpm') and os.path.isfile(pkg_spec))):
            added = self.base.add_remote_rpms(
                [pkg_spec], progress=self.base.output.progress)
            return self.base.sack.query().filterm(pkg=added)

        subject = dnf.subject.Subject(pkg_spec)
        query = subject.get_best_query(self.base.sack,
                                       with_src=self.opts.source)
        query = query.available().filterm(latest_per_arch_by_priority=True)
        if self.opts.arches:
            query = query.filter(arch=self.opts.arches)
        if not query.run():
            raise dnf.exceptions.PackageNotFoundError(
                _("No package %s available.") % (pkg_spec))
        return query

    def _get_query_source(self, pkg_spec):
        """
        Return a query to match a source rpm file name.

        :param pkg_spec: source rpm file name, normally ending in '.rpm'
        :return: the latest available packages of the first nevra
                 interpretation of the name that matches anything
        :raises dnf.exceptions.PackageNotFoundError: when no
                 interpretation matches an available package
        """
        # Drop the file extension only when it is really there; cutting the
        # last four characters blindly would mangle a spec given without it.
        if pkg_spec.endswith('.rpm'):
            pkg_spec = pkg_spec[:-4]
        subj = dnf.subject.Subject(pkg_spec)
        for nevra_obj in subj.get_nevra_possibilities():
            tmp_query = nevra_obj.to_query(self.base.sack).available()
            if tmp_query:
                return tmp_query.latest()

        msg = _("No package %s available.") % (pkg_spec)
        raise dnf.exceptions.PackageNotFoundError(msg)