Blame source/vdo/base/dataVIO.c

Packit b55c50
/*
Packit b55c50
 * Copyright (c) 2020 Red Hat, Inc.
Packit b55c50
 *
Packit b55c50
 * This program is free software; you can redistribute it and/or
Packit b55c50
 * modify it under the terms of the GNU General Public License
Packit b55c50
 * as published by the Free Software Foundation; either version 2
Packit b55c50
 * of the License, or (at your option) any later version.
Packit b55c50
 * 
Packit b55c50
 * This program is distributed in the hope that it will be useful,
Packit b55c50
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b55c50
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b55c50
 * GNU General Public License for more details.
Packit b55c50
 * 
Packit b55c50
 * You should have received a copy of the GNU General Public License
Packit b55c50
 * along with this program; if not, write to the Free Software
Packit b55c50
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit b55c50
 * 02110-1301, USA. 
Packit b55c50
 *
Packit b55c50
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/dataVIO.c#7 $
Packit b55c50
 */
Packit b55c50
Packit b55c50
#include "dataVIO.h"
Packit b55c50
Packit b55c50
#include "logger.h"
Packit b55c50
Packit b55c50
#include "atomic.h"
Packit b55c50
#include "blockMap.h"
Packit b55c50
#include "compressionState.h"
Packit b55c50
#include "extent.h"
Packit b55c50
#include "logicalZone.h"
Packit b55c50
#include "threadConfig.h"
Packit b55c50
#include "vdoInternal.h"
Packit b55c50
#include "vioRead.h"
Packit b55c50
#include "vioWrite.h"
Packit b55c50
Packit b55c50
/**
 * Human-readable names of the asynchronous operations. getOperationName()
 * looks a name up using a DataVIO's lastAsyncOperation, so the order of the
 * entries must match the numbering of the async operation values.
 *
 * Both the strings and the array slots are const, so no entry can be
 * repointed at runtime.
 **/
static const char *const ASYNC_OPERATION_NAMES[] = {
  "launch",
  "acknowledgeWrite",
  "acquireHashLock",
  "acquireLogicalBlockLock",
  "acquirePBNReadLock",
  "checkForDedupeForRollover",
  "checkForDeduplication",
  "compressData",
  "continueVIOAsync",
  "findBlockMapSlot",
  "getMappedBlock",
  "getMappedBlockForDedupe",
  "getMappedBlockForWrite",
  "hashData",
  "journalDecrementForDedupe",
  "journalDecrementForWrite",
  "journalIncrementForCompression",
  "journalIncrementForDedupe",
  "journalIncrementForWrite",
  "journalMappingForCompression",
  "journalMappingForDedupe",
  "journalMappingForWrite",
  "journalUnmappingForDedupe",
  "journalUnmappingForWrite",
  "attemptPacking",
  "putMappedBlock",
  "putMappedBlockForDedupe",
  "readData",
  "updateIndex",
  "verifyDeduplication",
  "writeData",
};
Packit b55c50
Packit b55c50
/**
 * Set up the logical block lock embedded in a DataVIO: record the LBN, mark
 * the lock as not held, empty its waiter queue, and look up the logical zone
 * that goes with the DataVIO.
 *
 * @param dataVIO  The DataVIO whose LBN lock is being initialized
 * @param lbn      The logical block number the DataVIO will operate on
 **/
