/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *
 * (C) 2003 by Argonne National Laboratory.
 * See COPYRIGHT in top-level directory.
 */

/* Multithreaded point-to-point test: pairs of processes exchange windows of
 * messages from 1..MAX_THREADS threads per process, using MPI_PROC_NULL,
 * blocking, and nonblocking communication, and report the average per-message
 * latency and message rate. */

#include "mpi.h"
#include "mpitest.h"
#include "mpithreadtest.h"
#include <stdio.h>
#include <stdlib.h>

#define MAX_THREADS 4
/* #define LOOPS 10000 */
#define LOOPS 1000
#define WINDOW 16
/* #define MAX_MSG_SIZE 16384 */
#define MAX_MSG_SIZE 4096
#define HOP 4

/* Emulated thread private storage */
struct tp {
    int thread_id;
    int use_proc_null;
    int use_blocking_comm;
    int msg_size;
    double latency;
} tp[MAX_THREADS];

int size, rank;
char sbuf[MAX_MSG_SIZE], rbuf[MAX_MSG_SIZE];
static int verbose = 0;
static volatile int num_threads;
static MTEST_THREAD_LOCK_TYPE num_threads_lock;

#define ABORT_MSG(msg_) do { printf("%s", (msg_)); MPI_Abort(MPI_COMM_WORLD, 1); } while (0)

MTEST_THREAD_RETURN_TYPE run_test(void *arg);
MTEST_THREAD_RETURN_TYPE run_test(void *arg)
{
    int thread_id = (int) (long) arg;
    int i, j, peer;
    MPI_Status status[WINDOW];
    MPI_Request req[WINDOW];
    double start, end;
    int err;
    int local_num_threads = -1;

    if (tp[thread_id].use_proc_null)
        peer = MPI_PROC_NULL;
    else
        peer = (rank % 2) ? rank - 1 : rank + 1;        /* pair even and odd ranks */

    /* Take a consistent snapshot of the thread count under the lock */
    err = MTest_thread_lock(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to acquire lock, aborting\n");
    local_num_threads = num_threads;
    err = MTest_thread_unlock(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to release lock, aborting\n");

    /* use the snapshot so every thread agrees on the barrier size */
    MTest_thread_barrier(local_num_threads);

    start = MPI_Wtime();

    if (tp[thread_id].use_blocking_comm) {
        if ((rank % 2) == 0) {
            for (i = 0; i < LOOPS; i++)
                for (j = 0; j < WINDOW; j++)
                    MPI_Send(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD);
        } else {
            for (i = 0; i < LOOPS; i++)
                for (j = 0; j < WINDOW; j++)
                    MPI_Recv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                             &status[0]);
        }
    } else {
        for (i = 0; i < LOOPS; i++) {
            if ((rank % 2) == 0) {
                for (j = 0; j < WINDOW; j++)
                    MPI_Isend(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                              &req[j]);
            } else {
                for (j = 0; j < WINDOW; j++)
                    MPI_Irecv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                              &req[j]);
            }
            MPI_Waitall(WINDOW, req, status);
        }
    }

    end = MPI_Wtime();
    /* average time per message, in microseconds */
    tp[thread_id].latency = 1000000.0 * (end - start) / (LOOPS * WINDOW);

    MTest_thread_barrier(local_num_threads);

    return MTEST_THREAD_RETVAL_IGN;
}

void loops(void);
void loops(void)
{
    int i, nt;
    double latency, mrate, avg_latency, agg_mrate;
    int err;

    err = MTest_thread_lock_create(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to create lock, aborting\n");

    for (nt = 1; nt <= MAX_THREADS; nt++) {
        err = MTest_thread_lock(&num_threads_lock);
        if (err)
            ABORT_MSG("unable to acquire lock, aborting\n");

        num_threads = 1;
        MPI_Barrier(MPI_COMM_WORLD);
        MTest_thread_barrier_init();

        for (i = 1; i < nt; i++) {
            err = MTest_Start_thread(run_test, (void *) (long) i);
            if (err) {
                /* attempt to continue with fewer threads, we may be on a
                 * thread-constrained platform like BG/P in DUAL mode */
                break;
            }
            ++num_threads;
        }

        err = MTest_thread_unlock(&num_threads_lock);
        if (err)
            ABORT_MSG("unable to release lock, aborting\n");

        if (nt > 1 && num_threads <= 1) {
            ABORT_MSG("unable to create any additional threads, aborting\n");
        }

        run_test((void *) 0);   /* we are thread 0 */

        err = MTest_Join_threads();
        if (err) {
            printf("error joining threads, err=%d", err);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }
        MTest_thread_barrier_free();

        latency = 0;
        for (i = 0; i < num_threads; i++)
            latency += tp[i].latency;
        latency /= num_threads; /* Average latency */
        mrate = num_threads / latency;  /* Message rate */
        /* Global latency and message rate */
        MPI_Reduce(&latency, &avg_latency, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
        avg_latency /= size;
        MPI_Reduce(&mrate, &agg_mrate, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (!rank && verbose) {
            printf("Threads: %d; Latency: %.3f; Mrate: %.3f\n", num_threads, latency, mrate);
        }
    }

    err = MTest_thread_lock_free(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to free lock, aborting\n");
}

int main(int argc, char **argv)
{
    int pmode, i, j;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode);
    if (pmode != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "Thread Multiple not supported by the MPI implementation\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (getenv("MPITEST_VERBOSE"))
        verbose = 1;

    /* For communication, we need an even number of processes */
    if (size % 2) {
        fprintf(stderr, "This test needs an even number of processes\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    /* PROC_NULL */
    for (i = 0; i < MAX_THREADS; i++) {
        tp[i].thread_id = i;
        tp[i].use_proc_null = 1;
        tp[i].use_blocking_comm = 1;
        tp[i].msg_size = 0;
    }
    if (!rank && verbose) {
        printf("\nUsing MPI_PROC_NULL\n");
        printf("-------------------\n");
    }
    loops();

    /* Blocking communication */
    for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
        for (i = 0; i < MAX_THREADS; i++) {
            tp[i].thread_id = i;
            tp[i].use_proc_null = 0;
            tp[i].use_blocking_comm = 1;
            tp[i].msg_size = j;
        }
        if (!rank && verbose) {
            printf("\nBlocking communication with message size %6d bytes\n", j);
            printf("------------------------------------------------------\n");
        }
        loops();
    }

    /* Non-blocking communication */
    for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
        for (i = 0; i < MAX_THREADS; i++) {
            tp[i].thread_id = i;
            tp[i].use_proc_null = 0;
            tp[i].use_blocking_comm = 0;
            tp[i].msg_size = j;
        }
        if (!rank && verbose) {
            printf("\nNon-blocking communication with message size %6d bytes\n", j);
            printf("----------------------------------------------------------\n");
        }
        loops();
    }

    if (rank == 0) {
        printf(" No Errors\n");
    }

    MPI_Finalize();

    return 0;
}