/*
* Copyright (c) 1998,1999,2000
* Traakan, Inc., Los Altos, CA
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice unmodified, this list of conditions, and the following
* disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
/*
* Project: NDMJOB
* Ident: $Id: $
*
* Description:
*
*/
#include "ndmlib.h"
/*
 * Put a channel into a known, inactive state.
 *
 * NDMOS_MACRO_ZEROFILL() is applied to the structure first, then:
 *	name	set to NAME, or to the placeholder "???" when NAME is null
 *	mode	set to NDMCHAN_MODE_IDLE, so it won't be confused for active
 *	fd	set to -1 (no descriptor attached)
 */
void
ndmchan_initialize (struct ndmchan *ch, char *name)
{
	NDMOS_MACRO_ZEROFILL(ch);

	if (name) {
		ch->name = name;
	} else {
		ch->name = "???";
	}

	ch->mode = NDMCHAN_MODE_IDLE;
	ch->fd = -1;
}
/*
 * Attach a data buffer to a channel.
 *
 * Records DATA and DATA_SIZE in the channel and resets both buffer
 * indices (beg_ix and end_ix) to 0. The buffer is only referenced,
 * not copied. Always returns 0.
 */
int
ndmchan_setbuf (struct ndmchan *ch, char *data, unsigned data_size)
{
	ch->beg_ix = 0;
	ch->end_ix = 0;

	ch->data_size = data_size;
	ch->data = data;

	return 0;
}
/*
* Interfaces for starting a channel in various modes.
*/
/*
 * Common worker for the ndmchan_start_xxx() family: record the
 * descriptor FD and the mode CHAN_MODE in the channel.
 * No validation is performed on either value. Always returns 0.
 */
int
ndmchan_start_mode (struct ndmchan *ch, int fd, int chan_mode)
{
	ch->mode = chan_mode;
	ch->fd = fd;

	return 0;
}
/*
 * Start the channel on FD in NDMCHAN_MODE_READ.
 * Returns the result of ndmchan_start_mode().
 */
int
ndmchan_start_read (struct ndmchan *ch, int fd)
{
	int		rc;

	rc = ndmchan_start_mode (ch, fd, NDMCHAN_MODE_READ);

	return rc;
}
/*
 * Start the channel on FD in NDMCHAN_MODE_WRITE.
 * Returns the result of ndmchan_start_mode().
 */
int
ndmchan_start_write (struct ndmchan *ch, int fd)
{
	int		rc;

	rc = ndmchan_start_mode (ch, fd, NDMCHAN_MODE_WRITE);

	return rc;
}
/*
 * Start the channel on FD in NDMCHAN_MODE_READCHK.
 * Returns the result of ndmchan_start_mode().
 */
int
ndmchan_start_readchk (struct ndmchan *ch, int fd)
{
	int		rc;

	rc = ndmchan_start_mode (ch, fd, NDMCHAN_MODE_READCHK);

	return rc;
}
/*
 * Start the channel on FD in NDMCHAN_MODE_LISTEN.
 * Returns the result of ndmchan_start_mode().
 */
int
ndmchan_start_listen (struct ndmchan *ch, int fd)
{
	int		rc;

	rc = ndmchan_start_mode (ch, fd, NDMCHAN_MODE_LISTEN);

	return rc;
}
/*
 * Start the channel in NDMCHAN_MODE_RESIDENT. No descriptor is
 * supplied by the caller; the channel's fd is set to -1.
 * Returns the result of ndmchan_start_mode().
 */
int
ndmchan_start_resident (struct ndmchan *ch)
{
	const int	no_fd = -1;

	return ndmchan_start_mode (ch, no_fd, NDMCHAN_MODE_RESIDENT);
}
/*
 * Start the channel on FD in NDMCHAN_MODE_PENDING.
 * Returns the result of ndmchan_start_mode().
 */
int
ndmchan_start_pending (struct ndmchan *ch, int fd)
{
	int		rc;

	rc = ndmchan_start_mode (ch, fd, NDMCHAN_MODE_PENDING);

	return rc;
}
/*
* Change a PENDING channel to an active (READ/WRITE) channel
*/
/*
 * Switch the channel's mode to CHAN_MODE, leaving its descriptor and
 * buffer untouched. The current mode is not checked, so it is up to
 * the caller to use this only on a PENDING channel.
 * Always returns 0.
 */
