/** * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. * Copyright (C) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED. * Copyright (C) The University of Tennessee and The University * of Tennessee Research Foundation. 2015. ALL RIGHTS RESERVED. * Copyright (C) ARM Ltd. 2016. ALL RIGHTS RESERVED. * See file LICENSE for terms. */ #include "test_perf.h" extern "C" { #include #include } #include #include #include test_perf::rte_comm::rte_comm() { pthread_mutex_init(&m_mutex, NULL); } void test_perf::rte_comm::push(const void *data, size_t size) { pthread_mutex_lock(&m_mutex); m_queue.append((const char *)data, size); pthread_mutex_unlock(&m_mutex); } void test_perf::rte_comm::pop(void *data, size_t size, void (*progress)(void *arg), void *arg) { bool done = false; do { pthread_mutex_lock(&m_mutex); if (m_queue.length() >= size) { memcpy(data, &m_queue[0], size); m_queue.erase(0, size); done = true; } pthread_mutex_unlock(&m_mutex); if (!done) { progress(arg); } } while (!done); } test_perf::rte::rte(unsigned index, rte_comm& send, rte_comm& recv) : m_index(index), m_send(send), m_recv(recv) { } unsigned test_perf::rte::index() const { return m_index; } unsigned test_perf::rte::group_size(void *rte_group) { return 2; } unsigned test_perf::rte::group_index(void *rte_group) { rte *self = reinterpret_cast(rte_group); return self->index(); } void test_perf::rte::barrier(void *rte_group, void (*progress)(void *arg), void *arg) { static const uint32_t magic = 0xdeadbeed; rte *self = reinterpret_cast(rte_group); uint32_t dummy = magic; self->m_send.push(&dummy, sizeof(dummy)); dummy = 0; self->m_recv.pop(&dummy, sizeof(dummy), progress, arg); ucs_assert_always(dummy == magic); } void test_perf::rte::post_vec(void *rte_group, const struct iovec *iovec, int iovcnt, void **req) { rte *self = reinterpret_cast(rte_group); size_t size; int i; size = 0; for (i = 0; i < iovcnt; ++i) { size += iovec[i].iov_len; } self->m_send.push(&size, sizeof(size)); for (i = 0; i < iovcnt; ++i) { self->m_send.push(iovec[i].iov_base, iovec[i].iov_len); } } void test_perf::rte::recv(void *rte_group, unsigned src, void *buffer, size_t max, void *req) { rte *self = reinterpret_cast(rte_group); size_t size; if (src != 1 - self->m_index) { return; } self->m_recv.pop(&size, sizeof(size), (void(*)(void*))ucs_empty_function, NULL); ucs_assert_always(size <= max); self->m_recv.pop(buffer, size, (void(*)(void*))ucs_empty_function, NULL); } void test_perf::rte::exchange_vec(void *rte_group, void * req) { } void test_perf::rte::report(void *rte_group, const ucx_perf_result_t *result, void *arg, int is_final) { } ucx_perf_rte_t test_perf::rte::test_rte = { rte::group_size, rte::group_index, rte::barrier, rte::post_vec, rte::recv, rte::exchange_vec, rte::report, }; std::vector test_perf::get_affinity() { std::vector cpus; cpu_set_t affinity; int ret, nr_cpus; ret = sched_getaffinity(getpid(), sizeof(affinity), &affinity); if (ret != 0) { ucs_error("Failed to get CPU affinity: %m"); throw ucs::test_abort_exception(); } nr_cpus = sysconf(_SC_NPROCESSORS_CONF); if (nr_cpus < 0) { ucs_error("Failed to get CPU count: %m"); throw ucs::test_abort_exception(); } for (int cpu = 0; cpu < nr_cpus; ++cpu) { if (CPU_ISSET(cpu, &affinity)) { cpus.push_back(cpu); } } return cpus; } void test_perf::set_affinity(int cpu) { cpu_set_t affinity; CPU_ZERO(&affinity); CPU_SET(cpu , &affinity); sched_setaffinity(ucs_get_tid(), sizeof(affinity), &affinity); } void* test_perf::thread_func(void *arg) { thread_arg *a = (thread_arg*)arg; test_result *result; set_affinity(a->cpu); result = new test_result(); result->status = ucx_perf_run(&a->params, &result->result); return result; } test_perf::test_result test_perf::run_multi_threaded(const test_spec &test, unsigned flags, const std::string &tl_name, const std::string &dev_name, const std::vector &cpus) { rte_comm c0to1, c1to0; ucx_perf_params_t params; memset(¶ms, 0, sizeof(params)); params.api = test.api; params.command = test.command; params.test_type = test.test_type; params.thread_mode = UCS_THREAD_MODE_SINGLE; params.async_mode = UCS_ASYNC_THREAD_LOCK_TYPE; params.thread_count = 1; params.wait_mode = UCX_PERF_WAIT_MODE_LAST; params.flags = test.test_flags | flags; params.am_hdr_size = 8; params.alignment = ucs_get_page_size(); params.max_outstanding = test.max_outstanding; if (ucs::test_time_multiplier() == 1) { params.warmup_iter = test.iters / 10; params.max_iter = test.iters; } else { params.warmup_iter = 0; params.max_iter = ucs_min(20u, test.iters / ucs::test_time_multiplier()); } params.max_time = 0.0; params.report_interval = 1.0; params.rte_group = NULL; params.rte = &rte::test_rte; params.report_arg = NULL; ucs_strncpy_zero(params.uct.dev_name, dev_name.c_str(), sizeof(params.uct.dev_name)); ucs_strncpy_zero(params.uct.tl_name , tl_name.c_str(), sizeof(params.uct.tl_name)); params.uct.data_layout = (uct_perf_data_layout_t)test.data_layout; params.uct.fc_window = UCT_PERF_TEST_MAX_FC_WINDOW; params.msg_size_cnt = test.msglencnt; params.msg_size_list = (size_t *)test.msglen; params.iov_stride = test.msg_stride; params.ucp.send_datatype = (ucp_perf_datatype_t)test.data_layout; params.ucp.recv_datatype = (ucp_perf_datatype_t)test.data_layout; thread_arg arg0; arg0.params = params; arg0.cpu = cpus[0]; rte rte0(0, c0to1, c1to0); arg0.params.rte_group = &rte0; pthread_t thread0, thread1; int ret = pthread_create(&thread0, NULL, thread_func, &arg0); if (ret) { UCS_TEST_MESSAGE << strerror(errno); throw ucs::test_abort_exception(); } thread_arg arg1; arg1.params = params; arg1.cpu = cpus[1]; rte rte1(1, c1to0, c0to1); arg1.params.rte_group = &rte1; ret = pthread_create(&thread1, NULL, thread_func, &arg1); if (ret) { UCS_TEST_MESSAGE << strerror(errno); throw ucs::test_abort_exception(); } void *ptr0, *ptr1; pthread_join(thread0, &ptr0); pthread_join(thread1, &ptr1); test_result *result0 = reinterpret_cast(ptr0), *result1 = reinterpret_cast(ptr1); test_result result = *result1; delete result0; delete result1; return result; } void test_perf::run_test(const test_spec& test, unsigned flags, bool check_perf, const std::string &tl_name, const std::string &dev_name) { std::vector cpus = get_affinity(); if (cpus.size() < 2) { UCS_TEST_MESSAGE << "Need at least 2 CPUs (got: " << cpus.size() << " )"; throw ucs::test_abort_exception(); } cpus.resize(2); check_perf = check_perf && (ucs::test_time_multiplier() == 1) && (ucs::perf_retry_count > 0); for (int i = 0; i < (ucs::perf_retry_count + 1); ++i) { test_result result = run_multi_threaded(test, flags, tl_name, dev_name, cpus); if ((result.status == UCS_ERR_UNSUPPORTED) || (result.status == UCS_ERR_UNREACHABLE)) { return; /* Skipped */ } ASSERT_UCS_OK(result.status); double value = *(double*)( ((char*)&result.result) + test.field_offset) * test.norm; char result_str[200] = {0}; snprintf(result_str, sizeof(result_str) - 1, "%s %25s : %.3f %s", dev_name.c_str(), test.title, value, test.units); if (i == 0) { if (check_perf) { UCS_TEST_MESSAGE << result_str; } else { UCS_TEST_MESSAGE << result_str << " (performance not checked)"; } } else { UCS_TEST_MESSAGE << result_str << " (attempt " << i << ")"; } if (!check_perf) { return; /* Skip */ } else if ((value >= test.min) && (value <= test.max)) { return; /* Success */ } else { ucs::safe_sleep(ucs::perf_retry_interval); } } ADD_FAILURE() << "Invalid " << test.title << " performance, expected: " << std::setprecision(3) << test.min << ".." << test.max; }