/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/slabJournal.c#18 $
*/
#include "slabJournalInternals.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "stringUtils.h"
#include "adminState.h"
#include "blockAllocatorInternals.h"
#include "dataVIO.h"
#include "recoveryJournal.h"
#include "refCounts.h"
#include "slabDepot.h"
#include "slabSummary.h"
/**
 * Convert a resource waiter back into the slab journal which contains it.
 *
 * The resource waiter is required (at compile time) to be the first field
 * of the SlabJournal, so the conversion is a plain pointer cast.
 *
 * @param waiter  The resource waiter embedded in a slab journal
 *
 * @return The slab journal containing the waiter
 **/
__attribute__((warn_unused_result))
static inline SlabJournal *slabJournalFromResourceWaiter(Waiter *waiter)
{
  // The cast below is only valid while the waiter sits at offset zero.
  STATIC_ASSERT(offsetof(SlabJournal, resourceWaiter) == 0);
  SlabJournal *journal = (SlabJournal *) waiter;
  return journal;
}
/**
 * Convert a flush waiter back into the slab journal which contains it.
 *
 * @param waiter  The flush waiter embedded in a slab journal (may be NULL)
 *
 * @return The slab journal containing the waiter, or NULL if the waiter
 *         is NULL
 **/
__attribute__((warn_unused_result))
static inline SlabJournal *slabJournalFromFlushWaiter(Waiter *waiter)
{
  if (waiter != NULL) {
    // Step back from the embedded field to the start of its container.
    uintptr_t fieldAddress = (uintptr_t) waiter;
    return (SlabJournal *) (fieldAddress - offsetof(SlabJournal, flushWaiter));
  }
  return NULL;
}
/**********************************************************************/
SlabJournal *slabJournalFromDirtyNode(RingNode *node)
{
  // The dirty node is embedded in the journal, so back up by its offset to
  // find the containing structure; a NULL node maps to a NULL journal.
  return ((node == NULL)
          ? NULL
          : (SlabJournal *) ((uintptr_t) node
                             - offsetof(SlabJournal, dirtyNode)));
}
/**
 * Convert a slab summary waiter back into the slab journal which contains
 * it.
 *
 * @param waiter  The slab summary waiter embedded in a slab journal (may be
 *                NULL)
 *
 * @return The slab journal containing the waiter, or NULL if the waiter
 *         is NULL
 **/
__attribute__((warn_unused_result))
static inline SlabJournal *slabJournalFromSlabSummaryWaiter(Waiter *waiter)
{
  const size_t fieldOffset = offsetof(SlabJournal, slabSummaryWaiter);
  if (waiter == NULL) {
    return NULL;
  }
  // Back up from the embedded waiter to the start of the journal.
  return (SlabJournal *) ((uintptr_t) waiter - fieldOffset);
}
/**
 * Translate a slab journal sequence number into the physical block number
 * at which that journal block is stored.
 *
 * @param journal   The journal
 * @param sequence  The sequence number of the desired block
 *
 * @return the slab's journal origin plus the block offset which corresponds
 *         to the sequence number
 **/
__attribute__((warn_unused_result))
static inline PhysicalBlockNumber getBlockNumber(SlabJournal    *journal,
                                                 SequenceNumber  sequence)
{
  PhysicalBlockNumber origin = journal->slab->journalOrigin;
  TailBlockOffset     offset = getSlabJournalBlockOffset(journal, sequence);
  return (origin + offset);
}
/**
 * Look up the lock object which guards a slab journal block.
 *
 * @param journal         Slab journal to retrieve from
 * @param sequenceNumber  Sequence number of the block
 *
 * @return the entry of the journal's lock array selected by the block
 *         offset of the given sequence number
 **/
__attribute__((warn_unused_result))
static inline JournalLock *getLock(SlabJournal    *journal,
                                   SequenceNumber  sequenceNumber)
{
  TailBlockOffset offset = getSlabJournalBlockOffset(journal, sequenceNumber);
  return (journal->locks + offset);
}
/**
* Report whether the VDO which owns this journal is in read-only mode.
*
* The answer comes from the read-only notifier of the allocator which owns
* the journal's slab.
*
* @param journal The journal whose owning VDO should be checked
*
* @return <code>true</code> if the VDO is in read-only mode
**/
__attribute__((warn_unused_result))
static inline bool isVDOReadOnly(SlabJournal *journal)
{
return isReadOnly(journal->slab->allocator->readOnlyNotifier);
}
/**
 * Check whether there are entry waiters which should delay a flush.
 *
 * @param journal  The journal to check
 *
 * @return <code>true</code> if the slab is not rebuilding and there are
 *         VIOs waiting to make entries in this journal
 **/
__attribute__((warn_unused_result))
static inline bool mustMakeEntriesToFlush(SlabJournal *journal)
{
  // While the slab is rebuilding, waiters never hold up a flush.
  if (slabIsRebuilding(journal->slab)) {
    return false;
  }
  return hasWaiters(&journal->entryWaiters);
}
/**
* Check whether a reap is currently in progress.
*
* A reap is treated as in progress whenever the journal head differs from
* the unreapable sequence number.
*
* @param journal The journal which may be reaping
*
* @return <code>true</code> if the journal is reaping
**/
__attribute__((warn_unused_result))
static inline bool isReaping(SlabJournal *journal)
{
return (journal->head != journal->unreapable);
}
/**********************************************************************/
bool isSlabJournalActive(SlabJournal *journal)
{
  // Entries still need to be made, or a reap has not finished yet.
  if (mustMakeEntriesToFlush(journal) || isReaping(journal)) {
    return true;
  }

  // A tail block commit has been requested, or block writes are in flight.
  if (journal->waitingToCommit
      || !isRingEmpty(&journal->uncommittedBlocks)) {
    return true;
  }

  // Otherwise the only remaining activity is a slab summary update.
  return journal->updatingSlabSummary;
}
/**
 * Reset the in-memory tail block header so that it describes a new, empty
 * block at the journal's current tail.
 *
 * @param journal  The journal whose tail block is being initialized
 **/
static void initializeTailBlock(SlabJournal *journal)
{
  // The new block has no entries and hence no block map increments.
  journal->tailHeader.hasBlockMapIncrements = false;
  journal->tailHeader.entryCount            = 0;
  journal->tailHeader.sequenceNumber        = journal->tail;
}
/**
 * Set all journal fields appropriately to start journaling.
 *
 * @param journal  The journal to be reset, based on its current head and
 *                 tail sequence numbers
 **/
