Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "ucp_test.h"
#include "common/test.h"
#include "ucp/ucp_test.h"

#include <algorithm>
#include <set>

extern "C" {
#include <ucp/wireup/address.h>
#include <ucp/core/ucp_ep.inl>
}

class test_ucp_wireup : public ucp_test {
public:
    static std::vector<ucp_test_param>
    enum_test_params_features(const ucp_params_t& ctx_params,
                              const std::string& name,
                              const std::string& test_case_name,
                              const std::string& tls,
                              uint64_t features, bool test_all = 0);

protected:
    enum {
        TEST_RMA     = UCS_BIT(0),
        TEST_TAG     = UCS_BIT(1),
        TEST_STREAM  = UCS_BIT(2),
        UNIFIED_MODE = UCS_BIT(3),
        TEST_AMO     = UCS_BIT(4)
    };

    typedef uint64_t               elem_type;
    typedef std::vector<elem_type> vec_type;

    static const size_t BUFFER_LENGTH    = 16384;
    static const ucp_datatype_t DT_U64 = ucp_dt_make_contig(sizeof(elem_type));
    static const uint64_t TAG          = 0xdeadbeef;
    static const elem_type SEND_DATA   = 0xdeadbeef12121212ull;

    virtual void init();
    virtual void cleanup();

    void send_nb(ucp_ep_h ep, size_t length, int repeat, std::vector<void*>& reqs,
                 uint64_t send_data = SEND_DATA);

    void send_b(ucp_ep_h ep, size_t length, int repeat,
                uint64_t send_data = SEND_DATA);

    void recv_b(ucp_worker_h worker, ucp_ep_h ep, size_t length, int repeat,
                uint64_t recv_data = SEND_DATA);

    void send_recv(ucp_ep_h send_ep, ucp_worker_h recv_worker, ucp_ep_h recv_ep,
                   size_t vecsize, int repeat);

    void waitall(std::vector<void*> reqs);

    void disconnect(ucp_ep_h ep);

    void disconnect(ucp_test::entity &e);

    static void send_completion(void *request, ucs_status_t status);

    static void tag_recv_completion(void *request, ucs_status_t status,
                                    ucp_tag_recv_info_t *info);

    void rkeys_cleanup();

    void memhs_cleanup();

    void clear_recv_data();

    void fill_send_data();

    ucp_rkey_h get_rkey(ucp_ep_h ep, ucp_mem_h memh);

protected:
    vec_type                               m_send_data;
    vec_type                               m_recv_data;
    ucs::handle<ucp_mem_h, ucp_context_h>  m_memh_sender;
    ucs::handle<ucp_mem_h, ucp_context_h>  m_memh_receiver;
    std::vector< ucs::handle<ucp_rkey_h> > m_rkeys;

private:
    static void stream_recv_completion(void *request, ucs_status_t status,
                                       size_t length);

    static void unmap_memh(ucp_mem_h memh, ucp_context_h context);
};

std::vector<ucp_test_param>
test_ucp_wireup::enum_test_params_features(const ucp_params_t& ctx_params,
                                           const std::string& name,
                                           const std::string& test_case_name,
                                           const std::string& tls,
                                           uint64_t features, bool test_all)
{
    std::vector<ucp_test_param> result;
    ucp_params_t tmp_ctx_params = ctx_params;

    if (features & UCP_FEATURE_RMA) {
        tmp_ctx_params.features = UCP_FEATURE_RMA;
        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/rma",
                                     tls, TEST_RMA, result);

        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/rma",
                                     tls, TEST_RMA | UNIFIED_MODE, result);
    }

    if (features & UCP_FEATURE_TAG) {
        tmp_ctx_params.features = UCP_FEATURE_TAG;
        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/tag",
                                     tls, TEST_TAG, result);

        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/tag",
                                     tls, TEST_TAG | UNIFIED_MODE, result);
    }

    if (features & UCP_FEATURE_STREAM) {
        tmp_ctx_params.features = UCP_FEATURE_STREAM;
        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/stream",
                                     tls, TEST_STREAM, result);

        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/stream",
                                     tls, TEST_STREAM | UNIFIED_MODE, result);
    }

    if (features & (UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64)) {
        tmp_ctx_params.features = (UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64);
        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/amo",
                                     tls, TEST_AMO, result);
    }

    if (test_all) {
        uint64_t all_flags = (TEST_TAG | TEST_RMA | TEST_STREAM);
        tmp_ctx_params.features = features;
        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/all",
                                     tls, all_flags, result);

        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/all",
                                     tls, all_flags | UNIFIED_MODE, result);
    }

    return result;
}

void test_ucp_wireup::unmap_memh(ucp_mem_h memh, ucp_context_h context)
{
    ucs_status_t status = ucp_mem_unmap(context, memh);
    if (status != UCS_OK) {
        ucs_warn("failed to unmap memory: %s", ucs_status_string(status));
    }
}

void test_ucp_wireup::init()
{
    if (GetParam().variant & UNIFIED_MODE) {
        modify_config("UNIFIED_MODE",  "y");
    }

    ucp_test::init();

    m_send_data.resize(BUFFER_LENGTH, 0);
    m_recv_data.resize(BUFFER_LENGTH, 0);

    if (GetParam().variant & (TEST_RMA | TEST_AMO)) {
        ucs_status_t status;
        ucp_mem_map_params_t params;
        ucp_mem_h memh;

        params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                            UCP_MEM_MAP_PARAM_FIELD_LENGTH |
                            UCP_MEM_MAP_PARAM_FIELD_FLAGS;
        params.address    = &m_recv_data[0];
        params.length     = m_recv_data.size() * sizeof(m_recv_data[0]);
        params.flags      = 0;

        status = ucp_mem_map(sender().ucph(), &params, &memh);
        ASSERT_UCS_OK(status);
        m_memh_sender.reset(memh, unmap_memh, sender().ucph());

        status = ucp_mem_map(receiver().ucph(), &params, &memh);
        ASSERT_UCS_OK(status);
        m_memh_receiver.reset(memh, unmap_memh, receiver().ucph());
    }
}

