Blob Blame History Raw
/**
 * Copyright (c) UT-Battelle, LLC. 2014-2017. ALL RIGHTS RESERVED.
 * Copyright (C) Mellanox Technologies Ltd. 2001-2014.  ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#include "ugni_udt_iface.h"
#include "ugni_udt_ep.h"
#include <uct/ugni/base/ugni_device.h>
#include <uct/ugni/base/ugni_md.h>
#include <poll.h>


/*
 * Configuration table of the UGNI UDT transport. It is passed to
 * UCT_TL_DEFINE at the bottom of this file together with the "UGNI_UDT_"
 * prefix and the uct_ugni_iface_config_t type.
 */
static ucs_config_field_t uct_ugni_udt_iface_config_table[] = {
    /* Nested table: the generic uct_iface_config_table entries are stored at
     * the 'super' member of uct_ugni_iface_config_t.
     * NOTE(review): "ALLOC=huge,thp,mmap,heap" looks like an override of the
     * inherited allocation-method default - confirm against the config parser. */
    {"", "ALLOC=huge,thp,mmap,heap", NULL,
    ucs_offsetof(uct_ugni_iface_config_t, super),
    UCS_CONFIG_TYPE_TABLE(uct_iface_config_table)},

    /* Memory-pool entries, stored at the 'mpool' member. The help text below
     * warns against any value other than -1.
     * NOTE(review): the meaning of the "-1, 0" arguments is defined by the
     * macro, which is not visible here - presumably default max / grow. */
    UCT_IFACE_MPOOL_CONFIG_FIELDS("UDT", -1, 0, "udt",
                                  ucs_offsetof(uct_ugni_iface_config_t, mpool),
                                  "\nAttention: Setting this param with value != -1 is a dangerous thing\n"
                                  "and could cause deadlock or performance degradation."),

    /* Terminating entry */
    {NULL}
};

/**
 * Hand a received datagram to the active-message layer.
 *
 * Looks up the receive header and payload inside @a desc, traces the message
 * and invokes the AM handler selected by the header's am_id, passing
 * UCT_CB_PARAM_FLAG_DESC.
 *
 * @param iface  Interface the datagram arrived on.
 * @param desc   Descriptor holding the received header and payload.
 *
 * @return Whatever uct_iface_invoke_am() returns for this message.
 */
static ucs_status_t processs_datagram(uct_ugni_udt_iface_t *iface, uct_ugni_udt_desc_t *desc)
{
    uct_ugni_udt_header_t *hdr = uct_ugni_udt_get_rheader(desc, iface);
    void *data                 = uct_ugni_udt_get_rpayload(desc, iface);

    uct_iface_trace_am(&iface->super.super, UCT_AM_TRACE_TYPE_RECV,
                       hdr->am_id, data, hdr->length, "RX: AM");

    return uct_iface_invoke_am(&iface->super.super, hdr->am_id, data,
                               hdr->length, UCT_CB_PARAM_FLAG_DESC);
}

/**
 * Complete the datagram exchange identified by @a id.
 *
 * For UCT_UGNI_UDT_ANY the interface-wide wildcard endpoint and descriptor
 * are used and *ep_out is set to NULL; for any other id the endpoint is
 * looked up on the interface and its posted descriptor is used.
 *
 * @param iface   Interface that owns the datagram.
 * @param id      Datagram id reported by the probe call.
 * @param ep_out  Receives the matching endpoint, or NULL for the wildcard.
 *
 * @return UCS_INPROGRESS   a payload datagram is ready to be processed by the
 *                          caller;
 *         UCS_OK           a non-payload (ack) datagram was consumed here;
 *         UCS_ERR_CANCELED the post was terminated;
 *         UCS_ERR_IO_ERROR the wait failed or reported an unexpected state.
 */
