/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/workQueue.c#11 $
*/
#include "workQueue.h"
#include <linux/delay.h>
#include <linux/kthread.h>
#include <linux/version.h>
#include "atomic.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "permassert.h"
#include "stringUtils.h"
#include "numeric.h"
#include "workItemStats.h"
#include "workQueueHandle.h"
#include "workQueueInternals.h"
#include "workQueueStats.h"
#include "workQueueSysfs.h"
enum {
  // Time between work queue heartbeats in usec. The default kernel
  // configurations generally have 1ms or 4ms tick rates, so let's make this a
  // multiple for accuracy.
  FUNNEL_HEARTBEAT_INTERVAL = 4000,
  // Time to wait for a work queue to flush remaining items during shutdown.
  // Specified in milliseconds.
  FUNNEL_FINISH_SLEEP = 5000,
};

// Protects queueData, the static snapshot buffer used by dumpSimpleWorkQueue
// to reduce inconsistency in the numbers it logs.
static struct mutex queueDataLock;
static SimpleWorkQueue queueData;

// Forward declarations for teardown helpers referenced before their
// definitions (error paths in queue creation use them).
static void freeSimpleWorkQueue(SimpleWorkQueue *queue);
static void finishSimpleWorkQueue(SimpleWorkQueue *queue);
// work item lists (used for delayed work items)
/**
 * Reset a work item list to the empty state.
 *
 * The list is a circular singly-linked list identified by its tail pointer;
 * a NULL tail means empty.
 *
 * @param list The list to initialize
 **/
static void initializeWorkItemList(KvdoWorkItemList *list)
{
  list->tail = NULL;
}
/**
 * Append a work item at the tail of a work item list.
 *
 * The list is a circular singly-linked list, so the new tail's next pointer
 * must wrap around to the head.
 *
 * @param list The list to append to
 * @param item The item to append
 **/
static void addToWorkItemList(KvdoWorkItemList *list, KvdoWorkItem *item)
{
  KvdoWorkItem *oldTail = list->tail;
  if (oldTail == NULL) {
    // Empty list: the sole entry points at itself.
    item->next = item;
  } else {
    // Splice the new item in after the current tail, preserving the ring.
    item->next = oldTail->next;
    oldTail->next = item;
  }
  list->tail = item;
}
/**
 * Check whether a work item list contains no items.
 *
 * @param list The list to check
 *
 * @return true iff the list is empty
 **/
static bool isWorkItemListEmpty(KvdoWorkItemList *list)
{
  return (list->tail == NULL);
}
/**
 * Remove and return the head of a work item list, if any.
 *
 * The list is circular with a tail pointer, so the head is tail->next.
 *
 * @param list The list to poll
 *
 * @return the former head of the list, or NULL if the list was empty
 **/
static KvdoWorkItem *workItemListPoll(KvdoWorkItemList *list)
{
  KvdoWorkItem *tail = list->tail;
  if (tail == NULL) {
    return NULL;
  }

  KvdoWorkItem *removed = tail->next;
  if (removed == tail) {
    // Removing the only entry empties the list.
    list->tail = NULL;
  } else {
    // Unlink the head from the ring.
    tail->next = removed->next;
  }
  removed->next = NULL;
  return removed;
}
/**
 * Return (without removing) the head of a work item list.
 *
 * @param list The list to examine
 *
 * @return the head of the list, or NULL if the list is empty
 **/
static KvdoWorkItem *workItemListPeek(KvdoWorkItemList *list)
{
  if (list->tail == NULL) {
    return NULL;
  }
  return list->tail->next;
}
// Finding the SimpleWorkQueue to actually operate on.
/**
 * Pick the next subordinate service queue in rotation.
 *
 * This doesn't need to be 100% precise in distributing work items around, so
 * playing loose with concurrent field modifications isn't going to hurt us.
 * (Avoiding the atomic ops may help us a bit in performance, but we'll still
 * have contention over the fields.)
 *
 * @param queue The round-robin-type work queue
 *
 * @return A subordinate work queue
 **/
static inline SimpleWorkQueue *nextServiceQueue(RoundRobinWorkQueue *queue)
{
  unsigned int rotor = queue->serviceQueueRotor++;
  return queue->serviceQueues[rotor % queue->numServiceQueues];
}
/**
 * Find a simple work queue on which to operate.
 *
 * A simple work queue is used directly; a round-robin work queue delegates
 * to the next of its subordinate service queues in rotation.
 *
 * @param queue a work queue (round-robin or simple)
 *
 * @return a simple work queue
 **/
static inline SimpleWorkQueue *pickSimpleQueue(KvdoWorkQueue *queue)
{
  if (queue->roundRobinMode) {
    return nextServiceQueue(asRoundRobinWorkQueue(queue));
  }
  return asSimpleWorkQueue(queue);
}
// Processing normal work items.
/**
 * Scan the work queue's work item lists, and dequeue and return the next
 * waiting work item, if any.
 *
 * The funnel queues are scanned from highest priority to lowest, exactly
 * once; there is therefore a race condition where a high-priority work item
 * can be enqueued followed by a lower-priority one, and we'll grab the
 * latter (but we'll catch the high-priority item on the next call). If
 * strict enforcement of priorities becomes necessary, this function will
 * need fixing.
 *
 * @param queue the work queue
 *
 * @return a work item pointer, or NULL
 **/
