/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/slabJournal.c#18 $
 */

#include "slabJournalInternals.h"

#include "logger.h"
#include "memoryAlloc.h"
#include "stringUtils.h"

#include "adminState.h"
#include "blockAllocatorInternals.h"
#include "dataVIO.h"
#include "recoveryJournal.h"
#include "refCounts.h"
#include "slabDepot.h"
#include "slabSummary.h"

/**
 * Convert a resource waiter back into the slab journal which embeds it.
 *
 * @param waiter  The resource waiter of a slab journal
 *
 * @return The slab journal containing the waiter
 **/
__attribute__((warn_unused_result))
static inline SlabJournal *slabJournalFromResourceWaiter(Waiter *waiter)
{
  // The resource waiter sits at offset 0 of the journal (checked at compile
  // time), so the waiter's address is also the journal's address.
  STATIC_ASSERT(offsetof(SlabJournal, resourceWaiter) == 0);
  SlabJournal *journal = (SlabJournal *) waiter;
  return journal;
}

/**
 * Convert a flush waiter back into the slab journal which embeds it.
 *
 * @param waiter  The flush waiter of a slab journal (may be NULL)
 *
 * @return The slab journal containing the waiter, or NULL if the waiter
 *         is NULL
 **/
__attribute__((warn_unused_result))
static inline SlabJournal *slabJournalFromFlushWaiter(Waiter *waiter)
{
  if (waiter != NULL) {
    // Step back from the embedded field to the start of the journal.
    uintptr_t fieldAddress = (uintptr_t) waiter;
    return (SlabJournal *) (fieldAddress - offsetof(SlabJournal, flushWaiter));
  }

  return NULL;
}

/**********************************************************************/
SlabJournal *slabJournalFromDirtyNode(RingNode *node)
{
  // A NULL node has no containing journal; otherwise back up from the
  // embedded dirtyNode field to the start of the journal.
  if (node != NULL) {
    uintptr_t fieldAddress = (uintptr_t) node;
    return (SlabJournal *) (fieldAddress - offsetof(SlabJournal, dirtyNode));
  }

  return NULL;
}

/**
 * Convert a slab summary waiter back into the slab journal which embeds it.
 *
 * @param waiter  The slab summary waiter of a slab journal (may be NULL)
 *
 * @return The slab journal containing the waiter, or NULL if the waiter
 *         is NULL
 **/
__attribute__((warn_unused_result))
static inline SlabJournal *slabJournalFromSlabSummaryWaiter(Waiter *waiter)
{
  if (waiter != NULL) {
    // Step back from the embedded field to the start of the journal.
    uintptr_t fieldAddress = (uintptr_t) waiter;
    return (SlabJournal *)
      (fieldAddress - offsetof(SlabJournal, slabSummaryWaiter));
  }

  return NULL;
}

/**
 * Translate a slab journal sequence number into the physical block number
 * at which that journal block is stored.
 *
 * @param journal   The journal
 * @param sequence  The sequence number of the desired block
 *
 * @return the slab's journal origin plus the block offset of the sequence
 *         number within the journal
 **/
__attribute__((warn_unused_result))
static inline PhysicalBlockNumber getBlockNumber(SlabJournal    *journal,
                                                 SequenceNumber  sequence)
{
  const TailBlockOffset blockOffset
    = getSlabJournalBlockOffset(journal, sequence);
  return (journal->slab->journalOrigin + blockOffset);
}

/**
 * Look up the lock object belonging to a slab journal block.
 *
 * @param journal         Slab journal to retrieve from
 * @param sequenceNumber  Sequence number of the block
 *
 * @return a pointer to the entry of the journal's lock array at the block
 *         offset of the given sequence number
 **/
__attribute__((warn_unused_result))
static inline JournalLock *getLock(SlabJournal    *journal,
                                   SequenceNumber  sequenceNumber)
{
  const TailBlockOffset lockIndex
    = getSlabJournalBlockOffset(journal, sequenceNumber);
  return (journal->locks + lockIndex);
}

/**
 * Ask the read-only notifier of the journal's allocator whether the VDO is
 * in read-only mode.
 *
 * @param journal  The journal whose owning VDO should be checked
 *
 * @return <code>true</code> if the VDO is in read-only mode
 **/
__attribute__((warn_unused_result))
static inline bool isVDOReadOnly(SlabJournal *journal)
{
  Slab *slab = journal->slab;
  return isReadOnly(slab->allocator->readOnlyNotifier);
}

/**
 * Check whether there are entry waiters which should delay a flush.
 *
 * @param journal  The journal to check
 *
 * @return <code>true</code> if the slab is not rebuilding and the journal
 *         has entry waiters; <code>false</code> otherwise
 **/
__attribute__((warn_unused_result))
static inline bool mustMakeEntriesToFlush(SlabJournal *journal)
{
  // Waiters on a rebuilding slab never delay a flush.
  if (slabIsRebuilding(journal->slab)) {
    return false;
  }

  return hasWaiters(&journal->entryWaiters);
}

/**
 * Check whether a reap is currently in progress, i.e. whether the
 * unreapable point has moved but the head has not yet been advanced to it.
 *
 * @param journal  The journal which may be reaping
 *
 * @return <code>true</code> if the journal's head differs from its
 *         unreapable point
 **/
__attribute__((warn_unused_result))
static inline bool isReaping(SlabJournal *journal)
{
  return !(journal->head == journal->unreapable);
}

/**********************************************************************/
bool isSlabJournalActive(SlabJournal *journal)
{
  // The journal is active if entries are waiting to be made, or if a reap
  // is in progress.
  if (mustMakeEntriesToFlush(journal) || isReaping(journal)) {
    return true;
  }

  // It is also active while a tail block commit is pending or block writes
  // are still outstanding.
  if (journal->waitingToCommit || !isRingEmpty(&journal->uncommittedBlocks)) {
    return true;
  }

  // Finally, it is active while a slab summary update is in flight.
  return journal->updatingSlabSummary;
}

/**
 * Reset the in-memory tail block header so that it describes a new, empty
 * block whose sequence number is the journal's current tail.
 *
 * @param journal  The journal whose tail block is being initialized
 **/
static void initializeTailBlock(SlabJournal *journal)
{
  journal->tailHeader.entryCount            = 0;
  journal->tailHeader.hasBlockMapIncrements = false;
  journal->tailHeader.sequenceNumber        = journal->tail;
}

/**
 * Set all journal fields appropriately to start journaling.
 *
 * @param journal  The journal to be reset, based on its current head and
 *                 tail sequence numbers
 **/
static void initializeJournalState(SlabJournal *journal)
{
  // Nothing has been reaped beyond the head yet.
  journal->unreapable = journal->head;
  journal->reapLock   = getLock(journal, journal->unreapable);

  // Nothing is waiting to be committed or summarized before the tail.
  journal->nextCommit     = journal->tail;
  journal->lastSummarized = journal->tail;
  journal->summarized     = journal->lastSummarized;

  initializeTailBlock(journal);
}

