Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
* Copyright (C) UT-Battelle, LLC. 2014. ALL RIGHTS RESERVED.
* Copyright (C) ARM Ltd. 2016.All rights reserved.
* See file LICENSE for terms.
*/

extern "C" {
#include <poll.h>
#include <uct/api/uct.h>
#include <ucs/time/time.h>
}
#include <common/test.h>
#include "uct_test.h"

class test_uct_event_fd : public uct_test {
public:
    void init() {
        uct_test::init();

        m_e1 = uct_test::create_entity(0);
        m_entities.push_back(m_e1);

        check_skip_test();

        m_e2 = uct_test::create_entity(0);
        m_entities.push_back(m_e2);

        m_e1->connect(0, *m_e2, 0);
        m_e2->connect(0, *m_e1, 0);

        m_am_count = 0;
    }

    typedef struct {
        unsigned length;
        /* data follows */
    } recv_desc_t;

    static ucs_status_t am_handler(void *arg, void *data, size_t length,
                                   unsigned flags) {
        recv_desc_t *my_desc  = (recv_desc_t *) arg;
        uint64_t *test_ib_hdr = (uint64_t *) data;
        uint64_t *actual_data = (uint64_t *) test_ib_hdr + 1;
        unsigned data_length  = length - sizeof(test_ib_hdr);

        my_desc->length = data_length;
        if (*test_ib_hdr == 0xbeef) {
            memcpy(my_desc + 1, actual_data , data_length);
        }

        ++m_am_count;
        return UCS_OK;
    }

    void cleanup() {
        uct_test::cleanup();
    }

    void test_recv_am(unsigned arm_flags, unsigned send_flags);

    static size_t pack_u64(void *dest, void *arg)
    {
        *reinterpret_cast<uint64_t*>(dest) = *reinterpret_cast<uint64_t*>(arg);
        return sizeof(uint64_t);
    }

    void arm(entity *e, unsigned arm_flags) {
        ucs_status_t status;
        for (int i = 0; i < 10; ++i) {
            /* have several retries for arming, in case a transport has spurious
             * events */
            status = uct_iface_event_arm(e->iface(), arm_flags);
            if (status == UCS_OK) {
                break;
            }
        }
        ASSERT_EQ(UCS_OK, status);
    }

protected:
    entity *m_e1, *m_e2;
    static int m_am_count;
};

int test_uct_event_fd::m_am_count = 0;

void test_uct_event_fd::test_recv_am(unsigned arm_flags, unsigned send_flags)
{
    uint64_t send_data = 0xdeadbeef;
    int am_send_count = 0;
    ssize_t res;
    recv_desc_t *recv_buffer;
    struct pollfd wakeup_fd;
    ucs_status_t status;

    recv_buffer = (recv_desc_t *)malloc(sizeof(*recv_buffer) +
                                        sizeof(send_data));
    recv_buffer->length = 0; /* Initialize length to 0 */

    /* give a chance to finish connection for some transports (ib/ud, tcp) */
    flush();

    /* set a callback for the uct to invoke for receiving the data */
    uct_iface_set_am_handler(m_e2->iface(), 0, am_handler, recv_buffer, 0);

    /* create receiver wakeup */
    status = uct_iface_event_fd_get(m_e2->iface(), &wakeup_fd.fd);
    ASSERT_EQ(UCS_OK, status);

    wakeup_fd.events = POLLIN;
    EXPECT_EQ(0, poll(&wakeup_fd, 1, 0));

    arm(m_e2, arm_flags);

    EXPECT_EQ(0, poll(&wakeup_fd, 1, 0));

    /* send the data */
    res = uct_ep_am_bcopy(m_e1->ep(0), 0, pack_u64, &send_data, send_flags);
    ASSERT_EQ((ssize_t)sizeof(send_data), res);
    ++am_send_count;

    /* make sure the file descriptor IS signaled ONCE */
    ASSERT_EQ(1, poll(&wakeup_fd, 1, 1000*ucs::test_time_multiplier()));

    for (;;) {
        if ((progress() == 0) && (m_am_count == am_send_count)) {
            status = uct_iface_event_arm(m_e2->iface(), arm_flags);
            if (status != UCS_ERR_BUSY) {
                break;
            }
        }
    }
    ASSERT_EQ(UCS_OK, status);

    arm(m_e2, arm_flags);

    /* send the data again */
    res = uct_ep_am_bcopy(m_e1->ep(0), 0, pack_u64, &send_data, send_flags);
    ASSERT_EQ((ssize_t)sizeof(send_data), res);
    ++am_send_count;

    /* make sure the file descriptor IS signaled */
    ASSERT_EQ(1, poll(&wakeup_fd, 1, 1000*ucs::test_time_multiplier()));

    while (m_am_count < am_send_count) {
        progress();
    }

    m_e1->flush();

    free(recv_buffer);
}

UCS_TEST_SKIP_COND_P(test_uct_event_fd, am,
                     !check_caps(UCT_IFACE_FLAG_EVENT_RECV |
                                 UCT_IFACE_FLAG_CB_SYNC    |
                                 UCT_IFACE_FLAG_AM_BCOPY))
{
    test_recv_am(UCT_EVENT_RECV, 0);
}

UCS_TEST_SKIP_COND_P(test_uct_event_fd, sig_am,
                     !check_caps(UCT_IFACE_FLAG_EVENT_RECV_SIG |
                                 UCT_IFACE_FLAG_CB_SYNC        |
                                 UCT_IFACE_FLAG_AM_BCOPY))
{
    test_recv_am(UCT_EVENT_RECV_SIG, UCT_SEND_FLAG_SIGNALED);
}

UCT_INSTANTIATE_NO_SELF_TEST_CASE(test_uct_event_fd);