Blob Blame History Raw

/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
* Copyright (C) UT-Battelle, LLC. 2016. ALL RIGHTS RESERVED.
* Copyright (C) ARM Ltd. 2016.All rights reserved.
* See file LICENSE for terms.
*/

extern "C" {
#include <uct/api/uct.h>
}
#include <common/test.h>
#include "uct_test.h"


class test_uct_peer_failure : public uct_test {
private:
    struct am_handler_setter
    {
        am_handler_setter(test_uct_peer_failure *test) : m_test(test) {}

        void operator() (test_uct_peer_failure::entity *e) {
            uct_iface_set_am_handler(e->iface(), 0,
                                     am_dummy_handler,
                                     reinterpret_cast<void*>(m_test), 0);
        }

        test_uct_peer_failure* m_test;
    };

public:

    test_uct_peer_failure() : m_sender(NULL),  m_nreceivers(2),
                              m_tx_window(100), m_err_count(0), m_am_count(0) {}

    virtual void init();

    inline uct_iface_params_t entity_params() {
        static uct_iface_params_t params;
        params.field_mask = UCT_IFACE_PARAM_FIELD_ERR_HANDLER     |
                            UCT_IFACE_PARAM_FIELD_ERR_HANDLER_ARG |
                            UCT_IFACE_PARAM_FIELD_ERR_HANDLER_FLAGS;
        params.err_handler       = get_err_handler();
        params.err_handler_arg   = reinterpret_cast<void*>(this);
        params.err_handler_flags = 0;
        return params;
    }

    virtual uct_error_handler_t get_err_handler() const {
        return err_cb;
    }

    static ucs_status_t am_dummy_handler(void *arg, void *data, size_t length,
                                         unsigned flags) {
        reinterpret_cast<test_uct_peer_failure*>(arg)->m_am_count++;
        return UCS_OK;
    }

    static ucs_status_t pending_cb(uct_pending_req_t *self)
    {
        m_req_count++;
        return UCS_OK;
    }

    static void purge_cb(uct_pending_req_t *self, void *arg)
    {
        m_req_count++;
    }

    static ucs_status_t err_cb(void *arg, uct_ep_h ep, ucs_status_t status)
    {
        EXPECT_EQ(UCS_ERR_ENDPOINT_TIMEOUT, status);
        reinterpret_cast<test_uct_peer_failure*>(arg)->m_err_count++;
        return UCS_OK;
    }

    void kill_receiver()
    {
        ucs_assert(!m_receivers.empty());
        m_entities.remove(m_receivers.front());
        ucs_assert(m_entities.size() == m_receivers.size());
        m_receivers.erase(m_receivers.begin());
    }

    void new_receiver()
    {
        uct_iface_params_t p = entity_params();
        p.field_mask |= UCT_IFACE_PARAM_FIELD_OPEN_MODE;
        p.open_mode   = UCT_IFACE_OPEN_MODE_DEVICE;
        m_receivers.push_back(uct_test::create_entity(p));
        m_entities.push_back(m_receivers.back());
        m_sender->connect(m_receivers.size() - 1, *m_receivers.back(), 0);

        am_handler_setter(this)(m_receivers.back());
        /* Make sure that TL is up and has resources */
        send_recv_am(m_receivers.size() - 1);
    }

    void set_am_handlers()
    {
        check_caps_skip(UCT_IFACE_FLAG_CB_SYNC);
        std::for_each(m_receivers.begin(), m_receivers.end(),
                      am_handler_setter(this));
    }

    ucs_status_t send_am(int index)
    {
        ucs_status_t status;
        while ((status = uct_ep_am_short(m_sender->ep(index), 0, 0, NULL, 0)) ==
               UCS_ERR_NO_RESOURCE) {
            progress();
        };
        return status;
    }

    void send_recv_am(int index, ucs_status_t exp_status = UCS_OK)
    {
        m_am_count = 0;

        ucs_status_t status = send_am(index);
        EXPECT_EQ(exp_status, status);

        if (exp_status == UCS_OK) {
            wait_for_flag(&m_am_count);
            EXPECT_EQ(m_am_count, 1ul);
        }
    }

