/** * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "wireup_ep.h" #include "wireup.h" #include #include #include #include #include #include #include #include #include UCS_CLASS_DECLARE(ucp_wireup_ep_t, ucp_ep_h); static UCS_CLASS_DEFINE_DELETE_FUNC(ucp_wireup_ep_t, uct_ep_t); static inline ucs_queue_elem_t* ucp_wireup_ep_req_priv(uct_pending_req_t *req) { UCS_STATIC_ASSERT(sizeof(ucs_queue_elem_t) <= UCT_PENDING_REQ_PRIV_LEN); return (ucs_queue_elem_t*)req->priv; } static ucs_status_t ucp_wireup_ep_connect_to_ep(uct_ep_h uct_ep, const uct_device_addr_t *dev_addr, const uct_ep_addr_t *ep_addr) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); wireup_ep->flags |= UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED; return uct_ep_connect_to_ep(wireup_ep->super.uct_ep, dev_addr, ep_addr); } /* * We switch the endpoint in this function (instead in wireup code) since * this is guaranteed to run from the main thread. */ static unsigned ucp_wireup_ep_progress(void *arg) { ucp_wireup_ep_t *wireup_ep = arg; ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucs_queue_head_t tmp_pending_queue; uct_pending_req_t *uct_req; ucp_request_t *req; UCS_ASYNC_BLOCK(&ucp_ep->worker->async); ucs_assert(wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY); ucs_assert(wireup_ep->super.uct_ep != NULL); /* If we still have pending wireup messages, send them out first */ if (wireup_ep->pending_count != 0) { goto out_unblock; } /* If an error happened on the endpoint (but perhaps the deferred error handler, * ucp_worker_iface_err_handle_progress(), was not called yet, avoid changing * ep state, and let the error handler take care of cleanup. */ if (ucp_ep->flags & UCP_EP_FLAG_FAILED) { ucs_trace("ep %p: not switching wireup_ep %p to ready state because of error", ucp_ep, wireup_ep); goto out_unblock; } ucs_trace("ep %p: switching wireup_ep %p to ready state", ucp_ep, wireup_ep); /* Move wireup pending queue to temporary queue and remove references to * the wireup progress function */ ucs_queue_head_init(&tmp_pending_queue); ucs_queue_for_each_extract(uct_req, &wireup_ep->pending_q, priv, 1) { ucs_queue_push(&tmp_pending_queue, ucp_wireup_ep_req_priv(uct_req)); } /* Switch to real transport and destroy proxy endpoint (aux_ep as well) */ ucp_proxy_ep_replace(&wireup_ep->super); wireup_ep = NULL; UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async); /* Replay pending requests */ ucs_queue_for_each_extract(uct_req, &tmp_pending_queue, priv, 1) { req = ucs_container_of(uct_req, ucp_request_t, send.uct); ucs_assert(req->send.ep == ucp_ep); ucp_request_send(req, 0); --ucp_ep->worker->flush_ops_count; } return 0; out_unblock: UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async); return 0; } static ssize_t ucp_wireup_ep_bcopy_send_func(uct_ep_h uct_ep) { return UCS_ERR_NO_RESOURCE; } static uct_ep_h ucp_wireup_ep_get_msg_ep(ucp_wireup_ep_t *wireup_ep) { uct_ep_h wireup_msg_ep; if ((wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) || (wireup_ep->aux_ep == NULL)) { wireup_msg_ep = wireup_ep->super.uct_ep; } else { wireup_msg_ep = wireup_ep->aux_ep; } ucs_assertv(wireup_msg_ep != NULL, "ucp_ep=%p wireup_ep=%p flags=%c%c next_ep=%p aux_ep=%p", wireup_ep->super.ucp_ep, wireup_ep, (wireup_ep->flags & UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED) ? 'c' : '-', (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) ? 'r' : '-', wireup_ep->super.uct_ep, wireup_ep->aux_ep); return wireup_msg_ep; } ucs_status_t ucp_wireup_ep_progress_pending(uct_pending_req_t *self) { ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct); uct_pending_req_t *req = proxy_req->send.proxy.req; ucp_wireup_ep_t *wireup_ep = proxy_req->send.proxy.wireup_ep; ucs_status_t status; status = req->func(req); if (status == UCS_OK) { ucs_atomic_sub32(&wireup_ep->pending_count, 1); ucs_free(proxy_req); } return status; } static void ucp_wireup_ep_pending_req_release(uct_pending_req_t *self, void *arg) { ucp_request_t *proxy_req = ucs_container_of(self, ucp_request_t, send.uct); ucp_wireup_ep_t *wireup_ep = proxy_req->send.proxy.wireup_ep; ucp_request_t *req; ucs_atomic_sub32(&wireup_ep->pending_count, 1); if (proxy_req->send.proxy.req->func == ucp_wireup_msg_progress) { req = ucs_container_of(proxy_req->send.proxy.req, ucp_request_t, send.uct); ucs_free((void*)req->send.buffer); ucs_free(req); } ucs_free(proxy_req); } static ucs_status_t ucp_wireup_ep_pending_add(uct_ep_h uct_ep, uct_pending_req_t *req, unsigned flags) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; ucp_request_t *proxy_req; uct_ep_h wireup_msg_ep; ucs_status_t status; UCS_ASYNC_BLOCK(&worker->async); if (req->func == ucp_wireup_msg_progress) { proxy_req = ucs_malloc(sizeof(*proxy_req), "ucp_wireup_proxy_req"); if (proxy_req == NULL) { status = UCS_ERR_NO_MEMORY; goto out; } wireup_msg_ep = ucp_wireup_ep_get_msg_ep(wireup_ep); proxy_req->send.uct.func = ucp_wireup_ep_progress_pending; proxy_req->send.proxy.req = req; proxy_req->send.proxy.wireup_ep = wireup_ep; proxy_req->send.state.uct_comp.func = NULL; status = uct_ep_pending_add(wireup_msg_ep, &proxy_req->send.uct, UCT_CB_FLAG_ASYNC); if (status == UCS_OK) { ucs_atomic_add32(&wireup_ep->pending_count, +1); } else { ucs_free(proxy_req); } } else { ucs_queue_push(&wireup_ep->pending_q, ucp_wireup_ep_req_priv(req)); ++ucp_ep->worker->flush_ops_count; status = UCS_OK; } out: UCS_ASYNC_UNBLOCK(&worker->async); /* coverity[leaked_storage] */ return status; } static void ucp_wireup_ep_pending_purge(uct_ep_h uct_ep, uct_pending_purge_callback_t cb, void *arg) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucp_worker_h worker; uct_pending_req_t *req; ucp_request_t *ucp_req; worker = wireup_ep->super.ucp_ep->worker; ucs_queue_for_each_extract(req, &wireup_ep->pending_q, priv, 1) { ucp_req = ucs_container_of(req, ucp_request_t, send.uct); UCS_ASYNC_BLOCK(&worker->async); --worker->flush_ops_count; UCS_ASYNC_UNBLOCK(&worker->async); cb(&ucp_req->send.uct, arg); } if (wireup_ep->pending_count > 0) { uct_ep_pending_purge(ucp_wireup_ep_get_msg_ep(wireup_ep), ucp_wireup_ep_pending_req_release, arg); } ucs_assert(wireup_ep->pending_count == 0); } static ssize_t ucp_wireup_ep_am_bcopy(uct_ep_h uct_ep, uint8_t id, uct_pack_callback_t pack_cb, void *arg, unsigned flags) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); if (id == UCP_AM_ID_WIREUP) { return uct_ep_am_bcopy(ucp_wireup_ep_get_msg_ep(wireup_ep), UCP_AM_ID_WIREUP, pack_cb, arg, flags); } return UCS_ERR_NO_RESOURCE; } UCS_CLASS_DEFINE_NAMED_NEW_FUNC(ucp_wireup_ep_create, ucp_wireup_ep_t, uct_ep_t, ucp_ep_h); ucs_status_t ucp_wireup_ep_connect_aux(ucp_wireup_ep_t *wireup_ep, unsigned ep_init_flags, const ucp_unpacked_address_t *remote_address) { ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; ucp_wireup_select_info_t select_info = {0}; uct_ep_params_t uct_ep_params; const ucp_address_entry_t *aux_addr; ucp_worker_iface_t *wiface; ucs_status_t status; /* select an auxiliary transport which would be used to pass connection * establishment messages. */ status = ucp_wireup_select_aux_transport(ucp_ep, ep_init_flags, remote_address, &select_info); if (status != UCS_OK) { return status; } wireup_ep->aux_rsc_index = select_info.rsc_index; aux_addr = &remote_address->address_list[select_info.addr_index]; wiface = ucp_worker_iface(worker, select_info.rsc_index); /* create auxiliary endpoint connected to the remote iface. */ uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE | UCT_EP_PARAM_FIELD_DEV_ADDR | UCT_EP_PARAM_FIELD_IFACE_ADDR; uct_ep_params.iface = wiface->iface; uct_ep_params.dev_addr = aux_addr->dev_addr; uct_ep_params.iface_addr = aux_addr->iface_addr; status = uct_ep_create(&uct_ep_params, &wireup_ep->aux_ep); if (status != UCS_OK) { return status; } ucp_worker_iface_progress_ep(wiface); ucs_debug("ep %p: wireup_ep %p created aux_ep %p to %s using " UCT_TL_RESOURCE_DESC_FMT, ucp_ep, wireup_ep, wireup_ep->aux_ep, ucp_ep_peer_name(ucp_ep), UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[select_info.rsc_index].tl_rsc)); return UCS_OK; } static ucs_status_t ucp_wireup_ep_flush(uct_ep_h uct_ep, unsigned flags, uct_completion_t *comp) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); if (flags & UCT_FLUSH_FLAG_CANCEL) { if (wireup_ep->aux_ep) { return uct_ep_flush(wireup_ep->aux_ep, flags, comp); } return UCS_OK; } return UCS_ERR_NO_RESOURCE; } UCS_CLASS_INIT_FUNC(ucp_wireup_ep_t, ucp_ep_h ucp_ep) { static uct_iface_ops_t ops = { .ep_connect_to_ep = ucp_wireup_ep_connect_to_ep, .ep_flush = ucp_wireup_ep_flush, .ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(ucp_wireup_ep_t), .ep_pending_add = ucp_wireup_ep_pending_add, .ep_pending_purge = ucp_wireup_ep_pending_purge, .ep_put_short = (uct_ep_put_short_func_t)ucs_empty_function_return_no_resource, .ep_put_bcopy = (uct_ep_put_bcopy_func_t)ucp_wireup_ep_bcopy_send_func, .ep_put_zcopy = (uct_ep_put_zcopy_func_t)ucs_empty_function_return_no_resource, .ep_get_short = (uct_ep_get_short_func_t)ucs_empty_function_return_no_resource, .ep_get_bcopy = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_no_resource, .ep_get_zcopy = (uct_ep_get_zcopy_func_t)ucs_empty_function_return_no_resource, .ep_am_short = (uct_ep_am_short_func_t)ucs_empty_function_return_no_resource, .ep_am_bcopy = ucp_wireup_ep_am_bcopy, .ep_am_zcopy = (uct_ep_am_zcopy_func_t)ucs_empty_function_return_no_resource, .ep_tag_eager_short = (uct_ep_tag_eager_short_func_t)ucs_empty_function_return_no_resource, .ep_tag_eager_bcopy = (uct_ep_tag_eager_bcopy_func_t)ucp_wireup_ep_bcopy_send_func, .ep_tag_eager_zcopy = (uct_ep_tag_eager_zcopy_func_t)ucs_empty_function_return_no_resource, .ep_tag_rndv_zcopy = (uct_ep_tag_rndv_zcopy_func_t)ucs_empty_function_return_ptr_no_resource, .ep_tag_rndv_request = (uct_ep_tag_rndv_request_func_t)ucs_empty_function_return_no_resource, .ep_atomic64_post = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_no_resource, .ep_atomic64_fetch = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_no_resource, .ep_atomic_cswap64 = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_no_resource, .ep_atomic32_post = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_no_resource, .ep_atomic32_fetch = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_no_resource, .ep_atomic_cswap32 = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_no_resource }; UCS_CLASS_CALL_SUPER_INIT(ucp_proxy_ep_t, &ops, ucp_ep, NULL, 0); self->aux_ep = NULL; self->sockaddr_ep = NULL; self->aux_rsc_index = UCP_NULL_RESOURCE; self->sockaddr_rsc_index = UCP_NULL_RESOURCE; self->pending_count = 0; self->flags = 0; self->progress_id = UCS_CALLBACKQ_ID_NULL; ucs_queue_head_init(&self->pending_q); UCS_ASYNC_BLOCK(&ucp_ep->worker->async); ++ucp_ep->worker->flush_ops_count; UCS_ASYNC_UNBLOCK(&ucp_ep->worker->async); ucs_trace("ep %p: created wireup ep %p to %s ", ucp_ep, self, ucp_ep_peer_name(ucp_ep)); return UCS_OK; } static UCS_CLASS_CLEANUP_FUNC(ucp_wireup_ep_t) { ucp_ep_h ucp_ep = self->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; ucs_assert(ucs_queue_is_empty(&self->pending_q)); ucs_assert(self->pending_count == 0); ucs_debug("ep %p: destroy wireup ep %p", ucp_ep, self); uct_worker_progress_unregister_safe(worker->uct, &self->progress_id); if (self->aux_ep != NULL) { ucp_worker_iface_unprogress_ep(ucp_worker_iface(worker, self->aux_rsc_index)); uct_ep_destroy(self->aux_ep); } if (self->sockaddr_ep != NULL) { uct_ep_destroy(self->sockaddr_ep); } UCS_ASYNC_BLOCK(&worker->async); --worker->flush_ops_count; UCS_ASYNC_UNBLOCK(&worker->async); } UCS_CLASS_DEFINE(ucp_wireup_ep_t, ucp_proxy_ep_t); ucp_rsc_index_t ucp_wireup_ep_get_aux_rsc_index(uct_ep_h uct_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); if (!ucp_wireup_ep_test(uct_ep)) { return UCP_NULL_RESOURCE; } if (wireup_ep->aux_ep == NULL) { return UCP_NULL_RESOURCE; } return wireup_ep->aux_rsc_index; } ucs_status_t ucp_wireup_ep_connect(uct_ep_h uct_ep, unsigned ucp_ep_init_flags, ucp_rsc_index_t rsc_index, int connect_aux, const ucp_unpacked_address_t *remote_address) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; uct_ep_params_t uct_ep_params; ucs_status_t status; uct_ep_h next_ep; ucs_assert(wireup_ep != NULL); uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE; uct_ep_params.iface = ucp_worker_iface(worker, rsc_index)->iface; status = uct_ep_create(&uct_ep_params, &next_ep); if (status != UCS_OK) { /* make Coverity happy */ ucs_assert(next_ep == NULL); goto err; } ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, 1); ucs_debug("ep %p: created next_ep %p to %s using " UCT_TL_RESOURCE_DESC_FMT, ucp_ep, wireup_ep->super.uct_ep, ucp_ep_peer_name(ucp_ep), UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[rsc_index].tl_rsc)); /* we need to create an auxiliary transport only for active messages */ if (connect_aux) { status = ucp_wireup_ep_connect_aux(wireup_ep, ucp_ep_init_flags, remote_address); if (status != UCS_OK) { goto err_destroy_next_ep; } } return UCS_OK; err_destroy_next_ep: uct_ep_destroy(wireup_ep->super.uct_ep); wireup_ep->super.uct_ep = NULL; err: return status; } static ucs_status_t ucp_wireup_ep_pack_sockaddr_aux_tls(ucp_worker_h worker, const char *dev_name, uint64_t *tl_bitmap_p, ucp_address_t **address_p, size_t *address_length_p) { ucp_context_h context = worker->context; int tl_id, found_supported_tl = 0; ucs_status_t status; uint64_t tl_bitmap = 0; /* Find a transport which matches the given dev_name and the user's configuration. * It also has to be a UCT_IFACE_FLAG_CONNECT_TO_IFACE transport and support * active messaging for sending a wireup message */ ucs_for_each_bit(tl_id, context->config.sockaddr_aux_rscs_bitmap) { if ((!strncmp(context->tl_rscs[tl_id].tl_rsc.dev_name, dev_name, UCT_DEVICE_NAME_MAX)) && (ucs_test_all_flags(ucp_worker_iface_get_attr(worker, tl_id)->cap.flags, UCT_IFACE_FLAG_CONNECT_TO_IFACE | UCT_IFACE_FLAG_AM_BCOPY))) { found_supported_tl = 1; tl_bitmap |= UCS_BIT(tl_id); } } if (found_supported_tl) { status = ucp_address_pack(worker, NULL, tl_bitmap, UCP_ADDRESS_PACK_FLAG_ALL, NULL, address_length_p, (void**)address_p); } else { ucs_error("no supported sockaddr auxiliary transports found for %s", dev_name); status = UCS_ERR_UNREACHABLE; } *tl_bitmap_p = tl_bitmap; return status; } ssize_t ucp_wireup_ep_sockaddr_fill_private_data(void *arg, const char *dev_name, void *priv_data) { ucp_wireup_sockaddr_data_t *sa_data = priv_data; ucp_wireup_ep_t *wireup_ep = arg; ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucp_rsc_index_t sockaddr_rsc = wireup_ep->sockaddr_rsc_index; ucp_worker_h worker = ucp_ep->worker; ucp_context_h context = worker->context; size_t address_length, conn_priv_len; ucp_address_t *worker_address, *rsc_address; uct_iface_attr_t *attrs; ucs_status_t status; uint64_t tl_bitmap; char aux_tls_str[64]; status = ucp_address_pack(worker, NULL, UINT64_MAX, UCP_ADDRESS_PACK_FLAG_ALL, NULL, &address_length, (void**)&worker_address); if (status != UCS_OK) { goto err; } conn_priv_len = sizeof(*sa_data) + address_length; /* pack client data */ ucs_assert((int)ucp_ep_config(ucp_ep)->key.err_mode <= UINT8_MAX); sa_data->err_mode = ucp_ep_config(ucp_ep)->key.err_mode; sa_data->ep_ptr = (uintptr_t)ucp_ep; sa_data->dev_index = UCP_NULL_RESOURCE; /* Not used */ attrs = ucp_worker_iface_get_attr(worker, sockaddr_rsc); /* check private data length limitation */ if (conn_priv_len > attrs->max_conn_priv) { /* since the full worker address is too large to fit into the trasnport's * private data, try to pack sockaddr aux tls to pass in the address */ status = ucp_wireup_ep_pack_sockaddr_aux_tls(worker, dev_name, &tl_bitmap, &rsc_address, &address_length); if (status != UCS_OK) { goto err_free_address; } conn_priv_len = sizeof(*sa_data) + address_length; /* check the private data length limitation again, now with partial * resources packed (and not the entire worker address) */ if (conn_priv_len > attrs->max_conn_priv) { ucs_error("sockaddr aux resources addresses (%s transports)" " information (%zu) exceeds max_priv on " UCT_TL_RESOURCE_DESC_FMT" (%zu)", ucp_tl_bitmap_str(context, tl_bitmap, aux_tls_str, sizeof(aux_tls_str)), conn_priv_len, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[sockaddr_rsc].tl_rsc), attrs->max_conn_priv); status = UCS_ERR_UNREACHABLE; ucs_free(rsc_address); goto err_free_address; } sa_data->addr_mode = UCP_WIREUP_SA_DATA_PARTIAL_ADDR; memcpy(sa_data + 1, rsc_address, address_length); ucp_ep->flags |= UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR; ucs_free(rsc_address); ucs_trace("sockaddr tl ("UCT_TL_RESOURCE_DESC_FMT") sending partial address: " "(%s transports) (len=%zu) to server. " "total client priv data len: %zu", context->tl_rscs[sockaddr_rsc].tl_rsc.tl_name, dev_name, ucp_tl_bitmap_str(context, tl_bitmap, aux_tls_str, sizeof(aux_tls_str)), address_length, conn_priv_len); } else { sa_data->addr_mode = UCP_WIREUP_SA_DATA_FULL_ADDR; memcpy(sa_data + 1, worker_address, address_length); } ucp_worker_release_address(worker, worker_address); return conn_priv_len; err_free_address: ucp_worker_release_address(worker, worker_address); err: return status; } ucs_status_t ucp_wireup_ep_connect_to_sockaddr(uct_ep_h uct_ep, const ucp_ep_params_t *params) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucp_ep_h ucp_ep = wireup_ep->super.ucp_ep; ucp_worker_h worker = ucp_ep->worker; char saddr_str[UCS_SOCKADDR_STRING_LEN]; uct_ep_params_t uct_ep_params; ucp_rsc_index_t sockaddr_rsc; ucp_worker_iface_t *wiface; ucs_status_t status; ucs_assert(ucp_wireup_ep_test(uct_ep)); status = ucp_wireup_select_sockaddr_transport(worker->context, ¶ms->sockaddr, &sockaddr_rsc); if (status != UCS_OK) { goto out; } wiface = ucp_worker_iface(worker, sockaddr_rsc); wireup_ep->sockaddr_rsc_index = sockaddr_rsc; /* Fill parameters and send connection request using the transport */ uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE | UCT_EP_PARAM_FIELD_USER_DATA | UCT_EP_PARAM_FIELD_SOCKADDR | UCT_EP_PARAM_FIELD_SOCKADDR_CB_FLAGS | UCT_EP_PARAM_FIELD_SOCKADDR_PACK_CB; uct_ep_params.iface = wiface->iface; uct_ep_params.sockaddr = ¶ms->sockaddr; uct_ep_params.user_data = wireup_ep; uct_ep_params.sockaddr_cb_flags = UCT_CB_FLAG_ASYNC; uct_ep_params.sockaddr_pack_cb = ucp_wireup_ep_sockaddr_fill_private_data; status = uct_ep_create(&uct_ep_params, &wireup_ep->sockaddr_ep); if (status != UCS_OK) { goto out; } ucs_debug("ep %p connecting to %s", ucp_ep, ucs_sockaddr_str(params->sockaddr.addr, saddr_str, sizeof(saddr_str))); status = UCS_OK; out: return status; } void ucp_wireup_ep_set_next_ep(uct_ep_h uct_ep, uct_ep_h next_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucs_assert(wireup_ep != NULL); ucs_assert(wireup_ep->super.uct_ep == NULL); wireup_ep->flags |= UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED; ucp_proxy_ep_set_uct_ep(&wireup_ep->super, next_ep, 1); } uct_ep_h ucp_wireup_ep_extract_next_ep(uct_ep_h uct_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); uct_ep_h next_ep; ucs_assert_always(wireup_ep != NULL); next_ep = wireup_ep->super.uct_ep; wireup_ep->super.uct_ep = NULL; return next_ep; } void ucp_wireup_ep_remote_connected(uct_ep_h uct_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucp_ep_h ucp_ep; ucs_assert(wireup_ep != NULL); ucs_assert(wireup_ep->super.uct_ep != NULL); ucs_assert(wireup_ep->flags & UCP_WIREUP_EP_FLAG_LOCAL_CONNECTED); ucp_ep = wireup_ep->super.ucp_ep; ucs_trace("ep %p: wireup ep %p is remote-connected", ucp_ep, wireup_ep); wireup_ep->flags |= UCP_WIREUP_EP_FLAG_READY; uct_worker_progress_register_safe(ucp_ep->worker->uct, ucp_wireup_ep_progress, wireup_ep, 0, &wireup_ep->progress_id); ucp_worker_signal_internal(ucp_ep->worker); } int ucp_wireup_ep_test(uct_ep_h uct_ep) { return uct_ep->iface->ops.ep_destroy == UCS_CLASS_DELETE_FUNC_NAME(ucp_wireup_ep_t); } int ucp_wireup_ep_is_owner(uct_ep_h uct_ep, uct_ep_h owned_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); if (wireup_ep == NULL) { return 0; } return (wireup_ep->aux_ep == owned_ep) || (wireup_ep->sockaddr_ep == owned_ep) || (wireup_ep->super.uct_ep == owned_ep); } void ucp_wireup_ep_disown(uct_ep_h uct_ep, uct_ep_h owned_ep) { ucp_wireup_ep_t *wireup_ep = ucp_wireup_ep(uct_ep); ucs_assert_always(wireup_ep != NULL); if (wireup_ep->aux_ep == owned_ep) { wireup_ep->aux_ep = NULL; } else if (wireup_ep->sockaddr_ep == owned_ep) { wireup_ep->sockaddr_ep = NULL; } else if (wireup_ep->super.uct_ep == owned_ep) { ucp_proxy_ep_extract(uct_ep); } } ucp_wireup_ep_t *ucp_wireup_ep(uct_ep_h uct_ep) { return ucp_wireup_ep_test(uct_ep) ? ucs_derived_of(uct_ep, ucp_wireup_ep_t) : NULL; }