|
Packit |
0848f5 |
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
|
|
Packit |
0848f5 |
/*
|
|
Packit |
0848f5 |
* (C) 2015 by Argonne National Laboratory.
|
|
Packit |
0848f5 |
* See COPYRIGHT in top-level directory.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* This test exercises the overlapping of MPI_Comm_idup with other nonblocking
   calls in a multithreaded setting. Each process produces a new communicator
   for each of its threads. A thread duplicates that communicator again to
   produce child communicators. When the first child communicator is ready,
   the thread duplicates it to produce grandchild communicators. Meanwhile,
   the thread also issues nonblocking calls, such as idup, iscan, ibcast,
   iallreduce and ibarrier, on the communicators available.

   The test covers both intracommunicators and intercommunicators.
*/
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
#include <stdio.h>
|
|
Packit |
0848f5 |
#include <mpi.h>
|
|
Packit |
0848f5 |
#include "mpitest.h"
|
|
Packit |
0848f5 |
#include "mpithreadtest.h"
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Tunable test dimensions. */
#define NUM_THREADS 3           /* threads to spawn per process; must be >= 1 */
#define NUM_IDUPS1 3            /* child communicators idup'ed per thread; must be >= 1 */
#define NUM_IDUPS2 3            /* grandchild communicators idup'ed per thread; must be >= 1 */
#define NUM_ITER 1              /* number of times each thread runs the kernel */
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* check(X_): abort the whole MPI job if the condition X_ evaluates to false.
 *
 * On failure the source location and the text of the condition are printed to
 * stdout, stdout is flushed so the message is not lost when the job is torn
 * down, and MPI_Abort is called on MPI_COMM_WORLD with error code 1.
 * Wrapped in do { } while (0) so it behaves as a single statement.
 */
