Blob Blame History Raw
/*
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */



#include <vlogger/vlogger.h>
#include "utils/bullseye.h"
#include <vma/event/event_handler_manager.h>

#include "sock-redirect.h"

#include "pipeinfo.h"

#define MODULE_NAME 	"pi"
#undef  VLOG_PRINTF
#define VLOG_PRINTF(log_level, log_fmt, log_args...) 		vlog_printf(log_level, "fd[%#x]:%s() " log_fmt "\n", m_fd, __FUNCTION__, ##log_args)
#define VLOG_PRINTF_DETAILS(log_level, log_fmt, log_args...) 	vlog_printf(log_level, MODULE_NAME ":%d:fd[%#x]:%s() " log_fmt "\n", __LINE__, m_fd, __FUNCTION__, ##log_args)

/*
 * Logging helpers for pipeinfo (they reference the member m_fd, so they are
 * only usable inside pipeinfo member functions).
 *
 * Every multi-statement or conditional helper expands to exactly one statement
 * (do { ... } while (0)), so it is safe as the body of an unbraced if/else:
 * a bare "if (level) ..." expansion would capture a following "else", and an
 * unwrapped "...; throw;" would execute the throw unconditionally.
 */

/* Log at PANIC level, then rethrow the exception currently being handled
 * ("throw;" with no active exception calls std::terminate). */
#define pi_logpanic(log_fmt, log_args...) 	do { VLOG_PRINTF(VLOG_PANIC, log_fmt, ##log_args); throw; } while (0)
#define pi_logerr(log_fmt, log_args...) 							VLOG_PRINTF(VLOG_ERROR, log_fmt, ##log_args)
#define pi_logwarn(log_fmt, log_args...) 							VLOG_PRINTF(VLOG_WARNING, log_fmt, ##log_args)
#define pi_loginfo(log_fmt, log_args...) 							VLOG_PRINTF(VLOG_INFO, log_fmt, ##log_args)

/* Debug-level helpers are compiled out entirely when the build's maximum log
 * level is below DEBUG; otherwise they are filtered at run time. */
#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_DEBUG)
#define pi_logdbg_no_funcname(log_fmt, log_args...)    ((void)0)
#define pi_logdbg(log_fmt, log_args...)                ((void)0)
#define si_logdbg_no_funcname(log_fmt, log_args...)    ((void)0)
#else
#define pi_logdbg_no_funcname(log_fmt, log_args...)     do { if (g_vlogger_level >= VLOG_DEBUG) vlog_printf(VLOG_DEBUG, MODULE_NAME ":%d:fd[%d]: " log_fmt "\n", __LINE__, m_fd, ##log_args); } while (0)
#define pi_logdbg(log_fmt, log_args...)                 do { if (g_vlogger_level >= VLOG_DEBUG) VLOG_PRINTF_DETAILS(VLOG_DEBUG, log_fmt, ##log_args); } while (0)
#define si_logdbg_no_funcname(log_fmt, log_args...)	do { if (g_vlogger_level >= VLOG_DEBUG) 	vlog_printf(VLOG_DEBUG, MODULE_NAME "[fd=%d]:%d: " log_fmt "\n", m_fd, __LINE__, ##log_args); } while (0)
#endif

/* Function-trace helper (FINE level). */
#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_FINE)
#define pi_logfunc(log_fmt, log_args...)               ((void)0)
#else
#define pi_logfunc(log_fmt, log_args...)                do { if (g_vlogger_level >= VLOG_FUNC) VLOG_PRINTF_DETAILS(VLOG_FUNC, log_fmt, ##log_args); } while (0)
#endif

/* Most verbose trace helper (FINER level). */
#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_FINER)
#define pi_logfuncall(log_fmt, log_args...)            ((void)0)
#else
#define pi_logfuncall(log_fmt, log_args...) 		do { if (g_vlogger_level >= VLOG_FUNC_ALL) VLOG_PRINTF_DETAILS(VLOG_FUNC_ALL, log_fmt, ##log_args); } while (0)
#endif /* VMA_MAX_DEFINED_LOG_LEVEL */

