Blame src/unix/stream.c

Packit b5b901
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
Packit b5b901
 *
Packit b5b901
 * Permission is hereby granted, free of charge, to any person obtaining a copy
Packit b5b901
 * of this software and associated documentation files (the "Software"), to
Packit b5b901
 * deal in the Software without restriction, including without limitation the
Packit b5b901
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
Packit b5b901
 * sell copies of the Software, and to permit persons to whom the Software is
Packit b5b901
 * furnished to do so, subject to the following conditions:
Packit b5b901
 *
Packit b5b901
 * The above copyright notice and this permission notice shall be included in
Packit b5b901
 * all copies or substantial portions of the Software.
Packit b5b901
 *
Packit b5b901
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
Packit b5b901
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
Packit b5b901
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Packit b5b901
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
Packit b5b901
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
Packit b5b901
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
Packit b5b901
 * IN THE SOFTWARE.
Packit b5b901
 */
Packit b5b901
Packit b5b901
#include "uv.h"
Packit b5b901
#include "internal.h"
Packit b5b901
Packit b5b901
#include <stdio.h>
Packit b5b901
#include <stdlib.h>
Packit b5b901
#include <string.h>
Packit b5b901
#include <assert.h>
Packit b5b901
#include <errno.h>
Packit b5b901
Packit b5b901
#include <sys/types.h>
Packit b5b901
#include <sys/socket.h>
Packit b5b901
#include <sys/uio.h>
Packit b5b901
#include <sys/un.h>
Packit b5b901
#include <unistd.h>
Packit b5b901
#include <limits.h> /* IOV_MAX */
Packit b5b901
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
# include <sys/event.h>
Packit b5b901
# include <sys/time.h>
Packit b5b901
# include <sys/select.h>
Packit b5b901
Packit b5b901
/* Forward declaration */
Packit b5b901
typedef struct uv__stream_select_s uv__stream_select_t;
Packit b5b901
Packit b5b901
struct uv__stream_select_s {
Packit b5b901
  uv_stream_t* stream;
Packit b5b901
  uv_thread_t thread;
Packit b5b901
  uv_sem_t close_sem;
Packit b5b901
  uv_sem_t async_sem;
Packit b5b901
  uv_async_t async;
Packit b5b901
  int events;
Packit b5b901
  int fake_fd;
Packit b5b901
  int int_fd;
Packit b5b901
  int fd;
Packit b5b901
  fd_set* sread;
Packit b5b901
  size_t sread_sz;
Packit b5b901
  fd_set* swrite;
Packit b5b901
  size_t swrite_sz;
Packit b5b901
};
Packit Service e08953
Packit Service e08953
/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
Packit Service e08953
 * EPROTOTYPE can be returned while trying to write to a socket that is
Packit Service e08953
 * shutting down. If we retry the write, we should get the expected EPIPE
Packit Service e08953
 * instead.
Packit Service e08953
 */
Packit Service e08953
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
Packit Service e08953
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
Packit b5b901
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
Packit Service e08953
     (errno == EMSGSIZE && send_handle != NULL))
