/*
* Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <vlogger/vlogger.h>
#include "utils/bullseye.h"
#include <vma/event/event_handler_manager.h>
#include "sock-redirect.h"
#include "pipeinfo.h"
/*
 * File-local logging helpers for pipeinfo.
 *
 * All macros expect a member (or local) named m_fd to be in scope; it is
 * printed as part of every message prefix.
 *
 * Every statement-like macro is wrapped in do { ... } while (0) so that it
 * behaves as exactly one statement. Without the wrapper a macro that expands
 * to a bare "if (...) stmt" would capture the 'else' of a caller's unbraced
 * if/else, and a macro that expands to two statements would run its second
 * statement unconditionally after a caller's unbraced 'if'.
 */
#define MODULE_NAME "pi"

#undef VLOG_PRINTF
/* Short prefix: fd and function name. */
#define VLOG_PRINTF(log_level, log_fmt, log_args...) vlog_printf(log_level, "fd[%#x]:%s() " log_fmt "\n", m_fd, __FUNCTION__, ##log_args)
/* Detailed prefix: module name, source line, fd and function name. */
#define VLOG_PRINTF_DETAILS(log_level, log_fmt, log_args...) vlog_printf(log_level, MODULE_NAME ":%d:fd[%#x]:%s() " log_fmt "\n", __LINE__, m_fd, __FUNCTION__, ##log_args)

/* Logs at PANIC level and then executes 'throw;' as part of the same statement. */
#define pi_logpanic(log_fmt, log_args...) do { VLOG_PRINTF(VLOG_PANIC, log_fmt, ##log_args); throw; } while (0)
#define pi_logerr(log_fmt, log_args...) VLOG_PRINTF(VLOG_ERROR, log_fmt, ##log_args)
#define pi_logwarn(log_fmt, log_args...) VLOG_PRINTF(VLOG_WARNING, log_fmt, ##log_args)
#define pi_loginfo(log_fmt, log_args...) VLOG_PRINTF(VLOG_INFO, log_fmt, ##log_args)

/* Debug-level macros: compiled out when the build-time maximum log level is below DEBUG. */
#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_DEBUG)
#define pi_logdbg_no_funcname(log_fmt, log_args...) ((void)0)
#define pi_logdbg(log_fmt, log_args...) ((void)0)
#define si_logdbg_no_funcname(log_fmt, log_args...) ((void)0)
#else
#define pi_logdbg_no_funcname(log_fmt, log_args...) do { if (g_vlogger_level >= VLOG_DEBUG) vlog_printf(VLOG_DEBUG, MODULE_NAME ":%d:fd[%d]: " log_fmt "\n", __LINE__, m_fd, ##log_args); } while (0)
#define pi_logdbg(log_fmt, log_args...) do { if (g_vlogger_level >= VLOG_DEBUG) VLOG_PRINTF_DETAILS(VLOG_DEBUG, log_fmt, ##log_args); } while (0)
#define si_logdbg_no_funcname(log_fmt, log_args...) do { if (g_vlogger_level >= VLOG_DEBUG) vlog_printf(VLOG_DEBUG, MODULE_NAME "[fd=%d]:%d: " log_fmt "\n", m_fd, __LINE__, ##log_args); } while (0)
#endif

/* Function-trace macro: compiled out when the build-time maximum log level is below FINE. */
#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_FINE)
#define pi_logfunc(log_fmt, log_args...) ((void)0)
#else
#define pi_logfunc(log_fmt, log_args...) do { if (g_vlogger_level >= VLOG_FUNC) VLOG_PRINTF_DETAILS(VLOG_FUNC, log_fmt, ##log_args); } while (0)
#endif

