Blob Blame History Raw
/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "tcp.h"

#include <ucs/async/async.h>


/* Forward declarations of the per-state progress callbacks: they are
 * referenced by the uct_tcp_ep_cm_state[] table before their definitions
 * appear later in this file */
static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep);
static unsigned uct_tcp_ep_progress_data_rx(uct_tcp_ep_t *ep);
static unsigned uct_tcp_ep_progress_magic_number_rx(uct_tcp_ep_t *ep);

/* Per-connection-state descriptor table, indexed by the
 * UCT_TCP_EP_CONN_STATE_* value. Each entry carries a printable state name
 * and the TX/RX progress callbacks associated with that state. Directions
 * that have nothing to progress in a given state are bound to
 * ucs_empty_function_return_zero (presumably a no-op that returns 0). */
const uct_tcp_cm_state_t uct_tcp_ep_cm_state[] = {
    [UCT_TCP_EP_CONN_STATE_CLOSED]      = {
        .name        = "CLOSED",
        .tx_progress = (uct_tcp_ep_progress_t)ucs_empty_function_return_zero,
        .rx_progress = (uct_tcp_ep_progress_t)ucs_empty_function_return_zero
    },
    [UCT_TCP_EP_CONN_STATE_CONNECTING]  = {
        .name        = "CONNECTING",
        .tx_progress = uct_tcp_cm_conn_progress,
        .rx_progress = uct_tcp_ep_progress_data_rx
    },
    [UCT_TCP_EP_CONN_STATE_WAITING_ACK] = {
        .name        = "WAITING_ACK",
        .tx_progress = (uct_tcp_ep_progress_t)ucs_empty_function_return_zero,
        .rx_progress = uct_tcp_ep_progress_data_rx
    },
    [UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER]   = {
        .name        = "RECV_MAGIC_NUMBER",
        .tx_progress = (uct_tcp_ep_progress_t)ucs_empty_function_return_zero,
        .rx_progress = uct_tcp_ep_progress_magic_number_rx
    },
    [UCT_TCP_EP_CONN_STATE_ACCEPTING]   = {
        .name        = "ACCEPTING",
        .tx_progress = (uct_tcp_ep_progress_t)ucs_empty_function_return_zero,
        .rx_progress = uct_tcp_ep_progress_data_rx
    },
    [UCT_TCP_EP_CONN_STATE_WAITING_REQ] = {
        .name        = "WAITING_REQ",
        .tx_progress = (uct_tcp_ep_progress_t)ucs_empty_function_return_zero,
        .rx_progress = uct_tcp_ep_progress_data_rx
    },
    /* The only state in which user data is progressed in both directions */
    [UCT_TCP_EP_CONN_STATE_CONNECTED]   = {
        .name        = "CONNECTED",
        .tx_progress = uct_tcp_ep_progress_data_tx,
        .rx_progress = uct_tcp_ep_progress_data_rx
    }
};

/**
 * Check whether a TX/RX context currently holds no data.
 *
 * @param ctx  EP context to inspect.
 *
 * @return Non-zero if the context length is zero, 0 otherwise.
 */
static inline int uct_tcp_ep_ctx_buf_empty(uct_tcp_ep_ctx_t *ctx)
{
    /* A context that holds data must own a buffer */
    ucs_assert((ctx->buf != NULL) || (ctx->length == 0));

    return !ctx->length;
}

/**
 * Check whether a TX/RX context still has unprocessed bytes, i.e. its
 * offset has not yet reached its length.
 *
 * @param ctx  EP context to inspect.
 *
 * @return Non-zero if offset < length, 0 otherwise.
 */
static inline int uct_tcp_ep_ctx_buf_need_progress(uct_tcp_ep_ctx_t *ctx)
{
    /* The offset may never run past the amount of data in the context */
    ucs_assert(ctx->length >= ctx->offset);

    return ctx->length > ctx->offset;
}

/**
 * Check whether the EP is able to start a new send operation right now.
 *
 * @param ep  TCP endpoint.
 *
 * @return UCS_OK if the EP is connected and its TX context is empty,
 *         UCS_ERR_UNREACHABLE if the EP is closed,
 *         UCS_ERR_NO_RESOURCE otherwise (connection still being established
 *         or a previous send is still in flight).
 */
static inline ucs_status_t uct_tcp_ep_check_tx_res(uct_tcp_ep_t *ep)
{
    if (ucs_likely(ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED)) {
        /* Only one outstanding TX buffer per EP is supported */
        if (uct_tcp_ep_ctx_buf_empty(&ep->tx)) {
            return UCS_OK;
        }

        return UCS_ERR_NO_RESOURCE;
    }

    if (ep->conn_state == UCT_TCP_EP_CONN_STATE_CLOSED) {
        return UCS_ERR_UNREACHABLE;
    }

    /* Any other state reaching this point must be one of the
     * connection establishment states */
    ucs_assertv((ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) ||
                (ep->conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK) ||
                (ep->conn_state == UCT_TCP_EP_CONN_STATE_WAITING_REQ),
                "ep=%p", ep);
    return UCS_ERR_NO_RESOURCE;
}

/**
 * Mark a TX/RX context as holding no data, without touching its buffer
 * pointer.
 *
 * @param ctx  EP context to rewind.
 */
static inline void uct_tcp_ep_ctx_rewind(uct_tcp_ep_ctx_t *ctx)
{
    ctx->length = 0;
    ctx->offset = 0;
}

/**
 * Put a TX/RX context into its initial state: no buffer attached, no data,
 * and the PUT sequence number set to UINT32_MAX.
 *
 * @param ctx  EP context to initialize.
 */
static inline void uct_tcp_ep_ctx_init(uct_tcp_ep_ctx_t *ctx)
{
    uct_tcp_ep_ctx_rewind(ctx);
    ctx->buf    = NULL;
    ctx->put_sn = UINT32_MAX;
}

/**
 * Hand the context's buffer back via ucs_mpool_put_inline() and mark the
 * context as empty.
 *
 * @param ctx  EP context whose buffer is released; callers in this file
 *             only invoke it while a buffer is attached.
 */
static inline void uct_tcp_ep_ctx_reset(uct_tcp_ep_ctx_t *ctx)
{
    ucs_mpool_put_inline(ctx->buf);
    uct_tcp_ep_ctx_rewind(ctx);
    ctx->buf = NULL;
}

/**
 * Zero out a socket address.
 *
 * @param sock_addr  Address storage to clear.
 */
static void uct_tcp_ep_addr_cleanup(struct sockaddr_in *sock_addr)
{
    memset(sock_addr, 0, sizeof(struct sockaddr_in));
}

/**
 * Initialize the EP's stored peer address.
 *
 * @param sock_addr  Destination storage.
 * @param peer_addr  Address to copy, or NULL to store an all-zero address.
 */
static void uct_tcp_ep_addr_init(struct sockaddr_in *sock_addr,
                                 const struct sockaddr_in *peer_addr)
{
    /* TODO: handle IPv4 and IPv6 */
    if (peer_addr != NULL) {
        *sock_addr = *peer_addr;
        return;
    }

    /* Peer is not known yet */
    uct_tcp_ep_addr_cleanup(sock_addr);
}

/**
 * Close a file descriptor unless it is already marked invalid, and mark it
 * invalid (-1) afterwards so that a repeated call is a no-op.
 *
 * @param fd_p  Pointer to the descriptor; set to -1 on return.
 */
static void uct_tcp_ep_close_fd(int *fd_p)
{
    if (*fd_p == -1) {
        return;
    }

    close(*fd_p);
    *fd_p = -1;
}

/**
 * Check whether the EP's peer address equals the address of the local
 * interface the EP belongs to.
 *
 * @param ep  TCP endpoint.
 *
 * @return Non-zero if ucs_sockaddr_cmp() reports the two addresses as
 *         equal, 0 otherwise.
 */
unsigned uct_tcp_ep_is_self(const uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface          = ucs_derived_of(ep->super.super.iface,
                                                     uct_tcp_iface_t);
    const struct sockaddr *peer_sa  = (const struct sockaddr*)&ep->peer_addr;
    const struct sockaddr *local_sa =
        (const struct sockaddr*)&iface->config.ifaddr;
    ucs_status_t status;
    int differ;

    differ = ucs_sockaddr_cmp(peer_sa, local_sa, &status);
    ucs_assertv(status == UCS_OK, "ep=%p", ep);

    return (differ == 0);
}

/**
 * Release the resources held by an EP: clear the peer address, return any
 * attached TX/RX buffers, unregister the socket from the event set and
 * close it.
 *
 * @param ep  TCP endpoint to clean up.
 */
static void uct_tcp_ep_cleanup(uct_tcp_ep_t *ep)
{
    uct_tcp_ep_ctx_t *tx_ctx = &ep->tx;
    uct_tcp_ep_ctx_t *rx_ctx = &ep->rx;

    uct_tcp_ep_addr_cleanup(&ep->peer_addr);

    if (tx_ctx->buf != NULL) {
        uct_tcp_ep_ctx_reset(tx_ctx);
    }

    if (rx_ctx->buf != NULL) {
        uct_tcp_ep_ctx_reset(rx_ctx);
    }

    /* Events can only be unregistered while the fd is still valid, so it
     * has to happen before the fd is closed */
    if ((ep->fd != -1) && ep->events) {
        uct_tcp_ep_mod_events(ep, 0, ep->events);
    }

    uct_tcp_ep_close_fd(&ep->fd);
}

/**
 * Class constructor of uct_tcp_ep_t.
 *
 * @param iface      Interface the EP is created on.
 * @param fd         Socket file descriptor the EP will use (must be >= 0).
 * @param dest_addr  Peer address, or NULL when the peer is not known yet
 *                   (the comment below relates this to accepting a
 *                   connection); the stored peer address is zeroed then.
 *
 * @return UCS_OK on success, otherwise the status of the failed socket
 *         configuration call. On failure the fd is NOT closed here - the
 *         caller remains responsible for it.
 */
static UCS_CLASS_INIT_FUNC(uct_tcp_ep_t, uct_tcp_iface_t *iface,
                           int fd, const struct sockaddr_in *dest_addr)
{
    ucs_status_t status;

    ucs_assertv(fd >= 0, "iface=%p", iface);

    UCS_CLASS_CALL_SUPER_INIT(uct_base_ep_t, &iface->super)

    uct_tcp_ep_addr_init(&self->peer_addr, dest_addr);

    uct_tcp_ep_ctx_init(&self->tx);
    uct_tcp_ep_ctx_init(&self->rx);

    /* A new EP starts with no registered events, no capabilities and in the
     * CLOSED connection state */
    self->events        = 0;
    self->conn_retries  = 0;
    self->fd            = fd;
    self->ctx_caps      = 0;
    self->conn_state    = UCT_TCP_EP_CONN_STATE_CLOSED;

    ucs_list_head_init(&self->list);
    ucs_queue_head_init(&self->pending_q);
    ucs_queue_head_init(&self->put_comp_q);

    /* Make a socket non-blocking if an EP is created during accepting
     * a connection or non-blocking connection mode is requested */
    if ((dest_addr == NULL) || iface->config.conn_nb) {
        status = ucs_sys_fcntl_modfl(self->fd, O_NONBLOCK, 0);
        if (status != UCS_OK) {
            goto err_cleanup;
        }
    }

    status = uct_tcp_iface_set_sockopt(iface, self->fd);
    if (status != UCS_OK) {
        goto err_cleanup;
    }

    uct_tcp_iface_add_ep(self);

    ucs_debug("tcp_ep %p: created on iface %p, fd %d", self, iface, self->fd);
    return UCS_OK;

err_cleanup:
    /* need to be closed by this function caller */
    /* Invalidating the fd first makes uct_tcp_ep_cleanup() skip both the
     * event-set removal and the close() */
    self->fd = -1;
    uct_tcp_ep_cleanup(self);
    return status;
}

