/*
* Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "cq_mgr.h"
#include "cq_mgr.inl"
#include <errno.h>
#include <fcntl.h>
#include <math.h>
#include <netinet/ip.h>
#include "utils/bullseye.h"
#include <vma/util/vtypes.h>
#include <vma/util/valgrind.h>
#include "vma/util/instrumentation.h"
#include <vma/sock/sock-redirect.h>
#include "vma/ib/base/verbs_extra.h"
#include "buffer_pool.h"
#include "qp_mgr.h"
#include "ring_simple.h"
#define MODULE_NAME "cqm"
#define cq_logpanic __log_info_panic
#define cq_logerr __log_info_err
#define cq_logwarn __log_info_warn
#define cq_loginfo __log_info_info
#define cq_logdbg __log_info_dbg
#define cq_logfunc __log_info_func
#define cq_logfuncall __log_info_funcall
#define cq_logdbg_no_funcname(log_fmt, log_args...) do { if (g_vlogger_level >= VLOG_DEBUG) vlog_printf(VLOG_DEBUG, MODULE_NAME "[%p]:%d: " log_fmt "\n", __INFO__, __LINE__, ##log_args); } while (0)
#if VLIST_DEBUG
#define VLIST_DEBUG_CQ_MGR_PRINT_ERROR_IS_MEMBER do { \
if (buff->buffer_node.is_list_member()) \
cq_logwarn("Buffer is already a member in a list! id=[%s]", buff->buffer_node.list_id()); \
} while (0)
#else
#define VLIST_DEBUG_CQ_MGR_PRINT_ERROR_IS_MEMBER
#endif
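// Each cq_mgr instance draws a nonzero id from this process-wide counter;
// poll() packs (cq_id, cq_sn) into m_n_global_sn so that callers holding a
// stale sequence number can detect that new completions were polled since.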
atomic_t cq_mgr::m_n_cq_id_counter = ATOMIC_INIT(1);
uint64_t cq_mgr::m_n_global_sn = 0;
cq_mgr::cq_mgr(ring_simple* p_ring, ib_ctx_handler* p_ib_ctx_handler, int cq_size, struct ibv_comp_channel* p_comp_event_channel, bool is_rx, bool config) :
m_p_ibv_cq(NULL)
,m_b_is_rx(is_rx)
,m_cq_id(0)
,m_n_cq_poll_sn(0)
,m_p_ring(p_ring)
,m_n_wce_counter(0)
,m_b_was_drained(false)
,m_b_is_rx_hw_csum_on(false)
,m_n_sysvar_cq_poll_batch_max(safe_mce_sys().cq_poll_batch_max)
,m_n_sysvar_progress_engine_wce_max(safe_mce_sys().progress_engine_wce_max)
,m_p_cq_stat(&m_cq_stat_static) // use local copy of stats by default (on rx cq get shared memory stats)
,m_transport_type(m_p_ring->get_transport_type())
,m_p_next_rx_desc_poll(NULL)
,m_n_sysvar_rx_prefetch_bytes_before_poll(safe_mce_sys().rx_prefetch_bytes_before_poll)
,m_n_sysvar_rx_prefetch_bytes(safe_mce_sys().rx_prefetch_bytes)
,m_sz_transport_header(0)
,m_p_ib_ctx_handler(p_ib_ctx_handler)
,m_n_sysvar_rx_num_wr_to_post_recv(safe_mce_sys().rx_num_wr_to_post_recv)
,m_comp_event_channel(p_comp_event_channel)
,m_b_notification_armed(false)
,m_n_sysvar_qp_compensation_level(safe_mce_sys().qp_compensation_level)
,m_rx_lkey(g_buffer_pool_rx->find_lkey_by_ib_ctx_thread_safe(m_p_ib_ctx_handler))
,m_b_sysvar_cq_keep_qp_full(safe_mce_sys().cq_keep_qp_full)
,m_n_out_of_free_bufs_warning(0)
,m_rx_buffs_rdy_for_free_head(NULL)
,m_rx_buffs_rdy_for_free_tail(NULL)
{
BULLSEYE_EXCLUDE_BLOCK_START
if (m_rx_lkey == 0) {
__log_info_panic("invalid lkey found %lu", m_rx_lkey);
}
BULLSEYE_EXCLUDE_BLOCK_END
memset(&m_cq_stat_static, 0, sizeof(m_cq_stat_static));
memset(&m_qp_rec, 0, sizeof(m_qp_rec));
m_rx_queue.set_id("cq_mgr (%p) : m_rx_queue", this);
m_rx_pool.set_id("cq_mgr (%p) : m_rx_pool", this);
m_cq_id = atomic_fetch_and_inc(&m_n_cq_id_counter); // cq id is nonzero
if (config)
configure(cq_size);
}
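// Note: vma_ibv_create_cq() is passed cq_size - 1. Verbs treats the cqe
// argument as a minimum and the provider typically rounds it up (commonly to
// a power of two), so the -1 presumably avoids doubling the allocation when
// cq_size is already a power of two (assumption, not documented here).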
void cq_mgr::configure(int cq_size)
{
vma_ibv_cq_init_attr attr;
memset(&attr, 0, sizeof(attr));
prep_ibv_cq(attr);
m_p_ibv_cq = vma_ibv_create_cq(m_p_ib_ctx_handler->get_ibv_context(),
cq_size - 1, (void *)this, m_comp_event_channel, 0, &attr);
BULLSEYE_EXCLUDE_BLOCK_START
if (!m_p_ibv_cq) {
throw_vma_exception("ibv_create_cq failed");
}
BULLSEYE_EXCLUDE_BLOCK_END
VALGRIND_MAKE_MEM_DEFINED(m_p_ibv_cq, sizeof(ibv_cq));
switch (m_transport_type) {
case VMA_TRANSPORT_IB:
m_sz_transport_header = GRH_HDR_LEN;
break;
case VMA_TRANSPORT_ETH:
m_sz_transport_header = ETH_HDR_LEN;
break;
BULLSEYE_EXCLUDE_BLOCK_START
default:
cq_logpanic("Unknown transport type: %d", m_transport_type);
break;
BULLSEYE_EXCLUDE_BLOCK_END
}
if (m_b_is_rx) {
vma_stats_instance_create_cq_block(m_p_cq_stat);
}
if (m_b_is_rx) {
m_b_is_rx_hw_csum_on = vma_is_rx_hw_csum_supported(m_p_ib_ctx_handler->get_ibv_device_attr());
cq_logdbg("RX CSUM support = %d", m_b_is_rx_hw_csum_on);
}
cq_logdbg("Created CQ as %s with fd[%d] and of size %d elements (ibv_cq_hndl=%p)", (m_b_is_rx?"Rx":"Tx"), get_channel_fd(), cq_size, m_p_ibv_cq);
}
void cq_mgr::prep_ibv_cq(vma_ibv_cq_init_attr& attr) const
{
if (m_p_ib_ctx_handler->get_ctx_time_converter_status()) {
vma_ibv_cq_init_ts_attr(&attr);
}
}
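// Drain any completions still sitting in the CQ (used on teardown paths);
// processed elements are parked on m_rx_queue so their buffers can be
// returned to the global pool by the destructor.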
uint32_t cq_mgr::clean_cq()
{
uint32_t ret_total = 0;
int ret = 0;
uint64_t cq_poll_sn = 0;
mem_buf_desc_t* buff = NULL;
/* coverity[stack_use_local_overflow] */
vma_ibv_wc wce[MCE_MAX_CQ_POLL_BATCH];
while ((ret = poll(wce, MCE_MAX_CQ_POLL_BATCH, &cq_poll_sn)) > 0) {
for (int i = 0; i < ret; i++) {
if (m_b_is_rx) {
buff = process_cq_element_rx(&wce[i]);
} else {
buff = process_cq_element_tx(&wce[i]);
}
if (buff)
m_rx_queue.push_back(buff);
}
ret_total += ret;
}
return ret_total;
}
cq_mgr::~cq_mgr()
{
cq_logfunc("");
cq_logdbg("destroying CQ as %s", (m_b_is_rx?"Rx":"Tx"));
if (m_rx_buffs_rdy_for_free_head) {
reclaim_recv_buffers(m_rx_buffs_rdy_for_free_head);
}
m_b_was_drained = true;
if (m_rx_queue.size() + m_rx_pool.size()) {
cq_logdbg("Returning %d buffers to global Rx pool (ready queue %d, free pool %d))", m_rx_queue.size() + m_rx_pool.size(), m_rx_queue.size(), m_rx_pool.size());
g_buffer_pool_rx->put_buffers_thread_safe(&m_rx_queue, m_rx_queue.size());
m_p_cq_stat->n_rx_sw_queue_len = m_rx_queue.size();
g_buffer_pool_rx->put_buffers_thread_safe(&m_rx_pool, m_rx_pool.size());
m_p_cq_stat->n_buffer_pool_len = m_rx_pool.size();
}
cq_logfunc("destroying ibv_cq");
IF_VERBS_FAILURE_EX(ibv_destroy_cq(m_p_ibv_cq), EIO) {
cq_logdbg("destroy cq failed (errno=%d %m)", errno);
} ENDIF_VERBS_FAILURE;
VALGRIND_MAKE_MEM_UNDEFINED(m_p_ibv_cq, sizeof(ibv_cq));
statistics_print();
if (m_b_is_rx)
vma_stats_instance_remove_cq_block(m_p_cq_stat);
cq_logdbg("done");
}
void cq_mgr::statistics_print()
{
if (m_p_cq_stat->n_rx_pkt_drop || m_p_cq_stat->n_rx_sw_queue_len ||
m_p_cq_stat->n_rx_drained_at_once_max || m_p_cq_stat->n_buffer_pool_len) {
cq_logdbg_no_funcname("Packets dropped: %12llu", m_p_cq_stat->n_rx_pkt_drop);
cq_logdbg_no_funcname("Drained max: %17u", m_p_cq_stat->n_rx_drained_at_once_max);
}
}
ibv_cq* cq_mgr::get_ibv_cq_hndl()
{
return m_p_ibv_cq;
}
int cq_mgr::get_channel_fd()
{
return m_comp_event_channel->fd;
}
void cq_mgr::add_qp_rx(qp_mgr* qp)
{
cq_logdbg("qp_mgr=%p", qp);
descq_t temp_desc_list;
temp_desc_list.set_id("cq_mgr (%p) : temp_desc_list", this);
m_p_cq_stat->n_rx_drained_at_once_max = 0;
/* return_extra_buffers(); */ //todo??
// Initial fill of receiver work requests
uint32_t qp_rx_wr_num = qp->get_rx_max_wr_num();
cq_logdbg("Trying to push %d WRE to allocated qp (%p)", qp_rx_wr_num, qp);
while (qp_rx_wr_num) {
uint32_t n_num_mem_bufs = m_n_sysvar_rx_num_wr_to_post_recv;
if (n_num_mem_bufs > qp_rx_wr_num)
n_num_mem_bufs = qp_rx_wr_num;
bool res = g_buffer_pool_rx->get_buffers_thread_safe(temp_desc_list, m_p_ring, n_num_mem_bufs, m_rx_lkey);
if (!res) {
			VLOG_PRINTF_INFO_ONCE_THEN_ALWAYS(VLOG_WARNING, VLOG_DEBUG, "WARNING Out of mem_buf_desc from Rx buffer pool for qp_mgr initialization (qp=%p),\n"
					"\tThis might happen due to wrong setting of VMA_RX_BUFS and VMA_RX_WRE. Please refer to README.txt for more info", qp);
break;
}
qp->post_recv_buffers(&temp_desc_list, temp_desc_list.size());
if (!temp_desc_list.empty()) {
cq_logdbg("qp post recv is already full (push=%d, planned=%d)", qp->get_rx_max_wr_num()-qp_rx_wr_num, qp->get_rx_max_wr_num());
g_buffer_pool_rx->put_buffers_thread_safe(&temp_desc_list, temp_desc_list.size());
break;
}
qp_rx_wr_num -= n_num_mem_bufs;
}
cq_logdbg("Successfully post_recv qp with %d new Rx buffers (planned=%d)", qp->get_rx_max_wr_num()-qp_rx_wr_num, qp->get_rx_max_wr_num());
// Add qp_mgr to map
m_qp_rec.qp = qp;
m_qp_rec.debt = 0;
}
void cq_mgr::del_qp_rx(qp_mgr *qp)
{
BULLSEYE_EXCLUDE_BLOCK_START
if (m_qp_rec.qp != qp) {
cq_logdbg("wrong qp_mgr=%p != m_qp_rec.qp=%p", qp, m_qp_rec.qp);
return;
}
BULLSEYE_EXCLUDE_BLOCK_END
cq_logdbg("qp_mgr=%p", m_qp_rec.qp);
return_extra_buffers();
clean_cq();
memset(&m_qp_rec, 0, sizeof(m_qp_rec));
}
void cq_mgr::add_qp_tx(qp_mgr* qp)
{
//Assume locked!
cq_logdbg("qp_mgr=%p", qp);
m_qp_rec.qp = qp;
m_qp_rec.debt = 0;
}
bool cq_mgr::request_more_buffers()
{
cq_logfuncall("Allocating additional %d buffers for internal use", m_n_sysvar_qp_compensation_level);
// Assume locked!
	// Add additional free buffer descs to the Rx cq_mgr pool
bool res = g_buffer_pool_rx->get_buffers_thread_safe(m_rx_pool, m_p_ring, m_n_sysvar_qp_compensation_level, m_rx_lkey);
if (!res) {
cq_logfunc("Out of mem_buf_desc from RX free pool for internal object pool");
return false;
	}
m_p_cq_stat->n_buffer_pool_len = m_rx_pool.size();
return true;
}
void cq_mgr::return_extra_buffers()
{
if (m_rx_pool.size() < m_n_sysvar_qp_compensation_level * 2)
return;
int buff_to_rel = m_rx_pool.size() - m_n_sysvar_qp_compensation_level;
cq_logfunc("releasing %d buffers to global rx pool", buff_to_rel);
g_buffer_pool_rx->put_buffers_thread_safe(&m_rx_pool, buff_to_rel);
m_p_cq_stat->n_buffer_pool_len = m_rx_pool.size();
}
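/*
 * Typical internal polling flow (sketch; assumes the ring is locked, as
 * poll() itself requires):
 *
 *	uint64_t poll_sn = 0;
 *	vma_ibv_wc wce[MCE_MAX_CQ_POLL_BATCH];
 *	int n = poll(wce, MCE_MAX_CQ_POLL_BATCH, &poll_sn);
 *	for (int i = 0; i < n; i++) {
 *		mem_buf_desc_t* buff = process_cq_element_rx(&wce[i]); // or _tx()
 *		// hand off buff for further processing
 *	}
 */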
int cq_mgr::poll(vma_ibv_wc* p_wce, int num_entries, uint64_t* p_cq_poll_sn)
{
// Assume locked!!!
cq_logfuncall("");
#ifdef RDTSC_MEASURE_RX_VERBS_READY_POLL
RDTSC_TAKE_START(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_VERBS_READY_POLL]);
#endif //RDTSC_MEASURE_RX_VERBS_READY_POLL
#ifdef RDTSC_MEASURE_RX_VERBS_IDLE_POLL
RDTSC_TAKE_START(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_VERBS_IDLE_POLL]);
#endif //RDTSC_MEASURE_RX_VERBS_IDLE_POLL
#ifdef RDTSC_MEASURE_RX_VMA_TCP_IDLE_POLL
RDTSC_TAKE_END(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_VMA_TCP_IDLE_POLL]);
#endif //RDTSC_MEASURE_RX_VMA_TCP_IDLE_POLL
int ret = vma_ibv_poll_cq(m_p_ibv_cq, num_entries, p_wce);
if (ret <= 0) {
#ifdef RDTSC_MEASURE_RX_VERBS_IDLE_POLL
RDTSC_TAKE_END(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_VERBS_IDLE_POLL]);
#endif
#ifdef RDTSC_MEASURE_RX_VMA_TCP_IDLE_POLL
RDTSC_TAKE_START(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_VMA_TCP_IDLE_POLL]);
#endif
// Zero polled wce OR ibv_poll_cq() has driver specific errors
// so we can't really do anything with them
#ifdef RDTSC_MEASURE_RX_CQE_RECEIVEFROM
RDTSC_TAKE_START(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_CQE_TO_RECEIVEFROM]);
#endif
*p_cq_poll_sn = m_n_global_sn;
#ifdef VMA_TIME_MEASURE
INC_ERR_POLL_COUNT;
#endif
return 0;
}
else {
#ifdef RDTSC_MEASURE_RX_VERBS_READY_POLL
RDTSC_TAKE_END(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_VERBS_READY_POLL]);
#endif //RDTSC_MEASURE_RX_VERBS_READY_POLL
#ifdef RDTSC_MEASURE_RX_READY_POLL_TO_LWIP
RDTSC_TAKE_START(g_rdtsc_instr_info_arr[RDTSC_FLOW_RX_READY_POLL_TO_LWIP]);
#endif
}
#ifdef VMA_TIME_MEASURE
TAKE_POLL_CQ_IN;
#endif
if (unlikely(g_vlogger_level >= VLOG_FUNC_ALL)) {
for (int i = 0; i < ret; i++) {
cq_logfuncall("wce[%d] info wr_id=%x, status=%x, opcode=%x, vendor_err=%x, byte_len=%d, imm_data=%x", i, p_wce[i].wr_id, p_wce[i].status, vma_wc_opcode(p_wce[i]), p_wce[i].vendor_err, p_wce[i].byte_len, p_wce[i].imm_data);
cq_logfuncall("qp_num=%x, src_qp=%x, wc_flags=%x, pkey_index=%x, slid=%x, sl=%x, dlid_path_bits=%x", p_wce[i].qp_num, p_wce[i].src_qp, vma_wc_flags(p_wce[i]), p_wce[i].pkey_index, p_wce[i].slid, p_wce[i].sl, p_wce[i].dlid_path_bits);
}
}
	// Spoil the global sn if we have packets ready: publish a fresh 64-bit
	// sequence number built from this CQ's id and its incremented poll
	// counter, so callers comparing a stale sn will see the new activity
union __attribute__((packed)) {
uint64_t global_sn;
struct {
uint32_t cq_id;
uint32_t cq_sn;
} bundle;
} next_sn;
next_sn.bundle.cq_sn = ++m_n_cq_poll_sn;
next_sn.bundle.cq_id = m_cq_id;
*p_cq_poll_sn = m_n_global_sn = next_sn.global_sn;
return ret;
}
void cq_mgr::process_cq_element_log_helper(mem_buf_desc_t* p_mem_buf_desc, vma_ibv_wc* p_wce)
{
BULLSEYE_EXCLUDE_BLOCK_START
// wce with bad status value
if (p_wce->status == IBV_WC_SUCCESS) {
cq_logdbg("wce: wr_id=%#x, status=%#x, vendor_err=%#x, qp_num=%#x", p_wce->wr_id, p_wce->status, p_wce->vendor_err, p_wce->qp_num);
if (m_b_is_rx_hw_csum_on && ! vma_wc_rx_hw_csum_ok(*p_wce))
cq_logdbg("wce: bad rx_csum");
cq_logdbg("wce: opcode=%#x, byte_len=%#d, src_qp=%#x, wc_flags=%#x", vma_wc_opcode(*p_wce), p_wce->byte_len, p_wce->src_qp, vma_wc_flags(*p_wce));
cq_logdbg("wce: pkey_index=%#x, slid=%#x, sl=%#x, dlid_path_bits=%#x, imm_data=%#x", p_wce->pkey_index, p_wce->slid, p_wce->sl, p_wce->dlid_path_bits, p_wce->imm_data);
cq_logdbg("mem_buf_desc: lkey=%#x, p_buffer=%p, sz_buffer=%#x", p_mem_buf_desc->lkey, p_mem_buf_desc->p_buffer, p_mem_buf_desc->sz_buffer);
} else if (p_wce->status != IBV_WC_WR_FLUSH_ERR) {
cq_logwarn("wce: wr_id=%#x, status=%#x, vendor_err=%#x, qp_num=%#x", p_wce->wr_id, p_wce->status, p_wce->vendor_err, p_wce->qp_num);
cq_loginfo("wce: opcode=%#x, byte_len=%#d, src_qp=%#x, wc_flags=%#x", vma_wc_opcode(*p_wce), p_wce->byte_len, p_wce->src_qp, vma_wc_flags(*p_wce));
cq_loginfo("wce: pkey_index=%#x, slid=%#x, sl=%#x, dlid_path_bits=%#x, imm_data=%#x", p_wce->pkey_index, p_wce->slid, p_wce->sl, p_wce->dlid_path_bits, p_wce->imm_data);
if (p_mem_buf_desc) {
cq_logwarn("mem_buf_desc: lkey=%#x, p_buffer=%p, sz_buffer=%#x", p_mem_buf_desc->lkey, p_mem_buf_desc->p_buffer, p_mem_buf_desc->sz_buffer);
}
}
BULLSEYE_EXCLUDE_BLOCK_END
cq_logfunc("wce error status '%s' [%d] (wr_id=%p, qp_num=%x)", priv_ibv_wc_status_str(p_wce->status), p_wce->status, p_wce->wr_id, p_wce->qp_num);
}
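// The wr_id of every work request we post is the mem_buf_desc_t* that owns
// the buffer, so completion processing is a single cast away from the
// descriptor.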
mem_buf_desc_t* cq_mgr::process_cq_element_tx(vma_ibv_wc* p_wce)
{
// Assume locked!!!
cq_logfuncall("");
// Get related mem_buf_desc pointer from the wr_id
mem_buf_desc_t* p_mem_buf_desc = (mem_buf_desc_t*)(uintptr_t)p_wce->wr_id;
if (unlikely(p_wce->status != IBV_WC_SUCCESS)) {
process_cq_element_log_helper(p_mem_buf_desc, p_wce);
if (p_mem_buf_desc == NULL) {
cq_logdbg("wce->wr_id = 0!!! When status != IBV_WC_SUCCESS");
return NULL;
}
if (p_mem_buf_desc->p_desc_owner) {
m_p_ring->mem_buf_desc_completion_with_error_tx(p_mem_buf_desc);
} else {
// AlexR: can this wce have a valid mem_buf_desc pointer?
// AlexR: are we throwing away a data buffer and a mem_buf_desc element?
cq_logdbg("no desc_owner(wr_id=%p, qp_num=%x)", p_wce->wr_id, p_wce->qp_num);
}
return NULL;
}
if (p_mem_buf_desc == NULL) {
cq_logdbg("wce->wr_id = 0!!! When status == IBV_WC_SUCCESS");
return NULL;
}
return p_mem_buf_desc;
}
mem_buf_desc_t* cq_mgr::process_cq_element_rx(vma_ibv_wc* p_wce)
{
// Assume locked!!!
cq_logfuncall("");
// Get related mem_buf_desc pointer from the wr_id
mem_buf_desc_t* p_mem_buf_desc = (mem_buf_desc_t*)(uintptr_t)p_wce->wr_id;
bool bad_wce = p_wce->status != IBV_WC_SUCCESS;
if (unlikely(bad_wce || p_mem_buf_desc == NULL)) {
		if (p_mem_buf_desc == NULL) {
			m_p_next_rx_desc_poll = NULL;
			cq_logdbg("wce->wr_id = 0!!! (status=%#x)", p_wce->status);
			return NULL;
		}
		process_cq_element_log_helper(p_mem_buf_desc, p_wce);
		m_p_next_rx_desc_poll = NULL;
if (p_mem_buf_desc->p_desc_owner) {
m_p_ring->mem_buf_desc_completion_with_error_rx(p_mem_buf_desc);
return NULL;
}
// AlexR: can this wce have a valid mem_buf_desc pointer?
// AlexR: are we throwing away a data buffer and a mem_buf_desc element?
cq_logdbg("no desc_owner(wr_id=%p, qp_num=%x)", p_wce->wr_id, p_wce->qp_num);
return NULL;
}
if (m_n_sysvar_rx_prefetch_bytes_before_poll) {
/*for debug:
if (m_p_next_rx_desc_poll && m_p_next_rx_desc_poll != p_mem_buf_desc) {
cq_logerr("prefetched wrong buffer");
}*/
m_p_next_rx_desc_poll = p_mem_buf_desc->p_prev_desc;
p_mem_buf_desc->p_prev_desc = NULL;
}
p_mem_buf_desc->rx.is_sw_csum_need = !(m_b_is_rx_hw_csum_on && vma_wc_rx_hw_csum_ok(*p_wce));
if (likely(vma_wc_opcode(*p_wce) & VMA_IBV_WC_RECV)) {
		// Save received total bytes
p_mem_buf_desc->sz_data = p_wce->byte_len;
		// We use the context to verify, on the Rx buffer reclaim path, that the buffer is returned to the right CQ
p_mem_buf_desc->rx.is_vma_thr = false;
p_mem_buf_desc->rx.context = this;
p_mem_buf_desc->rx.socketxtreme_polled = false;
		// This is not dead code when timestamping is supported by the verbs API
		// coverity[dead_error_condition]
if (vma_wc_flags(*p_wce) & VMA_IBV_WC_WITH_TIMESTAMP) {
p_mem_buf_desc->rx.hw_raw_timestamp = vma_wc_timestamp(*p_wce);
}
VALGRIND_MAKE_MEM_DEFINED(p_mem_buf_desc->p_buffer, p_mem_buf_desc->sz_data);
prefetch_range((uint8_t*)p_mem_buf_desc->p_buffer + m_sz_transport_header,
std::min(p_mem_buf_desc->sz_data - m_sz_transport_header, (size_t)m_n_sysvar_rx_prefetch_bytes));
//prefetch((uint8_t*)p_mem_buf_desc->p_buffer + m_sz_transport_header);
}
return p_mem_buf_desc;
}
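// m_qp_rec.debt counts receive WRs consumed from the QP but not yet
// re-posted. Compensation first tries to re-post a batch from m_rx_pool;
// failing that, when m_b_sysvar_cq_keep_qp_full is set (or the debt nears the
// QP's Rx WR capacity), it re-posts the just-polled buffer back to the QP and
// counts the packet as dropped.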
bool cq_mgr::compensate_qp_poll_success(mem_buf_desc_t* buff_cur)
{
// Assume locked!!!
// Compensate QP for all completions that we found
if (m_rx_pool.size() || request_more_buffers()) {
size_t buffers = std::min<size_t>(m_qp_rec.debt, m_rx_pool.size());
m_qp_rec.qp->post_recv_buffers(&m_rx_pool, buffers);
m_qp_rec.debt -= buffers;
m_p_cq_stat->n_buffer_pool_len = m_rx_pool.size();
}
else if (m_b_sysvar_cq_keep_qp_full ||
m_qp_rec.debt + MCE_MAX_CQ_POLL_BATCH > (int)m_qp_rec.qp->m_rx_num_wr) {
m_p_cq_stat->n_rx_pkt_drop++;
m_qp_rec.qp->post_recv_buffer(buff_cur);
--m_qp_rec.debt;
return true;
}
return false;
}
void cq_mgr::compensate_qp_poll_failed()
{
// Assume locked!!!
// Compensate QP for all completions debt
if (m_qp_rec.debt) {
if (likely(m_rx_pool.size() || request_more_buffers())) {
size_t buffers = std::min<size_t>(m_qp_rec.debt, m_rx_pool.size());
m_qp_rec.qp->post_recv_buffers(&m_rx_pool, buffers);
m_qp_rec.debt -= buffers;
m_p_cq_stat->n_buffer_pool_len = m_rx_pool.size();
}
}
}
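// Walks the p_next_desc chain of a returned Rx buffer, resets each
// descriptor's per-packet state and lwip pbuf, and parks it on m_rx_pool;
// buffers owned by another ring go back to the global pool instead.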
void cq_mgr::reclaim_recv_buffer_helper(mem_buf_desc_t* buff)
{
if (buff->dec_ref_count() <= 1 && (buff->lwip_pbuf.pbuf.ref-- <= 1)) {
if (likely(buff->p_desc_owner == m_p_ring)) {
mem_buf_desc_t* temp = NULL;
while (buff) {
VLIST_DEBUG_CQ_MGR_PRINT_ERROR_IS_MEMBER;
temp = buff;
buff = temp->p_next_desc;
temp->p_next_desc = NULL;
temp->p_prev_desc = NULL;
temp->reset_ref_count();
temp->rx.tcp.gro = 0;
temp->rx.is_vma_thr = false;
temp->rx.socketxtreme_polled = false;
temp->rx.flow_tag_id = 0;
temp->rx.tcp.p_ip_h = NULL;
temp->rx.tcp.p_tcp_h = NULL;
temp->rx.timestamps.sw.tv_nsec = 0;
temp->rx.timestamps.sw.tv_sec = 0;
temp->rx.timestamps.hw.tv_nsec = 0;
temp->rx.timestamps.hw.tv_sec = 0;
temp->rx.hw_raw_timestamp = 0;
free_lwip_pbuf(&temp->lwip_pbuf);
m_rx_pool.push_back(temp);
}
m_p_cq_stat->n_buffer_pool_len = m_rx_pool.size();
}
else {
cq_logfunc("Buffer returned to wrong CQ");
g_buffer_pool_rx->put_buffers_thread_safe(buff);
}
}
}
void cq_mgr::process_tx_buffer_list(mem_buf_desc_t* p_mem_buf_desc)
{
// Assume locked!!!
BULLSEYE_EXCLUDE_BLOCK_START
if (p_mem_buf_desc && (p_mem_buf_desc->p_desc_owner == m_p_ring /*|| m_p_ring->get_parent()->is_member(p_mem_buf_desc->p_desc_owner)*/)) {
m_p_ring->mem_buf_desc_return_to_owner_tx(p_mem_buf_desc);
/* if decided to free buffers of another ring here, need to modify return_to_owner to check owner and return to gpool. */
}
else if (p_mem_buf_desc && m_p_ring->get_parent()->is_member(p_mem_buf_desc->p_desc_owner)) {
cq_logerr("got buffer of wrong owner, high-availability event? buf=%p, owner=%p", p_mem_buf_desc, p_mem_buf_desc ? p_mem_buf_desc->p_desc_owner : NULL);
/* if decided to free buffers here, remember its a list and need to deref members. */
//p_mem_buf_desc->p_desc_owner->mem_buf_desc_return_to_owner_tx(p_mem_buf_desc); /* this can cause a deadlock between rings, use trylock? */
} else {
cq_logerr("got buffer of wrong owner, buf=%p, owner=%p", p_mem_buf_desc, p_mem_buf_desc ? p_mem_buf_desc->p_desc_owner : NULL);
}
BULLSEYE_EXCLUDE_BLOCK_END
}
void cq_mgr::mem_buf_desc_completion_with_error(mem_buf_desc_t* p_mem_buf_desc)
{
cq_logfuncall("");
// lock(); Called from cq_mgr context which is already locked!!
reclaim_recv_buffer_helper(p_mem_buf_desc);
// unlock(); Called from cq_mgr context which is already locked!!
}
void cq_mgr::mem_buf_desc_return_to_owner(mem_buf_desc_t* p_mem_buf_desc, void* pv_fd_ready_array /*=NULL*/)
{
cq_logfuncall("");
NOT_IN_USE(pv_fd_ready_array);
reclaim_recv_buffer_helper(p_mem_buf_desc);
}
int cq_mgr::poll_and_process_element_rx(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array)
{
// Assume locked!!!
cq_logfuncall("");
/* coverity[stack_use_local_overflow] */
vma_ibv_wc wce[MCE_MAX_CQ_POLL_BATCH];
int ret;
uint32_t ret_rx_processed = process_recv_queue(pv_fd_ready_array);
if (unlikely(ret_rx_processed >= m_n_sysvar_cq_poll_batch_max)) {
m_p_ring->m_gro_mgr.flush_all(pv_fd_ready_array);
return ret_rx_processed;
}
if (m_p_next_rx_desc_poll) {
prefetch_range((uint8_t*)m_p_next_rx_desc_poll->p_buffer, m_n_sysvar_rx_prefetch_bytes_before_poll);
}
ret = poll(wce, m_n_sysvar_cq_poll_batch_max, p_cq_poll_sn);
if (ret > 0) {
m_n_wce_counter += ret;
if (ret < (int)m_n_sysvar_cq_poll_batch_max)
m_b_was_drained = true;
for (int i = 0; i < ret; i++) {
mem_buf_desc_t *buff = process_cq_element_rx((&wce[i]));
if (buff) {
if (vma_wc_opcode(wce[i]) & VMA_IBV_WC_RECV) {
if ((++m_qp_rec.debt < (int)m_n_sysvar_rx_num_wr_to_post_recv) ||
!compensate_qp_poll_success(buff)) {
process_recv_buffer(buff, pv_fd_ready_array);
}
}
}
}
ret_rx_processed += ret;
m_p_ring->m_gro_mgr.flush_all(pv_fd_ready_array);
} else {
compensate_qp_poll_failed();
}
return ret_rx_processed;
}
int cq_mgr::poll_and_process_element_tx(uint64_t* p_cq_poll_sn)
{
// Assume locked!!!
cq_logfuncall("");
/* coverity[stack_use_local_overflow] */
vma_ibv_wc wce[MCE_MAX_CQ_POLL_BATCH];
int ret = poll(wce, m_n_sysvar_cq_poll_batch_max, p_cq_poll_sn);
if (ret > 0) {
m_n_wce_counter += ret;
if (ret < (int)m_n_sysvar_cq_poll_batch_max)
m_b_was_drained = true;
for (int i = 0; i < ret; i++) {
mem_buf_desc_t *buff = process_cq_element_tx((&wce[i]));
if (buff) {
process_tx_buffer_list(buff);
}
}
}
return ret;
}
int cq_mgr::poll_and_process_element_rx(mem_buf_desc_t **p_desc_lst)
{
NOT_IN_USE(p_desc_lst);
cq_logerr("SocketXtreme mode is supported by mlx5 cq manager only");
return 0;
}
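// Reclaim entry points: reclaim_recv_single_buffer() defers fully released
// buffers onto the m_rx_buffs_rdy_for_free_{head,tail} list, which is flushed
// here (and in the destructor) via reclaim_recv_buffer_helper().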
bool cq_mgr::reclaim_recv_buffers(mem_buf_desc_t *rx_reuse_lst)
{
if (m_rx_buffs_rdy_for_free_head) {
reclaim_recv_buffer_helper(m_rx_buffs_rdy_for_free_head);
m_rx_buffs_rdy_for_free_head = m_rx_buffs_rdy_for_free_tail = NULL;
}
reclaim_recv_buffer_helper(rx_reuse_lst);
return_extra_buffers();
return true;
}
bool cq_mgr::reclaim_recv_buffers_no_lock(mem_buf_desc_t *rx_reuse_lst)
{
if (likely(rx_reuse_lst)) {
reclaim_recv_buffer_helper(rx_reuse_lst);
return true;
}
return false;
}
int cq_mgr::reclaim_recv_single_buffer(mem_buf_desc_t* rx_reuse)
{
int ret_val = 0;
ret_val = rx_reuse->lwip_pbuf_dec_ref_count();
if ((ret_val == 0) && (rx_reuse->get_ref_count() <= 0)) {
/*if ((safe_mce_sys().thread_mode > THREAD_MODE_SINGLE)) {
m_lock_ring_rx.lock();
}*/
if (!m_rx_buffs_rdy_for_free_head) {
m_rx_buffs_rdy_for_free_head = m_rx_buffs_rdy_for_free_tail = rx_reuse;
}
else {
m_rx_buffs_rdy_for_free_tail->p_next_desc = rx_reuse;
m_rx_buffs_rdy_for_free_tail = rx_reuse;
}
m_rx_buffs_rdy_for_free_tail->p_next_desc = NULL;
		/*if ((safe_mce_sys().thread_mode > THREAD_MODE_SINGLE)) {
			m_lock_ring_rx.unlock();
		}*/
}
return ret_val;
}
bool cq_mgr::reclaim_recv_buffers(descq_t *rx_reuse)
{
cq_logfuncall("");
// Called from outside cq_mgr context which is not locked!!
while (!rx_reuse->empty()) {
mem_buf_desc_t* buff = rx_reuse->get_and_pop_front();
reclaim_recv_buffer_helper(buff);
}
return_extra_buffers();
return true;
}
//
// @OUT: p_recycle_buffers_last_wr_id Returns the last WR_ID handled; when non-NULL is passed, this call is a CQE drain (buffer recycle) flow.
// @RET: Total number of processed CQEs
//
int cq_mgr::drain_and_proccess(uintptr_t* p_recycle_buffers_last_wr_id /*=NULL*/)
{
cq_logfuncall("cq was %s drained. %d processed wce since last check. %d wce in m_rx_queue", (m_b_was_drained?"":"not "), m_n_wce_counter, m_rx_queue.size());
// CQ polling loop until max wce limit is reached for this interval or CQ is drained
uint32_t ret_total = 0;
uint64_t cq_poll_sn = 0;
if (p_recycle_buffers_last_wr_id != NULL) {
m_b_was_drained = false;
}
while ((m_n_sysvar_progress_engine_wce_max > m_n_wce_counter) && (!m_b_was_drained)) {
/* coverity[stack_use_local_overflow] */
vma_ibv_wc wce[MCE_MAX_CQ_POLL_BATCH];
int ret = poll(wce, MCE_MAX_CQ_POLL_BATCH, &cq_poll_sn);
if (ret <= 0) {
m_b_was_drained = true;
m_p_ring->m_gro_mgr.flush_all(NULL);
return ret_total;
}
m_n_wce_counter += ret;
if (ret < MCE_MAX_CQ_POLL_BATCH)
m_b_was_drained = true;
for (int i = 0; i < ret; i++) {
mem_buf_desc_t* buff = process_cq_element_rx(&wce[i]);
if (buff) {
if (p_recycle_buffers_last_wr_id) {
m_p_cq_stat->n_rx_pkt_drop++;
reclaim_recv_buffer_helper(buff);
} else {
					bool process_now = false;
					if (m_transport_type == VMA_TRANSPORT_ETH) {
						process_now = is_eth_tcp_frame(buff);
					}
					if (m_transport_type == VMA_TRANSPORT_IB) {
						process_now = is_ib_tcp_frame(buff);
					}
					// TCP frames are processed immediately, in the internal thread's context
					if (process_now) {
						buff->rx.is_vma_thr = true;
						if ((++m_qp_rec.debt < (int)m_n_sysvar_rx_num_wr_to_post_recv) ||
								!compensate_qp_poll_success(buff)) {
							process_recv_buffer(buff, NULL);
						}
					}
					else { // UDP/IP traffic is deferred to the CQ's Rx queue
m_rx_queue.push_back(buff);
mem_buf_desc_t* buff_cur = m_rx_queue.get_and_pop_front();
if ((++m_qp_rec.debt < (int)m_n_sysvar_rx_num_wr_to_post_recv) ||
!compensate_qp_poll_success(buff_cur)) {
m_rx_queue.push_front(buff_cur);
}
}
}
}
if (p_recycle_buffers_last_wr_id) {
*p_recycle_buffers_last_wr_id = (uintptr_t)wce[i].wr_id;
}
}
ret_total += ret;
}
m_p_ring->m_gro_mgr.flush_all(NULL);
m_n_wce_counter = 0;
m_b_was_drained = false;
// Update cq statistics
m_p_cq_stat->n_rx_sw_queue_len = m_rx_queue.size();
m_p_cq_stat->n_rx_drained_at_once_max = std::max(ret_total, m_p_cq_stat->n_rx_drained_at_once_max);
return ret_total;
}
// 1 -> busy
// 0 -> ok
// -1 -> error
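// Drains any events already pending on the completion channel (the channel
// fd is assumed to be set non-blocking elsewhere, hence the EAGAIN check).
// If events were pending they are acked and 1 (busy) is returned so the
// caller polls again; only an idle channel is re-armed via req_notify_cq().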
int cq_mgr::ack_and_request_notification()
{
int res, cq_ev_count = 0;
ibv_cq* ib_cq;
void *cq_context;
do {
res = ibv_get_cq_event(m_comp_event_channel, &ib_cq, &cq_context);
if (res == 0) {
++cq_ev_count;
}
} while (res == 0);
if (errno != EAGAIN) {
return -1;
}
if (cq_ev_count > 0) {
get_cq_event(cq_ev_count);
ibv_ack_cq_events(m_p_ibv_cq, cq_ev_count);
return 1;
}
IF_VERBS_FAILURE(req_notify_cq()) {
cq_logerr("Failure arming the qp_mgr notification channel (errno=%d %m)", errno);
return -1;
}
ENDIF_VERBS_FAILURE
return 0;
}
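/*
 * Event-driven usage (sketch; epoll setup on get_channel_fd() is assumed to
 * be done by the caller):
 *
 *	uint64_t poll_sn = 0;
 *	poll_and_process_element_rx(&poll_sn, NULL);
 *	if (request_notification(poll_sn) == 0) {
 *		// armed: wait on the channel fd, then call
 *		// wait_for_notification_and_process_element()
 *	} // 1 means new completions arrived since poll_sn: poll again
 */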
int cq_mgr::request_notification(uint64_t poll_sn)
{
int ret = -1;
cq_logfuncall("");
if ((m_n_global_sn > 0 && poll_sn != m_n_global_sn)) {
// The cq_mgr's has receive packets pending processing (or got processed since cq_poll_sn)
cq_logfunc("miss matched poll sn (user=0x%lx, cq=0x%lx)", poll_sn, m_n_cq_poll_sn);
return 1;
}
if (m_b_notification_armed == false) {
cq_logfunc("arming cq_mgr notification channel");
// Arm the CQ notification channel
IF_VERBS_FAILURE(req_notify_cq()) {
cq_logerr("Failure arming the qp_mgr notification channel (errno=%d %m)", errno);
}
else {
ret = 0;
m_b_notification_armed = true;
} ENDIF_VERBS_FAILURE;
}
else {
// cq_mgr notification channel already armed
ret = 0;
}
cq_logfuncall("returning with %d", ret);
return ret;
}
int cq_mgr::wait_for_notification_and_process_element(uint64_t* p_cq_poll_sn, void* pv_fd_ready_array)
{
int ret = -1;
cq_logfunc("");
if (m_b_notification_armed) {
cq_mgr* p_cq_mgr_context = NULL;
struct ibv_cq* p_cq_hndl = NULL;
void *p; // deal with compiler warnings
// Block on the cq_mgr's notification event channel
IF_VERBS_FAILURE(ibv_get_cq_event(m_comp_event_channel, &p_cq_hndl, &p)) {
cq_logfunc("waiting on cq_mgr event returned with error (errno=%d %m)", errno);
}
else {
get_cq_event();
p_cq_mgr_context = (cq_mgr*)p;
if (p_cq_mgr_context != this) {
cq_logerr("mismatch with cq_mgr returned from new event (event->cq_mgr->%p)", p_cq_mgr_context);
				// This can happen when a single channel serves several/all cq_mgrs;
				// in that case we need to deliver the event to the correct cq_mgr
}
// Ack event
ibv_ack_cq_events(m_p_ibv_cq, 1);
// Clear flag
m_b_notification_armed = false;
// Now try processing the ready element
if (m_b_is_rx) {
ret = poll_and_process_element_rx(p_cq_poll_sn, pv_fd_ready_array);
} else {
ret = poll_and_process_element_tx(p_cq_poll_sn);
}
} ENDIF_VERBS_FAILURE;
}
else {
cq_logfunc("notification channel is not armed");
errno = EAGAIN;
}
return ret;
}
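// Maps a completion-channel event back to its cq_mgr: the cq_context we
// registered in vma_ibv_create_cq() (the `this` pointer) is returned by
// ibv_get_cq_event() and the event is acked here.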
cq_mgr* get_cq_mgr_from_cq_event(struct ibv_comp_channel* p_cq_channel)
{
cq_mgr* p_cq_mgr = NULL;
struct ibv_cq* p_cq_hndl = NULL;
void *p_context; // deal with compiler warnings
// read & ack the CQ event
IF_VERBS_FAILURE(ibv_get_cq_event(p_cq_channel, &p_cq_hndl, &p_context)) {
vlog_printf(VLOG_INFO, MODULE_NAME ":%d: waiting on cq_mgr event returned with error (errno=%d %m)\n", __LINE__, errno);
}
else {
p_cq_mgr = (cq_mgr*)p_context; // Save the cq_mgr
p_cq_mgr->get_cq_event();
ibv_ack_cq_events(p_cq_hndl, 1); // Ack the ibv event
} ENDIF_VERBS_FAILURE;
return p_cq_mgr;
}