/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/recoveryJournal.c#30 $
 */

#include "recoveryJournal.h"
#include "recoveryJournalInternals.h"

#include "buffer.h"
#include "logger.h"
#include "memoryAlloc.h"

#include "blockMap.h"
#include "constants.h"
#include "dataVIO.h"
#include "extent.h"
#include "header.h"
#include "numUtils.h"
#include "packedRecoveryJournalBlock.h"
#include "recoveryJournalBlock.h"
#include "slabDepot.h"
#include "slabJournal.h"
#include "waitQueue.h"

/**
 * The on-disk (version 7.0) state of the recovery journal component. The
 * fields are encoded in this order by encodeRecoveryJournal().
 **/
typedef struct {
  SequenceNumber journalStart;       // Sequence number to start the journal
  BlockCount     logicalBlocksUsed;  // Number of logical blocks used by VDO
  BlockCount     blockMapDataBlocks; // Number of block map pages allocated
} __attribute__((packed)) RecoveryJournalState7_0;

static const Header RECOVERY_JOURNAL_HEADER_7_0 = {
  .id = RECOVERY_JOURNAL,
  .version = {
    .majorVersion = 7,
    .minorVersion = 0,
  },
  .size = sizeof(RecoveryJournalState7_0),
};

// Only the low byte of the recovery count is kept (see
// computeRecoveryCountByte()).
static const uint64_t RECOVERY_COUNT_MASK = 0xff;

enum {
  /*
   * The number of reserved blocks must be large enough to prevent a
   * new recovery journal block write from overwriting a block which
   * appears to still be a valid head block of the journal. Currently,
   * that means reserving enough space for all 2048 VIOs, or 8 blocks.
   */
  RECOVERY_JOURNAL_RESERVED_BLOCKS = 8,
};

/**
 * Get a human-readable name for a journal operation.
 *
 * @param operation  The operation to name
 *
 * @return A static string naming the operation, or
 *         "unknown journal operation" for unrecognized values
 **/
const char *getJournalOperationName(JournalOperation operation)
{
  switch (operation) {
  case DATA_DECREMENT:
    return "data decrement";

  case DATA_INCREMENT:
    return "data increment";

  case BLOCK_MAP_DECREMENT:
    return "block map decrement";

  case BLOCK_MAP_INCREMENT:
    return "block map increment";

  default:
    return "unknown journal operation";
  }
}

/**
 * Get a block from the end of the free list.
 *
 * @param journal  The journal
 *
 * @return The block or NULL if the list is empty
 **/
static RecoveryJournalBlock *popFreeList(RecoveryJournal *journal)
{
  return blockFromRingNode(popRingNode(&journal->freeTailBlocks));
}

/**
 * Get a block from the end of the active list.
 *
 * @param journal  The journal
 *
 * @return The block or NULL if the list is empty
 **/
static RecoveryJournalBlock *popActiveList(RecoveryJournal *journal)
{
  return blockFromRingNode(popRingNode(&journal->activeTailBlocks));
}

/**
 * Assert that we are running on the journal thread.
 *
 * @param journal       The journal
 * @param functionName  The function doing the check (for logging)
 **/
static void assertOnJournalThread(RecoveryJournal *journal,
                                  const char      *functionName)
{
  ASSERT_LOG_ONLY((getCallbackThreadID() == journal->threadID),
                  "%s() called on journal thread", functionName);
}

/**
 * WaiterCallback implementation invoked whenever a DataVIO is to be released
 * from the journal, either because its entry was committed to disk,
 * or because there was an error.
 *
 * @param waiter   The waiter (converted with waiterAsDataVIO())
 * @param context  A pointer to the int result with which to continue the
 *                 DataVIO
 **/
static void continueWaiter(Waiter *waiter, void *context)
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  dataVIOAddTraceRecord(dataVIO,
                        THIS_LOCATION("$F($j-$js);"
                                      "cb=continueJournalWaiter($j-$js)"));
  int waitResult = *((int *) context);
  continueDataVIO(dataVIO, waitResult);
}

/**
 * Check whether the journal has any waiters on any blocks.
 *
 * @param journal  The journal in question
 *
 * @return true if any block has a waiter
 **/
static inline bool hasBlockWaiters(RecoveryJournal *journal)
{
  // Either the first active tail block (if it exists) has waiters,
  // or no active tail block has waiters.
  if (isRingEmpty(&journal->activeTailBlocks)) {
    return false;
  }

  RecoveryJournalBlock *block
    = blockFromRingNode(journal->activeTailBlocks.next);
  return (hasWaiters(&block->entryWaiters)
          || hasWaiters(&block->commitWaiters));
}

/**********************************************************************/
static void recycleJournalBlocks(RecoveryJournal *journal);
static void recycleJournalBlock(RecoveryJournalBlock *block);
static void notifyCommitWaiters(RecoveryJournal *journal);

/**
 * Check whether the journal has drained.
 *
 * In read-only mode this also releases every waiter still held by the
 * journal, with VDO_READ_ONLY, so that the drain can make progress.
 *
 * @param journal  The journal which may have just drained
 **/