static ucs_status_t recieve_datagram(uct_ugni_udt_iface_t *iface, uint64_t id, uct_ugni_udt_ep_t **ep_out)
{
    const int is_wildcard = (UCT_UGNI_UDT_ANY == id);
    uct_ugni_udt_ep_t *ep = NULL;
    uct_ugni_udt_header_t *header;
    uct_ugni_udt_desc_t *desc;
    gni_post_state_t post_state;
    gni_ep_handle_t gni_ep;
    gni_return_t ugni_rc;
    uint32_t rem_addr, rem_id;

    ucs_trace_func("iface=%p, id=%lx", iface, id);

    if (is_wildcard) {
        gni_ep = iface->ep_any;
        desc   = iface->desc_any;
    } else {
        ep     = ucs_derived_of(uct_ugni_iface_lookup_ep(&iface->super, id),
                                uct_ugni_udt_ep_t);
        gni_ep = ep->super.ep;
        desc   = ep->posted_desc;
    }
    *ep_out = ep;

    /* The GNI call is made while holding the communication-domain lock */
    uct_ugni_cdm_lock(&iface->super.cdm);
    ugni_rc = GNI_EpPostDataWaitById(gni_ep, id, -1, &post_state, &rem_addr, &rem_id);
    uct_ugni_cdm_unlock(&iface->super.cdm);

    if (ucs_unlikely(GNI_RC_SUCCESS != ugni_rc)) {
        ucs_error("GNI_EpPostDataWaitById, id=%lu Error status: %s %d",
                  id, gni_err_str[ugni_rc], ugni_rc);
        return UCS_ERR_IO_ERROR;
    }

    switch (post_state) {
    case GNI_POST_COMPLETED:
        break;
    case GNI_POST_TERMINATED:
        return UCS_ERR_CANCELED;
    default:
        ucs_error("GNI_EpPostDataWaitById gave unexpected response: %u", post_state);
        return UCS_ERR_IO_ERROR;
    }

    /* Only endpoint-specific datagrams are counted as outstanding */
    if (!is_wildcard) {
        --iface->super.outstanding;
    }

    header = uct_ugni_udt_get_rheader(desc, iface);

    ucs_trace("Got datagram id: %lu type: %i len: %i am_id: %i", id, header->type, header->length, header->am_id);

    if (UCT_UGNI_UDT_PAYLOAD == header->type) {
        /* Payload present: the caller dispatches it */
        return UCS_INPROGRESS;
    }

    /* ack message, no data: release the endpoint's descriptor right away */
    ucs_assert_always(NULL != ep);
    ucs_mpool_put(ep->posted_desc);
    uct_ugni_check_flush(ep->desc_flush_group);
    ep->posted_desc = NULL;
    return UCS_OK;
}

/**
 * Body of the helper thread that watches the NIC for incoming datagrams.
 *
 * Each iteration:
 *  1. waits on device_condition until events_ready is cleared by the
 *     consumer (uct_ugni_proccess_datagram_pipe);
 *  2. blocks in GNI_PostdataProbeWaitById() (timeout argument -1) until a
 *     datagram id is reported;
 *  3. exits if that id is UCT_UGNI_UDT_CANCEL, otherwise raises events_ready
 *     and pushes the event pipe so the async handler runs.
 *
 * @param arg  The uct_ugni_udt_iface_t that owns this thread.
 *
 * @return Always NULL.
 */
