/**
 * Copyright (C) Mellanox Technologies Ltd. 2017-2019. ALL RIGHTS RESERVED.
 * Copyright (C) NVIDIA Corporation. 2019. ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#include "sockcm_iface.h"
#include "sockcm_ep.h"

/*
 * NOTE(review): the targets of the four angle-bracket includes below were
 * missing from the reviewed copy of this file. They were reconstructed from
 * the symbols used here (worker/async, socket and string helpers) and must be
 * confirmed against the repository.
 */
#include <uct/base/uct_worker.h>
#include <ucs/sys/string.h>
#include <ucs/sys/sock.h>
#include <ucs/async/async.h>


enum uct_sockcm_process_event_flags {
    UCT_SOCKCM_PROCESS_EVENT_DESTROY_SOCK_ID_FLAG = UCS_BIT(0),
    UCT_SOCKCM_PROCESS_EVENT_ACK_EVENT_FLAG       = UCS_BIT(1)
};

/* Configuration table: base iface options plus the listen() backlog */
static ucs_config_field_t uct_sockcm_iface_config_table[] = {
    {"", "", NULL,
     ucs_offsetof(uct_sockcm_iface_config_t, super),
     UCS_CONFIG_TYPE_TABLE(uct_iface_config_table)},

    {"BACKLOG", "1024",
     "Maximum number of pending connections for a listening socket.",
     ucs_offsetof(uct_sockcm_iface_config_t, backlog), UCS_CONFIG_TYPE_UINT},

    {NULL}
};

static UCS_CLASS_DECLARE_DELETE_FUNC(uct_sockcm_iface_t, uct_iface_t);

/**
 * Fill the interface attributes.
 *
 * On a server interface the address the listening socket is bound to is
 * queried with getsockname() and copied to iface_attr->listen_sockaddr.
 *
 * @return UCS_OK on success, UCS_ERR_IO_ERROR if getsockname() fails, or the
 *         status returned by ucs_sockaddr_copy().
 */
static ucs_status_t uct_sockcm_iface_query(uct_iface_h tl_iface,
                                           uct_iface_attr_t *iface_attr)
{
    uct_sockcm_iface_t *iface = ucs_derived_of(tl_iface, uct_sockcm_iface_t);
    struct sockaddr_storage addr;
    ucs_status_t status;

    uct_base_iface_query(&iface->super, iface_attr);

    iface_attr->iface_addr_len  = sizeof(ucs_sock_addr_t);
    iface_attr->device_addr_len = 0;
    iface_attr->cap.flags       = UCT_IFACE_FLAG_CONNECT_TO_SOCKADDR |
                                  UCT_IFACE_FLAG_CB_ASYNC            |
                                  UCT_IFACE_FLAG_ERRHANDLE_PEER_FAILURE;
    iface_attr->max_conn_priv   = UCT_SOCKCM_MAX_CONN_PRIV;

    if (iface->is_server) {
        socklen_t len = sizeof(struct sockaddr_storage);

        if (getsockname(iface->listen_fd, (struct sockaddr *)&addr, &len)) {
            ucs_error("sockcm_iface: getsockname failed %m");
            return UCS_ERR_IO_ERROR;
        }

        status = ucs_sockaddr_copy((struct sockaddr *)&iface_attr->listen_sockaddr,
                                   (const struct sockaddr *)&addr);
        if (status != UCS_OK) {
            return status;
        }
    }

    return UCS_OK;
}

/**
 * Report the interface address: always an empty ucs_sock_addr_t
 * (NULL address, zero length).
 */
static ucs_status_t uct_sockcm_iface_get_address(uct_iface_h tl_iface,
                                                 uct_iface_addr_t *iface_addr)
{
    ucs_sock_addr_t *sockcm_addr = (ucs_sock_addr_t *)iface_addr;

    sockcm_addr->addr    = NULL;
    sockcm_addr->addrlen = 0;
    return UCS_OK;
}

/**
 * Send a one-byte notification to the client over the socket stored in the
 * connection request context.
 *
 * @param notif_val     Notification value; it is narrowed to a single char.
 * @param conn_request  Connection request, interpreted as uct_sockcm_ctx_t.
 *
 * @return Status of ucs_socket_send().
 */
static ucs_status_t uct_sockcm_iface_notify_client(int notif_val,
                                                   uct_conn_request_h conn_request)
{
    char notif = notif_val;
    int fd;

    fd = ((uct_sockcm_ctx_t *) conn_request)->sock_fd;

    return ucs_socket_send(fd, &notif, sizeof(notif), NULL, NULL);
}