    uct_ep_h ep0() {
        return m_sender->ep(0);
    }

    ucs_status_t flush_ep(size_t index, ucs_time_t deadline = ULONG_MAX) {
        uct_completion_t    comp;
        ucs_status_t        status;
        int                 is_time_out;

        comp.count = 2;
        comp.func  = NULL;
        do {
            progress();
            status = uct_ep_flush(m_sender->ep(index), 0, &comp);
            is_time_out = (ucs_get_time() > deadline);
        } while ((status == UCS_ERR_NO_RESOURCE) && !is_time_out);

        if (!is_time_out) {
            ASSERT_UCS_OK_OR_INPROGRESS(status);
        }

        if (status == UCS_OK) {
            return UCS_OK;
        } else if (is_time_out) {
            return UCS_ERR_TIMED_OUT;
        }

        /* coverity[loop_condition] */
        while ((comp.count == 2) && !is_time_out) {
            progress();
            is_time_out = (ucs_get_time() > deadline);
        }

        return (comp.count == 1) ? UCS_OK :
               (is_time_out ? UCS_ERR_TIMED_OUT : UCS_ERR_OUT_OF_RANGE);
    }

protected:
    entity                *m_sender;
    std::vector<entity *> m_receivers;
    size_t                m_nreceivers;
    size_t                m_tx_window;
    size_t                m_err_count;
    size_t                m_am_count;
    static size_t         m_req_count;
    static const uint64_t m_required_caps;
};

size_t test_uct_peer_failure::m_req_count             = 0ul;
const uint64_t test_uct_peer_failure::m_required_caps = UCT_IFACE_FLAG_AM_SHORT  |
                                                        UCT_IFACE_FLAG_PENDING   |
                                                        UCT_IFACE_FLAG_CB_SYNC   |
                                                        UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;

void test_uct_peer_failure::init()
{
    uct_test::init();

    /* To reduce test execution time decrease retransmition timeouts
     * where it is relevant */
    if (has_rc_or_dc()) {
        set_config("RC_TIMEOUT=100us"); /* 100 us should be enough */
        set_config("RC_RETRY_COUNT=4");
    } else if (has_ud()) {
        set_config("UD_TIMEOUT=3s");
    }

    uct_iface_params_t p = entity_params();
    p.field_mask |= UCT_IFACE_PARAM_FIELD_OPEN_MODE;
    p.open_mode   = UCT_IFACE_OPEN_MODE_DEVICE;
    m_sender = uct_test::create_entity(p);
    m_entities.push_back(m_sender);

    check_skip_test();
    for (size_t i = 0; i < 2; ++i) {
        new_receiver();
    }

    m_err_count = 0;
    m_req_count = 0;
    m_am_count  = 0;
}

