Blob Blame History Raw
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "wireup.h"
#include "address.h"
#include "wireup_cm.h"
#include "wireup_ep.h"

#include <ucp/core/ucp_ep.h>
#include <ucp/core/ucp_request.inl>
#include <ucp/core/ucp_proxy_ep.h>
#include <ucp/core/ucp_worker.h>
#include <ucp/core/ucp_listener.h>
#include <ucp/tag/eager.h>
#include <ucs/async/async.h>
#include <ucs/datastruct/queue.h>

/*
 * Description of the protocol in UCX wiki:
 * https://github.com/openucx/ucx/wiki/Connection-establishment
 */

static size_t ucp_wireup_msg_pack(void *dest, void *arg)
{
    ucp_request_t *req = arg;
    *(ucp_wireup_msg_t*)dest = req->send.wireup;
    memcpy((ucp_wireup_msg_t*)dest + 1, req->send.buffer, req->send.length);
    return sizeof(ucp_wireup_msg_t) + req->send.length;
}

static const char* ucp_wireup_msg_str(uint8_t msg_type)
{
    switch (msg_type) {
    case UCP_WIREUP_MSG_PRE_REQUEST:
        return "PRE_REQ";
    case UCP_WIREUP_MSG_REQUEST:
        return "REQ";
    case UCP_WIREUP_MSG_REPLY:
        return "REP";
    case UCP_WIREUP_MSG_ACK:
        return "ACK";
    default:
        return "<unknown>";
    }
}

static ucp_lane_index_t ucp_wireup_get_msg_lane(ucp_ep_h ep, uint8_t msg_type)
{
    ucp_context_h   context           = ep->worker->context;
    ucp_ep_config_t *ep_config        = ucp_ep_config(ep);
    ucp_lane_index_t lane             = UCP_NULL_LANE;

    if (msg_type != UCP_WIREUP_MSG_ACK) {
        /* for request/response, try wireup_lane first */
        lane = ep_config->key.wireup_lane;
    }

    if (lane == UCP_NULL_LANE) {
        /* fallback to active messages lane */
        lane = ep_config->key.am_lane;
    }

    if (lane == UCP_NULL_LANE) {
        ucs_fatal("ep %p to %s: could not find a lane to send CONN_%s%s",
                  ep, ucp_ep_peer_name(ep), ucp_wireup_msg_str(msg_type),
                  context->config.ext.unified_mode ?
                  ". try to set UCX_UNIFIED_MODE=n." : "");
    }

    return lane;
}

ucs_status_t ucp_wireup_msg_progress(uct_pending_req_t *self)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
    ucp_ep_h ep = req->send.ep;
    ssize_t packed_len;
    unsigned am_flags;

    if (req->send.wireup.type == UCP_WIREUP_MSG_REQUEST) {
        if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
            ucs_trace("ep %p: not sending wireup message - remote already connected",
                      ep);
            goto out;
        }
    } else if (req->send.wireup.type == UCP_WIREUP_MSG_PRE_REQUEST) {
        ucs_assert (!(ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED));
    }

    /* send the active message */
    req->send.lane = ucp_wireup_get_msg_lane(ep, req->send.wireup.type);

    am_flags = 0;
    if ((req->send.wireup.type == UCP_WIREUP_MSG_REQUEST) ||
        (req->send.wireup.type == UCP_WIREUP_MSG_PRE_REQUEST)) {
        am_flags |= UCT_SEND_FLAG_SIGNALED;
    }

    VALGRIND_CHECK_MEM_IS_DEFINED(&req->send.wireup, sizeof(req->send.wireup));
    VALGRIND_CHECK_MEM_IS_DEFINED(req->send.buffer, req->send.length);

    packed_len = uct_ep_am_bcopy(ep->uct_eps[req->send.lane], UCP_AM_ID_WIREUP,
                                 ucp_wireup_msg_pack, req, am_flags);
    if (packed_len < 0) {
        if (packed_len != UCS_ERR_NO_RESOURCE) {
            ucs_error("failed to send wireup: %s",
                      ucs_status_string((ucs_status_t)packed_len));
        }
        return (ucs_status_t)packed_len;
    }

    switch (req->send.wireup.type) {
    case UCP_WIREUP_MSG_PRE_REQUEST:
        ep->flags |= UCP_EP_FLAG_CONNECT_PRE_REQ_SENT;
        break;
    case UCP_WIREUP_MSG_REQUEST:
        ep->flags |= UCP_EP_FLAG_CONNECT_REQ_SENT;
        break;
    case UCP_WIREUP_MSG_REPLY:
        ep->flags |= UCP_EP_FLAG_CONNECT_REP_SENT;
        break;
    case UCP_WIREUP_MSG_ACK:
        ep->flags |= UCP_EP_FLAG_CONNECT_ACK_SENT;
        break;
    }

out:
    ucs_free((void*)req->send.buffer);
    ucs_free(req);
    return UCS_OK;
}

static inline int ucp_wireup_is_ep_needed(ucp_ep_h ep)
{
    return (ep != NULL) && !(ep->flags & UCP_EP_FLAG_LISTENER);
}

/*
 * @param [in] rsc_tli  Resource index for every lane.
 */
