/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "ud_iface.h"
#include "ud_ep.h"
#include "ud_inl.h"

#include <ucs/arch/cpu.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/type/class.h>
#include <ucs/datastruct/queue.h>
#include <sys/poll.h>
#include <linux/ip.h>

#define UCT_UD_IPV4_ADDR_LEN sizeof(struct in_addr)
#define UCT_UD_IPV6_ADDR_LEN sizeof(struct in6_addr)

#if ENABLE_STATS
static ucs_stats_class_t uct_ud_iface_stats_class = {
    .name          = "ud_iface",
    .num_counters  = UCT_UD_IFACE_STAT_LAST,
    .counter_names = {
        [UCT_UD_IFACE_STAT_RX_DROP] = "rx_drop"
    }
};
#endif

/* cppcheck-suppress ctunullpointer */
SGLIB_DEFINE_LIST_FUNCTIONS(uct_ud_iface_peer_t, uct_ud_iface_peer_cmp, next)
SGLIB_DEFINE_HASHED_CONTAINER_FUNCTIONS(uct_ud_iface_peer_t, UCT_UD_HASH_SIZE,
                                        uct_ud_iface_peer_hash)

static void uct_ud_iface_free_resend_skbs(uct_ud_iface_t *iface);
static void uct_ud_iface_timer(int timer_id, void *arg);

static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface);
static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface);


void uct_ud_iface_cep_init(uct_ud_iface_t *iface)
{
    sglib_hashed_uct_ud_iface_peer_t_init(iface->peers);
}

static void
uct_ud_iface_cep_cleanup_eps(uct_ud_iface_t *iface, uct_ud_iface_peer_t *peer)
{
    uct_ud_ep_t *ep, *tmp;

    ucs_list_for_each_safe(ep, tmp, &peer->ep_list, cep_list) {
        if (ep->conn_id < peer->conn_id_last) {
            /* active connection should already be cleaned by owner */
            ucs_warn("iface (%p) peer (qpn=%d lid=%d) cleanup with %d endpoints still active",
                     iface, peer->dst_qpn, peer->dlid,
                     (int)ucs_list_length(&peer->ep_list));
            continue;
        }
        ucs_list_del(&ep->cep_list);
        ucs_trace("cep:ep_destroy(%p) conn_id %d", ep, ep->conn_id);
        uct_ep_destroy(&ep->super.super);
    }
}

void uct_ud_iface_cep_cleanup(uct_ud_iface_t *iface)
{
    uct_ud_iface_peer_t *peer;
    struct sglib_hashed_uct_ud_iface_peer_t_iterator it_peer;

    for (peer = sglib_hashed_uct_ud_iface_peer_t_it_init(&it_peer, iface->peers);
         peer != NULL;
         peer = sglib_hashed_uct_ud_iface_peer_t_it_next(&it_peer)) {

        uct_ud_iface_cep_cleanup_eps(iface, peer);
        free(peer);
    }
}

static uct_ud_iface_peer_t *
uct_ud_iface_cep_lookup_addr(uct_ud_iface_t *iface, uint16_t dlid,
                             const union ibv_gid *dgid, uint32_t dest_qpn)
{
    uct_ud_iface_peer_t key;

    key.dlid    = dlid;
    key.dgid    = *dgid;
    key.dst_qpn = dest_qpn;
    return sglib_hashed_uct_ud_iface_peer_t_find_member(iface->peers, &key);
}

static uct_ud_iface_peer_t *
uct_ud_iface_cep_lookup_peer(uct_ud_iface_t *iface,
                             const uct_ib_address_t *src_ib_addr,
                             const uct_ud_iface_addr_t *src_if_addr)
{
    uint32_t dest_qpn = uct_ib_unpack_uint24(src_if_addr->qp_num);
    union ibv_gid dgid;
    uint16_t dlid;

    uct_ib_address_unpack(src_ib_addr, &dlid, &dgid);
    return uct_ud_iface_cep_lookup_addr(iface, dlid, &dgid, dest_qpn);
}

static uct_ud_ep_t *
uct_ud_iface_cep_lookup_ep(uct_ud_iface_peer_t *peer, uint32_t conn_id)
{
    uint32_t id;
    uct_ud_ep_t *ep;

    if (conn_id != UCT_UD_EP_CONN_ID_MAX) {
        id = conn_id;
    } else {
        id = peer->conn_id_last;
        /* TODO: O(1) lookup in this case (new connection) */
    }

    ucs_list_for_each(ep, &peer->ep_list, cep_list) {
        if (ep->conn_id == id) {
            return ep;
        }
        if (ep->conn_id < id) {
            break;
        }
    }

    return NULL;
}

static uint32_t
uct_ud_iface_cep_getid(uct_ud_iface_peer_t *peer, uint32_t conn_id)
{
    uint32_t new_id;

    if (conn_id != UCT_UD_EP_CONN_ID_MAX) {
        return conn_id;
    }
    new_id = peer->conn_id_last++;
    return new_id;
}
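/*
 * Illustrative sketch (not part of the upstream code): a remote peer is keyed
 * by the triple (dlid, dgid, dst_qpn), and connection ids are handed out per
 * peer from conn_id_last. Passing UCT_UD_EP_CONN_ID_MAX means "allocate the
 * next id"; passing an explicit id reuses it. Assuming a peer that already
 * has two endpoints, the allocation would behave roughly as follows:
 *
 *     peer->conn_id_last == 2;
 *     id = uct_ud_iface_cep_getid(peer, UCT_UD_EP_CONN_ID_MAX); // id == 2, last becomes 3
 *     id = uct_ud_iface_cep_getid(peer, 0);                     // id == 0, last unchanged
 */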
/* insert new ep that is connected to src_if_addr */
ucs_status_t uct_ud_iface_cep_insert(uct_ud_iface_t *iface,
                                     const uct_ib_address_t *src_ib_addr,
                                     const uct_ud_iface_addr_t *src_if_addr,
                                     uct_ud_ep_t *ep, uint32_t conn_id)
{
    uint32_t dest_qpn = uct_ib_unpack_uint24(src_if_addr->qp_num);
    uct_ud_iface_peer_t *peer;
    union ibv_gid dgid;
    uct_ud_ep_t *cep;
    uint16_t dlid;

    uct_ib_address_unpack(src_ib_addr, &dlid, &dgid);
    peer = uct_ud_iface_cep_lookup_addr(iface, dlid, &dgid, dest_qpn);
    if (peer == NULL) {
        peer = malloc(sizeof *peer);
        if (peer == NULL) {
            return UCS_ERR_NO_MEMORY;
        }

        peer->dlid    = dlid;
        peer->dgid    = dgid;
        peer->dst_qpn = dest_qpn;
        sglib_hashed_uct_ud_iface_peer_t_add(iface->peers, peer);
        ucs_list_head_init(&peer->ep_list);
        peer->conn_id_last = 0;
    }

    ep->conn_id = uct_ud_iface_cep_getid(peer, conn_id);
    if (ep->conn_id == UCT_UD_EP_CONN_ID_MAX) {
        return UCS_ERR_NO_RESOURCE;
    }

    if (ucs_list_is_empty(&peer->ep_list)) {
        ucs_list_add_head(&peer->ep_list, &ep->cep_list);
        return UCS_OK;
    }

    ucs_list_for_each(cep, &peer->ep_list, cep_list) {
        ucs_assert_always(cep->conn_id != ep->conn_id);
        if (cep->conn_id < ep->conn_id) {
            ucs_list_insert_before(&cep->cep_list, &ep->cep_list);
            return UCS_OK;
        }
    }

    return UCS_OK;
}

void uct_ud_iface_cep_remove(uct_ud_ep_t *ep)
{
    if (ucs_list_is_empty(&ep->cep_list)) {
        return;
    }
    ucs_trace("iface(%p) cep_remove:ep(%p)", ep->super.super.iface, ep);
    ucs_list_del(&ep->cep_list);
    ucs_list_head_init(&ep->cep_list);
}

uct_ud_ep_t *uct_ud_iface_cep_lookup(uct_ud_iface_t *iface,
                                     const uct_ib_address_t *src_ib_addr,
                                     const uct_ud_iface_addr_t *src_if_addr,
                                     uint32_t conn_id)
{
    uct_ud_iface_peer_t *peer;
    uct_ud_ep_t *ep;

    peer = uct_ud_iface_cep_lookup_peer(iface, src_ib_addr, src_if_addr);
    if (peer == NULL) {
        return NULL;
    }

    ep = uct_ud_iface_cep_lookup_ep(peer, conn_id);
    if (ep && conn_id == UCT_UD_EP_CONN_ID_MAX) {
        peer->conn_id_last++;
    }
    return ep;
}

void uct_ud_iface_cep_rollback(uct_ud_iface_t *iface,
                               const uct_ib_address_t *src_ib_addr,
                               const uct_ud_iface_addr_t *src_if_addr,
                               uct_ud_ep_t *ep)
{
    uct_ud_iface_peer_t *peer;

    peer = uct_ud_iface_cep_lookup_peer(iface, src_ib_addr, src_if_addr);
    ucs_assert_always(peer != NULL);
    ucs_assert_always(peer->conn_id_last > 0);
    ucs_assert_always(ep->conn_id + 1 == peer->conn_id_last);
    ucs_assert_always(!ucs_list_is_empty(&peer->ep_list));
    ucs_assert_always(!ucs_list_is_empty(&ep->cep_list));

    peer->conn_id_last--;
    uct_ud_iface_cep_remove(ep);
}

static void uct_ud_iface_send_skb_init(uct_iface_h tl_iface, void *obj,
                                       uct_mem_h memh)
{
    uct_ud_send_skb_t *skb = obj;
    uct_ib_mem_t *ib_memh = memh;

    skb->lkey  = ib_memh->lkey;
    skb->flags = 0;
}
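/*
 * Illustrative sketch (not upstream code): the ep_list of a peer is kept
 * sorted by descending conn_id, so uct_ud_iface_cep_lookup_ep() can stop
 * scanning as soon as it sees a smaller id. A caller that allocates a new
 * conn_id via uct_ud_iface_cep_insert() and then fails to complete the
 * connection is expected to undo the allocation with
 * uct_ud_iface_cep_rollback(), e.g.:
 *
 *     status = uct_ud_iface_cep_insert(iface, ib_addr, if_addr, ep,
 *                                      UCT_UD_EP_CONN_ID_MAX);
 *     if (status != UCS_OK) {
 *         return status;
 *     }
 *     if (send_creq(ep) != UCS_OK) {   // hypothetical failure point
 *         uct_ud_iface_cep_rollback(iface, ib_addr, if_addr, ep);
 *     }
 */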
static ucs_status_t uct_ud_iface_create_qp(uct_ud_iface_t *self,
                                           const uct_ud_iface_config_t *config)
{
    uct_ud_iface_ops_t *ops = ucs_derived_of(self->super.ops, uct_ud_iface_ops_t);
    uct_ib_qp_attr_t qp_init_attr = {};
    struct ibv_qp_attr qp_attr;
    ucs_status_t status;
    int ret;

    qp_init_attr.qp_type             = IBV_QPT_UD;
    qp_init_attr.sq_sig_all          = 0;
    qp_init_attr.cap.max_send_wr     = config->super.tx.queue_len;
    qp_init_attr.cap.max_recv_wr     = config->super.rx.queue_len;
    qp_init_attr.cap.max_send_sge    = 2;
    qp_init_attr.cap.max_recv_sge    = 1;
    qp_init_attr.cap.max_inline_data = ucs_max(config->super.tx.min_inline,
                                               UCT_UD_MIN_INLINE);

    status = ops->create_qp(&self->super, &qp_init_attr, &self->qp);
    if (status != UCS_OK) {
        return status;
    }

    self->config.max_inline = qp_init_attr.cap.max_inline_data;
    uct_ib_iface_set_max_iov(&self->super, qp_init_attr.cap.max_send_sge);

    memset(&qp_attr, 0, sizeof(qp_attr));
    /* Modify QP to INIT state */
    qp_attr.qp_state   = IBV_QPS_INIT;
    qp_attr.pkey_index = self->super.pkey_index;
    qp_attr.port_num   = self->super.config.port_num;
    qp_attr.qkey       = UCT_IB_KEY;
    ret = ibv_modify_qp(self->qp, &qp_attr,
                        IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY);
    if (ret) {
        ucs_error("Failed to modify UD QP to INIT: %m");
        goto err_destroy_qp;
    }

    /* Modify to RTR */
    qp_attr.qp_state = IBV_QPS_RTR;
    ret = ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE);
    if (ret) {
        ucs_error("Failed to modify UD QP to RTR: %m");
        goto err_destroy_qp;
    }

    /* Modify to RTS */
    qp_attr.qp_state = IBV_QPS_RTS;
    qp_attr.sq_psn   = 0;
    ret = ibv_modify_qp(self->qp, &qp_attr, IBV_QP_STATE | IBV_QP_SQ_PSN);
    if (ret) {
        ucs_error("Failed to modify UD QP to RTS: %m");
        goto err_destroy_qp;
    }

    return UCS_OK;

err_destroy_qp:
    uct_ib_destroy_qp(self->qp);
    return UCS_ERR_INVALID_PARAM;
}

ucs_status_t uct_ud_iface_complete_init(uct_ud_iface_t *iface)
{
    ucs_async_context_t *async = iface->super.super.worker->async;
    ucs_async_mode_t async_mode = async->mode;
    ucs_status_t status;

    iface->tx.resend_skbs_quota = iface->tx.available;

    status = ucs_twheel_init(&iface->async.slow_timer,
                             iface->async.slow_tick / 4,
                             uct_ud_iface_get_async_time(iface));
    if (status != UCS_OK) {
        goto err;
    }

    status = ucs_async_add_timer(async_mode, iface->async.slow_tick,
                                 uct_ud_iface_timer, iface, async,
                                 &iface->async.timer_id);
    if (status != UCS_OK) {
        goto err_twheel_cleanup;
    }

    return UCS_OK;

err_twheel_cleanup:
    ucs_twheel_cleanup(&iface->async.slow_timer);
err:
    return status;
}

void uct_ud_iface_remove_async_handlers(uct_ud_iface_t *iface)
{
    uct_base_iface_progress_disable(&iface->super.super.super,
                                    UCT_PROGRESS_SEND | UCT_PROGRESS_RECV);
    ucs_async_remove_handler(iface->async.timer_id, 1);
}
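/*
 * Illustrative note (not upstream code): the async timer installed by
 * uct_ud_iface_complete_init() fires every slow_tick and sweeps a timer wheel
 * whose resolution is slow_tick / 4. With the default SLOW_TIMER_TICK of
 * 100ms this gives, roughly:
 *
 *     slow_tick         = ucs_time_from_sec(0.100);  // 100ms async timer
 *     twheel resolution = slow_tick / 4;             // ~25ms per wheel slot
 *
 * so per-endpoint resend timers scheduled on the wheel are observed with
 * about 25ms granularity and dispatched from uct_ud_iface_timer().
 */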
/* Calculate real GIDs len. Can be either 16 (RoCEv1 or RoCEv2/IPv6)
 * or 4 (RoCEv2/IPv4). This len is used for packets filtering by DGIDs.
 *
 * According to Annex17_RoCEv2 (A17.4.5.2):
 * "The first 40 bytes of user posted UD Receive Buffers are reserved for the L3
 * header of the incoming packet (as per the InfiniBand Spec Section 11.4.1.2).
 * In RoCEv2, this area is filled up with the IP header. IPv6 header uses the
 * entire 40 bytes. IPv4 headers use the 20 bytes in the second half of the
 * reserved 40 bytes area (i.e. offset 20 from the beginning of the receive
 * buffer). In this case, the content of the first 20 bytes is undefined." */
static void uct_ud_iface_calc_gid_len(uct_ud_iface_t *iface)
{
    uint16_t *local_gid_u16 = (uint16_t*)iface->super.gid.raw;

    /* Make sure that daddr in IPv4 resides in the last 4 bytes in GRH */
    UCS_STATIC_ASSERT((UCT_IB_GRH_LEN - (20 + offsetof(struct iphdr, daddr))) ==
                      UCT_UD_IPV4_ADDR_LEN);

    /* Make sure that dgid resides in the last 16 bytes in GRH */
    UCS_STATIC_ASSERT((UCT_IB_GRH_LEN - offsetof(struct ibv_grh, dgid)) ==
                      UCT_UD_IPV6_ADDR_LEN);

    /* IPv4 mapped to IPv6 looks like: 0000:0000:0000:0000:0000:ffff:????:????,
     * so check for leading zeroes and verify that 11-12 bytes are 0xff.
     * Otherwise either RoCEv1 or RoCEv2/IPv6 are used. */
    if (local_gid_u16[0] == 0x0000) {
        ucs_assert_always(local_gid_u16[5] == 0xffff);
        iface->config.gid_len = UCT_UD_IPV4_ADDR_LEN;
    } else {
        iface->config.gid_len = UCT_UD_IPV6_ADDR_LEN;
    }
}
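/*
 * Illustrative sketch (not upstream code) of how gid_len is meant to be used
 * by the receive path when config.check_grh_dgid is set: only the last
 * gid_len bytes of the 40-byte GRH area are compared against the local GID,
 * which covers both RoCEv2/IPv4 (4 bytes) and RoCEv1/RoCEv2-IPv6 (16 bytes):
 *
 *     static int ud_grh_dgid_matches(uct_ud_iface_t *iface, const void *grh_end)
 *     {
 *         size_t len = iface->config.gid_len;
 *         return memcmp((const char*)grh_end - len,
 *                       (const char*)iface->super.gid.raw + 16 - len, len) == 0;
 *     }
 *
 * where grh_end points just past the GRH (i.e. packet buffer + UCT_IB_GRH_LEN).
 */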
UCS_CLASS_INIT_FUNC(uct_ud_iface_t, uct_ud_iface_ops_t *ops, uct_md_h md,
                    uct_worker_h worker, const uct_iface_params_t *params,
                    const uct_ud_iface_config_t *config,
                    uct_ib_iface_init_attr_t *init_attr)
{
    ucs_status_t status;
    size_t data_size;
    int mtu;

    UCT_CHECK_PARAM(params->field_mask & UCT_IFACE_PARAM_FIELD_OPEN_MODE,
                    "UCT_IFACE_PARAM_FIELD_OPEN_MODE is not defined");
    if (!(params->open_mode & UCT_IFACE_OPEN_MODE_DEVICE)) {
        ucs_error("only UCT_IFACE_OPEN_MODE_DEVICE is supported");
        return UCS_ERR_UNSUPPORTED;
    }

    ucs_trace_func("%s: iface=%p ops=%p worker=%p rx_headroom=%zu",
                   params->mode.device.dev_name, self, ops, worker,
                   (params->field_mask & UCT_IFACE_PARAM_FIELD_RX_HEADROOM) ?
                   params->rx_headroom : 0);

    if (config->super.tx.queue_len <= UCT_UD_TX_MODERATION) {
        ucs_error("%s ud iface tx queue is too short (%d <= %d)",
                  params->mode.device.dev_name,
                  config->super.tx.queue_len, UCT_UD_TX_MODERATION);
        return UCS_ERR_INVALID_PARAM;
    }

    status = uct_ib_device_mtu(params->mode.device.dev_name, md, &mtu);
    if (status != UCS_OK) {
        return status;
    }

    init_attr->rx_priv_len = sizeof(uct_ud_recv_skb_t) -
                             sizeof(uct_ib_iface_recv_desc_t);
    init_attr->rx_hdr_len  = UCT_IB_GRH_LEN + sizeof(uct_ud_neth_t);
    init_attr->tx_cq_len   = config->super.tx.queue_len;
    init_attr->rx_cq_len   = config->super.rx.queue_len;
    init_attr->seg_size    = ucs_min(mtu, config->super.seg_size);
    init_attr->qp_type     = IBV_QPT_UD;

    UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t, &ops->super, md, worker,
                              params, &config->super, init_attr);

    if (self->super.super.worker->async == NULL) {
        ucs_error("%s ud iface must have valid async context",
                  params->mode.device.dev_name);
        return UCS_ERR_INVALID_PARAM;
    }

    self->tx.unsignaled         = 0;
    self->tx.available          = config->super.tx.queue_len;

    self->rx.available          = config->super.rx.queue_len;
    self->rx.quota              = 0;
    self->config.tx_qp_len      = config->super.tx.queue_len;
    self->config.peer_timeout   = ucs_time_from_sec(config->peer_timeout);
    self->config.check_grh_dgid = config->dgid_check &&
                                  uct_ib_iface_is_roce(&self->super);

    if ((config->max_window < UCT_UD_CA_MIN_WINDOW) ||
        (config->max_window > UCT_UD_CA_MAX_WINDOW)) {
        ucs_error("Max congestion avoidance window should be >= %d and <= %d (%d)",
                  UCT_UD_CA_MIN_WINDOW, UCT_UD_CA_MAX_WINDOW, config->max_window);
        return UCS_ERR_INVALID_PARAM;
    }

    self->config.max_window = config->max_window;

    if (config->slow_timer_tick <= 0.) {
        ucs_error("The slow timer tick should be > 0 (%lf)",
                  config->slow_timer_tick);
        return UCS_ERR_INVALID_PARAM;
    } else {
        self->async.slow_tick = ucs_time_from_sec(config->slow_timer_tick);
    }

    if (config->slow_timer_backoff < UCT_UD_MIN_TIMER_TIMER_BACKOFF) {
        ucs_error("The slow timer back off must be >= %lf (%lf)",
                  UCT_UD_MIN_TIMER_TIMER_BACKOFF, config->slow_timer_backoff);
        return UCS_ERR_INVALID_PARAM;
    } else {
        self->config.slow_timer_backoff = config->slow_timer_backoff;
    }

    /* Redefine receive desc release callback */
    self->super.release_desc.cb = uct_ud_iface_release_desc;

    UCT_UD_IFACE_HOOK_INIT(self);

    if (uct_ud_iface_create_qp(self, config) != UCS_OK) {
        return UCS_ERR_INVALID_PARAM;
    }

    ucs_ptr_array_init(&self->eps, 0, "ud_eps");
    uct_ud_iface_cep_init(self);

    status = uct_ib_iface_recv_mpool_init(&self->super, &config->super,
                                          "ud_recv_skb", &self->rx.mp);
    if (status != UCS_OK) {
        goto err_qp;
    }

    self->rx.available = ucs_min(config->ud_common.rx_queue_len_init,
                                 config->super.rx.queue_len);
    self->rx.quota     = config->super.rx.queue_len - self->rx.available;
    ucs_mpool_grow(&self->rx.mp, self->rx.available);

    data_size = sizeof(uct_ud_ctl_hdr_t) + self->super.addr_size;
    data_size = ucs_max(data_size, self->super.config.seg_size);
    data_size = ucs_max(data_size,
                        sizeof(uct_ud_zcopy_desc_t) + self->config.max_inline);

    status = uct_iface_mpool_init(&self->super.super, &self->tx.mp,
                                  sizeof(uct_ud_send_skb_t) + data_size,
                                  sizeof(uct_ud_send_skb_t),
                                  UCT_UD_SKB_ALIGN,
                                  &config->super.tx.mp, self->config.tx_qp_len,
                                  uct_ud_iface_send_skb_init, "ud_tx_skb");
    if (status != UCS_OK) {
        goto err_rx_mpool;
    }

    ucs_assert_always(data_size >= UCT_UD_MIN_INLINE);

    self->tx.skb               = NULL;
    self->tx.skb_inl.super.len = sizeof(uct_ud_neth_t);

    ucs_queue_head_init(&self->tx.resend_skbs);
    self->tx.resend_skbs_quota = 0;

    ucs_arbiter_init(&self->tx.pending_q);

    ucs_queue_head_init(&self->tx.async_comp_q);

    ucs_queue_head_init(&self->rx.pending_q);

    self->tx.async_before_pending = 0;

    uct_ud_iface_calc_gid_len(self);

    status = UCS_STATS_NODE_ALLOC(&self->stats, &uct_ud_iface_stats_class,
                                  self->super.super.stats);
    if (status != UCS_OK) {
        goto err_tx_mpool;
    }

    return UCS_OK;

err_tx_mpool:
    ucs_mpool_cleanup(&self->tx.mp, 1);
err_rx_mpool:
    ucs_mpool_cleanup(&self->rx.mp, 1);
err_qp:
    uct_ib_destroy_qp(self->qp);
    ucs_ptr_array_cleanup(&self->eps);
    return status;
}

static UCS_CLASS_CLEANUP_FUNC(uct_ud_iface_t)
{
    ucs_trace_func("");

    /* TODO: proper flush and connection termination */
    uct_ud_enter(self);
    ucs_debug("iface(%p): cep cleanup", self);
    uct_ud_iface_cep_cleanup(self);
    uct_ud_iface_free_resend_skbs(self);
    uct_ud_iface_free_async_comps(self);
    ucs_mpool_cleanup(&self->tx.mp, 0);
    /* TODO: qp to error state and cleanup all wqes */
    uct_ud_iface_free_pending_rx(self);
    ucs_mpool_cleanup(&self->rx.mp, 0);
    uct_ib_destroy_qp(self->qp);
    ucs_debug("iface(%p): ptr_array cleanup", self);
    ucs_ptr_array_cleanup(&self->eps);
    ucs_arbiter_cleanup(&self->tx.pending_q);
    UCS_STATS_NODE_FREE(self->stats);
    uct_ud_leave(self);
}

UCS_CLASS_DEFINE(uct_ud_iface_t, uct_ib_iface_t);
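/*
 * Illustrative note (not upstream code): receives are posted lazily. At init
 * time only rx_queue_len_init buffers are made available and the rest are
 * parked in rx.quota; uct_ud_iface_progress_enable(..., UCT_PROGRESS_RECV)
 * later moves the quota back to rx.available so progress can post them.
 * For example, with RX queue_len = 4096 and rx_queue_len_init = 64 (both are
 * assumed values, the real ones come from the configuration):
 *
 *     rx.available = 64;          // posted up-front
 *     rx.quota     = 4096 - 64;   // released when RECV progress is enabled
 */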
ucs_config_field_t uct_ud_iface_config_table[] = {
    {"IB_", "", NULL,
     ucs_offsetof(uct_ud_iface_config_t, super),
     UCS_CONFIG_TYPE_TABLE(uct_ib_iface_config_table)},

    {"UD_", "", NULL,
     ucs_offsetof(uct_ud_iface_config_t, ud_common),
     UCS_CONFIG_TYPE_TABLE(uct_ud_iface_common_config_table)},

    {"TIMEOUT", "5.0m", "Transport timeout",
     ucs_offsetof(uct_ud_iface_config_t, peer_timeout), UCS_CONFIG_TYPE_TIME},

    {"SLOW_TIMER_TICK", "100ms", "Initial timeout for retransmissions",
     ucs_offsetof(uct_ud_iface_config_t, slow_timer_tick), UCS_CONFIG_TYPE_TIME},

    {"SLOW_TIMER_BACKOFF", "2.0",
     "Timeout multiplier for resending trigger (must be >= "
     UCS_PP_MAKE_STRING(UCT_UD_MIN_TIMER_TIMER_BACKOFF) ")",
     ucs_offsetof(uct_ud_iface_config_t, slow_timer_backoff),
     UCS_CONFIG_TYPE_DOUBLE},

    {"ETH_DGID_CHECK", "y",
     "Enable checking destination GID for incoming packets of Ethernet network.\n"
     "Mismatched packets are silently dropped.",
     ucs_offsetof(uct_ud_iface_config_t, dgid_check), UCS_CONFIG_TYPE_BOOL},

    {"MAX_WINDOW", UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW),
     "Max congestion avoidance window. Should be >= "
     UCS_PP_MAKE_STRING(UCT_UD_CA_MIN_WINDOW) " and <= "
     UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW),
     ucs_offsetof(uct_ud_iface_config_t, max_window), UCS_CONFIG_TYPE_UINT},

    {NULL}
};


ucs_status_t uct_ud_iface_query(uct_ud_iface_t *iface,
                                uct_iface_attr_t *iface_attr)
{
    ucs_status_t status;

    status = uct_ib_iface_query(&iface->super,
                                UCT_IB_DETH_LEN + sizeof(uct_ud_neth_t),
                                iface_attr);
    if (status != UCS_OK) {
        return status;
    }

    iface_attr->cap.flags = UCT_IFACE_FLAG_AM_BCOPY         |
                            UCT_IFACE_FLAG_AM_ZCOPY         |
                            UCT_IFACE_FLAG_CONNECT_TO_EP    |
                            UCT_IFACE_FLAG_CONNECT_TO_IFACE |
                            UCT_IFACE_FLAG_PENDING          |
                            UCT_IFACE_FLAG_CB_SYNC          |
                            UCT_IFACE_FLAG_CB_ASYNC         |
                            UCT_IFACE_FLAG_EVENT_SEND_COMP  |
                            UCT_IFACE_FLAG_EVENT_RECV       |
                            UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;

    iface_attr->cap.am.max_short       = uct_ib_iface_hdr_size(iface->config.max_inline,
                                                               sizeof(uct_ud_neth_t));
    iface_attr->cap.am.max_bcopy       = iface->super.config.seg_size - sizeof(uct_ud_neth_t);
    iface_attr->cap.am.min_zcopy       = 0;
    iface_attr->cap.am.max_zcopy       = iface->super.config.seg_size - sizeof(uct_ud_neth_t);
    iface_attr->cap.am.align_mtu       = uct_ib_mtu_value(uct_ib_iface_port_attr(&iface->super)->active_mtu);
    iface_attr->cap.am.opt_zcopy_align = UCS_SYS_PCI_MAX_PAYLOAD;
    /* The first iov is reserved for the header */
    iface_attr->cap.am.max_iov         = uct_ib_iface_get_max_iov(&iface->super) - 1;

    iface_attr->cap.put.max_short      = uct_ib_iface_hdr_size(iface->config.max_inline,
                                                               sizeof(uct_ud_neth_t) +
                                                               sizeof(uct_ud_put_hdr_t));

    iface_attr->iface_addr_len         = sizeof(uct_ud_iface_addr_t);
    iface_attr->ep_addr_len            = sizeof(uct_ud_ep_addr_t);
    iface_attr->max_conn_priv          = 0;

    /* UD lacks scatter-to-CQE support */
    iface_attr->latency.overhead      += 10e-9;

    if (iface_attr->cap.am.max_short) {
        iface_attr->cap.flags |= UCT_IFACE_FLAG_AM_SHORT;
    }

    return UCS_OK;
}

ucs_status_t
uct_ud_iface_get_address(uct_iface_h tl_iface, uct_iface_addr_t *iface_addr)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
    uct_ud_iface_addr_t *addr = (uct_ud_iface_addr_t *)iface_addr;

    uct_ib_pack_uint24(addr->qp_num, iface->qp->qp_num);

    return UCS_OK;
}
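/*
 * Illustrative note (not upstream code): a UD interface address carries only
 * the 24-bit QP number, packed into 3 bytes by uct_ib_pack_uint24(); the LID
 * and GID travel separately in the uct_ib_address_t. For example (the value
 * is made up), a QP number of 0x0ABCDE occupies three bytes of the address
 * instead of a full uint32_t, and is recovered on the remote side with
 * uct_ib_unpack_uint24(), as in uct_ud_iface_cep_lookup_peer().
 */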
ucs_status_t uct_ud_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                uct_completion_t *comp)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
    uct_ud_ep_t *ep;
    ucs_status_t status;
    int i, count;

    ucs_trace_func("");

    if (comp != NULL) {
        return UCS_ERR_UNSUPPORTED;
    }

    uct_ud_enter(iface);

    if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface))) {
        UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
        uct_ud_leave(iface);
        return UCS_INPROGRESS;
    }

    count = 0;
    ucs_ptr_array_for_each(ep, i, &iface->eps) {
        /* ud ep flush returns either ok or in progress */
        status = uct_ud_ep_flush_nolock(iface, ep, NULL);
        if ((status == UCS_INPROGRESS) || (status == UCS_ERR_NO_RESOURCE)) {
            ++count;
        }
    }

    uct_ud_leave(iface);
    if (count != 0) {
        UCT_TL_IFACE_STAT_FLUSH_WAIT(&iface->super.super);
        return UCS_INPROGRESS;
    }

    UCT_TL_IFACE_STAT_FLUSH(&iface->super.super);
    return UCS_OK;
}

void uct_ud_iface_add_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
    uint32_t prev_gen;

    ep->ep_id = ucs_ptr_array_insert(&iface->eps, ep, &prev_gen);
}

void uct_ud_iface_remove_ep(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
    if (ep->ep_id != UCT_UD_EP_NULL_ID) {
        ucs_trace("iface(%p) remove ep: %p id %d", iface, ep, ep->ep_id);
        ucs_ptr_array_remove(&iface->eps, ep->ep_id, 0);
    }
}

void uct_ud_iface_replace_ep(uct_ud_iface_t *iface,
                             uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep)
{
    void *p;

    ucs_assert_always(old_ep != new_ep);
    ucs_assert_always(old_ep->ep_id != new_ep->ep_id);
    p = ucs_ptr_array_replace(&iface->eps, old_ep->ep_id, new_ep);
    ucs_assert_always(p == (void *)old_ep);
    ucs_trace("replace_ep: old(%p) id=%d new(%p) id=%d",
              old_ep, old_ep->ep_id, new_ep, new_ep->ep_id);
    ucs_ptr_array_remove(&iface->eps, new_ep->ep_id, 0);
}

uct_ud_send_skb_t *uct_ud_iface_resend_skb_get(uct_ud_iface_t *iface)
{
    ucs_queue_elem_t *elem;
    uct_ud_send_skb_t *skb;

    /* grow reserved skb's queue on-demand */
    if (iface->tx.resend_skbs_quota > 0) {
        skb = ucs_mpool_get(&iface->tx.mp);
        if (skb == NULL) {
            ucs_fatal("failed to allocate control skb");
        }
        --iface->tx.resend_skbs_quota;
        return skb;
    } else {
        elem = ucs_queue_pull(&iface->tx.resend_skbs);
        ucs_assert(elem != NULL);
        return ucs_container_of(elem, uct_ud_send_skb_t, queue);
    }
}

static void uct_ud_iface_free_resend_skbs(uct_ud_iface_t *iface)
{
    uct_ud_send_skb_t *skb;

    iface->tx.resend_skbs_quota = 0;
    ucs_queue_for_each_extract(skb, &iface->tx.resend_skbs, queue, 1) {
        ucs_mpool_put(skb);
    }
}

static void uct_ud_ep_dispatch_err_comp(uct_ud_ep_t *ep, uct_ud_send_skb_t *skb)
{
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    ucs_status_t status;

    ucs_assert(ep->tx.err_skb_count > 0);
    --ep->tx.err_skb_count;

    if ((ep->tx.err_skb_count > 0) || (ep->flags & UCT_UD_EP_FLAG_DISCONNECTED)) {
        return;
    }

    if (ep->flags & UCT_UD_EP_FLAG_PRIVATE) {
        uct_ep_destroy(&ep->super.super);
        return;
    }

    status = iface->super.ops->set_ep_failed(&iface->super, &ep->super.super,
                                             (ucs_status_t)skb->status);
    if (status != UCS_OK) {
        ucs_fatal("transport error: %s", ucs_status_string(status));
    }
}

void uct_ud_iface_dispatch_async_comps_do(uct_ud_iface_t *iface)
{
    uct_ud_comp_desc_t *cdesc;
    uct_ud_send_skb_t *skb;
    uct_ud_ep_t *ep;

    do {
        skb = ucs_queue_pull_elem_non_empty(&iface->tx.async_comp_q,
                                            uct_ud_send_skb_t, queue);
        cdesc = uct_ud_comp_desc(skb);
        ep    = cdesc->ep;

        if (skb->flags & UCT_UD_SEND_SKB_FLAG_COMP) {
            ucs_assert(!(ep->flags & UCT_UD_EP_FLAG_DISCONNECTED));
            uct_invoke_completion(cdesc->comp, (ucs_status_t)skb->status);
        }

        if (ucs_unlikely(skb->flags & UCT_UD_SEND_SKB_FLAG_ERR)) {
            uct_ud_ep_dispatch_err_comp(ep, skb);
        }

        ep->flags &= ~UCT_UD_EP_FLAG_ASYNC_COMPS;
        skb->flags = 0;
        ucs_mpool_put(skb);
    } while (!ucs_queue_is_empty(&iface->tx.async_comp_q));
}

static void uct_ud_iface_free_async_comps(uct_ud_iface_t *iface)
{
    uct_ud_send_skb_t *skb;

    while (!ucs_queue_is_empty(&iface->tx.async_comp_q)) {
        skb = ucs_queue_pull_elem_non_empty(&iface->tx.async_comp_q,
                                            uct_ud_send_skb_t, queue);
        ucs_mpool_put(skb);
    }
}
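/*
 * Illustrative note (not upstream code): control/resend skbs are drawn from
 * the regular tx mpool until resend_skbs_quota (initialized from tx.available
 * in uct_ud_iface_complete_init()) is exhausted; after that the interface
 * recycles skbs parked on tx.resend_skbs. A caller is expected to return a
 * resend skb once the resend completes, roughly:
 *
 *     skb = uct_ud_iface_resend_skb_get(iface);
 *     ... build and post the resend ...
 *     uct_ud_iface_resend_skb_put(iface, skb);  // assumed helper that
 *                                               // pushes onto tx.resend_skbs
 */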
ucs_status_t uct_ud_iface_dispatch_pending_rx_do(uct_ud_iface_t *iface)
{
    int count;
    uct_ud_recv_skb_t *skb;
    uct_ud_neth_t *neth;
    unsigned max_poll = iface->super.config.rx_max_poll;

    count = 0;
    do {
        skb  = ucs_queue_pull_elem_non_empty(&iface->rx.pending_q,
                                             uct_ud_recv_skb_t, u.am.queue);
        neth = (uct_ud_neth_t *)((char *)uct_ib_iface_recv_desc_hdr(&iface->super,
                                                                    (uct_ib_iface_recv_desc_t *)skb) +
                                 UCT_IB_GRH_LEN);
        uct_ib_iface_invoke_am_desc(&iface->super,
                                    uct_ud_neth_get_am_id(neth),
                                    neth + 1,
                                    skb->u.am.len,
                                    &skb->super);
        count++;
        if (count >= max_poll) {
            return UCS_ERR_NO_RESOURCE;
        }
    } while (!ucs_queue_is_empty(&iface->rx.pending_q));

    return UCS_OK;
}

static void uct_ud_iface_free_pending_rx(uct_ud_iface_t *iface)
{
    uct_ud_recv_skb_t *skb;

    while (!ucs_queue_is_empty(&iface->rx.pending_q)) {
        skb = ucs_queue_pull_elem_non_empty(&iface->rx.pending_q,
                                            uct_ud_recv_skb_t, u.am.queue);
        ucs_mpool_put(skb);
    }
}

static inline void uct_ud_iface_async_progress(uct_ud_iface_t *iface)
{
    unsigned ev_count;
    uct_ud_iface_ops_t *ops;

    ops      = ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t);
    ev_count = ops->async_progress(iface);
    if (ev_count > 0) {
        uct_ud_iface_raise_pending_async_ev(iface);
    }
}

static void uct_ud_iface_timer(int timer_id, void *arg)
{
    uct_ud_iface_t *iface = arg;
    ucs_time_t now;

    uct_ud_enter(iface);
    now = uct_ud_iface_get_async_time(iface);
    ucs_trace_async("iface(%p) slow_timer_sweep: now %lu", iface, now);
    ucs_twheel_sweep(&iface->async.slow_timer, now);
    uct_ud_iface_async_progress(iface);
    uct_ud_leave(iface);
}

void uct_ud_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
    uct_ud_iface_t *iface = ucs_container_of(self, uct_ud_iface_t,
                                             super.release_desc);

    uct_ud_enter(iface);
    uct_ib_iface_release_desc(self, desc);
    uct_ud_leave(iface);
}

void uct_ud_iface_handle_failure(uct_ib_iface_t *iface, void *arg,
                                 ucs_status_t status)
{
    uct_ud_tx_wnd_purge_outstanding(ucs_derived_of(iface, uct_ud_iface_t),
                                    (uct_ud_ep_t *)arg, status);
}

ucs_status_t uct_ud_iface_event_arm(uct_iface_h tl_iface, unsigned events)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);
    ucs_status_t status;

    uct_ud_enter(iface);

    status = uct_ib_iface_pre_arm(&iface->super);
    if (status != UCS_OK) {
        goto out;
    }

    /* Check if some receives were not delivered yet */
    if ((events & (UCT_EVENT_RECV | UCT_EVENT_RECV_SIG)) &&
        !ucs_queue_is_empty(&iface->rx.pending_q)) {
        status = UCS_ERR_BUSY;
        goto out;
    }

    /* Check if some send completions were not delivered yet */
    if ((events & UCT_EVENT_SEND_COMP) &&
        !ucs_queue_is_empty(&iface->tx.async_comp_q)) {
        status = UCS_ERR_BUSY;
        goto out;
    }

    if (events & UCT_EVENT_SEND_COMP) {
        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_TX, 0);
        if (status != UCS_OK) {
            goto out;
        }
    }

    if (events & (UCT_EVENT_SEND_COMP | UCT_EVENT_RECV)) {
        /* we may get send completion through ACKs as well */
        status = iface->super.ops->arm_cq(&iface->super, UCT_IB_DIR_RX, 0);
        if (status != UCS_OK) {
            goto out;
        }
    }

    status = UCS_OK;
out:
    uct_ud_leave(iface);
    return status;
}

void uct_ud_iface_progress_enable(uct_iface_h tl_iface, unsigned flags)
{
    uct_ud_iface_t *iface = ucs_derived_of(tl_iface, uct_ud_iface_t);

    if (flags & UCT_PROGRESS_RECV) {
        uct_ud_enter(iface);
        iface->rx.available += iface->rx.quota;
        iface->rx.quota      = 0;
        /* let progress (possibly async) post the missing receives */
        uct_ud_leave(iface);
    }

    uct_base_iface_progress_enable(tl_iface, flags);
}
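/*
 * Illustrative note (not upstream code): uct_ud_iface_event_arm() refuses to
 * arm the CQs (returns UCS_ERR_BUSY) while receives or send completions are
 * still queued for dispatch, so a wakeup-driven caller is expected to keep
 * progressing until arming succeeds before it blocks, roughly:
 *
 *     while (uct_iface_event_arm(iface, events) == UCS_ERR_BUSY) {
 *         uct_worker_progress(worker);
 *     }
 *     // safe to wait on the event fd now
 */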