Blame source/vdo/kernel/batchProcessor.c

Packit Service 75d76b
/*
Packit Service 75d76b
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 75d76b
 *
Packit Service 75d76b
 * This program is free software; you can redistribute it and/or
Packit Service 75d76b
 * modify it under the terms of the GNU General Public License
Packit Service 75d76b
 * as published by the Free Software Foundation; either version 2
Packit Service 75d76b
 * of the License, or (at your option) any later version.
Packit Service 75d76b
 * 
Packit Service 75d76b
 * This program is distributed in the hope that it will be useful,
Packit Service 75d76b
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 75d76b
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 75d76b
 * GNU General Public License for more details.
Packit Service 75d76b
 * 
Packit Service 75d76b
 * You should have received a copy of the GNU General Public License
Packit Service 75d76b
 * along with this program; if not, write to the Free Software
Packit Service 75d76b
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 75d76b
 * 02110-1301, USA. 
Packit Service 75d76b
 *
Packit Service 75d76b
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/batchProcessor.c#2 $
Packit Service 75d76b
 */
Packit Service 75d76b
Packit Service 75d76b
#include "batchProcessor.h"
Packit Service 75d76b
Packit Service 75d76b
#include "memoryAlloc.h"
Packit Service 75d76b
Packit Service 75d76b
#include "constants.h"
Packit Service 75d76b
Packit Service 75d76b
#include "kernelLayer.h"
Packit Service 75d76b
Packit Service 75d76b
/*
Packit Service 75d76b
 * On memory ordering:
Packit Service 75d76b
 *
Packit Service 75d76b
 * The producer thread does: enqueue item on queue (xchg, which is
Packit Service 75d76b
 * implicitly interlocked, then a store), memory barrier, then atomic
Packit Service 75d76b
 * cmpxchg of the state field. The x86 architecture spec says the
Packit Service 75d76b
 * xchg, store, lock-cmpxchg sequence cannot be reordered, but on
Packit Service 75d76b
 * architectures using load-linked and store-conditional for the
Packit Service 75d76b
 * cmpxchg, like AArch64, the LL can be reordered with the store, so
Packit Service 75d76b
 * we add a barrier.
Packit Service 75d76b
 *
Packit Service 75d76b
 * The consumer thread, when it is running out of work, does: read
Packit Service 75d76b
 * queue (find empty), set state, mfence, read queue again just to be
Packit Service 75d76b
 * sure. The set-state and read-queue cannot be reordered with respect
Packit Service 75d76b
 * to the mfence (but without the mfence, the read could be moved
Packit Service 75d76b
 * before the set).
Packit Service 75d76b
 *
Packit Service 75d76b
 * The xchg and mfence impose a total order across processors, and
Packit Service 75d76b
 * each processor sees the stores done by the other processor in the
Packit Service 75d76b
 * required order. If the xchg happens before the mfence, the
Packit Service 75d76b
 * consumer's "read queue again" operation will see the update. If the
Packit Service 75d76b
 * mfence happens first, the producer's "cmpxchg state" will see its
Packit Service 75d76b
 * updated value.
Packit Service 75d76b
 *
Packit Service 75d76b
 * These are the semantics implemented by memory set to WB (write-back
Packit Service 75d76b
 * caching) mode on x86-64. So, the simple analysis is that no wakeups
Packit Service 75d76b
 * should be missed.
Packit Service 75d76b
 *
Packit Service 75d76b
 * It's a little subtler with funnel queues, since one interrupted or
Packit Service 75d76b
 * delayed enqueue operation (see the commentary in funnelQueuePut)
Packit Service 75d76b
 * can cause another, concurrent enqueue operation to complete without
Packit Service 75d76b
 * actually making the entry visible to the consumer. In essence, one
Packit Service 75d76b
 * update makes no new work items visible to the consumer, and the
Packit Service 75d76b
 * other (when it eventually completes) makes two (or more) work items
Packit Service 75d76b
 * visible, and each one ensures that the consumer will process what
Packit Service 75d76b
 * it has made visible.
Packit Service 75d76b
 */