static ucs_status_t
ucp_wireup_msg_send(ucp_ep_h ep, uint8_t type, uint64_t tl_bitmap,
                    const ucp_lane_index_t *lanes2remote)
{
    ucp_request_t* req;
    ucs_status_t status;
    void *address;

    ucs_assert(ep->cfg_index != (uint8_t)-1);

    /* We cannot allocate from memory pool because it's not thread safe
     * and this function may be called from any thread
     */
    req = ucs_malloc(sizeof(*req), "wireup_msg_req");
    if (req == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    req->flags                   = 0;
    req->send.ep                 = ep;
    req->send.wireup.type        = type;
    req->send.wireup.err_mode    = ucp_ep_config(ep)->key.err_mode;
    req->send.wireup.conn_sn     = ep->conn_sn;
    req->send.wireup.src_ep_ptr  = (uintptr_t)ep;
    if (ep->flags & UCP_EP_FLAG_DEST_EP) {
        req->send.wireup.dest_ep_ptr = ucp_ep_dest_ep_ptr(ep);
    } else {
        req->send.wireup.dest_ep_ptr = 0;
    }

    req->send.uct.func           = ucp_wireup_msg_progress;
    req->send.datatype           = ucp_dt_make_contig(1);
    ucp_request_send_state_init(req, ucp_dt_make_contig(1), 0);

    /* pack all addresses */
    status = ucp_address_pack(ep->worker,
                              ucp_wireup_is_ep_needed(ep) ? ep : NULL,
                              tl_bitmap, UCP_ADDRESS_PACK_FLAG_ALL,
                              lanes2remote, &req->send.length, &address);
    if (status != UCS_OK) {
        ucs_free(req);
        ucs_error("failed to pack address: %s", ucs_status_string(status));
        return status;
    }

    req->send.buffer = address;

    ucp_request_send(req, 0);
    return UCS_OK;
}

static uint64_t ucp_wireup_get_ep_tl_bitmap(ucp_ep_h ep, ucp_lane_map_t lane_map)
{
    uint64_t         tl_bitmap = 0;
    ucp_lane_index_t lane;

    ucs_for_each_bit(lane, lane_map) {
        ucs_assert(lane < UCP_MAX_LANES);
        tl_bitmap |= UCS_BIT(ucp_ep_get_rsc_index(ep, lane));
    }

    return tl_bitmap;
}

/*
 * Select remote ep address for every remote address entry (because there
 * could be multiple ep addresses per entry). This selection is used to create
 * 'lanes2remote' mapping with the remote lane index for each local lane.
 */
static void
ucp_wireup_match_p2p_lanes(ucp_ep_h ep,
                           const ucp_unpacked_address_t *remote_address,
                           const unsigned *addr_indices,
                           ucp_lane_index_t *lanes2remote)
{
    const ucp_address_entry_t *address;
    unsigned address_index;
    ucp_lane_index_t lane, remote_lane;
    unsigned *ep_addr_indexes;
    unsigned ep_addr_index;
    uint64_t UCS_V_UNUSED used_remote_lanes;

    /* Initialize the counters of ep address index for each address entry */
    ep_addr_indexes = ucs_alloca(sizeof(ep_addr_index) *
                                 remote_address->address_count);
    for (address_index = 0; address_index < remote_address->address_count;
         ++address_index) {
        ep_addr_indexes[address_index] = 0;
    }

    /* Initialize lanes2remote array */
    for (lane = 0; lane < UCP_MAX_LANES; ++lane) {
        lanes2remote[lane] = UCP_NULL_LANE;
    }

    used_remote_lanes = 0;
    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (!ucp_ep_is_lane_p2p(ep, lane)) {
            continue;
        }

        /* Select next remote ep address within the address_index as specified
         * by addr_indices argument
         */
        address_index      = addr_indices[lane];
        address            = &remote_address->address_list[address_index];
        ep_addr_index      = ep_addr_indexes[address_index]++;
        remote_lane        = address->ep_addrs[ep_addr_index].lane;
        lanes2remote[lane] = remote_lane;

        if (used_remote_lanes & UCS_BIT(remote_lane)) {
            ucs_fatal("ep %p: remote lane %d is used more than once", ep,
                      remote_lane);
        }
        used_remote_lanes |= UCS_BIT(remote_lane);

        ucs_trace("ep %p: lane[%d]->remote_lane[%d] (address[%d].ep_address[%d])",
                  ep, lane, remote_lane, address_index, ep_addr_index);
    }
}

static ucs_status_t
ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane,
                               const ucp_unpacked_address_t *remote_address,
                               const uct_ep_addr_t **ep_addr_p,
                               const uct_device_addr_t **dev_addr_p)
{
    const ucp_address_entry_t *address;
    unsigned ep_addr_index;

    ucp_unpacked_address_for_each(address, remote_address) {
        for (ep_addr_index = 0; ep_addr_index < address->num_ep_addrs;
             ++ep_addr_index) {
            if (remote_lane == address->ep_addrs[ep_addr_index].lane) {
                *ep_addr_p  = address->ep_addrs[ep_addr_index].addr;
                *dev_addr_p = address->dev_addr;
                return UCS_OK;
            }
        }
    }

    return UCS_ERR_UNREACHABLE;
}

ucs_status_t
ucp_wireup_connect_local(ucp_ep_h ep,
                         const ucp_unpacked_address_t *remote_address,
                         const ucp_lane_index_t *lanes2remote)
{
    ucp_lane_index_t lane, remote_lane;
    const uct_device_addr_t *dev_addr;
    const uct_ep_addr_t *ep_addr;
    ucs_status_t status;

    ucs_trace("ep %p: connect local transports", ep);

    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (!ucp_ep_is_lane_p2p(ep, lane)) {
            continue;
        }

        remote_lane = (lanes2remote == NULL) ? lane : lanes2remote[lane];

        status = ucp_wireup_find_remote_p2p_addr(ep, remote_lane, remote_address,
                                                 &ep_addr, &dev_addr);
        if (status != UCS_OK) {
            ucs_error("ep %p: no remote ep address for lane[%d]->remote_lane[%d]",
                      ep, lane, remote_lane);
           return status;
        }

        status = uct_ep_connect_to_ep(ep->uct_eps[lane], dev_addr, ep_addr);
        if (status != UCS_OK) {
            return status;
        }
    }

    return UCS_OK;
}

void ucp_wireup_remote_connected(ucp_ep_h ep)
{
    ucp_lane_index_t lane;

    if (ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED) {
        return;
    }

    ucs_trace("ep %p: remote connected", ep);
    ep->flags |= UCP_EP_FLAG_REMOTE_CONNECTED;

    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (ucp_ep_is_lane_p2p(ep, lane)) {
            ucs_assert(ucp_wireup_ep_test(ep->uct_eps[lane]));
        }
        if (ucp_wireup_ep_test(ep->uct_eps[lane])) {
            ucp_wireup_ep_remote_connected(ep->uct_eps[lane]);
        }
    }

    ucs_assert(ep->flags & UCP_EP_FLAG_DEST_EP);
}


