Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
* Copyright (c) UT-Battelle, LLC. 2015. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "rc_ep.h"
#include "rc_iface.h"

#include <uct/ib/base/ib_verbs.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/type/class.h>
#include <endian.h>

#if ENABLE_STATS
static ucs_stats_class_t uct_rc_fc_stats_class = {
    .name = "rc_fc",
    .num_counters = UCT_RC_FC_STAT_LAST,
    .counter_names = {
        [UCT_RC_FC_STAT_NO_CRED]            = "no_cred",
        [UCT_RC_FC_STAT_TX_GRANT]           = "tx_grant",
        [UCT_RC_FC_STAT_TX_PURE_GRANT]      = "tx_pure_grant",
        [UCT_RC_FC_STAT_TX_SOFT_REQ]        = "tx_soft_req",
        [UCT_RC_FC_STAT_TX_HARD_REQ]        = "tx_hard_req",
        [UCT_RC_FC_STAT_RX_GRANT]           = "rx_grant",
        [UCT_RC_FC_STAT_RX_PURE_GRANT]      = "rx_pure_grant",
        [UCT_RC_FC_STAT_RX_SOFT_REQ]        = "rx_soft_req",
        [UCT_RC_FC_STAT_RX_HARD_REQ]        = "rx_hard_req",
        [UCT_RC_FC_STAT_FC_WND]             = "fc_wnd"
    }
};

static ucs_stats_class_t uct_rc_txqp_stats_class = {
    .name = "rc_txqp",
    .num_counters = UCT_RC_TXQP_STAT_LAST,
    .counter_names = {
        [UCT_RC_TXQP_STAT_QP_FULL]          = "qp_full",
        [UCT_RC_TXQP_STAT_SIGNAL]           = "signal"
    }
};
#endif

ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface,
                              uint32_t qp_num
                              UCS_STATS_ARG(ucs_stats_node_t* stats_parent))
{
    txqp->unsignaled = 0;
    txqp->unsignaled_store = 0;
    txqp->unsignaled_store_count = 0;
    txqp->available  = 0;
    ucs_queue_head_init(&txqp->outstanding);

    return UCS_STATS_NODE_ALLOC(&txqp->stats, &uct_rc_txqp_stats_class,
                                stats_parent, "-0x%x", qp_num);
}

void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp)
{
    uct_rc_txqp_purge_outstanding(txqp, UCS_ERR_CANCELED, 1);
    UCS_STATS_NODE_FREE(txqp->stats);
}

ucs_status_t uct_rc_fc_init(uct_rc_fc_t *fc, int16_t winsize
                            UCS_STATS_ARG(ucs_stats_node_t* stats_parent))
{
    ucs_status_t status;

    fc->fc_wnd     = winsize;
    fc->flags      = 0;

    status = UCS_STATS_NODE_ALLOC(&fc->stats, &uct_rc_fc_stats_class,
                                  stats_parent);
    if (status != UCS_OK) {
       return status;
    }

    UCS_STATS_SET_COUNTER(fc->stats, UCT_RC_FC_STAT_FC_WND, fc->fc_wnd);

    return UCS_OK;
}

void uct_rc_fc_cleanup(uct_rc_fc_t *fc)
{
    UCS_STATS_NODE_FREE(fc->stats);
}

UCS_CLASS_INIT_FUNC(uct_rc_ep_t, uct_rc_iface_t *iface, uint32_t qp_num)
{
    ucs_status_t status;

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

    status = uct_rc_txqp_init(&self->txqp, iface, qp_num
                              UCS_STATS_ARG(self->super.stats));
    if (status != UCS_OK) {
        return status;
    }

    status = uct_rc_fc_init(&self->fc, iface->config.fc_wnd_size
                            UCS_STATS_ARG(self->super.stats));
    if (status != UCS_OK) {
        goto err_txqp_cleanup;
    }

    /* Check that FC protocol fits AM id
     * (just in case AM id space gets extended) */
    UCS_STATIC_ASSERT(UCT_RC_EP_FC_MASK < UINT8_MAX);

    ucs_arbiter_group_init(&self->arb_group);

    ucs_list_add_head(&iface->ep_list, &self->list);
    return UCS_OK;

err_txqp_cleanup:
    uct_rc_txqp_cleanup(&self->txqp);
    return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_rc_ep_t)
{
    ucs_debug("destroy rc ep %p", self);

    ucs_list_del(&self->list);
    uct_rc_ep_pending_purge(&self->super.super, NULL, NULL);
    uct_rc_fc_cleanup(&self->fc);
    uct_rc_txqp_cleanup(&self->txqp);
}

UCS_CLASS_DEFINE(uct_rc_ep_t, uct_base_ep_t)

