Blame sysdeps/pthread/aio_misc.c

Packit 6c4009
/* Handle general operations.
   Copyright (C) 1997-2018 Free Software Foundation, Inc.
   This file is part of the GNU C Library.
   Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2.1 of the License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with the GNU C Library; if not, see
   <http://www.gnu.org/licenses/>.  */
Packit 6c4009
Packit 6c4009
#include <aio.h>
Packit 6c4009
#include <assert.h>
Packit 6c4009
#include <errno.h>
Packit 6c4009
#include <limits.h>
Packit 6c4009
#include <pthread.h>
Packit 6c4009
#include <stdlib.h>
Packit 6c4009
#include <unistd.h>
Packit 6c4009
#include <sys/param.h>
Packit 6c4009
#include <sys/stat.h>
Packit 6c4009
#include <sys/time.h>
Packit 6c4009
#include <aio_misc.h>
Packit 6c4009
Packit 6c4009
#ifndef aio_create_helper_thread
# define aio_create_helper_thread __aio_create_helper_thread

/* Default helper-thread launcher, used unless a definition of
   aio_create_helper_thread was already supplied before this point.

   Starts TF (ARG) in a new detached thread and stores its id in
   *THREADP.  The return value is whatever pthread_create returned:
   zero on success, an error number otherwise.  */
extern inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
{
  pthread_attr_t detached_attr;
  int status;

  /* Helper threads are never joined, so request the detached state.  */
  pthread_attr_init (&detached_attr);
  pthread_attr_setdetachstate (&detached_attr, PTHREAD_CREATE_DETACHED);

  status = pthread_create (threadp, &detached_attr, tf, arg);

  /* The attribute object is only needed for the creation call.  */
  (void) pthread_attr_destroy (&detached_attr);

  return status;
}
#endif
Packit 6c4009
Packit 6c4009
static void add_request_to_runlist (struct requestlist *newrequest);
Packit 6c4009
Packit 6c4009
/* Pool of request list entries.  */
Packit 6c4009
static struct requestlist **pool;
Packit 6c4009
Packit 6c4009
/* Number of total and allocated pool entries.  */
Packit 6c4009
static size_t pool_max_size;
Packit 6c4009
static size_t pool_size;
Packit 6c4009
Packit 6c4009
/* We implement a two dimensional array but allocate each row separately.
Packit 6c4009
   The macro below determines how many entries should be used per row.
Packit 6c4009
   It should better be a power of two.  */
