/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#ifndef UCT_TCP_MD_H
#define UCT_TCP_MD_H

#include <uct/base/uct_md.h>
#include <uct/base/uct_iface.h>
#include <ucs/sys/sock.h>
#include <ucs/sys/string.h>
#include <ucs/datastruct/khash.h>
#include <ucs/algorithm/crc.h>
#include <ucs/sys/event_set.h>
#include <ucs/sys/iovec.h>

#include <net/if.h>
#include <string.h>

/* Name of the TCP transport */
#define UCT_TCP_NAME                          "tcp"

/* Configuration prefix of the TCP transport */
#define UCT_TCP_CONFIG_PREFIX                 "TCP_"

/* Magic number that is used by TCP to identify its peers
 * (see UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER) */
#define UCT_TCP_MAGIC_NUMBER                  0xCAFEBABE12345678lu

/* Maximum number of events to wait on event set */
#define UCT_TCP_MAX_EVENTS                    16

/* Size of a buffer that is large enough to keep a "[%s:%s]" string, where
 * each %s value can be "-", "Tx" or "Rx". The longest result, "[Tx:Rx]",
 * takes 7 characters plus the terminating '\0'. */
#define UCT_TCP_EP_CTX_CAPS_STR_MAX           8

/* Number of IOVs needed to keep the service data of AM/PUT Zcopy operations
 * (the TCP protocol header and the user's AM (or PUT) header) */
#define UCT_TCP_EP_ZCOPY_SERVICE_IOV_COUNT    2

/* Number of IOVs needed to do AM Short
 * (the TCP protocol header, the user's AM header and the payload) */
#define UCT_TCP_EP_AM_SHORTV_IOV_COUNT        3

/* Maximum size of data that can be sent by a single PUT Zcopy
 * operation (only limited by the range of size_t) */
#define UCT_TCP_EP_PUT_ZCOPY_MAX              SIZE_MAX

/* Length of the service data that is used by the PUT protocol: the size of
 * the TCP AM header plus the size of the PUT request header. Both types are
 * declared later in this file; this is fine because the macro is only
 * expanded at its point of use. */
#define UCT_TCP_EP_PUT_SERVICE_LENGTH        (sizeof(uct_tcp_am_hdr_t) + \
                                              sizeof(uct_tcp_ep_put_req_hdr_t))

/* Name of the configuration option that limits the number of connection
 * establishment attempts (see uct_tcp_iface_t::config.max_conn_retries) */
#define UCT_TCP_CONFIG_MAX_CONN_RETRIES      "MAX_CONN_RETRIES"


/**
 * TCP context type
 *
 * Identifies a capability ("context") of an EP. These values are passed to
 * uct_tcp_ep_add_ctx_cap()/uct_tcp_ep_remove_ctx_cap()/uct_tcp_ep_move_ctx_cap()
 * and describe what is stored in uct_tcp_ep_t::ctx_caps.
 * NOTE(review): ctx_caps is a uint8_t; if it is a bitmask indexed by these
 * values (presumably), at most 8 values fit - confirm before adding more.
 */
typedef enum uct_tcp_ep_ctx_type {
    /* EP is connected to a peer to send data. This EP is managed
     * by a user and TCP mustn't free this EP even if connection
     * is broken. */
    UCT_TCP_EP_CTX_TYPE_TX,
    /* EP is connected to a peer to receive data. If only RX is set
     * on a given EP, it is hidden from a user (i.e. the user is unable
     * to do any operation on that EP) and TCP is responsible for
     * freeing the memory allocated for this EP. */
    UCT_TCP_EP_CTX_TYPE_RX,

    /* Additional flags that control EP behavior: */
    /* - Zcopy TX operation is in progress on a given EP. */
    UCT_TCP_EP_CTX_TYPE_ZCOPY_TX,
    /* - PUT RX operation is in progress on a given EP. */
    UCT_TCP_EP_CTX_TYPE_PUT_RX,
    /* - PUT TX operation is waiting for an ACK on a given EP */
    UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK,
    /* - PUT RX operation is waiting for resources to send an ACK
     *   for received PUT operations on a given EP */
    UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK
} uct_tcp_ep_ctx_type_t;


