/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/recoveryJournal.c#30 $
*/
#include "recoveryJournal.h"
#include "recoveryJournalInternals.h"
#include "buffer.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "blockMap.h"
#include "constants.h"
#include "dataVIO.h"
#include "extent.h"
#include "header.h"
#include "numUtils.h"
#include "packedRecoveryJournalBlock.h"
#include "recoveryJournalBlock.h"
#include "slabDepot.h"
#include "slabJournal.h"
#include "waitQueue.h"
/**
 * The packed, on-disk encoding of the recovery journal component state
 * (version 7.0). The fields are written and read in this order, in
 * little-endian form, by encodeRecoveryJournal() and
 * decodeRecoveryJournalState_7_0() below.
 **/
typedef struct {
  SequenceNumber journalStart;       // Sequence number to start the journal
  BlockCount logicalBlocksUsed;      // Number of logical blocks used by VDO
  BlockCount blockMapDataBlocks;     // Number of block map pages allocated
} __attribute__((packed)) RecoveryJournalState7_0;
// The header identifying version 7.0 of the recovery journal component
// encoding; used both to encode and to validate decoded headers.
static const Header RECOVERY_JOURNAL_HEADER_7_0 = {
  .id = RECOVERY_JOURNAL,
  .version = {
    .majorVersion = 7,
    .minorVersion = 0,
  },
  .size = sizeof(RecoveryJournalState7_0),
};
// Mask selecting the low byte of a recovery count; see
// computeRecoveryCountByte().
static const uint64_t RECOVERY_COUNT_MASK = 0xff;

enum {
  /*
   * The number of reserved blocks must be large enough to prevent a
   * new recovery journal block write from overwriting a block which
   * appears to still be a valid head block of the journal. Currently,
   * that means reserving enough space for all 2048 VIOs, or 8 blocks.
   */
  RECOVERY_JOURNAL_RESERVED_BLOCKS = 8,
};
/**********************************************************************/
const char *getJournalOperationName(JournalOperation operation)
{
  // Map each known operation to its human-readable name.
  if (operation == DATA_DECREMENT) {
    return "data decrement";
  }
  if (operation == DATA_INCREMENT) {
    return "data increment";
  }
  if (operation == BLOCK_MAP_DECREMENT) {
    return "block map decrement";
  }
  if (operation == BLOCK_MAP_INCREMENT) {
    return "block map increment";
  }
  return "unknown journal operation";
}
/**
 * Get a block from the end of the free list.
 *
 * @param journal  The journal
 *
 * @return The block or <code>NULL</code> if the list is empty
 **/
static RecoveryJournalBlock *popFreeList(RecoveryJournal *journal)
{
  return blockFromRingNode(popRingNode(&journal->freeTailBlocks));
}
/**
 * Get a block from the end of the active list.
 *
 * @param journal  The journal
 *
 * @return The block or <code>NULL</code> if the list is empty
 **/
static RecoveryJournalBlock *popActiveList(RecoveryJournal *journal)
{
  return blockFromRingNode(popRingNode(&journal->activeTailBlocks));
}
/**
 * Assert that we are running on the journal thread.
 *
 * @param journal       The journal
 * @param functionName  The function doing the check (for logging)
 **/
static void assertOnJournalThread(RecoveryJournal *journal,
                                  const char *functionName)
{
  // NOTE(review): this message is only logged when the check FAILS, so it
  // reads backwards ("called on journal thread" when we are NOT on it);
  // consider rewording the log text.
  ASSERT_LOG_ONLY((getCallbackThreadID() == journal->threadID),
                  "%s() called on journal thread", functionName);
}
/**
 * WaiterCallback implementation invoked whenever a DataVIO is to be released
 * from the journal, either because its entry was committed to disk,
 * or because there was an error.
 *
 * @param waiter   The waiter for the DataVIO to release
 * @param context  A pointer to the int result to continue the DataVIO with
 **/
static void continueWaiter(Waiter *waiter, void *context)
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  dataVIOAddTraceRecord(dataVIO,
                        THIS_LOCATION("$F($j-$js);"
                                      "cb=continueJournalWaiter($j-$js)"));
  int waitResult = *((int *) context);
  continueDataVIO(dataVIO, waitResult);
}
/**
 * Check whether the journal has any waiters on any blocks.
 *
 * @param journal  The journal in question
 *
 * @return <code>true</code> if any block has a waiter
 **/
static inline bool hasBlockWaiters(RecoveryJournal *journal)
{
  // Either the first active tail block (if it exists) has waiters,
  // or no active tail block has waiters.
  if (isRingEmpty(&journal->activeTailBlocks)) {
    return false;
  }

  RecoveryJournalBlock *block
    = blockFromRingNode(journal->activeTailBlocks.next);
  return (hasWaiters(&block->entryWaiters)
          || hasWaiters(&block->commitWaiters));
}
/**********************************************************************/
// Forward declarations needed by checkForDrainComplete().
// (The recycleJournalBlocks() parameter is a journal, not a block; the name
// now matches the definition below.)
static void recycleJournalBlocks(RecoveryJournal *journal);
static void recycleJournalBlock(RecoveryJournalBlock *block);
static void notifyCommitWaiters(RecoveryJournal *journal);
/**
 * Check whether the journal has drained, and if so, finish draining with
 * the appropriate result (VDO_READ_ONLY if the VDO has gone read-only).
 *
 * @param journal  The journal which may have just drained
 **/
