/**
* Copyright (C) Mellanox Technologies Ltd. 2019. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "wireup_cm.h"
#include <ucp/core/ucp_listener.h>
#include <ucp/core/ucp_request.inl>
#include <ucp/wireup/wireup.h>
#include <ucp/wireup/wireup_ep.h>
#include <ucs/sys/sock.h>
#include <ucs/sys/string.h>
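
/*
 * Return the EP initialization flags which are relevant for CM-based wireup:
 * a client EP is created with a destination socket address, a server EP with
 * a connection request. For example, a client EP would be created with
 * parameters of the following form (illustrative only):
 *
 *     ucp_ep_params_t params = {
 *         .field_mask = UCP_EP_PARAM_FIELD_SOCK_ADDR,
 *         .sockaddr   = { .addr    = (const struct sockaddr*)&saddr,
 *                         .addrlen = sizeof(saddr) }
 *     };
 */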
unsigned
ucp_cm_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params)
{
if (!ucp_worker_sockaddr_is_cm_proto(worker)) {
return 0;
}
if (params->field_mask & UCP_EP_PARAM_FIELD_SOCK_ADDR) {
return UCP_EP_INIT_CM_WIREUP_CLIENT;
}
if (params->field_mask & UCP_EP_PARAM_FIELD_CONN_REQUEST) {
return UCP_EP_INIT_CM_WIREUP_SERVER;
}
return 0;
}
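
/*
 * Build the initial client-side EP configuration for the given device: pack a
 * dummy local address restricted to that device and select lanes against it.
 */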
static ucs_status_t
ucp_cm_ep_client_initial_config_get(ucp_ep_h ucp_ep, const char *dev_name,
ucp_ep_config_key_t *key)
{
ucp_worker_h worker = ucp_ep->worker;
uint64_t addr_pack_flags = UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR |
UCP_ADDRESS_PACK_FLAG_IFACE_ADDR;
ucp_wireup_ep_t *wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep);
uint64_t tl_bitmap = ucp_context_dev_tl_bitmap(worker->context,
dev_name);
void *ucp_addr;
size_t ucp_addr_size;
ucp_unpacked_address_t unpacked_addr;
unsigned addr_indices[UCP_MAX_RESOURCES];
ucs_status_t status;
ucs_assert_always(wireup_ep != NULL);
    /* Construct a local dummy address for lane selection, under the
     * assumption that the server has the transports which are best from the
     * client's perspective. */
status = ucp_address_pack(worker, NULL, tl_bitmap, addr_pack_flags, NULL,
&ucp_addr_size, &ucp_addr);
if (status != UCS_OK) {
goto out;
}
status = ucp_address_unpack(worker, ucp_addr, addr_pack_flags,
&unpacked_addr);
if (status != UCS_OK) {
goto free_ucp_addr;
}
ucs_assert(unpacked_addr.address_count <= UCP_MAX_RESOURCES);
ucp_ep_config_key_reset(key);
ucp_ep_config_key_set_err_mode(key, wireup_ep->ep_init_flags);
status = ucp_wireup_select_lanes(ucp_ep, wireup_ep->ep_init_flags,
tl_bitmap, &unpacked_addr, addr_indices,
key);
ucs_free(unpacked_addr.address_list);
free_ucp_addr:
ucs_free(ucp_addr);
out:
return status;
}
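
/*
 * Fill the sockaddr private data header and append the packed UCP address
 * right after it:
 *
 *   +----------------------------+--------------------------------+
 *   | ucp_wireup_sockaddr_data_t | packed UCP address (addr_size) |
 *   +----------------------------+--------------------------------+
 */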
static void ucp_cm_priv_data_pack(ucp_wireup_sockaddr_data_t *sa_data,
ucp_ep_h ep, ucp_rsc_index_t dev_index,
const ucp_address_t *addr, size_t addr_size)
{
ucs_assert((int)ucp_ep_config(ep)->key.err_mode <= UINT8_MAX);
ucs_assert(dev_index != UCP_NULL_RESOURCE);
sa_data->ep_ptr = (uintptr_t)ep;
sa_data->err_mode = ucp_ep_config(ep)->key.err_mode;
sa_data->addr_mode = UCP_WIREUP_SA_DATA_CM_ADDR;
sa_data->dev_index = dev_index;
memcpy(sa_data + 1, addr, addr_size);
}
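
/*
 * Client-side private data pack callback, invoked by the CM for the device it
 * is going to connect with. It reconfigures the EP for the TLs available on
 * that device, creates wireup endpoints for the p2p lanes, and packs the
 * sockaddr data header followed by the local UCP address into priv_data.
 * Returns the packed size on success, or an error status.
 */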
static ssize_t ucp_cm_client_priv_pack_cb(void *arg, const char *dev_name,
void *priv_data)
{
ucp_wireup_sockaddr_data_t *sa_data = priv_data;
ucp_ep_h ep = arg;
ucp_worker_h worker = ep->worker;
uct_cm_h cm = worker->cms[/*cm_idx = */ 0].cm;
ucp_rsc_index_t dev_index = UCP_NULL_RESOURCE;
ucp_ep_config_key_t key;
uint64_t tl_bitmap;
uct_ep_h tl_ep;
ucp_wireup_ep_t *cm_wireup_ep;
uct_cm_attr_t cm_attr;
uct_ep_params_t tl_ep_params;
void* ucp_addr;
size_t ucp_addr_size;
ucs_status_t status;
ucp_lane_index_t lane_idx;
ucp_rsc_index_t rsc_idx;
UCS_ASYNC_BLOCK(&worker->async);
status = ucp_cm_ep_client_initial_config_get(ep, dev_name, &key);
if (status != UCS_OK) {
goto out;
}
    /* At this point the ep has only the CM lane */
ucs_assert((ucp_ep_num_lanes(ep) == 1) &&
(ucp_ep_get_cm_lane(ep) != UCP_NULL_LANE));
    /* Detach it before the reconfiguration and restore it afterwards */
cm_wireup_ep = ucp_ep_get_cm_wireup_ep(ep);
ucs_assert(cm_wireup_ep != NULL);
status = ucp_worker_get_ep_config(worker, &key, 0, &ep->cfg_index);
if (status != UCS_OK) {
goto out;
}
ep->am_lane = key.am_lane;
cm_attr.field_mask = UCT_CM_ATTR_FIELD_MAX_CONN_PRIV;
status = uct_cm_query(cm, &cm_attr);
if (status != UCS_OK) {
goto out;
}
tl_bitmap = 0;
for (lane_idx = 0; lane_idx < ucp_ep_num_lanes(ep); ++lane_idx) {
if (lane_idx == ucp_ep_get_cm_lane(ep)) {
ep->uct_eps[lane_idx] = &cm_wireup_ep->super.super;
continue;
}
rsc_idx = ucp_ep_get_rsc_index(ep, lane_idx);
if (rsc_idx == UCP_NULL_RESOURCE) {
continue;
}
status = ucp_wireup_ep_create(ep, &ep->uct_eps[lane_idx]);
if (status != UCS_OK) {
goto out;
}
ucs_assert((dev_index == UCP_NULL_RESOURCE) ||
(dev_index == worker->context->tl_rscs[rsc_idx].dev_index));
dev_index = worker->context->tl_rscs[rsc_idx].dev_index;
tl_bitmap |= UCS_BIT(rsc_idx);
if (ucp_worker_is_tl_p2p(worker, rsc_idx)) {
tl_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
tl_ep_params.iface = ucp_worker_iface(worker, rsc_idx)->iface;
status = uct_ep_create(&tl_ep_params, &tl_ep);
if (status != UCS_OK) {
/* coverity[leaked_storage] */
goto out;
}
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane_idx], tl_ep);
} else {
ucs_assert(ucp_worker_iface_get_attr(worker, rsc_idx)->cap.flags &
UCT_IFACE_FLAG_CONNECT_TO_IFACE);
}
}
/* Make sure that CM lane is restored */
ucs_assert(cm_wireup_ep == ucp_ep_get_cm_wireup_ep(ep));
    /* Don't pack the device address in order to reduce the address size; it
     * will be delivered by uct_listener_conn_request_callback_t in
     * uct_cm_remote_data_t */
status = ucp_address_pack(worker, ep, tl_bitmap,
UCP_ADDRESS_PACK_FLAG_IFACE_ADDR |
UCP_ADDRESS_PACK_FLAG_EP_ADDR,
NULL, &ucp_addr_size, &ucp_addr);
if (status != UCS_OK) {
goto out;
}
if (cm_attr.max_conn_priv < (sizeof(*sa_data) + ucp_addr_size)) {
ucs_error("CM private data buffer is to small to pack UCP endpoint info, "
"ep %p service data %lu, address length %lu, cm %p max_conn_priv %lu",
ep, sizeof(*sa_data), ucp_addr_size, cm,
cm_attr.max_conn_priv);
status = UCS_ERR_BUFFER_TOO_SMALL;
goto free_addr;
}
ucp_cm_priv_data_pack(sa_data, ep, dev_index, ucp_addr, ucp_addr_size);
free_addr:
ucs_free(ucp_addr);
out:
if (status == UCS_OK) {
ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
} else {
ucp_worker_set_ep_failed(worker, ep,
&ucp_ep_get_cm_wireup_ep(ep)->super.super,
ucp_ep_get_cm_lane(ep), status);
}
UCS_ASYNC_UNBLOCK(&worker->async);
/* coverity[leaked_storage] */
return (status == UCS_OK) ? (sizeof(*sa_data) + ucp_addr_size) : status;
}
/*
 * The main thread progress part of connection establishment on the client side
 */
static unsigned ucp_cm_client_connect_progress(void *arg)
{
ucp_cm_client_connect_progress_arg_t *progress_arg = arg;
ucp_ep_h ucp_ep = progress_arg->ucp_ep;
ucp_worker_h worker = ucp_ep->worker;
ucp_context_h context = worker->context;
ucp_wireup_ep_t *wireup_ep;
ucp_unpacked_address_t addr;
uint64_t tl_bitmap;
ucp_rsc_index_t dev_index;
ucp_rsc_index_t rsc_index;
unsigned addr_idx;
unsigned addr_indices[UCP_MAX_RESOURCES];
ucs_status_t status;
wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep);
ucs_assert(wireup_ep != NULL);
status = ucp_address_unpack(worker, progress_arg->sa_data + 1,
UCP_ADDRESS_PACK_FLAG_IFACE_ADDR |
UCP_ADDRESS_PACK_FLAG_EP_ADDR, &addr);
if (status != UCS_OK) {
goto out;
}
if (addr.address_count == 0) {
status = UCS_ERR_UNREACHABLE;
goto out_free_addr;
}
for (addr_idx = 0; addr_idx < addr.address_count; ++addr_idx) {
addr.address_list[addr_idx].dev_addr = progress_arg->dev_addr;
addr.address_list[addr_idx].dev_index = progress_arg->sa_data->dev_index;
}
UCS_ASYNC_BLOCK(&worker->async);
ucp_ep_update_dest_ep_ptr(ucp_ep, progress_arg->sa_data->ep_ptr);
ucs_assert(addr.address_count <= UCP_MAX_RESOURCES);
ucs_assert(wireup_ep->ep_init_flags & UCP_EP_INIT_CM_WIREUP_CLIENT);
    /* Extend tl_bitmap to all TLs on the same device as the initial
     * configuration, since the TL can be changed by the server-side
     * configuration */
tl_bitmap = ucp_ep_get_tl_bitmap(ucp_ep);
ucs_assert(tl_bitmap != 0);
rsc_index = ucs_ffs64(tl_bitmap);
dev_index = context->tl_rscs[rsc_index].dev_index;
#if ENABLE_ASSERT
ucs_for_each_bit(rsc_index, tl_bitmap) {
ucs_assert(dev_index == context->tl_rscs[rsc_index].dev_index);
}
#endif
tl_bitmap = ucp_context_dev_idx_tl_bitmap(context, dev_index);
status = ucp_wireup_init_lanes(ucp_ep, wireup_ep->ep_init_flags,
tl_bitmap, &addr, addr_indices);
if (status != UCS_OK) {
goto out_unblock;
}
status = ucp_wireup_connect_local(ucp_ep, &addr, NULL);
if (status != UCS_OK) {
goto out_unblock;
}
ucp_wireup_remote_connected(ucp_ep);
out_unblock:
UCS_ASYNC_UNBLOCK(&worker->async);
out_free_addr:
ucs_free(addr.address_list);
out:
ucs_free(progress_arg->sa_data);
ucs_free(progress_arg->dev_addr);
ucs_free(progress_arg);
if (status != UCS_OK) {
ucp_worker_set_ep_failed(worker, ucp_ep, &wireup_ep->super.super,
ucp_ep_get_cm_lane(ucp_ep), status);
}
return 1;
}
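
/*
 * Check that the remote data contains all the fields required by the UCP
 * client/server connection establishment protocol.
 */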
static ucs_status_t
ucp_cm_remote_data_check(const uct_cm_remote_data_t *remote_data)
{
if (ucs_test_all_flags(remote_data->field_mask,
UCT_CM_REMOTE_DATA_FIELD_DEV_ADDR |
UCT_CM_REMOTE_DATA_FIELD_DEV_ADDR_LENGTH |
UCT_CM_REMOTE_DATA_FIELD_CONN_PRIV_DATA |
UCT_CM_REMOTE_DATA_FIELD_CONN_PRIV_DATA_LENGTH)) {
return UCS_OK;
}
ucs_error("incompatible client server connection establishment protocol");
return UCS_ERR_UNSUPPORTED;
}
/*
 * Async callback on the client side which notifies that the server is
 * connected.
 */
static void ucp_cm_client_connect_cb(uct_ep_h uct_cm_ep, void *arg,
const uct_cm_remote_data_t *remote_data,
ucs_status_t status)
{
ucp_ep_h ucp_ep = (ucp_ep_h)arg;
ucp_worker_h worker = ucp_ep->worker;
uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL;
ucp_cm_client_connect_progress_arg_t *progress_arg;
if (status != UCS_OK) {
goto err_out;
}
status = ucp_cm_remote_data_check(remote_data);
if (status != UCS_OK) {
goto err_out;
}
progress_arg = ucs_malloc(sizeof(*progress_arg),
"ucp_cm_client_connect_progress_arg_t");
if (progress_arg == NULL) {
status = UCS_ERR_NO_MEMORY;
goto err_out;
}
progress_arg->sa_data = ucs_malloc(remote_data->conn_priv_data_length,
"sa data");
if (progress_arg->sa_data == NULL) {
status = UCS_ERR_NO_MEMORY;
goto err_free_arg;
}
progress_arg->dev_addr = ucs_malloc(remote_data->dev_addr_length,
"device address");
if (progress_arg->dev_addr == NULL) {
status = UCS_ERR_NO_MEMORY;
goto err_free_sa_data;
}
progress_arg->ucp_ep = ucp_ep;
memcpy(progress_arg->dev_addr, remote_data->dev_addr,
remote_data->dev_addr_length);
memcpy(progress_arg->sa_data, remote_data->conn_priv_data,
remote_data->conn_priv_data_length);
uct_worker_progress_register_safe(worker->uct,
ucp_cm_client_connect_progress,
progress_arg, UCS_CALLBACKQ_FLAG_ONESHOT,
&prog_id);
ucp_worker_signal_internal(ucp_ep->worker);
return;
err_free_sa_data:
ucs_free(progress_arg->sa_data);
err_free_arg:
ucs_free(progress_arg);
err_out:
ucp_ep->flags &= ~UCP_EP_FLAG_LOCAL_CONNECTED;
ucp_worker_set_ep_failed(worker, ucp_ep, uct_cm_ep,
ucp_ep_get_cm_lane(ucp_ep), status);
}
/*
* Internal flush completion callback which is a part of close protocol,
* this flush was initiated by remote peer in disconnect callback on CM lane.
*/
static void ucp_ep_cm_disconnect_flushed_cb(ucp_request_t *req)
{
ucp_ep_h ucp_ep = req->send.ep;
/* the EP can be closed/destroyed from err callback */
ucs_async_context_t *async = &ucp_ep->worker->async;
UCS_ASYNC_BLOCK(async);
ucp_ep_cm_disconnect_cm_lane(ucp_ep);
ucs_assert(!(req->flags & UCP_REQUEST_FLAG_CALLBACK));
ucp_request_put(req);
UCS_ASYNC_UNBLOCK(async);
}
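
/*
 * Main-thread handling of a remote disconnect: either complete an already
 * scheduled close request, or flush the EP and disconnect the CM lane from
 * the flush completion callback.
 */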
static unsigned ucp_ep_cm_remote_disconnect_progress(void *arg)
{
ucp_ep_h ucp_ep = arg;
void *req;
ucs_status_t status;
ucs_trace("ep %p: flags %xu cm_remote_disconnect_progress", ucp_ep,
ucp_ep->flags);
ucs_assert(ucp_ep_get_cm_uct_ep(ucp_ep) != NULL);
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED);
if (ucs_test_all_flags(ucp_ep->flags, UCP_EP_FLAG_CLOSED |
UCP_EP_FLAG_CLOSE_REQ_VALID)) {
ucp_request_complete_send(ucp_ep_ext_gen(ucp_ep)->close_req.req, UCS_OK);
return 1;
}
if (ucp_ep->flags & UCP_EP_FLAG_CLOSED) {
        /* the EP was closed via the API but the close request is not valid yet
         * (checked above); it will be set later from the scheduled
         * @ref ucp_ep_close_flushed_callback */
ucs_debug("ep %p: ep closed but request is not set, waiting for the flush callback",
ucp_ep);
return 1;
}
/*
* TODO: set the ucp_ep to error state to prevent user from sending more
* ops.
*/
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_FLUSH_STATE_VALID);
ucs_assert(!(ucp_ep->flags & UCP_EP_FLAG_CLOSED));
req = ucp_ep_flush_internal(ucp_ep, UCT_FLUSH_FLAG_LOCAL, NULL, 0, NULL,
ucp_ep_cm_disconnect_flushed_cb,
"cm_disconnected_cb");
if (req == NULL) {
        /* the flush completed successfully in place; notify the remote peer
         * that we are disconnected, the EP will be destroyed by the API call */
ucp_ep_cm_disconnect_cm_lane(ucp_ep);
} else if (UCS_PTR_IS_ERR(req)) {
status = UCS_PTR_STATUS(req);
ucs_error("ucp_ep_flush_internal completed with error: %s",
ucs_status_string(status));
goto err;
}
return 1;
err:
ucp_worker_set_ep_failed(ucp_ep->worker, ucp_ep,
ucp_ep_get_cm_uct_ep(ucp_ep),
ucp_ep_get_cm_lane(ucp_ep), status);
return 1;
}
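
/*
 * Main-thread progress callback scheduled from @ref ucp_cm_disconnect_cb:
 * invokes the user error callback and continues the disconnect flow according
 * to the local connection state.
 */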
static unsigned ucp_ep_cm_disconnect_progress(void *arg)
{
ucp_ep_h ucp_ep = arg;
uct_ep_h uct_cm_ep = ucp_ep_get_cm_uct_ep(ucp_ep);
ucs_async_context_t *async = &ucp_ep->worker->async;
ucp_request_t *close_req;
UCS_ASYNC_BLOCK(async);
ucs_trace("ep %p: got remote disconnect, cm_ep %p", ucp_ep, uct_cm_ep);
ucs_assert(ucp_ep_get_cm_uct_ep(ucp_ep) == uct_cm_ep);
ucp_ep_invoke_err_cb(ucp_ep, UCS_ERR_CONNECTION_RESET);
ucp_ep->flags &= ~UCP_EP_FLAG_REMOTE_CONNECTED;
if (ucp_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED) {
        /* if the EP is locally connected, it needs to be flushed from the main
         * thread first */
ucp_ep_cm_remote_disconnect_progress(ucp_ep);
} else {
        /* if the EP is not locally connected, it has already been flushed and
         * the CM lane is disconnected; schedule close request completion and
         * EP destruction */
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_CLOSE_REQ_VALID);
close_req = ucp_ep_ext_gen(ucp_ep)->close_req.req;
ucp_ep_local_disconnect_progress(close_req);
}
UCS_ASYNC_UNBLOCK(async);
return 1;
}
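
/*
 * Async callback invoked by the CM when the remote peer disconnects;
 * schedules @ref ucp_ep_cm_disconnect_progress on the main thread.
 */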
static void ucp_cm_disconnect_cb(uct_ep_h uct_cm_ep, void *arg)
{
ucp_ep_h ucp_ep = arg;
uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL;
ucs_debug("ep %p: CM remote disconnect callback invoked, flags 0x%x",
ucp_ep, ucp_ep->flags);
uct_worker_progress_register_safe(ucp_ep->worker->uct,
ucp_ep_cm_disconnect_progress,
ucp_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
&prog_id);
ucp_worker_signal_internal(ucp_ep->worker);
}
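
/*
 * Start client-side connection establishment: create the UCT CM endpoint
 * under the CM wireup lane and set the client-side callbacks.
 */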
ucs_status_t ucp_ep_client_cm_connect_start(ucp_ep_h ucp_ep,
const ucp_ep_params_t *params)
{
ucp_wireup_ep_t *wireup_ep = ucp_ep_get_cm_wireup_ep(ucp_ep);
ucp_worker_h worker = ucp_ep->worker;
uct_ep_h cm_ep;
uct_ep_params_t cm_lane_params;
ucs_status_t status;
wireup_ep->ep_init_flags = ucp_ep_init_flags(ucp_ep->worker, params);
cm_lane_params.field_mask = UCT_EP_PARAM_FIELD_CM |
UCT_EP_PARAM_FIELD_USER_DATA |
UCT_EP_PARAM_FIELD_SOCKADDR |
UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS |
UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB |
UCT_EP_PARAM_FIELD_SOCKADDR_CONNECT_CB |
UCT_EP_PARAM_FIELD_SOCKADDR_DISCONNECT_CB;
cm_lane_params.user_data = ucp_ep;
    cm_lane_params.sockaddr = &params->sockaddr;
cm_lane_params.sockaddr_cb_flags = UCT_CB_FLAG_ASYNC;
cm_lane_params.sockaddr_pack_cb = ucp_cm_client_priv_pack_cb;
cm_lane_params.sockaddr_connect_cb.client = ucp_cm_client_connect_cb;
cm_lane_params.disconnect_cb = ucp_cm_disconnect_cb;
ucs_assert_always(ucp_worker_num_cm_cmpts(worker) == 1);
cm_lane_params.cm = worker->cms[0].cm;
status = uct_ep_create(&cm_lane_params, &cm_ep);
if (status != UCS_OK) {
/* coverity[leaked_storage] */
return status;
}
ucp_wireup_ep_set_next_ep(&wireup_ep->super.super, cm_ep);
ucp_ep_flush_state_reset(ucp_ep);
return UCS_OK;
}
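
/*
 * Main-thread handling of a connection request on the server side: pass it to
 * the user listener callback if one is set, otherwise accept it by creating a
 * server-side endpoint.
 */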
static unsigned ucp_cm_server_conn_request_progress(void *arg)
{
ucp_conn_request_h conn_request = arg;
ucp_listener_h listener = conn_request->listener;
ucp_worker_h worker = listener->worker;
ucp_ep_h ep;
ucs_status_t status;
ucs_trace_func("listener %p, connect request %p", listener, conn_request);
if (listener->conn_cb) {
listener->conn_cb(conn_request, listener->arg);
return 1;
}
UCS_ASYNC_BLOCK(&worker->async);
status = ucp_ep_create_server_accept(worker, conn_request, &ep);
if (status != UCS_OK) {
ucs_warn("server endpoint creation with connect request %p failed, status %s",
conn_request, ucs_status_string(status));
}
UCS_ASYNC_UNBLOCK(&worker->async);
ucs_free(conn_request->remote_dev_addr);
ucs_free(conn_request);
return 1;
}
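
/*
 * Async callback invoked by the UCT listener for each incoming connection
 * request: copy the remote data, schedule main-thread processing, and reject
 * the request on failure.
 */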
void ucp_cm_server_conn_request_cb(uct_listener_h listener, void *arg,
const char *local_dev_name,
uct_conn_request_h conn_request,
const uct_cm_remote_data_t *remote_data)
{
ucp_listener_h ucp_listener = arg;
uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL;
ucp_conn_request_h ucp_conn_request;
ucs_status_t status;
status = ucp_cm_remote_data_check(remote_data);
if (status != UCS_OK) {
goto err_reject;
}
ucp_conn_request = ucs_malloc(ucs_offsetof(ucp_conn_request_t, sa_data) +
remote_data->conn_priv_data_length,
"ucp_conn_request_h");
if (ucp_conn_request == NULL) {
ucs_error("failed to allocate connect request, rejecting connection request %p on TL listener %p",
conn_request, listener);
goto err_reject;
}
ucp_conn_request->remote_dev_addr = ucs_malloc(remote_data->dev_addr_length,
"remote device address");
if (ucp_conn_request->remote_dev_addr == NULL) {
ucs_error("failed to allocate device address, rejecting connection request %p on TL listener %p",
conn_request, listener);
goto err_free_ucp_conn_request;
}
ucp_conn_request->listener = ucp_listener;
ucp_conn_request->uct.listener = listener;
ucp_conn_request->uct_req = conn_request;
ucs_strncpy_safe(ucp_conn_request->dev_name, local_dev_name,
UCT_DEVICE_NAME_MAX);
memcpy(ucp_conn_request->remote_dev_addr, remote_data->dev_addr,
remote_data->dev_addr_length);
memcpy(&ucp_conn_request->sa_data, remote_data->conn_priv_data,
remote_data->conn_priv_data_length);
uct_worker_progress_register_safe(ucp_listener->worker->uct,
ucp_cm_server_conn_request_progress,
ucp_conn_request,
UCS_CALLBACKQ_FLAG_ONESHOT, &prog_id);
    /* If the worker supports the UCP_FEATURE_WAKEUP feature, signal the user
     * so that it can wake up on this event */
ucp_worker_signal_internal(ucp_listener->worker);
return;
err_free_ucp_conn_request:
ucs_free(ucp_conn_request);
err_reject:
status = uct_listener_reject(listener, conn_request);
if (status != UCS_OK) {
ucs_warn("failed to reject connect request %p on listener %p",
conn_request, listener);
}
}
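
/*
 * Create a server-side endpoint which is connected to the remote worker
 * address from the connection request, and accept the request on its CM lane.
 */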
ucs_status_t
ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags,
const ucp_unpacked_address_t *remote_addr,
ucp_conn_request_h conn_request,
ucp_ep_h *ep_p)
{
uint64_t tl_bitmap = ucp_context_dev_tl_bitmap(worker->context,
conn_request->dev_name);
ucp_ep_h ep;
ucs_status_t status;
/* Create and connect TL part */
status = ucp_ep_create_to_worker_addr(worker, tl_bitmap, remote_addr,
ep_init_flags,
"conn_request on uct_listener", &ep);
if (status != UCS_OK) {
return status;
}
status = ucp_wireup_connect_local(ep, remote_addr, NULL);
if (status != UCS_OK) {
return status;
}
status = ucp_ep_cm_connect_server_lane(ep, conn_request);
if (status != UCS_OK) {
return status;
}
ucp_ep_update_dest_ep_ptr(ep, conn_request->sa_data.ep_ptr);
ucp_listener_schedule_accept_cb(ep);
*ep_p = ep;
return UCS_OK;
}
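
/*
 * Server-side private data pack callback: pack the sockaddr data header and
 * the local UCP address of the already configured lanes into priv_data.
 */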
static ssize_t ucp_cm_server_priv_pack_cb(void *arg, const char *dev_name,
void *priv_data)
{
ucp_wireup_sockaddr_data_t *sa_data = priv_data;
ucp_ep_h ep = arg;
ucp_worker_h worker = ep->worker;
uint64_t tl_bitmap;
uct_cm_attr_t cm_attr;
void* ucp_addr;
size_t ucp_addr_size;
ucp_rsc_index_t rsc_index;
ucp_rsc_index_t dev_index;
ucs_status_t status;
UCS_ASYNC_BLOCK(&worker->async);
tl_bitmap = ucp_ep_get_tl_bitmap(ep);
    /* make sure that all lanes are created on the correct device */
ucs_assert(!(tl_bitmap & ~ucp_context_dev_tl_bitmap(worker->context,
dev_name)));
status = ucp_address_pack(worker, ep, tl_bitmap,
UCP_ADDRESS_PACK_FLAG_IFACE_ADDR |
UCP_ADDRESS_PACK_FLAG_EP_ADDR, NULL,
&ucp_addr_size, &ucp_addr);
if (status != UCS_OK) {
goto out;
}
cm_attr.field_mask = UCT_CM_ATTR_FIELD_MAX_CONN_PRIV;
ucs_assert(ucp_worker_num_cm_cmpts(worker) == 1);
status = uct_cm_query(worker->cms[0].cm, &cm_attr);
if (status != UCS_OK) {
goto out;
}
if (cm_attr.max_conn_priv < (sizeof(*sa_data) + ucp_addr_size)) {
status = UCS_ERR_BUFFER_TOO_SMALL;
goto free_addr;
}
rsc_index = ucs_ffs64_safe(tl_bitmap);
ucs_assert(rsc_index != UCP_NULL_RESOURCE);
dev_index = worker->context->tl_rscs[rsc_index].dev_index;
ucp_cm_priv_data_pack(sa_data, ep, dev_index, ucp_addr, ucp_addr_size);
free_addr:
ucs_free(ucp_addr);
out:
if (status == UCS_OK) {
ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
} else {
ucp_worker_set_ep_failed(worker, ep,
&ucp_ep_get_cm_wireup_ep(ep)->super.super,
ucp_ep_get_cm_lane(ep), status);
}
UCS_ASYNC_UNBLOCK(&worker->async);
return (status == UCS_OK) ? (sizeof(*sa_data) + ucp_addr_size) : status;
}
/*
 * The main thread progress part of connection establishment on the server side
 */
static unsigned ucp_cm_server_connect_progress(void *arg)
{
ucp_ep_h ucp_ep = arg;
UCS_ASYNC_BLOCK(&ucp_ep->worker->async);
ucp_wireup_remote_connected(ucp_ep);
UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async);
return 1;
}
/*
 * Async callback on the server side which notifies that the client is
 * connected.
 */
static void ucp_cm_server_connect_cb(uct_ep_h ep, void *arg,
ucs_status_t status)
{
ucp_ep_h ucp_ep = arg;
uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL;
ucp_lane_index_t cm_lane;
if (status == UCS_OK) {
uct_worker_progress_register_safe(ucp_ep->worker->uct,
ucp_cm_server_connect_progress,
ucp_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
&prog_id);
ucp_worker_signal_internal(ucp_ep->worker);
} else if (status == UCS_ERR_CONNECTION_RESET) {
        /* the remote side initiated a disconnect before the local side
         * completed connection establishment, so:
         * 1) establish the connection to complete any pending requests on the
         *    wireup lane
         * 2) handle the disconnect the same way as the close protocol
         * 3) TODO: remove (1) once the EP can be moved to an error state which
         *    blocks new send operations but still allows flushing the
         *    transport lanes */
uct_worker_progress_register_safe(ucp_ep->worker->uct,
ucp_cm_server_connect_progress,
ucp_ep, UCS_CALLBACKQ_FLAG_ONESHOT,
&prog_id);
ucp_cm_disconnect_cb(ep, ucp_ep);
} else {
        /* if a reject arrives on the server side, then UCT is doing something
         * wrong */
ucs_assert(status != UCS_ERR_REJECTED);
cm_lane = ucp_ep_get_cm_lane(ucp_ep);
ucp_ep->flags &= ~UCP_EP_FLAG_LOCAL_CONNECTED;
ucp_worker_set_ep_failed(ucp_ep->worker, ucp_ep,
ucp_ep->uct_eps[cm_lane], cm_lane, status);
}
}
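
/*
 * Create the CM lane on a server-side endpoint and accept the pending
 * connection request on it.
 */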
ucs_status_t ucp_ep_cm_connect_server_lane(ucp_ep_h ep,
ucp_conn_request_h conn_request)
{
ucp_worker_h worker = ep->worker;
ucp_lane_index_t lane = ucp_ep_get_cm_lane(ep);
uct_ep_params_t uct_ep_params;
uct_ep_h uct_ep;
ucs_status_t status;
ucs_assert(lane != UCP_NULL_LANE);
ucs_assert(ep->uct_eps[lane] == NULL);
/* TODO: split CM and wireup lanes */
status = ucp_wireup_ep_create(ep, &ep->uct_eps[lane]);
if (status != UCS_OK) {
return status;
}
/* create a server side CM endpoint */
ucs_trace("ep %p: uct_ep[%d]", ep, lane);
uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_CM |
UCT_EP_PARAM_FIELD_CONN_REQUEST |
UCT_EP_PARAM_FIELD_USER_DATA |
UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS |
UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB |
UCT_EP_PARAM_FIELD_SOCKADDR_CONNECT_CB |
UCT_EP_PARAM_FIELD_SOCKADDR_DISCONNECT_CB;
ucs_assertv_always(ucp_worker_num_cm_cmpts(worker) == 1,
"multiple CMs are not supported");
uct_ep_params.cm = worker->cms[0].cm;
uct_ep_params.user_data = ep;
uct_ep_params.conn_request = conn_request->uct_req;
uct_ep_params.sockaddr_cb_flags = UCT_CB_FLAG_ASYNC;
uct_ep_params.sockaddr_pack_cb = ucp_cm_server_priv_pack_cb;
uct_ep_params.sockaddr_connect_cb.server = ucp_cm_server_connect_cb;
uct_ep_params.disconnect_cb = ucp_cm_disconnect_cb;
status = uct_ep_create(&uct_ep_params, &uct_ep);
if (status != UCS_OK) {
/* coverity[leaked_storage] */
return status;
}
ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
return UCS_OK;
}
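
/*
 * Initiate a local disconnect on the CM lane; this triggers
 * @ref ucp_cm_disconnect_cb on the remote side.
 */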
void ucp_ep_cm_disconnect_cm_lane(ucp_ep_h ucp_ep)
{
uct_ep_h uct_cm_ep = ucp_ep_get_cm_uct_ep(ucp_ep);
ucs_status_t status;
ucs_assert_always(uct_cm_ep != NULL);
    /* No reason to try to disconnect twice */
ucs_assert(ucp_ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED);
ucp_ep->flags &= ~UCP_EP_FLAG_LOCAL_CONNECTED;
/* this will invoke @ref ucp_cm_disconnect_cb on remote side */
status = uct_ep_disconnect(uct_cm_ep, 0);
if (status != UCS_OK) {
ucs_warn("failed to disconnect CM lane %p of ep %p, %s", ucp_ep,
uct_cm_ep, ucs_status_string(status));
}
}
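
/*
 * Allocate and initialize the request which tracks the flush performed as
 * part of the CM close protocol.
 */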
ucp_request_t* ucp_ep_cm_close_request_get(ucp_ep_h ep)
{
ucp_request_t *request = ucp_request_get(ep->worker);
if (request == NULL) {
ucs_error("failed to allocate close request for ep %p", ep);
return NULL;
}
request->status = UCS_OK;
request->flags = 0;
request->send.ep = ep;
request->send.flush.uct_flags = UCT_FLUSH_FLAG_LOCAL;
return request;
}
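
/*
 * Callback queue filter which matches CM progress callbacks scheduled for the
 * given endpoint.
 */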
static int ucp_cm_cbs_remove_filter(const ucs_callbackq_elem_t *elem, void *arg)
{
if ((elem->cb == ucp_ep_cm_disconnect_progress) ||
(elem->cb == ucp_ep_cm_remote_disconnect_progress) ||
(elem->cb == ucp_cm_client_connect_progress) ||
(elem->cb == ucp_cm_server_connect_progress)) {
return arg == elem->arg;
} else {
return 0;
}
}
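
/*
 * Remove all pending CM progress callbacks of the endpoint from the worker's
 * progress queue.
 */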
void ucp_ep_cm_slow_cbq_cleanup(ucp_ep_h ep)
{
ucs_callbackq_remove_if(&ep->worker->uct->progress_q,
ucp_cm_cbs_remove_filter, ep);
}