/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2008 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */
|
|
Packit Service |
c5cf8c |
#include "mpi.h"
|
|
Packit Service |
c5cf8c |
#include <pthread.h>
|
|
Packit Service |
c5cf8c |
#include <signal.h>
|
|
Packit Service |
c5cf8c |
#include <stdio.h>
|
|
Packit Service |
c5cf8c |
#include <stdlib.h>
|
|
Packit Service |
c5cf8c |
#include <string.h>
|
|
Packit Service |
c5cf8c |
#include <time.h>
|
|
Packit Service |
c5cf8c |
#include <unistd.h>
|
|
Packit Service |
c5cf8c |
#include "mpitest.h"
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
#include "connectstuff.h"
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
int main(int argc, char **argv)
|
|
Packit Service |
c5cf8c |
{
|
|
Packit Service |
c5cf8c |
MPI_Comm tmp, comm, startComm;
|
|
Packit Service |
c5cf8c |
char *fname;
|
|
Packit Service |
c5cf8c |
char *actualFname = NULL;
|
|
Packit Service |
c5cf8c |
char *globalFname = NULL;
|
|
Packit Service |
c5cf8c |
int totalSize, expectedRank, size, cachedRank;
|
|
Packit Service |
c5cf8c |
char portName[MPI_MAX_PORT_NAME];
|
|
Packit Service |
c5cf8c |
int rankToAccept = 1;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Debug - print out where we picked up the MPICH build from */
|
|
Packit Service |
c5cf8c |
#ifdef MPICHLIBSTR
|
|
Packit Service |
c5cf8c |
msg("MPICH library taken from: %s\n", MPICHLIBSTR);
|
|
Packit Service |
c5cf8c |
#endif
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (argc != 4) {
|
|
Packit Service |
c5cf8c |
printf("Usage: %s <fname> <totalSize> <idx-1-based>\n", argv[0]);
|
|
Packit Service |
c5cf8c |
exit(1);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* This is the base name of the file into which we write the port */
|
|
Packit Service |
c5cf8c |
fname = argv[1];
|
|
Packit Service |
c5cf8c |
/* This is the total number of processes launched */
|
|
Packit Service |
c5cf8c |
totalSize = atoi(argv[2]);
|
|
Packit Service |
c5cf8c |
/* Each process knows its expected rank */
|
|
Packit Service |
c5cf8c |
expectedRank = atoi(argv[3]) - 1;
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Start a watchdog thread which will abort after 120 seconds, and will
|
|
Packit Service |
c5cf8c |
* print stack traces using GDB every 5 seconds if you don't call
|
|
Packit Service |
c5cf8c |
* strokeWatchdog() */
|
|
Packit Service |
c5cf8c |
startWatchdog(120);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Print a debug header */
|
|
Packit Service |
c5cf8c |
msg("Waiting for: %d - my rank is %d\n", totalSize, expectedRank);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Singleton init */
|
|
Packit Service |
c5cf8c |
MPI_Init(0, 0);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Duplicate from MPI_COMM_SELF the starting point */
|
|
Packit Service |
c5cf8c |
MPI_Comm_dup(MPI_COMM_SELF, &startComm);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (expectedRank == 0) {
|
|
Packit Service |
c5cf8c |
/* This process opens the port, and writes the information to the file */
|
|
Packit Service |
c5cf8c |
MPI_Open_port(MPI_INFO_NULL, portName);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Write the port to fname.<rank> so that the connecting processes can
|
|
Packit Service |
c5cf8c |
* wait their turn by checking for the correct file to show up */
|
|
Packit Service |
c5cf8c |
actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* The wrapper script I'm using checks for the existance of "fname", so
|
|
Packit Service |
c5cf8c |
* create that - even though it isn't used */
|
|
Packit Service |
c5cf8c |
globalFname = writePortToFile(portName, fname);
|
|
Packit Service |
c5cf8c |
installExitHandler(globalFname);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
comm = startComm;
|
|
Packit Service |
c5cf8c |
} else {
|
|
Packit Service |
c5cf8c |
char *readPort;
|
|
Packit Service |
c5cf8c |
readPort = getPortFromFile("%s.%d", fname, expectedRank);
|
|
Packit Service |
c5cf8c |
strncpy(portName, readPort, MPI_MAX_PORT_NAME);
|
|
Packit Service |
c5cf8c |
free(readPort);
|
|
Packit Service |
c5cf8c |
msg("Read port <%s>\n", portName);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPI_Comm_connect(portName, MPI_INFO_NULL, 0, startComm, &comm);
|
|
Packit Service |
c5cf8c |
MPI_Intercomm_merge(comm, 1, &tmp);
|
|
Packit Service |
c5cf8c |
comm = tmp;
|
|
Packit Service |
c5cf8c |
MPI_Comm_size(comm, &size);
|
|
Packit Service |
c5cf8c |
msg("After my first merge, size is now: %d\n", size);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
while (size < totalSize) {
|
|
Packit Service |
c5cf8c |
/* Make sure we don't print a stack until we stall */
|
|
Packit Service |
c5cf8c |
strokeWatchdog();
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Accept the connection */
|
|
Packit Service |
c5cf8c |
MPI_Comm_accept(portName, MPI_INFO_NULL, 0, comm, &tmp);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Merge into intracomm */
|
|
Packit Service |
c5cf8c |
MPI_Intercomm_merge(tmp, 0, &comm);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Free the intercomm */
|
|
Packit Service |
c5cf8c |
MPI_Comm_free(&tmp);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* See where we're up to */
|
|
Packit Service |
c5cf8c |
MPI_Comm_rank(comm, &cachedRank);
|
|
Packit Service |
c5cf8c |
MPI_Comm_size(comm, &size);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (expectedRank == 0) {
|
|
Packit Service |
c5cf8c |
msg("Up to size: %d\n", size);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Delete the old file, create the new one */
|
|
Packit Service |
c5cf8c |
unlink(actualFname);
|
|
Packit Service |
c5cf8c |
free(actualFname);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Allow the next rank to connect */
|
|
Packit Service |
c5cf8c |
actualFname = writePortToFile(portName, "%s.%d", fname, rankToAccept++);
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
MPI_Comm_rank(comm, &cachedRank);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
msg("All done - I got rank: %d.\n", cachedRank);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
MPI_Barrier(comm);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
if (expectedRank == 0) {
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* Cleanup on rank zero - delete some files */
|
|
Packit Service |
c5cf8c |
MTestSleep(4);
|
|
Packit Service |
c5cf8c |
unlink(actualFname);
|
|
Packit Service |
c5cf8c |
free(actualFname);
|
|
Packit Service |
c5cf8c |
unlink(globalFname);
|
|
Packit Service |
c5cf8c |
free(globalFname);
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
/* This lets my wrapper script know that we did everything correctly */
|
|
Packit Service |
c5cf8c |
indicateConnectSucceeded();
|
|
Packit Service |
c5cf8c |
}
|
|
Packit Service |
c5cf8c |
MPI_Finalize();
|
|
Packit Service |
c5cf8c |
|
|
Packit Service |
c5cf8c |
return 0;
|
|
Packit Service |
c5cf8c |
}
|