/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifndef UCT_RC_EP_H
#define UCT_RC_EP_H

#include "rc_iface.h"

#include <uct/api/uct.h>
#include <ucs/debug/debug.h>


#define RC_UNSIGNALED_INF UINT16_MAX

enum {
    UCT_RC_FC_STAT_NO_CRED,
    UCT_RC_FC_STAT_TX_GRANT,
    UCT_RC_FC_STAT_TX_PURE_GRANT,
    UCT_RC_FC_STAT_TX_SOFT_REQ,
    UCT_RC_FC_STAT_TX_HARD_REQ,
    UCT_RC_FC_STAT_RX_GRANT,
    UCT_RC_FC_STAT_RX_PURE_GRANT,
    UCT_RC_FC_STAT_RX_SOFT_REQ,
    UCT_RC_FC_STAT_RX_HARD_REQ,
    UCT_RC_FC_STAT_FC_WND,
    UCT_RC_FC_STAT_LAST
};

enum {
    UCT_RC_TXQP_STAT_QP_FULL,
    UCT_RC_TXQP_STAT_SIGNAL,
    UCT_RC_TXQP_STAT_LAST
};

/*
 * Auxiliary AM ID bits used by the FC protocol.
 */
enum {
    /* Soft Credit Request: indicates that the peer should piggy-back a
     * credit grant on a return AM (if any). Can be bundled with
     * UCT_RC_EP_FC_FLAG_GRANT  */
    UCT_RC_EP_FC_FLAG_SOFT_REQ  = UCS_BIT(UCT_AM_ID_BITS),

    /* Hard Credit Request: indicates that the window is close to exhaustion.
     * The peer must send a separate AM with a credit grant as soon as it
     * receives an AM with this bit set. Can be bundled with
     * UCT_RC_EP_FC_FLAG_GRANT */
    UCT_RC_EP_FC_FLAG_HARD_REQ  = UCS_BIT((UCT_AM_ID_BITS) + 1),

    /* Credit Grant: the endpoint should update its FC window as soon as it
     * receives an AM with this bit set. Can be bundled with either the soft
     * or the hard request bit */
    UCT_RC_EP_FC_FLAG_GRANT     = UCS_BIT((UCT_AM_ID_BITS) + 2),

    /* Special FC AM with Credit Grant: an empty message indicating a credit
     * grant. Cannot be bundled with any other FC flag (as it consumes all
     * 3 FC bits). */
    UCT_RC_EP_FC_PURE_GRANT     = (UCT_RC_EP_FC_FLAG_HARD_REQ |
                                   UCT_RC_EP_FC_FLAG_SOFT_REQ |
                                   UCT_RC_EP_FC_FLAG_GRANT)
};

/*
 * FC protocol header mask
 */
#define UCT_RC_EP_FC_MASK UCT_RC_EP_FC_PURE_GRANT
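
/*
 * Example (a sketch, assuming UCT_AM_ID_BITS == 5, its value in uct_def.h at
 * the time of writing; the AM ID byte is then laid out as
 * [GRANT|HARD_REQ|SOFT_REQ|am_id]). A sender may fold FC bits into the AM ID,
 * and the receiver recovers both parts:
 *
 *     uint8_t am_id = user_am_id;                       // < UCT_AM_ID_MAX
 *     am_id |= UCT_RC_EP_FC_FLAG_SOFT_REQ | UCT_RC_EP_FC_FLAG_GRANT;
 *
 *     uint8_t fc_hdr  = uct_rc_fc_get_fc_hdr(am_id);    // FC bits only
 *     uint8_t pure_id = am_id & ~UCT_RC_EP_FC_MASK;     // original AM ID
 */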

/*
 * Macro to generate the names of atomic (AMO) completion handler functions.
 */
#define UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(_num_bits, _is_be) \
    uct_rc_ep_atomic_handler_##_num_bits##_be##_is_be

/*
 * Check for send resources
 */
#define UCT_RC_CHECK_CQE_RET(_iface, _ep, _ret) \
    /* tx_moderation == 0 for TLs which don't support it */ \
    if (ucs_unlikely((_iface)->tx.cq_available <= \
        (signed)(_iface)->config.tx_moderation)) { \
        if (uct_rc_ep_check_cqe(_iface, _ep) != UCS_OK) { \
            return _ret; \
        } \
    }

#define UCT_RC_CHECK_TXQP_RET(_iface, _ep, _ret) \
    if (uct_rc_txqp_available(&(_ep)->txqp) <= 0) { \
        UCS_STATS_UPDATE_COUNTER((_ep)->txqp.stats, UCT_RC_TXQP_STAT_QP_FULL, 1); \
        UCS_STATS_UPDATE_COUNTER((_ep)->super.stats, UCT_EP_STAT_NO_RES, 1); \
        return _ret; \
    }

#define UCT_RC_CHECK_RES(_iface, _ep) \
    UCT_RC_CHECK_CQE_RET(_iface, _ep, UCS_ERR_NO_RESOURCE) \
    UCT_RC_CHECK_TXQP_RET(_iface, _ep, UCS_ERR_NO_RESOURCE)
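
/*
 * Usage sketch (uct_foo_ep_am_short and the posting step are illustrative,
 * not part of this header): a transport send path checks resources first and
 * backs off with UCS_ERR_NO_RESOURCE when the CQ or TX queue is exhausted:
 *
 *     ucs_status_t uct_foo_ep_am_short(uct_ep_h tl_ep, ...)
 *     {
 *         uct_rc_ep_t    *ep    = ucs_derived_of(tl_ep, uct_rc_ep_t);
 *         uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface,
 *                                                uct_rc_iface_t);
 *
 *         UCT_RC_CHECK_RES(iface, ep);  // may return UCS_ERR_NO_RESOURCE
 *         ... post the work request ...
 *         return UCS_OK;
 *     }
 */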

/*
 * Check for FC credits and add FC protocol bits (if any)
 */
#define UCT_RC_CHECK_FC_WND(_fc, _stats) \
    if ((_fc)->fc_wnd <= 0) { \
        UCS_STATS_UPDATE_COUNTER((_fc)->stats, UCT_RC_FC_STAT_NO_CRED, 1); \
        UCS_STATS_UPDATE_COUNTER(_stats, UCT_EP_STAT_NO_RES, 1); \
        return UCS_ERR_NO_RESOURCE; \
    }


#define UCT_RC_UPDATE_FC_WND(_iface, _fc) \
    { \
        /* For performance reasons, prefer to update fc_wnd unconditionally */ \
        (_fc)->fc_wnd--; \
        \
        if ((_iface)->config.fc_enabled) { \
            UCS_STATS_SET_COUNTER((_fc)->stats, UCT_RC_FC_STAT_FC_WND, \
                                  (_fc)->fc_wnd); \
        } \
    }

#define UCT_RC_CHECK_FC(_iface, _ep, _am_id) \
    { \
        if (ucs_unlikely((_ep)->fc.fc_wnd <= (_iface)->config.fc_soft_thresh)) { \
            if ((_iface)->config.fc_enabled) { \
                UCT_RC_CHECK_FC_WND(&(_ep)->fc, (_ep)->super.stats); \
                (_am_id) |= uct_rc_fc_req_moderation(&(_ep)->fc, _iface); \
            } else { \
                /* Set fc_wnd to max, to send as much as possible without checks */ \
                (_ep)->fc.fc_wnd = INT16_MAX; \
            } \
        } \
        (_am_id) |= uct_rc_fc_get_fc_hdr((_ep)->fc.flags); /* take grant bit */ \
    }
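
/*
 * Usage sketch (the posting step is illustrative): an AM send path checks
 * the FC window, folds the protocol bits into the AM ID, and accounts for
 * the send afterwards:
 *
 *     uint8_t am_id = id;                  // 0 <= id < UCT_AM_ID_MAX
 *     UCT_RC_CHECK_FC(iface, ep, am_id);   // may return UCS_ERR_NO_RESOURCE
 *     ... post the AM using am_id (FC bits included) ...
 *     UCT_RC_UPDATE_FC(iface, ep, am_id);
 */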

#define UCT_RC_UPDATE_FC(_iface, _ep, _fc_hdr) \
    { \
        if ((_fc_hdr) & UCT_RC_EP_FC_FLAG_GRANT) { \
            UCS_STATS_UPDATE_COUNTER((_ep)->fc.stats, UCT_RC_FC_STAT_TX_GRANT, 1); \
        } \
        if ((_fc_hdr) & UCT_RC_EP_FC_FLAG_SOFT_REQ) { \
            UCS_STATS_UPDATE_COUNTER((_ep)->fc.stats, UCT_RC_FC_STAT_TX_SOFT_REQ, 1); \
        } else if ((_fc_hdr) & UCT_RC_EP_FC_FLAG_HARD_REQ) { \
            UCS_STATS_UPDATE_COUNTER((_ep)->fc.stats, UCT_RC_FC_STAT_TX_HARD_REQ, 1); \
        } \
        \
        (_ep)->fc.flags = 0; \
        \
        UCT_RC_UPDATE_FC_WND(_iface, &(_ep)->fc) \
    }


/* This is a common TX queue-pair type for all RC and DC transports */
struct uct_rc_txqp {
    ucs_queue_head_t    outstanding;
    /* The RC_UNSIGNALED_INF value forces signaled completions in the
     * moderation logic when CQ credits are nearly exhausted (fewer than
     * tx_moderation) */
    uint16_t            unsignaled;
    /* Unsignaled value saved before it was set to RC_UNSIGNALED_INF, so that
     * the correct number of CQ credits can be returned on TX completion */
    uint16_t            unsignaled_store;
    /* If unsignaled was accumulated into the stored value several times,
     * credits are returned only when this counter reaches 0, because the
     * exact value cannot be returned on each signaled completion */
    uint16_t            unsignaled_store_count;
    int16_t             available;
    UCS_STATS_NODE_DECLARE(stats)
};

typedef struct uct_rc_fc {
    /* No more than fc_wnd active messages can be sent without acknowledgment */
    int16_t             fc_wnd;
    /* Used only by the FC protocol at this point (3 high-order bits) */
    uint8_t             flags;
    UCS_STATS_NODE_DECLARE(stats)
} uct_rc_fc_t;

struct uct_rc_ep {
    uct_base_ep_t       super;
    uct_rc_txqp_t       txqp;
    ucs_list_link_t     list;
    ucs_arbiter_group_t arb_group;
    uct_rc_fc_t         fc;
};

UCS_CLASS_DECLARE(uct_rc_ep_t, uct_rc_iface_t*, uint32_t);


typedef struct uct_rc_ep_address {
    uct_ib_uint24_t  qp_num;
} UCS_S_PACKED uct_rc_ep_address_t;

void uct_rc_ep_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
                           void *data, size_t length, size_t valid_length,
                           char *buffer, size_t max);

void uct_rc_ep_get_bcopy_handler(uct_rc_iface_send_op_t *op, const void *resp);

void uct_rc_ep_get_bcopy_handler_no_completion(uct_rc_iface_send_op_t *op,
                                               const void *resp);

void uct_rc_ep_send_op_completion_handler(uct_rc_iface_send_op_t *op,
                                          const void *resp);

void uct_rc_ep_flush_op_completion_handler(uct_rc_iface_send_op_t *op,
                                           const void *resp);

ucs_status_t uct_rc_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *n,
                                   unsigned flags);

void uct_rc_ep_pending_purge(uct_ep_h ep, uct_pending_purge_callback_t cb,
                             void *arg);

ucs_arbiter_cb_result_t uct_rc_ep_process_pending(ucs_arbiter_t *arbiter,
                                                  ucs_arbiter_elem_t *elem,
                                                  void *arg);

ucs_status_t uct_rc_fc_init(uct_rc_fc_t *fc, int16_t winsize
                            UCS_STATS_ARG(ucs_stats_node_t* stats_parent));
void uct_rc_fc_cleanup(uct_rc_fc_t *fc);

ucs_status_t uct_rc_ep_fc_grant(uct_pending_req_t *self);

void uct_rc_txqp_purge_outstanding(uct_rc_txqp_t *txqp, ucs_status_t status,
                                   int is_log);

ucs_status_t uct_rc_ep_flush(uct_rc_ep_t *ep, int16_t max_available,
                             unsigned flags);

ucs_status_t uct_rc_ep_check_cqe(uct_rc_iface_t *iface, uct_rc_ep_t *ep);

void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(32, 0)(uct_rc_iface_send_op_t *op,
                                                   const void *resp);
void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(32, 1)(uct_rc_iface_send_op_t *op,
                                                   const void *resp);
void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(64, 0)(uct_rc_iface_send_op_t *op,
                                                   const void *resp);
void UCT_RC_DEFINE_ATOMIC_HANDLER_FUNC_NAME(64, 1)(uct_rc_iface_send_op_t *op,
                                                   const void *resp);

ucs_status_t uct_rc_txqp_init(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface,
                              uint32_t qp_num
                              UCS_STATS_ARG(ucs_stats_node_t* stats_parent));
void uct_rc_txqp_cleanup(uct_rc_txqp_t *txqp);

static inline int16_t uct_rc_txqp_available(uct_rc_txqp_t *txqp)
{
    return txqp->available;
}

static inline void uct_rc_txqp_available_add(uct_rc_txqp_t *txqp, int16_t val)
{
    txqp->available += val;
}

static inline void uct_rc_txqp_available_set(uct_rc_txqp_t *txqp, int16_t val)
{
    txqp->available = val;
}

static inline uint16_t uct_rc_txqp_unsignaled(uct_rc_txqp_t *txqp)
{
    return txqp->unsignaled;
}

static UCS_F_ALWAYS_INLINE
int uct_rc_fc_has_resources(uct_rc_iface_t *iface, uct_rc_fc_t *fc)
{
    /* When FC is disabled, fc_wnd may still become 0 because it's decremented
     * unconditionally (for performance reasons) */
    return (fc->fc_wnd > 0) || !iface->config.fc_enabled;
}

static UCS_F_ALWAYS_INLINE int uct_rc_ep_has_tx_resources(uct_rc_ep_t *ep)
{
    uct_rc_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_rc_iface_t);

    return (ep->txqp.available > 0) && uct_rc_fc_has_resources(iface, &ep->fc);
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_add_send_op(uct_rc_txqp_t *txqp, uct_rc_iface_send_op_t *op)
{
    /* NOTE: We insert the descriptor with its sequence number after the post,
     * because when polling completions we get the number of completed
     * operations rather than a zero-based completion index.
     */
    ucs_assert(op != NULL);
    ucs_assertv(!(op->flags & UCT_RC_IFACE_SEND_OP_FLAG_INUSE), "op=%p", op);
    op->flags |= UCT_RC_IFACE_SEND_OP_FLAG_INUSE;
    ucs_queue_push(&txqp->outstanding, &op->queue);
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_add_send_op_sn(uct_rc_txqp_t *txqp, uct_rc_iface_send_op_t *op, uint16_t sn)
{
    ucs_trace_poll("txqp %p add send op %p sn %d handler %s", txqp, op, sn,
                   ucs_debug_get_symbol_name((void*)op->handler));
    op->sn = sn;
    uct_rc_txqp_add_send_op(txqp, op);
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_add_send_comp(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp,
                          uct_completion_t *comp, uint16_t sn, uint16_t flags)
{
    uct_rc_iface_send_op_t *op;

    if (comp == NULL) {
        return;
    }

    op            = uct_rc_iface_get_send_op(iface);
    op->user_comp = comp;
    op->flags    |= flags;
    uct_rc_txqp_add_send_op_sn(txqp, op, sn);
}
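
/*
 * Sketch of the completion-tracking flow (the sn bookkeeping is illustrative):
 * record the user completion against the post sequence number, then release
 * it from the CQ polling loop once that sn completes:
 *
 *     uint16_t sn = ... sequence number of the posted WQE ...;
 *     uct_rc_txqp_add_send_comp(iface, &ep->txqp, comp, sn, 0);
 *     ...
 *     // in the CQ progress path, for a CQE carrying completed_sn:
 *     uct_rc_txqp_completion_desc(&ep->txqp, completed_sn);
 */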

static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_txqp_add_flush_comp(uct_rc_iface_t *iface, uct_base_ep_t *ep,
                           uct_rc_txqp_t *txqp, uct_completion_t *comp,
                           uint16_t sn)
{
    uct_rc_iface_send_op_t *op;

    if (comp != NULL) {
        op = (uct_rc_iface_send_op_t*)ucs_mpool_get(&iface->tx.flush_mp);
        if (ucs_unlikely(op == NULL)) {
            ucs_error("Failed to allocate flush completion");
            return UCS_ERR_NO_MEMORY;
        }

        op->flags     = 0;
        op->user_comp = comp;
        uct_rc_txqp_add_send_op_sn(txqp, op, sn);
        VALGRIND_MAKE_MEM_DEFINED(op, sizeof(*op)); /* handler set by mpool init */
    }
    UCT_TL_EP_STAT_FLUSH_WAIT(ep);

    return UCS_INPROGRESS;
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_completion_op(uct_rc_iface_send_op_t *op, const void *resp)
{
    ucs_trace_poll("complete op %p sn %d handler %s", op, op->sn,
                   ucs_debug_get_symbol_name((void*)op->handler));
    ucs_assert(op->flags & UCT_RC_IFACE_SEND_OP_FLAG_INUSE);
    op->flags &= ~(UCT_RC_IFACE_SEND_OP_FLAG_INUSE |
                   UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY);
    op->handler(op, resp);
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_completion_desc(uct_rc_txqp_t *txqp, uint16_t sn)
{
    uct_rc_iface_send_op_t *op;

    ucs_trace_poll("txqp %p complete ops up to sn %d", txqp, sn);
    ucs_queue_for_each_extract(op, &txqp->outstanding, queue,
                               UCS_CIRCULAR_COMPARE16(op->sn, <=, sn)) {
        uct_rc_txqp_completion_op(op, ucs_derived_of(op, uct_rc_iface_send_desc_t) + 1);
    }
}

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_completion_inl_resp(uct_rc_txqp_t *txqp, const void *resp, uint16_t sn)
{
    uct_rc_iface_send_op_t *op;

    ucs_trace_poll("txqp %p complete ops up to sn %d", txqp, sn);
    ucs_queue_for_each_extract(op, &txqp->outstanding, queue,
                               UCS_CIRCULAR_COMPARE16(op->sn, <=, sn)) {
        ucs_assert(!(op->flags & UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY));
        uct_rc_txqp_completion_op(op, resp);
    }
}

static UCS_F_ALWAYS_INLINE uint8_t
uct_rc_iface_tx_moderation(uct_rc_iface_t *iface, uct_rc_txqp_t *txqp, uint8_t flag)
{
    return (txqp->unsignaled >= iface->config.tx_moderation) ? flag : 0;
}
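
/*
 * Sketch (IBV_SEND_SIGNALED is the verbs flag, shown for illustration):
 * request a signaled completion only once every tx_moderation posts, to
 * reduce CQE processing overhead:
 *
 *     int send_flags = uct_rc_iface_tx_moderation(iface, &ep->txqp,
 *                                                 IBV_SEND_SIGNALED);
 *     ... post the WQE with send_flags ...
 *     uct_rc_txqp_posted(&ep->txqp, iface, 1,
 *                        send_flags & IBV_SEND_SIGNALED);
 */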

static UCS_F_ALWAYS_INLINE void
uct_rc_txqp_posted(uct_rc_txqp_t *txqp, uct_rc_iface_t *iface, uint16_t res_count,
                   int signaled)
{
    if (signaled) {
        ucs_assert(uct_rc_iface_have_tx_cqe_avail(iface));
        txqp->unsignaled = 0;
        UCS_STATS_UPDATE_COUNTER(txqp->stats, UCT_RC_TXQP_STAT_SIGNAL, 1);
    } else {
        ucs_assert(txqp->unsignaled != RC_UNSIGNALED_INF);
        ++txqp->unsignaled;
    }

    /* Reserve CQ credits for every posted operation, in case it completes
     * with an error */
    iface->tx.cq_available -= res_count;
    txqp->available -= res_count;
}

static UCS_F_ALWAYS_INLINE uint8_t
uct_rc_fc_get_fc_hdr(uint8_t id)
{
    return id & UCT_RC_EP_FC_MASK;
}

static UCS_F_ALWAYS_INLINE uint8_t
uct_rc_fc_req_moderation(uct_rc_fc_t *fc, uct_rc_iface_t *iface)
{
    return (fc->fc_wnd == iface->config.fc_hard_thresh) ?
            UCT_RC_EP_FC_FLAG_HARD_REQ :
           (fc->fc_wnd == iface->config.fc_soft_thresh) ?
            UCT_RC_EP_FC_FLAG_SOFT_REQ : 0;
}
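
/*
 * Example (threshold values are hypothetical): with fc_soft_thresh == 16 and
 * fc_hard_thresh == 8, the send path requests a piggy-backed grant exactly
 * when fc_wnd drains to 16, and demands an immediate pure grant when it
 * drains to 8:
 *
 *     fc->fc_wnd == 16  ->  UCT_RC_EP_FC_FLAG_SOFT_REQ
 *     fc->fc_wnd == 8   ->  UCT_RC_EP_FC_FLAG_HARD_REQ
 *     otherwise         ->  0
 */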

static UCS_F_ALWAYS_INLINE int
uct_rc_ep_fm(uct_rc_iface_t *iface, uct_ib_fence_info_t* fi, int flag)
{
    int fence;

    /* A call to the iface fence increments the beat; if the endpoint beat is
     * out of sync with the iface beat, the endpoint has not yet posted any
     * WQE with the fence flag */
    fence          = (fi->fence_beat != iface->tx.fi.fence_beat) ? flag : 0;
    fi->fence_beat = iface->tx.fi.fence_beat;
    return fence;
}
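
/*
 * Example (a sketch, assuming the transport keeps a uct_ib_fence_info_t per
 * endpoint and fence_mode is enabled; IBV_SEND_FENCE is the verbs flag):
 * after uct_rc_ep_fence() rewinds the endpoint beat, the next
 * uct_rc_ep_fm() call sees the mismatch and returns the fence flag exactly
 * once:
 *
 *     uct_ib_fence_info_t *fi = ... per-endpoint fence state ...;
 *     uct_rc_ep_fence(tl_ep, fi, 1);                    // schedule fence
 *     f1 = uct_rc_ep_fm(iface, fi, IBV_SEND_FENCE);     // -> IBV_SEND_FENCE
 *     f2 = uct_rc_ep_fm(iface, fi, IBV_SEND_FENCE);     // -> 0
 */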

static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_ep_fence(uct_ep_h tl_ep, uct_ib_fence_info_t* fi, int fence)
{
    uct_rc_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_iface_t);

    /* If a fence is requested and enabled by the configuration, schedule a
     * fence for the next RDMA operation */
    if (fence && (iface->config.fence_mode != UCT_RC_FENCE_MODE_NONE)) {
        fi->fence_beat = iface->tx.fi.fence_beat - 1;
    }

    UCT_TL_EP_STAT_FENCE(ucs_derived_of(tl_ep, uct_base_ep_t));
    return UCS_OK;
}

#endif