|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* Copyright (c) 2020 Red Hat, Inc.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* This program is free software; you can redistribute it and/or
|
|
Packit Service |
310c69 |
* modify it under the terms of the GNU General Public License
|
|
Packit Service |
310c69 |
* as published by the Free Software Foundation; either version 2
|
|
Packit Service |
310c69 |
* of the License, or (at your option) any later version.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* This program is distributed in the hope that it will be useful,
|
|
Packit Service |
310c69 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit Service |
310c69 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
Packit Service |
310c69 |
* GNU General Public License for more details.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* You should have received a copy of the GNU General Public License
|
|
Packit Service |
310c69 |
* along with this program; if not, write to the Free Software
|
|
Packit Service |
310c69 |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
Packit Service |
310c69 |
* 02110-1301, USA.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/packer.c#8 $
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "packerInternals.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "logger.h"
|
|
Packit Service |
310c69 |
#include "memoryAlloc.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
#include "adminState.h"
|
|
Packit Service |
310c69 |
#include "allocatingVIO.h"
|
|
Packit Service |
310c69 |
#include "allocationSelector.h"
|
|
Packit Service |
310c69 |
#include "compressionState.h"
|
|
Packit Service |
310c69 |
#include "dataVIO.h"
|
|
Packit Service |
310c69 |
#include "hashLock.h"
|
|
Packit Service |
310c69 |
#include "pbnLock.h"
|
|
Packit Service |
310c69 |
#include "vdo.h"
|
|
Packit Service |
310c69 |
#include "vdoInternal.h"
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Check that we are on the packer thread.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param caller The function which is asserting
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static inline void assertOnPackerThread(Packer *packer, const char *caller)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
ASSERT_LOG_ONLY((getCallbackThreadID() == packer->threadID),
|
|
Packit Service |
310c69 |
"%s() called from packer thread", caller);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static inline InputBin *inputBinFromRingNode(RingNode *node)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
STATIC_ASSERT(offsetof(InputBin, ring) == 0);
|
|
Packit Service |
310c69 |
return (InputBin *) node;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static inline OutputBin *outputBinFromRingNode(RingNode *node)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
STATIC_ASSERT(offsetof(OutputBin, ring) == 0);
|
|
Packit Service |
310c69 |
return (OutputBin *) node;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
InputBin *nextBin(const Packer *packer, InputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (bin->ring.next == &packer->inputBins) {
|
|
Packit Service |
310c69 |
return NULL;
|
|
Packit Service |
310c69 |
} else {
|
|
Packit Service |
310c69 |
return inputBinFromRingNode(bin->ring.next);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
InputBin *getFullestBin(const Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (isRingEmpty(&packer->inputBins)) {
|
|
Packit Service |
310c69 |
return NULL;
|
|
Packit Service |
310c69 |
} else {
|
|
Packit Service |
310c69 |
return inputBinFromRingNode(packer->inputBins.next);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Insert an input bin to the list, which is in ascending order of free space.
|
|
Packit Service |
310c69 |
* Since all bins are already in the list, this actually moves the bin to the
|
|
Packit Service |
310c69 |
* correct position in the list.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param bin The input bin to move to its sorted position
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void insertInSortedList(Packer *packer, InputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
for (InputBin *activeBin = getFullestBin(packer);
|
|
Packit Service |
310c69 |
activeBin != NULL;
|
|
Packit Service |
310c69 |
activeBin = nextBin(packer, activeBin)) {
|
|
Packit Service |
310c69 |
if (activeBin->freeSpace > bin->freeSpace) {
|
|
Packit Service |
310c69 |
pushRingNode(&activeBin->ring, &bin->ring);
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
pushRingNode(&packer->inputBins, &bin->ring);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Allocate an input bin and put it into the packer's list.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static int makeInputBin(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
InputBin *bin;
|
|
Packit Service |
310c69 |
int result = ALLOCATE_EXTENDED(InputBin, MAX_COMPRESSION_SLOTS, VIO *,
|
|
Packit Service |
310c69 |
__func__, &bin;;
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
bin->freeSpace = packer->binDataSize;
|
|
Packit Service |
310c69 |
initializeRing(&bin->ring);
|
|
Packit Service |
310c69 |
pushRingNode(&packer->inputBins, &bin->ring);
|
|
Packit Service |
310c69 |
return VDO_SUCCESS;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Push an output bin onto the stack of idle bins.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param bin The output bin
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void pushOutputBin(Packer *packer, OutputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
ASSERT_LOG_ONLY(!hasWaiters(&bin->outgoing),
|
|
Packit Service |
310c69 |
"idle output bin has no waiters");
|
|
Packit Service |
310c69 |
packer->idleOutputBins[packer->idleOutputBinCount++] = bin;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Pop an output bin off the end of the stack of idle bins.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return an idle output bin, or NULL if there are no idle bins
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static OutputBin *popOutputBin(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (packer->idleOutputBinCount == 0) {
|
|
Packit Service |
310c69 |
return NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
size_t index = --packer->idleOutputBinCount;
|
|
Packit Service |
310c69 |
OutputBin *bin = packer->idleOutputBins[index];
|
|
Packit Service |
310c69 |
packer->idleOutputBins[index] = NULL;
|
|
Packit Service |
310c69 |
return bin;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Allocate a new output bin and push it onto the packer's stack of idle bins.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param layer The physical layer that will receive the compressed block
|
|
Packit Service |
310c69 |
* writes from the output bin
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return VDO_SUCCESS or an error code
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static int makeOutputBin(Packer *packer, PhysicalLayer *layer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
OutputBin *output;
|
|
Packit Service |
310c69 |
int result = ALLOCATE(1, OutputBin, __func__, &output);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Add the bin to the stack even before it's fully initialized so it will
|
|
Packit Service |
310c69 |
// be freed even if we fail to initialize it below.
|
|
Packit Service |
310c69 |
initializeRing(&output->ring);
|
|
Packit Service |
310c69 |
pushRingNode(&packer->outputBins, &output->ring);
|
|
Packit Service |
310c69 |
pushOutputBin(packer, output);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
result = ALLOCATE_EXTENDED(CompressedBlock, packer->binDataSize, char,
|
|
Packit Service |
310c69 |
"compressed block", &output->block);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
return layer->createCompressedWriteVIO(layer, output, (char *) output->block,
|
|
Packit Service |
310c69 |
&output->writer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Free an idle output bin and null out the reference to it.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param binPtr The reference to the output bin to free
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void freeOutputBin(OutputBin **binPtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
OutputBin *bin = *binPtr;
|
|
Packit Service |
310c69 |
if (bin == NULL) {
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
unspliceRingNode(&bin->ring);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
VIO *vio = allocatingVIOAsVIO(bin->writer);
|
|
Packit Service |
310c69 |
freeVIO(&vio);
|
|
Packit Service |
310c69 |
FREE(bin->block);
|
|
Packit Service |
310c69 |
FREE(bin);
|
|
Packit Service |
310c69 |
*binPtr = NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
int makePacker(PhysicalLayer *layer,
|
|
Packit Service |
310c69 |
BlockCount inputBinCount,
|
|
Packit Service |
310c69 |
BlockCount outputBinCount,
|
|
Packit Service |
310c69 |
const ThreadConfig *threadConfig,
|
|
Packit Service |
310c69 |
Packer **packerPtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
Packer *packer;
|
|
Packit Service |
310c69 |
int result = ALLOCATE_EXTENDED(Packer, outputBinCount,
|
|
Packit Service |
310c69 |
OutputBin *, __func__, &packer);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
packer->threadID = getPackerZoneThread(threadConfig);
|
|
Packit Service |
310c69 |
packer->binDataSize = VDO_BLOCK_SIZE - sizeof(CompressedBlockHeader);
|
|
Packit Service |
310c69 |
packer->size = inputBinCount;
|
|
Packit Service |
310c69 |
packer->maxSlots = MAX_COMPRESSION_SLOTS;
|
|
Packit Service |
310c69 |
packer->outputBinCount = outputBinCount;
|
|
Packit Service |
310c69 |
initializeRing(&packer->inputBins);
|
|
Packit Service |
310c69 |
initializeRing(&packer->outputBins);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
result = makeAllocationSelector(threadConfig->physicalZoneCount,
|
|
Packit Service |
310c69 |
packer->threadID, &packer->selector);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
freePacker(&packer);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
for (BlockCount i = 0; i < inputBinCount; i++) {
|
|
Packit Service |
310c69 |
int result = makeInputBin(packer);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
freePacker(&packer);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* The canceled bin can hold up to half the number of user VIOs. Every
|
|
Packit Service |
310c69 |
* canceled VIO in the bin must have a canceler for which it is waiting, and
|
|
Packit Service |
310c69 |
* any canceler will only have canceled one lock holder at a time.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
result = ALLOCATE_EXTENDED(InputBin, MAXIMUM_USER_VIOS / 2, VIO *, __func__,
|
|
Packit Service |
310c69 |
&packer->canceledBin);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
freePacker(&packer);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
for (BlockCount i = 0; i < outputBinCount; i++) {
|
|
Packit Service |
310c69 |
int result = makeOutputBin(packer, layer);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
freePacker(&packer);
|
|
Packit Service |
310c69 |
return result;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
*packerPtr = packer;
|
|
Packit Service |
310c69 |
return VDO_SUCCESS;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void freePacker(Packer **packerPtr)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
Packer *packer = *packerPtr;
|
|
Packit Service |
310c69 |
if (packer == NULL) {
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
InputBin *input;
|
|
Packit Service |
310c69 |
while ((input = getFullestBin(packer)) != NULL) {
|
|
Packit Service |
310c69 |
unspliceRingNode(&input->ring);
|
|
Packit Service |
310c69 |
FREE(input);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
FREE(packer->canceledBin);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
OutputBin *output;
|
|
Packit Service |
310c69 |
while ((output = popOutputBin(packer)) != NULL) {
|
|
Packit Service |
310c69 |
freeOutputBin(&output);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
freeAllocationSelector(&packer->selector);
|
|
Packit Service |
310c69 |
FREE(packer);
|
|
Packit Service |
310c69 |
*packerPtr = NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Get the Packer from a DataVIO.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param dataVIO The DataVIO
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return The Packer from the VDO to which the DataVIO belongs
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static inline Packer *getPackerFromDataVIO(DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
return getVDOFromDataVIO(dataVIO)->packer;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
bool isSufficientlyCompressible(DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
Packer *packer = getPackerFromDataVIO(dataVIO);
|
|
Packit Service |
310c69 |
return (dataVIO->compression.size < packer->binDataSize);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
ThreadID getPackerThreadID(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
return packer->threadID;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
PackerStatistics getPackerStatistics(const Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* This is called from getVDOStatistics(), which is called from outside the
|
|
Packit Service |
310c69 |
* packer thread. These are just statistics with no semantics that could
|
|
Packit Service |
310c69 |
* rely on memory order, so unfenced reads are sufficient.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
return (PackerStatistics) {
|
|
Packit Service |
310c69 |
.compressedFragmentsWritten = relaxedLoad64(&packer->fragmentsWritten),
|
|
Packit Service |
310c69 |
.compressedBlocksWritten = relaxedLoad64(&packer->blocksWritten),
|
|
Packit Service |
310c69 |
.compressedFragmentsInPacker = relaxedLoad64(&packer->fragmentsPending),
|
|
Packit Service |
310c69 |
};
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Abort packing a DataVIO.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param dataVIO The DataVIO to abort
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void abortPacking(DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
setCompressionDone(dataVIO);
|
|
Packit Service |
310c69 |
relaxedAdd64(&getPackerFromDataVIO(dataVIO)->fragmentsPending, -1);
|
|
Packit Service |
310c69 |
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
|
|
Packit Service |
310c69 |
continueDataVIO(dataVIO, VDO_SUCCESS);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* This continues the VIO completion without packing the VIO.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param waiter The wait queue entry of the VIO to continue
|
|
Packit Service |
310c69 |
* @param unused An argument required so this function may be called
|
|
Packit Service |
310c69 |
* from notifyAllWaiters
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void continueVIOWithoutPacking(Waiter *waiter,
|
|
Packit Service |
310c69 |
void *unused __attribute__((unused)))
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
abortPacking(waiterAsDataVIO(waiter));
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Check whether the packer has drained.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void checkForDrainComplete(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (isDraining(&packer->state)
|
|
Packit Service |
310c69 |
&& (packer->canceledBin->slotsUsed == 0)
|
|
Packit Service |
310c69 |
&& (packer->idleOutputBinCount == packer->outputBinCount)) {
|
|
Packit Service |
310c69 |
finishDraining(&packer->state);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
// Forward declaration: completeOutputBin() calls this function before its
// definition appears (presumably later in this file).
static void writePendingBatches(Packer *packer);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Ensure that a completion is running on the packer thread.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param completion The compressed write VIO
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return true if the completion is on the packer thread
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static bool switchToPackerThread(VDOCompletion *completion)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
VIO *vio = asVIO(completion);
|
|
Packit Service |
310c69 |
ThreadID threadID = vio->vdo->packer->threadID;
|
|
Packit Service |
310c69 |
if (completion->callbackThreadID == threadID) {
|
|
Packit Service |
310c69 |
return true;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
completion->callbackThreadID = threadID;
|
|
Packit Service |
310c69 |
invokeCallback(completion);
|
|
Packit Service |
310c69 |
return false;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Finish processing an output bin whose write has completed. If there was
|
|
Packit Service |
310c69 |
* an error, any DataVIOs waiting on the bin write will be notified.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer which owns the bin
|
|
Packit Service |
310c69 |
* @param bin The bin which has finished
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void finishOutputBin(Packer *packer, OutputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (hasWaiters(&bin->outgoing)) {
|
|
Packit Service |
310c69 |
notifyAllWaiters(&bin->outgoing, continueVIOWithoutPacking, NULL);
|
|
Packit Service |
310c69 |
} else {
|
|
Packit Service |
310c69 |
// No waiters implies no error, so the compressed block was written.
|
|
Packit Service |
310c69 |
relaxedAdd64(&packer->fragmentsPending, -bin->slotsUsed);
|
|
Packit Service |
310c69 |
relaxedAdd64(&packer->fragmentsWritten, bin->slotsUsed);
|
|
Packit Service |
310c69 |
relaxedAdd64(&packer->blocksWritten, 1);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
bin->slotsUsed = 0;
|
|
Packit Service |
310c69 |
pushOutputBin(packer, bin);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* This finishes the bin write process after the bin is written to disk. This
|
|
Packit Service |
310c69 |
* is the VIO callback function registered by writeOutputBin().
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param completion The compressed write VIO
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void completeOutputBin(VDOCompletion *completion)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (!switchToPackerThread(completion)) {
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
VIO *vio = asVIO(completion);
|
|
Packit Service |
310c69 |
if (completion->result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
updateVIOErrorStats(vio,
|
|
Packit Service |
310c69 |
"Completing compressed write VIO for physical block %"
|
|
Packit Service |
310c69 |
PRIu64 " with error",
|
|
Packit Service |
310c69 |
vio->physical);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
Packer *packer = vio->vdo->packer;
|
|
Packit Service |
310c69 |
finishOutputBin(packer, completion->parent);
|
|
Packit Service |
310c69 |
writePendingBatches(packer);
|
|
Packit Service |
310c69 |
checkForDrainComplete(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Implements WaiterCallback. Continues the DataVIO waiter.
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void continueWaiter(Waiter *waiter,
|
|
Packit Service |
310c69 |
void *context __attribute__((unused)))
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
DataVIO *dataVIO = waiterAsDataVIO(waiter);
|
|
Packit Service |
310c69 |
continueDataVIO(dataVIO, VDO_SUCCESS);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Implements WaiterCallback. Updates the DataVIO waiter to refer to its slot
|
|
Packit Service |
310c69 |
* in the compressed block, gives the DataVIO a share of the PBN lock on that
|
|
Packit Service |
310c69 |
* block, and reserves a reference count increment on the lock.
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void shareCompressedBlock(Waiter *waiter, void *context)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
DataVIO *dataVIO = waiterAsDataVIO(waiter);
|
|
Packit Service |
310c69 |
OutputBin *bin = context;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
dataVIO->newMapped = (ZonedPBN) {
|
|
Packit Service |
310c69 |
.pbn = bin->writer->allocation,
|
|
Packit Service |
310c69 |
.zone = bin->writer->zone,
|
|
Packit Service |
310c69 |
.state = getStateForSlot(dataVIO->compression.slot),
|
|
Packit Service |
310c69 |
};
|
|
Packit Service |
310c69 |
dataVIOAsVIO(dataVIO)->physical = dataVIO->newMapped.pbn;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
shareCompressedWriteLock(dataVIO, bin->writer->allocationLock);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Wait again for all the waiters to get a share.
|
|
Packit Service |
310c69 |
int result = enqueueWaiter(&bin->outgoing, waiter);
|
|
Packit Service |
310c69 |
// Cannot fail since this waiter was just dequeued.
|
|
Packit Service |
310c69 |
ASSERT_LOG_ONLY(result == VDO_SUCCESS, "impossible enqueueWaiter error");
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Finish a compressed block write. This callback is registered in
|
|
Packit Service |
310c69 |
* continueAfterAllocation().
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param completion The compressed write completion
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void finishCompressedWrite(VDOCompletion *completion)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
OutputBin *bin = completion->parent;
|
|
Packit Service |
310c69 |
assertInPhysicalZone(bin->writer);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
if (completion->result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
releaseAllocationLock(bin->writer);
|
|
Packit Service |
310c69 |
// Invokes completeOutputBin() on the packer thread, which will deal with
|
|
Packit Service |
310c69 |
// the waiters.
|
|
Packit Service |
310c69 |
vioDoneCallback(completion);
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// First give every DataVIO/HashLock a share of the PBN lock to ensure it
|
|
Packit Service |
310c69 |
// can't be released until they've all done their incRefs.
|
|
Packit Service |
310c69 |
notifyAllWaiters(&bin->outgoing, shareCompressedBlock, bin);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// The waiters now hold the (downgraded) PBN lock.
|
|
Packit Service |
310c69 |
bin->writer->allocationLock = NULL;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Invokes the callbacks registered before entering the packer.
|
|
Packit Service |
310c69 |
notifyAllWaiters(&bin->outgoing, continueWaiter, NULL);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Invokes completeOutputBin() on the packer thread.
|
|
Packit Service |
310c69 |
vioDoneCallback(completion);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Continue the write path for a compressed write AllocatingVIO now that block
|
|
Packit Service |
310c69 |
* allocation is complete (the AllocatingVIO may or may not have actually
|
|
Packit Service |
310c69 |
* received an allocation).
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param allocatingVIO The AllocatingVIO which has finished the allocation
|
|
Packit Service |
310c69 |
* process
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void continueAfterAllocation(AllocatingVIO *allocatingVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
VIO *vio = allocatingVIOAsVIO(allocatingVIO);
|
|
Packit Service |
310c69 |
VDOCompletion *completion = vioAsCompletion(vio);
|
|
Packit Service |
310c69 |
if (allocatingVIO->allocation == ZERO_BLOCK) {
|
|
Packit Service |
310c69 |
completion->requeue = true;
|
|
Packit Service |
310c69 |
setCompletionResult(completion, VDO_NO_SPACE);
|
|
Packit Service |
310c69 |
vioDoneCallback(completion);
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
setPhysicalZoneCallback(allocatingVIO, finishCompressedWrite,
|
|
Packit Service |
310c69 |
THIS_LOCATION("$F(meta);cb=finishCompressedWrite"));
|
|
Packit Service |
310c69 |
completion->layer->writeCompressedBlock(allocatingVIO);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Launch an output bin.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer which owns the bin
|
|
Packit Service |
310c69 |
* @param bin The output bin to launch
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void launchCompressedWrite(Packer *packer, OutputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (isReadOnly(getVDOFromAllocatingVIO(bin->writer)->readOnlyNotifier)) {
|
|
Packit Service |
310c69 |
finishOutputBin(packer, bin);
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
VIO *vio = allocatingVIOAsVIO(bin->writer);
|
|
Packit Service |
310c69 |
resetCompletion(vioAsCompletion(vio));
|
|
Packit Service |
310c69 |
vio->callback = completeOutputBin;
|
|
Packit Service |
310c69 |
vio->priority = VIO_PRIORITY_COMPRESSED_DATA;
|
|
Packit Service |
310c69 |
allocateDataBlock(bin->writer, packer->selector, VIO_COMPRESSED_WRITE_LOCK,
|
|
Packit Service |
310c69 |
continueAfterAllocation);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Consume from the pending queue the next batch of VIOs that can be packed
|
|
Packit Service |
310c69 |
* together in a single compressed block. VIOs that have been mooted since
|
|
Packit Service |
310c69 |
* being placed in the pending queue will not be returned.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param batch The counted array to fill with the next batch of VIOs
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void getNextBatch(Packer *packer, OutputBatch *batch)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
BlockSize spaceRemaining = packer->binDataSize;
|
|
Packit Service |
310c69 |
batch->slotsUsed = 0;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
DataVIO *dataVIO;
|
|
Packit Service |
310c69 |
while ((dataVIO = waiterAsDataVIO(getFirstWaiter(&packer->batchedDataVIOs)))
|
|
Packit Service |
310c69 |
!= NULL) {
|
|
Packit Service |
310c69 |
// If there's not enough space for the next DataVIO, the batch is done.
|
|
Packit Service |
310c69 |
if ((dataVIO->compression.size > spaceRemaining)
|
|
Packit Service |
310c69 |
|| (batch->slotsUsed == packer->maxSlots)) {
|
|
Packit Service |
310c69 |
break;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Remove the next DataVIO from the queue and put it in the output batch.
|
|
Packit Service |
310c69 |
dequeueNextWaiter(&packer->batchedDataVIOs);
|
|
Packit Service |
310c69 |
batch->slots[batch->slotsUsed++] = dataVIO;
|
|
Packit Service |
310c69 |
spaceRemaining -= dataVIO->compression.size;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Pack the next batch of compressed VIOs from the batched queue into an
|
|
Packit Service |
310c69 |
* output bin and write the output bin.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param output The output bin to fill
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @return true if a write was issued for the output bin
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static bool writeNextBatch(Packer *packer, OutputBin *output)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
OutputBatch batch;
|
|
Packit Service |
310c69 |
getNextBatch(packer, &batch);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
if (batch.slotsUsed == 0) {
|
|
Packit Service |
310c69 |
// The pending queue must now be empty (there may have been mooted VIOs).
|
|
Packit Service |
310c69 |
return false;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// If the batch contains only a single VIO, then we save nothing by saving
|
|
Packit Service |
310c69 |
// the compressed form. Continue processing the single VIO in the batch.
|
|
Packit Service |
310c69 |
if (batch.slotsUsed == 1) {
|
|
Packit Service |
310c69 |
abortPacking(batch.slots[0]);
|
|
Packit Service |
310c69 |
return false;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
resetCompressedBlockHeader(&output->block->header);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
size_t spaceUsed = 0;
|
|
Packit Service |
310c69 |
for (SlotNumber slot = 0; slot < batch.slotsUsed; slot++) {
|
|
Packit Service |
310c69 |
DataVIO *dataVIO = batch.slots[slot];
|
|
Packit Service |
310c69 |
dataVIO->compression.slot = slot;
|
|
Packit Service |
310c69 |
putCompressedBlockFragment(output->block, slot, spaceUsed,
|
|
Packit Service |
310c69 |
dataVIO->compression.data,
|
|
Packit Service |
310c69 |
dataVIO->compression.size);
|
|
Packit Service |
310c69 |
spaceUsed += dataVIO->compression.size;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
int result = enqueueDataVIO(&output->outgoing, dataVIO,
|
|
Packit Service |
310c69 |
THIS_LOCATION(NULL));
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
abortPacking(dataVIO);
|
|
Packit Service |
310c69 |
continue;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
output->slotsUsed += 1;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
launchCompressedWrite(packer, output);
|
|
Packit Service |
310c69 |
return true;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Put a DataVIO in a specific InputBin in which it will definitely fit.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param bin The bin in which to put the DataVIO
|
|
Packit Service |
310c69 |
* @param dataVIO The DataVIO to add
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void addToInputBin(InputBin *bin, DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
dataVIO->compression.bin = bin;
|
|
Packit Service |
310c69 |
dataVIO->compression.slot = bin->slotsUsed;
|
|
Packit Service |
310c69 |
bin->incoming[bin->slotsUsed++] = dataVIO;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Start a new batch of VIOs in an InputBin, moving the existing batch, if
|
|
Packit Service |
310c69 |
* any, to the queue of pending batched VIOs in the packer.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param bin The bin to prepare
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void startNewBatch(Packer *packer, InputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
// Move all the DataVIOs in the current batch to the batched queue so they
|
|
Packit Service |
310c69 |
// will get packed into the next free output bin.
|
|
Packit Service |
310c69 |
for (SlotNumber slot = 0; slot < bin->slotsUsed; slot++) {
|
|
Packit Service |
310c69 |
DataVIO *dataVIO = bin->incoming[slot];
|
|
Packit Service |
310c69 |
dataVIO->compression.bin = NULL;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
if (!mayWriteCompressedDataVIO(dataVIO)) {
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* Compression of this DataVIO was canceled while it was waiting; put it
|
|
Packit Service |
310c69 |
* in the canceled bin so it can be rendezvous with the canceling
|
|
Packit Service |
310c69 |
* DataVIO.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
addToInputBin(packer->canceledBin, dataVIO);
|
|
Packit Service |
310c69 |
continue;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
int result = enqueueDataVIO(&packer->batchedDataVIOs, dataVIO,
|
|
Packit Service |
310c69 |
THIS_LOCATION(NULL));
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
// Impossible but we're required to check the result from enqueue.
|
|
Packit Service |
310c69 |
abortPacking(dataVIO);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// The bin is now empty.
|
|
Packit Service |
310c69 |
bin->slotsUsed = 0;
|
|
Packit Service |
310c69 |
bin->freeSpace = packer->binDataSize;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Add a DataVIO to a bin's incoming queue, handle logical space change, and
|
|
Packit Service |
310c69 |
* call physical space processor.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param bin The bin to which to add the the DataVIO
|
|
Packit Service |
310c69 |
* @param dataVIO The DataVIO to add to the bin's queue
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void addDataVIOToInputBin(Packer *packer,
|
|
Packit Service |
310c69 |
InputBin *bin,
|
|
Packit Service |
310c69 |
DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
// If the selected bin doesn't have room, start a new batch to make room.
|
|
Packit Service |
310c69 |
if (bin->freeSpace < dataVIO->compression.size) {
|
|
Packit Service |
310c69 |
startNewBatch(packer, bin);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
addToInputBin(bin, dataVIO);
|
|
Packit Service |
310c69 |
bin->freeSpace -= dataVIO->compression.size;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// If we happen to exactly fill the bin, start a new input batch.
|
|
Packit Service |
310c69 |
if ((bin->slotsUsed == packer->maxSlots) || (bin->freeSpace == 0)) {
|
|
Packit Service |
310c69 |
startNewBatch(packer, bin);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Now that we've finished changing the free space, restore the sort order.
|
|
Packit Service |
310c69 |
insertInSortedList(packer, bin);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Move DataVIOs in pending batches from the batchedDataVIOs to all free output
|
|
Packit Service |
310c69 |
* bins, issuing writes for the output bins as they are packed. This will loop
|
|
Packit Service |
310c69 |
* until either the pending queue is drained or all output bins are busy
|
|
Packit Service |
310c69 |
* writing a compressed block.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void writePendingBatches(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (packer->writingBatches) {
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* We've attempted to re-enter this function recursively due to completion
|
|
Packit Service |
310c69 |
* handling, which can lead to kernel stack overflow as in VDO-1340. It's
|
|
Packit Service |
310c69 |
* perfectly safe to break the recursion and do nothing since we know any
|
|
Packit Service |
310c69 |
* pending batches will eventually be handled by the earlier call.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// Record that we are in this function for the above check. IMPORTANT: never
|
|
Packit Service |
310c69 |
// return from this function without clearing this flag.
|
|
Packit Service |
310c69 |
packer->writingBatches = true;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
OutputBin *output;
|
|
Packit Service |
310c69 |
while (hasWaiters(&packer->batchedDataVIOs)
|
|
Packit Service |
310c69 |
&& ((output = popOutputBin(packer)) != NULL)) {
|
|
Packit Service |
310c69 |
if (!writeNextBatch(packer, output)) {
|
|
Packit Service |
310c69 |
// We didn't use the output bin to write, so push it back on the stack.
|
|
Packit Service |
310c69 |
pushOutputBin(packer, output);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
packer->writingBatches = false;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Select the input bin that should be used to pack the compressed data in a
|
|
Packit Service |
310c69 |
* DataVIO with other DataVIOs.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer
|
|
Packit Service |
310c69 |
* @param dataVIO The DataVIO
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
__attribute__((warn_unused_result))
|
|
Packit Service |
310c69 |
static InputBin *selectInputBin(Packer *packer, DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
// First best fit: select the bin with the least free space that has enough
|
|
Packit Service |
310c69 |
// room for the compressed data in the DataVIO.
|
|
Packit Service |
310c69 |
InputBin *fullestBin = getFullestBin(packer);
|
|
Packit Service |
310c69 |
for (InputBin *bin = fullestBin; bin != NULL; bin = nextBin(packer, bin)) {
|
|
Packit Service |
310c69 |
if (bin->freeSpace >= dataVIO->compression.size) {
|
|
Packit Service |
310c69 |
return bin;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* None of the bins have enough space for the DataVIO. We're not allowed to
|
|
Packit Service |
310c69 |
* create new bins, so we have to overflow one of the existing bins. It's
|
|
Packit Service |
310c69 |
* pretty intuitive to select the fullest bin, since that "wastes" the least
|
|
Packit Service |
310c69 |
* amount of free space in the compressed block. But if the space currently
|
|
Packit Service |
310c69 |
* used in the fullest bin is smaller than the compressed size of the
|
|
Packit Service |
310c69 |
* incoming block, it seems wrong to force that bin to write when giving up
|
|
Packit Service |
310c69 |
* on compressing the incoming DataVIO would likewise "waste" the the least
|
|
Packit Service |
310c69 |
* amount of free space.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
if (dataVIO->compression.size
|
|
Packit Service |
310c69 |
>= (packer->binDataSize - fullestBin->freeSpace)) {
|
|
Packit Service |
310c69 |
return NULL;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// The fullest bin doesn't have room, but writing it out and starting a new
|
|
Packit Service |
310c69 |
// batch with the incoming DataVIO will increase the packer's free space.
|
|
Packit Service |
310c69 |
return fullestBin;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void attemptPacking(DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
Packer *packer = getPackerFromDataVIO(dataVIO);
|
|
Packit Service |
310c69 |
assertOnPackerThread(packer, __func__);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
VIOCompressionState state = getCompressionState(dataVIO);
|
|
Packit Service |
310c69 |
int result = ASSERT((state.status == VIO_COMPRESSING),
|
|
Packit Service |
310c69 |
"attempt to pack DataVIO not ready for packing, state: "
|
|
Packit Service |
310c69 |
"%u",
|
|
Packit Service |
310c69 |
state.status);
|
|
Packit Service |
310c69 |
if (result != VDO_SUCCESS) {
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* Increment whether or not this DataVIO will be packed or not since
|
|
Packit Service |
310c69 |
* abortPacking() always decrements the counter.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
relaxedAdd64(&packer->fragmentsPending, 1);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// If packing of this DataVIO is disallowed for administrative reasons, give
|
|
Packit Service |
310c69 |
// up before making any state changes.
|
|
Packit Service |
310c69 |
if (!isNormal(&packer->state)
|
|
Packit Service |
310c69 |
|| (dataVIO->flushGeneration < packer->flushGeneration)) {
|
|
Packit Service |
310c69 |
abortPacking(dataVIO);
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* The check of mayBlockInPacker() here will set the DataVIO's compression
|
|
Packit Service |
310c69 |
* state to VIO_PACKING if the DataVIO is allowed to be compressed (if it has
|
|
Packit Service |
310c69 |
* already been canceled, we'll fall out here). Once the DataVIO is in the
|
|
Packit Service |
310c69 |
* VIO_PACKING state, it must be guaranteed to be put in an input bin before
|
|
Packit Service |
310c69 |
* any more requests can be processed by the packer thread. Otherwise, a
|
|
Packit Service |
310c69 |
* canceling DataVIO could attempt to remove the canceled DataVIO from the
|
|
Packit Service |
310c69 |
* packer and fail to rendezvous with it (VDO-2809). We must also make sure
|
|
Packit Service |
310c69 |
* that we will actually bin the DataVIO and not give up on it as being
|
|
Packit Service |
310c69 |
* larger than the space used in the fullest bin. Hence we must call
|
|
Packit Service |
310c69 |
* selectInputBin() before calling mayBlockInPacker() (VDO-2826).
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
InputBin *bin = selectInputBin(packer, dataVIO);
|
|
Packit Service |
310c69 |
if ((bin == NULL) || !mayBlockInPacker(dataVIO)) {
|
|
Packit Service |
310c69 |
abortPacking(dataVIO);
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
addDataVIOToInputBin(packer, bin, dataVIO);
|
|
Packit Service |
310c69 |
writePendingBatches(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Force a pending write for all non-empty bins on behalf of a flush or
|
|
Packit Service |
310c69 |
* suspend.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* @param packer The packer being flushed
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void writeAllNonEmptyBins(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
for (InputBin *bin = getFullestBin(packer);
|
|
Packit Service |
310c69 |
bin != NULL;
|
|
Packit Service |
310c69 |
bin = nextBin(packer, bin)) {
|
|
Packit Service |
310c69 |
startNewBatch(packer, bin);
|
|
Packit Service |
310c69 |
// We don't need to re-sort the bin here since this loop will make every
|
|
Packit Service |
310c69 |
// bin have the same amount of free space, so every ordering is sorted.
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
writePendingBatches(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void flushPacker(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
assertOnPackerThread(packer, __func__);
|
|
Packit Service |
310c69 |
if (isNormal(&packer->state)) {
|
|
Packit Service |
310c69 |
writeAllNonEmptyBins(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/*
|
|
Packit Service |
310c69 |
* This method is only exposed for unit tests and should not normally be called
|
|
Packit Service |
310c69 |
* directly; use removeLockHolderFromPacker() instead.
|
|
Packit Service |
310c69 |
*/
|
|
Packit Service |
310c69 |
void removeFromPacker(DataVIO *dataVIO)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
InputBin *bin = dataVIO->compression.bin;
|
|
Packit Service |
310c69 |
ASSERT_LOG_ONLY((bin != NULL), "DataVIO in packer has an input bin");
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
SlotNumber slot = dataVIO->compression.slot;
|
|
Packit Service |
310c69 |
bin->slotsUsed--;
|
|
Packit Service |
310c69 |
if (slot < bin->slotsUsed) {
|
|
Packit Service |
310c69 |
bin->incoming[slot] = bin->incoming[bin->slotsUsed];
|
|
Packit Service |
310c69 |
bin->incoming[slot]->compression.slot = slot;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
dataVIO->compression.bin = NULL;
|
|
Packit Service |
310c69 |
dataVIO->compression.slot = 0;
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
Packer *packer = getPackerFromDataVIO(dataVIO);
|
|
Packit Service |
310c69 |
if (bin != packer->canceledBin) {
|
|
Packit Service |
310c69 |
bin->freeSpace += dataVIO->compression.size;
|
|
Packit Service |
310c69 |
insertInSortedList(packer, bin);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
abortPacking(dataVIO);
|
|
Packit Service |
310c69 |
checkForDrainComplete(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void removeLockHolderFromPacker(VDOCompletion *completion)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
DataVIO *dataVIO = asDataVIO(completion);
|
|
Packit Service |
310c69 |
assertInPackerZone(dataVIO);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
DataVIO *lockHolder = dataVIO->compression.lockHolder;
|
|
Packit Service |
310c69 |
dataVIO->compression.lockHolder = NULL;
|
|
Packit Service |
310c69 |
removeFromPacker(lockHolder);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void incrementPackerFlushGeneration(Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
assertOnPackerThread(packer, __func__);
|
|
Packit Service |
310c69 |
packer->flushGeneration++;
|
|
Packit Service |
310c69 |
flushPacker(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**
|
|
Packit Service |
310c69 |
* Initiate a drain.
|
|
Packit Service |
310c69 |
*
|
|
Packit Service |
310c69 |
* Implements AdminInitiator.
|
|
Packit Service |
310c69 |
**/
|
|
Packit Service |
310c69 |
static void initiateDrain(AdminState *state)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
Packer *packer = container_of(state, Packer, state);
|
|
Packit Service |
310c69 |
writeAllNonEmptyBins(packer);
|
|
Packit Service |
310c69 |
checkForDrainComplete(packer);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void drainPacker(Packer *packer, VDOCompletion *completion)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
assertOnPackerThread(packer, __func__);
|
|
Packit Service |
310c69 |
startDraining(&packer->state, ADMIN_STATE_SUSPENDING, completion,
|
|
Packit Service |
310c69 |
initiateDrain);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void resumePacker(Packer *packer, VDOCompletion *parent)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
assertOnPackerThread(packer, __func__);
|
|
Packit Service |
310c69 |
finishCompletion(parent, resumeIfQuiescent(&packer->state));
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void resetSlotCount(Packer *packer, CompressedFragmentCount slots)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (slots > MAX_COMPRESSION_SLOTS) {
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
packer->maxSlots = slots;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
static void dumpInputBin(const InputBin *bin, bool canceled)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
if (bin->slotsUsed == 0) {
|
|
Packit Service |
310c69 |
// Don't dump empty input bins.
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
logInfo(" %sBin slotsUsed=%u freeSpace=%zu",
|
|
Packit Service |
310c69 |
(canceled ? "Canceled" : "Input"), bin->slotsUsed, bin->freeSpace);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// XXX dump VIOs in bin->incoming? The VIOs should have been dumped from the
|
|
Packit Service |
310c69 |
// VIO pool. Maybe just dump their addresses so it's clear they're here?
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
static void dumpOutputBin(const OutputBin *bin)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
size_t count = countWaiters(&bin->outgoing);
|
|
Packit Service |
310c69 |
if (bin->slotsUsed == 0) {
|
|
Packit Service |
310c69 |
// Don't dump empty output bins.
|
|
Packit Service |
310c69 |
return;
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
logInfo(" OutputBin contains %zu outgoing waiters", count);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// XXX dump VIOs in bin->outgoing? The VIOs should have been dumped from the
|
|
Packit Service |
310c69 |
// VIO pool. Maybe just dump their addresses so it's clear they're here?
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
// XXX dump writer VIO?
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
/**********************************************************************/
|
|
Packit Service |
310c69 |
void dumpPacker(const Packer *packer)
|
|
Packit Service |
310c69 |
{
|
|
Packit Service |
310c69 |
logInfo("Packer");
|
|
Packit Service |
310c69 |
logInfo(" flushGeneration=%llu state %s writingBatches=%s",
|
|
Packit Service |
310c69 |
packer->flushGeneration, getAdminStateName(&packer->state),
|
|
Packit Service |
310c69 |
boolToString(packer->writingBatches));
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
logInfo(" inputBinCount=%llu", packer->size);
|
|
Packit Service |
310c69 |
for (InputBin *bin = getFullestBin(packer);
|
|
Packit Service |
310c69 |
bin != NULL;
|
|
Packit Service |
310c69 |
bin = nextBin(packer, bin)) {
|
|
Packit Service |
310c69 |
dumpInputBin(bin, false);
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
dumpInputBin(packer->canceledBin, true);
|
|
Packit Service |
310c69 |
|
|
Packit Service |
310c69 |
logInfo(" outputBinCount=%zu idleOutputBinCount=%zu",
|
|
Packit Service |
310c69 |
packer->outputBinCount, packer->idleOutputBinCount);
|
|
Packit Service |
310c69 |
const RingNode *head = &packer->outputBins;
|
|
Packit Service |
310c69 |
for (RingNode *node = head->next; node != head; node = node->next) {
|
|
Packit Service |
310c69 |
dumpOutputBin(outputBinFromRingNode(node));
|
|
Packit Service |
310c69 |
}
|
|
Packit Service |
310c69 |
}
|