Blame vdo/base/slabScrubber.c

Packit Service 693862
/*
Packit Service 693862
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 693862
 *
Packit Service 693862
 * This program is free software; you can redistribute it and/or
Packit Service 693862
 * modify it under the terms of the GNU General Public License
Packit Service 693862
 * as published by the Free Software Foundation; either version 2
Packit Service 693862
 * of the License, or (at your option) any later version.
Packit Service 693862
 * 
Packit Service 693862
 * This program is distributed in the hope that it will be useful,
Packit Service 693862
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 693862
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 693862
 * GNU General Public License for more details.
Packit Service 693862
 * 
Packit Service 693862
 * You should have received a copy of the GNU General Public License
Packit Service 693862
 * along with this program; if not, write to the Free Software
Packit Service 693862
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 693862
 * 02110-1301, USA. 
Packit Service 693862
 *
Packit Service 693862
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/slabScrubber.c#6 $
Packit Service 693862
 */
Packit Service 693862
Packit Service 693862
#include "slabScrubberInternals.h"
Packit Service 693862
Packit Service 693862
#include "logger.h"
Packit Service 693862
#include "memoryAlloc.h"
Packit Service 693862
Packit Service 693862
#include "adminState.h"
Packit Service 693862
#include "blockAllocator.h"
Packit Service 693862
#include "constants.h"
Packit Service 693862
#include "readOnlyNotifier.h"
Packit Service 693862
#include "recoveryJournal.h"
Packit Service 693862
#include "refCounts.h"
Packit Service 693862
#include "refCountsInternals.h"
Packit Service 693862
#include "slab.h"
Packit Service 693862
#include "slabJournalInternals.h"
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Allocate the buffer and extent used for reading the slab journal when
Packit Service 693862
 * scrubbing a slab.
Packit Service 693862
 *
Packit Service 693862
 * @param scrubber         The slab scrubber for which to allocate
Packit Service 693862
 * @param layer            The physical layer on which the scrubber resides
Packit Service 693862
 * @param slabJournalSize  The size of a slab journal
Packit Service 693862
 *
Packit Service 693862
 * @return VDO_SUCCESS or an error
Packit Service 693862
 **/
Packit Service 693862
__attribute__((warn_unused_result))
Packit Service 693862
static int allocateExtentAndBuffer(SlabScrubber  *scrubber,
Packit Service 693862
                                   PhysicalLayer *layer,
Packit Service 693862
                                   BlockCount     slabJournalSize)
