/** * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #include "uct_p2p_test.h" extern "C" { #include } class uct_flush_test : public uct_test { public: static const uint64_t SEED1 = 0x1111111111111111lu; static const uint64_t SEED2 = 0x2222222222222222lu; static const uint64_t SEED3 = 0x3333333333333333lu; static const int AM_ID = 1; static const int AM_ID_CANCEL = 2; typedef void (uct_flush_test::* flush_func_t)(); struct test_req_t { uct_pending_req_t uct; uct_completion_t comp; mapped_buffer *sendbuf; uct_flush_test *test; }; void init() { uct_test::init(); entity *m_sender = uct_test::create_entity(0); m_entities.push_back(m_sender); check_skip_test(); if (UCT_DEVICE_TYPE_SELF == GetParam()->dev_type) { m_sender->connect(0, *m_sender, 0); } else { entity *m_receiver = uct_test::create_entity(0); m_entities.push_back(m_receiver); m_sender->connect(0, *m_receiver, 0); } am_rx_count = 0; m_flush_flags = 0; } static size_t pack_cb(void *dest, void *arg) { const mapped_buffer *sendbuf = (const mapped_buffer *)arg; memcpy(dest, sendbuf->ptr(), sendbuf->length()); return sendbuf->length(); } void blocking_put_bcopy(const mapped_buffer &sendbuf, const mapped_buffer &recvbuf) { ssize_t status; for (;;) { status = uct_ep_put_bcopy(sender().ep(0), pack_cb, (void*)&sendbuf, recvbuf.addr(), recvbuf.rkey()); if (status >= 0) { return; } else if (status == UCS_ERR_NO_RESOURCE) { progress(); continue; } else { ASSERT_UCS_OK((ucs_status_t)status); } } } void blocking_am_bcopy(const mapped_buffer &sendbuf) { ssize_t status; for (;;) { status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb, (void*)&sendbuf, 0); if (status >= 0) { return; } else if (status == UCS_ERR_NO_RESOURCE) { progress(); continue; } else { ASSERT_UCS_OK((ucs_status_t)status); } } } static ucs_status_t am_handler(void *arg, void *data, size_t length, unsigned flags) { if (arg == NULL) { /* This is not completely canceled message, drop it */ return UCS_OK; } const mapped_buffer *recvbuf = (const mapped_buffer *)arg; memcpy(recvbuf->ptr(), data, ucs_min(length, recvbuf->length())); ucs_atomic_add32(&am_rx_count, 1); return UCS_OK; } ucs_status_t am_send_pending(test_req_t *am_req) { ssize_t status; status = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb, (void*)am_req->sendbuf, 0); if (status >= 0) { --am_req->comp.count; return UCS_OK; } else { return (ucs_status_t)status; } } static ucs_status_t am_progress(uct_pending_req_t *req) { test_req_t *am_req = ucs_container_of(req, test_req_t, uct); return am_req->test->am_send_pending(am_req); } static ucs_status_t flush_progress(uct_pending_req_t *req) { test_req_t *flush_req = ucs_container_of(req, test_req_t, uct); ucs_status_t status; status = uct_ep_flush(flush_req->test->sender().ep(0), 0, &flush_req->comp); if (status == UCS_OK) { --flush_req->comp.count; return UCS_OK; } else if (status == UCS_INPROGRESS) { return UCS_OK; } else if (status == UCS_ERR_NO_RESOURCE) { return UCS_ERR_NO_RESOURCE; } else { UCS_TEST_ABORT("Error: " << ucs_status_string(status)); } } void test_flush_put_bcopy(flush_func_t flush) { const size_t length = 8; mapped_buffer sendbuf(length, SEED1, sender()); mapped_buffer recvbuf(length, SEED2, receiver()); sendbuf.pattern_fill(SEED3); blocking_put_bcopy(sendbuf, recvbuf); (this->*flush)(); if (is_flush_cancel()) { return; } recvbuf.pattern_check(SEED3); } void wait_am(unsigned count) { while (am_rx_count < count) { progress(); sched_yield(); } } void test_flush_am_zcopy(flush_func_t flush, bool destroy_ep) { const size_t length = 8; if (is_flush_cancel()) { ASSERT_TRUE(destroy_ep); } mapped_buffer sendbuf(length, SEED1, sender()); mapped_buffer recvbuf(length, SEED2, receiver()); sendbuf.pattern_fill(SEED3); uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler, is_flush_cancel() ? NULL : &recvbuf, UCT_CB_FLAG_ASYNC); uct_completion_t zcomp; zcomp.count = 2; zcomp.func = NULL; ucs_status_t status; UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(), sendbuf.memh(), sender().iface_attr().cap.am.max_iov); do { status = uct_ep_am_zcopy(sender().ep(0), get_am_id(), NULL, 0, iov, iovcnt, 0, &zcomp); progress(); } while (status == UCS_ERR_NO_RESOURCE); ASSERT_UCS_OK_OR_INPROGRESS(status); if (status == UCS_OK) { --zcomp.count; } (this->*flush)(); EXPECT_EQ(1, zcomp.count); /* Zero copy op should be already completed since flush returned */ if (destroy_ep) { sender().destroy_ep(0); } if (is_flush_cancel()) { return; } wait_am(1); uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0); recvbuf.pattern_check(SEED3); } void test_flush_am_disconnect(flush_func_t flush, bool destroy_ep) { const size_t length = 8; if (is_flush_cancel()) { ASSERT_TRUE(destroy_ep); } mapped_buffer sendbuf(length, SEED1, sender()); mapped_buffer recvbuf(length, SEED2, receiver()); sendbuf.pattern_fill(SEED3); uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler, is_flush_cancel() ? NULL : &recvbuf, UCT_CB_FLAG_ASYNC); blocking_am_bcopy(sendbuf); (this->*flush)(); if (destroy_ep) { sender().destroy_ep(0); } if (is_flush_cancel()) { return; } wait_am(1); uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0); recvbuf.pattern_check(SEED3); } void flush_ep_no_comp() { ucs_status_t status; do { progress(); status = uct_ep_flush(sender().ep(0), m_flush_flags, NULL); } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS)); ASSERT_UCS_OK(status); } void flush_iface_no_comp() { ucs_status_t status; do { progress(); status = uct_iface_flush(sender().iface(), m_flush_flags, NULL); } while ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS)); ASSERT_UCS_OK(status); } void flush_ep_nb() { uct_completion_t comp; ucs_status_t status; comp.count = 2; comp.func = NULL; do { progress(); status = uct_ep_flush(sender().ep(0), m_flush_flags, &comp); } while (status == UCS_ERR_NO_RESOURCE); ASSERT_UCS_OK_OR_INPROGRESS(status); if (status == UCS_OK) { return; } /* coverity[loop_condition] */ while (comp.count != 1) { progress(); } } void test_flush_am_pending(flush_func_t flush, bool destroy_ep); protected: uct_test::entity& sender() { return **m_entities.begin(); } uct_test::entity& receiver() { return **(m_entities.end() - 1); } bool is_flush_cancel() const { return (m_flush_flags & UCT_FLUSH_FLAG_CANCEL); } uint8_t get_am_id() const { return is_flush_cancel() ? AM_ID_CANCEL : AM_ID; } static uint32_t am_rx_count; unsigned m_flush_flags; }; uint32_t uct_flush_test::am_rx_count = 0; void uct_flush_test::test_flush_am_pending(flush_func_t flush, bool destroy_ep) { if (is_flush_cancel()) { ASSERT_TRUE(destroy_ep); } const size_t length = 8; mapped_buffer sendbuf(length, SEED1, sender()); mapped_buffer recvbuf(length, SEED2, receiver()); sendbuf.pattern_fill(SEED3); uct_iface_set_am_handler(receiver().iface(), get_am_id(), am_handler, is_flush_cancel() ? NULL : &recvbuf, UCT_CB_FLAG_ASYNC); /* Send until resources are exhausted or timeout in 1sec*/ unsigned count = 0; ucs_time_t loop_end_limit = ucs_get_time() + ucs_time_from_sec(1.0); ssize_t packed_len; for (;;) { packed_len = uct_ep_am_bcopy(sender().ep(0), get_am_id(), pack_cb, (void*)&sendbuf, 0); if (packed_len == UCS_ERR_NO_RESOURCE) { break; } if (ucs_get_time() > loop_end_limit) { ++count; break; } if (packed_len >= 0) { ++count; } else { ASSERT_UCS_OK((ucs_status_t)packed_len); } } /* Queue some pending AMs */ ucs_status_t status; std::vector reqs; reqs.resize(10); for (std::vector::iterator it = reqs.begin(); it != reqs.end();) { it->sendbuf = &sendbuf; it->test = this; it->uct.func = am_progress; it->comp.count = 2; it->comp.func = NULL; status = uct_ep_pending_add(sender().ep(0), &it->uct, 0); if (UCS_ERR_BUSY == status) { /* User advised to retry the send. It means no requests added * to the queue */ it = reqs.erase(it); status = UCS_OK; } else { ++count; ++it; } ASSERT_UCS_OK(status); } /* Try to start a flush */ test_req_t flush_req; flush_req.comp.count = 2; flush_req.comp.func = NULL; for (;;) { status = uct_ep_flush(sender().ep(0), m_flush_flags, &flush_req.comp); if (status == UCS_OK) { --flush_req.comp.count; } else if (status == UCS_ERR_NO_RESOURCE) { /* If flush returned NO_RESOURCE, add to pending must succeed */ flush_req.test = this; flush_req.uct.func = flush_progress; status = uct_ep_pending_add(sender().ep(0), &flush_req.uct, 0); if (status == UCS_ERR_BUSY) { continue; } EXPECT_EQ(UCS_OK, status); } else if (status == UCS_INPROGRESS) { } else { UCS_TEST_ABORT("failed to flush ep: " << ucs_status_string(status)); } break; } /* timeout used to prevent test hung */ wait_for_value(&flush_req.comp.count, 1, true, 60.0); EXPECT_EQ(1, flush_req.comp.count); while (!reqs.empty()) { if (is_flush_cancel()) { EXPECT_EQ(2, reqs.back().comp.count); } else { EXPECT_EQ(1, reqs.back().comp.count); } reqs.pop_back(); } if (!is_flush_cancel()) { wait_am(count); } if (destroy_ep) { sender().destroy_ep(0); } if (is_flush_cancel()) { return; } uct_iface_set_am_handler(receiver().iface(), get_am_id(), NULL, NULL, 0); recvbuf.pattern_check(SEED3); } UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_no_comp, !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_put_bcopy(&uct_flush_test::flush_ep_no_comp); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_put_bcopy(&uct_flush_test::flush_ep_no_comp); } } UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_iface_no_comp, !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) { test_flush_put_bcopy(&uct_flush_test::flush_iface_no_comp); } UCS_TEST_SKIP_COND_P(uct_flush_test, put_bcopy_flush_ep_nb, !check_caps(UCT_IFACE_FLAG_PUT_BCOPY)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_put_bcopy(&uct_flush_test::flush_ep_nb); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_put_bcopy(&uct_flush_test::flush_ep_nb); } } UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_no_comp, !check_caps(UCT_IFACE_FLAG_AM_ZCOPY)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_am_zcopy(&uct_flush_test::flush_ep_no_comp, false); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_am_zcopy(&uct_flush_test::flush_ep_no_comp, true); } } UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_iface_no_comp, !check_caps(UCT_IFACE_FLAG_AM_ZCOPY)) { test_flush_am_zcopy(&uct_flush_test::flush_iface_no_comp, true); } UCS_TEST_SKIP_COND_P(uct_flush_test, am_zcopy_flush_ep_nb, !check_caps(UCT_IFACE_FLAG_AM_ZCOPY)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_am_zcopy(&uct_flush_test::flush_ep_nb, false); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_am_zcopy(&uct_flush_test::flush_ep_nb, true); } } UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_no_comp, !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_am_disconnect(&uct_flush_test::flush_ep_no_comp, false); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_am_disconnect(&uct_flush_test::flush_ep_no_comp, true); } } UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_iface_no_comp, !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) { m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_am_disconnect(&uct_flush_test::flush_iface_no_comp, true); } UCS_TEST_SKIP_COND_P(uct_flush_test, am_flush_ep_nb, !check_caps(UCT_IFACE_FLAG_AM_BCOPY)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_am_disconnect(&uct_flush_test::flush_ep_nb, false); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_am_disconnect(&uct_flush_test::flush_ep_nb, true); } } UCS_TEST_SKIP_COND_P(uct_flush_test, am_pending_flush_nb, !check_caps(UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_PENDING)) { am_rx_count = 0; m_flush_flags = UCT_FLUSH_FLAG_LOCAL; test_flush_am_pending(&uct_flush_test::flush_ep_nb, false); if (is_caps_supported(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)) { am_rx_count = 0; m_flush_flags |= UCT_FLUSH_FLAG_CANCEL; test_flush_am_pending(&uct_flush_test::flush_ep_nb, true); } } UCT_INSTANTIATE_TEST_CASE(uct_flush_test)