/**
* Copyright (C) Mellanox Technologies Ltd. 2019.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#include "tcp_sockcm_ep.h"

#include <ucs/async/async.h>
#include <ucs/sys/sock.h>


/*
 * Configuration table of the TCP sockcm connection manager.
 * Each entry maps a configuration variable to a field of
 * uct_tcp_sockcm_config_t.
 */
ucs_config_field_t uct_tcp_sockcm_config_table[] = {
  /* Nested table: the fields of uct_cm_config_table are stored in the 'super'
   * member */
  {"", "", NULL,
   ucs_offsetof(uct_tcp_sockcm_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_cm_config_table)},

  /* Default value is "2048". The size of uct_tcp_sockcm_priv_data_hdr_t is
   * subtracted from this value when the connection manager is initialized, so
   * the configured length includes that header. */
  {"PRIV_DATA_LEN", "2048",
   "TCP CM private data length",
   ucs_offsetof(uct_tcp_sockcm_config_t, priv_data_len), UCS_CONFIG_TYPE_MEMUNITS},

  /* Table terminator */
  {NULL}
};

/**
 * Query the attributes of a TCP sockcm connection manager.
 *
 * Only UCT_CM_ATTR_FIELD_MAX_CONN_PRIV is handled: when that bit is set in
 * cm_attr->field_mask, cm_attr->max_conn_priv is filled with the private data
 * length stored on the connection manager. Other fields are left untouched.
 *
 * @param [in]     cm       Connection manager to query.
 * @param [in,out] cm_attr  Attributes structure; field_mask selects which
 *                          fields are filled.
 *
 * @return Always UCS_OK.
 */
static ucs_status_t uct_tcp_sockcm_query(uct_cm_h cm, uct_cm_attr_t *cm_attr)
{
    uct_tcp_sockcm_t *sockcm;

    if (!(cm_attr->field_mask & UCT_CM_ATTR_FIELD_MAX_CONN_PRIV)) {
        /* nothing that this connection manager reports was requested */
        return UCS_OK;
    }

    sockcm                 = ucs_derived_of(cm, uct_tcp_sockcm_t);
    cm_attr->max_conn_priv = sockcm->priv_data_len;
    return UCS_OK;
}

/*
 * Connection manager operations of the TCP sockcm. This table is passed to the
 * uct_cm_t super-class when a uct_tcp_sockcm_t is initialized.
 */
static uct_cm_ops_t uct_tcp_sockcm_ops = {
    .close            = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_t),
    .cm_query         = uct_tcp_sockcm_query,
    .listener_create  = UCS_CLASS_NEW_FUNC_NAME(uct_tcp_listener_t),
    .listener_reject  = uct_tcp_listener_reject,
    .listener_query   = uct_tcp_listener_query,
    .listener_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_listener_t),
    .ep_create        = uct_tcp_sockcm_ep_create
};

/**
 * Tear down an endpoint: unlink it from the list it is on, remove the async
 * handler of its socket, close the socket and delete the endpoint object.
 *
 * The endpoint must not be accessed after this function returns.
 *
 * @param [in] ep  Endpoint to close and delete.
 */
static void uct_tcp_close_ep(uct_tcp_sockcm_ep_t *ep)
{
    int sock_fd = ep->fd;

    ucs_list_del(&ep->list);

    /* second argument (1) requests a synchronous removal - presumably so that
     * the handler is no longer running once the fd is closed; confirm against
     * the ucs_async_remove_handler() documentation */
    ucs_async_remove_handler(sock_fd, 1);
    close(sock_fd);

    /* invalidate the descriptor before the object is deleted, so that the
     * endpoint does not keep a stale fd value */
    ep->fd = -1;
    UCS_CLASS_DELETE(uct_tcp_sockcm_ep_t, ep);
}

/**
 * Event handler for the socket of a TCP sockcm endpoint.
 *
 * If the socket is not connected, the handler's event mask is set to 0 and
 * the function returns. Otherwise, the next step is selected by the exact
 * value of ep->state:
 *  - client side: send the private data to the server, or continue a send
 *    that is in progress. On failure the handler's event mask is set to 0.
 *  - server side: receive the data from the client, or continue a receive
 *    that is in progress. On failure the endpoint is closed and deleted.
 * Any other state is reported as an error.
 *
 * @param [in] fd   File descriptor the event was raised on. Must be equal to
 *                  ep->fd (asserted).
 * @param [in] arg  Opaque handler argument, interpreted as a pointer to the
 *                  uct_tcp_sockcm_ep_t that owns the fd.
 */
