Blame source/vdo/kernel/batchProcessor.c

Packit Service d40955
/*
Packit Service d40955
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service d40955
 *
Packit Service d40955
 * This program is free software; you can redistribute it and/or
Packit Service d40955
 * modify it under the terms of the GNU General Public License
Packit Service d40955
 * as published by the Free Software Foundation; either version 2
Packit Service d40955
 * of the License, or (at your option) any later version.
Packit Service d40955
 * 
Packit Service d40955
 * This program is distributed in the hope that it will be useful,
Packit Service d40955
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service d40955
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service d40955
 * GNU General Public License for more details.
Packit Service d40955
 * 
Packit Service d40955
 * You should have received a copy of the GNU General Public License
Packit Service d40955
 * along with this program; if not, write to the Free Software
Packit Service d40955
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service d40955
 * 02110-1301, USA. 
Packit Service d40955
 *
Packit Service d40955
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/batchProcessor.c#2 $
Packit Service d40955
 */
Packit Service d40955
Packit Service d40955
#include "batchProcessor.h"
Packit Service d40955
Packit Service d40955
#include "memoryAlloc.h"
Packit Service d40955
Packit Service d40955
#include "constants.h"
Packit Service d40955
Packit Service d40955
#include "kernelLayer.h"
Packit Service d40955
Packit Service d40955
/*
 * On memory ordering:
 *
 * The producer thread does: enqueue item on queue (xchg, which is
 * implicitly interlocked, then a store), memory barrier, then atomic
 * cmpxchg of the state field. The x86 architecture spec says the
 * xchg, store, lock-cmpxchg sequence cannot be reordered, but on
 * architectures using load-linked and store-conditional for the
 * cmpxchg, like AArch64, the LL can be reordered with the store, so
 * we add a barrier.
 *
 * The consumer thread, when it is running out of work, does: read
 * queue (find empty), set state, mfence, read queue again just to be
 * sure. The set-state and read-queue cannot be reordered with respect
 * to the mfence (but without the mfence, the read could be moved
 * before the set).
 *
 * The xchg and mfence impose a total order across processors, and
 * each processor sees the stores done by the other processor in the
 * required order. If the xchg happens before the mfence, the
 * consumer's "read queue again" operation will see the update. If the
 * mfence happens first, the producer's "cmpxchg state" will see its
 * updated value.
 *
 * These are the semantics implemented by memory set to WB (write-back
 * caching) mode on x86-64. So, the simple analysis is that no wakeups
 * should be missed.
 *
 * It's a little subtler with funnel queues, since one interrupted or
 * delayed enqueue operation (see the commentary in funnelQueuePut)
 * can cause another, concurrent enqueue operation to complete without
 * actually making the entry visible to the consumer. In essence, one
 * update makes no new work items visible to the consumer, and the
 * other (when it eventually completes) makes two (or more) work items
 * visible, and each one ensures that the consumer will process what
 * it has made visible.
 */
Packit Service d40955
Packit Service d40955
/**
 * Scheduling state of the work item embedded in a BatchProcessor.
 **/
typedef enum batchProcessorState {
  /** The work item is not queued; the next producer must enqueue it. */
  BATCH_PROCESSOR_IDLE,
  /** A producer has claimed the job of queueing the work item. */
  BATCH_PROCESSOR_ENQUEUED,
} BatchProcessorState;
Packit Service d40955
Packit Service d40955
struct batchProcessor {
Packit Service d40955
  spinlock_t              consumerLock;
Packit Service d40955
  FunnelQueue            *queue;
Packit Service d40955
  KvdoWorkItem            workItem;
Packit Service d40955
  atomic_t                state;
Packit Service d40955
  BatchProcessorCallback  callback;
Packit Service d40955
  void                   *closure;
Packit Service d40955
  KernelLayer            *layer;
Packit Service d40955
};
Packit Service d40955
Packit Service d40955
/* Declared early: batchProcessorWork(), defined first, calls this. */
static void scheduleBatchProcessing(BatchProcessor *batch);
Packit Service d40955
Packit Service d40955
/**
Packit Service d40955
 * Apply the batch processing function to the accumulated set of
Packit Service d40955
 * objects.
Packit Service d40955
 *
Packit Service d40955
 * Runs in a "CPU queue".
Packit Service d40955
 *
Packit Service d40955
 * @param [in]  item  The work item embedded in the BatchProcessor
Packit Service d40955
 **/