static void initializeJournalState(SlabJournal *journal)
{
  // The reaping position starts at the head.
  journal->unreapable = journal->head;
  journal->reapLock   = getLock(journal, journal->unreapable);

  // The commit and summary positions all start at the tail.
  journal->nextCommit     = journal->tail;
  journal->lastSummarized = journal->tail;
  journal->summarized     = journal->lastSummarized;

  initializeTailBlock(journal);
}
/**
 * Check whether the tail block of a journal is full.
 *
 * The capacity compared against depends on whether the block contains any
 * block map increments: fullEntriesPerBlock if it does, entriesPerBlock
 * otherwise.
 *
 * @param journal  The slab journal for the block
 *
 * @return <code>true</code> if the tail block is full
 **/
__attribute__((warn_unused_result))
static bool blockIsFull(SlabJournal *journal)
{
  JournalEntryCount count = journal->tailHeader.entryCount;
  if (journal->tailHeader.hasBlockMapIncrements) {
    return (count == journal->fullEntriesPerBlock);
  }
  return (count == journal->entriesPerBlock);
}
/**********************************************************************/
// Forward declarations of functions which are referenced before they are
// defined later in this file.
static void addEntries(SlabJournal *journal);
static void updateTailBlockLocation(SlabJournal *journal);
static void releaseJournalLocks(Waiter *waiter, void *context);
/**********************************************************************/
int makeSlabJournal(BlockAllocator   *allocator,
                    Slab             *slab,
                    RecoveryJournal  *recoveryJournal,
                    SlabJournal     **journalPtr)
{
  const SlabConfig *slabConfig = getSlabConfig(allocator->depot);

  // The journal is allocated together with a trailing array of JournalLocks
  // sized by the number of slab journal blocks.
  SlabJournal *journal;
  int result = ALLOCATE_EXTENDED(SlabJournal, slabConfig->slabJournalBlocks,
                                 JournalLock, __func__, &journal);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // Connections to the rest of the VDO.
  journal->slab            = slab;
  journal->recoveryJournal = recoveryJournal;
  journal->events          = &allocator->slabJournalStatistics;
  journal->summary         = getSlabSummaryZone(allocator);

  // Geometry and thresholds, taken from the slab configuration.
  journal->size                = slabConfig->slabJournalBlocks;
  journal->entriesPerBlock     = SLAB_JOURNAL_ENTRIES_PER_BLOCK;
  journal->fullEntriesPerBlock = SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
  journal->flushingThreshold   = slabConfig->slabJournalFlushingThreshold;
  journal->blockingThreshold   = slabConfig->slabJournalBlockingThreshold;
  journal->scrubbingThreshold  = slabConfig->slabJournalScrubbingThreshold;

  // A new journal starts with both head and tail at sequence number 1.
  journal->head = 1;
  journal->tail = 1;

  // Set there to be some time between the deadline and the blocking
  // threshold, so that hopefully all are done before blocking.
  if ((journal->blockingThreshold - journal->flushingThreshold) > 5) {
    journal->flushingDeadline = journal->blockingThreshold - 5;
  } else {
    journal->flushingDeadline = journal->flushingThreshold;
  }

  journal->slabSummaryWaiter.callback = releaseJournalLocks;

  // Buffer in which the tail block is assembled before it is written.
  result = ALLOCATE(VDO_BLOCK_SIZE, char, "PackedSlabJournalBlock",
                    (char **) &journal->block);
  if (result != VDO_SUCCESS) {
    freeSlabJournal(&journal);
    return result;
  }

  initializeRing(&journal->dirtyNode);
  initializeRing(&journal->uncommittedBlocks);

  journal->tailHeader.metadataType = VDO_METADATA_SLAB_JOURNAL;
  journal->tailHeader.nonce        = slab->allocator->nonce;
  initializeJournalState(journal);

  *journalPtr = journal;
  return VDO_SUCCESS;
}
/**********************************************************************/
void freeSlabJournal(SlabJournal **journalPtr)
{
  SlabJournal *journal = *journalPtr;
  if (journal != NULL) {
    // Release the tail block buffer before the journal which refers to it,
    // then clear the caller's pointer.
    FREE(journal->block);
    FREE(journal);
    *journalPtr = NULL;
  }
}
/**********************************************************************/
bool isSlabJournalBlank(const SlabJournal *journal)
{
  // A missing journal is never considered blank.
  if (journal == NULL) {
    return false;
  }

  // A blank journal is still on its first block...
  if (journal->tail != 1) {
    return false;
  }

  // ...and that block has no entries in it.
  return (journal->tailHeader.entryCount == 0);
}
/**********************************************************************/
bool isSlabJournalDirty(const SlabJournal *journal)
{
// The journal counts as dirty while its recovery lock is non-zero; the lock
// is set by markSlabJournalDirty() and zeroed by markSlabJournalClean().
return (journal->recoveryLock != 0);
}
/**
 * Record the recovery journal lock held by a slab journal and put the
 * journal on the dirty ring of its allocator, keeping the ring ordered by
 * recovery lock.
 *
 * @param journal  The journal to be marked dirty
 * @param lock     The recovery journal lock held by the slab journal
 **/
