Blob Blame History Raw
/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "address.h"
#include "wireup_ep.h"

#include <ucp/core/ucp_worker.h>
#include <ucp/core/ucp_ep.inl>
#include <ucs/arch/bitops.h>
#include <ucs/debug/log.h>
#include <inttypes.h>


/*
 * Packed address layout:
 *
 * [ uuid(64bit) | worker_name(string) ]
 * [ device1_md_index | device1_address(var) ]
 *    [ tl1_name_csum(string) | tl1_info | tl1_address(var) ]
 *    [ tl2_name_csum(string) | tl2_info | tl2_address(var) ]
 *    ...
 * [ device2_md_index | device2_address(var) ]
 *    ...
 *
 *   * worker_name is packed if ENABLE_DEBUG is set.
 *   * In unified mode tl_info contains just rsc_index and iface latency overhead.
 *     For last address in the tl address list, it will have LAST flag set.
 *   * In non unified mode tl_info contains iface attributes. LAST flag is set in
 *     iface address length.
 *   * If a device does not have tl addresses, it's md_index will have the flag
 *     EMPTY.
 *   * If the address list is empty, then it will contain only a single md_index
 *     which equals to UCP_NULL_RESOURCE.
 *   * For non-unified mode, ep address contains length with flags. Multiple ep
 *     addresses could be present and the last one is marked with the flag
 *     UCP_ADDRESS_FLAG_LAST. For unified mode, there could not be more than one
 *     ep address.
 *   * For any mode, ep address is followed by a lane index.
 */


typedef struct {
    size_t           dev_addr_len;
    uint64_t         tl_bitmap;
    ucp_rsc_index_t  rsc_index;
    ucp_rsc_index_t  tl_count;
    size_t           tl_addrs_size;
} ucp_address_packed_device_t;


typedef struct {
    float            overhead;
    float            bandwidth;
    float            lat_ovh;
    uint32_t         prio_cap_flags; /* 8 lsb: prio, 22 msb: cap flags, 2 hsb: amo */
} ucp_address_packed_iface_attr_t;


/* In unified mode we pack resource index instead of iface attrs to the address,
 * so the peer can get all attrs from the local device with the same resource
 * index.
 * Also we send information which depends on device NUMA locality,
 * which may be different on peers (processes which do address pack
 * and address unpack):
 * - latency overhead
 * - Indication whether resource can be used for atomics or not (packed to the
 *   signed bit of lat_ovh).
 *
 * TODO: Revise/fix this when NUMA locality is exposed in UCP.
 * */
typedef struct {
    ucp_rsc_index_t  rsc_index;
    float            lat_ovh;
} ucp_address_unified_iface_attr_t;


#define UCT_ADDRESS_FLAG_ATOMIC32     UCS_BIT(30) /* 32bit atomic operations */
#define UCT_ADDRESS_FLAG_ATOMIC64     UCS_BIT(31) /* 64bit atomic operations */

#define UCP_ADDRESS_FLAG_LAST         0x80   /* Last address in the list */
#define UCP_ADDRESS_FLAG_HAVE_EP_ADDR 0x40   /* Indicates that ep addr is packed
                                                right after iface addr */
#define UCP_ADDRESS_FLAG_LEN_MASK     ~(UCP_ADDRESS_FLAG_HAVE_EP_ADDR | \
                                        UCP_ADDRESS_FLAG_LAST)

#define UCP_ADDRESS_FLAG_EMPTY        0x80   /* Device without TL addresses */
#define UCP_ADDRESS_FLAG_MD_ALLOC     0x40   /* MD can register  */
#define UCP_ADDRESS_FLAG_MD_REG       0x20   /* MD can allocate */
#define UCP_ADDRESS_FLAG_MD_MASK      ~(UCP_ADDRESS_FLAG_EMPTY | \
                                        UCP_ADDRESS_FLAG_MD_ALLOC | \
                                        UCP_ADDRESS_FLAG_MD_REG)

static size_t ucp_address_worker_name_size(ucp_worker_h worker, uint64_t flags)
{
#if ENABLE_DEBUG_DATA
    return (flags & UCP_ADDRESS_PACK_FLAG_WORKER_NAME) ?
           strlen(ucp_worker_get_name(worker)) + 1 : 0;
#else
    return 0;
#endif
}

static size_t ucp_address_iface_attr_size(ucp_worker_t *worker)
{
    return ucp_worker_unified_mode(worker) ?
           sizeof(ucp_address_unified_iface_attr_t) :
           sizeof(ucp_address_packed_iface_attr_t);
}

static uint64_t ucp_worker_iface_can_connect(uct_iface_attr_t *attrs)
{
    return attrs->cap.flags &
           (UCT_IFACE_FLAG_CONNECT_TO_IFACE | UCT_IFACE_FLAG_CONNECT_TO_EP);
}