/**
 * Check whether the tail block of a journal is full. The capacity which
 * applies depends on whether the block contains block map increments.
 *
 * @param journal The slab journal for the block
 *
 * @return <code>true</code> if the tail block's entry count equals the
 *         applicable capacity
 **/
__attribute__((warn_unused_result))
static bool blockIsFull(SlabJournal *journal)
{
  const JournalEntryCount entries = journal->tailHeader.entryCount;
  if (journal->tailHeader.hasBlockMapIncrements) {
    return (journal->fullEntriesPerBlock == entries);
  }

  return (journal->entriesPerBlock == entries);
}

/**********************************************************************/
static void addEntries(SlabJournal *journal);
static void updateTailBlockLocation(SlabJournal *journal);
static void releaseJournalLocks(Waiter *waiter, void *context);

/**********************************************************************/
int makeSlabJournal(BlockAllocator   *allocator,
                    Slab             *slab,
                    RecoveryJournal  *recoveryJournal,
                    SlabJournal     **journalPtr)
{
  // Allocate the journal, extended by slabJournalBlocks JournalLock entries.
  const SlabConfig *config = getSlabConfig(allocator->depot);
  SlabJournal *journal;
  int result = ALLOCATE_EXTENDED(SlabJournal, config->slabJournalBlocks,
                                 JournalLock, __func__, &journal);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // Associations with the rest of the VDO.
  journal->slab            = slab;
  journal->recoveryJournal = recoveryJournal;
  journal->summary         = getSlabSummaryZone(allocator);
  journal->events          = &allocator->slabJournalStatistics;

  // Geometry and thresholds taken from the slab configuration.
  journal->size                = config->slabJournalBlocks;
  journal->flushingThreshold   = config->slabJournalFlushingThreshold;
  journal->blockingThreshold   = config->slabJournalBlockingThreshold;
  journal->scrubbingThreshold  = config->slabJournalScrubbingThreshold;
  journal->entriesPerBlock     = SLAB_JOURNAL_ENTRIES_PER_BLOCK;
  journal->fullEntriesPerBlock = SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;

  // A new journal starts at sequence number 1.
  journal->head = 1;
  journal->tail = 1;

  // When the thresholds are more than 5 blocks apart, put the flushing
  // deadline 5 blocks short of the blocking threshold so that flushing can
  // hopefully finish before entries start blocking; otherwise the deadline
  // is the flushing threshold itself.
  journal->flushingDeadline
    = (((journal->blockingThreshold - journal->flushingThreshold) > 5)
       ? (journal->blockingThreshold - 5)
       : journal->flushingThreshold);

  journal->slabSummaryWaiter.callback = releaseJournalLocks;

  // Allocate the buffer which holds the packed tail block.
  result = ALLOCATE(VDO_BLOCK_SIZE, char, "PackedSlabJournalBlock",
                    (char **) &journal->block);
  if (result != VDO_SUCCESS) {
    freeSlabJournal(&journal);
    return result;
  }

  initializeRing(&journal->uncommittedBlocks);
  initializeRing(&journal->dirtyNode);

  journal->tailHeader.metadataType = VDO_METADATA_SLAB_JOURNAL;
  journal->tailHeader.nonce        = slab->allocator->nonce;
  initializeJournalState(journal);

  *journalPtr = journal;
  return VDO_SUCCESS;
}

/**********************************************************************/
void freeSlabJournal(SlabJournal **journalPtr)
{
  SlabJournal *journal = *journalPtr;
  if (journal != NULL) {
    // Release the tail block buffer before the journal which owns it, then
    // clear the caller's reference.
    FREE(journal->block);
    FREE(journal);
    *journalPtr = NULL;
  }
}

/**********************************************************************/
bool isSlabJournalBlank(const SlabJournal *journal)
{
  // A missing journal is not considered blank.
  if (journal == NULL) {
    return false;
  }

  // A blank journal is still on its first block ...
  if (journal->tail != 1) {
    return false;
  }

  // ... and that block has no entries.
  return (journal->tailHeader.entryCount == 0);
}

/**********************************************************************/
bool isSlabJournalDirty(const SlabJournal *journal)
{
  // A recovery lock of 0 marks a clean journal (see markSlabJournalClean()).
  return !(journal->recoveryLock == 0);
}

/**
 * Record the recovery journal lock held by a slab journal and put the
 * journal on the dirty ring of its allocator, positioned by that lock.
 *
 * @param journal  The journal to be marked dirty
 * @param lock     The recovery journal lock held by the slab journal
 **/
static void markSlabJournalDirty(SlabJournal *journal, SequenceNumber lock)
{
  ASSERT_LOG_ONLY(!isSlabJournalDirty(journal), "slab journal was clean");
  journal->recoveryLock = lock;

  // Scan backward from the end of the ring for the last journal whose lock
  // is no greater than ours, stopping at the ring head if there is none.
  RingNode *ringHead = &journal->slab->allocator->dirtySlabJournals;
  RingNode *cursor;
  for (cursor = ringHead->prev; cursor != ringHead; cursor = cursor->prev) {
    if (slabJournalFromDirtyNode(cursor)->recoveryLock
        <= journal->recoveryLock) {
      break;
    }
  }

  // Insert relative to the successor of the node where the scan stopped.
  pushRingNode(cursor->next, &journal->dirtyNode);
}

/**
 * Take a slab journal off the dirty ring and clear its recovery journal
 * lock, after which isSlabJournalDirty() reports false.
 *
 * @param journal  The journal to be marked clean
 **/
static void markSlabJournalClean(SlabJournal *journal)
{
  unspliceRingNode(&journal->dirtyNode);
  journal->recoveryLock = 0;
}

/**
 * Implements WaiterCallback. Continue a DataVIO which was waiting to make a
 * slab journal entry, giving it a VDO_READ_ONLY result. Used by
 * abortSlabJournalWaiters().
 *
 * @param waiter   The waiter of the DataVIO to abort
 * @param context  Unused
 **/
static void abortWaiter(Waiter *waiter,
                        void   *context __attribute__((unused)))
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  continueDataVIO(dataVIO, VDO_READ_ONLY);
}

/**********************************************************************/
void abortSlabJournalWaiters(SlabJournal *journal)
{
  // Complain if this is not running on the thread of the slab's allocator.
  ASSERT_LOG_ONLY((getCallbackThreadID()
                   == journal->slab->allocator->threadID),
                  "abortSlabJournalWaiters() called on correct thread");

  // Hand every queued entry waiter to abortWaiter().
  notifyAllWaiters(&journal->entryWaiters, abortWaiter, journal);

  Slab *slab = journal->slab;
  checkIfSlabDrained(slab);
}