static void checkForDrainComplete(RecoveryJournal *journal)
{
  int result = VDO_SUCCESS;
  if (isReadOnly(journal->readOnlyNotifier)) {
    result = VDO_READ_ONLY;
    /*
     * Clean up any full active blocks which were not written due to being
     * in read-only mode.
     *
     * XXX: This would probably be better as a short-circuit in writeBlock().
     */
    notifyCommitWaiters(journal);
    recycleJournalBlocks(journal);

    // Release any DataVIOs waiting to be assigned entries.
    notifyAllWaiters(&journal->decrementWaiters, continueWaiter, &result);
    notifyAllWaiters(&journal->incrementWaiters, continueWaiter, &result);
  }

  if (!isDraining(&journal->state)
      || journal->reaping || hasBlockWaiters(journal)
      || hasWaiters(&journal->incrementWaiters)
      || hasWaiters(&journal->decrementWaiters)
      || !suspendLockCounter(journal->lockCounter)) {
    return;
  }

  if (isSaving(&journal->state)) {
    if (journal->activeBlock != NULL) {
      ASSERT_LOG_ONLY(((result == VDO_READ_ONLY)
                       || !isRecoveryBlockDirty(journal->activeBlock)),
                      "journal being saved has clean active block");
      recycleJournalBlock(journal->activeBlock);
    }

    ASSERT_LOG_ONLY(isRingEmpty(&journal->activeTailBlocks),
                    "all blocks in a journal being saved must be inactive");
  }

  finishDrainingWithResult(&journal->state, result);
}

/**
 * Notify a recovery journal that the VDO has gone read-only.
 *
 * <p>Implements ReadOnlyNotification.
 *
 * @param listener  The journal
 * @param parent    The completion to notify in order to acknowledge the
 *                  notification
 **/
static void notifyRecoveryJournalOfReadOnlyMode(void          *listener,
                                                VDOCompletion *parent)
{
  checkForDrainComplete(listener);
  completeCompletion(parent);
}

/**
 * Put the journal in read-only mode. All attempts to add entries after
 * this function is called will fail. All VIOs waiting for commits will be
 * awakened with an error.
 *
 * @param journal    The journal which has failed
 * @param errorCode  The error result triggering this call
 **/
static void enterJournalReadOnlyMode(RecoveryJournal *journal, int errorCode)
{
  enterReadOnlyMode(journal->readOnlyNotifier, errorCode);
  checkForDrainComplete(journal);
}

/**
 * Get the current tail sequence number of the journal.
 *
 * @param journal  The journal
 *
 * @return The journal's tail sequence number
 **/
SequenceNumber getCurrentJournalSequenceNumber(RecoveryJournal *journal)
{
  return journal->tail;
}

/**
 * Get the head of the recovery journal, which is the lowest sequence number of
 * the block map head and the slab journal head.
 *
 * @param journal  The journal
 *
 * @return the head of the journal
 **/
static inline SequenceNumber getRecoveryJournalHead(RecoveryJournal *journal)
{
  return minSequenceNumber(journal->blockMapHead, journal->slabJournalHead);
}

/**
 * Compute the recovery count byte for a given recovery count.
 *
 * @param recoveryCount  The recovery count
 *
 * @return The byte corresponding to the recovery count (its low 8 bits)
 **/
__attribute__((warn_unused_result))
static inline uint8_t computeRecoveryCountByte(uint64_t recoveryCount)
{
  return (uint8_t) (recoveryCount & RECOVERY_COUNT_MASK);
}

/**
 * Check whether the journal is over the threshold, and if so, force the oldest
 * slab journal tail block to commit.
 *
 * @param journal  The journal
 **/
static void checkSlabJournalCommitThreshold(RecoveryJournal *journal)
{
  BlockCount currentLength = journal->tail - journal->slabJournalHead;
  if (currentLength > journal->slabJournalCommitThreshold) {
    journal->events.slabJournalCommitsRequested++;
    commitOldestSlabJournalTailBlocks(journal->depot,
                                      journal->slabJournalHead);
  }
}

/**********************************************************************/
static void reapRecoveryJournal(RecoveryJournal *journal);
static void assignEntries(RecoveryJournal *journal);

/**
 * Finish reaping the journal.
 *
 * Advances both heads to their reap heads, returns the reaped blocks' entry
 * capacity to availableSpace, and then retries any work that was waiting
 * for space or for the reap to finish.
 *
 * @param journal  The journal being reaped
 **/
static void finishReaping(RecoveryJournal *journal)
{
  SequenceNumber oldHead    = getRecoveryJournalHead(journal);
  journal->blockMapHead     = journal->blockMapReapHead;
  journal->slabJournalHead  = journal->slabJournalReapHead;
  BlockCount blocksReaped   = getRecoveryJournalHead(journal) - oldHead;
  journal->availableSpace  += blocksReaped * journal->entriesPerBlock;
  journal->reaping          = false;
  checkSlabJournalCommitThreshold(journal);
  assignEntries(journal);
  checkForDrainComplete(journal);
}

/**
 * Finish reaping the journal after flushing the lower layer. This is the
 * callback registered in reapRecoveryJournal().
 *
 * @param completion  The journal's flush VIO
 **/
static void completeReaping(VDOCompletion *completion)
{
  RecoveryJournal *journal = completion->parent;
  finishReaping(journal);

  // Try reaping again in case more locks were released while flush was out.
  reapRecoveryJournal(journal);
}

/**
 * Handle an error when flushing the lower layer due to reaping.
 *
 * @param completion  The journal's flush VIO
 **/
static void handleFlushError(VDOCompletion *completion)
{
  RecoveryJournal *journal = completion->parent;
  journal->reaping = false;
  enterJournalReadOnlyMode(journal, completion->result);
}

/**
 * Set all journal fields appropriately to start journaling from the current
 * active block.
 *
 * @param journal  The journal to be reset based on its active block
 **/
static void initializeJournalState(RecoveryJournal *journal)
{
  journal->appendPoint.sequenceNumber = journal->tail;
  journal->lastWriteAcknowledged      = journal->tail;
  journal->blockMapHead               = journal->tail;
  journal->slabJournalHead            = journal->tail;
  journal->blockMapReapHead           = journal->tail;
  journal->slabJournalReapHead        = journal->tail;
  journal->blockMapHeadBlockNumber
    = getRecoveryJournalBlockNumber(journal, journal->blockMapHead);
  journal->slabJournalHeadBlockNumber
    = getRecoveryJournalBlockNumber(journal, journal->slabJournalHead);
}