ucp_rkey_h test_ucp_wireup::get_rkey(ucp_ep_h ep, ucp_mem_h memh)
{
    void *rkey_buffer;
    size_t rkey_size;
    ucs_status_t status;
    ucp_rkey_h rkey;

    if (memh == m_memh_receiver) {
        status = ucp_rkey_pack(receiver().ucph(), memh, &rkey_buffer, &rkey_size);
    } else if (memh == m_memh_sender) {
        status = ucp_rkey_pack(sender().ucph(), memh, &rkey_buffer, &rkey_size);
    } else {
        status = UCS_ERR_INVALID_PARAM;
    }
    ASSERT_UCS_OK(status);

    status = ucp_ep_rkey_unpack(ep, rkey_buffer, &rkey);
    ASSERT_UCS_OK(status);

    ucp_rkey_buffer_release(rkey_buffer);

    return rkey;
}

void test_ucp_wireup::rkeys_cleanup() {
    m_rkeys.clear();
}

void test_ucp_wireup::memhs_cleanup() {
    m_memh_sender.reset();
    m_memh_receiver.reset();
}

void test_ucp_wireup::cleanup() {
    rkeys_cleanup();
    memhs_cleanup();
    ucp_test::cleanup();
}

void test_ucp_wireup::clear_recv_data() {
    std::fill(m_recv_data.begin(), m_recv_data.end(), 0);
}

void test_ucp_wireup::send_nb(ucp_ep_h ep, size_t length, int repeat,
                              std::vector<void*>& reqs, uint64_t send_data)
{
    if (GetParam().variant & TEST_TAG) {
        std::fill(m_send_data.begin(), m_send_data.end(), send_data);
        for (int i = 0; i < repeat; ++i) {
            void *req = ucp_tag_send_nb(ep, &m_send_data[0], length,
                                        DT_U64, TAG, send_completion);
            if (UCS_PTR_IS_PTR(req)) {
                reqs.push_back(req);
            } else {
                ASSERT_UCS_OK(UCS_PTR_STATUS(req));
            }
        }
    } else if (GetParam().variant & TEST_STREAM) {
        std::fill(m_send_data.begin(), m_send_data.end(), send_data);
        for (int i = 0; i < repeat; ++i) {
            void *req = ucp_stream_send_nb(ep, &m_send_data[0], length, DT_U64,
                                           send_completion, 0);
            if (UCS_PTR_IS_PTR(req)) {
                reqs.push_back(req);
            } else {
                ASSERT_UCS_OK(UCS_PTR_STATUS(req));
            }
        }
    } else if (GetParam().variant & TEST_RMA) {
        clear_recv_data();

        ucp_mem_h memh  = (sender().ucph() == ep->worker->context) ?
                            m_memh_receiver : m_memh_sender;
        ucp_rkey_h rkey = get_rkey(ep, memh);

        m_rkeys.push_back(ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy));

        for (int i = 0; i < repeat; ++i) {
            std::fill(m_send_data.begin(), m_send_data.end(), send_data + i);
            void *req = ucp_put_nb(ep, &m_send_data[0],
                                   m_send_data.size() * sizeof(m_send_data[0]),
                                   (uintptr_t)&m_recv_data[0], rkey,
                                   send_completion);
            if (UCS_PTR_IS_PTR(req)) {
                reqs.push_back(req);
            } else {
                ASSERT_UCS_OK(UCS_PTR_STATUS(req));
            }
        }
    }
}

void test_ucp_wireup::send_b(ucp_ep_h ep, size_t length, int repeat,
                             uint64_t send_data)
{
    std::vector<void*> reqs;
    send_nb(ep, length, repeat, reqs, send_data);
    waitall(reqs);
}

void test_ucp_wireup::recv_b(ucp_worker_h worker, ucp_ep_h ep, size_t length,
                             int repeat, uint64_t recv_data)
{
    if (GetParam().variant & (TEST_TAG | TEST_STREAM)) {
        for (int i = 0; i < repeat; ++i) {
            size_t recv_length;
            void *req;

            clear_recv_data();
            if (GetParam().variant & TEST_TAG) {
                req = ucp_tag_recv_nb(worker, &m_recv_data[0], length, DT_U64,
                                      TAG, (ucp_tag_t)-1, tag_recv_completion);
            } else if (GetParam().variant & TEST_STREAM) {
                req = ucp_stream_recv_nb(ep, &m_recv_data[0], length, DT_U64,
                                         stream_recv_completion, &recv_length,
                                         UCP_STREAM_RECV_FLAG_WAITALL);
            } else {
                req = NULL;
            }
            if (UCS_PTR_IS_PTR(req)) {
                wait(req);
            } else {
                ASSERT_UCS_OK(UCS_PTR_STATUS(req));
            }
            EXPECT_EQ(recv_data, m_recv_data[0])
                      << "repeat " << i << "/" << repeat;
            EXPECT_EQ(length,
                      (size_t)std::count(m_recv_data.begin(),
                                         m_recv_data.begin() + length,
                                         recv_data));
        }
    } else if (GetParam().variant & TEST_RMA) {
        for (size_t i = 0; i < length; ++i) {
            while (m_recv_data[i] != recv_data + repeat - 1) {
                progress();
            }
        }
    }
}

void test_ucp_wireup::send_completion(void *request, ucs_status_t status)
{
}

void test_ucp_wireup::tag_recv_completion(void *request, ucs_status_t status,
                                          ucp_tag_recv_info_t *info)
{
}

