/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/recoveryJournalBlock.c#13 $
*/
#include "recoveryJournalBlock.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "dataVIO.h"
#include "fixedLayout.h"
#include "packedRecoveryJournalBlock.h"
#include "recoveryJournalEntry.h"
#include "recoveryJournalInternals.h"
#include "ringNode.h"
#include "vio.h"
#include "waitQueue.h"
/**********************************************************************/
/**
 * Allocate a recovery journal block along with its full-size data buffer
 * and the VIO used to write that buffer.
 *
 * @param [in]  layer     The physical layer handed to createVIO()
 * @param [in]  journal   The journal the new block will refer back to
 * @param [out] blockPtr  Set to the new block on success; not written on
 *                        failure
 *
 * @return VDO_SUCCESS or the error code from ALLOCATE() or createVIO()
 **/
int makeRecoveryBlock(PhysicalLayer         *layer,
                      RecoveryJournal       *journal,
                      RecoveryJournalBlock **blockPtr)
{
  // A packed block must have room, after its header, for every entry a
  // journal block may hold.
  STATIC_ASSERT(RECOVERY_JOURNAL_ENTRIES_PER_BLOCK
                <= ((VDO_BLOCK_SIZE - sizeof(PackedJournalHeader))
                    / sizeof(PackedRecoveryJournalEntry)));

  RecoveryJournalBlock *newBlock;
  int status = ALLOCATE(1, RecoveryJournalBlock, __func__, &newBlock);
  if (status != VDO_SUCCESS) {
    return status;
  }

  // The buffer covers an entire VDO block, even though not all of the space
  // is used, since the VIO needs to write a full disk block.
  status = ALLOCATE(VDO_BLOCK_SIZE, char, "PackedJournalBlock",
                    &newBlock->block);
  if (status == VDO_SUCCESS) {
    status = createVIO(layer, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
                       newBlock, newBlock->block, &newBlock->vio);
  }

  if (status != VDO_SUCCESS) {
    // Release whatever had been allocated before the failure.
    freeRecoveryBlock(&newBlock);
    return status;
  }

  newBlock->journal = journal;
  initializeRing(&newBlock->ringNode);
  // Completions of the block's VIO are directed to the journal's thread ID.
  newBlock->vio->completion.callbackThreadID = journal->threadID;

  *blockPtr = newBlock;
  return VDO_SUCCESS;
}
/**********************************************************************/
/**
 * Free a recovery journal block, its data buffer, and its VIO, then null
 * out the caller's reference. Does nothing if the reference is already NULL.
 *
 * @param blockPtr  A pointer to the block reference to free
 **/
void freeRecoveryBlock(RecoveryJournalBlock **blockPtr)
{
  RecoveryJournalBlock *doomed = *blockPtr;
  if (doomed != NULL) {
    FREE(doomed->block);
    freeVIO(&doomed->vio);
    FREE(doomed);
    *blockPtr = NULL;
  }
}
/**
 * View the start of a block's data buffer as its packed journal header.
 *
 * @param block  The recovery block
 *
 * @return A pointer to the packed header at the front of the block's buffer
 **/
static inline
PackedJournalHeader *getBlockHeader(const RecoveryJournalBlock *block)
{
  PackedJournalHeader *header = (PackedJournalHeader *) block->block;
  return header;
}
/**
 * Make a sector the block's active sector and initialize it: stamp it with
 * the check byte from the block's packed header and with the journal's
 * recovery count, and reset its entry count to zero.
 *
 * @param block   The block to update
 * @param sector  A pointer to the first byte of the new sector
 **/
static void setActiveSector(RecoveryJournalBlock *block, void *sector)
{
  PackedJournalSector *active = (PackedJournalSector *) sector;

  block->sector         = active;
  active->checkByte     = getBlockHeader(block)->fields.checkByte;
  active->recoveryCount = block->journal->recoveryCount;
  active->entryCount    = 0;
}
/**********************************************************************/
/**
 * Reset a block using the journal's current tail: zero its buffer, clear
 * its entry counts, encode a fresh header at the front of the buffer, and
 * make sector 1 the active sector.
 *
 * @param block  The block to (re)initialize
 **/
