/*
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <dev/ring_eth_cb.h>
#include <dev/qp_mgr_mp.h>
#include <dev/cq_mgr_mp.h>

#undef  MODULE_NAME
#define MODULE_NAME		"ring_eth_cb"
#undef  MODULE_HDR
#define MODULE_HDR		MODULE_NAME "%d:%s() "


#ifdef HAVE_MP_RQ

#define DUMP_LKEY		(0x700)
#define VMA_MP_MIN_LOG_STRIDES	(10)
#define MAX_MP_WQES		(20) // limit max used memory
#define MIN_MP_WQES		(4)

ring_eth_cb::ring_eth_cb(int if_index, vma_cyclic_buffer_ring_attr *cb_ring,
			 iovec *mem_desc, ring *parent):
			 ring_eth(if_index, parent, RING_ETH_CB, false)
			,m_curr_wqe_used_strides(0)
			,m_curr_packets(0)
			,m_padd_mode_used_strides(0)
			,m_all_wqes_used_strides(0)
			,m_packet_receive_mode(cb_ring->packet_receive_mode)
			,m_curr_wq(0)
			,m_curr_payload_addr(NULL)
			,m_curr_hdr_ptr(NULL)
			,m_res_domain(NULL)
			,m_external_mem(cb_ring->comp_mask & VMA_CB_EXTERNAL_MEM)

{
	struct ibv_exp_res_domain_init_attr res_domain_attr;

	// check MP capabilities; currently all caps are 0 due to a bug
	vma_ibv_device_attr* r_ibv_dev_attr = m_p_ib_ctx->get_ibv_device_attr();

	memset(&m_umr_wr, 0, sizeof(m_umr_wr));
	memset(m_sge_ptrs, 0, sizeof(m_sge_ptrs));
	m_p_umr_mr = NULL;
	m_hdr_len = 0;

	if (!r_ibv_dev_attr->max_ctx_res_domain) {
		ring_logdbg("device doesn't support resource domain");
		throw_vma_exception("device doesn't support resource domain");
	}

	struct ibv_exp_mp_rq_caps *mp_rq_caps = &r_ibv_dev_attr->mp_rq_caps;
	if (!(mp_rq_caps->supported_qps & IBV_EXP_QPT_RAW_PACKET)) {
		ring_logdbg("mp_rq is not supported");
		throw_vma_exception("device doesn't support RC QP");
	}

	res_domain_attr.comp_mask = IBV_EXP_RES_DOMAIN_THREAD_MODEL |
				    IBV_EXP_RES_DOMAIN_MSG_MODEL;

	// driver is in charge of locks
	res_domain_attr.thread_model = IBV_EXP_THREAD_SAFE;

	// currently has no effect
	res_domain_attr.msg_model = IBV_EXP_MSG_HIGH_BW;

	m_res_domain = ibv_exp_create_res_domain(m_p_ib_ctx->get_ibv_context(),
				&res_domain_attr);
	if (!m_res_domain) {
		ring_logdbg("could not create resource domain");
		throw_vma_exception("failed creating resource domain");
	}
	// stride size is headers + user payload aligned to power of 2
	uint16_t net_len = 0;
	if (m_partition) {
		net_len = ETH_VLAN_HDR_LEN + sizeof(struct iphdr) + sizeof(struct udphdr);
	} else {
		net_len = ETH_HDR_LEN + sizeof(struct iphdr) + sizeof(struct udphdr);
	}
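	// illustrative example (hypothetical sizes): stride_bytes 1400 + hdr_bytes 16 +
	// net_len 42 = 1458 bytes, which aligns up to 2048, i.e. a stride log of 11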
	m_single_stride_log_num_of_bytes = ilog_2(align32pow2(
			cb_ring->stride_bytes + cb_ring->hdr_bytes + net_len));
	if (m_single_stride_log_num_of_bytes < mp_rq_caps->min_single_stride_log_num_of_bytes) {
		m_single_stride_log_num_of_bytes = mp_rq_caps->min_single_stride_log_num_of_bytes;
	}
	if (m_single_stride_log_num_of_bytes > mp_rq_caps->max_single_stride_log_num_of_bytes) {
		m_single_stride_log_num_of_bytes = mp_rq_caps->max_single_stride_log_num_of_bytes;
	}
	m_stride_size = 1 << m_single_stride_log_num_of_bytes;
	uint32_t max_wqe_size = 1 << mp_rq_caps->max_single_wqe_log_num_of_strides;
	uint32_t user_req_wq = cb_ring->num / max_wqe_size;
	if (user_req_wq > MIN_MP_WQES) {
		m_wq_count = std::min<uint32_t>(user_req_wq, MAX_MP_WQES);
		m_single_wqe_log_num_of_strides = mp_rq_caps->max_single_wqe_log_num_of_strides;
	} else {
		m_wq_count = MIN_MP_WQES;
		m_single_wqe_log_num_of_strides = ilog_2(align32pow2(cb_ring->num) / m_wq_count);
		if (m_single_wqe_log_num_of_strides < VMA_MP_MIN_LOG_STRIDES) {
			m_single_wqe_log_num_of_strides = VMA_MP_MIN_LOG_STRIDES;
		}
		if (m_single_wqe_log_num_of_strides > mp_rq_caps->max_single_wqe_log_num_of_strides) {
			m_single_wqe_log_num_of_strides = mp_rq_caps->max_single_wqe_log_num_of_strides;
		}
	}
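	// illustrative example (hypothetical device caps): with
	// max_single_wqe_log_num_of_strides 16 and cb_ring->num (1 << 22),
	// user_req_wq is 64, so m_wq_count is capped at MAX_MP_WQES (20) and
	// each WQE carries 2^16 strides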
	m_strides_num = 1 << m_single_wqe_log_num_of_strides;
	ring_logdbg("using strides_num %d stride size %d, wqe_count %d stride_bytes "
		    "%d, hdr_bytes %d num %d rec mode %d", m_strides_num, m_stride_size,
		    m_wq_count, cb_ring->stride_bytes, cb_ring->hdr_bytes, cb_ring->num,
		    m_packet_receive_mode);

	memset(&m_curr_hw_timestamp, 0, sizeof(m_curr_hw_timestamp));
	if (m_packet_receive_mode == PADDED_PACKET) {
		size_t buffer_size = m_stride_size * m_strides_num * m_wq_count;
		m_sge_ptrs[CB_UMR_PAYLOAD] = (uint64_t)allocate_memory(mem_desc, buffer_size);
		if (unlikely(!m_sge_ptrs[CB_UMR_PAYLOAD])) {
			throw_vma_exception("user provided to small memory");
		}
		m_buff_data.addr = m_sge_ptrs[CB_UMR_PAYLOAD];
		m_buff_data.length = m_stride_size * m_strides_num;
		m_buff_data.lkey = get_mem_lkey(m_p_ib_ctx);
		m_packet_size = cb_ring->stride_bytes + net_len;
		m_payload_len = m_stride_size;
		if (unlikely(m_buff_data.lkey == (uint32_t)(-1))) {
			ring_logerr("got invalid lkey for memory %p size %zd",
				    mem_desc->iov_base, mem_desc->iov_len);
			throw_vma_exception("failed retrieving lkey");
		}
		ring_logdbg("using buffer size %zd", buffer_size);
	} else if (allocate_umr_mem(cb_ring, mem_desc, net_len)) {
		ring_logerr("failed creating UMR QP");
		throw_vma_exception("failed creating UMR QP");
	}

	/* Complete resources initialization */
	ring_simple::create_resources();
}

void* ring_eth_cb::allocate_memory(iovec *mem_desc, size_t buffer_size)
{
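	// if the user supplied a memory descriptor, validate its size and register it;
	// otherwise allocate (and register) the buffer internally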
	if (mem_desc && mem_desc->iov_len) {
		if (unlikely(mem_desc->iov_len < buffer_size)) {
			ring_logerr("user provided to small memory "
				    "expected %zd but got %zd",
				    buffer_size, mem_desc->iov_len);
			errno = EINVAL;
			return NULL;
		}
		return m_alloc.alloc_and_reg_mr(mem_desc->iov_len, m_p_ib_ctx,
					 mem_desc->iov_base);
	} else {
		return m_alloc.alloc_and_reg_mr(buffer_size, m_p_ib_ctx);
	}
}

qp_mgr* ring_eth_cb::create_qp_mgr(const ib_ctx_handler *ib_ctx,
				   uint8_t port_num,
				   struct ibv_comp_channel *p_rx_comp_event_channel)
{
	return new qp_mgr_mp(this, ib_ctx, port_num, p_rx_comp_event_channel,
			get_tx_num_wr(), m_partition, m_buff_data,
			m_external_mem);
}

int ring_eth_cb::get_mem_info(ibv_sge &mem_info)
{
	if (!m_buff_data.addr) {
		ring_logwarn("no valid memory to return");
		return -1;
	}
	mem_info.addr = m_buff_data.addr;
	mem_info.length = m_buff_data.length;
	mem_info.lkey = m_buff_data.lkey;
	ring_logdbg("returning ptr %p, legnth %zd, lkey %u", mem_info.addr,
		    mem_info.length, mem_info.lkey);
	return 0;
}

/**
 * allocate and set UMR addresses
 * @return 0 on success, -1 on failure
 * @note when using UMR, the memory appears in VMA as follows
 * +----------------------------+
 * |	WQE0 network headers	|
 * |	WQE1 network headers	|
 * |	...			|
 * |	WQE0 user headers	|
 * |	WQE1 user headers	|
 * |	...			|
 * |	WQE0 payload		|
 * |	WQE1 payload		|
 * |	...			|
 * |	WQE0 padding		|
 * |	WQE1 padding		|
 * |	...			|
 * +----------------------------+
 */
int ring_eth_cb::allocate_umr_mem(vma_cyclic_buffer_ring_attr *cb_ring,
				  iovec *mem_desc,
				  uint16_t net_len)
{
	ibv_exp_create_mr_in mrin;
	ibv_exp_mem_repeat_block* p_mem_rep_list = NULL;
	ibv_mr* mr = NULL, *dump_mr;
	size_t curr_data_len = 0, packet_len, pad_len, buffer_size;
	size_t packets_num = m_strides_num * m_wq_count;
	uint64_t base_ptr, prev_addr, pad_addr;
	int index = 0, count = 1, umr_blocks;
	const int ndim = 1; // we only use one dimension see UMR docs
	int retval = 0;

	// the minimum number of MR blocks is two: one for padding and one for data
	umr_blocks = 2;
	if ((cb_ring->comp_mask & VMA_CB_HDR_BYTE) && cb_ring->hdr_bytes &&
	    m_packet_receive_mode == RAW_PACKET) {
		ring_logwarn("bad parameters!, you cannot choose "
			     "RAW_PACKET and define user header "
			     "the header\n");
		return -1;
	}

	if (m_packet_receive_mode != RAW_PACKET) {
		umr_blocks++; // add a user_hdr / network_hdr block
		if ((cb_ring->comp_mask & VMA_CB_HDR_BYTE) &&
		    cb_ring->hdr_bytes &&
		    m_packet_receive_mode == STRIP_NETWORK_HDRS) {
			umr_blocks++; // strip network hdr
		}
	}

	p_mem_rep_list = new(std::nothrow) ibv_exp_mem_repeat_block[umr_blocks]();
	if (p_mem_rep_list == NULL) {
		ring_logwarn("failed allocating memory");
		errno = ENOMEM;
		return -1;
	}
	for (int i = 0; i < umr_blocks; i++) {
		p_mem_rep_list[i].byte_count = new(std::nothrow) size_t[ndim];
		p_mem_rep_list[i].stride = new(std::nothrow) size_t[ndim];
		if (p_mem_rep_list[i].byte_count == NULL ||
		    p_mem_rep_list[i].stride == NULL) {
			ring_logwarn("failed allocating memory");
			errno = ENOMEM;
			retval = -1;
			goto cleanup;
		}
	}

	m_payload_len = cb_ring->stride_bytes;
	m_hdr_len = cb_ring->hdr_bytes;
	m_packet_size = m_payload_len + m_hdr_len + net_len;

	// in case the stride is smaller than the packet size
	while ((m_stride_size * count) <= m_packet_size) {
		++count;
	}
	// the per-packet padding is mapped onto the dump MR, so no real memory is allocated for it
	pad_len = (m_stride_size * count) - m_packet_size;
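	// illustrative example: with a 2048-byte stride and a 1458-byte packet,
	// count stays 1 and pad_len is 590; the padding block below maps those
	// bytes onto the dump MR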
	// allocate buffer
	if (m_packet_receive_mode == STRIP_NETWORK_HDRS) {
		buffer_size = (m_packet_size - net_len) * packets_num;
	} else {
		buffer_size = m_packet_size * packets_num;
	}
	// will raise an exception on failure
	base_ptr = (uint64_t)allocate_memory(mem_desc, buffer_size);
	if (unlikely(!base_ptr)) {
		goto cleanup;
	}
	ring_logdbg("using buffer parameters, buffer_size %zd "
		    "pad len %d packet size %d stride size %d",
		    buffer_size, pad_len, m_packet_size, m_stride_size);
	prev_addr = base_ptr;
	mr = m_alloc.find_ibv_mr_by_ib_ctx(m_p_ib_ctx);
	// redmine.mellanox.com/issues/1379468
	pad_addr = (uint64_t)m_dump_mr.alloc_and_reg_mr(128, m_p_ib_ctx);
	dump_mr = m_dump_mr.find_ibv_mr_by_ib_ctx(m_p_ib_ctx);
	BULLSEYE_EXCLUDE_BLOCK_START
	if (unlikely(mr == NULL || dump_mr == NULL)) {
		ring_logerr("could not find mr %p, dump mr %p", mr, dump_mr);
		retval = -1;
		goto cleanup;
	}
	BULLSEYE_EXCLUDE_BLOCK_END
	// overriding the lkey is safe since ibv_dereg_mr does not use it
	dump_mr->lkey = DUMP_LKEY;
	packet_len = net_len;
	switch (m_packet_receive_mode) {
	case RAW_PACKET:
		packet_len += m_payload_len;
		// for size calculation in cyclic_buffer_read
		m_payload_len = packet_len;
		m_sge_ptrs[CB_UMR_PAYLOAD] = base_ptr;
		p_mem_rep_list[index].base_addr = base_ptr;
		p_mem_rep_list[index].byte_count[0] = packet_len;
		p_mem_rep_list[index].stride[0] = packet_len;
		p_mem_rep_list[index].mr = mr;
		index++;
	break;
	case STRIP_NETWORK_HDRS:
		// network headers are not accessible to the application
		p_mem_rep_list[index].base_addr = pad_addr;
		p_mem_rep_list[index].byte_count[0] = net_len;
		// optimization: all network headers are written to the same physical address (stride 0)
		p_mem_rep_list[index].stride[0] = 0;
		p_mem_rep_list[index].mr = dump_mr;
		index++;
		if (m_hdr_len) {
			p_mem_rep_list[index].base_addr = base_ptr;
			p_mem_rep_list[index].byte_count[0] = m_hdr_len;
			p_mem_rep_list[index].stride[0] = m_hdr_len;
			p_mem_rep_list[index].mr = mr;
			m_sge_ptrs[CB_UMR_HDR] = base_ptr;
			curr_data_len = packets_num * m_hdr_len;
			prev_addr += curr_data_len;
			index++;
		}
		p_mem_rep_list[index].base_addr = prev_addr;
		p_mem_rep_list[index].byte_count[0] = m_payload_len;
		p_mem_rep_list[index].stride[0] = m_payload_len;
		p_mem_rep_list[index].mr = mr;
		m_sge_ptrs[CB_UMR_PAYLOAD] = prev_addr;
		index++;
	break;
	case SEPERATE_NETWORK_HDRS:
		if (m_hdr_len) {
			packet_len += m_hdr_len;
			// for size calculation in cyclic_buffer_read
			m_hdr_len = packet_len;
		} else {
			m_hdr_len = net_len;
		}
		p_mem_rep_list[index].base_addr = base_ptr;
		p_mem_rep_list[index].byte_count[0] = packet_len;
		p_mem_rep_list[index].stride[0] = packet_len;
		p_mem_rep_list[index].mr = mr;
		m_sge_ptrs[CB_UMR_HDR] = base_ptr;
		curr_data_len = packets_num * packet_len;
		prev_addr += curr_data_len;
		index++;
		p_mem_rep_list[index].base_addr = prev_addr;
		p_mem_rep_list[index].byte_count[0] = m_payload_len;
		p_mem_rep_list[index].stride[0] = m_payload_len;
		p_mem_rep_list[index].mr = mr;
		m_sge_ptrs[CB_UMR_PAYLOAD] = prev_addr;
		index++;
		break;
	default:
		ring_logpanic("bad packet_receive_mode\n");
	}
	// map the padding onto the dump MR (pad_addr) so an unexpectedly big
	// packet spills into the dump region instead of corrupting user data
	p_mem_rep_list[index].base_addr = pad_addr;
	p_mem_rep_list[index].byte_count[0] = pad_len;
	p_mem_rep_list[index].stride[0] = 0;
	p_mem_rep_list[index].mr = dump_mr;

	// create an empty indirect MR (lkey) to be filled by the UMR work request
	memset(&mrin, 0, sizeof(mrin));
	mrin.pd = m_p_ib_ctx->get_ibv_pd();
	mrin.attr.create_flags = IBV_EXP_MR_INDIRECT_KLMS;
	mrin.attr.exp_access_flags = IBV_EXP_ACCESS_LOCAL_WRITE;
	mrin.attr.max_klm_list_size = umr_blocks;
	m_p_umr_mr = ibv_exp_create_mr(&mrin);
	BULLSEYE_EXCLUDE_BLOCK_START
	if (!m_p_umr_mr) {
		ring_logdbg("Failed creating mr %m", errno);
		retval = -1;
		goto cleanup;
	}
	BULLSEYE_EXCLUDE_BLOCK_END
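	// build the UMR_FILL work request: posting it fills the indirect MR
	// (m_p_umr_mr) with the repeat-block list above, so the HW scatters each
	// packet into the separate header/payload/padding regions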
	memset(&m_umr_wr, 0, sizeof(m_umr_wr));
	m_umr_wr.ext_op.umr.umr_type = IBV_EXP_UMR_REPEAT;
	m_umr_wr.ext_op.umr.mem_list.rb.mem_repeat_block_list = p_mem_rep_list;
	m_umr_wr.ext_op.umr.mem_list.rb.stride_dim = ndim;
	m_umr_wr.ext_op.umr.mem_list.rb.repeat_count = &packets_num;
	m_umr_wr.exp_send_flags = IBV_EXP_SEND_INLINE;
	m_umr_wr.ext_op.umr.exp_access = IBV_EXP_ACCESS_LOCAL_WRITE;
	m_umr_wr.ext_op.umr.modified_mr = m_p_umr_mr;
	m_umr_wr.ext_op.umr.base_addr = (uint64_t)mr->addr;
	m_umr_wr.ext_op.umr.num_mrs = umr_blocks;
	m_umr_wr.exp_send_flags |= IBV_EXP_SEND_SIGNALED;
	m_umr_wr.exp_opcode = IBV_EXP_WR_UMR_FILL;

	if (!m_p_ib_ctx->post_umr_wr(m_umr_wr)) {
		ring_logerr("Failed in ibv_exp_post_send IBV_EXP_WR_UMR_FILL\n");
		// prevent UMR invalidation in remove_umr_res()
		m_umr_wr.exp_opcode = IBV_EXP_WR_NOP;
		retval = -1;
		goto cleanup;
	}

	m_buff_data.addr = m_umr_wr.ext_op.umr.base_addr;
	m_buff_data.length = m_stride_size * m_strides_num;
	m_buff_data.lkey = m_p_umr_mr->lkey;
cleanup:
	for (int i = 0; i < umr_blocks; i++) {
		if (p_mem_rep_list[i].stride) {
			delete[] p_mem_rep_list[i].stride;
			p_mem_rep_list[i].stride = NULL;
		}
		if (p_mem_rep_list[i].byte_count) {
			delete[] p_mem_rep_list[i].byte_count;
			p_mem_rep_list[i].byte_count = NULL;
		}
	}

	delete[] p_mem_rep_list;
	p_mem_rep_list = NULL;

	if (retval == -1) {
		remove_umr_res();
	}
	return retval;
}

void ring_eth_cb::remove_umr_res()
{
	if (m_umr_wr.exp_opcode == IBV_EXP_WR_UMR_FILL) {
		m_umr_wr.exp_opcode = IBV_EXP_WR_UMR_INVALIDATE;
		if (m_p_ib_ctx->post_umr_wr(m_umr_wr)) {
			ring_logdbg("Releasing UMR failed\n");
		}
	}

	if (m_p_umr_mr) {
		ibv_dereg_mr(m_p_umr_mr);
		m_p_umr_mr = NULL;
	}
	ring_logdbg("UMR resources removed\n");
}

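// the cyclic-buffer ring delivers data only through cyclic_buffer_read(),
// so the generic drain/poll entry points below are no-ops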
int ring_eth_cb::drain_and_proccess()
{
	return 0;
}

int ring_eth_cb::poll_and_process_element_rx(uint64_t* p_cq_poll_sn,
					     void* pv_fd_ready_array)
{
	NOT_IN_USE(p_cq_poll_sn);
	NOT_IN_USE(pv_fd_ready_array);
	return 0;
}

/**
 * loop poll_cq in padded-packet mode
 * @param limit maximum number of packets to consume in this loop
 * @return MP_LOOP_DRAINED if the cq is empty,
 * 	MP_LOOP_LIMIT if the packet limit was reached,
 * 	MP_LOOP_RETURN_TO_APP if we must return to the application
 * 	(poll error, bad packet or end of the WQE buffer)
 */
inline mp_loop_result ring_eth_cb::mp_loop_padded(size_t limit)
{
	struct mlx5_cqe64 *cqe64;
	uint16_t size = 0;
	uint32_t flags = 0, used_strides = 0;

	while (m_curr_packets < limit) {
		int ret = ((cq_mgr_mp *)m_p_cq_mgr_rx)->poll_mp_cq(size, used_strides,
								   flags, cqe64);
		if (size == 0) {
			ring_logfine("no packet found");
			return MP_LOOP_DRAINED;
		}
		if (unlikely(ret == -1)) {
			ring_logdbg("poll_mp_cq failed with errno %m", errno);
			return MP_LOOP_RETURN_TO_APP;
		}
		m_curr_wqe_used_strides += used_strides;
		if (unlikely(flags & VMA_MP_RQ_BAD_PACKET)) {
			if (m_curr_wqe_used_strides >= m_strides_num) {
				reload_wq();
			}
			return MP_LOOP_RETURN_TO_APP;
		}
		m_padd_mode_used_strides += used_strides;
		m_p_ring_stat->n_rx_pkt_count++;
		m_p_ring_stat->n_rx_byte_count += size;
		++m_curr_packets;
		if (unlikely(m_curr_wqe_used_strides >= m_strides_num)) {
			if (reload_wq()) {
				return MP_LOOP_RETURN_TO_APP;
			}
		}
	}
	ring_logfine("mp_loop finished all iterations");
	return MP_LOOP_LIMIT;
}

/**
 * loop poll_cq
 * @param limit maximum number of packets to consume in this loop
 * @return MP_LOOP_DRAINED if the cq is empty,
 * 	MP_LOOP_LIMIT if the packet limit was reached,
 * 	MP_LOOP_RETURN_TO_APP if we must return to the application
 * 	(poll error, bad or oversized packet, or end of the WQE buffer)
 */
inline mp_loop_result ring_eth_cb::mp_loop(size_t limit)
{
	struct mlx5_cqe64 *cqe64;
	uint16_t size = 0;
	uint32_t flags = 0, used_strides = 0;

	while (m_curr_packets < limit) {
		int ret = ((cq_mgr_mp *)m_p_cq_mgr_rx)->poll_mp_cq(size, used_strides,
								   flags, cqe64);
		if (size == 0) {
			ring_logfine("no packet found");
			return MP_LOOP_DRAINED;
		}
		if (unlikely(ret == -1)) {
			ring_logdbg("poll_mp_cq failed with errno %m", errno);
			return MP_LOOP_RETURN_TO_APP;
		}
		m_curr_wqe_used_strides += used_strides;
		if (unlikely(size > m_packet_size)) {
			errno = EMSGSIZE;
			ring_logerr("got unexpected packet size, expected "
				    "packet size %u but got %d, user data is "
				    "corrupted", m_packet_size, size);
			return MP_LOOP_RETURN_TO_APP;
		}
		if (unlikely(flags & VMA_MP_RQ_BAD_PACKET)) {
			if (m_curr_wqe_used_strides >= m_strides_num) {
				reload_wq();
			}
			return MP_LOOP_RETURN_TO_APP;
		}
		m_p_ring_stat->n_rx_pkt_count++;
		m_p_ring_stat->n_rx_byte_count += size;
		++m_curr_packets;
		if (unlikely(m_curr_wqe_used_strides >= m_strides_num)) {
			if (reload_wq()) {
				return MP_LOOP_RETURN_TO_APP;
			}
		}
	}
	ring_logfine("mp_loop finished all iterations");
	return MP_LOOP_LIMIT;
}

/*
 * all WQEs are contiguous in memory, so we need to return true to the caller
 * when the last WQE has been posted, i.e. we have reached the end of the buffer
 */
inline bool ring_eth_cb::reload_wq()
{
	// in the current implementation, after each WQE is consumed by the HW
	// the ring immediately reposts it to the HW, which is why a count of 1 is used
	((cq_mgr_mp *)m_p_cq_mgr_rx)->update_dbell();
	((qp_mgr_mp *)m_p_qp_mgr)->post_recv(m_curr_wq, 1);
	m_curr_wq = (m_curr_wq + 1) % m_wq_count;
	m_curr_wqe_used_strides = 0;
	if (m_curr_wq == 0) {
		m_all_wqes_used_strides = 0;
		return true;
	}
	m_all_wqes_used_strides += m_strides_num;
	return false;
}

int ring_eth_cb::cyclic_buffer_read(vma_completion_cb_t &completion,
				    size_t min, size_t max, int flags)
{
	uint32_t poll_flags = 0, used_strides = 0;
	uint16_t size;
	struct mlx5_cqe64 *cqe64;

	// sanity check
	if (unlikely(min > max || max == 0 || flags != MSG_DONTWAIT)) {
		errno = EINVAL;
		ring_logdbg("Illegal values, got min: %d, max: %d, flags %d",
			    min, max, flags);
		if (flags != MSG_DONTWAIT) {
			ring_logdbg("only %d flag is currently supported",
				    MSG_DONTWAIT);
		}
		return -1;
	}
	int prev_used_strides = m_curr_wqe_used_strides;
	int ret = ((cq_mgr_mp *)m_p_cq_mgr_rx)->poll_mp_cq(size, used_strides,
							   poll_flags, cqe64);
	// empty
	if (size == 0) {
		return 0;
	}

	if (m_packet_receive_mode != PADDED_PACKET &&
	    unlikely(size > m_packet_size)) {
		errno = EMSGSIZE;
		ring_logerr("got unexpected packet size, expected "
			    "packet size %u but got %d, user data is "
			    "corrupted", m_packet_size, size);
		return -1;
	}
	if (unlikely(ret == -1)) {
		ring_logdbg("poll_mp_cq failed with errno %m", errno);
		return -1;
	}
	m_curr_wqe_used_strides += used_strides;
	m_padd_mode_used_strides += used_strides;
	// set it here because we might not have min packets avail in this run
	if (likely(!(poll_flags & VMA_MP_RQ_BAD_PACKET))) {
		m_p_ring_stat->n_rx_pkt_count++;
		m_p_ring_stat->n_rx_byte_count += size;
		if (unlikely(m_curr_payload_addr == NULL)) {
			// the data address is the calculated UMR region base plus
			// the strides consumed by previous WQEs (m_all_wqes_used_strides)
			// plus the strides already used in the current WQE
			prev_used_strides += m_all_wqes_used_strides;
			m_curr_payload_addr = (void *)(m_sge_ptrs[CB_UMR_PAYLOAD] +
						(uint32_t)m_payload_len * prev_used_strides);
			m_curr_hdr_ptr = (void *)(m_sge_ptrs[CB_UMR_HDR] +
						(uint32_t)m_hdr_len * prev_used_strides);
			if (completion.comp_mask & VMA_CB_MASK_TIMESTAMP) {
				convert_hw_time_to_system_time(ntohll(cqe64->timestamp),
							       &m_curr_hw_timestamp);
			}
			m_curr_packets = 1;
		} else {
			m_curr_packets++;
		}
		bool return_to_app = false;
		if (unlikely(m_curr_wqe_used_strides >= m_strides_num)) {
			return_to_app = reload_wq();
		}
		if (!return_to_app) {
			if (m_packet_receive_mode == PADDED_PACKET) {
				ret = mp_loop_padded(min);
				if (ret == MP_LOOP_LIMIT) { // there might be more to drain
					mp_loop_padded(max);
				}
			} else {
				ret = mp_loop(min);
				if (ret == MP_LOOP_LIMIT) { // there might be more to drain
					mp_loop(max);
				}
			}
			if (ret == MP_LOOP_DRAINED) { // no packets left
				((cq_mgr_mp *)m_p_cq_mgr_rx)->update_max_drain(m_curr_packets);
				return 0;
			}
		}
	}
	((cq_mgr_mp *)m_p_cq_mgr_rx)->update_max_drain(m_curr_packets);
	completion.payload_ptr = m_curr_payload_addr;
	if (m_packet_receive_mode == PADDED_PACKET) {
		// support a packet taking more than one stride
		completion.payload_length = m_padd_mode_used_strides * m_stride_size;
	} else {
		completion.payload_length = m_payload_len * m_curr_packets;
	}
	completion.packets = m_curr_packets;
	completion.usr_hdr_ptr = m_curr_hdr_ptr;
	completion.usr_hdr_ptr_length = m_hdr_len * m_curr_packets;
	// hw_timestamp of first packet in batch
	completion.hw_timestamp = m_curr_hw_timestamp;
	m_curr_payload_addr = 0;
	m_padd_mode_used_strides = 0;
	ring_logdbg("Returning completion, buffer ptr %p, data size %zd, "
		    "usr hdr ptr %p usr hdr size %zd, number of packets %zd curr wqe idx %d",
		    completion.payload_ptr, completion.payload_length,
		    completion.usr_hdr_ptr, completion.usr_hdr_ptr_length,
		    m_curr_packets, m_curr_wq);
	return 0;
}

ring_eth_cb::~ring_eth_cb()
{
	struct ibv_exp_destroy_res_domain_attr attr;

	m_lock_ring_rx.lock();
	flow_udp_del_all();
	flow_tcp_del_all();
	m_lock_ring_rx.unlock();

	memset(&attr, 0, sizeof(attr));
	int res = ibv_exp_destroy_res_domain(m_p_ib_ctx->get_ibv_context(),
					     m_res_domain, &attr);
	if (res) {
		ring_logdbg("call to ibv_exp_destroy_res_domain returned %d", res);
	}

	remove_umr_res();
}
#endif /* HAVE_MP_RQ */