/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/kernelLayer.c#38 $
 */

#include "kernelLayer.h"

#include <linux/backing-dev.h>
#include <linux/blkdev.h>
#include <linux/crc32.h>
#include <linux/delay.h>
#include <linux/module.h>

#include "logger.h"
#include "memoryAlloc.h"
#include "murmur/MurmurHash3.h"

#include "lz4.h"
#include "releaseVersions.h"
#include "volumeGeometry.h"
#include "statistics.h"
#include "vdo.h"

#include "bio.h"
#include "dataKVIO.h"
#include "dedupeIndex.h"
#include "deviceConfig.h"
#include "deviceRegistry.h"
#include "instanceNumber.h"
#include "ioSubmitter.h"
#include "kvdoFlush.h"
#include "kvio.h"
#include "poolSysfs.h"
#include "statusProcfs.h"
#include "stringUtils.h"
#include "verify.h"

enum {
  DEDUPE_TIMEOUT_REPORT_INTERVAL = 1000,
};

static const KvdoWorkQueueType bioAckQType = {
  .actionTable = {
    { .name = "bio_ack",
      .code = BIO_ACK_Q_ACTION_ACK,
      .priority = 0 },
  },
};

static const KvdoWorkQueueType cpuQType = {
  .actionTable = {
    { .name = "cpu_complete_kvio",
      .code = CPU_Q_ACTION_COMPLETE_KVIO,
      .priority = 0 },
    { .name = "cpu_compress_block",
      .code = CPU_Q_ACTION_COMPRESS_BLOCK,
      .priority = 0 },
    { .name = "cpu_hash_block",
      .code = CPU_Q_ACTION_HASH_BLOCK,
      .priority = 0 },
    { .name = "cpu_event_reporter",
      .code = CPU_Q_ACTION_EVENT_REPORTER,
      .priority = 0 },
  },
};

// 2000 is half the number of entries currently in our page cache,
// to allow for each in-progress operation to update two pages.
int defaultMaxRequestsActive = 2000;

/**********************************************************************/
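/**
 * Update a CRC-32 checksum over a buffer of data; installed as
 * layer->common.updateCRC32.
 *
 * @param crc     The checksum so far
 * @param buffer  The data to add to the checksum
 * @param length  The number of bytes of data
 *
 * @return The updated checksum
 **/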
static CRC32Checksum kvdoUpdateCRC32(CRC32Checksum  crc,
                                     const byte    *buffer,
                                     size_t         length)
{
  /*
   * The kernel's CRC 32 implementation does not do pre- and post-
   * conditioning, so do it ourselves.
   */
  return crc32(crc ^ 0xffffffff, buffer, length) ^ 0xffffffff;
}

/**********************************************************************/
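/**
 * Get the number of physical blocks provided by the layer's storage device;
 * installed as layer->common.getBlockCount.
 *
 * @param header  The physical layer (a KernelLayer)
 *
 * @return The physical block count from the device config
 **/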
static BlockCount kvdoGetBlockCount(PhysicalLayer *header)
{
  return asKernelLayer(header)->deviceConfig->physicalBlocks;
}

/**********************************************************************/
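/**
 * Implements LayerFilter; matches a layer whose device-mapper device name
 * equals the string supplied as the context.
 *
 * @param layer    The layer to check
 * @param context  The name to match, as a null-terminated string
 *
 * @return true if the names match
 **/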
bool layerIsNamed(KernelLayer *layer, void *context)
{
  struct dm_target *ti = layer->deviceConfig->owningTarget;
  const char *deviceName = dm_device_name(dm_table_get_md(ti->table));
  return (strcmp(deviceName, (const char *) context) == 0);
}

/**
 * Implements LayerFilter.
 **/
static bool layerUsesDevice(KernelLayer *layer, void *context)
{
  DeviceConfig *config = context;
  return (layer->deviceConfig->ownedDevice->bdev->bd_dev
          == config->ownedDevice->bdev->bd_dev);
}

/**********************************************************************/
int mapToSystemError(int error)
{
  // 0 is success; a negative value is already a system error code.
  if (likely(error <= 0)) {
    return error;
  }
  if (error < 1024) {
    // errno macro used without negating - may be a minor bug
    return -error;
  }
  // VDO or UDS error
  char errorName[80], errorMessage[ERRBUF_SIZE];
  switch (sansUnrecoverable(error)) {
  case VDO_NO_SPACE:
    return -ENOSPC;
  case VDO_READ_ONLY:
    return -EIO;
  default:
    logInfo("%s: mapping internal status code %d (%s: %s) to EIO",
            __func__, error,
            stringErrorName(error, errorName, sizeof(errorName)),
            stringError(error, errorMessage, sizeof(errorMessage)));
    return -EIO;
  }
}

/**********************************************************************/
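/**
 * Set the atomically-stored state of the kernel layer.
 *
 * @param layer     The kernel layer
 * @param newState  The state to store
 **/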
static void setKernelLayerState(KernelLayer *layer, KernelLayerState newState)
{
  atomicStore32(&layer->state, newState);
}

/**********************************************************************/
void waitForNoRequestsActive(KernelLayer *layer)
{
  // Do nothing if there are no requests active.  This check is not necessary
  // for correctness but does reduce log message traffic.
  if (limiterIsIdle(&layer->requestLimiter)) {
    return;
  }

  // We have to make sure to flush the packer before waiting. We do this by
  // turning off compression, which also ensures that no new entries arriving
  // while we wait will end up in the packer.
  bool wasCompressing = setKVDOCompressing(&layer->kvdo, false);
  // Now wait for there to be no active requests
  limiterWaitForIdle(&layer->requestLimiter);
  // Reset the compression state after all requests are done
  if (wasCompressing) {
    setKVDOCompressing(&layer->kvdo, true);
  }
}

/**
 * Start processing a new data KVIO based on the supplied bio, but from within
 * a VDO thread context, when we're not allowed to block. Using this path at
 * all suggests a bug or erroneous usage, but we special-case it to avoid a
 * deadlock that can apparently result. A message is logged to alert the
 * administrator that something has gone wrong, while we attempt to continue
 * processing other requests.
 *
 * If a request permit can be acquired immediately, kvdoLaunchDataKVIOFromBio
 * will be called. (If the bio is a discard operation, a permit from the
 * discard limiter will be requested but the call will be made with or without
 * it.) If the request permit is not available, the bio will be saved on a list
 * to be launched later. Either way, this function will not block, and will
 * take responsibility for processing the bio.
 *
 * @param layer        The kernel layer
 * @param bio          The bio to launch
 * @param arrivalTime  The arrival time of the bio
 *
 * @return  DM_MAPIO_SUBMITTED or a system error code
 **/
static int launchDataKVIOFromVDOThread(KernelLayer *layer,
                                       BIO         *bio,
                                       Jiffies      arrivalTime)
{
  logWarning("kvdoMapBio called from within a VDO thread!");
  /*
   * We're not yet entirely sure what circumstances are causing this situation
   * in [ESC-638], but it does appear to be happening and causing VDO to
   * deadlock.
   *
   * Somehow kvdoMapBio is being called from generic_make_request which is
   * being called from the VDO code to pass a flush on down to the underlying
   * storage system; we've got 2000 requests in progress, so we have to wait
   * for one to complete, but none can complete while the bio thread is blocked
   * from passing more I/O requests down. Near as we can tell, the flush bio
   * should always have gotten updated to point to the storage system, so we
   * shouldn't be calling back into VDO unless something's gotten messed up
   * somewhere.
   *
   * To side-step this case, if the limiter says we're busy *and* we're running
   * on one of VDO's own threads, we'll drop the I/O request in a special queue
   * for processing as soon as KVIOs become free.
   *
   * We don't want to do this in general because it leads to unbounded
   * buffering, arbitrarily high latencies, inability to push back in a way the
   * caller can take advantage of, etc. If someone wants huge amounts of
   * buffering on top of VDO, they're welcome to access it through the kernel
   * page cache or roll their own.
   */
  if (!limiterPoll(&layer->requestLimiter)) {
    addToDeadlockQueue(&layer->deadlockQueue, bio, arrivalTime);
    logWarning("queued an I/O request to avoid deadlock!");

    return DM_MAPIO_SUBMITTED;
  }

  bool hasDiscardPermit
    = (isDiscardBio(bio) && limiterPoll(&layer->discardLimiter));
  int result = kvdoLaunchDataKVIOFromBio(layer, bio, arrivalTime,
                                         hasDiscardPermit);
  // Succeed or fail, kvdoLaunchDataKVIOFromBio owns the permit(s) now.
  if (result != VDO_SUCCESS) {
    return result;
  }

  return DM_MAPIO_SUBMITTED;
}

/**********************************************************************/
int kvdoMapBio(KernelLayer *layer, BIO *bio)
{
  Jiffies          arrivalTime = jiffies;
  KernelLayerState state       = getKernelLayerState(layer);
  ASSERT_LOG_ONLY(state == LAYER_RUNNING,
                  "kvdoMapBio should not be called while in state %d", state);

  // Count all incoming bios.
  countBios(&layer->biosIn, bio);

  // Handle empty bios.  Empty flush bios are not associated with a VIO.
  if (isFlushBio(bio)) {
    if (ASSERT(getBioSize(bio) == 0, "Flush bio is size 0") != VDO_SUCCESS) {
      // We expect flushes to be of size 0.
      return -EINVAL;
    }
    if (shouldProcessFlush(layer)) {
      launchKVDOFlush(layer, bio);
      return DM_MAPIO_SUBMITTED;
    } else {
      // We're not acknowledging this bio now, but we'll never touch it
      // again, so this is the last chance to account for it.
      countBios(&layer->biosAcknowledged, bio);
      atomic64_inc(&layer->flushOut);
      setBioBlockDevice(bio, getKernelLayerBdev(layer));
      return DM_MAPIO_REMAPPED;
    }
  }

  if (ASSERT(getBioSize(bio) != 0, "Data bio is not size 0") != VDO_SUCCESS) {
    // We expect non-flushes to be non-zero in size.
    return -EINVAL;
  }

  if (isDiscardBio(bio) && isReadBio(bio)) {
    // Read and Discard should never occur together
    return -EIO;
  }

  KvdoWorkQueue *currentWorkQueue = getCurrentWorkQueue();
  if ((currentWorkQueue != NULL)
      && (layer == getWorkQueueOwner(currentWorkQueue))) {
    /*
     * This prohibits sleeping during I/O submission to VDO from its own
     * thread.
     */
    return launchDataKVIOFromVDOThread(layer, bio, arrivalTime);
  }
  bool hasDiscardPermit = false;
  if (isDiscardBio(bio)) {
    limiterWaitForOneFree(&layer->discardLimiter);
    hasDiscardPermit = true;
  }
  limiterWaitForOneFree(&layer->requestLimiter);

  int result = kvdoLaunchDataKVIOFromBio(layer, bio, arrivalTime,
                                         hasDiscardPermit);
  // Succeed or fail, kvdoLaunchDataKVIOFromBio owns the permit(s) now.
  if (result != VDO_SUCCESS) {
    return result;
  }

  return DM_MAPIO_SUBMITTED;
}

/**********************************************************************/
struct block_device *getKernelLayerBdev(const KernelLayer *layer)
{
  return layer->deviceConfig->ownedDevice->bdev;
}

/**********************************************************************/
void completeManyRequests(KernelLayer *layer, uint32_t count)
{
  // If we had to buffer some requests to avoid deadlock, release them now.
  while (count > 0) {
    Jiffies arrivalTime = 0;
    BIO *bio = pollDeadlockQueue(&layer->deadlockQueue, &arrivalTime);
    if (likely(bio == NULL)) {
      break;
    }

    bool hasDiscardPermit
      = (isDiscardBio(bio) && limiterPoll(&layer->discardLimiter));
    int result = kvdoLaunchDataKVIOFromBio(layer, bio, arrivalTime,
                                           hasDiscardPermit);
    if (result != VDO_SUCCESS) {
      completeBio(bio, result);
    }
    // Succeed or fail, kvdoLaunchDataKVIOFromBio owns the permit(s) now.
    count--;
  }
  // Notify the limiter, so it can wake any blocked processes.
  if (count > 0) {
    limiterReleaseMany(&layer->requestLimiter, count);
  }
}

/**********************************************************************/
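/**
 * Clear the queued flag and log the number of events added since the last
 * report, if any.
 *
 * @param reporter  The periodic event reporter
 **/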
static void reportEvents(PeriodicEventReporter *reporter)
{
  atomic_set(&reporter->workItemQueued, 0);
  uint64_t newValue = atomic64_read(&reporter->value);
  uint64_t difference = newValue - reporter->lastReportedValue;
  if (difference != 0) {
    logDebug(reporter->format, difference);
    reporter->lastReportedValue = newValue;
  }
}

/**********************************************************************/
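/**
 * Process an event-reporting work item from the CPU queue.
 *
 * @param item  The work item embedded in a PeriodicEventReporter
 **/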
static void reportEventsWork(KvdoWorkItem *item)
{
  PeriodicEventReporter *reporter = container_of(item, PeriodicEventReporter,
                                                 workItem);
  reportEvents(reporter);
}

/**********************************************************************/
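/**
 * Initialize a periodic event reporter.
 *
 * @param reporter           The reporter to initialize
 * @param format             The log message format, which takes the event
 *                           count as its only argument
 * @param reportingInterval  How long, in milliseconds, to accumulate events
 *                           before reporting them
 * @param layer              The kernel layer whose CPU queue runs the reports
 **/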
static void initPeriodicEventReporter(PeriodicEventReporter *reporter,
                                      const char            *format,
                                      unsigned long          reportingInterval,
                                      KernelLayer           *layer)
{
  setupWorkItem(&reporter->workItem, reportEventsWork, NULL,
                CPU_Q_ACTION_EVENT_REPORTER);
  reporter->format            = format;
  reporter->reportingInterval = msecs_to_jiffies(reportingInterval);
  reporter->layer             = layer;
}

/**********************************************************************/
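/**
 * Add to an event count, and schedule a delayed report on the layer's CPU
 * queue if one is not already pending.
 *
 * @param reporter  The periodic event reporter
 * @param count     The number of new events to add
 **/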
static void addEventCount(PeriodicEventReporter *reporter, unsigned int count)
{
  if (count > 0) {
    atomic64_add(count, &reporter->value);
    int oldWorkItemQueued = atomic_xchg(&reporter->workItemQueued, 1);
    if (oldWorkItemQueued == 0) {
      enqueueWorkQueueDelayed(reporter->layer->cpuQueue,
                              &reporter->workItem,
                              jiffies + reporter->reportingInterval);
    }
  }
}

/**********************************************************************/
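/**
 * Stop a periodic event reporter, logging any not-yet-reported events.
 *
 * @param reporter  The reporter to stop
 **/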
static void stopPeriodicEventReporter(PeriodicEventReporter *reporter)
{
  reportEvents(reporter);
}

/**********************************************************************/
void kvdoReportDedupeTimeout(KernelLayer *layer, unsigned int expiredCount)
{
  addEventCount(&layer->albireoTimeoutReporter, expiredCount);
}

/**********************************************************************/
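/**
 * Allocate a KvdoEnqueueable to wrap the given completion so that it can be
 * placed on a work queue; installed as layer->common.createEnqueueable.
 *
 * @param completion  The completion to wrap
 *
 * @return VDO_SUCCESS or an error code
 **/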
static int kvdoCreateEnqueueable(VDOCompletion *completion)
{
  KvdoEnqueueable *kvdoEnqueueable;
  int result = ALLOCATE(1, KvdoEnqueueable, "kvdoEnqueueable",
                        &kvdoEnqueueable);
  if (result != VDO_SUCCESS) {
    logError("kvdoEnqueueable allocation failure %d", result);
    return result;
  }
  kvdoEnqueueable->enqueueable.completion = completion;
  completion->enqueueable                 = &kvdoEnqueueable->enqueueable;
  return VDO_SUCCESS;
}

/**********************************************************************/
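/**
 * Free the KvdoEnqueueable wrapping an Enqueueable and null out the pointer
 * to it; installed as layer->common.destroyEnqueueable.
 *
 * @param enqueueablePtr  A pointer to the Enqueueable to be destroyed
 **/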
static void kvdoDestroyEnqueueable(Enqueueable **enqueueablePtr)
{
  Enqueueable *enqueueable = *enqueueablePtr;
  if (enqueueable != NULL) {
    KvdoEnqueueable *kvdoEnqueueable
      = container_of(enqueueable, KvdoEnqueueable, enqueueable);
    FREE(kvdoEnqueueable);
    *enqueueablePtr = NULL;
  }
}

/**
 * Implements BufferAllocator.
 **/
static int kvdoAllocateIOBuffer(PhysicalLayer  *layer __attribute__((unused)),
                                size_t          bytes,
                                const char     *why,
                                char          **bufferPtr)
{
  return ALLOCATE(bytes, char, why, bufferPtr);
}

/**
 * Implements ExtentReader. Exists only for the geometry block; is unset after
 * it is read.
 **/
static int kvdoSynchronousRead(PhysicalLayer       *layer,
                               PhysicalBlockNumber  startBlock,
                               size_t               blockCount,
                               char                *buffer,
                               size_t              *blocksRead)
{
  if (blockCount != 1) {
    return VDO_NOT_IMPLEMENTED;
  }

  KernelLayer *kernelLayer = asKernelLayer(layer);

  BIO *bio;
  int result = createBio(kernelLayer, buffer, &bio);
  if (result != VDO_SUCCESS) {
    return result;
  }
  setBioBlockDevice(bio, getKernelLayerBdev(kernelLayer));
  setBioSector(bio, blockToSector(kernelLayer, startBlock));
  setBioOperationRead(bio);
  result = submitBioAndWait(bio);
  if (result != 0) {
    logErrorWithStringError(result, "synchronous read failed");
    result = -EIO;
  }
  freeBio(bio, kernelLayer);

  if (result != VDO_SUCCESS) {
    return result;
  }
  if (blocksRead != NULL) {
    *blocksRead = blockCount;
  }
  return VDO_SUCCESS;
}

/**
 * Implements VIODestructor.
 **/
static void kvdoFreeVIO(VIO **vioPtr)
{
  VIO *vio = *vioPtr;
  if (vio == NULL) {
    return;
  }

  BUG_ON(isDataVIO(vio));

  if (isCompressedWriteVIO(vio)) {
    CompressedWriteKVIO *compressedWriteKVIO
      = allocatingVIOAsCompressedWriteKVIO(vioAsAllocatingVIO(vio));
    freeCompressedWriteKVIO(&compressedWriteKVIO);
  } else {
    MetadataKVIO *metadataKVIO = vioAsMetadataKVIO(vio);
    freeMetadataKVIO(&metadataKVIO);
  }

  *vioPtr = NULL;
}

/**********************************************************************/
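/**
 * Get the current write policy from the base KVDO; installed as
 * layer->common.getWritePolicy.
 *
 * @param common  The kernel layer as a PhysicalLayer
 *
 * @return The current write policy
 **/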
static WritePolicy kvdoGetWritePolicy(PhysicalLayer *common)
{
  KernelLayer *layer = asKernelLayer(common);
  return getKVDOWritePolicy(&layer->kvdo);
}

/**
 * Function that is called when a synchronous operation is completed. We let
 * the waiting thread know it can continue.
 *
 * <p>Implements OperationComplete.
 *
 * @param common  The kernel layer
 **/
static void kvdoCompleteSyncOperation(PhysicalLayer *common)
{
  KernelLayer *layer = asKernelLayer(common);
  complete(&layer->callbackSync);
}

/**
 * Wait for a synchronous operation to complete.
 *
 * <p>Implements OperationWaiter.
 *
 * @param common  The kernel layer
 **/
static void waitForSyncOperation(PhysicalLayer *common)
{
  KernelLayer *layer = asKernelLayer(common);
  // Using the "interruptible" interface means that Linux will not log a
  // message when we wait for more than 120 seconds.
  while (wait_for_completion_interruptible(&layer->callbackSync) != 0) {
    // However, if we get a signal in a user-mode process, we could
    // spin...
    msleep(1);
  }
}

/**
 * Make the bio set for allocating new bios.
 *
 * @param layer  The kernel layer
 *
 * @return VDO_SUCCESS if the bio set was created, or an error code otherwise
 **/
static int makeDedupeBioSet(KernelLayer *layer)
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,18,0)
  int result = ALLOCATE(1, struct bio_set, "bio set", &layer->bioset);
  if (result != VDO_SUCCESS) {
    return result;
  }

  result = bioset_init(layer->bioset, 0, 0, BIOSET_NEED_BVECS);
  if (result != 0) {
    return result;
  }
#else
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,13,0)
  layer->bioset = bioset_create(0, 0, BIOSET_NEED_BVECS);
#else
  layer->bioset = bioset_create(0, 0);
#endif
  if (layer->bioset == NULL) {
    return -ENOMEM;
  }
#endif

  return VDO_SUCCESS;
}

/**********************************************************************/
int makeKernelLayer(uint64_t        startingSector,
                    unsigned int    instance,
                    DeviceConfig   *config,
                    struct kobject *parentKobject,
                    ThreadConfig  **threadConfigPointer,
                    char          **reason,
                    KernelLayer   **layerPtr)
{
  // VDO-3769 - Set a generic reason so we don't ever return garbage.
  *reason = "Unspecified error";

  KernelLayer *oldLayer = findLayerMatching(layerUsesDevice, config);
  if (oldLayer != NULL) {
    logError("Existing layer named %s already uses device %s",
             oldLayer->deviceConfig->poolName,
             oldLayer->deviceConfig->parentDeviceName);
    *reason = "Cannot share storage device with already-running VDO";
    return VDO_BAD_CONFIGURATION;
  }

  /*
   * Part 1 - Allocate the kernel layer, its essential parts, and set up the
   * sysfs node.  These must come first so that the sysfs node works correctly
   * through the freeing of the kernel layer.  After this part you must use
   * freeKernelLayer.
   */
  KernelLayer *layer;
  int result = ALLOCATE(1, KernelLayer, "VDO configuration", &layer);
  if (result != UDS_SUCCESS) {
    *reason = "Cannot allocate VDO configuration";
    return result;
  }

  // Allow the base VDO to allocate buffers and construct or destroy
  // enqueueables as part of its allocation.
  layer->common.allocateIOBuffer   = kvdoAllocateIOBuffer;
  layer->common.createEnqueueable  = kvdoCreateEnqueueable;
  layer->common.destroyEnqueueable = kvdoDestroyEnqueueable;

  result = allocateVDO(&layer->common, &layer->kvdo.vdo);
  if (result != VDO_SUCCESS) {
    *reason = "Cannot allocate VDO";
    FREE(layer);
    return result;
  }

  // After this point, calling kobject_put on kobj will decrement its
  // reference count, and when the count goes to 0 the KernelLayer will
  // be freed.
  kobject_init(&layer->kobj, &kernelLayerKobjType);
  result = kobject_add(&layer->kobj, parentKobject, config->poolName);
  if (result != 0) {
    *reason = "Cannot add sysfs node";
    kobject_put(&layer->kobj);
    return result;
  }
  kobject_init(&layer->wqDirectory, &workQueueDirectoryKobjType);
  result = kobject_add(&layer->wqDirectory, &layer->kobj, "work_queues");
  if (result != 0) {
    *reason = "Cannot add sysfs node";
    kobject_put(&layer->wqDirectory);
    kobject_put(&layer->kobj);
    return result;
  }

  /*
   * Part 2 - Do all the simple initialization.  These initializations have no
   * order dependencies and can be done in any order, but freeKernelLayer()
   * cannot be called until all the simple layer properties are set.
   *
   * The KernelLayer structure starts as all zeros.  Pointer initializations
   * consist of replacing a NULL pointer with a non-NULL pointer, which can be
   * easily undone by freeing all of the non-NULL pointers (using the proper
   * free routine).
   */
  setKernelLayerState(layer, LAYER_SIMPLE_THINGS_INITIALIZED);

  initializeDeadlockQueue(&layer->deadlockQueue);

  int requestLimit = defaultMaxRequestsActive;
  initializeLimiter(&layer->requestLimiter, requestLimit);
  initializeLimiter(&layer->discardLimiter, requestLimit * 3 / 4);

  layer->allocationsAllowed   = true;
  layer->instance             = instance;
  layer->deviceConfig         = config;
  layer->startingSectorOffset = startingSector;
  initializeRing(&layer->deviceConfigRing);

  layer->common.updateCRC32              = kvdoUpdateCRC32;
  layer->common.getBlockCount            = kvdoGetBlockCount;
  layer->common.getWritePolicy           = kvdoGetWritePolicy;
  layer->common.createMetadataVIO        = kvdoCreateMetadataVIO;
  layer->common.createCompressedWriteVIO = kvdoCreateCompressedWriteVIO;
  layer->common.freeVIO                  = kvdoFreeVIO;
  layer->common.completeFlush            = kvdoCompleteFlush;
  layer->common.enqueue                  = kvdoEnqueue;
  layer->common.waitForAdminOperation    = waitForSyncOperation;
  layer->common.completeAdminOperation   = kvdoCompleteSyncOperation;
  layer->common.getCurrentThreadID       = kvdoGetCurrentThreadID;
  layer->common.zeroDataVIO              = kvdoZeroDataVIO;
  layer->common.compareDataVIOs          = kvdoCompareDataVIOs;
  layer->common.copyData                 = kvdoCopyDataVIO;
  layer->common.readData                 = kvdoReadDataVIO;
  layer->common.writeData                = kvdoWriteDataVIO;
  layer->common.writeCompressedBlock     = kvdoWriteCompressedBlock;
  layer->common.readMetadata             = kvdoSubmitMetadataVIO;
  layer->common.writeMetadata            = kvdoSubmitMetadataVIO;
  layer->common.applyPartialWrite        = kvdoModifyWriteDataVIO;
  layer->common.flush                    = kvdoFlushVIO;
  layer->common.hashData                 = kvdoHashDataVIO;
  layer->common.checkForDuplication      = kvdoCheckForDuplication;
  layer->common.verifyDuplication        = kvdoVerifyDuplication;
  layer->common.acknowledgeDataVIO       = kvdoAcknowledgeDataVIO;
  layer->common.compressDataVIO          = kvdoCompressDataVIO;
  layer->common.updateAlbireo            = kvdoUpdateDedupeAdvice;

  spin_lock_init(&layer->flushLock);
  mutex_init(&layer->statsMutex);
  bio_list_init(&layer->waitingFlushes);

  result = addLayerToDeviceRegistry(layer);
  if (result != VDO_SUCCESS) {
    *reason = "Cannot add layer to device registry";
    freeKernelLayer(layer);
    return result;
  }

  snprintf(layer->threadNamePrefix, sizeof(layer->threadNamePrefix), "%s%u",
           THIS_MODULE->name, instance);

  result = makeThreadConfig(config->threadCounts.logicalZones,
                            config->threadCounts.physicalZones,
                            config->threadCounts.hashZones,
                            threadConfigPointer);
  if (result != VDO_SUCCESS) {
    *reason = "Cannot create thread configuration";
    freeKernelLayer(layer);
    return result;
  }

  logInfo("zones: %d logical, %d physical, %d hash; base threads: %d",
          config->threadCounts.logicalZones,
          config->threadCounts.physicalZones,
          config->threadCounts.hashZones,
          (*threadConfigPointer)->baseThreadCount);

  result = makeBatchProcessor(layer, returnDataKVIOBatchToPool, layer,
                              &layer->dataKVIOReleaser);
  if (result != UDS_SUCCESS) {
    *reason = "Cannot allocate KVIO-freeing batch processor";
    freeKernelLayer(layer);
    return result;
  }

  // Spare KVDOFlush, so that we will always have at least one available
  result = makeKVDOFlush(&layer->spareKVDOFlush);
  if (result != UDS_SUCCESS) {
    *reason = "Cannot allocate KVDOFlush record";
    freeKernelLayer(layer);
    return result;
  }

  // BIO pool (needed before the geometry block)
  result = makeDedupeBioSet(layer);
  if (result != VDO_SUCCESS) {
    *reason = "Cannot allocate dedupe bioset";
    freeKernelLayer(layer);
    return result;
  }

  // Read the geometry block so we know how to set up the index. Allow it to
  // do synchronous reads.
  layer->common.reader = kvdoSynchronousRead;
  result = loadVolumeGeometry(&layer->common, &layer->geometry);
  layer->common.reader = NULL;
  if (result != VDO_SUCCESS) {
    *reason = "Could not load geometry block";
    freeKernelLayer(layer);
    return result;
  }

  // Albireo Timeout Reporter
  initPeriodicEventReporter(&layer->albireoTimeoutReporter,
                            "Albireo timeout on %llu requests",
                            DEDUPE_TIMEOUT_REPORT_INTERVAL, layer);

  // Dedupe Index
  BUG_ON(layer->threadNamePrefix[0] == '\0');
  result = makeDedupeIndex(&layer->dedupeIndex, layer);
  if (result != UDS_SUCCESS) {
    *reason = "Cannot initialize dedupe index";
    freeKernelLayer(layer);
    return result;
  }

  // Compression context storage
  result = ALLOCATE(config->threadCounts.cpuThreads, char *, "LZ4 context",
                    &layer->compressionContext);
  if (result != VDO_SUCCESS) {
    *reason = "cannot allocate LZ4 context";
    freeKernelLayer(layer);
    return result;
  }
  for (int i = 0; i < config->threadCounts.cpuThreads; i++) {
    result = ALLOCATE(LZ4_context_size(), char, "LZ4 context",
                      &layer->compressionContext[i]);
    if (result != VDO_SUCCESS) {
      *reason = "cannot allocate LZ4 context";
      freeKernelLayer(layer);
      return result;
    }
  }


  /*
   * Part 3 - Do initializations that depend upon other previous
   * initializations, but have no order dependencies at freeing time.
   * Order dependencies for initialization are identified using BUG_ON.
   */
  setKernelLayerState(layer, LAYER_BUFFER_POOLS_INITIALIZED);

  // Trace pool
  BUG_ON(layer->requestLimiter.limit <= 0);
  result = traceKernelLayerInit(layer);
  if (result != VDO_SUCCESS) {
    *reason = "Cannot initialize trace data";
    freeKernelLayer(layer);
    return result;
  }

  // KVIO and VIO pool
  BUG_ON(layer->deviceConfig->logicalBlockSize <= 0);
  BUG_ON(layer->requestLimiter.limit <= 0);
  BUG_ON(layer->bioset == NULL);
  BUG_ON(layer->deviceConfig->ownedDevice == NULL);
  result = makeDataKVIOBufferPool(layer, layer->requestLimiter.limit,
                                  &layer->dataKVIOPool);
  if (result != VDO_SUCCESS) {
    *reason = "Cannot allocate vio data";
    freeKernelLayer(layer);
    return result;
  }

  /*
   * Part 4 - Do initializations that depend upon other previous
   * initialization, that may have order dependencies at freeing time.
   * These are mostly starting up the workqueue threads.
   */

  // Base-code thread, etc
  result = initializeKVDO(&layer->kvdo, *threadConfigPointer, reason);
  if (result != VDO_SUCCESS) {
    freeKernelLayer(layer);
    return result;
  }

  setKernelLayerState(layer, LAYER_REQUEST_QUEUE_INITIALIZED);

  // Bio queue
  result = makeIOSubmitter(layer->threadNamePrefix,
                           config->threadCounts.bioThreads,
                           config->threadCounts.bioRotationInterval,
                           layer->requestLimiter.limit,
                           layer,
                           &layer->ioSubmitter);
  if (result != VDO_SUCCESS) {
    // If initialization of the bio-queues failed, they are cleaned
    // up already, so just free the rest of the kernel layer.
    freeKernelLayer(layer);
    *reason = "bio submission initialization failed";
    return result;
  }
  setKernelLayerState(layer, LAYER_BIO_DATA_INITIALIZED);

  // Bio ack queue
  if (useBioAckQueue(layer)) {
    result = makeWorkQueue(layer->threadNamePrefix, "ackQ",
                           &layer->wqDirectory, layer, layer, &bioAckQType,
                           config->threadCounts.bioAckThreads,
                           &layer->bioAckQueue);
    if (result != VDO_SUCCESS) {
      *reason = "bio ack queue initialization failed";
      freeKernelLayer(layer);
      return result;
    }
  }

  setKernelLayerState(layer, LAYER_BIO_ACK_QUEUE_INITIALIZED);

  // CPU Queues
  result = makeWorkQueue(layer->threadNamePrefix, "cpuQ", &layer->wqDirectory,
                         layer, NULL, &cpuQType,
                         config->threadCounts.cpuThreads, &layer->cpuQueue);
  if (result != VDO_SUCCESS) {
    *reason = "Albireo CPU queue initialization failed";
    freeKernelLayer(layer);
    return result;
  }

  setKernelLayerState(layer, LAYER_CPU_QUEUE_INITIALIZED);

  *layerPtr = layer;
  return VDO_SUCCESS;
}

/**********************************************************************/
int prepareToModifyKernelLayer(KernelLayer       *layer,
                               DeviceConfig      *config,
                               char             **errorPtr)
{
  DeviceConfig *extantConfig = layer->deviceConfig;
  if (config->owningTarget->begin != extantConfig->owningTarget->begin) {
    *errorPtr = "Starting sector cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  if (strcmp(config->parentDeviceName, extantConfig->parentDeviceName) != 0) {
    *errorPtr = "Underlying device cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  if (config->logicalBlockSize != extantConfig->logicalBlockSize) {
    *errorPtr = "Logical block size cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  if (config->cacheSize != extantConfig->cacheSize) {
    *errorPtr = "Block map cache size cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  if (config->blockMapMaximumAge != extantConfig->blockMapMaximumAge) {
    *errorPtr = "Block map maximum age cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  if (config->mdRaid5ModeEnabled != extantConfig->mdRaid5ModeEnabled) {
    *errorPtr = "mdRaid5Mode cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  if (memcmp(&config->threadCounts, &extantConfig->threadCounts,
             sizeof(ThreadCountConfig)) != 0) {
    *errorPtr = "Thread configuration cannot change";
    return VDO_PARAMETER_MISMATCH;
  }

  // Below here are the actions to take when a non-immutable property changes.

  if (config->writePolicy != extantConfig->writePolicy) {
    // Nothing needs doing right now for a write policy change.
  }

  if (config->owningTarget->len != extantConfig->owningTarget->len) {
    size_t logicalBytes = to_bytes(config->owningTarget->len);
    if ((logicalBytes % VDO_BLOCK_SIZE) != 0) {
      *errorPtr = "Logical size must be a multiple of 4096";
      return VDO_PARAMETER_MISMATCH;
    }

    int result = prepareToResizeLogical(layer, logicalBytes / VDO_BLOCK_SIZE);
    if (result != VDO_SUCCESS) {
      *errorPtr = "Device prepareToGrowLogical failed";
      return result;
    }
  }

  if (config->physicalBlocks != extantConfig->physicalBlocks) {
    int result = prepareToResizePhysical(layer, config->physicalBlocks);
    if (result != VDO_SUCCESS) {
      if (result == VDO_TOO_MANY_SLABS) {
        *errorPtr = "Device prepareToGrowPhysical failed (specified physical"
                    " size too big based on formatted slab size)";
      } else {
        *errorPtr = "Device prepareToGrowPhysical failed";
      }
      return result;
    }
  }

  return VDO_SUCCESS;
}

/**
 * Modify the pool name of the device.
 *
 * @param layer    The kernel layer
 * @param oldName  The old pool name
 * @param newName  The new pool name
 *
 * @return VDO_SUCCESS or an error
 **/
int modifyPoolName(KernelLayer *layer, char *oldName, char *newName)
{
  // We use the pool name for sysfs and procfs. Rename them accordingly.
  logInfo("Modify pool name from %s to %s", oldName, newName);

  void *procfsPrivate;
  int result = vdoCreateProcfsEntry(layer, newName, &procfsPrivate);
  if (result != VDO_SUCCESS) {
    return result;
  }

  result = kobject_rename(&layer->kobj, newName);
  if (result != 0) {
    vdoDestroyProcfsEntry(newName, procfsPrivate);
    return result;
  }

  void *tmpProcfs = layer->procfsPrivate;
  layer->procfsPrivate = procfsPrivate;

  vdoDestroyProcfsEntry(oldName, tmpProcfs);

  return VDO_SUCCESS;
}

/**********************************************************************/
int modifyKernelLayer(KernelLayer  *layer,
                      DeviceConfig *config)
{
  KernelLayerState state = getKernelLayerState(layer);
  if (state == LAYER_RUNNING) {
    return VDO_SUCCESS;
  } else if (state != LAYER_SUSPENDED) {
    logError("pre-resume invoked while in unexpected kernel layer state %d",
             state);
    return -EINVAL;
  }

  setKernelLayerState(layer, LAYER_RESUMING);

  DeviceConfig *extantConfig = layer->deviceConfig;

  // A failure here is unrecoverable, so there is no point in trying to undo
  // any changes already made.

  if (config->writePolicy != extantConfig->writePolicy) {
    /*
     * Ordinarily, when going from async to sync, we must flush any metadata
     * written. However, because the underlying storage must have gone into
     * sync mode before we suspend VDO, and suspending VDO concludes by
     * issuing a flush, all metadata written before the suspend is flushed
     * by the suspend and all metadata between the suspend and the write
     * policy change is written to synchronous storage.
     */
    logInfo("Modifying device '%s' write policy from %s to %s",
            config->poolName, getConfigWritePolicyString(extantConfig),
            getConfigWritePolicyString(config));
    setWritePolicy(layer->kvdo.vdo, config->writePolicy);
  }

  if (config->owningTarget->len != extantConfig->owningTarget->len) {
    size_t logicalBytes = to_bytes(config->owningTarget->len);
    int result = resizeLogical(layer, logicalBytes / VDO_BLOCK_SIZE);
    if (result != VDO_SUCCESS) {
      return result;
    }
  }

  // Grow physical if the size changed, or if the config version is 0, since
  // with version 0 we can't tell whether we got an old-style growPhysical
  // command or the size actually changed.
  if ((config->physicalBlocks != extantConfig->physicalBlocks)
      || (config->version == 0)) {
    int result = resizePhysical(layer, config->physicalBlocks);
    if (result != VDO_SUCCESS) {
      return result;
    }
  }

  if (strcmp(config->poolName, extantConfig->poolName) != 0) {
    logInfo("Modifying device '%s' pool name from %s to %s",
            config->poolName, extantConfig->poolName, config->poolName);
    int result = modifyPoolName(layer, extantConfig->poolName,
                                config->poolName);
    if (result != VDO_SUCCESS) {
      return result;
    }
  }

  return VDO_SUCCESS;
}

/**********************************************************************/
void freeKernelLayer(KernelLayer *layer)
{
  // This is not the cleanest implementation, but given the current timing
  // uncertainties in the shutdown process for work queues, we need to
  // store information to enable a late-in-process deallocation of
  // funnel-queue data structures in work queues.
  bool usedBioAckQueue = false;
  bool usedCpuQueue    = false;
  bool usedKVDO        = false;
  bool releaseInstance = false;

  KernelLayerState state = getKernelLayerState(layer);
  switch (state) {
  case LAYER_STOPPING:
    logError("re-entered freeKernelLayer while stopping");
    break;

  case LAYER_RUNNING:
    suspendKernelLayer(layer);
    // fall through

  case LAYER_STARTING:
  case LAYER_RESUMING:
  case LAYER_SUSPENDED:
    stopKernelLayer(layer);
    // fall through

  case LAYER_STOPPED:
  case LAYER_CPU_QUEUE_INITIALIZED:
    finishWorkQueue(layer->cpuQueue);
    usedCpuQueue = true;
    releaseInstance = true;
    // fall through

  case LAYER_BIO_ACK_QUEUE_INITIALIZED:
    if (useBioAckQueue(layer)) {
      finishWorkQueue(layer->bioAckQueue);
      usedBioAckQueue = true;
    }
    // fall through

  case LAYER_BIO_DATA_INITIALIZED:
    cleanupIOSubmitter(layer->ioSubmitter);
    // fall through

  case LAYER_REQUEST_QUEUE_INITIALIZED:
    finishKVDO(&layer->kvdo);
    usedKVDO = true;
    // fall through

  case LAYER_BUFFER_POOLS_INITIALIZED:
    freeBufferPool(&layer->dataKVIOPool);
    freeBufferPool(&layer->traceBufferPool);
    // fall through

  case LAYER_SIMPLE_THINGS_INITIALIZED:
    if (layer->compressionContext != NULL) {
      for (int i = 0; i < layer->deviceConfig->threadCounts.cpuThreads; i++) {
        FREE(layer->compressionContext[i]);
      }
      FREE(layer->compressionContext);
    }
    if (layer->dedupeIndex != NULL) {
      finishDedupeIndex(layer->dedupeIndex);
    }
    FREE(layer->spareKVDOFlush);
    layer->spareKVDOFlush = NULL;
    freeBatchProcessor(&layer->dataKVIOReleaser);
    removeLayerFromDeviceRegistry(layer);
    break;

  default:
    logError("Unknown Kernel Layer state: %d", state);
  }

  // Late deallocation of resources in work queues.
  if (usedCpuQueue) {
    freeWorkQueue(&layer->cpuQueue);
  }
  if (usedBioAckQueue) {
    freeWorkQueue(&layer->bioAckQueue);
  }
  if (layer->ioSubmitter) {
    freeIOSubmitter(layer->ioSubmitter);
  }
  if (usedKVDO) {
    destroyKVDO(&layer->kvdo);
  }
  if (layer->bioset != NULL) {
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,18,0)
    bioset_exit(layer->bioset);
    FREE(layer->bioset);
#else
    bioset_free(layer->bioset);
#endif
    layer->bioset = NULL;
  }

  freeDedupeIndex(&layer->dedupeIndex);

  stopPeriodicEventReporter(&layer->albireoTimeoutReporter);
  if (releaseInstance) {
    releaseKVDOInstance(layer->instance);
  }

  // The call to kobject_put on the kobj sysfs node will decrement its
  // reference count; when the count goes to zero the VDO object and
  // the kernel layer object will be freed as a side effect.
  kobject_put(&layer->wqDirectory);
  kobject_put(&layer->kobj);
}

/**********************************************************************/
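/**
 * Release callback for the statistics directory kobject; signals
 * stopKernelLayer() that the sysfs statistics node has been fully removed.
 *
 * @param kobj  The kobject embedded in the kernel layer
 **/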
static void poolStatsRelease(struct kobject *kobj)
{
  KernelLayer *layer = container_of(kobj, KernelLayer, statsDirectory);
  complete(&layer->statsShutdown);
}

/**********************************************************************/
int preloadKernelLayer(KernelLayer          *layer,
                       const VDOLoadConfig  *loadConfig,
                       char                **reason)
{
  if (getKernelLayerState(layer) != LAYER_CPU_QUEUE_INITIALIZED) {
    *reason = "preloadKernelLayer() may only be invoked after initialization";
    return UDS_BAD_STATE;
  }

  setKernelLayerState(layer, LAYER_STARTING);
  int result = preloadKVDO(&layer->kvdo, &layer->common, loadConfig,
                         layer->vioTraceRecording, reason);
  if (result != VDO_SUCCESS) {
    stopKernelLayer(layer);
    return result;
  }

  return VDO_SUCCESS;
}

/**********************************************************************/
int startKernelLayer(KernelLayer *layer, char **reason)
{
  if (getKernelLayerState(layer) != LAYER_STARTING) {
    *reason = "Cannot start kernel from non-starting state";
    stopKernelLayer(layer);
    return UDS_BAD_STATE;
  }

  int result = startKVDO(&layer->kvdo, &layer->common, reason);
  if (result != VDO_SUCCESS) {
    stopKernelLayer(layer);
    return result;
  }

  setKernelLayerState(layer, LAYER_RUNNING);
  static struct kobj_type statsDirectoryKobjType = {
    .release       = poolStatsRelease,
    .sysfs_ops     = &poolStatsSysfsOps,
    .default_attrs = poolStatsAttrs,
  };
  kobject_init(&layer->statsDirectory, &statsDirectoryKobjType);
  result = kobject_add(&layer->statsDirectory, &layer->kobj, "statistics");
  if (result != 0) {
    *reason = "Cannot add sysfs statistics node";
    stopKernelLayer(layer);
    return result;
  }
  layer->statsAdded = true;

  if (layer->deviceConfig->deduplication) {
    // Don't try to load or rebuild the index first (and log scary error
    // messages) if this is known to be a newly-formatted volume.
    startDedupeIndex(layer->dedupeIndex, wasNew(layer->kvdo.vdo));
  }

  result = vdoCreateProcfsEntry(layer, layer->deviceConfig->poolName,
                                &layer->procfsPrivate);
  if (result != VDO_SUCCESS) {
    *reason = "Could not create proc filesystem entry";
    stopKernelLayer(layer);
    return result;
  }

  layer->allocationsAllowed = false;

  return VDO_SUCCESS;
}

/**********************************************************************/
void stopKernelLayer(KernelLayer *layer)
{
  layer->allocationsAllowed = true;

  // Stop services that need to gather VDO statistics from the worker threads.
  if (layer->statsAdded) {
    layer->statsAdded = false;
    init_completion(&layer->statsShutdown);
    kobject_put(&layer->statsDirectory);
    wait_for_completion(&layer->statsShutdown);
  }
  vdoDestroyProcfsEntry(layer->deviceConfig->poolName, layer->procfsPrivate);

  switch (getKernelLayerState(layer)) {
  case LAYER_RUNNING:
    suspendKernelLayer(layer);
    // fall through

  case LAYER_SUSPENDED:
    setKernelLayerState(layer, LAYER_STOPPING);
    stopDedupeIndex(layer->dedupeIndex);
    // fall through

  case LAYER_STOPPING:
  case LAYER_STOPPED:
  default:
    setKernelLayerState(layer, LAYER_STOPPED);
  }
}

/**********************************************************************/
int suspendKernelLayer(KernelLayer *layer)
{
  // It's important to note that any error here does not actually stop
  // device-mapper from suspending the device. All this work is done
  // post-suspend.
  KernelLayerState state = getKernelLayerState(layer);
  if (state == LAYER_SUSPENDED) {
    return VDO_SUCCESS;
  }
  if (state != LAYER_RUNNING) {
    logError("Suspend invoked while in unexpected kernel layer state %d",
             state);
    return -EINVAL;
  }

  /*
   * Attempt to flush all I/O before completing post suspend work. This is
   * needed so that changing write policy upon resume is safe. Also, we think
   * a suspended device is expected to have persisted all data written before
   * the suspend, even if it hasn't been flushed yet.
   */
  waitForNoRequestsActive(layer);
  int result = synchronousFlush(layer);
  if (result != VDO_SUCCESS) {
    setKVDOReadOnly(&layer->kvdo, result);
  }

  /*
   * Suspend the VDO, writing out all dirty metadata if the no-flush flag
   * was not set on the dmsetup suspend call. This will ensure that we don't
   * have cause to write while suspended [VDO-4402].
   */
  int suspendResult = suspendKVDO(&layer->kvdo);
  if (result == VDO_SUCCESS) {
    result = suspendResult;
  }

  suspendDedupeIndex(layer->dedupeIndex, !layer->noFlushSuspend);
  setKernelLayerState(layer, LAYER_SUSPENDED);
  return result;
}

/**********************************************************************/
int resumeKernelLayer(KernelLayer *layer)
{
  if (getKernelLayerState(layer) == LAYER_RUNNING) {
    return VDO_SUCCESS;
  }

  resumeDedupeIndex(layer->dedupeIndex);
  int result = resumeKVDO(&layer->kvdo);
  if (result != VDO_SUCCESS) {
    return result;
  }

  setKernelLayerState(layer, LAYER_RUNNING);
  return VDO_SUCCESS;
}

/***********************************************************************/
int prepareToResizePhysical(KernelLayer *layer, BlockCount physicalCount)
{
  logInfo("Preparing to resize physical to %llu", physicalCount);
  // Allocations are permitted on this non-VDO thread, since any IO to VDO
  // triggered by the allocation can still finish just fine.
  int result = kvdoPrepareToGrowPhysical(&layer->kvdo, physicalCount);
  if (result != VDO_SUCCESS) {
    // kvdoPrepareToGrowPhysical logs errors.
    if (result == VDO_PARAMETER_MISMATCH) {
      // If we don't trap this case, mapToSystemError() will remap it to -EIO,
      // which is misleading and ahistorical.
      return -EINVAL;
    } else {
      return result;
    }
  }

  logInfo("Done preparing to resize physical");
  return VDO_SUCCESS;
}

/***********************************************************************/
int resizePhysical(KernelLayer *layer, BlockCount physicalCount)
{
  // We must not mark the layer as allowing allocations when it is suspended
  // lest an allocation attempt block on writing IO to the suspended VDO.
  int result = kvdoResizePhysical(&layer->kvdo, physicalCount);
  if (result != VDO_SUCCESS) {
    // kvdoResizePhysical logs errors
    return result;
  }
  return VDO_SUCCESS;
}

/***********************************************************************/
int prepareToResizeLogical(KernelLayer *layer, BlockCount logicalCount)
{
  logInfo("Preparing to resize logical to %llu", logicalCount);
  // Allocations are permitted on this non-VDO thread, since any IO to VDO
  // triggered by the allocation can still finish just fine.
  int result = kvdoPrepareToGrowLogical(&layer->kvdo, logicalCount);
  if (result != VDO_SUCCESS) {
    // kvdoPrepareToGrowLogical logs errors
    return result;
  }

  logInfo("Done preparing to resize logical");
  return VDO_SUCCESS;
}

/***********************************************************************/
int resizeLogical(KernelLayer *layer, BlockCount logicalCount)
{
  logInfo("Resizing logical to %llu", logicalCount);
  // We must not mark the layer as allowing allocations when it is suspended
  // lest an allocation attempt block on writing IO to the suspended VDO.
  int result = kvdoResizeLogical(&layer->kvdo, logicalCount);
  if (result != VDO_SUCCESS) {
    // kvdoResizeLogical logs errors
    return result;
  }

  logInfo("Logical blocks now %llu", logicalCount);
  return VDO_SUCCESS;
}