Blame source/vdo/base/packer.c

Packit Service 310c69
/*
Packit Service 310c69
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 310c69
 *
Packit Service 310c69
 * This program is free software; you can redistribute it and/or
Packit Service 310c69
 * modify it under the terms of the GNU General Public License
Packit Service 310c69
 * as published by the Free Software Foundation; either version 2
Packit Service 310c69
 * of the License, or (at your option) any later version.
Packit Service 310c69
 * 
Packit Service 310c69
 * This program is distributed in the hope that it will be useful,
Packit Service 310c69
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 310c69
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 310c69
 * GNU General Public License for more details.
Packit Service 310c69
 * 
Packit Service 310c69
 * You should have received a copy of the GNU General Public License
Packit Service 310c69
 * along with this program; if not, write to the Free Software
Packit Service 310c69
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 310c69
 * 02110-1301, USA. 
Packit Service 310c69
 *
Packit Service 310c69
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/packer.c#8 $
Packit Service 310c69
 */
Packit Service 310c69
Packit Service 310c69
#include "packerInternals.h"
Packit Service 310c69
Packit Service 310c69
#include "logger.h"
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
Packit Service 310c69
#include "adminState.h"
Packit Service 310c69
#include "allocatingVIO.h"
Packit Service 310c69
#include "allocationSelector.h"
Packit Service 310c69
#include "compressionState.h"
Packit Service 310c69
#include "dataVIO.h"
Packit Service 310c69
#include "hashLock.h"
Packit Service 310c69
#include "pbnLock.h"
Packit Service 310c69
#include "vdo.h"
Packit Service 310c69
#include "vdoInternal.h"
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check that we are on the packer thread.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param caller  The function which is asserting
Packit Service 310c69
 **/
Packit Service 310c69
static inline void assertOnPackerThread(Packer *packer, const char *caller)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY((getCallbackThreadID() == packer->threadID),
Packit Service 310c69
                  "%s() called from packer thread", caller);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline InputBin *inputBinFromRingNode(RingNode *node)
