/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
#include "ucp_test.h"
#include <common/test_helpers.h>
extern "C" {
#include <ucp/core/ucp_worker.h>
#if HAVE_IB
#include <uct/ib/ud/base/ud_iface.h>
#endif
#include <ucs/arch/atomic.h>
#include <ucs/stats/stats.h>
}
#include <queue>
namespace ucp {
const uint32_t MAGIC = 0xd7d7d7d7U;
}
std::ostream& operator<<(std::ostream& os, const ucp_test_param& test_param)
{
std::vector<std::string>::const_iterator iter;
const std::vector<std::string>& transports = test_param.transports;
for (iter = transports.begin(); iter != transports.end(); ++iter) {
if (iter != transports.begin()) {
os << ",";
}
os << *iter;
}
return os;
}
const ucp_datatype_t ucp_test::DATATYPE = ucp_dt_make_contig(1);
const ucp_datatype_t ucp_test::DATATYPE_IOV = ucp_dt_make_iov();
ucp_test::ucp_test() {
ucs_status_t status;
status = ucp_config_read(NULL, NULL, &m_ucp_config);
ASSERT_UCS_OK(status);
}
ucp_test::~ucp_test() {
for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
iter != entities().end(); ++iter)
{
(*iter)->warn_existing_eps();
}
ucp_config_release(m_ucp_config);
}
void ucp_test::cleanup() {
/* disconnect before destroying the entities */
for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
iter != entities().end(); ++iter)
{
disconnect(**iter);
}
for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
iter != entities().end(); ++iter)
{
(*iter)->cleanup();
}
m_entities.clear();
}
void ucp_test::init() {
test_base::init();
create_entity();
if (!is_self()) {
create_entity();
}
}
static bool check_transport(const std::string check_tl_name,
const std::vector<std::string>& tl_names) {
return (std::find(tl_names.begin(), tl_names.end(),
check_tl_name) != tl_names.end());
}
bool ucp_test::has_transport(const std::string& tl_name) const {
return check_transport(tl_name, GetParam().transports);
}
bool ucp_test::has_only_transports(const std::vector<std::string>& tl_names) const {
const std::vector<std::string>& transports = GetParam().transports;
size_t other_tls_count = 0;
std::vector<std::string>::const_iterator iter;
for(iter = transports.begin(); iter != transports.end(); ++iter) {
if (!check_transport(*iter, tl_names)) {
other_tls_count++;
}
}
return !other_tls_count;
}
bool ucp_test::is_self() const {
return "self" == GetParam().transports.front();
}
ucp_test_base::entity* ucp_test::create_entity(bool add_in_front) {
return create_entity(add_in_front, GetParam());
}
ucp_test_base::entity*
ucp_test::create_entity(bool add_in_front, const ucp_test_param &test_param) {
entity *e = new entity(test_param, m_ucp_config, get_worker_params(), this);
if (add_in_front) {
m_entities.push_front(e);
} else {
m_entities.push_back(e);
}
return e;
}
ucp_params_t ucp_test::get_ctx_params() {
ucp_params_t params;
memset(¶ms, 0, sizeof(params));
params.field_mask |= UCP_PARAM_FIELD_FEATURES;
return params;
}
ucp_worker_params_t ucp_test::get_worker_params() {
ucp_worker_params_t params;
memset(¶ms, 0, sizeof(params));
params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
params.thread_mode = UCS_THREAD_MODE_MULTI;
return params;
}
ucp_ep_params_t ucp_test::get_ep_params() {
ucp_ep_params_t params;
memset(¶ms, 0, sizeof(params));
return params;
}
unsigned ucp_test::progress(int worker_index) const {
unsigned count = 0;
for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
iter != entities().end(); ++iter)
{
count += (*iter)->progress(worker_index);
sched_yield();
}
return count;
}
void ucp_test::short_progress_loop(int worker_index) const {
for (unsigned i = 0; i < 100; ++i) {
progress(worker_index);
usleep(100);
}
}
void ucp_test::flush_ep(const entity &e, int worker_index, int ep_index)
{
void *request = e.flush_ep_nb(worker_index, ep_index);
wait(request, worker_index);
}
void ucp_test::flush_worker(const entity &e, int worker_index)
{
void *request = e.flush_worker_nb(worker_index);
wait(request, worker_index);
}
void ucp_test::disconnect(const entity& e) {
bool has_failed_entity = false;
for (ucs::ptr_vector<entity>::const_iterator iter = entities().begin();
!has_failed_entity && (iter != entities().end()); ++iter) {
has_failed_entity = ((*iter)->get_err_num() > 0);
}
for (int i = 0; i < e.get_num_workers(); i++) {
enum ucp_ep_close_mode close_mode;
if (has_failed_entity) {
close_mode = UCP_EP_CLOSE_MODE_FORCE;
} else {
flush_worker(e, i);
close_mode = UCP_EP_CLOSE_MODE_FLUSH;
}
for (int j = 0; j < e.get_num_eps(i); j++) {
void *dreq = e.disconnect_nb(i, j, close_mode);
if (!UCS_PTR_IS_PTR(dreq)) {
ASSERT_UCS_OK(UCS_PTR_STATUS(dreq));
}
wait(dreq, i);
}
}
}
void ucp_test::wait(void *req, int worker_index)
{
if (req == NULL) {
return;
}
if (UCS_PTR_IS_ERR(req)) {
ucs_error("operation returned error: %s",
ucs_status_string(UCS_PTR_STATUS(req)));
return;
}
ucs_status_t status;
do {
progress(worker_index);
status = ucp_request_check_status(req);
} while (status == UCS_INPROGRESS);
if (status != UCS_OK) {
/* UCS errors are suppressed in case of error handling tests */
ucs_error("request %p completed with error %s", req,
ucs_status_string(status));
}
ucp_request_release(req);
}
void ucp_test::set_ucp_config(ucp_config_t *config) {
set_ucp_config(config, GetParam());
}
int ucp_test::max_connections() {
if (has_transport("tcp")) {
return ucs::max_tcp_connections();
} else {
return std::numeric_limits<int>::max();
}
}
std::vector<ucp_test_param>
ucp_test::enum_test_params(const ucp_params_t& ctx_params,
const std::string& name,
const std::string& test_case_name,
const std::string& tls)
{
ucp_test_param test_param;
std::stringstream ss(tls);
test_param.ctx_params = ctx_params;
test_param.variant = DEFAULT_PARAM_VARIANT;
test_param.thread_type = SINGLE_THREAD;
while (ss.good()) {
std::string tl_name;
std::getline(ss, tl_name, ',');
test_param.transports.push_back(tl_name);
}
if (check_test_param(name, test_case_name, test_param)) {
return std::vector<ucp_test_param>(1, test_param);
} else {
return std::vector<ucp_test_param>();
}
}
void ucp_test::generate_test_params_variant(const ucp_params_t& ctx_params,
const std::string& name,
const std::string& test_case_name,
const std::string& tls,
int variant,
std::vector<ucp_test_param>& test_params,
int thread_type)
{
std::vector<ucp_test_param> tmp_test_params;
tmp_test_params = ucp_test::enum_test_params(ctx_params,name,
test_case_name, tls);
for (std::vector<ucp_test_param>::iterator iter = tmp_test_params.begin();
iter != tmp_test_params.end(); ++iter)
{
iter->variant = variant;
iter->thread_type = thread_type;
test_params.push_back(*iter);
}
}
void ucp_test::set_ucp_config(ucp_config_t *config,
const ucp_test_param& test_param)
{
std::stringstream ss;
ss << test_param;
ucp_config_modify(config, "TLS", ss.str().c_str());
/* prevent configuration warnings in the UCP testing */
ucp_config_modify(config, "WARN_INVALID_CONFIG", "no");
}
void ucp_test::modify_config(const std::string& name, const std::string& value,
bool optional)
{
ucs_status_t status;
status = ucp_config_modify(m_ucp_config, name.c_str(), value.c_str());
if (status == UCS_ERR_NO_ELEM) {
test_base::modify_config(name, value, optional);
} else if (status != UCS_OK) {
UCS_TEST_ABORT("Couldn't modify ucp config parameter: " <<
name.c_str() << " to " << value.c_str() << ": " <<
ucs_status_string(status));
}
}
void ucp_test::stats_activate()
{
ucs_stats_cleanup();
push_config();
modify_config("STATS_DEST", "file:/dev/null");
modify_config("STATS_TRIGGER", "exit");
ucs_stats_init();
ASSERT_TRUE(ucs_stats_is_active());
}
void ucp_test::stats_restore()
{
ucs_stats_cleanup();
pop_config();
ucs_stats_init();
}
bool ucp_test::check_test_param(const std::string& name,
const std::string& test_case_name,
const ucp_test_param& test_param)
{
typedef std::map<std::string, bool> cache_t;
static cache_t cache;
if (test_param.transports.empty()) {
return false;
}
cache_t::iterator iter = cache.find(name);
if (iter != cache.end()) {
return iter->second;
}
ucs::handle<ucp_config_t*> config;
UCS_TEST_CREATE_HANDLE(ucp_config_t*, config, ucp_config_release,
ucp_config_read, NULL, NULL);
set_ucp_config(config, test_param);
ucp_context_h ucph;
ucs_status_t status;
{
scoped_log_handler slh(hide_errors_logger);
status = ucp_init(&test_param.ctx_params, config, &ucph);
}
bool result;
if (status == UCS_OK) {
ucp_cleanup(ucph);
result = true;
} else if (status == UCS_ERR_NO_DEVICE) {
result = false;
} else {
UCS_TEST_ABORT("Failed to create context (" << test_case_name << "): "
<< ucs_status_string(status));
}
UCS_TEST_MESSAGE << "checking " << name << ": " << (result ? "yes" : "no");
cache[name] = result;
return result;
}
ucp_test_base::entity::entity(const ucp_test_param& test_param,
ucp_config_t* ucp_config,
const ucp_worker_params_t& worker_params,
const ucp_test_base *test_owner)
: m_err_cntr(0), m_rejected_cntr(0)
{
ucp_test_param entity_param = test_param;
ucp_worker_params_t local_worker_params = worker_params;
int num_workers;
if (test_param.thread_type == MULTI_THREAD_CONTEXT) {
num_workers = MT_TEST_NUM_THREADS;
entity_param.ctx_params.mt_workers_shared = 1;
local_worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
} else if (test_param.thread_type == MULTI_THREAD_WORKER) {
num_workers = 1;
entity_param.ctx_params.mt_workers_shared = 0;
local_worker_params.thread_mode = UCS_THREAD_MODE_MULTI;
} else {
num_workers = 1;
entity_param.ctx_params.mt_workers_shared = 0;
local_worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
}
entity_param.ctx_params.field_mask |= UCP_PARAM_FIELD_MT_WORKERS_SHARED;
local_worker_params.field_mask |= UCP_WORKER_PARAM_FIELD_THREAD_MODE;
ucp_test::set_ucp_config(ucp_config, entity_param);
{
scoped_log_handler slh(hide_errors_logger);
UCS_TEST_CREATE_HANDLE(ucp_context_h, m_ucph, ucp_cleanup, ucp_init,
&entity_param.ctx_params, ucp_config);
}
m_workers.resize(num_workers);
for (int i = 0; i < num_workers; i++) {
UCS_TEST_CREATE_HANDLE(ucp_worker_h, m_workers[i].first,
ucp_worker_destroy, ucp_worker_create, m_ucph,
&local_worker_params);
}
}
ucp_test_base::entity::~entity() {
cleanup();
}
void ucp_test_base::entity::connect(const entity* other,
const ucp_ep_params_t& ep_params,
int ep_idx, int do_set_ep) {
assert(get_num_workers() == other->get_num_workers());
for (unsigned i = 0; i < unsigned(get_num_workers()); i++) {
ucs_status_t status;
ucp_address_t *address;
size_t address_length;
ucp_ep_h ep;
status = ucp_worker_get_address(other->worker(i), &address, &address_length);
ASSERT_UCS_OK(status);
{
scoped_log_handler slh(hide_errors_logger);
ucp_ep_params_t local_ep_params = ep_params;
local_ep_params.field_mask |= UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
local_ep_params.address = address;
status = ucp_ep_create(m_workers[i].first, &local_ep_params, &ep);
}
if (status == UCS_ERR_UNREACHABLE) {
ucp_worker_release_address(other->worker(i), address);
UCS_TEST_SKIP_R(m_errors.empty() ? "Unreachable" : m_errors.back());
}
ASSERT_UCS_OK(status, << " (" << m_errors.back() << ")");
if (do_set_ep) {
set_ep(ep, i, ep_idx);
}
ucp_worker_release_address(other->worker(i), address);
}
}
ucp_ep_h ucp_test_base::entity::accept(ucp_worker_h worker,
ucp_conn_request_h conn_request)
{
ucp_ep_params_t ep_params = *m_server_ep_params;
ucp_ep_h ep;
ep_params.field_mask |= UCP_EP_PARAM_FIELD_CONN_REQUEST |
UCP_EP_PARAM_FIELD_USER_DATA;
ep_params.user_data = reinterpret_cast<void*>(this);
ep_params.conn_request = conn_request;
ucs_status_t status = ucp_ep_create(worker, &ep_params, &ep);
if (status == UCS_ERR_UNREACHABLE) {
UCS_TEST_SKIP_R("Skipping due an unreachable destination (unsupported "
"feature or no supported transport to send partial "
"worker address)");
}
ASSERT_UCS_OK(status);
return ep;
}
void* ucp_test_base::entity::modify_ep(const ucp_ep_params_t& ep_params,
int worker_idx, int ep_idx) {
return ucp_ep_modify_nb(ep(worker_idx, ep_idx), &ep_params);
}
void ucp_test_base::entity::set_ep(ucp_ep_h ep, int worker_index, int ep_index)
{
if (ep_index < get_num_eps(worker_index)) {
m_workers[worker_index].second[ep_index].reset(ep, ep_destructor, this);
} else {
m_workers[worker_index].second.push_back(
ucs::handle<ucp_ep_h, entity *>(ep, ucp_ep_destroy));
}
}
void ucp_test_base::entity::empty_send_completion(void *r, ucs_status_t status) {
}
void ucp_test_base::entity::accept_ep_cb(ucp_ep_h ep, void *arg) {
entity *self = reinterpret_cast<entity*>(arg);
int worker_index = 0; /* TODO pass worker index in arg */
/* take error handler from test fixture and add user data */
ucp_ep_params_t ep_params = *self->m_server_ep_params;
ep_params.field_mask &= UCP_EP_PARAM_FIELD_ERR_HANDLER;
ep_params.field_mask |= UCP_EP_PARAM_FIELD_USER_DATA;
ep_params.user_data = reinterpret_cast<void*>(self);
void *req = ucp_ep_modify_nb(ep, &ep_params);
ASSERT_UCS_PTR_OK(req); /* don't expect this operation to block */
self->set_ep(ep, worker_index, self->get_num_eps(worker_index));
}
void ucp_test_base::entity::accept_conn_cb(ucp_conn_request_h conn_req, void* arg)
{
entity *self = reinterpret_cast<entity*>(arg);
self->m_conn_reqs.push(conn_req);
}
void ucp_test_base::entity::reject_conn_cb(ucp_conn_request_h conn_req, void* arg)
{
entity *self = reinterpret_cast<entity*>(arg);
ucp_listener_reject(self->m_listener, conn_req);
self->m_rejected_cntr++;
}
void* ucp_test_base::entity::flush_ep_nb(int worker_index, int ep_index) const {
return ucp_ep_flush_nb(ep(worker_index, ep_index), 0, empty_send_completion);
}
void* ucp_test_base::entity::flush_worker_nb(int worker_index) const {
if (worker(worker_index) == NULL) {
return NULL;
}
return ucp_worker_flush_nb(worker(worker_index), 0, empty_send_completion);
}
void ucp_test_base::entity::fence(int worker_index) const {
ucs_status_t status = ucp_worker_fence(worker(worker_index));
ASSERT_UCS_OK(status);
}
void* ucp_test_base::entity::disconnect_nb(int worker_index, int ep_index,
enum ucp_ep_close_mode mode) const {
ucp_ep_h ep = revoke_ep(worker_index, ep_index);
if (ep == NULL) {
return NULL;
}
return ucp_ep_close_nb(ep, mode);
}
void ucp_test_base::entity::destroy_worker(int worker_index) {
for (size_t i = 0; i < m_workers[worker_index].second.size(); ++i) {
m_workers[worker_index].second[i].revoke();
}
m_workers[worker_index].first.reset();
}
ucp_ep_h ucp_test_base::entity::ep(int worker_index, int ep_index) const {
if (size_t(worker_index) < m_workers.size()) {
if (size_t(ep_index) < m_workers[worker_index].second.size()) {
return m_workers[worker_index].second[ep_index];
}
}
return NULL;
}
ucp_ep_h ucp_test_base::entity::revoke_ep(int worker_index, int ep_index) const {
ucp_ep_h ucp_ep = ep(worker_index, ep_index);
if (ucp_ep) {
m_workers[worker_index].second[ep_index].revoke();
}
return ucp_ep;
}
ucs_status_t ucp_test_base::entity::listen(listen_cb_type_t cb_type,
const struct sockaddr* saddr,
socklen_t addrlen,
const ucp_ep_params_t& ep_params,
int worker_index)
{
ucp_listener_params_t params;
ucp_listener_h listener;
params.field_mask = UCP_LISTENER_PARAM_FIELD_SOCK_ADDR;
params.sockaddr.addr = saddr;
params.sockaddr.addrlen = addrlen;
switch (cb_type) {
case LISTEN_CB_EP:
params.field_mask |= UCP_LISTENER_PARAM_FIELD_ACCEPT_HANDLER;
params.accept_handler.cb = accept_ep_cb;
params.accept_handler.arg = reinterpret_cast<void*>(this);
break;
case LISTEN_CB_CONN:
params.field_mask |= UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
params.conn_handler.cb = accept_conn_cb;
params.conn_handler.arg = reinterpret_cast<void*>(this);
break;
case LISTEN_CB_REJECT:
params.field_mask |= UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
params.conn_handler.cb = reject_conn_cb;
params.conn_handler.arg = reinterpret_cast<void*>(this);
break;
default:
UCS_TEST_ABORT("invalid test parameter");
}
m_server_ep_params.reset(new ucp_ep_params_t(ep_params),
ucs::deleter<ucp_ep_params_t>);
ucs_status_t status;
{
scoped_log_handler wrap_err(wrap_errors_logger);
status = ucp_listener_create(worker(worker_index), ¶ms, &listener);
}
if (status == UCS_OK) {
m_listener.reset(listener, ucp_listener_destroy);
} else {
/* throw error if status is not (UCS_OK or UCS_ERR_UNREACHABLE or
* UCS_ERR_BUSY).
* UCS_ERR_INVALID_PARAM may also return but then the test should fail */
EXPECT_TRUE((status == UCS_ERR_UNREACHABLE) ||
(status == UCS_ERR_BUSY)) << ucs_status_string(status);
}
return status;
}
ucp_worker_h ucp_test_base::entity::worker(int worker_index) const {
if (worker_index < get_num_workers()) {
return m_workers[worker_index].first;
} else {
return NULL;
}
}
ucp_context_h ucp_test_base::entity::ucph() const {
return m_ucph;
}
ucp_listener_h ucp_test_base::entity::listenerh() const {
return m_listener;
}
unsigned ucp_test_base::entity::progress(int worker_index)
{
ucp_worker_h ucp_worker = worker(worker_index);
if (ucp_worker == NULL) {
return 0;
}
unsigned progress_count = 0;
if (!m_conn_reqs.empty()) {
ucp_conn_request_h conn_req = m_conn_reqs.back();
m_conn_reqs.pop();
ucp_ep_h ep = accept(ucp_worker, conn_req);
set_ep(ep, worker_index, std::numeric_limits<int>::max());
++progress_count;
}
return progress_count + ucp_worker_progress(ucp_worker);
}
int ucp_test_base::entity::get_num_workers() const {
return m_workers.size();
}
int ucp_test_base::entity::get_num_eps(int worker_index) const {
return m_workers[worker_index].second.size();
}
void ucp_test_base::entity::add_err(ucs_status_t status) {
switch (status) {
case UCS_ERR_REJECTED:
++m_rejected_cntr;
/* fall through */
default:
++m_err_cntr;
}
}
const size_t &ucp_test_base::entity::get_err_num_rejected() const {
return m_rejected_cntr;
}
const size_t &ucp_test_base::entity::get_err_num() const {
return m_err_cntr;
}
void ucp_test_base::entity::warn_existing_eps() const {
for (size_t worker_index = 0; worker_index < m_workers.size(); ++worker_index) {
for (size_t ep_index = 0; ep_index < m_workers[worker_index].second.size();
++ep_index) {
ADD_FAILURE() << "ep(" << worker_index << "," << ep_index <<
")=" << m_workers[worker_index].second[ep_index].get() <<
" was not destroyed during test cleanup()";
}
}
}
double ucp_test_base::entity::set_ib_ud_timeout(double timeout_sec)
{
double prev_timeout_sec = 0.;
#if HAVE_IB
for (ucp_rsc_index_t rsc_index = 0;
rsc_index < ucph()->num_tls; ++rsc_index) {
ucp_worker_iface_t *wiface = ucp_worker_iface(worker(), rsc_index);
// check if the iface is ud transport
if (wiface->iface->ops.iface_flush == uct_ud_iface_flush) {
uct_ud_iface_t *iface =
ucs_derived_of(wiface->iface, uct_ud_iface_t);
uct_ud_enter(iface);
if (!prev_timeout_sec) {
prev_timeout_sec = ucs_time_to_sec(iface->config.peer_timeout);
}
iface->config.peer_timeout = ucs_time_from_sec(timeout_sec);
uct_ud_leave(iface);
}
}
#endif
return prev_timeout_sec;
}
void ucp_test_base::entity::cleanup() {
m_listener.reset();
m_workers.clear();
}
void ucp_test_base::entity::ep_destructor(ucp_ep_h ep, entity *e)
{
ucs_status_ptr_t req = ucp_disconnect_nb(ep);
if (!UCS_PTR_IS_PTR(req)) {
return;
}
ucs_status_t status;
ucp_tag_recv_info_t info;
do {
e->progress();
status = ucp_request_test(req, &info);
} while (status == UCS_INPROGRESS);
EXPECT_EQ(UCS_OK, status);
ucp_request_release(req);
}
ucp_test::mapped_buffer::mapped_buffer(size_t size, const entity& entity,
int flags, ucs_memory_type_t mem_type) :
mem_buffer(size, mem_type), m_entity(entity), m_memh(NULL),
m_rkey_buffer(NULL)
{
ucs_status_t status;
if (flags & (UCP_MEM_MAP_ALLOCATE|UCP_MEM_MAP_FIXED)) {
UCS_TEST_ABORT("mapped_buffer does not support allocation by UCP");
}
ucp_mem_map_params_t params;
params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS |
UCP_MEM_MAP_PARAM_FIELD_LENGTH |
UCP_MEM_MAP_PARAM_FIELD_FLAGS;
params.flags = flags;
params.address = ptr();
params.length = size;
status = ucp_mem_map(m_entity.ucph(), ¶ms, &m_memh);
ASSERT_UCS_OK(status);
size_t rkey_buffer_size;
status = ucp_rkey_pack(m_entity.ucph(), m_memh, &m_rkey_buffer,
&rkey_buffer_size);
ASSERT_UCS_OK(status);
}
ucp_test::mapped_buffer::~mapped_buffer()
{
ucp_rkey_buffer_release(m_rkey_buffer);
ucs_status_t status = ucp_mem_unmap(m_entity.ucph(), m_memh);
EXPECT_UCS_OK(status);
}
ucs::handle<ucp_rkey_h> ucp_test::mapped_buffer::rkey(const entity& entity) const
{
ucp_rkey_h rkey;
ucs_status_t status = ucp_ep_rkey_unpack(entity.ep(), m_rkey_buffer, &rkey);
ASSERT_UCS_OK(status);
return ucs::handle<ucp_rkey_h>(rkey, ucp_rkey_destroy);
}
ucp_mem_h ucp_test::mapped_buffer::memh() const
{
return m_memh;
}