void initializeRecoveryBlock(RecoveryJournalBlock *block)
{
  RecoveryJournal *journal = block->journal;

  // Start from an all-zero buffer.
  memset(block->block, 0x0, VDO_BLOCK_SIZE);

  // The block takes on the journal's current tail sequence number and the
  // block number derived from it.
  block->uncommittedEntryCount = 0;
  block->entryCount            = 0;
  block->sequenceNumber        = journal->tail;
  block->blockNumber = getRecoveryJournalBlockNumber(journal, journal->tail);

  // Build the header in unpacked form, then encode it into the buffer.
  PackedJournalHeader *packedHeader = getBlockHeader(block);
  RecoveryBlockHeader  freshHeader  = {
    .metadataType       = VDO_METADATA_RECOVERY_JOURNAL,
    .blockMapDataBlocks = journal->blockMapDataBlocks,
    .logicalBlocksUsed  = journal->logicalBlocksUsed,
    .nonce              = journal->nonce,
    .recoveryCount      = journal->recoveryCount,
    .sequenceNumber     = journal->tail,
    .checkByte          = computeRecoveryCheckByte(journal, journal->tail),
  };
  packRecoveryBlockHeader(&freshHeader, packedHeader);

  // New entries will be written starting in sector 1.
  setActiveSector(block, getJournalBlockSector(packedHeader, 1));
}
/**********************************************************************/
/**
 * Queue a DataVIO on the block's entry waiter queue and account for the
 * entry it will make, both in the block's counts and in the journal's
 * statistics.
 *
 * @param block    The block which will hold the entry
 * @param dataVIO  The DataVIO to queue
 *
 * @return VDO_SUCCESS or the error from enqueueDataVIO()
 **/
int enqueueRecoveryBlockEntry(RecoveryJournalBlock *block, DataVIO *dataVIO)
{
  // An empty queue means this DataVIO starts a new batch: either the block
  // was just opened, or a committing block is being extended and will have
  // to be written again.
  bool startsBatch = !hasWaiters(&block->entryWaiters);

  int status = enqueueDataVIO(&block->entryWaiters, dataVIO,
                              THIS_LOCATION("$F($j-$js)"));
  if (status != VDO_SUCCESS) {
    return status;
  }

  block->uncommittedEntryCount++;
  block->entryCount++;

  // Update stats to reflect the journal entry that is going to be written.
  RecoveryJournal *journal = block->journal;
  journal->events.entries.started++;
  if (startsBatch) {
    journal->events.blocks.started++;
  }

  return VDO_SUCCESS;
}
/**
* Check whether the current sector of a block is full.
*
* @param block The block to check
*
* @return <code>true</code> if the sector is full
**/
__attribute__((warn_unused_result))
static bool isSectorFull(const RecoveryJournalBlock *block)
{
return (block->sector->entryCount == RECOVERY_JOURNAL_ENTRIES_PER_SECTOR);
}
/**
 * Drain the block's entry waiter queue. For each DataVIO, encode a packed
 * journal entry into the active sector and move the DataVIO onto the
 * block's commit waiter queue.
 *
 * @param block  The journal block
 *
 * @return VDO_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int addQueuedRecoveryEntries(RecoveryJournalBlock *block)
{
  while (hasWaiters(&block->entryWaiters)) {
    DataVIO *entryVIO
      = waiterAsDataVIO(dequeueNextWaiter(&block->entryWaiters));

    if (entryVIO->operation.type == DATA_INCREMENT) {
      // A flush must precede the commit of partial write entries so that
      // committed sectors of the partial write are not lost.
      bool sawPartialWrite = (block->hasPartialWriteEntry
                              || entryVIO->isPartialWrite);
      block->hasPartialWriteEntry = sawPartialWrite;

      // To avoid losing acknowledged writes which had the FUA flag set, a
      // flush must cover the data write and all previous journal writes,
      // and the journal write itself must be issued with FUA.
      bool sawFUA = (block->hasFUAEntry
                     || vioRequiresFlushAfter(dataVIOAsVIO(entryVIO)));
      block->hasFUAEntry = sawFUA;
    }

    // Describe the operation in unpacked form.
    TreeLock             *treeLock      = &entryVIO->treeLock;
    RecoveryJournalEntry  unpackedEntry = {
      .mapping = {
        .pbn   = entryVIO->operation.pbn,
        .state = entryVIO->operation.state,
      },
      .operation = entryVIO->operation.type,
      .slot      = treeLock->treeSlots[treeLock->height].blockMapSlot,
    };

    // Encode it into the next free slot of the active sector.
    PackedJournalSector *active = block->sector;
    active->entries[active->entryCount]
      = packRecoveryJournalEntry(&unpackedEntry);
    active->entryCount++;

    // Increment operations record the sequence number of the block holding
    // their entry.
    if (isIncrementOperation(entryVIO->operation.type)) {
      entryVIO->recoverySequenceNumber = block->sequenceNumber;
    }

    // The DataVIO now waits for its entry to commit.
    int status = enqueueDataVIO(&block->commitWaiters, entryVIO,
                                THIS_LOCATION("$F($j-$js)"));
    if (status != VDO_SUCCESS) {
      continueDataVIO(entryVIO, status);
      return status;
    }

    // Once the active sector fills up, advance to the sector following it.
    if (isSectorFull(block)) {
      setActiveSector(block, (char *) block->sector + VDO_SECTOR_SIZE);
    }
  }

  return VDO_SUCCESS;
}
/**********************************************************************/
/**
 * Translate a block's journal block number into a physical block number
 * using the journal's partition, logging an error if translation fails.
 *
 * @param [in]  block   The block whose location is wanted
 * @param [out] pbnPtr  Passed to translateToPBN() to receive the result
 *
 * @return VDO_SUCCESS or the error from translateToPBN()
 **/