int
ndmchan_pending_to_mode (struct ndmchan *ch, int chan_mode)
{
	ch->mode = chan_mode;

	return 0;
}
/*
 * Switch the channel to NDMCHAN_MODE_READ.
 * Returns the result of ndmchan_pending_to_mode().
 */
int
ndmchan_pending_to_read (struct ndmchan *ch)
{
	int		rc;

	rc = ndmchan_pending_to_mode (ch, NDMCHAN_MODE_READ);

	return rc;
}
/*
 * Switch the channel to NDMCHAN_MODE_WRITE.
 * Returns the result of ndmchan_pending_to_mode().
 */
int
ndmchan_pending_to_write (struct ndmchan *ch)
{
	int		rc;

	rc = ndmchan_pending_to_mode (ch, NDMCHAN_MODE_WRITE);

	return rc;
}
/*
* Interfaces for stopping (close()ing) a channel.
* This is a bit of a hodge-podge. Could probably be cleaner.
*/
/*
 * Flag the channel as having reached end-of-file.
 * Only the eof flag is set; the descriptor, the mode and any
 * buffered data are left as they are.
 */
void
ndmchan_set_eof (struct ndmchan *ch)
{
	ch->eof = 1;
}
/*
 * Close a channel and record ERR_NO as its saved errno.
 *
 * The descriptor is close()d only when one is attached (fd >= 0)
 * and is then set to -1; the result of close() is not examined.
 * The channel ends up in NDMCHAN_MODE_CLOSED with eof set and
 * with both buffer indices reset to 0, so buffered data is dropped.
 */
void
ndmchan_close_set_errno (struct ndmchan *ch, int err_no)
{
	ch->eof = 1;

	if (ch->fd >= 0) {
		close (ch->fd);
		ch->fd = -1;
	}

	ch->beg_ix = 0;
	ch->end_ix = 0;
	ch->saved_errno = err_no;
	ch->mode = NDMCHAN_MODE_CLOSED;
}
/*
 * Close the channel normally: same as ndmchan_close_set_errno()
 * with a saved errno of 0.
 */
void
ndmchan_close (struct ndmchan *ch)
{
	const int	no_error = 0;

	ndmchan_close_set_errno (ch, no_error);
}
/*
 * Close the channel abnormally. A saved errno that is already
 * non-zero is preserved; otherwise EINTR is recorded.
 */
void
ndmchan_abort (struct ndmchan *ch)
{
	int		err_no;

	if (ch->saved_errno == 0) {
		err_no = EINTR;
	} else {
		err_no = ch->saved_errno;
	}

	ndmchan_close_set_errno (ch, err_no);
}
/*
 * Close the channel while keeping whatever saved errno it
 * currently holds.
 */
void
ndmchan_close_as_is (struct ndmchan *ch)
{
	int		keep_errno = ch->saved_errno;

	ndmchan_close_set_errno (ch, keep_errno);
}
/*
 * Release a channel: any channel that is not in NDMCHAN_MODE_IDLE
 * is closed with ndmchan_close_as_is(). An IDLE channel is left
 * untouched.
 */
void
ndmchan_cleanup (struct ndmchan *ch)
{
	if (ch->mode == NDMCHAN_MODE_IDLE)
		return;

	ndmchan_close_as_is (ch);
}
/*
* CPU Quantum for a set of channels. There are three
* phases:
* 1) Identify the channels to check for ready to do I/O.
* For example, a READ channel with no buffer space
* need not be checked.
* 2) Call the OS dependent function that performs the
* actual select()/poll()/whatever.
* 3) Based on the results, perform actual read()/write().
* EOF and errors are detected.
*
* This is constructed so that applications which can not use
* ndmchan_quantum() directly have access to the helper functions
* ndmchan_pre_poll() and ndmchan_post_poll().
*/
/*
 * Run one quantum over the N_CHTAB channels in CHTAB:
 * ndmchan_pre_poll(), then ndmos_chan_poll() with a timeout of
 * MILLI_TIMO milliseconds, then ndmchan_post_poll().
 *
 * Returns the ndmos_chan_poll() result when it is zero or negative
 * (post-poll is skipped in that case), otherwise the
 * ndmchan_post_poll() result. The count returned by
 * ndmchan_pre_poll() is not used.
 */