Packit b5b901
#else
Packit Service e08953
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
Packit Service e08953
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
Packit b5b901
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
Packit b5b901
#endif /* defined(__APPLE__) */
Packit b5b901
Packit b5b901
static void uv__stream_connect(uv_stream_t*);
Packit b5b901
static void uv__write(uv_stream_t* stream);
Packit b5b901
static void uv__read(uv_stream_t* stream);
Packit b5b901
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
Packit b5b901
static void uv__write_callbacks(uv_stream_t* stream);
Packit b5b901
static size_t uv__write_req_size(uv_write_t* req);
Packit b5b901
Packit b5b901
Packit b5b901
void uv__stream_init(uv_loop_t* loop,
Packit b5b901
                     uv_stream_t* stream,
Packit b5b901
                     uv_handle_type type) {
Packit b5b901
  int err;
Packit b5b901
Packit b5b901
  uv__handle_init(loop, (uv_handle_t*)stream, type);
Packit b5b901
  stream->read_cb = NULL;
Packit b5b901
  stream->alloc_cb = NULL;
Packit b5b901
  stream->close_cb = NULL;
Packit b5b901
  stream->connection_cb = NULL;
Packit b5b901
  stream->connect_req = NULL;
Packit b5b901
  stream->shutdown_req = NULL;
Packit b5b901
  stream->accepted_fd = -1;
Packit b5b901
  stream->queued_fds = NULL;
Packit b5b901
  stream->delayed_error = 0;
Packit b5b901
  QUEUE_INIT(&stream->write_queue);
Packit b5b901
  QUEUE_INIT(&stream->write_completed_queue);
Packit b5b901
  stream->write_queue_size = 0;
Packit b5b901
Packit b5b901
  if (loop->emfile_fd == -1) {
Packit b5b901
    err = uv__open_cloexec("/dev/null", O_RDONLY);
Packit b5b901
    if (err < 0)
Packit b5b901
        /* In the rare case that "/dev/null" isn't mounted open "/"
Packit b5b901
         * instead.
Packit b5b901
         */
Packit b5b901
        err = uv__open_cloexec("/", O_RDONLY);
Packit b5b901
    if (err >= 0)
Packit b5b901
      loop->emfile_fd = err;
Packit b5b901
  }
Packit b5b901
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
  stream->select = NULL;
Packit b5b901
#endif /* defined(__APPLE_) */
Packit b5b901
Packit b5b901
  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
  /* Notify select() thread about state change */
Packit b5b901
  uv__stream_select_t* s;
Packit b5b901
  int r;
Packit b5b901
Packit b5b901
  s = stream->select;
Packit b5b901
  if (s == NULL)
Packit b5b901
    return;
Packit b5b901
Packit b5b901
  /* Interrupt select() loop
Packit b5b901
   * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
Packit b5b901
   * emit read event on other side
Packit b5b901
   */
Packit b5b901
  do
Packit b5b901
    r = write(s->fake_fd, "x", 1);
Packit b5b901
  while (r == -1 && errno == EINTR);
Packit b5b901
Packit b5b901
  assert(r == 1);
Packit b5b901
#else  /* !defined(__APPLE__) */
Packit b5b901
  /* No-op on any other platform */
Packit b5b901
#endif  /* !defined(__APPLE__) */
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
static void uv__stream_osx_select(void* arg) {
Packit b5b901
  uv_stream_t* stream;
Packit b5b901
  uv__stream_select_t* s;
Packit b5b901
  char buf[1024];
Packit b5b901
  int events;
Packit b5b901
  int fd;
Packit b5b901
  int r;
Packit b5b901
  int max_fd;
Packit b5b901
Packit b5b901
  stream = arg;
Packit b5b901
  s = stream->select;
Packit b5b901
  fd = s->fd;
Packit b5b901
Packit b5b901
  if (fd > s->int_fd)
Packit b5b901
    max_fd = fd;
Packit b5b901
  else
Packit b5b901
    max_fd = s->int_fd;
Packit b5b901
Packit b5b901
  while (1) {
Packit b5b901
    /* Terminate on semaphore */
Packit b5b901
    if (uv_sem_trywait(&s->close_sem) == 0)
Packit b5b901
      break;
Packit b5b901
Packit b5b901
    /* Watch fd using select(2) */
Packit b5b901
    memset(s->sread, 0, s->sread_sz);
Packit b5b901
    memset(s->swrite, 0, s->swrite_sz);
Packit b5b901
Packit b5b901
    if (uv__io_active(&stream->io_watcher, POLLIN))
Packit b5b901
      FD_SET(fd, s->sread);
Packit b5b901
    if (uv__io_active(&stream->io_watcher, POLLOUT))
Packit b5b901
      FD_SET(fd, s->swrite);
Packit b5b901
    FD_SET(s->int_fd, s->sread);
Packit b5b901
Packit b5b901
    /* Wait indefinitely for fd events */
Packit b5b901
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
Packit b5b901
    if (r == -1) {
Packit b5b901
      if (errno == EINTR)
Packit b5b901
        continue;
Packit b5b901
Packit b5b901
      /* XXX: Possible?! */
Packit b5b901
      abort();
Packit b5b901
    }
Packit b5b901
Packit b5b901
    /* Ignore timeouts */
Packit b5b901
    if (r == 0)
Packit b5b901
      continue;
Packit b5b901
Packit b5b901
    /* Empty socketpair's buffer in case of interruption */
Packit b5b901
    if (FD_ISSET(s->int_fd, s->sread))
Packit b5b901
      while (1) {
Packit b5b901
        r = read(s->int_fd, buf, sizeof(buf));
Packit b5b901
Packit b5b901
        if (r == sizeof(buf))
Packit b5b901
          continue;
Packit b5b901
Packit b5b901
        if (r != -1)
Packit b5b901
          break;
Packit b5b901
Packit b5b901
        if (errno == EAGAIN || errno == EWOULDBLOCK)
Packit b5b901
          break;
Packit b5b901
Packit b5b901
        if (errno == EINTR)
Packit b5b901
          continue;
Packit b5b901
Packit b5b901
        abort();
Packit b5b901
      }
Packit b5b901
Packit b5b901
    /* Handle events */
Packit b5b901
    events = 0;
Packit b5b901
    if (FD_ISSET(fd, s->sread))
Packit b5b901
      events |= POLLIN;
Packit b5b901
    if (FD_ISSET(fd, s->swrite))
Packit b5b901
      events |= POLLOUT;
Packit b5b901
Packit b5b901
    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
Packit b5b901
    if (events != 0) {
Packit b5b901
      ACCESS_ONCE(int, s->events) = events;
Packit b5b901
Packit b5b901
      uv_async_send(&s->async);
Packit b5b901
      uv_sem_wait(&s->async_sem);
Packit b5b901
Packit b5b901
      /* Should be processed at this stage */
Packit b5b901
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
Packit b5b901
    }
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__stream_osx_select_cb(uv_async_t* handle) {
Packit b5b901
  uv__stream_select_t* s;
Packit b5b901
  uv_stream_t* stream;
Packit b5b901
  int events;
Packit b5b901
Packit b5b901
  s = container_of(handle, uv__stream_select_t, async);
Packit b5b901
  stream = s->stream;
Packit b5b901
Packit b5b901
  /* Get and reset stream's events */
Packit b5b901
  events = s->events;
Packit b5b901
  ACCESS_ONCE(int, s->events) = 0;
Packit b5b901
Packit b5b901
  assert(events != 0);
Packit b5b901
  assert(events == (events & (POLLIN | POLLOUT)));
Packit b5b901
Packit b5b901
  /* Invoke callback on event-loop */
Packit b5b901
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
Packit b5b901
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
Packit b5b901
  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
Packit b5b901
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
Packit b5b901
  if (stream->flags & UV_HANDLE_CLOSING)
Packit b5b901
    return;
Packit b5b901
Packit b5b901
  /* NOTE: It is important to do it here, otherwise `select()` might be called
Packit b5b901
   * before the actual `uv__read()`, leading to the blocking syscall
Packit b5b901
   */
Packit b5b901
  uv_sem_post(&s->async_sem);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__stream_osx_cb_close(uv_handle_t* async) {
Packit b5b901
  uv__stream_select_t* s;
Packit b5b901
Packit b5b901
  s = container_of(async, uv__stream_select_t, async);
Packit b5b901
  uv__free(s);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv__stream_try_select(uv_stream_t* stream, int* fd) {
Packit b5b901
  /*
Packit b5b901
   * kqueue doesn't work with some files from /dev mount on osx.
Packit b5b901
   * select(2) in separate thread for those fds
Packit b5b901
   */
Packit b5b901
Packit b5b901
  struct kevent filter[1];
Packit b5b901
  struct kevent events[1];
Packit b5b901
  struct timespec timeout;
Packit b5b901
  uv__stream_select_t* s;
Packit b5b901
  int fds[2];
Packit b5b901
  int err;
Packit b5b901
  int ret;
Packit b5b901
  int kq;
Packit b5b901
  int old_fd;
Packit b5b901
  int max_fd;
Packit b5b901
  size_t sread_sz;
Packit b5b901
  size_t swrite_sz;
Packit b5b901
Packit b5b901
  kq = kqueue();
Packit b5b901
  if (kq == -1) {
Packit b5b901
    perror("(libuv) kqueue()");
Packit b5b901
    return UV__ERR(errno);
Packit b5b901
  }
Packit b5b901
Packit b5b901
  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
Packit b5b901
Packit b5b901
  /* Use small timeout, because we only want to capture EINVALs */
Packit b5b901
  timeout.tv_sec = 0;
Packit b5b901
  timeout.tv_nsec = 1;
Packit b5b901
Packit b5b901
  do
Packit b5b901
    ret = kevent(kq, filter, 1, events, 1, &timeout);
Packit b5b901
  while (ret == -1 && errno == EINTR);
Packit b5b901
Packit b5b901
  uv__close(kq);
Packit b5b901
Packit b5b901
  if (ret == -1)
Packit b5b901
    return UV__ERR(errno);
Packit b5b901
Packit b5b901
  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
Packit b5b901
    return 0;
Packit b5b901
Packit b5b901
  /* At this point we definitely know that this fd won't work with kqueue */
Packit b5b901
Packit b5b901
  /*
Packit b5b901
   * Create fds for io watcher and to interrupt the select() loop.
Packit b5b901
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
Packit b5b901
   */
Packit b5b901
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
Packit b5b901
    return UV__ERR(errno);
Packit b5b901
Packit b5b901
  max_fd = *fd;
Packit b5b901
  if (fds[1] > max_fd)
Packit b5b901
    max_fd = fds[1];
Packit b5b901
Packit b5b901
  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
Packit b5b901
  swrite_sz = sread_sz;
Packit b5b901
Packit b5b901
  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
Packit b5b901
  if (s == NULL) {
Packit b5b901
    err = UV_ENOMEM;
Packit b5b901
    goto failed_malloc;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  s->events = 0;
Packit b5b901
  s->fd = *fd;
Packit b5b901
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
Packit b5b901
  s->sread_sz = sread_sz;
Packit b5b901
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
Packit b5b901
  s->swrite_sz = swrite_sz;
Packit b5b901
Packit b5b901
  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
Packit b5b901
  if (err)
Packit b5b901
    goto failed_async_init;
Packit b5b901
Packit b5b901
  s->async.flags |= UV_HANDLE_INTERNAL;
Packit b5b901
  uv__handle_unref(&s->async);
Packit b5b901
Packit b5b901
  err = uv_sem_init(&s->close_sem, 0);
Packit b5b901
  if (err != 0)
Packit b5b901
    goto failed_close_sem_init;
Packit b5b901
Packit b5b901
  err = uv_sem_init(&s->async_sem, 0);
Packit b5b901
  if (err != 0)
Packit b5b901
    goto failed_async_sem_init;
Packit b5b901
Packit b5b901
  s->fake_fd = fds[0];
Packit b5b901
  s->int_fd = fds[1];
Packit b5b901
Packit b5b901
  old_fd = *fd;
Packit b5b901
  s->stream = stream;
Packit b5b901
  stream->select = s;
Packit b5b901
  *fd = s->fake_fd;
Packit b5b901
Packit b5b901
  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
Packit b5b901
  if (err != 0)
Packit b5b901
    goto failed_thread_create;
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
Packit b5b901
failed_thread_create:
Packit b5b901
  s->stream = NULL;
Packit b5b901
  stream->select = NULL;
Packit b5b901
  *fd = old_fd;
Packit b5b901
Packit b5b901
  uv_sem_destroy(&s->async_sem);
Packit b5b901
Packit b5b901
failed_async_sem_init:
Packit b5b901
  uv_sem_destroy(&s->close_sem);
Packit b5b901
Packit b5b901
failed_close_sem_init:
Packit b5b901
  uv__close(fds[0]);
Packit b5b901
  uv__close(fds[1]);
Packit b5b901
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
Packit b5b901
  return err;
Packit b5b901
Packit b5b901
failed_async_init:
Packit b5b901
  uv__free(s);
Packit b5b901
Packit b5b901
failed_malloc:
Packit b5b901
  uv__close(fds[0]);
Packit b5b901
  uv__close(fds[1]);
Packit b5b901
Packit b5b901
  return err;
Packit b5b901
}
Packit b5b901
#endif /* defined(__APPLE__) */
Packit b5b901
Packit b5b901
Packit b5b901
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
  int enable;
Packit b5b901
#endif
Packit b5b901
Packit b5b901
  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
Packit b5b901
    return UV_EBUSY;
Packit b5b901
Packit b5b901
  assert(fd >= 0);
Packit b5b901
  stream->flags |= flags;
Packit b5b901
Packit b5b901
  if (stream->type == UV_TCP) {
Packit b5b901
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
Packit b5b901
      return UV__ERR(errno);
Packit b5b901
Packit b5b901
    /* TODO Use delay the user passed in. */
Packit b5b901
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
Packit b5b901
        uv__tcp_keepalive(fd, 1, 60)) {
Packit b5b901
      return UV__ERR(errno);
Packit b5b901
    }
Packit b5b901
  }
Packit b5b901
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
  enable = 1;
Packit b5b901
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
Packit b5b901
      errno != ENOTSOCK &&
Packit b5b901
      errno != EINVAL) {
Packit b5b901
    return UV__ERR(errno);
Packit b5b901
  }
Packit b5b901
#endif
Packit b5b901
Packit b5b901
  stream->io_watcher.fd = fd;
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
Packit b5b901
  uv_write_t* req;
Packit b5b901
  QUEUE* q;
Packit b5b901
  while (!QUEUE_EMPTY(&stream->write_queue)) {
Packit b5b901
    q = QUEUE_HEAD(&stream->write_queue);
Packit b5b901
    QUEUE_REMOVE(q);
Packit b5b901
Packit b5b901
    req = QUEUE_DATA(q, uv_write_t, queue);
Packit b5b901
    req->error = error;
Packit b5b901
Packit b5b901
    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
void uv__stream_destroy(uv_stream_t* stream) {
Packit b5b901
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
Packit b5b901
  assert(stream->flags & UV_HANDLE_CLOSED);
Packit b5b901
Packit b5b901
  if (stream->connect_req) {
Packit b5b901
    uv__req_unregister(stream->loop, stream->connect_req);
Packit b5b901
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
Packit b5b901
    stream->connect_req = NULL;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  uv__stream_flush_write_queue(stream, UV_ECANCELED);
Packit b5b901
  uv__write_callbacks(stream);
Packit b5b901
Packit b5b901
  if (stream->shutdown_req) {
Packit b5b901
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
Packit b5b901
     * fait accompli at this point. Maybe we should revisit this in v0.11.
Packit b5b901
     * A possible reason for leaving it unchanged is that it informs the
Packit b5b901
     * callee that the handle has been destroyed.
Packit b5b901
     */
Packit b5b901
    uv__req_unregister(stream->loop, stream->shutdown_req);
Packit b5b901
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
Packit b5b901
    stream->shutdown_req = NULL;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  assert(stream->write_queue_size == 0);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
/* Implements a best effort approach to mitigating accept() EMFILE errors.
Packit b5b901
 * We have a spare file descriptor stashed away that we close to get below
Packit b5b901
 * the EMFILE limit. Next, we accept all pending connections and close them
Packit b5b901
 * immediately to signal the clients that we're overloaded - and we are, but
Packit b5b901
 * we still keep on trucking.
Packit b5b901
 *
Packit b5b901
 * There is one caveat: it's not reliable in a multi-threaded environment.
Packit b5b901
 * The file descriptor limit is per process. Our party trick fails if another
Packit b5b901
 * thread opens a file or creates a socket in the time window between us
Packit b5b901
 * calling close() and accept().
Packit b5b901
 */
Packit b5b901
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
Packit b5b901
  int err;
Packit b5b901
  int emfile_fd;
Packit b5b901
Packit b5b901
  if (loop->emfile_fd == -1)
Packit b5b901
    return UV_EMFILE;
Packit b5b901
Packit b5b901
  uv__close(loop->emfile_fd);
Packit b5b901
  loop->emfile_fd = -1;
Packit b5b901
Packit b5b901
  do {
Packit b5b901
    err = uv__accept(accept_fd);
Packit b5b901
    if (err >= 0)
Packit b5b901
      uv__close(err);
Packit b5b901
  } while (err >= 0 || err == UV_EINTR);
Packit b5b901
Packit b5b901
  emfile_fd = uv__open_cloexec("/", O_RDONLY);
Packit b5b901
  if (emfile_fd >= 0)
Packit b5b901
    loop->emfile_fd = emfile_fd;
Packit b5b901
Packit b5b901
  return err;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
#if defined(UV_HAVE_KQUEUE)
Packit b5b901
# define UV_DEC_BACKLOG(w) w->rcount--;
Packit b5b901
#else
Packit b5b901
# define UV_DEC_BACKLOG(w) /* no-op */
Packit b5b901
#endif /* defined(UV_HAVE_KQUEUE) */
Packit b5b901
Packit b5b901
Packit b5b901
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
Packit b5b901
  uv_stream_t* stream;
Packit b5b901
  int err;
Packit b5b901
Packit b5b901
  stream = container_of(w, uv_stream_t, io_watcher);
Packit b5b901
  assert(events & POLLIN);
Packit b5b901
  assert(stream->accepted_fd == -1);
Packit b5b901
  assert(!(stream->flags & UV_HANDLE_CLOSING));
Packit b5b901
Packit b5b901
  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
Packit b5b901
  /* connection_cb can close the server socket while we're
Packit b5b901
   * in the loop so check it on each iteration.
Packit b5b901
   */
Packit b5b901
  while (uv__stream_fd(stream) != -1) {
Packit b5b901
    assert(stream->accepted_fd == -1);
Packit b5b901
Packit b5b901
#if defined(UV_HAVE_KQUEUE)
Packit b5b901
    if (w->rcount <= 0)
Packit b5b901
      return;
Packit b5b901
#endif /* defined(UV_HAVE_KQUEUE) */
Packit b5b901
Packit b5b901
    err = uv__accept(uv__stream_fd(stream));
Packit b5b901
    if (err < 0) {
Packit b5b901
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
Packit b5b901
        return;  /* Not an error. */
Packit b5b901
Packit b5b901
      if (err == UV_ECONNABORTED)
Packit b5b901
        continue;  /* Ignore. Nothing we can do about that. */
Packit b5b901
Packit b5b901
      if (err == UV_EMFILE || err == UV_ENFILE) {
Packit b5b901
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
Packit b5b901
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
Packit b5b901
          break;
Packit b5b901
      }
Packit b5b901
Packit b5b901
      stream->connection_cb(stream, err);
Packit b5b901
      continue;
Packit b5b901
    }
Packit b5b901
Packit b5b901
    UV_DEC_BACKLOG(w)
Packit b5b901
    stream->accepted_fd = err;
Packit b5b901
    stream->connection_cb(stream, 0);
Packit b5b901
Packit b5b901
    if (stream->accepted_fd != -1) {
Packit b5b901
      /* The user hasn't yet accepted called uv_accept() */
Packit b5b901
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
Packit b5b901
      return;
Packit b5b901
    }
Packit b5b901
Packit b5b901
    if (stream->type == UV_TCP &&
Packit b5b901
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
Packit b5b901
      /* Give other processes a chance to accept connections. */
Packit b5b901
      struct timespec timeout = { 0, 1 };
Packit b5b901
      nanosleep(&timeout, NULL);
Packit b5b901
    }
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
#undef UV_DEC_BACKLOG
Packit b5b901
Packit b5b901
Packit b5b901
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
Packit b5b901
  int err;
Packit b5b901
Packit b5b901
  assert(server->loop == client->loop);
Packit b5b901
Packit b5b901
  if (server->accepted_fd == -1)
Packit b5b901
    return UV_EAGAIN;
Packit b5b901
Packit b5b901
  switch (client->type) {
Packit b5b901
    case UV_NAMED_PIPE:
Packit b5b901
    case UV_TCP:
Packit b5b901
      err = uv__stream_open(client,
Packit b5b901
                            server->accepted_fd,
Packit b5b901
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
Packit b5b901
      if (err) {
Packit b5b901
        /* TODO handle error */
Packit b5b901
        uv__close(server->accepted_fd);
Packit b5b901
        goto done;
Packit b5b901
      }
Packit b5b901
      break;
Packit b5b901
Packit b5b901
    case UV_UDP:
Packit b5b901
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
Packit b5b901
      if (err) {
Packit b5b901
        uv__close(server->accepted_fd);
Packit b5b901
        goto done;
Packit b5b901
      }
Packit b5b901
      break;
Packit b5b901
Packit b5b901
    default:
Packit b5b901
      return UV_EINVAL;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  client->flags |= UV_HANDLE_BOUND;
Packit b5b901
Packit b5b901
done:
Packit b5b901
  /* Process queued fds */
Packit b5b901
  if (server->queued_fds != NULL) {
Packit b5b901
    uv__stream_queued_fds_t* queued_fds;
Packit b5b901
Packit b5b901
    queued_fds = server->queued_fds;
Packit b5b901
Packit b5b901
    /* Read first */
Packit b5b901
    server->accepted_fd = queued_fds->fds[0];
Packit b5b901
Packit b5b901
    /* All read, free */
Packit b5b901
    assert(queued_fds->offset > 0);
Packit b5b901
    if (--queued_fds->offset == 0) {
Packit b5b901
      uv__free(queued_fds);
Packit b5b901
      server->queued_fds = NULL;
Packit b5b901
    } else {
Packit b5b901
      /* Shift rest */
Packit b5b901
      memmove(queued_fds->fds,
Packit b5b901
              queued_fds->fds + 1,
Packit b5b901
              queued_fds->offset * sizeof(*queued_fds->fds));
Packit b5b901
    }
Packit b5b901
  } else {
Packit b5b901
    server->accepted_fd = -1;
Packit b5b901
    if (err == 0)
Packit b5b901
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
Packit b5b901
  }
Packit b5b901
  return err;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
Packit b5b901
  int err;
Packit b5b901
Packit b5b901
  switch (stream->type) {
Packit b5b901
  case UV_TCP:
Packit b5b901
    err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
Packit b5b901
    break;
Packit b5b901
Packit b5b901
  case UV_NAMED_PIPE:
Packit b5b901
    err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
Packit b5b901
    break;
Packit b5b901
Packit b5b901
  default:
Packit b5b901
    err = UV_EINVAL;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (err == 0)
Packit b5b901
    uv__handle_start(stream);
Packit b5b901
Packit b5b901
  return err;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__drain(uv_stream_t* stream) {
Packit b5b901
  uv_shutdown_t* req;
Packit b5b901
  int err;
Packit b5b901
Packit b5b901
  assert(QUEUE_EMPTY(&stream->write_queue));
Packit b5b901
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
Packit b5b901
  /* Shutdown? */
Packit b5b901
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
Packit b5b901
      !(stream->flags & UV_HANDLE_CLOSING) &&
Packit b5b901
      !(stream->flags & UV_HANDLE_SHUT)) {
Packit b5b901
    assert(stream->shutdown_req);
Packit b5b901
Packit b5b901
    req = stream->shutdown_req;
Packit b5b901
    stream->shutdown_req = NULL;
Packit b5b901
    stream->flags &= ~UV_HANDLE_SHUTTING;
Packit b5b901
    uv__req_unregister(stream->loop, req);
Packit b5b901
Packit b5b901
    err = 0;
Packit b5b901
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
Packit b5b901
      err = UV__ERR(errno);
Packit b5b901
Packit b5b901
    if (err == 0)
Packit b5b901
      stream->flags |= UV_HANDLE_SHUT;
Packit b5b901
Packit b5b901
    if (req->cb != NULL)
Packit b5b901
      req->cb(req, err);
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit Service e08953
static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
Packit Service e08953
  if (n == 1)
Packit Service e08953
    return write(fd, vec->iov_base, vec->iov_len);
Packit Service e08953
  else
Packit Service e08953
    return writev(fd, vec, n);
Packit Service e08953
}
Packit Service e08953
Packit Service e08953
Packit b5b901
static size_t uv__write_req_size(uv_write_t* req) {
Packit b5b901
  size_t size;
Packit b5b901
Packit b5b901
  assert(req->bufs != NULL);
Packit b5b901
  size = uv__count_bufs(req->bufs + req->write_index,
Packit b5b901
                        req->nbufs - req->write_index);
Packit b5b901
  assert(req->handle->write_queue_size >= size);
Packit b5b901
Packit b5b901
  return size;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit Service e08953
/* Returns 1 if all write request data has been written, or 0 if there is still
Packit Service e08953
 * more data to write.
Packit Service e08953
 *
Packit Service e08953
 * Note: the return value only says something about the *current* request.
Packit Service e08953
 * There may still be other write requests sitting in the queue.
Packit Service e08953
 */
Packit Service e08953
static int uv__write_req_update(uv_stream_t* stream,
Packit Service e08953
                                uv_write_t* req,
Packit Service e08953
                                size_t n) {
Packit Service e08953
  uv_buf_t* buf;
Packit Service e08953
  size_t len;
Packit Service e08953
Packit Service e08953
  assert(n <= stream->write_queue_size);
Packit Service e08953
  stream->write_queue_size -= n;
Packit Service e08953
Packit Service e08953
  buf = req->bufs + req->write_index;
Packit Service e08953
Packit Service e08953
  do {
Packit Service e08953
    len = n < buf->len ? n : buf->len;
Packit Service e08953
    buf->base += len;
Packit Service e08953
    buf->len -= len;
Packit Service e08953
    buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
Packit Service e08953
    n -= len;
Packit Service e08953
  } while (n > 0);
Packit Service e08953
Packit Service e08953
  req->write_index = buf - req->bufs;
Packit Service e08953
Packit Service e08953
  return req->write_index == req->nbufs;
Packit Service e08953
}
Packit Service e08953
Packit Service e08953
Packit b5b901
static void uv__write_req_finish(uv_write_t* req) {
Packit b5b901
  uv_stream_t* stream = req->handle;
Packit b5b901
Packit b5b901
  /* Pop the req off tcp->write_queue. */
Packit b5b901
  QUEUE_REMOVE(&req->queue);
Packit b5b901
Packit b5b901
  /* Only free when there was no error. On error, we touch up write_queue_size
Packit b5b901
   * right before making the callback. The reason we don't do that right away
Packit b5b901
   * is that a write_queue_size > 0 is our only way to signal to the user that
Packit b5b901
   * they should stop writing - which they should if we got an error. Something
Packit b5b901
   * to revisit in future revisions of the libuv API.
Packit b5b901
   */
Packit b5b901
  if (req->error == 0) {
Packit b5b901
    if (req->bufs != req->bufsml)
Packit b5b901
      uv__free(req->bufs);
Packit b5b901
    req->bufs = NULL;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  /* Add it to the write_completed_queue where it will have its
Packit b5b901
   * callback called in the near future.
Packit b5b901
   */
Packit b5b901
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
Packit b5b901
  uv__io_feed(stream->loop, &stream->io_watcher);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static int uv__handle_fd(uv_handle_t* handle) {
Packit b5b901
  switch (handle->type) {
Packit b5b901
    case UV_NAMED_PIPE:
Packit b5b901
    case UV_TCP:
Packit b5b901
      return ((uv_stream_t*) handle)->io_watcher.fd;
Packit b5b901
Packit b5b901
    case UV_UDP:
Packit b5b901
      return ((uv_udp_t*) handle)->io_watcher.fd;
Packit b5b901
Packit b5b901
    default:
Packit b5b901
      return -1;
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
static void uv__write(uv_stream_t* stream) {
Packit b5b901
  struct iovec* iov;
Packit b5b901
  QUEUE* q;
Packit b5b901
  uv_write_t* req;
Packit b5b901
  int iovmax;
Packit b5b901
  int iovcnt;
Packit b5b901
  ssize_t n;
Packit b5b901
  int err;
Packit b5b901
Packit b5b901
start:
Packit b5b901
Packit b5b901
  assert(uv__stream_fd(stream) >= 0);
Packit b5b901
Packit b5b901
  if (QUEUE_EMPTY(&stream->write_queue))
Packit b5b901
    return;
Packit b5b901
Packit b5b901
  q = QUEUE_HEAD(&stream->write_queue);
Packit b5b901
  req = QUEUE_DATA(q, uv_write_t, queue);
Packit b5b901
  assert(req->handle == stream);
Packit b5b901
Packit b5b901
  /*
Packit b5b901
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
Packit b5b901
   * because Windows's WSABUF is not an iovec.
Packit b5b901
   */
Packit b5b901
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
Packit b5b901
  iov = (struct iovec*) &(req->bufs[req->write_index]);
Packit b5b901
  iovcnt = req->nbufs - req->write_index;
Packit b5b901
Packit b5b901
  iovmax = uv__getiovmax();
Packit b5b901
Packit b5b901
  /* Limit iov count to avoid EINVALs from writev() */
Packit b5b901
  if (iovcnt > iovmax)
Packit b5b901
    iovcnt = iovmax;
Packit b5b901
Packit b5b901
  /*
Packit b5b901
   * Now do the actual writev. Note that we've been updating the pointers
Packit b5b901
   * inside the iov each time we write. So there is no need to offset it.
Packit b5b901
   */
Packit b5b901
Packit b5b901
  if (req->send_handle) {
Packit b5b901
    int fd_to_send;
Packit b5b901
    struct msghdr msg;
Packit b5b901
    struct cmsghdr *cmsg;
Packit b5b901
    union {
Packit b5b901
      char data[64];
Packit b5b901
      struct cmsghdr alias;
Packit b5b901
    } scratch;
Packit b5b901
Packit b5b901
    if (uv__is_closing(req->send_handle)) {
Packit b5b901
      err = UV_EBADF;
Packit b5b901
      goto error;
Packit b5b901
    }
Packit b5b901
Packit b5b901
    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
Packit b5b901
Packit b5b901
    memset(&scratch, 0, sizeof(scratch));
Packit b5b901
Packit b5b901
    assert(fd_to_send >= 0);
Packit b5b901
Packit b5b901
    msg.msg_name = NULL;
Packit b5b901
    msg.msg_namelen = 0;
Packit b5b901
    msg.msg_iov = iov;
Packit b5b901
    msg.msg_iovlen = iovcnt;
Packit b5b901
    msg.msg_flags = 0;
Packit b5b901
Packit b5b901
    msg.msg_control = &scratch.alias;
Packit b5b901
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
Packit b5b901
Packit b5b901
    cmsg = CMSG_FIRSTHDR(&msg;;
Packit b5b901
    cmsg->cmsg_level = SOL_SOCKET;
Packit b5b901
    cmsg->cmsg_type = SCM_RIGHTS;
Packit b5b901
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
Packit b5b901
Packit b5b901
    /* silence aliasing warning */
Packit b5b901
    {
Packit b5b901
      void* pv = CMSG_DATA(cmsg);
Packit b5b901
      int* pi = pv;
Packit b5b901
      *pi = fd_to_send;
Packit b5b901
    }
Packit b5b901
Packit Service e08953
    do
Packit b5b901
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
Packit Service e08953
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
Packit b5b901
Packit Service e08953
    /* Ensure the handle isn't sent again in case this is a partial write. */
Packit Service e08953
    if (n >= 0)
Packit Service e08953
      req->send_handle = NULL;
Packit b5b901
  } else {
Packit Service e08953
    do
Packit Service e08953
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
Packit Service e08953
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
Packit Service e08953
  }
Packit b5b901
Packit Service e08953
  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
Packit Service e08953
    err = UV__ERR(errno);
Packit Service e08953
    goto error;
Packit b5b901
  }
Packit b5b901
Packit Service e08953
  if (n >= 0 && uv__write_req_update(stream, req, n)) {
Packit Service e08953
    uv__write_req_finish(req);
Packit Service e08953
    return;  /* TODO(bnoordhuis) Start trying to write the next request. */
Packit Service e08953
  }
Packit b5b901
Packit Service e08953
  /* If this is a blocking stream, try again. */
Packit Service e08953
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
Packit Service e08953
    goto start;
Packit b5b901
Packit b5b901
  /* We're not done. */
Packit b5b901
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
Packit b5b901
  /* Notify select() thread about state change */
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
Packit b5b901
  return;
Packit b5b901
Packit b5b901
error:
Packit b5b901
  req->error = err;
Packit b5b901
  uv__write_req_finish(req);
Packit b5b901
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
  if (!uv__io_active(&stream->io_watcher, POLLIN))
Packit b5b901
    uv__handle_stop(stream);
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__write_callbacks(uv_stream_t* stream) {
Packit b5b901
  uv_write_t* req;
Packit b5b901
  QUEUE* q;
Packit b5b901
  QUEUE pq;
Packit b5b901
Packit b5b901
  if (QUEUE_EMPTY(&stream->write_completed_queue))
Packit b5b901
    return;
Packit b5b901
Packit b5b901
  QUEUE_MOVE(&stream->write_completed_queue, &pq;;
Packit b5b901
Packit b5b901
  while (!QUEUE_EMPTY(&pq)) {
Packit b5b901
    /* Pop a req off write_completed_queue. */
Packit b5b901
    q = QUEUE_HEAD(&pq;;
Packit b5b901
    req = QUEUE_DATA(q, uv_write_t, queue);
Packit b5b901
    QUEUE_REMOVE(q);
Packit b5b901
    uv__req_unregister(stream->loop, req);
Packit b5b901
Packit b5b901
    if (req->bufs != NULL) {
Packit b5b901
      stream->write_queue_size -= uv__write_req_size(req);
Packit b5b901
      if (req->bufs != req->bufsml)
Packit b5b901
        uv__free(req->bufs);
Packit b5b901
      req->bufs = NULL;
Packit b5b901
    }
Packit b5b901
Packit b5b901
    /* NOTE: call callback AFTER freeing the request data. */
Packit b5b901
    if (req->cb)
Packit b5b901
      req->cb(req, req->error);
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
uv_handle_type uv__handle_type(int fd) {
Packit b5b901
  struct sockaddr_storage ss;
Packit b5b901
  socklen_t sslen;
Packit b5b901
  socklen_t len;
Packit b5b901
  int type;
Packit b5b901
Packit b5b901
  memset(&ss, 0, sizeof(ss));
Packit b5b901
  sslen = sizeof(ss);
Packit b5b901
Packit b5b901
  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
Packit b5b901
    return UV_UNKNOWN_HANDLE;
Packit b5b901
Packit b5b901
  len = sizeof type;
Packit b5b901
Packit b5b901
  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
Packit b5b901
    return UV_UNKNOWN_HANDLE;
Packit b5b901
Packit b5b901
  if (type == SOCK_STREAM) {
Packit b5b901
#if defined(_AIX) || defined(__DragonFly__)
Packit b5b901
    /* on AIX/DragonFly the getsockname call returns an empty sa structure
Packit b5b901
     * for sockets of type AF_UNIX.  For all other types it will
Packit b5b901
     * return a properly filled in structure.
Packit b5b901
     */
Packit b5b901
    if (sslen == 0)
Packit b5b901
      return UV_NAMED_PIPE;
Packit b5b901
#endif
Packit b5b901
    switch (ss.ss_family) {
Packit b5b901
      case AF_UNIX:
Packit b5b901
        return UV_NAMED_PIPE;
Packit b5b901
      case AF_INET:
Packit b5b901
      case AF_INET6:
Packit b5b901
        return UV_TCP;
Packit b5b901
      }
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (type == SOCK_DGRAM &&
Packit b5b901
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
Packit b5b901
    return UV_UDP;
Packit b5b901
Packit b5b901
  return UV_UNKNOWN_HANDLE;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
Packit b5b901
  stream->flags |= UV_HANDLE_READ_EOF;
Packit Service e08953
  stream->flags &= ~UV_HANDLE_READING;
Packit b5b901
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
Packit b5b901
    uv__handle_stop(stream);
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
  stream->read_cb(stream, UV_EOF, buf);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
Packit b5b901
  uv__stream_queued_fds_t* queued_fds;
Packit b5b901
  unsigned int queue_size;
Packit b5b901
Packit b5b901
  queued_fds = stream->queued_fds;
Packit b5b901
  if (queued_fds == NULL) {
Packit b5b901
    queue_size = 8;
Packit b5b901
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
Packit b5b901
                            sizeof(*queued_fds));
Packit b5b901
    if (queued_fds == NULL)
Packit b5b901
      return UV_ENOMEM;
Packit b5b901
    queued_fds->size = queue_size;
Packit b5b901
    queued_fds->offset = 0;
Packit b5b901
    stream->queued_fds = queued_fds;
Packit b5b901
Packit b5b901
    /* Grow */
Packit b5b901
  } else if (queued_fds->size == queued_fds->offset) {
Packit b5b901
    queue_size = queued_fds->size + 8;
Packit b5b901
    queued_fds = uv__realloc(queued_fds,
Packit b5b901
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
Packit b5b901
                              sizeof(*queued_fds));
Packit b5b901
Packit b5b901
    /*
Packit b5b901
     * Allocation failure, report back.
Packit b5b901
     * NOTE: if it is fatal - sockets will be closed in uv__stream_close
Packit b5b901
     */
Packit b5b901
    if (queued_fds == NULL)
Packit b5b901
      return UV_ENOMEM;
Packit b5b901
    queued_fds->size = queue_size;
Packit b5b901
    stream->queued_fds = queued_fds;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  /* Put fd in a queue */
Packit b5b901
  queued_fds->fds[queued_fds->offset++] = fd;
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit Service e08953
#if defined(__PASE__)
Packit Service e08953
/* on IBMi PASE the control message length can not exceed 256. */
Packit Service e08953
# define UV__CMSG_FD_COUNT 60
Packit Service e08953
#else
Packit Service e08953
# define UV__CMSG_FD_COUNT 64
Packit Service e08953
#endif
Packit b5b901
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
Packit b5b901
Packit b5b901
Packit b5b901
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
Packit b5b901
  struct cmsghdr* cmsg;
Packit b5b901
Packit b5b901
  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
Packit b5b901
    char* start;
Packit b5b901
    char* end;
Packit b5b901
    int err;
Packit b5b901
    void* pv;
Packit b5b901
    int* pi;
Packit b5b901
    unsigned int i;
Packit b5b901
    unsigned int count;
Packit b5b901
Packit b5b901
    if (cmsg->cmsg_type != SCM_RIGHTS) {
Packit b5b901
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
Packit b5b901
          cmsg->cmsg_type);
Packit b5b901
      continue;
Packit b5b901
    }
Packit b5b901
Packit b5b901
    /* silence aliasing warning */
Packit b5b901
    pv = CMSG_DATA(cmsg);
Packit b5b901
    pi = pv;
Packit b5b901
Packit b5b901
    /* Count available fds */
Packit b5b901
    start = (char*) cmsg;
Packit b5b901
    end = (char*) cmsg + cmsg->cmsg_len;
Packit b5b901
    count = 0;
Packit b5b901
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
Packit b5b901
      count++;
Packit b5b901
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
Packit b5b901
Packit b5b901
    for (i = 0; i < count; i++) {
Packit b5b901
      /* Already has accepted fd, queue now */
Packit b5b901
      if (stream->accepted_fd != -1) {
Packit b5b901
        err = uv__stream_queue_fd(stream, pi[i]);
Packit b5b901
        if (err != 0) {
Packit b5b901
          /* Close rest */
Packit b5b901
          for (; i < count; i++)
Packit b5b901
            uv__close(pi[i]);
Packit b5b901
          return err;
Packit b5b901
        }
Packit b5b901
      } else {
Packit b5b901
        stream->accepted_fd = pi[i];
Packit b5b901
      }
Packit b5b901
    }
Packit b5b901
  }
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
#ifdef __clang__
Packit b5b901
# pragma clang diagnostic push
Packit b5b901
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
Packit b5b901
# pragma clang diagnostic ignored "-Wvla-extension"
Packit b5b901
#endif
Packit b5b901
Packit b5b901
static void uv__read(uv_stream_t* stream) {
Packit b5b901
  uv_buf_t buf;
Packit b5b901
  ssize_t nread;
Packit b5b901
  struct msghdr msg;
Packit b5b901
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
Packit b5b901
  int count;
Packit b5b901
  int err;
Packit b5b901
  int is_ipc;
Packit b5b901
Packit b5b901
  stream->flags &= ~UV_HANDLE_READ_PARTIAL;
Packit b5b901
Packit b5b901
  /* Prevent loop starvation when the data comes in as fast as (or faster than)
Packit b5b901
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
Packit b5b901
   */
Packit b5b901
  count = 32;
Packit b5b901
Packit b5b901
  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
Packit b5b901
Packit b5b901
  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
Packit b5b901
   * tcp->read_cb is NULL or not?
Packit b5b901
   */
Packit b5b901
  while (stream->read_cb
Packit b5b901
      && (stream->flags & UV_HANDLE_READING)
Packit b5b901
      && (count-- > 0)) {
Packit b5b901
    assert(stream->alloc_cb != NULL);
Packit b5b901
Packit b5b901
    buf = uv_buf_init(NULL, 0);
Packit b5b901
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf;;
Packit b5b901
    if (buf.base == NULL || buf.len == 0) {
Packit b5b901
      /* User indicates it can't or won't handle the read. */
Packit b5b901
      stream->read_cb(stream, UV_ENOBUFS, &buf;;
Packit b5b901
      return;
Packit b5b901
    }
Packit b5b901
Packit b5b901
    assert(buf.base != NULL);
Packit b5b901
    assert(uv__stream_fd(stream) >= 0);
Packit b5b901
Packit b5b901
    if (!is_ipc) {
Packit b5b901
      do {
Packit b5b901
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
Packit b5b901
      }
Packit b5b901
      while (nread < 0 && errno == EINTR);
Packit b5b901
    } else {
Packit b5b901
      /* ipc uses recvmsg */
Packit b5b901
      msg.msg_flags = 0;
Packit b5b901
      msg.msg_iov = (struct iovec*) &buf;
Packit b5b901
      msg.msg_iovlen = 1;
Packit b5b901
      msg.msg_name = NULL;
Packit b5b901
      msg.msg_namelen = 0;
Packit b5b901
      /* Set up to receive a descriptor even if one isn't in the message */
Packit b5b901
      msg.msg_controllen = sizeof(cmsg_space);
Packit b5b901
      msg.msg_control = cmsg_space;
Packit b5b901
Packit b5b901
      do {
Packit b5b901
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
Packit b5b901
      }
Packit b5b901
      while (nread < 0 && errno == EINTR);
Packit b5b901
    }
Packit b5b901
Packit b5b901
    if (nread < 0) {
Packit b5b901
      /* Error */
Packit b5b901
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
Packit b5b901
        /* Wait for the next one. */
Packit b5b901
        if (stream->flags & UV_HANDLE_READING) {
Packit b5b901
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
          uv__stream_osx_interrupt_select(stream);
Packit b5b901
        }
Packit b5b901
        stream->read_cb(stream, 0, &buf;;
Packit b5b901
#if defined(__CYGWIN__) || defined(__MSYS__)
Packit b5b901
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
Packit b5b901
        uv__stream_eof(stream, &buf;;
Packit b5b901
        return;
Packit b5b901
#endif
Packit b5b901
      } else {
Packit b5b901
        /* Error. User should call uv_close(). */
Packit b5b901
        stream->read_cb(stream, UV__ERR(errno), &buf;;
Packit b5b901
        if (stream->flags & UV_HANDLE_READING) {
Packit b5b901
          stream->flags &= ~UV_HANDLE_READING;
Packit b5b901
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
Packit b5b901
            uv__handle_stop(stream);
Packit b5b901
          uv__stream_osx_interrupt_select(stream);
Packit b5b901
        }
Packit b5b901
      }
Packit b5b901
      return;
Packit b5b901
    } else if (nread == 0) {
Packit b5b901
      uv__stream_eof(stream, &buf;;
Packit b5b901
      return;
Packit b5b901
    } else {
Packit b5b901
      /* Successful read */
Packit b5b901
      ssize_t buflen = buf.len;
Packit b5b901
Packit b5b901
      if (is_ipc) {
Packit b5b901
        err = uv__stream_recv_cmsg(stream, &msg;;
Packit b5b901
        if (err != 0) {
Packit b5b901
          stream->read_cb(stream, err, &buf;;
Packit b5b901
          return;
Packit b5b901
        }
Packit b5b901
      }
Packit b5b901
Packit b5b901
#if defined(__MVS__)
Packit b5b901
      if (is_ipc && msg.msg_controllen > 0) {
Packit b5b901
        uv_buf_t blankbuf;
Packit b5b901
        int nread;
Packit b5b901
        struct iovec *old;
Packit b5b901
Packit b5b901
        blankbuf.base = 0;
Packit b5b901
        blankbuf.len = 0;
Packit b5b901
        old = msg.msg_iov;
Packit b5b901
        msg.msg_iov = (struct iovec*) &blankbuf;
Packit b5b901
        nread = 0;
Packit b5b901
        do {
Packit b5b901
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
Packit b5b901
          err = uv__stream_recv_cmsg(stream, &msg;;
Packit b5b901
          if (err != 0) {
Packit b5b901
            stream->read_cb(stream, err, &buf;;
Packit b5b901
            msg.msg_iov = old;
Packit b5b901
            return;
Packit b5b901
          }
Packit b5b901
        } while (nread == 0 && msg.msg_controllen > 0);
Packit b5b901
        msg.msg_iov = old;
Packit b5b901
      }
Packit b5b901
#endif
Packit b5b901
      stream->read_cb(stream, nread, &buf;;
Packit b5b901
Packit b5b901
      /* Return if we didn't fill the buffer, there is no more data to read. */
Packit b5b901
      if (nread < buflen) {
Packit b5b901
        stream->flags |= UV_HANDLE_READ_PARTIAL;
Packit b5b901
        return;
Packit b5b901
      }
Packit b5b901
    }
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
#ifdef __clang__
Packit b5b901
# pragma clang diagnostic pop
Packit b5b901
#endif
Packit b5b901
Packit b5b901
#undef UV__CMSG_FD_COUNT
Packit b5b901
#undef UV__CMSG_FD_SIZE
Packit b5b901
Packit b5b901
Packit b5b901
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
Packit b5b901
  assert(stream->type == UV_TCP ||
Packit b5b901
         stream->type == UV_TTY ||
Packit b5b901
         stream->type == UV_NAMED_PIPE);
Packit b5b901
Packit b5b901
  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
Packit b5b901
      stream->flags & UV_HANDLE_SHUT ||
Packit b5b901
      stream->flags & UV_HANDLE_SHUTTING ||
Packit b5b901
      uv__is_closing(stream)) {
Packit b5b901
    return UV_ENOTCONN;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  assert(uv__stream_fd(stream) >= 0);
Packit b5b901
Packit b5b901
  /* Initialize request */
Packit b5b901
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
Packit b5b901
  req->handle = stream;
Packit b5b901
  req->cb = cb;
Packit b5b901
  stream->shutdown_req = req;
Packit b5b901
  stream->flags |= UV_HANDLE_SHUTTING;
Packit b5b901
Packit b5b901
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
Packit b5b901
  uv_stream_t* stream;
Packit b5b901
Packit b5b901
  stream = container_of(w, uv_stream_t, io_watcher);
Packit b5b901
Packit b5b901
  assert(stream->type == UV_TCP ||
Packit b5b901
         stream->type == UV_NAMED_PIPE ||
Packit b5b901
         stream->type == UV_TTY);
Packit b5b901
  assert(!(stream->flags & UV_HANDLE_CLOSING));
Packit b5b901
Packit b5b901
  if (stream->connect_req) {
Packit b5b901
    uv__stream_connect(stream);
Packit b5b901
    return;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  assert(uv__stream_fd(stream) >= 0);
Packit b5b901
Packit b5b901
  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
Packit b5b901
  if (events & (POLLIN | POLLERR | POLLHUP))
Packit b5b901
    uv__read(stream);
Packit b5b901
Packit b5b901
  if (uv__stream_fd(stream) == -1)
Packit b5b901
    return;  /* read_cb closed stream. */
Packit b5b901
Packit b5b901
  /* Short-circuit iff POLLHUP is set, the user is still interested in read
Packit b5b901
   * events and uv__read() reported a partial read but not EOF. If the EOF
Packit b5b901
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
Packit b5b901
   * have to do anything. If the partial read flag is not set, we can't
Packit b5b901
   * report the EOF yet because there is still data to read.
Packit b5b901
   */
Packit b5b901
  if ((events & POLLHUP) &&
Packit b5b901
      (stream->flags & UV_HANDLE_READING) &&
Packit b5b901
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
Packit b5b901
      !(stream->flags & UV_HANDLE_READ_EOF)) {
Packit b5b901
    uv_buf_t buf = { NULL, 0 };
Packit b5b901
    uv__stream_eof(stream, &buf;;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (uv__stream_fd(stream) == -1)
Packit b5b901
    return;  /* read_cb closed stream. */
Packit b5b901
Packit b5b901
  if (events & (POLLOUT | POLLERR | POLLHUP)) {
Packit b5b901
    uv__write(stream);
Packit b5b901
    uv__write_callbacks(stream);
Packit b5b901
Packit b5b901
    /* Write queue drained. */
Packit b5b901
    if (QUEUE_EMPTY(&stream->write_queue))
Packit b5b901
      uv__drain(stream);
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
/**
Packit b5b901
 * We get called here from directly following a call to connect(2).
Packit b5b901
 * In order to determine if we've errored out or succeeded must call
Packit b5b901
 * getsockopt.
Packit b5b901
 */
Packit b5b901
static void uv__stream_connect(uv_stream_t* stream) {
Packit b5b901
  int error;
Packit b5b901
  uv_connect_t* req = stream->connect_req;
Packit b5b901
  socklen_t errorsize = sizeof(int);
Packit b5b901
Packit b5b901
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
Packit b5b901
  assert(req);
Packit b5b901
Packit b5b901
  if (stream->delayed_error) {
Packit b5b901
    /* To smooth over the differences between unixes errors that
Packit b5b901
     * were reported synchronously on the first connect can be delayed
Packit b5b901
     * until the next tick--which is now.
Packit b5b901
     */
Packit b5b901
    error = stream->delayed_error;
Packit b5b901
    stream->delayed_error = 0;
Packit b5b901
  } else {
Packit b5b901
    /* Normal situation: we need to get the socket error from the kernel. */
Packit b5b901
    assert(uv__stream_fd(stream) >= 0);
Packit b5b901
    getsockopt(uv__stream_fd(stream),
Packit b5b901
               SOL_SOCKET,
Packit b5b901
               SO_ERROR,
Packit b5b901
               &error,
Packit b5b901
               &errorsize);
Packit b5b901
    error = UV__ERR(error);
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (error == UV__ERR(EINPROGRESS))
Packit b5b901
    return;
Packit b5b901
Packit b5b901
  stream->connect_req = NULL;
Packit b5b901
  uv__req_unregister(stream->loop, req);
Packit b5b901
Packit b5b901
  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
Packit b5b901
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (req->cb)
Packit b5b901
    req->cb(req, error);
Packit b5b901
Packit b5b901
  if (uv__stream_fd(stream) == -1)
Packit b5b901
    return;
Packit b5b901
Packit b5b901
  if (error < 0) {
Packit b5b901
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
Packit b5b901
    uv__write_callbacks(stream);
Packit b5b901
  }
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_write2(uv_write_t* req,
Packit b5b901
              uv_stream_t* stream,
Packit b5b901
              const uv_buf_t bufs[],
Packit b5b901
              unsigned int nbufs,
Packit b5b901
              uv_stream_t* send_handle,
Packit b5b901
              uv_write_cb cb) {
Packit b5b901
  int empty_queue;
Packit b5b901
Packit b5b901
  assert(nbufs > 0);
Packit b5b901
  assert((stream->type == UV_TCP ||
Packit b5b901
          stream->type == UV_NAMED_PIPE ||
Packit b5b901
          stream->type == UV_TTY) &&
Packit b5b901
         "uv_write (unix) does not yet support other types of streams");
Packit b5b901
Packit b5b901
  if (uv__stream_fd(stream) < 0)
Packit b5b901
    return UV_EBADF;
Packit b5b901
Packit b5b901
  if (!(stream->flags & UV_HANDLE_WRITABLE))
Packit Service e08953
    return UV_EPIPE;
Packit b5b901
Packit b5b901
  if (send_handle) {
Packit b5b901
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
Packit b5b901
      return UV_EINVAL;
Packit b5b901
Packit b5b901
    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
Packit b5b901
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
Packit b5b901
     * evaluates to a function that operates on a uv_stream_t with a couple of
Packit b5b901
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
Packit b5b901
     * which works but only by accident.
Packit b5b901
     */
Packit b5b901
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
Packit b5b901
      return UV_EBADF;
Packit b5b901
Packit b5b901
#if defined(__CYGWIN__) || defined(__MSYS__)
Packit b5b901
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
Packit b5b901
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
Packit b5b901
    return UV_ENOSYS;
Packit b5b901
#endif
Packit b5b901
  }
Packit b5b901
Packit b5b901
  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
Packit b5b901
   * it means there are error-state requests in the write_completed_queue that
Packit b5b901
   * will touch up write_queue_size later, see also uv__write_req_finish().
Packit b5b901
   * We could check that write_queue is empty instead but that implies making
Packit b5b901
   * a write() syscall when we know that the handle is in error mode.
Packit b5b901
   */
Packit b5b901
  empty_queue = (stream->write_queue_size == 0);
Packit b5b901
Packit b5b901
  /* Initialize the req */
Packit b5b901
  uv__req_init(stream->loop, req, UV_WRITE);
Packit b5b901
  req->cb = cb;
Packit b5b901
  req->handle = stream;
Packit b5b901
  req->error = 0;
Packit b5b901
  req->send_handle = send_handle;
Packit b5b901
  QUEUE_INIT(&req->queue);
Packit b5b901
Packit b5b901
  req->bufs = req->bufsml;
Packit b5b901
  if (nbufs > ARRAY_SIZE(req->bufsml))
Packit b5b901
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
Packit b5b901
Packit b5b901
  if (req->bufs == NULL)
Packit b5b901
    return UV_ENOMEM;
Packit b5b901
Packit b5b901
  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
Packit b5b901
  req->nbufs = nbufs;
Packit b5b901
  req->write_index = 0;
Packit b5b901
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);
Packit b5b901
Packit b5b901
  /* Append the request to write_queue. */
Packit b5b901
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
Packit b5b901
Packit b5b901
  /* If the queue was empty when this function began, we should attempt to
Packit b5b901
   * do the write immediately. Otherwise start the write_watcher and wait
Packit b5b901
   * for the fd to become writable.
Packit b5b901
   */
Packit b5b901
  if (stream->connect_req) {
Packit b5b901
    /* Still connecting, do nothing. */
Packit b5b901
  }
Packit b5b901
  else if (empty_queue) {
Packit b5b901
    uv__write(stream);
Packit b5b901
  }
Packit b5b901
  else {
Packit b5b901
    /*
Packit b5b901
     * blocking streams should never have anything in the queue.
Packit b5b901
     * if this assert fires then somehow the blocking stream isn't being
Packit b5b901
     * sufficiently flushed in uv__write.
Packit b5b901
     */
Packit b5b901
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
Packit b5b901
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
    uv__stream_osx_interrupt_select(stream);
Packit b5b901
  }
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
/* The buffers to be written must remain valid until the callback is called.
Packit b5b901
 * This is not required for the uv_buf_t array.
Packit b5b901
 */
Packit b5b901
int uv_write(uv_write_t* req,
Packit b5b901
             uv_stream_t* handle,
Packit b5b901
             const uv_buf_t bufs[],
Packit b5b901
             unsigned int nbufs,
Packit b5b901
             uv_write_cb cb) {
Packit b5b901
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
void uv_try_write_cb(uv_write_t* req, int status) {
Packit b5b901
  /* Should not be called */
Packit b5b901
  abort();
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_try_write(uv_stream_t* stream,
Packit b5b901
                 const uv_buf_t bufs[],
Packit b5b901
                 unsigned int nbufs) {
Packit b5b901
  int r;
Packit b5b901
  int has_pollout;
Packit b5b901
  size_t written;
Packit b5b901
  size_t req_size;
Packit b5b901
  uv_write_t req;
Packit b5b901
Packit b5b901
  /* Connecting or already writing some data */
Packit b5b901
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
Packit b5b901
    return UV_EAGAIN;
Packit b5b901
Packit b5b901
  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
Packit b5b901
Packit b5b901
  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
Packit b5b901
  if (r != 0)
Packit b5b901
    return r;
Packit b5b901
Packit b5b901
  /* Remove not written bytes from write queue size */
Packit b5b901
  written = uv__count_bufs(bufs, nbufs);
Packit b5b901
  if (req.bufs != NULL)
Packit b5b901
    req_size = uv__write_req_size(&req;;
Packit b5b901
  else
Packit b5b901
    req_size = 0;
Packit b5b901
  written -= req_size;
Packit b5b901
  stream->write_queue_size -= req_size;
Packit b5b901
Packit b5b901
  /* Unqueue request, regardless of immediateness */
Packit b5b901
  QUEUE_REMOVE(&req.queue);
Packit b5b901
  uv__req_unregister(stream->loop, &req;;
Packit b5b901
  if (req.bufs != req.bufsml)
Packit b5b901
    uv__free(req.bufs);
Packit b5b901
  req.bufs = NULL;
Packit b5b901
Packit b5b901
  /* Do not poll for writable, if we wasn't before calling this */
Packit b5b901
  if (!has_pollout) {
Packit b5b901
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
Packit b5b901
    uv__stream_osx_interrupt_select(stream);
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (written == 0 && req_size != 0)
Packit Service e08953
    return req.error < 0 ? req.error : UV_EAGAIN;
Packit b5b901
  else
Packit b5b901
    return written;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_read_start(uv_stream_t* stream,
Packit b5b901
                  uv_alloc_cb alloc_cb,
Packit b5b901
                  uv_read_cb read_cb) {
Packit b5b901
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
Packit b5b901
      stream->type == UV_TTY);
Packit b5b901
Packit b5b901
  if (stream->flags & UV_HANDLE_CLOSING)
Packit b5b901
    return UV_EINVAL;
Packit b5b901
Packit b5b901
  if (!(stream->flags & UV_HANDLE_READABLE))
Packit Service e08953
    return UV_ENOTCONN;
Packit b5b901
Packit b5b901
  /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
Packit b5b901
   * expresses the desired state of the user.
Packit b5b901
   */
Packit b5b901
  stream->flags |= UV_HANDLE_READING;
Packit b5b901
Packit b5b901
  /* TODO: try to do the read inline? */
Packit b5b901
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
Packit b5b901
   * not start the IO watcher.
Packit b5b901
   */
Packit b5b901
  assert(uv__stream_fd(stream) >= 0);
Packit b5b901
  assert(alloc_cb);
Packit b5b901
Packit b5b901
  stream->read_cb = read_cb;
Packit b5b901
  stream->alloc_cb = alloc_cb;
Packit b5b901
Packit b5b901
  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
  uv__handle_start(stream);
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_read_stop(uv_stream_t* stream) {
Packit b5b901
  if (!(stream->flags & UV_HANDLE_READING))
Packit b5b901
    return 0;
Packit b5b901
Packit b5b901
  stream->flags &= ~UV_HANDLE_READING;
Packit b5b901
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
Packit b5b901
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
Packit b5b901
    uv__handle_stop(stream);
Packit b5b901
  uv__stream_osx_interrupt_select(stream);
Packit b5b901
Packit b5b901
  stream->read_cb = NULL;
Packit b5b901
  stream->alloc_cb = NULL;
Packit b5b901
  return 0;
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_is_readable(const uv_stream_t* stream) {
Packit b5b901
  return !!(stream->flags & UV_HANDLE_READABLE);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_is_writable(const uv_stream_t* stream) {
Packit b5b901
  return !!(stream->flags & UV_HANDLE_WRITABLE);
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
int uv___stream_fd(const uv_stream_t* handle) {
Packit b5b901
  const uv__stream_select_t* s;
Packit b5b901
Packit b5b901
  assert(handle->type == UV_TCP ||
Packit b5b901
         handle->type == UV_TTY ||
Packit b5b901
         handle->type == UV_NAMED_PIPE);
Packit b5b901
Packit b5b901
  s = handle->select;
Packit b5b901
  if (s != NULL)
Packit b5b901
    return s->fd;
Packit b5b901
Packit b5b901
  return handle->io_watcher.fd;
Packit b5b901
}
Packit b5b901
#endif /* defined(__APPLE__) */
Packit b5b901
Packit b5b901
Packit b5b901
void uv__stream_close(uv_stream_t* handle) {
Packit b5b901
  unsigned int i;
Packit b5b901
  uv__stream_queued_fds_t* queued_fds;
Packit b5b901
Packit b5b901
#if defined(__APPLE__)
Packit b5b901
  /* Terminate select loop first */
Packit b5b901
  if (handle->select != NULL) {
Packit b5b901
    uv__stream_select_t* s;
Packit b5b901
Packit b5b901
    s = handle->select;
Packit b5b901
Packit b5b901
    uv_sem_post(&s->close_sem);
Packit b5b901
    uv_sem_post(&s->async_sem);
Packit b5b901
    uv__stream_osx_interrupt_select(handle);
Packit b5b901
    uv_thread_join(&s->thread);
Packit b5b901
    uv_sem_destroy(&s->close_sem);
Packit b5b901
    uv_sem_destroy(&s->async_sem);
Packit b5b901
    uv__close(s->fake_fd);
Packit b5b901
    uv__close(s->int_fd);
Packit b5b901
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
Packit b5b901
Packit b5b901
    handle->select = NULL;
Packit b5b901
  }
Packit b5b901
#endif /* defined(__APPLE__) */
Packit b5b901
Packit b5b901
  uv__io_close(handle->loop, &handle->io_watcher);
Packit b5b901
  uv_read_stop(handle);
Packit b5b901
  uv__handle_stop(handle);
Packit b5b901
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
Packit b5b901
Packit b5b901
  if (handle->io_watcher.fd != -1) {
Packit b5b901
    /* Don't close stdio file descriptors.  Nothing good comes from it. */
Packit b5b901
    if (handle->io_watcher.fd > STDERR_FILENO)
Packit b5b901
      uv__close(handle->io_watcher.fd);
Packit b5b901
    handle->io_watcher.fd = -1;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  if (handle->accepted_fd != -1) {
Packit b5b901
    uv__close(handle->accepted_fd);
Packit b5b901
    handle->accepted_fd = -1;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  /* Close all queued fds */
Packit b5b901
  if (handle->queued_fds != NULL) {
Packit b5b901
    queued_fds = handle->queued_fds;
Packit b5b901
    for (i = 0; i < queued_fds->offset; i++)
Packit b5b901
      uv__close(queued_fds->fds[i]);
Packit b5b901
    uv__free(handle->queued_fds);
Packit b5b901
    handle->queued_fds = NULL;
Packit b5b901
  }
Packit b5b901
Packit b5b901
  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
Packit b5b901
}
Packit b5b901
Packit b5b901
Packit b5b901
int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
Packit b5b901
  /* Don't need to check the file descriptor, uv__nonblock()
Packit b5b901
   * will fail with EBADF if it's not valid.
Packit b5b901
   */
Packit b5b901
  return uv__nonblock(uv__stream_fd(handle), !blocking);
Packit b5b901
}