/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2018.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "rma.h"
#include "rma.inl"

#include <ucp/core/ucp_mm.h>

#include <ucp/dt/dt_contig.h>
#include <ucs/profile/profile.h>
#include <ucs/sys/stubs.h>


/*
 * Execute _action when parameter checking is enabled (ENABLE_PARAMS_CHECK)
 * and _buffer is a NULL pointer. Expands to a single statement.
 */
#define UCP_RMA_CHECK_BUFFER(_buffer, _action) \
    do { \
        if (ENABLE_PARAMS_CHECK) { \
            if (ucs_unlikely(NULL == (_buffer))) { \
                _action; \
            } \
        } \
    } while (0)


/*
 * Execute _action when _length is zero, i.e. there is nothing to transfer.
 * Expands to a single statement.
 */
#define UCP_RMA_CHECK_ZERO_LENGTH(_length, _action) \
    do { \
        if (0 == (_length)) { \
            _action; \
        } \
    } while (0)


/*
 * Argument validation for RMA entry points that return ucs_status_t.
 * May return from the calling function:
 *  - UCS_ERR_INVALID_PARAM is the action handed to the UCP_FEATURE_RMA
 *    feature-flag check (presumably taken when the context was created
 *    without that feature - the check macro is defined elsewhere),
 *  - UCS_OK when _length is zero,
 *  - UCS_ERR_INVALID_PARAM when _buffer is NULL (only if ENABLE_PARAMS_CHECK).
 * The zero-length test runs before the buffer test, so a zero-length
 * operation succeeds even with a NULL buffer.
 */
#define UCP_RMA_CHECK(_context, _buffer, _length) \
    do { \
        UCP_CONTEXT_CHECK_FEATURE_FLAGS(_context, UCP_FEATURE_RMA, \
                                        return UCS_ERR_INVALID_PARAM); \
        UCP_RMA_CHECK_ZERO_LENGTH(_length, return UCS_OK); \
        UCP_RMA_CHECK_BUFFER(_buffer, return UCS_ERR_INVALID_PARAM); \
    } while (0)


/*
 * Argument validation for RMA entry points that return ucs_status_ptr_t.
 * May return from the calling function:
 *  - UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM) is the action handed to the
 *    UCP_FEATURE_RMA feature-flag check (presumably taken when the context
 *    was created without that feature - the check macro is defined elsewhere),
 *  - NULL when _length is zero,
 *  - UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM) when _buffer is NULL
 *    (only if ENABLE_PARAMS_CHECK).
 * The zero-length test runs before the buffer test, so a zero-length
 * operation returns NULL even with a NULL buffer.
 */
#define UCP_RMA_CHECK_PTR(_context, _buffer, _length) \
    do { \
        UCP_CONTEXT_CHECK_FEATURE_FLAGS(_context, UCP_FEATURE_RMA, \
                                        return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); \
        UCP_RMA_CHECK_ZERO_LENGTH(_length, return NULL); \
        UCP_RMA_CHECK_BUFFER(_buffer, \
                             return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM)); \
    } while (0)


/*
 * Account for one fragment of an RMA request and decide whether the request
 * is finished.
 *
 * A request can be released when
 *  - all fragments were sent (remaining send.length is 0; bcopy and zcopy
 *    fragments may be mixed),
 *  - all zcopy fragments are done (send.state.uct_comp.count == 0),
 *  - and the request was allocated from the mpool
 *    (checked in ucp_request_complete_send).
 *
 * The release happens either immediately here or later in the completion
 * callback. The completion callback has to look at the request length too,
 * otherwise the following sequence breaks:
 *   partial send -> no resources -> progress ->
 *   send completed -> callback runs and frees the request ->
 *   next partial send uses the already freed request.
 *
 * @param req          Request being progressed.
 * @param frag_length  Number of bytes covered by the last fragment; only
 *                     used when status is not an error.
 * @param status       Result of posting the last fragment. Must not be
 *                     UCS_ERR_NOT_IMPLEMENTED.
 *
 * @return status itself if it is an error (the request is deregistered and
 *         completed with it unless it is UCS_ERR_NO_RESOURCE), UCS_OK when no
 *         bytes remain, UCS_INPROGRESS when more fragments have to be sent.
 */
