/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/packer.c#8 $
*/
#include "packerInternals.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "adminState.h"
#include "allocatingVIO.h"
#include "allocationSelector.h"
#include "compressionState.h"
#include "dataVIO.h"
#include "hashLock.h"
#include "pbnLock.h"
#include "vdo.h"
#include "vdoInternal.h"
/**
 * Check that we are on the packer thread.
 *
 * @param packer  The packer
 * @param caller  The function which is asserting
 **/
static inline void assertOnPackerThread(Packer *packer, const char *caller)
{
  ThreadID currentThread = getCallbackThreadID();
  ASSERT_LOG_ONLY((currentThread == packer->threadID),
                  "%s() called from packer thread", caller);
}
/**
 * Convert a RingNode embedded in an InputBin back to the bin.
 *
 * @param node  The ring node to convert
 *
 * @return The InputBin containing the node
 **/
__attribute__((warn_unused_result))
static inline InputBin *inputBinFromRingNode(RingNode *node)
{
  // The ring node is the first member, so the cast is a no-op.
  STATIC_ASSERT(offsetof(InputBin, ring) == 0);
  InputBin *bin = (InputBin *) node;
  return bin;
}
/**
 * Convert a RingNode embedded in an OutputBin back to the bin.
 *
 * @param node  The ring node to convert
 *
 * @return The OutputBin containing the node
 **/
__attribute__((warn_unused_result))
static inline OutputBin *outputBinFromRingNode(RingNode *node)
{
  // The ring node is the first member, so the cast is a no-op.
  STATIC_ASSERT(offsetof(OutputBin, ring) == 0);
  OutputBin *bin = (OutputBin *) node;
  return bin;
}
/**********************************************************************/
InputBin *nextBin(const Packer *packer, InputBin *bin)
{
  // The packer's list head marks the end of the ring of input bins.
  RingNode *next = bin->ring.next;
  return ((next == &packer->inputBins) ? NULL : inputBinFromRingNode(next));
}
/**********************************************************************/
InputBin *getFullestBin(const Packer *packer)
{
  // The list is sorted by ascending free space, so the first bin (if any)
  // is the fullest one.
  return (isRingEmpty(&packer->inputBins)
          ? NULL
          : inputBinFromRingNode(packer->inputBins.next));
}
/**
 * Insert an input bin to the list, which is in ascending order of free space.
 * Since all bins are already in the list, this actually moves the bin to the
 * correct position in the list.
 *
 * @param packer  The packer
 * @param bin     The input bin to move to its sorted position
 **/
static void insertInSortedList(Packer *packer, InputBin *bin)
{
  // Find the first bin with more free space and insert in front of it.
  InputBin *activeBin = getFullestBin(packer);
  while (activeBin != NULL) {
    if (activeBin->freeSpace > bin->freeSpace) {
      pushRingNode(&activeBin->ring, &bin->ring);
      return;
    }
    activeBin = nextBin(packer, activeBin);
  }

  // No bin has more free space, so the bin belongs at the tail.
  pushRingNode(&packer->inputBins, &bin->ring);
}
/**
 * Allocate an input bin and put it into the packer's list.
 *
 * @param packer  The packer
 *
 * @return VDO_SUCCESS or an allocation error code
 **/
__attribute__((warn_unused_result))
static int makeInputBin(Packer *packer)
{
  InputBin *newBin;
  int result = ALLOCATE_EXTENDED(InputBin, MAX_COMPRESSION_SLOTS, VIO *,
                                 __func__, &newBin);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // A brand-new bin is completely empty, so it belongs at the tail of the
  // list, which is sorted by ascending free space.
  newBin->freeSpace = packer->binDataSize;
  initializeRing(&newBin->ring);
  pushRingNode(&packer->inputBins, &newBin->ring);
  return VDO_SUCCESS;
}
/**
 * Push an output bin onto the stack of idle bins.
 *
 * @param packer  The packer
 * @param bin     The output bin
 **/
static void pushOutputBin(Packer *packer, OutputBin *bin)
{
  // A bin with waiters is still in use and must not be marked idle.
  ASSERT_LOG_ONLY(!hasWaiters(&bin->outgoing),
                  "idle output bin has no waiters");
  size_t index = packer->idleOutputBinCount;
  packer->idleOutputBins[index] = bin;
  packer->idleOutputBinCount = index + 1;
}
/**
 * Pop an output bin off the end of the stack of idle bins.
 *
 * @param packer  The packer
 *
 * @return an idle output bin, or <code>NULL</code> if there are no idle bins
 **/