void test_ucp_wireup::stream_recv_completion(void *request, ucs_status_t status,
                                             size_t length)
{
}

void test_ucp_wireup::send_recv(ucp_ep_h send_ep, ucp_worker_h recv_worker,
                                ucp_ep_h recv_ep, size_t length, int repeat)
{
    std::vector<void*> send_reqs;
    static uint64_t next_send_data = 0;
    uint64_t send_data = next_send_data++;

    send_nb(send_ep, length, repeat, send_reqs, send_data);
    recv_b (recv_worker, recv_ep, length, repeat, send_data);
    waitall(send_reqs);
    m_rkeys.clear();
}

void test_ucp_wireup::disconnect(ucp_ep_h ep) {
    void *req = ucp_disconnect_nb(ep);
    if (!UCS_PTR_IS_PTR(req)) {
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    }
    wait(req);
}

void test_ucp_wireup::disconnect(ucp_test::entity &e) {
    disconnect(e.revoke_ep());
}

void test_ucp_wireup::waitall(std::vector<void*> reqs)
{
    while (!reqs.empty()) {
        wait(reqs.back());
        reqs.pop_back();
    }
}

class test_ucp_wireup_1sided : public test_ucp_wireup {
public:
    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls)
    {
        return enum_test_params_features(ctx_params, name, test_case_name, tls,
                                         UCP_FEATURE_RMA | UCP_FEATURE_TAG);
    }
};

UCS_TEST_P(test_ucp_wireup_1sided, address) {
    ucs_status_t status;
    size_t size;
    void *buffer;
    ucp_lane_index_t lanes2remote[UCP_MAX_LANES];
    std::set<uint8_t> packed_dev_priorities, unpacked_dev_priorities;
    ucp_rsc_index_t tl;

    status = ucp_address_pack(sender().worker(), NULL,
                              std::numeric_limits<uint64_t>::max(),
                              UCP_ADDRESS_PACK_FLAG_ALL, lanes2remote, &size,
                              &buffer);
    ASSERT_UCS_OK(status);
    ASSERT_TRUE(buffer != NULL);
    ASSERT_GT(size, 0ul);
    EXPECT_LE(size, 2048ul); /* Expect a reasonable address size */

    ucs_for_each_bit(tl, sender().worker()->context->tl_bitmap) {
        if (sender().worker()->context->tl_rscs[tl].flags & UCP_TL_RSC_FLAG_SOCKADDR) {
            continue;
        }
        packed_dev_priorities.insert(ucp_worker_iface_get_attr(sender().worker(), tl)->priority);
    }

    ucp_unpacked_address unpacked_address;

    status = ucp_address_unpack(sender().worker(), buffer,
                                std::numeric_limits<uint64_t>::max(),
                                &unpacked_address);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(sender().worker()->uuid, unpacked_address.uuid);
#if ENABLE_DEBUG_DATA
    EXPECT_EQ(std::string(ucp_worker_get_name(sender().worker())),
              std::string(unpacked_address.name));
#endif
    EXPECT_LE(unpacked_address.address_count,
              static_cast<unsigned>(sender().ucph()->num_tls));

    const ucp_address_entry_t *ae;
    ucp_unpacked_address_for_each(ae, &unpacked_address) {
        unpacked_dev_priorities.insert(ae->iface_attr.priority);
    }

    /* TODO test addresses */

    ucs_free(unpacked_address.address_list);
    ucs_free(buffer);
    /* Make sure that the packed device priorities are equal to the unpacked
     * device priorities */
    ASSERT_TRUE(packed_dev_priorities == unpacked_dev_priorities);
}

UCS_TEST_P(test_ucp_wireup_1sided, empty_address) {
    ucs_status_t status;
    size_t size;
    void *buffer;
    ucp_lane_index_t lanes2remote[UCP_MAX_LANES];

    status = ucp_address_pack(sender().worker(), NULL, 0,
                              UCP_ADDRESS_PACK_FLAG_ALL, lanes2remote, &size,
                              &buffer);
    ASSERT_UCS_OK(status);
    ASSERT_TRUE(buffer != NULL);
    ASSERT_GT(size, 0ul);

    ucp_unpacked_address unpacked_address;

    status = ucp_address_unpack(sender().worker(), buffer,
                                std::numeric_limits<uint64_t>::max(),
                                &unpacked_address);
    ASSERT_UCS_OK(status);

    EXPECT_EQ(sender().worker()->uuid, unpacked_address.uuid);
#if ENABLE_DEBUG_DATA
    EXPECT_EQ(std::string(ucp_worker_get_name(sender().worker())),
              std::string(unpacked_address.name));
#endif
    EXPECT_EQ(0u, unpacked_address.address_count);

    ucs_free(unpacked_address.address_list);
    ucs_free(buffer);
}

UCS_TEST_P(test_ucp_wireup_1sided, one_sided_wireup) {
    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
    flush_worker(sender());
}

UCS_TEST_P(test_ucp_wireup_1sided, one_sided_wireup_rndv, "RNDV_THRESH=1") {
    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), BUFFER_LENGTH, 1);
    if (is_loopback() && (GetParam().variant & TEST_TAG)) {
        /* expect the endpoint to be connected to itself */
        ucp_ep_h ep = sender().ep();
        EXPECT_EQ((uintptr_t)ep, ucp_ep_dest_ep_ptr(ep));
    }
    flush_worker(sender());
}

UCS_TEST_P(test_ucp_wireup_1sided, multi_wireup) {
    skip_loopback();

    const size_t count = 10;
    while (entities().size() < count) {
        create_entity();
    }

    /* connect from sender() to all the rest */
    for (size_t i = 0; i < count; ++i) {
        sender().connect(&entities().at(i), get_ep_params(), i);
    }
}