static void checkForDrainComplete(RecoveryJournal *journal)
{
  int result = VDO_SUCCESS;
  if (isReadOnly(journal->readOnlyNotifier)) {
    result = VDO_READ_ONLY;
    /*
     * Clean up any full active blocks which were not written due to being
     * in read-only mode.
     *
     * XXX: This would probably be better as a short-circuit in writeBlock().
     */
    notifyCommitWaiters(journal);
    recycleJournalBlocks(journal);

    // Release any DataVIOs waiting to be assigned entries.
    notifyAllWaiters(&journal->decrementWaiters, continueWaiter, &result);
    notifyAllWaiters(&journal->incrementWaiters, continueWaiter, &result);
  }

  // The drain is not complete while a reap, any block waiters, any entry
  // waiters, or the lock counter are still outstanding.
  if (!isDraining(&journal->state)
      || journal->reaping || hasBlockWaiters(journal)
      || hasWaiters(&journal->incrementWaiters)
      || hasWaiters(&journal->decrementWaiters)
      || !suspendLockCounter(journal->lockCounter)) {
    return;
  }

  if (isSaving(&journal->state)) {
    if (journal->activeBlock != NULL) {
      ASSERT_LOG_ONLY(((result == VDO_READ_ONLY)
                       || !isRecoveryBlockDirty(journal->activeBlock)),
                      "journal being saved has clean active block");
      recycleJournalBlock(journal->activeBlock);
    }

    ASSERT_LOG_ONLY(isRingEmpty(&journal->activeTailBlocks),
                    "all blocks in a journal being saved must be inactive");
  }

  finishDrainingWithResult(&journal->state, result);
}
/**
 * Notify a recovery journal that the VDO has gone read-only.
 *
 * <p>Implements ReadOnlyNotification.
 *
 * @param listener  The journal
 * @param parent    The completion to notify in order to acknowledge the
 *                  notification
 **/
static void notifyRecoveryJournalOfReadOnlyMode(void *listener,
                                                VDOCompletion *parent)
{
  // Read-only mode may be the last thing a drain was waiting on.
  checkForDrainComplete(listener);
  completeCompletion(parent);
}
/**
 * Put the journal in read-only mode. All attempts to add entries after
 * this function is called will fail. All VIOs waiting for commits will be
 * awakened with an error.
 *
 * @param journal    The journal which has failed
 * @param errorCode  The error result triggering this call
 **/
static void enterJournalReadOnlyMode(RecoveryJournal *journal, int errorCode)
{
  enterReadOnlyMode(journal->readOnlyNotifier, errorCode);
  // Going read-only may be the last thing a drain was waiting on.
  checkForDrainComplete(journal);
}
/**********************************************************************/
// Report the journal's current tail sequence number.
SequenceNumber getCurrentJournalSequenceNumber(RecoveryJournal *journal)
{
  return journal->tail;
}
/**
 * Get the head of the recovery journal, which is the lower of the block map
 * head and the slab journal head sequence numbers.
 *
 * @param journal  The journal
 *
 * @return the head of the journal
 **/
static inline SequenceNumber getRecoveryJournalHead(RecoveryJournal *journal)
{
  SequenceNumber blockMapHead = journal->blockMapHead;
  SequenceNumber slabJournalHead = journal->slabJournalHead;
  return ((blockMapHead < slabJournalHead) ? blockMapHead : slabJournalHead);
}
/**
 * Compute the recovery count byte for a given recovery count.
 *
 * @param recoveryCount  The 64-bit recovery count
 *
 * @return The byte corresponding to the recovery count (its low 8 bits)
 **/
__attribute__((warn_unused_result))
static inline uint8_t computeRecoveryCountByte(uint64_t recoveryCount)
{
  uint64_t maskedCount = recoveryCount & RECOVERY_COUNT_MASK;
  return (uint8_t) maskedCount;
}
/**
 * Check whether the journal is over the threshold, and if so, force the oldest
 * slab journal tail block to commit.
 *
 * @param journal  The journal
 **/
static void checkSlabJournalCommitThreshold(RecoveryJournal *journal)
{
  // The distance from the slab journal head to the tail is how much of the
  // journal the slab journals are still holding locked.
  BlockCount currentLength = journal->tail - journal->slabJournalHead;
  if (currentLength > journal->slabJournalCommitThreshold) {
    journal->events.slabJournalCommitsRequested++;
    commitOldestSlabJournalTailBlocks(journal->depot,
                                      journal->slabJournalHead);
  }
}
/**********************************************************************/
static void reapRecoveryJournal(RecoveryJournal *journal);
static void assignEntries(RecoveryJournal *journal);
/**
 * Finish reaping the journal: advance the heads to the reap points and
 * reclaim the entry space freed by the reaped blocks.
 *
 * @param journal  The journal being reaped
 **/
static void finishReaping(RecoveryJournal *journal)
{
  SequenceNumber oldHead = getRecoveryJournalHead(journal);
  journal->blockMapHead = journal->blockMapReapHead;
  journal->slabJournalHead = journal->slabJournalReapHead;
  BlockCount blocksReaped = getRecoveryJournalHead(journal) - oldHead;
  journal->availableSpace += blocksReaped * journal->entriesPerBlock;
  journal->reaping = false;
  // Freed space may unblock slab journal commits and waiting entries, and
  // the finished reap may be the last thing a drain was waiting on.
  checkSlabJournalCommitThreshold(journal);
  assignEntries(journal);
  checkForDrainComplete(journal);
}
/**
 * Finish reaping the journal after flushing the lower layer. This is the
 * callback registered in reapRecoveryJournal().
 *
 * @param completion  The journal's flush VIO
 **/
static void completeReaping(VDOCompletion *completion)
{
  RecoveryJournal *journal = completion->parent;
  finishReaping(journal);

  // Try reaping again in case more locks were released while flush was out.
  reapRecoveryJournal(journal);
}
/**
 * Handle an error when flushing the lower layer due to reaping.
 *
 * @param completion  The journal's flush VIO
 **/
static void handleFlushError(VDOCompletion *completion)
{
  RecoveryJournal *failedJournal = completion->parent;
  // Clear the reaping flag before entering read-only mode, since the latter
  // re-runs the drain-completion check which looks at this flag.
  failedJournal->reaping = false;
  enterJournalReadOnlyMode(failedJournal, completion->result);
}
/**
 * Set all journal fields appropriately to start journaling from the current
 * active block.
 *
 * @param journal  The journal to be reset based on its active block
 **/
static void initializeJournalState(RecoveryJournal *journal)
{
  SequenceNumber tail = journal->tail;

  // All heads and reap points start at the tail: nothing is outstanding.
  journal->appendPoint.sequenceNumber = tail;
  journal->lastWriteAcknowledged = tail;
  journal->blockMapHead = tail;
  journal->slabJournalHead = tail;
  journal->blockMapReapHead = tail;
  journal->slabJournalReapHead = tail;
  journal->blockMapHeadBlockNumber
    = getRecoveryJournalBlockNumber(journal, journal->blockMapHead);
  journal->slabJournalHeadBlockNumber
    = getRecoveryJournalBlockNumber(journal, journal->slabJournalHead);
}
/**********************************************************************/
// Compute the usable length of the journal: the full size minus a reserve
// of one quarter, capped at RECOVERY_JOURNAL_RESERVED_BLOCKS.
BlockCount getRecoveryJournalLength(BlockCount journalSize)
{
  BlockCount quarter = journalSize / 4;
  BlockCount reservedBlocks = ((quarter > RECOVERY_JOURNAL_RESERVED_BLOCKS)
                               ? RECOVERY_JOURNAL_RESERVED_BLOCKS
                               : quarter);
  return (journalSize - reservedBlocks);
}
/**
 * Attempt to reap the journal now that all the locks on some journal block
 * have been released. This is the callback registered with the lock counter.
 *
 * @param completion  The lock counter completion
 **/
