Blob Blame History Raw
/*
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */


#include "event_handler_manager.h"

#include <sys/epoll.h>
#include <rdma/rdma_cma.h>
#include <vma/dev/net_device_table_mgr.h>
#include "vma/dev/ring_allocation_logic.h"
#include "vma/sock/fd_collection.h"
#include "vma/sock/sock-redirect.h" // calling orig_os_api.epoll()
#include "timer_handler.h"
#include "event_handler_ibverbs.h"
#include "event_handler_rdma_cm.h"

#include "vma/util/instrumentation.h"

#define MODULE_NAME 		"evh:"

#define evh_logpanic		__log_panic
#define evh_logerr		__log_err
#define evh_logwarn		__log_warn
#define evh_loginfo		__log_info
#define evh_logdbg		__log_dbg
#define evh_logfunc		__log_func
#define evh_logfuncall		__log_funcall

#undef  VLOG_PRINTF_ENTRY
#define VLOG_PRINTF_ENTRY(log_level, log_fmt, log_args...) 	vlog_printf(log_level, MODULE_NAME "%d:%s(" log_fmt ")\n", __LINE__, __FUNCTION__, ##log_args)

#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_DEBUG)
#define evh_logdbg_entry(log_fmt, log_args...)                  ((void)0)
#else
#define evh_logdbg_entry(log_fmt, log_args...)                  do { if (g_vlogger_level >= VLOG_DEBUG) VLOG_PRINTF_ENTRY(VLOG_DEBUG, log_fmt, ##log_args); } while (0)
#endif

#if (VMA_MAX_DEFINED_LOG_LEVEL < DEFINED_VLOG_FINE)
#define evh_logfunc_entry(log_fmt, log_args...)                 ((void)0)
#else
#define evh_logfunc_entry(log_fmt, log_args...)			do { if (g_vlogger_level >= VLOG_FUNC)	VLOG_PRINTF_ENTRY(VLOG_FUNC, log_fmt, ##log_args); } while (0)
#endif /* VMA_MAX_DEFINED_LOG_LEVEL */


#define INITIAL_EVENTS_NUM      64

event_handler_manager* g_p_event_handler_manager = NULL;

pthread_t g_n_internal_thread_id = 0;


