/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/batchProcessor.c#2 $
 */

#include "batchProcessor.h"

#include "memoryAlloc.h"

#include "constants.h"
#include "kernelLayer.h"

/*
 * On memory ordering:
 *
 * The producer thread does: enqueue item on queue (xchg, which is
 * implicitly interlocked, then a store), memory barrier, then atomic
 * cmpxchg of the state field. The x86 architecture spec says the
 * xchg, store, lock-cmpxchg sequence cannot be reordered, but on
 * architectures using load-linked and store-conditional for the
 * cmpxchg, like AArch64, the LL can be reordered with the store, so
 * we add a barrier.
 *
 * The consumer thread, when it is running out of work, does: read
 * queue (find empty), set state, mfence, read queue again just to be
 * sure. The set-state and read-queue cannot be reordered with respect
 * to the mfence (but without the mfence, the read could be moved
 * before the set).
 *
 * The xchg and mfence impose a total order across processors, and
 * each processor sees the stores done by the other processor in the
 * required order. If the xchg happens before the mfence, the
 * consumer's "read queue again" operation will see the update. If the
 * mfence happens first, the producer's "cmpxchg state" will see its
 * updated value.
 *
 * These are the semantics implemented by memory set to WB (write-back
 * caching) mode on x86-64. So, the simple analysis is that no wakeups
 * should be missed.
 *
 * It's a little subtler with funnel queues, since one interrupted or
 * delayed enqueue operation (see the commentary in funnelQueuePut)
 * can cause another, concurrent enqueue operation to complete without
 * actually making the entry visible to the consumer. In essence, one
 * update makes no new work items visible to the consumer, and the
 * other (when it eventually completes) makes two (or more) work items
 * visible, and each one ensures that the consumer will process what
 * it has made visible.
 */
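
/*
 * One way to visualize the handoff described above (a sketch derived
 * from the commentary, not from the architecture manuals). Each column
 * lists that thread's barrier-ordered steps, oldest first:
 *
 *   Producer                          Consumer
 *   --------                          --------
 *   funnelQueuePut (xchg, store)      find queue empty
 *   smp_mb()                          atomic_set state = IDLE
 *   cmpxchg state IDLE -> ENQUEUED    memoryFence()
 *                                     read queue again
 *
 * If the producer's xchg is ordered before the consumer's fence, the
 * consumer's second read of the queue sees the new entry and causes a
 * reschedule; if the fence comes first, the producer's cmpxchg sees
 * BATCH_PROCESSOR_IDLE and enqueues the work item itself. Either way,
 * the new item is not stranded.
 */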

typedef enum batchProcessorState {
  BATCH_PROCESSOR_IDLE,
  BATCH_PROCESSOR_ENQUEUED,
} BatchProcessorState;

struct batchProcessor {
  spinlock_t              consumerLock;
  FunnelQueue            *queue;
  KvdoWorkItem            workItem;
  atomic_t                state;
  BatchProcessorCallback  callback;
  void                   *closure;
  KernelLayer            *layer;
};

static void scheduleBatchProcessing(BatchProcessor *batch);

/**
 * Apply the batch processing function to the accumulated set of
 * objects.
 *
 * Runs in a "CPU queue".
 *
 * @param [in] item  The work item embedded in the BatchProcessor
 **/
static void batchProcessorWork(KvdoWorkItem *item)
{
  BatchProcessor *batch = container_of(item, BatchProcessor, workItem);
  spin_lock(&batch->consumerLock);
  while (!isFunnelQueueEmpty(batch->queue)) {
    batch->callback(batch, batch->closure);
  }
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
  memoryFence();
  bool needReschedule = !isFunnelQueueEmpty(batch->queue);
  spin_unlock(&batch->consumerLock);
  if (needReschedule) {
    scheduleBatchProcessing(batch);
  }
}

/**
 * Ensure that the batch-processing function is scheduled to run.
 *
 * If we're the thread that switches the BatchProcessor state from
 * idle to enqueued, we're the thread responsible for actually
 * enqueueing it. If some other thread got there first, or it was
 * already enqueued, it's not our problem.
 *
 * @param [in] batch  The BatchProcessor control data
 **/
static void scheduleBatchProcessing(BatchProcessor *batch)
{
  /*
   * We want this to be very fast in the common cases.
   *
   * In testing on our "mgh" class machines (HP ProLiant DL380p Gen8,
   * Intel Xeon E5-2690, 2.9GHz), it appears that under some
   * conditions it's a little faster to use a memory fence and then
   * read the "state" field, skipping the cmpxchg if the state is
   * already set to BATCH_PROCESSOR_ENQUEUED. (Sometimes slightly
   * faster still if we prefetch the state field first.) Note that the
   * read requires the fence, otherwise it could be executed before
   * the preceding store by the FunnelQueue code to the "next"
   * pointer, which can, very rarely, result in failing to issue a
   * wakeup when needed. See the sketch after this function.
   *
   * However, the gain is small, and in testing on our older "harvard"
   * class machines (Intel Xeon X5680, 3.33GHz) it was a clear win to
   * skip all of that and go right for the cmpxchg.
   *
   * Of course, the tradeoffs may be sensitive to the particular work
   * going on, cache pressure, etc.
   */
  smp_mb();
  BatchProcessorState oldState
    = atomic_cmpxchg(&batch->state, BATCH_PROCESSOR_IDLE,
                     BATCH_PROCESSOR_ENQUEUED);
  bool doSchedule = (oldState == BATCH_PROCESSOR_IDLE);
  if (doSchedule) {
    enqueueCPUWorkQueue(batch->layer, &batch->workItem);
  }
}
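
/*
 * For reference, a sketch of the fence-then-read fast path discussed
 * in the comment above; it is not the path this file takes, just an
 * illustration of the measured alternative:
 *
 *   smp_mb();
 *   if (atomic_read(&batch->state) != BATCH_PROCESSOR_ENQUEUED) {
 *     BatchProcessorState oldState
 *       = atomic_cmpxchg(&batch->state, BATCH_PROCESSOR_IDLE,
 *                        BATCH_PROCESSOR_ENQUEUED);
 *     if (oldState == BATCH_PROCESSOR_IDLE) {
 *       enqueueCPUWorkQueue(batch->layer, &batch->workItem);
 *     }
 *   }
 *
 * The fence is still required before the read: without it, the load
 * of "state" could be reordered before the FunnelQueue store to the
 * "next" pointer, and a needed wakeup could be missed.
 */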

/**********************************************************************/
int makeBatchProcessor(KernelLayer             *layer,
                       BatchProcessorCallback   callback,
                       void                    *closure,
                       BatchProcessor         **batchPtr)
{
  BatchProcessor *batch;
  int result = ALLOCATE(1, BatchProcessor, "batchProcessor", &batch);
  if (result != UDS_SUCCESS) {
    return result;
  }
  result = makeFunnelQueue(&batch->queue);
  if (result != UDS_SUCCESS) {
    FREE(batch);
    return result;
  }

  spin_lock_init(&batch->consumerLock);
  setupWorkItem(&batch->workItem, batchProcessorWork,
                (KvdoWorkFunction) callback, CPU_Q_ACTION_COMPLETE_KVIO);
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
  batch->callback = callback;
  batch->closure  = closure;
  batch->layer    = layer;

  *batchPtr = batch;
  return UDS_SUCCESS;
}

/**********************************************************************/
void addToBatchProcessor(BatchProcessor *batch, KvdoWorkItem *item)
{
  funnelQueuePut(batch->queue, &item->workQueueEntryLink);
  scheduleBatchProcessing(batch);
}

/**********************************************************************/
KvdoWorkItem *nextBatchItem(BatchProcessor *batch)
{
  FunnelQueueEntry *fqEntry = funnelQueuePoll(batch->queue);
  if (fqEntry == NULL) {
    return NULL;
  }
  return container_of(fqEntry, KvdoWorkItem, workQueueEntryLink);
}

/**********************************************************************/
void condReschedBatchProcessor(BatchProcessor *batch)
{
  cond_resched_lock(&batch->consumerLock);
}

/**********************************************************************/
void freeBatchProcessor(BatchProcessor **batchPtr)
{
  BatchProcessor *batch = *batchPtr;
  if (batch == NULL) {
    return;
  }

  memoryFence();
  BUG_ON(atomic_read(&batch->state) == BATCH_PROCESSOR_ENQUEUED);
  freeFunnelQueue(batch->queue);
  FREE(batch);
  *batchPtr = NULL;
}
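
/*
 * Example usage, as a sketch only: the callback runs from
 * batchProcessorWork with consumerLock held, drains the queue with
 * nextBatchItem, and may call condReschedBatchProcessor between
 * items. (exampleCallback and processItem are hypothetical names;
 * everything else is declared in this file or in batchProcessor.h.)
 *
 *   static void exampleCallback(BatchProcessor *batch, void *closure)
 *   {
 *     KvdoWorkItem *item;
 *     while ((item = nextBatchItem(batch)) != NULL) {
 *       processItem(item);
 *       condReschedBatchProcessor(batch);
 *     }
 *   }
 *
 *   // Setup, typically during layer initialization:
 *   //   makeBatchProcessor(layer, exampleCallback, NULL, &batch);
 *   // Producers then hand off items from any thread:
 *   //   addToBatchProcessor(batch, item);
 *   // Teardown, once no more items can arrive:
 *   //   freeBatchProcessor(&batch);
 */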