/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
* Copyright (C) UT-Battelle, LLC. 2014-2015. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
extern "C" {
#include <uct/api/uct.h>
#include <uct/base/uct_iface.h>
#include <ucs/time/time.h>
}
#include <common/test.h>
#include "uct_test.h"
#include "uct_p2p_test.h"
#if ENABLE_STATS
/**
 * Check that a statistics counter advanced by exactly @a _exp_val relative to
 * the baseline returned by get_cntr_init().
 *
 * @param _side     Name of the entity accessor (e.g. sender or receiver); it is
 *                  invoked as _side() and also stringified for the baseline
 *                  lookup.
 * @param _uct_obj  Accessor applied to the entity (e.g. uct_ep or uct_iface);
 *                  its result must expose a 'stats' member.
 * @param _stat     Counter identifier; also stringified for the baseline lookup.
 * @param _exp_val  Expected increment on top of the baseline.
 *
 * The temporary uses a macro-specific name so it can neither shadow nor capture
 * a caller variable that appears inside @a _exp_val.
 */
#define EXPECT_STAT(_side, _uct_obj, _stat, _exp_val) \
    do { \
        uint64_t expect_stat_actual_ = \
            UCS_STATS_GET_COUNTER(_uct_obj(_side())->stats, _stat); \
        EXPECT_EQ(get_cntr_init(UCS_PP_MAKE_STRING(_side), \
                                UCS_PP_MAKE_STRING(_stat)) + (_exp_val), \
                  expect_stat_actual_); \
    } while (0)
