/*
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "epoll_wait_call.h"

#include <vlogger/vlogger.h>

#include <vma/util/vtypes.h>
#include <vma/sock/sock-redirect.h>
#include <vma/sock/fd_collection.h>
#include <vma/sock/socket_fd_api.h>

#include "epfd_info.h"

#define MODULE_NAME "epoll_wait_call:"

epoll_wait_call::epoll_wait_call(epoll_event *extra_events_buffer, offloaded_mode_t *off_modes_buffer,
                                 int epfd, epoll_event *events, int maxevents,
                                 int timeout, const sigset_t *sigmask /* = NULL */) :
	io_mux_call(NULL, off_modes_buffer, 0, sigmask),  // TODO: rethink on these arguments
	m_epfd(epfd), m_events(events), m_maxevents(maxevents), m_timeout(timeout),
	m_p_ready_events(extra_events_buffer)
{
	// get epfd_info
	m_epfd_info = fd_collection_get_epfd(epfd);
	if (!m_epfd_info || maxevents <= 0) {
		__log_dbg("error, epfd %d not found or maxevents <= 0 (=%d)", epfd, maxevents);
		errno = maxevents <= 0 ? EINVAL : EBADF;
		vma_throw_object(io_mux_call::io_error);
	}

	// create stats
	m_p_stats = &m_epfd_info->stats()->stats;
}

void epoll_wait_call::init_offloaded_fds()
{
	// copy the offloaded fds pointer and count
	m_epfd_info->get_offloaded_fds_arr_and_size(&m_p_num_all_offloaded_fds, &m_p_all_offloaded_fds);
	m_num_all_offloaded_fds = *m_p_num_all_offloaded_fds; // TODO: fix orig ugly code, and then remove this

	__log_func("building: epfd=%d, m_epfd_info->get_fd_offloaded_size()=%zu, "
		   "m_epfd_info->get_fd_non_offloaded_size()=%zu, *m_p_num_all_offloaded_fds=%d",
		   m_epfd, m_epfd_info->get_fd_offloaded_size(),
		   m_epfd_info->get_fd_non_offloaded_size(), *m_p_num_all_offloaded_fds);
}

int epoll_wait_call::get_current_events()
{
	if (m_epfd_info->m_ready_fds.empty()) {
		return m_n_all_ready_fds;
	}

	vma_list_t socket_fd_list;
	lock();
	int i, ready_rfds = 0, ready_wfds = 0;
	i = m_n_all_ready_fds;
	socket_fd_api *p_socket_object;
	ep_ready_fd_list_t::iterator iter = m_epfd_info->m_ready_fds.begin();
	while (iter != m_epfd_info->m_ready_fds.end() && i < m_maxevents) {
		p_socket_object = *iter;
		++iter;

		m_events[i].events = 0; // initialize

		bool got_event = false;

		// epoll_wait always waits for EPOLLERR and EPOLLHUP; there is no need to set them in events.
		uint32_t mutual_events = p_socket_object->m_epoll_event_flags &
					 (p_socket_object->m_fd_rec.events | EPOLLERR | EPOLLHUP);

		// EPOLLHUP and EPOLLOUT are mutually exclusive (see the poll man page); epoll adopts poll's behavior.
		if ((mutual_events & EPOLLHUP) && (mutual_events & EPOLLOUT)) {
			mutual_events &= ~EPOLLOUT;
		}

		if (mutual_events & EPOLLIN) {
			if (handle_epoll_event(p_socket_object->is_readable(NULL), EPOLLIN, p_socket_object, i)) {
				ready_rfds++;
				got_event = true;
			}
			mutual_events &= ~EPOLLIN;
		}

		if (mutual_events & EPOLLOUT) {
			if (handle_epoll_event(p_socket_object->is_writeable(), EPOLLOUT, p_socket_object, i)) {
				ready_wfds++;
				got_event = true;
			}
			mutual_events &= ~EPOLLOUT;
		}

		if (mutual_events) {
			if (handle_epoll_event(true, mutual_events, p_socket_object, i)) {
				got_event = true;
			}
		}

		if (got_event) {
			socket_fd_list.push_back(p_socket_object);
			++i;
		}
	}

	m_n_ready_rfds += ready_rfds;
	m_n_ready_wfds += ready_wfds;
	m_p_stats->n_iomux_rx_ready += ready_rfds;

	unlock();

	/*
	 * For checking ring migration we need a socket context.
	 * In epoll we separate the rings from the sockets, so only here do we access the sockets;
	 * therefore, it is most convenient to check it here.
	 * We need to move the ring migration to the epfd, going over the registered sockets,
	 * when polling the rings was not fruitful.
	 * This will be more similar to the behavior of select/poll.
	 * See RM task 212058.
	 */
	while (!socket_fd_list.empty()) {
		socket_fd_api* sockfd = socket_fd_list.get_and_pop_front();
		sockfd->consider_rings_migration();
	}

	return (i);
}