void uct_rc_ep_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
                           void *data, size_t length, size_t valid_length,
                           char *buffer, size_t max)
{
    uct_rc_hdr_t *rch = data;
    uint8_t fc_hdr    = uct_rc_fc_get_fc_hdr(rch->am_id);
    uint8_t am_wo_fc;

    /* Do not invoke AM tracer for auxiliary pure FC_GRANT message */
    if (fc_hdr != UCT_RC_EP_FC_PURE_GRANT) {
        am_wo_fc = rch->am_id & ~UCT_RC_EP_FC_MASK; /* mask out FC bits*/
        snprintf(buffer, max, " %c%c am %d ",
                 fc_hdr & UCT_RC_EP_FC_FLAG_SOFT_REQ ? 's' :
                 fc_hdr & UCT_RC_EP_FC_FLAG_HARD_REQ ? 'h' : '-',
                 fc_hdr & UCT_RC_EP_FC_FLAG_GRANT    ? 'g' : '-',
                 am_wo_fc);
        uct_iface_dump_am(iface, type, am_wo_fc, rch + 1, length - sizeof(*rch),
                          buffer + strlen(buffer), max - strlen(buffer));
    } else {
        snprintf(buffer, max, " FC pure grant am ");
    }
}

void uct_rc_ep_get_bcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
{
    uct_rc_iface_send_desc_t *desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);

    VALGRIND_MAKE_MEM_DEFINED(resp, desc->super.length);

    desc->unpack_cb(desc->super.unpack_arg, resp, desc->super.length);

    uct_invoke_completion(desc->super.user_comp, UCS_OK);

    ucs_mpool_put(desc);
}

void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op,
                                               const void *resp)
{
    uct_rc_iface_send_desc_t *desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);

    VALGRIND_MAKE_MEM_DEFINED(resp, desc->super.length);

    desc->unpack_cb(desc->super.unpack_arg, resp, desc->super.length);

    ucs_mpool_put(desc);
}

void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
                                          const void *resp)
{
    uct_invoke_completion(op->user_comp, UCS_OK);
    uct_rc_iface_put_send_op(op);
}

void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
                                           const void *resp)
{
    uct_invoke_completion(op->user_comp, UCS_OK);
    ucs_mpool_put(op);
}

ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
                                   unsigned flags)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
    uct_rc_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_ep_t);

    if (uct_rc_ep_has_tx_resources(ep) &&
        uct_rc_iface_has_tx_resources(iface)) {
        return UCS_ERR_BUSY;
    }

    UCS_STATIC_ASSERT(sizeof(uct_pending_req_priv_arb_t) <=
                      UCT_PENDING_REQ_PRIV_LEN);
    uct_pending_req_arb_group_push(&ep->arb_group, n);
    UCT_TL_EP_STAT_PEND(&ep->super);

    if (uct_rc_ep_has_tx_resources(ep)) {
        /* If we have ep (but not iface) resources, we need to schedule the ep */
        ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
    }

    return UCS_OK;
}

ucs_arbiter_cb_result_t uct_rc_ep_process_pending(ucs_arbiter_t *arbiter,
                                                  ucs_arbiter_elem_t *elem,
                                                  void *arg)
{
    uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_rc_iface_t *iface UCS_V_UNUSED;
    ucs_status_t status;
    uct_rc_ep_t *ep;

    ucs_trace_data("progressing pending request %p", req);
    status = req->func(req);
    ucs_trace_data("status returned from progress pending: %s",
                   ucs_status_string(status));

    if (status == UCS_OK) {
        return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
    } else if (status == UCS_INPROGRESS) {
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    } else {
        ep    = ucs_container_of(ucs_arbiter_elem_group(elem), uct_rc_ep_t, arb_group);
        iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);
        if (!uct_rc_iface_has_tx_resources(iface)) {
            /* No iface resources */
            return UCS_ARBITER_CB_RESULT_STOP;
        } else {
            /* No ep resources */
            ucs_assertv(!uct_rc_ep_has_tx_resources(ep),
                        "pending callback returned error but send resources are available");
            return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
        }
    }
}

static ucs_arbiter_cb_result_t uct_rc_ep_abriter_purge_cb(ucs_arbiter_t *arbiter,
                                                          ucs_arbiter_elem_t *elem,
                                                          void *arg)
{
    uct_purge_cb_args_t *cb_args    = arg;
    uct_pending_purge_callback_t cb = cb_args->cb;
    uct_pending_req_t *req          = ucs_container_of(elem, uct_pending_req_t,
                                                       priv);
    uct_rc_ep_t UCS_V_UNUSED *ep    = ucs_container_of(
                                          ucs_arbiter_elem_group(elem),
                                          uct_rc_ep_t, arb_group);
    uct_rc_fc_request_t *freq;

    /* Invoke user's callback only if it is not internal FC message */
    if (ucs_likely(req->func != uct_rc_ep_fc_grant)){
        if (cb != NULL) {
            cb(req, cb_args->arg);
        } else {
            ucs_debug("ep=%p cancelling user pending request %p", ep, req);
        }
    } else {
        freq = ucs_derived_of(req, uct_rc_fc_request_t);
        ucs_mpool_put(freq);
    }
    return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}