static ucs_status_t
ucp_wireup_init_lanes_by_request(ucp_worker_h worker, ucp_ep_h ep,
                                 unsigned ep_init_flags,
                                 const ucp_unpacked_address_t *remote_address,
                                 unsigned *addr_indices)
{
    ucs_status_t status = ucp_wireup_init_lanes(ep, ep_init_flags, UINT64_MAX,
                                                remote_address, addr_indices);
    if (status == UCS_OK) {
        return UCS_OK;
    }

    ucp_worker_set_ep_failed(worker, ep, NULL, UCP_NULL_LANE, status);
    return status;
}


static UCS_F_NOINLINE void
ucp_wireup_process_pre_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
                               const ucp_unpacked_address_t *remote_address)
{
    unsigned ep_init_flags = UCP_EP_INIT_CREATE_AM_LANE;
    unsigned addr_indices[UCP_MAX_LANES];
    ucs_status_t status;
    ucp_ep_h ep;

    ucs_assert(msg->type == UCP_WIREUP_MSG_PRE_REQUEST);
    ucs_assert(msg->dest_ep_ptr != 0);
    ucs_trace("got wireup pre_request from 0x%"PRIx64" src_ep 0x%lx dst_ep 0x%lx conn_sn %d",
              remote_address->uuid, msg->src_ep_ptr, msg->dest_ep_ptr, msg->conn_sn);

    /* wireup pre_request for a specific ep */
    ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);
    ucs_assert(ep->flags & UCP_EP_FLAG_SOCKADDR_PARTIAL_ADDR);

    ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
    ucp_ep_flush_state_reset(ep);

    if (ucp_ep_config(ep)->key.err_mode == UCP_ERR_HANDLING_MODE_PEER) {
        ep_init_flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
    }

    /* initialize transport endpoints */
    status = ucp_wireup_init_lanes_by_request(worker, ep, ep_init_flags,
                                              remote_address, addr_indices);
    if (status != UCS_OK) {
        return;
    }

    status = ucp_wireup_send_request(ep);
    if (status != UCS_OK) {
        ucp_ep_cleanup_lanes(ep);
    }
}

static UCS_F_NOINLINE void
ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
                           const ucp_unpacked_address_t *remote_address)
{
    uint64_t remote_uuid   = remote_address->uuid;
    uint64_t tl_bitmap     = 0;
    int send_reply         = 0;
    unsigned ep_init_flags = 0;
    ucp_rsc_index_t lanes2remote[UCP_MAX_LANES];
    unsigned addr_indices[UCP_MAX_LANES];
    ucs_status_t status;
    ucp_ep_flags_t listener_flag;
    ucp_ep_h ep;

    ucs_assert(msg->type == UCP_WIREUP_MSG_REQUEST);
    ucs_trace("got wireup request from 0x%"PRIx64" src_ep 0x%lx dst_ep 0x%lx conn_sn %d",
              remote_address->uuid, msg->src_ep_ptr, msg->dest_ep_ptr, msg->conn_sn);

    if (msg->dest_ep_ptr != 0) {
        /* wireup request for a specific ep */
        ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);
        ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
        if (!(ep->flags & UCP_EP_FLAG_LISTENER)) {
            /* Reset flush state only if it's not a client-server wireup on
             * server side with long address exchange when listener (united with
             * flush state) should be valid until user's callback invoking */
            ucp_ep_flush_state_reset(ep);
        }
        ep_init_flags |= UCP_EP_INIT_CREATE_AM_LANE;
    } else {
        ep = ucp_ep_match_retrieve_exp(&worker->ep_match_ctx, remote_uuid,
                                       msg->conn_sn ^ (remote_uuid == worker->uuid));
        if (ep == NULL) {
            /* Create a new endpoint if does not exist */
            status = ucp_ep_new(worker, remote_address->name, "remote-request",
                                &ep);
            if (status != UCS_OK) {
                return;
            }

            /* add internal endpoint to hash */
            ep->conn_sn = msg->conn_sn;
            ucp_ep_match_insert_unexp(&worker->ep_match_ctx, remote_uuid, ep);
        } else {
            ucp_ep_flush_state_reset(ep);
        }

        ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);

        /*
         * If the current endpoint already sent a connection request, we have a
         * "simultaneous connect" situation. In this case, only one of the endpoints
         * (instead of both) should respect the connect request, otherwise they
         * will end up being connected to "internal" endpoints on the remote side
         * instead of each other. We use the uniqueness of worker uuid to decide
         * which connect request should be ignored.
         */
        if ((ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED) && (remote_uuid > worker->uuid)) {
            ucs_trace("ep %p: ignoring simultaneous connect request", ep);
            ep->flags |= UCP_EP_FLAG_CONNECT_REQ_IGNORED;
            return;
        }
    }

    if (ep->flags & UCP_EP_FLAG_LISTENER) {
        /* If this is an ep on a listener (server) that received a partial
         * worker address from the client, then the following lanes initialization
         * will be done after an aux lane was already created on this ep.
         * Therefore, remove the existing aux endpoint since will need to create
         * new lanes now */
        ucp_ep_cleanup_lanes(ep);
    }

    if (msg->err_mode == UCP_ERR_HANDLING_MODE_PEER) {
        ep_init_flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
    }

    /* Initialize lanes (possible destroy existing lanes) */
    status = ucp_wireup_init_lanes_by_request(worker, ep, ep_init_flags,
                                              remote_address, addr_indices);
    if (status != UCS_OK) {
        return;
    }

    ucp_wireup_match_p2p_lanes(ep, remote_address, addr_indices, lanes2remote);

    /* Send a reply if remote side does not have ep_ptr (active-active flow) or
     * there are p2p lanes (client-server flow)
     */
    send_reply = (msg->dest_ep_ptr == 0) || ucp_ep_config(ep)->p2p_lanes;

    /* Connect p2p addresses to remote endpoint */
    if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) {
        status = ucp_wireup_connect_local(ep, remote_address, lanes2remote);
        if (status != UCS_OK) {
            return;
        }

        tl_bitmap  = ucp_wireup_get_ep_tl_bitmap(ep,
                                                 ucp_ep_config(ep)->p2p_lanes);
        ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;

        ucs_assert(send_reply);
    }

    /* mark the endpoint as connected to remote */
    if (!ucp_ep_config(ep)->p2p_lanes) {
        ucp_wireup_remote_connected(ep);
    }

    if (send_reply) {

        listener_flag = ep->flags & UCP_EP_FLAG_LISTENER;
        /* Remove this flag at this point if it's set
         * (so that address packing would be correct) */
        ep->flags &= ~UCP_EP_FLAG_LISTENER;

        ucs_trace("ep %p: sending wireup reply", ep);
        status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_REPLY, tl_bitmap,
                                     lanes2remote);
        if (status != UCS_OK) {
            return;
        }

        /* Restore saved flag value */
        ep->flags |= listener_flag;
    } else {
        /* if in client-server flow, schedule invoking the user's callback
         * (if server is connected) from the main thread */
        if (ucs_test_all_flags(ep->flags,
                               (UCP_EP_FLAG_LISTENER | UCP_EP_FLAG_LOCAL_CONNECTED))) {
            ucp_listener_schedule_accept_cb(ep);
        }
    }
}

