/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/uds-releases/jasper/kernelLinux/uds/requestQueueKernel.c#3 $
*/
#include "requestQueue.h"
#include <linux/wait.h>
#include "atomicDefs.h"
#include "compiler.h"
#include "logger.h"
#include "request.h"
#include "memoryAlloc.h"
#include "threads.h"
#include "util/funnelQueue.h"
/*
* Ordering:
*
* Multiple retry requests or multiple non-retry requests enqueued from
* a single producer thread will be processed in the order enqueued.
*
* Retry requests will generally be processed before normal requests.
*
* HOWEVER, a producer thread can enqueue a retry request (generally given
* higher priority) and then enqueue a normal request, and they can get
* processed in the reverse order. The checking of the two internal queues is
* very simple and there's a potential race with the producer regarding the
* "priority" handling. If an ordering guarantee is needed, it can be added
* without much difficulty; it just makes the code a bit more complicated.
*
* If requests are enqueued while the processing of another request is
* happening, and the enqueuing operations complete while the request
* processing is still in progress, then the retry request(s) *will*
* get processed next. (This is used for testing.)
*/
/**
 * Time constants, all in units of nanoseconds.
 *
 * The worker thread adapts how long it waits for new requests: it starts at
 * DEFAULT_WAIT_TIME, never shrinks below MINIMUM_WAIT_TIME, and once the wait
 * grows to MAXIMUM_WAIT_TIME it stops using timed waits and goes dormant.
 **/
enum {
  ONE_NANOSECOND    = 1,
  ONE_MICROSECOND   = 1000 * ONE_NANOSECOND,
  ONE_MILLISECOND   = 1000 * ONE_MICROSECOND,
  ONE_SECOND        = 1000 * ONE_MILLISECOND,
  /** The initial time to wait after waiting with no timeout */
  DEFAULT_WAIT_TIME = 20 * ONE_MICROSECOND,
  /** The minimum time to wait when waiting with a timeout */
  MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
  /** The maximum time to wait when waiting with a timeout */
  MAXIMUM_WAIT_TIME = ONE_MILLISECOND
};
/**
 * Thresholds that steer the worker's adaptive wait time. Each is compared
 * with the number of requests processed since the worker last had to wait.
 **/
enum {
  /** A batch smaller than this makes the worker lengthen its wait time */
  MINIMUM_BATCH = 32,
  /** A batch larger than this makes the worker shorten its wait time */
  MAXIMUM_BATCH = 64
};
/**
 * State shared between the threads that enqueue requests and the single
 * worker thread that processes them.
 **/
struct requestQueue {
  /** Wait queue the worker sleeps on and producers use to wake it */
  struct wait_queue_head  wqhead;
  /** Callback the worker invokes once per dequeued request */
  RequestQueueProcessor  *processOne;
  /** Funnel queue holding newly submitted requests */
  FunnelQueue            *mainQueue;
  /** Funnel queue holding requeued requests; polled before mainQueue */
  FunnelQueue            *retryQueue;
  /** Handle of the worker thread */
  Thread                  thread;
  /** Set once the worker thread has been created successfully */
  bool                    started;
  /** While true, requests may be enqueued; cleared by requestQueueFinish */
  bool                    alive;
  /** Set while the worker waits with no timeout, so enqueuers must wake it */
  atomic_t                dormant;
};
/*****************************************************************************/
/**
 * Take one request from the lock-free queues, preferring the retry queue over
 * the main queue. Only the worker thread may call this.
 *
 * @param queue  the RequestQueue being serviced
 *
 * @return the dequeued request, or NULL if neither queue had one ready
 **/
static INLINE Request *pollQueues(RequestQueue *queue)
{
  // Requeued requests take priority; fall back to new requests only when
  // the retry queue has nothing ready.
  FunnelQueueEntry *link = funnelQueuePoll(queue->retryQueue);
  if (link == NULL) {
    link = funnelQueuePoll(queue->mainQueue);
  }
  return ((link == NULL)
          ? NULL
          : container_of(link, Request, requestQueueLink));
}
/*****************************************************************************/
/**
 * Report whether both lock-free queues look idle. An idle queue has no
 * request ready right now and none in the middle of being added. Only the
 * worker thread may call this.
 *
 * @param queue  the RequestQueue being serviced
 *
 * @return true iff both funnel queues are idle
 **/
static INLINE bool areQueuesIdle(RequestQueue *queue)
{
  if (!isFunnelQueueIdle(queue->retryQueue)) {
    return false;
  }
  return isFunnelQueueIdle(queue->mainQueue);
}
/*****************************************************************************/
/**
 * Try to obtain the next request for the worker. Only the worker thread may
 * call this; it is used as the condition of the worker's wait_event calls.
 *
 * @param queue       the queue from which to remove an entry
 * @param requestPtr  receives the dequeued request, or NULL when none was
 *                    available
 * @param waitedPtr   set to true when the caller has to wait for a request;
 *                    left untouched otherwise
 *
 * @return true when a request was dequeued, or when the queue is no longer
 *         alive (so waiting is pointless); false when the caller must wait
 **/
