Blob Blame History Raw
# __init__.py
# dnf.automatic CLI
#
# Copyright (C) 2014-2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#

from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from dnf.i18n import _, ucd
import dnf
import dnf.automatic.emitter
import dnf.cli
import dnf.cli.cli
import dnf.cli.output
import dnf.conf
import libdnf.conf
import dnf.const
import dnf.exceptions
import dnf.util
import dnf.logging
import hawkey
import logging
import socket
import argparse
import random
import time

logger = logging.getLogger('dnf')


def build_emitters(conf):
    emitters = dnf.util.MultiCallList([])
    system_name = conf.emitters.system_name
    emit_via = conf.emitters.emit_via
    if emit_via:
        for name in emit_via:
            if name == 'email':
                emitter = dnf.automatic.emitter.EmailEmitter(system_name, conf.email)
                emitters.append(emitter)
            elif name == 'stdio':
                emitter = dnf.automatic.emitter.StdIoEmitter(system_name)
                emitters.append(emitter)
            elif name == 'motd':
                emitter = dnf.automatic.emitter.MotdEmitter(system_name)
                emitters.append(emitter)
            elif name == 'command_email':
                emitter = dnf.automatic.emitter.CommandEmailEmitter(system_name, conf.command_email)
                emitters.append(emitter)
            else:
                raise dnf.exceptions.ConfigError("Unknown emitter option: %s" % name)
    return emitters


def parse_arguments(args):
    parser = argparse.ArgumentParser()
    parser.add_argument('conf_path', nargs='?', default=dnf.const.CONF_AUTOMATIC_FILENAME)
    parser.add_argument('--timer', action='store_true')
    parser.add_argument('--installupdates', dest='installupdates', action='store_true')
    parser.add_argument('--downloadupdates', dest='downloadupdates', action='store_true')
    parser.add_argument('--no-installupdates', dest='installupdates', action='store_false')
    parser.add_argument('--no-downloadupdates', dest='downloadupdates', action='store_false')
    parser.set_defaults(installupdates=None)
    parser.set_defaults(downloadupdates=None)

    return parser.parse_args(args), parser


class AutomaticConfig(object):
    def __init__(self, filename=None, downloadupdates=None,
                 installupdates=None):
        if not filename:
            filename = dnf.const.CONF_AUTOMATIC_FILENAME
        self.commands = CommandsConfig()
        self.email = EmailConfig()
        self.emitters = EmittersConfig()
        self.command_email = CommandEmailConfig()
        self._parser = None
        self._load(filename)

        if downloadupdates:
            self.commands.download_updates = True
        elif downloadupdates is False:
            self.commands.download_updates = False
        if installupdates:
            self.commands.apply_updates = True
        elif installupdates is False:
            self.commands.apply_updates = False

        self.commands.imply()
        self.filename = filename

    def _load(self, filename):
        parser = libdnf.conf.ConfigParser()
        try:
            parser.read(filename)
        except RuntimeError as e:
            raise dnf.exceptions.ConfigError('Parsing file "%s" failed: %s' % (filename, e))
        except IOError as e:
            logger.warning(e)

        self.commands.populate(parser, 'commands', filename,
                               libdnf.conf.Option.Priority_AUTOMATICCONFIG)
        self.email.populate(parser, 'email', filename, libdnf.conf.Option.Priority_AUTOMATICCONFIG)
        self.emitters.populate(parser, 'emitters', filename,
                               libdnf.conf.Option.Priority_AUTOMATICCONFIG)
        self.command_email.populate(parser, 'command_email', filename,
                                    libdnf.conf.Option.Priority_AUTOMATICCONFIG)
        self._parser = parser

    def update_baseconf(self, baseconf):
        baseconf._populate(self._parser, 'base', self.filename, dnf.conf.PRIO_AUTOMATICCONFIG)


class Config(object):
    def __init__(self):
        self._options = {}

    def add_option(self, name, optionobj):
        self._options[name] = optionobj

        def prop_get(obj):
            return obj._options[name].getValue()

        def prop_set(obj, val):
            obj._options[name].set(libdnf.conf.Option.Priority_RUNTIME, val)

        setattr(type(self), name, property(prop_get, prop_set))

    def populate(self, parser, section, filename, priority):
        """Set option values from an INI file section."""
        if parser.hasSection(section):
            for name in parser.options(section):
                value = parser.getValue(section, name)
                if not value or value == 'None':
                    value = ''
                opt = self._options.get(name, None)
                if opt:
                    try:
                        opt.set(priority, value)
                    except RuntimeError as e:
                        logger.debug(_('Unknown configuration value: %s=%s in %s; %s'),
                                     ucd(name), ucd(value), ucd(filename), str(e))
                else:
                    logger.debug(
                        _('Unknown configuration option: %s = %s in %s'),
                        ucd(name), ucd(value), ucd(filename))


