# Blob Blame History Raw
# Copyright(c) 2018, Intel Corporation
#
# Redistribution  and  use  in source  and  binary  forms,  with  or  without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of  source code  must retain the  above copyright notice,
#   this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
# * Neither the name  of Intel Corporation  nor the names of its contributors
#   may be used to  endorse or promote  products derived  from this  software
#   without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,  BUT NOT LIMITED TO,  THE
# IMPLIED WARRANTIES OF  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT  SHALL THE COPYRIGHT OWNER  OR CONTRIBUTORS BE
# LIABLE  FOR  ANY  DIRECT,  INDIRECT,  INCIDENTAL,  SPECIAL,  EXEMPLARY,  OR
# CONSEQUENTIAL  DAMAGES  (INCLUDING,  BUT  NOT LIMITED  TO,  PROCUREMENT  OF
# SUBSTITUTE GOODS OR SERVICES;  LOSS OF USE,  DATA, OR PROFITS;  OR BUSINESS
# INTERRUPTION)  HOWEVER CAUSED  AND ON ANY THEORY  OF LIABILITY,  WHETHER IN
# CONTRACT,  STRICT LIABILITY,  OR TORT  (INCLUDING NEGLIGENCE  OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import json
import select
import struct
import subprocess
import threading
import time
import unittest
import uuid
import sys
import opae.fpga

# GUID passed as ``guid=`` when building properties / enumerating in the tests.
NLB0 = "d8424dc4-a4a3-c413-f89e-433683f9040b"

# File that trigger_port_error() writes to in order to raise/clear a port error.
MOCK_PORT_ERROR = (
    "/tmp/class/fpga/intel-fpga-dev.0/intel-fpga-port.0/errors/errors"
)

# Metadata that TestHandle.setUp serialises with json.dumps() into 'm0.gbs'.
# Key order is kept as-is so the serialised text does not change.
NLB0_MDATA = {
    "version": 640,
    "afu-image": {
        "clock-frequency-high": 312,
        "clock-frequency-low": 156,
        "power": 50,
        "interface-uuid": "1a422218-6dba-448e-b302-425cbcde1406",
        "magic-no": 488605312,  # == 0x1d1f8680
        "accelerator-clusters": [
            {
                "total-contexts": 1,
                "name": "nlb_400",
                # Same GUID as NLB0 above.
                "accelerator-type-uuid": NLB0,
            },
        ],
    },
    "platform-name": "MCP",
}