/* Pack a string and return a pointer to storage right after the string */
static void* ucp_address_pack_worker_name(ucp_worker_h worker, void *dest,
                                          uint64_t flags)
{
#if ENABLE_DEBUG_DATA
    const char *s;
    size_t length;

    if (!(flags & UCP_ADDRESS_PACK_FLAG_WORKER_NAME)) {
        return dest;
    }

    s      = ucp_worker_get_name(worker);
    length = strlen(s);
    ucs_assert(length <= UINT8_MAX);
    *(uint8_t*)dest = length;
    memcpy(UCS_PTR_TYPE_OFFSET(dest, uint8_t), s, length);
    return UCS_PTR_BYTE_OFFSET(UCS_PTR_TYPE_OFFSET(dest, uint8_t), length);
#else
    return dest;
#endif
}

/* Unpack a string and return pointer to next storage byte */
static const void* ucp_address_unpack_worker_name(const void *src, char *s,
                                                  size_t max, uint64_t flags)
{
#if ENABLE_DEBUG_DATA
    size_t length, avail;

    if (!(flags & UCP_ADDRESS_PACK_FLAG_WORKER_NAME)) {
        s[0] = '\0';
        return src;
    }

    ucs_assert(max >= 1);
    length   = *(const uint8_t*)src;
    avail    = ucs_min(length, max - 1);
    memcpy(s, UCS_PTR_TYPE_OFFSET(src, uint8_t), avail);
    s[avail] = '\0';
    return UCS_PTR_TYPE_OFFSET(UCS_PTR_BYTE_OFFSET(src, length), uint8_t);
#else
    s[0] = '\0';
    return src;
#endif
}

static ucp_address_packed_device_t*
ucp_address_get_device(ucp_context_h context, ucp_rsc_index_t rsc_index,
                       ucp_address_packed_device_t *devices,
                       ucp_rsc_index_t *num_devices_p)
{
    const ucp_tl_resource_desc_t *tl_rsc = context->tl_rscs;
    ucp_address_packed_device_t *dev;

    for (dev = devices; dev < devices + *num_devices_p; ++dev) {
        if ((tl_rsc[rsc_index].md_index == tl_rsc[dev->rsc_index].md_index) &&
            !strcmp(tl_rsc[rsc_index].tl_rsc.dev_name,
                    tl_rsc[dev->rsc_index].tl_rsc.dev_name)) {
            goto out;
        }
    }

    dev = &devices[(*num_devices_p)++];
    memset(dev, 0, sizeof(*dev));
out:
    return dev;
}

static ucs_status_t
ucp_address_gather_devices(ucp_worker_h worker, ucp_ep_h ep, uint64_t tl_bitmap,
                           uint64_t flags, ucp_address_packed_device_t **devices_p,
                           ucp_rsc_index_t *num_devices_p)
{
    ucp_context_h context = worker->context;
    ucp_address_packed_device_t *dev, *devices;
    uct_iface_attr_t *iface_attr;
    ucp_rsc_index_t num_devices;
    ucp_rsc_index_t rsc_index;
    ucp_lane_index_t lane;
    unsigned num_ep_addrs;

    devices = ucs_calloc(context->num_tls, sizeof(*devices), "packed_devices");
    if (devices == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    num_devices = 0;
    tl_bitmap  &= context->tl_bitmap;
    ucs_for_each_bit(rsc_index, tl_bitmap) {
        iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
        if (!ucp_worker_iface_can_connect(iface_attr)) {
            continue;
        }

        dev = ucp_address_get_device(context, rsc_index, devices, &num_devices);

        if ((flags & UCP_ADDRESS_PACK_FLAG_EP_ADDR) &&
            ucp_worker_iface_is_tl_p2p(iface_attr)) {
            /* Each lane which matches the resource index adds an ep address
             * entry. The length and flags is packed in non-unified mode only.
             */
            ucs_assert(ep != NULL);
            num_ep_addrs = 0;
            for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
                if (ucp_ep_get_rsc_index(ep, lane) == rsc_index) {
                    dev->tl_addrs_size += !ucp_worker_unified_mode(worker);
                    dev->tl_addrs_size += iface_attr->ep_addr_len;
                    dev->tl_addrs_size += sizeof(uint8_t); /* lane index */
                }
            }
            if (ucp_worker_unified_mode(worker)) {
                ucs_assertv_always(
                    num_ep_addrs <= 1,
                    "unexpected multiple ep addresses in unified mode");
            }
        }

        dev->tl_addrs_size += sizeof(uint16_t); /* tl name checksum */

        if (flags & UCP_ADDRESS_PACK_FLAG_IFACE_ADDR) {
            /* iface address (its length will be packed in non-unified mode only) */
            dev->tl_addrs_size += iface_attr->iface_addr_len;
            dev->tl_addrs_size += !ucp_worker_unified_mode(worker); /* if addr length */
            dev->tl_addrs_size += ucp_address_iface_attr_size(worker);
        } else {
            dev->tl_addrs_size += 1; /* 0-value for valid unpacking */
        }

        if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
            dev->dev_addr_len = iface_attr->device_addr_len;
        } else {
            dev->dev_addr_len = 0;
        }

        dev->rsc_index  = rsc_index;
        dev->tl_bitmap |= UCS_BIT(rsc_index);
    }

    *devices_p     = devices;
    *num_devices_p = num_devices;
    return UCS_OK;
}

