Blame sysdeps/pthread/aio_misc.c

/* Handle general operations.
   Copyright (C) 1997-2018 Free Software Foundation, Inc.
   This file is part of the GNU C Library.
   Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2.1 of the License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with the GNU C Library; if not, see
   <http://www.gnu.org/licenses/>.  */

#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <aio_misc.h>

#ifndef aio_create_helper_thread
# define aio_create_helper_thread __aio_create_helper_thread

/* Generic way to start an AIO helper thread.  It is only compiled in
   when aio_create_helper_thread has not been defined before this point.

   Runs TF (ARG) in a newly created detached thread and stores the ID of
   that thread in *THREADP.  Returns 0 on success, otherwise the error
   number reported by pthread_attr_init or pthread_create.  */
extern inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
{
  pthread_attr_t attr;

  int ret = pthread_attr_init (&attr);
  if (ret != 0)
    /* Without an initialized attribute object we cannot request a
       detached thread, so hand the error back to the caller.  */
    return ret;

  /* Make sure the thread is created detached.  */
  (void) pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

  ret = pthread_create (threadp, &attr, tf, arg);

  /* The attribute object is no longer needed, whether or not the
     thread could be created.  */
  (void) pthread_attr_destroy (&attr);
  return ret;
}
#endif

static void add_request_to_runlist (struct requestlist *newrequest);
Packit Service 82fcde
Packit Service 82fcde
/* Pool of request list entries.  */
Packit Service 82fcde
static struct requestlist **pool;
Packit Service 82fcde
Packit Service 82fcde
/* Number of total and allocated pool entries.  */
Packit Service 82fcde
static size_t pool_max_size;
Packit Service 82fcde
static size_t pool_size;
Packit Service 82fcde
Packit Service 82fcde
/* We implement a two dimensional array but allocate each row separately.
Packit Service 82fcde
   The macro below determines how many entries should be used per row.
Packit Service 82fcde
   It should better be a power of two.  */