pipeinfo::pipeinfo(int fd) : socket_fd_api(fd),
    m_lock("pipeinfo::m_lock"),
    m_lock_rx("pipeinfo::m_lock_rx"),
    m_lock_tx("pipeinfo::m_lock_tx")
{
	pi_logfunc("");

	// The object is reported as closed until the initialization below is done.
	m_b_closed = true;
	m_timer_handle = NULL;
	m_b_blocking = true;

	// Statistics live in this object's own m_socket_stats member (the shared
	// mce_stats_instance_create_socket_block() allocation is not used for pipes).
	m_p_socket_stats = &m_socket_stats;
	m_p_socket_stats->reset();
	m_p_socket_stats->fd = m_fd;
	m_p_socket_stats->b_blocking = m_b_blocking;

	// Ready-queue gauges start empty.
	m_p_socket_stats->n_rx_ready_pkt_count = 0;
	m_p_socket_stats->n_rx_ready_byte_count = 0;
	m_p_socket_stats->n_tx_ready_byte_count = 0;
	m_p_socket_stats->n_rx_zcopy_pkt_count = 0;
	m_p_socket_stats->counters.n_rx_ready_pkt_max = 0;
	m_p_socket_stats->counters.n_rx_ready_byte_max = 0;

	m_b_closed = false;

	// LBM pipe-write coalescing starts idle: no timer, nothing counted.
	m_b_lbm_event_q_pipe_timer_on = false;
	m_write_count = 0;
	m_write_count_on_last_timer = 0;
	m_write_count_no_change_count = 0;

	pi_logfunc("done");
}

/*
 * Destroy the pipe wrapper.
 *
 * Marks the object closed and non-blocking, then takes the tx, rx and general
 * locks (in that order) before unregistering the LBM coalescing timer, if one
 * is still registered, and printing the debug statistics summary.
 */
pipeinfo::~pipeinfo()
{
	m_b_closed = true;
	pi_logfunc("");


	// Change to non-blocking socket so calling threads can exit
	m_b_blocking = false;

	// Acquisition order: tx, rx, then the general lock.
	m_lock_tx.lock();
	m_lock_rx.lock();
	m_lock.lock();

	// NOTE(review): clean_obj() clears m_timer_handle before asking the event
	// handler to unregister this object's timers, so on that path this is
	// presumably already NULL.
	if (m_timer_handle) {
		g_p_event_handler_manager->unregister_timer_event(this, m_timer_handle);
		m_timer_handle = NULL;
	}
	
	statistics_print();

	// Released in the same order they were taken.
	m_lock_tx.unlock();
	m_lock_rx.unlock();
	m_lock.unlock();

	pi_logfunc("done");
}

/*
 * One-shot teardown entry point.
 *
 * The first call marks the object cleaned and forgets the timer handle. If
 * the event handler thread is not running, the object is cleaned directly
 * through cleanable_obj::clean_obj(). Otherwise the event handler is asked
 * to unregister this object's timers and delete it. Later calls do nothing.
 */
void pipeinfo::clean_obj()
{
	if (!is_cleaned()) {
		set_cleaned();
		m_timer_handle = NULL;

		if (!g_p_event_handler_manager->is_running()) {
			cleanable_obj::clean_obj();
		} else {
			g_p_event_handler_manager->unregister_timers_event_and_delete(this);
		}
	}
}

/*
 * fcntl() on the pipe fd.
 *
 * F_SETFL is intercepted so the cached blocking mode (m_b_blocking and its
 * copy in the socket stats) follows O_NONBLOCK. All other commands are only
 * traced. Every command is then forwarded to the OS fcntl().
 *
 * @param __cmd fcntl command (F_SETFL, F_GETFL, ...).
 * @param __arg command argument; for F_SETFL, the new file status flags.
 * @return the result of the OS fcntl().
 */
