/** * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "wireup.h" #include "address.h" #include "wireup_cm.h" #include "wireup_ep.h" #include #include #include #include #include #include #include #include /* * Description of the protocol in UCX wiki: * https://github.com/openucx/ucx/wiki/Connection-establishment */ static size_t ucp_wireup_msg_pack(void *dest, void *arg) { ucp_request_t *req = arg; *(ucp_wireup_msg_t*)dest = req->send.wireup; memcpy((ucp_wireup_msg_t*)dest + 1, req->send.buffer, req->send.length); return sizeof(ucp_wireup_msg_t) + req->send.length; } static const char* ucp_wireup_msg_str(uint8_t msg_type) { switch (msg_type) { case UCP_WIREUP_MSG_PRE_REQUEST: return "PRE_REQ"; case UCP_WIREUP_MSG_REQUEST: return "REQ"; case UCP_WIREUP_MSG_REPLY: return "REP"; case UCP_WIREUP_MSG_ACK: return "ACK"; default: return ""; } } static ucp_lane_index_t ucp_wireup_get_msg_lane(ucp_ep_h ep, uint8_t msg_type) { ucp_context_h context = ep->worker->context; ucp_ep_config_t *ep_config = ucp_ep_config(ep); ucp_lane_index_t lane = UCP_NULL_LANE; if (msg_type != UCP_WIREUP_MSG_ACK) { /* for request/response, try wireup_lane first */ lane = ep_config->key.wireup_lane; } if (lane == UCP_NULL_LANE) { /* fallback to active messages lane */ lane = ep_config->key.am_lane; } if (lane == UCP_NULL_LANE) { ucs_fatal("ep %p to %s: could not find a lane to send CONN_%s%s", ep, ucp_ep_peer_name(ep), ucp_wireup_msg_str(msg_type), context->config.ext.unified_mode ? ". try to set UCX_UNIFIED_MODE=n." 
: ""); } return lane; } ucs_status_t ucp_wireup_msg_progress(uct_pending_req_t *self) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucp_ep_h ep = req->send.ep; ssize_t packed_len; unsigned am_flags; if (req->send.wireup.type == UCP_WIREUP_MSG_REQUEST) { if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) { ucs_trace("ep %p: not sending wireup message - remote already connected", ep); goto out; } } else if (req->send.wireup.type == UCP_WIREUP_MSG_PRE_REQUEST) { ucs_assert (!(ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED)); } /* send the active message */ req->send.lane = ucp_wireup_get_msg_lane(ep, req->send.wireup.type); am_flags = 0; if ((req->send.wireup.type == UCP_WIREUP_MSG_REQUEST) || (req->send.wireup.type == UCP_WIREUP_MSG_PRE_REQUEST)) { am_flags |= UCT_SEND_FLAG_SIGNALED; } VALGRIND_CHECK_MEM_IS_DEFINED(&req->send.wireup, sizeof(req->send.wireup)); VALGRIND_CHECK_MEM_IS_DEFINED(req->send.buffer, req->send.length); packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_WIREUP, ucp_wireup_msg_pack, req, am_flags); if (packed_len < 0) { if (packed_len != UCS_ERR_NO_RESOURCE) { ucs_error("failed to send wireup: %s", ucs_status_string((ucs_status_t)packed_len)); } return (ucs_status_t)packed_len; } switch (req->send.wireup.type) { case UCP_WIREUP_MSG_PRE_REQUEST: ep->flags |= UCP_EP_FLAG_CONNECT_PRE_REQ_SENT; break; case UCP_WIREUP_MSG_REQUEST: ep->flags |= UCP_EP_FLAG_CONNECT_REQ_SENT; break; case UCP_WIREUP_MSG_REPLY: ep->flags |= UCP_EP_FLAG_CONNECT_REP_SENT; break; case UCP_WIREUP_MSG_ACK: ep->flags |= UCP_EP_FLAG_CONNECT_ACK_SENT; break; } out: ucs_free((void*)req->send.buffer); ucs_free(req); return UCS_OK; } static inline int ucp_wireup_is_ep_needed(ucp_ep_h ep) { return (ep != NULL) && !(ep->flags & UCP_EP_FLAG_LISTENER); } /* * @param [in] rsc_tli Resource index for every lane. 
 */
/* Allocate a wireup control request, pack this worker's (and optionally this
 * ep's) address for the transports in 'tl_bitmap', and queue the request for
 * sending. The request and the packed buffer are freed by
 * ucp_wireup_msg_progress() once the message goes out. */
