/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
* Copyright (C) Los Alamos National Security, LLC. 2019 ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifndef UCP_EP_H_
#define UCP_EP_H_
#include "ucp_types.h"
#include <ucp/wireup/ep_match.h>
#include <uct/api/uct.h>
#include <ucs/datastruct/queue.h>
#include <ucs/stats/stats.h>
#include <ucs/datastruct/strided_alloc.h>
#include <ucs/debug/assert.h>
/* Number of entries in the zcopy_thresh[] and sync_zcopy_thresh[] arrays of
 * ucp_ep_msg_config_t. Presumably the maximal IOV count handled by those
 * threshold tables - TODO confirm. */
#define UCP_MAX_IOV 16UL
/* Endpoint configuration index type; this is the type of ucp_ep_t::cfg_index */
typedef uint16_t ucp_ep_cfg_index_t;
/*
 * Endpoint flags type.
 * When ENABLE_DEBUG_DATA or UCS_ENABLE_ASSERT is set, the type is 32 bits wide
 * and has room for the DEBUG flag bits (bit 16 and above) of the endpoint flags
 * enum. Otherwise it is 16 bits wide and can hold only bits 0..15.
 */
#if ENABLE_DEBUG_DATA || UCS_ENABLE_ASSERT
typedef uint32_t ucp_ep_flags_t;
#else
typedef uint16_t ucp_ep_flags_t;
#endif
/**
 * Endpoint flags (see ucp_ep_t::flags).
 *
 * Bits 0..11 fit in both variants of ucp_ep_flags_t. The DEBUG bits (16..22)
 * fit only in the 32-bit variant, which is selected when ENABLE_DEBUG_DATA or
 * UCS_ENABLE_ASSERT is set.
 */
enum {
UCP_EP_FLAG_LOCAL_CONNECTED = UCS_BIT(0), /* All local endpoints are connected */
UCP_EP_FLAG_REMOTE_CONNECTED = UCS_BIT(1), /* All remote endpoints are connected */
UCP_EP_FLAG_CONNECT_REQ_QUEUED = UCS_BIT(2), /* Connection request was queued */
UCP_EP_FLAG_FAILED = UCS_BIT(3), /* EP is in failed state */
UCP_EP_FLAG_USED = UCS_BIT(4), /* EP is in use by the user */
UCP_EP_FLAG_STREAM_HAS_DATA = UCS_BIT(5), /* EP has data in the ext.stream.match_q */
UCP_EP_FLAG_ON_MATCH_CTX = UCS_BIT(6), /* EP is on match queue */
UCP_EP_FLAG_DEST_EP = UCS_BIT(7), /* dest_ep_ptr is valid */
UCP_EP_FLAG_LISTENER = UCS_BIT(8), /* EP holds pointer to a listener
(on server side due to receiving partial
worker address from the client) */
UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED = UCS_BIT(9), /* Pre-Connection request was queued */
UCP_EP_FLAG_CLOSED = UCS_BIT(10),/* EP was closed */
UCP_EP_FLAG_CLOSE_REQ_VALID = UCS_BIT(11),/* close protocol is started and
close_req is valid */
/* DEBUG bits: representable only by the 32-bit variant of ucp_ep_flags_t */
UCP_EP_FLAG_CONNECT_REQ_SENT = UCS_BIT(16),/* DEBUG: Connection request was sent */
UCP_EP_FLAG_CONNECT_REP_SENT = UCS_BIT(17),/* DEBUG: Connection reply was sent */
UCP_EP_FLAG_CONNECT_ACK_SENT = UCS_BIT(18),/* DEBUG: Connection ACK was sent */
UCP_EP_FLAG_CONNECT_REQ_IGNORED = UCS_BIT(19),/* DEBUG: Connection request was ignored */
UCP_EP_FLAG_CONNECT_PRE_REQ_SENT = UCS_BIT(20),/* DEBUG: Connection pre-request was sent */
UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR = UCS_BIT(21),/* DEBUG: Partial worker address was sent
to the remote peer when starting
connection establishment on this EP */
UCP_EP_FLAG_FLUSH_STATE_VALID = UCS_BIT(22) /* DEBUG: flush_state is valid */
};
/**
 * UCP endpoint statistics counters.
 *
 * UCP_EP_STAT_TAG_OP() pastes its _op argument onto the UCP_EP_STAT_TAG_TX_
 * prefix, so the suffixes used here (EAGER, EAGER_SYNC, RNDV) are the valid
 * _op values for that macro.
 */