static KvdoWorkItem *pollForWorkItem(SimpleWorkQueue *queue)
{
  for (int i = READ_ONCE(queue->numPriorityLists) - 1; i >= 0; i--) {
    FunnelQueueEntry *entry = funnelQueuePoll(queue->priorityLists[i]);
    if (entry != NULL) {
      return container_of(entry, KvdoWorkItem, workQueueEntryLink);
    }
  }
  return NULL;
}
/**
 * Add a work item into the queue, and inform the caller of any additional
 * processing necessary.
 *
 * If the worker thread may not be awake, true is returned, and the caller
 * should attempt a wakeup.
 *
 * @param queue The work queue
 * @param item The work item to add
 *
 * @return true iff the caller should wake the worker thread
 **/
__attribute__((warn_unused_result))
static bool enqueueWorkQueueItem(SimpleWorkQueue *queue, KvdoWorkItem *item)
{
  ASSERT_LOG_ONLY(item->myQueue == NULL,
                  "item %" PRIptr " (fn %" PRIptr "/%" PRIptr
                  ") to enqueue (%" PRIptr
                  ") is not already queued (%" PRIptr ")",
                  item, item->work, item->statsFunction, queue,
                  item->myQueue);
  // Clamp an out-of-range action code to 0 rather than indexing past the end
  // of the priority map below.
  if (ASSERT(item->action < WORK_QUEUE_ACTION_COUNT,
             "action is in range for queue") != VDO_SUCCESS) {
    item->action = 0;
  }
  unsigned int priority = READ_ONCE(queue->priorityMap[item->action]);

  // Update statistics.
  updateStatsForEnqueue(&queue->stats, item, priority);

  item->myQueue = &queue->common;

  // Funnel queue handles the synchronization for the put.
  funnelQueuePut(queue->priorityLists[priority], &item->workQueueEntryLink);

  /*
   * Due to how funnel-queue synchronization is handled (just atomic
   * operations), the simplest safe implementation here would be to wake-up any
   * waiting threads after enqueueing each item. Even if the funnel queue is
   * not empty at the time of adding an item to the queue, the consumer thread
   * may not see this since it is not guaranteed to have the same view of the
   * queue as a producer thread.
   *
   * However, the above is wasteful so instead we attempt to minimize the
   * number of thread wakeups. This is normally unsafe due to the above
   * consumer-producer synchronization constraints. To correct this a timeout
   * mechanism is used to wake the thread periodically to handle the occasional
   * race condition that triggers and results in this thread not being woken
   * properly.
   *
   * In most cases, the above timeout will not occur prior to some other work
   * item being added after the queue is set to idle state, so thread wakeups
   * will generally be triggered much faster than this interval. The timeout
   * provides protection against the cases where more work items are either not
   * added or are added too infrequently.
   *
   * This is also why we can get away with the normally-unsafe optimization for
   * the common case by checking queue->idle first without synchronization. The
   * race condition exists, but another work item getting enqueued can wake us
   * up, and if we don't get that either, we still have the timeout to fall
   * back on.
   *
   * Developed and tuned for some x86 boxes; untested whether this is any
   * better or worse for other platforms, with or without the explicit memory
   * barrier.
   */
  smp_mb();
  // Request a wakeup only when this enqueue is the one that transitions the
  // idle flag from 1 to 0; otherwise either another producer already claimed
  // the wakeup or the worker was not idle.
  return ((atomic_read(&queue->idle) == 1)
          && (atomic_cmpxchg(&queue->idle, 1, 0) == 1));
}
/**
 * Compute an approximate indication of the number of pending work items.
 *
 * No synchronization is used, so it's guaranteed to be correct only if there
 * is no activity.
 *
 * @param queue The work queue to examine
 *
 * @return the estimate of the number of pending work items
 **/
static unsigned int getPendingCount(SimpleWorkQueue *queue)
{
  KvdoWorkItemStats *stats = &queue->stats.workItemStats;
  long long outstanding = 0;

  // Enqueued minus processed, summed across every stats slot.
  for (int i = 0; i < NUM_WORK_QUEUE_ITEM_STATS + 1; i++) {
    outstanding += atomic64_read(&stats->enqueued[i]);
    outstanding -= stats->times[i].count;
  }
  if (outstanding < 0) {
    // Racing updates can make the difference go negative; just report that
    // there's some activity.
    outstanding = 1;
  }
  return outstanding;
}
/**
 * Invoke the work queue type's start hook, if one is defined.
 *
 * @param queue The work queue
 **/
static void runStartHook(SimpleWorkQueue *queue)
{
  if (queue->type->start == NULL) {
    return;
  }
  queue->type->start(queue->private);
}
/**
 * Invoke the work queue type's finish hook, if one is defined.
 *
 * @param queue The work queue
 **/
static void runFinishHook(SimpleWorkQueue *queue)
{
  if (queue->type->finish == NULL) {
    return;
  }
  queue->type->finish(queue->private);
}
/**
 * Invoke the work queue's suspend hook, if any, then poll once more for
 * work items that may have arrived while the hook ran.
 *
 * The caller is assumed to have just found the queue empty; if no suspend
 * hook exists there is nothing to wait on, so NULL is returned without
 * another check.
 *
 * @param [in] queue The work queue preparing to suspend
 *
 * @return the newly found work item, if any
 **/