static unsigned ucp_wireup_send_msg_ack(void *arg)
{
    ucp_ep_h ep = (ucp_ep_h)arg;
    ucp_rsc_index_t rsc_tli[UCP_MAX_LANES];
    ucs_status_t status;

    /* Send ACK without any address, we've already sent it as part of the request */
    ucs_trace("ep %p: sending wireup ack", ep);

    memset(rsc_tli, UCP_NULL_RESOURCE, sizeof(rsc_tli));
    status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_ACK, 0, rsc_tli);
    return (status == UCS_OK);
}

int ucp_wireup_msg_ack_cb_pred(const ucs_callbackq_elem_t *elem, void *arg)
{
    return ((elem->arg == arg) && (elem->cb == ucp_wireup_send_msg_ack));
}

static UCS_F_NOINLINE void
ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg,
                         const ucp_unpacked_address_t *remote_address)
{
    uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL;
    ucs_status_t status;
    ucp_ep_h ep;
    int ack;

    ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);

    ucs_assert(msg->type == UCP_WIREUP_MSG_REPLY);
    ucs_assert((!(ep->flags & UCP_EP_FLAG_LISTENER)));
    ucs_trace("ep %p: got wireup reply src_ep 0x%lx dst_ep 0x%lx sn %d", ep,
              msg->src_ep_ptr, msg->dest_ep_ptr, msg->conn_sn);

    ucp_ep_match_remove_ep(&worker->ep_match_ctx, ep);
    ucp_ep_update_dest_ep_ptr(ep, msg->src_ep_ptr);
    ucp_ep_flush_state_reset(ep);

    /* Connect p2p addresses to remote endpoint */
    if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) {

        /*
         * In the wireup reply message, the lane indexes specify which
         * **receiver** ep lane should be connected to a given ep address. So we
         * don't pass 'lanes2remote' mapping, and use local lanes directly.
         */
        status = ucp_wireup_connect_local(ep, remote_address, NULL);
        if (status != UCS_OK) {
            return;
        }

        ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
        ack = 1;
    } else {
        ack = 0;
    }

    ucp_wireup_remote_connected(ep);

    if (ack) {
        /* Send `UCP_WIREUP_MSG_ACK` from progress function
         * to avoid calling UCT routines from an async thread */
        uct_worker_progress_register_safe(worker->uct,
                                          ucp_wireup_send_msg_ack, ep,
                                          UCS_CALLBACKQ_FLAG_ONESHOT, &cb_id);
    }
}

static UCS_F_NOINLINE
void ucp_wireup_process_ack(ucp_worker_h worker, const ucp_wireup_msg_t *msg)
{
    ucp_ep_h ep;

    ep = ucp_worker_get_ep_by_ptr(worker, msg->dest_ep_ptr);

    ucs_assert(msg->type == UCP_WIREUP_MSG_ACK);
    ucs_trace("ep %p: got wireup ack", ep);

    ucs_assert(ep->flags & UCP_EP_FLAG_DEST_EP);
    ucs_assert(ep->flags & UCP_EP_FLAG_CONNECT_REP_SENT);
    ucs_assert(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED);

    ucp_wireup_remote_connected(ep);

    /* if this ack is received as part of the client-server flow, when handling
     * a large worker address from the client, invoke the cached user callback
     * from the main thread */
    if (ep->flags & UCP_EP_FLAG_LISTENER) {
        ucp_listener_schedule_accept_cb(ep);
    }
}

static ucs_status_t ucp_wireup_msg_handler(void *arg, void *data,
                                           size_t length, unsigned flags)
{
    ucp_worker_h worker   = arg;
    ucp_wireup_msg_t *msg = data;
    ucp_unpacked_address_t remote_address;
    ucs_status_t status;

    UCS_ASYNC_BLOCK(&worker->async);

    status = ucp_address_unpack(worker, msg + 1, UINT64_MAX, &remote_address);
    if (status != UCS_OK) {
        ucs_error("failed to unpack address: %s", ucs_status_string(status));
        goto out;
    }

    if (msg->type == UCP_WIREUP_MSG_ACK) {
        ucs_assert(remote_address.address_count == 0);
        ucp_wireup_process_ack(worker, msg);
    } else if (msg->type == UCP_WIREUP_MSG_PRE_REQUEST) {
        ucp_wireup_process_pre_request(worker, msg, &remote_address);
    } else if (msg->type == UCP_WIREUP_MSG_REQUEST) {
        ucp_wireup_process_request(worker, msg, &remote_address);
    } else if (msg->type == UCP_WIREUP_MSG_REPLY) {
        ucp_wireup_process_reply(worker, msg, &remote_address);
    } else {
        ucs_bug("invalid wireup message");
    }

    ucs_free(remote_address.address_list);

out:
    UCS_ASYNC_UNBLOCK(&worker->async);
    return UCS_OK;
}

