/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

/*
 * Internal declarations of the UCT TCP transport: protocol constants,
 * endpoint/connection-manager state enums, and the packed headers used by
 * the TCP protocol.
 */

#ifndef UCT_TCP_MD_H
#define UCT_TCP_MD_H

/* NOTE(review): the nine #include directives below carry no header name in
 * this copy of the file, so the header cannot be preprocessed as-is. Restore
 * the original targets from version control. The declarations in this file
 * rely on, at least, ucs_sockaddr_cmp()/ucs_sockaddr_sizeof(), KHASH_INIT,
 * ucs_crc32(), ucs_sys_event_set_t and IFNAMSIZ. */
#include
#include
#include
#include
#include
#include
#include
#include
#include

#define UCT_TCP_NAME                          "tcp"

#define UCT_TCP_CONFIG_PREFIX                 "TCP_"

/* Magic number that is used by TCP to identify its peers */
#define UCT_TCP_MAGIC_NUMBER                  0xCAFEBABE12345678lu

/* Maximum number of events to wait on event set */
#define UCT_TCP_MAX_EVENTS                    16

/* Size of the string buffer needed to hold a "[%s:%s]" string,
 * where each %s value can be -/Tx/Rx */
#define UCT_TCP_EP_CTX_CAPS_STR_MAX           8

/* How many IOVs are needed to keep AM/PUT Zcopy service data
 * (TCP protocol and user's AM (or PUT) headers) */
#define UCT_TCP_EP_ZCOPY_SERVICE_IOV_COUNT    2

/* How many IOVs are needed to do AM Short
 * (TCP protocol and user's AM headers, payload) */
#define UCT_TCP_EP_AM_SHORTV_IOV_COUNT        3

/* Maximum size of data that can be sent by a PUT Zcopy
 * operation */
#define UCT_TCP_EP_PUT_ZCOPY_MAX              SIZE_MAX

/* Length of the service data that is used by the PUT protocol:
 * the TCP AM header followed by the PUT request header */
#define UCT_TCP_EP_PUT_SERVICE_LENGTH        (sizeof(uct_tcp_am_hdr_t) + \
                                              sizeof(uct_tcp_ep_put_req_hdr_t))

#define UCT_TCP_CONFIG_MAX_CONN_RETRIES      "MAX_CONN_RETRIES"


/**
 * TCP context type
 */
typedef enum uct_tcp_ep_ctx_type {
    /* EP is connected to a peer to send data. This EP is managed
     * by a user and TCP mustn't free this EP even if connection
     * is broken. */
    UCT_TCP_EP_CTX_TYPE_TX,
    /* EP is connected to a peer to receive data. If only RX is set
     * on a given EP, it is hidden from a user (i.e. the user is unable
     * to do any operation on that EP) and TCP is responsible for
     * freeing the memory allocated for this EP. */
    UCT_TCP_EP_CTX_TYPE_RX,

    /* Additional flags that control EP behavior: */
    /* - Zcopy TX operation is in progress on a given EP. */
    UCT_TCP_EP_CTX_TYPE_ZCOPY_TX,
    /* - PUT RX operation is in progress on a given EP. */
    UCT_TCP_EP_CTX_TYPE_PUT_RX,
    /* - PUT TX operation is waiting for an ACK on a given EP */
    UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK,
    /* - PUT RX operation is waiting for resources to send an ACK
     *   for received PUT operations on a given EP */
    UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK
} uct_tcp_ep_ctx_type_t;


/**
 * TCP endpoint connection state
 */