static void *uct_ugni_udt_device_thread(void *arg)
{
    uct_ugni_udt_iface_t *iface = (uct_ugni_udt_iface_t *)arg;
    gni_return_t ugni_rc;
    uint64_t id;

    while (1) {
        pthread_mutex_lock(&iface->device_lock);
        while (iface->events_ready) {
            pthread_cond_wait(&iface->device_condition, &iface->device_lock);
        }
        pthread_mutex_unlock(&iface->device_lock);
        ugni_rc = GNI_PostdataProbeWaitById(uct_ugni_udt_iface_nic_handle(iface),-1,&id);
        if (ucs_unlikely(GNI_RC_SUCCESS != ugni_rc)) {
            ucs_error("GNI_PostDataProbeWaitById, Error status: %s %d\n",
                      gni_err_str[ugni_rc], ugni_rc);
            continue;
        }
        if (ucs_unlikely(UCT_UGNI_UDT_CANCEL == id)) {
            /* When the iface is torn down, it will post and cancel a datagram with a
             * magic cookie as its id that tells us to shut down.
             */
            break;
        }
        /* events_ready is read and cleared under device_lock elsewhere, so
         * the store must hold the same lock to avoid a data race on the flag.
         */
        pthread_mutex_lock(&iface->device_lock);
        iface->events_ready = 1;
        pthread_mutex_unlock(&iface->device_lock);
        ucs_trace("Recieved a new datagram");
        ucs_async_pipe_push(&iface->event_pipe);
    }

    return NULL;
}

/**
 * Progress callback of the UDT interface: dispatches the pending-operation
 * arbiter once (per-group count of 1) inside the interface's async section.
 *
 * @param arg  The uct_ugni_udt_iface_t to progress.
 *
 * @return Always 0.
 */
unsigned uct_ugni_udt_progress(void *arg)
{
    uct_ugni_udt_iface_t *udt_iface = arg;

    uct_ugni_enter_async(&udt_iface->super);
    ucs_arbiter_dispatch(&udt_iface->super.arbiter, 1,
                         uct_ugni_udt_ep_process_pending, NULL);
    uct_ugni_leave_async(&udt_iface->super);

    return 0;
}

/**
 * Release callback for receive descriptors that were handed to the user.
 *
 * @param self  The release_desc member embedded in the owning interface.
 * @param desc  User-visible descriptor pointer; the UDT descriptor is taken
 *              to start one uct_recv_desc_t before this address.
 */
static void uct_ugni_udt_iface_release_desc(uct_recv_desc_t *self, void *desc)
{
    uct_ugni_udt_iface_t *owner   = ucs_container_of(self, uct_ugni_udt_iface_t,
                                                     release_desc);
    uct_recv_desc_t *recv_hdr     = (uct_recv_desc_t *)desc - 1;
    uct_ugni_udt_desc_t *udt_desc = (uct_ugni_udt_desc_t *)recv_hdr;

    ucs_assert_always(NULL != udt_desc);

    /* Reset the descriptor before returning it to its memory pool */
    uct_ugni_udt_reset_desc(udt_desc, owner);
    ucs_mpool_put(udt_desc);
}

/**
 * Fill in the interface attributes of the UDT transport.
 *
 * Starts from the base interface attributes and then sets the active-message
 * limits, address lengths, capability flags and performance estimates.
 *
 * @param tl_iface    Interface being queried.
 * @param iface_attr  Attribute structure to fill.
 *
 * @return Always UCS_OK.
 */
static ucs_status_t uct_ugni_udt_iface_query(uct_iface_h tl_iface, uct_iface_attr_t *iface_attr)
{
    uct_ugni_udt_iface_t *iface = ucs_derived_of(tl_iface, uct_ugni_udt_iface_t);
    /* A segment carries the UDT header followed by the user data */
    const size_t max_am_data    = iface->config.udt_seg_size -
                                  sizeof(uct_ugni_udt_header_t);

    uct_base_iface_query(&iface->super.super, iface_attr);

    /* Active-message limits */
    iface_attr->cap.am.max_short       = max_am_data;
    iface_attr->cap.am.max_bcopy       = max_am_data;
    iface_attr->cap.am.opt_zcopy_align = 1;
    iface_attr->cap.am.align_mtu       = iface_attr->cap.am.opt_zcopy_align;

    /* Addressing */
    iface_attr->device_addr_len        = sizeof(uct_devaddr_ugni_t);
    iface_attr->iface_addr_len         = sizeof(uct_sockaddr_ugni_t);
    iface_attr->ep_addr_len            = 0;
    iface_attr->max_conn_priv          = 0;

    /* Capabilities */
    iface_attr->cap.flags              = UCT_IFACE_FLAG_AM_SHORT |
                                         UCT_IFACE_FLAG_AM_BCOPY |
                                         UCT_IFACE_FLAG_CONNECT_TO_IFACE |
                                         UCT_IFACE_FLAG_PENDING |
                                         UCT_IFACE_FLAG_CB_ASYNC;

    /* Performance estimates */
    iface_attr->overhead               = 1e-6;  /* 1 usec */
    iface_attr->latency.overhead       = 40e-6; /* 40 usec */
    iface_attr->latency.growth         = 0;
    iface_attr->bandwidth.dedicated    = pow(1024, 2); /* bytes */
    iface_attr->bandwidth.shared       = 0;
    iface_attr->priority               = 0;

    return UCS_OK;
}