ucs_status_t ucp_rma_request_advance(ucp_request_t *req, ssize_t frag_length,
                                     ucs_status_t status)
{
    ucs_assert(status != UCS_ERR_NOT_IMPLEMENTED);

    if (ucs_unlikely(UCS_STATUS_IS_ERR(status))) {
        if (status == UCS_ERR_NO_RESOURCE) {
            /* leave the request untouched for the caller */
            return status;
        }

        ucp_request_send_buffer_dereg(req);
        ucp_request_complete_send(req, status);
        return status;
    }

    ucs_assert(frag_length >= 0);
    ucs_assert(req->send.length >= frag_length);
    req->send.length -= frag_length;

    if (req->send.length != 0) {
        /* more data left: move the local and remote cursors forward */
        req->send.buffer           = UCS_PTR_BYTE_OFFSET(req->send.buffer,
                                                         frag_length);
        req->send.rma.remote_addr += frag_length;
        return UCS_INPROGRESS;
    }

    /* Everything was posted. bcopy is the fast path: nothing is outstanding,
     * so the request is completed right away; otherwise completion is left
     * to the completion callback. */
    if (ucs_likely(req->send.state.uct_comp.count == 0)) {
        ucp_request_send_buffer_dereg(req);
        ucp_request_complete_send(req, UCS_OK);
    }

    return UCS_OK;
}

/*
 * UCT completion callback installed for requests below the zcopy threshold.
 * Completes the request with the given status once send.length equals
 * send.state.dt.offset; otherwise does nothing.
 * NOTE(review): presumably dt.offset stays 0 for RMA requests, making this
 * the "no bytes left to send" test - confirm against the send-state helpers.
 */
static void ucp_rma_request_bcopy_completion(uct_completion_t *self,
                                             ucs_status_t status)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t,
                                          send.state.uct_comp);

    if (ucs_unlikely(req->send.length != req->send.state.dt.offset)) {
        /* more fragments still have to be posted */
        return;
    }

    ucp_request_complete_send(req, status);
}

/*
 * UCT completion callback installed for requests at or above the zcopy
 * threshold. Once send.length equals send.state.dt.offset, the send buffer is
 * deregistered and the request is completed with the given status; otherwise
 * nothing happens.
 * NOTE(review): presumably dt.offset stays 0 for RMA requests, making this
 * the "no bytes left to send" test - confirm against the send-state helpers.
 */
static void ucp_rma_request_zcopy_completion(uct_completion_t *self,
                                             ucs_status_t status)
{
    ucp_request_t *req = ucs_container_of(self, ucp_request_t,
                                          send.state.uct_comp);

    if (ucs_unlikely(req->send.length != req->send.state.dt.offset)) {
        /* more fragments still have to be posted */
        return;
    }

    ucp_request_send_buffer_dereg(req);
    ucp_request_complete_send(req, status);
}

/*
 * Fill in a request for a contiguous, host-memory RMA operation that uses the
 * RMA lane cached in the rkey.
 *
 * Requests shorter than zcopy_thresh get the bcopy completion callback and
 * need no memory registration. All other requests get the zcopy completion
 * callback and their buffer is registered on the selected lane.
 *
 * @param req           Request to initialize.
 * @param ep            Endpoint the operation is issued on.
 * @param buffer        Local buffer.
 * @param length        Number of bytes to transfer.
 * @param remote_addr   Remote address of the transfer.
 * @param rkey          Remote key; supplies the RMA lane.
 * @param cb            Pending/progress callback stored in send.uct.func.
 * @param zcopy_thresh  Lengths below this value are treated as bcopy.
 * @param flags         Initial request flags.
 *
 * @return UCS_OK for the bcopy case, otherwise the result of
 *         ucp_request_send_buffer_reg_lane().
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_rma_request_init(ucp_request_t *req, ucp_ep_h ep, const void *buffer,
                     size_t length, uint64_t remote_addr, ucp_rkey_h rkey,
                     uct_pending_callback_t cb, size_t zcopy_thresh, int flags)
{
    const int is_bcopy = (length < zcopy_thresh);

    req->flags                = flags; /* caller selects implicit release */
    req->send.ep              = ep;
    req->send.buffer          = (void*)buffer;
    req->send.datatype        = ucp_dt_make_contig(1);
    req->send.mem_type        = UCS_MEMORY_TYPE_HOST;
    req->send.length          = length;
    req->send.rma.remote_addr = remote_addr;
    req->send.rma.rkey        = rkey;
    req->send.uct.func        = cb;
    req->send.lane            = rkey->cache.rma_lane;

    ucp_request_send_state_init(req, ucp_dt_make_contig(1), length);
    if (is_bcopy) {
        ucp_request_send_state_reset(req, ucp_rma_request_bcopy_completion,
                                     UCP_REQUEST_SEND_PROTO_RMA);
    } else {
        ucp_request_send_state_reset(req, ucp_rma_request_zcopy_completion,
                                     UCP_REQUEST_SEND_PROTO_RMA);
    }