/**
 * Format the TX/RX capability bits of an EP as a "[Tx:Rx]" style string,
 * with "-" standing in for a capability that is not set.
 *
 * @param ep_ctx_caps  Capability bitmask to describe.
 * @param str_buffer   Output buffer of at least UCT_TCP_EP_CTX_CAPS_STR_MAX
 *                     bytes.
 *
 * @return str_buffer.
 */
const char *uct_tcp_ep_ctx_caps_str(uint8_t ep_ctx_caps, char *str_buffer)
{
    const char *tx_str = "-";
    const char *rx_str = "-";

    if (ep_ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
        tx_str = "Tx";
    }

    if (ep_ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
        rx_str = "Rx";
    }

    ucs_snprintf_zero(str_buffer, UCT_TCP_EP_CTX_CAPS_STR_MAX, "[%s:%s]",
                      tx_str, rx_str);
    return str_buffer;
}

/**
 * Replace the EP's capability bitmask, tracing the transition. Nothing is
 * done when the mask does not actually change.
 *
 * @param ep        TCP endpoint.
 * @param new_caps  Capability bitmask to install.
 */
void uct_tcp_ep_change_ctx_caps(uct_tcp_ep_t *ep, uint8_t new_caps)
{
    char str_prev_ctx_caps[UCT_TCP_EP_CTX_CAPS_STR_MAX];
    char str_cur_ctx_caps[UCT_TCP_EP_CTX_CAPS_STR_MAX];

    if (ep->ctx_caps == new_caps) {
        return;
    }

    ucs_trace("tcp_ep %p: ctx caps changed %s -> %s", ep,
              uct_tcp_ep_ctx_caps_str(ep->ctx_caps, str_prev_ctx_caps),
              uct_tcp_ep_ctx_caps_str(new_caps, str_cur_ctx_caps));
    ep->ctx_caps = new_caps;
}

/**
 * Add a capability to the EP and keep the connection manager's EP
 * bookkeeping in sync. For an EP that is not a self-connection:
 * - gaining its first capability registers it via uct_tcp_cm_add_ep();
 * - ending up with both RX and TX set unregisters it via
 *   uct_tcp_cm_remove_ep().
 *
 * @param ep   TCP endpoint.
 * @param cap  Capability (bit index) to add.
 *
 * @return Status of uct_tcp_cm_add_ep() when it is called, UCS_OK otherwise.
 */
ucs_status_t uct_tcp_ep_add_ctx_cap(uct_tcp_ep_t *ep,
                                    uct_tcp_ep_ctx_type_t cap)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    uint8_t old_caps       = ep->ctx_caps;

    uct_tcp_ep_change_ctx_caps(ep, ep->ctx_caps | UCS_BIT(cap));

    /* Nothing to update for self-connections or when the capability
     * was already present */
    if (uct_tcp_ep_is_self(ep) || (old_caps == ep->ctx_caps)) {
        return UCS_OK;
    }

    if (!old_caps) {
        return uct_tcp_cm_add_ep(iface, ep);
    }

    if (ucs_test_all_flags(ep->ctx_caps,
                           (UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX) |
                            UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)))) {
        uct_tcp_cm_remove_ep(iface, ep);
    }

    return UCS_OK;
}

/**
 * Remove a capability from the EP and keep the connection manager's EP
 * bookkeeping in sync. For an EP that is not a self-connection:
 * - if it had both RX and TX before the removal, it is registered via
 *   uct_tcp_cm_add_ep();
 * - if no capability bits remain, it is unregistered via
 *   uct_tcp_cm_remove_ep().
 *
 * @param ep   TCP endpoint.
 * @param cap  Capability (bit index) to remove.
 *
 * @return Status of uct_tcp_cm_add_ep() when it is called, UCS_OK otherwise.
 */
ucs_status_t uct_tcp_ep_remove_ctx_cap(uct_tcp_ep_t *ep,
                                       uct_tcp_ep_ctx_type_t cap)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    uint8_t old_caps       = ep->ctx_caps;

    uct_tcp_ep_change_ctx_caps(ep, ep->ctx_caps & ~UCS_BIT(cap));

    if (uct_tcp_ep_is_self(ep)) {
        return UCS_OK;
    }

    if (ucs_test_all_flags(old_caps,
                           (UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX) |
                            UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)))) {
        return uct_tcp_cm_add_ep(iface, ep);
    }

    if (!ep->ctx_caps) {
        uct_tcp_cm_remove_ep(iface, ep);
    }

    return UCS_OK;
}

/**
 * Transfer a capability from one EP to another: remove it from @a from_ep
 * and, only if that succeeded, add it to @a to_ep.
 *
 * @param from_ep  EP losing the capability.
 * @param to_ep    EP gaining the capability.
 * @param ctx_cap  Capability (bit index) to move.
 *
 * @return Status of the first step that failed, or of the final add.
 */
ucs_status_t uct_tcp_ep_move_ctx_cap(uct_tcp_ep_t *from_ep, uct_tcp_ep_t *to_ep,
                                     uct_tcp_ep_ctx_type_t ctx_cap)
{
    ucs_status_t status = uct_tcp_ep_remove_ctx_cap(from_ep, ctx_cap);

    if (status == UCS_OK) {
        status = uct_tcp_ep_add_ctx_cap(to_ep, ctx_cap);
    }

    return status;
}

/**
 * Class destructor of uct_tcp_ep_t: unregisters the EP's events, drops its
 * TX/RX capabilities, discards queued PUT completions without invoking
 * them, detaches the EP from the interface, moves it to the CLOSED state
 * and finally releases buffers and the socket.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_tcp_ep_t)
{
    /* iface is only referenced by the debug message at the end */
    uct_tcp_iface_t UCS_V_UNUSED *iface =
        ucs_derived_of(self->super.super.iface, uct_tcp_iface_t);
    uct_tcp_ep_put_completion_t *put_comp;

    /* Stop receiving events for this EP (no-op if none are registered) */
    uct_tcp_ep_mod_events(self, 0, self->events);

    if (self->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
        uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_TYPE_TX);
    }

    if (self->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
        uct_tcp_ep_remove_ctx_cap(self, UCT_TCP_EP_CTX_TYPE_RX);
    }

    /* No capability/flag bits are expected to remain at this point */
    ucs_assertv(!self->ctx_caps, "ep=%p", self);

    /* Free every pending PUT completion record; the user completion
     * callbacks are not invoked here */
    ucs_queue_for_each_extract(put_comp, &self->put_comp_q, elem, 1) {
        ucs_free(put_comp);
    }

    uct_tcp_iface_remove_ep(self);

    if (self->conn_state != UCT_TCP_EP_CONN_STATE_CLOSED) {
        uct_tcp_cm_change_conn_state(self, UCT_TCP_EP_CONN_STATE_CLOSED);
    }

    uct_tcp_ep_cleanup(self);

    ucs_debug("tcp_ep %p: destroyed on iface %p", self, iface);
}

/* Define the uct_tcp_ep_t class with uct_base_ep_t as its super class */
UCS_CLASS_DEFINE(uct_tcp_ep_t, uct_base_ep_t);

/* Allocation + construction entry point; invoked in this file as
 * uct_tcp_ep_init(iface, fd, dest_addr, &ep) */
UCS_CLASS_DEFINE_NAMED_NEW_FUNC(uct_tcp_ep_init, uct_tcp_ep_t, uct_tcp_ep_t,
                                uct_tcp_iface_t*, int,
                                const struct sockaddr_in*)
/* Unconditional destruction entry point taking a uct_ep_t pointer;
 * uct_tcp_ep_destroy() decides when it is actually called */
UCS_CLASS_DEFINE_NAMED_DELETE_FUNC(uct_tcp_ep_destroy_internal,
                                   uct_tcp_ep_t, uct_ep_t)

/**
 * User-facing EP destroy. A connected EP that owns both RX and TX
 * capabilities is kept alive as an RX-only EP (only its TX capability is
 * dropped); any other EP is destroyed immediately.
 *
 * @param tl_ep  Endpoint handle to destroy.
 */
void uct_tcp_ep_destroy(uct_ep_h tl_ep)
{
    uct_tcp_ep_t *ep = ucs_derived_of(tl_ep, uct_tcp_ep_t);

    if ((ep->conn_state != UCT_TCP_EP_CONN_STATE_CONNECTED) ||
        !ucs_test_all_flags(ep->ctx_caps,
                            UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX) |
                            UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX))) {
        uct_tcp_ep_destroy_internal(tl_ep);
        return;
    }

    /* remove TX capability, but still will be able to receive data */
    uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_TX);
}

/**
 * Mark the EP as failed: move it to the CLOSED connection state (if it is
 * not there already) and report UCS_ERR_UNREACHABLE through
 * uct_set_ep_failed().
 *
 * @param ep  TCP endpoint that failed.
 */
void uct_tcp_ep_set_failed(uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    int already_closed     = (ep->conn_state == UCT_TCP_EP_CONN_STATE_CLOSED);

    if (!already_closed) {
        uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_CLOSED);
    }

    uct_set_ep_failed(&UCS_CLASS_NAME(uct_tcp_ep_t), &ep->super.super,
                      &iface->super.super, UCS_ERR_UNREACHABLE);
}

/**
 * Create a new TCP socket and start connection establishment on it, either
 * for a freshly allocated EP or for an EP supplied by the caller.
 *
 * @param iface      Interface the EP belongs to.
 * @param dest_addr  Peer address; may be NULL only when *ep_p is non-NULL.
 * @param ep_p       In/out: if *ep_p is NULL, a new EP is allocated and
 *                   returned here on success; otherwise the given EP is
 *                   re-used and just receives the new socket fd.
 *
 * @return UCS_OK on success, otherwise the status of the failed step. A
 *         newly allocated EP is destroyed on failure; a caller-provided EP
 *         is left allocated.
 */
