/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2004 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

/*
 * The routines in this file provide an event-driven I/O handler
 *
 * Each active fd has a handler associated with it.
 */
/* FIXME: Occasionally, data from stdout has been lost when the job is
   exiting.  I don't know whether data is being lost because the writer
   is discarding it or the reader (mpiexec) is failing to finish reading from
   all of the sockets before exiting.
 */
#include "mpichconf.h"
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#ifdef HAVE_TIME_H
#include <time.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef HAVE_ERRNO_H
#include <errno.h>
#endif
#include "pmutil.h"
#include "ioloop.h"

/*
 * To simplify mapping fds back to their handlers, we store the handles
 * in an array such that the ith element of the array corresponds to the
 * fd with value i (e.g., for fd == 10, the [10] element of the array
 * has the information on the handler).  This isn't terrifically scalable,
 * but it makes this code fairly simple and this code isn't
 * performance sensitive.  "maxFD" is the maximum fd value seen; this
 * allows us to allocate a large array but usually only look at a small
 * part of it.
 */
/* Largest fd value that can be registered; the table below has MAXFD + 1
 * slots so that an fd value can be used directly as its index */
#define MAXFD 4096
/* Handler table, indexed by fd value */
static IOHandle handlesByFD[MAXFD + 1];
/* Largest index of handlesByFD currently treated as in use; -1 when the
 * table is empty */
static int maxFD = -1;

/*@
  MPIE_IORegister - Register a handler for an FD

Input Parameters:
+ fd - file descriptor to watch; it is used directly as the index into the
  handler table, so it must lie in the range 0..'MAXFD' (and below
  'FD_SETSIZE' where that limit is defined)
. rdwr - bitmask of 'IO_READ' and/or 'IO_WRITE' selecting the events for
  which the handler should be invoked
. handler - routine called as 'handler(fd, IO_READ or IO_WRITE, extra_data)'
  when the fd is ready
- extra_data - pointer passed unchanged to 'handler'

  Return Value:
  Returns zero on success and 1 if 'fd' is outside the supported range.

  Notes:
  Keeps track of the largest fd seen (in 'maxFD').
  @*/
int MPIE_IORegister(int fd, int rdwr, int (*handler) (int, int, void *), void *extra_data)
{
    int i;

    if (fd < 0 || fd > MAXFD) {
        /* Error; fd would index outside of handlesByFD */
        return 1;
    }
#ifdef FD_SETSIZE
    if (fd >= FD_SETSIZE) {
        /* Error; the I/O loop places registered fds into an fd_set, which
         * cannot hold descriptors at or above FD_SETSIZE */
        return 1;
    }
#endif

    /* Remember the largest set FD, and clear any FDs between this
     * fd and the last maximum so that the slots that are skipped over
     * never look registered */
    if (fd > maxFD) {
        for (i = maxFD + 1; i < fd; i++) {
            handlesByFD[i].fd = -1;
            handlesByFD[i].handler = 0;
        }
        maxFD = fd;
    }
    handlesByFD[fd].fd = fd;
    handlesByFD[fd].rdwr = rdwr;
    handlesByFD[fd].handler = handler;
    handlesByFD[fd].extra_data = extra_data;

    return 0;
}

/*@
  MPIE_IODeregister - Remove a handler for an FD

Input Parameters:
. fd - fd to deregister

  Return Value:
  Returns zero on success and 1 if 'fd' is negative, larger than 'MAXFD',
  or larger than the largest fd currently tracked in 'maxFD'.

  Notes:
  After the entry is cleared, 'maxFD' is lowered to the largest fd that is
  still registered (or -1 if none remain).
  @*/
int MPIE_IODeregister(int fd)
{
    int i;

    if (fd < 0 || fd > MAXFD) {
        /* Error; fd would index outside of handlesByFD */
        return 1;
    }
    if (fd > maxFD) {
        /* Error; fd is unknown */
        return 1;
    }

    /* Clear the entry first so that the scan below no longer counts it */
    handlesByFD[fd].fd = -1;
    handlesByFD[fd].rdwr = 0;
    handlesByFD[fd].handler = 0;
    handlesByFD[fd].extra_data = 0;

    /* Recompute the new maxfd: walk down from the old maximum to the first
     * slot that is still registered; i ends at -1 if there is none */
    for (i = maxFD; i >= 0; i--) {
        if (handlesByFD[i].fd >= 0) {
            break;
        }
    }
    maxFD = i;

    return 0;
}