#define check(X_)                                                       \
    do {                                                                \
        if (!(X_)) {                                                    \
            printf("[%s:%d] -- Assertion failed: %s\n",                 \
                   __FILE__, __LINE__, #X_);                            \
            fflush(stdout);                                             \
            MPI_Abort(MPI_COMM_WORLD, 1);                               \
        }                                                               \
    } while (0)
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
int isLeft; /* Is left group of an intercomm? */
|
|
Packit |
0848f5 |
MPI_Comm parentcomms[NUM_THREADS];
|
|
Packit |
0848f5 |
MPI_Comm nbrcomms[NUM_THREADS];
|
|
Packit |
0848f5 |
int errs[NUM_THREADS] = { 0 };
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
int verbose = 0;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Threads idup the communicator assigned to them NUM_IDUPS1 times. The operation
|
|
Packit |
0848f5 |
is overlapped with other non-blocking operations on the same communicator or
|
|
Packit |
0848f5 |
on a different communicator.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
MTEST_THREAD_RETURN_TYPE test_intracomm(void *arg)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
int i, j;
|
|
Packit |
0848f5 |
int root, bcastbuf;
|
|
Packit |
0848f5 |
int rank, size;
|
|
Packit |
0848f5 |
int ans[4], expected[4];
|
|
Packit |
0848f5 |
MPI_Request reqs[NUM_IDUPS1 + NUM_IDUPS2 + 10] = { MPI_REQUEST_NULL }; /* Preallocate enough reqs */
|
|
Packit |
0848f5 |
MPI_Comm comms[NUM_IDUPS1 + NUM_IDUPS2]; /* Hold all descendant comms */
|
|
Packit |
0848f5 |
int cnt;
|
|
Packit |
0848f5 |
int tid = *(int *) arg;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Comm parentcomm = parentcomms[tid];
|
|
Packit |
0848f5 |
MPI_Comm nbrcomm = nbrcomms[tid];
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Comm_rank(parentcomm, &rank;;
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_ITER; i++) {
|
|
Packit |
0848f5 |
cnt = 0;
|
|
Packit |
0848f5 |
if (*(int *) arg == rank)
|
|
Packit |
0848f5 |
MTestSleep(1);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
if (verbose)
|
|
Packit |
0848f5 |
printf("%d: Thread %d - comm_idup %d start\n", rank, tid, i);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Idup the parent intracomm NUM_IDUPS1 times */
|
|
Packit |
0848f5 |
for (j = 0; j < NUM_IDUPS1; j++)
|
|
Packit |
0848f5 |
MPI_Comm_idup(parentcomm, &comms[j], &reqs[cnt++]);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Issue an iscan on parent comm to overlap with the pending idups */
|
|
Packit |
0848f5 |
MPI_Iscan(&rank, &ans[0], 1, MPI_INT, MPI_SUM, parentcomm, &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[0] = rank * (rank + 1) / 2;
|
|
Packit |
0848f5 |
/* Wait for the first child comm to be ready */
|
|
Packit |
0848f5 |
MPI_Wait(&reqs[0], MPI_STATUS_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Do Idups & iallreduce on the first child comm simultaneously */
|
|
Packit |
0848f5 |
for (j = 0; j < NUM_IDUPS2; j++)
|
|
Packit |
0848f5 |
MPI_Comm_idup(comms[0], &comms[NUM_IDUPS1 + j], &reqs[cnt++]);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Comm_size(comms[0], &size);
|
|
Packit |
0848f5 |
MPI_Iallreduce(&rank, &ans[1], 1, MPI_INT, MPI_SUM, comms[0], &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[1] = (size - 1) * size / 2;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Issue an ibcast on the parent comm */
|
|
Packit |
0848f5 |
MPI_Comm_rank(parentcomm, &rank;;
|
|
Packit |
0848f5 |
ans[2] = (rank == 0) ? 199 : 111;
|
|
Packit |
0848f5 |
MPI_Ibcast(&ans[2], 1, MPI_INT, 0, parentcomm, &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[2] = 199;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Do ibarrier on the dup'ed comm */
|
|
Packit |
0848f5 |
MPI_Ibarrier(comms[0], &reqs[cnt++]);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Issue an iscan on a neighbor comm */
|
|
Packit |
0848f5 |
MPI_Comm_rank(nbrcomm, &rank;;
|
|
Packit |
0848f5 |
MPI_Iscan(&rank, &ans[3], 1, MPI_INT, MPI_SUM, nbrcomm, &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[3] = rank * (rank + 1) / 2;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Pending operations include idup/iscan/ibcast on parentcomm
|
|
Packit |
0848f5 |
* idup/Iallreduce/Ibarrier on comms[0], and Iscan on nbrcomm */
|
|
Packit |
0848f5 |
/* Waitall even if the first request is completed */
|
|
Packit |
0848f5 |
MPI_Waitall(cnt, reqs, MPI_STATUSES_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Check the answers */
|
|
Packit |
0848f5 |
for (j = 0; j < 4; j++) {
|
|
Packit |
0848f5 |
if (ans[j] != expected[j])
|
|
Packit |
0848f5 |
errs[tid]++;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
for (j = 0; j < NUM_IDUPS1 + NUM_IDUPS2; j++) {
|
|
Packit |
0848f5 |
errs[tid] += MTestTestComm(comms[j]);
|
|
Packit |
0848f5 |
MPI_Comm_free(&comms[j]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
if (verbose)
|
|
Packit |
0848f5 |
printf("\t%d: Thread %d - comm_idup %d finish\n", rank, tid, i);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
if (verbose)
|
|
Packit |
0848f5 |
printf("%d: Thread %d - Done.\n", rank, tid);
|
|
Packit |
0848f5 |
return (MTEST_THREAD_RETURN_TYPE) 0;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Threads idup the communicator assigned to them NUM_IDUPS1 times. The operation
|
|
Packit |
0848f5 |
is overlapped with other non-blocking operations on the same communicator or
|
|
Packit |
0848f5 |
on a different communicator.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
MTEST_THREAD_RETURN_TYPE test_intercomm(void *arg)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
int rank, rsize, root;
|
|
Packit |
0848f5 |
int i, j;
|
|
Packit |
0848f5 |
int tid = *(int *) arg;
|
|
Packit |
0848f5 |
int ans[4], expected[4];
|
|
Packit |
0848f5 |
MPI_Comm parentcomm = parentcomms[tid];
|
|
Packit |
0848f5 |
MPI_Comm nbrcomm = nbrcomms[tid];
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Request reqs[NUM_IDUPS1 + NUM_IDUPS2 + 10] = { MPI_REQUEST_NULL }; /* Preallocate enough reqs */
|
|
Packit |
0848f5 |
MPI_Comm comms[NUM_IDUPS1 + NUM_IDUPS2]; /* Hold all descendant comms */
|
|
Packit |
0848f5 |
int cnt;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_ITER; i++) {
|
|
Packit |
0848f5 |
cnt = 0;
|
|
Packit |
0848f5 |
if (*(int *) arg == rank)
|
|
Packit |
0848f5 |
MTestSleep(1);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
if (verbose)
|
|
Packit |
0848f5 |
printf("%d: Thread %d - comm_idup %d start\n", rank, tid, i);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Idup the parent intracomm multiple times */
|
|
Packit |
0848f5 |
for (j = 0; j < NUM_IDUPS1; j++)
|
|
Packit |
0848f5 |
MPI_Comm_idup(parentcomm, &comms[j], &reqs[cnt++]);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Issue an Iallreduce on parentcomm */
|
|
Packit |
0848f5 |
MPI_Comm_rank(parentcomm, &rank;;
|
|
Packit |
0848f5 |
MPI_Comm_remote_size(parentcomm, &rsize);
|
|
Packit |
0848f5 |
MPI_Iallreduce(&rank, &ans[0], 1, MPI_INT, MPI_SUM, parentcomm, &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[0] = (rsize - 1) * rsize / 2;
|
|
Packit |
0848f5 |
/* Wait for the first child comm to be ready */
|
|
Packit |
0848f5 |
MPI_Wait(&reqs[0], MPI_STATUS_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Do idup & iallreduce on the first child comm simultaneously */
|
|
Packit |
0848f5 |
for (j = 0; j < NUM_IDUPS2; j++)
|
|
Packit |
0848f5 |
MPI_Comm_idup(comms[0], &comms[NUM_IDUPS1 + j], &reqs[cnt++]);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Comm_rank(comms[0], &rank;;
|
|
Packit |
0848f5 |
MPI_Comm_remote_size(comms[0], &rsize);
|
|
Packit |
0848f5 |
MPI_Iallreduce(&rank, &ans[1], 1, MPI_INT, MPI_SUM, comms[0], &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[1] = (rsize - 1) * rsize / 2;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Issue an ibcast on parentcomm */
|
|
Packit |
0848f5 |
MPI_Comm_rank(parentcomm, &rank;;
|
|
Packit |
0848f5 |
if (isLeft) {
|
|
Packit |
0848f5 |
if (rank == 0) {
|
|
Packit |
0848f5 |
root = MPI_ROOT;
|
|
Packit |
0848f5 |
ans[2] = 199;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
else {
|
|
Packit |
0848f5 |
root = MPI_PROC_NULL;
|
|
Packit |
0848f5 |
ans[2] = 199; /* not needed, just to make correctness checking easier */
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
else {
|
|
Packit |
0848f5 |
root = 0;
|
|
Packit |
0848f5 |
ans[2] = 111; /* garbage value */
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MPI_Ibcast(&ans[2], 1, MPI_INT, root, parentcomm, &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[2] = 199;
|
|
Packit |
0848f5 |
MPI_Ibarrier(comms[0], &reqs[cnt++]);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Do an Iscan on a neighbor comm */
|
|
Packit |
0848f5 |
MPI_Comm_rank(nbrcomm, &rank;;
|
|
Packit |
0848f5 |
MPI_Comm_remote_size(nbrcomm, &rsize);
|
|
Packit |
0848f5 |
MPI_Iallreduce(&rank, &ans[3], 1, MPI_INT, MPI_SUM, nbrcomm, &reqs[cnt++]);
|
|
Packit |
0848f5 |
expected[3] = (rsize - 1) * rsize / 2;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Pending operations include idup/iallreduce/ibcast on parentcomm
|
|
Packit |
0848f5 |
* Iallreduce/Ibarrier on comms[0], and Iallreduce on nbrcomm */
|
|
Packit |
0848f5 |
/* Waitall even if the first request is completed */
|
|
Packit |
0848f5 |
MPI_Waitall(cnt, reqs, MPI_STATUSES_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* Check the answers */
|
|
Packit |
0848f5 |
for (j = 0; j < 4; j++) {
|
|
Packit |
0848f5 |
if (ans[j] != expected[j])
|
|
Packit |
0848f5 |
errs[tid]++;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
for (j = 0; j < NUM_IDUPS1 + NUM_IDUPS2; j++) {
|
|
Packit |
0848f5 |
errs[tid] += MTestTestComm(comms[j]);
|
|
Packit |
0848f5 |
MPI_Comm_free(&comms[j]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
if (verbose)
|
|
Packit |
0848f5 |
printf("\t%d: Thread %d - comm_idup %d finish\n", rank, tid, i);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
if (verbose)
|
|
Packit |
0848f5 |
printf("%d: Thread %d - Done.\n", rank, tid);
|
|
Packit |
0848f5 |
return (MTEST_THREAD_RETURN_TYPE) 0;
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
int main(int argc, char **argv)
|
|
Packit |
0848f5 |
{
|
|
Packit |
0848f5 |
int thread_args[NUM_THREADS];
|
|
Packit |
0848f5 |
MPI_Request requests[NUM_THREADS * 2];
|
|
Packit |
0848f5 |
int i, provided;
|
|
Packit |
0848f5 |
MPI_Comm newcomm;
|
|
Packit |
0848f5 |
int toterrs = 0;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
check(provided == MPI_THREAD_MULTIPLE);
|
|
Packit |
0848f5 |
check(NUM_IDUPS1 >= 1 && NUM_IDUPS2 >= 1);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* In each iteration, the process generates a new kind of intracommunicator, then
|
|
Packit |
0848f5 |
* uses idup to duplicate the communicator for NUM_THREADS threads.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
while (MTestGetIntracommGeneral(&newcomm, 1, 1)) {
|
|
Packit |
0848f5 |
if (newcomm == MPI_COMM_NULL)
|
|
Packit |
0848f5 |
continue;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_THREADS; i++) {
|
|
Packit |
0848f5 |
MPI_Comm_idup(newcomm, &parentcomms[i], &requests[2 * i]);
|
|
Packit |
0848f5 |
MPI_Comm_idup(newcomm, &nbrcomms[i], &requests[2 * i + 1]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MPI_Waitall(NUM_THREADS * 2, requests, MPI_STATUSES_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_THREADS; i++) {
|
|
Packit |
0848f5 |
thread_args[i] = i;
|
|
Packit |
0848f5 |
MTest_Start_thread(test_intracomm, (void *) &thread_args[i]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MTest_Join_threads();
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_THREADS; i++) {
|
|
Packit |
0848f5 |
toterrs += errs[i];
|
|
Packit |
0848f5 |
MPI_Comm_free(&parentcomms[i]);
|
|
Packit |
0848f5 |
MPI_Comm_free(&nbrcomms[i]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MTestFreeComm(&newcomm);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
/* In each iteration, the process generates a new kind of intercommunicator, then
|
|
Packit |
0848f5 |
* uses idup to duplicate the communicator for NUM_THREADS threads.
|
|
Packit |
0848f5 |
*/
|
|
Packit |
0848f5 |
while (MTestGetIntercomm(&newcomm, &isLeft, 1)) {
|
|
Packit |
0848f5 |
if (newcomm == MPI_COMM_NULL)
|
|
Packit |
0848f5 |
continue;
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_THREADS; i++) {
|
|
Packit |
0848f5 |
MPI_Comm_idup(newcomm, &parentcomms[i], &requests[2 * i]);
|
|
Packit |
0848f5 |
MPI_Comm_idup(newcomm, &nbrcomms[i], &requests[2 * i + 1]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MPI_Waitall(NUM_THREADS * 2, requests, MPI_STATUSES_IGNORE);
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_THREADS; i++) {
|
|
Packit |
0848f5 |
thread_args[i] = i;
|
|
Packit |
0848f5 |
MTest_Start_thread(test_intercomm, (void *) &thread_args[i]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MTest_Join_threads();
|
|
Packit |
0848f5 |
|
|
Packit |
0848f5 |
for (i = 0; i < NUM_THREADS; i++) {
|
|
Packit |
0848f5 |
toterrs += errs[i];
|
|
Packit |
0848f5 |
MPI_Comm_free(&parentcomms[i]);
|
|
Packit |
0848f5 |
MPI_Comm_free(&nbrcomms[i]);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MTestFreeComm(&newcomm);
|
|
Packit |
0848f5 |
}
|
|
Packit |
0848f5 |
MTest_Finalize(toterrs);
|
|
Packit |
0848f5 |
MPI_Finalize();
|
|
Packit |
0848f5 |
return 0;
|
|
Packit |
0848f5 |
}
|