/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#include "ud_ep.h"
#include "ud_iface.h"
#include "ud_inl.h"
#include "ud_def.h"
#include <uct/api/uct_def.h>
#include <uct/ib/base/ib_verbs.h>
#include <ucs/debug/memtrack.h>
#include <ucs/debug/log.h>
#include <ucs/time/time.h>
/* Must be less than peer_timeout, to avoid false positives given the timer
 * resolution, and not too small, to avoid performance degradation.
 */
#define UCT_UD_SLOW_TIMER_MAX_TICK(_iface) ((_iface)->config.peer_timeout / 3)
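/*
 * Illustrative numbers (not from the code): with a peer_timeout of 3.0 s the
 * tick above is capped at 1.0 s, so the slow timer fires at least ~3 times
 * before the peer can be declared dead, which absorbs timer-wheel resolution
 * error.
 */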
static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface);
static void uct_ud_peer_name(uct_ud_peer_name_t *peer)
{
gethostname(peer->name, sizeof(peer->name));
peer->pid = getpid();
}
static void uct_ud_ep_set_state(uct_ud_ep_t *ep, uint32_t state)
{
ep->flags |= state;
}
#if ENABLE_DEBUG_DATA
static void uct_ud_peer_copy(uct_ud_peer_name_t *dst, uct_ud_peer_name_t *src)
{
memcpy(dst, src, sizeof(*src));
}
#else
#define uct_ud_peer_copy(dst, src)
#endif
static void uct_ud_ep_resend_start(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
ep->resend.max_psn = ep->tx.psn - 1;
ep->resend.psn = ep->tx.acked_psn + 1;
ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window);
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_RESEND);
}
static void uct_ud_ep_resend_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep)
{
if (UCT_UD_PSN_COMPARE(ep->tx.acked_psn, <, ep->resend.max_psn)) {
/* new ack arrived that acked something in our resend window. */
if (UCT_UD_PSN_COMPARE(ep->resend.psn, <=, ep->tx.acked_psn)) {
ucs_debug("ep(%p): ack received during resend resend.psn=%d tx.acked_psn=%d",
ep, ep->resend.psn, ep->tx.acked_psn);
ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window);
ep->resend.psn = ep->tx.acked_psn + 1;
}
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_RESEND);
} else {
/* everything in resend window was acked - no need to resend anymore */
ep->resend.psn = ep->resend.max_psn + 1;
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
}
}
static void uct_ud_ep_ca_drop(uct_ud_ep_t *ep)
{
ucs_debug("ep: %p ca drop@cwnd = %d in flight: %d",
ep, ep->ca.cwnd, (int)ep->tx.psn-(int)ep->tx.acked_psn-1);
ep->ca.cwnd /= UCT_UD_CA_MD_FACTOR;
if (ep->ca.cwnd < UCT_UD_CA_MIN_WINDOW) {
ep->ca.cwnd = UCT_UD_CA_MIN_WINDOW;
}
ep->tx.max_psn = ep->tx.acked_psn + ep->ca.cwnd;
if (UCT_UD_PSN_COMPARE(ep->tx.max_psn, >, ep->tx.psn)) {
/* do not send more until we get acks going */
uct_ud_ep_tx_stop(ep);
}
}
static UCS_F_ALWAYS_INLINE void uct_ud_ep_ca_ack(uct_ud_ep_t *ep)
{
if (ep->ca.cwnd < ep->ca.wmax) {
ep->ca.cwnd += UCT_UD_CA_AI_VALUE;
}
ep->tx.max_psn = ep->tx.acked_psn + ep->ca.cwnd;
}
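/*
 * uct_ud_ep_ca_drop/uct_ud_ep_ca_ack above implement an AIMD congestion
 * window. A sketch of the evolution, assuming for illustration that
 * UCT_UD_CA_AI_VALUE is 1 and UCT_UD_CA_MD_FACTOR is 2 (the real values are
 * defined elsewhere, in ud_def.h):
 *
 *   cwnd = 16
 *   ack  -> cwnd = 17    (additive increase, up to ca.wmax)
 *   ack  -> cwnd = 18
 *   drop -> cwnd = 9     (multiplicative decrease, >= UCT_UD_CA_MIN_WINDOW)
 *
 * tx.max_psn = tx.acked_psn + cwnd bounds how far the sender may run ahead
 * of the last acknowledged PSN.
 */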
static void uct_ud_ep_reset(uct_ud_ep_t *ep)
{
ep->tx.psn = UCT_UD_INITIAL_PSN;
ep->ca.cwnd = UCT_UD_CA_MIN_WINDOW;
ep->ca.wmax = ucs_derived_of(ep->super.super.iface,
uct_ud_iface_t)->config.max_window;
ep->tx.max_psn = ep->tx.psn + ep->ca.cwnd;
ep->tx.acked_psn = UCT_UD_INITIAL_PSN - 1;
ep->tx.pending.ops = UCT_UD_EP_OP_NONE;
ucs_queue_head_init(&ep->tx.window);
ep->resend.pos = ucs_queue_iter_begin(&ep->tx.window);
ep->resend.psn = ep->tx.psn;
ep->resend.max_psn = ep->tx.acked_psn;
ep->rx_creq_count = 0;
ep->rx.acked_psn = UCT_UD_INITIAL_PSN - 1;
ucs_frag_list_init(ep->tx.psn-1, &ep->rx.ooo_pkts, 0 /*TODO: ooo support */
UCS_STATS_ARG(ep->super.stats));
}
static ucs_status_t uct_ud_ep_free_by_timeout(uct_ud_ep_t *ep,
uct_ud_iface_t *iface)
{
uct_ud_iface_ops_t *ops;
ucs_time_t diff;
diff = ucs_twheel_get_time(&iface->async.slow_timer) - ep->close_time;
if (diff > iface->config.peer_timeout) {
ucs_debug("ud_ep %p is destroyed after %fs with timeout %fs\n",
ep, ucs_time_to_sec(diff),
ucs_time_to_sec(iface->config.peer_timeout));
ops = ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t);
ops->ep_free(&ep->super.super);
return UCS_OK;
}
return UCS_INPROGRESS;
}
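/*
 * Escalation ladder of the slow timer below, applied while there are unacked
 * packets in the tx window (thresholds are relative to the last send/ack
 * time):
 *
 *   diff > peer_timeout   -> declare the endpoint failed
 *   diff > 3 * slow_tick  -> drop the congestion window and start a resend
 *   diff > slow_tick      -> only request an explicit ACK (twice, see below)
 *
 * The tick itself is backed off multiplicatively on every expiration, capped
 * by UCT_UD_SLOW_TIMER_MAX_TICK(iface).
 */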
static void uct_ud_ep_slow_timer(ucs_wtimer_t *self)
{
uct_ud_ep_t *ep = ucs_container_of(self, uct_ud_ep_t, slow_timer);
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_ud_iface_t);
ucs_time_t now;
ucs_time_t diff;
ucs_status_t status;
UCT_UD_EP_HOOK_CALL_TIMER(ep);
if (ucs_queue_is_empty(&ep->tx.window)) {
/* Do not free the EP until all scheduled communications are done. */
if (ep->flags & UCT_UD_EP_FLAG_DISCONNECTED) {
status = uct_ud_ep_free_by_timeout(ep, iface);
if (status == UCS_INPROGRESS) {
goto again;
}
}
return;
}
now = ucs_twheel_get_time(&iface->async.slow_timer);
diff = now - ep->tx.send_time;
if (diff > iface->config.peer_timeout) {
ucs_debug("ep %p: timeout of %.2f sec, config::peer_timeout - %.2f sec",
ep, ucs_time_to_sec(diff),
ucs_time_to_sec(iface->config.peer_timeout));
iface->super.ops->handle_failure(&iface->super, ep,
UCS_ERR_ENDPOINT_TIMEOUT);
return;
} else if (diff > 3*iface->async.slow_tick) {
ucs_trace("scheduling resend now: %lu send_time: %lu diff: %lu tick: %lu",
now, ep->tx.send_time, now - ep->tx.send_time,
ep->tx.slow_tick);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);
uct_ud_ep_ca_drop(ep);
uct_ud_ep_resend_start(iface, ep);
} else if ((diff > iface->async.slow_tick) && uct_ud_ep_is_connected(ep)) {
        /* It is possible that the peer is just slow to respond.
         * Try to flush the window (request an explicit ACK) twice
         * before going into full resend mode.
         */
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK_REQ);
}
again:
/* Cool down the timer on rescheduling/resending */
ep->tx.slow_tick *= iface->config.slow_timer_backoff;
ep->tx.slow_tick = ucs_min(ep->tx.slow_tick,
UCT_UD_SLOW_TIMER_MAX_TICK(iface));
ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer, ep->tx.slow_tick);
}
UCS_CLASS_INIT_FUNC(uct_ud_ep_t, uct_ud_iface_t *iface)
{
ucs_trace_func("");
memset(self, 0, sizeof(*self));
UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super.super);
self->dest_ep_id = UCT_UD_EP_NULL_ID;
uct_ud_ep_reset(self);
ucs_list_head_init(&self->cep_list);
uct_ud_iface_add_ep(iface, self);
self->tx.slow_tick = iface->async.slow_tick;
ucs_wtimer_init(&self->slow_timer, uct_ud_ep_slow_timer);
ucs_arbiter_group_init(&self->tx.pending.group);
ucs_arbiter_elem_init(&self->tx.pending.elem);
UCT_UD_EP_HOOK_INIT(self);
ucs_debug("created ep ep=%p iface=%p id=%d", self, iface, self->ep_id);
return UCS_OK;
}
static ucs_arbiter_cb_result_t
uct_ud_ep_pending_cancel_cb(ucs_arbiter_t *arbiter, ucs_arbiter_elem_t *elem,
void *arg)
{
uct_ud_ep_t *ep = ucs_container_of(ucs_arbiter_elem_group(elem),
uct_ud_ep_t, tx.pending.group);
uct_pending_req_t *req;
    /* we may have a pending control op on this ep */
if (&ep->tx.pending.elem == elem) {
/* return ignored by arbiter */
return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
    /* the uct user should not have anything pending at this point */
req = ucs_container_of(elem, uct_pending_req_t, priv);
ucs_warn("ep=%p removing user pending req=%p", ep, req);
/* return ignored by arbiter */
return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
static UCS_CLASS_CLEANUP_FUNC(uct_ud_ep_t)
{
uct_ud_iface_t *iface = ucs_derived_of(self->super.super.iface, uct_ud_iface_t);
ucs_trace_func("ep=%p id=%d conn_id=%d", self, self->ep_id, self->conn_id);
ucs_wtimer_remove(&self->slow_timer);
uct_ud_iface_remove_ep(iface, self);
uct_ud_iface_cep_remove(self);
ucs_frag_list_cleanup(&self->rx.ooo_pkts);
ucs_arbiter_group_purge(&iface->tx.pending_q, &self->tx.pending.group,
uct_ud_ep_pending_cancel_cb, 0);
if (!ucs_queue_is_empty(&self->tx.window)) {
ucs_debug("ep=%p id=%d conn_id=%d has %d unacked packets",
self, self->ep_id, self->conn_id,
(int)ucs_queue_length(&self->tx.window));
}
ucs_arbiter_group_cleanup(&self->tx.pending.group);
}
UCS_CLASS_DEFINE(uct_ud_ep_t, uct_base_ep_t);
void uct_ud_ep_clone(uct_ud_ep_t *old_ep, uct_ud_ep_t *new_ep)
{
uct_ep_t *ep_h = &old_ep->super.super;
uct_iface_t *iface_h = ep_h->iface;
uct_ud_iface_replace_ep(ucs_derived_of(iface_h, uct_ud_iface_t), old_ep, new_ep);
memcpy(new_ep, old_ep, sizeof(uct_ud_ep_t));
}
ucs_status_t uct_ud_ep_get_address(uct_ep_h tl_ep, uct_ep_addr_t *addr)
{
uct_ud_ep_t *ep = ucs_derived_of(tl_ep, uct_ud_ep_t);
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
uct_ud_ep_addr_t *ep_addr = (uct_ud_ep_addr_t *)addr;
uct_ib_pack_uint24(ep_addr->iface_addr.qp_num, iface->qp->qp_num);
uct_ib_pack_uint24(ep_addr->ep_id, ep->ep_id);
return UCS_OK;
}
static ucs_status_t uct_ud_ep_connect_to_iface(uct_ud_ep_t *ep,
const uct_ib_address_t *ib_addr,
const uct_ud_iface_addr_t *if_addr)
{
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
char buf[128];
ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
uct_ud_ep_reset(ep);
ucs_debug(UCT_IB_IFACE_FMT" lid %d qpn 0x%x epid %u ep %p connected to "
"IFACE %s qpn 0x%x", UCT_IB_IFACE_ARG(&iface->super),
dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
iface->qp->qp_num, ep->ep_id, ep,
uct_ib_address_str(ib_addr, buf, sizeof(buf)),
uct_ib_unpack_uint24(if_addr->qp_num));
return UCS_OK;
}
static ucs_status_t uct_ud_ep_disconnect_from_iface(uct_ep_h tl_ep)
{
uct_ud_ep_t *ep = ucs_derived_of(tl_ep, uct_ud_ep_t);
ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
uct_ud_ep_reset(ep);
ep->dest_ep_id = UCT_UD_EP_NULL_ID;
return UCS_OK;
}
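/*
 * A sketch of the connect flow implemented below (hypothetical caller, for
 * illustration only):
 *
 *   status = uct_ud_ep_create_connected_common(iface, ib_addr, if_addr,
 *                                              &ep, &skb);
 *   if (status == UCS_OK)                       // new ep, CREQ skb is ready
 *       ...transmit skb...
 *   else if (status == UCS_ERR_NO_RESOURCE)     // new ep, CREQ deferred to
 *       ...progress later...                    // the pending ctl ops
 *   else if (status == UCS_ERR_ALREADY_EXISTS)  // reused an existing ep,
 *       ...no CREQ needed...                    // skb is NULL
 */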
ucs_status_t uct_ud_ep_create_connected_common(uct_ud_iface_t *iface,
const uct_ib_address_t *ib_addr,
const uct_ud_iface_addr_t *if_addr,
uct_ud_ep_t **new_ep_p,
uct_ud_send_skb_t **skb_p)
{
uct_ep_params_t params;
ucs_status_t status;
uct_ud_ep_t *ep;
uct_ep_h new_ep_h;
ep = uct_ud_iface_cep_lookup(iface, ib_addr, if_addr, UCT_UD_EP_CONN_ID_MAX);
if (ep) {
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_NOTSENT);
ep->flags &= ~UCT_UD_EP_FLAG_PRIVATE;
*new_ep_p = ep;
*skb_p = NULL;
return UCS_ERR_ALREADY_EXISTS;
}
params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
params.iface = &iface->super.super.super;
    status = uct_ep_create(&params, &new_ep_h);
if (status != UCS_OK) {
return status;
}
ep = ucs_derived_of(new_ep_h, uct_ud_ep_t);
status = uct_ud_ep_connect_to_iface(ep, ib_addr, if_addr);
if (status != UCS_OK) {
return status;
}
status = uct_ud_iface_cep_insert(iface, ib_addr, if_addr, ep, UCT_UD_EP_CONN_ID_MAX);
if (status != UCS_OK) {
goto err_cep_insert;
}
*skb_p = uct_ud_ep_prepare_creq(ep);
if (!*skb_p) {
status = UCS_ERR_NO_RESOURCE;
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREQ);
}
*new_ep_p = ep;
return status;
err_cep_insert:
uct_ud_ep_disconnect_from_iface(&ep->super.super);
return status;
}
void uct_ud_ep_destroy_connected(uct_ud_ep_t *ep,
const uct_ib_address_t *ib_addr,
const uct_ud_iface_addr_t *if_addr)
{
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
uct_ud_iface_cep_rollback(iface, ib_addr, if_addr, ep);
uct_ud_ep_disconnect_from_iface(&ep->super.super);
}
ucs_status_t uct_ud_ep_connect_to_ep(uct_ud_ep_t *ep,
const uct_ib_address_t *ib_addr,
const uct_ud_ep_addr_t *ep_addr)
{
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
uct_ib_device_t UCS_V_UNUSED *dev = uct_ib_iface_device(&iface->super);
char buf[128];
ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID);
ucs_trace_func("");
ep->dest_ep_id = uct_ib_unpack_uint24(ep_addr->ep_id);
ucs_frag_list_cleanup(&ep->rx.ooo_pkts);
uct_ud_ep_reset(ep);
ucs_debug(UCT_IB_IFACE_FMT" slid %d qpn 0x%x epid %u connected to %s qpn 0x%x "
"epid %u", UCT_IB_IFACE_ARG(&iface->super),
dev->port_attr[iface->super.config.port_num - dev->first_port].lid,
iface->qp->qp_num, ep->ep_id,
uct_ib_address_str(ib_addr, buf, sizeof(buf)),
uct_ib_unpack_uint24(ep_addr->iface_addr.qp_num), ep->dest_ep_id);
return UCS_OK;
}
static UCS_F_ALWAYS_INLINE void
uct_ud_iface_add_async_comp(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
uct_ud_send_skb_t *skb, ucs_status_t status)
{
uct_ud_comp_desc_t *cdesc;
skb->status = status;
if (status != UCS_OK) {
if (!(skb->flags & UCT_UD_SEND_SKB_FLAG_COMP)) {
skb->len = 0;
}
if (status == UCS_ERR_ENDPOINT_TIMEOUT) {
skb->flags |= UCT_UD_SEND_SKB_FLAG_ERR;
++ep->tx.err_skb_count;
} else if (status == UCS_ERR_CANCELED) {
skb->flags |= UCT_UD_SEND_SKB_FLAG_CANCEL;
}
}
cdesc = uct_ud_comp_desc(skb);
    /* Don't call the user completion callback from the async context.
     * Instead, put it on a queue which is progressed from the main thread.
     */
ucs_queue_push(&iface->tx.async_comp_q, &skb->queue);
cdesc->ep = ep;
ep->flags |= UCT_UD_EP_FLAG_ASYNC_COMPS;
}
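/*
 * Worked example for uct_ud_ep_process_ack() below: if the tx window holds
 * skbs with PSNs [5, 6, 7] and an ack with ack_psn == 6 arrives, skbs 5 and
 * 6 are released back to the mpool (invoking completion callbacks for skbs
 * that carry one, possibly deferred to the async completion queue),
 * acked_psn becomes 6, and skb 7 stays in the window awaiting a further ack
 * or a resend.
 */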
static UCS_F_ALWAYS_INLINE void
uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
uct_ud_psn_t ack_psn, int is_async)
{
uct_ud_send_skb_t *skb;
if (ucs_unlikely(UCT_UD_PSN_COMPARE(ack_psn, <=, ep->tx.acked_psn))) {
return;
}
ep->tx.acked_psn = ack_psn;
/* Release acknowledged skb's */
ucs_queue_for_each_extract(skb, &ep->tx.window, queue,
UCT_UD_PSN_COMPARE(skb->neth->psn, <=, ack_psn)) {
if (ucs_unlikely(skb->flags & UCT_UD_SEND_SKB_FLAG_COMP)) {
if (ucs_unlikely(is_async)) {
uct_ud_iface_add_async_comp(iface, ep, skb, UCS_OK);
continue;
}
uct_invoke_completion(uct_ud_comp_desc(skb)->comp, UCS_OK);
}
skb->flags = 0; /* reset also ACK_REQ flag */
ucs_mpool_put(skb);
}
uct_ud_ep_ca_ack(ep);
if (ucs_unlikely(UCT_UD_PSN_COMPARE(ep->resend.psn, <=, ep->resend.max_psn))) {
uct_ud_ep_resend_ack(iface, ep);
}
ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
ep->tx.slow_tick = iface->async.slow_tick;
ep->tx.send_time = uct_ud_iface_get_async_time(iface);
}
static inline void uct_ud_ep_rx_put(uct_ud_neth_t *neth, unsigned byte_len)
{
uct_ud_put_hdr_t *put_hdr;
put_hdr = (uct_ud_put_hdr_t *)(neth+1);
memcpy((void *)put_hdr->rva, put_hdr+1,
byte_len - sizeof(*neth) - sizeof(*put_hdr));
}
static uct_ud_ep_t *uct_ud_ep_create_passive(uct_ud_iface_t *iface, uct_ud_ctl_hdr_t *ctl)
{
uct_ep_params_t params;
uct_ud_ep_t *ep;
ucs_status_t status;
uct_ep_t *ep_h;
/* create new endpoint */
params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
params.iface = &iface->super.super.super;
    status = uct_ep_create(&params, &ep_h);
ucs_assert_always(status == UCS_OK);
ep = ucs_derived_of(ep_h, uct_ud_ep_t);
status = uct_ep_connect_to_ep(ep_h, (void*)uct_ud_creq_ib_addr(ctl),
(void*)&ctl->conn_req.ep_addr);
ucs_assert_always(status == UCS_OK);
status = uct_ud_iface_cep_insert(iface, uct_ud_creq_ib_addr(ctl),
&ctl->conn_req.ep_addr.iface_addr,
ep, ctl->conn_req.conn_id);
ucs_assert_always(status == UCS_OK);
return ep;
}
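/*
 * Connection establishment, as handled below: side A sends a CREQ carrying
 * its ep/iface address; side B either creates a passive ep or matches an
 * existing one, and schedules a CREP that carries its src_ep_id back. If
 * both sides send CREQs simultaneously, each side treats the incoming CREQ
 * as an implicit ack of its own CREQ (see the uct_ud_ep_process_ack call
 * below) and still replies with a CREP.
 */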
static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
{
uct_ud_ep_t *ep;
uct_ud_ctl_hdr_t *ctl = (uct_ud_ctl_hdr_t *)(neth + 1);
ucs_assert_always(ctl->type == UCT_UD_PACKET_CREQ);
ep = uct_ud_iface_cep_lookup(iface, uct_ud_creq_ib_addr(ctl),
&ctl->conn_req.ep_addr.iface_addr,
ctl->conn_req.conn_id);
if (!ep) {
ep = uct_ud_ep_create_passive(iface, ctl);
ucs_assert_always(ep != NULL);
ep->rx.ooo_pkts.head_sn = neth->psn;
uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_PRIVATE);
} else {
if (ep->dest_ep_id == UCT_UD_EP_NULL_ID) {
            /* simultaneous CREQ */
ep->dest_ep_id = uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id);
ep->rx.ooo_pkts.head_sn = neth->psn;
uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
ucs_debug("simultanuous CREQ ep=%p"
"(iface=%p conn_id=%d ep_id=%d, dest_ep_id=%d rx_psn=%u)",
ep, iface, ep->conn_id, ep->ep_id,
ep->dest_ep_id, ep->rx.ooo_pkts.head_sn);
if (UCT_UD_PSN_COMPARE(ep->tx.psn, >, UCT_UD_INITIAL_PSN)) {
/* our own creq was sent, treat incoming creq as ack and remove our own
* from tx window
*/
uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0);
}
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
}
}
++ep->rx_creq_count;
ucs_assert_always(ctl->conn_req.conn_id == ep->conn_id);
ucs_assert_always(uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id) == ep->dest_ep_id);
    /* a CREQ must always carry the same psn */
ucs_assertv_always(ep->rx.ooo_pkts.head_sn == neth->psn,
"iface=%p ep=%p conn_id=%d ep_id=%d, dest_ep_id=%d rx_psn=%u "
"neth_psn=%u ep_flags=0x%x ctl_ops=0x%x rx_creq_count=%d",
iface, ep, ep->conn_id, ep->ep_id, ep->dest_ep_id,
ep->rx.ooo_pkts.head_sn, neth->psn, ep->flags,
ep->tx.pending.ops, ep->rx_creq_count);
    /* schedule connection reply op */
UCT_UD_EP_HOOK_CALL_RX(ep, neth, sizeof(*neth) + sizeof(*ctl));
if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_NOTSENT);
}
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_RCVD);
}
static void uct_ud_ep_rx_ctl(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
uct_ud_neth_t *neth, uct_ud_recv_skb_t *skb)
{
uct_ud_ctl_hdr_t *ctl = (uct_ud_ctl_hdr_t*)(neth + 1);
ucs_trace_func("");
ucs_assert_always(ctl->type == UCT_UD_PACKET_CREP);
ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID ||
ep->dest_ep_id == ctl->conn_rep.src_ep_id);
/* Discard duplicate CREP */
if (UCT_UD_PSN_COMPARE(neth->psn, <, ep->rx.ooo_pkts.head_sn)) {
return;
}
ep->rx.ooo_pkts.head_sn = neth->psn;
ep->dest_ep_id = ctl->conn_rep.src_ep_id;
ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREP_RCVD);
}
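/*
 * CREQ wire layout produced by the function below:
 *
 *   [ uct_ud_neth_t | uct_ud_ctl_hdr_t | device (IB) address ]
 *
 * which matches skb->len = sizeof(*neth) + sizeof(*creq) +
 * iface->super.addr_size.
 */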
uct_ud_send_skb_t *uct_ud_ep_prepare_creq(uct_ud_ep_t *ep)
{
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
uct_ud_ctl_hdr_t *creq;
uct_ud_send_skb_t *skb;
uct_ud_neth_t *neth;
ucs_status_t status;
ucs_assert_always(ep->dest_ep_id == UCT_UD_EP_NULL_ID);
ucs_assert_always(ep->ep_id != UCT_UD_EP_NULL_ID);
    /* CREQ should not be sent if a CREP for the peer's CREQ is scheduled
     * (or was already sent) */
ucs_assertv_always(!uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREP) &&
!(ep->flags & UCT_UD_EP_FLAG_CREP_SENT),
"iface=%p ep=%p conn_id=%d rx_psn=%u ep_flags=0x%x "
"ctl_ops=0x%x rx_creq_count=%d",
iface, ep, ep->conn_id, ep->rx.ooo_pkts.head_sn,
ep->flags, ep->tx.pending.ops, ep->rx_creq_count);
skb = uct_ud_iface_get_tx_skb(iface, ep);
if (!skb) {
return NULL;
}
neth = skb->neth;
uct_ud_neth_init_data(ep, neth);
neth->packet_type = UCT_UD_EP_NULL_ID;
neth->packet_type |= UCT_UD_PACKET_FLAG_CTL;
creq = (uct_ud_ctl_hdr_t *)(neth + 1);
creq->type = UCT_UD_PACKET_CREQ;
creq->conn_req.conn_id = ep->conn_id;
status = uct_ud_ep_get_address(&ep->super.super,
(void*)&creq->conn_req.ep_addr);
if (status != UCS_OK) {
return NULL;
}
status = uct_ib_iface_get_device_address(&iface->super.super.super,
(uct_device_addr_t*)uct_ud_creq_ib_addr(creq));
if (status != UCS_OK) {
return NULL;
}
uct_ud_peer_name(ucs_unaligned_ptr(&creq->peer));
skb->len = sizeof(*neth) + sizeof(*creq) + iface->super.addr_size;
return skb;
}
void uct_ud_ep_process_rx(uct_ud_iface_t *iface, uct_ud_neth_t *neth, unsigned byte_len,
uct_ud_recv_skb_t *skb, int is_async)
{
uint32_t dest_id;
uint32_t is_am, am_id;
    uct_ud_ep_t *ep = NULL; /* TODO: check why gcc complains about an uninitialized var */
ucs_frag_list_ooo_type_t ooo_type;
UCT_UD_IFACE_HOOK_CALL_RX(iface, neth, byte_len);
dest_id = uct_ud_neth_get_dest_id(neth);
am_id = uct_ud_neth_get_am_id(neth);
is_am = neth->packet_type & UCT_UD_PACKET_FLAG_AM;
if (ucs_unlikely(dest_id == UCT_UD_EP_NULL_ID)) {
/* must be connection request packet */
uct_ud_ep_rx_creq(iface, neth);
goto out;
} else if (ucs_unlikely(!ucs_ptr_array_lookup(&iface->eps, dest_id, ep) ||
ep->ep_id != dest_id))
{
        /* Drop the packet: disconnect is allowed without a flush/barrier,
         * so it is possible to receive a packet for an ep that has already
         * been destroyed.
         */
ucs_trace("RX: failed to find ep %d, dropping packet", dest_id);
goto out;
}
ucs_assert(ep->ep_id != UCT_UD_EP_NULL_ID);
UCT_UD_EP_HOOK_CALL_RX(ep, neth, byte_len);
uct_ud_ep_process_ack(iface, ep, neth->ack_psn, is_async);
if (ucs_unlikely(neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ)) {
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK);
ucs_trace_data("ACK_REQ - schedule ack, head_sn=%d sn=%d",
ep->rx.ooo_pkts.head_sn, neth->psn);
}
if (ucs_unlikely(!is_am)) {
if ((size_t)byte_len == sizeof(*neth)) {
goto out;
}
if (neth->packet_type & UCT_UD_PACKET_FLAG_CTL) {
uct_ud_ep_rx_ctl(iface, ep, neth, skb);
goto out;
}
}
ooo_type = ucs_frag_list_insert(&ep->rx.ooo_pkts, &skb->u.ooo.elem, neth->psn);
if (ucs_unlikely(ooo_type != UCS_FRAG_LIST_INSERT_FAST)) {
if (ooo_type != UCS_FRAG_LIST_INSERT_DUP &&
ooo_type != UCS_FRAG_LIST_INSERT_FAIL) {
ucs_fatal("Out of order is not implemented: got %d", ooo_type);
}
ucs_trace_data("DUP/OOB - schedule ack, head_sn=%d sn=%d",
ep->rx.ooo_pkts.head_sn, neth->psn);
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_ACK);
goto out;
}
if (ucs_unlikely(!is_am && (neth->packet_type & UCT_UD_PACKET_FLAG_PUT))) {
/* TODO: remove once ucp implements put */
uct_ud_ep_rx_put(neth, byte_len);
goto out;
}
if (ucs_unlikely(is_async &&
!(iface->super.super.am[am_id].flags & UCT_CB_FLAG_ASYNC))) {
skb->u.am.len = byte_len - sizeof(*neth);
ucs_queue_push(&iface->rx.pending_q, &skb->u.am.queue);
} else {
        /* Avoid reordering with respect to pending operations if the user AM
         * handler initiates sends from any endpoint created on the iface.
         * The flag is cleared after all incoming messages are processed. */
uct_ud_iface_raise_pending_async_ev(iface);
uct_ib_iface_invoke_am_desc(&iface->super, am_id, neth + 1,
byte_len - sizeof(*neth), &skb->super);
}
return;
out:
ucs_mpool_put(skb);
}
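/*
 * Flush protocol, as implemented below: if the window is non-empty, an
 * ACK_REQ is forced for the last in-flight skb and, when the caller passed
 * a completion, a dummy skb with the same PSN is appended to the window.
 * The dummy skb is never transmitted (its dest id is UCT_UD_EP_NULL_ID and
 * the resend path skips it); it exists only so that the user completion
 * fires once that PSN is acked.
 */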
ucs_status_t uct_ud_ep_flush_nolock(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
uct_completion_t *comp)
{
uct_ud_send_skb_t *skb;
uct_ud_psn_t psn;
if (ucs_unlikely(!uct_ud_ep_is_connected(ep))) {
        /* check for a CREQ either being scheduled, or being sent and waiting
         * for the CREP ack */
if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ) ||
!ucs_queue_is_empty(&ep->tx.window))
{
return UCS_ERR_NO_RESOURCE; /* connection in progress */
}
return UCS_OK; /* Nothing was ever sent */
}
if (!uct_ud_iface_can_tx(iface) || !uct_ud_iface_has_skbs(iface) ||
uct_ud_ep_no_window(ep))
{
/* iface/ep has no resources, prevent reordering with possible pending
* operations by not starting the flush.
*/
return UCS_ERR_NO_RESOURCE;
}
if (ucs_queue_is_empty(&ep->tx.window)) {
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);
        /* Check if there are pending async completions for this ep:
         * if not - everything was acknowledged, nothing is pending - return OK;
         * if yes - continue and queue one more completion.
         */
if (!(ep->flags & UCT_UD_EP_FLAG_ASYNC_COMPS)) {
return UCS_OK;
}
/*
* If we have pending async completion, and the user requested a callback,
* add a new async completion in the queue.
*/
if (comp != NULL) {
skb = ucs_mpool_get(&iface->tx.mp);
if (skb == NULL) {
return UCS_ERR_NO_RESOURCE;
}
skb->flags = UCT_UD_SEND_SKB_FLAG_COMP;
skb->len = 0;
uct_ud_comp_desc(skb)->comp = comp;
uct_ud_comp_desc(skb)->ep = ep;
ucs_queue_push(&iface->tx.async_comp_q, &skb->queue);
}
} else {
skb = ucs_queue_tail_elem_non_empty(&ep->tx.window, uct_ud_send_skb_t, queue);
psn = skb->neth->psn;
if (!(skb->flags & UCT_UD_SEND_SKB_FLAG_ACK_REQ)) {
            /* If we didn't ask for an ACK on the last skb, send an ACK_REQ
             * message. It speeds up the flush because we will not have to
             * wait until a retransmit is triggered.
             * Also, prevent sending more control messages like this one by
             * setting the flag on the last skb.
             */
/* Since the function can be called from the arbiter context it is
* impossible to schedule a control operation. So just raise a
* flag and if there is no other control send ACK_REQ directly.
*
* If there is other control arbiter will take care of it.
*/
ep->tx.pending.ops |= UCT_UD_EP_OP_ACK_REQ;
if (uct_ud_ep_ctl_op_check_ex(ep, UCT_UD_EP_OP_ACK_REQ)) {
uct_ud_ep_do_pending_ctl(ep, iface);
}
skb->flags |= UCT_UD_SEND_SKB_FLAG_ACK_REQ;
}
/* If the user requested a callback, add a dummy skb to the window which
* will be released when the current sequence number is acknowledged.
*/
if (comp != NULL) {
skb = ucs_mpool_get(&iface->tx.mp);
if (skb == NULL) {
return UCS_ERR_NO_RESOURCE;
}
/* Add dummy skb to the window, which would call user completion
* callback when getting ACK.
*/
skb->flags = UCT_UD_SEND_SKB_FLAG_COMP;
skb->len = sizeof(skb->neth[0]);
skb->neth->packet_type = 0;
skb->neth->psn = psn;
uct_ud_comp_desc(skb)->comp = comp;
ucs_assert(psn == (uct_ud_psn_t)(ep->tx.psn - 1));
uct_ud_neth_set_dest_id(skb->neth, UCT_UD_EP_NULL_ID);
ucs_queue_push(&ep->tx.window, &skb->queue);
ucs_trace_data("added dummy flush skb %p psn %d user_comp %p", skb,
skb->neth->psn, comp);
}
}
return UCS_INPROGRESS;
}
void uct_ud_tx_wnd_purge_outstanding(uct_ud_iface_t *iface, uct_ud_ep_t *ud_ep,
ucs_status_t status)
{
uct_ud_send_skb_t *skb;
uct_ud_ep_tx_stop(ud_ep);
ucs_queue_for_each_extract(skb, &ud_ep->tx.window, queue, 1) {
uct_ud_iface_add_async_comp(iface, ud_ep, skb, status);
}
}
ucs_status_t uct_ud_ep_flush(uct_ep_h ep_h, unsigned flags,
uct_completion_t *comp)
{
ucs_status_t status;
uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t);
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_ud_iface_t);
uct_ud_enter(iface);
if (ucs_unlikely(flags & UCT_FLUSH_FLAG_CANCEL)) {
uct_ud_tx_wnd_purge_outstanding(iface, ep, UCS_ERR_CANCELED);
uct_ud_iface_dispatch_zcopy_comps(iface);
uct_ep_pending_purge(ep_h, NULL, 0);
/* Open window after cancellation for next sending */
uct_ud_ep_ca_ack(ep);
status = UCS_OK;
goto out;
}
if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface))) {
status = UCS_ERR_NO_RESOURCE;
goto out;
}
status = uct_ud_ep_flush_nolock(iface, ep, comp);
if (status == UCS_OK) {
UCT_TL_EP_STAT_FLUSH(&ep->super);
} else if (status == UCS_INPROGRESS) {
UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
}
out:
uct_ud_leave(iface);
return status;
}
static uct_ud_send_skb_t *uct_ud_ep_prepare_crep(uct_ud_ep_t *ep)
{
uct_ud_send_skb_t *skb;
uct_ud_neth_t *neth;
uct_ud_ctl_hdr_t *crep;
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
ucs_assert_always(ep->dest_ep_id != UCT_UD_EP_NULL_ID);
ucs_assert_always(ep->ep_id != UCT_UD_EP_NULL_ID);
    /* Check that a CREQ is neither scheduled nor waiting for a CREP ack */
ucs_assertv_always(!uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ) &&
ucs_queue_is_empty(&ep->tx.window),
"iface=%p ep=%p conn_id=%d ep_id=%d, dest_ep_id=%d rx_psn=%u "
"ep_flags=0x%x ctl_ops=0x%x rx_creq_count=%d",
iface, ep, ep->conn_id, ep->ep_id, ep->dest_ep_id,
ep->rx.ooo_pkts.head_sn, ep->flags, ep->tx.pending.ops,
ep->rx_creq_count);
skb = uct_ud_iface_get_tx_skb(iface, ep);
if (!skb) {
return NULL;
}
neth = skb->neth;
uct_ud_neth_init_data(ep, neth);
neth->packet_type = ep->dest_ep_id;
neth->packet_type |= (UCT_UD_PACKET_FLAG_ACK_REQ|UCT_UD_PACKET_FLAG_CTL);
crep = (uct_ud_ctl_hdr_t *)(neth + 1);
crep->type = UCT_UD_PACKET_CREP;
crep->conn_rep.src_ep_id = ep->ep_id;
uct_ud_peer_name(ucs_unaligned_ptr(&crep->peer));
skb->len = sizeof(*neth) + sizeof(*crep);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREP);
return skb;
}
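/*
 * Resend walk, as implemented below: resend.pos iterates over the tx window
 * starting from the oldest unacked skb; each call copies one saved skb into
 * a scratch skb (zcopy payloads are flattened inline), refreshes ack_psn,
 * and requests an ack every UCT_UD_RESENDS_PER_ACK packets so that progress
 * can be detected without waiting for the whole window to drain.
 */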
static uct_ud_send_skb_t *uct_ud_ep_resend(uct_ud_ep_t *ep)
{
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface, uct_ud_iface_t);
uct_ud_send_skb_t *skb, *sent_skb;
ucs_queue_iter_t resend_pos;
uct_ud_zcopy_desc_t *zdesc;
size_t iov_it;
/* check window */
resend_pos = (void*)ep->resend.pos;
sent_skb = ucs_queue_iter_elem(sent_skb, resend_pos, queue);
if (sent_skb == NULL) {
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
return NULL;
}
ucs_assert(((uintptr_t)sent_skb % UCT_UD_SKB_ALIGN) == 0);
if (UCT_UD_PSN_COMPARE(sent_skb->neth->psn, >=, ep->tx.max_psn)) {
ucs_debug("ep(%p): out of window(psn=%d/max_psn=%d) - can not resend more",
ep, sent_skb ? sent_skb->neth->psn : -1, ep->tx.max_psn);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
return NULL;
}
/* skip dummy skb created for non-blocking flush */
if ((uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) &&
!(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_CTL))
{
ep->resend.pos = ucs_queue_iter_next(resend_pos);
return NULL;
}
    /* CREQ/CREP handling must have removed the CREQ packet from the window */
ucs_assertv_always(!(uct_ud_ep_is_connected(ep) &&
(uct_ud_neth_get_dest_id(sent_skb->neth) == UCT_UD_EP_NULL_ID) &&
!(sent_skb->neth->packet_type & UCT_UD_PACKET_FLAG_AM)),
"ep(%p): CREQ resend on endpoint which is already connected", ep);
skb = uct_ud_iface_resend_skb_get(iface);
ucs_assert_always(skb != NULL);
ep->resend.pos = ucs_queue_iter_next(resend_pos);
ep->resend.psn = sent_skb->neth->psn;
memcpy(skb->neth, sent_skb->neth, sent_skb->len);
skb->neth->ack_psn = ep->rx.acked_psn;
skb->len = sent_skb->len;
if (sent_skb->flags & UCT_UD_SEND_SKB_FLAG_ZCOPY) {
zdesc = uct_ud_zcopy_desc(sent_skb);
for (iov_it = 0; iov_it < zdesc->iovcnt; ++iov_it) {
if (zdesc->iov[iov_it].length > 0) {
memcpy((char *)skb->neth + skb->len, zdesc->iov[iov_it].buffer,
zdesc->iov[iov_it].length);
skb->len += zdesc->iov[iov_it].length;
}
}
}
    /* force an ack request every UCT_UD_RESENDS_PER_ACK packets, or on the
     * first packet in the resend window */
if ((skb->neth->psn % UCT_UD_RESENDS_PER_ACK) == 0 ||
UCT_UD_PSN_COMPARE(skb->neth->psn, ==, ep->tx.acked_psn+1)) {
skb->neth->packet_type |= UCT_UD_PACKET_FLAG_ACK_REQ;
} else {
skb->neth->packet_type &= ~UCT_UD_PACKET_FLAG_ACK_REQ;
}
ucs_debug("ep(%p): resending rt_psn %u rt_max_psn %u acked_psn %u max_psn %u ack_req %d",
ep, ep->resend.psn, ep->resend.max_psn,
ep->tx.acked_psn, ep->tx.max_psn,
skb->neth->packet_type&UCT_UD_PACKET_FLAG_ACK_REQ ? 1 : 0);
if (UCT_UD_PSN_COMPARE(ep->resend.psn, ==, ep->resend.max_psn)) {
ucs_debug("ep(%p): resending completed", ep);
ep->resend.psn = ep->resend.max_psn + 1;
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_RESEND);
}
return skb;
}
static void uct_ud_ep_do_pending_ctl(uct_ud_ep_t *ep, uct_ud_iface_t *iface)
{
uct_ud_send_skb_t *skb;
int flag = 0;
if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREQ)) {
skb = uct_ud_ep_prepare_creq(ep);
if (skb) {
flag = 1;
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREQ_SENT);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREQ);
}
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CREP)) {
skb = uct_ud_ep_prepare_crep(ep);
if (skb) {
flag = 1;
uct_ud_ep_set_state(ep, UCT_UD_EP_FLAG_CREP_SENT);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_CREP);
}
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_RESEND)) {
skb = uct_ud_ep_resend(ep);
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK)) {
if (uct_ud_ep_is_connected(ep)) {
if (iface->config.max_inline >= sizeof(uct_ud_neth_t)) {
skb = ucs_unaligned_ptr(&iface->tx.skb_inl.super);
} else {
skb = uct_ud_iface_resend_skb_get(iface);
skb->len = sizeof(uct_ud_neth_t);
}
uct_ud_neth_ctl_ack(ep, skb->neth);
} else {
            /* Do not send ACKs if not connected yet. This may happen if the
             * CREQ and the CREP from the peer were lost. We need to wait for
             * the peer to resend the CREP. */
skb = NULL;
}
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK);
} else if (uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_ACK_REQ)) {
if (iface->config.max_inline >= sizeof(uct_ud_neth_t)) {
skb = ucs_unaligned_ptr(&iface->tx.skb_inl.super);
} else {
skb = uct_ud_iface_resend_skb_get(iface);
skb->len = sizeof(uct_ud_neth_t);
}
uct_ud_neth_ctl_ack_req(ep, skb->neth);
uct_ud_ep_ctl_op_del(ep, UCT_UD_EP_OP_ACK_REQ);
} else if (uct_ud_ep_ctl_op_isany(ep)) {
ucs_fatal("unsupported pending op mask: %x", ep->tx.pending.ops);
} else {
        skb = NULL;
}
if (!skb) {
/* no pending - nothing to do */
return;
}
VALGRIND_MAKE_MEM_DEFINED(skb, sizeof *skb);
ucs_derived_of(iface->super.ops, uct_ud_iface_ops_t)->tx_skb(ep, skb, flag);
if (flag) {
        /* CREQ and CREP allocate a real skb; it must be put on the window
         * like a regular packet to ensure retransmission.
         */
uct_ud_iface_complete_tx_skb(iface, ep, skb);
} else {
uct_ud_iface_resend_skb_put(iface, skb);
}
}
static inline ucs_arbiter_cb_result_t
uct_ud_ep_ctl_op_next(uct_ud_ep_t *ep)
{
if (uct_ud_ep_ctl_op_isany(ep)) {
/* can send more control - come here later */
return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
}
/* no more control - nothing to do in
* this dispatch cycle. */
return UCS_ARBITER_CB_RESULT_RESCHED_GROUP;
}
/**
* pending operations are processed according to priority:
* - high prio control:
* - creq request
* - crep reply
* - resends
* - pending uct requests
* - low prio control: ack reply/ack requests
*
 * Low priority control can be sent along with user data, so
 * there is a good chance that processing pending uct requests will
 * also deal with the low priority control.
 * However, we cannot let pending uct requests block control forever.
*/
ucs_arbiter_cb_result_t
uct_ud_ep_do_pending(ucs_arbiter_t *arbiter, ucs_arbiter_elem_t *elem,
void *arg)
{
uct_pending_req_t *req = ucs_container_of(elem, uct_pending_req_t,
priv);
uct_ud_ep_t *ep = ucs_container_of(ucs_arbiter_elem_group(elem),
uct_ud_ep_t,
tx.pending.group);
uct_ud_iface_t *iface = ucs_container_of(arbiter, uct_ud_iface_t,
tx.pending_q);
uintptr_t in_async_progress = (uintptr_t)arg;
int allow_callback;
int async_before_pending;
ucs_status_t status;
    /* check if we have global resources:
     * - tx_wqe
     * - skb
     * control messages do not need an skb.
     */
if (!uct_ud_iface_can_tx(iface)) {
return UCS_ARBITER_CB_RESULT_STOP;
}
    /* Here we rely on the fact that the arbiter will start the next
     * dispatch cycle from the next group, so it is ok to stop if there
     * is no ctl. However, in the worst case only one ctl per dispatch
     * cycle will be sent.
     */
if (!uct_ud_iface_has_skbs(iface) && !uct_ud_ep_ctl_op_isany(ep)) {
return UCS_ARBITER_CB_RESULT_STOP;
}
    /* We can deschedule the group iff:
     * - there is no pending control
     * - the ep has no resources (not connected yet, or no window)
     */
if (!uct_ud_ep_ctl_op_isany(ep) &&
(!uct_ud_ep_is_connected(ep) ||
uct_ud_ep_no_window(ep))) {
return UCS_ARBITER_CB_RESULT_DESCHED_GROUP;
}
if (&ep->tx.pending.elem == elem) {
uct_ud_ep_do_pending_ctl(ep, iface);
if (uct_ud_ep_ctl_op_isany(ep)) {
/* there is still some ctl left. go to next group */
return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
} else {
/* no more ctl - dummy elem can be removed */
return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
}
    /* A user pending request can be sent iff:
     * - we are not in async progress
     * - there are no high priority pending control messages
     */
allow_callback = !in_async_progress ||
(uct_ud_pending_req_priv(req)->flags & UCT_CB_FLAG_ASYNC);
if (allow_callback && !uct_ud_ep_ctl_op_check(ep, UCT_UD_EP_OP_CTL_HI_PRIO)) {
ucs_assert(!(ep->flags & UCT_UD_EP_FLAG_IN_PENDING));
ep->flags |= UCT_UD_EP_FLAG_IN_PENDING;
async_before_pending = iface->tx.async_before_pending;
if (uct_ud_pending_req_priv(req)->flags & UCT_CB_FLAG_ASYNC) {
/* temporary reset the flag to unblock sends from async context */
iface->tx.async_before_pending = 0;
}
status = req->func(req);
iface->tx.async_before_pending = async_before_pending;
ep->flags &= ~UCT_UD_EP_FLAG_IN_PENDING;
if (status == UCS_INPROGRESS) {
return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
} else if (status != UCS_OK) {
            /* Avoid a deadlock: send low priority ctl if the user callback
             * failed. No need to check for low priority ctl here because we
             * already checked above.
             */
uct_ud_ep_do_pending_ctl(ep, iface);
return uct_ud_ep_ctl_op_next(ep);
}
return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
/* try to send ctl messages */
uct_ud_ep_do_pending_ctl(ep, iface);
if (in_async_progress) {
return uct_ud_ep_ctl_op_next(ep);
} else {
/* we still didn't process the current pending request because of hi-prio
* control messages, so cannot stop sending yet. If we stop, not all
* resources will be exhausted and out-of-order with pending can occur.
* (pending control ops may be cleared by uct_ud_ep_do_pending_ctl)
*/
return UCS_ARBITER_CB_RESULT_NEXT_GROUP;
}
}
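/*
 * Expected usage of the pending API below (a hypothetical caller following
 * the generic UCT convention; resume_cb is an assumed user callback, not
 * code from this file):
 *
 *   status = uct_ep_am_short(ep, id, hdr, buf, len);
 *   if (status == UCS_ERR_NO_RESOURCE) {
 *       req->func = resume_cb;              // invoked later by the arbiter
 *       status = uct_ep_pending_add(ep, req, 0);
 *       // UCS_ERR_BUSY means resources reappeared - retry the send instead
 *   }
 */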
ucs_status_t uct_ud_ep_pending_add(uct_ep_h ep_h, uct_pending_req_t *req,
unsigned flags)
{
uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t);
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_ud_iface_t);
uct_ud_enter(iface);
    /* If there was async progress, all 'send' ops return
     * UCS_ERR_NO_RESOURCE. Returning UCS_ERR_BUSY in that case would
     * deadlock, so we must skip the resource check and add the pending
     * op unconditionally.
     */
if (ucs_unlikely(uct_ud_iface_has_pending_async_ev(iface))) {
goto add_req;
}
if (uct_ud_iface_can_tx(iface) &&
uct_ud_iface_has_skbs(iface) &&
uct_ud_ep_is_connected(ep) &&
!uct_ud_ep_no_window(ep)) {
uct_ud_leave(iface);
return UCS_ERR_BUSY;
}
add_req:
UCS_STATIC_ASSERT(sizeof(uct_ud_pending_req_priv_t) <=
UCT_PENDING_REQ_PRIV_LEN);
uct_ud_pending_req_priv(req)->flags = flags;
uct_pending_req_arb_group_push(&ep->tx.pending.group, req);
ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
ucs_trace_data("ud ep %p: added pending req %p tx_psn %d acked_psn %d cwnd %d",
ep, req, ep->tx.psn, ep->tx.acked_psn, ep->ca.cwnd);
UCT_TL_EP_STAT_PEND(&ep->super);
uct_ud_leave(iface);
return UCS_OK;
}
static ucs_arbiter_cb_result_t
uct_ud_ep_pending_purge_cb(ucs_arbiter_t *arbiter, ucs_arbiter_elem_t *elem,
void *arg)
{
uct_ud_ep_t *ep = ucs_container_of(ucs_arbiter_elem_group(elem),
uct_ud_ep_t, tx.pending.group);
uct_purge_cb_args_t *cb_args = arg;
uct_pending_purge_callback_t cb = cb_args->cb;
uct_pending_req_t *req;
if (&ep->tx.pending.elem == elem) {
/* return ignored by arbiter */
return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
req = ucs_container_of(elem, uct_pending_req_t, priv);
if (cb) {
cb(req, cb_args->arg);
} else {
ucs_debug("ep=%p cancelling user pending request %p", ep, req);
}
/* return ignored by arbiter */
return UCS_ARBITER_CB_RESULT_REMOVE_ELEM;
}
void uct_ud_ep_pending_purge(uct_ep_h ep_h, uct_pending_purge_callback_t cb,
void *arg)
{
uct_ud_ep_t *ep = ucs_derived_of(ep_h, uct_ud_ep_t);
uct_ud_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_ud_iface_t);
uct_purge_cb_args_t args = {cb, arg};
uct_ud_enter(iface);
ucs_arbiter_group_purge(&iface->tx.pending_q, &ep->tx.pending.group,
uct_ud_ep_pending_purge_cb, &args);
if (uct_ud_ep_ctl_op_isany(ep)) {
ucs_arbiter_group_push_elem(&ep->tx.pending.group,
&ep->tx.pending.elem);
ucs_arbiter_group_schedule(&iface->tx.pending_q, &ep->tx.pending.group);
}
uct_ud_leave(iface);
}
void uct_ud_ep_disconnect(uct_ep_h tl_ep)
{
uct_ud_ep_t *ep = ucs_derived_of(tl_ep, uct_ud_ep_t);
uct_ud_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_ud_iface_t);
ucs_debug("ep %p: disconnect", ep);
/* cancel user pending */
uct_ud_ep_pending_purge(tl_ep, NULL, NULL);
/* schedule flush */
uct_ud_ep_flush(tl_ep, 0, NULL);
/* the EP will be destroyed by interface destroy or timeout in
* uct_ud_ep_slow_timer
*/
ep->close_time = ucs_twheel_get_time(&iface->async.slow_timer);
ep->flags |= UCT_UD_EP_FLAG_DISCONNECTED;
ucs_wtimer_add(&iface->async.slow_timer, &ep->slow_timer,
UCT_UD_SLOW_TIMER_MAX_TICK(iface));
}