/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#include "test_ucp_tag.h"
#include "ucp_datatype.h"
extern "C" {
#include <ucp/core/ucp_ep.inl> /* for testing EP RNDV configuration */
#include <ucp/core/ucp_request.h> /* for debug */
#include <ucp/core/ucp_worker.h> /* for testing memory consumption */
}
/**
 * Fixture for UCP peer-failure tests.
 *
 * The sender entity holds two endpoints: one to a "stable" receiver that stays
 * alive for the whole test, and one to a "failing" receiver that is torn down
 * in the middle of the test. Endpoints are created with peer error handling
 * and report failures through err_cb().
 */
class test_ucp_peer_failure : public ucp_test {
public:
    test_ucp_peer_failure();

    /* Produce the tag/rma test variants, each with and without FAIL_IMM */
    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls);

    /* Endpoint parameters with peer error handling mode and err_cb() set */
    ucp_ep_params_t get_ep_params();

protected:
    /* Bit flags stored in the test variant */
    enum {
        TEST_TAG = UCS_BIT(0), /* traffic is sent with the tag API */
        TEST_RMA = UCS_BIT(1), /* traffic is sent with the put API */
        FAIL_IMM = UCS_BIT(2)  /* fail the receiver without running traffic
                                  on the failing pair first */
    };

    /* Endpoint indexes on the sender entity */
    enum {
        STABLE_EP_INDEX,
        FAILING_EP_INDEX
    };

    typedef ucs::handle<ucp_mem_h, ucp_context_h> mem_handle_t;

    /* Environment setup and error callback */
    void set_timeouts();
    static void err_cb(void *arg, ucp_ep_h ep, ucs_status_t status);

    /* Accessors for the two sender endpoints and the two receiver entities */
    ucp_ep_h stable_sender();
    ucp_ep_h failing_sender();
    entity& stable_receiver();
    entity& failing_receiver();

    /* Traffic helpers */
    void *send_nb(ucp_ep_h ep, ucp_rkey_h rkey);
    void *recv_nb(entity& e);
    void fail_receiver();
    void smoke_test(bool stable_pair);

    /* Memory registration and remote key helpers (RMA variants) */
    static void unmap_memh(ucp_mem_h memh, ucp_context_h context);
    void get_rkey(ucp_ep_h ep, entity& dst, mem_handle_t& memh,
                  ucs::handle<ucp_rkey_h>& rkey);
    void set_rkeys();

    /* Completion callbacks; both are no-ops */
    static void send_cb(void *request, ucs_status_t status);
    static void recv_cb(void *request, ucs_status_t status,
                        ucp_tag_recv_info_t *info);

    virtual void cleanup();

    /* Common test scenario, shared by the test cases below */
    void do_test(size_t msg_size, int pre_msg_count, bool force_close,
                 bool request_must_fail);

    size_t                             m_err_count;    /* number of err_cb() calls */
    ucs_status_t                       m_err_status;   /* last status passed to err_cb() */
    std::string                        m_sbuf;         /* send buffer */
    std::string                        m_rbuf;         /* receive buffer */
    mem_handle_t                       m_stable_memh;  /* m_rbuf mapped on stable receiver */
    mem_handle_t                       m_failing_memh; /* m_rbuf mapped on failing receiver */
    ucs::handle<ucp_rkey_h>            m_stable_rkey;  /* rkey unpacked on stable EP */
    ucs::handle<ucp_rkey_h>            m_failing_rkey; /* rkey unpacked on failing EP */
    ucs::ptr_vector<ucs::scoped_setenv> m_env;         /* environment overrides */
};
/* Instantiate the parameterized test case for the test_ucp_peer_failure fixture.
 * NOTE(review): the set of transports it is instantiated for is decided by the
 * macro, which is defined outside this file - confirm there. */
UCP_INSTANTIATE_TEST_CASE(test_ucp_peer_failure)
/**
 * Create the fixture with no error recorded yet, and install the shortened
 * transport timeouts.
 */
