Blame src/mpi/coll/alltoallw/alltoallw_intra_scattered.c

Packit Service c5cf8c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit Service c5cf8c
/*
Packit Service c5cf8c
 *
Packit Service c5cf8c
 *  (C) 2001 by Argonne National Laboratory.
Packit Service c5cf8c
 *      See COPYRIGHT in top-level directory.
Packit Service c5cf8c
 */
Packit Service c5cf8c
Packit Service c5cf8c
#include "mpiimpl.h"
Packit Service c5cf8c
Packit Service c5cf8c
/* Algorithm: Blocked Alltoallw
Packit Service c5cf8c
 *
Packit Service c5cf8c
 * Since each process sends/receives different amounts of data to every other
Packit Service c5cf8c
 * process, we don't know the total message size for all processes without
Packit Service c5cf8c
 * additional communication. Therefore we simply use the "middle of the road"
Packit Service c5cf8c
 * isend/irecv algorithm that works reasonably well in all cases.
Packit Service c5cf8c
 *
Packit Service c5cf8c
 * We post all irecvs and isends and then do a waitall. We scatter the order of
Packit Service c5cf8c
 * sources and destinations among the processes, so that all processes don't
Packit Service c5cf8c
 * try to send/recv to/from the same process at the same time.
Packit Service c5cf8c
 *
Packit Service c5cf8c
 * *** Modification: We post only a small number of isends and irecvs at a time
Packit Service c5cf8c
 * and wait on them as suggested by Tony Ladd. ***
Packit Service c5cf8c
 */
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIR_Alltoallw_intra_scattered
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIR_Alltoallw_intra_scattered(const void *sendbuf, const int sendcounts[], const int sdispls[],
Packit Service c5cf8c
                                   const MPI_Datatype sendtypes[], void *recvbuf,
Packit Service c5cf8c
                                   const int recvcounts[], const int rdispls[],
Packit Service c5cf8c
                                   const MPI_Datatype recvtypes[], MPIR_Comm * comm_ptr,
Packit Service c5cf8c
                                   MPIR_Errflag_t * errflag)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int comm_size, i;
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    int mpi_errno_ret = MPI_SUCCESS;
Packit Service c5cf8c
    MPI_Status *starray;
Packit Service c5cf8c
    MPIR_Request **reqarray;
Packit Service c5cf8c
    int dst, rank;
Packit Service c5cf8c
    int outstanding_requests;
Packit Service c5cf8c
    int ii, ss, bblock;
Packit Service c5cf8c
    int type_size;
Packit Service c5cf8c
    MPIR_CHKLMEM_DECL(2);
Packit Service c5cf8c
Packit Service c5cf8c
    comm_size = comm_ptr->local_size;
Packit Service c5cf8c
    rank = comm_ptr->rank;
Packit Service c5cf8c
Packit Service c5cf8c
#ifdef HAVE_ERROR_CHECKING
Packit Service c5cf8c
    /* When MPI_IN_PLACE, we use pair-wise sendrecv_replace in order to conserve memory usage,
Packit Service c5cf8c
     * which is keeping with the spirit of the MPI-2.2 Standard.  But
Packit Service c5cf8c
     * because of this approach all processes must agree on the global
Packit Service c5cf8c
     * schedule of sendrecv_replace operations to avoid deadlock.
Packit Service c5cf8c
     */
Packit Service c5cf8c
    MPIR_Assert(sendbuf != MPI_IN_PLACE);
Packit Service c5cf8c
#endif
Packit Service c5cf8c
Packit Service c5cf8c
    bblock = MPIR_CVAR_ALLTOALL_THROTTLE;
Packit Service c5cf8c
    if (bblock == 0)
Packit Service c5cf8c
        bblock = comm_size;
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_CHKLMEM_MALLOC(starray, MPI_Status *, 2 * bblock * sizeof(MPI_Status), mpi_errno,
Packit Service c5cf8c
                        "starray", MPL_MEM_BUFFER);
Packit Service c5cf8c
    MPIR_CHKLMEM_MALLOC(reqarray, MPIR_Request **, 2 * bblock * sizeof(MPIR_Request *), mpi_errno,
Packit Service c5cf8c
                        "reqarray", MPL_MEM_BUFFER);
Packit Service c5cf8c
Packit Service c5cf8c
    /* post only bblock isends/irecvs at a time as suggested by Tony Ladd */
