/* * Copyright (c) 2020 Red Hat, Inc. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301, USA. * * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/packer.c#8 $ */ #include "packerInternals.h" #include "logger.h" #include "memoryAlloc.h" #include "adminState.h" #include "allocatingVIO.h" #include "allocationSelector.h" #include "compressionState.h" #include "dataVIO.h" #include "hashLock.h" #include "pbnLock.h" #include "vdo.h" #include "vdoInternal.h" /** * Check that we are on the packer thread. 
* * @param packer The packer * @param caller The function which is asserting **/ static inline void assertOnPackerThread(Packer *packer, const char *caller) { ASSERT_LOG_ONLY((getCallbackThreadID() == packer->threadID), "%s() called from packer thread", caller); } /**********************************************************************/ __attribute__((warn_unused_result)) static inline InputBin *inputBinFromRingNode(RingNode *node) { STATIC_ASSERT(offsetof(InputBin, ring) == 0); return (InputBin *) node; } /**********************************************************************/ __attribute__((warn_unused_result)) static inline OutputBin *outputBinFromRingNode(RingNode *node) { STATIC_ASSERT(offsetof(OutputBin, ring) == 0); return (OutputBin *) node; } /**********************************************************************/ InputBin *nextBin(const Packer *packer, InputBin *bin) { if (bin->ring.next == &packer->inputBins) { return NULL; } else { return inputBinFromRingNode(bin->ring.next); } } /**********************************************************************/ InputBin *getFullestBin(const Packer *packer) { if (isRingEmpty(&packer->inputBins)) { return NULL; } else { return inputBinFromRingNode(packer->inputBins.next); } } /** * Insert an input bin to the list, which is in ascending order of free space. * Since all bins are already in the list, this actually moves the bin to the * correct position in the list. * * @param packer The packer * @param bin The input bin to move to its sorted position **/ static void insertInSortedList(Packer *packer, InputBin *bin) { for (InputBin *activeBin = getFullestBin(packer); activeBin != NULL; activeBin = nextBin(packer, activeBin)) { if (activeBin->freeSpace > bin->freeSpace) { pushRingNode(&activeBin->ring, &bin->ring); return; } } pushRingNode(&packer->inputBins, &bin->ring); } /** * Allocate an input bin and put it into the packer's list. 
* * @param packer The packer **/ __attribute__((warn_unused_result)) static int makeInputBin(Packer *packer) { InputBin *bin; int result = ALLOCATE_EXTENDED(InputBin, MAX_COMPRESSION_SLOTS, VIO *, __func__, &bin); if (result != VDO_SUCCESS) { return result; } bin->freeSpace = packer->binDataSize; initializeRing(&bin->ring); pushRingNode(&packer->inputBins, &bin->ring); return VDO_SUCCESS; } /** * Push an output bin onto the stack of idle bins. * * @param packer The packer * @param bin The output bin **/ static void pushOutputBin(Packer *packer, OutputBin *bin) { ASSERT_LOG_ONLY(!hasWaiters(&bin->outgoing), "idle output bin has no waiters"); packer->idleOutputBins[packer->idleOutputBinCount++] = bin; } /** * Pop an output bin off the end of the stack of idle bins. * * @param packer The packer * * @return an idle output bin, or NULL if there are no idle bins **/ __attribute__((warn_unused_result)) static OutputBin *popOutputBin(Packer *packer) { if (packer->idleOutputBinCount == 0) { return NULL; } size_t index = --packer->idleOutputBinCount; OutputBin *bin = packer->idleOutputBins[index]; packer->idleOutputBins[index] = NULL; return bin; } /** * Allocate a new output bin and push it onto the packer's stack of idle bins. * * @param packer The packer * @param layer The physical layer that will receive the compressed block * writes from the output bin * * @return VDO_SUCCESS or an error code **/ __attribute__((warn_unused_result)) static int makeOutputBin(Packer *packer, PhysicalLayer *layer) { OutputBin *output; int result = ALLOCATE(1, OutputBin, __func__, &output); if (result != VDO_SUCCESS) { return result; } // Add the bin to the stack even before it's fully initialized so it will // be freed even if we fail to initialize it below. 
initializeRing(&output->ring); pushRingNode(&packer->outputBins, &output->ring); pushOutputBin(packer, output); result = ALLOCATE_EXTENDED(CompressedBlock, packer->binDataSize, char, "compressed block", &output->block); if (result != VDO_SUCCESS) { return result; } return layer->createCompressedWriteVIO(layer, output, (char *) output->block, &output->writer); } /** * Free an idle output bin and null out the reference to it. * * @param binPtr The reference to the output bin to free **/ static void freeOutputBin(OutputBin **binPtr) { OutputBin *bin = *binPtr; if (bin == NULL) { return; } unspliceRingNode(&bin->ring); VIO *vio = allocatingVIOAsVIO(bin->writer); freeVIO(&vio); FREE(bin->block); FREE(bin); *binPtr = NULL; } /**********************************************************************/ int makePacker(PhysicalLayer *layer, BlockCount inputBinCount, BlockCount outputBinCount, const ThreadConfig *threadConfig, Packer **packerPtr) { Packer *packer; int result = ALLOCATE_EXTENDED(Packer, outputBinCount, OutputBin *, __func__, &packer); if (result != VDO_SUCCESS) { return result; } packer->threadID = getPackerZoneThread(threadConfig); packer->binDataSize = VDO_BLOCK_SIZE - sizeof(CompressedBlockHeader); packer->size = inputBinCount; packer->maxSlots = MAX_COMPRESSION_SLOTS; packer->outputBinCount = outputBinCount; initializeRing(&packer->inputBins); initializeRing(&packer->outputBins); result = makeAllocationSelector(threadConfig->physicalZoneCount, packer->threadID, &packer->selector); if (result != VDO_SUCCESS) { freePacker(&packer); return result; } for (BlockCount i = 0; i < inputBinCount; i++) { int result = makeInputBin(packer); if (result != VDO_SUCCESS) { freePacker(&packer); return result; } } /* * The canceled bin can hold up to half the number of user VIOs. Every * canceled VIO in the bin must have a canceler for which it is waiting, and * any canceler will only have canceled one lock holder at a time. 
*/ result = ALLOCATE_EXTENDED(InputBin, MAXIMUM_USER_VIOS / 2, VIO *, __func__, &packer->canceledBin); if (result != VDO_SUCCESS) { freePacker(&packer); return result; } for (BlockCount i = 0; i < outputBinCount; i++) { int result = makeOutputBin(packer, layer); if (result != VDO_SUCCESS) { freePacker(&packer); return result; } } *packerPtr = packer; return VDO_SUCCESS; } /**********************************************************************/ void freePacker(Packer **packerPtr) { Packer *packer = *packerPtr; if (packer == NULL) { return; } InputBin *input; while ((input = getFullestBin(packer)) != NULL) { unspliceRingNode(&input->ring); FREE(input); } FREE(packer->canceledBin); OutputBin *output; while ((output = popOutputBin(packer)) != NULL) { freeOutputBin(&output); } freeAllocationSelector(&packer->selector); FREE(packer); *packerPtr = NULL; } /** * Get the Packer from a DataVIO. * * @param dataVIO The DataVIO * * @return The Packer from the VDO to which the DataVIO belongs **/ static inline Packer *getPackerFromDataVIO(DataVIO *dataVIO) { return getVDOFromDataVIO(dataVIO)->packer; } /**********************************************************************/ bool isSufficientlyCompressible(DataVIO *dataVIO) { Packer *packer = getPackerFromDataVIO(dataVIO); return (dataVIO->compression.size < packer->binDataSize); } /**********************************************************************/ ThreadID getPackerThreadID(Packer *packer) { return packer->threadID; } /**********************************************************************/ PackerStatistics getPackerStatistics(const Packer *packer) { /* * This is called from getVDOStatistics(), which is called from outside the * packer thread. These are just statistics with no semantics that could * rely on memory order, so unfenced reads are sufficient. 
*/ return (PackerStatistics) { .compressedFragmentsWritten = relaxedLoad64(&packer->fragmentsWritten), .compressedBlocksWritten = relaxedLoad64(&packer->blocksWritten), .compressedFragmentsInPacker = relaxedLoad64(&packer->fragmentsPending), }; } /** * Abort packing a DataVIO. * * @param dataVIO The DataVIO to abort **/ static void abortPacking(DataVIO *dataVIO) { setCompressionDone(dataVIO); relaxedAdd64(&getPackerFromDataVIO(dataVIO)->fragmentsPending, -1); dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL)); continueDataVIO(dataVIO, VDO_SUCCESS); } /** * This continues the VIO completion without packing the VIO. * * @param waiter The wait queue entry of the VIO to continue * @param unused An argument required so this function may be called * from notifyAllWaiters **/ static void continueVIOWithoutPacking(Waiter *waiter, void *unused __attribute__((unused))) { abortPacking(waiterAsDataVIO(waiter)); } /** * Check whether the packer has drained. * * @param packer The packer **/ static void checkForDrainComplete(Packer *packer) { if (isDraining(&packer->state) && (packer->canceledBin->slotsUsed == 0) && (packer->idleOutputBinCount == packer->outputBinCount)) { finishDraining(&packer->state); } } /**********************************************************************/ static void writePendingBatches(Packer *packer); /** * Ensure that a completion is running on the packer thread. * * @param completion The compressed write VIO * * @return true if the completion is on the packer thread **/ __attribute__((warn_unused_result)) static bool switchToPackerThread(VDOCompletion *completion) { VIO *vio = asVIO(completion); ThreadID threadID = vio->vdo->packer->threadID; if (completion->callbackThreadID == threadID) { return true; } completion->callbackThreadID = threadID; invokeCallback(completion); return false; } /** * Finish processing an output bin whose write has completed. If there was * an error, any DataVIOs waiting on the bin write will be notified. 
* * @param packer The packer which owns the bin * @param bin The bin which has finished **/ static void finishOutputBin(Packer *packer, OutputBin *bin) { if (hasWaiters(&bin->outgoing)) { notifyAllWaiters(&bin->outgoing, continueVIOWithoutPacking, NULL); } else { // No waiters implies no error, so the compressed block was written. relaxedAdd64(&packer->fragmentsPending, -bin->slotsUsed); relaxedAdd64(&packer->fragmentsWritten, bin->slotsUsed); relaxedAdd64(&packer->blocksWritten, 1); } bin->slotsUsed = 0; pushOutputBin(packer, bin); } /** * This finishes the bin write process after the bin is written to disk. This * is the VIO callback function registered by writeOutputBin(). * * @param completion The compressed write VIO **/ static void completeOutputBin(VDOCompletion *completion) { if (!switchToPackerThread(completion)) { return; } VIO *vio = asVIO(completion); if (completion->result != VDO_SUCCESS) { updateVIOErrorStats(vio, "Completing compressed write VIO for physical block %" PRIu64 " with error", vio->physical); } Packer *packer = vio->vdo->packer; finishOutputBin(packer, completion->parent); writePendingBatches(packer); checkForDrainComplete(packer); } /** * Implements WaiterCallback. Continues the DataVIO waiter. **/ static void continueWaiter(Waiter *waiter, void *context __attribute__((unused))) { DataVIO *dataVIO = waiterAsDataVIO(waiter); continueDataVIO(dataVIO, VDO_SUCCESS); } /** * Implements WaiterCallback. Updates the DataVIO waiter to refer to its slot * in the compressed block, gives the DataVIO a share of the PBN lock on that * block, and reserves a reference count increment on the lock. 
**/ static void shareCompressedBlock(Waiter *waiter, void *context) { DataVIO *dataVIO = waiterAsDataVIO(waiter); OutputBin *bin = context; dataVIO->newMapped = (ZonedPBN) { .pbn = bin->writer->allocation, .zone = bin->writer->zone, .state = getStateForSlot(dataVIO->compression.slot), }; dataVIOAsVIO(dataVIO)->physical = dataVIO->newMapped.pbn; shareCompressedWriteLock(dataVIO, bin->writer->allocationLock); // Wait again for all the waiters to get a share. int result = enqueueWaiter(&bin->outgoing, waiter); // Cannot fail since this waiter was just dequeued. ASSERT_LOG_ONLY(result == VDO_SUCCESS, "impossible enqueueWaiter error"); } /** * Finish a compressed block write. This callback is registered in * continueAfterAllocation(). * * @param completion The compressed write completion **/ static void finishCompressedWrite(VDOCompletion *completion) { OutputBin *bin = completion->parent; assertInPhysicalZone(bin->writer); if (completion->result != VDO_SUCCESS) { releaseAllocationLock(bin->writer); // Invokes completeOutputBin() on the packer thread, which will deal with // the waiters. vioDoneCallback(completion); return; } // First give every DataVIO/HashLock a share of the PBN lock to ensure it // can't be released until they've all done their incRefs. notifyAllWaiters(&bin->outgoing, shareCompressedBlock, bin); // The waiters now hold the (downgraded) PBN lock. bin->writer->allocationLock = NULL; // Invokes the callbacks registered before entering the packer. notifyAllWaiters(&bin->outgoing, continueWaiter, NULL); // Invokes completeOutputBin() on the packer thread. vioDoneCallback(completion); } /** * Continue the write path for a compressed write AllocatingVIO now that block * allocation is complete (the AllocatingVIO may or may not have actually * received an allocation). 
* * @param allocatingVIO The AllocatingVIO which has finished the allocation * process **/ static void continueAfterAllocation(AllocatingVIO *allocatingVIO) { VIO *vio = allocatingVIOAsVIO(allocatingVIO); VDOCompletion *completion = vioAsCompletion(vio); if (allocatingVIO->allocation == ZERO_BLOCK) { completion->requeue = true; setCompletionResult(completion, VDO_NO_SPACE); vioDoneCallback(completion); return; } setPhysicalZoneCallback(allocatingVIO, finishCompressedWrite, THIS_LOCATION("$F(meta);cb=finishCompressedWrite")); completion->layer->writeCompressedBlock(allocatingVIO); } /** * Launch an output bin. * * @param packer The packer which owns the bin * @param bin The output bin to launch **/ static void launchCompressedWrite(Packer *packer, OutputBin *bin) { if (isReadOnly(getVDOFromAllocatingVIO(bin->writer)->readOnlyNotifier)) { finishOutputBin(packer, bin); return; } VIO *vio = allocatingVIOAsVIO(bin->writer); resetCompletion(vioAsCompletion(vio)); vio->callback = completeOutputBin; vio->priority = VIO_PRIORITY_COMPRESSED_DATA; allocateDataBlock(bin->writer, packer->selector, VIO_COMPRESSED_WRITE_LOCK, continueAfterAllocation); } /** * Consume from the pending queue the next batch of VIOs that can be packed * together in a single compressed block. VIOs that have been mooted since * being placed in the pending queue will not be returned. * * @param packer The packer * @param batch The counted array to fill with the next batch of VIOs **/ static void getNextBatch(Packer *packer, OutputBatch *batch) { BlockSize spaceRemaining = packer->binDataSize; batch->slotsUsed = 0; DataVIO *dataVIO; while ((dataVIO = waiterAsDataVIO(getFirstWaiter(&packer->batchedDataVIOs))) != NULL) { // If there's not enough space for the next DataVIO, the batch is done. if ((dataVIO->compression.size > spaceRemaining) || (batch->slotsUsed == packer->maxSlots)) { break; } // Remove the next DataVIO from the queue and put it in the output batch. 
dequeueNextWaiter(&packer->batchedDataVIOs); batch->slots[batch->slotsUsed++] = dataVIO; spaceRemaining -= dataVIO->compression.size; } } /** * Pack the next batch of compressed VIOs from the batched queue into an * output bin and write the output bin. * * @param packer The packer * @param output The output bin to fill * * @return true if a write was issued for the output bin **/ __attribute__((warn_unused_result)) static bool writeNextBatch(Packer *packer, OutputBin *output) { OutputBatch batch; getNextBatch(packer, &batch); if (batch.slotsUsed == 0) { // The pending queue must now be empty (there may have been mooted VIOs). return false; } // If the batch contains only a single VIO, then we save nothing by saving // the compressed form. Continue processing the single VIO in the batch. if (batch.slotsUsed == 1) { abortPacking(batch.slots[0]); return false; } resetCompressedBlockHeader(&output->block->header); size_t spaceUsed = 0; for (SlotNumber slot = 0; slot < batch.slotsUsed; slot++) { DataVIO *dataVIO = batch.slots[slot]; dataVIO->compression.slot = slot; putCompressedBlockFragment(output->block, slot, spaceUsed, dataVIO->compression.data, dataVIO->compression.size); spaceUsed += dataVIO->compression.size; int result = enqueueDataVIO(&output->outgoing, dataVIO, THIS_LOCATION(NULL)); if (result != VDO_SUCCESS) { abortPacking(dataVIO); continue; } output->slotsUsed += 1; } launchCompressedWrite(packer, output); return true; } /** * Put a DataVIO in a specific InputBin in which it will definitely fit. * * @param bin The bin in which to put the DataVIO * @param dataVIO The DataVIO to add **/ static void addToInputBin(InputBin *bin, DataVIO *dataVIO) { dataVIO->compression.bin = bin; dataVIO->compression.slot = bin->slotsUsed; bin->incoming[bin->slotsUsed++] = dataVIO; } /** * Start a new batch of VIOs in an InputBin, moving the existing batch, if * any, to the queue of pending batched VIOs in the packer. 
* * @param packer The packer * @param bin The bin to prepare **/ static void startNewBatch(Packer *packer, InputBin *bin) { // Move all the DataVIOs in the current batch to the batched queue so they // will get packed into the next free output bin. for (SlotNumber slot = 0; slot < bin->slotsUsed; slot++) { DataVIO *dataVIO = bin->incoming[slot]; dataVIO->compression.bin = NULL; if (!mayWriteCompressedDataVIO(dataVIO)) { /* * Compression of this DataVIO was canceled while it was waiting; put it * in the canceled bin so it can be rendezvous with the canceling * DataVIO. */ addToInputBin(packer->canceledBin, dataVIO); continue; } int result = enqueueDataVIO(&packer->batchedDataVIOs, dataVIO, THIS_LOCATION(NULL)); if (result != VDO_SUCCESS) { // Impossible but we're required to check the result from enqueue. abortPacking(dataVIO); } } // The bin is now empty. bin->slotsUsed = 0; bin->freeSpace = packer->binDataSize; } /** * Add a DataVIO to a bin's incoming queue, handle logical space change, and * call physical space processor. * * @param packer The packer * @param bin The bin to which to add the the DataVIO * @param dataVIO The DataVIO to add to the bin's queue **/ static void addDataVIOToInputBin(Packer *packer, InputBin *bin, DataVIO *dataVIO) { // If the selected bin doesn't have room, start a new batch to make room. if (bin->freeSpace < dataVIO->compression.size) { startNewBatch(packer, bin); } addToInputBin(bin, dataVIO); bin->freeSpace -= dataVIO->compression.size; // If we happen to exactly fill the bin, start a new input batch. if ((bin->slotsUsed == packer->maxSlots) || (bin->freeSpace == 0)) { startNewBatch(packer, bin); } // Now that we've finished changing the free space, restore the sort order. insertInSortedList(packer, bin); } /** * Move DataVIOs in pending batches from the batchedDataVIOs to all free output * bins, issuing writes for the output bins as they are packed. 
This will loop * until either the pending queue is drained or all output bins are busy * writing a compressed block. * * @param packer The packer **/ static void writePendingBatches(Packer *packer) { if (packer->writingBatches) { /* * We've attempted to re-enter this function recursively due to completion * handling, which can lead to kernel stack overflow as in VDO-1340. It's * perfectly safe to break the recursion and do nothing since we know any * pending batches will eventually be handled by the earlier call. */ return; } // Record that we are in this function for the above check. IMPORTANT: never // return from this function without clearing this flag. packer->writingBatches = true; OutputBin *output; while (hasWaiters(&packer->batchedDataVIOs) && ((output = popOutputBin(packer)) != NULL)) { if (!writeNextBatch(packer, output)) { // We didn't use the output bin to write, so push it back on the stack. pushOutputBin(packer, output); } } packer->writingBatches = false; } /** * Select the input bin that should be used to pack the compressed data in a * DataVIO with other DataVIOs. * * @param packer The packer * @param dataVIO The DataVIO **/ __attribute__((warn_unused_result)) static InputBin *selectInputBin(Packer *packer, DataVIO *dataVIO) { // First best fit: select the bin with the least free space that has enough // room for the compressed data in the DataVIO. InputBin *fullestBin = getFullestBin(packer); for (InputBin *bin = fullestBin; bin != NULL; bin = nextBin(packer, bin)) { if (bin->freeSpace >= dataVIO->compression.size) { return bin; } } /* * None of the bins have enough space for the DataVIO. We're not allowed to * create new bins, so we have to overflow one of the existing bins. It's * pretty intuitive to select the fullest bin, since that "wastes" the least * amount of free space in the compressed block. 
But if the space currently * used in the fullest bin is smaller than the compressed size of the * incoming block, it seems wrong to force that bin to write when giving up * on compressing the incoming DataVIO would likewise "waste" the the least * amount of free space. */ if (dataVIO->compression.size >= (packer->binDataSize - fullestBin->freeSpace)) { return NULL; } // The fullest bin doesn't have room, but writing it out and starting a new // batch with the incoming DataVIO will increase the packer's free space. return fullestBin; } /**********************************************************************/ void attemptPacking(DataVIO *dataVIO) { Packer *packer = getPackerFromDataVIO(dataVIO); assertOnPackerThread(packer, __func__); VIOCompressionState state = getCompressionState(dataVIO); int result = ASSERT((state.status == VIO_COMPRESSING), "attempt to pack DataVIO not ready for packing, state: " "%u", state.status); if (result != VDO_SUCCESS) { return; } /* * Increment whether or not this DataVIO will be packed or not since * abortPacking() always decrements the counter. */ relaxedAdd64(&packer->fragmentsPending, 1); // If packing of this DataVIO is disallowed for administrative reasons, give // up before making any state changes. if (!isNormal(&packer->state) || (dataVIO->flushGeneration < packer->flushGeneration)) { abortPacking(dataVIO); return; } /* * The check of mayBlockInPacker() here will set the DataVIO's compression * state to VIO_PACKING if the DataVIO is allowed to be compressed (if it has * already been canceled, we'll fall out here). Once the DataVIO is in the * VIO_PACKING state, it must be guaranteed to be put in an input bin before * any more requests can be processed by the packer thread. Otherwise, a * canceling DataVIO could attempt to remove the canceled DataVIO from the * packer and fail to rendezvous with it (VDO-2809). 
We must also make sure * that we will actually bin the DataVIO and not give up on it as being * larger than the space used in the fullest bin. Hence we must call * selectInputBin() before calling mayBlockInPacker() (VDO-2826). */ InputBin *bin = selectInputBin(packer, dataVIO); if ((bin == NULL) || !mayBlockInPacker(dataVIO)) { abortPacking(dataVIO); return; } addDataVIOToInputBin(packer, bin, dataVIO); writePendingBatches(packer); } /** * Force a pending write for all non-empty bins on behalf of a flush or * suspend. * * @param packer The packer being flushed **/ static void writeAllNonEmptyBins(Packer *packer) { for (InputBin *bin = getFullestBin(packer); bin != NULL; bin = nextBin(packer, bin)) { startNewBatch(packer, bin); // We don't need to re-sort the bin here since this loop will make every // bin have the same amount of free space, so every ordering is sorted. } writePendingBatches(packer); } /**********************************************************************/ void flushPacker(Packer *packer) { assertOnPackerThread(packer, __func__); if (isNormal(&packer->state)) { writeAllNonEmptyBins(packer); } } /* * This method is only exposed for unit tests and should not normally be called * directly; use removeLockHolderFromPacker() instead. 
*/
void removeFromPacker(DataVIO *dataVIO)
{
  InputBin *bin = dataVIO->compression.bin;
  ASSERT_LOG_ONLY((bin != NULL), "DataVIO in packer has an input bin");

  // Fill the slot being vacated with the DataVIO from the last used slot, if
  // that is a different slot, so the used slots stay contiguous.
  SlotNumber vacated = dataVIO->compression.slot;
  bin->slotsUsed -= 1;
  if (vacated < bin->slotsUsed) {
    DataVIO *moved = bin->incoming[bin->slotsUsed];
    bin->incoming[vacated]  = moved;
    moved->compression.slot = vacated;
  }

  dataVIO->compression.bin  = NULL;
  dataVIO->compression.slot = 0;

  // Free space is not tracked for the canceled bin, and it is not moved
  // within the sorted list.
  Packer *packer = getPackerFromDataVIO(dataVIO);
  if (bin != packer->canceledBin) {
    bin->freeSpace += dataVIO->compression.size;
    insertInSortedList(packer, bin);
  }

  abortPacking(dataVIO);
  checkForDrainComplete(packer);
}

/**
 * Remove the lock holder recorded in a DataVIO from the packer. The DataVIO's
 * reference to the lock holder is cleared before the lock holder is removed.
 *
 * @param completion  The completion of the DataVIO whose lock holder is to be
 *                    removed; it must be running in the packer zone
 **/
void removeLockHolderFromPacker(VDOCompletion *completion)
{
  DataVIO *dataVIO = asDataVIO(completion);
  assertInPackerZone(dataVIO);

  DataVIO *holder = dataVIO->compression.lockHolder;
  dataVIO->compression.lockHolder = NULL;
  removeFromPacker(holder);
}

/**
 * Advance the packer's flush generation by one and flush the packer. Must be
 * called on the packer thread.
 *
 * @param packer  The packer
 **/
void incrementPackerFlushGeneration(Packer *packer)
{
  assertOnPackerThread(packer, __func__);
  packer->flushGeneration += 1;
  flushPacker(packer);
}

/**
 * Initiate a drain.
 *
 * Implements AdminInitiator.
**/ static void initiateDrain(AdminState *state) { Packer *packer = container_of(state, Packer, state); writeAllNonEmptyBins(packer); checkForDrainComplete(packer); } /**********************************************************************/ void drainPacker(Packer *packer, VDOCompletion *completion) { assertOnPackerThread(packer, __func__); startDraining(&packer->state, ADMIN_STATE_SUSPENDING, completion, initiateDrain); } /**********************************************************************/ void resumePacker(Packer *packer, VDOCompletion *parent) { assertOnPackerThread(packer, __func__); finishCompletion(parent, resumeIfQuiescent(&packer->state)); } /**********************************************************************/ void resetSlotCount(Packer *packer, CompressedFragmentCount slots) { if (slots > MAX_COMPRESSION_SLOTS) { return; } packer->maxSlots = slots; } /**********************************************************************/ static void dumpInputBin(const InputBin *bin, bool canceled) { if (bin->slotsUsed == 0) { // Don't dump empty input bins. return; } logInfo(" %sBin slotsUsed=%u freeSpace=%zu", (canceled ? "Canceled" : "Input"), bin->slotsUsed, bin->freeSpace); // XXX dump VIOs in bin->incoming? The VIOs should have been dumped from the // VIO pool. Maybe just dump their addresses so it's clear they're here? } /**********************************************************************/ static void dumpOutputBin(const OutputBin *bin) { size_t count = countWaiters(&bin->outgoing); if (bin->slotsUsed == 0) { // Don't dump empty output bins. return; } logInfo(" OutputBin contains %zu outgoing waiters", count); // XXX dump VIOs in bin->outgoing? The VIOs should have been dumped from the // VIO pool. Maybe just dump their addresses so it's clear they're here? // XXX dump writer VIO? 
} /**********************************************************************/ void dumpPacker(const Packer *packer) { logInfo("Packer"); logInfo(" flushGeneration=%llu state %s writingBatches=%s", packer->flushGeneration, getAdminStateName(&packer->state), boolToString(packer->writingBatches)); logInfo(" inputBinCount=%llu", packer->size); for (InputBin *bin = getFullestBin(packer); bin != NULL; bin = nextBin(packer, bin)) { dumpInputBin(bin, false); } dumpInputBin(packer->canceledBin, true); logInfo(" outputBinCount=%zu idleOutputBinCount=%zu", packer->outputBinCount, packer->idleOutputBinCount); const RingNode *head = &packer->outputBins; for (RingNode *node = head->next; node != head; node = node->next) { dumpOutputBin(outputBinFromRingNode(node)); } }