Blame src/mpi/coll/reduce_scatter/reduce_scatter_intra_pairwise.c

Packit Service c5cf8c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit Service c5cf8c
/*
Packit Service c5cf8c
 *
Packit Service c5cf8c
 *  (C) 2001 by Argonne National Laboratory.
Packit Service c5cf8c
 *      See COPYRIGHT in top-level directory.
Packit Service c5cf8c
 */
Packit Service c5cf8c
Packit Service c5cf8c
#include "mpiimpl.h"
Packit Service c5cf8c
Packit Service c5cf8c
/* Per-function name macros: FUNCNAME is the identifier of the function defined
 * below, and FCNAME wraps it in MPL_QUOTE (presumably producing the name as a
 * string literal -- confirm against the MPL headers).
 * NOTE(review): neither macro is referenced directly in the visible code; they
 * are presumably consumed by the error-reporting macros -- confirm. */
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIR_Reduce_scatter_intra_pairwise
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
Packit Service c5cf8c
/* Algorithm: Pairwise Exchange
Packit Service c5cf8c
 *
Packit Service c5cf8c
 * For commutative operations and very long messages we use a pairwise
Packit Service c5cf8c
 * exchange algorithm similar to the one used in MPI_Alltoall. At step i, each
Packit Service c5cf8c
 * process sends n/p amount of data to (rank+i) and receives n/p amount of data
Packit Service c5cf8c
 * from (rank-i).
Packit Service c5cf8c
 */
Packit Service c5cf8c
int MPIR_Reduce_scatter_intra_pairwise(const void *sendbuf, void *recvbuf, const int recvcounts[],
Packit Service c5cf8c
                                       MPI_Datatype datatype, MPI_Op op, MPIR_Comm * comm_ptr,
Packit Service c5cf8c
                                       MPIR_Errflag_t * errflag)