static ucs_status_t
ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type, uint64_t tl_bitmap,
                    const ucp_lane_index_t *lanes2remote)
{
    ucp_request_t* req;
    ucs_status_t status;
    void *address;

    ucs_assert(ep->cfg_index != (uint8_t)-1);

    /* We cannot allocate from memory pool because it's not thread safe
     * and this function may be called from any thread */
    req = ucs_malloc(sizeof(*req), "wireup_msg_req");
    if (req == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    req->flags                   = 0;
    req->send.ep                 = ep;
    req->send.wireup.type        = type;
    req->send.wireup.err_mode    = ucp_ep_config(ep)->key.err_mode;
    req->send.wireup.conn_sn     = ep->conn_sn;
    /* local ep pointer lets the peer address replies directly to this ep */
    req->send.wireup.src_ep_ptr  = (uintptr_t)ep;
    if (ep->flags & UCP_EP_FLAG_DEST_EP) {
        req->send.wireup.dest_ep_ptr = ucp_ep_dest_ep_ptr(ep);
    } else {
        req->send.wireup.dest_ep_ptr = 0;
    }

    req->send.uct.func           = ucp_wireup_msg_progress;
    req->send.datatype           = ucp_dt_make_contig(1);
    ucp_request_send_state_init(req, ucp_dt_make_contig(1), 0);

    /* pack all addresses */
    status = ucp_address_pack(ep->worker,
                              ucp_wireup_is_ep_needed(ep) ? ep : NULL,
                              tl_bitmap, UCP_ADDRESS_PACK_FLAG_ALL,
                              lanes2remote, &req->send.length, &address);
    if (status != UCS_OK) {
        ucs_free(req);
        ucs_error("failed to pack address: %s", ucs_status_string(status));
        return status;
    }

    req->send.buffer = address;

    ucp_request_send(req, 0);
    return UCS_OK;
}

/* Collect the transport-resource bitmap of the lanes selected in 'lane_map'. */
static uint64_t ucp_wireup_get_ep_tl_bitmap(ucp_ep_h ep, ucp_lane_map_t lane_map)
{
    uint64_t tl_bitmap = 0;
    ucp_lane_index_t lane;

    ucs_for_each_bit(lane, lane_map) {
        ucs_assert(lane < UCP_MAX_LANES);
        tl_bitmap |= UCS_BIT(ucp_ep_get_rsc_index(ep, lane));
    }

    return tl_bitmap;
}

/*
 * Select remote ep address for every remote address entry (because there
 * could be multiple ep addresses per entry). This selection is used to create
 * 'lanes2remote' mapping with the remote lane index for each local lane.
 */
static void
ucp_wireup_match_p2p_lanes(ucp_ep_h ep,
                           const ucp_unpacked_address_t *remote_address,
                           const unsigned *addr_indices,
                           ucp_lane_index_t *lanes2remote)
{
    const ucp_address_entry_t *address;
    unsigned address_index;
    ucp_lane_index_t lane, remote_lane;
    unsigned *ep_addr_indexes;
    unsigned ep_addr_index;
    uint64_t UCS_V_UNUSED used_remote_lanes;

    /* Initialize the counters of ep address index for each address entry */
    ep_addr_indexes = ucs_alloca(sizeof(ep_addr_index) *
                                 remote_address->address_count);
    for (address_index = 0; address_index < remote_address->address_count;
         ++address_index) {
        ep_addr_indexes[address_index] = 0;
    }

    /* Initialize lanes2remote array */
    for (lane = 0; lane < UCP_MAX_LANES; ++lane) {
        lanes2remote[lane] = UCP_NULL_LANE;
    }

    used_remote_lanes = 0;
    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (!ucp_ep_is_lane_p2p(ep, lane)) {
            continue;
        }

        /* Select next remote ep address within the address_index as specified
         * by addr_indices argument */
        address_index      = addr_indices[lane];
        address            = &remote_address->address_list[address_index];
        ep_addr_index      = ep_addr_indexes[address_index]++;
        remote_lane        = address->ep_addrs[ep_addr_index].lane;
        lanes2remote[lane] = remote_lane;

        /* sanity: each remote lane may be claimed by at most one local lane */
        if (used_remote_lanes & UCS_BIT(remote_lane)) {
            ucs_fatal("ep %p: remote lane %d is used more than once", ep,
                      remote_lane);
        }
        used_remote_lanes |= UCS_BIT(remote_lane);

        ucs_trace("ep %p: lane[%d]->remote_lane[%d] (address[%d].ep_address[%d])",
                  ep, lane, remote_lane, address_index, ep_addr_index);
    }
}

/* Scan the unpacked remote address for the ep address tagged with
 * 'remote_lane'; on success return its ep and device addresses. */
static ucs_status_t
ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
                                const ucp_unpacked_address_t *remote_address,
                                const uct_ep_addr_t **ep_addr_p,
                                const uct_device_addr_t **dev_addr_p)
{
    const ucp_address_entry_t *address;
    unsigned ep_addr_index;

    ucp_unpacked_address_for_each(address, remote_address) {
        for (ep_addr_index = 0; ep_addr_index < address->num_ep_addrs;
             ++ep_addr_index) {
            if (remote_lane == address->ep_addrs[ep_addr_index].lane) {
                *ep_addr_p  = address->ep_addrs[ep_addr_index].addr;
                *dev_addr_p = address->dev_addr;
                return UCS_OK;
            }
        }
    }
    return UCS_ERR_UNREACHABLE;
}

/* Connect all local point-to-point lanes to the matching remote ep addresses.
 * 'lanes2remote' maps local lane -> remote lane; NULL means identity. */
ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
                         const ucp_unpacked_address_t *remote_address,
                         const ucp_lane_index_t *lanes2remote)
{
    ucp_lane_index_t lane, remote_lane;
    const uct_device_addr_t *dev_addr;
    const uct_ep_addr_t *ep_addr;
    ucs_status_t status;

    ucs_trace("ep %p: connect local transports", ep);

    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (!ucp_ep_is_lane_p2p(ep, lane)) {
            continue;
        }

        remote_lane = (lanes2remote == NULL) ? lane : lanes2remote[lane];

        status = ucp_wireup_find_remote_p2p_addr(ep, remote_lane,
                                                 remote_address, &ep_addr,
                                                 &dev_addr);
        if (status != UCS_OK) {
            ucs_error("ep %p: no remote ep address for lane[%d]->remote_lane[%d]",
                      ep, lane, remote_lane);
            return status;
        }

        status = uct_ep_connect_to_ep(ep->uct_eps[lane], dev_addr, ep_addr);
        if (status != UCS_OK) {
            return status;
        }
    }

    return UCS_OK;
}

/* Mark the endpoint as remote-connected (idempotent) and notify every wireup
 * proxy ep so it can switch traffic to its real next_ep. */
void ucp_wireup_remote_connected(ucp_ep_h ep)
{
    ucp_lane_index_t lane;

    if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
        return;
    }

    ucs_trace("ep %p: remote connected", ep);
    ep->flags |= UCP_EP_FLAG_REMOTE_CONNECTED;

    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (ucp_ep_is_lane_p2p(ep, lane)) {
            /* p2p lanes must still be wrapped by a wireup ep at this point */
            ucs_assert(ucp_wireup_ep_test(ep->uct_eps[lane]));
        }
        if (ucp_wireup_ep_test(ep->uct_eps[lane])) {
            ucp_wireup_ep_remote_connected(ep->uct_eps[lane]);
        }
    }

    ucs_assert(ep->flags & UCP_EP_FLAG_DEST_EP);
}

/* Initialize ep lanes from a peer's wireup message; on failure put the ep
 * into the failed state instead of returning it half-initialized. */