Packit Service 693862
{
Packit Service 693862
  size_t bufferSize = VDO_BLOCK_SIZE * slabJournalSize;
Packit Service 693862
  int result = ALLOCATE(bufferSize, char, __func__, &scrubber->journalData);
Packit Service 693862
  if (result != VDO_SUCCESS) {
Packit Service 693862
    return result;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  return createExtent(layer, VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
Packit Service 693862
                      slabJournalSize, scrubber->journalData,
Packit Service 693862
                      &scrubber->extent);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
int makeSlabScrubber(PhysicalLayer     *layer,
Packit Service 693862
                     BlockCount         slabJournalSize,
Packit Service 693862
                     ReadOnlyNotifier  *readOnlyNotifier,
Packit Service 693862
                     SlabScrubber     **scrubberPtr)
Packit Service 693862
{
Packit Service 693862
  SlabScrubber *scrubber;
Packit Service 693862
  int result = ALLOCATE(1, SlabScrubber, __func__, &scrubber);
Packit Service 693862
  if (result != VDO_SUCCESS) {
Packit Service 693862
    return result;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  result = allocateExtentAndBuffer(scrubber, layer, slabJournalSize);
Packit Service 693862
  if (result != VDO_SUCCESS) {
Packit Service 693862
    freeSlabScrubber(&scrubber);
Packit Service 693862
    return result;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  initializeCompletion(&scrubber->completion, SLAB_SCRUBBER_COMPLETION, layer);
Packit Service 693862
  initializeRing(&scrubber->highPrioritySlabs);
Packit Service 693862
  initializeRing(&scrubber->slabs);
Packit Service 693862
  scrubber->readOnlyNotifier = readOnlyNotifier;
Packit Service 693862
  scrubber->adminState.state = ADMIN_STATE_SUSPENDED;
Packit Service 693862
  *scrubberPtr               = scrubber;
Packit Service 693862
  return VDO_SUCCESS;
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Free the extent and buffer used for reading slab journals.
Packit Service 693862
 *
Packit Service 693862
 * @param scrubber  The scrubber
Packit Service 693862
 **/
Packit Service 693862
static void freeExtentAndBuffer(SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  freeExtent(&scrubber->extent);
Packit Service 693862
  if (scrubber->journalData != NULL) {
Packit Service 693862
    FREE(scrubber->journalData);
Packit Service 693862
    scrubber->journalData = NULL;
Packit Service 693862
  }
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void freeSlabScrubber(SlabScrubber **scrubberPtr)
Packit Service 693862
{
Packit Service 693862
  if (*scrubberPtr == NULL) {
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  SlabScrubber *scrubber = *scrubberPtr;
Packit Service 693862
  freeExtentAndBuffer(scrubber);
Packit Service 693862
  FREE(scrubber);
Packit Service 693862
  *scrubberPtr = NULL;
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Get the next slab to scrub.
Packit Service 693862
 *
Packit Service 693862
 * @param scrubber  The slab scrubber
Packit Service 693862
 *
Packit Service 693862
 * @return The next slab to scrub or NULL if there are none
Packit Service 693862
 **/
Packit Service 693862
static Slab *getNextSlab(SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  if (!isRingEmpty(&scrubber->highPrioritySlabs)) {
Packit Service 693862
    return slabFromRingNode(scrubber->highPrioritySlabs.next);
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  if (!isRingEmpty(&scrubber->slabs)) {
Packit Service 693862
    return slabFromRingNode(scrubber->slabs.next);
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  return NULL;
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
bool hasSlabsToScrub(SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  return (getNextSlab(scrubber) != NULL);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
SlabCount getScrubberSlabCount(const SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  return relaxedLoad64(&scrubber->slabCount);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void registerSlabForScrubbing(SlabScrubber *scrubber,
Packit Service 693862
                              Slab         *slab,
Packit Service 693862
                              bool          highPriority)
Packit Service 693862
{
Packit Service 693862
  ASSERT_LOG_ONLY((slab->status != SLAB_REBUILT),
Packit Service 693862
                  "slab to be scrubbed is unrecovered");
Packit Service 693862
Packit Service 693862
  if (slab->status != SLAB_REQUIRES_SCRUBBING) {
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  unspliceRingNode(&slab->ringNode);
Packit Service 693862
  if (!slab->wasQueuedForScrubbing) {
Packit Service 693862
    relaxedAdd64(&scrubber->slabCount, 1);
Packit Service 693862
    slab->wasQueuedForScrubbing = true;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  if (highPriority) {
Packit Service 693862
    slab->status = SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
Packit Service 693862
    pushRingNode(&scrubber->highPrioritySlabs, &slab->ringNode);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  pushRingNode(&scrubber->slabs, &slab->ringNode);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Stop scrubbing, either because there are no more slabs to scrub or because
Packit Service 693862
 * there's been an error.
Packit Service 693862
 *
Packit Service 693862
 * @param scrubber  The scrubber
Packit Service 693862
 **/
Packit Service 693862
static void finishScrubbing(SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  if (!hasSlabsToScrub(scrubber)) {
Packit Service 693862
    freeExtentAndBuffer(scrubber);
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  // Inform whoever is waiting that scrubbing has completed.
Packit Service 693862
  completeCompletion(&scrubber->completion);
Packit Service 693862
Packit Service 693862
  bool notify = hasWaiters(&scrubber->waiters);
Packit Service 693862
Packit Service 693862
  // Note that the scrubber has stopped, and inform anyone who might be waiting
Packit Service 693862
  // for that to happen.
Packit Service 693862
  if (!finishDraining(&scrubber->adminState)) {
Packit Service 693862
    scrubber->adminState.state = ADMIN_STATE_SUSPENDED;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  /*
Packit Service 693862
   * We can't notify waiters until after we've finished draining or they'll
Packit Service 693862
   * just requeue. Fortunately if there were waiters, we can't have been freed
Packit Service 693862
   * yet.
Packit Service 693862
   */
Packit Service 693862
  if (notify) {
Packit Service 693862
    notifyAllWaiters(&scrubber->waiters, NULL, NULL);
Packit Service 693862
  }
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
static void scrubNextSlab(SlabScrubber *scrubber);
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Notify the scrubber that a slab has been scrubbed. This callback is
Packit Service 693862
 * registered in applyJournalEntries().
Packit Service 693862
 *
Packit Service 693862
 * @param completion  The slab rebuild completion
Packit Service 693862
 **/
Packit Service 693862
static void slabScrubbed(VDOCompletion *completion)
Packit Service 693862
{
Packit Service 693862
  SlabScrubber *scrubber = completion->parent;
Packit Service 693862
  finishScrubbingSlab(scrubber->slab);
Packit Service 693862
  relaxedAdd64(&scrubber->slabCount, -1);
Packit Service 693862
  scrubNextSlab(scrubber);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Abort scrubbing due to an error.
Packit Service 693862
 *
Packit Service 693862
 * @param scrubber  The slab scrubber
Packit Service 693862
 * @param result    The error
Packit Service 693862
 **/
Packit Service 693862
static void abortScrubbing(SlabScrubber *scrubber, int result)
Packit Service 693862
{
Packit Service 693862
  enterReadOnlyMode(scrubber->readOnlyNotifier, result);
Packit Service 693862
  setCompletionResult(&scrubber->completion, result);
Packit Service 693862
  scrubNextSlab(scrubber);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Handle errors while rebuilding a slab.
Packit Service 693862
 *
Packit Service 693862
 * @param completion  The slab rebuild completion
Packit Service 693862
 **/
Packit Service 693862
static void handleScrubberError(VDOCompletion *completion)
Packit Service 693862
{
Packit Service 693862
  abortScrubbing(completion->parent, completion->result);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Apply all the entries in a block to the reference counts.
Packit Service 693862
 *
Packit Service 693862
 * @param block        A block with entries to apply
Packit Service 693862
 * @param entryCount   The number of entries to apply
Packit Service 693862
 * @param blockNumber  The sequence number of the block
Packit Service 693862
 * @param slab         The slab to apply the entries to
Packit Service 693862
 *
Packit Service 693862
 * @return VDO_SUCCESS or an error code
Packit Service 693862
 **/
Packit Service 693862
static int applyBlockEntries(PackedSlabJournalBlock *block,
Packit Service 693862
                             JournalEntryCount       entryCount,
Packit Service 693862
                             SequenceNumber          blockNumber,
Packit Service 693862
                             Slab                   *slab)
Packit Service 693862
{
Packit Service 693862
  JournalPoint entryPoint = {
Packit Service 693862
    .sequenceNumber = blockNumber,
Packit Service 693862
    .entryCount     = 0,
Packit Service 693862
  };
Packit Service 693862
Packit Service 693862
  SlabBlockNumber maxSBN = slab->end - slab->start;
Packit Service 693862
  while (entryPoint.entryCount < entryCount) {
Packit Service 693862
    SlabJournalEntry entry = decodeSlabJournalEntry(block,
Packit Service 693862
                                                    entryPoint.entryCount);
Packit Service 693862
    if (entry.sbn > maxSBN) {
Packit Service 693862
      // This entry is out of bounds.
Packit Service 693862
      return logErrorWithStringError(VDO_CORRUPT_JOURNAL, "Slab journal entry"
Packit Service 693862
                                     " (%llu, %u) had invalid offset"
Packit Service 693862
                                     " %u in slab (size %u blocks)",
Packit Service 693862
                                     blockNumber, entryPoint.entryCount,
Packit Service 693862
                                     entry.sbn, maxSBN);
Packit Service 693862
    }
Packit Service 693862
Packit Service 693862
    int result = replayReferenceCountChange(slab->referenceCounts, &entryPoint,
Packit Service 693862
                                            entry);
Packit Service 693862
    if (result != VDO_SUCCESS) {
Packit Service 693862
      logErrorWithStringError(result, "Slab journal entry (%llu, %u)"
Packit Service 693862
                              " (%s of offset %" PRIu32 ") could not be"
Packit Service 693862
                              " applied in slab %u",
Packit Service 693862
                              blockNumber, entryPoint.entryCount,
Packit Service 693862
                              getJournalOperationName(entry.operation),
Packit Service 693862
                              entry.sbn, slab->slabNumber);
Packit Service 693862
      return result;
Packit Service 693862
    }
Packit Service 693862
    entryPoint.entryCount++;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  return VDO_SUCCESS;
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Find the relevant extent of the slab journal and apply all valid entries.
Packit Service 693862
 * This is a callback registered in startScrubbing().
Packit Service 693862
 *
Packit Service 693862
 * @param completion  The metadata read extent completion
Packit Service 693862
 **/
Packit Service 693862
static void applyJournalEntries(VDOCompletion *completion)
Packit Service 693862
{
Packit Service 693862
  SlabScrubber *scrubber        = completion->parent;
Packit Service 693862
  Slab         *slab            = scrubber->slab;
Packit Service 693862
  SlabJournal  *journal         = slab->journal;
Packit Service 693862
  RefCounts    *referenceCounts = slab->referenceCounts;
Packit Service 693862
Packit Service 693862
  // Find the boundaries of the useful part of the journal.
Packit Service 693862
  SequenceNumber  tail     = journal->tail;
Packit Service 693862
  TailBlockOffset endIndex = getSlabJournalBlockOffset(journal, tail - 1);
Packit Service 693862
  char *endData = scrubber->journalData + (endIndex * VDO_BLOCK_SIZE);
Packit Service 693862
  PackedSlabJournalBlock *endBlock = (PackedSlabJournalBlock *) endData;
Packit Service 693862
Packit Service 693862
  SequenceNumber  head      = getUInt64LE(endBlock->header.fields.head);
Packit Service 693862
  TailBlockOffset headIndex = getSlabJournalBlockOffset(journal, head);
Packit Service 693862
  BlockCount      index     = headIndex;
Packit Service 693862
Packit Service 693862
  JournalPoint refCountsPoint   = referenceCounts->slabJournalPoint;
Packit Service 693862
  JournalPoint lastEntryApplied = refCountsPoint;
Packit Service 693862
  for (SequenceNumber sequence = head; sequence < tail; sequence++) {
Packit Service 693862
    char *blockData = scrubber->journalData + (index * VDO_BLOCK_SIZE);
Packit Service 693862
    PackedSlabJournalBlock *block  = (PackedSlabJournalBlock *) blockData;
Packit Service 693862
    SlabJournalBlockHeader header;
Packit Service 693862
    unpackSlabJournalBlockHeader(&block->header, &header);
Packit Service 693862
Packit Service 693862
    if ((header.nonce != slab->allocator->nonce)
Packit Service 693862
        || (header.metadataType != VDO_METADATA_SLAB_JOURNAL)
Packit Service 693862
        || (header.sequenceNumber != sequence)
Packit Service 693862
        || (header.entryCount > journal->entriesPerBlock)
Packit Service 693862
        || (header.hasBlockMapIncrements
Packit Service 693862
            && (header.entryCount > journal->fullEntriesPerBlock))) {
Packit Service 693862
      // The block is not what we expect it to be.
Packit Service 693862
      logError("Slab journal block for slab %u was invalid",
Packit Service 693862
               slab->slabNumber);
Packit Service 693862
      abortScrubbing(scrubber, VDO_CORRUPT_JOURNAL);
Packit Service 693862
      return;
Packit Service 693862
    }
Packit Service 693862
Packit Service 693862
    int result = applyBlockEntries(block, header.entryCount, sequence, slab);
Packit Service 693862
    if (result != VDO_SUCCESS) {
Packit Service 693862
      abortScrubbing(scrubber, result);
Packit Service 693862
      return;
Packit Service 693862
    }
Packit Service 693862
Packit Service 693862
    lastEntryApplied.sequenceNumber = sequence;
Packit Service 693862
    lastEntryApplied.entryCount     = header.entryCount - 1;
Packit Service 693862
    index++;
Packit Service 693862
    if (index == journal->size) {
Packit Service 693862
      index = 0;
Packit Service 693862
    }
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  // At the end of rebuild, the refCounts should be accurate to the end
Packit Service 693862
  // of the journal we just applied.
Packit Service 693862
  int result = ASSERT(!beforeJournalPoint(&lastEntryApplied, &refCountsPoint),
Packit Service 693862
                      "Refcounts are not more accurate than the slab journal");
Packit Service 693862
  if (result != VDO_SUCCESS) {
Packit Service 693862
    abortScrubbing(scrubber, result);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  // Save out the rebuilt reference blocks.
Packit Service 693862
  prepareCompletion(completion, slabScrubbed, handleScrubberError,
Packit Service 693862
                    completion->callbackThreadID, scrubber);
Packit Service 693862
  startSlabAction(slab, ADMIN_STATE_SAVE_FOR_SCRUBBING, completion);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Read the current slab's journal from disk now that it has been flushed.
Packit Service 693862
 * This callback is registered in scrubNextSlab().
Packit Service 693862
 *
Packit Service 693862
 * @param completion  The scrubber's extent completion
Packit Service 693862
 **/
Packit Service 693862
static void startScrubbing(VDOCompletion *completion)
Packit Service 693862
{
Packit Service 693862
  SlabScrubber *scrubber = completion->parent;
Packit Service 693862
  Slab         *slab     = scrubber->slab;
Packit Service 693862
  if (getSummarizedCleanliness(slab->allocator->summary, slab->slabNumber)) {
Packit Service 693862
    slabScrubbed(completion);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  prepareCompletion(&scrubber->extent->completion, applyJournalEntries,
Packit Service 693862
                    handleScrubberError, completion->callbackThreadID,
Packit Service 693862
                    completion->parent);
Packit Service 693862
  readMetadataExtent(scrubber->extent, slab->journalOrigin);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**
Packit Service 693862
 * Scrub the next slab if there is one.
Packit Service 693862
 *
Packit Service 693862
 * @param scrubber  The scrubber
Packit Service 693862
 **/
Packit Service 693862
static void scrubNextSlab(SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  // Note: this notify call is always safe only because scrubbing can only
Packit Service 693862
  // be started when the VDO is quiescent.
Packit Service 693862
  notifyAllWaiters(&scrubber->waiters, NULL, NULL);
Packit Service 693862
  if (isReadOnly(scrubber->readOnlyNotifier)) {
Packit Service 693862
    setCompletionResult(&scrubber->completion, VDO_READ_ONLY);
Packit Service 693862
    finishScrubbing(scrubber);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  Slab *slab = getNextSlab(scrubber);
Packit Service 693862
  if ((slab == NULL)
Packit Service 693862
      || (scrubber->highPriorityOnly
Packit Service 693862
          && isRingEmpty(&scrubber->highPrioritySlabs))) {
Packit Service 693862
    scrubber->highPriorityOnly = false;
Packit Service 693862
    finishScrubbing(scrubber);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  if (finishDraining(&scrubber->adminState)) {
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  unspliceRingNode(&slab->ringNode);
Packit Service 693862
  scrubber->slab = slab;
Packit Service 693862
  VDOCompletion *completion = extentAsCompletion(scrubber->extent);
Packit Service 693862
  prepareCompletion(completion, startScrubbing,
Packit Service 693862
                    handleScrubberError, scrubber->completion.callbackThreadID,
Packit Service 693862
                    scrubber);
Packit Service 693862
  startSlabAction(slab, ADMIN_STATE_SCRUBBING, completion);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void scrubSlabs(SlabScrubber *scrubber,
Packit Service 693862
                void         *parent,
Packit Service 693862
                VDOAction    *callback,
Packit Service 693862
                VDOAction    *errorHandler)
Packit Service 693862
{
Packit Service 693862
  resumeIfQuiescent(&scrubber->adminState);
Packit Service 693862
  ThreadID threadID = getCallbackThreadID();
Packit Service 693862
  prepareCompletion(&scrubber->completion, callback, errorHandler, threadID,
Packit Service 693862
                    parent);
Packit Service 693862
  if (!hasSlabsToScrub(scrubber)) {
Packit Service 693862
    finishScrubbing(scrubber);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  scrubNextSlab(scrubber);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void scrubHighPrioritySlabs(SlabScrubber  *scrubber,
Packit Service 693862
                            bool           scrubAtLeastOne,
Packit Service 693862
                            VDOCompletion *parent,
Packit Service 693862
                            VDOAction     *callback,
Packit Service 693862
                            VDOAction     *errorHandler)
Packit Service 693862
{
Packit Service 693862
  if (scrubAtLeastOne && isRingEmpty(&scrubber->highPrioritySlabs)) {
Packit Service 693862
    Slab *slab = getNextSlab(scrubber);
Packit Service 693862
    if (slab != NULL) {
Packit Service 693862
      registerSlabForScrubbing(scrubber, slab, true);
Packit Service 693862
    }
Packit Service 693862
  }
Packit Service 693862
  scrubber->highPriorityOnly = true;
Packit Service 693862
  scrubSlabs(scrubber, parent, callback, errorHandler);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void stopScrubbing(SlabScrubber *scrubber, VDOCompletion *parent)
Packit Service 693862
{
Packit Service 693862
  if (isQuiescent(&scrubber->adminState)) {
Packit Service 693862
    completeCompletion(parent);
Packit Service 693862
  } else {
Packit Service 693862
    startDraining(&scrubber->adminState, ADMIN_STATE_SUSPENDING, parent, NULL);
Packit Service 693862
  }
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void resumeScrubbing(SlabScrubber *scrubber, VDOCompletion *parent)
Packit Service 693862
{
Packit Service 693862
  if (!hasSlabsToScrub(scrubber)) {
Packit Service 693862
    completeCompletion(parent);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  int result = resumeIfQuiescent(&scrubber->adminState);
Packit Service 693862
  if (result != VDO_SUCCESS) {
Packit Service 693862
    finishCompletion(parent, result);
Packit Service 693862
    return;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  scrubNextSlab(scrubber);
Packit Service 693862
  completeCompletion(parent);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
int enqueueCleanSlabWaiter(SlabScrubber *scrubber, Waiter *waiter)
Packit Service 693862
{
Packit Service 693862
  if (isReadOnly(scrubber->readOnlyNotifier)) {
Packit Service 693862
    return VDO_READ_ONLY;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  if (isQuiescent(&scrubber->adminState)) {
Packit Service 693862
    return VDO_NO_SPACE;
Packit Service 693862
  }
Packit Service 693862
Packit Service 693862
  return enqueueWaiter(&scrubber->waiters, waiter);
Packit Service 693862
}
Packit Service 693862
Packit Service 693862
/**********************************************************************/
Packit Service 693862
void dumpSlabScrubber(const SlabScrubber *scrubber)
Packit Service 693862
{
Packit Service 693862
  logInfo("slabScrubber slabCount %u waiters %zu %s%s",
Packit Service 693862
          getScrubberSlabCount(scrubber),
Packit Service 693862
          countWaiters(&scrubber->waiters),
Packit Service 693862
          getAdminStateName(&scrubber->adminState),
Packit Service 693862
          scrubber->highPriorityOnly ? ", highPriorityOnly " : "");
Packit Service 693862
}