test_ucp_peer_failure::test_ucp_peer_failure() :
    m_err_count(0),
    m_err_status(UCS_OK)
{
    /* NOTE(review): m_sbuf is still default-constructed (empty) at this point
     * and is only resized later in do_test() - confirm this call has the
     * intended effect */
    ucs::fill_random(m_sbuf);

    set_timeouts();
}
std::vector<ucp_test_param>
test_ucp_peer_failure::enum_test_params(const ucp_params_t& ctx_params,
const std::string& name,
const std::string& test_case_name,
const std::string& tls)
{
std::vector<ucp_test_param> result;
ucp_params_t params = ucp_test::get_ctx_params();
params.field_mask |= UCP_PARAM_FIELD_FEATURES;
params.features = UCP_FEATURE_TAG;
generate_test_params_variant(params, name, test_case_name + "/tag", tls,
TEST_TAG, result);
generate_test_params_variant(params, name, test_case_name + "/tag_fail_imm",
tls, TEST_TAG | FAIL_IMM, result);
params.features = UCP_FEATURE_RMA;
generate_test_params_variant(params, name, test_case_name + "/rma", tls,
TEST_RMA, result);
generate_test_params_variant(params, name, test_case_name + "/rma_fail_imm",
tls, TEST_RMA | FAIL_IMM, result);
return result;
}
/**
 * Build the endpoint parameters used by every endpoint in this fixture.
 *
 * @return Zero-initialized parameters with the peer error handling mode
 *         selected and err_cb() installed as the error handler; the fixture
 *         instance is passed as the handler argument.
 */
ucp_ep_params_t test_ucp_peer_failure::get_ep_params() {
    ucp_ep_params_t params;

    /* Clear all fields; only those named in field_mask are filled below */
    memset(&params, 0, sizeof(params));
    params.field_mask      = UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
                             UCP_EP_PARAM_FIELD_ERR_HANDLER;
    params.err_mode        = UCP_ERR_HANDLING_MODE_PEER;
    params.err_handler.cb  = err_cb;
    /* err_cb() casts this argument back to the fixture */
    params.err_handler.arg = reinterpret_cast<void*>(this);
    return params;
}
/**
 * Override the RC transport timeout settings through the environment, so that
 * a peer failure is detected quickly and the tests run faster. The overrides
 * are kept in m_env.
 */
void test_ucp_peer_failure::set_timeouts() {
    /* { variable name, value } pairs, applied in this order */
    static const char *const rc_settings[][2] = {
        {"UCX_RC_TIMEOUT",     "10ms"},
        {"UCX_RC_RNR_TIMEOUT", "10ms"},
        {"UCX_RC_RETRY_COUNT", "2"}
    };
    const size_t num_settings = sizeof(rc_settings) / sizeof(rc_settings[0]);

    for (size_t i = 0; i < num_settings; ++i) {
        m_env.push_back(new ucs::scoped_setenv(rc_settings[i][0],
                                               rc_settings[i][1]));
    }
}
void test_ucp_peer_failure::err_cb(void *arg, ucp_ep_h ep, ucs_status_t status) {
test_ucp_peer_failure *self = reinterpret_cast<test_ucp_peer_failure*>(arg);
EXPECT_EQ(UCS_ERR_ENDPOINT_TIMEOUT, status);
self->m_err_status = status;
++self->m_err_count;
}
/**
 * @return The sender endpoint at index STABLE_EP_INDEX (worker 0).
 */
ucp_ep_h test_ucp_peer_failure::stable_sender() {
    const int worker_index = 0;

    return sender().ep(worker_index, STABLE_EP_INDEX);
}
/**
 * @return The sender endpoint at index FAILING_EP_INDEX (worker 0).
 */
ucp_ep_h test_ucp_peer_failure::failing_sender() {
    const int worker_index = 0;

    return sender().ep(worker_index, FAILING_EP_INDEX);
}
/**
 * @return The second-to-last entity in m_entities, which serves as the
 *         receiver that stays alive.
 */
ucp_test::entity& test_ucp_peer_failure::stable_receiver() {
    const size_t index = m_entities.size() - 2;

    return m_entities.at(index);
}
/**
 * @return The last entity in m_entities, which serves as the receiver that is
 *         torn down by fail_receiver().
 */
ucp_test::entity& test_ucp_peer_failure::failing_receiver() {
    const size_t index = m_entities.size() - 1;

    return m_entities.at(index);
}
/**
 * Start a non-blocking send of m_sbuf, using the API selected by the test
 * variant: a tag send (tag 0) for TEST_TAG, or a put to the address of m_rbuf
 * for TEST_RMA.
 *
 * @param ep    Endpoint to send on.
 * @param rkey  Remote key for the put; not used by the tag variant.
 *
 * @return The value returned by ucp_tag_send_nb() / ucp_put_nb().
 */
