/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/blockMapTree.c#21 $
*/
#include "blockMapTree.h"
#include "logger.h"
#include "blockMap.h"
#include "blockMapInternals.h"
#include "blockMapPage.h"
#include "blockMapTreeInternals.h"
#include "constants.h"
#include "dataVIO.h"
#include "dirtyLists.h"
#include "forest.h"
#include "numUtils.h"
#include "recoveryJournal.h"
#include "referenceOperation.h"
#include "slabDepot.h"
#include "slabJournal.h"
#include "types.h"
#include "vdoInternal.h"
#include "vdoPageCache.h"
#include "vioPool.h"
enum {
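  // The number of VIOs in the pool which each block map tree zone uses to
  // load and write tree pages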
BLOCK_MAP_VIO_POOL_SIZE = 64,
};
typedef struct __attribute__((packed)) {
RootCount rootIndex;
Height height;
PageNumber pageIndex;
SlotNumber slot;
} PageDescriptor;
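/**
 * A PageKey provides a uint64_t view of a PageDescriptor so that a page's
 * identity can be used as a key in the IntMap of loading pages (hence the
 * STATIC_ASSERT_SIZEOF in initializeTreeZone()).
 **/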
typedef union {
PageDescriptor descriptor;
uint64_t key;
} PageKey;
typedef struct {
BlockMapTreeZone *zone;
uint8_t generation;
} WriteIfNotDirtiedContext;
/**
* An invalid PBN used to indicate that the page holding the location of a
* tree root has been "loaded".
**/
const PhysicalBlockNumber INVALID_PBN = 0xFFFFFFFFFFFFFFFF;
/**
* Convert a RingNode to a TreePage.
*
* @param ringNode The RingNode to convert
*
* @return The TreePage which owns the RingNode
**/
static inline TreePage *treePageFromRingNode(RingNode *ringNode)
{
return (TreePage *) ((byte *) ringNode - offsetof(TreePage, node));
}
/**********************************************************************/
static void writeDirtyPagesCallback(RingNode *expired, void *context);
/**
* Make VIOs for reading, writing, and allocating the arboreal block map.
*
* Implements VIOConstructor.
**/
__attribute__((warn_unused_result))
static int makeBlockMapVIOs(PhysicalLayer *layer,
void *parent,
void *buffer,
VIO **vioPtr)
{
return createVIO(layer, VIO_TYPE_BLOCK_MAP_INTERIOR, VIO_PRIORITY_METADATA,
parent, buffer, vioPtr);
}
/**********************************************************************/
int initializeTreeZone(BlockMapZone *zone,
PhysicalLayer *layer,
BlockCount eraLength)
{
STATIC_ASSERT_SIZEOF(PageDescriptor, sizeof(uint64_t));
BlockMapTreeZone *treeZone = &zone->treeZone;
treeZone->mapZone = zone;
int result = makeDirtyLists(eraLength, writeDirtyPagesCallback, treeZone,
&treeZone->dirtyLists);
if (result != VDO_SUCCESS) {
return result;
}
result = makeIntMap(LOCK_MAP_CAPACITY, 0, &treeZone->loadingPages);
if (result != VDO_SUCCESS) {
return result;
}
return makeVIOPool(layer, BLOCK_MAP_VIO_POOL_SIZE, zone->threadID,
makeBlockMapVIOs, treeZone, &treeZone->vioPool);
}
/**********************************************************************/
int replaceTreeZoneVIOPool(BlockMapTreeZone *zone,
PhysicalLayer *layer,
size_t poolSize)
{
freeVIOPool(&zone->vioPool);
return makeVIOPool(layer, poolSize, zone->mapZone->threadID,
makeBlockMapVIOs, zone, &zone->vioPool);
}
/**********************************************************************/
void uninitializeBlockMapTreeZone(BlockMapTreeZone *treeZone)
{
freeDirtyLists(&treeZone->dirtyLists);
freeVIOPool(&treeZone->vioPool);
freeIntMap(&treeZone->loadingPages);
}
/**********************************************************************/
void setTreeZoneInitialPeriod(BlockMapTreeZone *treeZone,
SequenceNumber period)
{
setCurrentPeriod(treeZone->dirtyLists, period);
}
/**
* Get the BlockMapTreeZone in which a DataVIO is operating.
*
* @param dataVIO The DataVIO
*
* @return The BlockMapTreeZone
**/
__attribute__((warn_unused_result))
static inline BlockMapTreeZone *getBlockMapTreeZone(DataVIO *dataVIO)
{
return &(getBlockMapForZone(dataVIO->logical.zone)->treeZone);
}
/**
* Get the TreePage for a given lock. This will be the page referred to by the
* lock's tree slot for the lock's current height.
*
* @param zone The tree zone of the tree
* @param lock The lock describing the page to get
*
* @return The requested page
**/
static inline TreePage *getTreePage(const BlockMapTreeZone *zone,
const TreeLock *lock)
{
return getTreePageByIndex(zone->mapZone->blockMap->forest,
lock->rootIndex,
lock->height,
lock->treeSlots[lock->height].pageIndex);
}
/**********************************************************************/
bool copyValidPage(char *buffer,
Nonce nonce,
PhysicalBlockNumber pbn,
BlockMapPage *page)
{
BlockMapPage *loaded = (BlockMapPage *) buffer;
BlockMapPageValidity validity = validateBlockMapPage(loaded, nonce, pbn);
if (validity == BLOCK_MAP_PAGE_VALID) {
memcpy(page, loaded, VDO_BLOCK_SIZE);
return true;
}
if (validity == BLOCK_MAP_PAGE_BAD) {
    logErrorWithStringError(VDO_BAD_PAGE,
                            "Expected page %llu but got page %llu instead",
pbn, getBlockMapPagePBN(loaded));
}
return false;
}
/**********************************************************************/
bool isTreeZoneActive(BlockMapTreeZone *zone)
{
return ((zone->activeLookups != 0)
|| hasWaiters(&zone->flushWaiters)
|| isVIOPoolBusy(zone->vioPool));
}
/**
* Put the VDO in read-only mode and wake any VIOs waiting for a flush.
*
* @param zone The zone
* @param result The error which is causing read-only mode
**/
static void enterZoneReadOnlyMode(BlockMapTreeZone *zone, int result)
{
enterReadOnlyMode(zone->mapZone->readOnlyNotifier, result);
// We are in read-only mode, so we won't ever write any page out. Just take
// all waiters off the queue so the tree zone can be closed.
while (hasWaiters(&zone->flushWaiters)) {
dequeueNextWaiter(&zone->flushWaiters);
}
checkForDrainComplete(zone->mapZone);
}
/**
 * Check whether a generation is not strictly older than some other generation
 * in the context of a zone's current generation range.
*
* @param zone The zone in which to do the comparison
* @param a The generation in question
* @param b The generation to compare to
*
* @return <code>true</code> if generation a is not strictly older than
* generation b in the context of the zone
**/
__attribute__((warn_unused_result))
static bool isNotOlder(BlockMapTreeZone *zone, uint8_t a, uint8_t b)
{
int result = ASSERT((inCyclicRange(zone->oldestGeneration, a,
zone->generation, 1 << 8)
&& inCyclicRange(zone->oldestGeneration, b,
zone->generation, 1 << 8)),
"generation(s) %u, %u are out of range [%u, %u]",
a, b, zone->oldestGeneration, zone->generation);
if (result != VDO_SUCCESS) {
enterZoneReadOnlyMode(zone, result);
return true;
}
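  // Generations are 8-bit values which wrap, so they must be compared
  // cyclically rather than with a direct integer comparison.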
return inCyclicRange(b, a, zone->generation, 1 << 8);
}
/**
* Decrement the count for a generation and roll the oldest generation if there
* are no longer any active pages in it.
*
* @param zone The zone
* @param generation The generation to release
**/
static void releaseGeneration(BlockMapTreeZone *zone, uint8_t generation)
{
int result = ASSERT((zone->dirtyPageCounts[generation] > 0),
"dirty page count underflow for generation %u",
generation);
if (result != VDO_SUCCESS) {
enterZoneReadOnlyMode(zone, result);
return;
}
zone->dirtyPageCounts[generation]--;
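  // Advance the oldest generation past any generations which no longer have
  // any dirty pages.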
while ((zone->dirtyPageCounts[zone->oldestGeneration] == 0)
&& (zone->oldestGeneration != zone->generation)) {
zone->oldestGeneration++;
}
}
/**
* Set the generation of a page and update the dirty page count in the zone.
*
* @param zone The zone which owns the page
* @param page The page
* @param newGeneration The generation to set
* @param decrementOld Whether to decrement the count of the page's old
* generation
**/
static void setGeneration(BlockMapTreeZone *zone,
TreePage *page,
uint8_t newGeneration,
bool decrementOld)
{
uint8_t oldGeneration = page->generation;
if (decrementOld && (oldGeneration == newGeneration)) {
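    // The page is already counted in this generation, so there is nothing
    // to do.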
return;
}
page->generation = newGeneration;
uint32_t newCount = ++zone->dirtyPageCounts[newGeneration];
int result = ASSERT((newCount != 0),
"dirty page count overflow for generation %u",
newGeneration);
if (result != VDO_SUCCESS) {
enterZoneReadOnlyMode(zone, result);
return;
}
if (decrementOld) {
releaseGeneration(zone, oldGeneration);
}
}
/**********************************************************************/
static void writePage(TreePage *treePage, VIOPoolEntry *entry);
/**
* Write out a dirty page if it is still covered by the most recent flush
* or if it is the flusher.
*
* <p>Implements WaiterCallback
*
* @param waiter The page to write
* @param context The VIOPoolEntry with which to do the write
**/
static void writePageCallback(Waiter *waiter, void *context)
{
STATIC_ASSERT(offsetof(TreePage, waiter) == 0);
writePage((TreePage *) waiter, (VIOPoolEntry *) context);
}
/**
* Acquire a VIO for writing a dirty page.
*
* @param waiter The page which needs a VIO
* @param zone The zone
**/
static void acquireVIO(Waiter *waiter, BlockMapTreeZone *zone)
{
waiter->callback = writePageCallback;
int result = acquireVIOFromPool(zone->vioPool, waiter);
if (result != VDO_SUCCESS) {
enterZoneReadOnlyMode(zone, result);
}
}
/**
* Attempt to increment the generation.
*
* @param zone The zone whose generation is to be incremented
*
 * @return <code>true</code> if the generation was incremented, or
 *         <code>false</code> if all possible generations are already active
**/
static bool attemptIncrement(BlockMapTreeZone *zone)
{
uint8_t generation = zone->generation + 1;
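  // The generation counter wraps at 256, so if the incremented generation
  // would collide with the oldest generation still in use, every generation
  // is already active.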
if (zone->oldestGeneration == generation) {
return false;
}
zone->generation = generation;
return true;
}
/**
* Enqueue a page to either launch a flush or wait for the current flush which
* is already in progress.
*
* @param page The page to enqueue
* @param zone The zone
**/
static void enqueuePage(TreePage *page, BlockMapTreeZone *zone)
{
if ((zone->flusher == NULL) && attemptIncrement(zone)) {
zone->flusher = page;
acquireVIO(&page->waiter, zone);
return;
}
int result = enqueueWaiter(&zone->flushWaiters, &page->waiter);
if (result != VDO_SUCCESS) {
enterZoneReadOnlyMode(zone, result);
}
}
/**
* Write pages which were waiting for a flush and have not been redirtied.
* Requeue those pages which were redirtied.
*
* <p>Implements WaiterCallback.
*
* @param waiter The dirty page
* @param context The zone and generation
**/
static void writePageIfNotDirtied(Waiter *waiter, void *context)
{
STATIC_ASSERT(offsetof(TreePage, waiter) == 0);
TreePage *page = (TreePage *) waiter;
WriteIfNotDirtiedContext *writeContext = context;
if (page->generation == writeContext->generation) {
acquireVIO(waiter, writeContext->zone);
return;
}
enqueuePage(page, writeContext->zone);
}
/**
* Return a VIO to the zone's pool.
*
* @param zone The zone which owns the pool
* @param entry The pool entry to return
**/
static void returnToPool(BlockMapTreeZone *zone, VIOPoolEntry *entry)
{
returnVIOToPool(zone->vioPool, entry);
checkForDrainComplete(zone->mapZone);
}
/**
* Handle the successful write of a tree page. This callback is registered in
* writeInitializedPage().
*
* @param completion The VIO doing the write
**/
static void finishPageWrite(VDOCompletion *completion)
{
VIOPoolEntry *entry = completion->parent;
TreePage *page = entry->parent;
BlockMapTreeZone *zone = entry->context;
releaseRecoveryJournalBlockReference(zone->mapZone->blockMap->journal,
page->writingRecoveryLock,
ZONE_TYPE_LOGICAL,
zone->mapZone->zoneNumber);
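  // The page is still dirty if it was modified again while this write was
  // in progress.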
bool dirty = (page->writingGeneration != page->generation);
releaseGeneration(zone, page->writingGeneration);
page->writing = false;
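  // If this page was the flusher, any waiting page which was not redirtied
  // after the flush was issued can be written now; redirtied pages must wait
  // for the next flush.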
if (zone->flusher == page) {
WriteIfNotDirtiedContext context = {
.zone = zone,
.generation = page->writingGeneration,
};
notifyAllWaiters(&zone->flushWaiters, writePageIfNotDirtied, &context);
if (dirty && attemptIncrement(zone)) {
writePage(page, entry);
return;
}
zone->flusher = NULL;
}
if (dirty) {
enqueuePage(page, zone);
} else if ((zone->flusher == NULL)
&& hasWaiters(&zone->flushWaiters)
&& attemptIncrement(zone)) {
zone->flusher = (TreePage *) dequeueNextWaiter(&zone->flushWaiters);
writePage(zone->flusher, entry);
return;
}
returnToPool(zone, entry);
}
/**
* Handle an error writing a tree page. This error handler is registered in
* writePage() and writeInitializedPage().
*
* @param completion The VIO doing the write
**/
static void handleWriteError(VDOCompletion *completion)
{
int result = completion->result;
VIOPoolEntry *entry = completion->parent;
BlockMapTreeZone *zone = entry->context;
enterZoneReadOnlyMode(zone, result);
returnToPool(zone, entry);
}
/**
* Write a page which has been written at least once. This callback is
* registered in (or called directly from) writePage().
*
* @param completion The VIO which will do the write
**/
static void writeInitializedPage(VDOCompletion *completion)
{
VIOPoolEntry *entry = completion->parent;
BlockMapTreeZone *zone = (BlockMapTreeZone *) entry->context;
TreePage *treePage = (TreePage *) entry->parent;
/*
* Set the initialized field of the copy of the page we are writing to true.
* We don't want to set it true on the real page in memory until after this
* write succeeds.
*/
BlockMapPage *page = (BlockMapPage *) entry->buffer;
markBlockMapPageInitialized(page, true);
launchWriteMetadataVIOWithFlush(entry->vio, getBlockMapPagePBN(page),
finishPageWrite, handleWriteError,
(zone->flusher == treePage), false);
}
/**
* Write a dirty tree page now that we have a VIO with which to write it.
*
* @param treePage The page to write
* @param entry The VIOPoolEntry with which to write
**/
static void writePage(TreePage *treePage, VIOPoolEntry *entry)
{
BlockMapTreeZone *zone = (BlockMapTreeZone *) entry->context;
if ((zone->flusher != treePage)
&& (isNotOlder(zone, treePage->generation, zone->generation))) {
// This page was re-dirtied after the last flush was issued, hence we need
// to do another flush.
enqueuePage(treePage, zone);
returnToPool(zone, entry);
return;
}
entry->parent = treePage;
memcpy(entry->buffer, treePage->pageBuffer, VDO_BLOCK_SIZE);
VDOCompletion *completion = vioAsCompletion(entry->vio);
completion->callbackThreadID = zone->mapZone->threadID;
treePage->writing = true;
treePage->writingGeneration = treePage->generation;
treePage->writingRecoveryLock = treePage->recoveryLock;
// Clear this now so that we know this page is not on any dirty list.
treePage->recoveryLock = 0;
BlockMapPage *page = asBlockMapPage(treePage);
if (!markBlockMapPageInitialized(page, true)) {
writeInitializedPage(completion);
return;
}
launchWriteMetadataVIO(entry->vio, getBlockMapPagePBN(page),
writeInitializedPage, handleWriteError);
}
/**
* Schedule a batch of dirty pages for writing.
*
* <p>Implements DirtyListsCallback.
*
* @param expired The pages to write
* @param context The zone
**/
static void writeDirtyPagesCallback(RingNode *expired, void *context)
{
BlockMapTreeZone *zone = (BlockMapTreeZone *) context;
uint8_t generation = zone->generation;
while (!isRingEmpty(expired)) {
TreePage *page = treePageFromRingNode(chopRingNode(expired));
int result = ASSERT(!isWaiting(&page->waiter),
"Newly expired page not already waiting to write");
if (result != VDO_SUCCESS) {
enterZoneReadOnlyMode(zone, result);
continue;
}
setGeneration(zone, page, generation, false);
if (!page->writing) {
enqueuePage(page, zone);
}
}
}
/**********************************************************************/
void advanceZoneTreePeriod(BlockMapTreeZone *zone, SequenceNumber period)
{
advancePeriod(zone->dirtyLists, period);
}
/**********************************************************************/
void drainZoneTrees(BlockMapTreeZone *zone)
{
ASSERT_LOG_ONLY((zone->activeLookups == 0),
"drainZoneTrees() called with no active lookups");
if (!isSuspending(&zone->mapZone->state)) {
flushDirtyLists(zone->dirtyLists);
}
}
/**
* Release a lock on a page which was being loaded or allocated.
*
* @param dataVIO The DataVIO releasing the page lock
* @param what What the DataVIO was doing (for logging)
**/
static void releasePageLock(DataVIO *dataVIO, char *what)
{
TreeLock *lock = &dataVIO->treeLock;
  ASSERT_LOG_ONLY(lock->locked,
                  "release of unlocked block map page %s for key %llu"
                  " in tree %u",
what, lock->key, lock->rootIndex);
BlockMapTreeZone *zone = getBlockMapTreeZone(dataVIO);
TreeLock *lockHolder = intMapRemove(zone->loadingPages, lock->key);
ASSERT_LOG_ONLY((lockHolder == lock),
"block map page %s mismatch for key %llu in tree %u",
what, lock->key, lock->rootIndex);
lock->locked = false;
}
/**
* Continue a DataVIO now that the lookup is complete.
*
* @param dataVIO The DataVIO
* @param result The result of the lookup
**/
static void finishLookup(DataVIO *dataVIO, int result)
{
dataVIO->treeLock.height = 0;
BlockMapTreeZone *zone = getBlockMapTreeZone(dataVIO);
--zone->activeLookups;
VDOCompletion *completion = dataVIOAsCompletion(dataVIO);
setCompletionResult(completion, result);
launchCallback(completion, dataVIO->treeLock.callback,
dataVIO->treeLock.threadID);
}
/**
* Abort a block map PBN lookup due to an error in the load or allocation on
* which we were waiting.
*
* @param waiter The DataVIO which was waiting for a page load or allocation
* @param context The error which caused the abort
**/
static void abortLookupForWaiter(Waiter *waiter, void *context)
{
DataVIO *dataVIO = waiterAsDataVIO(waiter);
int result = *((int *) context);
if (isReadDataVIO(dataVIO)) {
if (result == VDO_NO_SPACE) {
result = VDO_SUCCESS;
}
} else if (result != VDO_NO_SPACE) {
result = VDO_READ_ONLY;
}
finishLookup(dataVIO, result);
}
/**
* Abort a block map PBN lookup due to an error loading or allocating a page.
*
* @param dataVIO The DataVIO which was loading or allocating a page
* @param result The error code
* @param what What the DataVIO was doing (for logging)
**/
static void abortLookup(DataVIO *dataVIO, int result, char *what)
{
if (result != VDO_NO_SPACE) {
enterZoneReadOnlyMode(getBlockMapTreeZone(dataVIO), result);
}
if (dataVIO->treeLock.locked) {
releasePageLock(dataVIO, what);
notifyAllWaiters(&dataVIO->treeLock.waiters, abortLookupForWaiter,
&result);
}
finishLookup(dataVIO, result);
}
/**
* Abort a block map PBN lookup due to an error loading a page.
*
* @param dataVIO The DataVIO doing the page load
* @param result The error code
**/
static void abortLoad(DataVIO *dataVIO, int result)
{
abortLookup(dataVIO, result, "load");
}
/**
* Determine if a location represents a valid mapping for a tree page.
*
* @param vdo The VDO
* @param mapping The DataLocation to check
* @param height The height of the entry in the tree
*
 * @return <code>true</code> if the entry represents an invalid page mapping
**/
__attribute__((warn_unused_result))
static bool isInvalidTreeEntry(const VDO *vdo,
const DataLocation *mapping,
Height height)
{
if (!isValidLocation(mapping)
|| isCompressed(mapping->state)
|| (isMappedLocation(mapping) && (mapping->pbn == ZERO_BLOCK))) {
return true;
}
// Roots aren't physical data blocks, so we can't check their PBNs.
if (height == BLOCK_MAP_TREE_HEIGHT) {
return false;
}
return !isPhysicalDataBlock(vdo->depot, mapping->pbn);
}
/**********************************************************************/
static void loadBlockMapPage(BlockMapTreeZone *zone, DataVIO *dataVIO);
static void allocateBlockMapPage(BlockMapTreeZone *zone, DataVIO *dataVIO);
/**
* Continue a block map PBN lookup now that a page has been loaded by
* descending one level in the tree.
*
* @param dataVIO The DataVIO doing the lookup
* @param page The page which was just loaded
**/
static void continueWithLoadedPage(DataVIO *dataVIO, BlockMapPage *page)
{
TreeLock *lock = &dataVIO->treeLock;
BlockMapTreeSlot slot = lock->treeSlots[lock->height];
DataLocation mapping
= unpackBlockMapEntry(&page->entries[slot.blockMapSlot.slot]);
if (isInvalidTreeEntry(getVDOFromDataVIO(dataVIO), &mapping, lock->height)) {
logErrorWithStringError(VDO_BAD_MAPPING,
"Invalid block map tree PBN: %llu with "
"state %u for page index %u at height %u",
mapping.pbn, mapping.state,
lock->treeSlots[lock->height - 1].pageIndex,
lock->height - 1);
abortLoad(dataVIO, VDO_BAD_MAPPING);
return;
}
if (!isMappedLocation(&mapping)) {
// The page we need is unallocated
allocateBlockMapPage(getBlockMapTreeZone(dataVIO), dataVIO);
return;
}
lock->treeSlots[lock->height - 1].blockMapSlot.pbn = mapping.pbn;
if (lock->height == 1) {
finishLookup(dataVIO, VDO_SUCCESS);
return;
}
// We know what page we need to load next
loadBlockMapPage(getBlockMapTreeZone(dataVIO), dataVIO);
}
/**
* Continue a block map PBN lookup now that the page load we were waiting on
* has finished.
*
* @param waiter The DataVIO waiting for a page to be loaded
* @param context The page which was just loaded
**/
static void continueLoadForWaiter(Waiter *waiter, void *context)
{
DataVIO *dataVIO = waiterAsDataVIO(waiter);
dataVIO->treeLock.height--;
continueWithLoadedPage(dataVIO, (BlockMapPage *) context);
}
/**
* Finish loading a page now that it has been read in from disk. This callback
* is registered in loadPage().
*
* @param completion The VIO doing the page read
**/
static void finishBlockMapPageLoad(VDOCompletion *completion)
{
VIOPoolEntry *entry = completion->parent;
DataVIO *dataVIO = entry->parent;
BlockMapTreeZone *zone = (BlockMapTreeZone *) entry->context;
TreeLock *treeLock = &dataVIO->treeLock;
treeLock->height--;
PhysicalBlockNumber pbn
= treeLock->treeSlots[treeLock->height].blockMapSlot.pbn;
TreePage *treePage = getTreePage(zone, treeLock);
BlockMapPage *page = (BlockMapPage *) treePage->pageBuffer;
Nonce nonce = zone->mapZone->blockMap->nonce;
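  // If the page read from disk is valid, install it; otherwise, treat the
  // page as freshly formatted and empty.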
if (!copyValidPage(entry->buffer, nonce, pbn, page)) {
formatBlockMapPage(page, nonce, pbn, false);
}
returnVIOToPool(zone->vioPool, entry);
// Release our claim to the load and wake any waiters
releasePageLock(dataVIO, "load");
notifyAllWaiters(&treeLock->waiters, continueLoadForWaiter, page);
continueWithLoadedPage(dataVIO, page);
}
/**
* Handle an error loading a tree page.
*
* @param completion The VIO doing the page read
**/
static void handleIOError(VDOCompletion *completion)
{
int result = completion->result;
VIOPoolEntry *entry = completion->parent;
DataVIO *dataVIO = entry->parent;
BlockMapTreeZone *zone = (BlockMapTreeZone *) entry->context;
returnVIOToPool(zone->vioPool, entry);
abortLoad(dataVIO, result);
}
/**
* Read a tree page from disk now that we've gotten a VIO with which to do the
* read. This WaiterCallback is registered in loadBlockMapPage().
*
* @param waiter The DataVIO which requires a page load
* @param context The VIOPool entry with which to do the read
**/
static void loadPage(Waiter *waiter, void *context)
{
VIOPoolEntry *entry = context;
DataVIO *dataVIO = waiterAsDataVIO(waiter);
entry->parent = dataVIO;
entry->vio->completion.callbackThreadID
= getBlockMapForZone(dataVIO->logical.zone)->threadID;
TreeLock *lock = &dataVIO->treeLock;
launchReadMetadataVIO(entry->vio,
lock->treeSlots[lock->height - 1].blockMapSlot.pbn,
finishBlockMapPageLoad, handleIOError);
}
/**
* Attempt to acquire a lock on a page in the block map tree. If the page is
* already locked, queue up to wait for the lock to be released. If the lock is
* acquired, the DataVIO's treeLock.locked field will be set to true.
*
* @param zone The BlockMapTreeZone in which the DataVIO operates
* @param dataVIO The DataVIO which desires a page lock
*
* @return VDO_SUCCESS or an error
**/
static int attemptPageLock(BlockMapTreeZone *zone, DataVIO *dataVIO)
{
TreeLock *lock = &dataVIO->treeLock;
Height height = lock->height;
BlockMapTreeSlot treeSlot = lock->treeSlots[height];
PageKey key;
key.descriptor = (PageDescriptor) {
.rootIndex = lock->rootIndex,
.height = height,
.pageIndex = treeSlot.pageIndex,
.slot = treeSlot.blockMapSlot.slot,
};
lock->key = key.key;
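  // Attempt to install this lock in the map of loading pages; if another
  // lock for the same page is already present, intMapPut() returns that
  // holder instead of replacing it.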
TreeLock *lockHolder;
int result = intMapPut(zone->loadingPages, lock->key, lock, false,
(void **) &lockHolder);
if (result != VDO_SUCCESS) {
return result;
}
if (lockHolder == NULL) {
// We got the lock
dataVIO->treeLock.locked = true;
return VDO_SUCCESS;
}
// Someone else is loading or allocating the page we need
return enqueueDataVIO(&lockHolder->waiters, dataVIO,
THIS_LOCATION("$F;cb=blockMapTreePage"));
}
/**
* Load a block map tree page from disk.
*
* @param zone The BlockMapTreeZone in which the DataVIO operates
* @param dataVIO The DataVIO which requires a page to be loaded
**/
static void loadBlockMapPage(BlockMapTreeZone *zone, DataVIO *dataVIO)
{
int result = attemptPageLock(zone, dataVIO);
if (result != VDO_SUCCESS) {
abortLoad(dataVIO, result);
return;
}
if (dataVIO->treeLock.locked) {
Waiter *waiter = dataVIOAsWaiter(dataVIO);
waiter->callback = loadPage;
result = acquireVIOFromPool(zone->vioPool, waiter);
if (result != VDO_SUCCESS) {
abortLoad(dataVIO, result);
}
}
}
/**
* Set the callback of a DataVIO after it has allocated a block map page.
*
* @param dataVIO The DataVIO
**/
static void setPostAllocationCallback(DataVIO *dataVIO)
{
setCallback(dataVIOAsCompletion(dataVIO), dataVIO->treeLock.callback,
dataVIO->treeLock.threadID);
}
/**
* Abort a block map PBN lookup due to an error allocating a page.
*
* @param dataVIO The DataVIO doing the page allocation
* @param result The error code
**/
static void abortAllocation(DataVIO *dataVIO, int result)
{
setPostAllocationCallback(dataVIO);
abortLookup(dataVIO, result, "allocation");
}
/**
* Callback to handle an error while attempting to allocate a page. This
* callback is used to transfer back to the logical zone along the block map
* page allocation path.
*
* @param completion The DataVIO doing the allocation
**/
static void allocationFailure(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
assertInLogicalZone(dataVIO);
abortAllocation(dataVIO, completion->result);
}
/**
* Continue with page allocations now that a parent page has been allocated.
*
* @param waiter The DataVIO which was waiting for a page to be allocated
* @param context The physical block number of the page which was just
* allocated
**/
static void continueAllocationForWaiter(Waiter *waiter, void *context)
{
DataVIO *dataVIO = waiterAsDataVIO(waiter);
TreeLock *treeLock = &dataVIO->treeLock;
PhysicalBlockNumber pbn = *((PhysicalBlockNumber *) context);
treeLock->height--;
  treeLock->treeSlots[treeLock->height].blockMapSlot.pbn = pbn;
if (treeLock->height == 0) {
finishLookup(dataVIO, VDO_SUCCESS);
return;
}
allocateBlockMapPage(getBlockMapTreeZone(dataVIO), dataVIO);
}
/**
* Finish the page allocation process by recording the allocation in the tree
* and waking any waiters now that the write lock has been released. This
* callback is registered in releaseBlockMapWriteLock().
*
* @param completion The DataVIO doing the allocation
**/
static void finishBlockMapAllocation(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
assertInLogicalZone(dataVIO);
if (completion->result != VDO_SUCCESS) {
allocationFailure(completion);
return;
}
BlockMapTreeZone *zone = getBlockMapTreeZone(dataVIO);
TreeLock *treeLock = &dataVIO->treeLock;
TreePage *treePage = getTreePage(zone, treeLock);
Height height = treeLock->height;
PhysicalBlockNumber pbn = treeLock->treeSlots[height - 1].blockMapSlot.pbn;
// Record the allocation.
BlockMapPage *page = (BlockMapPage *) treePage->pageBuffer;
SequenceNumber oldLock = treePage->recoveryLock;
updateBlockMapPage(page, dataVIO, pbn, MAPPING_STATE_UNCOMPRESSED,
&treePage->recoveryLock);
if (isWaiting(&treePage->waiter)) {
// This page is waiting to be written out.
if (zone->flusher != treePage) {
// The outstanding flush won't cover the update we just made, so mark
// the page as needing another flush.
setGeneration(zone, treePage, zone->generation, true);
}
} else {
// Put the page on a dirty list
if (oldLock == 0) {
initializeRing(&treePage->node);
}
addToDirtyLists(zone->dirtyLists, &treePage->node, oldLock,
treePage->recoveryLock);
}
treeLock->height--;
if (height > 1) {
// Format the interior node we just allocated (in memory).
treePage = getTreePage(zone, treeLock);
formatBlockMapPage(treePage->pageBuffer, zone->mapZone->blockMap->nonce,
pbn, false);
}
// Release our claim to the allocation and wake any waiters
releasePageLock(dataVIO, "allocation");
notifyAllWaiters(&treeLock->waiters, continueAllocationForWaiter, &pbn);
if (treeLock->height == 0) {
finishLookup(dataVIO, VDO_SUCCESS);
return;
}
allocateBlockMapPage(zone, dataVIO);
}
/**
* Release the write lock on a newly allocated block map page now that we
* have made its journal entries and reference count updates. This callback
* is registered in setBlockMapPageReferenceCount().
*
* @param completion The DataVIO doing the allocation
**/
static void releaseBlockMapWriteLock(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
AllocatingVIO *allocatingVIO = dataVIOAsAllocatingVIO(dataVIO);
assertInAllocatedZone(dataVIO);
if (completion->result != VDO_SUCCESS) {
launchLogicalCallback(dataVIO, allocationFailure, THIS_LOCATION(NULL));
return;
}
releaseAllocationLock(allocatingVIO);
resetAllocation(allocatingVIO);
launchLogicalCallback(dataVIO, finishBlockMapAllocation,
THIS_LOCATION("$F;cb=finishBlockMapAllocation"));
}
/**
* Set the reference count of a newly allocated block map page to
* MAXIMUM_REFERENCES now that we have made a recovery journal entry for it.
* MAXIMUM_REFERENCES is used to prevent deduplication against the block after
* we release the write lock on it, but before we write out the page.
*
* @param completion The DataVIO doing the allocation
**/
static void setBlockMapPageReferenceCount(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
assertInAllocatedZone(dataVIO);
if (completion->result != VDO_SUCCESS) {
launchLogicalCallback(dataVIO, allocationFailure, THIS_LOCATION(NULL));
return;
}
TreeLock *lock = &dataVIO->treeLock;
PhysicalBlockNumber pbn = lock->treeSlots[lock->height - 1].blockMapSlot.pbn;
completion->callback = releaseBlockMapWriteLock;
addSlabJournalEntry(getSlabJournal(getVDOFromDataVIO(dataVIO)->depot, pbn),
dataVIO);
}
/**
* Make a recovery journal entry for a newly allocated block map page.
* This callback is registered in continueBlockMapPageAllocation().
*
* @param completion The DataVIO doing the allocation
**/
static void journalBlockMapAllocation(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
assertInJournalZone(dataVIO);
if (completion->result != VDO_SUCCESS) {
launchLogicalCallback(dataVIO, allocationFailure, THIS_LOCATION(NULL));
return;
}
setAllocatedZoneCallback(dataVIO, setBlockMapPageReferenceCount,
THIS_LOCATION(NULL));
addRecoveryJournalEntry(getVDOFromDataVIO(dataVIO)->recoveryJournal,
dataVIO);
}
/**
* Continue the process of allocating a block map page now that the
* BlockAllocator has given us a block. This method is supplied as the callback
* to allocateDataBlock() by allocateBlockMapPage().
*
* @param allocatingVIO The DataVIO which is doing the allocation
**/
static void continueBlockMapPageAllocation(AllocatingVIO *allocatingVIO)
{
DataVIO *dataVIO = allocatingVIOAsDataVIO(allocatingVIO);
if (!hasAllocation(dataVIO)) {
setLogicalCallback(dataVIO, allocationFailure, THIS_LOCATION(NULL));
continueDataVIO(dataVIO, VDO_NO_SPACE);
return;
}
PhysicalBlockNumber pbn = allocatingVIO->allocation;
TreeLock *lock = &dataVIO->treeLock;
lock->treeSlots[lock->height - 1].blockMapSlot.pbn = pbn;
setUpReferenceOperationWithLock(BLOCK_MAP_INCREMENT, pbn,
MAPPING_STATE_UNCOMPRESSED,
allocatingVIO->allocationLock,
&dataVIO->operation);
launchJournalCallback(dataVIO, journalBlockMapAllocation,
THIS_LOCATION("$F;cb=journalBlockMapAllocation"));
}
/**
* Allocate a block map page.
*
* @param zone The zone in which the DataVIO is operating
* @param dataVIO The DataVIO which needs to allocate a page
**/
static void allocateBlockMapPage(BlockMapTreeZone *zone, DataVIO *dataVIO)
{
if (!isWriteDataVIO(dataVIO) || isTrimDataVIO(dataVIO)) {
// This is a pure read, the read phase of a read-modify-write, or a trim,
// so there's nothing left to do here.
finishLookup(dataVIO, VDO_SUCCESS);
return;
}
int result = attemptPageLock(zone, dataVIO);
if (result != VDO_SUCCESS) {
abortAllocation(dataVIO, result);
return;
}
if (!dataVIO->treeLock.locked) {
return;
}
allocateDataBlock(dataVIOAsAllocatingVIO(dataVIO),
getAllocationSelector(dataVIO->logical.zone),
VIO_BLOCK_MAP_WRITE_LOCK,
continueBlockMapPageAllocation);
}
/**********************************************************************/
void lookupBlockMapPBN(DataVIO *dataVIO)
{
BlockMapTreeZone *zone = getBlockMapTreeZone(dataVIO);
zone->activeLookups++;
if (isDraining(&zone->mapZone->state)) {
finishLookup(dataVIO, VDO_SHUTTING_DOWN);
return;
}
TreeLock *lock = &dataVIO->treeLock;
PageNumber pageIndex
= ((lock->treeSlots[0].pageIndex - zone->mapZone->blockMap->flatPageCount)
/ zone->mapZone->blockMap->rootCount);
BlockMapTreeSlot treeSlot = {
.pageIndex = pageIndex / BLOCK_MAP_ENTRIES_PER_PAGE,
.blockMapSlot = {
.pbn = 0,
.slot = pageIndex % BLOCK_MAP_ENTRIES_PER_PAGE,
},
};
BlockMapPage *page = NULL;
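  // Ascend the tree from the leaf level until reaching a page which is
  // known (in memory) to be allocated, recording the slot to use at each
  // level on the way up.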
for (lock->height = 1; lock->height <= BLOCK_MAP_TREE_HEIGHT;
lock->height++) {
lock->treeSlots[lock->height] = treeSlot;
page = (BlockMapPage *) (getTreePage(zone, lock)->pageBuffer);
PhysicalBlockNumber pbn = getBlockMapPagePBN(page);
if (pbn != ZERO_BLOCK) {
lock->treeSlots[lock->height].blockMapSlot.pbn = pbn;
break;
}
// Calculate the index and slot for the next level.
treeSlot.blockMapSlot.slot
= treeSlot.pageIndex % BLOCK_MAP_ENTRIES_PER_PAGE;
treeSlot.pageIndex
= treeSlot.pageIndex / BLOCK_MAP_ENTRIES_PER_PAGE;
}
// The page at this height has been allocated and loaded.
DataLocation mapping
= unpackBlockMapEntry(&page->entries[treeSlot.blockMapSlot.slot]);
if (isInvalidTreeEntry(getVDOFromDataVIO(dataVIO), &mapping, lock->height)) {
logErrorWithStringError(VDO_BAD_MAPPING,
"Invalid block map tree PBN: %llu with "
"state %u for page index %u at height %u",
mapping.pbn, mapping.state,
lock->treeSlots[lock->height - 1].pageIndex,
lock->height - 1);
abortLoad(dataVIO, VDO_BAD_MAPPING);
return;
}
if (!isMappedLocation(&mapping)) {
// The page we want one level down has not been allocated, so allocate it.
allocateBlockMapPage(zone, dataVIO);
return;
}
lock->treeSlots[lock->height - 1].blockMapSlot.pbn = mapping.pbn;
if (lock->height == 1) {
// This is the ultimate block map page, so we're done
finishLookup(dataVIO, VDO_SUCCESS);
return;
}
// We know what page we need to load.
loadBlockMapPage(zone, dataVIO);
}
/**********************************************************************/
PhysicalBlockNumber findBlockMapPagePBN(BlockMap *map, PageNumber pageNumber)
{
if (pageNumber < map->flatPageCount) {
return (BLOCK_MAP_FLAT_PAGE_ORIGIN + pageNumber);
}
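  // Tree pages are striped across the roots; convert the page number into
  // the root, the index of the height-1 (parent) page in that root's tree,
  // and the slot within that parent which maps the requested page.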
RootCount rootIndex = pageNumber % map->rootCount;
PageNumber pageIndex = ((pageNumber - map->flatPageCount) / map->rootCount);
SlotNumber slot = pageIndex % BLOCK_MAP_ENTRIES_PER_PAGE;
pageIndex /= BLOCK_MAP_ENTRIES_PER_PAGE;
TreePage *treePage
= getTreePageByIndex(map->forest, rootIndex, 1, pageIndex);
BlockMapPage *page = (BlockMapPage *) treePage->pageBuffer;
if (!isBlockMapPageInitialized(page)) {
return ZERO_BLOCK;
}
DataLocation mapping = unpackBlockMapEntry(&page->entries[slot]);
if (!isValidLocation(&mapping) || isCompressed(mapping.state)) {
return ZERO_BLOCK;
}
return mapping.pbn;
}
/**********************************************************************/
void writeTreePage(TreePage *page, BlockMapTreeZone *zone)
{
bool waiting = isWaiting(&page->waiter);
if (waiting && (zone->flusher == page)) {
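    // The page is already scheduled to be written as the current flusher.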
return;
}
setGeneration(zone, page, zone->generation, waiting);
if (waiting || page->writing) {
return;
}
enqueuePage(page, zone);
}