Blame src/mpid/common/sched/mpidu_sched.c

Packit Service c5cf8c
/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit Service c5cf8c
/*
Packit Service c5cf8c
 *  (C) 2011 by Argonne National Laboratory.
Packit Service c5cf8c
 *      See COPYRIGHT in top-level directory.
Packit Service c5cf8c
 */
Packit Service c5cf8c
Packit Service c5cf8c
#include "mpidimpl.h"
Packit Service c5cf8c
#include "utlist.h"
Packit Service c5cf8c
Packit Service c5cf8c
/* A random guess at an appropriate value, we can tune it later.  It could also
Packit Service c5cf8c
 * be a real tunable parameter. */
Packit Service c5cf8c
#define MPIDU_SCHED_INITIAL_ENTRIES (16)
Packit Service c5cf8c
Packit Service c5cf8c
/*
Packit Service c5cf8c
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===
Packit Service c5cf8c
Packit Service c5cf8c
cvars:
Packit Service c5cf8c
    - name        : MPIR_CVAR_COLL_SCHED_DUMP
Packit Service c5cf8c
      category    : COLLECTIVE
Packit Service c5cf8c
      type        : boolean
Packit Service c5cf8c
      default     : false
Packit Service c5cf8c
      class       : device
Packit Service c5cf8c
      verbosity   : MPI_T_VERBOSITY_USER_BASIC
Packit Service c5cf8c
      scope       : MPI_T_SCOPE_ALL_EQ
Packit Service c5cf8c
      description : >-
Packit Service c5cf8c
        Print schedule data for nonblocking collective operations.
Packit Service c5cf8c
Packit Service c5cf8c
=== END_MPI_T_CVAR_INFO_BLOCK ===
Packit Service c5cf8c
*/
Packit Service c5cf8c
Packit Service c5cf8c
static const char *entry_to_str(enum MPIDU_Sched_entry_type type)
Packit Service c5cf8c
{
Packit Service c5cf8c
    switch (type) {
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_SEND:
Packit Service c5cf8c
            return "SEND";
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_RECV:
Packit Service c5cf8c
            return "RECV";
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_REDUCE:
Packit Service c5cf8c
            return "REDUCE";
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_COPY:
Packit Service c5cf8c
            return "COPY";
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_NOP:
Packit Service c5cf8c
            return "NOP";
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_CB:
Packit Service c5cf8c
            return "CB";
Packit Service c5cf8c
        default:
Packit Service c5cf8c
            return "(out of range)";
Packit Service c5cf8c
    }
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/* utility function for debugging, dumps the given schedule object to fh */
Packit Service c5cf8c
static void sched_dump(struct MPIDU_Sched *s, FILE * fh)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int i;
Packit Service c5cf8c
Packit Service c5cf8c
    fprintf(fh, "--------------------------------\n");
Packit Service c5cf8c
    fprintf(fh, "s=%p\n", s);
Packit Service c5cf8c
    if (s) {
Packit Service c5cf8c
        fprintf(fh, "s->size=%zd\n", s->size);
Packit Service c5cf8c
        fprintf(fh, "s->idx=%zd\n", s->idx);
Packit Service c5cf8c
        fprintf(fh, "s->num_entries=%d\n", s->num_entries);
Packit Service c5cf8c
        fprintf(fh, "s->tag=%d\n", s->tag);
Packit Service c5cf8c
        fprintf(fh, "s->req=%p\n", s->req);
Packit Service c5cf8c
        fprintf(fh, "s->entries=%p\n", s->entries);
Packit Service c5cf8c
        for (i = 0; i < s->num_entries; ++i) {
Packit Service c5cf8c
            fprintf(fh, "&s->entries[%d]=%p\n", i, &s->entries[i]);
Packit Service c5cf8c
            fprintf(fh, "s->entries[%d].type=%s\n", i, entry_to_str(s->entries[i].type));
Packit Service c5cf8c
            fprintf(fh, "s->entries[%d].status=%d\n", i, s->entries[i].status);
Packit Service c5cf8c
            fprintf(fh, "s->entries[%d].is_barrier=%s\n", i,
Packit Service c5cf8c
                    (s->entries[i].is_barrier ? "TRUE" : "FALSE"));
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
    fprintf(fh, "--------------------------------\n");
Packit Service c5cf8c
    /*
Packit Service c5cf8c
     * fprintf(fh, "s->next=%p\n", s->next);
Packit Service c5cf8c
     * fprintf(fh, "s->prev=%p\n", s->prev);
Packit Service c5cf8c
     */
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Container for the list of schedules known to this file. */
struct MPIDU_Sched_state {
    /* first schedule in the list; the utlist macros need no separate tail */
    struct MPIDU_Sched *head;
};

/* Every incomplete schedule on which progress should still be made.  A NULL
 * head means that no schedule is currently pending. */
struct MPIDU_Sched_state all_schedules = {.head = NULL };
Packit Service c5cf8c
Packit Service c5cf8c
/* returns TRUE if any schedules are currently pending completion by the
Packit Service c5cf8c
 * progress engine, FALSE otherwise */
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_are_pending
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_are_pending(void)
Packit Service c5cf8c
{
Packit Service c5cf8c
    return (all_schedules.head != NULL);
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_next_tag
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_next_tag(MPIR_Comm * comm_ptr, int *tag)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    /* TODO there should be an internal accessor/utility macro for getting the
Packit Service c5cf8c
     * TAG_UB value that doesn't require using the attribute interface */
Packit Service c5cf8c
    int tag_ub = MPIR_Process.attrs.tag_ub;
Packit Service c5cf8c
#if defined(HAVE_ERROR_CHECKING)
Packit Service c5cf8c
    int start = MPI_UNDEFINED;
Packit Service c5cf8c
    int end = MPI_UNDEFINED;
Packit Service c5cf8c
    struct MPIDU_Sched *elt = NULL;
Packit Service c5cf8c
#endif
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_NEXT_TAG);
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_NEXT_TAG);
Packit Service c5cf8c
Packit Service c5cf8c
    *tag = comm_ptr->next_sched_tag;
Packit Service c5cf8c
    ++comm_ptr->next_sched_tag;
Packit Service c5cf8c
Packit Service c5cf8c
#if defined(HAVE_ERROR_CHECKING)
Packit Service c5cf8c
    /* Upon entry into the second half of the tag space, ensure there are no
Packit Service c5cf8c
     * outstanding schedules still using the second half of the space.  Check
Packit Service c5cf8c
     * the first half similarly on wraparound. */
Packit Service c5cf8c
    if (comm_ptr->next_sched_tag == (tag_ub / 2)) {
Packit Service c5cf8c
        start = tag_ub / 2;
Packit Service c5cf8c
        end = tag_ub;
Packit Service c5cf8c
    } else if (comm_ptr->next_sched_tag == (tag_ub)) {
Packit Service c5cf8c
        start = MPIR_FIRST_NBC_TAG;
Packit Service c5cf8c
        end = tag_ub / 2;
Packit Service c5cf8c
    }
Packit Service c5cf8c
    if (start != MPI_UNDEFINED) {
Packit Service c5cf8c
        DL_FOREACH(all_schedules.head, elt) {
Packit Service c5cf8c
            if (elt->tag >= start && elt->tag < end) {
Packit Service c5cf8c
                MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**toomanynbc");
Packit Service c5cf8c
            }
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
#endif
Packit Service c5cf8c
Packit Service c5cf8c
    /* wrap the tag values around to the start, but don't allow it to conflict
Packit Service c5cf8c
     * with the tags used by the blocking collectives */
Packit Service c5cf8c
    if (comm_ptr->next_sched_tag == tag_ub) {
Packit Service c5cf8c
        comm_ptr->next_sched_tag = MPIR_FIRST_NBC_TAG;
Packit Service c5cf8c
    }
Packit Service c5cf8c
#if defined(HAVE_ERROR_CHECKING)
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
#endif
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_NEXT_TAG);
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_start_entry
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* initiates the schedule entry "e" in the NBC described by "s", where
Packit Service c5cf8c
 * "e" is at "idx" in "s".  This means posting nonblocking sends/recvs,
Packit Service c5cf8c
 * performing reductions, calling callbacks, etc. */
Packit Service c5cf8c
static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPIDU_Sched_entry *e)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS, ret_errno = MPI_SUCCESS;
Packit Service c5cf8c
    MPIR_Request *r = s->req;
Packit Service c5cf8c
    MPIR_Comm *comm;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_START_ENTRY);
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_START_ENTRY);
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Assert(e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED);
Packit Service c5cf8c
Packit Service c5cf8c
    switch (e->type) {
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_SEND:
Packit Service c5cf8c
            comm = e->u.send.comm;
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting SEND entry %d\n", (int) idx);
Packit Service c5cf8c
            if (e->u.send.count_p) {
Packit Service c5cf8c
                /* deferred send */
Packit Service c5cf8c
                /* originally there was no branch and send.count_p was set to
Packit Service c5cf8c
                 * &send.count, but this requires patching up the pointers
Packit Service c5cf8c
                 * during realloc of entries, so this is easier */
Packit Service c5cf8c
                ret_errno = MPIC_Isend(e->u.send.buf, *e->u.send.count_p, e->u.send.datatype,
Packit Service c5cf8c
                                       e->u.send.dest, s->tag, comm, &e->u.send.sreq,
Packit Service c5cf8c
                                       &r->u.nbc.errflag);
Packit Service c5cf8c
            } else {
Packit Service c5cf8c
                if (e->u.send.is_sync) {
Packit Service c5cf8c
                    ret_errno = MPIC_Issend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
Packit Service c5cf8c
                                            e->u.send.dest, s->tag, comm, &e->u.send.sreq,
Packit Service c5cf8c
                                            &r->u.nbc.errflag);
Packit Service c5cf8c
                } else {
Packit Service c5cf8c
                    ret_errno = MPIC_Isend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
Packit Service c5cf8c
                                           e->u.send.dest, s->tag, comm, &e->u.send.sreq,
Packit Service c5cf8c
                                           &r->u.nbc.errflag);
Packit Service c5cf8c
                }