/* Most verbose trace macro: compiled out when the build-time maximum log level is below FINER. */
#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_FINER)
#define pi_logfuncall(log_fmt, log_args...) ((void)0)
#else
#define pi_logfuncall(log_fmt, log_args...) do { if (g_vlogger_level >= VLOG_FUNC_ALL) VLOG_PRINTF_DETAILS(VLOG_FUNC_ALL, log_fmt, ##log_args); } while (0)
#endif /* VMA_MAX_DEFINED_LOG_LEVEL */
/*
 * Builds the pipe wrapper for an already-open descriptor.
 *
 * The object starts in blocking mode with no timer registered, uses the
 * embedded m_socket_stats block for its counters, and is flagged as closed
 * for the duration of the set-up.
 */
pipeinfo::pipeinfo(int fd) : socket_fd_api(fd),
	m_lock("pipeinfo::m_lock"),
	m_lock_rx("pipeinfo::m_lock_rx"),
	m_lock_tx("pipeinfo::m_lock_tx")
{
	pi_logfunc("");

	// Flag as closed until every member below has been initialised
	m_b_closed = true;

	// Basic descriptor state
	m_b_blocking = true;
	m_timer_handle = NULL;

	// Pipe-write throttling state: timer off, no writes counted yet
	m_b_lbm_event_q_pipe_timer_on = false;
	m_write_count_on_last_timer = 0;
	m_write_count = 0;
	m_write_count_no_change_count = 0;

	// No shared statistics block is requested here; the local member is always used
	m_p_socket_stats = &m_socket_stats;
	m_p_socket_stats->reset();
	m_p_socket_stats->fd = m_fd;
	m_p_socket_stats->b_blocking = m_b_blocking;
	m_p_socket_stats->n_rx_ready_pkt_count = 0;
	m_p_socket_stats->n_rx_ready_byte_count = 0;
	m_p_socket_stats->n_tx_ready_byte_count = 0;
	m_p_socket_stats->n_rx_zcopy_pkt_count = 0;
	m_p_socket_stats->counters.n_rx_ready_pkt_max = 0;
	m_p_socket_stats->counters.n_rx_ready_byte_max = 0;

	// Set-up complete
	m_b_closed = false;

	pi_logfunc("done");
}
/*
 * Tears the pipe wrapper down: flags the object as closed and non-blocking,
 * then, while holding the tx, rx and general locks, unregisters a still
 * pending timer (if any) and prints the collected statistics.
 */
pipeinfo::~pipeinfo()
{
m_b_closed = true;
pi_logfunc("");
// Change to non-blocking socket so calling threads can exit
m_b_blocking = false;
// Take all three locks (tx, rx, general) before touching the timer and the statistics
m_lock_tx.lock();
m_lock_rx.lock();
m_lock.lock();
// A non-NULL handle means a timer is still registered for this object; remove it
if (m_timer_handle) {
g_p_event_handler_manager->unregister_timer_event(this, m_timer_handle);
m_timer_handle = NULL;
}
// Final dump of the per-fd counters (emitted at debug log level)
statistics_print();
m_lock_tx.unlock();
m_lock_rx.unlock();
m_lock.unlock();
pi_logfunc("done");
}
/*
 * Requests clean-up of this object; only the first call has any effect.
 *
 * When the event handler manager is running, it is asked to unregister this
 * object's timers and delete it; otherwise the base-class clean_obj() is
 * invoked directly.
 */
void pipeinfo::clean_obj()
{
	if (is_cleaned()) {
		// Already handled by an earlier call
		return;
	}
	set_cleaned();

	// Forget the handle; timers are dropped by the manager call below (if it is running)
	m_timer_handle = NULL;

	if (!g_p_event_handler_manager->is_running()) {
		cleanable_obj::clean_obj();
		return;
	}
	g_p_event_handler_manager->unregister_timers_event_and_delete(this);
}
/*
 * fcntl() hook for the pipe descriptor.
 *
 * For F_SETFL the requested O_NONBLOCK bit is mirrored into m_b_blocking and
 * the statistics block; every other command is only traced. In all cases the
 * call is then forwarded unchanged to the OS fcntl().
 *
 * @param __cmd fcntl command
 * @param __arg command argument
 * @return the value returned by the OS fcntl()
 */