class test_uct_stats : public uct_p2p_test {
public:
test_uct_stats() : uct_p2p_test(0), lbuf(NULL), rbuf(NULL) {
m_comp.func = NULL;
m_comp.count = 0;
}
virtual void init() {
stats_activate();
uct_p2p_test::init();
// Sender EP
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_FLUSH),
UCT_EP_STAT_FLUSH);
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_FLUSH_WAIT),
UCT_EP_STAT_FLUSH_WAIT);
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_FENCE),
UCT_EP_STAT_FENCE);
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_AM),
UCT_EP_STAT_AM);
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_NO_RES),
UCT_EP_STAT_NO_RES);
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_PENDING),
UCT_EP_STAT_PENDING);
collect_cntr_init("sender", uct_ep(sender())->stats,
UCS_PP_MAKE_STRING(UCT_EP_STAT_ATOMIC),
UCT_EP_STAT_ATOMIC);
// Sender IFACE
collect_cntr_init("sender", uct_iface(sender())->stats,
UCS_PP_MAKE_STRING(UCT_IFACE_STAT_FLUSH),
UCT_IFACE_STAT_FLUSH);
collect_cntr_init("sender", uct_iface(sender())->stats,
UCS_PP_MAKE_STRING(UCT_IFACE_STAT_FLUSH_WAIT),
UCT_IFACE_STAT_FLUSH_WAIT);
collect_cntr_init("sender", uct_iface(sender())->stats,
UCS_PP_MAKE_STRING(UCT_IFACE_STAT_FENCE),
UCT_IFACE_STAT_FENCE);
// Receiver IFACE
collect_cntr_init("receiver", uct_iface(receiver())->stats,
UCS_PP_MAKE_STRING(UCT_IFACE_STAT_RX_AM),
UCT_IFACE_STAT_RX_AM);
collect_cntr_init("receiver", uct_iface(receiver())->stats,
UCS_PP_MAKE_STRING(UCT_IFACE_STAT_RX_AM_BYTES),
UCT_IFACE_STAT_RX_AM_BYTES);
}
void collect_cntr_init(std::string side, ucs_stats_node_t *stats_node,
std::string stat_name, unsigned stat) {
cntr_init[side][stat_name] = UCS_STATS_GET_COUNTER(stats_node, stat);
}
size_t get_cntr_init(std::string side, std::string stat_name) {
return cntr_init[side][stat_name];
}
void init_bufs(size_t min, size_t max)
{
size_t size = ucs_max(min, ucs_min(64ul, max));
lbuf = new mapped_buffer(size, 0, sender(), 0, sender().md_attr().cap.access_mem_type);
rbuf = new mapped_buffer(size, 0, receiver(), 0, sender().md_attr().cap.access_mem_type);
}
virtual void cleanup() {
flush();
delete lbuf;
delete rbuf;
uct_p2p_test::cleanup();
stats_restore();
}
uct_base_ep_t *uct_ep(const entity &e)
{
return ucs_derived_of(e.ep(0), uct_base_ep_t);
}
uct_base_iface_t *uct_iface(const entity &e)
{
return ucs_derived_of(e.iface(), uct_base_iface_t);
}
static ucs_status_t am_handler(void *arg, void *data, size_t length,
unsigned flags) {
return UCS_OK;
}
static void purge_cb(uct_pending_req_t *r, void *arg)
{
}
void check_am_rx_counters(size_t len) {
uint64_t iface_rx_am_init = get_cntr_init("receiver",
UCS_PP_MAKE_STRING(UCT_IFACE_STAT_RX_AM));
uint64_t v;
ucs_time_t deadline = ucs::get_deadline();
do {
short_progress_loop();
v = UCS_STATS_GET_COUNTER(uct_iface(receiver())->stats, UCT_IFACE_STAT_RX_AM);
} while ((ucs_get_time() < deadline) && (v == iface_rx_am_init));
EXPECT_STAT(receiver, uct_iface, UCT_IFACE_STAT_RX_AM, 1UL);
EXPECT_STAT(receiver, uct_iface, UCT_IFACE_STAT_RX_AM_BYTES, len);
}
void check_atomic_counters() {
EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_ATOMIC, 1UL);
/* give atomic chance to complete */
short_progress_loop();
}
int fill_tx_q(int n) {
int count_wait;
int i, max;
size_t len;
max = (n == 0) ? 1024 : n;
for (count_wait = i = 0; i < max; i++) {
len = uct_ep_am_bcopy(sender_ep(), 0, mapped_buffer::pack, lbuf, 0);
if (len != lbuf->length()) {
if (n == 0) {
return 1;
}
count_wait++;
}
}
return count_wait;
}
void init_completion() {
m_comp.count = 2;
m_comp.func = NULL;
}
void wait_for_completion(ucs_status_t status) {
EXPECT_TRUE(UCS_INPROGRESS == status || UCS_OK == status);
if (status == UCS_OK) {
--m_comp.count;
}
ucs_time_t deadline = ucs::get_deadline();
do {
short_progress_loop();
} while ((ucs_get_time() < deadline) && (m_comp.count > 1));
EXPECT_EQ(1, m_comp.count);
}
protected:
mapped_buffer *lbuf, *rbuf;
uct_completion_t m_comp;
std::map< std::string, std::map< std::string, uint64_t > > cntr_init;
};
/* test basic stat counters:
* am, put, get, amo, flush and fence
*/
/* One short active message: the sender counts one AM and header + payload
 * short bytes, and the receiver counts the same number of RX bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, am_short,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT))
{
    const uint64_t header     = 0xdeadbeef;
    uint64_t payload          = 0xfeedf00d;
    const size_t total_length = sizeof(header) + sizeof(payload);
    ucs_status_t status;

    init_bufs(0, sender().iface_attr().cap.am.max_short);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler,
                                           0, UCT_CB_FLAG_ASYNC));

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_am_short(sender_ep(), 0, header,
                                                &payload, sizeof(payload)),
                                status);
    EXPECT_UCS_OK(status);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_AM, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_SHORT, total_length);
    check_am_rx_counters(total_length);
}
/* One bcopy active message: the sender counts one AM and the buffer length in
 * bcopy bytes, and the receiver counts the same number of RX bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, am_bcopy,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY))
{
    ssize_t packed_length;

    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler,
                                           0, UCT_CB_FLAG_ASYNC));

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_am_bcopy(sender_ep(), 0,
                                                mapped_buffer::pack, lbuf, 0),
                                packed_length);
    EXPECT_EQ((ssize_t)lbuf->length(), packed_length);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_AM, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_BCOPY, lbuf->length());
    check_am_rx_counters(lbuf->length());
}
/* One zcopy active message: the sender counts one AM and the buffer length in
 * zcopy bytes, and the receiver counts the same number of RX bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, am_zcopy,
                     !check_caps(UCT_IFACE_FLAG_AM_ZCOPY))
{
    ucs_status_t status;

    init_bufs(0, sender().iface_attr().cap.am.max_zcopy);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler,
                                           0, UCT_CB_FLAG_ASYNC));

    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, lbuf->ptr(), lbuf->length(),
                            lbuf->memh(),
                            sender().iface_attr().cap.am.max_iov);

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_am_zcopy(sender_ep(), 0, 0, 0, iov,
                                                iovcnt, 0, NULL),
                                status);
    /* the send may complete immediately or stay in progress */
    EXPECT_TRUE((status == UCS_INPROGRESS) || (status == UCS_OK));

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_AM, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_ZCOPY, lbuf->length());
    check_am_rx_counters(lbuf->length());
}
/* One short put: the sender counts one PUT and the payload size in short bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, put_short,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT))
{
    uint64_t payload = 0xfeedf00d;
    ucs_status_t status;

    init_bufs(0, sender().iface_attr().cap.put.max_short);

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_put_short(sender_ep(), &payload,
                                                 sizeof(payload), rbuf->addr(),
                                                 rbuf->rkey()),
                                status);
    EXPECT_UCS_OK(status);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_PUT, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_SHORT, sizeof(payload));
}
/* One bcopy put: the sender counts one PUT and the buffer length in bcopy bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, put_bcopy,
                     !check_caps(UCT_IFACE_FLAG_PUT_BCOPY))
{
    ssize_t packed_length;

    init_bufs(0, sender().iface_attr().cap.put.max_bcopy);

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_put_bcopy(sender_ep(),
                                                 mapped_buffer::pack, lbuf,
                                                 rbuf->addr(), rbuf->rkey()),
                                packed_length);
    EXPECT_EQ((ssize_t)lbuf->length(), packed_length);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_PUT, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_BCOPY, lbuf->length());
}
/* One zcopy put: the sender counts one PUT and the buffer length in zcopy bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, put_zcopy,
                     !check_caps(UCT_IFACE_FLAG_PUT_ZCOPY))
{
    ucs_status_t status;

    init_bufs(0, sender().iface_attr().cap.put.max_zcopy);

    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, lbuf->ptr(), lbuf->length(),
                            lbuf->memh(),
                            sender().iface_attr().cap.put.max_iov);

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_put_zcopy(sender_ep(), iov, iovcnt,
                                                 rbuf->addr(), rbuf->rkey(), 0),
                                status);
    /* the put may complete immediately or stay in progress */
    EXPECT_TRUE((status == UCS_INPROGRESS) || (status == UCS_OK));

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_PUT, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_ZCOPY, lbuf->length());
}
/* One bcopy get: after completion the sender counts one GET and the buffer
 * length in bcopy bytes. */