static void markSlabJournalDirty(SlabJournal *journal, SequenceNumber lock)
{
  ASSERT_LOG_ONLY(!isSlabJournalDirty(journal), "slab journal was clean");

  journal->recoveryLock = lock;

  // Scan backward from the end of the ring for the last journal whose lock
  // is no greater than ours; if there is none, node ends up at the ring head.
  RingNode *dirtyRing = &journal->slab->allocator->dirtySlabJournals;
  RingNode *node;
  for (node = dirtyRing->prev; node != dirtyRing; node = node->prev) {
    SlabJournal *dirtyJournal = slabJournalFromDirtyNode(node);
    if (dirtyJournal->recoveryLock <= journal->recoveryLock) {
      break;
    }
  }

  pushRingNode(node->next, &journal->dirtyNode);
}
/**********************************************************************/
static void markSlabJournalClean(SlabJournal *journal)
{
// Drop the recovery lock (so isSlabJournalDirty() reports false) and take
// the journal off whatever ring its dirty node is currently on.
journal->recoveryLock = 0;
unspliceRingNode(&journal->dirtyNode);
}
/**
* Implements WaiterCallback. This callback is invoked on all VIOs waiting
* to make slab journal entries after the VDO has gone into read-only mode.
* Each waiting DataVIO is continued with a VDO_READ_ONLY result.
*
* @param waiter The waiter (a DataVIO) to abort
* @param context Unused
**/
static void abortWaiter(Waiter *waiter,
void *context __attribute__((unused)))
{
continueDataVIO(waiterAsDataVIO(waiter), VDO_READ_ONLY);
}
/**********************************************************************/
void abortSlabJournalWaiters(SlabJournal *journal)
{
// This must run on the thread of the allocator which owns the slab.
ASSERT_LOG_ONLY((getCallbackThreadID()
== journal->slab->allocator->threadID),
"abortSlabJournalWaiters() called on correct thread");
// Fail every VIO which is queued to make an entry in this journal.
notifyAllWaiters(&journal->entryWaiters, abortWaiter, journal);
// Let the slab re-check whether it has drained now that the waiters have
// been notified.
checkIfSlabDrained(journal->slab);
}
/**
* Put the journal in read-only mode. All attempts to add entries after
* this function is called will fail. All VIOs waiting to make entries
* will be awakened with an error. All flushes will complete as soon as all
* pending IO is done.
*
* @param journal The journal which has failed
* @param errorCode The error result triggering this call
**/
static void enterJournalReadOnlyMode(SlabJournal *journal, int errorCode)
{
// Notify the owning allocator's read-only notifier first, then fail the
// VIOs which are queued on this journal.
enterReadOnlyMode(journal->slab->allocator->readOnlyNotifier, errorCode);
abortSlabJournalWaiters(journal);
}
/**
* Actually advance the head of the journal now that any necessary flushes
* are complete.
*
* @param journal The journal to be reaped
**/
static void finishReaping(SlabJournal *journal)
{
// Moving the head up to the unreapable point ends the reap (isReaping()
// compares these two fields).
journal->head = journal->unreapable;
// The shorter journal may allow queued VIOs to make their entries.
addEntries(journal);
checkIfSlabDrained(journal->slab);
}
/**********************************************************************/
// Forward declaration; completeReaping() calls this before its definition.
static void reapSlabJournal(SlabJournal *journal);
/**
* Finish reaping now that we have flushed the lower layer and then try
* reaping again in case we deferred reaping due to an outstanding VIO.
*
* @param completion The flush VIO
**/
static void completeReaping(VDOCompletion *completion)
{
VIOPoolEntry *entry = completion->parent;
// Fetch the journal from the pool entry before the entry is handed back.
SlabJournal *journal = entry->parent;
returnVIO(journal->slab->allocator, entry);
finishReaping(journal);
// Blocks may have become reapable while the flush was outstanding.
reapSlabJournal(journal);
}
/**
 * Handle an error flushing the lower layer: put the journal into read-only
 * mode using the flush result, and then complete the reap as usual.
 *
 * @param completion  The flush VIO
 **/
static void handleFlushError(VDOCompletion *completion)
{
  VIOPoolEntry *entry   = completion->parent;
  SlabJournal  *journal = entry->parent;
  enterJournalReadOnlyMode(journal, completion->result);
  completeReaping(completion);
}
/**
 * Waiter callback for getting a VIO with which to flush the lower layer
 * prior to reaping. Launches the flush on the newly acquired VIO.
 *
 * @param waiter      The journal as a flush waiter
 * @param vioContext  The newly acquired flush VIO (a VIOPoolEntry)
 **/
static void flushForReaping(Waiter *waiter, void *vioContext)
{
  VIOPoolEntry *entry   = vioContext;
  SlabJournal  *journal = slabJournalFromFlushWaiter(waiter);

  // The flush callbacks find the journal through the pool entry.
  entry->parent = journal;

  // Have the flush complete on the thread of the slab's allocator.
  VIO *vio = entry->vio;
  vio->completion.callbackThreadID = journal->slab->allocator->threadID;
  launchFlush(vio, completeReaping, handleFlushError);
}
/**
 * Conduct a reap on a slab journal to reclaim unreferenced blocks.
 *
 * @param journal  The slab journal
 **/
static void reapSlabJournal(SlabJournal *journal)
{
  // We already have a reap in progress so wait for it to finish.
  if (isReaping(journal)) {
    return;
  }

  // We must not reap an unrecovered slab or one which is not in normal
  // operation, and there's no point in reaping in read-only mode.
  if (isUnrecoveredSlab(journal->slab) || !isNormal(&journal->slab->state)
      || isVDOReadOnly(journal)) {
    return;
  }

  /*
   * Start reclaiming blocks only when the journal head has no references.
   * Then stop when a block is referenced or reap reaches the most recently
   * written block, referenced by the slab summary, which has the sequence
   * number just before the tail.
   */
  bool reaped = false;
  for (;;) {
    if (journal->unreapable >= journal->tail) {
      break;
    }
    if (journal->reapLock->count != 0) {
      break;
    }

    reaped = true;
    journal->unreapable++;

    // Advance to the next lock, wrapping at the end of the lock array.
    journal->reapLock++;
    if (journal->reapLock == (journal->locks + journal->size)) {
      journal->reapLock = journal->locks;
    }
  }

  if (!reaped) {
    return;
  }

  // In sync mode, no flush is issued; the head can be advanced immediately.
  PhysicalLayer *layer = journal->slab->allocator->completion.layer;
  if (layer->getWritePolicy(layer) == WRITE_POLICY_SYNC) {
    finishReaping(journal);
    return;
  }

  /*
   * In async mode, it is never safe to reap a slab journal block without
   * first issuing a flush, regardless of whether a user flush has been
   * received or not. In the absence of the flush, the reference block write
   * which released the locks allowing the slab journal to reap may not be
   * persisted. Although slab summary writes will eventually issue flushes,
   * multiple slab journal block writes can be issued while previous slab
   * summary updates have not yet been made. Even though those slab journal
   * block writes will be ignored if the slab summary update is not
   * persisted, they may still overwrite the to-be-reaped slab journal block
   * resulting in a loss of reference count updates (VDO-2912).
   *
   * In sync mode, it is similarly unsafe. However, we cannot possibly make
   * those additional slab journal block writes due to the blocking threshold
   * and the recovery journal's flush policy of flushing before every block.
   * We may make no more than (number of VIOs) entries in slab journals since
   * the last recovery journal flush; thus, due to the size of the slab
   * journal blocks, the RJ must have flushed the storage no more than one
   * slab journal block ago. So we could only overwrite the to-be-reaped
   * block if we wrote and flushed the last block in the journal. But the
   * blocking threshold prevents that.
   */
  journal->flushWaiter.callback = flushForReaping;
  int result = acquireVIO(journal->slab->allocator, &journal->flushWaiter);
  if (result != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, result);
  }
}
/**
* This is the callback invoked after a slab summary update completes. It
* is registered in the constructor on behalf of updateTailBlockLocation().
*
* On failure, the journal enters read-only mode. On success, the locks held
* by the newly summarized blocks are released, a reap is attempted, and the
* slab summary is updated again if more blocks have been committed.
*
* Implements WaiterCallback.
*
* @param waiter The slab summary waiter that has just been notified
* @param context The result code of the update (a pointer to an int)
**/
static void releaseJournalLocks(Waiter *waiter, void *context)
{
SlabJournal *journal = slabJournalFromSlabSummaryWaiter(waiter);
int result = *((int *) context);
if (result != VDO_SUCCESS) {
if (result != VDO_READ_ONLY) {
// Don't bother logging what might be lots of errors if we are already
// in read-only mode.
logErrorWithStringError(result, "failed slab summary update %llu",
journal->summarized);
}
// The update is over even though it failed, so clear the flag before
// failing the waiters.
journal->updatingSlabSummary = false;
enterJournalReadOnlyMode(journal, result);
return;
}
// If the partial write is now covered by the summary (summarized has
// reached the tail), entries may be added again.
if (journal->partialWriteInProgress
&& (journal->summarized == journal->tail)) {
journal->partialWriteInProgress = false;
addEntries(journal);
}
// Walk the newly summarized blocks, from the newest (summarized - 1) down
// to the first one not covered by the previous summary update.
// NOTE(review): the countdown relies on first being at least 1 if
// SequenceNumber is unsigned (otherwise i >= first could never become
// false) -- confirm lastSummarized can never be 0.
SequenceNumber first = journal->lastSummarized;
journal->lastSummarized = journal->summarized;
for (SequenceNumber i = journal->summarized - 1; i >= first; i--) {
// Release the lock the summarized block held on the recovery journal.
// (During replay, recoveryStart will always be 0.)
if (journal->recoveryJournal != NULL) {
ZoneCount zoneNumber = journal->slab->allocator->zoneNumber;
releaseRecoveryJournalBlockReference(journal->recoveryJournal,
getLock(journal, i)->recoveryStart,
ZONE_TYPE_PHYSICAL,
zoneNumber);
}
// Release our own lock against reaping for blocks that are committed.
// (This function will not change locks during replay.)
adjustSlabJournalBlockReference(journal, i, -1);
}
journal->updatingSlabSummary = false;
// The locks just released may have made some blocks reapable.
reapSlabJournal(journal);
// Check if the slab summary needs to be updated again.
updateTailBlockLocation(journal);
}
/**
 * Update the tail block location in the slab summary, if necessary.
 *
 * Nothing is updated if an update is already in progress, if the VDO is
 * read-only, or if no blocks have been committed since the last update; in
 * those cases the slab is only checked for having drained.
 *
 * @param journal  The slab journal that is updating its tail block location
 **/
