Blob Blame History Raw
/*
 * Copyright (c) 1998,1999,2000
 *	Traakan, Inc., Los Altos, CA
 *	All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Project:  NDMJOB
 * Ident:    $Id: $
 *
 * Description:
 *
 */


#include "ndmlib.h"



/*
 * Initialize a channel. Make sure it won't be confused for active.
 */
void
ndmchan_initialize (struct ndmchan *ch, char *name)
{
	NDMOS_MACRO_ZEROFILL(ch);
	ch->name = name ? name : "???";
	ch->fd = -1;
	ch->mode = NDMCHAN_MODE_IDLE;
}

/*
 * Set the data buffer
 */
int
ndmchan_setbuf (struct ndmchan *ch, char *data, unsigned data_size)
{
	ch->data = data;
	ch->data_size = data_size;

	ch->beg_ix = 0;
	ch->end_ix = 0;

	return 0;
}




/*
 * Interfaces for starting a channel in various modes.
 */
int
ndmchan_start_mode (struct ndmchan *ch, int fd, int chan_mode)
{
	ch->fd = fd;
	ch->mode = chan_mode;
	return 0;
}

int
ndmchan_start_read (struct ndmchan *ch, int fd)
{
	return ndmchan_start_mode (ch, fd, NDMCHAN_MODE_READ);
}

int
ndmchan_start_write (struct ndmchan *ch, int fd)
{
	return ndmchan_start_mode (ch, fd, NDMCHAN_MODE_WRITE);
}

int
ndmchan_start_readchk (struct ndmchan *ch, int fd)
{
	return ndmchan_start_mode (ch, fd, NDMCHAN_MODE_READCHK);
}

int
ndmchan_start_listen (struct ndmchan *ch, int fd)
{
	return ndmchan_start_mode (ch, fd, NDMCHAN_MODE_LISTEN);
}

int
ndmchan_start_resident (struct ndmchan *ch)
{
	return ndmchan_start_mode (ch, -1, NDMCHAN_MODE_RESIDENT);
}

int
ndmchan_start_pending (struct ndmchan *ch, int fd)
{
	return ndmchan_start_mode (ch, fd, NDMCHAN_MODE_PENDING);
}

/*
 * Change a PENDING channel to an active (READ/WRITE) channel
 */
int
ndmchan_pending_to_mode (struct ndmchan *ch, int chan_mode)
{
	ch->mode = chan_mode;
	return 0;
}

int
ndmchan_pending_to_read (struct ndmchan *ch)
{
	return ndmchan_pending_to_mode (ch, NDMCHAN_MODE_READ);
}

int
ndmchan_pending_to_write (struct ndmchan *ch)
{
	return ndmchan_pending_to_mode (ch, NDMCHAN_MODE_WRITE);
}




/*
 * Interfaces for stopping (close()ing) a channel.
 * This is a bit of a hodge-podge. Could probably be cleaner.
 */

void
ndmchan_set_eof (struct ndmchan *ch)
{
	ch->eof = 1;
}

void
ndmchan_close_set_errno (struct ndmchan *ch, int err_no)
{
	ch->eof = 1;
	if (ch->fd >= 0) {
		close (ch->fd);
		ch->fd = -1;
	}
	ch->mode = NDMCHAN_MODE_CLOSED;
	ch->saved_errno = err_no;
	ch->beg_ix = ch->end_ix = 0;
}

void
ndmchan_close (struct ndmchan *ch) {
	ndmchan_close_set_errno (ch, 0);
}

void
ndmchan_abort (struct ndmchan *ch) {
	ndmchan_close_set_errno (ch,
		ch->saved_errno == 0 ? EINTR : ch->saved_errno);
}

void
ndmchan_close_as_is (struct ndmchan *ch) {
	ndmchan_close_set_errno (ch, ch->saved_errno);
}

