Blob Blame History Raw
#!/usr/bin/env python

# Copyright (C) 2016 g10 Code GmbH
#
# This file is part of GPGME.
#
# GPGME is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# GPGME is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General
# Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <https://www.gnu.org/licenses/>.

from __future__ import absolute_import, print_function, unicode_literals

import os
import platform
import gpg
import support
_ = support  # to appease pyflakes.

del absolute_import, print_function, unicode_literals

oops = None
c = gpg.Context()
c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK)

source = gpg.Data("Hallo Leute\n")
sink = gpg.Data()

# Valid passphrases, both as string and bytes.
for passphrase in ('foo', b'foo'):

    def passphrase_cb(hint, desc, prev_bad, hook=None):
        assert hook == passphrase
        return hook

    c.set_passphrase_cb(passphrase_cb, passphrase)
    c.op_encrypt([], 0, source, sink)


# Returning an invalid type.
def passphrase_cb(hint, desc, prev_bad, hook=None):
    return 0


c.set_passphrase_cb(passphrase_cb, None)
try:
    c.op_encrypt([], 0, source, sink)
except Exception as e:
    assert type(e) == TypeError
    assert str(e) == "expected str or bytes from passphrase callback, got int"
else:
    assert False, "Expected an error, got none"

# Raising an exception inside callback.
myException = Exception()


def passphrase_cb(hint, desc, prev_bad, hook=None):
    raise myException


c.set_passphrase_cb(passphrase_cb, None)
try:
    c.op_encrypt([], 0, source, sink)
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"


# Wrong kind of callback function.
def bad_passphrase_cb():
    pass


c.set_passphrase_cb(bad_passphrase_cb, None)
try:
    c.op_encrypt([], 0, source, sink)
except Exception as e:
    assert type(e) == TypeError
else:
    assert False, "Expected an error, got none"

# Test the progress callback.
parms = """<GnupgKeyParms format="internal">
Key-Type: RSA
Key-Length: 1024
Name-Real: Joe Tester
Name-Comment: with stupid passphrase
Name-Email: joe+gpg@example.org
Passphrase: Crypt0R0cks
Expire-Date: 2099-12-31
</GnupgKeyParms>
"""

prams = """<GnupgKeyParms format="internal">
Key-Type: RSA
Key-Length: 1024
Name-Real: Joe Tester
Name-Comment: with stupid passphrase
Name-Email: joe+gpg@example.org
Passphrase: Crypt0R0cks
Expire-Date: 2037-12-31
</GnupgKeyParms>
"""

messages = []


def progress_cb(what, typ, current, total, hook=None):
    assert hook == messages
    messages.append(
        "PROGRESS UPDATE: what = {}, type = {}, current = {}, total = {}"
        .format(what, typ, current, total))


c = gpg.Context()
c.set_progress_cb(progress_cb, messages)
try:
    c.op_genkey(parms, None, None)
except Exception as oops:
    c.op_genkey(prams, None, None)
assert len(messages) > 0


# Test exception handling.
def progress_cb(what, typ, current, total, hook=None):
    raise myException


c = gpg.Context()
c.set_progress_cb(progress_cb, None)
try:
    try:
        c.op_genkey(parms, None, None)
    except Exception as oops:
        c.op_genkey(prams, None, None)
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"

# Test the edit callback.
c = gpg.Context()
c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK)
c.set_passphrase_cb(lambda *args: "abc")
sink = gpg.Data()
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)

cookie = object()
edit_cb_called = False

def oops_check():
    if oops is not None and platform.architecture()[0] != "64bit":
        y2k38_msg = "System appears to be 32-bit and vulnerable to EOL in 2038."
    elif oops is not None and platform.architecture()[0] == "64bit":
        y2k38_msg = "System appears to be 64-bit, but may use 32-bit time."
    else:
        y2k38_msg = "System is 64-bit and/or not susceptible to 2038 EOL."
    return y2k38_msg


def edit_cb(status, args, hook):
    global edit_cb_called
    edit_cb_called = True
    assert hook == cookie
    return "quit" if args == "keyedit.prompt" else None


c.op_edit(alpha, edit_cb, cookie, sink)
assert edit_cb_called

# Test exceptions.
c = gpg.Context()
c.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK)
c.set_passphrase_cb(lambda *args: "abc")
sink = gpg.Data()


def edit_cb(status, args):
    raise myException


try:
    c.op_edit(alpha, edit_cb, None, sink)
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"

# Test the status callback.
source = gpg.Data("Hallo Leute\n")
sink = gpg.Data()

status_cb_called = False


def status_cb(keyword, args, hook=None):
    global status_cb_called
    status_cb_called = True
    assert hook == cookie


c = gpg.Context()
c.set_status_cb(status_cb, cookie)
c.set_ctx_flag("full-status", "1")
c.op_encrypt([alpha], gpg.constants.ENCRYPT_ALWAYS_TRUST, source, sink)
assert status_cb_called

# Test exceptions.
source = gpg.Data("Hallo Leute\n")
sink = gpg.Data()


def status_cb(keyword, args):
    raise myException


c = gpg.Context()
c.set_status_cb(status_cb, None)
c.set_ctx_flag("full-status", "1")
try:
    c.op_encrypt([alpha], gpg.constants.ENCRYPT_ALWAYS_TRUST, source, sink)
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"


# Test the data callbacks.
def read_cb(amount, hook=None):
    assert hook == cookie
    return 0


def release_cb(hook=None):
    assert hook == cookie


data = gpg.Data(cbs=(read_cb, None, None, release_cb, cookie))
try:
    data.read()
except Exception as e:
    assert type(e) == TypeError
else:
    assert False, "Expected an error, got none"


def read_cb(amount):
    raise myException


data = gpg.Data(cbs=(read_cb, None, None, lambda: None))
try:
    data.read()
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"


def write_cb(what, hook=None):
    assert hook == cookie
    return "wrong type"


data = gpg.Data(cbs=(None, write_cb, None, release_cb, cookie))
try:
    data.write(b'stuff')
except Exception as e:
    assert type(e) == TypeError
else:
    assert False, "Expected an error, got none"


def write_cb(what):
    raise myException


data = gpg.Data(cbs=(None, write_cb, None, lambda: None))
try:
    data.write(b'stuff')
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"


def seek_cb(offset, whence, hook=None):
    assert hook == cookie
    return "wrong type"


data = gpg.Data(cbs=(None, None, seek_cb, release_cb, cookie))
try:
    data.seek(0, os.SEEK_SET)
except Exception as e:
    assert type(e) == TypeError
else:
    assert False, "Expected an error, got none"


def seek_cb(offset, whence):
    raise myException


data = gpg.Data(cbs=(None, None, seek_cb, lambda: None))
try:
    data.seek(0, os.SEEK_SET)
except Exception as e:
    assert e == myException
else:
    assert False, "Expected an error, got none"