Packit 6c4009
#define ENTRIES_PER_ROW	32
Packit 6c4009
Packit 6c4009
/* How many rows we allocate at once.  */
Packit 6c4009
#define ROWS_STEP	8
Packit 6c4009
Packit 6c4009
/* List of available entries.  */
Packit 6c4009
static struct requestlist *freelist;
Packit 6c4009
Packit 6c4009
/* List of request waiting to be processed.  */
Packit 6c4009
static struct requestlist *runlist;
Packit 6c4009
Packit 6c4009
/* Structure list of all currently processed requests.  */
Packit 6c4009
static struct requestlist *requests;
Packit 6c4009
Packit 6c4009
/* Number of threads currently running.  */
Packit 6c4009
static int nthreads;
Packit 6c4009
Packit 6c4009
/* Number of threads waiting for work to arrive. */
Packit 6c4009
static int idle_thread_count;
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* These are the values used to optimize the use of AIO.  The user can
Packit 6c4009
   overwrite them by using the `aio_init' function.  */
Packit 6c4009
static struct aioinit optim =
Packit 6c4009
{
Packit 6c4009
  20,	/* int aio_threads;	Maximal number of threads.  */
Packit 6c4009
  64,	/* int aio_num;		Number of expected simultaneous requests. */
Packit 6c4009
  0,
Packit 6c4009
  0,
Packit 6c4009
  0,
Packit 6c4009
  0,
Packit 6c4009
  1,
Packit 6c4009
  0
Packit 6c4009
};
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* Since the list is global we need a mutex protecting it.  */
Packit 6c4009
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
Packit 6c4009
Packit 6c4009
/* When you add a request to the list and there are idle threads present,
Packit 6c4009
   you signal this condition variable. When a thread finishes work, it waits
Packit 6c4009
   on this condition variable for a time before it actually exits. */
Packit 6c4009
pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* Functions to handle request list pool.  */
Packit 6c4009
static struct requestlist *
Packit 6c4009
get_elem (void)
Packit 6c4009
{
Packit 6c4009
  struct requestlist *result;
Packit 6c4009
Packit 6c4009
  if (freelist == NULL)
Packit 6c4009
    {
Packit 6c4009
      struct requestlist *new_row;
Packit 6c4009
      int cnt;
Packit 6c4009
Packit 6c4009
      assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
Packit 6c4009
Packit 6c4009
      if (pool_size + 1 >= pool_max_size)
Packit 6c4009
	{
Packit 6c4009
	  size_t new_max_size = pool_max_size + ROWS_STEP;
Packit 6c4009
	  struct requestlist **new_tab;
Packit 6c4009
Packit 6c4009
	  new_tab = (struct requestlist **)
Packit 6c4009
	    realloc (pool, new_max_size * sizeof (struct requestlist *));
Packit 6c4009
Packit 6c4009
	  if (new_tab == NULL)
Packit 6c4009
	    return NULL;
Packit 6c4009
Packit 6c4009
	  pool_max_size = new_max_size;
Packit 6c4009
	  pool = new_tab;
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      /* Allocate the new row.  */
Packit 6c4009
      cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
Packit 6c4009
      new_row = (struct requestlist *) calloc (cnt,
Packit 6c4009
					       sizeof (struct requestlist));
Packit 6c4009
      if (new_row == NULL)
Packit 6c4009
	return NULL;
Packit 6c4009
Packit 6c4009
      pool[pool_size++] = new_row;
Packit 6c4009
Packit 6c4009
      /* Put all the new entries in the freelist.  */
Packit 6c4009
      do
Packit 6c4009
	{
Packit 6c4009
	  new_row->next_prio = freelist;
Packit 6c4009
	  freelist = new_row++;
Packit 6c4009
	}
Packit 6c4009
      while (--cnt > 0);
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  result = freelist;
Packit 6c4009
  freelist = freelist->next_prio;
Packit 6c4009
Packit 6c4009
  return result;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
void
Packit 6c4009
__aio_free_request (struct requestlist *elem)
Packit 6c4009
{
Packit 6c4009
  elem->running = no;
Packit 6c4009
  elem->next_prio = freelist;
Packit 6c4009
  freelist = elem;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
struct requestlist *
Packit 6c4009
__aio_find_req (aiocb_union *elem)
Packit 6c4009
{
Packit 6c4009
  struct requestlist *runp = requests;
Packit 6c4009
  int fildes = elem->aiocb.aio_fildes;
Packit 6c4009
Packit 6c4009
  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
Packit 6c4009
    runp = runp->next_fd;
Packit 6c4009
Packit 6c4009
  if (runp != NULL)
Packit 6c4009
    {
Packit 6c4009
      if (runp->aiocbp->aiocb.aio_fildes != fildes)
Packit 6c4009
	runp = NULL;
Packit 6c4009
      else
Packit 6c4009
	while (runp != NULL && runp->aiocbp != elem)
Packit 6c4009
	  runp = runp->next_prio;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  return runp;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
struct requestlist *
Packit 6c4009
__aio_find_req_fd (int fildes)
Packit 6c4009
{
Packit 6c4009
  struct requestlist *runp = requests;
Packit 6c4009
Packit 6c4009
  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
Packit 6c4009
    runp = runp->next_fd;
Packit 6c4009
Packit 6c4009
  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
Packit 6c4009
	  ? runp : NULL);
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
void
Packit 6c4009
__aio_remove_request (struct requestlist *last, struct requestlist *req,
Packit 6c4009
		      int all)
Packit 6c4009
{
Packit 6c4009
  assert (req->running == yes || req->running == queued
Packit 6c4009
	  || req->running == done);
Packit 6c4009
Packit 6c4009
  if (last != NULL)
Packit 6c4009
    last->next_prio = all ? NULL : req->next_prio;
Packit 6c4009
  else
Packit 6c4009
    {
Packit 6c4009
      if (all || req->next_prio == NULL)
Packit 6c4009
	{
Packit 6c4009
	  if (req->last_fd != NULL)
Packit 6c4009
	    req->last_fd->next_fd = req->next_fd;
Packit 6c4009
	  else
Packit 6c4009
	    requests = req->next_fd;
Packit 6c4009
	  if (req->next_fd != NULL)
Packit 6c4009
	    req->next_fd->last_fd = req->last_fd;
Packit 6c4009
	}
Packit 6c4009
      else
Packit 6c4009
	{
Packit 6c4009
	  if (req->last_fd != NULL)
Packit 6c4009
	    req->last_fd->next_fd = req->next_prio;
Packit 6c4009
	  else
Packit 6c4009
	    requests = req->next_prio;
Packit 6c4009
Packit 6c4009
	  if (req->next_fd != NULL)
Packit 6c4009
	    req->next_fd->last_fd = req->next_prio;
Packit 6c4009
Packit 6c4009
	  req->next_prio->last_fd = req->last_fd;
Packit 6c4009
	  req->next_prio->next_fd = req->next_fd;
Packit 6c4009
Packit 6c4009
	  /* Mark this entry as runnable.  */
Packit 6c4009
	  req->next_prio->running = yes;
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      if (req->running == yes)
Packit 6c4009
	{
Packit 6c4009
	  struct requestlist *runp = runlist;
Packit 6c4009
Packit 6c4009
	  last = NULL;
Packit 6c4009
	  while (runp != NULL)
Packit 6c4009
	    {
Packit 6c4009
	      if (runp == req)
Packit 6c4009
		{
Packit 6c4009
		  if (last == NULL)
Packit 6c4009
		    runlist = runp->next_run;
Packit 6c4009
		  else
Packit 6c4009
		    last->next_run = runp->next_run;
Packit 6c4009
		  break;
Packit 6c4009
		}
Packit 6c4009
	      last = runp;
Packit 6c4009
	      runp = runp->next_run;
Packit 6c4009
	    }
Packit 6c4009
	}
Packit 6c4009
    }
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* The thread handler.  */
Packit 6c4009
static void *handle_fildes_io (void *arg);
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* User optimization.  */
Packit 6c4009
void
Packit 6c4009
__aio_init (const struct aioinit *init)
Packit 6c4009
{
Packit 6c4009
  /* Get the mutex.  */
Packit 6c4009
  pthread_mutex_lock (&__aio_requests_mutex);
Packit 6c4009
Packit 6c4009
  /* Only allow writing new values if the table is not yet allocated.  */
Packit 6c4009
  if (pool == NULL)
Packit 6c4009
    {
Packit 6c4009
      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
Packit 6c4009
      assert (powerof2 (ENTRIES_PER_ROW));
Packit 6c4009
      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
Packit 6c4009
		       ? ENTRIES_PER_ROW
Packit 6c4009
		       : init->aio_num & ~(ENTRIES_PER_ROW - 1));
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  if (init->aio_idle_time != 0)
Packit 6c4009
    optim.aio_idle_time = init->aio_idle_time;
Packit 6c4009
Packit 6c4009
  /* Release the mutex.  */
Packit 6c4009
  pthread_mutex_unlock (&__aio_requests_mutex);
Packit 6c4009
}
Packit 6c4009
weak_alias (__aio_init, aio_init)
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* The main function of the async I/O handling.  It enqueues requests
Packit 6c4009
   and if necessary starts and handles threads.  */
Packit 6c4009
struct requestlist *
Packit 6c4009
__aio_enqueue_request (aiocb_union *aiocbp, int operation)
Packit 6c4009
{
Packit 6c4009
  int result = 0;
Packit 6c4009
  int policy, prio;
Packit 6c4009
  struct sched_param param;
Packit 6c4009
  struct requestlist *last, *runp, *newp;
Packit 6c4009
  int running = no;
Packit 6c4009
Packit 6c4009
  if (operation == LIO_SYNC || operation == LIO_DSYNC)
Packit 6c4009
    aiocbp->aiocb.aio_reqprio = 0;
Packit 6c4009
  else if (aiocbp->aiocb.aio_reqprio < 0
Packit 6c4009
#ifdef AIO_PRIO_DELTA_MAX
Packit 6c4009
	   || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX
Packit 6c4009
#endif
Packit 6c4009
	   )
Packit 6c4009
    {
Packit 6c4009
      /* Invalid priority value.  */
Packit 6c4009
      __set_errno (EINVAL);
Packit 6c4009
      aiocbp->aiocb.__error_code = EINVAL;
Packit 6c4009
      aiocbp->aiocb.__return_value = -1;
Packit 6c4009
      return NULL;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* Compute priority for this request.  */
Packit 6c4009
  pthread_getschedparam (pthread_self (), &policy, ¶m;;
Packit 6c4009
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
Packit 6c4009
Packit 6c4009
  /* Get the mutex.  */
Packit 6c4009
  pthread_mutex_lock (&__aio_requests_mutex);
Packit 6c4009
Packit 6c4009
  last = NULL;
Packit 6c4009
  runp = requests;
Packit 6c4009
  /* First look whether the current file descriptor is currently
Packit 6c4009
     worked with.  */
Packit 6c4009
  while (runp != NULL
Packit 6c4009
	 && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
Packit 6c4009
    {
Packit 6c4009
      last = runp;
Packit 6c4009
      runp = runp->next_fd;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* Get a new element for the waiting list.  */
Packit 6c4009
  newp = get_elem ();
Packit 6c4009
  if (newp == NULL)
Packit 6c4009
    {
Packit 6c4009
      pthread_mutex_unlock (&__aio_requests_mutex);
Packit 6c4009
      __set_errno (EAGAIN);
Packit 6c4009
      return NULL;
Packit 6c4009
    }
Packit 6c4009
  newp->aiocbp = aiocbp;
Packit 6c4009
  newp->waiting = NULL;
Packit 6c4009
Packit 6c4009
  aiocbp->aiocb.__abs_prio = prio;
Packit 6c4009
  aiocbp->aiocb.__policy = policy;
Packit 6c4009
  aiocbp->aiocb.aio_lio_opcode = operation;
Packit 6c4009
  aiocbp->aiocb.__error_code = EINPROGRESS;
Packit 6c4009
  aiocbp->aiocb.__return_value = 0;
Packit 6c4009
Packit 6c4009
  if (runp != NULL
Packit 6c4009
      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
Packit 6c4009
    {
Packit 6c4009
      /* The current file descriptor is worked on.  It makes no sense
Packit 6c4009
	 to start another thread since this new thread would fight
Packit 6c4009
	 with the running thread for the resources.  But we also cannot
Packit 6c4009
	 say that the thread processing this desriptor shall immediately
Packit 6c4009
	 after finishing the current job process this request if there
Packit 6c4009
	 are other threads in the running queue which have a higher
Packit 6c4009
	 priority.  */
Packit 6c4009
Packit 6c4009
      /* Simply enqueue it after the running one according to the
Packit 6c4009
	 priority.  */
Packit 6c4009
      last = NULL;
Packit 6c4009
      while (runp->next_prio != NULL
Packit 6c4009
	     && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
Packit 6c4009
	{
Packit 6c4009
	  last = runp;
Packit 6c4009
	  runp = runp->next_prio;
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      newp->next_prio = runp->next_prio;
Packit 6c4009
      runp->next_prio = newp;
Packit 6c4009
Packit 6c4009
      running = queued;
Packit 6c4009
    }
Packit 6c4009
  else
Packit 6c4009
    {
Packit 6c4009
      running = yes;
Packit 6c4009
      /* Enqueue this request for a new descriptor.  */
Packit 6c4009
      if (last == NULL)
Packit 6c4009
	{
Packit 6c4009
	  newp->last_fd = NULL;
Packit 6c4009
	  newp->next_fd = requests;
Packit 6c4009
	  if (requests != NULL)
Packit 6c4009
	    requests->last_fd = newp;
Packit 6c4009
	  requests = newp;
Packit 6c4009
	}
Packit 6c4009
      else
Packit 6c4009
	{
Packit 6c4009
	  newp->next_fd = last->next_fd;
Packit 6c4009
	  newp->last_fd = last;
Packit 6c4009
	  last->next_fd = newp;
Packit 6c4009
	  if (newp->next_fd != NULL)
Packit 6c4009
	    newp->next_fd->last_fd = newp;
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      newp->next_prio = NULL;
Packit 6c4009
      last = NULL;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  if (running == yes)
Packit 6c4009
    {
Packit 6c4009
      /* We try to create a new thread for this file descriptor.  The
Packit 6c4009
	 function which gets called will handle all available requests
Packit 6c4009
	 for this descriptor and when all are processed it will
Packit 6c4009
	 terminate.
Packit 6c4009
Packit 6c4009
	 If no new thread can be created or if the specified limit of
Packit 6c4009
	 threads for AIO is reached we queue the request.  */
Packit 6c4009
Packit 6c4009
      /* See if we need to and are able to create a thread.  */
Packit 6c4009
      if (nthreads < optim.aio_threads && idle_thread_count == 0)
Packit 6c4009
	{
Packit 6c4009
	  pthread_t thid;
Packit 6c4009
Packit 6c4009
	  running = newp->running = allocated;
Packit 6c4009
Packit 6c4009
	  /* Now try to start a thread.  */
Packit 6c4009
	  result = aio_create_helper_thread (&thid, handle_fildes_io, newp);
Packit 6c4009
	  if (result == 0)
Packit 6c4009
	    /* We managed to enqueue the request.  All errors which can
Packit 6c4009
	       happen now can be recognized by calls to `aio_return' and