__attribute__((warn_unused_result))
static OutputBin *popOutputBin(Packer *packer)
{
  if (packer->idleOutputBinCount == 0) {
    return NULL;
  }

  // Take the top of the stack and clear the vacated slot.
  packer->idleOutputBinCount--;
  OutputBin *bin = packer->idleOutputBins[packer->idleOutputBinCount];
  packer->idleOutputBins[packer->idleOutputBinCount] = NULL;
  return bin;
}
/**
 * Allocate a new output bin and push it onto the packer's stack of idle bins.
 *
 * @param packer  The packer
 * @param layer   The physical layer that will receive the compressed block
 *                writes from the output bin
 *
 * @return VDO_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int makeOutputBin(Packer *packer, PhysicalLayer *layer)
{
  OutputBin *bin;
  int result = ALLOCATE(1, OutputBin, __func__, &bin);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // Add the bin to the stack even before it's fully initialized so it will
  // be freed even if we fail to initialize it below.
  initializeRing(&bin->ring);
  pushRingNode(&packer->outputBins, &bin->ring);
  pushOutputBin(packer, bin);

  // Allocate the buffer which the bin's fragments will be packed into.
  result = ALLOCATE_EXTENDED(CompressedBlock, packer->binDataSize, char,
                             "compressed block", &bin->block);
  if (result != VDO_SUCCESS) {
    return result;
  }

  // Create the VIO which will write the packed block out.
  return layer->createCompressedWriteVIO(layer, bin, (char *) bin->block,
                                         &bin->writer);
}
/**
 * Free an idle output bin and null out the reference to it.
 *
 * @param binPtr  The reference to the output bin to free
 **/
static void freeOutputBin(OutputBin **binPtr)
{
  OutputBin *bin = *binPtr;
  if (bin == NULL) {
    return;
  }
  *binPtr = NULL;

  unspliceRingNode(&bin->ring);

  // The writer is an AllocatingVIO; view it as a plain VIO to free it.
  VIO *writerVIO = allocatingVIOAsVIO(bin->writer);
  freeVIO(&writerVIO);
  FREE(bin->block);
  FREE(bin);
}
/**
 * Construct a packer with the given numbers of input and output bins.
 *
 * On any failure, partially constructed state is torn down via freePacker()
 * before returning the error.
 *
 * @param layer          The physical layer to which compressed blocks will be
 *                       written
 * @param inputBinCount  The number of input bins to create
 * @param outputBinCount The number of output bins to create
 * @param threadConfig   The thread configuration naming the packer thread
 * @param packerPtr      A pointer to hold the new packer
 *
 * @return VDO_SUCCESS or an error code
 **/
int makePacker(PhysicalLayer *layer,
               BlockCount inputBinCount,
               BlockCount outputBinCount,
               const ThreadConfig *threadConfig,
               Packer **packerPtr)
{
  Packer *packer;
  int result = ALLOCATE_EXTENDED(Packer, outputBinCount,
                                 OutputBin *, __func__, &packer);
  if (result != VDO_SUCCESS) {
    return result;
  }

  packer->threadID = getPackerZoneThread(threadConfig);
  packer->binDataSize = VDO_BLOCK_SIZE - sizeof(CompressedBlockHeader);
  packer->size = inputBinCount;
  packer->maxSlots = MAX_COMPRESSION_SLOTS;
  packer->outputBinCount = outputBinCount;
  initializeRing(&packer->inputBins);
  initializeRing(&packer->outputBins);

  result = makeAllocationSelector(threadConfig->physicalZoneCount,
                                  packer->threadID, &packer->selector);
  if (result != VDO_SUCCESS) {
    freePacker(&packer);
    return result;
  }

  for (BlockCount i = 0; i < inputBinCount; i++) {
    // Reuse the outer result; the previous inner declaration shadowed it.
    result = makeInputBin(packer);
    if (result != VDO_SUCCESS) {
      freePacker(&packer);
      return result;
    }
  }

  /*
   * The canceled bin can hold up to half the number of user VIOs. Every
   * canceled VIO in the bin must have a canceler for which it is waiting, and
   * any canceler will only have canceled one lock holder at a time.
   */
  result = ALLOCATE_EXTENDED(InputBin, MAXIMUM_USER_VIOS / 2, VIO *, __func__,
                             &packer->canceledBin);
  if (result != VDO_SUCCESS) {
    freePacker(&packer);
    return result;
  }

  for (BlockCount i = 0; i < outputBinCount; i++) {
    // Reuse the outer result; the previous inner declaration shadowed it.
    result = makeOutputBin(packer, layer);
    if (result != VDO_SUCCESS) {
      freePacker(&packer);
      return result;
    }
  }

  *packerPtr = packer;
  return VDO_SUCCESS;
}
/**********************************************************************/
void freePacker(Packer **packerPtr)
{
  Packer *packer = *packerPtr;
  if (packer == NULL) {
    return;
  }

  // Unsplice and free every input bin in the list.
  for (InputBin *bin = getFullestBin(packer);
       bin != NULL;
       bin = getFullestBin(packer)) {
    unspliceRingNode(&bin->ring);
    FREE(bin);
  }

  FREE(packer->canceledBin);

  // Pop each idle output bin off the stack and free it.
  for (OutputBin *bin = popOutputBin(packer);
       bin != NULL;
       bin = popOutputBin(packer)) {
    freeOutputBin(&bin);
  }

  freeAllocationSelector(&packer->selector);
  FREE(packer);
  *packerPtr = NULL;
}
/**
 * Get the Packer from a DataVIO.
 *
 * @param dataVIO  The DataVIO
 *
 * @return The Packer from the VDO to which the DataVIO belongs
 **/