/*@
  MPIE_IOLoop - Handle all registered I/O

Input Parameters:
.  timeoutSeconds - Seconds until this routine should return with a
   timeout error.  If negative, no timeout.  If 0, return immediately
   after a nonblocking check for I/O (but see the Notes).

   Return Value:
   Returns zero once no registered handler has 'IO_READ' or 'IO_WRITE'
   interest left.  Returns 'IOLOOP_TIMEOUT' if the timeout is reached.

   Notes:
   A handler that returns 1 signals end-of-file: the fd is closed and its
   read/write interest is cleared, but the entry is not deregistered.

   NOTE(review): a failing select (other than EINTR) is reported with
   'MPL_internal_sys_error_printf' and the routine then returns zero rather
   than 'IOLOOP_ERROR'; confirm what callers expect before changing this.

   NOTE(review): 'TimeoutInit' treats every non-positive value as "no
   timeout", so a value of 0 does not currently give a nonblocking check;
   confirm the intended behavior.
@*/
int MPIE_IOLoop(int timeoutSeconds)
{
    int i, maxfd, fd, nfds, rc, rc2;
    fd_set readfds, writefds;
    int (*handler) (int, int, void *);
    struct timeval tv;

    /* Loop on the fds, with the timeout */
    TimeoutInit(timeoutSeconds);
    while (1) {
        /* select may modify tv, so it is refreshed on every pass */
        tv.tv_sec = TimeoutGetRemaining();
        tv.tv_usec = 0;
        /* Determine the active FDs */
        FD_ZERO(&readfds);
        FD_ZERO(&writefds);
        /* maxfd is the maximum active fd */
        maxfd = -1;
        for (i = 0; i <= maxFD; i++) {
            if (handlesByFD[i].handler) {
                fd = handlesByFD[i].fd;
                if (handlesByFD[i].rdwr & IO_READ) {
                    FD_SET(fd, &readfds);
                    maxfd = i;
                }
                if (handlesByFD[i].rdwr & IO_WRITE) {
                    FD_SET(fd, &writefds);
                    maxfd = i;
                }
            }
        }
        /* Nothing left to wait for */
        if (maxfd < 0)
            break;

        /* DBG_PRINTF(("Calling select with readfds = %x writefds = %x\n", */
        /*          *(int *)&readfds, *(int*)&writefds)); */
        MPIE_SYSCALL(nfds, select, (maxfd + 1, &readfds, &writefds, 0, &tv));
        if (nfds < 0 && (errno == EINTR || errno == 0)) {
            /* Continuing through EINTR */
            /* We allow errno == 0 as a synonym for EINTR.  We've seen this
             * on Solaris; in addition, we set errno to 0 after a failed
             * waitpid in the process routines, and if the OS isn't careful,
             * the value of errno may get ECHILD instead of EINTR when the
             * signal handler returns (we suspect Linux of this problem),
             * which is why we have the signal handler in process.c reset
             * errno to 0 (we may need to allow ECHILD here (!)) */
            /* FIXME: an EINTR may also mean that a process has exited
             * (SIGCHILD).  If all processes have exited, we may want to
             * exit */
            DBG_PRINTF(("errno = EINTR in select\n"));
            continue;
        }
        if (nfds < 0) {
            /* Serious error */
            MPL_internal_sys_error_printf("select", errno, 0);
            break;
        }
        if (nfds == 0) {
            /* Timeout from select */
            DBG_PRINTF(("Timeout in select\n"));
            return IOLOOP_TIMEOUT;
        }
        /* nfds > 0 */
        DBG_PRINTF(("Found some fds to process (n = %d)\n", nfds));
        for (fd = 0; fd <= maxfd; fd++) {
            if (FD_ISSET(fd, &writefds)) {
                /* Start from "not EOF" so that a missing handler (for
                 * example, one deregistered by an earlier handler in this
                 * pass) cannot inherit a previous fd's result */
                rc = 0;
                handler = handlesByFD[fd].handler;
                if (handler) {
                    rc = (*handler) (fd, IO_WRITE, handlesByFD[fd].extra_data);
                }
                if (rc == 1) {
                    /* EOF? */
                    MPIE_SYSCALL(rc2, close, (fd));
                    handlesByFD[fd].rdwr = 0;
                    FD_CLR(fd, &writefds);
                    /* The descriptor is closed now; do not hand it to the
                     * read handler below */
                    FD_CLR(fd, &readfds);
                }
            }
            if (FD_ISSET(fd, &readfds)) {
                /* See above: never reuse the result of an earlier call */
                rc = 0;
                handler = handlesByFD[fd].handler;
                if (handler) {
                    rc = (*handler) (fd, IO_READ, handlesByFD[fd].extra_data);
                }
                if (rc == 1) {
                    /* EOF? */
                    MPIE_SYSCALL(rc2, close, (fd));
                    handlesByFD[fd].rdwr = 0;
                    FD_CLR(fd, &readfds);
                }
            }
        }
    }
    DBG_PRINTF(("Returning from IOLOOP handler\n"));
    return 0;
}


/* Deadline set by TimeoutInit, as an absolute time in seconds on the clock
 * that TimeoutInit reads; a negative value means no timeout is in effect */
static int end_time = -1;       /* Time of timeout in seconds */

/* Arm the timeout: record the time, 'seconds' from now, at which it expires.
 * A value of zero or less disarms the timeout instead. */
void TimeoutInit(int seconds)
{
    /* Non-positive request: mark the deadline as "none" and stop here */
    if (seconds <= 0) {
        end_time = -1;
        return;
    }
#ifdef HAVE_TIME
    {
        time_t now = time(NULL);
        end_time = (int) now + seconds;
    }
#elif defined(HAVE_GETTIMEOFDAY)
    {
        struct timeval now;
        gettimeofday(&now, NULL);
        end_time = now.tv_sec + seconds;
    }
#else
#error 'No timer available'
#endif
}

/* Report how many seconds are left before the deadline stored in end_time.
 * The result is never negative; when no deadline is set (end_time < 0) a
 * large positive value is returned instead. */
int TimeoutGetRemaining(void)
{
    int remaining;

    /* No deadline armed: answer with a large, positive number */
    if (end_time < 0)
        return 1000000;

#ifdef HAVE_TIME
    remaining = end_time - (int) time(NULL);
#elif defined(HAVE_GETTIMEOFDAY)
    {
        struct timeval now;
        gettimeofday(&now, NULL);
        remaining = end_time - now.tv_sec;
    }
#else
#error 'No timer available'
#endif

    /* A deadline that has already passed counts as zero seconds left */
    return (remaining < 0) ? 0 : remaining;
}