Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "test_p2p_mix.h"

extern "C" {
#include <ucs/arch/atomic.h>
}
#include <functional>


uct_p2p_mix_test::uct_p2p_mix_test() : uct_p2p_test(0), m_send_size(0) {
}

ucs_status_t uct_p2p_mix_test::am_callback(void *arg, void *data, size_t length,
                                           unsigned flags)
{
    ucs_atomic_sub32(&am_pending, 1);
    return UCS_OK;
}

void uct_p2p_mix_test::completion_callback(uct_completion_t *comp, ucs_status_t status)
{
    EXPECT_UCS_OK(status);
}

template <typename T, uct_atomic_op_t OP>
ucs_status_t uct_p2p_mix_test::uct_p2p_mix_test::atomic_fop(const mapped_buffer &sendbuf,
                                                            const mapped_buffer &recvbuf,
                                                            uct_completion_t *comp)
{
    if (sizeof(T) == sizeof(uint32_t)) {
        return uct_ep_atomic32_fetch(sender().ep(0), OP, 1, (uint32_t*)sendbuf.ptr(),
                                     recvbuf.addr(), recvbuf.rkey(), comp);
    } else {
        return uct_ep_atomic64_fetch(sender().ep(0), OP, 1, (uint64_t*)sendbuf.ptr(),
                                     recvbuf.addr(), recvbuf.rkey(), comp);
    }
}

ucs_status_t uct_p2p_mix_test::cswap64(const mapped_buffer &sendbuf,
                                       const mapped_buffer &recvbuf,
                                       uct_completion_t *comp)
{
    return uct_ep_atomic_cswap64(sender().ep(0), 0, 1, recvbuf.addr(),
                                 recvbuf.rkey(), (uint64_t*)sendbuf.ptr(),
                                 comp);
}

ucs_status_t uct_p2p_mix_test::put_short(const mapped_buffer &sendbuf,
                                         const mapped_buffer &recvbuf,
                                         uct_completion_t *comp)
{
    return uct_ep_put_short(sender().ep(0), sendbuf.ptr(),
                            sendbuf.length(), recvbuf.addr(),
                            recvbuf.rkey());
}

ucs_status_t uct_p2p_mix_test::put_bcopy(const mapped_buffer &sendbuf,
                                         const mapped_buffer &recvbuf,
                                         uct_completion_t *comp)
{
    ssize_t packed_len;
    packed_len = uct_ep_put_bcopy(sender().ep(0), mapped_buffer::pack,
                                  (void*)&sendbuf, recvbuf.addr(), recvbuf.rkey());
    if (packed_len >= 0) {
        EXPECT_EQ(sendbuf.length(), (size_t)packed_len);
        return UCS_OK;
    } else {
        return (ucs_status_t)packed_len;
    }
}

ucs_status_t uct_p2p_mix_test::am_short(const mapped_buffer &sendbuf,
                                        const mapped_buffer &recvbuf,
                                        uct_completion_t *comp)
{
    ucs_status_t status;
    status = uct_ep_am_short(sender().ep(0), AM_ID, *(uint64_t*)sendbuf.ptr(),
                             (uint64_t*)sendbuf.ptr() + 1,
                             sendbuf.length() - sizeof(uint64_t));
    if (status == UCS_OK) {
        ucs_atomic_add32(&am_pending, +1);
    }
    return status;
}

ucs_status_t uct_p2p_mix_test::am_zcopy(const mapped_buffer &sendbuf,
                                        const mapped_buffer &recvbuf,
                                        uct_completion_t *comp)
{
    ucs_status_t status;
    size_t header_length;
    uct_iov_t iov;

    header_length = ucs_min(ucs::rand() % sender().iface_attr().cap.am.max_hdr,
                            sendbuf.length());

    iov.buffer = (char*)sendbuf.ptr() + header_length;
    iov.count  = 1;
    iov.length = sendbuf.length() - header_length;
    iov.memh   = sendbuf.memh();
    status = uct_ep_am_zcopy(sender().ep(0), AM_ID, sendbuf.ptr(), header_length,
                             &iov, 1, 0, comp);
    if (status == UCS_OK || status == UCS_INPROGRESS) {
        ucs_atomic_add32(&am_pending, +1);
    }
    return status;
}

void uct_p2p_mix_test::random_op(const mapped_buffer &sendbuf,
                                 const mapped_buffer &recvbuf)
{
    uct_completion_t comp;
    ucs_status_t status;
    int op;

    op         = ucs::rand() % m_avail_send_funcs.size();
    comp.count = 1;
    comp.func  = completion_callback;

    for (;;) {
        status = (this->*m_avail_send_funcs[op])(sendbuf, recvbuf, &comp);
        if (status == UCS_INPROGRESS) {
            /* coverity[loop_condition] */
            while (comp.count > 0) {
                progress();
            }
            break;
        } else if (status == UCS_ERR_NO_RESOURCE) {
            progress();
            continue;
        } else {
            ASSERT_UCS_OK(status);
            break;
        }
    }
}