Packit Service d40955
static void batchProcessorWork(KvdoWorkItem *item)
Packit Service d40955
{
Packit Service d40955
  BatchProcessor *batch = container_of(item, BatchProcessor, workItem);
Packit Service d40955
  spin_lock(&batch->consumerLock);
Packit Service d40955
  while (!isFunnelQueueEmpty(batch->queue)) {
Packit Service d40955
    batch->callback(batch, batch->closure);
Packit Service d40955
  }
Packit Service d40955
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
Packit Service d40955
  memoryFence();
Packit Service d40955
  bool needReschedule = !isFunnelQueueEmpty(batch->queue);
Packit Service d40955
  spin_unlock(&batch->consumerLock);
Packit Service d40955
  if (needReschedule) {
Packit Service d40955
    scheduleBatchProcessing(batch);
Packit Service d40955
  }
Packit Service d40955
}
Packit Service d40955
Packit Service d40955
/**
Packit Service d40955
 * Ensure that the batch-processing function is scheduled to run.
Packit Service d40955
 *
Packit Service d40955
 * If we're the thread that switches the BatchProcessor state from
Packit Service d40955
 * idle to enqueued, we're the thread responsible for actually
Packit Service d40955
 * enqueueing it. If some other thread got there first, or it was
Packit Service d40955
 * already enqueued, it's not our problem.
Packit Service d40955
 *
Packit Service d40955
 * @param [in]  batch  The BatchProcessor control data
Packit Service d40955
 **/
Packit Service d40955
static void scheduleBatchProcessing(BatchProcessor *batch)
Packit Service d40955
{
Packit Service d40955
  /*
Packit Service d40955
   * We want this to be very fast in the common cases.
Packit Service d40955
   *
Packit Service d40955
   * In testing on our "mgh" class machines (HP ProLiant DL380p Gen8,
Packit Service d40955
   * Intel Xeon E5-2690, 2.9GHz), it appears that under some
Packit Service d40955
   * conditions it's a little faster to use a memory fence and then
Packit Service d40955
   * read the "state" field, skipping the cmpxchg if the state is
Packit Service d40955
   * already set to BATCH_PROCESSOR_ENQUEUED. (Sometimes slightly
Packit Service d40955
   * faster still if we prefetch the state field first.) Note that the
Packit Service d40955
   * read requires the fence, otherwise it could be executed before
Packit Service d40955
   * the preceding store by the FunnelQueue code to the "next"
Packit Service d40955
   * pointer, which can, very rarely, result in failing to issue a
Packit Service d40955
   * wakeup when needed.
Packit Service d40955
   *
Packit Service d40955
   * However, the gain is small, and in testing on our older "harvard"
Packit Service d40955
   * class machines (Intel Xeon X5680, 3.33GHz) it was a clear win to
Packit Service d40955
   * skip all of that and go right for the cmpxchg.
Packit Service d40955
   *
Packit Service d40955
   * Of course, the tradeoffs may be sensitive to the particular work
Packit Service d40955
   * going on, cache pressure, etc.
Packit Service d40955
   */
Packit Service d40955
  smp_mb();
Packit Service d40955
  BatchProcessorState oldState
Packit Service d40955
    = atomic_cmpxchg(&batch->state, BATCH_PROCESSOR_IDLE,
Packit Service d40955
                     BATCH_PROCESSOR_ENQUEUED);
Packit Service d40955
  bool doSchedule = (oldState == BATCH_PROCESSOR_IDLE);
Packit Service d40955
  if (doSchedule) {
Packit Service d40955
    enqueueCPUWorkQueue(batch->layer, &batch->workItem);
Packit Service d40955
  }
Packit Service d40955
}
Packit Service d40955
Packit Service d40955
/**********************************************************************/
Packit Service d40955
int makeBatchProcessor(KernelLayer             *layer,
Packit Service d40955
                       BatchProcessorCallback   callback,
Packit Service d40955
                       void                    *closure,
Packit Service d40955
                       BatchProcessor         **batchPtr)