UCS_TEST_P(test_ucp_wireup_1sided, stress_connect) {
    for (int i = 0; i < 30; ++i) {
        sender().connect(&receiver(), get_ep_params());
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1,
                  10000 / (ucs::test_time_multiplier() *
                           ucs::test_time_multiplier()));
        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }

        disconnect(sender());
        if (!is_loopback()) {
            disconnect(receiver());
        }
    }
}

UCS_TEST_P(test_ucp_wireup_1sided, stress_connect2) {
    int max_count = (int)ucs_max(10,
                                 (1000.0 / (ucs::test_time_multiplier() *
                                            ucs::test_time_multiplier())));
    int count     = ucs_min(max_count, max_connections() / 2);

    for (int i = 0; i < count; ++i) {
        sender().connect(&receiver(), get_ep_params());
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }

        disconnect(sender());
        if (!is_loopback()) {
            disconnect(receiver());
        }
    }
}

UCS_TEST_P(test_ucp_wireup_1sided, disconnect_nonexistent) {
    skip_loopback();
    sender().connect(&receiver(), get_ep_params());
    disconnect(sender());
    receiver().destroy_worker();
    sender().destroy_worker();
}

UCS_TEST_P(test_ucp_wireup_1sided, disconnect_reconnect) {
    sender().connect(&receiver(), get_ep_params());
    send_b(sender().ep(), 1000, 1);
    disconnect(sender());
    recv_b(receiver().worker(), receiver().ep(), 1000, 1);

    sender().connect(&receiver(), get_ep_params());
    send_b(sender().ep(), 1000, 1);
    disconnect(sender());
    recv_b(receiver().worker(), receiver().ep(), 1000, 1);
}

UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided) {
    sender().connect(&receiver(), get_ep_params());
    send_b(sender().ep(), 1000, 100);
    disconnect(sender());
    recv_b(receiver().worker(), receiver().ep(), 1000, 100);
}

UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided_nozcopy, "ZCOPY_THRESH=-1") {
    sender().connect(&receiver(), get_ep_params());
    send_b(sender().ep(), 1000, 100);
    disconnect(sender());
    recv_b(receiver().worker(), receiver().ep(), 1000, 100);
}

UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_onesided_wait) {
    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 8, 1);
    send_b(sender().ep(), 1000, 200);
    disconnect(sender());
    recv_b(receiver().worker(), receiver().ep(), 1000, 200);
}

UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_reply1) {
    sender().connect(&receiver(), get_ep_params());
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    send_b(sender().ep(), 8, 1);
    if (!is_loopback()) {
        disconnect(sender());
    }

    recv_b(receiver().worker(), receiver().ep(), 8, 1);
    send_b(receiver().ep(), 8, 1);
    disconnect(receiver());
    recv_b(sender().worker(), sender().ep(), 8, 1);
}

UCS_TEST_P(test_ucp_wireup_1sided, send_disconnect_reply2) {
    sender().connect(&receiver(), get_ep_params());

    send_b(sender().ep(), 8, 1);
    if (!is_loopback()) {
        disconnect(sender());
    }
    recv_b(receiver().worker(), receiver().ep(),  8, 1);

    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    send_b(receiver().ep(), 8, 1);
    disconnect(receiver());
    recv_b(sender().worker(), receiver().ep(), 8, 1);
}

UCS_TEST_P(test_ucp_wireup_1sided, disconnect_nb_onesided) {
    sender().connect(&receiver(), get_ep_params());

    std::vector<void*> sreqs;
    send_nb(sender().ep(), 1000, 1000, sreqs);

    void *dreq = sender().disconnect_nb();
    if (!UCS_PTR_IS_PTR(dreq)) {
        ASSERT_UCS_OK(UCS_PTR_STATUS(dreq));
    }

    wait(dreq);
    recv_b(receiver().worker(), receiver().ep(), 1000, 1000);

    waitall(sreqs);
}

UCS_TEST_P(test_ucp_wireup_1sided, multi_ep_1sided) {
    const unsigned count = 10;

    for (unsigned i = 0; i < count; ++i) {
        sender().connect(&receiver(), get_ep_params(), i);
    }

    for (unsigned i = 0; i < count; ++i) {
        send_recv(sender().ep(0, i), receiver().worker(), receiver().ep(), 8, 1);
    }
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_1sided)

class test_ucp_wireup_2sided : public test_ucp_wireup {
public:
    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls)
    {
        return enum_test_params_features(ctx_params, name, test_case_name, tls,
                                         UCP_FEATURE_RMA | UCP_FEATURE_TAG |
                                         UCP_FEATURE_STREAM);
    }

protected:
    void test_connect_loopback(bool delay_before_connect, bool enable_loopback);
};

UCS_TEST_P(test_ucp_wireup_2sided, two_sided_wireup) {
    sender().connect(&receiver(), get_ep_params());
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }

    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
    flush_worker(sender());
    send_recv(receiver().ep(), sender().worker(), sender().ep(), 1, 1);
    flush_worker(receiver());
}

void test_ucp_wireup_2sided::test_connect_loopback(bool delay_before_connect,
                                                   bool enable_loopback) {
    ucp_ep_params_t params = test_ucp_wireup::get_ep_params();
    if (!enable_loopback) {
        params.field_mask |= UCP_EP_PARAM_FIELD_FLAGS;
        params.flags      |= UCP_EP_PARAMS_FLAGS_NO_LOOPBACK;
    }

    for (int i = 0; i < 5; ++i) {
        int base_index = i * 2;
        sender().connect(&sender(), params, base_index);
        ucp_ep_h ep1 = sender().ep(0, base_index);

        if (delay_before_connect) {
            /* let one side create ep */
            short_progress_loop(0);
        }

        sender().connect(&sender(), params, base_index + 1);
        ucp_ep_h ep2 = sender().ep(0, base_index + 1);

        EXPECT_NE(ep1, ep2);

        if (GetParam().variant & TEST_STREAM) {
            uint64_t data1 = (base_index * 10) + 1;
            uint64_t data2 = (base_index * 10) + 2;

            send_b(ep1, 1, 1, data1);
            send_b(ep2, 1, 1, data2);

            if (enable_loopback) {
                /* self-send - each ep receives what was sent on it */
                recv_b(sender().worker(), ep1, 1, 1, data1);
                recv_b(sender().worker(), ep2, 1, 1, data2);
            } else {
                /* cross-send - each ep receives what was sent on the other ep */
                recv_b(sender().worker(), ep1, 1, 1, data2);
                recv_b(sender().worker(), ep2, 1, 1, data1);
            }
        }
    }
    flush_worker(sender());
}