/**
 * Compute the usable length of a journal of a given size: the size less the
 * reserved blocks, where the reservation is a quarter of the journal capped
 * at RECOVERY_JOURNAL_RESERVED_BLOCKS.
 *
 * @param journalSize  The total size of the journal in blocks
 *
 * @return The number of usable journal blocks
 **/
BlockCount getRecoveryJournalLength(BlockCount journalSize)
{
  BlockCount reservedBlocks = journalSize / 4;
  if (reservedBlocks > RECOVERY_JOURNAL_RESERVED_BLOCKS) {
    reservedBlocks = RECOVERY_JOURNAL_RESERVED_BLOCKS;
  }
  return (journalSize - reservedBlocks);
}

/**
 * Attempt to reap the journal now that all the locks on some journal block
 * have been released. This is the callback registered with the lock counter.
 *
 * @param completion  The lock counter completion
 **/
static void reapRecoveryJournalCallback(VDOCompletion *completion)
{
  RecoveryJournal *journal = (RecoveryJournal *) completion->parent;
  // The acknowledgement must be done before reaping so that there is no
  // race between acknowledging the notification and unlocks wishing to notify.
  acknowledgeUnlock(journal->lockCounter);

  if (isQuiescing(&journal->state)) {
    // Don't start reaping when the journal is trying to quiesce. Do check if
    // this notification is the last thing the drain is waiting on.
    checkForDrainComplete(journal);
    return;
  }

  reapRecoveryJournal(journal);
  checkSlabJournalCommitThreshold(journal);
}

/**
 * Set the journal's tail sequence number. A tail at or beyond the largest
 * sequence number supported by the slab journal puts the journal into
 * read-only mode (VDO_JOURNAL_OVERFLOW); the tail is stored regardless.
 *
 * @param journal  The journal whose tail is to be set
 * @param tail     The new tail value
 **/
static void setJournalTail(RecoveryJournal *journal, SequenceNumber tail)
{
  // VDO does not support sequence numbers above 1 << 48 in the slab journal.
  const SequenceNumber unsupportedTail = (1ULL << 48);
  if (tail >= unsupportedTail) {
    enterJournalReadOnlyMode(journal, VDO_JOURNAL_OVERFLOW);
  }

  journal->tail = tail;
}

/**
 * Construct a recovery journal. The tail buffer, lock counter, flush VIO and
 * read-only listener are only created when the layer can create metadata
 * VIOs (i.e. not when formatting).
 *
 * @param [in]  nonce             The nonce of the VDO
 * @param [in]  layer             The physical layer for the VDO
 * @param [in]  partition         The partition for the journal
 * @param [in]  recoveryCount     The VDO's recovery count (only its low byte
 *                                is kept)
 * @param [in]  journalSize       The number of blocks in the journal
 * @param [in]  tailBufferSize    The number of in-memory tail blocks to make
 * @param [in]  readOnlyNotifier  The read-only mode notifier
 * @param [in]  threadConfig      The thread configuration of the VDO
 * @param [out] journalPtr        The pointer to hold the new journal
 *
 * @return VDO_SUCCESS or an error code; on error the partially constructed
 *         journal has been freed
 **/
int makeRecoveryJournal(Nonce               nonce,
                        PhysicalLayer      *layer,
                        Partition          *partition,
                        uint64_t            recoveryCount,
                        BlockCount          journalSize,
                        BlockCount          tailBufferSize,
                        ReadOnlyNotifier   *readOnlyNotifier,
                        const ThreadConfig *threadConfig,
                        RecoveryJournal   **journalPtr)
{
  RecoveryJournal *journal;
  int result = ALLOCATE(1, RecoveryJournal, __func__, &journal);
  if (result != VDO_SUCCESS) {
    return result;
  }

  initializeRing(&journal->freeTailBlocks);
  initializeRing(&journal->activeTailBlocks);
  initializeWaitQueue(&journal->pendingWrites);

  journal->threadID         = getJournalZoneThread(threadConfig);
  journal->partition        = partition;
  journal->nonce            = nonce;
  journal->recoveryCount    = computeRecoveryCountByte(recoveryCount);
  journal->size             = journalSize;
  journal->readOnlyNotifier = readOnlyNotifier;
  journal->tail             = 1;
  journal->slabJournalCommitThreshold = (journalSize * 2) / 3;
  initializeJournalState(journal);

  journal->entriesPerBlock = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK;
  BlockCount journalLength = getRecoveryJournalLength(journalSize);
  journal->availableSpace  = journal->entriesPerBlock * journalLength;

  // Only make the tail buffer and VIO in normal operation since the formatter
  // doesn't need them.
  if (layer->createMetadataVIO != NULL) {
    for (BlockCount i = 0; i < tailBufferSize; i++) {
      RecoveryJournalBlock *block;
      result = makeRecoveryBlock(layer, journal, &block);
      if (result != VDO_SUCCESS) {
        freeRecoveryJournal(&journal);
        return result;
      }

      pushRingNode(&journal->freeTailBlocks, &block->ringNode);
    }

    result = makeLockCounter(layer, journal, reapRecoveryJournalCallback,
                             journal->threadID, threadConfig->logicalZoneCount,
                             threadConfig->physicalZoneCount, journal->size,
                             &journal->lockCounter);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    result = ALLOCATE(VDO_BLOCK_SIZE, char, "journal flush data",
                      &journal->unusedFlushVIOData);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    result = createVIO(layer, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
                       journal, journal->unusedFlushVIOData,
                       &journal->flushVIO);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    result = registerReadOnlyListener(readOnlyNotifier, journal,
                                      notifyRecoveryJournalOfReadOnlyMode,
                                      journal->threadID);
    if (result != VDO_SUCCESS) {
      freeRecoveryJournal(&journal);
      return result;
    }

    journal->flushVIO->completion.callbackThreadID = journal->threadID;
  }

  *journalPtr = journal;
  return VDO_SUCCESS;
}