void ucp_wireup_assign_lane(ucp_ep_h ep, ucp_lane_index_t lane, uct_ep_h uct_ep,
                            const char *info)
{
    /* If ep already exists, it's a wireup proxy, and we need to update its
     * next_ep instead of replacing it.
     */
    if (ep->uct_eps[lane] == NULL) {
        ucs_trace("ep %p: assign uct_ep[%d]=%p%s", ep, lane, uct_ep, info);
        ep->uct_eps[lane] = uct_ep;
    } else {
        ucs_assert(ucp_wireup_ep_test(ep->uct_eps[lane]));
        ucs_trace("ep %p: wireup uct_ep[%d]=%p next set to %p%s", ep, lane,
                  ep->uct_eps[lane], uct_ep, info);
        ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);
        ucp_wireup_ep_remote_connected(ep->uct_eps[lane]);
    }
}

static uct_ep_h ucp_wireup_extract_lane(ucp_ep_h ep, ucp_lane_index_t lane)
{
    uct_ep_h uct_ep = ep->uct_eps[lane];

    if ((uct_ep != NULL) && ucp_wireup_ep_test(uct_ep)) {
        return ucp_wireup_ep_extract_next_ep(uct_ep);
    } else {
        ep->uct_eps[lane] = NULL;
        return uct_ep;
    }
}

ucs_status_t
ucp_wireup_connect_lane(ucp_ep_h ep, unsigned ep_init_flags,
                        ucp_lane_index_t lane,
                        const ucp_unpacked_address_t *remote_address,
                        unsigned addr_index)
{
    ucp_worker_h worker = ep->worker;
    int connect_aux;
    ucp_lane_index_t proxy_lane;
    ucp_rsc_index_t rsc_index;
    ucp_worker_iface_t *wiface;
    uct_ep_params_t uct_ep_params;
    uct_ep_h uct_ep;
    ucs_status_t status;

    ucs_trace("ep %p: connect lane[%d]", ep, lane);

    ucs_assert(lane != ucp_ep_get_cm_lane(ep));

    ucs_assert_always(remote_address != NULL);
    ucs_assert_always(remote_address->address_list != NULL);
    ucs_assert_always(addr_index <= remote_address->address_count);

    proxy_lane = ucp_ep_get_proxy_lane(ep, lane);
    rsc_index  = ucp_ep_get_rsc_index(ep, lane);
    wiface     = ucp_worker_iface(worker, rsc_index);

    /*
     * if the selected transport can be connected directly to the remote
     * interface, just create a connected UCT endpoint.
     */
    if ((wiface->attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE) &&
        ((ep->uct_eps[lane] == NULL) || ucp_wireup_ep_test(ep->uct_eps[lane])))
    {
        if ((proxy_lane == UCP_NULL_LANE) || (proxy_lane == lane)) {
            /* create an endpoint connected to the remote interface */
            ucs_trace("ep %p: connect uct_ep[%d] to addr[%d]", ep, lane,
                      addr_index);
            uct_ep_params.field_mask = UCT_EP_PARAM_FIELD_IFACE    |
                                       UCT_EP_PARAM_FIELD_DEV_ADDR |
                                       UCT_EP_PARAM_FIELD_IFACE_ADDR;
            uct_ep_params.iface      = wiface->iface;
            uct_ep_params.dev_addr   = remote_address->address_list[addr_index].dev_addr;
            uct_ep_params.iface_addr = remote_address->address_list[addr_index].iface_addr;
            status = uct_ep_create(&uct_ep_params, &uct_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                return status;
            }

            ucp_wireup_assign_lane(ep, lane, uct_ep, "");
        }

        ucp_worker_iface_progress_ep(wiface);
        return UCS_OK;
    }

    /*
     * create a wireup endpoint which will start connection establishment
     * protocol using an auxiliary transport.
     */
    if (wiface->attr.cap.flags & UCT_IFACE_FLAG_CONNECT_TO_EP) {

        /* For now, p2p transports have no reason to have proxy */
        ucs_assert_always(proxy_lane == UCP_NULL_LANE);

        if (ep->uct_eps[lane] == NULL) {
            status = ucp_wireup_ep_create(ep, &uct_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                return status;
            }

            ucs_trace("ep %p: assign uct_ep[%d]=%p wireup", ep, lane, uct_ep);
            ep->uct_eps[lane] = uct_ep;
        } else {
            uct_ep = ep->uct_eps[lane];
            ucs_assert(ucp_wireup_ep_test(uct_ep));
        }

        if (!(ep_init_flags & (UCP_EP_INIT_CM_WIREUP_CLIENT))) {
            ucs_trace("ep %p: connect uct_ep[%d]=%p to addr[%d] wireup", ep,
                      lane, uct_ep, addr_index);
            connect_aux = !(ep_init_flags & (UCP_EP_INIT_CM_WIREUP_CLIENT |
                                             UCP_EP_INIT_CM_WIREUP_SERVER)) &&
                          (lane == ucp_ep_get_wireup_msg_lane(ep));
            status = ucp_wireup_ep_connect(ep->uct_eps[lane], ep_init_flags,
                                           rsc_index, connect_aux,
                                           remote_address);
            if (status != UCS_OK) {
                return status;
            }
        }

        ucp_worker_iface_progress_ep(wiface);

        return UCS_OK;
    }

    return UCS_ERR_UNREACHABLE;
}

