|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* Copyright (c) 2020 Red Hat, Inc.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* This program is free software; you can redistribute it and/or
|
|
Packit Service |
310c69 |
* modify it under the terms of the GNU General Public License
|
|
Packit Service |
310c69 |
* as published by the Free Software Foundation; either version 2
|
|
Packit Service |
310c69 |
* of the License, or (at your option) any later version.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* This program is distributed in the hope that it will be useful,
|
|
Packit Service |
310c69 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit Service |
310c69 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
Packit Service |
310c69 |
* GNU General Public License for more details.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* You should have received a copy of the GNU General Public License
|
|
Packit Service |
310c69 |
* along with this program; if not, write to the Free Software
|
|
Packit Service |
310c69 |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
Packit Service |
310c69 |
* 02110-1301, USA.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/batchProcessor.c#2 $
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "batchProcessor.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "memoryAlloc.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "constants.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "kernelLayer.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* On memory ordering:
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* The producer thread does: enqueue item on queue (xchg, which is
|
|
Packit Service |
310c69 |
* implicitly interlocked, then a store), memory barrier, then atomic
|
|
Packit Service |
310c69 |
* cmpxchg of the state field. The x86 architecture spec says the
|
|
Packit Service |
310c69 |
* xchg, store, lock-cmpxchg sequence cannot be reordered, but on
|
|
Packit Service |
310c69 |
* architectures using load-linked and store-conditional for the
|
|
Packit Service |
310c69 |
* cmpxchg, like AArch64, the LL can be reordered with the store, so
|
|
Packit Service |
310c69 |
* we add a barrier.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* The consumer thread, when it is running out of work, does: read
|
|
Packit Service |
310c69 |
* queue (find empty), set state, mfence, read queue again just to be
|
|
Packit Service |
310c69 |
* sure. The set-state and read-queue cannot be reordered with respect
|
|
Packit Service |
310c69 |
* to the mfence (but without the mfence, the read could be moved
|
|
Packit Service |
310c69 |
* before the set).
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* The xchg and mfence impose a total order across processors, and
|
|
Packit Service |
310c69 |
* each processor sees the stores done by the other processor in the
|
|
Packit Service |
310c69 |
* required order. If the xchg happens before the mfence, the
|
|
Packit Service |
310c69 |
* consumer's "read queue again" operation will see the update. If the
|
|
Packit Service |
310c69 |
* mfence happens first, the producer's "cmpxchg state" will see its
|
|
Packit Service |
310c69 |
* updated value.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* These are the semantics implemented by memory set to WB (write-back
|
|
Packit Service |
310c69 |
* caching) mode on x86-64. So, the simple analysis is that no wakeups
|
|
Packit Service |
310c69 |
* should be missed.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* It's a little subtler with funnel queues, since one interrupted or
|
|
Packit Service |
310c69 |
* delayed enqueue operation (see the commentary in funnelQueuePut)
|
|
Packit Service |
310c69 |
* can cause another, concurrent enqueue operation to complete without
|
|
Packit Service |
310c69 |
* actually making the entry visible to the consumer. In essence, one
|
|
Packit Service |
310c69 |
* update makes no new work items visible to the consumer, and the
|
|
Packit Service |
310c69 |
* other (when it eventually completes) makes two (or more) work items
|
|
Packit Service |
310c69 |
* visible, and each one ensures that the consumer will process what
|
|
Packit Service |
310c69 |
* it has made visible.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
typedef enum batchProcessorState {
|
|
Packit Service |
310c69 |
BATCH_PROCESSOR_IDLE,
|
|
Packit Service |
310c69 |
BATCH_PROCESSOR_ENQUEUED,
|
|
Packit Service |
310c69 |
} BatchProcessorState;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
struct batchProcessor {
|
|
Packit Service |
310c69 |
spinlock_t consumerLock;
|
|
Packit Service |
310c69 |
FunnelQueue *queue;
|
|
Packit Service |
310c69 |
KvdoWorkItem workItem;
|
|
Packit Service |
310c69 |
atomic_t state;
|
|
Packit Service |
310c69 |
BatchProcessorCallback callback;
|
|
Packit Service |
310c69 |
void *closure;
|
|
Packit Service |
310c69 |
KernelLayer *layer;
|
|
Packit Service |
310c69 |
};
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
static void scheduleBatchProcessing(BatchProcessor *batch);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Apply the batch processing function to the accumulated set of
|
|
Packit Service |
310c69 |
* objects.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* Runs in a "CPU queue".
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param [in] item The work item embedded in the BatchProcessor
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void batchProcessorWork(KvdoWorkItem *item)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
BatchProcessor *batch = container_of(item, BatchProcessor, workItem);
|
|
Packit Service |
310c69 |
spin_lock(&batch->consumerLock);
|
|
Packit Service |
310c69 |
while (!isFunnelQueueEmpty(batch->queue)) {
|
|
Packit Service |
310c69 |
batch->callback(batch, batch->closure);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
|
|
Packit Service |
310c69 |
memoryFence();
|
|
Packit Service |
310c69 |
bool needReschedule = !isFunnelQueueEmpty(batch->queue);
|
|
Packit Service |
310c69 |
spin_unlock(&batch->consumerLock);
|
|
Packit Service |
310c69 |
if (needReschedule) {
|
|
Packit Service |
310c69 |
scheduleBatchProcessing(batch);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Ensure that the batch-processing function is scheduled to run.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* If we're the thread that switches the BatchProcessor state from
|
|
Packit Service |
310c69 |
* idle to enqueued, we're the thread responsible for actually
|
|
Packit Service |
310c69 |
* enqueueing it. If some other thread got there first, or it was
|
|
Packit Service |
310c69 |
* already enqueued, it's not our problem.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param [in] batch The BatchProcessor control data
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void scheduleBatchProcessing(BatchProcessor *batch)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* We want this to be very fast in the common cases.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* In testing on our "mgh" class machines (HP ProLiant DL380p Gen8,
|
|
Packit Service |
310c69 |
* Intel Xeon E5-2690, 2.9GHz), it appears that under some
|
|
Packit Service |
310c69 |
* conditions it's a little faster to use a memory fence and then
|
|
Packit Service |
310c69 |
* read the "state" field, skipping the cmpxchg if the state is
|
|
Packit Service |
310c69 |
* already set to BATCH_PROCESSOR_ENQUEUED. (Sometimes slightly
|
|
Packit Service |
310c69 |
* faster still if we prefetch the state field first.) Note that the
|
|
Packit Service |
310c69 |
* read requires the fence, otherwise it could be executed before
|
|
Packit Service |
310c69 |
* the preceding store by the FunnelQueue code to the "next"
|
|
Packit Service |
310c69 |
* pointer, which can, very rarely, result in failing to issue a
|
|
Packit Service |
310c69 |
* wakeup when needed.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* However, the gain is small, and in testing on our older "harvard"
|
|
Packit Service |
310c69 |
* class machines (Intel Xeon X5680, 3.33GHz) it was a clear win to
|
|
Packit Service |
310c69 |
* skip all of that and go right for the cmpxchg.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* Of course, the tradeoffs may be sensitive to the particular work
|
|
Packit Service |
310c69 |
* going on, cache pressure, etc.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
smp_mb();
|
|
Packit Service |
310c69 |
BatchProcessorState oldState
|
|
Packit Service |
310c69 |
= atomic_cmpxchg(&batch->state, BATCH_PROCESSOR_IDLE,
|
|
Packit Service |
310c69 |
BATCH_PROCESSOR_ENQUEUED);
|
|
Packit Service |
310c69 |
bool doSchedule = (oldState == BATCH_PROCESSOR_IDLE);
|
|
Packit Service |
310c69 |
if (doSchedule) {
|
|
Packit Service |
310c69 |
enqueueCPUWorkQueue(batch->layer, &batch->workItem);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
int makeBatchProcessor(KernelLayer *layer,
|
|
Packit Service |
310c69 |
BatchProcessorCallback callback,
|
|
Packit Service |
310c69 |
void *closure,
|
|
Packit Service |
310c69 |
BatchProcessor **batchPtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
BatchProcessor *batch;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
int result = ALLOCATE(1, BatchProcessor, "batchProcessor", &batch);
|
|
Packit Service |
310c69 |
if (result != UDS_SUCCESS) {
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
result = makeFunnelQueue(&batch->queue);
|
|
Packit Service |
310c69 |
if (result != UDS_SUCCESS) {
|
|
Packit Service |
310c69 |
FREE(batch);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
spin_lock_init(&batch->consumerLock);
|
|
Packit Service |
310c69 |
setupWorkItem(&batch->workItem, batchProcessorWork,
|
|
Packit Service |
310c69 |
(KvdoWorkFunction) callback, CPU_Q_ACTION_COMPLETE_KVIO);
|
|
Packit Service |
310c69 |
atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
|
|
Packit Service |
310c69 |
batch->callback = callback;
|
|
Packit Service |
310c69 |
batch->closure = closure;
|
|
Packit Service |
310c69 |
batch->layer = layer;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
*batchPtr = batch;
|
|
Packit Service |
310c69 |
return UDS_SUCCESS;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void addToBatchProcessor(BatchProcessor *batch, KvdoWorkItem *item)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
funnelQueuePut(batch->queue, &item->workQueueEntryLink);
|
|
Packit Service |
310c69 |
scheduleBatchProcessing(batch);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
KvdoWorkItem *nextBatchItem(BatchProcessor *batch)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
FunnelQueueEntry *fqEntry = funnelQueuePoll(batch->queue);
|
|
Packit Service |
310c69 |
if (fqEntry == NULL) {
|
|
Packit Service |
310c69 |
return NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
return container_of(fqEntry, KvdoWorkItem, workQueueEntryLink);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void condReschedBatchProcessor(BatchProcessor *batch)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
cond_resched_lock(&batch->consumerLock);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void freeBatchProcessor(BatchProcessor **batchPtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
BatchProcessor *batch = *batchPtr;
|
|
Packit Service |
310c69 |
if (batch) {
|
|
Packit Service |
310c69 |
memoryFence();
|
|
Packit Service |
310c69 |
BUG_ON(atomic_read(&batch->state) == BATCH_PROCESSOR_ENQUEUED);
|
|
Packit Service |
310c69 |
freeFunnelQueue(batch->queue);
|
|
Packit Service |
310c69 |
FREE(batch);
|
|
Packit Service |
310c69 |
*batchPtr = NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|