Blame source/uds/requestQueueKernel.c

/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/uds-releases/jasper/kernelLinux/uds/requestQueueKernel.c#3 $
 */
Packit Service 310c69
Packit Service 310c69
#include "requestQueue.h"
Packit Service 310c69
Packit Service 310c69
#include <linux/wait.h>
Packit Service 310c69
Packit Service 310c69
#include "atomicDefs.h"
Packit Service 310c69
#include "compiler.h"
Packit Service 310c69
#include "logger.h"
Packit Service 310c69
#include "request.h"
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
#include "threads.h"
Packit Service 310c69
#include "util/funnelQueue.h"
Packit Service 310c69
Packit Service 310c69
/*
 * Ordering:
 *
 * Multiple retry requests or multiple non-retry requests enqueued from
 * a single producer thread will be processed in the order enqueued.
 *
 * Retry requests will generally be processed before normal requests.
 *
 * HOWEVER, a producer thread can enqueue a retry request (generally given
 * higher priority) and then enqueue a normal request, and they can get
 * processed in the reverse order.  The checking of the two internal queues is
 * very simple and there's a potential race with the producer regarding the
 * "priority" handling.  If an ordering guarantee is needed, it can be added
 * without much difficulty; it just makes the code a bit more complicated.
 *
 * If requests are enqueued while the processing of another request is
 * happening, and the enqueuing operations complete while the request
 * processing is still in progress, then the retry request(s) *will*
 * get processed next.  (This is used for testing.)
 */
Packit Service 310c69
Packit Service 310c69
/**
 * Time constants, all in units of nanoseconds.
 *
 * The wait-time constants bound the adaptive timeout used by
 * requestQueueWorker() while it is waiting with a timeout.
 **/
enum {
  ONE_NANOSECOND    =    1,
  ONE_MICROSECOND   = 1000 * ONE_NANOSECOND,
  ONE_MILLISECOND   = 1000 * ONE_MICROSECOND,
  ONE_SECOND        = 1000 * ONE_MILLISECOND,

  /**
   * The initial time to wait after waiting with no timeout (also the
   * timeout the worker thread starts out with)
   **/
  DEFAULT_WAIT_TIME = 20 * ONE_MICROSECOND,

  /** The minimum time to wait when waiting with a timeout */
  MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,

  /**
   * The maximum time to wait when waiting with a timeout; once the adaptive
   * timeout grows to this value the worker waits with no timeout instead
   **/
  MAXIMUM_WAIT_TIME = ONE_MILLISECOND
};
Packit Service 310c69
Packit Service 310c69
/**
 * Batch size tuning constants. These are compared to the number of requests
 * that have been processed since the worker thread last woke up.
 **/
enum {
  /** The wait time increases if batches are smaller than this */
  MINIMUM_BATCH = 32,
  /** The wait time decreases if batches are larger than this */
  MAXIMUM_BATCH = 64
};
Packit Service 310c69
Packit Service 310c69
struct requestQueue {
Packit Service 310c69
  /* Wait queue for synchronizing producers and consumer */
Packit Service 310c69
  struct wait_queue_head  wqhead;
Packit Service 310c69
  /* function to process 1 request */
Packit Service 310c69
  RequestQueueProcessor  *processOne;
Packit Service 310c69
  /* new incoming requests */
Packit Service 310c69
  FunnelQueue            *mainQueue;
Packit Service 310c69
  /* old requests to retry first */
Packit Service 310c69
  FunnelQueue            *retryQueue;
Packit Service 310c69
  /* thread id of the worker thread */
Packit Service 310c69
  Thread                  thread;
Packit Service 310c69
  /* true if the worker was started */
Packit Service 310c69
  bool                    started;
Packit Service 310c69
  /* when true, requests can be enqueued */
Packit Service 310c69
  bool                    alive;
Packit Service 310c69
  /* A flag set when the worker is waiting without a timeout */
Packit Service 310c69
  atomic_t                dormant;
Packit Service 310c69
};
Packit Service 310c69
Packit Service 310c69
/*****************************************************************************/
Packit Service 310c69
/**
Packit Service 310c69
 * Poll the underlying lock-free queues for a request to process.  Must only be
Packit Service 310c69
 * called by the worker thread.
Packit Service 310c69
 *
Packit Service 310c69
 * @param queue  the RequestQueue being serviced
Packit Service 310c69
 *
Packit Service 310c69
 * @return a dequeued request, or NULL if no request was available
Packit Service 310c69
 **/
