/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#include "ud_ep.h"
#include "ud_iface.h"
#include "ud_inl.h"
#include "ud_def.h"

#include <uct/ib/base/ib_verbs.h>
#include <ucs/datastruct/queue.h>
#include <ucs/debug/log.h>
#include <ucs/debug/memtrack.h>
#include <ucs/type/class.h>

/* Must be less than peer_timeout to avoid false positive errors, taking
 * timer resolution into account, yet not too small to avoid performance
 * degradation */
#define UCT_UD_SLOW_TIMER_MAX_TICK(_iface)  ((_iface)->config.peer_timeout / 3)

static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface);

static void uct_ud_peer_name(uct_ud_peer_name_t *peer)
{
    gethostname(peer->name, sizeof(peer->name));
    peer->pid = getpid();
}

static void uct_ud_ep_set_state(uct_ud_ep_t *ep, uint32_t state)
{
    ep->flags |= state;
}

#if ENABLE_DEBUG_DATA
static void uct_ud_peer_copy(uct_ud_peer_name_t *dst, uct_ud_peer_name_t *src)
{
    memcpy(dst, src, sizeof(*src));
}
#else
#define uct_ud_peer_copy(dst, src)
#endif

static void uct_ud_ep_resend_start(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
    ep->resend.max_psn = ep->tx.psn - 1;
    ep->resend.psn     = ep->tx.acked_psn + 1;
    ep->resend.pos     = ucs_queue_iter_begin(&ep->tx.window);
    uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_RESEND);
}

static void uct_ud_ep_resend_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
    if (UCT_UD_PSN_COMPARE(ep->tx.acked_psn, <, ep->resend.max_psn)) {
        /* new ack arrived that acked something in our resend window. */
        if (UCT_UD_PSN_COMPARE(ep->resend.psn, <=, ep->tx.acked_psn)) {
            ucs_debug("ep(%p): ack received during resend resend.psn=%d tx.acked_psn=%d",
                      ep, ep->resend.psn, ep->tx.acked_psn);
            ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window);
            ep->resend.psn = ep->tx.acked_psn + 1;
        }
        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_RESEND);
    } else {
        /* everything in the resend window was acked - no need to resend anymore */
        ep->resend.psn = ep->resend.max_psn + 1;
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
    }
}

static void uct_ud_ep_ca_drop(uct_ud_ep_t *ep)
{
    ucs_debug("ep: %p ca drop@cwnd = %d in flight: %d",
              ep, ep->ca.cwnd, (int)ep->tx.psn - (int)ep->tx.acked_psn - 1);
    ep->ca.cwnd /= UCT_UD_CA_MD_FACTOR;
    if (ep->ca.cwnd < UCT_UD_CA_MIN_WINDOW) {
        ep->ca.cwnd = UCT_UD_CA_MIN_WINDOW;
    }
    ep->tx.max_psn = ep->tx.acked_psn + ep->ca.cwnd;
    if (UCT_UD_PSN_COMPARE(ep->tx.max_psn, >, ep->tx.psn)) {
        /* do not send more until we get acks going */
        uct_ud_ep_tx_stop(ep);
    }
}

static UCS_F_ALWAYS_INLINE void uct_ud_ep_ca_ack(uct_ud_ep_t *ep)
{
    if (ep->ca.cwnd < ep->ca.wmax) {
        ep->ca.cwnd += UCT_UD_CA_AI_VALUE;
    }
    ep->tx.max_psn = ep->tx.acked_psn + ep->ca.cwnd;
}
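/*
 * Congestion control sketch (illustrative): the two functions above implement
 * a standard AIMD scheme. Every processed ack grows the congestion window
 * additively by UCT_UD_CA_AI_VALUE up to ca.wmax, and every timeout-driven
 * drop divides it by UCT_UD_CA_MD_FACTOR, bounded below by
 * UCT_UD_CA_MIN_WINDOW. For example, assuming UCT_UD_CA_MD_FACTOR == 2 and
 * UCT_UD_CA_AI_VALUE == 1, a window of 8 shrinks to 4 on a drop and then
 * needs 4 acked packets to recover to 8. The send limit is always derived as
 *
 *     ep->tx.max_psn = ep->tx.acked_psn + ep->ca.cwnd;
 *
 * so a sender may transmit while UCT_UD_PSN_COMPARE(tx.psn, <, tx.max_psn).
 */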
static void uct_ud_ep_reset(uct_ud_ep_t *ep)
{
    ep->tx.psn         = UCT_UD_INITIAL_PSN;
    ep->ca.cwnd        = UCT_UD_CA_MIN_WINDOW;
    ep->ca.wmax        = ucs_derived_of(ep->super.super.iface,
                                        uct_ud_iface_t)->config.max_window;
    ep->tx.max_psn     = ep->tx.psn + ep->ca.cwnd;
    ep->tx.acked_psn   = UCT_UD_INITIAL_PSN - 1;
    ep->tx.pending.ops = UCT_UD_EP_OP_NONE;
    ucs_queue_head_init(&ep->tx.window);

    ep->resend.pos     = ucs_queue_iter_begin(&ep->tx.window);
    ep->resend.psn     = ep->tx.psn;
    ep->resend.max_psn = ep->tx.acked_psn;
    ep->rx_creq_count  = 0;

    ep->rx.acked_psn = UCT_UD_INITIAL_PSN - 1;
    ucs_frag_list_init(ep->tx.psn - 1, &ep->rx.ooo_pkts, 0 /*TODO: ooo support */
                       UCS_STATS_ARG(ep->super.stats));
}

static ucs_status_t uct_ud_ep_free_by_timeout(uct_ud_ep_t *ep,
                                              uct_ud_iface_t *iface)
{
    uct_ud_iface_ops_t *ops;
    ucs_time_t diff;

    diff = ucs_twheel_get_time(&iface->async.slow_timer) - ep->close_time;
    if (diff > iface->config.peer_timeout) {
        ucs_debug("ud_ep %p is destroyed after %fs with timeout %fs",
                  ep, ucs_time_to_sec(diff),
                  ucs_time_to_sec(iface->config.peer_timeout));
        ops = ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t);
        ops->ep_free(&ep->super.super);
        return UCS_OK;
    }
    return UCS_INPROGRESS;
}

static void uct_ud_ep_slow_timer(ucs_wtimer_t *self)
{
    uct_ud_ep_t *ep       = ucs_container_of(self, uct_ud_ep_t, slow_timer);
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    ucs_time_t now;
    ucs_time_t diff;
    ucs_status_t status;

    UCT_UD_EP_HOOK_CALL_TIMER(ep);

    if (ucs_queue_is_empty(&ep->tx.window)) {
        /* Do not free the EP until all scheduled communications are done. */
        if (ep->flags & UCT_UD_EP_FLAG_DISCONNECTED) {
            status = uct_ud_ep_free_by_timeout(ep, iface);
            if (status == UCS_INPROGRESS) {
                goto again;
            }
        }
        return;
    }

    now  = ucs_twheel_get_time(&iface->async.slow_timer);
    diff = now - ep->tx.send_time;
    if (diff > iface->config.peer_timeout) {
        ucs_debug("ep %p: timeout of %.2f sec, config::peer_timeout - %.2f sec",
                  ep, ucs_time_to_sec(diff),
                  ucs_time_to_sec(iface->config.peer_timeout));
        iface->super.ops->handle_failure(&iface->super, ep,
                                         UCS_ERR_ENDPOINT_TIMEOUT);
        return;
    } else if (diff > 3 * iface->async.slow_tick) {
        ucs_trace("scheduling resend now: %lu send_time: %lu diff: %lu tick: %lu",
                  now, ep->tx.send_time, now - ep->tx.send_time,
                  ep->tx.slow_tick);
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);
        uct_ud_ep_ca_drop(ep);
        uct_ud_ep_resend_start(iface, ep);
    } else if ((diff > iface->async.slow_tick) && uct_ud_ep_is_connected(ep)) {
        /* It is possible that the sender is slow. Try to flush the window
         * twice before going into full resend mode. */
        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK_REQ);
    }

again:
    /* Cool down the timer on rescheduling/resending */
    ep->tx.slow_tick *= iface->config.slow_timer_backoff;
    ep->tx.slow_tick  = ucs_min(ep->tx.slow_tick,
                                UCT_UD_SLOW_TIMER_MAX_TICK(iface));
    ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer, ep->tx.slow_tick);
}
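/*
 * Timer policy sketch (illustrative): each endpoint re-arms its slow timer
 * with an exponentially backed-off tick, capped at peer_timeout / 3 by
 * UCT_UD_SLOW_TIMER_MAX_TICK. Assuming a 100ms base tick, a backoff factor
 * of 2.0 and a 30s peer_timeout, the tick sequence is 100ms, 200ms, 400ms,
 * ... up to the 10s cap. A few missed acks therefore escalate from ACK_REQ
 * probes (diff > 1 tick) to a full resend (diff > 3 ticks), and only a full
 * peer_timeout of silence declares the endpoint failed.
 */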
UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface)
{
    ucs_trace_func("");

    memset(self, 0, sizeof(*self));
    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);

    self->dest_ep_id = UCT_UD_EP_NULL_ID;
    uct_ud_ep_reset(self);
    ucs_list_head_init(&self->cep_list);
    uct_ud_iface_add_ep(iface, self);
    self->tx.slow_tick = iface->async.slow_tick;
    ucs_wtimer_init(&self->slow_timer, uct_ud_ep_slow_timer);
    ucs_arbiter_group_init(&self->tx.pending.group);
    ucs_arbiter_elem_init(&self->tx.pending.elem);

    UCT_UD_EP_HOOK_INIT(self);
    ucs_debug("created ep ep=%p iface=%p id=%d", self, iface, self->ep_id);
    return UCS_OK;
}

static ucs_arbiter_cb_result_t
uct_ud_ep_pending_cancel_cb(ucs_arbiter_t *arbiter, ucs_arbiter_elem_t *elem,
                            void *arg)
{
    uct_ud_ep_t *ep = ucs_container_of(ucs_arbiter_elem_group(elem),
                                       uct_ud_ep_t, tx.pending.group);
    uct_pending_req_t *req;

    /* we may have a pending op on the ep */
    if (&ep->tx.pending.elem == elem) {
        /* return value is ignored by the arbiter */
        return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
    }

    /* the uct user should not have anything pending */
    req = ucs_container_of(elem, uct_pending_req_t, priv);
    ucs_warn("ep=%p removing user pending req=%p", ep, req);

    /* return value is ignored by the arbiter */
    return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}

static UCS_CLASS_CLEANUP_FUNC(uct_ud_ep_t)
{
    uct_ud_iface_t *iface = ucs_derived_of(self->super.super.iface,
                                           uct_ud_iface_t);

    ucs_trace_func("ep=%p id=%d conn_id=%d", self, self->ep_id, self->conn_id);

    ucs_wtimer_remove(&self->slow_timer);
    uct_ud_iface_remove_ep(iface, self);
    uct_ud_iface_cep_remove(self);
    ucs_frag_list_cleanup(&self->rx.ooo_pkts);

    ucs_arbiter_group_purge(&iface->tx.pending_q, &self->tx.pending.group,
                            uct_ud_ep_pending_cancel_cb, 0);

    if (!ucs_queue_is_empty(&self->tx.window)) {
        ucs_debug("ep=%p id=%d conn_id=%d has %d unacked packets",
                  self, self->ep_id, self->conn_id,
                  (int)ucs_queue_length(&self->tx.window));
    }
    ucs_arbiter_group_cleanup(&self->tx.pending.group);
}

UCS_CLASS_DEFINE(uct_ud_ep_t, uct_base_ep_t);

void uct_ud_ep_clone(uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep)
{
    uct_ep_t *ep_h       = &old_ep->super.super;
    uct_iface_t *iface_h = ep_h->iface;

    uct_ud_iface_replace_ep(ucs_derived_of(iface_h, uct_ud_iface_t),
                            old_ep, new_ep);
    memcpy(new_ep, old_ep, sizeof(uct_ud_ep_t));
}

ucs_status_t uct_ud_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
    uct_ud_ep_t *ep           = ucs_derived_of(tl_ep, uct_ud_ep_t);
    uct_ud_iface_t *iface     = ucs_derived_of(ep->super.super.iface,
                                               uct_ud_iface_t);
    uct_ud_ep_addr_t *ep_addr = (uct_ud_ep_addr_t *)addr;

    uct_ib_pack_uint24(ep_addr->iface_addr.qp_num, iface->qp->qp_num);
    uct_ib_pack_uint24(ep_addr->ep_id, ep->ep_id);
    return UCS_OK;
}

static ucs_status_t uct_ud_ep_connect_to_iface(uct_ud_ep_t *ep,
                                               const uct_ib_address_t *ib_addr,
                                               const uct_ud_iface_addr_t *if_addr)
{
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
    char buf[128];

    ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
    uct_ud_ep_reset(ep);

    ucs_debug(UCT_IB_IFACE_FMT" lid %d qpn 0x%x epid %u ep %p connected to "
              "IFACE %s qpn 0x%x", UCT_IB_IFACE_ARG(&iface->super),
              dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
              iface->qp->qp_num, ep->ep_id, ep,
              uct_ib_address_str(ib_addr, buf, sizeof(buf)),
              uct_ib_unpack_uint24(if_addr->qp_num));
    return UCS_OK;
}

static ucs_status_t uct_ud_ep_disconnect_from_iface(uct_ep_h tl_ep)
{
    uct_ud_ep_t *ep = ucs_derived_of(tl_ep, uct_ud_ep_t);

    ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
    uct_ud_ep_reset(ep);
    ep->dest_ep_id = UCT_UD_EP_NULL_ID;
    return UCS_OK;
}

ucs_status_t uct_ud_ep_create_connected_common(uct_ud_iface_t *iface,
                                               const uct_ib_address_t *ib_addr,
                                               const uct_ud_iface_addr_t *if_addr,
                                               uct_ud_ep_t **new_ep_p,
                                               uct_ud_send_skb_t **skb_p)
{
    uct_ep_params_t params;
    ucs_status_t status;
    uct_ud_ep_t *ep;
    uct_ep_h new_ep_h;

    ep = uct_ud_iface_cep_lookup(iface, ib_addr, if_addr, UCT_UD_EP_CONN_ID_MAX);
    if (ep) {
        uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_NOTSENT);
        ep->flags &= ~UCT_UD_EP_FLAG_PRIVATE;
        *new_ep_p = ep;
        *skb_p    = NULL;
        return UCS_ERR_ALREADY_EXISTS;
    }

    params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    params.iface      = &iface->super.super.super;
    status = uct_ep_create(&params, &new_ep_h);
    if (status != UCS_OK) {
        return status;
    }
    ep = ucs_derived_of(new_ep_h, uct_ud_ep_t);

    status = uct_ud_ep_connect_to_iface(ep, ib_addr, if_addr);
    if (status != UCS_OK) {
        return status;
    }

    status = uct_ud_iface_cep_insert(iface, ib_addr, if_addr, ep,
                                     UCT_UD_EP_CONN_ID_MAX);
    if (status != UCS_OK) {
        goto err_cep_insert;
    }

    *skb_p = uct_ud_ep_prepare_creq(ep);
    if (!*skb_p) {
        status = UCS_ERR_NO_RESOURCE;
        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREQ);
    }

    *new_ep_p = ep;
    return status;

err_cep_insert:
    uct_ud_ep_disconnect_from_iface(&ep->super.super);
    return status;
}

void uct_ud_ep_destroy_connected(uct_ud_ep_t *ep,
                                 const uct_ib_address_t *ib_addr,
                                 const uct_ud_iface_addr_t *if_addr)
{
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    uct_ud_iface_cep_rollback(iface, ib_addr, if_addr, ep);
    uct_ud_ep_disconnect_from_iface(&ep->super.super);
}
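/*
 * Connection-establishment sketch (illustrative, caller side): an endpoint is
 * created or reused via the connection-endpoint (cep) cache, and a CREQ skb
 * is handed back for the caller to post. The status codes have dedicated
 * meanings here:
 *
 *     status = uct_ud_ep_create_connected_common(iface, ib_addr, if_addr,
 *                                                &ep, &skb);
 *     if (status == UCS_ERR_ALREADY_EXISTS) {
 *         ;   // a matching ep was found in the cep cache, no CREQ skb
 *     } else if (status == UCS_ERR_NO_RESOURCE) {
 *         ;   // ep created; CREQ was scheduled as a pending ctl op instead
 *     } else if (status == UCS_OK) {
 *         ;   // ep created; the caller sends 'skb' carrying the CREQ
 *     }
 */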
ucs_status_t uct_ud_ep_connect_to_ep(uct_ud_ep_t *ep,
                                     const uct_ib_address_t *ib_addr,
                                     const uct_ud_ep_addr_t *ep_addr)
{
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
    char buf[128];

    ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID);
    ucs_trace_func("");

    ep->dest_ep_id = uct_ib_unpack_uint24(ep_addr->ep_id);

    ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
    uct_ud_ep_reset(ep);

    ucs_debug(UCT_IB_IFACE_FMT" slid %d qpn 0x%x epid %u connected to %s qpn 0x%x "
              "epid %u", UCT_IB_IFACE_ARG(&iface->super),
              dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
              iface->qp->qp_num, ep->ep_id,
              uct_ib_address_str(ib_addr, buf, sizeof(buf)),
              uct_ib_unpack_uint24(ep_addr->iface_addr.qp_num), ep->dest_ep_id);
    return UCS_OK;
}

static UCS_F_ALWAYS_INLINE void
uct_ud_iface_add_async_comp(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
                            uct_ud_send_skb_t *skb, ucs_status_t status)
{
    uct_ud_comp_desc_t *cdesc;

    skb->status = status;
    if (status != UCS_OK) {
        if (!(skb->flags & UCT_UD_SEND_SKB_FLAG_COMP)) {
            skb->len = 0;
        }

        if (status == UCS_ERR_ENDPOINT_TIMEOUT) {
            skb->flags |= UCT_UD_SEND_SKB_FLAG_ERR;
            ++ep->tx.err_skb_count;
        } else if (status == UCS_ERR_CANCELED) {
            skb->flags |= UCT_UD_SEND_SKB_FLAG_CANCEL;
        }
    }

    cdesc = uct_ud_comp_desc(skb);
    /* don't call user completion from async context. instead, put
     * it on a queue which will be progressed from the main thread. */
    ucs_queue_push(&iface->tx.async_comp_q, &skb->queue);
    cdesc->ep  = ep;
    ep->flags |= UCT_UD_EP_FLAG_ASYNC_COMPS;
}

static UCS_F_ALWAYS_INLINE void
uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
                      uct_ud_psn_t ack_psn, int is_async)
{
    uct_ud_send_skb_t *skb;

    if (ucs_unlikely(UCT_UD_PSN_COMPARE(ack_psn, <=, ep->tx.acked_psn))) {
        return;
    }

    ep->tx.acked_psn = ack_psn;

    /* Release acknowledged skb's */
    ucs_queue_for_each_extract(skb, &ep->tx.window, queue,
                               UCT_UD_PSN_COMPARE(skb->neth->psn, <=, ack_psn)) {
        if (ucs_unlikely(skb->flags & UCT_UD_SEND_SKB_FLAG_COMP)) {
            if (ucs_unlikely(is_async)) {
                uct_ud_iface_add_async_comp(iface, ep, skb, UCS_OK);
                continue;
            }
            uct_invoke_completion(uct_ud_comp_desc(skb)->comp, UCS_OK);
        }
        skb->flags = 0; /* also resets the ACK_REQ flag */
        ucs_mpool_put(skb);
    }

    uct_ud_ep_ca_ack(ep);

    if (ucs_unlikely(UCT_UD_PSN_COMPARE(ep->resend.psn, <=, ep->resend.max_psn))) {
        uct_ud_ep_resend_ack(iface, ep);
    }

    ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);

    ep->tx.slow_tick = iface->async.slow_tick;
    ep->tx.send_time = uct_ud_iface_get_async_time(iface);
}

static inline void uct_ud_ep_rx_put(uct_ud_neth_t *neth, unsigned byte_len)
{
    uct_ud_put_hdr_t *put_hdr;

    put_hdr = (uct_ud_put_hdr_t *)(neth + 1);

    memcpy((void *)put_hdr->rva, put_hdr + 1,
           byte_len - sizeof(*neth) - sizeof(*put_hdr));
}

static uct_ud_ep_t *uct_ud_ep_create_passive(uct_ud_iface_t *iface,
                                             uct_ud_ctl_hdr_t *ctl)
{
    uct_ep_params_t params;
    uct_ud_ep_t *ep;
    ucs_status_t status;
    uct_ep_t *ep_h;

    /* create a new endpoint */
    params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    params.iface      = &iface->super.super.super;
    status = uct_ep_create(&params, &ep_h);
    ucs_assert_always(status == UCS_OK);
    ep = ucs_derived_of(ep_h, uct_ud_ep_t);

    status = uct_ep_connect_to_ep(ep_h, (void*)uct_ud_creq_ib_addr(ctl),
                                  (void*)&ctl->conn_req.ep_addr);
    ucs_assert_always(status == UCS_OK);

    status = uct_ud_iface_cep_insert(iface, uct_ud_creq_ib_addr(ctl),
                                     &ctl->conn_req.ep_addr.iface_addr,
                                     ep, ctl->conn_req.conn_id);
    ucs_assert_always(status == UCS_OK);
    return ep;
}
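/*
 * PSN arithmetic sketch (illustrative): packet sequence numbers live in a
 * small unsigned type, so ordering must be computed modulo the type range
 * rather than with a plain '<'. Assuming a 16-bit uct_ud_psn_t, a
 * serial-number style comparison such as
 *
 *     ((int16_t)(a - b) < 0)   // 'a' precedes 'b' even across a 65535->0 wrap
 *
 * keeps ack processing in uct_ud_ep_process_ack() correct across wraparound;
 * UCT_UD_PSN_COMPARE is the macro form used throughout this file.
 */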
static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
{
    uct_ud_ep_t *ep;
    uct_ud_ctl_hdr_t *ctl = (uct_ud_ctl_hdr_t *)(neth + 1);

    ucs_assert_always(ctl->type == UCT_UD_PACKET_CREQ);

    ep = uct_ud_iface_cep_lookup(iface, uct_ud_creq_ib_addr(ctl),
                                 &ctl->conn_req.ep_addr.iface_addr,
                                 ctl->conn_req.conn_id);
    if (!ep) {
        ep = uct_ud_ep_create_passive(iface, ctl);
        ucs_assert_always(ep != NULL);
        ep->rx.ooo_pkts.head_sn = neth->psn;
        uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
        uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_PRIVATE);
    } else {
        if (ep->dest_ep_id == UCT_UD_EP_NULL_ID) {
            /* simultaneous CREQ */
            ep->dest_ep_id = uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id);
            ep->rx.ooo_pkts.head_sn = neth->psn;
            uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
            ucs_debug("simultaneous CREQ ep=%p "
                      "(iface=%p conn_id=%d ep_id=%d, dest_ep_id=%d rx_psn=%u)",
                      ep, iface, ep->conn_id, ep->ep_id, ep->dest_ep_id,
                      ep->rx.ooo_pkts.head_sn);
            if (UCT_UD_PSN_COMPARE(ep->tx.psn, >, UCT_UD_INITIAL_PSN)) {
                /* our own CREQ was sent; treat the incoming CREQ as an ack
                 * and remove our own CREQ from the tx window */
                uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0);
            }
            uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
        }
    }

    ++ep->rx_creq_count;

    ucs_assert_always(ctl->conn_req.conn_id == ep->conn_id);
    ucs_assert_always(uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id) ==
                      ep->dest_ep_id);
    /* a CREQ must always carry the same psn */
    ucs_assertv_always(ep->rx.ooo_pkts.head_sn == neth->psn,
                       "iface=%p ep=%p conn_id=%d ep_id=%d, dest_ep_id=%d rx_psn=%u "
                       "neth_psn=%u ep_flags=0x%x ctl_ops=0x%x rx_creq_count=%d",
                       iface, ep, ep->conn_id, ep->ep_id, ep->dest_ep_id,
                       ep->rx.ooo_pkts.head_sn, neth->psn, ep->flags,
                       ep->tx.pending.ops, ep->rx_creq_count);
    /* schedule the connection reply op */
    UCT_UD_EP_HOOK_CALL_RX(ep, neth, sizeof(*neth) + sizeof(*ctl));
    if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
        uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_NOTSENT);
    }
    uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
    uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_RCVD);
}

static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
                             uct_ud_neth_t *neth, uct_ud_recv_skb_t *skb)
{
    uct_ud_ctl_hdr_t *ctl = (uct_ud_ctl_hdr_t*)(neth + 1);

    ucs_trace_func("");
    ucs_assert_always(ctl->type == UCT_UD_PACKET_CREP);
    ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID ||
                      ep->dest_ep_id == ctl->conn_rep.src_ep_id);

    /* Discard a duplicate CREP */
    if (UCT_UD_PSN_COMPARE(neth->psn, <, ep->rx.ooo_pkts.head_sn)) {
        return;
    }

    ep->rx.ooo_pkts.head_sn = neth->psn;
    ep->dest_ep_id = ctl->conn_rep.src_ep_id;
    ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
    uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
    uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREP_RCVD);
}
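/*
 * Handshake summary (illustrative): connection setup is a CREQ/CREP exchange.
 * The active side sends a CREQ carrying its ep_id and iface address; the
 * passive side creates (or finds) an endpoint and answers with a CREP that
 * carries its src_ep_id. The endpoint flags roughly track the state machine:
 *
 *     CREQ_SENT -> CREQ_RCVD (peer) -> CREP_SENT -> CREP_RCVD (initiator)
 *
 * A simultaneous CREQ from both sides is resolved in uct_ud_ep_rx_creq() by
 * treating the peer's CREQ as an implicit ack of our own CREQ and scheduling
 * a CREP on both sides, so neither side stalls waiting for the other.
 */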
uct_ud_send_skb_t *uct_ud_ep_prepare_creq(uct_ud_ep_t *ep)
{
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    uct_ud_ctl_hdr_t *creq;
    uct_ud_send_skb_t *skb;
    uct_ud_neth_t *neth;
    ucs_status_t status;

    ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID);
    ucs_assert_always(ep->ep_id != UCT_UD_EP_NULL_ID);

    /* CREQ should not be sent if CREP for the counter CREQ is scheduled
     * (or sent already) */
    ucs_assertv_always(!uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREP) &&
                       !(ep->flags & UCT_UD_EP_FLAG_CREP_SENT),
                       "iface=%p ep=%p conn_id=%d rx_psn=%u ep_flags=0x%x "
                       "ctl_ops=0x%x rx_creq_count=%d",
                       iface, ep, ep->conn_id, ep->rx.ooo_pkts.head_sn,
                       ep->flags, ep->tx.pending.ops, ep->rx_creq_count);

    skb = uct_ud_iface_get_tx_skb(iface, ep);
    if (!skb) {
        return NULL;
    }

    neth = skb->neth;
    uct_ud_neth_init_data(ep, neth);

    neth->packet_type  = UCT_UD_EP_NULL_ID;
    neth->packet_type |= UCT_UD_PACKET_FLAG_CTL;

    creq = (uct_ud_ctl_hdr_t *)(neth + 1);

    creq->type             = UCT_UD_PACKET_CREQ;
    creq->conn_req.conn_id = ep->conn_id;

    status = uct_ud_ep_get_address(&ep->super.super,
                                   (void*)&creq->conn_req.ep_addr);
    if (status != UCS_OK) {
        return NULL;
    }

    status = uct_ib_iface_get_device_address(&iface->super.super.super,
                                             (uct_device_addr_t*)uct_ud_creq_ib_addr(creq));
    if (status != UCS_OK) {
        return NULL;
    }

    uct_ud_peer_name(ucs_unaligned_ptr(&creq->peer));

    skb->len = sizeof(*neth) + sizeof(*creq) + iface->super.addr_size;
    return skb;
}
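/*
 * CREQ wire layout (illustrative): the skb built above is a network header
 * followed by the control header and the sender's device address, i.e.
 *
 *     [ uct_ud_neth_t | uct_ud_ctl_hdr_t (CREQ: conn_id, ep_addr, peer) |
 *       device address (iface->super.addr_size bytes) ]
 *
 * The destination ep id in the network header is UCT_UD_EP_NULL_ID because
 * the responder's endpoint does not exist yet; the receiver demultiplexes
 * such packets to uct_ud_ep_rx_creq() instead of to an endpoint.
 */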
void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth,
                          unsigned byte_len, uct_ud_recv_skb_t *skb,
                          int is_async)
{
    uint32_t dest_id;
    uint32_t is_am, am_id;
    uct_ud_ep_t *ep = 0; /* todo: check why gcc complains about an uninitialized var */
    ucs_frag_list_ooo_type_t ooo_type;

    UCT_UD_IFACE_HOOK_CALL_RX(iface, neth, byte_len);

    dest_id = uct_ud_neth_get_dest_id(neth);
    am_id   = uct_ud_neth_get_am_id(neth);
    is_am   = neth->packet_type & UCT_UD_PACKET_FLAG_AM;

    if (ucs_unlikely(dest_id == UCT_UD_EP_NULL_ID)) {
        /* must be a connection request packet */
        uct_ud_ep_rx_creq(iface, neth);
        goto out;
    } else if (ucs_unlikely(!ucs_ptr_array_lookup(&iface->eps, dest_id, ep) ||
                            (ep->ep_id != dest_id))) {
        /* Drop the packet: disconnect is allowed without flush/barrier, so it
         * is possible to get a packet for an ep that has been destroyed */
        ucs_trace("RX: failed to find ep %d, dropping packet", dest_id);
        goto out;
    }

    ucs_assert(ep->ep_id != UCT_UD_EP_NULL_ID);
    UCT_UD_EP_HOOK_CALL_RX(ep, neth, byte_len);

    uct_ud_ep_process_ack(iface, ep, neth->ack_psn, is_async);

    if (ucs_unlikely(neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ)) {
        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK);
        ucs_trace_data("ACK_REQ - schedule ack, head_sn=%d sn=%d",
                       ep->rx.ooo_pkts.head_sn, neth->psn);
    }

    if (ucs_unlikely(!is_am)) {
        if ((size_t)byte_len == sizeof(*neth)) {
            goto out;
        }
        if (neth->packet_type & UCT_UD_PACKET_FLAG_CTL) {
            uct_ud_ep_rx_ctl(iface, ep, neth, skb);
            goto out;
        }
    }

    ooo_type = ucs_frag_list_insert(&ep->rx.ooo_pkts, &skb->u.ooo.elem,
                                    neth->psn);
    if (ucs_unlikely(ooo_type != UCS_FRAG_LIST_INSERT_FAST)) {
        if ((ooo_type != UCS_FRAG_LIST_INSERT_DUP) &&
            (ooo_type != UCS_FRAG_LIST_INSERT_FAIL)) {
            ucs_fatal("Out of order is not implemented: got %d", ooo_type);
        }
        ucs_trace_data("DUP/OOB - schedule ack, head_sn=%d sn=%d",
                       ep->rx.ooo_pkts.head_sn, neth->psn);
        uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK);
        goto out;
    }

    if (ucs_unlikely(!is_am && (neth->packet_type & UCT_UD_PACKET_FLAG_PUT))) {
        /* TODO: remove once ucp implements put */
        uct_ud_ep_rx_put(neth, byte_len);
        goto out;
    }

    if (ucs_unlikely(is_async &&
                     !(iface->super.super.am[am_id].flags & UCT_CB_FLAG_ASYNC))) {
        skb->u.am.len = byte_len - sizeof(*neth);
        ucs_queue_push(&iface->rx.pending_q, &skb->u.am.queue);
    } else {
        /* Avoid reordering with respect to pending operations, in case the
         * user AM handler initiates sends from any endpoint created on the
         * iface. This flag is cleared after all incoming messages are
         * processed. */
        uct_ud_iface_raise_pending_async_ev(iface);

        uct_ib_iface_invoke_am_desc(&iface->super, am_id, neth + 1,
                                    byte_len - sizeof(*neth), &skb->super);
    }
    return;

out:
    ucs_mpool_put(skb);
}
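/*
 * RX dispatch order (summary of the function above): a received packet is
 * classified as follows: CREQ (dest id is UCT_UD_EP_NULL_ID) -> unknown ep
 * (dropped) -> piggybacked ack processing -> explicit ACK_REQ handling ->
 * control (CREP) -> duplicate/out-of-order (acked and dropped) -> PUT
 * emulation -> active message delivery, which is deferred to the iface rx
 * pending queue when the handler is not async-safe.
 */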
ucs_status_t uct_ud_ep_flush_nolock(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
                                    uct_completion_t *comp)
{
    uct_ud_send_skb_t *skb;
    uct_ud_psn_t psn;

    if (ucs_unlikely(!uct_ud_ep_is_connected(ep))) {
        /* check for a CREQ either being scheduled, or sent and waiting for a
         * CREP ack */
        if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ) ||
            !ucs_queue_is_empty(&ep->tx.window)) {
            return UCS_ERR_NO_RESOURCE; /* connection in progress */
        }

        return UCS_OK; /* nothing was ever sent */
    }

    if (!uct_ud_iface_can_tx(iface) || !uct_ud_iface_has_skbs(iface) ||
        uct_ud_ep_no_window(ep)) {
        /* iface/ep has no resources; prevent reordering with possible pending
         * operations by not starting the flush. */
        return UCS_ERR_NO_RESOURCE;
    }

    if (ucs_queue_is_empty(&ep->tx.window)) {
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);

        /* Check whether there are pending async completions for this ep:
         * if not, everything was acknowledged and nothing is pending - return
         * OK; if yes, continue below and add a completion to the queue. */
        if (!(ep->flags & UCT_UD_EP_FLAG_ASYNC_COMPS)) {
            return UCS_OK;
        }

        /* If we have a pending async completion and the user requested a
         * callback, add a new async completion to the queue. */
        if (comp != NULL) {
            skb = ucs_mpool_get(&iface->tx.mp);
            if (skb == NULL) {
                return UCS_ERR_NO_RESOURCE;
            }

            skb->flags = UCT_UD_SEND_SKB_FLAG_COMP;
            skb->len   = 0;
            uct_ud_comp_desc(skb)->comp = comp;
            uct_ud_comp_desc(skb)->ep   = ep;
            ucs_queue_push(&iface->tx.async_comp_q, &skb->queue);
        }
    } else {
        skb = ucs_queue_tail_elem_non_empty(&ep->tx.window, uct_ud_send_skb_t,
                                            queue);
        psn = skb->neth->psn;
        if (!(skb->flags & UCT_UD_SEND_SKB_FLAG_ACK_REQ)) {
            /* If we didn't ask for an ACK on the last skb, send an ACK_REQ
             * message. It speeds up the flush because we will not have to
             * wait until a retransmit is triggered.
             * Also, prevent sending more control messages like this by
             * setting the flag on the last skb. */

            /* Since this function can be called from the arbiter context, it
             * is impossible to schedule a control operation. So just raise
             * the flag and, if there is no other control op, send the
             * ACK_REQ directly.
             *
             * If there is another control op, the arbiter will take care of
             * it. */
            ep->tx.pending.ops |= UCT_UD_EP_OP_ACK_REQ;
            if (uct_ud_ep_ctl_op_check_ex(ep, UCT_UD_EP_OP_ACK_REQ)) {
                uct_ud_ep_do_pending_ctl(ep, iface);
            }

            skb->flags |= UCT_UD_SEND_SKB_FLAG_ACK_REQ;
        }

        /* If the user requested a callback, add a dummy skb to the window,
         * to be released when the current sequence number is acknowledged. */
        if (comp != NULL) {
            skb = ucs_mpool_get(&iface->tx.mp);
            if (skb == NULL) {
                return UCS_ERR_NO_RESOURCE;
            }

            /* Add a dummy skb to the window; it invokes the user completion
             * callback when it gets ACKed. */
            skb->flags             = UCT_UD_SEND_SKB_FLAG_COMP;
            skb->len               = sizeof(skb->neth[0]);
            skb->neth->packet_type = 0;
            skb->neth->psn         = psn;
            uct_ud_comp_desc(skb)->comp = comp;
            ucs_assert(psn == (uct_ud_psn_t)(ep->tx.psn - 1));

            uct_ud_neth_set_dest_id(skb->neth, UCT_UD_EP_NULL_ID);
            ucs_queue_push(&ep->tx.window, &skb->queue);
            ucs_trace_data("added dummy flush skb %p psn %d user_comp %p",
                           skb, skb->neth->psn, comp);
        }
    }

    return UCS_INPROGRESS;
}

void uct_ud_tx_wnd_purge_outstanding(uct_ud_iface_t *iface, uct_ud_ep_t *ud_ep,
                                     ucs_status_t status)
{
    uct_ud_send_skb_t *skb;

    uct_ud_ep_tx_stop(ud_ep);
    ucs_queue_for_each_extract(skb, &ud_ep->tx.window, queue, 1) {
        uct_ud_iface_add_async_comp(iface, ud_ep, skb, status);
    }
}

ucs_status_t uct_ud_ep_flush(uct_ep_h ep_h, unsigned flags,
                             uct_completion_t *comp)
{
    ucs_status_t status;
    uct_ud_ep_t *ep       = ucs_derived_of(ep_h, uct_ud_ep_t);
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);

    uct_ud_enter(iface);

    if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
        uct_ud_tx_wnd_purge_outstanding(iface, ep, UCS_ERR_CANCELED);
        uct_ud_iface_dispatch_zcopy_comps(iface);
        uct_ep_pending_purge(ep_h, NULL, 0);
        /* Open the window after cancellation, for the next send */
        uct_ud_ep_ca_ack(ep);
        status = UCS_OK;
        goto out;
    }

    if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface))) {
        status = UCS_ERR_NO_RESOURCE;
        goto out;
    }

    status = uct_ud_ep_flush_nolock(iface, ep, comp);
    if (status == UCS_OK) {
        UCT_TL_EP_STAT_FLUSH(&ep->super);
    } else if (status == UCS_INPROGRESS) {
        UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
    }

out:
    uct_ud_leave(iface);
    return status;
}

static uct_ud_send_skb_t *uct_ud_ep_prepare_crep(uct_ud_ep_t *ep)
{
    uct_ud_send_skb_t *skb;
    uct_ud_neth_t *neth;
    uct_ud_ctl_hdr_t *crep;
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);

    ucs_assert_always(ep->dest_ep_id != UCT_UD_EP_NULL_ID);
    ucs_assert_always(ep->ep_id != UCT_UD_EP_NULL_ID);

    /* Check that a CREQ is neither scheduled nor waiting for a CREP ack */
    ucs_assertv_always(!uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ) &&
                       ucs_queue_is_empty(&ep->tx.window),
                       "iface=%p ep=%p conn_id=%d ep_id=%d, dest_ep_id=%d rx_psn=%u "
                       "ep_flags=0x%x ctl_ops=0x%x rx_creq_count=%d",
                       iface, ep, ep->conn_id, ep->ep_id, ep->dest_ep_id,
                       ep->rx.ooo_pkts.head_sn, ep->flags, ep->tx.pending.ops,
                       ep->rx_creq_count);

    skb = uct_ud_iface_get_tx_skb(iface, ep);
    if (!skb) {
        return NULL;
    }

    neth = skb->neth;
    uct_ud_neth_init_data(ep, neth);

    neth->packet_type  = ep->dest_ep_id;
    neth->packet_type |= (UCT_UD_PACKET_FLAG_ACK_REQ|UCT_UD_PACKET_FLAG_CTL);

    crep = (uct_ud_ctl_hdr_t *)(neth + 1);

    crep->type               = UCT_UD_PACKET_CREP;
    crep->conn_rep.src_ep_id = ep->ep_id;

    uct_ud_peer_name(ucs_unaligned_ptr(&crep->peer));

    skb->len = sizeof(*neth) + sizeof(*crep);
    uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREP);
    return skb;
}
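/*
 * Flush semantics sketch (illustrative, caller side; error handling omitted):
 * a caller typically progresses the worker and retries until the flush
 * completes, e.g.
 *
 *     do {
 *         uct_worker_progress(worker);
 *         status = uct_ep_flush(ep, 0, NULL);
 *     } while (status != UCS_OK);
 *
 * UCT_FLUSH_FLAG_CANCEL instead completes every outstanding skb with
 * UCS_ERR_CANCELED and reopens the congestion window, which is why that path
 * never returns UCS_ERR_NO_RESOURCE.
 */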
static uct_ud_send_skb_t *uct_ud_ep_resend(uct_ud_ep_t *ep)
{
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);
    uct_ud_send_skb_t *skb, *sent_skb;
    ucs_queue_iter_t resend_pos;
    uct_ud_zcopy_desc_t *zdesc;
    size_t iov_it;

    /* check the window */
    resend_pos = (void*)ep->resend.pos;
    sent_skb   = ucs_queue_iter_elem(sent_skb, resend_pos, queue);
    if (sent_skb == NULL) {
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
        return NULL;
    }

    ucs_assert(((uintptr_t)sent_skb % UCT_UD_SKB_ALIGN) == 0);
    if (UCT_UD_PSN_COMPARE(sent_skb->neth->psn, >=, ep->tx.max_psn)) {
        ucs_debug("ep(%p): out of window(psn=%d/max_psn=%d) - cannot resend more",
                  ep, sent_skb ? sent_skb->neth->psn : -1, ep->tx.max_psn);
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
        return NULL;
    }

    /* skip a dummy skb created for a non-blocking flush */
    if ((uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) &&
        !(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_CTL)) {
        ep->resend.pos = ucs_queue_iter_next(resend_pos);
        return NULL;
    }

    /* creq/crep processing must remove the creq packet from the window */
    ucs_assertv_always(!(uct_ud_ep_is_connected(ep) &&
                         (uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) &&
                         !(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_AM)),
                       "ep(%p): CREQ resend on endpoint which is already connected",
                       ep);

    skb = uct_ud_iface_resend_skb_get(iface);
    ucs_assert_always(skb != NULL);

    ep->resend.pos = ucs_queue_iter_next(resend_pos);
    ep->resend.psn = sent_skb->neth->psn;
    memcpy(skb->neth, sent_skb->neth, sent_skb->len);
    skb->neth->ack_psn = ep->rx.acked_psn;
    skb->len           = sent_skb->len;

    if (sent_skb->flags & UCT_UD_SEND_SKB_FLAG_ZCOPY) {
        zdesc = uct_ud_zcopy_desc(sent_skb);
        for (iov_it = 0; iov_it < zdesc->iovcnt; ++iov_it) {
            if (zdesc->iov[iov_it].length > 0) {
                memcpy((char *)skb->neth + skb->len, zdesc->iov[iov_it].buffer,
                       zdesc->iov[iov_it].length);
                skb->len += zdesc->iov[iov_it].length;
            }
        }
    }

    /* force an ack request on every Nth packet, or on the first packet in
     * the resend window */
    if ((skb->neth->psn % UCT_UD_RESENDS_PER_ACK) == 0 ||
        UCT_UD_PSN_COMPARE(skb->neth->psn, ==, ep->tx.acked_psn + 1)) {
        skb->neth->packet_type |= UCT_UD_PACKET_FLAG_ACK_REQ;
    } else {
        skb->neth->packet_type &= ~UCT_UD_PACKET_FLAG_ACK_REQ;
    }

    ucs_debug("ep(%p): resending rt_psn %u rt_max_psn %u acked_psn %u max_psn %u ack_req %d",
              ep, ep->resend.psn, ep->resend.max_psn, ep->tx.acked_psn,
              ep->tx.max_psn,
              (skb->neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ) ? 1 : 0);

    if (UCT_UD_PSN_COMPARE(ep->resend.psn, ==, ep->resend.max_psn)) {
        ucs_debug("ep(%p): resending completed", ep);
        ep->resend.psn = ep->resend.max_psn + 1;
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
    }

    return skb;
}
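/*
 * Resend mechanics (summary of the function above): retransmission copies the
 * original skb, flattening any zcopy iov payload into the resend buffer,
 * instead of re-posting it, since the original may still be held by the HCA.
 * An ACK_REQ bit is forced on the first resent packet and then on every
 * UCT_UD_RESENDS_PER_ACK-th one, so the resend window can be trimmed as soon
 * as the peer acknowledges part of it.
 */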
static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface)
{
    uct_ud_send_skb_t *skb;
    int flag = 0;

    if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
        skb = uct_ud_ep_prepare_creq(ep);
        if (skb) {
            flag = 1;
            uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_SENT);
            uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
        }
    } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREP)) {
        skb = uct_ud_ep_prepare_crep(ep);
        if (skb) {
            flag = 1;
            uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREP_SENT);
            uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREP);
        }
    } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_RESEND)) {
        skb = uct_ud_ep_resend(ep);
    } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK)) {
        if (uct_ud_ep_is_connected(ep)) {
            if (iface->config.max_inline >= sizeof(uct_ud_neth_t)) {
                skb = ucs_unaligned_ptr(&iface->tx.skb_inl.super);
            } else {
                skb = uct_ud_iface_resend_skb_get(iface);
                skb->len = sizeof(uct_ud_neth_t);
            }
            uct_ud_neth_ctl_ack(ep, skb->neth);
        } else {
            /* Do not send ACKs if not connected yet. It may happen if
             * CREQ and CREP from peer are lost. Need to wait for CREP
             * resending by peer. */
            skb = NULL;
        }
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK);
    } else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK_REQ)) {
        if (iface->config.max_inline >= sizeof(uct_ud_neth_t)) {
            skb = ucs_unaligned_ptr(&iface->tx.skb_inl.super);
        } else {
            skb = uct_ud_iface_resend_skb_get(iface);
            skb->len = sizeof(uct_ud_neth_t);
        }
        uct_ud_neth_ctl_ack_req(ep, skb->neth);
        uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);
    } else if (uct_ud_ep_ctl_op_isany(ep)) {
        ucs_fatal("unsupported pending op mask: %x", ep->tx.pending.ops);
    } else {
        skb = 0;
    }

    if (!skb) {
        /* no pending - nothing to do */
        return;
    }

    VALGRIND_MAKE_MEM_DEFINED(skb, sizeof *skb);
    ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb, flag);
    if (flag) {
        /* creq and crep allocate real skb, it must be put on window like
         * a regular packet to ensure a retransmission. */
        uct_ud_iface_complete_tx_skb(iface, ep, skb);
    } else {
        uct_ud_iface_resend_skb_put(iface, skb);
    }
}

static inline ucs_arbiter_cb_result_t uct_ud_ep_ctl_op_next(uct_ud_ep_t *ep)
{
    if (uct_ud_ep_ctl_op_isany(ep)) {
        /* can send more control - come here later */
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    }
    /* no more control - nothing to do in this dispatch cycle */
    return UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
}
/**
 * Pending operations are processed according to priority:
 * - high priority control:
 *   - CREQ request
 *   - CREP reply
 *   - resends
 * - pending uct requests
 * - low priority control: ack replies/ack requests
 *
 * Low priority control can be sent along with user data, so there is a good
 * chance that processing pending uct requests will also deal with the low
 * priority control. However, we cannot let a pending uct request block
 * control forever.
 */
ucs_arbiter_cb_result_t uct_ud_ep_do_pending(ucs_arbiter_t *arbiter,
                                             ucs_arbiter_elem_t *elem,
                                             void *arg)
{
    uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t, priv);
    uct_ud_ep_t *ep        = ucs_container_of(ucs_arbiter_elem_group(elem),
                                              uct_ud_ep_t, tx.pending.group);
    uct_ud_iface_t *iface  = ucs_container_of(arbiter, uct_ud_iface_t,
                                              tx.pending_q);
    uintptr_t in_async_progress = (uintptr_t)arg;
    int allow_callback;
    int async_before_pending;
    ucs_status_t status;

    /* check if we have global resources:
     * - tx_wqe
     * - skb
     * control messages do not need an skb. */
    if (!uct_ud_iface_can_tx(iface)) {
        return UCS_ARBITER_CB_RESULT_STOP;
    }

    /* Here we rely on the fact that the arbiter starts the next dispatch
     * cycle from the next group. So it is ok to stop if there is no ctl.
     * However, in the worst case only one ctl per dispatch cycle will be
     * sent. */
    if (!uct_ud_iface_has_skbs(iface) && !uct_ud_ep_ctl_op_isany(ep)) {
        return UCS_ARBITER_CB_RESULT_STOP;
    }

    /* we can deschedule the group iff:
     * - there is no control
     * - the ep has no resources (connect or window) */
    if (!uct_ud_ep_ctl_op_isany(ep) &&
        (!uct_ud_ep_is_connected(ep) || uct_ud_ep_no_window(ep))) {
        return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
    }

    if (&ep->tx.pending.elem == elem) {
        uct_ud_ep_do_pending_ctl(ep, iface);
        if (uct_ud_ep_ctl_op_isany(ep)) {
            /* there is still some ctl left - go to the next group */
            return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
        } else {
            /* no more ctl - the dummy element can be removed */
            return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
        }
    }

    /* a user pending request can be sent iff:
     * - not in async progress
     * - there are no high priority pending control messages */
    allow_callback = !in_async_progress ||
                     (uct_ud_pending_req_priv(req)->flags & UCT_CB_FLAG_ASYNC);
    if (allow_callback && !uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CTL_HI_PRIO)) {
        ucs_assert(!(ep->flags & UCT_UD_EP_FLAG_IN_PENDING));
        ep->flags |= UCT_UD_EP_FLAG_IN_PENDING;
        async_before_pending = iface->tx.async_before_pending;
        if (uct_ud_pending_req_priv(req)->flags & UCT_CB_FLAG_ASYNC) {
            /* temporarily reset the flag to unblock sends from async context */
            iface->tx.async_before_pending = 0;
        }
        status = req->func(req);
        iface->tx.async_before_pending = async_before_pending;
        ep->flags &= ~UCT_UD_EP_FLAG_IN_PENDING;

        if (status == UCS_INPROGRESS) {
            return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
        } else if (status != UCS_OK) {
            /* avoid deadlock: send low priority ctl if the user callback
             * failed; no need to check for low priority here because we
             * already checked above. */
            uct_ud_ep_do_pending_ctl(ep, iface);
            return uct_ud_ep_ctl_op_next(ep);
        }
        return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
    }

    /* try to send ctl messages */
    uct_ud_ep_do_pending_ctl(ep, iface);
    if (in_async_progress) {
        return uct_ud_ep_ctl_op_next(ep);
    } else {
        /* We still didn't process the current pending request because of
         * high priority control messages, so we cannot stop sending yet. If
         * we stop, not all resources will be exhausted and out-of-order
         * execution with pending can occur. (pending control ops may be
         * cleared by uct_ud_ep_do_pending_ctl) */
        return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
    }
}
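/*
 * Arbiter contract (illustrative): the return value of the callback above
 * drives the dispatch loop. STOP aborts the whole dispatch because global
 * resources are exhausted; DESCHED_GROUP parks this endpoint until it is
 * rescheduled, e.g. by an incoming ack; NEXT_GROUP keeps the element queued
 * but moves on to another endpoint for fairness; RESCHED_GROUP defers the
 * group to a later dispatch round; and REMOVE_ELEM consumes the element,
 * either a completed user request or a drained ctl placeholder.
 */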
ucs_status_t uct_ud_ep_pending_add(uct_ep_h ep_h, uct_pending_req_t *req,
                                   unsigned flags)
{
    uct_ud_ep_t *ep       = ucs_derived_of(ep_h, uct_ud_ep_t);
    uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                           uct_ud_iface_t);

    uct_ud_enter(iface);

    /* If there was async progress, all 'send' operations return
     * UCS_ERR_NO_RESOURCE. Returning UCS_ERR_BUSY from here would cause a
     * deadlock, so we must skip the resource check and add the pending
     * operation. */
    if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface))) {
        goto add_req;
    }

    if (uct_ud_iface_can_tx(iface) && uct_ud_iface_has_skbs(iface) &&
        uct_ud_ep_is_connected(ep) && !uct_ud_ep_no_window(ep)) {
        uct_ud_leave(iface);
        return UCS_ERR_BUSY;
    }

add_req:
    UCS_STATIC_ASSERT(sizeof(uct_ud_pending_req_priv_t) <=
                      UCT_PENDING_REQ_PRIV_LEN);
    uct_ud_pending_req_priv(req)->flags = flags;
    uct_pending_req_arb_group_push(&ep->tx.pending.group, req);
    ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
    ucs_trace_data("ud ep %p: added pending req %p tx_psn %d acked_psn %d cwnd %d",
                   ep, req, ep->tx.psn, ep->tx.acked_psn, ep->ca.cwnd);
    UCT_TL_EP_STAT_PEND(&ep->super);

    uct_ud_leave(iface);
    return UCS_OK;
}

static ucs_arbiter_cb_result_t
uct_ud_ep_pending_purge_cb(ucs_arbiter_t *arbiter, ucs_arbiter_elem_t *elem,
                           void *arg)
{
    uct_ud_ep_t *ep = ucs_container_of(ucs_arbiter_elem_group(elem),
                                       uct_ud_ep_t, tx.pending.group);
    uct_purge_cb_args_t *cb_args    = arg;
    uct_pending_purge_callback_t cb = cb_args->cb;
    uct_pending_req_t *req;

    if (&ep->tx.pending.elem == elem) {
        /* return value is ignored by the arbiter */
        return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
    }

    req = ucs_container_of(elem, uct_pending_req_t, priv);
    if (cb) {
        cb(req, cb_args->arg);
    } else {
        ucs_debug("ep=%p cancelling user pending request %p", ep, req);
    }

    /* return value is ignored by the arbiter */
    return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}

void uct_ud_ep_pending_purge(uct_ep_h ep_h, uct_pending_purge_callback_t cb,
                             void *arg)
{
    uct_ud_ep_t *ep          = ucs_derived_of(ep_h, uct_ud_ep_t);
    uct_ud_iface_t *iface    = ucs_derived_of(ep->super.super.iface,
                                              uct_ud_iface_t);
    uct_purge_cb_args_t args = {cb, arg};

    uct_ud_enter(iface);
    ucs_arbiter_group_purge(&iface->tx.pending_q, &ep->tx.pending.group,
                            uct_ud_ep_pending_purge_cb, &args);
    if (uct_ud_ep_ctl_op_isany(ep)) {
        ucs_arbiter_group_push_elem(&ep->tx.pending.group,
                                    &ep->tx.pending.elem);
        ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
    }
    uct_ud_leave(iface);
}

void uct_ud_ep_disconnect(uct_ep_h tl_ep)
{
    uct_ud_ep_t *ep       = ucs_derived_of(tl_ep, uct_ud_ep_t);
    uct_ud_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_ud_iface_t);

    ucs_debug("ep %p: disconnect", ep);

    /* cancel user pending */
    uct_ud_ep_pending_purge(tl_ep, NULL, NULL);

    /* schedule flush */
    uct_ud_ep_flush(tl_ep, 0, NULL);

    /* the EP will be destroyed by the interface destroy or by a timeout in
     * uct_ud_ep_slow_timer */
    ep->close_time = ucs_twheel_get_time(&iface->async.slow_timer);
    ep->flags     |= UCT_UD_EP_FLAG_DISCONNECTED;
    ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer,
                   UCT_UD_SLOW_TIMER_MAX_TICK(iface));
}
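/*
 * Endpoint lifetime sketch (illustrative): disconnect does not destroy the
 * endpoint immediately. The sequence is roughly:
 *
 *     uct_ud_ep_disconnect(ep);   // purge pending requests, schedule a flush
 *     // ... the slow timer keeps firing ...
 *     // once tx.window is empty and peer_timeout has elapsed since
 *     // close_time, uct_ud_ep_free_by_timeout() releases the endpoint
 *
 * This delayed teardown lets in-flight packets drain and late acks arrive,
 * since UD provides no connection teardown handshake of its own.
 */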