Blame dnf/dnssec.py

# dnssec.py
# DNS extension for automatic GPG key verification
#
# Copyright (C) 2012-2018 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
Packit 6f3914
Packit 6f3914
from __future__ import print_function
Packit 6f3914
from __future__ import absolute_import
Packit 6f3914
from __future__ import unicode_literals
Packit 6f3914
Packit 6f3914
from enum import Enum
Packit 6f3914
import base64
Packit 6f3914
import hashlib
Packit 6f3914
import logging
Packit 6f3914
import re
Packit 6f3914
Packit 6f3914
from dnf.i18n import _
Packit 6f3914
import dnf.rpm
Packit 6f3914
import dnf.exceptions
Packit 6f3914
Packit 6f3914
# Every message emitted by this module goes through the shared "dnf" logger.
logger = logging.getLogger("dnf")

# Numeric DNS resource record type used when querying for OPENPGPKEY records
# (see RFC 7929).
RR_TYPE_OPENPGPKEY = 61
Packit 6f3914
Packit 6f3914
Packit 6f3914
class DnssecError(dnf.exceptions.Error):
    """
    Exception used in the dnssec module
    """
    def __repr__(self):
        # Show a readable placeholder instead of "None" when the error was
        # raised without a message.
        shown = "Not specified" if self.value is None else self.value
        return "<DnssecError, value='{}'>".format(shown)
Packit 6f3914
Packit 6f3914
Packit 6f3914
def email2location(email_address, tag="_openpgpkey"):
    # type: (str, str) -> str
    """
    Map an email address to the DNS name under which its OPENPGPKEY record
    is looked up.

    Implements RFC 7929, section 3
    https://tools.ietf.org/html/rfc7929#section-3

    :param email_address: address of the form ``local@domain``
    :param tag: DNS label placed between the hashed local part and the domain
    :return: ``<digest>.<tag>.<domain>`` where ``<digest>`` is the lowercase
             hexadecimal form of the first 28 bytes of the SHA-256 hash of the
             UTF-8 encoded local part
    :raises DnssecError: when the address does not contain exactly one '@'
    """
    parts = email_address.split("@")
    if len(parts) != 2:
        msg = "Email address should contain exactly one '@' sign."
        logger.error(msg)
        raise DnssecError(msg)

    local, domain = parts
    # Only the first 28 bytes of the SHA-256 digest are used for the label.
    truncated = hashlib.sha256(local.encode('utf-8')).digest()[:28]
    digest = base64.b16encode(truncated).decode("utf-8").lower()
    return digest + "." + tag + "." + domain
Packit 6f3914
Packit 6f3914
Packit 6f3914
class Validity(Enum):
    """
    Output of the verification algorithm.
    TODO: this type might be simplified in order to less reflect the underlying DNS layer.
    TODO: more specifically the variants from 3 to 5 should have more understandable names
    """
    # The key obtained from DNS (or from the cache) equals the input key.
    VALID = 1
    # A key was obtained for the address, but it differs from the input key.
    REVOKED = 2
    # DNSSEC proved that no record exists for the address.
    PROVEN_NONEXISTENCE = 3
    # The DNS answer was not secured with DNSSEC.
    RESULT_NOT_SECURE = 4
    # The DNSSEC signatures of the answer are wrong.
    BOGUS_RESULT = 5
    # The DNS query failed or returned no usable data.
    ERROR = 9
Packit 6f3914
Packit 6f3914
Packit 6f3914
class NoKey:
    """
    This class represents an absence of a key in the cache. It is an expression of non-existence
    using the Python's type system.

    It carries no data; only its type matters.
    """
Packit 6f3914
Packit 6f3914
Packit 6f3914
class KeyInfo:
    """
    Wrapper class for email and associated verification key, where both are represented in
    form of a string.
    """
    def __init__(self, email=None, key=None):
        # email: address extracted from the key's user id
        # key:   key material as a single string/bytes value without armor lines
        self.email = email
        self.key = key

    @staticmethod
    def from_rpm_key_object(userid, raw_key):
        # type: (str, bytes) -> KeyInfo
        """
        Build a KeyInfo from a key in the format used by dnf.

        Since dnf uses different format of the key than the one used in DNS RR, the
        ASCII-armored key is converted here: the armor delimiters, the lines right
        after the BEGIN marker and the line right before the END marker are dropped,
        and the remaining lines are concatenated.

        :param userid: user id of the key; must contain ``<local@domain>``
        :param raw_key: ASCII-armored public key
        :return: KeyInfo holding the extracted email and the concatenated key (bytes)
        :raises DnssecError: when no email address is found in ``userid``
        """
        input_email = re.search(r'<(.*@.*)>', userid)
        if input_email is None:
            raise DnssecError("No email address found in the key user id")

        email = input_email.group(1)
        # splitlines() rather than split('\n'): with CRLF line endings a trailing
        # '\r' would otherwise keep the armor markers below from matching.
        lines = raw_key.decode('ascii').splitlines()

        start = 0
        stop = 0
        for index, line in enumerate(lines):
            if line == '-----BEGIN PGP PUBLIC KEY BLOCK-----':
                start = index
            elif line == '-----END PGP PUBLIC KEY BLOCK-----':
                stop = index

        # start + 2 skips the BEGIN marker and the line following it; stop - 1
        # leaves out the line just before the END marker.
        # NOTE(review): presumably those are a single armor header line and the
        # checksum line -- confirm against the keys rpm actually provides.
        cat_key = ''.join(lines[start + 2:stop - 1]).encode('ascii')
        return KeyInfo(email, cat_key)
