Blob Blame History Raw
#
# Copyright (C) 2015  Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#

from __future__ import absolute_import
from __future__ import unicode_literals

from dnf.i18n import ucd
from dnfpluginscore import _, logger

import dnf
import dnf.cli
import gzip
import hawkey
import os
import rpm
import sys
import time

DEBUG_VERSION = "dnf-debug-dump version 1\n"


class Debug(dnf.Plugin):

    name = 'debug'

    def __init__(self, base, cli):
        super(Debug, self).__init__(base, cli)
        self.base = base
        self.cli = cli
        if self.cli is not None:
            self.cli.register_command(DebugDumpCommand)
            self.cli.register_command(DebugRestoreCommand)


class DebugDumpCommand(dnf.cli.Command):

    aliases = ("debug-dump",)
    summary = _("dump information about installed rpm packages to file")

    def __init__(self, cli):
        super(DebugDumpCommand, self).__init__(cli)
        self.dump_file = None

    def configure(self):
        self.cli.demands.sack_activation = True
        self.cli.demands.available_repos = True

    @staticmethod
    def set_argparser(parser):
        parser.add_argument(
            "--norepos", action="store_true", default=False,
            help=_("do not attempt to dump the repository contents."))
        parser.add_argument(
            "filename", nargs="?",
            help=_("optional name of dump file"))

    def run(self):
        """create debug txt file and compress it, if no filename specified
           use dnf_debug_dump-<timestamp>.txt.gz by default"""

        filename = self.opts.filename
        if not filename:
            now = time.strftime("%Y-%m-%d_%T", time.localtime(time.time()))
            filename = "dnf_debug_dump-%s-%s.txt.gz" % (os.uname()[1], now)

        filename = os.path.abspath(filename)
        if filename.endswith(".gz"):
            self.dump_file = gzip.GzipFile(filename, "w")
        else:
            self.dump_file = open(filename, "w")

        self.write(DEBUG_VERSION)
        self.dump_system_info()
        self.dump_dnf_config_info()
        self.dump_rpm_problems()
        self.dump_packages(not self.opts.norepos)
        self.dump_rpmdb_versions()
        self.dump_file.close()

        print(_("Output written to: %s") % filename)

    def write(self, msg):
        if dnf.pycomp.PY3 and isinstance(self.dump_file, gzip.GzipFile):
            msg = bytes(msg, "utf8")
        dnf.pycomp.write_to_file(self.dump_file, msg)

    def dump_system_info(self):
        self.write("%%%%SYSTEM INFO\n")
        uname = os.uname()
        self.write("  uname: %s, %s\n" % (uname[2], uname[4]))
        self.write("  rpm ver: %s\n" % rpm.__version__)
        self.write("  python ver: %s\n" % sys.version.replace("\n", ""))
        return

    def dump_dnf_config_info(self):
        var = self.base.conf.substitutions
        plugins = ",".join([p.name for p in self.base._plugins.plugins])
        self.write("%%%%DNF INFO\n")
        self.write("  arch: %s\n" % var["arch"])
        self.write("  basearch: %s\n" % var["basearch"])
        self.write("  releasever: %s\n" % var["releasever"])
        self.write("  dnf ver: %s\n" % dnf.const.VERSION)
        self.write("  enabled plugins: %s\n" % plugins)
        self.write("  global excludes: %s\n" % ",".join(self.base.conf.excludepkgs))
        return

    def dump_rpm_problems(self):
        self.write("%%%%RPMDB PROBLEMS\n")
        (missing, conflicts) = rpm_problems(self.base)
        self.write("".join(["Package %s requires %s\n" % (ucd(pkg), ucd(req))
                            for (req, pkg) in missing]))
        self.write("".join(["Package %s conflicts with %s\n" % (ucd(pkg),
                                                                ucd(conf))
                            for (conf, pkg) in conflicts]))

    def dump_packages(self, load_repos):
        q = self.base.sack.query()
        # packages from rpmdb
        self.write("%%%%RPMDB\n")
        for p in sorted(q.installed()):
            self.write("  %s\n" % pkgspec(p))

        if not load_repos:
            return

        self.write("%%%%REPOS\n")
        available = q.available()
        for repo in sorted(self.base.repos.iter_enabled(), key=lambda x: x.id):
            try:
                url = None
                if repo.metalink is not None:
                    url = repo.metalink
                elif repo.mirrorlist is not None:
                    url = repo.mirrorlist
                elif len(repo.baseurl) > 0:
                    url = repo.baseurl[0]
                self.write("%%%s - %s\n" % (repo.id, url))
                self.write("  excludes: %s\n" % ",".join(repo.excludepkgs))
                for po in sorted(available.filter(reponame=repo.id)):
                    self.write("  %s\n" % pkgspec(po))

            except dnf.exceptions.Error as e:
                self.write("Error accessing repo %s: %s\n" % (repo, str(e)))
                continue
        return

    def dump_rpmdb_versions(self):
        self.write("%%%%RPMDB VERSIONS\n")
        version = self.base.sack._rpmdb_version()
        self.write("  all: %s\n" % version)
        return