Packit Service 310c69
{
Packit Service 310c69
  STATIC_ASSERT(offsetof(InputBin, ring) == 0);
Packit Service 310c69
  return (InputBin *) node;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static inline OutputBin *outputBinFromRingNode(RingNode *node)
Packit Service 310c69
{
Packit Service 310c69
  STATIC_ASSERT(offsetof(OutputBin, ring) == 0);
Packit Service 310c69
  return (OutputBin *) node;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
InputBin *nextBin(const Packer *packer, InputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  if (bin->ring.next == &packer->inputBins) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  } else {
Packit Service 310c69
    return inputBinFromRingNode(bin->ring.next);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
InputBin *getFullestBin(const Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  if (isRingEmpty(&packer->inputBins)) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  } else {
Packit Service 310c69
    return inputBinFromRingNode(packer->inputBins.next);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Insert an input bin to the list, which is in ascending order of free space.
Packit Service 310c69
 * Since all bins are already in the list, this actually moves the bin to the
Packit Service 310c69
 * correct position in the list.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param bin     The input bin to move to its sorted position
Packit Service 310c69
 **/
Packit Service 310c69
static void insertInSortedList(Packer *packer, InputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  for (InputBin *activeBin = getFullestBin(packer);
Packit Service 310c69
       activeBin != NULL;
Packit Service 310c69
       activeBin = nextBin(packer, activeBin)) {
Packit Service 310c69
    if (activeBin->freeSpace > bin->freeSpace) {
Packit Service 310c69
      pushRingNode(&activeBin->ring, &bin->ring);
Packit Service 310c69
      return;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  pushRingNode(&packer->inputBins, &bin->ring);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Allocate an input bin and put it into the packer's list.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static int makeInputBin(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  InputBin *bin;
Packit Service 310c69
  int result = ALLOCATE_EXTENDED(InputBin, MAX_COMPRESSION_SLOTS, VIO *,
Packit Service 310c69
                                 __func__, &bin;;
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  bin->freeSpace = packer->binDataSize;
Packit Service 310c69
  initializeRing(&bin->ring);
Packit Service 310c69
  pushRingNode(&packer->inputBins, &bin->ring);
Packit Service 310c69
  return VDO_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Push an output bin onto the stack of idle bins.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param bin     The output bin
Packit Service 310c69
 **/
Packit Service 310c69
static void pushOutputBin(Packer *packer, OutputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  ASSERT_LOG_ONLY(!hasWaiters(&bin->outgoing),
Packit Service 310c69
                  "idle output bin has no waiters");
Packit Service 310c69
  packer->idleOutputBins[packer->idleOutputBinCount++] = bin;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Pop an output bin off the end of the stack of idle bins.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 *
Packit Service 310c69
 * @return an idle output bin, or NULL if there are no idle bins
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static OutputBin *popOutputBin(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  if (packer->idleOutputBinCount == 0) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  size_t     index = --packer->idleOutputBinCount;
Packit Service 310c69
  OutputBin *bin   = packer->idleOutputBins[index];
Packit Service 310c69
  packer->idleOutputBins[index] = NULL;
Packit Service 310c69
  return bin;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Allocate a new output bin and push it onto the packer's stack of idle bins.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param layer   The physical layer that will receive the compressed block
Packit Service 310c69
 *                writes from the output bin
Packit Service 310c69
 *
Packit Service 310c69
 * @return VDO_SUCCESS or an error code
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static int makeOutputBin(Packer *packer, PhysicalLayer *layer)
Packit Service 310c69
{
Packit Service 310c69
  OutputBin *output;
Packit Service 310c69
  int result = ALLOCATE(1, OutputBin, __func__, &output);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Add the bin to the stack even before it's fully initialized so it will
Packit Service 310c69
  // be freed even if we fail to initialize it below.
Packit Service 310c69
  initializeRing(&output->ring);
Packit Service 310c69
  pushRingNode(&packer->outputBins, &output->ring);
Packit Service 310c69
  pushOutputBin(packer, output);
Packit Service 310c69
Packit Service 310c69
  result = ALLOCATE_EXTENDED(CompressedBlock, packer->binDataSize, char,
Packit Service 310c69
                             "compressed block", &output->block);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  return layer->createCompressedWriteVIO(layer, output, (char *) output->block,
Packit Service 310c69
                                         &output->writer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Free an idle output bin and null out the reference to it.
Packit Service 310c69
 *
Packit Service 310c69
 * @param binPtr  The reference to the output bin to free
Packit Service 310c69
 **/
Packit Service 310c69
static void freeOutputBin(OutputBin **binPtr)
Packit Service 310c69
{
Packit Service 310c69
  OutputBin *bin = *binPtr;
Packit Service 310c69
  if (bin == NULL) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  unspliceRingNode(&bin->ring);
Packit Service 310c69
Packit Service 310c69
  VIO *vio = allocatingVIOAsVIO(bin->writer);
Packit Service 310c69
  freeVIO(&vio);
Packit Service 310c69
  FREE(bin->block);
Packit Service 310c69
  FREE(bin);
Packit Service 310c69
  *binPtr = NULL;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makePacker(PhysicalLayer       *layer,
Packit Service 310c69
               BlockCount           inputBinCount,
Packit Service 310c69
               BlockCount           outputBinCount,
Packit Service 310c69
               const ThreadConfig  *threadConfig,
Packit Service 310c69
               Packer             **packerPtr)
Packit Service 310c69
{
Packit Service 310c69
  Packer *packer;
Packit Service 310c69
  int result = ALLOCATE_EXTENDED(Packer, outputBinCount,
Packit Service 310c69
                                 OutputBin *, __func__, &packer);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  packer->threadID       = getPackerZoneThread(threadConfig);
Packit Service 310c69
  packer->binDataSize    = VDO_BLOCK_SIZE - sizeof(CompressedBlockHeader);
Packit Service 310c69
  packer->size           = inputBinCount;
Packit Service 310c69
  packer->maxSlots       = MAX_COMPRESSION_SLOTS;
Packit Service 310c69
  packer->outputBinCount = outputBinCount;
Packit Service 310c69
  initializeRing(&packer->inputBins);
Packit Service 310c69
  initializeRing(&packer->outputBins);
Packit Service 310c69
Packit Service 310c69
  result = makeAllocationSelector(threadConfig->physicalZoneCount,
Packit Service 310c69
                                  packer->threadID, &packer->selector);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    freePacker(&packer);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  for (BlockCount i = 0; i < inputBinCount; i++) {
Packit Service 310c69
    int result = makeInputBin(packer);
Packit Service 310c69
    if (result != VDO_SUCCESS) {
Packit Service 310c69
      freePacker(&packer);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * The canceled bin can hold up to half the number of user VIOs. Every
Packit Service 310c69
   * canceled VIO in the bin must have a canceler for which it is waiting, and
Packit Service 310c69
   * any canceler will only have canceled one lock holder at a time.
Packit Service 310c69
   */
Packit Service 310c69
  result = ALLOCATE_EXTENDED(InputBin, MAXIMUM_USER_VIOS / 2, VIO *, __func__,
Packit Service 310c69
                             &packer->canceledBin);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    freePacker(&packer);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  for (BlockCount i = 0; i < outputBinCount; i++) {
Packit Service 310c69
    int result = makeOutputBin(packer, layer);
Packit Service 310c69
    if (result != VDO_SUCCESS) {
Packit Service 310c69
      freePacker(&packer);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *packerPtr = packer;
Packit Service 310c69
  return VDO_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void freePacker(Packer **packerPtr)
Packit Service 310c69
{
Packit Service 310c69
  Packer *packer = *packerPtr;
Packit Service 310c69
  if (packer == NULL) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  InputBin *input;
Packit Service 310c69
  while ((input = getFullestBin(packer)) != NULL) {
Packit Service 310c69
    unspliceRingNode(&input->ring);
Packit Service 310c69
    FREE(input);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  FREE(packer->canceledBin);
Packit Service 310c69
Packit Service 310c69
  OutputBin *output;
Packit Service 310c69
  while ((output = popOutputBin(packer)) != NULL) {
Packit Service 310c69
    freeOutputBin(&output);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  freeAllocationSelector(&packer->selector);
Packit Service 310c69
  FREE(packer);
Packit Service 310c69
  *packerPtr = NULL;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Get the Packer from a DataVIO.
Packit Service 310c69
 *
Packit Service 310c69
 * @param dataVIO  The DataVIO
Packit Service 310c69
 *
Packit Service 310c69
 * @return The Packer from the VDO to which the DataVIO belongs
Packit Service 310c69
 **/
Packit Service 310c69
static inline Packer *getPackerFromDataVIO(DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  return getVDOFromDataVIO(dataVIO)->packer;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool isSufficientlyCompressible(DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  Packer *packer = getPackerFromDataVIO(dataVIO);
Packit Service 310c69
  return (dataVIO->compression.size < packer->binDataSize);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
ThreadID getPackerThreadID(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  return packer->threadID;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
PackerStatistics getPackerStatistics(const Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  /*
Packit Service 310c69
   * This is called from getVDOStatistics(), which is called from outside the
Packit Service 310c69
   * packer thread. These are just statistics with no semantics that could
Packit Service 310c69
   * rely on memory order, so unfenced reads are sufficient.
Packit Service 310c69
   */
Packit Service 310c69
  return (PackerStatistics) {
Packit Service 310c69
    .compressedFragmentsWritten  = relaxedLoad64(&packer->fragmentsWritten),
Packit Service 310c69
    .compressedBlocksWritten     = relaxedLoad64(&packer->blocksWritten),
Packit Service 310c69
    .compressedFragmentsInPacker = relaxedLoad64(&packer->fragmentsPending),
Packit Service 310c69
  };
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Abort packing a DataVIO.
Packit Service 310c69
 *
Packit Service 310c69
 * @param dataVIO     The DataVIO to abort
Packit Service 310c69
 **/
Packit Service 310c69
static void abortPacking(DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  setCompressionDone(dataVIO);
Packit Service 310c69
  relaxedAdd64(&getPackerFromDataVIO(dataVIO)->fragmentsPending, -1);
Packit Service 310c69
  dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
Packit Service 310c69
  continueDataVIO(dataVIO, VDO_SUCCESS);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This continues the VIO completion without packing the VIO.
Packit Service 310c69
 *
Packit Service 310c69
 * @param waiter  The wait queue entry of the VIO to continue
Packit Service 310c69
 * @param unused  An argument required so this function may be called
Packit Service 310c69
 *                from notifyAllWaiters
Packit Service 310c69
 **/
Packit Service 310c69
static void continueVIOWithoutPacking(Waiter *waiter,
Packit Service 310c69
                                      void *unused __attribute__((unused)))
Packit Service 310c69
{
Packit Service 310c69
  abortPacking(waiterAsDataVIO(waiter));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Check whether the packer has drained.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 **/
Packit Service 310c69
static void checkForDrainComplete(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  if (isDraining(&packer->state)
Packit Service 310c69
      && (packer->canceledBin->slotsUsed == 0)
Packit Service 310c69
      && (packer->idleOutputBinCount == packer->outputBinCount)) {
Packit Service 310c69
    finishDraining(&packer->state);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void writePendingBatches(Packer *packer);
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Ensure that a completion is running on the packer thread.
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The compressed write VIO
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the completion is on the packer thread
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static bool switchToPackerThread(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  VIO      *vio      = asVIO(completion);
Packit Service 310c69
  ThreadID  threadID = vio->vdo->packer->threadID;
Packit Service 310c69
  if (completion->callbackThreadID == threadID) {
Packit Service 310c69
    return true;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  completion->callbackThreadID = threadID;
Packit Service 310c69
  invokeCallback(completion);
Packit Service 310c69
  return false;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Finish processing an output bin whose write has completed. If there was
Packit Service 310c69
 * an error, any DataVIOs waiting on the bin write will be notified.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer which owns the bin
Packit Service 310c69
 * @param bin     The bin which has finished
Packit Service 310c69
 **/
Packit Service 310c69
static void finishOutputBin(Packer *packer, OutputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  if (hasWaiters(&bin->outgoing)) {
Packit Service 310c69
    notifyAllWaiters(&bin->outgoing, continueVIOWithoutPacking, NULL);
Packit Service 310c69
  } else {
Packit Service 310c69
    // No waiters implies no error, so the compressed block was written.
Packit Service 310c69
    relaxedAdd64(&packer->fragmentsPending, -bin->slotsUsed);
Packit Service 310c69
    relaxedAdd64(&packer->fragmentsWritten, bin->slotsUsed);
Packit Service 310c69
    relaxedAdd64(&packer->blocksWritten, 1);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  bin->slotsUsed = 0;
Packit Service 310c69
  pushOutputBin(packer, bin);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This finishes the bin write process after the bin is written to disk. This
Packit Service 310c69
 * is the VIO callback function registered by writeOutputBin().
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The compressed write VIO
Packit Service 310c69
 **/
Packit Service 310c69
static void completeOutputBin(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  if (!switchToPackerThread(completion)) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  VIO *vio = asVIO(completion);
Packit Service 310c69
  if (completion->result != VDO_SUCCESS) {
Packit Service 310c69
    updateVIOErrorStats(vio,
Packit Service 310c69
                        "Completing compressed write VIO for physical block %"
Packit Service 310c69
                        PRIu64 " with error",
Packit Service 310c69
                        vio->physical);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  Packer *packer = vio->vdo->packer;
Packit Service 310c69
  finishOutputBin(packer, completion->parent);
Packit Service 310c69
  writePendingBatches(packer);
Packit Service 310c69
  checkForDrainComplete(packer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Implements WaiterCallback. Continues the DataVIO waiter.
Packit Service 310c69
 **/
Packit Service 310c69
static void continueWaiter(Waiter *waiter,
Packit Service 310c69
                           void   *context __attribute__((unused)))
Packit Service 310c69
{
Packit Service 310c69
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
Packit Service 310c69
  continueDataVIO(dataVIO, VDO_SUCCESS);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Implements WaiterCallback. Updates the DataVIO waiter to refer to its slot
Packit Service 310c69
 * in the compressed block, gives the DataVIO a share of the PBN lock on that
Packit Service 310c69
 * block, and reserves a reference count increment on the lock.
Packit Service 310c69
 **/
Packit Service 310c69
static void shareCompressedBlock(Waiter *waiter, void *context)
Packit Service 310c69
{
Packit Service 310c69
  DataVIO   *dataVIO = waiterAsDataVIO(waiter);
Packit Service 310c69
  OutputBin *bin     = context;
Packit Service 310c69
Packit Service 310c69
  dataVIO->newMapped = (ZonedPBN) {
Packit Service 310c69
    .pbn   = bin->writer->allocation,
Packit Service 310c69
    .zone  = bin->writer->zone,
Packit Service 310c69
    .state = getStateForSlot(dataVIO->compression.slot),
Packit Service 310c69
  };
Packit Service 310c69
  dataVIOAsVIO(dataVIO)->physical = dataVIO->newMapped.pbn;
Packit Service 310c69
Packit Service 310c69
  shareCompressedWriteLock(dataVIO, bin->writer->allocationLock);
Packit Service 310c69
Packit Service 310c69
  // Wait again for all the waiters to get a share.
Packit Service 310c69
  int result = enqueueWaiter(&bin->outgoing, waiter);
Packit Service 310c69
  // Cannot fail since this waiter was just dequeued.
Packit Service 310c69
  ASSERT_LOG_ONLY(result == VDO_SUCCESS, "impossible enqueueWaiter error");
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Finish a compressed block write. This callback is registered in
Packit Service 310c69
 * continueAfterAllocation().
Packit Service 310c69
 *
Packit Service 310c69
 * @param completion  The compressed write completion
Packit Service 310c69
 **/
Packit Service 310c69
static void finishCompressedWrite(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  OutputBin *bin = completion->parent;
Packit Service 310c69
  assertInPhysicalZone(bin->writer);
Packit Service 310c69
Packit Service 310c69
  if (completion->result != VDO_SUCCESS) {
Packit Service 310c69
    releaseAllocationLock(bin->writer);
Packit Service 310c69
    // Invokes completeOutputBin() on the packer thread, which will deal with
Packit Service 310c69
    // the waiters.
Packit Service 310c69
    vioDoneCallback(completion);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // First give every DataVIO/HashLock a share of the PBN lock to ensure it
Packit Service 310c69
  // can't be released until they've all done their incRefs.
Packit Service 310c69
  notifyAllWaiters(&bin->outgoing, shareCompressedBlock, bin);
Packit Service 310c69
Packit Service 310c69
  // The waiters now hold the (downgraded) PBN lock.
Packit Service 310c69
  bin->writer->allocationLock = NULL;
Packit Service 310c69
Packit Service 310c69
  // Invokes the callbacks registered before entering the packer.
Packit Service 310c69
  notifyAllWaiters(&bin->outgoing, continueWaiter, NULL);
Packit Service 310c69
Packit Service 310c69
  // Invokes completeOutputBin() on the packer thread.
Packit Service 310c69
  vioDoneCallback(completion);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Continue the write path for a compressed write AllocatingVIO now that block
Packit Service 310c69
 * allocation is complete (the AllocatingVIO may or may not have actually
Packit Service 310c69
 * received an allocation).
Packit Service 310c69
 *
Packit Service 310c69
 * @param allocatingVIO  The AllocatingVIO which has finished the allocation
Packit Service 310c69
 *                       process
Packit Service 310c69
 **/
Packit Service 310c69
static void continueAfterAllocation(AllocatingVIO *allocatingVIO)
Packit Service 310c69
{
Packit Service 310c69
  VIO           *vio        = allocatingVIOAsVIO(allocatingVIO);
Packit Service 310c69
  VDOCompletion *completion = vioAsCompletion(vio);
Packit Service 310c69
  if (allocatingVIO->allocation == ZERO_BLOCK) {
Packit Service 310c69
    completion->requeue = true;
Packit Service 310c69
    setCompletionResult(completion, VDO_NO_SPACE);
Packit Service 310c69
    vioDoneCallback(completion);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  setPhysicalZoneCallback(allocatingVIO, finishCompressedWrite,
Packit Service 310c69
                          THIS_LOCATION("$F(meta);cb=finishCompressedWrite"));
Packit Service 310c69
  completion->layer->writeCompressedBlock(allocatingVIO);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Launch an output bin.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer which owns the bin
Packit Service 310c69
 * @param bin     The output bin to launch
Packit Service 310c69
 **/
Packit Service 310c69
static void launchCompressedWrite(Packer *packer, OutputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  if (isReadOnly(getVDOFromAllocatingVIO(bin->writer)->readOnlyNotifier)) {
Packit Service 310c69
    finishOutputBin(packer, bin);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  VIO *vio = allocatingVIOAsVIO(bin->writer);
Packit Service 310c69
  resetCompletion(vioAsCompletion(vio));
Packit Service 310c69
  vio->callback = completeOutputBin;
Packit Service 310c69
  vio->priority = VIO_PRIORITY_COMPRESSED_DATA;
Packit Service 310c69
  allocateDataBlock(bin->writer, packer->selector, VIO_COMPRESSED_WRITE_LOCK,
Packit Service 310c69
                    continueAfterAllocation);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Consume from the pending queue the next batch of VIOs that can be packed
Packit Service 310c69
 * together in a single compressed block. VIOs that have been mooted since
Packit Service 310c69
 * being placed in the pending queue will not be returned.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param batch   The counted array to fill with the next batch of VIOs
Packit Service 310c69
 **/
Packit Service 310c69
static void getNextBatch(Packer *packer, OutputBatch *batch)
Packit Service 310c69
{
Packit Service 310c69
  BlockSize spaceRemaining = packer->binDataSize;
Packit Service 310c69
  batch->slotsUsed         = 0;
Packit Service 310c69
Packit Service 310c69
  DataVIO *dataVIO;
Packit Service 310c69
  while ((dataVIO = waiterAsDataVIO(getFirstWaiter(&packer->batchedDataVIOs)))
Packit Service 310c69
         != NULL) {
Packit Service 310c69
    // If there's not enough space for the next DataVIO, the batch is done.
Packit Service 310c69
    if ((dataVIO->compression.size > spaceRemaining)
Packit Service 310c69
        || (batch->slotsUsed == packer->maxSlots)) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // Remove the next DataVIO from the queue and put it in the output batch.
Packit Service 310c69
    dequeueNextWaiter(&packer->batchedDataVIOs);
Packit Service 310c69
    batch->slots[batch->slotsUsed++]  = dataVIO;
Packit Service 310c69
    spaceRemaining                   -= dataVIO->compression.size;
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Pack the next batch of compressed VIOs from the batched queue into an
Packit Service 310c69
 * output bin and write the output bin.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param output  The output bin to fill
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if a write was issued for the output bin
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static bool writeNextBatch(Packer *packer, OutputBin *output)
Packit Service 310c69
{
Packit Service 310c69
  OutputBatch batch;
Packit Service 310c69
  getNextBatch(packer, &batch);
Packit Service 310c69
Packit Service 310c69
  if (batch.slotsUsed == 0) {
Packit Service 310c69
    // The pending queue must now be empty (there may have been mooted VIOs).
Packit Service 310c69
    return false;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // If the batch contains only a single VIO, then we save nothing by saving
Packit Service 310c69
  // the compressed form. Continue processing the single VIO in the batch.
Packit Service 310c69
  if (batch.slotsUsed == 1) {
Packit Service 310c69
    abortPacking(batch.slots[0]);
Packit Service 310c69
    return false;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  resetCompressedBlockHeader(&output->block->header);
Packit Service 310c69
Packit Service 310c69
  size_t spaceUsed = 0;
Packit Service 310c69
  for (SlotNumber slot = 0; slot < batch.slotsUsed; slot++) {
Packit Service 310c69
    DataVIO *dataVIO = batch.slots[slot];
Packit Service 310c69
    dataVIO->compression.slot = slot;
Packit Service 310c69
    putCompressedBlockFragment(output->block, slot, spaceUsed,
Packit Service 310c69
                               dataVIO->compression.data,
Packit Service 310c69
                               dataVIO->compression.size);
Packit Service 310c69
    spaceUsed += dataVIO->compression.size;
Packit Service 310c69
Packit Service 310c69
    int result = enqueueDataVIO(&output->outgoing, dataVIO,
Packit Service 310c69
                                THIS_LOCATION(NULL));
Packit Service 310c69
    if (result != VDO_SUCCESS) {
Packit Service 310c69
      abortPacking(dataVIO);
Packit Service 310c69
      continue;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    output->slotsUsed += 1;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  launchCompressedWrite(packer, output);
Packit Service 310c69
  return true;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Put a DataVIO in a specific InputBin in which it will definitely fit.
Packit Service 310c69
 *
Packit Service 310c69
 * @param bin      The bin in which to put the DataVIO
Packit Service 310c69
 * @param dataVIO  The DataVIO to add
Packit Service 310c69
 **/
Packit Service 310c69
static void addToInputBin(InputBin *bin, DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  dataVIO->compression.bin  = bin;
Packit Service 310c69
  dataVIO->compression.slot = bin->slotsUsed;
Packit Service 310c69
  bin->incoming[bin->slotsUsed++] = dataVIO;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Start a new batch of VIOs in an InputBin, moving the existing batch, if
Packit Service 310c69
 * any, to the queue of pending batched VIOs in the packer.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 * @param bin     The bin to prepare
Packit Service 310c69
 **/
Packit Service 310c69
static void startNewBatch(Packer *packer, InputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  // Move all the DataVIOs in the current batch to the batched queue so they
Packit Service 310c69
  // will get packed into the next free output bin.
Packit Service 310c69
  for (SlotNumber slot = 0; slot < bin->slotsUsed; slot++) {
Packit Service 310c69
    DataVIO *dataVIO = bin->incoming[slot];
Packit Service 310c69
    dataVIO->compression.bin = NULL;
Packit Service 310c69
Packit Service 310c69
    if (!mayWriteCompressedDataVIO(dataVIO)) {
Packit Service 310c69
      /*
Packit Service 310c69
       * Compression of this DataVIO was canceled while it was waiting; put it
Packit Service 310c69
       * in the canceled bin so it can be rendezvous with the canceling
Packit Service 310c69
       * DataVIO.
Packit Service 310c69
       */
Packit Service 310c69
      addToInputBin(packer->canceledBin, dataVIO);
Packit Service 310c69
      continue;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    int result = enqueueDataVIO(&packer->batchedDataVIOs, dataVIO,
Packit Service 310c69
                                THIS_LOCATION(NULL));
Packit Service 310c69
    if (result != VDO_SUCCESS) {
Packit Service 310c69
      // Impossible but we're required to check the result from enqueue.
Packit Service 310c69
      abortPacking(dataVIO);
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // The bin is now empty.
Packit Service 310c69
  bin->slotsUsed = 0;
Packit Service 310c69
  bin->freeSpace = packer->binDataSize;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Add a DataVIO to a bin's incoming queue, handle logical space change, and
Packit Service 310c69
 * call physical space processor.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer   The packer
Packit Service 310c69
 * @param bin      The bin to which to add the the DataVIO
Packit Service 310c69
 * @param dataVIO  The DataVIO to add to the bin's queue
Packit Service 310c69
 **/
Packit Service 310c69
static void addDataVIOToInputBin(Packer   *packer,
Packit Service 310c69
                                 InputBin *bin,
Packit Service 310c69
                                 DataVIO  *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  // If the selected bin doesn't have room, start a new batch to make room.
Packit Service 310c69
  if (bin->freeSpace < dataVIO->compression.size) {
Packit Service 310c69
    startNewBatch(packer, bin);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  addToInputBin(bin, dataVIO);
Packit Service 310c69
  bin->freeSpace -= dataVIO->compression.size;
Packit Service 310c69
Packit Service 310c69
  // If we happen to exactly fill the bin, start a new input batch.
Packit Service 310c69
  if ((bin->slotsUsed == packer->maxSlots) || (bin->freeSpace == 0)) {
Packit Service 310c69
    startNewBatch(packer, bin);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Now that we've finished changing the free space, restore the sort order.
Packit Service 310c69
  insertInSortedList(packer, bin);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Move DataVIOs in pending batches from the batchedDataVIOs to all free output
Packit Service 310c69
 * bins, issuing writes for the output bins as they are packed. This will loop
Packit Service 310c69
 * until either the pending queue is drained or all output bins are busy
Packit Service 310c69
 * writing a compressed block.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer
Packit Service 310c69
 **/
Packit Service 310c69
static void writePendingBatches(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  if (packer->writingBatches) {
Packit Service 310c69
    /*
Packit Service 310c69
     * We've attempted to re-enter this function recursively due to completion
Packit Service 310c69
     * handling, which can lead to kernel stack overflow as in VDO-1340. It's
Packit Service 310c69
     * perfectly safe to break the recursion and do nothing since we know any
Packit Service 310c69
     * pending batches will eventually be handled by the earlier call.
Packit Service 310c69
     */
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Record that we are in this function for the above check. IMPORTANT: never
Packit Service 310c69
  // return from this function without clearing this flag.
Packit Service 310c69
  packer->writingBatches = true;
Packit Service 310c69
Packit Service 310c69
  OutputBin *output;
Packit Service 310c69
  while (hasWaiters(&packer->batchedDataVIOs)
Packit Service 310c69
         && ((output = popOutputBin(packer)) != NULL)) {
Packit Service 310c69
    if (!writeNextBatch(packer, output)) {
Packit Service 310c69
      // We didn't use the output bin to write, so push it back on the stack.
Packit Service 310c69
      pushOutputBin(packer, output);
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  packer->writingBatches = false;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Select the input bin that should be used to pack the compressed data in a
Packit Service 310c69
 * DataVIO with other DataVIOs.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer   The packer
Packit Service 310c69
 * @param dataVIO  The DataVIO
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static InputBin *selectInputBin(Packer *packer, DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  // First best fit: select the bin with the least free space that has enough
Packit Service 310c69
  // room for the compressed data in the DataVIO.
Packit Service 310c69
  InputBin *fullestBin = getFullestBin(packer);
Packit Service 310c69
  for (InputBin *bin = fullestBin; bin != NULL; bin = nextBin(packer, bin)) {
Packit Service 310c69
    if (bin->freeSpace >= dataVIO->compression.size) {
Packit Service 310c69
      return bin;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * None of the bins have enough space for the DataVIO. We're not allowed to
Packit Service 310c69
   * create new bins, so we have to overflow one of the existing bins. It's
Packit Service 310c69
   * pretty intuitive to select the fullest bin, since that "wastes" the least
Packit Service 310c69
   * amount of free space in the compressed block. But if the space currently
Packit Service 310c69
   * used in the fullest bin is smaller than the compressed size of the
Packit Service 310c69
   * incoming block, it seems wrong to force that bin to write when giving up
Packit Service 310c69
   * on compressing the incoming DataVIO would likewise "waste" the the least
Packit Service 310c69
   * amount of free space.
Packit Service 310c69
   */
Packit Service 310c69
  if (dataVIO->compression.size
Packit Service 310c69
      >= (packer->binDataSize - fullestBin->freeSpace)) {
Packit Service 310c69
    return NULL;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // The fullest bin doesn't have room, but writing it out and starting a new
Packit Service 310c69
  // batch with the incoming DataVIO will increase the packer's free space.
Packit Service 310c69
  return fullestBin;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void attemptPacking(DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  Packer *packer = getPackerFromDataVIO(dataVIO);
Packit Service 310c69
  assertOnPackerThread(packer, __func__);
Packit Service 310c69
Packit Service 310c69
  VIOCompressionState state = getCompressionState(dataVIO);
Packit Service 310c69
  int result = ASSERT((state.status == VIO_COMPRESSING),
Packit Service 310c69
                      "attempt to pack DataVIO not ready for packing, state: "
Packit Service 310c69
                      "%u",
Packit Service 310c69
                      state.status);
Packit Service 310c69
  if (result != VDO_SUCCESS) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Increment whether or not this DataVIO will be packed or not since
Packit Service 310c69
   * abortPacking() always decrements the counter.
Packit Service 310c69
   */
Packit Service 310c69
  relaxedAdd64(&packer->fragmentsPending, 1);
Packit Service 310c69
Packit Service 310c69
  // If packing of this DataVIO is disallowed for administrative reasons, give
Packit Service 310c69
  // up before making any state changes.
Packit Service 310c69
  if (!isNormal(&packer->state)
Packit Service 310c69
      || (dataVIO->flushGeneration < packer->flushGeneration)) {
Packit Service 310c69
    abortPacking(dataVIO);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * The check of mayBlockInPacker() here will set the DataVIO's compression
Packit Service 310c69
   * state to VIO_PACKING if the DataVIO is allowed to be compressed (if it has
Packit Service 310c69
   * already been canceled, we'll fall out here). Once the DataVIO is in the
Packit Service 310c69
   * VIO_PACKING state, it must be guaranteed to be put in an input bin before
Packit Service 310c69
   * any more requests can be processed by the packer thread. Otherwise, a
Packit Service 310c69
   * canceling DataVIO could attempt to remove the canceled DataVIO from the
Packit Service 310c69
   * packer and fail to rendezvous with it (VDO-2809). We must also make sure
Packit Service 310c69
   * that we will actually bin the DataVIO and not give up on it as being
Packit Service 310c69
   * larger than the space used in the fullest bin. Hence we must call
Packit Service 310c69
   * selectInputBin() before calling mayBlockInPacker() (VDO-2826).
Packit Service 310c69
   */
Packit Service 310c69
  InputBin *bin = selectInputBin(packer, dataVIO);
Packit Service 310c69
  if ((bin == NULL) || !mayBlockInPacker(dataVIO)) {
Packit Service 310c69
    abortPacking(dataVIO);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  addDataVIOToInputBin(packer, bin, dataVIO);
Packit Service 310c69
  writePendingBatches(packer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Force a pending write for all non-empty bins on behalf of a flush or
Packit Service 310c69
 * suspend.
Packit Service 310c69
 *
Packit Service 310c69
 * @param packer  The packer being flushed
Packit Service 310c69
 **/
Packit Service 310c69
static void writeAllNonEmptyBins(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  for (InputBin *bin = getFullestBin(packer);
Packit Service 310c69
       bin != NULL;
Packit Service 310c69
       bin = nextBin(packer, bin)) {
Packit Service 310c69
    startNewBatch(packer, bin);
Packit Service 310c69
    // We don't need to re-sort the bin here since this loop will make every
Packit Service 310c69
    // bin have the same amount of free space, so every ordering is sorted.
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  writePendingBatches(packer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void flushPacker(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  assertOnPackerThread(packer, __func__);
Packit Service 310c69
  if (isNormal(&packer->state)) {
Packit Service 310c69
    writeAllNonEmptyBins(packer);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/*
Packit Service 310c69
 * This method is only exposed for unit tests and should not normally be called
Packit Service 310c69
 * directly; use removeLockHolderFromPacker() instead.
Packit Service 310c69
 */
Packit Service 310c69
void removeFromPacker(DataVIO *dataVIO)
Packit Service 310c69
{
Packit Service 310c69
  InputBin *bin    = dataVIO->compression.bin;
Packit Service 310c69
  ASSERT_LOG_ONLY((bin != NULL), "DataVIO in packer has an input bin");
Packit Service 310c69
Packit Service 310c69
  SlotNumber slot = dataVIO->compression.slot;
Packit Service 310c69
  bin->slotsUsed--;
Packit Service 310c69
  if (slot < bin->slotsUsed) {
Packit Service 310c69
    bin->incoming[slot] = bin->incoming[bin->slotsUsed];
Packit Service 310c69
    bin->incoming[slot]->compression.slot = slot;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  dataVIO->compression.bin  = NULL;
Packit Service 310c69
  dataVIO->compression.slot = 0;
Packit Service 310c69
Packit Service 310c69
  Packer *packer = getPackerFromDataVIO(dataVIO);
Packit Service 310c69
  if (bin != packer->canceledBin) {
Packit Service 310c69
    bin->freeSpace += dataVIO->compression.size;
Packit Service 310c69
    insertInSortedList(packer, bin);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  abortPacking(dataVIO);
Packit Service 310c69
  checkForDrainComplete(packer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void removeLockHolderFromPacker(VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  DataVIO *dataVIO = asDataVIO(completion);
Packit Service 310c69
  assertInPackerZone(dataVIO);
Packit Service 310c69
Packit Service 310c69
  DataVIO *lockHolder             = dataVIO->compression.lockHolder;
Packit Service 310c69
  dataVIO->compression.lockHolder = NULL;
Packit Service 310c69
  removeFromPacker(lockHolder);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void incrementPackerFlushGeneration(Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  assertOnPackerThread(packer, __func__);
Packit Service 310c69
  packer->flushGeneration++;
Packit Service 310c69
  flushPacker(packer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Initiate a drain.
Packit Service 310c69
 *
Packit Service 310c69
 * Implements AdminInitiator.
Packit Service 310c69
 **/
Packit Service 310c69
static void initiateDrain(AdminState *state)
Packit Service 310c69
{
Packit Service 310c69
  Packer *packer = container_of(state, Packer, state);
Packit Service 310c69
  writeAllNonEmptyBins(packer);
Packit Service 310c69
  checkForDrainComplete(packer);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void drainPacker(Packer *packer, VDOCompletion *completion)
Packit Service 310c69
{
Packit Service 310c69
  assertOnPackerThread(packer, __func__);
Packit Service 310c69
  startDraining(&packer->state, ADMIN_STATE_SUSPENDING, completion,
Packit Service 310c69
                initiateDrain);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void resumePacker(Packer *packer, VDOCompletion *parent)
Packit Service 310c69
{
Packit Service 310c69
  assertOnPackerThread(packer, __func__);
Packit Service 310c69
  finishCompletion(parent, resumeIfQuiescent(&packer->state));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void resetSlotCount(Packer *packer, CompressedFragmentCount slots)
Packit Service 310c69
{
Packit Service 310c69
  if (slots > MAX_COMPRESSION_SLOTS) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  packer->maxSlots = slots;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void dumpInputBin(const InputBin *bin, bool canceled)
Packit Service 310c69
{
Packit Service 310c69
  if (bin->slotsUsed == 0) {
Packit Service 310c69
    // Don't dump empty input bins.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  logInfo("    %sBin slotsUsed=%u freeSpace=%zu",
Packit Service 310c69
          (canceled ? "Canceled" : "Input"), bin->slotsUsed, bin->freeSpace);
Packit Service 310c69
Packit Service 310c69
  // XXX dump VIOs in bin->incoming? The VIOs should have been dumped from the
Packit Service 310c69
  // VIO pool. Maybe just dump their addresses so it's clear they're here?
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void dumpOutputBin(const OutputBin *bin)
Packit Service 310c69
{
Packit Service 310c69
  size_t count = countWaiters(&bin->outgoing);
Packit Service 310c69
  if (bin->slotsUsed == 0) {
Packit Service 310c69
    // Don't dump empty output bins.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  logInfo("    OutputBin contains %zu outgoing waiters", count);
Packit Service 310c69
Packit Service 310c69
  // XXX dump VIOs in bin->outgoing? The VIOs should have been dumped from the
Packit Service 310c69
  // VIO pool. Maybe just dump their addresses so it's clear they're here?
Packit Service 310c69
Packit Service 310c69
  // XXX dump writer VIO?
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void dumpPacker(const Packer *packer)
Packit Service 310c69
{
Packit Service 310c69
  logInfo("Packer");
Packit Service 310c69
  logInfo("  flushGeneration=%llu state %s writingBatches=%s",
Packit Service 310c69
          packer->flushGeneration, getAdminStateName(&packer->state),
Packit Service 310c69
          boolToString(packer->writingBatches));
Packit Service 310c69
Packit Service 310c69
  logInfo("  inputBinCount=%llu", packer->size);
Packit Service 310c69
  for (InputBin *bin = getFullestBin(packer);
Packit Service 310c69
       bin != NULL;
Packit Service 310c69
       bin = nextBin(packer, bin)) {
Packit Service 310c69
    dumpInputBin(bin, false);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  dumpInputBin(packer->canceledBin, true);
Packit Service 310c69
Packit Service 310c69
  logInfo("  outputBinCount=%zu idleOutputBinCount=%zu",
Packit Service 310c69
          packer->outputBinCount, packer->idleOutputBinCount);
Packit Service 310c69
  const RingNode *head = &packer->outputBins;
Packit Service 310c69
  for (RingNode *node = head->next; node != head; node = node->next) {
Packit Service 310c69
    dumpOutputBin(outputBinFromRingNode(node));
Packit Service 310c69
  }
Packit Service 310c69
}