/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/dataKVIO.c#18 $
*/
#include "dataKVIO.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "murmur/MurmurHash3.h"
#include "dataVIO.h"
#include "compressedBlock.h"
#include "hashLock.h"
#include "lz4.h"
#include "bio.h"
#include "dedupeIndex.h"
#include "kvdoFlush.h"
#include "kvio.h"
#include "ioSubmitter.h"
#include "vdoCommon.h"
#include "verify.h"
static void dumpPooledDataKVIO(void *poolData, void *data);
enum {
WRITE_PROTECT_FREE_POOL = 0,
WP_DATA_KVIO_SIZE = (sizeof(DataKVIO) + PAGE_SIZE - 1
- ((sizeof(DataKVIO) + PAGE_SIZE - 1)
% PAGE_SIZE))
};
/**
* Alter the write-access permission to a page of memory, so that
* objects in the free pool may no longer be modified.
*
* To do: Deny read access as well.
*
* @param address The starting address to protect, which must be on a
* page boundary
* @param byteCount The number of bytes to protect, which must be a multiple
* of the page size
* @param mode The write protection mode (true means read-only)
**/
static __always_inline void
setWriteProtect(void *address,
size_t byteCount,
bool mode __attribute__((unused)))
{
BUG_ON((((long) address) % PAGE_SIZE) != 0);
BUG_ON((byteCount % PAGE_SIZE) != 0);
BUG(); // only works in internal code, sorry
}
/**********************************************************************/
static void maybeLogDataKVIOTrace(DataKVIO *dataKVIO)
{
if (dataKVIO->kvio.layer->traceLogging) {
logKvioTrace(&dataKVIO->kvio);
}
}
/**
* First tracing hook for VIO completion.
*
* If the SystemTap script vdotrace.stp is in use, it does stage 1 of
* its processing here. We must not call addTraceRecord between the
* two tap functions.
*
* @param dataKVIO The VIO we're finishing up
**/
static void kvioCompletionTap1(DataKVIO *dataKVIO)
{
/*
* Ensure that dataKVIO doesn't get optimized out, even under inline
* expansion. Also, make sure the compiler has to emit debug info
* for baseTraceLocation, which some of our SystemTap scripts will
* use here.
*
* First, make it look as though all memory could be clobbered; then
* require that a value be read into a register. That'll force at
* least one instruction to exist (so SystemTap can hook in) where
* dataKVIO is live. We use a field that the caller would've
* accessed recently anyway, so it may be cached.
*/
barrier();
__asm__ __volatile__(""
:
: "g" (dataKVIO), "g" (baseTraceLocation),
"r" (dataKVIO->kvio.layer));
}
/**
* Second tracing hook for VIO completion.
*
* The SystemTap script vdotrace.stp splits its VIO-completion work
* into two stages, to reduce lock contention for script variables.
* Hence, it needs two hooks in the code.
*
* @param dataKVIO The VIO we're finishing up
**/
static void kvioCompletionTap2(DataKVIO *dataKVIO)
{
// Hack to ensure variable doesn't get optimized out.
barrier();
__asm__ __volatile__("" : : "g" (dataKVIO), "r" (dataKVIO->kvio.layer));
}
/**********************************************************************/
static void kvdoAcknowledgeDataKVIO(DataKVIO *dataKVIO)
{
KernelLayer *layer = dataKVIO->kvio.layer;
ExternalIORequest *externalIORequest = &dataKVIO->externalIORequest;
BIO *bio = externalIORequest->bio;
if (bio == NULL) {
return;
}
externalIORequest->bio = NULL;
int error
= mapToSystemError(dataVIOAsCompletion(&dataKVIO->dataVIO)->result);
bio->bi_end_io = externalIORequest->endIO;
bio->bi_private = externalIORequest->private;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)
bio->bi_opf = externalIORequest->rw;
#else
bio->bi_rw = externalIORequest->rw;
#endif
countBios(&layer->biosAcknowledged, bio);
if (dataKVIO->isPartial) {
countBios(&layer->biosAcknowledgedPartial, bio);
}
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
completeBio(bio, error);
}
/**********************************************************************/
static noinline void cleanDataKVIO(DataKVIO *dataKVIO, FreeBufferPointers *fbp)
{
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
kvdoAcknowledgeDataKVIO(dataKVIO);
KVIO *kvio = dataKVIOAsKVIO(dataKVIO);
kvio->bio = NULL;
if (unlikely(kvio->vio->trace != NULL)) {
maybeLogDataKVIOTrace(dataKVIO);
kvioCompletionTap1(dataKVIO);
kvioCompletionTap2(dataKVIO);
freeTraceToPool(kvio->layer, kvio->vio->trace);
}
addFreeBufferPointer(fbp, dataKVIO);
}
/**********************************************************************/
void returnDataKVIOBatchToPool(BatchProcessor *batch, void *closure)
{
KernelLayer *layer = closure;
uint32_t count = 0;
ASSERT_LOG_ONLY(batch != NULL, "batch not null");
ASSERT_LOG_ONLY(layer != NULL, "layer not null");
FreeBufferPointers fbp;
initFreeBufferPointers(&fbp, layer->dataKVIOPool);
KvdoWorkItem *item;
while ((item = nextBatchItem(batch)) != NULL) {
cleanDataKVIO(workItemAsDataKVIO(item), &fbp);
condReschedBatchProcessor(batch);
count++;
}
if (fbp.index > 0) {
freeBufferPointers(&fbp);
}
completeManyRequests(layer, count);
}
/**********************************************************************/
static void kvdoAcknowledgeThenCompleteDataKVIO(KvdoWorkItem *item)
{
DataKVIO *dataKVIO = workItemAsDataKVIO(item);
kvdoAcknowledgeDataKVIO(dataKVIO);
addToBatchProcessor(dataKVIO->kvio.layer->dataKVIOReleaser, item);
}
/**********************************************************************/
void kvdoCompleteDataKVIO(VDOCompletion *completion)
{
DataKVIO *dataKVIO = dataVIOAsDataKVIO(asDataVIO(completion));
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
KernelLayer *layer = getLayerFromDataKVIO(dataKVIO);
if (useBioAckQueue(layer) && USE_BIO_ACK_QUEUE_FOR_READ
&& (dataKVIO->externalIORequest.bio != NULL)) {
launchDataKVIOOnBIOAckQueue(dataKVIO, kvdoAcknowledgeThenCompleteDataKVIO,
NULL, BIO_ACK_Q_ACTION_ACK);
} else {
addToBatchProcessor(layer->dataKVIOReleaser,
workItemFromDataKVIO(dataKVIO));
}
}
/**
* Copy the uncompressed data from a compressed block read into the user
* bio which requested the read.
*
* @param workItem The DataKVIO which requested the read
**/
static void copyReadBlockData(KvdoWorkItem *workItem)
{
DataKVIO *dataKVIO = workItemAsDataKVIO(workItem);
// For a read-modify-write, copy the data into the dataBlock buffer so it
// will be set up for the write phase.
if (isReadModifyWriteVIO(dataKVIO->kvio.vio)) {
bioCopyDataOut(getBIOFromDataKVIO(dataKVIO), dataKVIO->readBlock.data);
kvdoEnqueueDataVIOCallback(dataKVIO);
return;
}
// For a partial read, the callback will copy the requested data from the
// read block.
if (dataKVIO->isPartial) {
kvdoEnqueueDataVIOCallback(dataKVIO);
return;
}
// For a full block read, copy the data to the bio and acknowledge.
bioCopyDataOut(getBIOFromDataKVIO(dataKVIO), dataKVIO->readBlock.data);
kvdoAcknowledgeDataVIO(&dataKVIO->dataVIO);
}
/**
* Finish reading data for a compressed block.
*
* @param dataKVIO The DataKVIO which requested the read
**/
static void readDataKVIOReadBlockCallback(DataKVIO *dataKVIO)
{
if (dataKVIO->readBlock.status != VDO_SUCCESS) {
setCompletionResult(dataVIOAsCompletion(&dataKVIO->dataVIO),
dataKVIO->readBlock.status);
kvdoEnqueueDataVIOCallback(dataKVIO);
return;
}
launchDataKVIOOnCPUQueue(dataKVIO, copyReadBlockData, NULL,
CPU_Q_ACTION_COMPRESS_BLOCK);
}
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
/**
* Complete and reset a bio that was supplied by the user and then used for a
* read (so that we can complete it with the user's callback).
*
* @param bio The bio to complete
**/
static void resetUserBio(BIO *bio)
#else
/**
* Complete and reset a bio that was supplied by the user and then used for a
* read (so that we can complete it with the user's callback).
*
* @param bio The bio to complete
* @param error Possible error from underlying block device
**/
static void resetUserBio(BIO *bio, int error)
#endif
{
#if ((LINUX_VERSION_CODE >= KERNEL_VERSION(3,14,0)) \
&& (LINUX_VERSION_CODE < KERNEL_VERSION(4,2,0)))
// This is a user bio, and the device just called bio_endio() on it, so
// we need to re-increment bi_remaining so we too can call bio_endio().
atomic_inc(&bio->bi_remaining);
#endif
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
completeAsyncBio(bio);
#else
completeAsyncBio(bio, error);
#endif
}
/**
* Uncompress the data that's just been read and then call back the requesting
* DataKVIO.
*
* @param workItem The DataKVIO requesting the data
**/
static void uncompressReadBlock(KvdoWorkItem *workItem)
{
DataKVIO *dataKVIO = workItemAsDataKVIO(workItem);
ReadBlock *readBlock = &dataKVIO->readBlock;
BlockSize blockSize = VDO_BLOCK_SIZE;
// The DataKVIO's scratch block will be used to contain the
// uncompressed data.
uint16_t fragmentOffset, fragmentSize;
char *compressedData = readBlock->data;
int result = getCompressedBlockFragment(readBlock->mappingState,
compressedData, blockSize,
&fragmentOffset,
&fragmentSize);
if (result != VDO_SUCCESS) {
logDebug("%s: frag err %d", __func__, result);
readBlock->status = result;
readBlock->callback(dataKVIO);
return;
}
char *fragment = compressedData + fragmentOffset;
int size = LZ4_uncompress_unknownOutputSize(fragment, dataKVIO->scratchBlock,
fragmentSize, blockSize);
if (size == blockSize) {
readBlock->data = dataKVIO->scratchBlock;
} else {
logDebug("%s: lz4 error", __func__);
readBlock->status = VDO_INVALID_FRAGMENT;
}
readBlock->callback(dataKVIO);
}
/**
* Now that we have gotten the data from storage, uncompress the data if
* necessary and then call back the requesting DataKVIO.
*
* @param dataKVIO The DataKVIO requesting the data
* @param result The result of the read operation
**/
static void completeRead(DataKVIO *dataKVIO, int result)
{
ReadBlock *readBlock = &dataKVIO->readBlock;
readBlock->status = result;
if ((result == VDO_SUCCESS) && isCompressed(readBlock->mappingState)) {
launchDataKVIOOnCPUQueue(dataKVIO, uncompressReadBlock, NULL,
CPU_Q_ACTION_COMPRESS_BLOCK);
return;
}
readBlock->callback(dataKVIO);
}
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
/**
* Callback for a bio doing a read.
*
* @param bio The bio
*/
static void readBioCallback(BIO *bio)
#else
/**
* Callback for a bio doing a read.
*
* @param bio The bio
* @param result The result of the read operation
*/
static void readBioCallback(BIO *bio, int result)
#endif
{
KVIO *kvio = (KVIO *) bio->bi_private;
DataKVIO *dataKVIO = kvioAsDataKVIO(kvio);
dataKVIO->readBlock.data = dataKVIO->readBlock.buffer;
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
countCompletedBios(bio);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
completeRead(dataKVIO, getBioResult(bio));
#else
completeRead(dataKVIO, result);
#endif
}
/**********************************************************************/
void kvdoReadBlock(DataVIO *dataVIO,
PhysicalBlockNumber location,
BlockMappingState mappingState,
BioQAction action,
DataKVIOCallback callback)
{
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
DataKVIO *dataKVIO = dataVIOAsDataKVIO(dataVIO);
ReadBlock *readBlock = &dataKVIO->readBlock;
KernelLayer *layer = getLayerFromDataKVIO(dataKVIO);
readBlock->callback = callback;
readBlock->status = VDO_SUCCESS;
readBlock->mappingState = mappingState;
BUG_ON(getBIOFromDataKVIO(dataKVIO)->bi_private != &dataKVIO->kvio);
// Read the data directly from the device using the read bio.
BIO *bio = readBlock->bio;
resetBio(bio, layer);
setBioSector(bio, blockToSector(layer, location));
setBioOperationRead(bio);
bio->bi_end_io = readBioCallback;
submitBio(bio, action);
}
/**********************************************************************/
void kvdoReadDataVIO(DataVIO *dataVIO)
{
ASSERT_LOG_ONLY(!isWriteVIO(dataVIOAsVIO(dataVIO)),
"operation set correctly for data read");
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION("$F;io=readData"));
if (isCompressed(dataVIO->mapped.state)) {
kvdoReadBlock(dataVIO, dataVIO->mapped.pbn, dataVIO->mapped.state,
BIO_Q_ACTION_COMPRESSED_DATA, readDataKVIOReadBlockCallback);
return;
}
KVIO *kvio = dataVIOAsKVIO(dataVIO);
BIO *bio = kvio->bio;
bio->bi_end_io = resetUserBio;
setBioSector(bio, blockToSector(kvio->layer, dataVIO->mapped.pbn));
submitBio(bio, BIO_Q_ACTION_DATA);
}
/**********************************************************************/
static void kvdoAcknowledgeDataKVIOThenContinue(KvdoWorkItem *item)
{
DataKVIO *dataKVIO = workItemAsDataKVIO(item);
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
kvdoAcknowledgeDataKVIO(dataKVIO);
// Even if we're not using bio-ack threads, we may be in the wrong
// base-code thread.
kvdoEnqueueDataVIOCallback(dataKVIO);
}
/**********************************************************************/
void kvdoAcknowledgeDataVIO(DataVIO *dataVIO)
{
DataKVIO *dataKVIO = dataVIOAsDataKVIO(dataVIO);
KernelLayer *layer = getLayerFromDataKVIO(dataKVIO);
// If the remaining discard work is not completely processed by this VIO,
// don't acknowledge it yet.
if (isDiscardBio(dataKVIO->externalIORequest.bio)
&& (dataKVIO->remainingDiscard
> (VDO_BLOCK_SIZE - dataKVIO->offset))) {
invokeCallback(dataVIOAsCompletion(dataVIO));
return;
}
// We've finished with the KVIO; acknowledge completion of the bio to the
// kernel.
if (useBioAckQueue(layer)) {
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
launchDataKVIOOnBIOAckQueue(dataKVIO, kvdoAcknowledgeDataKVIOThenContinue,
NULL, BIO_ACK_Q_ACTION_ACK);
} else {
kvdoAcknowledgeDataKVIOThenContinue(workItemFromDataKVIO(dataKVIO));
}
}
/**********************************************************************/
void kvdoWriteDataVIO(DataVIO *dataVIO)
{
ASSERT_LOG_ONLY(isWriteVIO(dataVIOAsVIO(dataVIO)),
"kvdoWriteDataVIO() called on write DataVIO");
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION("$F;io=writeData;j=normal"));
KVIO *kvio = dataVIOAsKVIO(dataVIO);
BIO *bio = kvio->bio;
setBioOperationWrite(bio);
setBioSector(bio, blockToSector(kvio->layer, dataVIO->newMapped.pbn));
submitBio(bio, BIO_Q_ACTION_DATA);
}
/**********************************************************************/
void kvdoModifyWriteDataVIO(DataVIO *dataVIO)
{
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
DataKVIO *dataKVIO = dataVIOAsDataKVIO(dataVIO);
BIO *bio = dataKVIO->externalIORequest.bio;
KernelLayer *layer = getLayerFromDataKVIO(dataKVIO);
resetBio(dataKVIO->dataBlockBio, layer);
if (!isDiscardBio(bio)) {
bioCopyDataIn(bio, dataKVIO->dataBlock + dataKVIO->offset);
} else {
memset(dataKVIO->dataBlock + dataKVIO->offset, '\0',
min(dataKVIO->remainingDiscard,
(DiscardSize) (VDO_BLOCK_SIZE - dataKVIO->offset)));
}
dataVIO->isZeroBlock = bioIsZeroData(dataKVIO->dataBlockBio);
dataKVIO->dataBlockBio->bi_private = &dataKVIO->kvio;
copyBioOperationAndFlags(dataKVIO->dataBlockBio, bio);
// Make the bio a write, not (potentially) a discard.
setBioOperationWrite(dataKVIO->dataBlockBio);
}
/**********************************************************************/
void kvdoZeroDataVIO(DataVIO *dataVIO)
{
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION("zeroDataVIO;io=readData"));
bioZeroData(dataVIOAsKVIO(dataVIO)->bio);
}
/**********************************************************************/
void kvdoCopyDataVIO(DataVIO *source, DataVIO *destination)
{
dataVIOAddTraceRecord(destination, THIS_LOCATION(NULL));
bioCopyDataOut(dataVIOAsKVIO(destination)->bio,
dataVIOAsDataKVIO(source)->dataBlock);
}
/**********************************************************************/
static void kvdoCompressWork(KvdoWorkItem *item)
{
DataKVIO *dataKVIO = workItemAsDataKVIO(item);
KernelLayer *layer = getLayerFromDataKVIO(dataKVIO);
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
char *context = getWorkQueuePrivateData();
if (unlikely(context == NULL)) {
uint32_t index = atomicAdd32(&layer->compressionContextIndex, 1) - 1;
BUG_ON(index >= layer->deviceConfig->threadCounts.cpuThreads);
context = layer->compressionContext[index];
setWorkQueuePrivateData(context);
}
int size = LZ4_compress_ctx_limitedOutput(context, dataKVIO->dataBlock,
dataKVIO->scratchBlock,
VDO_BLOCK_SIZE,
VDO_BLOCK_SIZE);
DataVIO *dataVIO = &dataKVIO->dataVIO;
if (size > 0) {
// The scratch block will be used to contain the compressed data.
dataVIO->compression.data = dataKVIO->scratchBlock;
dataVIO->compression.size = size;
} else {
// Use block size plus one as an indicator for uncompressible data.
dataVIO->compression.size = VDO_BLOCK_SIZE + 1;
}
kvdoEnqueueDataVIOCallback(dataKVIO);
}
/**********************************************************************/
void kvdoCompressDataVIO(DataVIO *dataVIO)
{
dataVIOAddTraceRecord(dataVIO,
THIS_LOCATION("compressDataVIO;"
"io=compress;cb=compress"));
/*
* If the orignal bio was a discard, but we got this far because the discard
* was a partial one (r/m/w), and it is part of a larger discard, we cannot
* compress this VIO. We need to make sure the VIO completes ASAP.
*/
DataKVIO *dataKVIO = dataVIOAsDataKVIO(dataVIO);
if (isDiscardBio(dataKVIO->externalIORequest.bio)
&& (dataKVIO->remainingDiscard > 0)) {
dataVIO->compression.size = VDO_BLOCK_SIZE + 1;
kvdoEnqueueDataVIOCallback(dataKVIO);
return;
}
launchDataKVIOOnCPUQueue(dataKVIO, kvdoCompressWork, NULL,
CPU_Q_ACTION_COMPRESS_BLOCK);
}
/**
* Construct a DataKVIO.
*
* @param [in] layer The physical layer
* @param [in] bio The bio to associate with this DataKVIO
* @param [out] dataKVIOPtr A pointer to hold the new DataKVIO
*
* @return VDO_SUCCESS or an error
**/
__attribute__((warn_unused_result))
static int makeDataKVIO(KernelLayer *layer, BIO *bio, DataKVIO **dataKVIOPtr)
{
DataKVIO *dataKVIO;
int result = allocBufferFromPool(layer->dataKVIOPool, (void **) &dataKVIO);
if (result != VDO_SUCCESS) {
return logErrorWithStringError(result, "data kvio allocation failure");
}
if (WRITE_PROTECT_FREE_POOL) {
setWriteProtect(dataKVIO, WP_DATA_KVIO_SIZE, false);
}
KVIO *kvio = &dataKVIO->kvio;
kvio->vio = dataVIOAsVIO(&dataKVIO->dataVIO);
memset(&kvio->enqueueable, 0, sizeof(KvdoEnqueueable));
memset(&dataKVIO->dedupeContext.pendingList, 0, sizeof(struct list_head));
memset(&dataKVIO->dataVIO, 0, sizeof(DataVIO));
kvio->bioToSubmit = NULL;
bio_list_init(&kvio->biosMerged);
// The dataBlock is only needed for writes and some partial reads.
if (isWriteBio(bio) || (getBioSize(bio) < VDO_BLOCK_SIZE)) {
resetBio(dataKVIO->dataBlockBio, layer);
}
initializeKVIO(kvio, layer, VIO_TYPE_DATA, VIO_PRIORITY_DATA, NULL, bio);
*dataKVIOPtr = dataKVIO;
return VDO_SUCCESS;
}
/**
* Creates a new DataVIO structure. A DataVIO represents a single logical
* block of data. It is what most VDO operations work with. This function also
* creates a wrapping DataKVIO structure that is used when we want to
* physically read or write the data associated with the DataVIO.
*
* @param [in] layer The physical layer
* @param [in] bio The BIO from the request the new DataKVIO will
* service
* @param [in] arrivalTime The arrival time of the BIO
* @param [out] dataKVIOPtr A pointer to hold the new DataKVIO
*
* @return VDO_SUCCESS or an error
**/
static int kvdoCreateKVIOFromBio(KernelLayer *layer,
BIO *bio,
Jiffies arrivalTime,
DataKVIO **dataKVIOPtr)
{
ExternalIORequest externalIORequest = {
.bio = bio,
.private = bio->bi_private,
.endIO = bio->bi_end_io,
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)
.rw = bio->bi_opf,
#else
.rw = bio->bi_rw,
#endif
};
// We will handle FUA at the end of the request (after we restore the
// bi_rw field from externalIORequest.rw).
clearBioOperationFlagFua(bio);
DataKVIO *dataKVIO = NULL;
int result = makeDataKVIO(layer, bio, &dataKVIO);
if (result != VDO_SUCCESS) {
return result;
}
dataKVIO->externalIORequest = externalIORequest;
dataKVIO->offset = sectorToBlockOffset(layer, getBioSector(bio));
dataKVIO->isPartial = ((getBioSize(bio) < VDO_BLOCK_SIZE)
|| (dataKVIO->offset != 0));
if (dataKVIO->isPartial) {
countBios(&layer->biosInPartial, bio);
} else {
/*
* Note that we unconditionally fill in the dataBlock array for
* non-read operations. There are places like kvdoCopyVIO that may
* look at kvio->dataBlock for a zero block (and maybe for
* discards?). We could skip filling in dataBlock for such cases,
* but only once we're sure all such places are fixed to check the
* isZeroBlock flag first.
*/
if (isDiscardBio(bio)) {
/*
* This is a discard/trim operation. This is treated much like the zero
* block, but we keep different stats and distinguish it in the block
* map.
*/
memset(dataKVIO->dataBlock, 0, VDO_BLOCK_SIZE);
} else if (bio_data_dir(bio) == WRITE) {
dataKVIO->dataVIO.isZeroBlock = bioIsZeroData(bio);
// Copy the bio data to a char array so that we can continue to use
// the data after we acknowledge the bio.
bioCopyDataIn(bio, dataKVIO->dataBlock);
}
}
if (dataKVIO->isPartial || isWriteBio(bio)) {
/*
* dataKVIO->bio will point at kvio->dataBlockBio for all writes and
* partial block I/O so the rest of the kernel code doesn't need to
* make a decision as to what to use.
*/
dataKVIO->dataBlockBio->bi_private = &dataKVIO->kvio;
if (dataKVIO->isPartial && isWriteBio(bio)) {
clearBioOperationAndFlags(dataKVIO->dataBlockBio);
setBioOperationRead(dataKVIO->dataBlockBio);
} else {
copyBioOperationAndFlags(dataKVIO->dataBlockBio, bio);
}
dataKVIOAsKVIO(dataKVIO)->bio = dataKVIO->dataBlockBio;
dataKVIO->readBlock.data = dataKVIO->dataBlock;
}
setBioBlockDevice(bio, getKernelLayerBdev(layer));
bio->bi_end_io = completeAsyncBio;
*dataKVIOPtr = dataKVIO;
return VDO_SUCCESS;
}
/**********************************************************************/
static void launchDataKVIOWork(KvdoWorkItem *item)
{
runCallback(vioAsCompletion(workItemAsKVIO(item)->vio));
}
/**
* Continue discard processing for requests that span multiple physical blocks.
* If all have been processed the KVIO is completed. If we have already seen
* an error, we skip the rest of the discard and fail immediately.
*
* <p>Invoked in a request-queue thread after the discard of a block has
* completed.
*
* @param completion A completion representing the discard KVIO
**/
static void kvdoContinueDiscardKVIO(VDOCompletion *completion)
{
DataVIO *dataVIO = asDataVIO(completion);
DataKVIO *dataKVIO = dataVIOAsDataKVIO(dataVIO);
KernelLayer *layer = getLayerFromDataKVIO(dataKVIO);
dataKVIO->remainingDiscard
-= min(dataKVIO->remainingDiscard,
(DiscardSize) (VDO_BLOCK_SIZE - dataKVIO->offset));
if ((completion->result != VDO_SUCCESS)
|| (dataKVIO->remainingDiscard == 0)) {
if (dataKVIO->hasDiscardPermit) {
limiterRelease(&layer->discardLimiter);
dataKVIO->hasDiscardPermit = false;
}
kvdoCompleteDataKVIO(completion);
return;
}
BIO *bio = getBIOFromDataKVIO(dataKVIO);
resetBio(bio, layer);
dataKVIO->isPartial = (dataKVIO->remainingDiscard < VDO_BLOCK_SIZE);
dataKVIO->offset = 0;
VIOOperation operation;
if (dataKVIO->isPartial) {
operation = VIO_READ_MODIFY_WRITE;
setBioOperationRead(bio);
} else {
operation = VIO_WRITE;
}
if (requestorSetFUA(dataKVIO)) {
operation |= VIO_FLUSH_AFTER;
}
prepareDataVIO(dataVIO, dataVIO->logical.lbn + 1, operation,
!dataKVIO->isPartial, kvdoContinueDiscardKVIO);
enqueueDataKVIO(dataKVIO, launchDataKVIOWork, completion->callback,
REQ_Q_ACTION_MAP_BIO);
}
/**
* Finish a partial read.
*
* @param completion The partial read KVIO
**/
static void kvdoCompletePartialRead(VDOCompletion *completion)
{
DataKVIO *dataKVIO = dataVIOAsDataKVIO(asDataVIO(completion));
dataKVIOAddTraceRecord(dataKVIO, THIS_LOCATION(NULL));
bioCopyDataOut(dataKVIO->externalIORequest.bio,
dataKVIO->readBlock.data + dataKVIO->offset);
kvdoCompleteDataKVIO(completion);
return;
}
/**********************************************************************/
int kvdoLaunchDataKVIOFromBio(KernelLayer *layer,
BIO *bio,
uint64_t arrivalTime,
bool hasDiscardPermit)
{
DataKVIO *dataKVIO = NULL;
int result = kvdoCreateKVIOFromBio(layer, bio, arrivalTime, &dataKVIO);
if (unlikely(result != VDO_SUCCESS)) {
logInfo("%s: KVIO allocation failure", __func__);
if (hasDiscardPermit) {
limiterRelease(&layer->discardLimiter);
}
limiterRelease(&layer->requestLimiter);
return mapToSystemError(result);
}
/*
* Discards behave very differently than other requests when coming
* in from device-mapper. We have to be able to handle any size discards
* and with various sector offsets within a block.
*/
KVIO *kvio = &dataKVIO->kvio;
VDOAction *callback = kvdoCompleteDataKVIO;
VIOOperation operation = VIO_WRITE;
bool isTrim = false;
if (isDiscardBio(bio)) {
dataKVIO->hasDiscardPermit = hasDiscardPermit;
dataKVIO->remainingDiscard = getBioSize(bio);
callback = kvdoContinueDiscardKVIO;
if (dataKVIO->isPartial) {
operation = VIO_READ_MODIFY_WRITE;
} else {
isTrim = true;
}
} else if (dataKVIO->isPartial) {
if (bio_data_dir(bio) == READ) {
callback = kvdoCompletePartialRead;
operation = VIO_READ;
} else {
operation = VIO_READ_MODIFY_WRITE;
}
} else if (bio_data_dir(bio) == READ) {
operation = VIO_READ;
}
if (requestorSetFUA(dataKVIO)) {
operation |= VIO_FLUSH_AFTER;
}
LogicalBlockNumber lbn
= sectorToBlock(layer, getBioSector(bio) - layer->startingSectorOffset);
prepareDataVIO(&dataKVIO->dataVIO, lbn, operation, isTrim, callback);
enqueueKVIO(kvio, launchDataKVIOWork, vioAsCompletion(kvio->vio)->callback,
REQ_Q_ACTION_MAP_BIO);
return VDO_SUCCESS;
}
/**
* Hash a DataKVIO and set its chunk name.
*
* @param item The DataKVIO to be hashed
**/
static void kvdoHashDataWork(KvdoWorkItem *item)
{
DataKVIO *dataKVIO = workItemAsDataKVIO(item);
DataVIO *dataVIO = &dataKVIO->dataVIO;
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
MurmurHash3_x64_128(dataKVIO->dataBlock, VDO_BLOCK_SIZE, 0x62ea60be,
&dataVIO->chunkName);
dataKVIO->dedupeContext.chunkName = &dataVIO->chunkName;
kvdoEnqueueDataVIOCallback(dataKVIO);
}
/**********************************************************************/
void kvdoHashDataVIO(DataVIO *dataVIO)
{
dataVIOAddTraceRecord(dataVIO, THIS_LOCATION(NULL));
launchDataKVIOOnCPUQueue(dataVIOAsDataKVIO(dataVIO), kvdoHashDataWork, NULL,
CPU_Q_ACTION_HASH_BLOCK);
}
/**********************************************************************/
void kvdoCheckForDuplication(DataVIO *dataVIO)
{
dataVIOAddTraceRecord(dataVIO,
THIS_LOCATION("checkForDuplication;dup=post"));
ASSERT_LOG_ONLY(!dataVIO->isZeroBlock,
"zero block not checked for duplication");
ASSERT_LOG_ONLY(dataVIO->newMapped.state != MAPPING_STATE_UNMAPPED,
"discard not checked for duplication");
DataKVIO *dataKVIO = dataVIOAsDataKVIO(dataVIO);
if (hasAllocation(dataVIO)) {
postDedupeAdvice(dataKVIO);
} else {
// This block has not actually been written (presumably because we are
// full), so attempt to dedupe without posting bogus advice.
queryDedupeAdvice(dataKVIO);
}
}
/**********************************************************************/
void kvdoUpdateDedupeAdvice(DataVIO *dataVIO)
{
updateDedupeAdvice(dataVIOAsDataKVIO(dataVIO));
}
/**
* Implements BufferFreeFunction.
**/
static void freePooledDataKVIO(void *poolData, void *data)
{
if (data == NULL) {
return;
}
DataKVIO *dataKVIO = (DataKVIO *) data;
KernelLayer *layer = (KernelLayer *) poolData;
if (WRITE_PROTECT_FREE_POOL) {
setWriteProtect(dataKVIO, WP_DATA_KVIO_SIZE, false);
}
if (dataKVIO->dataBlockBio != NULL) {
freeBio(dataKVIO->dataBlockBio, layer);
}
if (dataKVIO->readBlock.bio != NULL) {
freeBio(dataKVIO->readBlock.bio, layer);
}
FREE(dataKVIO->readBlock.buffer);
FREE(dataKVIO->dataBlock);
FREE(dataKVIO->scratchBlock);
FREE(dataKVIO);
}
/**
* Allocate a DataKVIO. This function is the internals of makePooledDataKVIO().
*
* @param [in] layer The layer in which the DataKVIO will operate
* @param [out] dataKVIOPtr A pointer to hold the newly allocated DataKVIO
*
* @return VDO_SUCCESS or an error
**/
static int allocatePooledDataKVIO(KernelLayer *layer, DataKVIO **dataKVIOPtr)
{
DataKVIO *dataKVIO;
int result;
if (WRITE_PROTECT_FREE_POOL) {
STATIC_ASSERT(WP_DATA_KVIO_SIZE >= sizeof(DataKVIO));
result = allocateMemory(WP_DATA_KVIO_SIZE, 0, __func__, &dataKVIO);
if (result == VDO_SUCCESS) {
BUG_ON((((size_t) dataKVIO) & (PAGE_SIZE - 1)) != 0);
}
} else {
result = ALLOCATE(1, DataKVIO, __func__, &dataKVIO);
}
if (result != VDO_SUCCESS) {
return logErrorWithStringError(result, "DataKVIO allocation failure");
}
STATIC_ASSERT(VDO_BLOCK_SIZE <= PAGE_SIZE);
result = allocateMemory(VDO_BLOCK_SIZE, 0, "kvio data",
&dataKVIO->dataBlock);
if (result != VDO_SUCCESS) {
freePooledDataKVIO(layer, dataKVIO);
return logErrorWithStringError(result, "DataKVIO data allocation failure");
}
result = createBio(layer, dataKVIO->dataBlock, &dataKVIO->dataBlockBio);
if (result != VDO_SUCCESS) {
freePooledDataKVIO(layer, dataKVIO);
return logErrorWithStringError(result,
"DataKVIO data bio allocation failure");
}
result = allocateMemory(VDO_BLOCK_SIZE, 0, "kvio read buffer",
&dataKVIO->readBlock.buffer);
if (result != VDO_SUCCESS) {
freePooledDataKVIO(layer, dataKVIO);
return logErrorWithStringError(result,
"DataKVIO read allocation failure");
}
result = createBio(layer, dataKVIO->readBlock.buffer,
&dataKVIO->readBlock.bio);
if (result != VDO_SUCCESS) {
freePooledDataKVIO(layer, dataKVIO);
return logErrorWithStringError(result,
"DataKVIO read bio allocation failure");
}
dataKVIO->readBlock.bio->bi_private = &dataKVIO->kvio;
result = allocateMemory(VDO_BLOCK_SIZE, 0, "kvio scratch",
&dataKVIO->scratchBlock);
if (result != VDO_SUCCESS) {
freePooledDataKVIO(layer, dataKVIO);
return logErrorWithStringError(result,
"DataKVIO scratch allocation failure");
}
*dataKVIOPtr = dataKVIO;
return VDO_SUCCESS;
}
/**
* Implements BufferAllocateFunction.
**/
static int makePooledDataKVIO(void *poolData, void **dataPtr)
{
DataKVIO *dataKVIO = NULL;
int result = allocatePooledDataKVIO((KernelLayer *) poolData, &dataKVIO);
if (result != VDO_SUCCESS) {
freePooledDataKVIO(poolData, dataKVIO);
return result;
}
*dataPtr = dataKVIO;
return VDO_SUCCESS;
}
/**
* Dump out the waiters on each DataVIO in the DataVIO buffer pool.
*
* @param queue The queue to check (logical or physical)
* @param waitOn The label to print for queue (logical or physical)
**/
static void dumpVIOWaiters(WaitQueue *queue, char *waitOn)
{
Waiter *first = getFirstWaiter(queue);
if (first == NULL) {
return;
}
DataVIO *dataVIO = waiterAsDataVIO(first);
logInfo(" %s is locked. Waited on by: VIO %" PRIptr " pbn %" PRIu64
" lbn %llu d-pbn %llu lastOp %s",
waitOn, dataVIO, getDataVIOAllocation(dataVIO),
dataVIO->logical.lbn, dataVIO->duplicate.pbn,
getOperationName(dataVIO));
Waiter *waiter;
for (waiter = first->nextWaiter;
waiter != first;
waiter = waiter->nextWaiter) {
dataVIO = waiterAsDataVIO(waiter);
logInfo(" ... and : VIO %" PRIptr " pbn %llu lbn %"
PRIu64 " d-pbn %llu lastOp %s",
dataVIO, getDataVIOAllocation(dataVIO), dataVIO->logical.lbn,
dataVIO->duplicate.pbn, getOperationName(dataVIO));
}
}
/**
* Encode various attributes of a VIO as a string of one-character flags for
* dump logging. This encoding is for logging brevity:
*
* R => VIO completion result not VDO_SUCCESS
* W => VIO is on a wait queue
* D => VIO is a duplicate
*
* <p>The common case of no flags set will result in an empty, null-terminated
* buffer. If any flags are encoded, the first character in the string will be
* a space character.
*
* @param dataVIO The VIO to encode
* @param buffer The buffer to receive a null-terminated string of encoded
* flag character
**/
static void encodeVIODumpFlags(DataVIO *dataVIO, char buffer[8])
{
char *pFlag = buffer;
*pFlag++ = ' ';
if (dataVIOAsCompletion(dataVIO)->result != VDO_SUCCESS) {
*pFlag++ = 'R';
}
if (dataVIOAsAllocatingVIO(dataVIO)->waiter.nextWaiter != NULL) {
*pFlag++ = 'W';
}
if (dataVIO->isDuplicate) {
*pFlag++ = 'D';
}
if (pFlag == &buffer[1]) {
// No flags, so remove the blank space.
pFlag = buffer;
}
*pFlag = '\0';
}
/**
* Dump out info on a DataKVIO from the DataKVIO pool.
*
* <p>Implements BufferDumpFunction.
*
* @param poolData The pool data
* @param data The DataKVIO to dump
**/
static void dumpPooledDataKVIO(void *poolData __attribute__((unused)),
void *data)
{
DataKVIO *dataKVIO = (DataKVIO *) data;
DataVIO *dataVIO = &dataKVIO->dataVIO;
/*
* This just needs to be big enough to hold a queue (thread) name
* and a function name (plus a separator character and NUL). The
* latter is limited only by taste.
*
* In making this static, we're assuming only one "dump" will run at
* a time. If more than one does run, the log output will be garbled
* anyway.
*/
static char vioWorkItemDumpBuffer[100 + MAX_QUEUE_NAME_LEN];
/*
* We're likely to be logging a couple thousand of these lines, and
* in some circumstances syslogd may have trouble keeping up, so
* keep it BRIEF rather than user-friendly.
*/
dumpWorkItemToBuffer(&dataKVIO->kvio.enqueueable.workItem,
vioWorkItemDumpBuffer, sizeof(vioWorkItemDumpBuffer));
// Another static buffer...
// log10(256) = 2.408+, round up:
enum { DECIMAL_DIGITS_PER_UINT64_T = (int) (1 + 2.41 * sizeof(uint64_t)) };
static char vioBlockNumberDumpBuffer[sizeof("P L D")
+ 3 * DECIMAL_DIGITS_PER_UINT64_T];
if (dataVIO->isDuplicate) {
snprintf(vioBlockNumberDumpBuffer, sizeof(vioBlockNumberDumpBuffer),
"P%llu L%llu D%llu",
getDataVIOAllocation(dataVIO), dataVIO->logical.lbn,
dataVIO->duplicate.pbn);
} else if (hasAllocation(dataVIO)) {
snprintf(vioBlockNumberDumpBuffer, sizeof(vioBlockNumberDumpBuffer),
"P%llu L%llu",
getDataVIOAllocation(dataVIO), dataVIO->logical.lbn);
} else {
snprintf(vioBlockNumberDumpBuffer, sizeof(vioBlockNumberDumpBuffer),
"L%llu",
dataVIO->logical.lbn);
}
static char vioFlushGenerationBuffer[sizeof(" FG")
+ DECIMAL_DIGITS_PER_UINT64_T] = "";
if (dataVIO->flushGeneration != 0) {
snprintf(vioFlushGenerationBuffer, sizeof(vioFlushGenerationBuffer),
" FG%llu", dataVIO->flushGeneration);
}
// Encode VIO attributes as a string of one-character flags, usually empty.
static char flagsDumpBuffer[8];
encodeVIODumpFlags(dataVIO, flagsDumpBuffer);
logInfo(" kvio %" PRIptr " %s%s %s %s%s",
dataKVIO, vioBlockNumberDumpBuffer, vioFlushGenerationBuffer,
getOperationName(dataVIO), vioWorkItemDumpBuffer, flagsDumpBuffer);
// might want info on: wantAlbireoAnswer / operation / status
// might want info on: bio / bioToSubmit / biosMerged
dumpVIOWaiters(&dataVIO->logical.waiters, "lbn");
// might want to dump more info from VIO here
}
/**********************************************************************/
int makeDataKVIOBufferPool(KernelLayer *layer,
uint32_t poolSize,
BufferPool **bufferPoolPtr)
{
return makeBufferPool("DataKVIO Pool", poolSize,
makePooledDataKVIO, freePooledDataKVIO,
dumpPooledDataKVIO, layer, bufferPoolPtr);
}
/**********************************************************************/
DataLocation getDedupeAdvice(const DedupeContext *context)
{
DataKVIO *dataKVIO = container_of(context, DataKVIO, dedupeContext);
return (DataLocation) {
.state = dataKVIO->dataVIO.newMapped.state,
.pbn = dataKVIO->dataVIO.newMapped.pbn,
};
}
/**********************************************************************/
void setDedupeAdvice(DedupeContext *context, const DataLocation *advice)
{
DataKVIO *dataKVIO = container_of(context, DataKVIO, dedupeContext);
receiveDedupeAdvice(&dataKVIO->dataVIO, advice);
}