static ucs_status_t
uct_tcp_ep_create_socket_and_connect(uct_tcp_iface_t *iface,
                                     const struct sockaddr_in *dest_addr,
                                     uct_tcp_ep_t **ep_p)
{
    uct_tcp_ep_t *ep = NULL;
    ucs_status_t status;
    int fd;

    /* if EP is already allocated, dest_addr can be NULL */
    ucs_assert((*ep_p != NULL) || (dest_addr != NULL));

    status = ucs_socket_create(AF_INET, SOCK_STREAM, &fd);
    if (status != UCS_OK) {
        return status;
    }

    if (*ep_p == NULL) {
        status = uct_tcp_ep_init(iface, fd, dest_addr, &ep);
        if (status != UCS_OK) {
            /* The EP constructor does not close the fd on failure */
            goto err_close_fd;
        }

        /* EP is responsible for this socket fd from now */
        fd = -1;
    } else {
        /* Re-use the caller's EP; the local fd stays valid so that the
         * error path below can close it */
        ep     = *ep_p;
        ep->fd = fd;
    }

    status = uct_tcp_cm_conn_start(ep);
    if (status != UCS_OK) {
        goto err_ep_destroy;
    }

    if (*ep_p == NULL) {
        *ep_p = ep;
    }

    return UCS_OK;

err_ep_destroy:
    /* Only an EP allocated by this function is destroyed here; destroying
     * it also closes its socket, which is why fd was set to -1 above */
    if (*ep_p == NULL) {
        uct_tcp_ep_destroy_internal(&ep->super.super);
    }
    /* NOTE(review): for a caller-provided EP, ep->fd still holds the value
     * of the fd that is closed just below - confirm that
     * uct_tcp_cm_conn_start() or the callers invalidate ep->fd on failure,
     * otherwise a later cleanup of that EP would close the fd again */
err_close_fd:
    /* fd has to be valid in case of valid EP has been
     * passed to this function */
    ucs_assert((*ep_p == NULL) || (fd != -1));
    uct_tcp_ep_close_fd(&fd);
    return status;
}

/**
 * Create a socket, start connecting to the peer and give the resulting EP
 * the TX capability.
 *
 * @param iface      Interface the EP belongs to.
 * @param dest_addr  Peer address.
 * @param ep_p       In/out EP pointer, forwarded to
 *                   uct_tcp_ep_create_socket_and_connect(). If adding the
 *                   TX capability fails, the EP is destroyed and *ep_p is
 *                   reset to NULL so that the caller is never left holding
 *                   a pointer to freed memory.
 *
 * @return UCS_OK on success, otherwise the status of the failed step.
 */
static ucs_status_t uct_tcp_ep_create_connected(uct_tcp_iface_t *iface,
                                                const struct sockaddr_in *dest_addr,
                                                uct_tcp_ep_t **ep_p)
{
    ucs_status_t status;

    status = uct_tcp_ep_create_socket_and_connect(iface, dest_addr, ep_p);
    if (status != UCS_OK) {
        return status;
    }

    status = uct_tcp_ep_add_ctx_cap(*ep_p, UCT_TCP_EP_CTX_TYPE_TX);
    if (status != UCS_OK) {
        goto err_ep_destroy;
    }

    return UCS_OK;

err_ep_destroy:
    uct_tcp_ep_destroy_internal(&(*ep_p)->super.super);
    /* The EP is gone - do not hand a dangling pointer back to the caller */
    *ep_p = NULL;
    return status;
}

/**
 * Create (or re-use) an EP connected to the peer described by the device
 * and interface addresses in @a params.
 *
 * An existing EP to the same peer that only has the RX capability is
 * preferred: a connection request is sent on it and, if that works, the EP
 * gains the TX capability and is returned. If sending fails, that EP is
 * destroyed and the search is repeated. When no such EP exists, a new
 * socket is created and connected.
 *
 * @param params  EP creation parameters; dev_addr is read as a
 *                struct in_addr and iface_addr as an in_port_t.
 * @param ep_p    Filled with the resulting EP handle on success.
 *
 * @return UCS_OK on success, otherwise an error status.
 */
ucs_status_t uct_tcp_ep_create(const uct_ep_params_t *params,
                               uct_ep_h *ep_p)
{
    uct_tcp_iface_t *iface = ucs_derived_of(params->iface, uct_tcp_iface_t);
    uct_tcp_ep_t *ep       = NULL;
    struct sockaddr_in dest_addr;
    ucs_status_t status;

    UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(params);
    memset(&dest_addr, 0, sizeof(dest_addr));
    /* TODO: handle AF_INET6 */
    dest_addr.sin_family = AF_INET;
    /* The addresses are opaque byte blobs: copy them out instead of
     * dereferencing a casted pointer, so that no particular alignment of
     * the blobs is required */
    memcpy(&dest_addr.sin_port, params->iface_addr,
           sizeof(dest_addr.sin_port));
    memcpy(&dest_addr.sin_addr, params->dev_addr,
           sizeof(dest_addr.sin_addr));

    do {
        ep = uct_tcp_cm_search_ep(iface, &dest_addr,
                                  UCT_TCP_EP_CTX_TYPE_RX);
        if (ep) {
            ucs_assert(!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)));
            /* Found EP with RX ctx, try to send the connection request
             * to the remote peer, if it successful - assign TX to this EP
             * and return the EP to the user, otherwise - destroy this EP
             * and try to search another EP w/o TX capability or create
             * new EP */
            status = uct_tcp_cm_send_event(ep, UCT_TCP_CM_CONN_REQ);
            if (status != UCS_OK) {
                uct_tcp_ep_destroy_internal(&ep->super.super);
                ep = NULL;
            } else {
                status = uct_tcp_ep_add_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_TX);
                if (status != UCS_OK) {
                    return status;
                }
            }
        } else {
            /* No re-usable EP: create a brand new connection */
            status = uct_tcp_ep_create_connected(iface, &dest_addr, &ep);
            break;
        }
    } while (ep == NULL);

    if (status == UCS_OK) {
        /* cppcheck-suppress autoVariables */
        *ep_p = &ep->super.super;
    }
    return status;
}

/**
 * Update the set of events the EP's socket is registered for in the
 * interface's event set. Bits in @a add are set first, then bits in
 * @a remove are cleared. Nothing happens if the resulting mask equals the
 * current one. A failure of the underlying event set call is fatal.
 *
 * @param ep      TCP endpoint.
 * @param add     Event bits to add.
 * @param remove  Event bits to remove.
 */
void uct_tcp_ep_mod_events(uct_tcp_ep_t *ep, int add, int remove)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    int prev_events        = ep->events;
    int next_events        = (prev_events | add) & ~remove;
    ucs_status_t status;

    if (next_events == prev_events) {
        return;
    }

    ep->events = next_events;
    ucs_trace("tcp_ep %p: set events to %c%c", ep,
              (next_events & UCS_EVENT_SET_EVREAD)  ? 'r' : '-',
              (next_events & UCS_EVENT_SET_EVWRITE) ? 'w' : '-');

    if (next_events == 0) {
        /* Last event removed - drop the fd from the event set */
        status = ucs_event_set_del(iface->event_set, ep->fd);
    } else if (prev_events == 0) {
        /* First event requested - the fd is not in the event set yet */
        status = ucs_event_set_add(iface->event_set, ep->fd,
                                   (ucs_event_set_type_t)ep->events,
                                   (void *)ep);
    } else {
        status = ucs_event_set_mod(iface->event_set, ep->fd,
                                   (ucs_event_set_type_t)ep->events,
                                   (void *)ep);
    }

    if (status != UCS_OK) {
        ucs_fatal("unable to modify event set for tcp_ep %p (fd=%d)", ep,
                  ep->fd);
    }
}

/**
 * Handle a PUT acknowledgement received from the peer.
 *
 * @param ep       TCP endpoint the ACK arrived on.
 * @param put_ack  ACK header carrying the acknowledged PUT sequence number.
 */
static inline void uct_tcp_ep_handle_put_ack(uct_tcp_ep_t *ep,
                                             uct_tcp_ep_put_ack_hdr_t *put_ack)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    uct_tcp_ep_put_completion_t *put_comp;

    /* The ACK matches the sequence number recorded in the TX context */
    if (put_ack->sn == ep->tx.put_sn) {
        /* Since there are no other PUT operations in-flight, can remove flag
         * and decrement iface outstanding operations counter */
        ucs_assert(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK));
        ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK);
        uct_tcp_iface_outstanding_dec(iface);
    }

    /* Complete (with UCS_OK) and free the queued completions whose awaited
     * sequence number is covered by this ACK; the comparison is done with
     * UCS_CIRCULAR_COMPARE32 (presumably wrap-around aware) */
    ucs_queue_for_each_extract(put_comp, &ep->put_comp_q, elem,
                               (UCS_CIRCULAR_COMPARE32(put_comp->wait_put_sn,
                                                       <=, put_ack->sn))) {
        uct_invoke_completion(put_comp->comp, UCS_OK);
        ucs_free(put_comp);
    }
}

/**
 * Dispatch requests from the EP's pending queue for as long as the TX
 * context is empty. If the TX context is still empty afterwards, the
 * pending queue must have been drained and the EP stops listening for
 * write events.
 *
 * @param ep  TCP endpoint.
 */
void uct_tcp_ep_pending_queue_dispatch(uct_tcp_ep_t *ep)
{
    uct_pending_req_priv_queue_t *priv;

    uct_pending_queue_dispatch(priv, &ep->pending_q,
                               uct_tcp_ep_ctx_buf_empty(&ep->tx));

    if (!uct_tcp_ep_ctx_buf_empty(&ep->tx)) {
        /* A send is in flight - keep waiting for the write event */
        return;
    }

    ucs_assert(ucs_queue_is_empty(&ep->pending_q));
    uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVWRITE);
}

/**
 * React to the remote side dropping the connection.
 *
 * The buffer of @a ctx is released. An EP that has the TX capability is
 * kept allocated: it loses its RX capability (if set), its events are
 * unregistered and its socket is closed. An EP with no capability bits at
 * all, or with RX but no TX, is destroyed.
 *
 * @param ep   TCP endpoint; may be freed by this call.
 * @param ctx  The EP context (TX or RX) on which the disconnect was seen.
 */
static void uct_tcp_ep_handle_disconnected(uct_tcp_ep_t *ep,
                                           uct_tcp_ep_ctx_t *ctx)
{
    ucs_debug("tcp_ep %p: remote disconnected", ep);

    uct_tcp_ep_ctx_reset(ctx);

    if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX))) {
        if ((ep->ctx_caps == 0) ||
            (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX))) {
            /* If the EP supports RX only or no capabilities set, destroy it */
            uct_tcp_ep_destroy_internal(&ep->super.super);
        }

        return;
    }

    /* The EP is still referenced through its TX capability: keep the
     * object, but tear down the RX side and the socket */
    if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) {
        uct_tcp_ep_remove_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_RX);
    }

    uct_tcp_ep_mod_events(ep, 0, ep->events);
    uct_tcp_ep_close_fd(&ep->fd);
}

