/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "wireup_ep.h"
#include "wireup.h"
#include <ucp/core/ucp_request.h>
#include <ucp/core/ucp_worker.h>
#include <ucs/arch/atomic.h>
#include <ucs/datastruct/queue.h>
#include <ucs/type/class.h>
#include <ucs/sys/sock.h>
#include <ucs/sys/stubs.h>
#include <ucp/core/ucp_ep.inl>
#include <ucp/core/ucp_request.inl>
UCS_CLASS_DECLARE(ucp_wireup_ep_t, ucp_ep_h);
static UCS_CLASS_DEFINE_DELETE_FUNC(ucp_wireup_ep_t, uct_ep_t);
static inline ucs_queue_elem_t* ucp_wireup_ep_req_priv(uct_pending_req_t *req)
{
UCS_STATIC_ASSERT(sizeof(ucs_queue_elem_t) <= UCT_PENDING_REQ_PRIV_LEN);
return (ucs_queue_elem_t*)req->priv;
}
static ucs_status_t
ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
wireup_ep->flags |= UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED;
return uct_ep_connect_to_ep(wireup_ep->super.uct_ep, dev_addr, ep_addr);
}
/*
* We switch the endpoint in this function (instead in wireup code) since
* this is guaranteed to run from the main thread.
*/
static unsigned ucp_wireup_ep_progress(void *arg)
{
ucp_wireup_ep_t *wireup_ep = arg;
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucs_queue_head_t tmp_pending_queue;
uct_pending_req_t *uct_req;
ucp_request_t *req;
UCS_ASYNC_BLOCK(&ucp_ep->worker->async);
ucs_assert(wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY);
ucs_assert(wireup_ep->super.uct_ep != NULL);
/* If we still have pending wireup messages, send them out first */
if (wireup_ep->pending_count != 0) {
goto out_unblock;
}
/* If an error happened on the endpoint (but perhaps the deferred error handler,
* ucp_worker_iface_err_handle_progress(), was not called yet, avoid changing
* ep state, and let the error handler take care of cleanup.
*/
if (ucp_ep->flags & UCP_EP_FLAG_FAILED) {
ucs_trace("ep %p: not switching wireup_ep %p to ready state because of error",
ucp_ep, wireup_ep);
goto out_unblock;
}
ucs_trace("ep %p: switching wireup_ep %p to ready state", ucp_ep, wireup_ep);
/* Move wireup pending queue to temporary queue and remove references to
* the wireup progress function
*/
ucs_queue_head_init(&tmp_pending_queue);
ucs_queue_for_each_extract(uct_req, &wireup_ep->pending_q, priv, 1) {
ucs_queue_push(&tmp_pending_queue, ucp_wireup_ep_req_priv(uct_req));
}
/* Switch to real transport and destroy proxy endpoint (aux_ep as well) */
ucp_proxy_ep_replace(&wireup_ep->super);
wireup_ep = NULL;
UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async);
/* Replay pending requests */
ucs_queue_for_each_extract(uct_req, &tmp_pending_queue, priv, 1) {
req = ucs_container_of(uct_req, ucp_request_t, send.uct);
ucs_assert(req->send.ep == ucp_ep);
ucp_request_send(req, 0);
--ucp_ep->worker->flush_ops_count;
}
return 0;
out_unblock:
UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async);
return 0;
}
static ssize_t ucp_wireup_ep_bcopy_send_func(uct_ep_h uct_ep)
{
return UCS_ERR_NO_RESOURCE;
}
static uct_ep_h ucp_wireup_ep_get_msg_ep(ucp_wireup_ep_t *wireup_ep)
{
uct_ep_h wireup_msg_ep;
if ((wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) || (wireup_ep->aux_ep == NULL)) {
wireup_msg_ep = wireup_ep->super.uct_ep;
} else {
wireup_msg_ep = wireup_ep->aux_ep;
}
ucs_assertv(wireup_msg_ep != NULL,
"ucp_ep=%p wireup_ep=%p flags=%c%c next_ep=%p aux_ep=%p",
wireup_ep->super.ucp_ep, wireup_ep,
(wireup_ep->flags & UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED) ? 'c' : '-',
(wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) ? 'r' : '-',
wireup_ep->super.uct_ep, wireup_ep->aux_ep);
return wireup_msg_ep;
}
ucs_status_t ucp_wireup_ep_progress_pending(uct_pending_req_t *self)
{
ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct);
uct_pending_req_t *req = proxy_req->send.proxy.req;
ucp_wireup_ep_t *wireup_ep = proxy_req->send.proxy.wireup_ep;
ucs_status_t status;
status = req->func(req);
if (status == UCS_OK) {
ucs_atomic_sub32(&wireup_ep->pending_count, 1);
ucs_free(proxy_req);
}
return status;
}
static void
ucp_wireup_ep_pending_req_release(uct_pending_req_t *self, void *arg)
{
ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t,
send.uct);
ucp_wireup_ep_t *wireup_ep = proxy_req->send.proxy.wireup_ep;
ucp_request_t *req;
ucs_atomic_sub32(&wireup_ep->pending_count, 1);
if (proxy_req->send.proxy.req->func == ucp_wireup_msg_progress) {
req = ucs_container_of(proxy_req->send.proxy.req, ucp_request_t,
send.uct);
ucs_free((void*)req->send.buffer);
ucs_free(req);
}
ucs_free(proxy_req);
}
static ucs_status_t ucp_wireup_ep_pending_add(uct_ep_h uct_ep,
uct_pending_req_t *req,
unsigned flags)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_worker_h worker = ucp_ep->worker;
ucp_request_t *proxy_req;
uct_ep_h wireup_msg_ep;
ucs_status_t status;
UCS_ASYNC_BLOCK(&worker->async);
if (req->func == ucp_wireup_msg_progress) {
proxy_req = ucs_malloc(sizeof(*proxy_req), "ucp_wireup_proxy_req");
if (proxy_req == NULL) {
status = UCS_ERR_NO_MEMORY;
goto out;
}
wireup_msg_ep = ucp_wireup_ep_get_msg_ep(wireup_ep);
proxy_req->send.uct.func = ucp_wireup_ep_progress_pending;
proxy_req->send.proxy.req = req;
proxy_req->send.proxy.wireup_ep = wireup_ep;
proxy_req->send.state.uct_comp.func = NULL;
status = uct_ep_pending_add(wireup_msg_ep, &proxy_req->send.uct,
UCT_CB_FLAG_ASYNC);
if (status == UCS_OK) {
ucs_atomic_add32(&wireup_ep->pending_count, +1);
} else {
ucs_free(proxy_req);
}
} else {
ucs_queue_push(&wireup_ep->pending_q, ucp_wireup_ep_req_priv(req));
++ucp_ep->worker->flush_ops_count;
status = UCS_OK;
}
out:
UCS_ASYNC_UNBLOCK(&worker->async);
/* coverity[leaked_storage] */
return status;
}
static void
ucp_wireup_ep_pending_purge(uct_ep_h uct_ep, uct_pending_purge_callback_t cb,
void *arg)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucp_worker_h worker;
uct_pending_req_t *req;
ucp_request_t *ucp_req;
worker = wireup_ep->super.ucp_ep->worker;
ucs_queue_for_each_extract(req, &wireup_ep->pending_q, priv, 1) {
ucp_req = ucs_container_of(req, ucp_request_t, send.uct);
UCS_ASYNC_BLOCK(&worker->async);
--worker->flush_ops_count;
UCS_ASYNC_UNBLOCK(&worker->async);
cb(&ucp_req->send.uct, arg);
}
if (wireup_ep->pending_count > 0) {
uct_ep_pending_purge(ucp_wireup_ep_get_msg_ep(wireup_ep),
ucp_wireup_ep_pending_req_release, arg);
}
ucs_assert(wireup_ep->pending_count == 0);
}
static ssize_t ucp_wireup_ep_am_bcopy(uct_ep_h uct_ep, uint8_t id,
uct_pack_callback_t pack_cb, void *arg,
unsigned flags)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
if (id == UCP_AM_ID_WIREUP) {
return uct_ep_am_bcopy(ucp_wireup_ep_get_msg_ep(wireup_ep),
UCP_AM_ID_WIREUP, pack_cb, arg, flags);
}
return UCS_ERR_NO_RESOURCE;
}
UCS_CLASS_DEFINE_NAMED_NEW_FUNC(ucp_wireup_ep_create, ucp_wireup_ep_t, uct_ep_t,
ucp_ep_h);
ucs_status_t
ucp_wireup_ep_connect_aux(ucp_wireup_ep_t *wireup_ep, unsigned ep_init_flags,
const ucp_unpacked_address_t *remote_address)
{
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_worker_h worker = ucp_ep->worker;
ucp_wireup_select_info_t select_info = {0};
uct_ep_params_t uct_ep_params;
const ucp_address_entry_t *aux_addr;
ucp_worker_iface_t *wiface;
ucs_status_t status;
/* select an auxiliary transport which would be used to pass connection
* establishment messages.
*/
status = ucp_wireup_select_aux_transport(ucp_ep, ep_init_flags,
remote_address, &select_info);
if (status != UCS_OK) {
return status;
}
wireup_ep->aux_rsc_index = select_info.rsc_index;
aux_addr = &remote_address->address_list[select_info.addr_index];
wiface = ucp_worker_iface(worker, select_info.rsc_index);
/* create auxiliary endpoint connected to the remote iface. */
uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE |
UCT_EP_PARAM_FIELD_DEV_ADDR |
UCT_EP_PARAM_FIELD_IFACE_ADDR;
uct_ep_params.iface = wiface->iface;
uct_ep_params.dev_addr = aux_addr->dev_addr;
uct_ep_params.iface_addr = aux_addr->iface_addr;
status = uct_ep_create(&uct_ep_params, &wireup_ep->aux_ep);
if (status != UCS_OK) {
return status;
}
ucp_worker_iface_progress_ep(wiface);
ucs_debug("ep %p: wireup_ep %p created aux_ep %p to %s using "
UCT_TL_RESOURCE_DESC_FMT, ucp_ep, wireup_ep, wireup_ep->aux_ep,
ucp_ep_peer_name(ucp_ep),
UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[select_info.rsc_index].tl_rsc));
return UCS_OK;
}
static ucs_status_t ucp_wireup_ep_flush(uct_ep_h uct_ep, unsigned flags,
uct_completion_t *comp)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
if (flags & UCT_FLUSH_FLAG_CANCEL) {
if (wireup_ep->aux_ep) {
return uct_ep_flush(wireup_ep->aux_ep, flags, comp);
}
return UCS_OK;
}
return UCS_ERR_NO_RESOURCE;
}
UCS_CLASS_INIT_FUNC(ucp_wireup_ep_t, ucp_ep_h ucp_ep)
{
static uct_iface_ops_t ops = {
.ep_connect_to_ep = ucp_wireup_ep_connect_to_ep,
.ep_flush = ucp_wireup_ep_flush,
.ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(ucp_wireup_ep_t),
.ep_pending_add = ucp_wireup_ep_pending_add,
.ep_pending_purge = ucp_wireup_ep_pending_purge,
.ep_put_short = (uct_ep_put_short_func_t)ucs_empty_function_return_no_resource,
.ep_put_bcopy = (uct_ep_put_bcopy_func_t)ucp_wireup_ep_bcopy_send_func,
.ep_put_zcopy = (uct_ep_put_zcopy_func_t)ucs_empty_function_return_no_resource,
.ep_get_short = (uct_ep_get_short_func_t)ucs_empty_function_return_no_resource,
.ep_get_bcopy = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_no_resource,
.ep_get_zcopy = (uct_ep_get_zcopy_func_t)ucs_empty_function_return_no_resource,
.ep_am_short = (uct_ep_am_short_func_t)ucs_empty_function_return_no_resource,
.ep_am_bcopy = ucp_wireup_ep_am_bcopy,
.ep_am_zcopy = (uct_ep_am_zcopy_func_t)ucs_empty_function_return_no_resource,
.ep_tag_eager_short = (uct_ep_tag_eager_short_func_t)ucs_empty_function_return_no_resource,
.ep_tag_eager_bcopy = (uct_ep_tag_eager_bcopy_func_t)ucp_wireup_ep_bcopy_send_func,
.ep_tag_eager_zcopy = (uct_ep_tag_eager_zcopy_func_t)ucs_empty_function_return_no_resource,
.ep_tag_rndv_zcopy = (uct_ep_tag_rndv_zcopy_func_t)ucs_empty_function_return_ptr_no_resource,
.ep_tag_rndv_request = (uct_ep_tag_rndv_request_func_t)ucs_empty_function_return_no_resource,
.ep_atomic64_post = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_no_resource,
.ep_atomic64_fetch = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_no_resource,
.ep_atomic_cswap64 = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_no_resource,
.ep_atomic32_post = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_no_resource,
.ep_atomic32_fetch = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_no_resource,
.ep_atomic_cswap32 = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_no_resource
};
UCS_CLASS_CALL_SUPER_INIT(ucp_proxy_ep_t, &ops, ucp_ep, NULL, 0);
self->aux_ep = NULL;
self->sockaddr_ep = NULL;
self->aux_rsc_index = UCP_NULL_RESOURCE;
self->sockaddr_rsc_index = UCP_NULL_RESOURCE;
self->pending_count = 0;
self->flags = 0;
self->progress_id = UCS_CALLBACKQ_ID_NULL;
ucs_queue_head_init(&self->pending_q);
UCS_ASYNC_BLOCK(&ucp_ep->worker->async);
++ucp_ep->worker->flush_ops_count;
UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async);
ucs_trace("ep %p: created wireup ep %p to %s ", ucp_ep, self,
ucp_ep_peer_name(ucp_ep));
return UCS_OK;
}
static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t)
{
ucp_ep_h ucp_ep = self->super.ucp_ep;
ucp_worker_h worker = ucp_ep->worker;
ucs_assert(ucs_queue_is_empty(&self->pending_q));
ucs_assert(self->pending_count == 0);
ucs_debug("ep %p: destroy wireup ep %p", ucp_ep, self);
uct_worker_progress_unregister_safe(worker->uct, &self->progress_id);
if (self->aux_ep != NULL) {
ucp_worker_iface_unprogress_ep(ucp_worker_iface(worker,
self->aux_rsc_index));
uct_ep_destroy(self->aux_ep);
}
if (self->sockaddr_ep != NULL) {
uct_ep_destroy(self->sockaddr_ep);
}
UCS_ASYNC_BLOCK(&worker->async);
--worker->flush_ops_count;
UCS_ASYNC_UNBLOCK(&worker->async);
}
UCS_CLASS_DEFINE(ucp_wireup_ep_t, ucp_proxy_ep_t);
ucp_rsc_index_t ucp_wireup_ep_get_aux_rsc_index(uct_ep_h uct_ep)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
if (!ucp_wireup_ep_test(uct_ep)) {
return UCP_NULL_RESOURCE;
}
if (wireup_ep->aux_ep == NULL) {
return UCP_NULL_RESOURCE;
}
return wireup_ep->aux_rsc_index;
}
ucs_status_t ucp_wireup_ep_connect(uct_ep_h uct_ep, unsigned ucp_ep_init_flags,
ucp_rsc_index_t rsc_index, int connect_aux,
const ucp_unpacked_address_t *remote_address)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_worker_h worker = ucp_ep->worker;
uct_ep_params_t uct_ep_params;
ucs_status_t status;
uct_ep_h next_ep;
ucs_assert(wireup_ep != NULL);
uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
uct_ep_params.iface = ucp_worker_iface(worker, rsc_index)->iface;
status = uct_ep_create(&uct_ep_params, &next_ep);
if (status != UCS_OK) {
/* make Coverity happy */
ucs_assert(next_ep == NULL);
goto err;
}
ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, 1);
ucs_debug("ep %p: created next_ep %p to %s using " UCT_TL_RESOURCE_DESC_FMT,
ucp_ep, wireup_ep->super.uct_ep, ucp_ep_peer_name(ucp_ep),
UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[rsc_index].tl_rsc));
/* we need to create an auxiliary transport only for active messages */
if (connect_aux) {
status = ucp_wireup_ep_connect_aux(wireup_ep, ucp_ep_init_flags,
remote_address);
if (status != UCS_OK) {
goto err_destroy_next_ep;
}
}
return UCS_OK;
err_destroy_next_ep:
uct_ep_destroy(wireup_ep->super.uct_ep);
wireup_ep->super.uct_ep = NULL;
err:
return status;
}
static ucs_status_t ucp_wireup_ep_pack_sockaddr_aux_tls(ucp_worker_h worker,
const char *dev_name,
uint64_t *tl_bitmap_p,
ucp_address_t **address_p,
size_t *address_length_p)
{
ucp_context_h context = worker->context;
int tl_id, found_supported_tl = 0;
ucs_status_t status;
uint64_t tl_bitmap = 0;
/* Find a transport which matches the given dev_name and the user's configuration.
* It also has to be a UCT_IFACE_FLAG_CONNECT_TO_IFACE transport and support
* active messaging for sending a wireup message */
ucs_for_each_bit(tl_id, context->config.sockaddr_aux_rscs_bitmap) {
if ((!strncmp(context->tl_rscs[tl_id].tl_rsc.dev_name, dev_name,
UCT_DEVICE_NAME_MAX)) &&
(ucs_test_all_flags(ucp_worker_iface_get_attr(worker, tl_id)->cap.flags,
UCT_IFACE_FLAG_CONNECT_TO_IFACE |
UCT_IFACE_FLAG_AM_BCOPY))) {
found_supported_tl = 1;
tl_bitmap |= UCS_BIT(tl_id);
}
}
if (found_supported_tl) {
status = ucp_address_pack(worker, NULL, tl_bitmap,
UCP_ADDRESS_PACK_FLAG_ALL, NULL,
address_length_p, (void**)address_p);
} else {
ucs_error("no supported sockaddr auxiliary transports found for %s", dev_name);
status = UCS_ERR_UNREACHABLE;
}
*tl_bitmap_p = tl_bitmap;
return status;
}
ssize_t ucp_wireup_ep_sockaddr_fill_private_data(void *arg, const char *dev_name,
void *priv_data)
{
ucp_wireup_sockaddr_data_t *sa_data = priv_data;
ucp_wireup_ep_t *wireup_ep = arg;
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_rsc_index_t sockaddr_rsc = wireup_ep->sockaddr_rsc_index;
ucp_worker_h worker = ucp_ep->worker;
ucp_context_h context = worker->context;
size_t address_length, conn_priv_len;
ucp_address_t *worker_address, *rsc_address;
uct_iface_attr_t *attrs;
ucs_status_t status;
uint64_t tl_bitmap;
char aux_tls_str[64];
status = ucp_address_pack(worker, NULL, UINT64_MAX,
UCP_ADDRESS_PACK_FLAG_ALL, NULL,
&address_length, (void**)&worker_address);
if (status != UCS_OK) {
goto err;
}
conn_priv_len = sizeof(*sa_data) + address_length;
/* pack client data */
ucs_assert((int)ucp_ep_config(ucp_ep)->key.err_mode <= UINT8_MAX);
sa_data->err_mode = ucp_ep_config(ucp_ep)->key.err_mode;
sa_data->ep_ptr = (uintptr_t)ucp_ep;
sa_data->dev_index = UCP_NULL_RESOURCE; /* Not used */
attrs = ucp_worker_iface_get_attr(worker, sockaddr_rsc);
/* check private data length limitation */
if (conn_priv_len > attrs->max_conn_priv) {
/* since the full worker address is too large to fit into the trasnport's
* private data, try to pack sockaddr aux tls to pass in the address */
status = ucp_wireup_ep_pack_sockaddr_aux_tls(worker, dev_name,
&tl_bitmap, &rsc_address,
&address_length);
if (status != UCS_OK) {
goto err_free_address;
}
conn_priv_len = sizeof(*sa_data) + address_length;
/* check the private data length limitation again, now with partial
* resources packed (and not the entire worker address) */
if (conn_priv_len > attrs->max_conn_priv) {
ucs_error("sockaddr aux resources addresses (%s transports)"
" information (%zu) exceeds max_priv on "
UCT_TL_RESOURCE_DESC_FMT" (%zu)",
ucp_tl_bitmap_str(context, tl_bitmap, aux_tls_str,
sizeof(aux_tls_str)),
conn_priv_len,
UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[sockaddr_rsc].tl_rsc),
attrs->max_conn_priv);
status = UCS_ERR_UNREACHABLE;
ucs_free(rsc_address);
goto err_free_address;
}
sa_data->addr_mode = UCP_WIREUP_SA_DATA_PARTIAL_ADDR;
memcpy(sa_data + 1, rsc_address, address_length);
ucp_ep->flags |= UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR;
ucs_free(rsc_address);
ucs_trace("sockaddr tl ("UCT_TL_RESOURCE_DESC_FMT") sending partial address: "
"(%s transports) (len=%zu) to server. "
"total client priv data len: %zu",
context->tl_rscs[sockaddr_rsc].tl_rsc.tl_name, dev_name,
ucp_tl_bitmap_str(context, tl_bitmap, aux_tls_str,
sizeof(aux_tls_str)),
address_length, conn_priv_len);
} else {
sa_data->addr_mode = UCP_WIREUP_SA_DATA_FULL_ADDR;
memcpy(sa_data + 1, worker_address, address_length);
}
ucp_worker_release_address(worker, worker_address);
return conn_priv_len;
err_free_address:
ucp_worker_release_address(worker, worker_address);
err:
return status;
}
ucs_status_t ucp_wireup_ep_connect_to_sockaddr(uct_ep_h uct_ep,
const ucp_ep_params_t *params)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep;
ucp_worker_h worker = ucp_ep->worker;
char saddr_str[UCS_SOCKADDR_STRING_LEN];
uct_ep_params_t uct_ep_params;
ucp_rsc_index_t sockaddr_rsc;
ucp_worker_iface_t *wiface;
ucs_status_t status;
ucs_assert(ucp_wireup_ep_test(uct_ep));
status = ucp_wireup_select_sockaddr_transport(worker->context,
¶ms->sockaddr,
&sockaddr_rsc);
if (status != UCS_OK) {
goto out;
}
wiface = ucp_worker_iface(worker, sockaddr_rsc);
wireup_ep->sockaddr_rsc_index = sockaddr_rsc;
/* Fill parameters and send connection request using the transport */
uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE |
UCT_EP_PARAM_FIELD_USER_DATA |
UCT_EP_PARAM_FIELD_SOCKADDR |
UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS |
UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB;
uct_ep_params.iface = wiface->iface;
uct_ep_params.sockaddr = ¶ms->sockaddr;
uct_ep_params.user_data = wireup_ep;
uct_ep_params.sockaddr_cb_flags = UCT_CB_FLAG_ASYNC;
uct_ep_params.sockaddr_pack_cb = ucp_wireup_ep_sockaddr_fill_private_data;
status = uct_ep_create(&uct_ep_params, &wireup_ep->sockaddr_ep);
if (status != UCS_OK) {
goto out;
}
ucs_debug("ep %p connecting to %s", ucp_ep,
ucs_sockaddr_str(params->sockaddr.addr, saddr_str, sizeof(saddr_str)));
status = UCS_OK;
out:
return status;
}
void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucs_assert(wireup_ep != NULL);
ucs_assert(wireup_ep->super.uct_ep == NULL);
wireup_ep->flags |= UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED;
ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, 1);
}
uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
uct_ep_h next_ep;
ucs_assert_always(wireup_ep != NULL);
next_ep = wireup_ep->super.uct_ep;
wireup_ep->super.uct_ep = NULL;
return next_ep;
}
void ucp_wireup_ep_remote_connected(uct_ep_h uct_ep)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucp_ep_h ucp_ep;
ucs_assert(wireup_ep != NULL);
ucs_assert(wireup_ep->super.uct_ep != NULL);
ucs_assert(wireup_ep->flags & UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED);
ucp_ep = wireup_ep->super.ucp_ep;
ucs_trace("ep %p: wireup ep %p is remote-connected", ucp_ep, wireup_ep);
wireup_ep->flags |= UCP_WIREUP_EP_FLAG_READY;
uct_worker_progress_register_safe(ucp_ep->worker->uct,
ucp_wireup_ep_progress, wireup_ep, 0,
&wireup_ep->progress_id);
ucp_worker_signal_internal(ucp_ep->worker);
}
int ucp_wireup_ep_test(uct_ep_h uct_ep)
{
return uct_ep->iface->ops.ep_destroy ==
UCS_CLASS_DELETE_FUNC_NAME(ucp_wireup_ep_t);
}
int ucp_wireup_ep_is_owner(uct_ep_h uct_ep, uct_ep_h owned_ep)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
if (wireup_ep == NULL) {
return 0;
}
return (wireup_ep->aux_ep == owned_ep) ||
(wireup_ep->sockaddr_ep == owned_ep) ||
(wireup_ep->super.uct_ep == owned_ep);
}
void ucp_wireup_ep_disown(uct_ep_h uct_ep, uct_ep_h owned_ep)
{
ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep);
ucs_assert_always(wireup_ep != NULL);
if (wireup_ep->aux_ep == owned_ep) {
wireup_ep->aux_ep = NULL;
} else if (wireup_ep->sockaddr_ep == owned_ep) {
wireup_ep->sockaddr_ep = NULL;
} else if (wireup_ep->super.uct_ep == owned_ep) {
ucp_proxy_ep_extract(uct_ep);
}
}
ucp_wireup_ep_t *ucp_wireup_ep(uct_ep_h uct_ep)
{
return ucp_wireup_ep_test(uct_ep) ?
ucs_derived_of(uct_ep, ucp_wireup_ep_t) : NULL;
}