Packit Service c5cf8c
    for (ii = 0; ii < comm_size; ii += bblock) {
Packit Service c5cf8c
        outstanding_requests = 0;
Packit Service c5cf8c
        ss = comm_size - ii < bblock ? comm_size - ii : bblock;
Packit Service c5cf8c
Packit Service c5cf8c
        /* do the communication -- post ss sends and receives: */
Packit Service c5cf8c
        for (i = 0; i < ss; i++) {
Packit Service c5cf8c
            dst = (rank + i + ii) % comm_size;
Packit Service c5cf8c
            if (recvcounts[dst]) {
Packit Service c5cf8c
                MPIR_Datatype_get_size_macro(recvtypes[dst], type_size);
Packit Service c5cf8c
                if (type_size) {
Packit Service c5cf8c
                    mpi_errno = MPIC_Irecv((char *) recvbuf + rdispls[dst],
Packit Service c5cf8c
                                           recvcounts[dst], recvtypes[dst], dst,
Packit Service c5cf8c
                                           MPIR_ALLTOALLW_TAG, comm_ptr,
Packit Service c5cf8c
                                           &reqarray[outstanding_requests]);
Packit Service c5cf8c
                    if (mpi_errno) {
Packit Service c5cf8c
                        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                    }
Packit Service c5cf8c
Packit Service c5cf8c
                    outstanding_requests++;
Packit Service c5cf8c
                }
Packit Service c5cf8c
            }
Packit Service c5cf8c
        }
Packit Service c5cf8c
Packit Service c5cf8c
        for (i = 0; i < ss; i++) {
Packit Service c5cf8c
            dst = (rank - i - ii + comm_size) % comm_size;
Packit Service c5cf8c
            if (sendcounts[dst]) {
Packit Service c5cf8c
                MPIR_Datatype_get_size_macro(sendtypes[dst], type_size);
Packit Service c5cf8c
                if (type_size) {
Packit Service c5cf8c
                    mpi_errno = MPIC_Isend((char *) sendbuf + sdispls[dst],
Packit Service c5cf8c
                                           sendcounts[dst], sendtypes[dst], dst,
Packit Service c5cf8c
                                           MPIR_ALLTOALLW_TAG, comm_ptr,
Packit Service c5cf8c
                                           &reqarray[outstanding_requests], errflag);
Packit Service c5cf8c
                    if (mpi_errno) {
Packit Service c5cf8c
                        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                    }
Packit Service c5cf8c
Packit Service c5cf8c
                    outstanding_requests++;
Packit Service c5cf8c
                }
Packit Service c5cf8c
            }
Packit Service c5cf8c
        }
Packit Service c5cf8c
Packit Service c5cf8c
        mpi_errno = MPIC_Waitall(outstanding_requests, reqarray, starray, errflag);
Packit Service c5cf8c
        if (mpi_errno && mpi_errno != MPI_ERR_IN_STATUS)
Packit Service c5cf8c
            MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
        /* --BEGIN ERROR HANDLING-- */
Packit Service c5cf8c
        if (mpi_errno == MPI_ERR_IN_STATUS) {
Packit Service c5cf8c
            for (i = 0; i < outstanding_requests; i++) {
Packit Service c5cf8c
                if (starray[i].MPI_ERROR != MPI_SUCCESS) {
Packit Service c5cf8c
                    mpi_errno = starray[i].MPI_ERROR;
Packit Service c5cf8c
                    if (mpi_errno) {
Packit Service c5cf8c
                        /* for communication errors, just record the error but continue */
Packit Service c5cf8c
                        *errflag =
Packit Service c5cf8c
                            MPIX_ERR_PROC_FAILED ==
Packit Service c5cf8c
                            MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
Packit Service c5cf8c
                        MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
Packit Service c5cf8c
                        MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
Packit Service c5cf8c
                    }
Packit Service c5cf8c
                }
Packit Service c5cf8c
            }
Packit Service c5cf8c
        }
Packit Service c5cf8c
        /* --END ERROR HANDLING-- */
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_CHKLMEM_FREEALL();
Packit Service c5cf8c
    if (mpi_errno_ret)
Packit Service c5cf8c
        mpi_errno = mpi_errno_ret;
Packit Service c5cf8c
    else if (*errflag != MPIR_ERR_NONE)
Packit Service c5cf8c
        MPIR_ERR_SET(mpi_errno, *errflag, "**coll_fail");
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}