/**
 * Free a recovery journal and all of its tail blocks, and NULL out the
 * reference to it. Does nothing if the journal is NULL.
 *
 * @param journalPtr  The reference to the journal to free
 **/
void freeRecoveryJournal(RecoveryJournal **journalPtr)
{
  RecoveryJournal *journal = *journalPtr;
  if (journal == NULL) {
    return;
  }

  freeLockCounter(&journal->lockCounter);
  freeVIO(&journal->flushVIO);
  FREE(journal->unusedFlushVIOData);

  // XXX: eventually, the journal should be constructed in a quiescent state
  //      which requires opening before use.
  if (!isQuiescent(&journal->state)) {
    ASSERT_LOG_ONLY(isRingEmpty(&journal->activeTailBlocks),
                    "journal being freed has no active tail blocks");
  } else if (!isSaved(&journal->state)
             && !isRingEmpty(&journal->activeTailBlocks)) {
    logWarning("journal being freed has uncommited entries");
  }

  RecoveryJournalBlock *block;
  while ((block = popActiveList(journal)) != NULL) {
    freeRecoveryBlock(&block);
  }

  while ((block = popFreeList(journal)) != NULL) {
    freeRecoveryBlock(&block);
  }

  FREE(journal);
  *journalPtr = NULL;
}

/**
 * Replace the partition used by the journal.
 *
 * @param journal    The journal
 * @param partition  The new partition
 **/
void setRecoveryJournalPartition(RecoveryJournal *journal,
                                 Partition       *partition)
{
  journal->partition = partition;
}

/**
 * Reset the journal after a recovery: the tail becomes one past the given
 * tail, the recovery count byte is recomputed, and all heads are reset to
 * the new tail.
 *
 * @param journal        The journal
 * @param recoveryCount  The new recovery count
 * @param tail           The last sequence number found during recovery
 **/
void initializeRecoveryJournalPostRecovery(RecoveryJournal *journal,
                                           uint64_t         recoveryCount,
                                           SequenceNumber   tail)
{
  setJournalTail(journal, tail + 1);
  journal->recoveryCount = computeRecoveryCountByte(recoveryCount);
  initializeJournalState(journal);
}

/**
 * Reset the journal after a rebuild; as for recovery, but also record the
 * rebuilt usage counts.
 *
 * @param journal             The journal
 * @param recoveryCount       The new recovery count
 * @param tail                The last sequence number found
 * @param logicalBlocksUsed   The number of logical blocks in use
 * @param blockMapDataBlocks  The number of block map data blocks in use
 **/
void
initializeRecoveryJournalPostRebuild(RecoveryJournal *journal,
                                     uint64_t         recoveryCount,
                                     SequenceNumber   tail,
                                     BlockCount       logicalBlocksUsed,
                                     BlockCount       blockMapDataBlocks)
{
  initializeRecoveryJournalPostRecovery(journal, recoveryCount, tail);
  journal->logicalBlocksUsed  = logicalBlocksUsed;
  journal->blockMapDataBlocks = blockMapDataBlocks;
}

/**
 * Get the number of block map data blocks recorded by the journal.
 *
 * @param journal  The journal
 *
 * @return The recorded number of block map data blocks
 **/
BlockCount getJournalBlockMapDataBlocksUsed(RecoveryJournal *journal)
{
  return journal->blockMapDataBlocks;
}

/**
 * Set the number of block map data blocks recorded by the journal.
 *
 * @param journal  The journal
 * @param pages    The number of block map data blocks
 **/
void setJournalBlockMapDataBlocksUsed(RecoveryJournal *journal,
                                      BlockCount       pages)
{
  journal->blockMapDataBlocks = pages;
}

/**
 * Get the ID of the journal's thread.
 *
 * @param journal  The journal
 *
 * @return The journal thread ID
 **/