static KvdoWorkItem *runSuspendHook(SimpleWorkQueue *queue)
{
  if (queue->type->suspend == NULL) {
    return NULL;
  }

  queue->type->suspend(queue->private);
  // The hook may have taken a while; something could have been enqueued.
  return pollForWorkItem(queue);
}
/**
 * Check whether a work queue has delayed work items pending.
 *
 * Takes the queue's spin lock, since the delayed-items list is shared with
 * the timer callback and producers.
 *
 * @param queue The work queue
 *
 * @return true iff delayed work items are pending
 **/
static bool hasDelayedWorkItems(SimpleWorkQueue *queue)
{
  unsigned long flags;

  spin_lock_irqsave(&queue->lock, flags);
  bool pending = !isWorkItemListEmpty(&queue->delayedItems);
  spin_unlock_irqrestore(&queue->lock, flags);
  return pending;
}
/**
 * Wait for the next work item to process, or until kthread_should_stop
 * indicates that it's time for us to shut down.
 *
 * If kthread_should_stop says it's time to stop but we have pending work
 * items, return a work item.
 *
 * Update statistics relating to scheduler interactions.
 *
 * @param [in] queue The work queue to wait on
 * @param [in] timeoutInterval How long to wait each iteration
 *
 * @return the next work item, or NULL to indicate shutdown is requested
 **/
static KvdoWorkItem *waitForNextWorkItem(SimpleWorkQueue *queue,
                                         TimeoutJiffies timeoutInterval)
{
  // The suspend hook may itself surface a newly-arrived work item.
  KvdoWorkItem *item = runSuspendHook(queue);
  if (item != NULL) {
    return item;
  }
  DEFINE_WAIT(wait);
  while (true) {
    // Reset the first-wakeup timestamp so wakeWorkerThread can record the
    // time of the next producer wakeup (used for latency stats below).
    atomic64_set(&queue->firstWakeup, 0);
    prepare_to_wait(&queue->waitingWorkerThreads, &wait, TASK_INTERRUPTIBLE);
    /*
     * Don't set the idle flag until a wakeup will not be lost.
     *
     * Force synchronization between setting the idle flag and checking the
     * funnel queue; the producer side will do them in the reverse order.
     * (There's still a race condition we've chosen to allow, because we've got
     * a timeout below that unwedges us if we hit it, but this may narrow the
     * window a little.)
     */
    atomic_set(&queue->idle, 1);
    memoryFence(); // store-load barrier between "idle" and funnel queue
    item = pollForWorkItem(queue);
    if (item != NULL) {
      break;
    }
    /*
     * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state
     * up above. Otherwise, schedule() will put the thread to sleep and might
     * miss a wakeup from kthread_stop() call in finishWorkQueue().
     *
     * If there are delayed work items, we need to wait for them to
     * get run. Then, when we check kthread_should_stop again, we'll
     * finally exit.
     */
    if (kthread_should_stop() && !hasDelayedWorkItems(queue)) {
      /*
       * Recheck once again in case we *just* converted a delayed work item to
       * a regular enqueued work item.
       *
       * It's important that processDelayedWorkItems holds the spin lock until
       * it finishes enqueueing the work item to run.
       *
       * Funnel queues aren't synchronized between producers and consumer.
       * Normally a producer interrupted mid-update can hide a later producer's
       * entry until the first completes. This would be a problem, except that
       * when kthread_stop is called, we should already have ceased adding new
       * work items and have waited for all the regular work items to finish;
       * (recurring) delayed work items should be the only exception.
       *
       * Worker thread shutdown would be simpler if even the delayed work items
       * were required to be completed and not re-queued before shutting down a
       * work queue.
       */
      item = pollForWorkItem(queue);
      break;
    }
    /*
     * We don't need to update the wait count atomically since this is the only
     * place it is modified and there is only one thread involved.
     */
    queue->stats.waits++;
    uint64_t timeBeforeSchedule = currentTime(CLOCK_MONOTONIC);
    atomic64_add(timeBeforeSchedule - queue->mostRecentWakeup,
                 &queue->stats.runTime);
    // Wake up often, to address the missed-wakeup race.
    schedule_timeout(timeoutInterval);
    queue->mostRecentWakeup = currentTime(CLOCK_MONOTONIC);
    uint64_t callDurationNS = queue->mostRecentWakeup - timeBeforeSchedule;
    enterHistogramSample(queue->stats.scheduleTimeHistogram,
                         callDurationNS / 1000);
    /*
     * Check again before resetting firstWakeup for more accurate
     * stats. (It's still racy, which can't be fixed without requiring
     * tighter synchronization between producer and consumer sides.)
     */
    item = pollForWorkItem(queue);
    if (item != NULL) {
      break;
    }
  }
  if (item != NULL) {
    // Record how long it took between the producer's wakeup and us actually
    // picking up a work item, plus the queue backlog at that point.
    uint64_t firstWakeup = atomic64_read(&queue->firstWakeup);
    /*
     * We sometimes register negative wakeup latencies without this fencing.
     * Whether it's forcing full serialization between the read of firstWakeup
     * and the "rdtsc" that might be used depending on the clock source that
     * helps, or some extra nanoseconds of delay covering for high-resolution
     * clocks not being quite in sync between CPUs, is not yet clear.
     */
    loadFence();
    if (firstWakeup != 0) {
      enterHistogramSample(queue->stats.wakeupLatencyHistogram,
                           (currentTime(CLOCK_MONOTONIC) - firstWakeup) / 1000);
      enterHistogramSample(queue->stats.wakeupQueueLengthHistogram,
                           getPendingCount(queue));
    }
  }
  finish_wait(&queue->waitingWorkerThreads, &wait);
  atomic_set(&queue->idle, 0);
  return item;
}
/**
 * Get the next work item to process, possibly waiting for one, unless
 * kthread_should_stop indicates that it's time for us to shut down.
 *
 * If kthread_should_stop says it's time to stop but we have pending work
 * items, return a work item.
 *
 * @param [in] queue The work queue to wait on
 * @param [in] timeoutInterval How long to wait each iteration
 *
 * @return the next work item, or NULL to indicate shutdown is requested
 **/
