Blame source/vdo/base/slabJournal.c

Packit Service 310c69
/*
Packit Service 310c69
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 310c69
 *
Packit Service 310c69
 * This program is free software; you can redistribute it and/or
Packit Service 310c69
 * modify it under the terms of the GNU General Public License
Packit Service 310c69
 * as published by the Free Software Foundation; either version 2
Packit Service 310c69
 * of the License, or (at your option) any later version.
Packit Service 310c69
 * 
Packit Service 310c69
 * This program is distributed in the hope that it will be useful,
Packit Service 310c69
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 310c69
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 310c69
 * GNU General Public License for more details.
Packit Service 310c69
 * 
Packit Service 310c69
 * You should have received a copy of the GNU General Public License
Packit Service 310c69
 * along with this program; if not, write to the Free Software
Packit Service 310c69
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 310c69
 * 02110-1301, USA. 
Packit Service 310c69
 *
Packit Service 310c69
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/slabJournal.c#18 $
Packit Service 310c69
 */
Packit Service 310c69
Packit Service 310c69
#include "slabJournalInternals.h"
Packit Service 310c69
Packit Service 310c69
#include "logger.h"
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
#include "stringUtils.h"
Packit Service 310c69
Packit Service 310c69
#include "adminState.h"
Packit Service 310c69
#include "blockAllocatorInternals.h"
Packit Service 310c69
#include "dataVIO.h"
Packit Service 310c69
#include "recoveryJournal.h"
Packit Service 310c69
#include "refCounts.h"
Packit Service 310c69
#include "slabDepot.h"
Packit Service 310c69
#include "slabSummary.h"
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Return the slab journal from the resource waiter.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter  The waiter
Packit Service 310c69
 *