Packit Service 310c69
static INLINE Request *pollQueues(RequestQueue *queue)
Packit Service 310c69
{
Packit Service 310c69
  // The retry queue has higher priority.
Packit Service 310c69
  FunnelQueueEntry *entry = funnelQueuePoll(queue->retryQueue);
Packit Service 310c69
  if (entry != NULL) {
Packit Service 310c69
    return container_of(entry, Request, requestQueueLink);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // The main queue has lower priority.
Packit Service 310c69
  entry = funnelQueuePoll(queue->mainQueue);
Packit Service 310c69
  if (entry != NULL) {
Packit Service 310c69
    return container_of(entry, Request, requestQueueLink);
Packit Service 310c69
  }
Packit Service 310c69
  
Packit Service 310c69
  // No entry found.
Packit Service 310c69
  return NULL;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/*****************************************************************************/
Packit Service 310c69
/**
Packit Service 310c69
 * Check if the underlying lock-free queues appear not just not to have any
Packit Service 310c69
 * requests available right now, but also not to be in the intermediate state
Packit Service 310c69
 * of getting requests added. Must only be called by the worker thread.
Packit Service 310c69
 *
Packit Service 310c69
 * @param queue  the RequestQueue being serviced
Packit Service 310c69
 *
Packit Service 310c69
 * @return true iff both funnel queues are idle
Packit Service 310c69
 **/
Packit Service 310c69
static INLINE bool areQueuesIdle(RequestQueue *queue)
Packit Service 310c69
{
Packit Service 310c69
  return (isFunnelQueueIdle(queue->retryQueue) &&
Packit Service 310c69
          isFunnelQueueIdle(queue->mainQueue));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/*****************************************************************************/
Packit Service 310c69
/**
Packit Service 310c69
 * Remove the next request to be processed from the queue.  Must only be called
Packit Service 310c69
 * by the worker thread.
Packit Service 310c69
 *
Packit Service 310c69
 * @param queue       the queue from which to remove an entry
Packit Service 310c69
 * @param requestPtr  the next request is returned here, or a NULL pointer to
Packit Service 310c69
 *                    indicate that there will be no more requests
Packit Service 310c69
 * @param waitedPtr   return a boolean to indicate that we need to wait
Packit Service 310c69
 *
Packit Service 310c69
 * @return True when there is a next request, or when we know that there will
Packit Service 310c69
 *         never be another request.  False when we must wait for a request.
Packit Service 310c69
 **/
Packit Service 310c69
static INLINE bool dequeueRequest(RequestQueue  *queue,
Packit Service 310c69
                                  Request      **requestPtr,
Packit Service 310c69
                                  bool          *waitedPtr)
Packit Service 310c69
{
Packit Service 310c69
  // Because of batching, we expect this to be the most common code path.
Packit Service 310c69
  Request *request = pollQueues(queue);
Packit Service 310c69
  if (request != NULL) {
Packit Service 310c69
    // Return because we found a request
Packit Service 310c69
    *requestPtr = request;
Packit Service 310c69
    return true;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (!READ_ONCE(queue->alive)) {
Packit Service 310c69
    // Return because we see that shutdown is happening
Packit Service 310c69
    *requestPtr = NULL;
Packit Service 310c69
    return true;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Return indicating that we need to wait.
Packit Service 310c69
  *requestPtr = NULL;
Packit Service 310c69
  *waitedPtr = true;
Packit Service 310c69
  return false;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/*****************************************************************************/
Packit Service 310c69
static void requestQueueWorker(void *arg)
Packit Service 310c69
{
Packit Service 310c69
  RequestQueue *queue = (RequestQueue *) arg;
Packit Service 310c69
  unsigned long timeBatch = DEFAULT_WAIT_TIME;
Packit Service 310c69
  bool dormant = atomic_read(&queue->dormant);
Packit Service 310c69
  long currentBatch = 0;
Packit Service 310c69
Packit Service 310c69
  for (;;) {
Packit Service 310c69
    Request *request;
Packit Service 310c69
    bool waited = false;
Packit Service 310c69
    if (dormant) {
Packit Service 310c69
      /*
Packit Service 310c69
       * Sleep/wakeup protocol:
Packit Service 310c69
       *
Packit Service 310c69
       * The enqueue operation updates "newest" in the
Packit Service 310c69
       * funnel queue via xchg which is a memory barrier,
Packit Service 310c69
       * and later checks "dormant" to decide whether to do
Packit Service 310c69
       * a wakeup of the worker thread.
Packit Service 310c69
       *
Packit Service 310c69
       * The worker thread, when deciding to go to sleep,
Packit Service 310c69
       * sets "dormant" and then examines "newest" to decide
Packit Service 310c69
       * if the funnel queue is idle. In dormant mode, the
Packit Service 310c69
       * last examination of "newest" before going to sleep
Packit Service 310c69
       * is done inside the wait_event_interruptible macro,
Packit Service 310c69
       * after a point where (one or more) memory barriers
Packit Service 310c69
       * have been issued. (Preparing to sleep uses spin
Packit Service 310c69
       * locks.) Even if the "next" field update isn't
Packit Service 310c69
       * visible yet to make the entry accessible, its
Packit Service 310c69
       * existence will kick the worker thread out of
Packit Service 310c69
       * dormant mode and back into timer-based mode.
Packit Service 310c69
       *
Packit Service 310c69
       * So the two threads should agree on the ordering of
Packit Service 310c69
       * the updating of the two fields.
Packit Service 310c69
       */
Packit Service 310c69
      wait_event_interruptible(queue->wqhead,
Packit Service 310c69
                               dequeueRequest(queue, &request, &waited) ||
Packit Service 310c69
                               !areQueuesIdle(queue));
Packit Service 310c69
    } else {
Packit Service 310c69
      wait_event_interruptible_hrtimeout(queue->wqhead,
Packit Service 310c69
                                         dequeueRequest(queue, &request,
Packit Service 310c69
                                                        &waited),
Packit Service 310c69
                                         ns_to_ktime(timeBatch));
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    if (likely(request != NULL)) {
Packit Service 310c69
      // We got a request.
Packit Service 310c69
      currentBatch++;
Packit Service 310c69
      queue->processOne(request);
Packit Service 310c69
    } else if (!READ_ONCE(queue->alive)) {
Packit Service 310c69
      // We got no request and we know we are shutting down.
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    if (dormant) {
Packit Service 310c69
      // We've been roused from dormancy. Clear the flag so enqueuers can stop
Packit Service 310c69
      // broadcasting (no fence needed for this transition).
Packit Service 310c69
      atomic_set(&queue->dormant, false);
Packit Service 310c69
      dormant = false;
Packit Service 310c69
      // Reset the timeout back to the default since we don't know how long
Packit Service 310c69
      // we've been asleep and we also want to be responsive to a new burst.
Packit Service 310c69
      timeBatch = DEFAULT_WAIT_TIME;
Packit Service 310c69
    } else if (waited) {
Packit Service 310c69
      // We waited for this request to show up.  Adjust the wait time if the
Packit Service 310c69
      // last batch of requests was too small or too large..
Packit Service 310c69
      if (currentBatch < MINIMUM_BATCH) {
Packit Service 310c69
        // Adjust the wait time if the last batch of requests was too small.
Packit Service 310c69
        timeBatch += timeBatch / 4;
Packit Service 310c69
        if (timeBatch >= MAXIMUM_WAIT_TIME) {
Packit Service 310c69
          // The timeout is getting long enough that we need to switch into
Packit Service 310c69
          // dormant mode.
Packit Service 310c69
          atomic_set(&queue->dormant, true);
Packit Service 310c69
          dormant = true;
Packit Service 310c69
        }
Packit Service 310c69
      } else if (currentBatch > MAXIMUM_BATCH) {
Packit Service 310c69
        // Adjust the wait time if the last batch of requests was too large.
Packit Service 310c69
        timeBatch -= timeBatch / 4;
Packit Service 310c69
        if (timeBatch < MINIMUM_WAIT_TIME) {
Packit Service 310c69
          // But if the producer is very fast or the scheduler doesn't wake up
Packit Service 310c69
          // up promptly, waiting for very short times won't make the batches
Packit Service 310c69
          // smaller.
Packit Service 310c69
          timeBatch = MINIMUM_WAIT_TIME;
Packit Service 310c69
        }
Packit Service 310c69
      }
Packit Service 310c69
      // And we must now start a new batch count
Packit Service 310c69
      currentBatch = 0;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Ensure that we see any requests that were guaranteed to have been fully
Packit Service 310c69
   * enqueued before shutdown was flagged.  The corresponding write barrier
Packit Service 310c69
   * is in requestQueueFinish.
Packit Service 310c69
   */
Packit Service 310c69
  smp_rmb();
Packit Service 310c69
Packit Service 310c69
  // Process every request that is still in the queue, and never wait for any
Packit Service 310c69
  // new requests to show up.
Packit Service 310c69
  for (;;) {
Packit Service 310c69
    Request *request = pollQueues(queue);
Packit Service 310c69
    if (request == NULL) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
    queue->processOne(request);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makeRequestQueue(const char             *queueName,
Packit Service 310c69
                     RequestQueueProcessor  *processOne,
Packit Service 310c69
                     RequestQueue          **queuePtr)
Packit Service 310c69
{
Packit Service 310c69
  RequestQueue *queue;
Packit Service 310c69
  int result = ALLOCATE(1, RequestQueue, __func__, &queue);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  queue->processOne = processOne;
Packit Service 310c69
  queue->alive      = true;
Packit Service 310c69
  atomic_set(&queue->dormant, false);
Packit Service 310c69
  init_waitqueue_head(&queue->wqhead);
Packit Service 310c69
Packit Service 310c69
  result = makeFunnelQueue(&queue->mainQueue);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    requestQueueFinish(queue);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = makeFunnelQueue(&queue->retryQueue);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    requestQueueFinish(queue);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = createThread(requestQueueWorker, queue, queueName, &queue->thread);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    requestQueueFinish(queue);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  queue->started = true;
Packit Service 310c69
  smp_mb();
Packit Service 310c69
  *queuePtr = queue;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static INLINE void wakeUpWorker(RequestQueue *queue)
Packit Service 310c69
{
Packit Service 310c69
  // This is the code sequence recommended in <linux/wait.h>
Packit Service 310c69
  smp_mb();
Packit Service 310c69
  if (waitqueue_active(&queue->wqhead)) {
Packit Service 310c69
    wake_up(&queue->wqhead);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void requestQueueEnqueue(RequestQueue *queue, Request *request)
Packit Service 310c69
{
Packit Service 310c69
  bool unbatched = request->unbatched;
Packit Service 310c69
  funnelQueuePut(request->requeued ? queue->retryQueue : queue->mainQueue,
Packit Service 310c69
                 &request->requestQueueLink);
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * We must wake the worker thread when it is dormant (waiting with no
Packit Service 310c69
   * timeout).  An atomic load (read fence) isn't needed here since we know the
Packit Service 310c69
   * queue operation acts as one.
Packit Service 310c69
   */
Packit Service 310c69
  if (atomic_read(&queue->dormant) || unbatched) {
Packit Service 310c69
    wakeUpWorker(queue);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void requestQueueFinish(RequestQueue *queue)
Packit Service 310c69
{
Packit Service 310c69
  if (queue == NULL) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * This memory barrier ensures that any requests we queued will be seen.  The
Packit Service 310c69
   * point is that when dequeueRequest sees the following update to the alive
Packit Service 310c69
   * flag, it will also be able to see any change we made to a next field in
Packit Service 310c69
   * the FunnelQueue entry.  The corresponding read barrier is in
Packit Service 310c69
   * requestQueueWorker.
Packit Service 310c69
   */
Packit Service 310c69
  smp_wmb();
Packit Service 310c69
Packit Service 310c69
  // Mark the queue as dead.
Packit Service 310c69
  WRITE_ONCE(queue->alive, false);
Packit Service 310c69
Packit Service 310c69
  if (queue->started) {
Packit Service 310c69
    // Wake the worker so it notices that it should exit.
Packit Service 310c69
    wakeUpWorker(queue);
Packit Service 310c69
Packit Service 310c69
    // Wait for the worker thread to finish processing any additional pending
Packit Service 310c69
    // work and exit.
Packit Service 310c69
    int result = joinThreads(queue->thread);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      logWarningWithStringError(result, "Failed to join worker thread");
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  freeFunnelQueue(queue->mainQueue);
Packit Service 310c69
  freeFunnelQueue(queue->retryQueue);
Packit Service 310c69
  FREE(queue);
Packit Service 310c69
}