void
ndmchan_cleanup (struct ndmchan *ch) {
	if (ch->mode != NDMCHAN_MODE_IDLE) {
		ndmchan_close_as_is (ch);
	}
}




/*
 * CPU Quantum for a set of channels. There are three
 * phases:
 * 1) Identify the channels to check for ready to do I/O.
 *    For example, a READ channel with no buffer space
 *    need not be checked.
 * 2) Call the OS dependent function that performs the
 *    actual select()/poll()/whatever.
 * 3) Based on the results, perform actual read()/write().
 *    EOF and errors are detected.
 *
 * This is constructed so that applications which can not use
 * ndmchan_quantum() directly have access to the helper functions
 * ndmchan_pre_poll() and ndmchan_post_poll().
 */

int
ndmchan_quantum (struct ndmchan *chtab[], unsigned n_chtab, int milli_timo)
{
	int			rc;

	ndmchan_pre_poll (chtab, n_chtab);

	rc = ndmos_chan_poll (chtab, n_chtab, milli_timo);
	if (rc <= 0)
		return rc;

	rc = ndmchan_post_poll (chtab, n_chtab);

	return rc;
}

int
ndmchan_pre_poll (struct ndmchan *chtab[], unsigned n_chtab)
{
	struct ndmchan *	ch;
	unsigned int		i, n_check;

	n_check = 0;
	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];
		ch->ready = 0;
		ch->check = 0;

		if (ch->error)
			continue;

		switch (ch->mode) {
		default:
		case NDMCHAN_MODE_IDLE:
		case NDMCHAN_MODE_PENDING:
		case NDMCHAN_MODE_RESIDENT:
		case NDMCHAN_MODE_CLOSED:
			continue;

		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
			break;

		case NDMCHAN_MODE_READ:
			if (ch->eof)
				continue;
			if (ndmchan_n_avail (ch) == 0)
				continue;
			break;

		case NDMCHAN_MODE_WRITE:
			if (ndmchan_n_ready (ch) == 0)
				continue;
			break;
		}

		ch->check = 1;
		n_check++;
	}

	return n_check;
}

int
ndmchan_post_poll (struct ndmchan *chtab[], unsigned n_chtab)
{
	struct ndmchan *	ch;
	unsigned int		i;
	int			rc, len, n_ready;

	n_ready = 0;

	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];

		if (!ch->ready)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_READ:
			len = ndmchan_n_avail (ch);
			if (len <= 0) continue;

			n_ready++;
			rc = read (ch->fd, &ch->data[ch->end_ix], len);
			if (rc < 0) {
				if (errno != NDMOS_CONST_EWOULDBLOCK) {
					ch->error = ch->eof = 1;
					ch->saved_errno = errno;
					if (!ch->saved_errno)
						ch->saved_errno = -1;
				} else {
					/* no bytes read */
				}
			} else if (rc == 0) {
				ch->eof = 1;
				ch->error = 0;
				ch->saved_errno = 0;
			} else {
				ch->end_ix += rc;
			}
			break;

		case NDMCHAN_MODE_WRITE:
			len = ndmchan_n_ready (ch);
			if (len <= 0) continue;

			n_ready++;
			rc = write (ch->fd, &ch->data[ch->beg_ix], len);
			if (rc < 0) {
				if (errno != NDMOS_CONST_EWOULDBLOCK) {
					ch->eof = 1;
					ch->error = 1;
					ch->saved_errno = errno;
					if (!ch->saved_errno)
						ch->saved_errno = -1;
				} else {
					/* no bytes written */
					/* EWOULDBLOCK but ready? */
				}
			} else if (rc == 0) {
				/* NDMOS_CONST_EWOULDBLOCK? */
				ch->eof = 1;
				ch->error = 1;
				ch->saved_errno = 0;
			} else {
				ch->beg_ix += rc;
			}
			break;
		}
	}

	return n_ready;
}




/*
 * Channel data buffer space manipulation.
 */