typedef enum uct_tcp_ep_conn_state {
    /* EP is unable to communicate with a peer's EP - connection establishment
     * was unsuccessful or a hangup was detected during communications. */
    UCT_TCP_EP_CONN_STATE_CLOSED,
    /* EP is connecting to a peer's EP, i.e. connect() was called on a
     * non-blocking socket and returned that the operation is in progress.
     * After it is done, it sends `UCT_TCP_CM_CONN_REQ` to the peer.
     * All AM operations return `UCS_ERR_NO_RESOURCE` error to a caller. */
    UCT_TCP_EP_CONN_STATE_CONNECTING,
    /* EP is receiving the magic number in order to verify a peer. EP is moved
     * to this state after accept() completed. */
    UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER,
    /* EP is accepting connection from a peer, i.e. accept() returns socket fd
     * on which a connection was accepted, this EP was created using this socket
     * fd and the magic number was received and verified by EP and now it is
     * waiting for `UCT_TCP_CM_CONN_REQ` from a peer. */
    UCT_TCP_EP_CONN_STATE_ACCEPTING,
    /* EP is waiting for `UCT_TCP_CM_CONN_ACK` message from a peer after sending
     * `UCT_TCP_CM_CONN_REQ`.
     * All AM operations return `UCS_ERR_NO_RESOURCE` error to a caller. */
    UCT_TCP_EP_CONN_STATE_WAITING_ACK,
    /* EP is waiting for a connection and `UCT_TCP_CM_CONN_REQ` message from
     * a peer after simultaneous connection resolution between them. This EP
     * is a "winner" of the resolution, but has no RX capability on this EP
     * (i.e. no `UCT_TCP_CM_CONN_REQ` message was received from the peer). EP
     * is moved to `UCT_TCP_EP_CONN_STATE_CONNECTED` state upon receiving this
     * message.
     * All AM operations return `UCS_ERR_NO_RESOURCE` error to a caller. */
    UCT_TCP_EP_CONN_STATE_WAITING_REQ,
    /* EP is connected to a peer and they can communicate with each other. */
    UCT_TCP_EP_CONN_STATE_CONNECTED
} uct_tcp_ep_conn_state_t;

/* Forward declaration */
typedef struct uct_tcp_ep uct_tcp_ep_t;

/* Progress callback invoked on an EP; returns an unsigned count */
typedef unsigned (*uct_tcp_ep_progress_t)(uct_tcp_ep_t *ep);

/*
 * Equality callback for the `uct_tcp_cm_eps` hash table.
 * Returns non-zero when ucs_sockaddr_cmp() yields 0 for the two addresses
 * and zero otherwise. The status reported by ucs_sockaddr_cmp() is checked
 * only through ucs_assert().
 */
static inline int uct_tcp_khash_sockaddr_in_equal(struct sockaddr_in sa1,
                                                  struct sockaddr_in sa2)
{
    ucs_status_t status;
    int cmp;

    cmp = ucs_sockaddr_cmp((const struct sockaddr*)&sa1,
                           (const struct sockaddr*)&sa2,
                           &status);
    ucs_assert(status == UCS_OK);
    return !cmp;
}

/*
 * Hash callback for the `uct_tcp_cm_eps` hash table.
 * Returns ucs_crc32() (initial value 0) computed over the first `addr_size`
 * bytes of `sa`, where `addr_size` is obtained from ucs_sockaddr_sizeof().
 * NOTE(review): the status is checked only through ucs_assert(); presumably
 * `addr_size` stays unset if ucs_sockaddr_sizeof() fails - confirm this
 * cannot happen for a sockaddr_in key.
 */
static inline uint32_t uct_tcp_khash_sockaddr_in_hash(struct sockaddr_in sa)
{
    ucs_status_t UCS_V_UNUSED status;
    size_t addr_size;

    status = ucs_sockaddr_sizeof((const struct sockaddr*)&sa,
                                 &addr_size);
    ucs_assert(status == UCS_OK);
    return ucs_crc32(0, (const void *)&sa, addr_size);
}

/* Instantiate the `uct_tcp_cm_eps` hash table: keys are `struct sockaddr_in`
 * values, stored values are `ucs_list_link_t*`, hashed and compared with the
 * two helpers above */
KHASH_INIT(uct_tcp_cm_eps, struct sockaddr_in, ucs_list_link_t*,
           1, uct_tcp_khash_sockaddr_in_hash,
           uct_tcp_khash_sockaddr_in_equal);


/**
 * TCP Connection Manager state
 */
typedef struct uct_tcp_cm_state {
    const char            *name;       /* CM state name */
    uct_tcp_ep_progress_t tx_progress; /* TX progress function */
    uct_tcp_ep_progress_t rx_progress; /* RX progress function */
} uct_tcp_cm_state_t;


/**
 * TCP Connection Manager event
 */