/**
 * TCP endpoint connection state
 */
typedef enum uct_tcp_ep_conn_state {
    /* EP is unable to communicate with a peer's EP - connection establishment
     * was unsuccessful or a hangup was detected during communication. */
    UCT_TCP_EP_CONN_STATE_CLOSED,
    /* EP is connecting to a peer's EP, i.e. connect() was called on a
     * non-blocking socket and reported that the operation is in progress.
     * After it is done, the EP sends `UCT_TCP_CM_CONN_REQ` to the peer.
     * All AM operations return `UCS_ERR_NO_RESOURCE` error to a caller. */
    UCT_TCP_EP_CONN_STATE_CONNECTING,
    /* EP is receiving the magic number (UCT_TCP_MAGIC_NUMBER) in order to
     * verify a peer. EP is moved to this state after accept() completed. */
    UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER,
    /* EP is accepting a connection from a peer: accept() returned the socket
     * fd on which a connection was accepted, this EP was created using that
     * socket fd, the magic number was received and verified by the EP, and
     * now the EP is waiting for `UCT_TCP_CM_CONN_REQ` from the peer. */
    UCT_TCP_EP_CONN_STATE_ACCEPTING,
    /* EP is waiting for `UCT_TCP_CM_CONN_ACK` message from a peer after sending
     * `UCT_TCP_CM_CONN_REQ`.
     * All AM operations return `UCS_ERR_NO_RESOURCE` error to a caller. */
    UCT_TCP_EP_CONN_STATE_WAITING_ACK,
    /* EP is waiting for a connection and `UCT_TCP_CM_CONN_REQ` message from
     * a peer after simultaneous connection resolution between them. This EP
     * is the "winner" of the resolution, but has no RX capability yet (i.e.
     * no `UCT_TCP_CM_CONN_REQ` message was received from the peer). EP is
     * moved to `UCT_TCP_EP_CONN_STATE_CONNECTED` state upon receiving this
     * message.
     * All AM operations return `UCS_ERR_NO_RESOURCE` error to a caller. */
    UCT_TCP_EP_CONN_STATE_WAITING_REQ,
    /* EP is connected to a peer and they can communicate with each other. */
    UCT_TCP_EP_CONN_STATE_CONNECTED
} uct_tcp_ep_conn_state_t;

/* Forward declaration, so that callback types below can refer to an EP */
typedef struct uct_tcp_ep uct_tcp_ep_t;

/* Progress callback of a TCP EP (used as TX/RX progress function of a
 * Connection Manager state). NOTE(review): the returned value is presumably
 * the amount of progress made, e.g. number of handled events - confirm. */
typedef unsigned (*uct_tcp_ep_progress_t)(uct_tcp_ep_t *ep);

static inline int uct_tcp_khash_sockaddr_in_equal(struct sockaddr_in sa1,
                                                  struct sockaddr_in sa2)
{
    ucs_status_t status;
    int cmp;

    cmp = ucs_sockaddr_cmp((const struct sockaddr*)&sa1,
                           (const struct sockaddr*)&sa2,
                           &status);
    ucs_assert(status == UCS_OK);
    return !cmp;
}

/**
 * Hash function of the `uct_tcp_cm_eps` hash map.
 *
 * Only the fields that ucs_sockaddr_cmp() compares for AF_INET are hashed:
 * the address family, the port and the IPv4 address. That function backs
 * uct_tcp_khash_sockaddr_in_equal(). Hashing the raw structure would also
 * cover the `sin_zero` padding bytes. Two keys that compare equal could then
 * hash differently if the padding was not zeroed by whoever filled them, and
 * map lookups would miss.
 *
 * For keys whose `sin_zero` is already zeroed, the result is identical to
 * hashing the whole structure.
 *
 * @param [in] sa  Socket address, expected to be AF_INET.
 *
 * @return CRC32 of the normalized address.
 */