epoll_wait_call::~epoll_wait_call()
{
}

void epoll_wait_call::prepare_to_block()
{
	// Empty
}

bool epoll_wait_call::_wait(int timeout)
{
	int i, ready_fds, fd;
	bool cq_ready = false;
	epoll_fd_rec* fd_rec;

	__log_func("calling os epoll: %d", m_epfd);

	if (timeout) {
		lock();
		if (m_epfd_info->m_ready_fds.empty()) {
			m_epfd_info->going_to_sleep();
		} else {
			timeout = 0;
		}
		unlock();
	}

	if (m_sigmask) {
		ready_fds = orig_os_api.epoll_pwait(m_epfd, m_p_ready_events, m_maxevents, timeout, m_sigmask);
	} else {
		ready_fds = orig_os_api.epoll_wait(m_epfd, m_p_ready_events, m_maxevents, timeout);
	}

	if (timeout) {
		lock();
		m_epfd_info->return_from_sleep();
		unlock();
	}

	if (ready_fds < 0) {
		vma_throw_object(io_mux_call::io_error);
	}

	// convert the returned events to user events and mark offloaded fds
	m_n_all_ready_fds = 0;
	for (i = 0; i < ready_fds; ++i) {
		fd = m_p_ready_events[i].data.fd;

		// wakeup event
		if (m_epfd_info->is_wakeup_fd(fd)) {
			lock();
			m_epfd_info->remove_wakeup_fd();
			unlock();
			continue;
		}

		// if it's a CQ fd
		if (m_epfd_info->is_cq_fd(m_p_ready_events[i].data.u64)) {
			cq_ready = true;
			continue;
		}

		if (m_p_ready_events[i].events & EPOLLIN) {
			socket_fd_api* temp_sock_fd_api = fd_collection_get_sockfd(fd);
			if (temp_sock_fd_api) {
				// Instruct the socket to sample the OS immediately, to prevent hitting EAGAIN on recvfrom()
				// after iomux returned a shadow fd as ready (only for non-blocking sockets)
				temp_sock_fd_api->set_immediate_os_sample();
			}
		}

		// Copy event bits and data
		m_events[m_n_all_ready_fds].events = m_p_ready_events[i].events;
		fd_rec = m_epfd_info->get_fd_rec(fd);
		if (fd_rec) {
			m_events[m_n_all_ready_fds].data = fd_rec->epdata;
			++m_n_all_ready_fds;
		} else {
			__log_dbg("error - could not find fd %d in m_fd_info of epfd %d", fd, m_epfd);
		}
	}

	return cq_ready;
}

bool epoll_wait_call::wait_os(bool zero_timeout)
{
	return _wait(zero_timeout ? 0 : m_timeout);
}

bool epoll_wait_call::wait(const timeval &elapsed)
{
	int timeout;

	if (m_timeout < 0) {
		timeout = m_timeout;
	} else {
		timeout = m_timeout - tv_to_msec(&elapsed);
		if (timeout < 0) {
			// Already reached timeout
			return false;
		}
	}

	return _wait(timeout);
}