enum {
UCP_EP_STAT_TAG_TX_EAGER,
UCP_EP_STAT_TAG_TX_EAGER_SYNC,
UCP_EP_STAT_TAG_TX_RNDV,
UCP_EP_STAT_LAST /* Number of counters above; must stay the last entry */
};
/**
 * Endpoint init flags.
 *
 * Presumably these are the bits accepted by the 'ep_init_flags' parameters of
 * the endpoint creation functions declared in this header - TODO confirm.
 * NOTE(review): only the first entry carries the UCP_EP_INIT_FLAG_ prefix, the
 * others use UCP_EP_INIT_; the names are part of the interface and are left
 * as they are.
 */
enum {
UCP_EP_INIT_FLAG_MEM_TYPE = UCS_BIT(0), /**< Endpoint for local mem type transfers */
UCP_EP_INIT_CREATE_AM_LANE = UCS_BIT(1), /**< Endpoint requires an AM lane */
UCP_EP_INIT_CM_WIREUP_CLIENT = UCS_BIT(2), /**< Endpoint wireup protocol is based on CM,
client side */
UCP_EP_INIT_CM_WIREUP_SERVER = UCS_BIT(3), /**< Endpoint wireup protocol is based on CM,
server side */
UCP_EP_INIT_ERR_MODE_PEER_FAILURE = UCS_BIT(4) /**< Endpoint requires an
@ref UCP_ERR_HANDLING_MODE_PEER */
};
/*
 * Update the tag-send statistics counter UCP_EP_STAT_TAG_TX_<_op> of an
 * endpoint by 1, through UCS_STATS_UPDATE_COUNTER.
 *
 * _ep - Endpoint pointer; its 'stats' member is passed to
 *       UCS_STATS_UPDATE_COUNTER.
 * _op - Counter suffix, pasted onto UCP_EP_STAT_TAG_TX_ (EAGER, EAGER_SYNC or
 *       RNDV).
 *
 * NOTE(review): the expansion already ends with ';', so a call written as
 * "UCP_EP_STAT_TAG_OP(ep, X);" produces an extra empty statement. That is
 * harmless in a plain statement list but breaks an unbraced if/else around the
 * call. Dropping the ';' requires auditing all callers first.
 */
#define UCP_EP_STAT_TAG_OP(_ep, _op) \
UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCP_EP_STAT_TAG_TX_##_op, 1);
/*
 * Endpoint configuration key.
 * This is filled by the transport selection logic, according to the local
 * resources and set of remote addresses.
 */