/**
 * Async event handler for the interface's event pipe.
 *
 * Repeatedly probes the NIC for completed datagrams and handles each one:
 * endpoint-specific payloads are dispatched from the endpoint's posted
 * descriptor, wildcard payloads from the interface's desc_any, after which
 * the wildcard receive is posted again. When the probe stops reporting
 * datagrams, the pipe is drained, events_ready is cleared and the device
 * thread is signalled to resume.
 *
 * @param event_id  Not used by this function.
 * @param arg       The uct_ugni_udt_iface_t that registered the handler.
 */
void uct_ugni_proccess_datagram_pipe(int event_id, void *arg)
{
    uct_ugni_udt_iface_t *iface = (uct_ugni_udt_iface_t *)arg;
    uct_ugni_udt_desc_t *datagram;
    uct_ugni_udt_ep_t *ep;
    gni_return_t ugni_rc;
    ucs_status_t status;
    void *user_desc;
    uint64_t id;

    ucs_trace_func("");

    for (;;) {
        uct_ugni_cdm_lock(&iface->super.cdm);
        ugni_rc = GNI_PostDataProbeById(uct_ugni_udt_iface_nic_handle(iface), &id);
        uct_ugni_cdm_unlock(&iface->super.cdm);
        if (GNI_RC_SUCCESS != ugni_rc) {
            break;
        }

        status = recieve_datagram(iface, id, &ep);
        if (UCS_INPROGRESS != status) {
            /* Nothing left to dispatch for this datagram */
            continue;
        }

        if (NULL == ep) {
            ucs_trace_data("Processing wildcard");
            datagram = iface->desc_any;
            status = processs_datagram(iface, datagram);
            if (UCS_OK != status) {
                /* The handler did not return UCS_OK: fetch a fresh wildcard
                 * descriptor and attach the release callback to the old one */
                UCT_TL_IFACE_GET_TX_DESC(&iface->super.super, &iface->free_desc,
                                         iface->desc_any, iface->desc_any=NULL);
                user_desc = uct_ugni_udt_get_user_desc(datagram, iface);
                uct_recv_desc(user_desc) = &iface->release_desc;
            }
            status = uct_ugni_udt_ep_any_post(iface);
            if (UCS_OK != status) {
                /* We can't continue if we can't post the first receive */
                ucs_error("Failed to post wildcard request");
                return;
            }
            continue;
        }

        ucs_trace_data("Processing reply");
        datagram = ep->posted_desc;
        status = processs_datagram(iface, datagram);
        if (UCS_OK == status) {
            ucs_mpool_put(datagram);
        } else {
            /* Descriptor is not returned to the pool here; attach the
             * release callback to it instead */
            user_desc = uct_ugni_udt_get_user_desc(datagram, iface);
            uct_recv_desc(user_desc) = &iface->release_desc;
        }
        ep->posted_desc = NULL;
        uct_ugni_check_flush(ep->desc_flush_group);
    }

    ucs_async_pipe_drain(&iface->event_pipe);

    pthread_mutex_lock(&iface->device_lock);
    iface->events_ready = 0;
    pthread_mutex_unlock(&iface->device_lock);

    ucs_trace("Signaling device thread to resume monitoring");
    pthread_cond_signal(&iface->device_condition);
}