bool epoll_wait_call::is_timeout(const timeval &elapsed)
{
	return m_timeout >= 0 && m_timeout <= tv_to_msec(&elapsed);
}

void epoll_wait_call::set_offloaded_rfd_ready(int fd_index)
{
	// Empty - the event is inserted via the event callback
	NOT_IN_USE(fd_index);
}

void epoll_wait_call::set_offloaded_wfd_ready(int fd_index)
{
	// Empty
	NOT_IN_USE(fd_index);
}

void epoll_wait_call::set_rfd_ready(int fd)
{
	// Empty
	NOT_IN_USE(fd);
}

void epoll_wait_call::set_wfd_ready(int fd)
{
	// Empty
	NOT_IN_USE(fd);
}

void epoll_wait_call::set_efd_ready(int fd, int errors)
{
	// Empty
	NOT_IN_USE(fd);
	NOT_IN_USE(errors);
}

void epoll_wait_call::lock()
{
	m_epfd_info->lock();
}

void epoll_wait_call::unlock()
{
	m_epfd_info->unlock();
}

bool epoll_wait_call::check_all_offloaded_sockets()
{
	// check the CQ for acks
	ring_poll_and_process_element();

	m_n_all_ready_fds = get_current_events();

	__log_func("m_n_all_ready_fds=%d, m_n_ready_rfds=%d, m_n_ready_wfds=%d",
		   m_n_all_ready_fds, m_n_ready_rfds, m_n_ready_wfds);
	return m_n_all_ready_fds;
}

bool epoll_wait_call::immidiate_return(int &poll_os_countdown)
{
	NOT_IN_USE(poll_os_countdown);
	return false;
}

bool epoll_wait_call::handle_epoll_event(bool is_ready, uint32_t events, socket_fd_api *socket_object, int index)
{
	if (is_ready) {
		epoll_fd_rec& fd_rec = socket_object->m_fd_rec;
		m_events[index].data = fd_rec.epdata;
		m_events[index].events |= events;

		if (fd_rec.events & EPOLLONESHOT) {
			// Clear events for this fd
			fd_rec.events &= ~events;
		}
		if (fd_rec.events & EPOLLET) {
			m_epfd_info->remove_epoll_event(socket_object, events);
		}
		return true;
	} else {
		// not readable, need to erase from our ready list (LT support)
		m_epfd_info->remove_epoll_event(socket_object, events);
		return false;
	}
}

bool epoll_wait_call::handle_os_countdown(int &poll_os_countdown)
{
	NOT_IN_USE(poll_os_countdown);

	if (!m_epfd_info->get_os_data_available() || !m_epfd_info->get_and_unset_os_data_available()) {
		return false;
	}

	/*
	 * Poll the OS when the internal thread found non-offloaded data.
	 */
	bool cq_ready = wait_os(true);
	m_epfd_info->register_to_internal_thread();

	if (cq_ready) {
		// This will empty the cqepfd
		// (most likely in case of a wakeup, and probably only under epoll_wait, not select/poll)
		ring_wait_for_notification_and_process_element(NULL);
	}

	/* Before we exit with ready OS fds we check the CQs once more and exit
	 * below after calling check_all_offloaded_sockets().
	 * IMPORTANT: the opposite order is not possible with the current code;
	 * we cannot poll the CQ and then poll the OS (for epoll), because
	 * polling the OS would delete the ready offloaded fds.
	 */
	if (m_n_all_ready_fds) {
		m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know the total counter, not the read counter
		check_all_offloaded_sockets();
		return true;
	}

	return false;
}

int epoll_wait_call::ring_poll_and_process_element()
{
	return m_epfd_info->ring_poll_and_process_element(&m_poll_sn, NULL);
}

int epoll_wait_call::ring_request_notification()
{
	return m_epfd_info->ring_request_notification(m_poll_sn);
}

int epoll_wait_call::ring_wait_for_notification_and_process_element(void* pv_fd_ready_array)
{
	return m_epfd_info->ring_wait_for_notification_and_process_element(&m_poll_sn, pv_fd_ready_array);
}