/*
* Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "epoll_wait_call.h"
#include <vlogger/vlogger.h>
#include <vma/util/vtypes.h>
#include <vma/sock/sock-redirect.h>
#include <vma/sock/socket_fd_api.h>
#include <vma/sock/fd_collection.h>
#include "epfd_info.h"
#define MODULE_NAME "epoll_wait_call:"
/**
 * Constructs an epoll_wait() call wrapper on top of io_mux_call.
 *
 * @param extra_events_buffer scratch array used to receive events from the OS epoll
 * @param off_modes_buffer    per-fd offload-mode array handed to io_mux_call
 * @param epfd                the user's epoll file descriptor
 * @param events              user-supplied output event array
 * @param maxevents           capacity of 'events'; must be positive
 * @param timeout             epoll_wait timeout in milliseconds
 * @param sigmask             optional signal mask (epoll_pwait semantics)
 *
 * Throws io_mux_call::io_error with errno set to EINVAL (bad maxevents)
 * or EBADF (epfd not tracked by VMA).
 */
epoll_wait_call::epoll_wait_call(epoll_event *extra_events_buffer, offloaded_mode_t *off_modes_buffer,
			 int epfd, epoll_event *events, int maxevents,
			 int timeout, const sigset_t *sigmask /* = NULL */) :
	io_mux_call(NULL, off_modes_buffer, 0, sigmask), // TODO: rethink on these arguments
	m_epfd(epfd), m_events(events), m_maxevents(maxevents), m_timeout(timeout),
	m_p_ready_events(extra_events_buffer)
{
	// locate VMA's bookkeeping object for this epoll fd
	m_epfd_info = fd_collection_get_epfd(epfd);

	const bool bad_maxevents = (maxevents <= 0);
	if (bad_maxevents || !m_epfd_info) {
		__log_dbg("error, epfd %d not found or maxevents <= 0 (=%d)", epfd, maxevents);
		errno = bad_maxevents ? EINVAL : EBADF;
		vma_throw_object(io_mux_call::io_error);
	}

	// expose this epfd's iomux statistics block
	m_p_stats = &m_epfd_info->stats()->stats;
}
/**
 * Caches the epfd's offloaded-fd array (pointer and count) into this call
 * object so the io_mux_call polling machinery can iterate over it.
 */
void epoll_wait_call::init_offloaded_fds()
{
	// copy offloaded_fds pointer and count
	m_epfd_info->get_offloaded_fds_arr_and_size(&m_p_num_all_offloaded_fds, &m_p_all_offloaded_fds);
	// snapshot the count into the scalar the base class reads
	m_num_all_offloaded_fds = *m_p_num_all_offloaded_fds; // TODO: fix orig ugly code, and then remove this
	__log_func("building: epfd=%d, m_epfd_info->get_fd_offloaded_size()=%zu, m_epfd_info->get_fd_non_offloaded_size()=%zu, *m_p_num_all_offloaded_fds=%d",
			m_epfd, m_epfd_info->get_fd_offloaded_size(), m_epfd_info->get_fd_non_offloaded_size(), *m_p_num_all_offloaded_fds);
}
/**
 * Harvests ready offloaded events from the epfd's ready-fd list into the
 * user's event array (m_events), appending after any OS events already
 * copied in (slots [0, m_n_all_ready_fds) are assumed filled).
 *
 * Takes the epfd lock around the ready-list scan and releases it before
 * calling consider_rings_migration() on the sockets that fired.
 *
 * @return total number of valid entries now in m_events (OS + offloaded).
 */
