|
Packit Service |
c5cf8c |
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
|
|
Packit Service |
c5cf8c |
/*
|
|
Packit Service |
c5cf8c |
*
|
|
Packit Service |
c5cf8c |
* (C) 2017 by Argonne National Laboratory.
|
|
Packit Service |
c5cf8c |
* See COPYRIGHT in top-level directory.
|
|
Packit Service |
c5cf8c |
*/
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* This implementation of MPI_Reduce_scatter_block was obtained by taking
|
|
Packit Service |
c5cf8c |
the implementation of MPI_Reduce_scatter from reduce_scatter.c and replacing
|
|
Packit Service |
c5cf8c |
recvcnts[i] with recvcount everywhere. */
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#include "mpiimpl.h"
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#undef FUNCNAME
|
|
Packit Service |
c5cf8c |
#define FUNCNAME MPIR_Reduce_scatter_block_intra_pairwise
|
|
Packit Service |
c5cf8c |
#undef FCNAME
|
|
Packit Service |
c5cf8c |
#define FCNAME MPL_QUOTE(FUNCNAME)
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Algorithm: Pairwise Exchange
|
|
Packit Service |
c5cf8c |
*
|
|
Packit Service |
c5cf8c |
* This is a pairwise exchange algorithm similar to
|
|
Packit Service |
c5cf8c |
* the one used in MPI_Alltoall. At step i, each process sends n/p amount of
|
|
Packit Service |
c5cf8c |
* data to (rank+i) and receives n/p amount of data from (rank-i).
|
|
Packit Service |
c5cf8c |
*
|
|
Packit Service |
c5cf8c |
* Cost = (p-1).alpha + n.((p-1)/p).beta + n.((p-1)/p).gamma
|
|
Packit Service |
c5cf8c |
*/
|
|
Packit Service |
c5cf8c |
int MPIR_Reduce_scatter_block_intra_pairwise(const void *sendbuf,
|
|
Packit Service |
c5cf8c |
void *recvbuf,
|
|
Packit Service |
c5cf8c |
int recvcount,
|
|
Packit Service |
c5cf8c |
MPI_Datatype datatype,
|
|
Packit Service |
c5cf8c |
MPI_Op op,
|
|
Packit Service |
c5cf8c |
MPIR_Comm * comm_ptr, MPIR_Errflag_t * errflag)
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
int rank, comm_size, i;
|
|
Packit Service |
c5cf8c |
MPI_Aint extent, true_extent, true_lb;
|
|
Packit Service |
c5cf8c |
int *disps;
|
|
Packit Service |
c5cf8c |
void *tmp_recvbuf;
|
|
Packit Service |
c5cf8c |
int mpi_errno = MPI_SUCCESS;
|
|
Packit Service |
c5cf8c |
int mpi_errno_ret = MPI_SUCCESS;
|
|
Packit Service |
c5cf8c |
int src, dst;
|
|
Packit Service |
c5cf8c |
MPIR_CHKLMEM_DECL(5);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
comm_size = comm_ptr->local_size;
|
|
Packit Service |
c5cf8c |
rank = comm_ptr->rank;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* set op_errno to 0. stored in perthread structure */
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
MPIR_Per_thread_t *per_thread = NULL;
|
|
Packit Service |
c5cf8c |
int err = 0;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
|
|
Packit Service |
c5cf8c |
MPIR_Per_thread, per_thread, &err;;
|
|
Packit Service |
c5cf8c |
MPIR_Assert(err == 0);
|
|
Packit Service |
c5cf8c |
per_thread->op_errno = 0;
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (recvcount == 0) {
|
|
Packit Service |
c5cf8c |
goto fn_exit;
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPIR_Datatype_get_extent_macro(datatype, extent);
|
|
Packit Service |
c5cf8c |
MPIR_Type_get_true_extent_impl(datatype, &true_lb, &true_extent);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#ifdef HAVE_ERROR_CHECKING
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
int is_commutative;
|
|
Packit Service |
c5cf8c |
is_commutative = MPIR_Op_is_commutative(op);
|
|
Packit Service |
c5cf8c |
MPIR_Assert(is_commutative);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
#endif /* HAVE_ERROR_CHECKING */
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPIR_CHKLMEM_MALLOC(disps, int *, comm_size * sizeof(int), mpi_errno, "disps", MPL_MEM_BUFFER);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
for (i = 0; i < comm_size; i++) {
|
|
Packit Service |
c5cf8c |
disps[i] = i * recvcount;
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* commutative and long message, or noncommutative and long message.
|
|
Packit Service |
c5cf8c |
* use (p-1) pairwise exchanges */
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (sendbuf != MPI_IN_PLACE) {
|
|
Packit Service |
c5cf8c |
/* copy local data into recvbuf */
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Localcopy(((char *) sendbuf + disps[rank] * extent),
|
|
Packit Service |
c5cf8c |
recvcount, datatype, recvbuf, recvcount, datatype);
|
|
Packit Service |
c5cf8c |
if (mpi_errno)
|
|
Packit Service |
c5cf8c |
MPIR_ERR_POP(mpi_errno);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* allocate temporary buffer to store incoming data */
|
|
Packit Service |
c5cf8c |
MPIR_CHKLMEM_MALLOC(tmp_recvbuf, void *, recvcount * (MPL_MAX(true_extent, extent)) + 1,
|
|
Packit Service |
c5cf8c |
mpi_errno, "tmp_recvbuf", MPL_MEM_BUFFER);
|
|
Packit Service |
c5cf8c |
/* adjust for potential negative lower bound in datatype */
|
|
Packit Service |
c5cf8c |
tmp_recvbuf = (void *) ((char *) tmp_recvbuf - true_lb);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
for (i = 1; i < comm_size; i++) {
|
|
Packit Service |
c5cf8c |
src = (rank - i + comm_size) % comm_size;
|
|
Packit Service |
c5cf8c |
dst = (rank + i) % comm_size;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* send the data that dst needs. recv data that this process
|
|
Packit Service |
c5cf8c |
* needs from src into tmp_recvbuf */
|
|
Packit Service |
c5cf8c |
if (sendbuf != MPI_IN_PLACE)
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIC_Sendrecv(((char *) sendbuf + disps[dst] * extent),
|
|
Packit Service |
c5cf8c |
recvcount, datatype, dst,
|
|
Packit Service |
c5cf8c |
MPIR_REDUCE_SCATTER_BLOCK_TAG, tmp_recvbuf,
|
|
Packit Service |
c5cf8c |
recvcount, datatype, src,
|
|
Packit Service |
c5cf8c |
MPIR_REDUCE_SCATTER_BLOCK_TAG, comm_ptr,
|
|
Packit Service |
c5cf8c |
MPI_STATUS_IGNORE, errflag);
|
|
Packit Service |
c5cf8c |
else
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIC_Sendrecv(((char *) recvbuf + disps[dst] * extent),
|
|
Packit Service |
c5cf8c |
recvcount, datatype, dst,
|
|
Packit Service |
c5cf8c |
MPIR_REDUCE_SCATTER_BLOCK_TAG, tmp_recvbuf,
|
|
Packit Service |
c5cf8c |
recvcount, datatype, src,
|
|
Packit Service |
c5cf8c |
MPIR_REDUCE_SCATTER_BLOCK_TAG, comm_ptr,
|
|
Packit Service |
c5cf8c |
MPI_STATUS_IGNORE, errflag);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (mpi_errno) {
|
|
Packit Service |
c5cf8c |
/* for communication errors, just record the error but continue */
|
|
Packit Service |
c5cf8c |
*errflag =
|
|
Packit Service |
c5cf8c |
MPIX_ERR_PROC_FAILED ==
|
|
Packit Service |
c5cf8c |
MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER;
|
|
Packit Service |
c5cf8c |
MPIR_ERR_SET(mpi_errno, *errflag, "**fail");
|
|
Packit Service |
c5cf8c |
MPIR_ERR_ADD(mpi_errno_ret, mpi_errno);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (sendbuf != MPI_IN_PLACE) {
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Reduce_local(tmp_recvbuf, recvbuf, recvcount, datatype, op);
|
|
Packit Service |
c5cf8c |
} else {
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Reduce_local(tmp_recvbuf, ((char *) recvbuf + disps[rank] * extent),
|
|
Packit Service |
c5cf8c |
recvcount, datatype, op);
|
|
Packit Service |
c5cf8c |
/* we can't store the result at the beginning of
|
|
Packit Service |
c5cf8c |
* recvbuf right here because there is useful data
|
|
Packit Service |
c5cf8c |
* there that other process/processes need. at the
|
|
Packit Service |
c5cf8c |
* end, we will copy back the result to the
|
|
Packit Service |
c5cf8c |
* beginning of recvbuf. */
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* if MPI_IN_PLACE, move output data to the beginning of
|
|
Packit Service |
c5cf8c |
* recvbuf. already done for rank 0. */
|
|
Packit Service |
c5cf8c |
if ((sendbuf == MPI_IN_PLACE) && (rank != 0)) {
|
|
Packit Service |
c5cf8c |
mpi_errno = MPIR_Localcopy(((char *) recvbuf +
|
|
Packit Service |
c5cf8c |
disps[rank] * extent),
|
|
Packit Service |
c5cf8c |
recvcount, datatype, recvbuf, recvcount, datatype);
|
|
Packit Service |
c5cf8c |
if (mpi_errno)
|
|
Packit Service |
c5cf8c |
MPIR_ERR_POP(mpi_errno);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
fn_exit:
|
|
Packit Service |
c5cf8c |
MPIR_CHKLMEM_FREEALL();
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
MPIR_Per_thread_t *per_thread = NULL;
|
|
Packit Service |
c5cf8c |
int err = 0;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPID_THREADPRIV_KEY_GET_ADDR(MPIR_ThreadInfo.isThreaded, MPIR_Per_thread_key,
|
|
Packit Service |
c5cf8c |
MPIR_Per_thread, per_thread, &err;;
|
|
Packit Service |
c5cf8c |
MPIR_Assert(err == 0);
|
|
Packit Service |
c5cf8c |
if (per_thread->op_errno)
|
|
Packit Service |
c5cf8c |
mpi_errno = per_thread->op_errno;
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* --BEGIN ERROR HANDLING-- */
|
|
Packit Service |
c5cf8c |
if (mpi_errno_ret)
|
|
Packit Service |
c5cf8c |
mpi_errno = mpi_errno_ret;
|
|
Packit Service |
c5cf8c |
else if (*errflag != MPIR_ERR_NONE)
|
|
Packit Service |
c5cf8c |
MPIR_ERR_SET(mpi_errno, *errflag, "**coll_fail");
|
|
Packit Service |
c5cf8c |
/* --END ERROR HANDLING-- */
|
|
Packit Service |
c5cf8c |
return mpi_errno;
|
|
Packit Service |
c5cf8c |
fn_fail:
|
|
Packit Service |
c5cf8c |
goto fn_exit;
|
|
Packit Service |
c5cf8c |
}
|