static KvdoWorkItem *getNextWorkItem(SimpleWorkQueue *queue,
                                     TimeoutJiffies timeoutInterval)
{
  // Fast path: something is already queued.
  KvdoWorkItem *item = pollForWorkItem(queue);
  return (item != NULL) ? item : waitForNextWorkItem(queue, timeoutInterval);
}
/**
 * Execute a work item from a work queue, and do associated bookkeeping.
 *
 * Runs the item's work function, records dequeue and work-time statistics,
 * and optionally yields the CPU (with more statistics) if the kernel has
 * asked us to reschedule.
 *
 * @param [in] queue the work queue the item is from
 * @param [in] item the work item to run
 **/
static void processWorkItem(SimpleWorkQueue *queue,
                            KvdoWorkItem *item)
{
  if (ASSERT(item->myQueue == &queue->common,
             "item %" PRIptr " from queue %" PRIptr
             " marked as being in this queue (%" PRIptr ")",
             item, queue, item->myQueue) == UDS_SUCCESS) {
    updateStatsForDequeue(&queue->stats, item);
    item->myQueue = NULL;
  }
  // Save the index, so we can use it after the work function.
  unsigned int index = item->statTableIndex;
  uint64_t workStartTime = recordStartTime(index);
  item->work(item);
  // We just surrendered control of the work item; no more access.
  item = NULL;
  updateWorkItemStatsForWorkTime(&queue->stats.workItemStats, index,
                                 workStartTime);
  /*
   * Be friendly to a CPU that has other work to do, if the kernel has told us
   * to. This speeds up some performance tests; that "other work" might include
   * other VDO threads.
   *
   * N.B.: We compute the pending count info here without any synchronization,
   * but it's for stats reporting only, so being imprecise isn't too big a
   * deal, as long as reads and writes are atomic operations.
   */
  if (need_resched()) {
    uint64_t timeBeforeReschedule = currentTime(CLOCK_MONOTONIC);
    // Record the queue length we have *before* rescheduling.
    unsigned int queueLen = getPendingCount(queue);
    cond_resched();
    uint64_t timeAfterReschedule = currentTime(CLOCK_MONOTONIC);
    enterHistogramSample(queue->stats.rescheduleQueueLengthHistogram,
                         queueLen);
    // Time spent running since the last wakeup, before yielding.
    uint64_t runTimeNS = timeBeforeReschedule - queue->mostRecentWakeup;
    enterHistogramSample(queue->stats.runTimeBeforeRescheduleHistogram,
                         runTimeNS / 1000);
    atomic64_add(runTimeNS, &queue->stats.runTime);
    // Time spent inside cond_resched() itself.
    uint64_t callTimeNS = timeAfterReschedule - timeBeforeReschedule;
    enterHistogramSample(queue->stats.rescheduleTimeHistogram,
                         callTimeNS / 1000);
    atomic64_add(callTimeNS, &queue->stats.rescheduleTime);
    queue->mostRecentWakeup = timeAfterReschedule;
  }
}
/**
 * Main loop of the work queue worker thread.
 *
 * Waits for work items and runs them, until told to stop. The start and
 * finish hooks bracket the loop.
 *
 * @param queue The work queue to run
 **/
static void serviceWorkQueue(SimpleWorkQueue *queue)
{
  // Sleep no longer than roughly one heartbeat interval per iteration, and
  // never less than 2 jiffies, to bound the missed-wakeup window.
  TimeoutJiffies timeoutInterval =
    maxLong(2, usecs_to_jiffies(FUNNEL_HEARTBEAT_INTERVAL + 1) - 1);

  runStartHook(queue);

  for (;;) {
    KvdoWorkItem *item = getNextWorkItem(queue, timeoutInterval);
    if (item == NULL) {
      // No work items and kthread_should_stop was triggered.
      break;
    }
    processWorkItem(queue, item);
  }

  runFinishHook(queue);
}
/**
 * Initialize per-thread data for a new worker thread and run the work queue.
 * Called in a new thread created by kthread_run().
 *
 * @param ptr A pointer to the KvdoWorkQueue to run.
 *
 * @return 0 (indicating success to kthread_run())
 **/