class TestProperties(unittest.TestCase):
    """Set/read-back checks for the fields of ``opae.fpga.properties``.

    Most tests set a field once through the constructor keyword and once
    through attribute assignment, reading the value back after each step.
    ``unittest`` assertion methods are used instead of bare ``assert`` so
    the checks are not stripped when Python runs with ``-O``.
    """

    def _assert_roundtrip(self, name, initial, updated, **extra):
        """Verify that property *name* keeps the values written to it.

        Args:
            name: Property name; used both as the constructor keyword and
                as the attribute name.
            initial: Value passed to the ``properties`` constructor.
            updated: Value assigned to the attribute afterwards.
            **extra: Additional constructor keywords (e.g. ``type``).
        """
        kwargs = dict(extra)
        kwargs[name] = initial
        props = opae.fpga.properties(**kwargs)
        self.assertEqual(getattr(props, name), initial)
        setattr(props, name, updated)
        self.assertEqual(getattr(props, name), updated)

    def test_set_parent(self):
        """``parent`` can be given at construction or assigned later."""
        device_filter = opae.fpga.properties(type=opae.fpga.DEVICE)
        toks = opae.fpga.enumerate([device_filter])
        # Fail with a clear message instead of an IndexError on toks[0].
        self.assertTrue(toks, "no DEVICE tokens were enumerated")
        parent = toks[0]

        from_kwarg = opae.fpga.properties(type=opae.fpga.ACCELERATOR,
                                          parent=parent)
        self.assertTrue(from_kwarg.parent)

        from_attr = opae.fpga.properties(type=opae.fpga.ACCELERATOR)
        from_attr.parent = parent
        self.assertTrue(from_attr.parent)

    def test_guid(self):
        """``guid`` reads back as a UUID string equal to the one written."""
        props = opae.fpga.properties(guid=NLB0)
        self.assertEqual(str(uuid.UUID(props.guid)).lower(), NLB0)

        props = opae.fpga.properties()
        props.guid = NLB0
        self.assertEqual(str(uuid.UUID(props.guid)).lower(), NLB0)

    def test_set_objtype_accelerator(self):
        """``type`` can be set to ACCELERATOR by keyword or by assignment."""
        props = opae.fpga.properties(type=opae.fpga.ACCELERATOR)
        self.assertEqual(props.type, opae.fpga.ACCELERATOR)

        props = opae.fpga.properties(type=opae.fpga.DEVICE)
        props.type = opae.fpga.ACCELERATOR
        self.assertEqual(props.type, opae.fpga.ACCELERATOR)

    def test_set_objtype_device(self):
        """``type`` can be set to DEVICE by keyword or by assignment."""
        props = opae.fpga.properties(type=opae.fpga.DEVICE)
        self.assertEqual(props.type, opae.fpga.DEVICE)

        props = opae.fpga.properties(type=opae.fpga.ACCELERATOR)
        props.type = opae.fpga.DEVICE
        self.assertEqual(props.type, opae.fpga.DEVICE)

    def test_set_segment(self):
        self._assert_roundtrip("segment", 0x9090, 0xA1A1)

    def test_set_bus(self):
        self._assert_roundtrip("bus", 0x5e, 0xbe)

    def test_set_device(self):
        self._assert_roundtrip("device", 0xe, 0xf)

    def test_set_function(self):
        self._assert_roundtrip("function", 0x7, 0x6)

    def test_set_socket_id(self):
        self._assert_roundtrip("socket_id", 1, 0)

    def test_set_object_id(self):
        self._assert_roundtrip("object_id", 0xcafe, 0xfade)

    def test_set_num_errors(self):
        self._assert_roundtrip("num_errors", 8, 4)

    def test_set_num_slots(self):
        self._assert_roundtrip("num_slots", 3, 2, type=opae.fpga.DEVICE)

    def test_set_bbs_id(self):
        self._assert_roundtrip("bbs_id", 0xc0c0cafe, 0xb0b0fade,
                               type=opae.fpga.DEVICE)

    def test_set_bbs_version(self):
        self._assert_roundtrip("bbs_version", (0, 1, 2), (1, 2, 3),
                               type=opae.fpga.DEVICE)

    def test_set_vendor_id(self):
        self._assert_roundtrip("vendor_id", 0xfafa, 0xdada)

    def test_set_device_id(self):
        self._assert_roundtrip("device_id", 0xfa, 0xda)

    @unittest.skip("model not implemented yet")
    def test_set_model(self):
        self._assert_roundtrip("model", "intel skxp", "intel skxp 2")

    @unittest.skip("local_memory_size not implemented yet")
    def test_set_local_memory_size(self):
        self._assert_roundtrip("local_memory_size", 0xffff, 0xaaaa)

    @unittest.skip("capabilities not implemented yet")
    def test_set_capabilities(self):
        self._assert_roundtrip("capabilities", 0xdeadbeef, 0xfeebdaed)

    def test_set_num_mmio(self):
        self._assert_roundtrip("num_mmio", 4, 5, type=opae.fpga.ACCELERATOR)

    def test_set_num_interrupts(self):
        self._assert_roundtrip("num_interrupts", 9, 8,
                               type=opae.fpga.ACCELERATOR)

    def test_set_accelerator_state(self):
        self._assert_roundtrip("accelerator_state",
                               opae.fpga.ACCELERATOR_ASSIGNED,
                               opae.fpga.ACCELERATOR_UNASSIGNED,
                               type=opae.fpga.ACCELERATOR)


