/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2004 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
/*
* The routines in this file provide an event-driven I/O handler
*
* Each active fd has a handler associated with it.
*/
/* FIXME: Occasionally, data from stdout has been lost when the job is
exiting. I don't know whether data is being lost because the writer
is discarding it or the reader (mpiexec) is failing to finish reading from
all of the sockets before exiting.
*/
#include "mpichconf.h"
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_TIME_H
#include <time.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif
#include "pmutil.h"
#include "ioloop.h"
/*
* To simplify mapping fds back to their handlers, we store the handles
* in an array such that the ith element of the array corresponds to the
* fd with value i (e.g., for fd == 10, the [10] element of the array
* has the information on the handler). This isn't terrifically scalable,
* but it makes this code fairly simple and this code isn't
* performance sensitive. "maxFD" is the maximum fd value seen; this
* allows us to allocate a large array but usually only look at a small
* part of it.
*/
/* Largest fd value that can be registered.  The table below has MAXFD + 1
 * slots so that index MAXFD itself is valid. */
#define MAXFD 4096
/* Handler table indexed directly by fd value */
static IOHandle handlesByFD[MAXFD + 1];
/* Upper bound on the table indices that are scanned; starts at -1, meaning
 * that nothing has been registered yet */
static int maxFD = -1;
/*@
  MPIE_IORegister - Register a handler for an FD

  Input Parameters:
+ fd - File descriptor to register.  Must be non-negative, no larger than
  'MAXFD', and small enough to be stored in an 'fd_set'.
. rdwr - Bitmask tested against 'IO_READ' and 'IO_WRITE' by 'MPIE_IOLoop' to
  decide which select sets the fd is added to.
. handler - Routine that 'MPIE_IOLoop' invokes as
  'handler(fd, IO_READ or IO_WRITE, extra_data)' when the fd is ready.
- extra_data - Pointer handed back to 'handler' unchanged.

  Return Value:
  Zero on success; 1 if 'fd' is outside the range that can be registered.

  Notes:
  Keeps track of the largest fd seen (in 'maxFD').  Registering an fd that
  is already registered overwrites its previous entry.
@*/
int MPIE_IORegister(int fd, int rdwr, int (*handler) (int, int, void *), void *extra_data)
{
    int i;

    if (fd < 0 || fd > MAXFD) {
        /* Error; fd cannot be used as an index into handlesByFD */
        return 1;
    }
#ifdef FD_SETSIZE
    if (fd >= FD_SETSIZE) {
        /* Error; MPIE_IOLoop passes registered fds to FD_SET, whose
         * behavior is undefined for values at or beyond FD_SETSIZE */
        return 1;
    }
#endif

    /* Remember the largest set FD, and clear any FDs between this
     * fd and the last maximum so that stale entries are never scanned */
    if (fd > maxFD) {
        for (i = maxFD + 1; i < fd; i++) {
            handlesByFD[i].fd = -1;
            handlesByFD[i].handler = 0;
        }
        maxFD = fd;
    }

    handlesByFD[fd].fd = fd;
    handlesByFD[fd].rdwr = rdwr;
    handlesByFD[fd].handler = handler;
    handlesByFD[fd].extra_data = extra_data;
    return 0;
}
/*@
  MPIE_IODeregister - Remove a handler for an FD

  Input Parameters:
. fd - fd to deregister

  Return Value:
  Zero on success; 1 if 'fd' is negative, larger than 'MAXFD', or larger
  than the largest fd currently tracked in 'maxFD'.

  Notes:
  The fd itself is not closed; only its entry in the handler table is
  cleared.  'maxFD' is lowered when the highest registered fd is removed.
@*/
int MPIE_IODeregister(int fd)
{
    int newMaxFd;

    if (fd < 0 || fd > MAXFD) {
        /* Error; fd cannot be used as an index into handlesByFD */
        return 1;
    }
    if (fd > maxFD) {
        /* Error; fd is unknown */
        return 1;
    }

    /* Clear the entry *before* recomputing the maximum; otherwise the fd
     * being removed is still counted and maxFD can never shrink */
    handlesByFD[fd].fd = -1;
    handlesByFD[fd].rdwr = 0;
    handlesByFD[fd].handler = 0;
    handlesByFD[fd].extra_data = 0;

    /* Recompute the new maxfd: the highest index that still holds an fd,
     * or -1 if none is left */
    newMaxFd = maxFD;
    while (newMaxFd >= 0 && handlesByFD[newMaxFd].fd < 0) {
        newMaxFd--;
    }
    maxFD = newMaxFd;

    return 0;
}
/*@
  MPIE_IOLoop - Handle all registered I/O

  Input Parameters:
. timeoutSeconds - Seconds until this routine should return with a
  timeout error.  If negative, no timeout.  If 0, return immediately
  after a nonblocking check for I/O.

  Return Value:
  Returns zero on success.  Returns 'IOLOOP_TIMEOUT' if the timeout
  is reached and 'IOLOOP_ERROR' on other errors.

  Notes:
  The loop ends, returning zero, once no registered handler is interested
  in reading or writing.  A handler that returns 1 is treated as having
  reached EOF: its fd is closed and its interest mask is cleared, but the
  table entry itself is left in place.

  NOTE(review): the code differs from the description above in two places,
  both left unchanged here and to be confirmed against the callers.
  (1) 'TimeoutInit' only arms a deadline for positive values, so a timeout
  of 0 is handled like a negative one (select is given the 1000000 second
  value from 'TimeoutGetRemaining') instead of a nonblocking check.
  (2) When select fails with an error other than EINTR, the error is printed
  and the loop exits returning zero; 'IOLOOP_ERROR' is not returned.
@*/
int MPIE_IOLoop(int timeoutSeconds)
{
    int i, maxfd, fd, nfds, rc, rc2;
    fd_set readfds, writefds;
    int (*handler) (int, int, void *);
    struct timeval tv;

    /* Loop on the fds, with the timeout */
    TimeoutInit(timeoutSeconds);
    while (1) {
        tv.tv_sec = TimeoutGetRemaining();
        tv.tv_usec = 0;

        /* Determine the active FDs */
        FD_ZERO(&readfds);
        FD_ZERO(&writefds);
        /* maxfd is the maximum active fd */
        maxfd = -1;
        for (i = 0; i <= maxFD; i++) {
            if (handlesByFD[i].handler) {
                fd = handlesByFD[i].fd;
                if (handlesByFD[i].rdwr & IO_READ) {
                    FD_SET(fd, &readfds);
                    maxfd = i;
                }
                if (handlesByFD[i].rdwr & IO_WRITE) {
                    FD_SET(fd, &writefds);
                    maxfd = i;
                }
            }
        }
        /* Nothing left to wait for */
        if (maxfd < 0)
            break;

        /* DBG_PRINTF(("Calling select with readfds = %x writefds = %x\n", */
        /*             *(int *)&readfds, *(int*)&writefds)); */
        MPIE_SYSCALL(nfds, select, (maxfd + 1, &readfds, &writefds, 0, &tv));
        if (nfds < 0 && (errno == EINTR || errno == 0)) {
            /* Continuing through EINTR */
            /* We allow errno == 0 as a synonym for EINTR.  We've seen this
             * on Solaris; in addition, we set errno to 0 after a failed
             * waitpid in the process routines, and if the OS isn't careful,
             * the value of errno may get ECHILD instead of EINTR when the
             * signal handler returns (we suspect Linux of this problem),
             * which is why we have the signal handler in process.c reset
             * errno to 0 (we may need to allow ECHILD here (!)) */
            /* FIXME: an EINTR may also mean that a process has exited
             * (SIGCHILD).  If all processes have exited, we may want to
             * exit */
            DBG_PRINTF(("errno = EINTR in select\n"));
            continue;
        }
        if (nfds < 0) {
            /* Serious error */
            MPL_internal_sys_error_printf("select", errno, 0);
            break;
        }
        if (nfds == 0) {
            /* Timeout from select */
            DBG_PRINTF(("Timeout in select\n"));
            return IOLOOP_TIMEOUT;
        }

        /* nfds > 0 */
        DBG_PRINTF(("Found some fds to process (n = %d)\n", nfds));
        for (fd = 0; fd <= maxfd; fd++) {
            if (FD_ISSET(fd, &writefds)) {
                /* Re-read the handler: an earlier callback in this pass may
                 * have deregistered this fd */
                handler = handlesByFD[fd].handler;
                if (handler) {
                    rc = (*handler) (fd, IO_WRITE, handlesByFD[fd].extra_data);
                    /* Only act on a result produced for *this* fd; testing a
                     * stale rc could close an unrelated descriptor */
                    if (rc == 1) {
                        /* EOF? */
                        MPIE_SYSCALL(rc2, close, (fd));
                        handlesByFD[fd].rdwr = 0;
                        FD_CLR(fd, &writefds);
                        /* The fd is closed now, so do not also dispatch a
                         * pending read on it below */
                        FD_CLR(fd, &readfds);
                    }
                }
            }
            if (FD_ISSET(fd, &readfds)) {
                handler = handlesByFD[fd].handler;
                if (handler) {
                    rc = (*handler) (fd, IO_READ, handlesByFD[fd].extra_data);
                    if (rc == 1) {
                        /* EOF? */
                        MPIE_SYSCALL(rc2, close, (fd));
                        handlesByFD[fd].rdwr = 0;
                        FD_CLR(fd, &readfds);
                    }
                }
            }
        }
    }
    DBG_PRINTF(("Returning from IOLOOP handler\n"));
    return 0;
}
/*
 * Timeout support.  TimeoutInit() records an absolute deadline and
 * TimeoutGetRemaining() reports how many whole seconds remain before it.
 * The deadline is kept as a time_t, with a separate flag saying whether it
 * is in effect, so that the clock value is never narrowed to an int.
 */