static void reapRecoveryJournalCallback(VDOCompletion *completion)
{
  RecoveryJournal *journal = (RecoveryJournal *) completion->parent;
  // The acknowledgement must be done before reaping so that there is no
  // race between acknowledging the notification and unlocks wishing to notify.
  acknowledgeUnlock(journal->lockCounter);

  if (isQuiescing(&journal->state)) {
    // Don't start reaping when the journal is trying to quiesce. Do check if
    // this notification is the last thing the drain is waiting on.
    checkForDrainComplete(journal);
    return;
  }

  reapRecoveryJournal(journal);
  checkSlabJournalCommitThreshold(journal);
}
/**
 * Set the journal's tail sequence number.
 *
 * @param journal  The journal whose tail is to be set
 * @param tail     The new tail value
 **/
static void setJournalTail(RecoveryJournal *journal, SequenceNumber tail)
{
  // VDO does not support sequence numbers above 1 << 48 in the slab journal.
  if (tail >= (1ULL << 48)) {
    enterJournalReadOnlyMode(journal, VDO_JOURNAL_OVERFLOW);
  }
  journal->tail = tail;
}
/**********************************************************************/
// Construct a recovery journal. On any failure, everything constructed so
// far is released via freeRecoveryJournal() and the error is returned.
int makeRecoveryJournal(Nonce nonce,
                        PhysicalLayer *layer,
                        Partition *partition,
                        uint64_t recoveryCount,
                        BlockCount journalSize,
                        BlockCount tailBufferSize,
                        ReadOnlyNotifier *readOnlyNotifier,
                        const ThreadConfig *threadConfig,
                        RecoveryJournal **journalPtr)
{
  RecoveryJournal *journal;
  int result = ALLOCATE(1, RecoveryJournal, __func__, &journal);
  if (result != VDO_SUCCESS) {
    return result;
  }

  initializeRing(&journal->freeTailBlocks);
  initializeRing(&journal->activeTailBlocks);
  initializeWaitQueue(&journal->pendingWrites);

  journal->threadID = getJournalZoneThread(threadConfig);
  journal->partition = partition;
  journal->nonce = nonce;
  journal->recoveryCount = computeRecoveryCountByte(recoveryCount);
  journal->size = journalSize;
  journal->readOnlyNotifier = readOnlyNotifier;
  // Sequence numbers start at 1; all heads start at the tail.
  journal->tail = 1;
  journal->slabJournalCommitThreshold = (journalSize * 2) / 3;
  initializeJournalState(journal);

  journal->entriesPerBlock = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK;
  BlockCount journalLength = getRecoveryJournalLength(journalSize);
  journal->availableSpace = journal->entriesPerBlock * journalLength;

  // Only make the tail buffer and VIO in normal operation since the formatter
  // doesn't need them.
  if (layer->createMetadataVIO != NULL) {
    for (BlockCount i = 0; i < tailBufferSize; i++) {
      RecoveryJournalBlock *block;
      result = makeRecoveryBlock(layer, journal, &block);
      if (result != VDO_SUCCESS) {
        freeRecoveryJournal(&journal);
        return result;
      }
      pushRingNode(&journal->freeTailBlocks, &block->ringNode);
    }

    result = makeLockCounter(layer, journal, reapRecoveryJournalCallback,
                             journal->threadID, threadConfig->logicalZoneCount,
                             threadConfig->physicalZoneCount, journal->size,
                             &journal->lockCounter);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    result = ALLOCATE(VDO_BLOCK_SIZE, char, "journal flush data",
                      &journal->unusedFlushVIOData);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    result = createVIO(layer, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
                       journal, journal->unusedFlushVIOData,
                       &journal->flushVIO);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    result = registerReadOnlyListener(readOnlyNotifier, journal,
                                      notifyRecoveryJournalOfReadOnlyMode,
                                      journal->threadID);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    journal->flushVIO->completion.callbackThreadID = journal->threadID;
  }

  *journalPtr = journal;
  return VDO_SUCCESS;
}
/**********************************************************************/
// Free the journal (if any) and null out the caller's reference. Safe to
// call on a pointer to NULL, so it can clean up partial construction.
void freeRecoveryJournal(RecoveryJournal **journalPtr)
{
  RecoveryJournal *journal = *journalPtr;
  if (journal == NULL) {
    return;
  }

  freeLockCounter(&journal->lockCounter);
  freeVIO(&journal->flushVIO);
  FREE(journal->unusedFlushVIOData);

  // XXX: eventually, the journal should be constructed in a quiescent state
  //      which requires opening before use.
  if (!isQuiescent(&journal->state)) {
    ASSERT_LOG_ONLY(isRingEmpty(&journal->activeTailBlocks),
                    "journal being freed has no active tail blocks");
  } else if (!isSaved(&journal->state)
             && !isRingEmpty(&journal->activeTailBlocks)) {
    logWarning("journal being freed has uncommitted entries");
  }

  // Free every block on both the active and free lists.
  RecoveryJournalBlock *block;
  while ((block = popActiveList(journal)) != NULL) {
    freeRecoveryBlock(&block);
  }
  while ((block = popFreeList(journal)) != NULL) {
    freeRecoveryBlock(&block);
  }

  FREE(journal);
  *journalPtr = NULL;
}
/**********************************************************************/
// Set the partition the journal lives in.
void setRecoveryJournalPartition(RecoveryJournal *journal,
                                 Partition *partition)
{
  journal->partition = partition;
}
/**********************************************************************/
// Reset the journal after a recovery: resume one block past the recovered
// tail with the new recovery count.
void initializeRecoveryJournalPostRecovery(RecoveryJournal *journal,
                                           uint64_t recoveryCount,
                                           SequenceNumber tail)
{
  setJournalTail(journal, tail + 1);
  journal->recoveryCount = computeRecoveryCountByte(recoveryCount);
  initializeJournalState(journal);
}
/**********************************************************************/
// Reset the journal after a rebuild: same as post-recovery, but also
// restore the rebuilt block counts.
void initializeRecoveryJournalPostRebuild(RecoveryJournal *journal,
                                          uint64_t recoveryCount,
                                          SequenceNumber tail,
                                          BlockCount logicalBlocksUsed,
                                          BlockCount blockMapDataBlocks)
{
  initializeRecoveryJournalPostRecovery(journal, recoveryCount, tail);
  journal->logicalBlocksUsed = logicalBlocksUsed;
  journal->blockMapDataBlocks = blockMapDataBlocks;
}
/**********************************************************************/
// Report the number of block map pages allocated, as tracked by the journal.
BlockCount getJournalBlockMapDataBlocksUsed(RecoveryJournal *journal)
{
  return journal->blockMapDataBlocks;
}
/**********************************************************************/
// Set the journal's count of allocated block map pages.
void setJournalBlockMapDataBlocksUsed(RecoveryJournal *journal,
                                      BlockCount pages)
{
  journal->blockMapDataBlocks = pages;
}
/**********************************************************************/
// Report the ID of the journal zone thread.
ThreadID getRecoveryJournalThreadID(RecoveryJournal *journal)
{
  return journal->threadID;
}
/**********************************************************************/
// Wire up the journal's collaborators and enter normal operation.
void openRecoveryJournal(RecoveryJournal *journal,
                         SlabDepot *depot,
                         BlockMap *blockMap)
{
  journal->depot = depot;
  journal->blockMap = blockMap;
  journal->state.state = ADMIN_STATE_NORMAL_OPERATION;
}
/**********************************************************************/
// The encoded size is a component header plus the packed 7.0 state.
size_t getRecoveryJournalEncodedSize(void)
{
  return ENCODED_HEADER_SIZE + sizeof(RecoveryJournalState7_0);
}
/**********************************************************************/
// Encode the journal component state (header + 7.0 fields) into the buffer.
int encodeRecoveryJournal(RecoveryJournal *journal, Buffer *buffer)
{
  SequenceNumber journalStart;
  if (isSaved(&journal->state)) {
    // If the journal is saved, we should start one past the active block
    // (since the active block is not guaranteed to be empty).
    journalStart = journal->tail;
  } else {
    // When we're merely suspended or have gone read-only, we must record the
    // first block that might have entries that need to be applied.
    journalStart = getRecoveryJournalHead(journal);
  }

  int result = encodeHeader(&RECOVERY_JOURNAL_HEADER_7_0, buffer);
  if (result != UDS_SUCCESS) {
    return result;
  }

  size_t initialLength = contentLength(buffer);

  // Fields are written in the same order as RecoveryJournalState7_0.
  result = putUInt64LEIntoBuffer(buffer, journalStart);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = putUInt64LEIntoBuffer(buffer, journal->logicalBlocksUsed);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = putUInt64LEIntoBuffer(buffer, journal->blockMapDataBlocks);
  if (result != UDS_SUCCESS) {
    return result;
  }

  size_t encodedSize = contentLength(buffer) - initialLength;
  return ASSERT(RECOVERY_JOURNAL_HEADER_7_0.size == encodedSize,
                "encoded recovery journal component size"
                " must match header size");
}
/**
 * Decode recovery journal component state version 7.0 from a buffer.
 *
 * @param buffer  A buffer positioned at the start of the encoding
 * @param state   The state structure to receive the decoded values
 *
 * @return UDS_SUCCESS or an error code
 **/