static void updateTailBlockLocation(SlabJournal *journal)
{
  bool nothingToDo = (journal->updatingSlabSummary
                      || isVDOReadOnly(journal)
                      || (journal->lastSummarized >= journal->nextCommit));
  if (nothingToDo) {
    checkIfSlabDrained(journal->slab);
    return;
  }

  // An unrecovered slab reports the free block count which is already in
  // the slab summary; otherwise the slab is asked for its current count.
  BlockCount freeBlockCount
    = (isUnrecoveredSlab(journal->slab)
       ? getSummarizedFreeBlockCount(journal->summary,
                                     journal->slab->slabNumber)
       : getSlabFreeBlockCount(journal->slab));

  journal->summarized          = journal->nextCommit;
  journal->updatingSlabSummary = true;

  /*
   * Update slab summary as dirty.
   * Slab journal can only reap past sequence number 1 when all the refCounts
   * for this slab have been written to the layer. Therefore, indicate that
   * the refCounts must be loaded when the journal head has reaped past
   * sequence number 1.
   */
  TailBlockOffset blockOffset
    = getSlabJournalBlockOffset(journal, journal->summarized);
  bool reapedPastFirstBlock = (journal->head > 1);
  updateSlabSummaryEntry(journal->summary, &journal->slabSummaryWaiter,
                         journal->slab->slabNumber, blockOffset,
                         reapedPastFirstBlock, false, freeBlockCount);
}
/**********************************************************************/
void reopenSlabJournal(SlabJournal *journal)
{
  ASSERT_LOG_ONLY(journal->tailHeader.entryCount == 0,
                  "Slab journal's active block empty before reopening");

  // Restart the journal as empty by moving the head up to the tail.
  journal->head = journal->tail;
  initializeJournalState(journal);

  // Ensure no locks are spuriously held on an empty journal.
  SequenceNumber block = 1;
  while (block <= journal->size) {
    ASSERT_LOG_ONLY((getLock(journal, block)->count == 0),
                    "Scrubbed journal's block %llu is not locked",
                    block);
    block++;
  }

  // Any VIOs which were waiting may now be able to make their entries.
  addEntries(journal);
}
/**********************************************************************/
static SequenceNumber getCommittingSequenceNumber(const VIOPoolEntry *entry)
{
  // The entry's buffer holds a packed slab journal block; decode the
  // little-endian sequence number from its header.
  return getUInt64LE(((const PackedSlabJournalBlock *) entry->buffer)
                     ->header.fields.sequenceNumber);
}
/**
 * Handle post-commit processing. This is the callback (and also the error
 * handler) registered by writeSlabJournalBlock().
 *
 * @param completion  The write VIO as a completion
 **/
