/**
* Copyright (C) Mellanox Technologies Ltd. 2019. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
#include "tcp.h"
#include <ucs/async/async.h>
void uct_tcp_cm_change_conn_state(uct_tcp_ep_t *ep,
uct_tcp_ep_conn_state_t new_conn_state)
{
int full_log = 1;
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
char str_local_addr[UCS_SOCKADDR_STRING_LEN];
char str_remote_addr[UCS_SOCKADDR_STRING_LEN];
char str_ctx_caps[UCT_TCP_EP_CTX_CAPS_STR_MAX];
uct_tcp_ep_conn_state_t old_conn_state;
old_conn_state = ep->conn_state;
ep->conn_state = new_conn_state;
switch(ep->conn_state) {
case UCT_TCP_EP_CONN_STATE_CONNECTING:
case UCT_TCP_EP_CONN_STATE_WAITING_ACK:
if (old_conn_state == UCT_TCP_EP_CONN_STATE_CLOSED) {
uct_tcp_iface_outstanding_inc(iface);
} else {
ucs_assert((ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING));
}
break;
case UCT_TCP_EP_CONN_STATE_WAITING_REQ:
ucs_assert(old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK);
break;
case UCT_TCP_EP_CONN_STATE_CONNECTED:
ucs_assert((old_conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_ACCEPTING) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_REQ));
if ((old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_REQ) ||
/* It may happen when a peer is going to use this EP with socket
* from accepted connection in case of handling simultaneous
* connection establishment */
(old_conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING)) {
uct_tcp_iface_outstanding_dec(iface);
}
if (ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) {
/* Progress possibly pending TX operations */
uct_tcp_ep_pending_queue_dispatch(ep);
}
break;
case UCT_TCP_EP_CONN_STATE_CLOSED:
ucs_assert(old_conn_state != UCT_TCP_EP_CONN_STATE_CLOSED);
if ((old_conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_WAITING_REQ)) {
uct_tcp_iface_outstanding_dec(iface);
} else if ((old_conn_state == UCT_TCP_EP_CONN_STATE_ACCEPTING) ||
(old_conn_state == UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER)) {
/* Since ep::peer_addr is 0'ed, we have to print w/o peer's address */
full_log = 0;
}
break;
default:
ucs_assert((ep->conn_state == UCT_TCP_EP_CONN_STATE_ACCEPTING) ||
(ep->conn_state == UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER));
/* Since ep::peer_addr is 0'ed and client's <address:port>
* has already been logged, print w/o peer's address */
full_log = 0;
break;
}
if (full_log) {
ucs_debug("tcp_ep %p: %s -> %s for the [%s]<->[%s] connection %s",
ep, uct_tcp_ep_cm_state[old_conn_state].name,
uct_tcp_ep_cm_state[ep->conn_state].name,
ucs_sockaddr_str((const struct sockaddr*)&iface->config.ifaddr,
str_local_addr, UCS_SOCKADDR_STRING_LEN),
ucs_sockaddr_str((const struct sockaddr*)&ep->peer_addr,
str_remote_addr, UCS_SOCKADDR_STRING_LEN),
uct_tcp_ep_ctx_caps_str(ep->ctx_caps, str_ctx_caps));
} else {
ucs_debug("tcp_ep %p: %s -> %s",
ep, uct_tcp_ep_cm_state[old_conn_state].name,
uct_tcp_ep_cm_state[ep->conn_state].name);
}
}
static ucs_status_t uct_tcp_cm_io_err_handler_cb(void *arg, int io_errno)
{
return uct_tcp_ep_handle_dropped_connect((uct_tcp_ep_t*)arg,
io_errno);
}
/* `fmt_str` parameter has to contain "%s" to write event type */
static void uct_tcp_cm_trace_conn_pkt(const uct_tcp_ep_t *ep,
ucs_log_level_t log_level,
const char *fmt_str,
uct_tcp_cm_conn_event_t event)
{
char event_str[64] = { 0 };
char str_addr[UCS_SOCKADDR_STRING_LEN], msg[128], *p;
p = event_str;
if (event & UCT_TCP_CM_CONN_REQ) {
ucs_snprintf_zero(event_str, sizeof(event_str), "%s",
UCS_PP_MAKE_STRING(UCT_TCP_CM_CONN_REQ));
p += strlen(event_str);
}
if (event & UCT_TCP_CM_CONN_WAIT_REQ) {
ucs_assert(p == event_str);
ucs_snprintf_zero(event_str, sizeof(event_str), "%s",
UCS_PP_MAKE_STRING(UCT_TCP_CM_CONN_WAIT_REQ));
p += strlen(event_str);
}
if (event & UCT_TCP_CM_CONN_ACK) {
if (p != event_str) {
ucs_snprintf_zero(p, sizeof(event_str) - (p - event_str), " | ");
p += strlen(p);
}
ucs_snprintf_zero(p, sizeof(event_str) - (p - event_str), "%s",
UCS_PP_MAKE_STRING(UCT_TCP_CM_CONN_ACK));
p += strlen(event_str);
}
if (event_str == p) {
ucs_snprintf_zero(event_str, sizeof(event_str), "UNKNOWN (%d)", event);
log_level = UCS_LOG_LEVEL_ERROR;
}
ucs_snprintf_zero(msg, sizeof(msg), fmt_str, event_str);
ucs_log(log_level, "tcp_ep %p: %s %s", ep, msg,
ucs_sockaddr_str((const struct sockaddr*)&ep->peer_addr,
str_addr, UCS_SOCKADDR_STRING_LEN));
}
ucs_status_t uct_tcp_cm_send_event(uct_tcp_ep_t *ep, uct_tcp_cm_conn_event_t event)
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
size_t magic_number_length = 0;
void *pkt_buf;
size_t pkt_length, cm_pkt_length;
uct_tcp_cm_conn_req_pkt_t *conn_pkt;
uct_tcp_cm_conn_event_t *pkt_event;
uct_tcp_am_hdr_t *pkt_hdr;
ucs_status_t status;
ucs_assertv(!(event & ~(UCT_TCP_CM_CONN_REQ |
UCT_TCP_CM_CONN_ACK |
UCT_TCP_CM_CONN_WAIT_REQ)),
"ep=%p", ep);
ucs_assertv(!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)) ||
(ep->conn_state != UCT_TCP_EP_CONN_STATE_CONNECTED),
"ep=%p", ep);
pkt_length = sizeof(*pkt_hdr);
if (event == UCT_TCP_CM_CONN_REQ) {
cm_pkt_length = sizeof(*conn_pkt);
if (ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) {
magic_number_length = sizeof(uint64_t);
}
} else {
cm_pkt_length = sizeof(event);
}
pkt_length += cm_pkt_length + magic_number_length;
pkt_buf = ucs_alloca(pkt_length);
pkt_hdr = (uct_tcp_am_hdr_t*)(UCS_PTR_BYTE_OFFSET(pkt_buf,
magic_number_length));
pkt_hdr->am_id = UCT_AM_ID_MAX;
pkt_hdr->length = cm_pkt_length;
if (event == UCT_TCP_CM_CONN_REQ) {
if (ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) {
ucs_assert(magic_number_length == sizeof(uint64_t));
*(uint64_t*)pkt_buf = UCT_TCP_MAGIC_NUMBER;
}
conn_pkt = (uct_tcp_cm_conn_req_pkt_t*)(pkt_hdr + 1);
conn_pkt->event = UCT_TCP_CM_CONN_REQ;
conn_pkt->iface_addr = iface->config.ifaddr;
} else {
pkt_event = (uct_tcp_cm_conn_event_t*)(pkt_hdr + 1);
*pkt_event = event;
}
status = ucs_socket_send(ep->fd, pkt_buf, pkt_length,
uct_tcp_cm_io_err_handler_cb, ep);
if (status != UCS_OK) {
uct_tcp_cm_trace_conn_pkt(ep, UCS_LOG_LEVEL_ERROR,
"unable to send %s to", event);
} else {
uct_tcp_cm_trace_conn_pkt(ep, UCS_LOG_LEVEL_TRACE,
"%s sent to", event);
}
return status;
}
ucs_status_t uct_tcp_cm_add_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep)
{
ucs_list_link_t *ep_list;
khiter_t iter;
int ret;
iter = kh_get(uct_tcp_cm_eps, &iface->ep_cm_map, ep->peer_addr);
if (iter == kh_end(&iface->ep_cm_map)) {
ep_list = ucs_malloc(sizeof(*ep_list), "tcp_ep_cm_map_entry");
if (ep_list == NULL) {
return UCS_ERR_NO_MEMORY;
}
ucs_list_head_init(ep_list);
iter = kh_put(uct_tcp_cm_eps, &iface->ep_cm_map, ep->peer_addr, &ret);
kh_value(&iface->ep_cm_map, iter) = ep_list;
ucs_debug("tcp_iface %p: %p list added to map", iface, ep_list);
} else {
ep_list = kh_value(&iface->ep_cm_map, iter);
ucs_assertv(!ucs_list_is_empty(ep_list), "iface=%p", iface);
}
uct_tcp_iface_remove_ep(ep);
ucs_list_add_tail(ep_list, &ep->list);
ucs_debug("tcp_iface %p: tcp_ep %p added to %p list",
iface, ep, ep_list);
return UCS_OK;
}
void uct_tcp_cm_remove_ep(uct_tcp_iface_t *iface, uct_tcp_ep_t *ep)
{
ucs_list_link_t *ep_list;
khiter_t iter;
iter = kh_get(uct_tcp_cm_eps, &iface->ep_cm_map, ep->peer_addr);
ucs_assertv(iter != kh_end(&iface->ep_cm_map), "iface=%p", iface);
ep_list = kh_value(&iface->ep_cm_map, iter);
ucs_assertv(!ucs_list_is_empty(ep_list), "iface=%p", iface);
ucs_list_del(&ep->list);
ucs_debug("tcp_iface %p: tcp_ep %p removed from %p list",
iface, ep, ep_list);
uct_tcp_iface_add_ep(ep);
if (ucs_list_is_empty(ep_list)) {
kh_del(uct_tcp_cm_eps, &iface->ep_cm_map, iter);
ucs_debug("tcp_iface %p: %p list removed from map",
iface, ep_list);
ucs_free(ep_list);
}
}
uct_tcp_ep_t *uct_tcp_cm_search_ep(uct_tcp_iface_t *iface,
const struct sockaddr_in *peer_addr,
uct_tcp_ep_ctx_type_t with_ctx_type)
{
uct_tcp_ep_t *ep;
ucs_list_link_t *ep_list;
khiter_t iter;
iter = kh_get(uct_tcp_cm_eps, &iface->ep_cm_map, *peer_addr);
if (iter != kh_end(&iface->ep_cm_map)) {
ep_list = kh_value(&iface->ep_cm_map, iter);
ucs_assertv(!ucs_list_is_empty(ep_list), "iface=%p", iface);
ucs_list_for_each(ep, ep_list, list) {
if (ep->ctx_caps & UCS_BIT(with_ctx_type)) {
return ep;
}
}
}
return NULL;
}
void uct_tcp_cm_purge_ep(uct_tcp_ep_t *ep)
{
/* Move from a khash's EP list to iface's EP list */
ucs_list_del(&ep->list);
uct_tcp_ep_change_ctx_caps(ep, 0);
uct_tcp_iface_add_ep(ep);
}
static unsigned
uct_tcp_cm_simult_conn_accept_remote_conn(uct_tcp_ep_t *accept_ep,
uct_tcp_ep_t *connect_ep)
{
uct_tcp_cm_conn_event_t event;
ucs_status_t status;
/* 1. Close the allocated socket `fd` to avoid reading any
* events for this socket and assign the socket `fd` returned
* from `accept()` to the found EP */
uct_tcp_ep_mod_events(connect_ep, 0, connect_ep->events);
ucs_assertv(connect_ep->events == 0,
"Requested epoll events must be 0-ed for ep=%p", connect_ep);
close(connect_ep->fd);
connect_ep->fd = accept_ep->fd;
/* 2. Migrate RX from the EP allocated during accepting connection to
* the found EP */
status = uct_tcp_ep_move_ctx_cap(accept_ep, connect_ep,
UCT_TCP_EP_CTX_TYPE_RX);
if (status != UCS_OK) {
return 0;
}
/* 3. The EP allocated during accepting connection has to be destroyed
* upon return from this function (set its socket `fd` to -1 prior
* to avoid closing this socket) */
uct_tcp_ep_mod_events(accept_ep, 0, UCS_EVENT_SET_EVREAD);
accept_ep->fd = -1;
accept_ep = NULL;
/* 4. Send ACK to the peer */
event = UCT_TCP_CM_CONN_ACK;
/* 5. - If found EP is still connecting, tie REQ with ACK and send
* it to the peer using new socket fd to ensure that the peer
* will be able to receive the data from us
* - If found EP is waiting ACK, tie WAIT_REQ with ACK and send
* it to the peer using new socket fd to ensure that the peer
* will wait for REQ and after receiving the REQ, peer will
* be able to receive the data from us */
if (connect_ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTING) {
event |= UCT_TCP_CM_CONN_REQ;
} else if (connect_ep->conn_state == UCT_TCP_EP_CONN_STATE_WAITING_ACK) {
event |= UCT_TCP_CM_CONN_WAIT_REQ;
}
status = uct_tcp_cm_send_event(connect_ep, event);
if (status != UCS_OK) {
return 0;
}
/* 6. Now fully connected to the peer */
uct_tcp_ep_mod_events(connect_ep, UCS_EVENT_SET_EVREAD, 0);
uct_tcp_cm_change_conn_state(connect_ep, UCT_TCP_EP_CONN_STATE_CONNECTED);
return 1;
}
static unsigned uct_tcp_cm_handle_simult_conn(uct_tcp_iface_t *iface,
uct_tcp_ep_t *accept_ep,
uct_tcp_ep_t *connect_ep)
{
int accept_conn = 0;
unsigned progress_count = 0;
ucs_status_t status;
int cmp;
if ((connect_ep->conn_state != UCT_TCP_EP_CONN_STATE_CONNECTED) &&
(connect_ep->conn_state != UCT_TCP_EP_CONN_STATE_WAITING_REQ)) {
cmp = ucs_sockaddr_cmp((const struct sockaddr*)&connect_ep->peer_addr,
(const struct sockaddr*)&iface->config.ifaddr,
&status);
if (status != UCS_OK) {
return 0;
}
/* Accept connection from a peer if our iface
* address is greater than peer's one */
accept_conn = (cmp < 0);
}
if (!accept_conn) {
/* Migrate RX from the EP allocated during accepting connection to
* the found EP. */
status = uct_tcp_ep_move_ctx_cap(accept_ep, connect_ep,
UCT_TCP_EP_CTX_TYPE_RX);
if (status != UCS_OK) {
return 0;
}
if (connect_ep->conn_state == UCT_TCP_EP_CONN_STATE_WAITING_REQ) {
uct_tcp_cm_change_conn_state(connect_ep, UCT_TCP_EP_CONN_STATE_CONNECTED);
}
uct_tcp_ep_mod_events(connect_ep, UCS_EVENT_SET_EVREAD, 0);
} else /* our iface address less than remote && we are not connected */ {
/* Accept the remote connection and close the current one */
ucs_assertv(cmp != 0, "peer addresses for accepted tcp_ep %p and "
"found tcp_ep %p mustn't be equal", accept_ep, connect_ep);
progress_count = uct_tcp_cm_simult_conn_accept_remote_conn(accept_ep,
connect_ep);
}
return progress_count;
}
static unsigned
uct_tcp_cm_handle_conn_req(uct_tcp_ep_t **ep_p,
const uct_tcp_cm_conn_req_pkt_t *cm_req_pkt)
{
uct_tcp_ep_t *ep = *ep_p;
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
unsigned progress_count = 0;
ucs_status_t status;
uct_tcp_ep_t *peer_ep;
ep->peer_addr = cm_req_pkt->iface_addr;
uct_tcp_cm_trace_conn_pkt(ep, UCS_LOG_LEVEL_TRACE,
"%s received from", UCT_TCP_CM_CONN_REQ);
status = uct_tcp_ep_add_ctx_cap(ep, UCT_TCP_EP_CTX_TYPE_RX);
if (status != UCS_OK) {
goto out;
}
if (ep->conn_state == UCT_TCP_EP_CONN_STATE_CONNECTED) {
return 0;
}
ucs_assertv(!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)),
"ep %p mustn't have TX cap", ep);
if (!uct_tcp_ep_is_self(ep) &&
(peer_ep = uct_tcp_cm_search_ep(iface, &ep->peer_addr,
UCT_TCP_EP_CTX_TYPE_TX))) {
progress_count = uct_tcp_cm_handle_simult_conn(iface, ep, peer_ep);
ucs_assert(!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX)));
goto out;
} else {
/* Just accept this connection and make it operational for RX events */
status = uct_tcp_cm_send_event(ep, UCT_TCP_CM_CONN_ACK);
if (status != UCS_OK) {
goto out;
}
uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_CONNECTED);
progress_count = 1;
}
return progress_count;
out:
if (!(ep->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_TX))) {
uct_tcp_ep_destroy_internal(&ep->super.super);
*ep_p = NULL;
}
return progress_count;
}
void uct_tcp_cm_handle_conn_ack(uct_tcp_ep_t *ep, uct_tcp_cm_conn_event_t cm_event,
uct_tcp_ep_conn_state_t new_conn_state)
{
uct_tcp_cm_trace_conn_pkt(ep, UCS_LOG_LEVEL_TRACE,
"%s received from", cm_event);
if (ep->conn_state != new_conn_state) {
uct_tcp_cm_change_conn_state(ep, new_conn_state);
}
}
unsigned uct_tcp_cm_handle_conn_pkt(uct_tcp_ep_t **ep_p, void *pkt, uint32_t length)
{
ucs_status_t status;
uct_tcp_cm_conn_event_t cm_event;
uct_tcp_cm_conn_req_pkt_t *cm_req_pkt;
uct_tcp_ep_conn_state_t new_conn_state;
ucs_assertv(length >= sizeof(cm_event), "ep=%p", *ep_p);
cm_event = *((uct_tcp_cm_conn_event_t*)pkt);
switch (cm_event) {
case UCT_TCP_CM_CONN_REQ:
/* Don't trace received CM packet here, because
* EP doesn't contain the peer address */
ucs_assertv(length == sizeof(*cm_req_pkt), "ep=%p", *ep_p);
cm_req_pkt = (uct_tcp_cm_conn_req_pkt_t*)pkt;
return uct_tcp_cm_handle_conn_req(ep_p, cm_req_pkt);
case UCT_TCP_CM_CONN_ACK_WITH_WAIT_REQ:
if (!((*ep_p)->ctx_caps & UCS_BIT(UCT_TCP_EP_CTX_TYPE_RX))) {
new_conn_state = UCT_TCP_EP_CONN_STATE_WAITING_REQ;
} else {
new_conn_state = UCT_TCP_EP_CONN_STATE_CONNECTED;
}
uct_tcp_cm_handle_conn_ack(*ep_p, cm_event, new_conn_state);
return 0;
case UCT_TCP_CM_CONN_ACK_WITH_REQ:
status = uct_tcp_ep_add_ctx_cap(*ep_p, UCT_TCP_EP_CTX_TYPE_RX);
if (status != UCS_OK) {
return 0;
}
/* fall through */
case UCT_TCP_CM_CONN_ACK:
uct_tcp_cm_handle_conn_ack(*ep_p, cm_event,
UCT_TCP_EP_CONN_STATE_CONNECTED);
return 0;
case UCT_TCP_CM_CONN_WAIT_REQ:
ucs_error("tcp_ep %p: CM event for waiting REQ (%d) "
"must be sent along with ACK", *ep_p, cm_event);
return 0;
}
ucs_error("tcp_ep %p: unknown CM event received %d", *ep_p, cm_event);
return 0;
}
static ucs_status_t uct_tcp_cm_conn_complete(uct_tcp_ep_t *ep,
unsigned *progress_count_p)
{
ucs_status_t status;
status = uct_tcp_cm_send_event(ep, UCT_TCP_CM_CONN_REQ);
if (status != UCS_OK) {
goto out;
}
uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_WAITING_ACK);
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVREAD, 0);
ucs_assertv((ep->tx.length == 0) && (ep->tx.offset == 0) &&
(ep->tx.buf == NULL), "ep=%p", ep);
out:
if (progress_count_p != NULL) {
*progress_count_p = (status == UCS_OK);
}
return status;
}
unsigned uct_tcp_cm_conn_progress(uct_tcp_ep_t *ep)
{
unsigned progress_count;
if (!ucs_socket_is_connected(ep->fd)) {
ucs_error("tcp_ep %p: connection establishment for "
"socket fd %d was unsuccessful", ep, ep->fd);
goto err;
}
uct_tcp_cm_conn_complete(ep, &progress_count);
return progress_count;
err:
uct_tcp_ep_set_failed(ep);
return 0;
}
ucs_status_t uct_tcp_cm_conn_start(uct_tcp_ep_t *ep)
{
uct_tcp_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_tcp_iface_t);
ucs_status_t status;
if (ep->conn_retries++ > iface->config.max_conn_retries) {
ucs_error("tcp_ep %p: reached maximum number of connection retries "
"(%u)", ep, iface->config.max_conn_retries);
return UCS_ERR_TIMED_OUT;
}
uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_CONNECTING);
status = ucs_socket_connect(ep->fd, (const struct sockaddr*)&ep->peer_addr);
if (UCS_STATUS_IS_ERR(status)) {
return status;
} else if (status == UCS_INPROGRESS) {
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVWRITE, 0);
return UCS_OK;
}
ucs_assert(status == UCS_OK);
if (!iface->config.conn_nb) {
status = ucs_sys_fcntl_modfl(ep->fd, O_NONBLOCK, 0);
if (status != UCS_OK) {
return status;
}
}
return uct_tcp_cm_conn_complete(ep, NULL);
}
/* This function is called from async thread */
ucs_status_t uct_tcp_cm_handle_incoming_conn(uct_tcp_iface_t *iface,
const struct sockaddr_in *peer_addr,
int fd)
{
char str_local_addr[UCS_SOCKADDR_STRING_LEN];
char str_remote_addr[UCS_SOCKADDR_STRING_LEN];
ucs_status_t status;
uct_tcp_ep_t *ep;
if (!ucs_socket_is_connected(fd)) {
ucs_warn("tcp_iface %p: connection establishment for socket fd %d "
"from %s to %s was unsuccessful", iface, fd,
ucs_sockaddr_str((const struct sockaddr*)&peer_addr,
str_remote_addr, UCS_SOCKADDR_STRING_LEN),
ucs_sockaddr_str((const struct sockaddr*)&iface->config.ifaddr,
str_local_addr, UCS_SOCKADDR_STRING_LEN));
return UCS_ERR_UNREACHABLE;
}
status = uct_tcp_ep_init(iface, fd, NULL, &ep);
if (status != UCS_OK) {
return status;
}
uct_tcp_cm_change_conn_state(ep, UCT_TCP_EP_CONN_STATE_RECV_MAGIC_NUMBER);
uct_tcp_ep_mod_events(ep, UCS_EVENT_SET_EVREAD, 0);
ucs_debug("tcp_iface %p: accepted connection from "
"%s on %s to tcp_ep %p (fd %d)", iface,
ucs_sockaddr_str((const struct sockaddr*)peer_addr,
str_remote_addr, UCS_SOCKADDR_STRING_LEN),
ucs_sockaddr_str((const struct sockaddr*)&iface->config.ifaddr,
str_local_addr, UCS_SOCKADDR_STRING_LEN),
ep, fd);
return UCS_OK;
}