void uct_tcp_sa_data_handler(int fd, void *arg)
{
    uct_tcp_sockcm_ep_t *ep = (uct_tcp_sockcm_ep_t*)arg;
    ucs_status_t status;

    ucs_assertv(ep->fd == fd, "ep->fd %d fd %d, ep_state %d", ep->fd, fd, ep->state);

    if (!ucs_socket_is_connected(fd)) {
        ucs_debug("fd %d is not connected. ep state: %d", fd, ep->state);
        /* coverity[check_return] */
        ucs_async_modify_handler(fd, 0);
        return;
    }

    /* ep->state is a combination of flags; each case matches one exact
     * combination */
    switch (ep->state) {
    case UCT_TCP_SOCKCM_EP_ON_CLIENT:
        /* connect() completed, send data to the server */
        ep->state |= UCT_TCP_SOCKCM_EP_CONNECTED;

        status = uct_tcp_sockcm_ep_send_priv_data(ep);
        if (status != UCS_OK) {
            ucs_async_modify_handler(fd, 0);
        }
        break;
    case UCT_TCP_SOCKCM_EP_ON_CLIENT | UCT_TCP_SOCKCM_EP_CONN_SENDING:
        /* can send, progress the sending */
        status = uct_tcp_sockcm_ep_progress_send(ep);
        if (status != UCS_OK) {
            ucs_async_modify_handler(fd, 0);
        }
        break;
    case UCT_TCP_SOCKCM_EP_ON_CLIENT | UCT_TCP_SOCKCM_EP_CONN_SENT:
        /* finished sending. TODO recv data from the server */
        ucs_async_modify_handler(fd, 0);
        break;
    case UCT_TCP_SOCKCM_EP_ON_SERVER | UCT_TCP_SOCKCM_EP_CONNECTED:
        /* receive data from the client */
        status = uct_tcp_sockcm_ep_recv(ep);
        if (status != UCS_OK) {
            /* ep is deleted by this call and must not be used afterwards */
            uct_tcp_close_ep(ep);
        }
        break;
    case UCT_TCP_SOCKCM_EP_ON_SERVER | UCT_TCP_SOCKCM_EP_CONN_RECEIVING:
        /* can read, progress the receiving */
        status = uct_tcp_sockcm_ep_progress_recv(ep);
        if (status != UCS_OK) {
            /* ep is deleted by this call and must not be used afterwards */
            uct_tcp_close_ep(ep);
        }
        break;
    case UCT_TCP_SOCKCM_EP_ON_SERVER | UCT_TCP_SOCKCM_EP_CONN_RECEIVED:
        /* finished recv, can send. TODO send data to the client */
        ucs_async_modify_handler(fd, 0);
        break;
    default:
        /* reached for any unhandled state, on either the client or the server
         * side, so the message must not claim a specific side */
        ucs_error("unexpected event on ep %p (state=%d)", ep, ep->state);
        break;
    }
}

/*
 * Interface operations table which is passed to the uct_cm_t super-class when
 * a uct_tcp_sockcm_t is initialized.
 * Only ep_disconnect and ep_destroy point to TCP sockcm specific functions.
 * All the other entries are generic ucs_empty_function* stubs, most of them
 * being ucs_empty_function_return_unsupported.
 */
static uct_iface_ops_t uct_tcp_sockcm_iface_ops = {
    .ep_pending_purge         = (uct_ep_pending_purge_func_t)ucs_empty_function,
    .ep_disconnect            = uct_tcp_sockcm_ep_disconnect,
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_ep_t),
    .ep_put_short             = (uct_ep_put_short_func_t)ucs_empty_function_return_unsupported,
    .ep_put_bcopy             = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_unsupported,
    .ep_get_bcopy             = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_unsupported,
    .ep_am_short              = (uct_ep_am_short_func_t)ucs_empty_function_return_unsupported,
    .ep_am_bcopy              = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic_cswap64        = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic64_post         = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic64_fetch        = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic_cswap32        = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic32_post         = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_unsupported,
    .ep_atomic32_fetch        = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_unsupported,
    .ep_pending_add           = (uct_ep_pending_add_func_t)ucs_empty_function_return_unsupported,
    .ep_flush                 = (uct_ep_flush_func_t)ucs_empty_function_return_unsupported,
    .ep_fence                 = (uct_ep_fence_func_t)ucs_empty_function_return_unsupported,
    .ep_check                 = (uct_ep_check_func_t)ucs_empty_function_return_unsupported,
    .ep_create                = (uct_ep_create_func_t)ucs_empty_function_return_unsupported,
    .iface_flush              = (uct_iface_flush_func_t)ucs_empty_function_return_unsupported,
    .iface_fence              = (uct_iface_fence_func_t)ucs_empty_function_return_unsupported,
    .iface_progress_enable    = ucs_empty_function,
    .iface_progress_disable   = ucs_empty_function,
    .iface_progress           = (uct_iface_progress_func_t)ucs_empty_function_return_zero,
    .iface_event_fd_get       = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_unsupported,
    .iface_event_arm          = (uct_iface_event_arm_func_t)ucs_empty_function_return_unsupported,
    .iface_close              = ucs_empty_function,
    .iface_query              = (uct_iface_query_func_t)ucs_empty_function_return_unsupported,
    .iface_get_device_address = (uct_iface_get_device_address_func_t)ucs_empty_function_return_unsupported,
    .iface_get_address        = (uct_iface_get_address_func_t)ucs_empty_function_return_unsupported,
    .iface_is_reachable       = (uct_iface_is_reachable_func_t)ucs_empty_function_return_zero
};