static inline Packer *getPackerFromDataVIO(DataVIO *dataVIO)
{
  VDO *vdo = getVDOFromDataVIO(dataVIO);
  return vdo->packer;
}
/**********************************************************************/
bool isSufficientlyCompressible(DataVIO *dataVIO)
{
  // The compressed fragment must be strictly smaller than a bin's data area
  // for packing to be worthwhile.
  Packer *packer = getPackerFromDataVIO(dataVIO);
  BlockSize threshold = packer->binDataSize;
  return (dataVIO->compression.size < threshold);
}
/**********************************************************************/
ThreadID getPackerThreadID(Packer *packer)
{
  // Simple accessor for the thread on which the packer runs.
  ThreadID id = packer->threadID;
  return id;
}
/**
 * Snapshot the packer's statistics counters.
 *
 * @param packer  The packer
 *
 * @return A copy of the current counter values (read without fences)
 **/
PackerStatistics getPackerStatistics(const Packer *packer)
{
  /*
   * This is called from getVDOStatistics(), which is called from outside the
   * packer thread. These are just statistics with no semantics that could
   * rely on memory order, so unfenced reads are sufficient.
   */
  return (PackerStatistics) {
    .compressedFragmentsWritten = relaxedLoad64(&packer->fragmentsWritten),
    .compressedBlocksWritten = relaxedLoad64(&packer->blocksWritten),
    .compressedFragmentsInPacker = relaxedLoad64(&packer->fragmentsPending),
  };
}
/**
 * Abort packing a DataVIO: mark its compression finished, release its claim
 * on the pending-fragment count, and continue it outside the packer.
 *
 * @param dataVIO  The DataVIO to abort
 **/
static void abortPacking(DataVIO *dataVIO)
{
  setCompressionDone(dataVIO);
  // Undo the increment made when the DataVIO entered the packer; the counter
  // is only a statistic, so a relaxed atomic is sufficient.
  relaxedAdd64(&getPackerFromDataVIO(dataVIO)->fragmentsPending, -1);
  dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
  continueDataVIO(dataVIO, VDO_SUCCESS);
}
/**
 * This continues the VIO completion without packing the VIO.
 *
 * @param waiter  The wait queue entry of the VIO to continue
 * @param unused  An argument required so this function may be called
 *                from notifyAllWaiters
 **/
static void continueVIOWithoutPacking(Waiter *waiter,
                                      void *unused __attribute__((unused)))
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  abortPacking(dataVIO);
}
/**
 * Check whether the packer has drained: a drain must be in progress, the
 * canceled bin must be empty, and every output bin must be idle.
 *
 * @param packer  The packer
 **/
static void checkForDrainComplete(Packer *packer)
{
  if (!isDraining(&packer->state)) {
    return;
  }
  if (packer->canceledBin->slotsUsed != 0) {
    return;
  }
  if (packer->idleOutputBinCount != packer->outputBinCount) {
    return;
  }
  finishDraining(&packer->state);
}
/**********************************************************************/
static void writePendingBatches(Packer *packer);
/**
 * Ensure that a completion is running on the packer thread.
 *
 * @param completion  The compressed write VIO
 *
 * @return <code>true</code> if the completion is on the packer thread
 **/
__attribute__((warn_unused_result))
static bool switchToPackerThread(VDOCompletion *completion)
{
  VIO *vio = asVIO(completion);
  ThreadID packerThread = vio->vdo->packer->threadID;
  if (completion->callbackThreadID != packerThread) {
    // Requeue the completion so its callback runs on the packer thread.
    completion->callbackThreadID = packerThread;
    invokeCallback(completion);
    return false;
  }
  return true;
}
/**
 * Finish processing an output bin whose write has completed. If there was
 * an error, any DataVIOs waiting on the bin write will be notified.
 *
 * @param packer  The packer which owns the bin
 * @param bin     The bin which has finished
 **/