/* Accept a connection request by sending UCT_SOCKCM_IFACE_NOTIFY_ACCEPT */
static ucs_status_t uct_sockcm_iface_accept(uct_iface_h tl_iface,
                                            uct_conn_request_h conn_request)
{
    return uct_sockcm_iface_notify_client(UCT_SOCKCM_IFACE_NOTIFY_ACCEPT,
                                          conn_request);
}

/* Reject a connection request by sending UCT_SOCKCM_IFACE_NOTIFY_REJECT */
static ucs_status_t uct_sockcm_iface_reject(uct_iface_h tl_iface,
                                            uct_conn_request_h conn_request)
{
    return uct_sockcm_iface_notify_client(UCT_SOCKCM_IFACE_NOTIFY_REJECT,
                                          conn_request);
}

/**
 * Flush an endpoint.
 *
 * Returns the current endpoint status. If that status is UCS_INPROGRESS and a
 * completion was supplied, the completion is queued on the endpoint's ops
 * queue (under ops_mutex).
 *
 * @return ep->status, or UCS_ERR_NO_MEMORY if the queue element could not be
 *         allocated.
 */
static ucs_status_t uct_sockcm_ep_flush(uct_ep_h tl_ep, unsigned flags,
                                        uct_completion_t *comp)
{
    uct_sockcm_ep_t *ep = ucs_derived_of(tl_ep, uct_sockcm_ep_t);
    ucs_status_t status;
    uct_sockcm_ep_op_t *op;

    pthread_mutex_lock(&ep->ops_mutex);
    status = ep->status;
    if ((status == UCS_INPROGRESS) && (comp != NULL)) {
        op = ucs_malloc(sizeof(*op), "uct_sockcm_ep_flush op");
        if (op != NULL) {
            op->user_comp = comp;
            ucs_queue_push(&ep->ops, &op->queue_elem);
        } else {
            status = UCS_ERR_NO_MEMORY;
        }
    }
    pthread_mutex_unlock(&ep->ops_mutex);

    return status;
}

static uct_iface_ops_t uct_sockcm_iface_ops = {
    .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_sockcm_ep_t),
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_sockcm_ep_t),
    .ep_flush                 = uct_sockcm_ep_flush,
    .ep_fence                 = uct_base_ep_fence,
    .ep_pending_purge         = ucs_empty_function,
    .iface_accept             = uct_sockcm_iface_accept,
    .iface_reject             = uct_sockcm_iface_reject,
    .iface_progress_enable    = (uct_iface_progress_enable_func_t)ucs_empty_function_return_success,
    .iface_progress_disable   = (uct_iface_progress_disable_func_t)ucs_empty_function_return_success,
    .iface_progress           = ucs_empty_function_return_zero,
    .iface_flush              = uct_base_iface_flush,
    .iface_fence              = uct_base_iface_fence,
    .iface_close              = UCS_CLASS_DELETE_FUNC_NAME(uct_sockcm_iface_t),
    .iface_query              = uct_sockcm_iface_query,
    .iface_is_reachable       = (uct_iface_is_reachable_func_t)ucs_empty_function_return_zero,
    .iface_get_device_address = (uct_iface_get_device_address_func_t)ucs_empty_function_return_success,
    .iface_get_address        = uct_sockcm_iface_get_address
};

/**
 * Hand a fully received connection request to the user's conn_request_cb,
 * passing the context itself as the connection request handle.
 *
 * @return Always UCS_OK.
 */
static ucs_status_t uct_sockcm_iface_process_conn_req(uct_sockcm_ctx_t *sock_id_ctx)
{
    uct_sockcm_iface_t      *iface      = sock_id_ctx->iface;
    uct_sockcm_conn_param_t *conn_param = &sock_id_ctx->conn_param;

    ucs_debug("process conn req conn_param = %p, conn_param->length = %ld",
              conn_param, conn_param->length);
    iface->conn_request_cb(&iface->super.super, iface->conn_request_arg,
                           sock_id_ctx, conn_param->private_data,
                           conn_param->length);
    return UCS_OK;
}

/**
 * Async read handler for an accepted socket whose connection parameters did
 * not arrive in full with the first receive.
 *
 * Receives the remaining bytes of uct_sockcm_conn_param_t; once the whole
 * structure is present the connection request is processed. The handler's
 * event mask is then set to 0. The same is done on a receive failure, so a
 * broken socket does not keep invoking this handler.
 *
 * @param fd   File descriptor the handler was registered for.
 * @param arg  The uct_sockcm_ctx_t of the accepted connection.
 */