Packit Service c5cf8c
            }
Packit Service c5cf8c
            /* Check if the error is actually fatal to the NBC or we can continue. */
Packit Service c5cf8c
            if (unlikely(ret_errno)) {
Packit Service c5cf8c
                if (MPIR_ERR_NONE == r->u.nbc.errflag) {
Packit Service c5cf8c
                    if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
Packit Service c5cf8c
                        r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
Packit Service c5cf8c
                    } else {
Packit Service c5cf8c
                        r->u.nbc.errflag = MPIR_ERR_OTHER;
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                }
Packit Service c5cf8c
                e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
Packit Service c5cf8c
                MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "Sched SEND failed. Errflag: %d\n",
Packit Service c5cf8c
                              (int) r->u.nbc.errflag);
Packit Service c5cf8c
            } else {
Packit Service c5cf8c
                e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
Packit Service c5cf8c
            }
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_RECV:
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting RECV entry %d\n", (int) idx);
Packit Service c5cf8c
            comm = e->u.recv.comm;
Packit Service c5cf8c
            ret_errno = MPIC_Irecv(e->u.recv.buf, e->u.recv.count, e->u.recv.datatype,
Packit Service c5cf8c
                                   e->u.recv.src, s->tag, comm, &e->u.recv.rreq);
Packit Service c5cf8c
            /* Check if the error is actually fatal to the NBC or we can continue. */