int pipeinfo::fcntl(int __cmd, unsigned long int __arg)
{
	switch (__cmd) {
	case F_SETFL:
		{
			// Log the flags being set (the argument, not the command), using a
			// conversion that matches the unsigned long argument type.
			pi_logfunc("cmd=F_SETFL, arg=%#lx", __arg);
			if (__arg & O_NONBLOCK) {
				pi_logdbg("set to non-blocking mode");
				m_b_blocking = false;
			}
			else {
				pi_logdbg("set to blocked mode");
				m_b_blocking = true;
			}
			m_p_socket_stats->b_blocking = m_b_blocking;
		}
		break;

	case F_GETFL:		/* Get file status flags.  */
		pi_logfunc("F_GETFL, arg=%#lx", __arg);
		break;

	case F_GETFD:		/* Get file descriptor flags.  */
		pi_logfunc("F_GETFD, arg=%#lx", __arg);
		break;

	case F_SETFD:		/* Set file descriptor flags.  */
		pi_logfunc("F_SETFD, arg=%#lx", __arg);
		break;

	default:
		pi_logfunc("cmd=%d, arg=%#lx", __cmd, __arg);
		break;
	}

	return orig_os_api.fcntl(m_fd, __cmd, __arg);
}

/*
 * ioctl() on the pipe fd.
 *
 * FIONBIO is intercepted so the cached blocking mode (m_b_blocking and its
 * copy in the socket stats) follows the requested mode. Every request is then
 * forwarded to the OS ioctl().
 *
 * @param __request ioctl request code.
 * @param __arg     request argument; for FIONBIO, a pointer to an int flag
 *                  (non-zero = non-blocking).
 * @return the result of the OS ioctl().
 */
int pipeinfo::ioctl(unsigned long int __request, unsigned long int __arg)
{
	int *p_arg = (int *)__arg;

	switch (__request) {
	case FIONBIO:
		// Do not dereference a NULL argument here. The OS call below is left
		// to reject it (presumably with EFAULT), and the cached mode is kept.
		if (p_arg) {
			if (*p_arg) {
				pi_logdbg("FIONBIO, arg=%d - set to non-blocking mode", *p_arg);
				m_b_blocking = false;
			}
			else {
				pi_logdbg("FIONBIO, arg=%d - set to blocked mode", *p_arg);
				m_b_blocking = true;
			}

			m_p_socket_stats->b_blocking = m_b_blocking;
		}
		break;

	default:
		// Both values are unsigned long; use matching printf conversions.
		pi_logfunc("request=%lu, arg=%#lx", __request, __arg);
		break;
	}

	return orig_os_api.ioctl(m_fd, __request, __arg);
}

/*
 * Receive path for the pipe fd.
 *
 * Reads are not offloaded here. Every argument is handed to
 * socket_fd_api::rx_os() as-is (flags by value from *p_flags), and the
 * result is recorded in the Rx OS statistics before being returned.
 */
ssize_t pipeinfo::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov,
                     int* p_flags, sockaddr *__from, socklen_t *__fromlen, struct msghdr *__msg)
{
	pi_logfunc("");

	const ssize_t n_received =
		socket_fd_api::rx_os(call_type, p_iov, sz_iov, *p_flags, __from, __fromlen, __msg);

	save_stats_rx_os(n_received);
	return n_received;
}

/*
 * Periodic timer callback for LBM pipe-write coalescing.
 *
 * Runs one flush/idle-check step (write_lbm_pipe_enhance()) under m_lock_tx,
 * the same lock tx() holds while it updates the write counters.
 */
void pipeinfo::handle_timer_expired(void* user_data)
{
	// The timer is registered with a 0 cookie in tx(); nothing to read here.
	NOT_IN_USE(user_data);

	pi_logfunc("(m_write_count=%d)", m_write_count);

	m_lock_tx.lock();
	write_lbm_pipe_enhance();
	m_lock_tx.unlock();
}

