/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2017.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <ucp/core/ucp_ep.inl>
#include <ucp/core/ucp_request.inl>
#include <ucs/datastruct/queue.h>

#include "rma.inl"


static void ucp_ep_flush_error(ucp_request_t *req, ucs_status_t status)
{
    if (ucp_ep_config(req->send.ep)->key.err_mode != UCP_ERR_HANDLING_MODE_PEER) {
        ucs_error("error during flush: %s", ucs_status_string(status));
    }

    req->status = status;
    --req->send.state.uct_comp.count;
}

static int ucp_ep_flush_is_completed(ucp_request_t *req)
{
    return (req->send.state.uct_comp.count == 0) && req->send.flush.sw_done;
}

static void ucp_ep_flush_progress(ucp_request_t *req)
{
    ucp_ep_h ep = req->send.ep;
    ucp_ep_flush_state_t *flush_state;
    ucp_lane_index_t lane;
    ucs_status_t status;
    uct_ep_h uct_ep;

    ucs_trace("ep %p: progress flush req %p, lanes 0x%x count %d", ep, req,
              req->send.flush.lanes, req->send.state.uct_comp.count);

    while (req->send.flush.lanes) {
        /* Search for the next lane to start flushing */
        lane   = ucs_ffs64(req->send.flush.lanes);
        uct_ep = ep->uct_eps[lane];
        if (uct_ep == NULL) {
            req->send.flush.lanes &= ~UCS_BIT(lane);
            --req->send.state.uct_comp.count;
            continue;
        }

        /* Start flush operation on the UCT endpoint */
        if (req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) {
            uct_ep_pending_purge(uct_ep, ucp_ep_err_pending_purge,
                                 UCS_STATUS_PTR(UCS_ERR_CANCELED));
        }
        status = uct_ep_flush(uct_ep, req->send.flush.uct_flags,
                              &req->send.state.uct_comp);
        ucs_trace("flushing ep %p lane[%d]: %s", ep, lane,
                  ucs_status_string(status));
        if (status == UCS_OK) {
            req->send.flush.lanes &= ~UCS_BIT(lane);
            --req->send.state.uct_comp.count;
            ucs_trace("ep %p: flush comp %p count reduced to %d", ep,
                      &req->send.state.uct_comp, req->send.state.uct_comp.count);
        } else if (status == UCS_INPROGRESS) {
            req->send.flush.lanes &= ~UCS_BIT(lane);
        } else if (status == UCS_ERR_NO_RESOURCE) {
            if (req->send.lane != UCP_NULL_LANE) {
                ucs_trace("ep %p: not adding pending flush %p on lane %d, "
                          "because it's already pending on lane %d",
                          ep, req, lane, req->send.lane);
                break;
            }

            status = uct_ep_pending_add(uct_ep, &req->send.uct, 0);
            ucs_trace("adding pending flush on ep %p lane[%d]: %s", ep, lane,
                      ucs_status_string(status));
            if (status == UCS_OK) {
                req->send.lane         = lane;
                req->send.flush.lanes &= ~UCS_BIT(lane);
            } else if (status != UCS_ERR_BUSY) {
                ucp_ep_flush_error(req, status);
                break;
            }
        } else {
            ucp_ep_flush_error(req, status);
            break;
        }
    }

    if (!req->send.flush.sw_started && (req->send.state.uct_comp.count == 0)) {
        /* Start waiting for remote completions only after all lanes are
         * flushed on the transport level, so we are sure all pending requests
         * were sent. We don't need to wait for remote completions in these
         * cases:
         * - The flush operation is in 'cancel' mode.
         * - The endpoint is either not used or did not resolve the peer
         *   endpoint, which means we didn't have any user operations which
         *   require remote completion. In this case, the flush state may not
         *   even be initialized.
         */
        if ((req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) ||
            !ucs_test_all_flags(ep->flags, UCP_EP_FLAG_USED|UCP_EP_FLAG_DEST_EP)) {
            ucs_trace_req("flush request %p not waiting for remote completions",
                          req);
            req->send.flush.sw_done = 1;
        } else {
            /* All pending requests were sent, so the 'send_sn' value is
             * up-to-date */
            flush_state = ucp_ep_flush_state(ep);
            if (flush_state->send_sn == flush_state->cmpl_sn) {
                req->send.flush.sw_done = 1;
                ucs_trace_req("flush request %p remote completions done", req);
            } else {
                req->send.flush.cmpl_sn = flush_state->send_sn;
                ucs_queue_push(&flush_state->reqs, &req->send.flush.queue);
                ucs_trace_req("added flush request %p to ep remote completion "
                              "queue with sn %d", req, req->send.flush.cmpl_sn);
            }
        }
        req->send.flush.sw_started = 1;
    }
}
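
/*
 * Illustrative sketch, not part of the original file: the bitmap-walk pattern
 * used by ucp_ep_flush_progress() above. Each set bit marks a lane whose
 * flush has not been started yet; ucs_ffs64() returns the index of the lowest
 * set bit, and the bit is cleared once that lane is handled (on UCS_OK or
 * UCS_INPROGRESS from uct_ep_flush()). 'example_walk_lane_bitmap' is a
 * hypothetical helper, shown only to isolate the pattern.
 */
static void example_walk_lane_bitmap(uint64_t lanes)
{
    ucp_lane_index_t lane;

    while (lanes) {
        lane   = ucs_ffs64(lanes); /* index of the lowest pending lane */
        lanes &= ~UCS_BIT(lane);   /* mark the lane as handled */
        ucs_trace("would flush lane %d", lane);
    }
}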

static void ucp_ep_flush_slow_path_remove(ucp_request_t *req)
{
    ucp_ep_h ep = req->send.ep;
    uct_worker_progress_unregister_safe(ep->worker->uct,
                                        &req->send.flush.prog_id);
}

static int ucp_flush_check_completion(ucp_request_t *req)
{
    /* Check if all lanes are flushed */
    if (!ucp_ep_flush_is_completed(req)) {
        return 0;
    }

    ucs_trace_req("flush req %p completed", req);
    ucp_ep_flush_slow_path_remove(req);
    req->send.flush.flushed_cb(req);
    return 1;
}

static unsigned ucp_ep_flush_resume_slow_path_callback(void *arg)
{
    ucp_request_t *req = arg;

    ucp_ep_flush_slow_path_remove(req);
    ucp_ep_flush_progress(req);
    ucp_flush_check_completion(req);
    return 0;
}

static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
{
    ucp_request_t *req    = ucs_container_of(self, ucp_request_t, send.uct);
    ucp_lane_index_t lane = req->send.lane;
    ucp_ep_h ep           = req->send.ep;
    ucs_status_t status;
    int completed;

    ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

    status = uct_ep_flush(ep->uct_eps[lane], req->send.flush.uct_flags,
                          &req->send.state.uct_comp);
    ucs_trace("flushing ep %p lane[%d]: %s", ep, lane,
              ucs_status_string(status));
    if (status == UCS_OK) {
        --req->send.state.uct_comp.count; /* UCT endpoint is flushed */
    }

    /* Since req->send.lane is still set (not UCP_NULL_LANE), this call will
     * not add anything to the pending queue.
     */
    ucp_ep_flush_progress(req);
    completed = ucp_flush_check_completion(req);

    /* If the operation has not completed, add a slow-path progress callback
     * to resume it */
    if (!completed && req->send.flush.lanes) {
        ucs_trace("ep %p: adding slow-path callback to resume flush", ep);
        uct_worker_progress_register_safe(ep->worker->uct,
                                          ucp_ep_flush_resume_slow_path_callback,
                                          req, 0, &req->send.flush.prog_id);
    }

    if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
        /* the flushed callback might release the request, so reset the lane
         * only if the request is still alive */
        if (!completed) {
            req->send.lane = UCP_NULL_LANE;
        }
        return UCS_OK;
    } else if (status == UCS_ERR_NO_RESOURCE) {
        return UCS_ERR_NO_RESOURCE;
    } else {
        ucp_ep_flush_error(req, status);
        return UCS_OK;
    }
}
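
/*
 * Illustrative sketch, not part of the original file: the return-value
 * contract of a UCT pending callback, which ucp_ep_flush_progress_pending()
 * above follows. Returning UCS_OK removes the request from the lane's
 * pending queue, while UCS_ERR_NO_RESOURCE keeps it queued so the progress
 * engine retries later. 'example_pending_cb' and 'example_try_operation' are
 * hypothetical names.
 */
static ucs_status_t example_try_operation(void)
{
    return UCS_ERR_NO_RESOURCE; /* pretend resources are exhausted */
}

static ucs_status_t example_pending_cb(uct_pending_req_t *self)
{
    if (example_try_operation() == UCS_ERR_NO_RESOURCE) {
        return UCS_ERR_NO_RESOURCE; /* stay on the pending queue */
    }
    return UCS_OK;                  /* dequeue: the operation was issued */
}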

static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t,
                                          send.state.uct_comp);

    ucs_trace_req("flush completion req=%p status=%d", req, status);
    ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));

    req->status = status;
    if (status == UCS_OK) {
        ucp_ep_flush_progress(req);
    } else {
        /* Force flush completion in case of an error */
        req->send.flush.sw_done        = 1;
        req->send.state.uct_comp.count = 0;
    }

    ucs_trace_req("flush completion req=%p comp_count=%d", req,
                  req->send.state.uct_comp.count);
    ucp_flush_check_completion(req);
}

void ucp_ep_flush_remote_completed(ucp_request_t *req)
{
    ucs_trace_req("flush remote ops completed req=%p", req);

    if (!req->send.flush.sw_done) {
        req->send.flush.sw_done = 1;
        ucp_flush_check_completion(req);
    }
}

ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
                                       ucp_send_callback_t req_cb,
                                       unsigned req_flags,
                                       ucp_request_t *worker_req,
                                       ucp_request_callback_t flushed_cb,
                                       const char *debug_name)
{
    ucs_status_t status;
    ucp_request_t *req;

    ucs_debug("%s ep %p", debug_name, ep);

    if (ep->flags & UCP_EP_FLAG_FAILED) {
        return NULL;
    }

    req = ucp_request_get(ep->worker);
    if (req == NULL) {
        return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
    }

    /*
     * A flush request can be queued on the pending queue of at most one lane
     * (indicated by req->send.lane), while it may be scheduled for completion
     * on any number of lanes. req->send.state.uct_comp.count keeps track of
     * how many lanes are not flushed yet; when it reaches zero, all lanes are
     * flushed. req->send.flush.lanes keeps track of which lanes we still have
     * to start a flush on.
     */
    req->flags                     = req_flags;
    req->status                    = UCS_OK;
    req->send.ep                   = ep;
    req->send.cb                   = req_cb;
    req->send.flush.flushed_cb     = flushed_cb;
    req->send.flush.lanes          = UCS_MASK(ucp_ep_num_lanes(ep));
    req->send.flush.prog_id        = UCS_CALLBACKQ_ID_NULL;
    req->send.flush.uct_flags      = uct_flags;
    req->send.flush.worker_req     = worker_req;
    req->send.flush.sw_started     = 0;
    req->send.flush.sw_done        = 0;
    req->send.lane                 = UCP_NULL_LANE;
    req->send.uct.func             = ucp_ep_flush_progress_pending;
    req->send.state.uct_comp.func  = ucp_ep_flush_completion;
    req->send.state.uct_comp.count = ucp_ep_num_lanes(ep);

    ucp_ep_flush_progress(req);

    if (ucp_ep_flush_is_completed(req)) {
        status = req->status;
        ucs_trace_req("ep %p: releasing flush request %p, returning status %s",
                      ep, req, ucs_status_string(status));
        ucp_request_put(req);
        return UCS_STATUS_PTR(status);
    }

    ucs_trace_req("ep %p: return inprogress flush request %p (%p)", ep, req,
                  req + 1);
    return req + 1;
}
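
/*
 * Illustrative sketch, not part of the original file: how a caller can
 * interpret the ucs_status_ptr_t returned by ucp_ep_flush_internal() above.
 * An error-encoded pointer carries an immediate completion status, NULL means
 * the operation finished right away (or the endpoint had already failed), and
 * any other pointer is an in-progress request handle ('req + 1').
 * 'example_classify_flush_result' is a hypothetical helper.
 */
static ucs_status_t example_classify_flush_result(ucs_status_ptr_t result,
                                                  int *in_progress)
{
    if (UCS_PTR_IS_ERR(result)) {
        *in_progress = 0;
        return UCS_PTR_STATUS(result); /* completed immediately with an error */
    }

    *in_progress = (result != NULL);   /* non-NULL: request handle */
    return UCS_OK;
}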

static void ucp_ep_flushed_callback(ucp_request_t *req)
{
    ucp_request_complete_send(req, req->status);
}

UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_ep_flush_nb, (ep, flags, cb),
                 ucp_ep_h ep, unsigned flags, ucp_send_callback_t cb)
{
    void *request;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, cb,
                                    UCP_REQUEST_FLAG_CALLBACK, NULL,
                                    ucp_ep_flushed_callback, "flush_nb");

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);

    return request;
}

static ucs_status_t ucp_worker_flush_check(ucp_worker_h worker)
{
    ucp_rsc_index_t iface_id;
    ucp_worker_iface_t *wiface;
    ucs_status_t status;

    if (worker->flush_ops_count) {
        return UCS_INPROGRESS;
    }

    for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
        wiface = worker->ifaces[iface_id];
        if (wiface->iface == NULL) {
            continue;
        }

        status = uct_iface_flush(wiface->iface, 0, NULL);
        if (status != UCS_OK) {
            if (UCS_STATUS_IS_ERR(status)) {
                ucs_error("iface[%d] "UCT_TL_RESOURCE_DESC_FMT" flush failed: %s",
                          iface_id,
                          UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[wiface->rsc_index].tl_rsc),
                          ucs_status_string(status));
            }
            return status;
        }
    }

    return UCS_OK;
}

static void ucp_worker_flush_complete_one(ucp_request_t *req,
                                          ucs_status_t status,
                                          int force_progress_unreg)
{
    ucp_worker_h worker = req->flush_worker.worker;
    int complete;

    --req->flush_worker.comp_count;
    complete = (req->flush_worker.comp_count == 0) || (status != UCS_OK);

    if (complete || force_progress_unreg) {
        uct_worker_progress_unregister_safe(worker->uct,
                                            &req->flush_worker.prog_id);
    }

    if (complete) {
        ucs_assert(status != UCS_INPROGRESS);
        ucp_request_complete(req, flush_worker.cb, status);
    }
}

static void ucp_worker_flush_ep_flushed_cb(ucp_request_t *req)
{
    ucp_worker_flush_complete_one(req->send.flush.worker_req, UCS_OK, 0);
    ucp_request_put(req);
}

static unsigned ucp_worker_flush_progress(void *arg)
{
    ucp_request_t *req        = arg;
    ucp_worker_h worker       = req->flush_worker.worker;
    ucp_ep_ext_gen_t *next_ep = req->flush_worker.next_ep;
    void *ep_flush_request;
    ucs_status_t status;
    ucp_ep_h ep;

    status = ucp_worker_flush_check(worker);
    if ((status == UCS_OK) || (&next_ep->ep_list == &worker->all_eps)) {
        /* If all ifaces are flushed, or we finished going over all endpoints,
         * there is no need to actively progress this request anymore. Just
         * wait until all associated endpoint flush requests are completed.
         */
        ucp_worker_flush_complete_one(req, UCS_OK, 1);
    } else if (status != UCS_INPROGRESS) {
        /* Error returned from uct iface flush */
        ucp_worker_flush_complete_one(req, status, 1);
    } else if (worker->context->config.ext.flush_worker_eps) {
        /* Some endpoints are not flushed yet. Take the next endpoint from the
         * list and start a flush operation on it.
         */
        ep                        = ucp_ep_from_ext_gen(next_ep);
        req->flush_worker.next_ep = ucs_list_next(&next_ep->ep_list,
                                                  ucp_ep_ext_gen_t, ep_list);

        ep_flush_request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL,
                                                 UCP_REQUEST_FLAG_RELEASED, req,
                                                 ucp_worker_flush_ep_flushed_cb,
                                                 "flush_worker");
        if (UCS_PTR_IS_ERR(ep_flush_request)) {
            /* endpoint flush resulted in an error */
            status = UCS_PTR_STATUS(ep_flush_request);
            ucs_warn("ucp_ep_flush_internal() failed: %s",
                     ucs_status_string(status));
        } else if (ep_flush_request != NULL) {
            /* endpoint flush started, increment refcount */
            ++req->flush_worker.comp_count;
        }
    }

    return 0;
}
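
/*
 * Illustrative sketch, not part of the original file: the idempotent
 * progress-callback pattern used throughout this file. The callback id is
 * initialized to UCS_CALLBACKQ_ID_NULL and passed by pointer to the
 * register/unregister calls, so unregistering an id that was never registered
 * (or was already unregistered) is a safe no-op. 'example_progress_cb' and
 * 'example_register_unregister' are hypothetical names.
 */
static unsigned example_progress_cb(void *arg)
{
    return 0; /* no progress was made */
}

static void example_register_unregister(uct_worker_h uct_worker, void *arg)
{
    uct_worker_cb_id_t prog_id = UCS_CALLBACKQ_ID_NULL;

    uct_worker_progress_register_safe(uct_worker, example_progress_cb, arg, 0,
                                      &prog_id);
    uct_worker_progress_unregister_safe(uct_worker, &prog_id);
    /* a second unregister is also safe, since prog_id was reset to
     * UCS_CALLBACKQ_ID_NULL by the first one */
    uct_worker_progress_unregister_safe(uct_worker, &prog_id);
}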

static ucs_status_ptr_t ucp_worker_flush_nb_internal(ucp_worker_h worker,
                                                     ucp_send_callback_t cb,
                                                     unsigned req_flags)
{
    ucs_status_t status;
    ucp_request_t *req;

    status = ucp_worker_flush_check(worker);
    if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) {
        return UCS_STATUS_PTR(status);
    }

    req = ucp_request_get(worker);
    if (req == NULL) {
        return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
    }

    req->flags                   = req_flags;
    req->status                  = UCS_OK;
    req->flush_worker.worker     = worker;
    req->flush_worker.cb         = cb;
    req->flush_worker.comp_count = 1; /* counting starts from 1, and is
                                         decremented when finished going over
                                         all endpoints */
    req->flush_worker.prog_id    = UCS_CALLBACKQ_ID_NULL;
    req->flush_worker.next_ep    = ucs_list_head(&worker->all_eps,
                                                 ucp_ep_ext_gen_t, ep_list);

    uct_worker_progress_register_safe(worker->uct, ucp_worker_flush_progress,
                                      req, 0, &req->flush_worker.prog_id);
    return req + 1;
}

UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_worker_flush_nb, (worker, flags, cb),
                 ucp_worker_h worker, unsigned flags, ucp_send_callback_t cb)
{
    void *request;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    request = ucp_worker_flush_nb_internal(worker, cb,
                                           UCP_REQUEST_FLAG_CALLBACK);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

    return request;
}

static ucs_status_t ucp_flush_wait(ucp_worker_h worker, void *request)
{
    return ucp_rma_wait(worker, request, "flush");
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker)
{
    ucs_status_t status;
    void *request;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    request = ucp_worker_flush_nb_internal(worker, NULL, 0);
    status  = ucp_flush_wait(worker, request);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);

    return status;
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep)
{
    ucs_status_t status;
    void *request;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL, 0, NULL,
                                    ucp_ep_flushed_callback, "flush");
    status  = ucp_flush_wait(ep->worker, request);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);

    return status;
}

UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_fence, (worker), ucp_worker_h worker)
{
    ucp_rsc_index_t rsc_index;
    ucp_worker_iface_t *wiface;
    ucs_status_t status;

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    ucs_for_each_bit(rsc_index, worker->context->tl_bitmap) {
        wiface = ucp_worker_iface(worker, rsc_index);
        if (wiface->iface == NULL) {
            continue;
        }

        status = uct_iface_fence(wiface->iface, 0);
        if (status != UCS_OK) {
            goto out;
        }
    }
    status = UCS_OK;

out:
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
    return status;
}
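
/*
 * Usage sketch, not part of the original file: how an application might drive
 * ucp_worker_flush_nb() to completion by polling, assuming the public request
 * API (ucp_request_check_status() / ucp_request_release()). The names
 * 'example_worker_flush_cb' and 'example_worker_flush_sync' are hypothetical.
 */
static void example_worker_flush_cb(void *request, ucs_status_t status)
{
    /* completion is detected by polling below, so nothing to do here */
}

static ucs_status_t example_worker_flush_sync(ucp_worker_h worker)
{
    void *request = ucp_worker_flush_nb(worker, 0, example_worker_flush_cb);
    ucs_status_t status;

    if (!UCS_PTR_IS_PTR(request)) {
        return UCS_PTR_STATUS(request); /* completed (or failed) immediately */
    }

    do {
        ucp_worker_progress(worker);
        status = ucp_request_check_status(request);
    } while (status == UCS_INPROGRESS);

    ucp_request_release(request);
    return status;
}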