/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
* Copyright (C) ARM Ltd. 2016. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#include "uct_p2p_test.h"
extern "C" {
#include <ucs/time/time.h>
}
int uct_p2p_test::log_data_count = 0;
ucs_log_level_t uct_p2p_test::orig_log_level;
std::string uct_p2p_test::p2p_resource::name() const {
std::stringstream ss;
ss << resource::name();
if (loopback) {
ss << "/loopback";
}
return ss.str();
}
std::vector<const resource*> uct_p2p_test::enum_resources(const std::string& tl_name)
{
static std::vector<p2p_resource> all_resources;
if (all_resources.empty()) {
std::vector<const resource*> r = uct_test::enum_resources("");
for (std::vector<const resource*>::iterator iter = r.begin(); iter != r.end(); ++iter) {
p2p_resource res(**iter);
if (UCT_DEVICE_TYPE_SELF != res.dev_type) {
res.loopback = false;
all_resources.push_back(res);
}
res.loopback = true;
all_resources.push_back(res);
}
}
return filter_resources(all_resources, tl_name);
}
uct_p2p_test::uct_p2p_test(size_t rx_headroom,
uct_error_handler_t err_handler) :
m_rx_headroom(rx_headroom),
m_err_handler(err_handler),
m_completion_count(0)
{
m_null_completion = false;
m_completion.self = this;
m_completion.uct.func = completion_cb;
m_completion.uct.count = 0;
}
void uct_p2p_test::init() {
uct_test::init();
const p2p_resource *r = dynamic_cast<const p2p_resource*>(GetParam());
ucs_assert_always(r != NULL);
/* Create 2 connected endpoints */
entity *e1 = uct_test::create_entity(m_rx_headroom, m_err_handler);
m_entities.push_back(e1);
check_skip_test();
if (r->loopback) {
e1->connect(0, *e1, 0);
} else {
entity *e2 = uct_test::create_entity(m_rx_headroom, m_err_handler);
m_entities.push_back(e2);
e1->connect(0, *e2, 0);
e2->connect(0, *e1, 0);
}
/* Allocate completion handle and set the callback */
m_completion_count = 0;
/* Give a chance to finish connection for some transports (ib/ud, tcp) */
flush();
}
void uct_p2p_test::cleanup() {
flush();
uct_test::cleanup();
}
void uct_p2p_test::test_xfer(send_func_t send, size_t length, unsigned flags,
ucs_memory_type_t mem_type) {
UCS_TEST_SKIP;
}
ucs_log_func_rc_t
uct_p2p_test::log_handler(const char *file, unsigned line, const char *function,
ucs_log_level_t level, const char *message, va_list ap)
{
if (level == UCS_LOG_LEVEL_TRACE_DATA) {
++log_data_count;
}
/* Continue to next log handler if original log level would have allowed it */
return (level <= orig_log_level) ? UCS_LOG_FUNC_RC_CONTINUE
: UCS_LOG_FUNC_RC_STOP;
}
template <typename O>
void uct_p2p_test::test_xfer_print(O& os, send_func_t send, size_t length,
unsigned flags, ucs_memory_type_t mem_type)
{
if (!ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) {
os << ucs::size_value(length) << " " << std::flush;
}
/*
* Set our own log handler, and raise log level, to test that the transport
* prints log messages for the transfers.
*/
int count_before = log_data_count;
ucs_log_push_handler(log_handler);
orig_log_level = ucs_global_opts.log_level;
ucs_global_opts.log_level = UCS_LOG_LEVEL_TRACE_DATA;
bool expect_log = ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA);
UCS_TEST_SCOPE_EXIT() {
/* Restore logging */
ucs_global_opts.log_level = orig_log_level;
ucs_log_pop_handler();
} UCS_TEST_SCOPE_EXIT_END
test_xfer(send, length, flags, mem_type);
if (expect_log) {
EXPECT_GE(log_data_count - count_before, 1);
}
}
void uct_p2p_test::test_xfer_multi(send_func_t send, size_t min_length,
size_t max_length, unsigned flags)
{
for (int mem_type = 0; mem_type < UCS_MEMORY_TYPE_LAST; mem_type++) {
/* test mem type if md supports mem type
* (or) if HOST MD can register mem type
*/
if (!((sender().md_attr().cap.access_mem_type == mem_type) ||
(sender().md_attr().cap.access_mem_type == UCS_MEMORY_TYPE_HOST &&
sender().md_attr().cap.reg_mem_types & UCS_BIT(mem_type)))) {
continue;
}
if (mem_type == UCS_MEMORY_TYPE_CUDA) {
if (!(flags & (TEST_UCT_FLAG_RECV_ZCOPY | TEST_UCT_FLAG_SEND_ZCOPY))) {
continue;
}
}
test_xfer_multi_mem_type(send, min_length, max_length, flags,
(ucs_memory_type_t) mem_type);
}
}
void uct_p2p_test::test_xfer_multi_mem_type(send_func_t send, size_t min_length,
size_t max_length, unsigned flags,
ucs_memory_type_t mem_type) {
ucs::detail::message_stream ms("INFO");
ms << "memory_type:" << ucs_memory_type_names[mem_type] << " " << std::flush;
/* Trim at 4.1 GB */
max_length = ucs_min(max_length, (size_t)(4.1 * (double)UCS_GBYTE));
/* Trim at max. phys memory */
max_length = ucs_min(max_length, ucs_get_phys_mem_size() / 8);
/* Trim at max. shared memory */
max_length = ucs_min(max_length, ucs_get_shmmax() * 0.8);
/* Trim when short of available memory */
max_length = ucs_min(max_length, ucs_get_memfree_size() / 4);
/* For large size, slow down if needed */
if (max_length > UCS_MBYTE) {
max_length = max_length / ucs::test_time_multiplier();
if (RUNNING_ON_VALGRIND) {
max_length = ucs_min(max_length, 20u * UCS_MBYTE);
}
}
if (max_length <= min_length) {
UCS_TEST_SKIP;
}
m_null_completion = false;
/* Run with min and max values */
test_xfer_print(ms, send, min_length, flags, mem_type);
test_xfer_print(ms, send, max_length, flags, mem_type);
/*
* Generate SQRT( log(max/min) ) random sizes
*/
double log_min = log2(min_length + 1);
double log_max = log2(max_length - 1);
/* How many times to repeat */
int repeat_count;
repeat_count = (256 * UCS_KBYTE) / ((max_length + min_length) / 2);
if (repeat_count > 1000) {
repeat_count = 1000;
}
repeat_count /= ucs::test_time_multiplier();
if (repeat_count == 0) {
repeat_count = 1;
}
if (!ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) {
ms << repeat_count << "x{" << ucs::size_value(min_length) << ".."
<< ucs::size_value(max_length) << "} " << std::flush;
}
for (int i = 0; i < repeat_count; ++i) {
double exp = (ucs::rand() * (log_max - log_min)) / RAND_MAX + log_min;
size_t length = (ssize_t)pow(2.0, exp);
ucs_assert(length >= min_length && length <= max_length);
test_xfer(send, length, flags, mem_type);
}
/* Run a test with implicit non-blocking mode */
m_null_completion = true;
ms << "nocomp ";
test_xfer_print(ms, send, (long)sqrt((min_length + 1.0) * max_length),
flags, mem_type);
sender().flush();
}
void uct_p2p_test::blocking_send(send_func_t send, uct_ep_h ep,
const mapped_buffer &sendbuf,
const mapped_buffer &recvbuf,
bool wait_for_completion)
{
unsigned prev_comp_count = m_completion_count;
ucs_status_t status;
do {
status = (this->*send)(ep, sendbuf, recvbuf);
if (status == UCS_OK) {
return;
} else if (status == UCS_ERR_NO_RESOURCE) {
progress();
} else if (status == UCS_INPROGRESS) {
break;
} else {
UCS_TEST_ABORT(ucs_status_string(status));
}
} while (status == UCS_ERR_NO_RESOURCE);
/* Operation in progress, wait for completion */
ucs_assert(status == UCS_INPROGRESS);
if (wait_for_completion) {
if (comp() == NULL) {
/* implicit non-blocking mode */
/* Call flush on local and remote ifaces to progress data
* (e.g. if call flush only on local iface, a target side may
* not be able to send PUT ACK to an initiator in case of TCP) */
flush();
} else {
/* explicit non-blocking mode */
++m_completion.uct.count;
while (m_completion_count <= prev_comp_count) {
progress();
}
EXPECT_EQ(0, m_completion.uct.count);
}
}
}
void uct_p2p_test::wait_for_remote() {
/* Call flush on local and remote ifaces to progress data
* (e.g. if call flush only on local iface, a target side may
* not be able to send PUT ACK to an initiator in case of TCP) */
flush();
}
uct_test::entity& uct_p2p_test::sender() {
return **m_entities.begin();
}
uct_ep_h uct_p2p_test::sender_ep() {
return sender().ep(0);
}
uct_test::entity& uct_p2p_test::receiver() {
return **(m_entities.end() - 1);
}
uct_completion_t *uct_p2p_test::comp() {
if (m_null_completion) {
return NULL;
} else {
return &m_completion.uct;
}
}
void uct_p2p_test::completion_cb(uct_completion_t *self, ucs_status_t status) {
completion *comp = ucs_container_of(self, completion, uct);
++comp->self->m_completion_count;
}