static void uct_sockcm_iface_recv_handler(int fd, void *arg)
{
    uct_sockcm_ctx_t *sock_id_ctx = (uct_sockcm_ctx_t *) arg;
    ucs_status_t status;
    size_t recv_len;

    /* attempt another receive only if initial receive was not successful */
    recv_len = sizeof(uct_sockcm_conn_param_t) - sock_id_ctx->recv_len;
    if (recv_len == 0) {
        goto out_remove_handler;
    }

    status = ucs_socket_recv_nb(sock_id_ctx->sock_fd,
                                UCS_PTR_BYTE_OFFSET(&sock_id_ctx->conn_param,
                                                    sock_id_ctx->recv_len),
                                &recv_len, NULL, NULL);
    if (status == UCS_ERR_NO_PROGRESS) {
        /* nothing arrived yet; wait for the next read event */
        return;
    }

    if (status != UCS_OK) {
        /* Any other status is a failure: the byte count must not be updated
         * and there is no point in receiving further events on this socket.
         * TODO: clean up resources allocated for client endpoint? The context
         * stays on the iface's used_sock_ids_list until iface cleanup. */
        ucs_warn("recv failed in recv handler");
        goto out_remove_handler;
    }

    sock_id_ctx->recv_len += recv_len;
    if (sock_id_ctx->recv_len != sizeof(uct_sockcm_conn_param_t)) {
        /* handler should be notified when remaining pieces show up */
        return;
    }

    if (UCS_OK != uct_sockcm_iface_process_conn_req(sock_id_ctx)) {
        ucs_error("unable to process connection request");
    }

out_remove_handler:
    status = ucs_async_modify_handler(fd, 0);
    if (status != UCS_OK) {
        ucs_debug("unable to modify handler");
    }
}

/**
 * Async handler of the listening socket: accepts one pending connection and
 * starts receiving the client's connection parameters.
 *
 * If the whole uct_sockcm_conn_param_t is received immediately, the request is
 * processed right away; otherwise uct_sockcm_iface_recv_handler is registered
 * on the accepted socket to collect the rest. The new context is finally
 * appended to iface->used_sock_ids_list.
 *
 * @param fd   File descriptor the handler was registered for (unused; the
 *             iface's listen_fd is used instead).
 * @param arg  The uct_sockcm_iface_t owning the listening socket.
 */
static void uct_sockcm_iface_event_handler(int fd, void *arg)
{
    size_t recv_len               = 0;
    uct_sockcm_iface_t *iface     = arg;
    uct_sockcm_ctx_t *sock_id_ctx = NULL;
    /* sockaddr_storage: plain struct sockaddr is too small for an IPv6 peer,
     * so accept() would truncate the address and formatting it would read
     * past the end of the buffer */
    struct sockaddr_storage peer_addr;
    socklen_t addrlen;
    int accept_fd;
    char ip_port_str[UCS_SOCKADDR_STRING_LEN];
    ucs_status_t status;

    addrlen   = sizeof(peer_addr);
    accept_fd = accept(iface->listen_fd, (struct sockaddr*)&peer_addr,
                       &addrlen);
    if (accept_fd == -1) {
        if ((errno == EAGAIN) || (errno == EINTR)) {
            ucs_debug("accept(fd=%d) failed: %m", iface->listen_fd);
        } else {
            /* accept failed here, let the client try again */
            ucs_warn("accept(fd=%d) failed with non-recoverable error %m",
                     iface->listen_fd);
        }
        return;
    }

    ucs_debug("sockcm_iface %p: accepted connection from %s at fd %d %m", iface,
              ucs_sockaddr_str((const struct sockaddr*)&peer_addr, ip_port_str,
                               UCS_SOCKADDR_STRING_LEN),
              accept_fd);

    /* Unlike rdmacm, socket connect/accept does not permit exchange of
     * connection parameters but we need to use send/recv on top of that
     * We simulate that with an explicit receive */

    sock_id_ctx = ucs_malloc(sizeof(uct_sockcm_ctx_t), "accepted sock_id_ctx");
    if (sock_id_ctx == NULL) {
        ucs_error("sockcm_listener: unable to create mem for accepted fd");
        close(accept_fd);
        return;
    }

    sock_id_ctx->recv_len = 0;
    sock_id_ctx->sock_fd  = accept_fd;
    sock_id_ctx->iface    = iface;

    status = ucs_sys_fcntl_modfl(sock_id_ctx->sock_fd, O_NONBLOCK, 0);
    if (status != UCS_OK) {
        ucs_error("sockcm_listener: unable make accepted fd non-blocking");
        goto err;
    }

    recv_len = sizeof(sock_id_ctx->conn_param);
    status   = ucs_socket_recv_nb(accept_fd, &sock_id_ctx->conn_param,
                                  &recv_len, NULL, NULL);
    if ((status != UCS_OK) && (status != UCS_ERR_NO_PROGRESS)) {
        ucs_error("sockcm_listener: recv failed on accepted fd");
        goto err;
    }

    if (status == UCS_OK) {
        /* may be a short read; the recv handler accounts for the remainder */
        sock_id_ctx->recv_len = recv_len;
    }

    if (sock_id_ctx->recv_len == sizeof(sock_id_ctx->conn_param)) {
        ucs_debug("not assigning recv handler for message from client");
        if (UCS_OK != uct_sockcm_iface_process_conn_req(sock_id_ctx)) {
            ucs_error("Unable to process connection request");
        }
    } else {
        status = ucs_async_set_event_handler(iface->super.worker->async->mode,
                                             sock_id_ctx->sock_fd,
                                             UCS_EVENT_SET_EVREAD,
                                             uct_sockcm_iface_recv_handler,
                                             sock_id_ctx,
                                             iface->super.worker->async);
        if (status != UCS_OK) {
            /* report and drop this connection; the cleanup below must run */
            ucs_error("sockcm_listener: unable to create handler for new connection");
            goto err;
        }
        ucs_debug("assigning recv handler for message from client");
    }

    UCS_ASYNC_BLOCK(iface->super.worker->async);
    ucs_list_add_tail(&iface->used_sock_ids_list, &sock_id_ctx->list);
    UCS_ASYNC_UNBLOCK(iface->super.worker->async);

    return;

err:
    /* defined elsewhere; presumably closes sock_fd and frees the context */
    uct_sockcm_ep_put_sock_id(sock_id_ctx);
    return;
}