static void finishOutputBin(Packer *packer, OutputBin *bin)
{
  if (hasWaiters(&bin->outgoing)) {
    // Waiters are still enqueued, so finishCompressedWrite() did not drain
    // them on a success path; continue each DataVIO without packing.
    notifyAllWaiters(&bin->outgoing, continueVIOWithoutPacking, NULL);
  } else {
    // No waiters implies no error, so the compressed block was written.
    relaxedAdd64(&packer->fragmentsPending, -bin->slotsUsed);
    relaxedAdd64(&packer->fragmentsWritten, bin->slotsUsed);
    relaxedAdd64(&packer->blocksWritten, 1);
  }

  // The bin is idle again and may be reused for the next batch.
  bin->slotsUsed = 0;
  pushOutputBin(packer, bin);
}
/**
 * This finishes the bin write process after the bin is written to disk. This
 * is the VIO callback function registered in launchCompressedWrite().
 *
 * @param completion  The compressed write VIO
 **/
static void completeOutputBin(VDOCompletion *completion)
{
  // This callback may fire on another thread; requeue to the packer thread
  // if necessary and finish there.
  if (!switchToPackerThread(completion)) {
    return;
  }

  VIO *vio = asVIO(completion);
  if (completion->result != VDO_SUCCESS) {
    updateVIOErrorStats(vio,
                        "Completing compressed write VIO for physical block %"
                        PRIu64 " with error",
                        vio->physical);
  }

  Packer *packer = vio->vdo->packer;
  finishOutputBin(packer, completion->parent);
  // The bin just became idle, so try to pack any pending batches, then see
  // whether an in-progress drain can now complete.
  writePendingBatches(packer);
  checkForDrainComplete(packer);
}
/**
 * Implements WaiterCallback. Continues the DataVIO waiter.
 **/
static void continueWaiter(Waiter *waiter,
                           void *context __attribute__((unused)))
{
  continueDataVIO(waiterAsDataVIO(waiter), VDO_SUCCESS);
}
/**
 * Implements WaiterCallback. Updates the DataVIO waiter to refer to its slot
 * in the compressed block, gives the DataVIO a share of the PBN lock on that
 * block, and reserves a reference count increment on the lock.
 **/
static void shareCompressedBlock(Waiter *waiter, void *context)
{
  DataVIO *dataVIO = waiterAsDataVIO(waiter);
  OutputBin *bin = context;

  // Point the DataVIO's new mapping at its fragment's slot in the
  // just-written compressed block.
  dataVIO->newMapped = (ZonedPBN) {
    .pbn = bin->writer->allocation,
    .zone = bin->writer->zone,
    .state = getStateForSlot(dataVIO->compression.slot),
  };
  dataVIOAsVIO(dataVIO)->physical = dataVIO->newMapped.pbn;
  shareCompressedWriteLock(dataVIO, bin->writer->allocationLock);

  // Wait again for all the waiters to get a share.
  int result = enqueueWaiter(&bin->outgoing, waiter);
  // Cannot fail since this waiter was just dequeued.
  ASSERT_LOG_ONLY(result == VDO_SUCCESS, "impossible enqueueWaiter error");
}
/**
 * Finish a compressed block write. This callback is registered in
 * continueAfterAllocation().
 *
 * On success, every waiter receives a share of the PBN lock before anything
 * is released; on error, the lock is released and the waiters are handled by
 * completeOutputBin() instead.
 *
 * @param completion  The compressed write completion
 **/
static void finishCompressedWrite(VDOCompletion *completion)
{
  OutputBin *bin = completion->parent;
  assertInPhysicalZone(bin->writer);

  if (completion->result != VDO_SUCCESS) {
    releaseAllocationLock(bin->writer);
    // Invokes completeOutputBin() on the packer thread, which will deal with
    // the waiters.
    vioDoneCallback(completion);
    return;
  }

  // First give every DataVIO/HashLock a share of the PBN lock to ensure it
  // can't be released until they've all done their incRefs.
  notifyAllWaiters(&bin->outgoing, shareCompressedBlock, bin);

  // The waiters now hold the (downgraded) PBN lock.
  bin->writer->allocationLock = NULL;

  // Invokes the callbacks registered before entering the packer.
  notifyAllWaiters(&bin->outgoing, continueWaiter, NULL);

  // Invokes completeOutputBin() on the packer thread.
  vioDoneCallback(completion);
}
/**
 * Continue the write path for a compressed write AllocatingVIO now that block
 * allocation is complete (the AllocatingVIO may or may not have actually
 * received an allocation).
 *
 * @param allocatingVIO  The AllocatingVIO which has finished the allocation
 *                       process
 **/