typedef struct ucp_ep_config_key {
ucp_lane_index_t num_lanes; /* Number of active lanes */
/* Per-lane description, one entry for each possible lane */
struct {
ucp_rsc_index_t rsc_index; /* Resource index */
ucp_lane_index_t proxy_lane; /* UCP_NULL_LANE - no proxy
otherwise - in which lane the real
transport endpoint is stored */
ucp_md_index_t dst_md_index; /* Destination memory domain index */
} lanes[UCP_MAX_LANES];
ucp_lane_index_t am_lane; /* Lane for AM (can be NULL) */
ucp_lane_index_t tag_lane; /* Lane for tag matching offload (can be NULL) */
ucp_lane_index_t wireup_lane; /* Lane for wireup messages (can be NULL) */
ucp_lane_index_t cm_lane; /* Lane for holding a CM connection */
/* Lanes for remote memory access, sorted by priority, highest first */
ucp_lane_index_t rma_lanes[UCP_MAX_LANES];
/* Lanes for high-bw memory access, sorted by priority, highest first */
ucp_lane_index_t rma_bw_lanes[UCP_MAX_LANES];
/* Lanes for atomic operations, sorted by priority, highest first */
ucp_lane_index_t amo_lanes[UCP_MAX_LANES];
/* Lanes for high-bw active messages, sorted by priority, highest first */
ucp_lane_index_t am_bw_lanes[UCP_MAX_LANES];
/* Local memory domains to send remote keys for in high-bw rma protocols
* NOTE: potentially it can be different than what is imposed by rma_bw_lanes,
* since these are the MDs used by remote side for accessing our memory. */
ucp_md_map_t rma_bw_md_map;
/* Bitmap of remote mds which are reachable from this endpoint (with any set
* of transports which could be selected in the future).
*/
ucp_md_map_t reachable_md_map;
/* Array with popcount(reachable_md_map) elements, each entry holds the local
* component index to be used for unpacking remote key from each set bit in
* reachable_md_map */
ucp_rsc_index_t *dst_md_cmpts;
/* Error handling mode */
ucp_err_handling_mode_t err_mode;
/* NOTE(review): the meaning of this status is not documented in this header
* - confirm against the code that fills the key */
ucs_status_t status;
} ucp_ep_config_key_t;
/*
 * Configuration for RMA protocols.
 * ucp_ep_config_t holds one instance per lane (see ucp_ep_config_t::rma).
 */
typedef struct ucp_ep_rma_config {
size_t max_put_short; /* Maximal payload of put short */
size_t max_put_bcopy; /* Maximal total size of put_bcopy */
size_t max_put_zcopy; /* Presumably maximal total size of put_zcopy - TODO confirm */
size_t max_get_short; /* Maximal payload of get short */
size_t max_get_bcopy; /* Maximal total size of get_bcopy */
size_t max_get_zcopy; /* Presumably maximal total size of get_zcopy - TODO confirm */
size_t put_zcopy_thresh; /* Presumably the size from which put uses zcopy - TODO confirm */
size_t get_zcopy_thresh; /* Presumably the size from which get uses zcopy - TODO confirm */
} ucp_ep_rma_config_t;
/*
 * Configuration for AM and tag offload protocols.
 * Used in ucp_ep_config_t for the AM lane ('am') and for the lane used by
 * eager protocols ('tag.eager').
 */
typedef struct ucp_ep_msg_config {
/* NOTE(review): max_short is signed while the other limits are size_t;
* presumably a negative value has a special meaning - confirm */
ssize_t max_short;
size_t max_bcopy;
size_t max_zcopy;
size_t max_iov;
/* zero-copy threshold for operations which do not have to wait for remote side */
size_t zcopy_thresh[UCP_MAX_IOV];
/* zero-copy threshold for mem type buffers */
size_t mem_type_zcopy_thresh[UCS_MEMORY_TYPE_LAST];
/* zero-copy threshold for operations which anyways have to wait for remote side */
size_t sync_zcopy_thresh[UCP_MAX_IOV];
uint8_t zcopy_auto_thresh; /* if != 0, zcopy is enabled; given the field name this
presumably refers to the automatic zcopy
threshold - TODO confirm */
} ucp_ep_msg_config_t;
/*
 * Thresholds with and without non-host memory.
 * Used for the max_eager_short limits in ucp_ep_config_t.
 */
typedef struct ucp_memtype_thresh {
ssize_t memtype_on; /* Presumably the threshold when non-host memory is in use - TODO confirm */
ssize_t memtype_off; /* Presumably the threshold when only host memory is in use - TODO confirm */
} ucp_memtype_thresh_t;
/*
 * Endpoint configuration: the configuration key, plus the lane, threshold and
 * protocol settings which are defined by that key.
 */
