Blame cloudinit/gpg.py

Packit bc9a3a
# Copyright (C) 2016 Canonical Ltd.
Packit bc9a3a
#
Packit bc9a3a
# Author: Scott Moser <scott.moser@canonical.com>
Packit bc9a3a
# Author: Christian Ehrhardt <christian.ehrhardt@canonical.com>
Packit bc9a3a
#
Packit bc9a3a
# This file is part of cloud-init. See LICENSE file for license information.
Packit bc9a3a
Packit bc9a3a
"""gpg.py - Collection of gpg key related functions"""
Packit bc9a3a
Packit bc9a3a
from cloudinit import log as logging
Packit bc9a3a
from cloudinit import util
Packit bc9a3a
Packit bc9a3a
import time
Packit bc9a3a
Packit bc9a3a
LOG = logging.getLogger(__name__)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def export_armour(key):
Packit bc9a3a
    """Export gpg key, armoured key gets returned"""
Packit bc9a3a
    try:
Packit bc9a3a
        (armour, _) = util.subp(["gpg", "--export", "--armour", key],
Packit bc9a3a
                                capture=True)
Packit bc9a3a
    except util.ProcessExecutionError as error:
Packit bc9a3a
        # debug, since it happens for any key not on the system initially
Packit bc9a3a
        LOG.debug('Failed to export armoured key "%s": %s', key, error)
Packit bc9a3a
        armour = None
Packit bc9a3a
    return armour
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def recv_key(key, keyserver, retries=(1, 1)):
Packit bc9a3a
    """Receive gpg key from the specified keyserver.
Packit bc9a3a
Packit bc9a3a
    Retries are done by default because keyservers can be unreliable.
Packit bc9a3a
    Additionally, there is no way to determine the difference between
Packit bc9a3a
    a non-existant key and a failure.  In both cases gpg (at least 2.2.4)
Packit bc9a3a
    exits with status 2 and stderr: "keyserver receive failed: No data"
Packit bc9a3a
    It is assumed that a key provided to cloud-init exists on the keyserver
Packit bc9a3a
    so re-trying makes better sense than failing.
Packit bc9a3a
Packit bc9a3a
    @param key: a string key fingerprint (as passed to gpg --recv-keys).
Packit bc9a3a
    @param keyserver: the keyserver to request keys from.
Packit bc9a3a
    @param retries: an iterable of sleep lengths for retries.
Packit bc9a3a
                    Use None to indicate no retries."""
Packit bc9a3a
    LOG.debug("Importing key '%s' from keyserver '%s'", key, keyserver)
Packit bc9a3a
    cmd = ["gpg", "--keyserver=%s" % keyserver, "--recv-keys", key]
Packit bc9a3a
    if retries is None:
Packit bc9a3a
        retries = []
Packit bc9a3a
    trynum = 0
Packit bc9a3a
    error = None
Packit bc9a3a
    sleeps = iter(retries)
Packit bc9a3a
    while True:
Packit bc9a3a
        trynum += 1
Packit bc9a3a
        try:
Packit bc9a3a
            util.subp(cmd, capture=True)
Packit bc9a3a
            LOG.debug("Imported key '%s' from keyserver '%s' on try %d",
Packit bc9a3a
                      key, keyserver, trynum)
Packit bc9a3a
            return
Packit bc9a3a
        except util.ProcessExecutionError as e:
Packit bc9a3a
            error = e
Packit bc9a3a
        try:
Packit bc9a3a
            naplen = next(sleeps)
Packit bc9a3a
            LOG.debug(
Packit bc9a3a
                "Import failed with exit code %d, will try again in %ss",
Packit bc9a3a
                error.exit_code, naplen)
Packit bc9a3a
            time.sleep(naplen)
Packit bc9a3a
        except StopIteration:
Packit bc9a3a
            raise ValueError(
Packit bc9a3a
                ("Failed to import key '%s' from keyserver '%s' "
Packit bc9a3a
                 "after %d tries: %s") % (key, keyserver, trynum, error))
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def delete_key(key):
Packit bc9a3a
    """Delete the specified key from the local gpg ring"""
Packit bc9a3a
    try:
Packit bc9a3a
        util.subp(["gpg", "--batch", "--yes", "--delete-keys", key],
Packit bc9a3a
                  capture=True)
Packit bc9a3a
    except util.ProcessExecutionError as error:
Packit bc9a3a
        LOG.warning('Failed delete key "%s": %s', key, error)
Packit bc9a3a
Packit bc9a3a
Packit bc9a3a
def getkeybyid(keyid, keyserver='keyserver.ubuntu.com'):
Packit bc9a3a
    """get gpg keyid from keyserver"""
Packit bc9a3a
    armour = export_armour(keyid)
Packit bc9a3a
    if not armour:
Packit bc9a3a
        try:
Packit bc9a3a
            recv_key(keyid, keyserver=keyserver)
Packit bc9a3a
            armour = export_armour(keyid)
Packit bc9a3a
        except ValueError:
Packit bc9a3a
            LOG.exception('Failed to obtain gpg key %s', keyid)
Packit bc9a3a
            raise
Packit bc9a3a
        finally:
Packit bc9a3a
            # delete just imported key to leave environment as it was before
Packit bc9a3a
            delete_key(keyid)
Packit bc9a3a
Packit bc9a3a
    return armour
Packit bc9a3a
Packit bc9a3a
# vi: ts=4 expandtab