Blame src/mpi/coll/scan/scan_intra_recursive_doubling.c

Packit Service c5cf8c
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit Service c5cf8c
/*
Packit Service c5cf8c
 *
Packit Service c5cf8c
 *  (C) 2001 by Argonne National Laboratory.
Packit Service c5cf8c
 *      See COPYRIGHT in top-level directory.
Packit Service c5cf8c
 */
Packit Service c5cf8c
Packit Service c5cf8c
#include "mpiimpl.h"
Packit Service c5cf8c
Packit Service c5cf8c
/*
 * Recursive-doubling
 *
 * We use a lg(p)-step recursive doubling algorithm, where p is the
 * communicator size. The basic algorithm is
 * given below. (You can replace "+" with any other scan operator.)
 * The result is stored in recvbuf.
 *
 * .vb
 *   recvbuf = sendbuf;
 *   partial_scan = sendbuf;
 *   mask = 0x1;
 *   while (mask < size) {
 *       dst = rank^mask;
 *       if (dst < size) {
 *           send partial_scan to dst;
 *           recv from dst into tmp_buf;
 *           if (rank > dst) {
 *               partial_scan = tmp_buf + partial_scan;
 *               recvbuf = tmp_buf + recvbuf;
 *           }
 *           else {
 *               if (op is commutative)
 *                   partial_scan = tmp_buf + partial_scan;
 *               else {
 *                   tmp_buf = partial_scan + tmp_buf;
 *                   partial_scan = tmp_buf;
 *               }
 *           }
 *       }
 *       mask <<= 1;
 *   }
 * .ve
 *
 */
Packit Service c5cf8c
Packit Service c5cf8c
#undef FUNCNAME
Packit Service c5cf8c
#define FUNCNAME MPIR_Scan_intra_recursive_doubling
Packit Service c5cf8c
#undef FCNAME
Packit Service c5cf8c
#define FCNAME MPL_QUOTE(FUNCNAME)
Packit Service c5cf8c
int MPIR_Scan_intra_recursive_doubling(const void *sendbuf,
Packit Service c5cf8c
                                       void *recvbuf,
Packit Service c5cf8c
                                       int count,
Packit Service c5cf8c
                                       MPI_Datatype datatype,
Packit Service c5cf8c
                                       MPI_Op op, MPIR_Comm * comm_ptr, MPIR_Errflag_t * errflag)