static int timeout_armed = 0;   /* Nonzero while a deadline is in effect */
static time_t end_time = 0;     /* Time of timeout in seconds; only
                                 * meaningful while timeout_armed is set */

/* Return the current time in whole seconds */
static time_t TimeoutCurrentSeconds(void)
{
#if defined(HAVE_GETTIMEOFDAY) && !defined(HAVE_TIME)
    struct timeval tp;

    gettimeofday(&tp, NULL);
    return tp.tv_sec;
#else
    /* time() is preferred when available, as before.  It is part of ISO C,
     * so it also serves as the fallback when neither HAVE_TIME nor
     * HAVE_GETTIMEOFDAY is defined */
    return time(NULL);
#endif
}

/*
 * Start (or cancel) the timeout.
 *
 * seconds - Length of the timeout in seconds.  A positive value sets the
 *           deadline to the current time plus 'seconds'; zero or a negative
 *           value cancels any deadline.
 */
void TimeoutInit(int seconds)
{
    if (seconds > 0) {
        end_time = TimeoutCurrentSeconds() + (time_t) seconds;
        timeout_armed = 1;
    } else {
        timeout_armed = 0;
    }
}

/*
 * Return remaining time in seconds.
 *
 * Returns 1000000 (a large, positive number) when no deadline is in effect,
 * 0 once the deadline has been reached or passed, and otherwise the number
 * of whole seconds left.
 */
int TimeoutGetRemaining(void)
{
    time_t now;

    if (!timeout_armed) {
        /* Return a large, positive number */
        return 1000000;
    }

    now = TimeoutCurrentSeconds();
    if (now >= end_time) {
        return 0;
    }
    return (int) (end_time - now);
}