Blob Blame History Raw
/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

extern "C" {
#include <uct/api/uct.h>
}
#include <uct/uct_test.h>
#include <ucs/time/time.h>
#include <poll.h>
#include <infiniband/verbs.h>

/* wait for 1 sec to get statistics */
static const unsigned long test_period_usec = (1ul * UCS_USEC_PER_SEC);
static const unsigned moderation_period_usec = 1000; /* usecs */
/* use multiplier 2 because we have same iface to send/recv which may produce 2x events */
static const unsigned event_limit = (2 * test_period_usec / moderation_period_usec);
static const unsigned max_repeats = 60; /* max 3 minutes per test */

class test_uct_cq_moderation : public uct_test {
protected:

    void init() {
        if (RUNNING_ON_VALGRIND) {
            UCS_TEST_SKIP_R("skipping on valgrind");
        }

        if (!has_rc() && !has_ud()) {
            UCS_TEST_SKIP_R("unsupported");
        }

        uct_test::init();

        if (has_rc()) {
            set_config("RC_FC_ENABLE=n");
        }

        set_config(std::string("IB_TX_EVENT_MOD_PERIOD=") + ucs::to_string(moderation_period_usec) + "us");
        set_config(std::string("IB_RX_EVENT_MOD_PERIOD=") + ucs::to_string(moderation_period_usec) + "us");

        m_sender = uct_test::create_entity(0);
        m_entities.push_back(m_sender);

        check_skip_test();

        m_receiver = uct_test::create_entity(0);
        m_entities.push_back(m_receiver);
    }

    void connect() {
        m_sender->connect(0, *m_receiver, 0);
        short_progress_loop(10); /* Some transports need time to become ready */
    }

    void disconnect() {
        flush();
        if (m_receiver->iface_attr().cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
            m_receiver->destroy_ep(0);
        }
        m_sender->destroy_ep(0);
    }

    void iface_arm(uct_iface_h iface) {
        struct pollfd pfd;
        int fd;

        /* wait for all messages are arrived */
        while (m_recv < m_send) {
            progress();
        }

        uct_iface_event_fd_get(iface, &fd);

        pfd.fd = fd;
        pfd.events = POLLIN;

        do {
            /* arm all event types */
            while (1) {
                if (uct_iface_event_arm(iface,
                                        UCT_EVENT_SEND_COMP |
                                        UCT_EVENT_RECV      |
                                        UCT_EVENT_RECV_SIG) != UCS_ERR_BUSY) {
                    break;
                }
                progress();
            }
            /* repeat till FD is in active state */
        } while (poll(&pfd, 1, 0) > 0);
    }

    static ucs_status_t am_cb(void *arg, void *data, size_t len, unsigned flags) {
        ucs_assert_always(arg != NULL);
        test_uct_cq_moderation *self = static_cast<test_uct_cq_moderation*>(arg);

        self->m_recv++;

        return UCS_OK;
    }

    void run_test(uct_iface_h iface);

    entity * m_sender;
    entity * m_receiver;

    unsigned m_send;
    unsigned m_recv;
};

void test_uct_cq_moderation::run_test(uct_iface_h iface) {
    unsigned events;
    int fd;
    unsigned i;
    int polled;
    struct pollfd pfd;
    ucs_status_t status;

    uct_iface_set_am_handler(m_receiver->iface(), 0, am_cb, this, 0);

    connect();

    m_send = 0;
    m_recv = 0;

    uct_iface_event_fd_get(iface, &fd);
    pfd.fd = fd;
    pfd.events = POLLIN;

    /* repeat test till at least one iteration is successful
     * to exclude random fluctuations */
    for (i = 0; i < max_repeats; i++) {
        events = 0;
        iface_arm(iface);

        ucs_time_t tm = ucs_get_time();

        while ((ucs_time_to_usec(ucs_get_time()) - ucs_time_to_usec(tm)) < test_period_usec) {
            polled = poll(&pfd, 1, 0);
            if (polled > 0) {
                events++;
                iface_arm(iface);
            }

            do {
                status = uct_ep_am_short(m_sender->ep(0), 0, 0, NULL, 0);
                progress();
            } while (status == UCS_ERR_NO_RESOURCE);
            m_send++;
            ASSERT_UCS_OK(status);
        }
        m_sender->flush();
        UCS_TEST_MESSAGE << "iteration: " << i + 1 << ", events: " << events
                         << ", limit: " << event_limit;
        if (events <= event_limit) {
            break;
        }
    }

    disconnect();

    EXPECT_LE(events, event_limit);
}

UCS_TEST_SKIP_COND_P(test_uct_cq_moderation, send_period,
                     !check_caps(UCT_IFACE_FLAG_EVENT_SEND_COMP |
                                 UCT_IFACE_FLAG_EVENT_RECV)) {
    run_test(m_sender->iface());
}

UCS_TEST_SKIP_COND_P(test_uct_cq_moderation, recv_period,
                     !check_caps(UCT_IFACE_FLAG_EVENT_SEND_COMP |
                                 UCT_IFACE_FLAG_EVENT_RECV)) {
    run_test(m_receiver->iface());
}

#if HAVE_DECL_IBV_EXP_CQ_MODERATION
UCT_INSTANTIATE_IB_TEST_CASE(test_uct_cq_moderation)
#endif /* HAVE_DECL_IBV_EXP_CQ_MODERATION */