/**
 * Put the journal in read-only mode. All attempts to add entries after
 * this function is called will fail. All VIOs waiting to make entries
 * will be awakened with an error. All flushes will complete as soon as all
 * pending IO is done.
 *
 * @param journal    The journal which has failed
 * @param errorCode  The error result triggering this call
 **/
static void enterJournalReadOnlyMode(SlabJournal *journal, int errorCode)
{
  // Report the error to the read-only notifier of the slab's allocator.
  Slab *slab = journal->slab;
  enterReadOnlyMode(slab->allocator->readOnlyNotifier, errorCode);

  // Then abort everything which was waiting to make an entry.
  abortSlabJournalWaiters(journal);
}

/**
 * Actually advance the head of the journal to the unreapable point now that
 * any necessary flushes are complete, then resume adding entries and check
 * whether the slab has drained.
 *
 * @param journal  The journal to be reaped
 **/
static void finishReaping(SlabJournal *journal)
{
  // Once the head catches up, isReaping() reports false again.
  journal->head = journal->unreapable;

  // Presumably the space just reclaimed may let waiting entries proceed.
  addEntries(journal);

  checkIfSlabDrained(journal->slab);
}

/**********************************************************************/
static void reapSlabJournal(SlabJournal *journal);

/**
 * Finish reaping now that we have flushed the lower layer and then try
 * reaping again in case we deferred reaping due to an outstanding VIO.
 * The flush VIO's pool entry is returned to the allocator first.
 *
 * @param completion  The flush VIO
 **/
static void completeReaping(VDOCompletion *completion)
{
  VIOPoolEntry *flushEntry = completion->parent;
  SlabJournal  *journal    = flushEntry->parent;

  returnVIO(journal->slab->allocator, flushEntry);
  finishReaping(journal);
  reapSlabJournal(journal);
}

/**
 * Handle an error flushing the lower layer: put the journal into read-only
 * mode with the flush's result, then complete the reap as usual.
 *
 * @param completion  The flush VIO
 **/
static void handleFlushError(VDOCompletion *completion)
{
  VIOPoolEntry *flushEntry = completion->parent;
  SlabJournal  *journal    = flushEntry->parent;

  enterJournalReadOnlyMode(journal, completion->result);
  completeReaping(completion);
}

/**
 * Waiter callback for getting a VIO with which to flush the lower layer prior
 * to reaping. Launches the flush, which finishes in completeReaping() or, on
 * error, in handleFlushError().
 *
 * @param waiter      The journal as a flush waiter
 * @param vioContext  The newly acquired flush VIO (as a VIOPoolEntry)
 **/
static void flushForReaping(Waiter *waiter, void *vioContext)
{
  VIOPoolEntry *entry   = vioContext;
  SlabJournal  *journal = slabJournalFromFlushWaiter(waiter);

  // Remember the journal so the flush callbacks can find it, and have them
  // run on the thread of the slab's allocator.
  entry->parent = journal;
  entry->vio->completion.callbackThreadID = journal->slab->allocator->threadID;

  launchFlush(entry->vio, completeReaping, handleFlushError);
}

/**
 * Conduct a reap on a slab journal to reclaim unreferenced blocks.
 *
 * Advances the unreapable point (and its lock pointer) past every block
 * before the tail whose lock count is zero. If anything was reaped, the head
 * is advanced immediately when the layer's write policy is sync; otherwise
 * a VIO is requested from the allocator in order to flush the lower layer
 * first. Does nothing if a reap is already in progress, if the slab is
 * unrecovered or not in a normal state, or if the VDO is read-only.
 *
 * @param journal  The slab journal
 **/
static void reapSlabJournal(SlabJournal *journal)
{
  if (isReaping(journal)) {
    // We already have a reap in progress so wait for it to finish.
    return;
  }

  if (isUnrecoveredSlab(journal->slab) || !isNormal(&journal->slab->state)
      || isVDOReadOnly(journal)) {
    // We must not reap in the first two cases, and there's no point in
    // read-only mode.
    return;
  }

  /*
   * Start reclaiming blocks only when the journal head has no references. Then
   * stop when a block is referenced or reap reaches the most recently written
   * block, referenced by the slab summary, which has the sequence number just
   * before the tail.
   */
  bool reaped = false;
  while ((journal->unreapable < journal->tail)
         && (journal->reapLock->count == 0)) {
    reaped = true;
    journal->unreapable++;
    journal->reapLock++;
    // The lock array is used circularly, so wrap the lock pointer back to
    // the first lock when it runs off the end.
    if (journal->reapLock == &journal->locks[journal->size]) {
      journal->reapLock = &journal->locks[0];
    }
  }

  if (!reaped) {
    // No block could be reclaimed, so there is nothing to flush or advance.
    return;
  }

  PhysicalLayer *layer = journal->slab->allocator->completion.layer;
  if (layer->getWritePolicy(layer) == WRITE_POLICY_SYNC) {
    // No flush is issued in sync mode (see the explanation below).
    finishReaping(journal);
    return;
  }

  /*
   * In async mode, it is never safe to reap a slab journal block without first
   * issuing a flush, regardless of whether a user flush has been received or
   * not. In the absence of the flush, the reference block write which released
   * the locks allowing the slab journal to reap may not be persisted. Although
   * slab summary writes will eventually issue flushes, multiple slab journal
   * block writes can be issued while previous slab summary updates have not
   * yet been made. Even though those slab journal block writes will be ignored
   * if the slab summary update is not persisted, they may still overwrite the
   * to-be-reaped slab journal block resulting in a loss of reference count
   * updates (VDO-2912).
   *
   * In sync mode, it is similarly unsafe. However, we cannot possibly make
   * those additional slab journal block writes due to the blocking threshold
   * and the recovery journal's flush policy of flushing before every block.
   * We may make no more than (number of VIOs) entries in slab journals since
   * the last recovery journal flush; thus, due to the size of the slab
   * journal blocks, the RJ must have flushed the storage no more than one
   * slab journal block ago. So we could only overwrite the to-be-reaped block
   * if we wrote and flushed the last block in the journal. But the blocking
   * threshold prevents that.
   */
  // flushForReaping() is the callback which receives the acquired VIO; the
  // head is only advanced once that flush completes (see completeReaping()).
  journal->flushWaiter.callback = flushForReaping;
  int result = acquireVIO(journal->slab->allocator, &journal->flushWaiter);
  if (result != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, result);
    return;
  }
}

/**
 * This is the callback invoked after a slab summary update completes. It
 * is registered in the constructor on behalf of updateTailBlockLocation().
 *
 * On failure, the journal enters read-only mode. On success, the locks held
 * by each newly summarized block are released, a reap is attempted, and the
 * slab summary is updated again if necessary.
 *
 * Implements WaiterCallback.
 *
 * @param waiter   The slab summary waiter that has just been notified
 * @param context  A pointer to the int result code of the update
 **/