class TestToken(unittest.TestCase):
    """Token enumeration through ``opae.fpga.enumerate``.

    Uses ``unittest`` assertion methods rather than bare ``assert`` so the
    checks are not removed when Python runs with ``-O``.
    """

    def test_enumerate(self):
        """Enumerating with a list of properties objects yields tokens."""
        guid_filter = opae.fpga.properties(guid=NLB0)
        self.assertTrue(opae.fpga.enumerate([guid_filter]))

    def test_enumerate_kwargs(self):
        """Enumerating with filter keywords yields tokens."""
        self.assertTrue(opae.fpga.enumerate(guid=NLB0))

    def test_enumerate_empty_kwargs(self):
        """Enumerating with no filter at all yields tokens."""
        self.assertTrue(opae.fpga.enumerate())

    def test_token_properties(self):
        """Every enumerated token gives properties with a non-empty guid."""
        guid_filter = opae.fpga.properties(guid=NLB0)
        toks = opae.fpga.enumerate([guid_filter])
        self.assertTrue(toks)
        for tok in toks:
            token_props = opae.fpga.properties(tok)
            self.assertTrue(token_props)
            # A truthy guid is by definition not the empty string.
            self.assertTrue(token_props.guid)


class TestHandle(unittest.TestCase):
    """Open/close, reset, reconfigure and CSR access on an accelerator handle.

    Each test gets a freshly opened handle on the first enumerated
    ACCELERATOR token and a newly written ``m0.gbs`` file in the current
    working directory.
    """

    def setUp(self):
        """Open a handle and write the ``m0.gbs`` file used for reconfigure."""
        self.props = opae.fpga.properties(type=opae.fpga.ACCELERATOR)
        self.toks = opae.fpga.enumerate([self.props])
        self.assertTrue(self.toks)
        self.handle = opae.fpga.open(self.toks[0])
        self.assertTrue(self.handle)
        # Bytes literal + encode() behave the same on Python 2 and 3, so no
        # interpreter-version branch is needed.
        # NOTE(review): "\b7" is a backspace followed by "7"; if byte 0xB7
        # was intended this would have to be "\xb7" - confirm.
        header = b"XeonFPGA\b7GBSv001\53\02\00\00"
        with open('m0.gbs', 'wb') as fd:
            fd.write(header)
            fd.write(json.dumps(NLB0_MDATA).encode('utf-8'))

    def tearDown(self):
        """Close the handle if the test left it open."""
        # A closed handle is falsy (see test_close), so this skips handles
        # that the test already closed itself.
        if self.handle:
            self.handle.close()

    def test_open_null_token(self):
        """Opening ``None`` instead of a token raises ValueError."""
        with self.assertRaises(ValueError):
            opae.fpga.open(None)

    def test_open(self):
        """The handle opened in setUp is truthy."""
        self.assertTrue(self.handle)

    def test_reconfigure(self):
        """reconfigure() accepts slot 0 and the open ``m0.gbs`` file."""
        # The file holds binary data, so open it in binary mode.
        with open('m0.gbs', 'rb') as fd:
            self.handle.reconfigure(0, fd)

    def test_close(self):
        """A handle is falsy after close()."""
        self.handle.close()
        self.assertFalse(self.handle)

    def test_context(self):
        """A handle used as a context manager is closed on exit."""
        self.handle.close()
        self.assertFalse(self.handle)
        with opae.fpga.open(self.toks[0]) as h:
            self.assertTrue(h)
        self.assertFalse(h)

    def test_reset(self):
        """reset() on an open handle does not raise."""
        self.handle.reset()

    def test_close_reset(self):
        """reset() on a closed handle raises RuntimeError."""
        self.handle.close()
        self.assertFalse(self.handle)
        with self.assertRaises(RuntimeError):
            self.handle.reset()

    def test_mmio(self):
        """32- and 64-bit CSR writes can be read back at the same offset."""
        offset = 0x100
        # Non-zero first, then zero, exactly as the sequence was before.
        for write_value in (10, 0):
            self.handle.write_csr32(offset, write_value)
            self.assertEqual(self.handle.read_csr32(offset), write_value)
            self.handle.write_csr64(offset, write_value)
            self.assertEqual(self.handle.read_csr64(offset), write_value)

    def test_close_mmio(self):
        """Every CSR accessor raises RuntimeError on a closed handle."""
        self.handle.close()
        self.assertFalse(self.handle)
        with self.assertRaises(RuntimeError):
            self.handle.write_csr32(0x100, 0xbeef)

        with self.assertRaises(RuntimeError):
            self.handle.write_csr64(0x100, 0xbeef)

        with self.assertRaises(RuntimeError):
            self.handle.read_csr32(0x100)

        with self.assertRaises(RuntimeError):
            self.handle.read_csr64(0x100)