static void continueAfterAllocation(AllocatingVIO *allocatingVIO)
{
  VIO *vio = allocatingVIOAsVIO(allocatingVIO);
  VDOCompletion *completion = vioAsCompletion(vio);
  if (allocatingVIO->allocation == ZERO_BLOCK) {
    // No physical block was allocated; finish the write as out-of-space so
    // the waiters can be continued without packing.
    completion->requeue = true;
    setCompletionResult(completion, VDO_NO_SPACE);
    vioDoneCallback(completion);
    return;
  }

  // An allocation succeeded; register the write-completion callback and
  // issue the compressed block write.
  setPhysicalZoneCallback(allocatingVIO, finishCompressedWrite,
                          THIS_LOCATION("$F(meta);cb=finishCompressedWrite"));
  completion->layer->writeCompressedBlock(allocatingVIO);
}
/**
 * Launch an output bin: allocate a physical block for its compressed data
 * and write it out. In read-only mode, the bin is finished without writing.
 *
 * @param packer  The packer which owns the bin
 * @param bin     The output bin to launch
 **/
static void launchCompressedWrite(Packer *packer, OutputBin *bin)
{
  if (isReadOnly(getVDOFromAllocatingVIO(bin->writer)->readOnlyNotifier)) {
    // Don't write in read-only mode; finishing the bin notifies the waiters
    // already enqueued on it.
    finishOutputBin(packer, bin);
    return;
  }

  VIO *vio = allocatingVIOAsVIO(bin->writer);
  resetCompletion(vioAsCompletion(vio));
  // completeOutputBin() will run on the packer thread once the write (or a
  // failed allocation) is done.
  vio->callback = completeOutputBin;
  vio->priority = VIO_PRIORITY_COMPRESSED_DATA;
  allocateDataBlock(bin->writer, packer->selector, VIO_COMPRESSED_WRITE_LOCK,
                    continueAfterAllocation);
}
/**
 * Consume from the pending queue the next batch of VIOs that can be packed
 * together in a single compressed block. VIOs that have been mooted since
 * being placed in the pending queue will not be returned.
 *
 * @param packer  The packer
 * @param batch   The counted array to fill with the next batch of VIOs
 **/
static void getNextBatch(Packer *packer, OutputBatch *batch)
{
  BlockSize spaceRemaining = packer->binDataSize;
  batch->slotsUsed = 0;

  for (;;) {
    Waiter *waiter = getFirstWaiter(&packer->batchedDataVIOs);
    if (waiter == NULL) {
      // The pending queue is empty.
      break;
    }

    // If there's not enough space for the next DataVIO, the batch is done.
    DataVIO *dataVIO = waiterAsDataVIO(waiter);
    if ((dataVIO->compression.size > spaceRemaining)
        || (batch->slotsUsed == packer->maxSlots)) {
      break;
    }

    // Remove the next DataVIO from the queue and put it in the output batch.
    dequeueNextWaiter(&packer->batchedDataVIOs);
    batch->slots[batch->slotsUsed++] = dataVIO;
    spaceRemaining -= dataVIO->compression.size;
  }
}
/**
 * Pack the next batch of compressed VIOs from the batched queue into an
 * output bin and write the output bin.
 *
 * @param packer  The packer
 * @param output  The output bin to fill
 *
 * @return <code>true</code> if a write was issued for the output bin
 **/
__attribute__((warn_unused_result))
static bool writeNextBatch(Packer *packer, OutputBin *output)
{
  OutputBatch batch;
  getNextBatch(packer, &batch);

  if (batch.slotsUsed == 0) {
    // The pending queue must now be empty (there may have been mooted VIOs).
    return false;
  }

  // If the batch contains only a single VIO, then we save nothing by saving
  // the compressed form. Continue processing the single VIO in the batch.
  if (batch.slotsUsed == 1) {
    abortPacking(batch.slots[0]);
    return false;
  }

  resetCompressedBlockHeader(&output->block->header);

  // Copy each fragment into the compressed block and enqueue its DataVIO on
  // the bin's outgoing queue so it can be notified when the write completes.
  size_t spaceUsed = 0;
  for (SlotNumber slot = 0; slot < batch.slotsUsed; slot++) {
    DataVIO *dataVIO = batch.slots[slot];
    dataVIO->compression.slot = slot;
    putCompressedBlockFragment(output->block, slot, spaceUsed,
                               dataVIO->compression.data,
                               dataVIO->compression.size);
    spaceUsed += dataVIO->compression.size;

    int result = enqueueDataVIO(&output->outgoing, dataVIO,
                                THIS_LOCATION(NULL));
    if (result != VDO_SUCCESS) {
      // Should be impossible; abort just this fragment and keep going.
      abortPacking(dataVIO);
      continue;
    }

    output->slotsUsed += 1;
  }

  launchCompressedWrite(packer, output);
  return true;
}
/**
 * Put a DataVIO in a specific InputBin in which it will definitely fit.
 *
 * @param bin      The bin in which to put the DataVIO
 * @param dataVIO  The DataVIO to add
 **/
