/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifndef UCP_WORKER_H_
#define UCP_WORKER_H_
#include "ucp_ep.h"
#include "ucp_context.h"
#include "ucp_thread.h"
#include <ucp/tag/tag_match.h>
#include <ucp/wireup/ep_match.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/queue_types.h>
#include <ucs/datastruct/strided_alloc.h>
#include <ucs/arch/bitops.h>
/* The size of the private buffer in UCT descriptor headroom, which UCP may
* use for its own needs. This size does not include ucp_recv_desc_t length,
* because it is common for all cases and protocols (TAG, STREAM). */
#define UCP_WORKER_HEADROOM_PRIV_SIZE 32
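/* For illustration only: a transport's total RX headroom would combine the
 * common descriptor with the private part defined above. A hypothetical
 * computation (the real constant is defined elsewhere in UCP):
 *
 *     size_t total_headroom = sizeof(ucp_recv_desc_t) +
 *                             UCP_WORKER_HEADROOM_PRIV_SIZE;
 */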
#if ENABLE_MT
#define UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(_worker) \
do { \
if ((_worker)->flags & UCP_WORKER_FLAG_MT) { \
UCS_ASYNC_BLOCK(&(_worker)->async); \
} \
} while (0)
#define UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(_worker) \
do { \
if ((_worker)->flags & UCP_WORKER_FLAG_MT) { \
UCS_ASYNC_UNBLOCK(&(_worker)->async); \
} \
} while (0)
#else
#define UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(_worker)
#define UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(_worker)
#endif
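/* Usage sketch (hypothetical function, for illustration): the macros bracket
 * a critical section on the worker's async context and compile to nothing
 * when MT support is disabled:
 *
 *     void ucp_worker_do_work(ucp_worker_h worker)
 *     {
 *         UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
 *         ... access worker state ...
 *         UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
 *     }
 */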
/**
* UCP worker flags
*/
enum {
UCP_WORKER_FLAG_EXTERNAL_EVENT_FD = UCS_BIT(0), /**< worker event fd is external */
UCP_WORKER_FLAG_EDGE_TRIGGERED = UCS_BIT(1), /**< events are edge-triggered */
UCP_WORKER_FLAG_MT = UCS_BIT(2) /**< MT locking is required */
};
/**
* UCP iface flags
*/
enum {
UCP_WORKER_IFACE_FLAG_OFFLOAD_ACTIVATED = UCS_BIT(0), /**< UCP iface receives tag
offload messages */
UCP_WORKER_IFACE_FLAG_ON_ARM_LIST = UCS_BIT(1), /**< UCP iface is an element
of arm_ifaces list, so
it needs to be armed
in ucp_worker_arm(). */
UCP_WORKER_IFACE_FLAG_UNUSED = UCS_BIT(2) /**< There is another UCP iface
with the same caps, but
with better performance */
};
/**
* UCP worker statistics counters
*/
enum {
/* Total number of received eager messages */
UCP_WORKER_STAT_TAG_RX_EAGER_MSG,
UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG,
    /* Total number of received eager chunks (every message
     * can be split into multiple chunks). It is possible that
     * some chunks of a message arrive unexpectedly, a receive
     * is then posted, and the remaining chunks arrive expectedly. */
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP,
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP,
UCP_WORKER_STAT_TAG_RX_RNDV_EXP,
UCP_WORKER_STAT_TAG_RX_RNDV_UNEXP,
UCP_WORKER_STAT_LAST
};
/**
* UCP worker tag offload statistics counters
*/
enum {
UCP_WORKER_STAT_TAG_OFFLOAD_POSTED,
UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED,
UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV,
UCP_WORKER_STAT_TAG_OFFLOAD_CANCELED,
UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED,
UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NON_CONTIG,
UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_WILDCARD,
UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_SW_PEND,
UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NO_IFACE,
UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_MEM_REG,
UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_EGR,
UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_RNDV,
UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV,
UCP_WORKER_STAT_TAG_OFFLOAD_LAST
};
#define UCP_WORKER_UCT_RECV_EVENT_ARM_FLAGS (UCT_EVENT_RECV | \
UCT_EVENT_RECV_SIG)
#define UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS (UCT_IFACE_FLAG_EVENT_RECV | \
UCT_IFACE_FLAG_EVENT_RECV_SIG)
#define UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS (UCT_IFACE_FLAG_EVENT_SEND_COMP | \
UCT_IFACE_FLAG_EVENT_RECV | \
UCT_IFACE_FLAG_EVENT_RECV_SIG)
#define UCP_WORKER_UCT_UNSIG_EVENT_CAP_FLAGS (UCT_IFACE_FLAG_EVENT_SEND_COMP | \
UCT_IFACE_FLAG_EVENT_RECV)
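/* Illustrative event-driven progress loop built on these capability/arm
 * flags, using the public wakeup API (error handling mostly omitted):
 *
 *     while (ucp_worker_progress(worker));
 *     status = ucp_worker_arm(worker);
 *     if (status == UCS_OK) {
 *         poll(&pfd, 1, -1);    pfd.fd was obtained via ucp_worker_get_efd()
 *     } else if (status != UCS_ERR_BUSY) {
 *         handle the error
 *     }
 */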
#define UCP_WORKER_STAT_EAGER_MSG(_worker, _flags) \
    UCS_STATS_UPDATE_COUNTER((_worker)->stats, \
                             ((_flags) & UCP_RECV_DESC_FLAG_EAGER_SYNC) ? \
                             UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG : \
                             UCP_WORKER_STAT_TAG_RX_EAGER_MSG, 1);
#define UCP_WORKER_STAT_EAGER_CHUNK(_worker, _is_exp) \
UCS_STATS_UPDATE_COUNTER((_worker)->stats, \
UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_##_is_exp, 1);
#define UCP_WORKER_STAT_RNDV(_worker, _is_exp) \
UCS_STATS_UPDATE_COUNTER((_worker)->stats, \
UCP_WORKER_STAT_TAG_RX_RNDV_##_is_exp, 1);
#define UCP_WORKER_STAT_TAG_OFFLOAD(_worker, _name) \
UCS_STATS_UPDATE_COUNTER((_worker)->tm_offload_stats, \
UCP_WORKER_STAT_TAG_OFFLOAD_##_name, 1);
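/* Usage sketch (illustrative): the _is_exp/_name argument is pasted onto the
 * counter prefix to select the statistics counter, e.g.:
 *
 *     UCP_WORKER_STAT_RNDV(worker, UNEXP);          updates
 *         UCP_WORKER_STAT_TAG_RX_RNDV_UNEXP
 *     UCP_WORKER_STAT_TAG_OFFLOAD(worker, POSTED);  updates
 *         UCP_WORKER_STAT_TAG_OFFLOAD_POSTED
 */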
#define ucp_worker_mpool_get(_mp) \
({ \
ucp_mem_desc_t *rdesc = ucs_mpool_get_inline((_mp)); \
if (rdesc != NULL) { \
VALGRIND_MAKE_MEM_DEFINED(rdesc, sizeof(*rdesc)); \
} \
rdesc; \
})
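/* Usage sketch (hypothetical caller): descriptors obtained this way are
 * returned with the regular mpool put, assuming the standard
 * ucs_mpool_put_inline() counterpart:
 *
 *     ucp_mem_desc_t *rdesc = ucp_worker_mpool_get(&worker->reg_mp);
 *     if (rdesc == NULL) {
 *         return UCS_ERR_NO_MEMORY;
 *     }
 *     ... use the registered buffer ...
 *     ucs_mpool_put_inline(rdesc);
 */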
/**
 * UCP worker iface, which encapsulates a UCT iface, its attributes and
 * some auxiliary info needed for tag matching offload.
 */
struct ucp_worker_iface {
uct_iface_h iface; /* UCT interface */
uct_iface_attr_t attr; /* UCT interface attributes */
ucp_worker_h worker; /* The parent worker */
ucs_list_link_t arm_list; /* Element in arm_ifaces list */
ucp_rsc_index_t rsc_index; /* Resource index */
int event_fd; /* Event FD, or -1 if undefined */
unsigned activate_count;/* How many times this iface has
been activated */
int check_events_id;/* Callback id for check_events */
unsigned proxy_recv_count;/* Counts active messages on proxy handler */
unsigned post_count; /* Counts uncompleted requests which are
offloaded to the transport */
uint8_t flags; /* Interface flags */
};
/**
* UCP worker CM, which encapsulates UCT CM and its auxiliary info.
*/
struct ucp_worker_cm {
uct_cm_h cm; /* UCT CM handle */
ucp_rsc_index_t cmpt_idx; /* Index of corresponding
component */
};
/**
 * Data stored for each active-message callback registered with a worker
 */
typedef struct ucp_worker_am_entry {
ucp_am_callback_t cb;
void *context;
uint32_t flags;
} ucp_worker_am_entry_t;
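/* The worker keeps one such entry per registered AM id. Hypothetical
 * registration sketch, assuming the public ucp_worker_set_am_handler() API:
 *
 *     status = ucp_worker_set_am_handler(worker, am_id, my_cb, my_ctx,
 *                                        UCP_AM_FLAG_WHOLE_MSG);
 */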
/**
* UCP worker (thread context).
*/
typedef struct ucp_worker {
unsigned flags; /* Worker flags */
ucs_async_context_t async; /* Async context for this worker */
ucp_context_h context; /* Back-reference to UCP context */
uint64_t uuid; /* Unique ID for wireup */
uct_worker_h uct; /* UCT worker handle */
ucs_mpool_t req_mp; /* Memory pool for requests */
ucs_mpool_t rkey_mp; /* Pool for small memory keys */
uint64_t atomic_tls; /* Which resources can be used for atomics */
    int inprogress; /* Detects recursive calls to ucp_worker_progress()
                       (debug checks) */
char name[UCP_WORKER_NAME_MAX]; /* Worker name */
    unsigned flush_ops_count; /* Number of pending operations that must
                                 complete before a worker flush completes */
int event_fd; /* Allocated (on-demand) event fd for wakeup */
ucs_sys_event_set_t *event_set; /* Allocated UCS event set for wakeup */
int eventfd; /* Event fd to support signal() calls */
unsigned uct_events; /* UCT arm events */
ucs_list_link_t arm_ifaces; /* List of interfaces to arm */
void *user_data; /* User-defined data */
ucs_strided_alloc_t ep_alloc; /* Endpoint allocator */
ucs_list_link_t stream_ready_eps; /* List of EPs with received stream data */
ucs_list_link_t all_eps; /* List of all endpoints */
ucp_ep_match_ctx_t ep_match_ctx; /* Endpoint-to-endpoint matching context */
ucp_worker_iface_t **ifaces; /* Array of pointers to interfaces,
one for each resource */
unsigned num_ifaces; /* Number of elements in ifaces array */
unsigned num_active_ifaces; /* Number of activated ifaces */
uint64_t scalable_tl_bitmap; /* Map of scalable tl resources */
ucp_worker_cm_t *cms; /* Array of CMs, one for each component */
ucs_mpool_t am_mp; /* Memory pool for AM receives */
ucs_mpool_t reg_mp; /* Registered memory pool */
ucs_mpool_t rndv_frag_mp; /* Memory pool for RNDV fragments */
ucp_tag_match_t tm; /* Tag-matching queues and offload info */
    uint64_t am_message_id; /* For matching long (multi-fragment) AMs */
    ucp_ep_h mem_type_ep[UCS_MEMORY_TYPE_LAST]; /* Memory-type endpoints */
UCS_STATS_NODE_DECLARE(stats)
UCS_STATS_NODE_DECLARE(tm_offload_stats)
    ucp_worker_am_entry_t *am_cbs; /* Array of AM callbacks and their data */
    size_t am_cb_array_len; /* Length of the AM callback array */
ucs_cpu_set_t cpu_mask; /* Save CPU mask for subsequent calls to ucp_worker_listen */
unsigned ep_config_max; /* Maximal number of configurations */
unsigned ep_config_count; /* Current number of configurations */
ucp_ep_config_t ep_config[0]; /* Array of transport limits and thresholds */
} ucp_worker_t;
/**
* UCP worker argument for the error handling callback
*/
typedef struct ucp_worker_err_handle_arg {
ucp_worker_h worker;
ucp_ep_h ucp_ep;
uct_ep_h uct_ep;
ucp_lane_index_t failed_lane;
ucs_status_t status;
} ucp_worker_err_handle_arg_t;
ucs_status_t ucp_worker_get_ep_config(ucp_worker_h worker,
const ucp_ep_config_key_t *key,
int print_cfg,
ucp_ep_cfg_index_t *config_idx_p);
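/* Usage sketch (illustrative): the returned index selects an entry in the
 * worker's flexible ep_config array:
 *
 *     ucp_ep_cfg_index_t cfg_idx;
 *     status = ucp_worker_get_ep_config(worker, &key, 0, &cfg_idx);
 *     if (status == UCS_OK) {
 *         config = &worker->ep_config[cfg_idx];
 *     }
 */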
ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id,
uct_iface_params_t *iface_params,
ucp_worker_iface_t **wiface);
ucs_status_t ucp_worker_iface_init(ucp_worker_h worker, ucp_rsc_index_t tl_id,
ucp_worker_iface_t *wiface);
void ucp_worker_iface_cleanup(ucp_worker_iface_t *wiface);
void ucp_worker_iface_progress_ep(ucp_worker_iface_t *wiface);
void ucp_worker_iface_unprogress_ep(ucp_worker_iface_t *wiface);
void ucp_worker_signal_internal(ucp_worker_h worker);
void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags);
int ucp_worker_err_handle_remove_filter(const ucs_callbackq_elem_t *elem,
void *arg);
ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep,
uct_ep_h uct_ep, ucp_lane_index_t lane,
ucs_status_t status);
static inline const char* ucp_worker_get_name(ucp_worker_h worker)
{
return worker->name;
}
/* Get an endpoint from a pointer received from the remote side, with debug
 * checks */
static inline ucp_ep_h ucp_worker_get_ep_by_ptr(ucp_worker_h worker,
uintptr_t ep_ptr)
{
ucp_ep_h ep = (ucp_ep_h)ep_ptr;
ucs_assert(ep != NULL);
ucs_assertv(ep->worker == worker, "worker=%p ep=%p ep->worker=%p", worker,
ep, ep->worker);
return ep;
}
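/* Return the worker iface for a given resource index. The ifaces array is
 * dense (one entry per bit set in the context's tl_bitmap), so the sparse
 * rsc_index is converted to a dense array position with ucs_bitmap2idx(). */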
static UCS_F_ALWAYS_INLINE ucp_worker_iface_t*
ucp_worker_iface(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
return (rsc_index == UCP_NULL_RESOURCE) ? NULL :
worker->ifaces[ucs_bitmap2idx(worker->context->tl_bitmap, rsc_index)];
}
static UCS_F_ALWAYS_INLINE uct_iface_attr_t*
ucp_worker_iface_get_attr(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
return &ucp_worker_iface(worker, rsc_index)->attr;
}
static UCS_F_ALWAYS_INLINE double
ucp_worker_iface_bandwidth(ucp_worker_h worker, ucp_rsc_index_t rsc_index)
{
uct_iface_attr_t *iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
return ucp_tl_iface_bandwidth(worker->context, &iface_attr->bandwidth);
}
static UCS_F_ALWAYS_INLINE int
ucp_worker_unified_mode(ucp_worker_h worker)
{
return worker->context->config.ext.unified_mode;
}
static UCS_F_ALWAYS_INLINE ucp_rsc_index_t
ucp_worker_num_cm_cmpts(const ucp_worker_h worker)
{
return worker->context->config.num_cm_cmpts;
}
static UCS_F_ALWAYS_INLINE int
ucp_worker_sockaddr_is_cm_proto(const ucp_worker_h worker)
{
return !!ucp_worker_num_cm_cmpts(worker);
}
#endif