static void releaseJournalLocks(Waiter *waiter, void *context)
{
  SlabJournal *journal = slabJournalFromSlabSummaryWaiter(waiter);
  int          result  = *((int *) context);
  if (result != VDO_SUCCESS) {
    if (result != VDO_READ_ONLY) {
      // Don't bother logging what might be lots of errors if we are already
      // in read-only mode.
      logErrorWithStringError(result, "failed slab summary update %llu",
                              journal->summarized);
    }

    journal->updatingSlabSummary = false;
    enterJournalReadOnlyMode(journal, result);
    return;
  }

  // A partial block write is finished once the summary has caught up with
  // the tail, so entries may be added again.
  if (journal->partialWriteInProgress
      && (journal->summarized == journal->tail)) {
    journal->partialWriteInProgress = false;
    addEntries(journal);
  }

  SequenceNumber first    = journal->lastSummarized;
  journal->lastSummarized = journal->summarized;

  /*
   * Visit the newly summarized blocks [first, summarized) from newest to
   * oldest. The loop counter is kept one above the block being processed:
   * sequence numbers appear to be unsigned (they are logged with %llu), so
   * counting a block number down while testing "block >= first" could never
   * terminate if first were 0, and would wrap if summarized were 0.
   */
  for (SequenceNumber next = journal->summarized; next > first; next--) {
    SequenceNumber block = next - 1;

    // Release the lock the summarized block held on the recovery journal.
    // (During replay, recoveryStart will always be 0.)
    if (journal->recoveryJournal != NULL) {
      ZoneCount zoneNumber = journal->slab->allocator->zoneNumber;
      releaseRecoveryJournalBlockReference(journal->recoveryJournal,
                                           getLock(journal,
                                                   block)->recoveryStart,
                                           ZONE_TYPE_PHYSICAL,
                                           zoneNumber);
    }

    // Release our own lock against reaping for blocks that are committed.
    // (This function will not change locks during replay.)
    adjustSlabJournalBlockReference(journal, block, -1);
  }

  journal->updatingSlabSummary = false;

  reapSlabJournal(journal);

  // Check if the slab summary needs to be updated again.
  updateTailBlockLocation(journal);
}

/**
 * Update the tail block location in the slab summary, if necessary. No
 * update is started while one is already in progress, when the VDO is
 * read-only, or when nothing has been committed beyond what was last
 * summarized; in those cases the slab is only checked for being drained.
 *
 * @param journal  The slab journal that is updating its tail block location
 **/
static void updateTailBlockLocation(SlabJournal *journal)
{
  Slab *slab = journal->slab;
  if (journal->updatingSlabSummary || isVDOReadOnly(journal)
      || !(journal->lastSummarized < journal->nextCommit)) {
    checkIfSlabDrained(journal->slab);
    return;
  }

  // An unrecovered slab reports the free block count already recorded in
  // the slab summary; any other slab reports its current count.
  BlockCount freeBlocks;
  if (!isUnrecoveredSlab(slab)) {
    freeBlocks = getSlabFreeBlockCount(slab);
  } else {
    freeBlocks = getSummarizedFreeBlockCount(journal->summary,
                                             slab->slabNumber);
  }

  journal->summarized          = journal->nextCommit;
  journal->updatingSlabSummary = true;

  /*
   * Update slab summary as dirty.
   * Slab journal can only reap past sequence number 1 when all the refCounts
   * for this slab have been written to the layer. Therefore, indicate that the
   * refCounts must be loaded when the journal head has reaped past sequence
   * number 1.
   */
  TailBlockOffset tailOffset
    = getSlabJournalBlockOffset(journal, journal->summarized);
  updateSlabSummaryEntry(journal->summary, &journal->slabSummaryWaiter,
                         journal->slab->slabNumber, tailOffset,
                         (journal->head > 1), false, freeBlocks);
}

/**********************************************************************/
void reopenSlabJournal(SlabJournal *journal)
{
  ASSERT_LOG_ONLY(journal->tailHeader.entryCount == 0,
                  "Slab journal's active block empty before reopening");

  // Discard everything before the tail and reset the journal's state.
  journal->head = journal->tail;
  initializeJournalState(journal);

  // Ensure no locks are spuriously held on an empty journal.
  SequenceNumber block = 1;
  while (block <= journal->size) {
    ASSERT_LOG_ONLY((getLock(journal, block)->count == 0),
                    "Scrubbed journal's block %llu is not locked",
                    block);
    block++;
  }

  addEntries(journal);
}

/**
 * Read the sequence number of the slab journal block held in the buffer of
 * a VIO pool entry.
 *
 * @param entry  The VIO pool entry whose buffer holds a packed slab journal
 *               block
 *
 * @return the sequence number decoded from the packed block header
 **/
static SequenceNumber getCommittingSequenceNumber(const VIOPoolEntry *entry)
{
  const PackedSlabJournalBlock *packed = entry->buffer;
  return getUInt64LE(packed->header.fields.sequenceNumber);
}

/**
 * Handle post-commit processing. This is the callback registered by
 * writeSlabJournalBlock(), for both success and failure of the write.
 *
 * @param completion  The write VIO as a completion
 **/
static void completeWrite(VDOCompletion *completion)
{
  VIOPoolEntry *entry   = completion->parent;
  SlabJournal  *journal = entry->parent;
  int           result  = completion->result;

  // Note which block was written before giving the entry back to the pool.
  SequenceNumber committed = getCommittingSequenceNumber(entry);
  unspliceRingNode(&entry->node);
  returnVIO(journal->slab->allocator, entry);

  if (result != VDO_SUCCESS) {
    logErrorWithStringError(result,
                            "cannot write slab journal block %llu",
                            committed);
    enterJournalReadOnlyMode(journal, result);
    return;
  }

  relaxedAdd64(&journal->events->blocksWritten, 1);

  if (!isRingEmpty(&journal->uncommittedBlocks)) {
    // The commit point is always the beginning of the oldest incomplete block.
    const VIOPoolEntry *oldest
      = asVIOPoolEntry(journal->uncommittedBlocks.next);
    journal->nextCommit = getCommittingSequenceNumber(oldest);
  } else {
    // If no blocks are outstanding, then the commit point is at the tail.
    journal->nextCommit = journal->tail;
  }

  updateTailBlockLocation(journal);
}

/**
 * Callback from acquireVIO() registered in commitSlabJournalTail().
 *
 * Packs the tail block header, copies the tail block into the acquired VIO's
 * buffer, releases the locks for any unused entries, and launches the write.
 * The in-memory tail block is then advanced and reinitialized so that new
 * entries can be added while the write is outstanding.
 *
 * @param waiter      The VIO pool waiter which was just notified
 * @param vioContext  The VIO pool entry for the write
 **/
