/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
 * Copyright (C) UT-Battelle, LLC. 2016. ALL RIGHTS RESERVED.
 * Copyright (C) ARM Ltd. 2016. All rights reserved.
 * See file LICENSE for terms.
 */

#include "test_rc.h"


#define UCT_INSTANTIATE_RC_TEST_CASE(_test_case) \
    _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_verbs) \
    _UCT_INSTANTIATE_TEST_CASE(_test_case, rc_mlx5)


void test_rc::init()
{
    uct_test::init();

    m_e1 = uct_test::create_entity(0);
    m_entities.push_back(m_e1);
    check_skip_test();

    m_e2 = uct_test::create_entity(0);
    m_entities.push_back(m_e2);

    connect();
}

void test_rc::connect()
{
    m_e1->connect(0, *m_e2, 0);
    m_e2->connect(0, *m_e1, 0);

    uct_iface_set_am_handler(m_e1->iface(), 0, am_dummy_handler, NULL, 0);
    uct_iface_set_am_handler(m_e2->iface(), 0, am_dummy_handler, NULL, 0);
}

// Check that the iface tx ops buffer and the flush completion memory pool are
// moderated properly when we have communication ops + lots of flushes
void test_rc::test_iface_ops(int cq_len)
{
    entity *e = uct_test::create_entity(0);
    m_entities.push_back(e);
    e->connect(0, *m_e2, 0);

    mapped_buffer sendbuf(1024, 0ul, *e);
    mapped_buffer recvbuf(1024, 0ul, *m_e2);
    uct_completion_t comp;
    comp.count = cq_len * 512; // some big value to avoid func invocation
    comp.func  = NULL;

    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, sendbuf.ptr(), sendbuf.length(),
                            sendbuf.memh(),
                            m_e1->iface_attr().cap.put.max_iov);
    // For _x transports several CQEs can be consumed per WQE, so post fewer
    // put zcopy ops, so that the flush is successful (otherwise flush would
    // return NO_RESOURCE and no completion would be added for it).
    for (int i = 0; i < cq_len / 5; i++) {
        ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_put_zcopy(e->ep(0), iov, iovcnt,
                                                     recvbuf.addr(),
                                                     recvbuf.rkey(), &comp));

        // Create some stress on the iface (flush mpool):
        // post 10 flushes per every put.
        for (int j = 0; j < 10; j++) {
            ASSERT_UCS_OK_OR_INPROGRESS(uct_ep_flush(e->ep(0), 0, &comp));
        }
    }

    flush();
}
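/* Note on the completion object in test_iface_ops() above: uct_completion_t
 * counts down by one for every completed operation and invokes its callback
 * once the counter reaches zero. The test sets the counter far above the
 * number of posted operations precisely so that the callback never fires and
 * only the tx ops / flush mpool accounting is exercised. A regular caller
 * would instead do something like this (a sketch; my_completion_cb is a
 * hypothetical user callback, not part of this test):
 *
 *     uct_completion_t comp;
 *     comp.count = num_outstanding_ops; // decremented per completed op
 *     comp.func  = my_completion_cb;    // invoked when count reaches zero
 */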
UCS_TEST_SKIP_COND_P(test_rc, stress_iface_ops,
                     !check_caps(UCT_IFACE_FLAG_PUT_ZCOPY)) {
    int cq_len = 16;

    if (UCS_OK != uct_config_modify(m_iface_config, "RC_TX_CQ_LEN",
                                    ucs::to_string(cq_len).c_str())) {
        UCS_TEST_ABORT("Error: cannot modify RC_TX_CQ_LEN");
    }

    test_iface_ops(cq_len);
}

UCS_TEST_P(test_rc, tx_cq_moderation) {
    unsigned tx_mod  = ucs_min(rc_iface(m_e1)->config.tx_moderation / 4, 8);
    int16_t init_rsc = rc_ep(m_e1)->txqp.available;

    send_am_messages(m_e1, tx_mod, UCS_OK);

    int16_t rsc = rc_ep(m_e1)->txqp.available;

    EXPECT_LE(rsc, init_rsc);

    short_progress_loop(100);

    EXPECT_EQ(rsc, rc_ep(m_e1)->txqp.available);

    flush();

    EXPECT_EQ(init_rsc, rc_ep(m_e1)->txqp.available);
}

UCT_INSTANTIATE_RC_TEST_CASE(test_rc)


class test_rc_max_wr : public test_rc {
protected:
    virtual void init() {
        ucs_status_t status1, status2;
        status1 = uct_config_modify(m_iface_config, "TX_MAX_WR", "32");
        status2 = uct_config_modify(m_iface_config, "RC_TX_MAX_BB", "32");
        if (status1 != UCS_OK && status2 != UCS_OK) {
            UCS_TEST_ABORT("Error: cannot set rc max wr/bb");
        }
        test_rc::init();
    }
};

/* Check that max_wr stops the sender from posting more work requests */
UCS_TEST_P(test_rc_max_wr, send_limit)
{
    /* first 32 messages should be OK */
    send_am_messages(m_e1, 32, UCS_OK);

    /* the next message should fail */
    send_am_messages(m_e1, 1, UCS_ERR_NO_RESOURCE);

    progress_loop();
    send_am_messages(m_e1, 1, UCS_OK);
}

UCT_INSTANTIATE_RC_TEST_CASE(test_rc_max_wr)


uint32_t test_rc_flow_control::m_am_rx_count = 0;

void test_rc_flow_control::init()
{
    /* For correct testing, FC needs to be initialized during interface creation */
    if (UCS_OK != uct_config_modify(m_iface_config, "RC_FC_ENABLE", "y")) {
        UCS_TEST_ABORT("Error: cannot enable flow control");
    }
    test_rc::init();

    ucs_assert(rc_iface(m_e1)->config.fc_enabled);
    ucs_assert(rc_iface(m_e2)->config.fc_enabled);

    uct_iface_set_am_handler(m_e1->iface(), FLUSH_AM_ID, am_handler, NULL, 0);
    uct_iface_set_am_handler(m_e2->iface(), FLUSH_AM_ID, am_handler, NULL, 0);
}

void test_rc_flow_control::cleanup()
{
    /* Restore FC state to enabled, so iface cleanup will destroy the grant mpool */
    rc_iface(m_e1)->config.fc_enabled = 1;
    rc_iface(m_e2)->config.fc_enabled = 1;
    test_rc::cleanup();
}

void test_rc_flow_control::send_am_and_flush(entity *e, int num_msg)
{
    m_am_rx_count = 0;

    send_am_messages(e, num_msg - 1, UCS_OK);
    send_am_messages(e, 1, UCS_OK, FLUSH_AM_ID); /* send the last msg with FLUSH id */
    wait_for_flag(&m_am_rx_count);
    EXPECT_EQ(m_am_rx_count, 1ul);
}

void test_rc_flow_control::validate_grant(entity *e)
{
    wait_for_flag(&get_fc_ptr(e)->fc_wnd);
    EXPECT_GT(get_fc_ptr(e)->fc_wnd, 0);
}

/* Check that the FC window works as expected:
 * - If FC is enabled, only 'wnd' messages can be sent in a row
 * - If FC is disabled, 'wnd' does not limit the sender's flow */
void test_rc_flow_control::test_general(int wnd, int soft_thresh,
                                        int hard_thresh, bool is_fc_enabled)
{
    set_fc_attributes(m_e1, is_fc_enabled, wnd, soft_thresh, hard_thresh);

    send_am_messages(m_e1, wnd, UCS_OK);
    send_am_messages(m_e1, 1, is_fc_enabled ? UCS_ERR_NO_RESOURCE : UCS_OK);

    validate_grant(m_e1);
    send_am_messages(m_e1, 1, UCS_OK);

    if (!is_fc_enabled) {
        /* Make valgrind happy; FC needs to be enabled for proper cleanup */
        set_fc_attributes(m_e1, true, wnd, wnd, 1);
    }
    flush();
}
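/* The flow-control tests in this file rely on the RC FC window semantics:
 * every AM send consumes one credit from fc_wnd, a send attempted with no
 * credits left fails with UCS_ERR_NO_RESOURCE, and the window is replenished
 * by a grant from the receiver. In rough pseudocode (a simplified sketch, not
 * the actual transport code; the thresholds correspond to the soft_thresh and
 * hard_thresh parameters used above):
 *
 *     if (fc_wnd <= 0)
 *         return UCS_ERR_NO_RESOURCE;             // window exhausted
 *     if (fc_wnd <= hard_thresh)
 *         mark the AM with a hard credit request; // peer replies with a
 *                                                 // dedicated (pure) grant
 *     else if (fc_wnd <= soft_thresh)
 *         mark the AM with a soft credit request; // grant piggybacks on the
 *                                                 // peer's own traffic
 *     fc_wnd--;                                   // consume one credit
 */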
void test_rc_flow_control::test_pending_grant(int wnd)
{
    /* Block send capabilities of m_e2 so that the FC grant gets
     * added to the pending queue. */
    disable_entity(m_e2);
    set_fc_attributes(m_e1, true, wnd, wnd, 1);
    send_am_and_flush(m_e1, wnd);

    /* Now m_e1 should be blocked by the FC window and the FC grant
     * should be in the pending queue of m_e2. */
    send_am_messages(m_e1, 1, UCS_ERR_NO_RESOURCE);
    EXPECT_LE(get_fc_ptr(m_e1)->fc_wnd, 0);

    /* Enable send capabilities of m_e2 and send an AM message
     * to force pending queue dispatch */
    enable_entity(m_e2);
    set_tx_moderation(m_e2, 0);
    send_am_messages(m_e2, 1, UCS_OK);

    /* Check that m_e1 got the grant */
    validate_grant(m_e1);
    send_am_messages(m_e1, 1, UCS_OK);
}

void test_rc_flow_control::test_flush_fc_disabled()
{
    set_fc_disabled(m_e1);
    ucs_status_t status;

    /* If FC is disabled, wnd=0 should not prevent the flush */
    get_fc_ptr(m_e1)->fc_wnd = 0;
    status = uct_ep_flush(m_e1->ep(0), 0, NULL);
    EXPECT_EQ(UCS_OK, status);

    /* Sending an active message should be OK */
    get_fc_ptr(m_e1)->fc_wnd = 1;
    send_am_messages(m_e1, 1, UCS_OK);
    EXPECT_EQ(0, get_fc_ptr(m_e1)->fc_wnd);

    /* flush must have resources */
    status = uct_ep_flush(m_e1->ep(0), 0, NULL);
    EXPECT_FALSE(UCS_STATUS_IS_ERR(status)) << ucs_status_string(status);
}

void test_rc_flow_control::test_pending_purge(int wnd, int num_pend_sends)
{
    pending_send_request_t reqs[num_pend_sends];

    disable_entity(m_e2);
    set_fc_attributes(m_e1, true, wnd, wnd, 1);
    send_am_and_flush(m_e1, wnd);

    /* Now the m_e2 ep should have the FC grant message in its pending queue.
     * Add some user pending requests as well. */
    for (int i = 0; i < num_pend_sends; i++) {
        reqs[i].uct.func    = NULL; /* make valgrind happy */
        reqs[i].purge_count = 0;
        EXPECT_EQ(uct_ep_pending_add(m_e2->ep(0), &reqs[i].uct, 0), UCS_OK);
    }
    uct_ep_pending_purge(m_e2->ep(0), purge_cb, NULL);

    for (int i = 0; i < num_pend_sends; i++) {
        EXPECT_EQ(1, reqs[i].purge_count);
    }
}

/* Check that the FC window works as expected */
UCS_TEST_P(test_rc_flow_control, general_enabled)
{
    test_general(8, 4, 2, true);
}

UCS_TEST_P(test_rc_flow_control, general_disabled)
{
    test_general(8, 4, 2, false);
}

/* Test the scenario when the ep is being destroyed while there is
 * an FC grant message in the pending queue */
UCS_TEST_P(test_rc_flow_control, pending_only_fc)
{
    int wnd = 2;

    disable_entity(m_e2);
    set_fc_attributes(m_e1, true, wnd, wnd, 1);

    send_am_and_flush(m_e1, wnd);

    m_e2->destroy_ep(0);
    ASSERT_TRUE(rc_iface(m_e2)->tx.arbiter.current == NULL);
}

/* Check that the user callback passed to uct_ep_pending_purge is not
 * invoked for the FC grant message */
UCS_TEST_P(test_rc_flow_control, pending_purge)
{
    test_pending_purge(2, 5);
}

UCS_TEST_P(test_rc_flow_control, pending_grant)
{
    test_pending_grant(5);
}

UCS_TEST_P(test_rc_flow_control, fc_disabled_flush)
{
    test_flush_fc_disabled();
}

UCT_INSTANTIATE_RC_TEST_CASE(test_rc_flow_control)
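/* The remaining cases check the FC statistics counters (credit exhaustion,
 * soft/hard credit requests, grants). They are compiled only when statistics
 * support is built in (ENABLE_STATS), which is typically obtained by
 * configuring UCX with --enable-stats. */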
#if ENABLE_STATS

void test_rc_flow_control_stats::test_general(int wnd, int soft_thresh,
                                              int hard_thresh)
{
    uint64_t v;

    set_fc_attributes(m_e1, true, wnd, soft_thresh, hard_thresh);

    send_am_messages(m_e1, wnd, UCS_OK);
    send_am_messages(m_e1, 1, UCS_ERR_NO_RESOURCE);

    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e1)->stats, UCT_RC_FC_STAT_NO_CRED);
    EXPECT_EQ(1ul, v);

    validate_grant(m_e1);
    send_am_messages(m_e1, 1, UCS_OK);

    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e1)->stats,
                              UCT_RC_FC_STAT_TX_HARD_REQ);
    EXPECT_EQ(1ul, v);

    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e1)->stats,
                              UCT_RC_FC_STAT_RX_PURE_GRANT);
    EXPECT_EQ(1ul, v);

    flush();
}

UCS_TEST_P(test_rc_flow_control_stats, general)
{
    test_general(5, 2, 1);
}

UCS_TEST_P(test_rc_flow_control_stats, soft_request)
{
    uint64_t v;
    int wnd      = 8;
    int s_thresh = 4;
    int h_thresh = 1;

    set_fc_attributes(m_e1, true, wnd, s_thresh, h_thresh);
    send_am_and_flush(m_e1, wnd - (s_thresh - 1));

    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e1)->stats,
                              UCT_RC_FC_STAT_TX_SOFT_REQ);
    EXPECT_EQ(1ul, v);
    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e2)->stats,
                              UCT_RC_FC_STAT_RX_SOFT_REQ);
    EXPECT_EQ(1ul, v);

    send_am_and_flush(m_e2, wnd - (s_thresh - 1));
    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e1)->stats,
                              UCT_RC_FC_STAT_RX_GRANT);
    EXPECT_EQ(1ul, v);
    v = UCS_STATS_GET_COUNTER(get_fc_ptr(m_e2)->stats,
                              UCT_RC_FC_STAT_TX_GRANT);
    EXPECT_EQ(1ul, v);
}

UCT_INSTANTIATE_RC_TEST_CASE(test_rc_flow_control_stats)

#endif
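/* All test cases in this file are instantiated per RC transport by
 * UCT_INSTANTIATE_RC_TEST_CASE (rc_verbs and rc_mlx5), so a single group can
 * be selected with a gtest filter, for example (illustrative invocation; the
 * built binary location may differ):
 *
 *     ./test/gtest/gtest --gtest_filter='rc_mlx5/test_rc_flow_control.*'
 */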