class TestSharedBuffer(unittest.TestCase):
    """Allocation and release of shared buffers tied to a handle."""

    def setUp(self):
        """Open a handle on the first enumerated ACCELERATOR token."""
        self.props = opae.fpga.properties(type=opae.fpga.ACCELERATOR)
        self.toks = opae.fpga.enumerate([self.props])
        self.assertTrue(self.toks)
        self.handle = opae.fpga.open(self.toks[0])
        self.assertTrue(self.handle)

    def tearDown(self):
        """Close the handle if the test left it open."""
        if self.handle:
            self.handle.close()

    def test_allocate(self):
        """Buffers expose size/wsid/io_address, the buffer protocol and
        fill/compare, and accept integer item assignment."""
        size = 4096
        buff1 = opae.fpga.allocate_shared_buffer(self.handle, size)
        buff2 = opae.fpga.allocate_shared_buffer(self.handle, size)
        self.assertTrue(buff1)
        self.assertTrue(buff2)
        self.assertEqual(buff1.size(), size)
        self.assertNotEqual(buff1.wsid(), 0)
        self.assertNotEqual(buff1.io_address(), 0)

        mv = memoryview(buff1)
        self.assertTrue(mv)

        # compare() is falsy while both buffers are untouched and truthy
        # once they have been filled with different bytes.
        self.assertFalse(buff1.compare(buff2, size))
        buff1.fill(0xAA)
        buff2.fill(0xEE)
        self.assertTrue(buff1.compare(buff2, size))

        # Indexing a memoryview yields a 1-char str on Python 2, an int on 3.
        expected = '\xaa' if sys.version_info[0] == 2 else 0xaa
        self.assertEqual(mv[0], expected)
        self.assertEqual(mv[-1], expected)

        ba = bytearray(buff1)
        self.assertEqual(ba[0], 0xaa)

        # An int written at index 42 reads back as a little-endian 32-bit
        # value from bytes 42..45.
        buff1[42] = 65536
        stored = struct.unpack('<L', bytearray(buff1[42:46]))[0]
        self.assertEqual(stored, 65536)

    def test_conext_release(self):
        """Closing the owning handle zeroes the buffer's size and wsid."""
        self.assertTrue(self.handle)
        self.handle.close()
        with opae.fpga.open(self.toks[0]) as h:
            buff = opae.fpga.allocate_shared_buffer(h, 4096)
            self.assertTrue(buff)
            self.assertEqual(buff.size(), 4096)
            self.assertNotEqual(buff.wsid(), 0)
        self.assertFalse(h)
        self.assertEqual(buff.size(), 0)
        self.assertEqual(buff.wsid(), 0)

def trigger_port_error(value=1):
    """Write a port error value to the ``MOCK_PORT_ERROR`` file.

    The file is first overwritten with ``0``; if *value* is truthy it is
    then opened again and overwritten with *value*. Each write uses its own
    open/close cycle.

    Args:
        value: Error value to write after the reset; a falsy value leaves
            the file at ``0``.
    """
    def _overwrite(text):
        with open(MOCK_PORT_ERROR, 'w') as errors_file:
            errors_file.write(text)

    _overwrite('0\n')
    if not value:
        return
    _overwrite('{}\n'.format(value))