int epoll_wait_call::get_current_events()
{
	if (m_epfd_info->m_ready_fds.empty()) {
		// no offloaded events pending - only the OS events (if any) remain
		return m_n_all_ready_fds;
	}
	// sockets that delivered an event; ring migration is checked after unlock
	vma_list_t<socket_fd_api, socket_fd_api::socket_fd_list_node_offset> socket_fd_list;
	lock();
	int i, ready_rfds = 0, ready_wfds = 0;
	i = m_n_all_ready_fds; // continue filling after the OS events
	socket_fd_api *p_socket_object;
	ep_ready_fd_list_t::iterator iter = m_epfd_info->m_ready_fds.begin();
	while (iter != m_epfd_info->m_ready_fds.end() && i < m_maxevents) {
		p_socket_object = *iter;
		// advance before handling - handle_epoll_event() may unlink this node
		++iter;
		m_events[i].events = 0; //initialize
		bool got_event = false;
		//epoll_wait will always wait for EPOLLERR and EPOLLHUP; it is not necessary to set it in events.
		uint32_t mutual_events = p_socket_object->m_epoll_event_flags & (p_socket_object->m_fd_rec.events | EPOLLERR | EPOLLHUP);
		//EPOLLHUP & EPOLLOUT are mutually exclusive. see poll man pages. epoll adapt poll behavior.
		if ((mutual_events & EPOLLHUP) && (mutual_events & EPOLLOUT)) {
			mutual_events &= ~EPOLLOUT;
		}
		if (mutual_events & EPOLLIN) {
			// re-verify readability before reporting (level-triggered support)
			if (handle_epoll_event(p_socket_object->is_readable(NULL), EPOLLIN, p_socket_object, i)) {
				ready_rfds++;
				got_event = true;
			}
			mutual_events &= ~EPOLLIN;
		}
		if (mutual_events & EPOLLOUT) {
			// likewise re-verify writability
			if (handle_epoll_event(p_socket_object->is_writeable(), EPOLLOUT, p_socket_object, i)) {
				ready_wfds++;
				got_event = true;
			}
			mutual_events &= ~EPOLLOUT;
		}
		if (mutual_events) {
			// remaining bits (e.g. EPOLLERR/EPOLLHUP) are reported unconditionally
			if (handle_epoll_event(true, mutual_events, p_socket_object, i)) {
				got_event = true;
			}
		}
		if (got_event) {
			socket_fd_list.push_back(p_socket_object);
			++i; // slot consumed; next event goes to the following entry
		}
	}
	m_n_ready_rfds += ready_rfds;
	m_n_ready_wfds += ready_wfds;
	m_p_stats->n_iomux_rx_ready += ready_rfds;
	unlock();
	/*
	 * for checking ring migration we need a socket context.
	 * in epoll we separate the rings from the sockets, so only here we access the sockets.
	 * therefore, it is most convenient to check it here.
	 * we need to move the ring migration to the epfd, going over the registered sockets,
	 * when polling the rings was not fruitful.
	 * this will be more similar to the behavior of select/poll.
	 * see RM task 212058
	 */
	while (!socket_fd_list.empty()) {
		socket_fd_api* sockfd = socket_fd_list.get_and_pop_front();
		sockfd->consider_rings_migration();
	}
	return (i);
}
// Destructor - nothing to release here; the event buffers and the epfd_info
// object are owned by the caller / fd collection, not by this call wrapper.
epoll_wait_call::~epoll_wait_call()
{
}
// No pre-sleep preparation is needed for epoll: the wakeup mechanism is
// armed (going_to_sleep()) inside _wait() itself.
void epoll_wait_call::prepare_to_block()
{
	// Empty
}
bool epoll_wait_call::_wait(int timeout)
{
int i, ready_fds, fd;
bool cq_ready = false;
epoll_fd_rec* fd_rec;
__log_func("calling os epoll: %d", m_epfd);
if (timeout) {
lock();
if (m_epfd_info->m_ready_fds.empty()) {
m_epfd_info->going_to_sleep();
} else {
timeout = 0;
}
unlock();
}
if (m_sigmask) {
ready_fds = orig_os_api.epoll_pwait(m_epfd, m_p_ready_events, m_maxevents, timeout, m_sigmask);
} else {
ready_fds = orig_os_api.epoll_wait(m_epfd, m_p_ready_events, m_maxevents, timeout);
}
if (timeout) {
lock();
m_epfd_info->return_from_sleep();
unlock();
}
if (ready_fds < 0) {
vma_throw_object(io_mux_call::io_error);
}
// convert the returned events to user events and mark offloaded fds
m_n_all_ready_fds = 0;
for (i = 0; i < ready_fds; ++i) {
fd = m_p_ready_events[i].data.fd;
// wakeup event
if(m_epfd_info->is_wakeup_fd(fd))
{
lock();
m_epfd_info->remove_wakeup_fd();
unlock();
continue;
}
// If it's CQ
if (m_epfd_info->is_cq_fd(m_p_ready_events[i].data.u64)) {
cq_ready = true;
continue;
}
if (m_p_ready_events[i].events & EPOLLIN) {
socket_fd_api* temp_sock_fd_api = fd_collection_get_sockfd(fd);
if (temp_sock_fd_api) {
// Instructing the socket to sample the OS immediately to prevent hitting EAGAIN on recvfrom(),
// after iomux returned a shadow fd as ready (only for non-blocking sockets)
temp_sock_fd_api->set_immediate_os_sample();
}
}
// Copy event bits and data
m_events[m_n_all_ready_fds].events = m_p_ready_events[i].events;
fd_rec = m_epfd_info->get_fd_rec(fd);
if (fd_rec) {
m_events[m_n_all_ready_fds].data = fd_rec->epdata;
++m_n_all_ready_fds;
} else {
__log_dbg("error - could not found fd %d in m_fd_info of epfd %d", fd, m_epfd);
}
}
return cq_ready;
}
bool epoll_wait_call::wait_os(bool zero_timeout)
{
return _wait(zero_timeout ? 0 : m_timeout);
}
bool epoll_wait_call::wait(const timeval &elapsed)
{
int timeout;
if (m_timeout < 0) {
timeout = m_timeout;
} else {
timeout = m_timeout - tv_to_msec(&elapsed);
if (timeout < 0) {
// Already reached timeout
return false;
}
}
return _wait(timeout);
}
/**
 * @param elapsed time already spent inside this call.
 * @return true when a finite timeout was requested and 'elapsed' covers it.
 */
