# dnssec.py
# DNS extension for automatic GPG key verification
#
# Copyright (C) 2012-2018 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
Packit Service 21c75c
Packit Service 21c75c
from __future__ import print_function
Packit Service 21c75c
from __future__ import absolute_import
Packit Service 21c75c
from __future__ import unicode_literals
Packit Service 21c75c
Packit Service 21c75c
from enum import Enum
Packit Service 21c75c
import base64
Packit Service 21c75c
import hashlib
Packit Service 21c75c
import logging
Packit Service 21c75c
import re
Packit Service 21c75c
Packit Service 21c75c
from dnf.i18n import _
Packit Service 21c75c
import dnf.rpm
Packit Service 21c75c
import dnf.exceptions
Packit Service 21c75c
Packit Service 21c75c
# All diagnostics of this module are emitted through the shared "dnf" logger.
logger = logging.getLogger("dnf")


# Numeric DNS resource record type passed to the resolver when querying for
# OPENPGPKEY records (see RFC 7929, referenced by email2location below).
RR_TYPE_OPENPGPKEY = 61
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
class DnssecError(dnf.exceptions.Error):
    """
    Error type raised by the dnssec module.
    """
    def __repr__(self):
        # Show a placeholder when the exception carries no value.
        shown = "Not specified" if self.value is None else self.value
        return "<DnssecError, value='{}'>".format(shown)
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
def email2location(email_address, tag="_openpgpkey"):
    # type: (str, str) -> str
    """
    Build the DNS name under which the key for an email address is looked up.

    Implements RFC 7929, section 3
    https://tools.ietf.org/html/rfc7929#section-3
    :param email_address: address of the form local-part@domain
    :param tag: label inserted between the hashed local part and the domain
    :return: "<digest>.<tag>.<domain>", where digest is the lowercase hex form
             of the first 28 bytes of SHA-256 over the UTF-8 encoded local part
    :raises DnssecError: when the address does not contain exactly one '@'
    """
    parts = email_address.split("@")
    if len(parts) != 2:
        raise DnssecError("Email address must contain exactly one '@' sign.")

    local_part, domain = parts
    truncated = hashlib.sha256(local_part.encode('utf-8')).digest()[0:28]
    hex_digest = base64.b16encode(truncated).decode("utf-8").lower()
    return hex_digest + "." + tag + "." + domain
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
class Validity(Enum):
    """
    Possible outcomes of the key verification algorithm.

    TODO: this type might be simplified in order to less reflect the underlying DNS layer.
    TODO: more specifically the variants from 3 to 5 should have more understandable names
    """
    # The key obtained from DNS (or from the cache) equals the input key.
    VALID = 1
    # A key was obtained for the address, but it differs from the input key.
    REVOKED = 2
    # The resolver reported a secure NXDOMAIN for the record.
    PROVEN_NONEXISTENCE = 3
    # The answer was not secured with DNSSEC.
    RESULT_NOT_SECURE = 4
    # The resolver flagged the DNSSEC signatures as bogus.
    BOGUS_RESULT = 5
    # The query failed or returned no usable data.
    ERROR = 9
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
class NoKey:
    """
    Marker type stored in the cache in place of a key when no key exists. It expresses
    non-existence through Python's type system instead of a special value.
    """
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
class KeyInfo:
    """
    Wrapper class for an email address and the verification key associated with it.

    ``email`` is a text string. ``key`` is the base64 payload of the key; the
    constructors in this module store it ASCII-encoded so that it can be compared
    with the base64-encoded data of a DNS record.
    """
    # Delimiter lines of an ASCII armored public key block.
    _ARMOR_BEGIN = '-----BEGIN PGP PUBLIC KEY BLOCK-----'
    _ARMOR_END = '-----END PGP PUBLIC KEY BLOCK-----'

    def __init__(self, email=None, key=None):
        self.email = email
        self.key = key

    @staticmethod
    def from_rpm_key_object(userid, raw_key):
        # type: (str, bytes) -> KeyInfo
        """
        Since dnf uses different format of the key than the one used in DNS RR, the
        ASCII armored key is converted here into its bare base64 payload.

        :param userid: user id of the key; the email is taken from between '<' and '>'
        :param raw_key: ASCII armored public key block
        :return: KeyInfo with the extracted email and the concatenated base64 payload
        :raises DnssecError: if the user id contains no email address or the key
                             is not an armored public key block
        """
        input_email = re.search('<(.*@.*)>', userid)
        if input_email is None:
            raise DnssecError(
                "No e-mail address found in the key user id '{}'".format(userid))
        email = input_email.group(1)

        # strip() also removes the '\r' left behind by CRLF line endings, which
        # would otherwise prevent the delimiter lines from matching.
        lines = [line.strip() for line in raw_key.decode('ascii').splitlines()]
        try:
            begin = lines.index(KeyInfo._ARMOR_BEGIN)
            end = lines.index(KeyInfo._ARMOR_END, begin + 1)
        except ValueError:
            raise DnssecError("Key is not an ASCII armored PGP public key block")

        # Select the payload by content rather than by position: the number of
        # armor headers varies and the checksum line is optional. Headers contain
        # ':' (never part of base64) and the checksum line starts with '='.
        payload = [line for line in lines[begin + 1:end]
                   if line and ':' not in line and not line.startswith('=')]
        return KeyInfo(email, ''.join(payload).encode('ascii'))
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
class DNSSECKeyVerification:
    """
    The main class when it comes to verification itself. It wraps Unbound context and a cache with
    already obtained results.
    """

    # Mapping from email address to b64 encoded public key or NoKey in case of proven nonexistence.
    # The dictionary lives on the class, so all callers share the same cache.
    _cache = {}
    # type: Dict[str, Union[str, NoKey]]

    @staticmethod
    def _cache_hit(key_union, input_key_string):
        # type: (Union[str, NoKey], str) -> Validity
        """
        Compare the key in case it was found in the cache.

        :param key_union: cached entry, either a key or a NoKey marker
        :param input_key_string: key that is being verified
        :return: VALID if the keys are equal, PROVEN_NONEXISTENCE for a NoKey
                 marker, REVOKED otherwise
        """
        if key_union == input_key_string:
            logger.debug("Cache hit, valid key")
            return Validity.VALID
        # verify() stores a NoKey *instance*, so an identity test against the
        # class alone would never match; the class object is accepted as well.
        elif isinstance(key_union, NoKey) or key_union is NoKey:
            logger.debug("Cache hit, proven non-existence")
            return Validity.PROVEN_NONEXISTENCE
        else:
            logger.debug("Key in cache: {}".format(key_union))
            logger.debug("Input key   : {}".format(input_key_string))
            return Validity.REVOKED

    @staticmethod
    def _cache_miss(input_key):
        # type: (KeyInfo) -> Validity
        """
        In case the key was not found in the cache, create an Unbound context and contact the DNS
        system

        :param input_key: key to verify; its email determines the queried DNS name
        :return: verdict derived from the resolver answer
        :raises dnf.exceptions.Error: if the unbound module cannot be imported
        :raises DnssecError: from email2location, for a malformed email address
        """
        try:
            import unbound
        except ImportError as e:
            # Translate the template first and fill in the details afterwards;
            # formatting before the lookup would defeat the translation.
            msg = _("Configuration option 'gpgkey_dns_verification' requires "
                    "libunbound ({})").format(e)
            raise dnf.exceptions.Error(msg)

        # Failures while configuring the context are only logged; the query is
        # attempted anyway and its result flags decide the verdict.
        ctx = unbound.ub_ctx()
        if ctx.set_option("verbosity:", "0") != 0:
            logger.debug("Unbound context: Failed to set verbosity")

        if ctx.set_option("qname-minimisation:", "yes") != 0:
            logger.debug("Unbound context: Failed to set qname minimisation")

        if ctx.resolvconf() != 0:
            logger.debug("Unbound context: Failed to read resolv.conf")

        if ctx.add_ta_file("/var/lib/unbound/root.key") != 0:
            logger.debug("Unbound context: Failed to add trust anchor file")

        status, result = ctx.resolve(email2location(input_key.email),
                                     RR_TYPE_OPENPGPKEY, unbound.RR_CLASS_IN)
        if status != 0:
            logger.debug("Communication with DNS servers failed")
            return Validity.ERROR
        if result.bogus:
            logger.debug("DNSSEC signatures are wrong")
            return Validity.BOGUS_RESULT
        if not result.secure:
            logger.debug("Result is not secured with DNSSEC")
            return Validity.RESULT_NOT_SECURE
        if result.nxdomain:
            logger.debug("Non-existence of this record was proven by DNSSEC")
            return Validity.PROVEN_NONEXISTENCE
        if not result.havedata:
            # TODO: This is weird result, but there is no way to perform validation, so just return
            # an error
            logger.debug("Unknown error in DNS communication")
            return Validity.ERROR
        else:
            # Only the first returned record is compared with the input key.
            data = result.data.as_raw_data()[0]
            dns_data_b64 = base64.b64encode(data)
            if dns_data_b64 == input_key.key:
                return Validity.VALID
            else:
                # In case it is different, print the keys for further examination in debug mode
                logger.debug("Key from DNS: {}".format(dns_data_b64))
                logger.debug("Input key   : {}".format(input_key.key))
                return Validity.REVOKED

    @staticmethod
    def verify(input_key):
        # type: (KeyInfo) -> Validity
        """
        Public API. Use this method to verify a KeyInfo object.

        The cache is consulted first. On a miss DNS is queried and only the
        VALID and PROVEN_NONEXISTENCE verdicts are remembered; every other
        result leads to a new query on the next call.

        :param input_key: key to verify
        :return: verdict of the verification
        """
        logger.debug("Running verification for key with id: {}".format(input_key.email))
        key_union = DNSSECKeyVerification._cache.get(input_key.email)
        if key_union is not None:
            return DNSSECKeyVerification._cache_hit(key_union, input_key.key)
        else:
            result = DNSSECKeyVerification._cache_miss(input_key)
            if result == Validity.VALID:
                DNSSECKeyVerification._cache[input_key.email] = input_key.key
            elif result == Validity.PROVEN_NONEXISTENCE:
                DNSSECKeyVerification._cache[input_key.email] = NoKey()
            return result
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
def nice_user_msg(ki, v):
    # type: (KeyInfo, Validity) -> str
    """
    Build a human readable sentence about the validity of a key.

    Only Validity.VALID is reported as valid; every other verdict is presented
    as an unknown status.
    """
    intro = _("DNSSEC extension: Key for user ") + ki.email + " "
    verdict = _("is valid.") if v == Validity.VALID else _("has unknown status.")
    return intro + verdict
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
def any_msg(m):
    # type: (str) -> str
    """
    Prefix the given message with the translated DNSSEC extension tag.
    """
    tag = _("DNSSEC extension: ")
    return tag + m