Packit 6c4009
	       `aio_error'.  */
Packit 6c4009
	    ++nthreads;
Packit 6c4009
	  else
Packit 6c4009
	    {
Packit 6c4009
	      /* Reset the running flag.  The new request is not running.  */
Packit 6c4009
	      running = newp->running = yes;
Packit 6c4009
Packit 6c4009
	      if (nthreads == 0)
Packit 6c4009
		{
Packit 6c4009
		  /* We cannot create a thread in the moment and there is
Packit 6c4009
		     also no thread running.  This is a problem.  `errno' is
Packit 6c4009
		     set to EAGAIN if this is only a temporary problem.  */
Packit 6c4009
		  __aio_remove_request (last, newp, 0);
Packit 6c4009
		}
Packit 6c4009
	      else
Packit 6c4009
		result = 0;
Packit 6c4009
	    }
Packit 6c4009
	}
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* Enqueue the request in the run queue if it is not yet running.  */
Packit 6c4009
  if (running == yes && result == 0)
Packit 6c4009
    {
Packit 6c4009
      add_request_to_runlist (newp);
Packit 6c4009
Packit 6c4009
      /* If there is a thread waiting for work, then let it know that we
Packit 6c4009
	 have just given it something to do. */
Packit 6c4009
      if (idle_thread_count > 0)
Packit 6c4009
	pthread_cond_signal (&__aio_new_request_notification);
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  if (result == 0)
Packit 6c4009
    newp->running = running;
Packit 6c4009
  else
Packit 6c4009
    {
Packit 6c4009
      /* Something went wrong.  */
Packit 6c4009
      __aio_free_request (newp);
Packit 6c4009
      aiocbp->aiocb.__error_code = result;
Packit 6c4009
      __set_errno (result);
Packit 6c4009
      newp = NULL;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* Release the mutex.  */
Packit 6c4009
  pthread_mutex_unlock (&__aio_requests_mutex);
Packit 6c4009
Packit 6c4009
  return newp;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
static void *
Packit 6c4009
handle_fildes_io (void *arg)
Packit 6c4009
{
Packit 6c4009
  pthread_t self = pthread_self ();
Packit 6c4009
  struct sched_param param;
Packit 6c4009
  struct requestlist *runp = (struct requestlist *) arg;
Packit 6c4009
  aiocb_union *aiocbp;
Packit 6c4009
  int policy;
Packit 6c4009
  int fildes;
Packit 6c4009
Packit 6c4009
  pthread_getschedparam (self, &policy, ¶m;;
Packit 6c4009
Packit 6c4009
  do
Packit 6c4009
    {
Packit 6c4009
      /* If runp is NULL, then we were created to service the work queue
Packit 6c4009
	 in general, not to handle any particular request. In that case we
Packit 6c4009
	 skip the "do work" stuff on the first pass, and go directly to the
Packit 6c4009
	 "get work off the work queue" part of this loop, which is near the
Packit 6c4009
	 end. */
Packit 6c4009
      if (runp == NULL)
Packit 6c4009
	pthread_mutex_lock (&__aio_requests_mutex);
Packit 6c4009
      else
Packit 6c4009
	{
Packit 6c4009
	  /* Hopefully this request is marked as running.  */
Packit 6c4009
	  assert (runp->running == allocated);
Packit 6c4009
Packit 6c4009
	  /* Update our variables.  */
Packit 6c4009
	  aiocbp = runp->aiocbp;
Packit 6c4009
	  fildes = aiocbp->aiocb.aio_fildes;
Packit 6c4009
Packit 6c4009
	  /* Change the priority to the requested value (if necessary).  */
Packit 6c4009
	  if (aiocbp->aiocb.__abs_prio != param.sched_priority
Packit 6c4009
	      || aiocbp->aiocb.__policy != policy)
Packit 6c4009
	    {
Packit 6c4009
	      param.sched_priority = aiocbp->aiocb.__abs_prio;
Packit 6c4009
	      policy = aiocbp->aiocb.__policy;
Packit 6c4009
	      pthread_setschedparam (self, policy, ¶m;;
Packit 6c4009
	    }
Packit 6c4009
Packit 6c4009
	  /* Process request pointed to by RUNP.  We must not be disturbed
Packit 6c4009
	     by signals.  */
Packit 6c4009
	  if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
Packit 6c4009
	    {
Packit 6c4009
	      if (sizeof (off_t) != sizeof (off64_t)
Packit 6c4009
		  && aiocbp->aiocb.aio_lio_opcode & 128)
Packit 6c4009
		aiocbp->aiocb.__return_value =
Packit 6c4009
		  TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
Packit 6c4009
						 aiocbp->aiocb64.aio_buf,
Packit 6c4009
						 aiocbp->aiocb64.aio_nbytes,
Packit 6c4009
						 aiocbp->aiocb64.aio_offset));
Packit 6c4009
	      else
Packit 6c4009
		aiocbp->aiocb.__return_value =
Packit 6c4009
		  TEMP_FAILURE_RETRY (__libc_pread (fildes,
Packit 6c4009
						    (void *)
Packit 6c4009
						    aiocbp->aiocb.aio_buf,
Packit 6c4009
						    aiocbp->aiocb.aio_nbytes,
Packit 6c4009
						    aiocbp->aiocb.aio_offset));
Packit 6c4009
Packit 6c4009
	      if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
Packit 6c4009
		/* The Linux kernel is different from others.  It returns
Packit 6c4009
		   ESPIPE if using pread on a socket.  Other platforms
Packit 6c4009
		   simply ignore the offset parameter and behave like
Packit 6c4009
		   read.  */
Packit 6c4009
		aiocbp->aiocb.__return_value =
Packit 6c4009
		  TEMP_FAILURE_RETRY (read (fildes,
Packit 6c4009
					    (void *) aiocbp->aiocb64.aio_buf,
Packit 6c4009
					    aiocbp->aiocb64.aio_nbytes));
Packit 6c4009
	    }
Packit 6c4009
	  else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
Packit 6c4009
	    {
Packit 6c4009
	      if (sizeof (off_t) != sizeof (off64_t)
Packit 6c4009
		  && aiocbp->aiocb.aio_lio_opcode & 128)
Packit 6c4009
		aiocbp->aiocb.__return_value =
Packit 6c4009
		  TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
Packit 6c4009
						  aiocbp->aiocb64.aio_buf,
Packit 6c4009
						  aiocbp->aiocb64.aio_nbytes,
Packit 6c4009
						  aiocbp->aiocb64.aio_offset));
Packit 6c4009
	      else
Packit 6c4009
		aiocbp->aiocb.__return_value =
Packit 6c4009
		  TEMP_FAILURE_RETRY (__libc_pwrite (fildes, (const void *)
Packit 6c4009
					      aiocbp->aiocb.aio_buf,
Packit 6c4009
					      aiocbp->aiocb.aio_nbytes,
Packit 6c4009
					      aiocbp->aiocb.aio_offset));
Packit 6c4009
Packit 6c4009
	      if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
Packit 6c4009
		/* The Linux kernel is different from others.  It returns
Packit 6c4009
		   ESPIPE if using pwrite on a socket.  Other platforms
Packit 6c4009
		   simply ignore the offset parameter and behave like
Packit 6c4009
		   write.  */
Packit 6c4009
		aiocbp->aiocb.__return_value =
Packit 6c4009
		  TEMP_FAILURE_RETRY (write (fildes,
Packit 6c4009
					     (void *) aiocbp->aiocb64.aio_buf,
Packit 6c4009
					     aiocbp->aiocb64.aio_nbytes));
Packit 6c4009
	    }
Packit 6c4009
	  else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
Packit 6c4009
	    aiocbp->aiocb.__return_value =
Packit 6c4009
	      TEMP_FAILURE_RETRY (fdatasync (fildes));
Packit 6c4009
	  else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
Packit 6c4009
	    aiocbp->aiocb.__return_value =
Packit 6c4009
	      TEMP_FAILURE_RETRY (fsync (fildes));
Packit 6c4009
	  else
Packit 6c4009
	    {
Packit 6c4009
	      /* This is an invalid opcode.  */
Packit 6c4009
	      aiocbp->aiocb.__return_value = -1;
Packit 6c4009
	      __set_errno (EINVAL);
Packit 6c4009
	    }
Packit 6c4009
Packit 6c4009
	  /* Get the mutex.  */
Packit 6c4009
	  pthread_mutex_lock (&__aio_requests_mutex);
Packit 6c4009
Packit 6c4009
	  if (aiocbp->aiocb.__return_value == -1)
Packit 6c4009
	    aiocbp->aiocb.__error_code = errno;
Packit 6c4009
	  else
Packit 6c4009
	    aiocbp->aiocb.__error_code = 0;
Packit 6c4009
Packit 6c4009
	  /* Send the signal to notify about finished processing of the
Packit 6c4009
	     request.  */
Packit 6c4009
	  __aio_notify (runp);
Packit 6c4009
Packit 6c4009
	  /* For debugging purposes we reset the running flag of the
Packit 6c4009
	     finished request.  */
Packit 6c4009
	  assert (runp->running == allocated);
Packit 6c4009
	  runp->running = done;
Packit 6c4009
Packit 6c4009
	  /* Now dequeue the current request.  */
Packit 6c4009
	  __aio_remove_request (NULL, runp, 0);
Packit 6c4009
	  if (runp->next_prio != NULL)
Packit 6c4009
	    add_request_to_runlist (runp->next_prio);
Packit 6c4009
Packit 6c4009
	  /* Free the old element.  */
Packit 6c4009
	  __aio_free_request (runp);
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      runp = runlist;
Packit 6c4009
Packit 6c4009
      /* If the runlist is empty, then we sleep for a while, waiting for
Packit 6c4009
	 something to arrive in it. */
Packit 6c4009
      if (runp == NULL && optim.aio_idle_time >= 0)
Packit 6c4009
	{
Packit 6c4009
	  struct timeval now;
Packit 6c4009
	  struct timespec wakeup_time;
Packit 6c4009
Packit 6c4009
	  ++idle_thread_count;
Packit 6c4009
	  __gettimeofday (&now, NULL);
Packit 6c4009
	  wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
Packit 6c4009
	  wakeup_time.tv_nsec = now.tv_usec * 1000;
Packit 6c4009
	  if (wakeup_time.tv_nsec >= 1000000000)
Packit 6c4009
	    {
Packit 6c4009
	      wakeup_time.tv_nsec -= 1000000000;
Packit 6c4009
	      ++wakeup_time.tv_sec;
Packit 6c4009
	    }
Packit 6c4009
	  pthread_cond_timedwait (&__aio_new_request_notification,
Packit 6c4009
				  &__aio_requests_mutex,
Packit 6c4009
				  &wakeup_time);
Packit 6c4009
	  --idle_thread_count;
Packit 6c4009
	  runp = runlist;
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      if (runp == NULL)
Packit 6c4009
	--nthreads;
Packit 6c4009
      else
Packit 6c4009
	{
Packit 6c4009
	  assert (runp->running == yes);
Packit 6c4009
	  runp->running = allocated;
Packit 6c4009
	  runlist = runp->next_run;
Packit 6c4009
Packit 6c4009
	  /* If we have a request to process, and there's still another in
Packit 6c4009
	     the run list, then we need to either wake up or create a new
Packit 6c4009
	     thread to service the request that is still in the run list. */
Packit 6c4009
	  if (runlist != NULL)
Packit 6c4009
	    {
Packit 6c4009
	      /* There are at least two items in the work queue to work on.
Packit 6c4009
		 If there are other idle threads, then we should wake them
Packit 6c4009
		 up for these other work elements; otherwise, we should try
Packit 6c4009
		 to create a new thread. */
Packit 6c4009
	      if (idle_thread_count > 0)
Packit 6c4009
		pthread_cond_signal (&__aio_new_request_notification);
Packit 6c4009
	      else if (nthreads < optim.aio_threads)
Packit 6c4009
		{
Packit 6c4009
		  pthread_t thid;
Packit 6c4009
		  pthread_attr_t attr;
Packit 6c4009
Packit 6c4009
		  /* Make sure the thread is created detached.  */
Packit 6c4009
		  pthread_attr_init (&attr);
Packit 6c4009
		  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
Packit 6c4009
Packit 6c4009
		  /* Now try to start a thread. If we fail, no big deal,
Packit 6c4009
		     because we know that there is at least one thread (us)
Packit 6c4009
		     that is working on AIO operations. */
Packit 6c4009
		  if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
Packit 6c4009
		      == 0)
Packit 6c4009
		    ++nthreads;
Packit 6c4009
		}
Packit 6c4009
	    }
Packit 6c4009
	}
Packit 6c4009
Packit 6c4009
      /* Release the mutex.  */
Packit 6c4009
      pthread_mutex_unlock (&__aio_requests_mutex);
Packit 6c4009
    }
Packit 6c4009
  while (runp != NULL);
Packit 6c4009
Packit 6c4009
  return NULL;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* Free allocated resources.  */
Packit 6c4009
libc_freeres_fn (free_res)
Packit 6c4009
{
Packit 6c4009
  size_t row;
Packit 6c4009
Packit 6c4009
  for (row = 0; row < pool_max_size; ++row)
Packit 6c4009
    free (pool[row]);
Packit 6c4009
Packit 6c4009
  free (pool);
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* Add newrequest to the runlist. The __abs_prio flag of newrequest must
Packit 6c4009
   be correctly set to do this. Also, you had better set newrequest's
Packit 6c4009
   "running" flag to "yes" before you release your lock or you'll throw an
Packit 6c4009
   assertion. */
Packit 6c4009
static void
Packit 6c4009
add_request_to_runlist (struct requestlist *newrequest)
Packit 6c4009
{
Packit 6c4009
  int prio = newrequest->aiocbp->aiocb.__abs_prio;
Packit 6c4009
  struct requestlist *runp;
Packit 6c4009
Packit 6c4009
  if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
Packit 6c4009
    {
Packit 6c4009
      newrequest->next_run = runlist;
Packit 6c4009
      runlist = newrequest;
Packit 6c4009
    }
Packit 6c4009
  else
Packit 6c4009
    {
Packit 6c4009
      runp = runlist;
Packit 6c4009
Packit 6c4009
      while (runp->next_run != NULL
Packit 6c4009
	     && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
Packit 6c4009
	runp = runp->next_run;
Packit 6c4009
Packit 6c4009
      newrequest->next_run = runp->next_run;
Packit 6c4009
      runp->next_run = newrequest;
Packit 6c4009
    }
Packit 6c4009
}