typedef enum uct_tcp_cm_conn_event {
    /* Connection request from an EP that has TX capability to an EP that
     * has to be able to receive AM data (i.e. has to have RX capability). */
    UCT_TCP_CM_CONN_REQ               = UCS_BIT(0),
    /* Connection acknowledgment from an EP that accepts a connection from
     * the initiator of a connection request. */
    UCT_TCP_CM_CONN_ACK               = UCS_BIT(1),
    /* Request for waiting of a connection request.
     * The message is not sent separately (only along with a connection
     * acknowledgment.) */
    UCT_TCP_CM_CONN_WAIT_REQ          = UCS_BIT(2),
    /* Connection acknowledgment + Connection request. The message is sent
     * from an EP that accepts a remote connection when it was in
     * `UCT_TCP_EP_CONN_STATE_CONNECTING` state (i.e. original
     * `UCT_TCP_CM_CONN_REQ` wasn't sent yet) and wants to have RX capability
     * on a peer's EP in order to send AM data. */
    UCT_TCP_CM_CONN_ACK_WITH_REQ      = (UCT_TCP_CM_CONN_REQ |
                                         UCT_TCP_CM_CONN_ACK),
    /* Connection acknowledgment + Request for waiting of a connection request.
     * The message is sent from an EP that accepts a remote connection when it
     * was in `UCT_TCP_EP_CONN_STATE_WAITING_ACK` state (i.e. original
     * `UCT_TCP_CM_CONN_REQ` was sent) and wants to have RX capability on a
     * peer's EP in order to send AM data. */
    UCT_TCP_CM_CONN_ACK_WITH_WAIT_REQ = (UCT_TCP_CM_CONN_WAIT_REQ |
                                         UCT_TCP_CM_CONN_ACK)
} uct_tcp_cm_conn_event_t;


/**
 * TCP connection request packet
 *
 * NOTE(review): `event` is an enum stored in a UCS_S_PACKED structure; its
 * storage size is chosen by the compiler - confirm both peers agree on it.
 */
typedef struct uct_tcp_cm_conn_req_pkt {
    uct_tcp_cm_conn_event_t       event;      /* Connection event ID */
    struct sockaddr_in            iface_addr; /* Socket address of UCT local iface */
} UCS_S_PACKED uct_tcp_cm_conn_req_pkt_t;


/**
 * TCP active message header
 */
typedef struct uct_tcp_am_hdr {
    uint8_t                       am_id;      /* UCT AM ID of an AM operation */
    uint32_t                      length;     /* Length of data sent in an AM operation */
} UCS_S_PACKED uct_tcp_am_hdr_t;


/**
 * AM IDs reserved for TCP protocols. The values start at UCT_AM_ID_MAX.
 */
typedef enum uct_tcp_ep_am_id {
    /* AM ID reserved for TCP internal Connection Manager messages */
    UCT_TCP_EP_CM_AM_ID      = UCT_AM_ID_MAX,
    /* AM ID reserved for TCP internal PUT REQ message */
    UCT_TCP_EP_PUT_REQ_AM_ID = UCT_AM_ID_MAX + 1,
    /* AM ID reserved for TCP internal PUT ACK message */
    UCT_TCP_EP_PUT_ACK_AM_ID = UCT_AM_ID_MAX + 2
} uct_tcp_ep_am_id_t;


/**
 * TCP PUT request header
 *
 * NOTE(review): `length` is a size_t inside a UCS_S_PACKED structure, so the
 * size of this header follows the platform's size_t width - confirm that
 * communicating peers always share the same width.
 */
typedef struct uct_tcp_ep_put_req_hdr {
    uint64_t                      addr;       /* Address of a remote memory buffer */
    size_t                        length;     /* Length of a remote memory buffer */
    uint32_t                      sn;         /* Sequence number of the current PUT operation */
} UCS_S_PACKED uct_tcp_ep_put_req_hdr_t;


/**
 * TCP PUT acknowledge header
 */
typedef struct uct_tcp_ep_put_ack_hdr {
    uint32_t                      sn;         /* Sequence number of the last acked PUT operation */
} UCS_S_PACKED uct_tcp_ep_put_ack_hdr_t;


