/** * Copyright (C) Mellanox Technologies Ltd. 2019. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #include "tcp_sockcm_ep.h" #include #include ucs_config_field_t uct_tcp_sockcm_config_table[] = { {"", "", NULL, ucs_offsetof(uct_tcp_sockcm_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_cm_config_table)}, {"PRIV_DATA_LEN", "2048", "TCP CM private data length", ucs_offsetof(uct_tcp_sockcm_config_t, priv_data_len), UCS_CONFIG_TYPE_MEMUNITS}, {NULL} }; static ucs_status_t uct_tcp_sockcm_query(uct_cm_h cm, uct_cm_attr_t *cm_attr) { uct_tcp_sockcm_t *tcp_sockcm = ucs_derived_of(cm, uct_tcp_sockcm_t); if (cm_attr->field_mask & UCT_CM_ATTR_FIELD_MAX_CONN_PRIV) { cm_attr->max_conn_priv = tcp_sockcm->priv_data_len; } return UCS_OK; } static uct_cm_ops_t uct_tcp_sockcm_ops = { .close = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_t), .cm_query = uct_tcp_sockcm_query, .listener_create = UCS_CLASS_NEW_FUNC_NAME(uct_tcp_listener_t), .listener_reject = uct_tcp_listener_reject, .listener_query = uct_tcp_listener_query, .listener_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_listener_t), .ep_create = uct_tcp_sockcm_ep_create }; static void uct_tcp_close_ep(uct_tcp_sockcm_ep_t *ep) { ucs_list_del(&ep->list); ucs_async_remove_handler(ep->fd, 1); close(ep->fd); ep->fd = -1; UCS_CLASS_DELETE(uct_tcp_sockcm_ep_t, ep); } void uct_tcp_sa_data_handler(int fd, void *arg) { uct_tcp_sockcm_ep_t *ep = (uct_tcp_sockcm_ep_t*)arg; ucs_status_t status; ucs_assertv(ep->fd == fd, "ep->fd %d fd %d, ep_state %d", ep->fd, fd, ep->state); if (!ucs_socket_is_connected(fd)) { ucs_debug("fd %d is not connected. ep state: %d", fd, ep->state); /* coverity[check_return] */ ucs_async_modify_handler(fd, 0); return; } switch (ep->state) { case UCT_TCP_SOCKCM_EP_ON_CLIENT: /* connect() completed, send data to the server */ ep->state |= UCT_TCP_SOCKCM_EP_CONNECTED; status = uct_tcp_sockcm_ep_send_priv_data(ep); if (status != UCS_OK) { ucs_async_modify_handler(fd, 0); } break; case UCT_TCP_SOCKCM_EP_ON_CLIENT | UCT_TCP_SOCKCM_EP_CONN_SENDING: /* can send, progress the sending */ status = uct_tcp_sockcm_ep_progress_send(ep); if (status != UCS_OK) { ucs_async_modify_handler(fd, 0); } break; case UCT_TCP_SOCKCM_EP_ON_CLIENT | UCT_TCP_SOCKCM_EP_CONN_SENT: /* finished sending. TODO recv data from the server */ ucs_async_modify_handler(fd, 0); break; case UCT_TCP_SOCKCM_EP_ON_SERVER | UCT_TCP_SOCKCM_EP_CONNECTED: /* receive data from the client */ status = uct_tcp_sockcm_ep_recv(ep); if (status != UCS_OK) { uct_tcp_close_ep(ep); } break; case UCT_TCP_SOCKCM_EP_ON_SERVER | UCT_TCP_SOCKCM_EP_CONN_RECEIVING: /* can read, progress the receving */ status = uct_tcp_sockcm_ep_progress_recv(ep); if (status != UCS_OK) { uct_tcp_close_ep(ep); } break; case UCT_TCP_SOCKCM_EP_ON_SERVER | UCT_TCP_SOCKCM_EP_CONN_RECEIVED: /* finished recv, can send. TODO send data to the client */ ucs_async_modify_handler(fd, 0); break; default: ucs_error("unexpected event on client ep %p (state=%d)", ep, ep->state); } } static uct_iface_ops_t uct_tcp_sockcm_iface_ops = { .ep_pending_purge = (uct_ep_pending_purge_func_t)ucs_empty_function, .ep_disconnect = uct_tcp_sockcm_ep_disconnect, .ep_destroy = UCS_CLASS_DELETE_FUNC_NAME(uct_tcp_sockcm_ep_t), .ep_put_short = (uct_ep_put_short_func_t)ucs_empty_function_return_unsupported, .ep_put_bcopy = (uct_ep_put_bcopy_func_t)ucs_empty_function_return_unsupported, .ep_get_bcopy = (uct_ep_get_bcopy_func_t)ucs_empty_function_return_unsupported, .ep_am_short = (uct_ep_am_short_func_t)ucs_empty_function_return_unsupported, .ep_am_bcopy = (uct_ep_am_bcopy_func_t)ucs_empty_function_return_unsupported, .ep_atomic_cswap64 = (uct_ep_atomic_cswap64_func_t)ucs_empty_function_return_unsupported, .ep_atomic64_post = (uct_ep_atomic64_post_func_t)ucs_empty_function_return_unsupported, .ep_atomic64_fetch = (uct_ep_atomic64_fetch_func_t)ucs_empty_function_return_unsupported, .ep_atomic_cswap32 = (uct_ep_atomic_cswap32_func_t)ucs_empty_function_return_unsupported, .ep_atomic32_post = (uct_ep_atomic32_post_func_t)ucs_empty_function_return_unsupported, .ep_atomic32_fetch = (uct_ep_atomic32_fetch_func_t)ucs_empty_function_return_unsupported, .ep_pending_add = (uct_ep_pending_add_func_t)ucs_empty_function_return_unsupported, .ep_flush = (uct_ep_flush_func_t)ucs_empty_function_return_unsupported, .ep_fence = (uct_ep_fence_func_t)ucs_empty_function_return_unsupported, .ep_check = (uct_ep_check_func_t)ucs_empty_function_return_unsupported, .ep_create = (uct_ep_create_func_t)ucs_empty_function_return_unsupported, .iface_flush = (uct_iface_flush_func_t)ucs_empty_function_return_unsupported, .iface_fence = (uct_iface_fence_func_t)ucs_empty_function_return_unsupported, .iface_progress_enable = ucs_empty_function, .iface_progress_disable = ucs_empty_function, .iface_progress = (uct_iface_progress_func_t)ucs_empty_function_return_zero, .iface_event_fd_get = (uct_iface_event_fd_get_func_t)ucs_empty_function_return_unsupported, .iface_event_arm = (uct_iface_event_arm_func_t)ucs_empty_function_return_unsupported, .iface_close = ucs_empty_function, .iface_query = (uct_iface_query_func_t)ucs_empty_function_return_unsupported, .iface_get_device_address = (uct_iface_get_device_address_func_t)ucs_empty_function_return_unsupported, .iface_get_address = (uct_iface_get_address_func_t)ucs_empty_function_return_unsupported, .iface_is_reachable = (uct_iface_is_reachable_func_t)ucs_empty_function_return_zero }; UCS_CLASS_INIT_FUNC(uct_tcp_sockcm_t, uct_component_h component, uct_worker_h worker, const uct_cm_config_t *config) { uct_tcp_sockcm_config_t *cm_config = ucs_derived_of(config, uct_tcp_sockcm_config_t); UCS_CLASS_CALL_SUPER_INIT(uct_cm_t, &uct_tcp_sockcm_ops, &uct_tcp_sockcm_iface_ops, worker, component); self->priv_data_len = cm_config->priv_data_len - sizeof(uct_tcp_sockcm_priv_data_hdr_t); ucs_list_head_init(&self->ep_list); ucs_debug("created tcp_sockcm %p", self); return UCS_OK; } UCS_CLASS_CLEANUP_FUNC(uct_tcp_sockcm_t) { uct_tcp_sockcm_ep_t *ep, *tmp; UCS_ASYNC_BLOCK(self->super.iface.worker->async); ucs_list_for_each_safe(ep, tmp, &self->ep_list, list) { uct_tcp_close_ep(ep); } UCS_ASYNC_UNBLOCK(self->super.iface.worker->async); } UCS_CLASS_DEFINE(uct_tcp_sockcm_t, uct_cm_t); UCS_CLASS_DEFINE_NEW_FUNC(uct_tcp_sockcm_t, uct_cm_t, uct_component_h, uct_worker_h, const uct_cm_config_t *); UCS_CLASS_DEFINE_DELETE_FUNC(uct_tcp_sockcm_t, uct_cm_t);