static void initializeLBNLock(DataVIO *dataVIO, LogicalBlockNumber lbn)
{
  LBNLock *lbnLock = &dataVIO->logical;

  lbnLock->lbn    = lbn;
  lbnLock->locked = false;
  initializeWaitQueue(&lbnLock->waiters);

  // The zone is resolved last, once the LBN has been stored in the lock.
  VDO *owner    = getVDOFromDataVIO(dataVIO);
  lbnLock->zone = getLogicalZone(owner->logicalZones,
                                 computeLogicalZone(dataVIO));
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void prepareDataVIO(DataVIO            *dataVIO,
Packit b55c50
                    LogicalBlockNumber  lbn,
Packit b55c50
                    VIOOperation        operation,
Packit b55c50
                    bool                isTrim,
Packit b55c50
                    VDOAction          *callback)
Packit b55c50
{
Packit b55c50
  // Clearing the tree lock must happen before initializing the LBN lock,
Packit b55c50
  // which also adds information to the tree lock.
Packit b55c50
  memset(&dataVIO->treeLock,  0, sizeof(dataVIO->treeLock));
Packit b55c50
  initializeLBNLock(dataVIO, lbn);
Packit b55c50
  initializeRing(&dataVIO->hashLockNode);
Packit b55c50
  initializeRing(&dataVIO->writeNode);
Packit b55c50
Packit b55c50
  resetAllocation(dataVIOAsAllocatingVIO(dataVIO));
Packit b55c50
Packit b55c50
  dataVIO->isDuplicate = false;
Packit b55c50
Packit b55c50
  memset(&dataVIO->chunkName, 0, sizeof(dataVIO->chunkName));
Packit b55c50
  memset(&dataVIO->duplicate, 0, sizeof(dataVIO->duplicate));
Packit b55c50
Packit b55c50
  VIO *vio       = dataVIOAsVIO(dataVIO);
Packit b55c50
  vio->operation = operation;
Packit b55c50
  vio->callback  = callback;
Packit b55c50
  dataVIO->pageCompletion.completion.enqueueable
Packit b55c50
    = vioAsCompletion(vio)->enqueueable;
Packit b55c50
Packit b55c50
  dataVIO->mapped.state = MAPPING_STATE_UNCOMPRESSED;
Packit b55c50
  dataVIO->newMapped.state
Packit b55c50
    = (isTrim ? MAPPING_STATE_UNMAPPED : MAPPING_STATE_UNCOMPRESSED);
Packit b55c50
  resetCompletion(vioAsCompletion(vio));
Packit b55c50
  setLogicalCallback(dataVIO, attemptLogicalBlockLock,
Packit b55c50
                     THIS_LOCATION("$F;cb=acquireLogicalBlockLock"));
Packit b55c50
}
Packit b55c50
Packit b55c50
/**
 * Complete the processing of a DataVIO. If the completion carries an error
 * result, the error statistics are updated (with a message naming the LBN
 * and the last async operation). A trace record is then added and the
 * read or write cleanup path is invoked.
 *
 * @param completion  The completion of the DataVIO being completed
 **/
void completeDataVIO(VDOCompletion *completion)
{
  DataVIO *dataVIO = asDataVIO(completion);

  bool failed = (completion->result != VDO_SUCCESS);
  if (failed) {
    VIO *failedVIO = dataVIOAsVIO(dataVIO);
    updateVIOErrorStats(failedVIO,
                        "Completing %s VIO for LBN %" PRIu64
                        " with error after %s",
                        getVIOReadWriteFlavor(failedVIO),
                        dataVIO->logical.lbn,
                        getOperationName(dataVIO));
  }

  dataVIOAddTraceRecord(dataVIO, THIS_LOCATION("$F($io)"));

  // Anything that is not a read is cleaned up as a write.
  if (!isReadDataVIO(dataVIO)) {
    cleanupWriteDataVIO(dataVIO);
    return;
  }

  cleanupReadDataVIO(dataVIO);
}
Packit b55c50
Packit b55c50
/**
 * Record a result in a DataVIO's completion and then complete the DataVIO.
 *
 * @param dataVIO  The DataVIO to finish
 * @param result   The result to store via setCompletionResult()
 **/
void finishDataVIO(DataVIO *dataVIO, int result)
{
  setCompletionResult(dataVIOAsCompletion(dataVIO), result);
  completeDataVIO(dataVIOAsCompletion(dataVIO));
}
Packit b55c50
Packit b55c50
/**
 * Get the name of the last asynchronous operation recorded in a DataVIO.
 *
 * @param dataVIO  The DataVIO to query
 *
 * @return The entry of ASYNC_OPERATION_NAMES for dataVIO->lastAsyncOperation,
 *         or "unknown async operation" if the value falls outside the table
 **/
const char *getOperationName(DataVIO *dataVIO)
{
  STATIC_ASSERT((MAX_ASYNC_OPERATION_NUMBER - MIN_ASYNC_OPERATION_NUMBER)
                == COUNT_OF(ASYNC_OPERATION_NAMES));

  // The table holds (MAX - MIN) names, so index it relative to MIN. The
  // subtraction is done unsigned: a value below MIN wraps to a large offset
  // and is rejected by the single bounds check rather than indexing before
  // the start of the table.
  unsigned int offset = (unsigned int) dataVIO->lastAsyncOperation;
  offset -= (unsigned int) MIN_ASYNC_OPERATION_NUMBER;

  return ((offset < COUNT_OF(ASYNC_OPERATION_NAMES))
          ? ASYNC_OPERATION_NAMES[offset]
          : "unknown async operation");
}
Packit b55c50
Packit b55c50
/**
 * Accept deduplication advice for a DataVIO: validate the advice against the
 * VDO and the DataVIO's LBN, then store the resulting location as the
 * DataVIO's duplicate location.
 *
 * @param dataVIO  The DataVIO the advice is for
 * @param advice   The advice to validate and record
 **/
void receiveDedupeAdvice(DataVIO *dataVIO, const DataLocation *advice)
{
  /*
   * NOTE: this is called on non-base-code threads. Be very careful to not do
   * anything here that needs a base code thread-local variable, such as
   * trying to get the current thread ID, or that does a lot of work.
   */
  setDuplicateLocation(dataVIO,
                       validateDedupeAdvice(getVDOFromDataVIO(dataVIO),
                                            advice, dataVIO->logical.lbn));
}
Packit b55c50
Packit b55c50
/**
 * Store a location as a DataVIO's duplicate location and set isDuplicate
 * according to whether that location's PBN is something other than
 * ZERO_BLOCK.
 *
 * @param dataVIO  The DataVIO to update
 * @param source   The location to record
 **/
void setDuplicateLocation(DataVIO *dataVIO, const ZonedPBN source)
{
  dataVIO->duplicate = source;

  if (source.pbn == ZERO_BLOCK) {
    dataVIO->isDuplicate = false;
  } else {
    dataVIO->isDuplicate = true;
  }
}
Packit b55c50
Packit b55c50
/**
 * Reset a DataVIO's mapped location: the state becomes
 * MAPPING_STATE_UNMAPPED and every other field of the location is zeroed.
 *
 * @param dataVIO  The DataVIO whose mapped location is cleared
 **/
void clearMappedLocation(DataVIO *dataVIO)
{
  // Fields not named in the initializer are zero-initialized.
  const ZonedPBN unmapped = { .state = MAPPING_STATE_UNMAPPED };
  dataVIO->mapped = unmapped;
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
int setMappedLocation(DataVIO             *dataVIO,
Packit b55c50
                      PhysicalBlockNumber  pbn,
Packit b55c50
                      BlockMappingState    state)
Packit b55c50
{
Packit b55c50
  PhysicalZone *zone;
Packit b55c50
  int result = getPhysicalZone(getVDOFromDataVIO(dataVIO), pbn, &zone);
Packit b55c50
  if (result != VDO_SUCCESS) {
Packit b55c50
    return result;
Packit b55c50
  }
Packit b55c50
Packit b55c50
  dataVIO->mapped = (ZonedPBN) {
Packit b55c50
    .pbn   = pbn,
Packit b55c50
    .state = state,
Packit b55c50
    .zone  = zone,
Packit b55c50
  };
Packit b55c50
  return VDO_SUCCESS;
Packit b55c50
}
Packit b55c50
Packit b55c50
/**
 * Start processing a DataVIO that now owns its LBN lock: add a trace record,
 * mark the lock as held, and hand the DataVIO to the write or read launch
 * routine.
 *
 * @param dataVIO  The DataVIO which has just acquired a lock
 **/
static void launchLockedRequest(DataVIO *dataVIO)
{
  dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
  dataVIO->logical.locked = true;

  // Everything that is not a write is launched as a read.
  if (!isWriteDataVIO(dataVIO)) {
    launchReadDataVIO(dataVIO);
    return;
  }

  launchWriteDataVIO(dataVIO);
}
Packit b55c50
Packit b55c50
/**
 * Try to acquire the logical block lock for a DataVIO's LBN. Must be called
 * in the DataVIO's logical zone (checked by assertInLogicalZone()).
 *
 * Outcomes:
 *  - The LBN is at or beyond the VDO's configured logicalBlocks: the DataVIO
 *    is finished with VDO_OUT_OF_RANGE.
 *  - No other DataVIO is registered for the LBN in the zone's lock map: this
 *    DataVIO becomes the lock holder and is launched.
 *  - This DataVIO is a read and the current holder has an allocation: the
 *    data is copied from the holder and the read finishes successfully.
 *  - Otherwise: this DataVIO is queued on the holder's waiter queue, and, if
 *    the holder is not a read and cancelCompression() reports true, a packer
 *    callback is launched to remove the holder from the packer.
 * Any error from the lock map, the assertion, or the enqueue finishes the
 * DataVIO with that error.
 *
 * @param completion  The completion of the DataVIO attempting to get the lock
 **/
void attemptLogicalBlockLock(VDOCompletion *completion)
{
  DataVIO *dataVIO = asDataVIO(completion);
  assertInLogicalZone(dataVIO);

  if (dataVIO->logical.lbn
      >= getVDOFromDataVIO(dataVIO)->config.logicalBlocks) {
    finishDataVIO(dataVIO, VDO_OUT_OF_RANGE);
    return;
  }

  // intMapPut() reports the existing mapping through a void **, so receive it
  // in a genuine (and initialized) void * instead of casting the address of a
  // DataVIO *, which would store through an incompatible pointer type.
  void    *existing = NULL;
  LBNLock *lock     = &dataVIO->logical;
  int result = intMapPut(getLBNLockMap(lock->zone), lock->lbn, dataVIO, false,
                         &existing);
  if (result != VDO_SUCCESS) {
    finishDataVIO(dataVIO, result);
    return;
  }

  DataVIO *lockHolder = existing;
  if (lockHolder == NULL) {
    // We got the lock
    launchLockedRequest(dataVIO);
    return;
  }

  result = ASSERT(lockHolder->logical.locked, "logical block lock held");
  if (result != VDO_SUCCESS) {
    finishDataVIO(dataVIO, result);
    return;
  }

  /*
   * If the new request is a pure read request (not read-modify-write) and
   * the lockHolder is writing and has received an allocation (VDO-2683),
   * service the read request immediately by copying data from the lockHolder
   * to avoid having to flush the write out of the packer just to prevent the
   * read from waiting indefinitely. If the lockHolder does not yet have an
   * allocation, prevent it from blocking in the packer and wait on it.
   */
  if (isReadDataVIO(dataVIO) && atomicLoadBool(&lockHolder->hasAllocation)) {
    dataVIOAsCompletion(dataVIO)->layer->copyData(lockHolder, dataVIO);
    finishDataVIO(dataVIO, VDO_SUCCESS);
    return;
  }

  // Record what this DataVIO is waiting for before it is queued on the holder.
  dataVIO->lastAsyncOperation = ACQUIRE_LOGICAL_BLOCK_LOCK;
  result = enqueueDataVIO(&lockHolder->logical.waiters, dataVIO,
                          THIS_LOCATION("$F;cb=logicalBlockLock"));
  if (result != VDO_SUCCESS) {
    finishDataVIO(dataVIO, result);
    return;
  }

  // Prevent writes and read-modify-writes from blocking indefinitely on
  // lock holders in the packer.
  if (!isReadDataVIO(lockHolder) && cancelCompression(lockHolder)) {
    dataVIO->compression.lockHolder = lockHolder;
    launchPackerCallback(dataVIO, removeLockHolderFromPacker,
                         THIS_LOCATION("$F;cb=removeLockHolderFromPacker"));
  }
}
Packit b55c50
Packit b55c50
/**
 * Release an uncontended LBN lock.
 *
 * If the DataVIO's lock is not marked as held, nothing is released; the lock
 * map is only consulted to log an assertion failure should this DataVIO
 * nevertheless be registered there. Otherwise the LBN's entry is removed from
 * the zone's lock map (logging an assertion failure if the removed entry was
 * some other DataVIO) and the lock is marked as not held.
 *
 * @param dataVIO  The DataVIO holding the lock
 **/
static void releaseLock(DataVIO *dataVIO)
{
  LBNLock *lock    = &dataVIO->logical;
  IntMap  *lockMap = getLBNLockMap(lock->zone);
  if (!lock->locked) {
    // The lock is not locked, so it had better not be registered in the lock
    // map.
    DataVIO *registered = intMapGet(lockMap, lock->lbn);
    // The LBN is cast so the argument matches "%llu" on every platform,
    // including those where a 64-bit integer is an unsigned long.
    ASSERT_LOG_ONLY((dataVIO != registered),
                    "no logical block lock held for block %llu",
                    (unsigned long long) lock->lbn);
    return;
  }

  // Remove the lock from the logical block lock map, releasing the lock.
  DataVIO *removed = intMapRemove(lockMap, lock->lbn);
  ASSERT_LOG_ONLY((dataVIO == removed),
                  "logical block lock mismatch for block %llu",
                  (unsigned long long) lock->lbn);
  lock->locked = false;
}
Packit b55c50
Packit b55c50
/**
 * Release the logical block lock held by a DataVIO. Must be called in the
 * DataVIO's logical zone (checked by assertInLogicalZone()).
 *
 * With no waiters, the lock is simply released. Otherwise the first waiter
 * becomes the new holder: it inherits the remaining waiters, replaces this
 * DataVIO in the zone's lock map, and is launched with its completion marked
 * for requeue. If the lock map update fails, the would-be holder is finished
 * with that error.
 *
 * @param dataVIO  The DataVIO releasing its logical block lock
 **/
void releaseLogicalBlockLock(DataVIO *dataVIO)
{
  assertInLogicalZone(dataVIO);
  if (!hasWaiters(&dataVIO->logical.waiters)) {
    releaseLock(dataVIO);
    return;
  }

  LBNLock *lock = &dataVIO->logical;
  ASSERT_LOG_ONLY(lock->locked, "LBNLock with waiters is not locked");

  // Another DataVIO is waiting for the lock, so just transfer it in a single
  // lock map operation
  DataVIO *nextLockHolder = waiterAsDataVIO(dequeueNextWaiter(&lock->waiters));

  // Transfer the remaining lock waiters to the next lock holder.
  transferAllWaiters(&lock->waiters, &nextLockHolder->logical.waiters);

  // intMapPut() reports the replaced mapping through a void **, so receive it
  // in a genuine (and initialized) void * instead of casting the address of a
  // DataVIO *, which would store through an incompatible pointer type.
  void *replaced = NULL;
  int result = intMapPut(getLBNLockMap(lock->zone), lock->lbn, nextLockHolder,
                         true, &replaced);
  if (result != VDO_SUCCESS) {
    finishDataVIO(nextLockHolder, result);
    return;
  }

  // The LBN is cast so the argument matches "%llu" on every platform,
  // including those where a 64-bit integer is an unsigned long.
  DataVIO *lockHolder = replaced;
  ASSERT_LOG_ONLY((lockHolder == dataVIO),
                  "logical block lock mismatch for block %llu",
                  (unsigned long long) lock->lbn);
  lock->locked = false;

  /*
   * If there are still waiters, other DataVIOs must be trying to get the lock
   * we just transferred. We must ensure that the new lock holder doesn't block
   * in the packer.
   */
  if (hasWaiters(&nextLockHolder->logical.waiters)) {
    cancelCompression(nextLockHolder);
  }

  // Avoid stack overflow on lock transfer.
  // XXX: this is only an issue in the 1 thread config.
  dataVIOAsCompletion(nextLockHolder)->requeue = true;
  launchLockedRequest(nextLockHolder);
}