int
ndmchan_quantum (struct ndmchan *chtab[], unsigned n_chtab, int milli_timo)
{
	int		n_polled;

	ndmchan_pre_poll (chtab, n_chtab);

	n_polled = ndmos_chan_poll (chtab, n_chtab, milli_timo);
	if (n_polled > 0) {
		return ndmchan_post_poll (chtab, n_chtab);
	}

	return n_polled;
}
int
ndmchan_pre_poll (struct ndmchan *chtab[], unsigned n_chtab)
{
struct ndmchan * ch;
unsigned int i, n_check;
n_check = 0;
for (i = 0; i < n_chtab; i++) {
ch = chtab[i];
ch->ready = 0;
ch->check = 0;
if (ch->error)
continue;
switch (ch->mode) {
default:
case NDMCHAN_MODE_IDLE:
case NDMCHAN_MODE_PENDING:
case NDMCHAN_MODE_RESIDENT:
case NDMCHAN_MODE_CLOSED:
continue;
case NDMCHAN_MODE_LISTEN:
case NDMCHAN_MODE_READCHK:
break;
case NDMCHAN_MODE_READ:
if (ch->eof)
continue;
if (ndmchan_n_avail (ch) == 0)
continue;
break;
case NDMCHAN_MODE_WRITE:
if (ndmchan_n_ready (ch) == 0)
continue;
break;
}
ch->check = 1;
n_check++;
}
return n_check;
}
int
ndmchan_post_poll (struct ndmchan *chtab[], unsigned n_chtab)
{
struct ndmchan * ch;
unsigned int i;
int rc, len, n_ready;
n_ready = 0;
for (i = 0; i < n_chtab; i++) {
ch = chtab[i];
if (!ch->ready)
continue;
switch (ch->mode) {
case NDMCHAN_MODE_READ:
len = ndmchan_n_avail (ch);
if (len <= 0) continue;
n_ready++;
rc = read (ch->fd, &ch->data[ch->end_ix], len);
if (rc < 0) {
if (errno != NDMOS_CONST_EWOULDBLOCK) {
ch->error = ch->eof = 1;
ch->saved_errno = errno;
if (!ch->saved_errno)
ch->saved_errno = -1;
} else {
/* no bytes read */
}
} else if (rc == 0) {
ch->eof = 1;
ch->error = 0;
ch->saved_errno = 0;
} else {
ch->end_ix += rc;
}
break;
case NDMCHAN_MODE_WRITE:
len = ndmchan_n_ready (ch);
if (len <= 0) continue;
n_ready++;
rc = write (ch->fd, &ch->data[ch->beg_ix], len);
if (rc < 0) {
if (errno != NDMOS_CONST_EWOULDBLOCK) {
ch->eof = 1;
ch->error = 1;
ch->saved_errno = errno;
if (!ch->saved_errno)
ch->saved_errno = -1;
} else {
/* no bytes written */
/* EWOULDBLOCK but ready? */
}
} else if (rc == 0) {
/* NDMOS_CONST_EWOULDBLOCK? */
ch->eof = 1;
ch->error = 1;
ch->saved_errno = 0;
} else {
ch->beg_ix += rc;
}
break;
}
}
return n_ready;
}
/*
* Channel data buffer space manipulation.
*/
/*
 * Slide the buffered data (the bytes between beg_ix and end_ix) down
 * to the start of the buffer so that all free space is at the end.
 *
 * The copy happens only when there is data and it does not already
 * start at index 0. When no copy is made and the computed length is
 * larger than data_size, the length is forced to 0 (presumably a
 * guard against inconsistent indices -- confirm with callers).
 * Afterwards beg_ix is 0 and end_ix is the resulting length.
 */
