Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "uct_p2p_test.h"
extern "C" {
#include <ucs/arch/atomic.h>
}

class uct_flush_test : public uct_test {
public:
    static const uint64_t SEED1 = 0x1111111111111111lu;
    static const uint64_t SEED2 = 0x2222222222222222lu;
    static const uint64_t SEED3 = 0x3333333333333333lu;
    static const int      AM_ID         = 1;
    static const int      AM_ID_CANCEL  = 2;

    typedef void (uct_flush_test::* flush_func_t)();

    struct test_req_t {
        uct_pending_req_t  uct;
        uct_completion_t   comp;
        mapped_buffer      *sendbuf;
        uct_flush_test     *test;
    };

    void init() {
        uct_test::init();

        entity *m_sender = uct_test::create_entity(0);
        m_entities.push_back(m_sender);

        check_skip_test();

        if (UCT_DEVICE_TYPE_SELF == GetParam()->dev_type) {
            m_sender->connect(0, *m_sender, 0);
        } else {
            entity *m_receiver = uct_test::create_entity(0);
            m_entities.push_back(m_receiver);

            m_sender->connect(0, *m_receiver, 0);
        }
        am_rx_count   = 0;
        m_flush_flags = 0;
    }

    static size_t pack_cb(void *dest, void *arg)
    {
        const mapped_buffer *sendbuf = (const mapped_buffer *)arg;
        memcpy(dest, sendbuf->ptr(), sendbuf->length());
        return sendbuf->length();
    }

    void blocking_put_bcopy(const mapped_buffer &sendbuf,
                            const mapped_buffer &recvbuf)
    {
        ssize_t status;
         for (;;) {
             status = uct_ep_put_bcopy(sender().ep(0), pack_cb, (void*)&sendbuf,
                                       recvbuf.addr(), recvbuf.rkey());
             if (status >= 0) {
                 return;
             } else if (status == UCS_ERR_NO_RESOURCE) {
                 progress();
                 continue;
             } else {
                 ASSERT_UCS_OK((ucs_status_t)status);
             }
         }
    }