void *test_ucp_peer_failure::send_nb(ucp_ep_h ep, ucp_rkey_h rkey) {
    if (GetParam().variant & TEST_TAG) {
        return ucp_tag_send_nb(ep, &m_sbuf[0], m_sbuf.size(), DATATYPE, 0,
                               send_cb);
    }

    if (GetParam().variant & TEST_RMA) {
        /* The put targets the receive buffer of this same fixture */
        const uintptr_t remote_addr = reinterpret_cast<uintptr_t>(&m_rbuf[0]);

        return ucp_put_nb(ep, &m_sbuf[0], m_sbuf.size(), remote_addr, rkey,
                          send_cb);
    }

    ucs_fatal("invalid test case");
}
/**
 * Post the receive side of a transfer, if the test variant needs one.
 *
 * @param e  Entity whose worker the receive is posted on.
 *
 * @return For TEST_TAG, the value returned by ucp_tag_recv_nb() (tag 0, tag
 *         mask 0, into m_rbuf). For TEST_RMA, NULL, since a put needs no
 *         receive.
 */
void *test_ucp_peer_failure::recv_nb(entity& e) {
    /* The receive buffer must be able to hold the whole send buffer */
    ucs_assert(m_rbuf.size() >= m_sbuf.size());

    if (GetParam().variant & TEST_TAG) {
        return ucp_tag_recv_nb(e.worker(), &m_rbuf[0], m_rbuf.size(),
                               DATATYPE, 0, 0, recv_cb);
    }

    if (GetParam().variant & TEST_RMA) {
        return NULL;
    }

    ucs_fatal("invalid test case");
}
/**
 * Simulate a peer failure: flush the failing receiver's worker, drop the
 * memory handle registered on it, and clean the entity up.
 */
void test_ucp_peer_failure::fail_receiver() {
    /*
     * TODO: need to handle a non-empty TX window in the UD EP destructor,
     *       see the debug message (ud_ep.c:220):
     *
     *       ucs_debug("ep=%p id=%d conn_id=%d has %d unacked packets",
     *                 self, self->ep_id, self->conn_id,
     *                 (int)ucs_queue_length(&self->tx.window));
     *
     * TODO: use force-close to close connections
     */
    flush_worker(failing_receiver());

    /* Release the memory handle before the entity is cleaned up */
    m_failing_memh.reset();

    failing_receiver().cleanup();
}
/**
 * Run one transfer on the selected pair and check that the receive buffer
 * matches the send buffer.
 *
 * @param stable_pair  true to use the stable sender/receiver pair, false to
 *                     use the failing pair.
 */
void test_ucp_peer_failure::smoke_test(bool stable_pair) {
    void *recv_req;
    void *send_req;

    /* Post the receive first, then the matching send */
    if (stable_pair) {
        recv_req = recv_nb(stable_receiver());
        send_req = send_nb(stable_sender(), m_stable_rkey);
    } else {
        recv_req = recv_nb(failing_receiver());
        send_req = send_nb(failing_sender(), m_failing_rkey);
    }

    wait(send_req);
    wait(recv_req);

    EXPECT_EQ(m_sbuf, m_rbuf);
}
/**
 * Deleter for mem_handle_t: unmap a memory handle, and only warn if that
 * fails.
 *
 * @param memh     Memory handle to unmap.
 * @param context  Context the handle was mapped on.
 */
void test_ucp_peer_failure::unmap_memh(ucp_mem_h memh, ucp_context_h context)
{
    const ucs_status_t unmap_status = ucp_mem_unmap(context, memh);

    if (unmap_status == UCS_OK) {
        return;
    }

    ucs_warn("failed to unmap memory: %s", ucs_status_string(unmap_status));
}
/**
 * Register m_rbuf on the context of @a dst and unpack its remote key on
 * @a ep.
 *
 * @param [in]  ep    Endpoint on which the remote key is unpacked.
 * @param [in]  dst   Entity on whose context m_rbuf is mapped.
 * @param [out] memh  Set to the new memory handle; it is unmapped through
 *                    unmap_memh() when the handle is reset.
 * @param [out] rkey  Set to the unpacked remote key; it is destroyed through
 *                    ucp_rkey_destroy() when the handle is reset.
 */
