/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2017. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <ucp/core/ucp_ep.h>
#include <ucp/core/ucp_ep.inl>
#include <ucp/core/ucp_request.inl>
#include "rma.inl"
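
/*
 * Handle a failed transport-level flush on one of the lanes: log the error
 * (unless the endpoint is configured with peer error handling mode), record
 * the status on the request, and stop waiting for that lane's completion.
 */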
static void ucp_ep_flush_error(ucp_request_t *req, ucs_status_t status)
{
if (ucp_ep_config(req->send.ep)->key.err_mode != UCP_ERR_HANDLING_MODE_PEER) {
ucs_error("error during flush: %s", ucs_status_string(status));
}
req->status = status;
--req->send.state.uct_comp.count;
}
static int ucp_ep_flush_is_completed(ucp_request_t *req)
{
return (req->send.state.uct_comp.count == 0) && req->send.flush.sw_done;
}
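
/*
 * Advance the endpoint flush state machine: start uct_ep_flush() on every
 * lane which was not started yet, and once all lanes are flushed on the
 * transport level, decide whether the request still has to wait for remote
 * (software) completions.
 */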
static void ucp_ep_flush_progress(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;
ucp_ep_flush_state_t *flush_state;
ucp_lane_index_t lane;
ucs_status_t status;
uct_ep_h uct_ep;
ucs_trace("ep %p: progress flush req %p, lanes 0x%x count %d", ep, req,
req->send.flush.lanes, req->send.state.uct_comp.count);
while (req->send.flush.lanes) {
/* Search for next lane to start flush */
lane = ucs_ffs64(req->send.flush.lanes);
uct_ep = ep->uct_eps[lane];
if (uct_ep == NULL) {
req->send.flush.lanes &= ~UCS_BIT(lane);
--req->send.state.uct_comp.count;
continue;
}
/* Start flush operation on UCT endpoint */
if (req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) {
uct_ep_pending_purge(uct_ep, ucp_ep_err_pending_purge,
UCS_STATUS_PTR(UCS_ERR_CANCELED));
}
status = uct_ep_flush(uct_ep, req->send.flush.uct_flags,
&req->send.state.uct_comp);
ucs_trace("flushing ep %p lane[%d]: %s", ep, lane,
ucs_status_string(status));
if (status == UCS_OK) {
req->send.flush.lanes &= ~UCS_BIT(lane);
--req->send.state.uct_comp.count;
ucs_trace("ep %p: flush comp %p count reduced to %d", ep,
&req->send.state.uct_comp, req->send.state.uct_comp.count);
} else if (status == UCS_INPROGRESS) {
req->send.flush.lanes &= ~UCS_BIT(lane);
} else if (status == UCS_ERR_NO_RESOURCE) {
if (req->send.lane != UCP_NULL_LANE) {
ucs_trace("ep %p: not adding pending flush %p on lane %d, "
"because it's already pending on lane %d",
ep, req, lane, req->send.lane);
break;
}
status = uct_ep_pending_add(uct_ep, &req->send.uct, 0);
ucs_trace("adding pending flush on ep %p lane[%d]: %s", ep, lane,
ucs_status_string(status));
if (status == UCS_OK) {
req->send.lane = lane;
req->send.flush.lanes &= ~UCS_BIT(lane);
} else if (status != UCS_ERR_BUSY) {
ucp_ep_flush_error(req, status);
break;
}
} else {
ucp_ep_flush_error(req, status);
break;
}
}
if (!req->send.flush.sw_started && (req->send.state.uct_comp.count == 0)) {
        /* Start waiting for remote completions only after all lanes are flushed
         * on the transport level, so we are sure all pending requests were sent.
         * We don't need to wait for remote completions in these cases:
         * - The flush operation is in 'cancel' mode.
         * - The endpoint is either not used or did not resolve the peer endpoint,
         *   so there were no user operations that require remote completion.
         *   In this case, the flush state may not even be initialized.
         */
if ((req->send.flush.uct_flags & UCT_FLUSH_FLAG_CANCEL) ||
!ucs_test_all_flags(ep->flags, UCP_EP_FLAG_USED|UCP_EP_FLAG_DEST_EP)) {
ucs_trace_req("flush request %p not waiting for remote completions",
req);
req->send.flush.sw_done = 1;
} else {
/* All pending requests were sent, so 'send_sn' value is up-to-date */
flush_state = ucp_ep_flush_state(ep);
if (flush_state->send_sn == flush_state->cmpl_sn) {
req->send.flush.sw_done = 1;
ucs_trace_req("flush request %p remote completions done", req);
} else {
req->send.flush.cmpl_sn = flush_state->send_sn;
ucs_queue_push(&flush_state->reqs, &req->send.flush.queue);
ucs_trace_req("added flush request %p to ep remote completion queue"
" with sn %d", req, req->send.flush.cmpl_sn);
}
}
req->send.flush.sw_started = 1;
}
}
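
/* Unregister the request's slow-path progress callback, if one is registered */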
static void ucp_ep_flush_slow_path_remove(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;
uct_worker_progress_unregister_safe(ep->worker->uct,
&req->send.flush.prog_id);
}
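
/*
 * If the flush request is fully completed (all lanes flushed and remote
 * completions done), remove its slow-path callback and invoke the flushed
 * callback. Returns nonzero if the request was completed.
 */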
static int ucp_flush_check_completion(ucp_request_t *req)
{
    /* Check whether all lanes are flushed and remote completions are done */
if (!ucp_ep_flush_is_completed(req)) {
return 0;
}
ucs_trace_req("flush req %p completed", req);
ucp_ep_flush_slow_path_remove(req);
req->send.flush.flushed_cb(req);
return 1;
}
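
/* Slow-path progress callback used to resume a flush that ran out of resources */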
static unsigned ucp_ep_flush_resume_slow_path_callback(void *arg)
{
ucp_request_t *req = arg;
ucp_ep_flush_slow_path_remove(req);
ucp_ep_flush_progress(req);
ucp_flush_check_completion(req);
return 0;
}
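
/*
 * Pending callback: retry uct_ep_flush() on the lane which previously
 * returned UCS_ERR_NO_RESOURCE, then resume overall flush progress.
 */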
static ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
ucp_lane_index_t lane = req->send.lane;
ucp_ep_h ep = req->send.ep;
ucs_status_t status;
int completed;
ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));
status = uct_ep_flush(ep->uct_eps[lane], req->send.flush.uct_flags,
&req->send.state.uct_comp);
ucs_trace("flushing ep %p lane[%d]: %s", ep, lane,
ucs_status_string(status));
if (status == UCS_OK) {
--req->send.state.uct_comp.count; /* UCT endpoint is flushed */
}
    /* Since req->send.lane is still valid (not UCP_NULL_LANE), the call below
     * will not add this request to a pending queue again.
     */
ucp_ep_flush_progress(req);
completed = ucp_flush_check_completion(req);
/* If the operation has not completed, add slow-path progress to resume */
if (!completed && req->send.flush.lanes) {
ucs_trace("ep %p: adding slow-path callback to resume flush", ep);
uct_worker_progress_register_safe(ep->worker->uct,
ucp_ep_flush_resume_slow_path_callback,
req, 0, &req->send.flush.prog_id);
}
if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
        /* if the request was already completed, the flushed callback might have
         * released it, so touch it only when the flush is still in progress */
if (!completed) {
req->send.lane = UCP_NULL_LANE;
}
return UCS_OK;
} else if (status == UCS_ERR_NO_RESOURCE) {
return UCS_ERR_NO_RESOURCE;
} else {
ucp_ep_flush_error(req, status);
return UCS_OK;
}
}
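
/*
 * UCT completion callback for transport-level flush operations: on success,
 * continue progressing the flush request; on error, force its completion.
 */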
static void ucp_ep_flush_completion(uct_completion_t *self, ucs_status_t status)
{
ucp_request_t *req = ucs_container_of(self, ucp_request_t,
send.state.uct_comp);
ucs_trace_req("flush completion req=%p status=%d", req, status);
ucs_assert(!(req->flags & UCP_REQUEST_FLAG_COMPLETED));
req->status = status;
if (status == UCS_OK) {
ucp_ep_flush_progress(req);
} else {
/* force flush completion in case of error */
req->send.flush.sw_done = 1;
req->send.state.uct_comp.count = 0;
}
ucs_trace_req("flush completion req=%p comp_count=%d", req, req->send.state.uct_comp.count);
ucp_flush_check_completion(req);
}
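
/* Called when remote completions for this flush request are done */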
void ucp_ep_flush_remote_completed(ucp_request_t *req)
{
ucs_trace_req("flush remote ops completed req=%p", req);
if (!req->send.flush.sw_done) {
req->send.flush.sw_done = 1;
ucp_flush_check_completion(req);
}
}
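
/*
 * Common entry point for an endpoint flush: allocate and initialize a flush
 * request, start progressing it, and either return its status (if it
 * completed immediately) or a pointer to the in-progress request.
 */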
ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned uct_flags,
ucp_send_callback_t req_cb,
unsigned req_flags,
ucp_request_t *worker_req,
ucp_request_callback_t flushed_cb,
const char *debug_name)
{
ucs_status_t status;
ucp_request_t *req;
ucs_debug("%s ep %p", debug_name, ep);
if (ep->flags & UCP_EP_FLAG_FAILED) {
return NULL;
}
req = ucp_request_get(ep->worker);
if (req == NULL) {
return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
}
    /*
     * A flush operation can be queued on the pending queue of only one of the
     * lanes (indicated by req->send.lane) and scheduled for completion on any
     * number of lanes. req->send.state.uct_comp.count keeps track of how many
     * lanes are not flushed yet; when it reaches zero, all lanes are flushed.
     * req->send.flush.lanes keeps track of which lanes we still have to start
     * a flush on.
     */
req->flags = req_flags;
req->status = UCS_OK;
req->send.ep = ep;
req->send.cb = req_cb;
req->send.flush.flushed_cb = flushed_cb;
req->send.flush.lanes = UCS_MASK(ucp_ep_num_lanes(ep));
req->send.flush.prog_id = UCS_CALLBACKQ_ID_NULL;
req->send.flush.uct_flags = uct_flags;
req->send.flush.worker_req = worker_req;
req->send.flush.sw_started = 0;
req->send.flush.sw_done = 0;
req->send.lane = UCP_NULL_LANE;
req->send.uct.func = ucp_ep_flush_progress_pending;
req->send.state.uct_comp.func = ucp_ep_flush_completion;
req->send.state.uct_comp.count = ucp_ep_num_lanes(ep);
ucp_ep_flush_progress(req);
if (ucp_ep_flush_is_completed(req)) {
status = req->status;
ucs_trace_req("ep %p: releasing flush request %p, returning status %s",
ep, req, ucs_status_string(status));
ucp_request_put(req);
return UCS_STATUS_PTR(status);
}
ucs_trace_req("ep %p: return inprogress flush request %p (%p)", ep, req,
req + 1);
return req + 1;
}
static void ucp_ep_flushed_callback(ucp_request_t *req)
{
ucp_request_complete_send(req, req->status);
}
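
/*
 * Non-blocking endpoint flush with local completion semantics. A minimal
 * caller-side usage sketch (illustrative only; 'ep', 'worker', 'status' and
 * 'flush_cb' are the caller's variables, and the standard UCP request API is
 * assumed):
 *
 *   void *request = ucp_ep_flush_nb(ep, 0, flush_cb);
 *   if (UCS_PTR_IS_PTR(request)) {
 *       while (ucp_request_check_status(request) == UCS_INPROGRESS) {
 *           ucp_worker_progress(worker);
 *       }
 *       ucp_request_free(request);
 *   } else {
 *       status = UCS_PTR_STATUS(request);   (flush completed or failed immediately)
 *   }
 */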
UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_ep_flush_nb, (ep, flags, cb),
ucp_ep_h ep, unsigned flags, ucp_send_callback_t cb)
{
void *request;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, cb,
UCP_REQUEST_FLAG_CALLBACK, NULL,
ucp_ep_flushed_callback, "flush_nb");
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
return request;
}
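
/*
 * Check whether the worker is fully flushed: no outstanding flush operations
 * and all transport interfaces report a completed flush.
 */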
static ucs_status_t ucp_worker_flush_check(ucp_worker_h worker)
{
ucp_rsc_index_t iface_id;
ucp_worker_iface_t *wiface;
ucs_status_t status;
if (worker->flush_ops_count) {
return UCS_INPROGRESS;
}
for (iface_id = 0; iface_id < worker->num_ifaces; ++iface_id) {
wiface = worker->ifaces[iface_id];
if (wiface->iface == NULL) {
continue;
}
status = uct_iface_flush(wiface->iface, 0, NULL);
if (status != UCS_OK) {
if (UCS_STATUS_IS_ERR(status)) {
ucs_error("iface[%d] "UCT_TL_RESOURCE_DESC_FMT" flush failed: %s",
iface_id,
UCT_TL_RESOURCE_DESC_ARG(&worker->context->tl_rscs[wiface->rsc_index].tl_rsc),
ucs_status_string(status));
}
return status;
}
}
return UCS_OK;
}
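
/*
 * Account for one completed part of a worker flush request (either the
 * iface/endpoint iteration itself or a single endpoint flush); when the
 * completion count drops to zero, or an error occurs, complete the request.
 */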
static void ucp_worker_flush_complete_one(ucp_request_t *req, ucs_status_t status,
int force_progress_unreg)
{
ucp_worker_h worker = req->flush_worker.worker;
int complete;
--req->flush_worker.comp_count;
complete = (req->flush_worker.comp_count == 0) || (status != UCS_OK);
if (complete || force_progress_unreg) {
uct_worker_progress_unregister_safe(worker->uct,
&req->flush_worker.prog_id);
}
if (complete) {
ucs_assert(status != UCS_INPROGRESS);
ucp_request_complete(req, flush_worker.cb, status);
}
}
static void ucp_worker_flush_ep_flushed_cb(ucp_request_t *req)
{
ucp_worker_flush_complete_one(req->send.flush.worker_req, UCS_OK, 0);
ucp_request_put(req);
}
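
/*
 * Worker flush progress callback: check whether all interfaces are flushed;
 * once they are (or all endpoints were visited, or an error occurred), drop
 * the initial reference on the request. Otherwise, if enabled by
 * configuration, start a flush on the next endpoint in the worker's list.
 */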
static unsigned ucp_worker_flush_progress(void *arg)
{
ucp_request_t *req = arg;
ucp_worker_h worker = req->flush_worker.worker;
ucp_ep_ext_gen_t *next_ep = req->flush_worker.next_ep;
void *ep_flush_request;
ucs_status_t status;
ucp_ep_h ep;
status = ucp_worker_flush_check(worker);
if ((status == UCS_OK) || (&next_ep->ep_list == &worker->all_eps)) {
/* If all ifaces are flushed, or we finished going over all endpoints,
* no need to progress this request actively any more. Just wait until
* all associated endpoint flush requests are completed.
*/
ucp_worker_flush_complete_one(req, UCS_OK, 1);
} else if (status != UCS_INPROGRESS) {
/* Error returned from uct iface flush */
ucp_worker_flush_complete_one(req, status, 1);
} else if (worker->context->config.ext.flush_worker_eps) {
/* Some endpoints are not flushed yet. Take next endpoint from the list
* and start flush operation on it.
*/
ep = ucp_ep_from_ext_gen(next_ep);
req->flush_worker.next_ep = ucs_list_next(&next_ep->ep_list,
ucp_ep_ext_gen_t, ep_list);
ep_flush_request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL,
UCP_REQUEST_FLAG_RELEASED, req,
ucp_worker_flush_ep_flushed_cb,
"flush_worker");
if (UCS_PTR_IS_ERR(ep_flush_request)) {
/* endpoint flush resulted in an error */
status = UCS_PTR_STATUS(ep_flush_request);
ucs_warn("ucp_ep_flush_internal() failed: %s", ucs_status_string(status));
} else if (ep_flush_request != NULL) {
            /* endpoint flush started, account for it in the completion count */
++req->flush_worker.comp_count;
}
}
return 0;
}
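
/*
 * Common entry point for a worker flush: return immediately if the worker is
 * already flushed (or the flush check failed), otherwise allocate a request
 * and register a progress callback which drives the flush.
 */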
static ucs_status_ptr_t ucp_worker_flush_nb_internal(ucp_worker_h worker,
ucp_send_callback_t cb,
unsigned req_flags)
{
ucs_status_t status;
ucp_request_t *req;
status = ucp_worker_flush_check(worker);
if ((status != UCS_INPROGRESS) && (status != UCS_ERR_NO_RESOURCE)) {
return UCS_STATUS_PTR(status);
}
req = ucp_request_get(worker);
if (req == NULL) {
return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
}
req->flags = req_flags;
req->status = UCS_OK;
req->flush_worker.worker = worker;
req->flush_worker.cb = cb;
req->flush_worker.comp_count = 1; /* counting starts from 1, and decremented
when finished going over all endpoints */
req->flush_worker.prog_id = UCS_CALLBACKQ_ID_NULL;
req->flush_worker.next_ep = ucs_list_head(&worker->all_eps,
ucp_ep_ext_gen_t, ep_list);
uct_worker_progress_register_safe(worker->uct, ucp_worker_flush_progress,
req, 0, &req->flush_worker.prog_id);
return req + 1;
}
UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_worker_flush_nb, (worker, flags, cb),
ucp_worker_h worker, unsigned flags, ucp_send_callback_t cb)
{
void *request;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
request = ucp_worker_flush_nb_internal(worker, cb,
UCP_REQUEST_FLAG_CALLBACK);
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
return request;
}
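
/* Block until the flush request completes, progressing the worker while waiting */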
static ucs_status_t ucp_flush_wait(ucp_worker_h worker, void *request)
{
return ucp_rma_wait(worker, request, "flush");
}
UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_flush, (worker), ucp_worker_h worker)
{
ucs_status_t status;
void *request;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
request = ucp_worker_flush_nb_internal(worker, NULL, 0);
status = ucp_flush_wait(worker, request);
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
return status;
}
UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep)
{
ucs_status_t status;
void *request;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);
request = ucp_ep_flush_internal(ep, UCT_FLUSH_FLAG_LOCAL, NULL, 0, NULL,
ucp_ep_flushed_callback, "flush");
status = ucp_flush_wait(ep->worker, request);
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
return status;
}
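
/* Apply a fence to every active transport interface of the worker */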
UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_fence, (worker), ucp_worker_h worker)
{
ucp_rsc_index_t rsc_index;
ucp_worker_iface_t *wiface;
ucs_status_t status;
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
ucs_for_each_bit(rsc_index, worker->context->tl_bitmap) {
wiface = ucp_worker_iface(worker, rsc_index);
if (wiface->iface == NULL) {
continue;
}
status = uct_iface_fence(wiface->iface, 0);
if (status != UCS_OK) {
goto out;
}
}
status = UCS_OK;
out:
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
return status;
}