Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "uct_test.h"

extern "C" {
#include <ucs/arch/atomic.h>
}

class test_many2one_am : public uct_test {
public:
    static const uint8_t  AM_ID = 15;
    static const uint64_t MAGIC_DESC  = 0xdeadbeef12345678ul;
    static const uint64_t MAGIC_ALLOC = 0xbaadf00d12345678ul;

    typedef struct {
        uint64_t magic;
        unsigned length;
    } receive_desc_t;

    test_many2one_am() : m_am_count(0), m_receiver(NULL) {
    }

    void init() {
        std::string val = "16k";
        std::string tx_name, rx_name;

        if (has_ib()) {
            tx_name = "IB_SEG_SIZE";
        } else if (has_transport("tcp")) {
            tx_name = "TX_SEG_SIZE";
            rx_name = "RX_SEG_SIZE";
        } else if (has_transport("mm")  ||
                   has_transport("self")) {
            tx_name = "SEG_SIZE";
        }

        if (!tx_name.empty()) {
            modify_config(tx_name, val);
        }

        if (!rx_name.empty()) {
            modify_config(rx_name, val);
        }

        uct_test::init();

        m_receiver = create_entity(sizeof(receive_desc_t));
        m_entities.push_back(m_receiver);

        check_skip_test();
    }

    static ucs_status_t am_handler(void *arg, void *data, size_t length,
                                   unsigned flags) {
        test_many2one_am *self = reinterpret_cast<test_many2one_am*>(arg);
        return self->am_handler(data, length, flags);
    }

    ucs_status_t am_handler(void *data, size_t length, unsigned flags) {
        if (ucs::rand() % 4 == 0) {
            receive_desc_t *my_desc;
            if (flags & UCT_CB_PARAM_FLAG_DESC) {
                my_desc = (receive_desc_t *)data - 1;
                my_desc->magic  = MAGIC_DESC;
            } else {
                my_desc = (receive_desc_t *)ucs_malloc(sizeof(*my_desc) + length,
                                                       "TODO: remove allocation");
                my_desc->magic  = MAGIC_ALLOC;
            }
            my_desc->length = length;
            if (data != my_desc + 1) {
                memcpy(my_desc + 1, data, length);
            }
            m_backlog.push_back(my_desc);
            ucs_atomic_add32(&m_am_count, 1);
            return (flags & UCT_CB_PARAM_FLAG_DESC) ? UCS_INPROGRESS : UCS_OK;
        }
        mem_buffer::pattern_check(data, length);
        ucs_atomic_add32(&m_am_count, 1);
        return UCS_OK;
    }

    void check_backlog() {
        while (!m_backlog.empty()) {
            receive_desc_t *my_desc = m_backlog.back();
            m_backlog.pop_back();
            mem_buffer::pattern_check(my_desc + 1, my_desc->length);
            if (my_desc->magic == MAGIC_DESC) {
                uct_iface_release_desc(my_desc);
            } else {
                EXPECT_EQ(uint64_t(MAGIC_ALLOC), my_desc->magic);
                ucs_free(my_desc);
            }
        }
    }

    static const size_t NUM_SENDERS = 10;

protected:
    volatile uint32_t             m_am_count;
    std::vector<receive_desc_t*>  m_backlog;
    entity                       *m_receiver;  
};


UCS_TEST_SKIP_COND_P(test_many2one_am, am_bcopy,
                     !check_caps(UCT_IFACE_FLAG_AM_BCOPY |
                                 UCT_IFACE_FLAG_CB_SYNC))
{
    const unsigned num_sends = 1000 / ucs::test_time_multiplier();
    ucs_status_t status;

    ucs::ptr_vector<mapped_buffer> buffers;
    for (unsigned i = 0; i < NUM_SENDERS; ++i) {
        entity *sender = create_entity(0);
        mapped_buffer *buffer = new mapped_buffer(
                            sender->iface_attr().cap.am.max_bcopy, 0, *sender);
        sender->connect(0, *m_receiver, i);
        m_entities.push_back(sender);
        buffers.push_back(buffer);
    }

    m_am_count = 0;

    status = uct_iface_set_am_handler(m_receiver->iface(), AM_ID, am_handler,
                                      (void*)this, 0);
    ASSERT_UCS_OK(status);

    for (unsigned i = 0; i < num_sends; ++i) {
        unsigned sender_num = ucs::rand() % NUM_SENDERS;

        mapped_buffer& buffer = buffers.at(sender_num);
        buffer.pattern_fill(i);

        ssize_t packed_len;
        for (;;) {
            const entity& sender = ent(sender_num + 1);
            packed_len = uct_ep_am_bcopy(sender.ep(0), AM_ID,
                                         mapped_buffer::pack,
                                         (void*)&buffer, 0);
            if (packed_len != UCS_ERR_NO_RESOURCE) {
                break;
            }
            sender.progress();
            m_receiver->progress();
        }
        if (packed_len < 0) {
            ASSERT_UCS_OK((ucs_status_t)packed_len);
        }
    }

    while (m_am_count < num_sends) {
        progress();
    }

    status = uct_iface_set_am_handler(m_receiver->iface(), AM_ID,
                                      NULL, NULL, 0);
    ASSERT_UCS_OK(status);

    check_backlog();

    for (unsigned i = 0; i < NUM_SENDERS; ++i) {
        ent(i + 1).flush();
    }

    buffers.clear();
}

UCT_INSTANTIATE_NO_SELF_TEST_CASE(test_many2one_am)