#if UCS_ENABLE_ASSERT
    req->send.cb              = NULL;
#endif

    if (!is_bcopy) {
        /* zcopy needs the local buffer registered on the lane */
        return ucp_request_send_buffer_reg_lane(req, req->send.lane, 0);
    }

    return UCS_OK;
}

/*
 * Allocate, initialize and start an RMA request without a user callback.
 * The request is created with UCP_REQUEST_FLAG_RELEASED, so no handle is
 * given back to the caller.
 *
 * @param ep            Endpoint the operation is issued on.
 * @param buffer        Local buffer.
 * @param length        Number of bytes to transfer.
 * @param remote_addr   Remote address of the transfer.
 * @param rkey          Remote key.
 * @param progress_cb   Protocol progress function for the request.
 * @param zcopy_thresh  Lengths below this value are treated as bcopy.
 *
 * @return UCS_ERR_NO_MEMORY if no request could be allocated, the
 *         initialization error if ucp_rma_request_init() fails, otherwise the
 *         result of ucp_request_send().
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_rma_nonblocking(ucp_ep_h ep, const void *buffer, size_t length,
                    uint64_t remote_addr, ucp_rkey_h rkey,
                    uct_pending_callback_t progress_cb, size_t zcopy_thresh)
{
    ucs_status_t status;
    ucp_request_t *req;

    req = ucp_request_get(ep->worker);
    if (req == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    status = ucp_rma_request_init(req, ep, buffer, length, remote_addr, rkey,
                                  progress_cb, zcopy_thresh,
                                  UCP_REQUEST_FLAG_RELEASED);
    if (ucs_unlikely(status != UCS_OK)) {
        /* The request was never started, so no completion path will release
         * it; hand it back here instead of leaking it. */
        ucp_request_put(req);
        return status;
    }

    return ucp_request_send(req, 0);
}

/*
 * Allocate, initialize and start an RMA request that reports completion
 * through a user callback. The request is created with no flags set and is
 * handed to ucp_rma_send_request_cb().
 *
 * @param ep            Endpoint the operation is issued on.
 * @param buffer        Local buffer.
 * @param length        Number of bytes to transfer.
 * @param remote_addr   Remote address of the transfer.
 * @param rkey          Remote key.
 * @param progress_cb   Protocol progress function for the request.
 * @param zcopy_thresh  Lengths below this value are treated as bcopy.
 * @param cb            User completion callback.
 *
 * @return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY) if no request could be allocated,
 *         the initialization error as a status pointer if
 *         ucp_rma_request_init() fails, otherwise the result of
 *         ucp_rma_send_request_cb().
 */
static UCS_F_ALWAYS_INLINE ucs_status_ptr_t
ucp_rma_nonblocking_cb(ucp_ep_h ep, const void *buffer, size_t length,
                       uint64_t remote_addr, ucp_rkey_h rkey,
                       uct_pending_callback_t progress_cb, size_t zcopy_thresh,
                       ucp_send_callback_t cb)
{
    ucs_status_t status;
    ucp_request_t *req;

    req = ucp_request_get(ep->worker);
    if (req == NULL) {
        return UCS_STATUS_PTR(UCS_ERR_NO_MEMORY);
    }

    status = ucp_rma_request_init(req, ep, buffer, length, remote_addr, rkey,
                                  progress_cb, zcopy_thresh, 0);
    if (ucs_unlikely(status != UCS_OK)) {
        /* The request was never started and the caller only gets an error
         * pointer, so release it here instead of leaking it. */
        ucp_request_put(req);
        return UCS_STATUS_PTR(status);
    }

    return ucp_rma_send_request_cb(req, cb);
}

