|
Packit |
d7e8d0 |
#!/usr/bin/env python
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Copyright (C) 2016 g10 Code GmbH
|
|
Packit |
d7e8d0 |
#
|
|
Packit |
d7e8d0 |
# This file is part of GPGME.
|
|
Packit |
d7e8d0 |
#
|
|
Packit |
d7e8d0 |
# GPGME is free software; you can redistribute it and/or modify it
|
|
Packit |
d7e8d0 |
# under the terms of the GNU General Public License as published by
|
|
Packit |
d7e8d0 |
# the Free Software Foundation; either version 2 of the License, or
|
|
Packit |
d7e8d0 |
# (at your option) any later version.
|
|
Packit |
d7e8d0 |
#
|
|
Packit |
d7e8d0 |
# GPGME is distributed in the hope that it will be useful, but WITHOUT
|
|
Packit |
d7e8d0 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
Packit |
d7e8d0 |
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
|
|
Packit |
d7e8d0 |
# Public License for more details.
|
|
Packit |
d7e8d0 |
#
|
|
Packit |
d7e8d0 |
# You should have received a copy of the GNU Lesser General Public
|
|
Packit Service |
30b792 |
# License along with this program; if not, see <https://www.gnu.org/licenses/>.
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
from __future__ import absolute_import, print_function, unicode_literals
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
import io
|
|
Packit |
d7e8d0 |
import os
|
|
Packit |
d7e8d0 |
import tempfile
|
|
Packit |
d7e8d0 |
import gpg
|
|
Packit |
d7e8d0 |
import support
|
|
Packit Service |
30b792 |
_ = support # to appease pyflakes.
|
|
Packit Service |
30b792 |
|
|
Packit Service |
30b792 |
del absolute_import, print_function, unicode_literals
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data('Hello world!')
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
assert data.read() == b''
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data.seek(0, os.SEEK_SET)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
assert data.read() == b''
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data(b'Hello world!')
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data(b'Hello world!', copy=False)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data()
|
|
Packit |
d7e8d0 |
data.write('Hello world!')
|
|
Packit |
d7e8d0 |
data.seek(0, os.SEEK_SET)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data()
|
|
Packit |
d7e8d0 |
data.write(b'Hello world!')
|
|
Packit |
d7e8d0 |
data.seek(0, os.SEEK_SET)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data()
|
|
Packit |
d7e8d0 |
data.write(b'Hello world!')
|
|
Packit |
d7e8d0 |
# We expect the second argument to default to SEEK_SET
|
|
Packit |
d7e8d0 |
data.seek(0)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
binjunk = bytes(range(256))
|
|
Packit |
d7e8d0 |
data = gpg.Data()
|
|
Packit |
d7e8d0 |
data.write(binjunk)
|
|
Packit |
d7e8d0 |
data.seek(0, os.SEEK_SET)
|
|
Packit |
d7e8d0 |
assert data.read() == binjunk
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
data = gpg.Data()
|
|
Packit |
d7e8d0 |
data.set_file_name("foobar")
|
|
Packit |
d7e8d0 |
assert data.get_file_name() == "foobar"
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Test reading from an existing file.
|
|
Packit |
d7e8d0 |
with tempfile.NamedTemporaryFile() as tmp:
|
|
Packit |
d7e8d0 |
tmp.write(binjunk)
|
|
Packit |
d7e8d0 |
tmp.flush()
|
|
Packit |
d7e8d0 |
tmp.seek(0)
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Open using name.
|
|
Packit |
d7e8d0 |
data = gpg.Data(file=tmp.name)
|
|
Packit |
d7e8d0 |
assert data.read() == binjunk
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Open using name, without copying.
|
|
Packit |
d7e8d0 |
if False:
|
|
Packit |
d7e8d0 |
# delayed reads are not yet supported
|
|
Packit |
d7e8d0 |
data = gpg.Data(file=tmp.name, copy=False)
|
|
Packit |
d7e8d0 |
assert data.read() == binjunk
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Open using stream.
|
|
Packit |
d7e8d0 |
tmp.seek(0)
|
|
Packit |
d7e8d0 |
data = gpg.Data(file=tmp)
|
|
Packit |
d7e8d0 |
assert data.read() == binjunk
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Open using stream, offset, and length.
|
|
Packit |
d7e8d0 |
data = gpg.Data(file=tmp, offset=0, length=42)
|
|
Packit |
d7e8d0 |
assert data.read() == binjunk[:42]
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Open using name, offset, and length.
|
|
Packit |
d7e8d0 |
data = gpg.Data(file=tmp.name, offset=23, length=42)
|
|
Packit Service |
30b792 |
assert data.read() == binjunk[23:23 + 42]
|
|
Packit Service |
30b792 |
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Test callbacks.
|
|
Packit |
d7e8d0 |
class DataObject(object):
|
|
Packit |
d7e8d0 |
def __init__(self):
|
|
Packit |
d7e8d0 |
self.buffer = io.BytesIO()
|
|
Packit |
d7e8d0 |
self.released = False
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
def read(self, amount, hook=None):
|
|
Packit |
d7e8d0 |
assert not self.released
|
|
Packit |
d7e8d0 |
return self.buffer.read(amount)
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
def write(self, data, hook=None):
|
|
Packit |
d7e8d0 |
assert not self.released
|
|
Packit |
d7e8d0 |
return self.buffer.write(data)
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
def seek(self, offset, whence, hook=None):
|
|
Packit |
d7e8d0 |
assert not self.released
|
|
Packit |
d7e8d0 |
return self.buffer.seek(offset, whence)
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
def release(self, hook=None):
|
|
Packit |
d7e8d0 |
assert not self.released
|
|
Packit |
d7e8d0 |
self.released = True
|
|
Packit |
d7e8d0 |
|
|
Packit Service |
30b792 |
|
|
Packit |
d7e8d0 |
do = DataObject()
|
|
Packit |
d7e8d0 |
cookie = object()
|
|
Packit |
d7e8d0 |
data = gpg.Data(cbs=(do.read, do.write, do.seek, do.release, cookie))
|
|
Packit |
d7e8d0 |
data.write('Hello world!')
|
|
Packit |
d7e8d0 |
data.seek(0, os.SEEK_SET)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
del data
|
|
Packit |
d7e8d0 |
assert do.released
|
|
Packit |
d7e8d0 |
|
|
Packit |
d7e8d0 |
# Again, without the cookie.
|
|
Packit |
d7e8d0 |
do = DataObject()
|
|
Packit |
d7e8d0 |
data = gpg.Data(cbs=(do.read, do.write, do.seek, do.release))
|
|
Packit |
d7e8d0 |
data.write('Hello world!')
|
|
Packit |
d7e8d0 |
data.seek(0, os.SEEK_SET)
|
|
Packit |
d7e8d0 |
assert data.read() == b'Hello world!'
|
|
Packit |
d7e8d0 |
del data
|
|
Packit |
d7e8d0 |
assert do.released
|