static void writeSlabJournalBlock(Waiter *waiter, void *vioContext)
{
  SlabJournal            *journal = slabJournalFromResourceWaiter(waiter);
  VIOPoolEntry           *entry   = vioContext;
  SlabJournalBlockHeader *header  = &journal->tailHeader;

  // Record the current head in the block and track the entry as uncommitted
  // until completeWrite() removes it from the ring.
  header->head = journal->head;
  pushRingNode(&journal->uncommittedBlocks, &entry->node);
  packSlabJournalBlockHeader(header, &journal->block->header);

  // Copy the tail block into the VIO.
  memcpy(entry->buffer, journal->block, VDO_BLOCK_SIZE);

  int unusedEntries = journal->entriesPerBlock - header->entryCount;
  ASSERT_LOG_ONLY(unusedEntries >= 0, "Slab journal block is not overfull");
  if (unusedEntries > 0) {
    // Release the per-entry locks for any unused entries in the block we are
    // about to write.
    adjustSlabJournalBlockReference(journal, header->sequenceNumber,
                                    -unusedEntries);
    // The flag is cleared again by releaseJournalLocks() once the slab
    // summary has caught up with the tail.
    journal->partialWriteInProgress = !blockIsFull(journal);
  }

  PhysicalBlockNumber blockNumber
    = getBlockNumber(journal, header->sequenceNumber);

  entry->parent = journal;
  entry->vio->completion.callbackThreadID = journal->slab->allocator->threadID;
  /*
   * This block won't be read in recovery until the slab summary is updated
   * to refer to it. The slab summary update does a flush which is sufficient
   * to protect us from VDO-2331.
   */
  // completeWrite() serves as both the success and the error callback.
  launchWriteMetadataVIO(entry->vio, blockNumber, completeWrite,
                         completeWrite);

  // Since the write is submitted, the tail block structure can be reused.
  journal->tail++;
  initializeTailBlock(journal);
  journal->waitingToCommit = false;
  // If an operation is waiting on this commit (started by
  // attemptReplayIntoSlabJournal()), finish it instead of adding entries.
  if (journal->slab->state.state == ADMIN_STATE_WAITING_FOR_RECOVERY) {
    finishOperationWithResult(&journal->slab->state,
                              (isVDOReadOnly(journal)
                               ? VDO_READ_ONLY : VDO_SUCCESS));
    return;
  }

  addEntries(journal);
}

/**********************************************************************/
void commitSlabJournalTail(SlabJournal *journal)
{
  const bool tailIsEmpty = (journal->tailHeader.entryCount == 0);
  if (tailIsEmpty && mustMakeEntriesToFlush(journal)) {
    // There are no entries at the moment, but there are some waiters, so defer
    // initiating the flush until those entries are ready to write.
    return;
  }

  if (isVDOReadOnly(journal) || journal->waitingToCommit || tailIsEmpty) {
    // There is nothing to do since the tail block is empty, or writing, or
    // the journal is in read-only mode.
    return;
  }

  /*
   * Since we are about to commit the tail block, this journal no longer
   * needs to be on the ring of journals which the recovery journal might
   * ask to commit.
   */
  markSlabJournalClean(journal);

  // Request a VIO for the write; writeSlabJournalBlock() receives it.
  journal->waitingToCommit         = true;
  journal->resourceWaiter.callback = writeSlabJournalBlock;
  int result = acquireVIO(journal->slab->allocator, &journal->resourceWaiter);
  if (result == VDO_SUCCESS) {
    return;
  }

  // No VIO can be obtained, so the commit cannot happen.
  journal->waitingToCommit = false;
  enterJournalReadOnlyMode(journal, result);
}

/**********************************************************************/
void encodeSlabJournalEntry(SlabJournalBlockHeader *tailHeader,
                            SlabJournalPayload     *payload,
                            SlabBlockNumber         sbn,
                            JournalOperation        operation)
{
  // Claim the next entry slot in the block.
  const JournalEntryCount slot = tailHeader->entryCount++;
  const bool isBlockMapIncrement = (operation == BLOCK_MAP_INCREMENT);

  if (isBlockMapIncrement && !tailHeader->hasBlockMapIncrements) {
    // This is the block's first block map increment, so the entry type
    // bitmap must be cleared before it is used.
    memset(payload->fullEntries.entryTypes, 0,
           SLAB_JOURNAL_ENTRY_TYPES_SIZE);
    tailHeader->hasBlockMapIncrements = true;
  }

  if (isBlockMapIncrement) {
    // Flag this slot in the bitmap: one bit per entry, eight per byte.
    payload->fullEntries.entryTypes[slot / 8]
      |= ((byte) 1 << (slot % 8));
  }

  packSlabJournalEntry(&payload->entries[slot], sbn,
                       isIncrementOperation(operation));
}

/**********************************************************************/
SlabJournalEntry decodeSlabJournalEntry(PackedSlabJournalBlock *block,
                                        JournalEntryCount       entryCount)
{
  // entryCount is used as the index of the entry to decode.
  SlabJournalEntry decoded
    = unpackSlabJournalEntry(&block->payload.entries[entryCount]);

  // Without block map increments, the entry type bitmap is not consulted.
  if (!block->header.fields.hasBlockMapIncrements) {
    return decoded;
  }

  // A set bit in the bitmap marks the entry as a block map increment.
  if ((block->payload.fullEntries.entryTypes[entryCount / 8]
       & ((byte) 1 << (entryCount % 8))) != 0) {
    decoded.operation = BLOCK_MAP_INCREMENT;
  }

  return decoded;
}

/**
 * Actually add an entry to the slab journal, potentially firing off a write
 * if a block becomes full. This function is synchronous. If either sanity
 * check fails, the journal enters read-only mode and no entry is made.
 *
 * @param journal        The slab journal to append to
 * @param pbn            The pbn being adjusted
 * @param operation      The type of entry to make
 * @param recoveryPoint  The recovery journal point for this entry
 **/
