Blob Blame History Raw
/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2011 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "utlist.h"

/* A random guess at an appropriate value, we can tune it later.  It could also
 * be a real tunable parameter. */
#define MPIDU_SCHED_INITIAL_ENTRIES (16)

/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===

cvars:
    - name        : MPIR_CVAR_COLL_SCHED_DUMP
      category    : COLLECTIVE
      type        : boolean
      default     : false
      class       : device
      verbosity   : MPI_T_VERBOSITY_USER_BASIC
      scope       : MPI_T_SCOPE_ALL_EQ
      description : >-
        Print schedule data for nonblocking collective operations.

=== END_MPI_T_CVAR_INFO_BLOCK ===
*/

static const char *entry_to_str(enum MPIDU_Sched_entry_type type)
{
    switch (type) {
        case MPIDU_SCHED_ENTRY_SEND:
            return "SEND";
        case MPIDU_SCHED_ENTRY_RECV:
            return "RECV";
        case MPIDU_SCHED_ENTRY_REDUCE:
            return "REDUCE";
        case MPIDU_SCHED_ENTRY_COPY:
            return "COPY";
        case MPIDU_SCHED_ENTRY_NOP:
            return "NOP";
        case MPIDU_SCHED_ENTRY_CB:
            return "CB";
        default:
            return "(out of range)";
    }
}

/* utility function for debugging, dumps the given schedule object to fh */
static void sched_dump(struct MPIDU_Sched *s, FILE * fh)
{
    int i;

    fprintf(fh, "--------------------------------\n");
    fprintf(fh, "s=%p\n", s);
    if (s) {
        fprintf(fh, "s->size=%zd\n", s->size);
        fprintf(fh, "s->idx=%zd\n", s->idx);
        fprintf(fh, "s->num_entries=%d\n", s->num_entries);
        fprintf(fh, "s->tag=%d\n", s->tag);
        fprintf(fh, "s->req=%p\n", s->req);
        fprintf(fh, "s->entries=%p\n", s->entries);
        for (i = 0; i < s->num_entries; ++i) {
            fprintf(fh, "&s->entries[%d]=%p\n", i, &s->entries[i]);
            fprintf(fh, "s->entries[%d].type=%s\n", i, entry_to_str(s->entries[i].type));
            fprintf(fh, "s->entries[%d].status=%d\n", i, s->entries[i].status);
            fprintf(fh, "s->entries[%d].is_barrier=%s\n", i,
                    (s->entries[i].is_barrier ? "TRUE" : "FALSE"));
        }
    }
    fprintf(fh, "--------------------------------\n");
    /*
     * fprintf(fh, "s->next=%p\n", s->next);
     * fprintf(fh, "s->prev=%p\n", s->prev);
     */
}

struct MPIDU_Sched_state {
    struct MPIDU_Sched *head;
    /* no need for a tail with utlist */
};

/* holds on to all incomplete schedules on which progress should be made */
struct MPIDU_Sched_state all_schedules = { NULL };

/* returns TRUE if any schedules are currently pending completion by the
 * progress engine, FALSE otherwise */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_are_pending
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_are_pending(void)
{
    return (all_schedules.head != NULL);
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_next_tag
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_next_tag(MPIR_Comm * comm_ptr, int *tag)
{
    int mpi_errno = MPI_SUCCESS;
    /* TODO there should be an internal accessor/utility macro for getting the
     * TAG_UB value that doesn't require using the attribute interface */
    int tag_ub = MPIR_Process.attrs.tag_ub;
#if defined(HAVE_ERROR_CHECKING)
    int start = MPI_UNDEFINED;
    int end = MPI_UNDEFINED;
    struct MPIDU_Sched *elt = NULL;
#endif
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_NEXT_TAG);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_NEXT_TAG);

    *tag = comm_ptr->next_sched_tag;
    ++comm_ptr->next_sched_tag;

#if defined(HAVE_ERROR_CHECKING)
    /* Upon entry into the second half of the tag space, ensure there are no
     * outstanding schedules still using the second half of the space.  Check
     * the first half similarly on wraparound. */
    if (comm_ptr->next_sched_tag == (tag_ub / 2)) {
        start = tag_ub / 2;
        end = tag_ub;
    } else if (comm_ptr->next_sched_tag == (tag_ub)) {
        start = MPIR_FIRST_NBC_TAG;
        end = tag_ub / 2;
    }
    if (start != MPI_UNDEFINED) {
        DL_FOREACH(all_schedules.head, elt) {
            if (elt->tag >= start && elt->tag < end) {
                MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**toomanynbc");
            }
        }
    }
