/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "rc_iface.h"
#include "rc_ep.h"
#include <ucs/arch/cpu.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/type/class.h>
static const char *uct_rc_fence_mode_values[] = {
[UCT_RC_FENCE_MODE_NONE] = "none",
[UCT_RC_FENCE_MODE_WEAK] = "weak",
[UCT_RC_FENCE_MODE_STRONG] = "strong",
[UCT_RC_FENCE_MODE_AUTO] = "auto",
[UCT_RC_FENCE_MODE_LAST] = NULL
};
ucs_config_field_t uct_rc_iface_common_config_table[] = {
{"IB_", "RX_INLINE=64;RX_QUEUE_LEN=4095;SEG_SIZE=8256", NULL,
ucs_offsetof(uct_rc_iface_common_config_t, super),
UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},
{"PATH_MTU", "default",
"Path MTU. \"default\" will select the best MTU for the device.",
ucs_offsetof(uct_rc_iface_common_config_t, path_mtu),
UCS_CONFIG_TYPE_ENUM(uct_ib_mtu_values)},
{"MAX_RD_ATOMIC", "4",
"Maximal number of outstanding read or atomic replies",
ucs_offsetof(uct_rc_iface_common_config_t, max_rd_atomic), UCS_CONFIG_TYPE_UINT},
{"TIMEOUT", "1.0s",
"Transport timeout",
ucs_offsetof(uct_rc_iface_common_config_t, tx.timeout), UCS_CONFIG_TYPE_TIME},
{"RETRY_COUNT", "7",
"Transport retries",
ucs_offsetof(uct_rc_iface_common_config_t, tx.retry_count), UCS_CONFIG_TYPE_UINT},
{"RNR_TIMEOUT", "1ms",
"RNR timeout",
ucs_offsetof(uct_rc_iface_common_config_t, tx.rnr_timeout), UCS_CONFIG_TYPE_TIME},
{"RNR_RETRY_COUNT", "7",
"RNR retries",
ucs_offsetof(uct_rc_iface_common_config_t, tx.rnr_retry_count), UCS_CONFIG_TYPE_UINT},
{"FC_ENABLE", "y",
"Enable flow control protocol to prevent sender from overwhelming the receiver,\n"
"thus avoiding RC RnR backoff timer.",
ucs_offsetof(uct_rc_iface_common_config_t, fc.enable), UCS_CONFIG_TYPE_BOOL},
{"FC_WND_SIZE", "512",
"The size of flow control window per endpoint. limits the number of AM\n"
"which can be sent w/o acknowledgment.",
ucs_offsetof(uct_rc_iface_common_config_t, fc.wnd_size), UCS_CONFIG_TYPE_UINT},
{"FC_HARD_THRESH", "0.25",
"Threshold for sending hard request for FC credits to the peer. This value\n"
"refers to the percentage of the FC_WND_SIZE value. (must be > 0 and < 1)",
ucs_offsetof(uct_rc_iface_common_config_t, fc.hard_thresh), UCS_CONFIG_TYPE_DOUBLE},
#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
{"OOO_RW", "n",
"Enable out-of-order RDMA data placement",
ucs_offsetof(uct_rc_iface_common_config_t, ooo_rw), UCS_CONFIG_TYPE_BOOL},
#endif
{"FENCE", "auto",
"IB fence type when API fence requested:\n"
" none - fence is a no-op\n"
" weak - fence makes sure remote reads are ordered with respect to remote writes\n"
" strong - fence makes sure that subsequent remote operations start only after\n"
" previous remote operations complete\n"
" auto - select fence mode based on hardware capabilities",
ucs_offsetof(uct_rc_iface_common_config_t, fence_mode),
UCS_CONFIG_TYPE_ENUM(uct_rc_fence_mode_values)},
{NULL}
};
/* Config relevant for rc_mlx5 and rc_verbs only (not for dc) */
ucs_config_field_t uct_rc_iface_config_table[] = {
{"RC_", "MAX_NUM_EPS=256", NULL,
ucs_offsetof(uct_rc_iface_config_t, super),
UCS_CONFIG_TYPE_TABLE(uct_rc_iface_common_config_table)},
{"FC_SOFT_THRESH", "0.5",
"Threshold for sending soft request for FC credits to the peer. This value\n"
"refers to the percentage of the FC_WND_SIZE value. (must be > HARD_THRESH and < 1)",
ucs_offsetof(uct_rc_iface_config_t, soft_thresh), UCS_CONFIG_TYPE_DOUBLE},
{"TX_CQ_MODERATION", "64",
"Maximum number of send WQEs which can be posted without requesting a completion.",
ucs_offsetof(uct_rc_iface_config_t, tx_cq_moderation), UCS_CONFIG_TYPE_UINT},
{"TX_CQ_LEN", "4096",
"Length of send completion queue. This limits the total number of outstanding signaled sends.",
ucs_offsetof(uct_rc_iface_config_t, tx_cq_len), UCS_CONFIG_TYPE_UINT},
{NULL}
};
#if ENABLE_STATS
static ucs_stats_class_t uct_rc_iface_stats_class = {
.name = "rc_iface",
.num_counters = UCT_RC_IFACE_STAT_LAST,
.counter_names = {
[UCT_RC_IFACE_STAT_RX_COMPLETION] = "rx_completion",
[UCT_RC_IFACE_STAT_TX_COMPLETION] = "tx_completion",
[UCT_RC_IFACE_STAT_NO_CQE] = "no_cqe"
}
};
#endif /* ENABLE_STATS */
static ucs_mpool_ops_t uct_rc_fc_pending_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
.obj_init = NULL,
.obj_cleanup = NULL
};
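/* Object initializer for the flush-completion mempool: every op taken from
* tx.flush_mp runs the flush completion handler when it completes. */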
static void
uct_rc_iface_flush_comp_init(ucs_mpool_t *mp, void *obj, void *chunk)
{
uct_rc_iface_t *iface = ucs_container_of(mp, uct_rc_iface_t, tx.flush_mp);
uct_rc_iface_send_op_t *op = obj;
op->handler = uct_rc_ep_flush_op_completion_handler;
op->flags = 0;
op->iface = iface;
}
static ucs_mpool_ops_t uct_rc_flush_comp_mpool_ops = {
.chunk_alloc = ucs_mpool_chunk_malloc,
.chunk_release = ucs_mpool_chunk_free,
.obj_init = uct_rc_iface_flush_comp_init,
.obj_cleanup = NULL
};
ucs_status_t uct_rc_iface_query(uct_rc_iface_t *iface,
uct_iface_attr_t *iface_attr,
size_t put_max_short, size_t max_inline,
size_t am_max_hdr, size_t am_max_iov,
size_t tag_max_iov, size_t tag_min_hdr)
{
uct_ib_device_t *dev = uct_ib_iface_device(&iface->super);
ucs_status_t status;
status = uct_ib_iface_query(&iface->super,
ucs_max(sizeof(uct_rc_hdr_t), UCT_IB_RETH_LEN),
iface_attr);
if (status != UCS_OK) {
return status;
}
iface_attr->iface_addr_len = 0;
iface_attr->ep_addr_len = sizeof(uct_rc_ep_address_t);
iface_attr->max_conn_priv = 0;
iface_attr->cap.flags = UCT_IFACE_FLAG_AM_BCOPY |
UCT_IFACE_FLAG_AM_ZCOPY |
UCT_IFACE_FLAG_PUT_BCOPY |
UCT_IFACE_FLAG_PUT_ZCOPY |
UCT_IFACE_FLAG_GET_BCOPY |
UCT_IFACE_FLAG_GET_ZCOPY |
UCT_IFACE_FLAG_PENDING |
UCT_IFACE_FLAG_CONNECT_TO_EP |
UCT_IFACE_FLAG_CB_SYNC |
UCT_IFACE_FLAG_EVENT_SEND_COMP |
UCT_IFACE_FLAG_EVENT_RECV;
if (uct_ib_device_has_pci_atomics(dev)) {
if (dev->pci_fadd_arg_sizes & sizeof(uint64_t)) {
iface_attr->cap.atomic64.op_flags |= UCS_BIT(UCT_ATOMIC_OP_ADD);
iface_attr->cap.atomic64.fop_flags |= UCS_BIT(UCT_ATOMIC_OP_ADD);
}
if (dev->pci_cswap_arg_sizes & sizeof(uint64_t)) {
iface_attr->cap.atomic64.fop_flags |= UCS_BIT(UCT_ATOMIC_OP_CSWAP);
}
iface_attr->cap.flags |= UCT_IFACE_FLAG_ATOMIC_CPU;
} else {
if (dev->atomic_arg_sizes & sizeof(uint64_t)) {
/* TODO: remove deprecated flags */
iface_attr->cap.flags |= UCT_IFACE_FLAG_ATOMIC_DEVICE;
iface_attr->cap.atomic64.op_flags |= UCS_BIT(UCT_ATOMIC_OP_ADD);
iface_attr->cap.atomic64.fop_flags |= UCS_BIT(UCT_ATOMIC_OP_ADD) |
UCS_BIT(UCT_ATOMIC_OP_CSWAP);
}
}
iface_attr->cap.put.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
iface_attr->cap.get.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
iface_attr->cap.am.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
iface_attr->cap.put.align_mtu = uct_ib_mtu_value(iface->config.path_mtu);
iface_attr->cap.get.align_mtu = uct_ib_mtu_value(iface->config.path_mtu);
iface_attr->cap.am.align_mtu = uct_ib_mtu_value(iface->config.path_mtu);
/* PUT */
iface_attr->cap.put.max_short = put_max_short;
iface_attr->cap.put.max_bcopy = iface->super.config.seg_size;
iface_attr->cap.put.min_zcopy = 0;
iface_attr->cap.put.max_zcopy = uct_ib_iface_port_attr(&iface->super)->max_msg_sz;
iface_attr->cap.put.max_iov = uct_ib_iface_get_max_iov(&iface->super);
/* GET */
iface_attr->cap.get.max_bcopy = iface->super.config.seg_size;
iface_attr->cap.get.min_zcopy = iface->super.config.max_inl_resp + 1;
iface_attr->cap.get.max_zcopy = uct_ib_iface_port_attr(&iface->super)->max_msg_sz;
iface_attr->cap.get.max_iov = uct_ib_iface_get_max_iov(&iface->super);
/* AM */
iface_attr->cap.am.max_short = uct_ib_iface_hdr_size(max_inline, tag_min_hdr);
iface_attr->cap.am.max_bcopy = iface->super.config.seg_size - tag_min_hdr;
iface_attr->cap.am.min_zcopy = 0;
iface_attr->cap.am.max_zcopy = iface->super.config.seg_size - tag_min_hdr;
iface_attr->cap.am.max_hdr = am_max_hdr - tag_min_hdr;
iface_attr->cap.am.max_iov = am_max_iov;
/* Error Handling */
iface_attr->cap.flags |= UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;
if (iface_attr->cap.am.max_short) {
iface_attr->cap.flags |= UCT_IFACE_FLAG_AM_SHORT;
}
if (iface_attr->cap.put.max_short) {
iface_attr->cap.flags |= UCT_IFACE_FLAG_PUT_SHORT;
}
return UCS_OK;
}
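/* Register an endpoint in the QP-number lookup table. The table is two-level:
* the high bits of the QP number select a lazily allocated second-level array,
* and the low bits index the entry inside it. */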
void uct_rc_iface_add_qp(uct_rc_iface_t *iface, uct_rc_ep_t *ep,
unsigned qp_num)
{
uct_rc_ep_t ***ptr, **memb;
ptr = &iface->eps[qp_num >> UCT_RC_QP_TABLE_ORDER];
if (*ptr == NULL) {
*ptr = ucs_calloc(UCS_BIT(UCT_RC_QP_TABLE_MEMB_ORDER), sizeof(**ptr),
"rc qp table");
}
memb = &(*ptr)[qp_num & UCS_MASK(UCT_RC_QP_TABLE_MEMB_ORDER)];
ucs_assert(*memb == NULL);
*memb = ep;
}
void uct_rc_iface_remove_qp(uct_rc_iface_t *iface, unsigned qp_num)
{
uct_rc_ep_t **memb;
memb = &iface->eps[qp_num >> UCT_RC_QP_TABLE_ORDER]
[qp_num & UCS_MASK(UCT_RC_QP_TABLE_MEMB_ORDER)];
ucs_assert(*memb != NULL);
*memb = NULL;
}
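/* Interface-level flush: flush every endpoint on the interface and report
* UCS_INPROGRESS if any of them still has outstanding work. */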
ucs_status_t uct_rc_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp)
{
uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
ucs_status_t status;
unsigned count;
uct_rc_ep_t *ep;
if (comp != NULL) {
return UCS_ERR_UNSUPPORTED;
}
count = 0;
ucs_list_for_each(ep, &iface->ep_list, list) {
status = uct_ep_flush(&ep->super.super, 0, NULL);
if ((status == UCS_ERR_NO_RESOURCE) || (status == UCS_INPROGRESS)) {
++count;
} else if (status != UCS_OK) {
return status;
}
}
if (count != 0) {
UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
return UCS_INPROGRESS;
}
UCT_TL_IFACE_STAT_FLUSH(&iface->super.super);
return UCS_OK;
}
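/* Mempool object initializer for TX send descriptors: cache the lkey of the
* registered memory chunk in the descriptor. */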
void uct_rc_iface_send_desc_init(uct_iface_h tl_iface, void *obj, uct_mem_h memh)
{
uct_rc_iface_send_desc_t *desc = obj;
uct_ib_mem_t *ib_memh = memh;
desc->lkey = ib_memh->lkey;
desc->super.flags = 0;
}
static void uct_rc_iface_set_path_mtu(uct_rc_iface_t *iface,
const uct_rc_iface_common_config_t *config)
{
enum ibv_mtu port_mtu = uct_ib_iface_port_attr(&iface->super)->active_mtu;
uct_ib_device_t *dev = uct_ib_iface_device(&iface->super);
/* MTU is set by user configuration */
if (config->path_mtu != UCT_IB_MTU_DEFAULT) {
iface->config.path_mtu = (enum ibv_mtu)(config->path_mtu + (IBV_MTU_512 - UCT_IB_MTU_512));
} else if ((port_mtu > IBV_MTU_2048) && (IBV_DEV_ATTR(dev, vendor_id) == 0x02c9) &&
((IBV_DEV_ATTR(dev, vendor_part_id) == 4099) || (IBV_DEV_ATTR(dev, vendor_part_id) == 4100) ||
(IBV_DEV_ATTR(dev, vendor_part_id) == 4103) || (IBV_DEV_ATTR(dev, vendor_part_id) == 4104)))
{
/* On some devices the optimal path_mtu is 2048 */
iface->config.path_mtu = IBV_MTU_2048;
} else {
iface->config.path_mtu = port_mtu;
}
}
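/* Validate FC_SOFT_THRESH against FC_HARD_THRESH and convert the fractional
* soft threshold into an absolute credit count (at least 1) when FC is enabled. */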
ucs_status_t uct_rc_init_fc_thresh(uct_rc_iface_config_t *config,
uct_rc_iface_t *iface)
{
/* Check FC parameters correctness */
if ((config->soft_thresh <= config->super.fc.hard_thresh) ||
(config->soft_thresh >= 1)) {
ucs_error("The factor for soft FC threshold should be bigger"
" than FC_HARD_THRESH value and less than 1 (s=%f, h=%f)",
config->soft_thresh, config->super.fc.hard_thresh);
return UCS_ERR_INVALID_PARAM;
}
if (config->super.fc.enable) {
iface->config.fc_soft_thresh = ucs_max((int)(iface->config.fc_wnd_size *
config->soft_thresh), 1);
} else {
iface->config.fc_soft_thresh = 0;
}
return UCS_OK;
}
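/* Handle the flow-control bits piggybacked on an incoming AM: replenish the
* window on a grant, mark the ep for a soft grant, send (or queue) a grant for
* a hard request, and finally deliver the AM itself unless it was a pure grant. */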
ucs_status_t uct_rc_iface_fc_handler(uct_rc_iface_t *iface, unsigned qp_num,
uct_rc_hdr_t *hdr, unsigned length,
uint32_t imm_data, uint16_t lid, unsigned flags)
{
ucs_status_t status;
int16_t cur_wnd;
uct_rc_fc_request_t *fc_req;
uct_rc_ep_t *ep = uct_rc_iface_lookup_ep(iface, qp_num);
uint8_t fc_hdr = uct_rc_fc_get_fc_hdr(hdr->am_id);
ucs_assert(iface->config.fc_enabled);
if (fc_hdr & UCT_RC_EP_FC_FLAG_GRANT) {
UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_GRANT, 1);
/* Got either grant flag or special FC grant message */
cur_wnd = ep->fc.fc_wnd;
/* Peer granted resources, so update wnd */
ep->fc.fc_wnd = iface->config.fc_wnd_size;
UCS_STATS_SET_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_FC_WND, ep->fc.fc_wnd);
/* To preserve ordering we have to dispatch all pending
* operations if the current fc_wnd is <= 0
* (otherwise they will be dispatched by TX progress) */
if (cur_wnd <= 0) {
ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
ucs_arbiter_dispatch(&iface->tx.arbiter, 1,
uct_rc_ep_process_pending, NULL);
}
if (fc_hdr == UCT_RC_EP_FC_PURE_GRANT) {
/* A special FC grant message cannot be bundled with any other FC
* request. Stop processing this AM and do not invoke the AM handler */
UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_PURE_GRANT, 1);
return UCS_OK;
}
}
if (fc_hdr & UCT_RC_EP_FC_FLAG_SOFT_REQ) {
UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_SOFT_REQ, 1);
/* Got a soft credit request. Mark the ep as needing to grant
* credits to the peer in an outgoing AM (if any). */
ep->fc.flags |= UCT_RC_EP_FC_FLAG_GRANT;
} else if (fc_hdr & UCT_RC_EP_FC_FLAG_HARD_REQ) {
UCS_STATS_UPDATE_COUNTER(ep->fc.stats, UCT_RC_FC_STAT_RX_HARD_REQ, 1);
fc_req = ucs_mpool_get(&iface->tx.fc_mp);
if (ucs_unlikely(fc_req == NULL)) {
ucs_error("Failed to allocate FC request. "
"Grant will not be sent on ep %p", ep);
return UCS_ERR_NO_MEMORY;
}
fc_req->ep = &ep->super.super;
fc_req->super.func = uct_rc_ep_fc_grant;
/* Got hard credit request. Send grant to the peer immediately */
status = uct_rc_ep_fc_grant(&fc_req->super);
if (status == UCS_ERR_NO_RESOURCE) {
/* Force-add the request to the group and schedule the group
* to avoid an FC deadlock */
uct_pending_req_arb_group_push_head(&iface->tx.arbiter,
&ep->arb_group, &fc_req->super);
ucs_arbiter_group_schedule(&iface->tx.arbiter, &ep->arb_group);
} else {
ucs_assertv_always(status == UCS_OK, "Failed to send FC grant msg: %s",
ucs_status_string(status));
}
}
return uct_iface_invoke_am(&iface->super.super,
(hdr->am_id & ~UCT_RC_EP_FC_MASK),
hdr + 1, length, flags);
}
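/* Allocate the fixed array of send ops, link it into a singly-linked free
* list, and create the mempool used for flush completions. */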
static ucs_status_t uct_rc_iface_tx_ops_init(uct_rc_iface_t *iface)
{
const unsigned count = iface->config.tx_ops_count;
uct_rc_iface_send_op_t *op;
ucs_status_t status;
iface->tx.ops_buffer = ucs_calloc(count, sizeof(*iface->tx.ops_buffer),
"rc_tx_ops");
if (iface->tx.ops_buffer == NULL) {
return UCS_ERR_NO_MEMORY;
}
iface->tx.free_ops = &iface->tx.ops_buffer[0];
for (op = iface->tx.ops_buffer; op < iface->tx.ops_buffer + count; ++op) {
op->handler = uct_rc_ep_send_op_completion_handler;
op->flags = UCT_RC_IFACE_SEND_OP_FLAG_IFACE;
op->iface = iface;
op->next = (op == (iface->tx.ops_buffer + count - 1)) ? NULL : (op + 1);
}
/* Create a memory pool for flush completions. We cannot just allocate a
* fixed-size buffer, because the number of simultaneous flushes is not
* limited by CQ or QP resources. */
status = ucs_mpool_init(&iface->tx.flush_mp, 0, sizeof(*op), 0,
UCS_SYS_CACHE_LINE_SIZE, 256,
UINT_MAX, &uct_rc_flush_comp_mpool_ops,
"flush-comps-only");
return status;
}
static void uct_rc_iface_tx_ops_cleanup(uct_rc_iface_t *iface)
{
const unsigned total_count = iface->config.tx_ops_count;
uct_rc_iface_send_op_t *op;
unsigned free_count;
free_count = 0;
for (op = iface->tx.free_ops; op != NULL; op = op->next) {
++free_count;
ucs_assert(free_count <= total_count);
}
if (free_count != total_count) {
ucs_warn("rc_iface %p: %u/%u send ops were not released", iface,
total_count - free_count, total_count);
}
ucs_free(iface->tx.ops_buffer);
ucs_mpool_cleanup(&iface->tx.flush_mp, 1);
}
unsigned uct_rc_iface_do_progress(uct_iface_h tl_iface)
{
uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
return iface->progress(iface);
}
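/* Create a shared receive queue (SRQ) of RX_QUEUE_LEN entries and initialize
* the receive posting quota to its depth. */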
ucs_status_t uct_rc_iface_init_rx(uct_rc_iface_t *iface,
const uct_rc_iface_common_config_t *config,
struct ibv_srq **srq_p)
{
struct ibv_srq_init_attr srq_init_attr;
struct ibv_pd *pd = uct_ib_iface_md(&iface->super)->pd;
struct ibv_srq *srq;
srq_init_attr.attr.max_sge = 1;
srq_init_attr.attr.max_wr = config->super.rx.queue_len;
srq_init_attr.attr.srq_limit = 0;
srq_init_attr.srq_context = iface;
srq = ibv_create_srq(pd, &srq_init_attr);
if (srq == NULL) {
ucs_error("ibv_create_srq() failed: %m");
return UCS_ERR_IO_ERROR;
}
iface->rx.srq.quota = srq_init_attr.attr.max_wr;
*srq_p = srq;
return UCS_OK;
}
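/* Clamp a user-provided configuration value to the transport limit, warning
* when the requested value is reduced. */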
static int uct_rc_iface_config_limit_value(const char *name,
int provided, int limit)
{
if (provided > limit) {
ucs_warn("using maximal value for %s (%d) instead of %d",
name, limit, provided);
return limit;
} else {
return provided;
}
}
UCS_CLASS_INIT_FUNC(uct_rc_iface_t, uct_rc_iface_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_rc_iface_common_config_t *config,
uct_ib_iface_init_attr_t *init_attr)
{
uct_ib_device_t *dev = &ucs_derived_of(md, uct_ib_md_t)->dev;
ucs_status_t status;
UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker, params,
&config->super, init_attr);
self->tx.cq_available = init_attr->tx_cq_len - 1;
self->rx.srq.available = 0;
self->rx.srq.quota = 0;
self->config.tx_qp_len = config->super.tx.queue_len;
self->config.tx_min_sge = config->super.tx.min_sge;
self->config.tx_min_inline = config->super.tx.min_inline;
self->config.tx_ops_count = init_attr->tx_cq_len;
self->config.rx_inline = config->super.rx.inl;
self->config.min_rnr_timer = uct_ib_to_rnr_fabric_time(config->tx.rnr_timeout);
self->config.timeout = uct_ib_to_qp_fabric_time(config->tx.timeout);
self->config.rnr_retry = uct_rc_iface_config_limit_value(
"RNR_RETRY_COUNT",
config->tx.rnr_retry_count,
UCT_RC_QP_MAX_RETRY_COUNT);
self->config.retry_cnt = uct_rc_iface_config_limit_value(
"RETRY_COUNT",
config->tx.retry_count,
UCT_RC_QP_MAX_RETRY_COUNT);
self->config.max_rd_atomic = config->max_rd_atomic;
self->config.ooo_rw = config->ooo_rw;
#if UCS_ENABLE_ASSERT
self->config.tx_cq_len = init_attr->tx_cq_len;
#endif
uct_ib_fence_info_init(&self->tx.fi);
uct_rc_iface_set_path_mtu(self, config);
memset(self->eps, 0, sizeof(self->eps));
ucs_arbiter_init(&self->tx.arbiter);
ucs_list_head_init(&self->ep_list);
/* Check FC parameters correctness */
if ((config->fc.hard_thresh <= 0) || (config->fc.hard_thresh >= 1)) {
ucs_error("The factor for hard FC threshold should be > 0 and < 1 (%f)",
config->fc.hard_thresh);
status = UCS_ERR_INVALID_PARAM;
goto err;
}
/* Create RX buffers mempool */
status = uct_ib_iface_recv_mpool_init(&self->super, &config->super,
"rc_recv_desc", &self->rx.mp);
if (status != UCS_OK) {
goto err;
}
/* Create TX buffers mempool */
status = uct_iface_mpool_init(&self->super.super,
&self->tx.mp,
sizeof(uct_rc_iface_send_desc_t) + self->super.config.seg_size,
sizeof(uct_rc_iface_send_desc_t),
UCS_SYS_CACHE_LINE_SIZE,
&config->super.tx.mp,
self->config.tx_qp_len,
uct_rc_iface_send_desc_init,
"rc_send_desc");
if (status != UCS_OK) {
goto err_destroy_rx_mp;
}
/* Allocate tx operations */
status = uct_rc_iface_tx_ops_init(self);
if (status != UCS_OK) {
goto err_destroy_tx_mp;
}
/* Set atomic handlers according to atomic reply endianness */
self->config.atomic64_handler = dev->atomic_arg_sizes_be & sizeof(uint64_t) ?
uct_rc_ep_atomic_handler_64_be1 :
uct_rc_ep_atomic_handler_64_be0;
self->config.atomic32_ext_handler = dev->ext_atomic_arg_sizes_be & sizeof(uint32_t) ?
uct_rc_ep_atomic_handler_32_be1 :
uct_rc_ep_atomic_handler_32_be0;
self->config.atomic64_ext_handler = dev->ext_atomic_arg_sizes_be & sizeof(uint64_t) ?
uct_rc_ep_atomic_handler_64_be1 :
uct_rc_ep_atomic_handler_64_be0;
status = UCS_STATS_NODE_ALLOC(&self->stats, &uct_rc_iface_stats_class,
self->super.super.stats);
if (status != UCS_OK) {
goto err_cleanup_tx_ops;
}
/* Initialize RX resources (SRQ) */
status = ops->init_rx(self, config);
if (status != UCS_OK) {
goto err_destroy_stats;
}
self->config.fc_enabled = config->fc.enable;
if (self->config.fc_enabled) {
/* Assume that the number of recv buffers is the same on all peers,
* so the FC window size is the same for all endpoints as well.
* TODO: Make the window size a property of the particular interface.
* We could then distribute it via the RC address. */
self->config.fc_wnd_size = ucs_min(config->fc.wnd_size,
config->super.rx.queue_len);
self->config.fc_hard_thresh = ucs_max((int)(self->config.fc_wnd_size *
config->fc.hard_thresh), 1);
/* Create mempool for pending requests for FC grant */
status = ucs_mpool_init(&self->tx.fc_mp,
0,
init_attr->fc_req_size,
0,
1,
128,
UINT_MAX,
&uct_rc_fc_pending_mpool_ops,
"pending-fc-grants-only");
if (status != UCS_OK) {
goto err_cleanup_rx;
}
} else {
self->config.fc_wnd_size = INT16_MAX;
self->config.fc_hard_thresh = 0;
}
return UCS_OK;
err_cleanup_rx:
ops->cleanup_rx(self);
err_destroy_stats:
UCS_STATS_NODE_FREE(self->stats);
err_cleanup_tx_ops:
uct_rc_iface_tx_ops_cleanup(self);
err_destroy_tx_mp:
ucs_mpool_cleanup(&self->tx.mp, 1);
err_destroy_rx_mp:
ucs_mpool_cleanup(&self->rx.mp, 1);
err:
return status;
}
static UCS_CLASS_CLEANUP_FUNC(uct_rc_iface_t)
{
uct_rc_iface_ops_t *ops = ucs_derived_of(self->super.ops, uct_rc_iface_ops_t);
unsigned i;
/* Release the QP table. TODO: release entries on demand when removing an ep. */
for (i = 0; i < UCT_RC_QP_TABLE_SIZE; ++i) {
ucs_free(self->eps[i]);
}
if (!ucs_list_is_empty(&self->ep_list)) {
ucs_warn("some eps were not destroyed");
}
ucs_arbiter_cleanup(&self->tx.arbiter);
UCS_STATS_NODE_FREE(self->stats);
ops->cleanup_rx(self);
uct_rc_iface_tx_ops_cleanup(self);
ucs_mpool_cleanup(&self->tx.mp, 1);
ucs_mpool_cleanup(&self->rx.mp, 0); /* Cannot flush SRQ */
if (self->config.fc_enabled) {
ucs_mpool_cleanup(&self->tx.fc_mp, 1);
}
}
UCS_CLASS_DEFINE(uct_rc_iface_t, uct_ib_iface_t);
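/* Fill the QP creation attributes that are common to all RC QPs created on
* this interface (SRQ, WR/SGE limits, inline sizes, signaling mode). */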
void uct_rc_iface_fill_attr(uct_rc_iface_t *iface,
uct_ib_qp_attr_t *qp_init_attr,
unsigned max_send_wr,
struct ibv_srq *srq)
{
qp_init_attr->srq = srq;
qp_init_attr->cap.max_send_wr = max_send_wr;
qp_init_attr->cap.max_recv_wr = 0;
qp_init_attr->cap.max_send_sge = iface->config.tx_min_sge;
qp_init_attr->cap.max_recv_sge = 1;
qp_init_attr->cap.max_inline_data = iface->config.tx_min_inline;
qp_init_attr->qp_type = iface->super.config.qp_type;
qp_init_attr->sq_sig_all = !iface->config.tx_moderation;
qp_init_attr->max_inl_recv = iface->config.rx_inline;
qp_init_attr->max_inl_resp = iface->super.config.max_inl_resp;
}
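/* Typical QP bring-up sequence (a sketch only; the exact call sites and
* arguments live in the per-transport endpoint code, error handling omitted):
*
*   struct ibv_qp    *qp;
*   uct_ib_qp_attr_t  attr = {0};
*
*   uct_rc_iface_qp_create(iface, &qp, &attr, iface->config.tx_qp_len, srq);
*   uct_rc_iface_qp_init(iface, qp);                             (RESET -> INIT)
*   uct_rc_iface_qp_connect(iface, qp, dest_qp_num, &ah_attr);   (INIT -> RTR -> RTS)
*/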
ucs_status_t uct_rc_iface_qp_create(uct_rc_iface_t *iface, struct ibv_qp **qp_p,
uct_ib_qp_attr_t *attr, unsigned max_send_wr,
struct ibv_srq *srq)
{
uct_rc_iface_fill_attr(iface, attr, max_send_wr, srq);
uct_ib_iface_fill_attr(&iface->super, attr);
return uct_ib_iface_create_qp(&iface->super, attr, qp_p);
}
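/* Move a freshly created QP from RESET to INIT and set its port, pkey index
* and remote access permissions. */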
ucs_status_t uct_rc_iface_qp_init(uct_rc_iface_t *iface, struct ibv_qp *qp)
{
struct ibv_qp_attr qp_attr;
int ret;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IBV_QPS_INIT;
qp_attr.pkey_index = iface->super.pkey_index;
qp_attr.port_num = iface->super.config.port_num;
qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_WRITE |
IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_ATOMIC;
ret = ibv_modify_qp(qp, &qp_attr,
IBV_QP_STATE |
IBV_QP_PKEY_INDEX |
IBV_QP_PORT |
IBV_QP_ACCESS_FLAGS);
if (ret) {
ucs_error("error modifying QP to INIT: %m");
return UCS_ERR_IO_ERROR;
}
return UCS_OK;
}
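/* Connect a QP to a remote peer: transition INIT -> RTR -> RTS using the path
* MTU, timeouts and retry counts from the interface configuration. */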
ucs_status_t uct_rc_iface_qp_connect(uct_rc_iface_t *iface, struct ibv_qp *qp,
const uint32_t dest_qp_num,
struct ibv_ah_attr *ah_attr)
{
#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
struct ibv_exp_qp_attr qp_attr;
uct_ib_device_t *dev;
#else
struct ibv_qp_attr qp_attr;
#endif
long qp_attr_mask;
int ret;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IBV_QPS_RTR;
qp_attr.dest_qp_num = dest_qp_num;
qp_attr.rq_psn = 0;
qp_attr.path_mtu = iface->config.path_mtu;
qp_attr.max_dest_rd_atomic = iface->config.max_rd_atomic;
qp_attr.min_rnr_timer = iface->config.min_rnr_timer;
qp_attr.ah_attr = *ah_attr;
qp_attr_mask = IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_MIN_RNR_TIMER;
#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
dev = uct_ib_iface_device(&iface->super);
if (iface->config.ooo_rw && UCX_IB_DEV_IS_OOO_SUPPORTED(dev, rc)) {
ucs_debug("enabling out-of-order on RC QP %x dev %s",
qp->qp_num, uct_ib_device_name(dev));
qp_attr_mask |= IBV_EXP_QP_OOO_RW_DATA_PLACEMENT;
}
ret = ibv_exp_modify_qp(qp, &qp_attr, qp_attr_mask);
#else
ret = ibv_modify_qp(qp, &qp_attr, qp_attr_mask);
#endif
if (ret) {
ucs_error("error modifying QP to RTR: %m");
return UCS_ERR_IO_ERROR;
}
qp_attr.qp_state = IBV_QPS_RTS;
qp_attr.sq_psn = 0;
qp_attr.timeout = iface->config.timeout;
qp_attr.rnr_retry = iface->config.rnr_retry;
qp_attr.retry_cnt = iface->config.retry_cnt;
qp_attr.max_rd_atomic = iface->config.max_rd_atomic;
qp_attr_mask = IBV_QP_STATE |
IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT |
IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN |
IBV_QP_MAX_QP_RD_ATOMIC;
#if HAVE_DECL_IBV_EXP_QP_OOO_RW_DATA_PLACEMENT
ret = ibv_exp_modify_qp(qp, &qp_attr, qp_attr_mask);
#else
ret = ibv_modify_qp(qp, &qp_attr, qp_attr_mask);
#endif
if (ret) {
ucs_error("error modifying QP to RTS: %m");
return UCS_ERR_IO_ERROR;
}
ucs_debug("connected rc qp 0x%x on "UCT_IB_IFACE_FMT" to lid %d(+%d) sl %d "
"remote_qp 0x%x mtu %zu timer %dx%d rnr %dx%d rd_atom %d",
qp->qp_num, UCT_IB_IFACE_ARG(&iface->super), ah_attr->dlid,
ah_attr->src_path_bits, ah_attr->sl, qp_attr.dest_qp_num,
uct_ib_mtu_value(qp_attr.path_mtu), qp_attr.timeout,
qp_attr.retry_cnt, qp_attr.min_rnr_timer, qp_attr.rnr_retry,
qp_attr.max_rd_atomic);
return UCS_OK;
}
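/* Arm completion events: the TX CQ is armed for send completions, and the RX
* CQ is armed solicited-only for active messages unless FC grants (or the
* caller) require waking up on all receive completions. */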
ucs_status_t uct_rc_iface_common_event_arm(uct_iface_h tl_iface,
unsigned events, int force_rx_all)
{
uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
int arm_rx_solicited, arm_rx_all;
ucs_status_t status;
status = uct_ib_iface_pre_arm(&iface->super);
if (status != UCS_OK) {
return status;
}
if (events & UCT_EVENT_SEND_COMP) {
status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_TX, 0);
if (status != UCS_OK) {
return status;
}
}
arm_rx_solicited = 0;
arm_rx_all = 0;
if (events & UCT_EVENT_RECV) {
arm_rx_solicited = 1; /* to wake up on active messages */
}
if (((events & UCT_EVENT_SEND_COMP) && iface->config.fc_enabled) ||
force_rx_all) {
arm_rx_all = 1; /* to wake up on FC grants (or if forced) */
}
if (arm_rx_solicited || arm_rx_all) {
status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX,
arm_rx_solicited && !arm_rx_all);
if (status != UCS_OK) {
return status;
}
}
return UCS_OK;
}
ucs_status_t uct_rc_iface_event_arm(uct_iface_h tl_iface, unsigned events)
{
return uct_rc_iface_common_event_arm(tl_iface, events, 0);
}
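/* Interface-level fence: when fencing is enabled, advance the fence "beat"
* counter so the endpoints' send paths can detect that a fence was requested. */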
ucs_status_t uct_rc_iface_fence(uct_iface_h tl_iface, unsigned flags)
{
uct_rc_iface_t *iface = ucs_derived_of(tl_iface, uct_rc_iface_t);
if (iface->config.fence_mode != UCT_RC_FENCE_MODE_NONE) {
iface->tx.fi.fence_beat++;
}
UCT_TL_IFACE_STAT_FENCE(&iface->super.super);
return UCS_OK;
}