int pipeinfo::fcntl(int __cmd, unsigned long int __arg)
{
	switch (__cmd) {
	case F_SETFL:
		{
			// __arg is unsigned long, so it needs the 'l' length modifier
			pi_logfunc("cmd=F_SETFL, arg=%#lx", __arg);
			if (__arg & O_NONBLOCK) {
				pi_logdbg("set to non-blocking mode");
				m_b_blocking = false;
			}
			else {
				pi_logdbg("set to blocked mode");
				m_b_blocking = true;
			}
			m_p_socket_stats->b_blocking = m_b_blocking;
		}
		break;

	case F_GETFL:		/* Get file status flags. */
		pi_logfunc("F_GETFL, arg=%#lx", __arg);
		break;

	case F_GETFD:		/* Get file descriptor flags. */
		pi_logfunc("F_GETFD, arg=%#lx", __arg);
		break;

	case F_SETFD:		/* Set file descriptor flags. */
		pi_logfunc("F_SETFD, arg=%#lx", __arg);
		break;

	default:
		pi_logfunc("cmd=%d, arg=%#lx", __cmd, __arg);
		break;
	}

	return orig_os_api.fcntl(m_fd, __cmd, __arg);
}
/*
 * ioctl() hook for the pipe descriptor.
 *
 * For FIONBIO the integer pointed to by __arg selects non-blocking (non-zero)
 * or blocking (zero) mode, which is mirrored into m_b_blocking and the
 * statistics block. Every other request is only traced. In all cases the call
 * is then forwarded unchanged to the OS ioctl().
 *
 * @param __request ioctl request code
 * @param __arg request argument (for FIONBIO: address of an int)
 * @return the value returned by the OS ioctl()
 */
int pipeinfo::ioctl(unsigned long int __request, unsigned long int __arg)
{
	int *p_arg = (int *)__arg;

	switch (__request) {
	case FIONBIO:
		{
			if (p_arg == NULL) {
				// Nothing to read the mode from; keep the current mode and
				// let the OS call below report the invalid argument
				pi_logdbg("FIONBIO, arg=NULL - blocking mode left unchanged");
				break;
			}
			if (*p_arg) {
				pi_logdbg("FIONBIO, arg=%d - set to non-blocking mode", *p_arg);
				m_b_blocking = false;
			}
			else {
				pi_logdbg("FIONBIO, arg=%d - set to blocked mode", *p_arg);
				m_b_blocking = true;
			}
			m_p_socket_stats->b_blocking = m_b_blocking;
		}
		break;

	default:
		// Both values are unsigned long, so they need the 'l' length modifier
		pi_logfunc("request=%lu, arg=%#lx", __request, __arg);
		break;
	}

	return orig_os_api.ioctl(m_fd, __request, __arg);
}
/*
 * Receive entry point: hands the request to socket_fd_api::rx_os() and
 * records the outcome in the OS receive counters.
 *
 * @return whatever rx_os() returned
 */
ssize_t pipeinfo::rx(const rx_call_t call_type, iovec* p_iov, ssize_t sz_iov,
		int* p_flags, sockaddr *__from, socklen_t *__fromlen, struct msghdr *__msg)
{
	pi_logfunc("");

	const ssize_t n_read = socket_fd_api::rx_os(call_type, p_iov, sz_iov, *p_flags, __from, __fromlen, __msg);

	// Account for the result (bytes, EAGAIN or error) in the statistics block
	save_stats_rx_os(n_read);

	return n_read;
}
/*
 * Timer callback: runs the pipe-write helper while holding the tx lock.
 *
 * @param user_data not used
 */
