/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/ioSubmitter.c#8 $
*/
#include "ioSubmitter.h"
#include <linux/version.h>
#include "memoryAlloc.h"
#include "bio.h"
#include "dataKVIO.h"
#include "kernelLayer.h"
#include "logger.h"
enum {
/*
* Whether to use bio merging code.
*
* Merging I/O requests in the request queue below us is helpful for
* many devices, and VDO often shuffles the I/O order (more than
* some simple I/O schedulers can sort out) as we deal with dedupe
* advice etc. The bio map tracks the
* yet-to-be-submitted I/O requests by block number so that we can
* collect together and submit sequential I/O operations that should
* be easy to merge. (So we don't actually *merge* them here, we
* just arrange them so that merging can happen.)
*
* For some devices, merging may not help, and we may want to turn
* off this code and save compute/spinlock cycles.
*/
USE_BIOMAP = 1,
};
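/*
 * A minimal sketch of the intent (hypothetical helper, not compiled):
 * suppose two one-block data writes arrive out of order for adjacent
 * PBNs. The first is parked in the bio map, keyed by its sector; when
 * the second arrives, tryBioMapMerge() finds it and chains the bios so
 * that processBioMap() submits them back-to-back, where the request
 * queue below can merge them:
 *
 *   submitBio(bioForPBN(101), BIO_Q_ACTION_DATA); // parked in the map
 *   submitBio(bioForPBN(100), BIO_Q_ACTION_DATA); // front-merged onto it
 *
 * bioForPBN() is a stand-in for whatever code builds a data bio for a
 * given physical block number.
 */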
/*
* Submission of bio operations to the underlying storage device will
* go through a separate work queue thread (or more than one) to
* prevent blocking in other threads if the storage device has a full
* queue. The plug structure allows that thread to do better batching
* of requests to make the I/O more efficient.
*
* When multiple worker threads are used, a thread is chosen for an
* I/O operation submission based on the PBN, so a given PBN will
* consistently wind up on the same thread. Flush operations are
* assigned round-robin.
*
* The map (protected by the mutex) collects pending I/O operations so
* that the worker thread can reorder them to try to encourage I/O
* request merging in the request queue underneath.
*/
typedef struct bioQueueData {
KvdoWorkQueue *queue;
struct blk_plug plug;
IntMap *map;
struct mutex lock;
unsigned int queueNumber;
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,38)
// Cached for old kernels; set in initializeBioQueue().
struct block_device *bdev;
#endif
} BioQueueData;
struct ioSubmitter {
unsigned int numBioQueuesUsed;
unsigned int bioQueueRotationInterval;
unsigned int bioQueueRotor;
BioQueueData bioQueueData[];
};
/**********************************************************************/
static void startBioQueue(void *ptr)
{
BioQueueData *bioQueueData = (BioQueueData *)ptr;
blk_start_plug(&bioQueueData->plug);
}
/**********************************************************************/
static void finishBioQueue(void *ptr)
{
BioQueueData *bioQueueData = (BioQueueData *)ptr;
blk_finish_plug(&bioQueueData->plug);
}
static const KvdoWorkQueueType bioQueueType = {
.start = startBioQueue,
.finish = finishBioQueue,
.actionTable = {
{ .name = "bio_compressed_data",
.code = BIO_Q_ACTION_COMPRESSED_DATA,
.priority = 0 },
{ .name = "bio_data",
.code = BIO_Q_ACTION_DATA,
.priority = 0 },
{ .name = "bio_flush",
.code = BIO_Q_ACTION_FLUSH,
.priority = 2 },
{ .name = "bio_high",
.code = BIO_Q_ACTION_HIGH,
.priority = 2 },
{ .name = "bio_metadata",
.code = BIO_Q_ACTION_METADATA,
.priority = 1 },
{ .name = "bio_readcache",
.code = BIO_Q_ACTION_READCACHE,
.priority = 0 },
{ .name = "bio_verify",
.code = BIO_Q_ACTION_VERIFY,
.priority = 1 },
},
};
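/*
 * In the action table above, a larger priority value is assumed to be
 * serviced sooner by the work queue: flushes and explicitly
 * high-priority bios (2) run ahead of metadata and verification (1),
 * which run ahead of data, compressed-data, and read-cache traffic (0).
 */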
/**
* Check that we're running normally (i.e., not in an
* interrupt-servicing context) in an IOSubmitter bio thread.
**/
static void assertRunningInBioQueue(void)
{
ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
ASSERT_LOG_ONLY(strnstr(current->comm, "bioQ", TASK_COMM_LEN) != NULL,
"running in bio submission work queue thread");
}
/**
* Returns the BioQueueData pointer associated with the current thread.
* Results are undefined if called from a thread that is not a bio
* submission work queue thread.
*
* @return the BioQueueData pointer
**/
static inline BioQueueData *getCurrentBioQueueData(void)
{
BioQueueData *bioQueueData = (BioQueueData *) getWorkQueuePrivateData();
// Does it look like a bio queue thread?
BUG_ON(bioQueueData == NULL);
BUG_ON(bioQueueData->queue != getCurrentWorkQueue());
return bioQueueData;
}
/**********************************************************************/
static inline IOSubmitter *bioQueueToSubmitter(BioQueueData *bioQueue)
{
BioQueueData *firstBioQueue = bioQueue - bioQueue->queueNumber;
IOSubmitter *submitter = container_of(firstBioQueue, IOSubmitter,
bioQueueData[0]);
return submitter;
}
/**
* Return the bio thread number handling the specified physical block
* number.
*
* @param ioSubmitter The I/O submitter data
* @param pbn The physical block number
*
* @return the bio queue number for the PBN
**/
static unsigned int bioQueueNumberForPBN(IOSubmitter *ioSubmitter,
PhysicalBlockNumber pbn)
{
unsigned int bioQueueIndex
= ((pbn
% (ioSubmitter->numBioQueuesUsed
* ioSubmitter->bioQueueRotationInterval))
/ ioSubmitter->bioQueueRotationInterval);
return bioQueueIndex;
}
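/*
 * For example, with numBioQueuesUsed = 4 and bioQueueRotationInterval
 * = 64, PBNs 0..63 map to queue 0, 64..127 to queue 1, 128..191 to
 * queue 2, 192..255 to queue 3, and 256..319 wrap around to queue 0
 * again. Runs of consecutive PBNs thus stay on one thread long enough
 * for the bio map to arrange merges, while distant regions of the
 * device still spread across all the threads.
 */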
/**
* Check that we're running normally (i.e., not in an
* interrupt-servicing context) in an IOSubmitter bio thread. Also
* require that the thread we're running on is the correct one for the
* supplied physical block number.
*
* @param pbn The PBN that should have been used in thread selection
**/
static void assertRunningInBioQueueForPBN(PhysicalBlockNumber pbn)
{
assertRunningInBioQueue();
BioQueueData *thisQueue = getCurrentBioQueueData();
IOSubmitter *submitter = bioQueueToSubmitter(thisQueue);
unsigned int computedQueueNumber = bioQueueNumberForPBN(submitter, pbn);
ASSERT_LOG_ONLY(thisQueue->queueNumber == computedQueueNumber,
"running in correct bio queue (%u vs %u) for PBN %llu",
thisQueue->queueNumber, computedQueueNumber, pbn);
}
/**
* Increments appropriate counters for bio completions
*
* @param kvio the kvio associated with the bio
* @param bio the bio to count
*/
static void countAllBiosCompleted(KVIO *kvio, BIO *bio)
{
KernelLayer *layer = kvio->layer;
if (isData(kvio)) {
countBios(&layer->biosOutCompleted, bio);
return;
}
countBios(&layer->biosMetaCompleted, bio);
if (kvio->vio->type == VIO_TYPE_RECOVERY_JOURNAL) {
countBios(&layer->biosJournalCompleted, bio);
} else if (kvio->vio->type == VIO_TYPE_BLOCK_MAP) {
countBios(&layer->biosPageCacheCompleted, bio);
}
}
/**********************************************************************/
void countCompletedBios(BIO *bio)
{
KVIO *kvio = (KVIO *)bio->bi_private;
KernelLayer *layer = kvio->layer;
atomic64_inc(&layer->biosCompleted);
countAllBiosCompleted(kvio, bio);
}
/**********************************************************************/
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
void completeAsyncBio(BIO *bio)
#else
void completeAsyncBio(BIO *bio, int error)
#endif
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
int error = getBioResult(bio);
#endif
KVIO *kvio = (KVIO *) bio->bi_private;
kvioAddTraceRecord(kvio, THIS_LOCATION("$F($io);cb=io($io)"));
countCompletedBios(bio);
if ((error == 0) && isData(kvio) && isReadVIO(kvio->vio)) {
DataKVIO *dataKVIO = kvioAsDataKVIO(kvio);
if (!isCompressed(dataKVIO->dataVIO.mapped.state)
&& !dataKVIO->isPartial) {
kvdoAcknowledgeDataVIO(&dataKVIO->dataVIO);
return;
}
}
kvdoContinueKvio(kvio, error);
}
/**
* Increments appropriate counters for bio submissions
*
* @param kvio the kvio associated with the bio
* @param bio the bio to count
*/
static void countAllBios(KVIO *kvio, BIO *bio)
{
KernelLayer *layer = kvio->layer;
if (isData(kvio)) {
countBios(&layer->biosOut, bio);
return;
}
countBios(&layer->biosMeta, bio);
if (kvio->vio->type == VIO_TYPE_RECOVERY_JOURNAL) {
countBios(&layer->biosJournal, bio);
} else if (kvio->vio->type == VIO_TYPE_BLOCK_MAP) {
countBios(&layer->biosPageCache, bio);
}
}
/**
* Update stats and tracing info, then submit the supplied bio to the
* OS for processing.
*
* @param kvio The KVIO associated with the bio
* @param bio The bio to submit to the OS
* @param location Call site location for tracing
**/
static void sendBioToDevice(KVIO *kvio, BIO *bio, TraceLocation location)
{
assertRunningInBioQueueForPBN(kvio->vio->physical);
atomic64_inc(&kvio->layer->biosSubmitted);
countAllBios(kvio, bio);
kvioAddTraceRecord(kvio, location);
bio->bi_next = NULL;
generic_make_request(bio);
}
/**
* Submits a bio to the underlying block device. May block if the
* device is busy.
*
* For metadata or if USE_BIOMAP is disabled, kvio->bioToSubmit holds
* the BIO pointer to submit to the target device. For normal
* data when USE_BIOMAP is enabled, kvio->biosMerged is the list of
* all bios collected together in this group; all of them get
* submitted. In both cases, the bi_end_io callback is invoked when
* each I/O operation completes.
*
* @param item The work item in the KVIO "owning" either the bio to
* submit, or the head of the bio_list to be submitted.
**/
static void processBioMap(KvdoWorkItem *item)
{
assertRunningInBioQueue();
KVIO *kvio = workItemAsKVIO(item);
/*
* XXX Make these paths more regular: Should bi_bdev be set here, or
* in the caller, or in the callback function? Should we call
* finishBioQueue for the biomap case on old kernels?
*/
if (USE_BIOMAP && isData(kvio)) {
// We need to make sure to do two things here:
// 1. Use each bio's kvio when submitting; any other kvio is not safe.
// 2. Detach the bio list from the kvio before submitting, because the
//    kvio could get reused or freed before all its bios are submitted.
BioQueueData *bioQueueData = getWorkQueuePrivateData();
BIO *bio = NULL;
mutex_lock(&bioQueueData->lock);
if (!bio_list_empty(&kvio->biosMerged)) {
intMapRemove(bioQueueData->map, getBioSector(kvio->biosMerged.head));
intMapRemove(bioQueueData->map, getBioSector(kvio->biosMerged.tail));
}
bio = kvio->biosMerged.head;
bio_list_init(&kvio->biosMerged);
mutex_unlock(&bioQueueData->lock);
// Somewhere in the list we'll be submitting the current "kvio",
// so drop our handle on it now.
kvio = NULL;
while (bio != NULL) {
KVIO *kvioBio = bio->bi_private;
BIO *next = bio->bi_next;
bio->bi_next = NULL;
setBioBlockDevice(bio, getKernelLayerBdev(kvioBio->layer));
sendBioToDevice(kvioBio, bio, THIS_LOCATION("$F($io)"));
bio = next;
}
} else {
sendBioToDevice(kvio, kvio->bioToSubmit, THIS_LOCATION("$F($io)"));
}
}
/**
* This function will attempt to find an already queued bio that the current
* bio can be merged with. There are two types of merging possible, forward
* and backward, which are distinguished by a flag that uses kernel
* elevator terminology.
*
* @param map The bio map to use for merging
* @param kvio The kvio we want to merge
* @param mergeType The type of merging we want to try
*
* @return the kvio to merge to, NULL if no merging is possible
*/
static KVIO *getMergeableLocked(IntMap *map,
KVIO *kvio,
unsigned int mergeType)
{
BIO *bio = kvio->bioToSubmit;
sector_t mergeSector = getBioSector(bio);
switch (mergeType) {
case ELEVATOR_BACK_MERGE:
mergeSector -= VDO_SECTORS_PER_BLOCK;
break;
case ELEVATOR_FRONT_MERGE:
mergeSector += VDO_SECTORS_PER_BLOCK;
break;
}
KVIO *kvioMerge = intMapGet(map, mergeSector);
if (kvioMerge != NULL) {
if (!areWorkItemActionsEqual(&kvio->enqueueable.workItem,
&kvioMerge->enqueueable.workItem)) {
return NULL;
} else if (bio_data_dir(bio) != bio_data_dir(kvioMerge->bioToSubmit)) {
return NULL;
} else if (bio_list_empty(&kvioMerge->biosMerged)) {
return NULL;
} else {
switch (mergeType) {
case ELEVATOR_BACK_MERGE:
if (getBioSector(kvioMerge->biosMerged.tail) != mergeSector) {
return NULL;
}
break;
case ELEVATOR_FRONT_MERGE:
if (getBioSector(kvioMerge->biosMerged.head) != mergeSector) {
return NULL;
}
break;
}
}
}
return kvioMerge;
}
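/*
 * A worked example of the lookup above, assuming VDO_SECTORS_PER_BLOCK
 * is 8: a new one-block bio starting at sector 80 probes the map at
 * sector 72 for a back merge (an already-queued KVIO whose merged tail
 * begins at 72 and so ends exactly where we start) and at sector 88
 * for a front merge (a queued KVIO whose merged head begins exactly
 * where we end). Either hit lets the bios be chained contiguously.
 */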
/**********************************************************************/
static inline unsigned int advanceBioRotor(IOSubmitter *bioData)
{
unsigned int index = bioData->bioQueueRotor++
% (bioData->numBioQueuesUsed
* bioData->bioQueueRotationInterval);
index /= bioData->bioQueueRotationInterval;
return index;
}
/**********************************************************************/
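/**
 * Attempt to merge the kvio's bio into an already-queued kvio's bio
 * list. If no mergeable neighbor is found, the kvio is recorded in the
 * bio map under its bio's sector so that later bios can merge with it.
 * If a back-merge neighbor exists, the bio is appended to that kvio's
 * tail; if a front-merge neighbor exists (even when a back-merge one
 * does too, the "gap fill" case), the bio is prepended to that kvio's
 * head. In the merge cases, the map is re-keyed to the merged list's
 * new head and tail sectors.
 *
 * @param bioQueueData  The bio queue data for the current thread
 * @param kvio          The kvio to merge or record
 * @param bio           The bio being submitted for the kvio
 *
 * @return true if the bio was merged into an existing kvio's list
 **/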
static bool tryBioMapMerge(BioQueueData *bioQueueData, KVIO *kvio, BIO *bio)
{
bool merged = false;
mutex_lock(&bioQueueData->lock);
KVIO *prevKvio = getMergeableLocked(bioQueueData->map, kvio,
ELEVATOR_BACK_MERGE);
KVIO *nextKvio = getMergeableLocked(bioQueueData->map, kvio,
ELEVATOR_FRONT_MERGE);
if (prevKvio == nextKvio) {
nextKvio = NULL;
}
int result;
if ((prevKvio == NULL) && (nextKvio == NULL)) {
// No merge possible; just record this kvio in the map (the caller
// will enqueue it).
result = intMapPut(bioQueueData->map, getBioSector(bio), kvio, true, NULL);
// We don't care about failure of intMapPut in this case.
result = result;
mutex_unlock(&bioQueueData->lock);
} else {
if (nextKvio == NULL) {
// Only prev exists; merge onto prev's tail.
intMapRemove(bioQueueData->map, getBioSector(prevKvio->biosMerged.tail));
bio_list_merge(&prevKvio->biosMerged, &kvio->biosMerged);
result = intMapPut(bioQueueData->map,
getBioSector(prevKvio->biosMerged.head),
prevKvio, true, NULL);
result = intMapPut(bioQueueData->map,
getBioSector(prevKvio->biosMerged.tail),
prevKvio, true, NULL);
} else {
// Next exists (and possibly prev); merge onto next's head.
//
// Handle "next merge" and "gap fill" cases the same way so as to
// reorder bios in a way that's compatible with using funnel queues
// in work queues. This avoids removing an existing work item.
intMapRemove(bioQueueData->map, getBioSector(nextKvio->biosMerged.head));
bio_list_merge_head(&nextKvio->biosMerged, &kvio->biosMerged);
result = intMapPut(bioQueueData->map,
getBioSector(nextKvio->biosMerged.head),
nextKvio, true, NULL);
result = intMapPut(bioQueueData->map,
getBioSector(nextKvio->biosMerged.tail),
nextKvio, true, NULL);
}
// We don't care about failure of intMapPut in this case.
result = result;
mutex_unlock(&bioQueueData->lock);
merged = true;
}
return merged;
}
/**********************************************************************/
static BioQueueData *bioQueueDataForPBN(IOSubmitter *ioSubmitter,
PhysicalBlockNumber pbn)
{
unsigned int bioQueueIndex = bioQueueNumberForPBN(ioSubmitter, pbn);
return &ioSubmitter->bioQueueData[bioQueueIndex];
}
/**********************************************************************/
void submitBio(BIO *bio, BioQAction action)
{
KVIO *kvio = bio->bi_private;
kvio->bioToSubmit = bio;
setupKVIOWork(kvio, processBioMap, (KvdoWorkFunction) bio->bi_end_io,
action);
KernelLayer *layer = kvio->layer;
BioQueueData *bioQueueData
= bioQueueDataForPBN(layer->ioSubmitter, kvio->vio->physical);
kvioAddTraceRecord(kvio, THIS_LOCATION("$F($io)"));
bio->bi_next = NULL;
bio_list_init(&kvio->biosMerged);
bio_list_add(&kvio->biosMerged, bio);
/*
* Enabling MD RAID5 mode optimizes performance for MD RAID5 storage
* configurations. It clears the bits for sync I/O RW flags on data block
* bios and sets the bits for sync I/O RW flags on all journal-related
* bios.
*
* This increases the frequency of full-stripe writes by altering flags of
* submitted bios. For write-heavy workloads, this increases the
* likelihood that the MD RAID5 device will update a full stripe instead of
* a partial stripe, thereby avoiding making read requests to the underlying
* physical storage for purposes of parity chunk calculations.
*
* Setting the sync-flag on journal-related bios is expected to reduce
* latency on journal updates submitted to an MD RAID5 device.
*/
if (layer->deviceConfig->mdRaid5ModeEnabled) {
if (isData(kvio)) {
// Clear the bits for sync I/O RW flags on data block bios.
clearBioOperationFlagSync(bio);
} else if ((kvio->vio->type == VIO_TYPE_RECOVERY_JOURNAL)
|| (kvio->vio->type == VIO_TYPE_SLAB_JOURNAL)) {
// Set the bits for sync I/O RW flags on all journal-related and
// slab-journal-related bios.
setBioOperationFlagSync(bio);
}
}
/*
* Try to use the bio map to submit this bio earlier if we're already sending
* IO for an adjacent block. If we can't use an existing pending bio, enqueue
* an operation to run in a bio submission thread appropriate to the
* indicated physical block number.
*/
bool merged = false;
if (USE_BIOMAP && isData(kvio)) {
merged = tryBioMapMerge(bioQueueData, kvio, bio);
}
if (!merged) {
enqueueKVIOWork(bioQueueData->queue, kvio);
}
}
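/*
 * Minimal usage sketch (illustrative, not compiled): the caller must
 * have pointed bio->bi_private at the owning KVIO before calling,
 * since submitBio() recovers the KVIO from there:
 *
 *   bio->bi_private = kvio;
 *   submitBio(bio, BIO_Q_ACTION_DATA);
 *
 * The bio then belongs to the submitter until its bi_end_io callback
 * runs.
 */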
/**********************************************************************/
static int initializeBioQueue(BioQueueData *bioQueueData,
const char *threadNamePrefix,
const char *queueName,
unsigned int queueNumber,
KernelLayer *layer)
{
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,38)
bioQueueData->bdev = layer->dev->bdev;
#endif
bioQueueData->queueNumber = queueNumber;
return makeWorkQueue(threadNamePrefix, queueName, &layer->wqDirectory,
layer, bioQueueData, &bioQueueType, 1,
&bioQueueData->queue);
}
/**********************************************************************/
int makeIOSubmitter(const char *threadNamePrefix,
unsigned int threadCount,
unsigned int rotationInterval,
unsigned int maxRequestsActive,
KernelLayer *layer,
IOSubmitter **ioSubmitterPtr)
{
IOSubmitter *ioSubmitter;
int result = ALLOCATE_EXTENDED(IOSubmitter,
threadCount,
BioQueueData,
"bio submission data",
&ioSubmitter);
if (result != UDS_SUCCESS) {
return result;
}
// Set up each bio-submission work queue.
char queueName[MAX_QUEUE_NAME_LEN];
ioSubmitter->bioQueueRotationInterval = rotationInterval;
for (unsigned int i = 0; i < threadCount; i++) {
BioQueueData *bioQueueData = &ioSubmitter->bioQueueData[i];
snprintf(queueName, sizeof(queueName), "bioQ%u", i);
if (USE_BIOMAP) {
mutex_init(&bioQueueData->lock);
/*
* Each pending I/O operation appears in the map under both its first
* and its last sector number, hence twice maxRequestsActive.
*
* If requests are assigned to threads round-robin, they should
* be distributed quite evenly. But if they're assigned based on
* PBN, things can sometimes be very uneven. So for now, we'll
* assume that all requests *may* wind up on one thread, and
* thus all in the same map.
*/
result = makeIntMap(maxRequestsActive * 2, 0, &bioQueueData->map);
if (result != UDS_SUCCESS) {
// Clean up the partially initialized bio-queue entirely and
// indicate that initialization failed.
logError("bio map initialization failed %d", result);
cleanupIOSubmitter(ioSubmitter);
freeIOSubmitter(ioSubmitter);
return result;
}
}
result = initializeBioQueue(bioQueueData,
threadNamePrefix,
queueName,
i,
layer);
if (result != VDO_SUCCESS) {
// Clean up the partially initialized bio-queue entirely and
// indicate that initialization failed.
if (USE_BIOMAP) {
freeIntMap(&ioSubmitter->bioQueueData[i].map);
}
logError("bio queue initialization failed %d", result);
cleanupIOSubmitter(ioSubmitter);
freeIOSubmitter(ioSubmitter);
return result;
}
ioSubmitter->numBioQueuesUsed++;
}
*ioSubmitterPtr = ioSubmitter;
return VDO_SUCCESS;
}
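/*
 * Lifecycle sketch (illustrative; error handling elided): a layer
 * typically creates the submitter once at startup and tears it down in
 * two steps, draining before freeing:
 *
 *   IOSubmitter *submitter;
 *   int result = makeIOSubmitter("vdo0:", 4, 64, maxRequestsActive,
 *                                layer, &submitter);
 *   ...
 *   cleanupIOSubmitter(submitter);  // finish all queued work
 *   freeIOSubmitter(submitter);     // then release queues and maps
 *
 * The thread name prefix "vdo0:" and the thread and rotation counts
 * are assumptions for the example.
 */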
/**********************************************************************/
void cleanupIOSubmitter(IOSubmitter *ioSubmitter)
{
for (int i = ioSubmitter->numBioQueuesUsed - 1; i >= 0; i--) {
finishWorkQueue(ioSubmitter->bioQueueData[i].queue);
}
}
/**********************************************************************/
void freeIOSubmitter(IOSubmitter *ioSubmitter)
{
for (int i = ioSubmitter->numBioQueuesUsed - 1; i >= 0; i--) {
ioSubmitter->numBioQueuesUsed--;
freeWorkQueue(&ioSubmitter->bioQueueData[i].queue);
if (USE_BIOMAP) {
freeIntMap(&ioSubmitter->bioQueueData[i].map);
}
}
FREE(ioSubmitter);
}
/**********************************************************************/
void dumpBioWorkQueue(IOSubmitter *ioSubmitter)
{
for (unsigned int i = 0; i < ioSubmitter->numBioQueuesUsed; i++) {
dumpWorkQueue(ioSubmitter->bioQueueData[i].queue);
}
}
/**********************************************************************/
void enqueueBioWorkItem(IOSubmitter *ioSubmitter, KvdoWorkItem *workItem)
{
unsigned int bioQueueIndex = advanceBioRotor(ioSubmitter);
enqueueWorkQueue(ioSubmitter->bioQueueData[bioQueueIndex].queue,
workItem);
}