bool epoll_wait_call::is_timeout(const timeval &elapsed)
{
	if (m_timeout < 0) {
		// infinite timeout never expires
		return false;
	}
	return tv_to_msec(&elapsed) >= m_timeout;
}
// No-op: for epoll, offloaded read-ready events reach the epfd's ready list
// through the socket event callback, not through this io_mux_call hook.
void epoll_wait_call::set_offloaded_rfd_ready(int fd_index)
{
	// Empty - event inserted via event callback
	NOT_IN_USE(fd_index);
}

// No-op: offloaded write-ready events are likewise delivered via callback.
void epoll_wait_call::set_offloaded_wfd_ready(int fd_index)
{
	// Empty
	NOT_IN_USE(fd_index);
}

// No-op: non-offloaded read-readiness is picked up directly by the OS
// epoll pass in _wait().
void epoll_wait_call::set_rfd_ready(int fd)
{
	// Empty
	NOT_IN_USE(fd);
}

// No-op: see set_rfd_ready() - the OS epoll pass handles this.
void epoll_wait_call::set_wfd_ready(int fd)
{
	// Empty
	NOT_IN_USE(fd);
}

// No-op: error events are carried in the event flags handled elsewhere.
void epoll_wait_call::set_efd_ready(int fd, int errors)
{
	// Empty
	NOT_IN_USE(fd);
	NOT_IN_USE(errors);
}
// Acquires the epfd-wide lock (delegates to epfd_info).
void epoll_wait_call::lock()
{
	m_epfd_info->lock();
}

// Releases the epfd-wide lock (delegates to epfd_info).
void epoll_wait_call::unlock()
{
	m_epfd_info->unlock();
}
/**
 * Polls the rings (processing CQ completions) and then harvests any ready
 * offloaded events into the user's event array.
 *
 * @return true when at least one event is now ready for the user.
 */