Packit Service c5cf8c
{
Packit Service c5cf8c
    MPI_Status status;
Packit Service c5cf8c
    int rank, comm_size;
Packit Service c5cf8c
    int mpi_errno = MPI_SUCCESS;
Packit Service c5cf8c
    int mpi_errno_ret = MPI_SUCCESS;
Packit Service c5cf8c
    int mask, dst, is_commutative;
Packit Service c5cf8c
    MPI_Aint true_extent, true_lb, extent;
Packit Service c5cf8c
    void *partial_scan, *tmp_buf;
Packit Service c5cf8c
    MPIR_CHKLMEM_DECL(2);
Packit Service c5cf8c
Packit Service c5cf8c
    if (count == 0)
Packit Service c5cf8c
        return MPI_SUCCESS;
Packit Service c5cf8c
Packit Service c5cf8c
    comm_size = comm_ptr->local_size;
Packit Service c5cf8c
    rank = comm_ptr->rank;
Packit Service c5cf8c
Packit Service c5cf8c
    /* set op_errno to 0. stored in perthread structure */
Packit Service c5cf8c
    {
Packit Service c5cf8c
        MPIR_Per_thread_t *per_thread = NULL;
Packit Service c5cf8c
        int err = 0;
Packit Service c5cf8c
Packit Service c5cf8c
        MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
Packit Service c5cf8c
                                     MPIR_Per_thread, per_thread, &err;;
Packit Service c5cf8c
        MPIR_Assert(err == 0);
Packit Service c5cf8c
        per_thread->op_errno = 0;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    is_commutative = MPIR_Op_is_commutative(op);
Packit Service c5cf8c
Packit Service c5cf8c
    /* need to allocate temporary buffer to store partial scan */
Packit Service c5cf8c
    MPIR_Type_get_true_extent_impl(datatype, &true_lb, &true_extent);
Packit Service c5cf8c
Packit Service c5cf8c
    MPIR_Datatype_get_extent_macro(datatype, extent);
Packit Service c5cf8c
    MPIR_CHKLMEM_MALLOC(partial_scan, void *, count * (MPL_MAX(extent, true_extent)), mpi_errno,
Packit Service c5cf8c
                        "partial_scan", MPL_MEM_BUFFER);
Packit Service c5cf8c
Packit Service c5cf8c
    /* This eventually gets malloc()ed as a temp buffer, not added to
Packit Service c5cf8c
     * any user buffers */
Packit Service c5cf8c
    MPIR_Ensure_Aint_fits_in_pointer(count * MPL_MAX(extent, true_extent));
Packit Service c5cf8c
Packit Service c5cf8c
    /* adjust for potential negative lower bound in datatype */
Packit Service c5cf8c
    partial_scan = (void *) ((char *) partial_scan - true_lb);
Packit Service c5cf8c
Packit Service c5cf8c
    /* need to allocate temporary buffer to store incoming data */
Packit Service c5cf8c
    MPIR_CHKLMEM_MALLOC(tmp_buf, void *, count * (MPL_MAX(extent, true_extent)), mpi_errno,
Packit Service c5cf8c
                        "tmp_buf", MPL_MEM_BUFFER);
Packit Service c5cf8c
Packit Service c5cf8c
    /* adjust for potential negative lower bound in datatype */
Packit Service c5cf8c
    tmp_buf = (void *) ((char *) tmp_buf - true_lb);
Packit Service c5cf8c
Packit Service c5cf8c
    /* Since this is an inclusive scan, copy local contribution into
Packit Service c5cf8c
     * recvbuf. */
Packit Service c5cf8c
    if (sendbuf != MPI_IN_PLACE) {
Packit Service c5cf8c
        mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype);
Packit Service c5cf8c
        if (mpi_errno)
Packit Service c5cf8c
            MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    if (sendbuf != MPI_IN_PLACE)
Packit Service c5cf8c
        mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, partial_scan, count, datatype);
Packit Service c5cf8c
    else
Packit Service c5cf8c
        mpi_errno = MPIR_Localcopy(recvbuf, count, datatype, partial_scan, count, datatype);
Packit Service c5cf8c
    if (mpi_errno)
Packit Service c5cf8c
        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
Packit Service c5cf8c
    mask = 0x1;
Packit Service c5cf8c
    while (mask < comm_size) {
Packit Service c5cf8c
        dst = rank ^ mask;
Packit Service c5cf8c
        if (dst < comm_size) {
Packit Service c5cf8c
            /* Send partial_scan to dst. Recv into tmp_buf */
Packit Service c5cf8c
            mpi_errno = MPIC_Sendrecv(partial_scan, count, datatype,
Packit Service c5cf8c
                                      dst, MPIR_SCAN_TAG, tmp_buf,
Packit Service c5cf8c
                                      count, datatype, dst,
Packit Service c5cf8c
                                      MPIR_SCAN_TAG, comm_ptr, &status, errflag);
Packit Service c5cf8c
            if (mpi_errno) {
Packit Service c5cf8c
                /* for communication errors, just record the error but continue */
Packit Service c5cf8c
                *errflag =
Packit Service c5cf8c
                    MPIX_ERR_PROC_FAILED ==
Packit Service c5cf8c
                    MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
Packit Service c5cf8c
                MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
Packit Service c5cf8c
                MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
Packit Service c5cf8c
            }
Packit Service c5cf8c
Packit Service c5cf8c
            if (rank > dst) {
Packit Service c5cf8c
                mpi_errno = MPIR_Reduce_local(tmp_buf, partial_scan, count, datatype, op);
Packit Service c5cf8c
                if (mpi_errno)
Packit Service c5cf8c
                    MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                mpi_errno = MPIR_Reduce_local(tmp_buf, recvbuf, count, datatype, op);
Packit Service c5cf8c
                if (mpi_errno)
Packit Service c5cf8c
                    MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
            } else {
Packit Service c5cf8c
                if (is_commutative) {
Packit Service c5cf8c
                    mpi_errno = MPIR_Reduce_local(tmp_buf, partial_scan, count, datatype, op);
Packit Service c5cf8c
                    if (mpi_errno)
Packit Service c5cf8c
                        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                } else {
Packit Service c5cf8c
                    mpi_errno = MPIR_Reduce_local(partial_scan, tmp_buf, count, datatype, op);
Packit Service c5cf8c
                    if (mpi_errno)
Packit Service c5cf8c
                        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                    mpi_errno = MPIR_Localcopy(tmp_buf, count, datatype,
Packit Service c5cf8c
                                               partial_scan, count, datatype);
Packit Service c5cf8c
                    if (mpi_errno)
Packit Service c5cf8c
                        MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
                }
Packit Service c5cf8c
            }
Packit Service c5cf8c
        }
Packit Service c5cf8c
        mask <<= 1;
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
    {
Packit Service c5cf8c
        MPIR_Per_thread_t *per_thread = NULL;
Packit Service c5cf8c
        int err = 0;
Packit Service c5cf8c
Packit Service c5cf8c
        MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
Packit Service c5cf8c
                                     MPIR_Per_thread, per_thread, &err;;
Packit Service c5cf8c
        MPIR_Assert(err == 0);
Packit Service c5cf8c
        if (per_thread->op_errno) {
Packit Service c5cf8c
            mpi_errno = per_thread->op_errno;
Packit Service c5cf8c
            if (mpi_errno)
Packit Service c5cf8c
                MPIR_ERR_POP(mpi_errno);
Packit Service c5cf8c
        }
Packit Service c5cf8c
    }
Packit Service c5cf8c
Packit Service c5cf8c
  fn_exit:
Packit Service c5cf8c
    MPIR_CHKLMEM_FREEALL();
Packit Service c5cf8c
Packit Service c5cf8c
    if (mpi_errno_ret)
Packit Service c5cf8c
        mpi_errno = mpi_errno_ret;
Packit Service c5cf8c
    else if (*errflag != MPIR_ERR_NONE)
Packit Service c5cf8c
        MPIR_ERR_SET(mpi_errno, *errflag, "**coll_fail");
Packit Service c5cf8c
    return mpi_errno;
Packit Service c5cf8c
  fn_fail:
Packit Service c5cf8c
    goto fn_exit;
Packit Service c5cf8c
}