/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
* Copyright (C) Los Alamos National Security, LLC. 2019 ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "ucp_ep.h"
#include "ucp_worker.h"
#include "ucp_am.h"
#include "ucp_ep.inl"
#include "ucp_request.inl"
#include <ucp/wireup/wireup_ep.h>
#include <ucp/wireup/wireup.h>
#include <ucp/wireup/wireup_cm.h>
#include <ucp/tag/eager.h>
#include <ucp/tag/offload.h>
#include <ucp/tag/rndv.h>
#include <ucp/stream/stream.h>
#include <ucp/core/ucp_listener.h>
#include <ucs/datastruct/queue.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/sys/string.h>
#include <ucs/sys/sock.h>
#include <string.h>
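/*
 * Accumulated per-lane performance parameters used when estimating the
 * rendezvous threshold (see ucp_ep_config_calc_params() and
 * ucp_ep_config_calc_rndv_thresh() below): memory registration cost (growth
 * and overhead), per-message software overhead, latency, and aggregated
 * bandwidth.
 */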
typedef struct {
double reg_growth;
double reg_overhead;
double overhead;
double latency;
size_t bw;
} ucp_ep_thresh_params_t;
extern const ucp_request_send_proto_t ucp_stream_am_proto;
extern const ucp_request_send_proto_t ucp_am_proto;
extern const ucp_request_send_proto_t ucp_am_reply_proto;
#if ENABLE_STATS
static ucs_stats_class_t ucp_ep_stats_class = {
.name = "ucp_ep",
.num_counters = UCP_EP_STAT_LAST,
.counter_names = {
[UCP_EP_STAT_TAG_TX_EAGER] = "tx_eager",
[UCP_EP_STAT_TAG_TX_EAGER_SYNC] = "tx_eager_sync",
[UCP_EP_STAT_TAG_TX_RNDV] = "tx_rndv"
}
};
#endif
void ucp_ep_config_key_reset(ucp_ep_config_key_t *key)
{
ucp_lane_index_t i;
memset(key, 0, sizeof(*key));
key->num_lanes = 0;
for (i = 0; i < UCP_MAX_LANES; ++i) {
key->lanes[i].rsc_index = UCP_NULL_RESOURCE;
key->lanes[i].proxy_lane = UCP_NULL_LANE;
key->lanes[i].dst_md_index = UCP_MAX_MDS;
}
key->am_lane = UCP_NULL_LANE;
key->wireup_lane = UCP_NULL_LANE;
key->cm_lane = UCP_NULL_LANE;
key->tag_lane = UCP_NULL_LANE;
key->rma_bw_md_map = 0;
key->reachable_md_map = 0;
key->dst_md_cmpts = NULL;
key->err_mode = UCP_ERR_HANDLING_MODE_NONE;
key->status = UCS_OK;
memset(key->am_bw_lanes, UCP_NULL_LANE, sizeof(key->am_bw_lanes));
memset(key->rma_lanes, UCP_NULL_LANE, sizeof(key->rma_lanes));
memset(key->rma_bw_lanes, UCP_NULL_LANE, sizeof(key->rma_bw_lanes));
memset(key->amo_lanes, UCP_NULL_LANE, sizeof(key->amo_lanes));
}
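/*
 * Allocate a new endpoint object on the worker and initialize it with a
 * default (empty) lane configuration, per-endpoint statistics and an entry in
 * the worker's endpoint list. The caller is expected to set up the actual
 * lanes afterwards, e.g. via ucp_wireup_init_lanes() or
 * ucp_ep_init_create_wireup().
 */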
ucs_status_t ucp_ep_new(ucp_worker_h worker, const char *peer_name,
const char *message, ucp_ep_h *ep_p)
{
ucs_status_t status;
ucp_ep_config_key_t key;
ucp_lane_index_t lane;
ucp_ep_h ep;
ep = ucs_strided_alloc_get(&worker->ep_alloc, "ucp_ep");
if (ep == NULL) {
ucs_error("Failed to allocate ep");
status = UCS_ERR_NO_MEMORY;
goto err;
}
ucp_ep_config_key_reset(&key);
status = ucp_worker_get_ep_config(worker, &key, 0, &ep->cfg_index);
if (status != UCS_OK) {
goto err_free_ep;
}
ep->worker = worker;
ep->am_lane = UCP_NULL_LANE;
ep->flags = 0;
ep->conn_sn = (ucp_ep_conn_sn_t)-1;
ucp_ep_ext_gen(ep)->user_data = NULL;
ucp_ep_ext_gen(ep)->dest_ep_ptr = 0;
ucp_ep_ext_gen(ep)->err_cb = NULL;
UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen(ep)->ep_match) >=
sizeof(ucp_ep_ext_gen(ep)->listener));
UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen(ep)->ep_match) >=
sizeof(ucp_ep_ext_gen(ep)->flush_state));
memset(&ucp_ep_ext_gen(ep)->ep_match, 0,
sizeof(ucp_ep_ext_gen(ep)->ep_match));
ucp_stream_ep_init(ep);
ucp_am_ep_init(ep);
for (lane = 0; lane < UCP_MAX_LANES; ++lane) {
ep->uct_eps[lane] = NULL;
}
#if ENABLE_DEBUG_DATA
ucs_snprintf_zero(ep->peer_name, UCP_WORKER_NAME_MAX, "%s", peer_name);
#endif
/* Create statistics */
status = UCS_STATS_NODE_ALLOC(&ep->stats, &ucp_ep_stats_class,
worker->stats, "-%p", ep);
if (status != UCS_OK) {
goto err_free_ep;
}
ucs_list_add_tail(&worker->all_eps, &ucp_ep_ext_gen(ep)->ep_list);
*ep_p = ep;
ucs_debug("created ep %p to %s %s", ep, ucp_ep_peer_name(ep), message);
return UCS_OK;
err_free_ep:
ucs_strided_alloc_put(&worker->ep_alloc, ep);
err:
return status;
}
void ucp_ep_delete(ucp_ep_h ep)
{
ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
ucp_wireup_msg_ack_cb_pred, ep);
UCS_STATS_NODE_FREE(ep->stats);
ucs_list_del(&ucp_ep_ext_gen(ep)->ep_list);
ucs_strided_alloc_put(&ep->worker->ep_alloc, ep);
}
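/*
 * Create an auxiliary endpoint for the sockaddr flow: a single wireup lane
 * whose auxiliary connection is established from the unpacked remote address
 * (ucp_wireup_ep_connect_aux()). It is used on the server side when the
 * client sent only a partial address, see the UCP_WIREUP_SA_DATA_PARTIAL_ADDR
 * case in ucp_ep_create_server_accept().
 */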
ucs_status_t
ucp_ep_create_sockaddr_aux(ucp_worker_h worker, unsigned ep_init_flags,
const ucp_unpacked_address_t *remote_address,
ucp_ep_h *ep_p)
{
ucp_wireup_ep_t *wireup_ep;
ucs_status_t status;
ucp_ep_h ep;
/* allocate endpoint */
status = ucp_ep_new(worker, remote_address->name, "listener", &ep);
if (status != UCS_OK) {
goto err;
}
status = ucp_ep_init_create_wireup(ep, ep_init_flags, &wireup_ep);
if (status != UCS_OK) {
goto err_delete;
}
status = ucp_wireup_ep_connect_aux(wireup_ep, ep_init_flags, remote_address);
if (status != UCS_OK) {
goto err_destroy_wireup_ep;
}
*ep_p = ep;
return status;
err_destroy_wireup_ep:
uct_ep_destroy(ep->uct_eps[0]);
err_delete:
ucp_ep_delete(ep);
err:
return status;
}
void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
unsigned ep_init_flags)
{
key->err_mode = (ep_init_flags & UCP_EP_INIT_ERR_MODE_PEER_FAILURE) ?
UCP_ERR_HANDLING_MODE_PEER : UCP_ERR_HANDLING_MODE_NONE;
}
int ucp_ep_is_sockaddr_stub(ucp_ep_h ep)
{
/* Only a sockaddr client-side endpoint may be created as a "stub" */
return ucp_ep_get_rsc_index(ep, 0) == UCP_NULL_RESOURCE;
}
static ucs_status_t
ucp_ep_adjust_params(ucp_ep_h ep, const ucp_ep_params_t *params)
{
/* handle a case where the existing endpoint is incomplete */
if (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) {
if (ucp_ep_config(ep)->key.err_mode != params->err_mode) {
ucs_error("asymmetric endpoint configuration not supported, "
"error handling level mismatch");
return UCS_ERR_UNSUPPORTED;
}
}
if (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLER) {
ucp_ep_ext_gen(ep)->user_data = params->err_handler.arg;
ucp_ep_ext_gen(ep)->err_cb = params->err_handler.cb;
}
if (params->field_mask & UCP_EP_PARAM_FIELD_USER_DATA) {
/* user_data overrides err_handler.arg */
ucp_ep_ext_gen(ep)->user_data = params->user_data;
}
return UCS_OK;
}
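/*
 * For every memory type which is not directly accessible from the CPU but is
 * reachable by some of the worker's transports, create a loopback endpoint to
 * this worker restricted to those transports (mem_type_ep[]). These endpoints
 * are consumed elsewhere by protocols that need to move data residing in such
 * memory.
 */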
ucs_status_t ucp_worker_create_mem_type_endpoints(ucp_worker_h worker)
{
ucp_context_h context = worker->context;
ucp_unpacked_address_t local_address;
unsigned i, mem_type;
ucs_status_t status;
void *address_buffer;
size_t address_length;
for (mem_type = 0; mem_type < UCS_MEMORY_TYPE_LAST; mem_type++) {
if (UCP_MEM_IS_ACCESSIBLE_FROM_CPU(mem_type) ||
!context->mem_type_access_tls[mem_type]) {
continue;
}
status = ucp_address_pack(worker, NULL,
context->mem_type_access_tls[mem_type],
UCP_ADDRESS_PACK_FLAG_ALL, NULL,
&address_length, &address_buffer);
if (status != UCS_OK) {
goto err_cleanup_eps;
}
status = ucp_address_unpack(worker, address_buffer, UINT64_MAX, &local_address);
if (status != UCS_OK) {
goto err_free_address_buffer;
}
status = ucp_ep_create_to_worker_addr(worker, UINT64_MAX,
&local_address,
UCP_EP_INIT_FLAG_MEM_TYPE,
"mem type",
&worker->mem_type_ep[mem_type]);
if (status != UCS_OK) {
goto err_free_address_list;
}
ucs_free(local_address.address_list);
ucs_free(address_buffer);
}
return UCS_OK;
err_free_address_list:
ucs_free(local_address.address_list);
err_free_address_buffer:
ucs_free(address_buffer);
err_cleanup_eps:
for (i = 0; i < UCS_MEMORY_TYPE_LAST; i++) {
if (worker->mem_type_ep[i]) {
ucp_ep_destroy_internal(worker->mem_type_ep[i]);
}
}
return status;
}
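/*
 * Initialize a minimal, single-lane configuration for a not-yet-wired
 * endpoint and create a wireup endpoint in uct_eps[0]. Depending on whether
 * the worker uses the CM-based sockaddr protocol, lane 0 serves as the CM
 * lane or as the wireup lane; either way it also acts as the AM lane until
 * the endpoint is reconfigured with real transport lanes.
 */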
ucs_status_t ucp_ep_init_create_wireup(ucp_ep_h ep, unsigned ep_init_flags,
ucp_wireup_ep_t **wireup_ep)
{
ucp_ep_config_key_t key;
ucs_status_t status;
ucp_ep_config_key_reset(&key);
ucp_ep_config_key_set_err_mode(&key, ep_init_flags);
key.num_lanes = 1;
/* all operations will use the first lane, which is a stub endpoint before
* reconfiguration */
key.am_lane = 0;
if (ucp_worker_sockaddr_is_cm_proto(ep->worker)) {
key.cm_lane = 0;
} else {
key.wireup_lane = 0;
}
status = ucp_worker_get_ep_config(ep->worker, &key, 0, &ep->cfg_index);
if (status != UCS_OK) {
return status;
}
ep->am_lane = key.am_lane;
ep->flags |= UCP_EP_FLAG_CONNECT_REQ_QUEUED;
status = ucp_wireup_ep_create(ep, &ep->uct_eps[0]);
if (status != UCS_OK) {
return status;
}
*wireup_ep = ucs_derived_of(ep->uct_eps[0], ucp_wireup_ep_t);
return UCS_OK;
}
ucs_status_t ucp_ep_create_to_worker_addr(ucp_worker_h worker,
uint64_t local_tl_bitmap,
const ucp_unpacked_address_t *remote_address,
unsigned ep_init_flags,
const char *message, ucp_ep_h *ep_p)
{
unsigned addr_indices[UCP_MAX_LANES];
ucs_status_t status;
ucp_ep_h ep;
/* allocate endpoint */
status = ucp_ep_new(worker, remote_address->name, message, &ep);
if (status != UCS_OK) {
goto err;
}
/* initialize transport endpoints */
status = ucp_wireup_init_lanes(ep, ep_init_flags, local_tl_bitmap,
remote_address, addr_indices);
if (status != UCS_OK) {
goto err_delete;
}
ucs_assert(!(ucp_ep_get_tl_bitmap(ep) & ~local_tl_bitmap));
*ep_p = ep;
return UCS_OK;
err_delete:
ucp_ep_delete(ep);
err:
return status;
}
static ucs_status_t ucp_ep_create_to_sock_addr(ucp_worker_h worker,
const ucp_ep_params_t *params,
ucp_ep_h *ep_p)
{
char peer_name[UCS_SOCKADDR_STRING_LEN];
ucp_wireup_ep_t *wireup_ep;
ucs_status_t status;
ucp_ep_h ep;
if (!(params->field_mask & UCP_EP_PARAM_FIELD_SOCK_ADDR)) {
ucs_error("destination socket address is missing");
status = UCS_ERR_INVALID_PARAM;
goto err;
}
UCP_CHECK_PARAM_NON_NULL(params->sockaddr.addr, status, goto err);
/* allocate endpoint */
ucs_sockaddr_str(params->sockaddr.addr, peer_name, sizeof(peer_name));
status = ucp_ep_new(worker, peer_name, "from api call", &ep);
if (status != UCS_OK) {
goto err;
}
status = ucp_ep_init_create_wireup(ep, ucp_ep_init_flags(worker, params),
&wireup_ep);
if (status != UCS_OK) {
goto err_delete;
}
status = ucp_ep_adjust_params(ep, params);
if (status != UCS_OK) {
goto err_cleanup_lanes;
}
status = ucp_worker_sockaddr_is_cm_proto(ep->worker) ?
ucp_ep_client_cm_connect_start(ep, params) :
ucp_wireup_ep_connect_to_sockaddr(ep->uct_eps[0], params);
if (status != UCS_OK) {
goto err_cleanup_lanes;
}
*ep_p = ep;
return UCS_OK;
err_cleanup_lanes:
ucp_ep_cleanup_lanes(ep);
err_delete:
ucp_ep_delete(ep);
err:
return status;
}
/**
* Create an endpoint on the server side connected to the client endpoint.
*/
ucs_status_t ucp_ep_create_server_accept(ucp_worker_h worker,
const ucp_conn_request_h conn_request,
ucp_ep_h *ep_p)
{
const ucp_wireup_sockaddr_data_t *sa_data = &conn_request->sa_data;
unsigned ep_init_flags = 0;
ucp_unpacked_address_t remote_addr;
uint64_t addr_flags;
unsigned i;
ucs_status_t status;
if (sa_data->err_mode == UCP_ERR_HANDLING_MODE_PEER) {
ep_init_flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
}
if (sa_data->addr_mode == UCP_WIREUP_SA_DATA_CM_ADDR) {
addr_flags = UCP_ADDRESS_PACK_FLAG_IFACE_ADDR |
UCP_ADDRESS_PACK_FLAG_EP_ADDR |
UCP_ADDRESS_PACK_FLAG_TRACE;
} else {
addr_flags = UINT64_MAX;
}
/* coverity[overrun-local] */
status = ucp_address_unpack(worker, sa_data + 1, addr_flags, &remote_addr);
if (status != UCS_OK) {
goto out;
}
switch (sa_data->addr_mode) {
case UCP_WIREUP_SA_DATA_FULL_ADDR:
        /* create an endpoint to the worker address we got in the private
         * data */
status = ucp_ep_create_to_worker_addr(worker, UINT64_MAX, &remote_addr,
ep_init_flags |
UCP_EP_INIT_CREATE_AM_LANE,
"listener", ep_p);
if (status != UCS_OK) {
goto out_free_address;
}
ucs_assert(ucp_ep_config(*ep_p)->key.err_mode == sa_data->err_mode);
ucp_ep_flush_state_reset(*ep_p);
ucp_ep_update_dest_ep_ptr(*ep_p, sa_data->ep_ptr);
break;
case UCP_WIREUP_SA_DATA_PARTIAL_ADDR:
status = ucp_ep_create_sockaddr_aux(worker, ep_init_flags,
&remote_addr, ep_p);
if (status != UCS_OK) {
goto out_free_address;
}
ucp_ep_update_dest_ep_ptr(*ep_p, sa_data->ep_ptr);
        /* the server's ep should be aware of the address sent by the
         * client */
(*ep_p)->flags |= UCP_EP_FLAG_LISTENER;
/* NOTE: protect union */
ucs_assert(!((*ep_p)->flags & (UCP_EP_FLAG_ON_MATCH_CTX |
UCP_EP_FLAG_FLUSH_STATE_VALID)));
break;
case UCP_WIREUP_SA_DATA_CM_ADDR:
ucs_assert(ucp_worker_sockaddr_is_cm_proto(worker));
for (i = 0; i < remote_addr.address_count; ++i) {
remote_addr.address_list[i].dev_addr = conn_request->remote_dev_addr;
remote_addr.address_list[i].dev_index = conn_request->sa_data.dev_index;
}
status = ucp_ep_cm_server_create_connected(worker,
ep_init_flags |
UCP_EP_INIT_CM_WIREUP_SERVER,
&remote_addr, conn_request,
ep_p);
if (status != UCS_OK) {
goto out_free_address;
}
(*ep_p)->flags |= UCP_EP_FLAG_LISTENER;
ucp_ep_ext_gen(*ep_p)->listener = conn_request->listener;
break;
default:
ucs_fatal("client sockaddr data contains invalid address mode %d",
sa_data->addr_mode);
}
out_free_address:
ucs_free(remote_addr.address_list);
out:
return status;
}
static ucs_status_t
ucp_ep_create_api_conn_request(ucp_worker_h worker,
const ucp_ep_params_t *params, ucp_ep_h *ep_p)
{
ucp_conn_request_h conn_request = params->conn_request;
ucp_ep_h ep;
ucs_status_t status;
status = ucp_ep_create_server_accept(worker, conn_request, &ep);
if (status != UCS_OK) {
goto out;
}
status = ucp_ep_adjust_params(ep, params);
if (status != UCS_OK) {
goto out_ep_destroy;
}
if (ucp_worker_sockaddr_is_cm_proto(worker)) {
goto out;
}
if (ep->flags & UCP_EP_FLAG_LISTENER) {
status = ucp_wireup_send_pre_request(ep);
} else {
        /* send a wireup request message to connect the client to the server's
         * new endpoint */
ucs_assert(!(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED));
status = ucp_wireup_send_request(ep);
}
if (status == UCS_OK) {
goto out;
}
out_ep_destroy:
ucp_ep_destroy_internal(ep);
out:
if (status == UCS_OK) {
if (!ucp_worker_sockaddr_is_cm_proto(worker)) {
status = uct_iface_accept(conn_request->uct.iface,
conn_request->uct_req);
}
} else {
if (ucp_worker_sockaddr_is_cm_proto(worker)) {
uct_listener_reject(conn_request->uct.listener,
conn_request->uct_req);
} else {
uct_iface_reject(conn_request->uct.iface, conn_request->uct_req);
}
}
if (ucp_worker_sockaddr_is_cm_proto(worker)) {
ucs_free(conn_request->remote_dev_addr);
}
ucs_free(conn_request);
if (status == UCS_OK) {
*ep_p = ep;
}
return status;
}
static ucs_status_t
ucp_ep_create_api_to_worker_addr(ucp_worker_h worker,
const ucp_ep_params_t *params, ucp_ep_h *ep_p)
{
ucp_unpacked_address_t remote_address;
ucp_ep_conn_sn_t conn_sn;
ucs_status_t status;
unsigned flags;
ucp_ep_h ep;
if (!(params->field_mask & UCP_EP_PARAM_FIELD_REMOTE_ADDRESS)) {
status = UCS_ERR_INVALID_PARAM;
ucs_error("remote worker address is missing");
goto out;
}
UCP_CHECK_PARAM_NON_NULL(params->address, status, goto out);
status = ucp_address_unpack(worker, params->address, UINT64_MAX, &remote_address);
if (status != UCS_OK) {
goto out;
}
    /* Check if there is already an unconnected internal endpoint to the same
     * destination address.
     * In case of a loopback connection, search the hash table for an endpoint
     * with even/odd matching, so that every 2 endpoints connected to the local
     * worker will be paired with each other.
     * Note that if a loopback endpoint had the UCP_EP_PARAMS_FLAGS_NO_LOOPBACK
     * flag set, it will not be added to ep_match as an unexpected ep. Because
     * dest_ep_ptr will be initialized, a WIREUP_REQUEST (if sent) will have
     * dst_ep != 0. So, ucp_wireup_request() will not create an unexpected ep
     * in ep_match.
     */
conn_sn = ucp_ep_match_get_next_sn(&worker->ep_match_ctx, remote_address.uuid);
ep = ucp_ep_match_retrieve_unexp(&worker->ep_match_ctx, remote_address.uuid,
conn_sn ^ (remote_address.uuid == worker->uuid));
if (ep != NULL) {
status = ucp_ep_adjust_params(ep, params);
if (status != UCS_OK) {
ucp_ep_destroy_internal(ep);
}
ucp_ep_flush_state_reset(ep);
ucp_stream_ep_activate(ep);
goto out_free_address;
}
status = ucp_ep_create_to_worker_addr(worker, UINT64_MAX, &remote_address,
ucp_ep_init_flags(worker, params),
"from api call", &ep);
if (status != UCS_OK) {
goto out_free_address;
}
status = ucp_ep_adjust_params(ep, params);
if (status != UCS_OK) {
ucp_ep_destroy_internal(ep);
goto out_free_address;
}
ep->conn_sn = conn_sn;
    /*
     * If we are connecting to our own worker, and loopback is allowed, connect
     * the endpoint to itself by updating dest_ep_ptr.
     * Otherwise, add the new ep to the matching context as an expected
     * endpoint, waiting for a connection request from the peer endpoint.
     */
flags = UCP_PARAM_VALUE(EP, params, flags, FLAGS, 0);
if ((remote_address.uuid == worker->uuid) &&
!(flags & UCP_EP_PARAMS_FLAGS_NO_LOOPBACK)) {
ucp_ep_update_dest_ep_ptr(ep, (uintptr_t)ep);
ucp_ep_flush_state_reset(ep);
} else {
ucp_ep_match_insert_exp(&worker->ep_match_ctx, remote_address.uuid, ep);
}
/* if needed, send initial wireup message */
if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) {
ucs_assert(!(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED));
status = ucp_wireup_send_request(ep);
if (status != UCS_OK) {
goto out_free_address;
}
}
status = UCS_OK;
out_free_address:
ucs_free(remote_address.address_list);
out:
if (status == UCS_OK) {
*ep_p = ep;
}
return status;
}
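/*
 * Public endpoint creation entry point. The request is dispatched according
 * to the parameters: the client-server flag goes through the sockaddr path,
 * a connection request goes through the server-accept path, and a remote
 * worker address goes through the worker-address path.
 *
 * A rough usage sketch for the worker-address path (illustrative only, the
 * variable names are placeholders):
 *
 *   ucp_ep_params_t params = {
 *       .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS,
 *       .address    = remote_worker_address
 *   };
 *   ucs_status_t status = ucp_ep_create(worker, &params, &ep);
 */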
ucs_status_t ucp_ep_create(ucp_worker_h worker, const ucp_ep_params_t *params,
ucp_ep_h *ep_p)
{
ucs_status_t status;
unsigned flags;
ucp_ep_h ep = NULL;
UCS_ASYNC_BLOCK(&worker->async);
flags = UCP_PARAM_VALUE(EP, params, flags, FLAGS, 0);
if (flags & UCP_EP_PARAMS_FLAGS_CLIENT_SERVER) {
status = ucp_ep_create_to_sock_addr(worker, params, &ep);
} else if (params->field_mask & UCP_EP_PARAM_FIELD_CONN_REQUEST) {
status = ucp_ep_create_api_conn_request(worker, params, &ep);
} else if (params->field_mask & UCP_EP_PARAM_FIELD_REMOTE_ADDRESS) {
status = ucp_ep_create_api_to_worker_addr(worker, params, &ep);
} else {
status = UCS_ERR_INVALID_PARAM;
}
if (status == UCS_OK) {
ep->flags |= UCP_EP_FLAG_USED;
*ep_p = ep;
}
UCS_ASYNC_UNBLOCK(&worker->async);
return status;
}
ucs_status_ptr_t ucp_ep_modify_nb(ucp_ep_h ep, const ucp_ep_params_t *params)
{
ucp_worker_h worker = ep->worker;
ucs_status_t status;
if (params->field_mask & (UCP_EP_PARAM_FIELD_REMOTE_ADDRESS |
UCP_EP_PARAM_FIELD_SOCK_ADDR |
UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE)) {
return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM);
}
UCS_ASYNC_BLOCK(&worker->async);
status = ucp_ep_adjust_params(ep, params);
UCS_ASYNC_UNBLOCK(&worker->async);
return UCS_STATUS_PTR(status);
}
void ucp_ep_err_pending_purge(uct_pending_req_t *self, void *arg)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucs_status_t status = UCS_PTR_STATUS(arg);
ucp_request_send_state_ff(req, status);
}
static void ucp_destroyed_ep_pending_purge(uct_pending_req_t *self, void *arg)
{
ucs_bug("pending request %p on ep %p should have been flushed", self, arg);
}
void ucp_ep_destroy_internal(ucp_ep_h ep)
{
ucs_debug("ep %p: destroy", ep);
ucp_ep_cleanup_lanes(ep);
ucp_ep_delete(ep);
}
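/*
 * Release all transport endpoints of a UCP endpoint: first purge pending
 * requests on every lane (any leftover request is a bug, see
 * ucp_destroyed_ep_pending_purge()), then destroy each uct_ep once, skipping
 * lanes that are proxy duplicates of another lane, and finally clear the
 * uct_eps[] array.
 */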
void ucp_ep_cleanup_lanes(ucp_ep_h ep)
{
ucp_lane_index_t lane, proxy_lane;
uct_ep_h uct_ep;
ucs_debug("ep %p: cleanup lanes", ep);
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ep->uct_eps[lane];
if (uct_ep != NULL) {
ucs_debug("ep %p: purge uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_pending_purge(uct_ep, ucp_destroyed_ep_pending_purge, ep);
}
}
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ep->uct_eps[lane];
if (uct_ep == NULL) {
continue;
}
proxy_lane = ucp_ep_get_proxy_lane(ep, lane);
if ((proxy_lane != UCP_NULL_LANE) && (proxy_lane != lane) &&
(ep->uct_eps[lane] == ep->uct_eps[proxy_lane]))
{
/* duplicate of another lane */
continue;
}
ucs_debug("ep %p: destroy uct_ep[%d]=%p", ep, lane, uct_ep);
uct_ep_destroy(uct_ep);
}
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
ep->uct_eps[lane] = NULL;
}
}
/* Must be called with async lock held */
void ucp_ep_disconnected(ucp_ep_h ep, int force)
{
/* remove pending slow-path progress in case it wasn't removed yet */
ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
ucp_worker_err_handle_remove_filter, ep);
    /* remove the pending slow-path function in case it wasn't removed yet */
ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
ucp_listener_accept_cb_remove_filter, ep);
ucp_ep_cm_slow_cbq_cleanup(ep);
ucp_stream_ep_cleanup(ep);
ucp_am_ep_cleanup(ep);
ep->flags &= ~UCP_EP_FLAG_USED;
if ((ep->flags & (UCP_EP_FLAG_CONNECT_REQ_QUEUED|UCP_EP_FLAG_REMOTE_CONNECTED))
&& !force) {
        /* Endpoints which have a remote connection are destroyed only when
         * the worker is destroyed, to allow the remote peer to keep sending.
         * TODO: negotiate disconnect.
         */
ucs_trace("not destroying ep %p because of connection from remote", ep);
return;
}
ucp_ep_match_remove_ep(&ep->worker->ep_match_ctx, ep);
ucp_ep_destroy_internal(ep);
}
unsigned ucp_ep_local_disconnect_progress(void *arg)
{
ucp_request_t *req = arg;
ucp_ep_h ep = req->send.ep;
ucs_async_context_t *async = &ep->worker->async; /* ep becomes invalid */
ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));
UCS_ASYNC_BLOCK(async);
ucs_debug("ep %p: disconnected with request %p, %s", ep, req,
ucs_status_string(req->status));
ucp_ep_disconnected(ep, req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL);
UCS_ASYNC_UNBLOCK(async);
    /* Complete the send request from here, to avoid releasing the request
     * while the slow-path element is still pending */
ucp_request_complete_send(req, req->status);
return 0;
}
static void ucp_ep_set_close_request(ucp_ep_h ep, ucp_request_t *request,
const char *debug_msg)
{
ucs_trace("ep %p: setting close request %p, %s", ep, request, debug_msg);
ucp_ep_flush_state_invalidate(ep);
ucp_ep_ext_gen(ep)->close_req.req = request;
ep->flags |= UCP_EP_FLAG_CLOSE_REQ_VALID;
}
static void ucp_ep_close_flushed_callback(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;
ucs_async_context_t *async = &ep->worker->async;
/* in case of force close, schedule ucp_ep_local_disconnect_progress to
* destroy the ep and all its lanes */
if (req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) {
goto out;
}
UCS_ASYNC_BLOCK(async);
ucs_debug("ep %p: flags 0x%x close flushed callback for request %p", ep,
ep->flags, req);
if (ucp_ep_is_cm_local_connected(ep)) {
        /* Now that the close flush has completed and we are still locally
         * connected, we have to notify the remote side */
ucp_ep_cm_disconnect_cm_lane(ep);
if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
            /* Wait for the disconnect notification from the remote side to
             * complete this request */
ucp_ep_set_close_request(ep, req, "close flushed callback");
UCS_ASYNC_UNBLOCK(async);
return;
}
}
UCS_ASYNC_UNBLOCK(async);
out:
/* If a flush is completed from a pending/completion callback, we need to
* schedule slow-path callback to release the endpoint later, since a UCT
* endpoint cannot be released from pending/completion callback context.
*/
ucs_trace("adding slow-path callback to destroy ep %p", ep);
req->send.disconnect.prog_id = UCS_CALLBACKQ_ID_NULL;
uct_worker_progress_register_safe(ep->worker->uct,
ucp_ep_local_disconnect_progress,
req, UCS_CALLBACKQ_FLAG_ONESHOT,
&req->send.disconnect.prog_id);
}
static ucs_status_t ucp_ep_close_nb_check_params(ucp_ep_h ep, unsigned mode)
{
    /* The CM lane tracks the remote state, so it can be used with any mode of
     * close and error handling */
if ((mode == UCP_EP_CLOSE_MODE_FLUSH) ||
(ucp_ep_get_cm_lane(ep) != UCP_NULL_LANE)) {
return UCS_OK;
}
    /* In case of close in force mode, the remote peer failure detection
     * mechanism should be enabled (the CM lane case is handled above) to
     * prevent a hang or any other undefined behavior */
if ((mode == UCP_EP_CLOSE_MODE_FORCE) &&
(ucp_ep_config(ep)->key.err_mode == UCP_ERR_HANDLING_MODE_PEER)) {
return UCS_OK;
}
return UCS_ERR_INVALID_PARAM;
}
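/*
 * Non-blocking close: flush all lanes and, once the flush completes, either
 * start a CM-level disconnect (flush mode with a locally connected CM lane)
 * or tear the endpoint down immediately (force mode, or no CM lane). The
 * return value is a request handle to be progressed and released by the
 * caller, NULL on immediate completion, or an error status pointer.
 */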
ucs_status_ptr_t ucp_ep_close_nb(ucp_ep_h ep, unsigned mode)
{
ucp_worker_h worker = ep->worker;
void *request;
ucp_request_t *close_req;
ucs_status_t status;
status = ucp_ep_close_nb_check_params(ep, mode);
if (status != UCS_OK) {
return UCS_STATUS_PTR(status);
}
UCS_ASYNC_BLOCK(&worker->async);
ep->flags |= UCP_EP_FLAG_CLOSED;
request = ucp_ep_flush_internal(ep,
(mode == UCP_EP_CLOSE_MODE_FLUSH) ?
UCT_FLUSH_FLAG_LOCAL : UCT_FLUSH_FLAG_CANCEL,
NULL, 0, NULL,
ucp_ep_close_flushed_callback, "close");
if (!UCS_PTR_IS_PTR(request)) {
if (ucp_ep_is_cm_local_connected(ep) &&
(mode == UCP_EP_CLOSE_MODE_FLUSH)) {
/* lanes already flushed, start disconnect on CM lane */
ucp_ep_cm_disconnect_cm_lane(ep);
close_req = ucp_ep_cm_close_request_get(ep);
if (close_req != NULL) {
request = close_req + 1;
ucp_ep_set_close_request(ep, close_req, "close");
} else {
request = UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
}
} else {
ucp_ep_disconnected(ep, mode == UCP_EP_CLOSE_MODE_FORCE);
}
}
UCS_ASYNC_UNBLOCK(&worker->async);
return request;
}
ucs_status_ptr_t ucp_disconnect_nb(ucp_ep_h ep)
{
return ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FLUSH);
}
void ucp_ep_destroy(ucp_ep_h ep)
{
ucp_worker_h worker = ep->worker;
ucs_status_ptr_t *request;
ucs_status_t status;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
request = ucp_disconnect_nb(ep);
if (request == NULL) {
goto out;
} else if (UCS_PTR_IS_ERR(request)) {
ucs_warn("disconnect failed: %s",
ucs_status_string(UCS_PTR_STATUS(request)));
goto out;
} else {
do {
ucp_worker_progress(worker);
status = ucp_request_check_status(request);
} while (status == UCS_INPROGRESS);
ucp_request_release(request);
}
out:
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
return;
}
int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
const ucp_ep_config_key_t *key2)
{
ucp_lane_index_t lane;
int i;
if ((key1->num_lanes != key2->num_lanes) ||
memcmp(key1->rma_lanes, key2->rma_lanes, sizeof(key1->rma_lanes)) ||
memcmp(key1->am_bw_lanes, key2->am_bw_lanes, sizeof(key1->am_bw_lanes)) ||
memcmp(key1->rma_bw_lanes, key2->rma_bw_lanes, sizeof(key1->rma_bw_lanes)) ||
memcmp(key1->amo_lanes, key2->amo_lanes, sizeof(key1->amo_lanes)) ||
(key1->rma_bw_md_map != key2->rma_bw_md_map) ||
(key1->reachable_md_map != key2->reachable_md_map) ||
(key1->am_lane != key2->am_lane) ||
(key1->tag_lane != key2->tag_lane) ||
(key1->wireup_lane != key2->wireup_lane) ||
(key1->cm_lane != key2->cm_lane) ||
(key1->err_mode != key2->err_mode) ||
(key1->status != key2->status))
{
return 0;
}
for (lane = 0; lane < key1->num_lanes; ++lane) {
if ((key1->lanes[lane].rsc_index != key2->lanes[lane].rsc_index) ||
(key1->lanes[lane].proxy_lane != key2->lanes[lane].proxy_lane) ||
(key1->lanes[lane].dst_md_index != key2->lanes[lane].dst_md_index))
{
return 0;
}
}
for (i = 0; i < ucs_popcount(key1->reachable_md_map); ++i) {
if (key1->dst_md_cmpts[i] != key2->dst_md_cmpts[i]) {
return 0;
}
}
return 1;
}
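/*
 * Sum up the performance characteristics of a UCP_NULL_LANE-terminated list
 * of lanes (at most UCP_MAX_LANES entries): registration cost and interface
 * overhead/latency are accounted once per distinct memory domain, and only
 * for MDs that support registration, while bandwidth is accumulated over all
 * lanes.
 */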
static void ucp_ep_config_calc_params(ucp_worker_h worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *lanes,
ucp_ep_thresh_params_t *params)
{
ucp_context_h context = worker->context;
ucp_md_map_t md_map = 0;
ucp_lane_index_t lane;
ucp_rsc_index_t rsc_index;
ucp_md_index_t md_index;
uct_md_attr_t *md_attr;
uct_iface_attr_t *iface_attr;
ucp_worker_iface_t *wiface;
int i;
memset(params, 0, sizeof(*params));
for (i = 0; (i < UCP_MAX_LANES) && (lanes[i] != UCP_NULL_LANE); i++) {
lane = lanes[i];
rsc_index = config->key.lanes[lane].rsc_index;
md_index = config->md_index[lane];
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
if (!(md_map & UCS_BIT(md_index))) {
md_map |= UCS_BIT(md_index);
md_attr = &context->tl_mds[md_index].attr;
if (md_attr->cap.flags & UCT_MD_FLAG_REG) {
params->reg_growth += md_attr->reg_cost.growth;
params->reg_overhead += md_attr->reg_cost.overhead;
params->overhead += iface_attr->overhead;
params->latency += ucp_tl_iface_latency(context, iface_attr);
}
}
wiface = ucp_worker_iface(worker, rsc_index);
params->bw += ucp_tl_iface_bandwidth(context, &wiface->attr.bandwidth);
}
}
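/*
 * Estimate the message size starting from which rendezvous becomes cheaper
 * than eager zero-copy. Roughly, the code below solves
 *
 *   T_eager(n) = diff_percent * T_rndv(n)
 *
 * for n, where (derived from the expressions in the function body)
 *
 *   T_eager(n) = eager_zcopy.reg_overhead + eager_zcopy.overhead +
 *                n * (eager_zcopy.reg_growth +
 *                     1 / min(eager_zcopy.bw, bcopy_bw))
 *   T_rndv(n)  = rndv.reg_overhead * (1 + recv_reg_cost) + 2 * rts_latency +
 *                2 * rndv.latency + 2 * eager_zcopy.overhead + rndv.overhead +
 *                n * (rndv.reg_growth * (1 + recv_reg_cost) + 1 / rndv.bw)
 *
 * If there is no positive solution, the configured fallback threshold is used
 * instead.
 */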
static size_t ucp_ep_config_calc_rndv_thresh(ucp_worker_t *worker,
const ucp_ep_config_t *config,
const ucp_lane_index_t *eager_lanes,
const ucp_lane_index_t *rndv_lanes,
int recv_reg_cost)
{
ucp_context_h context = worker->context;
double diff_percent = 1.0 - context->config.ext.rndv_perf_diff / 100.0;
ucp_ep_thresh_params_t eager_zcopy;
ucp_ep_thresh_params_t rndv;
    double numerator, denominator;
ucp_rsc_index_t eager_rsc_index;
uct_iface_attr_t *eager_iface_attr;
double rts_latency;
/* All formulas and descriptions are listed at
* https://github.com/openucx/ucx/wiki/Rendezvous-Protocol-threshold-for-multilane-mode */
ucp_ep_config_calc_params(worker, config, eager_lanes, &eager_zcopy);
ucp_ep_config_calc_params(worker, config, rndv_lanes, &rndv);
if (!eager_zcopy.bw || !rndv.bw) {
goto fallback;
}
eager_rsc_index = config->key.lanes[eager_lanes[0]].rsc_index;
eager_iface_attr = ucp_worker_iface_get_attr(worker, eager_rsc_index);
/* RTS/RTR latency is used from lanes[0] */
rts_latency = ucp_tl_iface_latency(context, eager_iface_attr);
numerator = diff_percent * (rndv.reg_overhead * (1 + recv_reg_cost) +
(2 * rts_latency) + (2 * rndv.latency) +
(2 * eager_zcopy.overhead) + rndv.overhead) -
eager_zcopy.reg_overhead - eager_zcopy.overhead;
    denominator = eager_zcopy.reg_growth +
1.0 / ucs_min(eager_zcopy.bw, context->config.ext.bcopy_bw) -
diff_percent *
(rndv.reg_growth * (1 + recv_reg_cost) + 1.0 / rndv.bw);
    if ((numerator > 0) && (denominator > 0)) {
        return ucs_max(numerator / denominator, eager_iface_attr->cap.am.max_bcopy);
}
fallback:
return context->config.ext.rndv_thresh_fallback;
}
static size_t ucp_ep_thresh(size_t thresh_value, size_t min_value,
size_t max_value)
{
size_t thresh;
ucs_assert(min_value <= max_value);
thresh = ucs_max(min_value, thresh_value);
thresh = ucs_min(max_value, thresh);
return thresh;
}
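/*
 * Clamp an inclusive max_short limit (-1 means the short protocol is
 * disabled) so that it stays below an exclusive threshold: the largest
 * allowed value becomes thresh - 1.
 */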
static void ucp_ep_config_adjust_max_short(ssize_t *max_short,
size_t thresh)
{
*max_short = ucs_min((size_t)(*max_short + 1), thresh) - 1;
ucs_assert(*max_short >= -1);
}
/* With tag offload, SW RNDV requests are temporarily stored in the receiver
 * user buffer when matched. Thus, the minimum message size allowed to be sent
 * with the RNDV protocol should be bigger than the maximal possible SW RNDV
 * request (i.e. the header plus the packed keys size). */
size_t ucp_ep_tag_offload_min_rndv_thresh(ucp_ep_config_t *config)
{
return sizeof(ucp_rndv_rts_hdr_t) + config->tag.rndv.rkey_size;
}
static void ucp_ep_config_set_am_rndv_thresh(ucp_worker_h worker,
uct_iface_attr_t *iface_attr,
uct_md_attr_t *md_attr,
ucp_ep_config_t *config,
size_t min_rndv_thresh,
size_t max_rndv_thresh)
{
ucp_context_h context = worker->context;
size_t rndv_thresh, rndv_nbr_thresh, min_thresh;
ucs_assert(config->key.am_lane != UCP_NULL_LANE);
ucs_assert(config->key.lanes[config->key.am_lane].rsc_index != UCP_NULL_RESOURCE);
if (!ucp_ep_config_test_rndv_support(config)) {
/* Disable RNDV */
rndv_thresh = rndv_nbr_thresh = SIZE_MAX;
} else if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
        /* auto - Make UCX calculate the AM rndv threshold on its own. */
rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
config->key.am_bw_lanes,
0);
rndv_nbr_thresh = context->config.ext.rndv_send_nbr_thresh;
ucs_trace("active message rendezvous threshold is %zu", rndv_thresh);
} else {
rndv_thresh = context->config.ext.rndv_thresh;
rndv_nbr_thresh = context->config.ext.rndv_thresh;
/* adjust max_short if rndv_thresh is set externally */
ucp_ep_config_adjust_max_short(&config->tag.eager.max_short,
rndv_thresh);
}
min_thresh = ucs_max(iface_attr->cap.am.min_zcopy, min_rndv_thresh);
config->tag.rndv.am_thresh = ucp_ep_thresh(rndv_thresh,
min_thresh,
max_rndv_thresh);
config->tag.rndv_send_nbr.am_thresh = ucp_ep_thresh(rndv_nbr_thresh,
min_thresh,
max_rndv_thresh);
ucs_trace("Active Message rndv threshold is %zu (send_nbr: %zu)",
config->tag.rndv.am_thresh, config->tag.rndv_send_nbr.am_thresh);
}
static void ucp_ep_config_set_rndv_thresh(ucp_worker_t *worker,
ucp_ep_config_t *config,
ucp_lane_index_t *lanes,
size_t min_rndv_thresh,
size_t max_rndv_thresh)
{
ucp_context_t *context = worker->context;
ucp_lane_index_t lane = lanes[0];
ucp_rsc_index_t rsc_index;
size_t rndv_thresh, rndv_nbr_thresh, min_thresh;
uct_iface_attr_t *iface_attr;
if (lane == UCP_NULL_LANE) {
ucs_debug("rendezvous (get_zcopy) protocol is not supported");
return;
}
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index == UCP_NULL_RESOURCE) {
return;
}
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
if (!ucp_ep_config_test_rndv_support(config)) {
/* Disable RNDV */
rndv_thresh = rndv_nbr_thresh = SIZE_MAX;
} else if (context->config.ext.rndv_thresh == UCS_MEMUNITS_AUTO) {
        /* auto - Make UCX calculate the RMA (get_zcopy) rndv threshold on its own. */
rndv_thresh = ucp_ep_config_calc_rndv_thresh(worker, config,
config->key.am_bw_lanes,
lanes, 1);
rndv_nbr_thresh = context->config.ext.rndv_send_nbr_thresh;
} else {
rndv_thresh = context->config.ext.rndv_thresh;
rndv_nbr_thresh = context->config.ext.rndv_thresh;
/* adjust max_short if rndv_thresh is set externally */
ucp_ep_config_adjust_max_short(&config->tag.eager.max_short,
rndv_thresh);
}
min_thresh = ucs_max(iface_attr->cap.get.min_zcopy, min_rndv_thresh);
/* TODO: need to check minimal PUT Zcopy */
config->tag.rndv.rma_thresh = ucp_ep_thresh(rndv_thresh,
min_thresh,
max_rndv_thresh);
config->tag.rndv_send_nbr.rma_thresh = ucp_ep_thresh(rndv_nbr_thresh,
min_thresh,
max_rndv_thresh);
ucs_trace("rndv threshold is %zu (send_nbr: %zu)",
config->tag.rndv.rma_thresh, config->tag.rndv_send_nbr.rma_thresh);
}
static void ucp_ep_config_set_memtype_thresh(ucp_memtype_thresh_t *max_eager_short,
ssize_t max_short, int num_mem_type_mds)
{
if (!num_mem_type_mds) {
max_eager_short->memtype_off = max_short;
}
max_eager_short->memtype_on = max_short;
}
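/*
 * Fill a per-protocol message configuration (max_short/bcopy/zcopy sizes and
 * zero-copy thresholds) according to the interface capability flags passed
 * in. The zero-copy thresholds are either computed automatically per IOV
 * count from the registration cost and bandwidth, or taken from the
 * configured value (context->config.ext.zcopy_thresh), in which case
 * max_short is adjusted accordingly.
 */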
static void ucp_ep_config_init_attrs(ucp_worker_t *worker, ucp_rsc_index_t rsc_index,
ucp_ep_msg_config_t *config, size_t max_short,
size_t max_bcopy, size_t max_zcopy,
size_t max_iov, uint64_t short_flag,
uint64_t bcopy_flag, uint64_t zcopy_flag,
unsigned hdr_len, size_t adjust_min_val)
{
ucp_context_t *context = worker->context;
const uct_md_attr_t *md_attr;
uct_iface_attr_t *iface_attr;
size_t it;
size_t zcopy_thresh;
int mem_type;
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
if ((iface_attr->cap.flags & short_flag)) {
config->max_short = max_short - hdr_len;
} else {
config->max_short = -1;
}
if (iface_attr->cap.flags & bcopy_flag) {
config->max_bcopy = max_bcopy;
} else {
config->max_bcopy = SIZE_MAX;
}
md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
if (!(iface_attr->cap.flags & zcopy_flag) ||
((md_attr->cap.flags & UCT_MD_FLAG_NEED_MEMH) &&
!(md_attr->cap.flags & UCT_MD_FLAG_REG))) {
return;
}
config->max_zcopy = max_zcopy;
config->max_iov = ucs_min(UCP_MAX_IOV, max_iov);
if (context->config.ext.zcopy_thresh == UCS_MEMUNITS_AUTO) {
config->zcopy_auto_thresh = 1;
for (it = 0; it < UCP_MAX_IOV; ++it) {
zcopy_thresh = ucp_ep_config_get_zcopy_auto_thresh(it + 1,
&md_attr->reg_cost,
context,
ucp_tl_iface_bandwidth(context,
&iface_attr->bandwidth));
zcopy_thresh = ucs_min(zcopy_thresh, adjust_min_val);
config->sync_zcopy_thresh[it] = zcopy_thresh;
config->zcopy_thresh[it] = zcopy_thresh;
}
} else {
config->zcopy_auto_thresh = 0;
config->sync_zcopy_thresh[0] = config->zcopy_thresh[0] =
ucs_min(context->config.ext.zcopy_thresh, adjust_min_val);
/* adjust max_short if zcopy_thresh is set externally */
ucp_ep_config_adjust_max_short(&config->max_short,
config->zcopy_thresh[0]);
}
for (mem_type = 0; mem_type < UCS_MEMORY_TYPE_LAST; mem_type++) {
if (UCP_MEM_IS_ACCESSIBLE_FROM_CPU(mem_type)) {
config->mem_type_zcopy_thresh[mem_type] = config->zcopy_thresh[0];
} else if (md_attr->cap.reg_mem_types & UCS_BIT(mem_type)) {
config->mem_type_zcopy_thresh[mem_type] = 1;
}
}
}
static ucs_status_t ucp_ep_config_key_copy(ucp_ep_config_key_t *dst,
const ucp_ep_config_key_t *src)
{
*dst = *src;
dst->dst_md_cmpts = ucs_calloc(ucs_popcount(src->reachable_md_map),
sizeof(*dst->dst_md_cmpts),
"ucp_dst_md_cmpts");
if (dst->dst_md_cmpts == NULL) {
ucs_error("failed to allocate ucp_ep dest component list");
return UCS_ERR_NO_MEMORY;
}
memcpy(dst->dst_md_cmpts, src->dst_md_cmpts,
ucs_popcount(src->reachable_md_map) * sizeof(*dst->dst_md_cmpts));
return UCS_OK;
}
ucs_status_t ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config,
const ucp_ep_config_key_t *key)
{
ucp_context_h context = worker->context;
ucp_lane_index_t tag_lanes[2] = {UCP_NULL_LANE, UCP_NULL_LANE};
ucp_lane_index_t get_zcopy_lane_count;
ucp_lane_index_t put_zcopy_lane_count;
ucp_ep_rma_config_t *rma_config;
uct_iface_attr_t *iface_attr;
uct_md_attr_t *md_attr;
ucs_memory_type_t mem_type;
ucp_rsc_index_t rsc_index;
ucp_lane_index_t lane;
size_t it;
size_t max_rndv_thresh;
size_t max_am_rndv_thresh;
size_t min_rndv_thresh;
size_t min_am_rndv_thresh;
ucs_status_t status;
double rndv_max_bw;
int i;
memset(config, 0, sizeof(*config));
status = ucp_ep_config_key_copy(&config->key, key);
if (status != UCS_OK) {
return status;
}
/* Default settings */
for (it = 0; it < UCP_MAX_IOV; ++it) {
config->am.zcopy_thresh[it] = SIZE_MAX;
config->am.sync_zcopy_thresh[it] = SIZE_MAX;
config->tag.eager.zcopy_thresh[it] = SIZE_MAX;
config->tag.eager.sync_zcopy_thresh[it] = SIZE_MAX;
}
UCS_STATIC_ASSERT(UCS_MEMORY_TYPE_HOST == 0);
for (mem_type = UCS_MEMORY_TYPE_HOST; mem_type < UCS_MEMORY_TYPE_LAST; mem_type++) {
config->am.mem_type_zcopy_thresh[mem_type] = SIZE_MAX;
config->tag.eager.mem_type_zcopy_thresh[mem_type] = SIZE_MAX;
}
config->tag.eager.zcopy_auto_thresh = 0;
config->am.zcopy_auto_thresh = 0;
config->p2p_lanes = 0;
config->bcopy_thresh = context->config.ext.bcopy_thresh;
config->tag.lane = UCP_NULL_LANE;
config->tag.proto = &ucp_tag_eager_proto;
config->tag.sync_proto = &ucp_tag_eager_sync_proto;
config->tag.rndv.rma_thresh = SIZE_MAX;
config->tag.rndv.min_get_zcopy = 0;
config->tag.rndv.max_get_zcopy = SIZE_MAX;
config->tag.rndv.min_put_zcopy = 0;
config->tag.rndv.max_put_zcopy = SIZE_MAX;
config->tag.rndv.am_thresh = SIZE_MAX;
config->tag.rndv_send_nbr.am_thresh = SIZE_MAX;
config->tag.rndv_send_nbr.rma_thresh = SIZE_MAX;
config->tag.rndv.rkey_size = ucp_rkey_packed_size(context,
config->key.rma_bw_md_map);
for (lane = 0; lane < UCP_MAX_LANES; ++lane) {
config->tag.rndv.get_zcopy_lanes[lane] = UCP_NULL_LANE;
config->tag.rndv.put_zcopy_lanes[lane] = UCP_NULL_LANE;
}
config->tag.rndv.rkey_ptr_dst_mds = 0;
config->stream.proto = &ucp_stream_am_proto;
config->am_u.proto = &ucp_am_proto;
config->am_u.reply_proto = &ucp_am_reply_proto;
max_rndv_thresh = SIZE_MAX;
max_am_rndv_thresh = SIZE_MAX;
min_am_rndv_thresh = 0;
config->tag.offload.max_eager_short.memtype_on = -1;
config->tag.offload.max_eager_short.memtype_off = -1;
config->tag.max_eager_short.memtype_on = -1;
config->tag.max_eager_short.memtype_off = -1;
for (lane = 0; lane < config->key.num_lanes; ++lane) {
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index != UCP_NULL_RESOURCE) {
config->md_index[lane] = context->tl_rscs[rsc_index].md_index;
if (ucp_worker_is_tl_p2p(worker, rsc_index)) {
config->p2p_lanes |= UCS_BIT(lane);
}
} else {
config->md_index[lane] = UCP_NULL_RESOURCE;
}
}
/* configuration for rndv */
get_zcopy_lane_count = 0;
put_zcopy_lane_count = 0;
rndv_max_bw = 0;
for (i = 0; (i < config->key.num_lanes) &&
(config->key.rma_bw_lanes[i] != UCP_NULL_LANE); ++i) {
lane = config->key.rma_bw_lanes[i];
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index != UCP_NULL_RESOURCE) {
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
/* Rkey_ptr */
if (md_attr->cap.flags & UCT_MD_FLAG_RKEY_PTR) {
config->tag.rndv.rkey_ptr_dst_mds |=
UCS_BIT(config->key.lanes[lane].dst_md_index);
}
/* GET Zcopy */
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_ZCOPY) {
config->tag.rndv.min_get_zcopy = ucs_max(config->tag.rndv.min_get_zcopy,
iface_attr->cap.get.min_zcopy);
config->tag.rndv.max_get_zcopy = ucs_min(config->tag.rndv.max_get_zcopy,
iface_attr->cap.get.max_zcopy);
ucs_assert(get_zcopy_lane_count < UCP_MAX_LANES);
config->tag.rndv.get_zcopy_lanes[get_zcopy_lane_count++] = lane;
}
/* PUT Zcopy */
if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_ZCOPY) {
config->tag.rndv.min_put_zcopy = ucs_max(config->tag.rndv.min_put_zcopy,
iface_attr->cap.put.min_zcopy);
config->tag.rndv.max_put_zcopy = ucs_min(config->tag.rndv.max_put_zcopy,
iface_attr->cap.put.max_zcopy);
ucs_assert(put_zcopy_lane_count < UCP_MAX_LANES);
config->tag.rndv.put_zcopy_lanes[put_zcopy_lane_count++] = lane;
}
rndv_max_bw = ucs_max(rndv_max_bw,
ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth));
}
}
if (get_zcopy_lane_count == 0) {
/* if there are no RNDV RMA BW lanes that support GET Zcopy, reset
* min/max values to show that the scheme is unsupported */
config->tag.rndv.min_get_zcopy = SIZE_MAX;
config->tag.rndv.max_get_zcopy = 0;
}
if (put_zcopy_lane_count == 0) {
/* if there are no RNDV RMA BW lanes that support PUT Zcopy, reset
* min/max values to show that the scheme is unsupported */
config->tag.rndv.min_put_zcopy = SIZE_MAX;
config->tag.rndv.max_put_zcopy = 0;
}
if (rndv_max_bw > 0) {
for (i = 0; (i < config->key.num_lanes) &&
(config->key.rma_bw_lanes[i] != UCP_NULL_LANE); ++i) {
lane = config->key.rma_bw_lanes[i];
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index != UCP_NULL_RESOURCE) {
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
config->tag.rndv.scale[lane] =
ucp_tl_iface_bandwidth(context, &iface_attr->bandwidth) /
rndv_max_bw;
}
}
}
/* Configuration for tag offload */
if (config->key.tag_lane != UCP_NULL_LANE) {
lane = config->key.tag_lane;
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index != UCP_NULL_RESOURCE) {
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
ucp_ep_config_init_attrs(worker, rsc_index, &config->tag.eager,
iface_attr->cap.tag.eager.max_short,
iface_attr->cap.tag.eager.max_bcopy,
iface_attr->cap.tag.eager.max_zcopy,
iface_attr->cap.tag.eager.max_iov,
UCT_IFACE_FLAG_TAG_EAGER_SHORT,
UCT_IFACE_FLAG_TAG_EAGER_BCOPY,
UCT_IFACE_FLAG_TAG_EAGER_ZCOPY, 0,
iface_attr->cap.tag.eager.max_bcopy);
config->tag.offload.max_rndv_iov = iface_attr->cap.tag.rndv.max_iov;
config->tag.offload.max_rndv_zcopy = iface_attr->cap.tag.rndv.max_zcopy;
config->tag.sync_proto = &ucp_tag_offload_sync_proto;
config->tag.proto = &ucp_tag_offload_proto;
config->tag.lane = lane;
max_rndv_thresh = iface_attr->cap.tag.eager.max_zcopy;
max_am_rndv_thresh = iface_attr->cap.tag.eager.max_bcopy;
min_rndv_thresh = ucp_ep_tag_offload_min_rndv_thresh(config);
min_am_rndv_thresh = min_rndv_thresh;
ucs_assert_always(iface_attr->cap.tag.rndv.max_hdr >=
sizeof(ucp_tag_offload_unexp_rndv_hdr_t));
if (config->key.am_lane != UCP_NULL_LANE) {
/* Must have active messages for using rendezvous */
tag_lanes[0] = lane;
ucp_ep_config_set_rndv_thresh(worker, config, tag_lanes,
min_rndv_thresh, max_rndv_thresh);
}
/* Max Eager short has to be set after Zcopy and RNDV thresholds */
ucp_ep_config_set_memtype_thresh(&config->tag.offload.max_eager_short,
config->tag.eager.max_short,
context->num_mem_type_detect_mds);
}
}
/* Configuration for active messages */
if (config->key.am_lane != UCP_NULL_LANE) {
lane = config->key.am_lane;
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index != UCP_NULL_RESOURCE) {
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
md_attr = &context->tl_mds[context->tl_rscs[rsc_index].md_index].attr;
ucp_ep_config_init_attrs(worker, rsc_index, &config->am,
iface_attr->cap.am.max_short,
iface_attr->cap.am.max_bcopy,
iface_attr->cap.am.max_zcopy,
iface_attr->cap.am.max_iov,
UCT_IFACE_FLAG_AM_SHORT,
UCT_IFACE_FLAG_AM_BCOPY,
UCT_IFACE_FLAG_AM_ZCOPY,
sizeof(ucp_eager_hdr_t), SIZE_MAX);
/* All keys must fit in RNDV packet.
* TODO remove some MDs if they don't
*/
ucs_assert_always(config->tag.rndv.rkey_size <= config->am.max_bcopy);
if (!ucp_ep_is_tag_offload_enabled(config)) {
/* Tag offload is disabled, AM will be used for all
* tag-matching protocols */
/* TODO: set threshold level based on all available lanes */
config->tag.eager = config->am;
config->tag.lane = lane;
min_rndv_thresh = iface_attr->cap.get.min_zcopy;
min_am_rndv_thresh = iface_attr->cap.am.min_zcopy;
ucp_ep_config_set_rndv_thresh(worker, config,
config->key.rma_bw_lanes,
min_rndv_thresh,
max_rndv_thresh);
/* Max Eager short has to be set after Zcopy and RNDV thresholds */
ucp_ep_config_set_memtype_thresh(&config->tag.max_eager_short,
config->tag.eager.max_short,
context->num_mem_type_detect_mds);
}
/* Calculate rndv threshold for AM Rendezvous, which may be used by
* any tag-matching protocol (AM and offload). */
ucp_ep_config_set_am_rndv_thresh(worker, iface_attr, md_attr, config,
min_am_rndv_thresh,
max_am_rndv_thresh);
} else {
/* Stub endpoint */
config->am.max_bcopy = UCP_MIN_BCOPY;
}
}
memset(&config->rma, 0, sizeof(config->rma));
/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
rma_config = &config->rma[lane];
rma_config->put_zcopy_thresh = SIZE_MAX;
rma_config->get_zcopy_thresh = SIZE_MAX;
rma_config->max_put_short = SIZE_MAX;
rma_config->max_get_short = SIZE_MAX;
rma_config->max_put_bcopy = SIZE_MAX;
rma_config->max_get_bcopy = SIZE_MAX;
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
}
rsc_index = config->key.lanes[lane].rsc_index;
if (rsc_index != UCP_NULL_RESOURCE) {
iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
/* PUT */
if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_ZCOPY) {
rma_config->max_put_zcopy = iface_attr->cap.put.max_zcopy;
/* TODO: formula */
if (context->config.ext.zcopy_thresh == UCS_MEMUNITS_AUTO) {
rma_config->put_zcopy_thresh = 16384;
} else {
rma_config->put_zcopy_thresh = context->config.ext.zcopy_thresh;
}
rma_config->put_zcopy_thresh = ucs_max(rma_config->put_zcopy_thresh,
iface_attr->cap.put.min_zcopy);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_BCOPY) {
rma_config->max_put_bcopy = ucs_min(iface_attr->cap.put.max_bcopy,
rma_config->put_zcopy_thresh);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_SHORT) {
rma_config->max_put_short = ucs_min(iface_attr->cap.put.max_short,
rma_config->max_put_bcopy);
}
/* GET */
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_ZCOPY) {
/* TODO: formula */
rma_config->max_get_zcopy = iface_attr->cap.get.max_zcopy;
if (context->config.ext.zcopy_thresh == UCS_MEMUNITS_AUTO) {
rma_config->get_zcopy_thresh = 16384;
} else {
rma_config->get_zcopy_thresh = context->config.ext.zcopy_thresh;
}
rma_config->get_zcopy_thresh = ucs_max(rma_config->get_zcopy_thresh,
iface_attr->cap.get.min_zcopy);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_BCOPY) {
rma_config->max_get_bcopy = ucs_min(iface_attr->cap.get.max_bcopy,
rma_config->get_zcopy_thresh);
}
if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_SHORT) {
rma_config->max_get_short = ucs_min(iface_attr->cap.get.max_short,
rma_config->max_get_bcopy);
}
} else {
rma_config->max_put_bcopy = UCP_MIN_BCOPY; /* Stub endpoint */
}
}
return UCS_OK;
}
void ucp_ep_config_cleanup(ucp_worker_h worker, ucp_ep_config_t *config)
{
ucs_free(config->key.dst_md_cmpts);
}
static int ucp_ep_is_short_lower_thresh(ssize_t max_short,
size_t thresh)
{
return ((max_short < 0) ||
(((size_t)max_short + 1) < thresh));
}
static void ucp_ep_config_print_tag_proto(FILE *stream, const char *name,
ssize_t max_eager_short,
size_t zcopy_thresh,
size_t rndv_rma_thresh,
size_t rndv_am_thresh)
{
size_t max_bcopy, min_rndv, max_short;
min_rndv = ucs_min(rndv_rma_thresh, rndv_am_thresh);
max_bcopy = ucs_min(zcopy_thresh, min_rndv);
fprintf(stream, "# %23s: 0", name);
/* print eager short */
if (max_eager_short > 0) {
max_short = max_eager_short;
ucs_assert(max_short <= SSIZE_MAX);
fprintf(stream, "..<egr/short>..%zu" , max_short + 1);
} else if (!max_eager_short) {
fprintf(stream, "..<egr/short>..%zu" , max_eager_short);
}
/* print eager bcopy */
if (ucp_ep_is_short_lower_thresh(max_eager_short, max_bcopy) && max_bcopy) {
fprintf(stream, "..<egr/bcopy>..");
if (max_bcopy < SIZE_MAX) {
fprintf(stream, "%zu", max_bcopy);
}
}
/* print eager zcopy */
if (ucp_ep_is_short_lower_thresh(max_eager_short, min_rndv) &&
(zcopy_thresh < min_rndv)) {
fprintf(stream, "..<egr/zcopy>..");
if (min_rndv < SIZE_MAX) {
fprintf(stream, "%zu", min_rndv);
}
}
/* print rendezvous */
if (min_rndv < SIZE_MAX) {
fprintf(stream, "..<rndv>..");
}
fprintf(stream, "(inf)\n");
}
static void ucp_ep_config_print_rma_proto(FILE *stream, const char *name,
ucp_lane_index_t lane,
size_t bcopy_thresh, size_t zcopy_thresh)
{
fprintf(stream, "# %20s[%d]: 0", name, lane);
if (bcopy_thresh > 0) {
fprintf(stream, "..<short>");
}
if (bcopy_thresh < zcopy_thresh) {
if (bcopy_thresh > 0) {
fprintf(stream, "..%zu", bcopy_thresh);
}
fprintf(stream, "..<bcopy>");
}
if (zcopy_thresh < SIZE_MAX) {
fprintf(stream, "..%zu..<zcopy>", zcopy_thresh);
}
fprintf(stream, "..(inf)\n");
}
int ucp_ep_config_get_multi_lane_prio(const ucp_lane_index_t *lanes,
ucp_lane_index_t lane)
{
int prio;
for (prio = 0; prio < UCP_MAX_LANES; ++prio) {
if (lane == lanes[prio]) {
return prio;
}
}
return -1;
}
void ucp_ep_config_lane_info_str(ucp_context_h context,
const ucp_ep_config_key_t *key,
const unsigned *addr_indices,
ucp_lane_index_t lane,
ucp_rsc_index_t aux_rsc_index,
char *buf, size_t max)
{
uct_tl_resource_desc_t *rsc;
ucp_rsc_index_t rsc_index;
ucp_lane_index_t proxy_lane;
ucp_md_index_t dst_md_index;
ucp_rsc_index_t cmpt_index;
char *p, *endp;
char *desc_str;
int prio;
p = buf;
endp = buf + max;
rsc_index = key->lanes[lane].rsc_index;
proxy_lane = key->lanes[lane].proxy_lane;
rsc = &context->tl_rscs[rsc_index].tl_rsc;
if ((proxy_lane == lane) || (proxy_lane == UCP_NULL_LANE)) {
if (key->lanes[lane].proxy_lane == lane) {
desc_str = " <proxy>";
} else {
desc_str = "";
}
snprintf(p, endp - p, "lane[%d]: %2d:" UCT_TL_RESOURCE_DESC_FMT " md[%d]%s %-*c-> ",
lane, rsc_index, UCT_TL_RESOURCE_DESC_ARG(rsc),
context->tl_rscs[rsc_index].md_index, desc_str,
20 - (int)(strlen(rsc->dev_name) + strlen(rsc->tl_name) + strlen(desc_str)),
' ');
p += strlen(p);
if (addr_indices != NULL) {
snprintf(p, endp - p, "addr[%d].", addr_indices[lane]);
p += strlen(p);
}
} else {
snprintf(p, endp - p, "lane[%d]: proxy to lane[%d] %12c -> ", lane,
proxy_lane, ' ');
p += strlen(p);
}
dst_md_index = key->lanes[lane].dst_md_index;
cmpt_index = ucp_ep_config_get_dst_md_cmpt(key, dst_md_index);
snprintf(p, endp - p, "md[%d]/%-8s", dst_md_index,
context->tl_cmpts[cmpt_index].attr.name);
p += strlen(p);
prio = ucp_ep_config_get_multi_lane_prio(key->rma_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " rma#%d", prio);
p += strlen(p);
}
prio = ucp_ep_config_get_multi_lane_prio(key->rma_bw_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " rma_bw#%d", prio);
p += strlen(p);
}
prio = ucp_ep_config_get_multi_lane_prio(key->amo_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " amo#%d", prio);
p += strlen(p);
}
if (key->am_lane == lane) {
snprintf(p, endp - p, " am");
p += strlen(p);
}
prio = ucp_ep_config_get_multi_lane_prio(key->am_bw_lanes, lane);
if (prio != -1) {
snprintf(p, endp - p, " am_bw#%d", prio);
p += strlen(p);
}
if (lane == key->tag_lane) {
snprintf(p, endp - p, " tag_offload");
p += strlen(p);
}
if (key->wireup_lane == lane) {
snprintf(p, endp - p, " wireup");
p += strlen(p);
if (aux_rsc_index != UCP_NULL_RESOURCE) {
snprintf(p, endp - p, "{" UCT_TL_RESOURCE_DESC_FMT "}",
UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[aux_rsc_index].tl_rsc));
}
}
}
static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,
const ucp_ep_config_t *config,
const unsigned *addr_indices,
ucp_rsc_index_t aux_rsc_index)
{
ucp_context_h context = worker->context;
char lane_info[128] = {0};
ucp_md_index_t md_index;
ucp_lane_index_t lane;
for (lane = 0; lane < config->key.num_lanes; ++lane) {
ucp_ep_config_lane_info_str(context, &config->key, addr_indices, lane,
aux_rsc_index, lane_info, sizeof(lane_info));
fprintf(stream, "# %s\n", lane_info);
}
fprintf(stream, "#\n");
if (context->config.features & UCP_FEATURE_TAG) {
ucp_ep_config_print_tag_proto(stream, "tag_send",
config->tag.eager.max_short,
config->tag.eager.zcopy_thresh[0],
config->tag.rndv.rma_thresh,
config->tag.rndv.am_thresh);
ucp_ep_config_print_tag_proto(stream, "tag_send_nbr",
config->tag.eager.max_short,
/* disable zcopy */
ucs_min(config->tag.rndv_send_nbr.rma_thresh,
config->tag.rndv_send_nbr.am_thresh),
config->tag.rndv_send_nbr.rma_thresh,
config->tag.rndv_send_nbr.am_thresh);
ucp_ep_config_print_tag_proto(stream, "tag_send_sync",
config->tag.eager.max_short,
config->tag.eager.sync_zcopy_thresh[0],
config->tag.rndv.rma_thresh,
config->tag.rndv.am_thresh);
}
if (context->config.features & UCP_FEATURE_RMA) {
for (lane = 0; lane < config->key.num_lanes; ++lane) {
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
continue;
}
ucp_ep_config_print_rma_proto(stream, "put", lane,
ucs_max(config->rma[lane].max_put_short + 1,
config->bcopy_thresh),
config->rma[lane].put_zcopy_thresh);
ucp_ep_config_print_rma_proto(stream, "get", lane, 0,
config->rma[lane].get_zcopy_thresh);
}
}
if (context->config.features & (UCP_FEATURE_TAG|UCP_FEATURE_RMA)) {
fprintf(stream, "#\n");
fprintf(stream, "# %23s: mds ", "rma_bw");
ucs_for_each_bit(md_index, config->key.rma_bw_md_map) {
fprintf(stream, "[%d] ", md_index);
}
}
if (context->config.features & UCP_FEATURE_TAG) {
fprintf(stream, "rndv_rkey_size %zu\n", config->tag.rndv.rkey_size);
}
}
void ucp_ep_print_info(ucp_ep_h ep, FILE *stream)
{
ucp_rsc_index_t aux_rsc_index;
ucp_lane_index_t wireup_lane;
uct_ep_h wireup_ep;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
fprintf(stream, "#\n");
fprintf(stream, "# UCP endpoint\n");
fprintf(stream, "#\n");
fprintf(stream, "# peer: %s\n", ucp_ep_peer_name(ep));
/* if there is a wireup lane, set aux_rsc_index to the stub ep resource */
aux_rsc_index = UCP_NULL_RESOURCE;
wireup_lane = ucp_ep_config(ep)->key.wireup_lane;
if (wireup_lane != UCP_NULL_LANE) {
wireup_ep = ep->uct_eps[wireup_lane];
if (ucp_wireup_ep_test(wireup_ep)) {
aux_rsc_index = ucp_wireup_ep_get_aux_rsc_index(wireup_ep);
}
}
ucp_ep_config_print(stream, ep->worker, ucp_ep_config(ep), NULL,
aux_rsc_index);
fprintf(stream, "#\n");
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
}
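/*
 * Automatic zero-copy threshold: the message size n at which the cost of
 * registering iovcnt buffers plus sending at the wire bandwidth equals the
 * cost of copying the data at bcopy_bw, i.e. the n that satisfies
 *
 *   iovcnt * overhead + iovcnt * growth * n + n / bandwidth = n / bcopy_bw
 *
 * A non-positive solution means zero-copy never wins, and SIZE_MAX is
 * returned.
 */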
size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
const uct_linear_growth_t *reg_cost,
const ucp_context_h context,
double bandwidth)
{
double zcopy_thresh;
double bcopy_bw = context->config.ext.bcopy_bw;
zcopy_thresh = (iovcnt * reg_cost->overhead) /
((1.0 / bcopy_bw) - (1.0 / bandwidth) - (iovcnt * reg_cost->growth));
if (zcopy_thresh < 0.0) {
return SIZE_MAX;
}
return zcopy_thresh;
}
ucp_wireup_ep_t * ucp_ep_get_cm_wireup_ep(ucp_ep_h ep)
{
const ucp_lane_index_t lane = ucp_ep_get_cm_lane(ep);
if (lane == UCP_NULL_LANE) {
return NULL;
}
return ucp_wireup_ep_test(ep->uct_eps[lane]) ?
ucs_derived_of(ep->uct_eps[lane], ucp_wireup_ep_t) : NULL;
}
uct_ep_h ucp_ep_get_cm_uct_ep(ucp_ep_h ep)
{
ucp_lane_index_t lane;
ucp_wireup_ep_t *wireup_ep;
lane = ucp_ep_get_cm_lane(ep);
if (lane == UCP_NULL_LANE) {
return NULL;
}
wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
return (wireup_ep == NULL) ? ep->uct_eps[lane] : wireup_ep->super.uct_ep;
}
int ucp_ep_is_cm_local_connected(ucp_ep_h ep)
{
return (ucp_ep_get_cm_lane(ep) != UCP_NULL_LANE) &&
(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED);
}
uint64_t ucp_ep_get_tl_bitmap(ucp_ep_h ep)
{
uint64_t tl_bitmap = 0;
ucp_lane_index_t lane;
ucp_rsc_index_t rsc_idx;
for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
if (lane == ucp_ep_get_cm_lane(ep)) {
continue;
}
rsc_idx = ucp_ep_get_rsc_index(ep, lane);
if (rsc_idx == UCP_NULL_RESOURCE) {
continue;
}
tl_bitmap |= UCS_BIT(rsc_idx);
}
return tl_bitmap;
}
void ucp_ep_invoke_err_cb(ucp_ep_h ep, ucs_status_t status)
{
/* Do not invoke error handler if it's not enabled */
if ((ucp_ep_config(ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) ||
/* error callback is not set */
(ucp_ep_ext_gen(ep)->err_cb == NULL) ||
/* the EP has been closed by user */
(ep->flags & UCP_EP_FLAG_CLOSED)) {
return;
}
ucs_assert(ep->flags & UCP_EP_FLAG_USED);
ucs_debug("ep %p: calling user error callback %p with arg %p and status %s",
ep, ucp_ep_ext_gen(ep)->err_cb, ucp_ep_ext_gen(ep)->user_data,
ucs_status_string(status));
ucp_ep_ext_gen(ep)->err_cb(ucp_ep_ext_gen(ep)->user_data, ep, status);
}
int ucp_ep_config_test_rndv_support(const ucp_ep_config_t *config)
{
return (config->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) ||
(config->key.cm_lane != UCP_NULL_LANE);
}