typedef struct ucp_ep_config {
/* A key which uniquely defines the configuration; all other fields of the
* configuration (in the current worker) are defined only by it.
*/
ucp_ep_config_key_t key;
/* Bitmap of which lanes are p2p; affects the behavior of connection
* establishment protocols.
*/
ucp_lane_map_t p2p_lanes;
/* Configuration for each lane that provides RMA */
ucp_ep_rma_config_t rma[UCP_MAX_LANES];
/* Threshold for switching from put_short to put_bcopy */
size_t bcopy_thresh;
/* Configuration for AM lane */
ucp_ep_msg_config_t am;
/* MD index of each lane */
ucp_md_index_t md_index[UCP_MAX_LANES];
/* Settings for tag matching operations */
struct {
/* Protocols used for tag matching operations
* (can be AM based or tag offload). */
const ucp_request_send_proto_t *proto;
const ucp_request_send_proto_t *sync_proto;
/* Lane used for tag matching operations. */
ucp_lane_index_t lane;
/* Maximal size for eager short. */
ucp_memtype_thresh_t max_eager_short;
/* Configuration of the lane used for eager protocols
* (can be AM or tag offload). */
ucp_ep_msg_config_t eager;
/* Rendezvous settings */
struct {
/* Maximal total size of rndv_get_zcopy */
size_t max_get_zcopy;
/* Minimal size of rndv_get_zcopy */
size_t min_get_zcopy;
/* Maximal total size of rndv_put_zcopy */
size_t max_put_zcopy;
/* Minimal size of rndv_put_zcopy */
size_t min_put_zcopy;
/* Threshold for switching from eager to RMA based rendezvous */
size_t rma_thresh;
/* Threshold for switching from eager to AM based rendezvous */
size_t am_thresh;
/* Total size of packed rkey, according to high-bw md_map */
size_t rkey_size;
/* remote memory domains which support rkey_ptr */
ucp_md_map_t rkey_ptr_dst_mds;
/* Lanes for GET zcopy */
ucp_lane_index_t get_zcopy_lanes[UCP_MAX_LANES];
/* Lanes for PUT zcopy */
ucp_lane_index_t put_zcopy_lanes[UCP_MAX_LANES];
/* BW based scale factor */
double scale[UCP_MAX_LANES];
} rndv;
/* special thresholds for the ucp_tag_send_nbr() */
struct {
/* Threshold for switching from eager to RMA based rendezvous */
size_t rma_thresh;
/* Threshold for switching from eager to AM based rendezvous */
size_t am_thresh;
} rndv_send_nbr;
/* Limits for tag offload */
struct {
/* Maximal size for eager short. */
ucp_memtype_thresh_t max_eager_short;
/* Maximal iov count for RNDV offload */
size_t max_rndv_iov;
/* Maximal total size for RNDV offload */
size_t max_rndv_zcopy;
} offload;
} tag;
struct {
/* Protocols used for stream operations
* (currently it's only AM based). */
const ucp_request_send_proto_t *proto;
} stream;
struct {
/* Protocols used for am operations */
const ucp_request_send_proto_t *proto;
const ucp_request_send_proto_t *reply_proto;
} am_u;
} ucp_ep_config_t;
/**
 * Protocol layer endpoint, represents a connection to a remote worker
 */