Packit Service c5cf8c
            if (unlikely(ret_errno)) {
Packit Service c5cf8c
                if (MPIR_ERR_NONE == r->u.nbc.errflag) {
Packit Service c5cf8c
                    if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
Packit Service c5cf8c
                        r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
Packit Service c5cf8c
                    } else {
Packit Service c5cf8c
                        r->u.nbc.errflag = MPIR_ERR_OTHER;
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                }
Packit Service c5cf8c
                /* We should set the status to failed here - since the request is not freed. this
Packit Service c5cf8c
                 * will be handled later in MPIDU_Sched_progress_state, so set to started here */
Packit Service c5cf8c
                e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
Packit Service c5cf8c
                MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "Sched RECV failed. Errflag: %d\n",
Packit Service c5cf8c
                              (int) r->u.nbc.errflag);
Packit Service c5cf8c
            } else {
Packit Service c5cf8c
                e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
Packit Service c5cf8c
            }
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_REDUCE:
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting REDUCE entry %d\n", (int) idx);
Packit Service c5cf8c
            mpi_errno =
Packit Service c5cf8c
                MPIR_Reduce_local(e->u.reduce.inbuf, e->u.reduce.inoutbuf, e->u.reduce.count,
Packit Service c5cf8c
                                  e->u.reduce.datatype, e->u.reduce.op);
Packit Service c5cf8c
            if (mpi_errno)
Packit Service c5cf8c
                MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
            MPIR_Op_release_if_not_builtin(e->u.reduce.op);
Packit Service c5cf8c
            MPIR_Datatype_release_if_not_builtin(e->u.reduce.datatype);
Packit Service c5cf8c
            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_COPY:
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting COPY entry %d\n", (int) idx);
Packit Service c5cf8c
            mpi_errno = MPIR_Localcopy(e->u.copy.inbuf, e->u.copy.incount, e->u.copy.intype,
Packit Service c5cf8c
                                       e->u.copy.outbuf, e->u.copy.outcount, e->u.copy.outtype);
Packit Service c5cf8c
            if (mpi_errno)
Packit Service c5cf8c
                MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
            MPIR_Datatype_release_if_not_builtin(e->u.copy.intype);
Packit Service c5cf8c
            MPIR_Datatype_release_if_not_builtin(e->u.copy.outtype);
Packit Service c5cf8c
            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_NOP:
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting NOOP entry %d\n", (int) idx);
Packit Service c5cf8c
            /* nothing to be done */
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        case MPIDU_SCHED_ENTRY_CB:
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "starting CB entry %d\n", (int) idx);
Packit Service c5cf8c
            if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_1) {
Packit Service c5cf8c
                ret_errno = e->u.cb.u.cb_p(r->comm, s->tag, e->u.cb.cb_state);
Packit Service c5cf8c
                /* Sched entries list can be reallocated inside callback */
Packit Service c5cf8c
                e = &s->entries[idx];
Packit Service c5cf8c
                if (unlikely(ret_errno)) {
Packit Service c5cf8c
                    if (MPIR_ERR_NONE == r->u.nbc.errflag) {
Packit Service c5cf8c
                        if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
Packit Service c5cf8c
                            r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
Packit Service c5cf8c
                        } else {
Packit Service c5cf8c
                            r->u.nbc.errflag = MPIR_ERR_OTHER;
Packit Service c5cf8c
                        }
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
Packit Service c5cf8c
                } else {
Packit Service c5cf8c
                    e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
                }
Packit Service c5cf8c
            } else if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_2) {
Packit Service c5cf8c
                ret_errno = e->u.cb.u.cb2_p(r->comm, s->tag, e->u.cb.cb_state, e->u.cb.cb_state2);
Packit Service c5cf8c
                /* Sched entries list can be reallocated inside callback */
Packit Service c5cf8c
                e = &s->entries[idx];
Packit Service c5cf8c
                if (unlikely(ret_errno)) {
Packit Service c5cf8c
                    if (MPIR_ERR_NONE == r->u.nbc.errflag) {
Packit Service c5cf8c
                        if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
Packit Service c5cf8c
                            r->u.nbc.errflag = MPIR_ERR_PROC_FAILED;
Packit Service c5cf8c
                        } else {
Packit Service c5cf8c
                            r->u.nbc.errflag = MPIR_ERR_OTHER;
Packit Service c5cf8c
                        }
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
Packit Service c5cf8c
                } else {
Packit Service c5cf8c
                    e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
                }
Packit Service c5cf8c
            } else {
Packit Service c5cf8c
                MPL_DBG_MSG_D(MPIR_DBG_COMM, TYPICAL, "unknown callback type, e->u.cb.cb_type=%d",
Packit Service c5cf8c
                              e->u.cb.cb_type);
Packit Service c5cf8c
                e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
            }
Packit Service c5cf8c
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        default:
Packit Service c5cf8c
            MPL_DBG_MSG_D(MPIR_DBG_COMM, TYPICAL, "unknown entry type, e->type=%d", e->type);
Packit Service c5cf8c
            break;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_START_ENTRY);
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
Packit Service c5cf8c
    if (r)
