Blob Blame History Raw
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *
 *  (C) 2003 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpi.h"
#include "mpitest.h"
#include "mpithreadtest.h"
#include <stdio.h>
#include <stdlib.h>

#define MAX_THREADS 4
/* #define LOOPS 10000 */
#define LOOPS 1000
#define WINDOW 16
/* #define MAX_MSG_SIZE 16384 */
#define MAX_MSG_SIZE 4096
#define HOP 4

/* Emulated thread private storage */
struct tp {
    int thread_id;
    int use_proc_null;
    int use_blocking_comm;
    int msg_size;
    double latency;
} tp[MAX_THREADS];

int size, rank;
char sbuf[MAX_MSG_SIZE], rbuf[MAX_MSG_SIZE];
static int verbose = 0;
static volatile int num_threads;
static MTEST_THREAD_LOCK_TYPE num_threads_lock;

#define ABORT_MSG(msg_) do { printf("%s", (msg_)); MPI_Abort(MPI_COMM_WORLD, 1); } while (0)

MTEST_THREAD_RETURN_TYPE run_test(void *arg);
MTEST_THREAD_RETURN_TYPE run_test(void *arg)
{
    int thread_id = (int) (long) arg;
    int i, j, peer;
    MPI_Status status[WINDOW];
    MPI_Request req[WINDOW];
    double start, end;
    int err;
    int local_num_threads = -1;

    if (tp[thread_id].use_proc_null)
        peer = MPI_PROC_NULL;
    else
        peer = (rank % 2) ? rank - 1 : rank + 1;

    err = MTest_thread_lock(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to acquire lock, aborting\n");
    local_num_threads = num_threads;
    err = MTest_thread_unlock(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to release lock, aborting\n");

    MTest_thread_barrier(num_threads);

    start = MPI_Wtime();

    if (tp[thread_id].use_blocking_comm) {
        if ((rank % 2) == 0) {
            for (i = 0; i < LOOPS; i++)
                for (j = 0; j < WINDOW; j++)
                    MPI_Send(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD);
        }
        else {
            for (i = 0; i < LOOPS; i++)
                for (j = 0; j < WINDOW; j++)
                    MPI_Recv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                             &status[0]);
        }
    }
    else {
        for (i = 0; i < LOOPS; i++) {
            if ((rank % 2) == 0) {
                for (j = 0; j < WINDOW; j++)
                    MPI_Isend(sbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                              &req[j]);
            }
            else {
                for (j = 0; j < WINDOW; j++)
                    MPI_Irecv(rbuf, tp[thread_id].msg_size, MPI_CHAR, peer, 0, MPI_COMM_WORLD,
                              &req[j]);
            }
            MPI_Waitall(WINDOW, req, status);
        }
    }

    end = MPI_Wtime();
    tp[thread_id].latency = 1000000.0 * (end - start) / (LOOPS * WINDOW);

    MTest_thread_barrier(num_threads);
    return MTEST_THREAD_RETVAL_IGN;
}

void loops(void);
void loops(void)
{
    int i, nt;
    double latency, mrate, avg_latency, agg_mrate;
    int err;

    err = MTest_thread_lock_create(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to create lock, aborting\n");

    for (nt = 1; nt <= MAX_THREADS; nt++) {
        err = MTest_thread_lock(&num_threads_lock);
        if (err)
            ABORT_MSG("unable to acquire lock, aborting\n");

        num_threads = 1;
        MPI_Barrier(MPI_COMM_WORLD);
        MTest_thread_barrier_init();

        for (i = 1; i < nt; i++) {
            err = MTest_Start_thread(run_test, (void *) (long) i);
            if (err) {
                /* attempt to continue with fewer threads, we may be on a
                 * thread-constrained platform like BG/P in DUAL mode */
                break;
            }
            ++num_threads;
        }

        err = MTest_thread_unlock(&num_threads_lock);
        if (err)
            ABORT_MSG("unable to release lock, aborting\n");

        if (nt > 1 && num_threads <= 1) {
            ABORT_MSG("unable to create any additional threads, aborting\n");
        }

        run_test((void *) 0);   /* we are thread 0 */
        err = MTest_Join_threads();
        if (err) {
            printf("error joining threads, err=%d", err);
            MPI_Abort(MPI_COMM_WORLD, 1);
        }

        MTest_thread_barrier_free();

        latency = 0;
        for (i = 0; i < num_threads; i++)
            latency += tp[i].latency;
        latency /= num_threads; /* Average latency */
        mrate = num_threads / latency;  /* Message rate */

        /* Global latency and message rate */
        MPI_Reduce(&latency, &avg_latency, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
        avg_latency /= size;
        MPI_Reduce(&mrate, &agg_mrate, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

        if (!rank && verbose) {
            printf("Threads: %d; Latency: %.3f; Mrate: %.3f\n", num_threads, latency, mrate);
        }
    }

    err = MTest_thread_lock_free(&num_threads_lock);
    if (err)
        ABORT_MSG("unable to free lock, aborting\n");
}

int main(int argc, char **argv)
{
    int pmode, i, j;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &pmode);
    if (pmode != MPI_THREAD_MULTIPLE) {
        fprintf(stderr, "Thread Multiple not supported by the MPI implementation\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (getenv("MPITEST_VERBOSE"))
        verbose = 1;

    /* For communication, we need an even number of processes */
    if (size % 2) {
        fprintf(stderr, "This test needs an even number of processes\n");
        MPI_Abort(MPI_COMM_WORLD, -1);
    }

    /* PROC_NULL */
    for (i = 0; i < MAX_THREADS; i++) {
        tp[i].thread_id = i;
        tp[i].use_proc_null = 1;
        tp[i].use_blocking_comm = 1;
        tp[i].msg_size = 0;
    }
    if (!rank && verbose) {
        printf("\nUsing MPI_PROC_NULL\n");
        printf("-------------------\n");
    }
    loops();

    /* Blocking communication */
    for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
        for (i = 0; i < MAX_THREADS; i++) {
            tp[i].thread_id = i;
            tp[i].use_proc_null = 0;
            tp[i].use_blocking_comm = 1;
            tp[i].msg_size = j;
        }
        if (!rank && verbose) {
            printf("\nBlocking communication with message size %6d bytes\n", j);
            printf("------------------------------------------------------\n");
        }
        loops();
    }

    /* Non-blocking communication */
    for (j = 0; j < MAX_MSG_SIZE; j = (!j ? 1 : j * HOP)) {
        for (i = 0; i < MAX_THREADS; i++) {
            tp[i].thread_id = i;
            tp[i].use_proc_null = 0;
            tp[i].use_blocking_comm = 0;
            tp[i].msg_size = j;
        }
        if (!rank && verbose) {
            printf("\nNon-blocking communication with message size %6d bytes\n", j);
            printf("----------------------------------------------------------\n");
        }
        loops();
    }

    if (rank == 0) {
        printf(" No Errors\n");
    }

    MPI_Finalize();

    return 0;
}