static void addEntry(SlabJournal         *journal,
                     PhysicalBlockNumber  pbn,
                     JournalOperation     operation,
                     const JournalPoint  *recoveryPoint)
{
  // Entries must arrive in strictly increasing recovery journal order.
  int result = ASSERT(beforeJournalPoint(&journal->tailHeader.recoveryPoint,
                                         recoveryPoint),
                      "recovery journal point is monotonically increasing, "
                      "recovery point: %llu.%u, "
                      "block recovery point: %llu.%u",
                      recoveryPoint->sequenceNumber, recoveryPoint->entryCount,
                      journal->tailHeader.recoveryPoint.sequenceNumber,
                      journal->tailHeader.recoveryPoint.entryCount);

  // Block map increments are limited by the smaller, full-entry capacity.
  if ((result == VDO_SUCCESS) && (operation == BLOCK_MAP_INCREMENT)) {
    result = ASSERT_LOG_ONLY((journal->tailHeader.entryCount
                              < journal->fullEntriesPerBlock),
                             "block has room for full entries");
  }

  if (result != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, result);
    return;
  }

  // Entries are recorded by block number relative to the start of the slab.
  encodeSlabJournalEntry(&journal->tailHeader, &journal->block->payload,
                         pbn - journal->slab->start, operation);
  journal->tailHeader.recoveryPoint = *recoveryPoint;

  if (blockIsFull(journal)) {
    commitSlabJournalTail(journal);
  }
}

/**********************************************************************/
bool attemptReplayIntoSlabJournal(SlabJournal         *journal,
                                  PhysicalBlockNumber  pbn,
                                  JournalOperation     operation,
                                  JournalPoint        *recoveryPoint,
                                  VDOCompletion       *parent)
{
  SlabJournalBlockHeader *tailHeader = &journal->tailHeader;

  // Only accept entries after the current recovery point; anything else is
  // skipped and reported as handled.
  if (!beforeJournalPoint(&tailHeader->recoveryPoint, recoveryPoint)) {
    return true;
  }

  const bool needsFullEntry = (tailHeader->hasBlockMapIncrements
                               || (operation == BLOCK_MAP_INCREMENT));
  if ((tailHeader->entryCount >= journal->fullEntriesPerBlock)
      && needsFullEntry) {
    // The tail block does not have room for the entry we are attempting
    // to add so commit the tail block now.
    commitSlabJournalTail(journal);
  }

  if (journal->waitingToCommit) {
    // The entry cannot be added until the commit is underway, so start a
    // waiting operation on the slab on behalf of the parent and report that
    // the entry was not added.
    startOperationWithWaiter(&journal->slab->state,
                             ADMIN_STATE_WAITING_FOR_RECOVERY, parent, NULL);
    return false;
  }

  if (!((journal->tail - journal->head) < journal->size)) {
    /*
     * We must have reaped the current head before the crash, since
     * the blocked threshold keeps us from having more entries than
     * fit in a slab journal; hence we can just advance the head
     * (and unreapable block), as needed.
     */
    journal->head++;
    journal->unreapable++;
  }

  markSlabReplaying(journal->slab);
  addEntry(journal, pbn, operation, recoveryPoint);
  return true;
}

/**
 * Check whether the journal should be saving reference blocks out, which is
 * the case once the number of blocks between its head and tail has reached
 * the flushing threshold.
 *
 * @param journal       The journal to check
 *
 * @return true if the journal should be requesting reference block writes
 **/
static bool requiresFlushing(const SlabJournal *journal)
{
  const BlockCount length = (journal->tail - journal->head);
  return !(length < journal->flushingThreshold);
}

/**
 * Check whether the journal must be reaped before adding new entries, which
 * is the case once the number of blocks between its head and tail has
 * reached the blocking threshold.
 *
 * @param journal       The journal to check
 *
 * @return true if the journal must be reaped
 **/
static bool requiresReaping(const SlabJournal *journal)
{
  const BlockCount length = (journal->tail - journal->head);
  return !(length < journal->blockingThreshold);
}

/**********************************************************************/
bool requiresScrubbing(const SlabJournal *journal)
{
  // Compare the number of blocks between the head and the tail against the
  // scrubbing threshold.
  const BlockCount length = (journal->tail - journal->head);
  return !(length < journal->scrubbingThreshold);
}

/**
 * Make the next slab journal entry on behalf of a waiting VIO. Implements
 * WaiterCallback; addEntries() invokes this once it has determined that
 * the journal is ready to accept another entry.
 *
 * @param waiter   The VIO which should make an entry now
 * @param context  The slab journal to make an entry in
 **/
static void addEntryFromWaiter(Waiter *waiter, void *context)
{
  SlabJournal            *journal    = (SlabJournal *) context;
  DataVIO                *dataVIO    = waiterAsDataVIO(waiter);
  SlabJournalBlockHeader *tailHeader = &journal->tailHeader;

  if (tailHeader->entryCount == 0) {
    /*
     * The tail block is empty, so this entry will be its first. Record
     * the recovery journal block of this entry in the tail block's lock and
     * take a reference on that recovery journal block, to be held until
     * this tail block is committed.
     */
    SequenceNumber recoveryBlock
      = dataVIO->recoveryJournalPoint.sequenceNumber;
    getLock(journal, tailHeader->sequenceNumber)->recoveryStart
      = recoveryBlock;
    if (journal->recoveryJournal != NULL) {
      ZoneCount zoneNumber = journal->slab->allocator->zoneNumber;
      acquireRecoveryJournalBlockReference(journal->recoveryJournal,
                                           recoveryBlock, ZONE_TYPE_PHYSICAL,
                                           zoneNumber);
    }
    markSlabJournalDirty(journal, recoveryBlock);

    // Past the flushing threshold, ask the refCounts to write out some
    // reference blocks, but do not hold up this entry.
    if (requiresFlushing(journal)) {
      relaxedAdd64(&journal->events->flushCount, 1);
      BlockCount length    = (journal->tail - journal->head);
      BlockCount remaining = ((length > journal->flushingDeadline)
                              ? 0
                              : (journal->flushingDeadline - length));
      saveSeveralReferenceBlocks(journal->slab->referenceCounts,
                                 remaining + 1);
    }
  }

  // Capture the position of the new entry before calling addEntry()
  // (presumably addEntry() advances the tail header -- confirm).
  JournalPoint slabJournalPoint = {
    .entryCount     = tailHeader->entryCount,
    .sequenceNumber = tailHeader->sequenceNumber,
  };

  addEntry(journal, dataVIO->operation.pbn, dataVIO->operation.type,
           &dataVIO->recoveryJournalPoint);

  // With the journal entry made, apply the operation to the reference
  // counts and let the VIO continue with the outcome.
  continueDataVIO(dataVIO,
                  modifySlabReferenceCount(journal->slab, &slabJournalPoint,
                                           dataVIO->operation));
}

/**
 * Report whether the VIO at the front of the entry waiter queue wants to
 * record a block map increment. The queue's first waiter is used
 * without a NULL check, so callers must ensure the queue is non-empty.
 *
 * @param journal  The journal
 *
 * @return <code>true</code> if the first entry waiter's operation is a block
 *         map increment
 **/
static inline bool isNextEntryABlockMapIncrement(SlabJournal *journal)
{
  const DataVIO *nextVIO
    = waiterAsDataVIO(getFirstWaiter(&journal->entryWaiters));
  if (nextVIO->operation.type == BLOCK_MAP_INCREMENT) {
    return true;
  }

  return false;
}