static inline uint32_t uct_tcp_khash_sockaddr_in_hash(struct sockaddr_in sa)
{
    struct sockaddr_in key;

    ucs_assert(sa.sin_family == AF_INET);

    /* Normalized copy: every byte that does not take part in the comparison
     * (including `sin_zero`) is zero */
    memset(&key, 0, sizeof(key));
    key.sin_family = sa.sin_family;
    key.sin_port   = sa.sin_port;
    key.sin_addr   = sa.sin_addr;

    return ucs_crc32(0, (const void*)&key, sizeof(key));
}

/* Hash map (khash `kh_is_map` argument = 1) from a peer's socket address to
 * a list link. The iface stores it in uct_tcp_iface_t::ep_cm_map, the map of
 * endpoints that don't have all of the context caps.
 * NOTE(review): the value is presumably the head of a list of EPs for that
 * peer - confirm in the Connection Manager implementation. */
KHASH_INIT(uct_tcp_cm_eps, struct sockaddr_in, ucs_list_link_t*,
           1, uct_tcp_khash_sockaddr_in_hash, uct_tcp_khash_sockaddr_in_equal);


/**
 * TCP Connection Manager state
 *
 * Descriptor of a connection state: a printable name and the progress
 * callbacks used for TX and RX while an EP is in that state.
 */
typedef struct uct_tcp_cm_state {
    const char            *name;       /* CM state name */
    uct_tcp_ep_progress_t tx_progress; /* TX progress function */
    uct_tcp_ep_progress_t rx_progress; /* RX progress function */
} uct_tcp_cm_state_t;


/**
 * TCP Connection Manager event
 *
 * The basic events are distinct bits (UCS_BIT()), so combined events are
 * built by OR-ing them, e.g. UCT_TCP_CM_CONN_ACK_WITH_REQ.
 */
typedef enum uct_tcp_cm_conn_event {
    /* Connection request from an EP that has TX capability to an EP that
     * has to be able to receive AM data (i.e. has to have RX capability). */
    UCT_TCP_CM_CONN_REQ               = UCS_BIT(0),
    /* Connection acknowledgment from an EP that accepts a connection from
     * the initiator of a connection request. */
    UCT_TCP_CM_CONN_ACK               = UCS_BIT(1),
    /* Request for waiting of a connection request.
     * The message is not sent separately (only along with a connection
     * acknowledgment). */
    UCT_TCP_CM_CONN_WAIT_REQ          = UCS_BIT(2),
    /* Connection acknowledgment + Connection request. The message is sent
     * from an EP that accepts a remote connection when it was in
     * `UCT_TCP_EP_CONN_STATE_CONNECTING` state (i.e. the original
     * `UCT_TCP_CM_CONN_REQ` wasn't sent yet) and wants to have RX capability
     * on a peer's EP in order to send AM data. */
    UCT_TCP_CM_CONN_ACK_WITH_REQ      = (UCT_TCP_CM_CONN_REQ |
                                         UCT_TCP_CM_CONN_ACK),
    /* Connection acknowledgment + Request for waiting of a connection request.
     * The message is sent from an EP that accepts a remote connection when it
     * was in `UCT_TCP_EP_CONN_STATE_WAITING_ACK` state (i.e. the original
     * `UCT_TCP_CM_CONN_REQ` was sent) and wants to have RX capability on a
     * peer's EP in order to send AM data. */
    UCT_TCP_CM_CONN_ACK_WITH_WAIT_REQ = (UCT_TCP_CM_CONN_WAIT_REQ |
                                         UCT_TCP_CM_CONN_ACK)
} uct_tcp_cm_conn_event_t;