static ucs_status_t
ucp_wireup_init_lanes_by_request(ucp_worker_h worker, ucp_ep_h ep,
                                 unsigned ep_init_flags,
                                 const ucp_unpacked_address_t *remote_address,
                                 unsigned *addr_indices)
{
    ucs_status_t status = ucp_wireup_init_lanes(ep, ep_init_flags, UINT64_MAX,
                                                remote_address, addr_indices);
    if (status == UCS_OK) {
        return UCS_OK;
    }

    ucp_worker_set_ep_failed(worker, ep, NULL, UCP_NULL_LANE, status);
    return status;
}

static UCS_F_NOINLINE void
ucp_wireup_process_pre_request(ucp_worker_h worker, const ucp_wireup_msg_t
                                                   *msg,
                               const ucp_unpacked_address_t *remote_address)
{
    /* Handle PRE_REQUEST (client-server flow): the server sent its full
     * address, so re-initialize lanes and reply with a full REQUEST. */
    unsigned ep_init_flags = UCP_EP_INIT_CREATE_AM_LANE;
    unsigned addr_indices[UCP_MAX_LANES];
    ucs_status_t status;
    ucp_ep_h ep;

    ucs_assert(msg->type == UCP_WIREUP_MSG_PRE_REQUEST);
    ucs_assert(msg->dest_ep_ptr != 0);
    ucs_trace("got wireup pre_request from 0x%"PRIx64" src_ep 0x%lx dst_ep 0x%lx conn_sn %d",
              remote_address->uuid, msg->src_ep_ptr, msg->dest_ep_ptr,
              msg->conn_sn);

    /* wireup pre_request for a specific ep */
    ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);
    ucs_assert(ep->flags & UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR);
    ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
    ucp_ep_flush_state_reset(ep);

    if (ucp_ep_config(ep)->key.err_mode == UCP_ERR_HANDLING_MODE_PEER) {
        ep_init_flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
    }

    /* initialize transport endpoints */
    status = ucp_wireup_init_lanes_by_request(worker, ep, ep_init_flags,
                                              remote_address, addr_indices);
    if (status != UCS_OK) {
        return;
    }

    status = ucp_wireup_send_request(ep);
    if (status != UCS_OK) {
        ucp_ep_cleanup_lanes(ep);
    }
}

/* Handle an incoming wireup REQUEST: find or create the destination ep,
 * initialize its lanes from the remote address, connect p2p lanes, and send
 * a REPLY when the peer needs one. */
static UCS_F_NOINLINE void
ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
                           const ucp_unpacked_address_t *remote_address)
{
    uint64_t remote_uuid   = remote_address->uuid;
    uint64_t tl_bitmap     = 0;
    int send_reply         = 0;
    unsigned ep_init_flags = 0;
    ucp_rsc_index_t lanes2remote[UCP_MAX_LANES];
    unsigned addr_indices[UCP_MAX_LANES];
    ucs_status_t status;
    ucp_ep_flags_t listener_flag;
    ucp_ep_h ep;

    ucs_assert(msg->type == UCP_WIREUP_MSG_REQUEST);
    ucs_trace("got wireup request from 0x%"PRIx64" src_ep 0x%lx dst_ep 0x%lx conn_sn %d",
              remote_address->uuid, msg->src_ep_ptr, msg->dest_ep_ptr,
              msg->conn_sn);

    if (msg->dest_ep_ptr != 0) {
        /* wireup request for a specific ep */
        ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);
        ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
        if (!(ep->flags & UCP_EP_FLAG_LISTENER)) {
            /* Reset flush state only if it's not a client-server wireup on
             * server side with long address exchange when listener (united
             * with flush state) should be valid until user's callback
             * invoking */
            ucp_ep_flush_state_reset(ep);
        }
        ep_init_flags |= UCP_EP_INIT_CREATE_AM_LANE;
    } else {
        /* anonymous request: match it against an ep we may have created when
         * the application connected first (expected), or create a new one */
        ep = ucp_ep_match_retrieve_exp(&worker->ep_match_ctx, remote_uuid,
                                       msg->conn_sn ^
                                       (remote_uuid == worker->uuid));
        if (ep == NULL) {
            /* Create a new endpoint if does not exist */
            status = ucp_ep_new(worker, remote_address->name, "remote-request",
                                &ep);
            if (status != UCS_OK) {
                return;
            }

            /* add internal endpoint to hash */
            ep->conn_sn = msg->conn_sn;
            ucp_ep_match_insert_unexp(&worker->ep_match_ctx, remote_uuid, ep);
        } else {
            ucp_ep_flush_state_reset(ep);
        }

        ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);

        /*
         * If the current endpoint already sent a connection request, we have a
         * "simultaneous connect" situation. In this case, only one of the
         * endpoints (instead of both) should respect the connect request,
         * otherwise they will end up being connected to "internal" endpoints
         * on the remote side instead of each other. We use the uniqueness of
         * worker uuid to decide which connect request should be ignored.
         */
        if ((ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED) &&
            (remote_uuid > worker->uuid)) {
            ucs_trace("ep %p: ignoring simultaneous connect request", ep);
            ep->flags |= UCP_EP_FLAG_CONNECT_REQ_IGNORED;
            return;
        }
    }

    if (ep->flags & UCP_EP_FLAG_LISTENER) {
        /* If this is an ep on a listener (server) that received a partial
         * worker address from the client, then the following lanes
         * initialization will be done after an aux lane was already created
         * on this ep. Therefore, remove the existing aux endpoint since will
         * need to create new lanes now */
        ucp_ep_cleanup_lanes(ep);
    }

    if (msg->err_mode == UCP_ERR_HANDLING_MODE_PEER) {
        ep_init_flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
    }

    /* Initialize lanes (possible destroy existing lanes) */
    status = ucp_wireup_init_lanes_by_request(worker, ep, ep_init_flags,
                                              remote_address, addr_indices);
    if (status != UCS_OK) {
        return;
    }

    ucp_wireup_match_p2p_lanes(ep, remote_address, addr_indices, lanes2remote);

    /* Send a reply if remote side does not have ep_ptr (active-active flow) or
     * there are p2p lanes (client-server flow) */
    send_reply = (msg->dest_ep_ptr == 0) || ucp_ep_config(ep)->p2p_lanes;

    /* Connect p2p addresses to remote endpoint */
    if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) {
        status = ucp_wireup_connect_local(ep, remote_address, lanes2remote);
        if (status != UCS_OK) {
            return;
        }

        tl_bitmap = ucp_wireup_get_ep_tl_bitmap(ep,
                                                ucp_ep_config(ep)->p2p_lanes);
        ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;

        ucs_assert(send_reply);
    }

    /* mark the endpoint as connected to remote */
    if (!ucp_ep_config(ep)->p2p_lanes) {
        ucp_wireup_remote_connected(ep);
    }

    if (send_reply) {
        listener_flag = ep->flags & UCP_EP_FLAG_LISTENER;
        /* Remove this flag at this point if it's set
         * (so that address packing would be correct) */
        ep->flags &= ~UCP_EP_FLAG_LISTENER;

        ucs_trace("ep %p: sending wireup reply", ep);
        status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_REPLY, tl_bitmap,
                                     lanes2remote);
        if (status != UCS_OK) {
            return;
        }

        /* Restore saved flag value */
        ep->flags |= listener_flag;
    } else {
        /* if in client-server flow, schedule invoking the user's callback
         * (if server is connected) from the main thread */
        if (ucs_test_all_flags(ep->flags,
                               (UCP_EP_FLAG_LISTENER |
                                UCP_EP_FLAG_LOCAL_CONNECTED))) {
            ucp_listener_schedule_accept_cb(ep);
        }
    }
}