void
ndmchan_compress (struct ndmchan *ch)
{
	unsigned	n_pending = ch->end_ix - ch->beg_ix;
	int		must_shift = (ch->beg_ix > 0) && (n_pending > 0);

	if (must_shift) {
		/* source and destination may overlap */
		bcopy (&ch->data[ch->beg_ix], ch->data, n_pending);
	} else if (n_pending > ch->data_size) {
		n_pending = 0;
	}

	ch->end_ix = n_pending;
	ch->beg_ix = 0;
}
/*
 * Number of bytes that can be appended contiguously at end_ix.
 *
 * Side effects: an empty buffer (end_ix == beg_ix) has both indices
 * reset to 0, and a buffer whose end_ix has reached data_size is
 * compacted with ndmchan_compress() before the space is computed.
 * Returns data_size - end_ix.
 */
int
ndmchan_n_avail (struct ndmchan *ch)
{
	if (ch->beg_ix == ch->end_ix) {
		ch->beg_ix = 0;
		ch->end_ix = 0;
	}

	if (!(ch->end_ix < ch->data_size)) {
		ndmchan_compress (ch);
	}

	return ch->data_size - ch->end_ix;
}
/*
 * Number of bytes that can be appended contiguously at end_ix, making
 * room for a record of SIZE bytes where possible.
 *
 * Side effects: an empty buffer (end_ix == beg_ix) has both indices
 * reset to 0. The buffer is compacted with ndmchan_compress() when
 * end_ix is within SIZE bytes of the end of the buffer.
 *
 * SIZE is tested against data_size before the subtraction: the
 * arithmetic is unsigned, so "data_size - size" would wrap around to
 * a huge value for SIZE > data_size and the compaction would be
 * skipped exactly when the most space is needed. For any
 * SIZE <= data_size the outcome is the same as the plain
 * "end_ix >= data_size - size" test.
 *
 * Returns data_size - end_ix; the caller compares this with SIZE to
 * learn whether the record fits.
 */
int
ndmchan_n_avail_record (struct ndmchan *ch, unsigned long size)
{
	if (ch->end_ix == ch->beg_ix)
		ch->end_ix = ch->beg_ix = 0;

	if (size >= ch->data_size
	 || ch->end_ix >= ch->data_size - size) {
		ndmchan_compress (ch);
	}

	return ch->data_size - ch->end_ix;
}
/*
 * Total free space in the buffer: the room after end_ix plus the
 * already-consumed room before beg_ix.
 *
 * Side effects: an empty buffer (end_ix == beg_ix) has both indices
 * reset to 0, and a buffer whose end_ix has reached data_size is
 * compacted with ndmchan_compress() first.
 * Returns data_size - end_ix + beg_ix.
 */
int
ndmchan_n_avail_total (struct ndmchan *ch)
{
	if (ch->beg_ix == ch->end_ix) {
		ch->beg_ix = 0;
		ch->end_ix = 0;
	}

	if (!(ch->end_ix < ch->data_size)) {
		ndmchan_compress (ch);
	}

	return ch->data_size - ch->end_ix + ch->beg_ix;
}
/*
 * Number of bytes currently buffered in the channel, i.e. the
 * distance from beg_ix to end_ix. The channel is not modified.
 */
int
ndmchan_n_ready (struct ndmchan *ch)
{
	return ch->end_ix - ch->beg_ix;
}
/*
* Interfaces for interpreting channel state, obtaining pointers, lengths
*/
/*
 * Classify the state of a channel from the consumer's point of view.
 *
 * Outputs:
 *	*n_ready_p	number of buffered bytes (ndmchan_n_ready())
 *	*data_p		address of the first buffered byte (at beg_ix)
 *
 * Returns, tested in this order:
 *	error set	NDMCHAN_RI_DONE_ERROR when nothing is buffered,
 *			else NDMCHAN_RI_DRAIN_ERROR
 *	eof set		NDMCHAN_RI_DONE_EOF when nothing is buffered,
 *			else NDMCHAN_RI_DRAIN_EOF
 *	nothing buffered		NDMCHAN_RI_EMPTY
 *	buffered == data_size		NDMCHAN_RI_READY_FULL
 *	otherwise			NDMCHAN_RI_READY
 */
