/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "ud_base.h"

#include <uct/uct_test.h>

extern "C" {
#include <ucs/time/time.h>
#include <ucs/datastruct/queue.h>
#include <ucs/arch/bitops.h>
#include <uct/ib/ud/base/ud_ep.h>
}


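/* UD transport unit tests. The tests below install the UD debug hooks
 * (ep->tx.tx_hook, ep->rx.rx_hook, and iface->rx.hook) to observe, count,
 * or drop individual packets on the wire, in order to exercise
 * acknowledgment, retransmission, congestion avoidance, and
 * connection-establishment corner cases. */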
class test_ud : public ud_base_test {
public:

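    /* TX hook: strip the ACK_REQ flag from an outgoing packet so that the
     * receiver never sends an explicit ACK for it */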
    static ucs_status_t clear_ack_req(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        neth->packet_type &= ~UCT_UD_PACKET_FLAG_ACK_REQ;
        return UCS_OK;
    }

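    /* RX hook: drop incoming control (CTL) packets; returning UCS_ERR_BUSY
     * makes the iface discard the packet */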
    static ucs_status_t drop_ctl(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        if (neth->packet_type & UCT_UD_PACKET_FLAG_CTL) {
            return UCS_ERR_BUSY;
        }
        return UCS_OK;
    }

    static int rx_ack_count;
    static int tx_ackreq_psn;

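    /* RX hook: count ACKs that advance the sender's acked_psn
     * (UCT_UD_PSN_COMPARE compares PSNs with wrap-around handling) */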
    static ucs_status_t count_rx_acks(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        if (UCT_UD_PSN_COMPARE(neth->ack_psn, >, ep->tx.acked_psn)) {
            rx_ack_count++;
        }
        return UCS_OK;
    }

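    /* TX hook: remember the PSN of the last packet sent with ACK_REQ set */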
    static ucs_status_t save_tx_ackreqs(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        if (neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ) {
            tx_ackreq_psn = neth->psn;
        }
        return UCS_OK;
    }

    static int rx_drop_count;

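    /* RX hook: drop every incoming packet, counting the drops and recording
     * the PSN of packets that requested an ACK */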
    static ucs_status_t drop_rx(uct_ud_ep_t *ep, uct_ud_neth_t *neth) {
        rx_drop_count++;
        if (neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ) {
            tx_ack_psn = neth->psn;
            ack_req_tx_cnt++;
            ucs_debug("RX: psn %u ack_req", neth->psn);
        }
        return UCS_ERR_BUSY;
    }

    static int ack_req_tx_cnt;

    static uct_ud_psn_t tx_ack_psn;

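    /* TX/RX hook: count packets carrying ACK_REQ and record their PSN;
     * installed as a TX hook on the sender and as an RX hook on the peer */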
    static ucs_status_t ack_req_count_tx(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        if (neth->packet_type & UCT_UD_PACKET_FLAG_ACK_REQ) {
            tx_ack_psn = neth->psn;
            ack_req_tx_cnt++;
        }
        return UCS_OK;
    }

    static int tx_count;

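    /* TX hook: count every transmitted packet */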
    static ucs_status_t count_tx(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        tx_count++;
        return UCS_OK;
    }

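    /* TX hook: replace the NULL destination id of an outgoing CREQ with a
     * bogus ep id, so the peer cannot match it (simulates CREQ loss) */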
    static ucs_status_t invalidate_creq_tx(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        if ((neth->packet_type & UCT_UD_PACKET_FLAG_CTL) &&
            (uct_ud_neth_get_dest_id(neth) == UCT_UD_EP_NULL_ID)) {
            uct_ud_neth_set_dest_id(neth, 0xbeef);
        }
        return UCS_OK;
    }

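    /* RX hook: drop pure ACK packets (anything that is neither CTL nor AM) */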
    static ucs_status_t drop_ack(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
    {
        if (!(neth->packet_type & (UCT_UD_PACKET_FLAG_CTL|UCT_UD_PACKET_FLAG_AM))) {
            return UCS_ERR_BUSY;
        }
        return UCS_OK;
    }

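    /* iface RX hook: drop only CREQ control packets */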
    static ucs_status_t drop_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
    {
        if ((neth->packet_type & UCT_UD_PACKET_FLAG_CTL) &&
            ((uct_ud_ctl_hdr_t *)(neth + 1))->type == UCT_UD_PACKET_CREQ)
        {
            return UCS_ERR_BUSY;
        }

        return UCS_OK;
    }

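    /* connect m_e1 and m_e2 to each other's iface, triggering the CREQ/CREP
     * wireup handshake from both sides */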
    void connect_to_iface(unsigned index = 0)
    {
        m_e1->connect_to_iface(index, *m_e2);
        m_e2->connect_to_iface(index, *m_e1);
    }

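    /* progress until the ep is connected to the expected remote ep id (or
     * until the timeout expires), then verify all connection ids */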
    void validate_connect(uct_ud_ep_t *ep, unsigned value,
                          double timeout_sec=TEST_UD_TIMEOUT_IN_SEC) {
        ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(timeout_sec);
        while ((ep->dest_ep_id != value) && (ucs_get_time() < timeout)) {
            progress();
        }
        EXPECT_EQ(value, ep->dest_ep_id);
        EXPECT_EQ(value, ep->conn_id);
        EXPECT_EQ(value, ep->ep_id);
    }

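    /* when the CREQ was never sent (CREQ_NOTSENT flag), one less packet
     * appears on the wire; this helper adjusts expected counters for that */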
    unsigned no_creq_cnt(uct_ud_ep_t *ep) {
        return (ep->flags & UCT_UD_EP_FLAG_CREQ_NOTSENT) ? 1 : 0;
    }

    void validate_send(uct_ud_ep_t *ep, unsigned value) {
        EXPECT_GE(ep->tx.acked_psn, value - no_creq_cnt(ep));
    }

    void validate_recv(uct_ud_ep_t *ep, unsigned value,
                       double timeout_sec=TEST_UD_TIMEOUT_IN_SEC) {
        ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(timeout_sec);
        while ((ucs_frag_list_sn(&ep->rx.ooo_pkts) < value - no_creq_cnt(ep)) &&
               (ucs_get_time() < timeout)) {
            progress();
        }
        EXPECT_EQ(value - no_creq_cnt(ep), ucs_frag_list_sn(&ep->rx.ooo_pkts));
    }

    void validate_flush() {
        /* 1 packet transmitted, 1 packet received */
        EXPECT_EQ(2, ep(m_e1)->tx.psn);
        EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e2)->rx.ooo_pkts));

        /* no data transmitted back */
        EXPECT_EQ(1, ep(m_e2)->tx.psn);

        /* one packet was acked */
        EXPECT_EQ(0U, ucs_queue_length(&ep(m_e1)->tx.window));
        EXPECT_EQ(1, ep(m_e1)->tx.acked_psn);
        EXPECT_EQ(1, ep(m_e2)->rx.acked_psn);
    }

    void check_connection() {
        /* make sure that connection is good */
        EXPECT_UCS_OK(tx(m_e1));
        EXPECT_UCS_OK(tx(m_e1));
        flush();
        EXPECT_EQ(4, ep(m_e1, 0)->tx.psn);
        EXPECT_EQ(3, ep(m_e1)->tx.acked_psn);
    }
};

int test_ud::ack_req_tx_cnt = 0;
int test_ud::rx_ack_count   = 0;
int test_ud::tx_ackreq_psn = 0;
int test_ud::rx_drop_count  = 0;
int test_ud::tx_count  = 0;

uct_ud_psn_t test_ud::tx_ack_psn = 0;

UCS_TEST_SKIP_COND_P(test_ud, basic_tx,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    unsigned i, N = 13;

    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    set_tx_win(m_e1, 1024);
    for (i = 0; i < N; i++) {
        EXPECT_UCS_OK(tx(m_e1));
    }
    short_progress_loop();

    /* N packets transmitted, N packets received */
    EXPECT_EQ(N+1, ep(m_e1)->tx.psn);
    validate_recv(ep(m_e2), N);

    /* no data transmitted back */
    EXPECT_EQ(1, ep(m_e2)->tx.psn);

    /* nothing was acked */
    EXPECT_EQ(N, ucs_queue_length(&ep(m_e1)->tx.window));
    EXPECT_EQ(0, ep(m_e1)->tx.acked_psn);
    EXPECT_EQ(0, ep(m_e2)->rx.acked_psn);
}

UCS_TEST_SKIP_COND_P(test_ud, duplex_tx,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    unsigned i, N = 5;

    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    set_tx_win(m_e1, 1024);
    set_tx_win(m_e2, 1024);
    for (i = 0; i < N; i++) {
        EXPECT_UCS_OK(tx(m_e1));
        short_progress_loop();
        EXPECT_UCS_OK(tx(m_e2));
        short_progress_loop();
    }

    /* N packets transmitted, N packets received */
    EXPECT_EQ(N+1, ep(m_e1)->tx.psn);
    validate_recv(ep(m_e2), N);

    EXPECT_EQ(N+1, ep(m_e2)->tx.psn);
    validate_recv(ep(m_e1), N);

    /* everything but the last packet from e2 is acked */
    EXPECT_EQ(N, ep(m_e1)->tx.acked_psn);
    EXPECT_EQ(N-1, ep(m_e2)->tx.acked_psn);
    EXPECT_EQ(N-1, ep(m_e1)->rx.acked_psn);
    EXPECT_EQ(N, ep(m_e2)->rx.acked_psn);
    EXPECT_EQ(1U, ucs_queue_length(&ep(m_e2)->tx.window));
    EXPECT_TRUE(ucs_queue_is_empty(&ep(m_e1)->tx.window));
}

/* send a full window, receive an ack after progress, then send some more */
UCS_TEST_SKIP_COND_P(test_ud, tx_window1,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    unsigned i, N = 13;

    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    set_tx_win(m_e1, N+1);
    for (i = 0; i < N; i++) {
        EXPECT_UCS_OK(tx(m_e1));
    }
    EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));

    /* wait for ack */
    ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(TEST_UD_TIMEOUT_IN_SEC);
    while ((ucs_get_time() < timeout) &&
            uct_ud_ep_no_window(ep(m_e1))) {
        short_progress_loop();
    }
    EXPECT_UCS_OK(tx(m_e1));
    EXPECT_UCS_OK(tx(m_e1));
    EXPECT_UCS_OK(tx(m_e1));
}

/* basic flush: send a packet, flush, and wait until the flush completes */

UCS_TEST_SKIP_COND_P(test_ud, flush_ep,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    connect();
    EXPECT_UCS_OK(tx(m_e1));
    EXPECT_UCS_OK(ep_flush_b(m_e1));

    validate_flush();
}

UCS_TEST_SKIP_COND_P(test_ud, flush_iface,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    connect();
    EXPECT_UCS_OK(tx(m_e1));
    EXPECT_UCS_OK(iface_flush_b(m_e1));

    validate_flush();
}

#if UCT_UD_EP_DEBUG_HOOKS
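
/* The tests below rely on the per-ep TX/RX debug hooks and are compiled
 * only when UCT_UD_EP_DEBUG_HOOKS is enabled */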

/* disable ack_req,
 * send a full window,
 * then verify that no more packets can be sent
 */
UCS_TEST_SKIP_COND_P(test_ud, tx_window2,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    unsigned i, N = 13;

    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    set_tx_win(m_e1, N+1);
    ep(m_e1)->tx.tx_hook = clear_ack_req;

    for (i = 0; i < N; i++) {
        EXPECT_UCS_OK(tx(m_e1));
    }
    EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));
    short_progress_loop();
    EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));
    EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));
    EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));
    EXPECT_EQ(N, ucs_queue_length(&ep(m_e1)->tx.window));
}


/* the last packet in the window must carry ack_req and be
 * answered with an ACK control message
 */
UCS_TEST_SKIP_COND_P(test_ud, ack_req_single,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    connect();
    disable_async(m_e1);
    disable_async(m_e2);
    set_tx_win(m_e1, 2);
    ack_req_tx_cnt = 0;
    tx_ack_psn = 0;
    rx_ack_count = 0;
    ep(m_e1)->tx.tx_hook = ack_req_count_tx;
    ep(m_e1)->rx.rx_hook = count_rx_acks;
    ep(m_e2)->rx.rx_hook = ack_req_count_tx;

    EXPECT_UCS_OK(tx(m_e1));
    EXPECT_EQ(1, ack_req_tx_cnt);
    EXPECT_EQ(1, tx_ack_psn);

    wait_for_flag(&rx_ack_count);
    EXPECT_EQ(2, ack_req_tx_cnt);
    EXPECT_EQ(1, tx_ack_psn);
    EXPECT_TRUE(ucs_queue_is_empty(&ep(m_e1)->tx.window));
}

/* test that ack request is sent on 1/4 of window */
UCS_TEST_SKIP_COND_P(test_ud, ack_req_window,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    unsigned i, N = 16;

    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    set_tx_win(m_e1, N);
    ack_req_tx_cnt = 0;
    tx_ack_psn = 0;
    rx_ack_count = 0;
    ep(m_e1)->tx.tx_hook = ack_req_count_tx;
    ep(m_e1)->rx.rx_hook = count_rx_acks;
    ep(m_e2)->rx.rx_hook = ack_req_count_tx;

    for (i = 0; i < N/4; i++) {
        EXPECT_UCS_OK(tx(m_e1));
    }
    EXPECT_EQ(1, ack_req_tx_cnt);
    EXPECT_EQ(N/4, tx_ack_psn);

    wait_for_flag(&rx_ack_count);
    EXPECT_EQ(2, ack_req_tx_cnt);
    EXPECT_EQ(N/4, tx_ack_psn);
    EXPECT_TRUE(ucs_queue_is_empty(&ep(m_e1)->tx.window));
}

/* simulate retransmission of the CREQ packet */
UCS_TEST_SKIP_COND_P(test_ud, crep_drop1,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    m_e1->connect_to_iface(0, *m_e2);
    /* set up a filter to drop the CREP */
    ep(m_e1, 0)->rx.rx_hook = drop_ctl;
    short_progress_loop(50);
    /* remove the filter and go to sleep; the CREQ will be retransmitted */
    ep(m_e1, 0)->rx.rx_hook = uct_ud_ep_null_hook;
    twait(500);

    /* after the CREQ is resent, the connection shall be fully functional */
    validate_connect(ep(m_e1), 0U);

    EXPECT_EQ(2, ep(m_e1, 0)->tx.psn);
    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));

    check_connection();
}

/* check that the CREQ is not left in the TX window if
 * both sides connect simultaneously.
 */
UCS_TEST_SKIP_COND_P(test_ud, crep_drop2,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    connect_to_iface();

    ep(m_e1)->rx.rx_hook = drop_ctl;
    ep(m_e2)->rx.rx_hook = drop_ctl;

    short_progress_loop(100);

    /* Remove the filter so the CREP is handled and the TX window is freed. */
    ep(m_e1)->rx.rx_hook = uct_ud_ep_null_hook;
    ep(m_e2)->rx.rx_hook = uct_ud_ep_null_hook;

    validate_connect(ep(m_e1), 0U);
    validate_connect(ep(m_e2), 0U);

    /* Expect that the CREQ (and possibly already the CREP) has been sent */
    validate_send(ep(m_e1), 1);
    validate_send(ep(m_e2), 1);
    EXPECT_GE(ep(m_e1)->tx.psn, 2);
    EXPECT_GE(ep(m_e2)->tx.psn, 2);

    /* Wait for the TX window to be empty (which means that all
     * CONN packets have been handled) */
    ucs_time_t timeout = ucs_get_time() + ucs_time_from_sec(TEST_UD_TIMEOUT_IN_SEC);
    while (ucs_get_time() < timeout) {
        if (ucs_queue_is_empty(&ep(m_e1)->tx.window) &&
           ucs_queue_is_empty(&ep(m_e2)->tx.window)) {
            break;
        }
        short_progress_loop();
    }
    EXPECT_TRUE(ucs_queue_is_empty(&ep(m_e1)->tx.window));
    EXPECT_TRUE(ucs_queue_is_empty(&ep(m_e2)->tx.window));
}

UCS_TEST_P(test_ud, crep_ack_drop) {
    ucs_status_t status;

    connect_to_iface();

    /* drop the ACKs for CREQ/CREP */
    ep(m_e1, 0)->rx.rx_hook = drop_ack;
    ep(m_e2, 0)->rx.rx_hook = drop_ack;

    short_progress_loop();

    status = uct_iface_set_am_handler(m_e2->iface(), 0,
                                      (uct_am_callback_t)ucs_empty_function_return_success,
                                      NULL, UCT_CB_FLAG_ASYNC);
    ASSERT_UCS_OK(status);

    /* allow sending the active message in case the congestion window was
     * already reduced to the minimum (=2) by the slow timer because the
     * CREP ACK was not received.
     */
    set_tx_win(m_e1, 10);

    do {
        status = send_am_message(m_e1);
        progress();
    } while (status == UCS_ERR_NO_RESOURCE);
    ASSERT_UCS_OK(status);

    validate_recv(ep(m_e2), 3u - no_creq_cnt(ep(m_e1)));

    ep(m_e1, 0)->rx.rx_hook = uct_ud_ep_null_hook;
    ep(m_e2, 0)->rx.rx_hook = uct_ud_ep_null_hook;

    /* Should receive both CREP and the active message */

    short_progress_loop();
    twait(500);
    short_progress_loop();

    status = send_am_message(m_e1);
    ASSERT_UCS_OK(status);

    short_progress_loop();

    m_e1->flush();
    m_e2->flush();
}

UCS_TEST_P(test_ud, creq_flush) {
    ucs_status_t status;

    m_e1->connect_to_iface(0, *m_e2);
    /* Set up a filter to drop all packets; we have to drop both CREP
     * and ACK_REQ packets. */
    ep(m_e1, 0)->rx.rx_hook = drop_rx;
    short_progress_loop();
    /* flush while the ep is still being connected; it must return UCS_INPROGRESS */
    status = uct_iface_flush(m_e1->iface(), 0, NULL);
    EXPECT_EQ(UCS_INPROGRESS, status);
}

UCS_TEST_SKIP_COND_P(test_ud, ca_ai,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    ucs_status_t status;
    int prev_cwnd;
    int max_window;

    /* check initial window */
    disable_async(m_e1);
    disable_async(m_e2);
    /* when on valgrind, only test up to a 'small' window:
     * valgrind drops RX packets when the window is too big, and resends
     * are disabled in this test
     */
    max_window = RUNNING_ON_VALGRIND ? 128 : UCT_UD_CA_MAX_WINDOW;
    connect();
    EXPECT_EQ(UCT_UD_CA_MIN_WINDOW, ep(m_e1)->ca.cwnd);
    EXPECT_EQ(UCT_UD_CA_MIN_WINDOW, ep(m_e2)->ca.cwnd);

    ep(m_e1, 0)->rx.rx_hook = count_rx_acks;
    ep(m_e1, 0)->tx.tx_hook = save_tx_ackreqs;
    prev_cwnd = ep(m_e1)->ca.cwnd;
    rx_ack_count = 0;

    /* the window should increase up to the max window
     * as ACKs are received */
    while (ep(m_e1)->ca.cwnd < max_window) {
       status = tx(m_e1);
       if (status != UCS_OK) {

           /* progress until getting all acks for our requests */
           do {
               progress();
           } while (UCT_UD_PSN_COMPARE(ep(m_e1)->tx.acked_psn, <, tx_ackreq_psn));

           /* it is possible to get no ACKs if the TX queue is full,
            * but no more than 2 ACKs per window:
            * one at 1/4 of the window and one at the end.
            *
            * every new ACK should cause a window increase
            */
           EXPECT_LE(rx_ack_count, 2);
           EXPECT_EQ(rx_ack_count,
                     UCT_UD_CA_AI_VALUE * (ep(m_e1)->ca.cwnd - prev_cwnd));
           prev_cwnd = ep(m_e1)->ca.cwnd;
           rx_ack_count = 0;
       }
    }
}

/* skip valgrind for now */
UCS_TEST_SKIP_COND_P(test_ud, ca_md,
                     (RUNNING_ON_VALGRIND ||
                      !check_caps(UCT_IFACE_FLAG_AM_SHORT)),
                     "IB_TX_QUEUE_LEN=" UCS_PP_MAKE_STRING(UCT_UD_CA_MAX_WINDOW)) {

    ucs_status_t status;
    int prev_cwnd, new_cwnd;
    int i;

    connect();

    validate_connect(ep(m_e1), 0U);

    /* assume we are at the max window and drop all packets on receive;
     * after several retransmission attempts the window will be reduced
     * to the minimum
     */
    set_tx_win(m_e1, UCT_UD_CA_MAX_WINDOW);
    ep(m_e2, 0)->rx.rx_hook = drop_rx;
    for (i = 1; i < UCT_UD_CA_MAX_WINDOW; i++) {
        status = tx(m_e1);
        if (status == UCS_ERR_NO_RESOURCE) {
            // the congestion window can be shrunk by the async timer if
            // ACKs are not received fast enough
            EXPECT_GT(i, 1); /* at least one packet should be sent */
            break;
        }
        EXPECT_UCS_OK(status);
        progress();
    }
    short_progress_loop();

    ep(m_e1)->tx.tx_hook = count_tx;
    do {
        prev_cwnd = ep(m_e1, 0)->ca.cwnd;
        tx_count = 0;
        do {
            progress();
        } while (ep(m_e1, 0)->ca.cwnd > (prev_cwnd / UCT_UD_CA_MD_FACTOR));
        short_progress_loop();

        new_cwnd = ep(m_e1, 0)->ca.cwnd;
        EXPECT_GE(tx_count, new_cwnd - 1);
        if (new_cwnd > UCT_UD_CA_MIN_WINDOW) {
            /* up to 3 additional ack_reqs per resend */
            EXPECT_LE(tx_count, (prev_cwnd - new_cwnd) +
                                (int)(3 * ucs_ilog2(prev_cwnd/new_cwnd)));
        }

    } while (ep(m_e1, 0)->ca.cwnd > UCT_UD_CA_MIN_WINDOW);
}

UCS_TEST_SKIP_COND_P(test_ud, ca_resend,
                     (RUNNING_ON_VALGRIND ||
                      !check_caps(UCT_IFACE_FLAG_AM_SHORT))) {

    int max_window = 10;
    int i;
    ucs_status_t status;

    connect();
    set_tx_win(m_e1, max_window);

    ep(m_e2, 0)->rx.rx_hook = drop_rx;
    for (i = 1; i < max_window; i++) {
        status = tx(m_e1);
        EXPECT_UCS_OK(status);
    }
    short_progress_loop();
    rx_drop_count = 0;
    ack_req_tx_cnt = 0;
    do {
        progress();
    } while (ep(m_e1)->ca.cwnd > max_window/2);
    /* expect that:
     * 4 packets will be retransmitted,
     * the first packet will have ack_req,
     * so there will be 2 ack_reqs;
     * in addition there may be up to two
     * standalone ack_reqs
     */
    disable_async(m_e1);
    disable_async(m_e2);
    short_progress_loop(100);
    EXPECT_LE(0, rx_drop_count);
    EXPECT_GE(4+2, rx_drop_count);
    EXPECT_LE(0, ack_req_tx_cnt);
    EXPECT_GE(2+2, ack_req_tx_cnt);
}

UCS_TEST_P(test_ud, connect_iface_single_drop_creq) {
    /* single connect */
    iface(m_e2)->rx.hook = drop_creq;

    connect_to_iface();
    short_progress_loop(50);

    iface(m_e2)->rx.hook = uct_ud_iface_null_hook;

    validate_connect(ep(m_e2), 0U);
}
#endif

UCS_TEST_SKIP_COND_P(test_ud, connect_iface_single,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    /* single connect */
    m_e1->connect_to_iface(0, *m_e2);
    short_progress_loop(TEST_UD_PROGRESS_TIMEOUT);
    validate_connect(ep(m_e1), 0U);

    EXPECT_EQ(2, ep(m_e1, 0)->tx.psn);
    EXPECT_EQ(1, ep(m_e1, 0)->tx.acked_psn);
    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));

    check_connection();
}

UCS_TEST_P(test_ud, connect_iface_2to1) {
    /* 2 to 1 connect */
    m_e1->connect_to_iface(0, *m_e2);
    m_e1->connect_to_iface(1, *m_e2);

    validate_connect(ep(m_e1), 0U);
    EXPECT_EQ(2, ep(m_e1, 0)->tx.psn);
    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 0)->rx.ooo_pkts));

    validate_connect(ep(m_e1, 1), 1U);
    EXPECT_EQ(2, ep(m_e1, 1)->tx.psn);
    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1, 1)->rx.ooo_pkts));
}

UCS_TEST_SKIP_COND_P(test_ud, connect_iface_seq,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    /* sequential connect from both sides */
    m_e1->connect_to_iface(0, *m_e2);
    validate_connect(ep(m_e1), 0U);
    EXPECT_EQ(2, ep(m_e1)->tx.psn);
    /* one because of the CREP */
    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e1)->rx.ooo_pkts));

    /* now the second side connects; the existing ep will be reused */
    m_e2->connect_to_iface(0, *m_e1);
    validate_connect(ep(m_e2), 0U);
    EXPECT_EQ(2, ep(m_e2)->tx.psn);
    /* one because the CREQ sets the initial psn */
    EXPECT_EQ(1, ucs_frag_list_sn(&ep(m_e2)->rx.ooo_pkts));

    check_connection();
}

UCS_TEST_P(test_ud, connect_iface_sim) {
    /* simultaneous connect from both sides */
    connect_to_iface();

    validate_connect(ep(m_e1), 0U);
    validate_connect(ep(m_e2), 0U);

    /* psns are not checked because they really depend on scheduling */
}

UCS_TEST_P(test_ud, connect_iface_sim2v2) {
    /* simultaneous connect from both sides */
    connect_to_iface(0);
    connect_to_iface(1);

    validate_connect(ep(m_e1),    0U);
    validate_connect(ep(m_e2),    0U);
    validate_connect(ep(m_e1, 1), 1U);
    validate_connect(ep(m_e2, 1), 1U);
    /* psns are not checked because they really depend on scheduling */
}

/*
 * check that:
 * - connect is not blocking when we run out of iface resources
 * - flush() will also progress pending CREQs
 */
UCS_TEST_P(test_ud, connect_iface_2k) {

    unsigned i;
    unsigned cids[2000];
    unsigned count = 2000 / ucs::test_time_multiplier();

    /* create up to 2k connections */
    for (i = 0; i < count; i++) {
        m_e1->connect_to_iface(i, *m_e2);
        cids[i] = UCT_UD_EP_NULL_ID;
    }

    flush();

    for (i = 0; i < count; i++) {
        ASSERT_EQ(cids[i], (unsigned)UCT_UD_EP_NULL_ID);
        cids[i] = ep(m_e1,i)->dest_ep_id;
        ASSERT_NE((unsigned)UCT_UD_EP_NULL_ID, ep(m_e1,i)->dest_ep_id);
        EXPECT_EQ(i, ep(m_e1,i)->conn_id);
        EXPECT_EQ(i, ep(m_e1,i)->ep_id);
    }
}

UCS_TEST_P(test_ud, ep_destroy_simple) {
    uct_ep_h ep;
    ucs_status_t status;
    uct_ud_ep_t *ud_ep1, *ud_ep2;
    uct_ep_params_t ep_params;

    ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    ep_params.iface      = m_e1->iface();

    status = uct_ep_create(&ep_params, &ep);
    EXPECT_UCS_OK(status);
    ud_ep1 = ucs_derived_of(ep, uct_ud_ep_t);
    uct_ep_destroy(ep);

    ep_params.iface = m_e1->iface();
    status = uct_ep_create(&ep_params, &ep);
    EXPECT_UCS_OK(status);
    /* coverity[use_after_free] */
    ud_ep2 = ucs_derived_of(ep, uct_ud_ep_t);
    uct_ep_destroy(ep);

    EXPECT_EQ(0U, ud_ep1->ep_id);
    EXPECT_EQ(1U, ud_ep2->ep_id);
}

UCS_TEST_SKIP_COND_P(test_ud, ep_destroy_flush,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    uct_ep_h ep;
    ucs_status_t status;
    uct_ud_ep_t *ud_ep1;
    uct_ep_params_t ep_params;

    connect();
    EXPECT_UCS_OK(tx(m_e1));
    short_progress_loop();
    uct_ep_destroy(m_e1->ep(0));
    /* ep destroy should try to flush outstanding packets */
    short_progress_loop();
    validate_flush();

    /* the next created ep must not reuse the old id */
    ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    ep_params.iface      = m_e1->iface();
    status = uct_ep_create(&ep_params, &ep);
    EXPECT_UCS_OK(status);
    ud_ep1 = ucs_derived_of(ep, uct_ud_ep_t);
    EXPECT_EQ(1U, ud_ep1->ep_id);
    uct_ep_destroy(ep);
}

UCS_TEST_SKIP_COND_P(test_ud, ep_destroy_passive,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    connect();
    uct_ep_destroy(m_e2->ep(0));
    /* destroyed ep must still accept data */
    EXPECT_UCS_OK(tx(m_e1));
    EXPECT_UCS_OK(ep_flush_b(m_e1));

    validate_flush();
}

UCS_TEST_P(test_ud, ep_destroy_creq) {
    uct_ep_h ep;
    ucs_status_t status;
    uct_ud_ep_t *ud_ep;
    uct_ep_params_t ep_params;

    /* single connect */
    m_e1->connect_to_iface(0, *m_e2);
    short_progress_loop(TEST_UD_PROGRESS_TIMEOUT);

    uct_ep_destroy(m_e1->ep(0));

    /* check that ep ids are not reused on either side */
    ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE;
    ep_params.iface      = m_e1->iface();
    status = uct_ep_create(&ep_params, &ep);
    EXPECT_UCS_OK(status);
    ud_ep = ucs_derived_of(ep, uct_ud_ep_t);
    uct_ep_destroy(ep);
    EXPECT_EQ(1U, ud_ep->ep_id);

    ep_params.iface = m_e2->iface();
    status = uct_ep_create(&ep_params, &ep);
    EXPECT_UCS_OK(status);
    /* coverity[use_after_free] */
    ud_ep = ucs_derived_of(ep, uct_ud_ep_t);
    uct_ep_destroy(ep);
    EXPECT_EQ(1U, ud_ep->ep_id);
}

/* check that the number of reserved skbs is not less than the
 * iface TX queue length
 */
UCS_TEST_P(test_ud, res_skb_basic) {
    uct_ud_send_skb_t *skb;
    uct_ud_iface_t *ud_if;
    int i, tx_qlen;

    connect();

    ud_if = iface(m_e1);
    tx_qlen = ud_if->tx.available;

    uct_ud_send_skb_t *used_skbs[tx_qlen];

    for (i = 0; i < tx_qlen; i++) {
        skb = uct_ud_iface_resend_skb_get(ud_if);
        ASSERT_TRUE(skb);
        used_skbs[i] = skb;
    }

    for (i = 0; i < tx_qlen; i++) {
        uct_ud_iface_resend_skb_put(ud_if, used_skbs[i]);
    }
}

/* test that a reserved skb is not reused while it is still in flight
 */
UCS_TEST_SKIP_COND_P(test_ud, res_skb_tx,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    uct_ud_iface_t *ud_if;
    int poll_sn;
    uct_ud_send_skb_t *skb;
    int n, tx_count;

    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    EXPECT_UCS_OK(tx(m_e1));
    short_progress_loop();

    ud_if = iface(m_e1);
    n = tx_count = 0;
    poll_sn = 1;
    while (n < 100) {
        while (uct_ud_iface_can_tx(ud_if)) {
            uct_ud_put_hdr_t *put_hdr;
            uct_ud_neth_t *neth;

            skb = uct_ud_iface_resend_skb_get(ud_if);
            ASSERT_TRUE(skb);
            VALGRIND_MAKE_MEM_DEFINED(skb, sizeof *skb);
            ASSERT_LT(skb->flags, poll_sn);
            skb->flags = poll_sn;

            /* simulate put */
            neth = skb->neth;
            uct_ud_neth_init_data(ep(m_e1), neth);
            uct_ud_neth_set_type_put(ep(m_e1), neth);
            uct_ud_neth_ack_req(ep(m_e1), neth);

            put_hdr      = (uct_ud_put_hdr_t *)(neth+1);
            put_hdr->rva = (uint64_t)&m_dummy;
            memcpy(put_hdr+1, &m_dummy, sizeof(m_dummy));
            skb->len = sizeof(*neth) + sizeof(*put_hdr) + sizeof(m_dummy);

            ucs_derived_of(ud_if->super.ops, uct_ud_iface_ops_t)->tx_skb(ep(m_e1),
                                                                         skb, 0);
            uct_ud_iface_resend_skb_put(ud_if, skb);
            tx_count++;
        }
        short_progress_loop(1);
        poll_sn++;
        n++;
    }
}

#if UCT_UD_EP_DEBUG_HOOKS
/* Simulate the loss of ctl packets during simultaneous CREQs.
 * Use-case: CREQ and CREP packets from m_e2 to m_e1 are lost.
 * Check that both eps (m_e1 and m_e2) end up connected. */
UCS_TEST_SKIP_COND_P(test_ud, ctls_loss,
                     !check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
    iface(m_e2)->tx.available = 0;

    connect_to_iface();

    /* Simulate loss of CREQ to m_e1 */
    ep(m_e2)->tx.tx_hook = invalidate_creq_tx;
    iface(m_e2)->tx.available = 128;
    iface(m_e1)->tx.available = 128;

    /* Simulate loss of CREP to m_e1 */
    ep(m_e1)->rx.rx_hook = drop_ctl;
    short_progress_loop(300);

    /* the m_e2 ep should be in the connected state now, as it received the
     * CREQ that is the counterpart to its own CREQ. So send a packet to
     * m_e1 (which is not in the connected state yet) */
    ep(m_e2)->tx.tx_hook = uct_ud_ep_null_hook;
    set_tx_win(m_e2, 128);
    EXPECT_UCS_OK(tx(m_e2));
    short_progress_loop();
    ep(m_e1)->rx.rx_hook = uct_ud_ep_null_hook;
    twait(500);

    validate_connect(ep(m_e1), 0U);
    validate_connect(ep(m_e2), 0U);
}
#endif

UCT_INSTANTIATE_UD_TEST_CASE(test_ud)