static int decodeRecoveryJournalState_7_0(Buffer *buffer,
                                          RecoveryJournalState7_0 *state)
{
  size_t initialLength = contentLength(buffer);

  // Fields are read in the same order as RecoveryJournalState7_0.
  SequenceNumber journalStart;
  int result = getUInt64LEFromBuffer(buffer, &journalStart);
  if (result != UDS_SUCCESS) {
    return result;
  }

  BlockCount logicalBlocksUsed;
  result = getUInt64LEFromBuffer(buffer, &logicalBlocksUsed);
  if (result != UDS_SUCCESS) {
    return result;
  }

  BlockCount blockMapDataBlocks;
  result = getUInt64LEFromBuffer(buffer, &blockMapDataBlocks);
  if (result != UDS_SUCCESS) {
    return result;
  }

  *state = (RecoveryJournalState7_0) {
    .journalStart = journalStart,
    .logicalBlocksUsed = logicalBlocksUsed,
    .blockMapDataBlocks = blockMapDataBlocks,
  };

  size_t decodedSize = initialLength - contentLength(buffer);
  // Fixed the assertion message: it previously said "slab depot", a
  // copy-paste error from the slab depot component.
  return ASSERT(RECOVERY_JOURNAL_HEADER_7_0.size == decodedSize,
                "decoded recovery journal component size"
                " must match header size");
}
/**********************************************************************/
// Decode and validate the journal component, then reset the in-memory
// journal from the decoded state.
int decodeRecoveryJournal(RecoveryJournal *journal, Buffer *buffer)
{
  Header header;
  int result = decodeHeader(buffer, &header);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // The decoded header must match version 7.0 exactly.
  result = validateHeader(&RECOVERY_JOURNAL_HEADER_7_0, &header,
                          true, __func__);
  if (result != VDO_SUCCESS) {
    return result;
  }

  RecoveryJournalState7_0 state;
  result = decodeRecoveryJournalState_7_0(buffer, &state);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // Update recovery journal in-memory information.
  setJournalTail(journal, state.journalStart);
  journal->logicalBlocksUsed = state.logicalBlocksUsed;
  journal->blockMapDataBlocks = state.blockMapDataBlocks;
  initializeJournalState(journal);

  // XXX: this is a hack until we make initial resume of a VDO a real resume
  journal->state.state = ADMIN_STATE_SUSPENDED;
  return VDO_SUCCESS;
}
/**********************************************************************/
// Decode the journal component in the Sodium release format.
int decodeSodiumRecoveryJournal(RecoveryJournal *journal, Buffer *buffer)
{
  // Sodium uses version 7.0, same as head, currently.
  return decodeRecoveryJournal(journal, buffer);
}
/**
 * Advance the tail of the journal by activating the next free block.
 *
 * @param journal  The journal whose tail should be advanced
 *
 * @return <code>true</code> if the tail was advanced
 **/