class CommandsConfig(Config):
    def __init__(self):
        super(CommandsConfig, self).__init__()
        self.add_option('apply_updates', libdnf.conf.OptionBool(False))
        self.add_option('base_config_file', libdnf.conf.OptionString('/etc/dnf/dnf.conf'))
        self.add_option('download_updates', libdnf.conf.OptionBool(False))
        self.add_option('upgrade_type', libdnf.conf.OptionEnumString('default',
                        libdnf.conf.VectorString(['default', 'security'])))
        self.add_option('random_sleep', libdnf.conf.OptionNumberInt32(300))

    def imply(self):
        if self.apply_updates:
            self.download_updates = True


class EmailConfig(Config):
    def __init__(self):
        super(EmailConfig, self).__init__()
        self.add_option('email_to',
                        libdnf.conf.OptionStringList(libdnf.conf.VectorString(["root"])))
        self.add_option('email_from', libdnf.conf.OptionString("root"))
        self.add_option('email_host', libdnf.conf.OptionString("localhost"))
        self.add_option('email_port', libdnf.conf.OptionNumberInt32(25))


class CommandConfig(Config):
    _default_command_format = "cat"
    _default_stdin_format = "{body}"

    def __init__(self):
        super(CommandConfig, self).__init__()
        self.add_option('command_format',
                        libdnf.conf.OptionString(self._default_command_format))
        self.add_option('stdin_format',
                        libdnf.conf.OptionString(self._default_stdin_format))


class CommandEmailConfig(CommandConfig):
    _default_command_format = "mail -s {subject} -r {email_from} {email_to}"

    def __init__(self):
        super(CommandEmailConfig, self).__init__()
        self.add_option('email_to',
                        libdnf.conf.OptionStringList(libdnf.conf.VectorString(["root"])))
        self.add_option('email_from', libdnf.conf.OptionString("root"))


class EmittersConfig(Config):
    def __init__(self):
        super(EmittersConfig, self).__init__()
        self.add_option('emit_via', libdnf.conf.OptionStringList(
            libdnf.conf.VectorString(['email', 'stdio'])))
        self.add_option('output_width', libdnf.conf.OptionNumberInt32(80))
        self.add_option('system_name', libdnf.conf.OptionString(socket.gethostname()))


def gpgsigcheck(base, pkgs):
    ok = True
    for po in pkgs:
        result, errmsg = base.package_signature_check(po)
        if result != 0:
            ok = False
            logger.critical(errmsg)
    if not ok:
        raise dnf.exceptions.Error(_("GPG check FAILED"))


def main(args):
    (opts, parser) = parse_arguments(args)

    try:
        conf = AutomaticConfig(opts.conf_path, opts.downloadupdates,
                               opts.installupdates)
        with dnf.Base() as base:
            cli = dnf.cli.Cli(base)
            cli._read_conf_file()
            # Although dnf-automatic does not use demands, the versionlock
            # plugin uses this demand do decide whether it's rules should
            # be applied.
            # https://bugzilla.redhat.com/show_bug.cgi?id=1746562
            cli.demands.resolving = True
            conf.update_baseconf(base.conf)
            base.init_plugins(cli=cli)
            logger.debug(_('Started dnf-automatic.'))

            if opts.timer:
                sleeper = random.randint(0, conf.commands.random_sleep)
                logger.debug(_('Sleep for %s seconds'), sleeper)
                time.sleep(sleeper)

            base.pre_configure_plugins()
            base.read_all_repos()
            base.configure_plugins()
            base.fill_sack()
            upgrade(base, conf.commands.upgrade_type)
            base.resolve()
            output = dnf.cli.output.Output(base, base.conf)
            trans = base.transaction
            if not trans:
                return 0

            lst = output.list_transaction(trans, total_width=80)
            emitters = build_emitters(conf)
            emitters.notify_available(lst)
            if not conf.commands.download_updates:
                emitters.commit()
                return 0

            base.download_packages(trans.install_set)
            emitters.notify_downloaded()
            if not conf.commands.apply_updates:
                emitters.commit()
                return 0

            gpgsigcheck(base, trans.install_set)
            base.do_transaction()
            emitters.notify_applied()
            emitters.commit()
    except dnf.exceptions.Error as exc:
        logger.error(_('Error: %s'), ucd(exc))
        return 1
    return 0


def upgrade(base, upgrade_type):
    if upgrade_type == 'security':
        base._update_security_filters.append(base.sack.query().upgrades().filterm(
            advisory_type='security'))
        base.upgrade_all()
    elif upgrade_type == 'default':
        base.upgrade_all()
    else:
        raise dnf.exceptions.Error(
            'Unsupported upgrade_type "{}", only "default" and "security" supported'.format(
                upgrade_type))