Blame test/mpi/threads/comm/idup_nb.c

Packit 0848f5
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2015 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
Packit 0848f5
Packit 0848f5
/* This test exercises the overlapping of MPI_Comm_idup with other nonblocking
  calls in a multithreaded setting. Each process produces a new communicator for
  each of its threads.  A thread will duplicate it again to produce child
  communicators. When the first child communicator is ready, the thread
  duplicates it to produce grandchild communicators. Meanwhile, the thread also
  issues nonblocking calls, such as idup, iscan, ibcast, iallreduce and ibarrier,
  on the communicators available.

  The test covers both intracommunicators and intercommunicators.
*/
Packit 0848f5
Packit 0848f5
#include <stdio.h>
#include <mpi.h>
#include "mpitest.h"
#include "mpithreadtest.h"
Packit 0848f5
Packit 0848f5
/* Test dimensions. The request arrays in the thread bodies are sized from
 * NUM_IDUPS1 and NUM_IDUPS2, and main() asserts that both are at least 1. */
#define NUM_THREADS  3  /* threads to spawn per process, must be >= 1 */
#define NUM_IDUPS1   3  /* child communicators to idup per thread, must be >= 1 */
#define NUM_IDUPS2   3  /* grandchild communicators to idup per thread, must be >= 1 */
#define NUM_ITER     1  /* number of times each thread runs the test kernel */
Packit 0848f5
Packit 0848f5
/* Abort the whole MPI job if the condition X_ is false.
 *
 * On failure the source location and the text of the failed expression are
 * printed to stdout, stdout is flushed so the message is not lost when the
 * job is torn down, and MPI_Abort is called on MPI_COMM_WORLD with error
 * code 1. Wrapped in do/while(0) so it behaves as a single statement.
 */