/**
 * TCP connection request packet
 *
 * Carries a Connection Manager event together with the sender's UCT iface
 * address. The structure is packed (UCS_S_PACKED).
 * NOTE(review): `event` has an enum type, whose size is implementation-defined
 * (commonly the size of int); peers built with a different ABI could disagree
 * on the packet layout - consider a fixed-width type if that matters.
 */
typedef struct uct_tcp_cm_conn_req_pkt {
    uct_tcp_cm_conn_event_t       event;      /* Connection event ID */
    struct sockaddr_in            iface_addr; /* Socket address of UCT local iface */
} UCS_S_PACKED uct_tcp_cm_conn_req_pkt_t;


/**
 * TCP active message header
 *
 * Packed (UCS_S_PACKED), so no padding is inserted between `am_id` and
 * `length`. Its size is also part of UCT_TCP_EP_PUT_SERVICE_LENGTH.
 */
typedef struct uct_tcp_am_hdr {
    uint8_t                       am_id;      /* UCT AM ID of an AM operation */
    uint32_t                      length;     /* Length of data sent in an AM operation */
} UCS_S_PACKED uct_tcp_am_hdr_t;


/**
 * AM IDs reserved for TCP protocols
 *
 * They start at UCT_AM_ID_MAX, presumably so that they never collide with
 * user AM IDs, which are below UCT_AM_ID_MAX. All of them must still fit
 * into uct_tcp_am_hdr_t::am_id, which is a uint8_t.
 */
typedef enum uct_tcp_ep_am_id {
    /* AM ID reserved for TCP internal Connection Manager messages */
    UCT_TCP_EP_CM_AM_ID      = UCT_AM_ID_MAX,
    /* AM ID reserved for TCP internal PUT REQ message */
    UCT_TCP_EP_PUT_REQ_AM_ID = UCT_AM_ID_MAX + 1,
    /* AM ID reserved for TCP internal PUT ACK message */
    UCT_TCP_EP_PUT_ACK_AM_ID = UCT_AM_ID_MAX + 2
} uct_tcp_ep_am_id_t;


/**
 * TCP PUT request header
 *
 * Packed (UCS_S_PACKED). Its size is part of UCT_TCP_EP_PUT_SERVICE_LENGTH.
 * NOTE(review): `length` is a size_t, whose width differs between 32-bit and
 * 64-bit builds, so peers with different ABIs would disagree on the layout
 * of this header - confirm whether mixed deployments must be supported.
 */
typedef struct uct_tcp_ep_put_req_hdr {
    uint64_t                      addr;        /* Address of a remote memory buffer */
    size_t                        length;      /* Length of a remote memory buffer */
    uint32_t                      sn;          /* Sequence number of the current PUT operation */
} UCS_S_PACKED uct_tcp_ep_put_req_hdr_t;


/**
 * TCP PUT acknowledge header
 *
 * Packed (UCS_S_PACKED). Carries the sequence number that is compared with
 * uct_tcp_ep_put_req_hdr_t::sn of the sent PUT requests.
 */
typedef struct uct_tcp_ep_put_ack_hdr {
    uint32_t                      sn;          /* Sequence number of the last acked PUT operation */
} UCS_S_PACKED uct_tcp_ep_put_ack_hdr_t;


/**
 * TCP PUT completion
 *
 * Created for a uct_ep_flush() call that has to wait for outstanding PUT
 * operations to be acknowledged; queued on uct_tcp_ep_t::put_comp_q.
 */
typedef struct uct_tcp_ep_put_completion {
    uct_completion_t              *comp;           /* User's completion passed to
                                                    * uct_ep_flush */
    uint32_t                      wait_put_sn;     /* Sequence number of the last unacked
                                                    * PUT operation that was in progress
                                                    * when uct_ep_flush was called */
    ucs_queue_elem_t              elem;            /* Element to insert completion into
                                                    * TCP EP PUT operation pending queue */
} uct_tcp_ep_put_completion_t;


/**
 * TCP endpoint communication context
 *
 * One instance is used for TX and one for RX (uct_tcp_ep_t::tx / ::rx).
 */