UCS_TEST_P(test_ucp_wireup_2sided, loopback) {
    test_connect_loopback(false, true);
}

UCS_TEST_P(test_ucp_wireup_2sided, loopback_with_delay) {
    test_connect_loopback(true, true);
}

UCS_TEST_P(test_ucp_wireup_2sided, no_loopback) {
    test_connect_loopback(false, false);
}

UCS_TEST_P(test_ucp_wireup_2sided, no_loopback_with_delay) {
    test_connect_loopback(true, false);
}

UCS_TEST_SKIP_COND_P(test_ucp_wireup_2sided, async_connect,
                     !(GetParam().ctx_params.features & UCP_FEATURE_TAG)) {
    sender().connect(&receiver(), get_ep_params());
    ucp_ep_h send_ep = sender().ep();
    std::vector<void *> reqs;

    reqs.push_back(ucp_tag_send_nb(send_ep, NULL, 0, DT_U64, 1, send_completion));
    EXPECT_FALSE(UCS_PTR_IS_ERR(reqs.back()));

    ucs_time_t deadline = ucs::get_deadline();
    /* waiting of async reply on wiriup without calling progress on receiver */
    while(!(send_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED) &&
          (ucs_get_time() < deadline)) {
        ucp_worker_progress(sender().worker());
    }

    reqs.push_back(ucp_tag_recv_nb(receiver().worker(), NULL, 0, DT_U64, 1,
                                   (ucp_tag_t)-1, tag_recv_completion));
    EXPECT_FALSE(UCS_PTR_IS_ERR(reqs.back()));
    waitall(reqs);
}

UCS_TEST_P(test_ucp_wireup_2sided, connect_disconnect) {
    sender().connect(&receiver(), get_ep_params());
    if (!is_loopback()) {
        receiver().connect(&sender(), get_ep_params());
    }
    disconnect(sender());
    if (!is_loopback()) {
        disconnect(receiver());
    }
}

UCS_TEST_P(test_ucp_wireup_2sided, multi_ep_2sided) {
    const unsigned count = 10;

    for (unsigned j = 0; j < 4; ++j) {

        unsigned offset = j * count;

        for (unsigned i = 0; i < count; ++i) {
            unsigned ep_idx = offset + i;
            sender().connect(&receiver(), get_ep_params(), ep_idx);
            if (!is_loopback()) {
                receiver().connect(&sender(), get_ep_params(), ep_idx);
            }
            UCS_TEST_MESSAGE << "iteration " << j << " pair " << i << ": " <<
                            sender().ep(0, ep_idx) << " <--> " << receiver().ep(0, ep_idx);
        }

        for (unsigned i = 0; i < count; ++i) {
            unsigned ep_idx = offset + i;
            send_recv(sender().ep(0, ep_idx), receiver().worker(),
                      receiver().ep(0, ep_idx), 8, 1);
            send_recv(receiver().ep(0, ep_idx), sender().worker(),
                      sender().ep(0, ep_idx), 8, 1);
        }

        short_progress_loop(0);
    }
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_2sided)

class test_ucp_wireup_errh_peer : public test_ucp_wireup_1sided
{
public:
    virtual ucp_ep_params_t get_ep_params() {
        ucp_ep_params_t params = test_ucp_wireup::get_ep_params();
        params.field_mask     |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
                                 UCP_EP_PARAM_FIELD_ERR_HANDLER;
        params.err_mode        = UCP_ERR_HANDLING_MODE_PEER;
        params.err_handler.cb  = err_cb;
        params.err_handler.arg = NULL;
        return params;
    }

    virtual void init() {
        test_ucp_wireup::init();
        skip_loopback();
    }

    static void err_cb(void *, ucp_ep_h, ucs_status_t) {}
};

UCS_TEST_P(test_ucp_wireup_errh_peer, msg_after_ep_create) {
    receiver().connect(&sender(), get_ep_params());

    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
    flush_worker(sender());
}

UCS_TEST_P(test_ucp_wireup_errh_peer, msg_before_ep_create) {

    sender().connect(&receiver(), get_ep_params());
    send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
    flush_worker(sender());

    receiver().connect(&sender(), get_ep_params());

    send_recv(receiver().ep(), sender().worker(), receiver().ep(), 1, 1);
    flush_worker(receiver());
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_errh_peer)

