/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/batchProcessor.c#2 $
*/
#include "batchProcessor.h"
#include "memoryAlloc.h"
#include "constants.h"
#include "kernelLayer.h"
/*
* On memory ordering:
*
* The producer thread does: enqueue item on queue (xchg, which is
* implicitly interlocked, then a store), memory barrier, then atomic
* cmpxchg of the state field. The x86 architecture spec says the
* xchg, store, lock-cmpxchg sequence cannot be reordered, but on
* architectures using load-linked and store-conditional for the
* cmpxchg, like AArch64, the LL can be reordered with the store, so
* we add a barrier.
*
* The consumer thread, when it is running out of work, does: read
* queue (find empty), set state, mfence, read queue again just to be
* sure. The set-state and read-queue cannot be reordered with respect
* to the mfence (but without the mfence, the read could be moved
* before the set).
*
* The xchg and mfence impose a total order across processors, and
* each processor sees the stores done by the other processor in the
* required order. If the xchg happens before the mfence, the
* consumer's "read queue again" operation will see the update. If the
* mfence happens first, the producer's "cmpxchg state" will see its
* updated value.
*
* These are the semantics implemented by memory set to WB (write-back
* caching) mode on x86-64. So, the simple analysis is that no wakeups
* should be missed.
*
* It's a little subtler with funnel queues, since one interrupted or
* delayed enqueue operation (see the commentary in funnelQueuePut)
* can cause another, concurrent enqueue operation to complete without
* actually making the entry visible to the consumer. In essence, one
* update makes no new work items visible to the consumer, and the
* other (when it eventually completes) makes two (or more) work items
* visible, and each one ensures that the consumer will process what
* it has made visible.
*/
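/*
 * As a rough sketch of that argument (simplified pseudocode, not the
 * exact code below), the two sides interleave like this:
 *
 *   producer (addToBatchProcessor):   consumer (batchProcessorWork):
 *     funnelQueuePut(queue, item)       drain queue until empty
 *     smp_mb()                          atomic_set(&state, IDLE)
 *     cmpxchg(&state, IDLE, ENQUEUED)   memoryFence()
 *     if it was IDLE, enqueue the       re-check queue; if non-empty,
 *       work item                         reschedule
 *
 * If the producer's enqueue becomes visible before the consumer's
 * fence, the consumer's re-check sees the new entry and reschedules.
 * Otherwise the consumer's IDLE store is visible to the producer's
 * cmpxchg, which therefore succeeds and the producer schedules the
 * work item itself. Either way a wakeup happens.
 */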
typedef enum batchProcessorState {
  BATCH_PROCESSOR_IDLE,
  BATCH_PROCESSOR_ENQUEUED,
} BatchProcessorState;

struct batchProcessor {
  spinlock_t consumerLock;
  FunnelQueue *queue;
  KvdoWorkItem workItem;
  atomic_t state;
  BatchProcessorCallback callback;
  void *closure;
  KernelLayer *layer;
};

static void scheduleBatchProcessing(BatchProcessor *batch);
/**
* Apply the batch processing function to the accumulated set of
* objects.
*
* Runs in a "CPU queue".
*
* @param [in] item The work item embedded in the BatchProcessor
**/
static void batchProcessorWork(KvdoWorkItem *item)
{
  BatchProcessor *batch = container_of(item, BatchProcessor, workItem);

  spin_lock(&batch->consumerLock);
  while (!isFunnelQueueEmpty(batch->queue)) {
    batch->callback(batch, batch->closure);
  }
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
  memoryFence();
  bool needReschedule = !isFunnelQueueEmpty(batch->queue);
  spin_unlock(&batch->consumerLock);

  if (needReschedule) {
    scheduleBatchProcessing(batch);
  }
}
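/*
 * For illustration only: a minimal sketch of what a
 * BatchProcessorCallback might look like (exampleCallback and
 * processOneItem are hypothetical names, not part of this code). The
 * callback runs with the consumer lock held and typically drains the
 * queue via nextBatchItem(), yielding the lock periodically with
 * condReschedBatchProcessor():
 *
 *   static void exampleCallback(BatchProcessor *batch, void *closure)
 *   {
 *     KvdoWorkItem *item;
 *     while ((item = nextBatchItem(batch)) != NULL) {
 *       processOneItem(item, closure);
 *       condReschedBatchProcessor(batch);
 *     }
 *   }
 */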
/**
* Ensure that the batch-processing function is scheduled to run.
*
* If we're the thread that switches the BatchProcessor state from
* idle to enqueued, we're the thread responsible for actually
* enqueueing it. If some other thread got there first, or it was
* already enqueued, it's not our problem.
*
* @param [in] batch The BatchProcessor control data
**/
static void scheduleBatchProcessing(BatchProcessor *batch)
{
  /*
   * We want this to be very fast in the common cases.
   *
   * In testing on our "mgh" class machines (HP ProLiant DL380p Gen8,
   * Intel Xeon E5-2690, 2.9GHz), it appears that under some
   * conditions it's a little faster to use a memory fence and then
   * read the "state" field, skipping the cmpxchg if the state is
   * already set to BATCH_PROCESSOR_ENQUEUED. (Sometimes it is
   * slightly faster still if we prefetch the state field first.)
   * Note that the read requires the fence; otherwise it could be
   * executed before the preceding store by the FunnelQueue code to
   * the "next" pointer, which can, very rarely, result in failing to
   * issue a wakeup when needed.
   *
   * However, the gain is small, and in testing on our older "harvard"
   * class machines (Intel Xeon X5680, 3.33GHz) it was a clear win to
   * skip all of that and go straight for the cmpxchg.
   *
   * Of course, the tradeoffs may be sensitive to the particular work
   * going on, cache pressure, etc.
   */
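  /*
   * For reference, the variant described above (not used here) would
   * look roughly like this sketch:
   *
   *   smp_mb();
   *   if (atomic_read(&batch->state) == BATCH_PROCESSOR_ENQUEUED) {
   *     return;  // someone else already scheduled the work item
   *   }
   *   // otherwise fall through to the cmpxchg
   */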
  smp_mb();
  BatchProcessorState oldState
    = atomic_cmpxchg(&batch->state, BATCH_PROCESSOR_IDLE,
                     BATCH_PROCESSOR_ENQUEUED);
  bool doSchedule = (oldState == BATCH_PROCESSOR_IDLE);
  if (doSchedule) {
    enqueueCPUWorkQueue(batch->layer, &batch->workItem);
  }
}
/**********************************************************************/
int makeBatchProcessor(KernelLayer *layer,
                       BatchProcessorCallback callback,
                       void *closure,
                       BatchProcessor **batchPtr)
{
  BatchProcessor *batch;
  int result = ALLOCATE(1, BatchProcessor, "batchProcessor", &batch);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = makeFunnelQueue(&batch->queue);
  if (result != UDS_SUCCESS) {
    FREE(batch);
    return result;
  }

  spin_lock_init(&batch->consumerLock);
  setupWorkItem(&batch->workItem, batchProcessorWork,
                (KvdoWorkFunction) callback, CPU_Q_ACTION_COMPLETE_KVIO);
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
  batch->callback = callback;
  batch->closure = closure;
  batch->layer = layer;

  *batchPtr = batch;
  return UDS_SUCCESS;
}
/**********************************************************************/
void addToBatchProcessor(BatchProcessor *batch, KvdoWorkItem *item)
{
  funnelQueuePut(batch->queue, &item->workQueueEntryLink);
  scheduleBatchProcessing(batch);
}
/**********************************************************************/
KvdoWorkItem *nextBatchItem(BatchProcessor *batch)
{
  FunnelQueueEntry *fqEntry = funnelQueuePoll(batch->queue);
  if (fqEntry == NULL) {
    return NULL;
  }
  return container_of(fqEntry, KvdoWorkItem, workQueueEntryLink);
}
/**********************************************************************/
void condReschedBatchProcessor(BatchProcessor *batch)
{
  cond_resched_lock(&batch->consumerLock);
}
/**********************************************************************/
void freeBatchProcessor(BatchProcessor **batchPtr)
{
  BatchProcessor *batch = *batchPtr;
  if (batch) {
    memoryFence();
    BUG_ON(atomic_read(&batch->state) == BATCH_PROCESSOR_ENQUEUED);
    freeFunnelQueue(batch->queue);
    FREE(batch);
    *batchPtr = NULL;
  }
}