typedef struct ucp_ep {
ucp_worker_h worker; /* Worker this endpoint belongs to */
ucp_ep_cfg_index_t cfg_index; /* Configuration index */
ucp_ep_conn_sn_t conn_sn; /* Sequence number for remote connection */
ucp_lane_index_t am_lane; /* Cached value; presumably a copy of the am_lane
of the configuration key - TODO confirm */
ucp_ep_flags_t flags; /* Endpoint flags (UCP_EP_FLAG_xx bits) */
/* TODO allocate ep dynamically according to number of lanes */
uct_ep_h uct_eps[UCP_MAX_LANES]; /* Transports for every lane */
#if ENABLE_DEBUG_DATA
char peer_name[UCP_WORKER_NAME_MAX]; /* Present only when ENABLE_DEBUG_DATA is set */
#endif
UCS_STATS_NODE_DECLARE(stats) /* Statistics node used by UCP_EP_STAT_TAG_OP() */
} ucp_ep_t;
/**
 * Status of protocol-level remote completions.
 * Stored in ucp_ep_ext_gen_t::flush_state; UCP_EP_FLAG_FLUSH_STATE_VALID is the
 * (debug) flag that marks it valid.
 */
typedef struct {
ucs_queue_head_t reqs; /* Queue of flush requests which
are waiting for remote completion */
uint32_t send_sn; /* Sequence number of sent operations */
uint32_t cmpl_sn; /* Sequence number of completions */
} ucp_ep_flush_state_t;
/**
 * Close protocol request.
 * Stored in ucp_ep_ext_gen_t::close_req; UCP_EP_FLAG_CLOSE_REQ_VALID marks it
 * valid once the close protocol has started.
 */
typedef struct {
ucp_request_t *req; /* Flush request which is
used in close protocol */
} ucp_ep_close_proto_req_t;
/*
 * Endpoint extension for generic non fast-path data
 */
typedef struct {
uintptr_t dest_ep_ptr; /* Remote EP pointer; valid when
UCP_EP_FLAG_DEST_EP is set */
void *user_data; /* User data associated with ep */
ucs_list_link_t ep_list; /* List entry in worker's all eps list */
ucp_err_handler_cb_t err_cb; /* Error handler */
/* Endpoint match context and remote completion status are mutually exclusive,
* since remote completions are counted only after the endpoint is already
* matched to a remote peer.
* The listener pointer and the close request share the same storage, so only
* one member of this union is meaningful at a time. The related endpoint
* flags are UCP_EP_FLAG_ON_MATCH_CTX, UCP_EP_FLAG_FLUSH_STATE_VALID,
* UCP_EP_FLAG_LISTENER and UCP_EP_FLAG_CLOSE_REQ_VALID.
*/
union {
ucp_ep_match_t ep_match; /* Matching with remote endpoints */
ucp_ep_flush_state_t flush_state; /* Remote completion status */
ucp_listener_h listener; /* Listener that may be associated with ep */
ucp_ep_close_proto_req_t close_req; /* Close protocol request */
};
} ucp_ep_ext_gen_t;
/*
 * Endpoint extension for specific protocols
 */
typedef struct {
/* Stream protocol data */
struct {
ucs_list_link_t ready_list; /* List entry in worker's EP list */
ucs_queue_head_t match_q; /* Queue of receive data or requests,
depends on UCP_EP_FLAG_STREAM_HAS_DATA */
} stream;
/* Active message protocol data */
struct {
ucs_list_link_t started_ams; /* Presumably the list of active messages
whose handling has started - TODO confirm */
} am;
} ucp_ep_ext_proto_t;
/*
 * Format of the address attached to the sockaddr client data; these are the
 * values of ucp_wireup_sockaddr_data::addr_mode.
 */
enum {
UCP_WIREUP_SA_DATA_FULL_ADDR = 0, /* Sockaddr client data contains full
address. */
UCP_WIREUP_SA_DATA_PARTIAL_ADDR, /* Sockaddr client data contains partial
address, wireup protocol requires
extra MSGs. */
UCP_WIREUP_SA_DATA_CM_ADDR /* Sockaddr client data contains address
for CM based wireup: there is only
iface and ep address of transport
lanes, remote device address is
provided by CM and has to be added to
unpacked UCP address locally. */
};
/*
 * Header of the sockaddr wireup data; it is declared packed (UCS_S_PACKED) and
 * the packed worker address follows it.
 */