static int workQueueRunner(void *ptr)
{
  SimpleWorkQueue *queue = ptr;
  // Hold a kobject reference for the lifetime of this thread; dropped on
  // exit below.
  kobject_get(&queue->common.kobj);
  // Register a stack-based handle so code running on this thread can locate
  // its work queue (presumably via getCurrentThreadWorkQueue -- see
  // workQueueHandle.h).
  WorkQueueStackHandle queueHandle;
  initializeWorkQueueStackHandle(&queueHandle, queue);
  queue->stats.startTime = queue->mostRecentWakeup = currentTime(CLOCK_MONOTONIC);
  // Mark the queue started under the lock and wake makeSimpleWorkQueue,
  // which waits for this handshake before returning.
  unsigned long flags;
  spin_lock_irqsave(&queue->lock, flags);
  queue->started = true;
  spin_unlock_irqrestore(&queue->lock, flags);
  wake_up(&queue->startWaiters);
  serviceWorkQueue(queue);
  // Zero out handle structure for safety.
  memset(&queueHandle, 0, sizeof(queueHandle));
  kobject_put(&queue->common.kobj);
  return 0;
}
// Preparing work items
/**********************************************************************/
void setupWorkItem(KvdoWorkItem *item,
                   KvdoWorkFunction work,
                   void *statsFunction,
                   unsigned int action)
{
  ASSERT_LOG_ONLY(item->myQueue == NULL,
                  "setupWorkItem not called on enqueued work item");
  item->work = work;
  // Fall back to the work function itself for stats attribution when no
  // separate stats function is supplied.
  item->statsFunction = (statsFunction != NULL) ? statsFunction : work;
  item->statTableIndex = 0;
  item->action = action;
  item->myQueue = NULL;
  item->executionTime = 0;
  item->next = NULL;
}
// Thread management
/**********************************************************************/
/**
 * Wake the work queue's worker thread, recording the wakeup time for
 * latency statistics.
 **/
static inline void wakeWorkerThread(SimpleWorkQueue *queue)
{
  // Full barrier so the preceding enqueue is visible before the wakeup;
  // presumably pairs with the consumer's memoryFence() in
  // waitForNextWorkItem -- confirm if the ordering is changed.
  smp_mb();
  // Record the timestamp only if the worker has reset firstWakeup to 0 since
  // the last recorded wakeup (see waitForNextWorkItem).
  atomic64_cmpxchg(&queue->firstWakeup, 0, currentTime(CLOCK_MONOTONIC));
  // Despite the name, there's a maximum of one thread in this list.
  wake_up(&queue->waitingWorkerThreads);
}
// Delayed work items
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,15,0)
/**
 * Timer function invoked when a delayed work item is ready to run.
 *
 * Moves every item whose execution time has arrived from the delayed-items
 * list onto the regular funnel queues, reschedules the timer if items
 * remain, and wakes the worker thread if needed.
 *
 * @param timer The timer which has just finished
 **/
static void processDelayedWorkItems(struct timer_list *timer)
#else
/**
 * Timer function invoked when a delayed work item is ready to run.
 *
 * Moves every item whose execution time has arrived from the delayed-items
 * list onto the regular funnel queues, reschedules the timer if items
 * remain, and wakes the worker thread if needed.
 *
 * @param data The queue pointer, as an unsigned long
 **/
static void processDelayedWorkItems(unsigned long data)
#endif
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,15,0)
  SimpleWorkQueue *queue = from_timer(queue, timer, delayedItemsTimer);
#else
  SimpleWorkQueue *queue = (SimpleWorkQueue *) data;
#endif
  Jiffies nextExecutionTime = 0;
  bool reschedule = false;
  bool needsWakeup = false;
  unsigned long flags;
  // Hold the spin lock across the enqueue; the shutdown path in
  // waitForNextWorkItem relies on this to avoid missing items.
  spin_lock_irqsave(&queue->lock, flags);
  while (!isWorkItemListEmpty(&queue->delayedItems)) {
    KvdoWorkItem *item = workItemListPeek(&queue->delayedItems);
    if (item->executionTime > jiffies) {
      // Not due yet; re-arm the timer for this item's deadline. (The list
      // isn't sorted, but per the note in enqueueWorkQueueDelayed it holds
      // at most one entry in practice.)
      nextExecutionTime = item->executionTime;
      reschedule = true;
      break;
    }
    workItemListPoll(&queue->delayedItems);
    item->executionTime = 0; // not actually looked at...
    item->myQueue = NULL;
    needsWakeup |= enqueueWorkQueueItem(queue, item);
  }
  spin_unlock_irqrestore(&queue->lock, flags);
  if (reschedule) {
    mod_timer(&queue->delayedItemsTimer, nextExecutionTime);
  }
  if (needsWakeup) {
    wakeWorkerThread(queue);
  }
}
// Creation & teardown
/**********************************************************************/
/**
 * Check (under the queue lock) whether the worker thread has begun running.
 **/
static bool queueStarted(SimpleWorkQueue *queue)
{
  unsigned long flags;
  bool started;

  spin_lock_irqsave(&queue->lock, flags);
  started = queue->started;
  spin_unlock_irqrestore(&queue->lock, flags);
  return started;
}
/**
 * Create a simple work queue with a worker thread.
 *
 * @param [in] threadNamePrefix The per-device prefix to use in thread names
 * @param [in] name The queue name
 * @param [in] parentKobject The parent sysfs node
 * @param [in] owner The kernel layer owning the work queue
 * @param [in] private Private data of the queue for use by work
 * items or other queue-specific functions
 * @param [in] type The work queue type defining the lifecycle
 * functions, queue actions, priorities, and
 * timeout behavior
 * @param [out] queuePtr Where to store the queue handle
 *
 * @return VDO_SUCCESS or an error code
 **/