/**
 * Add as many entries as possible from the queue of VIOs waiting to make
 * entries. By processing the queue in order, we ensure that slab journal
 * entries are made in the same order as recovery journal entries for the
 * same increment or decrement.
 *
 * Processing stops early, leaving the remaining VIOs queued, when any of
 * the following holds: a partial write is outstanding or the slab is
 * rebuilding; the journal is waiting to commit its tail block; the journal
 * has reached its blocking threshold; or the new tail block still has locks.
 *
 * This function is not re-entrant; a nested call returns immediately.
 *
 * @param journal  The journal to which entries may be added
 **/
static void addEntries(SlabJournal *journal)
{
  if (journal->addingEntries) {
    // Protect against re-entrancy.
    return;
  }

  journal->addingEntries = true;
  while (hasWaiters(&journal->entryWaiters)) {
    if (journal->partialWriteInProgress || slabIsRebuilding(journal->slab)) {
      // Don't add entries while rebuilding or while a partial write is
      // outstanding (VDO-2399).
      break;
    }

    SlabJournalBlockHeader *header = &journal->tailHeader;
    if (journal->waitingToCommit) {
      // If we are waiting for resources to write the tail block, and the
      // tail block is full, we can't make another entry.
      relaxedAdd64(&journal->events->tailBusyCount, 1);
      break;
    } else if (isNextEntryABlockMapIncrement(journal)
               && (header->entryCount >= journal->fullEntriesPerBlock)) {
      // The tail block does not have room for a block map increment, so
      // commit it now.
      commitSlabJournalTail(journal);
      if (journal->waitingToCommit) {
        // The commit could not complete yet, so the tail is still busy.
        relaxedAdd64(&journal->events->tailBusyCount, 1);
        break;
      }
    }

    // If the slab is over the blocking threshold, make the VIO wait.
    if (requiresReaping(journal)) {
      relaxedAdd64(&journal->events->blockedCount, 1);
      saveDirtyReferenceBlocks(journal->slab->referenceCounts);
      break;
    }

    if (header->entryCount == 0) {
      JournalLock *lock = getLock(journal, header->sequenceNumber);
      // Check if the on disk slab journal is full. Because of the
      // blocking and scrubbing thresholds, this should never happen.
      if (lock->count > 0) {
        ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
                        "New block has locks, but journal is not full");

        /*
         * The blocking threshold must let the journal fill up if the new
         * block has locks; if the blocking threshold is smaller than the
         * journal size, the new block cannot possibly have locks already.
         */
        ASSERT_LOG_ONLY((journal->blockingThreshold >= journal->size),
                        "New block can have locks already iff blocking"
                        " threshold is at the end of the journal");

        relaxedAdd64(&journal->events->diskFullCount, 1);
        saveDirtyReferenceBlocks(journal->slab->referenceCounts);
        break;
      }

      /*
       * Don't allow the new block to be reaped until all of the reference
       * count blocks are written and the journal block has been
       * fully committed as well.
       */
      lock->count = journal->entriesPerBlock + 1;

      if (header->sequenceNumber == 1) {
        /*
         * This is the first entry in this slab journal, ever. Dirty all of
         * the reference count blocks. Each will acquire a lock on the
         * tail block so that the journal won't be reaped until the
         * reference counts are initialized. The lock acquisition must
         * be done by the RefCounts since here we don't know how many
         * reference blocks the RefCounts has.
         */
        acquireDirtyBlockLocks(journal->slab->referenceCounts);
      }
    }

    notifyNextWaiter(&journal->entryWaiters, addEntryFromWaiter, journal);
  }

  journal->addingEntries = false;

  // If the slab is draining (but not suspending) and no waiters remain,
  // commit the tail block.
  if (isSlabDraining(journal->slab) && !isSuspending(&journal->slab->state)
      && !hasWaiters(&journal->entryWaiters)) {
    commitSlabJournalTail(journal);
  }
}

/**********************************************************************/
void addSlabJournalEntry(SlabJournal *journal, DataVIO *dataVIO)
{
  // Decide whether the VIO can be queued at all: the slab must be open and
  // the VDO must not be read-only. Otherwise, try to enqueue it as an
  // entry waiter.
  int status;
  if (!isSlabOpen(journal->slab)) {
    status = VDO_INVALID_ADMIN_STATE;
  } else if (isVDOReadOnly(journal)) {
    status = VDO_READ_ONLY;
  } else {
    status = enqueueDataVIO(&journal->entryWaiters, dataVIO,
                            THIS_LOCATION("$F($j-$js)"));
  }

  if (status != VDO_SUCCESS) {
    // The VIO was not queued; hand the error straight back to it.
    continueDataVIO(dataVIO, status);
    return;
  }

  // An unrecovered slab whose journal is at the blocking threshold gets
  // its scrubbing priority raised.
  if (isUnrecoveredSlab(journal->slab) && requiresReaping(journal)) {
    increaseScrubbingPriority(journal->slab);
  }

  addEntries(journal);
}

/**********************************************************************/
void adjustSlabJournalBlockReference(SlabJournal    *journal,
                                     SequenceNumber  sequenceNumber,
                                     int             adjustment)
{
  // Sequence number zero is ignored, and locks should not be used during
  // offline replay.
  if ((sequenceNumber == 0) || isReplayingSlab(journal->slab)) {
    return;
  }

  ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
  JournalLock *lock = getLock(journal, sequenceNumber);
  if (adjustment < 0) {
    // A release must not take the lock count below zero.
    ASSERT_LOG_ONLY((-adjustment <= lock->count),
                    "adjustment %d of lock count %u for slab journal block %"
                    PRIu64 " must not underflow", adjustment, lock->count,
                    sequenceNumber);
  }

  lock->count += adjustment;
  if (lock->count != 0) {
    return;
  }

  // The lock count has dropped to zero, so attempt to reap the journal.
  reapSlabJournal(journal);
}

/**********************************************************************/
bool releaseRecoveryJournalLock(SlabJournal    *journal,
                                SequenceNumber  recoveryLock)
{
  if (recoveryLock > journal->recoveryLock) {
    // The asserted condition cannot hold in this branch, so this always
    // logs the failure before refusing the request.
    ASSERT_LOG_ONLY((recoveryLock < journal->recoveryLock),
                    "slab journal recovery lock is not older than the recovery"
                    " journal head");
    return false;
  }

  // Only a request for exactly the lock this journal holds, on a VDO which
  // is not read-only, results in a commit.
  bool releasable = ((recoveryLock == journal->recoveryLock)
                     && !isVDOReadOnly(journal));
  if (releasable) {
    // All locks are held by the block which is in progress; write it.
    commitSlabJournalTail(journal);
  }

  return releasable;
}