ucs_status_t ucp_wireup_resolve_proxy_lanes(ucp_ep_h ep)
{
    ucp_lane_index_t lane, proxy_lane;
    uct_iface_attr_t *iface_attr;
    uct_ep_h uct_ep, signaling_ep;
    ucs_status_t status;

    /* point proxy endpoints */
    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        proxy_lane = ucp_ep_get_proxy_lane(ep, lane);
        if (proxy_lane == UCP_NULL_LANE) {
            continue;
        }

        iface_attr = ucp_worker_iface_get_attr(ep->worker,
                                               ucp_ep_get_rsc_index(ep, lane));

        if (iface_attr->cap.flags & UCT_IFACE_FLAG_AM_SHORT) {
            ucs_assert_always(iface_attr->cap.am.max_short <=
                              iface_attr->cap.am.max_bcopy);
        }

        /* Create a signaling ep to the proxy lane */
        if (proxy_lane == lane) {
            /* If proxy is to the same lane, temporarily remove the existing
             * UCT endpoint in there, so it could be assigned to the signaling
             * proxy ep. This can also be an endpoint contained inside a wireup
             * proxy, so ucp_wireup_extract_lane() handles both cases.
             */
            uct_ep = ucp_wireup_extract_lane(ep, proxy_lane);
            ucs_assert_always(uct_ep != NULL);
            status = ucp_signaling_ep_create(ep, uct_ep, 1, &signaling_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                uct_ep_destroy(uct_ep);
                return status;
            }
        } else {
            status = ucp_signaling_ep_create(ep, ep->uct_eps[proxy_lane], 0,
                                             &signaling_ep);
            if (status != UCS_OK) {
                /* coverity[leaked_storage] */
                return status;
            }
        }

        ucs_trace("ep %p: lane[%d]=%p proxy_lane=%d", ep, lane, ep->uct_eps[lane],
                  proxy_lane);

        ucp_wireup_assign_lane(ep, lane, signaling_ep, " (signaling proxy)");
    }

    return UCS_OK;
}

static void ucp_wireup_print_config(ucp_context_h context,
                                    const ucp_ep_config_key_t *key,
                                    const char *title,
                                    const unsigned *addr_indices,
                                    ucs_log_level_t log_level)
{
    char lane_info[128] = {0};
    ucp_lane_index_t lane;

    if (!ucs_log_is_enabled(log_level)) {
        return;
    }

    ucs_log(log_level, "%s: am_lane %d wireup_lane %d reachable_mds 0x%lx",
              title, key->am_lane, key->wireup_lane,
              key->reachable_md_map);

    for (lane = 0; lane < key->num_lanes; ++lane) {
        ucp_ep_config_lane_info_str(context, key, addr_indices, lane,
                                    UCP_NULL_RESOURCE, lane_info,
                                    sizeof(lane_info));
        ucs_log(log_level, "%s: %s", title, lane_info);
    }
}

int ucp_wireup_is_reachable(ucp_worker_h worker, ucp_rsc_index_t rsc_index,
                            const ucp_address_entry_t *ae)
{
    ucp_context_h context      = worker->context;
    ucp_worker_iface_t *wiface = ucp_worker_iface(worker, rsc_index);

    return (context->tl_rscs[rsc_index].tl_name_csum == ae->tl_name_csum) &&
           uct_iface_is_reachable(wiface->iface, ae->dev_addr, ae->iface_addr);
}

static void
ucp_wireup_get_reachable_mds(ucp_worker_h worker,
                             const ucp_unpacked_address_t *remote_address,
                             const ucp_ep_config_key_t *prev_key,
                             ucp_ep_config_key_t *key)
{
    ucp_context_h context = worker->context;
    ucp_rsc_index_t ae_cmpts[UCP_MAX_MDS]; /* component index for each address entry */
    const ucp_address_entry_t *ae;
    ucp_rsc_index_t cmpt_index;
    ucp_rsc_index_t rsc_index;
    ucp_md_index_t dst_md_index;
    ucp_md_map_t ae_dst_md_map, dst_md_map;
    unsigned num_dst_mds;

    ae_dst_md_map = 0;
    ucs_for_each_bit(rsc_index, context->tl_bitmap) {
        ucp_unpacked_address_for_each(ae, remote_address) {
            if (ucp_wireup_is_reachable(worker, rsc_index, ae)) {
                ae_dst_md_map         |= UCS_BIT(ae->md_index);
                dst_md_index           = context->tl_rscs[rsc_index].md_index;
                ae_cmpts[ae->md_index] = context->tl_mds[dst_md_index].cmpt_index;
            }
        }
    }

    /* merge with previous configuration */
    dst_md_map  = ae_dst_md_map | prev_key->reachable_md_map;
    num_dst_mds = 0;
    ucs_for_each_bit(dst_md_index, dst_md_map) {
        cmpt_index = UCP_NULL_RESOURCE;
        /* remote md is reachable by the provided address */
        if (UCS_BIT(dst_md_index) & ae_dst_md_map) {
            cmpt_index = ae_cmpts[dst_md_index];
        }
        /* remote md is reachable by previous ep configuration */
        if (UCS_BIT(dst_md_index) & prev_key->reachable_md_map) {
            cmpt_index = ucp_ep_config_get_dst_md_cmpt(prev_key, dst_md_index);
            if (UCS_BIT(dst_md_index) & ae_dst_md_map) {
                /* we expect previous configuration will not conflict with the
                 * new one
                 */
                ucs_assert_always(cmpt_index == ae_cmpts[dst_md_index]);
            }
        }
        ucs_assert_always(cmpt_index != UCP_NULL_RESOURCE);
        key->dst_md_cmpts[num_dst_mds++] = cmpt_index;
    }
    ucs_assert(num_dst_mds == ucs_popcount(dst_md_map));

    key->reachable_md_map = dst_md_map;
}

ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags,
                                   uint64_t local_tl_bitmap,
                                   const ucp_unpacked_address_t *remote_address,
                                   unsigned *addr_indices)
{
    ucp_worker_h worker = ep->worker;
    uint64_t tl_bitmap  = local_tl_bitmap & worker->context->tl_bitmap;
    ucp_ep_config_key_t key;
    ucp_ep_cfg_index_t new_cfg_index;
    ucp_lane_index_t lane;
    ucs_status_t status;
    char str[32];
    ucp_wireup_ep_t *cm_wireup_ep;

    ucs_assert(tl_bitmap != 0);

    ucs_trace("ep %p: initialize lanes", ep);

    ucp_ep_config_key_reset(&key);
    ucp_ep_config_key_set_err_mode(&key, ep_init_flags);

    status = ucp_wireup_select_lanes(ep, ep_init_flags, tl_bitmap,
                                     remote_address, addr_indices, &key);
    if (status != UCS_OK) {
        return status;
    }

    /* Get all reachable MDs from full remote address list */
    key.dst_md_cmpts = ucs_alloca(sizeof(*key.dst_md_cmpts) * UCP_MAX_MDS);
    ucp_wireup_get_reachable_mds(worker, remote_address, &ucp_ep_config(ep)->key,
                                 &key);

    /* Load new configuration */
    status = ucp_worker_get_ep_config(worker, &key, 1, &new_cfg_index);
    if (status != UCS_OK) {
        return status;
    }

    if (ep->cfg_index == new_cfg_index) {
        return UCS_OK; /* No change */
    }

    if ((ep->cfg_index != 0) && !ucp_ep_is_sockaddr_stub(ep)) {
        /*
         * TODO handle a case where we have to change lanes and reconfigure the ep:
         *
         * - if we already have uct ep connected to an address - move it to the new lane index
         * - if we don't yet have connection to an address - create it
         * - if an existing lane is not connected anymore - delete it (possibly)
         * - if the configuration has changed - replay all pending operations on all lanes -
         *   need that every pending callback would return, in case of failure, the number
         *   of lane it wants to be queued on.
         */
        ucs_debug("cannot reconfigure ep %p from [%d] to [%d]", ep, ep->cfg_index,
                  new_cfg_index);
        ucp_wireup_print_config(worker->context, &ucp_ep_config(ep)->key, "old",
			       NULL, UCS_LOG_LEVEL_ERROR);
        ucp_wireup_print_config(worker->context, &key, "new", NULL, UCS_LOG_LEVEL_ERROR);
        ucs_fatal("endpoint reconfiguration not supported yet");
    }

    cm_wireup_ep  = ucp_ep_get_cm_wireup_ep(ep);
    ep->cfg_index = new_cfg_index;
    ep->am_lane   = key.am_lane;

    snprintf(str, sizeof(str), "ep %p", ep);
    ucp_wireup_print_config(worker->context, &ucp_ep_config(ep)->key, str,
                            addr_indices, UCS_LOG_LEVEL_DEBUG);

    /* establish connections on all underlying endpoints */
    for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
        if (ucp_ep_get_cm_lane(ep) == lane) {
            /* restore the cm lane after reconfiguration */
            ep->uct_eps[lane] = &cm_wireup_ep->super.super;
            continue;
        }

        status = ucp_wireup_connect_lane(ep, ep_init_flags, lane,
                                         remote_address, addr_indices[lane]);
        if (status != UCS_OK) {
            return status;
        }
    }

    status = ucp_wireup_resolve_proxy_lanes(ep);
    if (status != UCS_OK) {
        return status;
    }

    /* If we don't have a p2p transport, we're connected */
    if (!ucp_ep_config(ep)->p2p_lanes) {
        ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED;
    }

    return UCS_OK;
}