/**
 * Interface constructor.
 *
 * In server mode a non-blocking listening socket is created, bound to the
 * requested address and registered with the worker's async context, with
 * uct_sockcm_iface_event_handler as its handler. In client mode nothing but
 * the base class and the sock-id list is initialized.
 *
 * @return UCS_OK on success; UCS_ERR_INVALID_PARAM if the worker has no async
 *         context or the server callback is not flagged UCT_CB_FLAG_ASYNC;
 *         UCS_ERR_BUSY if bind() fails with EADDRINUSE; UCS_ERR_IO_ERROR on
 *         other bind()/listen() failures; otherwise the status of the failing
 *         helper.
 */
static UCS_CLASS_INIT_FUNC(uct_sockcm_iface_t, uct_md_h md, uct_worker_h worker,
                           const uct_iface_params_t *params,
                           const uct_iface_config_t *tl_config)
{
    uct_sockcm_iface_config_t *config = ucs_derived_of(tl_config,
                                                       uct_sockcm_iface_config_t);
    char ip_port_str[UCS_SOCKADDR_STRING_LEN];
    ucs_status_t status;
    struct sockaddr *param_sockaddr;
    int param_sockaddr_len;

    UCT_CHECK_PARAM(params->field_mask & UCT_IFACE_PARAM_FIELD_OPEN_MODE,
                    "UCT_IFACE_PARAM_FIELD_OPEN_MODE is not defined");

    UCT_CHECK_PARAM((params->open_mode & UCT_IFACE_OPEN_MODE_SOCKADDR_SERVER) ||
                    (params->open_mode & UCT_IFACE_OPEN_MODE_SOCKADDR_CLIENT),
                    "Invalid open mode %zu", params->open_mode);

    UCT_CHECK_PARAM(!(params->open_mode & UCT_IFACE_OPEN_MODE_SOCKADDR_SERVER) ||
                    (params->field_mask & UCT_IFACE_PARAM_FIELD_SOCKADDR),
                    "UCT_IFACE_PARAM_FIELD_SOCKADDR is not defined "
                    "for UCT_IFACE_OPEN_MODE_SOCKADDR_SERVER");

    UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t, &uct_sockcm_iface_ops, md, worker,
                              params, tl_config
                              UCS_STATS_ARG((params->field_mask &
                                             UCT_IFACE_PARAM_FIELD_STATS_ROOT) ?
                                            params->stats_root : NULL)
                              UCS_STATS_ARG(UCT_SOCKCM_TL_NAME));

    if (self->super.worker->async == NULL) {
        ucs_error("sockcm must have async != NULL");
        return UCS_ERR_INVALID_PARAM;
    }

    if (self->super.worker->async->mode == UCS_ASYNC_MODE_SIGNAL) {
        ucs_warn("sockcm does not support SIGIO");
    }

    self->listen_fd = -1;

    if (params->open_mode & UCT_IFACE_OPEN_MODE_SOCKADDR_SERVER) {

        /* the connection request callback is invoked from async context */
        if (!(params->mode.sockaddr.cb_flags & UCT_CB_FLAG_ASYNC)) {
            return UCS_ERR_INVALID_PARAM;
        }

        param_sockaddr     = (struct sockaddr *)params->mode.sockaddr.listen_sockaddr.addr;
        param_sockaddr_len = params->mode.sockaddr.listen_sockaddr.addrlen;

        status = ucs_socket_create(param_sockaddr->sa_family, SOCK_STREAM,
                                   &self->listen_fd);
        if (status != UCS_OK) {
            return status;
        }

        status = ucs_sys_fcntl_modfl(self->listen_fd, O_NONBLOCK, 0);
        if (status != UCS_OK) {
            goto err_close_sock;
        }

        if (0 > bind(self->listen_fd, param_sockaddr, param_sockaddr_len)) {
            ucs_error("bind(fd=%d) failed: %m", self->listen_fd);
            status = (errno == EADDRINUSE) ? UCS_ERR_BUSY : UCS_ERR_IO_ERROR;
            goto err_close_sock;
        }

        if (0 > listen(self->listen_fd, config->backlog)) {
            ucs_error("listen(fd=%d; backlog=%d) failed: %m", self->listen_fd,
                      config->backlog);
            status = UCS_ERR_IO_ERROR;
            goto err_close_sock;
        }

        status = ucs_async_set_event_handler(self->super.worker->async->mode,
                                             self->listen_fd,
                                             UCS_EVENT_SET_EVREAD |
                                             UCS_EVENT_SET_EVERR,
                                             uct_sockcm_iface_event_handler,
                                             self, self->super.worker->async);
        if (status != UCS_OK) {
            goto err_close_sock;
        }

        ucs_debug("iface (%p) sockcm id %d listening on %s", self,
                  self->listen_fd,
                  ucs_sockaddr_str(param_sockaddr, ip_port_str,
                                   UCS_SOCKADDR_STRING_LEN));

        self->cb_flags         = params->mode.sockaddr.cb_flags;
        self->conn_request_cb  = params->mode.sockaddr.conn_request_cb;
        self->conn_request_arg = params->mode.sockaddr.conn_request_arg;
        self->is_server        = 1;
    } else {
        self->is_server        = 0;
    }

    ucs_list_head_init(&self->used_sock_ids_list);

    return UCS_OK;