/**
 * Constructor of the TCP sockcm connection manager.
 *
 * Initializes the uct_cm_t super-class with the TCP sockcm operation tables,
 * computes the private data length available to the user (the configured
 * length minus the size of uct_tcp_sockcm_priv_data_hdr_t) and initializes
 * the list of endpoints.
 *
 * @param [in] component  Component passed to the uct_cm_t super-class.
 * @param [in] worker     Worker passed to the uct_cm_t super-class.
 * @param [in] config     Connection manager configuration; interpreted as a
 *                        uct_tcp_sockcm_config_t.
 *
 * @return UCS_OK on success, UCS_ERR_INVALID_PARAM if the configured private
 *         data length is too small to hold the private data header, or the
 *         error returned by the super-class initialization.
 */
UCS_CLASS_INIT_FUNC(uct_tcp_sockcm_t, uct_component_h component,
                    uct_worker_h worker, const uct_cm_config_t *config)
{
    uct_tcp_sockcm_config_t *cm_config = ucs_derived_of(config,
                                                        uct_tcp_sockcm_config_t);

    UCS_CLASS_CALL_SUPER_INIT(uct_cm_t, &uct_tcp_sockcm_ops,
                              &uct_tcp_sockcm_iface_ops, worker, component);

    /* The configured length has to cover the header as well. Without this
     * check, a too small value would make the subtraction below wrap around
     * (assuming priv_data_len is an unsigned size - it is parsed as memory
     * units) and a huge maximal private data length would be advertised. */
    if (cm_config->priv_data_len < sizeof(uct_tcp_sockcm_priv_data_hdr_t)) {
        ucs_error("tcp_sockcm private data length (%zu) is smaller than the "
                  "private data header size (%zu)",
                  (size_t)cm_config->priv_data_len,
                  sizeof(uct_tcp_sockcm_priv_data_hdr_t));
        return UCS_ERR_INVALID_PARAM;
    }

    self->priv_data_len = cm_config->priv_data_len -
                          sizeof(uct_tcp_sockcm_priv_data_hdr_t);

    ucs_list_head_init(&self->ep_list);

    ucs_debug("created tcp_sockcm %p", self);

    return UCS_OK;
}

UCS_CLASS_CLEANUP_FUNC(uct_tcp_sockcm_t)
{
    uct_tcp_sockcm_ep_t *ep, *tmp;

    UCS_ASYNC_BLOCK(self->super.iface.worker->async);

    ucs_list_for_each_safe(ep, tmp, &self->ep_list, list) {
        uct_tcp_close_ep(ep);
    }

    UCS_ASYNC_UNBLOCK(self->super.iface.worker->async);
}

/* Define the uct_tcp_sockcm_t class, with uct_cm_t as its super-class */
UCS_CLASS_DEFINE(uct_tcp_sockcm_t, uct_cm_t);
/* Define the 'new' function of the class; the argument types listed here match
 * the parameters of the class init function (component, worker, config) */
UCS_CLASS_DEFINE_NEW_FUNC(uct_tcp_sockcm_t, uct_cm_t, uct_component_h,
                          uct_worker_h, const uct_cm_config_t *);
/* Define the 'delete' function of the class, which is used as the .close
 * operation in uct_tcp_sockcm_ops */
UCS_CLASS_DEFINE_DELETE_FUNC(uct_tcp_sockcm_t, uct_cm_t);