static void completeWrite(VDOCompletion *completion)
{
  VIOPoolEntry *entry       = completion->parent;
  SlabJournal  *journal     = entry->parent;
  int           writeResult = completion->result;

  // Capture what is needed from the entry before giving it back to the pool.
  SequenceNumber committed = getCommittingSequenceNumber(entry);
  unspliceRingNode(&entry->node);
  returnVIO(journal->slab->allocator, entry);

  if (writeResult != VDO_SUCCESS) {
    logErrorWithStringError(writeResult,
                            "cannot write slab journal block %llu",
                            committed);
    enterJournalReadOnlyMode(journal, writeResult);
    return;
  }

  relaxedAdd64(&journal->events->blocksWritten, 1);

  if (!isRingEmpty(&journal->uncommittedBlocks)) {
    // The commit point is always the beginning of the oldest incomplete
    // block.
    VIOPoolEntry *oldest = asVIOPoolEntry(journal->uncommittedBlocks.next);
    journal->nextCommit = getCommittingSequenceNumber(oldest);
  } else {
    // If no blocks are outstanding, then the commit point is at the tail.
    journal->nextCommit = journal->tail;
  }

  updateTailBlockLocation(journal);
}
/**
* Callback from acquireVIO() registered in commitSlabJournalTail().
*
* Packs the tail block into the acquired VIO, launches the write, and then
* advances the tail so that a fresh tail block can start accepting entries.
*
* @param waiter The VIO pool waiter which was just notified
* @param vioContext The VIO pool entry for the write
**/
static void writeSlabJournalBlock(Waiter *waiter, void *vioContext)
{
SlabJournal *journal = slabJournalFromResourceWaiter(waiter);
VIOPoolEntry *entry = vioContext;
SlabJournalBlockHeader *header = &journal->tailHeader;
// Record the current head in the header of the block being written.
header->head = journal->head;
// Track the entry as an uncommitted block until completeWrite() removes it.
pushRingNode(&journal->uncommittedBlocks, &entry->node);
packSlabJournalBlockHeader(header, &journal->block->header);
// Copy the tail block into the VIO.
memcpy(entry->buffer, journal->block, VDO_BLOCK_SIZE);
int unusedEntries = journal->entriesPerBlock - header->entryCount;
ASSERT_LOG_ONLY(unusedEntries >= 0, "Slab journal block is not overfull");
if (unusedEntries > 0) {
// Release the per-entry locks for any unused entries in the block we are
// about to write.
adjustSlabJournalBlockReference(journal, header->sequenceNumber,
-unusedEntries);
// A block with block map increments can be full even though it has
// unused entries, so ask blockIsFull() rather than assuming a partial write.
journal->partialWriteInProgress = !blockIsFull(journal);
}
PhysicalBlockNumber blockNumber
= getBlockNumber(journal, header->sequenceNumber);
entry->parent = journal;
entry->vio->completion.callbackThreadID = journal->slab->allocator->threadID;
/*
* This block won't be read in recovery until the slab summary is updated
* to refer to it. The slab summary update does a flush which is sufficient
* to protect us from VDO-2331.
*/
// completeWrite() serves as both the callback and the error handler; it
// checks the completion's result itself.
launchWriteMetadataVIO(entry->vio, blockNumber, completeWrite,
completeWrite);
// Since the write is submitted, the tail block structure can be reused.
journal->tail++;
initializeTailBlock(journal);
journal->waitingToCommit = false;
// If the slab was waiting on this commit (see
// attemptReplayIntoSlabJournal()), finish that operation instead of adding
// more entries.
if (journal->slab->state.state == ADMIN_STATE_WAITING_FOR_RECOVERY) {
finishOperationWithResult(&journal->slab->state,
(isVDOReadOnly(journal)
? VDO_READ_ONLY : VDO_SUCCESS));
return;
}
addEntries(journal);
}
/**********************************************************************/
void commitSlabJournalTail(SlabJournal *journal)
{
  bool tailEmpty = (journal->tailHeader.entryCount == 0);

  if (tailEmpty && mustMakeEntriesToFlush(journal)) {
    // There are no entries at the moment, but there are some waiters, so
    // defer initiating the flush until those entries are ready to write.
    return;
  }

  if (isVDOReadOnly(journal) || journal->waitingToCommit || tailEmpty) {
    // There is nothing to do since the tail block is empty, or writing, or
    // the journal is in read-only mode.
    return;
  }

  /*
   * Since we are about to commit the tail block, this journal no longer
   * needs to be on the ring of journals which the recovery journal might
   * ask to commit.
   */
  markSlabJournalClean(journal);

  // The write itself is issued by writeSlabJournalBlock() once a VIO has
  // been acquired from the allocator.
  journal->waitingToCommit         = true;
  journal->resourceWaiter.callback = writeSlabJournalBlock;
  int result = acquireVIO(journal->slab->allocator, &journal->resourceWaiter);
  if (result != VDO_SUCCESS) {
    journal->waitingToCommit = false;
    enterJournalReadOnlyMode(journal, result);
  }
}
/**********************************************************************/
void encodeSlabJournalEntry(SlabJournalBlockHeader *tailHeader,
                            SlabJournalPayload     *payload,
                            SlabBlockNumber         sbn,
                            JournalOperation        operation)
{
  // Claim the next entry slot in the block.
  JournalEntryCount entryNumber = tailHeader->entryCount;
  tailHeader->entryCount++;

  if (operation == BLOCK_MAP_INCREMENT) {
    if (!tailHeader->hasBlockMapIncrements) {
      // First block map increment in this block: start with a zeroed
      // entry type bitmap.
      tailHeader->hasBlockMapIncrements = true;
      memset(payload->fullEntries.entryTypes, 0,
             SLAB_JOURNAL_ENTRY_TYPES_SIZE);
    }

    // Set this entry's bit so that decoding can identify it as a block map
    // increment.
    payload->fullEntries.entryTypes[entryNumber / 8]
      |= ((byte) 1 << (entryNumber % 8));
  }

  packSlabJournalEntry(&payload->entries[entryNumber], sbn,
                       isIncrementOperation(operation));
}
/**********************************************************************/
SlabJournalEntry decodeSlabJournalEntry(PackedSlabJournalBlock *block,
                                        JournalEntryCount       entryCount)
{
  SlabJournalEntry entry
    = unpackSlabJournalEntry(&block->payload.entries[entryCount]);

  // Without block map increments in the block, the unpacked entry is final.
  if (!block->header.fields.hasBlockMapIncrements) {
    return entry;
  }

  // Otherwise, the entry type bitmap says whether this particular entry is
  // a block map increment.
  if ((block->payload.fullEntries.entryTypes[entryCount / 8]
       & ((byte) 1 << (entryCount % 8))) != 0) {
    entry.operation = BLOCK_MAP_INCREMENT;
  }

  return entry;
}
/**
 * Actually add an entry to the slab journal, potentially firing off a write
 * if a block becomes full. This function is synchronous.
 *
 * If either sanity check fails, the journal enters read-only mode and no
 * entry is made.
 *
 * @param journal        The slab journal to append to
 * @param pbn            The pbn being adjusted
 * @param operation      The type of entry to make
 * @param recoveryPoint  The recovery journal point for this entry
 **/