static bool advanceTail(RecoveryJournal *journal)
{
  journal->activeBlock = popFreeList(journal);
  if (journal->activeBlock == NULL) {
    // No free block is available to become the new active block.
    return false;
  }

  pushRingNode(&journal->activeTailBlocks, &journal->activeBlock->ringNode);
  initializeRecoveryBlock(journal->activeBlock);
  setJournalTail(journal, journal->tail + 1);
  advanceBlockMapEra(journal->blockMap, journal->tail);
  return true;
}
/**
 * Check whether there is space to make a given type of entry.
 *
 * @param journal    The journal to check
 * @param increment  Set to <code>true</code> if the desired entry is an
 *                   increment
 *
 * @return <code>true</code> if there is space in the journal to make an
 *         entry of the specified type
 **/
static bool checkForEntrySpace(RecoveryJournal *journal, bool increment)
{
  if (!increment) {
    return (journal->availableSpace > 0);
  }

  // An increment must leave room for pending decrements plus one more entry.
  return ((journal->availableSpace - journal->pendingDecrementCount) > 1);
}
/**
 * Prepare the currently active block to receive an entry and check whether
 * an entry of the given type may be assigned at this time.
 *
 * @param journal    The journal receiving an entry
 * @param increment  Set to <code>true</code> if the desired entry is an
 *                   increment
 *
 * @return <code>true</code> if there is space in the journal to store an
 *         entry of the specified type
 **/
static bool prepareToAssignEntry(RecoveryJournal *journal, bool increment)
{
  if (!checkForEntrySpace(journal, increment)) {
    if (!increment) {
      // There must always be room to make a decrement entry.
      logError("No space for decrement entry in recovery journal");
      enterJournalReadOnlyMode(journal, VDO_RECOVERY_JOURNAL_FULL);
    }
    return false;
  }

  if (isRecoveryBlockFull(journal->activeBlock) && !advanceTail(journal)) {
    // The active block is full and no free block is available to replace it.
    return false;
  }

  if (!isRecoveryBlockEmpty(journal->activeBlock)) {
    // The block was already prepared when its first entry was assigned.
    return true;
  }

  if ((journal->tail - getRecoveryJournalHead(journal)) > journal->size) {
    // Cannot use this block since the journal is full.
    journal->events.diskFull++;
    return false;
  }

  /*
   * Don't allow the new block to be reaped until all of its entries have been
   * committed to the block map and until the journal block has been fully
   * committed as well. Because the block map update is done only after any
   * slab journal entries have been made, the per-entry lock for the block map
   * entry serves to protect those as well.
   */
  initializeLockCount(journal->lockCounter, journal->activeBlock->blockNumber,
                      journal->entriesPerBlock + 1);
  return true;
}
/**********************************************************************/
static void writeBlocks(RecoveryJournal *journal);
/**
 * Queue a block for writing. The block is expected to be full. If the block
 * is currently writing, this is a noop as the block will be queued for
 * writing when the write finishes. The block must not currently be queued
 * for writing.
 *
 * @param journal  The journal in question
 * @param block    The block which is now ready to write
 **/
static void scheduleBlockWrite(RecoveryJournal *journal,
                               RecoveryJournalBlock *block)
{
  if (block->committing) {
    return;
  }

  int result = enqueueWaiter(&journal->pendingWrites, &block->writeWaiter);
  if (result != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, result);
    return;
  }

  PhysicalLayer *layer = vioAsCompletion(journal->flushVIO)->layer;
  if ((layer->getWritePolicy(layer) == WRITE_POLICY_ASYNC)) {
    /*
     * At the end of adding entries, or discovering this partial block
     * is now full and ready to rewrite, we will call writeBlocks() and
     * write a whole batch.
     */
    return;
  }

  // In non-async modes, issue pending writes immediately.
  writeBlocks(journal);
}
/**
 * Release a reference to a journal block.
 *
 * @param block  The journal block from which to release a reference
 **/
static void releaseJournalBlockReference(RecoveryJournalBlock *block)
{
  RecoveryJournal *owningJournal = block->journal;
  releaseJournalZoneReference(owningJournal->lockCounter, block->blockNumber);
}
/**
 * Implements WaiterCallback. Assign an entry waiter to the active block.
 *
 * @param waiter   The waiter for the DataVIO needing an entry
 * @param context  The RecoveryJournalBlock to receive the entry
 **/
static void assignEntry(Waiter *waiter, void *context)
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  RecoveryJournalBlock *block = (RecoveryJournalBlock *) context;
  RecoveryJournal *journal = block->journal;

  // Record the point at which we will make the journal entry.
  dataVIO->recoveryJournalPoint = (JournalPoint) {
    .sequenceNumber = block->sequenceNumber,
    .entryCount = block->entryCount,
  };

  // Update the journal's bookkeeping for the type of entry being made.
  switch (dataVIO->operation.type) {
  case DATA_INCREMENT:
    if (dataVIO->operation.state != MAPPING_STATE_UNMAPPED) {
      journal->logicalBlocksUsed++;
    }
    journal->pendingDecrementCount++;
    break;

  case DATA_DECREMENT:
    if (dataVIO->operation.state != MAPPING_STATE_UNMAPPED) {
      journal->logicalBlocksUsed--;
    }

    // Per-entry locks need not be held for decrement entries since the lock
    // held for the incref entry will protect this entry as well.
    releaseJournalBlockReference(block);
    ASSERT_LOG_ONLY((journal->pendingDecrementCount != 0),
                    "decrement follows increment");
    journal->pendingDecrementCount--;
    break;

  case BLOCK_MAP_INCREMENT:
    journal->blockMapDataBlocks++;
    break;

  default:
    logError("Invalid journal operation %u", dataVIO->operation.type);
    enterJournalReadOnlyMode(journal, VDO_NOT_IMPLEMENTED);
    continueDataVIO(dataVIO, VDO_NOT_IMPLEMENTED);
    return;
  }

  journal->availableSpace--;
  int result = enqueueRecoveryBlockEntry(block, dataVIO);
  if (result != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, result);
    continueDataVIO(dataVIO, result);
  }

  if (isRecoveryBlockFull(block)) {
    // The block is full, so we can write it anytime henceforth. If it is
    // already committing, we'll queue it for writing when it comes back.
    scheduleBlockWrite(journal, block);
  }

  // Force out slab journal tail blocks when threshold is reached.
  checkSlabJournalCommitThreshold(journal);
}
/**********************************************************************/
// Assign journal entries to each waiter on the queue until the queue is
// empty or the journal cannot accept another entry of this type.
static bool assignEntriesFromQueue(RecoveryJournal *journal,
                                   WaitQueue *queue,
                                   bool increment)
{
  for (;;) {
    if (!hasWaiters(queue)) {
      return true;
    }
    if (!prepareToAssignEntry(journal, increment)) {
      return false;
    }
    notifyNextWaiter(queue, assignEntry, journal->activeBlock);
  }
}
/**********************************************************************/
// Assign entries to all waiting DataVIOs, decrements first, then issue any
// resulting block writes.
static void assignEntries(RecoveryJournal *journal)
{
  if (journal->addingEntries) {
    // Protect against re-entrancy.
    return;
  }

  journal->addingEntries = true;
  // Decrements go first; increments are only assigned if all decrements fit.
  if (assignEntriesFromQueue(journal, &journal->decrementWaiters, false)) {
    assignEntriesFromQueue(journal, &journal->incrementWaiters, true);
  }

  // Now that we've finished with entries, see if we have a batch of blocks to
  // write.
  writeBlocks(journal);
  journal->addingEntries = false;
}
/**
 * Prepare an in-memory journal block to be reused now that it has been fully
 * committed.
 *
 * @param block  The block to be recycled
 **/