/*
 * Non-blocking implicit put: write length bytes from buffer to remote_addr.
 *
 * Argument validation (UCP_RMA_CHECK) may return before the worker critical
 * section is entered. After the rkey is resolved, a message that fits
 * rkey->cache.max_put_short is first tried with uct_ep_put_short(). A request
 * is queued through ucp_rma_nonblocking() only when the short path is not
 * applicable or reports UCS_ERR_NO_RESOURCE.
 *
 * @return Validation or rkey resolution error, the short-put status, or the
 *         status of ucp_rma_nonblocking().
 */
ucs_status_t ucp_put_nbi(ucp_ep_h ep, const void *buffer, size_t length,
                         uint64_t remote_addr, ucp_rkey_h rkey)
{
    ucp_ep_rma_config_t *rma_config;
    ucs_status_t status;
    int queue_request;

    UCP_RMA_CHECK(ep->worker->context, buffer, length);

    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    ucs_trace_req("put_nbi buffer %p length %zu remote_addr %"PRIx64" rkey %p to %s",
                   buffer, length, remote_addr, rkey, ucp_ep_peer_name(ep));

    status        = UCP_RKEY_RESOLVE(rkey, ep, rma);
    queue_request = (status == UCS_OK);

    /* Fast path for a single short message */
    if (queue_request &&
        ucs_likely((ssize_t)length <= (int)rkey->cache.max_put_short)) {
        status        = UCS_PROFILE_CALL(uct_ep_put_short,
                                         ep->uct_eps[rkey->cache.rma_lane],
                                         buffer, length, remote_addr,
                                         rkey->cache.rma_rkey);
        /* only a lack of resources falls back to a queued request */
        queue_request = (status == UCS_ERR_NO_RESOURCE);
    }

    if (queue_request) {
        rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
        status     = ucp_rma_nonblocking(ep, buffer, length, remote_addr, rkey,
                                         rkey->cache.rma_proto->progress_put,
                                         rma_config->put_zcopy_thresh);
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
    return status;
}

/*
 * Non-blocking put with a completion callback: write length bytes from buffer
 * to remote_addr.
 *
 * Argument validation (UCP_RMA_CHECK_PTR) may return before the worker
 * critical section is entered. After the rkey is resolved, a message that
 * fits rkey->cache.max_put_short is first tried with uct_ep_put_short(). A
 * request is created through ucp_rma_nonblocking_cb() only when the short
 * path is not applicable or reports UCS_ERR_NO_RESOURCE.
 *
 * @return A status pointer built from the validation, rkey resolution or
 *         short-put status, or the result of ucp_rma_nonblocking_cb().
 */
ucs_status_ptr_t ucp_put_nb(ucp_ep_h ep, const void *buffer, size_t length,
                            uint64_t remote_addr, ucp_rkey_h rkey,
                            ucp_send_callback_t cb)
{
    ucp_ep_rma_config_t *rma_config;
    ucs_status_ptr_t ptr_status;
    ucs_status_t status;
    int queue_request;

    UCP_RMA_CHECK_PTR(ep->worker->context, buffer, length);
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    ucs_trace_req("put_nb buffer %p length %zu remote_addr %"PRIx64" rkey %p to %s cb %p",
                   buffer, length, remote_addr, rkey, ucp_ep_peer_name(ep), cb);

    status        = UCP_RKEY_RESOLVE(rkey, ep, rma);
    queue_request = (status == UCS_OK);

    /* Fast path for a single short message */
    if (queue_request &&
        ucs_likely((ssize_t)length <= (int)rkey->cache.max_put_short)) {
        status        = UCS_PROFILE_CALL(uct_ep_put_short,
                                         ep->uct_eps[rkey->cache.rma_lane],
                                         buffer, length, remote_addr,
                                         rkey->cache.rma_rkey);
        /* only a lack of resources falls back to a queued request */
        queue_request = (status == UCS_ERR_NO_RESOURCE);
    }

    if (queue_request) {
        rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
        ptr_status = ucp_rma_nonblocking_cb(ep, buffer, length, remote_addr,
                                            rkey,
                                            rkey->cache.rma_proto->progress_put,
                                            rma_config->put_zcopy_thresh, cb);
    } else {
        /* resolution failure or final result of the short put */
        ptr_status = UCS_STATUS_PTR(status);
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
    return ptr_status;
}

/*
 * Non-blocking implicit get: read length bytes from remote_addr into buffer.
 *
 * Argument validation (UCP_RMA_CHECK) may return before the worker critical
 * section is entered. Once the rkey is resolved, a request is queued through
 * ucp_rma_nonblocking() with the endpoint's get zcopy threshold for the
 * rkey's RMA lane.
 *
 * @return Validation or rkey resolution error, or the status of
 *         ucp_rma_nonblocking().
 */
ucs_status_t ucp_get_nbi(ucp_ep_h ep, void *buffer, size_t length,
                         uint64_t remote_addr, ucp_rkey_h rkey)
{
    ucp_ep_rma_config_t *rma_config;
    ucs_status_t status;

    UCP_RMA_CHECK(ep->worker->context, buffer, length);
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    ucs_trace_req("get_nbi buffer %p length %zu remote_addr %"PRIx64" rkey %p from %s",
                   buffer, length, remote_addr, rkey, ucp_ep_peer_name(ep));

    status = UCP_RKEY_RESOLVE(rkey, ep, rma);
    if (status == UCS_OK) {
        rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
        status     = ucp_rma_nonblocking(ep, buffer, length, remote_addr, rkey,
                                         rkey->cache.rma_proto->progress_get,
                                         rma_config->get_zcopy_thresh);
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
    return status;
}

/*
 * Non-blocking get with a completion callback: read length bytes from
 * remote_addr into buffer.
 *
 * Argument validation (UCP_RMA_CHECK_PTR) may return before the worker
 * critical section is entered. Once the rkey is resolved, a request is
 * created through ucp_rma_nonblocking_cb() with the endpoint's get zcopy
 * threshold for the rkey's RMA lane.
 *
 * @return A status pointer built from the validation or rkey resolution
 *         status, or the result of ucp_rma_nonblocking_cb().
 */
ucs_status_ptr_t ucp_get_nb(ucp_ep_h ep, void *buffer, size_t length,
                            uint64_t remote_addr, ucp_rkey_h rkey,
                            ucp_send_callback_t cb)
{
    ucp_ep_rma_config_t *rma_config;
    ucs_status_ptr_t ptr_status;
    ucs_status_t status;

    UCP_RMA_CHECK_PTR(ep->worker->context, buffer, length);
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

    ucs_trace_req("get_nb buffer %p length %zu remote_addr %"PRIx64" rkey %p from %s cb %p",
                   buffer, length, remote_addr, rkey, ucp_ep_peer_name(ep), cb);

    status = UCP_RKEY_RESOLVE(rkey, ep, rma);
    if (status == UCS_OK) {
        rma_config = &ucp_ep_config(ep)->rma[rkey->cache.rma_lane];
        ptr_status = ucp_rma_nonblocking_cb(ep, buffer, length, remote_addr,
                                            rkey,
                                            rkey->cache.rma_proto->progress_get,
                                            rma_config->get_zcopy_thresh, cb);
    } else {
        ptr_status = UCS_STATUS_PTR(status);
    }

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
    return ptr_status;
}

/*
 * Put built on top of ucp_put_nb(): issues the non-blocking put with an empty
 * completion callback and passes the returned handle to ucp_rma_wait().
 * NOTE(review): ucp_rma_wait() is defined elsewhere; presumably it progresses
 * the worker until the operation completes - confirm.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucp_put, (ep, buffer, length, remote_addr, rkey),
                 ucp_ep_h ep, const void *buffer, size_t length,
                 uint64_t remote_addr, ucp_rkey_h rkey)
{
    ucs_status_ptr_t request;

    request = ucp_put_nb(ep, buffer, length, remote_addr, rkey,
                         (ucp_send_callback_t)ucs_empty_function);
    return ucp_rma_wait(ep->worker, request, "put");
}

/*
 * Get built on top of ucp_get_nb(): issues the non-blocking get with an empty
 * completion callback and passes the returned handle to ucp_rma_wait().
 * NOTE(review): ucp_rma_wait() is defined elsewhere; presumably it progresses
 * the worker until the operation completes - confirm.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucp_get, (ep, buffer, length, remote_addr, rkey),
                 ucp_ep_h ep, void *buffer, size_t length,
                 uint64_t remote_addr, ucp_rkey_h rkey)
{
    ucs_status_ptr_t request;

    request = ucp_get_nb(ep, buffer, length, remote_addr, rkey,
                         (ucp_send_callback_t)ucs_empty_function);
    return ucp_rma_wait(ep->worker, request, "get");
}