static void addEntry(SlabJournal         *journal,
                     PhysicalBlockNumber  pbn,
                     JournalOperation     operation,
                     const JournalPoint  *recoveryPoint)
{
  // Entries must arrive in increasing recovery journal order.
  int result = ASSERT(beforeJournalPoint(&journal->tailHeader.recoveryPoint,
                                         recoveryPoint),
                      "recovery journal point is monotonically increasing, "
                      "recovery point: %llu.%u, "
                      "block recovery point: %llu.%u",
                      recoveryPoint->sequenceNumber, recoveryPoint->entryCount,
                      journal->tailHeader.recoveryPoint.sequenceNumber,
                      journal->tailHeader.recoveryPoint.entryCount);
  if (result != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, result);
    return;
  }

  if (operation == BLOCK_MAP_INCREMENT) {
    // A block map increment needs room under the smaller full-entry limit.
    result = ASSERT_LOG_ONLY((journal->tailHeader.entryCount
                              < journal->fullEntriesPerBlock),
                             "block has room for full entries");
    if (result != VDO_SUCCESS) {
      enterJournalReadOnlyMode(journal, result);
      return;
    }
  }

  // The entry records the block as an offset from the start of the slab.
  encodeSlabJournalEntry(&journal->tailHeader, &journal->block->payload,
                         pbn - journal->slab->start, operation);
  journal->tailHeader.recoveryPoint = *recoveryPoint;

  if (blockIsFull(journal)) {
    commitSlabJournalTail(journal);
  }
}
/**********************************************************************/
bool attemptReplayIntoSlabJournal(SlabJournal         *journal,
                                  PhysicalBlockNumber  pbn,
                                  JournalOperation     operation,
                                  JournalPoint        *recoveryPoint,
                                  VDOCompletion       *parent)
{
  SlabJournalBlockHeader *header = &journal->tailHeader;

  // Only accept entries after the current recovery point.
  if (!beforeJournalPoint(&header->recoveryPoint, recoveryPoint)) {
    return true;
  }

  bool needsFullEntryRoom = (header->hasBlockMapIncrements
                             || (operation == BLOCK_MAP_INCREMENT));
  if ((header->entryCount >= journal->fullEntriesPerBlock)
      && needsFullEntryRoom) {
    // The tail block does not have room for the entry we are attempting
    // to add so commit the tail block now.
    commitSlabJournalTail(journal);
  }

  if (journal->waitingToCommit) {
    // The entry cannot be made until the tail block has been written, so
    // have the parent wait on the slab; the caller must retry.
    startOperationWithWaiter(&journal->slab->state,
                             ADMIN_STATE_WAITING_FOR_RECOVERY, parent, NULL);
    return false;
  }

  if ((journal->tail - journal->head) >= journal->size) {
    /*
     * We must have reaped the current head before the crash, since
     * the blocked threshold keeps us from having more entries than
     * fit in a slab journal; hence we can just advance the head
     * (and unreapable block), as needed.
     */
    journal->head++;
    journal->unreapable++;
  }

  markSlabReplaying(journal->slab);
  addEntry(journal, pbn, operation, recoveryPoint);
  return true;
}
/**
 * Check whether the journal should be saving reference blocks out.
 *
 * @param journal  The journal to check
 *
 * @return true if the distance from head to tail has reached the flushing
 *         threshold, so the journal should be requesting reference block
 *         writes
 **/
static bool requiresFlushing(const SlabJournal *journal)
{
  const BlockCount journalLength = (journal->tail - journal->head);
  if (journalLength < journal->flushingThreshold) {
    return false;
  }
  return true;
}
/**
 * Check whether the journal must be reaped before adding new entries.
 *
 * @param journal  The journal to check
 *
 * @return true if the distance from head to tail has reached the blocking
 *         threshold, so the journal must be reaped
 **/
static bool requiresReaping(const SlabJournal *journal)
{
  const BlockCount journalLength = (journal->tail - journal->head);
  if (journalLength < journal->blockingThreshold) {
    return false;
  }
  return true;
}
/**********************************************************************/
bool requiresScrubbing(const SlabJournal *journal)
{
  // Scrubbing is required once the distance from head to tail has reached
  // the scrubbing threshold.
  const BlockCount journalLength = (journal->tail - journal->head);
  if (journalLength < journal->scrubbingThreshold) {
    return false;
  }
  return true;
}
/**
* Implements WaiterCallback. This callback is invoked by addEntries() once
* it has determined that we are ready to make another entry in the slab
* journal.
*
* Makes the journal entry for the waiting VIO, applies the matching
* reference count change to the slab, and continues the VIO with the result
* of that change.
*
* @param waiter The VIO which should make an entry now
* @param context The slab journal to make an entry in
**/
static void addEntryFromWaiter(Waiter *waiter, void *context)
{
DataVIO *dataVIO = waiterAsDataVIO(waiter);
SlabJournal *journal = (SlabJournal *) context;
SlabJournalBlockHeader *header = &journal->tailHeader;
SequenceNumber recoveryBlock = dataVIO->recoveryJournalPoint.sequenceNumber;
if (header->entryCount == 0) {
/*
* This is the first entry in the current tail block, so get a lock
* on the recovery journal which we will hold until this tail block is
* committed.
*/
getLock(journal, header->sequenceNumber)->recoveryStart = recoveryBlock;
if (journal->recoveryJournal != NULL) {
ZoneCount zoneNumber = journal->slab->allocator->zoneNumber;
acquireRecoveryJournalBlockReference(journal->recoveryJournal,
recoveryBlock, ZONE_TYPE_PHYSICAL,
zoneNumber);
}
markSlabJournalDirty(journal, recoveryBlock);
// If the slab journal is over the first threshold, tell the refCounts to
// write some reference blocks, but proceed apace.
if (requiresFlushing(journal)) {
relaxedAdd64(&journal->events->flushCount, 1);
BlockCount journalLength = (journal->tail - journal->head);
// blocksToDeadline stays 0 once the journal has grown past the flushing
// deadline.
BlockCount blocksToDeadline = 0;
if (journalLength <= journal->flushingDeadline) {
blocksToDeadline = journal->flushingDeadline - journalLength;
}
saveSeveralReferenceBlocks(journal->slab->referenceCounts,
blocksToDeadline + 1);
}
}
// Capture this entry's position before calling addEntry(), which bumps the
// header's entry count and may commit the tail block.
JournalPoint slabJournalPoint = {
.sequenceNumber = header->sequenceNumber,
.entryCount = header->entryCount,
};
addEntry(journal, dataVIO->operation.pbn, dataVIO->operation.type,
&dataVIO->recoveryJournalPoint);
// Now that an entry has been made in the slab journal, update the
// reference counts.
int result = modifySlabReferenceCount(journal->slab, &slabJournalPoint,
dataVIO->operation);
continueDataVIO(dataVIO, result);
}
/**
 * Check whether the next entry to be made is a block map increment.
 *
 * The first waiter in the entry queue is examined; callers are expected to
 * have checked that the queue is not empty.
 *
 * @param journal  The journal
 *
 * @return <code>true</code> if the first entry waiter's operation is a block
 *         map increment
 **/
static inline bool isNextEntryABlockMapIncrement(SlabJournal *journal)
{
  return (waiterAsDataVIO(getFirstWaiter(&journal->entryWaiters))
          ->operation.type == BLOCK_MAP_INCREMENT);
}
/**
 * Add as many entries as possible from the queue of VIOs waiting to make
 * entries. By processing the queue in order, we ensure that slab journal
 * entries are made in the same order as recovery journal entries for the
 * same increment or decrement.
 *
 * Processing stops early when the slab is rebuilding, a partial write is
 * outstanding, the tail block is waiting to be committed, the journal has
 * reached its blocking threshold, or the next block to use is still locked.
 *
 * @param journal  The journal to which entries may be added
 **/