    void blocking_am_bcopy(const mapped_buffer &sendbuf)
    {
         ssize_t status;
         for (;;) {
             status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
                                      (void*)&sendbuf, 0);
             if (status >= 0) {
                 return;
             } else if (status == UCS_ERR_NO_RESOURCE) {
                 progress();
                 continue;
             } else {
                 ASSERT_UCS_OK((ucs_status_t)status);
             }
         }
    }

    static ucs_status_t am_handler(void *arg, void *data, size_t length,
                                   unsigned flags)
    {
        if (arg == NULL) {
            /* This is not completely canceled message, drop it */
            return UCS_OK;
        }
        const mapped_buffer *recvbuf = (const mapped_buffer *)arg;
        memcpy(recvbuf->ptr(), data, ucs_min(length, recvbuf->length()));
        ucs_atomic_add32(&am_rx_count, 1);
        return UCS_OK;
    }

    ucs_status_t am_send_pending(test_req_t *am_req)
    {
        ssize_t status;

        status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
                                 (void*)am_req->sendbuf, 0);
        if (status >= 0) {
            --am_req->comp.count;
            return UCS_OK;
        } else {
            return (ucs_status_t)status;
        }
    }

    static ucs_status_t am_progress(uct_pending_req_t *req)
    {
        test_req_t *am_req = ucs_container_of(req, test_req_t, uct);
        return am_req->test->am_send_pending(am_req);
    }

    static ucs_status_t flush_progress(uct_pending_req_t *req)
    {
        test_req_t *flush_req = ucs_container_of(req, test_req_t, uct);
        ucs_status_t status;

        status = uct_ep_flush(flush_req->test->sender().ep(0), 0,
                              &flush_req->comp);
        if (status == UCS_OK) {
            --flush_req->comp.count;
            return UCS_OK;
        } else if (status == UCS_INPROGRESS) {
            return UCS_OK;
        } else if (status == UCS_ERR_NO_RESOURCE) {
            return UCS_ERR_NO_RESOURCE;
        } else {
            UCS_TEST_ABORT("Error: " << ucs_status_string(status));
        }
    }

    void test_flush_put_bcopy(flush_func_t flush) {
        const size_t length = 8;
        mapped_buffer sendbuf(length, SEED1, sender());
        mapped_buffer recvbuf(length, SEED2, receiver());
        sendbuf.pattern_fill(SEED3);
        blocking_put_bcopy(sendbuf, recvbuf);
        (this->*flush)();

        if (is_flush_cancel()) {
            return;
        }

        recvbuf.pattern_check(SEED3);
    }

    void wait_am(unsigned count) {
        while (am_rx_count < count) {
            progress();
            sched_yield();
        }
    }

    void test_flush_am_zcopy(flush_func_t flush, bool destroy_ep) {
        const size_t length = 8;
        if (is_flush_cancel()) {
            ASSERT_TRUE(destroy_ep);
        }
        mapped_buffer sendbuf(length, SEED1, sender());
        mapped_buffer recvbuf(length, SEED2, receiver());
        sendbuf.pattern_fill(SEED3);

        uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
                                 is_flush_cancel() ? NULL : &recvbuf,
                                 UCT_CB_FLAG_ASYNC);

        uct_completion_t zcomp;
        zcomp.count = 2;
        zcomp.func  = NULL;

        ucs_status_t status;
        UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
                                sendbuf.memh(),
                                sender().iface_attr().cap.am.max_iov);
        do {
            status = uct_ep_am_zcopy(sender().ep(0), get_am_id(), NULL, 0, iov,
                                     iovcnt, 0, &zcomp);
            progress();
        } while (status == UCS_ERR_NO_RESOURCE);
        ASSERT_UCS_OK_OR_INPROGRESS(status);
        if (status == UCS_OK) {
            --zcomp.count;
        }

        (this->*flush)();

        EXPECT_EQ(1, zcomp.count); /* Zero copy op should be already completed
                                      since flush returned */

        if (destroy_ep) {
            sender().destroy_ep(0);
        }

        if (is_flush_cancel()) {
            return;
        }

        wait_am(1);

        uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);

        recvbuf.pattern_check(SEED3);
    }

    void test_flush_am_disconnect(flush_func_t flush, bool destroy_ep) {
        const size_t length = 8;
        if (is_flush_cancel()) {
            ASSERT_TRUE(destroy_ep);
        }
        mapped_buffer sendbuf(length, SEED1, sender());
        mapped_buffer recvbuf(length, SEED2, receiver());
        sendbuf.pattern_fill(SEED3);

        uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
                                 is_flush_cancel() ? NULL : &recvbuf,
                                 UCT_CB_FLAG_ASYNC);
        blocking_am_bcopy(sendbuf);
        (this->*flush)();

        if (destroy_ep) {
            sender().destroy_ep(0);
        }

        if (is_flush_cancel()) {
            return;
        }

        wait_am(1);
        uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);

        recvbuf.pattern_check(SEED3);
    }

    void flush_ep_no_comp() {
        ucs_status_t status;
        do {
            progress();
            status = uct_ep_flush(sender().ep(0), m_flush_flags, NULL);
        } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS));
        ASSERT_UCS_OK(status);
    }

    void flush_iface_no_comp() {
        ucs_status_t status;
        do {
            progress();
            status = uct_iface_flush(sender().iface(), m_flush_flags, NULL);
        } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS));
        ASSERT_UCS_OK(status);
    }

    void flush_ep_nb() {
        uct_completion_t comp;
        ucs_status_t status;
        comp.count = 2;
        comp.func  = NULL;
        do {
            progress();
            status = uct_ep_flush(sender().ep(0), m_flush_flags, &comp);
        } while (status == UCS_ERR_NO_RESOURCE);
        ASSERT_UCS_OK_OR_INPROGRESS(status);
        if (status == UCS_OK) {
            return;
        }
        /* coverity[loop_condition] */
        while (comp.count != 1) {
            progress();
        }
    }

    void test_flush_am_pending(flush_func_t flush, bool destroy_ep);

