Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
* Copyright (C) The University of Tennessee and The University
*               of Tennessee Research Foundation. 2016. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include <tools/perf/lib/libperf_int.h>

extern "C" {
#include <ucs/debug/log.h>
#include <ucs/sys/preprocessor.h>
#include <ucs/sys/math.h>
#include <ucs/sys/sys.h>
}

#include <limits>

template <ucx_perf_cmd_t CMD, ucx_perf_test_type_t TYPE, uct_perf_data_layout_t DATA, bool ONESIDED>
class uct_perf_test_runner {
public:

    typedef uint8_t psn_t;

    uct_perf_test_runner(ucx_perf_context_t &perf) :
        m_perf(perf),
        m_max_outstanding(m_perf.params.max_outstanding),
        m_send_b_count(0)

    {
        ucs_assert_always(m_max_outstanding > 0);

        m_completion.count = 1;
        m_completion.func  = NULL;
        m_last_recvd_sn    = 0;

        ucs_status_t status;
        uct_iface_attr_t attr;
        status = uct_iface_query(m_perf.uct.iface, &attr);
        ucs_assert_always(status == UCS_OK);
        if (attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT |
                              UCT_IFACE_FLAG_AM_BCOPY |
                              UCT_IFACE_FLAG_AM_ZCOPY)) {
            status = uct_iface_set_am_handler(m_perf.uct.iface,
                                              UCT_PERF_TEST_AM_ID, am_hander,
                                              (void*)&m_last_recvd_sn, 0);
            ucs_assert_always(status == UCS_OK);
        }
    }

    ~uct_perf_test_runner() {
        uct_iface_set_am_handler(m_perf.uct.iface, UCT_PERF_TEST_AM_ID, NULL,
                                 NULL, 0);
    }

    /**
     * Make uct_iov_t iov[msg_size_cnt] array with pointer elements to
     * original buffer
     */
    static void uct_perf_get_buffer_iov(uct_iov_t *iov, void *buffer,
                                        unsigned header_size, uct_mem_h memh,
                                        const ucx_perf_context_t *perf)
    {
        const size_t iovcnt    = perf->params.msg_size_cnt;
        size_t iov_length_it, iov_it;

        ucs_assert(UCT_PERF_DATA_LAYOUT_ZCOPY == DATA);
        ucs_assert(NULL != perf->params.msg_size_list);
        ucs_assert(iovcnt > 0);
        ucs_assert(perf->params.msg_size_list[0] >= header_size);

        iov_length_it = 0;
        for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
            iov[iov_it].buffer = (char *)buffer + iov_length_it + header_size;
            iov[iov_it].length = perf->params.msg_size_list[iov_it] - header_size;
            iov[iov_it].memh   = memh;
            iov[iov_it].stride = 0;
            iov[iov_it].count  = 1;

            if (perf->params.iov_stride) {
                iov_length_it += perf->params.iov_stride - header_size;
            } else {
                iov_length_it += iov[iov_it].length;
            }

            header_size        = 0; /* should be zero for next iterations */
        }

        ucs_debug("IOV buffer filled by %lu slices with total length %lu",
                  iovcnt, iov_length_it);
    }

    void uct_perf_test_prepare_iov_buffer() {
        if (UCT_PERF_DATA_LAYOUT_ZCOPY == DATA) {
            size_t start_iov_buffer_size = 0;
            if (UCX_PERF_CMD_AM == CMD) {
                start_iov_buffer_size = m_perf.params.am_hdr_size;
            }
            uct_perf_get_buffer_iov(m_perf.uct.iov, m_perf.send_buffer,
                                    start_iov_buffer_size,
                                    m_perf.uct.send_mem.memh,
                                    &m_perf);
        }
    }

    /**
     * Get the length between beginning of the IOV first buffer and the latest byte
     * in the latest IOV buffer.
     */
    size_t uct_perf_get_buffer_extent(const ucx_perf_params_t *params)
    {
        size_t length;

        if ((UCT_PERF_DATA_LAYOUT_ZCOPY == DATA) && params->iov_stride) {
            length = ((params->msg_size_cnt - 1) * params->iov_stride) +
                     params->msg_size_list[params->msg_size_cnt - 1];
        } else {
            length = ucx_perf_get_message_size(params);
        }

        return length;
    }

    inline void set_sn(void *dst_sn,
                       ucs_memory_type_t dst_mem_type,
                       const void *src_sn) const {
        if (ucs_likely(m_perf.allocator->mem_type == UCS_MEMORY_TYPE_HOST)) {
            ucs_assert(dst_mem_type == UCS_MEMORY_TYPE_HOST);
            *reinterpret_cast<psn_t*>(dst_sn) = *reinterpret_cast<const psn_t*>(src_sn);
        }

        m_perf.allocator->memcpy(dst_sn, dst_mem_type,
                                 src_sn, UCS_MEMORY_TYPE_HOST,
                                 sizeof(psn_t));
    }

    inline psn_t get_sn(const volatile void *sn,
                        ucs_memory_type_t mem_type) const {
        if (ucs_likely(mem_type == UCS_MEMORY_TYPE_HOST)) {
            return *reinterpret_cast<const volatile psn_t*>(sn);
        }

        psn_t host_sn;
        m_perf.allocator->memcpy(&host_sn, UCS_MEMORY_TYPE_HOST,
                                 const_cast<const void*>(sn),
                                 mem_type, sizeof(psn_t));
        return host_sn;
    }

    inline void set_recv_sn(void *recv_sn,
                            ucs_memory_type_t recv_mem_type,
                            const void *src_sn) const {
        if (CMD == UCX_PERF_CMD_AM) {
            ucs_assert(&m_last_recvd_sn == recv_sn);
            *(psn_t*)recv_sn = *(const psn_t*)src_sn;
        } else {
            set_sn(recv_sn, recv_mem_type, src_sn);
        }
    }

    inline psn_t get_recv_sn(const volatile void *recv_sn,
                             ucs_memory_type_t recv_mem_type) const {
        if (CMD == UCX_PERF_CMD_AM) {
            /* it has to be updated after AM completion */
            ucs_assert(&m_last_recvd_sn == recv_sn);
            return m_last_recvd_sn;
        } else {
            return get_sn(recv_sn, recv_mem_type);
        }
    }

    void UCS_F_ALWAYS_INLINE progress_responder() {
        if (!ONESIDED) {
            uct_worker_progress(m_perf.uct.worker);
        }
    }

    void UCS_F_ALWAYS_INLINE progress_requestor() {
        uct_worker_progress(m_perf.uct.worker);
    }

    void UCS_F_ALWAYS_INLINE wait_for_window(bool send_window)
    {
        while (send_window && (outstanding() >= m_max_outstanding)) {
            progress_requestor();
        }
    }

    static ucs_status_t am_hander(void *arg, void *data, size_t length,
                                  unsigned flags)
    {
        /* we always assume that buffers provided by TLs are host memory */
        ucs_assert(UCS_CIRCULAR_COMPARE8(*(psn_t*)arg, <=, *(psn_t*)data));
        *(psn_t*)arg = *(psn_t*)data;
        return UCS_OK;
    }

    static size_t pack_cb(void *dest, void *arg)
    {
        uct_perf_test_runner *self = (uct_perf_test_runner *)arg;
        size_t length = ucx_perf_get_message_size(&self->m_perf.params);

        self->m_perf.allocator->memcpy(/* we always assume that buffers
                                        * provided by TLs are host memory */
                                       dest, UCS_MEMORY_TYPE_HOST,
                                       self->m_perf.send_buffer,
                                       self->m_perf.uct.send_mem.mem_type,
                                       length);

        return length;
    }

    static void unpack_cb(void *arg, const void *data, size_t length)
    {
        uct_perf_test_runner *self = (uct_perf_test_runner *)arg;

        self->m_perf.allocator->memcpy(self->m_perf.send_buffer,
                                       self->m_perf.uct.send_mem.mem_type,
                                       /* we always assume that buffers
                                        * provided by TLs are host memory */
                                       data, UCS_MEMORY_TYPE_HOST,
                                       length);
    }

    ucs_status_t UCS_F_ALWAYS_INLINE
    send(uct_ep_h ep, psn_t sn, psn_t prev_sn, void *buffer, unsigned length,
         uint64_t remote_addr, uct_rkey_t rkey, uct_completion_t *comp)
    {
        uint64_t am_short_hdr;
        size_t header_size;
        ssize_t packed_len;

        /* coverity[switch_selector_expr_is_constant] */
        switch (CMD) {
        case UCX_PERF_CMD_AM:
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                am_short_hdr = sn;
                return uct_ep_am_short(ep, UCT_PERF_TEST_AM_ID, am_short_hdr,
                                       (char*)buffer + sizeof(am_short_hdr),
                                       length - sizeof(am_short_hdr));
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                packed_len = uct_ep_am_bcopy(ep, UCT_PERF_TEST_AM_ID, pack_cb,
                                             (void*)this, 0);
                return (packed_len >= 0) ? UCS_OK : (ucs_status_t)packed_len;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                header_size = m_perf.params.am_hdr_size;
                return uct_ep_am_zcopy(ep, UCT_PERF_TEST_AM_ID, buffer, header_size,
                                       m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                       0, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_PUT:
            if (TYPE == UCX_PERF_TEST_TYPE_PINGPONG) {
                /* Put the control word at the latest byte of the IOV message */
                set_sn(UCS_PTR_BYTE_OFFSET(buffer,
                                           uct_perf_get_buffer_extent(&m_perf.params) - 1),
                       m_perf.uct.send_mem.mem_type, &sn);
            }
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_SHORT:
                return uct_ep_put_short(ep, buffer, length, remote_addr, rkey);
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                packed_len = uct_ep_put_bcopy(ep, pack_cb, (void*)this, remote_addr, rkey);
                return (packed_len >= 0) ? UCS_OK : (ucs_status_t)packed_len;
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                return uct_ep_put_zcopy(ep, m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                        remote_addr, rkey, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_GET:
            /* coverity[switch_selector_expr_is_constant] */
            switch (DATA) {
            case UCT_PERF_DATA_LAYOUT_BCOPY:
                return uct_ep_get_bcopy(ep, unpack_cb, (void*)this,
                                        length, remote_addr, rkey, comp);
            case UCT_PERF_DATA_LAYOUT_ZCOPY:
                return uct_ep_get_zcopy(ep, m_perf.uct.iov, m_perf.params.msg_size_cnt,
                                        remote_addr, rkey, comp);
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_ADD:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_post(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn, remote_addr, rkey);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_post(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn, remote_addr, rkey);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_FADD:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_fetch(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn,
                                             (uint32_t*)buffer, remote_addr, rkey, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_fetch(ep, UCT_ATOMIC_OP_ADD, sn - prev_sn,
                                             (uint64_t*)buffer, remote_addr, rkey, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_SWAP:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic32_fetch(ep, UCT_ATOMIC_OP_SWAP, sn,
                                             (uint32_t*)buffer, remote_addr, rkey, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic64_fetch(ep, UCT_ATOMIC_OP_SWAP, sn,
                                             (uint64_t*)buffer, remote_addr, rkey, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_CMD_CSWAP:
            if (length == sizeof(uint32_t)) {
                return uct_ep_atomic_cswap32(ep, prev_sn, sn, remote_addr, rkey,
                                             (uint32_t*)buffer, comp);
            } else if (length == sizeof(uint64_t)) {
                return uct_ep_atomic_cswap64(ep, prev_sn, sn, remote_addr, rkey,
                                             (uint64_t*)buffer, comp);
            } else {
                return UCS_ERR_INVALID_PARAM;
            }
        default:
            return UCS_ERR_INVALID_PARAM;
        }
    }

    void UCS_F_ALWAYS_INLINE
    send_b(uct_ep_h ep, psn_t sn, psn_t prev_sn, void *buffer, unsigned length,
           uint64_t remote_addr, uct_rkey_t rkey, uct_completion_t *comp)
    {
        ucs_status_t status;
        for (;;) {
            status = send(ep, sn, prev_sn, buffer, length, remote_addr, rkey, comp);
            if (ucs_likely(status == UCS_OK)) {
                if ((m_send_b_count++ % N_SEND_B_PER_PROGRESS) == 0) {
                    progress_requestor();
                }
                return;
            } else if (status == UCS_INPROGRESS) {
                ++m_completion.count;
                progress_requestor();
                ucs_assert((comp == NULL) || (outstanding() <= m_max_outstanding));
                return;
            } else if (status == UCS_ERR_NO_RESOURCE) {
                progress_requestor();
                continue;
            } else {
                ucs_error("Failed to send: %s", ucs_status_string(status));
                return;
            }
        };
    }

    ucs_status_t run_pingpong()
    {
        psn_t send_sn, *recv_sn, sn;
        unsigned my_index;
        uct_ep_h ep;
        uint64_t remote_addr;
        uct_rkey_t rkey;
        void *buffer;
        size_t length;

        length = ucx_perf_get_message_size(&m_perf.params);
        ucs_assert(length >= sizeof(psn_t));

        /* coverity[switch_selector_expr_is_constant] */
        switch (CMD) {
        case UCX_PERF_CMD_AM:
            recv_sn = &m_last_recvd_sn;
            break;
        case UCX_PERF_CMD_ADD:
            recv_sn = (psn_t*)m_perf.recv_buffer;
            break;
        case UCX_PERF_CMD_PUT:
            /* since polling on data, must be end of the buffer */
            recv_sn = (psn_t*)m_perf.recv_buffer + length - 1;
            break;
        default:
            ucs_error("Cannot run this test in ping-pong mode");
            return UCS_ERR_INVALID_PARAM;
        }

        uct_perf_test_prepare_iov_buffer();

        sn = std::numeric_limits<uint8_t>::max();
        set_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type, &sn);

        uct_perf_barrier(&m_perf);

        my_index = rte_call(&m_perf, group_index);

        ucx_perf_test_start_clock(&m_perf);

        buffer      = m_perf.send_buffer;
        remote_addr = m_perf.uct.peers[1 - my_index].remote_addr + m_perf.offset;
        rkey        = m_perf.uct.peers[1 - my_index].rkey.rkey;
        ep          = m_perf.uct.peers[1 - my_index].ep;

        send_sn = 0;
        if (my_index == 0) {
            UCX_PERF_TEST_FOREACH(&m_perf) {
                send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                       rkey, NULL);
                ucx_perf_update(&m_perf, 1, length);

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (sn != send_sn);

                ++send_sn;
            }
        } else if (my_index == 1) {
            UCX_PERF_TEST_FOREACH(&m_perf) {
                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (sn != send_sn);

                send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                       rkey, NULL);
                ucx_perf_update(&m_perf, 1, length);
                ++send_sn;
            }
        }

        uct_perf_iface_flush_b(&m_perf);
        ucx_perf_get_time(&m_perf);
        return UCS_OK;
    }

    ucs_status_t run_stream_req_uni(bool flow_control, bool send_window,
                                    bool direction_to_responder)
    {
        unsigned long remote_addr;
        volatile psn_t *recv_sn;
        psn_t sn, send_sn;
        uct_rkey_t rkey;
        void *buffer;
        unsigned fc_window;
        unsigned my_index;
        unsigned length;
        uct_ep_h ep;

        length = ucx_perf_get_message_size(&m_perf.params);
        ucs_assert(length >= sizeof(psn_t));
        ucs_assert(m_perf.params.uct.fc_window <= ((psn_t)-1) / 2);

        m_perf.allocator->memset(m_perf.send_buffer, 0, length);
        m_perf.allocator->memset(m_perf.recv_buffer, 0, length);

        uct_perf_test_prepare_iov_buffer();

        recv_sn  = (direction_to_responder ?
                    ((CMD == UCX_PERF_CMD_AM) ?
                     &m_last_recvd_sn :
                     (psn_t*)m_perf.recv_buffer) :
                    (psn_t*)m_perf.send_buffer);
        my_index = rte_call(&m_perf, group_index);

        uct_perf_barrier(&m_perf);

        ucx_perf_test_start_clock(&m_perf);

        ep          = m_perf.uct.peers[1 - my_index].ep;
        buffer      = m_perf.send_buffer;
        remote_addr = m_perf.uct.peers[1 - my_index].remote_addr + m_perf.offset;
        rkey        = m_perf.uct.peers[1 - my_index].rkey.rkey;
        fc_window   = m_perf.params.uct.fc_window;

        if (my_index == 1) {
            /* send_sn is the next SN to send */
            if (flow_control) {
                send_sn     = 1;
            } else{
                send_sn     = 0; /* Remote buffer will remain 0 throughout the test */
            }

            set_sn(buffer, m_perf.uct.send_mem.mem_type, &send_sn);

            UCX_PERF_TEST_FOREACH(&m_perf) {
                if (flow_control) {
                    /* Wait until getting ACK from responder */
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    ucs_assertv(UCS_CIRCULAR_COMPARE8(send_sn - 1, >=, sn),
                                "recv_sn=%d iters=%ld", sn, m_perf.current.iters);

                    while (UCS_CIRCULAR_COMPARE8(send_sn, >, sn + fc_window)) {
                        progress_responder();
                        sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    }
                }

                /* Wait until we have enough sends completed, then take
                 * the next completion handle in the window. */
                wait_for_window(send_window);

                if (flow_control) {
                    send_b(ep, send_sn, send_sn - 1, buffer, length, remote_addr,
                           rkey, &m_completion);
                    ++send_sn;
                } else {
                    send_b(ep, send_sn, send_sn, buffer, length, remote_addr,
                           rkey, &m_completion);
                }

                ucx_perf_update(&m_perf, 1, length);
            }

            if (!flow_control) {
                sn = 2;
                /* Send "sentinel" value */
                if (direction_to_responder) {
                    wait_for_window(send_window);
                    set_sn(buffer, m_perf.uct.send_mem.mem_type, &sn);
                    send_b(ep, 2, send_sn, buffer, length, remote_addr, rkey,
                           &m_completion);
                } else {
                    set_sn(m_perf.recv_buffer,
                           m_perf.uct.recv_mem.mem_type,
                           &sn);
                }
            } else {
                /* Wait for last ACK, to make sure no more messages will arrive. */
                ucs_assert(direction_to_responder);

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                } while (UCS_CIRCULAR_COMPARE8((psn_t)(send_sn - 1), >, sn));
            }
        } else if (my_index == 0) {
            if (flow_control) {
                /* Since we're doing flow control, we can count exactly how
                 * many packets were received.
                 */
                send_sn = (psn_t)-1; /* Last SN we have sent (as acknowledgment) */
                ucs_assert(direction_to_responder);
                UCX_PERF_TEST_FOREACH(&m_perf) {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);

                    if (UCS_CIRCULAR_COMPARE8(sn, >, (psn_t)(send_sn + (fc_window / 2)))) {
                        /* Send ACK every half-window */
                        wait_for_window(send_window);
                        send_b(ep, sn, send_sn, buffer, length, remote_addr,
                               rkey, &m_completion);
                        send_sn = sn;
                    }

                    /* Calculate number of iterations */
                    m_perf.current.iters +=
                                    (psn_t)(sn - (psn_t)m_perf.current.iters);
                }

                /* Send ACK for last packet */
                sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                if (UCS_CIRCULAR_COMPARE8(sn, >, send_sn)) {
                    wait_for_window(send_window);
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    send_b(ep, sn, send_sn, buffer, length, remote_addr,
                           rkey, &m_completion);
                }
            } else {
                /* Wait for "sentinel" value */
                ucs_time_t poll_time = ucs_get_time();

                do {
                    progress_responder();
                    sn = get_recv_sn(recv_sn, m_perf.uct.recv_mem.mem_type);
                    if (!direction_to_responder) {
                        if (ucs_get_time() > poll_time + ucs_time_from_msec(1.0)) {
                            wait_for_window(send_window);
                            send_b(ep, 0, 0, buffer, length, remote_addr, rkey,
                                   &m_completion);
                            poll_time = ucs_get_time();
                        }
                    }
                } while (sn != 2);
            }
        }

        uct_perf_iface_flush_b(&m_perf);
        ucx_perf_get_time(&m_perf);
        ucs_assert(outstanding() == 0);
        if (my_index == 1) {
            ucx_perf_update(&m_perf, 0, 0);
        }

        return UCS_OK;
    }

    ucs_status_t run()
    {
        bool zcopy = (DATA == UCT_PERF_DATA_LAYOUT_ZCOPY);

        /* coverity[switch_selector_expr_is_constant] */
        switch (TYPE) {
        case UCX_PERF_TEST_TYPE_PINGPONG:
            return run_pingpong();
        case UCX_PERF_TEST_TYPE_STREAM_UNI:
            /* coverity[switch_selector_expr_is_constant] */
            switch (CMD) {
            case UCX_PERF_CMD_PUT:
                return run_stream_req_uni(false, /* No need for flow control for RMA */
                                          zcopy, /* ZCOPY can return INPROGRESS */
                                          true /* data goes to responder */);
            case UCX_PERF_CMD_ADD:
                return run_stream_req_uni(false, /* No need for flow control for RMA */
                                          false, /* This atomic does not wait for reply */
                                          true /* Data goes to responder */);
            case UCX_PERF_CMD_AM:
                return run_stream_req_uni(true, /* Need flow control for active messages,
                                                   because they are handled in SW */
                                          zcopy, /* ZCOPY can return INPROGRESS */
                                          true /* data goes to responder */);
            case UCX_PERF_CMD_GET:
                return run_stream_req_uni(false, /* No flow control for RMA/AMO */
                                          true, /* Waiting for replies */
                                          false /* For GET, data is delivered to requester */ );
            case UCX_PERF_CMD_FADD:
            case UCX_PERF_CMD_SWAP:
            case UCX_PERF_CMD_CSWAP:
                return run_stream_req_uni(false, /* No flow control for RMA/AMO */
                                          true, /* Waiting for replies */
                                          true /* For atomics, data goes both ways, but
                                                     the request is easier to predict */ );
            default:
                return UCS_ERR_INVALID_PARAM;
            }
        case UCX_PERF_TEST_TYPE_STREAM_BI:
        default:
            return UCS_ERR_INVALID_PARAM;
        }
    }

private:
    inline unsigned outstanding() {
        return m_completion.count - 1;
    }

    ucx_perf_context_t &m_perf;
    const unsigned     m_max_outstanding;
    uct_completion_t   m_completion;
    int                m_send_b_count;
    /* this is only valid for UCT AM tests */
    psn_t              m_last_recvd_sn;
    const static int   N_SEND_B_PER_PROGRESS = 16;
};


#define TEST_CASE(_perf, _cmd, _type, _data, _onesided) \
    if (((_perf)->params.command == (_cmd)) && \
        ((_perf)->params.test_type == (_type)) && \
        ((_perf)->params.uct.data_layout == (_data)) && \
        (!!((_perf)->params.flags & UCX_PERF_TEST_FLAG_ONE_SIDED) == !!(_onesided))) \
    { \
        uct_perf_test_runner<_cmd, _type, _data, _onesided> r(*_perf); \
        return r.run(); \
    }
#define TEST_CASE_ALL_OSD(_perf, _case, _data) \
   TEST_CASE(_perf, UCS_PP_TUPLE_0 _case, UCS_PP_TUPLE_1 _case, _data, true) \
   TEST_CASE(_perf, UCS_PP_TUPLE_0 _case, UCS_PP_TUPLE_1 _case, _data, false)
#define TEST_CASE_ALL_DATA(_perf, _case) \
   TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_SHORT) \
   TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_BCOPY) \
   TEST_CASE_ALL_OSD(_perf, _case, UCT_PERF_DATA_LAYOUT_ZCOPY)

ucs_status_t uct_perf_test_dispatch(ucx_perf_context_t *perf)
{
    UCS_PP_FOREACH(TEST_CASE_ALL_DATA, perf,
        (UCX_PERF_CMD_AM,  UCX_PERF_TEST_TYPE_PINGPONG),
        (UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_PINGPONG),
        (UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_PINGPONG),
        (UCX_PERF_CMD_AM,  UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_PUT, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_GET, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_ADD, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_FADD, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_SWAP, UCX_PERF_TEST_TYPE_STREAM_UNI),
        (UCX_PERF_CMD_CSWAP, UCX_PERF_TEST_TYPE_STREAM_UNI)
        );

    ucs_error("Invalid test case");
    return UCS_ERR_INVALID_PARAM;
}