enum ndmchan_read_interpretation
ndmchan_read_interpret (struct ndmchan *ch, char **data_p,
  unsigned *n_ready_p)
{
	unsigned	n_pending = ndmchan_n_ready (ch);
	int		is_drained = (n_pending == 0);

	*n_ready_p = n_pending;
	*data_p = &ch->data[ch->beg_ix];

	if (ch->error) {
		return is_drained ? NDMCHAN_RI_DONE_ERROR
				  : NDMCHAN_RI_DRAIN_ERROR;
	}

	if (ch->eof) {
		return is_drained ? NDMCHAN_RI_DONE_EOF
				  : NDMCHAN_RI_DRAIN_EOF;
	}

	if (is_drained)
		return NDMCHAN_RI_EMPTY;

	if (n_pending == ch->data_size)
		return NDMCHAN_RI_READY_FULL;

	return NDMCHAN_RI_READY;
}
enum ndmchan_write_interpretation
ndmchan_write_interpret (struct ndmchan *ch, char **data_p,
unsigned *n_avail_p)
{
unsigned n_avail;
n_avail = *n_avail_p = ndmchan_n_avail (ch);
*data_p = &ch->data[ch->end_ix];
if (ch->error) {
/* We don't use WI_DRAIN_ERROR. If it's kaput, it's kaput */
return NDMCHAN_WI_DONE_ERROR;
}
if (ch->eof) {
if (n_avail == ch->data_size) {
return NDMCHAN_WI_DONE_EOF;
} else {
return NDMCHAN_WI_DRAIN_EOF;
}
}
if (n_avail == 0) {
return NDMCHAN_WI_FULL;
}
if (n_avail == ch->data_size) {
return NDMCHAN_WI_AVAIL_EMPTY;
}
return NDMCHAN_WI_AVAIL;
}
/*
* Pretty printer
*/
void
ndmchan_pp (struct ndmchan *ch, char *buf)
{
int show_ra = 0;
char * bp = buf;
char * p;
sprintf (bp, "name=%s", ch->name); while (*bp) bp++;
switch (ch->mode) {
case NDMCHAN_MODE_IDLE: p = "idle"; break;
case NDMCHAN_MODE_RESIDENT: p = "resident"; show_ra = 1; break;
case NDMCHAN_MODE_READ: p = "read"; show_ra = 1; break;
case NDMCHAN_MODE_WRITE: p = "write"; show_ra = 1; break;
case NDMCHAN_MODE_READCHK: p = "readchk"; break;
case NDMCHAN_MODE_LISTEN: p = "listen"; break;
case NDMCHAN_MODE_PENDING: p = "pending"; break;
case NDMCHAN_MODE_CLOSED: p = "closed"; break;
default: p = "mode=???"; break;
}
sprintf (bp, " %s ", p);
while (*bp) bp++;
if (show_ra) {
sprintf (bp, "ready=%d avail=%d ",
ndmchan_n_ready(ch), ndmchan_n_avail(ch));
while (*bp) bp++;
}
if (ch->ready) strcat (bp, "-rdy");
if (ch->check) strcat (bp, "-chk");
if (ch->eof) strcat (bp, "-eof");
if (ch->error) strcat (bp, "-err");
}
#ifdef NDMOS_OPTION_USE_SELECT_FOR_CHAN_POLL
/*
 * select(2) based implementation of the poll step (phase 2 of a
 * quantum). Here because it is almost always used.
 *
 * Every channel whose check flag is set is added to the read set
 * (LISTEN, READCHK, READ) or to the write set (WRITE). After
 * select() returns, the ready flag is set on each such channel whose
 * descriptor is reported ready.
 *
 * MILLI_TIMO is the timeout in milliseconds.
 *
 * Channels whose descriptor is outside [0, FD_SETSIZE) are skipped:
 * using FD_SET()/FD_ISSET() on such a descriptor is undefined
 * behaviour. Such channels are simply never reported ready.
 * Channels in any other mode are skipped as well, so they no longer
 * influence the nfds argument.
 *
 * Returns the select() result: the number of ready descriptors,
 * 0 on timeout, or -1 on error.
 */