Packit 6f3914
Packit 6f3914
Packit 6f3914
class DNSSECKeyVerification:
    """
    The main class when it comes to verification itself. It wraps Unbound context and a cache with
    already obtained results.
    """

    # Mapping from email address to b64 encoded public key or NoKey in case of proven nonexistence
    _cache = {}  # type: Dict[str, Union[str, NoKey]]

    @staticmethod
    def _cache_hit(key_union, input_key_string):
        # type: (Union[str, NoKey], str) -> Validity
        """
        Compare the key in case it was found in the cache.

        :param key_union: cached value for the email address
        :param input_key_string: key that is being verified
        :return: VALID when both keys are equal, PROVEN_NONEXISTENCE when the
                 cached value is the NoKey marker, REVOKED otherwise
        """
        if key_union == input_key_string:
            logger.debug("Cache hit, valid key")
            return Validity.VALID

        # verify() stores a NoKey *instance*, so an identity test against the
        # class alone would never match and the entry would be reported as
        # REVOKED. The class itself is still accepted for compatibility.
        if isinstance(key_union, NoKey) or key_union is NoKey:
            logger.debug("Cache hit, proven non-existence")
            return Validity.PROVEN_NONEXISTENCE

        logger.debug("Key in cache: {}".format(key_union))
        logger.debug("Input key   : {}".format(input_key_string))
        return Validity.REVOKED

    @staticmethod
    def _cache_miss(input_key):
        # type: (KeyInfo) -> Validity
        """
        In case the key was not found in the cache, create an Unbound context and contact the DNS
        system

        :param input_key: key to verify; its email selects the DNS name to query
        :return: result of comparing the key with the DNS answer
        :raises dnf.exceptions.Error: when the unbound Python module cannot be imported
        """
        try:
            import unbound
        except ImportError as e:
            # Translate the template first and fill in the details afterwards;
            # formatting before the lookup makes the message id unmatchable.
            msg = _("Configuration option 'gpgkey_dns_verification' requires "
                    "libunbound ({})").format(e)
            raise dnf.exceptions.Error(msg)

        ctx = unbound.ub_ctx()
        # Failures while configuring the context are only logged; the query
        # below is attempted regardless.
        if ctx.set_option("verbosity:", "0") != 0:
            logger.debug("Unbound context: Failed to set verbosity")

        if ctx.set_option("qname-minimisation:", "yes") != 0:
            logger.debug("Unbound context: Failed to set qname minimisation")

        if ctx.resolvconf() != 0:
            logger.debug("Unbound context: Failed to read resolv.conf")

        if ctx.add_ta_file("/var/lib/unbound/root.key") != 0:
            logger.debug("Unbound context: Failed to add trust anchor file")

        status, result = ctx.resolve(email2location(input_key.email),
                                     RR_TYPE_OPENPGPKEY, unbound.RR_CLASS_IN)
        if status != 0:
            logger.debug("Communication with DNS servers failed")
            return Validity.ERROR
        if result.bogus:
            logger.debug("DNSSEC signatures are wrong")
            return Validity.BOGUS_RESULT
        if not result.secure:
            logger.debug("Result is not secured with DNSSEC")
            return Validity.RESULT_NOT_SECURE
        if result.nxdomain:
            logger.debug("Non-existence of this record was proven by DNSSEC")
            return Validity.PROVEN_NONEXISTENCE
        if not result.havedata:
            # TODO: This is weird result, but there is no way to perform validation, so just return
            # an error
            logger.debug("Unknown error in DNS communication")
            return Validity.ERROR

        # Only the first record of the answer is compared with the input key.
        data = result.data.as_raw_data()[0]
        dns_data_b64 = base64.b64encode(data)
        if dns_data_b64 == input_key.key:
            return Validity.VALID

        # In case it is different, print the keys for further examination in debug mode
        logger.debug("Key from DNS: {}".format(dns_data_b64))
        logger.debug("Input key   : {}".format(input_key.key))
        return Validity.REVOKED

    @staticmethod
    def verify(input_key):
        # type: (KeyInfo) -> Validity
        """
        Public API. Use this method to verify a KeyInfo object.

        The cache is consulted first. On a miss the DNS is queried, and only
        VALID and PROVEN_NONEXISTENCE results are remembered.

        :param input_key: key to verify
        :return: validity of the key
        """
        logger.debug("Running verification for key with id: {}".format(input_key.email))
        key_union = DNSSECKeyVerification._cache.get(input_key.email)
        if key_union is not None:
            return DNSSECKeyVerification._cache_hit(key_union, input_key.key)

        result = DNSSECKeyVerification._cache_miss(input_key)
        if result == Validity.VALID:
            DNSSECKeyVerification._cache[input_key.email] = input_key.key
        elif result == Validity.PROVEN_NONEXISTENCE:
            DNSSECKeyVerification._cache[input_key.email] = NoKey()
        return result
