/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#include "rc_mlx5.h"
#if HAVE_DECL_IBV_CMD_MODIFY_QP
#include <infiniband/driver.h>
#endif
#include <uct/ib/mlx5/ib_mlx5_log.h>
#include <uct/ib/mlx5/exp/ib_exp.h>
#include <ucs/arch/cpu.h>
#include <ucs/sys/compiler.h>
#include <arpa/inet.h> /* For htonl */
#include "rc_mlx5.inl"
/*
 * Helper function for buffer-copy post.
 * Adds the descriptor to the callback queue.
 */
static UCS_F_ALWAYS_INLINE void
uct_rc_mlx5_txqp_bcopy_post(uct_rc_mlx5_iface_common_t *iface,
uct_rc_txqp_t *txqp, uct_ib_mlx5_txwq_t *txwq,
unsigned opcode, unsigned length,
/* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
uint8_t fm_ce_se, uint32_t imm_val_be,
uct_rc_iface_send_desc_t *desc, const void *buffer,
uct_ib_log_sge_t *log_sge)
{
desc->super.sn = txwq->sw_pi;
uct_rc_mlx5_txqp_dptr_post(iface, IBV_QPT_RC, txqp, txwq,
opcode, buffer, length, &desc->lkey,
rdma_raddr, uct_ib_md_direct_rkey(rdma_rkey),
0, 0, 0, 0,
NULL, NULL, 0, fm_ce_se, imm_val_be, INT_MAX, log_sge);
uct_rc_txqp_add_send_op(txqp, &desc->super);
}
/*
* Helper function for zero-copy post.
* Adds user completion to the callback queue.
*/
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_mlx5_ep_zcopy_post(uct_rc_mlx5_ep_t *ep,
unsigned opcode, const uct_iov_t *iov, size_t iovcnt,
/* SEND */ uint8_t am_id, const void *am_hdr, unsigned am_hdr_len,
/* RDMA */ uint64_t rdma_raddr, uct_rkey_t rdma_rkey,
/* TAG */ uct_tag_t tag, uint32_t app_ctx, uint32_t ib_imm_be,
int force_sig, uct_completion_t *comp)
{
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(ep->super.super.super.iface,
uct_rc_mlx5_iface_common_t);
uint16_t sn;
UCT_RC_CHECK_RES(&iface->super, &ep->super);
sn = ep->tx.wq.sw_pi;
uct_rc_mlx5_txqp_dptr_post_iov(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
opcode, iov, iovcnt,
am_id, am_hdr, am_hdr_len,
rdma_raddr, uct_ib_md_direct_rkey(rdma_rkey),
tag, app_ctx, ib_imm_be,
NULL, NULL, 0,
(comp == NULL) ? force_sig : MLX5_WQE_CTRL_CQ_UPDATE,
UCT_IB_MAX_ZCOPY_LOG_SGE(&iface->super.super));
uct_rc_txqp_add_send_comp(&iface->super, &ep->super.txqp, comp, sn,
UCT_RC_IFACE_SEND_OP_FLAG_ZCOPY);
return UCS_INPROGRESS;
}
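/*
 * Inline PUT: the payload is copied directly into the WQE, so no TX
 * descriptor or completion tracking is needed and the call returns UCS_OK
 * immediately.
 */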
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_put_short_inline(uct_ep_h tl_ep, const void *buffer, unsigned length,
uint64_t remote_addr, uct_rkey_t rkey)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
UCT_RC_MLX5_CHECK_PUT_SHORT(length, 0);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_RDMA_WRITE,
buffer, length, 0, 0, 0,
remote_addr, uct_ib_md_direct_rkey(rkey),
NULL, NULL, 0, 0, INT_MAX);
UCT_TL_EP_STAT_OP(&ep->super.super, PUT, SHORT, length);
return UCS_OK;
}
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_am_short_inline(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
const void *payload, unsigned length)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
UCT_RC_MLX5_CHECK_AM_SHORT(id, length, 0);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_SEND,
payload, length,
id, hdr, 0,
0, 0,
NULL, NULL, 0,
MLX5_WQE_CTRL_SOLICITED,
INT_MAX);
UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(hdr) + length);
UCT_RC_UPDATE_FC(&iface->super, &ep->super, id);
return UCS_OK;
}
#if HAVE_IBV_DM
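/*
 * Device-memory (DM) short path: stages the header and payload in on-device
 * memory (via uct_rc_mlx5_common_dm_make_data) and posts a bcopy-style WQE
 * referencing it. Used when the message is too long for the inline path but
 * still fits in a DM segment.
 */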
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_short_dm(uct_rc_mlx5_ep_t *ep, uct_rc_mlx5_dm_copy_data_t *cache,
size_t hdr_len, const void *payload, unsigned length,
unsigned opcode, uint8_t fm_ce_se,
uint64_t rdma_raddr, uct_rkey_t rdma_rkey)
{
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(ep->super.super.super.iface,
uct_rc_mlx5_iface_common_t);
uct_rc_iface_send_desc_t *desc;
void *buffer;
ucs_status_t status;
uct_ib_log_sge_t log_sge;
status = uct_rc_mlx5_common_dm_make_data(iface, cache, hdr_len, payload,
length, &desc, &buffer, &log_sge);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
return status;
}
uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
opcode, hdr_len + length,
rdma_raddr, rdma_rkey, fm_ce_se,
0, desc, buffer,
log_sge.num_sge ? &log_sge : NULL);
return UCS_OK;
}
#endif
ucs_status_t
uct_rc_mlx5_ep_put_short(uct_ep_h tl_ep, const void *buffer, unsigned length,
uint64_t remote_addr, uct_rkey_t rkey)
{
#if HAVE_IBV_DM
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_mlx5_iface_common_t);
uct_rc_iface_t *rc_iface = &iface->super;
uct_rc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);
ucs_status_t status;
if (ucs_likely((length <= UCT_IB_MLX5_PUT_MAX_SHORT(0)) || !iface->dm.dm)) {
#endif
return uct_rc_mlx5_ep_put_short_inline(tl_ep, buffer, length, remote_addr, rkey);
#if HAVE_IBV_DM
}
UCT_CHECK_LENGTH(length, 0, iface->dm.seg_len, "put_short");
UCT_RC_CHECK_RES(rc_iface, &ep->super);
status = uct_rc_mlx5_ep_short_dm(ep, NULL, 0, buffer, length,
MLX5_OPCODE_RDMA_WRITE,
MLX5_WQE_CTRL_CQ_UPDATE,
remote_addr, rkey);
if (UCS_STATUS_IS_ERR(status)) {
return status;
}
UCT_TL_EP_STAT_OP(&ep->super.super, PUT, SHORT, length);
return UCS_OK;
#endif
}
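/*
 * Buffer-copy PUT: packs the payload into a pre-registered TX descriptor via
 * pack_cb and posts an RDMA write from it; returns the packed length, or a
 * negative status if no send resources are available.
 */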
ssize_t uct_rc_mlx5_ep_put_bcopy(uct_ep_h tl_ep, uct_pack_callback_t pack_cb,
void *arg, uint64_t remote_addr, uct_rkey_t rkey)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_iface_send_desc_t *desc;
size_t length;
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_IFACE_GET_TX_PUT_BCOPY_DESC(&iface->super, &iface->super.tx.mp,
desc, pack_cb, arg, length);
uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_RDMA_WRITE, length, remote_addr,
rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1,
NULL);
UCT_TL_EP_STAT_OP(&ep->super.super, PUT, BCOPY, length);
return length;
}
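/*
 * Zero-copy PUT: posts an RDMA write that references the user IOV directly;
 * returns UCS_INPROGRESS on success and reports completion through comp.
 */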
ucs_status_t uct_rc_mlx5_ep_put_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov, size_t iovcnt,
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp)
{
uct_ib_iface_t UCS_V_UNUSED *iface = ucs_derived_of(tl_ep->iface,
uct_ib_iface_t);
uct_rc_mlx5_ep_t *ep = ucs_derived_of(tl_ep,
uct_rc_mlx5_ep_t);
ucs_status_t status;
UCT_CHECK_IOV_SIZE(iovcnt, uct_ib_iface_get_max_iov(iface),
"uct_rc_mlx5_ep_put_zcopy");
UCT_CHECK_LENGTH(uct_iov_total_length(iov, iovcnt), 0, UCT_IB_MAX_MESSAGE_SIZE,
"put_zcopy");
status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_RDMA_WRITE, iov, iovcnt,
0, NULL, 0, remote_addr, rkey, 0ul, 0, 0,
MLX5_WQE_CTRL_CQ_UPDATE, comp);
UCT_TL_EP_STAT_OP_IF_SUCCESS(status, &ep->super.super, PUT, ZCOPY,
uct_iov_total_length(iov, iovcnt));
return status;
}
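/*
 * Buffer-copy GET: posts an RDMA read into a TX descriptor; unpack_cb runs
 * from the completion path, so UCS_INPROGRESS is always returned on success.
 */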
ucs_status_t uct_rc_mlx5_ep_get_bcopy(uct_ep_h tl_ep,
uct_unpack_callback_t unpack_cb,
void *arg, size_t length,
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_iface_send_desc_t *desc;
UCT_CHECK_LENGTH(length, 0, iface->super.super.config.seg_size, "get_bcopy");
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_IFACE_GET_TX_GET_BCOPY_DESC(&iface->super, &iface->super.tx.mp, desc,
unpack_cb, comp, arg, length);
uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_RDMA_READ, length, remote_addr,
rkey, MLX5_WQE_CTRL_CQ_UPDATE, 0, desc, desc + 1,
NULL);
UCT_TL_EP_STAT_OP(&ep->super.super, GET, BCOPY, length);
return UCS_INPROGRESS;
}
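/*
 * Zero-copy GET: the total length must exceed max_inl_resp, since shorter
 * reads are expected to take the bcopy path; completion is reported through
 * comp.
 */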
ucs_status_t uct_rc_mlx5_ep_get_zcopy(uct_ep_h tl_ep, const uct_iov_t *iov, size_t iovcnt,
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
ucs_status_t status;
UCT_CHECK_IOV_SIZE(iovcnt, uct_ib_iface_get_max_iov(&iface->super.super),
"uct_rc_mlx5_ep_get_zcopy");
UCT_CHECK_LENGTH(uct_iov_total_length(iov, iovcnt),
iface->super.super.config.max_inl_resp + 1,
UCT_IB_MAX_MESSAGE_SIZE, "get_zcopy");
status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_RDMA_READ, iov, iovcnt,
0, NULL, 0, remote_addr, rkey, 0ul, 0, 0,
MLX5_WQE_CTRL_CQ_UPDATE, comp);
UCT_TL_EP_STAT_OP_IF_SUCCESS(status, &ep->super.super, GET, ZCOPY,
uct_iov_total_length(iov, iovcnt));
return status;
}
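/*
 * AM short: the fast path places the AM header (id plus 64-bit user header)
 * and payload inline in the WQE; longer messages take the device-memory path
 * when DM is available.
 */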
ucs_status_t
uct_rc_mlx5_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t hdr,
const void *payload, unsigned length)
{
#if HAVE_IBV_DM
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(tl_ep->iface, uct_rc_mlx5_iface_common_t);
uct_rc_iface_t *rc_iface = &iface->super;
uct_rc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);
ucs_status_t status;
uct_rc_mlx5_dm_copy_data_t cache;
if (ucs_likely((sizeof(uct_rc_mlx5_am_short_hdr_t) + length <= UCT_IB_MLX5_AM_MAX_SHORT(0)) ||
!iface->dm.dm)) {
#endif
return uct_rc_mlx5_ep_am_short_inline(tl_ep, id, hdr, payload, length);
#if HAVE_IBV_DM
}
UCT_CHECK_LENGTH(length + sizeof(uct_rc_mlx5_am_short_hdr_t), 0,
iface->dm.seg_len, "am_short");
UCT_CHECK_AM_ID(id);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
uct_rc_mlx5_am_hdr_fill(&cache.am_hdr.rc_hdr, id);
cache.am_hdr.am_hdr = hdr;
status = uct_rc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.am_hdr), payload, length,
MLX5_OPCODE_SEND,
MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
0, 0);
if (UCS_STATUS_IS_ERR(status)) {
return status;
}
UCT_TL_EP_STAT_OP(&ep->super.super, AM, SHORT, sizeof(cache.am_hdr) + length);
UCT_RC_UPDATE_FC(rc_iface, &ep->super, id);
return UCS_OK;
#endif
}
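/*
 * AM bcopy: packs the AM header and payload into a TX descriptor via pack_cb
 * and posts a send, consuming one flow-control credit. Returns the payload
 * length on success.
 */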
ssize_t uct_rc_mlx5_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
uct_pack_callback_t pack_cb, void *arg,
unsigned flags)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_iface_send_desc_t *desc;
size_t length;
UCT_CHECK_AM_ID(id);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
UCT_RC_IFACE_GET_TX_AM_BCOPY_DESC(&iface->super, &iface->super.tx.mp, desc,
id, uct_rc_mlx5_am_hdr_fill, uct_rc_mlx5_hdr_t,
pack_cb, arg, &length);
uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_SEND, sizeof(uct_rc_mlx5_hdr_t) + length,
0, 0, MLX5_WQE_CTRL_SOLICITED, 0, desc, desc + 1,
NULL);
UCT_TL_EP_STAT_OP(&ep->super.super, AM, BCOPY, length);
UCT_RC_UPDATE_FC(&iface->super, &ep->super, id);
return length;
}
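/*
 * AM zcopy: the AM header is copied into the WQE while the payload is
 * referenced directly through the IOV; completion is reported via comp
 * (UCS_INPROGRESS is returned on success).
 */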
ucs_status_t uct_rc_mlx5_ep_am_zcopy(uct_ep_h tl_ep, uint8_t id, const void *header,
unsigned header_length, const uct_iov_t *iov,
size_t iovcnt, unsigned flags,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
ucs_status_t status;
UCT_CHECK_IOV_SIZE(iovcnt, UCT_IB_MLX5_AM_ZCOPY_MAX_IOV,
"uct_rc_mlx5_ep_am_zcopy");
UCT_RC_MLX5_CHECK_AM_ZCOPY(id, header_length, uct_iov_total_length(iov, iovcnt),
iface->super.super.config.seg_size, 0);
UCT_RC_CHECK_FC(&iface->super, &ep->super, id);
status = uct_rc_mlx5_ep_zcopy_post(ep, MLX5_OPCODE_SEND, iov, iovcnt,
id, header, header_length, 0, 0, 0ul, 0, 0,
MLX5_WQE_CTRL_SOLICITED, comp);
if (ucs_likely(status >= 0)) {
UCT_TL_EP_STAT_OP(&ep->super.super, AM, ZCOPY,
header_length + uct_iov_total_length(iov, iovcnt));
UCT_RC_UPDATE_FC(&iface->super, &ep->super, id);
}
return status;
}
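/*
 * Posts an atomic WQE from a TX descriptor. The rkey is resolved against the
 * endpoint's atomic MR offset (for indirect atomic keys), and the
 * compare/swap masks select the bits that participate in masked operations.
 */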
static UCS_F_ALWAYS_INLINE void
uct_rc_mlx5_ep_atomic_post(uct_ep_h tl_ep, unsigned opcode,
uct_rc_iface_send_desc_t *desc, unsigned length,
uint64_t remote_addr, uct_rkey_t rkey,
uint64_t compare_mask, uint64_t compare,
uint64_t swap_mask, uint64_t swap_add)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uint32_t ib_rkey = uct_ib_resolve_atomic_rkey(rkey, ep->atomic_mr_offset,
&remote_addr);
desc->super.sn = ep->tx.wq.sw_pi;
uct_rc_mlx5_txqp_dptr_post(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
opcode, desc + 1, length, &desc->lkey,
remote_addr, ib_rkey,
compare_mask, compare, swap_mask, swap_add,
NULL, NULL, 0, MLX5_WQE_CTRL_CQ_UPDATE,
0, INT_MAX, NULL);
UCT_TL_EP_STAT_ATOMIC(&ep->super.super);
uct_rc_txqp_add_send_op(&ep->super.txqp, &desc->super);
}
static UCS_F_ALWAYS_INLINE ucs_status_t
uct_rc_mlx5_ep_atomic_fop(uct_ep_h tl_ep, int opcode, void *result, int ext,
unsigned length, uint64_t remote_addr, uct_rkey_t rkey,
uint64_t compare_mask, uint64_t compare,
uint64_t swap_mask, uint64_t swap_add, uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_iface_send_desc_t *desc;
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_IFACE_GET_TX_ATOMIC_FETCH_DESC(&iface->super, &iface->tx.atomic_desc_mp,
desc, uct_rc_iface_atomic_handler(&iface->super, ext,
length),
result, comp);
uct_rc_mlx5_ep_atomic_post(tl_ep, opcode, desc, length, remote_addr, rkey,
compare_mask, compare, swap_mask, swap_add);
return UCS_INPROGRESS;
}
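/*
 * Non-fetching atomic: translates the UCT opcode/size/value into a HW opcode
 * with compare/swap masks and posts it. No result is returned to the caller,
 * so the call completes with UCS_OK.
 */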
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_atomic_op_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
uint64_t value, uint64_t remote_addr, uct_rkey_t rkey)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_iface_send_desc_t *desc;
int op;
uint64_t compare_mask;
uint64_t compare;
uint64_t swap_mask;
uint64_t swap;
int ext; /* not used here */
ucs_status_t status;
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_OPS);
status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
&compare, &swap_mask, &swap, &ext);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
return status;
}
UCT_RC_IFACE_GET_TX_ATOMIC_DESC(&iface->super, &iface->tx.atomic_desc_mp, desc);
uct_rc_mlx5_ep_atomic_post(tl_ep, op, desc, size, remote_addr, rkey,
compare_mask, compare, swap_mask, swap);
return UCS_OK;
}
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_atomic_fop_post(uct_ep_h tl_ep, unsigned opcode, unsigned size,
uint64_t value, void *result,
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp)
{
int op;
uint64_t compare_mask;
uint64_t compare;
uint64_t swap_mask;
uint64_t swap;
int ext;
ucs_status_t status;
UCT_RC_MLX5_CHECK_ATOMIC_OPS(opcode, size, UCT_RC_MLX5_ATOMIC_FOPS);
status = uct_rc_mlx5_iface_common_atomic_data(opcode, size, value, &op, &compare_mask,
&compare, &swap_mask, &swap, &ext);
if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
return status;
}
return uct_rc_mlx5_ep_atomic_fop(tl_ep, op, result, ext, size, remote_addr, rkey,
compare_mask, compare, swap_mask, swap, comp);
}
ucs_status_t uct_rc_mlx5_ep_atomic32_post(uct_ep_h ep, unsigned opcode, uint32_t value,
uint64_t remote_addr, uct_rkey_t rkey)
{
return uct_rc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
}
ucs_status_t uct_rc_mlx5_ep_atomic64_post(uct_ep_h ep, unsigned opcode, uint64_t value,
uint64_t remote_addr, uct_rkey_t rkey)
{
return uct_rc_mlx5_ep_atomic_op_post(ep, opcode, sizeof(value), value, remote_addr, rkey);
}
ucs_status_t uct_rc_mlx5_ep_atomic64_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
uint64_t value, uint64_t *result,
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp)
{
return uct_rc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
remote_addr, rkey, comp);
}
ucs_status_t uct_rc_mlx5_ep_atomic32_fetch(uct_ep_h ep, uct_atomic_op_t opcode,
uint32_t value, uint32_t *result,
uint64_t remote_addr, uct_rkey_t rkey,
uct_completion_t *comp)
{
return uct_rc_mlx5_ep_atomic_fop_post(ep, opcode, sizeof(value), value, result,
remote_addr, rkey, comp);
}
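/*
 * Compare-and-swap: operands are converted to big-endian before being placed
 * in the WQE, since IB atomics operate on network byte order. The 32-bit
 * variant goes through the masked-CS opcode, as the plain IB CS atomic is
 * 64-bit only.
 */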
ucs_status_t uct_rc_mlx5_ep_atomic_cswap64(uct_ep_h tl_ep, uint64_t compare, uint64_t swap,
uint64_t remote_addr, uct_rkey_t rkey,
uint64_t *result, uct_completion_t *comp)
{
return uct_rc_mlx5_ep_atomic_fop(tl_ep, MLX5_OPCODE_ATOMIC_CS, result, 0, sizeof(uint64_t),
remote_addr, rkey, 0, htobe64(compare),
UINT64_MAX, htobe64(swap), comp);
}
ucs_status_t uct_rc_mlx5_ep_atomic_cswap32(uct_ep_h tl_ep, uint32_t compare, uint32_t swap,
uint64_t remote_addr, uct_rkey_t rkey,
uint32_t *result, uct_completion_t *comp)
{
return uct_rc_mlx5_ep_atomic_fop(tl_ep, MLX5_OPCODE_ATOMIC_MASKED_CS, result, 1,
sizeof(uint32_t), remote_addr, rkey, UCS_MASK(32),
htonl(compare), UINT64_MAX, htonl(swap), comp);
}
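/*
 * Fence: marks the send queue so that subsequent operations are ordered after
 * all previously posted ones.
 */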
ucs_status_t uct_rc_mlx5_ep_fence(uct_ep_h tl_ep, unsigned flags)
{
uct_rc_mlx5_ep_t *ep = ucs_derived_of(tl_ep, uct_rc_mlx5_ep_t);
return uct_rc_ep_fence(tl_ep, &ep->tx.wq.fi, 1);
}
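/*
 * Flush: UCT_FLUSH_FLAG_CANCEL purges pending operations and moves the
 * endpoint to the failed state. Otherwise, if unsignaled WQEs are
 * outstanding, a signaled NOP is posted so that a CQE is guaranteed, and
 * comp is attached to the matching send sequence number.
 */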
ucs_status_t uct_rc_mlx5_ep_flush(uct_ep_h tl_ep, unsigned flags,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
ucs_status_t status;
uint16_t sn;
if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
uct_ep_pending_purge(&ep->super.super.super, NULL, 0);
uct_rc_mlx5_ep_handle_failure(ep, UCS_ERR_CANCELED);
return UCS_OK;
}
status = uct_rc_ep_flush(&ep->super, ep->tx.wq.bb_max, flags);
if (status != UCS_INPROGRESS) {
return status;
}
if (uct_rc_txqp_unsignaled(&ep->super.txqp) != 0) {
sn = ep->tx.wq.sw_pi;
UCT_RC_CHECK_RES(&iface->super, &ep->super);
uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_NOP, NULL, 0,
0, 0, 0,
0, 0,
NULL, NULL, 0, 0,
INT_MAX);
} else {
sn = ep->tx.wq.sig_pi;
}
return uct_rc_txqp_add_flush_comp(&iface->super, &ep->super.super,
&ep->super.txqp, comp, sn);
}
ucs_status_t uct_rc_mlx5_ep_fc_ctrl(uct_ep_t *tl_ep, unsigned op,
uct_rc_fc_request_t *req)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
    /* In RC, only a pure FC grant is sent as a separate message; other FC
     * messages are piggybacked on AM sends. */
ucs_assert(op == UCT_RC_EP_FC_PURE_GRANT);
UCT_RC_CHECK_RES(&iface->super, &ep->super);
uct_rc_mlx5_txqp_inline_post(iface, IBV_QPT_RC,
&ep->super.txqp, &ep->tx.wq,
MLX5_OPCODE_SEND|UCT_RC_MLX5_OPCODE_FLAG_RAW,
NULL, 0,
UCT_RC_EP_FC_PURE_GRANT, 0, 0,
0, 0,
NULL, NULL, 0, 0,
INT_MAX);
return UCS_OK;
}
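/*
 * Packs the endpoint address: the 24-bit QP number, the atomic MR id, and,
 * when HW tag matching is enabled, the extra TM QP number.
 */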
ucs_status_t uct_rc_mlx5_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_mlx5_ep_address_t *rc_addr = (uct_rc_mlx5_ep_address_t*)addr;
uct_ib_pack_uint24(rc_addr->qp_num, ep->tx.wq.super.qp_num);
rc_addr->atomic_mr_id = uct_ib_mlx5_iface_get_atomic_mr_id(&iface->super.super);
if (UCT_RC_MLX5_TM_ENABLED(iface)) {
uct_ib_pack_uint24(rc_addr->tm_qp_num, ep->tm_qp.qp_num);
}
return UCS_OK;
}
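/*
 * Debug packet dump: decodes tag-matching headers (eager/RNDV/FIN) when HW
 * TM support is compiled in; anything else is forwarded to the generic RC
 * header dump.
 */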
void uct_rc_mlx5_common_packet_dump(uct_base_iface_t *iface, uct_am_trace_type_t type,
void *data, size_t length, size_t valid_length,
char *buffer, size_t max)
{
uct_rc_mlx5_hdr_t *rch = data;
#if IBV_HW_TM
if (rch->tmh_opcode != IBV_TMH_NO_TAG) {
struct ibv_tmh *tmh = ucs_unaligned_ptr(rch);
struct ibv_rvh *rvh = (void*)(tmh + 1);
uct_tag_t tag;
uint32_t app_ctx;
tag = tmh->tag;
app_ctx = tmh->app_ctx;
switch (rch->tmh_opcode) {
case IBV_TMH_EAGER:
snprintf(buffer, max, " EAGER tag %lx app_ctx %d", tag, app_ctx);
return;
case IBV_TMH_RNDV:
snprintf(buffer, max, " RNDV tag %lx app_ctx %d va 0x%lx len %d rkey %x",
tag, app_ctx, be64toh(rvh->va), ntohl(rvh->len), ntohl(rvh->rkey));
return;
case IBV_TMH_FIN:
snprintf(buffer, max, " FIN tag %lx app_ctx %d", tag, app_ctx);
return;
default:
break;
}
}
#endif
data = &rch->rc_hdr;
/* coverity[overrun-buffer-val] */
uct_rc_ep_packet_dump(iface, type, data, length - UCS_PTR_BYTE_DIFF(rch, data),
valid_length, buffer, max);
}
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_connect_qp(uct_rc_mlx5_iface_common_t *iface,
uct_ib_mlx5_qp_t *qp, uint32_t qp_num,
struct ibv_ah_attr *ah_attr)
{
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.md, uct_ib_mlx5_md_t);
if (md->flags & UCT_IB_MLX5_MD_FLAG_DEVX) {
return uct_rc_mlx5_iface_common_devx_connect_qp(iface, qp, qp_num, ah_attr);
} else {
return uct_rc_iface_qp_connect(&iface->super, qp->verbs.qp, qp_num, ah_attr);
}
}
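/*
 * Connects the endpoint to a remote peer. With HW TM the QPs are
 * cross-connected: the local TM QP is wired to the peer's host-owned QP,
 * while the local host QP is wired to the peer's device-owned TM QP.
 */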
ucs_status_t uct_rc_mlx5_ep_connect_to_ep(uct_ep_h tl_ep,
const uct_device_addr_t *dev_addr,
const uct_ep_addr_t *ep_addr)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
const uct_ib_address_t *ib_addr = (const uct_ib_address_t *)dev_addr;
const uct_rc_mlx5_ep_address_t *rc_addr = (const uct_rc_mlx5_ep_address_t*)ep_addr;
uint32_t qp_num;
struct ibv_ah_attr ah_attr;
ucs_status_t status;
uct_ib_iface_fill_ah_attr_from_addr(&iface->super.super, ib_addr, &ah_attr);
if (UCT_RC_MLX5_TM_ENABLED(iface)) {
        /* For HW TM we need 2 QPs, one of which is used by the device for
         * RNDV offload (issuing RDMA reads and sending the RNDV ACK). No WQEs
         * should be posted to the send side of the device-owned QP. */
status = uct_rc_mlx5_ep_connect_qp(iface, &ep->tm_qp,
uct_ib_unpack_uint24(rc_addr->qp_num),
&ah_attr);
if (status != UCS_OK) {
return status;
}
        /* Connect the local ep QP to the device-owned QP (bound to the XRQ)
         * on the peer. */
qp_num = uct_ib_unpack_uint24(rc_addr->tm_qp_num);
} else {
qp_num = uct_ib_unpack_uint24(rc_addr->qp_num);
}
status = uct_rc_mlx5_ep_connect_qp(iface, &ep->tx.wq.super,
qp_num, &ah_attr);
if (status != UCS_OK) {
return status;
}
ep->atomic_mr_offset = uct_ib_md_atomic_offset(rc_addr->atomic_mr_id);
return UCS_OK;
}
#if IBV_HW_TM
ucs_status_t uct_rc_mlx5_ep_tag_rndv_cancel(uct_ep_h tl_ep, void *op)
{
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(tl_ep->iface,
uct_rc_mlx5_iface_common_t);
uint32_t op_index = (uint32_t)((uint64_t)op);
ucs_ptr_array_remove(&iface->tm.rndv_comps, op_index, 0);
return UCS_OK;
}
static ucs_status_t UCS_F_ALWAYS_INLINE
uct_rc_mlx5_ep_tag_eager_short_inline(uct_ep_h tl_ep, uct_tag_t tag,
const void *data, size_t length)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
UCT_IB_MLX5_AM_MAX_SHORT(0), "tag_short");
UCT_RC_CHECK_RES(&iface->super, &ep->super);
uct_rc_mlx5_txqp_tag_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
&ep->tx.wq, MLX5_OPCODE_SEND, data, length,
NULL, tag, 0, IBV_TMH_EAGER, 0, NULL,
NULL, 0, NULL, 0, MLX5_WQE_CTRL_SOLICITED);
UCT_TL_EP_STAT_OP(&ep->super.super, TAG, SHORT, length);
return UCS_OK;
}
ucs_status_t uct_rc_mlx5_ep_tag_eager_short(uct_ep_h tl_ep, uct_tag_t tag,
const void *data, size_t length)
{
#if HAVE_IBV_DM
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_mlx5_dm_copy_data_t cache;
ucs_status_t status;
if (ucs_likely((sizeof(struct ibv_tmh) + length <= UCT_IB_MLX5_AM_MAX_SHORT(0)) ||
!iface->dm.dm)) {
#endif
return uct_rc_mlx5_ep_tag_eager_short_inline(tl_ep, tag, data, length);
#if HAVE_IBV_DM
}
UCT_CHECK_LENGTH(length + sizeof(struct ibv_tmh), 0,
iface->dm.seg_len, "tag_short");
UCT_RC_CHECK_RES(&iface->super, &ep->super);
uct_rc_mlx5_fill_tmh(ucs_unaligned_ptr(&cache.tm_hdr), tag, 0, IBV_TMH_EAGER);
status = uct_rc_mlx5_ep_short_dm(ep, &cache, sizeof(cache.tm_hdr), data, length,
MLX5_OPCODE_SEND,
MLX5_WQE_CTRL_SOLICITED | MLX5_WQE_CTRL_CQ_UPDATE,
0, 0);
if (!UCS_STATUS_IS_ERR(status)) {
UCT_TL_EP_STAT_OP(&ep->super.super, TAG, SHORT, length);
}
return status;
#endif
}
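/*
 * Eager tag bcopy: the 64-bit immediate is split into an app context carried
 * in the TMH and a 32-bit IB immediate; a SEND or SEND_IMM opcode is chosen
 * accordingly.
 */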
ssize_t uct_rc_mlx5_ep_tag_eager_bcopy(uct_ep_h tl_ep, uct_tag_t tag,
uint64_t imm,
uct_pack_callback_t pack_cb,
void *arg, unsigned flags)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uct_rc_iface_send_desc_t *desc;
uint32_t app_ctx, ib_imm;
int opcode;
size_t length;
UCT_RC_CHECK_RES(&iface->super, &ep->super);
UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND,
_IMM);
UCT_RC_MLX5_IFACE_GET_TM_BCOPY_DESC(&iface->super, iface->tm.bcopy_mp,
desc, tag, app_ctx, pack_cb, arg, length);
uct_rc_mlx5_txqp_bcopy_post(iface, &ep->super.txqp, &ep->tx.wq,
opcode, sizeof(struct ibv_tmh) + length,
0, 0, MLX5_WQE_CTRL_SOLICITED, ib_imm,
desc, desc + 1, NULL);
UCT_TL_EP_STAT_OP(&ep->super.super, TAG, BCOPY, length);
return length;
}
ucs_status_t uct_rc_mlx5_ep_tag_eager_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
uint64_t imm, const uct_iov_t *iov,
size_t iovcnt, unsigned flags,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
uint32_t app_ctx, ib_imm;
int opcode;
UCT_CHECK_IOV_SIZE(iovcnt, UCT_RC_MLX5_TM_EAGER_ZCOPY_MAX_IOV(0),
"uct_rc_mlx5_ep_tag_eager_zcopy");
UCT_RC_CHECK_ZCOPY_DATA(sizeof(struct ibv_tmh),
uct_iov_total_length(iov, iovcnt),
iface->tm.max_zcopy);
UCT_RC_MLX5_FILL_TM_IMM(imm, app_ctx, ib_imm, opcode, MLX5_OPCODE_SEND,
_IMM);
UCT_TL_EP_STAT_OP(&ep->super.super, TAG, ZCOPY,
uct_iov_total_length(iov, iovcnt));
return uct_rc_mlx5_ep_zcopy_post(ep, opcode|UCT_RC_MLX5_OPCODE_FLAG_TM,
iov, iovcnt, 0, "", 0, 0, 0,
tag, app_ctx, ib_imm,
MLX5_WQE_CTRL_SOLICITED, comp);
}
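/*
 * HW RNDV send: posts a single WQE carrying the tag, the user header, and an
 * RVH describing the local data buffer; the rendezvous operation index is
 * returned as a pointer-sized handle, which may later be passed back to
 * uct_rc_mlx5_ep_tag_rndv_cancel(). A caller-side sketch (hypothetical
 * variable names):
 *
 *     ucs_status_ptr_t rndv_op = uct_rc_mlx5_ep_tag_rndv_zcopy(
 *                                    ep, tag, hdr, hdr_len, iov, 1, 0, &comp);
 *     if (!UCS_PTR_IS_ERR(rndv_op)) {
 *         ... completion arrives via comp, or cancel with rndv_op ...
 *     }
 */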
ucs_status_ptr_t uct_rc_mlx5_ep_tag_rndv_zcopy(uct_ep_h tl_ep, uct_tag_t tag,
const void *header,
unsigned header_length,
const uct_iov_t *iov,
size_t iovcnt, unsigned flags,
uct_completion_t *comp)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
unsigned tm_hdr_len = sizeof(struct ibv_tmh) +
sizeof(struct ibv_rvh);
uint32_t op_index;
UCT_RC_MLX5_CHECK_RNDV_PARAMS(iovcnt, header_length, tm_hdr_len,
UCT_IB_MLX5_AM_MAX_SHORT(0),
iface->tm.max_rndv_data +
UCT_RC_MLX5_TMH_PRIV_LEN);
UCT_RC_MLX5_CHECK_RES_PTR(iface, ep);
op_index = uct_rc_mlx5_tag_get_op_id(iface, comp);
uct_rc_mlx5_txqp_tag_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
&ep->tx.wq, MLX5_OPCODE_SEND, header,
header_length, iov, tag, op_index,
IBV_TMH_RNDV, 0, NULL, NULL, 0,
NULL, 0, MLX5_WQE_CTRL_SOLICITED);
return (ucs_status_ptr_t)((uint64_t)op_index);
}
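/*
 * SW RNDV request: sends only the tag and the user header as an eager message
 * with immediate data, letting the receiver drive the rendezvous with its own
 * protocol.
 */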
ucs_status_t uct_rc_mlx5_ep_tag_rndv_request(uct_ep_h tl_ep, uct_tag_t tag,
const void* header,
unsigned header_length,
unsigned flags)
{
UCT_RC_MLX5_EP_DECL(tl_ep, iface, ep);
UCT_CHECK_LENGTH(header_length + sizeof(struct ibv_tmh), 0,
UCT_IB_MLX5_AM_MAX_SHORT(0), "tag_rndv_request");
UCT_RC_CHECK_RES(&iface->super, &ep->super);
uct_rc_mlx5_txqp_tag_inline_post(iface, IBV_QPT_RC, &ep->super.txqp,
&ep->tx.wq, MLX5_OPCODE_SEND_IMM, header,
header_length, NULL, tag, 0,
IBV_TMH_EAGER, 0, NULL, NULL, 0,
NULL, 0, MLX5_WQE_CTRL_SOLICITED);
return UCS_OK;
}
#endif /* IBV_HW_TM */
UCS_CLASS_INIT_FUNC(uct_rc_mlx5_ep_t, const uct_ep_params_t *params)
{
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(params->iface,
uct_rc_mlx5_iface_common_t);
uct_ib_qp_attr_t attr = {};
ucs_status_t status;
    /* The QP must be created before the super constructor runs, since the
     * latter needs the QP number */
uct_rc_mlx5_iface_fill_attr(iface, &attr, iface->super.config.tx_qp_len,
&iface->rx.srq);
uct_ib_exp_qp_fill_attr(&iface->super.super, &attr);
status = uct_rc_mlx5_iface_create_qp(iface, &self->tx.wq.super, &self->tx.wq, &attr);
if (status != UCS_OK) {
return status;
}
UCS_CLASS_CALL_SUPER_INIT(uct_rc_ep_t, &iface->super, self->tx.wq.super.qp_num);
if (self->tx.wq.super.type == UCT_IB_MLX5_OBJ_TYPE_VERBS) {
status = uct_rc_iface_qp_init(&iface->super, self->tx.wq.super.verbs.qp);
if (status != UCS_OK) {
goto err;
}
}
uct_rc_iface_add_qp(&iface->super, &self->super, self->tx.wq.super.qp_num);
if (UCT_RC_MLX5_TM_ENABLED(iface)) {
        /* The send queue of this QP is used by the FW for HW RNDV, so the
         * driver requires it to be created with a zero-length send queue. */
memset(&attr, 0, sizeof(attr));
uct_rc_mlx5_iface_fill_attr(iface, &attr, 0, &iface->rx.srq);
uct_ib_exp_qp_fill_attr(&iface->super.super, &attr);
status = uct_rc_mlx5_iface_create_qp(iface, &self->tm_qp, NULL, &attr);
if (status != UCS_OK) {
goto err;
}
uct_rc_iface_add_qp(&iface->super, &self->super, self->tm_qp.qp_num);
}
self->tx.wq.bb_max = ucs_min(self->tx.wq.bb_max, iface->tx.bb_max);
self->mp.free = 1;
uct_rc_txqp_available_set(&self->super.txqp, self->tx.wq.bb_max);
return UCS_OK;
err:
uct_ib_mlx5_destroy_qp(&self->tx.wq.super);
return status;
}
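/*
 * Drains a QP before destruction: forces the HW to flush in-progress SRQ
 * receives to the CQ, reclaims the SRQ slots they consumed, and
 * re-synchronizes the CQ consumer index with the driver.
 */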
static void uct_rc_mlx5_ep_clean_qp(uct_rc_mlx5_ep_t *ep, uct_ib_mlx5_qp_t *qp)
{
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(ep->super.super.super.iface,
uct_rc_mlx5_iface_common_t);
uct_ib_mlx5_md_t *md = ucs_derived_of(iface->super.super.super.md,
uct_ib_mlx5_md_t);
/* Make the HW generate CQEs for all in-progress SRQ receives from the QP,
* so we clean them all before ibv_modify_qp() can see them.
*/
#if HAVE_DECL_IBV_CMD_MODIFY_QP && !HAVE_DEVX
struct ibv_qp_attr qp_attr;
struct ibv_modify_qp cmd;
int ret;
    /* Bypass the mlx5 driver and go directly to the kernel command interface,
     * to prevent the driver from cleaning the CQ itself
     */
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IBV_QPS_RESET;
ret = ibv_cmd_modify_qp(qp->verbs.qp, &qp_attr, IBV_QP_STATE, &cmd, sizeof(cmd));
if (ret) {
ucs_warn("modify qp 0x%x to RESET failed: %m", qp->qp_num);
}
#else
(void)uct_ib_mlx5_modify_qp_state(md, qp, IBV_QPS_ERR);
#endif
iface->super.rx.srq.available += uct_rc_mlx5_iface_commom_clean(
&iface->cq[UCT_IB_DIR_RX],
&iface->rx.srq, qp->qp_num);
/* Synchronize CQ index with the driver, since it would remove pending
* completions for this QP (both send and receive) during ibv_destroy_qp().
*/
uct_rc_mlx5_iface_common_update_cqs_ci(iface, &iface->super.super);
(void)uct_ib_mlx5_modify_qp_state(md, qp, IBV_QPS_RESET);
uct_rc_mlx5_iface_common_sync_cqs_ci(iface, &iface->super.super);
}
static UCS_CLASS_CLEANUP_FUNC(uct_rc_mlx5_ep_t)
{
uct_rc_mlx5_iface_common_t *iface = ucs_derived_of(self->super.super.super.iface,
uct_rc_mlx5_iface_common_t);
    uct_rc_mlx5_ep_clean_qp(self, &self->tx.wq.super);
#if IBV_HW_TM
if (UCT_RC_MLX5_TM_ENABLED(iface)) {
uct_rc_mlx5_ep_clean_qp(self, &self->tm_qp);
uct_ib_mlx5_iface_put_res_domain(&self->tm_qp);
uct_rc_iface_remove_qp(&iface->super, self->tm_qp.qp_num);
uct_ib_mlx5_destroy_qp(&self->tm_qp);
}
#endif
ucs_assert(self->mp.free == 1);
    /* Return all credits in case the user did flush(UCT_FLUSH_FLAG_CANCEL)
     * before ep_destroy.
     */
uct_rc_txqp_available_add(&self->super.txqp,
self->tx.wq.bb_max -
uct_rc_txqp_available(&self->super.txqp));
    uct_ib_mlx5_txwq_cleanup(&self->tx.wq);
uct_rc_iface_remove_qp(&iface->super, self->tx.wq.super.qp_num);
uct_ib_mlx5_destroy_qp(&self->tx.wq.super);
}
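/*
 * Failure path: purges all outstanding sends with the given status, returns
 * their CQ credits to the iface, and delegates the endpoint state transition
 * to the transport's set_ep_failed handler.
 */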
ucs_status_t uct_rc_mlx5_ep_handle_failure(uct_rc_mlx5_ep_t *ep,
ucs_status_t status)
{
uct_ib_iface_t *ib_iface = ucs_derived_of(ep->super.super.super.iface,
uct_ib_iface_t);
uct_rc_iface_t *rc_iface = ucs_derived_of(ib_iface, uct_rc_iface_t);
uct_rc_txqp_purge_outstanding(&ep->super.txqp, status, 0);
    /* mlx5 poll_cqe returns NULL for a failed CQE, so cq_available is not
     * updated for the error CQE nor for the outstanding WQEs */
rc_iface->tx.cq_available += ep->tx.wq.bb_max -
uct_rc_txqp_available(&ep->super.txqp);
return ib_iface->ops->set_ep_failed(ib_iface, &ep->super.super.super,
status);
}
ucs_status_t uct_rc_mlx5_ep_set_failed(uct_ib_iface_t *iface, uct_ep_h ep,
ucs_status_t status)
{
return uct_set_ep_failed(&UCS_CLASS_NAME(uct_rc_mlx5_ep_t), ep,
&iface->super.super, status);
}
UCS_CLASS_DEFINE(uct_rc_mlx5_ep_t, uct_rc_ep_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_mlx5_ep_t, uct_ep_t, const uct_ep_params_t *);
UCS_CLASS_DEFINE_DELETE_FUNC(uct_rc_mlx5_ep_t, uct_ep_t);