static size_t ucp_address_packed_size(ucp_worker_h worker,
                                      const ucp_address_packed_device_t *devices,
                                      ucp_rsc_index_t num_devices,
                                      uint64_t flags)
{
    size_t size = 0;
    const ucp_address_packed_device_t *dev;

    if (flags & UCP_ADDRESS_PACK_FLAG_WORKER_UUID) {
        size += sizeof(uint64_t);
    }

    size += ucp_address_worker_name_size(worker, flags);

    if (num_devices == 0) {
        size += 1;                      /* NULL md_index */
    } else {
        for (dev = devices; dev < (devices + num_devices); ++dev) {
            size += 1;                  /* device md_index */
            size += 1;                  /* device address length */
            if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
                size += dev->dev_addr_len;  /* device address */
            }
            size += dev->tl_addrs_size; /* transport addresses */
        }
    }
    return size;
}

static void ucp_address_memcheck(ucp_context_h context, void *ptr, size_t size,
                                 ucp_rsc_index_t rsc_index)
{

    void *undef_ptr;

    undef_ptr = (void*)VALGRIND_CHECK_MEM_IS_DEFINED(ptr, size);
    if (undef_ptr != NULL) {
        ucs_error(UCT_TL_RESOURCE_DESC_FMT
                  " address contains undefined bytes at offset %zd",
                  UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc),
                  UCS_PTR_BYTE_DIFF(ptr, undef_ptr));
    }
}

static int ucp_address_pack_iface_attr(ucp_worker_h worker, void *ptr,
                                       ucp_rsc_index_t index,
                                       const uct_iface_attr_t *iface_attr,
                                       int enable_atomics)
{
    ucp_address_packed_iface_attr_t  *packed;
    ucp_address_unified_iface_attr_t *unified;
    uint32_t packed_flag;
    uint64_t cap_flags;
    uint64_t bit;

    /* check if at least one of bandwidth values is 0 */
    if ((iface_attr->bandwidth.dedicated * iface_attr->bandwidth.shared) != 0) {
        ucs_error("Incorrect bandwidth value: one of bandwidth dedicated/shared must be zero");
        return -1;
    }


    if (ucp_worker_unified_mode(worker)) {
        /* In unified mode all workers have the same transports and tl bitmap.
         * Just send rsc index, so the remote peer could fetch iface attributes
         * from its local iface. Also send latency overhead, because it
         * depends on device NUMA locality. */
        unified            = ptr;
        unified->rsc_index = index;
        unified->lat_ovh   = enable_atomics ? -iface_attr->latency.overhead :
                                               iface_attr->latency.overhead;

        return sizeof(*unified);
    }

    packed    = ptr;
    cap_flags = iface_attr->cap.flags;

    packed->prio_cap_flags = ((uint8_t)iface_attr->priority);
    packed->overhead       = iface_attr->overhead;
    packed->bandwidth      = iface_attr->bandwidth.dedicated - iface_attr->bandwidth.shared;
    packed->lat_ovh        = iface_attr->latency.overhead;

    /* Keep only the bits defined by UCP_ADDRESS_IFACE_FLAGS, to shrink address. */
    packed_flag = UCS_BIT(8);
    bit         = 1;
    while (UCP_ADDRESS_IFACE_FLAGS & ~(bit - 1)) {
        if (UCP_ADDRESS_IFACE_FLAGS & bit) {
            if (cap_flags & bit) {
                packed->prio_cap_flags |= packed_flag;
            }
            packed_flag <<= 1;
        }
        bit <<= 1;
    }

    if (enable_atomics) {
        if (ucs_test_all_flags(iface_attr->cap.atomic32.op_flags, UCP_ATOMIC_OP_MASK) &&
            ucs_test_all_flags(iface_attr->cap.atomic32.fop_flags, UCP_ATOMIC_FOP_MASK)) {
            packed->prio_cap_flags |= UCT_ADDRESS_FLAG_ATOMIC32;
        }
        if (ucs_test_all_flags(iface_attr->cap.atomic64.op_flags, UCP_ATOMIC_OP_MASK) &&
            ucs_test_all_flags(iface_attr->cap.atomic64.fop_flags, UCP_ATOMIC_FOP_MASK)) {
            packed->prio_cap_flags |= UCT_ADDRESS_FLAG_ATOMIC64;
        }
    }

    return sizeof(*packed);
}