/* Progress callback (registered one-shot) which sends the final wireup ACK. */
static unsigned ucp_wireup_send_msg_ack(void *arg)
{
    ucp_ep_h ep = (ucp_ep_h)arg;
    ucp_rsc_index_t rsc_tli[UCP_MAX_LANES];
    ucs_status_t status;

    /* Send ACK without any address,
     * we've already sent it as part of the request */
    ucs_trace("ep %p: sending wireup ack", ep);

    memset(rsc_tli, UCP_NULL_RESOURCE, sizeof(rsc_tli));
    status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_ACK, 0, rsc_tli);
    /* return nonzero (callback "made progress") only if the ACK was queued;
     * otherwise the one-shot callback will be invoked again */
    return (status == UCS_OK);
}

/* Callbackq predicate: match pending ACK-send callbacks for a given ep. */
int ucp_wireup_msg_ack_cb_pred(const ucs_callbackq_elem_t *elem, void *arg)
{
    return ((elem->arg == arg) && (elem->cb == ucp_wireup_send_msg_ack));
}

/* Handle a wireup REPLY: finish connecting local p2p lanes and, if we had to
 * connect anything, schedule an ACK back to the peer. */
static UCS_F_NOINLINE void
ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
                         const ucp_unpacked_address_t *remote_address)
{
    uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
    ucs_status_t status;
    ucp_ep_h ep;
    int ack;

    ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);

    ucs_assert(msg->type == UCP_WIREUP_MSG_REPLY);
    ucs_assert((!(ep->flags & UCP_EP_FLAG_LISTENER)));
    ucs_trace("ep %p: got wireup reply src_ep 0x%lx dst_ep 0x%lx sn %d", ep,
              msg->src_ep_ptr, msg->dest_ep_ptr, msg->conn_sn);

    /* the connection is resolved now - no longer needs to be matchable */
    ucp_ep_match_remove_ep(&worker->ep_match_ctx, ep);
    ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
    ucp_ep_flush_state_reset(ep);

    /* Connect p2p addresses to remote endpoint */
    if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) {
        /*
         * In the wireup reply message, the lane indexes specify which
         * **receiver** ep lane should be connected to a given ep address. So
         * we don't pass 'lanes2remote' mapping, and use local lanes directly.
         */
        status = ucp_wireup_connect_local(ep, remote_address, NULL);
        if (status != UCS_OK) {
            return;
        }

        ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
        ack = 1;
    } else {
        ack = 0;
    }

    ucp_wireup_remote_connected(ep);

    if (ack) {
        /* Send `UCP_WIREUP_MSG_ACK` from progress function
         * to avoid calling UCT routines from an async thread */
        uct_worker_progress_register_safe(worker->uct,
                                          ucp_wireup_send_msg_ack, ep,
                                          UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id);
    }
}

/* Handle the final wireup ACK: the peer finished connecting its p2p lanes,
 * so this ep is now fully remote-connected. */
static UCS_F_NOINLINE
void ucp_wireup_process_ack(ucp_worker_h worker, const ucp_wireup_msg_t *msg)
{
    ucp_ep_h ep;

    ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);

    ucs_assert(msg->type == UCP_WIREUP_MSG_ACK);
    ucs_trace("ep %p: got wireup ack", ep);

    ucs_assert(ep->flags & UCP_EP_FLAG_DEST_EP);
    ucs_assert(ep->flags & UCP_EP_FLAG_CONNECT_REP_SENT);
    ucs_assert(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED);

    ucp_wireup_remote_connected(ep);

    /* if this ack is received as part of the client-server flow, when handling
     * a large worker address from the client, invoke the cached user callback
     * from the main thread */
    if (ep->flags & UCP_EP_FLAG_LISTENER) {
        ucp_listener_schedule_accept_cb(ep);
    }
}

/* Active-message handler for UCP_AM_ID_WIREUP: unpack the trailing address
 * blob and dispatch by message type. Runs with the worker async lock held. */
static ucs_status_t ucp_wireup_msg_handler(void *arg, void *data,
                                           size_t length, unsigned flags)
{
    ucp_worker_h worker   = arg;
    ucp_wireup_msg_t *msg = data;
    ucp_unpacked_address_t remote_address;
    ucs_status_t status;

    UCS_ASYNC_BLOCK(&worker->async);

    /* the packed address immediately follows the fixed message header */
    status = ucp_address_unpack(worker, msg + 1, UINT64_MAX, &remote_address);
    if (status != UCS_OK) {
        ucs_error("failed to unpack address: %s", ucs_status_string(status));
        goto out;
    }

    if (msg->type == UCP_WIREUP_MSG_ACK) {
        ucs_assert(remote_address.address_count == 0);
        ucp_wireup_process_ack(worker, msg);
    } else if (msg->type == UCP_WIREUP_MSG_PRE_REQUEST) {
        ucp_wireup_process_pre_request(worker, msg, &remote_address);
    } else if (msg->type == UCP_WIREUP_MSG_REQUEST) {
        ucp_wireup_process_request(worker, msg, &remote_address);
    } else if (msg->type == UCP_WIREUP_MSG_REPLY) {
        ucp_wireup_process_reply(worker, msg,
                                 &remote_address);
    } else {
        ucs_bug("invalid wireup message");
    }

    ucs_free(remote_address.address_list);

out:
    UCS_ASYNC_UNBLOCK(&worker->async);
    return UCS_OK;
}

