/*
* Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
* COPYING in the main directory of this source tree, or the
* BSD license below:
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above
* copyright notice, this list of conditions and the following
* disclaimer.
*
* - Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include <tr1/unordered_map>
#include <ifaddrs.h>
#include "config.h"
#include "vlogger/vlogger.h"
#include "utils/lock_wrapper.h"
#include "vma/vma_extra.h"
#include "vma/util/data_updater.h"
#include "vma/util/sock_addr.h"
#include "vma/util/vma_stats.h"
#include "vma/util/sys_vars.h"
#include "vma/util/wakeup_pipe.h"
#include "vma/proto/flow_tuple.h"
#include "vma/proto/mem_buf_desc.h"
#include "vma/proto/dst_entry.h"
#include "vma/dev/net_device_table_mgr.h"
#include "vma/dev/ring_simple.h"
#include "vma/dev/ring_allocation_logic.h"
#include "socket_fd_api.h"
#include "pkt_rcvr_sink.h"
#include "pkt_sndr_source.h"
#include "sock-redirect.h"
#ifndef BASE_SOCKINFO_H
#define BASE_SOCKINFO_H
#define SI_RX_EPFD_EVENT_MAX 16
#define BYTE_TO_KB(byte_value) ((byte_value) / 125)
#define KB_TO_BYTE(kbit_value) ((kbit_value) * 125)
#if DEFINED_MISSING_NET_TSTAMP
enum {
SOF_TIMESTAMPING_TX_HARDWARE = (1<<0),
SOF_TIMESTAMPING_TX_SOFTWARE = (1<<1),
SOF_TIMESTAMPING_RX_HARDWARE = (1<<2),
SOF_TIMESTAMPING_RX_SOFTWARE = (1<<3),
SOF_TIMESTAMPING_SOFTWARE = (1<<4),
SOF_TIMESTAMPING_SYS_HARDWARE = (1<<5),
SOF_TIMESTAMPING_RAW_HARDWARE = (1<<6),
SOF_TIMESTAMPING_MASK =
(SOF_TIMESTAMPING_RAW_HARDWARE - 1) |
SOF_TIMESTAMPING_RAW_HARDWARE
};
#else
#include <linux/net_tstamp.h>
#endif
#ifndef SO_TIMESTAMPNS
#define SO_TIMESTAMPNS 35
#endif
#ifndef SO_TIMESTAMPING
#define SO_TIMESTAMPING 37
#endif
#ifndef SO_REUSEPORT
#define SO_REUSEPORT 15
#endif
struct cmsg_state
{
struct msghdr *mhdr;
struct cmsghdr *cmhdr;
size_t cmsg_bytes_consumed;
};
#define NOTIFY_ON_EVENTS(context, events) context->set_events(events)
struct buff_info_t {
buff_info_t(){
rx_reuse.set_id("buff_info_t (%p) : rx_reuse", this);
n_buff_num = 0;
}
int n_buff_num;
descq_t rx_reuse;
};
typedef struct {
net_device_entry* p_nde;
net_device_val* p_ndv;
ring* p_ring;
int refcnt;
} net_device_resources_t;
typedef std::tr1::unordered_map<in_addr_t, net_device_resources_t> rx_net_device_map_t;
/*
* Sockinfo setsockopt() return values
*/
#define SOCKOPT_INTERNAL_VMA_SUPPORT 0 // Internal socket option, should not pass request to OS.
#define SOCKOPT_NO_VMA_SUPPORT -1 // Socket option was found but not supported, error should be returned to user.
#define SOCKOPT_PASS_TO_OS 1 // Should pass to TCP/UDP level or OS.
namespace std { namespace tr1 {
template<>
class hash<flow_tuple_with_local_if>
{
public:
size_t operator()(const flow_tuple_with_local_if &key) const
{
flow_tuple_with_local_if* tmp_key = (flow_tuple_with_local_if*)&key;
return tmp_key->hash();
}
};
}}
typedef std::tr1::unordered_map<flow_tuple_with_local_if, ring*> rx_flow_map_t;
typedef struct {
int refcnt;
buff_info_t rx_reuse_info;
} ring_info_t;
typedef std::tr1::unordered_map<ring*, ring_info_t*> rx_ring_map_t;
// see route.c in Linux kernel
const uint8_t ip_tos2prio[16] = {
0, 0, 0, 0,
2, 2, 2, 2,
6, 6, 6, 6,
4, 4, 4, 4
};
class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_pipe
{
public:
sockinfo(int fd);
virtual ~sockinfo();
#if _BullseyeCoverage
#pragma BullseyeCoverage off
#endif
// don't put mt lock around sockinfo just yet
void lock(){};
void unlock() {};
#if _BullseyeCoverage
#pragma BullseyeCoverage on
#endif
enum sockinfo_state {
SOCKINFO_OPENED,
SOCKINFO_CLOSING,
SOCKINFO_CLOSED
};
virtual void consider_rings_migration();
virtual int add_epoll_context(epfd_info *epfd);
virtual void remove_epoll_context(epfd_info *epfd);
inline bool tcp_flow_is_5t(void) { return m_tcp_flow_is_5t; }
inline void set_tcp_flow_is_5t(void) { m_tcp_flow_is_5t = true; }
inline bool set_flow_tag(uint32_t flow_tag_id) {
if (flow_tag_id && (flow_tag_id != FLOW_TAG_MASK)) {
m_flow_tag_id = flow_tag_id;
m_flow_tag_enabled = true;
return true;
}
m_flow_tag_id = FLOW_TAG_MASK;
return false;
}
inline bool flow_tag_enabled(void) { return m_flow_tag_enabled; }
inline int get_rx_epfd(void) { return m_rx_epfd; }
virtual bool flow_in_reuse(void) { return false;};
virtual int* get_rings_fds(int &res_length);
virtual int get_rings_num();
virtual int get_socket_network_ptr(void *ptr, uint16_t &len);
virtual bool check_rings() {return m_p_rx_ring ? true: false;}
virtual void statistics_print(vlog_levels_t log_level = VLOG_DEBUG);
uint32_t get_flow_tag_val() { return m_flow_tag_id; }
inline in_protocol_t get_protocol(void) { return m_protocol; }
protected:
bool m_b_blocking;
bool m_b_pktinfo;
bool m_b_rcvtstamp;
bool m_b_rcvtstampns;
uint8_t m_n_tsing_flags;
in_protocol_t m_protocol;
lock_spin_recursive m_lock_rcv;
lock_mutex m_lock_snd;
lock_mutex m_rx_migration_lock;
sockinfo_state m_state; // socket current state
sock_addr m_bound;
sock_addr m_connected;
dst_entry* m_p_connected_dst_entry;
in_addr_t m_so_bindtodevice_ip;
socket_stats_t m_socket_stats;
socket_stats_t* m_p_socket_stats;
int m_rx_epfd;
cache_observer m_rx_nd_observer;
rx_net_device_map_t m_rx_nd_map;
rx_flow_map_t m_rx_flow_map;
// we either listen on ALL system cqs or bound to the specific cq
ring* m_p_rx_ring; //used in TCP/UDP
buff_info_t m_rx_reuse_buff; //used in TCP instead of m_rx_ring_map
bool m_rx_reuse_buf_pending; //used to periodically return buffers, even if threshold was not reached
bool m_rx_reuse_buf_postponed; //used to mark threshold was reached, but free was not done yet
inline void set_rx_reuse_pending(bool is_pending = true) {m_rx_reuse_buf_pending = is_pending;}
rx_ring_map_t m_rx_ring_map; // CQ map
lock_mutex_recursive m_rx_ring_map_lock;
ring_allocation_logic_rx m_ring_alloc_logic;
loops_timer m_loops_timer;
/**
* list of pending ready packet on the Rx,
* each element is a pointer to the ib_conn_mgr that holds this ready rx datagram
*/
int m_n_rx_pkt_ready_list_count;
size_t m_rx_pkt_ready_offset;
size_t m_rx_ready_byte_count;
const int m_n_sysvar_rx_num_buffs_reuse;
const int32_t m_n_sysvar_rx_poll_num;
ring_alloc_logic_attr m_ring_alloc_log_rx;
ring_alloc_logic_attr m_ring_alloc_log_tx;
uint32_t m_pcp;
struct {
/* Track internal events to return in socketxtreme_poll()
* Current design support single event for socket at a particular time
*/
struct ring_ec ec;
struct vma_completion_t* completion;
struct vma_buff_t* last_buff_lst;
} m_socketxtreme;
// Callback function pointer to support VMA extra API (vma_extra.h)
vma_recv_callback_t m_rx_callback;
void* m_rx_callback_context; // user context
struct vma_rate_limit_t m_so_ratelimit;
void* m_fd_context; // Context data stored with socket
uint32_t m_flow_tag_id; // Flow Tag for this socket
bool m_flow_tag_enabled; // for this socket
uint8_t m_n_uc_ttl; // time to live
bool m_tcp_flow_is_5t; // to bypass packet analysis
int* m_p_rings_fds;
virtual void set_blocking(bool is_blocked);
virtual int fcntl(int __cmd, unsigned long int __arg);
virtual int ioctl(unsigned long int __request, unsigned long int __arg);
virtual int setsockopt(int __level, int __optname, const void *__optval, socklen_t __optlen);
int setsockopt_kernel(int __level, int __optname, const void *__optval, socklen_t __optlen, int supported, bool allow_priv);
virtual int getsockopt(int __level, int __optname, void *__optval, socklen_t *__optlen);
virtual mem_buf_desc_t* get_front_m_rx_pkt_ready_list() = 0;
virtual size_t get_size_m_rx_pkt_ready_list() = 0;
virtual void pop_front_m_rx_pkt_ready_list() = 0;
virtual void push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff) = 0;
int rx_wait(int &poll_count, bool is_blocking = true);
int rx_wait_helper(int &poll_count, bool is_blocking = true);
void save_stats_rx_os(int bytes);
void save_stats_tx_os(int bytes);
void save_stats_rx_offload(int nbytes);
virtual int rx_verify_available_data() = 0;
virtual void update_header_field(data_updater *updater) = 0;
virtual mem_buf_desc_t *get_next_desc (mem_buf_desc_t *p_desc) = 0;
virtual mem_buf_desc_t* get_next_desc_peek(mem_buf_desc_t *p_desc, int& rx_pkt_ready_list_idx) = 0;
virtual timestamps_t* get_socket_timestamps() = 0;
virtual void update_socket_timestamps(timestamps_t * ts) = 0;
virtual void post_deqeue (bool release_buff) = 0;
virtual int zero_copy_rx (iovec *p_iov, mem_buf_desc_t *pdesc, int *p_flags) = 0;
int register_callback(vma_recv_callback_t callback, void *context);
virtual size_t handle_msg_trunc(size_t total_rx, size_t payload_size, int in_flags, int* p_out_flags);
bool attach_receiver(flow_tuple_with_local_if &flow_key);
bool detach_receiver(flow_tuple_with_local_if &flow_key);
net_device_resources_t* create_nd_resources(const ip_address ip_local);
bool destroy_nd_resources(const ip_address ip_local);
void do_rings_migration(resource_allocation_key &old_key);
int set_ring_attr(vma_ring_alloc_logic_attr *attr);
int set_ring_attr_helper(ring_alloc_logic_attr *sock_attr, vma_ring_alloc_logic_attr *attr);
// Attach to all relevant rings for offloading receive flows - always used from slow path
// According to bounded information we need to attach to all UC relevant flows
// If local_ip is ANY then we need to attach to all offloaded interfaces OR to the one our connected_ip is routed to
bool attach_as_uc_receiver(role_t role, bool skip_rules = false);
virtual void set_rx_packet_processor(void) = 0;
transport_t find_target_family(role_t role, struct sockaddr *sock_addr_first, struct sockaddr *sock_addr_second = NULL);
// This callback will notify that socket is ready to receive and map the cq.
virtual void rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, bool is_migration = false);
virtual void rx_del_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, bool is_migration = false);
virtual void lock_rx_q() {m_lock_rcv.lock();}
virtual void unlock_rx_q() {m_lock_rcv.unlock();}
void shutdown_rx();
void destructor_helper();
int modify_ratelimit(dst_entry* p_dst_entry, struct vma_rate_limit_t &rate_limit);
void move_owned_rx_ready_descs(ring* p_ring, descq_t* toq); // Move all owner's rx ready packets ro 'toq'
int set_sockopt_prio(__const void *__optval, socklen_t __optlen);
virtual void handle_ip_pktinfo(struct cmsg_state *cm_state) = 0;
inline void handle_recv_timestamping(struct cmsg_state *cm_state);
void insert_cmsg(struct cmsg_state *cm_state, int level, int type, void *data, int len);
void handle_cmsg(struct msghdr * msg);
void process_timestamps(mem_buf_desc_t* p_desc);
virtual bool try_un_offloading(); // un-offload the socket if possible
virtual inline void do_wakeup() {
if (!is_socketxtreme()) {
wakeup_pipe::do_wakeup();
}
}
inline bool is_socketxtreme() {
return (m_p_rx_ring && m_p_rx_ring->is_socketxtreme());
}
inline void set_events(uint64_t events) {
static int enable_socketxtreme = safe_mce_sys().enable_socketxtreme;
if (enable_socketxtreme && m_state == SOCKINFO_OPENED) {
/* Collect all events if rx ring is enabled */
if (is_socketxtreme()) {
if (m_socketxtreme.completion) {
if (!m_socketxtreme.completion->events) {
m_socketxtreme.completion->user_data = (uint64_t)m_fd_context;
}
m_socketxtreme.completion->events |= events;
}
else {
if (!m_socketxtreme.ec.completion.events) {
m_socketxtreme.ec.completion.user_data = (uint64_t)m_fd_context;
m_p_rx_ring->put_ec(&m_socketxtreme.ec);
}
m_socketxtreme.ec.completion.events |= events;
}
}
}
socket_fd_api::notify_epoll_context((uint32_t)events);
}
// This function validates the ipoib's properties
// Input params:
// 1. IF name (can be alias)
// 2. IF flags
// 3. general path to ipoib property file (for example: /sys/class/net/%s/mtu)
// 4. the expected value of the property
// 5. size of the property
// Output params:
// 1. property sysfs filename
// 2. physical IF name (stripped alias)
// Return Value
// Type: INT
// Val: -1 Reading from the sys file failed
// 1 Reading succeeded but the actual prop value != expected
// 0 Reading succeeded and acutal ptop value == expected one
//TODO need to copy this function from util
//int validate_ipoib_prop(char* ifname, unsigned int ifflags, const char param_file[], const char *val, int size, char *filename, char * base_ifname);
inline void fetch_peer_info(sockaddr_in *p_peer_addr, sockaddr_in *__from, socklen_t *__fromlen)
{
*__from = *p_peer_addr;
*__fromlen = sizeof(sockaddr_in);
}
inline int dequeue_packet(iovec *p_iov, ssize_t sz_iov,
sockaddr_in *__from, socklen_t *__fromlen,
int in_flags, int *p_out_flags)
{
mem_buf_desc_t *pdesc;
int total_rx = 0;
uint32_t nbytes, pos;
bool relase_buff = true;
bool is_peek = in_flags & MSG_PEEK;
int rx_pkt_ready_list_idx = 1;
int rx_pkt_ready_offset = m_rx_pkt_ready_offset;
pdesc = get_front_m_rx_pkt_ready_list();
void *iov_base = (uint8_t*)pdesc->rx.frag.iov_base + m_rx_pkt_ready_offset;
size_t bytes_left = pdesc->rx.frag.iov_len - m_rx_pkt_ready_offset;
size_t payload_size = pdesc->rx.sz_payload;
if (__from && __fromlen)
fetch_peer_info(&pdesc->rx.src, __from, __fromlen);
if (in_flags & MSG_VMA_ZCOPY) {
relase_buff = false;
total_rx = zero_copy_rx(p_iov, pdesc, p_out_flags);
if (unlikely(total_rx < 0))
return -1;
m_rx_pkt_ready_offset = 0;
}
else {
for (int i = 0; i < sz_iov && pdesc; i++) {
pos = 0;
while (pos < p_iov[i].iov_len && pdesc) {
nbytes = p_iov[i].iov_len - pos;
if (nbytes > bytes_left) nbytes = bytes_left;
memcpy((char *)(p_iov[i].iov_base) + pos, iov_base, nbytes);
pos += nbytes;
total_rx += nbytes;
m_rx_pkt_ready_offset += nbytes;
bytes_left -= nbytes;
iov_base = (uint8_t*)iov_base + nbytes;
if (m_b_rcvtstamp || m_n_tsing_flags) update_socket_timestamps(&pdesc->rx.timestamps);
if(bytes_left <= 0) {
if (unlikely(is_peek)) {
pdesc = get_next_desc_peek(pdesc, rx_pkt_ready_list_idx);
} else {
pdesc = get_next_desc(pdesc);
}
m_rx_pkt_ready_offset = 0;
if (pdesc) {
iov_base = pdesc->rx.frag.iov_base;
bytes_left = pdesc->rx.frag.iov_len;
}
}
}
}
}
if (unlikely(is_peek)) {
m_rx_pkt_ready_offset = rx_pkt_ready_offset; //if MSG_PEEK is on, m_rx_pkt_ready_offset must be zero-ed
//save_stats_rx_offload(total_rx); //TODO??
}
else {
m_rx_ready_byte_count -= total_rx;
m_p_socket_stats->n_rx_ready_byte_count -= total_rx;
post_deqeue(relase_buff);
save_stats_rx_offload(total_rx);
}
total_rx = handle_msg_trunc(total_rx, payload_size, in_flags, p_out_flags);
return total_rx;
}
inline void reuse_buffer(mem_buf_desc_t *buff)
{
set_rx_reuse_pending(false);
ring* p_ring = buff->p_desc_owner->get_parent();
rx_ring_map_t::iterator iter = m_rx_ring_map.find(p_ring);
if(likely(iter != m_rx_ring_map.end())){
descq_t *rx_reuse = &iter->second->rx_reuse_info.rx_reuse;
int& n_buff_num = iter->second->rx_reuse_info.n_buff_num;
rx_reuse->push_back(buff);
n_buff_num += buff->rx.n_frags;
if(n_buff_num < m_n_sysvar_rx_num_buffs_reuse){
return;
}
if(n_buff_num >= 2 * m_n_sysvar_rx_num_buffs_reuse){
if (p_ring->reclaim_recv_buffers(rx_reuse)) {
n_buff_num = 0;
} else {
g_buffer_pool_rx->put_buffers_after_deref_thread_safe(rx_reuse);
n_buff_num = 0;
}
m_rx_reuse_buf_postponed = false;
} else {
m_rx_reuse_buf_postponed = true;
}
}
else{
// Retuned buffer to global pool when owner can't be found
// In case ring was deleted while buffers where still queued
vlog_printf(VLOG_DEBUG, "Buffer owner not found\n");
// Awareness: these are best efforts: decRef without lock in case no CQ
if(buff->dec_ref_count() <= 1 && (buff->lwip_pbuf.pbuf.ref-- <= 1))
g_buffer_pool_rx->put_buffers_thread_safe(buff);
}
}
inline void move_owned_descs(ring* p_ring, descq_t *toq, descq_t *fromq)
{
// Assume locked by owner!!!
mem_buf_desc_t *temp;
const size_t size = fromq->size();
for (size_t i = 0 ; i < size; i++) {
temp = fromq->front();
fromq->pop_front();
if (p_ring->is_member(temp->p_desc_owner))
toq->push_back(temp);
else
fromq->push_back(temp);
}
}
static const char * setsockopt_so_opt_to_str(int opt)
{
switch (opt) {
case SO_REUSEADDR: return "SO_REUSEADDR";
case SO_REUSEPORT: return "SO_REUSEPORT";
case SO_BROADCAST: return "SO_BROADCAST";
case SO_RCVBUF: return "SO_RCVBUF";
case SO_SNDBUF: return "SO_SNDBUF";
case SO_TIMESTAMP: return "SO_TIMESTAMP";
case SO_TIMESTAMPNS: return "SO_TIMESTAMPNS";
case SO_BINDTODEVICE: return "SO_BINDTODEVICE";
case SO_VMA_RING_ALLOC_LOGIC: return "SO_VMA_RING_ALLOC_LOGIC";
case SO_MAX_PACING_RATE: return "SO_MAX_PACING_RATE";
case SO_VMA_FLOW_TAG: return "SO_VMA_FLOW_TAG";
case SO_VMA_SHUTDOWN_RX: return "SO_VMA_SHUTDOWN_RX";
default: break;
}
return "UNKNOWN SO opt";
}
inline void move_not_owned_descs(ring* p_ring, descq_t *toq, descq_t *fromq)
{
// Assume locked by owner!!!
mem_buf_desc_t *temp;
const size_t size = fromq->size();
for (size_t i = 0 ; i < size; i++) {
temp = fromq->front();
fromq->pop_front();
if (p_ring->is_member(temp->p_desc_owner))
fromq->push_back(temp);
else
toq->push_back(temp);
}
}
int get_sock_by_L3_L4(in_protocol_t protocol, in_addr_t ip, in_port_t port);
//////////////////////////////////////////////////////////////////
int handle_exception_flow(){
if (safe_mce_sys().exception_handling.is_suit_un_offloading()) {
try_un_offloading();
}
if (safe_mce_sys().exception_handling == vma_exception_handling::MODE_RETURN_ERROR) {
errno = EINVAL;
return -1;
}
if (safe_mce_sys().exception_handling == vma_exception_handling::MODE_ABORT) {
return -2;
}
return 0;
}
//////////////////////////////////////////////////////////////////
};
#endif /* BASE_SOCKINFO_H */