void uct_p2p_mix_test::run(unsigned count) {
    if (m_avail_send_funcs.size() == 0) {
        UCS_TEST_SKIP_R("unsupported");
    }
    if (sender().md_attr().cap.access_mem_type != UCS_MEMORY_TYPE_HOST) {
        UCS_TEST_SKIP_R("skipping on non-host memory");
    }

    mapped_buffer sendbuf(m_send_size, 0, sender());
    mapped_buffer recvbuf(m_send_size, 0, receiver());

    for (unsigned i = 0; i < count; ++i) {
        random_op(sendbuf, recvbuf);
    }

    flush();
}

void uct_p2p_mix_test::init() {
    uct_p2p_test::init();
    ucs_status_t status = uct_iface_set_am_handler(receiver().iface(), AM_ID,
                                                   am_callback, NULL,
                                                   UCT_CB_FLAG_ASYNC);
    ASSERT_UCS_OK(status);

    m_send_size = MAX_SIZE;
    if (sender().iface_attr().cap.flags & UCT_IFACE_FLAG_AM_SHORT) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::am_short);
        m_send_size = ucs_min(m_send_size, sender().iface_attr().cap.am.max_short);
    }
    if (sender().iface_attr().cap.flags & UCT_IFACE_FLAG_AM_ZCOPY) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::am_zcopy);
        m_send_size = ucs_min(m_send_size, sender().iface_attr().cap.am.max_zcopy);
    }
    if (sender().iface_attr().cap.flags & UCT_IFACE_FLAG_PUT_SHORT) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::put_short);
        m_send_size = ucs_min(m_send_size, sender().iface_attr().cap.put.max_short);
    }
    if (sender().iface_attr().cap.flags & UCT_IFACE_FLAG_PUT_BCOPY) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::put_bcopy);
        m_send_size = ucs_min(m_send_size, sender().iface_attr().cap.put.max_bcopy);
    }
    if (sender().iface_attr().cap.atomic64.fop_flags & UCS_BIT(UCT_ATOMIC_OP_CSWAP)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::cswap64);
    }
    if (sender().iface_attr().cap.atomic64.fop_flags & UCS_BIT(UCT_ATOMIC_OP_ADD)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint64_t, UCT_ATOMIC_OP_ADD>);
    }
    if (sender().iface_attr().cap.atomic32.fop_flags & UCS_BIT(UCT_ATOMIC_OP_ADD)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint32_t, UCT_ATOMIC_OP_ADD>);
    }
    if (sender().iface_attr().cap.atomic64.fop_flags & UCS_BIT(UCT_ATOMIC_OP_AND)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint64_t, UCT_ATOMIC_OP_AND>);
    }
    if (sender().iface_attr().cap.atomic32.fop_flags & UCS_BIT(UCT_ATOMIC_OP_AND)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint32_t, UCT_ATOMIC_OP_AND>);
    }
    if (sender().iface_attr().cap.atomic64.fop_flags & UCS_BIT(UCT_ATOMIC_OP_OR)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint64_t, UCT_ATOMIC_OP_OR>);
    }
    if (sender().iface_attr().cap.atomic32.fop_flags & UCS_BIT(UCT_ATOMIC_OP_OR)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint32_t, UCT_ATOMIC_OP_OR>);
    }
    if (sender().iface_attr().cap.atomic64.fop_flags & UCS_BIT(UCT_ATOMIC_OP_XOR)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint64_t, UCT_ATOMIC_OP_XOR>);
    }
    if (sender().iface_attr().cap.atomic32.fop_flags & UCS_BIT(UCT_ATOMIC_OP_XOR)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint32_t, UCT_ATOMIC_OP_XOR>);
    }
    if (sender().iface_attr().cap.atomic64.fop_flags & UCS_BIT(UCT_ATOMIC_OP_SWAP)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint64_t, UCT_ATOMIC_OP_SWAP>);
    }
    if (sender().iface_attr().cap.atomic32.fop_flags & UCS_BIT(UCT_ATOMIC_OP_SWAP)) {
        m_avail_send_funcs.push_back(&uct_p2p_mix_test::atomic_fop<uint32_t, UCT_ATOMIC_OP_SWAP>);
    }
}

void uct_p2p_mix_test::cleanup() {
    while (am_pending) {
        progress();
    }
    uct_iface_set_am_handler(receiver().iface(), AM_ID, NULL, NULL, 0);
    uct_p2p_test::cleanup();
}

uint32_t uct_p2p_mix_test::am_pending = 0;

UCS_TEST_P(uct_p2p_mix_test, mix_10000) {
    run(10000);
}

UCT_INSTANTIATE_TEST_CASE(uct_p2p_mix_test)