static INLINE bool dequeueRequest(RequestQueue *queue,
                                  Request **requestPtr,
                                  bool *waitedPtr)
{
  // Batching makes a ready request the common case.
  Request *next = pollQueues(queue);
  *requestPtr = next;
  if ((next != NULL) || !READ_ONCE(queue->alive)) {
    // Either there is work to do or shutdown has begun: stop waiting.
    return true;
  }
  *waitedPtr = true;
  return false;
}
/*****************************************************************************/
/**
* Body of the worker thread. It repeatedly waits for a request and passes
* each one to queue->processOne. While requests keep arriving it uses timed
* waits whose length adapts to the observed batch size. Once the wait time
* reaches MAXIMUM_WAIT_TIME it goes "dormant" and sleeps without a timeout
* until an enqueuer wakes it. After the queue stops being alive it drains
* whatever is still queued, then returns.
*
* @param arg  the RequestQueue to service (passed as void * by createThread)
**/
static void requestQueueWorker(void *arg)
{
RequestQueue *queue = (RequestQueue *) arg;
// Current timed-wait length in nanoseconds; adjusted after each wait.
unsigned long timeBatch = DEFAULT_WAIT_TIME;
// Worker-local copy of queue->dormant; this thread is the only one that
// changes the flag inside the loop below.
bool dormant = atomic_read(&queue->dormant);
// Requests processed since the batch counter was last reset.
long currentBatch = 0;
for (;;) {
// Assigned by dequeueRequest inside the wait condition. The wait_event_*
// macros test the condition before sleeping (see <linux/wait.h>).
Request *request;
bool waited = false;
if (dormant) {
/*
* Sleep/wakeup protocol:
*
* The enqueue operation updates "newest" in the
* funnel queue via xchg which is a memory barrier,
* and later checks "dormant" to decide whether to do
* a wakeup of the worker thread.
*
* The worker thread, when deciding to go to sleep,
* sets "dormant" and then examines "newest" to decide
* if the funnel queue is idle. In dormant mode, the
* last examination of "newest" before going to sleep
* is done inside the wait_event_interruptible macro,
* after a point where (one or more) memory barriers
* have been issued. (Preparing to sleep uses spin
* locks.) Even if the "next" field update isn't
* visible yet to make the entry accessible, its
* existence will kick the worker thread out of
* dormant mode and back into timer-based mode.
*
* So the two threads should agree on the ordering of
* the updating of the two fields.
*/
wait_event_interruptible(queue->wqhead,
dequeueRequest(queue, &request, &waited) ||
!areQueuesIdle(queue));
} else {
// Timed wait. On timeout, request is left NULL by the last
// evaluation of dequeueRequest.
wait_event_interruptible_hrtimeout(queue->wqhead,
dequeueRequest(queue, &request,
&waited),
ns_to_ktime(timeBatch));
}
if (likely(request != NULL)) {
// We got a request.
currentBatch++;
queue->processOne(request);
} else if (!READ_ONCE(queue->alive)) {
// We got no request and we know we are shutting down.
break;
}
if (dormant) {
// We've been roused from dormancy. Clear the flag so enqueuers can stop
// broadcasting (no fence needed for this transition).
atomic_set(&queue->dormant, false);
dormant = false;
// Reset the timeout back to the default since we don't know how long
// we've been asleep and we also want to be responsive to a new burst.
timeBatch = DEFAULT_WAIT_TIME;
} else if (waited) {
// We waited for this request to show up. Adjust the wait time if the
// last batch of requests was too small or too large.
if (currentBatch < MINIMUM_BATCH) {
// Lengthen the wait time (by 25%) because the last batch was too small.
timeBatch += timeBatch / 4;
if (timeBatch >= MAXIMUM_WAIT_TIME) {
// The timeout is getting long enough that we need to switch into
// dormant mode.
atomic_set(&queue->dormant, true);
dormant = true;
}
} else if (currentBatch > MAXIMUM_BATCH) {
// Shorten the wait time (by 25%) because the last batch was too large.
timeBatch -= timeBatch / 4;
if (timeBatch < MINIMUM_WAIT_TIME) {
// But if the producer is very fast or the scheduler doesn't wake us
// up promptly, waiting for very short times won't make the batches
// smaller.
timeBatch = MINIMUM_WAIT_TIME;
}
}
// And we must now start a new batch count
currentBatch = 0;
}
}
/*
* Ensure that we see any requests that were guaranteed to have been fully
* enqueued before shutdown was flagged. The corresponding write barrier
* is in requestQueueFinish.
*/
smp_rmb();
// Process every request that is still in the queue, and never wait for any
// new requests to show up.
for (;;) {
Request *request = pollQueues(queue);
if (request == NULL) {
break;
}
queue->processOne(request);
}
}
/**********************************************************************/
/**
 * Allocate a request queue, create its two funnel queues and start its
 * worker thread.
 *
 * @param queueName   name given to the worker thread
 * @param processOne  callback the worker invokes for each request
 * @param queuePtr    receives the new queue on success
 *
 * @return UDS_SUCCESS, or the error from the first step that failed (any
 *         partially built queue is torn down with requestQueueFinish)
 **/