void* event_handler_manager::register_timer_event(int timeout_msec, timer_handler* handler, 
						  timer_req_type_t req_type, void* user_data,
						  timers_group* group /* = NULL */)
{
	evh_logdbg("timer handler '%p' registered %s timer for %d msec (user data: %X)",
		   handler, timer_req_type_str(req_type), timeout_msec, user_data);
	BULLSEYE_EXCLUDE_BLOCK_START
	if (!handler || (req_type < 0 || req_type >= INVALID_TIMER)) {
		evh_logwarn("bad timer type (%d) or handler (%p)", req_type, handler);
		return NULL;
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	// malloc here the timer list node in order to return it to the app
	void* node = calloc(1, sizeof(struct timer_node_t)); 
	BULLSEYE_EXCLUDE_BLOCK_START
	if (!node) {
		evh_logdbg("malloc failure");
		throw_vma_exception("malloc failure");
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	timer_node_t* timer_node = (timer_node_t*)node;
	timer_node->lock_timer=lock_spin_recursive("timer");

	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = REGISTER_TIMER;
	reg_action.info.timer.handler = handler;
	reg_action.info.timer.user_data = user_data;
	reg_action.info.timer.group = group;
	reg_action.info.timer.node = node;
	reg_action.info.timer.timeout_msec = timeout_msec;
	reg_action.info.timer.req_type = req_type;
	post_new_reg_action(reg_action);
	return node;
}

void event_handler_manager::wakeup_timer_event(timer_handler* handler, void* node)
{
	evh_logdbg("timer handler '%p'", handler);
	BULLSEYE_EXCLUDE_BLOCK_START
	if (!handler) {
		evh_logwarn("bad handler (%p)", handler);
		return;
	}
	BULLSEYE_EXCLUDE_BLOCK_END
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = WAKEUP_TIMER;
	reg_action.info.timer.handler = handler;
	reg_action.info.timer.node = node;
	post_new_reg_action(reg_action);
	return;
}

void event_handler_manager::unregister_timer_event(timer_handler* handler, void* node)
{
	evh_logdbg("timer handler '%p'", handler);
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = UNREGISTER_TIMER;
	reg_action.info.timer.handler = handler;
	reg_action.info.timer.node = node;

	/* Special protection is needed to avoid scenario when deregistration is done
	 * during timer_handler object destruction, timer node itself is not removed
	 * and time for this timer node is expired. In this case there is no guarantee
	 * to operate with timer_handler object.
	 * See timer::process_registered_timers()
	 * Do just lock() to protect timer_handler inside process_registered_timers()
	 */
	if (node) {
		timer_node_t* timer_node = (timer_node_t*)node;
		timer_node->lock_timer.lock();
	}

	post_new_reg_action(reg_action);
}

void event_handler_manager::unregister_timers_event_and_delete(timer_handler* handler)
{
	evh_logdbg("timer handler '%p'", handler);
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = UNREGISTER_TIMERS_AND_DELETE;
	reg_action.info.timer.handler = handler;
	post_new_reg_action(reg_action);
}

void event_handler_manager::register_ibverbs_event(int fd, event_handler_ibverbs *handler, 
						   void* channel, void* user_data)
{
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = REGISTER_IBVERBS;
	reg_action.info.ibverbs.fd = fd;
	reg_action.info.ibverbs.handler = handler;
	reg_action.info.ibverbs.channel = channel;
	reg_action.info.ibverbs.user_data = user_data;
	post_new_reg_action(reg_action);
}

void event_handler_manager::unregister_ibverbs_event(int fd, event_handler_ibverbs* handler)
{
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = UNREGISTER_IBVERBS;
	reg_action.info.ibverbs.fd = fd;
	reg_action.info.ibverbs.handler = handler;
	post_new_reg_action(reg_action);
}

void event_handler_manager::register_rdma_cm_event(int fd, void* id, void* cma_channel, event_handler_rdma_cm* handler)
{
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = REGISTER_RDMA_CM;
	reg_action.info.rdma_cm.fd = fd;
	reg_action.info.rdma_cm.id = id;
	reg_action.info.rdma_cm.handler = handler;
	reg_action.info.rdma_cm.cma_channel = cma_channel;
	post_new_reg_action(reg_action);
}

void event_handler_manager::unregister_rdma_cm_event(int fd, void* id)
{
	reg_action_t reg_action;
	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = UNREGISTER_RDMA_CM;
	reg_action.info.rdma_cm.fd = fd;
	reg_action.info.rdma_cm.id = id;
	post_new_reg_action(reg_action);
}

void event_handler_manager::register_command_event(int fd, command* cmd)
{
	reg_action_t reg_action;

	evh_logdbg("Register command %s event", cmd->to_str().c_str());

	memset(&reg_action, 0, sizeof(reg_action));
	reg_action.type = REGISTER_COMMAND;
	reg_action.info.cmd.fd = fd;
	reg_action.info.cmd.cmd = cmd;
	post_new_reg_action(reg_action);

}

event_handler_manager::event_handler_manager() :
		m_reg_action_q_lock("reg_action_q_lock"),
		m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq_enabled),
		m_n_sysvar_vma_time_measure_num_samples(safe_mce_sys().vma_time_measure_num_samples),
		m_n_sysvar_timer_resolution_msec(safe_mce_sys().timer_resolution_msec)
{
	evh_logfunc("");

	m_cq_epfd = 0;

	m_epfd = orig_os_api.epoll_create(INITIAL_EVENTS_NUM);
	BULLSEYE_EXCLUDE_BLOCK_START
	if (m_epfd == -1) {
		evh_logdbg("epoll_create failed on ibv device collection (errno=%d %m)", errno);
		free_evh_resources();
		throw_vma_exception("epoll_create failed on ibv device collection");
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	m_b_continue_running = true;
	m_event_handler_tid = 0;

	wakeup_set_epoll_fd(m_epfd);
	going_to_sleep();

	return;
}

event_handler_manager::~event_handler_manager()
{
	free_evh_resources();
}

void event_handler_manager::free_evh_resources()
{
	evh_logfunc("");

	// Flag thread to stop on next loop
	stop_thread();
	evh_logfunc("Thread stopped");
}

// event handler main thread startup
void* event_handler_thread(void *_p_tgtObject)
{
	event_handler_manager* p_tgtObject = (event_handler_manager*)_p_tgtObject;
	g_n_internal_thread_id = pthread_self();
	evh_logdbg("Entering internal thread, id = %lu", g_n_internal_thread_id);

	if (strcmp(safe_mce_sys().internal_thread_cpuset, MCE_DEFAULT_INTERNAL_THREAD_CPUSET)) {
		std::string tasks_file(safe_mce_sys().internal_thread_cpuset);
		tasks_file += "/tasks";
		FILE *fp = fopen(tasks_file.c_str(), "w");
		BULLSEYE_EXCLUDE_BLOCK_START
		if (fp == NULL) {
			evh_logpanic("Failed to open %s for writing", tasks_file.c_str());
		}
		if (fprintf(fp, "%d", gettid()) <= 0) {
			fclose(fp);
			evh_logpanic("Failed to add internal thread id to %s", tasks_file.c_str());
		}
		BULLSEYE_EXCLUDE_BLOCK_END
		fclose(fp);
		evh_logdbg("VMA Internal thread added to cpuset %s.", safe_mce_sys().internal_thread_cpuset);

		// do set affinity now that we are on correct cpuset
		cpu_set_t cpu_set = safe_mce_sys().internal_thread_affinity;
		if ( strcmp(safe_mce_sys().internal_thread_affinity_str, "-1")) {
			if (pthread_setaffinity_np(g_n_internal_thread_id, sizeof(cpu_set), &cpu_set)) {
				evh_logdbg("VMA Internal thread affinity failed. Did you try to set affinity outside of cpuset?");
			} else {
				evh_logdbg("VMA Internal thread affinity is set.");
			}
		} else {
			evh_logdbg("VMA Internal thread affinity not set.");
		}
	/* cppcheck-suppress resourceLeak */
	}

	void* ret = p_tgtObject->thread_loop();
	evh_logdbg("Ending internal thread");
	return ret;
}

int event_handler_manager::start_thread()
{
	cpu_set_t cpu_set;
	pthread_attr_t tattr;

	if (!m_b_continue_running)
		return -1;

	if (m_event_handler_tid != 0)
		return 0;

	//m_reg_action_q.reserve(); //todo change to vector and reserve

	BULLSEYE_EXCLUDE_BLOCK_START
	if (pthread_attr_init(&tattr)) {
		evh_logpanic("Failed to initialize thread attributes");
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	cpu_set = safe_mce_sys().internal_thread_affinity;
	if ( strcmp(safe_mce_sys().internal_thread_affinity_str, "-1") && !strcmp(safe_mce_sys().internal_thread_cpuset, MCE_DEFAULT_INTERNAL_THREAD_CPUSET)) { // no affinity
		BULLSEYE_EXCLUDE_BLOCK_START
		if (pthread_attr_setaffinity_np(&tattr, sizeof(cpu_set), &cpu_set)) {
			evh_logpanic("Failed to set CPU affinity");
		}
		BULLSEYE_EXCLUDE_BLOCK_END
	}
	else {
		evh_logdbg("VMA Internal thread affinity not set.");
	}


	int ret = pthread_create(&m_event_handler_tid, &tattr, event_handler_thread, this);	
	if (ret) {
		// maybe it's the cset issue? try without affinity
		evh_logwarn("Failed to start event handler thread with thread affinity - trying without. [errno=%d %s]",
		            ret, strerror(ret));
		BULLSEYE_EXCLUDE_BLOCK_START
		if (pthread_attr_init(&tattr)) {
			evh_logpanic("Failed to initialize thread attributes");
		}
		if (pthread_create(&m_event_handler_tid, &tattr, event_handler_thread, this)) {
			evh_logpanic("Failed to start event handler thread");
		}
		BULLSEYE_EXCLUDE_BLOCK_END
	}

	pthread_attr_destroy(&tattr);

	evh_logdbg("Started event handler thread");
	return 0;
}

void event_handler_manager::stop_thread()
{
	if (!m_b_continue_running)
		return;
	m_b_continue_running = false;

	if(!g_is_forked_child){

		do_wakeup();

		// Wait for thread exit
		if (m_event_handler_tid) {
			pthread_join(m_event_handler_tid, 0);
			evh_logdbg("event handler thread stopped");
		}
		else {
			evh_logdbg("event handler thread not running");
		}
	}
	m_event_handler_tid = 0;

	// Close main epfd and signaling socket
	orig_os_api.close(m_epfd);
	m_epfd = -1;
}

void event_handler_manager::update_epfd(int fd, int operation, int events)
{
	epoll_event ev = {0, {0}};

	if (m_epfd < 0) {
		return;
	}

	ev.events = events;
	ev.data.fd = fd;
	BULLSEYE_EXCLUDE_BLOCK_START
	if ((orig_os_api.epoll_ctl(m_epfd, operation, fd, &ev) < 0) &&
		(!(errno == ENOENT || errno == EBADF))) {
		const char* operation_str[] = {"", "ADD", "DEL", "MOD"};
		evh_logerr("epoll_ctl(%d, %s, fd=%d) failed (errno=%d %m)",
			   m_epfd, operation_str[operation], fd, errno);
	}
	BULLSEYE_EXCLUDE_BLOCK_END
}

const char* event_handler_manager::reg_action_str(event_action_type_e reg_action_type)
{
	switch (reg_action_type) {
	case REGISTER_TIMER:					return "REGISTER_TIMER";
	case UNREGISTER_TIMER:					return "UNREGISTER_TIMER";
	case UNREGISTER_TIMERS_AND_DELETE:			return "UNREGISTER_TIMERS_AND_DELETE";
	case REGISTER_IBVERBS:					return "REGISTER_IBVERBS";
	case UNREGISTER_IBVERBS:				return "UNREGISTER_IBVERBS";
	case REGISTER_RDMA_CM:					return "REGISTER_RDMA_CM";
	case UNREGISTER_RDMA_CM:				return "UNREGISTER_RDMA_CM";
	case REGISTER_COMMAND:					return "REGISTER_COMMAND";
	case UNREGISTER_COMMAND:				return "UNREGISTER_COMMAND";
	BULLSEYE_EXCLUDE_BLOCK_START
	default:						return "UNKNOWN";
	BULLSEYE_EXCLUDE_BLOCK_END
	}
}

//get new action of event (register / unregister), and post to the thread's pipe
void event_handler_manager::post_new_reg_action(reg_action_t& reg_action)
{
	if (!m_b_continue_running)
		return;

	start_thread();

	evh_logfunc("add event action %s (%d)", reg_action_str(reg_action.type), reg_action.type);

	m_reg_action_q_lock.lock();
	m_reg_action_q.push_back(reg_action);
	m_reg_action_q_lock.unlock();
	do_wakeup();
}

void event_handler_manager::priv_register_timer_handler(timer_reg_info_t& info)
{
	if (info.group) {
		info.group->add_new_timer((timer_node_t*)info.node, info.handler, info.user_data);
	} else {
		m_timer.add_new_timer(info.timeout_msec, (timer_node_t*)info.node,
			       info.handler, info.user_data, info.req_type);
	}
}

void event_handler_manager::priv_wakeup_timer_handler(timer_reg_info_t& info)
{
	timer_node_t* node = (timer_node_t*)info.node;
	if (node && !node->group) {
		m_timer.wakeup_timer(node);
	}
}

void event_handler_manager::priv_unregister_timer_handler(timer_reg_info_t& info)
{
	timer_node_t* node = (timer_node_t*)info.node;
	if (node && node->group) {
		node->group->remove_timer((timer_node_t*)info.node);
	} else {
		m_timer.remove_timer(node, info.handler);
	}
}

void event_handler_manager::priv_unregister_all_handler_timers(timer_reg_info_t& info)
{
	m_timer.remove_all_timers(info.handler);
}

void event_handler_manager::priv_prepare_ibverbs_async_event_queue(event_handler_map_t::iterator& i)
{
	evh_logdbg_entry("");

	int cnt = 0;
	struct pollfd poll_fd = { /*.fd=*/ 0, /*.events=*/ POLLIN, /*.revents=*/ 0};

	if (i == m_event_handler_map.end()) {
		evh_logdbg("No event handler");
		return;
	}

	poll_fd.fd = i->second.ibverbs_ev.fd;

	// change the blocking mode of the async event queue
	set_fd_block_mode(poll_fd.fd, false);

	// empty the async event queue
	while (orig_os_api.poll(&poll_fd, 1, 0) > 0) {
		process_ibverbs_event(i);
		cnt++;
	}
	evh_logdbg("Emptied %d Events", cnt);
}

void event_handler_manager::priv_register_ibverbs_events(ibverbs_reg_info_t& info)
{
	event_handler_map_t::iterator i;
	i = m_event_handler_map.find(info.fd);
	if (i == m_event_handler_map.end()) {
		event_data_t v;

		v.type                  = EV_IBVERBS;
		v.ibverbs_ev.fd         = info.fd;
		v.ibverbs_ev.channel    = info.channel;

		/* coverity[uninit_use_in_call] */
		/* cppcheck-suppress uninitStructMember */
		m_event_handler_map[info.fd] = v;
		i = m_event_handler_map.find(info.fd);

		priv_prepare_ibverbs_async_event_queue(i);

		update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
		evh_logdbg("%d added to event_handler_map_t!", info.fd);
	}
	BULLSEYE_EXCLUDE_BLOCK_START
	if (i->second.type != EV_IBVERBS) {
		evh_logerr("fd=%d: is already handling events of different type", info.fd);
		return;
	}

	ibverbs_event_map_t::iterator j;
	j = i->second.ibverbs_ev.ev_map.find(info.handler);
	if (j != i->second.ibverbs_ev.ev_map.end()) {
		evh_logerr("Event for %d/%p already registered", info.fd, info.handler);
		return;
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	ibverbs_event_t vv;
	vv.handler = info.handler;
	vv.user_data = info.user_data;
	i->second.ibverbs_ev.ev_map[info.handler] = vv;

	return;
}

void event_handler_manager::priv_unregister_ibverbs_events(ibverbs_reg_info_t& info)
{

	event_handler_map_t::iterator i;
	ibverbs_event_map_t::iterator j;
	int n = 0;

	i = m_event_handler_map.find(info.fd);

	BULLSEYE_EXCLUDE_BLOCK_START
	if (i == m_event_handler_map.end()) {
		evh_logerr("Event for %d/%p already does not exist", info.fd, info.handler);
		return;
	}

	if (i->second.type != EV_IBVERBS) {
		evh_logerr("fd=%d: is already handling events of different type", info.fd);
		return;
	}

	n = i->second.ibverbs_ev.ev_map.size();

	if (n < 1) {
		evh_logerr("Event for %d/%p already does not exist", info.fd, info.handler);
		return;

	}

	j = i->second.ibverbs_ev.ev_map.find(info.handler);
	if (j == i->second.ibverbs_ev.ev_map.end()) {
		evh_logerr("event for %d/%p does not exist", info.fd, info.handler);
		return;
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	i->second.ibverbs_ev.ev_map.erase(j);
	if (n == 1) {
		update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
		m_event_handler_map.erase(i);
		evh_logdbg("%d erased from event_handler_map_t!", info.fd);
	}
}

void event_handler_manager::priv_register_rdma_cm_events(rdma_cm_reg_info_t& info)
{
	evh_logfunc_entry("fd=%d, event_handler_id=%p", info.fd, info.id);

	// Handle the new registration
	event_handler_map_t::iterator iter_fd = m_event_handler_map.find(info.fd);
	if (iter_fd == m_event_handler_map.end()) {
		evh_logdbg("Adding new channel (fd %d, id %#x, handler %p)", info.fd, info.id, info.handler);
		event_data_t map_value;

		map_value.type = EV_RDMA_CM;
		map_value.rdma_cm_ev.n_ref_count = 1;
		map_value.rdma_cm_ev.map_rdma_cm_id[info.id] = info.handler;
		map_value.rdma_cm_ev.cma_channel = info.cma_channel;

		/* coverity[uninit_use_in_call] */
		/* cppcheck-suppress uninitStructMember */
		m_event_handler_map[info.fd] = map_value;

		update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
	}
	else {
		BULLSEYE_EXCLUDE_BLOCK_START
		if (iter_fd->second.type != EV_RDMA_CM) {
			evh_logerr("fd=%d: is already handling events of different type", info.fd);
			return;
		}
		event_handler_rdma_cm_map_t::iterator iter_id = iter_fd->second.rdma_cm_ev.map_rdma_cm_id.find(info.id);
		if (iter_id == iter_fd->second.rdma_cm_ev.map_rdma_cm_id.end()) {
			evh_logdbg("Adding to exitsing channel fd %d (id %#x, handler %p)", info.fd, info.id, info.handler);
			iter_fd->second.rdma_cm_ev.map_rdma_cm_id[info.id] = info.handler;
			iter_fd->second.rdma_cm_ev.n_ref_count++;
			if (iter_fd->second.rdma_cm_ev.cma_channel != info.cma_channel) {
				evh_logerr("Trying to change the channel processing cb's on a registered fd %d (by id %#x)", info.fd, info.id);
			}
		}
		else {
			evh_logerr("Channel-id pair <%d, %#x> already registered (handler %p)", info.fd, info.id, info.handler);
		}
		BULLSEYE_EXCLUDE_BLOCK_END
	}
}

void event_handler_manager::priv_unregister_rdma_cm_events(rdma_cm_reg_info_t& info)
{
	evh_logfunc_entry("fd=%d, id=%p", info.fd, info.id);

	event_handler_map_t::iterator iter_fd = m_event_handler_map.find(info.fd);
	if (iter_fd != m_event_handler_map.end()) {
		BULLSEYE_EXCLUDE_BLOCK_START
		if (iter_fd->second.type != EV_RDMA_CM) {
			evh_logerr("fd=%d: is already handling events of different type", info.fd);
			return;
		}
		BULLSEYE_EXCLUDE_BLOCK_END
		event_handler_rdma_cm_map_t::iterator iter_id = iter_fd->second.rdma_cm_ev.map_rdma_cm_id.find(info.id);
		BULLSEYE_EXCLUDE_BLOCK_START
		if (iter_id != iter_fd->second.rdma_cm_ev.map_rdma_cm_id.end()) {
		BULLSEYE_EXCLUDE_BLOCK_END
			evh_logdbg("Removing from channel %d, id %p", info.fd, info.id);
			iter_fd->second.rdma_cm_ev.map_rdma_cm_id.erase(iter_id);
			iter_fd->second.rdma_cm_ev.n_ref_count--;
			if (iter_fd->second.rdma_cm_ev.n_ref_count == 0) {
				update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
				m_event_handler_map.erase(iter_fd);
				evh_logdbg("Removed channel <%d %p>", info.fd, info.id);
			}
		}
		else {
			evh_logerr("Channel-id pair <%d %p> not found", info.fd, info.id);
		}
	}
	else {
		evh_logdbg("Channel %d not found", info.fd);
	}
}

void event_handler_manager::priv_register_command_events(command_reg_info_t& info)
{
	// In case this is new registration need to add netlink fd to the epfd
	event_handler_map_t::iterator iter_fd = m_event_handler_map.find(info.fd);
	if (iter_fd == m_event_handler_map.end()) {
		evh_logdbg("Adding new channel (fd %d)", info.fd);
		event_data_t map_value;

		map_value.type = EV_COMMAND;
		map_value.command_ev.cmd = info.cmd;

		/* coverity[uninit_use_in_call] */
		/* cppcheck-suppress uninitStructMember */
		m_event_handler_map[info.fd] = map_value;
		update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
	}

}

void event_handler_manager::priv_unregister_command_events(command_reg_info_t& info)
{

	event_handler_map_t::iterator iter_fd = m_event_handler_map.find(info.fd);
	if (iter_fd == m_event_handler_map.end()) {
		evh_logdbg(" channel wasn't found (fd %d)", info.fd);

	}
	else if(iter_fd->first != EV_COMMAND){
		evh_logdbg(" This fd (%d) no longer COMMAND type fd", info.fd);
	}
	else {
		update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
	}
}

void event_handler_manager::handle_registration_action(reg_action_t& reg_action)
{
	if (!m_b_continue_running)
		return;

	evh_logfunc("event action %d", reg_action.type);
	switch (reg_action.type) {
	case REGISTER_TIMER:
		priv_register_timer_handler(reg_action.info.timer);
		break;
	case WAKEUP_TIMER:
		priv_wakeup_timer_handler(reg_action.info.timer);
		break;
	case UNREGISTER_TIMER:
		priv_unregister_timer_handler(reg_action.info.timer);
		break;
	case REGISTER_IBVERBS:
		priv_register_ibverbs_events(reg_action.info.ibverbs);
		break;
	case UNREGISTER_IBVERBS:
		priv_unregister_ibverbs_events(reg_action.info.ibverbs);
		break;
	case REGISTER_RDMA_CM:
		priv_register_rdma_cm_events(reg_action.info.rdma_cm);
		break;
	case UNREGISTER_RDMA_CM:
		priv_unregister_rdma_cm_events(reg_action.info.rdma_cm);
		break;
	case REGISTER_COMMAND:
		priv_register_command_events(reg_action.info.cmd);
		break;
	case UNREGISTER_COMMAND:
		priv_unregister_command_events(reg_action.info.cmd);
		break;
	case UNREGISTER_TIMERS_AND_DELETE:
		priv_unregister_all_handler_timers(reg_action.info.timer);
		delete reg_action.info.timer.handler;
		reg_action.info.timer.handler = NULL;
		break;
	BULLSEYE_EXCLUDE_BLOCK_START
	default:
		evh_logerr("illegal event action! (%d)", reg_action.type);
		break;
	BULLSEYE_EXCLUDE_BLOCK_END
	}
	return;
}

void event_handler_manager::query_for_ibverbs_event(int async_fd)
{
	evh_logfunc_entry("");

	struct pollfd poll_fd;
	event_handler_map_t::iterator i;

	poll_fd.events  = POLLIN | POLLPRI;
	poll_fd.revents = 0;
	poll_fd.fd = async_fd;

	// ibverbs events should be read only from the internal thread context
	if (pthread_self() != m_event_handler_tid) {
		return;
	}

	// Check for ready events
	if (orig_os_api.poll(&poll_fd, 1, 0) <= 0) {
		return;
	}

	// Verify handler exists in map
	if ((i = m_event_handler_map.find(async_fd)) == m_event_handler_map.end()) {
		return;
	}

	process_ibverbs_event(i);
}

void event_handler_manager::statistics_print(int fd, vlog_levels_t log_level)
{
	if (m_b_continue_running && g_p_fd_collection) {
		g_p_fd_collection->statistics_print(fd, log_level);
	}
}

void event_handler_manager::process_ibverbs_event(event_handler_map_t::iterator &i)
{
	evh_logfunc_entry("");

	//
	// Pre handling
	//
	struct ibv_context *hca = (struct ibv_context*)i->second.ibverbs_ev.channel;
	struct ibv_async_event ibv_event;

	IF_VERBS_FAILURE(ibv_get_async_event(hca, &ibv_event)) {
		vlog_levels_t _level = (errno == EBADF) ? VLOG_DEBUG : VLOG_ERROR; // EBADF may returned during plugout
		vlog_printf(_level, "[%d] Received HCA event but failed to get it (errno=%d %m)\n", hca->async_fd, errno);
		return;
	} ENDIF_VERBS_FAILURE;
	evh_logdbg("[%d] Received ibverbs event %s (%d)", hca->async_fd, priv_ibv_event_desc_str(ibv_event.event_type), ibv_event.event_type);

	//
	// Notify Event to handlers
	//
	for (ibverbs_event_map_t::iterator pos = i->second.ibverbs_ev.ev_map.begin();
	    pos != i->second.ibverbs_ev.ev_map.end(); pos++) {
		pos->second.handler->handle_event_ibverbs_cb(&ibv_event, pos->second.user_data);
	}

	evh_logdbg("[%d] Completed ibverbs event %s (%d)", hca->async_fd, priv_ibv_event_desc_str(ibv_event.event_type), ibv_event.event_type);

	//
	// Post handling
	//
	ibv_ack_async_event(&ibv_event);
}

void event_handler_manager::process_rdma_cm_event(event_handler_map_t::iterator &iter_fd)
{
	// Read the notification event channel
	struct rdma_event_channel* cma_channel = (struct rdma_event_channel*)iter_fd->second.rdma_cm_ev.cma_channel;
	struct rdma_cm_event* p_tmp_cm_event = NULL; 
	struct rdma_cm_event cma_event;

	evh_logfunc_entry("cma_channel %p (fd = %d)", cma_channel, cma_channel->fd);

	BULLSEYE_EXCLUDE_BLOCK_START
	// Get rdma_cm event
	if (rdma_get_cm_event(cma_channel, &p_tmp_cm_event)) {
		evh_logerr("rdma_get_cm_event failed on cma_channel %d (fd = %d) (errno=%d %m)", cma_channel, cma_channel->fd, errno);
		return;
	}
	if (!p_tmp_cm_event) {
		evh_logpanic("rdma_get_cm_event succeeded but the returned event is NULL on cma_channel %d (fd = %d) (errno=%d %m)", cma_channel, cma_channel->fd, errno);
	}
	BULLSEYE_EXCLUDE_BLOCK_END

	// Duplicate rdma_cm event to local memory
	memcpy(&cma_event, p_tmp_cm_event, sizeof(cma_event));

	// Ack  rdma_cm event (free)
	rdma_ack_cm_event(p_tmp_cm_event);
	evh_logdbg("[%d] Received rdma_cm event %s (%d)", cma_channel->fd, priv_rdma_cm_event_type_str(cma_event.event), cma_event.event);

	void* cma_id = (void*)cma_event.id;
	if (cma_event.listen_id)	// we assume that cma_listen_id != NULL in case of connect request
		cma_id = (void*)cma_event.listen_id;	

	
	// Find registered event handler
	if (cma_id != NULL) {
		event_handler_rdma_cm_map_t::iterator iter_id = iter_fd->second.rdma_cm_ev.map_rdma_cm_id.find(cma_id);
		if (iter_id != iter_fd->second.rdma_cm_ev.map_rdma_cm_id.end()) {
			event_handler_rdma_cm* handler = iter_id->second;

			// Call the registered event handler with event to be handled
			if (handler)
				handler->handle_event_rdma_cm_cb(&cma_event);
		}
		else {
			evh_logdbg("Can't find event_handler for ready event_handler_id %d (fd=%d)", cma_id, iter_fd->first);
		}
	}

	evh_logdbg("[%d] Completed rdma_cm event %s (%d)", cma_channel->fd, priv_rdma_cm_event_type_str(cma_event.event), cma_event.event);
}


/*
The main loop actions:
	1) update timeout + handle registers that theire timeout expiered
	2) epoll_wait
	3) handle new registrations/unregistrations
	4) update timeout + handle registers that theire timeout expiered
	5) handle new events
*/

void* event_handler_manager::thread_loop()
{
	int timeout_msec;
	int maxevents = INITIAL_EVENTS_NUM;
	struct pollfd poll_fd;
	struct epoll_event* p_events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*maxevents);	
	BULLSEYE_EXCLUDE_BLOCK_START
	if(!p_events){
		evh_logdbg("malloc failure");
		throw_vma_exception("malloc failure");
	}
	BULLSEYE_EXCLUDE_BLOCK_END
	
	poll_fd.events  = POLLIN | POLLPRI;
	poll_fd.revents = 0;
	while (m_b_continue_running) {
#ifdef VMA_TIME_MEASURE
		if (g_inst_cnt >= m_n_sysvar_vma_time_measure_num_samples)
			finit_instrumentation(safe_mce_sys().vma_time_measure_filename);
#endif

		// update timer and get timeout
		timeout_msec = m_timer.update_timeout();
		if (timeout_msec == 0) {
			// at least one timer has expired!
			m_timer.process_registered_timers();
			continue;
		}

		if( m_b_sysvar_internal_thread_arm_cq_enabled && m_cq_epfd == 0 && g_p_net_device_table_mgr) {
			m_cq_epfd = g_p_net_device_table_mgr->global_ring_epfd_get();
			if( m_cq_epfd > 0 ) {
				epoll_event evt = {0, {0}};
				evt.events = EPOLLIN | EPOLLPRI;
				evt.data.fd = m_cq_epfd;
				orig_os_api.epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_cq_epfd, &evt);
			}
		}

		uint64_t poll_sn = 0;
		if( m_b_sysvar_internal_thread_arm_cq_enabled && m_cq_epfd > 0 && g_p_net_device_table_mgr) {
			g_p_net_device_table_mgr->global_ring_poll_and_process_element(&poll_sn, NULL);
			int ret = g_p_net_device_table_mgr->global_ring_request_notification(poll_sn);
			if (ret > 0) {
				g_p_net_device_table_mgr->global_ring_poll_and_process_element(&poll_sn, NULL);
			}
		}

		// Make sure we sleep for a minimum of X milli seconds
		if (timeout_msec > 0) {
			if ((int)m_n_sysvar_timer_resolution_msec > timeout_msec) {
				timeout_msec = m_n_sysvar_timer_resolution_msec;
			}
		}

		evh_logfuncall("calling orig_os_api.epoll with %d msec timeout", timeout_msec);
		int ret = orig_os_api.epoll_wait(m_epfd, p_events, maxevents, timeout_msec);
		if (ret < 0) {
			evh_logfunc("epoll returned with error, errno=%d %m)", errno);
			continue;
		}
		evh_logfuncall("orig_os_api.epoll found %d ready fds", ret);

		// check pipe
		for (int idx = 0; (idx < ret) && (m_b_continue_running); ++idx) {
			if(m_b_sysvar_internal_thread_arm_cq_enabled && p_events[idx].data.fd == m_cq_epfd && g_p_net_device_table_mgr){
				g_p_net_device_table_mgr->global_ring_wait_for_notification_and_process_element(&poll_sn, NULL);
			}
			else if (is_wakeup_fd(p_events[idx].data.fd)) {
				// a request for registration was sent
				reg_action_t reg_action;
				while (1) {
					m_reg_action_q_lock.lock();
					if (m_reg_action_q.empty()) {
						return_from_sleep();
						remove_wakeup_fd();
						going_to_sleep();
						m_reg_action_q_lock.unlock();
						break;
					}
					reg_action = m_reg_action_q.front();
					m_reg_action_q.pop_front();
					m_reg_action_q_lock.unlock();
					handle_registration_action(reg_action);
				}
				break;
			}
		}

		if ((m_timer.update_timeout() == 0) && (m_b_continue_running)) {
			// at least one timer has expired!
			m_timer.process_registered_timers();
		}


		// process ready event channels
		for (int idx = 0; (idx < ret) && (m_b_continue_running); ++idx) {
			//if (p_events[idx].events & (EPOLLERR|EPOLLHUP))
			//	evh_logdbg("error in fd %d",p_events[idx].data.fd );

			int fd = p_events[idx].data.fd;

			if(m_b_sysvar_internal_thread_arm_cq_enabled && fd == m_cq_epfd) continue;

			evh_logfunc("Processing fd %d", fd);

			if (is_wakeup_fd(fd))	// the pipe was already handled
				continue;

			event_handler_map_t::iterator i = m_event_handler_map.find(fd);
			if (i == m_event_handler_map.end()) {
				// No event handler - this is probably a poll_os event!
				if (!g_p_fd_collection->set_immediate_os_sample(fd)) {
					evh_logdbg("No event handler (fd=%d)", fd);
				}
				continue;
			}

			switch (i->second.type) {
			case EV_RDMA_CM:
				int result;
				poll_fd.fd = fd;
				result = orig_os_api.poll(&poll_fd, 1, 0);
				if (result == 0) {
					evh_logdbg("error in fd %d", fd);
					break;
				}
				process_rdma_cm_event(i);
				break;
			case EV_IBVERBS:
				process_ibverbs_event(i);
				break;
			case EV_COMMAND:
				i->second.command_ev.cmd->execute();
				break;
			BULLSEYE_EXCLUDE_BLOCK_START
			default:
				evh_logerr("Unknow event on fd=%d", fd);
			BULLSEYE_EXCLUDE_BLOCK_END
			}
		} // for idx

		if (ret == maxevents) {
			struct epoll_event* p_events_new;
			// increase the events array
			maxevents *= 2;
			p_events_new = ( struct epoll_event*)realloc( (void *)p_events, sizeof(struct epoll_event) * maxevents);
			BULLSEYE_EXCLUDE_BLOCK_START
			if( !p_events_new) {
				evh_logpanic("realloc failure") ;
			}
			p_events = p_events_new;
			BULLSEYE_EXCLUDE_BLOCK_END
		}

	} // while (m_b_continue_running)

	free(p_events);

	return 0;
}