/**
 * Try to send the not-yet-sent part of the EP's TX buffer with a
 * non-blocking send, advancing the TX offset and decreasing the iface
 * outstanding counter by the number of bytes reported as sent.
 *
 * @param ep  TCP endpoint with pending TX data.
 *
 * @return Number of bytes sent, or a negative ucs_status_t value if the
 *         send failed with anything other than UCS_ERR_NO_PROGRESS.
 */
static inline ssize_t uct_tcp_ep_send(uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    size_t length;
    ucs_status_t status;

    ucs_assert(ep->tx.offset < ep->tx.length);
    length = ep->tx.length - ep->tx.offset;

    status = ucs_socket_send_nb(ep->fd,
                                UCS_PTR_BYTE_OFFSET(ep->tx.buf, ep->tx.offset),
                                &length, NULL, NULL);
    if (ucs_unlikely(!((status == UCS_OK) ||
                       (status == UCS_ERR_NO_PROGRESS)))) {
        return status;
    }

    ep->tx.offset      += length;
    iface->outstanding -= length;

    ucs_assert(length <= SSIZE_MAX);

    return length;
}

/**
 * Finish a zero-copy send: clear the EP's ZCOPY_TX flag and, if the user
 * supplied a completion, invoke it with the given status.
 *
 * @param ep      TCP endpoint.
 * @param comp    User completion, may be NULL.
 * @param status  Status to pass to the completion.
 */
static inline void uct_tcp_ep_comp_zcopy(uct_tcp_ep_t *ep,
                                         uct_completion_t *comp,
                                         ucs_status_t status)
{
    ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX);

    if (comp == NULL) {
        return;
    }

    uct_invoke_completion(comp, status);
}

/**
 * Try to send the remaining IOV entries of an in-progress zero-copy
 * operation, whose descriptor lives in the EP's TX buffer.
 *
 * On a partial send the IOV position is advanced; once everything has been
 * sent the zero-copy operation is completed with UCS_OK. On a send error
 * the operation is completed with that error.
 *
 * @param ep  TCP endpoint with a zero-copy send in progress.
 *
 * @return Number of bytes sent (0 if no progress could be made), or a
 *         negative ucs_status_t value on error.
 */
static inline ssize_t uct_tcp_ep_sendv(uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface       = ucs_derived_of(ep->super.super.iface,
                                                  uct_tcp_iface_t);
    uct_tcp_ep_zcopy_tx_t *zcopy = (uct_tcp_ep_zcopy_tx_t*)ep->tx.buf;
    size_t length;
    ucs_status_t status;

    ucs_assertv(ep->tx.offset < ep->tx.length, "ep=%p", ep);

    status = ucs_socket_sendv_nb(ep->fd, &zcopy->iov[zcopy->iov_index],
                                 zcopy->iov_cnt - zcopy->iov_index,
                                 &length, NULL, NULL);

    if (ucs_unlikely(status == UCS_ERR_NO_PROGRESS)) {
        /* Nothing was sent, retry on the next progress call */
        ucs_assert(length == 0);
        return 0;
    }

    if (ucs_unlikely(status != UCS_OK)) {
        uct_tcp_ep_comp_zcopy(ep, zcopy->comp, status);
        return status;
    }

    iface->outstanding -= length;
    ep->tx.offset      += length;

    if (ep->tx.offset == ep->tx.length) {
        /* Whole payload is on the wire */
        uct_tcp_ep_comp_zcopy(ep, zcopy->comp, UCS_OK);
    } else {
        ucs_iov_advance(zcopy->iov, zcopy->iov_cnt,
                        &zcopy->iov_index, length);
    }

    ucs_assert(length <= SSIZE_MAX);
    return length;
}

/**
 * Handle an I/O error that may mean the peer dropped a connection that was
 * still being established. If the EP is in one of the connection
 * establishment states and the error is ECONNRESET, ECONNREFUSED or
 * ETIMEDOUT, the socket is closed and a new connection attempt is started
 * on the same EP.
 *
 * @param ep        TCP endpoint the error occurred on.
 * @param io_errno  errno value reported by the failed socket operation.
 *
 * @return Status of the reconnection attempt, or UCS_ERR_IO_ERROR if the
 *         error is not handled by this function.
 */
ucs_status_t uct_tcp_ep_handle_dropped_connect(uct_tcp_ep_t *ep, int io_errno)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    int establishing;
    int conn_dropped;
    ucs_status_t status;

    establishing = (ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) ||
                   (ep->conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK) ||
                   (ep->conn_state == UCT_TCP_EP_CONN_STATE_WAITING_REQ);
    conn_dropped = (io_errno == ECONNRESET) || (io_errno == ECONNREFUSED) ||
                   /* connection establishment procedure timed out */
                   (io_errno == ETIMEDOUT);

    if (!establishing || !conn_dropped) {
        return UCS_ERR_IO_ERROR;
    }

    /* if connection establishment fails, the system limits
     * may not be big enough */
    uct_tcp_ep_mod_events(ep, 0, ep->events);
    uct_tcp_ep_close_fd(&ep->fd);

    uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_CLOSED);

    /* Retry with a fresh socket on the very same EP */
    status = uct_tcp_ep_create_socket_and_connect(iface, NULL, &ep);
    if (status != UCS_OK) {
        ucs_error("try to increase \"net.core.somaxconn\", "
                  "\"net.core.netdev_max_backlog\", "
                  "\"net.ipv4.tcp_max_syn_backlog\" to the maximum value "
                  "on the remote node or increase %s%s%s (=%u)",
                  UCS_CONFIG_PREFIX, UCT_TCP_CONFIG_PREFIX,
                  UCT_TCP_CONFIG_MAX_CONN_RETRIES,
                  iface->config.max_conn_retries);
    }

    return status;
}

/**
 * I/O error callback passed to the non-blocking socket receive calls.
 *
 * An ECONNRESET seen on an EP that is in the ACCEPTING state, or that is
 * CONNECTED with exactly the RX capability bit set, is only logged and
 * reported as UCS_OK. Every other error is forwarded to
 * uct_tcp_ep_handle_dropped_connect().
 *
 * @param arg       The uct_tcp_ep_t the I/O was performed on.
 * @param io_errno  errno value of the failed socket operation.
 *
 * @return UCS_OK if the error was absorbed here, otherwise the result of
 *         uct_tcp_ep_handle_dropped_connect().
 */
static ucs_status_t uct_tcp_ep_io_err_handler_cb(void *arg, int io_errno)
{
    uct_tcp_ep_t *ep                    = (uct_tcp_ep_t*)arg;
    uct_tcp_iface_t UCS_V_UNUSED *iface = ucs_derived_of(ep->super.super.iface,
                                                         uct_tcp_iface_t);
    char str_local_addr[UCS_SOCKADDR_STRING_LEN];
    char str_remote_addr[UCS_SOCKADDR_STRING_LEN];
    int dropped_by_peer;

    dropped_by_peer =
        (io_errno == ECONNRESET) &&
        ((ep->conn_state == UCT_TCP_EP_CONN_STATE_ACCEPTING) ||
         ((ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED) &&
          (ep->ctx_caps == UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX)) /* only RX cap */));

    if (!dropped_by_peer) {
        return uct_tcp_ep_handle_dropped_connect(ep, io_errno);
    }

    ucs_debug("tcp_ep %p: detected %d (%s) error, the [%s <-> %s] "
              "connection was dropped by the peer",
              ep, io_errno, strerror(io_errno),
              ucs_sockaddr_str((const struct sockaddr*)&iface->config.ifaddr,
                               str_local_addr, UCS_SOCKADDR_STRING_LEN),
              ucs_sockaddr_str((const struct sockaddr*)&ep->peer_addr,
                               str_remote_addr, UCS_SOCKADDR_STRING_LEN));
    return UCS_OK;
}

/**
 * Handle a non-OK status returned by a non-blocking receive on the EP.
 *
 * - UCS_ERR_NO_PROGRESS: nothing was received. If the RX buffer holds no
 *   data it is released for re-use - except while a PUT RX operation is in
 *   progress, because then the RX buffer stores the PUT request header
 *   (with rx.length rewound to 0) and must stay attached until the PUT
 *   payload has been fully received.
 * - any other status: the connection is treated as disconnected, which may
 *   destroy the EP.
 *
 * @param ep      TCP endpoint; may be freed by this call.
 * @param status  Status returned by the receive call (not UCS_OK).
 */
static inline void uct_tcp_ep_handle_recv_err(uct_tcp_ep_t *ep,
                                              ucs_status_t status)
{
    if (status != UCS_ERR_NO_PROGRESS) {
        uct_tcp_ep_handle_disconnected(ep, &ep->rx);
        return;
    }

    /* If no data were read to the allocated buffer,
     * we can safely reset it for further re-use and to
     * avoid overwriting this buffer, because `rx::length == 0`.
     * This does not apply in PUT RX mode, where the buffer keeps the PUT
     * request header that the next PUT RX progress call reads */
    if ((ep->rx.length == 0) &&
        !(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX))) {
        uct_tcp_ep_ctx_reset(&ep->rx);
    }
}

/**
 * Receive up to @a recv_length bytes into the EP's RX buffer, appending
 * them after the data already stored there.
 *
 * @param ep           TCP endpoint with an RX buffer attached.
 * @param recv_length  Maximum number of bytes to receive (must be > 0).
 *
 * @return 1 if some data was received, 0 otherwise. When 0 is returned the
 *         error has already been handled and the EP may have been
 *         destroyed.
 */
static inline unsigned uct_tcp_ep_recv(uct_tcp_ep_t *ep, size_t recv_length)
{
    uct_tcp_iface_t UCS_V_UNUSED *iface = ucs_derived_of(ep->super.super.iface,
                                                         uct_tcp_iface_t);
    void *dest;
    ucs_status_t status;

    ucs_assertv(recv_length, "ep=%p", ep);

    dest   = UCS_PTR_BYTE_OFFSET(ep->rx.buf, ep->rx.length);
    status = ucs_socket_recv_nb(ep->fd, dest, &recv_length,
                                uct_tcp_ep_io_err_handler_cb, ep);
    if (ucs_unlikely(status != UCS_OK)) {
        uct_tcp_ep_handle_recv_err(ep, status);
        return 0;
    }

    /* A successful receive is expected to deliver at least one byte */
    ucs_assertv(recv_length, "ep=%p", ep);

    ep->rx.length += recv_length;
    ucs_trace_data("tcp_ep %p: recvd %zu bytes", ep, recv_length);
    ucs_assert(ep->rx.length <= (iface->config.rx_seg_size * 2));

    return 1;
}

/* Forward declaration - the function depends on AM send
 * functions implemented below, while it is already called by the
 * TX progress and PUT RX handling code that follows */
static void uct_tcp_ep_post_put_ack(uct_tcp_ep_t *ep);

/**
 * TX progress callback for the CONNECTED state: push out pending TX data,
 * send a postponed PUT ACK if one is flagged, and then dispatch the
 * pending queue.
 *
 * @param ep  TCP endpoint; may be destroyed if a send error is detected.
 *
 * @return 1 if any bytes were sent or a send error was handled,
 *         0 otherwise.
 */
