/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "self.h"
#include <uct/sm/base/sm_ep.h>
#include <uct/sm/base/sm_iface.h>
#include <ucs/type/class.h>
#include <ucs/sys/string.h>
#include <ucs/arch/cpu.h>
#include "self.h"
#define UCT_SELF_NAME "self"
#define UCT_SELF_IFACE_SEND_BUFFER_GET(_iface) \
({ /* take a buffer from the mpool so it is not reused */ \
   /* until the operation completes */ \
void *ptr = ucs_mpool_get_inline(&(_iface)->msg_mp); \
if (ucs_unlikely(ptr == NULL)) { \
return UCS_ERR_NO_MEMORY; \
} \
ptr; \
})
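/*
 * Note: the statement expression above returns from the *calling* function
 * with UCS_ERR_NO_MEMORY when the mpool is exhausted. A use site roughly
 * expands to the following (illustrative sketch only):
 *
 *     void *send_buffer = ucs_mpool_get_inline(&iface->msg_mp);
 *     if (ucs_unlikely(send_buffer == NULL)) {
 *         return UCS_ERR_NO_MEMORY; // returns from the caller, not the macro
 *     }
 */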
/* Forward declarations */
static uct_iface_ops_t uct_self_iface_ops;
static uct_component_t uct_self_component;
static ucs_config_field_t uct_self_iface_config_table[] = {
{"", "", NULL,
ucs_offsetof(uct_self_iface_config_t, super),
UCS_CONFIG_TYPE_TABLE(uct_iface_config_table)},
{"SEG_SIZE", "8k",
"Size of copy-out buffer",
ucs_offsetof(uct_self_iface_config_t, seg_size), UCS_CONFIG_TYPE_MEMUNITS},
{NULL}
};
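/*
 * The seg_size knob is parsed by the regular UCS config machinery; with the
 * "SELF_" prefix passed to UCT_TL_DEFINE below, it is expected to be settable
 * at runtime through the environment, e.g.:
 *
 *     UCX_SELF_SEG_SIZE=16k ./app   // sets seg_size, hence iface->send_size
 */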
static ucs_status_t uct_self_iface_query(uct_iface_h tl_iface, uct_iface_attr_t *attr)
{
uct_self_iface_t *iface = ucs_derived_of(tl_iface, uct_self_iface_t);
ucs_trace_func("iface=%p", iface);
uct_base_iface_query(&iface->super, attr);
attr->iface_addr_len = sizeof(uct_self_iface_addr_t);
attr->device_addr_len = 0;
attr->ep_addr_len = 0;
attr->max_conn_priv = 0;
attr->cap.flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE |
UCT_IFACE_FLAG_AM_SHORT |
UCT_IFACE_FLAG_AM_BCOPY |
UCT_IFACE_FLAG_PUT_SHORT |
UCT_IFACE_FLAG_PUT_BCOPY |
UCT_IFACE_FLAG_GET_BCOPY |
UCT_IFACE_FLAG_ATOMIC_CPU |
UCT_IFACE_FLAG_PENDING |
UCT_IFACE_FLAG_CB_SYNC |
UCT_IFACE_FLAG_EP_CHECK;
attr->cap.atomic32.op_flags =
attr->cap.atomic64.op_flags = UCS_BIT(UCT_ATOMIC_OP_ADD) |
UCS_BIT(UCT_ATOMIC_OP_AND) |
UCS_BIT(UCT_ATOMIC_OP_OR) |
UCS_BIT(UCT_ATOMIC_OP_XOR);
attr->cap.atomic32.fop_flags =
attr->cap.atomic64.fop_flags = UCS_BIT(UCT_ATOMIC_OP_ADD) |
UCS_BIT(UCT_ATOMIC_OP_AND) |
UCS_BIT(UCT_ATOMIC_OP_OR) |
UCS_BIT(UCT_ATOMIC_OP_XOR) |
UCS_BIT(UCT_ATOMIC_OP_SWAP) |
UCS_BIT(UCT_ATOMIC_OP_CSWAP);
attr->cap.put.max_short = UINT_MAX;
attr->cap.put.max_bcopy = SIZE_MAX;
attr->cap.put.min_zcopy = 0;
attr->cap.put.max_zcopy = 0;
attr->cap.put.opt_zcopy_align = 1;
attr->cap.put.align_mtu = attr->cap.put.opt_zcopy_align;
attr->cap.put.max_iov = 1;
attr->cap.get.max_bcopy = SIZE_MAX;
attr->cap.get.min_zcopy = 0;
attr->cap.get.max_zcopy = 0;
attr->cap.get.opt_zcopy_align = 1;
attr->cap.get.align_mtu = attr->cap.get.opt_zcopy_align;
attr->cap.get.max_iov = 1;
attr->cap.am.max_short = iface->send_size;
attr->cap.am.max_bcopy = iface->send_size;
attr->cap.am.min_zcopy = 0;
attr->cap.am.max_zcopy = 0;
attr->cap.am.opt_zcopy_align = 1;
attr->cap.am.align_mtu = attr->cap.am.opt_zcopy_align;
attr->cap.am.max_hdr = 0;
attr->cap.am.max_iov = 1;
attr->latency.overhead = 0; /* seconds */
attr->latency.growth = 0;
attr->bandwidth.dedicated = 6911 * 1024.0 * 1024.0; /* bytes per second */
attr->bandwidth.shared = 0;
attr->overhead = 10e-9; /* seconds per operation */
attr->priority = 0;
return UCS_OK;
}
static ucs_status_t uct_self_iface_get_address(uct_iface_h tl_iface,
uct_iface_addr_t *addr)
{
const uct_self_iface_t *iface = ucs_derived_of(tl_iface, uct_self_iface_t);
*(uct_self_iface_addr_t*)addr = iface->id;
return UCS_OK;
}
static int uct_self_iface_is_reachable(const uct_iface_h tl_iface,
const uct_device_addr_t *dev_addr,
const uct_iface_addr_t *iface_addr)
{
const uct_self_iface_t *iface = ucs_derived_of(tl_iface, uct_self_iface_t);
const uct_self_iface_addr_t *addr = (const uct_self_iface_addr_t*)iface_addr;
return (addr != NULL) && (iface->id == *addr);
}
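/*
 * Illustrative caller-side sketch (not part of this file): the transport is
 * CONNECT_TO_IFACE with device_addr_len == 0, so an endpoint is created from
 * the interface address alone. The variable names below are hypothetical.
 *
 *     uct_self_iface_addr_t if_addr;
 *     uct_ep_params_t params;
 *     uct_ep_h ep;
 *
 *     uct_iface_get_address(iface, (uct_iface_addr_t*)&if_addr);
 *     if (uct_iface_is_reachable(iface, NULL, (uct_iface_addr_t*)&if_addr)) {
 *         params.field_mask = UCT_EP_PARAM_FIELD_IFACE |
 *                             UCT_EP_PARAM_FIELD_DEV_ADDR |
 *                             UCT_EP_PARAM_FIELD_IFACE_ADDR;
 *         params.iface      = iface;
 *         params.dev_addr   = NULL; // zero-length device address
 *         params.iface_addr = (uct_iface_addr_t*)&if_addr;
 *         uct_ep_create(&params, &ep);
 *     }
 */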
static void uct_self_iface_sendrecv_am(uct_self_iface_t *iface, uint8_t am_id,
void *buffer, size_t length, const char *title)
{
ucs_status_t UCS_V_UNUSED status;
uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_SEND, am_id,
buffer, length, "TX: AM_%s", title);
uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_RECV, am_id,
buffer, length, "RX: AM_%s", title);
status = uct_iface_invoke_am(&iface->super, am_id, buffer,
length, 0);
ucs_assert(status == UCS_OK);
ucs_mpool_put_inline(buffer);
}
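/*
 * Illustrative receive-side sketch (hypothetical handler, assuming the
 * standard UCT AM API): uct_iface_invoke_am() above dispatches to whatever
 * callback was registered for this AM id. Note the ucs_assert(status ==
 * UCS_OK) above: the buffer is released immediately, so handlers on this
 * transport must consume the data in place and return UCS_OK.
 *
 *     static ucs_status_t my_am_handler(void *arg, void *data, size_t length,
 *                                       unsigned flags)
 *     {
 *         // for am_short, data begins with the 64-bit header
 *         return UCS_OK; // do not retain the descriptor
 *     }
 *
 *     uct_iface_set_am_handler(iface, am_id, my_am_handler, NULL, 0);
 */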
static ucs_mpool_ops_t uct_self_iface_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
.obj_init = NULL,
.obj_cleanup = NULL
};
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_self_iface_t, uct_iface_t);
static UCS_CLASS_INIT_FUNC(uct_self_iface_t, uct_md_h md, uct_worker_h worker,
const uct_iface_params_t *params,
const uct_iface_config_t *tl_config)
{
uct_self_iface_config_t *config = ucs_derived_of(tl_config,
uct_self_iface_config_t);
ucs_status_t status;
UCT_CHECK_PARAM(params->field_mask & UCT_IFACE_PARAM_FIELD_OPEN_MODE,
"UCT_IFACE_PARAM_FIELD_OPEN_MODE is not defined");
if (!(params->open_mode & UCT_IFACE_OPEN_MODE_DEVICE)) {
ucs_error("Self transport supports only UCT_IFACE_OPEN_MODE_DEVICE");
return UCS_ERR_UNSUPPORTED;
}
if (ucs_derived_of(worker, uct_priv_worker_t)->thread_mode == UCS_THREAD_MODE_MULTI) {
ucs_error("Self transport does not support multi-threaded worker");
return UCS_ERR_INVALID_PARAM;
}
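/* Note: there is deliberately no comma before UCS_STATS_ARG below - with
 * stats enabled the macro expands to ", arg", and to nothing otherwise */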
UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, &uct_self_iface_ops, md, worker,
params, tl_config
UCS_STATS_ARG((params->field_mask &
UCT_IFACE_PARAM_FIELD_STATS_ROOT) ?
params->stats_root : NULL)
UCS_STATS_ARG(UCT_SELF_NAME));
self->id = ucs_generate_uuid((uintptr_t)self);
self->send_size = config->seg_size;
status = ucs_mpool_init(&self->msg_mp, 0, self->send_size, 0,
UCS_SYS_CACHE_LINE_SIZE,
2, /* two elements are enough for most communication patterns */
UINT_MAX, &uct_self_iface_mpool_ops, "self_msg_desc");
if (UCS_STATUS_IS_ERR(status)) {
return status;
}
ucs_debug("created self iface id 0x%lx send_size %zu", self->id,
self->send_size);
return UCS_OK;
}
static UCS_CLASS_CLEANUP_FUNC(uct_self_iface_t)
{
ucs_mpool_cleanup(&self->msg_mp, 1);
}
UCS_CLASS_DEFINE(uct_self_iface_t, uct_base_iface_t);
static UCS_CLASS_DEFINE_NEW_FUNC(uct_self_iface_t, uct_iface_t, uct_md_h,
uct_worker_h, const uct_iface_params_t*,
const uct_iface_config_t*);
static ucs_status_t
uct_self_query_tl_devices(uct_md_h md, uct_tl_device_resource_t **tl_devices_p,
unsigned *num_tl_devices_p)
{
return uct_single_device_resource(md, UCT_SM_DEVICE_NAME,
UCT_DEVICE_TYPE_SELF,
tl_devices_p, num_tl_devices_p);
}
static UCS_CLASS_INIT_FUNC(uct_self_ep_t, const uct_ep_params_t *params)
{
uct_self_iface_t *iface = ucs_derived_of(params->iface, uct_self_iface_t);
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super);
return UCS_OK;
}
static UCS_CLASS_CLEANUP_FUNC(uct_self_ep_t)
{
}
UCS_CLASS_DEFINE(uct_self_ep_t, uct_base_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_self_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_self_ep_t, uct_ep_t);
ucs_status_t uct_self_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t header,
const void *payload, unsigned length)
{
uct_self_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_self_iface_t);
uct_self_ep_t UCS_V_UNUSED *ep = ucs_derived_of(tl_ep, uct_self_ep_t);
size_t total_length;
void *send_buffer;
UCT_CHECK_AM_ID(id);
total_length = length + sizeof(header);
UCT_CHECK_LENGTH(total_length, 0, iface->send_size, "am_short");
send_buffer = UCT_SELF_IFACE_SEND_BUFFER_GET(iface);
uct_am_short_fill_data(send_buffer, header, payload, length);
UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, total_length);
uct_self_iface_sendrecv_am(iface, id, send_buffer, total_length, "SHORT");
return UCS_OK;
}
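/*
 * Illustrative usage (hypothetical ep/am_id values): the 64-bit header and
 * the payload are packed contiguously into one mpool buffer and delivered
 * synchronously before the call returns.
 *
 *     uint64_t hdr = 0x1234;
 *     char payload[] = "hello";
 *     ucs_status_t status = uct_ep_am_short(ep, am_id, hdr, payload,
 *                                           sizeof(payload));
 */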
ssize_t uct_self_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
uct_pack_callback_t pack_cb, void *arg,
unsigned flags)
{
uct_self_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_self_iface_t);
uct_self_ep_t UCS_V_UNUSED *ep = ucs_derived_of(tl_ep, uct_self_ep_t);
size_t length;
void *send_buffer;
UCT_CHECK_AM_ID(id);
send_buffer = UCT_SELF_IFACE_SEND_BUFFER_GET(iface);
length = pack_cb(send_buffer, arg);
UCT_CHECK_LENGTH(length, 0, iface->send_size, "am_bcopy");
UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, length);
uct_self_iface_sendrecv_am(iface, id, send_buffer, length, "BCOPY");
return length;
}
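/*
 * Illustrative usage (hypothetical my_pack_cb): am_bcopy lends the caller the
 * staging buffer to fill; the callback's return value becomes the message
 * length and, on success, the function's return value.
 *
 *     static size_t my_pack_cb(void *dest, void *arg)
 *     {
 *         const char *msg = arg;
 *         size_t length = strlen(msg) + 1;
 *
 *         memcpy(dest, msg, length);
 *         return length; // number of bytes packed
 *     }
 *
 *     ssize_t packed = uct_ep_am_bcopy(ep, am_id, my_pack_cb,
 *                                      (void*)"hello", 0);
 */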
static uct_iface_ops_t uct_self_iface_ops = {
.ep_put_short = uct_sm_ep_put_short,
.ep_put_bcopy = uct_sm_ep_put_bcopy,
.ep_get_bcopy = uct_sm_ep_get_bcopy,
.ep_am_short = uct_self_ep_am_short,
.ep_am_bcopy = uct_self_ep_am_bcopy,
.ep_atomic_cswap64 = uct_sm_ep_atomic_cswap64,
.ep_atomic64_post = uct_sm_ep_atomic64_post,
.ep_atomic64_fetch = uct_sm_ep_atomic64_fetch,
.ep_atomic_cswap32 = uct_sm_ep_atomic_cswap32,
.ep_atomic32_post = uct_sm_ep_atomic32_post,
.ep_atomic32_fetch = uct_sm_ep_atomic32_fetch,
.ep_flush = uct_base_ep_flush,
.ep_fence = uct_base_ep_fence,
.ep_check = ucs_empty_function_return_success,
.ep_pending_add = ucs_empty_function_return_busy,
.ep_pending_purge = ucs_empty_function,
.ep_create = UCS_CLASS_NEW_FUNC_NAME(uct_self_ep_t),
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_self_ep_t),
.iface_flush = uct_base_iface_flush,
.iface_fence = uct_base_iface_fence,
.iface_progress_enable = ucs_empty_function,
.iface_progress_disable = ucs_empty_function,
.iface_progress = ucs_empty_function_return_zero,
.iface_close = UCS_CLASS_DELETE_FUNC_NAME(uct_self_iface_t),
.iface_query = uct_self_iface_query,
.iface_get_device_address = ucs_empty_function_return_success,
.iface_get_address = uct_self_iface_get_address,
.iface_is_reachable = uct_self_iface_is_reachable
};
UCT_TL_DEFINE(&uct_self_component, self, uct_self_query_tl_devices, uct_self_iface_t,
"SELF_", uct_self_iface_config_table, uct_self_iface_config_t);
static ucs_status_t uct_self_md_query(uct_md_h md, uct_md_attr_t *attr)
{
/* Provide dummy memory registration; no real memory handling exists */
attr->cap.flags = UCT_MD_FLAG_REG |
UCT_MD_FLAG_NEED_RKEY; /* TODO ignore rkey in rma/amo ops */
attr->cap.reg_mem_types = UCS_MEMORY_TYPES_CPU_ACCESSIBLE;
attr->cap.detect_mem_types = 0;
attr->cap.access_mem_type = UCS_MEMORY_TYPE_HOST;
attr->cap.max_alloc = 0;
attr->cap.max_reg = ULONG_MAX;
attr->rkey_packed_size = 0; /* uct_md_query adds UCT_COMPONENT_NAME_MAX to this */
attr->reg_cost.overhead = 0;
attr->reg_cost.growth = 0;
memset(&attr->local_cpus, 0xff, sizeof(attr->local_cpus));
return UCS_OK;
}
static ucs_status_t uct_self_mem_reg(uct_md_h md, void *address, size_t length,
unsigned flags, uct_mem_h *memh_p)
{
/* Emulate memory registration by returning a dummy handle */
*memh_p = (void *) 0xdeadbeef;
return UCS_OK;
}
static ucs_status_t uct_self_md_open(uct_component_t *component, const char *md_name,
const uct_md_config_t *config, uct_md_h *md_p)
{
static uct_md_ops_t md_ops = {
.close = ucs_empty_function,
.query = uct_self_md_query,
.mkey_pack = ucs_empty_function_return_success,
.mem_reg = uct_self_mem_reg,
.mem_dereg = ucs_empty_function_return_success,
.detect_memory_type = ucs_empty_function_return_unsupported
};
static uct_md_t md = {
.ops = &md_ops,
.component = &uct_self_component
};
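/* A single static md instance is shared by all opens; this is safe because
 * .close is a no-op and the md carries no per-open state */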
*md_p = &md;
return UCS_OK;
}
static ucs_status_t uct_self_md_rkey_unpack(uct_component_t *component,
const void *rkey_buffer, uct_rkey_t *rkey_p,
void **handle_p)
{
/**
 * Pseudo-stub for rkey unpacking.
 * The rkey must be 0: the peer is the same process, which allows reusing
 * the shared-memory uct_sm_ep_[put|get|atomic]* functions directly.
 */
*rkey_p = 0;
*handle_p = NULL;
return UCS_OK;
}
static uct_component_t uct_self_component = {
.query_md_resources = uct_md_query_single_md_resource,
.md_open = uct_self_md_open,
.cm_open = ucs_empty_function_return_unsupported,
.rkey_unpack = uct_self_md_rkey_unpack,
.rkey_ptr = ucs_empty_function_return_unsupported,
.rkey_release = ucs_empty_function_return_success,
.name = UCT_SELF_NAME,
.md_config = UCT_MD_DEFAULT_CONFIG_INITIALIZER,
.cm_config = UCS_CONFIG_EMPTY_GLOBAL_LIST_ENTRY,
.tl_list = UCT_COMPONENT_TL_LIST_INITIALIZER(&uct_self_component),
.flags = 0
};
UCT_COMPONENT_REGISTER(&uct_self_component);