static void addToInputBin(InputBin *bin, DataVIO *dataVIO)
{
  // Record the DataVIO's position so removeFromPacker() can find it later.
  SlotNumber slot = bin->slotsUsed;
  dataVIO->compression.bin = bin;
  dataVIO->compression.slot = slot;
  bin->incoming[slot] = dataVIO;
  bin->slotsUsed = slot + 1;
}
/**
 * Start a new batch of VIOs in an InputBin, moving the existing batch, if
 * any, to the queue of pending batched VIOs in the packer.
 *
 * @param packer  The packer
 * @param bin     The bin to prepare
 **/
static void startNewBatch(Packer *packer, InputBin *bin)
{
  // Move all the DataVIOs in the current batch to the batched queue so they
  // will get packed into the next free output bin.
  for (SlotNumber slot = 0; slot < bin->slotsUsed; slot++) {
    DataVIO *dataVIO = bin->incoming[slot];
    dataVIO->compression.bin = NULL;

    if (!mayWriteCompressedDataVIO(dataVIO)) {
      /*
       * Compression of this DataVIO was canceled while it was waiting; put it
       * in the canceled bin so it can rendezvous with the canceling DataVIO.
       */
      addToInputBin(packer->canceledBin, dataVIO);
      continue;
    }

    int result = enqueueDataVIO(&packer->batchedDataVIOs, dataVIO,
                                THIS_LOCATION(NULL));
    if (result != VDO_SUCCESS) {
      // Impossible but we're required to check the result from enqueue.
      abortPacking(dataVIO);
    }
  }

  // The bin is now empty.
  bin->slotsUsed = 0;
  bin->freeSpace = packer->binDataSize;
}
/**
 * Add a DataVIO to a bin's incoming queue, handle logical space change, and
 * call physical space processor.
 *
 * @param packer   The packer
 * @param bin      The bin to which to add the DataVIO
 * @param dataVIO  The DataVIO to add to the bin's queue
 **/
static void addDataVIOToInputBin(Packer *packer,
                                 InputBin *bin,
                                 DataVIO *dataVIO)
{
  // If the selected bin doesn't have room, start a new batch to make room.
  if (bin->freeSpace < dataVIO->compression.size) {
    startNewBatch(packer, bin);
  }

  addToInputBin(bin, dataVIO);
  bin->freeSpace -= dataVIO->compression.size;

  // If we happen to exactly fill the bin, start a new input batch.
  if ((bin->slotsUsed == packer->maxSlots) || (bin->freeSpace == 0)) {
    startNewBatch(packer, bin);
  }

  // Now that we've finished changing the free space, restore the sort order.
  insertInSortedList(packer, bin);
}
/**
 * Move DataVIOs in pending batches from the batchedDataVIOs to all free output
 * bins, issuing writes for the output bins as they are packed. This will loop
 * until either the pending queue is drained or all output bins are busy
 * writing a compressed block.
 *
 * @param packer  The packer
 **/
static void writePendingBatches(Packer *packer)
{
  if (packer->writingBatches) {
    /*
     * We've attempted to re-enter this function recursively due to completion
     * handling, which can lead to kernel stack overflow as in VDO-1340. It's
     * perfectly safe to break the recursion and do nothing since we know any
     * pending batches will eventually be handled by the earlier call.
     */
    return;
  }

  // Record that we are in this function for the above check. IMPORTANT: never
  // return from this function without clearing this flag.
  packer->writingBatches = true;

  // Pair each pending batch with an idle output bin until one of the two
  // supplies is exhausted.
  OutputBin *output;
  while (hasWaiters(&packer->batchedDataVIOs)
         && ((output = popOutputBin(packer)) != NULL)) {
    if (!writeNextBatch(packer, output)) {
      // We didn't use the output bin to write, so push it back on the stack.
      pushOutputBin(packer, output);
    }
  }

  packer->writingBatches = false;
}
/**
 * Select the input bin that should be used to pack the compressed data in a
 * DataVIO with other DataVIOs.
 *
 * @param packer   The packer
 * @param dataVIO  The DataVIO
 *
 * @return The bin to use, or <code>NULL</code> if the DataVIO should not be
 *         binned
 **/