/*
 * Transmit path for the pipe fd; the whole call runs under m_lock_tx.
 *
 * write() of a single '\0' byte under the LBM specs (MCE_SPEC_29WEST_LBM_29 /
 * MCE_SPEC_WOMBAT_FH_LBM_554) is treated as an event-queue wake-up signal and
 * coalesced; it always reports 1 byte written. Every other write() goes
 * straight to the OS write(), and send/sendto/sendmsg (or any other opcode)
 * go through socket_fd_api::tx_os(). The result is recorded in the Tx OS
 * statistics.
 */
ssize_t pipeinfo::tx(vma_tx_call_attr_t &tx_arg)
{
	const iovec* iov = tx_arg.attr.msg.iov;
	ssize_t n_sent = -1;

	pi_logfunc("");
	m_lock_tx.lock();

	if (tx_arg.opcode != TX_WRITE) {
		// TX_SEND / TX_SENDTO / TX_SENDMSG and anything else.
		const ssize_t iov_count = tx_arg.attr.msg.sz_iov;
		const int flags = tx_arg.attr.msg.flags;
		const struct sockaddr *to = tx_arg.attr.msg.addr;
		const socklen_t to_len = tx_arg.attr.msg.len;

		n_sent = socket_fd_api::tx_os(tx_arg.opcode, iov, iov_count, flags, to, to_len);
	} else {
		// The iov is inspected only when an LBM spec is active (short-circuit).
		const bool is_lbm_signal =
			(safe_mce_sys().mce_spec == MCE_SPEC_29WEST_LBM_29 || safe_mce_sys().mce_spec == MCE_SPEC_WOMBAT_FH_LBM_554) &&
			(iov[0].iov_len == 1) && (((char*)iov[0].iov_base)[0] == '\0');

		if (!is_lbm_signal) {
			n_sent = orig_os_api.write(m_fd, iov[0].iov_base, iov[0].iov_len);
		} else {
			// Coalescing scheme:
			//  1) the first signaling write goes through and starts a periodic
			//     timer (mce_spec_param1 is described as T usec; the /1000
			//     presumably converts it to the timer's unit),
			//  2) later writes are only counted, and a single byte is written
			//     once more than mce_spec_param2 have accumulated or on a timer tick,
			//  3) write_lbm_pipe_enhance() decides when the timer is stopped.
			m_write_count++;

			if (!m_b_lbm_event_q_pipe_timer_on) {
				m_timer_handle = g_p_event_handler_manager->register_timer_event(safe_mce_sys().mce_spec_param1/1000, this, PERIODIC_TIMER, 0);
				m_b_lbm_event_q_pipe_timer_on = true;
				m_write_count_on_last_timer = 0;
				m_write_count_no_change_count = 0;

				pi_logdbg("\n\n\npipe_write DONE timer Reg\n\n\n");

				// simulate a pipe_write
				write_lbm_pipe_enhance();
			} else if ((int)m_write_count > (int)(m_write_count_on_last_timer + safe_mce_sys().mce_spec_param2)) {
				// simulate a pipe_write
				write_lbm_pipe_enhance();
			}

			// The caller's one-byte write is reported as fully accepted.
			n_sent = 1;
		}
	}

	save_stats_tx_os(n_sent);
	m_lock_tx.unlock();
	return n_sent;
}

/*
 * One flush step of LBM pipe-write coalescing.
 *
 * Called from tx() (first signaling write, or when enough writes have
 * accumulated) and from each timer tick, always under m_lock_tx.
 *
 * It tracks consecutive calls that saw no new counted writes. After two of
 * them in a row, it unregisters the periodic timer; tx() registers a fresh
 * one on the next signaling write. It then clears the write counters and
 * writes a single '\0' byte to the pipe.
 *
 * The idle streak is reset only when writes were observed, or after the timer
 * is stopped. Resetting it unconditionally would make the stop condition
 * unreachable and keep the timer (and its per-tick byte) running forever.
 */
