|
Packit Service |
310c69 |
/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/uds-releases/jasper/kernelLinux/uds/requestQueueKernel.c#3 $
 */
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "requestQueue.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include <linux/wait.h>
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "atomicDefs.h"
|
|
Packit Service |
310c69 |
#include "compiler.h"
|
|
Packit Service |
310c69 |
#include "logger.h"
|
|
Packit Service |
310c69 |
#include "request.h"
|
|
Packit Service |
310c69 |
#include "memoryAlloc.h"
|
|
Packit Service |
310c69 |
#include "threads.h"
|
|
Packit Service |
310c69 |
#include "util/funnelQueue.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
 * Ordering:
 *
 * Multiple retry requests or multiple non-retry requests enqueued from
 * a single producer thread will be processed in the order enqueued.
 *
 * Retry requests will generally be processed before normal requests.
 *
 * HOWEVER, a producer thread can enqueue a retry request (generally given
 * higher priority) and then enqueue a normal request, and they can get
 * processed in the reverse order.  The checking of the two internal queues is
 * very simple and there's a potential race with the producer regarding the
 * "priority" handling.  If an ordering guarantee is needed, it can be added
 * without much difficulty; it just makes the code a bit more complicated.
 *
 * If requests are enqueued while the processing of another request is
 * happening, and the enqueuing operations complete while the request
 * processing is still in progress, then the retry request(s) *will*
 * get processed next.  (This is used for testing.)
 */
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
 * Time constants, all in units of nanoseconds.
 *
 * The wait-time constants bound the adaptive timeout used by the worker
 * thread (requestQueueWorker) while it is in timer-based (non-dormant) mode.
 **/
enum {
  ONE_NANOSECOND    = 1,
  ONE_MICROSECOND   = 1000 * ONE_NANOSECOND,
  ONE_MILLISECOND   = 1000 * ONE_MICROSECOND,
  ONE_SECOND        = 1000 * ONE_MILLISECOND,

  /** The initial time to wait after waiting with no timeout */
  DEFAULT_WAIT_TIME = 20 * ONE_MICROSECOND,

  /** The minimum time to wait when waiting with a timeout */
  MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,

  /**
   * The maximum time to wait when waiting with a timeout.  Once the worker's
   * timeout grows to this value it switches to waiting with no timeout.
   **/
  MAXIMUM_WAIT_TIME = ONE_MILLISECOND
};
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
 * Batch size tuning constants.  These are compared to the number of requests
 * that have been processed since the worker thread last woke up.
 **/