bool epoll_wait_call::check_all_offloaded_sockets()
{
	// drain the CQs first so freshly completed events become visible
	ring_poll_and_process_element();

	m_n_all_ready_fds = get_current_events();

	__log_func("m_n_all_ready_fds=%d, m_n_ready_rfds=%d, m_n_ready_wfds=%d", m_n_all_ready_fds, m_n_ready_rfds, m_n_ready_wfds);
	return (m_n_all_ready_fds != 0);
}
// Always false: epoll never short-circuits the io_mux_call loop this way;
// the OS-data hint is serviced by handle_os_countdown() instead.
// (The "immidiate" spelling matches the declared interface and cannot be
// fixed here without touching the class declaration.)
bool epoll_wait_call::immidiate_return(int &poll_os_countdown)
{
	NOT_IN_USE(poll_os_countdown);
	return false;
}
/**
 * Records one offloaded event into the user's event array, honoring the
 * EPOLLONESHOT / EPOLLET flags kept in the socket's fd record.
 *
 * @param is_ready      whether the socket actually satisfies 'events' now
 * @param events        event bit(s) being reported
 * @param socket_object the offloaded socket the event belongs to
 * @param index         slot in m_events to fill
 * @return true when the event was delivered to the user.
 */
bool epoll_wait_call::handle_epoll_event(bool is_ready, uint32_t events, socket_fd_api *socket_object, int index)
{
	if (!is_ready) {
		// not readable, need to erase from our ready list (LT support)
		m_epfd_info->remove_epoll_event(socket_object, events);
		return false;
	}

	epoll_fd_rec& fd_rec = socket_object->m_fd_rec;
	m_events[index].data = fd_rec.epdata;
	m_events[index].events |= events;

	if (fd_rec.events & EPOLLONESHOT) {
		// Clear events for this fd
		fd_rec.events &= ~events;
	}
	if (fd_rec.events & EPOLLET) {
		// edge-triggered: report once, then drop from the ready list
		m_epfd_info->remove_epoll_event(socket_object, events);
	}
	return true;
}
/**
 * Services the "OS data pending" hint raised by VMA's internal thread:
 * polls the OS epoll once (zero timeout) and, when OS fds were found ready,
 * re-checks the offloaded sockets before letting the caller return.
 *
 * @param poll_os_countdown unused in the epoll flavor of io_mux_call.
 * @return true when ready fds were collected and the wait can finish.
 */
bool epoll_wait_call::handle_os_countdown(int &poll_os_countdown)
{
	NOT_IN_USE(poll_os_countdown);

	// fast pre-check, then consume the flag via get_and_unset
	if (!m_epfd_info->get_os_data_available() || !m_epfd_info->get_and_unset_os_data_available()) {
		return false;
	}

	/*
	 * Poll OS when the internal thread found non offloaded data.
	 */
	bool cq_ready = wait_os(true);
	// re-register the epfd with the internal thread for future hints
	m_epfd_info->register_to_internal_thread();
	if (cq_ready) {
		// This will empty the cqepfd
		// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
		ring_wait_for_notification_and_process_element(NULL);
	}
	/* Before we exit with ready OS fd's we'll check the CQs once more and exit
	 * below after calling check_all_offloaded_sockets();
	 * IMPORTANT : We cannot do an opposite with current code,
	 * means we cannot poll cq and then poll os (for epoll) - because poll os
	 * will delete ready offloaded fds.
	 */
	if (m_n_all_ready_fds) {
		m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
		check_all_offloaded_sockets();
		return true;
	}
	return false;
}
// Polls the rings attached to this epfd and processes any completions;
// return value propagated from epfd_info.
int epoll_wait_call::ring_poll_and_process_element()
{
	return m_epfd_info->ring_poll_and_process_element(&m_poll_sn, NULL);
}

// Arms CQ notification on this epfd's rings at the last polled sequence
// number (m_poll_sn); return value propagated from epfd_info.
int epoll_wait_call::ring_request_notification()
{
	return m_epfd_info->ring_request_notification(m_poll_sn);
}

// Waits for a CQ notification on this epfd's rings and processes it.
// @param pv_fd_ready_array opaque ready-fd array forwarded to epfd_info.
int epoll_wait_call::ring_wait_for_notification_and_process_element(void* pv_fd_ready_array)
{
	return m_epfd_info->ring_wait_for_notification_and_process_element(&m_poll_sn, pv_fd_ready_array);
}