static int
ucp_address_unpack_iface_attr(ucp_worker_t *worker,
                              ucp_address_iface_attr_t *iface_attr,
                              const void *ptr)
{
    const ucp_address_packed_iface_attr_t *packed;
    const ucp_address_unified_iface_attr_t *unified;
    ucp_worker_iface_t *wiface;
    uint32_t packed_flag;
    ucp_rsc_index_t rsc_idx;
    uint64_t bit;

    if (ucp_worker_unified_mode(worker)) {
        /* Address contains resources index and iface latency overhead
         * (not all iface attrs). */
        unified               = ptr;
        rsc_idx               = unified->rsc_index & UCP_ADDRESS_FLAG_LEN_MASK;
        iface_attr->lat_ovh   = fabs(unified->lat_ovh);
        wiface                = ucp_worker_iface(worker, rsc_idx);

        /* Just take the rest of iface attrs from the local resource. */
        iface_attr->cap_flags = wiface->attr.cap.flags;
        iface_attr->priority  = wiface->attr.priority;
        iface_attr->overhead  = wiface->attr.overhead;
        iface_attr->bandwidth = wiface->attr.bandwidth;
        if (signbit(unified->lat_ovh)) {
            iface_attr->atomic.atomic32.op_flags  = wiface->attr.cap.atomic32.op_flags;
            iface_attr->atomic.atomic32.fop_flags = wiface->attr.cap.atomic32.fop_flags;
            iface_attr->atomic.atomic64.op_flags  = wiface->attr.cap.atomic64.op_flags;
            iface_attr->atomic.atomic64.fop_flags = wiface->attr.cap.atomic64.fop_flags;
        }

        return sizeof(*unified);
    }

    packed                          = ptr;
    iface_attr->cap_flags           = 0;
    iface_attr->priority            = packed->prio_cap_flags & UCS_MASK(8);
    iface_attr->overhead            = packed->overhead;
    iface_attr->bandwidth.dedicated = ucs_max(0.0, packed->bandwidth);
    iface_attr->bandwidth.shared    = ucs_max(0.0, -packed->bandwidth);
    iface_attr->lat_ovh             = packed->lat_ovh;

    packed_flag = UCS_BIT(8);
    bit         = 1;
    while (UCP_ADDRESS_IFACE_FLAGS & ~(bit - 1)) {
        if (UCP_ADDRESS_IFACE_FLAGS & bit) {
            if (packed->prio_cap_flags & packed_flag) {
                iface_attr->cap_flags |= bit;
            }
            packed_flag <<= 1;
        }
        bit <<= 1;
    }

    if (packed->prio_cap_flags & UCT_ADDRESS_FLAG_ATOMIC32) {
        iface_attr->atomic.atomic32.op_flags  |= UCP_ATOMIC_OP_MASK;
        iface_attr->atomic.atomic32.fop_flags |= UCP_ATOMIC_FOP_MASK;
    }
    if (packed->prio_cap_flags & UCT_ADDRESS_FLAG_ATOMIC64) {
        iface_attr->atomic.atomic64.op_flags  |= UCP_ATOMIC_OP_MASK;
        iface_attr->atomic.atomic64.fop_flags |= UCP_ATOMIC_FOP_MASK;
    }

    return sizeof(*packed);
}

static void*
ucp_address_iface_flags_ptr(ucp_worker_h worker, void *attr_ptr, int attr_len)
{
    if (ucp_worker_unified_mode(worker)) {
        /* In unified mode, rsc_index is packed instead of attrs. Address flags
         * will be packed in the end of rsc_index byte. */
        UCS_STATIC_ASSERT(ucs_offsetof(ucp_address_unified_iface_attr_t,
                                       rsc_index) == 0);
        return attr_ptr;
    }

    /* In non-unified mode, address flags will be packed in the end of
     * iface addr length byte, which is packed right after iface attrs. */
    return UCS_PTR_BYTE_OFFSET(attr_ptr, attr_len);
}

static void*
ucp_address_pack_length(ucp_worker_h worker, void *ptr, size_t addr_length)
{
    if (ucp_worker_unified_mode(worker)) {
        return ptr;
    }

    ucs_assert(addr_length <= UCP_ADDRESS_FLAG_LEN_MASK);
    *(uint8_t*)ptr = addr_length;

    return UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
}