Packit Service 75d76b
Packit Service 75d76b
typedef enum batchProcessorState {
Packit Service 75d76b
  BATCH_PROCESSOR_IDLE,
Packit Service 75d76b
  BATCH_PROCESSOR_ENQUEUED,
Packit Service 75d76b
} BatchProcessorState;
Packit Service 75d76b
Packit Service 75d76b
struct batchProcessor {
Packit Service 75d76b
  spinlock_t              consumerLock;
Packit Service 75d76b
  FunnelQueue            *queue;
Packit Service 75d76b
  KvdoWorkItem            workItem;
Packit Service 75d76b
  atomic_t                state;
Packit Service 75d76b
  BatchProcessorCallback  callback;
Packit Service 75d76b
  void                   *closure;
Packit Service 75d76b
  KernelLayer            *layer;
Packit Service 75d76b
};
Packit Service 75d76b
Packit Service 75d76b
static void scheduleBatchProcessing(BatchProcessor *batch);
Packit Service 75d76b
Packit Service 75d76b
/**
Packit Service 75d76b
 * Apply the batch processing function to the accumulated set of
Packit Service 75d76b
 * objects.
Packit Service 75d76b
 *
Packit Service 75d76b
 * Runs in a "CPU queue".
Packit Service 75d76b
 *
Packit Service 75d76b
 * @param [in]  item  The work item embedded in the BatchProcessor
Packit Service 75d76b
 **/
Packit Service 75d76b
static void batchProcessorWork(KvdoWorkItem *item)
Packit Service 75d76b
{
Packit Service 75d76b
  BatchProcessor *batch = container_of(item, BatchProcessor, workItem);
Packit Service 75d76b
  spin_lock(&batch->consumerLock);
Packit Service 75d76b
  while (!isFunnelQueueEmpty(batch->queue)) {
Packit Service 75d76b
    batch->callback(batch, batch->closure);
Packit Service 75d76b
  }
Packit Service 75d76b
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
Packit Service 75d76b
  memoryFence();
Packit Service 75d76b
  bool needReschedule = !isFunnelQueueEmpty(batch->queue);
Packit Service 75d76b
  spin_unlock(&batch->consumerLock);
Packit Service 75d76b
  if (needReschedule) {
Packit Service 75d76b
    scheduleBatchProcessing(batch);
Packit Service 75d76b
  }
Packit Service 75d76b
}
Packit Service 75d76b
Packit Service 75d76b
/**
Packit Service 75d76b
 * Ensure that the batch-processing function is scheduled to run.
Packit Service 75d76b
 *
Packit Service 75d76b
 * If we're the thread that switches the BatchProcessor state from
Packit Service 75d76b
 * idle to enqueued, we're the thread responsible for actually
Packit Service 75d76b
 * enqueueing it. If some other thread got there first, or it was
Packit Service 75d76b
 * already enqueued, it's not our problem.
Packit Service 75d76b
 *
Packit Service 75d76b
 * @param [in]  batch  The BatchProcessor control data
Packit Service 75d76b
 **/
Packit Service 75d76b
static void scheduleBatchProcessing(BatchProcessor *batch)
Packit Service 75d76b
{
Packit Service 75d76b
  /*
Packit Service 75d76b
   * We want this to be very fast in the common cases.
Packit Service 75d76b
   *
Packit Service 75d76b
   * In testing on our "mgh" class machines (HP ProLiant DL380p Gen8,
Packit Service 75d76b
   * Intel Xeon E5-2690, 2.9GHz), it appears that under some
Packit Service 75d76b
   * conditions it's a little faster to use a memory fence and then
Packit Service 75d76b
   * read the "state" field, skipping the cmpxchg if the state is
Packit Service 75d76b
   * already set to BATCH_PROCESSOR_ENQUEUED. (Sometimes slightly
Packit Service 75d76b
   * faster still if we prefetch the state field first.) Note that the
Packit Service 75d76b
   * read requires the fence, otherwise it could be executed before
Packit Service 75d76b
   * the preceding store by the FunnelQueue code to the "next"
Packit Service 75d76b
   * pointer, which can, very rarely, result in failing to issue a
Packit Service 75d76b
   * wakeup when needed.
Packit Service 75d76b
   *
Packit Service 75d76b
   * However, the gain is small, and in testing on our older "harvard"
Packit Service 75d76b
   * class machines (Intel Xeon X5680, 3.33GHz) it was a clear win to
Packit Service 75d76b
   * skip all of that and go right for the cmpxchg.
Packit Service 75d76b
   *
Packit Service 75d76b
   * Of course, the tradeoffs may be sensitive to the particular work
Packit Service 75d76b
   * going on, cache pressure, etc.
Packit Service 75d76b
   */
Packit Service 75d76b
  smp_mb();
Packit Service 75d76b
  BatchProcessorState oldState
Packit Service 75d76b
    = atomic_cmpxchg(&batch->state, BATCH_PROCESSOR_IDLE,
Packit Service 75d76b
                     BATCH_PROCESSOR_ENQUEUED);
Packit Service 75d76b
  bool doSchedule = (oldState == BATCH_PROCESSOR_IDLE);
Packit Service 75d76b
  if (doSchedule) {
Packit Service 75d76b
    enqueueCPUWorkQueue(batch->layer, &batch->workItem);
Packit Service 75d76b
  }
Packit Service 75d76b
}
Packit Service 75d76b
Packit Service 75d76b
/**********************************************************************/
Packit Service 75d76b
int makeBatchProcessor(KernelLayer             *layer,
Packit Service 75d76b
                       BatchProcessorCallback   callback,
Packit Service 75d76b
                       void                    *closure,
Packit Service 75d76b
                       BatchProcessor         **batchPtr)