UCS_TEST_SKIP_COND_P(test_uct_stats, get_bcopy,
                     !check_caps(UCT_IFACE_FLAG_GET_BCOPY))
{
    ucs_status_t status;

    init_bufs(0, sender().iface_attr().cap.get.max_bcopy);
    init_completion();

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_get_bcopy(sender_ep(),
                                                 (uct_unpack_callback_t)memcpy,
                                                 lbuf->ptr(), lbuf->length(),
                                                 rbuf->addr(), rbuf->rkey(),
                                                 &m_comp),
                                status);
    wait_for_completion(status);
    short_progress_loop();

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_GET, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_BCOPY, lbuf->length());
}
/* One zcopy get: after completion the sender counts one GET and the buffer
 * length in zcopy bytes. The buffer size honors the transport's min_zcopy. */
UCS_TEST_SKIP_COND_P(test_uct_stats, get_zcopy,
                     !check_caps(UCT_IFACE_FLAG_GET_ZCOPY))
{
    ucs_status_t status;

    init_bufs(sender().iface_attr().cap.get.min_zcopy,
              sender().iface_attr().cap.get.max_zcopy);

    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, lbuf->ptr(), lbuf->length(),
                            lbuf->memh(),
                            sender().iface_attr().cap.get.max_iov);

    init_completion();

    UCT_TEST_CALL_AND_TRY_AGAIN(uct_ep_get_zcopy(sender_ep(), iov, iovcnt,
                                                 rbuf->addr(), rbuf->rkey(),
                                                 &m_comp),
                                status);
    wait_for_completion(status);
    short_progress_loop();

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_GET, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_BYTES_ZCOPY, lbuf->length());
}
/* Generate a test that posts one atomic operation _op on a _bits-wide operand
 * and expects the sender endpoint ATOMIC counter to advance by one. */
