/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2008 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpi.h"
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "mpitest.h"
#include "connectstuff.h"
int main(int argc, char **argv)
{
MPI_Comm tmp, comm, startComm;
char *fname;
char *actualFname = NULL;
char *globalFname = NULL;
int totalSize, expectedRank, size, cachedRank;
char portName[MPI_MAX_PORT_NAME];
int rankToAccept = 1;
/* Debug - print out where we picked up the MPICH build from */
#ifdef MPICHLIBSTR
msg("MPICH library taken from: %s\n", MPICHLIBSTR);
#endif
if (argc != 4) {
printf("Usage: %s <fname> <totalSize> <idx-1-based>\n", argv[0]);
exit(1);
}
/* This is the base name of the file into which we write the port */
fname = argv[1];
/* This is the total number of processes launched */
totalSize = atoi(argv[2]);
/* Each process knows its expected rank */
expectedRank = atoi(argv[3]) - 1;
/* Start a watchdog thread which will abort after 120 seconds, and will
* print stack traces using GDB every 5 seconds if you don't call
* strokeWatchdog() */
startWatchdog(120);
/* Print a debug header */
msg("Waiting for: %d - my rank is %d\n", totalSize, expectedRank);
/* Singleton init */
MPI_Init(0, 0);
/* Duplicate from MPI_COMM_SELF the starting point */
MPI_Comm_dup(MPI_COMM_SELF, &startComm);
if (expectedRank == 0) {
/* This process opens the port, and writes the information to the file */
MPI_Open_port(MPI_INFO_NULL, portName);
/* Write the port to fname.<rank> so that the connecting processes can
* wait their turn by checking for the correct file to show up */
actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
/* The wrapper script I'm using checks for the existance of "fname", so
* create that - even though it isn't used */
globalFname = writePortToFile(portName, fname);
installExitHandler(globalFname);
comm = startComm;
}
else {
char *readPort;
readPort = getPortFromFile("%s.%d", fname, expectedRank);
strncpy(portName, readPort, MPI_MAX_PORT_NAME);
free(readPort);
msg("Read port <%s>\n", portName);
MPI_Comm_connect(portName, MPI_INFO_NULL, 0, startComm, &comm);
MPI_Intercomm_merge(comm, 1, &tmp);
comm = tmp;
MPI_Comm_size(comm, &size);
msg("After my first merge, size is now: %d\n", size);
}
while (size < totalSize) {
/* Make sure we don't print a stack until we stall */
strokeWatchdog();
/* Accept the connection */
MPI_Comm_accept(portName, MPI_INFO_NULL, 0, comm, &tmp);
/* Merge into intracomm */
MPI_Intercomm_merge(tmp, 0, &comm);
/* Free the intercomm */
MPI_Comm_free(&tmp);
/* See where we're up to */
MPI_Comm_rank(comm, &cachedRank);
MPI_Comm_size(comm, &size);
if (expectedRank == 0) {
msg("Up to size: %d\n", size);
/* Delete the old file, create the new one */
unlink(actualFname);
free(actualFname);
/* Allow the next rank to connect */
actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
}
}
MPI_Comm_rank(comm, &cachedRank);
msg("All done - I got rank: %d.\n", cachedRank);
MPI_Barrier(comm);
if (expectedRank == 0) {
/* Cleanup on rank zero - delete some files */
MTestSleep(4);
unlink(actualFname);
free(actualFname);
unlink(globalFname);
free(globalFname);
/* This lets my wrapper script know that we did everything correctly */
indicateConnectSucceeded();
}
MPI_Finalize();
return 0;
}