/* * Copyright (c) 2020 Red Hat, Inc. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301, USA. * * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/dataVIO.c#7 $ */ #include "dataVIO.h" #include "logger.h" #include "atomic.h" #include "blockMap.h" #include "compressionState.h" #include "extent.h" #include "logicalZone.h" #include "threadConfig.h" #include "vdoInternal.h" #include "vioRead.h" #include "vioWrite.h" static const char *ASYNC_OPERATION_NAMES[] = { "launch", "acknowledgeWrite", "acquireHashLock", "acquireLogicalBlockLock", "acquirePBNReadLock", "checkForDedupeForRollover", "checkForDeduplication", "compressData", "continueVIOAsync", "findBlockMapSlot", "getMappedBlock", "getMappedBlockForDedupe", "getMappedBlockForWrite", "hashData", "journalDecrementForDedupe", "journalDecrementForWrite", "journalIncrementForCompression", "journalIncrementForDedupe", "journalIncrementForWrite", "journalMappingForCompression", "journalMappingForDedupe", "journalMappingForWrite", "journalUnmappingForDedupe", "journalUnmappingForWrite", "attemptPacking", "putMappedBlock", "putMappedBlockForDedupe", "readData", "updateIndex", "verifyDeduplication", "writeData", }; /** * Initialize the LBN lock of a DataVIO. In addition to recording the LBN on * which the DataVIO will operate, it will also find the logical zone * associated with the LBN. * * @param dataVIO The dataVIO to initialize * @param lbn The lbn on which the dataVIO will operate **/ static void initializeLBNLock(DataVIO *dataVIO, LogicalBlockNumber lbn) { LBNLock *lock = &dataVIO->logical; lock->lbn = lbn; lock->locked = false; initializeWaitQueue(&lock->waiters); VDO *vdo = getVDOFromDataVIO(dataVIO); lock->zone = getLogicalZone(vdo->logicalZones, computeLogicalZone(dataVIO)); } /**********************************************************************/ void prepareDataVIO(DataVIO *dataVIO, LogicalBlockNumber lbn, VIOOperation operation, bool isTrim, VDOAction *callback) { // Clearing the tree lock must happen before initializing the LBN lock, // which also adds information to the tree lock. memset(&dataVIO->treeLock, 0, sizeof(dataVIO->treeLock)); initializeLBNLock(dataVIO, lbn); initializeRing(&dataVIO->hashLockNode); initializeRing(&dataVIO->writeNode); resetAllocation(dataVIOAsAllocatingVIO(dataVIO)); dataVIO->isDuplicate = false; memset(&dataVIO->chunkName, 0, sizeof(dataVIO->chunkName)); memset(&dataVIO->duplicate, 0, sizeof(dataVIO->duplicate)); VIO *vio = dataVIOAsVIO(dataVIO); vio->operation = operation; vio->callback = callback; dataVIO->pageCompletion.completion.enqueueable = vioAsCompletion(vio)->enqueueable; dataVIO->mapped.state = MAPPING_STATE_UNCOMPRESSED; dataVIO->newMapped.state = (isTrim ? MAPPING_STATE_UNMAPPED : MAPPING_STATE_UNCOMPRESSED); resetCompletion(vioAsCompletion(vio)); setLogicalCallback(dataVIO, attemptLogicalBlockLock, THIS_LOCATION("$F;cb=acquireLogicalBlockLock")); } /**********************************************************************/ void completeDataVIO(VDOCompletion *completion) { DataVIO *dataVIO = asDataVIO(completion); if (completion->result != VDO_SUCCESS) { VIO *vio = dataVIOAsVIO(dataVIO); updateVIOErrorStats(vio, "Completing %s VIO for LBN %" PRIu64 " with error after %s", getVIOReadWriteFlavor(vio), dataVIO->logical.lbn, getOperationName(dataVIO)); } dataVIOAddTraceRecord(dataVIO, THIS_LOCATION("$F($io)")); if (isReadDataVIO(dataVIO)) { cleanupReadDataVIO(dataVIO); } else { cleanupWriteDataVIO(dataVIO); } } /**********************************************************************/ void finishDataVIO(DataVIO *dataVIO, int result) { VDOCompletion *completion = dataVIOAsCompletion(dataVIO); setCompletionResult(completion, result); completeDataVIO(completion); } /**********************************************************************/ const char *getOperationName(DataVIO *dataVIO) { STATIC_ASSERT((MAX_ASYNC_OPERATION_NUMBER - MIN_ASYNC_OPERATION_NUMBER) == COUNT_OF(ASYNC_OPERATION_NAMES)); return ((dataVIO->lastAsyncOperation < MAX_ASYNC_OPERATION_NUMBER) ? ASYNC_OPERATION_NAMES[dataVIO->lastAsyncOperation] : "unknown async operation"); } /**********************************************************************/ void receiveDedupeAdvice(DataVIO *dataVIO, const DataLocation *advice) { /* * NOTE: this is called on non-base-code threads. Be very careful to not do * anything here that needs a base code thread-local variable, such as * trying to get the current thread ID, or that does a lot of work. */ VDO *vdo = getVDOFromDataVIO(dataVIO); ZonedPBN duplicate = validateDedupeAdvice(vdo, advice, dataVIO->logical.lbn); setDuplicateLocation(dataVIO, duplicate); } /**********************************************************************/ void setDuplicateLocation(DataVIO *dataVIO, const ZonedPBN source) { dataVIO->isDuplicate = (source.pbn != ZERO_BLOCK); dataVIO->duplicate = source; } /**********************************************************************/ void clearMappedLocation(DataVIO *dataVIO) { dataVIO->mapped = (ZonedPBN) { .state = MAPPING_STATE_UNMAPPED }; } /**********************************************************************/ int setMappedLocation(DataVIO *dataVIO, PhysicalBlockNumber pbn, BlockMappingState state) { PhysicalZone *zone; int result = getPhysicalZone(getVDOFromDataVIO(dataVIO), pbn, &zone); if (result != VDO_SUCCESS) { return result; } dataVIO->mapped = (ZonedPBN) { .pbn = pbn, .state = state, .zone = zone, }; return VDO_SUCCESS; } /** * Launch a request which has acquired an LBN lock. * * @param dataVIO The DataVIO which has just acquired a lock **/ static void launchLockedRequest(DataVIO *dataVIO) { dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL)); dataVIO->logical.locked = true; if (isWriteDataVIO(dataVIO)) { launchWriteDataVIO(dataVIO); } else { launchReadDataVIO(dataVIO); } } /**********************************************************************/ void attemptLogicalBlockLock(VDOCompletion *completion) { DataVIO *dataVIO = asDataVIO(completion); assertInLogicalZone(dataVIO); if (dataVIO->logical.lbn >= getVDOFromDataVIO(dataVIO)->config.logicalBlocks) { finishDataVIO(dataVIO, VDO_OUT_OF_RANGE); return; } DataVIO *lockHolder; LBNLock *lock = &dataVIO->logical; int result = intMapPut(getLBNLockMap(lock->zone), lock->lbn, dataVIO, false, (void **) &lockHolder); if (result != VDO_SUCCESS) { finishDataVIO(dataVIO, result); return; } if (lockHolder == NULL) { // We got the lock launchLockedRequest(dataVIO); return; } result = ASSERT(lockHolder->logical.locked, "logical block lock held"); if (result != VDO_SUCCESS) { finishDataVIO(dataVIO, result); return; } /* * If the new request is a pure read request (not read-modify-write) and * the lockHolder is writing and has received an allocation (VDO-2683), * service the read request immediately by copying data from the lockHolder * to avoid having to flush the write out of the packer just to prevent the * read from waiting indefinitely. If the lockHolder does not yet have an * allocation, prevent it from blocking in the packer and wait on it. */ if (isReadDataVIO(dataVIO) && atomicLoadBool(&lockHolder->hasAllocation)) { dataVIOAsCompletion(dataVIO)->layer->copyData(lockHolder, dataVIO); finishDataVIO(dataVIO, VDO_SUCCESS); return; } dataVIO->lastAsyncOperation = ACQUIRE_LOGICAL_BLOCK_LOCK; result = enqueueDataVIO(&lockHolder->logical.waiters, dataVIO, THIS_LOCATION("$F;cb=logicalBlockLock")); if (result != VDO_SUCCESS) { finishDataVIO(dataVIO, result); return; } // Prevent writes and read-modify-writes from blocking indefinitely on // lock holders in the packer. if (!isReadDataVIO(lockHolder) && cancelCompression(lockHolder)) { dataVIO->compression.lockHolder = lockHolder; launchPackerCallback(dataVIO, removeLockHolderFromPacker, THIS_LOCATION("$F;cb=removeLockHolderFromPacker")); } } /** * Release an uncontended LBN lock. * * @param dataVIO The DataVIO holding the lock **/ static void releaseLock(DataVIO *dataVIO) { LBNLock *lock = &dataVIO->logical; IntMap *lockMap = getLBNLockMap(lock->zone); if (!lock->locked) { // The lock is not locked, so it had better not be registered in the lock // map. DataVIO *lockHolder = intMapGet(lockMap, lock->lbn); ASSERT_LOG_ONLY((dataVIO != lockHolder), "no logical block lock held for block %llu", lock->lbn); return; } // Remove the lock from the logical block lock map, releasing the lock. DataVIO *lockHolder = intMapRemove(lockMap, lock->lbn); ASSERT_LOG_ONLY((dataVIO == lockHolder), "logical block lock mismatch for block %llu", lock->lbn); lock->locked = false; return; } /**********************************************************************/ void releaseLogicalBlockLock(DataVIO *dataVIO) { assertInLogicalZone(dataVIO); if (!hasWaiters(&dataVIO->logical.waiters)) { releaseLock(dataVIO); return; } LBNLock *lock = &dataVIO->logical; ASSERT_LOG_ONLY(lock->locked, "LBNLock with waiters is not locked"); // Another DataVIO is waiting for the lock, so just transfer it in a single // lock map operation DataVIO *nextLockHolder = waiterAsDataVIO(dequeueNextWaiter(&lock->waiters)); // Transfer the remaining lock waiters to the next lock holder. transferAllWaiters(&lock->waiters, &nextLockHolder->logical.waiters); DataVIO *lockHolder; int result = intMapPut(getLBNLockMap(lock->zone), lock->lbn, nextLockHolder, true, (void **) &lockHolder); if (result != VDO_SUCCESS) { finishDataVIO(nextLockHolder, result); return; } ASSERT_LOG_ONLY((lockHolder == dataVIO), "logical block lock mismatch for block %llu", lock->lbn); lock->locked = false; /* * If there are still waiters, other DataVIOs must be trying to get the lock * we just transferred. We must ensure that the new lock holder doesn't block * in the packer. */ if (hasWaiters(&nextLockHolder->logical.waiters)) { cancelCompression(nextLockHolder); } // Avoid stack overflow on lock transfer. // XXX: this is only an issue in the 1 thread config. dataVIOAsCompletion(nextLockHolder)->requeue = true; launchLockedRequest(nextLockHolder); }