/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifndef UCT_IFACE_H_
#define UCT_IFACE_H_
#include "uct_worker.h"
#include <uct/api/uct.h>
#include <uct/base/uct_component.h>
#include <ucs/config/parser.h>
#include <ucs/datastruct/arbiter.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/queue.h>
#include <ucs/debug/log.h>
#include <ucs/stats/stats.h>
#include <ucs/sys/compiler.h>
#include <ucs/sys/sys.h>
#include <ucs/type/class.h>
#include <ucs/datastruct/mpool.inl>
enum {
UCT_EP_STAT_AM,
UCT_EP_STAT_PUT,
UCT_EP_STAT_GET,
UCT_EP_STAT_ATOMIC,
#if IBV_HW_TM
UCT_EP_STAT_TAG,
#endif
UCT_EP_STAT_BYTES_SHORT,
UCT_EP_STAT_BYTES_BCOPY,
UCT_EP_STAT_BYTES_ZCOPY,
UCT_EP_STAT_NO_RES,
UCT_EP_STAT_FLUSH,
UCT_EP_STAT_FLUSH_WAIT, /* number of times flush called while in progress */
UCT_EP_STAT_PENDING,
UCT_EP_STAT_FENCE,
UCT_EP_STAT_LAST
};
enum {
UCT_IFACE_STAT_RX_AM,
UCT_IFACE_STAT_RX_AM_BYTES,
UCT_IFACE_STAT_TX_NO_DESC,
UCT_IFACE_STAT_FLUSH,
UCT_IFACE_STAT_FLUSH_WAIT, /* number of times flush called while in progress */
UCT_IFACE_STAT_FENCE,
UCT_IFACE_STAT_LAST
};
/*
* Statistics macros
*/
#define UCT_TL_EP_STAT_OP(_ep, _op, _method, _size) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_##_op, 1); \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_BYTES_##_method, _size);
#define UCT_TL_EP_STAT_OP_IF_SUCCESS(_status, _ep, _op, _method, _size) \
if (_status >= 0) { \
UCT_TL_EP_STAT_OP(_ep, _op, _method, _size) \
}
#define UCT_TL_EP_STAT_ATOMIC(_ep) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_ATOMIC, 1);
#define UCT_TL_EP_STAT_FLUSH(_ep) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FLUSH, 1);
#define UCT_TL_EP_STAT_FLUSH_WAIT(_ep) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FLUSH_WAIT, 1);
#define UCT_TL_EP_STAT_FENCE(_ep) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FENCE, 1);
#define UCT_TL_EP_STAT_PEND(_ep) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_PENDING, 1);
#define UCT_TL_IFACE_STAT_FLUSH(_iface) \
UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FLUSH, 1);
#define UCT_TL_IFACE_STAT_FLUSH_WAIT(_iface) \
UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FLUSH_WAIT, 1);
#define UCT_TL_IFACE_STAT_FENCE(_iface) \
UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FENCE, 1);
#define UCT_TL_IFACE_STAT_TX_NO_DESC(_iface) \
UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_TX_NO_DESC, 1);
#define UCT_CB_FLAGS_CHECK(_flags) \
do { \
if ((_flags) & UCT_CB_FLAG_RESERVED) { \
ucs_error("Unsupported callback flag 0x%x", UCT_CB_FLAG_RESERVED); \
return UCS_ERR_INVALID_PARAM; \
} \
} while (0)
/**
* In release mode - do nothing.
*
* In debug mode, if @a _condition is not true, log an error and return an error
* status. This can be less optimal because of the additional checks, and because
* the compiler has to generate code for the error flow as well.
*/
#define UCT_CHECK_PARAM(_condition, _err_message, ...) \
if (ENABLE_PARAMS_CHECK && !(_condition)) { \
ucs_error(_err_message, ## __VA_ARGS__); \
return UCS_ERR_INVALID_PARAM; \
}
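/*
 * Illustrative sketch (not taken from an existing transport): parameter checks
 * at the top of a hypothetical send function. FOO_SUPPORTED_FLAGS and pack_cb
 * are made-up names.
 *
 *   UCT_CHECK_PARAM(pack_cb != NULL, "pack callback must not be NULL");
 *   UCT_CHECK_PARAM(!(flags & ~FOO_SUPPORTED_FLAGS),
 *                   "unsupported flags 0x%x", flags & ~FOO_SUPPORTED_FLAGS);
 */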
/**
* In release mode - do nothing.
*
* In debug mode, if the field mask of @a _params does not have both the
* @ref UCT_EP_PARAM_FIELD_DEV_ADDR and @ref UCT_EP_PARAM_FIELD_IFACE_ADDR
* flags set, return an error.
*/
#define UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(_params) \
UCT_CHECK_PARAM(ucs_test_all_flags((_params)->field_mask, \
UCT_EP_PARAM_FIELD_DEV_ADDR | \
UCT_EP_PARAM_FIELD_IFACE_ADDR), \
"UCT_EP_PARAM_FIELD_DEV_ADDR and UCT_EP_PARAM_FIELD_IFACE_ADDR are not defined")
/**
* Check the condition and, if it is not true, return an error status as a pointer.
*/
#define UCT_CHECK_PARAM_PTR(_condition, _err_message, ...) \
if (ENABLE_PARAMS_CHECK && !(_condition)) { \
ucs_error(_err_message, ## __VA_ARGS__); \
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM); \
}
/**
* Check the size of the IOV array
*/
#define UCT_CHECK_IOV_SIZE(_iovcnt, _max_iov, _name) \
UCT_CHECK_PARAM((_iovcnt) <= (_max_iov), \
"iovcnt(%lu) should be limited by %lu in %s", \
_iovcnt, _max_iov, _name)
/**
* In debug mode, if @a _length is outside the [@a _min_length, @a _max_length]
* range, generate an 'Invalid length' error.
*/
#define UCT_CHECK_LENGTH(_length, _min_length, _max_length, _name) \
{ \
typeof(_length) __length = _length; \
UCT_CHECK_PARAM(__length <= (_max_length), \
"Invalid %s length: %zu (expected: <= %zu)", \
_name, (size_t)(__length), (size_t)(_max_length)); \
UCT_CHECK_PARAM((ssize_t)(__length) >= (_min_length), \
"Invalid %s length: %zu (expected: >= %zu)", \
_name, (size_t)(__length), (size_t)(_min_length)); \
}
/**
* Skip if this is a zero-length operation.
*/
#define UCT_SKIP_ZERO_LENGTH(_length, ...) \
if (0 == (_length)) { \
ucs_trace_data("Zero length request: skip it"); \
UCS_PP_FOREACH(_UCT_RELEASE_DESC, _, __VA_ARGS__) \
return UCS_OK; \
}
#define _UCT_RELEASE_DESC(_, _desc) \
ucs_mpool_put(_desc);
/**
* In debug mode, check that active message ID is valid.
*/
#define UCT_CHECK_AM_ID(_am_id) \
UCT_CHECK_PARAM((_am_id) < UCT_AM_ID_MAX, \
"Invalid active message id (valid range: 0..%d)", \
(int)UCT_AM_ID_MAX - 1)
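/*
 * Illustrative sketch of argument validation in a hypothetical transport's
 * short-send entry point ("foo_ep_am_short" and FOO_MAX_SHORT are made-up
 * names, not part of this header):
 *
 *   ucs_status_t foo_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t header,
 *                                const void *payload, unsigned length)
 *   {
 *       UCT_CHECK_AM_ID(id);
 *       UCT_CHECK_LENGTH(length, 0, FOO_MAX_SHORT - sizeof(header), "am_short");
 *       ...
 *   }
 */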
/**
* Declare classes for structures defined in api/tl.h
*/
UCS_CLASS_DECLARE(uct_iface_t, uct_iface_ops_t, uct_md_h);
UCS_CLASS_DECLARE(uct_ep_t, uct_iface_h);
/**
* Active message handler table entry
*/
typedef struct uct_am_handler {
uct_am_callback_t cb;
void *arg;
uint32_t flags;
} uct_am_handler_t;
/**
* Base structure of all interfaces.
* Includes the AM table which we don't want to expose.
*/
typedef struct uct_base_iface {
uct_iface_t super;
uct_md_h md; /* MD this interface is using */
uct_priv_worker_t *worker; /* Worker this interface is on */
uct_am_handler_t am[UCT_AM_ID_MAX];/* Active message table */
uct_am_tracer_t am_tracer; /* Active message tracer */
void *am_tracer_arg; /* Tracer argument */
uct_error_handler_t err_handler; /* Error handler */
void *err_handler_arg; /* Error handler argument */
uint32_t err_handler_flags; /* Error handler callback flags */
uct_worker_progress_t prog; /* Will be removed once all transports
support progress control */
unsigned progress_flags; /* Which progress is currently enabled */
struct {
unsigned num_alloc_methods;
uct_alloc_method_t alloc_methods[UCT_ALLOC_METHOD_LAST];
ucs_log_level_t failure_level;
size_t max_num_eps;
} config;
UCS_STATS_NODE_DECLARE(stats) /* Statistics */
} uct_base_iface_t;
UCS_CLASS_DECLARE(uct_base_iface_t, uct_iface_ops_t*, uct_md_h, uct_worker_h,
const uct_iface_params_t*, const uct_iface_config_t*
UCS_STATS_ARG(ucs_stats_node_t*) UCS_STATS_ARG(const char*));
/**
* Stub interface used for failed endpoints
*/
typedef struct uct_failed_iface {
uct_iface_t super;
ucs_queue_head_t pend_q;
} uct_failed_iface_t;
/**
* Base structure of all endpoints.
*/
typedef struct uct_base_ep {
uct_ep_t super;
UCS_STATS_NODE_DECLARE(stats)
} uct_base_ep_t;
UCS_CLASS_DECLARE(uct_base_ep_t, uct_base_iface_t*);
/**
* Internal resource descriptor of a transport device
*/
typedef struct uct_tl_device_resource {
char name[UCT_DEVICE_NAME_MAX]; /**< Hardware device name */
uct_device_type_t type; /**< Device type, which defines the UCT group it belongs to */
} uct_tl_device_resource_t;
/**
* UCT transport definition. This structure should not be used directly; use
* @ref UCT_TL_DEFINE macro to define a transport.
*/
typedef struct uct_tl {
char name[UCT_TL_NAME_MAX]; /**< Transport name */
ucs_status_t (*query_devices)(uct_md_h md,
uct_tl_device_resource_t **tl_devices_p,
unsigned *num_tl_devices_p);
ucs_status_t (*iface_open)(uct_md_h md, uct_worker_h worker,
const uct_iface_params_t *params,
const uct_iface_config_t *config,
uct_iface_h *iface_p);
ucs_config_global_list_entry_t config; /**< Transport configuration entry */
ucs_list_link_t list; /**< Entry in component's transports list */
} uct_tl_t;
/**
* Define a transport
*
* @param _component Component to add the transport to
* @param _name Name of the transport (should be a token, not a string)
* @param _query_devices Function to query the list of available devices
* @param _iface_class Struct type defining the uct_iface class
* @param _cfg_prefix Prefix for the transport's configuration environment variables
* @param _cfg_table Configuration table of the transport
* @param _cfg_struct Configuration structure type of the transport
*/
#define UCT_TL_DEFINE(_component, _name, _query_devices, _iface_class, \
_cfg_prefix, _cfg_table, _cfg_struct) \
\
uct_tl_t uct_##_name##_tl = { \
.name = #_name, \
.query_devices = _query_devices, \
.iface_open = UCS_CLASS_NEW_FUNC_NAME(_iface_class), \
.config = { \
.name = #_name" transport", \
.prefix = _cfg_prefix, \
.table = _cfg_table, \
.size = sizeof(_cfg_struct), \
} \
}; \
UCS_CONFIG_REGISTER_TABLE_ENTRY(&(uct_##_name##_tl).config); \
UCS_STATIC_INIT { \
ucs_list_add_tail(&(_component)->tl_list, &(uct_##_name##_tl).list); \
}
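/*
 * Illustrative sketch of a transport definition (all "foo" names are made-up):
 *
 *   UCT_TL_DEFINE(&uct_foo_component, foo, uct_foo_query_devices,
 *                 uct_foo_iface_t, "FOO_", uct_foo_iface_config_table,
 *                 uct_foo_iface_config_t);
 *
 * This instantiates uct_foo_tl, registers its configuration table, and adds
 * the transport to the component's tl_list at static initialization time.
 */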
/**
* "Base" structure which defines interface configuration options.
* Specific transports extend this structure.
*/
struct uct_iface_config {
struct {
uct_alloc_method_t *methods;
unsigned count;
} alloc_methods;
int failure; /* Level of failure reports */
size_t max_num_eps;
};
/**
* Memory pool configuration.
*/
typedef struct uct_iface_mpool_config {
unsigned max_bufs; /* Upper limit to number of buffers */
unsigned bufs_grow; /* How many buffers (approx.) are allocated every time */
} uct_iface_mpool_config_t;
/**
* Define configuration fields for memory pool parameters.
*/
#define UCT_IFACE_MPOOL_CONFIG_FIELDS(_prefix, _dfl_max, _dfl_grow, _mp_name, _offset, _desc) \
{_prefix "MAX_BUFS", UCS_PP_QUOTE(_dfl_max), \
"Maximal number of " _mp_name " buffers for the interface. -1 is infinite." \
_desc, \
(_offset) + ucs_offsetof(uct_iface_mpool_config_t, max_bufs), UCS_CONFIG_TYPE_INT}, \
\
{_prefix "BUFS_GROW", UCS_PP_QUOTE(_dfl_grow), \
"How much buffers are added every time the " _mp_name " memory pool grows.\n" \
"0 means the value is chosen by the transport.", \
(_offset) + ucs_offsetof(uct_iface_mpool_config_t, bufs_grow), UCS_CONFIG_TYPE_UINT}
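/*
 * Illustrative sketch of using the fields above in a transport configuration
 * table (the "foo" names and the rx_mpool field are made-up):
 *
 *   ucs_config_field_t uct_foo_iface_config_table[] = {
 *       {"", "", NULL, ucs_offsetof(uct_foo_iface_config_t, super),
 *        UCS_CONFIG_TYPE_TABLE(uct_iface_config_table)},
 *
 *       UCT_IFACE_MPOOL_CONFIG_FIELDS("RX_", -1, 128, "receive",
 *                                     ucs_offsetof(uct_foo_iface_config_t, rx_mpool),
 *                                     ""),
 *
 *       {NULL}
 *   };
 */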
/**
* Get a descriptor from a memory pool, tell valgrind it's already defined, and
* run the failure path if the memory pool is empty.
*
* @param _iface Interface which owns the memory pool (used to update statistics).
* @param _mp Memory pool to get the descriptor from.
* @param _desc Variable to assign the descriptor to.
* @param _failure What to do if the memory pool is empty.
*/
#define UCT_TL_IFACE_GET_TX_DESC(_iface, _mp, _desc, _failure) \
{ \
_desc = ucs_mpool_get_inline(_mp); \
if (ucs_unlikely((_desc) == NULL)) { \
UCT_TL_IFACE_STAT_TX_NO_DESC(_iface); \
_failure; \
} \
\
VALGRIND_MAKE_MEM_DEFINED(_desc, sizeof(*(_desc))); \
}
#define UCT_TL_IFACE_GET_RX_DESC(_iface, _mp, _desc, _failure) \
{ \
_desc = ucs_mpool_get_inline(_mp); \
if (ucs_unlikely((_desc) == NULL)) { \
uct_iface_mpool_empty_warn(_iface, _mp); \
_failure; \
} \
\
VALGRIND_MAKE_MEM_DEFINED(_desc, sizeof(*(_desc))); \
}
#define UCT_TL_IFACE_PUT_DESC(_desc) \
{ \
ucs_mpool_put_inline(_desc); \
VALGRIND_MAKE_MEM_UNDEFINED(_desc, sizeof(*(_desc))); \
}
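/*
 * Illustrative sketch of a bcopy send path built on the descriptor macros
 * above (the "foo" names and the descriptor layout are made-up):
 *
 *   ssize_t foo_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
 *                           uct_pack_callback_t pack_cb, void *arg,
 *                           unsigned flags)
 *   {
 *       foo_iface_t *iface = ucs_derived_of(tl_ep->iface, foo_iface_t);
 *       foo_desc_t *desc;
 *       size_t length;
 *
 *       UCT_TL_IFACE_GET_TX_DESC(&iface->super, &iface->tx_mp, desc,
 *                                return UCS_ERR_NO_RESOURCE);
 *       length = pack_cb(desc + 1, arg);
 *       // ... post the descriptor to the hypothetical device; once it
 *       // completes:
 *       UCT_TL_IFACE_PUT_DESC(desc);
 *       return length;
 *   }
 */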
/**
* TL Memory pool object initialization callback.
*/
typedef void (*uct_iface_mpool_init_obj_cb_t)(uct_iface_h iface, void *obj,
uct_mem_h memh);
/**
* Base structure for private data held inside a pending request for TLs
* which use ucs_arbiter_t to progress pending requests.
*/
typedef struct {
ucs_arbiter_elem_t arb_elem;
} uct_pending_req_priv_arb_t;
static UCS_F_ALWAYS_INLINE ucs_arbiter_elem_t *
uct_pending_req_priv_arb_elem(uct_pending_req_t *req)
{
uct_pending_req_priv_arb_t *priv_arb_p =
(uct_pending_req_priv_arb_t *)&req->priv;
return &priv_arb_p->arb_elem;
}
/**
* Add a pending request to the arbiter.
*/
#define uct_pending_req_arb_group_push(_arbiter_group, _req) \
do { \
ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \
ucs_arbiter_group_push_elem(_arbiter_group, \
uct_pending_req_priv_arb_elem(_req)); \
} while (0)
/**
* Add a pending request to the head of a group in the arbiter.
*/
#define uct_pending_req_arb_group_push_head(_arbiter, _arbiter_group, _req) \
do { \
ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \
ucs_arbiter_group_push_head_elem(_arbiter, _arbiter_group, \
uct_pending_req_priv_arb_elem(_req)); \
} while (0)
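/*
 * Illustrative sketch of a pending_add callback for a transport which queues
 * pending requests on an arbiter group ("foo" names are made-up):
 *
 *   ucs_status_t foo_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req,
 *                                   unsigned flags)
 *   {
 *       foo_ep_t *ep = ucs_derived_of(tl_ep, foo_ep_t);
 *
 *       if (foo_ep_has_tx_resources(ep)) {
 *           return UCS_ERR_BUSY;   // resources available - nothing to wait for
 *       }
 *
 *       uct_pending_req_arb_group_push(&ep->arb_group, req);
 *       return UCS_OK;
 *   }
 */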
/**
* Base structure for private data held inside a pending request for TLs
* which use ucs_queue_t to progress pending requests.
*/
typedef struct {
ucs_queue_elem_t queue_elem;
} uct_pending_req_priv_queue_t;
static UCS_F_ALWAYS_INLINE ucs_queue_elem_t *
uct_pending_req_priv_queue_elem(uct_pending_req_t* req)
{
uct_pending_req_priv_queue_t *priv_queue_p =
(uct_pending_req_priv_queue_t *)&req->priv;
return &priv_queue_p->queue_elem;
}
/**
* Add a pending request to the queue.
*/
#define uct_pending_req_queue_push(_queue, _req) \
ucs_queue_push((_queue), uct_pending_req_priv_queue_elem(_req))
typedef struct {
uct_pending_purge_callback_t cb;
void *arg;
} uct_purge_cb_args_t;
/**
* Dispatch all requests in the pending queue, as long as _cond holds true.
* _cond is an expression which can use "_priv" variable.
*
* @param _priv Variable which will hold a pointer to request private data.
* @param _queue The pending queue.
* @param _cond Condition which should be true in order to keep dispatching.
*
* TODO support a callback returning UCS_INPROGRESS.
*/
#define uct_pending_queue_dispatch(_priv, _queue, _cond) \
while (!ucs_queue_is_empty(_queue)) { \
uct_pending_req_priv_queue_t *_base_priv; \
uct_pending_req_t *_req; \
ucs_status_t _status; \
\
_base_priv = \
ucs_queue_head_elem_non_empty((_queue), \
uct_pending_req_priv_queue_t, \
queue_elem); \
\
UCS_STATIC_ASSERT(sizeof(*(_priv)) <= UCT_PENDING_REQ_PRIV_LEN); \
_priv = (typeof(_priv))(_base_priv); \
\
if (!(_cond)) { \
break; \
} \
\
_req = ucs_container_of(_priv, uct_pending_req_t, priv); \
ucs_queue_pull_non_empty(_queue); \
_status = _req->func(_req); \
if (_status != UCS_OK) { \
ucs_queue_push_head(_queue, &_base_priv->queue_elem); \
} \
}
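/*
 * Illustrative sketch of draining the pending queue once send resources are
 * available again ("foo" names are made-up; foo_pending_priv_t must fit in
 * UCT_PENDING_REQ_PRIV_LEN):
 *
 *   static void foo_ep_progress_pending(foo_ep_t *ep)
 *   {
 *       foo_pending_priv_t *priv;
 *
 *       uct_pending_queue_dispatch(priv, &ep->pending_q,
 *                                  foo_ep_has_tx_resources(ep));
 *   }
 */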
/**
* Purge requests from the pending queue.
*
* @param _priv Variable which will hold a pointer to request private data.
* @param _queue The pending queue.
* @param _cond Condition which should be true in order to remove a request.
* @param _cb Callback to invoke for each removed request (its return value is ignored).
* @param _arg User-defined argument passed to the callback.
*/
#define uct_pending_queue_purge(_priv, _queue, _cond, _cb, _arg) \
{ \
uct_pending_req_priv_queue_t *_base_priv; \
ucs_queue_iter_t _iter; \
\
ucs_queue_for_each_safe(_base_priv, _iter, _queue, queue_elem) { \
_priv = (typeof(_priv))(_base_priv); \
if (_cond) { \
ucs_queue_del_iter(_queue, _iter); \
(void)_cb(ucs_container_of(_base_priv, uct_pending_req_t, priv), _arg); \
} \
} \
}
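/*
 * Illustrative sketch of a pending_purge callback built on the macro above
 * ("foo" names are made-up):
 *
 *   void foo_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
 *                             void *arg)
 *   {
 *       foo_ep_t *ep = ucs_derived_of(tl_ep, foo_ep_t);
 *       uct_pending_req_priv_queue_t *priv;
 *
 *       uct_pending_queue_purge(priv, &ep->pending_q, 1, cb, arg);
 *   }
 */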
/**
* Helper macro to trace active message send/receive.
*
* @param _iface Interface.
* @param _type Message type (send/receive)
* @param _am_id Active message ID.
* @param _payload Active message payload.
* @param _length Active message length.
* @param _fmt Format string for additional trace information, followed by
* its arguments.
*/
#define uct_iface_trace_am(_iface, _type, _am_id, _payload, _length, _fmt, ...) \
if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { \
char buf[256] = {0}; \
uct_iface_dump_am(_iface, _type, _am_id, _payload, _length, \
buf, sizeof(buf) - 1); \
ucs_trace_data(_fmt " am_id %d len %zu %s", ## __VA_ARGS__, \
_am_id, (size_t)(_length), buf); \
}
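/*
 * Illustrative usage sketch from a hypothetical receive path ("iface", "hdr"
 * and "length" are made-up local variables):
 *
 *   uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_RECV, hdr->am_id,
 *                      hdr + 1, length, "RX: AM_SHORT");
 */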
extern ucs_config_field_t uct_iface_config_table[];
/**
* Initialize a memory pool of buffers used by a TL interface.
*
* @param iface Interface which owns the memory pool.
* @param mp Memory pool structure to initialize.
* @param elem_size Size of a single pool element.
* @param align_offset Offset within the element which should be aligned.
* @param alignment Data will be aligned to these units.
* @param config Memory pool configuration.
* @param grow Default number of buffers added for every chunk.
* @param init_obj_cb Object constructor.
* @param name Memory pool name.
*/
ucs_status_t uct_iface_mpool_init(uct_base_iface_t *iface, ucs_mpool_t *mp,
size_t elem_size, size_t align_offset, size_t alignment,
const uct_iface_mpool_config_t *config, unsigned grow,
uct_iface_mpool_init_obj_cb_t init_obj_cb,
const char *name);
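/*
 * Illustrative sketch of creating a receive-descriptor pool during interface
 * initialization (the "foo" names, sizes and rx_mpool config field are
 * made-up):
 *
 *   status = uct_iface_mpool_init(&iface->super, &iface->rx_mp,
 *                                 sizeof(foo_rx_desc_t) + seg_size,
 *                                 sizeof(foo_rx_desc_t),
 *                                 UCS_SYS_CACHE_LINE_SIZE,
 *                                 &config->rx_mpool, 128, foo_rx_desc_init,
 *                                 "foo_rx_desc");
 *   if (status != UCS_OK) {
 *       return status;
 *   }
 */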
/**
* Dump active message contents using the user-defined tracer callback.
*/
void uct_iface_dump_am(uct_base_iface_t *iface, uct_am_trace_type_t type,
uint8_t id, const void *data, size_t length,
char *buffer, size_t max);
void uct_iface_mpool_empty_warn(uct_base_iface_t *iface, ucs_mpool_t *mp);
ucs_status_t uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep,
uct_iface_h tl_iface, ucs_status_t status);
void uct_base_iface_query(uct_base_iface_t *iface, uct_iface_attr_t *iface_attr);
ucs_status_t uct_single_device_resource(uct_md_h md, const char *dev_name,
uct_device_type_t dev_type,
uct_tl_device_resource_t **tl_devices_p,
unsigned *num_tl_devices_p);
ucs_status_t uct_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp);
ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags);
void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags);
void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface,
ucs_callback_t cb, unsigned flags);
void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags);
ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp);
ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags);
/**
* Invoke active message handler.
*
* @param iface Interface to invoke the handler for.
* @param id Active message ID.
* @param data Received data.
* @param length Length of received data.
* @param flags Mask with @ref uct_cb_param_flags
*/
static inline ucs_status_t
uct_iface_invoke_am(uct_base_iface_t *iface, uint8_t id, void *data,
unsigned length, unsigned flags)
{
ucs_status_t status;
uct_am_handler_t *handler;
ucs_assertv(id < UCT_AM_ID_MAX, "invalid am id: %d (max: %lu)",
id, UCT_AM_ID_MAX - 1);
UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_IFACE_STAT_RX_AM, 1);
UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_IFACE_STAT_RX_AM_BYTES, length);
handler = &iface->am[id];
status = handler->cb(handler->arg, data, length, flags);
ucs_assert((status == UCS_OK) ||
((status == UCS_INPROGRESS) && (flags & UCT_CB_PARAM_FLAG_DESC)));
return status;
}
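/*
 * Illustrative sketch of invoking the handler from a hypothetical receive
 * path ("iface", "hdr" and "desc" are made-up; the descriptor is released
 * only if the handler returned UCS_OK):
 *
 *   status = uct_iface_invoke_am(&iface->super, hdr->am_id, hdr + 1,
 *                                hdr->length, UCT_CB_PARAM_FLAG_DESC);
 *   if (status == UCS_OK) {
 *       ucs_mpool_put_inline(desc);
 *   } else {
 *       // UCS_INPROGRESS - the handler keeps the descriptor and releases it
 *       // later via uct_iface_release_desc()
 *   }
 */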
/**
* Invoke send completion.
*
* @param comp Completion to invoke.
* @param status Status of the completed operation.
*/
static UCS_F_ALWAYS_INLINE
void uct_invoke_completion(uct_completion_t *comp, ucs_status_t status)
{
ucs_trace_func("comp=%p, count=%d, status=%d", comp, comp->count, status);
if (--comp->count == 0) {
comp->func(comp, status);
}
}
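/*
 * Illustrative sketch: a transport completing a zcopy operation would invoke
 * the user-provided completion, if one was passed ("comp" comes from the
 * zcopy call and may be NULL):
 *
 *   if (comp != NULL) {
 *       uct_invoke_completion(comp, UCS_OK);
 *   }
 */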
/**
* Calculate the total length of a particular iov data buffer.
* Stride is currently not supported; with stride support the result would be:
* length + ((count - 1) * stride)
*/
static UCS_F_ALWAYS_INLINE
size_t uct_iov_get_length(const uct_iov_t *iov)
{
return iov->count * iov->length;
}
/**
* Calculate the total length of all buffers in the iov array.
*/
static UCS_F_ALWAYS_INLINE
size_t uct_iov_total_length(const uct_iov_t *iov, size_t iovcnt)
{
size_t iov_it, total_length = 0;
for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
total_length += uct_iov_get_length(&iov[iov_it]);
}
return total_length;
}
/**
* Fill a struct iovec array with the data described by a uct_iov_t array.
* Entries with zero length are skipped.
*
* @param [out] total_length Total length of the non-empty entries.
* @return Number of elements filled in io_vec[].
*/
static UCS_F_ALWAYS_INLINE
size_t uct_iovec_fill_iov(struct iovec *io_vec, const uct_iov_t *iov,
size_t iovcnt, size_t *total_length)
{
size_t io_vec_it = 0;
size_t io_vec_len = 0;
size_t iov_it;
*total_length = 0;
for (iov_it = 0; iov_it < iovcnt; ++iov_it) {
io_vec_len = uct_iov_get_length(&iov[iov_it]);
/* Skip zero-length elements so they do not appear in the resulting io_vec */
if (io_vec_len != 0) {
io_vec[io_vec_it].iov_len = io_vec_len;
io_vec[io_vec_it].iov_base = iov[iov_it].buffer;
*total_length += io_vec_len;
++io_vec_it;
}
}
return io_vec_it;
}
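/*
 * Illustrative sketch of converting a UCT iov array for a writev() call
 * ("fd" and FOO_MAX_IOV are made-up; <sys/uio.h> provides struct iovec and
 * writev):
 *
 *   struct iovec io_vec[FOO_MAX_IOV];
 *   size_t total_length;
 *   size_t cnt;
 *
 *   cnt = uct_iovec_fill_iov(io_vec, iov, iovcnt, &total_length);
 *   if (cnt > 0) {
 *       writev(fd, io_vec, (int)cnt);
 *   }
 */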
/**
* Copy the header and payload into the target am_short send buffer.
*/
static UCS_F_ALWAYS_INLINE
void uct_am_short_fill_data(void *buffer, uint64_t header, const void *payload,
size_t length)
{
/**
* Helper structure to fill send buffer of short messages for
* non-accelerated transports
*/
struct uct_am_short_packet {
uint64_t header;
char payload[];
} UCS_S_PACKED *packet = (struct uct_am_short_packet*)buffer;
packet->header = header;
/* suppress false positive diagnostic from uct_mm_ep_am_common_send call */
/* cppcheck-suppress ctunullpointer */
memcpy(packet->payload, payload, length);
}
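/*
 * Illustrative sketch: a shared-memory style transport could fill its send
 * segment with the 8-byte header followed by the payload ("seg" and its
 * fields are made-up):
 *
 *   uct_am_short_fill_data(seg + 1, header, payload, length);
 *   seg->length = sizeof(header) + length;
 */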
#endif