/**
 * Cancel the wildcard datagram and destroy the wildcard endpoint (ep_any).
 *
 * All GNI calls are made under the communication-domain lock. If the cancel
 * fails, or the subsequent test fails with anything other than
 * GNI_RC_NO_MATCH, an error is logged and the function returns without
 * destroying the endpoint.
 *
 * @param iface  Interface whose wildcard post is torn down.
 */
static void uct_ugni_udt_clean_wildcard(uct_ugni_udt_iface_t *iface)
{
    gni_post_state_t state;
    uint32_t remote_addr, remote_id;
    gni_return_t rc;

    uct_ugni_cdm_lock(&iface->super.cdm);

    rc = GNI_EpPostDataCancelById(iface->ep_any, UCT_UGNI_UDT_ANY);
    if (GNI_RC_SUCCESS != rc) {
        uct_ugni_cdm_unlock(&iface->super.cdm);
        ucs_error("GNI_EpPostDataCancel failed, Error status: %s %d",
                  gni_err_str[rc], rc);
        return;
    }

    rc = GNI_EpPostDataTestById(iface->ep_any, UCT_UGNI_UDT_ANY, &state, &remote_addr, &remote_id);
    if ((GNI_RC_SUCCESS != rc) && (GNI_RC_NO_MATCH != rc)) {
        uct_ugni_cdm_unlock(&iface->super.cdm);
        ucs_error("GNI_EpPostDataTestById failed, Error status: %s %d",
                  gni_err_str[rc], rc);
        return;
    }

    if ((GNI_RC_SUCCESS == rc) && (GNI_POST_PENDING == state)) {
        /* Still pending: wait for it. The result is not inspected; rc is
         * overwritten by the destroy call below. */
        rc = GNI_EpPostDataWaitById(iface->ep_any, UCT_UGNI_UDT_ANY, -1, &state, &remote_addr, &remote_id);
    }

    rc = GNI_EpDestroy(iface->ep_any);
    if (GNI_RC_SUCCESS != rc) {
        ucs_error("GNI_EpDestroy failed, Error status: %s %d\n",
                  gni_err_str[rc], rc);
    }

    uct_ugni_cdm_unlock(&iface->super.cdm);
}

/*
 * Wake the device thread with the UCT_UGNI_UDT_CANCEL datagram id so that it
 * leaves its loop.
 *
 * Preconditions - before calling this function you MUST:
 *  A) Deregister the datagram processing function from the async thread.
 *  B) Cancel the wildcard datagram.
 *  C) Drain all other messages from the queue.
 *
 * A temporary endpoint is created, bound to the local device address and
 * domain id, used to post an empty datagram tagged UCT_UGNI_UDT_CANCEL, and
 * then destroyed. Failures are logged; nothing is returned.
 */
static inline void uct_ugni_udt_terminate_thread(uct_ugni_udt_iface_t *iface)
{
    gni_ep_handle_t wake_ep;
    gni_return_t rc;

    uct_ugni_cdm_lock(&iface->super.cdm);

    rc = GNI_EpCreate(uct_ugni_udt_iface_nic_handle(iface), iface->super.local_cq, &wake_ep);
    if (GNI_RC_SUCCESS != rc) {
        uct_ugni_cdm_unlock(&iface->super.cdm);
        ucs_error("GNI_EpCreate, Error status: %s %d",
                  gni_err_str[rc], rc);
        return;
    }

    rc = GNI_EpBind(wake_ep, iface->super.cdm.dev->address, iface->super.cdm.domain_id);
    if (GNI_RC_SUCCESS != rc) {
        GNI_EpDestroy(wake_ep);
        uct_ugni_cdm_unlock(&iface->super.cdm);
        ucs_error("GNI_EpBind failed, Error status: %s %d",
                  gni_err_str[rc], rc);
        return;
    }

    /* Empty datagram: no input buffer, no output buffer, only the id */
    rc = GNI_EpPostDataWId(wake_ep, NULL, 0, NULL, 0, UCT_UGNI_UDT_CANCEL);
    if (GNI_RC_SUCCESS != rc) {
        ucs_error("Couldn't send cancel message to UGNI interface! %s %d",
                  gni_err_str[rc], rc);
    }

    /* When the gni_ep is destroyed the above post will be canceled */
    rc = GNI_EpDestroy(wake_ep);
    uct_ugni_cdm_unlock(&iface->super.cdm);
    if (GNI_RC_SUCCESS != rc) {
        ucs_error("GNI_EpDestroy failed, Error status: %s %d\n",
                  gni_err_str[rc], rc);
    }
}