Packit Service d40955
{
Packit Service d40955
  BatchProcessor *batch;
Packit Service d40955
Packit Service d40955
  int result = ALLOCATE(1, BatchProcessor, "batchProcessor", &batch);
Packit Service d40955
  if (result != UDS_SUCCESS) {
Packit Service d40955
    return result;
Packit Service d40955
  }
Packit Service d40955
  result = makeFunnelQueue(&batch->queue);
Packit Service d40955
  if (result != UDS_SUCCESS) {
Packit Service d40955
    FREE(batch);
Packit Service d40955
    return result;
Packit Service d40955
  }
Packit Service d40955
Packit Service d40955
  spin_lock_init(&batch->consumerLock);
Packit Service d40955
  setupWorkItem(&batch->workItem, batchProcessorWork,
Packit Service d40955
                (KvdoWorkFunction) callback, CPU_Q_ACTION_COMPLETE_KVIO);
Packit Service d40955
  atomic_set(&batch->state, BATCH_PROCESSOR_IDLE);
Packit Service d40955
  batch->callback = callback;
Packit Service d40955
  batch->closure  = closure;
Packit Service d40955
  batch->layer    = layer;
Packit Service d40955
Packit Service d40955
  *batchPtr = batch;
Packit Service d40955
  return UDS_SUCCESS;
Packit Service d40955
}
Packit Service d40955
Packit Service d40955
/**********************************************************************/
Packit Service d40955
void addToBatchProcessor(BatchProcessor *batch, KvdoWorkItem *item)
Packit Service d40955
{
Packit Service d40955
  funnelQueuePut(batch->queue, &item->workQueueEntryLink);
Packit Service d40955
  scheduleBatchProcessing(batch);
Packit Service d40955
}
Packit Service d40955
Packit Service d40955
/**********************************************************************/
Packit Service d40955
KvdoWorkItem *nextBatchItem(BatchProcessor *batch)
Packit Service d40955
{
Packit Service d40955
  FunnelQueueEntry *fqEntry = funnelQueuePoll(batch->queue);
Packit Service d40955
  if (fqEntry == NULL) {
Packit Service d40955
    return NULL;
Packit Service d40955
  }
Packit Service d40955
Packit Service d40955
  return container_of(fqEntry, KvdoWorkItem, workQueueEntryLink);
Packit Service d40955
}
Packit Service d40955
Packit Service d40955
/**********************************************************************/
Packit Service d40955
void condReschedBatchProcessor(BatchProcessor *batch)
Packit Service d40955
{
Packit Service d40955
  cond_resched_lock(&batch->consumerLock);
Packit Service d40955
}
Packit Service d40955
Packit Service d40955
/**********************************************************************/
Packit Service d40955
void freeBatchProcessor(BatchProcessor **batchPtr)
Packit Service d40955
{
Packit Service d40955
  BatchProcessor *batch = *batchPtr;
Packit Service d40955
  if (batch) {
Packit Service d40955
    memoryFence();
Packit Service d40955
    BUG_ON(atomic_read(&batch->state) == BATCH_PROCESSOR_ENQUEUED);
Packit Service d40955
    freeFunnelQueue(batch->queue);
Packit Service d40955
    FREE(batch);
Packit Service d40955
    *batchPtr = NULL;
Packit Service d40955
  }
Packit Service d40955
}