/**
 * TCP PUT completion
 */
typedef struct uct_tcp_ep_put_completion {
    uct_completion_t              *comp;           /* User's completion passed to
                                                    * uct_ep_flush */
    uint32_t                      wait_put_sn;     /* Sequence number of the last unacked
                                                    * PUT operation that was in progress
                                                    * when uct_ep_flush was called */
    ucs_queue_elem_t              elem;            /* Element to insert completion into
                                                    * TCP EP PUT operation pending queue */
} uct_tcp_ep_put_completion_t;


/**
 * TCP endpoint communication context
 */
typedef struct uct_tcp_ep_ctx {
    uint32_t                      put_sn;         /* Sequence number of last sent
                                                   * or received PUT operation */
    void                          *buf;           /* Partial send/recv data */
    size_t                        length;         /* How much data in the buffer */
    size_t                        offset;         /* How much data was sent (TX) or was
                                                   * handled after receiving (RX) */
} uct_tcp_ep_ctx_t;


/**
 * TCP AM/PUT Zcopy communication context mapped to
 * buffer from TCP EP context
 */
typedef struct uct_tcp_ep_zcopy_tx {
    uct_tcp_am_hdr_t              super;     /* UCT TCP AM header */
    uct_completion_t              *comp;     /* Local UCT completion object */
    size_t                        iov_index; /* Current IOV index */
    size_t                        iov_cnt;   /* Number of IOVs that should be sent */
    struct iovec                  iov[0];    /* IOVs that should be sent; zero-length
                                              * trailing array, so the entries live in
                                              * the memory that follows this structure */
} uct_tcp_ep_zcopy_tx_t;


/**
 * TCP endpoint
 */
struct uct_tcp_ep {
    uct_base_ep_t                 super;
    uint8_t                       ctx_caps;         /* Which contexts are supported */
    int                           fd;               /* Socket file descriptor */
    uct_tcp_ep_conn_state_t       conn_state;       /* State of connection with peer */
    unsigned                      conn_retries;     /* Number of connection attempts done */
    int                           events;           /* Current notifications */
    uct_tcp_ep_ctx_t              tx;               /* TX resources */
    uct_tcp_ep_ctx_t              rx;               /* RX resources */
    struct sockaddr_in            peer_addr;        /* Remote iface addr */
    ucs_queue_head_t              pending_q;        /* Pending operations */
    ucs_queue_head_t              put_comp_q;       /* Flush completions waiting for
                                                     * outstanding PUTs acknowledgment */
    ucs_list_link_t               list;             /* List element to insert into TCP EP list */
};


/**
 * TCP interface
 */
typedef struct uct_tcp_iface {
    uct_base_iface_t              super;             /* Parent class */
    int                           listen_fd;         /* Server socket */
    khash_t(uct_tcp_cm_eps)       ep_cm_map;         /* Map of endpoints that don't
                                                      * have one of the context cap */
    ucs_list_link_t               ep_list;           /* List of endpoints */
    char                          if_name[IFNAMSIZ]; /* Network interface name */
    ucs_sys_event_set_t           *event_set;        /* Event set identifier */
    ucs_mpool_t                   tx_mpool;          /* TX memory pool */
    ucs_mpool_t                   rx_mpool;          /* RX memory pool */
    size_t                        outstanding;       /* How much data in the EP send buffers
                                                      * + how many non-blocking connections
                                                      * are in progress + how many EPs are
                                                      * waiting for PUT Zcopy operation ACKs
                                                      * (0/1 for each EP) */

    struct {
        size_t                    tx_seg_size;       /* TX AM buffer size */
        size_t                    rx_seg_size;       /* RX AM buffer size */
        size_t                    sendv_thresh;      /* Minimum size of user's payload from which
                                                      * non-blocking vector send should be used */
        struct {
            size_t                max_iov;           /* Maximum supported IOVs limited by
                                                      * user configuration and service buffers
                                                      * (TCP protocol and user's AM headers) */
            size_t                max_hdr;           /* Maximum supported AM Zcopy header */
            size_t                hdr_offset;        /* Offset in TX buffer to empty space that
                                                      * can be used for AM Zcopy header */
        } zcopy;
        struct sockaddr_in        ifaddr;            /* Network address */
        struct sockaddr_in        netmask;           /* Network address mask */
        int                       prefer_default;    /* Prefer default gateway */
        int                       put_enable;        /* Enable PUT Zcopy operation support */
        int                       conn_nb;           /* Use non-blocking connect() */
        unsigned                  max_poll;          /* Number of events to poll per socket */
        unsigned                  max_conn_retries;  /* How many connection establishment attempts
                                                      * should be done if dropped connection was
                                                      * detected due to lack of system resources */
    } config;

    struct {
        int                       nodelay;           /* TCP_NODELAY */
        size_t                    sndbuf;            /* SO_SNDBUF */
        size_t                    rcvbuf;            /* SO_RCVBUF */
    } sockopt;
} uct_tcp_iface_t;