protected:
    uct_test::entity& sender() {
        return **m_entities.begin();
    }

    uct_test::entity& receiver() {
        return **(m_entities.end() - 1);
    }

    bool is_flush_cancel() const {
        return (m_flush_flags & UCT_FLUSH_FLAG_CANCEL);
    }

    uint8_t get_am_id() const {
        return is_flush_cancel() ? AM_ID_CANCEL : AM_ID;
    }

    static uint32_t am_rx_count;
    unsigned        m_flush_flags;
};

/* Received-AM counter shared by am_handler and the tests; starts at zero */
uint32_t uct_flush_test::am_rx_count(0);

/*
 * Fill the sender's resources with AM bcopy sends, queue up to 10 more AMs on
 * the pending queue, and flush the endpoint with m_flush_flags. If the flush
 * returns UCS_ERR_NO_RESOURCE, it is deferred to the pending queue. Then wait
 * (up to 60 seconds) for the flush to complete and check whether each pending
 * AM was sent: all of them normally, none of them in cancel mode.
 *
 * NOTE: 'flush' is currently not used; the flush is issued directly here.
 */
void uct_flush_test::test_flush_am_pending(flush_func_t flush, bool destroy_ep)
{
    if (is_flush_cancel()) {
        ASSERT_TRUE(destroy_ep);
    }
    const size_t length = 8;
    mapped_buffer sendbuf(length, SEED1, sender());
    mapped_buffer recvbuf(length, SEED2, receiver());
    sendbuf.pattern_fill(SEED3);

    uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler,
                             is_flush_cancel() ? NULL : &recvbuf,
                             UCT_CB_FLAG_ASYNC);

    /* Send until resources are exhausted, or for at most 1 second */
    unsigned count = 0;
    ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(1.0);
    ssize_t packed_len;
    for (;;) {
        packed_len = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb,
                                     (void*)&sendbuf, 0);
        if (packed_len == UCS_ERR_NO_RESOURCE) {
            break;
        }

        /* Any other negative value is a real error: fail on it instead of
         * counting it as a sent message (even when the time limit expired) */
        if (packed_len < 0) {
            ASSERT_UCS_OK((ucs_status_t)packed_len);
        }

        ++count;
        if (ucs_get_time() > loop_end_limit) {
            break;
        }
    }

    /* Queue some pending AMs */
    ucs_status_t status;
    std::vector<test_req_t> reqs;
    reqs.resize(10);
    for (std::vector<test_req_t>::iterator it = reqs.begin(); it != reqs.end();) {
        it->sendbuf    = &sendbuf;
        it->test       = this;
        it->uct.func   = am_progress;
        it->comp.count = 2;
        it->comp.func  = NULL;
        status = uct_ep_pending_add(sender().ep(0), &it->uct, 0);
        if (UCS_ERR_BUSY == status) {
            /* User advised to retry the send. It means no requests added
             * to the queue
             */
            it = reqs.erase(it);
            status = UCS_OK;
        } else {
            ++count;
            ++it;
        }
        ASSERT_UCS_OK(status);
    }

    /* Try to start a flush */
    test_req_t flush_req;
    flush_req.comp.count = 2;
    flush_req.comp.func  = NULL;

    for (;;) {
        status = uct_ep_flush(sender().ep(0), m_flush_flags, &flush_req.comp);
        if (status == UCS_OK) {
            --flush_req.comp.count;
        } else if (status == UCS_ERR_NO_RESOURCE) {
            /* If flush returned NO_RESOURCE, add to pending must succeed */
            flush_req.test      = this;
            flush_req.uct.func  = flush_progress;
            status = uct_ep_pending_add(sender().ep(0), &flush_req.uct, 0);
            if (status == UCS_ERR_BUSY) {
                continue;
            }
            EXPECT_EQ(UCS_OK, status);
        } else if (status == UCS_INPROGRESS) {
        } else {
            UCS_TEST_ABORT("failed to flush ep: " << ucs_status_string(status));
        }
        break;
    }

    /* The timeout prevents the test from hanging */
    wait_for_value(&flush_req.comp.count, 1, true, 60.0);
    EXPECT_EQ(1, flush_req.comp.count);

    while (!reqs.empty()) {
        if (is_flush_cancel()) {
            EXPECT_EQ(2, reqs.back().comp.count);
        } else {
            EXPECT_EQ(1, reqs.back().comp.count);
        }
        reqs.pop_back();
    }

    if (!is_flush_cancel()) {
        wait_am(count);
    }

    if (destroy_ep) {
        sender().destroy_ep(0);
    }

    if (is_flush_cancel()) {
        return;
    }

    uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0);

    recvbuf.pattern_check(SEED3);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_no_comp,
                     !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_no_comp;

    /* Pass 1: local endpoint flush without completion handle */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_put_bcopy(flush);

    /* Pass 2: cancel flush, only where peer failure handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_put_bcopy(flush);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_iface_no_comp,
                     !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
    /* Set the test state explicitly, as the other flush tests do, instead of
     * relying on the values left by init() */
    am_rx_count   = 0;
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;

    test_flush_put_bcopy(&uct_flush_test::flush_iface_no_comp);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_nb,
                     !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_nb;

    /* Pass 1: local non-blocking endpoint flush */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_put_bcopy(flush);

    /* Pass 2: cancel flush, only where peer failure handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_put_bcopy(flush);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_no_comp,
                     !check_caps(UCT_IFACE_FLAG_AM_ZCOPY)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_no_comp;

    /* Pass 1: local flush, endpoint kept alive */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_am_zcopy(flush, false);

    /* Pass 2: cancel flush (endpoint destroyed), only where peer failure
     * handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_am_zcopy(flush, true);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_iface_no_comp,
                     !check_caps(UCT_IFACE_FLAG_AM_ZCOPY)) {
    /* Set the test state explicitly, as the other flush tests do, instead of
     * relying on the values left by init() */
    am_rx_count   = 0;
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;

    test_flush_am_zcopy(&uct_flush_test::flush_iface_no_comp, true);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_nb,
                     !check_caps(UCT_IFACE_FLAG_AM_ZCOPY)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_nb;

    /* Pass 1: local non-blocking flush, endpoint kept alive */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_am_zcopy(flush, false);

    /* Pass 2: cancel flush (endpoint destroyed), only where peer failure
     * handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_am_zcopy(flush, true);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_no_comp,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_no_comp;

    /* Pass 1: local flush, endpoint kept alive */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_am_disconnect(flush, false);

    /* Pass 2: cancel flush (endpoint destroyed), only where peer failure
     * handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_am_disconnect(flush, true);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_iface_no_comp,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
    /* Reset the received-AM counter explicitly, as the other AM tests do,
     * since test_flush_am_disconnect() waits on it */
    am_rx_count   = 0;
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    test_flush_am_disconnect(&uct_flush_test::flush_iface_no_comp, true);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_nb,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_nb;

    /* Pass 1: local non-blocking flush, endpoint kept alive */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_am_disconnect(flush, false);

    /* Pass 2: cancel flush (endpoint destroyed), only where peer failure
     * handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_am_disconnect(flush, true);
}

UCS_TEST_SKIP_COND_P(uct_flush_test, am_pending_flush_nb,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY |
                                 UCT_IFACE_FLAG_PENDING)) {
    const flush_func_t flush = &uct_flush_test::flush_ep_nb;

    /* Pass 1: local flush with pending AMs, endpoint kept alive */
    m_flush_flags = UCT_FLUSH_FLAG_LOCAL;
    am_rx_count   = 0;
    test_flush_am_pending(flush, false);

    /* Pass 2: cancel flush (endpoint destroyed), only where peer failure
     * handling is supported */
    if (!is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) {
        return;
    }
    m_flush_flags |= UCT_FLUSH_FLAG_CANCEL;
    am_rx_count    = 0;
    test_flush_am_pending(flush, true);
}

/* Instantiate the parameterized uct_flush_test cases for the transports
 * selected by the UCT_INSTANTIATE_TEST_CASE macro (defined in the project's
 * test headers) */
UCT_INSTANTIATE_TEST_CASE(uct_flush_test)