/**********************************************************************/
void drainSlabJournal(SlabJournal *journal)
{
  ASSERT_LOG_ONLY((getCallbackThreadID()
                   == journal->slab->allocator->threadID),
                  "drainSlabJournal() called on correct thread");

  if (isQuiescing(&journal->slab->state)) {
    // XXX: we should revisit this assertion since it is no longer clear what
    //      it is for.
    ASSERT_LOG_ONLY((!(slabIsRebuilding(journal->slab)
                       && hasWaiters(&journal->entryWaiters))),
                    "slab is recovered or has no waiters");
  }

  switch (journal->slab->state.state) {
  case ADMIN_STATE_REBUILDING:
  case ADMIN_STATE_SUSPENDING:
  case ADMIN_STATE_SAVE_FOR_SCRUBBING:
    // In these states the tail block is not committed.
    return;

  default:
    break;
  }

  // Every other state commits the tail block.
  commitSlabJournalTail(journal);
}

/**
 * Complete the decode process: give the VIO pool entry back to the
 * allocator and tell the slab that its journal load has finished.
 *
 * @param completion  The VIO as a completion
 **/
static void finishDecodingJournal(VDOCompletion *completion)
{
  VIOPoolEntry *poolEntry = completion->parent;
  SlabJournal  *journal   = poolEntry->parent;

  // Read the result before the pool entry is returned.
  int loadResult = completion->result;

  returnVIO(journal->slab->allocator, poolEntry);
  notifySlabJournalIsLoaded(journal->slab, loadResult);
}

/**
 * Initialize the in-memory journal state from the tail block which was
 * read from disk. This is the callback registered in readSlabJournalTail().
 *
 * If the block read does not carry the slab journal metadata type and the
 * allocator's nonce, the in-memory state is left untouched.
 *
 * @param completion  The VIO which was used to read the journal tail
 **/
static void setDecodedState(VDOCompletion *completion)
{
  VIOPoolEntry           *entry   = completion->parent;
  PackedSlabJournalBlock *block   = entry->buffer;
  SlabJournal            *journal = entry->parent;

  SlabJournalBlockHeader header;
  unpackSlabJournalBlockHeader(&block->header, &header);

  bool isValidBlock = ((header.metadataType == VDO_METADATA_SLAB_JOURNAL)
                       && (header.nonce == journal->slab->allocator->nonce));
  if (isValidBlock) {
    journal->tail = header.sequenceNumber + 1;

    // A slab summarized as clean implies an empty slab journal, so the head
    // is advanced to the tail; otherwise use the head from the block read.
    journal->head = (getSummarizedCleanliness(journal->summary,
                                              journal->slab->slabNumber)
                     ? journal->tail
                     : header.head);

    journal->tailHeader = header;
    initializeJournalState(journal);
  }

  finishDecodingJournal(completion);
}

/**
 * Launch a read of the slab journal tail block using a VIO acquired from
 * the VIO pool. This is the success callback from acquireVIOFromPool() when
 * decoding the slab journal.
 *
 * @param waiter      The VIO pool waiter which has just been notified
 * @param vioContext  The VIO pool entry given to the waiter
 **/
static void readSlabJournalTail(Waiter *waiter, void *vioContext)
{
  VIOPoolEntry *entry   = vioContext;
  SlabJournal  *journal = slabJournalFromResourceWaiter(waiter);
  Slab         *slab    = journal->slab;

  entry->parent = journal;
  entry->vio->completion.callbackThreadID = slab->allocator->threadID;

  // Slab summary keeps the commit point offset, so the tail block is the
  // block before that. A commit point of zero wraps around to the last
  // block of the journal. Calculation supports small journals in unit tests.
  TailBlockOffset commitPoint
    = getSummarizedTailBlockOffset(journal->summary, slab->slabNumber);
  TailBlockOffset tailBlock;
  if (commitPoint == 0) {
    tailBlock = (TailBlockOffset) (journal->size - 1);
  } else {
    tailBlock = (commitPoint - 1);
  }

  launchReadMetadataVIO(entry->vio, slab->journalOrigin + tailBlock,
                        setDecodedState, finishDecodingJournal);
}

/**********************************************************************/
void decodeSlabJournal(SlabJournal *journal)
{
  ASSERT_LOG_ONLY((getCallbackThreadID()
                   == journal->slab->allocator->threadID),
                  "decodeSlabJournal() called on correct thread");
  Slab *slab = journal->slab;
  TailBlockOffset lastCommitPoint
    = getSummarizedTailBlockOffset(journal->summary, slab->slabNumber);
  if ((lastCommitPoint == 0)
      && !mustLoadRefCounts(journal->summary, slab->slabNumber)) {
    /*
     * This slab claims that it has a tail block at (journal->size - 1), but
     * a head of 1. This is impossible, due to the scrubbing threshold, on
     * a real system, so don't bother reading the (bogus) data off disk.
     */
    ASSERT_LOG_ONLY(((journal->size < 16)
                     || (journal->scrubbingThreshold < (journal->size - 1))),
                    "Scrubbing threshold protects against reads of unwritten"
                    " slab journal blocks");
    notifySlabJournalIsLoaded(slab, VDO_SUCCESS);
    return;
  }

  // Otherwise, acquire a VIO with which to read the tail block; the read
  // is launched from readSlabJournalTail() once a VIO is available. If the
  // VIO cannot be requested, report the failure to the slab immediately.
  journal->resourceWaiter.callback = readSlabJournalTail;
  int result = acquireVIO(slab->allocator, &journal->resourceWaiter);
  if (result != VDO_SUCCESS) {
    notifySlabJournalIsLoaded(slab, result);
  }
}

/**********************************************************************/
void dumpSlabJournal(const SlabJournal *journal)
{
  // Log a one-line summary of the journal's state. All of the sequence
  // number fields are printed with PRIu64 (assumes they are uint64_t, as
  // the existing PRIu64 uses for some of them imply -- confirm) so that the
  // format string matches the argument types on every platform.
  logInfo("  slab journal: entryWaiters=%zu waitingToCommit=%s"
          " updatingSlabSummary=%s head=%" PRIu64 " unreapable=%" PRIu64
          " tail=%" PRIu64 " nextCommit=%" PRIu64 " summarized=%" PRIu64
          " lastSummarized=%" PRIu64 " recoveryJournalLock=%" PRIu64
          " dirty=%s", countWaiters(&journal->entryWaiters),
          boolToString(journal->waitingToCommit),
          boolToString(journal->updatingSlabSummary),
          journal->head, journal->unreapable, journal->tail,
          journal->nextCommit, journal->summarized, journal->lastSummarized,
          journal->recoveryLock,
          boolToString(isSlabJournalDirty(journal)));
  // Given the frequency with which the locks are just a tiny bit off, it
  // might be worth dumping all the locks, but that might be too much logging.
}