struct ucp_wireup_sockaddr_data {
uintptr_t ep_ptr; /**< Endpoint pointer */
uint8_t err_mode; /**< Error handling mode */
uint8_t addr_mode; /**< The attached address format
defined by
UCP_WIREUP_SA_DATA_xx */
uint8_t dev_index; /**< Device address index used to
build remote address in
UCP_WIREUP_SA_DATA_CM_ADDR
mode */
/* packed worker address follows */
} UCS_S_PACKED;
/*
 * Connection request. The sockaddr data header is the last member and the
 * packed worker address follows it.
 */
typedef struct ucp_conn_request {
ucp_listener_h listener; /* UCP listener of this request */
/* UCT object of this request: either a listener or an interface.
* NOTE(review): which member is active is not visible here - confirm */
union {
uct_listener_h listener;
uct_iface_h iface;
} uct;
uct_conn_request_h uct_req; /* UCT connection request handle */
char dev_name[UCT_DEVICE_NAME_MAX]; /* Device name */
uct_device_addr_t *remote_dev_addr; /* Remote device address */
ucp_wireup_sockaddr_data_t sa_data; /* Sockaddr wireup data header */
/* packed worker address follows */
} ucp_conn_request_t;
/* NOTE(review): defined outside this header; presumably resets 'key' to an
 * initial (empty) state - confirm against the definition. */
void ucp_ep_config_key_reset(ucp_ep_config_key_t *key);
/* NOTE(review): defined outside this header; presumably writes a textual
 * description of lane 'lane' of 'key' into 'buf', which holds 'max' bytes -
 * confirm against the definition. */
void ucp_ep_config_lane_info_str(ucp_context_h context,
const ucp_ep_config_key_t *key,
const unsigned *addr_indices,
ucp_lane_index_t lane,
ucp_rsc_index_t aux_rsc_index,
char *buf, size_t max);
/* NOTE(review): defined outside this header; presumably creates a new endpoint
 * on 'worker' and returns it in 'ep_p' - confirm against the definition. */
ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p);
/* NOTE(review): presumably the counterpart of ucp_ep_new() - confirm. */
void ucp_ep_delete(ucp_ep_h ep);
/* NOTE(review): presumably creates the wireup endpoint of 'ep' according to
 * 'ep_init_flags' (UCP_EP_INIT_xx bits) and returns it in 'wireup_ep' -
 * confirm. */
ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
ucp_wireup_ep_t **wireup_ep);
/* NOTE(review): presumably creates an endpoint on 'worker' to the peer
 * described by 'remote_address', limited to the transports in
 * 'local_tl_bitmap', and returns it in 'ep_p' - confirm. */
ucs_status_t ucp_ep_create_to_worker_addr(ucp_worker_h worker,
uint64_t local_tl_bitmap,
const ucp_unpacked_address_t *remote_address,
unsigned ep_init_flags,
const char *message, ucp_ep_h *ep_p);
/* NOTE(review): presumably creates the server-side endpoint for an accepted
 * 'conn_request' and returns it in 'ep_p' - confirm. */
ucs_status_t ucp_ep_create_server_accept(ucp_worker_h worker,
const ucp_conn_request_h conn_request,
ucp_ep_h *ep_p);
/* NOTE(review): defined outside this header; presumably starts a flush of 'ep'
 * and returns a status pointer for the operation. The roles of 'req_cb',
 * 'worker_req' and 'flushed_cb' are not visible here - confirm against the
 * definition. */
ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
ucp_send_callback_t req_cb,
unsigned req_flags,
ucp_request_t *worker_req,
ucp_request_callback_t flushed_cb,
const char *debug_name);
/* NOTE(review): presumably creates an auxiliary endpoint for sockaddr-based
 * connection establishment and returns it in 'ep_p' - confirm. */
