/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/uds-releases/jasper/src/uds/volume.c#23 $
 */

#include "volume.h"

#include "cacheCounters.h"
#include "chapterIndex.h"
#include "compiler.h"
#include "errors.h"
#include "geometry.h"
#include "hashUtils.h"
#include "indexConfig.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "permassert.h"
#include "recordPage.h"
#include "request.h"
#include "sparseCache.h"
#include "stringUtils.h"
#include "threads.h"

enum {
  MAX_BAD_CHAPTERS            = 100, // max number of contiguous bad chapters
  DEFAULT_VOLUME_READ_THREADS = 2,   // Default number of reader threads
  MAX_VOLUME_READ_THREADS     = 16,  // Maximum number of reader threads
};

/**********************************************************************/
static unsigned int getReadThreads(const struct uds_parameters *userParams)
{
  unsigned int readThreads = (userParams == NULL
                              ? DEFAULT_VOLUME_READ_THREADS
                              : userParams->read_threads);
  if (readThreads < 1) {
    readThreads = 1;
  }
  if (readThreads > MAX_VOLUME_READ_THREADS) {
    readThreads = MAX_VOLUME_READ_THREADS;
  }
  return readThreads;
}

/**********************************************************************/
static INLINE unsigned int mapToPageNumber(Geometry     *geometry,
                                           unsigned int  physicalPage)
{
  return ((physicalPage - 1) % geometry->pagesPerChapter);
}

/**********************************************************************/
static INLINE unsigned int mapToChapterNumber(Geometry     *geometry,
                                              unsigned int  physicalPage)
{
  return ((physicalPage - 1) / geometry->pagesPerChapter);
}

/**********************************************************************/
static INLINE bool isRecordPage(Geometry *geometry, unsigned int physicalPage)
{
  return (((physicalPage - 1) % geometry->pagesPerChapter)
          >= geometry->indexPagesPerChapter);
}

/**********************************************************************/
static INLINE unsigned int getZoneNumber(Request *request)
{
  return (request == NULL) ? 0 : request->zoneNumber;
}

/**********************************************************************/
int mapToPhysicalPage(const Geometry *geometry, int chapter, int page)
{
  // Page zero is the header page, so the first index page in the
  // first chapter is physical page one.
  return (1 + (geometry->pagesPerChapter * chapter) + page);
}
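/*
 * A worked example of the mapping arithmetic above, using illustrative
 * (not authoritative) geometry values of 256 pagesPerChapter and 26
 * indexPagesPerChapter: physical page 0 is the volume header, so chapter 0
 * occupies physical pages 1..256. Physical page 30 then maps to chapter
 * (30 - 1) / 256 == 0 and page (30 - 1) % 256 == 29 within that chapter,
 * and is a record page because 29 >= 26. Going the other way,
 * mapToPhysicalPage(geometry, 0, 29) == 1 + (256 * 0) + 29 == 30.
 */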
/**********************************************************************/
static void waitForReadQueueNotFull(Volume *volume, Request *request)
{
  unsigned int zoneNumber = getZoneNumber(request);
  InvalidateCounter invalidateCounter
    = getInvalidateCounter(volume->pageCache, zoneNumber);
  if (searchPending(invalidateCounter)) {
    // Increment the invalidate counter to avoid deadlock where the reader
    // threads cannot make progress because they are waiting on the counter
    // and the index thread cannot because the read queue is full.
    endPendingSearch(volume->pageCache, zoneNumber);
  }

  while (readQueueIsFull(volume->pageCache)) {
    logDebug("Waiting until read queue not full");
    signalCond(&volume->readThreadsCond);
    waitCond(&volume->readThreadsReadDoneCond, &volume->readThreadsMutex);
  }

  if (searchPending(invalidateCounter)) {
    // Increment again so we get back to an odd value.
    beginPendingSearch(volume->pageCache, pageBeingSearched(invalidateCounter),
                       zoneNumber);
  }
}

/**********************************************************************/
int enqueuePageRead(Volume *volume, Request *request, int physicalPage)
{
  // Don't allow new requests if we are shutting down, but make sure
  // to process any requests that are still in the pipeline.
  if ((volume->readerState & READER_STATE_EXIT) != 0) {
    logInfo("failed to queue read while shutting down");
    return UDS_SHUTTINGDOWN;
  }

  // Mark the page as queued in the volume cache, for chapter invalidation to
  // be able to cancel a read. If we are unable to do this because the queues
  // are full, flush them first.
  int result;
  while ((result = enqueueRead(volume->pageCache, request, physicalPage))
         == UDS_SUCCESS) {
    logDebug("Read queues full, waiting for reads to finish");
    waitForReadQueueNotFull(volume, request);
  }

  if (result == UDS_QUEUED) {
    /* signal a read thread */
    signalCond(&volume->readThreadsCond);
  }

  return result;
}
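/*
 * A note on the invalidate counter protocol used above (a summary of the
 * calls, not new machinery): each zone's counter is odd while that zone has
 * a search pending and even otherwise. waitForReadQueueNotFull() bumps an
 * odd counter to even before sleeping, so reader threads waiting on the
 * counter are not deadlocked against an index thread waiting on the queue,
 * and then bumps it back to odd, restoring the page being searched, once
 * the queue has drained.
 */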
/**********************************************************************/
static INLINE void waitToReserveReadQueueEntry(Volume        *volume,
                                               unsigned int  *queuePos,
                                               Request      **requestList,
                                               unsigned int  *physicalPage,
                                               bool          *invalid)
{
  while (((volume->readerState & READER_STATE_EXIT) == 0)
         && (((volume->readerState & READER_STATE_STOP) != 0)
             || !reserveReadQueueEntry(volume->pageCache, queuePos,
                                       requestList, physicalPage, invalid))) {
    waitCond(&volume->readThreadsCond, &volume->readThreadsMutex);
  }
}

/**********************************************************************/
static int initChapterIndexPage(const Volume   *volume,
                                byte           *indexPage,
                                unsigned int    chapter,
                                unsigned int    indexPageNumber,
                                DeltaIndexPage *chapterIndexPage)
{
  Geometry *geometry = volume->geometry;

  int result = initializeChapterIndexPage(chapterIndexPage, geometry,
                                          indexPage, volume->nonce);
  if (volume->lookupMode == LOOKUP_FOR_REBUILD) {
    return result;
  }
  if (result != UDS_SUCCESS) {
    return logErrorWithStringError(result,
                                   "Reading chapter index page for chapter %u"
                                   " page %u",
                                   chapter, indexPageNumber);
  }

  IndexPageBounds bounds;
  result = getListNumberBounds(volume->indexPageMap, chapter,
                               indexPageNumber, &bounds);
  if (result != UDS_SUCCESS) {
    return result;
  }

  uint64_t     ciVirtual = chapterIndexPage->virtualChapterNumber;
  unsigned int ciChapter = mapToPhysicalChapter(geometry, ciVirtual);
  if ((chapter == ciChapter)
      && (bounds.lowestList == chapterIndexPage->lowestListNumber)
      && (bounds.highestList == chapterIndexPage->highestListNumber)) {
    return UDS_SUCCESS;
  }

  logWarning("Index page map updated to %llu",
             getLastUpdate(volume->indexPageMap));
  logWarning("Page map expects that chapter %u page %u has range %u to %u, "
             "but chapter index page has chapter %" PRIu64
             " with range %u to %u",
             chapter, indexPageNumber, bounds.lowestList, bounds.highestList,
             ciVirtual, chapterIndexPage->lowestListNumber,
             chapterIndexPage->highestListNumber);
  return ASSERT_WITH_ERROR_CODE(false,
                                UDS_CORRUPT_DATA,
                                "index page map mismatch with chapter index");
}

/**********************************************************************/
static int initializeIndexPage(const Volume *volume,
                               unsigned int  physicalPage,
                               CachedPage   *page)
{
  unsigned int chapter = mapToChapterNumber(volume->geometry, physicalPage);
  unsigned int indexPageNumber = mapToPageNumber(volume->geometry,
                                                 physicalPage);
  int result = initChapterIndexPage(volume, getPageData(&page->cp_pageData),
                                    chapter, indexPageNumber,
                                    &page->cp_indexPage);
  return result;
}
/**********************************************************************/
static void readThreadFunction(void *arg)
{
  Volume       *volume = arg;
  unsigned int  queuePos;
  Request      *requestList;
  unsigned int  physicalPage;
  bool          invalid = false;

  logDebug("reader starting");
  lockMutex(&volume->readThreadsMutex);
  while (true) {
    waitToReserveReadQueueEntry(volume, &queuePos, &requestList,
                                &physicalPage, &invalid);
    if ((volume->readerState & READER_STATE_EXIT) != 0) {
      break;
    }

    volume->busyReaderThreads++;

    bool recordPage = isRecordPage(volume->geometry, physicalPage);

    CachedPage *page = NULL;
    int result = UDS_SUCCESS;
    if (!invalid) {
      // Find a place to put the read queue page we reserved above.
      result = selectVictimInCache(volume->pageCache, &page);
      if (result == UDS_SUCCESS) {
        unlockMutex(&volume->readThreadsMutex);
        result = readVolumePage(&volume->volumeStore, physicalPage,
                                &page->cp_pageData);
        if (result != UDS_SUCCESS) {
          logWarning("Error reading page %u from volume", physicalPage);
          cancelPageInCache(volume->pageCache, physicalPage, page);
        }
        lockMutex(&volume->readThreadsMutex);
      } else {
        logWarning("Error selecting cache victim for page read");
      }

      if (result == UDS_SUCCESS) {
        if (!volume->pageCache->readQueue[queuePos].invalid) {
          if (!recordPage) {
            result = initializeIndexPage(volume, physicalPage, page);
            if (result != UDS_SUCCESS) {
              logWarning("Error initializing chapter index page");
              cancelPageInCache(volume->pageCache, physicalPage, page);
            }
          }

          if (result == UDS_SUCCESS) {
            result = putPageInCache(volume->pageCache, physicalPage, page);
            if (result != UDS_SUCCESS) {
              logWarning("Error putting page %u in cache", physicalPage);
              cancelPageInCache(volume->pageCache, physicalPage, page);
            }
          }
        } else {
          logWarning("Page %u invalidated after read", physicalPage);
          cancelPageInCache(volume->pageCache, physicalPage, page);
          invalid = true;
        }
      }
    } else {
      logDebug("Requeuing requests for invalid page");
    }

    if (invalid) {
      result = UDS_SUCCESS;
      page = NULL;
    }

    while (requestList != NULL) {
      Request *request = requestList;
      requestList = request->nextRequest;

      /*
       * If we've read in a record page, we're going to do an immediate
       * search, in an attempt to speed up processing when we requeue the
       * request, so that it doesn't have to go back into the
       * getRecordFromZone code again. However, if we've just read in an
       * index page, we don't want to search. We want the request to be
       * processed again and getRecordFromZone to be run. We have added new
       * fields in request to allow the index code to know whether it can
       * stop processing before getRecordFromZone is called again.
       */
      if ((result == UDS_SUCCESS) && (page != NULL) && recordPage) {
        if (searchRecordPage(getPageData(&page->cp_pageData),
                             &request->chunkName, volume->geometry,
                             &request->oldMetadata)) {
          request->slLocation = LOC_IN_DENSE;
        } else {
          request->slLocation = LOC_UNAVAILABLE;
        }
        request->slLocationKnown = true;
      }

      // reflect any read failures in the request status
      request->status = result;
      restartRequest(request);
    }

    releaseReadQueueEntry(volume->pageCache, queuePos);

    volume->busyReaderThreads--;
    broadcastCond(&volume->readThreadsReadDoneCond);
  }
  unlockMutex(&volume->readThreadsMutex);
  logDebug("reader done");
}

/**********************************************************************/
static int readPageLocked(Volume       *volume,
                          Request      *request,
                          unsigned int  physicalPage,
                          bool          syncRead,
                          CachedPage  **pagePtr)
{
  syncRead |= ((volume->lookupMode == LOOKUP_FOR_REBUILD)
               || (request == NULL)
               || (request->session == NULL));

  int result = UDS_SUCCESS;

  CachedPage *page = NULL;
  if (syncRead) {
    // Find a place to put the page.
    result = selectVictimInCache(volume->pageCache, &page);
    if (result != UDS_SUCCESS) {
      logWarning("Error selecting cache victim for page read");
      return result;
    }
    result = readVolumePage(&volume->volumeStore, physicalPage,
                            &page->cp_pageData);
    if (result != UDS_SUCCESS) {
      logWarning("Error reading page %u from volume", physicalPage);
      cancelPageInCache(volume->pageCache, physicalPage, page);
      return result;
    }
    if (!isRecordPage(volume->geometry, physicalPage)) {
      result = initializeIndexPage(volume, physicalPage, page);
      if (result != UDS_SUCCESS) {
        if (volume->lookupMode != LOOKUP_FOR_REBUILD) {
          logWarning("Corrupt index page %u", physicalPage);
        }
        cancelPageInCache(volume->pageCache, physicalPage, page);
        return result;
      }
    }
    result = putPageInCache(volume->pageCache, physicalPage, page);
    if (result != UDS_SUCCESS) {
      logWarning("Error putting page %u in cache", physicalPage);
      cancelPageInCache(volume->pageCache, physicalPage, page);
      return result;
    }
  } else {
    result = enqueuePageRead(volume, request, physicalPage);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  *pagePtr = page;
  return UDS_SUCCESS;
}

/**********************************************************************/
int getPageLocked(Volume          *volume,
                  Request         *request,
                  unsigned int     physicalPage,
                  CacheProbeType   probeType,
                  CachedPage     **pagePtr)
{
  CachedPage *page = NULL;
  int result = getPageFromCache(volume->pageCache, physicalPage, probeType,
                                &page);
  if (result != UDS_SUCCESS) {
    return result;
  }
  if (page == NULL) {
    result = readPageLocked(volume, request, physicalPage, true, &page);
    if (result != UDS_SUCCESS) {
      return result;
    }
  } else if (getZoneNumber(request) == 0) {
    // Only 1 zone is responsible for updating LRU
    makePageMostRecent(volume->pageCache, page);
  }

  *pagePtr = page;
  return UDS_SUCCESS;
}
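/*
 * A note on the two lookup entry points: getPageLocked() above expects the
 * caller to already hold readThreadsMutex (getPage() below takes it on the
 * caller's behalf), while getPageProtected() below is the fast path that
 * expects the caller to have a search pending on its zone's invalidate
 * counter and only takes the mutex itself on a cache miss.
 */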
/**********************************************************************/
int getPageProtected(Volume          *volume,
                     Request         *request,
                     unsigned int     physicalPage,
                     CacheProbeType   probeType,
                     CachedPage     **pagePtr)
{
  CachedPage *page = NULL;
  int result = getPageFromCache(volume->pageCache, physicalPage,
                                probeType | CACHE_PROBE_IGNORE_FAILURE,
                                &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  unsigned int zoneNumber = getZoneNumber(request);
  // If we didn't find a page we need to enqueue a read for it, in which
  // case we need to grab the mutex.
  if (page == NULL) {
    endPendingSearch(volume->pageCache, zoneNumber);
    lockMutex(&volume->readThreadsMutex);

    /*
     * Do the lookup again while holding the read mutex (no longer the fast
     * case so this should be ok to repeat). We need to do this because a
     * page may have been added to the page map by the reader thread between
     * the time we searched above and the time we went to actually try to
     * enqueue it below. This could result in us enqueuing another read for
     * a page which is already in the cache, which would mean we end up with
     * two entries in the cache for the same page.
     */
    result = getPageFromCache(volume->pageCache, physicalPage, probeType,
                              &page);
    if (result != UDS_SUCCESS) {
      /*
       * In non-success cases (anything not UDS_SUCCESS, meaning both
       * UDS_QUEUED and "real" errors), the caller doesn't get a
       * handle on a cache page, so it can't continue the search, and
       * we don't need to prevent other threads from messing with the
       * cache.
       *
       * However, we do need to set the "search pending" flag because
       * the callers expect it to always be set on return, even if
       * they can't actually do the search.
       *
       * Doing the calls in this order ought to be faster, since we
       * let other threads have the reader thread mutex (which can
       * require a syscall) ASAP, and set the "search pending" state
       * that can block the reader thread as the last thing.
       */
      unlockMutex(&volume->readThreadsMutex);
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
      return result;
    }

    // If we found the page now, we can release the mutex and proceed
    // as if this were the fast case.
    if (page != NULL) {
      /*
       * If we found a page (*pagePtr != NULL and return
       * UDS_SUCCESS), then we're telling the caller where to look for
       * the cache page, and need to switch to "reader thread
       * unlocked" and "search pending" state in careful order so no
       * other thread can mess with the data before our caller gets to
       * look at it.
       */
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
      unlockMutex(&volume->readThreadsMutex);
    }
  }

  if (page == NULL) {
    result = readPageLocked(volume, request, physicalPage, false, &page);
    if (result != UDS_SUCCESS) {
      /*
       * This code path is used frequently in the UDS_QUEUED case, so
       * the performance gain from unlocking first, while "search
       * pending" mode is off, turns out to be significant in some
       * cases.
       */
      unlockMutex(&volume->readThreadsMutex);
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
      return result;
    }

    // See above re: ordering requirement.
    beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
    unlockMutex(&volume->readThreadsMutex);
  } else if (getZoneNumber(request) == 0) {
    // Only 1 zone is responsible for updating LRU
    makePageMostRecent(volume->pageCache, page);
  }

  *pagePtr = page;
  return UDS_SUCCESS;
}

/**********************************************************************/
int getPage(Volume          *volume,
            unsigned int     chapter,
            unsigned int     pageNumber,
            CacheProbeType   probeType,
            byte           **dataPtr,
            DeltaIndexPage **indexPagePtr)
{
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, chapter, pageNumber);

  lockMutex(&volume->readThreadsMutex);
  CachedPage *page = NULL;
  int result = getPageLocked(volume, NULL, physicalPage, probeType, &page);
  unlockMutex(&volume->readThreadsMutex);

  if (dataPtr != NULL) {
    *dataPtr = (page != NULL) ? getPageData(&page->cp_pageData) : NULL;
  }
  if (indexPagePtr != NULL) {
    *indexPagePtr = (page != NULL) ? &page->cp_indexPage : NULL;
  }
  return result;
}
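/*
 * A usage sketch for getPage() (the chapter number is hypothetical; this
 * is the same pattern probeChapter() uses below when scanning a chapter):
 *
 *   DeltaIndexPage *indexPage;
 *   int result = getPage(volume, 7, 0, CACHE_PROBE_INDEX_FIRST,
 *                        NULL, &indexPage);
 *
 * Passing NULL for dataPtr means the caller only wants the parsed
 * DeltaIndexPage view of the cached page, not the raw page bytes.
 */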
/**
 * Search for a chunk name in a cached index page or chapter index, returning
 * the record page number from a chapter index match.
 *
 * @param volume           the volume containing the index page to search
 * @param request          the request originating the search (may be NULL
 *                         for a direct query from volume replay)
 * @param name             the name of the block or chunk
 * @param chapter          the chapter to search
 * @param indexPageNumber  the index page number of the page to search
 * @param recordPageNumber pointer to return the chapter record page number
 *                         (value will be NO_CHAPTER_INDEX_ENTRY if the name
 *                         was not found)
 *
 * @return UDS_SUCCESS or an error code
 **/
static int searchCachedIndexPage(Volume             *volume,
                                 Request            *request,
                                 const UdsChunkName *name,
                                 unsigned int        chapter,
                                 unsigned int        indexPageNumber,
                                 int                *recordPageNumber)
{
  unsigned int zoneNumber = getZoneNumber(request);
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, chapter, indexPageNumber);

  /*
   * Make sure the invalidate counter is updated before we try and read from
   * the page map. This prevents this thread from reading a page in the
   * page map which has already been marked for invalidation by the reader
   * thread, before the reader thread has noticed that the invalidateCounter
   * has been incremented.
   */
  beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);

  CachedPage *page = NULL;
  int result = getPageProtected(volume, request, physicalPage,
                                cacheProbeType(request, true), &page);
  if (result != UDS_SUCCESS) {
    endPendingSearch(volume->pageCache, zoneNumber);
    return result;
  }

  result
    = ASSERT_LOG_ONLY(searchPending(getInvalidateCounter(volume->pageCache,
                                                         zoneNumber)),
                      "Search is pending for zone %u", zoneNumber);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = searchChapterIndexPage(&page->cp_indexPage, volume->geometry, name,
                                  recordPageNumber);
  endPendingSearch(volume->pageCache, zoneNumber);
  return result;
}
/**********************************************************************/
int searchCachedRecordPage(Volume             *volume,
                           Request            *request,
                           const UdsChunkName *name,
                           unsigned int        chapter,
                           int                 recordPageNumber,
                           UdsChunkData       *duplicate,
                           bool               *found)
{
  *found = false;

  if (recordPageNumber == NO_CHAPTER_INDEX_ENTRY) {
    // No record for that name can exist in the chapter.
    return UDS_SUCCESS;
  }

  Geometry *geometry = volume->geometry;
  int result = ASSERT(((recordPageNumber >= 0)
                       && ((unsigned int) recordPageNumber
                           < geometry->recordPagesPerChapter)),
                      "0 <= %d <= %u",
                      recordPageNumber, geometry->recordPagesPerChapter);
  if (result != UDS_SUCCESS) {
    return result;
  }

  unsigned int pageNumber = geometry->indexPagesPerChapter + recordPageNumber;

  unsigned int zoneNumber = getZoneNumber(request);
  int physicalPage = mapToPhysicalPage(volume->geometry, chapter, pageNumber);

  /*
   * Make sure the invalidate counter is updated before we try and read from
   * the page map. This prevents this thread from reading a page in the page
   * map which has already been marked for invalidation by the reader thread,
   * before the reader thread has noticed that the invalidateCounter has been
   * incremented.
   */
  beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);

  CachedPage *recordPage;
  result = getPageProtected(volume, request, physicalPage,
                            cacheProbeType(request, false), &recordPage);
  if (result != UDS_SUCCESS) {
    endPendingSearch(volume->pageCache, zoneNumber);
    return result;
  }

  if (searchRecordPage(getPageData(&recordPage->cp_pageData), name, geometry,
                       duplicate)) {
    *found = true;
  }
  endPendingSearch(volume->pageCache, zoneNumber);
  return UDS_SUCCESS;
}

/**********************************************************************/
int readChapterIndexFromVolume(const Volume       *volume,
                               uint64_t            virtualChapter,
                               struct volume_page  volumePages[],
                               DeltaIndexPage      indexPages[])
{
  const Geometry *geometry = volume->geometry;
  unsigned int physicalChapter = mapToPhysicalChapter(geometry,
                                                      virtualChapter);
  int physicalPage = mapToPhysicalPage(geometry, physicalChapter, 0);
  prefetchVolumePages(&volume->volumeStore, physicalPage,
                      geometry->indexPagesPerChapter);

  unsigned int i;
  struct volume_page volumePage;
  int result = initializeVolumePage(geometry, &volumePage);
  for (i = 0; i < geometry->indexPagesPerChapter; i++) {
    // Assign to the outer result so a failure here is what gets returned
    // after the cleanup below.
    result = readVolumePage(&volume->volumeStore, physicalPage + i,
                            &volumePages[i]);
    if (result != UDS_SUCCESS) {
      break;
    }
    byte *indexPage = getPageData(&volumePages[i]);
    result = initChapterIndexPage(volume, indexPage, physicalChapter, i,
                                  &indexPages[i]);
    if (result != UDS_SUCCESS) {
      break;
    }
  }
  destroyVolumePage(&volumePage);
  return result;
}

/**********************************************************************/
int searchVolumePageCache(Volume             *volume,
                          Request            *request,
                          const UdsChunkName *name,
                          uint64_t            virtualChapter,
                          UdsChunkData       *metadata,
                          bool               *found)
{
  unsigned int physicalChapter
    = mapToPhysicalChapter(volume->geometry, virtualChapter);
  unsigned int indexPageNumber;
  int result = findIndexPageNumber(volume->indexPageMap, name,
                                   physicalChapter, &indexPageNumber);
  if (result != UDS_SUCCESS) {
    return result;
  }

  int recordPageNumber;
  result = searchCachedIndexPage(volume, request, name, physicalChapter,
                                 indexPageNumber, &recordPageNumber);
  if (result == UDS_SUCCESS) {
    result = searchCachedRecordPage(volume, request, name, physicalChapter,
                                    recordPageNumber, metadata, found);
  }
  return result;
}
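/*
 * A usage sketch for searchVolumePageCache() (the name and chapter number
 * are hypothetical):
 *
 *   UdsChunkData metadata;
 *   bool found = false;
 *   int result = searchVolumePageCache(volume, request, &name, 42,
 *                                      &metadata, &found);
 *
 * The lookup is two-phase: findIndexPageNumber() picks the one index page
 * whose delta lists could contain the name, and a record page is only
 * consulted when searchCachedIndexPage() reports a hit.
 */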
/**********************************************************************/
int forgetChapter(Volume             *volume,
                  uint64_t            virtualChapter,
                  InvalidationReason  reason)
{
  logDebug("forgetting chapter %llu", virtualChapter);
  unsigned int physicalChapter
    = mapToPhysicalChapter(volume->geometry, virtualChapter);
  lockMutex(&volume->readThreadsMutex);
  int result
    = invalidatePageCacheForChapter(volume->pageCache, physicalChapter,
                                    volume->geometry->pagesPerChapter,
                                    reason);
  unlockMutex(&volume->readThreadsMutex);
  return result;
}

/**
 * Donate index page data to the page cache for an index page that was just
 * written to the volume. The caller must already hold the reader thread
 * mutex.
 *
 * @param volume          the volume
 * @param physicalChapter the physical chapter number of the index page
 * @param indexPageNumber the chapter page number of the index page
 * @param scratchPage     the index page data
 **/
static int donateIndexPageLocked(Volume             *volume,
                                 unsigned int        physicalChapter,
                                 unsigned int        indexPageNumber,
                                 struct volume_page *scratchPage)
{
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, physicalChapter, indexPageNumber);

  // Find a place to put the page.
  CachedPage *page = NULL;
  int result = selectVictimInCache(volume->pageCache, &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Exchange the scratch page with the cache page
  swapVolumePages(&page->cp_pageData, scratchPage);

  result = initChapterIndexPage(volume, getPageData(&page->cp_pageData),
                                physicalChapter, indexPageNumber,
                                &page->cp_indexPage);
  if (result != UDS_SUCCESS) {
    logWarning("Error initializing chapter index page");
    cancelPageInCache(volume->pageCache, physicalPage, page);
    return result;
  }

  result = putPageInCache(volume->pageCache, physicalPage, page);
  if (result != UDS_SUCCESS) {
    logWarning("Error putting page %u in cache", physicalPage);
    cancelPageInCache(volume->pageCache, physicalPage, page);
    return result;
  }

  return UDS_SUCCESS;
}

/**********************************************************************/
int writeIndexPages(Volume           *volume,
                    int               physicalPage,
                    OpenChapterIndex *chapterIndex,
                    byte            **pages)
{
  Geometry *geometry = volume->geometry;
  unsigned int physicalChapterNumber
    = mapToPhysicalChapter(geometry, chapterIndex->virtualChapterNumber);
  unsigned int deltaListNumber = 0;

  unsigned int indexPageNumber;
  for (indexPageNumber = 0;
       indexPageNumber < geometry->indexPagesPerChapter;
       indexPageNumber++) {
    int result = prepareToWriteVolumePage(&volume->volumeStore,
                                          physicalPage + indexPageNumber,
                                          &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to prepare index page");
    }

    // Pack as many delta lists into the index page as will fit.
    unsigned int listsPacked;
    bool lastPage = ((indexPageNumber + 1) == geometry->indexPagesPerChapter);
    result = packOpenChapterIndexPage(chapterIndex,
                                      getPageData(&volume->scratchPage),
                                      deltaListNumber, lastPage,
                                      &listsPacked);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result, "failed to pack index page");
    }

    result = writeVolumePage(&volume->volumeStore,
                             physicalPage + indexPageNumber,
                             &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to write chapter index page");
    }

    if (pages != NULL) {
      memcpy(pages[indexPageNumber], getPageData(&volume->scratchPage),
             geometry->bytesPerPage);
    }

    // Tell the index page map the list number of the last delta list that
    // was packed into the index page.
    if (listsPacked == 0) {
      logDebug("no delta lists packed on chapter %u page %u",
               physicalChapterNumber, indexPageNumber);
    } else {
      deltaListNumber += listsPacked;
    }
    result = updateIndexPageMap(volume->indexPageMap,
                                chapterIndex->virtualChapterNumber,
                                physicalChapterNumber, indexPageNumber,
                                deltaListNumber - 1);
    if (result != UDS_SUCCESS) {
      return logErrorWithStringError(result,
                                     "failed to update index page map");
    }

    // Donate the page data for the index page to the page cache.
    lockMutex(&volume->readThreadsMutex);
    result = donateIndexPageLocked(volume, physicalChapterNumber,
                                   indexPageNumber, &volume->scratchPage);
    unlockMutex(&volume->readThreadsMutex);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }
  return UDS_SUCCESS;
}
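/*
 * An illustrative accounting of the index page map update above
 * (hypothetical numbers): if index page 0 packs delta lists 0..24
 * (listsPacked == 25), deltaListNumber advances to 25 and the map records
 * 24 as the last list on page 0. A later lookup of a name hashing to delta
 * list 10 is then directed back to page 0 by findIndexPageNumber().
 */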
/**********************************************************************/
int writeRecordPages(Volume               *volume,
                     int                   physicalPage,
                     const UdsChunkRecord  records[],
                     byte                **pages)
{
  Geometry *geometry = volume->geometry;
  // Skip over the index pages, which come before the record pages
  physicalPage += geometry->indexPagesPerChapter;
  // The record array from the open chapter is 1-based.
  const UdsChunkRecord *nextRecord = &records[1];

  unsigned int recordPageNumber;
  for (recordPageNumber = 0;
       recordPageNumber < geometry->recordPagesPerChapter;
       recordPageNumber++) {
    int result = prepareToWriteVolumePage(&volume->volumeStore,
                                          physicalPage + recordPageNumber,
                                          &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to prepare record page");
    }

    // Sort the next page of records and copy them to the record page as a
    // binary tree stored in heap order.
    result = encodeRecordPage(volume, nextRecord,
                              getPageData(&volume->scratchPage));
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to encode record page %u",
                                       recordPageNumber);
    }
    nextRecord += geometry->recordsPerPage;

    result = writeVolumePage(&volume->volumeStore,
                             physicalPage + recordPageNumber,
                             &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to write chapter record page");
    }

    if (pages != NULL) {
      memcpy(pages[recordPageNumber], getPageData(&volume->scratchPage),
             geometry->bytesPerPage);
    }
  }
  return UDS_SUCCESS;
}
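/*
 * The two writers above produce the on-disk chapter layout consumed by the
 * mapping helpers at the top of this file. With illustrative geometry
 * values of 26 index pages and 230 record pages per chapter, chapter c
 * begins at physical page 1 + (256 * c); chapter pages 0..25 hold the
 * packed chapter index and chapter pages 26..255 hold the sorted record
 * pages.
 */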
/**********************************************************************/
int writeChapter(Volume               *volume,
                 OpenChapterIndex     *chapterIndex,
                 const UdsChunkRecord  records[])
{
  // Determine the position of the virtual chapter in the volume file.
  Geometry *geometry = volume->geometry;
  unsigned int physicalChapterNumber
    = mapToPhysicalChapter(geometry, chapterIndex->virtualChapterNumber);
  int physicalPage = mapToPhysicalPage(geometry, physicalChapterNumber, 0);

  // Pack and write the delta chapter index pages to the volume.
  int result = writeIndexPages(volume, physicalPage, chapterIndex, NULL);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Sort and write the record pages to the volume.
  result = writeRecordPages(volume, physicalPage, records, NULL);
  if (result != UDS_SUCCESS) {
    return result;
  }
  releaseVolumePage(&volume->scratchPage);

  // Flush the data to permanent storage.
  return syncVolumeStore(&volume->volumeStore);
}

/**********************************************************************/
size_t getCacheSize(Volume *volume)
{
  size_t size = getPageCacheSize(volume->pageCache);
  if (isSparse(volume->geometry)) {
    size += getSparseCacheMemorySize(volume->sparseCache);
  }
  return size;
}

/**********************************************************************/
static int probeChapter(Volume       *volume,
                        unsigned int  chapterNumber,
                        uint64_t     *virtualChapterNumber)
{
  const Geometry *geometry = volume->geometry;
  unsigned int expectedListNumber = 0;
  uint64_t lastVCN = UINT64_MAX;

  prefetchVolumePages(&volume->volumeStore,
                      mapToPhysicalPage(geometry, chapterNumber, 0),
                      geometry->indexPagesPerChapter);

  unsigned int i;
  for (i = 0; i < geometry->indexPagesPerChapter; ++i) {
    DeltaIndexPage *page;
    int result = getPage(volume, chapterNumber, i, CACHE_PROBE_INDEX_FIRST,
                         NULL, &page);
    if (result != UDS_SUCCESS) {
      return result;
    }

    uint64_t vcn = page->virtualChapterNumber;
    if (lastVCN == UINT64_MAX) {
      lastVCN = vcn;
    } else if (vcn != lastVCN) {
      logError("inconsistent chapter %u index page %u: expected vcn %"
               PRIu64 ", got vcn %llu",
               chapterNumber, i, lastVCN, vcn);
      return UDS_CORRUPT_COMPONENT;
    }

    if (expectedListNumber != page->lowestListNumber) {
      logError("inconsistent chapter %u index page %u: expected list number "
               "%u, got list number %u",
               chapterNumber, i, expectedListNumber, page->lowestListNumber);
      return UDS_CORRUPT_COMPONENT;
    }
    expectedListNumber = page->highestListNumber + 1;

    result = validateChapterIndexPage(page, geometry);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  if (lastVCN == UINT64_MAX) {
    logError("no chapter %u virtual chapter number determined",
             chapterNumber);
    return UDS_CORRUPT_COMPONENT;
  }
  if (chapterNumber != lastVCN % geometry->chaptersPerVolume) {
    logError("chapter %u vcn %llu is out of phase (%u)",
             chapterNumber, lastVCN, geometry->chaptersPerVolume);
    return UDS_CORRUPT_COMPONENT;
  }
  *virtualChapterNumber = lastVCN;
  return UDS_SUCCESS;
}

/**********************************************************************/
static int probeWrapper(void         *aux,
                        unsigned int  chapterNumber,
                        uint64_t     *virtualChapterNumber)
{
  Volume *volume = aux;
  int result = probeChapter(volume, chapterNumber, virtualChapterNumber);
  if ((result == UDS_CORRUPT_COMPONENT) || (result == UDS_CORRUPT_DATA)) {
    *virtualChapterNumber = UINT64_MAX;
    return UDS_SUCCESS;
  }
  return result;
}
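/*
 * An illustrative trace of findRealEndOfVolume() below (hypothetical
 * numbers): with limit == 100 and chapters 90..99 unreadable, the probes
 * visit chapters 99, 98, 96, and 92 (all bad, so the span doubles and the
 * limit shrinks), then 84 (good, so the span halves), 88 (good), 90 (bad,
 * limit becomes 90), 88 (good), and finally 89 (good with span == 1),
 * leaving limit == 90, the number of good chapters.
 */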
/**********************************************************************/
static int findRealEndOfVolume(Volume       *volume,
                               unsigned int  limit,
                               unsigned int *limitPtr)
{
  /*
   * Start checking from the end of the volume. As long as we hit corrupt
   * data, start skipping larger and larger amounts until we find real data.
   * If we find real data, reduce the span and try again until we find
   * the exact boundary.
   */
  unsigned int span = 1;
  unsigned int tries = 0;
  while (limit > 0) {
    unsigned int chapter = (span > limit) ? 0 : limit - span;
    uint64_t vcn = 0;
    int result = probeChapter(volume, chapter, &vcn);
    if (result == UDS_SUCCESS) {
      if (span == 1) {
        break;
      }
      span /= 2;
      tries = 0;
    } else if (result == UDS_CORRUPT_COMPONENT) {
      limit = chapter;
      if (++tries > 1) {
        span *= 2;
      }
    } else {
      return logErrorWithStringError(result,
                                     "cannot determine end of volume");
    }
  }

  if (limitPtr != NULL) {
    *limitPtr = limit;
  }
  return UDS_SUCCESS;
}

/**********************************************************************/
int findVolumeChapterBoundaries(Volume   *volume,
                                uint64_t *lowestVCN,
                                uint64_t *highestVCN,
                                bool     *isEmpty)
{
  unsigned int chapterLimit = volume->geometry->chaptersPerVolume;

  int result = findRealEndOfVolume(volume, chapterLimit, &chapterLimit);
  if (result != UDS_SUCCESS) {
    return logErrorWithStringError(result, "cannot find end of volume");
  }

  if (chapterLimit == 0) {
    *lowestVCN = 0;
    *highestVCN = 0;
    *isEmpty = true;
    return UDS_SUCCESS;
  }

  *isEmpty = false;
  return findVolumeChapterBoundariesImpl(chapterLimit, MAX_BAD_CHAPTERS,
                                         lowestVCN, highestVCN, probeWrapper,
                                         volume);
}
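/*
 * A worked example of the boundary search below (hypothetical values):
 * with chapterLimit == 8 and chapters 0..7 holding VCNs 12, 13, 14, 15,
 * 8, 9, 10, 11, firstVCN is 12. The binary search converges on chapter 4,
 * the first chapter whose VCN (8) is below firstVCN, so lowest == 8;
 * scanning backwards from there lands on chapter 3, giving highest == 15.
 */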
/**********************************************************************/
int findVolumeChapterBoundariesImpl(unsigned int  chapterLimit,
                                    unsigned int  maxBadChapters,
                                    uint64_t     *lowestVCN,
                                    uint64_t     *highestVCN,
                                    int (*probeFunc)(void         *aux,
                                                     unsigned int  chapter,
                                                     uint64_t     *vcn),
                                    void *aux)
{
  if (chapterLimit == 0) {
    *lowestVCN = 0;
    *highestVCN = 0;
    return UDS_SUCCESS;
  }

  /*
   * This method assumes there is at most one run of contiguous bad chapters
   * caused by unflushed writes. Either the bad spot is at the beginning and
   * end, or somewhere in the middle. Wherever it is, the highest and lowest
   * VCNs are adjacent to it. Otherwise the volume is cleanly saved and
   * somewhere in the middle of it the highest VCN immediately precedes the
   * lowest one.
   */

  uint64_t firstVCN = UINT64_MAX;

  // It doesn't matter if this results in a bad spot (UINT64_MAX).
  int result = (*probeFunc)(aux, 0, &firstVCN);
  if (result != UDS_SUCCESS) {
    return UDS_SUCCESS;
  }

  /*
   * Binary search for end of the discontinuity in the monotonically
   * increasing virtual chapter numbers; bad spots are treated as a span of
   * UINT64_MAX values. In effect we're searching for the index of the
   * smallest value less than firstVCN. If we go off the end, it means that
   * chapter 0 has the lowest vcn.
   */
  unsigned int leftChapter = 0;
  unsigned int rightChapter = chapterLimit;

  while (leftChapter < rightChapter) {
    unsigned int chapter = (leftChapter + rightChapter) / 2;
    uint64_t probeVCN;

    result = (*probeFunc)(aux, chapter, &probeVCN);
    if (result != UDS_SUCCESS) {
      return result;
    }
    if (firstVCN <= probeVCN) {
      leftChapter = chapter + 1;
    } else {
      rightChapter = chapter;
    }
  }

  uint64_t lowest = UINT64_MAX;
  uint64_t highest = UINT64_MAX;

  result = ASSERT(leftChapter == rightChapter, "leftChapter == rightChapter");
  if (result != UDS_SUCCESS) {
    return result;
  }

  leftChapter %= chapterLimit;  // in case we're at the end

  // At this point, leftChapter is the chapter with the lowest virtual
  // chapter number.
  result = (*probeFunc)(aux, leftChapter, &lowest);
  if (result != UDS_SUCCESS) {
    return result;
  }
  result = ASSERT((lowest != UINT64_MAX), "invalid lowest chapter");
  if (result != UDS_SUCCESS) {
    return result;
  }

  // We now circularly scan backwards, moving over any bad chapters until we
  // find the chapter with the highest vcn (the first good chapter we
  // encounter).
  unsigned int badChapters = 0;
  for (;;) {
    rightChapter = (rightChapter + chapterLimit - 1) % chapterLimit;
    result = (*probeFunc)(aux, rightChapter, &highest);
    if (result != UDS_SUCCESS) {
      return result;
    }
    if (highest != UINT64_MAX) {
      break;
    }
    if (++badChapters >= maxBadChapters) {
      logError("too many bad chapters in volume: %u", badChapters);
      return UDS_CORRUPT_COMPONENT;
    }
  }

  *lowestVCN = lowest;
  *highestVCN = highest;
  return UDS_SUCCESS;
}
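/*
 * A reserved-buffer accounting example for allocateVolume() below
 * (illustrative geometry values): with cacheChapters == 5, 230 record
 * pages per chapter, and 26 index pages per chapter, a dense index
 * reserves 5 * 230 + 1 == 1151 buffers (the +1 is for the chapter
 * writer); a sparse index reserves 5 * 26 more for the sparse cache.
 */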
/**
 * Allocate a volume.
 *
 * @param config           The configuration to use
 * @param layout           The index layout
 * @param readQueueMaxSize The maximum size of the read queue
 * @param zoneCount        The number of zones to use
 * @param newVolume        A pointer to hold the new volume
 *
 * @return UDS_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int allocateVolume(const Configuration  *config,
                          IndexLayout          *layout,
                          unsigned int          readQueueMaxSize,
                          unsigned int          zoneCount,
                          Volume              **newVolume)
{
  Volume *volume;
  int result = ALLOCATE(1, Volume, "volume", &volume);
  if (result != UDS_SUCCESS) {
    return result;
  }
  volume->nonce = getVolumeNonce(layout);
  // It is safe to call freeVolume now to clean up and close the volume

  result = copyGeometry(config->geometry, &volume->geometry);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return logWarningWithStringError(result, "failed to allocate geometry");
  }

  // Need a buffer for each entry in the page cache
  unsigned int reservedBuffers
    = config->cacheChapters * config->geometry->recordPagesPerChapter;
  // And a buffer for the chapter writer
  reservedBuffers += 1;
  // And a buffer for each entry in the sparse cache
  if (isSparse(volume->geometry)) {
    reservedBuffers
      += config->cacheChapters * config->geometry->indexPagesPerChapter;
  }
  result = openVolumeStore(&volume->volumeStore, layout, reservedBuffers,
                           config->geometry->bytesPerPage);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }
  result = initializeVolumePage(config->geometry, &volume->scratchPage);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }

  result = makeRadixSorter(config->geometry->recordsPerPage,
                           &volume->radixSorter);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }

  result = ALLOCATE(config->geometry->recordsPerPage, const UdsChunkRecord *,
                    "record pointers", &volume->recordPointers);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }

  if (isSparse(volume->geometry)) {
    result = makeSparseCache(volume->geometry, config->cacheChapters,
                             zoneCount, &volume->sparseCache);
    if (result != UDS_SUCCESS) {
      freeVolume(volume);
      return result;
    }
  }
  result = makePageCache(volume->geometry, config->cacheChapters,
                         readQueueMaxSize, zoneCount, &volume->pageCache);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }
  result = makeIndexPageMap(volume->geometry, &volume->indexPageMap);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }

  *newVolume = volume;
  return UDS_SUCCESS;
}

/**********************************************************************/
int makeVolume(const Configuration         *config,
               IndexLayout                 *layout,
               const struct uds_parameters *userParams,
               unsigned int                 readQueueMaxSize,
               unsigned int                 zoneCount,
               Volume                     **newVolume)
{
  unsigned int volumeReadThreads = getReadThreads(userParams);

  if (readQueueMaxSize <= volumeReadThreads) {
    logError("Number of read threads must be smaller than read queue");
    return UDS_INVALID_ARGUMENT;
  }

  Volume *volume = NULL;
  int result = allocateVolume(config, layout, readQueueMaxSize, zoneCount,
                              &volume);
  if (result != UDS_SUCCESS) {
    return result;
  }
  result = initMutex(&volume->readThreadsMutex);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }
  result = initCond(&volume->readThreadsReadDoneCond);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }
  result = initCond(&volume->readThreadsCond);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }

  // Start the reader threads. If this allocation succeeds, freeVolume knows
  // that it needs to try and stop those threads.
  result = ALLOCATE(volumeReadThreads, Thread, "reader threads",
                    &volume->readerThreads);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return result;
  }
  unsigned int i;
  for (i = 0; i < volumeReadThreads; i++) {
    result = createThread(readThreadFunction, (void *) volume, "reader",
                          &volume->readerThreads[i]);
    if (result != UDS_SUCCESS) {
      freeVolume(volume);
      return result;
    }
    // We only stop as many threads as actually got started.
    volume->numReadThreads = i + 1;
  }

  *newVolume = volume;
  return UDS_SUCCESS;
}

/**********************************************************************/
void freeVolume(Volume *volume)
{
  if (volume == NULL) {
    return;
  }

  // If readerThreads is NULL, then we haven't set up the reader threads.
  if (volume->readerThreads != NULL) {
    // Stop the reader threads. It is ok if there aren't any of them.
    lockMutex(&volume->readThreadsMutex);
    volume->readerState |= READER_STATE_EXIT;
    broadcastCond(&volume->readThreadsCond);
    unlockMutex(&volume->readThreadsMutex);
    unsigned int i;
    for (i = 0; i < volume->numReadThreads; i++) {
      joinThreads(volume->readerThreads[i]);
    }
    FREE(volume->readerThreads);
    volume->readerThreads = NULL;
  }

  // Must close the volume store AFTER freeing the scratch page and the caches
  destroyVolumePage(&volume->scratchPage);
  freePageCache(volume->pageCache);
  freeSparseCache(volume->sparseCache);
  closeVolumeStore(&volume->volumeStore);

  destroyCond(&volume->readThreadsCond);
  destroyCond(&volume->readThreadsReadDoneCond);
  destroyMutex(&volume->readThreadsMutex);
  freeIndexPageMap(volume->indexPageMap);
  freeRadixSorter(volume->radixSorter);
  FREE(volume->geometry);
  FREE(volume->recordPointers);
  FREE(volume);
}
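/*
 * A lifecycle sketch (error handling elided and parameter values
 * hypothetical): a volume is created and destroyed roughly like this:
 *
 *   Volume *volume = NULL;
 *   int result = makeVolume(config, layout, userParams, readQueueMaxSize,
 *                           zoneCount, &volume);
 *   if (result == UDS_SUCCESS) {
 *     // ... writeChapter(), searchVolumePageCache(), forgetChapter() ...
 *     freeVolume(volume);
 *   }
 *
 * freeVolume() tolerates NULL and partially constructed volumes, which is
 * why every failure path in makeVolume() and allocateVolume() simply calls
 * it and returns.
 */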