void pipeinfo::write_lbm_pipe_enhance()
{
	pi_logfunc("(m_write_count=%d)", m_write_count);

	if (m_write_count == m_write_count_on_last_timer) {
		// No pipe write happened since the previous flush.
		m_write_count_no_change_count++;

		// After two consecutive idle flushes, stop the timer.
		if (m_write_count_no_change_count >= 2 && m_b_lbm_event_q_pipe_timer_on) {
			if (m_timer_handle) {
				g_p_event_handler_manager->unregister_timer_event(this, m_timer_handle);
				m_timer_handle = NULL;
			}
			m_b_lbm_event_q_pipe_timer_on = false;
			m_write_count_no_change_count = 0;

			pi_logfunc("pipe_write DONE timer Un-Reg");
		}
	} else {
		// Writes were observed: the idle streak starts over.
		m_write_count_no_change_count = 0;
	}

	m_write_count = 0;
	m_write_count_on_last_timer = 0;

	// Send the buffered data
	char buf[10] = "\0";
	orig_os_api.write(m_fd, buf, 1);
}

/*
 * Log a debug-level summary of this pipe's statistics counters.
 *
 * Each group (Tx offload, Tx OS, Rx offload, Rx OS, Rx poll, Rx ready-queue
 * byte/packet drops) is printed only when one of its counters is non-zero.
 * If no group was printed, a single "not active" line is logged instead.
 *
 * Percentages are computed in double precision, converting before
 * multiplying by 100 or summing, so they cannot wrap around for narrow
 * integer counters. All lines use the same pi_ debug prefix.
 */