Packit Service 82fcde
#define ENTRIES_PER_ROW	32
Packit Service 82fcde
Packit Service 82fcde
/* How many rows we allocate at once.  */
Packit Service 82fcde
#define ROWS_STEP	8
Packit Service 82fcde
Packit Service 82fcde
/* List of available entries.  */
Packit Service 82fcde
static struct requestlist *freelist;
Packit Service 82fcde
Packit Service 82fcde
/* List of request waiting to be processed.  */
Packit Service 82fcde
static struct requestlist *runlist;
Packit Service 82fcde
Packit Service 82fcde
/* Structure list of all currently processed requests.  */
Packit Service 82fcde
static struct requestlist *requests;
Packit Service 82fcde
Packit Service 82fcde
/* Number of threads currently running.  */
Packit Service 82fcde
static int nthreads;
Packit Service 82fcde
Packit Service 82fcde
/* Number of threads waiting for work to arrive. */
Packit Service 82fcde
static int idle_thread_count;
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* These are the values used to optimize the use of AIO.  The user can
Packit Service 82fcde
   overwrite them by using the `aio_init' function.  */
Packit Service 82fcde
static struct aioinit optim =
Packit Service 82fcde
{
Packit Service 82fcde
  20,	/* int aio_threads;	Maximal number of threads.  */
Packit Service 82fcde
  64,	/* int aio_num;		Number of expected simultaneous requests. */
Packit Service 82fcde
  0,
Packit Service 82fcde
  0,
Packit Service 82fcde
  0,
Packit Service 82fcde
  0,
Packit Service 82fcde
  1,
Packit Service 82fcde
  0
Packit Service 82fcde
};
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* Since the list is global we need a mutex protecting it.  */
Packit Service 82fcde
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
Packit Service 82fcde
Packit Service 82fcde
/* When you add a request to the list and there are idle threads present,
Packit Service 82fcde
   you signal this condition variable. When a thread finishes work, it waits
Packit Service 82fcde
   on this condition variable for a time before it actually exits. */
Packit Service 82fcde
pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* Functions to handle request list pool.  */
Packit Service 82fcde
static struct requestlist *
Packit Service 82fcde
get_elem (void)
Packit Service 82fcde
{
Packit Service 82fcde
  struct requestlist *result;
Packit Service 82fcde
Packit Service 82fcde
  if (freelist == NULL)
Packit Service 82fcde
    {
Packit Service 82fcde
      struct requestlist *new_row;
Packit Service 82fcde
      int cnt;
Packit Service 82fcde
Packit Service 82fcde
      assert (sizeof (struct aiocb) == sizeof (struct aiocb64));
Packit Service 82fcde
Packit Service 82fcde
      if (pool_size + 1 >= pool_max_size)
Packit Service 82fcde
	{
Packit Service 82fcde
	  size_t new_max_size = pool_max_size + ROWS_STEP;
Packit Service 82fcde
	  struct requestlist **new_tab;
Packit Service 82fcde
Packit Service 82fcde
	  new_tab = (struct requestlist **)
Packit Service 82fcde
	    realloc (pool, new_max_size * sizeof (struct requestlist *));
Packit Service 82fcde
Packit Service 82fcde
	  if (new_tab == NULL)
Packit Service 82fcde
	    return NULL;
Packit Service 82fcde
Packit Service 82fcde
	  pool_max_size = new_max_size;
Packit Service 82fcde
	  pool = new_tab;
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      /* Allocate the new row.  */
Packit Service 82fcde
      cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
Packit Service 82fcde
      new_row = (struct requestlist *) calloc (cnt,
Packit Service 82fcde
					       sizeof (struct requestlist));
Packit Service 82fcde
      if (new_row == NULL)
Packit Service 82fcde
	return NULL;
Packit Service 82fcde
Packit Service 82fcde
      pool[pool_size++] = new_row;
Packit Service 82fcde
Packit Service 82fcde
      /* Put all the new entries in the freelist.  */
Packit Service 82fcde
      do
Packit Service 82fcde
	{
Packit Service 82fcde
	  new_row->next_prio = freelist;
Packit Service 82fcde
	  freelist = new_row++;
Packit Service 82fcde
	}
Packit Service 82fcde
      while (--cnt > 0);
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  result = freelist;
Packit Service 82fcde
  freelist = freelist->next_prio;
Packit Service 82fcde
Packit Service 82fcde
  return result;
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
void
Packit Service 82fcde
__aio_free_request (struct requestlist *elem)
Packit Service 82fcde
{
Packit Service 82fcde
  elem->running = no;
Packit Service 82fcde
  elem->next_prio = freelist;
Packit Service 82fcde
  freelist = elem;
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
struct requestlist *
Packit Service 82fcde
__aio_find_req (aiocb_union *elem)
Packit Service 82fcde
{
Packit Service 82fcde
  struct requestlist *runp = requests;
Packit Service 82fcde
  int fildes = elem->aiocb.aio_fildes;
Packit Service 82fcde
Packit Service 82fcde
  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
Packit Service 82fcde
    runp = runp->next_fd;
Packit Service 82fcde
Packit Service 82fcde
  if (runp != NULL)
Packit Service 82fcde
    {
Packit Service 82fcde
      if (runp->aiocbp->aiocb.aio_fildes != fildes)
Packit Service 82fcde
	runp = NULL;
Packit Service 82fcde
      else
Packit Service 82fcde
	while (runp != NULL && runp->aiocbp != elem)
Packit Service 82fcde
	  runp = runp->next_prio;
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  return runp;
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
struct requestlist *
Packit Service 82fcde
__aio_find_req_fd (int fildes)
Packit Service 82fcde
{
Packit Service 82fcde
  struct requestlist *runp = requests;
Packit Service 82fcde
Packit Service 82fcde
  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
Packit Service 82fcde
    runp = runp->next_fd;
Packit Service 82fcde
Packit Service 82fcde
  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
Packit Service 82fcde
	  ? runp : NULL);
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
void
Packit Service 82fcde
__aio_remove_request (struct requestlist *last, struct requestlist *req,
Packit Service 82fcde
		      int all)
Packit Service 82fcde
{
Packit Service 82fcde
  assert (req->running == yes || req->running == queued
Packit Service 82fcde
	  || req->running == done);
Packit Service 82fcde
Packit Service 82fcde
  if (last != NULL)
Packit Service 82fcde
    last->next_prio = all ? NULL : req->next_prio;
Packit Service 82fcde
  else
Packit Service 82fcde
    {
Packit Service 82fcde
      if (all || req->next_prio == NULL)
Packit Service 82fcde
	{
Packit Service 82fcde
	  if (req->last_fd != NULL)
Packit Service 82fcde
	    req->last_fd->next_fd = req->next_fd;
Packit Service 82fcde
	  else
Packit Service 82fcde
	    requests = req->next_fd;
Packit Service 82fcde
	  if (req->next_fd != NULL)
Packit Service 82fcde
	    req->next_fd->last_fd = req->last_fd;
Packit Service 82fcde
	}
Packit Service 82fcde
      else
Packit Service 82fcde
	{
Packit Service 82fcde
	  if (req->last_fd != NULL)
Packit Service 82fcde
	    req->last_fd->next_fd = req->next_prio;
Packit Service 82fcde
	  else
Packit Service 82fcde
	    requests = req->next_prio;
Packit Service 82fcde
Packit Service 82fcde
	  if (req->next_fd != NULL)
Packit Service 82fcde
	    req->next_fd->last_fd = req->next_prio;
Packit Service 82fcde
Packit Service 82fcde
	  req->next_prio->last_fd = req->last_fd;
Packit Service 82fcde
	  req->next_prio->next_fd = req->next_fd;
Packit Service 82fcde
Packit Service 82fcde
	  /* Mark this entry as runnable.  */
Packit Service 82fcde
	  req->next_prio->running = yes;
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      if (req->running == yes)
Packit Service 82fcde
	{
Packit Service 82fcde
	  struct requestlist *runp = runlist;
Packit Service 82fcde
Packit Service 82fcde
	  last = NULL;
Packit Service 82fcde
	  while (runp != NULL)
Packit Service 82fcde
	    {
Packit Service 82fcde
	      if (runp == req)
Packit Service 82fcde
		{
Packit Service 82fcde
		  if (last == NULL)
Packit Service 82fcde
		    runlist = runp->next_run;
Packit Service 82fcde
		  else
Packit Service 82fcde
		    last->next_run = runp->next_run;
Packit Service 82fcde
		  break;
Packit Service 82fcde
		}
Packit Service 82fcde
	      last = runp;
Packit Service 82fcde
	      runp = runp->next_run;
Packit Service 82fcde
	    }
Packit Service 82fcde
	}
Packit Service 82fcde
    }
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* The thread handler.  */
Packit Service 82fcde
static void *handle_fildes_io (void *arg);
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* User optimization.  */
Packit Service 82fcde
void
Packit Service 82fcde
__aio_init (const struct aioinit *init)
Packit Service 82fcde
{
Packit Service 82fcde
  /* Get the mutex.  */
Packit Service 82fcde
  pthread_mutex_lock (&__aio_requests_mutex);
Packit Service 82fcde
Packit Service 82fcde
  /* Only allow writing new values if the table is not yet allocated.  */
Packit Service 82fcde
  if (pool == NULL)
Packit Service 82fcde
    {
Packit Service 82fcde
      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
Packit Service 82fcde
      assert (powerof2 (ENTRIES_PER_ROW));
Packit Service 82fcde
      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
Packit Service 82fcde
		       ? ENTRIES_PER_ROW
Packit Service 82fcde
		       : init->aio_num & ~(ENTRIES_PER_ROW - 1));
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  if (init->aio_idle_time != 0)
Packit Service 82fcde
    optim.aio_idle_time = init->aio_idle_time;
Packit Service 82fcde
Packit Service 82fcde
  /* Release the mutex.  */
Packit Service 82fcde
  pthread_mutex_unlock (&__aio_requests_mutex);
Packit Service 82fcde
}
Packit Service 82fcde
weak_alias (__aio_init, aio_init)
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* The main function of the async I/O handling.  It enqueues requests
Packit Service 82fcde
   and if necessary starts and handles threads.  */
Packit Service 82fcde
struct requestlist *
Packit Service 82fcde
__aio_enqueue_request (aiocb_union *aiocbp, int operation)
Packit Service 82fcde
{
Packit Service 82fcde
  int result = 0;
Packit Service 82fcde
  int policy, prio;
Packit Service 82fcde
  struct sched_param param;
Packit Service 82fcde
  struct requestlist *last, *runp, *newp;
Packit Service 82fcde
  int running = no;
Packit Service 82fcde
Packit Service 82fcde
  if (operation == LIO_SYNC || operation == LIO_DSYNC)
Packit Service 82fcde
    aiocbp->aiocb.aio_reqprio = 0;
Packit Service 82fcde
  else if (aiocbp->aiocb.aio_reqprio < 0
Packit Service 82fcde
#ifdef AIO_PRIO_DELTA_MAX
Packit Service 82fcde
	   || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX
Packit Service 82fcde
#endif
Packit Service 82fcde
	   )
Packit Service 82fcde
    {
Packit Service 82fcde
      /* Invalid priority value.  */
Packit Service 82fcde
      __set_errno (EINVAL);
Packit Service 82fcde
      aiocbp->aiocb.__error_code = EINVAL;
Packit Service 82fcde
      aiocbp->aiocb.__return_value = -1;
Packit Service 82fcde
      return NULL;
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  /* Compute priority for this request.  */
Packit Service 82fcde
  pthread_getschedparam (pthread_self (), &policy, ¶m;;
Packit Service 82fcde
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
Packit Service 82fcde
Packit Service 82fcde
  /* Get the mutex.  */
Packit Service 82fcde
  pthread_mutex_lock (&__aio_requests_mutex);
Packit Service 82fcde
Packit Service 82fcde
  last = NULL;
Packit Service 82fcde
  runp = requests;
Packit Service 82fcde
  /* First look whether the current file descriptor is currently
Packit Service 82fcde
     worked with.  */
Packit Service 82fcde
  while (runp != NULL
Packit Service 82fcde
	 && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
Packit Service 82fcde
    {
Packit Service 82fcde
      last = runp;
Packit Service 82fcde
      runp = runp->next_fd;
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  /* Get a new element for the waiting list.  */
Packit Service 82fcde
  newp = get_elem ();
Packit Service 82fcde
  if (newp == NULL)
Packit Service 82fcde
    {
Packit Service 82fcde
      pthread_mutex_unlock (&__aio_requests_mutex);
Packit Service 82fcde
      __set_errno (EAGAIN);
Packit Service 82fcde
      return NULL;
Packit Service 82fcde
    }
Packit Service 82fcde
  newp->aiocbp = aiocbp;
Packit Service 82fcde
  newp->waiting = NULL;
Packit Service 82fcde
Packit Service 82fcde
  aiocbp->aiocb.__abs_prio = prio;
Packit Service 82fcde
  aiocbp->aiocb.__policy = policy;
Packit Service 82fcde
  aiocbp->aiocb.aio_lio_opcode = operation;
Packit Service 82fcde
  aiocbp->aiocb.__error_code = EINPROGRESS;
Packit Service 82fcde
  aiocbp->aiocb.__return_value = 0;
Packit Service 82fcde
Packit Service 82fcde
  if (runp != NULL
Packit Service 82fcde
      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
Packit Service 82fcde
    {
Packit Service 82fcde
      /* The current file descriptor is worked on.  It makes no sense
Packit Service 82fcde
	 to start another thread since this new thread would fight
Packit Service 82fcde
	 with the running thread for the resources.  But we also cannot
Packit Service 82fcde
	 say that the thread processing this desriptor shall immediately
Packit Service 82fcde
	 after finishing the current job process this request if there
Packit Service 82fcde
	 are other threads in the running queue which have a higher
Packit Service 82fcde
	 priority.  */
Packit Service 82fcde
Packit Service 82fcde
      /* Simply enqueue it after the running one according to the
Packit Service 82fcde
	 priority.  */
Packit Service 82fcde
      last = NULL;
Packit Service 82fcde
      while (runp->next_prio != NULL
Packit Service 82fcde
	     && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
Packit Service 82fcde
	{
Packit Service 82fcde
	  last = runp;
Packit Service 82fcde
	  runp = runp->next_prio;
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      newp->next_prio = runp->next_prio;
Packit Service 82fcde
      runp->next_prio = newp;
Packit Service 82fcde
Packit Service 82fcde
      running = queued;
Packit Service 82fcde
    }
Packit Service 82fcde
  else
Packit Service 82fcde
    {
Packit Service 82fcde
      running = yes;
Packit Service 82fcde
      /* Enqueue this request for a new descriptor.  */
Packit Service 82fcde
      if (last == NULL)
Packit Service 82fcde
	{
Packit Service 82fcde
	  newp->last_fd = NULL;
Packit Service 82fcde
	  newp->next_fd = requests;
Packit Service 82fcde
	  if (requests != NULL)
Packit Service 82fcde
	    requests->last_fd = newp;
Packit Service 82fcde
	  requests = newp;
Packit Service 82fcde
	}
Packit Service 82fcde
      else
Packit Service 82fcde
	{
Packit Service 82fcde
	  newp->next_fd = last->next_fd;
Packit Service 82fcde
	  newp->last_fd = last;
Packit Service 82fcde
	  last->next_fd = newp;
Packit Service 82fcde
	  if (newp->next_fd != NULL)
Packit Service 82fcde
	    newp->next_fd->last_fd = newp;
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      newp->next_prio = NULL;
Packit Service 82fcde
      last = NULL;
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  if (running == yes)
Packit Service 82fcde
    {
Packit Service 82fcde
      /* We try to create a new thread for this file descriptor.  The
Packit Service 82fcde
	 function which gets called will handle all available requests
Packit Service 82fcde
	 for this descriptor and when all are processed it will
Packit Service 82fcde
	 terminate.
Packit Service 82fcde
Packit Service 82fcde
	 If no new thread can be created or if the specified limit of
Packit Service 82fcde
	 threads for AIO is reached we queue the request.  */
Packit Service 82fcde
Packit Service 82fcde
      /* See if we need to and are able to create a thread.  */
Packit Service 82fcde
      if (nthreads < optim.aio_threads && idle_thread_count == 0)
Packit Service 82fcde
	{
Packit Service 82fcde
	  pthread_t thid;
Packit Service 82fcde
Packit Service 82fcde
	  running = newp->running = allocated;
Packit Service 82fcde
Packit Service 82fcde
	  /* Now try to start a thread.  */
Packit Service 82fcde
	  result = aio_create_helper_thread (&thid, handle_fildes_io, newp);
Packit Service 82fcde
	  if (result == 0)
Packit Service 82fcde
	    /* We managed to enqueue the request.  All errors which can
Packit Service 82fcde
	       happen now can be recognized by calls to `aio_return' and