void uct_rc_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                             void *arg)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);
    uct_rc_ep_t *ep       = ucs_derived_of(tl_ep, uct_rc_ep_t);
    uct_purge_cb_args_t  args = {cb, arg};

    ucs_arbiter_group_purge(&iface->tx.arbiter, &ep->arb_group,
                            uct_rc_ep_abriter_purge_cb, &args);
}

ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self)
{
    ucs_status_t status;
    uct_rc_fc_request_t *freq = ucs_derived_of(self, uct_rc_fc_request_t);
    uct_rc_ep_t *ep           = ucs_derived_of(freq->ep, uct_rc_ep_t);
    uct_rc_iface_t *iface     = ucs_derived_of(ep->super.super.iface,
                                               uct_rc_iface_t);

    ucs_assert_always(iface->config.fc_enabled);
    status = uct_rc_fc_ctrl(&ep->super.super, UCT_RC_EP_FC_PURE_GRANT, NULL);
    if (status == UCS_OK) {
        UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_TX_PURE_GRANT, 1);
        ucs_mpool_put(freq);
    }
    return status;
}

void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
                                   int is_log)
{
    uct_rc_iface_send_op_t *op;
    uct_rc_iface_send_desc_t *desc;

    ucs_queue_for_each_extract(op, &txqp->outstanding, queue, 1) {
        if (op->handler != (uct_rc_send_handler_t)ucs_mpool_put) {
            if (is_log != 0) {
                ucs_warn("destroying rc ep %p with uncompleted operation %p",
                         txqp, op);
            }

            if (op->user_comp != NULL) {
                /* This must be uct_rc_ep_get_bcopy_handler,
                 * uct_rc_ep_send_completion_proxy_handler,
                 * one of the atomic handlers,
                 * so invoke user completion */
                uct_invoke_completion(op->user_comp, status);
            }
        }
        op->flags &= ~(UCT_RC_IFACE_SEND_OP_FLAG_INUSE |
                       UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY);
        if (op->handler == uct_rc_ep_send_op_completion_handler) {
            uct_rc_iface_put_send_op(op);
        } else if (op->handler == uct_rc_ep_flush_op_completion_handler) {
            ucs_mpool_put(op);
        } else {
            desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
            ucs_mpool_put(desc);
        }
    }
}

ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available,
                             unsigned flags)
{
    uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_rc_iface_t);

    if (!uct_rc_iface_has_tx_resources(iface) ||
        !uct_rc_ep_has_tx_resources(ep)) {
        return UCS_ERR_NO_RESOURCE;
    }

    if (uct_rc_txqp_available(&ep->txqp) == max_available) {
        UCT_TL_EP_STAT_FLUSH(&ep->super);
        return UCS_OK;
    }

    return UCS_INPROGRESS;
}

ucs_status_t uct_rc_ep_check_cqe(uct_rc_iface_t *iface, uct_rc_ep_t *ep)
{
    uct_rc_txqp_t *txqp;

    if (!uct_rc_iface_have_tx_cqe_avail(iface)) {
        UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_RC_IFACE_STAT_NO_CQE, 1);
        UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
        return UCS_ERR_NO_RESOURCE;
    }

    txqp = &ep->txqp;
    /* if unsignaled == RC_UNSIGNALED_INF this value was already saved and \
       next operation will be defenitly signaled */
    if (txqp->unsignaled != RC_UNSIGNALED_INF) {
        txqp->unsignaled_store_count++;
        txqp->unsignaled_store += txqp->unsignaled;
        txqp->unsignaled        = RC_UNSIGNALED_INF;
    }

    return UCS_OK;
}

#define UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(_num_bits, _is_be) \
    void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(_num_bits, _is_be) \
            (uct_rc_iface_send_op_t *op, const void *resp) \
    { \
        uct_rc_iface_send_desc_t *desc = \
            ucs_derived_of(op, uct_rc_iface_send_desc_t); \
        const uint##_num_bits##_t *value = resp; \
        uint##_num_bits##_t *dest = desc->super.buffer; \
        \
        VALGRIND_MAKE_MEM_DEFINED(value, sizeof(*value)); \
        if (_is_be && (_num_bits == 32)) { \
            *dest = ntohl(*value); /* TODO swap in-place */ \
        } else if (_is_be && (_num_bits == 64)) { \
            *dest = be64toh(*value); \
        } else { \
            *dest = *value; \
        } \
        \
        uct_invoke_completion(desc->super.user_comp, UCS_OK); \
        ucs_mpool_put(desc); \
  }

UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(32, 0);
UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(32, 1);
UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(64, 0);
UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC(64, 1);

void uct_rc_ep_am_zcopy_handler(uct_rc_iface_send_op_t *op, const void *resp)
{
    uct_rc_iface_send_desc_t *desc = ucs_derived_of(op, uct_rc_iface_send_desc_t);
    uct_invoke_completion(desc->super.user_comp, UCS_OK);
    ucs_mpool_put(desc);
}