/* Install 'uct_ep' on lane 'lane': directly if the slot is empty, or as the
 * next_ep of the wireup proxy already occupying the slot. 'info' is a suffix
 * for the trace message. */
void ucp_wireup_assign_lane(ucp_ep_h ep, ucp_lane_index_t lane,
                            uct_ep_h uct_ep, const char *info)
{
    /* If ep already exists, it's a wireup proxy, and we need to update its
     * next_ep instead of replacing it. */
    if (ep->uct_eps[lane] == NULL) {
        ucs_trace("ep %p: assign uct_ep[%d]=%p%s", ep, lane, uct_ep, info);
        ep->uct_eps[lane] = uct_ep;
    } else {
        ucs_assert(ucp_wireup_ep_test(ep->uct_eps[lane]));
        ucs_trace("ep %p: wireup uct_ep[%d]=%p next set to %p%s", ep, lane,
                  ep->uct_eps[lane], uct_ep, info);
        ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
        ucp_wireup_ep_remote_connected(ep->uct_eps[lane]);
    }
}

/* Detach and return the real transport ep of a lane: unwrap a wireup proxy's
 * next_ep, or clear the slot and return the ep that was stored there. */
static uct_ep_h ucp_wireup_extract_lane(ucp_ep_h ep, ucp_lane_index_t lane)
{
    uct_ep_h uct_ep = ep->uct_eps[lane];

    if ((uct_ep != NULL) && ucp_wireup_ep_test(uct_ep)) {
        return ucp_wireup_ep_extract_next_ep(uct_ep);
    } else {
        ep->uct_eps[lane] = NULL;
        return uct_ep;
    }
}

/* Create/connect the UCT endpoint behind one lane: either connect directly to
 * the remote interface (CONNECT_TO_IFACE transports) or create a wireup proxy
 * ep that will run the handshake (CONNECT_TO_EP transports). */
ucs_status_t ucp_wireup_connect_lane(ucp_ep_h ep, unsigned ep_init_flags,
                                     ucp_lane_index_t lane,
                                     const ucp_unpacked_address_t *remote_address,
                                     unsigned addr_index)
{
    ucp_worker_h worker = ep->worker;
    int connect_aux;
    ucp_lane_index_t proxy_lane;
    ucp_rsc_index_t rsc_index;
    ucp_worker_iface_t *wiface;
    uct_ep_params_t uct_ep_params;
    uct_ep_h uct_ep;
    ucs_status_t status;

    ucs_trace("ep %p: connect lane[%d]", ep, lane);

    /* the CM lane is handled separately (see ucp_wireup_init_lanes) */
    ucs_assert(lane != ucp_ep_get_cm_lane(ep));

    ucs_assert_always(remote_address != NULL);
    ucs_assert_always(remote_address->address_list != NULL);
    ucs_assert_always(addr_index <= remote_address->address_count);

    proxy_lane = ucp_ep_get_proxy_lane(ep, lane);
    rsc_index  = ucp_ep_get_rsc_index(ep, lane);
    wiface     = ucp_worker_iface(worker, rsc_index);

    /*
     * if the selected transport can be connected directly to the remote
     * interface, just create a connected UCT endpoint.
     */
    if ((wiface->attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) &&
        ((ep->uct_eps[lane] == NULL) ||
         ucp_wireup_ep_test(ep->uct_eps[lane]))) {
        if ((proxy_lane == UCP_NULL_LANE) || (proxy_lane == lane)) {
            /* create an endpoint connected to the remote interface */
            ucs_trace("ep %p: connect uct_ep[%d] to addr[%d]", ep, lane,
                      addr_index);
            uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE |
                                       UCT_EP_PARAM_FIELD_DEV_ADDR |
                                       UCT_EP_PARAM_FIELD_IFACE_ADDR;
            uct_ep_params.iface      = wiface->iface;
            uct_ep_params.dev_addr   = remote_address->address_list[addr_index].dev_addr;
            uct_ep_params.iface_addr = remote_address->address_list[addr_index].iface_addr;
            status = uct_ep_create(&uct_ep_params, &uct_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                return status;
            }

            ucp_wireup_assign_lane(ep, lane, uct_ep, "");
        }

        ucp_worker_iface_progress_ep(wiface);
        return UCS_OK;
    }

    /*
     * create a wireup endpoint which will start connection establishment
     * protocol using an auxiliary transport.
     */
    if (wiface->attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {
        /* For now, p2p transports have no reason to have proxy */
        ucs_assert_always(proxy_lane == UCP_NULL_LANE);

        if (ep->uct_eps[lane] == NULL) {
            status = ucp_wireup_ep_create(ep, &uct_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                return status;
            }

            ucs_trace("ep %p: assign uct_ep[%d]=%p wireup", ep, lane, uct_ep);
            ep->uct_eps[lane] = uct_ep;
        } else {
            uct_ep = ep->uct_eps[lane];
            ucs_assert(ucp_wireup_ep_test(uct_ep));
        }

        /* a CM client connects its lanes later, from the CM flow */
        if (!(ep_init_flags & (UCP_EP_INIT_CM_WIREUP_CLIENT))) {
            ucs_trace("ep %p: connect uct_ep[%d]=%p to addr[%d] wireup", ep,
                      lane, uct_ep, addr_index);
            connect_aux = !(ep_init_flags & (UCP_EP_INIT_CM_WIREUP_CLIENT |
                                             UCP_EP_INIT_CM_WIREUP_SERVER)) &&
                          (lane == ucp_ep_get_wireup_msg_lane(ep));
            status = ucp_wireup_ep_connect(ep->uct_eps[lane], ep_init_flags,
                                           rsc_index, connect_aux,
                                           remote_address);
            if (status != UCS_OK) {
                return status;
            }
        }

        ucp_worker_iface_progress_ep(wiface);
        return UCS_OK;
    }

    return UCS_ERR_UNREACHABLE;
}
/* Wrap lanes that have a proxy lane configured with a signaling proxy ep, so
 * sends on them raise the remote side if it is sleeping. */
ucs_status_t ucp_wireup_resolve_proxy_lanes(ucp_ep_h ep)
{
    ucp_lane_index_t lane, proxy_lane;
    uct_iface_attr_t *iface_attr;
    uct_ep_h uct_ep, signaling_ep;
    ucs_status_t status;

    /* point proxy endpoints */
    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        proxy_lane = ucp_ep_get_proxy_lane(ep, lane);
        if (proxy_lane == UCP_NULL_LANE) {
            continue;
        }

        iface_attr = ucp_worker_iface_get_attr(ep->worker,
                                               ucp_ep_get_rsc_index(ep, lane));
        if (iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) {
            /* the proxy scheme relies on short messages fitting in bcopy */
            ucs_assert_always(iface_attr->cap.am.max_short <=
                              iface_attr->cap.am.max_bcopy);
        }

        /* Create a signaling ep to the proxy lane */
        if (proxy_lane == lane) {
            /* If proxy is to the same lane, temporarily remove the existing
             * UCT endpoint in there, so it could be assigned to the signaling
             * proxy ep. This can also be an endpoint contained inside a wireup
             * proxy, so ucp_wireup_extract_lane() handles both cases. */
            uct_ep = ucp_wireup_extract_lane(ep, proxy_lane);
            ucs_assert_always(uct_ep != NULL);
            status = ucp_signaling_ep_create(ep, uct_ep, 1, &signaling_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                uct_ep_destroy(uct_ep);
                return status;
            }
        } else {
            status = ucp_signaling_ep_create(ep, ep->uct_eps[proxy_lane], 0,
                                             &signaling_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                return status;
            }
        }

        ucs_trace("ep %p: lane[%d]=%p proxy_lane=%d", ep, lane,
                  ep->uct_eps[lane], proxy_lane);
        ucp_wireup_assign_lane(ep, lane, signaling_ep, " (signaling proxy)");
    }

    return UCS_OK;
}

