Blame tests/server_test/client.cc

Packit 6d2c1b
/*
Packit 6d2c1b
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
Packit 6d2c1b
 *
Packit 6d2c1b
 * This software is available to you under a choice of one of two
Packit 6d2c1b
 * licenses.  You may choose to be licensed under the terms of the GNU
Packit 6d2c1b
 * General Public License (GPL) Version 2, available from the file
Packit 6d2c1b
 * COPYING in the main directory of this source tree, or the
Packit 6d2c1b
 * BSD license below:
Packit 6d2c1b
 *
Packit 6d2c1b
 *     Redistribution and use in source and binary forms, with or
Packit 6d2c1b
 *     without modification, are permitted provided that the following
Packit 6d2c1b
 *     conditions are met:
Packit 6d2c1b
 *
Packit 6d2c1b
 *      - Redistributions of source code must retain the above
Packit 6d2c1b
 *        copyright notice, this list of conditions and the following
Packit 6d2c1b
 *        disclaimer.
Packit 6d2c1b
 *
Packit 6d2c1b
 *      - Redistributions in binary form must reproduce the above
Packit 6d2c1b
 *        copyright notice, this list of conditions and the following
Packit 6d2c1b
 *        disclaimer in the documentation and/or other materials
Packit 6d2c1b
 *        provided with the distribution.
Packit 6d2c1b
 *
Packit 6d2c1b
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
Packit 6d2c1b
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
Packit 6d2c1b
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
Packit 6d2c1b
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
Packit 6d2c1b
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
Packit 6d2c1b
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
Packit 6d2c1b
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
Packit 6d2c1b
 * SOFTWARE.
Packit 6d2c1b
 */
Packit 6d2c1b
Packit 6d2c1b
Packit 6d2c1b
#include "client.h"
Packit 6d2c1b
#include "options.h"
Packit 6d2c1b
Packit 6d2c1b
#include <sys/socket.h>
Packit 6d2c1b
#include <sys/epoll.h>
Packit 6d2c1b
#include <netdb.h>
Packit 6d2c1b
#include <cstring>
Packit 6d2c1b
#include <stdexcept>
Packit 6d2c1b
#include <boost/foreach.hpp>
Packit 6d2c1b
#include <boost/make_shared.hpp>
Packit 6d2c1b
#include <boost/thread.hpp>
Packit 6d2c1b
#include <boost/format.hpp>
Packit 6d2c1b
#include <boost/ref.hpp>
Packit 6d2c1b
#include <algorithm>
Packit 6d2c1b
#include <iostream>
Packit 6d2c1b
Packit 6d2c1b
Packit 6d2c1b
client::client(const options& opts) {
Packit 6d2c1b
    for (unsigned id = 0; id < opts.num_threads(); ++id) {
Packit 6d2c1b
        m_connections.push_back(boost::make_shared<connection>(id,
Packit 6d2c1b
                                                               opts.server(),
Packit 6d2c1b
                                                               opts.port(),
Packit 6d2c1b
                                                               opts.packet_rate()));
Packit 6d2c1b
    }
Packit 6d2c1b
}
Packit 6d2c1b
Packit 6d2c1b
void client::run() {
Packit 6d2c1b
    boost::thread_group tg;
Packit 6d2c1b
Packit 6d2c1b
    BOOST_FOREACH(const connection_ptr& conn, m_connections) {
Packit 6d2c1b
        tg.create_thread(boost::ref(*conn.get()));
Packit 6d2c1b
    }
Packit 6d2c1b
Packit 6d2c1b
    tg.join_all();
Packit 6d2c1b
}
Packit 6d2c1b
Packit 6d2c1b
client::connection::connection(unsigned id, const std::string& server,
Packit 6d2c1b
                               unsigned port, size_t packet_rate) :
Packit 6d2c1b
        m_id(id),
Packit 6d2c1b
        m_packet_rate(packet_rate),
Packit 6d2c1b
        m_psn(0)