enum {
  MINIMUM_BATCH = 32, // wait time increases if batches are smaller than this
  MAXIMUM_BATCH = 64  // wait time decreases if batches are larger than this
};
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
struct requestQueue {
|
|
Packit Service |
310c69 |
/* Wait queue for synchronizing producers and consumer */
|
|
Packit Service |
310c69 |
struct wait_queue_head wqhead;
|
|
Packit Service |
310c69 |
/* function to process 1 request */
|
|
Packit Service |
310c69 |
RequestQueueProcessor *processOne;
|
|
Packit Service |
310c69 |
/* new incoming requests */
|
|
Packit Service |
310c69 |
FunnelQueue *mainQueue;
|
|
Packit Service |
310c69 |
/* old requests to retry first */
|
|
Packit Service |
310c69 |
FunnelQueue *retryQueue;
|
|
Packit Service |
310c69 |
/* thread id of the worker thread */
|
|
Packit Service |
310c69 |
Thread thread;
|
|
Packit Service |
310c69 |
/* true if the worker was started */
|
|
Packit Service |
310c69 |
bool started;
|
|
Packit Service |
310c69 |
/* when true, requests can be enqueued */
|
|
Packit Service |
310c69 |
bool alive;
|
|
Packit Service |
310c69 |
/* A flag set when the worker is waiting without a timeout */
|
|
Packit Service |
310c69 |
atomic_t dormant;
|
|
Packit Service |
310c69 |
};
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*****************************************************************************/
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Poll the underlying lock-free queues for a request to process. Must only be
|
|
Packit Service |
310c69 |
* called by the worker thread.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param queue the RequestQueue being serviced
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return a dequeued request, or NULL if no request was available
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static INLINE Request *pollQueues(RequestQueue *queue)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
// The retry queue has higher priority.
|
|
Packit Service |
310c69 |
FunnelQueueEntry *entry = funnelQueuePoll(queue->retryQueue);
|
|
Packit Service |
310c69 |
if (entry != NULL) {
|
|
Packit Service |
310c69 |
return container_of(entry, Request, requestQueueLink);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// The main queue has lower priority.
|
|
Packit Service |
310c69 |
entry = funnelQueuePoll(queue->mainQueue);
|
|
Packit Service |
310c69 |
if (entry != NULL) {
|
|
Packit Service |
310c69 |
return container_of(entry, Request, requestQueueLink);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// No entry found.
|
|
Packit Service |
310c69 |
return NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*****************************************************************************/
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Check if the underlying lock-free queues appear not just not to have any
|
|
Packit Service |
310c69 |
* requests available right now, but also not to be in the intermediate state
|
|
Packit Service |
310c69 |
* of getting requests added. Must only be called by the worker thread.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param queue the RequestQueue being serviced
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return true iff both funnel queues are idle
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static INLINE bool areQueuesIdle(RequestQueue *queue)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
return (isFunnelQueueIdle(queue->retryQueue) &&
|
|
Packit Service |
310c69 |
isFunnelQueueIdle(queue->mainQueue));
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*****************************************************************************/
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Remove the next request to be processed from the queue. Must only be called
|
|
Packit Service |
310c69 |
* by the worker thread.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param queue the queue from which to remove an entry
|
|
Packit Service |
310c69 |
* @param requestPtr the next request is returned here, or a NULL pointer to
|
|
Packit Service |
310c69 |
* indicate that there will be no more requests
|
|
Packit Service |
310c69 |
* @param waitedPtr return a boolean to indicate that we need to wait
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return True when there is a next request, or when we know that there will
|
|
Packit Service |
310c69 |
* never be another request. False when we must wait for a request.
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static INLINE bool dequeueRequest(RequestQueue *queue,
|
|
Packit Service |
310c69 |
Request **requestPtr,
|
|
Packit Service |
310c69 |
bool *waitedPtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
// Because of batching, we expect this to be the most common code path.
|
|
Packit Service |
310c69 |
Request *request = pollQueues(queue);
|
|
Packit Service |
310c69 |
if (request != NULL) {
|
|
Packit Service |
310c69 |
// Return because we found a request
|
|
Packit Service |
310c69 |
*requestPtr = request;
|
|
Packit Service |
310c69 |
return true;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
if (!READ_ONCE(queue->alive)) {
|
|
Packit Service |
310c69 |
// Return because we see that shutdown is happening
|
|
Packit Service |
310c69 |
*requestPtr = NULL;
|
|
Packit Service |
310c69 |
return true;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Return indicating that we need to wait.
|
|
Packit Service |
310c69 |
*requestPtr = NULL;
|
|
Packit Service |
310c69 |
*waitedPtr = true;
|
|
Packit Service |
310c69 |
return false;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*****************************************************************************/
|
|
Packit Service |
310c69 |
static void requestQueueWorker(void *arg)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
RequestQueue *queue = (RequestQueue *) arg;
|
|
Packit Service |
310c69 |
unsigned long timeBatch = DEFAULT_WAIT_TIME;
|
|
Packit Service |
310c69 |
bool dormant = atomic_read(&queue->dormant);
|
|
Packit Service |
310c69 |
long currentBatch = 0;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
for (;;) {
|
|
Packit Service |
310c69 |
Request *request;
|
|
Packit Service |
310c69 |
bool waited = false;
|
|
Packit Service |
310c69 |
if (dormant) {
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* Sleep/wakeup protocol:
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* The enqueue operation updates "newest" in the
|
|
Packit Service |
310c69 |
* funnel queue via xchg which is a memory barrier,
|
|
Packit Service |
310c69 |
* and later checks "dormant" to decide whether to do
|
|
Packit Service |
310c69 |
* a wakeup of the worker thread.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* The worker thread, when deciding to go to sleep,
|
|
Packit Service |
310c69 |
* sets "dormant" and then examines "newest" to decide
|
|
Packit Service |
310c69 |
* if the funnel queue is idle. In dormant mode, the
|
|
Packit Service |
310c69 |
* last examination of "newest" before going to sleep
|
|
Packit Service |
310c69 |
* is done inside the wait_event_interruptible macro,
|
|
Packit Service |
310c69 |
* after a point where (one or more) memory barriers
|
|
Packit Service |
310c69 |
* have been issued. (Preparing to sleep uses spin
|
|
Packit Service |
310c69 |
* locks.) Even if the "next" field update isn't
|
|
Packit Service |
310c69 |
* visible yet to make the entry accessible, its
|
|
Packit Service |
310c69 |
* existence will kick the worker thread out of
|
|
Packit Service |
310c69 |
* dormant mode and back into timer-based mode.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* So the two threads should agree on the ordering of
|
|
Packit Service |
310c69 |
* the updating of the two fields.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
wait_event_interruptible(queue->wqhead,
|
|
Packit Service |
310c69 |
dequeueRequest(queue, &request, &waited) ||
|
|
Packit Service |
310c69 |
!areQueuesIdle(queue));
|
|
Packit Service |
310c69 |
} else {
|
|
Packit Service |
310c69 |
wait_event_interruptible_hrtimeout(queue->wqhead,
|
|
Packit Service |
310c69 |
dequeueRequest(queue, &request,
|
|
Packit Service |
310c69 |
&waited),
|
|
Packit Service |
310c69 |
ns_to_ktime(timeBatch));
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
if (likely(request != NULL)) {
|
|
Packit Service |
310c69 |
// We got a request.
|
|
Packit Service |
310c69 |
currentBatch++;
|
|
Packit Service |
310c69 |
queue->processOne(request);
|
|
Packit Service |
310c69 |
} else if (!READ_ONCE(queue->alive)) {
|
|
Packit Service |
310c69 |
// We got no request and we know we are shutting down.
|
|
Packit Service |
310c69 |
break;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
if (dormant) {
|
|
Packit Service |
310c69 |
// We've been roused from dormancy. Clear the flag so enqueuers can stop
|
|
Packit Service |
310c69 |
// broadcasting (no fence needed for this transition).
|
|
Packit Service |
310c69 |
atomic_set(&queue->dormant, false);
|
|
Packit Service |
310c69 |
dormant = false;
|
|
Packit Service |
310c69 |
// Reset the timeout back to the default since we don't know how long
|
|
Packit Service |
310c69 |
// we've been asleep and we also want to be responsive to a new burst.
|
|
Packit Service |
310c69 |
timeBatch = DEFAULT_WAIT_TIME;
|
|
Packit Service |
310c69 |
} else if (waited) {
|
|
Packit Service |
310c69 |
// We waited for this request to show up. Adjust the wait time if the
|
|
Packit Service |
310c69 |
// last batch of requests was too small or too large..
|
|
Packit Service |
310c69 |
if (currentBatch < MINIMUM_BATCH) {
|
|
Packit Service |
310c69 |
// Adjust the wait time if the last batch of requests was too small.
|
|
Packit Service |
310c69 |
timeBatch += timeBatch / 4;
|
|
Packit Service |
310c69 |
if (timeBatch >= MAXIMUM_WAIT_TIME) {
|
|
Packit Service |
310c69 |
// The timeout is getting long enough that we need to switch into
|
|
Packit Service |
310c69 |
// dormant mode.
|
|
Packit Service |
310c69 |
atomic_set(&queue->dormant, true);
|
|
Packit Service |
310c69 |
dormant = true;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
} else if (currentBatch > MAXIMUM_BATCH) {
|
|
Packit Service |
310c69 |
// Adjust the wait time if the last batch of requests was too large.
|
|
Packit Service |
310c69 |
timeBatch -= timeBatch / 4;
|
|
Packit Service |
310c69 |
if (timeBatch < MINIMUM_WAIT_TIME) {
|
|
Packit Service |
310c69 |
// But if the producer is very fast or the scheduler doesn't wake up
|
|
Packit Service |
310c69 |
// up promptly, waiting for very short times won't make the batches
|
|
Packit Service |
310c69 |
// smaller.
|
|
Packit Service |
310c69 |
timeBatch = MINIMUM_WAIT_TIME;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
// And we must now start a new batch count
|
|
Packit Service |
310c69 |
currentBatch = 0;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* Ensure that we see any requests that were guaranteed to have been fully
|
|
Packit Service |
310c69 |
* enqueued before shutdown was flagged. The corresponding write barrier
|
|
Packit Service |
310c69 |
* is in requestQueueFinish.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
smp_rmb();
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Process every request that is still in the queue, and never wait for any
|
|
Packit Service |
310c69 |
// new requests to show up.
|
|
Packit Service |
310c69 |
for (;;) {
|
|
Packit Service |
310c69 |
Request *request = pollQueues(queue);
|
|
Packit Service |
310c69 |
if (request == NULL) {
|
|
Packit Service |
310c69 |
break;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
queue->processOne(request);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
int makeRequestQueue(const char *queueName,
|
|
Packit Service |
310c69 |
RequestQueueProcessor *processOne,
|
|
Packit Service |
310c69 |
RequestQueue **queuePtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
RequestQueue *queue;
|
|
Packit Service |
310c69 |
int result = ALLOCATE(1, RequestQueue, __func__, &queue);
|
|
Packit Service |
310c69 |
if (result != UDS_SUCCESS) {
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
queue->processOne = processOne;
|
|
Packit Service |
310c69 |
queue->alive = true;
|
|
Packit Service |
310c69 |
atomic_set(&queue->dormant, false);
|
|
Packit Service |
310c69 |
init_waitqueue_head(&queue->wqhead);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
result = makeFunnelQueue(&queue->mainQueue);
|
|
Packit Service |
310c69 |
if (result != UDS_SUCCESS) {
|
|
Packit Service |
310c69 |
requestQueueFinish(queue);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
result = makeFunnelQueue(&queue->retryQueue);
|
|
Packit Service |
310c69 |
if (result != UDS_SUCCESS) {
|
|
Packit Service |
310c69 |
requestQueueFinish(queue);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
result = createThread(requestQueueWorker, queue, queueName, &queue->thread);
|
|
Packit Service |
310c69 |
if (result != UDS_SUCCESS) {
|
|
Packit Service |
310c69 |
requestQueueFinish(queue);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
queue->started = true;
|
|
Packit Service |
310c69 |
smp_mb();
|
|
Packit Service |
310c69 |
*queuePtr = queue;
|
|
Packit Service |
310c69 |
return UDS_SUCCESS;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
/**
 * Wake the worker thread if it is currently waiting on the queue's wait
 * queue.
 *
 * @param queue  the queue whose worker should be woken
 **/
static INLINE void wakeUpWorker(RequestQueue *queue)
{
  // This is the code sequence recommended in <linux/wait.h>: a full barrier
  // before the unlocked waitqueue_active() check.
  smp_mb();
  if (waitqueue_active(&queue->wqhead)) {
    wake_up(&queue->wqhead);
  }
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
/**
 * Add a request to the queue.  Requests marked as requeued go on the retry
 * queue; all others go on the main queue.  The worker is woken if it is
 * dormant or if the request is marked unbatched.
 *
 * @param queue    the queue to add the request to
 * @param request  the request to enqueue
 **/
void requestQueueEnqueue(RequestQueue *queue, Request *request)
{
  // Read the flag before the put.  NOTE(review): presumably the request may
  // be picked up by the worker as soon as it is on the funnel queue, so it is
  // not touched again afterwards -- confirm the ownership rules.
  bool unbatched = request->unbatched;
  funnelQueuePut(request->requeued ? queue->retryQueue : queue->mainQueue,
                 &request->requestQueueLink);

  /*
   * We must wake the worker thread when it is dormant (waiting with no
   * timeout).  An atomic load (read fence) isn't needed here since we know the
   * queue operation acts as one.
   */
  if (atomic_read(&queue->dormant) || unbatched) {
    wakeUpWorker(queue);
  }
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
/**
 * Shut down a request queue and free it.  Marks the queue as dead, and, if
 * the worker thread was started, wakes it and joins it (the worker processes
 * any requests still queued before it exits).  Then frees both funnel queues
 * and the queue structure itself.
 *
 * A join failure is logged as a warning and the teardown continues.
 *
 * @param queue  the queue to shut down; NULL is accepted and ignored
 **/
void requestQueueFinish(RequestQueue *queue)
{
  if (queue == NULL) {
    return;
  }

  /*
   * This memory barrier ensures that any requests we queued will be seen.  The
   * point is that when dequeueRequest sees the following update to the alive
   * flag, it will also be able to see any change we made to a next field in
   * the FunnelQueue entry.  The corresponding read barrier is in
   * requestQueueWorker.
   */
  smp_wmb();

  // Mark the queue as dead.
  WRITE_ONCE(queue->alive, false);

  if (queue->started) {
    // Wake the worker so it notices that it should exit.
    wakeUpWorker(queue);

    // Wait for the worker thread to finish processing any additional pending
    // work and exit.
    int result = joinThreads(queue->thread);
    if (result != UDS_SUCCESS) {
      logWarningWithStringError(result, "Failed to join worker thread");
    }
  }

  freeFunnelQueue(queue->mainQueue);
  freeFunnelQueue(queue->retryQueue);
  FREE(queue);
}
|