static const void*
ucp_address_unpack_length(ucp_worker_h worker, const void* flags_ptr, const void *ptr,
                          size_t *addr_length, int is_ep_addr, int *is_last)
{
    ucp_rsc_index_t rsc_index;
    uct_iface_attr_t *attr;
    const ucp_address_unified_iface_attr_t *unified;

    if (ucp_worker_unified_mode(worker)) {
        /* In unified mode:
         * - flags are packed with rsc index in ucp_address_unified_iface_attr_t
         * - iface and ep addr lengths are not packed, need to take them from
         *   local iface attrs */
        unified   = flags_ptr;
        rsc_index = unified->rsc_index & UCP_ADDRESS_FLAG_LEN_MASK;
        attr      = ucp_worker_iface_get_attr(worker, rsc_index);

        ucs_assert(&unified->rsc_index == flags_ptr);

        if (is_ep_addr) {
            *addr_length = attr->ep_addr_len;
            *is_last     = 1; /* in unified mode, there's only 1 ep address */
        } else {
            *addr_length = attr->iface_addr_len;
            *is_last     = unified->rsc_index & UCP_ADDRESS_FLAG_LAST;
        }
        return ptr;
    }

    *is_last     = *(uint8_t*)ptr & UCP_ADDRESS_FLAG_LAST;
    *addr_length = *(uint8_t*)ptr & UCP_ADDRESS_FLAG_LEN_MASK;

    return UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
}