static unsigned uct_tcp_ep_progress_data_tx(uct_tcp_ep_t *ep)
{
    unsigned ret = 0;
    ssize_t offset;

    ucs_trace_func("ep=%p", ep);

    if (uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
        /* Regular sends use the contiguous TX buffer, zero-copy sends use
         * the IOV stored in it */
        offset = (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX)) ?
                  uct_tcp_ep_send(ep) : uct_tcp_ep_sendv(ep));
        if (ucs_unlikely(offset < 0)) {
            /* The EP must not be touched after this call, since it may
             * have been destroyed */
            uct_tcp_ep_handle_disconnected(ep, &ep->tx);
            return 1;
        }

        ret = (offset > 0);

        ucs_trace_data("ep %p fd %d sent %zu/%zu bytes, moved by offset %zd",
                       ep, ep->fd, ep->tx.offset, ep->tx.length, offset);

        /* Everything was sent - release the TX buffer */
        if (!uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
            uct_tcp_ep_ctx_reset(&ep->tx);
        }
    }

    if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK)) {
        uct_tcp_ep_post_put_ack(ep);
    }

    if (!ucs_queue_is_empty(&ep->pending_q)) {
        /* The dispatch routine takes care of dropping the write event
         * once the queue is drained */
        uct_tcp_ep_pending_queue_dispatch(ep);
        return ret;
    }

    /* Nothing left to send - stop listening for write events */
    if (uct_tcp_ep_ctx_buf_empty(&ep->tx)) {
        ucs_assert(ucs_queue_is_empty(&ep->pending_q));
        uct_tcp_ep_mod_events(ep, 0, UCS_EVENT_SET_EVWRITE);
    }

    return ret;
}

/**
 * Deliver a fully received active message to the interface's AM handler,
 * tracing it first. The payload is the data that directly follows the
 * header.
 *
 * @param iface  Interface the message arrived on.
 * @param ep     TCP endpoint the message arrived on.
 * @param hdr    Header of the received active message.
 */
static inline void
uct_tcp_ep_comp_recv_am(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
                        uct_tcp_am_hdr_t *hdr)
{
    void *payload = hdr + 1;

    uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_RECV, hdr->am_id,
                       payload, hdr->length,
                       "RECV: ep %p fd %d received %zu/%zu bytes",
                       ep, ep->fd, ep->rx.offset, ep->rx.length);
    uct_iface_invoke_am(&iface->super, hdr->am_id, payload, hdr->length, 0);
}

/**
 * Account for @a recv_length bytes of PUT payload that have just been
 * placed at the PUT destination: move the destination address forward and
 * shrink the remaining length. When nothing remains, a PUT ACK is posted
 * and, if the EP was in PUT RX mode, that mode is left and the RX buffer is
 * released.
 *
 * @param ep           TCP endpoint receiving the PUT.
 * @param put_req      PUT request header tracking the remaining transfer.
 * @param recv_length  Number of payload bytes just received.
 *
 * @return UCS_OK if the PUT is complete, UCS_INPROGRESS otherwise.
 */
static inline ucs_status_t
uct_tcp_ep_put_rx_advance(uct_tcp_ep_t *ep, uct_tcp_ep_put_req_hdr_t *put_req,
                          size_t recv_length)
{
    ucs_assert(!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK)));
    ucs_assert(recv_length <= put_req->length);

    put_req->length -= recv_length;
    put_req->addr   += recv_length;

    if (put_req->length != 0) {
        return UCS_INPROGRESS;
    }

    uct_tcp_ep_post_put_ack(ep);

    /* EP's ctx_caps doesn't have UCT_TCP_EP_CTX_TYPE_PUT_RX flag
     * set in case of entire PUT payload was received through
     * AM protocol */
    if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX)) {
        ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX);
        uct_tcp_ep_ctx_reset(&ep->rx);
    }

    return UCS_OK;
}

/**
 * Handle a PUT request header received through the AM path.
 *
 * Any PUT payload bytes that were already received into the RX buffer right
 * after the request are copied to the PUT destination. If that completes
 * the PUT, nothing else is done here. Otherwise the EP is switched to PUT
 * RX mode: the RX context is rewound and the (updated) PUT request header
 * is stored at the start of the RX buffer, from where the PUT RX progress
 * function picks it up.
 *
 * @param ep                  TCP endpoint the request arrived on.
 * @param put_req             PUT request header (located inside the RX
 *                            buffer).
 * @param extra_recvd_length  Number of bytes present in the RX buffer after
 *                            the request.
 */
static inline void uct_tcp_ep_handle_put_req(uct_tcp_ep_t *ep,
                                             uct_tcp_ep_put_req_hdr_t *put_req,
                                             size_t extra_recvd_length)
{
    size_t copied_length;
    ucs_status_t status;

    ucs_assert(put_req->addr || !put_req->length);

    copied_length  = ucs_min(put_req->length, extra_recvd_length);
    /* A zero-length PUT may carry a NULL destination address; passing a
     * null pointer to memcpy() is undefined behavior even when the size
     * is 0, so skip the call when there is nothing to copy */
    if (copied_length > 0) {
        memcpy((void*)(uintptr_t)put_req->addr,
               UCS_PTR_BYTE_OFFSET(ep->rx.buf, ep->rx.offset),
               copied_length);
    }
    ep->rx.offset += copied_length;
    ep->rx.put_sn  = put_req->sn;

    /* Remove the flag that indicates that EP is sending PUT RX ACK in order
     * to not ack the uncompleted PUT RX operation for which PUT REQ is being
     * handled here. ACK for both operations will be sent after the completion
     * of the last received PUT operation */
    ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK);

    status = uct_tcp_ep_put_rx_advance(ep, put_req, copied_length);
    if (status == UCS_OK) {
        return;
    }

    /* The PUT is not complete, so all the data received so far must have
     * been consumed */
    ucs_assert(ep->rx.offset == ep->rx.length);
    uct_tcp_ep_ctx_rewind(&ep->rx);
    /* Since RX buffer and PUT request can be overlapped, use memmove() */
    memmove(ep->rx.buf, put_req, sizeof(*put_req));
    ep->ctx_caps |= UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX);
}

/**
 * RX progress for the active message protocol: receive data into the EP's
 * RX buffer and process every complete message found in it (user AMs, PUT
 * requests, PUT ACKs and connection management packets).
 *
 * @param ep  TCP endpoint; may be destroyed or replaced while connection
 *            management packets or receive errors are handled.
 *
 * @return Number of handled events; 0 means that no progress was made.
 */
static unsigned uct_tcp_ep_progress_am_rx(uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    unsigned handled       = 0;
    uct_tcp_am_hdr_t *hdr;
    size_t recv_length;
    size_t remainder;

    ucs_trace_func("ep=%p", ep);

    /* Step 1: decide how much to receive, depending on what is already
     * stored in the RX buffer */
    if (!uct_tcp_ep_ctx_buf_need_progress(&ep->rx)) {
        /* No partially received data - start with a fresh buffer */
        ucs_assert(ep->rx.buf == NULL);
        ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
        if (ucs_unlikely(ep->rx.buf == NULL)) {
            ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
            return 0;
        }

        /* post the entire AM buffer */
        recv_length = iface->config.rx_seg_size;
    } else if (ep->rx.length < sizeof(*hdr)) {
        ucs_assert((ep->rx.buf != NULL) && (ep->rx.offset == 0));

        /* do partial receive of the remaining part of the hdr
         * and post the entire AM buffer */
        recv_length = iface->config.rx_seg_size - ep->rx.length;
    } else {
        ucs_assert((ep->rx.buf != NULL) &&
                   ((ep->rx.length - ep->rx.offset) >= sizeof(*hdr)));

        /* do partial receive of the remaining user data */
        hdr         = UCS_PTR_BYTE_OFFSET(ep->rx.buf, ep->rx.offset);
        recv_length = hdr->length - (ep->rx.length - ep->rx.offset - sizeof(*hdr));
    }

    /* Step 2: receive; on failure the error was already handled and the EP
     * must not be accessed anymore */
    if (!uct_tcp_ep_recv(ep, recv_length)) {
        goto out;
    }

    /* Parse received active messages */
    while (uct_tcp_ep_ctx_buf_need_progress(&ep->rx)) {
        remainder = ep->rx.length - ep->rx.offset;
        if (remainder < sizeof(*hdr)) {
            /* Move the partially received hdr to the beginning of the buffer */
            memmove(ep->rx.buf, UCS_PTR_BYTE_OFFSET(ep->rx.buf, ep->rx.offset),
                    remainder);
            ep->rx.offset = 0;
            ep->rx.length = remainder;
            handled++;
            goto out;
        }

        hdr = UCS_PTR_BYTE_OFFSET(ep->rx.buf, ep->rx.offset);
        /* NOTE(review): hdr->length comes from the wire and is validated by
         * an assertion only - confirm that this is acceptable for builds
         * with assertions disabled */
        ucs_assertv(hdr->length <= (iface->config.rx_seg_size - sizeof(*hdr)),
                    "tcp_ep %p (conn state - %s): %u vs %zu",
                    ep, uct_tcp_ep_cm_state[ep->conn_state].name, hdr->length,
                    (iface->config.rx_seg_size - sizeof(*hdr)));

        /* The payload is incomplete - keep the buffer and wait for more */
        if (remainder < (sizeof(*hdr) + hdr->length)) {
            handled++;
            goto out;
        }

        /* Full message was received */
        ep->rx.offset += sizeof(*hdr) + hdr->length;
        ucs_assert(ep->rx.offset <= ep->rx.length);

        /* Dispatch by AM id: ids below UCT_AM_ID_MAX are user active
         * messages, the others are internal protocol packets */
        if (ucs_likely(hdr->am_id < UCT_AM_ID_MAX)) {
            uct_tcp_ep_comp_recv_am(iface, ep, hdr);
            handled++;
        } else if (hdr->am_id == UCT_TCP_EP_PUT_REQ_AM_ID) {
            ucs_assert(hdr->length == sizeof(uct_tcp_ep_put_req_hdr_t));
            uct_tcp_ep_handle_put_req(ep, (uct_tcp_ep_put_req_hdr_t*)(hdr + 1),
                                      ep->rx.length - ep->rx.offset);
            handled++;
            if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX)) {
                /* It means that PUT RX is in progress and EP RX buffer
                 * is used to keep PUT header. So, we don't need to
                 * release a EP RX buffer */
                goto out;
            }
        } else if (hdr->am_id == UCT_TCP_EP_PUT_ACK_AM_ID) {
            ucs_assert(hdr->length == sizeof(uint32_t));
            uct_tcp_ep_handle_put_ack(ep, (uct_tcp_ep_put_ack_hdr_t*)(hdr + 1));
            handled++;
        } else {
            ucs_assert(hdr->am_id == UCT_TCP_EP_CM_AM_ID);
            /* The CM handler receives the EP by reference and may set it
             * to NULL, in which case processing stops here */
            handled += 1 + uct_tcp_cm_handle_conn_pkt(&ep, hdr + 1, hdr->length);
            /* coverity[check_after_deref] */
            if (ep == NULL) {
                goto out;
            }
        }

        ucs_assert(ep != NULL);
    }

    /* All received messages were consumed - release the RX buffer */
    uct_tcp_ep_ctx_reset(&ep->rx);