UCS_TEST_SKIP_COND_P(test_uct_peer_failure, peer_failure,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT |
                                 m_required_caps))
{
    {
        scoped_log_handler slh(wrap_errors_logger);

        kill_receiver();
        EXPECT_EQ(UCS_OK, uct_ep_put_short(ep0(), NULL, 0, 0, 0));

        flush();
    }

    UCS_TEST_GET_BUFFER_IOV(iov, iovcnt, NULL, 0, NULL, 1);

    /* Check that all ep operations return pre-defined error code */
    EXPECT_EQ(uct_ep_am_short(ep0(), 0, 0, NULL, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_am_bcopy(ep0(), 0, NULL, NULL, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_am_zcopy(ep0(), 0, NULL, 0, iov, iovcnt, 0, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_put_short(ep0(), NULL, 0, 0, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_put_bcopy(ep0(), NULL, NULL, 0, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_put_zcopy(ep0(), iov, iovcnt, 0, 0, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_get_bcopy(ep0(), NULL, NULL, 0, 0, 0, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_get_zcopy(ep0(), iov, iovcnt, 0, 0, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_atomic64_post(ep0(), UCT_ATOMIC_OP_ADD, 0, 0, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_atomic32_post(ep0(), UCT_ATOMIC_OP_ADD, 0, 0, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_atomic64_fetch(ep0(), UCT_ATOMIC_OP_ADD, 0, NULL, 0, 0, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_atomic32_fetch(ep0(), UCT_ATOMIC_OP_ADD, 0, NULL, 0, 0, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_atomic_cswap64(ep0(), 0, 0, 0, 0, NULL, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_atomic_cswap32(ep0(), 0, 0, 0, 0, NULL, NULL),
              UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_flush(ep0(), 0, NULL), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_get_address(ep0(), NULL), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_pending_add(ep0(), NULL, 0), UCS_ERR_ENDPOINT_TIMEOUT);
    EXPECT_EQ(uct_ep_connect_to_ep(ep0(), NULL, NULL), UCS_ERR_ENDPOINT_TIMEOUT);

    EXPECT_GT(m_err_count, 0ul);
}

UCS_TEST_SKIP_COND_P(test_uct_peer_failure, purge_failed_peer,
                     !check_caps(m_required_caps))
{
    set_am_handlers();

    send_recv_am(0);
    send_recv_am(1);

    const size_t num_pend_sends = 3ul;
    uct_pending_req_t reqs[num_pend_sends];
    {
        scoped_log_handler slh(wrap_errors_logger);

        kill_receiver();

        ucs_status_t status;
        do {
            status = uct_ep_am_short(ep0(), 0, 0, NULL, 0);
        } while (status == UCS_OK);

        for (size_t i = 0; i < num_pend_sends; i ++) {
            reqs[i].func = pending_cb;
            EXPECT_EQ(uct_ep_pending_add(ep0(), &reqs[i], 0), UCS_OK);
        }

        flush();
    }

    EXPECT_EQ(uct_ep_am_short(ep0(), 0, 0, NULL, 0), UCS_ERR_ENDPOINT_TIMEOUT);

    uct_ep_pending_purge(ep0(), purge_cb, NULL);
    EXPECT_EQ(num_pend_sends, m_req_count);
    EXPECT_GE(m_err_count, 0ul);
}

UCS_TEST_SKIP_COND_P(test_uct_peer_failure, two_pairs_send,
                     !check_caps(m_required_caps))
{
    set_am_handlers();

    /* queue sends on 1st pair */
    for (size_t i = 0; i < m_tx_window; ++i) {
        send_am(0);
    }

    /* kill the 1st receiver while sending on 2nd pair */
    {
        scoped_log_handler slh(wrap_errors_logger);
        kill_receiver();
        send_am(0);
        send_recv_am(1);
        flush();
    }

    /* test flushing one operations */
    send_recv_am(0, UCS_ERR_ENDPOINT_TIMEOUT);
    send_recv_am(1, UCS_OK);
    flush();

    /* test flushing many operations */
    for (size_t i = 0; i < (m_tx_window * 10 / ucs::test_time_multiplier()); ++i) {
        send_recv_am(0, UCS_ERR_ENDPOINT_TIMEOUT);
        send_recv_am(1, UCS_OK);
    }
    flush();
}


UCS_TEST_SKIP_COND_P(test_uct_peer_failure, two_pairs_send_after,
                     !check_caps(m_required_caps))
{
    set_am_handlers();

    {
        scoped_log_handler slh(wrap_errors_logger);
        kill_receiver();
        for (int i = 0; i < 100; ++i) {
            send_am(0);
        }
        flush();
    }

    send_recv_am(0, UCS_ERR_ENDPOINT_TIMEOUT);

    m_am_count = 0;
    send_am(1);
    ucs_debug("flushing");
    flush_ep(1);
    ucs_debug("flushed");
    wait_for_flag(&m_am_count);
    EXPECT_EQ(m_am_count, 1ul);
}

UCT_INSTANTIATE_TEST_CASE(test_uct_peer_failure)

class test_uct_peer_failure_cb : public test_uct_peer_failure {
public:
    virtual uct_error_handler_t get_err_handler() const {
        return err_cb_ep_destroy;
    }

    static ucs_status_t err_cb_ep_destroy(void *arg, uct_ep_h ep, ucs_status_t status) {
        test_uct_peer_failure_cb *self(reinterpret_cast<test_uct_peer_failure_cb*>(arg));
        EXPECT_EQ(self->ep0(), ep);
        self->m_sender->destroy_ep(0);
        return UCS_OK;
    }
};

UCS_TEST_SKIP_COND_P(test_uct_peer_failure_cb, desproy_ep_cb,
                     !check_caps(UCT_IFACE_FLAG_PUT_SHORT |
                                 m_required_caps))
{
    scoped_log_handler slh(wrap_errors_logger);
    kill_receiver();
    EXPECT_EQ(uct_ep_put_short(ep0(), NULL, 0, 0, 0), UCS_OK);
    flush();
}

UCT_INSTANTIATE_TEST_CASE(test_uct_peer_failure_cb)

class test_uct_peer_failure_multiple : public test_uct_peer_failure
{
public:
    virtual void init();

protected:
    size_t get_tx_queue_len() const;
};

void test_uct_peer_failure_multiple::init()
{
    size_t tx_queue_len = get_tx_queue_len();

    if (ucs_get_page_size() > 4096) {
        /* NOTE: Too much receivers may cause failure of ibv_open_device */
        test_uct_peer_failure::m_nreceivers = 10;
    } else {
        test_uct_peer_failure::m_nreceivers = tx_queue_len;
    }

    test_uct_peer_failure::m_tx_window  = tx_queue_len / 3;

    test_uct_peer_failure::init();

    m_receivers.reserve(m_nreceivers);
    while (m_receivers.size() < m_nreceivers) {
        new_receiver();
    }
}

size_t test_uct_peer_failure_multiple::get_tx_queue_len() const
{
    bool        set = true;
    std::string name, val;
    size_t      tx_queue_len;

    if (has_rc()) {
        name = "RC_RC_IB_TX_QUEUE_LEN";
    } else if (has_transport("dc_mlx5")) {
        name = "DC_RC_IB_TX_QUEUE_LEN";
    } else if (has_ud()) {
        name = "UD_IB_TX_QUEUE_LEN";
    } else {
        set  = false;
        name = "TX_QUEUE_LEN";
    }

    if (get_config(name, val)) {
        tx_queue_len = ucs::from_string<size_t>(val);
        EXPECT_LT(0ul, tx_queue_len);
    } else {
        tx_queue_len = 256;
        UCS_TEST_MESSAGE << name << " setting not found, "
                         << "taken test default value: " << tx_queue_len;
        if (set) {
            UCS_TEST_ABORT(name + " config name must be found for %s transport" +
                           GetParam()->tl_name);
        }
    }

    return tx_queue_len;
}

/* Skip under valgrind due to brk segment overflow.
 * See https://bugs.kde.org/show_bug.cgi?id=352742 */
UCS_TEST_SKIP_COND_P(test_uct_peer_failure_multiple, test,
                     (RUNNING_ON_VALGRIND ||
                      !check_caps(m_required_caps)),
                     "RC_TM_ENABLE?=n")
{
    ucs_time_t timeout  = ucs_get_time() +
                          ucs_time_from_sec(200 * ucs::test_time_multiplier());

    {
        scoped_log_handler slh(wrap_errors_logger);
        for (size_t idx = 0; idx < m_nreceivers - 1; ++idx) {
            for (size_t i = 0; i < m_tx_window; ++i) {
                send_am(idx);
            }
            kill_receiver();
        }
        flush(timeout);

        /* if EPs are not failed yet, these ops should trigger that */
        for (size_t idx = 0; idx < m_nreceivers - 1; ++idx) {
            for (size_t i = 0; i < m_tx_window; ++i) {
                send_am(idx);
            }
        }

        flush(timeout);
    }

    for (size_t idx = 0; idx < m_nreceivers - 1; ++idx) {
        send_recv_am(idx, UCS_ERR_ENDPOINT_TIMEOUT);
    }

    m_am_count = 0;
    send_am(m_nreceivers - 1);
    ucs_debug("flushing");
    flush_ep(m_nreceivers - 1);
    ucs_debug("flushed");
    wait_for_flag(&m_am_count);
    EXPECT_EQ(m_am_count, 1ul);
}

UCT_INSTANTIATE_TEST_CASE(test_uct_peer_failure_multiple)