Blame lang/python/tests/support.py

# Copyright (C) 2016 g10 Code GmbH
#
# This file is part of GPGME.
#
# GPGME is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# GPGME is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
# Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
Packit d7e8d0
Packit d7e8d0
from __future__ import absolute_import, print_function, unicode_literals
Packit d7e8d0
del absolute_import, print_function, unicode_literals
Packit d7e8d0
Packit d7e8d0
import contextlib
Packit d7e8d0
import shutil
Packit d7e8d0
import sys
Packit d7e8d0
import os
Packit d7e8d0
import re
Packit d7e8d0
import tempfile
Packit d7e8d0
import time
Packit d7e8d0
import gpg
Packit d7e8d0
Packit d7e8d0
def assert_gpg_version(version=(2, 1, 0)):
    """Exit with status 77 unless the GnuPG engine is at least *version*.

    The engine version is read from a fresh ``gpg.Context`` and its
    leading ``MAJOR.MINOR.PATCH`` triple is compared with *version*.

    :param version: minimum acceptable version as a tuple of integers.

    When the engine is older than *version*, or when no version triple
    can be extracted from what the engine reports, a message is printed
    and the process terminates through ``sys.exit(77)``.
    """
    required = '.'.join(map(str, version))
    with gpg.Context() as c:
        reported = c.engine_info.version
        # re.match() yields None when the string does not start with a
        # dotted triple; guard against that (and against a missing
        # version) instead of failing with an opaque AttributeError.
        found = re.match(r'\d+\.\d+\.\d+', reported or '')
        if found is None:
            print("GnuPG version not recognized: have {0}, need {1}.".format(
                reported, required))
            sys.exit(77)

        have = tuple(map(int, found.group(0).split('.')))
        if have < version:
            print("GnuPG too old: have {0}, need {1}.".format(
                reported, required))
            sys.exit(77)
Packit d7e8d0
Packit d7e8d0
def have_tofu_support(ctx, some_uid):
    """Return True if a TOFU-mode key listing for *some_uid* finds a key.

    :param ctx: context whose ``keylist`` method is used for the lookup.
    :param some_uid: pattern handed to ``ctx.keylist``.
    :returns: ``True`` when listing with the ``LOCAL`` and ``WITH_TOFU``
        mode bits set produces at least one key, ``False`` otherwise.

    NOTE(review): presumably an engine without TOFU support lists
    nothing in this mode -- confirm against the engine's behaviour.
    """
    tofu_mode = (gpg.constants.keylist.mode.LOCAL
                 | gpg.constants.keylist.mode.WITH_TOFU)
    # Drain the whole listing before testing it for emptiness.
    keys = list(ctx.keylist(some_uid, mode=tofu_mode))
    return bool(keys)
Packit d7e8d0
Packit d7e8d0
# Skip the Python tests for GnuPG < 2.1.12.  Prior versions do not
# understand the command line flags that we assume exist.  Cf. issue
# 3008.
assert_gpg_version((2, 1, 12))
Packit d7e8d0
Packit d7e8d0
# Known keys: 40-character hexadecimal fingerprints used by the tests.
alpha = "A0FF4590BB6122EDEF6E3C542D727CC768697734"
bob = "D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2"
encrypt_only = "F52770D5C4DB41408D918C9F920572769B9FE19C"
sign_only = "7CCA20CCDE5394CEE71C9F0BFED153F12F18F45D"
# Same length as a real fingerprint, made up of forty "A" characters.
no_such_key = "A" * 40
Packit d7e8d0
Packit d7e8d0
def make_filename(name):
    """Return the path of *name* under ``$top_srcdir/tests/gpg``.

    :param name: file name to append to the directory.
    :raises KeyError: if the ``top_srcdir`` environment variable is unset.
    """
    top_srcdir = os.environ['top_srcdir']
    return os.path.join(top_srcdir, 'tests', 'gpg', name)
Packit d7e8d0
Packit d7e8d0
def in_srcdir(name):
    """Return the path of *name* inside the directory named by ``$srcdir``.

    :param name: file name to append to the directory.
    :raises KeyError: if the ``srcdir`` environment variable is unset.
    """
    srcdir = os.environ['srcdir']
    return os.path.join(srcdir, name)
Packit d7e8d0
Packit d7e8d0
# True when the 'verbose' environment variable holds an integer
# greater than 1; print_data() is silent otherwise.
verbose = int(os.environ.get('verbose', 0)) > 1