/*
 * Destructor of the UDT interface. Runs inside the interface's async section:
 *  1. cancels the wildcard datagram and destroys the wildcard endpoint;
 *  2. removes the event-pipe handler from the async context;
 *  3. if the device thread has flagged pending events, processes them once
 *     more by calling the handler directly;
 *  4. wakes the device thread with the cancel datagram and joins it;
 *  5. releases the pipe, the wildcard descriptor, the descriptor pool and
 *     the thread synchronization objects.
 */
static UCS_CLASS_CLEANUP_FUNC(uct_ugni_udt_iface_t)
{
    uct_ugni_enter_async(&self->super);
    uct_ugni_udt_clean_wildcard(self);
    ucs_async_remove_handler(ucs_async_pipe_rfd(&self->event_pipe),1);
    if (self->events_ready) {
        uct_ugni_proccess_datagram_pipe(ucs_async_pipe_rfd(&self->event_pipe),self);
    }
    uct_ugni_udt_terminate_thread(self);
    /* The thread function always returns NULL, so its result is not collected */
    pthread_join(self->event_thread, NULL);
    ucs_async_pipe_destroy(&self->event_pipe);
    ucs_mpool_put(self->desc_any);
    ucs_mpool_cleanup(&self->free_desc, 1);
    /* The device thread has been joined, so nothing can still be waiting on
     * the condition variable; release it together with its mutex. */
    pthread_cond_destroy(&self->device_condition);
    pthread_mutex_destroy(&self->device_lock);
    uct_ugni_leave_async(&self->super);
}

/* Delete function for uct_ugni_udt_iface_t; it is referenced through
 * UCS_CLASS_DELETE_FUNC_NAME(uct_ugni_udt_iface_t) as .iface_close in the
 * operations table below. */
static UCS_CLASS_DEFINE_DELETE_FUNC(uct_ugni_udt_iface_t, uct_iface_t);

/*
 * Operations table of the UDT interface, passed to the base class in the
 * init function. Entries are grouped by the object they act on.
 */
static uct_iface_ops_t uct_ugni_udt_iface_ops = {
    /* Endpoint: active messages */
    .ep_am_short              = uct_ugni_udt_ep_am_short,
    .ep_am_bcopy              = uct_ugni_udt_ep_am_bcopy,

    /* Endpoint: pending queue, flush and fence */
    .ep_pending_add           = uct_ugni_udt_ep_pending_add,
    .ep_pending_purge         = uct_ugni_udt_ep_pending_purge,
    .ep_flush                 = uct_ugni_ep_flush,
    .ep_fence                 = uct_base_ep_fence,

    /* Endpoint: lifetime */
    .ep_create                = UCS_CLASS_NEW_FUNC_NAME(uct_ugni_udt_ep_t),
    .ep_destroy               = UCS_CLASS_DELETE_FUNC_NAME(uct_ugni_udt_ep_t),

    /* Interface: flush, fence and progress */
    .iface_flush              = uct_ugni_iface_flush,
    .iface_fence              = uct_base_iface_fence,
    .iface_progress_enable    = ucs_empty_function,
    .iface_progress_disable   = ucs_empty_function,
    .iface_progress           = (void*)uct_ugni_udt_progress,

    /* Interface: lifetime, attributes and addressing */
    .iface_close              = UCS_CLASS_DELETE_FUNC_NAME(uct_ugni_udt_iface_t),
    .iface_query              = uct_ugni_udt_iface_query,
    .iface_get_address        = uct_ugni_iface_get_address,
    .iface_get_device_address = uct_ugni_iface_get_dev_address,
    .iface_is_reachable       = uct_ugni_iface_is_reachable
};

