/*
 * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials
 *   provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <netinet/in.h>
#include <sys/socket.h>

#include "config.h"
#include "vlogger/vlogger.h"
#include "utils/lock_wrapper.h"
#include "vma/vma_extra.h"
#include "vma/util/data_updater.h"
#include "vma/util/sock_addr.h"
#include "vma/util/vma_stats.h"
#include "vma/util/sys_vars.h"
#include "vma/util/wakeup_pipe.h"
#include "vma/proto/flow_tuple.h"
#include "vma/proto/mem_buf_desc.h"
#include "vma/proto/dst_entry.h"
#include "vma/dev/net_device_table_mgr.h"
#include "vma/dev/ring_simple.h"
#include "vma/dev/ring_allocation_logic.h"
#include "socket_fd_api.h"
#include "pkt_rcvr_sink.h"
#include "pkt_sndr_source.h"
#include "sock-redirect.h"

#ifndef BASE_SOCKINFO_H
#define BASE_SOCKINFO_H

#define SI_RX_EPFD_EVENT_MAX    16

#define BYTE_TO_KB(byte_value)  ((byte_value) / 125)
#define KB_TO_BYTE(kbit_value)  ((kbit_value) * 125)

#if DEFINED_MISSING_NET_TSTAMP
enum {
    SOF_TIMESTAMPING_TX_HARDWARE  = (1<<0),
    SOF_TIMESTAMPING_TX_SOFTWARE  = (1<<1),
    SOF_TIMESTAMPING_RX_HARDWARE  = (1<<2),
    SOF_TIMESTAMPING_RX_SOFTWARE  = (1<<3),
    SOF_TIMESTAMPING_SOFTWARE     = (1<<4),
    SOF_TIMESTAMPING_SYS_HARDWARE = (1<<5),
    SOF_TIMESTAMPING_RAW_HARDWARE = (1<<6),
    SOF_TIMESTAMPING_MASK = (SOF_TIMESTAMPING_RAW_HARDWARE - 1) | SOF_TIMESTAMPING_RAW_HARDWARE
};
#else
#include <linux/net_tstamp.h>
#endif

#ifndef SO_TIMESTAMPNS
#define SO_TIMESTAMPNS  35
#endif

#ifndef SO_TIMESTAMPING
#define SO_TIMESTAMPING 37
#endif

#ifndef SO_REUSEPORT
#define SO_REUSEPORT    15
#endif

struct cmsg_state {
    struct msghdr   *mhdr;
    struct cmsghdr  *cmhdr;
    size_t           cmsg_bytes_consumed;
};

#define NOTIFY_ON_EVENTS(context, events) context->set_events(events)

struct buff_info_t {
    buff_info_t() {
        rx_reuse.set_id("buff_info_t (%p) : rx_reuse", this);
        n_buff_num = 0;
    }

    int     n_buff_num;
    descq_t rx_reuse;
};

typedef struct {
    net_device_entry*   p_nde;
    net_device_val*     p_ndv;
    ring*               p_ring;
    int                 refcnt;
} net_device_resources_t;

typedef std::tr1::unordered_map<in_addr_t, net_device_resources_t> rx_net_device_map_t;

/*
 * Sockinfo setsockopt() return values
 */
#define SOCKOPT_INTERNAL_VMA_SUPPORT    0   // Internal socket option, should not pass request to OS.
#define SOCKOPT_NO_VMA_SUPPORT          -1  // Socket option was found but not supported, error should be returned to user.
#define SOCKOPT_PASS_TO_OS              1   // Should pass to TCP/UDP level or OS.
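/*
 * Illustrative sketch (not part of this header): a derived socket class is
 * expected to map these return codes in its setsockopt() roughly as follows,
 * where handle_proto_opt() stands in for the protocol-specific handler:
 *
 *   int ret = handle_proto_opt(__level, __optname, __optval, __optlen);
 *   if (ret == SOCKOPT_INTERNAL_VMA_SUPPORT)
 *       return 0;                       // fully handled inside VMA
 *   if (ret == SOCKOPT_NO_VMA_SUPPORT)
 *       return -1;                      // errno already set for the caller
 *   // SOCKOPT_PASS_TO_OS: forward the request to the kernel socket
 *   return orig_os_api.setsockopt(m_fd, __level, __optname, __optval, __optlen);
 */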
namespace std { namespace tr1 {
template<>
class hash<flow_tuple_with_local_if>
{
public:
    size_t operator()(const flow_tuple_with_local_if &key) const
    {
        flow_tuple_with_local_if* tmp_key = (flow_tuple_with_local_if*)&key;
        return tmp_key->hash();
    }
};
}}

typedef std::tr1::unordered_map<flow_tuple_with_local_if, ring*> rx_flow_map_t;

typedef struct {
    int         refcnt;
    buff_info_t rx_reuse_info;
} ring_info_t;

typedef std::tr1::unordered_map<ring*, ring_info_t*> rx_ring_map_t;

// see route.c in Linux kernel
const uint8_t ip_tos2prio[16] = {
    0, 0, 0, 0,
    2, 2, 2, 2,
    6, 6, 6, 6,
    4, 4, 4, 4
};

class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_pipe
{
public:
    sockinfo(int fd);
    virtual ~sockinfo();

#if _BullseyeCoverage
    #pragma BullseyeCoverage off
#endif
    // don't put mt lock around sockinfo just yet
    void lock() {};
    void unlock() {};
#if _BullseyeCoverage
    #pragma BullseyeCoverage on
#endif

    enum sockinfo_state {
        SOCKINFO_OPENED,
        SOCKINFO_CLOSING,
        SOCKINFO_CLOSED
    };

    virtual void consider_rings_migration();
    virtual int add_epoll_context(epfd_info *epfd);
    virtual void remove_epoll_context(epfd_info *epfd);

    inline bool tcp_flow_is_5t(void) { return m_tcp_flow_is_5t; }
    inline void set_tcp_flow_is_5t(void) { m_tcp_flow_is_5t = true; }

    inline bool set_flow_tag(uint32_t flow_tag_id) {
        if (flow_tag_id && (flow_tag_id != FLOW_TAG_MASK)) {
            m_flow_tag_id = flow_tag_id;
            m_flow_tag_enabled = true;
            return true;
        }
        m_flow_tag_id = FLOW_TAG_MASK;
        return false;
    }
    inline bool flow_tag_enabled(void) { return m_flow_tag_enabled; }
    inline int get_rx_epfd(void) { return m_rx_epfd; }

    virtual bool flow_in_reuse(void) { return false; }
    virtual int* get_rings_fds(int &res_length);
    virtual int get_rings_num();
    virtual int get_socket_network_ptr(void *ptr, uint16_t &len);
    virtual bool check_rings() { return m_p_rx_ring ? true : false; }
    virtual void statistics_print(vlog_levels_t log_level = VLOG_DEBUG);
    uint32_t get_flow_tag_val() { return m_flow_tag_id; }
    inline in_protocol_t get_protocol(void) { return m_protocol; }

protected:
    bool m_b_blocking;
    bool m_b_pktinfo;
    bool m_b_rcvtstamp;
    bool m_b_rcvtstampns;
    uint8_t m_n_tsing_flags;
    in_protocol_t m_protocol;

    lock_spin_recursive m_lock_rcv;
    lock_mutex m_lock_snd;
    lock_mutex m_rx_migration_lock;

    sockinfo_state m_state; // socket current state
    sock_addr m_bound;
    sock_addr m_connected;
    dst_entry* m_p_connected_dst_entry;

    in_addr_t m_so_bindtodevice_ip;

    socket_stats_t m_socket_stats;
    socket_stats_t* m_p_socket_stats;

    int m_rx_epfd;
    cache_observer m_rx_nd_observer;
    rx_net_device_map_t m_rx_nd_map;
    rx_flow_map_t m_rx_flow_map;

    // we either listen on ALL system cqs or bound to the specific cq
    ring* m_p_rx_ring;              // used in TCP/UDP
    buff_info_t m_rx_reuse_buff;    // used in TCP instead of m_rx_ring_map
    bool m_rx_reuse_buf_pending;    // used to periodically return buffers, even if threshold was not reached
    bool m_rx_reuse_buf_postponed;  // used to mark threshold was reached, but free was not done yet
    inline void set_rx_reuse_pending(bool is_pending = true) { m_rx_reuse_buf_pending = is_pending; }

    rx_ring_map_t m_rx_ring_map;    // CQ map
    lock_mutex_recursive m_rx_ring_map_lock;
    ring_allocation_logic_rx m_ring_alloc_logic;

    loops_timer m_loops_timer;

    /**
     * list of pending ready packets on the Rx,
     * each element is a pointer to the ib_conn_mgr that holds this ready rx datagram
     */
    int m_n_rx_pkt_ready_list_count;
    size_t m_rx_pkt_ready_offset;
    size_t m_rx_ready_byte_count;

    const int m_n_sysvar_rx_num_buffs_reuse;
    const int32_t m_n_sysvar_rx_poll_num;
    ring_alloc_logic_attr m_ring_alloc_log_rx;
    ring_alloc_logic_attr m_ring_alloc_log_tx;
    uint32_t m_pcp; // socket priority, see set_sockopt_prio()
    struct {
        /* Track internal events to return in socketxtreme_poll().
         * Current design supports a single event per socket at any given time.
         */
        struct ring_ec ec;
        struct vma_completion_t* completion;
        struct vma_buff_t* last_buff_lst;
    } m_socketxtreme;

    // Callback function pointer to support VMA extra API (vma_extra.h)
    vma_recv_callback_t m_rx_callback;
    void* m_rx_callback_context;        // user context
    struct vma_rate_limit_t m_so_ratelimit;
    void* m_fd_context;                 // Context data stored with socket
    uint32_t m_flow_tag_id;             // Flow Tag for this socket
    bool m_flow_tag_enabled;            // for this socket
    uint8_t m_n_uc_ttl;                 // time to live
    bool m_tcp_flow_is_5t;              // to bypass packet analysis
    int* m_p_rings_fds;

    virtual void set_blocking(bool is_blocked);
    virtual int fcntl(int __cmd, unsigned long int __arg);
    virtual int ioctl(unsigned long int __request, unsigned long int __arg);
    virtual int setsockopt(int __level, int __optname, const void *__optval, socklen_t __optlen);
    int setsockopt_kernel(int __level, int __optname, const void *__optval, socklen_t __optlen, int supported, bool allow_priv);
    virtual int getsockopt(int __level, int __optname, void *__optval, socklen_t *__optlen);

    virtual mem_buf_desc_t* get_front_m_rx_pkt_ready_list() = 0;
    virtual size_t get_size_m_rx_pkt_ready_list() = 0;
    virtual void pop_front_m_rx_pkt_ready_list() = 0;
    virtual void push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff) = 0;

    int rx_wait(int &poll_count, bool is_blocking = true);
    int rx_wait_helper(int &poll_count, bool is_blocking = true);

    void save_stats_rx_os(int bytes);
    void save_stats_tx_os(int bytes);
    void save_stats_rx_offload(int nbytes);

    virtual int rx_verify_available_data() = 0;
    virtual void update_header_field(data_updater *updater) = 0;
    virtual mem_buf_desc_t* get_next_desc(mem_buf_desc_t *p_desc) = 0;
    virtual mem_buf_desc_t* get_next_desc_peek(mem_buf_desc_t *p_desc, int& rx_pkt_ready_list_idx) = 0;
    virtual timestamps_t* get_socket_timestamps() = 0;
    virtual void update_socket_timestamps(timestamps_t *ts) = 0;
    virtual void post_deqeue(bool release_buff) = 0;
    virtual int zero_copy_rx(iovec *p_iov, mem_buf_desc_t *pdesc, int *p_flags) = 0;

    int register_callback(vma_recv_callback_t callback, void *context);

    virtual size_t handle_msg_trunc(size_t total_rx, size_t payload_size, int in_flags, int* p_out_flags);

    bool attach_receiver(flow_tuple_with_local_if &flow_key);
    bool detach_receiver(flow_tuple_with_local_if &flow_key);
    net_device_resources_t* create_nd_resources(const ip_address ip_local);
    bool destroy_nd_resources(const ip_address ip_local);
    void do_rings_migration(resource_allocation_key &old_key);
    int set_ring_attr(vma_ring_alloc_logic_attr *attr);
    int set_ring_attr_helper(ring_alloc_logic_attr *sock_attr, vma_ring_alloc_logic_attr *attr);

    // Attach to all relevant rings for offloading receive flows - always used from slow path.
    // According to the bound address information we need to attach to all relevant UC flows.
    // If local_ip is ANY then we need to attach to all offloaded interfaces OR to the one our connected_ip is routed to.
    bool attach_as_uc_receiver(role_t role, bool skip_rules = false);
    virtual void set_rx_packet_processor(void) = 0;
    transport_t find_target_family(role_t role, struct sockaddr *sock_addr_first, struct sockaddr *sock_addr_second = NULL);
    // This callback will notify that the socket is ready to receive and map the cq.
    virtual void rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, bool is_migration = false);
    virtual void rx_del_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, bool is_migration = false);

    virtual void lock_rx_q() { m_lock_rcv.lock(); }
    virtual void unlock_rx_q() { m_lock_rcv.unlock(); }

    void shutdown_rx();
    void destructor_helper();
    int modify_ratelimit(dst_entry* p_dst_entry, struct vma_rate_limit_t &rate_limit);

    void move_owned_rx_ready_descs(ring* p_ring, descq_t* toq); // Move all owner's rx ready packets to 'toq'

    int set_sockopt_prio(__const void *__optval, socklen_t __optlen);

    virtual void handle_ip_pktinfo(struct cmsg_state *cm_state) = 0;
    inline void handle_recv_timestamping(struct cmsg_state *cm_state);
    void insert_cmsg(struct cmsg_state *cm_state, int level, int type, void *data, int len);
    void handle_cmsg(struct msghdr *msg);
    void process_timestamps(mem_buf_desc_t* p_desc);

    virtual bool try_un_offloading(); // un-offload the socket if possible

    virtual inline void do_wakeup() {
        if (!is_socketxtreme()) {
            wakeup_pipe::do_wakeup();
        }
    }

    inline bool is_socketxtreme() {
        return (m_p_rx_ring && m_p_rx_ring->is_socketxtreme());
    }

    inline void set_events(uint64_t events) {
        static int enable_socketxtreme = safe_mce_sys().enable_socketxtreme;

        if (enable_socketxtreme && m_state == SOCKINFO_OPENED) {
            /* Collect all events if rx ring is enabled */
            if (is_socketxtreme()) {
                if (m_socketxtreme.completion) {
                    if (!m_socketxtreme.completion->events) {
                        m_socketxtreme.completion->user_data = (uint64_t)m_fd_context;
                    }
                    m_socketxtreme.completion->events |= events;
                } else {
                    if (!m_socketxtreme.ec.completion.events) {
                        m_socketxtreme.ec.completion.user_data = (uint64_t)m_fd_context;
                        m_p_rx_ring->put_ec(&m_socketxtreme.ec);
                    }
                    m_socketxtreme.ec.completion.events |= events;
                }
            }
        }

        socket_fd_api::notify_epoll_context((uint32_t)events);
    }
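    /*
     * Illustrative sketch (see vma_extra.h for the authoritative API): events
     * recorded by set_events() reach the application through socketxtreme
     * polling on the ring fd, roughly as follows; ring_fd and the array size
     * here are placeholders:
     *
     *   struct vma_api_t* vma_api = vma_get_api();
     *   struct vma_completion_t comps[16];
     *   int n = vma_api->socketxtreme_poll(ring_fd, comps, 16, 0);
     *   for (int i = 0; i < n; i++) {
     *       // comps[i].user_data carries m_fd_context, comps[i].events the event mask
     *   }
     */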
    // This function validates the ipoib's properties
    // Input params:
    //   1. IF name (can be alias)
    //   2. IF flags
    //   3. general path to ipoib property file (for example: /sys/class/net/%s/mtu)
    //   4. the expected value of the property
    //   5. size of the property
    // Output params:
    //   1. property sysfs filename
    //   2. physical IF name (stripped alias)
    // Return Value
    // Type: INT
    // Val: -1 Reading from the sys file failed
    //       1 Reading succeeded but the actual prop value != expected
    //       0 Reading succeeded and actual prop value == expected one
    //TODO need to copy this function from util
    //int validate_ipoib_prop(char* ifname, unsigned int ifflags, const char param_file[], const char *val, int size, char *filename, char * base_ifname);

    inline void fetch_peer_info(sockaddr_in *p_peer_addr, sockaddr_in *__from, socklen_t *__fromlen)
    {
        *__from = *p_peer_addr;
        *__fromlen = sizeof(sockaddr_in);
    }
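    /*
     * Copy data from the ready rx descriptor chain into the caller's iovec
     * array. Handles MSG_PEEK (the dequeue offset is restored and buffers stay
     * queued) and MSG_VMA_ZCOPY (descriptors are handed to the caller via
     * zero_copy_rx()). Returns the number of bytes delivered, adjusted by
     * handle_msg_trunc(), or -1 on zero-copy failure.
     */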
    inline int dequeue_packet(iovec *p_iov, ssize_t sz_iov,
                              sockaddr_in *__from, socklen_t *__fromlen,
                              int in_flags, int *p_out_flags)
    {
        mem_buf_desc_t *pdesc;
        int total_rx = 0;
        uint32_t nbytes, pos;
        bool release_buff = true;

        bool is_peek = in_flags & MSG_PEEK;
        int rx_pkt_ready_list_idx = 1;
        int rx_pkt_ready_offset = m_rx_pkt_ready_offset;

        pdesc = get_front_m_rx_pkt_ready_list();
        void *iov_base = (uint8_t*)pdesc->rx.frag.iov_base + m_rx_pkt_ready_offset;
        size_t bytes_left = pdesc->rx.frag.iov_len - m_rx_pkt_ready_offset;
        size_t payload_size = pdesc->rx.sz_payload;

        if (__from && __fromlen)
            fetch_peer_info(&pdesc->rx.src, __from, __fromlen);

        if (in_flags & MSG_VMA_ZCOPY) {
            release_buff = false;
            total_rx = zero_copy_rx(p_iov, pdesc, p_out_flags);
            if (unlikely(total_rx < 0))
                return -1;
            m_rx_pkt_ready_offset = 0;
        } else {
            for (int i = 0; i < sz_iov && pdesc; i++) {
                pos = 0;
                while (pos < p_iov[i].iov_len && pdesc) {
                    nbytes = p_iov[i].iov_len - pos;
                    if (nbytes > bytes_left)
                        nbytes = bytes_left;
                    memcpy((char *)(p_iov[i].iov_base) + pos, iov_base, nbytes);
                    pos += nbytes;
                    total_rx += nbytes;
                    m_rx_pkt_ready_offset += nbytes;
                    bytes_left -= nbytes;
                    iov_base = (uint8_t*)iov_base + nbytes;
                    if (m_b_rcvtstamp || m_n_tsing_flags)
                        update_socket_timestamps(&pdesc->rx.timestamps);
                    if (bytes_left <= 0) {
                        // Current fragment fully consumed - advance to the next descriptor
                        if (unlikely(is_peek)) {
                            pdesc = get_next_desc_peek(pdesc, rx_pkt_ready_list_idx);
                        } else {
                            pdesc = get_next_desc(pdesc);
                        }
                        m_rx_pkt_ready_offset = 0;
                        if (pdesc) {
                            iov_base = pdesc->rx.frag.iov_base;
                            bytes_left = pdesc->rx.frag.iov_len;
                        }
                    }
                }
            }
        }

        if (unlikely(is_peek)) {
            m_rx_pkt_ready_offset = rx_pkt_ready_offset; // MSG_PEEK must not consume data - restore the saved offset
            //save_stats_rx_offload(total_rx); //TODO??
        } else {
            m_rx_ready_byte_count -= total_rx;
            m_p_socket_stats->n_rx_ready_byte_count -= total_rx;
            post_deqeue(release_buff);
            save_stats_rx_offload(total_rx);
        }

        total_rx = handle_msg_trunc(total_rx, payload_size, in_flags, p_out_flags);

        return total_rx;
    }

    inline void reuse_buffer(mem_buf_desc_t *buff)
    {
        set_rx_reuse_pending(false);

        ring* p_ring = buff->p_desc_owner->get_parent();
        rx_ring_map_t::iterator iter = m_rx_ring_map.find(p_ring);
        if (likely(iter != m_rx_ring_map.end())) {
            descq_t *rx_reuse = &iter->second->rx_reuse_info.rx_reuse;
            int& n_buff_num = iter->second->rx_reuse_info.n_buff_num;
            rx_reuse->push_back(buff);
            n_buff_num += buff->rx.n_frags;
            // Batch the reuse: do nothing until the threshold accumulates,
            // reclaim to the ring (or release to the global pool) at 2x the threshold
            if (n_buff_num < m_n_sysvar_rx_num_buffs_reuse) {
                return;
            }
            if (n_buff_num >= 2 * m_n_sysvar_rx_num_buffs_reuse) {
                if (p_ring->reclaim_recv_buffers(rx_reuse)) {
                    n_buff_num = 0;
                } else {
                    g_buffer_pool_rx->put_buffers_after_deref_thread_safe(rx_reuse);
                    n_buff_num = 0;
                }
                m_rx_reuse_buf_postponed = false;
            } else {
                m_rx_reuse_buf_postponed = true;
            }
        } else {
            // Return the buffer to the global pool when the owner can't be found
            // (e.g. the ring was deleted while buffers were still queued)
            vlog_printf(VLOG_DEBUG, "Buffer owner not found\n");
            // Awareness: these are best efforts: decRef without lock in case no CQ
            if (buff->dec_ref_count() <= 1 && (buff->lwip_pbuf.pbuf.ref-- <= 1))
                g_buffer_pool_rx->put_buffers_thread_safe(buff);
        }
    }

    inline void move_owned_descs(ring* p_ring, descq_t *toq, descq_t *fromq)
    {
        // Assume locked by owner!!!
        mem_buf_desc_t *temp;
        const size_t size = fromq->size();
        for (size_t i = 0; i < size; i++) {
            temp = fromq->front();
            fromq->pop_front();
            if (p_ring->is_member(temp->p_desc_owner))
                toq->push_back(temp);
            else
                fromq->push_back(temp);
        }
    }

    static const char* setsockopt_so_opt_to_str(int opt)
    {
        switch (opt) {
        case SO_REUSEADDR:              return "SO_REUSEADDR";
        case SO_REUSEPORT:              return "SO_REUSEPORT";
        case SO_BROADCAST:              return "SO_BROADCAST";
        case SO_RCVBUF:                 return "SO_RCVBUF";
        case SO_SNDBUF:                 return "SO_SNDBUF";
        case SO_TIMESTAMP:              return "SO_TIMESTAMP";
        case SO_TIMESTAMPNS:            return "SO_TIMESTAMPNS";
        case SO_BINDTODEVICE:           return "SO_BINDTODEVICE";
        case SO_VMA_RING_ALLOC_LOGIC:   return "SO_VMA_RING_ALLOC_LOGIC";
        case SO_MAX_PACING_RATE:        return "SO_MAX_PACING_RATE";
        case SO_VMA_FLOW_TAG:           return "SO_VMA_FLOW_TAG";
        case SO_VMA_SHUTDOWN_RX:        return "SO_VMA_SHUTDOWN_RX";
        default:                        break;
        }
        return "UNKNOWN SO opt";
    }

    inline void move_not_owned_descs(ring* p_ring, descq_t *toq, descq_t *fromq)
    {
        // Assume locked by owner!!!
        mem_buf_desc_t *temp;
        const size_t size = fromq->size();
        for (size_t i = 0; i < size; i++) {
            temp = fromq->front();
            fromq->pop_front();
            if (p_ring->is_member(temp->p_desc_owner))
                fromq->push_back(temp);
            else
                toq->push_back(temp);
        }
    }

    int get_sock_by_L3_L4(in_protocol_t protocol, in_addr_t ip, in_port_t port);

    //////////////////////////////////////////////////////////////////
    // Apply the configured exception-handling policy: optionally un-offload the
    // socket, then return 0 (continue), -1 (report EINVAL) or -2 (abort).
    int handle_exception_flow()
    {
        if (safe_mce_sys().exception_handling.is_suit_un_offloading()) {
            try_un_offloading();
        }
        if (safe_mce_sys().exception_handling == vma_exception_handling::MODE_RETURN_ERROR) {
            errno = EINVAL;
            return -1;
        }
        if (safe_mce_sys().exception_handling == vma_exception_handling::MODE_ABORT) {
            return -2;
        }
        return 0;
    }
    //////////////////////////////////////////////////////////////////
};

#endif /* BASE_SOCKINFO_H */