out:
    return handled;
}

/*
 * Reserve the EP TX buffer for a new outgoing message.
 *
 * Checks the EP TX resources first and then takes a buffer from the
 * interface TX memory pool. On success, the buffer is stored in ep->tx.buf,
 * returned through @a hdr and its am_id field is set to @a am_id.
 *
 * Returns UCS_OK on success. Returns UCS_ERR_NO_RESOURCE when either the
 * resource check reports it or the memory pool has no free buffer; in that
 * case EVWRITE is requested for the EP (if it has a valid fd) and the
 * "no resources" statistics counter is updated. Any other status reported
 * by the resource check is returned as is.
 */
static inline ucs_status_t
uct_tcp_ep_am_prepare(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
                      uint8_t am_id, uct_tcp_am_hdr_t **hdr)
{
    uct_tcp_am_hdr_t *am_hdr;
    ucs_status_t res_status;

    res_status = uct_tcp_ep_check_tx_res(ep);
    if (ucs_likely(res_status == UCS_OK)) {
        ucs_assertv(ep->tx.buf == NULL, "ep=%p", ep);

        ep->tx.buf = ucs_mpool_get_inline(&iface->tx_mpool);
        if (ucs_likely(ep->tx.buf != NULL)) {
            am_hdr        = ep->tx.buf;
            am_hdr->am_id = am_id;
            *hdr          = am_hdr;
            return UCS_OK;
        }
        /* the memory pool is exhausted: handled as lack of resources below */
    } else if (ucs_unlikely(res_status != UCS_ERR_NO_RESOURCE)) {
        /* not a resource shortage: hand the status over to the caller */
        return res_status;
    }

    /* No resources: ask to be notified when the socket becomes writable */
    if (ep->fd != -1) {
        uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
    }
    UCS_STATS_UPDATE_COUNTER(ep->super.stats, UCT_EP_STAT_NO_RES, 1);
    return UCS_ERR_NO_RESOURCE;
}

/*
 * Progress an in-flight PUT on the receive side.
 *
 * The PUT request header is kept at the beginning of the EP RX buffer; the
 * payload is received from the socket straight into the address stored in
 * that header. Returns 1 if some data was received and the PUT state was
 * advanced, 0 if the receive did not complete with UCS_OK (the status is
 * passed to the receive error handler).
 */
static unsigned uct_tcp_ep_progress_put_rx(uct_tcp_ep_t *ep)
{
    uct_tcp_ep_put_req_hdr_t *put_req = (uct_tcp_ep_put_req_hdr_t*)ep->rx.buf;
    size_t recv_length                = put_req->length;
    ucs_status_t status;

    status = ucs_socket_recv_nb(ep->fd, (void*)(uintptr_t)put_req->addr,
                                &recv_length, uct_tcp_ep_io_err_handler_cb,
                                ep);
    if (ucs_likely(status == UCS_OK)) {
        ucs_assertv(recv_length, "ep=%p", ep);
        uct_tcp_ep_put_rx_advance(ep, put_req, recv_length);
        return 1;
    }

    /* The EP is not accessed after the error handler was invoked */
    uct_tcp_ep_handle_recv_err(ep, status);
    return 0;
}

/*
 * RX progress entry point for data: dispatches to the PUT receive path
 * while the PUT RX context flag is set on the EP, and to the AM receive
 * path otherwise. Returns whatever the selected handler returns.
 */
static unsigned uct_tcp_ep_progress_data_rx(uct_tcp_ep_t *ep)
{
    if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX)) {
        return uct_tcp_ep_progress_put_rx(ep);
    }

    return uct_tcp_ep_progress_am_rx(ep);
}

/*
 * RX progress handler of the RECV_MAGIC_NUMBER connection state.
 *
 * Receives sizeof(uint64_t) bytes into the EP RX buffer (possibly over
 * several invocations) and compares them with UCT_TCP_MAGIC_NUMBER:
 *  - on match, the RX context is reset and the EP is moved to the
 *    ACCEPTING state;
 *  - on mismatch, a debug message is printed and the EP is destroyed.
 *
 * Returns 1 if some progress was made (bytes were received or the state was
 * changed), 0 otherwise (no RX buffer, failed receive, or the EP was
 * destroyed).
 */
static unsigned uct_tcp_ep_progress_magic_number_rx(uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    char str_local_addr[UCS_SOCKADDR_STRING_LEN];
    char str_remote_addr[UCS_SOCKADDR_STRING_LEN];
    size_t recv_length, prev_length;
    uint64_t magic_number;

    if (ep->rx.buf == NULL) {
        ep->rx.buf = ucs_mpool_get_inline(&iface->rx_mpool);
        if (ucs_unlikely(ep->rx.buf == NULL)) {
            ucs_warn("tcp_ep %p: unable to get a buffer from RX memory pool", ep);
            return 0;
        }
    }

    prev_length = ep->rx.length;
    recv_length = sizeof(magic_number) - ep->rx.length;

    if (!uct_tcp_ep_recv(ep, recv_length)) {
        /* Do not touch the EP after a failed receive: the receive error
         * handling may have already released it.
         * NOTE(review): this assumes uct_tcp_ep_recv() returns 0 only when
         * no bytes were added to the RX context - confirm against its
         * implementation */
        return 0;
    }

    if (ep->rx.length < sizeof(magic_number)) {
        /* The magic number is not complete yet, report whether any new
         * bytes have arrived */
        return ((ep->rx.length - prev_length) > 0);
    }

    /* Copy the value out instead of dereferencing the buffer through
     * a casted pointer, so no alignment/aliasing assumptions are made */
    memcpy(&magic_number, ep->rx.buf, sizeof(magic_number));

    if (magic_number != UCT_TCP_MAGIC_NUMBER) {
        /* Silently close this connection and destroy its EP */
        ucs_debug("tcp_iface %p (%s): received wrong magic number (expected: "
                  "%llu, received: %llu) for ep=%p (fd=%d) from %s", iface,
                  ucs_sockaddr_str((const struct sockaddr*)&iface->config.ifaddr,
                                   str_local_addr, UCS_SOCKADDR_STRING_LEN),
                  (unsigned long long)UCT_TCP_MAGIC_NUMBER,
                  (unsigned long long)magic_number, ep,
                  ep->fd, ucs_socket_getname_str(ep->fd, str_remote_addr,
                                                 UCS_SOCKADDR_STRING_LEN));
        goto err;
    }

    uct_tcp_ep_ctx_reset(&ep->rx);

    uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_ACCEPTING);

    return 1;

err:
    uct_tcp_ep_destroy_internal(&ep->super.super);
    return 0;
}

/*
 * Register a partially sent Zcopy operation as outstanding on the EP.
 *
 * Stores the user completion in the Zcopy context, marks the EP as having
 * a Zcopy TX in progress, re-bases the IOV array to the current TX offset
 * and requests EVWRITE so the remaining data is sent from progress.
 *
 * If a header was supplied and it was not sent completely yet, it is copied
 * into the EP TX buffer, because the IOV entry has to keep referring to
 * valid memory after this function returns.
 */
static inline void
uct_tcp_ep_set_outstanding_zcopy(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
                                 uct_tcp_ep_zcopy_tx_t *ctx, const void *header,
                                 unsigned header_length, uct_completion_t *comp)
{
    void *hdr_copy;

    ep->ctx_caps |= UCS_BIT(UCT_TCP_EP_CTX_TYPE_ZCOPY_TX);
    ctx->comp     = comp;

    if (header_length != 0) {
        /* was the user's header sent completely or not? */
        if (ep->tx.offset < (sizeof(uct_tcp_am_hdr_t) + header_length)) {
            ucs_assert(header_length <= iface->config.zcopy.max_hdr);
            /* keep a private copy of the header in the EP TX buffer (after
             * Zcopy context and IOVs) for retransmission; iov_len of this
             * entry already holds the proper value */
            hdr_copy             = UCS_PTR_BYTE_OFFSET(ep->tx.buf,
                                                       iface->config.zcopy.hdr_offset);
            ctx->iov[1].iov_base = hdr_copy;
            memcpy(hdr_copy, header, header_length);
        }
    }

    /* skip everything that is already on the wire */
    ctx->iov_index = 0;
    ucs_iov_advance(ctx->iov, ctx->iov_cnt, &ctx->iov_index, ep->tx.offset);

    uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
}

/*
 * Send the message staged in the EP TX buffer.
 *
 * Sets the TX length to the TCP AM header size plus the payload length
 * taken from @a hdr, accounts this length in the interface outstanding
 * counter and tries to send the data. If the send call reports a negative
 * value, it is returned as the status. Otherwise UCS_OK is returned: the TX
 * context is reset when nothing is left to progress, or EVWRITE is
 * requested to send the remainder later.
 */
static inline ucs_status_t
uct_tcp_ep_am_send(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
                   const uct_tcp_am_hdr_t *hdr)
{
    ssize_t sent;

    ep->tx.length       = sizeof(*hdr) + hdr->length;
    iface->outstanding += ep->tx.length;

    sent = uct_tcp_ep_send(ep);
    if (ucs_unlikely(sent < 0)) {
        /* negative values carry a status code */
        return (ucs_status_t)sent;
    }

    uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_SEND, hdr->am_id,
                       hdr + 1, hdr->length, "SEND: ep %p fd %d sent "
                       "%zu/%zu bytes, moved by offset %zd",
                       ep, ep->fd, ep->tx.offset, ep->tx.length, sent);

    if (ucs_unlikely(uct_tcp_ep_ctx_buf_need_progress(&ep->tx))) {
        /* part of the message is still pending in the TX buffer */
        uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
        return UCS_OK;
    }

    uct_tcp_ep_ctx_reset(&ep->tx);
    return UCS_OK;
}

/*
 * Select the buffer that is passed to AM tracing for a vectored send.
 *
 * For a non-short send, the caller's @a header pointer is returned as is.
 * For a short send, the 64-bit header and the payload described by
 * @a payload_iov are packed right after the TCP AM header @a hdr and
 * a pointer to that packed data is returned, so the trace shows the
 * actual data.
 */
static const void*
uct_tcp_ep_am_sendv_get_trace_payload(uct_tcp_am_hdr_t *hdr,
                                      const void *header,
                                      const struct iovec *payload_iov,
                                      int short_sendv)
{
    if (short_sendv) {
        /* Trace data was requested: assemble header and payload in the
         * EP TX buffer in order to trace correct data */
        uct_am_short_fill_data(hdr + 1, *(const uint64_t*)header,
                               payload_iov->iov_base, payload_iov->iov_len);
        return (hdr + 1);
    }

    return header;
}