static void addEntries(SlabJournal *journal)
{
  if (journal->addingEntries) {
    // Protect against re-entrancy.
    return;
  }

  journal->addingEntries = true;
  while (hasWaiters(&journal->entryWaiters)) {
    if (journal->partialWriteInProgress || slabIsRebuilding(journal->slab)) {
      // Don't add entries while rebuilding or while a partial write is
      // outstanding (VDO-2399).
      break;
    }

    SlabJournalBlockHeader *header = &journal->tailHeader;
    if (journal->waitingToCommit) {
      // If we are waiting for resources to write the tail block, and the
      // tail block is full, we can't make another entry.
      relaxedAdd64(&journal->events->tailBusyCount, 1);
      break;
    } else if (isNextEntryABlockMapIncrement(journal)
               && (header->entryCount >= journal->fullEntriesPerBlock)) {
      // The tail block does not have room for a block map increment, so
      // commit it now.
      commitSlabJournalTail(journal);
      if (journal->waitingToCommit) {
        relaxedAdd64(&journal->events->tailBusyCount, 1);
        break;
      }
    }

    // If the slab is over the blocking threshold, make the VIO wait.
    if (requiresReaping(journal)) {
      relaxedAdd64(&journal->events->blockedCount, 1);
      saveDirtyReferenceBlocks(journal->slab->referenceCounts);
      break;
    }

    if (header->entryCount == 0) {
      JournalLock *lock = getLock(journal, header->sequenceNumber);
      // Check if the on disk slab journal is full. Because of the
      // blocking and scrubbing thresholds, this should never happen.
      if (lock->count > 0) {
        ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
                        "New block has locks, but journal is not full");

        /*
         * The blocking threshold must let the journal fill up if the new
         * block has locks; if the blocking threshold is smaller than the
         * journal size, the new block cannot possibly have locks already.
         */
        ASSERT_LOG_ONLY((journal->blockingThreshold >= journal->size),
                        "New block can have locks already iff blocking "
                        "threshold is at the end of the journal");

        relaxedAdd64(&journal->events->diskFullCount, 1);
        saveDirtyReferenceBlocks(journal->slab->referenceCounts);
        break;
      }

      /*
       * Don't allow the new block to be reaped until all of the reference
       * count blocks are written and the journal block has been
       * fully committed as well.
       */
      lock->count = journal->entriesPerBlock + 1;

      if (header->sequenceNumber == 1) {
        /*
         * This is the first entry in this slab journal, ever. Dirty all of
         * the reference count blocks. Each will acquire a lock on the
         * tail block so that the journal won't be reaped until the
         * reference counts are initialized. The lock acquisition must
         * be done by the RefCounts since here we don't know how many
         * reference blocks the RefCounts has.
         */
        acquireDirtyBlockLocks(journal->slab->referenceCounts);
      }
    }

    // Make the entry for the VIO at the front of the queue.
    notifyNextWaiter(&journal->entryWaiters, addEntryFromWaiter, journal);
  }

  journal->addingEntries = false;

  // If there are no waiters, and we are flushing or saving, commit the
  // tail block.
  if (isSlabDraining(journal->slab) && !isSuspending(&journal->slab->state)
      && !hasWaiters(&journal->entryWaiters)) {
    commitSlabJournalTail(journal);
  }
}
/**********************************************************************/
void addSlabJournalEntry(SlabJournal *journal, DataVIO *dataVIO)
{
  // Work out whether the VIO can be queued at all. The checks are made in
  // order: the slab must be open, the VDO must not be read-only, and the
  // VIO must be successfully enqueued on the entry waiter queue.
  int status;
  if (!isSlabOpen(journal->slab)) {
    status = VDO_INVALID_ADMIN_STATE;
  } else if (isVDOReadOnly(journal)) {
    status = VDO_READ_ONLY;
  } else {
    status = enqueueDataVIO(&journal->entryWaiters, dataVIO,
                            THIS_LOCATION("$F($j-$js)"));
  }

  if (status != VDO_SUCCESS) {
    // The VIO was not queued; continue it with the error instead.
    continueDataVIO(dataVIO, status);
    return;
  }

  // An unrecovered slab whose journal needs reaping gets its scrubbing
  // priority raised before we try to make entries.
  if (isUnrecoveredSlab(journal->slab) && requiresReaping(journal)) {
    increaseScrubbingPriority(journal->slab);
  }

  addEntries(journal);
}
/**********************************************************************/
void adjustSlabJournalBlockReference(SlabJournal    *journal,
                                     SequenceNumber  sequenceNumber,
                                     int             adjustment)
{
  // A sequence number of zero is ignored, and locks should not be used
  // during offline replay.
  if ((sequenceNumber == 0) || isReplayingSlab(journal->slab)) {
    return;
  }

  ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");

  JournalLock *lock = getLock(journal, sequenceNumber);
  if (adjustment < 0) {
    // A decrement must not take the count below zero.
    ASSERT_LOG_ONLY((-adjustment <= lock->count),
                    "adjustment %d of lock count %u for slab journal block %"
                    PRIu64 " must not underflow", adjustment, lock->count,
                    sequenceNumber);
  }

  lock->count += adjustment;
  if (lock->count != 0) {
    return;
  }

  // The last reference on this block was just released; try to reap.
  reapSlabJournal(journal);
}
/**********************************************************************/
bool releaseRecoveryJournalLock(SlabJournal    *journal,
                                SequenceNumber  recoveryLock)
{
  if (recoveryLock != journal->recoveryLock) {
    if (recoveryLock > journal->recoveryLock) {
      // The asserted condition is necessarily false inside this branch;
      // presumably this is a deliberate way of reporting the unexpected
      // ordering through the assertion machinery.
      ASSERT_LOG_ONLY((recoveryLock < journal->recoveryLock),
                      "slab journal recovery lock is not older than the recovery"
                      " journal head");
    }
    // Only an exact match on the lock leads to a commit.
    return false;
  }

  if (isVDOReadOnly(journal)) {
    return false;
  }

  // All locks are held by the block which is in progress; write it.
  commitSlabJournalTail(journal);
  return true;
}
/**********************************************************************/
void drainSlabJournal(SlabJournal *journal)
{
  ASSERT_LOG_ONLY((getCallbackThreadID()
                   == journal->slab->allocator->threadID),
                  "drainSlabJournal() called on correct thread");

  if (isQuiescing(&journal->slab->state)) {
    // XXX: we should revisit this assertion since it is no longer clear what
    // it is for.
    ASSERT_LOG_ONLY((!(slabIsRebuilding(journal->slab)
                       && hasWaiters(&journal->entryWaiters))),
                    "slab is recovered or has no waiters");
  }

  // Rebuilding, suspending, and save-for-scrubbing leave the tail block
  // alone; every other state commits it.
  switch (journal->slab->state.state) {
  case ADMIN_STATE_REBUILDING:
  case ADMIN_STATE_SUSPENDING:
  case ADMIN_STATE_SAVE_FOR_SCRUBBING:
    return;

  default:
    break;
  }

  commitSlabJournalTail(journal);
}
/**
 * Wrap up decoding of the slab journal: give the pool entry back to the
 * slab's allocator and tell the slab that the journal load has finished,
 * passing along the result recorded in the completion.
 *
 * @param completion  The VIO as a completion; its parent is the VIO pool
 *                    entry, whose parent in turn is the slab journal
 **/