typedef struct uct_tcp_ep_ctx {
    uint32_t                      put_sn;         /* Sequence number of last sent
                                                   * or received PUT operation */
    void                          *buf;           /* Partial send/recv data */
    size_t                        length;         /* How much data in the buffer */
    size_t                        offset;         /* How much data was sent (TX) or was
                                                   * handled after receiving (RX) */
} uct_tcp_ep_ctx_t;


/**
 * TCP AM/PUT Zcopy communication context mapped to
 * buffer from TCP EP context
 *
 * The structure is mapped onto the EP's TX buffer (uct_tcp_ep_ctx_t::buf).
 * The trailing `iov` array holds the IOVs that should be sent; its length is
 * tracked by `iov_cnt`. It is declared as a C99 flexible array member rather
 * than the non-standard GNU zero-length array `iov[0]`. The structure size
 * and layout are the same.
 */
typedef struct uct_tcp_ep_zcopy_tx {
    uct_tcp_am_hdr_t              super;     /* UCT TCP AM header */
    uct_completion_t              *comp;     /* Local UCT completion object */
    size_t                        iov_index; /* Current IOV index */
    size_t                        iov_cnt;   /* Number of IOVs that should be sent */
    struct iovec                  iov[];     /* IOVs that should be sent */
} uct_tcp_ep_zcopy_tx_t;


/**
 * TCP endpoint
 */
struct uct_tcp_ep {
    uct_base_ep_t                 super;            /* Parent class */
    uint8_t                       ctx_caps;         /* Which contexts are supported
                                                     * (uct_tcp_ep_ctx_type_t values,
                                                     * presumably as a bitmask) */
    int                           fd;               /* Socket file descriptor */
    uct_tcp_ep_conn_state_t       conn_state;       /* State of connection with peer */
    unsigned                      conn_retries;     /* Number of connection attempts done */
    int                           events;           /* Current notifications (presumably
                                                     * event set flags, see
                                                     * uct_tcp_ep_mod_events()) */
    uct_tcp_ep_ctx_t              tx;               /* TX resources */
    uct_tcp_ep_ctx_t              rx;               /* RX resources */
    struct sockaddr_in            peer_addr;        /* Remote iface addr */
    ucs_queue_head_t              pending_q;        /* Pending operations */
    ucs_queue_head_t              put_comp_q;       /* Flush completions waiting for
                                                     * outstanding PUTs acknowledgment */
    ucs_list_link_t               list;             /* List element to insert into TCP EP list */
};


/**
 * TCP interface
 */
typedef struct uct_tcp_iface {
    uct_base_iface_t              super;             /* Parent class */
    int                           listen_fd;         /* Server socket */
    khash_t(uct_tcp_cm_eps)       ep_cm_map;         /* Map of endpoints that don't
                                                      * have all of the context caps */
    ucs_list_link_t               ep_list;           /* List of endpoints */
    char                          if_name[IFNAMSIZ]; /* Network interface name */
    ucs_sys_event_set_t           *event_set;        /* Event set identifier */
    ucs_mpool_t                   tx_mpool;          /* TX memory pool */
    ucs_mpool_t                   rx_mpool;          /* RX memory pool */
    size_t                        outstanding;       /* How much data in the EP send buffers
                                                      * + how many non-blocking connections
                                                      * are in progress + how many EPs are
                                                      * waiting for PUT Zcopy operation ACKs
                                                      * (0/1 for each EP). Updated with
                                                      * uct_tcp_iface_outstanding_inc/dec() */

    struct {
        size_t                    tx_seg_size;       /* TX AM buffer size */
        size_t                    rx_seg_size;       /* RX AM buffer size */
        size_t                    sendv_thresh;      /* Minimum size of user's payload from which
                                                      * non-blocking vector send should be used */
        struct {
            size_t                max_iov;           /* Maximum supported IOVs limited by
                                                      * user configuration and service buffers
                                                      * (TCP protocol and user's AM headers) */
            size_t                max_hdr;           /* Maximum supported AM Zcopy header */
            size_t                hdr_offset;        /* Offset in TX buffer to empty space that
                                                      * can be used for AM Zcopy header */
        } zcopy;
        struct sockaddr_in        ifaddr;            /* Network address */
        struct sockaddr_in        netmask;           /* Network address mask */
        int                       prefer_default;    /* Prefer default gateway */
        int                       put_enable;        /* Enable PUT Zcopy operation support */
        int                       conn_nb;           /* Use non-blocking connect() */
        unsigned                  max_poll;          /* Number of events to poll per socket */
        unsigned                  max_conn_retries;  /* How many connection establishment attempts
                                                      * should be done if dropped connection was
                                                      * detected due to lack of system resources */
    } config;

    struct {
        int                       nodelay;           /* TCP_NODELAY */
        size_t                    sndbuf;            /* SO_SNDBUF */
        size_t                    rcvbuf;            /* SO_RCVBUF */
    } sockopt;
} uct_tcp_iface_t;