#define TEST_STATS_ATOMIC_POST(_op, _bits) \
UCS_TEST_SKIP_COND_P(test_uct_stats, atomic_post_ ## _op ## _bits, \
                     !check_atomics(UCS_BIT(UCT_ATOMIC_OP_ ## _op), OP ## _bits)) \
{ \
    ucs_status_t post_status; \
    \
    init_bufs(sizeof(uint##_bits##_t), sizeof(uint##_bits##_t)); \
    \
    post_status = uct_ep_atomic##_bits##_post(sender_ep(), \
                                              (UCT_ATOMIC_OP_ ## _op), 1, \
                                              rbuf->addr(), rbuf->rkey()); \
    EXPECT_UCS_OK(post_status); \
    \
    check_atomic_counters(); \
}
/* Instantiate the atomic post test for each operation and operand width */
TEST_STATS_ATOMIC_POST(ADD, 32)
TEST_STATS_ATOMIC_POST(ADD, 64)
TEST_STATS_ATOMIC_POST(AND, 32)
TEST_STATS_ATOMIC_POST(AND, 64)
TEST_STATS_ATOMIC_POST(OR, 32)
TEST_STATS_ATOMIC_POST(OR, 64)
TEST_STATS_ATOMIC_POST(XOR, 32)
TEST_STATS_ATOMIC_POST(XOR, 64)
/* Generate a test that issues one fetching atomic operation _op on a
 * _bits-wide operand, waits for it to complete and expects the sender endpoint
 * ATOMIC counter to advance by one. */
#define TEST_STATS_ATOMIC_FETCH(_op, _bits) \
UCS_TEST_SKIP_COND_P(test_uct_stats, atomic_fetch_## _op ## _bits, \
                     !check_atomics(UCS_BIT(UCT_ATOMIC_OP_ ## _op), FOP ## _bits)) \
{ \
    uint##_bits##_t fetched; \
    ucs_status_t fetch_status; \
    \
    init_bufs(sizeof(fetched), sizeof(fetched)); \
    init_completion(); \
    \
    fetch_status = uct_ep_atomic##_bits##_fetch(sender_ep(), \
                                                (UCT_ATOMIC_OP_ ## _op), 1, \
                                                &fetched, rbuf->addr(), \
                                                rbuf->rkey(), &m_comp); \
    wait_for_completion(fetch_status); \
    \
    check_atomic_counters(); \
}
/* Instantiate the atomic fetch test for each operation and operand width */
TEST_STATS_ATOMIC_FETCH(ADD, 32)
TEST_STATS_ATOMIC_FETCH(ADD, 64)
TEST_STATS_ATOMIC_FETCH(AND, 32)
TEST_STATS_ATOMIC_FETCH(AND, 64)
TEST_STATS_ATOMIC_FETCH(OR, 32)
TEST_STATS_ATOMIC_FETCH(OR, 64)
TEST_STATS_ATOMIC_FETCH(XOR, 32)
TEST_STATS_ATOMIC_FETCH(XOR, 64)
TEST_STATS_ATOMIC_FETCH(SWAP, 32)
TEST_STATS_ATOMIC_FETCH(SWAP, 64)
/* Generate a test that issues one compare-and-swap on a _bits-wide operand,
 * retrying while the call asks to try again, waits for completion and expects
 * the sender endpoint ATOMIC counter to advance by one. */
#define TEST_STATS_ATOMIC_CSWAP(_bits) \
UCS_TEST_SKIP_COND_P(test_uct_stats, atomic_cswap##_bits, \
                     !check_atomics(UCS_BIT(UCT_ATOMIC_OP_CSWAP), FOP ## _bits)) \
{ \
    uint##_bits##_t swapped; \
    ucs_status_t cswap_status; \
    \
    init_bufs(sizeof(swapped), sizeof(swapped)); \
    init_completion(); \
    \
    UCT_TEST_CALL_AND_TRY_AGAIN( \
        uct_ep_atomic_cswap##_bits (sender_ep(), 1, 2, rbuf->addr(), \
                                    rbuf->rkey(), &swapped, &m_comp), \
        cswap_status); \
    wait_for_completion(cswap_status); \
    \
    check_atomic_counters(); \
}
/* Instantiate the compare-and-swap test for both operand widths */
TEST_STATS_ATOMIC_CSWAP(32)
TEST_STATS_ATOMIC_CSWAP(64)
/* Flushing with nothing outstanding must count one FLUSH and no FLUSH_WAIT,
 * on the endpoint (when there is one) and on the interface. */
UCS_TEST_P(test_uct_stats, flush)
{
    if (sender_ep() != NULL) {
        EXPECT_UCS_OK(uct_ep_flush(sender_ep(), 0, NULL));
        EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_FLUSH, 1UL);
        EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_FLUSH_WAIT, 0UL);
    }

    EXPECT_UCS_OK(uct_iface_flush(sender().iface(), 0, NULL));
    EXPECT_STAT(sender, uct_iface, UCT_IFACE_STAT_FLUSH, 1UL);
    EXPECT_STAT(sender, uct_iface, UCT_IFACE_STAT_FLUSH_WAIT, 0UL);
}
/* A fence must count one FENCE on the endpoint (when there is one) and on the
 * interface. */
UCS_TEST_P(test_uct_stats, fence)
{
    if (sender_ep() != NULL) {
        EXPECT_UCS_OK(uct_ep_fence(sender_ep(), 0));
        EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_FENCE, 1UL);
    }

    EXPECT_UCS_OK(uct_iface_fence(sender().iface(), 0));
    EXPECT_STAT(sender, uct_iface, UCT_IFACE_STAT_FENCE, 1UL);
}
/* The flush-wait tests only check stats on transports that support am_bcopy.
 * TODO: full test matrix
 */
/* Fill the send queue, then flush the interface until it reports UCS_OK.
 * Every flush call that returns UCS_INPROGRESS must be counted as FLUSH_WAIT,
 * and the final successful one as a single FLUSH. */
UCS_TEST_SKIP_COND_P(test_uct_stats, flush_wait_iface,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY))
{
    uint64_t count_wait = 0;
    ucs_status_t status;
    ucs_time_t deadline;

    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    status = uct_iface_set_am_handler(receiver().iface(), 0, am_handler, 0,
                                      UCT_CB_FLAG_ASYNC);
    EXPECT_UCS_OK(status);

    fill_tx_q(0);

    /* Bound the retry loop so that a flush which never succeeds fails the
     * test instead of spinning forever */
    deadline = ucs::get_deadline();
    do {
        status = uct_iface_flush(sender().iface(), 0, NULL);
        if (status == UCS_INPROGRESS) {
            count_wait++;
        }
        progress();
    } while ((status != UCS_OK) && (ucs_get_time() < deadline));
    EXPECT_UCS_OK(status);

    EXPECT_STAT(sender, uct_iface, UCT_IFACE_STAT_FLUSH, 1UL);
    EXPECT_STAT(sender, uct_iface, UCT_IFACE_STAT_FLUSH_WAIT, count_wait);
}
/* Fill the send queue, then flush the endpoint until it reports UCS_OK.
 * Every flush call that returns UCS_INPROGRESS must be counted as FLUSH_WAIT,
 * and the final successful one as a single FLUSH. */
UCS_TEST_SKIP_COND_P(test_uct_stats, flush_wait_ep,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY))
{
    uint64_t count_wait = 0;
    ucs_status_t status;
    ucs_time_t deadline;

    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    status = uct_iface_set_am_handler(receiver().iface(), 0, am_handler, 0,
                                      UCT_CB_FLAG_ASYNC);
    EXPECT_UCS_OK(status);

    fill_tx_q(0);

    /* Bound the retry loop so that a flush which never succeeds fails the
     * test instead of spinning forever */
    deadline = ucs::get_deadline();
    do {
        status = uct_ep_flush(sender_ep(), 0, NULL);
        if (status == UCS_INPROGRESS) {
            count_wait++;
        }
        progress();
    } while ((status != UCS_OK) && (ucs_get_time() < deadline));
    EXPECT_UCS_OK(status);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_FLUSH, 1UL);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_FLUSH_WAIT, count_wait);
}
/* The fence tests below only check stats on transports that support am_bcopy.
 * TODO: full test matrix
 */
/* An interface fence issued between two bursts of bcopy sends must be counted
 * exactly once. */
UCS_TEST_SKIP_COND_P(test_uct_stats, fence_iface,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY))
{
    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler,
                                           0, UCT_CB_FLAG_ASYNC));

    fill_tx_q(0);
    EXPECT_UCS_OK(uct_iface_fence(sender().iface(), 0));
    fill_tx_q(0);

    EXPECT_STAT(sender, uct_iface, UCT_IFACE_STAT_FENCE, 1UL);
}
/* An endpoint fence issued between two bursts of bcopy sends must be counted
 * exactly once. */