ThreadID getRecoveryJournalThreadID(RecoveryJournal *journal)
{
  return journal->threadID;
}
/**********************************************************************/ void openRecoveryJournal(RecoveryJournal *journal, SlabDepot *depot, BlockMap *blockMap) { journal->depot = depot; journal->blockMap = blockMap; journal->state.state = ADMIN_STATE_NORMAL_OPERATION; } /**********************************************************************/ size_t getRecoveryJournalEncodedSize(void) { return ENCODED_HEADER_SIZE + sizeof(RecoveryJournalState7_0); } /**********************************************************************/ int encodeRecoveryJournal(RecoveryJournal *journal, Buffer *buffer) { SequenceNumber journalStart; if (isSaved(&journal->state)) { // If the journal is saved, we should start one past the active block // (since the active block is not guaranteed to be empty). journalStart = journal->tail; } else { // When we're merely suspended or have gone read-only, we must record the // first block that might have entries that need to be applied. journalStart = getRecoveryJournalHead(journal); } int result = encodeHeader(&RECOVERY_JOURNAL_HEADER_7_0, buffer); if (result != UDS_SUCCESS) { return result; } size_t initialLength = contentLength(buffer); result = putUInt64LEIntoBuffer(buffer, journalStart); if (result != UDS_SUCCESS) { return result; } result = putUInt64LEIntoBuffer(buffer, journal->logicalBlocksUsed); if (result != UDS_SUCCESS) { return result; } result = putUInt64LEIntoBuffer(buffer, journal->blockMapDataBlocks); if (result != UDS_SUCCESS) { return result; } size_t encodedSize = contentLength(buffer) - initialLength; return ASSERT(RECOVERY_JOURNAL_HEADER_7_0.size == encodedSize, "encoded recovery journal component size" " must match header size"); } /** * Decode recovery journal component state version 7.0 from a buffer. 
* * @param buffer A buffer positioned at the start of the encoding * @param state The state structure to receive the decoded values * * @return UDS_SUCCESS or an error code **/ static int decodeRecoveryJournalState_7_0(Buffer *buffer, RecoveryJournalState7_0 *state) { size_t initialLength = contentLength(buffer); SequenceNumber journalStart; int result = getUInt64LEFromBuffer(buffer, &journalStart); if (result != UDS_SUCCESS) { return result; } BlockCount logicalBlocksUsed; result = getUInt64LEFromBuffer(buffer, &logicalBlocksUsed); if (result != UDS_SUCCESS) { return result; } BlockCount blockMapDataBlocks; result = getUInt64LEFromBuffer(buffer, &blockMapDataBlocks); if (result != UDS_SUCCESS) { return result; } *state = (RecoveryJournalState7_0) { .journalStart = journalStart, .logicalBlocksUsed = logicalBlocksUsed, .blockMapDataBlocks = blockMapDataBlocks, }; size_t decodedSize = initialLength - contentLength(buffer); return ASSERT(RECOVERY_JOURNAL_HEADER_7_0.size == decodedSize, "decoded slab depot component size must match header size"); } /**********************************************************************/ int decodeRecoveryJournal(RecoveryJournal *journal, Buffer *buffer) { Header header; int result = decodeHeader(buffer, &header); if (result != VDO_SUCCESS) { return result; } result = validateHeader(&RECOVERY_JOURNAL_HEADER_7_0, &header, true, __func__); if (result != VDO_SUCCESS) { return result; } RecoveryJournalState7_0 state; result = decodeRecoveryJournalState_7_0(buffer, &state); if (result != VDO_SUCCESS) { return result; } // Update recovery journal in-memory information. 
setJournalTail(journal, state.journalStart); journal->logicalBlocksUsed = state.logicalBlocksUsed; journal->blockMapDataBlocks = state.blockMapDataBlocks; initializeJournalState(journal); // XXX: this is a hack until we make initial resume of a VDO a real resume journal->state.state = ADMIN_STATE_SUSPENDED; return VDO_SUCCESS; } /**********************************************************************/ int decodeSodiumRecoveryJournal(RecoveryJournal *journal, Buffer *buffer) { // Sodium uses version 7.0, same as head, currently. return decodeRecoveryJournal(journal, buffer); } /** * Advance the tail of the journal. * * @param journal The journal whose tail should be advanced * * @return true if the tail was advanced **/ static bool advanceTail(RecoveryJournal *journal) { journal->activeBlock = popFreeList(journal); if (journal->activeBlock == NULL) { return false; } pushRingNode(&journal->activeTailBlocks, &journal->activeBlock->ringNode); initializeRecoveryBlock(journal->activeBlock); setJournalTail(journal, journal->tail + 1); advanceBlockMapEra(journal->blockMap, journal->tail); return true; } /** * Check whether there is space to make a given type of entry. * * @param journal The journal to check * @param increment Set to true if the desired entry is an * increment * * @return true if there is space in the journal to make an * entry of the specified type **/ static bool checkForEntrySpace(RecoveryJournal *journal, bool increment) { if (increment) { return ((journal->availableSpace - journal->pendingDecrementCount) > 1); } return (journal->availableSpace > 0); } /** * Prepare the currently active block to receive an entry and check whether * an entry of the given type may be assigned at this time. 
* * @param journal The journal receiving an entry * @param increment Set to true if the desired entry is an * increment * * @return true if there is space in the journal to store an * entry of the specified type **/ static bool prepareToAssignEntry(RecoveryJournal *journal, bool increment) { if (!checkForEntrySpace(journal, increment)) { if (!increment) { // There must always be room to make a decrement entry. logError("No space for decrement entry in recovery journal"); enterJournalReadOnlyMode(journal, VDO_RECOVERY_JOURNAL_FULL); } return false; } if (isRecoveryBlockFull(journal->activeBlock) && !advanceTail(journal)) { return false; } if (!isRecoveryBlockEmpty(journal->activeBlock)) { return true; } if ((journal->tail - getRecoveryJournalHead(journal)) > journal->size) { // Cannot use this block since the journal is full. journal->events.diskFull++; return false; } /* * Don't allow the new block to be reaped until all of its entries have been * committed to the block map and until the journal block has been fully * committed as well. Because the block map update is done only after any * slab journal entries have been made, the per-entry lock for the block map * entry serves to protect those as well. */ initializeLockCount(journal->lockCounter, journal->activeBlock->blockNumber, journal->entriesPerBlock + 1); return true; } /**********************************************************************/ static void writeBlocks(RecoveryJournal *journal); /** * Queue a block for writing. The block is expected to be full. If the block * is currently writing, this is a noop as the block will be queued for * writing when the write finishes. The block must not currently be queued * for writing. 
* * @param journal The journal in question * @param block The block which is now ready to write **/ static void scheduleBlockWrite(RecoveryJournal *journal, RecoveryJournalBlock *block) { if (block->committing) { return; } int result = enqueueWaiter(&journal->pendingWrites, &block->writeWaiter); if (result != VDO_SUCCESS) { enterJournalReadOnlyMode(journal, result); return; } PhysicalLayer *layer = vioAsCompletion(journal->flushVIO)->layer; if ((layer->getWritePolicy(layer) == WRITE_POLICY_ASYNC)) { /* * At the end of adding entries, or discovering this partial block * is now full and ready to rewrite, we will call writeBlocks() and * write a whole batch. */ return; } writeBlocks(journal); } /** * Release a reference to a journal block. * * @param block The journal block from which to release a reference **/ static void releaseJournalBlockReference(RecoveryJournalBlock *block) { releaseJournalZoneReference(block->journal->lockCounter, block->blockNumber); } /** * Implements WaiterCallback. Assign an entry waiter to the active block. **/ static void assignEntry(Waiter *waiter, void *context) { DataVIO *dataVIO = waiterAsDataVIO(waiter); RecoveryJournalBlock *block = (RecoveryJournalBlock *) context; RecoveryJournal *journal = block->journal; // Record the point at which we will make the journal entry. dataVIO->recoveryJournalPoint = (JournalPoint) { .sequenceNumber = block->sequenceNumber, .entryCount = block->entryCount, }; switch (dataVIO->operation.type) { case DATA_INCREMENT: if (dataVIO->operation.state != MAPPING_STATE_UNMAPPED) { journal->logicalBlocksUsed++; } journal->pendingDecrementCount++; break; case DATA_DECREMENT: if (dataVIO->operation.state != MAPPING_STATE_UNMAPPED) { journal->logicalBlocksUsed--; } // Per-entry locks need not be held for decrement entries since the lock // held for the incref entry will protect this entry as well. 
releaseJournalBlockReference(block); ASSERT_LOG_ONLY((journal->pendingDecrementCount != 0), "decrement follows increment"); journal->pendingDecrementCount--; break; case BLOCK_MAP_INCREMENT: journal->blockMapDataBlocks++; break; default: logError("Invalid journal operation %u", dataVIO->operation.type); enterJournalReadOnlyMode(journal, VDO_NOT_IMPLEMENTED); continueDataVIO(dataVIO, VDO_NOT_IMPLEMENTED); return; } journal->availableSpace--; int result = enqueueRecoveryBlockEntry(block, dataVIO); if (result != VDO_SUCCESS) { enterJournalReadOnlyMode(journal, result); continueDataVIO(dataVIO, result); } if (isRecoveryBlockFull(block)) { // The block is full, so we can write it anytime henceforth. If it is // already committing, we'll queue it for writing when it comes back. scheduleBlockWrite(journal, block); } // Force out slab journal tail blocks when threshold is reached. checkSlabJournalCommitThreshold(journal); } /**********************************************************************/ static bool assignEntriesFromQueue(RecoveryJournal *journal, WaitQueue *queue, bool increment) { while (hasWaiters(queue)) { if (!prepareToAssignEntry(journal, increment)) { return false; } notifyNextWaiter(queue, assignEntry, journal->activeBlock); } return true; } /**********************************************************************/ static void assignEntries(RecoveryJournal *journal) { if (journal->addingEntries) { // Protect against re-entrancy. return; } journal->addingEntries = true; if (assignEntriesFromQueue(journal, &journal->decrementWaiters, false)) { assignEntriesFromQueue(journal, &journal->incrementWaiters, true); } // Now that we've finished with entries, see if we have a batch of blocks to // write. writeBlocks(journal); journal->addingEntries = false; } /** * Prepare an in-memory journal block to be reused now that it has been fully * committed. 
* * @param block The block to be recycled **/ static void recycleJournalBlock(RecoveryJournalBlock *block) { RecoveryJournal *journal = block->journal; pushRingNode(&journal->freeTailBlocks, &block->ringNode); // Release any unused entry locks. for (BlockCount i = block->entryCount; i < journal->entriesPerBlock; i++) { releaseJournalBlockReference(block); } // Release our own lock against reaping now that the block is completely // committed, or we're giving up because we're in read-only mode. if (block->entryCount > 0) { releaseJournalBlockReference(block); } if (block == journal->activeBlock) { journal->activeBlock = NULL; } } /** * WaiterCallback implementation invoked whenever a VIO is to be released * from the journal because its entry was committed to disk. **/ static void continueCommittedWaiter(Waiter *waiter, void *context) { DataVIO *dataVIO = waiterAsDataVIO(waiter); RecoveryJournal *journal = (RecoveryJournal *) context; ASSERT_LOG_ONLY(beforeJournalPoint(&journal->commitPoint, &dataVIO->recoveryJournalPoint), "DataVIOs released from recovery journal in order. " "Recovery journal point is (%llu, %" PRIu16 "), " "but commit waiter point is (%llu, %" PRIu16 ")", journal->commitPoint.sequenceNumber, journal->commitPoint.entryCount, dataVIO->recoveryJournalPoint.sequenceNumber, dataVIO->recoveryJournalPoint.entryCount); journal->commitPoint = dataVIO->recoveryJournalPoint; int result = (isReadOnly(journal->readOnlyNotifier) ? VDO_READ_ONLY : VDO_SUCCESS); continueWaiter(waiter, &result); } /** * Notify any VIOs whose entries have now committed. 
* * @param journal The recovery journal to update **/ static void notifyCommitWaiters(RecoveryJournal *journal) { if (isRingEmpty(&journal->activeTailBlocks)) { return; } for (RingNode *node = journal->activeTailBlocks.next; node != &journal->activeTailBlocks; node = node->next) { RecoveryJournalBlock *block = blockFromRingNode(node); if (block->committing) { return; } notifyAllWaiters(&block->commitWaiters, continueCommittedWaiter, journal); if (isReadOnly(journal->readOnlyNotifier)) { notifyAllWaiters(&block->entryWaiters, continueCommittedWaiter, journal); } else if (isRecoveryBlockDirty(block) || !isRecoveryBlockFull(block)) { // Stop at partially-committed or partially-filled blocks. return; } } } /** * Recycle any journal blocks which have been fully committed. * * @param journal The recovery journal to update **/ static void recycleJournalBlocks(RecoveryJournal *journal) { while (!isRingEmpty(&journal->activeTailBlocks)) { RecoveryJournalBlock *block = blockFromRingNode(journal->activeTailBlocks.next); if (block->committing) { // Don't recycle committing blocks. return; } if (!isReadOnly(journal->readOnlyNotifier) && (isRecoveryBlockDirty(block) || !isRecoveryBlockFull(block))) { // Don't recycle partially written or partially full // blocks, except in read-only mode. return; } recycleJournalBlock(block); } } /** * Handle post-commit processing. This is the callback registered by * writeBlock(). If more entries accumulated in the block being committed while * the commit was in progress, another commit will be initiated. 
* * @param completion The completion of the VIO writing this block **/ static void completeWrite(VDOCompletion *completion) { RecoveryJournalBlock *block = completion->parent; RecoveryJournal *journal = block->journal; assertOnJournalThread(journal, __func__); journal->pendingWriteCount -= 1; journal->events.blocks.committed += 1; journal->events.entries.committed += block->entriesInCommit; block->uncommittedEntryCount -= block->entriesInCommit; block->entriesInCommit = 0; block->committing = false; // If this block is the latest block to be acknowledged, record that fact. if (block->sequenceNumber > journal->lastWriteAcknowledged) { journal->lastWriteAcknowledged = block->sequenceNumber; } RecoveryJournalBlock *lastActiveBlock = blockFromRingNode(journal->activeTailBlocks.next); ASSERT_LOG_ONLY((block->sequenceNumber >= lastActiveBlock->sequenceNumber), "completed journal write is still active"); notifyCommitWaiters(journal); // Is this block now full? Reaping, and adding entries, might have already // sent it off for rewriting; else, queue it for rewrite. if (isRecoveryBlockDirty(block) && isRecoveryBlockFull(block)) { scheduleBlockWrite(journal, block); } recycleJournalBlocks(journal); writeBlocks(journal); checkForDrainComplete(journal); } /**********************************************************************/ static void handleWriteError(VDOCompletion *completion) { RecoveryJournalBlock *block = completion->parent; RecoveryJournal *journal = block->journal; logErrorWithStringError(completion->result, "cannot write recovery journal block %llu", block->sequenceNumber); enterJournalReadOnlyMode(journal, completion->result); completeWrite(completion); } /** * Issue a block for writing. Implements WaiterCallback. 
**/ static void writeBlock(Waiter *waiter, void *context __attribute__((unused))) { RecoveryJournalBlock *block = blockFromWaiter(waiter); if (isReadOnly(block->journal->readOnlyNotifier)) { return; } int result = commitRecoveryBlock(block, completeWrite, handleWriteError); if (result != VDO_SUCCESS) { enterJournalReadOnlyMode(block->journal, result); } } /** * Attempt to commit blocks, according to write policy. * * @param journal The recovery journal **/ static void writeBlocks(RecoveryJournal *journal) { assertOnJournalThread(journal, __func__); /* * In sync and async-unsafe modes, we call this function each time we queue * a full block on pending writes; in addition, in all cases we call this * function after adding entries to the journal and finishing a block write. * Thus, when this function terminates we must either have no VIOs waiting * in the journal or have some outstanding IO to provide a future wakeup. * * In all modes, if there are no outstanding writes and some unwritten * entries, we must issue a block, even if it's the active block and it * isn't full. Otherwise, in sync/async-unsafe modes, we want to issue * all full blocks every time; since we call it each time we fill a block, * this is equivalent to issuing every full block as soon as its full. In * async mode, we want to only issue full blocks if there are no * pending writes. */ PhysicalLayer *layer = vioAsCompletion(journal->flushVIO)->layer; if ((layer->getWritePolicy(layer) != WRITE_POLICY_ASYNC) || (journal->pendingWriteCount == 0)) { // Write all the full blocks. notifyAllWaiters(&journal->pendingWrites, writeBlock, NULL); } // Do we need to write the active block? Only if we have no outstanding // writes, even after issuing all of the full writes. 
if ((journal->pendingWriteCount == 0) && canCommitRecoveryBlock(journal->activeBlock)) { writeBlock(&journal->activeBlock->writeWaiter, NULL); } } /**********************************************************************/ void addRecoveryJournalEntry(RecoveryJournal *journal, DataVIO *dataVIO) { assertOnJournalThread(journal, __func__); if (!isNormal(&journal->state)) { continueDataVIO(dataVIO, VDO_INVALID_ADMIN_STATE); return; } if (isReadOnly(journal->readOnlyNotifier)) { continueDataVIO(dataVIO, VDO_READ_ONLY); return; } bool increment = isIncrementOperation(dataVIO->operation.type); ASSERT_LOG_ONLY((!increment || (dataVIO->recoverySequenceNumber == 0)), "journal lock not held for increment"); advanceJournalPoint(&journal->appendPoint, journal->entriesPerBlock); int result = enqueueDataVIO((increment ? &journal->incrementWaiters : &journal->decrementWaiters), dataVIO, THIS_LOCATION("$F($j-$js);io=journal($j-$js)")); if (result != VDO_SUCCESS) { enterJournalReadOnlyMode(journal, result); continueDataVIO(dataVIO, result); return; } assignEntries(journal); } /** * Conduct a sweep on a recovery journal to reclaim unreferenced blocks. * * @param journal The recovery journal **/ static void reapRecoveryJournal(RecoveryJournal *journal) { if (journal->reaping) { // We already have an outstanding reap in progress. We need to wait for it // to finish. return; } if (isQuiescent(&journal->state)) { // We are supposed to not do IO. Don't botch it by reaping. return; } // Start reclaiming blocks only when the journal head has no references. Then // stop when a block is referenced. 
while ((journal->blockMapReapHead < journal->lastWriteAcknowledged) && !isLocked(journal->lockCounter, journal->blockMapHeadBlockNumber, ZONE_TYPE_LOGICAL)) { journal->blockMapReapHead++; if (++journal->blockMapHeadBlockNumber == journal->size) { journal->blockMapHeadBlockNumber = 0; } } while ((journal->slabJournalReapHead < journal->lastWriteAcknowledged) && !isLocked(journal->lockCounter, journal->slabJournalHeadBlockNumber, ZONE_TYPE_PHYSICAL)) { journal->slabJournalReapHead++; if (++journal->slabJournalHeadBlockNumber == journal->size) { journal->slabJournalHeadBlockNumber = 0; } } if ((journal->blockMapReapHead == journal->blockMapHead) && (journal->slabJournalReapHead == journal->slabJournalHead)) { // Nothing happened. return; } PhysicalLayer *layer = vioAsCompletion(journal->flushVIO)->layer; if (layer->getWritePolicy(layer) != WRITE_POLICY_SYNC) { /* * If the block map head will advance, we must flush any block map page * modified by the entries we are reaping. If the slab journal head will * advance, we must flush the slab summary update covering the slab journal * that just released some lock. * * In sync mode, this is unnecessary because we won't record these numbers * on disk until the next journal block write, and in sync mode every * journal block write is preceded by a flush, which does the block map * page and slab summary update flushing itself. 
*/ journal->reaping = true; launchFlush(journal->flushVIO, completeReaping, handleFlushError); return; } finishReaping(journal); } /**********************************************************************/ void acquireRecoveryJournalBlockReference(RecoveryJournal *journal, SequenceNumber sequenceNumber, ZoneType zoneType, ZoneCount zoneID) { if (sequenceNumber == 0) { return; } BlockCount blockNumber = getRecoveryJournalBlockNumber(journal, sequenceNumber); acquireLockCountReference(journal->lockCounter, blockNumber, zoneType, zoneID); } /**********************************************************************/ void releaseRecoveryJournalBlockReference(RecoveryJournal *journal, SequenceNumber sequenceNumber, ZoneType zoneType, ZoneCount zoneID) { if (sequenceNumber == 0) { return; } BlockCount blockNumber = getRecoveryJournalBlockNumber(journal, sequenceNumber); releaseLockCountReference(journal->lockCounter, blockNumber, zoneType, zoneID); } /**********************************************************************/ void releasePerEntryLockFromOtherZone(RecoveryJournal *journal, SequenceNumber sequenceNumber) { if (sequenceNumber == 0) { return; } BlockCount blockNumber = getRecoveryJournalBlockNumber(journal, sequenceNumber); releaseJournalZoneReferenceFromOtherZone(journal->lockCounter, blockNumber); } /** * Initiate a drain. * * Implements AdminInitiator. 
**/ static void initiateDrain(AdminState *state) { checkForDrainComplete(container_of(state, RecoveryJournal, state)); } /**********************************************************************/ void drainRecoveryJournal(RecoveryJournal *journal, AdminStateCode operation, VDOCompletion *parent) { assertOnJournalThread(journal, __func__); startDraining(&journal->state, operation, parent, initiateDrain); } /**********************************************************************/ void resumeRecoveryJournal(RecoveryJournal *journal, VDOCompletion *parent) { assertOnJournalThread(journal, __func__); bool saved = isSaved(&journal->state); setCompletionResult(parent, resumeIfQuiescent(&journal->state)); if (isReadOnly(journal->readOnlyNotifier)) { finishCompletion(parent, VDO_READ_ONLY); return; } if (saved) { initializeJournalState(journal); } if (resumeLockCounter(journal->lockCounter)) { // We might have missed a notification. reapRecoveryJournal(journal); } completeCompletion(parent); } /**********************************************************************/ BlockCount getJournalLogicalBlocksUsed(const RecoveryJournal *journal) { return journal->logicalBlocksUsed; } /**********************************************************************/ RecoveryJournalStatistics getRecoveryJournalStatistics(const RecoveryJournal *journal) { return journal->events; } /**********************************************************************/ void dumpRecoveryJournalStatistics(const RecoveryJournal *journal) { RecoveryJournalStatistics stats = getRecoveryJournalStatistics(journal); logInfo("Recovery Journal"); logInfo(" blockMapHead=%llu slabJournalHead=%" PRIu64 " lastWriteAcknowledged=%llu tail=%" PRIu64 " blockMapReapHead=%llu slabJournalReapHead=%" PRIu64 " diskFull=%llu slabJournalCommitsRequested=%" PRIu64 " incrementWaiters=%zu decrementWaiters=%zu", journal->blockMapHead, journal->slabJournalHead, journal->lastWriteAcknowledged, journal->tail, journal->blockMapReapHead, 
journal->slabJournalReapHead, stats.diskFull, stats.slabJournalCommitsRequested, countWaiters(&journal->incrementWaiters), countWaiters(&journal->decrementWaiters)); logInfo(" entries: started=%llu written=%llu committed=%" PRIu64, stats.entries.started, stats.entries.written, stats.entries.committed); logInfo(" blocks: started=%llu written=%llu committed=%" PRIu64, stats.blocks.started, stats.blocks.written, stats.blocks.committed); logInfo(" active blocks:"); const RingNode *head = &journal->activeTailBlocks; for (RingNode *node = head->next; node != head; node = node->next) { dumpRecoveryBlock(blockFromRingNode(node)); } }