void test_ucp_peer_failure::get_rkey(ucp_ep_h ep, entity& dst, mem_handle_t& memh,
                                     ucs::handle<ucp_rkey_h>& rkey) {
    ucp_mem_map_params_t params;

    /* Clear all fields; only those named in field_mask are filled below */
    memset(&params, 0, sizeof(params));
    params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                        UCP_MEM_MAP_PARAM_FIELD_LENGTH;
    params.address    = &m_rbuf[0];
    params.length     = m_rbuf.size();

    ucp_mem_h ucp_memh;
    ucs_status_t status = ucp_mem_map(dst.ucph(), &params, &ucp_memh);
    ASSERT_UCS_OK(status);
    memh.reset(ucp_memh, unmap_memh, dst.ucph());

    void *rkey_buffer;
    size_t rkey_buffer_size;
    status = ucp_rkey_pack(dst.ucph(), memh, &rkey_buffer, &rkey_buffer_size);
    ASSERT_UCS_OK(status);

    ucp_rkey_h ucp_rkey;
    status = ucp_ep_rkey_unpack(ep, rkey_buffer, &ucp_rkey);

    /* The packed buffer is no longer needed once unpack has returned. Release
     * it before checking the status, so that it is not leaked if the
     * assertion below aborts the test. */
    ucp_rkey_buffer_release(rkey_buffer);

    ASSERT_UCS_OK(status);
    rkey.reset(ucp_rkey, ucp_rkey_destroy);
}
/**
 * For the RMA variants, obtain the memory handles and remote keys for both
 * pairs (failing pair first, then the stable pair). Does nothing for the
 * other variants.
 */
void test_ucp_peer_failure::set_rkeys() {
    if (!(GetParam().variant & TEST_RMA)) {
        /* Only put operations need remote keys */
        return;
    }

    get_rkey(failing_sender(), failing_receiver(), m_failing_memh,
             m_failing_rkey);
    get_rkey(stable_sender(), stable_receiver(), m_stable_memh,
             m_stable_rkey);
}
/**
 * Send completion callback passed to the send calls in send_nb().
 * Intentionally empty: the tests inspect the request status directly.
 */
void test_ucp_peer_failure::send_cb(void * /* request */,
                                    ucs_status_t /* status */)
{
    /* nothing to do */
}
/**
 * Receive completion callback passed to ucp_tag_recv_nb() in recv_nb().
 * Intentionally empty.
 */
void test_ucp_peer_failure::recv_cb(void * /* request */,
                                    ucs_status_t /* status */,
                                    ucp_tag_recv_info_t * /* info */)
{
    /* nothing to do */
}
/**
 * Release the fixture's remote keys and memory handles, then run the base
 * class cleanup.
 */
void test_ucp_peer_failure::cleanup() {
    /* Remote keys go first, then the memory handles they refer to */
    m_failing_rkey.reset();
    m_stable_rkey.reset();

    m_failing_memh.reset();
    m_stable_memh.reset();

    /* Base class cleanup runs last, after the fixture's own handles are gone */
    ucp_test::cleanup();
}
/**
 * Common peer-failure scenario.
 *
 * Connects the sender to a stable and a failing receiver, optionally queues
 * sends on the failing pair, tears the failing receiver down, and waits for
 * the error callback. It then checks that the in-flight request has
 * completed, that further sends on the failed endpoint are rejected, and that
 * the stable pair still works.
 *
 * @param msg_size           Size that both m_sbuf and m_rbuf are resized to.
 * @param pre_msg_count      Number of sends posted on the failing pair before
 *                           the receiver is failed.
 * @param force_close        If true, the failed endpoint is revoked from the
 *                           sender and closed with UCP_EP_CLOSE_MODE_FORCE.
 * @param request_must_fail  If true, a send request posted after the receiver
 *                           was failed must complete with the status reported
 *                           to err_cb(); otherwise it may also complete with
 *                           UCS_OK.
 */
