/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2015. ALL RIGHTS RESERVED.
 * Copyright (C) ARM Ltd. 2016-2017. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "ucp_am.h"
#include "ucp_worker.h"
#include "ucp_mm.h"
#include "ucp_request.inl"

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#define UCP_WORKER_HEADROOM_SIZE \
    (sizeof(ucp_recv_desc_t) + UCP_WORKER_HEADROOM_PRIV_SIZE)

typedef enum ucp_worker_event_fd_op {
    UCP_WORKER_EPFD_OP_ADD,
    UCP_WORKER_EPFD_OP_DEL
} ucp_worker_event_fd_op_t;

#if ENABLE_STATS
static ucs_stats_class_t ucp_worker_tm_offload_stats_class = {
    .name          = "tag_offload",
    .num_counters  = UCP_WORKER_STAT_TAG_OFFLOAD_LAST,
    .counter_names = {
        [UCP_WORKER_STAT_TAG_OFFLOAD_POSTED]           = "posted",
        [UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED]          = "matched",
        [UCP_WORKER_STAT_TAG_OFFLOAD_MATCHED_SW_RNDV]  = "matched_sw_rndv",
        [UCP_WORKER_STAT_TAG_OFFLOAD_CANCELED]         = "canceled",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_TAG_EXCEED] = "block_tag_exceed",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NON_CONTIG] = "block_non_contig",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_WILDCARD]   = "block_wildcard",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_SW_PEND]    = "block_sw_pend",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_NO_IFACE]   = "block_no_iface",
        [UCP_WORKER_STAT_TAG_OFFLOAD_BLOCK_MEM_REG]    = "block_mem_reg",
        [UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_EGR]     = "rx_unexp_egr",
        [UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_RNDV]    = "rx_unexp_rndv",
        [UCP_WORKER_STAT_TAG_OFFLOAD_RX_UNEXP_SW_RNDV] = "rx_unexp_sw_rndv"
    }
};

static ucs_stats_class_t ucp_worker_stats_class = {
    .name          = "ucp_worker",
    .num_counters  = UCP_WORKER_STAT_LAST,
    .counter_names = {
        [UCP_WORKER_STAT_TAG_RX_EAGER_MSG]         = "rx_eager_msg",
        [UCP_WORKER_STAT_TAG_RX_EAGER_SYNC_MSG]    = "rx_sync_msg",
        [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_EXP]   = "rx_eager_chunk_exp",
        [UCP_WORKER_STAT_TAG_RX_EAGER_CHUNK_UNEXP] = "rx_eager_chunk_unexp",
        [UCP_WORKER_STAT_TAG_RX_RNDV_EXP]          = "rx_rndv_rts_exp",
        [UCP_WORKER_STAT_TAG_RX_RNDV_UNEXP]        = "rx_rndv_rts_unexp"
    }
};
#endif

ucs_mpool_ops_t ucp_am_mpool_ops = {
    .chunk_alloc   = ucs_mpool_hugetlb_malloc,
    .chunk_release = ucs_mpool_hugetlb_free,
    .obj_init      = ucs_empty_function,
    .obj_cleanup   = ucs_empty_function
};

ucs_mpool_ops_t ucp_reg_mpool_ops = {
    .chunk_alloc   = ucp_reg_mpool_malloc,
    .chunk_release = ucp_reg_mpool_free,
    .obj_init      = ucp_mpool_obj_init,
    .obj_cleanup   = ucs_empty_function
};

ucs_mpool_ops_t ucp_frag_mpool_ops = {
    .chunk_alloc   = ucp_frag_mpool_malloc,
    .chunk_release = ucp_frag_mpool_free,
    .obj_init      = ucp_mpool_obj_init,
    .obj_cleanup   = ucs_empty_function
};

void ucp_worker_iface_check_events(ucp_worker_iface_t *wiface, int force);

static UCS_F_ALWAYS_INLINE double
ucp_worker_iface_latency(ucp_worker_h worker, ucp_worker_iface_t *wiface)
{
    return wiface->attr.latency.overhead +
           wiface->attr.latency.growth * worker->context->config.est_num_eps;
}

static ucs_status_t ucp_worker_wakeup_ctl_fd(ucp_worker_h worker,
                                             ucp_worker_event_fd_op_t op,
                                             int event_fd)
{
    ucs_event_set_type_t events = UCS_EVENT_SET_EVREAD;
    ucs_status_t status;

    if (!(worker->context->config.features & UCP_FEATURE_WAKEUP)) {
        return UCS_OK;
    }

    if (worker->flags & UCP_WORKER_FLAG_EDGE_TRIGGERED) {
        events |= UCS_EVENT_SET_EDGE_TRIGGERED;
    }

    switch (op) {
    case UCP_WORKER_EPFD_OP_ADD:
        status = ucs_event_set_add(worker->event_set, event_fd, events,
                                   worker->user_data);
        break;
    case UCP_WORKER_EPFD_OP_DEL:
        status = ucs_event_set_del(worker->event_set,
event_fd); break; default: ucs_bug("Unknown operation (%d) was passed", op); status = UCS_ERR_INVALID_PARAM; break; } return status; } static void ucp_worker_set_am_handlers(ucp_worker_iface_t *wiface, int is_proxy) { ucp_worker_h worker = wiface->worker; ucp_context_h context = worker->context; ucs_status_t status; unsigned am_id; ucs_trace_func("iface=%p is_proxy=%d", wiface->iface, is_proxy); for (am_id = 0; am_id < UCP_AM_ID_LAST; ++am_id) { if (!(wiface->attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_AM_ZCOPY))) { continue; } if (!(context->config.features & ucp_am_handlers[am_id].features)) { continue; } if (!(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_ASYNC) && !(wiface->attr.cap.flags & UCT_IFACE_FLAG_CB_SYNC)) { /* Do not register a sync callback on interface which does not * support it. The transport selection logic should not use async * transports for protocols with sync active message handlers. */ continue; } if (is_proxy && (ucp_am_handlers[am_id].proxy_cb != NULL)) { /* we care only about sync active messages, and this also makes sure * the counter is not accessed from another thread. */ ucs_assert(!(ucp_am_handlers[am_id].flags & UCT_CB_FLAG_ASYNC)); status = uct_iface_set_am_handler(wiface->iface, am_id, ucp_am_handlers[am_id].proxy_cb, wiface, ucp_am_handlers[am_id].flags); } else { status = uct_iface_set_am_handler(wiface->iface, am_id, ucp_am_handlers[am_id].cb, worker, ucp_am_handlers[am_id].flags); } if (status != UCS_OK) { ucs_fatal("failed to set active message handler id %d: %s", am_id, ucs_status_string(status)); } } } static ucs_status_t ucp_stub_am_handler(void *arg, void *data, size_t length, unsigned flags) { ucp_worker_h worker = arg; ucs_trace("worker %p: drop message", worker); return UCS_OK; } static void ucp_worker_remove_am_handlers(ucp_worker_h worker) { ucp_context_h context = worker->context; ucp_worker_iface_t *wiface; ucp_rsc_index_t iface_id; unsigned am_id; ucs_debug("worker %p: remove active message handlers", worker); for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { wiface = worker->ifaces[iface_id]; if (!(wiface->attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT | UCT_IFACE_FLAG_AM_BCOPY | UCT_IFACE_FLAG_AM_ZCOPY))) { continue; } for (am_id = 0; am_id < UCP_AM_ID_LAST; ++am_id) { if (context->config.features & ucp_am_handlers[am_id].features) { (void)uct_iface_set_am_handler(wiface->iface, am_id, ucp_stub_am_handler, worker, UCT_CB_FLAG_ASYNC); } } } } static void ucp_worker_am_tracer(void *arg, uct_am_trace_type_t type, uint8_t id, const void *data, size_t length, char *buffer, size_t max) { ucp_worker_h worker = arg; ucp_am_tracer_t tracer; if (id < UCP_AM_ID_LAST) { tracer = ucp_am_handlers[id].tracer; if (tracer != NULL) { tracer(worker, type, id, data, length, buffer, max); } } } static ucs_status_t ucp_worker_wakeup_init(ucp_worker_h worker, const ucp_worker_params_t *params) { ucp_context_h context = worker->context; unsigned events; ucs_status_t status; if (!(context->config.features & UCP_FEATURE_WAKEUP)) { worker->event_fd = -1; worker->event_set = NULL; worker->eventfd = -1; worker->uct_events = 0; status = UCS_OK; goto out; } if (params->field_mask & UCP_WORKER_PARAM_FIELD_EVENTS) { events = params->events; } else { events = UCP_WAKEUP_RMA | UCP_WAKEUP_AMO | UCP_WAKEUP_TAG_SEND | UCP_WAKEUP_TAG_RECV | UCP_WAKEUP_TX | UCP_WAKEUP_RX; } if (params->field_mask & UCP_WORKER_PARAM_FIELD_EVENT_FD) { worker->flags |= UCP_WORKER_FLAG_EXTERNAL_EVENT_FD; status = 
ucs_event_set_create_from_fd(&worker->event_set, params->event_fd); } else { status = ucs_event_set_create(&worker->event_set); } if (status != UCS_OK) { goto out; } status = ucs_event_set_fd_get(worker->event_set, &worker->event_fd); if (status != UCS_OK) { goto err_cleanup_event_set; } if (events & UCP_WAKEUP_EDGE) { worker->flags |= UCP_WORKER_FLAG_EDGE_TRIGGERED; } worker->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (worker->eventfd == -1) { ucs_error("Failed to create event fd: %m"); status = UCS_ERR_IO_ERROR; goto err_cleanup_event_set; } ucp_worker_wakeup_ctl_fd(worker, UCP_WORKER_EPFD_OP_ADD, worker->eventfd); worker->uct_events = 0; /* FIXME: any TAG flag initializes all types of completion because of * possible issues in RNDV protocol. The optimization may be * implemented with using of separated UCP descriptors or manual * signaling in RNDV and similar cases, see conversation in PR #1277 */ if ((events & UCP_WAKEUP_TAG_SEND) || ((events & UCP_WAKEUP_TAG_RECV) && (context->config.ext.rndv_thresh != UCS_MEMUNITS_INF))) { worker->uct_events |= UCT_EVENT_SEND_COMP; } if (events & (UCP_WAKEUP_TAG_RECV | UCP_WAKEUP_RX)) { worker->uct_events |= UCT_EVENT_RECV; } if (events & (UCP_WAKEUP_RMA | UCP_WAKEUP_AMO | UCP_WAKEUP_TX)) { worker->uct_events |= UCT_EVENT_SEND_COMP; } return UCS_OK; err_cleanup_event_set: ucs_event_set_cleanup(worker->event_set); worker->event_set = NULL; worker->event_fd = -1; out: return status; } static void ucp_worker_wakeup_cleanup(ucp_worker_h worker) { if (worker->event_set != NULL) { ucs_assert(worker->event_fd != -1); ucs_event_set_cleanup(worker->event_set); worker->event_set = NULL; worker->event_fd = -1; } if (worker->eventfd != -1) { close(worker->eventfd); } } static void ucp_worker_iface_disarm(ucp_worker_iface_t *wiface) { ucs_status_t status; if (wiface->flags & UCP_WORKER_IFACE_FLAG_ON_ARM_LIST) { status = ucp_worker_wakeup_ctl_fd(wiface->worker, UCP_WORKER_EPFD_OP_DEL, wiface->event_fd); ucs_assert_always(status == UCS_OK); ucs_list_del(&wiface->arm_list); wiface->flags &= ~UCP_WORKER_IFACE_FLAG_ON_ARM_LIST; } } static ucs_status_t ucp_worker_wakeup_signal_fd(ucp_worker_h worker) { uint64_t dummy = 1; int ret; ucs_trace_func("worker=%p fd=%d", worker, worker->eventfd); do { ret = write(worker->eventfd, &dummy, sizeof(dummy)); if (ret == sizeof(dummy)) { return UCS_OK; } else if (ret == -1) { if (errno == EAGAIN) { return UCS_OK; } else if (errno != EINTR) { ucs_error("Signaling wakeup failed: %m"); return UCS_ERR_IO_ERROR; } } else { ucs_assert(ret == 0); } } while (ret == 0); return UCS_OK; } void ucp_worker_signal_internal(ucp_worker_h worker) { if (worker->context->config.features & UCP_FEATURE_WAKEUP) { ucp_worker_wakeup_signal_fd(worker); } } static unsigned ucp_worker_iface_err_handle_progress(void *arg) { ucp_worker_err_handle_arg_t *err_handle_arg = arg; ucp_worker_h worker = err_handle_arg->worker; ucp_ep_h ucp_ep = err_handle_arg->ucp_ep; uct_ep_h uct_ep = err_handle_arg->uct_ep; ucs_status_t status = err_handle_arg->status; ucp_lane_index_t failed_lane = err_handle_arg->failed_lane; ucp_lane_index_t lane; ucp_ep_config_key_t key; UCS_ASYNC_BLOCK(&worker->async); ucs_debug("ep %p: handle error on lane[%d]=%p: %s", ucp_ep, failed_lane, uct_ep, ucs_status_string(status)); ucs_assert(ucp_ep->flags & UCP_EP_FLAG_FAILED); /* Destroy all lanes except failed one since ucp_ep becomes unusable as well */ for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) { if (ucp_ep->uct_eps[lane] == NULL) { continue; } /* Purge pending queue */ 
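        /* Each pending request on this lane is handed to
         * ucp_ep_err_pending_purge() and completed with the endpoint's
         * failure status */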
ucs_trace("ep %p: purge pending on uct_ep[%d]=%p", ucp_ep, lane, ucp_ep->uct_eps[lane]); uct_ep_pending_purge(ucp_ep->uct_eps[lane], ucp_ep_err_pending_purge, UCS_STATUS_PTR(status)); if (lane != failed_lane) { ucs_trace("ep %p: destroy uct_ep[%d]=%p", ucp_ep, lane, ucp_ep->uct_eps[lane]); uct_ep_destroy(ucp_ep->uct_eps[lane]); ucp_ep->uct_eps[lane] = NULL; } } /* Move failed lane to index 0 */ if ((failed_lane != 0) && (failed_lane != UCP_NULL_LANE)) { ucp_ep->uct_eps[0] = ucp_ep->uct_eps[failed_lane]; ucp_ep->uct_eps[failed_lane] = NULL; } /* NOTE: if failed ep is wireup auxiliary/sockaddr then we need to replace * the lane with failed ep and destroy wireup ep */ if (ucp_ep->uct_eps[0] != uct_ep) { ucs_assert(ucp_wireup_ep_is_owner(ucp_ep->uct_eps[0], uct_ep)); ucp_wireup_ep_disown(ucp_ep->uct_eps[0], uct_ep); ucs_trace("ep %p: destroy failed wireup ep %p", ucp_ep, ucp_ep->uct_eps[0]); uct_ep_destroy(ucp_ep->uct_eps[0]); ucp_ep->uct_eps[0] = uct_ep; } /* Redirect all lanes to failed one */ key = ucp_ep_config(ucp_ep)->key; key.am_lane = 0; key.wireup_lane = 0; key.tag_lane = 0; key.rma_lanes[0] = 0; key.rma_bw_lanes[0] = 0; key.amo_lanes[0] = 0; key.lanes[0].rsc_index = UCP_NULL_RESOURCE; key.num_lanes = 1; key.status = status; status = ucp_worker_get_ep_config(worker, &key, 0, &ucp_ep->cfg_index); if (status != UCS_OK) { ucs_fatal("ep %p: could not change configuration to error state: %s", ucp_ep, ucs_status_string(status)); } ucp_ep->am_lane = 0; if (!(ucp_ep->flags & UCP_EP_FLAG_USED)) { ucs_debug("ep %p: destroy internal endpoint due to peer failure", ucp_ep); ucp_ep_disconnected(ucp_ep, 1); } else { ucp_ep_invoke_err_cb(ucp_ep, key.status); } ucs_free(err_handle_arg); UCS_ASYNC_UNBLOCK(&worker->async); return 1; } int ucp_worker_err_handle_remove_filter(const ucs_callbackq_elem_t *elem, void *arg) { ucp_worker_err_handle_arg_t *err_handle_arg = elem->arg; if ((elem->cb == ucp_worker_iface_err_handle_progress) && (err_handle_arg->ucp_ep == arg)) { /* release err handling argument to avoid memory leak */ ucs_free(err_handle_arg); return 1; } return 0; } ucs_status_t ucp_worker_set_ep_failed(ucp_worker_h worker, ucp_ep_h ucp_ep, uct_ep_h uct_ep, ucp_lane_index_t lane, ucs_status_t status) { uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL; ucs_status_t ret_status = UCS_OK; ucp_rsc_index_t rsc_index; uct_tl_resource_desc_t *tl_rsc; ucp_worker_err_handle_arg_t *err_handle_arg; if (ucp_ep->flags & UCP_EP_FLAG_FAILED) { goto out_ok; } /* In case if this is a local failure we need to notify remote side */ if (ucp_ep_is_cm_local_connected(ucp_ep)) { ucp_ep_cm_disconnect_cm_lane(ucp_ep); } /* set endpoint to failed to prevent wireup_ep switch */ ucp_ep->flags |= UCP_EP_FLAG_FAILED; if (ucp_ep_config(ucp_ep)->key.err_mode == UCP_ERR_HANDLING_MODE_NONE) { /* NOTE: if user has not requested error handling on the endpoint, * the failure is considered unhandled */ ret_status = status; goto out; } err_handle_arg = ucs_malloc(sizeof(*err_handle_arg), "ucp_worker_err_handle_arg"); if (err_handle_arg == NULL) { ucs_error("failed to allocate ucp_worker_err_handle_arg"); ret_status = UCS_ERR_NO_MEMORY; goto out; } err_handle_arg->worker = worker; err_handle_arg->ucp_ep = ucp_ep; err_handle_arg->uct_ep = uct_ep; err_handle_arg->status = status; err_handle_arg->failed_lane = lane; /* invoke the rest of the error handling flow from the main thread */ uct_worker_progress_register_safe(worker->uct, ucp_worker_iface_err_handle_progress, err_handle_arg, UCS_CALLBACKQ_FLAG_ONESHOT, &prog_id); if 
((ucp_ep_ext_gen(ucp_ep)->err_cb == NULL) && (ucp_ep->flags & UCP_EP_FLAG_USED)) { if (lane != UCP_NULL_LANE) { rsc_index = ucp_ep_get_rsc_index(ucp_ep, lane); tl_rsc = &worker->context->tl_rscs[rsc_index].tl_rsc; ucs_error("error '%s' will not be handled for ep %p - " UCT_TL_RESOURCE_DESC_FMT " since no error callback is installed", ucs_status_string(status), ucp_ep, UCT_TL_RESOURCE_DESC_ARG(tl_rsc)); } else { ucs_assert(uct_ep == NULL); ucs_error("error '%s' occurred on wireup will not be handled for ep %p " "since no error callback is installed", ucs_status_string(status), ucp_ep); } ret_status = status; goto out; } out_ok: ret_status = UCS_OK; out: /* If the worker supports the UCP_FEATURE_WAKEUP feature, signal the user so * that he can wake-up on this event */ ucp_worker_signal_internal(worker); return ret_status; } static ucs_status_t ucp_worker_iface_error_handler(void *arg, uct_ep_h uct_ep, ucs_status_t status) { ucp_worker_h worker = (ucp_worker_h)arg; ucp_lane_index_t lane; ucs_status_t ret_status; ucp_ep_ext_gen_t *ep_ext; ucp_ep_h ucp_ep; UCS_ASYNC_BLOCK(&worker->async); ucs_debug("worker %p: error handler called for uct_ep %p: %s", worker, uct_ep, ucs_status_string(status)); /* TODO: need to optimize uct_ep -> ucp_ep lookup */ ucs_list_for_each(ep_ext, &worker->all_eps, ep_list) { ucp_ep = ucp_ep_from_ext_gen(ep_ext); for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) { if ((uct_ep == ucp_ep->uct_eps[lane]) || ucp_wireup_ep_is_owner(ucp_ep->uct_eps[lane], uct_ep)) { ret_status = ucp_worker_set_ep_failed(worker, ucp_ep, uct_ep, lane, status); UCS_ASYNC_UNBLOCK(&worker->async); return ret_status; } } } ucs_error("no uct_ep_h %p associated with ucp_ep_h on ucp_worker_h %p", uct_ep, worker); UCS_ASYNC_UNBLOCK(&worker->async); return UCS_ERR_NO_ELEM; } void ucp_worker_iface_activate(ucp_worker_iface_t *wiface, unsigned uct_flags) { ucp_worker_h worker = wiface->worker; ucs_status_t status; ucs_trace("activate iface %p acount=%u aifaces=%u", wiface->iface, wiface->activate_count, worker->num_active_ifaces); if (wiface->activate_count++ > 0) { return; /* was already activated */ } /* Stop ongoing activation process, if such exists */ uct_worker_progress_unregister_safe(worker->uct, &wiface->check_events_id); /* Set default active message handlers */ ucp_worker_set_am_handlers(wiface, 0); /* Add to user wakeup */ if (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS) { status = ucp_worker_wakeup_ctl_fd(worker, UCP_WORKER_EPFD_OP_ADD, wiface->event_fd); ucs_assert_always(status == UCS_OK); wiface->flags |= UCP_WORKER_IFACE_FLAG_ON_ARM_LIST; ucs_list_add_tail(&worker->arm_ifaces, &wiface->arm_list); } ++worker->num_active_ifaces; uct_iface_progress_enable(wiface->iface, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV | uct_flags); } static void ucp_worker_iface_deactivate(ucp_worker_iface_t *wiface, int force) { ucs_trace("deactivate iface %p force=%d acount=%u aifaces=%u", wiface->iface, force, wiface->activate_count, wiface->worker->num_active_ifaces); if (!force) { ucs_assert(wiface->activate_count > 0); if (--wiface->activate_count > 0) { return; /* not completely deactivated yet */ } --wiface->worker->num_active_ifaces; } /* Avoid progress on the interface to reduce overhead */ uct_iface_progress_disable(wiface->iface, UCT_PROGRESS_SEND | UCT_PROGRESS_RECV); /* Remove from user wakeup */ ucp_worker_iface_disarm(wiface); /* Set proxy active message handlers to count receives */ ucp_worker_set_am_handlers(wiface, 1); /* Prepare for next receive event */ 
ucp_worker_iface_check_events(wiface, force); } void ucp_worker_iface_progress_ep(ucp_worker_iface_t *wiface) { ucs_trace_func("iface=%p", wiface->iface); UCS_ASYNC_BLOCK(&wiface->worker->async); /* This function may be called from progress thread (such as when processing * wireup messages), so ask UCT to be thread-safe. */ ucp_worker_iface_activate(wiface, UCT_PROGRESS_THREAD_SAFE); UCS_ASYNC_UNBLOCK(&wiface->worker->async); } void ucp_worker_iface_unprogress_ep(ucp_worker_iface_t *wiface) { ucs_trace_func("iface=%p", wiface->iface); UCS_ASYNC_BLOCK(&wiface->worker->async); ucp_worker_iface_deactivate(wiface, 0); UCS_ASYNC_UNBLOCK(&wiface->worker->async); } /* * If active messages were received by am proxy handler, activate the interface. * Otherwise, arm the interface event and make sure that when an active message * is received in the future, the interface would be activated. */ static ucs_status_t ucp_worker_iface_check_events_do(ucp_worker_iface_t *wiface, unsigned *progress_count) { unsigned prev_recv_count; ucs_status_t status; ucs_trace_func("wiface=%p iface=%p", wiface, wiface->iface); if (wiface->activate_count > 0) { ucs_trace("iface %p already activated", wiface->iface); *progress_count = 0; return UCS_OK; } prev_recv_count = wiface->proxy_recv_count; *progress_count = uct_iface_progress(wiface->iface); if (prev_recv_count != wiface->proxy_recv_count) { /* Received relevant active messages, activate the interface */ ucp_worker_iface_activate(wiface, 0); return UCS_OK; } else if (*progress_count == 0) { /* Arm the interface to wait for next event */ ucs_assert(wiface->attr.cap.flags & UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS); status = uct_iface_event_arm(wiface->iface, UCP_WORKER_UCT_RECV_EVENT_ARM_FLAGS); if (status == UCS_OK) { ucs_trace("armed iface %p", wiface->iface); /* re-enable events, which were disabled by ucp_suspended_iface_event() */ status = ucs_async_modify_handler(wiface->event_fd, UCS_EVENT_SET_EVREAD); if (status != UCS_OK) { ucs_fatal("failed to modify %d event handler to UCS_EVENT_SET_EVREAD: %s", wiface->event_fd, ucs_status_string(status)); } return UCS_OK; } else if (status != UCS_ERR_BUSY) { ucs_fatal("failed to arm iface %p: %s", wiface->iface, ucs_status_string(status)); } else { ucs_trace("arm iface %p returned BUSY", wiface->iface); return UCS_ERR_BUSY; } } else { ucs_trace("wiface %p progress returned %u, but no active messages were received", wiface, *progress_count); return UCS_ERR_BUSY; } } static unsigned ucp_worker_iface_check_events_progress(void *arg) { ucp_worker_iface_t *wiface = arg; ucp_worker_h worker = wiface->worker; unsigned progress_count; ucs_status_t status; ucs_trace_func("iface=%p, worker=%p", wiface->iface, worker); /* Check if we either had active messages or were able to arm the interface. * In these cases, the work is done and this progress callback can be removed. */ UCS_ASYNC_BLOCK(&worker->async); status = ucp_worker_iface_check_events_do(wiface, &progress_count); if (status == UCS_OK) { uct_worker_progress_unregister_safe(worker->uct, &wiface->check_events_id); } UCS_ASYNC_UNBLOCK(&worker->async); return progress_count; } void ucp_worker_iface_check_events(ucp_worker_iface_t *wiface, int force) { unsigned progress_count; ucs_status_t status; ucs_trace_func("iface=%p, force=%d", wiface->iface, force); if (force) { do { /* coverity wrongly resolves rc's progress to ucp_listener_conn_request_progress * which in turn releases wiface->iface. 
this leads coverity to assume * that ucp_worker_iface_check_events_do() dereferences a freed pointer * in the subsequent call in the following loop */ /* coverity[freed_arg] */ status = ucp_worker_iface_check_events_do(wiface, &progress_count); ucs_assert(progress_count == 0); } while (status == UCS_ERR_BUSY); ucs_assert(status == UCS_OK); } else { /* Check events on the main progress loop, to make this function safe to * call from async context, and avoid starvation of other progress callbacks. */ uct_worker_progress_register_safe(wiface->worker->uct, ucp_worker_iface_check_events_progress, wiface, 0, &wiface->check_events_id); } } void ucp_worker_iface_event(int fd, void *arg) { ucp_worker_iface_t *wiface = arg; ucp_worker_h worker = wiface->worker; ucs_status_t status; ucs_trace_func("fd=%d iface=%p", fd, wiface->iface); status = ucs_async_modify_handler(wiface->event_fd, 0); if (status != UCS_OK) { ucs_fatal("failed to modify %d event handler to : %s", wiface->event_fd, ucs_status_string(status)); } /* Do more work on the main thread */ ucp_worker_iface_check_events(wiface, 0); /* Signal user wakeup, to report the first message on the interface */ ucp_worker_signal_internal(worker); } static void ucp_worker_uct_iface_close(ucp_worker_iface_t *wiface) { if (wiface->iface != NULL) { uct_iface_close(wiface->iface); wiface->iface = NULL; } } static int ucp_worker_iface_find_better(ucp_worker_h worker, ucp_worker_iface_t *wiface, ucp_rsc_index_t *better_index) { ucp_context_h ctx = worker->context; ucp_rsc_index_t rsc_index; ucp_worker_iface_t *if_iter; uint64_t test_flags; double latency_iter, latency_cur, bw_cur; ucs_assert(wiface != NULL); latency_cur = ucp_worker_iface_latency(worker, wiface); bw_cur = ucp_tl_iface_bandwidth(ctx, &wiface->attr.bandwidth); test_flags = wiface->attr.cap.flags & ~(UCT_IFACE_FLAG_CONNECT_TO_IFACE | UCT_IFACE_FLAG_CONNECT_TO_EP); for (rsc_index = 0; rsc_index < ctx->num_tls; ++rsc_index) { if_iter = worker->ifaces[rsc_index]; /* Need to check resources which belong to the same device only */ if ((ctx->tl_rscs[rsc_index].dev_index != ctx->tl_rscs[wiface->rsc_index].dev_index) || (if_iter->flags & UCP_WORKER_IFACE_FLAG_UNUSED) || (rsc_index == wiface->rsc_index)) { continue; } latency_iter = ucp_worker_iface_latency(worker, if_iter); /* Check that another iface: */ if (/* 1. Supports all capabilities of the target iface (at least), * except ...CONNECT_TO... caps. */ ucs_test_all_flags(if_iter->attr.cap.flags, test_flags) && /* 2. Has the same or better performance characteristics */ (if_iter->attr.overhead <= wiface->attr.overhead) && (ucp_tl_iface_bandwidth(ctx, &if_iter->attr.bandwidth) >= bw_cur) && /* swap latencies in args list since less is better */ (ucp_score_prio_cmp(latency_cur, if_iter->attr.priority, latency_iter, wiface->attr.priority) >= 0) && /* 3. The found transport is scalable enough or both * transport are unscalable */ (ucp_is_scalable_transport(ctx, if_iter->attr.max_num_eps) || !ucp_is_scalable_transport(ctx, wiface->attr.max_num_eps))) { *better_index = rsc_index; /* Do not check this iface anymore, because better one exists. * It helps to avoid the case when two interfaces with the same * caps and performance exclude each other. */ wiface->flags |= UCP_WORKER_IFACE_FLAG_UNUSED; return 1; } } /* Better resource wasn't found */ *better_index = 0; return 0; } /** * @brief Find the minimal possible set of tl interfaces for each device * * @param [in] worker UCP worker. * @param [out] tl_bitmap Map of the relevant tl resources. 
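 *                        Interfaces that are strictly dominated by another
 *                        interface on the same device (same capabilities,
 *                        equal or better performance) are closed and excluded
 *                        from this bitmap.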
* * @return Error code as defined by @ref ucs_status_t */ static void ucp_worker_select_best_ifaces(ucp_worker_h worker, uint64_t *tl_bitmap_p) { ucp_context_h context = worker->context; uint64_t tl_bitmap = 0; ucp_rsc_index_t repl_ifaces[UCP_MAX_RESOURCES]; ucp_worker_iface_t *wiface; ucp_rsc_index_t tl_id, iface_id; /* For each iface check whether there is another iface, which: * 1. Supports at least the same capabilities * 2. Provides equivalent or better performance */ for (tl_id = 0; tl_id < context->num_tls; ++tl_id) { wiface = worker->ifaces[tl_id]; if (!ucp_worker_iface_find_better(worker, wiface, &repl_ifaces[tl_id])) { tl_bitmap |= UCS_BIT(tl_id); } } *tl_bitmap_p = tl_bitmap; worker->num_ifaces = ucs_popcount(tl_bitmap); ucs_assert(worker->num_ifaces <= context->num_tls); if (worker->num_ifaces == context->num_tls) { return; } ucs_assert(worker->num_ifaces < context->num_tls); /* Some ifaces need to be closed */ for (tl_id = 0, iface_id = 0; tl_id < context->num_tls; ++tl_id) { wiface = worker->ifaces[tl_id]; if (tl_bitmap & UCS_BIT(tl_id)) { if (iface_id != tl_id) { worker->ifaces[iface_id] = wiface; } ++iface_id; } else { ucs_debug("closing resource[%d] "UCT_TL_RESOURCE_DESC_FMT ", since resource[%d] "UCT_TL_RESOURCE_DESC_FMT " is better, worker %p", tl_id, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[tl_id].tl_rsc), repl_ifaces[tl_id], UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[repl_ifaces[tl_id]].tl_rsc), worker); /* Ifaces should not be initialized yet, just close it * (no need for cleanup) */ ucp_worker_uct_iface_close(wiface); ucs_free(wiface); } } } /** * @brief Open all resources as interfaces on this worker * * This routine opens interfaces on the tl resources according to the * bitmap in the context. If bitmap is not set, the routine opens interfaces * on all available resources and select the best ones. Then it caches obtained * bitmap on the context, so the next workers could use it instead of * constructing it themselves. * * @param [in] worker UCP worker. * * @return Error code as defined by @ref ucs_status_t */ static ucs_status_t ucp_worker_add_resource_ifaces(ucp_worker_h worker) { ucp_context_h context = worker->context; ucp_tl_resource_desc_t *resource; uct_iface_params_t iface_params; ucp_rsc_index_t tl_id, iface_id; ucp_worker_iface_t *wiface; uint64_t ctx_tl_bitmap, tl_bitmap; unsigned num_ifaces; ucs_status_t status; /* If tl_bitmap is already set, just use it. Otherwise open ifaces on all * available resources and then select the best ones. 
*/ ctx_tl_bitmap = context->tl_bitmap; if (ctx_tl_bitmap) { num_ifaces = ucs_popcount(ctx_tl_bitmap); tl_bitmap = ctx_tl_bitmap; } else { num_ifaces = context->num_tls; tl_bitmap = UCS_MASK(context->num_tls); } worker->ifaces = ucs_calloc(num_ifaces, sizeof(*worker->ifaces), "ucp ifaces array"); if (worker->ifaces == NULL) { ucs_error("failed to allocate worker ifaces"); return UCS_ERR_NO_MEMORY; } worker->num_ifaces = num_ifaces; iface_id = 0; ucs_for_each_bit(tl_id, tl_bitmap) { iface_params.field_mask = UCT_IFACE_PARAM_FIELD_OPEN_MODE; resource = &context->tl_rscs[tl_id]; if (resource->flags & UCP_TL_RSC_FLAG_SOCKADDR) { iface_params.open_mode = UCT_IFACE_OPEN_MODE_SOCKADDR_CLIENT; } else { iface_params.open_mode = UCT_IFACE_OPEN_MODE_DEVICE; iface_params.field_mask |= UCT_IFACE_PARAM_FIELD_DEVICE; iface_params.mode.device.tl_name = resource->tl_rsc.tl_name; iface_params.mode.device.dev_name = resource->tl_rsc.dev_name; } status = ucp_worker_iface_open(worker, tl_id, &iface_params, &worker->ifaces[iface_id++]); if (status != UCS_OK) { return status; } } if (!ctx_tl_bitmap) { /* Context bitmap is not set, need to select the best tl resources */ tl_bitmap = 0; ucp_worker_select_best_ifaces(worker, &tl_bitmap); ucs_assert(tl_bitmap); /* Cache tl_bitmap on the context, so the next workers would not need * to select best ifaces. */ context->tl_bitmap = tl_bitmap; ucs_debug("selected tl bitmap: 0x%lx (%d tls)", tl_bitmap, ucs_popcount(tl_bitmap)); } worker->scalable_tl_bitmap = 0; ucs_for_each_bit(tl_id, context->tl_bitmap) { wiface = ucp_worker_iface(worker, tl_id); if (ucp_is_scalable_transport(context, wiface->attr.max_num_eps)) { worker->scalable_tl_bitmap |= UCS_BIT(tl_id); } } ucs_debug("selected scalable tl bitmap: 0x%lx (%d tls)", worker->scalable_tl_bitmap, ucs_popcount(worker->scalable_tl_bitmap)); iface_id = 0; ucs_for_each_bit(tl_id, tl_bitmap) { status = ucp_worker_iface_init(worker, tl_id, worker->ifaces[iface_id++]); if (status != UCS_OK) { return status; } } return UCS_OK; } static void ucp_worker_close_ifaces(ucp_worker_h worker) { ucp_rsc_index_t iface_id; ucp_worker_iface_t *wiface; UCS_ASYNC_BLOCK(&worker->async); for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { wiface = worker->ifaces[iface_id]; if (wiface != NULL) { ucp_worker_iface_cleanup(wiface); } } ucs_free(worker->ifaces); UCS_ASYNC_UNBLOCK(&worker->async); } ucs_status_t ucp_worker_iface_open(ucp_worker_h worker, ucp_rsc_index_t tl_id, uct_iface_params_t *iface_params, ucp_worker_iface_t **wiface_p) { ucp_context_h context = worker->context; ucp_tl_resource_desc_t *resource = &context->tl_rscs[tl_id]; uct_md_h md = context->tl_mds[resource->md_index].md; uct_iface_config_t *iface_config; const char *cfg_tl_name; ucp_worker_iface_t *wiface; ucs_status_t status; wiface = ucs_calloc(1, sizeof(*wiface), "ucp_iface"); if (wiface == NULL) { return UCS_ERR_NO_MEMORY; } wiface->rsc_index = tl_id; wiface->worker = worker; wiface->event_fd = -1; wiface->activate_count = 0; wiface->check_events_id = UCS_CALLBACKQ_ID_NULL; wiface->proxy_recv_count = 0; wiface->post_count = 0; wiface->flags = 0; /* Read interface or md configuration */ if (resource->flags & UCP_TL_RSC_FLAG_SOCKADDR) { cfg_tl_name = NULL; } else { cfg_tl_name = resource->tl_rsc.tl_name; } status = uct_md_iface_config_read(md, cfg_tl_name, NULL, NULL, &iface_config); if (status != UCS_OK) { goto err_free_iface; } UCS_STATIC_ASSERT(UCP_WORKER_HEADROOM_PRIV_SIZE >= sizeof(ucp_eager_sync_hdr_t)); /* Fill rest of uct_iface params (caller should fill 
specific mode fields) */ iface_params->field_mask |= UCT_IFACE_PARAM_FIELD_STATS_ROOT | UCT_IFACE_PARAM_FIELD_RX_HEADROOM | UCT_IFACE_PARAM_FIELD_ERR_HANDLER_ARG | UCT_IFACE_PARAM_FIELD_ERR_HANDLER | UCT_IFACE_PARAM_FIELD_ERR_HANDLER_FLAGS | UCT_IFACE_PARAM_FIELD_HW_TM_EAGER_ARG | UCT_IFACE_PARAM_FIELD_HW_TM_RNDV_ARG | UCT_IFACE_PARAM_FIELD_HW_TM_RNDV_CB | UCT_IFACE_PARAM_FIELD_HW_TM_EAGER_CB | UCT_IFACE_PARAM_FIELD_CPU_MASK; iface_params->stats_root = UCS_STATS_RVAL(worker->stats); iface_params->rx_headroom = UCP_WORKER_HEADROOM_SIZE; iface_params->err_handler_arg = worker; iface_params->err_handler = ucp_worker_iface_error_handler; iface_params->err_handler_flags = UCT_CB_FLAG_ASYNC; iface_params->eager_arg = iface_params->rndv_arg = wiface; iface_params->eager_cb = ucp_tag_offload_unexp_eager; iface_params->rndv_cb = ucp_tag_offload_unexp_rndv; iface_params->cpu_mask = worker->cpu_mask; /* Open UCT interface */ status = uct_iface_open(md, worker->uct, iface_params, iface_config, &wiface->iface); uct_config_release(iface_config); if (status != UCS_OK) { goto err_free_iface; } VALGRIND_MAKE_MEM_UNDEFINED(&wiface->attr, sizeof(wiface->attr)); status = uct_iface_query(wiface->iface, &wiface->attr); if (status != UCS_OK) { goto err_close_iface; } ucs_debug("created interface[%d]=%p using "UCT_TL_RESOURCE_DESC_FMT" on worker %p", tl_id, wiface->iface, UCT_TL_RESOURCE_DESC_ARG(&resource->tl_rsc), worker); *wiface_p = wiface; return UCS_OK; err_close_iface: uct_iface_close(wiface->iface); err_free_iface: ucs_free(wiface); return status; } ucs_status_t ucp_worker_iface_init(ucp_worker_h worker, ucp_rsc_index_t tl_id, ucp_worker_iface_t *wiface) { ucp_context_h context = worker->context; ucp_tl_resource_desc_t *resource = &context->tl_rscs[tl_id]; ucs_status_t status; ucs_assert(wiface != NULL); /* Set wake-up handlers */ if (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS) { status = uct_iface_event_fd_get(wiface->iface, &wiface->event_fd); if (status != UCS_OK) { goto out_close_iface; } /* Register event handler without actual events so we could modify it later. */ status = ucs_async_set_event_handler(worker->async.mode, wiface->event_fd, 0, ucp_worker_iface_event, wiface, &worker->async); if (status != UCS_OK) { ucs_fatal("failed to register event handler: %s", ucs_status_string(status)); } } /* Set active message handlers */ if ((wiface->attr.cap.flags & (UCT_IFACE_FLAG_AM_SHORT| UCT_IFACE_FLAG_AM_BCOPY| UCT_IFACE_FLAG_AM_ZCOPY))) { status = uct_iface_set_am_tracer(wiface->iface, ucp_worker_am_tracer, worker); if (status != UCS_OK) { goto out_close_iface; } if (context->config.ext.adaptive_progress && (wiface->attr.cap.flags & UCP_WORKER_UCT_RECV_EVENT_CAP_FLAGS)) { ucp_worker_iface_deactivate(wiface, 1); } else { ucp_worker_iface_activate(wiface, 0); } } context->mem_type_access_tls[context->tl_mds[resource->md_index]. 
attr.cap.access_mem_type] |= UCS_BIT(tl_id); return UCS_OK; out_close_iface: ucp_worker_uct_iface_close(wiface); return status; } void ucp_worker_iface_cleanup(ucp_worker_iface_t *wiface) { ucs_status_t status; uct_worker_progress_unregister_safe(wiface->worker->uct, &wiface->check_events_id); ucp_worker_iface_disarm(wiface); if ((wiface->event_fd != -1) && (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS)) { status = ucs_async_remove_handler(wiface->event_fd, 1); if (status != UCS_OK) { ucs_warn("failed to remove event handler for fd %d: %s", wiface->event_fd, ucs_status_string(status)); } } ucp_worker_uct_iface_close(wiface); ucs_free(wiface); } static void ucp_worker_close_cms(ucp_worker_h worker) { const ucp_rsc_index_t num_cms = ucp_worker_num_cm_cmpts(worker); ucp_rsc_index_t i; for (i = 0; (i < num_cms) && (worker->cms[i].cm != NULL); ++i) { uct_cm_close(worker->cms[i].cm); } ucs_free(worker->cms); worker->cms = NULL; } static ucs_status_t ucp_worker_add_resource_cms(ucp_worker_h worker) { ucp_context_h context = worker->context; uct_cm_config_t *cm_config; uct_component_h cmpt; ucp_rsc_index_t cmpt_index, cm_cmpt_index, i; ucs_status_t status; if (!ucp_worker_sockaddr_is_cm_proto(worker)) { worker->cms = NULL; return UCS_OK; } UCS_ASYNC_BLOCK(&worker->async); worker->cms = ucs_calloc(ucp_worker_num_cm_cmpts(worker), sizeof(*worker->cms), "ucp cms"); if (worker->cms == NULL) { ucs_error("can't allocate CMs array"); status = UCS_ERR_NO_MEMORY; goto out; } for (i = 0, cm_cmpt_index = 0; cm_cmpt_index < context->config.num_cm_cmpts; ++cm_cmpt_index) { cmpt_index = context->config.cm_cmpt_idxs[cm_cmpt_index]; cmpt = context->tl_cmpts[cmpt_index].cmpt; status = uct_cm_config_read(cmpt, NULL, NULL, &cm_config); if (status != UCS_OK) { ucs_error("failed to read cm configuration on component %s", context->tl_cmpts[cmpt_index].attr.name); goto err_free_cms; } status = uct_cm_open(cmpt, worker->uct, cm_config, &worker->cms[i].cm); if (status != UCS_OK) { ucs_error("failed to open CM on component %s with status %s", context->tl_cmpts[cmpt_index].attr.name, ucs_status_string(status)); goto err_free_cms; } uct_config_release(cm_config); worker->cms[i++].cmpt_idx = cmpt_index; } status = UCS_OK; goto out; err_free_cms: ucp_worker_close_cms(worker); out: UCS_ASYNC_UNBLOCK(&worker->async); return status; } static void ucp_worker_enable_atomic_tl(ucp_worker_h worker, const char *mode, ucp_rsc_index_t rsc_index) { ucs_assert(rsc_index != UCP_NULL_RESOURCE); ucs_trace("worker %p: using %s atomics on iface[%d]=" UCT_TL_RESOURCE_DESC_FMT, worker, mode, rsc_index, UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[rsc_index].tl_rsc)); worker->atomic_tls |= UCS_BIT(rsc_index); } static void ucp_worker_init_cpu_atomics(ucp_worker_h worker) { ucp_rsc_index_t iface_id; ucp_worker_iface_t *wiface; ucs_debug("worker %p: using cpu atomics", worker); /* Enable all interfaces which have host-based atomics */ for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { wiface = worker->ifaces[iface_id]; if (wiface->attr.cap.flags & UCT_IFACE_FLAG_ATOMIC_CPU) { ucp_worker_enable_atomic_tl(worker, "cpu", wiface->rsc_index); } } } static void ucp_worker_init_device_atomics(ucp_worker_h worker) { ucp_context_h context = worker->context; ucp_address_iface_attr_t dummy_iface_attr; ucp_tl_resource_desc_t *rsc, *best_rsc; uct_iface_attr_t *iface_attr; ucp_rsc_index_t rsc_index; ucp_rsc_index_t iface_id; uint64_t iface_cap_flags; double score, best_score; ucp_rsc_index_t md_index; ucp_worker_iface_t *wiface; 
uct_md_attr_t *md_attr; uint64_t supp_tls; uint8_t priority, best_priority; ucp_tl_iface_atomic_flags_t atomic; ucp_context_uct_atomic_iface_flags(context, &atomic); iface_cap_flags = UCT_IFACE_FLAG_ATOMIC_DEVICE; dummy_iface_attr.bandwidth.dedicated = 1e12; dummy_iface_attr.bandwidth.shared = 0; dummy_iface_attr.cap_flags = UINT64_MAX; dummy_iface_attr.overhead = 0; dummy_iface_attr.priority = 0; dummy_iface_attr.lat_ovh = 0; supp_tls = 0; best_score = -1; best_rsc = NULL; best_priority = 0; /* Select best interface for atomics device */ for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { wiface = worker->ifaces[iface_id]; rsc_index = wiface->rsc_index; rsc = &context->tl_rscs[rsc_index]; md_index = rsc->md_index; md_attr = &context->tl_mds[md_index].attr; iface_attr = &wiface->attr; if (!(md_attr->cap.flags & UCT_MD_FLAG_REG) || !ucs_test_all_flags(iface_attr->cap.flags, iface_cap_flags) || !ucs_test_all_flags(iface_attr->cap.atomic32.op_flags, atomic.atomic32.op_flags) || !ucs_test_all_flags(iface_attr->cap.atomic32.fop_flags, atomic.atomic32.fop_flags) || !ucs_test_all_flags(iface_attr->cap.atomic64.op_flags, atomic.atomic64.op_flags) || !ucs_test_all_flags(iface_attr->cap.atomic64.fop_flags, atomic.atomic64.fop_flags)) { continue; } supp_tls |= UCS_BIT(rsc_index); priority = iface_attr->priority; score = ucp_wireup_amo_score_func(context, md_attr, iface_attr, &dummy_iface_attr); if (ucp_is_scalable_transport(worker->context, iface_attr->max_num_eps) && ((score > best_score) || ((score == best_score) && (priority > best_priority)))) { best_rsc = rsc; best_score = score; best_priority = priority; } } if (best_rsc == NULL) { ucs_debug("worker %p: no support for atomics", worker); return; } ucs_debug("worker %p: using device atomics", worker); /* Enable atomics on all resources using same device as the "best" resource */ ucs_for_each_bit(rsc_index, context->tl_bitmap) { rsc = &context->tl_rscs[rsc_index]; if ((supp_tls & UCS_BIT(rsc_index)) && (rsc->md_index == best_rsc->md_index) && !strncmp(rsc->tl_rsc.dev_name, best_rsc->tl_rsc.dev_name, UCT_DEVICE_NAME_MAX)) { ucp_worker_enable_atomic_tl(worker, "device", rsc_index); } } } static void ucp_worker_init_guess_atomics(ucp_worker_h worker) { uint64_t accumulated_flags = 0; ucp_rsc_index_t iface_id; for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { if (ucp_is_scalable_transport(worker->context, worker->ifaces[iface_id]->attr.max_num_eps)) { accumulated_flags |= worker->ifaces[iface_id]->attr.cap.flags; } } if (accumulated_flags & UCT_IFACE_FLAG_ATOMIC_DEVICE) { ucp_worker_init_device_atomics(worker); } else { ucp_worker_init_cpu_atomics(worker); } } static void ucp_worker_init_atomic_tls(ucp_worker_h worker) { ucp_context_h context = worker->context; worker->atomic_tls = 0; if (context->config.features & UCP_FEATURE_AMO) { switch(context->config.ext.atomic_mode) { case UCP_ATOMIC_MODE_CPU: ucp_worker_init_cpu_atomics(worker); break; case UCP_ATOMIC_MODE_DEVICE: ucp_worker_init_device_atomics(worker); break; case UCP_ATOMIC_MODE_GUESS: ucp_worker_init_guess_atomics(worker); break; default: ucs_fatal("unsupported atomic mode: %d", context->config.ext.atomic_mode); } } } static char* ucp_worker_add_feature_rsc(ucp_context_h context, const ucp_ep_config_key_t *key, ucp_lane_map_t lanes_bitmap, const char *feature_str, char *buf, size_t max) { char *p = buf; char *endp = buf + max; int sep = 0; ucp_rsc_index_t rsc_idx; ucp_lane_index_t lane; if (!lanes_bitmap) { return p; } snprintf(p, endp - p, "%s(", feature_str); p 
+= strlen(p); ucs_for_each_bit(lane, lanes_bitmap) { ucs_assert(lane < UCP_MAX_LANES); /* make coverity happy */ rsc_idx = key->lanes[lane].rsc_index; snprintf(p, endp - p, "%*s"UCT_TL_RESOURCE_DESC_FMT, sep, "", UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_idx].tl_rsc)); p += strlen(p); sep = 1; /* add space between tl names */ } snprintf(p, endp - p, "); "); p += strlen(p); return p; } static void ucp_worker_print_used_tls(const ucp_ep_config_key_t *key, ucp_context_h context, ucp_ep_cfg_index_t config_idx) { char info[256] = {0}; ucp_lane_map_t tag_lanes_map = 0; ucp_lane_map_t rma_lanes_map = 0; ucp_lane_map_t amo_lanes_map = 0; ucp_lane_map_t stream_lanes_map = 0; ucp_lane_index_t lane; char *p, *endp; if (!ucs_log_is_enabled(UCS_LOG_LEVEL_INFO)) { return; } p = info; endp = p + sizeof(info); snprintf(p, endp - p, "ep_cfg[%d]: ", config_idx); p += strlen(p); for (lane = 0; lane < key->num_lanes; ++lane) { if (((key->am_lane == lane) || (lane == key->tag_lane) || (ucp_ep_config_get_multi_lane_prio(key->am_bw_lanes, lane) >= 0) || (ucp_ep_config_get_multi_lane_prio(key->rma_bw_lanes, lane) >= 0)) && (context->config.features & UCP_FEATURE_TAG)) { tag_lanes_map |= UCS_BIT(lane); } if ((key->am_lane == lane) && (context->config.features & UCP_FEATURE_STREAM)) { stream_lanes_map |= UCS_BIT(lane); } if ((ucp_ep_config_get_multi_lane_prio(key->rma_lanes, lane) >= 0)) { rma_lanes_map |= UCS_BIT(lane); } if ((ucp_ep_config_get_multi_lane_prio(key->amo_lanes, lane) >= 0)) { amo_lanes_map |= UCS_BIT(lane); } } p = ucp_worker_add_feature_rsc(context, key, tag_lanes_map, "tag", p, endp - p); p = ucp_worker_add_feature_rsc(context, key, rma_lanes_map, "rma", p, endp - p); p = ucp_worker_add_feature_rsc(context, key, amo_lanes_map, "amo", p, endp - p); ucp_worker_add_feature_rsc(context, key, stream_lanes_map, "stream", p, endp - p); ucs_info("%s", info); } static ucs_status_t ucp_worker_init_mpools(ucp_worker_h worker) { size_t max_mp_entry_size = 0; ucp_context_t *context = worker->context; uct_iface_attr_t *if_attr; ucp_rsc_index_t iface_id; ucs_status_t status; for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) { if_attr = &worker->ifaces[iface_id]->attr; max_mp_entry_size = ucs_max(max_mp_entry_size, if_attr->cap.am.max_short); max_mp_entry_size = ucs_max(max_mp_entry_size, if_attr->cap.am.max_bcopy); max_mp_entry_size = ucs_max(max_mp_entry_size, if_attr->cap.am.max_zcopy); } status = ucs_mpool_init(&worker->am_mp, 0, max_mp_entry_size + UCP_WORKER_HEADROOM_SIZE, 0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX, &ucp_am_mpool_ops, "ucp_am_bufs"); if (status != UCS_OK) { goto out; } status = ucs_mpool_init(&worker->reg_mp, 0, context->config.ext.seg_size + sizeof(ucp_mem_desc_t), sizeof(ucp_mem_desc_t), UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX, &ucp_reg_mpool_ops, "ucp_reg_bufs"); if (status != UCS_OK) { goto err_release_am_mpool; } status = ucs_mpool_init(&worker->rndv_frag_mp, 0, context->config.ext.rndv_frag_size + sizeof(ucp_mem_desc_t), sizeof(ucp_mem_desc_t), UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX, &ucp_frag_mpool_ops, "ucp_rndv_frags"); if (status != UCS_OK) { goto err_release_reg_mpool; } return UCS_OK; err_release_reg_mpool: ucs_mpool_cleanup(&worker->reg_mp, 0); err_release_am_mpool: ucs_mpool_cleanup(&worker->am_mp, 0); out: return status; } /* All the ucp endpoints will share the configurations. No need for every ep to * have it's own configuration (to save memory footprint). Same config can be used * by different eps. * A 'key' identifies an entry in the ep_config array. 
An entry holds the key and * additional configuration parameters and thresholds. */ ucs_status_t ucp_worker_get_ep_config(ucp_worker_h worker, const ucp_ep_config_key_t *key, int print_cfg, ucp_ep_cfg_index_t *config_idx_p) { ucp_ep_cfg_index_t config_idx; ucs_status_t status; /* Search for the given key in the ep_config array */ for (config_idx = 0; config_idx < worker->ep_config_count; ++config_idx) { if (ucp_ep_config_is_equal(&worker->ep_config[config_idx].key, key)) { goto out; } } if (worker->ep_config_count >= worker->ep_config_max) { /* TODO support larger number of configurations */ ucs_fatal("too many ep configurations: %d", worker->ep_config_count); } /* Create new configuration */ config_idx = worker->ep_config_count++; status = ucp_ep_config_init(worker, &worker->ep_config[config_idx], key); if (status != UCS_OK) { return status; } if (print_cfg) { ucp_worker_print_used_tls(key, worker->context, config_idx); } out: *config_idx_p = config_idx; return UCS_OK; } static ucs_mpool_ops_t ucp_rkey_mpool_ops = { .chunk_alloc = ucs_mpool_chunk_malloc, .chunk_release = ucs_mpool_chunk_free, .obj_init = NULL, .obj_cleanup = NULL }; ucs_status_t ucp_worker_create(ucp_context_h context, const ucp_worker_params_t *params, ucp_worker_h *worker_p) { ucs_thread_mode_t uct_thread_mode; unsigned config_count; unsigned name_length; ucp_worker_h worker; ucs_status_t status; config_count = ucs_min((context->num_tls + 1) * (context->num_tls + 1) * context->num_tls, UINT8_MAX); worker = ucs_calloc(1, sizeof(*worker) + sizeof(*worker->ep_config) * config_count, "ucp worker"); if (worker == NULL) { return UCS_ERR_NO_MEMORY; } uct_thread_mode = UCS_THREAD_MODE_SINGLE; worker->flags = 0; if (params->field_mask & UCP_WORKER_PARAM_FIELD_THREAD_MODE) { #if ENABLE_MT if (params->thread_mode != UCS_THREAD_MODE_SINGLE) { /* UCT is serialized by UCP lock or by UCP user */ uct_thread_mode = UCS_THREAD_MODE_SERIALIZED; } if (params->thread_mode == UCS_THREAD_MODE_MULTI) { worker->flags |= UCP_WORKER_FLAG_MT; } #else if (params->thread_mode != UCS_THREAD_MODE_SINGLE) { ucs_debug("forced single thread mode on worker create"); } #endif } worker->context = context; worker->uuid = ucs_generate_uuid((uintptr_t)worker); worker->flush_ops_count = 0; worker->inprogress = 0; worker->ep_config_max = config_count; worker->ep_config_count = 0; worker->num_active_ifaces = 0; worker->num_ifaces = 0; worker->am_message_id = ucs_generate_uuid(0); ucs_list_head_init(&worker->arm_ifaces); ucs_list_head_init(&worker->stream_ready_eps); ucs_list_head_init(&worker->all_eps); ucp_ep_match_init(&worker->ep_match_ctx); UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t)); if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) { UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_proto_t) <= sizeof(ucp_ep_t)); ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 3); } else { ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 2); } if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) { worker->user_data = params->user_data; } else { worker->user_data = NULL; } if (context->config.features & UCP_FEATURE_AM){ worker->am_cbs = NULL; worker->am_cb_array_len = 0; } name_length = ucs_min(UCP_WORKER_NAME_MAX, context->config.ext.max_worker_name + 1); ucs_snprintf_zero(worker->name, name_length, "%s:%d", ucs_get_host_name(), getpid()); /* Create statistics */ status = UCS_STATS_NODE_ALLOC(&worker->stats, &ucp_worker_stats_class, ucs_stats_get_root(), "-%p", worker); if (status != UCS_OK) { goto err_free; } status = 
UCS_STATS_NODE_ALLOC(&worker->tm_offload_stats, &ucp_worker_tm_offload_stats_class, worker->stats); if (status != UCS_OK) { goto err_free_stats; } status = ucs_async_context_init(&worker->async, context->config.ext.use_mt_mutex ? UCS_ASYNC_MODE_THREAD_MUTEX : UCS_ASYNC_THREAD_LOCK_TYPE); if (status != UCS_OK) { goto err_free_tm_offload_stats; } /* Create the underlying UCT worker */ status = uct_worker_create(&worker->async, uct_thread_mode, &worker->uct); if (status != UCS_OK) { goto err_destroy_async; } /* Create memory pool for requests */ status = ucs_mpool_init(&worker->req_mp, 0, sizeof(ucp_request_t) + context->config.request.size, 0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX, &ucp_request_mpool_ops, "ucp_requests"); if (status != UCS_OK) { goto err_destroy_uct_worker; } /* create memory pool for small rkeys */ status = ucs_mpool_init(&worker->rkey_mp, 0, sizeof(ucp_rkey_t) + sizeof(ucp_tl_rkey_t) * UCP_RKEY_MPOOL_MAX_MD, 0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX, &ucp_rkey_mpool_ops, "ucp_rkeys"); if (status != UCS_OK) { goto err_req_mp_cleanup; } /* Create UCS event set which combines events from all transports */ status = ucp_worker_wakeup_init(worker, params); if (status != UCS_OK) { goto err_rkey_mp_cleanup; } if (params->field_mask & UCP_WORKER_PARAM_FIELD_CPU_MASK) { worker->cpu_mask = params->cpu_mask; } else { UCS_CPU_ZERO(&worker->cpu_mask); } /* Initialize tag matching */ status = ucp_tag_match_init(&worker->tm); if (status != UCS_OK) { goto err_wakeup_cleanup; } /* Open all resources as interfaces on this worker */ status = ucp_worker_add_resource_ifaces(worker); if (status != UCS_OK) { goto err_close_ifaces; } /* Open all resources as connection managers on this worker */ status = ucp_worker_add_resource_cms(worker); if (status != UCS_OK) { goto err_close_cms; } /* create mem type endponts */ status = ucp_worker_create_mem_type_endpoints(worker); if (status != UCS_OK) { goto err_close_cms; } /* Init AM and registered memory pools */ status = ucp_worker_init_mpools(worker); if (status != UCS_OK) { goto err_close_cms; } /* Select atomic resources */ ucp_worker_init_atomic_tls(worker); /* At this point all UCT memory domains and interfaces are already created * so warn about unused environment variables. 
*/ ucs_config_parser_warn_unused_env_vars_once(); *worker_p = worker; return UCS_OK; err_close_cms: ucp_worker_close_cms(worker); err_close_ifaces: ucp_worker_close_ifaces(worker); ucp_tag_match_cleanup(&worker->tm); err_wakeup_cleanup: ucp_worker_wakeup_cleanup(worker); err_rkey_mp_cleanup: ucs_mpool_cleanup(&worker->rkey_mp, 1); err_req_mp_cleanup: ucs_mpool_cleanup(&worker->req_mp, 1); err_destroy_uct_worker: uct_worker_destroy(worker->uct); err_destroy_async: ucs_async_context_cleanup(&worker->async); err_free_tm_offload_stats: UCS_STATS_NODE_FREE(worker->tm_offload_stats); err_free_stats: UCS_STATS_NODE_FREE(worker->stats); err_free: ucs_strided_alloc_cleanup(&worker->ep_alloc); ucs_free(worker); return status; } static void ucp_worker_destroy_eps(ucp_worker_h worker) { ucp_ep_ext_gen_t *ep_ext, *tmp; ucs_debug("worker %p: destroy all endpoints", worker); ucs_list_for_each_safe(ep_ext, tmp, &worker->all_eps, ep_list) { ucp_ep_disconnected(ucp_ep_from_ext_gen(ep_ext), 1); } } static void ucp_worker_destroy_ep_configs(ucp_worker_h worker) { unsigned i; for (i = 0; i < worker->ep_config_count; ++i) { ucp_ep_config_cleanup(worker, &worker->ep_config[i]); } worker->ep_config_count = 0; } void ucp_worker_destroy(ucp_worker_h worker) { ucs_trace_func("worker=%p", worker); UCS_ASYNC_BLOCK(&worker->async); ucs_free(worker->am_cbs); ucp_worker_destroy_eps(worker); ucp_worker_remove_am_handlers(worker); ucp_worker_close_cms(worker); UCS_ASYNC_UNBLOCK(&worker->async); ucp_worker_destroy_ep_configs(worker); ucs_mpool_cleanup(&worker->am_mp, 1); ucs_mpool_cleanup(&worker->reg_mp, 1); ucs_mpool_cleanup(&worker->rndv_frag_mp, 1); ucp_worker_close_ifaces(worker); ucp_tag_match_cleanup(&worker->tm); ucp_worker_wakeup_cleanup(worker); ucs_mpool_cleanup(&worker->rkey_mp, 1); ucs_mpool_cleanup(&worker->req_mp, 1); uct_worker_destroy(worker->uct); ucs_async_context_cleanup(&worker->async); ucp_ep_match_cleanup(&worker->ep_match_ctx); ucs_strided_alloc_cleanup(&worker->ep_alloc); UCS_STATS_NODE_FREE(worker->tm_offload_stats); UCS_STATS_NODE_FREE(worker->stats); ucs_free(worker); } ucs_status_t ucp_worker_query(ucp_worker_h worker, ucp_worker_attr_t *attr) { ucp_context_h context = worker->context; ucs_status_t status = UCS_OK; uint64_t tl_bitmap; ucp_rsc_index_t tl_id; if (attr->field_mask & UCP_WORKER_ATTR_FIELD_THREAD_MODE) { if (worker->flags & UCP_WORKER_FLAG_MT) { attr->thread_mode = UCS_THREAD_MODE_MULTI; } else { attr->thread_mode = UCS_THREAD_MODE_SINGLE; } } if (attr->field_mask & UCP_WORKER_ATTR_FIELD_ADDRESS) { /* If UCP_WORKER_ATTR_FIELD_ADDRESS_FLAGS is not set, * pack all tl adresses */ tl_bitmap = UINT64_MAX; if (attr->field_mask & UCP_WORKER_ATTR_FIELD_ADDRESS_FLAGS) { if (attr->address_flags & UCP_WORKER_ADDRESS_FLAG_NET_ONLY) { tl_bitmap = 0; ucs_for_each_bit(tl_id, context->tl_bitmap) { if (context->tl_rscs[tl_id].tl_rsc.dev_type == UCT_DEVICE_TYPE_NET) { tl_bitmap |= UCS_BIT(tl_id); } } } } status = ucp_address_pack(worker, NULL, tl_bitmap, UCP_ADDRESS_PACK_FLAG_ALL, NULL, &attr->address_length, (void**)&attr->address); } return status; } unsigned ucp_worker_progress(ucp_worker_h worker) { unsigned count; /* worker->inprogress is used only for assertion check. 
* coverity[assert_side_effect] */ UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); /* check that ucp_worker_progress is not called from within ucp_worker_progress */ ucs_assert(worker->inprogress++ == 0); count = uct_worker_progress(worker->uct); ucs_async_check_miss(&worker->async); /* coverity[assert_side_effect] */ ucs_assert(--worker->inprogress == 0); UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); return count; } ssize_t ucp_stream_worker_poll(ucp_worker_h worker, ucp_stream_poll_ep_t *poll_eps, size_t max_eps, unsigned flags) { ssize_t count = 0; ucp_ep_ext_proto_t *ep_ext; ucp_ep_h ep; UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_STREAM, return UCS_ERR_INVALID_PARAM); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); while ((count < max_eps) && !ucs_list_is_empty(&worker->stream_ready_eps)) { ep_ext = ucp_stream_worker_dequeue_ep_head(worker); ep = ucp_ep_from_ext_proto(ep_ext); poll_eps[count].ep = ep; poll_eps[count].user_data = ucp_ep_ext_gen(ep)->user_data; ++count; } UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); return count; } ucs_status_t ucp_worker_get_efd(ucp_worker_h worker, int *fd) { ucs_status_t status; UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP, return UCS_ERR_INVALID_PARAM); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); if (worker->flags & UCP_WORKER_FLAG_EXTERNAL_EVENT_FD) { status = UCS_ERR_UNSUPPORTED; } else { *fd = worker->event_fd; status = UCS_OK; } UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); return status; } ucs_status_t ucp_worker_arm(ucp_worker_h worker) { ucp_worker_iface_t *wiface; ucs_status_t status; uint64_t dummy; int ret; ucs_trace_func("worker=%p", worker); UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP, return UCS_ERR_INVALID_PARAM); /* Read from event pipe. If some events are found, return BUSY, * Otherwise, continue to arm the transport interfaces. 
*/ do { ret = read(worker->eventfd, &dummy, sizeof(dummy)); if (ret == sizeof(dummy)) { status = UCS_ERR_BUSY; goto out; } else if (ret == -1) { if (errno == EAGAIN) { break; /* No more events */ } else if (errno != EINTR) { ucs_error("Read from internal event fd failed: %m"); status = UCS_ERR_IO_ERROR; goto out; } } else { ucs_assert(ret == 0); } } while (ret != 0); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); /* Go over arm_list of active interfaces which support events and arm them */ ucs_list_for_each(wiface, &worker->arm_ifaces, arm_list) { ucs_assert(wiface->activate_count > 0); status = uct_iface_event_arm(wiface->iface, worker->uct_events); ucs_trace("arm iface %p returned %s", wiface->iface, ucs_status_string(status)); if (status != UCS_OK) { goto out_unlock; } } status = UCS_OK; out_unlock: UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); out: ucs_trace("ucp_worker_arm returning %s", ucs_status_string(status)); return status; } void ucp_worker_wait_mem(ucp_worker_h worker, void *address) { ucs_arch_wait_mem(address); } ucs_status_t ucp_worker_wait(ucp_worker_h worker) { ucp_worker_iface_t *wiface; struct pollfd *pfd; ucs_status_t status; nfds_t nfds; int ret; ucs_trace_func("worker %p", worker); UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP, return UCS_ERR_INVALID_PARAM); UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); status = ucp_worker_arm(worker); if (status == UCS_ERR_BUSY) { /* if UCS_ERR_BUSY returned - no poll() must called */ status = UCS_OK; goto out_unlock; } else if (status != UCS_OK) { goto out_unlock; } if (worker->flags & UCP_WORKER_FLAG_EXTERNAL_EVENT_FD) { pfd = ucs_alloca(sizeof(*pfd) * worker->context->num_tls); nfds = 0; ucs_list_for_each(wiface, &worker->arm_ifaces, arm_list) { pfd[nfds].fd = wiface->event_fd; pfd[nfds].events = POLLIN; ++nfds; } } else { pfd = ucs_alloca(sizeof(*pfd)); pfd->fd = worker->event_fd; pfd->events = POLLIN; nfds = 1; } UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); /* poll is thread safe system call, though can have unpredictable results * because of using the same descriptor in multiple threads. 
*/ for (;;) { ret = poll(pfd, nfds, -1); if (ret >= 0) { ucs_assertv(ret == 1, "ret=%d", ret); status = UCS_OK; goto out; } else { if (errno != EINTR) { ucs_error("poll(nfds=%d) returned %d: %m", (int)nfds, ret); status = UCS_ERR_IO_ERROR; goto out; } } } out_unlock: UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); out: return status; } ucs_status_t ucp_worker_signal(ucp_worker_h worker) { ucs_trace_func("worker %p", worker); UCP_CONTEXT_CHECK_FEATURE_FLAGS(worker->context, UCP_FEATURE_WAKEUP, return UCS_ERR_INVALID_PARAM); return ucp_worker_wakeup_signal_fd(worker); } ucs_status_t ucp_worker_get_address(ucp_worker_h worker, ucp_address_t **address_p, size_t *address_length_p) { ucs_status_t status; UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); status = ucp_address_pack(worker, NULL, UINT64_MAX, UCP_ADDRESS_PACK_FLAG_ALL, NULL, address_length_p, (void**)address_p); UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); return status; } void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address) { ucs_free(address); } void ucp_worker_print_info(ucp_worker_h worker, FILE *stream) { ucp_context_h context = worker->context; ucp_address_t *address; size_t address_length; ucs_status_t status; ucp_rsc_index_t rsc_index; int first; UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); fprintf(stream, "#\n"); fprintf(stream, "# UCP worker '%s'\n", ucp_worker_get_name(worker)); fprintf(stream, "#\n"); status = ucp_worker_get_address(worker, &address, &address_length); if (status == UCS_OK) { ucp_worker_release_address(worker, address); fprintf(stream, "# address: %zu bytes\n", address_length); } else { fprintf(stream, "# \n"); } if (context->config.features & UCP_FEATURE_AMO) { fprintf(stream, "# atomics: "); first = 1; for (rsc_index = 0; rsc_index < worker->context->num_tls; ++rsc_index) { if (worker->atomic_tls & UCS_BIT(rsc_index)) { if (!first) { fprintf(stream, ", "); } fprintf(stream, "%d:"UCT_TL_RESOURCE_DESC_FMT, rsc_index, UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc)); first = 0; } } fprintf(stream, "\n"); } fprintf(stream, "#\n"); UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); }
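
/*
 * Example (illustrative sketch, application side): driving the wakeup
 * interface implemented above. The epoll file descriptor setup and error
 * handling are assumptions of this sketch; the progress/arm retry loop
 * follows the protocol of ucp_worker_arm(), which returns UCS_ERR_BUSY while
 * unprocessed events are still pending on the worker.
 *
 *     int epfd, efd;
 *     struct epoll_event ev = { .events = EPOLLIN };
 *     ucs_status_t status;
 *
 *     status = ucp_worker_get_efd(worker, &efd);
 *     // fails with UCS_ERR_UNSUPPORTED if the worker uses an external event fd
 *     epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);
 *
 *     for (;;) {
 *         while (ucp_worker_progress(worker) != 0) {
 *             // drain all outstanding work before sleeping
 *         }
 *         status = ucp_worker_arm(worker);
 *         if (status == UCS_OK) {
 *             epoll_wait(epfd, &ev, 1, -1);   // sleep until the next event
 *         } else if (status != UCS_ERR_BUSY) {
 *             break;                          // fatal error
 *         }
 *         // UCS_ERR_BUSY: new events arrived, progress again before arming
 *     }
 */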