/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
 * Copyright (C) The University of Tennessee and The University
 * of Tennessee Research Foundation. 2015-2016. ALL RIGHTS RESERVED.
 * Copyright (C) ARM Ltd. 2017.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

/* The original include directives lost their header names in extraction; the
 * headers below are reconstructed from the symbols this file uses and may not
 * match the upstream list exactly. */
#include "libperf_int.h"        /* ucx_perf_context_t, rte_call(), allocators */
#include <ucs/debug/log.h>      /* ucs_error(), ucs_warn(), ucs_debug() */
#include <ucs/arch/bitops.h>    /* ucs_ilog2(), ucs_ffs64() */
#include <ucs/sys/module.h>     /* UCS_MODULE_FRAMEWORK_DECLARE/LOAD */
#include <ucs/sys/string.h>     /* ucs_strncpy_zero() */
#include <string.h>             /* memcpy(), memset(), strcmp() */
#include <unistd.h>

#if _OPENMP
#include <omp.h>                /* omp_set_num_threads(), omp_get_thread_num() */
#endif /* _OPENMP */


#define ATOMIC_OP_CONFIG(_size, _op32, _op64, _op, _msg, _params, _status) \
    _status = __get_atomic_flag((_size), (_op32), (_op64), (_op)); \
    if (_status != UCS_OK) { \
        ucs_error("%s/%s does not support atomic %s for message size %zu bytes", \
                  (_params)->uct.tl_name, (_params)->uct.dev_name, \
                  (_msg)[_op], (_size)); \
        return _status; \
    }

#define ATOMIC_OP_CHECK(_size, _attr, _required, _params, _msg) \
    if (!ucs_test_all_flags(_attr, _required)) { \
        if ((_params)->flags & UCX_PERF_TEST_FLAG_VERBOSE) { \
            ucs_error("%s/%s does not support required "#_size"-bit atomic: %s", \
                      (_params)->uct.tl_name, (_params)->uct.dev_name, \
                      (_msg)[ucs_ffs64(~(_attr) & (_required))]); \
        } \
        return UCS_ERR_UNSUPPORTED; \
    }

typedef struct {
    union {
        struct {
            size_t     dev_addr_len;
            size_t     iface_addr_len;
            size_t     ep_addr_len;
        } uct;
        struct {
            size_t     addr_len;
        } ucp;
    };
    size_t             rkey_size;
    unsigned long      recv_buffer;
} ucx_perf_ep_info_t;


const ucx_perf_allocator_t* ucx_perf_mem_type_allocators[UCS_MEMORY_TYPE_LAST];

static const char *perf_iface_ops[] = {
    [ucs_ilog2(UCT_IFACE_FLAG_AM_SHORT)]         = "am short",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_BCOPY)]         = "am bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_ZCOPY)]         = "am zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_SHORT)]        = "put short",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_BCOPY)]        = "put bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_PUT_ZCOPY)]        = "put zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_SHORT)]        = "get short",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_BCOPY)]        = "get bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_GET_ZCOPY)]        = "get zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE)] = "peer failure handler",
    [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_IFACE)] = "connect to iface",
    [ucs_ilog2(UCT_IFACE_FLAG_CONNECT_TO_EP)]    = "connect to ep",
    [ucs_ilog2(UCT_IFACE_FLAG_AM_DUP)]           = "full reliability",
    [ucs_ilog2(UCT_IFACE_FLAG_CB_SYNC)]          = "sync callback",
    [ucs_ilog2(UCT_IFACE_FLAG_CB_ASYNC)]         = "async callback",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_SEND_COMP)]  = "send completion event",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV)]       = "tag or active message event",
    [ucs_ilog2(UCT_IFACE_FLAG_EVENT_RECV_SIG)]   = "signaled message event",
    [ucs_ilog2(UCT_IFACE_FLAG_PENDING)]          = "pending",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_SHORT)]  = "tag eager short",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_BCOPY)]  = "tag eager bcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_EAGER_ZCOPY)]  = "tag eager zcopy",
    [ucs_ilog2(UCT_IFACE_FLAG_TAG_RNDV_ZCOPY)]   = "tag rndv zcopy"
};

static const char *perf_atomic_op[] = {
    [UCT_ATOMIC_OP_ADD]   = "add",
    [UCT_ATOMIC_OP_AND]   = "and",
    [UCT_ATOMIC_OP_OR]    = "or",
    [UCT_ATOMIC_OP_XOR]   = "xor"
};

static const char *perf_atomic_fop[] = {
    [UCT_ATOMIC_OP_ADD]   = "fetch-add",
    [UCT_ATOMIC_OP_AND]   = "fetch-and",
    [UCT_ATOMIC_OP_OR]    = "fetch-or",
    [UCT_ATOMIC_OP_XOR]   = "fetch-xor",
    [UCT_ATOMIC_OP_SWAP]  = "swap",
    [UCT_ATOMIC_OP_CSWAP] = "cswap"
};

/*
 * This Quickselect routine is based on the algorithm described in
 * "Numerical recipes in C", Second Edition,
 * Cambridge University Press, 1992, Section 8.5, ISBN 0-521-43108-5
 * This code by Nicolas Devillard - 1998.
Public domain. */ static ucs_time_t __find_median_quick_select(ucs_time_t arr[], int n) { int low, high ; int median; int middle, ll, hh; #define ELEM_SWAP(a,b) { register ucs_time_t t=(a);(a)=(b);(b)=t; } low = 0 ; high = n-1 ; median = (low + high) / 2; for (;;) { if (high <= low) /* One element only */ return arr[median] ; if (high == low + 1) { /* Two elements only */ if (arr[low] > arr[high]) ELEM_SWAP(arr[low], arr[high]) ; return arr[median] ; } /* Find median of low, middle and high items; swap into position low */ middle = (low + high) / 2; if (arr[middle] > arr[high]) ELEM_SWAP(arr[middle], arr[high]) ; if (arr[low] > arr[high]) ELEM_SWAP(arr[low], arr[high]) ; if (arr[middle] > arr[low]) ELEM_SWAP(arr[middle], arr[low]) ; /* Swap low item (now in position middle) into position (low+1) */ ELEM_SWAP(arr[middle], arr[low+1]) ; /* Nibble from each end towards middle, swapping items when stuck */ ll = low + 1; hh = high; for (;;) { do ll++; while (arr[low] > arr[ll]) ; do hh--; while (arr[hh] > arr[low]) ; if (hh < ll) break; ELEM_SWAP(arr[ll], arr[hh]) ; } /* Swap middle item (in position low) back into correct position */ ELEM_SWAP(arr[low], arr[hh]) ; /* Re-set active partition */ if (hh <= median) low = ll; if (hh >= median) high = hh - 1; } } static ucs_status_t uct_perf_test_alloc_host(const ucx_perf_context_t *perf, size_t length, unsigned flags, uct_allocated_memory_t *alloc_mem) { ucs_status_t status; status = uct_iface_mem_alloc(perf->uct.iface, length, flags, "perftest", alloc_mem); if (status != UCS_OK) { ucs_free(alloc_mem); ucs_error("failed to allocate memory: %s", ucs_status_string(status)); return status; } ucs_assert(alloc_mem->md == perf->uct.md); return UCS_OK; } static void uct_perf_test_free_host(const ucx_perf_context_t *perf, uct_allocated_memory_t *alloc_mem) { uct_iface_mem_free(alloc_mem); } static void ucx_perf_test_memcpy_host(void *dst, ucs_memory_type_t dst_mem_type, const void *src, ucs_memory_type_t src_mem_type, size_t count) { if ((dst_mem_type != UCS_MEMORY_TYPE_HOST) || (src_mem_type != UCS_MEMORY_TYPE_HOST)) { ucs_error("wrong memory type passed src - %d, dst - %d", src_mem_type, dst_mem_type); } else { memcpy(dst, src, count); } } static ucs_status_t uct_perf_test_alloc_mem(ucx_perf_context_t *perf) { ucx_perf_params_t *params = &perf->params; ucs_status_t status; unsigned flags; size_t buffer_size; if ((UCT_PERF_DATA_LAYOUT_ZCOPY == params->uct.data_layout) && params->iov_stride) { buffer_size = params->msg_size_cnt * params->iov_stride; } else { buffer_size = ucx_perf_get_message_size(params); } /* TODO use params->alignment */ flags = (params->flags & UCX_PERF_TEST_FLAG_MAP_NONBLOCK) ? 
UCT_MD_MEM_FLAG_NONBLOCK : 0; flags |= UCT_MD_MEM_ACCESS_ALL; /* Allocate send buffer memory */ status = perf->allocator->uct_alloc(perf, buffer_size * params->thread_count, flags, &perf->uct.send_mem); if (status != UCS_OK) { goto err; } perf->send_buffer = perf->uct.send_mem.address; /* Allocate receive buffer memory */ status = perf->allocator->uct_alloc(perf, buffer_size * params->thread_count, flags, &perf->uct.recv_mem); if (status != UCS_OK) { goto err_free_send; } perf->recv_buffer = perf->uct.recv_mem.address; /* Allocate IOV datatype memory */ perf->params.msg_size_cnt = params->msg_size_cnt; perf->uct.iov = malloc(sizeof(*perf->uct.iov) * perf->params.msg_size_cnt * params->thread_count); if (NULL == perf->uct.iov) { status = UCS_ERR_NO_MEMORY; ucs_error("Failed allocate send IOV(%lu) buffer: %s", perf->params.msg_size_cnt, ucs_status_string(status)); goto err_free_recv; } perf->offset = 0; ucs_debug("allocated memory. Send buffer %p, Recv buffer %p", perf->send_buffer, perf->recv_buffer); return UCS_OK; err_free_recv: perf->allocator->uct_free(perf, &perf->uct.recv_mem); err_free_send: perf->allocator->uct_free(perf, &perf->uct.send_mem); err: return status; } static void uct_perf_test_free_mem(ucx_perf_context_t *perf) { perf->allocator->uct_free(perf, &perf->uct.send_mem); perf->allocator->uct_free(perf, &perf->uct.recv_mem); free(perf->uct.iov); } void ucx_perf_test_start_clock(ucx_perf_context_t *perf) { ucs_time_t start_time = ucs_get_time(); perf->start_time_acc = ucs_get_accurate_time(); perf->end_time = (perf->params.max_time == 0.0) ? UINT64_MAX : ucs_time_from_sec(perf->params.max_time) + start_time; perf->prev_time = start_time; perf->prev.time = start_time; perf->prev.time_acc = perf->start_time_acc; perf->current.time_acc = perf->start_time_acc; } /* Initialize/reset all parameters that could be modified by the warm-up run */ static void ucx_perf_test_prepare_new_run(ucx_perf_context_t *perf, ucx_perf_params_t *params) { unsigned i; perf->max_iter = (perf->params.max_iter == 0) ? 
UINT64_MAX : perf->params.max_iter; perf->report_interval = ucs_time_from_sec(perf->params.report_interval); perf->current.time = 0; perf->current.msgs = 0; perf->current.bytes = 0; perf->current.iters = 0; perf->prev.msgs = 0; perf->prev.bytes = 0; perf->prev.iters = 0; perf->timing_queue_head = 0; for (i = 0; i < TIMING_QUEUE_SIZE; ++i) { perf->timing_queue[i] = 0; } ucx_perf_test_start_clock(perf); } static void ucx_perf_test_init(ucx_perf_context_t *perf, ucx_perf_params_t *params) { unsigned group_index; perf->params = *params; perf->offset = 0; group_index = rte_call(perf, group_index); if (0 == group_index) { perf->allocator = ucx_perf_mem_type_allocators[params->send_mem_type]; } else { perf->allocator = ucx_perf_mem_type_allocators[params->recv_mem_type]; } ucx_perf_test_prepare_new_run(perf, params); } void ucx_perf_calc_result(ucx_perf_context_t *perf, ucx_perf_result_t *result) { ucs_time_t median; double factor; if (perf->params.test_type == UCX_PERF_TEST_TYPE_PINGPONG) { factor = 2.0; } else { factor = 1.0; } result->iters = perf->current.iters; result->bytes = perf->current.bytes; result->elapsed_time = perf->current.time_acc - perf->start_time_acc; /* Latency */ median = __find_median_quick_select(perf->timing_queue, TIMING_QUEUE_SIZE); result->latency.typical = ucs_time_to_sec(median) / factor; result->latency.moment_average = (perf->current.time_acc - perf->prev.time_acc) / (perf->current.iters - perf->prev.iters) / factor; result->latency.total_average = (perf->current.time_acc - perf->start_time_acc) / perf->current.iters / factor; /* Bandwidth */ result->bandwidth.typical = 0.0; // Undefined result->bandwidth.moment_average = (perf->current.bytes - perf->prev.bytes) / (perf->current.time_acc - perf->prev.time_acc) * factor; result->bandwidth.total_average = perf->current.bytes / (perf->current.time_acc - perf->start_time_acc) * factor; /* Packet rate */ result->msgrate.typical = 0.0; // Undefined result->msgrate.moment_average = (perf->current.msgs - perf->prev.msgs) / (perf->current.time_acc - perf->prev.time_acc) * factor; result->msgrate.total_average = perf->current.msgs / (perf->current.time_acc - perf->start_time_acc) * factor; } static ucs_status_t ucx_perf_test_check_params(ucx_perf_params_t *params) { size_t it; /* check if zero-size messages are requested and supported */ if ((/* they are not supported by: */ /* - UCT tests, except UCT AM Short/Bcopy */ (params->api == UCX_PERF_API_UCT) || (/* - UCP RMA and AMO tests */ (params->api == UCX_PERF_API_UCP) && (params->command != UCX_PERF_CMD_AM) && (params->command != UCX_PERF_CMD_TAG) && (params->command != UCX_PERF_CMD_TAG_SYNC) && (params->command != UCX_PERF_CMD_STREAM))) && ucx_perf_get_message_size(params) < 1) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Message size too small, need to be at least 1"); } return UCS_ERR_INVALID_PARAM; } if ((params->api == UCX_PERF_API_UCP) && ((params->send_mem_type != UCS_MEMORY_TYPE_HOST) || (params->recv_mem_type != UCS_MEMORY_TYPE_HOST)) && ((params->command == UCX_PERF_CMD_PUT) || (params->command == UCX_PERF_CMD_GET) || (params->command == UCX_PERF_CMD_ADD) || (params->command == UCX_PERF_CMD_FADD) || (params->command == UCX_PERF_CMD_SWAP) || (params->command == UCX_PERF_CMD_CSWAP))) { /* TODO: remove when support for non-HOST memory types will be added */ if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("UCP doesn't support RMA/AMO for \"%s\"<->\"%s\" memory types", ucs_memory_type_names[params->send_mem_type], 
ucs_memory_type_names[params->recv_mem_type]); } return UCS_ERR_INVALID_PARAM; } if (params->max_outstanding < 1) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("max_outstanding, need to be at least 1"); } return UCS_ERR_INVALID_PARAM; } /* check if particular message size fit into stride size */ if (params->iov_stride) { for (it = 0; it < params->msg_size_cnt; ++it) { if (params->msg_size_list[it] > params->iov_stride) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Buffer size %lu bigger than stride %lu", params->msg_size_list[it], params->iov_stride); } return UCS_ERR_INVALID_PARAM; } } } return UCS_OK; } void uct_perf_iface_flush_b(ucx_perf_context_t *perf) { ucs_status_t status; do { status = uct_iface_flush(perf->uct.iface, 0, NULL); uct_worker_progress(perf->uct.worker); } while (status == UCS_INPROGRESS); } static inline uint64_t __get_flag(uct_perf_data_layout_t layout, uint64_t short_f, uint64_t bcopy_f, uint64_t zcopy_f) { return (layout == UCT_PERF_DATA_LAYOUT_SHORT) ? short_f : (layout == UCT_PERF_DATA_LAYOUT_BCOPY) ? bcopy_f : (layout == UCT_PERF_DATA_LAYOUT_ZCOPY) ? zcopy_f : 0; } static inline ucs_status_t __get_atomic_flag(size_t size, uint64_t *op32, uint64_t *op64, uint64_t op) { if (size == sizeof(uint32_t)) { *op32 = UCS_BIT(op); return UCS_OK; } else if (size == sizeof(uint64_t)) { *op64 = UCS_BIT(op); return UCS_OK; } return UCS_ERR_UNSUPPORTED; } static inline size_t __get_max_size(uct_perf_data_layout_t layout, size_t short_m, size_t bcopy_m, uint64_t zcopy_m) { return (layout == UCT_PERF_DATA_LAYOUT_SHORT) ? short_m : (layout == UCT_PERF_DATA_LAYOUT_BCOPY) ? bcopy_m : (layout == UCT_PERF_DATA_LAYOUT_ZCOPY) ? zcopy_m : 0; } static ucs_status_t uct_perf_test_check_md_support(ucx_perf_params_t *params, ucs_memory_type_t mem_type, uct_md_attr_t *md_attr) { if (!(md_attr->cap.access_mem_type == mem_type) && !(md_attr->cap.reg_mem_types & UCS_BIT(mem_type))) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Unsupported memory type %s by %s/%s", ucs_memory_type_names[mem_type], params->uct.tl_name, params->uct.dev_name); return UCS_ERR_INVALID_PARAM; } } return UCS_OK; } static ucs_status_t uct_perf_test_check_capabilities(ucx_perf_params_t *params, uct_iface_h iface, uct_md_h md) { uint64_t required_flags = 0; uint64_t atomic_op32 = 0; uint64_t atomic_op64 = 0; uint64_t atomic_fop32 = 0; uint64_t atomic_fop64 = 0; uct_md_attr_t md_attr; uct_iface_attr_t attr; ucs_status_t status; size_t min_size, max_size, max_iov, message_size; status = uct_md_query(md, &md_attr); if (status != UCS_OK) { ucs_error("uct_md_query(%s) failed: %s", params->uct.md_name, ucs_status_string(status)); return status; } status = uct_iface_query(iface, &attr); if (status != UCS_OK) { ucs_error("uct_iface_query(%s/%s) failed: %s", params->uct.tl_name, params->uct.dev_name, ucs_status_string(status)); return status; } min_size = 0; max_iov = 1; message_size = ucx_perf_get_message_size(params); switch (params->command) { case UCX_PERF_CMD_AM: required_flags = __get_flag(params->uct.data_layout, UCT_IFACE_FLAG_AM_SHORT, UCT_IFACE_FLAG_AM_BCOPY, UCT_IFACE_FLAG_AM_ZCOPY); required_flags |= UCT_IFACE_FLAG_CB_SYNC; min_size = __get_max_size(params->uct.data_layout, 0, 0, attr.cap.am.min_zcopy); max_size = __get_max_size(params->uct.data_layout, attr.cap.am.max_short, attr.cap.am.max_bcopy, attr.cap.am.max_zcopy); max_iov = attr.cap.am.max_iov; break; case UCX_PERF_CMD_PUT: required_flags = __get_flag(params->uct.data_layout, UCT_IFACE_FLAG_PUT_SHORT, 
UCT_IFACE_FLAG_PUT_BCOPY, UCT_IFACE_FLAG_PUT_ZCOPY); min_size = __get_max_size(params->uct.data_layout, 0, 0, attr.cap.put.min_zcopy); max_size = __get_max_size(params->uct.data_layout, attr.cap.put.max_short, attr.cap.put.max_bcopy, attr.cap.put.max_zcopy); max_iov = attr.cap.put.max_iov; break; case UCX_PERF_CMD_GET: required_flags = __get_flag(params->uct.data_layout, UCT_IFACE_FLAG_GET_SHORT, UCT_IFACE_FLAG_GET_BCOPY, UCT_IFACE_FLAG_GET_ZCOPY); min_size = __get_max_size(params->uct.data_layout, 0, 0, attr.cap.get.min_zcopy); max_size = __get_max_size(params->uct.data_layout, attr.cap.get.max_short, attr.cap.get.max_bcopy, attr.cap.get.max_zcopy); max_iov = attr.cap.get.max_iov; break; case UCX_PERF_CMD_ADD: ATOMIC_OP_CONFIG(message_size, &atomic_op32, &atomic_op64, UCT_ATOMIC_OP_ADD, perf_atomic_op, params, status); max_size = 8; break; case UCX_PERF_CMD_FADD: ATOMIC_OP_CONFIG(message_size, &atomic_fop32, &atomic_fop64, UCT_ATOMIC_OP_ADD, perf_atomic_fop, params, status); max_size = 8; break; case UCX_PERF_CMD_SWAP: ATOMIC_OP_CONFIG(message_size, &atomic_fop32, &atomic_fop64, UCT_ATOMIC_OP_SWAP, perf_atomic_fop, params, status); max_size = 8; break; case UCX_PERF_CMD_CSWAP: ATOMIC_OP_CONFIG(message_size, &atomic_fop32, &atomic_fop64, UCT_ATOMIC_OP_CSWAP, perf_atomic_fop, params, status); max_size = 8; break; default: if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Invalid test command"); } return UCS_ERR_INVALID_PARAM; } status = ucx_perf_test_check_params(params); if (status != UCS_OK) { return status; } /* check atomics first */ ATOMIC_OP_CHECK(32, attr.cap.atomic32.op_flags, atomic_op32, params, perf_atomic_op); ATOMIC_OP_CHECK(64, attr.cap.atomic64.op_flags, atomic_op64, params, perf_atomic_op); ATOMIC_OP_CHECK(32, attr.cap.atomic32.fop_flags, atomic_fop32, params, perf_atomic_fop); ATOMIC_OP_CHECK(64, attr.cap.atomic64.fop_flags, atomic_fop64, params, perf_atomic_fop); /* check iface flags */ if (!(atomic_op32 | atomic_op64 | atomic_fop32 | atomic_fop64) && (!ucs_test_all_flags(attr.cap.flags, required_flags) || !required_flags)) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("%s/%s does not support operation %s", params->uct.tl_name, params->uct.dev_name, perf_iface_ops[ucs_ffs64(~attr.cap.flags & required_flags)]); } return UCS_ERR_UNSUPPORTED; } if (message_size < min_size) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Message size (%zu) is smaller than min supported (%zu)", message_size, min_size); } return UCS_ERR_UNSUPPORTED; } if (message_size > max_size) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Message size (%zu) is larger than max supported (%zu)", message_size, max_size); } return UCS_ERR_UNSUPPORTED; } if (params->command == UCX_PERF_CMD_AM) { if ((params->uct.data_layout == UCT_PERF_DATA_LAYOUT_SHORT) && (params->am_hdr_size != sizeof(uint64_t))) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Short AM header size must be 8 bytes"); } return UCS_ERR_INVALID_PARAM; } if ((params->uct.data_layout == UCT_PERF_DATA_LAYOUT_ZCOPY) && (params->am_hdr_size > attr.cap.am.max_hdr)) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("AM header size (%zu) is larger than max supported (%zu)", params->am_hdr_size, attr.cap.am.max_hdr); } return UCS_ERR_UNSUPPORTED; } if (params->am_hdr_size > message_size) { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("AM header size (%zu) is larger than message size (%zu)", params->am_hdr_size, message_size); } return UCS_ERR_INVALID_PARAM; 
        }

        if (params->uct.fc_window > UCT_PERF_TEST_MAX_FC_WINDOW) {
            if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                ucs_error("AM flow-control window (%d) too large (should be <= %d)",
                          params->uct.fc_window, UCT_PERF_TEST_MAX_FC_WINDOW);
            }
            return UCS_ERR_INVALID_PARAM;
        }

        if ((params->flags & UCX_PERF_TEST_FLAG_ONE_SIDED) &&
            (params->flags & UCX_PERF_TEST_FLAG_VERBOSE)) {
            ucs_warn("Running active-message test with one-sided progress");
        }
    }

    if (UCT_PERF_DATA_LAYOUT_ZCOPY == params->uct.data_layout) {
        if (params->msg_size_cnt > max_iov) {
            if ((params->flags & UCX_PERF_TEST_FLAG_VERBOSE) ||
                !params->msg_size_cnt) {
                ucs_error("Wrong number of IOV entries. Requested is %lu, "
                          "should be in the range 1...%lu",
                          params->msg_size_cnt, max_iov);
            }
            return UCS_ERR_UNSUPPORTED;
        }
        /* if msg_size_cnt == 1, the message size was checked above */
        if ((UCX_PERF_CMD_AM == params->command) && (params->msg_size_cnt > 1)) {
            if (params->am_hdr_size > params->msg_size_list[0]) {
                if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) {
                    ucs_error("AM header size (%lu) larger than the first IOV "
                              "message size (%lu)", params->am_hdr_size,
                              params->msg_size_list[0]);
                }
                return UCS_ERR_INVALID_PARAM;
            }
        }
    }

    status = uct_perf_test_check_md_support(params, params->send_mem_type,
                                            &md_attr);
    if (status != UCS_OK) {
        return status;
    }

    status = uct_perf_test_check_md_support(params, params->recv_mem_type,
                                            &md_attr);
    if (status != UCS_OK) {
        return status;
    }

    return UCS_OK;
}

static ucs_status_t uct_perf_test_setup_endpoints(ucx_perf_context_t *perf)
{
    const size_t buffer_size = 2048;
    ucx_perf_ep_info_t info, *remote_info;
    unsigned group_size, i, group_index;
    uct_device_addr_t *dev_addr;
    uct_iface_addr_t *iface_addr;
    uct_ep_addr_t *ep_addr;
    uct_iface_attr_t iface_attr;
    uct_md_attr_t md_attr;
    uct_ep_params_t ep_params;
    void *rkey_buffer;
    ucs_status_t status;
    struct iovec vec[5];
    void *buffer;
    void *req;

    buffer = malloc(buffer_size);
    if (buffer == NULL) {
        ucs_error("Failed to allocate RTE buffer");
        status = UCS_ERR_NO_MEMORY;
        goto err;
    }

    status = uct_iface_query(perf->uct.iface, &iface_attr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_iface_query: %s", ucs_status_string(status));
        goto err_free;
    }

    status = uct_md_query(perf->uct.md, &md_attr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_md_query: %s", ucs_status_string(status));
        goto err_free;
    }

    if (md_attr.cap.flags & (UCT_MD_FLAG_ALLOC|UCT_MD_FLAG_REG)) {
        info.rkey_size = md_attr.rkey_packed_size;
    } else {
        info.rkey_size = 0;
    }
    info.uct.dev_addr_len   = iface_attr.device_addr_len;
    info.uct.iface_addr_len = iface_attr.iface_addr_len;
    info.uct.ep_addr_len    = iface_attr.ep_addr_len;
    info.recv_buffer        = (uintptr_t)perf->recv_buffer;

    /* Local wire-up data is packed into 'buffer' as:
     * [ rkey | device address | iface address | ep address ] */
    rkey_buffer = buffer;
    dev_addr    = UCS_PTR_BYTE_OFFSET(rkey_buffer, info.rkey_size);
    iface_addr  = UCS_PTR_BYTE_OFFSET(dev_addr, info.uct.dev_addr_len);
    ep_addr     = UCS_PTR_BYTE_OFFSET(iface_addr, info.uct.iface_addr_len);
    ucs_assert_always(UCS_PTR_BYTE_OFFSET(ep_addr, info.uct.ep_addr_len) <=
                      UCS_PTR_BYTE_OFFSET(buffer, buffer_size));

    status = uct_iface_get_device_address(perf->uct.iface, dev_addr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_iface_get_device_address: %s",
                  ucs_status_string(status));
        goto err_free;
    }

    status = uct_iface_get_address(perf->uct.iface, iface_addr);
    if (status != UCS_OK) {
        ucs_error("Failed to uct_iface_get_address: %s",
                  ucs_status_string(status));
        goto err_free;
    }

    if (info.rkey_size > 0) {
        memset(rkey_buffer, 0, info.rkey_size);
        status = uct_md_mkey_pack(perf->uct.md, perf->uct.recv_mem.memh,
                                  rkey_buffer);
        if (status != UCS_OK) {
            ucs_error("Failed to uct_md_mkey_pack: %s",
                      ucs_status_string(status));
            goto err_free;
        }
    }

    group_size  = rte_call(perf, group_size);
    group_index = rte_call(perf, group_index);

    perf->uct.peers = calloc(group_size, sizeof(*perf->uct.peers));
    if (perf->uct.peers == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto err_free;
    }

    ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    ep_params.iface      = perf->uct.iface;
    if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
        for (i = 0; i < group_size; ++i) {
            if (i == group_index) {
                continue;
            }
            status = uct_ep_create(&ep_params, &perf->uct.peers[i].ep);
            if (status != UCS_OK) {
                ucs_error("Failed to uct_ep_create: %s",
                          ucs_status_string(status));
                goto err_destroy_eps;
            }
            status = uct_ep_get_address(perf->uct.peers[i].ep, ep_addr);
            if (status != UCS_OK) {
                ucs_error("Failed to uct_ep_get_address: %s",
                          ucs_status_string(status));
                goto err_destroy_eps;
            }
        }
    } else if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) {
        ep_params.field_mask |= UCT_EP_PARAM_FIELD_DEV_ADDR |
                                UCT_EP_PARAM_FIELD_IFACE_ADDR;
    }

    vec[0].iov_base = &info;
    vec[0].iov_len  = sizeof(info);
    vec[1].iov_base = buffer;
    vec[1].iov_len  = info.rkey_size + info.uct.dev_addr_len +
                      info.uct.iface_addr_len + info.uct.ep_addr_len;

    rte_call(perf, post_vec, vec, 2, &req);
    rte_call(perf, exchange_vec, req);

    for (i = 0; i < group_size; ++i) {
        if (i == group_index) {
            continue;
        }

        rte_call(perf, recv, i, buffer, buffer_size, req);

        remote_info = buffer;
        rkey_buffer = remote_info + 1;
        dev_addr    = UCS_PTR_BYTE_OFFSET(rkey_buffer, remote_info->rkey_size);
        iface_addr  = UCS_PTR_BYTE_OFFSET(dev_addr, remote_info->uct.dev_addr_len);
        ep_addr     = UCS_PTR_BYTE_OFFSET(iface_addr, remote_info->uct.iface_addr_len);
        perf->uct.peers[i].remote_addr = remote_info->recv_buffer;

        if (!uct_iface_is_reachable(perf->uct.iface, dev_addr,
                                    remote_info->uct.iface_addr_len ?
iface_addr : NULL)) { ucs_error("Destination is unreachable"); status = UCS_ERR_UNREACHABLE; goto err_destroy_eps; } if (remote_info->rkey_size > 0) { status = uct_rkey_unpack(perf->uct.cmpt, rkey_buffer, &perf->uct.peers[i].rkey); if (status != UCS_OK) { ucs_error("Failed to uct_rkey_unpack: %s", ucs_status_string(status)); goto err_destroy_eps; } } else { perf->uct.peers[i].rkey.handle = NULL; perf->uct.peers[i].rkey.rkey = UCT_INVALID_RKEY; } if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) { status = uct_ep_connect_to_ep(perf->uct.peers[i].ep, dev_addr, ep_addr); } else if (iface_attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) { ep_params.dev_addr = dev_addr; ep_params.iface_addr = iface_addr; status = uct_ep_create(&ep_params, &perf->uct.peers[i].ep); } else { status = UCS_ERR_UNSUPPORTED; } if (status != UCS_OK) { ucs_error("Failed to connect endpoint: %s", ucs_status_string(status)); goto err_destroy_eps; } } uct_perf_iface_flush_b(perf); free(buffer); uct_perf_barrier(perf); return UCS_OK; err_destroy_eps: for (i = 0; i < group_size; ++i) { if (perf->uct.peers[i].rkey.rkey != UCT_INVALID_RKEY) { uct_rkey_release(perf->uct.cmpt, &perf->uct.peers[i].rkey); } if (perf->uct.peers[i].ep != NULL) { uct_ep_destroy(perf->uct.peers[i].ep); } } free(perf->uct.peers); err_free: free(buffer); err: return status; } static void uct_perf_test_cleanup_endpoints(ucx_perf_context_t *perf) { unsigned group_size, group_index, i; uct_perf_barrier(perf); uct_iface_set_am_handler(perf->uct.iface, UCT_PERF_TEST_AM_ID, NULL, NULL, 0); group_size = rte_call(perf, group_size); group_index = rte_call(perf, group_index); for (i = 0; i < group_size; ++i) { if (i != group_index) { if (perf->uct.peers[i].rkey.rkey != UCT_INVALID_RKEY) { uct_rkey_release(perf->uct.cmpt, &perf->uct.peers[i].rkey); } if (perf->uct.peers[i].ep) { uct_ep_destroy(perf->uct.peers[i].ep); } } } free(perf->uct.peers); } static ucs_status_t ucp_perf_test_fill_params(ucx_perf_params_t *params, ucp_params_t *ucp_params) { ucs_status_t status; size_t message_size; message_size = ucx_perf_get_message_size(params); switch (params->command) { case UCX_PERF_CMD_PUT: case UCX_PERF_CMD_GET: ucp_params->features |= UCP_FEATURE_RMA; break; case UCX_PERF_CMD_ADD: case UCX_PERF_CMD_FADD: case UCX_PERF_CMD_SWAP: case UCX_PERF_CMD_CSWAP: if (message_size == sizeof(uint32_t)) { ucp_params->features |= UCP_FEATURE_AMO32; } else if (message_size == sizeof(uint64_t)) { ucp_params->features |= UCP_FEATURE_AMO64; } else { if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Atomic size should be either 32 or 64 bit"); } return UCS_ERR_INVALID_PARAM; } break; case UCX_PERF_CMD_TAG: case UCX_PERF_CMD_TAG_SYNC: ucp_params->features |= UCP_FEATURE_TAG; ucp_params->field_mask |= UCP_PARAM_FIELD_REQUEST_SIZE; ucp_params->request_size = sizeof(ucp_perf_request_t); break; case UCX_PERF_CMD_STREAM: ucp_params->features |= UCP_FEATURE_STREAM; ucp_params->field_mask |= UCP_PARAM_FIELD_REQUEST_SIZE; ucp_params->request_size = sizeof(ucp_perf_request_t); break; default: if (params->flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("Invalid test command"); } return UCS_ERR_INVALID_PARAM; } status = ucx_perf_test_check_params(params); if (status != UCS_OK) { return status; } return UCS_OK; } static ucs_status_t ucp_perf_test_alloc_iov_mem(ucp_perf_datatype_t datatype, size_t iovcnt, unsigned thread_count, ucp_dt_iov_t **iov_p) { ucp_dt_iov_t *iov; if (UCP_PERF_DATATYPE_IOV == datatype) { iov = malloc(sizeof(*iov) * iovcnt * thread_count); if (NULL == iov) { 
ucs_error("Failed allocate IOV buffer with iovcnt=%lu", iovcnt); return UCS_ERR_NO_MEMORY; } *iov_p = iov; } return UCS_OK; } static ucs_status_t ucp_perf_test_alloc_host(const ucx_perf_context_t *perf, size_t length, void **address_p, ucp_mem_h *memh, int non_blk_flag) { ucp_mem_map_params_t mem_map_params; ucp_mem_attr_t mem_attr; ucs_status_t status; mem_map_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH | UCP_MEM_MAP_PARAM_FIELD_FLAGS; mem_map_params.address = *address_p; mem_map_params.length = length; mem_map_params.flags = UCP_MEM_MAP_ALLOCATE; if (perf->params.flags & UCX_PERF_TEST_FLAG_MAP_NONBLOCK) { mem_map_params.flags |= non_blk_flag; } status = ucp_mem_map(perf->ucp.context, &mem_map_params, memh); if (status != UCS_OK) { goto err; } mem_attr.field_mask = UCP_MEM_ATTR_FIELD_ADDRESS; status = ucp_mem_query(*memh, &mem_attr); if (status != UCS_OK) { goto err; } *address_p = mem_attr.address; return UCS_OK; err: return status; } static void ucp_perf_test_free_host(const ucx_perf_context_t *perf, void *address, ucp_mem_h memh) { ucs_status_t status; status = ucp_mem_unmap(perf->ucp.context, memh); if (status != UCS_OK) { ucs_warn("ucp_mem_unmap() failed: %s", ucs_status_string(status)); } } static ucs_status_t ucp_perf_test_alloc_mem(ucx_perf_context_t *perf) { ucx_perf_params_t *params = &perf->params; ucs_status_t status; size_t buffer_size; if (params->iov_stride) { buffer_size = params->msg_size_cnt * params->iov_stride; } else { buffer_size = ucx_perf_get_message_size(params); } /* Allocate send buffer memory */ perf->send_buffer = NULL; status = perf->allocator->ucp_alloc(perf, buffer_size * params->thread_count, &perf->send_buffer, &perf->ucp.send_memh, UCP_MEM_MAP_NONBLOCK); if (status != UCS_OK) { goto err; } /* Allocate receive buffer memory */ perf->recv_buffer = NULL; status = perf->allocator->ucp_alloc(perf, buffer_size * params->thread_count, &perf->recv_buffer, &perf->ucp.recv_memh, 0); if (status != UCS_OK) { goto err_free_send_buffer; } /* Allocate IOV datatype memory */ perf->ucp.send_iov = NULL; status = ucp_perf_test_alloc_iov_mem(params->ucp.send_datatype, perf->params.msg_size_cnt, params->thread_count, &perf->ucp.send_iov); if (UCS_OK != status) { goto err_free_buffers; } perf->ucp.recv_iov = NULL; status = ucp_perf_test_alloc_iov_mem(params->ucp.recv_datatype, perf->params.msg_size_cnt, params->thread_count, &perf->ucp.recv_iov); if (UCS_OK != status) { goto err_free_send_iov_buffers; } return UCS_OK; err_free_send_iov_buffers: free(perf->ucp.send_iov); err_free_buffers: perf->allocator->ucp_free(perf, perf->recv_buffer, perf->ucp.recv_memh); err_free_send_buffer: perf->allocator->ucp_free(perf, perf->send_buffer, perf->ucp.send_memh); err: return UCS_ERR_NO_MEMORY; } static void ucp_perf_test_free_mem(ucx_perf_context_t *perf) { free(perf->ucp.recv_iov); free(perf->ucp.send_iov); perf->allocator->ucp_free(perf, perf->recv_buffer, perf->ucp.recv_memh); perf->allocator->ucp_free(perf, perf->send_buffer, perf->ucp.send_memh); } static void ucp_perf_test_destroy_eps(ucx_perf_context_t* perf, unsigned group_size) { ucs_status_ptr_t *reqs; ucp_tag_recv_info_t info; ucs_status_t status; unsigned i; reqs = calloc(sizeof(*reqs), group_size); for (i = 0; i < group_size; ++i) { if (perf->ucp.peers[i].rkey != NULL) { ucp_rkey_destroy(perf->ucp.peers[i].rkey); } if (perf->ucp.peers[i].ep != NULL) { reqs[i] = ucp_disconnect_nb(perf->ucp.peers[i].ep); } } for (i = 0; i < group_size; ++i) { if (!UCS_PTR_IS_PTR(reqs[i])) { continue; } 
do { ucp_worker_progress(perf->ucp.worker); status = ucp_request_test(reqs[i], &info); } while (status == UCS_INPROGRESS); ucp_request_release(reqs[i]); } free(reqs); free(perf->ucp.peers); } static ucs_status_t ucp_perf_test_exchange_status(ucx_perf_context_t *perf, ucs_status_t status) { unsigned group_size = rte_call(perf, group_size); ucs_status_t collective_status = status; struct iovec vec; void *req = NULL; unsigned i; vec.iov_base = &status; vec.iov_len = sizeof(status); rte_call(perf, post_vec, &vec, 1, &req); rte_call(perf, exchange_vec, req); for (i = 0; i < group_size; ++i) { rte_call(perf, recv, i, &status, sizeof(status), req); if (status != UCS_OK) { collective_status = status; } } return collective_status; } static ucs_status_t ucp_perf_test_setup_endpoints(ucx_perf_context_t *perf, uint64_t features) { const size_t buffer_size = 2048; ucx_perf_ep_info_t info, *remote_info; unsigned group_size, i, group_index; ucp_address_t *address; size_t address_length = 0; ucp_ep_params_t ep_params; ucs_status_t status; struct iovec vec[3]; void *rkey_buffer; void *req = NULL; void *buffer; group_size = rte_call(perf, group_size); group_index = rte_call(perf, group_index); status = ucp_worker_get_address(perf->ucp.worker, &address, &address_length); if (status != UCS_OK) { if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("ucp_worker_get_address() failed: %s", ucs_status_string(status)); } goto err; } info.ucp.addr_len = address_length; info.recv_buffer = (uintptr_t)perf->recv_buffer; vec[0].iov_base = &info; vec[0].iov_len = sizeof(info); vec[1].iov_base = address; vec[1].iov_len = address_length; if (features & (UCP_FEATURE_RMA|UCP_FEATURE_AMO32|UCP_FEATURE_AMO64)) { status = ucp_rkey_pack(perf->ucp.context, perf->ucp.recv_memh, &rkey_buffer, &info.rkey_size); if (status != UCS_OK) { if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("ucp_rkey_pack() failed: %s", ucs_status_string(status)); } ucp_worker_release_address(perf->ucp.worker, address); goto err; } vec[2].iov_base = rkey_buffer; vec[2].iov_len = info.rkey_size; rte_call(perf, post_vec, vec, 3, &req); ucp_rkey_buffer_release(rkey_buffer); } else { info.rkey_size = 0; rte_call(perf, post_vec, vec, 2, &req); } ucp_worker_release_address(perf->ucp.worker, address); rte_call(perf, exchange_vec, req); perf->ucp.peers = calloc(group_size, sizeof(*perf->ucp.peers)); if (perf->ucp.peers == NULL) { goto err; } buffer = malloc(buffer_size); if (buffer == NULL) { ucs_error("Failed to allocate RTE receive buffer"); status = UCS_ERR_NO_MEMORY; goto err_destroy_eps; } for (i = 0; i < group_size; ++i) { if (i == group_index) { continue; } rte_call(perf, recv, i, buffer, buffer_size, req); remote_info = buffer; address = (ucp_address_t*)(remote_info + 1); rkey_buffer = UCS_PTR_BYTE_OFFSET(address, remote_info->ucp.addr_len); perf->ucp.peers[i].remote_addr = remote_info->recv_buffer; ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS; ep_params.address = address; status = ucp_ep_create(perf->ucp.worker, &ep_params, &perf->ucp.peers[i].ep); if (status != UCS_OK) { if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_error("ucp_ep_create() failed: %s", ucs_status_string(status)); } goto err_free_buffer; } if (remote_info->rkey_size > 0) { status = ucp_ep_rkey_unpack(perf->ucp.peers[i].ep, rkey_buffer, &perf->ucp.peers[i].rkey); if (status != UCS_OK) { if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) { ucs_fatal("ucp_rkey_unpack() failed: %s", ucs_status_string(status)); } goto err_free_buffer; } } 
else { perf->ucp.peers[i].rkey = NULL; } } free(buffer); status = ucp_perf_test_exchange_status(perf, UCS_OK); if (status != UCS_OK) { ucp_perf_test_destroy_eps(perf, group_size); } /* force wireup completion */ status = ucp_worker_flush(perf->ucp.worker); if (status != UCS_OK) { ucs_warn("ucp_worker_flush() failed: %s", ucs_status_string(status)); } return status; err_free_buffer: free(buffer); err_destroy_eps: ucp_perf_test_destroy_eps(perf, group_size); err: (void)ucp_perf_test_exchange_status(perf, status); return status; } static void ucp_perf_test_cleanup_endpoints(ucx_perf_context_t *perf) { unsigned group_size; ucp_perf_barrier(perf); group_size = rte_call(perf, group_size); ucp_perf_test_destroy_eps(perf, group_size); } static void ucx_perf_set_warmup(ucx_perf_context_t* perf, ucx_perf_params_t* params) { perf->max_iter = ucs_min(params->warmup_iter, ucs_div_round_up(params->max_iter, 10)); perf->report_interval = ULONG_MAX; } static ucs_status_t uct_perf_create_md(ucx_perf_context_t *perf) { uct_component_h *uct_components; uct_component_attr_t component_attr; uct_tl_resource_desc_t *tl_resources; unsigned md_index, num_components; unsigned tl_index, num_tl_resources; unsigned cmpt_index; ucs_status_t status; uct_md_h md; uct_md_config_t *md_config; status = uct_query_components(&uct_components, &num_components); if (status != UCS_OK) { goto out; } for (cmpt_index = 0; cmpt_index < num_components; ++cmpt_index) { component_attr.field_mask = UCT_COMPONENT_ATTR_FIELD_MD_RESOURCE_COUNT; status = uct_component_query(uct_components[cmpt_index], &component_attr); if (status != UCS_OK) { goto out_release_components_list; } component_attr.field_mask = UCT_COMPONENT_ATTR_FIELD_MD_RESOURCES; component_attr.md_resources = alloca(sizeof(*component_attr.md_resources) * component_attr.md_resource_count); status = uct_component_query(uct_components[cmpt_index], &component_attr); if (status != UCS_OK) { goto out_release_components_list; } for (md_index = 0; md_index < component_attr.md_resource_count; ++md_index) { status = uct_md_config_read(uct_components[cmpt_index], NULL, NULL, &md_config); if (status != UCS_OK) { goto out_release_components_list; } ucs_strncpy_zero(perf->params.uct.md_name, component_attr.md_resources[md_index].md_name, UCT_MD_NAME_MAX); status = uct_md_open(uct_components[cmpt_index], component_attr.md_resources[md_index].md_name, md_config, &md); uct_config_release(md_config); if (status != UCS_OK) { goto out_release_components_list; } status = uct_md_query_tl_resources(md, &tl_resources, &num_tl_resources); if (status != UCS_OK) { uct_md_close(md); goto out_release_components_list; } for (tl_index = 0; tl_index < num_tl_resources; ++tl_index) { if (!strcmp(perf->params.uct.tl_name, tl_resources[tl_index].tl_name) && !strcmp(perf->params.uct.dev_name, tl_resources[tl_index].dev_name)) { uct_release_tl_resource_list(tl_resources); perf->uct.cmpt = uct_components[cmpt_index]; perf->uct.md = md; status = UCS_OK; goto out_release_components_list; } } uct_md_close(md); uct_release_tl_resource_list(tl_resources); } } ucs_error("Cannot use transport %s on device %s", perf->params.uct.tl_name, perf->params.uct.dev_name); status = UCS_ERR_NO_DEVICE; out_release_components_list: uct_release_component_list(uct_components); out: return status; } void uct_perf_barrier(ucx_perf_context_t *perf) { rte_call(perf, barrier, (void(*)(void*))uct_worker_progress, (void*)perf->uct.worker); } void ucp_perf_barrier(ucx_perf_context_t *perf) { rte_call(perf, barrier, 
             (void(*)(void*))ucp_worker_progress, (void*)perf->ucp.worker);
}

static ucs_status_t uct_perf_setup(ucx_perf_context_t *perf)
{
    ucx_perf_params_t *params = &perf->params;
    uct_iface_config_t *iface_config;
    ucs_status_t status;
    uct_iface_params_t iface_params = {
        .field_mask           = UCT_IFACE_PARAM_FIELD_OPEN_MODE   |
                                UCT_IFACE_PARAM_FIELD_STATS_ROOT  |
                                UCT_IFACE_PARAM_FIELD_RX_HEADROOM |
                                UCT_IFACE_PARAM_FIELD_CPU_MASK,
        .open_mode            = UCT_IFACE_OPEN_MODE_DEVICE,
        .mode.device.tl_name  = params->uct.tl_name,
        .mode.device.dev_name = params->uct.dev_name,
        .stats_root           = ucs_stats_get_root(),
        .rx_headroom          = 0
    };
    UCS_CPU_ZERO(&iface_params.cpu_mask);

    status = ucs_async_context_init(&perf->uct.async, params->async_mode);
    if (status != UCS_OK) {
        goto out;
    }

    status = uct_worker_create(&perf->uct.async, params->thread_mode,
                               &perf->uct.worker);
    if (status != UCS_OK) {
        goto out_cleanup_async;
    }

    status = uct_perf_create_md(perf);
    if (status != UCS_OK) {
        goto out_destroy_worker;
    }

    status = uct_md_iface_config_read(perf->uct.md, params->uct.tl_name, NULL,
                                      NULL, &iface_config);
    if (status != UCS_OK) {
        goto out_destroy_md;
    }

    status = uct_iface_open(perf->uct.md, perf->uct.worker, &iface_params,
                            iface_config, &perf->uct.iface);
    uct_config_release(iface_config);
    if (status != UCS_OK) {
        ucs_error("Failed to open iface: %s", ucs_status_string(status));
        goto out_destroy_md;
    }

    status = uct_perf_test_check_capabilities(params, perf->uct.iface,
                                              perf->uct.md);
    /* sync status across all processes */
    status = ucp_perf_test_exchange_status(perf, status);
    if (status != UCS_OK) {
        goto out_iface_close;
    }

    status = uct_perf_test_alloc_mem(perf);
    if (status != UCS_OK) {
        goto out_iface_close;
    }

    /* Enable progress before `uct_iface_flush` and `uct_worker_progress` are
     * called, to give a chance to finish connection for some transports
     * (ib/ud, tcp). They may return UCS_INPROGRESS from `uct_iface_flush`
     * when connections are in progress */
    uct_iface_progress_enable(perf->uct.iface,
                              UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);

    status = uct_perf_test_setup_endpoints(perf);
    if (status != UCS_OK) {
        ucs_error("Failed to setup endpoints: %s", ucs_status_string(status));
        goto out_free_mem;
    }

    return UCS_OK;

out_free_mem:
    uct_perf_test_free_mem(perf);
out_iface_close:
    uct_iface_close(perf->uct.iface);
out_destroy_md:
    uct_md_close(perf->uct.md);
out_destroy_worker:
    uct_worker_destroy(perf->uct.worker);
out_cleanup_async:
    ucs_async_context_cleanup(&perf->uct.async);
out:
    return status;
}

static void uct_perf_cleanup(ucx_perf_context_t *perf)
{
    uct_perf_test_cleanup_endpoints(perf);
    uct_perf_test_free_mem(perf);
    uct_iface_close(perf->uct.iface);
    uct_md_close(perf->uct.md);
    uct_worker_destroy(perf->uct.worker);
    ucs_async_context_cleanup(&perf->uct.async);
}

static ucs_status_t ucp_perf_setup(ucx_perf_context_t *perf)
{
    ucp_params_t ucp_params;
    ucp_worker_params_t worker_params;
    ucp_config_t *config;
    ucs_status_t status;

    ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
    ucp_params.features   = 0;

    status = ucp_perf_test_fill_params(&perf->params, &ucp_params);
    if (status != UCS_OK) {
        goto err;
    }

    status = ucp_config_read(NULL, NULL, &config);
    if (status != UCS_OK) {
        goto err;
    }

    status = ucp_init(&ucp_params, config, &perf->ucp.context);
    ucp_config_release(config);
    if (status != UCS_OK) {
        goto err;
    }

    worker_params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    worker_params.thread_mode = perf->params.thread_mode;

    status = ucp_worker_create(perf->ucp.context, &worker_params,
                               &perf->ucp.worker);
    if (status != UCS_OK) {
        goto err_cleanup;
    }

    status = ucp_perf_test_alloc_mem(perf);
    if (status != UCS_OK) {
        ucs_warn("ucp test failed to allocate memory");
        goto err_destroy_worker;
    }

    status = ucp_perf_test_setup_endpoints(perf, ucp_params.features);
    if (status != UCS_OK) {
        if (perf->params.flags & UCX_PERF_TEST_FLAG_VERBOSE) {
            ucs_error("Failed to setup endpoints: %s",
                      ucs_status_string(status));
        }
        goto err_free_mem;
    }

    return UCS_OK;

err_free_mem:
    ucp_perf_test_free_mem(perf);
err_destroy_worker:
    ucp_worker_destroy(perf->ucp.worker);
err_cleanup:
    ucp_cleanup(perf->ucp.context);
err:
    return status;
}

static void ucp_perf_cleanup(ucx_perf_context_t *perf)
{
    ucp_perf_test_cleanup_endpoints(perf);
    ucp_perf_barrier(perf);
    ucp_perf_test_free_mem(perf);
    ucp_worker_destroy(perf->ucp.worker);
    ucp_cleanup(perf->ucp.context);
}

static struct {
    ucs_status_t (*setup)(ucx_perf_context_t *perf);
    void         (*cleanup)(ucx_perf_context_t *perf);
    ucs_status_t (*run)(ucx_perf_context_t *perf);
    void         (*barrier)(ucx_perf_context_t *perf);
} ucx_perf_funcs[] = {
    [UCX_PERF_API_UCT] = {uct_perf_setup, uct_perf_cleanup,
                          uct_perf_test_dispatch, uct_perf_barrier},
    [UCX_PERF_API_UCP] = {ucp_perf_setup, ucp_perf_cleanup,
                          ucp_perf_test_dispatch, ucp_perf_barrier}
};

static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf,
                                          ucx_perf_result_t* result);

ucs_status_t ucx_perf_run(ucx_perf_params_t *params, ucx_perf_result_t *result)
{
    ucx_perf_context_t *perf;
    ucs_status_t status;

    ucx_perf_global_init();

    if (params->command == UCX_PERF_CMD_LAST) {
        ucs_error("Test is not selected");
        status = UCS_ERR_INVALID_PARAM;
        goto out;
    }

    if ((params->api != UCX_PERF_API_UCT) && (params->api != UCX_PERF_API_UCP)) {
        ucs_error("Invalid test API parameter (should be UCT or UCP)");
        status = UCS_ERR_INVALID_PARAM;
        goto out;
    }

    perf = malloc(sizeof(*perf));
    if (perf == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto out;
    }
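    /* Initialize the test context: copy the parameters, reset the counters for
     * a new run, and pick the memory-type allocator (group index 0 uses
     * send_mem_type, the other group members use recv_mem_type) */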
ucx_perf_test_init(perf, params); if (perf->allocator == NULL) { ucs_error("Unsupported memory types %s<->%s", ucs_memory_type_names[params->send_mem_type], ucs_memory_type_names[params->recv_mem_type]); status = UCS_ERR_UNSUPPORTED; goto out_free; } if ((params->api == UCX_PERF_API_UCT) && (perf->allocator->mem_type != UCS_MEMORY_TYPE_HOST)) { ucs_warn("UCT tests also copy 2-byte values from %s memory to " "%s memory, which may impact performance results", ucs_memory_type_names[perf->allocator->mem_type], ucs_memory_type_names[UCS_MEMORY_TYPE_HOST]); } status = perf->allocator->init(perf); if (status != UCS_OK) { goto out_free; } status = ucx_perf_funcs[params->api].setup(perf); if (status != UCS_OK) { goto out_free; } if (UCS_THREAD_MODE_SINGLE == params->thread_mode) { if (params->warmup_iter > 0) { ucx_perf_set_warmup(perf, params); status = ucx_perf_funcs[params->api].run(perf); if (status != UCS_OK) { goto out_cleanup; } ucx_perf_funcs[params->api].barrier(perf); ucx_perf_test_prepare_new_run(perf, params); } /* Run test */ status = ucx_perf_funcs[params->api].run(perf); ucx_perf_funcs[params->api].barrier(perf); if (status == UCS_OK) { ucx_perf_calc_result(perf, result); rte_call(perf, report, result, perf->params.report_arg, 1); } } else { status = ucx_perf_thread_spawn(perf, result); } out_cleanup: ucx_perf_funcs[params->api].cleanup(perf); out_free: free(perf); out: return status; } #if _OPENMP /* multiple threads sharing the same worker/iface */ typedef struct { pthread_t pt; int tid; int ntid; ucs_status_t* statuses; ucx_perf_context_t perf; ucx_perf_result_t result; } ucx_perf_thread_context_t; static void* ucx_perf_thread_run_test(void* arg) { ucx_perf_thread_context_t* tctx = (ucx_perf_thread_context_t*) arg; ucx_perf_result_t* result = &tctx->result; ucx_perf_context_t* perf = &tctx->perf; ucx_perf_params_t* params = &perf->params; ucs_status_t* statuses = tctx->statuses; int tid = tctx->tid; int i; if (params->warmup_iter > 0) { ucx_perf_set_warmup(perf, params); statuses[tid] = ucx_perf_funcs[params->api].run(perf); ucx_perf_funcs[params->api].barrier(perf); for (i = 0; i < tctx->ntid; i++) { if (UCS_OK != statuses[i]) { goto out; } } ucx_perf_test_prepare_new_run(perf, params); } /* Run test */ #pragma omp barrier statuses[tid] = ucx_perf_funcs[params->api].run(perf); ucx_perf_funcs[params->api].barrier(perf); for (i = 0; i < tctx->ntid; i++) { if (UCS_OK != statuses[i]) { goto out; } } #pragma omp master { /* Assuming all threads are fairly treated, reporting only tid==0 TODO: aggregate reports */ ucx_perf_calc_result(perf, result); rte_call(perf, report, result, perf->params.report_arg, 1); } out: return &statuses[tid]; } static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf, ucx_perf_result_t* result) { ucx_perf_thread_context_t* tctx; ucs_status_t* statuses; size_t message_size; ucs_status_t status; int ti, nti; message_size = ucx_perf_get_message_size(&perf->params); omp_set_num_threads(perf->params.thread_count); nti = perf->params.thread_count; tctx = calloc(nti, sizeof(ucx_perf_thread_context_t)); statuses = calloc(nti, sizeof(ucs_status_t)); if ((tctx == NULL) || (statuses == NULL)) { status = UCS_ERR_NO_MEMORY; goto out_free; } #pragma omp parallel private(ti) { ti = omp_get_thread_num(); tctx[ti].tid = ti; tctx[ti].ntid = nti; tctx[ti].statuses = statuses; tctx[ti].perf = *perf; /* Doctor the src and dst buffers to make them thread specific */ tctx[ti].perf.send_buffer = UCS_PTR_BYTE_OFFSET(tctx[ti].perf.send_buffer, ti * message_size); 
tctx[ti].perf.recv_buffer = UCS_PTR_BYTE_OFFSET(tctx[ti].perf.recv_buffer, ti * message_size); tctx[ti].perf.offset = ti * message_size; ucx_perf_thread_run_test((void*)&tctx[ti]); } status = UCS_OK; for (ti = 0; ti < nti; ti++) { if (UCS_OK != statuses[ti]) { ucs_error("Thread %d failed to run test: %s", tctx[ti].tid, ucs_status_string(statuses[ti])); status = statuses[ti]; } } out_free: free(statuses); free(tctx); return status; } #else static ucs_status_t ucx_perf_thread_spawn(ucx_perf_context_t *perf, ucx_perf_result_t* result) { ucs_error("Invalid test parameter (thread mode requested without OpenMP capabilities)"); return UCS_ERR_INVALID_PARAM; } #endif /* _OPENMP */ void ucx_perf_global_init() { static ucx_perf_allocator_t host_allocator = { .mem_type = UCS_MEMORY_TYPE_HOST, .init = ucs_empty_function_return_success, .ucp_alloc = ucp_perf_test_alloc_host, .ucp_free = ucp_perf_test_free_host, .uct_alloc = uct_perf_test_alloc_host, .uct_free = uct_perf_test_free_host, .memcpy = ucx_perf_test_memcpy_host, .memset = memset }; UCS_MODULE_FRAMEWORK_DECLARE(ucx_perftest); ucx_perf_mem_type_allocators[UCS_MEMORY_TYPE_HOST] = &host_allocator; /* FIXME Memtype allocator modules must be loaded to global scope, otherwise * alloc hooks, which are using dlsym() to get pointer to original function, * do not work. Need to use bistro for memtype hooks to fix it. */ UCS_MODULE_FRAMEWORK_LOAD(ucx_perftest, UCS_MODULE_LOAD_FLAG_GLOBAL); }
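
/*
 * Usage sketch (not part of this file): a front-end such as the ucx_perftest
 * tool is expected to fill a ucx_perf_params_t, including the RTE callbacks
 * used by rte_call() and the message size list, and then invoke
 * ucx_perf_run(). The field values below are illustrative only:
 *
 *     ucx_perf_params_t params = {0};
 *     ucx_perf_result_t result;
 *
 *     params.api          = UCX_PERF_API_UCP;
 *     params.command      = UCX_PERF_CMD_TAG;
 *     params.test_type    = UCX_PERF_TEST_TYPE_PINGPONG;
 *     params.thread_mode  = UCS_THREAD_MODE_SINGLE;
 *     params.thread_count = 1;
 *     params.max_iter     = 1000;
 *     params.warmup_iter  = 100;
 *     ...                           (RTE callbacks, message sizes, mem types)
 *
 *     status = ucx_perf_run(&params, &result);
 */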