void test_ucp_peer_failure::do_test(size_t msg_size, int pre_msg_count,
bool force_close, bool request_must_fail)
{
/* NOTE(review): presumably skips the test on loopback configurations - the
 * helper is defined outside this file, confirm there */
skip_loopback();
m_sbuf.resize(msg_size);
m_rbuf.resize(msg_size);
/* connect 2 ep's from sender() to 2 receiver entities */
create_entity();
sender().connect(&stable_receiver(), get_ep_params(), STABLE_EP_INDEX);
sender().connect(&failing_receiver(), get_ep_params(), FAILING_EP_INDEX);
/* Remote keys are only obtained for the RMA variants (see set_rkeys()) */
set_rkeys();
/* Since we don't want to test peer failure on a stable pair
 * and don't expect EP timeout error on those EPs,
 * run traffic on a stable pair to connect it */
smoke_test(true);
if (!(GetParam().variant & FAIL_IMM)) {
/* if not fail immediately, run traffic on failing pair to connect it */
smoke_test(false);
}
/* put some sends on the failing pair */
std::vector<void*> sreqs_pre;
for (int i = 0; i < pre_msg_count; ++i) {
progress();
void *req = send_nb(failing_sender(), m_failing_rkey);
ASSERT_FALSE(UCS_PTR_IS_ERR(req));
/* Only sends which returned a request pointer need to be released later */
if (UCS_PTR_IS_PTR(req)) {
sreqs_pre.push_back(req);
}
}
/* No error may have been reported while the receiver is still alive */
EXPECT_EQ(UCS_OK, m_err_status);
/* Since UCT/UD EP has a SW implementation of reliability on which peer
 * failure mechanism is based, we should set small UCT/UD EP timeout
 * for UCT/UD EPs for sender's UCP EP to reduce testing time */
double prev_ib_ud_timeout = sender().set_ib_ud_timeout(3.);
{
/* NOTE(review): wrap_errors_logger presumably keeps the expected error
 * messages from failing the test - confirm against its definition */
scoped_log_handler slh(wrap_errors_logger);
fail_receiver();
void *sreq = send_nb(failing_sender(), m_failing_rkey);
/* Progress until err_cb() has been called at least once */
while (!m_err_count) {
progress();
}
EXPECT_NE(UCS_OK, m_err_status);
if (UCS_PTR_IS_PTR(sreq)) {
/* The request may either succeed or fail, even though the data is
 * not delivered - depends on when the error is detected on sender
 * side and if zcopy/bcopy protocol is used. In any case, the
 * request must complete, and all resources have to be released.
 */
ucs_status_t status = ucp_request_check_status(sreq);
EXPECT_NE(UCS_INPROGRESS, status);
if (request_must_fail) {
EXPECT_EQ(m_err_status, status);
} else {
EXPECT_TRUE((m_err_status == status) || (UCS_OK == status));
}
ucp_request_release(sreq);
}
/* Additional sends must fail: expect an immediate error status equal to
 * the one reported to err_cb(), not a request pointer */
void *sreq2 = send_nb(failing_sender(), m_failing_rkey);
EXPECT_FALSE(UCS_PTR_IS_PTR(sreq2));
EXPECT_EQ(m_err_status, UCS_PTR_STATUS(sreq2));
if (force_close) {
/* Compare the number of allocated endpoints on the sender's worker
 * before and after the forced close */
unsigned allocd_eps_before =
ucs_strided_alloc_inuse_count(&sender().worker()->ep_alloc);
ucp_ep_h ep = sender().revoke_ep(0, FAILING_EP_INDEX);
/* Destroy the rkey unpacked on this endpoint before closing it */
m_failing_rkey.reset();
void *creq = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FORCE);
wait(creq);
unsigned allocd_eps_after =
ucs_strided_alloc_inuse_count(&sender().worker()->ep_alloc);
/* The count is only required to drop when traffic was run on the failing
 * pair beforehand (non-FAIL_IMM variants) */
if (!(GetParam().variant & FAIL_IMM)) {
EXPECT_LT(allocd_eps_after, allocd_eps_before);
}
}
/* release requests: every pre-posted send must have completed by now */
while (!sreqs_pre.empty()) {
void *req = sreqs_pre.back();
sreqs_pre.pop_back();
EXPECT_NE(UCS_INPROGRESS, ucp_request_test(req, NULL));
ucp_request_release(req);
}
}
/* Since we won't test peer failure anymore, reset UCT/UD EP timeout to the
 * default value to avoid possible UD EP timeout errors under high load */
sender().set_ib_ud_timeout(prev_ib_ud_timeout);
/* Check workability of stable pair */
smoke_test(true);
/* Check that TX polling is working well */
while (sender().progress());
/* Destroy rkeys before destroying the worker (which also destroys the
 * endpoints) */
m_failing_rkey.reset();
m_stable_rkey.reset();
/* When all requests on sender are done we need to prevent LOCAL_FLUSH
 * in test teardown. Receiver is killed and doesn't respond on FC requests
 */
sender().destroy_worker();
}
/* Basic scenario: 1KB message, nothing queued before the failure, no forced
 * close, and the in-flight request may either succeed or fail */
UCS_TEST_P(test_ucp_peer_failure, basic) {
    const size_t msg_size          = UCS_KBYTE;
    const int    pre_msg_count     = 0;
    const bool   force_close       = false;
    const bool   request_must_fail = false;

    do_test(msg_size, pre_msg_count, force_close, request_must_fail);
}
/* An endpoint created with the parameters from get_ep_params() must have all
 * of its tag rendezvous thresholds set to the maximum size_t value */