void pipeinfo::handle_timer_expired(void* user_data)
{
	pi_logfunc("(m_write_count=%d)", m_write_count);
	NOT_IN_USE(user_data);

	// tx() performs its work under the same lock
	m_lock_tx.lock();
	write_lbm_pipe_enhance();
	m_lock_tx.unlock();
}
/*
 * Transmit entry point; all work is done under the tx lock.
 *
 * TX_WRITE: when one of the two LBM specs is configured and the payload is a
 * single '\0' byte, the write is treated as a signalling write. It is counted,
 * reported to the caller as 1 byte written, and only actually written to the
 * pipe when the timer is being started or when more than mce_spec_param2
 * writes have accumulated. Any other TX_WRITE goes straight to the OS write()
 * using the first iovec entry.
 *
 * Every other opcode is forwarded to socket_fd_api::tx_os().
 *
 * @return number of bytes reported as written, or the OS call's return value
 */
ssize_t pipeinfo::tx(vma_tx_call_attr_t &tx_arg)
{
	const iovec* iov = tx_arg.attr.msg.iov;
	const ssize_t iov_cnt = tx_arg.attr.msg.sz_iov;
	const int send_flags = tx_arg.attr.msg.flags;
	const struct sockaddr *dst_addr = tx_arg.attr.msg.addr;
	const socklen_t dst_len = tx_arg.attr.msg.len;
	ssize_t n_sent = -1;

	pi_logfunc("");

	m_lock_tx.lock();

	if (tx_arg.opcode != TX_WRITE) {
		// TX_SEND, TX_SENDTO, TX_SENDMSG and anything else
		n_sent = socket_fd_api::tx_os(tx_arg.opcode, iov, iov_cnt, send_flags, dst_addr, dst_len);
	}
	else if ((safe_mce_sys().mce_spec == MCE_SPEC_29WEST_LBM_29 || safe_mce_sys().mce_spec == MCE_SPEC_WOMBAT_FH_LBM_554) &&
			(iov[0].iov_len == 1) && (((char*)iov[0].iov_base)[0] == '\0')) {
		// Signalling write: let one pipe write through every T usec
		//
		// 1) The first signalling write goes through and starts the timer
		// 2) After that a single pipe write is sent every T usec (safe_mce_sys().mce_spec_param1)
		// 3) The timer is meant to stop after N cycles without a pipe write
		//
		m_write_count++;

		if (!m_b_lbm_event_q_pipe_timer_on) {
			m_timer_handle = g_p_event_handler_manager->register_timer_event(safe_mce_sys().mce_spec_param1/1000, this, PERIODIC_TIMER, 0);
			m_b_lbm_event_q_pipe_timer_on = true;
			m_write_count_on_last_timer = 0;
			m_write_count_no_change_count = 0;

			pi_logdbg("\n\n\npipe_write DONE timer Reg\n\n\n");

			// simulate a pipe_write
			write_lbm_pipe_enhance();
		}
		else if ((int)m_write_count > (int)(m_write_count_on_last_timer + safe_mce_sys().mce_spec_param2)) {
			// simulate a pipe_write
			write_lbm_pipe_enhance();
		}

		// The caller always sees its one byte as written
		n_sent = 1;
	}
	else {
		n_sent = orig_os_api.write(m_fd, iov[0].iov_base, iov[0].iov_len);
	}

	save_stats_tx_os(n_sent);
	m_lock_tx.unlock();

	return n_sent;
}
/*
 * Writes one '\0' byte to the pipe and maintains the idle detection that
 * stops the periodic timer.
 *
 * A call is "idle" when m_write_count equals m_write_count_on_last_timer,
 * i.e. no signalling write was counted since the previous call. The idle
 * counter is incremented on idle calls and cleared on any call that saw
 * writes; once it reaches 2 while the timer is on, the timer is unregistered.
 *
 * The idle counter must NOT be cleared on idle calls: doing so would keep it
 * from ever reaching the threshold and the timer would never be stopped.
 *
 * Callers in this file invoke it with the tx lock held.
 */