Packit Service c5cf8c
        r->status.MPI_ERROR = mpi_errno;
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/* Posts or performs any NOT_STARTED operations in the given schedule that are
Packit Service c5cf8c
 * permitted to be started.  That is, this routine will respect schedule
Packit Service c5cf8c
 * barriers appropriately. */
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_continue
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
static int MPIDU_Sched_continue(struct MPIDU_Sched *s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    size_t i;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_CONTINUE);
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_CONTINUE);
Packit Service c5cf8c
Packit Service c5cf8c
    for (i = s->idx; i < s->num_entries; ++i) {
Packit Service c5cf8c
        struct MPIDU_Sched_entry *e = &s->entries[i];
Packit Service c5cf8c
Packit Service c5cf8c
        if (e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED) {
Packit Service c5cf8c
            mpi_errno = MPIDU_Sched_start_entry(s, i, e);
Packit Service c5cf8c
            /* Sched entries list can be reallocated inside callback */
Packit Service c5cf8c
            e = &s->entries[i];
Packit Service c5cf8c
            if (mpi_errno)
Packit Service c5cf8c
                MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
        }
Packit Service c5cf8c
Packit Service c5cf8c
        /* _start_entry may have completed the operation, but won't update s->idx */
Packit Service c5cf8c
        if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
Packit Service c5cf8c
            ++s->idx;   /* this is valid even for barrier entries */
Packit Service c5cf8c
        }
Packit Service c5cf8c
Packit Service c5cf8c
        /* watch the indexing, s->idx might have been incremented above, so
Packit Service c5cf8c
         * ||-short-circuit matters here */
Packit Service c5cf8c
        if (e->is_barrier && (e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE || (s->idx != i + 1))) {
Packit Service c5cf8c
            /* we've hit a barrier but outstanding operations before this
Packit Service c5cf8c
             * barrier remain, so we cannot proceed past the barrier */
Packit Service c5cf8c
            break;
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_CONTINUE);
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_create
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* creates a new opaque schedule object and returns a handle to it in (*sp) */
Packit Service c5cf8c
int MPIDU_Sched_create(MPIR_Sched_t * sp)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched *s;
Packit Service c5cf8c
    MPIR_CHKPMEM_DECL(2);
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_CREATE);
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_CREATE);
Packit Service c5cf8c
Packit Service c5cf8c
    *sp = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    /* this mem will be freed by the progress engine when the request is completed */
Packit Service c5cf8c
    MPIR_CHKPMEM_MALLOC(s, struct MPIDU_Sched *, sizeof(struct MPIDU_Sched), mpi_errno,
Packit Service c5cf8c
                        "schedule object", MPL_MEM_COMM);
Packit Service c5cf8c
Packit Service c5cf8c
    s->size = MPIDU_SCHED_INITIAL_ENTRIES;
Packit Service c5cf8c
    s->idx = 0;
Packit Service c5cf8c
    s->num_entries = 0;
Packit Service c5cf8c
    s->tag = -1;
Packit Service c5cf8c
    s->req = NULL;
Packit Service c5cf8c
    s->entries = NULL;
Packit Service c5cf8c
    s->next = NULL;     /* only needed for sanity checks */
Packit Service c5cf8c
    s->prev = NULL;     /* only needed for sanity checks */
Packit Service c5cf8c
Packit Service c5cf8c
    /* this mem will be freed by the progress engine when the request is completed */
Packit Service c5cf8c
    MPIR_CHKPMEM_MALLOC(s->entries, struct MPIDU_Sched_entry *,
Packit Service c5cf8c
                        MPIDU_SCHED_INITIAL_ENTRIES * sizeof(struct MPIDU_Sched_entry), mpi_errno,
Packit Service c5cf8c
                        "schedule entries vector", MPL_MEM_COMM);
Packit Service c5cf8c
Packit Service c5cf8c
    /* TODO in a debug build, defensively mark all entries as status=INVALID */
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_CHKPMEM_COMMIT();
Packit Service c5cf8c
    *sp = s;
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_CREATE);
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    MPIR_CHKPMEM_REAP();
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_clone
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* clones orig and returns a handle to the new schedule in (*cloned) */
Packit Service c5cf8c
int MPIDU_Sched_clone(MPIR_Sched_t orig, MPIR_Sched_t * cloned)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    /* TODO implement this function for real */
Packit Service c5cf8c
    MPIR_Assert_fmt_msg(FALSE, ("clone not yet implemented"));
Packit Service c5cf8c
    MPIR_Assertp(FALSE);
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_start
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* sets (*sp) to MPIR_SCHED_NULL and gives you back a request pointer in (*req).
Packit Service c5cf8c
 * The caller is giving up ownership of the opaque schedule object. */
Packit Service c5cf8c
int MPIDU_Sched_start(MPIR_Sched_t * sp, MPIR_Comm * comm, int tag, MPIR_Request ** req)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    MPIR_Request *r;
Packit Service c5cf8c
    struct MPIDU_Sched *s = *sp;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDU_SCHED_START);
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDU_SCHED_START);
Packit Service c5cf8c
Packit Service c5cf8c
    *req = NULL;
Packit Service c5cf8c
    *sp = MPIR_SCHED_NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    /* sanity check the schedule */
Packit Service c5cf8c
    MPIR_Assert(s->num_entries <= s->size);
Packit Service c5cf8c
    MPIR_Assert(s->num_entries == 0 || s->idx < s->num_entries);
Packit Service c5cf8c
    MPIR_Assert(s->req == NULL);
Packit Service c5cf8c
    MPIR_Assert(s->next == NULL);
Packit Service c5cf8c
    MPIR_Assert(s->prev == NULL);
Packit Service c5cf8c
    MPIR_Assert(s->entries != NULL);
Packit Service c5cf8c
Packit Service c5cf8c
    /* now create and populate the request */
Packit Service c5cf8c
    r = MPIR_Request_create(MPIR_REQUEST_KIND__COLL);
Packit Service c5cf8c
    if (!r)
Packit Service c5cf8c
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
Packit Service c5cf8c
    /* FIXME is this right when comm/datatype GC is used? */
Packit Service c5cf8c
    MPIR_Comm_add_ref(comm);
Packit Service c5cf8c
    r->comm = comm;
Packit Service c5cf8c
    /* req refcount is currently 1, for the user's request.  Increment for the
Packit Service c5cf8c
     * schedule's reference */
Packit Service c5cf8c
    MPIR_Request_add_ref(r);
