/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/ioSubmitter.c#8 $
 */

#include "ioSubmitter.h"

#include <linux/version.h>

#include "memoryAlloc.h"

#include "bio.h"
#include "dataKVIO.h"
#include "kernelLayer.h"
#include "logger.h"

enum {
  /*
   * Whether to use bio merging code.
   *
   * Merging I/O requests in the request queue below us is helpful for
   * many devices, and VDO does a good job sometimes of shuffling up
   * the I/O order (too much for some simple I/O schedulers to sort
   * out) as we deal with dedupe advice etc. The bio map tracks the
   * yet-to-be-submitted I/O requests by block number so that we can
   * collect together and submit sequential I/O operations that should
   * be easy to merge. (So we don't actually *merge* them here, we
   * just arrange them so that merging can happen.)
   *
   * For some devices, merging may not help, and we may want to turn
   * off this code and save compute/spinlock cycles.
   */
  USE_BIOMAP = 1,
};

/*
 * Submission of bio operations to the underlying storage device will
 * go through a separate work queue thread (or more than one) to
 * prevent blocking in other threads if the storage device has a full
 * queue. The plug structure allows that thread to do better batching
 * of requests to make the I/O more efficient.
 *
 * When multiple worker threads are used, a thread is chosen for an
 * I/O operation submission based on the PBN, so a given PBN will
 * consistently wind up on the same thread. Flush operations are
 * assigned round-robin.
 *
 * The map (protected by the mutex) collects pending I/O operations so
 * that the worker thread can reorder them to try to encourage I/O
 * request merging in the request queue underneath.
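 *
 * As a worked illustration of the PBN-based assignment (the numbers here
 * are assumptions for the example, not values fixed by this file): with
 * numBioQueuesUsed = 4 and bioQueueRotationInterval = 64, PBNs 0..63 land
 * on thread 0, 64..127 on thread 1, 128..191 on thread 2, 192..255 on
 * thread 3, and PBN 256 wraps back to thread 0, since the index is
 * (pbn % (4 * 64)) / 64.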
 */

typedef struct bioQueueData {
  KvdoWorkQueue  *queue;
  struct blk_plug plug;
  IntMap         *map;
  struct mutex    lock;
  unsigned int    queueNumber;
} BioQueueData;

struct ioSubmitter {
  unsigned int numBioQueuesUsed;
  unsigned int bioQueueRotationInterval;
  unsigned int bioQueueRotor;
  BioQueueData bioQueueData[];
};

/**********************************************************************/
static void startBioQueue(void *ptr)
{
  BioQueueData *bioQueueData = (BioQueueData *) ptr;
  blk_start_plug(&bioQueueData->plug);
}

/**********************************************************************/
static void finishBioQueue(void *ptr)
{
  BioQueueData *bioQueueData = (BioQueueData *) ptr;
  blk_finish_plug(&bioQueueData->plug);
}

static const KvdoWorkQueueType bioQueueType = {
  .start       = startBioQueue,
  .finish      = finishBioQueue,
  .actionTable = {
    { .name = "bio_compressed_data",
      .code = BIO_Q_ACTION_COMPRESSED_DATA,
      .priority = 0 },
    { .name = "bio_data",      .code = BIO_Q_ACTION_DATA,      .priority = 0 },
    { .name = "bio_flush",     .code = BIO_Q_ACTION_FLUSH,     .priority = 2 },
    { .name = "bio_high",      .code = BIO_Q_ACTION_HIGH,      .priority = 2 },
    { .name = "bio_metadata",  .code = BIO_Q_ACTION_METADATA,  .priority = 1 },
    { .name = "bio_readcache", .code = BIO_Q_ACTION_READCACHE, .priority = 0 },
    { .name = "bio_verify",    .code = BIO_Q_ACTION_VERIFY,    .priority = 1 },
  },
};

/**
 * Check that we're running normally (i.e., not in an
 * interrupt-servicing context) in an IOSubmitter bio thread.
 **/
static void assertRunningInBioQueue(void)
{
  ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
  ASSERT_LOG_ONLY(strnstr(current->comm, "bioQ", TASK_COMM_LEN) != NULL,
                  "running in bio submission work queue thread");
}

/**
 * Returns the BioQueueData pointer associated with the current thread.
 * Results are undefined if called from any other thread.
 *
 * @return the BioQueueData pointer
 **/
static inline BioQueueData *getCurrentBioQueueData(void)
{
  BioQueueData *bioQueueData = (BioQueueData *) getWorkQueuePrivateData();
  // Does it look like a bio queue thread?
  BUG_ON(bioQueueData == NULL);
  BUG_ON(bioQueueData->queue != getCurrentWorkQueue());
  return bioQueueData;
}

/**********************************************************************/
static inline IOSubmitter *bioQueueToSubmitter(BioQueueData *bioQueue)
{
  BioQueueData *firstBioQueue = bioQueue - bioQueue->queueNumber;
  IOSubmitter  *submitter     = container_of(firstBioQueue, IOSubmitter,
                                             bioQueueData[0]);
  return submitter;
}

/**
 * Return the bio thread number handling the specified physical block
 * number.
 *
 * @param ioSubmitter  The I/O submitter data
 * @param pbn          The physical block number
 *
 * @return the bio queue number for the PBN
 **/
static unsigned int bioQueueNumberForPBN(IOSubmitter         *ioSubmitter,
                                         PhysicalBlockNumber  pbn)
{
  unsigned int bioQueueIndex
    = ((pbn
        % (ioSubmitter->numBioQueuesUsed
           * ioSubmitter->bioQueueRotationInterval))
       / ioSubmitter->bioQueueRotationInterval);
  return bioQueueIndex;
}

/**
 * Check that we're running normally (i.e., not in an
 * interrupt-servicing context) in an IOSubmitter bio thread. Also
 * require that the thread we're running on is the correct one for the
 * supplied physical block number.
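 *
 * For example (values assumed purely for illustration): with four bio
 * queues and a rotation interval of 64, PBN 130 computes queue index
 * (130 % 256) / 64 = 2, so this check requires that the caller's work
 * item was enqueued on the bioQ2 thread.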
 *
 * @param pbn  The PBN that should have been used in thread selection
 **/
static void assertRunningInBioQueueForPBN(PhysicalBlockNumber pbn)
{
  assertRunningInBioQueue();

  BioQueueData *thisQueue = getCurrentBioQueueData();
  IOSubmitter  *submitter = bioQueueToSubmitter(thisQueue);
  unsigned int  computedQueueNumber = bioQueueNumberForPBN(submitter, pbn);
  ASSERT_LOG_ONLY(thisQueue->queueNumber == computedQueueNumber,
                  "running in correct bio queue (%u vs %u) for PBN %llu",
                  thisQueue->queueNumber, computedQueueNumber, pbn);
}

/**
 * Increments appropriate counters for bio completions
 *
 * @param kvio  the kvio associated with the bio
 * @param bio   the bio to count
 */
static void countAllBiosCompleted(KVIO *kvio, BIO *bio)
{
  KernelLayer *layer = kvio->layer;
  if (isData(kvio)) {
    countBios(&layer->biosOutCompleted, bio);
    return;
  }

  countBios(&layer->biosMetaCompleted, bio);
  if (kvio->vio->type == VIO_TYPE_RECOVERY_JOURNAL) {
    countBios(&layer->biosJournalCompleted, bio);
  } else if (kvio->vio->type == VIO_TYPE_BLOCK_MAP) {
    countBios(&layer->biosPageCacheCompleted, bio);
  }
}

/**********************************************************************/
void countCompletedBios(BIO *bio)
{
  KVIO        *kvio  = (KVIO *) bio->bi_private;
  KernelLayer *layer = kvio->layer;
  atomic64_inc(&layer->biosCompleted);
  countAllBiosCompleted(kvio, bio);
}

/**********************************************************************/
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
void completeAsyncBio(BIO *bio)
#else
void completeAsyncBio(BIO *bio, int error)
#endif
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,4,0)
  int error = getBioResult(bio);
#endif
  KVIO *kvio = (KVIO *) bio->bi_private;
  kvioAddTraceRecord(kvio, THIS_LOCATION("$F($io);cb=io($io)"));
  countCompletedBios(bio);
  if ((error == 0) && isData(kvio) && isReadVIO(kvio->vio)) {
    DataKVIO *dataKVIO = kvioAsDataKVIO(kvio);
    if (!isCompressed(dataKVIO->dataVIO.mapped.state)
        && !dataKVIO->isPartial) {
      kvdoAcknowledgeDataVIO(&dataKVIO->dataVIO);
      return;
    }
  }
  kvdoContinueKvio(kvio, error);
}

/**
 * Determines which bio counter to use
 *
 * @param kvio  the kvio associated with the bio
 * @param bio   the bio to count
 */
static void countAllBios(KVIO *kvio, BIO *bio)
{
  KernelLayer *layer = kvio->layer;
  if (isData(kvio)) {
    countBios(&layer->biosOut, bio);
    return;
  }

  countBios(&layer->biosMeta, bio);
  if (kvio->vio->type == VIO_TYPE_RECOVERY_JOURNAL) {
    countBios(&layer->biosJournal, bio);
  } else if (kvio->vio->type == VIO_TYPE_BLOCK_MAP) {
    countBios(&layer->biosPageCache, bio);
  }
}

/**
 * Update stats and tracing info, then submit the supplied bio to the
 * OS for processing.
 *
 * @param kvio      The KVIO associated with the bio
 * @param bio       The bio to submit to the OS
 * @param location  Call site location for tracing
 **/
static void sendBioToDevice(KVIO *kvio, BIO *bio, TraceLocation location)
{
  assertRunningInBioQueueForPBN(kvio->vio->physical);

  atomic64_inc(&kvio->layer->biosSubmitted);
  countAllBios(kvio, bio);
  kvioAddTraceRecord(kvio, location);
  bio->bi_next = NULL;
  generic_make_request(bio);
}

/**
 * Submits a bio to the underlying block device. May block if the
 * device is busy.
 *
 * For metadata or if USE_BIOMAP is disabled, kvio->bioToSubmit holds
 * the BIO pointer to submit to the target device. For normal
 * data when USE_BIOMAP is enabled, kvio->biosMerged is the list of
 * all bios collected together in this group; all of them get
 * submitted. In both cases, the bi_end_io callback is invoked when
 * each I/O operation completes.
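 *
 * As an illustrative (assumed) scenario: if 4KB data writes to PBNs 100,
 * 101, and 102 arrive while this queue is backed up, the bio map links
 * their bios into a single biosMerged list headed at PBN 100, and this
 * function then submits all three back to back so the request queue below
 * has the chance to merge them into one larger request.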
 *
 * @param item  The work item in the KVIO "owning" either the bio to
 *              submit, or the head of the bio_list to be submitted.
 **/
static void processBioMap(KvdoWorkItem *item)
{
  assertRunningInBioQueue();

  KVIO *kvio = workItemAsKVIO(item);
  /*
   * XXX Make these paths more regular: Should bi_bdev be set here, or
   * in the caller, or in the callback function? Should we call
   * finishBioQueue for the biomap case on old kernels?
   */
  if (USE_BIOMAP && isData(kvio)) {
    // We need to make sure to do two things here:
    // 1. Use each bio's kvio when submitting. Any other kvio is not safe
    // 2. Detach the bio list from the kvio before submitting, because it
    //    could get reused/free'd up before all bios are submitted.
    BioQueueData *bioQueueData = getWorkQueuePrivateData();
    BIO          *bio          = NULL;
    mutex_lock(&bioQueueData->lock);
    if (!bio_list_empty(&kvio->biosMerged)) {
      intMapRemove(bioQueueData->map, getBioSector(kvio->biosMerged.head));
      intMapRemove(bioQueueData->map, getBioSector(kvio->biosMerged.tail));
    }
    bio = kvio->biosMerged.head;
    bio_list_init(&kvio->biosMerged);
    mutex_unlock(&bioQueueData->lock);
    // Somewhere in the list we'll be submitting the current "kvio",
    // so drop our handle on it now.
    kvio = NULL;

    while (bio != NULL) {
      KVIO *kvioBio = bio->bi_private;
      BIO  *next    = bio->bi_next;
      bio->bi_next  = NULL;
      setBioBlockDevice(bio, getKernelLayerBdev(kvioBio->layer));
      sendBioToDevice(kvioBio, bio, THIS_LOCATION("$F($io)"));
      bio = next;
    }
  } else {
    sendBioToDevice(kvio, kvio->bioToSubmit, THIS_LOCATION("$F($io)"));
  }
}

/**
 * This function will attempt to find an already queued bio that the current
 * bio can be merged with. There are two types of merging possible, forward
 * and backward, which are distinguished by a flag that uses kernel
 * elevator terminology.
 *
 * @param map        The bio map to use for merging
 * @param kvio       The kvio we want to merge
 * @param mergeType  The type of merging we want to try
 *
 * @return the kvio to merge to, NULL if no merging is possible
 */
static KVIO *getMergeableLocked(IntMap       *map,
                                KVIO         *kvio,
                                unsigned int  mergeType)
{
  BIO     *bio         = kvio->bioToSubmit;
  sector_t mergeSector = getBioSector(bio);
  switch (mergeType) {
  case ELEVATOR_BACK_MERGE:
    mergeSector -= VDO_SECTORS_PER_BLOCK;
    break;
  case ELEVATOR_FRONT_MERGE:
    mergeSector += VDO_SECTORS_PER_BLOCK;
    break;
  }

  KVIO *kvioMerge = intMapGet(map, mergeSector);

  if (kvioMerge != NULL) {
    if (!areWorkItemActionsEqual(&kvio->enqueueable.workItem,
                                 &kvioMerge->enqueueable.workItem)) {
      return NULL;
    } else if (bio_data_dir(bio) != bio_data_dir(kvioMerge->bioToSubmit)) {
      return NULL;
    } else if (bio_list_empty(&kvioMerge->biosMerged)) {
      return NULL;
    } else {
      switch (mergeType) {
      case ELEVATOR_BACK_MERGE:
        if (getBioSector(kvioMerge->biosMerged.tail) != mergeSector) {
          return NULL;
        }
        break;
      case ELEVATOR_FRONT_MERGE:
        if (getBioSector(kvioMerge->biosMerged.head) != mergeSector) {
          return NULL;
        }
        break;
      }
    }
  }

  return kvioMerge;
}

/**********************************************************************/
static inline unsigned int advanceBioRotor(IOSubmitter *bioData)
{
  unsigned int index = bioData->bioQueueRotor++
                       % (bioData->numBioQueuesUsed
                          * bioData->bioQueueRotationInterval);
  index /= bioData->bioQueueRotationInterval;
  return index;
}

/**********************************************************************/
static bool tryBioMapMerge(BioQueueData *bioQueueData, KVIO *kvio, BIO *bio)
{
  bool merged = false;

  mutex_lock(&bioQueueData->lock);
  KVIO *prevKvio = getMergeableLocked(bioQueueData->map, kvio,
                                      ELEVATOR_BACK_MERGE);
  KVIO *nextKvio =
    getMergeableLocked(bioQueueData->map, kvio, ELEVATOR_FRONT_MERGE);
  if (prevKvio == nextKvio) {
    nextKvio = NULL;
  }

  int result;
  if ((prevKvio == NULL) && (nextKvio == NULL)) {
    // no merge. just add to bioQueue
    result = intMapPut(bioQueueData->map, getBioSector(bio), kvio, true, NULL);
    // We don't care about failure of intMapPut in this case.
    result = result;
    mutex_unlock(&bioQueueData->lock);
  } else {
    if (nextKvio == NULL) {
      // Only prev. merge to prev's tail
      intMapRemove(bioQueueData->map,
                   getBioSector(prevKvio->biosMerged.tail));
      bio_list_merge(&prevKvio->biosMerged, &kvio->biosMerged);
      result = intMapPut(bioQueueData->map,
                         getBioSector(prevKvio->biosMerged.head),
                         prevKvio, true, NULL);
      result = intMapPut(bioQueueData->map,
                         getBioSector(prevKvio->biosMerged.tail),
                         prevKvio, true, NULL);
    } else {
      // Only next. merge to next's head
      //
      // Handle "next merge" and "gap fill" cases the same way so as to
      // reorder bios in a way that's compatible with using funnel queues
      // in work queues. This avoids removing an existing work item.
      intMapRemove(bioQueueData->map,
                   getBioSector(nextKvio->biosMerged.head));
      bio_list_merge_head(&nextKvio->biosMerged, &kvio->biosMerged);
      result = intMapPut(bioQueueData->map,
                         getBioSector(nextKvio->biosMerged.head),
                         nextKvio, true, NULL);
      result = intMapPut(bioQueueData->map,
                         getBioSector(nextKvio->biosMerged.tail),
                         nextKvio, true, NULL);
    }

    // We don't care about failure of intMapPut in this case.
    result = result;
    mutex_unlock(&bioQueueData->lock);
    merged = true;
  }

  return merged;
}

/**********************************************************************/
static BioQueueData *bioQueueDataForPBN(IOSubmitter         *ioSubmitter,
                                        PhysicalBlockNumber  pbn)
{
  unsigned int bioQueueIndex = bioQueueNumberForPBN(ioSubmitter, pbn);
  return &ioSubmitter->bioQueueData[bioQueueIndex];
}

/**********************************************************************/
void submitBio(BIO *bio, BioQAction action)
{
  KVIO *kvio        = bio->bi_private;
  kvio->bioToSubmit = bio;
  setupKVIOWork(kvio, processBioMap, (KvdoWorkFunction) bio->bi_end_io,
                action);

  KernelLayer  *layer        = kvio->layer;
  BioQueueData *bioQueueData = bioQueueDataForPBN(layer->ioSubmitter,
                                                  kvio->vio->physical);

  kvioAddTraceRecord(kvio, THIS_LOCATION("$F($io)"));

  bio->bi_next = NULL;
  bio_list_init(&kvio->biosMerged);
  bio_list_add(&kvio->biosMerged, bio);

  /*
   * Enabling of MD RAID5 mode optimizes performance for MD RAID5 storage
   * configurations. It clears the bits for sync I/O RW flags on data block
   * bios and sets the bits for sync I/O RW flags on all journal-related
   * bios.
   *
   * This increases the frequency of full-stripe writes by altering flags of
   * submitted bios. For workloads with write requests this increases the
   * likelihood that the MD RAID5 device will update a full stripe instead of
   * a partial stripe, thereby avoiding making read requests to the underlying
   * physical storage for purposes of parity chunk calculations.
   *
   * Setting the sync-flag on journal-related bios is expected to reduce
   * latency on journal updates submitted to an MD RAID5 device.
   */
  if (layer->deviceConfig->mdRaid5ModeEnabled) {
    if (isData(kvio)) {
      // Clear the bits for sync I/O RW flags on data block bios.
      clearBioOperationFlagSync(bio);
    } else if ((kvio->vio->type == VIO_TYPE_RECOVERY_JOURNAL)
               || (kvio->vio->type == VIO_TYPE_SLAB_JOURNAL)) {
      // Set the bits for sync I/O RW flags on all journal-related and
      // slab-journal-related bios.
      setBioOperationFlagSync(bio);
    }
  }

  /*
   * Try to use the bio map to submit this bio earlier if we're already
   * sending IO for an adjacent block. If we can't use an existing pending
   * bio, enqueue an operation to run in a bio submission thread appropriate
   * to the indicated physical block number.
   */
  bool merged = false;
  if (USE_BIOMAP && isData(kvio)) {
    merged = tryBioMapMerge(bioQueueData, kvio, bio);
  }
  if (!merged) {
    enqueueKVIOWork(bioQueueData->queue, kvio);
  }
}

/**********************************************************************/
static int initializeBioQueue(BioQueueData *bioQueueData,
                              const char   *threadNamePrefix,
                              const char   *queueName,
                              unsigned int  queueNumber,
                              KernelLayer  *layer)
{
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,38)
  bioQueueData->bdev        = layer->dev->bdev;
#endif
  bioQueueData->queueNumber = queueNumber;

  return makeWorkQueue(threadNamePrefix, queueName, &layer->wqDirectory,
                       layer, bioQueueData, &bioQueueType, 1,
                       &bioQueueData->queue);
}

/**********************************************************************/
int makeIOSubmitter(const char    *threadNamePrefix,
                    unsigned int   threadCount,
                    unsigned int   rotationInterval,
                    unsigned int   maxRequestsActive,
                    KernelLayer   *layer,
                    IOSubmitter  **ioSubmitterPtr)
{
  IOSubmitter *ioSubmitter;
  int result = ALLOCATE_EXTENDED(IOSubmitter, threadCount, BioQueueData,
                                 "bio submission data", &ioSubmitter);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Setup for each bio-submission work queue
  char queueName[MAX_QUEUE_NAME_LEN];
  ioSubmitter->bioQueueRotationInterval = rotationInterval;
  for (unsigned int i = 0; i < threadCount; i++) {
    BioQueueData *bioQueueData = &ioSubmitter->bioQueueData[i];
    snprintf(queueName, sizeof(queueName), "bioQ%u", i);

    if (USE_BIOMAP) {
      mutex_init(&bioQueueData->lock);
      /*
       * One I/O operation per request, but both first & last sector numbers.
       *
       * If requests are assigned to threads round-robin, they should
       * be distributed quite evenly. But if they're assigned based on
       * PBN, things can sometimes be very uneven. So for now, we'll
       * assume that all requests *may* wind up on one thread, and
       * thus all in the same map.
       */
      result = makeIntMap(maxRequestsActive * 2, 0, &bioQueueData->map);
      if (result != 0) {
        // Clean up the partially initialized bio-queue entirely and
        // indicate that initialization failed.
        logError("bio map initialization failed %d", result);
        cleanupIOSubmitter(ioSubmitter);
        freeIOSubmitter(ioSubmitter);
        return result;
      }
    }

    result = initializeBioQueue(bioQueueData, threadNamePrefix, queueName,
                                i, layer);
    if (result != VDO_SUCCESS) {
      // Clean up the partially initialized bio-queue entirely and
      // indicate that initialization failed.
      if (USE_BIOMAP) {
        freeIntMap(&ioSubmitter->bioQueueData[i].map);
      }
      logError("bio queue initialization failed %d", result);
      cleanupIOSubmitter(ioSubmitter);
      freeIOSubmitter(ioSubmitter);
      return result;
    }

    ioSubmitter->numBioQueuesUsed++;
  }

  *ioSubmitterPtr = ioSubmitter;

  return VDO_SUCCESS;
}

/**********************************************************************/
void cleanupIOSubmitter(IOSubmitter *ioSubmitter)
{
  for (int i = ioSubmitter->numBioQueuesUsed - 1; i >= 0; i--) {
    finishWorkQueue(ioSubmitter->bioQueueData[i].queue);
  }
}

/**********************************************************************/
void freeIOSubmitter(IOSubmitter *ioSubmitter)
{
  for (int i = ioSubmitter->numBioQueuesUsed - 1; i >= 0; i--) {
    ioSubmitter->numBioQueuesUsed--;
    freeWorkQueue(&ioSubmitter->bioQueueData[i].queue);
    if (USE_BIOMAP) {
      freeIntMap(&ioSubmitter->bioQueueData[i].map);
    }
  }
  FREE(ioSubmitter);
}

/**********************************************************************/
void dumpBioWorkQueue(IOSubmitter *ioSubmitter)
{
  for (int i = 0; i < ioSubmitter->numBioQueuesUsed; i++) {
    dumpWorkQueue(ioSubmitter->bioQueueData[i].queue);
  }
}

/**********************************************************************/
void enqueueBioWorkItem(IOSubmitter *ioSubmitter, KvdoWorkItem *workItem)
{
  unsigned int bioQueueIndex = advanceBioRotor(ioSubmitter);
  enqueueWorkQueue(ioSubmitter->bioQueueData[bioQueueIndex].queue,
                   workItem);
}
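
/*
 * Illustrative usage sketch only; the thread count, rotation interval,
 * request limit, and thread-name prefix below are assumed example values,
 * not taken from this file. A kernel layer would typically create one
 * IOSubmitter at startup, route each prepared KVIO's bio through
 * submitBio(), and tear the submitter down in two steps:
 *
 *   IOSubmitter *ioSubmitter;
 *   int result = makeIOSubmitter("vdo0:", 4, 64, 2048, layer, &ioSubmitter);
 *   if (result != VDO_SUCCESS) {
 *     return result;   // makeIOSubmitter cleans up after itself on failure
 *   }
 *
 *   // Per request: bio->bi_private must already point at the owning KVIO.
 *   submitBio(bio, BIO_Q_ACTION_DATA);
 *
 *   // Shutdown: drain and stop the bioQ threads, then release the queues,
 *   // the bio maps, and the submitter itself.
 *   cleanupIOSubmitter(ioSubmitter);
 *   freeIOSubmitter(ioSubmitter);
 */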