/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2001 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
/*********************** PMI implementation ********************************/
/*
* This file implements the client-side of the PMI interface.
*
* Note that the PMI client code must not print error messages (except
* when an abort is required) because MPI error handling is based on
* reporting error codes to which messages are attached.
*
* In v2, we should require a PMI client interface to use MPI error codes
* to provide better integration with MPICH.
*/
/***************************************************************************/
#include "mpichconf.h"
#define PMI_VERSION 1
#define PMI_SUBVERSION 1
#include <stdio.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_STDLIB_H
#include <stdlib.h>
#endif
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#ifdef HAVE_STRINGS_H
#include <strings.h>
#endif
#ifdef USE_PMI_PORT
#ifndef MAXHOSTNAME
#define MAXHOSTNAME 256
#endif
#endif
/* This should be moved to pmiu for shutdown */
#if defined(HAVE_SYS_SOCKET_H)
#include <sys/socket.h>
#endif
#include "mpl.h" /* Get ATTRIBUTE, some base functions */
/* mpimem includes the definitions for MPL_malloc and MPL_free */
#include "mpir_mem.h"
/* Temporary debug definitions */
/* #define DBG_PRINTF(args) printf args ; fflush(stdout) */
#define DBG_PRINTF(args)
#include "pmi.h"
#include "simple_pmiutil.h"
#include "mpi.h" /* to get MPI_MAX_PORT_NAME */
/*
These are global variable used *ONLY* in this file, and are hence
declared static.
*/
static int PMI_fd = -1;
static int PMI_size = 1;
static int PMI_rank = 0;
/* Set PMI_initialized to 1 for singleton init but no process manager
to help. Initialized to 2 for normal initialization. Initialized
to values higher than 2 when singleton_init by a process manager.
All values higher than 1 invlove a PM in some way.
*/
typedef enum { PMI_UNINITIALIZED = 0,
SINGLETON_INIT_BUT_NO_PM = 1,
NORMAL_INIT_WITH_PM,
SINGLETON_INIT_WITH_PM
} PMIState;
static PMIState PMI_initialized = PMI_UNINITIALIZED;
/* ALL GLOBAL VARIABLES MUST BE INITIALIZED TO AVOID POLLUTING THE
LIBRARY WITH COMMON SYMBOLS */
static int PMI_kvsname_max = 0;
static int PMI_keylen_max = 0;
static int PMI_vallen_max = 0;
static int PMI_debug = 0;
static int PMI_debug_init = 0; /* Set this to true to debug the init
* handshakes */
static int PMI_spawned = 0;
/* Function prototypes for internal routines */
static int PMII_getmaxes(int *kvsname_max, int *keylen_max, int *vallen_max);
static int PMII_Set_from_port(int, int);
static int PMII_Connect_to_pm(char *, int);
static int GetResponse(const char[], const char[], int);
static int getPMIFD(int *);
#ifdef USE_PMI_PORT
static int PMII_singinit(void);
static int PMI_totalview = 0;
#endif
static int PMIi_InitIfSingleton(void);
static int accept_one_connection(int);
static int cached_singinit_inuse = 0;
static char cached_singinit_key[PMIU_MAXLINE];
static char cached_singinit_val[PMIU_MAXLINE];
static char singinit_kvsname[256];
/******************************** Group functions *************************/
int PMI_Init(int *spawned)
{
char *p;
int notset = 1;
int rc;
PMI_initialized = PMI_UNINITIALIZED;
/* FIXME: Why is setvbuf commented out? */
/* FIXME: What if the output should be fully buffered (directed to file)?
* unbuffered (user explicitly set?) */
/* setvbuf(stdout,0,_IONBF,0); */
setbuf(stdout, NULL);
/* PMIU_printf(1, "PMI_INIT\n"); */
/* Get the value of PMI_DEBUG from the environment if possible, since
* we may have set it to help debug the setup process */
p = getenv("PMI_DEBUG");
if (p)
PMI_debug = atoi(p);
/* Get the fd for PMI commands; if none, we're a singleton */
rc = getPMIFD(¬set);
if (rc) {
return rc;
}
if (PMI_fd == -1) {
/* Singleton init: Process not started with mpiexec,
* so set size to 1, rank to 0 */
PMI_size = 1;
PMI_rank = 0;
*spawned = 0;
PMI_initialized = SINGLETON_INIT_BUT_NO_PM;
/* 256 is picked as the minimum allowed length by the PMI servers */
PMI_kvsname_max = 256;
PMI_keylen_max = 256;
PMI_vallen_max = 256;
return PMI_SUCCESS;
}
/* If size, rank, and debug are not set from a communication port,
* use the environment */
if (notset) {
if ((p = getenv("PMI_SIZE")))
PMI_size = atoi(p);
else
PMI_size = 1;
if ((p = getenv("PMI_RANK"))) {
PMI_rank = atoi(p);
/* Let the util routine know the rank of this process for
* any messages (usually debugging or error) */
PMIU_Set_rank(PMI_rank);
} else
PMI_rank = 0;
if ((p = getenv("PMI_DEBUG")))
PMI_debug = atoi(p);
else
PMI_debug = 0;
/* Leave unchanged otherwise, which indicates that no value
* was set */
}
/* FIXME: Why does this depend on their being a port??? */
/* FIXME: What is this for? */
#ifdef USE_PMI_PORT
if ((p = getenv("PMI_TOTALVIEW")))
PMI_totalview = atoi(p);
if (PMI_totalview) {
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
/* FIXME: This should use a cmd/response rather than a expecting the
* server to set a value in this and only this case */
/* FIXME: And it most ceratainly should not happen *before* the
* initialization handshake */
PMIU_readline(PMI_fd, buf, PMIU_MAXLINE);
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strncmp(cmd, "tv_ready", PMIU_MAXLINE) != 0) {
PMIU_printf(1, "expecting cmd=tv_ready, got %s\n", buf);
return PMI_FAIL;
}
}
#endif
PMII_getmaxes(&PMI_kvsname_max, &PMI_keylen_max, &PMI_vallen_max);
/* FIXME: This is something that the PM should tell the process,
* rather than deliver it through the environment */
if ((p = getenv("PMI_SPAWNED")))
PMI_spawned = atoi(p);
else
PMI_spawned = 0;
if (PMI_spawned)
*spawned = 1;
else
*spawned = 0;
if (!PMI_initialized)
PMI_initialized = NORMAL_INIT_WITH_PM;
return PMI_SUCCESS;
}
int PMI_Initialized(int *initialized)
{
/* Turn this into a logical value (1 or 0) . This allows us
* to use PMI_initialized to distinguish between initialized with
* an PMI service (e.g., via mpiexec) and the singleton init,
* which has no PMI service */
*initialized = (PMI_initialized != 0);
return PMI_SUCCESS;
}
int PMI_Get_size(int *size)
{
if (PMI_initialized)
*size = PMI_size;
else
*size = 1;
return PMI_SUCCESS;
}
int PMI_Get_rank(int *rank)
{
if (PMI_initialized)
*rank = PMI_rank;
else
*rank = 0;
return PMI_SUCCESS;
}
/*
* Get_universe_size is one of the routines that needs to communicate
* with the process manager. If we started as a singleton init, then
* we first need to connect to the process manager and acquire the
* needed information.
*/
int PMI_Get_universe_size(int *size)
{
int err;
char size_c[PMIU_MAXLINE];
/* Connect to the PM if we haven't already */
if (PMIi_InitIfSingleton() != 0)
return PMI_FAIL;
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
err = GetResponse("cmd=get_universe_size\n", "universe_size", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("size", size_c, PMIU_MAXLINE);
*size = atoi(size_c);
return PMI_SUCCESS;
} else
return err;
} else
*size = 1;
return PMI_SUCCESS;
}
int PMI_Get_appnum(int *appnum)
{
int err;
char appnum_c[PMIU_MAXLINE];
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
err = GetResponse("cmd=get_appnum\n", "appnum", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("appnum", appnum_c, PMIU_MAXLINE);
*appnum = atoi(appnum_c);
return PMI_SUCCESS;
} else
return err;
} else
*appnum = -1;
return PMI_SUCCESS;
}
int PMI_Barrier(void)
{
int err = PMI_SUCCESS;
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
err = GetResponse("cmd=barrier_in\n", "barrier_out", 0);
}
return err;
}
/* Inform the process manager that we're in finalize */
int PMI_Finalize(void)
{
int err = PMI_SUCCESS;
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
err = GetResponse("cmd=finalize\n", "finalize_ack", 0);
shutdown(PMI_fd, SHUT_RDWR);
close(PMI_fd);
}
return err;
}
int PMI_Abort(int exit_code, const char error_msg[])
{
char buf[PMIU_MAXLINE];
/* include exit_code in the abort command */
MPL_snprintf(buf, PMIU_MAXLINE, "cmd=abort exitcode=%d\n", exit_code);
PMIU_printf(PMI_debug, "aborting job:\n%s\n", error_msg);
GetResponse(buf, "", 0);
/* the above command should not return */
return PMI_FAIL;
}
/************************************* Keymap functions **********************/
/*FIXME: need to return an error if the value of the kvs name returned is
truncated because it is larger than length */
/* FIXME: My name should be cached rather than re-acquired, as it is
unchanging (after singleton init) */
int PMI_KVS_Get_my_name(char kvsname[], int length)
{
int err;
if (PMI_initialized == SINGLETON_INIT_BUT_NO_PM) {
/* Return a dummy name */
/* FIXME: We need to support a distinct kvsname for each
* process group */
MPL_snprintf(kvsname, length, "singinit_kvs_%d_0", (int) getpid());
return PMI_SUCCESS;
}
err = GetResponse("cmd=get_my_kvsname\n", "my_kvsname", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("kvsname", kvsname, length);
}
return err;
}
int PMI_KVS_Get_name_length_max(int *maxlen)
{
if (maxlen == NULL)
return PMI_ERR_INVALID_ARG;
*maxlen = PMI_kvsname_max;
return PMI_SUCCESS;
}
int PMI_KVS_Get_key_length_max(int *maxlen)
{
if (maxlen == NULL)
return PMI_ERR_INVALID_ARG;
*maxlen = PMI_keylen_max;
return PMI_SUCCESS;
}
int PMI_KVS_Get_value_length_max(int *maxlen)
{
if (maxlen == NULL)
return PMI_ERR_INVALID_ARG;
*maxlen = PMI_vallen_max;
return PMI_SUCCESS;
}
int PMI_KVS_Put(const char kvsname[], const char key[], const char value[])
{
char buf[PMIU_MAXLINE];
int err = PMI_SUCCESS;
int rc;
/* This is a special hack to support singleton initialization */
if (PMI_initialized == SINGLETON_INIT_BUT_NO_PM) {
if (cached_singinit_inuse)
return PMI_FAIL;
rc = MPL_strncpy(cached_singinit_key, key, PMI_keylen_max);
if (rc != 0)
return PMI_FAIL;
rc = MPL_strncpy(cached_singinit_val, value, PMI_vallen_max);
if (rc != 0)
return PMI_FAIL;
cached_singinit_inuse = 1;
return PMI_SUCCESS;
}
rc = MPL_snprintf(buf, PMIU_MAXLINE,
"cmd=put kvsname=%s key=%s value=%s\n", kvsname, key, value);
if (rc < 0)
return PMI_FAIL;
err = GetResponse(buf, "put_result", 1);
return err;
}
int PMI_KVS_Commit(const char kvsname[]ATTRIBUTE((unused)))
{
/* no-op in this implementation */
return PMI_SUCCESS;
}
/*FIXME: need to return an error if the value returned is truncated
because it is larger than length */
int PMI_KVS_Get(const char kvsname[], const char key[], char value[], int length)
{
char buf[PMIU_MAXLINE];
int err = PMI_SUCCESS;
int rc;
/* Connect to the PM if we haven't already. This is needed in case
* we're doing an MPI_Comm_join or MPI_Comm_connect/accept from
* the singleton init case. This test is here because, in the way in
* which MPICH uses PMI, this is where the test needs to be. */
if (PMIi_InitIfSingleton() != 0)
return PMI_FAIL;
rc = MPL_snprintf(buf, PMIU_MAXLINE, "cmd=get kvsname=%s key=%s\n", kvsname, key);
if (rc < 0)
return PMI_FAIL;
err = GetResponse(buf, "get_result", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("rc", buf, PMIU_MAXLINE);
rc = atoi(buf);
if (rc == 0) {
PMIU_getval("value", value, length);
return PMI_SUCCESS;
} else {
return PMI_FAIL;
}
}
return err;
}
/*************************** Name Publishing functions **********************/
int PMI_Publish_name(const char service_name[], const char port[])
{
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
int err;
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
MPL_snprintf(cmd, PMIU_MAXLINE,
"cmd=publish_name service=%s port=%s\n", service_name, port);
err = GetResponse(cmd, "publish_result", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("rc", buf, PMIU_MAXLINE);
if (strcmp(buf, "0") != 0) {
PMIU_getval("msg", buf, PMIU_MAXLINE);
PMIU_printf(PMI_debug, "publish failed; reason = %s\n", buf);
return PMI_FAIL;
}
}
} else {
PMIU_printf(1, "PMI_Publish_name called before init\n");
return PMI_FAIL;
}
return PMI_SUCCESS;
}
int PMI_Unpublish_name(const char service_name[])
{
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
int err = PMI_SUCCESS;
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
MPL_snprintf(cmd, PMIU_MAXLINE, "cmd=unpublish_name service=%s\n", service_name);
err = GetResponse(cmd, "unpublish_result", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("rc", buf, PMIU_MAXLINE);
if (strcmp(buf, "0") != 0) {
PMIU_getval("msg", buf, PMIU_MAXLINE);
PMIU_printf(PMI_debug, "unpublish failed; reason = %s\n", buf);
return PMI_FAIL;
}
}
} else {
PMIU_printf(1, "PMI_Unpublish_name called before init\n");
return PMI_FAIL;
}
return PMI_SUCCESS;
}
int PMI_Lookup_name(const char service_name[], char port[])
{
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
int err;
if (PMI_initialized > SINGLETON_INIT_BUT_NO_PM) {
MPL_snprintf(cmd, PMIU_MAXLINE, "cmd=lookup_name service=%s\n", service_name);
err = GetResponse(cmd, "lookup_result", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("rc", buf, PMIU_MAXLINE);
if (strcmp(buf, "0") != 0) {
PMIU_getval("msg", buf, PMIU_MAXLINE);
PMIU_printf(PMI_debug, "lookup failed; reason = %s\n", buf);
return PMI_FAIL;
}
PMIU_getval("port", port, MPI_MAX_PORT_NAME);
}
} else {
PMIU_printf(1, "PMI_Lookup_name called before init\n");
return PMI_FAIL;
}
return PMI_SUCCESS;
}
/************************** Process Creation functions **********************/
int PMI_Spawn_multiple(int count,
const char *cmds[],
const char **argvs[],
const int maxprocs[],
const int info_keyval_sizes[],
const PMI_keyval_t * info_keyval_vectors[],
int preput_keyval_size,
const PMI_keyval_t preput_keyval_vector[], int errors[])
{
int i, rc, argcnt, spawncnt, total_num_processes, num_errcodes_found;
char buf[PMIU_MAXLINE], tempbuf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
char *lead, *lag;
/* Connect to the PM if we haven't already */
if (PMIi_InitIfSingleton() != 0)
return PMI_FAIL;
total_num_processes = 0;
for (spawncnt = 0; spawncnt < count; spawncnt++) {
total_num_processes += maxprocs[spawncnt];
rc = MPL_snprintf(buf, PMIU_MAXLINE,
"mcmd=spawn\nnprocs=%d\nexecname=%s\n",
maxprocs[spawncnt], cmds[spawncnt]);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE,
"totspawns=%d\nspawnssofar=%d\n", count, spawncnt + 1);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
argcnt = 0;
if ((argvs != NULL) && (argvs[spawncnt] != NULL)) {
for (i = 0; argvs[spawncnt][i] != NULL; i++) {
/* FIXME (protocol design flaw): command line arguments
* may contain both = and <space> (and even tab!).
*/
/* Note that part of this fixme was really a design error -
* because this uses the mcmd form, the data can be
* sent in multiple writelines. This code now takes
* advantage of that. Note also that a correct parser
* of the commands will permit any character other than a
* new line in the argument, since the form is
* argn=<any nonnewline><newline> */
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "arg%d=%s\n", i + 1, argvs[spawncnt][i]);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
argcnt++;
rc = PMIU_writeline(PMI_fd, buf);
if (rc)
return PMI_FAIL;
buf[0] = 0;
}
}
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "argcnt=%d\n", argcnt);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "preput_num=%d\n", preput_keyval_size);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
for (i = 0; i < preput_keyval_size; i++) {
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "preput_key_%d=%s\n",
i, preput_keyval_vector[i].key);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "preput_val_%d=%s\n",
i, preput_keyval_vector[i].val);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
}
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "info_num=%d\n", info_keyval_sizes[spawncnt]);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
for (i = 0; i < info_keyval_sizes[spawncnt]; i++) {
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "info_key_%d=%s\n",
i, info_keyval_vectors[spawncnt][i].key);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
rc = MPL_snprintf(tempbuf, PMIU_MAXLINE, "info_val_%d=%s\n",
i, info_keyval_vectors[spawncnt][i].val);
if (rc < 0) {
return PMI_FAIL;
}
rc = MPL_strnapp(buf, tempbuf, PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
}
rc = MPL_strnapp(buf, "endcmd\n", PMIU_MAXLINE);
if (rc != 0) {
return PMI_FAIL;
}
rc = PMIU_writeline(PMI_fd, buf);
if (rc) {
return PMI_FAIL;
}
}
PMIU_readline(PMI_fd, buf, PMIU_MAXLINE);
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strncmp(cmd, "spawn_result", PMIU_MAXLINE) != 0) {
PMIU_printf(1, "got unexpected response to spawn :%s:\n", buf);
return PMI_FAIL;
} else {
PMIU_getval("rc", buf, PMIU_MAXLINE);
rc = atoi(buf);
if (rc != 0) {
/* PMIU_getval("status", tempbuf, PMIU_MAXLINE); */
/* PMIU_printf(1, "pmi_spawn_mult failed; status: %s\n",tempbuf); */
return PMI_FAIL;
}
}
PMIU_Assert(errors != NULL);
if (PMIU_getval("errcodes", tempbuf, PMIU_MAXLINE)) {
num_errcodes_found = 0;
lag = &tempbuf[0];
do {
lead = strchr(lag, ',');
if (lead)
*lead = '\0';
errors[num_errcodes_found++] = atoi(lag);
lag = lead + 1; /* move past the null char */
PMIU_Assert(num_errcodes_found <= total_num_processes);
} while (lead != NULL);
PMIU_Assert(num_errcodes_found == total_num_processes);
} else {
/* gforker doesn't return errcodes, so we'll just pretend that means
* that it was going to send all `0's. */
for (i = 0; i < total_num_processes; ++i) {
errors[i] = 0;
}
}
return PMI_SUCCESS;
}
/***************** Internal routines not part of PMI interface ***************/
/* to get all maxes in one message */
/* FIXME: This mixes init with get maxes */
static int PMII_getmaxes(int *kvsname_max, int *keylen_max, int *vallen_max)
{
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
int err, rc;
rc = MPL_snprintf(buf, PMIU_MAXLINE,
"cmd=init pmi_version=%d pmi_subversion=%d\n", PMI_VERSION, PMI_SUBVERSION);
if (rc < 0) {
return PMI_FAIL;
}
rc = PMIU_writeline(PMI_fd, buf);
if (rc != 0) {
PMIU_printf(1, "Unable to write to PMI_fd\n");
return PMI_FAIL;
}
buf[0] = 0; /* Ensure buffer is empty if read fails */
err = PMIU_readline(PMI_fd, buf, PMIU_MAXLINE);
if (err < 0) {
PMIU_printf(1, "Error reading initack on %d\n", PMI_fd);
perror("Error on readline:");
PMI_Abort(-1, "Above error when reading after init");
}
PMIU_parse_keyvals(buf);
cmd[0] = 0;
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strncmp(cmd, "response_to_init", PMIU_MAXLINE) != 0) {
char errmsg[PMIU_MAXLINE * 2 + 100];
MPL_snprintf(errmsg, sizeof(errmsg),
"got unexpected response to init :%s: (full line = %s)", cmd, buf);
PMI_Abort(-1, errmsg);
} else {
char buf1[PMIU_MAXLINE];
PMIU_getval("rc", buf, PMIU_MAXLINE);
if (strncmp(buf, "0", PMIU_MAXLINE) != 0) {
PMIU_getval("pmi_version", buf, PMIU_MAXLINE);
PMIU_getval("pmi_subversion", buf1, PMIU_MAXLINE);
char errmsg[PMIU_MAXLINE * 2 + 100];
MPL_snprintf(errmsg, sizeof(errmsg),
"pmi_version mismatch; client=%d.%d mgr=%s.%s",
PMI_VERSION, PMI_SUBVERSION, buf, buf1);
PMI_Abort(-1, errmsg);
}
}
err = GetResponse("cmd=get_maxes\n", "maxes", 0);
if (err == PMI_SUCCESS) {
PMIU_getval("kvsname_max", buf, PMIU_MAXLINE);
*kvsname_max = atoi(buf);
PMIU_getval("keylen_max", buf, PMIU_MAXLINE);
*keylen_max = atoi(buf);
PMIU_getval("vallen_max", buf, PMIU_MAXLINE);
*vallen_max = atoi(buf);
}
return err;
}
/* ----------------------------------------------------------------------- */
/*
* This function is used to request information from the server and check
* that the response uses the expected command name. On a successful
* return from this routine, additional PMIU_getval calls may be used
* to access information about the returned value.
*
* If checkRc is true, this routine also checks that the rc value returned
* was 0. If not, it uses the "msg" value to report on the reason for
* the failure.
*/
static int GetResponse(const char request[], const char expectedCmd[], int checkRc)
{
int err, n;
char *p;
char recvbuf[PMIU_MAXLINE];
char cmdName[PMIU_MAXLINE];
/* FIXME: This is an example of an incorrect fix - writeline can change
* the second argument in some cases, and that will break the const'ness
* of request. Instead, writeline should take a const item and return
* an error in the case in which it currently truncates the data. */
err = PMIU_writeline(PMI_fd, (char *) request);
if (err) {
return err;
}
n = PMIU_readline(PMI_fd, recvbuf, sizeof(recvbuf));
if (n <= 0) {
PMIU_printf(1, "readline failed\n");
return PMI_FAIL;
}
err = PMIU_parse_keyvals(recvbuf);
if (err) {
PMIU_printf(1, "parse_kevals failed %d\n", err);
return err;
}
p = PMIU_getval("cmd", cmdName, sizeof(cmdName));
if (!p) {
PMIU_printf(1, "getval cmd failed\n");
return PMI_FAIL;
}
if (strcmp(expectedCmd, cmdName) != 0) {
PMIU_printf(1, "expecting cmd=%s, got %s\n", expectedCmd, cmdName);
return PMI_FAIL;
}
if (checkRc) {
p = PMIU_getval("rc", cmdName, PMIU_MAXLINE);
if (p && strcmp(cmdName, "0") != 0) {
PMIU_getval("msg", cmdName, PMIU_MAXLINE);
PMIU_printf(1, "Command %s failed, reason='%s'\n", request, cmdName);
return PMI_FAIL;
}
}
return err;
}
/* ----------------------------------------------------------------------- */
#ifdef USE_PMI_PORT
/*
* This code allows a program to contact a host/port for the PMI socket.
*/
#include <errno.h>
#if defined(HAVE_SYS_TYPES_H)
#include <sys/types.h>
#endif
#include <sys/param.h>
#include <sys/socket.h>
/* sockaddr_in (Internet) */
#include <netinet/in.h>
/* TCP_NODELAY */
#include <netinet/tcp.h>
/* sockaddr_un (Unix) */
#include <sys/un.h>
/* defs of gethostbyname */
#include <netdb.h>
/* fcntl, F_GET/SETFL */
#include <fcntl.h>
/* This is really IP!? */
#ifndef TCP
#define TCP 0
#endif
/* stub for connecting to a specified host/port instead of using a
specified fd inherited from a parent process */
static int PMII_Connect_to_pm(char *hostname, int portnum)
{
MPL_sockaddr_t addr;
int ret;
int fd;
int optval = 1;
int q_wait = 1;
ret = MPL_get_sockaddr(hostname, &addr);
if (ret) {
PMIU_printf(1, "Unable to get host entry for %s\n", hostname);
return PMI_FAIL;
}
fd = MPL_socket();
if (fd < 0) {
PMIU_printf(1, "Unable to get AF_INET socket\n");
return PMI_FAIL;
}
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &optval, sizeof(optval))) {
perror("Error calling setsockopt:");
}
/* We wait here for the connection to succeed */
ret = MPL_connect(fd, &addr, portnum);
if (ret < 0) {
switch (errno) {
case ECONNREFUSED:
PMIU_printf(1, "connect failed with connection refused\n");
/* (close socket, get new socket, try again) */
if (q_wait)
close(fd);
return PMI_FAIL;
case EINPROGRESS: /* (nonblocking) - select for writing. */
break;
case EISCONN: /* (already connected) */
break;
case ETIMEDOUT: /* timed out */
PMIU_printf(1, "connect failed with timeout\n");
return PMI_FAIL;
default:
PMIU_printf(1, "connect failed with errno %d\n", errno);
return PMI_FAIL;
}
}
return fd;
}
static int PMII_Set_from_port(int fd, int id)
{
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
int err, rc;
/* We start by sending a startup message to the server */
if (PMI_debug) {
PMIU_printf(1, "Writing initack to destination fd %d\n", fd);
}
/* Handshake and initialize from a port */
rc = MPL_snprintf(buf, PMIU_MAXLINE, "cmd=initack pmiid=%d\n", id);
if (rc < 0) {
return PMI_FAIL;
}
PMIU_printf(PMI_debug, "writing on fd %d line :%s:\n", fd, buf);
err = PMIU_writeline(fd, buf);
if (err) {
PMIU_printf(1, "Error in writeline initack\n");
return PMI_FAIL;
}
/* cmd=initack */
buf[0] = 0;
PMIU_printf(PMI_debug, "reading initack\n");
err = PMIU_readline(fd, buf, PMIU_MAXLINE);
if (err < 0) {
PMIU_printf(1, "Error reading initack on %d\n", fd);
perror("Error on readline:");
return PMI_FAIL;
}
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strcmp(cmd, "initack")) {
PMIU_printf(1, "got unexpected input %s\n", buf);
return PMI_FAIL;
}
/* Read, in order, size, rank, and debug. Eventually, we'll want
* the handshake to include a version number */
/* size */
PMIU_printf(PMI_debug, "reading size\n");
err = PMIU_readline(fd, buf, PMIU_MAXLINE);
if (err < 0) {
PMIU_printf(1, "Error reading size on %d\n", fd);
perror("Error on readline:");
return PMI_FAIL;
}
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strcmp(cmd, "set")) {
PMIU_printf(1, "got unexpected command %s in %s\n", cmd, buf);
return PMI_FAIL;
}
/* cmd=set size=n */
PMIU_getval("size", cmd, PMIU_MAXLINE);
PMI_size = atoi(cmd);
/* rank */
PMIU_printf(PMI_debug, "reading rank\n");
err = PMIU_readline(fd, buf, PMIU_MAXLINE);
if (err < 0) {
PMIU_printf(1, "Error reading rank on %d\n", fd);
perror("Error on readline:");
return PMI_FAIL;
}
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strcmp(cmd, "set")) {
PMIU_printf(1, "got unexpected command %s in %s\n", cmd, buf);
return PMI_FAIL;
}
/* cmd=set rank=n */
PMIU_getval("rank", cmd, PMIU_MAXLINE);
PMI_rank = atoi(cmd);
PMIU_Set_rank(PMI_rank);
/* debug flag */
err = PMIU_readline(fd, buf, PMIU_MAXLINE);
if (err < 0) {
PMIU_printf(1, "Error reading debug on %d\n", fd);
return PMI_FAIL;
}
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strcmp(cmd, "set")) {
PMIU_printf(1, "got unexpected command %s in %s\n", cmd, buf);
return PMI_FAIL;
}
/* cmd=set debug=n */
PMIU_getval("debug", cmd, PMIU_MAXLINE);
PMI_debug = atoi(cmd);
if (PMI_debug) {
DBG_PRINTF(("end of handshake, rank = %d, size = %d\n", PMI_rank, PMI_size));
DBG_PRINTF(("Completed init\n"));
}
return PMI_SUCCESS;
}
/* ------------------------------------------------------------------------- */
/*
* Singleton Init.
*
* MPI-2 allows processes to become MPI processes and then make MPI calls,
* such as MPI_Comm_spawn, that require a process manager (this is different
* than the much simpler case of allowing MPI programs to run with an
* MPI_COMM_WORLD of size 1 without an mpiexec or process manager).
*
* The process starts when either the client or the process manager contacts
* the other. If the client starts, it sends a singinit command and
* waits for the server to respond with its own singinit command.
* If the server start, it send a singinit command and waits for the
* client to respond with its own singinit command
*
* client sends singinit with these required values
* pmi_version=<value of PMI_VERSION>
* pmi_subversion=<value of PMI_SUBVERSION>
*
* and these optional values
* stdio=[yes|no]
* authtype=[none|shared|<other-to-be-defined>]
* authstring=<string>
*
* server sends singinit with the same required and optional values as
* above.
*
* At this point, the protocol is now the same in both cases, and has the
* following components:
*
* server sends singinit_info with these required fields
* versionok=[yes|no]
* stdio=[yes|no]
* kvsname=<string>
*
* The client then issues the init command (see PMII_getmaxes)
*
* cmd=init pmi_version=<val> pmi_subversion=<val>
*
* and expects to receive a
*
* cmd=response_to_init rc=0 pmi_version=<val> pmi_subversion=<val>
*
* (This is the usual init sequence).
*
*/
/* ------------------------------------------------------------------------- */
/* This is a special routine used to re-initialize PMI when it is in
the singleton init case. That is, the executable was started without
mpiexec, and PMI_Init returned as if there was only one process.
Note that PMI routines should not call PMII_singinit; they should
call PMIi_InitIfSingleton(), which both connects to the process mangager
and sets up the initial KVS connection entry.
*/
static int PMII_singinit(void)
{
int pid, rc;
int singinit_listen_sock, stdin_sock, stdout_sock, stderr_sock;
const char *newargv[8];
char charpid[8], port_c[8];
unsigned short port;
/* Create a socket on which to allow an mpiexec to connect back to
* us */
singinit_listen_sock = MPL_socket();
if (singinit_listen_sock == -1) {
perror("PMII_singinit: socket creation failed");
return PMI_FAIL;
}
MPL_LISTEN_PUSH(0, 5);
rc = MPL_listen_anyport(singinit_listen_sock, &port);
MPL_LISTEN_POP;
if (rc) {
perror("PMII_singinit: listen failed");
return PMI_FAIL;
}
MPL_snprintf(port_c, sizeof(port_c), "%d", port);
PMIU_printf(PMI_debug_init, "Starting mpiexec with %s\n", port_c);
/* Launch the mpiexec process with the name of this port */
pid = fork();
if (pid < 0) {
perror("PMII_singinit: fork failed");
exit(-1);
} else if (pid == 0) {
newargv[0] = "mpiexec";
newargv[1] = "-pmi_args";
newargv[2] = port_c;
/* FIXME: Use a valid hostname */
newargv[3] = "default_interface"; /* default interface name, for now */
newargv[4] = "default_key"; /* default authentication key, for now */
MPL_snprintf(charpid, sizeof(charpid), "%d", getpid());
newargv[5] = charpid;
newargv[6] = NULL;
rc = execvp(newargv[0], (char **) newargv);
perror("PMII_singinit: execv failed");
PMIU_printf(1, " This singleton init program attempted to access some feature\n");
PMIU_printf(1,
" for which process manager support was required, e.g. spawn or universe_size.\n");
PMIU_printf(1, " But the necessary mpiexec is not in your path.\n");
return PMI_FAIL;
} else {
char buf[PMIU_MAXLINE], cmd[PMIU_MAXLINE];
char *p;
int connectStdio = 0;
/* Allow one connection back from the created mpiexec program */
PMI_fd = accept_one_connection(singinit_listen_sock);
if (PMI_fd < 0) {
PMIU_printf(1, "Failed to establish singleton init connection\n");
return PMI_FAIL;
}
/* Execute the singleton init protocol */
rc = PMIU_readline(PMI_fd, buf, PMIU_MAXLINE);
PMIU_printf(PMI_debug_init, "Singinit: read %s\n", buf);
PMIU_parse_keyvals(buf);
PMIU_getval("cmd", cmd, PMIU_MAXLINE);
if (strcmp(cmd, "singinit") != 0) {
PMIU_printf(1, "unexpected command from PM: %s\n", cmd);
return PMI_FAIL;
}
p = PMIU_getval("authtype", cmd, PMIU_MAXLINE);
if (p && strcmp(cmd, "none") != 0) {
PMIU_printf(1, "unsupported authentication method %s\n", cmd);
return PMI_FAIL;
}
/* p = PMIU_getval("authstring", cmd, PMIU_MAXLINE); */
/* If we're successful, send back our own singinit */
rc = MPL_snprintf(buf, PMIU_MAXLINE,
"cmd=singinit pmi_version=%d pmi_subversion=%d stdio=yes authtype=none\n",
PMI_VERSION, PMI_SUBVERSION);
if (rc < 0) {
return PMI_FAIL;
}
PMIU_printf(PMI_debug_init, "GetResponse with %s\n", buf);
rc = GetResponse(buf, "singinit_info", 0);
if (rc != 0) {
PMIU_printf(1, "GetResponse failed\n");
return PMI_FAIL;
}
p = PMIU_getval("versionok", cmd, PMIU_MAXLINE);
if (p && strcmp(cmd, "yes") != 0) {
PMIU_printf(1, "Process manager needs a different PMI version\n");
return PMI_FAIL;
}
p = PMIU_getval("stdio", cmd, PMIU_MAXLINE);
if (p && strcmp(cmd, "yes") == 0) {
PMIU_printf(PMI_debug_init, "PM agreed to connect stdio\n");
connectStdio = 1;
}
p = PMIU_getval("kvsname", singinit_kvsname, sizeof(singinit_kvsname));
PMIU_printf(PMI_debug_init, "kvsname to use is %s\n", singinit_kvsname);
if (connectStdio) {
PMIU_printf(PMI_debug_init, "Accepting three connections for stdin, out, err\n");
stdin_sock = accept_one_connection(singinit_listen_sock);
dup2(stdin_sock, 0);
stdout_sock = accept_one_connection(singinit_listen_sock);
dup2(stdout_sock, 1);
stderr_sock = accept_one_connection(singinit_listen_sock);
dup2(stderr_sock, 2);
}
PMIU_printf(PMI_debug_init, "Done with singinit handshake\n");
}
return PMI_SUCCESS;
}
/* Promote PMI to a fully initialized version if it was started as
a singleton init */
static int PMIi_InitIfSingleton(void)
{
int rc;
static int firstcall = 1;
if (PMI_initialized != SINGLETON_INIT_BUT_NO_PM || !firstcall)
return PMI_SUCCESS;
/* We only try to init as a singleton the first time */
firstcall = 0;
/* First, start (if necessary) an mpiexec, connect to it,
* and start the singleton init handshake */
rc = PMII_singinit();
if (rc < 0)
return PMI_FAIL;
PMI_initialized = SINGLETON_INIT_WITH_PM; /* do this right away */
PMI_size = 1;
PMI_rank = 0;
PMI_debug = 0;
PMI_spawned = 0;
PMII_getmaxes(&PMI_kvsname_max, &PMI_keylen_max, &PMI_vallen_max);
/* FIXME: We need to support a distinct kvsname for each
* process group */
PMI_KVS_Put(singinit_kvsname, cached_singinit_key, cached_singinit_val);
return PMI_SUCCESS;
}
static int accept_one_connection(int list_sock)
{
int gotit, new_sock;
MPL_sockaddr_t addr;
socklen_t len;
len = sizeof(addr);
gotit = 0;
while (!gotit) {
new_sock = accept(list_sock, (struct sockaddr *) &addr, &len);
if (new_sock == -1) {
if (errno == EINTR) /* interrupted? If so, try again */
continue;
else {
PMIU_printf(1, "accept failed in accept_one_connection\n");
exit(-1);
}
} else
gotit = 1;
}
return (new_sock);
}
#endif
/* end USE_PMI_PORT */
/* Get the FD to use for PMI operations. If a port is used, rather than
a pre-established FD (i.e., via pipe), this routine will handle the
initial handshake.
*/
static int getPMIFD(int *notset)
{
char *p;
/* Set the default */
PMI_fd = -1;
p = getenv("PMI_FD");
if (p) {
PMI_fd = atoi(p);
return PMI_SUCCESS;
}
#ifdef USE_PMI_PORT
p = getenv("PMI_PORT");
if (p) {
int portnum;
char hostname[MAXHOSTNAME + 1];
char *pn, *ph;
int id = 0;
/* Connect to the indicated port (in format hostname:portnumber)
* and get the fd for the socket */
/* Split p into host and port */
pn = p;
ph = hostname;
while (*pn && *pn != ':' && (ph - hostname) < MAXHOSTNAME) {
*ph++ = *pn++;
}
*ph = 0;
if (PMI_debug) {
DBG_PRINTF(("Connecting to %s\n", p));
}
if (*pn == ':') {
portnum = atoi(pn + 1);
/* FIXME: Check for valid integer after : */
/* This routine only gets the fd to use to talk to
* the process manager. The handshake below is used
* to setup the initial values */
PMI_fd = PMII_Connect_to_pm(hostname, portnum);
if (PMI_fd < 0) {
PMIU_printf(1, "Unable to connect to %s on %d\n", hostname, portnum);
return PMI_FAIL;
}
} else {
PMIU_printf(1, "unable to decode hostport from %s\n", p);
return PMI_FAIL;
}
/* We should first handshake to get size, rank, debug. */
p = getenv("PMI_ID");
if (p) {
id = atoi(p);
/* PMII_Set_from_port sets up the values that are delivered
* by enviroment variables when a separate port is not used */
PMII_Set_from_port(PMI_fd, id);
*notset = 0;
}
return PMI_SUCCESS;
}
#endif
/* Singleton init case - its ok to return success with no fd set */
return PMI_SUCCESS;
}