/* Log a human-readable dump of an ep configuration key at 'log_level'. */
static void ucp_wireup_print_config(ucp_context_h context,
                                    const ucp_ep_config_key_t *key,
                                    const char *title,
                                    const unsigned *addr_indices,
                                    ucs_log_level_t log_level)
{
    char lane_info[128] = {0};
    ucp_lane_index_t lane;

    if (!ucs_log_is_enabled(log_level)) {
        return;
    }

    ucs_log(log_level, "%s: am_lane %d wireup_lane %d reachable_mds 0x%lx",
            title, key->am_lane, key->wireup_lane, key->reachable_md_map);

    for (lane = 0; lane < key->num_lanes; ++lane) {
        ucp_ep_config_lane_info_str(context, key, addr_indices, lane,
                                    UCP_NULL_RESOURCE, lane_info,
                                    sizeof(lane_info));
        ucs_log(log_level, "%s: %s", title, lane_info);
    }
}

/* A local resource can reach a remote address entry iff the transport names
 * match (by checksum) and the UCT iface reports the address reachable. */
int ucp_wireup_is_reachable(ucp_worker_h worker, ucp_rsc_index_t rsc_index,
                            const ucp_address_entry_t *ae)
{
    ucp_context_h context      = worker->context;
    ucp_worker_iface_t *wiface = ucp_worker_iface(worker, rsc_index);

    return (context->tl_rscs[rsc_index].tl_name_csum == ae->tl_name_csum) &&
           uct_iface_is_reachable(wiface->iface, ae->dev_addr, ae->iface_addr);
}

/* Compute the set of remote memory domains reachable through any local
 * transport, merge it with the previous configuration's set, and fill
 * key->reachable_md_map / key->dst_md_cmpts accordingly. */
static void
ucp_wireup_get_reachable_mds(ucp_worker_h worker,
                             const ucp_unpacked_address_t *remote_address,
                             const ucp_ep_config_key_t *prev_key,
                             ucp_ep_config_key_t *key)
{
    ucp_context_h context = worker->context;
    ucp_rsc_index_t ae_cmpts[UCP_MAX_MDS]; /* component index for each address entry */
    const ucp_address_entry_t *ae;
    ucp_rsc_index_t cmpt_index;
    ucp_rsc_index_t rsc_index;
    ucp_md_index_t dst_md_index;
    ucp_md_map_t ae_dst_md_map, dst_md_map;
    unsigned num_dst_mds;

    ae_dst_md_map = 0;
    ucs_for_each_bit(rsc_index, context->tl_bitmap) {
        ucp_unpacked_address_for_each(ae, remote_address) {
            if (ucp_wireup_is_reachable(worker, rsc_index, ae)) {
                ae_dst_md_map         |= UCS_BIT(ae->md_index);
                dst_md_index           = context->tl_rscs[rsc_index].md_index;
                ae_cmpts[ae->md_index] = context->tl_mds[dst_md_index].cmpt_index;
            }
        }
    }

    /* merge with previous configuration */
    dst_md_map  = ae_dst_md_map | prev_key->reachable_md_map;
    num_dst_mds = 0;
    ucs_for_each_bit(dst_md_index, dst_md_map) {
        cmpt_index = UCP_NULL_RESOURCE;
        /* remote md is reachable by the provided address */
        if (UCS_BIT(dst_md_index) & ae_dst_md_map) {
            cmpt_index = ae_cmpts[dst_md_index];
        }
        /* remote md is reachable by previous ep configuration */
        if (UCS_BIT(dst_md_index) & prev_key->reachable_md_map) {
            cmpt_index = ucp_ep_config_get_dst_md_cmpt(prev_key, dst_md_index);
            if (UCS_BIT(dst_md_index) & ae_dst_md_map) {
                /* we expect previous configuration will not conflict with the
                 * new one */
                ucs_assert_always(cmpt_index == ae_cmpts[dst_md_index]);
            }
        }
        ucs_assert_always(cmpt_index != UCP_NULL_RESOURCE);
        key->dst_md_cmpts[num_dst_mds++] = cmpt_index;
    }
    ucs_assert(num_dst_mds == ucs_popcount(dst_md_map));

    key->reachable_md_map = dst_md_map;
}