ucs_status_t ucp_wireup_send_request(ucp_ep_h ep)
{
    ucp_rsc_index_t rsc_index;
    ucs_status_t status;
    uint64_t tl_bitmap;

    tl_bitmap = ucp_wireup_get_ep_tl_bitmap(ep, UCS_MASK(ucp_ep_num_lanes(ep)));

    /* TODO make sure such lane would exist */
    rsc_index = ucp_wireup_ep_get_aux_rsc_index(
                    ep->uct_eps[ucp_ep_get_wireup_msg_lane(ep)]);
    if (rsc_index != UCP_NULL_RESOURCE) {
        tl_bitmap |= UCS_BIT(rsc_index);
    }

    ucs_debug("ep %p: send wireup request (flags=0x%x)", ep, ep->flags);
    status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_REQUEST, tl_bitmap, NULL);

    ep->flags |= UCP_EP_FLAG_CONNECT_REQ_QUEUED;

    return status;
}

static void ucp_wireup_connect_remote_purge_cb(uct_pending_req_t *self, void *arg)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
    ucs_queue_head_t *queue = arg;

    ucs_trace_req("ep %p: extracted request %p from pending queue", req->send.ep,
                  req);
    ucs_queue_push(queue, (ucs_queue_elem_t*)&req->send.uct.priv);
}

ucs_status_t ucp_wireup_send_pre_request(ucp_ep_h ep)
{
    ucp_rsc_index_t rsc_tli[UCP_MAX_LANES];
    uint64_t tl_bitmap = UINT64_MAX;  /* pack full worker address */
    ucs_status_t status;

    ucs_assert(ep->flags & UCP_EP_FLAG_LISTENER);
    ucs_assert(!(ep->flags & UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED));
    memset(rsc_tli, UCP_NULL_RESOURCE, sizeof(rsc_tli));

    ucs_debug("ep %p: send wireup pre-request (flags=0x%x)", ep, ep->flags);
    status = ucp_wireup_msg_send(ep, UCP_WIREUP_MSG_PRE_REQUEST, tl_bitmap, rsc_tli);

    ep->flags |= UCP_EP_FLAG_CONNECT_PRE_REQ_QUEUED;
    return status;
}