class DebugRestoreCommand(dnf.cli.Command):

    aliases = ("debug-restore",)
    summary = _("restore packages recorded in debug-dump file")

    def configure(self):
        self.cli.demands.sack_activation = True
        self.cli.demands.available_repos = True
        self.cli.demands.root_user = True
        if not self.opts.output:
            self.cli.demands.resolving = True

    @staticmethod
    def set_argparser(parser):
        parser.add_argument(
            "--output", action="store_true",
            help=_("output commands that would be run to stdout."))
        parser.add_argument(
            "--install-latest", action="store_true",
            help=_("Install the latest version of recorded packages."))
        parser.add_argument(
            "--ignore-arch", action="store_true",
            help=_("Ignore architecture and install missing packages matching "
                   "the name, epoch, version and release."))
        parser.add_argument(
            "--filter-types", metavar="[install, remove, replace]",
            default="install, remove, replace",
            help=_("limit to specified type"))
        parser.add_argument(
            "--remove-installonly", action="store_true",
            help=_('Allow removing of install-only packages. Using this option may '
                   'result in an attempt to remove the running kernel.'))
        parser.add_argument(
            "filename", nargs=1, help=_("name of dump file"))

    def run(self):
        """Execute the command action here."""
        if self.opts.filter_types:
            self.opts.filter_types = set(
                self.opts.filter_types.replace(",", " ").split())

        dump_pkgs = self.read_dump_file(self.opts.filename[0])

        self.process_installed(dump_pkgs, self.opts)

        self.process_dump(dump_pkgs, self.opts)

    def process_installed(self, dump_pkgs, opts):
        installed = self.base.sack.query().installed()
        installonly_pkgs = self.base._get_installonly_query(installed)
        for pkg in installed:
            pkg_remove = False
            spec = pkgspec(pkg)
            dumped_versions = dump_pkgs.get((pkg.name, pkg.arch), None)
            if dumped_versions is not None:
                evr = (pkg.epoch, pkg.version, pkg.release)
                if evr in dumped_versions:
                    # the correct version is already installed
                    dumped_versions[evr] = 'skip'
                else:
                    # other version is currently installed
                    if pkg in installonly_pkgs:
                        # package is install-only, should be removed
                        pkg_remove = True
                    else:
                        # package should be upgraded / downgraded
                        if "replace" in opts.filter_types:
                            action = 'replace'
                        else:
                            action = 'skip'
                        for d_evr in dumped_versions.keys():
                            dumped_versions[d_evr] = action
            else:
                # package should not be installed
                pkg_remove = True
            if pkg_remove and "remove" in opts.filter_types:
                if pkg not in installonly_pkgs or opts.remove_installonly:
                    if opts.output:
                        print("remove    %s" % spec)
                    else:
                        self.base.package_remove(pkg)

    def process_dump(self, dump_pkgs, opts):
        for (n, a) in sorted(dump_pkgs.keys()):
            dumped_versions = dump_pkgs[(n, a)]
            for (e, v, r) in sorted(dumped_versions.keys()):
                action = dumped_versions[(e, v, r)]
                if action == 'skip':
                    continue
                if opts.ignore_arch:
                    arch = ""
                else:
                    arch = "." + a
                if opts.install_latest and action == "install":
                    pkg_spec = "%s%s" % (n, arch)
                else:
                    pkg_spec = pkgtup2spec(n, arch, e, v, r)
                if action in opts.filter_types:
                    if opts.output:
                        print("%s   %s" % (action, pkg_spec))
                    else:
                        try:
                            self.base.install(pkg_spec)
                        except dnf.exceptions.MarkingError:
                            logger.error(_("Package %s is not available"), pkg_spec)

    @staticmethod
    def read_dump_file(filename):
        if filename.endswith(".gz"):
            fobj = gzip.GzipFile(filename)
        else:
            fobj = open(filename)

        if ucd(fobj.readline()) != DEBUG_VERSION:
            logger.error(_("Bad dnf debug file: %s"), filename)
            raise dnf.exceptions.Error

        skip = True
        pkgs = {}
        for line in fobj:
            line = ucd(line)
            if skip:
                if line == "%%%%RPMDB\n":
                    skip = False
                continue

            if not line or line[0] != " ":
                break

            pkg_spec = line.strip()
            nevra = hawkey.split_nevra(pkg_spec)
            # {(name, arch): {(epoch, version, release): action}}
            pkgs.setdefault((nevra.name, nevra.arch), {})[
                (nevra.epoch, nevra.version, nevra.release)] = "install"

        return pkgs


def rpm_problems(base):
    rpmdb = dnf.sack._rpmdb_sack(base)
    allpkgs = rpmdb.query().installed()

    requires = set()
    conflicts = set()
    for pkg in allpkgs:
        requires.update([(req, pkg) for req in pkg.requires
                         if not str(req) == "solvable:prereqmarker"
                         and not str(req).startswith("rpmlib(")])
        conflicts.update([(conf, pkg) for conf in pkg.conflicts])

    missing_requires = [(req, pkg) for (req, pkg) in requires
                        if not allpkgs.filter(provides=req)]
    existing_conflicts = [(conf, pkg) for (conf, pkg) in conflicts
                          if allpkgs.filter(provides=conf)]
    return missing_requires, existing_conflicts


def pkgspec(pkg):
    return pkgtup2spec(pkg.name, pkg.arch, pkg.epoch, pkg.version, pkg.release)


def pkgtup2spec(name, arch, epoch, version, release):
    a = "" if not arch else ".%s" % arch.lstrip('.')
    e = "" if epoch in (None, "") else "%s:" % epoch
    return "%s-%s%s-%s%s" % (name, e, version, release, a)