void pipeinfo::write_lbm_pipe_enhance()
{
	pi_logfunc("(m_write_count=%d)", m_write_count);

	if (m_write_count == m_write_count_on_last_timer) {
		// No pipe write happened since the previous call
		m_write_count_no_change_count++;

		// After 2 consecutive idle calls stop the timer
		if (m_write_count_no_change_count >= 2 && m_b_lbm_event_q_pipe_timer_on) {
			if (m_timer_handle) {
				g_p_event_handler_manager->unregister_timer_event(this, m_timer_handle);
				m_timer_handle = NULL;
			}
			m_b_lbm_event_q_pipe_timer_on = false;

			pi_logfunc("pipe_write DONE timer Un-Reg");
		}
	}
	else {
		// Writes were seen: the idle streak is broken
		m_write_count_no_change_count = 0;
	}

	// Start counting writes afresh for the next period
	m_write_count = 0;
	m_write_count_on_last_timer = 0;

	// Send the buffered data
	char buf[10] = "\0";
	orig_os_api.write(m_fd, buf, 1);
}
// Returns part/total as a percentage, or 0 when total is zero.
// Working in double avoids multiplying a counter by 100 in its own integer
// type, which can wrap around for large counts.
static inline double pi_stats_percent(double part, double total)
{
	return (total > 0.0) ? (part * 100.0) / total : 0.0;
}

/*
 * Prints the non-zero groups of this descriptor's counters at debug log
 * level: Tx offload, Tx OS, Rx offload, Rx OS, Rx poll hit ratio and the
 * Rx ready byte/packet drops. When no group has any activity a single
 * "not active" line is printed instead.
 */