ucs_status_t
ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags,
const ucp_unpacked_address_t *remote_address,
ucp_ep_h *ep_p);
/* NOTE(review): defined outside this header; presumably sets key->err_mode
 * from 'ep_init_flags' (see UCP_EP_INIT_ERR_MODE_PEER_FAILURE) - confirm. */
void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
unsigned ep_init_flags);
/* NOTE(review): the signature matches a UCT pending-purge callback; presumably
 * used to purge pending requests of a failed endpoint - confirm. */
void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg);
/* NOTE(review): presumably handles disconnection of 'ep'; the exact effect of
 * 'force' is not visible here - confirm. */
void ucp_ep_disconnected(ucp_ep_h ep, int force);
/* NOTE(review): presumably the internal destroy path of an endpoint - confirm. */
void ucp_ep_destroy_internal(ucp_ep_h ep);
/* NOTE(review): presumably releases the transport lanes of 'ep' - confirm. */
void ucp_ep_cleanup_lanes(ucp_ep_h ep);
/* NOTE(review): presumably returns non-zero when 'ep' is a sockaddr stub
 * endpoint - confirm. */
int ucp_ep_is_sockaddr_stub(ucp_ep_h ep);
/* NOTE(review): defined outside this header; presumably fills 'config' from
 * 'key' for the given worker - confirm against the definition. */
ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
const ucp_ep_config_key_t *key);
/* NOTE(review): presumably the counterpart of ucp_ep_config_init() - confirm. */
void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config);
/* NOTE(review): presumably returns non-zero when the two keys describe the
 * same configuration - confirm. */
int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2);
/* NOTE(review): presumably returns the priority (position) of 'lane' within
 * the 'lanes' array - confirm, including the value returned when it is absent. */
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane);
/* NOTE(review): presumably computes the automatic zcopy threshold for 'iovcnt'
 * IOV entries from the registration cost and bandwidth - confirm. */
size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
const uct_linear_growth_t *reg_cost,
const ucp_context_h context,
double bandwidth);
/* NOTE(review): defined outside this header; presumably creates the worker's
 * endpoints for memory-type transfers (see UCP_EP_INIT_FLAG_MEM_TYPE) -
 * confirm against the definition. */
ucs_status_t ucp_worker_create_mem_type_endpoints(ucp_worker_h worker);
/* NOTE(review): presumably returns the wireup endpoint of the CM lane of 'ep'
 * - confirm, including the value returned when there is none. */
ucp_wireup_ep_t * ucp_ep_get_cm_wireup_ep(ucp_ep_h ep);
/* NOTE(review): presumably returns a bitmap of the transport resources used by
 * the lanes of 'ep' - confirm. */
uint64_t ucp_ep_get_tl_bitmap(ucp_ep_h ep);
/* NOTE(review): presumably returns the UCT endpoint of the CM lane of 'ep' -
 * confirm. */
uct_ep_h ucp_ep_get_cm_uct_ep(ucp_ep_h ep);
/* NOTE(review): presumably returns non-zero when the CM connection of 'ep' is
 * locally connected - confirm. */
int ucp_ep_is_cm_local_connected(ucp_ep_h ep);
/* NOTE(review): the signature matches a progress callback taking an opaque
 * argument; presumably performs local disconnect handling - confirm. */
unsigned ucp_ep_local_disconnect_progress(void *arg);
/* NOTE(review): presumably returns the minimal rendezvous threshold to use
 * with tag offload for 'config' - confirm. */
size_t ucp_ep_tag_offload_min_rndv_thresh(ucp_ep_config_t *config);
/* NOTE(review): presumably invokes the error handler of 'ep' (see
 * ucp_ep_ext_gen_t::err_cb) with 'status' - confirm. */
void ucp_ep_invoke_err_cb(ucp_ep_h ep, ucs_status_t status);
/* NOTE(review): presumably returns non-zero when 'config' supports the
 * rendezvous protocol - confirm. */
int ucp_ep_config_test_rndv_support(const ucp_ep_config_t *config);
#endif