#endif

    /* wrap the tag values around to the start, but don't allow it to conflict
     * with the tags used by the blocking collectives */
    if (comm_ptr->next_sched_tag == tag_ub) {
        comm_ptr->next_sched_tag = MPIR_FIRST_NBC_TAG;
    }
#if defined(HAVE_ERROR_CHECKING)
  fn_fail:
#endif
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_NEXT_TAG);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_start_entry
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* initiates the schedule entry "e" in the NBC described by "s", where
 * "e" is at "idx" in "s".  This means posting nonblocking sends/recvs,
 * performing reductions, calling callbacks, etc. */
static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPIDU_Sched_entry *e)
{
    int mpi_errno = MPI_SUCCESS, ret_errno = MPI_SUCCESS;
    MPIR_Request *r = s->req;
    MPIR_Comm *comm;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_START_ENTRY);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_START_ENTRY);

    MPIR_Assert(e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED);

    switch (e->type) {
        case MPIDU_SCHED_ENTRY_SEND:
            comm = e->u.send.comm;
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting SEND entry %d\n", (int) idx);
            if (e->u.send.count_p) {
                /* deferred send */
                /* originally there was no branch and send.count_p was set to
                 * &send.count, but this requires patching up the pointers
                 * during realloc of entries, so this is easier */
                ret_errno = MPIC_Isend(e->u.send.buf, *e->u.send.count_p, e->u.send.datatype,
                                       e->u.send.dest, s->tag, comm, &e->u.send.sreq,
                                       &r->u.nbc.errflag);
            } else {
                if (e->u.send.is_sync) {
                    ret_errno = MPIC_Issend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
                                            e->u.send.dest, s->tag, comm, &e->u.send.sreq,
                                            &r->u.nbc.errflag);
                } else {
                    ret_errno = MPIC_Isend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
                                           e->u.send.dest, s->tag, comm, &e->u.send.sreq,
                                           &r->u.nbc.errflag);
                }
            }
            /* Check if the error is actually fatal to the NBC or we can continue. */
            if (unlikely(ret_errno)) {
                if (MPIR_ERR_NONE == r->u.nbc.errflag) {
                    if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
                        r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
                    } else {
                        r->u.nbc.errflag = MPIR_ERR_OTHER;
                    }
                }
                e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
                MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "Sched SEND failed. Errflag: %d\n",
                              (int) r->u.nbc.errflag);
            } else {
                e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
            }
            break;
        case MPIDU_SCHED_ENTRY_RECV:
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting RECV entry %d\n", (int) idx);
            comm = e->u.recv.comm;
            ret_errno = MPIC_Irecv(e->u.recv.buf, e->u.recv.count, e->u.recv.datatype,
                                   e->u.recv.src, s->tag, comm, &e->u.recv.rreq);
            /* Check if the error is actually fatal to the NBC or we can continue. */
            if (unlikely(ret_errno)) {
                if (MPIR_ERR_NONE == r->u.nbc.errflag) {
                    if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
                        r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
                    } else {
                        r->u.nbc.errflag = MPIR_ERR_OTHER;
                    }
                }
                /* We should set the status to failed here - since the request is not freed. this
                 * will be handled later in MPIDU_Sched_progress_state, so set to started here */
                e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
                MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "Sched RECV failed. Errflag: %d\n",
                              (int) r->u.nbc.errflag);
            } else {
                e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
            }
            break;
        case MPIDU_SCHED_ENTRY_REDUCE:
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting REDUCE entry %d\n", (int) idx);
            mpi_errno =
                MPIR_Reduce_local(e->u.reduce.inbuf, e->u.reduce.inoutbuf, e->u.reduce.count,
                                  e->u.reduce.datatype, e->u.reduce.op);
            if (mpi_errno)
                MPIR_ERR_POP(mpi_errno);
            MPIR_Op_release_if_not_builtin(e->u.reduce.op);
            MPIR_Datatype_release_if_not_builtin(e->u.reduce.datatype);
            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
            break;
        case MPIDU_SCHED_ENTRY_COPY:
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting COPY entry %d\n", (int) idx);
            mpi_errno = MPIR_Localcopy(e->u.copy.inbuf, e->u.copy.incount, e->u.copy.intype,
                                       e->u.copy.outbuf, e->u.copy.outcount, e->u.copy.outtype);
            if (mpi_errno)
                MPIR_ERR_POP(mpi_errno);
            MPIR_Datatype_release_if_not_builtin(e->u.copy.intype);
            MPIR_Datatype_release_if_not_builtin(e->u.copy.outtype);
            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
            break;
        case MPIDU_SCHED_ENTRY_NOP:
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting NOOP entry %d\n", (int) idx);
            /* nothing to be done */
            break;
        case MPIDU_SCHED_ENTRY_CB:
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting CB entry %d\n", (int) idx);
            if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_1) {
                ret_errno = e->u.cb.u.cb_p(r->comm, s->tag, e->u.cb.cb_state);
                /* Sched entries list can be reallocated inside callback */
                e = &s->entries[idx];
                if (unlikely(ret_errno)) {
                    if (MPIR_ERR_NONE == r->u.nbc.errflag) {
                        if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
                            r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
                        } else {
                            r->u.nbc.errflag = MPIR_ERR_OTHER;
                        }
                    }
                    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
                } else {
                    e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
                }
            } else if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_2) {
                ret_errno = e->u.cb.u.cb2_p(r->comm, s->tag, e->u.cb.cb_state, e->u.cb.cb_state2);
                /* Sched entries list can be reallocated inside callback */
                e = &s->entries[idx];
                if (unlikely(ret_errno)) {
                    if (MPIR_ERR_NONE == r->u.nbc.errflag) {
                        if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
                            r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
                        } else {
                            r->u.nbc.errflag = MPIR_ERR_OTHER;
                        }
                    }
                    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
                } else {
                    e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
                }
            } else {
                MPL_DBG_MSG_D(MPIR_DBG_COMM, TYPICAL, "unknown callback type, e->u.cb.cb_type=%d",
                              e->u.cb.cb_type);
                e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
            }

            break;
        default:
            MPL_DBG_MSG_D(MPIR_DBG_COMM, TYPICAL, "unknown entry type, e->type=%d", e->type);
            break;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_START_ENTRY);
    return mpi_errno;
  fn_fail:
    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
    if (r)
        r->status.MPI_ERROR = mpi_errno;
    goto fn_exit;
}