/**
 * TCP interface configuration
 *
 * NOTE(review): the field names mirror uct_tcp_iface_t::config and
 * uct_tcp_iface_t::sockopt; presumably these are the user-supplied values
 * those members are initialized from - confirm in the iface init code.
 */
typedef struct uct_tcp_iface_config {
    uct_iface_config_t            super;
    size_t                        tx_seg_size;
    size_t                        rx_seg_size;
    size_t                        max_iov;
    size_t                        sendv_thresh;
    int                           prefer_default;
    int                           put_enable;
    int                           conn_nb;
    unsigned                      max_poll;
    unsigned                      max_conn_retries;
    int                           sockopt_nodelay;
    size_t                        sockopt_sndbuf;
    size_t                        sockopt_rcvbuf;
    uct_iface_mpool_config_t      tx_mpool;
    uct_iface_mpool_config_t      rx_mpool;
} uct_tcp_iface_config_t;


/* Objects defined elsewhere in the TCP transport sources */
extern uct_component_t uct_tcp_component;
extern const char *uct_tcp_address_type_names[];
extern const uct_tcp_cm_state_t uct_tcp_ep_cm_state[];
extern const uct_tcp_ep_progress_t uct_tcp_ep_progress_rx_cb[];

/* Network interface (uct_tcp_netif_*) and socket address helpers */
ucs_status_t uct_tcp_netif_caps(const char *if_name, double *latency_p,
                                double *bandwidth_p);

ucs_status_t uct_tcp_netif_inaddr(const char *if_name, struct sockaddr_in *ifaddr,
                                  struct sockaddr_in *netmask);

ucs_status_t uct_tcp_netif_is_default(const char *if_name, int *result_p);

int uct_tcp_sockaddr_cmp(const struct sockaddr *sa1,
                         const struct sockaddr *sa2);

/* Interface (uct_tcp_iface_*) functions */
ucs_status_t uct_tcp_iface_set_sockopt(uct_tcp_iface_t *iface, int fd);

size_t uct_tcp_iface_get_max_iov(const uct_tcp_iface_t *iface);

size_t uct_tcp_iface_get_max_zcopy_header(const uct_tcp_iface_t *iface);

void uct_tcp_iface_add_ep(uct_tcp_ep_t *ep);

void uct_tcp_iface_remove_ep(uct_tcp_ep_t *ep);

/* Endpoint (uct_tcp_ep_*) functions */
ucs_status_t uct_tcp_ep_handle_dropped_connect(uct_tcp_ep_t *ep, int io_errno);

ucs_status_t uct_tcp_ep_init(uct_tcp_iface_t *iface, int fd,
                             const struct sockaddr_in *dest_addr,
                             uct_tcp_ep_t **ep_p);

ucs_status_t uct_tcp_ep_create(const uct_ep_params_t *params,
                               uct_ep_h *ep_p);

const char *uct_tcp_ep_ctx_caps_str(uint8_t ep_ctx_caps, char *str_buffer);

void uct_tcp_ep_change_ctx_caps(uct_tcp_ep_t *ep, uint8_t new_caps);

ucs_status_t uct_tcp_ep_add_ctx_cap(uct_tcp_ep_t *ep,
                                    uct_tcp_ep_ctx_type_t cap);

ucs_status_t uct_tcp_ep_remove_ctx_cap(uct_tcp_ep_t *ep,
                                       uct_tcp_ep_ctx_type_t cap);