def print_data(data):
    """Write *data* to standard output when ``verbose`` is set.

    :param data: either an object offering ``seek`` and ``read`` (it is
        rewound and read in full) or a value that can be written as-is.

    If ``sys.stdout`` has a ``buffer`` attribute the payload is written
    to that binary stream, with text encoded first; otherwise it is
    passed straight to ``sys.stdout.write``.
    """
    if not verbose:
        return

    try:
        # See if it is a file-like object.
        data.seek(0, os.SEEK_SET)
        data = data.read()
    except Exception:
        # Not a rewindable stream: hope for the best and write the
        # value unchanged.  Catching Exception rather than everything
        # lets KeyboardInterrupt and SystemExit propagate.
        pass

    if hasattr(sys.stdout, "buffer"):
        if isinstance(data, type(u"")):
            # The binary stream only accepts bytes, so encode text
            # instead of letting the write fail with a TypeError.
            encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
            data = data.encode(encoding, "replace")
        sys.stdout.buffer.write(data)
    else:
        sys.stdout.write(data)
Packit d7e8d0
Packit d7e8d0
def mark_key_trusted(ctx, key):
    """Drive an edit session on *key* that answers the ownertrust prompts.

    :param ctx: context providing ``op_edit``.
    :param key: key object handed to ``ctx.op_edit``.

    The session issues the commands "trust" and "save", replies "5" to
    the ownertrust value prompt and "Y" to the two confirmation prompts.
    All session output goes to a throw-away ``gpg.Data`` buffer.
    """
    class Editor(object):
        """Callback object supplying the scripted replies."""

        def __init__(self):
            # Commands issued, in order, at successive "keyedit.prompt"s.
            self.steps = ["trust", "save"]

        def edit(self, status, args, out):
            """Return the reply for the prompt named by *args*, or None."""
            if args == "keyedit.prompt":
                return self.steps.pop(0)
            if args == "edit_ownertrust.value":
                return "5"
            if args == "edit_ownertrust.set_ultimate.okay":
                return "Y"
            if args == "keyedit.save.okay":
                return "Y"
            # Anything else gets no reply.
            return None

    with gpg.Data() as sink:
        # The same buffer is passed for both data arguments of op_edit.
        ctx.op_edit(key, Editor().edit, sink, sink)
Packit d7e8d0
Packit d7e8d0
Packit d7e8d0
# Python 3.2 and up has tempfile.TemporaryDirectory, but we cannot use
# that, because there shutil.rmtree is used without
# ignore_errors=True, and that races against gpg-agent deleting its
# sockets.
class TemporaryDirectory(object):
    """Context manager that yields a fresh temporary directory path.

    The directory is created with ``tempfile.mkdtemp`` on entry and
    removed recursively on exit, ignoring any removal errors.
    """

    def __enter__(self):
        """Create the directory and return its path."""
        self.path = tempfile.mkdtemp()
        return self.path

    def __exit__(self, *args):
        """Remove the directory tree; exceptions from the body propagate."""
        # ignore_errors=True: files may vanish underneath rmtree while
        # it is walking the tree (see the comment above the class).
        shutil.rmtree(self.path, ignore_errors=True)
Packit d7e8d0
Packit d7e8d0
@contextlib.contextmanager
def EphemeralContext():
    """Yield a ``gpg.Context`` whose home directory is a throw-away copy.

    A temporary directory is created, ``gpg.conf`` and ``gpg-agent.conf``
    are copied into it from ``$GNUPGHOME``, and a context using that
    directory as ``home_dir`` is yielded.  When the ``with`` body ends,
    whether normally or through an exception, the agent listening on the
    directory's ``S.gpg-agent`` socket is asked to quit and the function
    waits until that socket path no longer exists before the directory
    is removed.

    :raises KeyError: if ``GNUPGHOME`` is not set in the environment.
    :raises gpg.errors.GPGMEError: if asking the agent to quit fails for
        any reason other than the connection being refused.
    """
    with TemporaryDirectory() as tmp:
        home = os.environ['GNUPGHOME']
        for conf in ("gpg.conf", "gpg-agent.conf"):
            shutil.copy(os.path.join(home, conf), tmp)

        with gpg.Context(home_dir=tmp) as ctx:
            try:
                yield ctx
            finally:
                # Run the shutdown even when the body raised; otherwise
                # the agent would be left behind for this home directory.

                # Ask the agent to quit.
                agent_socket = os.path.join(tmp, "S.gpg-agent")
                ctx.protocol = gpg.constants.protocol.ASSUAN
                ctx.set_engine_info(ctx.protocol, file_name=agent_socket)
                try:
                    ctx.assuan_transact(["KILLAGENT"])
                except gpg.errors.GPGMEError as e:
                    if e.getcode() != gpg.errors.ASS_CONNECT_FAILED:
                        raise
                    # Connection failed: the agent was not running.

                # Block until it is really gone.
                while os.path.exists(agent_socket):
                    time.sleep(.01)