static void recycleJournalBlock(RecoveryJournalBlock *block)
{
  RecoveryJournal *journal = block->journal;
  pushRingNode(&journal->freeTailBlocks, &block->ringNode);

  // Release any unused entry locks.
  for (BlockCount i = block->entryCount; i < journal->entriesPerBlock; i++) {
    releaseJournalBlockReference(block);
  }

  // Release our own lock against reaping now that the block is completely
  // committed, or we're giving up because we're in read-only mode.
  if (block->entryCount > 0) {
    releaseJournalBlockReference(block);
  }

  if (block == journal->activeBlock) {
    journal->activeBlock = NULL;
  }
}
/**
 * WaiterCallback implementation invoked whenever a VIO is to be released
 * from the journal because its entry was committed to disk.
 *
 * @param waiter   The waiter for the committed DataVIO
 * @param context  The RecoveryJournal
 **/
static void continueCommittedWaiter(Waiter *waiter, void *context)
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  RecoveryJournal *journal = (RecoveryJournal *) context;
  // Waiters must be released in journal order; the commit point records the
  // last journal point released.
  ASSERT_LOG_ONLY(beforeJournalPoint(&journal->commitPoint,
                                     &dataVIO->recoveryJournalPoint),
                  "DataVIOs released from recovery journal in order. "
                  "Recovery journal point is (%llu, %" PRIu16 "), "
                  "but commit waiter point is (%llu, %" PRIu16 ")",
                  journal->commitPoint.sequenceNumber,
                  journal->commitPoint.entryCount,
                  dataVIO->recoveryJournalPoint.sequenceNumber,
                  dataVIO->recoveryJournalPoint.entryCount);
  journal->commitPoint = dataVIO->recoveryJournalPoint;

  int result
    = (isReadOnly(journal->readOnlyNotifier) ? VDO_READ_ONLY : VDO_SUCCESS);
  continueWaiter(waiter, &result);
}
/**
 * Notify any VIOs whose entries have now committed.
 *
 * @param journal  The recovery journal to update
 **/
static void notifyCommitWaiters(RecoveryJournal *journal)
{
  if (isRingEmpty(&journal->activeTailBlocks)) {
    return;
  }

  // Walk the active blocks from oldest to newest, stopping at the first
  // block which is still committing or not yet fully committed.
  for (RingNode *node = journal->activeTailBlocks.next;
       node != &journal->activeTailBlocks;
       node = node->next) {
    RecoveryJournalBlock *block = blockFromRingNode(node);
    if (block->committing) {
      return;
    }

    notifyAllWaiters(&block->commitWaiters, continueCommittedWaiter, journal);
    if (isReadOnly(journal->readOnlyNotifier)) {
      // In read-only mode, entries will never commit; release their waiters
      // (with an error) as well.
      notifyAllWaiters(&block->entryWaiters, continueCommittedWaiter, journal);
    } else if (isRecoveryBlockDirty(block) || !isRecoveryBlockFull(block)) {
      // Stop at partially-committed or partially-filled blocks.
      return;
    }
  }
}
/**
 * Recycle any journal blocks which have been fully committed.
 *
 * @param journal  The recovery journal to update
 **/
static void recycleJournalBlocks(RecoveryJournal *journal)
{
  // Recycle from the oldest active block forward until a block which cannot
  // yet be recycled is found.
  while (!isRingEmpty(&journal->activeTailBlocks)) {
    RecoveryJournalBlock *block
      = blockFromRingNode(journal->activeTailBlocks.next);
    if (block->committing) {
      // Don't recycle committing blocks.
      return;
    }

    if (!isReadOnly(journal->readOnlyNotifier)
        && (isRecoveryBlockDirty(block)
            || !isRecoveryBlockFull(block))) {
      // Don't recycle partially written or partially full
      // blocks, except in read-only mode.
      return;
    }

    recycleJournalBlock(block);
  }
}
/**
 * Handle post-commit processing. This is the callback registered by
 * writeBlock(). If more entries accumulated in the block being committed while
 * the commit was in progress, another commit will be initiated.
 *
 * @param completion  The completion of the VIO writing this block
 **/