void
ndmchan_compress (struct ndmchan *ch) {
	unsigned	len = ch->end_ix - ch->beg_ix;

	if (ch->beg_ix > 0 && len > 0) {
		bcopy (&ch->data[ch->beg_ix], ch->data, len);
	} else {
		if (len > ch->data_size)
			len = 0;
	}
	ch->beg_ix = 0;
	ch->end_ix = len;
}

int
ndmchan_n_avail (struct ndmchan *ch) {
	if (ch->end_ix == ch->beg_ix)
		ch->end_ix = ch->beg_ix = 0;

	if (ch->end_ix >= ch->data_size) {
		ndmchan_compress (ch);
	}
	return ch->data_size - ch->end_ix;
}

int
ndmchan_n_avail_record (struct ndmchan *ch, unsigned long size) {
	if (ch->end_ix == ch->beg_ix)
		ch->end_ix = ch->beg_ix = 0;

	if (ch->end_ix >= ch->data_size - size) {
		ndmchan_compress (ch);
	}
	return ch->data_size - ch->end_ix;
}

int
ndmchan_n_avail_total (struct ndmchan *ch) {
	if (ch->end_ix == ch->beg_ix)
		ch->end_ix = ch->beg_ix = 0;

	if (ch->end_ix >= ch->data_size) {
		ndmchan_compress (ch);
	}
	return ch->data_size - ch->end_ix + ch->beg_ix;
}

int
ndmchan_n_ready (struct ndmchan *ch) {
	return ch->end_ix - ch->beg_ix;
}




/*
 * Interfaces for interpreting channel state, obtaining pointers, lengths
 */

enum ndmchan_read_interpretation
ndmchan_read_interpret (struct ndmchan *ch, char **data_p,
  unsigned *n_ready_p)
{
	unsigned	n_ready;

	n_ready = *n_ready_p = ndmchan_n_ready (ch);
	*data_p = &ch->data[ch->beg_ix];

	if (ch->error) {
		if (n_ready == 0) {
			return NDMCHAN_RI_DONE_ERROR;
		} else {
			return NDMCHAN_RI_DRAIN_ERROR;
		}
	}

	if (ch->eof) {
		if (n_ready == 0) {
			return NDMCHAN_RI_DONE_EOF;
		} else {
			return NDMCHAN_RI_DRAIN_EOF;
		}
	}

	if (n_ready == 0) {
		return NDMCHAN_RI_EMPTY;
	}

	if (n_ready == ch->data_size) {
		return NDMCHAN_RI_READY_FULL;
	}

	return NDMCHAN_RI_READY;
}

enum ndmchan_write_interpretation
ndmchan_write_interpret (struct ndmchan *ch, char **data_p,
  unsigned *n_avail_p)
{
	unsigned	n_avail;

	n_avail = *n_avail_p = ndmchan_n_avail (ch);
	*data_p = &ch->data[ch->end_ix];

	if (ch->error) {
		/* We don't use WI_DRAIN_ERROR. If it's kaput, it's kaput */
		return NDMCHAN_WI_DONE_ERROR;
	}

	if (ch->eof) {
		if (n_avail == ch->data_size) {
			return NDMCHAN_WI_DONE_EOF;
		} else {
			return NDMCHAN_WI_DRAIN_EOF;
		}
	}

	if (n_avail == 0) {
		return NDMCHAN_WI_FULL;
	}

	if (n_avail == ch->data_size) {
		return NDMCHAN_WI_AVAIL_EMPTY;
	}

	return NDMCHAN_WI_AVAIL;
}




/*
 * Pretty printer
 */