Packit Service c5cf8c
    s->req = r;
Packit Service c5cf8c
    *req = r;
Packit Service c5cf8c
    /* cc is 1, which is fine b/c we only use it as a signal, rather than
Packit Service c5cf8c
     * incr/decr on every constituent operation */
Packit Service c5cf8c
    s->tag = tag;
Packit Service c5cf8c
Packit Service c5cf8c
    /* Now kick off any initial operations.  Do this before we tell the progress
Packit Service c5cf8c
     * engine about this req+sched, otherwise we have more MT issues to worry
Packit Service c5cf8c
     * about.  Skipping this step will increase latency. */
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_continue(s);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    /* finally, enqueue in the list of all pending schedules so that the
Packit Service c5cf8c
     * progress engine can make progress on it */
Packit Service c5cf8c
    if (all_schedules.head == NULL)
Packit Service c5cf8c
        MPID_Progress_activate_hook(MPIR_Nbc_progress_hook_id);
Packit Service c5cf8c
Packit Service c5cf8c
    DL_APPEND(all_schedules.head, s);
Packit Service c5cf8c
Packit Service c5cf8c
    MPL_DBG_MSG_P(MPIR_DBG_COMM, TYPICAL, "started schedule s=%p\n", s);
Packit Service c5cf8c
    if (MPIR_CVAR_COLL_SCHED_DUMP)
Packit Service c5cf8c
        sched_dump(s, stderr);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDU_SCHED_START);
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    if (*req)
Packit Service c5cf8c
        *req = NULL;
