Blame lang/python/tests/support.py

Packit Service 672cf4
# Copyright (C) 2016 g10 Code GmbH
Packit Service 672cf4
#
Packit Service 672cf4
# This file is part of GPGME.
Packit Service 672cf4
#
Packit Service 672cf4
# GPGME is free software; you can redistribute it and/or modify it
Packit Service 672cf4
# under the terms of the GNU General Public License as published by
Packit Service 672cf4
# the Free Software Foundation; either version 2 of the License, or
Packit Service 672cf4
# (at your option) any later version.
Packit Service 672cf4
#
Packit Service 672cf4
# GPGME is distributed in the hope that it will be useful, but WITHOUT
Packit Service 672cf4
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
Packit Service 672cf4
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
Packit Service 672cf4
# Public License for more details.
Packit Service 672cf4
#
Packit Service 672cf4
# You should have received a copy of the GNU Lesser General Public
Packit Service 6c01f9
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
Packit Service 672cf4
Packit Service 672cf4
from __future__ import absolute_import, print_function, unicode_literals
Packit Service 6c01f9
del absolute_import, print_function, unicode_literals
Packit Service 672cf4
Packit Service 672cf4
import contextlib
Packit Service 672cf4
import shutil
Packit Service 672cf4
import sys
Packit Service 672cf4
import os
Packit Service 672cf4
import re
Packit Service 672cf4
import tempfile
Packit Service 672cf4
import time
Packit Service 672cf4
import gpg
Packit Service 672cf4
Packit Service 672cf4
def assert_gpg_version(version=(2, 1, 0)):
Packit Service 672cf4
    with gpg.Context() as c:
Packit Service 6c01f9
        clean_version = re.match(r'\d+\.\d+\.\d+', c.engine_info.version).group(0)
Packit Service 672cf4
        if tuple(map(int, clean_version.split('.'))) < version:
Packit Service 672cf4
            print("GnuPG too old: have {0}, need {1}.".format(
Packit Service 672cf4
                c.engine_info.version, '.'.join(map(str, version))))
Packit Service 672cf4
            sys.exit(77)
Packit Service 672cf4
Packit Service 672cf4
def have_tofu_support(ctx, some_uid):
Packit Service 6c01f9
    keys = list(ctx.keylist(some_uid,
Packit Service 6c01f9
                            mode=(gpg.constants.keylist.mode.LOCAL
Packit Service 6c01f9
                                  |gpg.constants.keylist.mode.WITH_TOFU)))
Packit Service 672cf4
    return len(keys) > 0
Packit Service 672cf4
Packit Service 672cf4
# Skip the Python tests for GnuPG < 2.1.12.  Prior versions do not
Packit Service 672cf4
# understand the command line flags that we assume exist.  C.f. issue
Packit Service 672cf4
# 3008.
Packit Service 672cf4
assert_gpg_version((2, 1, 12))
Packit Service 672cf4
Packit Service 672cf4
# known keys
Packit Service 672cf4
alpha = "A0FF4590BB6122EDEF6E3C542D727CC768697734"
Packit Service 672cf4
bob = "D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2"
Packit Service 672cf4
encrypt_only = "F52770D5C4DB41408D918C9F920572769B9FE19C"
Packit Service 672cf4
sign_only = "7CCA20CCDE5394CEE71C9F0BFED153F12F18F45D"
Packit Service 672cf4
no_such_key = "A" * 40
Packit Service 672cf4
Packit Service 672cf4
def make_filename(name):
Packit Service 672cf4
    return os.path.join(os.environ['top_srcdir'], 'tests', 'gpg', name)
Packit Service 672cf4
Packit Service 672cf4
def in_srcdir(name):
Packit Service 672cf4
    return os.path.join(os.environ['srcdir'], name)
Packit Service 672cf4
Packit Service 672cf4
verbose = int(os.environ.get('verbose', 0)) > 1
Packit Service 672cf4
def print_data(data):
Packit Service 672cf4
    if verbose:
Packit Service 672cf4
        try:
Packit Service 672cf4
            # See if it is a file-like object.