/**
 * TCP interface configuration
 *
 * User configuration of a TCP iface. Most fields have a counterpart of the
 * same name in uct_tcp_iface_t::config, and the sockopt_* fields match
 * uct_tcp_iface_t::sockopt. NOTE(review): presumably those are filled from
 * this structure when an iface is created - confirm.
 */
typedef struct uct_tcp_iface_config {
    uct_iface_config_t            super;             /* Parent configuration */
    size_t                        tx_seg_size;       /* TX AM buffer size */
    size_t                        rx_seg_size;       /* RX AM buffer size */
    size_t                        max_iov;           /* User limit on the number of IOVs */
    size_t                        sendv_thresh;      /* Vector send threshold */
    int                           prefer_default;    /* Prefer default gateway */
    int                           put_enable;        /* Enable PUT Zcopy operation support */
    int                           conn_nb;           /* Use non-blocking connect() */
    unsigned                      max_poll;          /* Number of events to poll per socket */
    unsigned                      max_conn_retries;  /* Connection establishment attempts limit */
    int                           sockopt_nodelay;   /* TCP_NODELAY */
    size_t                        sockopt_sndbuf;    /* SO_SNDBUF */
    size_t                        sockopt_rcvbuf;    /* SO_RCVBUF */
    uct_iface_mpool_config_t      tx_mpool;          /* TX memory pool configuration */
    uct_iface_mpool_config_t      rx_mpool;          /* RX memory pool configuration */
} uct_tcp_iface_config_t;


/* TCP UCT component */
extern uct_component_t uct_tcp_component;
/* Printable names of address types.
 * NOTE(review): indexing scheme is not visible here - confirm in the definition */
extern const char *uct_tcp_address_type_names[];
/* Connection Manager state descriptors (name + TX/RX progress callbacks),
 * presumably indexed by uct_tcp_ep_conn_state_t */
extern const uct_tcp_cm_state_t uct_tcp_ep_cm_state[];
/* RX progress callbacks.
 * NOTE(review): indexing scheme is not visible here - confirm in the definition */
extern const uct_tcp_ep_progress_t uct_tcp_ep_progress_rx_cb[];

/* Network interface queries. Results are returned through the pointer
 * (`*_p`, `ifaddr`, `netmask`) output arguments. */
ucs_status_t uct_tcp_netif_caps(const char *if_name, double *latency_p,
                                double *bandwidth_p);

ucs_status_t uct_tcp_netif_inaddr(const char *if_name, struct sockaddr_in *ifaddr,
                                  struct sockaddr_in *netmask);

ucs_status_t uct_tcp_netif_is_default(const char *if_name, int *result_p);

/* NOTE(review): presumably returns 0 for equal addresses, like
 * ucs_sockaddr_cmp(), but without reporting an error status - confirm */