class test_ucp_wireup_fallback : public test_ucp_wireup {
public:
    test_ucp_wireup_fallback() {
        m_num_lanes = 0;
    }

    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls)
    {
        return enum_test_params_features(ctx_params, name, test_case_name, tls,
                                         UCP_FEATURE_TAG | UCP_FEATURE_RMA |
                                         UCP_FEATURE_STREAM, 1);
    }

    void init() {
        /* do nothing */
    }

    void cleanup() {
        /* do nothing */
    }

    bool test_est_num_eps_fallback(size_t est_num_eps, size_t &min_max_num_eps,
                                   bool has_only_unscalable) {
        size_t num_lanes = 0;
        bool res         = true;

        min_max_num_eps = UCS_ULUNITS_INF;

        UCS_TEST_MESSAGE << "Testing " << est_num_eps << " number of EPs";
        modify_config("NUM_EPS", ucs::to_string(est_num_eps).c_str());
        test_ucp_wireup::init();

        sender().connect(&receiver(), get_ep_params());
        if (!is_loopback()) {
            receiver().connect(&sender(), get_ep_params());
        }
        send_recv(sender().ep(), receiver().worker(), receiver().ep(), 1, 1);
        flush_worker(sender());

        for (ucp_lane_index_t lane = 0;
             lane < ucp_ep_num_lanes(sender().ep()); lane++) {
            uct_ep_h uct_ep = sender().ep()->uct_eps[lane];
            if (uct_ep == NULL) {
                continue;
            }

            uct_iface_attr_t iface_attr;
            ucs_status_t status = uct_iface_query(uct_ep->iface, &iface_attr);
            ASSERT_UCS_OK(status);

            num_lanes++;

            if (!has_only_unscalable) {
                if (iface_attr.max_num_eps < est_num_eps) {
                    res = false;
                    goto out;
                }
            }

            if (iface_attr.max_num_eps < min_max_num_eps) {
                min_max_num_eps = iface_attr.max_num_eps;
            }
        }

out:
        test_ucp_wireup::cleanup();

        if (est_num_eps == 1) {
            m_num_lanes = num_lanes;
        } else if (has_only_unscalable) {
            /* If has only unscalable transports, check that the number of
             * lanes is the same as for the case when "est_num_eps == 1" */
            res = (num_lanes == m_num_lanes);
        }

        return res;
    }

private:

    /* The number of lanes activated for the case when "est_num_eps == 1" */
    size_t m_num_lanes;
};

UCS_TEST_P(test_ucp_wireup_fallback, est_num_eps_fallback) {
    size_t test_min_max_eps, min_max_eps;
    std::vector<std::string> rc_tls;

    rc_tls.push_back("rc_v");
    rc_tls.push_back("rc_x");

    /* If test is running with RC only (i.e. unscalable transport),
     * check that a number of created lanes is the same for different
     * number of estimated EPs values */
    bool has_only_rc = has_only_transports(rc_tls);

    test_est_num_eps_fallback(1, test_min_max_eps, has_only_rc);

    size_t prev_min_max_eps = 0;
    while ((test_min_max_eps != UCS_ULUNITS_INF) &&
           /* number of EPs was changed between iterations */
           (test_min_max_eps != prev_min_max_eps)) {
        if (test_min_max_eps > 1) {
            EXPECT_TRUE(test_est_num_eps_fallback(test_min_max_eps - 1,
                                                  min_max_eps, has_only_rc));
        }

        EXPECT_TRUE(test_est_num_eps_fallback(test_min_max_eps,
                                              min_max_eps, has_only_rc));

        EXPECT_TRUE(test_est_num_eps_fallback(test_min_max_eps + 1,
                                              min_max_eps, has_only_rc));
        prev_min_max_eps = test_min_max_eps;
        test_min_max_eps = min_max_eps;
    }
}

/* Test fallback from RC to UD, since RC isn't scalable enough
 * as its iface max_num_eps attribute = 256 by default */
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
                              rc_ud, "rc_x,rc_v,ud_x,ud_v")
/* Test two scalable enough transports */
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
                              dc_ud, "dc_x,ud_x,ud_v")
/* Test unsacalable transports only */
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
                              rc, "rc_x,rc_v")
/* Test all available ib transports */
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback,
                              ib, "ib")

class test_ucp_wireup_unified : public test_ucp_wireup {
public:
    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls)
    {
        std::vector<ucp_test_param> result;
        ucp_params_t tmp_ctx_params = ctx_params;

        tmp_ctx_params.features = UCP_FEATURE_TAG;

        generate_test_params_variant(tmp_ctx_params, name, test_case_name + "/uni",
                                     tls, TEST_TAG | UNIFIED_MODE, result);
        return result;
    }

    bool context_has_tls(ucp_context_h ctx, const std::string& tl,
                         ucp_rsc_index_t md_idx)
    {
        for (ucp_rsc_index_t idx = 0; idx < ctx->num_tls; ++idx) {
            if (ctx->tl_rscs[idx].md_index != md_idx) {
                continue;
            }

            if (!strcmp(ctx->tl_rscs[idx].tl_rsc.tl_name, tl.c_str())) {
                return true;
            }
        }

        return false;
    }

    bool worker_has_tls(ucp_worker_h worker, const std::string& tl,
                        ucp_rsc_index_t md_idx)
    {
        ucp_context_h ctx = worker->context;

        for (unsigned i = 0; i < worker->num_ifaces; ++i) {
            ucp_worker_iface_t *wiface = worker->ifaces[i];
            ucp_rsc_index_t md_idx_it  = ctx->tl_rscs[wiface->rsc_index].md_index;

            if (md_idx_it != md_idx) {
                continue;
            }

            char* name = ctx->tl_rscs[wiface->rsc_index].tl_rsc.tl_name;
            if (!strcmp(name, tl.c_str())) {
                return true;
            }
        }
        return false;
    }

    void check_unified_ifaces(entity *e,
                              const std::string& better_tl,
                              const std::string& tl)
    {
        ucp_context_h ctx   = e->ucph();
        ucp_worker_h worker = e->worker();

        for (ucp_rsc_index_t i = 0; i < ctx->num_mds; ++i) {
            if (!(context_has_tls(ctx, better_tl, i) &&
                  context_has_tls(ctx, tl, i))) {
               continue;
            }

            ASSERT_TRUE(ctx->num_tls > worker->num_ifaces);
            EXPECT_TRUE(worker_has_tls(worker, better_tl, i));
            EXPECT_FALSE(worker_has_tls(worker, tl, i));
        }
    }
};


