/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/uds-releases/jasper/kernelLinux/uds/requestQueueKernel.c#3 $
 */

#include "requestQueue.h"

#include <linux/wait.h>

#include "atomicDefs.h"
#include "compiler.h"
#include "logger.h"
#include "request.h"
#include "memoryAlloc.h"
#include "threads.h"
#include "util/funnelQueue.h"

/*
 * Ordering:
 *
 * Multiple retry requests or multiple non-retry requests enqueued from
 * a single producer thread will be processed in the order enqueued.
 *
 * Retry requests will generally be processed before normal requests.
 *
 * HOWEVER, a producer thread can enqueue a retry request (generally given
 * higher priority) and then enqueue a normal request, and they can get
 * processed in the reverse order. The checking of the two internal queues is
 * very simple and there's a potential race with the producer regarding the
 * "priority" handling. If an ordering guarantee is needed, it can be added
 * without much difficulty; it just makes the code a bit more complicated.
 *
 * If requests are enqueued while the processing of another request is
 * happening, and the enqueuing operations complete while the request
 * processing is still in progress, then the retry request(s) *will*
 * get processed next. (This is used for testing.)
 */

/**
 * Time constants, all in units of nanoseconds.
 **/
enum {
  ONE_NANOSECOND    =    1,
  ONE_MICROSECOND   = 1000 * ONE_NANOSECOND,
  ONE_MILLISECOND   = 1000 * ONE_MICROSECOND,
  ONE_SECOND        = 1000 * ONE_MILLISECOND,

  /** The initial time to wait after waiting with no timeout */
  DEFAULT_WAIT_TIME = 20 * ONE_MICROSECOND,

  /** The minimum time to wait when waiting with a timeout */
  MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,

  /** The maximum time to wait when waiting with a timeout */
  MAXIMUM_WAIT_TIME = ONE_MILLISECOND
};

/**
 * Batch size tuning constants. These are compared to the number of requests
 * that have been processed since the worker thread last woke up.
 **/
enum {
  MINIMUM_BATCH = 32, // wait time increases if batches are smaller than this
  MAXIMUM_BATCH = 64  // wait time decreases if batches are larger than this
};

struct requestQueue {
  /* Wait queue for synchronizing producers and consumer */
  struct wait_queue_head  wqhead;
  /* Function to process one request */
  RequestQueueProcessor  *processOne;
  /* New incoming requests */
  FunnelQueue            *mainQueue;
  /* Old requests to retry first */
  FunnelQueue            *retryQueue;
  /* Thread ID of the worker thread */
  Thread                  thread;
  /* True if the worker was started */
  bool                    started;
  /* When true, requests can be enqueued */
  bool                    alive;
  /* A flag set when the worker is waiting without a timeout */
  atomic_t                dormant;
};
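/*
 * Illustration (a sketch, not part of the build): the two funnel queues hold
 * FunnelQueueEntry links embedded in the Request structure, and pollQueues
 * below uses container_of() to map a dequeued link back to its enclosing
 * Request. The layout sketched here is hypothetical and shows only the
 * members this file itself touches (requestQueueLink, requeued, unbatched);
 * the real definition lives in request.h.
 */
#if 0
struct request {
  /* ... other fields defined in request.h ... */
  FunnelQueueEntry requestQueueLink; // embedded link used by both queues
  bool             requeued;         // when set, route to the retry queue
  bool             unbatched;        // when set, wake the worker immediately
};

/* Mapping a dequeued entry back to its request: */
Request *request = container_of(entry, Request, requestQueueLink);
#endif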
/*****************************************************************************/
/**
 * Poll the underlying lock-free queues for a request to process. Must only
 * be called by the worker thread.
 *
 * @param queue  the RequestQueue being serviced
 *
 * @return a dequeued request, or NULL if no request was available
 **/
static INLINE Request *pollQueues(RequestQueue *queue)
{
  // The retry queue has higher priority.
  FunnelQueueEntry *entry = funnelQueuePoll(queue->retryQueue);
  if (entry != NULL) {
    return container_of(entry, Request, requestQueueLink);
  }

  // The main queue has lower priority.
  entry = funnelQueuePoll(queue->mainQueue);
  if (entry != NULL) {
    return container_of(entry, Request, requestQueueLink);
  }

  // No entry found.
  return NULL;
}

/*****************************************************************************/
/**
 * Check whether the underlying lock-free queues are truly idle: not only are
 * no requests available right now, but no requests are in the intermediate
 * state of being added. Must only be called by the worker thread.
 *
 * @param queue  the RequestQueue being serviced
 *
 * @return true iff both funnel queues are idle
 **/
static INLINE bool areQueuesIdle(RequestQueue *queue)
{
  return (isFunnelQueueIdle(queue->retryQueue)
          && isFunnelQueueIdle(queue->mainQueue));
}

/*****************************************************************************/
/**
 * Remove the next request to be processed from the queue. Must only be
 * called by the worker thread.
 *
 * @param queue       the queue from which to remove an entry
 * @param requestPtr  the next request is returned here, or a NULL pointer
 *                    to indicate that there will be no more requests
 * @param waitedPtr   set to true here when we need to wait for a request
 *
 * @return true when there is a next request, or when we know that there
 *         will never be another request; false when we must wait for a
 *         request
 **/
static INLINE bool dequeueRequest(RequestQueue  *queue,
                                  Request      **requestPtr,
                                  bool          *waitedPtr)
{
  // Because of batching, we expect this to be the most common code path.
  Request *request = pollQueues(queue);
  if (request != NULL) {
    // Return because we found a request.
    *requestPtr = request;
    return true;
  }

  if (!READ_ONCE(queue->alive)) {
    // Return because we see that shutdown is happening.
    *requestPtr = NULL;
    return true;
  }

  // Return indicating that we need to wait.
  *requestPtr = NULL;
  *waitedPtr  = true;
  return false;
}

/*****************************************************************************/
static void requestQueueWorker(void *arg)
{
  RequestQueue *queue = (RequestQueue *) arg;
  unsigned long timeBatch = DEFAULT_WAIT_TIME;
  bool dormant = atomic_read(&queue->dormant);
  long currentBatch = 0;

  for (;;) {
    Request *request;
    bool waited = false;
    if (dormant) {
      /*
       * Sleep/wakeup protocol:
       *
       * The enqueue operation updates "newest" in the funnel queue via
       * xchg, which is a memory barrier, and later checks "dormant" to
       * decide whether to do a wakeup of the worker thread.
       *
       * The worker thread, when deciding to go to sleep, sets "dormant"
       * and then examines "newest" to decide if the funnel queue is idle.
       * In dormant mode, the last examination of "newest" before going to
       * sleep is done inside the wait_event_interruptible macro, after a
       * point where (one or more) memory barriers have been issued.
       * (Preparing to sleep uses spin locks.) Even if the "next" field
       * update isn't visible yet to make the entry accessible, its
       * existence will kick the worker thread out of dormant mode and back
       * into timer-based mode.
       *
       * So the two threads should agree on the ordering of the updating of
       * the two fields.
       */
      wait_event_interruptible(queue->wqhead,
                               dequeueRequest(queue, &request, &waited)
                                 || !areQueuesIdle(queue));
    } else {
      wait_event_interruptible_hrtimeout(queue->wqhead,
                                         dequeueRequest(queue, &request,
                                                        &waited),
                                         ns_to_ktime(timeBatch));
    }

    if (likely(request != NULL)) {
      // We got a request.
      currentBatch++;
      queue->processOne(request);
    } else if (!READ_ONCE(queue->alive)) {
      // We got no request and we know we are shutting down.
      break;
    }

    if (dormant) {
      // We've been roused from dormancy. Clear the flag so enqueuers can
      // stop broadcasting (no fence needed for this transition).
      atomic_set(&queue->dormant, false);
      dormant = false;
      // Reset the timeout back to the default since we don't know how long
      // we've been asleep and we also want to be responsive to a new burst.
      timeBatch = DEFAULT_WAIT_TIME;
    } else if (waited) {
      // We waited for this request to show up. Adjust the wait time if the
      // last batch of requests was too small or too large.
      if (currentBatch < MINIMUM_BATCH) {
        // The batch was too small, so increase the wait time.
        timeBatch += timeBatch / 4;
        if (timeBatch >= MAXIMUM_WAIT_TIME) {
          // The timeout is getting long enough that we need to switch into
          // dormant mode.
          atomic_set(&queue->dormant, true);
          dormant = true;
        }
      } else if (currentBatch > MAXIMUM_BATCH) {
        // The batch was too large, so decrease the wait time.
        timeBatch -= timeBatch / 4;
        if (timeBatch < MINIMUM_WAIT_TIME) {
          // But if the producer is very fast or the scheduler doesn't wake
          // us up promptly, waiting for very short times won't make the
          // batches smaller.
          timeBatch = MINIMUM_WAIT_TIME;
        }
      }
      // And we must now start a new batch count.
      currentBatch = 0;
    }
  }

  /*
   * Ensure that we see any requests that were guaranteed to have been fully
   * enqueued before shutdown was flagged. The corresponding write barrier
   * is in requestQueueFinish.
   */
  smp_rmb();

  // Process every request that is still in the queue, and never wait for
  // any new requests to show up.
  for (;;) {
    Request *request = pollQueues(queue);
    if (request == NULL) {
      break;
    }
    queue->processOne(request);
  }
}
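/*
 * Worked example (a sketch under the constants above, not compiled): the
 * wait-time adaptation in requestQueueWorker grows timeBatch by 25% after
 * each undersized batch and shrinks it by 25% after each oversized one,
 * clamped to MINIMUM_WAIT_TIME at the bottom and tipping into dormant mode
 * once it reaches MAXIMUM_WAIT_TIME. Starting from DEFAULT_WAIT_TIME (20us),
 * it takes 18 consecutive undersized batches to reach dormancy:
 */
#if 0
unsigned long timeBatch = DEFAULT_WAIT_TIME;   // 20,000 ns
int steps = 0;
while (timeBatch < MAXIMUM_WAIT_TIME) {        // same threshold as the worker
  timeBatch += timeBatch / 4;                  // same growth step
  steps++;
}
// Here steps == 18 and timeBatch == 1,110,150 ns (just past 1 ms).
#endif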
/**********************************************************************/
int makeRequestQueue(const char            *queueName,
                     RequestQueueProcessor *processOne,
                     RequestQueue         **queuePtr)
{
  RequestQueue *queue;
  int result = ALLOCATE(1, RequestQueue, __func__, &queue);
  if (result != UDS_SUCCESS) {
    return result;
  }
  queue->processOne = processOne;
  queue->alive      = true;
  atomic_set(&queue->dormant, false);
  init_waitqueue_head(&queue->wqhead);

  result = makeFunnelQueue(&queue->mainQueue);
  if (result != UDS_SUCCESS) {
    requestQueueFinish(queue);
    return result;
  }

  result = makeFunnelQueue(&queue->retryQueue);
  if (result != UDS_SUCCESS) {
    requestQueueFinish(queue);
    return result;
  }

  result = createThread(requestQueueWorker, queue, queueName, &queue->thread);
  if (result != UDS_SUCCESS) {
    requestQueueFinish(queue);
    return result;
  }
  queue->started = true;
  smp_mb();
  *queuePtr = queue;
  return UDS_SUCCESS;
}

/**********************************************************************/
static INLINE void wakeUpWorker(RequestQueue *queue)
{
  // This is the code sequence recommended in <linux/wait.h>.
  smp_mb();
  if (waitqueue_active(&queue->wqhead)) {
    wake_up(&queue->wqhead);
  }
}
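/*
 * Why wakeUpWorker needs the full barrier before waitqueue_active(): an
 * interleaving sketch (illustrative; the kernel's wait.h documents this
 * pattern for waitqueue_active). The barrier orders the enqueuer's funnel
 * queue update against its check of the wait queue:
 *
 *   enqueuer                            worker
 *   --------                            ------
 *   funnelQueuePut(...)                 prepare_to_wait (in wait_event)
 *   smp_mb()                            re-check the queues
 *   waitqueue_active()?                 sleep only if still empty
 *
 * Either the worker's re-check sees the new entry, or the enqueuer sees the
 * worker already on the wait queue and issues the wake_up. Without the
 * barrier, the enqueuer could read an empty wait queue before its entry
 * became visible, skip the wake_up, and leave the worker asleep with work
 * pending.
 */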
/**********************************************************************/
void requestQueueEnqueue(RequestQueue *queue, Request *request)
{
  bool unbatched = request->unbatched;
  funnelQueuePut(request->requeued ? queue->retryQueue : queue->mainQueue,
                 &request->requestQueueLink);

  /*
   * We must wake the worker thread when it is dormant (waiting with no
   * timeout). An atomic load (read fence) isn't needed here since we know
   * the queue operation acts as one.
   */
  if (atomic_read(&queue->dormant) || unbatched) {
    wakeUpWorker(queue);
  }
}

/**********************************************************************/
void requestQueueFinish(RequestQueue *queue)
{
  if (queue == NULL) {
    return;
  }

  /*
   * This memory barrier ensures that any requests we queued will be seen.
   * The point is that when dequeueRequest sees the following update to the
   * alive flag, it will also be able to see any change we made to a next
   * field in the FunnelQueue entry. The corresponding read barrier is in
   * requestQueueWorker.
   */
  smp_wmb();

  // Mark the queue as dead.
  WRITE_ONCE(queue->alive, false);

  if (queue->started) {
    // Wake the worker so it notices that it should exit.
    wakeUpWorker(queue);

    // Wait for the worker thread to finish processing any additional
    // pending work and exit.
    int result = joinThreads(queue->thread);
    if (result != UDS_SUCCESS) {
      logWarningWithStringError(result, "Failed to join worker thread");
    }
  }

  freeFunnelQueue(queue->mainQueue);
  freeFunnelQueue(queue->retryQueue);
  FREE(queue);
}
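/*
 * Usage sketch (illustrative only, not compiled): the lifecycle of a
 * RequestQueue as exposed by this file. The function names below are
 * hypothetical; the processor's signature is inferred from the processOne
 * calls above, which pass a single Request pointer.
 */
#if 0
static void exampleProcessor(Request *request)
{
  // Runs on the queue's worker thread, one request at a time.
}

static int exampleLifecycle(void)
{
  RequestQueue *queue;
  int result = makeRequestQueue("exampleWorker", exampleProcessor, &queue);
  if (result != UDS_SUCCESS) {
    return result;
  }
  // Any producer thread may now hand off requests:
  //   requestQueueEnqueue(queue, request);
  // Shutdown drains both funnel queues, joins the worker, and frees the
  // queue.
  requestQueueFinish(queue);
  return UDS_SUCCESS;
}
#endif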