int makeRequestQueue(const char *queueName,
                     RequestQueueProcessor *processOne,
                     RequestQueue **queuePtr)
{
  RequestQueue *queue;
  int result;

  result = ALLOCATE(1, RequestQueue, __func__, &queue);
  if (result != UDS_SUCCESS) {
    return result;
  }

  queue->processOne = processOne;
  queue->alive = true;
  atomic_set(&queue->dormant, false);
  init_waitqueue_head(&queue->wqhead);

  result = makeFunnelQueue(&queue->mainQueue);
  if (result != UDS_SUCCESS) {
    goto failed;
  }
  result = makeFunnelQueue(&queue->retryQueue);
  if (result != UDS_SUCCESS) {
    goto failed;
  }
  result = createThread(requestQueueWorker, queue, queueName, &queue->thread);
  if (result != UDS_SUCCESS) {
    goto failed;
  }

  // Only now does requestQueueFinish need to join a worker thread.
  queue->started = true;
  smp_mb();
  *queuePtr = queue;
  return UDS_SUCCESS;

failed:
  requestQueueFinish(queue);
  return result;
}
/**********************************************************************/
/**
 * Wake the worker thread if it is sleeping on the queue's wait queue.
 *
 * @param queue  the queue whose worker should be woken
 **/
static INLINE void wakeUpWorker(RequestQueue *queue)
{
  // A full barrier must precede the waitqueue_active() check; this is the
  // sequence <linux/wait.h> recommends for lockless wakers.
  smp_mb();
  if (!waitqueue_active(&queue->wqhead)) {
    return;
  }
  wake_up(&queue->wqhead);
}
/**********************************************************************/
/**
 * Add a request to the queue. Requeued requests go to the retry queue and
 * all others to the main queue. Wakes the worker if it is dormant or if the
 * request is marked unbatched.
 *
 * @param queue    the queue to add to
 * @param request  the request to enqueue
 **/
void requestQueueEnqueue(RequestQueue *queue, Request *request)
{
  // Read the request's flags before handing it over: once funnelQueuePut
  // returns, the worker may already be processing it. Presumably the
  // request must not be touched after that point (confirm with callers).
  bool unbatched = request->unbatched;
  FunnelQueue *target = (request->requeued
                         ? queue->retryQueue
                         : queue->mainQueue);
  funnelQueuePut(target, &request->requestQueueLink);

  /*
   * A dormant worker waits with no timeout, so it must be woken explicitly.
   * No separate read fence is needed before checking the flag, because the
   * queue operation above already acts as one.
   */
  if (!atomic_read(&queue->dormant) && !unbatched) {
    return;
  }
  wakeUpWorker(queue);
}
/**********************************************************************/
/**
 * Shut down a request queue and free it. The queue is marked dead, and if
 * the worker thread was started it is woken and joined. The worker drains
 * every request still queued before exiting. Safe to call with NULL, and on
 * a partially constructed queue from makeRequestQueue.
 *
 * @param queue  the queue to destroy (may be NULL)
 **/
void requestQueueFinish(RequestQueue *queue)
{
  if (queue == NULL) {
    return;
  }

  /*
   * This write barrier makes any requests we queued visible to the worker.
   * When dequeueRequest sees the update to the alive flag below, it will
   * also see any change we made to a next field in a FunnelQueue entry.
   * The matching read barrier is in requestQueueWorker.
   */
  smp_wmb();

  // Mark the queue as dead.
  WRITE_ONCE(queue->alive, false);

  if (queue->started) {
    // Declared at the top of the block: kernel builds warn on
    // -Wdeclaration-after-statement, and the rest of this file follows that.
    int result;

    // Wake the worker so it notices that it should exit.
    wakeUpWorker(queue);
    // Wait for the worker thread to finish processing any additional pending
    // work and exit.
    result = joinThreads(queue->thread);
    if (result != UDS_SUCCESS) {
      logWarningWithStringError(result, "Failed to join worker thread");
    }
  }

  freeFunnelQueue(queue->mainQueue);
  freeFunnelQueue(queue->retryQueue);
  FREE(queue);
}