Packit 6d2c1b
{
Packit 6d2c1b
    struct hostent *he = gethostbyname(server.c_str());
Packit 6d2c1b
    if (!he) {
Packit 6d2c1b
        throw std::runtime_error(std::string("failed to resolve ") + server);
Packit 6d2c1b
    }
Packit 6d2c1b
Packit 6d2c1b
    m_dest_addr.sin_family = he->h_addrtype;
Packit 6d2c1b
    m_dest_addr.sin_port = htons(port);
Packit 6d2c1b
    if (he->h_length != sizeof(m_dest_addr.sin_addr)) {
Packit 6d2c1b
        throw std::runtime_error("invalid address length");
Packit 6d2c1b
    }
Packit 6d2c1b
Packit 6d2c1b
    memcpy(&m_dest_addr.sin_addr, he->h_addr_list[0], he->h_length);
Packit 6d2c1b
    memset(m_dest_addr.sin_zero, 0, sizeof(m_dest_addr.sin_zero));
Packit 6d2c1b
    m_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
Packit 6d2c1b
    if (m_sockfd < 0) {
Packit 6d2c1b
        throw std::runtime_error("failed to create socket");
Packit 6d2c1b
    }
Packit 6d2c1b
Packit 6d2c1b
    m_epfd = epoll_create(1);
Packit 6d2c1b
    if (m_epfd < 0) {
Packit 6d2c1b
        throw std::runtime_error("failed to create epfd");
Packit 6d2c1b
    }
Packit 6d2c1b
Packit 6d2c1b
    // Add the socket to the main epoll set
Packit 6d2c1b
    struct epoll_event evt;
Packit 6d2c1b
    evt.events   = EPOLLIN|EPOLLOUT;
Packit 6d2c1b
    evt.data.fd  = m_sockfd;
Packit 6d2c1b
    int ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_sockfd, &evt);
Packit 6d2c1b
    if (ret < 0) {
Packit 6d2c1b
        throw std::runtime_error("failed to add socket fd to epoll set");
Packit 6d2c1b
    }