UCS_TEST_SKIP_COND_P(test_uct_stats, fence_ep,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY))
{
    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler,
                                           0, UCT_CB_FLAG_ASYNC));

    fill_tx_q(0);
    EXPECT_UCS_OK(uct_ep_fence(sender_ep(), 0));
    fill_tx_q(0);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_FENCE, 1UL);
}
/* Attempt a fixed number of bcopy sends: the attempts that did not go through
 * must be counted as NO_RES and the remaining ones as AM. */
UCS_TEST_SKIP_COND_P(test_uct_stats, tx_no_res,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY))
{
    const int num_sends = 1024;
    uint64_t num_failed;

    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler,
                                           0, UCT_CB_FLAG_ASYNC));

    num_failed = fill_tx_q(num_sends);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_NO_RES, num_failed);
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_AM, num_sends - num_failed);
}
/* The PENDING counter must stay untouched when uct_ep_pending_add() is
 * rejected and advance once for every request that is accepted. */
UCS_TEST_SKIP_COND_P(test_uct_stats, pending_add,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY |
                                 UCT_IFACE_FLAG_PENDING))
{
    const size_t num_reqs = 5;
    /* Zero-initialize the requests: the first one is handed to
     * uct_ep_pending_add() before any of its fields is assigned */
    uct_pending_req_t p_reqs[num_reqs] = {};
    ssize_t len;

    init_bufs(0, sender().iface_attr().cap.am.max_bcopy);

    EXPECT_UCS_OK(uct_iface_set_am_handler(receiver().iface(), 0, am_handler, 0,
                                           UCT_CB_FLAG_ASYNC));

    // Check that the counter is not increased if pending_add is rejected
    EXPECT_EQ(UCS_ERR_BUSY,
              uct_ep_pending_add(sender().ep(0), &p_reqs[0], 0));
    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_PENDING, 0UL);

    // Check that the counter is increased on every successful pending_add
    fill_tx_q(0);

    UCT_TEST_CALL_AND_TRY_AGAIN(
        uct_ep_am_bcopy(sender_ep(), 0, mapped_buffer::pack,
                        lbuf, 0), len);
    if (len == (ssize_t)lbuf->length()) {
        UCS_TEST_SKIP_R("Can't add to pending");
    }

    for (size_t i = 0; i < num_reqs; ++i) {
        p_reqs[i].func = NULL;
        EXPECT_UCS_OK(uct_ep_pending_add(sender().ep(0), &p_reqs[i], 0));
    }

    // Drop the queued requests before they go out of scope
    uct_ep_pending_purge(sender().ep(0), purge_cb, NULL);

    EXPECT_STAT(sender, uct_ep, UCT_EP_STAT_PENDING, num_reqs);
}
/* NOTE(review): presumably registers the test_uct_stats suite for the
 * available transports - confirm against UCT_INSTANTIATE_TEST_CASE */
UCT_INSTANTIATE_TEST_CASE(test_uct_stats);
#endif