__attribute__((warn_unused_result))
static int getRecoveryBlockPBN(RecoveryJournalBlock *block,
                               PhysicalBlockNumber  *pbnPtr)
{
  int status = translateToPBN(block->journal->partition, block->blockNumber,
                              pbnPtr);
  if (status == VDO_SUCCESS) {
    return VDO_SUCCESS;
  }

  logErrorWithStringError(status,
                          "Error translating recovery journal block "
                          "number %llu", block->blockNumber);
  return status;
}
/**********************************************************************/
/**
 * Check whether a block may be committed now.
 *
 * @param block  The block to check (may be NULL)
 *
 * @return <code>true</code> if the block exists, is not already committing,
 *         has entry waiters, and the journal is not in read-only mode
 **/
bool canCommitRecoveryBlock(RecoveryJournalBlock *block)
{
  // There is no block to commit.
  if (block == NULL) {
    return false;
  }

  // The block is already being committed.
  if (block->committing) {
    return false;
  }

  // There are no entries to commit.
  if (!hasWaiters(&block->entryWaiters)) {
    return false;
  }

  // Nothing may be committed in read-only mode.
  return !isReadOnly(block->journal->readOnlyNotifier);
}
/**********************************************************************/
/**
 * Encode all queued entries into a block and launch the metadata write of
 * that block, with a flush and/or FUA as required.
 *
 * @param block         The block to commit; must satisfy
 *                      canCommitRecoveryBlock()
 * @param callback      Passed to the write as its completion callback
 * @param errorHandler  Passed to the write as its error handler
 *
 * @return VDO_SUCCESS once the write has been launched, or an error code if
 *         the assertion, the PBN translation, or adding the entries failed
 **/
int commitRecoveryBlock(RecoveryJournalBlock *block,
                        VDOAction            *callback,
                        VDOAction            *errorHandler)
{
  int status = ASSERT(canCommitRecoveryBlock(block),
                      "should never call %s"
                      " when the block can't be committed", __func__);
  if (status != VDO_SUCCESS) {
    return status;
  }

  PhysicalBlockNumber targetPBN;
  status = getRecoveryBlockPBN(block, &targetPBN);
  if (status != VDO_SUCCESS) {
    return status;
  }

  // The size of this commit must be captured before the queue is drained.
  block->entriesInCommit = countWaiters(&block->entryWaiters);
  status = addQueuedRecoveryEntries(block);
  if (status != VDO_SUCCESS) {
    return status;
  }

  // Update stats to reflect the block and entries about to be written.
  RecoveryJournal *journal = block->journal;
  journal->pendingWriteCount++;
  journal->events.blocks.written++;
  journal->events.entries.written += block->entriesInCommit;

  // Refresh the header fields which may have changed since initialization.
  PackedJournalHeader *header = getBlockHeader(block);
  storeUInt64LE(header->fields.blockMapHead, journal->blockMapHead);
  storeUInt64LE(header->fields.slabJournalHead, journal->slabJournalHead);
  storeUInt16LE(header->fields.entryCount, block->entryCount);

  block->committing = true;

  /*
   * A flush before the write makes sure that all the data mapped to by this
   * block, and the recovery journal up to this block, are stable on disk.
   * FUA on the write makes sure that the write itself is stable.
   *
   * A block with an increment entry for a FUA request gets both. Otherwise,
   * FUA is used only under the sync write policy, and the flush is issued
   * unless the policy is async-unsafe and the block has no partial write
   * entry.
   */
  PhysicalLayer *layer = vioAsCompletion(block->vio)->layer;
  bool fua   = true;
  bool flush = true;
  if (!block->hasFUAEntry) {
    fua   = (layer->getWritePolicy(layer) == WRITE_POLICY_SYNC);
    flush = ((layer->getWritePolicy(layer) != WRITE_POLICY_ASYNC_UNSAFE)
             || block->hasPartialWriteEntry);
  }

  // The flags apply only to the write being launched here.
  block->hasFUAEntry          = false;
  block->hasPartialWriteEntry = false;

  launchWriteMetadataVIOWithFlush(block->vio, targetPBN, callback,
                                  errorHandler, flush, fua);
  return VDO_SUCCESS;
}
/**********************************************************************/
/**
 * Log a one-line summary of a block: its sequence number, entry count,
 * whether it is committing, and the lengths of its two waiter queues.
 *
 * @param block  The block to dump
 **/
void dumpRecoveryBlock(const RecoveryJournalBlock *block)
{
  const char *state          = (block->committing ? "committing" : "waiting");
  size_t      entryWaiters   = countWaiters(&block->entryWaiters);
  size_t      commitWaiters  = countWaiters(&block->commitWaiters);

  logInfo(" sequence number %llu; entries %" PRIu16
          "; %s; %zu entry waiters; %zu commit waiters",
          block->sequenceNumber,
          block->entryCount,
          state,
          entryWaiters,
          commitWaiters);
}