/*
 * Send a message described by an IOV array through the EP socket.
 *
 * Adds the TCP AM header size and the payload length stored in @a hdr to
 * the EP TX length, asserts that the result does not exceed @a send_limit,
 * and performs a non-blocking vectored send which reports the number of
 * sent bytes through ep->tx.offset. The number of bytes that were not sent
 * is added to the interface outstanding counter.
 *
 * @a short_sendv and @a header are used for tracing only.
 *
 * Returns the status of the vectored send call unchanged; callers in this
 * file treat both UCS_OK and UCS_ERR_NO_PROGRESS as non-fatal.
 */
static inline ucs_status_t
uct_tcp_ep_am_sendv(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep,
                    int short_sendv, uct_tcp_am_hdr_t *hdr,
                    size_t send_limit, const void *header,
                    struct iovec *iov, size_t iov_cnt)
{
    ucs_status_t status;

    /* Accumulate (not assign): the TX length may already hold a value set
     * by the caller before this call */
    ep->tx.length += hdr->length + sizeof(*hdr);

    ucs_assertv(ep->tx.length <= send_limit, "ep=%p", ep);

    /* NOTE(review): the trailing NULL arguments presumably mean that no
     * error callback is installed for this send - confirm against the
     * socket helper's prototype */
    status = ucs_socket_sendv_nb(ep->fd, iov, iov_cnt,
                                 &ep->tx.offset, NULL, NULL);

    uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_SEND, hdr->am_id,
                       /* the function will be invoked only in case of
                        * data tracing is enabled */
                       uct_tcp_ep_am_sendv_get_trace_payload(hdr, header,
                                                             &iov[2], short_sendv),
                       hdr->length, "SEND: ep %p fd %d sent %zu/%zu bytes, "
                       "moved by offset %zu, iov cnt %zu "
                       "[addr %p len %zu] [addr %p len %zu]",
                       ep, ep->fd, ep->tx.offset, ep->tx.length,
                       ep->tx.offset, iov_cnt,
                       /* print user-defined header or
                        * first iovec with a payload */
                       ((iov_cnt > 1) ? iov[1].iov_base : NULL),
                       ((iov_cnt > 1) ? iov[1].iov_len  : 0),
                       /* print first/second iovec with a payload */
                       ((iov_cnt > 2) ? iov[2].iov_base : NULL),
                       ((iov_cnt > 2) ? iov[2].iov_len  : 0));

    /* Only the unsent remainder is counted as outstanding */
    iface->outstanding += ep->tx.length - ep->tx.offset;

    return status;
}

/*
 * Post a PUT ACK message carrying the last received PUT sequence number
 * (ep->rx.put_sn) to the peer.
 *
 * If the EP TX path has no resources at the moment, the "sending ACK" flag
 * is set on the EP and nothing is sent. Any other preparation failure is
 * reported with an error message. The status returned by the send call
 * itself is not inspected here.
 */
static void uct_tcp_ep_post_put_ack(uct_tcp_ep_t *ep)
{
    uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
                                            uct_tcp_iface_t);
    uct_tcp_am_hdr_t *hdr  = NULL;
    uct_tcp_ep_put_ack_hdr_t *put_ack;
    ucs_status_t status;

    /* Preparing the AM also makes sure that nothing else is being sent
     * through this EP right now, so AM/PUT data sent from this EP can't be
     * mixed with this PUT ACK message */
    status = uct_tcp_ep_am_prepare(iface, ep, UCT_TCP_EP_PUT_ACK_AM_ID,
                                   &hdr);
    switch (status) {
    case UCS_OK:
        break;
    case UCS_ERR_NO_RESOURCE:
        ep->ctx_caps |= UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK);
        return;
    default:
        ucs_error("tcp_ep %p: failed to prepare AM data", ep);
        return;
    }

    ucs_assertv(hdr != NULL, "ep=%p", ep);

    /* The PUT ACK confirms completing PUT operations with
     * the last received sequence number == ep::rx::put_sn */
    put_ack     = (uct_tcp_ep_put_ack_hdr_t*)(hdr + 1);
    put_ack->sn = ep->rx.put_sn;
    hdr->length = sizeof(*put_ack);

    uct_tcp_ep_am_send(iface, ep, hdr);

    /* Always remove SENDING ACK flag at this point, as the function
     * can be called from outstanding progress */
    ep->ctx_caps &= ~UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_RX_SENDING_ACK);
}

/*
 * Send a short active message: a 64-bit @a header followed by @a length
 * bytes of @a payload.
 *
 * Payloads not longer than the configured sendv threshold are packed into
 * the EP TX buffer and sent as one buffer. Longer payloads are sent with
 * a vectored send directly from the user's memory; if that send is
 * incomplete, the user's header and payload are copied into the EP TX
 * buffer so the remainder can be sent later from progress.
 *
 * Returns UCS_OK if the message was sent or queued in the EP TX buffer,
 * otherwise the status of the failed preparation/send step.
 */
ucs_status_t uct_tcp_ep_am_short(uct_ep_h uct_ep, uint8_t am_id, uint64_t header,
                                 const void *payload, unsigned length)
{
    uct_tcp_iface_t *iface = ucs_derived_of(uct_ep->iface, uct_tcp_iface_t);
    uct_tcp_ep_t *ep       = ucs_derived_of(uct_ep, uct_tcp_ep_t);
    uct_tcp_am_hdr_t *hdr  = NULL;
    struct iovec iov[UCT_TCP_EP_AM_SHORTV_IOV_COUNT];
    uint32_t payload_length;
    ucs_status_t status;
    size_t offset;

    UCT_CHECK_LENGTH(length + sizeof(header), 0,
                     iface->config.tx_seg_size - sizeof(uct_tcp_am_hdr_t),
                     "am_short");
    UCT_CHECK_AM_ID(am_id);

    status = uct_tcp_ep_am_prepare(iface, ep, am_id, &hdr);
    if (status != UCS_OK) {
        return status;
    }

    ucs_assertv(hdr != NULL, "ep=%p", ep);

    /* Keep the payload length in a local variable too, because hdr
     * (ep::buf) can be released inside `uct_tcp_ep_am_send` call */
    hdr->length = payload_length = length + sizeof(header);

    if (length <= iface->config.sendv_thresh) {
        /* Small message: pack everything into the TX buffer */
        uct_am_short_fill_data(hdr + 1, header, payload, length);

        status = uct_tcp_ep_am_send(iface, ep, hdr);
        if (ucs_likely(status == UCS_OK)) {
            UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, payload_length);
        } else {
            uct_tcp_ep_ctx_reset(&ep->tx);
        }

        return status;
    }

    /* Large message: TCP AM header, user's header and payload are sent
     * as three separate IOV entries */
    iov[0].iov_base = hdr;
    iov[0].iov_len  = sizeof(*hdr);
    iov[1].iov_base = &header;
    iov[1].iov_len  = sizeof(header);
    iov[2].iov_base = (void*)payload;
    iov[2].iov_len  = length;

    status = uct_tcp_ep_am_sendv(iface, ep, 1, hdr,
                                 iface->config.tx_seg_size, &header,
                                 iov, UCT_TCP_EP_AM_SHORTV_IOV_COUNT);
    if ((status != UCS_OK) && (status != UCS_ERR_NO_PROGRESS)) {
        uct_tcp_ep_ctx_reset(&ep->tx);
        return status;
    }

    UCT_TL_EP_STAT_OP(&ep->super, AM, SHORT, payload_length);

    if (!uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
        /* everything is on the wire already */
        ucs_assert(status == UCS_OK);
        uct_tcp_ep_ctx_reset(&ep->tx);
        return status;
    }

    /* Only the user's header and payload have to be saved in the TX
     * buffer, the TCP AM header is placed at its beginning already.
     * `offset` is the amount of user's data that was sent so far */
    if (ep->tx.offset >= sizeof(*hdr)) {
        offset = ep->tx.offset - sizeof(*hdr);
    } else {
        offset = 0;
    }

    ucs_iov_copy(&iov[1], UCT_TCP_EP_AM_SHORTV_IOV_COUNT - 1,
                 offset, UCS_PTR_BYTE_OFFSET(hdr + 1, offset),
                 ep->tx.length - sizeof(*hdr) - offset,
                 UCS_IOV_COPY_TO_BUF);
    uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);

    return UCS_OK;
}

/*
 * Send a buffered-copy active message.
 *
 * The payload is produced by @a pack_cb directly in the EP TX buffer, right
 * after the TCP AM header, and then sent. Returns the packed payload length
 * on success, or a (negative) status if preparing or sending the message
 * failed. @a flags is not used by this implementation.
 */
ssize_t uct_tcp_ep_am_bcopy(uct_ep_h uct_ep, uint8_t am_id,
                            uct_pack_callback_t pack_cb, void *arg,
                            unsigned flags)
{
    uct_tcp_iface_t *iface = ucs_derived_of(uct_ep->iface, uct_tcp_iface_t);
    uct_tcp_ep_t *ep       = ucs_derived_of(uct_ep, uct_tcp_ep_t);
    uct_tcp_am_hdr_t *hdr  = NULL;
    uint32_t payload_length;
    ucs_status_t status;

    UCT_CHECK_AM_ID(am_id);

    status = uct_tcp_ep_am_prepare(iface, ep, am_id, &hdr);
    if (status != UCS_OK) {
        return status;
    }

    ucs_assertv(hdr != NULL, "ep=%p", ep);

    /* The payload length is kept in a local variable, because hdr
     * (ep::buf) can be released inside `uct_tcp_ep_am_send` call */
    payload_length = pack_cb(hdr + 1, arg);
    hdr->length    = payload_length;

    status = uct_tcp_ep_am_send(iface, ep, hdr);
    if (ucs_likely(status == UCS_OK)) {
        UCT_TL_EP_STAT_OP(&ep->super, AM, BCOPY, payload_length);
        return payload_length;
    }

    uct_tcp_ep_ctx_reset(&ep->tx);
    return status;
}

/*
 * Prepare the Zcopy TX context for an AM/PUT Zcopy operation.
 *
 * Validates the IOV count and the header length against the interface
 * limits, reserves the EP TX buffer and fills the IOV array of the Zcopy
 * context that is placed in this buffer:
 *   [0]  - TCP AM header
 *   [1]  - @a header (only when header_length != 0)
 *   rest - entries produced from the user's @a iov
 *
 * The total length of the user's payload is stored to @a zcopy_payload_p,
 * the context is returned through @a ctx_p. Returns UCS_OK on success or
 * the status of the failed check/preparation.
 */