Packit Service c5cf8c
    if (r) {
Packit Service c5cf8c
        MPIR_Request_free(r);   /* the schedule's ref */
Packit Service c5cf8c
        MPIR_Request_free(r);   /* the user's ref */
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_add_entry
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* idx and e are permitted to be NULL */
Packit Service c5cf8c
static int MPIDU_Sched_add_entry(struct MPIDU_Sched *s, int *idx, struct MPIDU_Sched_entry **e)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    int i;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *ei;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Assert(s->entries != NULL);
Packit Service c5cf8c
    MPIR_Assert(s->size > 0);
Packit Service c5cf8c
Packit Service c5cf8c
    if (s->num_entries == s->size) {
Packit Service c5cf8c
        /* need to grow the entries array */
Packit Service c5cf8c
        s->entries =
Packit Service c5cf8c
            MPL_realloc(s->entries, 2 * s->size * sizeof(struct MPIDU_Sched_entry), MPL_MEM_COMM);
Packit Service c5cf8c
        if (s->entries == NULL)
Packit Service c5cf8c
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
Packit Service c5cf8c
        s->size *= 2;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    i = s->num_entries++;
Packit Service c5cf8c
    ei = &s->entries[i];
Packit Service c5cf8c
Packit Service c5cf8c
    if (idx != NULL)
Packit Service c5cf8c
        *idx = i;
Packit Service c5cf8c
    if (e != NULL)
Packit Service c5cf8c
        *e = ei;
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_send
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* do these ops need an entry handle returned? */
Packit Service c5cf8c
int MPIDU_Sched_send(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest,
Packit Service c5cf8c
                     MPIR_Comm * comm, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_SEND;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    e->u.send.buf = buf;
Packit Service c5cf8c
    e->u.send.count = count;
Packit Service c5cf8c
    e->u.send.count_p = NULL;
Packit Service c5cf8c
    e->u.send.datatype = datatype;
Packit Service c5cf8c
    e->u.send.dest = dest;
Packit Service c5cf8c
    e->u.send.sreq = NULL;      /* will be populated by _start_entry */
Packit Service c5cf8c
    e->u.send.comm = comm;
Packit Service c5cf8c
    e->u.send.is_sync = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    /* the user may free the comm & type after initiating but before the
Packit Service c5cf8c
     * underlying send is actually posted, so we must add a reference here and
Packit Service c5cf8c
     * release it at entry completion time */
Packit Service c5cf8c
    MPIR_Comm_add_ref(comm);
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(datatype);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_ssend
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_ssend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest,
Packit Service c5cf8c
                      MPIR_Comm * comm, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_SEND;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    e->u.send.buf = buf;
Packit Service c5cf8c
    e->u.send.count = count;
Packit Service c5cf8c
    e->u.send.count_p = NULL;
Packit Service c5cf8c
    e->u.send.datatype = datatype;
Packit Service c5cf8c
    e->u.send.dest = dest;
Packit Service c5cf8c
    e->u.send.sreq = NULL;      /* will be populated by _start_entry */
Packit Service c5cf8c
    e->u.send.comm = comm;
Packit Service c5cf8c
    e->u.send.is_sync = TRUE;
Packit Service c5cf8c
Packit Service c5cf8c
    /* the user may free the comm & type after initiating but before the
Packit Service c5cf8c
     * underlying send is actually posted, so we must add a reference here and
Packit Service c5cf8c
     * release it at entry completion time */
Packit Service c5cf8c
    MPIR_Comm_add_ref(comm);
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(datatype);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_send_defer
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_send_defer(const void *buf, const MPI_Aint * count, MPI_Datatype datatype, int dest,
Packit Service c5cf8c
                           MPIR_Comm * comm, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_SEND;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    e->u.send.buf = buf;
Packit Service c5cf8c
    e->u.send.count = MPI_UNDEFINED;
Packit Service c5cf8c
    e->u.send.count_p = count;
Packit Service c5cf8c
    e->u.send.datatype = datatype;
Packit Service c5cf8c
    e->u.send.dest = dest;
Packit Service c5cf8c
    e->u.send.sreq = NULL;      /* will be populated by _start_entry */
Packit Service c5cf8c
    e->u.send.comm = comm;
Packit Service c5cf8c
    e->u.send.is_sync = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    /* the user may free the comm & type after initiating but before the
Packit Service c5cf8c
     * underlying send is actually posted, so we must add a reference here and
Packit Service c5cf8c
     * release it at entry completion time */
Packit Service c5cf8c
    MPIR_Comm_add_ref(comm);
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(datatype);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_recv_status
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_recv_status(void *buf, MPI_Aint count, MPI_Datatype datatype, int src,
Packit Service c5cf8c
                            MPIR_Comm * comm, MPI_Status * status, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_RECV;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    e->u.recv.buf = buf;
Packit Service c5cf8c
    e->u.recv.count = count;
Packit Service c5cf8c
    e->u.recv.datatype = datatype;
Packit Service c5cf8c
    e->u.recv.src = src;
Packit Service c5cf8c
    e->u.recv.rreq = NULL;      /* will be populated by _start_entry */
Packit Service c5cf8c
    e->u.recv.comm = comm;
Packit Service c5cf8c
    e->u.recv.status = status;
Packit Service c5cf8c
    status->MPI_ERROR = MPI_SUCCESS;
Packit Service c5cf8c
    MPIR_Comm_add_ref(comm);
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(datatype);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_recv
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_recv(void *buf, MPI_Aint count, MPI_Datatype datatype, int src, MPIR_Comm * comm,
Packit Service c5cf8c
                     MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_RECV;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    e->u.recv.buf = buf;
Packit Service c5cf8c
    e->u.recv.count = count;
Packit Service c5cf8c
    e->u.recv.datatype = datatype;
Packit Service c5cf8c
    e->u.recv.src = src;
Packit Service c5cf8c
    e->u.recv.rreq = NULL;      /* will be populated by _start_entry */
Packit Service c5cf8c
    e->u.recv.comm = comm;
Packit Service c5cf8c
    e->u.recv.status = MPI_STATUS_IGNORE;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Comm_add_ref(comm);
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(datatype);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_reduce
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_reduce(const void *inbuf, void *inoutbuf, MPI_Aint count, MPI_Datatype datatype,
Packit Service c5cf8c
                       MPI_Op op, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
    struct MPIDU_Sched_reduce *reduce = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_REDUCE;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
    reduce = &e->u.reduce;
Packit Service c5cf8c
Packit Service c5cf8c
    reduce->inbuf = inbuf;
Packit Service c5cf8c
    reduce->inoutbuf = inoutbuf;
Packit Service c5cf8c
    reduce->count = count;
Packit Service c5cf8c
    reduce->datatype = datatype;
Packit Service c5cf8c
    reduce->op = op;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(datatype);
Packit Service c5cf8c
    MPIR_Op_add_ref_if_not_builtin(op);
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_copy
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* Schedules a copy of "incount" copies of "intype" from "inbuf" to "outbuf" as
Packit Service c5cf8c
 * specified by "outcount" and "outtype".  It is erroneous to attempt to copy
Packit Service c5cf8c
 * more data than will fit into the (outbuf,outcount,outtype)-triple.  This
Packit Service c5cf8c
 * corresponds naturally with the buffer sizing rules for send-recv.
Packit Service c5cf8c
 *
Packit Service c5cf8c
 * Packing/unpacking can be accomplished by passing MPI_PACKED as either intype
Packit Service c5cf8c
 * or outtype. */
Packit Service c5cf8c
int MPIDU_Sched_copy(const void *inbuf, MPI_Aint incount, MPI_Datatype intype,
Packit Service c5cf8c
                     void *outbuf, MPI_Aint outcount, MPI_Datatype outtype, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
    struct MPIDU_Sched_copy *copy = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_COPY;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
    copy = &e->u.copy;
Packit Service c5cf8c
Packit Service c5cf8c
    copy->inbuf = inbuf;
Packit Service c5cf8c
    copy->incount = incount;
Packit Service c5cf8c
    copy->intype = intype;
Packit Service c5cf8c
    copy->outbuf = outbuf;
Packit Service c5cf8c
    copy->outcount = outcount;
Packit Service c5cf8c
    copy->outtype = outtype;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(intype);
Packit Service c5cf8c
    MPIR_Datatype_add_ref_if_not_builtin(outtype);
Packit Service c5cf8c
Packit Service c5cf8c
    /* some sanity checking up front */
Packit Service c5cf8c
#if defined(HAVE_ERROR_CHECKING) && !defined(NDEBUG)
Packit Service c5cf8c
    {
Packit Service c5cf8c
        MPI_Aint intype_size, outtype_size;
Packit Service c5cf8c
        MPIR_Datatype_get_size_macro(intype, intype_size);
Packit Service c5cf8c
        MPIR_Datatype_get_size_macro(outtype, outtype_size);
Packit Service c5cf8c
        if (incount * intype_size > outcount * outtype_size) {
Packit Service c5cf8c
            MPL_error_printf("truncation: intype=%#x, intype_size=" MPI_AINT_FMT_DEC_SPEC
Packit Service c5cf8c
                             ", incount=" MPI_AINT_FMT_DEC_SPEC ", outtype=%#x, outtype_size="
Packit Service c5cf8c
                             MPI_AINT_FMT_DEC_SPEC " outcount=" MPI_AINT_FMT_DEC_SPEC "\n", intype,
Packit Service c5cf8c
                             intype_size, incount, outtype, outtype_size, outcount);
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
#endif
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_barrier
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* require that all previously added ops are complete before subsequent ops
Packit Service c5cf8c
 * may begin to execute */
Packit Service c5cf8c
int MPIDU_Sched_barrier(MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
Packit Service c5cf8c
    /* mark the previous entry as a barrier unless we're at the beginning, which
Packit Service c5cf8c
     * would be a pointless barrier */
Packit Service c5cf8c
    if (s->num_entries > 0) {
Packit Service c5cf8c
        s->entries[s->num_entries - 1].is_barrier = TRUE;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_cb
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* buffer management, fancy reductions, etc */
Packit Service c5cf8c
int MPIDU_Sched_cb(MPIR_Sched_cb_t * cb_p, void *cb_state, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
    struct MPIDU_Sched_cb *cb = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_CB;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
    cb = &e->u.cb;
Packit Service c5cf8c
Packit Service c5cf8c
    cb->cb_type = MPIDU_SCHED_CB_TYPE_1;
Packit Service c5cf8c
    cb->u.cb_p = cb_p;
Packit Service c5cf8c
    cb->cb_state = cb_state;
Packit Service c5cf8c
    cb->cb_state2 = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_cb2
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* buffer management, fancy reductions, etc */
Packit Service c5cf8c
int MPIDU_Sched_cb2(MPIR_Sched_cb2_t * cb_p, void *cb_state, void *cb_state2, MPIR_Sched_t s)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    struct MPIDU_Sched_entry *e = NULL;
Packit Service c5cf8c
    struct MPIDU_Sched_cb *cb = NULL;
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    e->type = MPIDU_SCHED_ENTRY_CB;
Packit Service c5cf8c
    e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
Packit Service c5cf8c
    e->is_barrier = FALSE;
Packit Service c5cf8c
    cb = &e->u.cb;
Packit Service c5cf8c
Packit Service c5cf8c
    cb->cb_type = MPIDU_SCHED_CB_TYPE_2;
Packit Service c5cf8c
    cb->u.cb2_p = cb_p;
Packit Service c5cf8c
    cb->cb_state = cb_state;
Packit Service c5cf8c
    cb->cb_state2 = cb_state2;
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
/* returns TRUE in (*made_progress) if any of the outstanding schedules in state completed */
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_progress_state
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
/* Polls every schedule queued on state->head and retires whatever finished.
 *
 * For each schedule, entries from s->idx up to s->num_entries are visited:
 *   - a SEND/RECV entry whose sub-request has completed gets its request
 *     freed and its comm/datatype references released, and is marked
 *     COMPLETE, or FAILED when the schedule's errflag is set;
 *   - when the first outstanding entry (index s->idx) is done, s->idx is
 *     advanced, and if that entry was a barrier MPIDU_Sched_continue is
 *     called to post the next round;
 *   - scanning stops at a barrier entry that is not yet complete.
 * A schedule whose s->idx reaches s->num_entries is unlinked from the list,
 * its request is completed (with an error set according to errflag), and the
 * schedule and its entries array are freed.
 *
 * state         - list of outstanding schedules
 * made_progress - may be NULL; otherwise set to FALSE on entry and to TRUE
 *                 if at least one schedule was completed and dequeued
 *
 * Returns MPI_SUCCESS or the first error from MPIDU_Sched_continue or
 * MPID_Request_complete. */
static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made_progress)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    size_t i;
Packit Service c5cf8c
    struct MPIDU_Sched *s;
Packit Service c5cf8c
    struct MPIDU_Sched *tmp;
Packit Service c5cf8c
    if (made_progress)
Packit Service c5cf8c
        *made_progress = FALSE;
Packit Service c5cf8c
Packit Service c5cf8c
    /* the _SAFE iteration form is needed because s may be unlinked and freed
     * inside the loop body */
    DL_FOREACH_SAFE(state->head, s, tmp) {
Packit Service c5cf8c
        if (MPIR_CVAR_COLL_SCHED_DUMP)
Packit Service c5cf8c
            sched_dump(s, stderr);
Packit Service c5cf8c
Packit Service c5cf8c
        for (i = s->idx; i < s->num_entries; ++i) {
Packit Service c5cf8c
            struct MPIDU_Sched_entry *e = &s->entries[i];
Packit Service c5cf8c
Packit Service c5cf8c
            switch (e->type) {
Packit Service c5cf8c
                case MPIDU_SCHED_ENTRY_SEND:
Packit Service c5cf8c
                    /* a NULL sreq means there is no sub-request to poll for
                     * this entry right now */
                    if (e->u.send.sreq != NULL && MPIR_Request_is_complete(e->u.send.sreq)) {
Packit Service c5cf8c
                        MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE,
Packit Service c5cf8c
                                        (MPL_DBG_FDEST, "completed SEND entry %d, sreq=%p\n",
Packit Service c5cf8c
                                         (int) i, e->u.send.sreq));
Packit Service c5cf8c
                        if (s->req->u.nbc.errflag != MPIR_ERR_NONE)
Packit Service c5cf8c
                            e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
Packit Service c5cf8c
                        else
Packit Service c5cf8c
                            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
                        MPIR_Request_free(e->u.send.sreq);
Packit Service c5cf8c
                        e->u.send.sreq = NULL;
Packit Service c5cf8c
                        /* drop the references taken when the entry was added */
                        MPIR_Comm_release(e->u.send.comm);
Packit Service c5cf8c
                        MPIR_Datatype_release_if_not_builtin(e->u.send.datatype);
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                    break;
Packit Service c5cf8c
                case MPIDU_SCHED_ENTRY_RECV:
Packit Service c5cf8c
                    if (e->u.recv.rreq != NULL && MPIR_Request_is_complete(e->u.recv.rreq)) {
Packit Service c5cf8c
                        MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE,
Packit Service c5cf8c
                                        (MPL_DBG_FDEST, "completed RECV entry %d, rreq=%p\n",
Packit Service c5cf8c
                                         (int) i, e->u.recv.rreq));
Packit Service c5cf8c
                        /* may update the schedule's errflag from the receive
                         * status, so it must run before the errflag test below */
                        MPIR_Process_status(&e->u.recv.rreq->status, &s->req->u.nbc.errflag);
Packit Service c5cf8c
                        /* hand the error code and received byte count back to
                         * the caller-supplied status object, if there is one */
                        if (e->u.recv.status != MPI_STATUS_IGNORE) {
Packit Service c5cf8c
                            MPI_Aint recvd;
Packit Service c5cf8c
                            e->u.recv.status->MPI_ERROR = e->u.recv.rreq->status.MPI_ERROR;
Packit Service c5cf8c
                            MPIR_Get_count_impl(&e->u.recv.rreq->status, MPI_BYTE, &recvd);
Packit Service c5cf8c
                            MPIR_STATUS_SET_COUNT(*(e->u.recv.status), recvd);
Packit Service c5cf8c
                        }
Packit Service c5cf8c
                        if (s->req->u.nbc.errflag != MPIR_ERR_NONE)
Packit Service c5cf8c
                            e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
Packit Service c5cf8c
                        else
Packit Service c5cf8c
                            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
Packit Service c5cf8c
                        MPIR_Request_free(e->u.recv.rreq);
Packit Service c5cf8c
                        e->u.recv.rreq = NULL;
Packit Service c5cf8c
                        /* drop the references taken when the entry was added */
                        MPIR_Comm_release(e->u.recv.comm);
Packit Service c5cf8c
                        MPIR_Datatype_release_if_not_builtin(e->u.recv.datatype);
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                    break;
Packit Service c5cf8c
                default:
Packit Service c5cf8c
                    /* all other entry types don't have any sub-requests that
Packit Service c5cf8c
                     * need to be checked */
Packit Service c5cf8c
                    break;
Packit Service c5cf8c
            }
Packit Service c5cf8c
Packit Service c5cf8c
            /* only the first outstanding entry may advance s->idx; the >=
             * comparison treats any status at or above COMPLETE as done */
            if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
Packit Service c5cf8c
                ++s->idx;
Packit Service c5cf8c
                MPL_DBG_MSG_D(MPIR_DBG_COMM, VERBOSE, "completed OTHER entry %d\n", (int) i);
Packit Service c5cf8c
                if (e->is_barrier) {
Packit Service c5cf8c
                    /* post/perform the next round of operations */
Packit Service c5cf8c
                    mpi_errno = MPIDU_Sched_continue(s);
Packit Service c5cf8c
                    if (mpi_errno)
Packit Service c5cf8c
                        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                }
Packit Service c5cf8c
            } else if (e->is_barrier && e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
Packit Service c5cf8c
                /* don't process anything after this barrier entry */
Packit Service c5cf8c
                break;
Packit Service c5cf8c
            }
Packit Service c5cf8c
        }
Packit Service c5cf8c
Packit Service c5cf8c
        /* every entry has been retired: finish the whole schedule */
        if (s->idx == s->num_entries) {
Packit Service c5cf8c
            MPL_DBG_MSG_FMT(MPIR_DBG_COMM, VERBOSE,
Packit Service c5cf8c
                            (MPL_DBG_FDEST, "completing and dequeuing s=%p r=%p\n", s, s->req));
Packit Service c5cf8c
Packit Service c5cf8c
            /* dequeue this schedule from the state, it's complete */
Packit Service c5cf8c
            DL_DELETE(state->head, s);
Packit Service c5cf8c
Packit Service c5cf8c
            /* TODO refactor into a sched_complete routine? */
Packit Service c5cf8c
            /* translate the accumulated errflag into the request's MPI_ERROR */
            switch (s->req->u.nbc.errflag) {
Packit Service c5cf8c
                case MPIR_ERR_PROC_FAILED:
Packit Service c5cf8c
                    MPIR_ERR_SET(s->req->status.MPI_ERROR, MPIX_ERR_PROC_FAILED, "**comm");
Packit Service c5cf8c
                    break;
Packit Service c5cf8c
                case MPIR_ERR_OTHER:
Packit Service c5cf8c
                    MPIR_ERR_SET(s->req->status.MPI_ERROR, MPI_ERR_OTHER, "**comm");
Packit Service c5cf8c
                    break;
Packit Service c5cf8c
                case MPIR_ERR_NONE:
Packit Service c5cf8c
                default:
Packit Service c5cf8c
                    break;
Packit Service c5cf8c
            }
Packit Service c5cf8c
Packit Service c5cf8c
            /* NOTE(review): if this call fails we jump to fn_fail with s
             * already unlinked but not freed -- looks like a leak on the
             * error path; confirm */
            mpi_errno = MPID_Request_complete(s->req);
Packit Service c5cf8c
            if (mpi_errno != MPI_SUCCESS) {
Packit Service c5cf8c
                MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
            }
Packit Service c5cf8c
Packit Service c5cf8c
            s->req = NULL;
Packit Service c5cf8c
            MPL_free(s->entries);
Packit Service c5cf8c
            MPL_free(s);
Packit Service c5cf8c
Packit Service c5cf8c
            if (made_progress)
Packit Service c5cf8c
                *made_progress = TRUE;
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}
Packit Service c5cf8c
Packit Service c5cf8c
/* returns TRUE in (*made_progress) if any of the outstanding schedules completed */
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIDU_Sched_progress
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIDU_Sched_progress(int *made_progress)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int mpi_errno;
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_THREAD_CS_ENTER(VNI, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit Service c5cf8c
Packit Service c5cf8c
    mpi_errno = MPIDU_Sched_progress_state(&all_schedules, made_progress);
Packit Service c5cf8c
    if (!mpi_errno && all_schedules.head == NULL)
Packit Service c5cf8c
        MPID_Progress_deactivate_hook(MPIR_Nbc_progress_hook_id);
Packit Service c5cf8c
Packit Service c5cf8c
    MPID_THREAD_CS_EXIT(VNI, MPIR_THREAD_GLOBAL_ALLFUNC_MUTEX);
Packit Service c5cf8c
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
}