Packit 6d2c1b
}
Packit 6d2c1b
Packit 6d2c1b
client::connection::~connection() {
Packit 6d2c1b
    close(m_epfd);
Packit 6d2c1b
    close(m_sockfd);
Packit 6d2c1b
}
Packit 6d2c1b
Packit 6d2c1b
std::string client::connection::destination() const {
Packit 6d2c1b
    char buf[256] = {0};
Packit 6d2c1b
    inet_ntop(m_dest_addr.sin_family, &m_dest_addr.sin_addr, buf, sizeof(buf) - 1);
Packit 6d2c1b
    return (boost::format("%s:%d") % buf % ntohs(m_dest_addr.sin_port)).str();
Packit 6d2c1b
}
Packit 6d2c1b
Packit 6d2c1b
void client::connection::operator()() {
Packit 6d2c1b
    const size_t maxevents = 2;
Packit 6d2c1b
    struct epoll_event events[maxevents];
Packit 6d2c1b
    unsigned next_worker = 0;
Packit 6d2c1b
Packit 6d2c1b
    m_start_time = vtime::current();
Packit 6d2c1b
    m_recv_count = 0;
Packit 6d2c1b
    m_send_count = 0;
Packit 6d2c1b
    m_rtt_sum = 0;
Packit 6d2c1b
Packit 6d2c1b
    size_t sent_prev = 0;
Packit 6d2c1b
    size_t recv_prev = 0;
Packit 6d2c1b
    vtime::time_t time_prev = m_start_time;
Packit 6d2c1b
    vtime::time_t prev_rtt_sum = 0;
Packit 6d2c1b
    vtime::time_t packet_interval = vtime::time_from_sec(1.0 / m_packet_rate);
Packit 6d2c1b
    vtime::time_t last_send_time = m_start_time;
Packit 6d2c1b
    size_t print_rate = std::min(200000ul, m_packet_rate);
Packit 6d2c1b
Packit 6d2c1b
    std::cout << "connection " << m_id << ": sending to " << destination() << std::endl;
Packit 6d2c1b
    do {
Packit 6d2c1b
        int nevents = epoll_wait(m_epfd, events, maxevents, -1);
Packit 6d2c1b
        if (nevents < 0) {
Packit 6d2c1b
            throw std::runtime_error("epoll_wait failed");
Packit 6d2c1b
        }
Packit 6d2c1b
Packit 6d2c1b
        vtime:time_t current_time = vtime::current();
Packit 6d2c1b
Packit 6d2c1b
        for (int i = 0; i < nevents; ++i) {
Packit 6d2c1b
            if (events[i].data.fd == m_sockfd) {
Packit 6d2c1b
                if (events[i].events & EPOLLIN) {
Packit 6d2c1b
                    int nrecvd = recvfrom(m_sockfd, &m_recvbuf, sizeof(m_recvbuf),
Packit 6d2c1b
                                          0, NULL, NULL);
Packit 6d2c1b
                    if (nrecvd != sizeof(m_sendbuf)) {
Packit 6d2c1b
                        throw std::runtime_error("recvfrom failed");
Packit 6d2c1b
                    }
Packit 6d2c1b
Packit 6d2c1b
                    m_rtt_sum += (current_time - m_recvbuf.send_time);
Packit 6d2c1b
                    ++m_recv_count;
Packit 6d2c1b
                }
Packit 6d2c1b
Packit 6d2c1b
                if (events[i].events & EPOLLOUT) {
Packit 6d2c1b
                    if (current_time >= last_send_time + packet_interval) {
Packit 6d2c1b
                        // TODO maintain packet rate
Packit 6d2c1b
                        m_sendbuf.psn = m_psn++;
Packit 6d2c1b
                        m_sendbuf.send_time = current_time;
Packit 6d2c1b
                        int nsent = sendto(m_sockfd, &m_sendbuf, sizeof(m_sendbuf), 0,
Packit 6d2c1b
                                           reinterpret_cast<struct sockaddr*>(&m_dest_addr),
Packit 6d2c1b
                                           sizeof(m_dest_addr));
Packit 6d2c1b
                        if (nsent != sizeof(m_sendbuf)) {
Packit 6d2c1b
                            throw std::runtime_error("sendto failed");
Packit 6d2c1b
                        }
Packit 6d2c1b
Packit 6d2c1b
                        ++m_send_count;
Packit 6d2c1b
                        /*last_send_time += packet_interval;*/
Packit 6d2c1b
                        last_send_time =
Packit 6d2c1b
                                        (current_time + last_send_time + packet_interval) / 2;
Packit 6d2c1b
                    }
Packit 6d2c1b
                }
Packit 6d2c1b
            }
Packit 6d2c1b
        }
Packit 6d2c1b
Packit 6d2c1b
        if (m_send_count - sent_prev >= print_rate) {
Packit 6d2c1b
            double rtt = (m_recv_count) > 0 ?
Packit 6d2c1b
                            vtime::time_to_sec(
Packit 6d2c1b
                                            (m_rtt_sum - prev_rtt_sum) * 1000000.0 /
Packit 6d2c1b
                                            (m_recv_count - recv_prev)) :
Packit 6d2c1b
                            0;
Packit 6d2c1b
Packit 6d2c1b
            double packet_rate = (m_send_count - sent_prev) /
Packit 6d2c1b
                            vtime::time_to_sec(current_time - time_prev);
Packit 6d2c1b
Packit 6d2c1b
            double recv_ratio = (m_recv_count - recv_prev) /
Packit 6d2c1b
                            static_cast<double>(m_send_count - sent_prev);
Packit 6d2c1b
Packit 6d2c1b
            printf("sent: %Zu rate: %7.2f recvd: %Zu (%5.2f%%) rtt: %5.2f\n",
Packit 6d2c1b
                   m_send_count, packet_rate, m_recv_count, recv_ratio * 100.0,
Packit 6d2c1b
                   rtt);
Packit 6d2c1b
Packit 6d2c1b
            sent_prev = m_send_count;
Packit 6d2c1b
            recv_prev = m_recv_count;
Packit 6d2c1b
            time_prev = current_time;
Packit 6d2c1b
            prev_rtt_sum = m_rtt_sum;
Packit 6d2c1b
        }
Packit 6d2c1b
Packit 6d2c1b
    } while (1);
Packit 6d2c1b
}
Packit 6d2c1b