class TestEvent(unittest.TestCase):
    """Registration of an error event and polling of its OS object."""

    def setUp(self):
        """Clear the mock port error and open an accelerator handle."""
        trigger_port_error(0)
        self.props = opae.fpga.properties(type=opae.fpga.ACCELERATOR,
                                          socket_id=0)
        self.toks = opae.fpga.enumerate([self.props])
        self.assertTrue(self.toks)
        self.handle = opae.fpga.open(self.toks[0])
        self.assertTrue(self.handle)

    def tearDown(self):
        """Clear the mock port error again."""
        trigger_port_error(0)

    def test_events(self):
        """Register EVENT_ERROR, trigger a port error after one second and
        poll the event's OS object for up to ten rounds."""
        err_ev = opae.fpga.register_event(self.handle,
                                          opae.fpga.EVENT_ERROR)
        self.assertTrue(err_ev)
        # FIXME: os_object returns long
        # WA is to cast it to int
        os_object = int(err_ev.os_object())
        self.assertIsInstance(os_object, int)
        self.assertGreater(os_object, -1)

        received_event = False
        epoll = select.epoll()
        try:
            epoll.register(os_object, select.EPOLLIN)
            trigger_error_timer = threading.Timer(1, trigger_port_error)
            trigger_error_timer.start()
            try:
                for _ in range(10):
                    received_event = any(
                        fileno == os_object
                        for fileno, _mask in epoll.poll(1))
                    if received_event:
                        break
                    time.sleep(1)
            finally:
                # Stop the timer and wait for its thread, so that a late
                # trigger cannot race with the reset done in tearDown().
                trigger_error_timer.cancel()
                trigger_error_timer.join()
        finally:
            # Release the epoll file descriptor even if the test fails.
            epoll.close()

        # TODO: this assertion is temporarily disabled
        # self.assertTrue(received_event)


class TestError(unittest.TestCase):
    """Check the error registers reported for accelerator and device tokens."""

    def setUp(self):
        """Build the expected error tables and look up one token of each type."""
        # Expected error names mapped to whether each can be cleared.
        self.port_errors = {
            "errors": {"can_clear": True},
            "first_error": {"can_clear": False},
            "first_malformed_req": {"can_clear": False},
        }
        self.fme_errors = {
            "pcie0_errors": {"can_clear": True},
            "warning_errors": {"can_clear": True},
            "pcie1_errors": {"can_clear": True},
            "gbs_errors": {"can_clear": False},
            "bbs_errors": {"can_clear": False},
            "next_error": {"can_clear": False},
            "errors": {"can_clear": False},
            "first_error": {"can_clear": False},
            "inject_error": {"can_clear": True},
        }
        props = opae.fpga.properties(type=opae.fpga.ACCELERATOR)
        toks = opae.fpga.enumerate([props])
        self.assertTrue(toks)
        self.acc_token = toks[0]
        # Reuse the same properties object, now filtering for devices.
        props.type = opae.fpga.DEVICE
        toks = opae.fpga.enumerate([props])
        self.assertTrue(toks)
        self.dev_token = toks[0]

    def _assert_errors_match(self, token, expected):
        """Check that ``opae.fpga.errors(token)`` yields exactly *expected*.

        Args:
            token: Token whose errors are listed.
            expected: Dict of error name -> ``{"can_clear": bool}``. Entries
                are removed as they are seen, so the dict is empty on
                success.
        """
        for err in opae.fpga.errors(token):
            # Report an unknown or repeated name as a failure, not KeyError.
            self.assertIn(err.name, expected)
            self.assertEqual(expected[err.name]["can_clear"], err.can_clear)
            expected.pop(err.name)
        self.assertFalse(
            expected,
            "errors not reported: {}".format(sorted(expected)))

    def test_port_errors(self):
        """The accelerator token reports exactly the expected port errors."""
        self._assert_errors_match(self.acc_token, self.port_errors)

    def test_fme_errors(self):
        """The device token reports exactly the expected FME errors."""
        self._assert_errors_match(self.dev_token, self.fme_errors)

if __name__ == "__main__":
    # Running the file directly executes only TestEvent.test_events. Go
    # through a text runner so the outcome is printed and reflected in the
    # exit status, instead of discarding the result of TestCase.run().
    suite = unittest.TestSuite([TestEvent('test_events')])
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    sys.exit(0 if result.wasSuccessful() else 1)