Packit Service 21c75c
Packit Service 21c75c
Packit Service 21c75c
class RpmImportedKeys:
    """
    Wrapper around keys, that are imported in the RPM database.

    The keys are stored in packages with name gpg-pubkey, where the version and
    release is different for each of them. The key content itself is stored as
    an ASCII armored string in the package description, so it needs to be parsed
    before it can be used.
    """
    @staticmethod
    def _query_db_for_gpg_keys():
        # type: () -> List[KeyInfo]
        """
        Collect a KeyInfo for every gpg-pubkey package found in the RPM database.

        Key packages whose packager field carries no '<email>' part are skipped,
        because without an address there is no DNS name to query.
        """
        # TODO: base.conf.installroot ?? -----------------------\
        transaction_set = dnf.rpm.transaction.TransactionWrapper()
        packages = transaction_set.dbMatch("name", "gpg-pubkey")
        return_list = []
        for pkg in packages:
            packager = dnf.rpm.getheader(pkg, 'packager')
            # Guard both against a missing packager field and against a field
            # without an address; either used to abort the whole check.
            found = re.search('<(.*@.*)>', packager) if packager else None
            if found is None:
                logger.debug(any_msg(
                    "Skipping imported GPG key without an e-mail address"
                    " (packager: {})".format(packager)))
                continue
            email = found.group(1)
            description = dnf.rpm.getheader(pkg, 'description')
            # NOTE: positional parsing; it assumes three lines before the base64
            # payload and three entries after it once split on '\n'.
            key_lines = description.split('\n')[3:-3]
            key_str = ''.join(key_lines)
            return_list.append(KeyInfo(email, key_str.encode('ascii')))

        return return_list

    @staticmethod
    def check_imported_keys_validity():
        """
        Verify every imported key through DNSSEC and log the outcome.

        Nothing is returned and no key is removed. A DnssecError raised for a
        single key is logged as a warning and the remaining keys are still checked.
        """
        keys = RpmImportedKeys._query_db_for_gpg_keys()
        logger.info(any_msg(_("Testing already imported keys for their validity.")))
        for key in keys:
            try:
                result = DNSSECKeyVerification.verify(key)
            except DnssecError as e:
                # Errors in this exception should not be fatal, print it and just continue
                logger.warning("DNSSEC extension error (email={}): {}"
                               .format(key.email, e.value))
                continue
            # TODO: remove revoked keys automatically and possibly ask user to confirm
            if result == Validity.VALID:
                logger.debug(any_msg("GPG Key {} is valid".format(key.email)))
            elif result == Validity.PROVEN_NONEXISTENCE:
                logger.debug(any_msg("GPG Key {} does not support DNS"
                                     " verification".format(key.email)))
            elif result == Validity.BOGUS_RESULT:
                logger.info(any_msg("GPG Key {} could not be verified, because DNSSEC signatures"
                                    " are bogus. Possible causes: wrong configuration of the DNS"
                                    " server, MITM attack".format(key.email)))
            elif result == Validity.REVOKED:
                logger.info(any_msg("GPG Key {} has been revoked and should"
                                    " be removed immediately".format(key.email)))
            else:
                logger.debug(any_msg("GPG Key {} could not be tested".format(key.email)))