static void completeWrite(VDOCompletion *completion)
{
  RecoveryJournalBlock *block = completion->parent;
  RecoveryJournal *journal = block->journal;
  assertOnJournalThread(journal, __func__);

  // Account for the entries which just reached disk.
  journal->pendingWriteCount -= 1;
  journal->events.blocks.committed += 1;
  journal->events.entries.committed += block->entriesInCommit;
  block->uncommittedEntryCount -= block->entriesInCommit;
  block->entriesInCommit = 0;
  block->committing = false;

  // If this block is the latest block to be acknowledged, record that fact.
  if (block->sequenceNumber > journal->lastWriteAcknowledged) {
    journal->lastWriteAcknowledged = block->sequenceNumber;
  }

  RecoveryJournalBlock *lastActiveBlock
    = blockFromRingNode(journal->activeTailBlocks.next);
  ASSERT_LOG_ONLY((block->sequenceNumber >= lastActiveBlock->sequenceNumber),
                  "completed journal write is still active");

  notifyCommitWaiters(journal);

  // Is this block now full? Reaping, and adding entries, might have already
  // sent it off for rewriting; else, queue it for rewrite.
  if (isRecoveryBlockDirty(block) && isRecoveryBlockFull(block)) {
    scheduleBlockWrite(journal, block);
  }

  recycleJournalBlocks(journal);
  writeBlocks(journal);
  checkForDrainComplete(journal);
}
/**********************************************************************/
// Handle a failed journal block write: log it, go read-only, then run the
// normal write-completion path so accounting and waiters are cleaned up.
static void handleWriteError(VDOCompletion *completion)
{
  RecoveryJournalBlock *block = completion->parent;
  logErrorWithStringError(completion->result,
                          "cannot write recovery journal block %llu",
                          block->sequenceNumber);
  // Read-only mode must be entered before completeWrite() runs.
  enterJournalReadOnlyMode(block->journal, completion->result);
  completeWrite(completion);
}
/**
 * Issue a block for writing. Implements WaiterCallback.
 **/
static void writeBlock(Waiter *waiter, void *context __attribute__((unused)))
{
  RecoveryJournalBlock *block = blockFromWaiter(waiter);
  if (isReadOnly(block->journal->readOnlyNotifier)) {
    // No new journal writes may be started once the journal is read-only.
    return;
  }

  int result = commitRecoveryBlock(block, completeWrite, handleWriteError);
  if (result == VDO_SUCCESS) {
    return;
  }

  // A commit which can't even be launched leaves the journal unusable.
  enterJournalReadOnlyMode(block->journal, result);
}
/**
 * Attempt to commit blocks, according to write policy.
 *
 * @param journal The recovery journal
 **/
static void writeBlocks(RecoveryJournal *journal)
{
  assertOnJournalThread(journal, __func__);
  /*
   * This function is called after every journal entry addition and after
   * every block-write completion (and, in sync and async-unsafe modes, each
   * time a full block is queued on pendingWrites). On return we must either
   * have no VIOs waiting in the journal or have outstanding IO which will
   * provide a future wakeup.
   *
   * If nothing is in flight and there are unwritten entries, a block must be
   * issued even if it is the partially-filled active block. Beyond that, in
   * sync/async-unsafe modes every full block is issued immediately (since we
   * are called whenever a block fills), while in async mode full blocks are
   * issued only when no writes are pending.
   */
  PhysicalLayer *layer = vioAsCompletion(journal->flushVIO)->layer;
  bool isAsync = (layer->getWritePolicy(layer) == WRITE_POLICY_ASYNC);
  if (!isAsync || (journal->pendingWriteCount == 0)) {
    // Write all the full blocks.
    notifyAllWaiters(&journal->pendingWrites, writeBlock, NULL);
  }

  // Do we need to write the active block? Only if we have no outstanding
  // writes, even after issuing all of the full writes.
  bool nothingOutstanding = (journal->pendingWriteCount == 0);
  if (nothingOutstanding && canCommitRecoveryBlock(journal->activeBlock)) {
    writeBlock(&journal->activeBlock->writeWaiter, NULL);
  }
}
/**********************************************************************/
void addRecoveryJournalEntry(RecoveryJournal *journal, DataVIO *dataVIO)
{
  assertOnJournalThread(journal, __func__);
  if (!isNormal(&journal->state)) {
    // The journal is draining, suspended, or otherwise not accepting entries.
    continueDataVIO(dataVIO, VDO_INVALID_ADMIN_STATE);
    return;
  }

  if (isReadOnly(journal->readOnlyNotifier)) {
    continueDataVIO(dataVIO, VDO_READ_ONLY);
    return;
  }

  bool isIncrement = isIncrementOperation(dataVIO->operation.type);
  ASSERT_LOG_ONLY((!isIncrement || (dataVIO->recoverySequenceNumber == 0)),
                  "journal lock not held for increment");

  // Reserve the entry's slot before queueing the VIO.
  advanceJournalPoint(&journal->appendPoint, journal->entriesPerBlock);
  WaitQueue *waiters = (isIncrement
                        ? &journal->incrementWaiters
                        : &journal->decrementWaiters);
  int status = enqueueDataVIO(waiters, dataVIO,
                              THIS_LOCATION("$F($j-$js);io=journal($j-$js)"));
  if (status != VDO_SUCCESS) {
    enterJournalReadOnlyMode(journal, status);
    continueDataVIO(dataVIO, status);
    return;
  }

  assignEntries(journal);
}
/**
 * Conduct a sweep on a recovery journal to reclaim unreferenced blocks.
 *
 * Advances the block map and slab journal reap heads past journal blocks
 * whose per-zone lock counts have dropped to zero, then (depending on write
 * policy) either flushes before recording the new heads or finishes reaping
 * immediately.
 *
 * @param journal The recovery journal
 **/