static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
                                        void *buffer, size_t size,
                                        uint64_t tl_bitmap, uint64_t flags,
                                        const ucp_lane_index_t *lanes2remote,
                                        const ucp_address_packed_device_t *devices,
                                        ucp_rsc_index_t num_devices)
{
    ucp_context_h context       = worker->context;
    uint64_t md_flags_pack_mask = (UCT_MD_FLAG_REG | UCT_MD_FLAG_ALLOC);
    const ucp_address_packed_device_t *dev;
    uct_iface_attr_t *iface_attr;
    ucp_rsc_index_t md_index;
    ucp_worker_iface_t *wiface;
    ucp_rsc_index_t rsc_index;
    ucp_lane_index_t lane, remote_lane;
    void *flags_ptr, *ep_flags_ptr;
    uint64_t dev_tl_bitmap;
    unsigned num_ep_addrs;
    ucs_status_t status;
    size_t iface_addr_len;
    size_t ep_addr_len;
    uint64_t md_flags;
    unsigned index;
    int attr_len;
    void *ptr;
    int enable_amo;

    ptr   = buffer;
    index = 0;

    if (flags & UCP_ADDRESS_PACK_FLAG_WORKER_UUID) {
        *(uint64_t*)ptr = worker->uuid;
        ptr = UCS_PTR_TYPE_OFFSET(ptr, worker->uuid);
    }

    ptr = ucp_address_pack_worker_name(worker, ptr, flags);

    if (num_devices == 0) {
        *((uint8_t*)ptr) = UCP_NULL_RESOURCE;
        ptr = UCS_PTR_TYPE_OFFSET(ptr, UCP_NULL_RESOURCE);
        goto out;
    }

    for (dev = devices; dev < (devices + num_devices); ++dev) {

        dev_tl_bitmap = context->tl_bitmap & dev->tl_bitmap;

        /* MD index */
        md_index       = context->tl_rscs[dev->rsc_index].md_index;
        md_flags       = context->tl_mds[md_index].attr.cap.flags & md_flags_pack_mask;
        ucs_assert_always(!(md_index & ~UCP_ADDRESS_FLAG_MD_MASK));

        *(uint8_t*)ptr = md_index |
                         ((dev_tl_bitmap == 0)           ? UCP_ADDRESS_FLAG_EMPTY    : 0) |
                         ((md_flags & UCT_MD_FLAG_ALLOC) ? UCP_ADDRESS_FLAG_MD_ALLOC : 0) |
                         ((md_flags & UCT_MD_FLAG_REG)   ? UCP_ADDRESS_FLAG_MD_REG   : 0);
        ptr = UCS_PTR_TYPE_OFFSET(ptr, md_index);

        /* Device address length */
        *(uint8_t*)ptr = (dev == (devices + num_devices - 1)) ?
                         UCP_ADDRESS_FLAG_LAST : 0;
        if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
            ucs_assert(dev->dev_addr_len < UCP_ADDRESS_FLAG_LAST);
            *(uint8_t*)ptr |= dev->dev_addr_len;
        }
        ptr = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

        /* Device address */
        if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
            wiface = ucp_worker_iface(worker, dev->rsc_index);
            status = uct_iface_get_device_address(wiface->iface,
                                                  (uct_device_addr_t*)ptr);
            if (status != UCS_OK) {
                return status;
            }

            ucp_address_memcheck(context, ptr, dev->dev_addr_len, dev->rsc_index);
            ptr = UCS_PTR_BYTE_OFFSET(ptr, dev->dev_addr_len);
        }

        flags_ptr = NULL;
        ucs_for_each_bit(rsc_index, dev_tl_bitmap) {

            wiface     = ucp_worker_iface(worker, rsc_index);
            iface_attr = &wiface->attr;

            if (!ucp_worker_iface_can_connect(iface_attr)) {
                return UCS_ERR_INVALID_ADDR;
            }

            /* Transport name checksum */
            *(uint16_t*)ptr = context->tl_rscs[rsc_index].tl_name_csum;
            ptr = UCS_PTR_TYPE_OFFSET(ptr,
                                      context->tl_rscs[rsc_index].tl_name_csum);

            /* Transport information */
            enable_amo = worker->atomic_tls & UCS_BIT(rsc_index);
            attr_len   = ucp_address_pack_iface_attr(worker, ptr, rsc_index,
                                                     iface_attr, enable_amo);
            if (attr_len < 0) {
                return UCS_ERR_INVALID_ADDR;
            }

            ucp_address_memcheck(context, ptr, attr_len, rsc_index);

            if (flags & UCP_ADDRESS_PACK_FLAG_IFACE_ADDR) {
                iface_addr_len = iface_attr->iface_addr_len;
            } else {
                iface_addr_len = 0;
            }

            flags_ptr = ucp_address_iface_flags_ptr(worker, ptr, attr_len);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, attr_len);
            ucs_assertv(iface_addr_len < UCP_ADDRESS_FLAG_HAVE_EP_ADDR,
                        "iface_addr_len=%zu", iface_addr_len);

            /* Pack iface address */
            ptr = ucp_address_pack_length(worker, ptr, iface_addr_len);
            if (flags & UCP_ADDRESS_PACK_FLAG_IFACE_ADDR) {
                status = uct_iface_get_address(wiface->iface,
                                               (uct_iface_addr_t*)ptr);
                if (status != UCS_OK) {
                    return status;
                }

                ucp_address_memcheck(context, ptr, iface_addr_len, rsc_index);
                ptr = UCS_PTR_BYTE_OFFSET(ptr, iface_addr_len);
            }

            /* Pack ep address if present: iterate over all lanes which use the
             * current resource (rsc_index) and pack their addresses. The last
             * one is marked with UCP_ADDRESS_FLAG_LAST in its length field.
             */
            num_ep_addrs = 0;
            if ((flags & UCP_ADDRESS_PACK_FLAG_EP_ADDR) &&
                ucp_worker_iface_is_tl_p2p(iface_attr)) {

                ucs_assert(ep != NULL);
                ep_addr_len  = iface_attr->ep_addr_len;
                ep_flags_ptr = NULL;

                for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
                    if (ucp_ep_get_rsc_index(ep, lane) != rsc_index) {
                        continue;
                    }

                    /* pack ep address length and save pointer to flags */
                    ep_flags_ptr = ptr;
                    ptr          = ucp_address_pack_length(worker, ptr,
                                                           ep_addr_len);

                    /* pack ep address */
                    status = uct_ep_get_address(ep->uct_eps[lane], ptr);
                    if (status != UCS_OK) {
                        return status;
                    }

                    ucp_address_memcheck(context, ptr, ep_addr_len, rsc_index);
                    ptr = UCS_PTR_BYTE_OFFSET(ptr, ep_addr_len);

                    /* pack ep lane index */
                    remote_lane    = (lanes2remote == NULL) ? lane :
                                     lanes2remote[lane];
                    *(uint8_t*)ptr = remote_lane;
                    ptr            = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

                    ucs_trace("pack addr[%d].ep_addr[%d] : len %zu lane %d->%d",
                               index, num_ep_addrs, ep_addr_len, lane,
                               remote_lane);

                    ++num_ep_addrs;
                }

                if (num_ep_addrs > 0) {
                    ucs_assert(ep_flags_ptr != NULL);
                    *(uint8_t*)flags_ptr    |= UCP_ADDRESS_FLAG_HAVE_EP_ADDR;
                    if (!ucp_worker_unified_mode(worker)) {
                        *(uint8_t*)ep_flags_ptr |= UCP_ADDRESS_FLAG_LAST;
                    }
                }
            }

            ucs_assert((num_ep_addrs > 0) ||
                       !(*(uint8_t*)flags_ptr & UCP_ADDRESS_FLAG_HAVE_EP_ADDR));

            if (flags & UCP_ADDRESS_PACK_FLAG_TRACE) {
                ucs_trace("pack addr[%d] : "UCT_TL_RESOURCE_DESC_FMT" "
                          "eps %u md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e + %e/n ovh %e "
                          "lat_ovh %e dev_priority %d a32 0x%lx/0x%lx a64 0x%lx/0x%lx",
                          index,
                          UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc),
                          num_ep_addrs, md_flags, iface_attr->cap.flags,
                          iface_attr->bandwidth.dedicated,
                          iface_attr->bandwidth.shared,
                          iface_attr->overhead,
                          iface_attr->latency.overhead,
                          iface_attr->priority,
                          iface_attr->cap.atomic32.op_flags,
                          iface_attr->cap.atomic32.fop_flags,
                          iface_attr->cap.atomic64.op_flags,
                          iface_attr->cap.atomic64.fop_flags);
            }

            ++index;
            ucs_assert(index <= UCP_MAX_RESOURCES);
        }

        /* flags_ptr is a valid pointer to the flags set to the last entry
         * during the above loop So, set the LAST flag for the flags_ptr
         * from the last iteration */
        if (flags_ptr != NULL) {
            ucs_assert(dev_tl_bitmap != 0);
            *(uint8_t*)flags_ptr |= UCP_ADDRESS_FLAG_LAST;
        } else {
            /* cppcheck-suppress internalAstError */
            ucs_assert(dev_tl_bitmap == 0);
        }
    }