int
ndmos_chan_poll (struct ndmchan *chtab[], unsigned n_chtab, int milli_timo)
{
	struct ndmchan *	ch;
	fd_set			rfds, wfds;
	int			nfd = 0, rc;
	unsigned		i;
	struct timeval		timo;

	FD_ZERO(&rfds);
	FD_ZERO(&wfds);

	timo.tv_sec = milli_timo / 1000;
	timo.tv_usec = (milli_timo%1000) * 1000;

	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];
		if (!ch->check)
			continue;

		/* fd_set can not represent this descriptor */
		if (ch->fd < 0 || ch->fd >= FD_SETSIZE)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
		case NDMCHAN_MODE_READ:
			FD_SET (ch->fd, &rfds);
			break;

		case NDMCHAN_MODE_WRITE:
			FD_SET (ch->fd, &wfds);
			break;

		default:
			/* not registered, must not widen nfd */
			continue;
		}

		if (nfd < ch->fd+1)
			nfd = ch->fd+1;
	}

	rc = select (nfd, &rfds, &wfds, (void*)0, &timo);
	if (rc <= 0)
		return rc;

	for (i = 0; i < n_chtab; i++) {
		ch = chtab[i];
		if (!ch->check)
			continue;

		/* same filter as above: never registered */
		if (ch->fd < 0 || ch->fd >= FD_SETSIZE)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
		case NDMCHAN_MODE_READ:
			if (FD_ISSET (ch->fd, &rfds))
				ch->ready = 1;
			break;

		case NDMCHAN_MODE_WRITE:
			if (FD_ISSET (ch->fd, &wfds))
				ch->ready = 1;
			break;

		default:
			break;
		}
	}

	return rc;
}
#endif /* NDMOS_OPTION_USE_SELECT_FOR_CHAN_POLL */
#ifdef NDMOS_OPTION_USE_POLL_FOR_CHAN_POLL
/*
 * poll(2) based implementation of the poll step (phase 2 of a
 * quantum).
 *
 * Every channel whose check flag is set gets a pollfd entry asking
 * for POLLIN (LISTEN, READCHK, READ) or POLLOUT (WRITE); channels in
 * any other mode are skipped. The chtab[] index of each entry is
 * remembered so that, after poll() returns, the ready flag can be
 * posted back to the right channel. Any non-zero revents (including
 * POLLHUP, POLLERR and POLLNVAL) marks the channel ready, so that the
 * read()/write() which follows reports the EOF or the error.
 *
 * MILLI_TIMO is the timeout in milliseconds, passed to poll() as is.
 *
 * Each entry that is used has fd, events and revents set explicitly,
 * so the table needs no bulk zero-fill.
 *
 * At most MAX_PFD channels can be polled in one call. If more are
 * flagged, nothing is polled and -1 is returned with errno set to
 * EINVAL, rather than writing past the end of the table.
 *
 * Returns the poll() result: the number of entries with non-zero
 * revents, 0 on timeout, or -1 on error.
 */
int
ndmos_chan_poll (struct ndmchan *chtab[], unsigned n_chtab, int milli_timo)
{
	enum { MAX_PFD = 20 };
	struct ndmchan *	ch;
	struct pollfd		pfdtab[MAX_PFD];
	unsigned		chtab_ix[MAX_PFD];
	int			n_pfdtab = 0;
	int			rc, k;
	unsigned		i;

	for (i = 0; i < n_chtab; i++) {
		short		events;

		ch = chtab[i];
		if (!ch->check)
			continue;

		switch (ch->mode) {
		case NDMCHAN_MODE_LISTEN:
		case NDMCHAN_MODE_READCHK:
		case NDMCHAN_MODE_READ:
			events = POLLIN;
			break;

		case NDMCHAN_MODE_WRITE:
			events = POLLOUT;
			break;

		default:
			continue;
		}

		if (n_pfdtab >= MAX_PFD) {
			/* more channels than the table can hold */
			errno = EINVAL;
			return -1;
		}

		pfdtab[n_pfdtab].fd = ch->fd;
		pfdtab[n_pfdtab].events = events;
		pfdtab[n_pfdtab].revents = 0;
		chtab_ix[n_pfdtab] = i;
		n_pfdtab++;
	}

	rc = poll (pfdtab, n_pfdtab, milli_timo);
	if (rc <= 0)
		return rc;

	/* post the results back to the channels they came from */
	for (k = 0; k < n_pfdtab; k++) {
		if (pfdtab[k].revents != 0)
			chtab[chtab_ix[k]]->ready = 1;
	}

	return rc;
}
#endif /* NDMOS_OPTION_USE_POLL_FOR_CHAN_POLL */