UCS_TEST_P(test_ucp_peer_failure, rndv_disable) {
    const size_t thresh_max = std::numeric_limits<size_t>::max();

    sender().connect(&receiver(), get_ep_params(), STABLE_EP_INDEX);

    /* Thresholds used by regular tag sends */
    EXPECT_EQ(thresh_max,
              ucp_ep_config(sender().ep())->tag.rndv.am_thresh);
    EXPECT_EQ(thresh_max,
              ucp_ep_config(sender().ep())->tag.rndv.rma_thresh);

    /* Thresholds stored under rndv_send_nbr */
    EXPECT_EQ(thresh_max,
              ucp_ep_config(sender().ep())->tag.rndv_send_nbr.am_thresh);
    EXPECT_EQ(thresh_max,
              ucp_ep_config(sender().ep())->tag.rndv_send_nbr.rma_thresh);
}
/* Same as the basic scenario, but the in-flight request is required to fail.
 * NOTE(review): "ZCOPY_THRESH=1023" presumably makes the 1KB message go
 * through the zcopy protocol - confirm against the UCX configuration */
UCS_TEST_P(test_ucp_peer_failure, zcopy, "ZCOPY_THRESH=1023") {
    const size_t msg_size          = UCS_KBYTE;
    const int    pre_msg_count     = 0;
    const bool   force_close       = false;
    const bool   request_must_fail = true;

    do_test(msg_size, pre_msg_count, force_close, request_must_fail);
}
/* Basic scenario run with a reduced segment size.
 * NOTE(review): "SEG_SIZE?=512" presumably splits the 1KB message into
 * several bcopy fragments, and "RC_TM_ENABLE?=n" presumably turns off RC tag
 * matching offload - confirm against the UCX configuration */
UCS_TEST_P(test_ucp_peer_failure, bcopy_multi, "SEG_SIZE?=512", "RC_TM_ENABLE?=n") {
    const size_t msg_size          = UCS_KBYTE;
    const int    pre_msg_count     = 0;
    const bool   force_close       = false;
    const bool   request_must_fail = false;

    do_test(msg_size, pre_msg_count, force_close, request_must_fail);
}
/* Queue 1000 sends of 16000 bytes on the failing pair, fail the receiver, and
 * then force-close the failed endpoint.
 * NOTE(review): "RC_FC_ENABLE?=n" presumably disables RC flow control -
 * confirm against the UCX configuration */
UCS_TEST_P(test_ucp_peer_failure, force_close, "RC_FC_ENABLE?=n") {
    const size_t msg_size          = 16000;
    const int    pre_msg_count     = 1000;
    const bool   force_close       = true;
    const bool   request_must_fail = false;

    do_test(msg_size, pre_msg_count, force_close, request_must_fail);
}
/* Synchronous tag sends must be rejected with UCS_ERR_UNSUPPORTED on an
 * endpoint created with the parameters from get_ep_params(). Skipped for
 * variants without TEST_TAG. */
UCS_TEST_SKIP_COND_P(test_ucp_peer_failure, disable_sync_send,
                     !(GetParam().variant & TEST_TAG)) {
    const size_t      largest_size = UCS_MBYTE;
    const uint64_t    tag          = 0x111337;
    std::vector<char> payload(largest_size, 0);

    sender().connect(&receiver(), get_ep_params());

    /* Make sure API is disabled for any size and data type:
     * walk over power-of-two sizes from 1 byte up to largest_size */
    for (size_t length = 1; length <= largest_size; length *= 2) {
        /* Contiguous datatype */
        void *contig_req = ucp_tag_send_sync_nb(sender().ep(), payload.data(),
                                                length, DATATYPE, tag, NULL);
        EXPECT_FALSE(UCS_PTR_IS_PTR(contig_req));
        EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(contig_req));

        /* IOV datatype, describing the same buffer */
        ucp::data_type_desc_t dt_desc(DATATYPE_IOV, payload.data(), length);
        void *iov_req = ucp_tag_send_sync_nb(sender().ep(), dt_desc.buf(),
                                             dt_desc.count(), dt_desc.dt(),
                                             tag, NULL);
        EXPECT_FALSE(UCS_PTR_IS_PTR(iov_req));
        EXPECT_EQ(UCS_ERR_UNSUPPORTED, UCS_PTR_STATUS(iov_req));
    }
}