ucs_status_t ucp_wireup_connect_remote(ucp_ep_h ep, ucp_lane_index_t lane)
{
    ucs_queue_head_t tmp_q;
    ucs_status_t status;
    ucp_request_t *req;
    uct_ep_h uct_ep;

    ucs_trace("ep %p: connect lane %d to remote peer", ep, lane);

    ucs_assert(lane != UCP_NULL_LANE);

    UCS_ASYNC_BLOCK(&ep->worker->async);

    /* checking again, with lock held, if already connected or connection is
     * in progress */
    if ((ep->flags & UCP_EP_FLAG_DEST_EP) ||
        ucp_wireup_ep_test(ep->uct_eps[lane])) {
        status = UCS_OK;
        goto out_unlock;
    }

    if (ucp_proxy_ep_test(ep->uct_eps[lane])) {
        /* signaling ep is not needed now since we will send wireup request
         * with signaling flag
         */
        uct_ep = ucp_proxy_ep_extract(ep->uct_eps[lane]);
        uct_ep_destroy(ep->uct_eps[lane]);
    } else {
        uct_ep = ep->uct_eps[lane];
    }

    ucs_assert(!(ep->flags & UCP_EP_FLAG_REMOTE_CONNECTED));

    ucs_trace("ep %p: connect lane %d to remote peer with wireup ep", ep, lane);

    /* make ep->uct_eps[lane] a stub */
    status = ucp_wireup_ep_create(ep, &ep->uct_eps[lane]);
    if (status != UCS_OK) {
        goto err;
    }

    /* Extract all pending requests from the transport endpoint, otherwise they
     * will prevent the wireup message from being sent (because those requests
     * could not be progressed any more after switching to wireup proxy).
     */
    ucs_queue_head_init(&tmp_q);
    uct_ep_pending_purge(uct_ep, ucp_wireup_connect_remote_purge_cb, &tmp_q);

    /* the wireup ep should use the existing [am_lane] as next_ep */
    ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep);

    if (!(ep->flags & UCP_EP_FLAG_CONNECT_REQ_QUEUED)) {
        status = ucp_wireup_send_request(ep);
        if (status != UCS_OK) {
            goto err_destroy_wireup_ep;
        }
    }

    ucs_queue_for_each_extract(req, &tmp_q, send.uct.priv, 1) {
        ucs_trace_req("ep %p: requeue request %p after wireup request",
                      req->send.ep, req);
        status = uct_ep_pending_add(ep->uct_eps[lane], &req->send.uct,
                                    (req->send.uct.func == ucp_wireup_msg_progress) ||
                                    (req->send.uct.func == ucp_wireup_ep_progress_pending) ?
                                    UCT_CB_FLAG_ASYNC : 0);
        if (status != UCS_OK) {
            ucs_fatal("wireup proxy function must always return UCS_OK");
        }
    }

    goto out_unlock;

err_destroy_wireup_ep:
    uct_ep_destroy(ep->uct_eps[lane]);
err:
    ep->uct_eps[lane] = uct_ep; /* restore am lane */
out_unlock:
    UCS_ASYNC_UNBLOCK(&ep->worker->async);
    return status;
}

static void ucp_wireup_msg_dump(ucp_worker_h worker, uct_am_trace_type_t type,
                                uint8_t id, const void *data, size_t length,
                                char *buffer, size_t max)
{
    ucp_context_h context       = worker->context;
    const ucp_wireup_msg_t *msg = data;
    ucp_unpacked_address_t unpacked_address;
    const ucp_address_entry_t *ae;
    ucp_tl_resource_desc_t *rsc;
    unsigned ep_addr_index;
    ucs_status_t status;
    char *p, *end;
    ucp_rsc_index_t tl;

    status = ucp_address_unpack(worker, msg + 1, ~UCP_ADDRESS_PACK_FLAG_TRACE,
                                &unpacked_address);
    if (status != UCS_OK) {
        strncpy(unpacked_address.name, "<malformed address>", UCP_WORKER_NAME_MAX);
        unpacked_address.uuid          = 0;
        unpacked_address.address_count = 0;
        unpacked_address.address_list  = NULL;
    }

    p   = buffer;
    end = buffer + max;

    snprintf(p, end - p,
             "WIREUP %s [%s uuid 0x%"PRIx64" src_ep 0x%lx dst_ep 0x%lx conn_sn %d]",
             ucp_wireup_msg_str(msg->type), unpacked_address.name,
             unpacked_address.uuid, msg->src_ep_ptr, msg->dest_ep_ptr,
             msg->conn_sn);
    p += strlen(p);

    if (unpacked_address.address_list == NULL) {
        return; /* No addresses were unpacked */
    }

    ucp_unpacked_address_for_each(ae, &unpacked_address) {
        ucs_for_each_bit(tl, context->tl_bitmap) {
            rsc = &context->tl_rscs[tl];
            if (ae->tl_name_csum == rsc->tl_name_csum) {
                snprintf(p, end - p, " "UCT_TL_RESOURCE_DESC_FMT,
                         UCT_TL_RESOURCE_DESC_ARG(&rsc->tl_rsc));
                p += strlen(p);
                break;
            }
        }
        snprintf(p, end - p, "/md[%d]", ae->md_index);
        p += strlen(p);

        for (ep_addr_index = 0; ep_addr_index < ae->num_ep_addrs;
             ++ep_addr_index) {
            snprintf(p, end - p, "/lane[%d]", ae->ep_addrs[ep_addr_index].lane);
            p += strlen(p);
        }
    }

    ucs_free(unpacked_address.address_list);
}

int ucp_worker_iface_is_tl_p2p(const uct_iface_attr_t *iface_attr)
{
    uint64_t flags = iface_attr->cap.flags;

    return (flags & UCT_IFACE_FLAG_CONNECT_TO_EP) &&
           !(flags & UCT_IFACE_FLAG_CONNECT_TO_IFACE);
}

static ucp_err_handling_mode_t
ucp_ep_params_err_handling_mode(const ucp_ep_params_t *params)
{
    return (params->field_mask & UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE) ?
           params->err_mode : UCP_ERR_HANDLING_MODE_NONE;
}

unsigned ucp_ep_init_flags(const ucp_worker_h worker,
                           const ucp_ep_params_t *params)
{
    unsigned flags = ucp_cm_ep_init_flags(worker, params);

    if (!ucp_worker_sockaddr_is_cm_proto(worker) &&
        (params->field_mask & UCP_EP_PARAM_FIELD_SOCK_ADDR)) {
        flags |= UCP_EP_INIT_CREATE_AM_LANE;
    }

    if (ucp_ep_params_err_handling_mode(params) == UCP_ERR_HANDLING_MODE_PEER) {
        flags |= UCP_EP_INIT_ERR_MODE_PEER_FAILURE;
    }

    return flags;
}

UCP_DEFINE_AM(UINT64_MAX, UCP_AM_ID_WIREUP, ucp_wireup_msg_handler,
              ucp_wireup_msg_dump, UCT_CB_FLAG_ASYNC);