int uct_tcp_sockaddr_cmp(const struct sockaddr *sa1,
                         const struct sockaddr *sa2);

/* Interface helpers */

/* Presumably applies the options from uct_tcp_iface_t::sockopt to `fd` */
ucs_status_t uct_tcp_iface_set_sockopt(uct_tcp_iface_t *iface, int fd);

size_t uct_tcp_iface_get_max_iov(const uct_tcp_iface_t *iface);

size_t uct_tcp_iface_get_max_zcopy_header(const uct_tcp_iface_t *iface);

/* Presumably link/unlink the EP into uct_tcp_iface_t::ep_list through
 * uct_tcp_ep_t::list - confirm */
void uct_tcp_iface_add_ep(uct_tcp_ep_t *ep);

void uct_tcp_iface_remove_ep(uct_tcp_ep_t *ep);

/* Handles a dropped connection attempt reported with `io_errno`; related to
 * the uct_tcp_iface_t::config.max_conn_retries limit */
ucs_status_t uct_tcp_ep_handle_dropped_connect(uct_tcp_ep_t *ep, int io_errno);

/* EP creation and destruction; the created EP is returned via `ep_p` */
ucs_status_t uct_tcp_ep_init(uct_tcp_iface_t *iface, int fd,
                             const struct sockaddr_in *dest_addr,
                             uct_tcp_ep_t **ep_p);

ucs_status_t uct_tcp_ep_create(const uct_ep_params_t *params,
                               uct_ep_h *ep_p);

/* EP context capability management (capabilities are uct_tcp_ep_ctx_type_t).
 * NOTE(review): `str_buffer` presumably has to hold at least
 * UCT_TCP_EP_CTX_CAPS_STR_MAX bytes - confirm. */
const char *uct_tcp_ep_ctx_caps_str(uint8_t ep_ctx_caps, char *str_buffer);

void uct_tcp_ep_change_ctx_caps(uct_tcp_ep_t *ep, uint8_t new_caps);

ucs_status_t uct_tcp_ep_add_ctx_cap(uct_tcp_ep_t *ep,
                                    uct_tcp_ep_ctx_type_t cap);

ucs_status_t uct_tcp_ep_remove_ctx_cap(uct_tcp_ep_t *ep,
                                       uct_tcp_ep_ctx_type_t cap);

ucs_status_t uct_tcp_ep_move_ctx_cap(uct_tcp_ep_t *from_ep, uct_tcp_ep_t *to_ep,
                                     uct_tcp_ep_ctx_type_t ctx_cap);

void uct_tcp_ep_destroy_internal(uct_ep_h tl_ep);

void uct_tcp_ep_destroy(uct_ep_h tl_ep);

void uct_tcp_ep_set_failed(uct_tcp_ep_t *ep);

unsigned uct_tcp_ep_is_self(const uct_tcp_ep_t *ep);

void uct_tcp_ep_remove(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

void uct_tcp_ep_add(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

/* Updates uct_tcp_ep_t::events: adds the `add` and removes the `remove`
 * notifications (presumably event set flags) */
void uct_tcp_ep_mod_events(uct_tcp_ep_t *ep, int add, int remove);

void uct_tcp_ep_pending_queue_dispatch(uct_tcp_ep_t *ep);

/* UCT EP operations (uct_ep_h based entry points) */
ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header,
                                 const void *payload, unsigned length);

ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id,
                            uct_pack_callback_t pack_cb, void *arg,
                            unsigned flags);

ucs_status_t uct_tcp_ep_am_zcopy(uct_ep_h uct_ep, uint8_t am_id, const void *header,
                                 unsigned header_length, const uct_iov_t *iov,
                                 size_t iovcnt, unsigned flags,
                                 uct_completion_t *comp);

ucs_status_t uct_tcp_ep_put_zcopy(uct_ep_h uct_ep, const uct_iov_t *iov,
                                  size_t iovcnt, uint64_t remote_addr,
                                  uct_rkey_t rkey, uct_completion_t *comp);