UCS_TEST_P(test_ucp_wireup_unified, select_best_ifaces)
{
    // Accelerated transports have better performance charasteristics than their
    // verbs counterparts. Check that corresponding verbs transports are not used
    // by workers in unified mode.
    check_unified_ifaces(&sender(), "rc_mlx5", "rc_verbs");
    check_unified_ifaces(&sender(), "ud_mlx5", "ud_verbs");

    // RC and DC has similar capabilities, but RC has better latency while
    // estimated number of endpoints is relatively small.
    // sender() is created with 1 ep, so RC should be selected over DC.
    check_unified_ifaces(&sender(), "rc_mlx5", "dc_mlx5");

    // Set some big enough number of endpoints for DC to be more performance
    // efficient than RC. Now check that DC is selected over RC.
    modify_config("NUM_EPS", "1000");
    entity *e = create_entity();
    check_unified_ifaces(e, "dc_mlx5", "rc_mlx5");
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, rc, "rc")
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, ud, "ud")
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_unified, rc_dc, "rc,dc")

class test_ucp_wireup_fallback_amo : public test_ucp_wireup {
    void init() {
        size_t device_atomics_cnt = 0;

        test_ucp_wireup::init();

        for (ucp_rsc_index_t idx = 0; idx < sender().ucph()->num_tls; ++idx) {
            uct_iface_attr_t *attr = ucp_worker_iface_get_attr(sender().worker(),
                                                               idx);
            if (attr->cap.flags & UCT_IFACE_FLAG_ATOMIC_DEVICE) {
                device_atomics_cnt++;
            }
        }
        bool device_atomics_supported = sender().worker()->atomic_tls != 0;

        test_ucp_wireup::cleanup();

        if (!device_atomics_supported || !device_atomics_cnt) {
            UCS_TEST_SKIP_R("there are no TLs that support device atomics");
        }
    }

    void cleanup() {
        /* do nothing */
    }

protected:

    bool use_device_amo(ucp_ep_h ep) {
        ucp_ep_config_t *ep_config = ucp_ep_config(ep);

        for (ucp_lane_index_t lane = 0; lane < UCP_MAX_LANES; ++lane) {
            if (ep_config->key.amo_lanes[lane] != UCP_NULL_LANE) {
                return (ucp_ep_get_iface_attr(ep, lane)->cap.flags &
                        UCT_IFACE_FLAG_ATOMIC_DEVICE);
            }
        }

        return false;
    }

    size_t get_min_max_num_eps(ucp_ep_h ep) {
        size_t min_max_num_eps = UCS_ULUNITS_INF;

        for (ucp_lane_index_t lane = 0; lane < ucp_ep_num_lanes(ep); lane++) {
            uct_iface_attr_t *iface_attr = ucp_ep_get_iface_attr(ep, lane);

            if (iface_attr->max_num_eps < min_max_num_eps) {
                min_max_num_eps = iface_attr->max_num_eps;
            }
        }

        return min_max_num_eps;
    }

    size_t test_wireup_fallback_amo(const std::vector<std::string> &tls,
                                    size_t est_num_eps, bool should_use_device_amo) {
        size_t min_max_num_eps = UCS_ULUNITS_INF;

        UCS_TEST_MESSAGE << "Testing " << est_num_eps << " number of EPs";
        modify_config("NUM_EPS", ucs::to_string(est_num_eps).c_str());

        // Create new entity and add to to the end of vector
        // (thus it will be receiver without any connections)
        create_entity(false);

        ucp_test_param params = GetParam();
        for (std::vector<std::string>::const_iterator i = tls.begin();
             i != tls.end(); ++i) {
            params.transports.clear();
            params.transports.push_back(*i);
            create_entity(true, params);
            sender().connect(&receiver(), get_ep_params());

            EXPECT_EQ(should_use_device_amo, use_device_amo(sender().ep()));

            size_t max_num_eps = get_min_max_num_eps(sender().ep());
            if (max_num_eps < min_max_num_eps) {
                min_max_num_eps = max_num_eps;
            }
        }

        test_ucp_wireup::cleanup();

        return min_max_num_eps;
    }

public:

    static ucp_params_t get_ctx_params() {
        ucp_params_t params = test_ucp_wireup::get_ctx_params();
        params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
        params.features    |= (UCP_FEATURE_AMO32 |
                               UCP_FEATURE_AMO64);
        return params;
    }
};

class test_ucp_wireup_amo : public test_ucp_wireup {
public:
    typedef struct {
        test_ucp_wireup_amo *test;
    } request_t;

    static ucp_params_t get_ctx_params() {
        ucp_params_t params = test_ucp_wireup::get_ctx_params();
        params.field_mask  |= UCP_PARAM_FIELD_REQUEST_SIZE;
        params.request_size = sizeof(request_t);
        return params;
    }

    static std::vector<ucp_test_param>
    enum_test_params(const ucp_params_t& ctx_params, const std::string& name,
                     const std::string& test_case_name, const std::string& tls)
    {
        uint64_t amo_features;

        EXPECT_TRUE((sizeof(elem_type) == 4ul) || (sizeof(elem_type) == 8ul));
        amo_features = (sizeof(elem_type) == 4ul) ? UCP_FEATURE_AMO32 :
                       UCP_FEATURE_AMO64;
        return enum_test_params_features(ctx_params, name, test_case_name, tls,
                                         amo_features, false);
    }

protected:
    ucp_rkey_h get_rkey(const entity &e) {
        if (&sender() == &e) {
            return test_ucp_wireup::get_rkey(e.ep(), m_memh_receiver);
        } else if (&receiver() == &e) {
            return test_ucp_wireup::get_rkey(e.ep(), m_memh_sender);
        }

        return NULL;
    }