void pipeinfo::statistics_print()
{
	bool b_any_activity = false;

	if (m_p_socket_stats->counters.n_tx_sent_byte_count || m_p_socket_stats->counters.n_tx_sent_pkt_count || m_p_socket_stats->counters.n_tx_errors || m_p_socket_stats->counters.n_tx_drops) {
		pi_logdbg_no_funcname("Tx Offload: %d KB / %d / %d / %d [bytes/packets/errors/drops]", m_p_socket_stats->counters.n_tx_sent_byte_count/1024, m_p_socket_stats->counters.n_tx_sent_pkt_count, m_p_socket_stats->counters.n_tx_errors, m_p_socket_stats->counters.n_tx_drops);
		b_any_activity = true;
	}
	if (m_p_socket_stats->counters.n_tx_os_bytes || m_p_socket_stats->counters.n_tx_os_packets || m_p_socket_stats->counters.n_tx_os_errors) {
		pi_logdbg_no_funcname("Tx OS info: %d KB / %d / %d [bytes/packets/errors]", m_p_socket_stats->counters.n_tx_os_bytes/1024, m_p_socket_stats->counters.n_tx_os_packets, m_p_socket_stats->counters.n_tx_os_errors);
		b_any_activity = true;
	}
	if (m_p_socket_stats->counters.n_rx_bytes || m_p_socket_stats->counters.n_rx_packets || m_p_socket_stats->counters.n_rx_errors || m_p_socket_stats->counters.n_rx_eagain) {
		pi_logdbg_no_funcname("Rx Offload: %d KB / %d / %d / %d [bytes/packets/errors/eagains]", m_p_socket_stats->counters.n_rx_bytes/1024, m_p_socket_stats->counters.n_rx_packets, m_p_socket_stats->counters.n_rx_errors, m_p_socket_stats->counters.n_rx_eagain);
		b_any_activity = true;
	}
	if (m_p_socket_stats->counters.n_rx_os_bytes || m_p_socket_stats->counters.n_rx_os_packets || m_p_socket_stats->counters.n_rx_os_errors) {
		pi_logdbg_no_funcname("Rx OS info: %d KB / %d / %d [bytes/packets/errors]", m_p_socket_stats->counters.n_rx_os_bytes/1024, m_p_socket_stats->counters.n_rx_os_packets, m_p_socket_stats->counters.n_rx_os_errors);
		b_any_activity = true;
	}
	if (m_p_socket_stats->counters.n_rx_poll_miss || m_p_socket_stats->counters.n_rx_poll_hit) {
		// The guard above ensures the denominator is non-zero.
		pi_logdbg_no_funcname("Rx poll: %d / %d (%2.2f%%) [miss/hit]", m_p_socket_stats->counters.n_rx_poll_miss, m_p_socket_stats->counters.n_rx_poll_hit,
			100.0 * (double)m_p_socket_stats->counters.n_rx_poll_hit /
			((double)m_p_socket_stats->counters.n_rx_poll_miss + (double)m_p_socket_stats->counters.n_rx_poll_hit));
		b_any_activity = true;
	}
	if (m_p_socket_stats->counters.n_rx_ready_byte_drop) {
		// NOTE(review): byte drops are divided by a packet count here; confirm
		// the intended denominator.
		pi_logdbg_no_funcname("Rx byte: max %d / dropped %d (%2.2f%%) [limit is %d]", m_p_socket_stats->counters.n_rx_ready_byte_max, m_p_socket_stats->counters.n_rx_ready_byte_drop,
			(m_p_socket_stats->counters.n_rx_packets ? 100.0 * (double)m_p_socket_stats->counters.n_rx_ready_byte_drop / (double)m_p_socket_stats->counters.n_rx_packets : 0.0),
			m_p_socket_stats->n_rx_ready_byte_limit);
		b_any_activity = true;
	}
	if (m_p_socket_stats->counters.n_rx_ready_pkt_drop) {
		pi_logdbg_no_funcname("Rx pkt : max %d / dropped %d (%2.2f%%)", m_p_socket_stats->counters.n_rx_ready_pkt_max, m_p_socket_stats->counters.n_rx_ready_pkt_drop,
			(m_p_socket_stats->counters.n_rx_packets ? 100.0 * (double)m_p_socket_stats->counters.n_rx_ready_pkt_drop / (double)m_p_socket_stats->counters.n_rx_packets : 0.0));
		b_any_activity = true;
	}
	if (!b_any_activity) {
		pi_logdbg_no_funcname("Rx and Tx were not active");
	}
}

/*
 * Account the result of an OS-level receive in the Rx OS statistics.
 *
 * @param bytes return value of the OS read.
 *              - A value >= 0 counts one packet of that many bytes.
 *              - A negative value is counted as an EAGAIN when errno is
 *                EAGAIN, and as an error otherwise.
 */
void pipeinfo::save_stats_rx_os(int bytes)
{
	if (bytes < 0) {
		if (errno != EAGAIN) {
			m_p_socket_stats->counters.n_rx_os_errors++;
		} else {
			m_p_socket_stats->counters.n_rx_os_eagain++;
		}
		return;
	}

	m_p_socket_stats->counters.n_rx_os_packets++;
	m_p_socket_stats->counters.n_rx_os_bytes += bytes;
}

/*
 * Account the result of an OS-level transmit in the Tx OS statistics.
 *
 * @param bytes return value of the OS write/send.
 *              - A value >= 0 counts one packet of that many bytes.
 *              - A negative value is counted as an EAGAIN when errno is
 *                EAGAIN, and as an error otherwise.
 */
void pipeinfo::save_stats_tx_os(int bytes)
{
	if (bytes >= 0) {
		m_p_socket_stats->counters.n_tx_os_bytes += bytes;
		m_p_socket_stats->counters.n_tx_os_packets++;
	} else if (errno == EAGAIN) {
		// A transmit-side EAGAIN belongs to the Tx counter, not n_rx_os_eagain.
		m_p_socket_stats->counters.n_tx_os_eagain++;
	} else {
		m_p_socket_stats->counters.n_tx_os_errors++;
	}
}