err_close_sock:
    close(self->listen_fd);
    return status;
}

/**
 * Interface destructor: on a server, unregisters and closes the listening
 * socket; then, with the async context blocked, drains used_sock_ids_list,
 * removing each socket's handler and releasing its context.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_sockcm_iface_t)
{
    uct_sockcm_ctx_t *sock_id_ctx;

    if (self->is_server) {
        if (-1 != self->listen_fd) {
            ucs_debug("cleaning listen_fd = %d", self->listen_fd);
            ucs_async_remove_handler(self->listen_fd, 1);
            close(self->listen_fd);
        }
    }

    UCS_ASYNC_BLOCK(self->super.worker->async);

    while (!ucs_list_is_empty(&self->used_sock_ids_list)) {
        sock_id_ctx = ucs_list_extract_head(&self->used_sock_ids_list,
                                            uct_sockcm_ctx_t, list);
        ucs_debug("cleaning server fd = %d", sock_id_ctx->sock_fd);
        ucs_async_remove_handler(sock_id_ctx->sock_fd, 1);
        uct_sockcm_ep_put_sock_id(sock_id_ctx);
    }

    UCS_ASYNC_UNBLOCK(self->super.worker->async);
}

UCS_CLASS_DEFINE(uct_sockcm_iface_t, uct_base_iface_t);
static UCS_CLASS_DEFINE_NEW_FUNC(uct_sockcm_iface_t, uct_iface_t, uct_md_h,
                                 uct_worker_h, const uct_iface_params_t *,
                                 const uct_iface_config_t *);
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_sockcm_iface_t, uct_iface_t);

/* Device query for this transport: always reports zero devices */
static ucs_status_t
uct_sockcm_query_tl_devices(uct_md_h md,
                            uct_tl_device_resource_t **tl_devices_p,
                            unsigned *num_tl_devices_p)
{
    *num_tl_devices_p = 0;
    *tl_devices_p     = NULL;
    return UCS_OK;
}

UCT_TL_DEFINE(&uct_sockcm_component, sockcm, uct_sockcm_query_tl_devices,
              uct_sockcm_iface_t, "SOCKCM_", uct_sockcm_iface_config_table,
              uct_sockcm_iface_config_t);