static void finishDecodingJournal(VDOCompletion *completion)
{
  VIOPoolEntry *poolEntry = completion->parent;
  SlabJournal  *journal   = poolEntry->parent;
  Slab         *slab      = journal->slab;

  // Capture the result before the entry is handed back to the pool.
  int status = completion->result;

  returnVIO(slab->allocator, poolEntry);
  notifySlabJournalIsLoaded(slab, status);
}
/**
 * Restore the in-memory journal state from the tail block which was just
 * read from disk. This is the callback registered in readSlabJournalTail().
 *
 * If the block's header does not carry the slab journal metadata type and
 * the allocator's nonce, the in-memory state is left untouched. In either
 * case decoding is then finished via finishDecodingJournal().
 *
 * @param completion  The VIO which was used to read the journal tail
 **/
static void setDecodedState(VDOCompletion *completion)
{
  VIOPoolEntry           *poolEntry = completion->parent;
  SlabJournal            *journal   = poolEntry->parent;
  PackedSlabJournalBlock *packed    = poolEntry->buffer;

  SlabJournalBlockHeader decoded;
  unpackSlabJournalBlockHeader(&packed->header, &decoded);

  bool headerIsOurs
    = ((decoded.metadataType == VDO_METADATA_SLAB_JOURNAL)
       && (decoded.nonce == journal->slab->allocator->nonce));
  if (headerIsOurs) {
    journal->tail = decoded.sequenceNumber + 1;

    // If the slab is clean, this implies the slab journal is empty, so
    // advance the head all the way to the tail; otherwise use the head
    // recorded in the block header.
    journal->head
      = (getSummarizedCleanliness(journal->summary, journal->slab->slabNumber)
         ? journal->tail
         : decoded.head);

    journal->tailHeader = decoded;
    initializeJournalState(journal);
  }

  finishDecodingJournal(completion);
}
/**
 * Launch a read of the slab journal tail block using a VIO obtained from
 * the VIO pool. This is installed as the resource waiter callback by
 * decodeSlabJournal(). On success the read continues in setDecodedState();
 * on error it goes straight to finishDecodingJournal().
 *
 * @param waiter      The VIO pool waiter which has just been notified
 * @param vioContext  The VIO pool entry given to the waiter
 **/
static void readSlabJournalTail(Waiter *waiter, void *vioContext)
{
  SlabJournal  *journal   = slabJournalFromResourceWaiter(waiter);
  Slab         *slab      = journal->slab;
  VIOPoolEntry *poolEntry = vioContext;
  poolEntry->parent       = journal;

  // Slab summary keeps the commit point offset, so the tail block is the
  // block before that. Calculation supports small journals in unit tests.
  TailBlockOffset commitPoint
    = getSummarizedTailBlockOffset(journal->summary, slab->slabNumber);
  TailBlockOffset tailBlock;
  if (commitPoint == 0) {
    // A commit point of zero means the tail is the journal's last block.
    tailBlock = (TailBlockOffset) (journal->size - 1);
  } else {
    tailBlock = (commitPoint - 1);
  }

  poolEntry->vio->completion.callbackThreadID = slab->allocator->threadID;
  launchReadMetadataVIO(poolEntry->vio, slab->journalOrigin + tailBlock,
                        setDecodedState, finishDecodingJournal);
}
/**********************************************************************/
void decodeSlabJournal(SlabJournal *journal)
{
  ASSERT_LOG_ONLY((getCallbackThreadID()
                   == journal->slab->allocator->threadID),
                  "decodeSlabJournal() called on correct thread");

  Slab *slab = journal->slab;
  TailBlockOffset lastCommitPoint
    = getSummarizedTailBlockOffset(journal->summary, slab->slabNumber);
  if ((lastCommitPoint == 0)
      && !mustLoadRefCounts(journal->summary, slab->slabNumber)) {
    /*
     * This slab claims that it has a tail block at (journal->size - 1), but
     * a head of 1. This is impossible, due to the scrubbing threshold, on
     * a real system, so don't bother reading the (bogus) data off disk.
     */
    ASSERT_LOG_ONLY(((journal->size < 16)
                     || (journal->scrubbingThreshold < (journal->size - 1))),
                    "Scrubbing threshold protects against reads of unwritten"
                    " slab journal blocks");
    // Nothing to read; report the journal as loaded straight away.
    notifySlabJournalIsLoaded(slab, VDO_SUCCESS);
    return;
  }

  // Ask the allocator for a VIO; readSlabJournalTail() is the callback set
  // on the resource waiter passed to acquireVIO().
  journal->resourceWaiter.callback = readSlabJournalTail;
  int result = acquireVIO(slab->allocator, &journal->resourceWaiter);
  if (result != VDO_SUCCESS) {
    // The VIO request itself failed, so finish the load with that error.
    notifySlabJournalIsLoaded(slab, result);
  }
}
/**********************************************************************/
void dumpSlabJournal(const SlabJournal *journal)
{
  // Log a one-line summary of the journal's waiter count, commit/summary
  // flags, and sequence-number bookkeeping.
  //
  // Every sequence-number field is printed with PRIu64 so that the
  // conversion specifiers agree with each other.
  // NOTE(review): assumes head, tail, nextCommit, and lastSummarized have
  // the same 64-bit type as unreapable, summarized, and recoveryLock --
  // confirm against the SlabJournal declaration.
  logInfo(" slab journal: entryWaiters=%zu waitingToCommit=%s"
          " updatingSlabSummary=%s head=%" PRIu64 " unreapable=%" PRIu64
          " tail=%" PRIu64 " nextCommit=%" PRIu64 " summarized=%" PRIu64
          " lastSummarized=%" PRIu64 " recoveryJournalLock=%" PRIu64
          " dirty=%s", countWaiters(&journal->entryWaiters),
          boolToString(journal->waitingToCommit),
          boolToString(journal->updatingSlabSummary),
          journal->head, journal->unreapable, journal->tail,
          journal->nextCommit, journal->summarized, journal->lastSummarized,
          journal->recoveryLock,
          boolToString(isSlabJournalDirty(journal)));
  // Given the frequency with which the locks are just a tiny bit off, it
  // might be worth dumping all the locks, but that might be too much logging.
}