static int makeSimpleWorkQueue(const char *threadNamePrefix,
                               const char *name,
                               struct kobject *parentKobject,
                               KernelLayer *owner,
                               void *private,
                               const KvdoWorkQueueType *type,
                               SimpleWorkQueue **queuePtr)
{
  SimpleWorkQueue *queue;
  int result = ALLOCATE(1, SimpleWorkQueue, "simple work queue", &queue);
  if (result != UDS_SUCCESS) {
    return result;
  }
  queue->type = type;
  queue->private = private;
  queue->common.owner = owner;
  // Build the action-code -> priority map from the type's action table,
  // tracking the highest priority used so only that many funnel queues need
  // to be scanned at poll time.
  unsigned int numPriorityLists = 1;
  for (int i = 0; i < WORK_QUEUE_ACTION_COUNT; i++) {
    const KvdoWorkQueueAction *action = &queue->type->actionTable[i];
    if (action->name == NULL) {
      // A NULL name terminates the action table.
      break;
    }
    unsigned int code = action->code;
    unsigned int priority = action->priority;
    result = ASSERT(code < WORK_QUEUE_ACTION_COUNT,
                    "invalid action code %u in work queue initialization",
                    code);
    if (result != VDO_SUCCESS) {
      // Before kobject_init, plain FREE is the correct cleanup.
      FREE(queue);
      return result;
    }
    result = ASSERT(priority < WORK_QUEUE_PRIORITY_COUNT,
                    "invalid action priority %u in work queue initialization",
                    priority);
    if (result != VDO_SUCCESS) {
      FREE(queue);
      return result;
    }
    queue->priorityMap[code] = priority;
    if (numPriorityLists <= priority) {
      numPriorityLists = priority + 1;
    }
  }
  result = duplicateString(name, "queue name", &queue->common.name);
  if (result != VDO_SUCCESS) {
    FREE(queue);
    return -ENOMEM;
  }
  init_waitqueue_head(&queue->waitingWorkerThreads);
  init_waitqueue_head(&queue->startWaiters);
  spin_lock_init(&queue->lock);
  initializeWorkItemList(&queue->delayedItems);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,15,0)
  timer_setup(&queue->delayedItemsTimer, processDelayedWorkItems, 0);
#else
  setup_timer(&queue->delayedItemsTimer, processDelayedWorkItems,
              (unsigned long) queue);
#endif
  // From here on the kobject owns the queue's lifecycle; failures use
  // freeSimpleWorkQueue, which drops the kobject reference.
  kobject_init(&queue->common.kobj, &simpleWorkQueueKobjType);
  result = kobject_add(&queue->common.kobj, parentKobject, queue->common.name);
  if (result != 0) {
    logError("Cannot add sysfs node: %d", result);
    freeSimpleWorkQueue(queue);
    return result;
  }
  queue->numPriorityLists = numPriorityLists;
  // All WORK_QUEUE_PRIORITY_COUNT funnel queues are created even though only
  // numPriorityLists are scanned; freeSimpleWorkQueue frees them all.
  for (int i = 0; i < WORK_QUEUE_PRIORITY_COUNT; i++) {
    result = makeFunnelQueue(&queue->priorityLists[i]);
    if (result != UDS_SUCCESS) {
      freeSimpleWorkQueue(queue);
      return result;
    }
  }
  result = initializeWorkQueueStats(&queue->stats, &queue->common.kobj);
  if (result != 0) {
    logError("Cannot initialize statistics tracking: %d", result);
    freeSimpleWorkQueue(queue);
    return result;
  }
  queue->started = false;
  struct task_struct *thread = NULL;
  thread = kthread_run(workQueueRunner, queue, "%s:%s", threadNamePrefix,
                       queue->common.name);
  if (IS_ERR(thread)) {
    freeSimpleWorkQueue(queue);
    return (int) PTR_ERR(thread);
  }
  queue->thread = thread;
  atomic_set(&queue->threadID, thread->pid);
  /*
   * If we don't wait to ensure the thread is running VDO code, a
   * quick kthread_stop (due to errors elsewhere) could cause it to
   * never get as far as running VDO, skipping the cleanup code.
   *
   * Eventually we should just make that path safe too, and then we
   * won't need this synchronization.
   */
  wait_event(queue->startWaiters, queueStarted(queue) == true);
  *queuePtr = queue;
  return UDS_SUCCESS;
}
/**********************************************************************/
int makeWorkQueue(const char *threadNamePrefix,
                  const char *name,
                  struct kobject *parentKobject,
                  KernelLayer *owner,
                  void *private,
                  const KvdoWorkQueueType *type,
                  unsigned int threadCount,
                  KvdoWorkQueue **queuePtr)
{
  // A single thread gets a plain simple queue; multiple threads get a
  // round-robin queue that distributes work across subordinate simple
  // queues.
  if (threadCount == 1) {
    SimpleWorkQueue *simpleQueue;
    int result = makeSimpleWorkQueue(threadNamePrefix, name, parentKobject,
                                     owner, private, type, &simpleQueue);
    if (result == VDO_SUCCESS) {
      *queuePtr = &simpleQueue->common;
    }
    return result;
  }
  RoundRobinWorkQueue *queue;
  int result = ALLOCATE(1, RoundRobinWorkQueue, "round-robin work queue",
                        &queue);
  if (result != UDS_SUCCESS) {
    return result;
  }
  result = ALLOCATE(threadCount, SimpleWorkQueue *, "subordinate work queues",
                    &queue->serviceQueues);
  if (result != UDS_SUCCESS) {
    FREE(queue);
    return result;
  }
  queue->numServiceQueues = threadCount;
  queue->common.roundRobinMode = true;
  queue->common.owner = owner;
  result = duplicateString(name, "queue name", &queue->common.name);
  if (result != VDO_SUCCESS) {
    FREE(queue->serviceQueues);
    FREE(queue);
    return -ENOMEM;
  }
  kobject_init(&queue->common.kobj, &roundRobinWorkQueueKobjType);
  result = kobject_add(&queue->common.kobj, parentKobject, queue->common.name);
  if (result != 0) {
    logError("Cannot add sysfs node: %d", result);
    finishWorkQueue(&queue->common);
    kobject_put(&queue->common.kobj);
    return result;
  }
  *queuePtr = &queue->common;
  char threadName[TASK_COMM_LEN];
  for (unsigned int i = 0; i < threadCount; i++) {
    // Thread names are "<name>0", "<name>1", ... under the given prefix.
    snprintf(threadName, sizeof(threadName), "%s%u", name, i);
    result = makeSimpleWorkQueue(threadNamePrefix, threadName,
                                 &queue->common.kobj, owner, private, type,
                                 &queue->serviceQueues[i]);
    if (result != VDO_SUCCESS) {
      // Shrink the count to the queues actually created, then tear down via
      // the normal finish/free path.
      queue->numServiceQueues = i;
      // Destroy previously created subordinates.
      finishWorkQueue(*queuePtr);
      freeWorkQueue(queuePtr);
      return result;
    }
    queue->serviceQueues[i]->parentQueue = *queuePtr;
  }
  return VDO_SUCCESS;
}
/**
 * Shut down a simple work queue's worker thread.
 *
 * Blocks until the worker thread has exited.
 *
 * @param queue The work queue to shut down
 **/