void pipeinfo::statistics_print()
{
	bool b_any_activiy = false;

	if (m_p_socket_stats->counters.n_tx_sent_byte_count || m_p_socket_stats->counters.n_tx_sent_pkt_count || m_p_socket_stats->counters.n_tx_errors || m_p_socket_stats->counters.n_tx_drops) {
		pi_logdbg_no_funcname("Tx Offload: %d KB / %d / %d / %d [bytes/packets/errors/drops]", m_p_socket_stats->counters.n_tx_sent_byte_count/1024, m_p_socket_stats->counters.n_tx_sent_pkt_count, m_p_socket_stats->counters.n_tx_errors, m_p_socket_stats->counters.n_tx_drops);
		b_any_activiy = true;
	}
	if (m_p_socket_stats->counters.n_tx_os_bytes || m_p_socket_stats->counters.n_tx_os_packets || m_p_socket_stats->counters.n_tx_os_errors) {
		pi_logdbg_no_funcname("Tx OS info: %d KB / %d / %d [bytes/packets/errors]", m_p_socket_stats->counters.n_tx_os_bytes/1024, m_p_socket_stats->counters.n_tx_os_packets, m_p_socket_stats->counters.n_tx_os_errors);
		b_any_activiy = true;
	}
	if (m_p_socket_stats->counters.n_rx_bytes || m_p_socket_stats->counters.n_rx_packets || m_p_socket_stats->counters.n_rx_errors || m_p_socket_stats->counters.n_rx_eagain) {
		pi_logdbg_no_funcname("Rx Offload: %d KB / %d / %d / %d [bytes/packets/errors/eagains]", m_p_socket_stats->counters.n_rx_bytes/1024, m_p_socket_stats->counters.n_rx_packets, m_p_socket_stats->counters.n_rx_errors, m_p_socket_stats->counters.n_rx_eagain);
		b_any_activiy = true;
	}
	if (m_p_socket_stats->counters.n_rx_os_bytes || m_p_socket_stats->counters.n_rx_os_packets || m_p_socket_stats->counters.n_rx_os_errors) {
		pi_logdbg_no_funcname("Rx OS info: %d KB / %d / %d [bytes/packets/errors]", m_p_socket_stats->counters.n_rx_os_bytes/1024, m_p_socket_stats->counters.n_rx_os_packets, m_p_socket_stats->counters.n_rx_os_errors);
		b_any_activiy = true;
	}
	if (m_p_socket_stats->counters.n_rx_poll_miss || m_p_socket_stats->counters.n_rx_poll_hit) {
		// Hit ratio = hits / (hits + misses); the sum is non-zero inside this branch
		pi_logdbg_no_funcname("Rx poll: %d / %d (%2.2f%%) [miss/hit]", m_p_socket_stats->counters.n_rx_poll_miss, m_p_socket_stats->counters.n_rx_poll_hit,
				pi_stats_percent(static_cast<double>(m_p_socket_stats->counters.n_rx_poll_hit),
						static_cast<double>(m_p_socket_stats->counters.n_rx_poll_miss) + static_cast<double>(m_p_socket_stats->counters.n_rx_poll_hit)));
		b_any_activiy = true;
	}
	if (m_p_socket_stats->counters.n_rx_ready_byte_drop) {
		// Drop ratio is reported relative to the number of received packets
		si_logdbg_no_funcname("Rx byte: max %d / dropped %d (%2.2f%%) [limit is %d]", m_p_socket_stats->counters.n_rx_ready_byte_max, m_p_socket_stats->counters.n_rx_ready_byte_drop,
				pi_stats_percent(static_cast<double>(m_p_socket_stats->counters.n_rx_ready_byte_drop), static_cast<double>(m_p_socket_stats->counters.n_rx_packets)),
				m_p_socket_stats->n_rx_ready_byte_limit);
		b_any_activiy = true;
	}
	if (m_p_socket_stats->counters.n_rx_ready_pkt_drop) {
		si_logdbg_no_funcname("Rx pkt : max %d / dropped %d (%2.2f%%)", m_p_socket_stats->counters.n_rx_ready_pkt_max, m_p_socket_stats->counters.n_rx_ready_pkt_drop,
				pi_stats_percent(static_cast<double>(m_p_socket_stats->counters.n_rx_ready_pkt_drop), static_cast<double>(m_p_socket_stats->counters.n_rx_packets)));
		b_any_activiy = true;
	}
	if (b_any_activiy == false) {
		pi_logdbg_no_funcname("Rx and Tx where not active");
	}
}
/*
 * Records the outcome of an OS receive call in the statistics block.
 *
 * @param bytes return value of the receive call: a non-negative value is
 *              counted as one packet of that many bytes; a negative value is
 *              counted as EAGAIN or as an error, depending on errno
 */
void pipeinfo::save_stats_rx_os(int bytes)
{
	if (bytes < 0) {
		// Failed call: tell "would block" apart from real errors
		if (errno == EAGAIN) {
			m_p_socket_stats->counters.n_rx_os_eagain++;
		}
		else {
			m_p_socket_stats->counters.n_rx_os_errors++;
		}
		return;
	}

	m_p_socket_stats->counters.n_rx_os_bytes += bytes;
	m_p_socket_stats->counters.n_rx_os_packets++;
}
/*
 * Records the outcome of an OS transmit call in the statistics block.
 *
 * @param bytes return value of the transmit call: a non-negative value is
 *              counted as one packet of that many bytes; a negative value is
 *              counted as EAGAIN or as an error, depending on errno
 */
void pipeinfo::save_stats_tx_os(int bytes)
{
	if (bytes >= 0) {
		m_p_socket_stats->counters.n_tx_os_bytes += bytes;
		m_p_socket_stats->counters.n_tx_os_packets++;
	}
	else if (errno == EAGAIN) {
		// A "would block" on transmit belongs to the tx counters, not the rx ones
		m_p_socket_stats->counters.n_tx_os_eagain++;
	}
	else {
		m_p_socket_stats->counters.n_tx_os_errors++;
	}
}