__attribute__((warn_unused_result))
static InputBin *selectInputBin(Packer *packer, DataVIO *dataVIO)
{
  // First best fit: select the bin with the least free space that has enough
  // room for the compressed data in the DataVIO.
  InputBin *fullestBin = getFullestBin(packer);
  for (InputBin *bin = fullestBin; bin != NULL; bin = nextBin(packer, bin)) {
    if (bin->freeSpace >= dataVIO->compression.size) {
      return bin;
    }
  }

  // If there are no input bins at all, there is nothing to overflow into;
  // without this guard the dereference below would be a NULL pointer access.
  if (fullestBin == NULL) {
    return NULL;
  }

  /*
   * None of the bins have enough space for the DataVIO. We're not allowed to
   * create new bins, so we have to overflow one of the existing bins. It's
   * pretty intuitive to select the fullest bin, since that "wastes" the least
   * amount of free space in the compressed block. But if the space currently
   * used in the fullest bin is smaller than the compressed size of the
   * incoming block, it seems wrong to force that bin to write when giving up
   * on compressing the incoming DataVIO would likewise "waste" the least
   * amount of free space.
   */
  if (dataVIO->compression.size
      >= (packer->binDataSize - fullestBin->freeSpace)) {
    return NULL;
  }

  // The fullest bin doesn't have room, but writing it out and starting a new
  // batch with the incoming DataVIO will increase the packer's free space.
  return fullestBin;
}
/**********************************************************************/
void attemptPacking(DataVIO *dataVIO)
{
  Packer *packer = getPackerFromDataVIO(dataVIO);
  assertOnPackerThread(packer, __func__);

  // The DataVIO must arrive in the compressing state; log and bail rather
  // than corrupt the packer's state if it did not.
  VIOCompressionState state = getCompressionState(dataVIO);
  int result = ASSERT((state.status == VIO_COMPRESSING),
                      "attempt to pack DataVIO not ready for packing, state: "
                      "%u",
                      state.status);
  if (result != VDO_SUCCESS) {
    return;
  }

  /*
   * Increment the pending-fragment count whether or not this DataVIO will
   * actually be packed, since abortPacking() always decrements the counter.
   */
  relaxedAdd64(&packer->fragmentsPending, 1);

  // If packing of this DataVIO is disallowed for administrative reasons, give
  // up before making any state changes.
  if (!isNormal(&packer->state)
      || (dataVIO->flushGeneration < packer->flushGeneration)) {
    abortPacking(dataVIO);
    return;
  }

  /*
   * The check of mayBlockInPacker() here will set the DataVIO's compression
   * state to VIO_PACKING if the DataVIO is allowed to be compressed (if it has
   * already been canceled, we'll fall out here). Once the DataVIO is in the
   * VIO_PACKING state, it must be guaranteed to be put in an input bin before
   * any more requests can be processed by the packer thread. Otherwise, a
   * canceling DataVIO could attempt to remove the canceled DataVIO from the
   * packer and fail to rendezvous with it (VDO-2809). We must also make sure
   * that we will actually bin the DataVIO and not give up on it as being
   * larger than the space used in the fullest bin. Hence we must call
   * selectInputBin() before calling mayBlockInPacker() (VDO-2826).
   */
  InputBin *bin = selectInputBin(packer, dataVIO);
  if ((bin == NULL) || !mayBlockInPacker(dataVIO)) {
    abortPacking(dataVIO);
    return;
  }

  addDataVIOToInputBin(packer, bin, dataVIO);
  writePendingBatches(packer);
}
/**
 * Force a pending write for all non-empty bins on behalf of a flush or
 * suspend.
 *
 * @param packer  The packer being flushed
 **/
static void writeAllNonEmptyBins(Packer *packer)
{
  InputBin *bin = getFullestBin(packer);
  while (bin != NULL) {
    startNewBatch(packer, bin);
    // We don't need to re-sort the bin here since this loop will make every
    // bin have the same amount of free space, so every ordering is sorted.
    bin = nextBin(packer, bin);
  }

  writePendingBatches(packer);
}
/**********************************************************************/
void flushPacker(Packer *packer)
{
  assertOnPackerThread(packer, __func__);
  if (!isNormal(&packer->state)) {
    // Only flush while the packer is in normal operation.
    return;
  }
  writeAllNonEmptyBins(packer);
}
/*
 * This method is only exposed for unit tests and should not normally be called
 * directly; use removeLockHolderFromPacker() instead.
 */
