Blame test/mpi/manual/testconnectserial.c

Packit 0848f5
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit 0848f5
/*
Packit 0848f5
 *  (C) 2008 by Argonne National Laboratory.
Packit 0848f5
 *      See COPYRIGHT in top-level directory.
Packit 0848f5
 */
Packit 0848f5
#include "mpi.h"
Packit 0848f5
#include <pthread.h>
Packit 0848f5
#include <signal.h>
Packit 0848f5
#include <stdio.h>
Packit 0848f5
#include <stdlib.h>
Packit 0848f5
#include <string.h>
Packit 0848f5
#include <time.h>
Packit 0848f5
#include <unistd.h>
Packit 0848f5
#include "mpitest.h"
Packit 0848f5
Packit 0848f5
#include "connectstuff.h"
Packit 0848f5
Packit 0848f5
int main(int argc, char **argv)
Packit 0848f5
{
Packit 0848f5
    MPI_Comm tmp, comm, startComm;
Packit 0848f5
    char *fname;
Packit 0848f5
    char *actualFname = NULL;
Packit 0848f5
    char *globalFname = NULL;
Packit 0848f5
    int totalSize, expectedRank, size, cachedRank;
Packit 0848f5
    char portName[MPI_MAX_PORT_NAME];
Packit 0848f5
    int rankToAccept = 1;
Packit 0848f5
Packit 0848f5
    /* Debug - print out where we picked up the MPICH build from */
Packit 0848f5
#ifdef MPICHLIBSTR
Packit 0848f5
    msg("MPICH library taken from: %s\n", MPICHLIBSTR);
Packit 0848f5
#endif
Packit 0848f5
Packit 0848f5
    if (argc != 4) {
Packit 0848f5
        printf("Usage: %s <fname> <totalSize> <idx-1-based>\n", argv[0]);
Packit 0848f5
        exit(1);
Packit 0848f5
    }
Packit 0848f5
Packit 0848f5
    /* This is the base name of the file into which we write the port */
Packit 0848f5
    fname = argv[1];
Packit 0848f5
    /* This is the total number of processes launched */
Packit 0848f5
    totalSize = atoi(argv[2]);
Packit 0848f5
    /* Each process knows its expected rank */
Packit 0848f5
    expectedRank = atoi(argv[3]) - 1;
Packit 0848f5
Packit 0848f5
    /* Start a watchdog thread which will abort after 120 seconds, and will
Packit 0848f5
     * print stack traces using GDB every 5 seconds if you don't call
Packit 0848f5
     * strokeWatchdog() */
Packit 0848f5
    startWatchdog(120);
Packit 0848f5
Packit 0848f5
    /* Print a debug header */
Packit 0848f5
    msg("Waiting for: %d - my rank is %d\n", totalSize, expectedRank);
Packit 0848f5
Packit 0848f5
    /* Singleton init */
Packit 0848f5
    MPI_Init(0, 0);
Packit 0848f5
Packit 0848f5
    /* Duplicate from MPI_COMM_SELF the starting point */
Packit 0848f5
    MPI_Comm_dup(MPI_COMM_SELF, &startComm);
Packit 0848f5
Packit 0848f5
Packit 0848f5
    if (expectedRank == 0) {
Packit 0848f5
        /* This process opens the port, and writes the information to the file */
Packit 0848f5
        MPI_Open_port(MPI_INFO_NULL, portName);
Packit 0848f5
Packit 0848f5
        /* Write the port to fname.<rank> so that the connecting processes can
Packit 0848f5
         * wait their turn by checking for the correct file to show up */
Packit 0848f5
        actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
Packit 0848f5
Packit 0848f5
        /* The wrapper script I'm using checks for the existance of "fname", so
Packit 0848f5
         * create that - even though it isn't used  */
Packit 0848f5
        globalFname = writePortToFile(portName, fname);
Packit 0848f5
        installExitHandler(globalFname);
Packit 0848f5
Packit 0848f5
        comm = startComm;
Packit 0848f5
    }
Packit 0848f5
    else {
Packit 0848f5
        char *readPort;
Packit 0848f5
        readPort = getPortFromFile("%s.%d", fname, expectedRank);
Packit 0848f5
        strncpy(portName, readPort, MPI_MAX_PORT_NAME);
Packit 0848f5
        free(readPort);
Packit 0848f5
        msg("Read port <%s>\n", portName);
Packit 0848f5
Packit 0848f5
        MPI_Comm_connect(portName, MPI_INFO_NULL, 0, startComm, &comm);
Packit 0848f5
        MPI_Intercomm_merge(comm, 1, &tmp);
Packit 0848f5
        comm = tmp;
Packit 0848f5
        MPI_Comm_size(comm, &size);
Packit 0848f5
        msg("After my first merge, size is now: %d\n", size);
Packit 0848f5
    }
Packit 0848f5
    while (size < totalSize) {
Packit 0848f5
        /* Make sure we don't print a stack until we stall */
Packit 0848f5
        strokeWatchdog();
Packit 0848f5
Packit 0848f5
        /* Accept the connection */
Packit 0848f5
        MPI_Comm_accept(portName, MPI_INFO_NULL, 0, comm, &tmp);
Packit 0848f5
Packit 0848f5
        /* Merge into intracomm */
Packit 0848f5
        MPI_Intercomm_merge(tmp, 0, &comm);
Packit 0848f5
Packit 0848f5
        /* Free the intercomm */
Packit 0848f5
        MPI_Comm_free(&tmp);
Packit 0848f5
Packit 0848f5
        /* See where we're up to */
Packit 0848f5
        MPI_Comm_rank(comm, &cachedRank);
Packit 0848f5
        MPI_Comm_size(comm, &size);
Packit 0848f5
Packit 0848f5
        if (expectedRank == 0) {
Packit 0848f5
            msg("Up to size: %d\n", size);
Packit 0848f5
Packit 0848f5
            /* Delete the old file, create the new one */
Packit 0848f5
            unlink(actualFname);
Packit 0848f5
            free(actualFname);
Packit 0848f5
Packit 0848f5
            /* Allow the next rank to connect */
Packit 0848f5
            actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
Packit 0848f5
        }
Packit 0848f5
    }
Packit 0848f5
    MPI_Comm_rank(comm, &cachedRank);
Packit 0848f5
Packit 0848f5
    msg("All done - I got rank: %d.\n", cachedRank);
Packit 0848f5
Packit 0848f5
    MPI_Barrier(comm);
Packit 0848f5
Packit 0848f5
    if (expectedRank == 0) {
Packit 0848f5
Packit 0848f5
        /* Cleanup on rank zero - delete some files */
Packit 0848f5
        MTestSleep(4);
Packit 0848f5
        unlink(actualFname);
Packit 0848f5
        free(actualFname);
Packit 0848f5
        unlink(globalFname);
Packit 0848f5
        free(globalFname);
Packit 0848f5
Packit 0848f5
        /* This lets my wrapper script know that we did everything correctly */
Packit 0848f5
        indicateConnectSucceeded();
Packit 0848f5
    }
Packit 0848f5
    MPI_Finalize();
Packit 0848f5
Packit 0848f5
    return 0;
Packit 0848f5
}