/* Posts or performs any NOT_STARTED operations in the given schedule that are
 * permitted to be started.  That is, this routine will respect schedule
 * barriers appropriately. */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_continue
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int MPIDU_Sched_continue(struct MPIDU_Sched *s)
{
    int mpi_errno = MPI_SUCCESS;
    size_t i;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_CONTINUE);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_CONTINUE);

    for (i = s->idx; i < s->num_entries; ++i) {
        struct MPIDU_Sched_entry *e = &s->entries[i];

        if (e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED) {
            mpi_errno = MPIDU_Sched_start_entry(s, i, e);
            /* Sched entries list can be reallocated inside callback */
            e = &s->entries[i];
            if (mpi_errno)
                MPIR_ERR_POP(mpi_errno);
        }

        /* _start_entry may have completed the operation, but won't update s->idx */
        if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
            ++s->idx;   /* this is valid even for barrier entries */
        }

        /* watch the indexing, s->idx might have been incremented above, so
         * ||-short-circuit matters here */
        if (e->is_barrier && (e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE || (s->idx != i + 1))) {
            /* we've hit a barrier but outstanding operations before this
             * barrier remain, so we cannot proceed past the barrier */
            break;
        }
    }
  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_CONTINUE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_create
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* creates a new opaque schedule object and returns a handle to it in (*sp) */
int MPIDU_Sched_create(MPIR_Sched_t * sp)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched *s;
    MPIR_CHKPMEM_DECL(2);

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_CREATE);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_CREATE);

    *sp = NULL;

    /* this mem will be freed by the progress engine when the request is completed */
    MPIR_CHKPMEM_MALLOC(s, struct MPIDU_Sched *, sizeof(struct MPIDU_Sched), mpi_errno,
                        "schedule object", MPL_MEM_COMM);

    s->size = MPIDU_SCHED_INITIAL_ENTRIES;
    s->idx = 0;
    s->num_entries = 0;
    s->tag = -1;
    s->req = NULL;
    s->entries = NULL;
    s->next = NULL;     /* only needed for sanity checks */
    s->prev = NULL;     /* only needed for sanity checks */

    /* this mem will be freed by the progress engine when the request is completed */
    MPIR_CHKPMEM_MALLOC(s->entries, struct MPIDU_Sched_entry *,
                        MPIDU_SCHED_INITIAL_ENTRIES * sizeof(struct MPIDU_Sched_entry), mpi_errno,
                        "schedule entries vector", MPL_MEM_COMM);

    /* TODO in a debug build, defensively mark all entries as status=INVALID */

    MPIR_CHKPMEM_COMMIT();
    *sp = s;
  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_CREATE);
    return mpi_errno;
  fn_fail:
    MPIR_CHKPMEM_REAP();
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_clone
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* clones orig and returns a handle to the new schedule in (*cloned) */
int MPIDU_Sched_clone(MPIR_Sched_t orig, MPIR_Sched_t * cloned)
{
    int mpi_errno = MPI_SUCCESS;
    /* TODO implement this function for real */
    MPIR_Assert_fmt_msg(FALSE, ("clone not yet implemented"));
    MPIR_Assertp(FALSE);
    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_start
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* sets (*sp) to MPIR_SCHED_NULL and gives you back a request pointer in (*req).
 * The caller is giving up ownership of the opaque schedule object. */
int MPIDU_Sched_start(MPIR_Sched_t * sp, MPIR_Comm * comm, int tag, MPIR_Request ** req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request *r;
    struct MPIDU_Sched *s = *sp;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_START);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_START);

    *req = NULL;
    *sp = MPIR_SCHED_NULL;

    /* sanity check the schedule */
    MPIR_Assert(s->num_entries <= s->size);
    MPIR_Assert(s->num_entries == 0 || s->idx < s->num_entries);
    MPIR_Assert(s->req == NULL);
    MPIR_Assert(s->next == NULL);
    MPIR_Assert(s->prev == NULL);
    MPIR_Assert(s->entries != NULL);

    /* now create and populate the request */
    r = MPIR_Request_create(MPIR_REQUEST_KIND__COLL);
    if (!r)
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
    /* FIXME is this right when comm/datatype GC is used? */
    MPIR_Comm_add_ref(comm);
    r->comm = comm;
    /* req refcount is currently 1, for the user's request.  Increment for the
     * schedule's reference */
    MPIR_Request_add_ref(r);
    s->req = r;
    *req = r;
    /* cc is 1, which is fine b/c we only use it as a signal, rather than
     * incr/decr on every constituent operation */
    s->tag = tag;

    /* Now kick off any initial operations.  Do this before we tell the progress
     * engine about this req+sched, otherwise we have more MT issues to worry
     * about.  Skipping this step will increase latency. */
    mpi_errno = MPIDU_Sched_continue(s);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    /* finally, enqueue in the list of all pending schedules so that the
     * progress engine can make progress on it */
    if (all_schedules.head == NULL)
        MPID_Progress_activate_hook(MPIR_Nbc_progress_hook_id);

    DL_APPEND(all_schedules.head, s);

    MPL_DBG_MSG_P(MPIR_DBG_COMM, TYPICAL, "started schedule s=%p\n", s);
    if (MPIR_CVAR_COLL_SCHED_DUMP)
        sched_dump(s, stderr);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_START);
    return mpi_errno;
  fn_fail:
    if (*req)
        *req = NULL;
    if (r) {
        MPIR_Request_free(r);   /* the schedule's ref */
        MPIR_Request_free(r);   /* the user's ref */
    }

    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_add_entry
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* idx and e are permitted to be NULL */
static int MPIDU_Sched_add_entry(struct MPIDU_Sched *s, int *idx, struct MPIDU_Sched_entry **e)
{
    int mpi_errno = MPI_SUCCESS;
    int i;
    struct MPIDU_Sched_entry *ei;

    MPIR_Assert(s->entries != NULL);
    MPIR_Assert(s->size > 0);

    if (s->num_entries == s->size) {
        /* need to grow the entries array */
        s->entries =
            MPL_realloc(s->entries, 2 * s->size * sizeof(struct MPIDU_Sched_entry), MPL_MEM_COMM);
        if (s->entries == NULL)
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
        s->size *= 2;
    }

    i = s->num_entries++;
    ei = &s->entries[i];

    if (idx != NULL)
        *idx = i;
    if (e != NULL)
        *e = ei;
  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_send
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* do these ops need an entry handle returned? */
int MPIDU_Sched_send(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest,
                     MPIR_Comm * comm, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_SEND;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;

    e->u.send.buf = buf;
    e->u.send.count = count;
    e->u.send.count_p = NULL;
    e->u.send.datatype = datatype;
    e->u.send.dest = dest;
    e->u.send.sreq = NULL;      /* will be populated by _start_entry */
    e->u.send.comm = comm;
    e->u.send.is_sync = FALSE;

    /* the user may free the comm & type after initiating but before the
     * underlying send is actually posted, so we must add a reference here and
     * release it at entry completion time */
    MPIR_Comm_add_ref(comm);
    MPIR_Datatype_add_ref_if_not_builtin(datatype);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_ssend
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_ssend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest,
                      MPIR_Comm * comm, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_SEND;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;

    e->u.send.buf = buf;
    e->u.send.count = count;
    e->u.send.count_p = NULL;
    e->u.send.datatype = datatype;
    e->u.send.dest = dest;
    e->u.send.sreq = NULL;      /* will be populated by _start_entry */
    e->u.send.comm = comm;
    e->u.send.is_sync = TRUE;

    /* the user may free the comm & type after initiating but before the
     * underlying send is actually posted, so we must add a reference here and
     * release it at entry completion time */
    MPIR_Comm_add_ref(comm);
    MPIR_Datatype_add_ref_if_not_builtin(datatype);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_send_defer
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_send_defer(const void *buf, const MPI_Aint * count, MPI_Datatype datatype, int dest,
                           MPIR_Comm * comm, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_SEND;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;

    e->u.send.buf = buf;
    e->u.send.count = MPI_UNDEFINED;
    e->u.send.count_p = count;
    e->u.send.datatype = datatype;
    e->u.send.dest = dest;
    e->u.send.sreq = NULL;      /* will be populated by _start_entry */
    e->u.send.comm = comm;
    e->u.send.is_sync = FALSE;

    /* the user may free the comm & type after initiating but before the
     * underlying send is actually posted, so we must add a reference here and
     * release it at entry completion time */
    MPIR_Comm_add_ref(comm);
    MPIR_Datatype_add_ref_if_not_builtin(datatype);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_recv_status
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_recv_status(void *buf, MPI_Aint count, MPI_Datatype datatype, int src,
                            MPIR_Comm * comm, MPI_Status * status, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_RECV;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;

    e->u.recv.buf = buf;
    e->u.recv.count = count;
    e->u.recv.datatype = datatype;
    e->u.recv.src = src;
    e->u.recv.rreq = NULL;      /* will be populated by _start_entry */
    e->u.recv.comm = comm;
    e->u.recv.status = status;
    status->MPI_ERROR = MPI_SUCCESS;
    MPIR_Comm_add_ref(comm);
    MPIR_Datatype_add_ref_if_not_builtin(datatype);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_recv
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_recv(void *buf, MPI_Aint count, MPI_Datatype datatype, int src, MPIR_Comm * comm,
                     MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_RECV;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;

    e->u.recv.buf = buf;
    e->u.recv.count = count;
    e->u.recv.datatype = datatype;
    e->u.recv.src = src;
    e->u.recv.rreq = NULL;      /* will be populated by _start_entry */
    e->u.recv.comm = comm;
    e->u.recv.status = MPI_STATUS_IGNORE;

    MPIR_Comm_add_ref(comm);
    MPIR_Datatype_add_ref_if_not_builtin(datatype);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_reduce
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_reduce(const void *inbuf, void *inoutbuf, MPI_Aint count, MPI_Datatype datatype,
                       MPI_Op op, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;
    struct MPIDU_Sched_reduce *reduce = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_REDUCE;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;
    reduce = &e->u.reduce;

    reduce->inbuf = inbuf;
    reduce->inoutbuf = inoutbuf;
    reduce->count = count;
    reduce->datatype = datatype;
    reduce->op = op;

    MPIR_Datatype_add_ref_if_not_builtin(datatype);
    MPIR_Op_add_ref_if_not_builtin(op);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_copy
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* Schedules a copy of "incount" copies of "intype" from "inbuf" to "outbuf" as
 * specified by "outcount" and "outtype".  It is erroneous to attempt to copy
 * more data than will fit into the (outbuf,outcount,outtype)-triple.  This
 * corresponds naturally with the buffer sizing rules for send-recv.
 *
 * Packing/unpacking can be accomplished by passing MPI_PACKED as either intype
 * or outtype. */
int MPIDU_Sched_copy(const void *inbuf, MPI_Aint incount, MPI_Datatype intype,
                     void *outbuf, MPI_Aint outcount, MPI_Datatype outtype, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;
    struct MPIDU_Sched_copy *copy = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_COPY;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;
    copy = &e->u.copy;

    copy->inbuf = inbuf;
    copy->incount = incount;
    copy->intype = intype;
    copy->outbuf = outbuf;
    copy->outcount = outcount;
    copy->outtype = outtype;

    MPIR_Datatype_add_ref_if_not_builtin(intype);
    MPIR_Datatype_add_ref_if_not_builtin(outtype);

    /* some sanity checking up front */
#if defined(HAVE_ERROR_CHECKING) && !defined(NDEBUG)
    {
        MPI_Aint intype_size, outtype_size;
        MPIR_Datatype_get_size_macro(intype, intype_size);
        MPIR_Datatype_get_size_macro(outtype, outtype_size);
        if (incount * intype_size > outcount * outtype_size) {
            MPL_error_printf("truncation: intype=%#x, intype_size=" MPI_AINT_FMT_DEC_SPEC
                             ", incount=" MPI_AINT_FMT_DEC_SPEC ", outtype=%#x, outtype_size="
                             MPI_AINT_FMT_DEC_SPEC " outcount=" MPI_AINT_FMT_DEC_SPEC "\n", intype,
                             intype_size, incount, outtype, outtype_size, outcount);
        }
    }
#endif

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_barrier
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* require that all previously added ops are complete before subsequent ops
 * may begin to execute */
int MPIDU_Sched_barrier(MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;

    /* mark the previous entry as a barrier unless we're at the beginning, which
     * would be a pointless barrier */
    if (s->num_entries > 0) {
        s->entries[s->num_entries - 1].is_barrier = TRUE;
    }

    return mpi_errno;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_cb
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* buffer management, fancy reductions, etc */
int MPIDU_Sched_cb(MPIR_Sched_cb_t * cb_p, void *cb_state, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;
    struct MPIDU_Sched_cb *cb = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_CB;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;
    cb = &e->u.cb;

    cb->cb_type = MPIDU_SCHED_CB_TYPE_1;
    cb->u.cb_p = cb_p;
    cb->cb_state = cb_state;
    cb->cb_state2 = NULL;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_cb2
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
/* buffer management, fancy reductions, etc */
int MPIDU_Sched_cb2(MPIR_Sched_cb2_t * cb_p, void *cb_state, void *cb_state2, MPIR_Sched_t s)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDU_Sched_entry *e = NULL;
    struct MPIDU_Sched_cb *cb = NULL;

    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
    if (mpi_errno)
        MPIR_ERR_POP(mpi_errno);

    e->type = MPIDU_SCHED_ENTRY_CB;
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
    e->is_barrier = FALSE;
    cb = &e->u.cb;

    cb->cb_type = MPIDU_SCHED_CB_TYPE_2;
    cb->u.cb2_p = cb_p;
    cb->cb_state = cb_state;
    cb->cb_state2 = cb_state2;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


/* returns TRUE in (*made_progress) if any of the outstanding schedules in state completed */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_progress_state
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made_progress)
{
    int mpi_errno = MPI_SUCCESS;
    size_t i;
    struct MPIDU_Sched *s;
    struct MPIDU_Sched *tmp;
    if (made_progress)
        *made_progress = FALSE;

    DL_FOREACH_SAFE(state->head, s, tmp) {
        if (MPIR_CVAR_COLL_SCHED_DUMP)
            sched_dump(s, stderr);

        for (i = s->idx; i < s->num_entries; ++i) {
            struct MPIDU_Sched_entry *e = &s->entries[i];

            switch (e->type) {
                case MPIDU_SCHED_ENTRY_SEND:
                    if (e->u.send.sreq != NULL && MPIR_Request_is_complete(e->u.send.sreq)) {
                        MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE,
                                        (MPL_DBG_FDEST, "completed SEND entry %d, sreq=%p\n",
                                         (int) i, e->u.send.sreq));
                        if (s->req->u.nbc.errflag != MPIR_ERR_NONE)
                            e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
                        else
                            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
                        MPIR_Request_free(e->u.send.sreq);
                        e->u.send.sreq = NULL;
                        MPIR_Comm_release(e->u.send.comm);
                        MPIR_Datatype_release_if_not_builtin(e->u.send.datatype);
                    }
                    break;
                case MPIDU_SCHED_ENTRY_RECV:
                    if (e->u.recv.rreq != NULL && MPIR_Request_is_complete(e->u.recv.rreq)) {
                        MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE,
                                        (MPL_DBG_FDEST, "completed RECV entry %d, rreq=%p\n",
                                         (int) i, e->u.recv.rreq));
                        MPIR_Process_status(&e->u.recv.rreq->status, &s->req->u.nbc.errflag);
                        if (e->u.recv.status != MPI_STATUS_IGNORE) {
                            MPI_Aint recvd;
                            e->u.recv.status->MPI_ERROR = e->u.recv.rreq->status.MPI_ERROR;
                            MPIR_Get_count_impl(&e->u.recv.rreq->status, MPI_BYTE, &recvd);
                            MPIR_STATUS_SET_COUNT(*(e->u.recv.status), recvd);
                        }
                        if (s->req->u.nbc.errflag != MPIR_ERR_NONE)
                            e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
                        else
                            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
                        MPIR_Request_free(e->u.recv.rreq);
                        e->u.recv.rreq = NULL;
                        MPIR_Comm_release(e->u.recv.comm);
                        MPIR_Datatype_release_if_not_builtin(e->u.recv.datatype);
                    }
                    break;
                default:
                    /* all other entry types don't have any sub-requests that
                     * need to be checked */
                    break;
            }

            if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
                ++s->idx;
                MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "completed OTHER entry %d\n", (int) i);
                if (e->is_barrier) {
                    /* post/perform the next round of operations */
                    mpi_errno = MPIDU_Sched_continue(s);
                    if (mpi_errno)
                        MPIR_ERR_POP(mpi_errno);
                }
            } else if (e->is_barrier && e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
                /* don't process anything after this barrier entry */
                break;
            }
        }

        if (s->idx == s->num_entries) {
            MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE,
                            (MPL_DBG_FDEST, "completing and dequeuing s=%p r=%p\n", s, s->req));

            /* dequeue this schedule from the state, it's complete */
            DL_DELETE(state->head, s);

            /* TODO refactor into a sched_complete routine? */
            switch (s->req->u.nbc.errflag) {
                case MPIR_ERR_PROC_FAILED:
                    MPIR_ERR_SET(s->req->status.MPI_ERROR, MPIX_ERR_PROC_FAILED, "**comm");
                    break;
                case MPIR_ERR_OTHER:
                    MPIR_ERR_SET(s->req->status.MPI_ERROR, MPI_ERR_OTHER, "**comm");
                    break;
                case MPIR_ERR_NONE:
                default:
                    break;
            }

            mpi_errno = MPID_Request_complete(s->req);
            if (mpi_errno != MPI_SUCCESS) {
                MPIR_ERR_POP(mpi_errno);
            }

            s->req = NULL;
            MPL_free(s->entries);
            MPL_free(s);

            if (made_progress)
                *made_progress = TRUE;
        }
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

/* returns TRUE in (*made_progress) if any of the outstanding schedules completed */
#undef FUNCNAME
#define FUNCNAME MPIDU_Sched_progress
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPIDU_Sched_progress(int *made_progress)
{
    int mpi_errno;

    MPID_THREAD_CS_ENTER(VNI, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);

    mpi_errno = MPIDU_Sched_progress_state(&all_schedules, made_progress);
    if (!mpi_errno && all_schedules.head == NULL)
        MPID_Progress_deactivate_hook(MPIR_Nbc_progress_hook_id);

    MPID_THREAD_CS_EXIT(VNI, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);

    return mpi_errno;
}