/*
 * Callbacks of the UDT descriptor memory pool: chunks are obtained and
 * released through the hugetlb helpers, and no per-object init or cleanup
 * hook is installed.
 */
static ucs_mpool_ops_t uct_ugni_udt_desc_mpool_ops = {
    /* chunk management */
    .chunk_alloc   = ucs_mpool_hugetlb_malloc,
    .chunk_release = ucs_mpool_hugetlb_free,
    /* per-object hooks: none */
    .obj_init      = NULL,
    .obj_cleanup   = NULL
};

/*
 * Constructor of the UDT interface.
 *
 * Steps, in order: initialise the base uGNI interface, create the event
 * pipe, create the descriptor pool, create the wildcard endpoint, take the
 * first wildcard descriptor and post it, register the pipe handler with the
 * worker's async context, and finally start the device thread.
 *
 * On any failure, everything acquired so far is released in reverse order
 * and a non-OK status is returned.
 *
 * @param md         Memory domain, forwarded to the base class.
 * @param worker     Worker, forwarded to the base class.
 * @param params     Interface parameters; rx_headroom is read here.
 * @param tl_config  Transport configuration (a uct_ugni_iface_config_t).
 *
 * @return UCS_OK on success, otherwise the status of the step that failed.
 */
static UCS_CLASS_INIT_FUNC(uct_ugni_udt_iface_t, uct_md_h md, uct_worker_h worker,
                           const uct_iface_params_t *params,
                           const uct_iface_config_t *tl_config)
{
    uct_ugni_iface_config_t *config = ucs_derived_of(tl_config, uct_ugni_iface_config_t);
    ucs_status_t status;
    uct_ugni_udt_desc_t *desc;
    gni_return_t ugni_rc;
    int rc;

    UCS_CLASS_CALL_SUPER_INIT(uct_ugni_iface_t, md, worker, params,
                              &uct_ugni_udt_iface_ops,
                              &config->super UCS_STATS_ARG(NULL));

    /* Setting initial configuration */
    self->config.udt_seg_size = GNI_DATAGRAM_MAXSIZE;
    self->config.rx_headroom  = params->rx_headroom;
    self->release_desc.cb     = uct_ugni_udt_iface_release_desc;

    status = ucs_async_pipe_create(&self->event_pipe);
    if (UCS_OK != status) {
        ucs_error("Pipe creation failed");
        goto exit;
    }

    status = ucs_mpool_init(&self->free_desc,
                            0,
                            uct_ugni_udt_get_diff(self) + self->config.udt_seg_size * 2,
                            uct_ugni_udt_get_diff(self),
                            UCS_SYS_CACHE_LINE_SIZE,      /* alignment */
                            128,                          /* grow */
                            config->mpool.max_bufs,       /* max buffers */
                            &uct_ugni_udt_desc_mpool_ops,
                            "UGNI-UDT-DESC");

    if (UCS_OK != status) {
        ucs_error("Mpool creation failed");
        goto clean_pipe;
    }

    ugni_rc = GNI_EpCreate(uct_ugni_udt_iface_nic_handle(self), NULL, &self->ep_any);
    if (GNI_RC_SUCCESS != ugni_rc) {
        ucs_error("GNI_EpCreate failed, Error status: %s %d",
                  gni_err_str[ugni_rc], ugni_rc);
        status = UCS_ERR_NO_DEVICE;
        goto clean_free_desc;
    }

    /* The failure action given to the macro is a bare goto, so 'status' must
     * already hold an error code here; otherwise the UCS_OK left over from
     * ucs_mpool_init() would be returned after tearing everything down. */
    status = UCS_ERR_NO_MEMORY;
    UCT_TL_IFACE_GET_TX_DESC(&self->super.super, &self->free_desc,
                             desc, goto clean_ep);
    ucs_debug("First wildcard desc is %p", desc);

    /* Init any desc */
    self->desc_any = desc;
    status = uct_ugni_udt_ep_any_post(self);
    if (UCS_OK != status) {
        /* We can't continue if we can't post the first receive */
        ucs_error("Failed to post wildcard request");
        goto clean_any_desc;
    }

    status = ucs_async_set_event_handler(self->super.super.worker->async->mode,
                                         ucs_async_pipe_rfd(&self->event_pipe),
                                         UCS_EVENT_SET_EVREAD,
                                         uct_ugni_proccess_datagram_pipe,
                                         self, self->super.super.worker->async);
    if (UCS_OK != status) {
        goto clean_cancel_desc;
    }

    pthread_mutex_init(&self->device_lock, NULL);
    pthread_cond_init(&self->device_condition, NULL);
    self->events_ready = 0;

    rc = pthread_create(&self->event_thread, NULL, uct_ugni_udt_device_thread, self);
    if (0 != rc) {
        ucs_error("pthread_create failed, error %d", rc);
        /* 'status' is still UCS_OK from the handler registration above */
        status = UCS_ERR_IO_ERROR;
        goto clean_remove_event;
    }

    return UCS_OK;

 clean_remove_event:
    /* Undo the handler registration and the synchronization objects. The
     * pipe itself is destroyed exactly once, at clean_pipe. */
    ucs_async_remove_handler(ucs_async_pipe_rfd(&self->event_pipe), 1);
    pthread_cond_destroy(&self->device_condition);
    pthread_mutex_destroy(&self->device_lock);
 clean_cancel_desc:
    /* uct_ugni_udt_clean_wildcard() cancels the posted wildcard and, unless
     * it bails out early on a GNI error, destroys ep_any itself. Skip
     * clean_ep so the endpoint is not destroyed a second time. */
    uct_ugni_udt_clean_wildcard(self);
    ucs_mpool_put(self->desc_any);
    goto clean_free_desc;
 clean_any_desc:
    ucs_mpool_put(self->desc_any);
 clean_ep:
    ugni_rc = GNI_EpDestroy(self->ep_any);
    if (GNI_RC_SUCCESS != ugni_rc) {
        ucs_warn("GNI_EpDestroy failed, Error status: %s %d",
                 gni_err_str[ugni_rc], ugni_rc);
    }
 clean_free_desc:
    ucs_mpool_cleanup(&self->free_desc, 1);
 clean_pipe:
    ucs_async_pipe_destroy(&self->event_pipe);
 exit:
    uct_ugni_cleanup_base_iface(&self->super);
    ucs_error("Failed to activate interface");
    return status;
}

/* Class definition: uct_ugni_udt_iface_t derives from uct_ugni_iface_t.
 * The "new" function takes the same four arguments as the init function
 * above (md, worker, params, config). */
UCS_CLASS_DEFINE(uct_ugni_udt_iface_t, uct_ugni_iface_t);
UCS_CLASS_DEFINE_NEW_FUNC(uct_ugni_udt_iface_t, uct_iface_t, uct_md_h,
                          uct_worker_h, const uct_iface_params_t*,
                          const uct_iface_config_t*);

/* Defines the "ugni_udt" transport on uct_ugni_component, using
 * uct_ugni_query_devices, the "UGNI_UDT_" configuration prefix and the
 * configuration table declared at the top of this file.
 * NOTE(review): the exact registration semantics live in UCT_TL_DEFINE,
 * which is not visible here. */
UCT_TL_DEFINE(&uct_ugni_component, ugni_udt, uct_ugni_query_devices,
              uct_ugni_udt_iface_t, "UGNI_UDT_",
              uct_ugni_udt_iface_config_table, uct_ugni_iface_config_t);