void
ndmchan_pp (struct ndmchan *ch, char *buf)
{
	int		show_ra = 0;
	char *		bp = buf;
	char *		p;

	sprintf (bp, "name=%s", ch->name); while (*bp) bp++;

	switch (ch->mode) {
	case NDMCHAN_MODE_IDLE:		p = "idle"; break;
	case NDMCHAN_MODE_RESIDENT:	p = "resident"; show_ra = 1; break;
	case NDMCHAN_MODE_READ:		p = "read"; show_ra = 1; break;
	case NDMCHAN_MODE_WRITE:	p = "write"; show_ra = 1; break;
	case NDMCHAN_MODE_READCHK:	p = "readchk"; break;
	case NDMCHAN_MODE_LISTEN:	p = "listen"; break;
	case NDMCHAN_MODE_PENDING:	p = "pending"; break;
	case NDMCHAN_MODE_CLOSED:	p = "closed"; break;
	default:			p = "mode=???"; break;
	}

	sprintf (bp, " %s ", p);
	while (*bp) bp++;

	if (show_ra) {
		sprintf (bp, "ready=%d avail=%d ",
			ndmchan_n_ready(ch), ndmchan_n_avail(ch));
		while (*bp) bp++;
	}

	if (ch->ready)	strcat (bp, "-rdy");
	if (ch->check)	strcat (bp, "-chk");
	if (ch->eof)	strcat (bp, "-eof");
	if (ch->error)	strcat (bp, "-err");
}




#ifdef NDMOS_OPTION_USE_SELECT_FOR_CHAN_POLL
/*
 * Here because it is almost always used
 */

int
ndmos_chan_poll (struct ndmchan *chtab[], unsigned n_chtab, int milli_timo)
{
	struct ndmchan *	ch;
	fd_set			rfds, wfds;
	int			nfd = 0, rc;
	unsigned		i;
	struct timeval		timo;

	FD_ZERO(&rfds);
	FD_ZERO(&wfds);

	timo.tv_sec = milli_timo / 1000;
	timo.tv_usec = (milli_timo%1000) * 1000;

	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];
		if (!ch->check)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
		case NDMCHAN_MODE_READ:
			FD_SET (ch->fd, &rfds);
			break;

		case NDMCHAN_MODE_WRITE:
			FD_SET (ch->fd, &wfds);
			break;
		}
		if (nfd < ch->fd+1)
			nfd = ch->fd+1;
	}

	rc = select (nfd, &rfds, &wfds, (void*)0, &timo);
	if (rc <= 0)
		return rc;

	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];
		if (!ch->check)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
		case NDMCHAN_MODE_READ:
			if (FD_ISSET (ch->fd, &rfds))
				ch->ready = 1;
			break;

		case NDMCHAN_MODE_WRITE:
			if (FD_ISSET (ch->fd, &wfds))
				ch->ready = 1;
			break;
		}
	}

	return rc;
}

#endif /* NDMOS_OPTION_USE_SELECT_FOR_CHAN_POLL */

#ifdef NDMOS_OPTION_USE_POLL_FOR_CHAN_POLL
/*
 * Here because it is common, and because poll(2) is
 * INFINITELY SUPERIOR to select(2).
 */

int
ndmos_chan_poll (struct ndmchan *chtab[], unsigned n_chtab, int milli_timo)
{
	struct ndmchan *	ch;
	struct pollfd		pfdtab[20];
	int			n_pfdtab = 0;
	int			rc, i;

	NDMOS_MACRO_ZEROFILL (pfdtab);

	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];
		if (!ch->check)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
		case NDMCHAN_MODE_READ:
			pfdtab[n_pfdtab].fd = ch->fd;
			pfdtab[n_pfdtab].events = POLLIN;
			break;

		case NDMCHAN_MODE_WRITE:
			pfdtab[n_pfdtab].fd = ch->fd;
			pfdtab[n_pfdtab].events = POLLOUT;
			break;
		}
		n_pfdtab++;
	}

	rc = poll (pfdtab, n_pfdtab, milli_timo);

	@@@ TODO: post them back. Not easy @@@

	return rc;
}

#endif /* NDMOS_OPTION_USE_POLL_FOR_CHAN_POLL */