out:
    ucs_assertv(UCS_PTR_BYTE_OFFSET(buffer, size) == ptr,
                "buffer=%p size=%zu ptr=%p ptr-buffer=%zd",
                buffer, size, ptr, UCS_PTR_BYTE_DIFF(buffer, ptr));
    return UCS_OK;
}

ucs_status_t ucp_address_pack(ucp_worker_h worker, ucp_ep_h ep,
                              uint64_t tl_bitmap, uint64_t flags,
                              const ucp_lane_index_t *lanes2remote,
                              size_t *size_p, void **buffer_p)
{
    ucp_address_packed_device_t *devices;
    ucp_rsc_index_t num_devices;
    ucs_status_t status;
    void *buffer;
    size_t size;

    if (ep == NULL) {
        flags &= ~UCP_ADDRESS_PACK_FLAG_EP_ADDR;
    }

    /* Collect all devices we want to pack */
    status = ucp_address_gather_devices(worker, ep, tl_bitmap, flags, &devices,
                                        &num_devices);
    if (status != UCS_OK) {
        goto out;
    }

    /* Calculate packed size */
    size = ucp_address_packed_size(worker, devices, num_devices, flags);

    /* Allocate address */
    buffer = ucs_malloc(size, "ucp_address");
    if (buffer == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto out_free_devices;
    }

    memset(buffer, 0, size);

    /* Pack the address */
    status = ucp_address_do_pack(worker, ep, buffer, size, tl_bitmap, flags,
                                 lanes2remote, devices, num_devices);
    if (status != UCS_OK) {
        ucs_free(buffer);
        goto out_free_devices;
    }

    VALGRIND_CHECK_MEM_IS_DEFINED(buffer, size);

    *size_p   = size;
    *buffer_p = buffer;
    status    = UCS_OK;

out_free_devices:
    ucs_free(devices);
out:
    return status;
}