ucs_status_t uct_tcp_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req,
                                    unsigned flags);

void uct_tcp_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                              void *arg);

ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
                              uct_completion_t *comp);

/* Connection Manager (CM) API */

/* Sends a CM event (see uct_tcp_cm_conn_event_t) to the EP's peer */
ucs_status_t uct_tcp_cm_send_event(uct_tcp_ep_t *ep, uct_tcp_cm_conn_event_t event);

/* Handles a received CM packet of `length` bytes. `ep_p` is an in/out
 * argument. NOTE(review): presumably the EP may be replaced while handling
 * the packet - confirm. */
unsigned uct_tcp_cm_handle_conn_pkt(uct_tcp_ep_t **ep_p, void *pkt, uint32_t length);

unsigned uct_tcp_cm_conn_progress(uct_tcp_ep_t *ep);

/* NOTE(review): the returned state is presumably the previous connection
 * state of the EP - confirm */
uct_tcp_ep_conn_state_t
uct_tcp_cm_set_conn_state(uct_tcp_ep_t *ep,
                          uct_tcp_ep_conn_state_t new_conn_state);

void uct_tcp_cm_change_conn_state(uct_tcp_ep_t *ep,
                                  uct_tcp_ep_conn_state_t new_conn_state);

/* Presumably add/remove the EP to/from uct_tcp_iface_t::ep_cm_map - confirm */
ucs_status_t uct_tcp_cm_add_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

void uct_tcp_cm_remove_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep);

/* Searches for an EP connected to `peer_addr` that has the `with_ctx_type`
 * capability (presumably in uct_tcp_iface_t::ep_cm_map) */
uct_tcp_ep_t *uct_tcp_cm_search_ep(uct_tcp_iface_t *iface,
                                   const struct sockaddr_in *peer_addr,
                                   uct_tcp_ep_ctx_type_t with_ctx_type);

void uct_tcp_cm_purge_ep(uct_tcp_ep_t *ep);

/* Handles a connection accepted on socket `fd` from `peer_addr` */
ucs_status_t uct_tcp_cm_handle_incoming_conn(uct_tcp_iface_t *iface,
                                             const struct sockaddr_in *peer_addr,
                                             int fd);

ucs_status_t uct_tcp_cm_conn_start(uct_tcp_ep_t *ep);

/**
 * Account one more outstanding item on the iface. Items are data in the EP
 * send buffers, a non-blocking connection in progress, or an EP waiting for
 * PUT Zcopy ACKs.
 *
 * @param [in] iface  TCP iface whose `outstanding` counter is incremented.
 */
static inline void uct_tcp_iface_outstanding_inc(uct_tcp_iface_t *iface)
{
    iface->outstanding += 1;
}

/**
 * Release one outstanding item previously accounted by
 * uct_tcp_iface_outstanding_inc().
 *
 * @param [in] iface  TCP iface whose `outstanding` counter is decremented;
 *                    the counter must be non-zero (asserted).
 */
static inline void uct_tcp_iface_outstanding_dec(uct_tcp_iface_t *iface)
{
    /* Guard against counter underflow in debug builds */
    ucs_assert(iface->outstanding > 0);
    iface->outstanding -= 1;
}

/**
 * Query for active network devices under /sys/class/net, as determined by
 * ucs_netif_is_active(). 'md' parameter is not used, and is added for
 * compatibility with uct_tl_t::query_devices definition.
 *
 * @param [in]  md             Unused.
 * @param [out] devices_p      Filled with the array of found devices.
 *                             NOTE(review): presumably allocated by this
 *                             function and released by the caller - confirm.
 * @param [out] num_devices_p  Filled with the number of entries in
 *                             `*devices_p`.
 *
 * @return Status of the query.
 */
ucs_status_t uct_tcp_query_devices(uct_md_h md,
                                   uct_tl_device_resource_t **devices_p,
                                   unsigned *num_devices_p);

#endif