Packit Service c5cf8c
{
Packit Service c5cf8c
    int rank, comm_size, i;
Packit Service c5cf8c
    MPI_Aint extent, true_extent, true_lb;
Packit Service c5cf8c
    int *disps;
Packit Service c5cf8c
    void *tmp_recvbuf;
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    int mpi_errno_ret = MPI_SUCCESS;
Packit Service c5cf8c
    int total_count, src, dst;
Packit Service c5cf8c
    MPIR_CHKLMEM_DECL(5);
Packit Service c5cf8c
Packit Service c5cf8c
    comm_size = comm_ptr->local_size;
Packit Service c5cf8c
    rank = comm_ptr->rank;
Packit Service c5cf8c
Packit Service c5cf8c
    /* set op_errno to 0. stored in perthread structure */
Packit Service c5cf8c
    {
Packit Service c5cf8c
        MPIR_Per_thread_t *per_thread = NULL;
Packit Service c5cf8c
        int err = 0;
Packit Service c5cf8c
Packit Service c5cf8c
        MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
Packit Service c5cf8c
                                     MPIR_Per_thread, per_thread, &err;;
Packit Service c5cf8c
        MPIR_Assert(err == 0);
Packit Service c5cf8c
        per_thread->op_errno = 0;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Datatype_get_extent_macro(datatype, extent);
Packit Service c5cf8c
    MPIR_Type_get_true_extent_impl(datatype, &true_lb, &true_extent);
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
#ifdef HAVE_ERROR_CHECKING
Packit Service c5cf8c
    {
Packit Service c5cf8c
        int is_commutative;
Packit Service c5cf8c
        is_commutative = MPIR_Op_is_commutative(op);
Packit Service c5cf8c
        MPIR_Assert(is_commutative);
Packit Service c5cf8c
    }
Packit Service c5cf8c
#endif /* HAVE_ERROR_CHECKING */
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_CHKLMEM_MALLOC(disps, int *, comm_size * sizeof(int), mpi_errno, "disps", MPL_MEM_BUFFER);
Packit Service c5cf8c
Packit Service c5cf8c
    total_count = 0;
Packit Service c5cf8c
    for (i = 0; i < comm_size; i++) {
Packit Service c5cf8c
        disps[i] = total_count;
Packit Service c5cf8c
        total_count += recvcounts[i];
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    if (total_count == 0) {
Packit Service c5cf8c
        goto fn_exit;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    /* total_count*extent eventually gets malloced. it isn't added to
Packit Service c5cf8c
     * a user-passed in buffer */
Packit Service c5cf8c
    MPIR_Ensure_Aint_fits_in_pointer(total_count * MPL_MAX(true_extent, extent));
Packit Service c5cf8c
Packit Service c5cf8c
Packit Service c5cf8c
    /* commutative and long message, or noncommutative and long message.
Packit Service c5cf8c
     * use (p-1) pairwise exchanges */
Packit Service c5cf8c
Packit Service c5cf8c
    if (sendbuf != MPI_IN_PLACE) {
Packit Service c5cf8c
        /* copy local data into recvbuf */
Packit Service c5cf8c
        mpi_errno = MPIR_Localcopy(((char *) sendbuf + disps[rank] * extent),
Packit Service c5cf8c
                                   recvcounts[rank], datatype, recvbuf, recvcounts[rank], datatype);
Packit Service c5cf8c
        if (mpi_errno)
Packit Service c5cf8c
            MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    /* allocate temporary buffer to store incoming data */
Packit Service c5cf8c
    MPIR_CHKLMEM_MALLOC(tmp_recvbuf, void *, recvcounts[rank] * (MPL_MAX(true_extent, extent)) + 1,
Packit Service c5cf8c
                        mpi_errno, "tmp_recvbuf", MPL_MEM_BUFFER);
Packit Service c5cf8c
    /* adjust for potential negative lower bound in datatype */
Packit Service c5cf8c
    tmp_recvbuf = (void *) ((char *) tmp_recvbuf - true_lb);
Packit Service c5cf8c
Packit Service c5cf8c
    for (i = 1; i < comm_size; i++) {
Packit Service c5cf8c
        src = (rank - i + comm_size) % comm_size;
Packit Service c5cf8c
        dst = (rank + i) % comm_size;
Packit Service c5cf8c
Packit Service c5cf8c
        /* send the data that dst needs. recv data that this process
Packit Service c5cf8c
         * needs from src into tmp_recvbuf */
Packit Service c5cf8c
        if (sendbuf != MPI_IN_PLACE)
Packit Service c5cf8c
            mpi_errno = MPIC_Sendrecv(((char *) sendbuf + disps[dst] * extent),
Packit Service c5cf8c
                                      recvcounts[dst], datatype, dst,
Packit Service c5cf8c
                                      MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
Packit Service c5cf8c
                                      recvcounts[rank], datatype, src,
Packit Service c5cf8c
                                      MPIR_REDUCE_SCATTER_TAG, comm_ptr,
Packit Service c5cf8c
                                      MPI_STATUS_IGNORE, errflag);
Packit Service c5cf8c
        else
Packit Service c5cf8c
            mpi_errno = MPIC_Sendrecv(((char *) recvbuf + disps[dst] * extent),
Packit Service c5cf8c
                                      recvcounts[dst], datatype, dst,
Packit Service c5cf8c
                                      MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
Packit Service c5cf8c
                                      recvcounts[rank], datatype, src,
Packit Service c5cf8c
                                      MPIR_REDUCE_SCATTER_TAG, comm_ptr,
Packit Service c5cf8c
                                      MPI_STATUS_IGNORE, errflag);
Packit Service c5cf8c
Packit Service c5cf8c
        if (mpi_errno) {
Packit Service c5cf8c
            /* for communication errors, just record the error but continue */
Packit Service c5cf8c
            *errflag =
Packit Service c5cf8c
                MPIX_ERR_PROC_FAILED ==
Packit Service c5cf8c
                MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
Packit Service c5cf8c
            MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
Packit Service c5cf8c
            MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
Packit Service c5cf8c
        }
Packit Service c5cf8c
Packit Service c5cf8c
        if (sendbuf != MPI_IN_PLACE) {
Packit Service c5cf8c
            mpi_errno = MPIR_Reduce_local(tmp_recvbuf, recvbuf, recvcounts[rank], datatype, op);
Packit Service c5cf8c
        } else {
Packit Service c5cf8c
            mpi_errno = MPIR_Reduce_local(tmp_recvbuf, ((char *) recvbuf + disps[rank] * extent),
Packit Service c5cf8c
                                          recvcounts[rank], datatype, op);
Packit Service c5cf8c
            /* we can't store the result at the beginning of
Packit Service c5cf8c
             * recvbuf right here because there is useful data
Packit Service c5cf8c
             * there that other process/processes need. at the
Packit Service c5cf8c
             * end, we will copy back the result to the
Packit Service c5cf8c
             * beginning of recvbuf. */
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    /* if MPI_IN_PLACE, move output data to the beginning of
Packit Service c5cf8c
     * recvbuf. already done for rank 0. */
Packit Service c5cf8c
    if ((sendbuf == MPI_IN_PLACE) && (rank != 0)) {
Packit Service c5cf8c
        mpi_errno = MPIR_Localcopy(((char *) recvbuf +
Packit Service c5cf8c
                                    disps[rank] * extent),
Packit Service c5cf8c
                                   recvcounts[rank], datatype, recvbuf, recvcounts[rank], datatype);
Packit Service c5cf8c
        if (mpi_errno)
Packit Service c5cf8c
            MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_CHKLMEM_FREEALL();
Packit Service c5cf8c
Packit Service c5cf8c
    {
Packit Service c5cf8c
        MPIR_Per_thread_t *per_thread = NULL;
Packit Service c5cf8c
        int err = 0;
Packit Service c5cf8c
Packit Service c5cf8c
        MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
Packit Service c5cf8c
                                     MPIR_Per_thread, per_thread, &err;;
Packit Service c5cf8c
        MPIR_Assert(err == 0);
Packit Service c5cf8c
        if (per_thread->op_errno)
Packit Service c5cf8c
            mpi_errno = per_thread->op_errno;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    if (mpi_errno_ret)
Packit Service c5cf8c
        mpi_errno = mpi_errno_ret;
Packit Service c5cf8c
    else if (*errflag != MPIR_ERR_NONE)
Packit Service c5cf8c
        MPIR_ERR_SET(mpi_errno, *errflag, "**coll_fail");
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}