Packit Service 82fcde
	       `aio_error'.  */
Packit Service 82fcde
	    ++nthreads;
Packit Service 82fcde
	  else
Packit Service 82fcde
	    {
Packit Service 82fcde
	      /* Reset the running flag.  The new request is not running.  */
Packit Service 82fcde
	      running = newp->running = yes;
Packit Service 82fcde
Packit Service 82fcde
	      if (nthreads == 0)
Packit Service 82fcde
		{
Packit Service 82fcde
		  /* We cannot create a thread in the moment and there is
Packit Service 82fcde
		     also no thread running.  This is a problem.  `errno' is
Packit Service 82fcde
		     set to EAGAIN if this is only a temporary problem.  */
Packit Service 82fcde
		  __aio_remove_request (last, newp, 0);
Packit Service 82fcde
		}
Packit Service 82fcde
	      else
Packit Service 82fcde
		result = 0;
Packit Service 82fcde
	    }
Packit Service 82fcde
	}
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  /* Enqueue the request in the run queue if it is not yet running.  */
Packit Service 82fcde
  if (running == yes && result == 0)
Packit Service 82fcde
    {
Packit Service 82fcde
      add_request_to_runlist (newp);
Packit Service 82fcde
Packit Service 82fcde
      /* If there is a thread waiting for work, then let it know that we
Packit Service 82fcde
	 have just given it something to do. */
Packit Service 82fcde
      if (idle_thread_count > 0)
Packit Service 82fcde
	pthread_cond_signal (&__aio_new_request_notification);
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  if (result == 0)
Packit Service 82fcde
    newp->running = running;
Packit Service 82fcde
  else
Packit Service 82fcde
    {
Packit Service 82fcde
      /* Something went wrong.  */
Packit Service 82fcde
      __aio_free_request (newp);
Packit Service 82fcde
      aiocbp->aiocb.__error_code = result;
Packit Service 82fcde
      __set_errno (result);
Packit Service 82fcde
      newp = NULL;
Packit Service 82fcde
    }
Packit Service 82fcde
Packit Service 82fcde
  /* Release the mutex.  */
Packit Service 82fcde
  pthread_mutex_unlock (&__aio_requests_mutex);
Packit Service 82fcde
Packit Service 82fcde
  return newp;
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
static void *
Packit Service 82fcde
handle_fildes_io (void *arg)
Packit Service 82fcde
{
Packit Service 82fcde
  pthread_t self = pthread_self ();
Packit Service 82fcde
  struct sched_param param;
Packit Service 82fcde
  struct requestlist *runp = (struct requestlist *) arg;
Packit Service 82fcde
  aiocb_union *aiocbp;
Packit Service 82fcde
  int policy;
Packit Service 82fcde
  int fildes;
Packit Service 82fcde
Packit Service 82fcde
  pthread_getschedparam (self, &policy, ¶m;;
Packit Service 82fcde
Packit Service 82fcde
  do
Packit Service 82fcde
    {
Packit Service 82fcde
      /* If runp is NULL, then we were created to service the work queue
Packit Service 82fcde
	 in general, not to handle any particular request. In that case we
Packit Service 82fcde
	 skip the "do work" stuff on the first pass, and go directly to the
Packit Service 82fcde
	 "get work off the work queue" part of this loop, which is near the
Packit Service 82fcde
	 end. */
Packit Service 82fcde
      if (runp == NULL)
Packit Service 82fcde
	pthread_mutex_lock (&__aio_requests_mutex);
Packit Service 82fcde
      else
Packit Service 82fcde
	{
Packit Service 82fcde
	  /* Hopefully this request is marked as running.  */
Packit Service 82fcde
	  assert (runp->running == allocated);
Packit Service 82fcde
Packit Service 82fcde
	  /* Update our variables.  */
Packit Service 82fcde
	  aiocbp = runp->aiocbp;
Packit Service 82fcde
	  fildes = aiocbp->aiocb.aio_fildes;
Packit Service 82fcde
Packit Service 82fcde
	  /* Change the priority to the requested value (if necessary).  */
Packit Service 82fcde
	  if (aiocbp->aiocb.__abs_prio != param.sched_priority
Packit Service 82fcde
	      || aiocbp->aiocb.__policy != policy)
Packit Service 82fcde
	    {
Packit Service 82fcde
	      param.sched_priority = aiocbp->aiocb.__abs_prio;
Packit Service 82fcde
	      policy = aiocbp->aiocb.__policy;
Packit Service 82fcde
	      pthread_setschedparam (self, policy, ¶m;;
Packit Service 82fcde
	    }
Packit Service 82fcde
Packit Service 82fcde
	  /* Process request pointed to by RUNP.  We must not be disturbed
Packit Service 82fcde
	     by signals.  */
Packit Service 82fcde
	  if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
Packit Service 82fcde
	    {
Packit Service 82fcde
	      if (sizeof (off_t) != sizeof (off64_t)
Packit Service 82fcde
		  && aiocbp->aiocb.aio_lio_opcode & 128)
Packit Service 82fcde
		aiocbp->aiocb.__return_value =
Packit Service 82fcde
		  TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
Packit Service 82fcde
						 aiocbp->aiocb64.aio_buf,
Packit Service 82fcde
						 aiocbp->aiocb64.aio_nbytes,
Packit Service 82fcde
						 aiocbp->aiocb64.aio_offset));
Packit Service 82fcde
	      else
Packit Service 82fcde
		aiocbp->aiocb.__return_value =
Packit Service 82fcde
		  TEMP_FAILURE_RETRY (__libc_pread (fildes,
Packit Service 82fcde
						    (void *)
Packit Service 82fcde
						    aiocbp->aiocb.aio_buf,
Packit Service 82fcde
						    aiocbp->aiocb.aio_nbytes,
Packit Service 82fcde
						    aiocbp->aiocb.aio_offset));
Packit Service 82fcde
Packit Service 82fcde
	      if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
Packit Service 82fcde
		/* The Linux kernel is different from others.  It returns
Packit Service 82fcde
		   ESPIPE if using pread on a socket.  Other platforms
Packit Service 82fcde
		   simply ignore the offset parameter and behave like
Packit Service 82fcde
		   read.  */
Packit Service 82fcde
		aiocbp->aiocb.__return_value =
Packit Service 82fcde
		  TEMP_FAILURE_RETRY (read (fildes,
Packit Service 82fcde
					    (void *) aiocbp->aiocb64.aio_buf,
Packit Service 82fcde
					    aiocbp->aiocb64.aio_nbytes));
Packit Service 82fcde
	    }
Packit Service 82fcde
	  else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
Packit Service 82fcde
	    {
Packit Service 82fcde
	      if (sizeof (off_t) != sizeof (off64_t)
Packit Service 82fcde
		  && aiocbp->aiocb.aio_lio_opcode & 128)
Packit Service 82fcde
		aiocbp->aiocb.__return_value =
Packit Service 82fcde
		  TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
Packit Service 82fcde
						  aiocbp->aiocb64.aio_buf,
Packit Service 82fcde
						  aiocbp->aiocb64.aio_nbytes,
Packit Service 82fcde
						  aiocbp->aiocb64.aio_offset));
Packit Service 82fcde
	      else
Packit Service 82fcde
		aiocbp->aiocb.__return_value =
Packit Service 82fcde
		  TEMP_FAILURE_RETRY (__libc_pwrite (fildes, (const void *)
Packit Service 82fcde
					      aiocbp->aiocb.aio_buf,
Packit Service 82fcde
					      aiocbp->aiocb.aio_nbytes,
Packit Service 82fcde
					      aiocbp->aiocb.aio_offset));
Packit Service 82fcde
Packit Service 82fcde
	      if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
Packit Service 82fcde
		/* The Linux kernel is different from others.  It returns
Packit Service 82fcde
		   ESPIPE if using pwrite on a socket.  Other platforms
Packit Service 82fcde
		   simply ignore the offset parameter and behave like
Packit Service 82fcde
		   write.  */
Packit Service 82fcde
		aiocbp->aiocb.__return_value =
Packit Service 82fcde
		  TEMP_FAILURE_RETRY (write (fildes,
Packit Service 82fcde
					     (void *) aiocbp->aiocb64.aio_buf,
Packit Service 82fcde
					     aiocbp->aiocb64.aio_nbytes));
Packit Service 82fcde
	    }
Packit Service 82fcde
	  else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
Packit Service 82fcde
	    aiocbp->aiocb.__return_value =
Packit Service 82fcde
	      TEMP_FAILURE_RETRY (fdatasync (fildes));
Packit Service 82fcde
	  else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
Packit Service 82fcde
	    aiocbp->aiocb.__return_value =
Packit Service 82fcde
	      TEMP_FAILURE_RETRY (fsync (fildes));
Packit Service 82fcde
	  else
Packit Service 82fcde
	    {
Packit Service 82fcde
	      /* This is an invalid opcode.  */
Packit Service 82fcde
	      aiocbp->aiocb.__return_value = -1;
Packit Service 82fcde
	      __set_errno (EINVAL);
Packit Service 82fcde
	    }
Packit Service 82fcde
Packit Service 82fcde
	  /* Get the mutex.  */
Packit Service 82fcde
	  pthread_mutex_lock (&__aio_requests_mutex);
Packit Service 82fcde
Packit Service 82fcde
	  if (aiocbp->aiocb.__return_value == -1)
Packit Service 82fcde
	    aiocbp->aiocb.__error_code = errno;
Packit Service 82fcde
	  else
Packit Service 82fcde
	    aiocbp->aiocb.__error_code = 0;
Packit Service 82fcde
Packit Service 82fcde
	  /* Send the signal to notify about finished processing of the
Packit Service 82fcde
	     request.  */
Packit Service 82fcde
	  __aio_notify (runp);
Packit Service 82fcde
Packit Service 82fcde
	  /* For debugging purposes we reset the running flag of the
Packit Service 82fcde
	     finished request.  */
Packit Service 82fcde
	  assert (runp->running == allocated);
Packit Service 82fcde
	  runp->running = done;
Packit Service 82fcde
Packit Service 82fcde
	  /* Now dequeue the current request.  */
Packit Service 82fcde
	  __aio_remove_request (NULL, runp, 0);
Packit Service 82fcde
	  if (runp->next_prio != NULL)
Packit Service 82fcde
	    add_request_to_runlist (runp->next_prio);
Packit Service 82fcde
Packit Service 82fcde
	  /* Free the old element.  */
Packit Service 82fcde
	  __aio_free_request (runp);
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      runp = runlist;
Packit Service 82fcde
Packit Service 82fcde
      /* If the runlist is empty, then we sleep for a while, waiting for
Packit Service 82fcde
	 something to arrive in it. */
Packit Service 82fcde
      if (runp == NULL && optim.aio_idle_time >= 0)
Packit Service 82fcde
	{
Packit Service 82fcde
	  struct timeval now;
Packit Service 82fcde
	  struct timespec wakeup_time;
Packit Service 82fcde
Packit Service 82fcde
	  ++idle_thread_count;
Packit Service 82fcde
	  __gettimeofday (&now, NULL);
Packit Service 82fcde
	  wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
Packit Service 82fcde
	  wakeup_time.tv_nsec = now.tv_usec * 1000;
Packit Service 82fcde
	  if (wakeup_time.tv_nsec >= 1000000000)
Packit Service 82fcde
	    {
Packit Service 82fcde
	      wakeup_time.tv_nsec -= 1000000000;
Packit Service 82fcde
	      ++wakeup_time.tv_sec;
Packit Service 82fcde
	    }
Packit Service 82fcde
	  pthread_cond_timedwait (&__aio_new_request_notification,
Packit Service 82fcde
				  &__aio_requests_mutex,
Packit Service 82fcde
				  &wakeup_time);
Packit Service 82fcde
	  --idle_thread_count;
Packit Service 82fcde
	  runp = runlist;
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      if (runp == NULL)
Packit Service 82fcde
	--nthreads;
Packit Service 82fcde
      else
Packit Service 82fcde
	{
Packit Service 82fcde
	  assert (runp->running == yes);
Packit Service 82fcde
	  runp->running = allocated;
Packit Service 82fcde
	  runlist = runp->next_run;
Packit Service 82fcde
Packit Service 82fcde
	  /* If we have a request to process, and there's still another in
Packit Service 82fcde
	     the run list, then we need to either wake up or create a new
Packit Service 82fcde
	     thread to service the request that is still in the run list. */
Packit Service 82fcde
	  if (runlist != NULL)
Packit Service 82fcde
	    {
Packit Service 82fcde
	      /* There are at least two items in the work queue to work on.
Packit Service 82fcde
		 If there are other idle threads, then we should wake them
Packit Service 82fcde
		 up for these other work elements; otherwise, we should try
Packit Service 82fcde
		 to create a new thread. */
Packit Service 82fcde
	      if (idle_thread_count > 0)
Packit Service 82fcde
		pthread_cond_signal (&__aio_new_request_notification);
Packit Service 82fcde
	      else if (nthreads < optim.aio_threads)
Packit Service 82fcde
		{
Packit Service 82fcde
		  pthread_t thid;
Packit Service 82fcde
		  pthread_attr_t attr;
Packit Service 82fcde
Packit Service 82fcde
		  /* Make sure the thread is created detached.  */
Packit Service 82fcde
		  pthread_attr_init (&attr);
Packit Service 82fcde
		  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
Packit Service 82fcde
Packit Service 82fcde
		  /* Now try to start a thread. If we fail, no big deal,
Packit Service 82fcde
		     because we know that there is at least one thread (us)
Packit Service 82fcde
		     that is working on AIO operations. */
Packit Service 82fcde
		  if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
Packit Service 82fcde
		      == 0)
Packit Service 82fcde
		    ++nthreads;
Packit Service 82fcde
		}
Packit Service 82fcde
	    }
Packit Service 82fcde
	}
Packit Service 82fcde
Packit Service 82fcde
      /* Release the mutex.  */
Packit Service 82fcde
      pthread_mutex_unlock (&__aio_requests_mutex);
Packit Service 82fcde
    }
Packit Service 82fcde
  while (runp != NULL);
Packit Service 82fcde
Packit Service 82fcde
  return NULL;
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* Free allocated resources.  */
Packit Service 82fcde
libc_freeres_fn (free_res)
Packit Service 82fcde
{
Packit Service 82fcde
  size_t row;
Packit Service 82fcde
Packit Service 82fcde
  for (row = 0; row < pool_max_size; ++row)
Packit Service 82fcde
    free (pool[row]);
Packit Service 82fcde
Packit Service 82fcde
  free (pool);
Packit Service 82fcde
}
Packit Service 82fcde
Packit Service 82fcde
Packit Service 82fcde
/* Add newrequest to the runlist. The __abs_prio flag of newrequest must
Packit Service 82fcde
   be correctly set to do this. Also, you had better set newrequest's
Packit Service 82fcde
   "running" flag to "yes" before you release your lock or you'll throw an
Packit Service 82fcde
   assertion. */
Packit Service 82fcde
static void
Packit Service 82fcde
add_request_to_runlist (struct requestlist *newrequest)
Packit Service 82fcde
{
Packit Service 82fcde
  int prio = newrequest->aiocbp->aiocb.__abs_prio;
Packit Service 82fcde
  struct requestlist *runp;
Packit Service 82fcde
Packit Service 82fcde
  if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
Packit Service 82fcde
    {
Packit Service 82fcde
      newrequest->next_run = runlist;
Packit Service 82fcde
      runlist = newrequest;
Packit Service 82fcde
    }
Packit Service 82fcde
  else
Packit Service 82fcde
    {
Packit Service 82fcde
      runp = runlist;
Packit Service 82fcde
Packit Service 82fcde
      while (runp->next_run != NULL
Packit Service 82fcde
	     && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
Packit Service 82fcde
	runp = runp->next_run;
Packit Service 82fcde
Packit Service 82fcde
      newrequest->next_run = runp->next_run;
Packit Service 82fcde
      runp->next_run = newrequest;
Packit Service 82fcde
    }
Packit Service 82fcde
}