Packit 6f3914
Packit 6f3914
Packit 6f3914
def nice_user_msg(ki, v):
    # type: (KeyInfo, Validity) -> str
    """
    Inform the user about key validity in a human readable way.

    :param ki: key the message is about; only its email is used
    :param v: verification result for the key
    :return: message saying the key is valid when ``v`` is Validity.VALID, and
             that its status is unknown for any other result
    """
    prefix = _("DNSSEC extension: Key for user ") + ki.email + " "
    if v == Validity.VALID:
        return prefix + _("is valid.")
    return prefix + _("has unknown status.")
Packit 6f3914
Packit 6f3914
Packit 6f3914
def any_msg(m):
    # type: (str) -> str
    """
    Label any given message with DNSSEC extension tag

    :param m: message text
    :return: ``m`` prefixed with the translated "DNSSEC extension: " tag
    """
    return _("DNSSEC extension: ") + m
Packit 6f3914
Packit 6f3914
Packit 6f3914
class RpmImportedKeys:
    """
    Wrapper around keys, that are imported in the RPM database.

    The keys are stored in packages with name gpg-pubkey, where the version and
    release is different for each of them. The key content itself is stored as
    an ASCII armored string in the package description, so it needs to be parsed
    before it can be used.
    """
    @staticmethod
    def _query_db_for_gpg_keys():
        # type: () -> List[KeyInfo]
        """
        Collect the keys of all gpg-pubkey packages found in the RPM database.

        Packages whose packager field is missing or holds no ``<local@domain>``
        address are skipped, because such a key cannot be looked up in DNS.

        :return: one KeyInfo per usable gpg-pubkey package
        """
        # TODO: base.conf.installroot ?? -----------------------\
        transaction_set = dnf.rpm.transaction.TransactionWrapper()
        packages = transaction_set.dbMatch("name", "gpg-pubkey")
        return_list = []
        for pkg in packages:
            packager = dnf.rpm.getheader(pkg, 'packager')
            # Guard against a missing packager and against one without an email
            # address; either used to raise and abort the whole check.
            match = re.search(r'<(.*@.*)>', packager) if packager else None
            if match is None:
                logger.debug(any_msg("Skipping GPG key without an email address "
                                     "in its packager field"))
                continue
            email = match.group(1)
            description = dnf.rpm.getheader(pkg, 'description')
            # Drop the first three and the last three lines of the description.
            # NOTE(review): presumably these are the armor header and footer
            # lines around the key body -- confirm against real packages.
            key_lines = description.split('\n')[3:-3]
            key_str = ''.join(key_lines)
            return_list.append(KeyInfo(email, key_str.encode('ascii')))

        return return_list

    @staticmethod
    def check_imported_keys_validity():
        """
        Verify every key imported in the RPM database and log the outcome.

        A DnssecError raised for one key is logged and does not stop the
        remaining keys from being checked. Nothing is returned and no key is
        removed.
        """
        keys = RpmImportedKeys._query_db_for_gpg_keys()
        logger.info(any_msg(_("Testing already imported keys for their validity.")))
        for key in keys:
            try:
                result = DNSSECKeyVerification.verify(key)
            except DnssecError as e:
                # Errors in this exception should not be fatal, print it and just continue
                logger.exception("Exception raised in DNSSEC extension: email={}, exception={}"
                                 .format(key.email, repr(e)))
                continue
            # TODO: remove revoked keys automatically and possibly ask user to confirm
            if result == Validity.VALID:
                logger.debug(any_msg("GPG Key {} is valid".format(key.email)))
            elif result == Validity.PROVEN_NONEXISTENCE:
                logger.debug(any_msg("GPG Key {} does not support DNS"
                                    " verification".format(key.email)))
            elif result == Validity.BOGUS_RESULT:
                logger.info(any_msg("GPG Key {} could not be verified, because DNSSEC signatures"
                                    " are bogus. Possible causes: wrong configuration of the DNS"
                                    " server, MITM attack".format(key.email)))
            elif result == Validity.REVOKED:
                logger.info(any_msg("GPG Key {} has been revoked and should"
                                    " be removed immediately".format(key.email)))
            else:
                logger.debug(any_msg("GPG Key {} could not be tested".format(key.email)))