/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "ud_iface.h"
#include "ud_ep.h"
#include "ud_inl.h"
#include <ucs/arch/cpu.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/type/class.h>
#include <ucs/datastruct/queue.h>
#include <sys/poll.h>
#include <linux/ip.h>
#define UCT_UD_IPV4_ADDR_LEN sizeof(struct in_addr)
#define UCT_UD_IPV6_ADDR_LEN sizeof(struct in6_addr)
#if ENABLE_STATS
static ucs_stats_class_t uct_ud_iface_stats_class = {
.name = "ud_iface",
.num_counters = UCT_UD_IFACE_STAT_LAST,
.counter_names = {
[UCT_UD_IFACE_STAT_RX_DROP] = "rx_drop"
}
};
#endif
/* cppcheck-suppress ctunullpointer */
SGLIB_DEFINE_LIST_FUNCTIONS(uct_ud_iface_peer_t, uct_ud_iface_peer_cmp, next)
SGLIB_DEFINE_HASHED_CONTAINER_FUNCTIONS(uct_ud_iface_peer_t,
UCT_UD_HASH_SIZE,
uct_ud_iface_peer_hash)
static void uct_ud_iface_free_resend_skbs(uct_ud_iface_t *iface);
static void uct_ud_iface_timer(int timer_id, void *arg);
static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface);
static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface);
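/*
 * Connection establishment (CEP) helpers. The iface keeps a hash of remote
 * peers keyed by (dlid, dgid, qpn); each peer holds the list of endpoints
 * connected to it, ordered by descending connection id, plus the next
 * connection id to hand out (conn_id_last).
 */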
void uct_ud_iface_cep_init(uct_ud_iface_t *iface)
{
sglib_hashed_uct_ud_iface_peer_t_init(iface->peers);
}
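/* Destroy endpoints that are still linked to a peer at cleanup time; warn
 * about connections that should have been destroyed by their owner. */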
static void
uct_ud_iface_cep_cleanup_eps(uct_ud_iface_t *iface, uct_ud_iface_peer_t *peer)
{
uct_ud_ep_t *ep, *tmp;
ucs_list_for_each_safe(ep, tmp, &peer->ep_list, cep_list) {
if (ep->conn_id < peer->conn_id_last) {
/* an active connection should have already been cleaned up by its owner */
ucs_warn("iface (%p) peer (qpn=%d lid=%d) cleanup with %d endpoints still active",
iface, peer->dst_qpn, peer->dlid,
(int)ucs_list_length(&peer->ep_list));
continue;
}
ucs_list_del(&ep->cep_list);
ucs_trace("cep:ep_destroy(%p) conn_id %d", ep, ep->conn_id);
uct_ep_destroy(&ep->super.super);
}
}
void uct_ud_iface_cep_cleanup(uct_ud_iface_t *iface)
{
uct_ud_iface_peer_t *peer;
struct sglib_hashed_uct_ud_iface_peer_t_iterator it_peer;
for (peer = sglib_hashed_uct_ud_iface_peer_t_it_init(&it_peer,
iface->peers);
peer != NULL;
peer = sglib_hashed_uct_ud_iface_peer_t_it_next(&it_peer)) {
uct_ud_iface_cep_cleanup_eps(iface, peer);
free(peer);
}
}
static uct_ud_iface_peer_t *
uct_ud_iface_cep_lookup_addr(uct_ud_iface_t *iface, uint16_t dlid,
const union ibv_gid *dgid, uint32_t dest_qpn)
{
uct_ud_iface_peer_t key;
key.dlid = dlid;
key.dgid = *dgid;
key.dst_qpn = dest_qpn;
return sglib_hashed_uct_ud_iface_peer_t_find_member(iface->peers, &key);
}
static uct_ud_iface_peer_t *
uct_ud_iface_cep_lookup_peer(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr)
{
uint32_t dest_qpn = uct_ib_unpack_uint24(src_if_addr->qp_num);
union ibv_gid dgid;
uint16_t dlid;
uct_ib_address_unpack(src_ib_addr, &dlid, &dgid);
return uct_ud_iface_cep_lookup_addr(iface, dlid, &dgid, dest_qpn);
}
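/* Find an ep on the peer by connection id. A conn_id of UCT_UD_EP_CONN_ID_MAX
 * means "next connection", i.e. search for conn_id_last. The list is kept in
 * descending conn_id order, so the scan can stop early. */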
static uct_ud_ep_t *
uct_ud_iface_cep_lookup_ep(uct_ud_iface_peer_t *peer, uint32_t conn_id)
{
uint32_t id;
uct_ud_ep_t *ep;
if (conn_id != UCT_UD_EP_CONN_ID_MAX) {
id = conn_id;
} else {
id = peer->conn_id_last;
/* TODO: O(1) lookup in this case (new connection) */
}
ucs_list_for_each(ep, &peer->ep_list, cep_list) {
if (ep->conn_id == id) {
return ep;
}
if (ep->conn_id < id) {
break;
}
}
return NULL;
}
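/* Return the requested connection id, or allocate the peer's next id when
 * UCT_UD_EP_CONN_ID_MAX is passed. */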
static uint32_t
uct_ud_iface_cep_getid(uct_ud_iface_peer_t *peer, uint32_t conn_id)
{
uint32_t new_id;
if (conn_id != UCT_UD_EP_CONN_ID_MAX) {
return conn_id;
}
new_id = peer->conn_id_last++;
return new_id;
}
/* insert new ep that is connected to src_if_addr */
ucs_status_t uct_ud_iface_cep_insert(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr,
uct_ud_ep_t *ep, uint32_t conn_id)
{
uint32_t dest_qpn = uct_ib_unpack_uint24(src_if_addr->qp_num);
uct_ud_iface_peer_t *peer;
union ibv_gid dgid;
uct_ud_ep_t *cep;
uint16_t dlid;
uct_ib_address_unpack(src_ib_addr, &dlid, &dgid);
peer = uct_ud_iface_cep_lookup_addr(iface, dlid, &dgid, dest_qpn);
if (peer == NULL) {
peer = malloc(sizeof *peer);
if (peer == NULL) {
return UCS_ERR_NO_MEMORY;
}
peer->dlid = dlid;
peer->dgid = dgid;
peer->dst_qpn = dest_qpn;
sglib_hashed_uct_ud_iface_peer_t_add(iface->peers, peer);
ucs_list_head_init(&peer->ep_list);
peer->conn_id_last = 0;
}
ep->conn_id = uct_ud_iface_cep_getid(peer, conn_id);
if (ep->conn_id == UCT_UD_EP_CONN_ID_MAX) {
return UCS_ERR_NO_RESOURCE;
}
if (ucs_list_is_empty(&peer->ep_list)) {
ucs_list_add_head(&peer->ep_list, &ep->cep_list);
return UCS_OK;
}
ucs_list_for_each(cep, &peer->ep_list, cep_list) {
ucs_assert_always(cep->conn_id != ep->conn_id);
if (cep->conn_id < ep->conn_id) {
ucs_list_insert_before(&cep->cep_list, &ep->cep_list);
return UCS_OK;
}
}
return UCS_OK;
}
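/* Unlink an ep from its peer list; no-op if the ep is not linked. */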
void uct_ud_iface_cep_remove(uct_ud_ep_t *ep)
{
if (ucs_list_is_empty(&ep->cep_list)) {
return;
}
ucs_trace("iface(%p) cep_remove:ep(%p)", ep->super.super.iface, ep);
ucs_list_del(&ep->cep_list);
ucs_list_head_init(&ep->cep_list);
}
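/* Look up an ep connected to the given source address. When conn_id is
 * UCT_UD_EP_CONN_ID_MAX and a matching ep exists, the peer's next connection
 * id is consumed as well. */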
uct_ud_ep_t *uct_ud_iface_cep_lookup(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr,
uint32_t conn_id)
{
uct_ud_iface_peer_t *peer;
uct_ud_ep_t *ep;
peer = uct_ud_iface_cep_lookup_peer(iface, src_ib_addr, src_if_addr);
if (peer == NULL) {
return NULL;
}
ep = uct_ud_iface_cep_lookup_ep(peer, conn_id);
if (ep && conn_id == UCT_UD_EP_CONN_ID_MAX) {
peer->conn_id_last++;
}
return ep;
}
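/* Undo the most recent connection id allocation on the peer and unlink the ep. */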
void uct_ud_iface_cep_rollback(uct_ud_iface_t *iface,
const uct_ib_address_t *src_ib_addr,
const uct_ud_iface_addr_t *src_if_addr,
uct_ud_ep_t *ep)
{
uct_ud_iface_peer_t *peer;
peer = uct_ud_iface_cep_lookup_peer(iface, src_ib_addr, src_if_addr);
ucs_assert_always(peer != NULL);
ucs_assert_always(peer->conn_id_last > 0);
ucs_assert_always(ep->conn_id + 1 == peer->conn_id_last);
ucs_assert_always(!ucs_list_is_empty(&peer->ep_list));
ucs_assert_always(!ucs_list_is_empty(&ep->cep_list));
peer->conn_id_last--;
uct_ud_iface_cep_remove(ep);
}
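/* Mpool object initializer for tx skbs: cache the registration lkey in each
 * skb and clear its flags. */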
static void uct_ud_iface_send_skb_init(uct_iface_h tl_iface, void *obj,
uct_mem_h memh)
{
uct_ud_send_skb_t *skb = obj;
uct_ib_mem_t *ib_memh = memh;
skb->lkey = ib_memh->lkey;
skb->flags = 0;
}
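/* Create the UD QP through the transport-specific hook and bring it through
 * the INIT -> RTR -> RTS state transitions. */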
static ucs_status_t
uct_ud_iface_create_qp(uct_ud_iface_t *self, const uct_ud_iface_config_t *config)
{
uct_ud_iface_ops_t *ops = ucs_derived_of(self->super.ops, uct_ud_iface_ops_t);
uct_ib_qp_attr_t qp_init_attr = {};
struct ibv_qp_attr qp_attr;
ucs_status_t status;
int ret;
qp_init_attr.qp_type = IBV_QPT_UD;
qp_init_attr.sq_sig_all = 0;
qp_init_attr.cap.max_send_wr = config->super.tx.queue_len;
qp_init_attr.cap.max_recv_wr = config->super.rx.queue_len;
qp_init_attr.cap.max_send_sge = 2;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = ucs_max(config->super.tx.min_inline,
UCT_UD_MIN_INLINE);
status = ops->create_qp(&self->super, &qp_init_attr, &self->qp);
if (status != UCS_OK) {
return status;
}
self->config.max_inline = qp_init_attr.cap.max_inline_data;
uct_ib_iface_set_max_iov(&self->super, qp_init_attr.cap.max_send_sge);
memset(&qp_attr, 0, sizeof(qp_attr));
/* Modify QP to INIT state */
qp_attr.qp_state = IBV_QPS_INIT;
qp_attr.pkey_index = self->super.pkey_index;
qp_attr.port_num = self->super.config.port_num;
qp_attr.qkey = UCT_IB_KEY;
ret = ibv_modify_qp(self->qp, &qp_attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY);
if (ret) {
ucs_error("Failed to modify UD QP to INIT: %m");
goto err_destroy_qp;
}
/* Modify to RTR */
qp_attr.qp_state = IBV_QPS_RTR;
ret = ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE);
if (ret) {
ucs_error("Failed to modify UD QP to RTR: %m");
goto err_destroy_qp;
}
/* Modify to RTS */
qp_attr.qp_state = IBV_QPS_RTS;
qp_attr.sq_psn = 0;
ret = ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE | IBV_QP_SQ_PSN);
if (ret) {
ucs_error("Failed to modify UD QP to RTS: %m");
goto err_destroy_qp;
}
return UCS_OK;
err_destroy_qp:
uct_ib_destroy_qp(self->qp);
return UCS_ERR_INVALID_PARAM;
}
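/* Finish iface initialization: reserve the resend-skb quota and start the
 * slow timer wheel and the periodic async timer that drives it. */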
ucs_status_t uct_ud_iface_complete_init(uct_ud_iface_t *iface)
{
ucs_async_context_t *async = iface->super.super.worker->async;
ucs_async_mode_t async_mode = async->mode;
ucs_status_t status;
iface->tx.resend_skbs_quota = iface->tx.available;
status = ucs_twheel_init(&iface->async.slow_timer,
iface->async.slow_tick / 4,
uct_ud_iface_get_async_time(iface));
if (status != UCS_OK) {
goto err;
}
status = ucs_async_add_timer(async_mode, iface->async.slow_tick,
uct_ud_iface_timer, iface, async,
&iface->async.timer_id);
if (status != UCS_OK) {
goto err_twheel_cleanup;
}
return UCS_OK;
err_twheel_cleanup:
ucs_twheel_cleanup(&iface->async.slow_timer);
err:
return status;
}
void uct_ud_iface_remove_async_handlers(uct_ud_iface_t *iface)
{
uct_base_iface_progress_disable(&iface->super.super.super,
UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
ucs_async_remove_handler(iface->async.timer_id, 1);
}
/* Calculate the real GID length: either 16 bytes (RoCEv1 or RoCEv2/IPv6)
* or 4 bytes (RoCEv2/IPv4). This length is used to filter incoming packets by DGID.
*
* According to Annex17_RoCEv2 (A17.4.5.2):
* "The first 40 bytes of user posted UD Receive Buffers are reserved for the L3
* header of the incoming packet (as per the InfiniBand Spec Section 11.4.1.2).
* In RoCEv2, this area is filled up with the IP header. IPv6 header uses the
* entire 40 bytes. IPv4 headers use the 20 bytes in the second half of the
* reserved 40 bytes area (i.e. offset 20 from the beginning of the receive
* buffer). In this case, the content of the first 20 bytes is undefined." */
static void uct_ud_iface_calc_gid_len(uct_ud_iface_t *iface)
{
uint16_t *local_gid_u16 = (uint16_t*)iface->super.gid.raw;
/* Make sure that daddr in IPv4 resides in the last 4 bytes in GRH */
UCS_STATIC_ASSERT((UCT_IB_GRH_LEN - (20 + offsetof(struct iphdr, daddr))) ==
UCT_UD_IPV4_ADDR_LEN);
/* Make sure that dgid resides in the last 16 bytes in GRH */
UCS_STATIC_ASSERT((UCT_IB_GRH_LEN - offsetof(struct ibv_grh, dgid)) ==
UCT_UD_IPV6_ADDR_LEN);
/* An IPv4 address mapped to IPv6 looks like 0000:0000:0000:0000:0000:ffff:????:????,
* so check for leading zeroes and verify that bytes 11-12 are 0xff.
* Otherwise, either RoCEv1 or RoCEv2/IPv6 is in use. */
if (local_gid_u16[0] == 0x0000) {
ucs_assert_always(local_gid_u16[5] == 0xffff);
iface->config.gid_len = UCT_UD_IPV4_ADDR_LEN;
} else {
iface->config.gid_len = UCT_UD_IPV6_ADDR_LEN;
}
}
UCS_CLASS_INIT_FUNC(uct_ud_iface_t, uct_ud_iface_ops_t *ops, uct_md_h md,
uct_worker_h worker, const uct_iface_params_t *params,
const uct_ud_iface_config_t *config,
uct_ib_iface_init_attr_t *init_attr)
{
ucs_status_t status;
size_t data_size;
int mtu;
UCT_CHECK_PARAM(params->field_mask & UCT_IFACE_PARAM_FIELD_OPEN_MODE,
"UCT_IFACE_PARAM_FIELD_OPEN_MODE is not defined");
if (!(params->open_mode & UCT_IFACE_OPEN_MODE_DEVICE)) {
ucs_error("only UCT_IFACE_OPEN_MODE_DEVICE is supported");
return UCS_ERR_UNSUPPORTED;
}
ucs_trace_func("%s: iface=%p ops=%p worker=%p rx_headroom=%zu",
params->mode.device.dev_name, self, ops, worker,
(params->field_mask & UCT_IFACE_PARAM_FIELD_RX_HEADROOM) ?
params->rx_headroom : 0);
if (config->super.tx.queue_len <= UCT_UD_TX_MODERATION) {
ucs_error("%s ud iface tx queue is too short (%d <= %d)",
params->mode.device.dev_name,
config->super.tx.queue_len, UCT_UD_TX_MODERATION);
return UCS_ERR_INVALID_PARAM;
}
status = uct_ib_device_mtu(params->mode.device.dev_name, md, &mtu);
if (status != UCS_OK) {
return status;
}
init_attr->rx_priv_len = sizeof(uct_ud_recv_skb_t) -
sizeof(uct_ib_iface_recv_desc_t);
init_attr->rx_hdr_len = UCT_IB_GRH_LEN + sizeof(uct_ud_neth_t);
init_attr->tx_cq_len = config->super.tx.queue_len;
init_attr->rx_cq_len = config->super.rx.queue_len;
init_attr->seg_size = ucs_min(mtu, config->super.seg_size);
init_attr->qp_type = IBV_QPT_UD;
UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker,
params, &config->super, init_attr);
if (self->super.super.worker->async == NULL) {
ucs_error("%s ud iface must have valid async context", params->mode.device.dev_name);
return UCS_ERR_INVALID_PARAM;
}
self->tx.unsignaled = 0;
self->tx.available = config->super.tx.queue_len;
self->rx.available = config->super.rx.queue_len;
self->rx.quota = 0;
self->config.tx_qp_len = config->super.tx.queue_len;
self->config.peer_timeout = ucs_time_from_sec(config->peer_timeout);
self->config.check_grh_dgid = config->dgid_check &&
uct_ib_iface_is_roce(&self->super);
if ((config->max_window < UCT_UD_CA_MIN_WINDOW) ||
(config->max_window > UCT_UD_CA_MAX_WINDOW)) {
ucs_error("Max congestion avoidance window should be >= %d and <= %d (%d)",
UCT_UD_CA_MIN_WINDOW, UCT_UD_CA_MAX_WINDOW, config->max_window);
return UCS_ERR_INVALID_PARAM;
}
self->config.max_window = config->max_window;
if (config->slow_timer_tick <= 0.) {
ucs_error("The slow timer tick should be > 0 (%lf)",
config->slow_timer_tick);
return UCS_ERR_INVALID_PARAM;
} else {
self->async.slow_tick = ucs_time_from_sec(config->slow_timer_tick);
}
if (config->slow_timer_backoff < UCT_UD_MIN_TIMER_TIMER_BACKOFF) {
ucs_error("The slow timer back off must be >= %lf (%lf)",
UCT_UD_MIN_TIMER_TIMER_BACKOFF, config->slow_timer_backoff);
return UCS_ERR_INVALID_PARAM;
} else {
self->config.slow_timer_backoff = config->slow_timer_backoff;
}
/* Redefine receive desc release callback */
self->super.release_desc.cb = uct_ud_iface_release_desc;
UCT_UD_IFACE_HOOK_INIT(self);
if (uct_ud_iface_create_qp(self, config) != UCS_OK) {
return UCS_ERR_INVALID_PARAM;
}
ucs_ptr_array_init(&self->eps, 0, "ud_eps");
uct_ud_iface_cep_init(self);
status = uct_ib_iface_recv_mpool_init(&self->super, &config->super,
"ud_recv_skb", &self->rx.mp);
if (status != UCS_OK) {
goto err_qp;
}
self->rx.available = ucs_min(config->ud_common.rx_queue_len_init,
config->super.rx.queue_len);
self->rx.quota = config->super.rx.queue_len - self->rx.available;
ucs_mpool_grow(&self->rx.mp, self->rx.available);
data_size = sizeof(uct_ud_ctl_hdr_t) + self->super.addr_size;
data_size = ucs_max(data_size, self->super.config.seg_size);
data_size = ucs_max(data_size,
sizeof(uct_ud_zcopy_desc_t) + self->config.max_inline);
status = uct_iface_mpool_init(&self->super.super, &self->tx.mp,
sizeof(uct_ud_send_skb_t) + data_size,
sizeof(uct_ud_send_skb_t),
UCT_UD_SKB_ALIGN,
&config->super.tx.mp, self->config.tx_qp_len,
uct_ud_iface_send_skb_init, "ud_tx_skb");
if (status != UCS_OK) {
goto err_rx_mpool;
}
ucs_assert_always(data_size >= UCT_UD_MIN_INLINE);
self->tx.skb = NULL;
self->tx.skb_inl.super.len = sizeof(uct_ud_neth_t);
ucs_queue_head_init(&self->tx.resend_skbs);
self->tx.resend_skbs_quota = 0;
ucs_arbiter_init(&self->tx.pending_q);
ucs_queue_head_init(&self->tx.async_comp_q);
ucs_queue_head_init(&self->rx.pending_q);
self->tx.async_before_pending = 0;
uct_ud_iface_calc_gid_len(self);
status = UCS_STATS_NODE_ALLOC(&self->stats, &uct_ud_iface_stats_class,
self->super.super.stats);
if (status != UCS_OK) {
goto err_tx_mpool;
}
return UCS_OK;
err_tx_mpool:
ucs_mpool_cleanup(&self->tx.mp, 1);
err_rx_mpool:
ucs_mpool_cleanup(&self->rx.mp, 1);
err_qp:
uct_ib_destroy_qp(self->qp);
ucs_ptr_array_cleanup(&self->eps);
return status;
}
static UCS_CLASS_CLEANUP_FUNC(uct_ud_iface_t)
{
ucs_trace_func("");
/* TODO: proper flush and connection termination */
uct_ud_enter(self);
ucs_debug("iface(%p): cep cleanup", self);
uct_ud_iface_cep_cleanup(self);
uct_ud_iface_free_resend_skbs(self);
uct_ud_iface_free_async_comps(self);
ucs_mpool_cleanup(&self->tx.mp, 0);
/* TODO: qp to error state and cleanup all wqes */
uct_ud_iface_free_pending_rx(self);
ucs_mpool_cleanup(&self->rx.mp, 0);
uct_ib_destroy_qp(self->qp);
ucs_debug("iface(%p): ptr_array cleanup", self);
ucs_ptr_array_cleanup(&self->eps);
ucs_arbiter_cleanup(&self->tx.pending_q);
UCS_STATS_NODE_FREE(self->stats);
uct_ud_leave(self);
}
UCS_CLASS_DEFINE(uct_ud_iface_t, uct_ib_iface_t);
ucs_config_field_t uct_ud_iface_config_table[] = {
{"IB_", "", NULL,
ucs_offsetof(uct_ud_iface_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},
{"UD_", "", NULL,
ucs_offsetof(uct_ud_iface_config_t, ud_common),
UCS_CONFIG_TYPE_TABLE(uct_ud_iface_common_config_table)},
{"TIMEOUT", "5.0m", "Transport timeout",
ucs_offsetof(uct_ud_iface_config_t, peer_timeout), UCS_CONFIG_TYPE_TIME},
{"SLOW_TIMER_TICK", "100ms", "Initial timeout for retransmissions",
ucs_offsetof(uct_ud_iface_config_t, slow_timer_tick), UCS_CONFIG_TYPE_TIME},
{"SLOW_TIMER_BACKOFF", "2.0",
"Timeout multiplier for resending trigger (must be >= "
UCS_PP_MAKE_STRING(UCT_UD_MIN_TIMER_TIMER_BACKOFF) ")",
ucs_offsetof(uct_ud_iface_config_t, slow_timer_backoff),
UCS_CONFIG_TYPE_DOUBLE},
{"ETH_DGID_CHECK", "y",
"Enable checking destination GID for incoming packets of Ethernet network.\n"
"Mismatched packets are silently dropped.",
ucs_offsetof(uct_ud_iface_config_t, dgid_check), UCS_CONFIG_TYPE_BOOL},
{"MAX_WINDOW", UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW),
"Max congestion avoidance window. Should be >= "
UCS_PP_MAKE_STRING(UCT_UD_CA_MIN_WINDOW) " and <= "
UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW),
ucs_offsetof(uct_ud_iface_config_t, max_window), UCS_CONFIG_TYPE_UINT},
{NULL}
};
ucs_status_t uct_ud_iface_query(uct_ud_iface_t *iface, uct_iface_attr_t *iface_attr)
{
ucs_status_t status;
status = uct_ib_iface_query(&iface->super,
UCT_IB_DETH_LEN + sizeof(uct_ud_neth_t),
iface_attr);
if (status != UCS_OK) {
return status;
}
iface_attr->cap.flags = UCT_IFACE_FLAG_AM_BCOPY |
UCT_IFACE_FLAG_AM_ZCOPY |
UCT_IFACE_FLAG_CONNECT_TO_EP |
UCT_IFACE_FLAG_CONNECT_TO_IFACE |
UCT_IFACE_FLAG_PENDING |
UCT_IFACE_FLAG_CB_SYNC |
UCT_IFACE_FLAG_CB_ASYNC |
UCT_IFACE_FLAG_EVENT_SEND_COMP |
UCT_IFACE_FLAG_EVENT_RECV |
UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;
iface_attr->cap.am.max_short = uct_ib_iface_hdr_size(iface->config.max_inline,
sizeof(uct_ud_neth_t));
iface_attr->cap.am.max_bcopy = iface->super.config.seg_size - sizeof(uct_ud_neth_t);
iface_attr->cap.am.min_zcopy = 0;
iface_attr->cap.am.max_zcopy = iface->super.config.seg_size - sizeof(uct_ud_neth_t);
iface_attr->cap.am.align_mtu = uct_ib_mtu_value(uct_ib_iface_port_attr(&iface->super)->active_mtu);
iface_attr->cap.am.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
/* The first iov is reserved for the header */
iface_attr->cap.am.max_iov = uct_ib_iface_get_max_iov(&iface->super) - 1;
iface_attr->cap.put.max_short = uct_ib_iface_hdr_size(iface->config.max_inline,
sizeof(uct_ud_neth_t) +
sizeof(uct_ud_put_hdr_t));
iface_attr->iface_addr_len = sizeof(uct_ud_iface_addr_t);
iface_attr->ep_addr_len = sizeof(uct_ud_ep_addr_t);
iface_attr->max_conn_priv = 0;
/* UD lacks scatter-to-CQE support */
iface_attr->latency.overhead += 10e-9;
if (iface_attr->cap.am.max_short) {
iface_attr->cap.flags |= UCT_IFACE_FLAG_AM_SHORT;
}
return UCS_OK;
}
ucs_status_t
uct_ud_iface_get_address(uct_iface_h tl_iface, uct_iface_addr_t *iface_addr)
{
uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
uct_ud_iface_addr_t *addr = (uct_ud_iface_addr_t *)iface_addr;
uct_ib_pack_uint24(addr->qp_num, iface->qp->qp_num);
return UCS_OK;
}
ucs_status_t uct_ud_iface_flush(uct_iface_h tl_iface, unsigned flags,
uct_completion_t *comp)
{
uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
uct_ud_ep_t *ep;
ucs_status_t status;
int i, count;
ucs_trace_func("");
if (comp != NULL) {
return UCS_ERR_UNSUPPORTED;
}
uct_ud_enter(iface);
if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface))) {
UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
uct_ud_leave(iface);
return UCS_INPROGRESS;
}
count = 0;
ucs_ptr_array_for_each(ep, i, &iface->eps) {
/* ud ep flush returns either ok or in progress */
status = uct_ud_ep_flush_nolock(iface, ep, NULL);
if ((status == UCS_INPROGRESS) || (status == UCS_ERR_NO_RESOURCE)) {
++count;
}
}
uct_ud_leave(iface);
if (count != 0) {
UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
return UCS_INPROGRESS;
}
UCT_TL_IFACE_STAT_FLUSH(&iface->super.super);
return UCS_OK;
}
void uct_ud_iface_add_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
uint32_t prev_gen;
ep->ep_id = ucs_ptr_array_insert(&iface->eps, ep, &prev_gen);
}
void uct_ud_iface_remove_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
if (ep->ep_id != UCT_UD_EP_NULL_ID) {
ucs_trace("iface(%p) remove ep: %p id %d", iface, ep, ep->ep_id);
ucs_ptr_array_remove(&iface->eps, ep->ep_id, 0);
}
}
void uct_ud_iface_replace_ep(uct_ud_iface_t *iface,
uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep)
{
void *p;
ucs_assert_always(old_ep != new_ep);
ucs_assert_always(old_ep->ep_id != new_ep->ep_id);
p = ucs_ptr_array_replace(&iface->eps, old_ep->ep_id, new_ep);
ucs_assert_always(p == (void *)old_ep);
ucs_trace("replace_ep: old(%p) id=%d new(%p) id=%d", old_ep, old_ep->ep_id, new_ep, new_ep->ep_id);
ucs_ptr_array_remove(&iface->eps, new_ep->ep_id, 0);
}
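/* Get an skb for resend/control traffic: allocate from the tx mpool while the
 * reserved quota lasts, then recycle skbs from the resend queue. */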
uct_ud_send_skb_t *uct_ud_iface_resend_skb_get(uct_ud_iface_t *iface)
{
ucs_queue_elem_t *elem;
uct_ud_send_skb_t *skb;
/* grow the reserved skb queue on demand */
if (iface->tx.resend_skbs_quota > 0) {
skb = ucs_mpool_get(&iface->tx.mp);
if (skb == NULL) {
ucs_fatal("failed to allocate control skb");
}
--iface->tx.resend_skbs_quota;
return skb;
} else {
elem = ucs_queue_pull(&iface->tx.resend_skbs);
ucs_assert(elem != NULL);
return ucs_container_of(elem, uct_ud_send_skb_t, queue);
}
}
static void uct_ud_iface_free_resend_skbs(uct_ud_iface_t *iface)
{
uct_ud_send_skb_t *skb;
iface->tx.resend_skbs_quota = 0;
ucs_queue_for_each_extract(skb, &iface->tx.resend_skbs, queue, 1) {
ucs_mpool_put(skb);
}
}
static void uct_ud_ep_dispatch_err_comp(uct_ud_ep_t *ep, uct_ud_send_skb_t *skb)
{
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
ucs_status_t status;
ucs_assert(ep->tx.err_skb_count > 0);
--ep->tx.err_skb_count;
if ((ep->tx.err_skb_count > 0) || (ep->flags & UCT_UD_EP_FLAG_DISCONNECTED)) {
return;
}
if (ep->flags & UCT_UD_EP_FLAG_PRIVATE) {
uct_ep_destroy(&ep->super.super);
return;
}
status = iface->super.ops->set_ep_failed(&iface->super, &ep->super.super,
(ucs_status_t)skb->status);
if (status != UCS_OK) {
ucs_fatal("transport error: %s", ucs_status_string(status));
}
}
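/* Dispatch completions deferred from the async context: invoke user
 * completions, handle error completions, and return skbs to the mpool. */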
void uct_ud_iface_dispatch_async_comps_do(uct_ud_iface_t *iface)
{
uct_ud_comp_desc_t *cdesc;
uct_ud_send_skb_t *skb;
uct_ud_ep_t *ep;
do {
skb = ucs_queue_pull_elem_non_empty(&iface->tx.async_comp_q,
uct_ud_send_skb_t, queue);
cdesc = uct_ud_comp_desc(skb);
ep = cdesc->ep;
if (skb->flags & UCT_UD_SEND_SKB_FLAG_COMP) {
ucs_assert(!(ep->flags & UCT_UD_EP_FLAG_DISCONNECTED));
uct_invoke_completion(cdesc->comp, (ucs_status_t)skb->status);
}
if (ucs_unlikely(skb->flags & UCT_UD_SEND_SKB_FLAG_ERR)) {
uct_ud_ep_dispatch_err_comp(ep, skb);
}
ep->flags &= ~UCT_UD_EP_FLAG_ASYNC_COMPS;
skb->flags = 0;
ucs_mpool_put(skb);
} while (!ucs_queue_is_empty(&iface->tx.async_comp_q));
}
static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface)
{
uct_ud_send_skb_t *skb;
while (!ucs_queue_is_empty(&iface->tx.async_comp_q)) {
skb = ucs_queue_pull_elem_non_empty(&iface->tx.async_comp_q,
uct_ud_send_skb_t, queue);
ucs_mpool_put(skb);
}
}
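/* Deliver AM receives that were queued while they could not be handled,
 * processing at most rx_max_poll packets per call. */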
ucs_status_t uct_ud_iface_dispatch_pending_rx_do(uct_ud_iface_t *iface)
{
int count;
uct_ud_recv_skb_t *skb;
uct_ud_neth_t *neth;
unsigned max_poll = iface->super.config.rx_max_poll;
count = 0;
do {
skb = ucs_queue_pull_elem_non_empty(&iface->rx.pending_q, uct_ud_recv_skb_t, u.am.queue);
neth = (uct_ud_neth_t *)((char *)uct_ib_iface_recv_desc_hdr(&iface->super,
(uct_ib_iface_recv_desc_t *)skb) +
UCT_IB_GRH_LEN);
uct_ib_iface_invoke_am_desc(&iface->super,
uct_ud_neth_get_am_id(neth),
neth + 1,
skb->u.am.len,
&skb->super);
count++;
if (count >= max_poll) {
return UCS_ERR_NO_RESOURCE;
}
} while (!ucs_queue_is_empty(&iface->rx.pending_q));
return UCS_OK;
}
static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface)
{
uct_ud_recv_skb_t *skb;
while (!ucs_queue_is_empty(&iface->rx.pending_q)) {
skb = ucs_queue_pull_elem_non_empty(&iface->rx.pending_q, uct_ud_recv_skb_t, u.am.queue);
ucs_mpool_put(skb);
}
}
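/* Run the transport-specific async progress hook; if any events were handled,
 * mark the iface so that a subsequent flush reports UCS_INPROGRESS. */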
static inline void uct_ud_iface_async_progress(uct_ud_iface_t *iface)
{
unsigned ev_count;
uct_ud_iface_ops_t *ops;
ops = ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t);
ev_count = ops->async_progress(iface);
if (ev_count > 0) {
uct_ud_iface_raise_pending_async_ev(iface);
}
}
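/* Slow timer callback: sweep the timer wheel and run async progress under
 * uct_ud_enter()/uct_ud_leave(). */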
static void uct_ud_iface_timer(int timer_id, void *arg)
{
uct_ud_iface_t *iface = arg;
ucs_time_t now;
uct_ud_enter(iface);
now = uct_ud_iface_get_async_time(iface);
ucs_trace_async("iface(%p) slow_timer_sweep: now %lu", iface, now);
ucs_twheel_sweep(&iface->async.slow_timer, now);
uct_ud_iface_async_progress(iface);
uct_ud_leave(iface);
}
void uct_ud_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
uct_ud_iface_t *iface = ucs_container_of(self,
uct_ud_iface_t, super.release_desc);
uct_ud_enter(iface);
uct_ib_iface_release_desc(self, desc);
uct_ud_leave(iface);
}
void uct_ud_iface_handle_failure(uct_ib_iface_t *iface, void *arg,
ucs_status_t status)
{
uct_ud_tx_wnd_purge_outstanding(ucs_derived_of(iface, uct_ud_iface_t),
(uct_ud_ep_t *)arg, status);
}
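/* Arm the CQs for event notification; return UCS_ERR_BUSY if there is still
 * undelivered rx or async-completion work pending. */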
ucs_status_t uct_ud_iface_event_arm(uct_iface_h tl_iface, unsigned events)
{
uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
ucs_status_t status;
uct_ud_enter(iface);
status = uct_ib_iface_pre_arm(&iface->super);
if (status != UCS_OK) {
goto out;
}
/* Check if some receives were not delivered yet */
if ((events & (UCT_EVENT_RECV | UCT_EVENT_RECV_SIG)) &&
!ucs_queue_is_empty(&iface->rx.pending_q))
{
status = UCS_ERR_BUSY;
goto out;
}
/* Check if some send completions were not delivered yet */
if ((events & UCT_EVENT_SEND_COMP) &&
!ucs_queue_is_empty(&iface->tx.async_comp_q))
{
status = UCS_ERR_BUSY;
goto out;
}
if (events & UCT_EVENT_SEND_COMP) {
status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_TX, 0);
if (status != UCS_OK) {
goto out;
}
}
if (events & (UCT_EVENT_SEND_COMP | UCT_EVENT_RECV)) {
/* we may get send completion through ACKs as well */
status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX, 0);
if (status != UCS_OK) {
goto out;
}
}
status = UCS_OK;
out:
uct_ud_leave(iface);
return status;
}
void uct_ud_iface_progress_enable(uct_iface_h tl_iface, unsigned flags)
{
uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
if (flags & UCT_PROGRESS_RECV) {
uct_ud_enter(iface);
iface->rx.available += iface->rx.quota;
iface->rx.quota = 0;
/* let progress (possibly async) post the missing receives */
uct_ud_leave(iface);
}
uct_base_iface_progress_enable(tl_iface, flags);
}