#define check(X_)                                                             \
    do {                                                                      \
        if (!(X_)) {                                                          \
            printf("[%s:%d] -- Assertion failed: %s\n", __FILE__, __LINE__, #X_);\
            fflush(stdout);                                                   \
            MPI_Abort(MPI_COMM_WORLD, 1);                                     \
        }                                                                     \
    } while (0)
Packit 0848f5
Packit 0848f5
int isLeft;                     /* Is left group of an intercomm? */
Packit 0848f5
MPI_Comm parentcomms[NUM_THREADS];
Packit 0848f5
MPI_Comm nbrcomms[NUM_THREADS];
Packit 0848f5
int errs[NUM_THREADS] = { 0 };
Packit 0848f5
Packit 0848f5
int verbose = 0;
Packit 0848f5
Packit 0848f5
/* Threads idup the communicator assigned to them NUM_IDUPS1 times. The operation
Packit 0848f5
   is overlapped with other non-blocking operations on the same communicator or
Packit 0848f5
   on a different communicator.
Packit 0848f5
*/
Packit 0848f5
MTEST_THREAD_RETURN_TYPE test_intracomm(void *arg)
Packit 0848f5
{
Packit 0848f5
    int i, j;
Packit 0848f5
    int root, bcastbuf;
Packit 0848f5
    int rank, size;
Packit 0848f5
    int ans[4], expected[4];
Packit 0848f5
    MPI_Request reqs[NUM_IDUPS1 + NUM_IDUPS2 + 10] = { MPI_REQUEST_NULL };      /* Preallocate enough reqs */
Packit 0848f5
    MPI_Comm comms[NUM_IDUPS1 + NUM_IDUPS2];    /* Hold all descendant comms */
Packit 0848f5
    int cnt;
Packit 0848f5
    int tid = *(int *) arg;
Packit 0848f5
Packit 0848f5
    MPI_Comm parentcomm = parentcomms[tid];
Packit 0848f5
    MPI_Comm nbrcomm = nbrcomms[tid];
Packit 0848f5
Packit 0848f5
    MPI_Comm_rank(parentcomm, &rank;;
Packit 0848f5
    for (i = 0; i < NUM_ITER; i++) {
Packit 0848f5
        cnt = 0;
Packit 0848f5
        if (*(int *) arg == rank)
Packit 0848f5
            MTestSleep(1);
Packit 0848f5
Packit 0848f5
        if (verbose)
Packit 0848f5
            printf("%d: Thread %d - comm_idup %d start\n", rank, tid, i);
Packit 0848f5
Packit 0848f5
        /* Idup the parent intracomm NUM_IDUPS1 times */
Packit 0848f5
        for (j = 0; j < NUM_IDUPS1; j++)
Packit 0848f5
            MPI_Comm_idup(parentcomm, &comms[j], &reqs[cnt++]);
Packit 0848f5
Packit 0848f5
        /* Issue an iscan on parent comm to overlap with the pending idups */
Packit 0848f5
        MPI_Iscan(&rank, &ans[0], 1, MPI_INT, MPI_SUM, parentcomm, &reqs[cnt++]);
Packit 0848f5
        expected[0] = rank * (rank + 1) / 2;
Packit 0848f5
        /* Wait for the first child comm to be ready */
Packit 0848f5
        MPI_Wait(&reqs[0], MPI_STATUS_IGNORE);
Packit 0848f5
Packit 0848f5
        /* Do Idups & iallreduce on the first child comm simultaneously */
Packit 0848f5
        for (j = 0; j < NUM_IDUPS2; j++)
Packit 0848f5
            MPI_Comm_idup(comms[0], &comms[NUM_IDUPS1 + j], &reqs[cnt++]);
Packit 0848f5
Packit 0848f5
        MPI_Comm_size(comms[0], &size);
Packit 0848f5
        MPI_Iallreduce(&rank, &ans[1], 1, MPI_INT, MPI_SUM, comms[0], &reqs[cnt++]);
Packit 0848f5
        expected[1] = (size - 1) * size / 2;
Packit 0848f5
Packit 0848f5
        /* Issue an ibcast on the parent comm */
Packit 0848f5
        MPI_Comm_rank(parentcomm, &rank;;
Packit 0848f5
        ans[2] = (rank == 0) ? 199 : 111;
Packit 0848f5
        MPI_Ibcast(&ans[2], 1, MPI_INT, 0, parentcomm, &reqs[cnt++]);
Packit 0848f5
        expected[2] = 199;
Packit 0848f5
Packit 0848f5
        /* Do ibarrier on the dup'ed comm */
Packit 0848f5
        MPI_Ibarrier(comms[0], &reqs[cnt++]);
Packit 0848f5
Packit 0848f5
        /* Issue an iscan on a neighbor comm */
Packit 0848f5
        MPI_Comm_rank(nbrcomm, &rank;;
Packit 0848f5
        MPI_Iscan(&rank, &ans[3], 1, MPI_INT, MPI_SUM, nbrcomm, &reqs[cnt++]);
Packit 0848f5
        expected[3] = rank * (rank + 1) / 2;
Packit 0848f5
Packit 0848f5
        /* Pending operations include idup/iscan/ibcast on parentcomm
Packit 0848f5
         * idup/Iallreduce/Ibarrier on comms[0], and Iscan on nbrcomm */
Packit 0848f5
        /* Waitall even if the first request is completed */
Packit 0848f5
        MPI_Waitall(cnt, reqs, MPI_STATUSES_IGNORE);
Packit 0848f5
Packit 0848f5
        /* Check the answers */
Packit 0848f5
        for (j = 0; j < 4; j++) {
Packit 0848f5
            if (ans[j] != expected[j])
Packit 0848f5
                errs[tid]++;
Packit 0848f5
        }
Packit 0848f5
        for (j = 0; j < NUM_IDUPS1 + NUM_IDUPS2; j++) {
Packit 0848f5
            errs[tid] += MTestTestComm(comms[j]);
Packit 0848f5
            MPI_Comm_free(&comms[j]);
Packit 0848f5
        }
Packit 0848f5
        if (verbose)
Packit 0848f5
            printf("\t%d: Thread %d - comm_idup %d finish\n", rank, tid, i);
Packit 0848f5
    }
Packit 0848f5
    if (verbose)
Packit 0848f5
        printf("%d: Thread %d - Done.\n", rank, tid);
Packit 0848f5
    return (MTEST_THREAD_RETURN_TYPE) 0;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
/* Threads idup the communicator assigned to them NUM_IDUPS1 times. The operation
Packit 0848f5
   is overlapped with other non-blocking operations on the same communicator or
Packit 0848f5
   on a different communicator.
Packit 0848f5
*/
Packit 0848f5
MTEST_THREAD_RETURN_TYPE test_intercomm(void *arg)
Packit 0848f5
{
Packit 0848f5
    int rank, rsize, root;
Packit 0848f5
    int i, j;
Packit 0848f5
    int tid = *(int *) arg;
Packit 0848f5
    int ans[4], expected[4];
Packit 0848f5
    MPI_Comm parentcomm = parentcomms[tid];
Packit 0848f5
    MPI_Comm nbrcomm = nbrcomms[tid];
Packit 0848f5
Packit 0848f5
    MPI_Request reqs[NUM_IDUPS1 + NUM_IDUPS2 + 10] = { MPI_REQUEST_NULL };      /* Preallocate enough reqs */
Packit 0848f5
    MPI_Comm comms[NUM_IDUPS1 + NUM_IDUPS2];    /* Hold all descendant comms */
Packit 0848f5
    int cnt;
Packit 0848f5
Packit 0848f5
    for (i = 0; i < NUM_ITER; i++) {
Packit 0848f5
        cnt = 0;
Packit 0848f5
        if (*(int *) arg == rank)
Packit 0848f5
            MTestSleep(1);
Packit 0848f5
Packit 0848f5
        if (verbose)
Packit 0848f5
            printf("%d: Thread %d - comm_idup %d start\n", rank, tid, i);
Packit 0848f5
Packit 0848f5
        /* Idup the parent intracomm multiple times */
Packit 0848f5
        for (j = 0; j < NUM_IDUPS1; j++)
Packit 0848f5
            MPI_Comm_idup(parentcomm, &comms[j], &reqs[cnt++]);
Packit 0848f5
Packit 0848f5
        /* Issue an Iallreduce on parentcomm */
Packit 0848f5
        MPI_Comm_rank(parentcomm, &rank;;
Packit 0848f5
        MPI_Comm_remote_size(parentcomm, &rsize);
Packit 0848f5
        MPI_Iallreduce(&rank, &ans[0], 1, MPI_INT, MPI_SUM, parentcomm, &reqs[cnt++]);
Packit 0848f5
        expected[0] = (rsize - 1) * rsize / 2;
Packit 0848f5
        /* Wait for the first child comm to be ready */
Packit 0848f5
        MPI_Wait(&reqs[0], MPI_STATUS_IGNORE);
Packit 0848f5
Packit 0848f5
        /* Do idup & iallreduce on the first child comm simultaneously */
Packit 0848f5
        for (j = 0; j < NUM_IDUPS2; j++)
Packit 0848f5
            MPI_Comm_idup(comms[0], &comms[NUM_IDUPS1 + j], &reqs[cnt++]);
Packit 0848f5
Packit 0848f5
        MPI_Comm_rank(comms[0], &rank;;
Packit 0848f5
        MPI_Comm_remote_size(comms[0], &rsize);
Packit 0848f5
        MPI_Iallreduce(&rank, &ans[1], 1, MPI_INT, MPI_SUM, comms[0], &reqs[cnt++]);
Packit 0848f5
        expected[1] = (rsize - 1) * rsize / 2;
Packit 0848f5
Packit 0848f5
        /* Issue an ibcast on parentcomm */
Packit 0848f5
        MPI_Comm_rank(parentcomm, &rank;;
Packit 0848f5
        if (isLeft) {
Packit 0848f5
            if (rank == 0) {
Packit 0848f5
                root = MPI_ROOT;
Packit 0848f5
                ans[2] = 199;
Packit 0848f5
            }
Packit 0848f5
            else {
Packit 0848f5
                root = MPI_PROC_NULL;
Packit 0848f5
                ans[2] = 199;   /* not needed, just to make correctness checking easier */
Packit 0848f5
            }
Packit 0848f5
        }
Packit 0848f5
        else {
Packit 0848f5
            root = 0;
Packit 0848f5
            ans[2] = 111;       /* garbage value */
Packit 0848f5
        }
Packit 0848f5
        MPI_Ibcast(&ans[2], 1, MPI_INT, root, parentcomm, &reqs[cnt++]);
Packit 0848f5
        expected[2] = 199;
Packit 0848f5
        MPI_Ibarrier(comms[0], &reqs[cnt++]);
Packit 0848f5
Packit 0848f5
        /* Do an Iscan on a neighbor comm */
Packit 0848f5
        MPI_Comm_rank(nbrcomm, &rank;;
Packit 0848f5
        MPI_Comm_remote_size(nbrcomm, &rsize);
Packit 0848f5
        MPI_Iallreduce(&rank, &ans[3], 1, MPI_INT, MPI_SUM, nbrcomm, &reqs[cnt++]);
Packit 0848f5
        expected[3] = (rsize - 1) * rsize / 2;
Packit 0848f5
Packit 0848f5
        /* Pending operations include idup/iallreduce/ibcast on parentcomm
Packit 0848f5
         * Iallreduce/Ibarrier on comms[0], and Iallreduce on nbrcomm */
Packit 0848f5
        /* Waitall even if the first request is completed */
Packit 0848f5
        MPI_Waitall(cnt, reqs, MPI_STATUSES_IGNORE);
Packit 0848f5
Packit 0848f5
        /* Check the answers */
Packit 0848f5
        for (j = 0; j < 4; j++) {
Packit 0848f5
            if (ans[j] != expected[j])
Packit 0848f5
                errs[tid]++;
Packit 0848f5
        }
Packit 0848f5
        for (j = 0; j < NUM_IDUPS1 + NUM_IDUPS2; j++) {
Packit 0848f5
            errs[tid] += MTestTestComm(comms[j]);
Packit 0848f5
            MPI_Comm_free(&comms[j]);
Packit 0848f5
        }
Packit 0848f5
        if (verbose)
Packit 0848f5
            printf("\t%d: Thread %d - comm_idup %d finish\n", rank, tid, i);
Packit 0848f5
    }
Packit 0848f5
    if (verbose)
Packit 0848f5
        printf("%d: Thread %d - Done.\n", rank, tid);
Packit 0848f5
    return (MTEST_THREAD_RETURN_TYPE) 0;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
Packit 0848f5
int main(int argc, char **argv)
Packit 0848f5
{
Packit 0848f5
    int thread_args[NUM_THREADS];
Packit 0848f5
    MPI_Request requests[NUM_THREADS * 2];
Packit 0848f5
    int i, provided;
Packit 0848f5
    MPI_Comm newcomm;
Packit 0848f5
    int toterrs = 0;
Packit 0848f5
Packit 0848f5
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
Packit 0848f5
Packit 0848f5
    check(provided == MPI_THREAD_MULTIPLE);
Packit 0848f5
    check(NUM_IDUPS1 >= 1 && NUM_IDUPS2 >= 1);
Packit 0848f5
Packit 0848f5
    /* In each iteration, the process generates a new kind of intracommunicator, then
Packit 0848f5
     * uses idup to duplicate the communicator for NUM_THREADS threads.
Packit 0848f5
     */
Packit 0848f5
    while (MTestGetIntracommGeneral(&newcomm, 1, 1)) {
Packit 0848f5
        if (newcomm == MPI_COMM_NULL)
Packit 0848f5
            continue;
Packit 0848f5
Packit 0848f5
        for (i = 0; i < NUM_THREADS; i++) {
Packit 0848f5
            MPI_Comm_idup(newcomm, &parentcomms[i], &requests[2 * i]);
Packit 0848f5
            MPI_Comm_idup(newcomm, &nbrcomms[i], &requests[2 * i + 1]);
Packit 0848f5
        }
Packit 0848f5
        MPI_Waitall(NUM_THREADS * 2, requests, MPI_STATUSES_IGNORE);
Packit 0848f5
Packit 0848f5
        for (i = 0; i < NUM_THREADS; i++) {
Packit 0848f5
            thread_args[i] = i;
Packit 0848f5
            MTest_Start_thread(test_intracomm, (void *) &thread_args[i]);
Packit 0848f5
        }
Packit 0848f5
        MTest_Join_threads();
Packit 0848f5
Packit 0848f5
        for (i = 0; i < NUM_THREADS; i++) {
Packit 0848f5
            toterrs += errs[i];
Packit 0848f5
            MPI_Comm_free(&parentcomms[i]);
Packit 0848f5
            MPI_Comm_free(&nbrcomms[i]);
Packit 0848f5
        }
Packit 0848f5
        MTestFreeComm(&newcomm);
Packit 0848f5
    }
Packit 0848f5
Packit 0848f5
    /* In each iteration, the process generates a new kind of intercommunicator, then
Packit 0848f5
     * uses idup to duplicate the communicator for NUM_THREADS threads.
Packit 0848f5
     */
Packit 0848f5
    while (MTestGetIntercomm(&newcomm, &isLeft, 1)) {
Packit 0848f5
        if (newcomm == MPI_COMM_NULL)
Packit 0848f5
            continue;
Packit 0848f5
Packit 0848f5
        for (i = 0; i < NUM_THREADS; i++) {
Packit 0848f5
            MPI_Comm_idup(newcomm, &parentcomms[i], &requests[2 * i]);
Packit 0848f5
            MPI_Comm_idup(newcomm, &nbrcomms[i], &requests[2 * i + 1]);
Packit 0848f5
        }
Packit 0848f5
        MPI_Waitall(NUM_THREADS * 2, requests, MPI_STATUSES_IGNORE);
Packit 0848f5
Packit 0848f5
        for (i = 0; i < NUM_THREADS; i++) {
Packit 0848f5
            thread_args[i] = i;
Packit 0848f5
            MTest_Start_thread(test_intercomm, (void *) &thread_args[i]);
Packit 0848f5
        }
Packit 0848f5
        MTest_Join_threads();
Packit 0848f5
Packit 0848f5
        for (i = 0; i < NUM_THREADS; i++) {
Packit 0848f5
            toterrs += errs[i];
Packit 0848f5
            MPI_Comm_free(&parentcomms[i]);
Packit 0848f5
            MPI_Comm_free(&nbrcomms[i]);
Packit 0848f5
        }
Packit 0848f5
        MTestFreeComm(&newcomm);
Packit 0848f5
    }
Packit 0848f5
    MTest_Finalize(toterrs);
Packit 0848f5
    MPI_Finalize();
Packit 0848f5
    return 0;
Packit 0848f5
}