/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2016. ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "address.h"
#include "wireup_ep.h"

#include <ucp/core/ucp_worker.h>
#include <ucp/core/ucp_ep.inl>
#include <ucs/arch/bitops.h>
#include <ucs/debug/log.h>
#include <inttypes.h>


/*
 * Packed address layout:
 *
 * [ uuid(64bit) | worker_name(string) ]
 * [ device1_md_index | device1_address(var) ]
 *    [ tl1_name_csum | tl1_info | tl1_address(var) ]
 *    [ tl2_name_csum | tl2_info | tl2_address(var) ]
 *    ...
 * [ device2_md_index | device2_address(var) ]
 *    ...
 *
 *   worker_name is packed only if ENABLE_DEBUG_DATA is set.
 *
 *   In unified mode, tl_info contains just the rsc_index and the iface latency
 *   overhead. The last address in the tl address list has the LAST flag set.
 *
 *   In non-unified mode, tl_info contains the iface attributes. The LAST flag
 *   is set in the iface address length.
 *
 *   If a device has no tl addresses, its md_index has the EMPTY flag set.
 *
 *   If the address list is empty, it contains only a single md_index, which
 *   equals UCP_NULL_RESOURCE.
 *
 *   In non-unified mode, an ep address contains its length with flags.
 *   Multiple ep addresses may be present, and the last one is marked with the
 *   UCP_ADDRESS_FLAG_LAST flag. In unified mode, there can be at most one
 *   ep address.
 *
 *   In any mode, an ep address is followed by a lane index.
 */

typedef struct {
    size_t           dev_addr_len;
    uint64_t         tl_bitmap;
    ucp_rsc_index_t  rsc_index;
    ucp_rsc_index_t  tl_count;
    size_t           tl_addrs_size;
} ucp_address_packed_device_t;


typedef struct {
    float            overhead;
    float            bandwidth;
    float            lat_ovh;
    uint32_t         prio_cap_flags; /* 8 lsb: prio, 22 msb: cap flags, 2 hsb: amo */
} ucp_address_packed_iface_attr_t;


/* In unified mode we pack the resource index instead of the iface attrs to the
 * address, so the peer can get all the attrs from its local device with the
 * same resource index.
 * We also send information which depends on device NUMA locality, and may
 * therefore differ between the peers (the processes which do address pack and
 * address unpack):
 * - latency overhead
 * - indication whether the resource can be used for atomics or not (packed to
 *   the sign bit of lat_ovh).
 *
 * TODO: Revise/fix this when NUMA locality is exposed in UCP.
 */
typedef struct {
    ucp_rsc_index_t  rsc_index;
    float            lat_ovh;
} ucp_address_unified_iface_attr_t;


#define UCT_ADDRESS_FLAG_ATOMIC32     UCS_BIT(30) /* 32bit atomic operations */
#define UCT_ADDRESS_FLAG_ATOMIC64     UCS_BIT(31) /* 64bit atomic operations */

#define UCP_ADDRESS_FLAG_LAST         0x80  /* Last address in the list */
#define UCP_ADDRESS_FLAG_HAVE_EP_ADDR 0x40  /* Indicates that ep addr is packed
                                               right after iface addr */
#define UCP_ADDRESS_FLAG_LEN_MASK     ~(UCP_ADDRESS_FLAG_HAVE_EP_ADDR | \
                                        UCP_ADDRESS_FLAG_LAST)

#define UCP_ADDRESS_FLAG_EMPTY        0x80  /* Device without TL addresses */
#define UCP_ADDRESS_FLAG_MD_ALLOC     0x40  /* MD can allocate */
#define UCP_ADDRESS_FLAG_MD_REG       0x20  /* MD can register */
#define UCP_ADDRESS_FLAG_MD_MASK      ~(UCP_ADDRESS_FLAG_EMPTY | \
                                        UCP_ADDRESS_FLAG_MD_ALLOC | \
                                        UCP_ADDRESS_FLAG_MD_REG)

static size_t ucp_address_worker_name_size(ucp_worker_h worker, uint64_t flags)
{
#if ENABLE_DEBUG_DATA
    return (flags & UCP_ADDRESS_PACK_FLAG_WORKER_NAME) ?
           strlen(ucp_worker_get_name(worker)) + 1 : 0;
#else
    return 0;
#endif
}

static size_t ucp_address_iface_attr_size(ucp_worker_t *worker)
{
    return ucp_worker_unified_mode(worker) ?
           sizeof(ucp_address_unified_iface_attr_t) :
           sizeof(ucp_address_packed_iface_attr_t);
}
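/*
 * Illustrative example of the layout above (not normative): a non-unified
 * worker with one device exposing two transports, packed with the DEVICE_ADDR,
 * IFACE_ADDR and WORKER_UUID flags, would serialize roughly as:
 *
 *   uint64_t  uuid
 *   uint8_t   md_index | MD_ALLOC/MD_REG flags
 *   uint8_t   dev_addr_len | UCP_ADDRESS_FLAG_LAST      (last device)
 *   ...       device address bytes
 *   uint16_t  tl1 name checksum
 *   ...       ucp_address_packed_iface_attr_t for tl1
 *   uint8_t   tl1 iface_addr_len
 *   ...       tl1 iface address bytes
 *   uint16_t  tl2 name checksum
 *   ...       ucp_address_packed_iface_attr_t for tl2
 *   uint8_t   tl2 iface_addr_len | UCP_ADDRESS_FLAG_LAST  (last tl)
 *   ...       tl2 iface address bytes
 */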
static uint64_t ucp_worker_iface_can_connect(uct_iface_attr_t *attrs)
{
    return attrs->cap.flags &
           (UCT_IFACE_FLAG_CONNECT_TO_IFACE | UCT_IFACE_FLAG_CONNECT_TO_EP);
}

/* Pack a string and return a pointer to the storage right after the string */
static void* ucp_address_pack_worker_name(ucp_worker_h worker, void *dest,
                                          uint64_t flags)
{
#if ENABLE_DEBUG_DATA
    const char *s;
    size_t length;

    if (!(flags & UCP_ADDRESS_PACK_FLAG_WORKER_NAME)) {
        return dest;
    }

    s      = ucp_worker_get_name(worker);
    length = strlen(s);
    ucs_assert(length <= UINT8_MAX);
    *(uint8_t*)dest = length;
    memcpy(UCS_PTR_TYPE_OFFSET(dest, uint8_t), s, length);
    return UCS_PTR_BYTE_OFFSET(UCS_PTR_TYPE_OFFSET(dest, uint8_t), length);
#else
    return dest;
#endif
}

/* Unpack a string and return a pointer to the next storage byte */
static const void*
ucp_address_unpack_worker_name(const void *src, char *s, size_t max,
                               uint64_t flags)
{
#if ENABLE_DEBUG_DATA
    size_t length, avail;

    if (!(flags & UCP_ADDRESS_PACK_FLAG_WORKER_NAME)) {
        s[0] = '\0';
        return src;
    }

    ucs_assert(max >= 1);
    length   = *(const uint8_t*)src;
    avail    = ucs_min(length, max - 1);
    memcpy(s, UCS_PTR_TYPE_OFFSET(src, uint8_t), avail);
    s[avail] = '\0';
    return UCS_PTR_TYPE_OFFSET(UCS_PTR_BYTE_OFFSET(src, length), uint8_t);
#else
    s[0] = '\0';
    return src;
#endif
}

static ucp_address_packed_device_t*
ucp_address_get_device(ucp_context_h context, ucp_rsc_index_t rsc_index,
                       ucp_address_packed_device_t *devices,
                       ucp_rsc_index_t *num_devices_p)
{
    const ucp_tl_resource_desc_t *tl_rsc = context->tl_rscs;
    ucp_address_packed_device_t *dev;

    for (dev = devices; dev < devices + *num_devices_p; ++dev) {
        if ((tl_rsc[rsc_index].md_index == tl_rsc[dev->rsc_index].md_index) &&
            !strcmp(tl_rsc[rsc_index].tl_rsc.dev_name,
                    tl_rsc[dev->rsc_index].tl_rsc.dev_name)) {
            goto out;
        }
    }

    dev = &devices[(*num_devices_p)++];
    memset(dev, 0, sizeof(*dev));
out:
    return dev;
}
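/*
 * For illustration only: transports that share both an MD and a device name
 * collapse into one packed device entry. E.g., hypothetical resources
 * "rc/mlx5_0:1" and "ud/mlx5_0:1" on the same MD would be returned as the same
 * ucp_address_packed_device_t by ucp_address_get_device(), while
 * "rc/mlx5_1:1" would start a new entry.
 */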
static ucs_status_t
ucp_address_gather_devices(ucp_worker_h worker, ucp_ep_h ep, uint64_t tl_bitmap,
                           uint64_t flags,
                           ucp_address_packed_device_t **devices_p,
                           ucp_rsc_index_t *num_devices_p)
{
    ucp_context_h context = worker->context;
    ucp_address_packed_device_t *dev, *devices;
    uct_iface_attr_t *iface_attr;
    ucp_rsc_index_t num_devices;
    ucp_rsc_index_t rsc_index;
    ucp_lane_index_t lane;
    unsigned num_ep_addrs;

    devices = ucs_calloc(context->num_tls, sizeof(*devices), "packed_devices");
    if (devices == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    num_devices = 0;
    tl_bitmap  &= context->tl_bitmap;
    ucs_for_each_bit(rsc_index, tl_bitmap) {
        iface_attr = ucp_worker_iface_get_attr(worker, rsc_index);
        if (!ucp_worker_iface_can_connect(iface_attr)) {
            continue;
        }

        dev = ucp_address_get_device(context, rsc_index, devices, &num_devices);

        if ((flags & UCP_ADDRESS_PACK_FLAG_EP_ADDR) &&
            ucp_worker_iface_is_tl_p2p(iface_attr)) {
            /* Each lane which matches the resource index adds an ep address
             * entry. The length and flags are packed in non-unified mode only.
             */
            ucs_assert(ep != NULL);
            num_ep_addrs = 0;
            for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
                if (ucp_ep_get_rsc_index(ep, lane) == rsc_index) {
                    dev->tl_addrs_size += !ucp_worker_unified_mode(worker);
                    dev->tl_addrs_size += iface_attr->ep_addr_len;
                    dev->tl_addrs_size += sizeof(uint8_t); /* lane index */
                    ++num_ep_addrs;
                }
            }
            if (ucp_worker_unified_mode(worker)) {
                ucs_assertv_always(
                        num_ep_addrs <= 1,
                        "unexpected multiple ep addresses in unified mode");
            }
        }

        dev->tl_addrs_size += sizeof(uint16_t); /* tl name checksum */

        if (flags & UCP_ADDRESS_PACK_FLAG_IFACE_ADDR) {
            /* iface address (its length will be packed in non-unified mode only) */
            dev->tl_addrs_size += iface_attr->iface_addr_len;
            dev->tl_addrs_size += !ucp_worker_unified_mode(worker); /* iface addr length */
            dev->tl_addrs_size += ucp_address_iface_attr_size(worker);
        } else {
            dev->tl_addrs_size += 1; /* 0-value for valid unpacking */
        }

        if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
            dev->dev_addr_len = iface_attr->device_addr_len;
        } else {
            dev->dev_addr_len = 0;
        }

        dev->rsc_index  = rsc_index;
        dev->tl_bitmap |= UCS_BIT(rsc_index);
    }

    *devices_p     = devices;
    *num_devices_p = num_devices;
    return UCS_OK;
}

static size_t ucp_address_packed_size(ucp_worker_h worker,
                                      const ucp_address_packed_device_t *devices,
                                      ucp_rsc_index_t num_devices,
                                      uint64_t flags)
{
    const ucp_address_packed_device_t *dev;
    size_t size = 0;

    if (flags & UCP_ADDRESS_PACK_FLAG_WORKER_UUID) {
        size += sizeof(uint64_t);
    }

    size += ucp_address_worker_name_size(worker, flags);

    if (num_devices == 0) {
        size += 1; /* NULL md_index */
    } else {
        for (dev = devices; dev < (devices + num_devices); ++dev) {
            size += 1; /* device md_index */
            size += 1; /* device address length */
            if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
                size += dev->dev_addr_len; /* device address */
            }
            size += dev->tl_addrs_size; /* transport addresses */
        }
    }
    return size;
}

static void ucp_address_memcheck(ucp_context_h context, void *ptr, size_t size,
                                 ucp_rsc_index_t rsc_index)
{
    void *undef_ptr;

    undef_ptr = (void*)VALGRIND_CHECK_MEM_IS_DEFINED(ptr, size);
    if (undef_ptr != NULL) {
        ucs_error(UCT_TL_RESOURCE_DESC_FMT
                  " address contains undefined bytes at offset %zd",
                  UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc),
                  UCS_PTR_BYTE_DIFF(ptr, undef_ptr));
    }
}
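/*
 * Worked size example under assumed attribute values (illustrative only,
 * worker name omitted): one device packed with DEVICE_ADDR and IFACE_ADDR in
 * non-unified mode, with a 3-byte device address and a single tl whose iface
 * address is 16 bytes, totals
 *   1 (md_index) + 1 (dev addr len) + 3 (dev addr) +
 *   2 (tl name csum) + sizeof(ucp_address_packed_iface_attr_t) +
 *   1 (iface addr len) + 16 (iface addr)
 * bytes, plus 8 bytes if WORKER_UUID is requested.
 */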
static int ucp_address_pack_iface_attr(ucp_worker_h worker, void *ptr,
                                       ucp_rsc_index_t index,
                                       const uct_iface_attr_t *iface_attr,
                                       int enable_atomics)
{
    ucp_address_packed_iface_attr_t *packed;
    ucp_address_unified_iface_attr_t *unified;
    uint32_t packed_flag;
    uint64_t cap_flags;
    uint64_t bit;

    /* at least one of the bandwidth values must be zero */
    if ((iface_attr->bandwidth.dedicated * iface_attr->bandwidth.shared) != 0) {
        ucs_error("Incorrect bandwidth value: one of bandwidth dedicated/shared must be zero");
        return -1;
    }

    if (ucp_worker_unified_mode(worker)) {
        /* In unified mode all workers have the same transports and tl bitmap.
         * Just send the rsc index, so the remote peer can fetch the iface
         * attributes from its local iface. Also send the latency overhead,
         * because it depends on device NUMA locality. */
        unified            = ptr;
        unified->rsc_index = index;
        unified->lat_ovh   = enable_atomics ? -iface_attr->latency.overhead :
                                               iface_attr->latency.overhead;
        return sizeof(*unified);
    }

    packed    = ptr;
    cap_flags = iface_attr->cap.flags;

    packed->prio_cap_flags = ((uint8_t)iface_attr->priority);
    packed->overhead       = iface_attr->overhead;
    packed->bandwidth      = iface_attr->bandwidth.dedicated -
                             iface_attr->bandwidth.shared;
    packed->lat_ovh        = iface_attr->latency.overhead;

    /* Keep only the bits defined by UCP_ADDRESS_IFACE_FLAGS, to shrink the
     * address. */
    packed_flag = UCS_BIT(8);
    bit         = 1;
    while (UCP_ADDRESS_IFACE_FLAGS & ~(bit - 1)) {
        if (UCP_ADDRESS_IFACE_FLAGS & bit) {
            if (cap_flags & bit) {
                packed->prio_cap_flags |= packed_flag;
            }
            packed_flag <<= 1;
        }
        bit <<= 1;
    }

    if (enable_atomics) {
        if (ucs_test_all_flags(iface_attr->cap.atomic32.op_flags,
                               UCP_ATOMIC_OP_MASK) &&
            ucs_test_all_flags(iface_attr->cap.atomic32.fop_flags,
                               UCP_ATOMIC_FOP_MASK)) {
            packed->prio_cap_flags |= UCT_ADDRESS_FLAG_ATOMIC32;
        }
        if (ucs_test_all_flags(iface_attr->cap.atomic64.op_flags,
                               UCP_ATOMIC_OP_MASK) &&
            ucs_test_all_flags(iface_attr->cap.atomic64.fop_flags,
                               UCP_ATOMIC_FOP_MASK)) {
            packed->prio_cap_flags |= UCT_ADDRESS_FLAG_ATOMIC64;
        }
    }

    return sizeof(*packed);
}

static int
ucp_address_unpack_iface_attr(ucp_worker_t *worker,
                              ucp_address_iface_attr_t *iface_attr,
                              const void *ptr)
{
    const ucp_address_packed_iface_attr_t *packed;
    const ucp_address_unified_iface_attr_t *unified;
    ucp_worker_iface_t *wiface;
    uint32_t packed_flag;
    ucp_rsc_index_t rsc_idx;
    uint64_t bit;

    if (ucp_worker_unified_mode(worker)) {
        /* The address contains the resource index and the iface latency
         * overhead (not all the iface attrs). */
        unified             = ptr;
        rsc_idx             = unified->rsc_index & UCP_ADDRESS_FLAG_LEN_MASK;
        iface_attr->lat_ovh = fabs(unified->lat_ovh);
        wiface              = ucp_worker_iface(worker, rsc_idx);

        /* Just take the rest of the iface attrs from the local resource. */
        iface_attr->cap_flags = wiface->attr.cap.flags;
        iface_attr->priority  = wiface->attr.priority;
        iface_attr->overhead  = wiface->attr.overhead;
        iface_attr->bandwidth = wiface->attr.bandwidth;
        if (signbit(unified->lat_ovh)) {
            iface_attr->atomic.atomic32.op_flags  = wiface->attr.cap.atomic32.op_flags;
            iface_attr->atomic.atomic32.fop_flags = wiface->attr.cap.atomic32.fop_flags;
            iface_attr->atomic.atomic64.op_flags  = wiface->attr.cap.atomic64.op_flags;
            iface_attr->atomic.atomic64.fop_flags = wiface->attr.cap.atomic64.fop_flags;
        }
        return sizeof(*unified);
    }

    packed                          = ptr;
    iface_attr->cap_flags           = 0;
    iface_attr->priority            = packed->prio_cap_flags & UCS_MASK(8);
    iface_attr->overhead            = packed->overhead;
    iface_attr->bandwidth.dedicated = ucs_max(0.0, packed->bandwidth);
    iface_attr->bandwidth.shared    = ucs_max(0.0, -packed->bandwidth);
    iface_attr->lat_ovh             = packed->lat_ovh;

    packed_flag = UCS_BIT(8);
    bit         = 1;
    while (UCP_ADDRESS_IFACE_FLAGS & ~(bit - 1)) {
        if (UCP_ADDRESS_IFACE_FLAGS & bit) {
            if (packed->prio_cap_flags & packed_flag) {
                iface_attr->cap_flags |= bit;
            }
            packed_flag <<= 1;
        }
        bit <<= 1;
    }

    if (packed->prio_cap_flags & UCT_ADDRESS_FLAG_ATOMIC32) {
        iface_attr->atomic.atomic32.op_flags  |= UCP_ATOMIC_OP_MASK;
        iface_attr->atomic.atomic32.fop_flags |= UCP_ATOMIC_FOP_MASK;
    }
    if (packed->prio_cap_flags & UCT_ADDRESS_FLAG_ATOMIC64) {
        iface_attr->atomic.atomic64.op_flags  |= UCP_ATOMIC_OP_MASK;
        iface_attr->atomic.atomic64.fop_flags |= UCP_ATOMIC_FOP_MASK;
    }

    return sizeof(*packed);
}
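/*
 * Example of the cap-flags compression above (illustrative, with a made-up
 * mask): if UCP_ADDRESS_IFACE_FLAGS were 0b101001, only bits 0, 3 and 5 of
 * cap.flags would be transmitted, packed consecutively into prio_cap_flags
 * bits 8, 9 and 10. The unpack loop walks the same mask in the same order, so
 * both sides agree on the mapping without sending the full 64-bit cap.flags.
 */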
static void*
ucp_address_iface_flags_ptr(ucp_worker_h worker, void *attr_ptr, int attr_len)
{
    if (ucp_worker_unified_mode(worker)) {
        /* In unified mode, rsc_index is packed instead of the attrs. The
         * address flags are packed in the upper bits of the rsc_index byte. */
        UCS_STATIC_ASSERT(ucs_offsetof(ucp_address_unified_iface_attr_t,
                                       rsc_index) == 0);
        return attr_ptr;
    }

    /* In non-unified mode, the address flags are packed in the upper bits of
     * the iface addr length byte, which is packed right after the iface attrs.
     */
    return UCS_PTR_BYTE_OFFSET(attr_ptr, attr_len);
}

static void*
ucp_address_pack_length(ucp_worker_h worker, void *ptr, size_t addr_length)
{
    if (ucp_worker_unified_mode(worker)) {
        return ptr;
    }

    ucs_assert(addr_length <= UCP_ADDRESS_FLAG_LEN_MASK);
    *(uint8_t*)ptr = addr_length;

    return UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
}

static const void*
ucp_address_unpack_length(ucp_worker_h worker, const void* flags_ptr,
                          const void *ptr, size_t *addr_length, int is_ep_addr,
                          int *is_last)
{
    ucp_rsc_index_t rsc_index;
    uct_iface_attr_t *attr;
    const ucp_address_unified_iface_attr_t *unified;

    if (ucp_worker_unified_mode(worker)) {
        /* In unified mode:
         * - flags are packed with the rsc index in
         *   ucp_address_unified_iface_attr_t
         * - iface and ep addr lengths are not packed; take them from the
         *   local iface attrs */
        unified   = flags_ptr;
        rsc_index = unified->rsc_index & UCP_ADDRESS_FLAG_LEN_MASK;
        attr      = ucp_worker_iface_get_attr(worker, rsc_index);
        ucs_assert(&unified->rsc_index == flags_ptr);

        if (is_ep_addr) {
            *addr_length = attr->ep_addr_len;
            *is_last     = 1; /* in unified mode, there is only 1 ep address */
        } else {
            *addr_length = attr->iface_addr_len;
            *is_last     = unified->rsc_index & UCP_ADDRESS_FLAG_LAST;
        }
        return ptr;
    }

    *is_last     = *(uint8_t*)ptr & UCP_ADDRESS_FLAG_LAST;
    *addr_length = *(uint8_t*)ptr & UCP_ADDRESS_FLAG_LEN_MASK;

    return UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
}
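/*
 * Length byte encoding (non-unified mode), for illustration: the low 6 bits
 * hold the address length and the high bits hold the flags. E.g., a 17-byte
 * iface address which is the last tl entry and is followed by an ep address
 * is encoded as 17 | UCP_ADDRESS_FLAG_LAST | UCP_ADDRESS_FLAG_HAVE_EP_ADDR,
 * i.e. 0xd1.
 */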
static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
                                        void *buffer, size_t size,
                                        uint64_t tl_bitmap, uint64_t flags,
                                        const ucp_lane_index_t *lanes2remote,
                                        const ucp_address_packed_device_t *devices,
                                        ucp_rsc_index_t num_devices)
{
    ucp_context_h context       = worker->context;
    uint64_t md_flags_pack_mask = (UCT_MD_FLAG_REG | UCT_MD_FLAG_ALLOC);
    const ucp_address_packed_device_t *dev;
    uct_iface_attr_t *iface_attr;
    ucp_rsc_index_t md_index;
    ucp_worker_iface_t *wiface;
    ucp_rsc_index_t rsc_index;
    ucp_lane_index_t lane, remote_lane;
    void *flags_ptr, *ep_flags_ptr;
    uint64_t dev_tl_bitmap;
    unsigned num_ep_addrs;
    ucs_status_t status;
    size_t iface_addr_len;
    size_t ep_addr_len;
    uint64_t md_flags;
    unsigned index;
    int attr_len;
    void *ptr;
    int enable_amo;

    ptr   = buffer;
    index = 0;

    if (flags & UCP_ADDRESS_PACK_FLAG_WORKER_UUID) {
        *(uint64_t*)ptr = worker->uuid;
        ptr             = UCS_PTR_TYPE_OFFSET(ptr, worker->uuid);
    }

    ptr = ucp_address_pack_worker_name(worker, ptr, flags);

    if (num_devices == 0) {
        *((uint8_t*)ptr) = UCP_NULL_RESOURCE;
        ptr              = UCS_PTR_TYPE_OFFSET(ptr, UCP_NULL_RESOURCE);
        goto out;
    }

    for (dev = devices; dev < (devices + num_devices); ++dev) {
        dev_tl_bitmap = context->tl_bitmap & dev->tl_bitmap;

        /* MD index */
        md_index = context->tl_rscs[dev->rsc_index].md_index;
        md_flags = context->tl_mds[md_index].attr.cap.flags & md_flags_pack_mask;
        ucs_assert_always(!(md_index & ~UCP_ADDRESS_FLAG_MD_MASK));

        *(uint8_t*)ptr = md_index |
                         ((dev_tl_bitmap == 0)           ? UCP_ADDRESS_FLAG_EMPTY    : 0) |
                         ((md_flags & UCT_MD_FLAG_ALLOC) ? UCP_ADDRESS_FLAG_MD_ALLOC : 0) |
                         ((md_flags & UCT_MD_FLAG_REG)   ? UCP_ADDRESS_FLAG_MD_REG   : 0);
        ptr = UCS_PTR_TYPE_OFFSET(ptr, md_index);

        /* Device address length */
        *(uint8_t*)ptr = (dev == (devices + num_devices - 1)) ?
                         UCP_ADDRESS_FLAG_LAST : 0;
        if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
            ucs_assert(dev->dev_addr_len < UCP_ADDRESS_FLAG_LAST);
            *(uint8_t*)ptr |= dev->dev_addr_len;
        }
        ptr = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

        /* Device address */
        if (flags & UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR) {
            wiface = ucp_worker_iface(worker, dev->rsc_index);
            status = uct_iface_get_device_address(wiface->iface,
                                                  (uct_device_addr_t*)ptr);
            if (status != UCS_OK) {
                return status;
            }

            ucp_address_memcheck(context, ptr, dev->dev_addr_len,
                                 dev->rsc_index);
            ptr = UCS_PTR_BYTE_OFFSET(ptr, dev->dev_addr_len);
        }

        flags_ptr = NULL;
        ucs_for_each_bit(rsc_index, dev_tl_bitmap) {

            wiface     = ucp_worker_iface(worker, rsc_index);
            iface_attr = &wiface->attr;

            if (!ucp_worker_iface_can_connect(iface_attr)) {
                return UCS_ERR_INVALID_ADDR;
            }

            /* Transport name checksum */
            *(uint16_t*)ptr = context->tl_rscs[rsc_index].tl_name_csum;
            ptr = UCS_PTR_TYPE_OFFSET(ptr,
                                      context->tl_rscs[rsc_index].tl_name_csum);

            /* Transport information */
            enable_amo = worker->atomic_tls & UCS_BIT(rsc_index);
            attr_len   = ucp_address_pack_iface_attr(worker, ptr, rsc_index,
                                                     iface_attr, enable_amo);
            if (attr_len < 0) {
                return UCS_ERR_INVALID_ADDR;
            }

            ucp_address_memcheck(context, ptr, attr_len, rsc_index);

            if (flags & UCP_ADDRESS_PACK_FLAG_IFACE_ADDR) {
                iface_addr_len = iface_attr->iface_addr_len;
            } else {
                iface_addr_len = 0;
            }

            flags_ptr = ucp_address_iface_flags_ptr(worker, ptr, attr_len);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, attr_len);
            ucs_assertv(iface_addr_len < UCP_ADDRESS_FLAG_HAVE_EP_ADDR,
                        "iface_addr_len=%zu", iface_addr_len);

            /* Pack iface address */
            ptr = ucp_address_pack_length(worker, ptr, iface_addr_len);
            if (flags & UCP_ADDRESS_PACK_FLAG_IFACE_ADDR) {
                status = uct_iface_get_address(wiface->iface,
                                               (uct_iface_addr_t*)ptr);
                if (status != UCS_OK) {
                    return status;
                }

                ucp_address_memcheck(context, ptr, iface_addr_len, rsc_index);
                ptr = UCS_PTR_BYTE_OFFSET(ptr, iface_addr_len);
            }
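
            /* Illustrative ep-entry wire format (non-unified mode): each
             * entry packed below is
             *   [ len|flags (1 byte) | ep address (len bytes) | lane (1 byte) ]
             * and the last entry carries UCP_ADDRESS_FLAG_LAST in its length
             * byte. In unified mode the length byte is omitted. */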
            /* Pack ep address if present: iterate over all lanes which use the
             * current resource (rsc_index) and pack their addresses. The last
             * one is marked with UCP_ADDRESS_FLAG_LAST in its length field.
             */
            num_ep_addrs = 0;
            if ((flags & UCP_ADDRESS_PACK_FLAG_EP_ADDR) &&
                ucp_worker_iface_is_tl_p2p(iface_attr)) {
                ucs_assert(ep != NULL);
                ep_addr_len  = iface_attr->ep_addr_len;
                ep_flags_ptr = NULL;

                for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
                    if (ucp_ep_get_rsc_index(ep, lane) != rsc_index) {
                        continue;
                    }

                    /* pack ep address length and save the pointer to its flags */
                    ep_flags_ptr = ptr;
                    ptr          = ucp_address_pack_length(worker, ptr,
                                                           ep_addr_len);

                    /* pack ep address */
                    status = uct_ep_get_address(ep->uct_eps[lane], ptr);
                    if (status != UCS_OK) {
                        return status;
                    }

                    ucp_address_memcheck(context, ptr, ep_addr_len, rsc_index);
                    ptr = UCS_PTR_BYTE_OFFSET(ptr, ep_addr_len);

                    /* pack ep lane index */
                    remote_lane    = (lanes2remote == NULL) ? lane :
                                     lanes2remote[lane];
                    *(uint8_t*)ptr = remote_lane;
                    ptr            = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

                    ucs_trace("pack addr[%d].ep_addr[%d] : len %zu lane %d->%d",
                              index, num_ep_addrs, ep_addr_len, lane,
                              remote_lane);
                    ++num_ep_addrs;
                }

                if (num_ep_addrs > 0) {
                    ucs_assert(ep_flags_ptr != NULL);
                    *(uint8_t*)flags_ptr |= UCP_ADDRESS_FLAG_HAVE_EP_ADDR;
                    if (!ucp_worker_unified_mode(worker)) {
                        *(uint8_t*)ep_flags_ptr |= UCP_ADDRESS_FLAG_LAST;
                    }
                }
            }

            ucs_assert((num_ep_addrs > 0) ||
                       !(*(uint8_t*)flags_ptr & UCP_ADDRESS_FLAG_HAVE_EP_ADDR));

            if (flags & UCP_ADDRESS_PACK_FLAG_TRACE) {
                ucs_trace("pack addr[%d] : "UCT_TL_RESOURCE_DESC_FMT
                          " eps %u md_flags 0x%"PRIx64" tl_flags 0x%"PRIx64
                          " bw %e + %e/n ovh %e lat_ovh %e dev_priority %d"
                          " a32 0x%lx/0x%lx a64 0x%lx/0x%lx",
                          index,
                          UCT_TL_RESOURCE_DESC_ARG(&context->tl_rscs[rsc_index].tl_rsc),
                          num_ep_addrs, md_flags, iface_attr->cap.flags,
                          iface_attr->bandwidth.dedicated,
                          iface_attr->bandwidth.shared,
                          iface_attr->overhead,
                          iface_attr->latency.overhead,
                          iface_attr->priority,
                          iface_attr->cap.atomic32.op_flags,
                          iface_attr->cap.atomic32.fop_flags,
                          iface_attr->cap.atomic64.op_flags,
                          iface_attr->cap.atomic64.fop_flags);
            }

            ++index;
            ucs_assert(index <= UCP_MAX_RESOURCES);
        }

        /* flags_ptr points to the flags of the last tl entry packed in the
         * loop above, so set the LAST flag there */
        if (flags_ptr != NULL) {
            ucs_assert(dev_tl_bitmap != 0);
            *(uint8_t*)flags_ptr |= UCP_ADDRESS_FLAG_LAST;
        } else {
            /* cppcheck-suppress internalAstError */
            ucs_assert(dev_tl_bitmap == 0);
        }
    }

out:
    ucs_assertv(UCS_PTR_BYTE_OFFSET(buffer, size) == ptr,
                "buffer=%p size=%zu ptr=%p ptr-buffer=%zd",
                buffer, size, ptr, UCS_PTR_BYTE_DIFF(buffer, ptr));
    return UCS_OK;
}

ucs_status_t ucp_address_pack(ucp_worker_h worker, ucp_ep_h ep,
                              uint64_t tl_bitmap, uint64_t flags,
                              const ucp_lane_index_t *lanes2remote,
                              size_t *size_p, void **buffer_p)
{
    ucp_address_packed_device_t *devices;
    ucp_rsc_index_t num_devices;
    ucs_status_t status;
    void *buffer;
    size_t size;

    if (ep == NULL) {
        flags &= ~UCP_ADDRESS_PACK_FLAG_EP_ADDR;
    }

    /* Collect all devices we want to pack */
    status = ucp_address_gather_devices(worker, ep, tl_bitmap, flags,
                                        &devices, &num_devices);
    if (status != UCS_OK) {
        goto out;
    }

    /* Calculate packed size */
    size = ucp_address_packed_size(worker, devices, num_devices, flags);

    /* Allocate address */
    buffer = ucs_malloc(size, "ucp_address");
    if (buffer == NULL) {
        status = UCS_ERR_NO_MEMORY;
        goto out_free_devices;
    }

    memset(buffer, 0, size);

    /* Pack the address */
    status = ucp_address_do_pack(worker, ep, buffer, size, tl_bitmap, flags,
                                 lanes2remote, devices, num_devices);
    if (status != UCS_OK) {
        ucs_free(buffer);
        goto out_free_devices;
    }

    VALGRIND_CHECK_MEM_IS_DEFINED(buffer, size);

    *size_p   = size;
    *buffer_p = buffer;
    status    = UCS_OK;

out_free_devices:
    ucs_free(devices);
out:
    return status;
}
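/*
 * Caller sketch (illustrative only; the flag combination and error handling
 * are assumptions, not taken from this file):
 *
 *   size_t addr_len;
 *   void *addr;
 *   ucs_status_t status;
 *
 *   status = ucp_address_pack(worker, NULL, UINT64_MAX,
 *                             UCP_ADDRESS_PACK_FLAG_WORKER_UUID |
 *                             UCP_ADDRESS_PACK_FLAG_DEVICE_ADDR |
 *                             UCP_ADDRESS_PACK_FLAG_IFACE_ADDR,
 *                             NULL, &addr_len, &addr);
 *
 * The (addr, addr_len) blob is then sent to the peer, which calls
 * ucp_address_unpack() with the same flags and eventually frees
 * unpacked_address->address_list; the packer frees addr with ucs_free().
 */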
ucs_status_t ucp_address_unpack(ucp_worker_t *worker, const void *buffer,
                                uint64_t flags,
                                ucp_unpacked_address_t *unpacked_address)
{
    ucp_address_entry_t *address_list, *address;
    ucp_address_entry_ep_addr_t *ep_addr;
    int last_dev, last_tl, last_ep_addr;
    const uct_device_addr_t *dev_addr;
    ucp_rsc_index_t dev_index;
    ucp_rsc_index_t md_index;
    unsigned address_count;
    int empty_dev;
    uint64_t md_flags;
    size_t dev_addr_len;
    size_t iface_addr_len;
    size_t ep_addr_len;
    size_t attr_len;
    uint8_t md_byte;
    const void *ptr;
    const void *aptr;
    const void *flags_ptr;

    ptr = buffer;
    if (flags & UCP_ADDRESS_PACK_FLAG_WORKER_UUID) {
        unpacked_address->uuid = *(uint64_t*)ptr;
        ptr = UCS_PTR_TYPE_OFFSET(ptr, unpacked_address->uuid);
    } else {
        unpacked_address->uuid = 0;
    }

    aptr = ucp_address_unpack_worker_name(ptr, unpacked_address->name,
                                          sizeof(unpacked_address->name),
                                          flags);

    /* Count addresses */
    ptr           = aptr;
    address_count = 0;

    last_dev = (*(uint8_t*)ptr == UCP_NULL_RESOURCE);
    while (!last_dev) {
        /* md_index */
        empty_dev = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_EMPTY;
        ptr       = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

        /* device address length */
        dev_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST;
        last_dev     = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
        ptr          = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
        ptr          = UCS_PTR_BYTE_OFFSET(ptr, dev_addr_len);

        last_tl = empty_dev;
        while (!last_tl) {
            ptr       = UCS_PTR_TYPE_OFFSET(ptr, uint16_t); /* tl_name_csum */
            attr_len  = ucp_address_iface_attr_size(worker);
            flags_ptr = ucp_address_iface_flags_ptr(worker, (void*)ptr,
                                                    attr_len);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, attr_len);
            ptr       = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                  &iface_addr_len, 0, &last_tl);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, iface_addr_len);

            last_ep_addr = !(*(uint8_t*)flags_ptr &
                             UCP_ADDRESS_FLAG_HAVE_EP_ADDR);
            while (!last_ep_addr) {
                ptr = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                &ep_addr_len, 1, &last_ep_addr);
                ucs_assert(flags & UCP_ADDRESS_PACK_FLAG_EP_ADDR);
                ucs_assert(ep_addr_len > 0);
                ptr = UCS_PTR_BYTE_OFFSET(ptr, ep_addr_len);
                ptr = UCS_PTR_TYPE_OFFSET(ptr, uint8_t); /* lane index */
            }

            ++address_count;
            ucs_assert(address_count <= UCP_MAX_RESOURCES);
        }
    }

    if (address_count == 0) {
        address_list = NULL;
        goto out;
    }

    /* Allocate address list */
    address_list = ucs_calloc(address_count, sizeof(*address_list),
                              "ucp_address_list");
    if (address_list == NULL) {
        ucs_error("failed to allocate address list");
        return UCS_ERR_NO_MEMORY;
    }
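
    /* The second pass below must consume the buffer in exactly the same order
     * and with the same field lengths as the counting pass above; otherwise
     * the parsed entries would not line up with address_count. */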
    /* Unpack addresses */
    address   = address_list;
    ptr       = aptr;
    dev_index = 0;

    do {
        /* md_index */
        md_byte   = (*(uint8_t*)ptr);
        md_index  = md_byte & UCP_ADDRESS_FLAG_MD_MASK;
        md_flags  = (md_byte & UCP_ADDRESS_FLAG_MD_ALLOC) ? UCT_MD_FLAG_ALLOC : 0;
        md_flags |= (md_byte & UCP_ADDRESS_FLAG_MD_REG)   ? UCT_MD_FLAG_REG   : 0;
        empty_dev = md_byte & UCP_ADDRESS_FLAG_EMPTY;
        ptr       = UCS_PTR_TYPE_OFFSET(ptr, md_byte);

        /* device address length */
        dev_addr_len = (*(uint8_t*)ptr) & ~UCP_ADDRESS_FLAG_LAST;
        last_dev     = (*(uint8_t*)ptr) & UCP_ADDRESS_FLAG_LAST;
        ptr          = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);

        dev_addr = ptr;
        ptr      = UCS_PTR_BYTE_OFFSET(ptr, dev_addr_len);

        last_tl = empty_dev;
        while (!last_tl) {
            /* tl_name_csum */
            address->tl_name_csum = *(uint16_t*)ptr;
            ptr = UCS_PTR_TYPE_OFFSET(ptr, address->tl_name_csum);

            address->dev_addr  = (dev_addr_len > 0) ? dev_addr : NULL;
            address->md_index  = md_index;
            address->dev_index = dev_index;
            address->md_flags  = md_flags;

            attr_len  = ucp_address_unpack_iface_attr(worker,
                                                      &address->iface_attr,
                                                      ptr);
            flags_ptr = ucp_address_iface_flags_ptr(worker, (void*)ptr,
                                                    attr_len);
            ptr       = UCS_PTR_BYTE_OFFSET(ptr, attr_len);
            ptr       = ucp_address_unpack_length(worker, flags_ptr, ptr,
                                                  &iface_addr_len, 0, &last_tl);

            address->iface_addr   = (iface_addr_len > 0) ? ptr : NULL;
            address->num_ep_addrs = 0;
            ptr                   = UCS_PTR_BYTE_OFFSET(ptr, iface_addr_len);

            last_ep_addr = !(*(uint8_t*)flags_ptr &
                             UCP_ADDRESS_FLAG_HAVE_EP_ADDR);
            while (!last_ep_addr) {
                ucs_assert(address->num_ep_addrs < UCP_MAX_LANES);
                ep_addr       = &address->ep_addrs[address->num_ep_addrs++];
                ptr           = ucp_address_unpack_length(worker, flags_ptr,
                                                          ptr, &ep_addr_len, 1,
                                                          &last_ep_addr);
                ep_addr->addr = ptr;
                ptr           = UCS_PTR_BYTE_OFFSET(ptr, ep_addr_len);

                ep_addr->lane = *(uint8_t*)ptr;
                ptr           = UCS_PTR_TYPE_OFFSET(ptr, uint8_t);
            }

            if (flags & UCP_ADDRESS_PACK_FLAG_TRACE) {
                ucs_trace("unpack addr[%d] : eps %u md_flags 0x%"PRIx64
                          " tl_flags 0x%"PRIx64" bw %e + %e/n ovh %e"
                          " lat_ovh %e dev_priority %d"
                          " a32 0x%lx/0x%lx a64 0x%lx/0x%lx",
                          (int)(address - address_list),
                          address->num_ep_addrs, address->md_flags,
                          address->iface_attr.cap_flags,
                          address->iface_attr.bandwidth.dedicated,
                          address->iface_attr.bandwidth.shared,
                          address->iface_attr.overhead,
                          address->iface_attr.lat_ovh,
                          address->iface_attr.priority,
                          address->iface_attr.atomic.atomic32.op_flags,
                          address->iface_attr.atomic.atomic32.fop_flags,
                          address->iface_attr.atomic.atomic64.op_flags,
                          address->iface_attr.atomic.atomic64.fop_flags);
            }

            ++address;
        }

        ++dev_index;
    } while (!last_dev);

    ucs_assert((unsigned)(address - address_list) == address_count);

out:
    unpacked_address->address_count = address_count;
    unpacked_address->address_list  = address_list;
    return UCS_OK;
}