static inline ucs_status_t
uct_tcp_ep_prepare_zcopy(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep, uint8_t am_id,
                         const void *header, unsigned header_length,
                         const uct_iov_t *iov, size_t iovcnt, const char *name,
                         size_t *zcopy_payload_p, uct_tcp_ep_zcopy_tx_t **ctx_p)
{
    uct_tcp_am_hdr_t *hdr = NULL;
    size_t iov_idx        = 0;
    uct_tcp_ep_zcopy_tx_t *ctx;
    ucs_status_t status;

    UCT_CHECK_IOV_SIZE(iovcnt, iface->config.zcopy.max_iov, name);
    UCT_CHECK_LENGTH(header_length, 0, iface->config.zcopy.max_hdr, name);

    status = uct_tcp_ep_am_prepare(iface, ep, am_id, &hdr);
    if (ucs_unlikely(status != UCS_OK)) {
        return status;
    }

    ucs_assertv(hdr != NULL, "ep=%p", ep);

    ctx = ucs_derived_of(hdr, uct_tcp_ep_zcopy_tx_t);

    /* First entry: TCP transport header */
    ctx->iov[iov_idx].iov_base = hdr;
    ctx->iov[iov_idx].iov_len  = sizeof(*hdr);
    ++iov_idx;

    /* Optional entry: user-defined or TCP internal protocol header */
    if (header_length != 0) {
        ucs_assert(header != NULL);
        ctx->iov[iov_idx].iov_base = (void*)header;
        ctx->iov[iov_idx].iov_len  = header_length;
        ++iov_idx;
    }

    /* Remaining entries: user-defined payload */
    iov_idx += uct_iovec_fill_iov(&ctx->iov[iov_idx], iov, iovcnt,
                                  zcopy_payload_p);

    ctx->iov_cnt = iov_idx;
    *ctx_p       = ctx;

    return UCS_OK;
}

/*
 * Send a zero-copy active message built from @a header and the user's IOV.
 *
 * Returns UCS_OK if the whole message was sent immediately, UCS_INPROGRESS
 * if a part of it is left for progress (then @a comp is saved in the Zcopy
 * context), or the status of the failed check/preparation/send step.
 * The EP TX context is reset in all cases except UCS_INPROGRESS and
 * failures that happen before the TX buffer was prepared.
 * @a flags is not used by this implementation.
 */
ucs_status_t uct_tcp_ep_am_zcopy(uct_ep_h uct_ep, uint8_t am_id, const void *header,
                                 unsigned header_length, const uct_iov_t *iov,
                                 size_t iovcnt, unsigned flags,
                                 uct_completion_t *comp)
{
    uct_tcp_iface_t *iface     = ucs_derived_of(uct_ep->iface, uct_tcp_iface_t);
    uct_tcp_ep_t *ep           = ucs_derived_of(uct_ep, uct_tcp_ep_t);
    uct_tcp_ep_zcopy_tx_t *ctx = NULL;
    ucs_status_t status;
    size_t payload_length;

    UCT_CHECK_LENGTH(header_length + uct_iov_total_length(iov, iovcnt), 0,
                     iface->config.rx_seg_size - sizeof(uct_tcp_am_hdr_t),
                     "am_zcopy");
    UCT_CHECK_AM_ID(am_id);

    status = uct_tcp_ep_prepare_zcopy(iface, ep, am_id, header, header_length,
                                      iov, iovcnt, "am_zcopy", &payload_length,
                                      &ctx);
    if (ucs_unlikely(status != UCS_OK)) {
        return status;
    }

    /* The length in TCP AM header covers user's header and payload */
    ctx->super.length = payload_length + header_length;

    status = uct_tcp_ep_am_sendv(iface, ep, 0, &ctx->super,
                                 iface->config.rx_seg_size,
                                 header, ctx->iov, ctx->iov_cnt);
    if (ucs_likely((status == UCS_OK) || (status == UCS_ERR_NO_PROGRESS))) {
        UCT_TL_EP_STAT_OP(&ep->super, AM, ZCOPY, ctx->super.length);

        if (uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
            /* the rest of the data will be sent from progress */
            uct_tcp_ep_set_outstanding_zcopy(iface, ep, ctx, header,
                                             header_length, comp);
            return UCS_INPROGRESS;
        }

        ucs_assert(status == UCS_OK);
    }

    /* Either everything was sent or the send failed: release TX context */
    uct_tcp_ep_ctx_reset(&ep->tx);
    return status;
}

/*
 * Start a zero-copy PUT of the user's IOV to @a remote_addr on the peer.
 *
 * A PUT request header (remote address, payload length and the next PUT
 * sequence number) is sent in front of the payload. After the send was
 * accepted, the EP PUT sequence number is incremented and, if it is not set
 * yet, the "waiting for PUT ACK" flag is raised on the EP together with
 * the interface outstanding operations counter.
 *
 * Returns UCS_OK if everything was sent immediately, UCS_INPROGRESS if
 * a part of the data is left for progress (then @a comp is saved in the
 * Zcopy context), or the status of the failed check/preparation/send step.
 * @a rkey is not used by this implementation.
 */
ucs_status_t uct_tcp_ep_put_zcopy(uct_ep_h uct_ep, const uct_iov_t *iov,
                                  size_t iovcnt, uint64_t remote_addr,
                                  uct_rkey_t rkey, uct_completion_t *comp)
{
    uct_tcp_iface_t *iface           = ucs_derived_of(uct_ep->iface, uct_tcp_iface_t);
    uct_tcp_ep_t *ep                 = ucs_derived_of(uct_ep, uct_tcp_ep_t);
    uct_tcp_ep_put_req_hdr_t put_req = {0}; /* Suppress Cppcheck false-positive */
    uct_tcp_ep_zcopy_tx_t *ctx       = NULL;
    ucs_status_t status;

    UCT_CHECK_LENGTH(sizeof(put_req) + uct_iov_total_length(iov, iovcnt), 0,
                     UCT_TCP_EP_PUT_ZCOPY_MAX - sizeof(uct_tcp_am_hdr_t),
                     "put_zcopy");

    /* The payload length is stored directly to the TX length, since
     * PUT Zcopy doesn't set the payload length to TCP AM hdr */
    status = uct_tcp_ep_prepare_zcopy(iface, ep, UCT_TCP_EP_PUT_REQ_AM_ID,
                                      &put_req, sizeof(put_req),
                                      iov, iovcnt, "put_zcopy",
                                      &ep->tx.length, &ctx);
    if (ucs_unlikely(status != UCS_OK)) {
        return status;
    }

    /* TCP AM header describes the PUT request header only */
    ctx->super.length = sizeof(put_req);

    put_req.sn        = ep->tx.put_sn + 1;
    put_req.length    = ep->tx.length;
    put_req.addr      = remote_addr;

    status = uct_tcp_ep_am_sendv(iface, ep, 0, &ctx->super, UCT_TCP_EP_PUT_ZCOPY_MAX,
                                 &put_req, ctx->iov, ctx->iov_cnt);
    if (ucs_likely((status == UCS_OK) || (status == UCS_ERR_NO_PROGRESS))) {
        ep->tx.put_sn++;

        if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK))) {
            /* Raise UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK flag and increment
             * iface outstanding operations counter in order to ensure
             * returning UCS_INPROGRESS from flush functions and do
             * progressing. The flag has to be removed upon PUT ACK message
             * receiving if there are no other PUT operations in-flight */
            ep->ctx_caps |= UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK);
            uct_tcp_iface_outstanding_inc(iface);
        }

        UCT_TL_EP_STAT_OP(&ep->super, PUT, ZCOPY, put_req.length);

        if (uct_tcp_ep_ctx_buf_need_progress(&ep->tx)) {
            /* the rest of the data will be sent from progress */
            uct_tcp_ep_set_outstanding_zcopy(iface, ep, ctx, &put_req,
                                             sizeof(put_req), comp);
            return UCS_INPROGRESS;
        }

        ucs_assert(status == UCS_OK);
    }

    /* Either everything was sent or the send failed: release TX context */
    uct_tcp_ep_ctx_reset(&ep->tx);
    return status;
}

/*
 * Add a request to the EP pending queue.
 *
 * A request is queued only when the EP TX resource check does not return
 * UCS_OK; otherwise UCS_ERR_BUSY is returned and nothing is queued.
 * @a flags is not used by this implementation.
 */
ucs_status_t uct_tcp_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req,
                                    unsigned flags)
{
    uct_tcp_ep_t *ep = ucs_derived_of(tl_ep, uct_tcp_ep_t);

    if (uct_tcp_ep_check_tx_res(ep) != UCS_OK) {
        uct_pending_req_queue_push(&ep->pending_q, req);
        UCT_TL_EP_STAT_PEND(&ep->super);
        return UCS_OK;
    }

    /* TX resources are available, there is no reason to queue the request */
    return UCS_ERR_BUSY;
}

/*
 * Purge the EP pending queue, passing @a cb and @a arg to the generic
 * pending queue purge helper. The helper is invoked with a constant 1 as
 * its condition argument (presumably selecting every queued request).
 */
void uct_tcp_ep_pending_purge(uct_ep_h tl_ep, uct_pending_purge_callback_t cb,
                              void *arg)
{
    /* scratch variable required by the purge helper */
    uct_pending_req_priv_queue_t UCS_V_UNUSED *req_priv;
    uct_tcp_ep_t *ep = ucs_derived_of(tl_ep, uct_tcp_ep_t);

    uct_pending_queue_purge(req_priv, &ep->pending_q, 1, cb, arg);
}

/*
 * Flush the EP.
 *
 * Returns:
 *  - UCS_ERR_NO_RESOURCE if the EP TX resource check reports it;
 *  - UCS_INPROGRESS if the EP is waiting for a PUT ACK; when @a comp is not
 *    NULL, it is queued on the EP together with the current PUT sequence
 *    number;
 *  - UCS_ERR_NO_MEMORY if the PUT completion element can't be allocated;
 *  - UCS_OK otherwise.
 * @a flags is not used by this implementation.
 */
ucs_status_t uct_tcp_ep_flush(uct_ep_h tl_ep, unsigned flags,
                              uct_completion_t *comp)
{
    uct_tcp_ep_t *ep = ucs_derived_of(tl_ep, uct_tcp_ep_t);
    uct_tcp_ep_put_completion_t *put_comp;

    if (uct_tcp_ep_check_tx_res(ep) == UCS_ERR_NO_RESOURCE) {
        UCT_TL_EP_STAT_FLUSH_WAIT(&ep->super);
        return UCS_ERR_NO_RESOURCE;
    }

    if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_PUT_TX_WAITING_ACK))) {
        /* no PUT operations are waiting for an acknowledgment */
        UCT_TL_EP_STAT_FLUSH(&ep->super);
        return UCS_OK;
    }

    if (comp == NULL) {
        /* nobody has to be notified, just report the flush is pending */
        return UCS_INPROGRESS;
    }

    put_comp = ucs_calloc(1, sizeof(*put_comp), "put completion");
    if (put_comp == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    /* Remember the sequence number of the last posted PUT along with
     * the user's completion */
    put_comp->comp        = comp;
    put_comp->wait_put_sn = ep->tx.put_sn;
    ucs_queue_push(&ep->put_comp_q, &put_comp->elem);

    return UCS_INPROGRESS;
}