/* Select lanes for the endpoint against a remote address, load the resulting
 * configuration, and connect each lane. Reconfiguring an already-configured
 * (non-stub) endpoint is not supported and aborts. On output 'addr_indices'
 * maps each lane to the remote address entry chosen for it. */
ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
                                   uint64_t local_tl_bitmap,
                                   const ucp_unpacked_address_t *remote_address,
                                   unsigned *addr_indices)
{
    ucp_worker_h worker = ep->worker;
    uint64_t tl_bitmap  = local_tl_bitmap & worker->context->tl_bitmap;
    ucp_ep_config_key_t key;
    ucp_ep_cfg_index_t new_cfg_index;
    ucp_lane_index_t lane;
    ucs_status_t status;
    char str[32];
    ucp_wireup_ep_t *cm_wireup_ep;

    ucs_assert(tl_bitmap != 0);

    ucs_trace("ep %p: initialize lanes", ep);

    ucp_ep_config_key_reset(&key);
    ucp_ep_config_key_set_err_mode(&key, ep_init_flags);

    status = ucp_wireup_select_lanes(ep, ep_init_flags, tl_bitmap,
                                     remote_address, addr_indices, &key);
    if (status != UCS_OK) {
        return status;
    }

    /* Get all reachable MDs from full remote address list */
    key.dst_md_cmpts = ucs_alloca(sizeof(*key.dst_md_cmpts) * UCP_MAX_MDS);
    ucp_wireup_get_reachable_mds(worker, remote_address,
                                 &ucp_ep_config(ep)->key, &key);

    /* Load new configuration */
    status = ucp_worker_get_ep_config(worker, &key, 1, &new_cfg_index);
    if (status != UCS_OK) {
        return status;
    }

    if (ep->cfg_index == new_cfg_index) {
        return UCS_OK; /* No change */
    }

    if ((ep->cfg_index != 0) && !ucp_ep_is_sockaddr_stub(ep)) {
        /*
         * TODO handle a case where we have to change lanes and reconfigure the ep:
         *
         * - if we already have uct ep connected to an address - move it to the new lane index
         * - if we don't yet have connection to an address - create it
         * - if an existing lane is not connected anymore - delete it (possibly)
         * - if the configuration has changed - replay all pending operations on all lanes -
         *   need that every pending callback would return, in case of failure, the number
         *   of lane it wants to be queued on.
         */
        ucs_debug("cannot reconfigure ep %p from [%d] to [%d]", ep,
                  ep->cfg_index, new_cfg_index);
        ucp_wireup_print_config(worker->context, &ucp_ep_config(ep)->key,
                                "old", NULL, UCS_LOG_LEVEL_ERROR);
        ucp_wireup_print_config(worker->context, &key, "new", NULL,
                                UCS_LOG_LEVEL_ERROR);
        ucs_fatal("endpoint reconfiguration not supported yet");
    }

    /* save the CM wireup ep before switching configurations, so the CM lane
     * slot can be restored below */
    cm_wireup_ep  = ucp_ep_get_cm_wireup_ep(ep);
    ep->cfg_index = new_cfg_index;
    ep->am_lane   = key.am_lane;

    snprintf(str, sizeof(str), "ep %p", ep);
    ucp_wireup_print_config(worker->context, &ucp_ep_config(ep)->key, str,
                            addr_indices, UCS_LOG_LEVEL_DEBUG);

    /* establish connections on all underlying endpoints */
    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (ucp_ep_get_cm_lane(ep) == lane) {
            /* restore the cm lane after reconfiguration */
            ep->uct_eps[lane] = &cm_wireup_ep->super.super;
            continue;
        }
        status = ucp_wireup_connect_lane(ep, ep_init_flags, lane,
                                         remote_address, addr_indices[lane]);
        if (status != UCS_OK) {
            return status;
        }
    }

    status = ucp_wireup_resolve_proxy_lanes(ep);
    if (status != UCS_OK) {
        return status;
    }

    /* If we don't have a p2p transport, we're connected */
    if (!ucp_ep_config(ep)->p2p_lanes) {
        ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
    }

    return UCS_OK;
}

/* Queue a wireup REQUEST carrying the addresses of all lanes' transports,
 * plus the auxiliary transport if one is in use. */
ucs_status_t ucp_wireup_send_request(ucp_ep_h ep)
{
    ucp_rsc_index_t rsc_index;
    ucs_status_t status;
    uint64_t tl_bitmap;

    tl_bitmap = ucp_wireup_get_ep_tl_bitmap(ep,
                                            UCS_MASK(ucp_ep_num_lanes(ep)));

    /* TODO make sure such lane would exist */
    rsc_index = ucp_wireup_ep_get_aux_rsc_index(
                    ep->uct_eps[ucp_ep_get_wireup_msg_lane(ep)]);
    if (rsc_index != UCP_NULL_RESOURCE) {
        tl_bitmap |= UCS_BIT(rsc_index);
    }

    ucs_debug("ep %p: send wireup request (flags=0x%x)", ep, ep->flags);
    status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_REQUEST, tl_bitmap, NULL);

    ep->flags |= UCP_EP_FLAG_CONNECT_REQ_QUEUED;

    return status;
}

/* uct_ep_pending_purge cb: collect purged pending requests into a queue so
 * they can be re-added behind the wireup request. */
