/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ /* * (C) 2009 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ /* * Run concurrent sends to different target processes. Stresses an * implementation that permits concurrent sends to different targets. * * By turning on verbose output, some simple performance data will be output. * * Use nonblocking sends, and have a single thread complete all I/O. */ #include #include #include "mpi.h" #include "mpitest.h" #include "mpithreadtest.h" /* This is the master test routine */ #define MAX_CNT 660000 /* #define MAX_LOOP 200 */ #define MAX_LOOP 10 #define MAX_NTHREAD 128 static int ownerWaits = 0; static int nthreads = -1; MTEST_THREAD_RETURN_TYPE run_test_sendrecv(void *arg); MTEST_THREAD_RETURN_TYPE run_test_sendrecv(void *arg) { int cnt, j, *buf, wsize, tag; int thread_num = (int) (long) arg; double t; int myrloc = 2 * thread_num; static MPI_Request r[2 * MAX_NTHREAD]; MPI_Comm_size(MPI_COMM_WORLD, &wsize); if (wsize >= MAX_NTHREAD) wsize = MAX_NTHREAD; /* Sanity check */ if (nthreads != wsize) fprintf(stderr, "Panic wsize = %d nthreads = %d\n", wsize, nthreads); for (cnt = 1, tag = 1; cnt < MAX_CNT; cnt = 2 * cnt, tag++) { buf = (int *) malloc(2 * cnt * sizeof(int)); MTEST_VG_MEM_INIT(buf, 2 * cnt * sizeof(int)); /* Wait for all senders to be ready */ MTest_thread_barrier(nthreads); t = MPI_Wtime(); for (j = 0; j < MAX_LOOP; j++) { MTest_thread_barrier(nthreads); MPI_Isend(buf, cnt, MPI_INT, thread_num, tag, MPI_COMM_WORLD, &r[myrloc]); MPI_Irecv(buf + cnt, cnt, MPI_INT, thread_num, tag, MPI_COMM_WORLD, &r[myrloc + 1]); /* Wait for all threads to start the sends */ if (ownerWaits) { MPI_Waitall(2, &r[myrloc], MPI_STATUSES_IGNORE); } else { /* Wait for all threads to create their requests */ MTest_thread_barrier(nthreads); if (thread_num == 1) MPI_Waitall(2 * wsize, r, MPI_STATUSES_IGNORE); } } t = MPI_Wtime() - t; /* can't free the buffers until the requests are completed */ MTest_thread_barrier(nthreads); free(buf); if (thread_num == 1) MTestPrintfMsg(1, "buf size %d: time %f\n", cnt, t / MAX_LOOP); } return (MTEST_THREAD_RETURN_TYPE) NULL; } int main(int argc, char **argv) { int i, pmode, nprocs, rank; int errs = 0, err; MTest_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode); if (pmode != MPI_THREAD_MULTIPLE) { fprintf(stderr, "Thread Multiple not supported by the MPI implementation\n"); MPI_Abort(MPI_COMM_WORLD, -1); } MPI_Comm_size(MPI_COMM_WORLD, &nprocs); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if (nprocs < 2) { fprintf(stderr, "Need at least two processes\n"); MPI_Abort(MPI_COMM_WORLD, 1); } if (nprocs > MAX_NTHREAD) nprocs = MAX_NTHREAD; err = MTest_thread_barrier_init(); if (err) { fprintf(stderr, "Could not create thread barrier\n"); MPI_Abort(MPI_COMM_WORLD, 1); } MPI_Barrier(MPI_COMM_WORLD); if (rank < MAX_NTHREAD) { int nt = nprocs; if (nt > MAX_NTHREAD) nt = MAX_NTHREAD; nthreads = nt; for (i = 0; i < nt; i++) MTest_Start_thread(run_test_sendrecv, (void *) (long) i); } MTest_Join_threads(); MTest_thread_barrier_free(); MTest_Finalize(errs); return MTestReturnValue(errs); }