static void reapRecoveryJournal(RecoveryJournal *journal)
{
  if (journal->reaping) {
    // We already have an outstanding reap in progress. We need to wait for it
    // to finish.
    return;
  }

  if (isQuiescent(&journal->state)) {
    // We are supposed to not do IO. Don't botch it by reaping.
    return;
  }

  // Start reclaiming blocks only when the journal head has no references. Then
  // stop when a block is referenced. Only blocks up to the last acknowledged
  // write may be reaped; the head block number wraps around the circular
  // journal.
  while ((journal->blockMapReapHead < journal->lastWriteAcknowledged)
         && !isLocked(journal->lockCounter, journal->blockMapHeadBlockNumber,
                      ZONE_TYPE_LOGICAL)) {
    journal->blockMapReapHead++;
    if (++journal->blockMapHeadBlockNumber == journal->size) {
      journal->blockMapHeadBlockNumber = 0;
    }
  }

  // Same sweep for the slab journal head, keyed on physical-zone locks.
  while ((journal->slabJournalReapHead < journal->lastWriteAcknowledged)
         && !isLocked(journal->lockCounter,
                      journal->slabJournalHeadBlockNumber,
                      ZONE_TYPE_PHYSICAL)) {
    journal->slabJournalReapHead++;
    if (++journal->slabJournalHeadBlockNumber == journal->size) {
      journal->slabJournalHeadBlockNumber = 0;
    }
  }

  if ((journal->blockMapReapHead == journal->blockMapHead)
      && (journal->slabJournalReapHead == journal->slabJournalHead)) {
    // Nothing happened.
    return;
  }

  PhysicalLayer *layer = vioAsCompletion(journal->flushVIO)->layer;
  if (layer->getWritePolicy(layer) != WRITE_POLICY_SYNC) {
    /*
     * If the block map head will advance, we must flush any block map page
     * modified by the entries we are reaping. If the slab journal head will
     * advance, we must flush the slab summary update covering the slab journal
     * that just released some lock.
     *
     * In sync mode, this is unnecessary because we won't record these numbers
     * on disk until the next journal block write, and in sync mode every
     * journal block write is preceded by a flush, which does the block map
     * page and slab summary update flushing itself.
     */
    journal->reaping = true;
    launchFlush(journal->flushVIO, completeReaping, handleFlushError);
    return;
  }

  finishReaping(journal);
}
/**********************************************************************/
void acquireRecoveryJournalBlockReference(RecoveryJournal *journal,
                                          SequenceNumber sequenceNumber,
                                          ZoneType zoneType,
                                          ZoneCount zoneID)
{
  if (sequenceNumber == 0) {
    // Sequence number 0 means "no journal block"; there is nothing to lock.
    return;
  }

  acquireLockCountReference(journal->lockCounter,
                            getRecoveryJournalBlockNumber(journal,
                                                          sequenceNumber),
                            zoneType, zoneID);
}
/**********************************************************************/
void releaseRecoveryJournalBlockReference(RecoveryJournal *journal,
                                          SequenceNumber sequenceNumber,
                                          ZoneType zoneType,
                                          ZoneCount zoneID)
{
  if (sequenceNumber == 0) {
    // Sequence number 0 means "no journal block"; there is nothing to unlock.
    return;
  }

  releaseLockCountReference(journal->lockCounter,
                            getRecoveryJournalBlockNumber(journal,
                                                          sequenceNumber),
                            zoneType, zoneID);
}
/**********************************************************************/
void releasePerEntryLockFromOtherZone(RecoveryJournal *journal,
                                      SequenceNumber sequenceNumber)
{
  if (sequenceNumber == 0) {
    // Sequence number 0 means "no journal block"; there is nothing to unlock.
    return;
  }

  releaseJournalZoneReferenceFromOtherZone(
    journal->lockCounter,
    getRecoveryJournalBlockNumber(journal, sequenceNumber));
}
/**
 * Initiate a drain.
 *
 * Implements AdminInitiator.
 **/
static void initiateDrain(AdminState *state)
{
  RecoveryJournal *journal = container_of(state, RecoveryJournal, state);
  checkForDrainComplete(journal);
}
/**********************************************************************/
void drainRecoveryJournal(RecoveryJournal *journal,
                          AdminStateCode operation,
                          VDOCompletion *parent)
{
  // Draining may only be initiated from the journal's own thread.
  assertOnJournalThread(journal, __func__);
  startDraining(&journal->state, operation, parent, initiateDrain);
}
/**********************************************************************/
void resumeRecoveryJournal(RecoveryJournal *journal, VDOCompletion *parent)
{
  assertOnJournalThread(journal, __func__);
  // Capture whether the journal was saved before resuming mutates the state.
  bool wasSaved = isSaved(&journal->state);
  setCompletionResult(parent, resumeIfQuiescent(&journal->state));

  if (isReadOnly(journal->readOnlyNotifier)) {
    finishCompletion(parent, VDO_READ_ONLY);
    return;
  }

  if (wasSaved) {
    initializeJournalState(journal);
  }

  if (resumeLockCounter(journal->lockCounter)) {
    // We might have missed a notification while suspended.
    reapRecoveryJournal(journal);
  }

  completeCompletion(parent);
}
/**********************************************************************/
BlockCount getJournalLogicalBlocksUsed(const RecoveryJournal *journal)
{
  // Simple accessor for the journal's logical-block usage counter.
  return journal->logicalBlocksUsed;
}
/**********************************************************************/
RecoveryJournalStatistics
getRecoveryJournalStatistics(const RecoveryJournal *journal)
{
  // Return a by-value snapshot of the journal's event counters.
  return journal->events;
}
/**********************************************************************/
void dumpRecoveryJournalStatistics(const RecoveryJournal *journal)
{
  RecoveryJournalStatistics stats = getRecoveryJournalStatistics(journal);
  logInfo("Recovery Journal");
  // Use PRIu64 consistently for every 64-bit field. The previous mix of
  // "%llu" and PRIu64 in the same call is a type-mismatched conversion
  // (undefined behavior) on platforms where uint64_t is unsigned long
  // rather than unsigned long long.
  logInfo("  blockMapHead=%" PRIu64 " slabJournalHead=%" PRIu64
          " lastWriteAcknowledged=%" PRIu64 " tail=%" PRIu64
          " blockMapReapHead=%" PRIu64 " slabJournalReapHead=%" PRIu64
          " diskFull=%" PRIu64 " slabJournalCommitsRequested=%" PRIu64
          " incrementWaiters=%zu decrementWaiters=%zu",
          journal->blockMapHead, journal->slabJournalHead,
          journal->lastWriteAcknowledged, journal->tail,
          journal->blockMapReapHead, journal->slabJournalReapHead,
          stats.diskFull, stats.slabJournalCommitsRequested,
          countWaiters(&journal->incrementWaiters),
          countWaiters(&journal->decrementWaiters));
  logInfo("  entries: started=%" PRIu64 " written=%" PRIu64 " committed=%"
          PRIu64,
          stats.entries.started, stats.entries.written,
          stats.entries.committed);
  logInfo("  blocks: started=%" PRIu64 " written=%" PRIu64 " committed=%"
          PRIu64,
          stats.blocks.started, stats.blocks.written,
          stats.blocks.committed);
  logInfo("  active blocks:");
  // Dump each block still on the active-tail ring.
  const RingNode *head = &journal->activeTailBlocks;
  for (RingNode *node = head->next; node != head; node = node->next) {
    dumpRecoveryBlock(blockFromRingNode(node));
  }
}