static void ucp_wireup_connect_remote_purge_cb(uct_pending_req_t *self,
                                               void *arg)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucs_queue_head_t *queue = arg; ucs_trace_req("ep %p: extracted request %p from pending queue", req->send.ep, req); ucs_queue_push(queue, (ucs_queue_elem_t*)&req->send.uct.priv); } ucs_status_t ucp_wireup_send_pre_request(ucp_ep_h ep) { ucp_rsc_index_t rsc_tli[UCP_MAX_LANES]; uint64_t tl_bitmap = UINT64_MAX; /* pack full worker address */ ucs_status_t status; ucs_assert(ep->flags & UCP_EP_FLAG_LISTENER); ucs_assert(!(ep->flags & UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED)); memset(rsc_tli, UCP_NULL_RESOURCE, sizeof(rsc_tli)); ucs_debug("ep %p: send wireup pre-request (flags=0x%x)", ep, ep->flags); status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_PRE_REQUEST, tl_bitmap, rsc_tli); ep->flags |= UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED; return status; } ucs_status_t ucp_wireup_connect_remote(ucp_ep_h ep, ucp_lane_index_t lane) { ucs_queue_head_t tmp_q; ucs_status_t status; ucp_request_t *req; uct_ep_h uct_ep; ucs_trace("ep %p: connect lane %d to remote peer", ep, lane); ucs_assert(lane != UCP_NULL_LANE); UCS_ASYNC_BLOCK(&ep->worker->async); /* checking again, with lock held, if already connected or connection is * in progress */ if ((ep->flags & UCP_EP_FLAG_DEST_EP) || ucp_wireup_ep_test(ep->uct_eps[lane])) { status = UCS_OK; goto out_unlock; } if (ucp_proxy_ep_test(ep->uct_eps[lane])) { /* signaling ep is not needed now since we will send wireup request * with signaling flag */ uct_ep = ucp_proxy_ep_extract(ep->uct_eps[lane]); uct_ep_destroy(ep->uct_eps[lane]); } else { uct_ep = ep->uct_eps[lane]; } ucs_assert(!(ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED)); ucs_trace("ep %p: connect lane %d to remote peer with wireup ep", ep, lane); /* make ep->uct_eps[lane] a stub */ status = ucp_wireup_ep_create(ep, &ep->uct_eps[lane]); if (status != UCS_OK) { goto err; } /* Extract all pending requests from the transport endpoint, otherwise they * will prevent the wireup message from being sent (because those requests * could not be progressed any more after switching to wireup proxy). 
*/ ucs_queue_head_init(&tmp_q); uct_ep_pending_purge(uct_ep, ucp_wireup_connect_remote_purge_cb, &tmp_q); /* the wireup ep should use the existing [am_lane] as next_ep */ ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep); if (!(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED)) { status = ucp_wireup_send_request(ep); if (status != UCS_OK) { goto err_destroy_wireup_ep; } } ucs_queue_for_each_extract(req, &tmp_q, send.uct.priv, 1) { ucs_trace_req("ep %p: requeue request %p after wireup request", req->send.ep, req); status = uct_ep_pending_add(ep->uct_eps[lane], &req->send.uct, (req->send.uct.func == ucp_wireup_msg_progress) || (req->send.uct.func == ucp_wireup_ep_progress_pending) ? UCT_CB_FLAG_ASYNC : 0); if (status != UCS_OK) { ucs_fatal("wireup proxy function must always return UCS_OK"); } } goto out_unlock; err_destroy_wireup_ep: uct_ep_destroy(ep->uct_eps[lane]); err: ep->uct_eps[lane] = uct_ep; /* restore am lane */ out_unlock: UCS_ASYNC_UNBLOCK(&ep->worker->async); return status; } static void ucp_wireup_msg_dump(ucp_worker_h worker, uct_am_trace_type_t type, uint8_t id, const void *data, size_t length, char *buffer, size_t max) { ucp_context_h context = worker->context; const ucp_wireup_msg_t *msg = data; ucp_unpacked_address_t unpacked_address; const ucp_address_entry_t *ae; ucp_tl_resource_desc_t *rsc; unsigned ep_addr_index; ucs_status_t status; char *p, *end; ucp_rsc_index_t tl; status = ucp_address_unpack(worker, msg + 1, ~UCP_ADDRESS_PACK_FLAG_TRACE, &unpacked_address); if (status != UCS_OK) { strncpy(unpacked_address.name, "", UCP_WORKER_NAME_MAX); unpacked_address.uuid = 0; unpacked_address.address_count = 0; unpacked_address.address_list = NULL; } p = buffer; end = buffer + max; snprintf(p, end - p, "WIREUP %s [%s uuid 0x%"PRIx64" src_ep 0x%lx dst_ep 0x%lx conn_sn %d]", ucp_wireup_msg_str(msg->type), unpacked_address.name, unpacked_address.uuid, msg->src_ep_ptr, msg->dest_ep_ptr, msg->conn_sn); p += strlen(p); if (unpacked_address.address_list == NULL) 
{ return; /* No addresses were unpacked */ } ucp_unpacked_address_for_each(ae, &unpacked_address) { ucs_for_each_bit(tl, context->tl_bitmap) { rsc = &context->tl_rscs[tl]; if (ae->tl_name_csum == rsc->tl_name_csum) { snprintf(p, end - p, " "UCT_TL_RESOURCE_DESC_FMT, UCT_TL_RESOURCE_DESC_ARG(&rsc->tl_rsc)); p += strlen(p); break; } } snprintf(p, end - p, "/md[%d]", ae->md_index); p += strlen(p); for (ep_addr_index = 0; ep_addr_index < ae->num_ep_addrs; ++ep_addr_index) { snprintf(p, end - p, "/lane[%d]", ae->ep_addrs[ep_addr_index].lane); p += strlen(p); } } ucs_free(unpacked_address.address_list); } int ucp_worker_iface_is_tl_p2p(const uct_iface_attr_t *iface_attr) { uint64_t flags = iface_attr->cap.flags; return (flags & UCT_IFACE_FLAG_CONNECT_TO_EP) && !(flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE); } static ucp_err_handling_mode_t ucp_ep_params_err_handling_mode(const ucp_ep_params_t *params) { return (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) ? params->err_mode : UCP_ERR_HANDLING_MODE_NONE; } unsigned ucp_ep_init_flags(const ucp_worker_h worker, const ucp_ep_params_t *params) { unsigned flags = ucp_cm_ep_init_flags(worker, params); if (!ucp_worker_sockaddr_is_cm_proto(worker) && (params->field_mask & UCP_EP_PARAM_FIELD_SOCK_ADDR)) { flags |= UCP_EP_INIT_CREATE_AM_LANE; } if (ucp_ep_params_err_handling_mode(params) == UCP_ERR_HANDLING_MODE_PEER) { flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE; } return flags; } UCP_DEFINE_AM(UINT64_MAX, UCP_AM_ID_WIREUP, ucp_wireup_msg_handler, ucp_wireup_msg_dump, UCT_CB_FLAG_ASYNC);