/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifndef UCP_REQUEST_INL_
#define UCP_REQUEST_INL_

#include "ucp_request.h"
#include "ucp_worker.h"
#include "ucp_ep.inl"

#include <ucp/core/ucp_worker.h>
#include <ucp/dt/dt.h>
#include <ucs/profile/profile.h>
#include <ucs/datastruct/mpool.inl>
#include <ucp/dt/dt.inl>
#include <inttypes.h>


#define UCP_REQUEST_FLAGS_FMT \
    "%c%c%c%c%c%c%c"

#define UCP_REQUEST_FLAGS_ARG(_flags) \
    (((_flags) & UCP_REQUEST_FLAG_COMPLETED)       ? 'd' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_RELEASED)        ? 'f' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_EXPECTED)        ? 'e' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_LOCAL_COMPLETED) ? 'L' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_CALLBACK)        ? 'c' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_RECV)            ? 'r' : '-'), \
    (((_flags) & UCP_REQUEST_FLAG_SYNC)            ? 's' : '-')

#define UCP_RECV_DESC_FMT \
    "rdesc %p %c%c%c%c%c%c len %u+%u"

#define UCP_RECV_DESC_ARG(_rdesc) \
    (_rdesc), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_UCT_DESC)      ? 't' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER)         ? 'e' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_ONLY)    ? 'o' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_SYNC)    ? 's' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_EAGER_OFFLOAD) ? 'f' : '-'), \
    (((_rdesc)->flags & UCP_RECV_DESC_FLAG_RNDV)          ? 'r' : '-'), \
    (_rdesc)->payload_offset, \
    ((_rdesc)->length - (_rdesc)->payload_offset)
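
/* The *_FMT / *_ARG macro pairs above are intended to be used together in a
 * single formatted trace call, for example (illustrative only):
 *
 *     ucs_trace_req("flags " UCP_REQUEST_FLAGS_FMT,
 *                   UCP_REQUEST_FLAGS_ARG(req->flags));
 */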


/* defined as a macro to print the call site */
#define ucp_request_get(_worker) \
    ({ \
        ucp_request_t *_req = ucs_mpool_get_inline(&(_worker)->req_mp); \
        if (_req != NULL) { \
            VALGRIND_MAKE_MEM_DEFINED(_req + 1, \
                                      (_worker)->context->config.request.size); \
            ucs_trace_req("allocated request %p", _req); \
            UCS_PROFILE_REQUEST_NEW(_req, "ucp_request", 0); \
        } \
        _req; \
    })

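/*
 * Complete a request: set its final status, invoke the user callback if one
 * was set with ucp_request_set_callback(), and raise the COMPLETED flag. If
 * the user has already released the request (RELEASED flag is set), it is
 * returned to the memory pool immediately. Defined as a macro so that the
 * callback member (_cb) can be selected per request type (e.g. send.cb,
 * recv.tag.cb).
 */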
#define ucp_request_complete(_req, _cb, _status, ...) \
    { \
        (_req)->status = (_status); \
        if (ucs_likely((_req)->flags & UCP_REQUEST_FLAG_CALLBACK)) { \
            (_req)->_cb((_req) + 1, (_status), ## __VA_ARGS__); \
        } \
        if (ucs_unlikely(((_req)->flags  |= UCP_REQUEST_FLAG_COMPLETED) & \
                         UCP_REQUEST_FLAG_RELEASED)) { \
            ucp_request_put(_req); \
        } \
    }

#define ucp_request_set_callback(_req, _cb, _value) \
    { \
        (_req)->_cb    = _value; \
        (_req)->flags |= UCP_REQUEST_FLAG_CALLBACK; \
        ucs_trace_data("request %p %s set to %p", _req, #_cb, _value); \
    }


static UCS_F_ALWAYS_INLINE void
ucp_request_put(ucp_request_t *req)
{
    ucs_trace_req("put request %p", req);
    UCS_PROFILE_REQUEST_FREE(req);
    ucs_mpool_put_inline(req);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
{
    ucs_trace_req("completing send request %p (%p) "UCP_REQUEST_FLAGS_FMT" %s",
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
                  ucs_status_string(status));
    UCS_PROFILE_REQUEST_EVENT(req, "complete_send", status);
    ucp_request_complete(req, send.cb, status);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_tag_recv(ucp_request_t *req, ucs_status_t status)
{
    ucs_trace_req("completing receive request %p (%p) "UCP_REQUEST_FLAGS_FMT
                  " stag 0x%"PRIx64" len %zu, %s",
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
                  req->recv.tag.info.sender_tag, req->recv.tag.info.length,
                  ucs_status_string(status));
    UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
    ucp_request_complete(req, recv.tag.cb, status, &req->recv.tag.info);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_stream_recv(ucp_request_t *req, ucp_ep_ext_proto_t* ep_ext,
                                 ucs_status_t status)
{
    /* dequeue the request before completing it */
    ucp_request_t *check_req UCS_V_UNUSED =
            ucs_queue_pull_elem_non_empty(&ep_ext->stream.match_q, ucp_request_t,
                                          recv.queue);
    ucs_assert(check_req               == req);
    ucs_assert(req->recv.stream.offset >  0);

    req->recv.stream.length = req->recv.stream.offset;
    ucs_trace_req("completing stream receive request %p (%p) "
                  UCP_REQUEST_FLAGS_FMT" count %zu, %s",
                  req, req + 1, UCP_REQUEST_FLAGS_ARG(req->flags),
                  req->recv.stream.length, ucs_status_string(status));
    UCS_PROFILE_REQUEST_EVENT(req, "complete_recv", status);
    ucp_request_complete(req, recv.stream.cb, status, req->recv.stream.length);
}

static UCS_F_ALWAYS_INLINE int
ucp_request_can_complete_stream_recv(ucp_request_t *req)
{
    /* NOTE: the first check is needed to avoid the heavy "%" operation if the
     *       request is completely filled */
    if (req->recv.stream.offset == req->recv.length) {
        return 1;
    }

    if (req->flags & UCP_REQUEST_FLAG_STREAM_RECV_WAITALL) {
        return 0;
    }

    /* A 0-length stream recv is meaningless unless it was explicitly requested */
    if (req->recv.stream.offset == 0) {
        return 0;
    }

    if (ucs_likely(UCP_DT_IS_CONTIG(req->recv.datatype))) {
        return req->recv.stream.offset %
               ucp_contig_dt_elem_size(req->recv.datatype) == 0;
    }

    /* Currently, all datatypes except contiguous have a granularity of 1 byte */
    return 1;
}

/*
 * @return Whether completed.
 *         *req_status is filled with the completion status if completed.
 */
static int UCS_F_ALWAYS_INLINE
ucp_request_try_send(ucp_request_t *req, ucs_status_t *req_status,
                     unsigned pending_flags)
{
    ucs_status_t status;

    /* coverity wrongly resolves (*req).send.uct.func to test_uct_pending::pending_send_op_ok */
    /* coverity[address_free] */
    status = req->send.uct.func(&req->send.uct);
    if (status == UCS_OK) {
        /* Completed the operation */
        *req_status = UCS_OK;
        return 1;
    } else if (status == UCS_INPROGRESS) {
        /* Not completed, but made progress */
        return 0;
    } else if (status != UCS_ERR_NO_RESOURCE) {
        /* Unexpected error */
        ucp_request_complete_send(req, status);
        *req_status = status;
        return 1;
    }

    /* No send resources, try to add to pending queue */
    ucs_assert(status == UCS_ERR_NO_RESOURCE);
    return ucp_request_pending_add(req, req_status, pending_flags);
}

/**
 * Start sending a request.
 *
 * @param [in]  req             Request to start.
 * @param [in]  pending_flags   Flags to be passed to UCT if the request is
 *                              added to the pending queue.
 *
 * @return UCS_OK - completed (callback will not be called)
 *         UCS_INPROGRESS - started but not completed
 *         other error - failure
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send(ucp_request_t *req, unsigned pending_flags)
{
    ucs_status_t status = UCS_ERR_NOT_IMPLEMENTED;
    while (!ucp_request_try_send(req, &status, pending_flags));
    return status;
}
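
/*
 * Illustrative usage (a minimal sketch, not taken from this file): a protocol
 * start routine would typically set the UCT progress function, reset the send
 * state for the chosen protocol and then kick off the send:
 *
 *     req->send.uct.func = my_bcopy_progress;   (hypothetical progress func)
 *     ucp_request_send_state_reset(req, NULL, UCP_REQUEST_SEND_PROTO_BCOPY_AM);
 *     status = ucp_request_send(req, 0);
 *     if (status == UCS_OK) {
 *         ... request completed in place, the send callback is not invoked ...
 *     }
 */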

static UCS_F_ALWAYS_INLINE
void ucp_request_send_generic_dt_finish(ucp_request_t *req)
{
    ucp_dt_generic_t *dt;
    if (UCP_DT_IS_GENERIC(req->send.datatype)) {
        dt = ucp_dt_generic(req->send.datatype);
        ucs_assert(NULL != dt);
        dt->ops.finish(req->send.state.dt.dt.generic.state);
    }
}

static UCS_F_ALWAYS_INLINE
void ucp_request_recv_generic_dt_finish(ucp_request_t *req)
{
    ucp_dt_generic_t *dt;
    if (UCP_DT_IS_GENERIC(req->recv.datatype)) {
        dt = ucp_dt_generic(req->recv.datatype);
        ucs_assert(NULL != dt);
        dt->ops.finish(req->recv.state.dt.generic.state);
    }
}

static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_init(ucp_request_t *req, ucp_datatype_t datatype,
                            size_t dt_count)
{
    ucp_dt_generic_t *dt_gen;
    void             *state_gen;

    VALGRIND_MAKE_MEM_UNDEFINED(&req->send.state.uct_comp.count,
                                sizeof(req->send.state.uct_comp.count));
    VALGRIND_MAKE_MEM_UNDEFINED(&req->send.state.dt.offset,
                                sizeof(req->send.state.dt.offset));

    req->send.state.uct_comp.func = NULL;

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        req->send.state.dt.dt.contig.md_map     = 0;
        return;
    case UCP_DATATYPE_IOV:
        req->send.state.dt.dt.iov.iovcnt_offset = 0;
        req->send.state.dt.dt.iov.iov_offset    = 0;
        req->send.state.dt.dt.iov.iovcnt        = dt_count;
        req->send.state.dt.dt.iov.dt_reg        = NULL;
        return;
    case UCP_DATATYPE_GENERIC:
        dt_gen    = ucp_dt_generic(datatype);
        state_gen = dt_gen->ops.start_pack(dt_gen->context, req->send.buffer,
                                           dt_count);
        req->send.state.dt.dt.generic.state = state_gen;
        return;
    default:
        ucs_fatal("Invalid data type");
    }
}

static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_reset(ucp_request_t *req,
                             uct_completion_callback_t comp_cb, unsigned proto)
{
    switch (proto) {
    case UCP_REQUEST_SEND_PROTO_RMA:
        ucs_assert(UCP_DT_IS_CONTIG(req->send.datatype));
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_RNDV_GET:
    case UCP_REQUEST_SEND_PROTO_RNDV_PUT:
    case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
        req->send.state.uct_comp.func       = comp_cb;
        req->send.state.uct_comp.count      = 0;
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_BCOPY_AM:
        req->send.state.dt.offset           = 0;
        break;
    default:
        ucs_fatal("unknown protocol");
    }
}

/**
 * Advance state of send request after UCT operation. This function applies
 * @a new_dt_state to @a req request according to @a proto protocol. Also, UCT
 * completion counter will be incremented if @a proto requires it.
 *
 * @param [inout]   req             Send request.
 * @param [in]      new_dt_state    State which was progressed by
 *                                  @ref ucp_dt_pack or @ref ucp_dt_iov_copy_uct.
 * @param [in]      proto           Internal UCP protocol identifier
 *                                  (UCP_REQUEST_SEND_PROTO_*)
 * @param [in]      status          Status of the last UCT operation which
 *                                  progressed @a proto protocol.
 */
static UCS_F_ALWAYS_INLINE void
ucp_request_send_state_advance(ucp_request_t *req,
                               const ucp_dt_state_t *new_dt_state,
                               unsigned proto, ucs_status_t status)
{
    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        /* Don't advance after a failed operation, so that the next attempt
         * continues from the last valid point.
         */
        return;
    }

    switch (proto) {
    case UCP_REQUEST_SEND_PROTO_RMA:
        if (status == UCS_INPROGRESS) {
            ++req->send.state.uct_comp.count;
        }
        break;
    case UCP_REQUEST_SEND_PROTO_ZCOPY_AM:
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_RNDV_GET:
    case UCP_REQUEST_SEND_PROTO_RNDV_PUT:
        if (status == UCS_INPROGRESS) {
            ++req->send.state.uct_comp.count;
        }
        /* Fall through */
    case UCP_REQUEST_SEND_PROTO_BCOPY_AM:
        ucs_assert(new_dt_state != NULL);
        if (UCP_DT_IS_CONTIG(req->send.datatype)) {
            req->send.state.dt.offset = new_dt_state->offset;
        } else {
            req->send.state.dt        = *new_dt_state;
        }
        break;
    default:
        ucs_fatal("unknown protocol");
    }

    /* offset is not used for RMA */
    ucs_assert((proto == UCP_REQUEST_SEND_PROTO_RMA) ||
               (req->send.state.dt.offset <= req->send.length));
}
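
/*
 * Illustrative usage (a sketch, not from this file): a bcopy-style progress
 * function would typically pack from a local copy of the datatype state and
 * commit it only if the UCT operation did not fail:
 *
 *     ucp_dt_state_t next_state = req->send.state.dt;
 *     ... pack the next fragment with ucp_dt_pack(), advancing next_state,
 *         and post it with a UCT call that returns 'status' ...
 *     ucp_request_send_state_advance(req, &next_state,
 *                                    UCP_REQUEST_SEND_PROTO_BCOPY_AM, status);
 */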

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg(ucp_request_t *req, ucp_md_map_t md_map,
                            unsigned uct_flags)
{
    return ucp_request_memory_reg(req->send.ep->worker->context, md_map,
                                  (void*)req->send.buffer, req->send.length,
                                  req->send.datatype, &req->send.state.dt,
                                  req->send.mem_type, req, uct_flags);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_send_buffer_reg_lane(ucp_request_t *req, ucp_lane_index_t lane,
                                 unsigned uct_flags)
{
    ucp_md_map_t md_map = UCS_BIT(ucp_ep_md_index(req->send.ep, lane));
    return ucp_request_send_buffer_reg(req, md_map, uct_flags);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_send_request_add_reg_lane(ucp_request_t *req, ucp_lane_index_t lane)
{
    /* add new lane to registration map */
    ucp_md_map_t md_map = UCS_BIT(ucp_ep_md_index(req->send.ep, lane)) |
                          req->send.state.dt.dt.contig.md_map;
    ucs_assert(ucs_popcount(md_map) <= UCP_MAX_OP_MDS);
    return ucp_request_send_buffer_reg(req, md_map, 0);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_recv_buffer_reg(ucp_request_t *req, ucp_md_map_t md_map,
                            size_t length)
{
    return ucp_request_memory_reg(req->recv.worker->context, md_map,
                                  req->recv.buffer, length,
                                  req->recv.datatype, &req->recv.state,
                                  req->recv.mem_type, req,
                                  UCT_MD_MEM_FLAG_HIDE_ERRORS);
}

static UCS_F_ALWAYS_INLINE void ucp_request_send_buffer_dereg(ucp_request_t *req)
{
    ucp_request_memory_dereg(req->send.ep->worker->context, req->send.datatype,
                             &req->send.state.dt, req);
}

static UCS_F_ALWAYS_INLINE void ucp_request_recv_buffer_dereg(ucp_request_t *req)
{
    ucp_request_memory_dereg(req->recv.worker->context, req->recv.datatype,
                             &req->recv.state, req);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_wait_uct_comp(ucp_request_t *req)
{
    while (req->send.state.uct_comp.count > 0) {
        ucp_worker_progress(req->send.ep->worker);
    }
}

static UCS_F_ALWAYS_INLINE void
ucp_request_unpack_contig(ucp_request_t *req, void *buf, const void *data,
                          size_t length)
{
    if (ucs_likely(UCP_MEM_IS_ACCESSIBLE_FROM_CPU(req->recv.mem_type))) {
        UCS_PROFILE_NAMED_CALL("memcpy_recv", ucs_memcpy_relaxed, buf,
                               data, length);
    } else {
        ucp_mem_type_unpack(req->recv.worker, buf, data, length,
                            req->recv.mem_type);
    }
}

/**
 * Unpack received data to a request
 *
 * req    - receive request
 * data   - data to unpack
 * length - length of the data, in bytes
 * offset - offset of the received data within the request, for OOO fragments
 * last   - whether this is the last fragment of the message
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_request_recv_data_unpack(ucp_request_t *req, const void *data,
                             size_t length, size_t offset, int last)
{
    ucp_dt_generic_t *dt_gen;
    ucs_status_t status;

    ucs_assert(req->status == UCS_OK);

    ucp_trace_req(req, "unpack recv_data req_len %zu data_len %zu offset %zu last: %s",
                  req->recv.length, length, offset, last ? "yes" : "no");

    if (ucs_unlikely((length + offset) > req->recv.length)) {
        return ucp_request_recv_msg_truncated(req, length, offset);
    }

    switch (req->recv.datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        ucp_request_unpack_contig(req,
                                  UCS_PTR_BYTE_OFFSET(req->recv.buffer, offset),
                                  data, length);
        return UCS_OK;

    case UCP_DATATYPE_IOV:
        if (offset != req->recv.state.offset) {
            ucp_dt_iov_seek(req->recv.buffer, req->recv.state.dt.iov.iovcnt,
                            offset - req->recv.state.offset,
                            &req->recv.state.dt.iov.iov_offset,
                            &req->recv.state.dt.iov.iovcnt_offset);
            req->recv.state.offset = offset;
        }
        UCS_PROFILE_CALL(ucp_dt_iov_scatter, req->recv.buffer,
                         req->recv.state.dt.iov.iovcnt, data, length,
                         &req->recv.state.dt.iov.iov_offset,
                         &req->recv.state.dt.iov.iovcnt_offset);
        req->recv.state.offset += length;
        return UCS_OK;

    case UCP_DATATYPE_GENERIC:
        dt_gen = ucp_dt_generic(req->recv.datatype);
        status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack,
                                        req->recv.state.dt.generic.state,
                                        offset, data, length);
        if (last || (status != UCS_OK)) {
            UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
                                        req->recv.state.dt.generic.state);
        }
        return status;

    default:
        ucs_fatal("unexpected datatype=%lx", req->recv.datatype);
    }
}

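/*
 * Initialize a receive descriptor for incoming data. If the transport allows
 * keeping the UCT descriptor (UCT_CB_PARAM_FLAG_DESC), the descriptor is laid
 * out in the UCT-provided headroom and UCS_INPROGRESS is returned, indicating
 * that the caller keeps ownership of the UCT buffer (typically by returning
 * UCS_INPROGRESS from the AM callback). Otherwise the data is copied into a
 * descriptor taken from the worker's AM memory pool and UCS_OK is returned.
 */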
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_recv_desc_init(ucp_worker_h worker, void *data, size_t length,
                   int data_offset, unsigned am_flags, uint16_t hdr_len,
                   uint16_t rdesc_flags, uint16_t priv_length,
                   ucp_recv_desc_t **rdesc_p)
{
    ucp_recv_desc_t *rdesc;
    void *data_hdr;
    ucs_status_t status;

    if (ucs_unlikely(am_flags & UCT_CB_PARAM_FLAG_DESC)) {
        /* slowpath */
        ucs_assert(priv_length <= UCP_WORKER_HEADROOM_PRIV_SIZE);
        data_hdr           = UCS_PTR_BYTE_OFFSET(data, -data_offset);
        rdesc              = (ucp_recv_desc_t *)data_hdr - 1;
        rdesc->flags       = rdesc_flags | UCP_RECV_DESC_FLAG_UCT_DESC;
        rdesc->priv_length = priv_length;
        status             = UCS_INPROGRESS;
    } else {
        rdesc = (ucp_recv_desc_t*)ucs_mpool_get_inline(&worker->am_mp);
        if (rdesc == NULL) {
            ucs_error("failed to allocate ucp recv descriptor");
            return UCS_ERR_NO_MEMORY;
        }

        /* No need to initialize rdesc->priv_length here, because it is only
         * needed for releasing UCT descriptor. */

        rdesc->flags = rdesc_flags;
        status       = UCS_OK;
        memcpy(UCS_PTR_BYTE_OFFSET(rdesc + 1, data_offset), data, length);
    }

    rdesc->length         = length + data_offset;
    rdesc->payload_offset = hdr_len;
    *rdesc_p              = rdesc;
    return status;
}

static UCS_F_ALWAYS_INLINE void
ucp_recv_desc_release(ucp_recv_desc_t *rdesc)
{
    ucs_trace_req("release receive descriptor %p", rdesc);
    if (ucs_unlikely(rdesc->flags & UCP_RECV_DESC_FLAG_UCT_DESC)) {
        /* uct desc is slowpath */
        uct_iface_release_desc(UCS_PTR_BYTE_OFFSET(rdesc,
                                                   -(UCP_WORKER_HEADROOM_PRIV_SIZE -
                                                     rdesc->priv_length)));
    } else {
        ucs_mpool_put_inline(rdesc);
    }
}

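/*
 * Select the next AM bandwidth lane for a multi-fragment send, cycling over
 * the endpoint's configured am_bw_lanes in round-robin order. When the index
 * runs past the configured lanes (or UCP_MAX_LANES), the selection wraps
 * around to the first lane and the index is reset to 1.
 */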
static UCS_F_ALWAYS_INLINE ucp_lane_index_t
ucp_send_request_get_next_am_bw_lane(ucp_request_t *req)
{
    ucp_lane_index_t lane;

    /* at least one lane must be initialized */
    ucs_assert(ucp_ep_config(req->send.ep)->key.am_bw_lanes[0] != UCP_NULL_LANE);

    lane = (req->send.tag.am_bw_index >= UCP_MAX_LANES) ?
           UCP_NULL_LANE :
           ucp_ep_config(req->send.ep)->key.am_bw_lanes[req->send.tag.am_bw_index];
    if (lane != UCP_NULL_LANE) {
        req->send.tag.am_bw_index++;
        return lane;
    } else {
        req->send.tag.am_bw_index = 1;
        return ucp_ep_config(req->send.ep)->key.am_bw_lanes[0];
    }
}

static UCS_F_ALWAYS_INLINE uintptr_t ucp_request_get_dest_ep_ptr(ucp_request_t *req)
{
    /* This function may return 0, but in such cases the message should not be
     * sent at all because the am_lane would point to a wireup (proxy) endpoint.
     * So only the receiver side has an assertion that ep_ptr != 0.
     */
    return ucp_ep_dest_ep_ptr(req->send.ep);
}

#endif