void removeFromPacker(DataVIO *dataVIO)
{
  InputBin *bin = dataVIO->compression.bin;
  ASSERT_LOG_ONLY((bin != NULL), "DataVIO in packer has an input bin");

  // Remove the DataVIO by moving the last occupant (if any) into the vacated
  // slot, keeping the bin's slot array densely packed.
  SlotNumber slot = dataVIO->compression.slot;
  bin->slotsUsed--;
  if (slot < bin->slotsUsed) {
    bin->incoming[slot] = bin->incoming[bin->slotsUsed];
    bin->incoming[slot]->compression.slot = slot;
  }

  dataVIO->compression.bin = NULL;
  dataVIO->compression.slot = 0;

  Packer *packer = getPackerFromDataVIO(dataVIO);
  if (bin != packer->canceledBin) {
    // Return the fragment's space to the bin and restore the sort order;
    // the canceled bin is not part of the sorted list, so it is skipped.
    bin->freeSpace += dataVIO->compression.size;
    insertInSortedList(packer, bin);
  }

  abortPacking(dataVIO);
  // A drain may have been waiting for this DataVIO to leave the packer.
  checkForDrainComplete(packer);
}
/**********************************************************************/
void removeLockHolderFromPacker(VDOCompletion *completion)
{
  DataVIO *dataVIO = asDataVIO(completion);
  assertInPackerZone(dataVIO);

  // Take ownership of the lock holder reference before removing it.
  DataVIO *holder = dataVIO->compression.lockHolder;
  dataVIO->compression.lockHolder = NULL;
  removeFromPacker(holder);
}
/**********************************************************************/
void incrementPackerFlushGeneration(Packer *packer)
{
  assertOnPackerThread(packer, __func__);
  // Advance the generation first so DataVIOs from earlier generations will
  // be rejected by attemptPacking(), then flush everything already binned.
  packer->flushGeneration = packer->flushGeneration + 1;
  flushPacker(packer);
}
/**
 * Initiate a drain.
 *
 * Implements AdminInitiator.
 **/
static void initiateDrain(AdminState *state)
{
  // The AdminState is embedded in the Packer, so recover the packer from it.
  Packer *packer = container_of(state, Packer, state);
  // Flush out everything already in the packer, then see whether the drain
  // can finish immediately.
  writeAllNonEmptyBins(packer);
  checkForDrainComplete(packer);
}
/**********************************************************************/
void drainPacker(Packer *packer, VDOCompletion *completion)
{
  assertOnPackerThread(packer, __func__);
  // initiateDrain() flushes the bins; the drain finishes via
  // checkForDrainComplete() once all output bins are idle.
  startDraining(&packer->state, ADMIN_STATE_SUSPENDING, completion,
                initiateDrain);
}
/**********************************************************************/
void resumePacker(Packer *packer, VDOCompletion *parent)
{
  assertOnPackerThread(packer, __func__);
  // Resume only succeeds from a quiescent state; report the outcome to the
  // parent completion either way.
  int result = resumeIfQuiescent(&packer->state);
  finishCompletion(parent, result);
}
/**********************************************************************/
void resetSlotCount(Packer *packer, CompressedFragmentCount slots)
{
  // Silently ignore requests exceeding the compile-time slot limit.
  if (slots <= MAX_COMPRESSION_SLOTS) {
    packer->maxSlots = slots;
  }
}
/**********************************************************************/
static void dumpInputBin(const InputBin *bin, bool canceled)
{
  if (bin->slotsUsed == 0) {
    // Don't dump empty input bins.
    return;
  }

  const char *label = (canceled ? "Canceled" : "Input");
  logInfo(" %sBin slotsUsed=%u freeSpace=%zu",
          label, bin->slotsUsed, bin->freeSpace);

  // XXX dump VIOs in bin->incoming? The VIOs should have been dumped from the
  // VIO pool. Maybe just dump their addresses so it's clear they're here?
}
/**********************************************************************/
static void dumpOutputBin(const OutputBin *bin)
{
  size_t waiterCount = countWaiters(&bin->outgoing);
  if (bin->slotsUsed == 0) {
    // Don't dump empty output bins.
    return;
  }

  logInfo(" OutputBin contains %zu outgoing waiters", waiterCount);

  // XXX dump VIOs in bin->outgoing? The VIOs should have been dumped from the
  // VIO pool. Maybe just dump their addresses so it's clear they're here?
  // XXX dump writer VIO?
}
/**
 * Dump the packer's state to the log for debugging.
 *
 * @param packer  The packer to dump
 **/
void dumpPacker(const Packer *packer)
{
  logInfo("Packer");
  // flushGeneration and size are 64-bit fields; use PRIu64 as elsewhere in
  // this file rather than assuming uint64_t is unsigned long long.
  logInfo(" flushGeneration=%" PRIu64 " state %s writingBatches=%s",
          packer->flushGeneration, getAdminStateName(&packer->state),
          boolToString(packer->writingBatches));
  logInfo(" inputBinCount=%" PRIu64, packer->size);
  for (InputBin *bin = getFullestBin(packer);
       bin != NULL;
       bin = nextBin(packer, bin)) {
    dumpInputBin(bin, false);
  }

  dumpInputBin(packer->canceledBin, true);

  logInfo(" outputBinCount=%zu idleOutputBinCount=%zu",
          packer->outputBinCount, packer->idleOutputBinCount);
  const RingNode *head = &packer->outputBins;
  for (RingNode *node = head->next; node != head; node = node->next) {
    dumpOutputBin(outputBinFromRingNode(node));
  }
}