Packit Service 75d76b
{
Packit Service 75d76b
  BatchProcessor *batch;
Packit Service 75d76b
Packit Service 75d76b
  int result = ALLOCATE(1, BatchProcessor, "batchProcessor", &batch);
Packit Service 75d76b
  if (result != UDS_SUCCESS) {
Packit Service 75d76b
    return result;
Packit Service 75d76b
  }
Packit Service 75d76b
  result = makeFunnelQueue(&batch->queue);
Packit Service 75d76b
  if (result != UDS_SUCCESS) {
Packit Service 75d76b
    FREE(batch);
Packit Service 75d76b
    return result;
Packit Service 75d76b
  }
Packit Service 75d76b
Packit Service 75d76b
  spin_lock_init(&batch->consumerLock);
Packit Service 75d76b
  setupWorkItem(&batch->workItem, batchProcessorWork,
Packit Service 75d76b
                (KvdoWorkFunction) callback, CPU_Q_ACTION_COMPLETE_KVIO);
Packit Service 75d76b
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
Packit Service 75d76b
  batch->callback = callback;
Packit Service 75d76b
  batch->closure  = closure;
Packit Service 75d76b
  batch->layer    = layer;
Packit Service 75d76b
Packit Service 75d76b
  *batchPtr = batch;
Packit Service 75d76b
  return UDS_SUCCESS;
Packit Service 75d76b
}
Packit Service 75d76b
Packit Service 75d76b
/**********************************************************************/
Packit Service 75d76b
void addToBatchProcessor(BatchProcessor *batch, KvdoWorkItem *item)
Packit Service 75d76b
{
Packit Service 75d76b
  funnelQueuePut(batch->queue, &item->workQueueEntryLink);
Packit Service 75d76b
  scheduleBatchProcessing(batch);
Packit Service 75d76b
}
Packit Service 75d76b
Packit Service 75d76b
/**********************************************************************/
Packit Service 75d76b
KvdoWorkItem *nextBatchItem(BatchProcessor *batch)
Packit Service 75d76b
{
Packit Service 75d76b
  FunnelQueueEntry *fqEntry = funnelQueuePoll(batch->queue);
Packit Service 75d76b
  if (fqEntry == NULL) {
Packit Service 75d76b
    return NULL;
Packit Service 75d76b
  }
Packit Service 75d76b
Packit Service 75d76b
  return container_of(fqEntry, KvdoWorkItem, workQueueEntryLink);
Packit Service 75d76b
}
Packit Service 75d76b
Packit Service 75d76b
/**********************************************************************/
Packit Service 75d76b
void condReschedBatchProcessor(BatchProcessor *batch)
Packit Service 75d76b
{
Packit Service 75d76b
  cond_resched_lock(&batch->consumerLock);
Packit Service 75d76b
}
Packit Service 75d76b
Packit Service 75d76b
/**********************************************************************/
Packit Service 75d76b
void freeBatchProcessor(BatchProcessor **batchPtr)
Packit Service 75d76b
{
Packit Service 75d76b
  BatchProcessor *batch = *batchPtr;
Packit Service 75d76b
  if (batch) {
Packit Service 75d76b
    memoryFence();
Packit Service 75d76b
    BUG_ON(atomic_read(&batch->state) == BATCH_PROCESSOR_ENQUEUED);
Packit Service 75d76b
    freeFunnelQueue(batch->queue);
Packit Service 75d76b
    FREE(batch);
Packit Service 75d76b
    *batchPtr = NULL;
Packit Service 75d76b
  }
Packit Service 75d76b
}