Blob Blame History Raw
/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2016.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "proto_am.inl"

#include <ucp/tag/offload.h>


/**
 * Upper bound on the number of bytes ucp_proto_pack() may write.
 *
 * @return The larger of sizeof(ucp_reply_hdr_t) and
 *         sizeof(ucp_offload_ssend_hdr_t), which are the two header layouts
 *         ucp_proto_pack() can produce.
 */
static inline size_t ucp_proto_max_packed_size(void)
{
    return ucs_max(sizeof(ucp_reply_hdr_t),
                   sizeof(ucp_offload_ssend_hdr_t));
}

/**
 * Pack callback that writes a protocol header for the request's active
 * message id into @a dest.
 *
 * @param dest  Destination buffer for the packed header.
 * @param arg   The ucp_request_t being sent.
 *
 * @return Number of bytes written: sizeof(ucp_reply_hdr_t) for the
 *         eager-sync-ack / RNDV ATS / RNDV ATP ids, or
 *         sizeof(ucp_offload_ssend_hdr_t) for the offload-sync-ack id.
 *         Any other id triggers ucs_fatal().
 */
static size_t ucp_proto_pack(void *dest, void *arg)
{
    ucp_request_t *req = arg;
    ucp_reply_hdr_t *reply;
    ucp_offload_ssend_hdr_t *offload_reply;

    switch (req->send.proto.am_id) {
    case UCP_AM_ID_EAGER_SYNC_ACK:
    case UCP_AM_ID_RNDV_ATS:
    case UCP_AM_ID_RNDV_ATP:
        /* Reply header: remote request pointer followed by status */
        reply         = dest;
        reply->reqptr = req->send.proto.remote_request;
        reply->status = req->send.proto.status;
        return sizeof(*reply);
    case UCP_AM_ID_OFFLOAD_SYNC_ACK:
        /* Offload sync-send header: sender tag and destination ep pointer */
        offload_reply             = dest;
        offload_reply->sender_tag = req->send.proto.sender_tag;
        offload_reply->ep_ptr     = ucp_request_get_dest_ep_ptr(req);
        return sizeof(*offload_reply);
    default:
        break;
    }

    /* Reached only for an am_id this packer does not handle */
    ucs_fatal("unexpected am_id");
    return 0;
}

/**
 * Send a single active message for a pending request, choosing between the
 * short and bcopy paths.
 *
 * The short path is taken when @a max_packed_size does not exceed either
 * UCS_ALLOCA_MAX_SIZE or the endpoint configuration's am.max_short; the data
 * is then packed into a stack buffer and handed to uct_ep_am_short().
 * Otherwise the call is forwarded to ucp_do_am_bcopy_single().
 *
 * @param self             Pending request embedded in a ucp_request_t
 *                         (send.uct member).
 * @param am_id            Active message id to send.
 * @param pack_cb          Callback which packs the data; called with the
 *                         destination buffer and the request.
 * @param max_packed_size  Upper bound on what @a pack_cb may write.
 *
 * @return Status returned by uct_ep_am_short() or ucp_do_am_bcopy_single().
 */
ucs_status_t
ucp_do_am_single(uct_pending_req_t *self, uint8_t am_id,
                 uct_pack_callback_t pack_cb, ssize_t max_packed_size)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
    ucp_ep_t *ep       = req->send.ep;
    ssize_t packed_len;
    uint64_t *buffer;

    /* if packed data can fit short active message, use it, because it should
     * be faster than bcopy.
     */
    if ((max_packed_size <= UCS_ALLOCA_MAX_SIZE) &&
        (max_packed_size <= ucp_ep_config(ep)->am.max_short)) {
        req->send.lane = ucp_ep_get_am_lane(ep);
        buffer         = ucs_alloca(max_packed_size);
        packed_len     = pack_cb(buffer, req);

        /* buffer[0] is passed separately from the rest of the data and its
         * size is subtracted from the length below, so the callback must
         * have packed at least sizeof(uint64_t) bytes; otherwise the
         * subtraction would wrap around.
         */
        ucs_assertv((packed_len >= (ssize_t)sizeof(uint64_t)) &&
                    (packed_len <= max_packed_size),
                    "packed_len=%zd max_packed_size=%zd", packed_len,
                    max_packed_size);

        return uct_ep_am_short(ep->uct_eps[req->send.lane], am_id, buffer[0],
                               &buffer[1], packed_len - sizeof(uint64_t));
    }

    return ucp_do_am_bcopy_single(self, am_id, pack_cb);
}

/**
 * Progress function for a single protocol active message.
 *
 * Sends the request's am_id through ucp_do_am_single() using
 * ucp_proto_pack() as the pack callback, and invokes the request's
 * send.proto.comp_cb once the send returns UCS_OK.
 *
 * @param self  Pending request embedded in a ucp_request_t (send.uct member).
 *
 * @return Status returned by ucp_do_am_single().
 */
ucs_status_t ucp_proto_progress_am_single(uct_pending_req_t *self)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct);
    ucs_status_t status;

    status = ucp_do_am_single(self, req->send.proto.am_id, ucp_proto_pack,
                              ucp_proto_max_packed_size());
    if (status != UCS_OK) {
        /* Not sent; hand the status back without running the callback */
        return status;
    }

    req->send.proto.comp_cb(req);
    return UCS_OK;
}

/**
 * Finish a zcopy active-message send request.
 *
 * Calls ucp_request_send_buffer_dereg() on the request and then completes it
 * through ucp_request_complete_send() with the given status. The request's
 * UCT completion counter is asserted to be zero on entry.
 *
 * @param req     Send request to finish.
 * @param status  Status passed on to ucp_request_complete_send().
 */
void ucp_proto_am_zcopy_req_complete(ucp_request_t *req, ucs_status_t status)
{
    ucs_assert(req->send.state.uct_comp.count == 0);
    ucp_request_send_buffer_dereg(req); /* TODO register+lane change */
    ucp_request_complete_send(req, status);
}

/**
 * UCT completion callback for zcopy active-message sends.
 *
 * If the whole payload has been sent (dt.offset equals send.length), the
 * request is finished via ucp_proto_am_zcopy_req_complete(). If data remains
 * and @a status is an error, only the buffer is deregistered and the
 * completion callback is cleared. If data remains and @a status is UCS_OK,
 * nothing is done.
 *
 * @param self    Completion object embedded in a ucp_request_t
 *                (send.state.uct_comp member).
 * @param status  Completion status.
 */
void ucp_proto_am_zcopy_completion(uct_completion_t *self,
                                    ucs_status_t status)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t,
                                          send.state.uct_comp);

    if (req->send.state.dt.offset == req->send.length) {
        /* Everything was sent: finish the request */
        ucp_proto_am_zcopy_req_complete(req, status);
        return;
    }

    if (status == UCS_OK) {
        /* Partial send completed without error: nothing to do here */
        return;
    }

    ucs_assert(req->send.state.uct_comp.count == 0);
    ucs_assert(status != UCS_INPROGRESS);

    /* NOTE: since the data was not completely sent, the request is still in
     *       the pending queue. Only deregister the buffer here; the request
     *       itself is completed later, when the pending queue is purged.
     */
    ucp_request_send_buffer_dereg(req);
    req->send.state.uct_comp.func = NULL;
}