ucs_status_t uct_tcp_ep_move_ctx_cap(uct_tcp_ep_t *from_ep, uct_tcp_ep_t *to_ep,
                                     uct_tcp_ep_ctx_type_t ctx_cap);

void uct_tcp_ep_destroy_internal(uct_ep_h tl_ep);

void uct_tcp_ep_destroy(uct_ep_h tl_ep);

void uct_tcp_ep_set_failed(uct_tcp_ep_t *ep);

unsigned uct_tcp_ep_is_self(const uct_tcp_ep_t *ep);

void uct_tcp_ep_remove(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

void uct_tcp_ep_add(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

void uct_tcp_ep_mod_events(uct_tcp_ep_t *ep, int add, int remove);

void uct_tcp_ep_pending_queue_dispatch(uct_tcp_ep_t *ep);

/* Endpoint data-path operations (AM short/bcopy/zcopy, PUT zcopy),
 * pending queue handling and flush */
ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header,
                                 const void *payload, unsigned length);

ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id,
                            uct_pack_callback_t pack_cb, void *arg,
                            unsigned flags);

ucs_status_t uct_tcp_ep_am_zcopy(uct_ep_h uct_ep, uint8_t am_id, const void *header,
                                 unsigned header_length, const uct_iov_t *iov,
                                 size_t iovcnt, unsigned flags,
                                 uct_completion_t *comp);

ucs_status_t uct_tcp_ep_put_zcopy(uct_ep_h uct_ep, const uct_iov_t *iov,
                                  size_t iovcnt, uint64_t remote_addr,
                                  uct_rkey_t rkey, uct_completion_t *comp);

ucs_status_t uct_tcp_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req,
                                    unsigned flags);

void uct_tcp_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                              void *arg);

ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
                              uct_completion_t *comp);

/* Connection manager (uct_tcp_cm_*) functions */
ucs_status_t uct_tcp_cm_send_event(uct_tcp_ep_t *ep, uct_tcp_cm_conn_event_t event);

unsigned uct_tcp_cm_handle_conn_pkt(uct_tcp_ep_t **ep_p, void *pkt, uint32_t length);

unsigned uct_tcp_cm_conn_progress(uct_tcp_ep_t *ep);

uct_tcp_ep_conn_state_t
uct_tcp_cm_set_conn_state(uct_tcp_ep_t *ep,
                          uct_tcp_ep_conn_state_t new_conn_state);

void uct_tcp_cm_change_conn_state(uct_tcp_ep_t *ep,
                                  uct_tcp_ep_conn_state_t new_conn_state);

ucs_status_t uct_tcp_cm_add_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

void uct_tcp_cm_remove_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

uct_tcp_ep_t *uct_tcp_cm_search_ep(uct_tcp_iface_t *iface,
                                   const struct sockaddr_in *peer_addr,
                                   uct_tcp_ep_ctx_type_t with_ctx_type);

void uct_tcp_cm_purge_ep(uct_tcp_ep_t *ep);

ucs_status_t uct_tcp_cm_handle_incoming_conn(uct_tcp_iface_t *iface,
                                             const struct sockaddr_in *peer_addr,
                                             int fd);

ucs_status_t uct_tcp_cm_conn_start(uct_tcp_ep_t *ep);

/* Increment the interface's `outstanding` counter by one */
static inline void uct_tcp_iface_outstanding_inc(uct_tcp_iface_t *iface)
{
    iface->outstanding++;
}

/* Decrement the interface's `outstanding` counter by one; the counter is
 * unsigned (size_t), so it is asserted to be non-zero before decrementing */
static inline void uct_tcp_iface_outstanding_dec(uct_tcp_iface_t *iface)
{
    ucs_assert(iface->outstanding > 0);
    iface->outstanding--;
}

/**
 * Query for active network devices under /sys/class/net, as determined by
 * ucs_netif_is_active(). 'md' parameter is not used, and is added for
 * compatibility with uct_tl_t::query_devices definition.
 */
ucs_status_t uct_tcp_query_devices(uct_md_h md,
                                   uct_tl_device_resource_t **devices_p,
                                   unsigned *num_devices_p);

#endif