/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
* Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
* Copyright (C) The University of Tennessee and The University
*               of Tennessee Research Foundation. 2015-2016. ALL RIGHTS RESERVED.
* Copyright (C) ARM Ltd. 2017.  ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <ucs/debug/log.h>
#include <ucs/arch/bitops.h>
#include <ucs/sys/module.h>
#include <ucs/sys/string.h>
#include <string.h>
#include <tools/perf/lib/libperf_int.h>
#include <unistd.h>

#if _OPENMP
#include <omp.h>
#endif /* _OPENMP */

#define ATOMIC_OP_CONFIG(_size, _op32, _op64, _op, _msg, _params, _status)        \
    _status = __get_atomic_flag((_size), (_op32), (_op64), (_op));                \
    if (_status != UCS_OK) {                                                      \
        ucs_error("%s/%s does not support atomic %s for message size %zu bytes",  \
                  (_params)->uct.tl_name, (_params)->uct.dev_name,                \
                  (_msg)[_op], (_size));                                          \
        return _status;                                                           \
    }

#define ATOMIC_OP_CHECK(_size, _attr, _required, _params, _msg)                   \
    if (!ucs_test_all_flags(_attr, _required)) {                                  \
        if ((_params)->flags & UCX_PERF_TEST_FLAG_VERBOSE) {                      \
            ucs_error("%s/%s does not support required "#_size"-bit atomic: %s",  \
                      (_params)->uct.tl_name, (_params)->uct.dev_name,            \
                      (_msg)[ucs_ffs64(~(_attr) & (_required))]);                 \
        }                                                                         \
        return UCS_ERR_UNSUPPORTED;                                               \
    }
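/*
 * Usage sketch (illustrative only, mirroring uct_perf_test_check_capabilities()
 * below): ATOMIC_OP_CONFIG() translates the message size into the proper
 * 32/64-bit operation mask and fails for any other size; ATOMIC_OP_CHECK()
 * then verifies the interface capability flags against that mask:
 *
 *   uint64_t atomic_op32 = 0, atomic_op64 = 0;
 *   ATOMIC_OP_CONFIG(message_size, &atomic_op32, &atomic_op64,
 *                    UCT_ATOMIC_OP_ADD, perf_atomic_op, params, status);
 *   ATOMIC_OP_CHECK(32, attr.cap.atomic32.op_flags, atomic_op32, params,
 *                   perf_atomic_op);
 */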

typedef struct {
    union {
        struct {
            size_t     dev_addr_len;
            size_t     iface_addr_len;
            size_t     ep_addr_len;
        } uct;
        struct {
            size_t     addr_len;
        } ucp;
    };
    size_t             rkey_size;
    unsigned long      recv_buffer;
} ucx_perf_ep_info_t;
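/*
 * The header above is sent to all peers during endpoint setup, followed by
 * variable-length payload (see uct_perf_test_setup_endpoints() and
 * ucp_perf_test_setup_endpoints() below):
 *
 *   UCT: [info][packed rkey][device addr][iface addr][ep addr]
 *   UCP: [info][worker address][packed rkey]
 */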


const ucx_perf_allocator_t* ucx_perf_mem_type_allocators[UCS_MEMORY_TYPE_LAST];

static const char *perf_iface_ops[] = {
    [ucs_ilog2(UCT_IFACE_FLAG_AM_SHORT)]         = "am short",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_BCOPY)]         = "am bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_ZCOPY)]         = "am zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_SHORT)]        = "put short",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_BCOPY)]        = "put bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_ZCOPY)]        = "put zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_SHORT)]        = "get short",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_BCOPY)]        = "get bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_ZCOPY)]        = "get zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)] = "peer failure handler",
    [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_IFACE)] = "connect to iface",
    [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_EP)]    = "connect to ep",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_DUP)]           = "full reliability",
    [ucs_ilog2(UCT_IFACE_FLAG_CB_SYNC)]          = "sync callback",
    [ucs_ilog2(UCT_IFACE_FLAG_CB_ASYNC)]         = "async callback",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_SEND_COMP)]  = "send completion event",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV)]       = "tag or active message event",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV_SIG)]   = "signaled message event",
    [ucs_ilog2(UCT_IFACE_FLAG_PENDING)]          = "pending",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_SHORT)]  = "tag eager short",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_BCOPY)]  = "tag eager bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY)]  = "tag eager zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY)]   = "tag rndv zcopy"
};
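/* The table is indexed by the bit position of each capability flag, so
 * ucs_ffs64() of a missing-flags mask maps directly to an operation name. */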

static const char *perf_atomic_op[] = {
     [UCT_ATOMIC_OP_ADD]   = "add",
     [UCT_ATOMIC_OP_AND]   = "and",
     [UCT_ATOMIC_OP_OR]    = "or" ,
     [UCT_ATOMIC_OP_XOR]   = "xor"
};

static const char *perf_atomic_fop[] = {
     [UCT_ATOMIC_OP_ADD]   = "fetch-add",
     [UCT_ATOMIC_OP_AND]   = "fetch-and",
     [UCT_ATOMIC_OP_OR]    = "fetch-or",
     [UCT_ATOMIC_OP_XOR]   = "fetch-xor",
     [UCT_ATOMIC_OP_SWAP]  = "swap",
     [UCT_ATOMIC_OP_CSWAP] = "cswap"
};

/*
 *  This Quickselect routine is based on the algorithm described in
 *  "Numerical recipes in C", Second Edition,
 *  Cambridge University Press, 1992, Section 8.5, ISBN 0-521-43108-5
 *  This code by Nicolas Devillard - 1998. Public domain.
 */
static ucs_time_t __find_median_quick_select(ucs_time_t arr[], int n)
{
    int low, high ;
    int median;
    int middle, ll, hh;

#define ELEM_SWAP(a,b) { register ucs_time_t t=(a);(a)=(b);(b)=t; }

    low = 0 ; high = n-1 ; median = (low + high) / 2;
    for (;;) {
        if (high <= low) /* One element only */
            return arr[median] ;

        if (high == low + 1) {  /* Two elements only */
            if (arr[low] > arr[high])
                ELEM_SWAP(arr[low], arr[high]) ;
            return arr[median] ;
        }

        /* Find median of low, middle and high items; swap into position low */
        middle = (low + high) / 2;
        if (arr[middle] > arr[high])    ELEM_SWAP(arr[middle], arr[high]) ;
        if (arr[low] > arr[high])       ELEM_SWAP(arr[low], arr[high]) ;
        if (arr[middle] > arr[low])     ELEM_SWAP(arr[middle], arr[low]) ;

        /* Swap low item (now in position middle) into position (low+1) */
        ELEM_SWAP(arr[middle], arr[low+1]) ;

        /* Nibble from each end towards middle, swapping items when stuck */
        ll = low + 1;
        hh = high;
        for (;;) {
            do ll++; while (arr[low] > arr[ll]) ;
            do hh--; while (arr[hh]  > arr[low]) ;

            if (hh < ll)
                break;

            ELEM_SWAP(arr[ll], arr[hh]) ;
        }

        /* Swap middle item (in position low) back into correct position */
        ELEM_SWAP(arr[low], arr[hh]) ;

        /* Re-set active partition */
        if (hh <= median)
            low = ll;
        if (hh >= median)
            high = hh - 1;
    }
}

static ucs_status_t
uct_perf_test_alloc_host(const ucx_perf_context_t *perf, size_t length,
                         unsigned flags, uct_allocated_memory_t *alloc_mem)
{
    ucs_status_t status;

    status = uct_iface_mem_alloc(perf->uct.iface, length,
                                 flags, "perftest", alloc_mem);
    if (status != UCS_OK) {
        /* alloc_mem is owned by the caller and must not be freed here */
        ucs_error("failed to allocate memory: %s", ucs_status_string(status));
        return status;
    }

    ucs_assert(alloc_mem->md == perf->uct.md);

    return UCS_OK;
}

static void uct_perf_test_free_host(const ucx_perf_context_t *perf,
                                    uct_allocated_memory_t *alloc_mem)
{
    uct_iface_mem_free(alloc_mem);
}

static void ucx_perf_test_memcpy_host(void *dst, ucs_memory_type_t dst_mem_type,
                                      const void *src, ucs_memory_type_t src_mem_type,
                                      size_t count)
{
    if ((dst_mem_type != UCS_MEMORY_TYPE_HOST) ||
        (src_mem_type != UCS_MEMORY_TYPE_HOST)) {
        ucs_error("wrong memory type passed src - %d, dst - %d",
                  src_mem_type, dst_mem_type);
    } else {
        memcpy(dst, src, count);
    }
}

static ucs_status_t uct_perf_test_alloc_mem(ucx_perf_context_t *perf)
{
    ucx_perf_params_t *params = &perf->params;
    ucs_status_t status;
    unsigned flags;
    size_t buffer_size;

    if ((UCT_PERF_DATA_LAYOUT_ZCOPY == params->uct.data_layout) && params->iov_stride) {
        buffer_size = params->msg_size_cnt * params->iov_stride;
    } else {
        buffer_size = ucx_perf_get_message_size(params);
    }

    /* TODO use params->alignment  */

    flags = (params->flags & UCX_PERF_TEST_FLAG_MAP_NONBLOCK) ?
             UCT_MD_MEM_FLAG_NONBLOCK : 0;
    flags |= UCT_MD_MEM_ACCESS_ALL;

    /* Allocate send buffer memory */
    status = perf->allocator->uct_alloc(perf, buffer_size * params->thread_count,
                                        flags, &perf->uct.send_mem);
    if (status != UCS_OK) {
        goto err;
    }

    perf->send_buffer = perf->uct.send_mem.address;

    /* Allocate receive buffer memory */
    status = perf->allocator->uct_alloc(perf, buffer_size * params->thread_count,
                                        flags, &perf->uct.recv_mem);
    if (status != UCS_OK) {
        goto err_free_send;
    }

    perf->recv_buffer = perf->uct.recv_mem.address;

    /* Allocate IOV datatype memory */
    perf->params.msg_size_cnt = params->msg_size_cnt;
    perf->uct.iov             = malloc(sizeof(*perf->uct.iov) *
                                       perf->params.msg_size_cnt *
                                       params->thread_count);
    if (NULL == perf->uct.iov) {
        status = UCS_ERR_NO_MEMORY;
        ucs_error("Failed allocate send IOV(%lu) buffer: %s",
                  perf->params.msg_size_cnt, ucs_status_string(status));
        goto err_free_recv;
    }

    perf->offset = 0;

    ucs_debug("allocated memory. Send buffer %p, Recv buffer %p",
              perf->send_buffer, perf->recv_buffer);
    return UCS_OK;

err_free_recv:
    perf->allocator->uct_free(perf, &perf->uct.recv_mem);
err_free_send:
    perf->allocator->uct_free(perf, &perf->uct.send_mem);
err:
    return status;
}

static void uct_perf_test_free_mem(ucx_perf_context_t *perf)
{
    perf->allocator->uct_free(perf, &perf->uct.send_mem);
    perf->allocator->uct_free(perf, &perf->uct.recv_mem);
    free(perf->uct.iov);
}

void ucx_perf_test_start_clock(ucx_perf_context_t *perf)
{
    ucs_time_t start_time = ucs_get_time();

    perf->start_time_acc   = ucs_get_accurate_time();
    perf->end_time         = (perf->params.max_time == 0.0) ? UINT64_MAX :
                              ucs_time_from_sec(perf->params.max_time) + start_time;
    perf->prev_time        = start_time;
    perf->prev.time        = start_time;
    perf->prev.time_acc    = perf->start_time_acc;
    perf->current.time_acc = perf->start_time_acc;
}

/* Initialize/reset all parameters that could be modified by the warm-up run */
static void ucx_perf_test_prepare_new_run(ucx_perf_context_t *perf,
                                          ucx_perf_params_t *params)
{
    unsigned i;

    perf->max_iter          = (perf->params.max_iter == 0) ? UINT64_MAX :
                               perf->params.max_iter;
    perf->report_interval   = ucs_time_from_sec(perf->params.report_interval);
    perf->current.time      = 0;
    perf->current.msgs      = 0;
    perf->current.bytes     = 0;
    perf->current.iters     = 0;
    perf->prev.msgs         = 0;
    perf->prev.bytes        = 0;
    perf->prev.iters        = 0;
    perf->timing_queue_head = 0;

    for (i = 0; i < TIMING_QUEUE_SIZE; ++i) {
        perf->timing_queue[i] = 0;
    }
    ucx_perf_test_start_clock(perf);
}

static void ucx_perf_test_init(ucx_perf_context_t *perf,
                               ucx_perf_params_t *params)
{
    unsigned group_index;

    perf->params = *params;
    perf->offset = 0;
    group_index  = rte_call(perf, group_index);

    if (0 == group_index) {
        perf->allocator = ucx_perf_mem_type_allocators[params->send_mem_type];
    } else {
        perf->allocator = ucx_perf_mem_type_allocators[params->recv_mem_type];
    }

    ucx_perf_test_prepare_new_run(perf, params);
}

void ucx_perf_calc_result(ucx_perf_context_t *perf, ucx_perf_result_t *result)
{
    ucs_time_t median;
    double factor;

    if (perf->params.test_type == UCX_PERF_TEST_TYPE_PINGPONG) {
        factor = 2.0;
    } else {
        factor = 1.0;
    }
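    /* In a ping-pong test each iteration is a full round trip: latency is
     * halved, while bandwidth and message rate are doubled by the factor */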

    result->iters = perf->current.iters;
    result->bytes = perf->current.bytes;
    result->elapsed_time = perf->current.time_acc - perf->start_time_acc;

    /* Latency */
    median = __find_median_quick_select(perf->timing_queue, TIMING_QUEUE_SIZE);
    result->latency.typical = ucs_time_to_sec(median) / factor;

    result->latency.moment_average =
        (perf->current.time_acc - perf->prev.time_acc)
        / (perf->current.iters - perf->prev.iters)
        / factor;

    result->latency.total_average =
        (perf->current.time_acc - perf->start_time_acc)
        / perf->current.iters
        / factor;


    /* Bandwidth */

    result->bandwidth.typical = 0.0; /* undefined */

    result->bandwidth.moment_average =
        (perf->current.bytes - perf->prev.bytes) /
        (perf->current.time_acc - perf->prev.time_acc) * factor;

    result->bandwidth.total_average =
        perf->current.bytes /
        (perf->current.time_acc - perf->start_time_acc) * factor;


    /* Packet rate */

    result->msgrate.typical = 0.0; /* undefined */

    result->msgrate.moment_average =
        (perf->current.msgs - perf->prev.msgs) /
        (perf->current.time_acc - perf->prev.time_acc) * factor;

    result->msgrate.total_average =
        perf->current.msgs /
        (perf->current.time_acc - perf->start_time_acc) * factor;

}

static ucs_status_t ucx_perf_test_check_params(ucx_perf_params_t *params)
{
    size_t it;

    /* check if zero-size messages are requested and supported */
    if ((/* they are not supported by: */
         /* - UCT tests, except UCT AM Short/Bcopy */
         (params->api == UCX_PERF_API_UCT) ||
         (/* - UCP RMA and AMO tests */
          (params->api == UCX_PERF_API_UCP) &&
          (params->command != UCX_PERF_CMD_AM) &&
          (params->command != UCX_PERF_CMD_TAG) &&
          (params->command != UCX_PERF_CMD_TAG_SYNC) &&
          (params->command != UCX_PERF_CMD_STREAM))) &&
        ucx_perf_get_message_size(params) < 1) {
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Message size too small, need to be at least 1");
        }
        return UCS_ERR_INVALID_PARAM;
    }

    if ((params->api == UCX_PERF_API_UCP) &&
        ((params->send_mem_type != UCS_MEMORY_TYPE_HOST) ||
         (params->recv_mem_type != UCS_MEMORY_TYPE_HOST)) &&
        ((params->command == UCX_PERF_CMD_PUT) ||
         (params->command == UCX_PERF_CMD_GET) ||
         (params->command == UCX_PERF_CMD_ADD) ||
         (params->command == UCX_PERF_CMD_FADD) ||
         (params->command == UCX_PERF_CMD_SWAP) ||
         (params->command == UCX_PERF_CMD_CSWAP))) {
        /* TODO: remove when support for non-HOST memory types is added */
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("UCP doesn't support RMA/AMO for \"%s\"<->\"%s\" memory types",
                      ucs_memory_type_names[params->send_mem_type],
                      ucs_memory_type_names[params->recv_mem_type]);
        }
        return UCS_ERR_INVALID_PARAM;
    }

    if (params->max_outstanding < 1) {
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("max_outstanding, need to be at least 1");
        }
        return UCS_ERR_INVALID_PARAM;
    }

    /* check if particular message size fit into stride size */
    if (params->iov_stride) {
        for (it = 0; it < params->msg_size_cnt; ++it) {
            if (params->msg_size_list[it] > params->iov_stride) {
                if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                    ucs_error("Buffer size %lu bigger than stride %lu",
                              params->msg_size_list[it], params->iov_stride);
                }
                return UCS_ERR_INVALID_PARAM;
            }
        }
    }

    return UCS_OK;
}

void uct_perf_iface_flush_b(ucx_perf_context_t *perf)
{
    ucs_status_t status;

    do {
        status = uct_iface_flush(perf->uct.iface, 0, NULL);
        uct_worker_progress(perf->uct.worker);
    } while (status == UCS_INPROGRESS);
}

static inline uint64_t __get_flag(uct_perf_data_layout_t layout, uint64_t short_f,
                                  uint64_t bcopy_f, uint64_t zcopy_f)
{
    return (layout == UCT_PERF_DATA_LAYOUT_SHORT) ? short_f :
           (layout == UCT_PERF_DATA_LAYOUT_BCOPY) ? bcopy_f :
           (layout == UCT_PERF_DATA_LAYOUT_ZCOPY) ? zcopy_f :
           0;
}

static inline ucs_status_t __get_atomic_flag(size_t size, uint64_t *op32,
                                             uint64_t *op64, uint64_t op)
{
    if (size == sizeof(uint32_t)) {
        *op32 = UCS_BIT(op);
        return UCS_OK;
    } else if (size == sizeof(uint64_t)) {
        *op64 = UCS_BIT(op);
        return UCS_OK;
    }
    return UCS_ERR_UNSUPPORTED;
}

static inline size_t __get_max_size(uct_perf_data_layout_t layout, size_t short_m,
                                    size_t bcopy_m, uint64_t zcopy_m)
{
    return (layout == UCT_PERF_DATA_LAYOUT_SHORT) ? short_m :
           (layout == UCT_PERF_DATA_LAYOUT_BCOPY) ? bcopy_m :
           (layout == UCT_PERF_DATA_LAYOUT_ZCOPY) ? zcopy_m :
           0;
}

static ucs_status_t uct_perf_test_check_md_support(ucx_perf_params_t *params,
                                                   ucs_memory_type_t mem_type,
                                                   uct_md_attr_t *md_attr)
{
    if ((md_attr->cap.access_mem_type != mem_type) &&
        !(md_attr->cap.reg_mem_types & UCS_BIT(mem_type))) {
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Memory type %s is not supported by %s/%s",
                      ucs_memory_type_names[mem_type],
                      params->uct.tl_name, params->uct.dev_name);
        }
        return UCS_ERR_INVALID_PARAM;
    }
    return UCS_OK;
}

static ucs_status_t uct_perf_test_check_capabilities(ucx_perf_params_t *params,
                                                     uct_iface_h iface, uct_md_h md)
{
    uint64_t required_flags = 0;
    uint64_t atomic_op32    = 0;
    uint64_t atomic_op64    = 0;
    uint64_t atomic_fop32   = 0;
    uint64_t atomic_fop64   = 0;
    uct_md_attr_t md_attr;
    uct_iface_attr_t attr;
    ucs_status_t status;
    size_t min_size, max_size, max_iov, message_size;

    status = uct_md_query(md, &md_attr);
    if (status != UCS_OK) {
        ucs_error("uct_md_query(%s) failed: %s",
                  params->uct.md_name, ucs_status_string(status));
        return status;
    }

    status = uct_iface_query(iface, &attr);
    if (status != UCS_OK) {
        ucs_error("uct_iface_query(%s/%s) failed: %s",
                  params->uct.tl_name, params->uct.dev_name,
                  ucs_status_string(status));
        return status;
    }

    min_size = 0;
    max_iov  = 1;
    message_size = ucx_perf_get_message_size(params);
    switch (params->command) {
    case UCX_PERF_CMD_AM:
        required_flags = __get_flag(params->uct.data_layout, UCT_IFACE_FLAG_AM_SHORT,
                                    UCT_IFACE_FLAG_AM_BCOPY, UCT_IFACE_FLAG_AM_ZCOPY);
        required_flags |= UCT_IFACE_FLAG_CB_SYNC;
        min_size = __get_max_size(params->uct.data_layout, 0, 0,
                                  attr.cap.am.min_zcopy);
        max_size = __get_max_size(params->uct.data_layout, attr.cap.am.max_short,
                                  attr.cap.am.max_bcopy, attr.cap.am.max_zcopy);
        max_iov  = attr.cap.am.max_iov;
        break;
    case UCX_PERF_CMD_PUT:
        required_flags = __get_flag(params->uct.data_layout, UCT_IFACE_FLAG_PUT_SHORT,
                                    UCT_IFACE_FLAG_PUT_BCOPY, UCT_IFACE_FLAG_PUT_ZCOPY);
        min_size = __get_max_size(params->uct.data_layout, 0, 0,
                                  attr.cap.put.min_zcopy);
        max_size = __get_max_size(params->uct.data_layout, attr.cap.put.max_short,
                                  attr.cap.put.max_bcopy, attr.cap.put.max_zcopy);
        max_iov  = attr.cap.put.max_iov;
        break;
    case UCX_PERF_CMD_GET:
        required_flags = __get_flag(params->uct.data_layout, UCT_IFACE_FLAG_GET_SHORT,
                                    UCT_IFACE_FLAG_GET_BCOPY, UCT_IFACE_FLAG_GET_ZCOPY);
        min_size = __get_max_size(params->uct.data_layout, 0, 0,
                                  attr.cap.get.min_zcopy);
        max_size = __get_max_size(params->uct.data_layout, attr.cap.get.max_short,
                                  attr.cap.get.max_bcopy, attr.cap.get.max_zcopy);
        max_iov  = attr.cap.get.max_iov;
        break;
    case UCX_PERF_CMD_ADD:
        ATOMIC_OP_CONFIG(message_size, &atomic_op32, &atomic_op64, UCT_ATOMIC_OP_ADD,
                         perf_atomic_op, params, status);
        max_size = 8;
        break;
    case UCX_PERF_CMD_FADD:
        ATOMIC_OP_CONFIG(message_size, &atomic_fop32, &atomic_fop64, UCT_ATOMIC_OP_ADD,
                         perf_atomic_fop, params, status);
        max_size = 8;
        break;
    case UCX_PERF_CMD_SWAP:
        ATOMIC_OP_CONFIG(message_size, &atomic_fop32, &atomic_fop64, UCT_ATOMIC_OP_SWAP,
                         perf_atomic_fop, params, status);
        max_size = 8;
        break;
    case UCX_PERF_CMD_CSWAP:
        ATOMIC_OP_CONFIG(message_size, &atomic_fop32, &atomic_fop64, UCT_ATOMIC_OP_CSWAP,
                         perf_atomic_fop, params, status);
        max_size = 8;
        break;
    default:
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Invalid test command");
        }
        return UCS_ERR_INVALID_PARAM;
    }

    status = ucx_perf_test_check_params(params);
    if (status != UCS_OK) {
        return status;
    }

    /* check atomics first */
    ATOMIC_OP_CHECK(32, attr.cap.atomic32.op_flags, atomic_op32, params, perf_atomic_op);
    ATOMIC_OP_CHECK(64, attr.cap.atomic64.op_flags, atomic_op64, params, perf_atomic_op);
    ATOMIC_OP_CHECK(32, attr.cap.atomic32.fop_flags, atomic_fop32, params, perf_atomic_fop);
    ATOMIC_OP_CHECK(64, attr.cap.atomic64.fop_flags, atomic_fop64, params, perf_atomic_fop);

    /* check iface flags */
    if (!(atomic_op32 | atomic_op64 | atomic_fop32 | atomic_fop64) &&
        (!ucs_test_all_flags(attr.cap.flags, required_flags) || !required_flags)) {
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("%s/%s does not support operation %s",
                      params->uct.tl_name, params->uct.dev_name,
                      perf_iface_ops[ucs_ffs64(~attr.cap.flags & required_flags)]);
        }
        return UCS_ERR_UNSUPPORTED;
    }

    if (message_size < min_size) {
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Message size (%zu) is smaller than min supported (%zu)",
                      message_size, min_size);
        }
        return UCS_ERR_UNSUPPORTED;
    }

    if (message_size > max_size) {
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Message size (%zu) is larger than max supported (%zu)",
                      message_size, max_size);
        }
        return UCS_ERR_UNSUPPORTED;
    }

    if (params->command == UCX_PERF_CMD_AM) {
        if ((params->uct.data_layout == UCT_PERF_DATA_LAYOUT_SHORT) &&
            (params->am_hdr_size != sizeof(uint64_t)))
        {
            if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("Short AM header size must be 8 bytes");
            }
            return UCS_ERR_INVALID_PARAM;
        }

        if ((params->uct.data_layout == UCT_PERF_DATA_LAYOUT_ZCOPY) &&
            (params->am_hdr_size > attr.cap.am.max_hdr))
        {
            if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("AM header size (%zu) is larger than max supported (%zu)",
                          params->am_hdr_size, attr.cap.am.max_hdr);
            }
            return UCS_ERR_UNSUPPORTED;
        }

        if (params->am_hdr_size > message_size) {
            if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("AM header size (%zu) is larger than message size (%zu)",
                          params->am_hdr_size, message_size);
            }
            return UCS_ERR_INVALID_PARAM;
        }

        if (params->uct.fc_window > UCT_PERF_TEST_MAX_FC_WINDOW) {
            if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("AM flow-control window (%d) too large (should be <= %d)",
                          params->uct.fc_window, UCT_PERF_TEST_MAX_FC_WINDOW);
            }
            return UCS_ERR_INVALID_PARAM;
        }

        if ((params->flags & UCX_PERF_TEST_FLAG_ONE_SIDED) &&
            (params->flags & UCX_PERF_TEST_FLAG_VERBOSE))
        {
            ucs_warn("Running active-message test with on-sided progress");
        }
    }

    if (UCT_PERF_DATA_LAYOUT_ZCOPY == params->uct.data_layout) {
        if (params->msg_size_cnt > max_iov) {
            if ((params->flags & UCX_PERF_TEST_FLAG_VERBOSE) ||
                !params->msg_size_cnt) {
                ucs_error("Wrong number of IOV entries. Requested is %lu, "
                          "should be in the range 1...%lu", params->msg_size_cnt,
                          max_iov);
            }
            return UCS_ERR_UNSUPPORTED;
        }
        /* if msg_size_cnt == 1, the message size was already checked above */
        if ((UCX_PERF_CMD_AM == params->command) && (params->msg_size_cnt > 1)) {
            if (params->am_hdr_size > params->msg_size_list[0]) {
                if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                    ucs_error("AM header size (%lu) larger than the first IOV "
                              "message size (%lu)", params->am_hdr_size,
                              params->msg_size_list[0]);
                }
                return UCS_ERR_INVALID_PARAM;
            }
        }
    }

    status = uct_perf_test_check_md_support(params, params->send_mem_type, &md_attr);
    if (status != UCS_OK) {
        return status;
    }

    status = uct_perf_test_check_md_support(params, params->recv_mem_type, &md_attr);
    if (status != UCS_OK) {
        return status;
    }

    return UCS_OK;
}

static ucs_status_t uct_perf_test_setup_endpoints(ucx_perf_context_t *perf)
{
    const size_t buffer_size = 2048;
    ucx_perf_ep_info_t info, *remote_info;
    unsigned group_size, i, group_index;
    uct_device_addr_t *dev_addr;
    uct_iface_addr_t *iface_addr;
    uct_ep_addr_t *ep_addr;
    uct_iface_attr_t iface_attr;
    uct_md_attr_t md_attr;
    uct_ep_params_t ep_params;
    void *rkey_buffer;
    ucs_status_t status;
    struct iovec vec[5];
    void *buffer;
    void *req;

    buffer = malloc(buffer_size);
    if (buffer == NULL) {
        ucs_error("Failed to allocate RTE buffer");
        status = UCS_ERR_NO_MEMORY;
        goto err;
    }

    status = uct_iface_query(perf->uct.iface, &iface_attr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_iface_query: %s", ucs_status_string(status));
        goto err_free;
    }

    status = uct_md_query(perf->uct.md, &md_attr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_md_query: %s", ucs_status_string(status));
        goto err_free;
    }

    if (md_attr.cap.flags & (UCT_MD_FLAG_ALLOC|UCT_MD_FLAG_REG)) {
        info.rkey_size      = md_attr.rkey_packed_size;
    } else {
        info.rkey_size      = 0;
    }
    info.uct.dev_addr_len   = iface_attr.device_addr_len;
    info.uct.iface_addr_len = iface_attr.iface_addr_len;
    info.uct.ep_addr_len    = iface_attr.ep_addr_len;
    info.recv_buffer        = (uintptr_t)perf->recv_buffer;

    rkey_buffer             = buffer;
    dev_addr                = UCS_PTR_BYTE_OFFSET(rkey_buffer, info.rkey_size);
    iface_addr              = UCS_PTR_BYTE_OFFSET(dev_addr, info.uct.dev_addr_len);
    ep_addr                 = UCS_PTR_BYTE_OFFSET(iface_addr, info.uct.iface_addr_len);
    ucs_assert_always(UCS_PTR_BYTE_OFFSET(ep_addr, info.uct.ep_addr_len) <=
                      UCS_PTR_BYTE_OFFSET(buffer, buffer_size));

    status = uct_iface_get_device_address(perf->uct.iface, dev_addr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_iface_get_device_address: %s",
                  ucs_status_string(status));
        goto err_free;
    }

    status = uct_iface_get_address(perf->uct.iface, iface_addr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_iface_get_address: %s", ucs_status_string(status));
        goto err_free;
    }

    if (info.rkey_size > 0) {
        memset(rkey_buffer, 0, info.rkey_size);
        status = uct_md_mkey_pack(perf->uct.md, perf->uct.recv_mem.memh, rkey_buffer);
        if (status != UCS_OK) {
            ucs_error("Failed to uct_rkey_pack: %s", ucs_status_string(status));
            goto err_free;
        }
    }

    group_size  = rte_call(perf, group_size);
    group_index = rte_call(perf, group_index);

    perf->uct.peers = calloc(group_size, sizeof(*perf->uct.peers));
    if (perf->uct.peers == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto err_free;
    }

    ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    ep_params.iface      = perf->uct.iface;
    if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
        for (i = 0; i < group_size; ++i) {
            if (i == group_index) {
                continue;
            }

            status = uct_ep_create(&ep_params, &perf->uct.peers[i].ep);
            if (status != UCS_OK) {
                ucs_error("Failed to uct_ep_create: %s", ucs_status_string(status));
                goto err_destroy_eps;
            }
            status = uct_ep_get_address(perf->uct.peers[i].ep, ep_addr);
            if (status != UCS_OK) {
                ucs_error("Failed to uct_ep_get_address: %s", ucs_status_string(status));
                goto err_destroy_eps;
            }
        }
    } else if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
        ep_params.field_mask |= UCT_EP_PARAM_FIELD_DEV_ADDR |
                                UCT_EP_PARAM_FIELD_IFACE_ADDR;
    }

    vec[0].iov_base         = &info;
    vec[0].iov_len          = sizeof(info);
    vec[1].iov_base         = buffer;
    vec[1].iov_len          = info.rkey_size + info.uct.dev_addr_len +
                              info.uct.iface_addr_len + info.uct.ep_addr_len;

    rte_call(perf, post_vec, vec, 2, &req);
    rte_call(perf, exchange_vec, req);

    for (i = 0; i < group_size; ++i) {
        if (i == group_index) {
            continue;
        }

        rte_call(perf, recv, i, buffer, buffer_size, req);

        remote_info = buffer;
        rkey_buffer = remote_info + 1;
        dev_addr    = UCS_PTR_BYTE_OFFSET(rkey_buffer, remote_info->rkey_size);
        iface_addr  = UCS_PTR_BYTE_OFFSET(dev_addr, remote_info->uct.dev_addr_len);
        ep_addr     = UCS_PTR_BYTE_OFFSET(iface_addr, remote_info->uct.iface_addr_len);
        perf->uct.peers[i].remote_addr = remote_info->recv_buffer;

        if (!uct_iface_is_reachable(perf->uct.iface, dev_addr,
                                    remote_info->uct.iface_addr_len ?
                                    iface_addr : NULL)) {
            ucs_error("Destination is unreachable");
            status = UCS_ERR_UNREACHABLE;
            goto err_destroy_eps;
        }

        if (remote_info->rkey_size > 0) {
            status = uct_rkey_unpack(perf->uct.cmpt, rkey_buffer,
                                     &perf->uct.peers[i].rkey);
            if (status != UCS_OK) {
                ucs_error("Failed to uct_rkey_unpack: %s", ucs_status_string(status));
                goto err_destroy_eps;
            }
        } else {
            perf->uct.peers[i].rkey.handle = NULL;
            perf->uct.peers[i].rkey.rkey   = UCT_INVALID_RKEY;
        }

        if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
            status = uct_ep_connect_to_ep(perf->uct.peers[i].ep, dev_addr, ep_addr);
        } else if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
            ep_params.dev_addr   = dev_addr;
            ep_params.iface_addr = iface_addr;
            status = uct_ep_create(&ep_params, &perf->uct.peers[i].ep);
        } else {
            status = UCS_ERR_UNSUPPORTED;
        }
        if (status != UCS_OK) {
            ucs_error("Failed to connect endpoint: %s", ucs_status_string(status));
            goto err_destroy_eps;
        }
    }
    uct_perf_iface_flush_b(perf);

    free(buffer);
    uct_perf_barrier(perf);
    return UCS_OK;

err_destroy_eps:
    for (i = 0; i < group_size; ++i) {
        if (perf->uct.peers[i].rkey.rkey != UCT_INVALID_RKEY) {
            uct_rkey_release(perf->uct.cmpt, &perf->uct.peers[i].rkey);
        }
        if (perf->uct.peers[i].ep != NULL) {
            uct_ep_destroy(perf->uct.peers[i].ep);
        }
    }
    free(perf->uct.peers);
err_free:
    free(buffer);
err:
    return status;
}

static void uct_perf_test_cleanup_endpoints(ucx_perf_context_t *perf)
{
    unsigned group_size, group_index, i;

    uct_perf_barrier(perf);

    uct_iface_set_am_handler(perf->uct.iface, UCT_PERF_TEST_AM_ID, NULL, NULL, 0);

    group_size  = rte_call(perf, group_size);
    group_index = rte_call(perf, group_index);

    for (i = 0; i < group_size; ++i) {
        if (i != group_index) {
            if (perf->uct.peers[i].rkey.rkey != UCT_INVALID_RKEY) {
                uct_rkey_release(perf->uct.cmpt, &perf->uct.peers[i].rkey);
            }
            if (perf->uct.peers[i].ep) {
                uct_ep_destroy(perf->uct.peers[i].ep);
            }
        }
    }
    free(perf->uct.peers);
}

static ucs_status_t ucp_perf_test_fill_params(ucx_perf_params_t *params,
                                              ucp_params_t *ucp_params)
{
    ucs_status_t status;
    size_t message_size;

    message_size = ucx_perf_get_message_size(params);
    switch (params->command) {
    case UCX_PERF_CMD_PUT:
    case UCX_PERF_CMD_GET:
        ucp_params->features |= UCP_FEATURE_RMA;
        break;
    case UCX_PERF_CMD_ADD:
    case UCX_PERF_CMD_FADD:
    case UCX_PERF_CMD_SWAP:
    case UCX_PERF_CMD_CSWAP:
        if (message_size == sizeof(uint32_t)) {
            ucp_params->features |= UCP_FEATURE_AMO32;
        } else if (message_size == sizeof(uint64_t)) {
            ucp_params->features |= UCP_FEATURE_AMO64;
        } else {
            if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("Atomic size should be either 32 or 64 bit");
            }
            return UCS_ERR_INVALID_PARAM;
        }

        break;
    case UCX_PERF_CMD_TAG:
    case UCX_PERF_CMD_TAG_SYNC:
        ucp_params->features    |= UCP_FEATURE_TAG;
        ucp_params->field_mask  |= UCP_PARAM_FIELD_REQUEST_SIZE;
        ucp_params->request_size = sizeof(ucp_perf_request_t);
        break;
    case UCX_PERF_CMD_STREAM:
        ucp_params->features    |= UCP_FEATURE_STREAM;
        ucp_params->field_mask  |= UCP_PARAM_FIELD_REQUEST_SIZE;
        ucp_params->request_size = sizeof(ucp_perf_request_t);
        break;
    default:
        if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Invalid test command");
        }
        return UCS_ERR_INVALID_PARAM;
    }

    status = ucx_perf_test_check_params(params);
    if (status != UCS_OK) {
        return status;
    }

    return UCS_OK;
}

static ucs_status_t ucp_perf_test_alloc_iov_mem(ucp_perf_datatype_t datatype,
                                                size_t iovcnt, unsigned thread_count,
                                                ucp_dt_iov_t **iov_p)
{
    ucp_dt_iov_t *iov;

    if (UCP_PERF_DATATYPE_IOV == datatype) {
        iov = malloc(sizeof(*iov) * iovcnt * thread_count);
        if (NULL == iov) {
            ucs_error("Failed allocate IOV buffer with iovcnt=%lu", iovcnt);
            return UCS_ERR_NO_MEMORY;
        }
        *iov_p = iov;
    }

    return UCS_OK;
}

static ucs_status_t
ucp_perf_test_alloc_host(const ucx_perf_context_t *perf, size_t length,
                         void **address_p, ucp_mem_h *memh, int non_blk_flag)
{
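    /* Host allocator for UCP tests: memory is allocated by ucp_mem_map()
     * with UCP_MEM_MAP_ALLOCATE, and the address chosen by UCP is returned
     * via ucp_mem_query() */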
    ucp_mem_map_params_t mem_map_params;
    ucp_mem_attr_t mem_attr;
    ucs_status_t status;

    mem_map_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
                                UCP_MEM_MAP_PARAM_FIELD_LENGTH |
                                UCP_MEM_MAP_PARAM_FIELD_FLAGS;
    mem_map_params.address    = *address_p;
    mem_map_params.length     = length;
    mem_map_params.flags      = UCP_MEM_MAP_ALLOCATE;
    if (perf->params.flags & UCX_PERF_TEST_FLAG_MAP_NONBLOCK) {
        mem_map_params.flags |= non_blk_flag;
    }

    status = ucp_mem_map(perf->ucp.context, &mem_map_params, memh);
    if (status != UCS_OK) {
        goto err;
    }

    mem_attr.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS;
    status = ucp_mem_query(*memh, &mem_attr);
    if (status != UCS_OK) {
        goto err;
    }

    *address_p = mem_attr.address;
    return UCS_OK;

err:
    return status;
}

static void ucp_perf_test_free_host(const ucx_perf_context_t *perf,
                                    void *address, ucp_mem_h memh)
{
    ucs_status_t status;

    status = ucp_mem_unmap(perf->ucp.context, memh);
    if (status != UCS_OK) {
        ucs_warn("ucp_mem_unmap() failed: %s", ucs_status_string(status));
    }
}

static ucs_status_t ucp_perf_test_alloc_mem(ucx_perf_context_t *perf)
{
    ucx_perf_params_t *params = &perf->params;
    ucs_status_t status;
    size_t buffer_size;

    if (params->iov_stride) {
        buffer_size = params->msg_size_cnt * params->iov_stride;
    } else {
        buffer_size = ucx_perf_get_message_size(params);
    }

    /* Allocate send buffer memory */
    perf->send_buffer = NULL;
    status = perf->allocator->ucp_alloc(perf, buffer_size * params->thread_count,
                                        &perf->send_buffer, &perf->ucp.send_memh,
                                        UCP_MEM_MAP_NONBLOCK);
    if (status != UCS_OK) {
        goto err;
    }

    /* Allocate receive buffer memory */
    perf->recv_buffer = NULL;
    status = perf->allocator->ucp_alloc(perf, buffer_size * params->thread_count,
                                        &perf->recv_buffer, &perf->ucp.recv_memh,
                                        0);
    if (status != UCS_OK) {
        goto err_free_send_buffer;
    }

    /* Allocate IOV datatype memory */
    perf->ucp.send_iov = NULL;
    status = ucp_perf_test_alloc_iov_mem(params->ucp.send_datatype,
                                         perf->params.msg_size_cnt,
                                         params->thread_count,
                                         &perf->ucp.send_iov);
    if (UCS_OK != status) {
        goto err_free_buffers;
    }

    perf->ucp.recv_iov = NULL;
    status = ucp_perf_test_alloc_iov_mem(params->ucp.recv_datatype,
                                         perf->params.msg_size_cnt,
                                         params->thread_count,
                                         &perf->ucp.recv_iov);
    if (UCS_OK != status) {
        goto err_free_send_iov_buffers;
    }

    return UCS_OK;

err_free_send_iov_buffers:
    free(perf->ucp.send_iov);
err_free_buffers:
    perf->allocator->ucp_free(perf, perf->recv_buffer, perf->ucp.recv_memh);
err_free_send_buffer:
    perf->allocator->ucp_free(perf, perf->send_buffer, perf->ucp.send_memh);
err:
    return UCS_ERR_NO_MEMORY;
}

static void ucp_perf_test_free_mem(ucx_perf_context_t *perf)
{
    free(perf->ucp.recv_iov);
    free(perf->ucp.send_iov);
    perf->allocator->ucp_free(perf, perf->recv_buffer, perf->ucp.recv_memh);
    perf->allocator->ucp_free(perf, perf->send_buffer, perf->ucp.send_memh);
}

static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf,
                                      unsigned group_size)
{
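    /* Release all rkeys, initiate a non-blocking disconnect on every
     * endpoint, then progress the worker until all disconnect requests
     * complete */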
    ucs_status_ptr_t    *reqs;
    ucp_tag_recv_info_t info;
    ucs_status_t        status;
    unsigned i;

    reqs = calloc(group_size, sizeof(*reqs));
    if (reqs == NULL) {
        ucs_error("Failed to allocate endpoint disconnect requests");
        return;
    }

    for (i = 0; i < group_size; ++i) {
        if (perf->ucp.peers[i].rkey != NULL) {
            ucp_rkey_destroy(perf->ucp.peers[i].rkey);
        }
        if (perf->ucp.peers[i].ep != NULL) {
            reqs[i] = ucp_disconnect_nb(perf->ucp.peers[i].ep);
        }
    }

    for (i = 0; i < group_size; ++i) {
        if (!UCS_PTR_IS_PTR(reqs[i])) {
            continue;
        }

        do {
            ucp_worker_progress(perf->ucp.worker);
            status = ucp_request_test(reqs[i], &info);
        } while (status == UCS_INPROGRESS);
        ucp_request_release(reqs[i]);
    }

    free(reqs);
    free(perf->ucp.peers);
}

static ucs_status_t ucp_perf_test_exchange_status(ucx_perf_context_t *perf,
                                                  ucs_status_t status)
{
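    /* Broadcast the local status and collect every peer's status, so that
     * all processes agree on a failure if any one of them failed */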
    unsigned group_size  = rte_call(perf, group_size);
    ucs_status_t collective_status = status;
    struct iovec vec;
    void *req = NULL;
    unsigned i;

    vec.iov_base = &status;
    vec.iov_len  = sizeof(status);

    rte_call(perf, post_vec, &vec, 1, &req);
    rte_call(perf, exchange_vec, req);
    for (i = 0; i < group_size; ++i) {
        rte_call(perf, recv, i, &status, sizeof(status), req);
        if (status != UCS_OK) {
            collective_status = status;
        }
    }
    return collective_status;
}

static ucs_status_t ucp_perf_test_setup_endpoints(ucx_perf_context_t *perf,
                                                  uint64_t features)
{
    const size_t buffer_size = 2048;
    ucx_perf_ep_info_t info, *remote_info;
    unsigned group_size, i, group_index;
    ucp_address_t *address;
    size_t address_length = 0;
    ucp_ep_params_t ep_params;
    ucs_status_t status;
    struct iovec vec[3];
    void *rkey_buffer;
    void *req = NULL;
    void *buffer;

    group_size  = rte_call(perf, group_size);
    group_index = rte_call(perf, group_index);

    status = ucp_worker_get_address(perf->ucp.worker, &address, &address_length);
    if (status != UCS_OK) {
        if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("ucp_worker_get_address() failed: %s", ucs_status_string(status));
        }
        goto err;
    }

    info.ucp.addr_len  = address_length;
    info.recv_buffer   = (uintptr_t)perf->recv_buffer;

    vec[0].iov_base    = &info;
    vec[0].iov_len     = sizeof(info);
    vec[1].iov_base    = address;
    vec[1].iov_len     = address_length;

    if (features & (UCP_FEATURE_RMA|UCP_FEATURE_AMO32|UCP_FEATURE_AMO64)) {
        status = ucp_rkey_pack(perf->ucp.context, perf->ucp.recv_memh,
                               &rkey_buffer, &info.rkey_size);
        if (status != UCS_OK) {
            if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("ucp_rkey_pack() failed: %s", ucs_status_string(status));
            }
            ucp_worker_release_address(perf->ucp.worker, address);
            goto err;
        }

        vec[2].iov_base = rkey_buffer;
        vec[2].iov_len  = info.rkey_size;
        rte_call(perf, post_vec, vec, 3, &req);
        ucp_rkey_buffer_release(rkey_buffer);
    } else {
        info.rkey_size  = 0;
        rte_call(perf, post_vec, vec, 2, &req);
    }

    ucp_worker_release_address(perf->ucp.worker, address);
    rte_call(perf, exchange_vec, req);

    perf->ucp.peers = calloc(group_size, sizeof(*perf->ucp.peers));
    if (perf->ucp.peers == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto err;
    }

    buffer = malloc(buffer_size);
    if (buffer == NULL) {
        ucs_error("Failed to allocate RTE receive buffer");
        status = UCS_ERR_NO_MEMORY;
        goto err_destroy_eps;
    }

    for (i = 0; i < group_size; ++i) {
        if (i == group_index) {
            continue;
        }

        rte_call(perf, recv, i, buffer, buffer_size, req);

        remote_info = buffer;
        address     = (ucp_address_t*)(remote_info + 1);
        rkey_buffer = UCS_PTR_BYTE_OFFSET(address, remote_info->ucp.addr_len);
        perf->ucp.peers[i].remote_addr = remote_info->recv_buffer;

        ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
        ep_params.address    = address;

        status = ucp_ep_create(perf->ucp.worker, &ep_params, &perf->ucp.peers[i].ep);
        if (status != UCS_OK) {
            if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("ucp_ep_create() failed: %s", ucs_status_string(status));
            }
            goto err_free_buffer;
        }

        if (remote_info->rkey_size > 0) {
            status = ucp_ep_rkey_unpack(perf->ucp.peers[i].ep, rkey_buffer,
                                        &perf->ucp.peers[i].rkey);
            if (status != UCS_OK) {
                if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                    ucs_fatal("ucp_rkey_unpack() failed: %s", ucs_status_string(status));
                }
                goto err_free_buffer;
            }
        } else {
            perf->ucp.peers[i].rkey = NULL;
        }
    }

    free(buffer);

    status = ucp_perf_test_exchange_status(perf, UCS_OK);
    if (status != UCS_OK) {
        ucp_perf_test_destroy_eps(perf, group_size);
        return status;
    }

    /* force wireup completion */
    status = ucp_worker_flush(perf->ucp.worker);
    if (status != UCS_OK) {
        ucs_warn("ucp_worker_flush() failed: %s", ucs_status_string(status));
    }

    return status;

err_free_buffer:
    free(buffer);
err_destroy_eps:
    ucp_perf_test_destroy_eps(perf, group_size);
err:
    (void)ucp_perf_test_exchange_status(perf, status);
    return status;
}

static void ucp_perf_test_cleanup_endpoints(ucx_perf_context_t *perf)
{
    unsigned group_size;

    ucp_perf_barrier(perf);

    group_size  = rte_call(perf, group_size);

    ucp_perf_test_destroy_eps(perf, group_size);
}

static void ucx_perf_set_warmup(ucx_perf_context_t* perf, ucx_perf_params_t* params)
{
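    /* Warm up for at most warmup_iter iterations, capped at ~10% of the
     * requested iteration count, and suppress intermediate reports */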
    perf->max_iter = ucs_min(params->warmup_iter, ucs_div_round_up(params->max_iter, 10));
    perf->report_interval = ULONG_MAX;
}

static ucs_status_t uct_perf_create_md(ucx_perf_context_t *perf)
{
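    /* Iterate over all components and their memory-domain resources until
     * one exposes the requested transport and device, and keep that MD open */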
    uct_component_h *uct_components;
    uct_component_attr_t component_attr;
    uct_tl_resource_desc_t *tl_resources;
    unsigned md_index, num_components;
    unsigned tl_index, num_tl_resources;
    unsigned cmpt_index;
    ucs_status_t status;
    uct_md_h md;
    uct_md_config_t *md_config;


    status = uct_query_components(&uct_components, &num_components);
    if (status != UCS_OK) {
        goto out;
    }

    for (cmpt_index = 0; cmpt_index < num_components; ++cmpt_index) {

        component_attr.field_mask = UCT_COMPONENT_ATTR_FIELD_MD_RESOURCE_COUNT;
        status = uct_component_query(uct_components[cmpt_index], &component_attr);
        if (status != UCS_OK) {
            goto out_release_components_list;
        }

        component_attr.field_mask   = UCT_COMPONENT_ATTR_FIELD_MD_RESOURCES;
        component_attr.md_resources = alloca(sizeof(*component_attr.md_resources) *
                                             component_attr.md_resource_count);
        status = uct_component_query(uct_components[cmpt_index], &component_attr);
        if (status != UCS_OK) {
            goto out_release_components_list;
        }

        for (md_index = 0; md_index < component_attr.md_resource_count; ++md_index) {
            status = uct_md_config_read(uct_components[cmpt_index], NULL, NULL,
                                        &md_config);
            if (status != UCS_OK) {
                goto out_release_components_list;
            }

            ucs_strncpy_zero(perf->params.uct.md_name,
                             component_attr.md_resources[md_index].md_name,
                             UCT_MD_NAME_MAX);

            status = uct_md_open(uct_components[cmpt_index],
                                 component_attr.md_resources[md_index].md_name,
                                 md_config, &md);
            uct_config_release(md_config);
            if (status != UCS_OK) {
                goto out_release_components_list;
            }

            status = uct_md_query_tl_resources(md, &tl_resources, &num_tl_resources);
            if (status != UCS_OK) {
                uct_md_close(md);
                goto out_release_components_list;
            }

            for (tl_index = 0; tl_index < num_tl_resources; ++tl_index) {
                if (!strcmp(perf->params.uct.tl_name,  tl_resources[tl_index].tl_name) &&
                    !strcmp(perf->params.uct.dev_name, tl_resources[tl_index].dev_name))
                {
                    uct_release_tl_resource_list(tl_resources);
                    perf->uct.cmpt = uct_components[cmpt_index];
                    perf->uct.md   = md;
                    status         = UCS_OK;
                    goto out_release_components_list;
                }
            }

            uct_md_close(md);
            uct_release_tl_resource_list(tl_resources);
        }
    }

    ucs_error("Cannot use transport %s on device %s", perf->params.uct.tl_name,
              perf->params.uct.dev_name);
    status = UCS_ERR_NO_DEVICE;

out_release_components_list:
    uct_release_component_list(uct_components);
out:
    return status;
}

void uct_perf_barrier(ucx_perf_context_t *perf)
{
    rte_call(perf, barrier, (void(*)(void*))uct_worker_progress,
             (void*)perf->uct.worker);
}

void ucp_perf_barrier(ucx_perf_context_t *perf)
{
    rte_call(perf, barrier, (void(*)(void*))ucp_worker_progress,
             (void*)perf->ucp.worker);
}

static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf)
{
    ucx_perf_params_t *params = &perf->params;
    uct_iface_config_t *iface_config;
    ucs_status_t status;
    uct_iface_params_t iface_params = {
        .field_mask           = UCT_IFACE_PARAM_FIELD_OPEN_MODE   |
                                UCT_IFACE_PARAM_FIELD_STATS_ROOT  |
                                UCT_IFACE_PARAM_FIELD_RX_HEADROOM |
                                UCT_IFACE_PARAM_FIELD_CPU_MASK,
        .open_mode            = UCT_IFACE_OPEN_MODE_DEVICE,
        .mode.device.tl_name  = params->uct.tl_name,
        .mode.device.dev_name = params->uct.dev_name,
        .stats_root           = ucs_stats_get_root(),
        .rx_headroom          = 0
    };
    UCS_CPU_ZERO(&iface_params.cpu_mask);

    status = ucs_async_context_init(&perf->uct.async, params->async_mode);
    if (status != UCS_OK) {
        goto out;
    }

    status = uct_worker_create(&perf->uct.async, params->thread_mode,
                               &perf->uct.worker);
    if (status != UCS_OK) {
        goto out_cleanup_async;
    }

    status = uct_perf_create_md(perf);
    if (status != UCS_OK) {
        goto out_destroy_worker;
    }

    status = uct_md_iface_config_read(perf->uct.md, params->uct.tl_name, NULL,
                                      NULL, &iface_config);
    if (status != UCS_OK) {
        goto out_destroy_md;
    }

    status = uct_iface_open(perf->uct.md, perf->uct.worker, &iface_params,
                            iface_config, &perf->uct.iface);
    uct_config_release(iface_config);
    if (status != UCS_OK) {
        ucs_error("Failed to open iface: %s", ucs_status_string(status));
        goto out_destroy_md;
    }

    status = uct_perf_test_check_capabilities(params, perf->uct.iface,
                                              perf->uct.md);
    /* sync status across all processes */
    status = ucp_perf_test_exchange_status(perf, status);
    if (status != UCS_OK) {
        goto out_iface_close;
    }

    status = uct_perf_test_alloc_mem(perf);
    if (status != UCS_OK) {
        goto out_iface_close;
    }

    /* Enable progress before `uct_iface_flush` and `uct_worker_progress` are
     * called, to give some transports (ib/ud, tcp) a chance to finish their
     * connection establishment. They may return UCS_INPROGRESS from
     * `uct_iface_flush` while connections are still in progress */
    uct_iface_progress_enable(perf->uct.iface,
                              UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

    status = uct_perf_test_setup_endpoints(perf);
    if (status != UCS_OK) {
        ucs_error("Failed to setup endpoints: %s", ucs_status_string(status));
        goto out_free_mem;
    }

    return UCS_OK;

out_free_mem:
    uct_perf_test_free_mem(perf);
out_iface_close:
    uct_iface_close(perf->uct.iface);
out_destroy_md:
    uct_md_close(perf->uct.md);
out_destroy_worker:
    uct_worker_destroy(perf->uct.worker);
out_cleanup_async:
    ucs_async_context_cleanup(&perf->uct.async);
out:
    return status;
}

static void uct_perf_cleanup(ucx_perf_context_t *perf)
{
    uct_perf_test_cleanup_endpoints(perf);
    uct_perf_test_free_mem(perf);
    uct_iface_close(perf->uct.iface);
    uct_md_close(perf->uct.md);
    uct_worker_destroy(perf->uct.worker);
    ucs_async_context_cleanup(&perf->uct.async);
}

static ucs_status_t ucp_perf_setup(ucx_perf_context_t *perf)
{
    ucp_params_t ucp_params;
    ucp_worker_params_t worker_params;
    ucp_config_t *config;
    ucs_status_t status;

    ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
    ucp_params.features   = 0;

    status = ucp_perf_test_fill_params(&perf->params, &ucp_params);
    if (status != UCS_OK) {
        goto err;
    }

    status = ucp_config_read(NULL, NULL, &config);
    if (status != UCS_OK) {
        goto err;
    }

    status = ucp_init(&ucp_params, config, &perf->ucp.context);
    ucp_config_release(config);
    if (status != UCS_OK) {
        goto err;
    }

    worker_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = perf->params.thread_mode;

    status = ucp_worker_create(perf->ucp.context, &worker_params,
                               &perf->ucp.worker);
    if (status != UCS_OK) {
        goto err_cleanup;
    }

    status = ucp_perf_test_alloc_mem(perf);
    if (status != UCS_OK) {
        ucs_warn("ucp test failed to alocate memory");
        goto err_destroy_worker;
    }

    status = ucp_perf_test_setup_endpoints(perf, ucp_params.features);
    if (status != UCS_OK) {
        if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Failed to setup endpoints: %s", ucs_status_string(status));
        }
        goto err_free_mem;
    }

    return UCS_OK;

err_free_mem:
    ucp_perf_test_free_mem(perf);
err_destroy_worker:
    ucp_worker_destroy(perf->ucp.worker);
err_cleanup:
    ucp_cleanup(perf->ucp.context);
err:
    return status;
}

static void ucp_perf_cleanup(ucx_perf_context_t *perf)
{
    ucp_perf_test_cleanup_endpoints(perf);
    ucp_perf_barrier(perf);
    ucp_perf_test_free_mem(perf);
    ucp_worker_destroy(perf->ucp.worker);
    ucp_cleanup(perf->ucp.context);
}

static struct {
    ucs_status_t (*setup)(ucx_perf_context_t *perf);
    void         (*cleanup)(ucx_perf_context_t *perf);
    ucs_status_t (*run)(ucx_perf_context_t *perf);
    void         (*barrier)(ucx_perf_context_t *perf);
} ucx_perf_funcs[] = {
    [UCX_PERF_API_UCT] = {uct_perf_setup, uct_perf_cleanup,
                          uct_perf_test_dispatch, uct_perf_barrier},
    [UCX_PERF_API_UCP] = {ucp_perf_setup, ucp_perf_cleanup,
                          ucp_perf_test_dispatch, ucp_perf_barrier}
};

static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf,
                                          ucx_perf_result_t* result);

ucs_status_t ucx_perf_run(ucx_perf_params_t *params, ucx_perf_result_t *result)
{
    ucx_perf_context_t *perf;
    ucs_status_t status;

    ucx_perf_global_init();

    if (params->command == UCX_PERF_CMD_LAST) {
        ucs_error("Test is not selected");
        status = UCS_ERR_INVALID_PARAM;
        goto out;
    }

    if ((params->api != UCX_PERF_API_UCT) && (params->api != UCX_PERF_API_UCP)) {
        ucs_error("Invalid test API parameter (should be UCT or UCP)");
        status = UCS_ERR_INVALID_PARAM;
        goto out;
    }

    perf = malloc(sizeof(*perf));
    if (perf == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto out;
    }

    ucx_perf_test_init(perf, params);

    if (perf->allocator == NULL) {
        ucs_error("Unsupported memory types %s<->%s",
                  ucs_memory_type_names[params->send_mem_type],
                  ucs_memory_type_names[params->recv_mem_type]);
        status = UCS_ERR_UNSUPPORTED;
        goto out_free;
    }

    if ((params->api == UCX_PERF_API_UCT) &&
        (perf->allocator->mem_type != UCS_MEMORY_TYPE_HOST)) {
        ucs_warn("UCT tests also copy 2-byte values from %s memory to "
                 "%s memory, which may impact performance results",
                 ucs_memory_type_names[perf->allocator->mem_type],
                 ucs_memory_type_names[UCS_MEMORY_TYPE_HOST]);
    }

    status = perf->allocator->init(perf);
    if (status != UCS_OK) {
        goto out_free;
    }

    status = ucx_perf_funcs[params->api].setup(perf);
    if (status != UCS_OK) {
        goto out_free;
    }

    if (UCS_THREAD_MODE_SINGLE == params->thread_mode) {
        if (params->warmup_iter > 0) {
            ucx_perf_set_warmup(perf, params);
            status = ucx_perf_funcs[params->api].run(perf);
            if (status != UCS_OK) {
                goto out_cleanup;
            }

            ucx_perf_funcs[params->api].barrier(perf);
            ucx_perf_test_prepare_new_run(perf, params);
        }

        /* Run test */
        status = ucx_perf_funcs[params->api].run(perf);
        ucx_perf_funcs[params->api].barrier(perf);
        if (status == UCS_OK) {
            ucx_perf_calc_result(perf, result);
            rte_call(perf, report, result, perf->params.report_arg, 1);
        }
    } else {
        status = ucx_perf_thread_spawn(perf, result);
    }

out_cleanup:
    ucx_perf_funcs[params->api].cleanup(perf);
out_free:
    free(perf);
out:
    return status;
}

#if _OPENMP
/* multiple threads sharing the same worker/iface */

typedef struct {
    pthread_t           pt;
    int                 tid;
    int                 ntid;
    ucs_status_t*       statuses;
    ucx_perf_context_t  perf;
    ucx_perf_result_t   result;
} ucx_perf_thread_context_t;
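/* Each OpenMP thread gets a private copy of the perf context; its send/recv
 * buffers are offset by (tid * message size) within the shared allocation
 * (see ucx_perf_thread_spawn() below). */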


static void* ucx_perf_thread_run_test(void* arg)
{
    ucx_perf_thread_context_t* tctx = (ucx_perf_thread_context_t*) arg;
    ucx_perf_result_t* result = &tctx->result;
    ucx_perf_context_t* perf = &tctx->perf;
    ucx_perf_params_t* params = &perf->params;
    ucs_status_t* statuses = tctx->statuses;
    int tid = tctx->tid;
    int i;

    if (params->warmup_iter > 0) {
        ucx_perf_set_warmup(perf, params);
        statuses[tid] = ucx_perf_funcs[params->api].run(perf);
        ucx_perf_funcs[params->api].barrier(perf);
        for (i = 0; i < tctx->ntid; i++) {
            if (UCS_OK != statuses[i]) {
                goto out;
            }
        }
        ucx_perf_test_prepare_new_run(perf, params);
    }

    /* Run test */
#pragma omp barrier
    statuses[tid] = ucx_perf_funcs[params->api].run(perf);
    ucx_perf_funcs[params->api].barrier(perf);
    for (i = 0; i < tctx->ntid; i++) {
        if (UCS_OK != statuses[i]) {
            goto out;
        }
    }
#pragma omp master
    {
        /* Assuming all threads are treated fairly, report only tid == 0.
         * TODO: aggregate reports from all threads */
        ucx_perf_calc_result(perf, result);
        rte_call(perf, report, result, perf->params.report_arg, 1);
    }

out:
    return &statuses[tid];
}

static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf,
                                          ucx_perf_result_t* result)
{
    ucx_perf_thread_context_t* tctx;
    ucs_status_t* statuses;
    size_t message_size;
    ucs_status_t status;
    int ti, nti;

    message_size = ucx_perf_get_message_size(&perf->params);
    omp_set_num_threads(perf->params.thread_count);
    nti = perf->params.thread_count;

    tctx     = calloc(nti, sizeof(ucx_perf_thread_context_t));
    statuses = calloc(nti, sizeof(ucs_status_t));
    if ((tctx == NULL) || (statuses == NULL)) {
        status = UCS_ERR_NO_MEMORY;
        goto out_free;
    }

#pragma omp parallel private(ti)
{
    ti = omp_get_thread_num();
    tctx[ti].tid = ti;
    tctx[ti].ntid = nti;
    tctx[ti].statuses = statuses;
    tctx[ti].perf = *perf;
    /* Doctor the src and dst buffers to make them thread specific */
    tctx[ti].perf.send_buffer = UCS_PTR_BYTE_OFFSET(tctx[ti].perf.send_buffer,
                                                    ti * message_size);
    tctx[ti].perf.recv_buffer = UCS_PTR_BYTE_OFFSET(tctx[ti].perf.recv_buffer,
                                                    ti * message_size);
    tctx[ti].perf.offset = ti * message_size;
    ucx_perf_thread_run_test((void*)&tctx[ti]);
}

    status = UCS_OK;
    for (ti = 0; ti < nti; ti++) {
        if (UCS_OK != statuses[ti]) {
            ucs_error("Thread %d failed to run test: %s", tctx[ti].tid,
                      ucs_status_string(statuses[ti]));
            status = statuses[ti];
        }
    }

out_free:
    free(statuses);
    free(tctx);
    return status;
}
#else
static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf,
                                          ucx_perf_result_t* result) {
    ucs_error("Invalid test parameter (thread mode requested without OpenMP capabilities)");
    return UCS_ERR_INVALID_PARAM;
}
#endif /* _OPENMP */

void ucx_perf_global_init()
{
    static ucx_perf_allocator_t host_allocator = {
        .mem_type  = UCS_MEMORY_TYPE_HOST,
        .init      = ucs_empty_function_return_success,
        .ucp_alloc = ucp_perf_test_alloc_host,
        .ucp_free  = ucp_perf_test_free_host,
        .uct_alloc = uct_perf_test_alloc_host,
        .uct_free  = uct_perf_test_free_host,
        .memcpy    = ucx_perf_test_memcpy_host,
        .memset    = memset
    };
    UCS_MODULE_FRAMEWORK_DECLARE(ucx_perftest);

    ucx_perf_mem_type_allocators[UCS_MEMORY_TYPE_HOST] = &host_allocator;

    /* FIXME Memtype allocator modules must be loaded into the global scope,
     * otherwise alloc hooks, which use dlsym() to get a pointer to the
     * original function, do not work. Need to use bistro for memtype hooks
     * to fix this.
     */
    UCS_MODULE_FRAMEWORK_LOAD(ucx_perftest, UCS_MODULE_LOAD_FLAG_GLOBAL);
}