static void finishSimpleWorkQueue(SimpleWorkQueue *queue)
{
  if (queue->thread == NULL) {
    return;
  }

  atomic_set(&queue->threadID, 0);
  // kthread_stop waits for the worker thread to exit.
  kthread_stop(queue->thread);
  queue->thread = NULL;
}
/**
 * Shut down a round-robin work queue's service queues.
 *
 * @param queue The work queue to shut down
 **/
static void finishRoundRobinWorkQueue(RoundRobinWorkQueue *queue)
{
  for (unsigned int i = 0; i < queue->numServiceQueues; i++) {
    finishSimpleWorkQueue(queue->serviceQueues[i]);
  }
}
/**********************************************************************/
void finishWorkQueue(KvdoWorkQueue *queue)
{
  // Dispatch on queue flavor: a round-robin queue finishes each subordinate;
  // a simple queue stops its single worker thread.
  if (!queue->roundRobinMode) {
    finishSimpleWorkQueue(asSimpleWorkQueue(queue));
    return;
  }
  finishRoundRobinWorkQueue(asRoundRobinWorkQueue(queue));
}
/**
 * Tear down a simple work queue, and decrement the kobject reference
 * count on it.
 *
 * @param queue The work queue
 **/
static void freeSimpleWorkQueue(SimpleWorkQueue *queue)
{
  unsigned int i;

  for (i = 0; i < WORK_QUEUE_PRIORITY_COUNT; i++) {
    freeFunnelQueue(queue->priorityLists[i]);
  }
  cleanupWorkQueueStats(&queue->stats);
  // Drop our reference; the kobject release callback presumably frees the
  // queue structure itself (see simpleWorkQueueKobjType).
  kobject_put(&queue->common.kobj);
}
/**
 * Tear down a round-robin work queue and its service queues, and
 * decrement the kobject reference count on it.
 *
 * @param queue The work queue
 **/
static void freeRoundRobinWorkQueue(RoundRobinWorkQueue *queue)
{
  SimpleWorkQueue **subordinates = queue->serviceQueues;
  unsigned int queueCount = queue->numServiceQueues;

  // Detach the table before freeing its entries.
  queue->serviceQueues = NULL;
  for (unsigned int i = 0; i < queueCount; i++) {
    freeSimpleWorkQueue(subordinates[i]);
  }
  FREE(subordinates);
  kobject_put(&queue->common.kobj);
}
/**********************************************************************/
void freeWorkQueue(KvdoWorkQueue **queuePtr)
{
  KvdoWorkQueue *queue = *queuePtr;
  if (queue == NULL) {
    return;
  }
  // Clear the caller's pointer before teardown.
  *queuePtr = NULL;

  // Stop all worker threads before releasing any resources.
  finishWorkQueue(queue);

  if (queue->roundRobinMode) {
    freeRoundRobinWorkQueue(asRoundRobinWorkQueue(queue));
  } else {
    freeSimpleWorkQueue(asSimpleWorkQueue(queue));
  }
}
// Debugging dumps
/**********************************************************************/
/**
 * Log diagnostic information about a simple work queue: an estimate of
 * pending items, the wait count, the worker thread's status, and per-item
 * statistics.
 *
 * Copies the queue into the global queueData buffer (guarded by
 * queueDataLock) so most of the numbers logged come from one snapshot.
 **/