Packit Service 310c69
 * @return The slab journal
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline SlabJournal *slabJournalFromResourceWaiter(Waiter *waiter)
Packit Service 310c69
{
Packit Service 310c69
  STATIC_ASSERT(offsetof(SlabJournal, resourceWaiter) == 0);
Packit Service 310c69
  return (SlabJournal *) waiter;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Return the slab journal from the flush waiter.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter  The waiter
Packit Service 310c69
 *
Packit Service 310c69
 * @return The slab journal
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline SlabJournal *slabJournalFromFlushWaiter(Waiter *waiter)
Packit Service 310c69
{
Packit Service 310c69
  if (waiter == NULL) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  }
Packit Service 310c69
  return (SlabJournal *)
Packit Service 310c69
    ((uintptr_t) waiter - offsetof(SlabJournal, flushWaiter));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
SlabJournal *slabJournalFromDirtyNode(RingNode *node)
Packit Service 310c69
{
Packit Service 310c69
  if (node == NULL) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  }
Packit Service 310c69
  return (SlabJournal *) ((uintptr_t) node - offsetof(SlabJournal, dirtyNode));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Return the slab journal from the slab summary waiter.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter  The waiter
Packit Service 310c69
 *
Packit Service 310c69
 * @return The slab journal
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline SlabJournal *slabJournalFromSlabSummaryWaiter(Waiter *waiter)
Packit Service 310c69
{
Packit Service 310c69
  if (waiter == NULL) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  }
Packit Service 310c69
  return (SlabJournal *)
Packit Service 310c69
    ((uintptr_t) waiter - offsetof(SlabJournal, slabSummaryWaiter));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Get the physical block number for a given sequence number.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal   The journal
Packit Service 310c69
 * @param sequence  The sequence number of the desired block
Packit Service 310c69
 *
Packit Service 310c69
 * @return the block number corresponding to the sequence number
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline PhysicalBlockNumber getBlockNumber(SlabJournal    *journal,
Packit Service 310c69
                                                 SequenceNumber  sequence)
Packit Service 310c69
{
Packit Service 310c69
  TailBlockOffset offset = getSlabJournalBlockOffset(journal, sequence);
Packit Service 310c69
  return (journal->slab->journalOrigin + offset);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Get the lock object for a slab journal block by sequence number.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal         Slab journal to retrieve from
Packit Service 310c69
 * @param sequenceNumber  Sequence number of the block
Packit Service 310c69
 *
Packit Service 310c69
 * @return the lock object for the given sequence number
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline JournalLock *getLock(SlabJournal    *journal,
Packit Service 310c69
                                   SequenceNumber  sequenceNumber)
Packit Service 310c69
{
Packit Service 310c69
  TailBlockOffset offset = getSlabJournalBlockOffset(journal, sequenceNumber);
Packit Service 310c69
  return &journal->locks[offset];
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether the VDO is in read-only mode.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal whose owning VDO should be checked
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the VDO is in read-only mode
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline bool isVDOReadOnly(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  return isReadOnly(journal->slab->allocator->readOnlyNotifier);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether there are entry waiters which should delay a flush.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal to check
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if there are no entry waiters, or if the slab
Packit Service 310c69
 *         is unrecovered
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline bool mustMakeEntriesToFlush(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  return (!slabIsRebuilding(journal->slab)
Packit Service 310c69
          && hasWaiters(&journal->entryWaiters));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether a reap is currently in progress.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal which may be reaping
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the journal is reaping
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline bool isReaping(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  return (journal->head != journal->unreapable);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool isSlabJournalActive(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  return (mustMakeEntriesToFlush(journal)
Packit Service 310c69
          || isReaping(journal)
Packit Service 310c69
          || journal->waitingToCommit
Packit Service 310c69
          || !isRingEmpty(&journal->uncommittedBlocks)
Packit Service 310c69
          || journal->updatingSlabSummary);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Initialize tail block as a new block.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal whose tail block is being initialized
Packit Service 310c69
 **/
Packit Service 310c69
static void initializeTailBlock(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournalBlockHeader *header = &journal->tailHeader;
Packit Service 310c69
  header->sequenceNumber         = journal->tail;
Packit Service 310c69
  header->entryCount             = 0;
Packit Service 310c69
  header->hasBlockMapIncrements  = false;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Set all journal fields appropriately to start journaling.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal to be reset, based on its tail sequence number
Packit Service 310c69
 **/
Packit Service 310c69
static void initializeJournalState(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  journal->unreapable = journal->head;
Packit Service 310c69
  journal->reapLock   = getLock(journal, journal->unreapable);
Packit Service 310c69
  journal->nextCommit = journal->tail;
Packit Service 310c69
  journal->summarized = journal->lastSummarized = journal->tail;
Packit Service 310c69
  initializeTailBlock(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether a journal block is full.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal The slab journal for the block
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the tail block is full
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static bool blockIsFull(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  JournalEntryCount count = journal->tailHeader.entryCount;
Packit Service 310c69
  return (journal->tailHeader.hasBlockMapIncrements
Packit Service 310c69
          ? (journal->fullEntriesPerBlock == count)
Packit Service 310c69
          : (journal->entriesPerBlock == count));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void addEntries(SlabJournal *journal);
Packit Service 310c69
static void updateTailBlockLocation(SlabJournal *journal);
Packit Service 310c69
static void releaseJournalLocks(Waiter *waiter, void *context);
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makeSlabJournal(BlockAllocator   *allocator,
Packit Service 310c69
                    Slab             *slab,
Packit Service 310c69
                    RecoveryJournal  *recoveryJournal,
Packit Service 310c69
                    SlabJournal     **journalPtr)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal *journal;
Packit Service 310c69
  const SlabConfig *slabConfig = getSlabConfig(allocator->depot);
Packit Service 310c69
  int result = ALLOCATE_EXTENDED(SlabJournal, slabConfig->slabJournalBlocks,
Packit Service 310c69
                                 JournalLock, __func__, &journal);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->slab                = slab;
Packit Service 310c69
  journal->size                = slabConfig->slabJournalBlocks;
Packit Service 310c69
  journal->flushingThreshold   = slabConfig->slabJournalFlushingThreshold;
Packit Service 310c69
  journal->blockingThreshold   = slabConfig->slabJournalBlockingThreshold;
Packit Service 310c69
  journal->scrubbingThreshold  = slabConfig->slabJournalScrubbingThreshold;
Packit Service 310c69
  journal->entriesPerBlock     = SLAB_JOURNAL_ENTRIES_PER_BLOCK;
Packit Service 310c69
  journal->fullEntriesPerBlock = SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
Packit Service 310c69
  journal->events              = &allocator->slabJournalStatistics;
Packit Service 310c69
  journal->recoveryJournal     = recoveryJournal;
Packit Service 310c69
  journal->summary             = getSlabSummaryZone(allocator);
Packit Service 310c69
  journal->tail                = 1;
Packit Service 310c69
  journal->head                = 1;
Packit Service 310c69
Packit Service 310c69
  journal->flushingDeadline = journal->flushingThreshold;
Packit Service 310c69
  // Set there to be some time between the deadline and the blocking threshold,
Packit Service 310c69
  // so that hopefully all are done before blocking.
Packit Service 310c69
  if ((journal->blockingThreshold - journal->flushingThreshold) > 5) {
Packit Service 310c69
    journal->flushingDeadline = journal->blockingThreshold - 5;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->slabSummaryWaiter.callback = releaseJournalLocks;
Packit Service 310c69
Packit Service 310c69
  result = ALLOCATE(VDO_BLOCK_SIZE, char, "PackedSlabJournalBlock",
Packit Service 310c69
                    (char **) &journal->block);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    freeSlabJournal(&journal);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  initializeRing(&journal->dirtyNode);
Packit Service 310c69
  initializeRing(&journal->uncommittedBlocks);
Packit Service 310c69
Packit Service 310c69
  journal->tailHeader.nonce        = slab->allocator->nonce;
Packit Service 310c69
  journal->tailHeader.metadataType = VDO_METADATA_SLAB_JOURNAL;
Packit Service 310c69
  initializeJournalState(journal);
Packit Service 310c69
Packit Service 310c69
  *journalPtr = journal;
Packit Service 310c69
  return VDO_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void freeSlabJournal(SlabJournal **journalPtr)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal *journal = *journalPtr;
Packit Service 310c69
  if (journal == NULL) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  FREE(journal->block);
Packit Service 310c69
  FREE(journal);
Packit Service 310c69
  *journalPtr = NULL;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool isSlabJournalBlank(const SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  return ((journal != NULL)
Packit Service 310c69
          && (journal->tail == 1)
Packit Service 310c69
          && (journal->tailHeader.entryCount == 0));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool isSlabJournalDirty(const SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  return (journal->recoveryLock != 0);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Put a slab journal on the dirty ring of its allocator in the correct order.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal to be marked dirty
Packit Service 310c69
 * @param lock     The recovery journal lock held by the slab journal
Packit Service 310c69
 **/
Packit Service 310c69
static void markSlabJournalDirty(SlabJournal *journal, SequenceNumber lock)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY(!isSlabJournalDirty(journal), "slab journal was clean");
Packit Service 310c69
Packit Service 310c69
  journal->recoveryLock = lock;
Packit Service 310c69
  RingNode *dirtyRing   = &journal->slab->allocator->dirtySlabJournals;
Packit Service 310c69
  RingNode *node        = dirtyRing->prev;
Packit Service 310c69
  while (node != dirtyRing) {
Packit Service 310c69
    SlabJournal *dirtyJournal = slabJournalFromDirtyNode(node);
Packit Service 310c69
    if (dirtyJournal->recoveryLock <= journal->recoveryLock) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    node = node->prev;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  pushRingNode(node->next, &journal->dirtyNode);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void markSlabJournalClean(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  journal->recoveryLock = 0;
Packit Service 310c69
  unspliceRingNode(&journal->dirtyNode);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Implements WaiterCallback. This callback is invoked on all VIOs waiting
Packit Service 310c69
 * to make slab journal entries after the VDO has gone into read-only mode.
Packit Service 310c69
 **/
Packit Service 310c69
static void abortWaiter(Waiter *waiter,
Packit Service 310c69
                        void   *context __attribute__((unused)))
Packit Service 310c69
{
Packit Service 310c69
  continueDataVIO(waiterAsDataVIO(waiter), VDO_READ_ONLY);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void abortSlabJournalWaiters(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY((getCallbackThreadID()
Packit Service 310c69
                   == journal->slab->allocator->threadID),
Packit Service 310c69
                  "abortSlabJournalWaiters() called on correct thread");
Packit Service 310c69
  notifyAllWaiters(&journal->entryWaiters, abortWaiter, journal);
Packit Service 310c69
  checkIfSlabDrained(journal->slab);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Put the journal in read-only mode. All attempts to add entries after
Packit Service 310c69
 * this function is called will fail. All VIOs waiting for to make entries
Packit Service 310c69
 * will be awakened with an error. All flushes will complete as soon as all
Packit Service 310c69
 * pending IO is done.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal    The journal which has failed
Packit Service 310c69
 * @param errorCode  The error result triggering this call
Packit Service 310c69
 **/
Packit Service 310c69
static void enterJournalReadOnlyMode(SlabJournal *journal, int errorCode)
Packit Service 310c69
{
Packit Service 310c69
  enterReadOnlyMode(journal->slab->allocator->readOnlyNotifier, errorCode);
Packit Service 310c69
  abortSlabJournalWaiters(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Actually advance the head of the journal now that any necessary flushes
Packit Service 310c69
 * are complete.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal to be reaped
Packit Service 310c69
 **/
Packit Service 310c69
static void finishReaping(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  journal->head = journal->unreapable;
Packit Service 310c69
  addEntries(journal);
Packit Service 310c69
  checkIfSlabDrained(journal->slab);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void reapSlabJournal(SlabJournal *journal);
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Finish reaping now that we have flushed the lower layer and then try
Packit Service 310c69
 * reaping again in case we deferred reaping due to an outstanding VIO.
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The flush VIO
Packit Service 310c69
 **/
Packit Service 310c69
static void completeReaping(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  VIOPoolEntry *entry   = completion->parent;
Packit Service 310c69
  SlabJournal  *journal = entry->parent;
Packit Service 310c69
  returnVIO(journal->slab->allocator, entry);
Packit Service 310c69
  finishReaping(journal);
Packit Service 310c69
  reapSlabJournal(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Handle an error flushing the lower layer.
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The flush VIO
Packit Service 310c69
 **/
Packit Service 310c69
static void handleFlushError(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal *journal = ((VIOPoolEntry *) completion->parent)->parent;
Packit Service 310c69
  enterJournalReadOnlyMode(journal, completion->result);
Packit Service 310c69
  completeReaping(completion);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Waiter callback for getting a VIO with which to flush the lower layer prior
Packit Service 310c69
 * to reaping.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter      The journal as a flush waiter
Packit Service 310c69
 * @param vioContext  The newly acquired flush VIO
Packit Service 310c69
 **/
Packit Service 310c69
static void flushForReaping(Waiter *waiter, void *vioContext)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal  *journal = slabJournalFromFlushWaiter(waiter);
Packit Service 310c69
  VIOPoolEntry *entry   = vioContext;
Packit Service 310c69
  VIO          *vio     = entry->vio;
Packit Service 310c69
Packit Service 310c69
  entry->parent                    = journal;
Packit Service 310c69
  vio->completion.callbackThreadID = journal->slab->allocator->threadID;
Packit Service 310c69
  launchFlush(vio, completeReaping, handleFlushError);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Conduct a reap on a slab journal to reclaim unreferenced blocks.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The slab journal
Packit Service 310c69
 **/
Packit Service 310c69
static void reapSlabJournal(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  if (isReaping(journal)) {
Packit Service 310c69
    // We already have a reap in progress so wait for it to finish.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (isUnrecoveredSlab(journal->slab) || !isNormal(&journal->slab->state)
Packit Service 310c69
      || isVDOReadOnly(journal)) {
Packit Service 310c69
    // We must not reap in the first two cases, and there's no point in
Packit Service 310c69
    // read-only mode.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Start reclaiming blocks only when the journal head has no references. Then
Packit Service 310c69
   * stop when a block is referenced or reap reaches the most recently written
Packit Service 310c69
   * block, referenced by the slab summary, which has the sequence number just
Packit Service 310c69
   * before the tail.
Packit Service 310c69
   */
Packit Service 310c69
  bool reaped = false;
Packit Service 310c69
  while ((journal->unreapable < journal->tail)
Packit Service 310c69
         && (journal->reapLock->count == 0)) {
Packit Service 310c69
    reaped = true;
Packit Service 310c69
    journal->unreapable++;
Packit Service 310c69
    journal->reapLock++;
Packit Service 310c69
    if (journal->reapLock == &journal->locks[journal->size]) {
Packit Service 310c69
      journal->reapLock = &journal->locks[0];
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (!reaped) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  PhysicalLayer *layer = journal->slab->allocator->completion.layer;
Packit Service 310c69
  if (layer->getWritePolicy(layer) == WRITE_POLICY_SYNC) {
Packit Service 310c69
    finishReaping(journal);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * In async mode, it is never safe to reap a slab journal block without first
Packit Service 310c69
   * issuing a flush, regardless of whether a user flush has been received or
Packit Service 310c69
   * not. In the absence of the flush, the reference block write which released
Packit Service 310c69
   * the locks allowing the slab journal to reap may not be persisted. Although
Packit Service 310c69
   * slab summary writes will eventually issue flushes, multiple slab journal
Packit Service 310c69
   * block writes can be issued while previous slab summary updates have not
Packit Service 310c69
   * yet been made. Even though those slab journal block writes will be ignored
Packit Service 310c69
   * if the slab summary update is not persisted, they may still overwrite the
Packit Service 310c69
   * to-be-reaped slab journal block resulting in a loss of reference count
Packit Service 310c69
   * updates (VDO-2912).
Packit Service 310c69
   *
Packit Service 310c69
   * In sync mode, it is similarly unsafe. However, we cannot possibly make
Packit Service 310c69
   * those additional slab journal block writes due to the blocking threshold
Packit Service 310c69
   * and the recovery journal's flush policy of flushing before every block.
Packit Service 310c69
   * We may make no more than (number of VIOs) entries in slab journals since
Packit Service 310c69
   * the last recovery journal flush; thus, due to the size of the slab
Packit Service 310c69
   * journal blocks, the RJ must have flushed the storage no more than one
Packit Service 310c69
   * slab journal block ago. So we could only overwrite the to-be-reaped block
Packit Service 310c69
   * if we wrote and flushed the last block in the journal. But the blocking
Packit Service 310c69
   * threshold prevents that.
Packit Service 310c69
   */
Packit Service 310c69
  journal->flushWaiter.callback = flushForReaping;
Packit Service 310c69
  int result = acquireVIO(journal->slab->allocator, &journal->flushWaiter);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    enterJournalReadOnlyMode(journal, result);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This is the callback invoked after a slab summary update completes. It
Packit Service 310c69
 * is registered in the constructor on behalf of updateTailBlockLocation().
Packit Service 310c69
 *
Packit Service 310c69
 * Implements WaiterCallback.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter        The slab summary waiter that has just been notified
Packit Service 310c69
 * @param context       The result code of the update
Packit Service 310c69
 **/
Packit Service 310c69
static void releaseJournalLocks(Waiter *waiter, void *context)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal *journal = slabJournalFromSlabSummaryWaiter(waiter);
Packit Service 310c69
  int          result  = *((int *) context);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    if (result != VDO_READ_ONLY) {
Packit Service 310c69
      // Don't bother logging what might be lots of errors if we are already
Packit Service 310c69
      // in read-only mode.
Packit Service 310c69
      logErrorWithStringError(result, "failed slab summary update %llu",
Packit Service 310c69
                              journal->summarized);
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    journal->updatingSlabSummary = false;
Packit Service 310c69
    enterJournalReadOnlyMode(journal, result);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (journal->partialWriteInProgress
Packit Service 310c69
      && (journal->summarized == journal->tail)) {
Packit Service 310c69
    journal->partialWriteInProgress = false;
Packit Service 310c69
    addEntries(journal);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  SequenceNumber first    = journal->lastSummarized;
Packit Service 310c69
  journal->lastSummarized = journal->summarized;
Packit Service 310c69
  for (SequenceNumber i = journal->summarized - 1; i >= first; i--) {
Packit Service 310c69
    // Release the lock the summarized block held on the recovery journal.
Packit Service 310c69
    // (During replay, recoveryStart will always be 0.)
Packit Service 310c69
    if (journal->recoveryJournal != NULL) {
Packit Service 310c69
      ZoneCount zoneNumber = journal->slab->allocator->zoneNumber;
Packit Service 310c69
      releaseRecoveryJournalBlockReference(journal->recoveryJournal,
Packit Service 310c69
                                           getLock(journal, i)->recoveryStart,
Packit Service 310c69
                                           ZONE_TYPE_PHYSICAL,
Packit Service 310c69
                                           zoneNumber);
Packit Service 310c69
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // Release our own lock against reaping for blocks that are committed.
Packit Service 310c69
    // (This function will not change locks during replay.)
Packit Service 310c69
    adjustSlabJournalBlockReference(journal, i, -1);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->updatingSlabSummary = false;
Packit Service 310c69
Packit Service 310c69
  reapSlabJournal(journal);
Packit Service 310c69
Packit Service 310c69
  // Check if the slab summary needs to be updated again.
Packit Service 310c69
  updateTailBlockLocation(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Update the tail block location in the slab summary, if necessary.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The slab journal that is updating its tail block location
Packit Service 310c69
 **/
Packit Service 310c69
static void updateTailBlockLocation(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  if (journal->updatingSlabSummary || isVDOReadOnly(journal)
Packit Service 310c69
      || (journal->lastSummarized >= journal->nextCommit)) {
Packit Service 310c69
    checkIfSlabDrained(journal->slab);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  BlockCount freeBlockCount;
Packit Service 310c69
  if (isUnrecoveredSlab(journal->slab)) {
Packit Service 310c69
    freeBlockCount = getSummarizedFreeBlockCount(journal->summary,
Packit Service 310c69
                                                 journal->slab->slabNumber);
Packit Service 310c69
  } else {
Packit Service 310c69
    freeBlockCount = getSlabFreeBlockCount(journal->slab);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->summarized          = journal->nextCommit;
Packit Service 310c69
  journal->updatingSlabSummary = true;
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Update slab summary as dirty.
Packit Service 310c69
   * Slab journal can only reap past sequence number 1 when all the refCounts
Packit Service 310c69
   * for this slab have been written to the layer. Therefore, indicate that the
Packit Service 310c69
   * refCounts must be loaded when the journal head has reaped past sequence
Packit Service 310c69
   * number 1.
Packit Service 310c69
   */
Packit Service 310c69
  TailBlockOffset blockOffset
Packit Service 310c69
    = getSlabJournalBlockOffset(journal, journal->summarized);
Packit Service 310c69
  updateSlabSummaryEntry(journal->summary, &journal->slabSummaryWaiter,
Packit Service 310c69
                         journal->slab->slabNumber, blockOffset,
Packit Service 310c69
                         (journal->head > 1), false, freeBlockCount);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void reopenSlabJournal(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY(journal->tailHeader.entryCount == 0,
Packit Service 310c69
                  "Slab journal's active block empty before reopening");
Packit Service 310c69
  journal->head       = journal->tail;
Packit Service 310c69
  initializeJournalState(journal);
Packit Service 310c69
Packit Service 310c69
  // Ensure no locks are spuriously held on an empty journal.
Packit Service 310c69
  for (SequenceNumber block = 1; block <= journal->size; block++) {
Packit Service 310c69
    ASSERT_LOG_ONLY((getLock(journal, block)->count == 0),
Packit Service 310c69
                    "Scrubbed journal's block %llu is not locked",
Packit Service 310c69
                    block);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  addEntries(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static SequenceNumber getCommittingSequenceNumber(const VIOPoolEntry *entry)
Packit Service 310c69
{
Packit Service 310c69
  const PackedSlabJournalBlock *block = entry->buffer;
Packit Service 310c69
  return getUInt64LE(block->header.fields.sequenceNumber);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Handle post-commit processing. This is the callback registered by
Packit Service 310c69
 * writeSlabJournalBlock().
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The write VIO as a completion
Packit Service 310c69
 **/
Packit Service 310c69
static void completeWrite(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  int           writeResult = completion->result;
Packit Service 310c69
  VIOPoolEntry *entry       = completion->parent;
Packit Service 310c69
  SlabJournal  *journal     = entry->parent;
Packit Service 310c69
Packit Service 310c69
  SequenceNumber committed = getCommittingSequenceNumber(entry);
Packit Service 310c69
  unspliceRingNode(&entry->node);
Packit Service 310c69
  returnVIO(journal->slab->allocator, entry);
Packit Service 310c69
Packit Service 310c69
  if (writeResult != VDO_SUCCESS) {
Packit Service 310c69
    logErrorWithStringError(writeResult,
Packit Service 310c69
                            "cannot write slab journal block %llu",
Packit Service 310c69
                            committed);
Packit Service 310c69
    enterJournalReadOnlyMode(journal, writeResult);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  relaxedAdd64(&journal->events->blocksWritten, 1);
Packit Service 310c69
Packit Service 310c69
  if (isRingEmpty(&journal->uncommittedBlocks)) {
Packit Service 310c69
    // If no blocks are outstanding, then the commit point is at the tail.
Packit Service 310c69
    journal->nextCommit = journal->tail;
Packit Service 310c69
  } else {
Packit Service 310c69
    // The commit point is always the beginning of the oldest incomplete block.
Packit Service 310c69
    VIOPoolEntry *oldest = asVIOPoolEntry(journal->uncommittedBlocks.next);
Packit Service 310c69
    journal->nextCommit = getCommittingSequenceNumber(oldest);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  updateTailBlockLocation(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Callback from acquireVIO() registered in commitSlabJournalTail().
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter      The VIO pool waiter which was just notified
Packit Service 310c69
 * @param vioContext  The VIO pool entry for the write
Packit Service 310c69
 **/
Packit Service 310c69
static void writeSlabJournalBlock(Waiter *waiter, void *vioContext)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal            *journal = slabJournalFromResourceWaiter(waiter);
Packit Service 310c69
  VIOPoolEntry           *entry   = vioContext;
Packit Service 310c69
  SlabJournalBlockHeader *header  = &journal->tailHeader;
Packit Service 310c69
Packit Service 310c69
  header->head = journal->head;
Packit Service 310c69
  pushRingNode(&journal->uncommittedBlocks, &entry->node);
Packit Service 310c69
  packSlabJournalBlockHeader(header, &journal->block->header);
Packit Service 310c69
Packit Service 310c69
  // Copy the tail block into the VIO.
Packit Service 310c69
  memcpy(entry->buffer, journal->block, VDO_BLOCK_SIZE);
Packit Service 310c69
Packit Service 310c69
  int unusedEntries = journal->entriesPerBlock - header->entryCount;
Packit Service 310c69
  ASSERT_LOG_ONLY(unusedEntries >= 0, "Slab journal block is not overfull");
Packit Service 310c69
  if (unusedEntries > 0) {
Packit Service 310c69
    // Release the per-entry locks for any unused entries in the block we are
Packit Service 310c69
    // about to write.
Packit Service 310c69
    adjustSlabJournalBlockReference(journal, header->sequenceNumber,
Packit Service 310c69
                                    -unusedEntries);
Packit Service 310c69
    journal->partialWriteInProgress = !blockIsFull(journal);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  PhysicalBlockNumber blockNumber
Packit Service 310c69
    = getBlockNumber(journal, header->sequenceNumber);
Packit Service 310c69
Packit Service 310c69
  entry->parent = journal;
Packit Service 310c69
  entry->vio->completion.callbackThreadID = journal->slab->allocator->threadID;
Packit Service 310c69
  /*
Packit Service 310c69
   * This block won't be read in recovery until the slab summary is updated
Packit Service 310c69
   * to refer to it. The slab summary update does a flush which is sufficient
Packit Service 310c69
   * to protect us from VDO-2331.
Packit Service 310c69
   */
Packit Service 310c69
  launchWriteMetadataVIO(entry->vio, blockNumber, completeWrite,
Packit Service 310c69
                         completeWrite);
Packit Service 310c69
Packit Service 310c69
  // Since the write is submitted, the tail block structure can be reused.
Packit Service 310c69
  journal->tail++;
Packit Service 310c69
  initializeTailBlock(journal);
Packit Service 310c69
  journal->waitingToCommit = false;
Packit Service 310c69
  if (journal->slab->state.state == ADMIN_STATE_WAITING_FOR_RECOVERY) {
Packit Service 310c69
    finishOperationWithResult(&journal->slab->state,
Packit Service 310c69
                              (isVDOReadOnly(journal)
Packit Service 310c69
                               ? VDO_READ_ONLY : VDO_SUCCESS));
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  addEntries(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void commitSlabJournalTail(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  if ((journal->tailHeader.entryCount == 0)
Packit Service 310c69
      && mustMakeEntriesToFlush(journal)) {
Packit Service 310c69
    // There are no entries at the moment, but there are some waiters, so defer
Packit Service 310c69
    // initiating the flush until those entries are ready to write.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (isVDOReadOnly(journal)
Packit Service 310c69
      || journal->waitingToCommit
Packit Service 310c69
      || (journal->tailHeader.entryCount == 0)) {
Packit Service 310c69
    // There is nothing to do since the tail block is empty, or writing, or
Packit Service 310c69
    // the journal is in read-only mode.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Since we are about to commit the tail block, this journal no longer
Packit Service 310c69
   * needs to be on the ring of journals which the recovery journal might
Packit Service 310c69
   * ask to commit.
Packit Service 310c69
   */
Packit Service 310c69
  markSlabJournalClean(journal);
Packit Service 310c69
Packit Service 310c69
  journal->waitingToCommit = true;
Packit Service 310c69
Packit Service 310c69
  journal->resourceWaiter.callback = writeSlabJournalBlock;
Packit Service 310c69
  int result = acquireVIO(journal->slab->allocator, &journal->resourceWaiter);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    journal->waitingToCommit = false;
Packit Service 310c69
    enterJournalReadOnlyMode(journal, result);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void encodeSlabJournalEntry(SlabJournalBlockHeader *tailHeader,
Packit Service 310c69
                            SlabJournalPayload     *payload,
Packit Service 310c69
                            SlabBlockNumber         sbn,
Packit Service 310c69
                            JournalOperation        operation)
Packit Service 310c69
{
Packit Service 310c69
  JournalEntryCount entryNumber = tailHeader->entryCount++;
Packit Service 310c69
  if (operation == BLOCK_MAP_INCREMENT) {
Packit Service 310c69
    if (!tailHeader->hasBlockMapIncrements) {
Packit Service 310c69
      memset(payload->fullEntries.entryTypes, 0,
Packit Service 310c69
             SLAB_JOURNAL_ENTRY_TYPES_SIZE);
Packit Service 310c69
      tailHeader->hasBlockMapIncrements = true;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    payload->fullEntries.entryTypes[entryNumber / 8]
Packit Service 310c69
      |= ((byte) 1 << (entryNumber % 8));
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  packSlabJournalEntry(&payload->entries[entryNumber], sbn,
Packit Service 310c69
                       isIncrementOperation(operation));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
SlabJournalEntry decodeSlabJournalEntry(PackedSlabJournalBlock *block,
Packit Service 310c69
                                        JournalEntryCount       entryCount)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournalEntry entry
Packit Service 310c69
    = unpackSlabJournalEntry(&block->payload.entries[entryCount]);
Packit Service 310c69
  if (block->header.fields.hasBlockMapIncrements
Packit Service 310c69
      && ((block->payload.fullEntries.entryTypes[entryCount / 8]
Packit Service 310c69
           & ((byte) 1 << (entryCount % 8))) != 0)) {
Packit Service 310c69
    entry.operation = BLOCK_MAP_INCREMENT;
Packit Service 310c69
  }
Packit Service 310c69
  return entry;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Actually add an entry to the slab journal, potentially firing off a write
Packit Service 310c69
 * if a block becomes full. This function is synchronous.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal        The slab journal to append to
Packit Service 310c69
 * @param pbn            The pbn being adjusted
Packit Service 310c69
 * @param operation      The type of entry to make
Packit Service 310c69
 * @param recoveryPoint  The recovery journal point for this entry
Packit Service 310c69
 **/
Packit Service 310c69
static void addEntry(SlabJournal         *journal,
Packit Service 310c69
                     PhysicalBlockNumber  pbn,
Packit Service 310c69
                     JournalOperation     operation,
Packit Service 310c69
                     const JournalPoint  *recoveryPoint)
Packit Service 310c69
{
Packit Service 310c69
  int result = ASSERT(beforeJournalPoint(&journal->tailHeader.recoveryPoint,
Packit Service 310c69
                                         recoveryPoint),
Packit Service 310c69
                      "recovery journal point is monotonically increasing, "
Packit Service 310c69
                      "recovery point: %llu.%u, "
Packit Service 310c69
                      "block recovery point: %llu.%u",
Packit Service 310c69
                      recoveryPoint->sequenceNumber, recoveryPoint->entryCount,
Packit Service 310c69
                      journal->tailHeader.recoveryPoint.sequenceNumber,
Packit Service 310c69
                      journal->tailHeader.recoveryPoint.entryCount);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    enterJournalReadOnlyMode(journal, result);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  PackedSlabJournalBlock *block = journal->block;
Packit Service 310c69
  if (operation == BLOCK_MAP_INCREMENT) {
Packit Service 310c69
    result = ASSERT_LOG_ONLY((journal->tailHeader.entryCount
Packit Service 310c69
                              < journal->fullEntriesPerBlock),
Packit Service 310c69
                             "block has room for full entries");
Packit Service 310c69
    if (result != VDO_SUCCESS) {
Packit Service 310c69
      enterJournalReadOnlyMode(journal, result);
Packit Service 310c69
      return;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  encodeSlabJournalEntry(&journal->tailHeader, &block->payload,
Packit Service 310c69
                         pbn - journal->slab->start, operation);
Packit Service 310c69
  journal->tailHeader.recoveryPoint = *recoveryPoint;
Packit Service 310c69
  if (blockIsFull(journal)) {
Packit Service 310c69
    commitSlabJournalTail(journal);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool attemptReplayIntoSlabJournal(SlabJournal         *journal,
Packit Service 310c69
                                  PhysicalBlockNumber  pbn,
Packit Service 310c69
                                  JournalOperation     operation,
Packit Service 310c69
                                  JournalPoint        *recoveryPoint,
Packit Service 310c69
                                  VDOCompletion       *parent)
Packit Service 310c69
{
Packit Service 310c69
  // Only accept entries after the current recovery point.
Packit Service 310c69
  if (!beforeJournalPoint(&journal->tailHeader.recoveryPoint, recoveryPoint)) {
Packit Service 310c69
    return true;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  SlabJournalBlockHeader *header = &journal->tailHeader;
Packit Service 310c69
  if ((header->entryCount >= journal->fullEntriesPerBlock)
Packit Service 310c69
      && (header->hasBlockMapIncrements ||
Packit Service 310c69
          (operation == BLOCK_MAP_INCREMENT))) {
Packit Service 310c69
    // The tail block does not have room for the entry we are attempting
Packit Service 310c69
    // to add so commit the tail block now.
Packit Service 310c69
    commitSlabJournalTail(journal);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (journal->waitingToCommit) {
Packit Service 310c69
    startOperationWithWaiter(&journal->slab->state,
Packit Service 310c69
                             ADMIN_STATE_WAITING_FOR_RECOVERY, parent, NULL);
Packit Service 310c69
    return false;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if ((journal->tail - journal->head) >= journal->size) {
Packit Service 310c69
    /*
Packit Service 310c69
     * We must have reaped the current head before the crash, since
Packit Service 310c69
     * the blocked threshold keeps us from having more entries than
Packit Service 310c69
     * fit in a slab journal; hence we can just advance the head
Packit Service 310c69
     * (and unreapable block), as needed.
Packit Service 310c69
     */
Packit Service 310c69
    journal->head++;
Packit Service 310c69
    journal->unreapable++;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  markSlabReplaying(journal->slab);
Packit Service 310c69
  addEntry(journal, pbn, operation, recoveryPoint);
Packit Service 310c69
  return true;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether the journal should be saving reference blocks out.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal       The journal to check
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the journal should be requesting reference block writes
Packit Service 310c69
 **/
Packit Service 310c69
static bool requiresFlushing(const SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  BlockCount journalLength = (journal->tail - journal->head);
Packit Service 310c69
  return (journalLength >= journal->flushingThreshold);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether the journal must be reaped before adding new entries.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal       The journal to check
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the journal must be reaped
Packit Service 310c69
 **/
Packit Service 310c69
static bool requiresReaping(const SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  BlockCount journalLength = (journal->tail - journal->head);
Packit Service 310c69
  return (journalLength >= journal->blockingThreshold);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool requiresScrubbing(const SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  BlockCount journalLength = (journal->tail - journal->head);
Packit Service 310c69
  return (journalLength >= journal->scrubbingThreshold);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Implements WaiterCallback. This callback is invoked by addEntries() once
Packit Service 310c69
 * it has determined that we are ready to make another entry in the slab
Packit Service 310c69
 * journal.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter        The VIO which should make an entry now
Packit Service 310c69
 * @param context       The slab journal to make an entry in
Packit Service 310c69
 **/
Packit Service 310c69
static void addEntryFromWaiter(Waiter *waiter, void *context)
Packit Service 310c69
{
Packit Service 310c69
  DataVIO     *dataVIO = waiterAsDataVIO(waiter);
Packit Service 310c69
  SlabJournal *journal = (SlabJournal *) context;
Packit Service 310c69
  SlabJournalBlockHeader *header = &journal->tailHeader;
Packit Service 310c69
  SequenceNumber recoveryBlock = dataVIO->recoveryJournalPoint.sequenceNumber;
Packit Service 310c69
Packit Service 310c69
  if (header->entryCount == 0) {
Packit Service 310c69
    /*
Packit Service 310c69
     * This is the first entry in the current tail block, so get a lock
Packit Service 310c69
     * on the recovery journal which we will hold until this tail block is
Packit Service 310c69
     * committed.
Packit Service 310c69
     */
Packit Service 310c69
    getLock(journal, header->sequenceNumber)->recoveryStart = recoveryBlock;
Packit Service 310c69
    if (journal->recoveryJournal != NULL) {
Packit Service 310c69
      ZoneCount zoneNumber = journal->slab->allocator->zoneNumber;
Packit Service 310c69
      acquireRecoveryJournalBlockReference(journal->recoveryJournal,
Packit Service 310c69
                                           recoveryBlock, ZONE_TYPE_PHYSICAL,
Packit Service 310c69
                                           zoneNumber);
Packit Service 310c69
    }
Packit Service 310c69
    markSlabJournalDirty(journal, recoveryBlock);
Packit Service 310c69
Packit Service 310c69
    // If the slab journal is over the first threshold, tell the refCounts to
Packit Service 310c69
    // write some reference blocks, but proceed apace.
Packit Service 310c69
    if (requiresFlushing(journal)) {
Packit Service 310c69
      relaxedAdd64(&journal->events->flushCount, 1);
Packit Service 310c69
      BlockCount journalLength = (journal->tail - journal->head);
Packit Service 310c69
      BlockCount blocksToDeadline = 0;
Packit Service 310c69
      if (journalLength <= journal->flushingDeadline) {
Packit Service 310c69
        blocksToDeadline = journal->flushingDeadline - journalLength;
Packit Service 310c69
      }
Packit Service 310c69
      saveSeveralReferenceBlocks(journal->slab->referenceCounts,
Packit Service 310c69
                                 blocksToDeadline + 1);
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  JournalPoint slabJournalPoint = {
Packit Service 310c69
    .sequenceNumber = header->sequenceNumber,
Packit Service 310c69
    .entryCount     = header->entryCount,
Packit Service 310c69
  };
Packit Service 310c69
Packit Service 310c69
  addEntry(journal, dataVIO->operation.pbn, dataVIO->operation.type,
Packit Service 310c69
           &dataVIO->recoveryJournalPoint);
Packit Service 310c69
Packit Service 310c69
  // Now that an entry has been made in the slab journal, update the
Packit Service 310c69
  // reference counts.
Packit Service 310c69
  int result = modifySlabReferenceCount(journal->slab, &slabJournalPoint,
Packit Service 310c69
                                        dataVIO->operation);
Packit Service 310c69
  continueDataVIO(dataVIO, result);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether the next entry to be made is a block map increment.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the first entry waiter's operation is a block
Packit Service 310c69
 *         map increment
Packit Service 310c69
 **/
Packit Service 310c69
static inline bool isNextEntryABlockMapIncrement(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  DataVIO *dataVIO = waiterAsDataVIO(getFirstWaiter(&journal->entryWaiters));
Packit Service 310c69
  return (dataVIO->operation.type == BLOCK_MAP_INCREMENT);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Add as many entries as possible from the queue of VIOs waiting to make
Packit Service 310c69
 * entries. By processing the queue in order, we ensure that slab journal
Packit Service 310c69
 * entries are made in the same order as recovery journal entries for the
Packit Service 310c69
 * same increment or decrement.
Packit Service 310c69
 *
Packit Service 310c69
 * @param journal  The journal to which entries may be added
Packit Service 310c69
 **/
Packit Service 310c69
static void addEntries(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  if (journal->addingEntries) {
Packit Service 310c69
    // Protect against re-entrancy.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->addingEntries = true;
Packit Service 310c69
  while (hasWaiters(&journal->entryWaiters)) {
Packit Service 310c69
    if (journal->partialWriteInProgress || slabIsRebuilding(journal->slab)) {
Packit Service 310c69
      // Don't add entries while rebuilding or while a partial write is
Packit Service 310c69
      // outstanding (VDO-2399).
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    SlabJournalBlockHeader *header = &journal->tailHeader;
Packit Service 310c69
    if (journal->waitingToCommit) {
Packit Service 310c69
      // If we are waiting for resources to write the tail block, and the
Packit Service 310c69
      // tail block is full, we can't make another entry.
Packit Service 310c69
      relaxedAdd64(&journal->events->tailBusyCount, 1);
Packit Service 310c69
      break;
Packit Service 310c69
    } else if (isNextEntryABlockMapIncrement(journal)
Packit Service 310c69
               && (header->entryCount >= journal->fullEntriesPerBlock)) {
Packit Service 310c69
      // The tail block does not have room for a block map increment, so
Packit Service 310c69
      // commit it now.
Packit Service 310c69
      commitSlabJournalTail(journal);
Packit Service 310c69
      if (journal->waitingToCommit) {
Packit Service 310c69
        relaxedAdd64(&journal->events->tailBusyCount, 1);
Packit Service 310c69
        break;
Packit Service 310c69
      }
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // If the slab is over the blocking threshold, make the VIO wait.
Packit Service 310c69
    if (requiresReaping(journal)) {
Packit Service 310c69
      relaxedAdd64(&journal->events->blockedCount, 1);
Packit Service 310c69
      saveDirtyReferenceBlocks(journal->slab->referenceCounts);
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    if (header->entryCount == 0) {
Packit Service 310c69
      JournalLock *lock = getLock(journal, header->sequenceNumber);
Packit Service 310c69
      // Check if the on disk slab journal is full. Because of the
Packit Service 310c69
      // blocking and scrubbing thresholds, this should never happen.
Packit Service 310c69
      if (lock->count > 0) {
Packit Service 310c69
        ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
Packit Service 310c69
                        "New block has locks, but journal is not full");
Packit Service 310c69
Packit Service 310c69
        /*
Packit Service 310c69
         * The blocking threshold must let the journal fill up if the new
Packit Service 310c69
         * block has locks; if the blocking threshold is smaller than the
Packit Service 310c69
         * journal size, the new block cannot possibly have locks already.
Packit Service 310c69
         */
Packit Service 310c69
        ASSERT_LOG_ONLY((journal->blockingThreshold >= journal->size),
Packit Service 310c69
                        "New block can have locks already iff blocking"
Packit Service 310c69
                        "threshold is at the end of the journal");
Packit Service 310c69
Packit Service 310c69
        relaxedAdd64(&journal->events->diskFullCount, 1);
Packit Service 310c69
        saveDirtyReferenceBlocks(journal->slab->referenceCounts);
Packit Service 310c69
        break;
Packit Service 310c69
      }
Packit Service 310c69
Packit Service 310c69
      /*
Packit Service 310c69
       * Don't allow the new block to be reaped until all of the reference
Packit Service 310c69
       * count blocks are written and the journal block has been
Packit Service 310c69
       * fully committed as well.
Packit Service 310c69
       */
Packit Service 310c69
      lock->count = journal->entriesPerBlock + 1;
Packit Service 310c69
Packit Service 310c69
      if (header->sequenceNumber == 1) {
Packit Service 310c69
        /*
Packit Service 310c69
         * This is the first entry in this slab journal, ever. Dirty all of
Packit Service 310c69
         * the reference count blocks. Each will acquire a lock on the
Packit Service 310c69
         * tail block so that the journal won't be reaped until the
Packit Service 310c69
         * reference counts are initialized. The lock acquisition must
Packit Service 310c69
         * be done by the RefCounts since here we don't know how many
Packit Service 310c69
         * reference blocks the RefCounts has.
Packit Service 310c69
         */
Packit Service 310c69
        acquireDirtyBlockLocks(journal->slab->referenceCounts);
Packit Service 310c69
      }
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    notifyNextWaiter(&journal->entryWaiters, addEntryFromWaiter, journal);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->addingEntries = false;
Packit Service 310c69
Packit Service 310c69
  // If there are no waiters, and we are flushing or saving, commit the
Packit Service 310c69
  // tail block.
Packit Service 310c69
  if (isSlabDraining(journal->slab) && !isSuspending(&journal->slab->state)
Packit Service 310c69
      && !hasWaiters(&journal->entryWaiters)) {
Packit Service 310c69
    commitSlabJournalTail(journal);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void addSlabJournalEntry(SlabJournal *journal, DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  if (!isSlabOpen(journal->slab)) {
Packit Service 310c69
    continueDataVIO(dataVIO, VDO_INVALID_ADMIN_STATE);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (isVDOReadOnly(journal)) {
Packit Service 310c69
    continueDataVIO(dataVIO, VDO_READ_ONLY);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  int result = enqueueDataVIO(&journal->entryWaiters, dataVIO,
Packit Service 310c69
                              THIS_LOCATION("$F($j-$js)"));
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    continueDataVIO(dataVIO, result);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (isUnrecoveredSlab(journal->slab) && requiresReaping(journal)) {
Packit Service 310c69
    increaseScrubbingPriority(journal->slab);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  addEntries(journal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void adjustSlabJournalBlockReference(SlabJournal    *journal,
Packit Service 310c69
                                     SequenceNumber  sequenceNumber,
Packit Service 310c69
                                     int             adjustment)
Packit Service 310c69
{
Packit Service 310c69
  if (sequenceNumber == 0) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (isReplayingSlab(journal->slab)) {
Packit Service 310c69
    // Locks should not be used during offline replay.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
Packit Service 310c69
  JournalLock *lock = getLock(journal, sequenceNumber);
Packit Service 310c69
  if (adjustment < 0) {
Packit Service 310c69
    ASSERT_LOG_ONLY((-adjustment <= lock->count),
Packit Service 310c69
                    "adjustment %d of lock count %u for slab journal block %"
Packit Service 310c69
		    PRIu64 " must not underflow", adjustment, lock->count,
Packit Service 310c69
		    sequenceNumber);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  lock->count += adjustment;
Packit Service 310c69
  if (lock->count == 0) {
Packit Service 310c69
    reapSlabJournal(journal);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool releaseRecoveryJournalLock(SlabJournal    *journal,
Packit Service 310c69
                                SequenceNumber  recoveryLock)
Packit Service 310c69
{
Packit Service 310c69
  if (recoveryLock > journal->recoveryLock) {
Packit Service 310c69
    ASSERT_LOG_ONLY((recoveryLock < journal->recoveryLock),
Packit Service 310c69
                    "slab journal recovery lock is not older than the recovery"
Packit Service 310c69
                    " journal head");
Packit Service 310c69
    return false;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if ((recoveryLock < journal->recoveryLock) || isVDOReadOnly(journal)) {
Packit Service 310c69
    return false;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // All locks are held by the block which is in progress; write it.
Packit Service 310c69
  commitSlabJournalTail(journal);
Packit Service 310c69
  return true;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void drainSlabJournal(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY((getCallbackThreadID()
Packit Service 310c69
                   == journal->slab->allocator->threadID),
Packit Service 310c69
                  "drainSlabJournal() called on correct thread");
Packit Service 310c69
  if (isQuiescing(&journal->slab->state)) {
Packit Service 310c69
    // XXX: we should revisit this assertion since it is no longer clear what
Packit Service 310c69
    //      it is for.
Packit Service 310c69
    ASSERT_LOG_ONLY((!(slabIsRebuilding(journal->slab)
Packit Service 310c69
                       && hasWaiters(&journal->entryWaiters))),
Packit Service 310c69
                    "slab is recovered or has no waiters");
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  switch (journal->slab->state.state) {
Packit Service 310c69
  case ADMIN_STATE_REBUILDING:
Packit Service 310c69
  case ADMIN_STATE_SUSPENDING:
Packit Service 310c69
  case ADMIN_STATE_SAVE_FOR_SCRUBBING:
Packit Service 310c69
    break;
Packit Service 310c69
Packit Service 310c69
  default:
Packit Service 310c69
    commitSlabJournalTail(journal);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Finish the decode process by returning the VIO and notifying the slab that
Packit Service 310c69
 * we're done.
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The VIO as a completion
Packit Service 310c69
 **/
Packit Service 310c69
static void finishDecodingJournal(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  int           result  = completion->result;
Packit Service 310c69
  VIOPoolEntry *entry   = completion->parent;
Packit Service 310c69
  SlabJournal  *journal = entry->parent;
Packit Service 310c69
  returnVIO(journal->slab->allocator, entry);
Packit Service 310c69
  notifySlabJournalIsLoaded(journal->slab, result);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Set up the in-memory journal state to the state which was written to disk.
Packit Service 310c69
 * This is the callback registered in readSlabJournalTail().
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The VIO which was used to read the journal tail
Packit Service 310c69
 **/
Packit Service 310c69
static void setDecodedState(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  VIOPoolEntry           *entry   = completion->parent;
Packit Service 310c69
  SlabJournal            *journal = entry->parent;
Packit Service 310c69
  PackedSlabJournalBlock *block   = entry->buffer;
Packit Service 310c69
Packit Service 310c69
  SlabJournalBlockHeader header;
Packit Service 310c69
  unpackSlabJournalBlockHeader(&block->header, &header);
Packit Service 310c69
Packit Service 310c69
  if ((header.metadataType != VDO_METADATA_SLAB_JOURNAL)
Packit Service 310c69
      || (header.nonce != journal->slab->allocator->nonce)) {
Packit Service 310c69
    finishDecodingJournal(completion);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->tail = header.sequenceNumber + 1;
Packit Service 310c69
Packit Service 310c69
  // If the slab is clean, this implies the slab journal is empty, so advance
Packit Service 310c69
  // the head appropriately.
Packit Service 310c69
  if (getSummarizedCleanliness(journal->summary, journal->slab->slabNumber)) {
Packit Service 310c69
    journal->head = journal->tail;
Packit Service 310c69
  } else {
Packit Service 310c69
    journal->head = header.head;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->tailHeader = header;
Packit Service 310c69
  initializeJournalState(journal);
Packit Service 310c69
  finishDecodingJournal(completion);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This reads the slab journal tail block by using a VIO acquired from the VIO
Packit Service 310c69
 * pool. This is the success callback from acquireVIOFromPool() when decoding
Packit Service 310c69
 * the slab journal.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter      The VIO pool waiter which has just been notified
Packit Service 310c69
 * @param vioContext  The VIO pool entry given to the waiter
Packit Service 310c69
 **/
Packit Service 310c69
static void readSlabJournalTail(Waiter *waiter, void *vioContext)
Packit Service 310c69
{
Packit Service 310c69
  SlabJournal  *journal = slabJournalFromResourceWaiter(waiter);
Packit Service 310c69
  Slab         *slab    = journal->slab;
Packit Service 310c69
  VIOPoolEntry *entry   = vioContext;
Packit Service 310c69
  TailBlockOffset lastCommitPoint
Packit Service 310c69
    = getSummarizedTailBlockOffset(journal->summary, slab->slabNumber);
Packit Service 310c69
  entry->parent = journal;
Packit Service 310c69
Packit Service 310c69
Packit Service 310c69
  // Slab summary keeps the commit point offset, so the tail block is the
Packit Service 310c69
  // block before that. Calculation supports small journals in unit tests.
Packit Service 310c69
  TailBlockOffset tailBlock = ((lastCommitPoint == 0)
Packit Service 310c69
                               ? (TailBlockOffset) (journal->size - 1)
Packit Service 310c69
                               : (lastCommitPoint - 1));
Packit Service 310c69
  entry->vio->completion.callbackThreadID = slab->allocator->threadID;
Packit Service 310c69
  launchReadMetadataVIO(entry->vio, slab->journalOrigin + tailBlock,
Packit Service 310c69
                        setDecodedState, finishDecodingJournal);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void decodeSlabJournal(SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY((getCallbackThreadID()
Packit Service 310c69
                   == journal->slab->allocator->threadID),
Packit Service 310c69
                  "decodeSlabJournal() called on correct thread");
Packit Service 310c69
  Slab *slab = journal->slab;
Packit Service 310c69
  TailBlockOffset lastCommitPoint
Packit Service 310c69
    = getSummarizedTailBlockOffset(journal->summary, slab->slabNumber);
Packit Service 310c69
  if ((lastCommitPoint == 0)
Packit Service 310c69
      && !mustLoadRefCounts(journal->summary, slab->slabNumber)) {
Packit Service 310c69
    /*
Packit Service 310c69
     * This slab claims that it has a tail block at (journal->size - 1), but
Packit Service 310c69
     * a head of 1. This is impossible, due to the scrubbing threshold, on
Packit Service 310c69
     * a real system, so don't bother reading the (bogus) data off disk.
Packit Service 310c69
     */
Packit Service 310c69
    ASSERT_LOG_ONLY(((journal->size < 16)
Packit Service 310c69
                     || (journal->scrubbingThreshold < (journal->size - 1))),
Packit Service 310c69
                    "Scrubbing threshold protects against reads of unwritten"
Packit Service 310c69
                    "slab journal blocks");
Packit Service 310c69
    notifySlabJournalIsLoaded(slab, VDO_SUCCESS);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  journal->resourceWaiter.callback = readSlabJournalTail;
Packit Service 310c69
  int result = acquireVIO(slab->allocator, &journal->resourceWaiter);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    notifySlabJournalIsLoaded(slab, result);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void dumpSlabJournal(const SlabJournal *journal)
Packit Service 310c69
{
Packit Service 310c69
  logInfo("  slab journal: entryWaiters=%zu waitingToCommit=%s"
Packit Service 310c69
          " updatingSlabSummary=%s head=%llu unreapable=%" PRIu64
Packit Service 310c69
          " tail=%llu nextCommit=%llu summarized=%" PRIu64
Packit Service 310c69
          " lastSummarized=%llu recoveryJournalLock=%" PRIu64
Packit Service 310c69
          " dirty=%s", countWaiters(&journal->entryWaiters),
Packit Service 310c69
          boolToString(journal->waitingToCommit),
Packit Service 310c69
          boolToString(journal->updatingSlabSummary),
Packit Service 310c69
          journal->head, journal->unreapable, journal->tail,
Packit Service 310c69
          journal->nextCommit, journal->summarized, journal->lastSummarized,
Packit Service 310c69
          journal->recoveryLock,
Packit Service 310c69
          boolToString(isSlabJournalDirty(journal)));
Packit Service 310c69
  // Given the frequency with which the locks are just a tiny bit off, it
Packit Service 310c69
  // might be worth dumping all the locks, but that might be too much logging.
Packit Service 310c69
}