ucs_status_t ucp_address_unpack(ucp_worker_t *worker, const void *buffer,
                                uint64_t flags,
                                ucp_unpacked_address_t *unpacked_address)
{
    ucp_address_entry_t *address_list, *address;
    ucp_address_entry_ep_addr_t *ep_addr;
    int last_dev, last_tl, last_ep_addr;
    const uct_device_addr_t *dev_addr;
    ucp_rsc_index_t dev_index;
    ucp_rsc_index_t md_index;
    unsigned address_count;
    int empty_dev;
    uint64_t md_flags;
    size_t dev_addr_len;
    size_t iface_addr_len;
    size_t ep_addr_len;
    size_t attr_len;
    uint8_t md_byte;
    const void *ptr;
    const void *aptr;
    const void *flags_ptr;

    ptr = buffer;
    if (flags & UCP_ADDRESS_PACK_FLAG_WORKER_UUID) {
        unpacked_address->uuid = *(uint64_t*)ptr;
        ptr = UCS_PTR_TYPE_OFFSET(ptr, unpacked_address->uuid);
    } else {
        unpacked_address->uuid = 0;
    }

    aptr = ucp_address_unpack_worker_name(ptr, unpacked_address->name,
                                          sizeof(unpacked_address->name),
                                          flags);

    /* Count addresses */
    ptr           = aptr;
    address_count = 0;

    last_dev = (*(uint8_t*)ptr == UCP_NULL_RESOURCE);
    while (!last_dev) {
        /* md_index */
        empty_dev    = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_EMPTY;
        ptr          = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

        /* device address length */
        dev_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST;
        last_dev     = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
        ptr          = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
        ptr          = UCS_PTR_BYTE_OFFSET(ptr, dev_addr_len);

        last_tl = empty_dev;
        while (!last_tl) {
            ptr       = UCS_PTR_TYPE_OFFSET(ptr, uint16_t); /* tl_name_csum */
            attr_len  = ucp_address_iface_attr_size(worker);
            flags_ptr = ucp_address_iface_flags_ptr(worker, (void*)ptr, attr_len);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, attr_len);
            ptr       = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                  &iface_addr_len, 0, &last_tl);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, iface_addr_len);

            last_ep_addr = !(*(uint8_t*)flags_ptr & UCP_ADDRESS_FLAG_HAVE_EP_ADDR);
            while (!last_ep_addr) {
                ptr = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                &ep_addr_len, 1, &last_ep_addr);
                ucs_assert(flags & UCP_ADDRESS_PACK_FLAG_EP_ADDR);
                ucs_assert(ep_addr_len > 0);
                ptr = UCS_PTR_BYTE_OFFSET(ptr, ep_addr_len);
                ptr = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
            }

            ++address_count;
            ucs_assert(address_count <= UCP_MAX_RESOURCES);
        }
    }

    if (address_count == 0) {
        address_list = NULL;
        goto out;
    }

    /* Allocate address list */
    address_list = ucs_calloc(address_count, sizeof(*address_list),
                              "ucp_address_list");
    if (address_list == NULL) {
        ucs_error("failed to allocate address list");
        return UCS_ERR_NO_MEMORY;
    }

    /* Unpack addresses */
    address   = address_list;
    ptr       = aptr;
    dev_index = 0;

    do {
        /* md_index */
        md_byte      = (*(uint8_t*)ptr);
        md_index     = md_byte & UCP_ADDRESS_FLAG_MD_MASK;
        md_flags     = (md_byte & UCP_ADDRESS_FLAG_MD_ALLOC) ? UCT_MD_FLAG_ALLOC : 0;
        md_flags    |= (md_byte & UCP_ADDRESS_FLAG_MD_REG)   ? UCT_MD_FLAG_REG   : 0;
        empty_dev    = md_byte & UCP_ADDRESS_FLAG_EMPTY;
        ptr          = UCS_PTR_TYPE_OFFSET(ptr, md_byte);

        /* device address length */
        dev_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST;
        last_dev     = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
        ptr          = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

        dev_addr = ptr;
        ptr      = UCS_PTR_BYTE_OFFSET(ptr, dev_addr_len);

        last_tl = empty_dev;
        while (!last_tl) {
            /* tl_name_csum */
            address->tl_name_csum = *(uint16_t*)ptr;
            ptr = UCS_PTR_TYPE_OFFSET(ptr, address->tl_name_csum);

            address->dev_addr   = (dev_addr_len > 0) ? dev_addr : NULL;
            address->md_index   = md_index;
            address->dev_index  = dev_index;
            address->md_flags   = md_flags;

            attr_len  = ucp_address_unpack_iface_attr(worker, &address->iface_attr, ptr);
            flags_ptr = ucp_address_iface_flags_ptr(worker, (void*)ptr, attr_len);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, attr_len);
            ptr       = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                  &iface_addr_len, 0, &last_tl);
            address->iface_addr   = (iface_addr_len > 0) ? ptr : NULL;
            address->num_ep_addrs = 0;
            ptr                   = UCS_PTR_BYTE_OFFSET(ptr, iface_addr_len);

            last_ep_addr = !(*(uint8_t*)flags_ptr & UCP_ADDRESS_FLAG_HAVE_EP_ADDR);
            while (!last_ep_addr) {
                ucs_assert(address->num_ep_addrs < UCP_MAX_LANES);
                ep_addr       = &address->ep_addrs[address->num_ep_addrs++];
                ptr           = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                          &ep_addr_len, 1,
                                                          &last_ep_addr);
                ep_addr->addr = ptr;
                ptr           = UCS_PTR_BYTE_OFFSET(ptr, ep_addr_len);

                ep_addr->lane = *(uint8_t*)ptr;
                ptr           = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
            }

            if (flags & UCP_ADDRESS_PACK_FLAG_TRACE) {
                ucs_trace("unpack addr[%d] : eps %u md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64" bw %e + %e/n ovh %e "
                          "lat_ovh %e dev_priority %d a32 0x%lx/0x%lx a64 0x%lx/0x%lx",
                          (int)(address - address_list), address->num_ep_addrs,
                          address->md_flags, address->iface_attr.cap_flags,
                          address->iface_attr.bandwidth.dedicated,
                          address->iface_attr.bandwidth.shared,
                          address->iface_attr.overhead,
                          address->iface_attr.lat_ovh,
                          address->iface_attr.priority,
                          address->iface_attr.atomic.atomic32.op_flags,
                          address->iface_attr.atomic.atomic32.fop_flags,
                          address->iface_attr.atomic.atomic64.op_flags,
                          address->iface_attr.atomic.atomic64.fop_flags);
            }

            ++address;
        }

        ++dev_index;
    } while (!last_dev);

    ucs_assert((unsigned)(address - address_list) == address_count);

out:
    unpacked_address->address_count = address_count;
    unpacked_address->address_list  = address_list;
    return UCS_OK;
}