    void add_rkey(ucp_rkey_h rkey) {
        ASSERT_NE((ucp_rkey_h)NULL, rkey);
        m_rkeys.push_back(ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy));
    }

    void fill_send_data() {
        m_send_data[0] = ucs_generate_uuid(0);
    }

    static void flush_cb(void *req, ucs_status_t status) {
        request_t *request = (request_t *)req;

        ASSERT_UCS_OK(status);
        request->test->rkeys_cleanup();
        request->test->memhs_cleanup();
    }
};

UCS_TEST_P(test_ucp_wireup_amo, relese_key_after_flush) {
    fill_send_data();
    clear_recv_data();

    sender().connect(&receiver(), get_ep_params());

    ucp_rkey_h rkey = get_rkey(sender());
    add_rkey(rkey);

    ucs_status_t status = ucp_atomic_post(sender().ep(), UCP_ATOMIC_POST_OP_ADD,
                                          m_send_data[0], sizeof(elem_type),
                                          (uint64_t)&m_recv_data[0], rkey);
    ASSERT_UCS_OK(status);
    request_t *req = (request_t *)ucp_ep_flush_nb(sender().ep(),
                                                  UCT_FLUSH_FLAG_LOCAL,
                                                  flush_cb);
    if (UCS_PTR_IS_PTR(req)) {
        req->test = this;
        wait(req);
    } else {
        ASSERT_UCS_OK(UCS_PTR_STATUS(req));
    }
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_wireup_amo)

UCS_TEST_P(test_ucp_wireup_fallback_amo, different_amo_types) {
    std::vector<std::string> tls;

    /* the 1st peer support RC only (device atomics) */
    tls.push_back("rc");
    /* the 2nd peer support RC and SHM (device and CPU atomics) */
    tls.push_back("rc,shm");

    size_t min_max_num_eps = test_wireup_fallback_amo(tls, 1, 1);
    test_wireup_fallback_amo(tls, min_max_num_eps + 1, 0);
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_fallback_amo,
                              shm_rc, "shm,rc_x,rc_v")

/* NOTE: this fixture is NOT inherited from test_ucp_wireup, because we want to
 * create our own entities.
 */
class test_ucp_wireup_asymmetric : public ucp_test {
protected:
    virtual void init() {
        static const char *ibdev_sysfs_dir = "/sys/class/infiniband";

        DIR *dir = opendir(ibdev_sysfs_dir);
        if (dir == NULL) {
            UCS_TEST_SKIP_R(std::string(ibdev_sysfs_dir) + " not found");
        }

        for (;;) {
            struct dirent *entry = readdir(dir);
            if (entry == NULL) {
                break;
            }

            if (entry->d_name[0] == '.') {
                continue;
            }

            m_ib_devices.push_back(entry->d_name);
        }

        closedir(dir);
    }

    void tag_sendrecv(size_t size) {
        std::string send_data(size, 's');
        std::string recv_data(size, 'x');

        ucs_status_ptr_t sreq = ucp_tag_send_nb(
                        sender().ep(0), &send_data[0], size,
                        ucp_dt_make_contig(1), 1,
                        (ucp_send_callback_t)ucs_empty_function);
        ucs_status_ptr_t rreq = ucp_tag_recv_nb(
                        receiver().worker(), &recv_data[0], size,
                        ucp_dt_make_contig(1), 1, 1,
                        (ucp_tag_recv_callback_t)ucs_empty_function);
        wait(sreq);
        wait(rreq);

        EXPECT_EQ(send_data, recv_data);
    }

    /* Generate a pci_bw configuration string for IB devices, which assigns
     * the speed ai+b for device i.
     */
    std::string pci_bw_config(int a, int b) {
        std::string config_str;
        for (size_t i = 0; i < m_ib_devices.size(); ++i) {
            config_str += m_ib_devices[i] + ":" +
                            ucs::to_string((a * i) + b) + "Gbps";
            if (i != (m_ib_devices.size() - 1)) {
                config_str += ",";
            }
        }
        return config_str;
    }

    std::vector<std::string> m_ib_devices;

public:
    static ucp_params_t get_ctx_params() {
        ucp_params_t params = ucp_test::get_ctx_params();
        params.field_mask  |= UCP_PARAM_FIELD_FEATURES;
        params.features     = UCP_FEATURE_TAG;
        return params;
    }
};

/*
 * Force asymmetric configuration by different PCI_BW settings
 */
UCS_TEST_SKIP_COND_P(test_ucp_wireup_asymmetric, connect, is_self()) {

    /* Enable cross-dev connection */
    /* coverity[tainted_string_argument] */
    ucs::scoped_setenv path_mtu_env("UCX_RC_PATH_MTU", "1024");

    {
        std::string config_str = pci_bw_config(20, 20);
        UCS_TEST_MESSAGE << "creating sender: " << config_str;
        /* coverity[tainted_string_argument] */
        ucs::scoped_setenv pci_bw_env("UCX_IB_PCI_BW", config_str.c_str());
        create_entity();
    }

    {
        std::string config_str = pci_bw_config(-20, m_ib_devices.size() * 20);
        UCS_TEST_MESSAGE << "creating receiver: " << config_str;
        /* coverity[tainted_string_argument] */
        ucs::scoped_setenv pci_bw_env("UCX_IB_PCI_BW", config_str.c_str());
        create_entity();
    }

    sender().connect(&receiver(), get_ep_params());
    receiver().connect(&sender(), get_ep_params());

    ucp_ep_print_info(sender().ep(), stdout);
    ucp_ep_print_info(receiver().ep(), stdout);

    tag_sendrecv(1);
    tag_sendrecv(100000);
    tag_sendrecv(1000000);
}

UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, rcv, "rc_v")
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, rcx, "rc_x")
UCP_INSTANTIATE_TEST_CASE_TLS(test_ucp_wireup_asymmetric, ib, "ib")