Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "ud_base.h"

#include <uct/uct_test.h>

extern "C" {
#include <ucs/time/time.h>
#include <ucs/datastruct/queue.h>
#include <uct/ib/ud/base/ud_ep.h>
#include <uct/ib/ud/base/ud_iface.h>
}


class test_ud_pending : public ud_base_test {
public:
    uct_pending_req_t m_r[64];

    void dispatch_req(uct_pending_req_t *r) {
        EXPECT_UCS_OK(tx(m_e1));
    } 

    void post_pending_reqs(void) 
    {
        int i;

        req_count = 0;
        me = this;
        m_e1->connect_to_iface(0, *m_e2);
        set_tx_win(m_e1, UCT_UD_CA_MAX_WINDOW);
        /* ep is not connected yet */
        EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));

        /* queuee some work */
        for(i = 0; i < N; i++) {
            m_r[i].func = pending_cb_dispatch;
            EXPECT_EQ(UCS_OK, uct_ep_pending_add(m_e1->ep(0), &m_r[i], 0));
        }
    }

    void check_pending_reqs(bool wait)
    {
        /* wait for all work to be complete */
        ucs_time_t start_time = ucs_get_time();
        while (wait && (req_count < N) &&
               (ucs_get_time() < start_time + ucs_time_from_sec(10.0)))
        {
            progress();
        }
        EXPECT_EQ(N, req_count);
        uct_ep_pending_purge(m_e1->ep(0), purge_cb, NULL);
    }

    static const int N; 
    static const int W; 
    static int req_count;
    static test_ud_pending *me;

    static ucs_status_t pending_cb_dispatch(uct_pending_req_t *r)
    {
        req_count++;
        me->dispatch_req(r);
        return UCS_OK;
    }

    static ucs_status_t pending_cb(uct_pending_req_t *r)
    {
        req_count++;
        return UCS_OK;
    }

    static void purge_cb(uct_pending_req_t *r, void *arg)
    {
        req_count++;
    }

    static ucs_status_t pending_cb_busy(uct_pending_req_t *r)
    {
        return UCS_ERR_BUSY;
    }

};

const int test_ud_pending::N = 13; 
const int test_ud_pending::W = 6; 
int test_ud_pending::req_count = 0;
test_ud_pending *test_ud_pending::me = 0;

/* add/purge requests */
UCS_TEST_SKIP_COND_P(test_ud_pending, async_progress,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT)) {
    uct_pending_req_t r[N];
    int i;

    req_count = 0;
    connect();

    set_tx_win(m_e1, 2);
    EXPECT_UCS_OK(tx(m_e1));

    for(i = 0; i < N; i++) {
        EXPECT_EQ(UCS_OK, uct_ep_pending_add(m_e1->ep(0), &r[i], 0));
    }
    twait(300);
    /* requests must not be dispatched from async progress */
    EXPECT_EQ(0, req_count);
    uct_ep_pending_purge(m_e1->ep(0), purge_cb, NULL);
    EXPECT_EQ(N, req_count);
}

UCS_TEST_SKIP_COND_P(test_ud_pending, sync_progress,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT)) {
    uct_pending_req_t r[N];
    int i;

    req_count = 0;
    connect();

    set_tx_win(m_e1, 2);
    EXPECT_UCS_OK(tx(m_e1));

    for(i = 0; i < N; i++) {
        r[i].func = pending_cb;
        EXPECT_EQ(UCS_OK, uct_ep_pending_add(m_e1->ep(0), &r[i], 0));
    }
    wait_for_value(&req_count, N, true);
    /* requests must be dispatched from progress */
    EXPECT_EQ(N, req_count);
    uct_ep_pending_purge(m_e1->ep(0), purge_cb, NULL);
    EXPECT_EQ(N, req_count);
}

UCS_TEST_SKIP_COND_P(test_ud_pending, err_busy,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT)) {
    uct_pending_req_t r[N];
    int i;

    req_count = 0;
    connect();

    set_tx_win(m_e1, 2);
    EXPECT_UCS_OK(tx(m_e1));

    for(i = 0; i < N; i++) {
        r[i].func = pending_cb_busy;
        EXPECT_EQ(UCS_OK, uct_ep_pending_add(m_e1->ep(0), &r[i], 0));
    }
    short_progress_loop();
    /* requests will not be dispatched from progress */
    EXPECT_EQ(0, req_count);
    uct_ep_pending_purge(m_e1->ep(0), purge_cb, NULL);
    EXPECT_EQ(N, req_count);
}

UCS_TEST_SKIP_COND_P(test_ud_pending, connect,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT))
{
    disable_async(m_e1);
    disable_async(m_e2);
    post_pending_reqs();
    check_pending_reqs(true);
}

UCS_TEST_SKIP_COND_P(test_ud_pending, flush,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT))
{
    disable_async(m_e1);
    disable_async(m_e2);
    post_pending_reqs();
    flush();
    check_pending_reqs(false);
}

UCS_TEST_SKIP_COND_P(test_ud_pending, window,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT))
{
    int i;
    uct_pending_req_t r;

    req_count = 0;
    me = this;
    connect();
    set_tx_win(m_e1, W+1);
    for (i = 0; i < W; i ++) {
        EXPECT_UCS_OK(tx(m_e1));
    }
    EXPECT_EQ(UCS_ERR_NO_RESOURCE, tx(m_e1));
    r.func = pending_cb_dispatch;
    EXPECT_EQ(UCS_OK, uct_ep_pending_add(m_e1->ep(0), &r, 0));
    wait_for_value(&req_count, 1, true);
    EXPECT_EQ(1, req_count);
    uct_ep_pending_purge(m_e1->ep(0), purge_cb, NULL);
}

UCS_TEST_SKIP_COND_P(test_ud_pending, tx_wqe,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT))
{
    int i;
    uct_pending_req_t r;
    ucs_status_t status;

    req_count = 0;
    me = this;
    disable_async(m_e1);
    disable_async(m_e2);
    connect();
    /* set big window */
    set_tx_win(m_e1, 8192);
    i = 0;
    do {
       status = tx(m_e1);
       i++;
    } while (status == UCS_OK);

    r.func = pending_cb_dispatch;
    EXPECT_EQ(UCS_OK, uct_ep_pending_add(m_e1->ep(0), &r, 0));
    wait_for_value(&req_count, 1, true);
    EXPECT_EQ(1, req_count);
    short_progress_loop();
    uct_ep_pending_purge(m_e1->ep(0), purge_cb, NULL);
}

UCT_INSTANTIATE_UD_TEST_CASE(test_ud_pending)