from libc cimport stdlib
from libc cimport stdio
cimport cpython.buffer
import sys
available_flags = (
('FORMAT', cpython.buffer.PyBUF_FORMAT),
('INDIRECT', cpython.buffer.PyBUF_INDIRECT),
('ND', cpython.buffer.PyBUF_ND),
('STRIDES', cpython.buffer.PyBUF_STRIDES),
('C_CONTIGUOUS', cpython.buffer.PyBUF_C_CONTIGUOUS),
('F_CONTIGUOUS', cpython.buffer.PyBUF_F_CONTIGUOUS),
('WRITABLE', cpython.buffer.PyBUF_WRITABLE)
)
cdef class MockBuffer:
cdef object format, offset
cdef void* buffer
cdef Py_ssize_t len, itemsize
cdef Py_ssize_t* strides
cdef Py_ssize_t* shape
cdef Py_ssize_t* suboffsets
cdef object label, log
cdef int ndim
cdef bint writable
cdef readonly object received_flags, release_ok
cdef public object fail
def __init__(self, label, data, shape=None, strides=None, format=None, writable=True, offset=0):
# It is important not to store references to data after the constructor
# as refcounting is checked on object buffers.
self.label = label
self.release_ok = True
self.log = ""
self.offset = offset
self.itemsize = self.get_itemsize()
self.writable = writable
if format is None: format = self.get_default_format()
if shape is None: shape = (len(data),)
if strides is None:
strides = []
cumprod = 1
rshape = list(shape)
rshape.reverse()
for s in rshape:
strides.append(cumprod)
cumprod *= s
strides.reverse()
strides = [x * self.itemsize for x in strides]
suboffsets = [-1] * len(shape)
datashape = [len(data)]
p = data
while True:
p = p[0]
if isinstance(p, list): datashape.append(len(p))
else: break
if len(datashape) > 1:
# indirect access
self.ndim = <int>len(datashape)
shape = datashape
self.buffer = self.create_indirect_buffer(data, shape)
suboffsets = [0] * (self.ndim-1) + [-1]
strides = [sizeof(void*)] * (self.ndim-1) + [self.itemsize]
self.suboffsets = self.list_to_sizebuf(suboffsets)
else:
# strided and/or simple access
self.buffer = self.create_buffer(data)
self.ndim = <int>len(shape)
self.suboffsets = NULL
try:
format = format.encode('ASCII')
except AttributeError:
pass
self.format = format
self.len = len(data) * self.itemsize
self.strides = self.list_to_sizebuf(strides)
self.shape = self.list_to_sizebuf(shape)
def __dealloc__(self):
stdlib.free(self.strides)
stdlib.free(self.shape)
if self.suboffsets != NULL:
stdlib.free(self.suboffsets)
# must recursively free indirect...
else:
stdlib.free(self.buffer)
cdef void* create_buffer(self, data) except NULL:
cdef size_t n = <size_t>(len(data) * self.itemsize)
cdef char* buf = <char*>stdlib.malloc(n)
if buf == NULL:
raise MemoryError
cdef char* it = buf
for value in data:
self.write(it, value)
it += self.itemsize
return buf
cdef void* create_indirect_buffer(self, data, shape) except NULL:
cdef size_t n = 0
cdef void** buf
assert shape[0] == len(data), (shape[0], len(data))
if len(shape) == 1:
return self.create_buffer(data)
else:
shape = shape[1:]
n = <size_t>len(data) * sizeof(void*)
buf = <void**>stdlib.malloc(n)
if buf == NULL:
return NULL
for idx, subdata in enumerate(data):
buf[idx] = self.create_indirect_buffer(subdata, shape)
return buf
cdef Py_ssize_t* list_to_sizebuf(self, l):
cdef size_t n = <size_t>len(l) * sizeof(Py_ssize_t)
cdef Py_ssize_t* buf = <Py_ssize_t*>stdlib.malloc(n)
for i, x in enumerate(l):
buf[i] = x
return buf
def __getbuffer__(MockBuffer self, Py_buffer* buffer, int flags):
if self.fail:
raise ValueError("Failing on purpose")
self.received_flags = []
cdef int value
for name, value in available_flags:
if (value & flags) == value:
self.received_flags.append(name)
if flags & cpython.buffer.PyBUF_WRITABLE and not self.writable:
raise BufferError("Writable buffer requested from read-only mock: %s" % ' | '.join(self.received_flags))
buffer.buf = <void*>(<char*>self.buffer + (<int>self.offset * self.itemsize))
buffer.obj = self
buffer.len = self.len
buffer.readonly = not self.writable
buffer.format = <char*>self.format
buffer.ndim = self.ndim
buffer.shape = self.shape
buffer.strides = self.strides
buffer.suboffsets = self.suboffsets
buffer.itemsize = self.itemsize
buffer.internal = NULL
if self.label:
msg = "acquired %s" % self.label
print msg
self.log += msg + "\n"
def __releasebuffer__(MockBuffer self, Py_buffer* buffer):
if buffer.suboffsets != self.suboffsets:
self.release_ok = False
if self.label:
msg = "released %s" % self.label
print msg
self.log += msg + "\n"
def printlog(self):
print self.log[:-1]
def resetlog(self):
self.log = ""
cdef int write(self, char* buf, object value) except -1: raise Exception()
cdef get_itemsize(self):
print "ERROR, not subclassed", self.__class__
cdef get_default_format(self):
print "ERROR, not subclassed", self.__class__
cdef class CharMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<char*>buf)[0] = <char>value
return 0
cdef get_itemsize(self): return sizeof(char)
cdef get_default_format(self): return b"@b"
cdef class IntMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<int*>buf)[0] = <int>value
return 0
cdef get_itemsize(self): return sizeof(int)
cdef get_default_format(self): return b"@i"
cdef class UnsignedIntMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<unsigned int*>buf)[0] = <unsigned int>value
return 0
cdef get_itemsize(self): return sizeof(unsigned int)
cdef get_default_format(self): return b"@I"
cdef class ShortMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<short*>buf)[0] = <short>value
return 0
cdef get_itemsize(self): return sizeof(short)
cdef get_default_format(self): return b"h" # Try without endian specifier
cdef class UnsignedShortMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<unsigned short*>buf)[0] = <unsigned short>value
return 0
cdef get_itemsize(self): return sizeof(unsigned short)
cdef get_default_format(self): return b"@1H" # Try with repeat count
cdef class FloatMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<float*>buf)[0] = <float>(<double>value)
return 0
cdef get_itemsize(self): return sizeof(float)
cdef get_default_format(self): return b"f"
cdef class DoubleMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<double*>buf)[0] = <double>value
return 0
cdef get_itemsize(self): return sizeof(double)
cdef get_default_format(self): return b"d"
cdef extern from *:
void* addr_of_pyobject "(void*)"(object)
cdef class ObjectMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
(<void**>buf)[0] = addr_of_pyobject(value)
return 0
cdef get_itemsize(self): return sizeof(void*)
cdef get_default_format(self): return b"@O"
cdef class IntStridedMockBuffer(IntMockBuffer):
cdef __cythonbufferdefaults__ = {"mode" : "strided"}
cdef class ErrorBuffer:
cdef object label
def __init__(self, label):
self.label = label
def __getbuffer__(ErrorBuffer self, Py_buffer* buffer, int flags):
raise Exception("acquiring %s" % self.label)
def __releasebuffer__(ErrorBuffer self, Py_buffer* buffer):
raise Exception("releasing %s" % self.label)
#
# Structs
#
cdef struct MyStruct:
signed char a
signed char b
long long int c
int d
int e
cdef struct SmallStruct:
int a
int b
cdef struct NestedStruct:
SmallStruct x
SmallStruct y
int z
cdef packed struct PackedStruct:
signed char a
int b
cdef struct NestedPackedStruct:
signed char a
int b
PackedStruct sub
int c
cdef class MyStructMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
cdef MyStruct* s
s = <MyStruct*>buf
s.a, s.b, s.c, s.d, s.e = value
return 0
cdef get_itemsize(self): return sizeof(MyStruct)
cdef get_default_format(self): return b"2cq2i"
cdef class NestedStructMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
cdef NestedStruct* s
s = <NestedStruct*>buf
s.x.a, s.x.b, s.y.a, s.y.b, s.z = value
return 0
cdef get_itemsize(self): return sizeof(NestedStruct)
cdef get_default_format(self): return b"2T{ii}i"
cdef class PackedStructMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
cdef PackedStruct* s
s = <PackedStruct*>buf
s.a, s.b = value
return 0
cdef get_itemsize(self): return sizeof(PackedStruct)
cdef get_default_format(self): return b"^ci"
cdef class NestedPackedStructMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
cdef NestedPackedStruct* s
s = <NestedPackedStruct*>buf
s.a, s.b, s.sub.a, s.sub.b, s.c = value
return 0
cdef get_itemsize(self): return sizeof(NestedPackedStruct)
cdef get_default_format(self): return b"ci^ci@i"
cdef struct LongComplex:
long double real
long double imag
cdef class LongComplexMockBuffer(MockBuffer):
cdef int write(self, char* buf, object value) except -1:
cdef LongComplex* s
s = <LongComplex*>buf
s.real, s.imag = value
return 0
cdef get_itemsize(self): return sizeof(LongComplex)
cdef get_default_format(self): return b"Zg"
def print_offsets(*args, size, newline=True):
sys.stdout.write(' '.join([str(item // size) for item in args]))
if newline:
sys.stdout.write('\n')
def print_int_offsets(*args, newline=True):
print_offsets(*args, size=sizeof(int), newline=newline)
shape_5_3_4_list = [[list(range(k * 12 + j * 4, k * 12 + j * 4 + 4))
for j in range(3)]
for k in range(5)]
stride1 = 21 * 14
stride2 = 21
shape_9_14_21_list = [[list(range(k * stride1 + j * stride2, k * stride1 + j * stride2 + 21))
for j in range(14)]
for k in range(9)]