/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/dataVIO.c#7 $
*/
#include "dataVIO.h"
#include "logger.h"
#include "atomic.h"
#include "blockMap.h"
#include "compressionState.h"
#include "extent.h"
#include "logicalZone.h"
#include "threadConfig.h"
#include "vdoInternal.h"
#include "vioRead.h"
#include "vioWrite.h"
static const char *ASYNC_OPERATION_NAMES[] = {
"launch",
"acknowledgeWrite",
"acquireHashLock",
"acquireLogicalBlockLock",
"acquirePBNReadLock",
"checkForDedupeForRollover",
"checkForDeduplication",
"compressData",
"continueVIOAsync",
"findBlockMapSlot",
"getMappedBlock",
"getMappedBlockForDedupe",
"getMappedBlockForWrite",
"hashData",
"journalDecrementForDedupe",
"journalDecrementForWrite",
"journalIncrementForCompression",
"journalIncrementForDedupe",
"journalIncrementForWrite",
"journalMappingForCompression",
"journalMappingForDedupe",
"journalMappingForWrite",
"journalUnmappingForDedupe",
"journalUnmappingForWrite",
"attemptPacking",
"putMappedBlock",
"putMappedBlockForDedupe",
"readData",
"updateIndex",
"verifyDeduplication",
"writeData",
};
/**
* Initialize the LBN lock of a DataVIO. In addition to recording the LBN on
* which the DataVIO will operate, it will also find the logical zone
* associated with the LBN.
*
* @param dataVIO The dataVIO to initialize
* @param lbn The lbn on which the dataVIO will operate
**/
static void initializeLBNLock(DataVIO *dataVIO, LogicalBlockNumber lbn)
{
LBNLock *lock = &dataVIO->logical;
lock->lbn = lbn;
lock->locked = false;
initializeWaitQueue(&lock->waiters);
VDO *vdo = getVDOFromDataVIO(dataVIO);
lock->zone = getLogicalZone(vdo->logicalZones, computeLogicalZone(dataVIO));
}
/**********************************************************************/
void prepareDataVIO(DataVIO *dataVIO,
LogicalBlockNumber lbn,
VIOOperation operation,
bool isTrim,
VDOAction *callback)
{
// Clearing the tree lock must happen before initializing the LBN lock,
// which also adds information to the tree lock.
memset(&dataVIO->treeLock, 0, sizeof(dataVIO->treeLock));
initializeLBNLock(dataVIO, lbn);
initializeRing(&dataVIO->hashLockNode);
initializeRing(&dataVIO->writeNode);
resetAllocation(dataVIOAsAllocatingVIO(dataVIO));
dataVIO->isDuplicate = false;
memset(&dataVIO->chunkName, 0, sizeof(dataVIO->chunkName));
memset(&dataVIO->duplicate, 0, sizeof(dataVIO->duplicate));
VIO *vio = dataVIOAsVIO(dataVIO);
vio->operation = operation;
vio->callback = callback;
dataVIO->pageCompletion.completion.enqueueable
= vioAsCompletion(vio)->enqueueable;
dataVIO->mapped.state = MAPPING_STATE_UNCOMPRESSED;
dataVIO->newMapped.state
= (isTrim ? MAPPING_STATE_UNMAPPED : MAPPING_STATE_UNCOMPRESSED);
resetCompletion(vioAsCompletion(vio));
setLogicalCallback(dataVIO, attemptLogicalBlockLock,
THIS_LOCATION("$F;cb=acquireLogicalBlockLock"));
}
/**********************************************************************/
void completeDataVIO(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
if (completion->result != VDO_SUCCESS) {
VIO *vio = dataVIOAsVIO(dataVIO);
updateVIOErrorStats(vio,
"Completing %s VIO for LBN %" PRIu64
" with error after %s",
getVIOReadWriteFlavor(vio), dataVIO->logical.lbn,
getOperationName(dataVIO));
}
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION("$F($io)"));
if (isReadDataVIO(dataVIO)) {
cleanupReadDataVIO(dataVIO);
} else {
cleanupWriteDataVIO(dataVIO);
}
}
/**********************************************************************/
void finishDataVIO(DataVIO *dataVIO, int result)
{
VDOCompletion *completion = dataVIOAsCompletion(dataVIO);
setCompletionResult(completion, result);
completeDataVIO(completion);
}
/**********************************************************************/
const char *getOperationName(DataVIO *dataVIO)
{
STATIC_ASSERT((MAX_ASYNC_OPERATION_NUMBER - MIN_ASYNC_OPERATION_NUMBER)
== COUNT_OF(ASYNC_OPERATION_NAMES));
return ((dataVIO->lastAsyncOperation < MAX_ASYNC_OPERATION_NUMBER)
? ASYNC_OPERATION_NAMES[dataVIO->lastAsyncOperation]
: "unknown async operation");
}
/**********************************************************************/
void receiveDedupeAdvice(DataVIO *dataVIO, const DataLocation *advice)
{
/*
* NOTE: this is called on non-base-code threads. Be very careful to not do
* anything here that needs a base code thread-local variable, such as
* trying to get the current thread ID, or that does a lot of work.
*/
VDO *vdo = getVDOFromDataVIO(dataVIO);
ZonedPBN duplicate = validateDedupeAdvice(vdo, advice, dataVIO->logical.lbn);
setDuplicateLocation(dataVIO, duplicate);
}
/**********************************************************************/
void setDuplicateLocation(DataVIO *dataVIO, const ZonedPBN source)
{
dataVIO->isDuplicate = (source.pbn != ZERO_BLOCK);
dataVIO->duplicate = source;
}
/**********************************************************************/
void clearMappedLocation(DataVIO *dataVIO)
{
dataVIO->mapped = (ZonedPBN) { .state = MAPPING_STATE_UNMAPPED };
}
/**********************************************************************/
int setMappedLocation(DataVIO *dataVIO,
PhysicalBlockNumber pbn,
BlockMappingState state)
{
PhysicalZone *zone;
int result = getPhysicalZone(getVDOFromDataVIO(dataVIO), pbn, &zone);
if (result != VDO_SUCCESS) {
return result;
}
dataVIO->mapped = (ZonedPBN) {
.pbn = pbn,
.state = state,
.zone = zone,
};
return VDO_SUCCESS;
}
/**
* Launch a request which has acquired an LBN lock.
*
* @param dataVIO The DataVIO which has just acquired a lock
**/
static void launchLockedRequest(DataVIO *dataVIO)
{
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
dataVIO->logical.locked = true;
if (isWriteDataVIO(dataVIO)) {
launchWriteDataVIO(dataVIO);
} else {
launchReadDataVIO(dataVIO);
}
}
/**********************************************************************/
void attemptLogicalBlockLock(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
assertInLogicalZone(dataVIO);
if (dataVIO->logical.lbn
>= getVDOFromDataVIO(dataVIO)->config.logicalBlocks) {
finishDataVIO(dataVIO, VDO_OUT_OF_RANGE);
return;
}
DataVIO *lockHolder;
LBNLock *lock = &dataVIO->logical;
int result = intMapPut(getLBNLockMap(lock->zone), lock->lbn, dataVIO, false,
(void **) &lockHolder);
if (result != VDO_SUCCESS) {
finishDataVIO(dataVIO, result);
return;
}
if (lockHolder == NULL) {
// We got the lock
launchLockedRequest(dataVIO);
return;
}
result = ASSERT(lockHolder->logical.locked, "logical block lock held");
if (result != VDO_SUCCESS) {
finishDataVIO(dataVIO, result);
return;
}
/*
* If the new request is a pure read request (not read-modify-write) and
* the lockHolder is writing and has received an allocation (VDO-2683),
* service the read request immediately by copying data from the lockHolder
* to avoid having to flush the write out of the packer just to prevent the
* read from waiting indefinitely. If the lockHolder does not yet have an
* allocation, prevent it from blocking in the packer and wait on it.
*/
if (isReadDataVIO(dataVIO) && atomicLoadBool(&lockHolder->hasAllocation)) {
dataVIOAsCompletion(dataVIO)->layer->copyData(lockHolder, dataVIO);
finishDataVIO(dataVIO, VDO_SUCCESS);
return;
}
dataVIO->lastAsyncOperation = ACQUIRE_LOGICAL_BLOCK_LOCK;
result = enqueueDataVIO(&lockHolder->logical.waiters, dataVIO,
THIS_LOCATION("$F;cb=logicalBlockLock"));
if (result != VDO_SUCCESS) {
finishDataVIO(dataVIO, result);
return;
}
// Prevent writes and read-modify-writes from blocking indefinitely on
// lock holders in the packer.
if (!isReadDataVIO(lockHolder) && cancelCompression(lockHolder)) {
dataVIO->compression.lockHolder = lockHolder;
launchPackerCallback(dataVIO, removeLockHolderFromPacker,
THIS_LOCATION("$F;cb=removeLockHolderFromPacker"));
}
}
/**
* Release an uncontended LBN lock.
*
* @param dataVIO The DataVIO holding the lock
**/
static void releaseLock(DataVIO *dataVIO)
{
LBNLock *lock = &dataVIO->logical;
IntMap *lockMap = getLBNLockMap(lock->zone);
if (!lock->locked) {
// The lock is not locked, so it had better not be registered in the lock
// map.
DataVIO *lockHolder = intMapGet(lockMap, lock->lbn);
ASSERT_LOG_ONLY((dataVIO != lockHolder),
"no logical block lock held for block %llu",
lock->lbn);
return;
}
// Remove the lock from the logical block lock map, releasing the lock.
DataVIO *lockHolder = intMapRemove(lockMap, lock->lbn);
ASSERT_LOG_ONLY((dataVIO == lockHolder),
"logical block lock mismatch for block %llu", lock->lbn);
lock->locked = false;
return;
}
/**********************************************************************/
void releaseLogicalBlockLock(DataVIO *dataVIO)
{
assertInLogicalZone(dataVIO);
if (!hasWaiters(&dataVIO->logical.waiters)) {
releaseLock(dataVIO);
return;
}
LBNLock *lock = &dataVIO->logical;
ASSERT_LOG_ONLY(lock->locked, "LBNLock with waiters is not locked");
// Another DataVIO is waiting for the lock, so just transfer it in a single
// lock map operation
DataVIO *nextLockHolder = waiterAsDataVIO(dequeueNextWaiter(&lock->waiters));
// Transfer the remaining lock waiters to the next lock holder.
transferAllWaiters(&lock->waiters, &nextLockHolder->logical.waiters);
DataVIO *lockHolder;
int result = intMapPut(getLBNLockMap(lock->zone), lock->lbn, nextLockHolder,
true, (void **) &lockHolder);
if (result != VDO_SUCCESS) {
finishDataVIO(nextLockHolder, result);
return;
}
ASSERT_LOG_ONLY((lockHolder == dataVIO),
"logical block lock mismatch for block %llu", lock->lbn);
lock->locked = false;
/*
* If there are still waiters, other DataVIOs must be trying to get the lock
* we just transferred. We must ensure that the new lock holder doesn't block
* in the packer.
*/
if (hasWaiters(&nextLockHolder->logical.waiters)) {
cancelCompression(nextLockHolder);
}
// Avoid stack overflow on lock transfer.
// XXX: this is only an issue in the 1 thread config.
dataVIOAsCompletion(nextLockHolder)->requeue = true;
launchLockedRequest(nextLockHolder);
}