static void dumpSimpleWorkQueue(SimpleWorkQueue *queue)
{
  mutex_lock(&queueDataLock);
  // Take a snapshot to reduce inconsistency in logged numbers.
  queueData = *queue;
  const char *threadStatus;
  char taskStateReport = '-';
  if (queueData.thread != NULL) {
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,14,0)
    taskStateReport = task_state_to_char(queue->thread);
#else
    // Pre-4.14 kernels have no task_state_to_char; derive the single-letter
    // state code from the task state bits by hand.
    unsigned int taskState = queue->thread->state & TASK_REPORT;
    taskState &= 0x1ff;
    unsigned int taskStateIndex;
    if (taskState != 0) {
      // Lowest set state bit selects the letter; index 0 means running.
      taskStateIndex = __ffs(taskState)+1;
      BUG_ON(taskStateIndex >= sizeof(TASK_STATE_TO_CHAR_STR));
    } else {
      taskStateIndex = 0;
    }
    taskStateReport = TASK_STATE_TO_CHAR_STR[taskStateIndex];
#endif
  }
  if (queueData.thread == NULL) {
    threadStatus = "no threads";
  } else if (atomic_read(&queueData.idle)) {
    threadStatus = "idle";
  } else {
    threadStatus = "running";
  }
  logInfo("workQ %" PRIptr " (%s) %u entries %llu waits, %s (%c)",
          &queue->common,
          queueData.common.name,
          getPendingCount(&queueData),
          queueData.stats.waits,
          threadStatus,
          taskStateReport);
  logWorkItemStats(&queueData.stats.workItemStats);
  logWorkQueueStats(queue);
  mutex_unlock(&queueDataLock);
  // ->lock spin lock status?
  // ->waitingWorkerThreads wait queue status? anyone waiting?
}
/**********************************************************************/
void dumpWorkQueue(KvdoWorkQueue *queue)
{
  // A simple queue dumps itself; a round-robin queue dumps each subordinate.
  if (!queue->roundRobinMode) {
    dumpSimpleWorkQueue(asSimpleWorkQueue(queue));
    return;
  }

  RoundRobinWorkQueue *rrQueue = asRoundRobinWorkQueue(queue);
  for (unsigned int i = 0; i < rrQueue->numServiceQueues; i++) {
    dumpSimpleWorkQueue(rrQueue->serviceQueues[i]);
  }
}
/**********************************************************************/
void dumpWorkItemToBuffer(KvdoWorkItem *item, char *buffer, size_t length)
{
  // Prefix with the owning queue's name ("-" if not enqueued), capped at
  // TASK_COMM_LEN characters, followed by a slash.
  int prefixLength = snprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
                              item->myQueue == NULL ? "-" : item->myQueue->name);
  /*
   * snprintf returns a negative value on an output error and a value >=
   * length on truncation. Keeping the result as a signed int makes both
   * checks explicit; the original code assigned it to size_t, so a negative
   * error return wrapped to a huge unsigned value and was only skipped by
   * accident. Append the function name only if the prefix fit.
   */
  if ((prefixLength >= 0) && ((size_t) prefixLength < length)) {
    getFunctionName(item->statsFunction, buffer + prefixLength,
                    length - prefixLength);
  }
}
// Work submission
/**********************************************************************/
void enqueueWorkQueue(KvdoWorkQueue *kvdoWorkQueue, KvdoWorkItem *item)
{
  SimpleWorkQueue *target = pickSimpleQueue(kvdoWorkQueue);

  item->executionTime = 0;
  // Wake the worker only when the enqueue found the queue (possibly) idle.
  if (enqueueWorkQueueItem(target, item)) {
    wakeWorkerThread(target);
  }
}
/**********************************************************************/
void enqueueWorkQueueDelayed(KvdoWorkQueue *kvdoWorkQueue,
                             KvdoWorkItem *item,
                             Jiffies executionTime)
{
  // Deadlines in the past (or now) are just immediate enqueues.
  if (executionTime <= jiffies) {
    enqueueWorkQueue(kvdoWorkQueue, item);
    return;
  }
  SimpleWorkQueue *queue = pickSimpleQueue(kvdoWorkQueue);
  bool rescheduleTimer = false;
  unsigned long flags;
  item->executionTime = executionTime;
  // Lock if the work item is delayed. All delayed items are handled via a
  // single linked list.
  spin_lock_irqsave(&queue->lock, flags);
  // Only the transition from empty to non-empty needs to (re)arm the timer;
  // otherwise processDelayedWorkItems will re-arm it as needed.
  if (isWorkItemListEmpty(&queue->delayedItems)) {
    rescheduleTimer = true;
  }
  /*
   * XXX We should keep the list sorted, but at the moment the list won't
   * grow above a single entry anyway.
   */
  item->myQueue = &queue->common;
  addToWorkItemList(&queue->delayedItems, item);
  spin_unlock_irqrestore(&queue->lock, flags);
  if (rescheduleTimer) {
    mod_timer(&queue->delayedItemsTimer, executionTime);
  }
}
// Misc
/**********************************************************************/
KvdoWorkQueue *getCurrentWorkQueue(void)
{
  SimpleWorkQueue *queue = getCurrentThreadWorkQueue();
  if (queue == NULL) {
    return NULL;
  }
  return &queue->common;
}
/**********************************************************************/
KernelLayer *getWorkQueueOwner(KvdoWorkQueue *queue)
{
  // Simple accessor for the kernel layer that owns this queue.
  return queue->owner;
}
/**********************************************************************/
void *getWorkQueuePrivateData(void)
{
  SimpleWorkQueue *queue = getCurrentThreadWorkQueue();
  // NULL when the caller isn't running on a work queue thread.
  return (queue == NULL) ? NULL : queue->private;
}
/**********************************************************************/
void setWorkQueuePrivateData(void *newData)
{
  SimpleWorkQueue *queue = getCurrentThreadWorkQueue();
  // Calling this from anything but a work queue thread is a programming
  // error.
  BUG_ON(queue == NULL);
  queue->private = newData;
}
/**********************************************************************/
void initWorkQueueOnce(void)
{
  // Initialize at runtime: DEFINE_MUTEX is not compatible with c99 mode.
  mutex_init(&queueDataLock);
  initWorkQueueStackHandleOnce();
}