/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2007 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "pmi2compat.h"
#include "simple2pmi.h"
#include "simple_pmiutil.h"
#include "pmi2.h"
#include <stdio.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_STDLIB_H
#include <stdlib.h>
#endif
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#ifdef HAVE_STRINGS_H
#include <strings.h>
#endif
#if defined(HAVE_SYS_SOCKET_H)
#include <sys/socket.h>
#endif
#ifdef USE_PMI_PORT
#ifndef MAXHOSTNAME
#define MAXHOSTNAME 256
#endif
#endif
#define PMII_EXIT_CODE -1
#define PMI_VERSION 2
#define PMI_SUBVERSION 0
#define MAX_INT_STR_LEN 11 /* number of digits in MAX_UINT + 1 */
typedef enum { PMI2_UNINITIALIZED = 0,
SINGLETON_INIT_BUT_NO_PM = 1,
NORMAL_INIT_WITH_PM,
SINGLETON_INIT_WITH_PM } PMI2State;
static PMI2State PMI2_initialized = PMI2_UNINITIALIZED;
static int PMI2_debug = 0;
static int PMI2_fd = -1;
static int PMI2_size = 1;
static int PMI2_rank = 0;
static int PMI2_debug_init = 0; /* Set this to true to debug the init */
int PMI2_pmiverbose = 0; /* Set this to true to print PMI debugging info */
#ifdef MPICH_IS_THREADED
static MPID_Thread_mutex_t mutex;
static int blocked = FALSE;
static MPID_Thread_cond_t cond;
#endif
/* XXX DJG the "const"s on both of these functions and the Keyvalpair
* struct are wrong in the isCopy==TRUE case! */
/* init_kv_str -- fills in keyvalpair. val is required to be a
null-terminated string. isCopy is set to FALSE, so caller must
free key and val memory, if necessary.
*/
static void init_kv_str(PMI2_Keyvalpair *kv, const char key[], const char val[])
{
kv->key = key;
kv->value = val;
kv->valueLen = strlen(val);
kv->isCopy = FALSE;
}
/* same as init_kv_str, but strdup's the key and val first, and sets isCopy=TRUE */
static void init_kv_strdup(PMI2_Keyvalpair *kv, const char key[], const char val[])
{
/* XXX DJG could be slightly more efficient */
init_kv_str(kv, PMI2U_Strdup(key), PMI2U_Strdup(val));
kv->isCopy = TRUE;
}
/* same as init_kv_strdup, but converts val into a string first */
/* XXX DJG could be slightly more efficient */
static void init_kv_strdup_int(PMI2_Keyvalpair *kv, const char key[], int val)
{
char tmpbuf[32] = {0};
int rc = PMI2_SUCCESS;
rc = PMI2U_Snprintf(tmpbuf, sizeof(tmpbuf), "%d", val);
PMI2U_Assert(rc >= 0);
init_kv_strdup(kv, key, tmpbuf);
}
/* initializes the key with ("%s%d", key_prefix, suffix), uses a string value */
/* XXX DJG could be slightly more efficient */
static void init_kv_strdup_intsuffix(PMI2_Keyvalpair *kv, const char key_prefix[], int suffix, const char val[])
{
char tmpbuf[256/*XXX HACK*/] = {0};
int rc = PMI2_SUCCESS;
rc = PMI2U_Snprintf(tmpbuf, sizeof(tmpbuf), "%s%d", key_prefix, suffix);
PMI2U_Assert(rc >= 0);
init_kv_strdup(kv, tmpbuf, val);
}
static int getPMIFD(void);
static int PMIi_ReadCommandExp( int fd, PMI2_Command *cmd, const char *exp, int* rc, const char **errmsg );
static int PMIi_ReadCommand( int fd, PMI2_Command *cmd );
static int PMIi_WriteSimpleCommand( int fd, PMI2_Command *resp, const char cmd[], PMI2_Keyvalpair *pairs[], int npairs);
static int PMIi_WriteSimpleCommandStr( int fd, PMI2_Command *resp, const char cmd[], ...);
static int PMIi_InitIfSingleton(void);
static void freepairs(PMI2_Keyvalpair** pairs, int npairs);
static int getval(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, const char **value, int *vallen);
static int getvalint(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, int *val);
static int getvalptr(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, void *val);
static int getvalbool(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, int *val);
static int accept_one_connection(int list_sock);
static int GetResponse(const char request[], const char expectedCmd[], int checkRc);
static void dump_PMI2_Command(FILE *file, PMI2_Command *cmd);
static void dump_PMI2_Keyvalpair(FILE *file, PMI2_Keyvalpair *kv);
typedef struct pending_item
{
struct pending_item *next;
PMI2_Command *cmd;
} pending_item_t;
pending_item_t *pendingq_head = NULL;
pending_item_t *pendingq_tail = NULL;
static inline void ENQUEUE(PMI2_Command *cmd)
{
pending_item_t *pi = PMI2U_Malloc(sizeof(pending_item_t));
pi->next = NULL;
pi->cmd = cmd;
if (pendingq_head == NULL) {
pendingq_head = pendingq_tail = pi;
} else {
pendingq_tail->next = pi;
pendingq_tail = pi;
}
}
static inline int SEARCH_REMOVE(PMI2_Command *cmd)
{
pending_item_t *pi, *prev;
pi = pendingq_head;
if (pi->cmd == cmd) {
pendingq_head = pi->next;
if (pendingq_head == NULL)
pendingq_tail = NULL;
PMI2U_Free(pi);
return 1;
}
prev = pi;
pi = pi->next;
for ( ; pi ; pi = pi->next) {
if (pi->cmd == cmd) {
prev->next = pi->next;
if (prev->next == NULL)
pendingq_tail = prev;
PMI2U_Free(pi);
return 1;
}
}
return 0;
}
/* ------------------------------------------------------------------------- */
/* PMI API Routines */
/* ------------------------------------------------------------------------- */
int PMI2_Init(int *spawned, int *size, int *rank, int *appnum)
{
int pmi2_errno = PMI2_SUCCESS;
char *p;
char buf[PMI2U_MAXLINE], cmdline[PMI2U_MAXLINE];
char *jobid;
char *pmiid;
int ret;
MPID_Thread_mutex_create(&mutex, &ret);
PMI2U_Assert(!ret);
MPID_Thread_cond_create(&cond, &ret);
PMI2U_Assert(!ret);
/* FIXME: Why is setvbuf commented out? */
/* FIXME: What if the output should be fully buffered (directed to file)?
unbuffered (user explicitly set?) */
/* setvbuf(stdout,0,_IONBF,0); */
setbuf(stdout, NULL);
/* PMI2U_printf( 1, "PMI2_INIT\n" ); */
/* Get the value of PMI2_DEBUG from the environment if possible, since
we may have set it to help debug the setup process */
p = getenv( "PMI2_DEBUG" );
if (p)
PMI2_debug = atoi( p );
/* Get the fd for PMI commands; if none, we're a singleton */
pmi2_errno = getPMIFD();
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
if ( PMI2_fd == -1 ) {
/* Singleton init: Process not started with mpiexec,
so set size to 1, rank to 0 */
PMI2_size = 1;
PMI2_rank = 0;
*spawned = 0;
PMI2_initialized = SINGLETON_INIT_BUT_NO_PM;
goto fn_exit;
}
/* do initial PMI1 init */
ret = PMI2U_Snprintf( buf, PMI2U_MAXLINE, "cmd=init pmi_version=%d pmi_subversion=%d\n", PMI_VERSION, PMI_SUBVERSION );
PMI2U_ERR_CHKANDJUMP1(ret < 0, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "failed to generate init line");
ret = PMI2U_writeline(PMI2_fd, buf);
PMI2U_ERR_CHKANDJUMP(ret < 0, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_init_send");
ret = PMI2U_readline(PMI2_fd, buf, PMI2U_MAXLINE);
PMI2U_ERR_CHKANDJUMP1(ret < 0, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_initack", "**pmi2_initack %s", strerror(errno));
PMI2U_parse_keyvals( buf );
cmdline[0] = 0;
PMI2U_getval( "cmd", cmdline, PMI2U_MAXLINE );
PMI2U_ERR_CHKANDJUMP(strncmp( cmdline, "response_to_init", PMI2U_MAXLINE ) != 0, pmi2_errno, PMI2_ERR_OTHER, "**bad_cmd");
PMI2U_getval( "rc", buf, PMI2U_MAXLINE );
if ( strncmp( buf, "0", PMI2U_MAXLINE ) != 0 ) {
char buf1[PMI2U_MAXLINE];
PMI2U_getval( "pmi_version", buf, PMI2U_MAXLINE );
PMI2U_getval( "pmi_subversion", buf1, PMI2U_MAXLINE );
PMI2U_ERR_SETANDJUMP4(pmi2_errno, PMI2_ERR_OTHER, "**pmi2_version", "**pmi2_version %s %s %d %d", buf, buf1, PMI_VERSION, PMI_SUBVERSION);
}
/* do full PMI2 init */
{
PMI2_Keyvalpair pairs[3];
PMI2_Keyvalpair *pairs_p[] = { pairs, pairs+1, pairs+2 };
int npairs = 0;
int isThreaded = 0;
const char *errmsg;
int rc;
int found;
int version, subver;
const char *spawner_jobid;
int spawner_jobid_len;
PMI2_Command cmd = {0};
int debugged;
jobid = getenv("PMI_JOBID");
if (jobid) {
init_kv_str(&pairs[npairs], PMIJOBID_KEY, jobid);
++npairs;
}
pmiid = getenv("PMI_ID");
if (pmiid) {
init_kv_str(&pairs[npairs], PMIRANK_KEY, pmiid);
++npairs;
}
else {
pmiid = getenv("PMI_RANK");
if (pmiid) {
init_kv_str(&pairs[npairs], PMIRANK_KEY, pmiid);
++npairs;
}
}
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
isThreaded = 1;
}
MPIU_THREAD_CHECK_END;
#endif
init_kv_str(&pairs[npairs], THREADED_KEY, isThreaded ? "TRUE" : "FALSE");
++npairs;
pmi2_errno = PMIi_WriteSimpleCommand(PMI2_fd, 0, FULLINIT_CMD, pairs_p, npairs); /* don't pass in thread id for init */
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
/* Read auth-response */
/* Send auth-response-complete */
/* Read fullinit-response */
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, FULLINITRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_fullinit", "**pmi2_fullinit %s", errmsg ? errmsg : "unknown");
found = getvalint(cmd.pairs, cmd.nPairs, PMIVERSION_KEY, &version);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
found = getvalint(cmd.pairs, cmd.nPairs, PMISUBVER_KEY, &subver);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
found = getvalint(cmd.pairs, cmd.nPairs, RANK_KEY, rank);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
found = getvalint(cmd.pairs, cmd.nPairs, SIZE_KEY, size);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
found = getvalint(cmd.pairs, cmd.nPairs, APPNUM_KEY, appnum);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
found = getval(cmd.pairs, cmd.nPairs, SPAWNERJOBID_KEY, &spawner_jobid, &spawner_jobid_len);
PMI2U_ERR_CHKANDJUMP(found == -1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
if (found)
*spawned = TRUE;
else
*spawned = FALSE;
debugged = 0;
found = getvalbool(cmd.pairs, cmd.nPairs, DEBUGGED_KEY, &debugged);
PMI2U_ERR_CHKANDJUMP(found == -1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2_debug |= debugged;
found = getvalbool(cmd.pairs, cmd.nPairs, PMIVERBOSE_KEY, &PMI2_pmiverbose);
PMI2U_ERR_CHKANDJUMP(found == -1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
}
if ( ! PMI2_initialized )
PMI2_initialized = NORMAL_INIT_WITH_PM;
fn_exit:
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Finalize(void)
{
int pmi2_errno = PMI2_SUCCESS;
int rc;
const char *errmsg;
PMI2_Command cmd = {0};
if ( PMI2_initialized > SINGLETON_INIT_BUT_NO_PM) {
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, FINALIZE_CMD, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, FINALIZERESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_finalize", "**pmi2_finalize %s", errmsg ? errmsg : "unknown");
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
shutdown( PMI2_fd, SHUT_RDWR );
close( PMI2_fd );
}
fn_exit:
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Initialized(void)
{
/* Turn this into a logical value (1 or 0) . This allows us
to use PMI2_initialized to distinguish between initialized with
an PMI service (e.g., via mpiexec) and the singleton init,
which has no PMI service */
return PMI2_initialized != 0;
}
int PMI2_Abort( int flag, const char msg[] )
{
PMI2U_printf(1, "aborting job:\n%s\n", msg);
/* ignoring return code, because we're exiting anyway */
PMIi_WriteSimpleCommandStr(PMI2_fd, NULL, ABORT_CMD, ISWORLD_KEY, flag ? TRUE_VAL : FALSE_VAL, MSG_KEY, msg, NULL);
PMI2U_Exit(PMII_EXIT_CODE);
return PMI2_SUCCESS;
}
int PMI2_Job_Spawn(int count, const char * cmds[],
int argcs[], const char ** argvs[],
const int maxprocs[],
const int info_keyval_sizes[],
const struct MPID_Info *info_keyval_vectors[],
int preput_keyval_size,
const struct MPID_Info *preput_keyval_vector[],
char jobId[], int jobIdSize,
int errors[])
{
int i,rc,spawncnt,total_num_processes,num_errcodes_found;
int found;
const char *jid;
int jidlen;
char tempbuf[PMI2U_MAXLINE];
char *lead, *lag;
int spawn_rc;
const char *errmsg = NULL;
PMI2_Command resp_cmd = {0};
int pmi2_errno = 0;
PMI2_Keyvalpair **pairs_p = NULL;
int npairs = 0;
int total_pairs = 0;
/* Connect to the PM if we haven't already */
if (PMIi_InitIfSingleton() != 0) return -1;
total_num_processes = 0;
/* XXX DJG from Pavan's email:
cmd=spawn;thrid=string;ncmds=count;preputcount=n;ppkey0=name;ppval0=string;...;\
subcmd=spawn-exe1;maxprocs=n;argc=narg;argv0=name;\
argv1=name;...;infokeycount=n;infokey0=key;\
infoval0=string;...;\
(... one subcmd for each executable ...)
*/
/* FIXME overall need a better interface for building commands!
* Need to be able to append commands, and to easily accept integer
* valued arguments. Memory management should stay completely out
* of mind when writing a new PMI command impl like this! */
/* Calculate the total number of keyval pairs that we need.
*
* The command writing utility adds "cmd" and "thrid" fields for us,
* don't include them in our count. */
total_pairs = 2; /* ncmds,preputcount */
total_pairs += (3 * count); /* subcmd,maxprocs,argc */
total_pairs += (2 * preput_keyval_size); /* ppkeyN,ppvalN */
for (spawncnt = 0; spawncnt < count; ++spawncnt) {
total_pairs += argcs[spawncnt]; /* argvN */
if (info_keyval_sizes) {
total_pairs += 1; /* infokeycount */
total_pairs += 2 * info_keyval_sizes[spawncnt]; /* infokeyN,infovalN */
}
}
pairs_p = PMI2U_Malloc(total_pairs * sizeof(PMI2_Keyvalpair*));
/* individiually allocating instead of batch alloc b/c freepairs assumes it */
for (i = 0; i < total_pairs; ++i) {
/* FIXME we are somehow still leaking some of this memory */
pairs_p[i] = PMI2U_Malloc(sizeof(PMI2_Keyvalpair));
PMI2U_Assert(pairs_p[i]);
}
init_kv_strdup_int(pairs_p[npairs++], "ncmds", count);
init_kv_strdup_int(pairs_p[npairs++], "preputcount", preput_keyval_size);
for (i = 0; i < preput_keyval_size; ++i) {
init_kv_strdup_intsuffix(pairs_p[npairs++], "ppkey", i, preput_keyval_vector[i]->key);
init_kv_strdup_intsuffix(pairs_p[npairs++], "ppval", i, preput_keyval_vector[i]->value);
}
for (spawncnt = 0; spawncnt < count; ++spawncnt)
{
total_num_processes += maxprocs[spawncnt];
init_kv_strdup(pairs_p[npairs++], "subcmd", cmds[spawncnt]);
init_kv_strdup_int(pairs_p[npairs++], "maxprocs", maxprocs[spawncnt]);
init_kv_strdup_int(pairs_p[npairs++], "argc", argcs[spawncnt]);
for (i = 0; i < argcs[spawncnt]; ++i) {
init_kv_strdup_intsuffix(pairs_p[npairs++], "argv", i, argvs[spawncnt][i]);
}
if (info_keyval_sizes) {
init_kv_strdup_int(pairs_p[npairs++], "infokeycount", info_keyval_sizes[spawncnt]);
for (i = 0; i < info_keyval_sizes[spawncnt]; ++i) {
init_kv_strdup_intsuffix(pairs_p[npairs++], "infokey", i, info_keyval_vectors[spawncnt][i].key);
init_kv_strdup_intsuffix(pairs_p[npairs++], "infoval", i, info_keyval_vectors[spawncnt][i].value);
}
}
}
if (npairs < total_pairs) { printf_d("about to fail assertion, npairs=%d total_pairs=%d\n", npairs, total_pairs); }
PMI2U_Assert(npairs == total_pairs);
pmi2_errno = PMIi_WriteSimpleCommand(PMI2_fd, &resp_cmd, "spawn", pairs_p, npairs);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
freepairs(pairs_p, npairs);
pairs_p = NULL;
/* XXX DJG TODO release any upper level MPICH critical sections */
rc = PMIi_ReadCommandExp(PMI2_fd, &resp_cmd, "spawn-response", &spawn_rc, &errmsg);
if (rc != 0) { return PMI2_FAIL; }
/* XXX DJG TODO deal with the response */
PMI2U_Assert(errors != NULL);
if (jobId && jobIdSize) {
found = getval(resp_cmd.pairs, resp_cmd.nPairs, JOBID_KEY, &jid, &jidlen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_Strncpy(jobId, jid, jobIdSize);
}
if (PMI2U_getval( "errcodes", tempbuf, PMI2U_MAXLINE )) {
num_errcodes_found = 0;
lag = &tempbuf[0];
do {
lead = strchr(lag, ',');
if (lead) *lead = '\0';
errors[num_errcodes_found++] = atoi(lag);
lag = lead + 1; /* move past the null char */
PMI2U_Assert(num_errcodes_found <= total_num_processes);
} while (lead != NULL);
PMI2U_Assert(num_errcodes_found == total_num_processes);
}
else {
/* gforker doesn't return errcodes, so we'll just pretend that means
that it was going to send all `0's. */
for (i = 0; i < total_num_processes; ++i) {
errors[i] = 0;
}
}
fn_fail:
PMI2U_Free(resp_cmd.command);
freepairs(resp_cmd.pairs, resp_cmd.nPairs);
if (pairs_p) freepairs(pairs_p, npairs);
return pmi2_errno;
}
int PMI2_Job_GetId(char jobid[], int jobid_size)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
const char *jid;
int jidlen;
int rc;
const char *errmsg;
PMI2_Command cmd = {0};
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, JOBGETID_CMD, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, JOBGETIDRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_jobgetid", "**pmi2_jobgetid %s", errmsg ? errmsg : "unknown");
found = getval(cmd.pairs, cmd.nPairs, JOBID_KEY, &jid, &jidlen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_Strncpy(jobid, jid, jobid_size);
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME PMI2_Job_Connect
#undef FCNAME
#define FCNAME PMI2DI_QUOTE(FUNCNAME)
int PMI2_Job_Connect(const char jobid[], PMI2_Connect_comm_t *conn)
{
int pmi2_errno = PMI2_SUCCESS;
PMI2_Command cmd = {0};
int found;
int kvscopy;
int rc;
const char *errmsg;
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, JOBCONNECT_CMD, JOBID_KEY, jobid, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, JOBCONNECTRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_jobconnect", "**pmi2_jobconnect %s", errmsg ? errmsg : "unknown");
found = getvalbool(cmd.pairs, cmd.nPairs, KVSCOPY_KEY, &kvscopy);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_ERR_CHKANDJUMP(kvscopy, pmi2_errno, PMI2_ERR_OTHER, "**notimpl");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Job_Disconnect(const char jobid[])
{
int pmi2_errno = PMI2_SUCCESS;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, JOBDISCONNECT_CMD, JOBID_KEY, jobid, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, JOBDISCONNECTRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_jobdisconnect", "**pmi2_jobdisconnect %s", errmsg ? errmsg : "unknown");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_KVS_Put(const char key[], const char value[])
{
int pmi2_errno = PMI2_SUCCESS;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, KVSPUT_CMD, KEY_KEY, key, VALUE_KEY, value, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, KVSPUTRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_kvsput", "**pmi2_kvsput %s", errmsg ? errmsg : "unknown");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_KVS_Fence(void)
{
int pmi2_errno = PMI2_SUCCESS;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, KVSFENCE_CMD, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, KVSFENCERESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_kvsfence", "**pmi2_kvsfence %s", errmsg ? errmsg : "unknown");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_KVS_Get(const char *jobid, int src_pmi_id, const char key[], char value [], int maxValue, int *valLen)
{
int pmi2_errno = PMI2_SUCCESS;
int found, keyfound;
const char *kvsvalue;
int kvsvallen;
int ret;
PMI2_Command cmd = {0};
int rc;
char src_pmi_id_str[256];
const char *errmsg;
PMI2U_Snprintf(src_pmi_id_str, sizeof(src_pmi_id_str), "%d", src_pmi_id);
pmi2_errno = PMIi_InitIfSingleton();
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, KVSGET_CMD, JOBID_KEY, jobid, SRCID_KEY, src_pmi_id_str, KEY_KEY, key, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, KVSGETRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_kvsget", "**pmi2_kvsget %s", errmsg ? errmsg : "unknown");
found = getvalbool(cmd.pairs, cmd.nPairs, FOUND_KEY, &keyfound);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_ERR_CHKANDJUMP(!keyfound, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_kvsget_notfound");
found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &kvsvalue, &kvsvallen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
ret = PMI2U_Strncpy(value, kvsvalue, maxValue);
*valLen = ret ? -kvsvallen : kvsvallen;
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Info_GetNodeAttr(const char name[], char value[], int valuelen, int *flag, int waitfor)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
const char *kvsvalue;
int kvsvallen;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
pmi2_errno = PMIi_InitIfSingleton();
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, GETNODEATTR_CMD, KEY_KEY, name, WAIT_KEY, waitfor ? "TRUE" : "FALSE", NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, GETNODEATTRRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_getnodeattr", "**pmi2_getnodeattr %s", errmsg ? errmsg : "unknown");
found = getvalbool(cmd.pairs, cmd.nPairs, FOUND_KEY, flag);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
if (*flag) {
found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &kvsvalue, &kvsvallen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_Strncpy(value, kvsvalue, valuelen);
}
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Info_GetNodeAttrIntArray(const char name[], int array[], int arraylen, int *outlen, int *flag)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
const char *kvsvalue;
int kvsvallen;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
int i;
const char *valptr;
pmi2_errno = PMIi_InitIfSingleton();
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, GETNODEATTR_CMD, KEY_KEY, name, WAIT_KEY, "FALSE", NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, GETNODEATTRRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_getnodeattr", "**pmi2_getnodeattr %s", errmsg ? errmsg : "unknown");
found = getvalbool(cmd.pairs, cmd.nPairs, FOUND_KEY, flag);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
if (*flag) {
found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &kvsvalue, &kvsvallen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
valptr = kvsvalue;
i = 0;
rc = sscanf(valptr, "%d", &array[i]);
PMI2U_ERR_CHKANDJUMP1(rc != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "unable to parse intarray");
++i;
while ((valptr = strchr(valptr, ',')) && i < arraylen) {
++valptr; /* skip over the ',' */
rc = sscanf(valptr, "%d", &array[i]);
PMI2U_ERR_CHKANDJUMP1(rc != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "unable to parse intarray");
++i;
}
*outlen = i;
}
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Info_PutNodeAttr(const char name[], const char value[])
{
int pmi2_errno = PMI2_SUCCESS;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, PUTNODEATTR_CMD, KEY_KEY, name, VALUE_KEY, value, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, PUTNODEATTRRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_putnodeattr", "**pmi2_putnodeattr %s", errmsg ? errmsg : "unknown");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Info_GetJobAttr(const char name[], char value[], int valuelen, int *flag)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
const char *kvsvalue;
int kvsvallen;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
pmi2_errno = PMIi_InitIfSingleton();
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, GETJOBATTR_CMD, KEY_KEY, name, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, GETJOBATTRRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_getjobattr", "**pmi2_getjobattr %s", errmsg ? errmsg : "unknown");
found = getvalbool(cmd.pairs, cmd.nPairs, FOUND_KEY, flag);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
if (*flag) {
found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &kvsvalue, &kvsvallen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
PMI2U_Strncpy(value, kvsvalue, valuelen);
}
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Info_GetJobAttrIntArray(const char name[], int array[], int arraylen, int *outlen, int *flag)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
const char *kvsvalue;
int kvsvallen;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
int i;
const char *valptr;
pmi2_errno = PMIi_InitIfSingleton();
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, GETJOBATTR_CMD, KEY_KEY, name, NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, GETJOBATTRRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_getjobattr", "**pmi2_getjobattr %s", errmsg ? errmsg : "unknown");
found = getvalbool(cmd.pairs, cmd.nPairs, FOUND_KEY, flag);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
if (*flag) {
found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &kvsvalue, &kvsvallen);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
valptr = kvsvalue;
i = 0;
rc = sscanf(valptr, "%d", &array[i]);
PMI2U_ERR_CHKANDJUMP1(rc != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "unable to parse intarray");
++i;
while ((valptr = strchr(valptr, ',')) && i < arraylen) {
++valptr; /* skip over the ',' */
rc = sscanf(valptr, "%d", &array[i]);
PMI2U_ERR_CHKANDJUMP1(rc != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "unable to parse intarray");
++i;
}
*outlen = i;
}
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Nameserv_publish(const char service_name[], const PMI2U_Info *info_ptr, const char port[])
{
int pmi2_errno = PMI2_SUCCESS;
PMI2_Command cmd = {0};
int rc;
const char *errmsg;
/* ignoring infokey functionality for now */
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, NAMEPUBLISH_CMD,
NAME_KEY, service_name, PORT_KEY, port,
INFOKEYCOUNT_KEY, "0", NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, NAMEPUBLISHRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_nameservpublish", "**pmi2_nameservpublish %s", errmsg ? errmsg : "unknown");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Nameserv_lookup(const char service_name[], const PMI2U_Info *info_ptr,
char port[], int portLen)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
int rc;
PMI2_Command cmd = {0};
int plen;
const char *errmsg;
const char *found_port;
/* ignoring infos for now */
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, NAMELOOKUP_CMD,
NAME_KEY, service_name, INFOKEYCOUNT_KEY, "0", NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, NAMELOOKUPRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_nameservlookup", "**pmi2_nameservlookup %s", errmsg ? errmsg : "unknown");
found = getval(cmd.pairs, cmd.nPairs, VALUE_KEY, &found_port, &plen);
PMI2U_ERR_CHKANDJUMP1(!found, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_nameservlookup", "**pmi2_nameservlookup %s", "not found");
PMI2U_Strncpy(port, found_port, portLen);
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMI2_Nameserv_unpublish(const char service_name[],
const PMI2U_Info *info_ptr)
{
int pmi2_errno = PMI2_SUCCESS;
int found;
int rc;
PMI2_Command cmd = {0};
int plen;
const char *errmsg;
const char *found_port;
pmi2_errno = PMIi_WriteSimpleCommandStr(PMI2_fd, &cmd, NAMEUNPUBLISH_CMD,
NAME_KEY, service_name, INFOKEYCOUNT_KEY, "0", NULL);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = PMIi_ReadCommandExp(PMI2_fd, &cmd, NAMEUNPUBLISHRESP_CMD, &rc, &errmsg);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP1(rc, pmi2_errno, PMI2_ERR_OTHER, "**pmi2_nameservunpublish", "**pmi2_nameservunpublish %s", errmsg ? errmsg : "unknown");
fn_exit:
PMI2U_Free(cmd.command);
freepairs(cmd.pairs, cmd.nPairs);
return pmi2_errno;
fn_fail:
goto fn_exit;
}
/* ------------------------------------------------------------------------- */
/* Service routines */
/*
* PMIi_ReadCommand - Reads an entire command from the PMI socket. This
* routine blocks the thread until the command is read.
*
* PMIi_WriteSimpleCommand - Write a simple command to the PMI socket; this
* allows printf - style arguments. This blocks the thread until the buffer
* has been written (for fault-tolerance, we may want to keep it around
* in case of PMI failure).
*
* PMIi_WaitFor - Wait for a particular PMI command request to complete.
* In a multithreaded environment, this may
*/
/* ------------------------------------------------------------------------- */
/* frees all of the keyvals pointed to by a keyvalpair* array and the array iteself*/
static void freepairs(PMI2_Keyvalpair** pairs, int npairs)
{
int i;
if (!pairs)
return;
for (i = 0; i < npairs; ++i)
if (pairs[i]->isCopy) {
/* FIXME casts are here to suppress legitimate constness warnings */
PMI2U_Free((void *)pairs[i]->key);
PMI2U_Free((void *)pairs[i]->value);
PMI2U_Free(pairs[i]);
}
PMI2U_Free(pairs);
}
/* getval & friends -- these functions search the pairs list for a
* matching key, set val appropriately and return 1. If no matching
* key is found, 0 is returned. If the value is invalid, -1 is returned */
static int getval(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, const char **value, int *vallen)
{
int i;
for (i = 0; i < npairs; ++i)
if (strncmp(key, pairs[i]->key, PMI2_MAX_KEYLEN) == 0) {
*value = pairs[i]->value;
*vallen = pairs[i]->valueLen;
return 1;
}
return 0;
}
static int getvalint(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, int *val)
{
int found;
const char *value;
int vallen;
int ret;
/* char *endptr; */
found = getval(pairs, npairs, key, &value, &vallen);
if (found != 1)
return found;
if (vallen == 0)
return -1;
ret = sscanf(value, "%d", val);
if (ret != 1)
return -1;
/* *val = strtoll(value, &endptr, 0); */
/* if (endptr - value != vallen) */
/* return -1; */
return 1;
}
static int getvalptr(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, void *val)
{
int found;
const char *value;
int vallen;
int ret;
void **val_ = val;
/* char *endptr; */
found = getval(pairs, npairs, key, &value, &vallen);
if (found != 1)
return found;
if (vallen == 0)
return -1;
ret = sscanf(value, "%p", val_);
if (ret != 1)
return -1;
/* *val_ = (void *)(PMI2R_Upint)strtoll(value, &endptr, 0); */
/* if (endptr - value != vallen) */
/* return -1; */
return 1;
}
static int getvalbool(PMI2_Keyvalpair *const pairs[], int npairs, const char *key, int *val)
{
int found;
const char *value;
int vallen;
found = getval(pairs, npairs, key, &value, &vallen);
if (found != 1)
return found;
if (strlen("TRUE") == vallen && !strncmp(value, "TRUE", vallen))
*val = TRUE;
else if (strlen("FALSE") == vallen && !strncmp(value, "FALSE", vallen))
*val = FALSE;
else
return -1;
return 1;
}
/* parse_keyval(cmdptr, len, key, val, vallen)
Scans through buffer specified by cmdptr looking for the first key and value.
IN/OUT cmdptr - IN: pointer to buffer; OUT: pointer to byte after the ';' terminating the value
IN/OUT len - IN: length of buffer; OUT: length of buffer not read
OUT key - pointer to null-terminated string containing the key
OUT val - pointer to string containing the value
OUT vallen - length of the value string
This function will modify the buffer passed through cmdptr to
insert '\0' following the key, and to replace escaped ';;' with
';'.
*/
static int parse_keyval(char **cmdptr, int *len, char **key, char **val, int *vallen)
{
int pmi2_errno = PMI2_SUCCESS;
char *c = *cmdptr;
char *d;
/* find key */
*key = c; /* key is at the start of the buffer */
while (*len && *c != '=') {
--*len;
++c;
}
PMI2U_ERR_CHKANDJUMP(*len == 0, pmi2_errno, PMI2_ERR_OTHER, "**bad_keyval");
PMI2U_ERR_CHKANDJUMP(c - *key > PMI2_MAX_KEYLEN, pmi2_errno, PMI2_ERR_OTHER, "**bad_keyval");
*c = '\0'; /* terminate the key string */
/* skip over the '=' */
--*len;
++c;
/* find val */
*val = d = c; /* val is next */
while (*len) {
if (*c == ';') { /* handle escaped ';' */
if (*(c+1) != ';')
break;
else
{
--*len;
++c;
}
}
--*len;
*(d++) = *(c++);
}
PMI2U_ERR_CHKANDJUMP(*len == 0, pmi2_errno, PMI2_ERR_OTHER, "**bad_keyval");
PMI2U_ERR_CHKANDJUMP(d - *val > PMI2_MAX_VALLEN, pmi2_errno, PMI2_ERR_OTHER, "**bad_keyval");
*vallen = d - *val;
*cmdptr = c+1; /* skip over the ';' */
--*len;
fn_exit:
return pmi2_errno;
fn_fail:
goto fn_exit;
}
static int create_keyval(PMI2_Keyvalpair **kv, const char *key, const char *val, int vallen)
{
int pmi2_errno = PMI2_SUCCESS;
char *key_p;
char *value_p;
PMI2U_CHKPMEM_DECL(3);
PMI2U_CHKPMEM_MALLOC(*kv, PMI2_Keyvalpair *, sizeof(PMI2_Keyvalpair), pmi2_errno, "pair");
PMI2U_CHKPMEM_MALLOC(key_p, char *, strlen(key)+1, pmi2_errno, "key");
PMI2U_Strncpy(key_p, key, PMI2_MAX_KEYLEN+1);
PMI2U_CHKPMEM_MALLOC(value_p, char *, vallen+1, pmi2_errno, "value");
PMI2U_Memcpy(value_p, val, vallen);
value_p[vallen] = '\0';
(*kv)->key = key_p;
(*kv)->value = value_p;
(*kv)->valueLen = vallen;
(*kv)->isCopy = TRUE;
fn_exit:
PMI2U_CHKPMEM_COMMIT();
return pmi2_errno;
fn_fail:
PMI2U_CHKPMEM_REAP();
goto fn_exit;
}
/* Note that we fill in the fields in a command that is provided.
We may want to share these routines with the PMI version 2 server */
int PMIi_ReadCommand( int fd, PMI2_Command *cmd )
{
int pmi2_errno = PMI2_SUCCESS,err;
char cmd_len_str[PMII_COMMANDLEN_SIZE+1];
int cmd_len, remaining_len, vallen = 0;
char *c, *cmd_buf = NULL;
char *key, *val = NULL;
ssize_t nbytes;
ssize_t offset;
int num_pairs;
int pair_index;
char *command = NULL;
int nPairs;
int found;
PMI2_Keyvalpair **pairs = NULL;
PMI2_Command *target_cmd;
memset(cmd_len_str, 0, sizeof(cmd_len_str));
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
MPID_Thread_mutex_lock(&mutex, &err);
while (blocked && !cmd->complete)
MPID_Thread_cond_wait(&cond, &mutex);
if (cmd->complete) {
MPID_Thread_mutex_unlock(&mutex, &err);
goto fn_exit;
}
blocked = TRUE;
MPID_Thread_mutex_unlock(&mutex, &err);
}
MPIU_THREAD_CHECK_END;
#endif
do {
/* get length of cmd */
offset = 0;
do
{
do {
nbytes = read(fd, &cmd_len_str[offset], PMII_COMMANDLEN_SIZE - offset);
} while (nbytes == -1 && errno == EINTR);
PMI2U_ERR_CHKANDJUMP1(nbytes <= 0, pmi2_errno, PMI2_ERR_OTHER, "**read", "**read %s", strerror(errno));
offset += nbytes;
}
while (offset < PMII_COMMANDLEN_SIZE);
cmd_len = atoi(cmd_len_str);
cmd_buf = PMI2U_Malloc(cmd_len+1);
if (!cmd_buf) { PMI2U_CHKMEM_SETERR(pmi2_errno, cmd_len+1, "cmd_buf"); goto fn_exit; }
memset(cmd_buf, 0, cmd_len+1);
/* get command */
offset = 0;
do
{
do {
nbytes = read(fd, &cmd_buf[offset], cmd_len - offset);
} while (nbytes == -1 && errno == EINTR);
PMI2U_ERR_CHKANDJUMP1(nbytes <= 0, pmi2_errno, PMI2_ERR_OTHER, "**read", "**read %s", strerror(errno));
offset += nbytes;
}
while (offset < cmd_len);
printf_d("PMI received (cmdlen %d): %s\n", cmd_len, cmd_buf);
/* count number of "key=val;" */
c = cmd_buf;
remaining_len = cmd_len;
num_pairs = 0;
while (remaining_len) {
while (remaining_len && *c != ';') {
--remaining_len;
++c;
}
if (*c == ';' && *(c+1) == ';') {
remaining_len -= 2;
c += 2;
} else {
++num_pairs;
--remaining_len;
++c;
}
}
c = cmd_buf;
remaining_len = cmd_len;
pmi2_errno = parse_keyval(&c, &remaining_len, &key, &val, &vallen);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP(strncmp(key, "cmd", PMI2_MAX_KEYLEN) != 0, pmi2_errno, PMI2_ERR_OTHER, "**bad_cmd");
command = PMI2U_Malloc(vallen+1);
if (!command) { PMI2U_CHKMEM_SETERR(pmi2_errno, vallen+1, "command"); goto fn_exit; }
PMI2U_Memcpy(command, val, vallen);
val[vallen] = '\0';
nPairs = num_pairs-1; /* num_pairs-1 because the first pair is the command */
pairs = PMI2U_Malloc(sizeof(PMI2_Keyvalpair *) * nPairs);
if (!pairs) { PMI2U_CHKMEM_SETERR(pmi2_errno, sizeof(PMI2_Keyvalpair *) * nPairs, "pairs"); goto fn_exit; }
pair_index = 0;
while (remaining_len)
{
PMI2_Keyvalpair *pair;
pmi2_errno = parse_keyval(&c, &remaining_len, &key, &val, &vallen);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pmi2_errno = create_keyval(&pair, key, val, vallen);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
pairs[pair_index] = pair;
++pair_index;
}
found = getvalptr(pairs, nPairs, THRID_KEY, &target_cmd);
if (!found) /* if there's no thrid specified, assume it's for you */
target_cmd = cmd;
else
if (PMI2_debug && SEARCH_REMOVE(target_cmd) == 0) {
int i;
printf("command=%s\n", command);
for (i = 0; i < nPairs; ++i)
dump_PMI2_Keyvalpair(stdout, pairs[i]);
}
target_cmd->command = command;
target_cmd->nPairs = nPairs;
target_cmd->pairs = pairs;
#ifdef MPICH_IS_THREADED
target_cmd->complete = TRUE;
#endif
PMI2U_Free(cmd_buf);
} while (!cmd->complete);
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
MPID_Thread_mutex_lock(&mutex, &err);
blocked = FALSE;
MPID_Thread_cond_broadcast(&cond,&err);
MPID_Thread_mutex_unlock(&mutex, &err);
}
MPIU_THREAD_CHECK_END;
#endif
fn_exit:
return pmi2_errno;
fn_fail:
if (cmd_buf)
PMI2U_Free(cmd_buf);
goto fn_exit;
}
/* PMIi_ReadCommandExp -- reads a command checks that it matches the
* expected command string exp, and parses the return code */
int PMIi_ReadCommandExp( int fd, PMI2_Command *cmd, const char *exp, int* rc, const char **errmsg )
{
int pmi2_errno = PMI2_SUCCESS;
int found;
int msglen;
pmi2_errno = PMIi_ReadCommand(fd, cmd);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
PMI2U_ERR_CHKANDJUMP(strncmp(cmd->command, exp, strlen(exp)) != 0, pmi2_errno, PMI2_ERR_OTHER, "**bad_cmd");
found = getvalint(cmd->pairs, cmd->nPairs, RC_KEY, rc);
PMI2U_ERR_CHKANDJUMP(found != 1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
found = getval(cmd->pairs, cmd->nPairs, ERRMSG_KEY, errmsg, &msglen);
PMI2U_ERR_CHKANDJUMP(found == -1, pmi2_errno, PMI2_ERR_OTHER, "**intern");
if (!found)
*errmsg = NULL;
fn_exit:
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMIi_WriteSimpleCommand( int fd, PMI2_Command *resp, const char cmd[], PMI2_Keyvalpair *pairs[], int npairs)
{
int pmi2_errno = PMI2_SUCCESS,err;
char cmdbuf[PMII_MAX_COMMAND_LEN];
char cmdlenbuf[PMII_COMMANDLEN_SIZE+1];
char *c = cmdbuf;
int ret;
int remaining_len = PMII_MAX_COMMAND_LEN;
int cmdlen;
int i;
ssize_t nbytes;
ssize_t offset;
int pair_index;
/* leave space for length field */
memset(c, ' ', PMII_COMMANDLEN_SIZE);
c += PMII_COMMANDLEN_SIZE;
PMI2U_ERR_CHKANDJUMP(strlen(cmd) > PMI2_MAX_VALLEN, pmi2_errno, PMI2_ERR_OTHER, "**cmd_too_long");
ret = PMI2U_Snprintf(c, remaining_len, "cmd=%s;", cmd);
PMI2U_ERR_CHKANDJUMP1(ret >= remaining_len, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "Ran out of room for command");
c += ret;
remaining_len -= ret;
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
if (resp) {
ret = PMI2U_Snprintf(c, remaining_len, "thrid=%p;", resp);
PMI2U_ERR_CHKANDJUMP1(ret >= remaining_len, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "Ran out of room for command");
c += ret;
remaining_len -= ret;
}
MPIU_THREAD_CHECK_END;
#endif
for (pair_index = 0; pair_index < npairs; ++pair_index) {
/* write key= */
PMI2U_ERR_CHKANDJUMP(strlen(pairs[pair_index]->key) > PMI2_MAX_KEYLEN, pmi2_errno, PMI2_ERR_OTHER, "**key_too_long");
ret = PMI2U_Snprintf(c, remaining_len, "%s=", pairs[pair_index]->key);
PMI2U_ERR_CHKANDJUMP1(ret >= remaining_len, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "Ran out of room for command");
c += ret;
remaining_len -= ret;
/* write value and escape ;'s as ;; */
PMI2U_ERR_CHKANDJUMP(pairs[pair_index]->valueLen > PMI2_MAX_VALLEN, pmi2_errno, PMI2_ERR_OTHER, "**val_too_long");
for (i = 0; i < pairs[pair_index]->valueLen; ++i) {
if (pairs[pair_index]->value[i] == ';') {
*c = ';';
++c;
--remaining_len;
}
*c = pairs[pair_index]->value[i];
++c;
--remaining_len;
}
/* append ; */
*c = ';';
++c;
--remaining_len;
}
/* prepend the buffer length stripping off the trailing '\0' */
cmdlen = PMII_MAX_COMMAND_LEN - remaining_len;
ret = PMI2U_Snprintf(cmdlenbuf, sizeof(cmdlenbuf), "%d", cmdlen);
PMI2U_ERR_CHKANDJUMP1(ret >= PMII_COMMANDLEN_SIZE, pmi2_errno, PMI2_ERR_OTHER, "**intern", "**intern %s", "Command length won't fit in length buffer");
PMI2U_Memcpy(cmdbuf, cmdlenbuf, ret);
cmdbuf[cmdlen+PMII_COMMANDLEN_SIZE] = '\0'; /* silence valgrind warnings in printf_d */
printf_d("PMI sending: %s\n", cmdbuf);
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
MPID_Thread_mutex_lock(&mutex, &err);
while (blocked)
MPID_Thread_cond_wait(&cond, &mutex, &err);
blocked = TRUE;
MPID_Thread_mutex_unlock(&mutex, &err);
}
MPIU_THREAD_CHECK_END;
#endif
if (PMI2_debug)
ENQUEUE(resp);
offset = 0;
do {
do {
nbytes = write(fd, &cmdbuf[offset], cmdlen + PMII_COMMANDLEN_SIZE - offset);
} while (nbytes == -1 && errno == EINTR);
PMI2U_ERR_CHKANDJUMP1(nbytes <= 0, pmi2_errno, PMI2_ERR_OTHER, "**write", "**write %s", strerror(errno));
offset += nbytes;
} while (offset < cmdlen + PMII_COMMANDLEN_SIZE);
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
MPID_Thread_mutex_lock(&mutex, &err);
blocked = FALSE;
MPID_Thread_cond_broadcast(&cond,&err);
MPID_Thread_mutex_unlock(&mutex, &err);
}
MPIU_THREAD_CHECK_END;
#endif
fn_exit:
return pmi2_errno;
fn_fail:
goto fn_exit;
}
int PMIi_WriteSimpleCommandStr(int fd, PMI2_Command *resp, const char cmd[], ...)
{
int pmi2_errno = PMI2_SUCCESS;
va_list ap;
PMI2_Keyvalpair *pairs;
PMI2_Keyvalpair **pairs_p;
int npairs;
int i;
const char *key;
const char *val;
PMI2U_CHKLMEM_DECL(2);
npairs = 0;
va_start(ap, cmd);
while ((key = va_arg(ap, const char *))) {
val = va_arg(ap, const char *);
++npairs;
}
va_end(ap);
PMI2U_CHKLMEM_MALLOC(pairs, PMI2_Keyvalpair *, sizeof(PMI2_Keyvalpair) * npairs, pmi2_errno, "pairs");
PMI2U_CHKLMEM_MALLOC(pairs_p, PMI2_Keyvalpair **, sizeof(PMI2_Keyvalpair *) * npairs, pmi2_errno, "pairs_p");
i = 0;
va_start(ap, cmd);
while ((key = va_arg(ap, const char *))) {
val = va_arg(ap, const char *);
pairs_p[i] = &pairs[i];
pairs[i].key = key;
pairs[i].value = val;
pairs[i].valueLen = strlen(val);
pairs[i].isCopy = FALSE;
++i;
}
va_end(ap);
pmi2_errno = PMIi_WriteSimpleCommand(fd, resp, cmd, pairs_p, npairs);
if (pmi2_errno) PMI2U_ERR_POP(pmi2_errno);
fn_exit:
PMI2U_CHKLMEM_FREEALL();
return pmi2_errno;
fn_fail:
goto fn_exit;
}
/*
* This code allows a program to contact a host/port for the PMI socket.
*/
#include <errno.h>
#if defined(HAVE_SYS_TYPES_H)
#include <sys/types.h>
#endif
#include <sys/param.h>
#include <sys/socket.h>
/* sockaddr_in (Internet) */
#include <netinet/in.h>
/* TCP_NODELAY */
#include <netinet/tcp.h>
/* sockaddr_un (Unix) */
#include <sys/un.h>
/* defs of gethostbyname */
#include <netdb.h>
/* fcntl, F_GET/SETFL */
#include <fcntl.h>
/* This is really IP!? */
#ifndef TCP
#define TCP 0
#endif
/* stub for connecting to a specified host/port instead of using a
specified fd inherited from a parent process */
static int PMII_Connect_to_pm( char *hostname, int portnum )
{
struct hostent *hp;
struct sockaddr_in sa;
int fd;
int optval = 1;
int q_wait = 1;
hp = gethostbyname( hostname );
if (!hp) {
PMI2U_printf( 1, "Unable to get host entry for %s\n", hostname );
return -1;
}
memset( (void *)&sa, 0, sizeof(sa) );
/* POSIX might define h_addr_list only and node define h_addr */
#ifdef HAVE_H_ADDR_LIST
PMI2U_Memcpy( (void *)&sa.sin_addr, (void *)hp->h_addr_list[0], hp->h_length);
#else
PMI2U_Memcpy( (void *)&sa.sin_addr, (void *)hp->h_addr, hp->h_length);
#endif
sa.sin_family = hp->h_addrtype;
sa.sin_port = htons( (unsigned short) portnum );
fd = socket( AF_INET, SOCK_STREAM, TCP );
if (fd < 0) {
PMI2U_printf( 1, "Unable to get AF_INET socket\n" );
return -1;
}
if (setsockopt( fd, IPPROTO_TCP, TCP_NODELAY,
(char *)&optval, sizeof(optval) )) {
perror( "Error calling setsockopt:" );
}
/* We wait here for the connection to succeed */
if (connect( fd, (struct sockaddr *)&sa, sizeof(sa) ) < 0) {
switch (errno) {
case ECONNREFUSED:
PMI2U_printf( 1, "connect failed with connection refused\n" );
/* (close socket, get new socket, try again) */
if (q_wait)
close(fd);
return -1;
case EINPROGRESS: /* (nonblocking) - select for writing. */
break;
case EISCONN: /* (already connected) */
break;
case ETIMEDOUT: /* timed out */
PMI2U_printf( 1, "connect failed with timeout\n" );
return -1;
default:
PMI2U_printf( 1, "connect failed with errno %d\n", errno );
return -1;
}
}
return fd;
}
/* ------------------------------------------------------------------------- */
/*
* Singleton Init.
*
* MPI-2 allows processes to become MPI processes and then make MPI calls,
* such as MPI_Comm_spawn, that require a process manager (this is different
* than the much simpler case of allowing MPI programs to run with an
* MPI_COMM_WORLD of size 1 without an mpiexec or process manager).
*
* The process starts when either the client or the process manager contacts
* the other. If the client starts, it sends a singinit command and
* waits for the server to respond with its own singinit command.
* If the server start, it send a singinit command and waits for the
* client to respond with its own singinit command
*
* client sends singinit with these required values
* pmi_version=<value of PMI_VERSION>
* pmi_subversion=<value of PMI_SUBVERSION>
*
* and these optional values
* stdio=[yes|no]
* authtype=[none|shared|<other-to-be-defined>]
* authstring=<string>
*
* server sends singinit with the same required and optional values as
* above.
*
* At this point, the protocol is now the same in both cases, and has the
* following components:
*
* server sends singinit_info with these required fields
* versionok=[yes|no]
* stdio=[yes|no]
* kvsname=<string>
*
* The client then issues the init command (see PMII_getmaxes)
*
* cmd=init pmi_version=<val> pmi_subversion=<val>
*
* and expects to receive a
*
* cmd=response_to_init rc=0 pmi_version=<val> pmi_subversion=<val>
*
* (This is the usual init sequence).
*
*/
/* ------------------------------------------------------------------------- */
/* This is a special routine used to re-initialize PMI when it is in
the singleton init case. That is, the executable was started without
mpiexec, and PMI2_Init returned as if there was only one process.
Note that PMI routines should not call PMII_singinit; they should
call PMIi_InitIfSingleton(), which both connects to the process mangager
and sets up the initial KVS connection entry.
*/
static int PMII_singinit(void)
{
return 0;
}
/* Promote PMI to a fully initialized version if it was started as
a singleton init */
static int PMIi_InitIfSingleton(void)
{
return 0;
}
static int accept_one_connection(int list_sock)
{
int gotit, new_sock;
struct sockaddr_in from;
socklen_t len;
len = sizeof(from);
gotit = 0;
while ( ! gotit )
{
new_sock = accept(list_sock, (struct sockaddr *)&from, &len);
if (new_sock == -1)
{
if (errno == EINTR) /* interrupted? If so, try again */
continue;
else
{
PMI2U_printf(1, "accept failed in accept_one_connection\n");
exit(-1);
}
}
else
gotit = 1;
}
return(new_sock);
}
/* Get the FD to use for PMI operations. If a port is used, rather than
a pre-established FD (i.e., via pipe), this routine will handle the
initial handshake.
*/
static int getPMIFD(void)
{
int pmi2_errno = PMI2_SUCCESS;
char *p;
/* Set the default */
PMI2_fd = -1;
p = getenv("PMI_FD");
if (p) {
PMI2_fd = atoi(p);
goto fn_exit;
}
p = getenv( "PMI_PORT" );
if (p) {
int portnum;
char hostname[MAXHOSTNAME+1];
char *pn, *ph;
/* Connect to the indicated port (in format hostname:portnumber)
and get the fd for the socket */
/* Split p into host and port */
pn = p;
ph = hostname;
while (*pn && *pn != ':' && (ph - hostname) < MAXHOSTNAME) {
*ph++ = *pn++;
}
*ph = 0;
PMI2U_ERR_CHKANDJUMP1(*pn != ':', pmi2_errno, PMI2_ERR_OTHER, "**pmi2_port", "**pmi2_port %s", p);
portnum = atoi( pn+1 );
/* FIXME: Check for valid integer after : */
/* This routine only gets the fd to use to talk to
the process manager. The handshake below is used
to setup the initial values */
PMI2_fd = PMII_Connect_to_pm( hostname, portnum );
PMI2U_ERR_CHKANDJUMP2(PMI2_fd < 0, pmi2_errno, PMI2_ERR_OTHER, "**connect_to_pm", "**connect_to_pm %s %d", hostname, portnum);
}
/* OK to return success for singleton init */
fn_exit:
return pmi2_errno;
fn_fail:
goto fn_exit;
}
/* ----------------------------------------------------------------------- */
/*
* This function is used to request information from the server and check
* that the response uses the expected command name. On a successful
* return from this routine, additional PMI2U_getval calls may be used
* to access information about the returned value.
*
* If checkRc is true, this routine also checks that the rc value returned
* was 0. If not, it uses the "msg" value to report on the reason for
* the failure.
*/
static int GetResponse( const char request[], const char expectedCmd[],
int checkRc )
{
int err = 0;
return err;
}
static void dump_PMI2_Keyvalpair(FILE *file, PMI2_Keyvalpair *kv)
{
fprintf(file, " key = %s\n", kv->key);
fprintf(file, " value = %s\n", kv->value);
fprintf(file, " valueLen = %d\n", kv->valueLen);
fprintf(file, " isCopy = %s\n", kv->isCopy ? "TRUE" : "FALSE");
}
static void dump_PMI2_Command(FILE *file, PMI2_Command *cmd)
{
int i;
fprintf(file, "cmd = %s\n", cmd->command);
fprintf(file, "nPairs = %d\n", cmd->nPairs);
for (i = 0; i < cmd->nPairs; ++i)
dump_PMI2_Keyvalpair(file, cmd->pairs[i]);
}