Packit Service 672cf4
            data.seek(0, os.SEEK_SET)
Packit Service 672cf4
            data = data.read()
Packit Service 672cf4
        except:
Packit Service 672cf4
            # Hope for the best.
Packit Service 672cf4
            pass
Packit Service 672cf4
Packit Service 672cf4
        if hasattr(sys.stdout, "buffer"):
Packit Service 672cf4
            sys.stdout.buffer.write(data)
Packit Service 672cf4
        else:
Packit Service 672cf4
            sys.stdout.write(data)
Packit Service 672cf4
Packit Service 672cf4
def mark_key_trusted(ctx, key):
Packit Service 672cf4
    class Editor(object):
Packit Service 672cf4
        def __init__(self):
Packit Service 672cf4
            self.steps = ["trust", "save"]
Packit Service 672cf4
        def edit(self, status, args, out):
Packit Service 672cf4
            if args == "keyedit.prompt":
Packit Service 672cf4
                result = self.steps.pop(0)
Packit Service 672cf4
            elif args == "edit_ownertrust.value":
Packit Service 672cf4
                result = "5"
Packit Service 672cf4
            elif args == "edit_ownertrust.set_ultimate.okay":
Packit Service 672cf4
                result = "Y"
Packit Service 672cf4
            elif args == "keyedit.save.okay":
Packit Service 672cf4
                result = "Y"
Packit Service 672cf4
            else:
Packit Service 672cf4
                result = None
Packit Service 672cf4
            return result
Packit Service 672cf4
    with gpg.Data() as sink:
Packit Service 672cf4
        ctx.op_edit(key, Editor().edit, sink, sink)
Packit Service 672cf4
Packit Service 672cf4
Packit Service 672cf4
# Python3.2 and up has tempfile.TemporaryDirectory, but we cannot use
Packit Service 672cf4
# that, because there shutil.rmtree is used without
Packit Service 672cf4
# ignore_errors=True, and that races against gpg-agent deleting its
Packit Service 672cf4
# sockets.
Packit Service 672cf4
class TemporaryDirectory(object):
Packit Service 672cf4
    def __enter__(self):
Packit Service 672cf4
        self.path = tempfile.mkdtemp()
Packit Service 672cf4
        return self.path
Packit Service 672cf4
    def __exit__(self, *args):
Packit Service 672cf4
        shutil.rmtree(self.path, ignore_errors=True)
Packit Service 672cf4
Packit Service 672cf4
@contextlib.contextmanager
Packit Service 672cf4
def EphemeralContext():
Packit Service 672cf4
    with TemporaryDirectory() as tmp:
Packit Service 672cf4
        home = os.environ['GNUPGHOME']
Packit Service 672cf4
        shutil.copy(os.path.join(home, "gpg.conf"), tmp)
Packit Service 672cf4
        shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp)
Packit Service 672cf4
Packit Service 672cf4
        with gpg.Context(home_dir=tmp) as ctx:
Packit Service 672cf4
            yield ctx
Packit Service 672cf4
Packit Service 672cf4
            # Ask the agent to quit.
Packit Service 672cf4
            agent_socket = os.path.join(tmp, "S.gpg-agent")
Packit Service 672cf4
            ctx.protocol = gpg.constants.protocol.ASSUAN
Packit Service 672cf4
            ctx.set_engine_info(ctx.protocol, file_name=agent_socket)
Packit Service 672cf4
            try:
Packit Service 672cf4
                ctx.assuan_transact(["KILLAGENT"])
Packit Service 672cf4
            except gpg.errors.GPGMEError as e:
Packit Service 672cf4
                if e.getcode() == gpg.errors.ASS_CONNECT_FAILED:
Packit Service 6c01f9
                    pass # the agent was not running
Packit Service 672cf4
                else:
Packit Service 672cf4
                    raise
Packit Service 672cf4
Packit Service 672cf4
            # Block until it is really gone.
Packit Service 672cf4
            while os.path.exists(agent_socket):
Packit Service 672cf4
                time.sleep(.01)