/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/uds-releases/jasper/src/uds/volume.c#23 $
 */

#include "volume.h"

#include "cacheCounters.h"
#include "chapterIndex.h"
#include "compiler.h"
#include "errors.h"
#include "geometry.h"
#include "hashUtils.h"
#include "indexConfig.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "permassert.h"
#include "recordPage.h"
#include "request.h"
#include "sparseCache.h"
#include "stringUtils.h"
#include "threads.h"

// Tuning constants for volume access and the reader thread pool.
enum {
  MAX_BAD_CHAPTERS = 100,           // max number of contiguous bad chapters
  DEFAULT_VOLUME_READ_THREADS = 2,  // Default number of reader threads
  MAX_VOLUME_READ_THREADS = 16,     // Maximum number of reader threads
};

/**
 * Determine how many reader threads to use, clamping any user-supplied
 * value to the supported range [1, MAX_VOLUME_READ_THREADS].
 *
 * @param userParams  optional user parameters (NULL selects the default)
 *
 * @return the number of reader threads to create
 **/
static unsigned int getReadThreads(const struct uds_parameters *userParams)
{
  unsigned int threads = DEFAULT_VOLUME_READ_THREADS;
  if (userParams != NULL) {
    threads = userParams->read_threads;
  }
  if (threads < 1) {
    threads = 1;
  } else if (threads > MAX_VOLUME_READ_THREADS) {
    threads = MAX_VOLUME_READ_THREADS;
  }
  return threads;
}

/**
 * Convert a physical page number to its page offset within its chapter.
 * Physical page zero is the volume header, hence the adjustment by one.
 **/
static INLINE unsigned int mapToPageNumber(Geometry     *geometry,
                                           unsigned int  physicalPage)
{
  unsigned int pageIndex = physicalPage - 1;
  return pageIndex % geometry->pagesPerChapter;
}

/**
 * Convert a physical page number to the physical chapter containing it.
 * Physical page zero is the volume header, hence the adjustment by one.
 **/
static INLINE unsigned int mapToChapterNumber(Geometry     *geometry,
                                              unsigned int  physicalPage)
{
  unsigned int pageIndex = physicalPage - 1;
  return pageIndex / geometry->pagesPerChapter;
}

/**
 * Check whether a physical page is a record page.  Within each chapter the
 * index pages come first, followed by the record pages.
 **/
static INLINE bool isRecordPage(Geometry *geometry, unsigned int physicalPage)
{
  unsigned int pageInChapter = (physicalPage - 1) % geometry->pagesPerChapter;
  return (pageInChapter >= geometry->indexPagesPerChapter);
}

/**
 * Get the zone number of a request, attributing direct queries (no
 * request) to zone zero.
 **/
static INLINE unsigned int getZoneNumber(Request *request)
{
  if (request == NULL) {
    return 0;
  }
  return request->zoneNumber;
}

/**
 * Map a (chapter, page) pair to the absolute physical page number on the
 * volume.  Page zero is the header page, so the first index page in the
 * first chapter is physical page one.
 **/
int mapToPhysicalPage(const Geometry *geometry, int chapter, int page)
{
  int chapterStart = geometry->pagesPerChapter * chapter;
  return 1 + chapterStart + page;
}

/**
 * Block until the page cache read queue has room for another entry.  Must
 * be called with the readThreadsMutex held (it is released and reacquired
 * by waitCond).
 *
 * If the caller's zone currently has a search pending on the page cache,
 * the pending search is ended while waiting and re-begun on the same page
 * afterwards, so the pending-search counter ends up back at an odd
 * (pending) value.
 *
 * @param volume   the volume whose read queue is full
 * @param request  the request that triggered the wait; may be NULL, used
 *                 only to select the zone number
 **/
static void waitForReadQueueNotFull(Volume *volume, Request *request)
{
  unsigned int zoneNumber = getZoneNumber(request);
  InvalidateCounter invalidateCounter = getInvalidateCounter(volume->pageCache,
                                                             zoneNumber);
  if (searchPending(invalidateCounter)) {
    // Increment the invalidate counter to avoid deadlock where the reader
    // threads cannot make progress because they are waiting on the counter
    // and the index thread cannot because the read queue is full.
    endPendingSearch(volume->pageCache, zoneNumber);
  }

  while (readQueueIsFull(volume->pageCache)) {
    logDebug("Waiting until read queue not full");
    // Wake a reader thread, then sleep until a read completion is announced.
    signalCond(&volume->readThreadsCond);
    waitCond(&volume->readThreadsReadDoneCond, &volume->readThreadsMutex);
  }

  if (searchPending(invalidateCounter)) {
    // Increment again so we get back to an odd value.
    beginPendingSearch(volume->pageCache, pageBeingSearched(invalidateCounter),
                       zoneNumber);
  }
}

/**
 * Enqueue an asynchronous read of a page for the reader threads.  Must be
 * called with the readThreadsMutex held (required by the wait performed in
 * waitForReadQueueNotFull).
 *
 * @param volume        the volume
 * @param request       the request to restart once the page has been read
 * @param physicalPage  the physical page to read
 *
 * @return UDS_QUEUED when the read was queued, UDS_SHUTTINGDOWN if the
 *         reader threads are exiting, or another error from enqueueRead
 **/
int enqueuePageRead(Volume *volume, Request *request, int physicalPage)
{
  // Don't allow new requests if we are shutting down, but make sure
  // to process any requests that are still in the pipeline.
  if ((volume->readerState & READER_STATE_EXIT) != 0) {
    logInfo("failed to queue read while shutting down");
    return UDS_SHUTTINGDOWN;
  }

  // Mark the page as queued in the volume cache, for chapter invalidation to
  // be able to cancel a read.
  // If we are unable to do this because the queues are full, flush them first
  // NOTE: enqueueRead signals "queues full, nothing queued" by returning
  // UDS_SUCCESS, so this loop waits and retries until it gets UDS_QUEUED
  // or a real error.
  int result;
  while ((result = enqueueRead(volume->pageCache, request, physicalPage))
         == UDS_SUCCESS) {
    logDebug("Read queues full, waiting for reads to finish");
    waitForReadQueueNotFull(volume, request);
  }

  if (result == UDS_QUEUED) {
    /* signal a read thread */
    signalCond(&volume->readThreadsCond);
  }

  return result;
}

/**
 * Wait until a read queue entry can be reserved, or until shutdown.  Must
 * be called with the readThreadsMutex held.  No entries are reserved while
 * READER_STATE_STOP is set; the wait ends without a reservation when
 * READER_STATE_EXIT is set (callers must check readerState afterwards).
 *
 * @param volume        the volume
 * @param queuePos      [out] the reserved read queue position
 * @param requestList   [out] the requests waiting on the reserved entry
 * @param physicalPage  [out] the physical page to be read
 * @param invalid       [out] whether the reserved entry was invalidated
 **/
static INLINE void waitToReserveReadQueueEntry(Volume        *volume,
                                               unsigned int  *queuePos,
                                               Request      **requestList,
                                               unsigned int  *physicalPage,
                                               bool          *invalid)
{
  while (((volume->readerState & READER_STATE_EXIT) == 0)
         && (((volume->readerState & READER_STATE_STOP) != 0)
             || !reserveReadQueueEntry(volume->pageCache, queuePos,
                                       requestList, physicalPage, invalid))) {
    waitCond(&volume->readThreadsCond, &volume->readThreadsMutex);
  }
}

/**
 * Initialize a DeltaIndexPage view over raw index page data and validate
 * it against the index page map.  In LOOKUP_FOR_REBUILD mode the raw
 * initialization result is returned without validation.
 *
 * @param volume            the volume
 * @param indexPage         the raw index page data
 * @param chapter           the physical chapter number the page belongs to
 * @param indexPageNumber   the index page number within the chapter
 * @param chapterIndexPage  the DeltaIndexPage to initialize
 *
 * @return UDS_SUCCESS, or UDS_CORRUPT_DATA if the page's chapter or delta
 *         list range disagrees with the index page map
 **/
static int initChapterIndexPage(const Volume   *volume,
                                byte           *indexPage,
                                unsigned int    chapter,
                                unsigned int    indexPageNumber,
                                DeltaIndexPage *chapterIndexPage)
{
  Geometry *geometry = volume->geometry;

  int result = initializeChapterIndexPage(chapterIndexPage, geometry,
                                          indexPage, volume->nonce);
  if (volume->lookupMode == LOOKUP_FOR_REBUILD) {
    // Rebuild tolerates bad pages; let the caller decide what to do.
    return result;
  }
  if (result != UDS_SUCCESS) {
    return logErrorWithStringError(result,
                                   "Reading chapter index page for chapter %u"
                                   " page %u",
                                   chapter, indexPageNumber);
  }

  // Fetch the delta list range the index page map expects this page to hold.
  IndexPageBounds bounds;
  result = getListNumberBounds(volume->indexPageMap, chapter,
                               indexPageNumber, &bounds);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // The page is valid if it belongs to the expected physical chapter and
  // covers exactly the expected delta list range.
  uint64_t     ciVirtual = chapterIndexPage->virtualChapterNumber;
  unsigned int ciChapter = mapToPhysicalChapter(geometry, ciVirtual);
  if ((chapter == ciChapter)
      && (bounds.lowestList == chapterIndexPage->lowestListNumber)
      && (bounds.highestList == chapterIndexPage->highestListNumber)) {
    return UDS_SUCCESS;
  }

  logWarning("Index page map updated to %llu",
             getLastUpdate(volume->indexPageMap));
  logWarning("Page map expects that chapter %u page %u has range %u to %u, "
             "but chapter index page has chapter %" PRIu64
             " with range %u to %u",
             chapter, indexPageNumber, bounds.lowestList, bounds.highestList,
             ciVirtual, chapterIndexPage->lowestListNumber,
             chapterIndexPage->highestListNumber);
  return ASSERT_WITH_ERROR_CODE(false,
                                UDS_CORRUPT_DATA,
                                "index page map mismatch with chapter index");
}

/**
 * Initialize the chapter index view of a cached page that was just read,
 * deriving the chapter coordinates from the physical page number.
 *
 * @param volume        the volume
 * @param physicalPage  the physical page number of the cached page
 * @param page          the cached page holding the raw index page data
 *
 * @return UDS_SUCCESS or an error code
 **/
static int initializeIndexPage(const Volume *volume,
                               unsigned int  physicalPage,
                               CachedPage   *page)
{
  unsigned int chapter = mapToChapterNumber(volume->geometry, physicalPage);
  unsigned int indexPageNumber = mapToPageNumber(volume->geometry,
                                                 physicalPage);
  return initChapterIndexPage(volume, getPageData(&page->cp_pageData),
                              chapter, indexPageNumber, &page->cp_indexPage);
}

/**
 * The main loop of a volume reader thread.  Services read queue entries
 * until READER_STATE_EXIT is set: each reserved entry is read from the
 * volume store into a page cache victim slot, index pages are parsed, and
 * every request waiting on the entry is restarted.  The loop holds the
 * readThreadsMutex except while performing the actual page I/O.
 *
 * @param arg  the Volume to service (thread start argument)
 **/
static void readThreadFunction(void *arg)
{
  Volume       *volume  = arg;
  unsigned int  queuePos;
  Request      *requestList;
  unsigned int  physicalPage;
  bool          invalid = false;

  logDebug("reader starting");
  lockMutex(&volume->readThreadsMutex);
  while (true) {
    waitToReserveReadQueueEntry(volume, &queuePos, &requestList, &physicalPage,
                                &invalid);
    if ((volume->readerState & READER_STATE_EXIT) != 0) {
      // Shutdown requested; leave the service loop.
      break;
    }

    // Count this thread as busy while it services the entry.
    volume->busyReaderThreads++;

    bool recordPage = isRecordPage(volume->geometry, physicalPage);

    CachedPage *page = NULL;
    int result = UDS_SUCCESS;
    if (!invalid) {
      // Find a place to put the read queue page we reserved above.
      result = selectVictimInCache(volume->pageCache, &page);
      if (result == UDS_SUCCESS) {
        // Drop the mutex for the duration of the I/O.
        unlockMutex(&volume->readThreadsMutex);
        result = readVolumePage(&volume->volumeStore, physicalPage,
                                &page->cp_pageData);
        if (result != UDS_SUCCESS) {
          logWarning("Error reading page %u from volume", physicalPage);
          cancelPageInCache(volume->pageCache, physicalPage, page);
        }
        lockMutex(&volume->readThreadsMutex);
      } else {
        logWarning("Error selecting cache victim for page read");
      }

      if (result == UDS_SUCCESS) {
        // The entry may have been invalidated while the mutex was dropped.
        if (!volume->pageCache->readQueue[queuePos].invalid) {
          if (!recordPage) {
            result = initializeIndexPage(volume, physicalPage, page);
            if (result != UDS_SUCCESS) {
              logWarning("Error initializing chapter index page");
              cancelPageInCache(volume->pageCache, physicalPage, page);
            }
          }

          if (result == UDS_SUCCESS) {
            result = putPageInCache(volume->pageCache, physicalPage, page);
            if (result != UDS_SUCCESS) {
              logWarning("Error putting page %u in cache", physicalPage);
              cancelPageInCache(volume->pageCache, physicalPage, page);
            }
          }
        } else {
          logWarning("Page %u invalidated after read", physicalPage);
          cancelPageInCache(volume->pageCache, physicalPage, page);
          invalid = true;
        }
      }
    } else {
      logDebug("Requeuing requests for invalid page");
    }

    if (invalid) {
      // Invalidated reads are not errors for the waiting requests; they
      // will simply retry without a cached page.
      result = UDS_SUCCESS;
      page = NULL;
    }

    while (requestList != NULL) {
      Request *request = requestList;
      requestList = request->nextRequest;

      /*
       * If we've read in a record page, we're going to do an immediate search,
       * in an attempt to speed up processing when we requeue the request, so
       * that it doesn't have to go back into the getRecordFromZone code again.
       * However, if we've just read in an index page, we don't want to search.
       * We want the request to be processed again and getRecordFromZone to be
       * run.  We have added new fields in request to allow the index code to
       * know whether it can stop processing before getRecordFromZone is called
       * again.
       */
      if ((result == UDS_SUCCESS) && (page != NULL) && recordPage) {
        if (searchRecordPage(getPageData(&page->cp_pageData),
                             &request->chunkName, volume->geometry,
                             &request->oldMetadata)) {
          request->slLocation = LOC_IN_DENSE;
        } else {
          request->slLocation = LOC_UNAVAILABLE;
        }
        request->slLocationKnown = true;
      }

      // reflect any read failures in the request status
      request->status = result;
      restartRequest(request);
    }

    releaseReadQueueEntry(volume->pageCache, queuePos);

    volume->busyReaderThreads--;
    // Wake anyone waiting in waitForReadQueueNotFull.
    broadcastCond(&volume->readThreadsReadDoneCond);
  }
  unlockMutex(&volume->readThreadsMutex);
  logDebug("reader done");
}

/**
 * Read a page into the cache, either synchronously on this thread or by
 * enqueuing it for a reader thread.  Must be called with the
 * readThreadsMutex held.
 *
 * @param volume        the volume
 * @param request       the request causing the read; may be NULL
 * @param physicalPage  the physical page number to read
 * @param syncRead      whether to force a synchronous read; reads are also
 *                      made synchronous when rebuilding or when there is no
 *                      request (or no session) to restart afterwards
 * @param pagePtr       set to the cached page on a synchronous read; on the
 *                      asynchronous path this function does not return
 *                      UDS_SUCCESS, so *pagePtr is not set
 *
 * @return UDS_SUCCESS, UDS_QUEUED (asynchronous path), or an error code
 **/
static int readPageLocked(Volume        *volume,
                          Request       *request,
                          unsigned int   physicalPage,
                          bool           syncRead,
                          CachedPage   **pagePtr)
{
  syncRead |= ((volume->lookupMode == LOOKUP_FOR_REBUILD)
               || (request == NULL)
               || (request->session == NULL));

  int result = UDS_SUCCESS;

  CachedPage *page = NULL;
  if (syncRead) {
    // Find a place to put the page.
    result = selectVictimInCache(volume->pageCache, &page);
    if (result != UDS_SUCCESS) {
      logWarning("Error selecting cache victim for page read");
      return result;
    }
    result = readVolumePage(&volume->volumeStore, physicalPage,
                            &page->cp_pageData);
    if (result != UDS_SUCCESS) {
      logWarning("Error reading page %u from volume", physicalPage);
      cancelPageInCache(volume->pageCache, physicalPage, page);
      return result;
    }
    if (!isRecordPage(volume->geometry, physicalPage)) {
      // Index pages also need their chapter index view initialized.
      result = initializeIndexPage(volume, physicalPage, page);
      if (result != UDS_SUCCESS) {
        if (volume->lookupMode != LOOKUP_FOR_REBUILD) {
          logWarning("Corrupt index page %u", physicalPage);
        }
        cancelPageInCache(volume->pageCache, physicalPage, page);
        return result;
      }
    }
    result = putPageInCache(volume->pageCache, physicalPage, page);
    if (result != UDS_SUCCESS) {
      logWarning("Error putting page %u in cache", physicalPage);
      cancelPageInCache(volume->pageCache, physicalPage, page);
      return result;
    }
  } else {
    // Hand the read off to a reader thread; the request is restarted when
    // the read completes.
    result = enqueuePageRead(volume, request, physicalPage);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  *pagePtr = page;

  return UDS_SUCCESS;
}

/**
 * Get a page from the cache, performing a synchronous read on a miss.
 * Must be called with the readThreadsMutex held.
 *
 * @param volume        the volume
 * @param request       the originating request; may be NULL
 * @param physicalPage  the physical page number wanted
 * @param probeType     the cache probe type for statistics
 * @param pagePtr       set to the cached page on success
 *
 * @return UDS_SUCCESS or an error code
 **/
int getPageLocked(Volume          *volume,
                  Request         *request,
                  unsigned int     physicalPage,
                  CacheProbeType   probeType,
                  CachedPage     **pagePtr)
{
  CachedPage *page = NULL;
  int result = getPageFromCache(volume->pageCache, physicalPage, probeType,
                                &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  if (page != NULL) {
    // Only 1 zone is responsible for updating LRU
    if (getZoneNumber(request) == 0) {
      makePageMostRecent(volume->pageCache, page);
    }
  } else {
    // Cache miss: read the page synchronously on this thread.
    result = readPageLocked(volume, request, physicalPage, true, &page);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  *pagePtr = page;
  return UDS_SUCCESS;
}

/**
 * Get a page from the cache on behalf of a zone thread, enqueuing an
 * asynchronous read on a miss.  The caller must have a pending search on
 * the page cache for its zone (and must not hold the readThreadsMutex);
 * on every return path a pending search is back in place.  The ordering
 * of beginPendingSearch/endPendingSearch relative to the mutex operations
 * below is deliberate — see the inline comments.
 *
 * @param volume        the volume
 * @param request       the originating request; may be NULL
 * @param physicalPage  the physical page number wanted
 * @param probeType     the cache probe type for statistics
 * @param pagePtr       set to the cached page on UDS_SUCCESS
 *
 * @return UDS_SUCCESS, UDS_QUEUED if a read was enqueued, or an error code
 **/
int getPageProtected(Volume          *volume,
                     Request         *request,
                     unsigned int     physicalPage,
                     CacheProbeType   probeType,
                     CachedPage     **pagePtr)
{
  CachedPage *page = NULL;
  int result = getPageFromCache(volume->pageCache, physicalPage,
                                probeType | CACHE_PROBE_IGNORE_FAILURE,
                                &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  unsigned int zoneNumber = getZoneNumber(request);
  // If we didn't find a page we need to enqueue a read for it, in which
  // case we need to grab the mutex.
  if (page == NULL) {
    endPendingSearch(volume->pageCache, zoneNumber);
    lockMutex(&volume->readThreadsMutex);

    /*
     * Do the lookup again while holding the read mutex (no longer the fast
     * case so this should be ok to repeat). We need to do this because an
     * page may have been added to the page map by the reader thread between
     * the time searched above and the time we went to actually try to enqueue
     * it below. This could result in us enqueuing another read for an page
     * which is already in the cache, which would mean we end up with two
     * entries in the cache for the same page.
     */
    result
      = getPageFromCache(volume->pageCache, physicalPage, probeType, &page);
    if (result != UDS_SUCCESS) {
      /*
       * In non-success cases (anything not UDS_SUCCESS, meaning both
       * UDS_QUEUED and "real" errors), the caller doesn't get a
       * handle on a cache page, so it can't continue the search, and
       * we don't need to prevent other threads from messing with the
       * cache.
       *
       * However, we do need to set the "search pending" flag because
       * the callers expect it to always be set on return, even if
       * they can't actually do the search.
       *
       * Doing the calls in this order ought to be faster, since we
       * let other threads have the reader thread mutex (which can
       * require a syscall) ASAP, and set the "search pending" state
       * that can block the reader thread as the last thing.
       */
      unlockMutex(&volume->readThreadsMutex);
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
      return result;
    }

    // If we found the page now, we can release the mutex and proceed
    // as if this were the fast case.
    if (page != NULL) {
      /*
       * If we found a page (*pagePtr != NULL and return
       * UDS_SUCCESS), then we're telling the caller where to look for
       * the cache page, and need to switch to "reader thread
       * unlocked" and "search pending" state in careful order so no
       * other thread can mess with the data before our caller gets to
       * look at it.
       */
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
      unlockMutex(&volume->readThreadsMutex);
    }
  }

  if (page == NULL) {
    // Still a miss: enqueue an asynchronous read (mutex is still held here).
    result = readPageLocked(volume, request, physicalPage, false, &page);
    if (result != UDS_SUCCESS) {
      /*
       * This code path is used frequently in the UDS_QUEUED case, so
       * the performance gain from unlocking first, while "search
       * pending" mode is off, turns out to be significant in some
       * cases.
       */
      unlockMutex(&volume->readThreadsMutex);
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
      return result;
    }

    // See above re: ordering requirement.
    beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
    unlockMutex(&volume->readThreadsMutex);
  } else {
    if (getZoneNumber(request) == 0 ) {
      // Only 1 zone is responsible for updating LRU
      makePageMostRecent(volume->pageCache, page);
    }
  }

  *pagePtr = page;
  return UDS_SUCCESS;
}

/**
 * Fetch a page of a chapter, returning pointers to its raw data and/or its
 * chapter index view.  Serializes with the reader threads via the
 * readThreadsMutex.
 *
 * @param volume        the volume
 * @param chapter       the physical chapter number
 * @param pageNumber    the page number within the chapter
 * @param probeType     the cache probe type for statistics
 * @param dataPtr       if not NULL, receives the raw page data (or NULL)
 * @param indexPagePtr  if not NULL, receives the index page view (or NULL)
 *
 * @return UDS_SUCCESS or an error code
 **/
int getPage(Volume          *volume,
            unsigned int     chapter,
            unsigned int     pageNumber,
            CacheProbeType   probeType,
            byte           **dataPtr,
            DeltaIndexPage **indexPagePtr)
{
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, chapter, pageNumber);

  CachedPage *page = NULL;
  lockMutex(&volume->readThreadsMutex);
  int result = getPageLocked(volume, NULL, physicalPage, probeType, &page);
  unlockMutex(&volume->readThreadsMutex);

  if (dataPtr != NULL) {
    *dataPtr = (page == NULL) ? NULL : getPageData(&page->cp_pageData);
  }
  if (indexPagePtr != NULL) {
    *indexPagePtr = (page == NULL) ? NULL : &page->cp_indexPage;
  }
  return result;
}

/**
 * Search for a chunk name in a cached index page or chapter index, returning
 * the record page number from a chapter index match.
 *
 * The caller must not already have a pending search on the page cache; one
 * is begun here and ended before returning.
 *
 * @param volume           the volume containing the index page to search
 * @param request          the request originating the search (may be NULL for
 *                         a direct query from volume replay)
 * @param name             the name of the block or chunk
 * @param chapter          the chapter to search
 * @param indexPageNumber  the index page number of the page to search
 * @param recordPageNumber pointer to return the chapter record page number
 *                         (value will be NO_CHAPTER_INDEX_ENTRY if the name
 *                         was not found)
 *
 * @return UDS_SUCCESS or an error code
 **/
static int searchCachedIndexPage(Volume             *volume,
                                 Request            *request,
                                 const UdsChunkName *name,
                                 unsigned int        chapter,
                                 unsigned int        indexPageNumber,
                                 int                *recordPageNumber)
{
  unsigned int zoneNumber = getZoneNumber(request);
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, chapter, indexPageNumber);

  /*
   * Make sure the invalidate counter is updated before we try and read from
   * the page map.  This prevents this thread from reading a page in the
   * page map which has already been marked for invalidation by the reader
   * thread, before the reader thread has noticed that the invalidateCounter
   * has been incremented.
   */
  beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);

  CachedPage *page = NULL;
  int result = getPageProtected(volume, request, physicalPage,
                                cacheProbeType(request, true), &page);
  if (result != UDS_SUCCESS) {
    endPendingSearch(volume->pageCache, zoneNumber);
    return result;
  }

  // If this fails, the search is (unexpectedly) not pending, so there is
  // no pending search to end before returning.
  result
    = ASSERT_LOG_ONLY(searchPending(getInvalidateCounter(volume->pageCache,
                                                         zoneNumber)),
                      "Search is pending for zone %u", zoneNumber);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = searchChapterIndexPage(&page->cp_indexPage, volume->geometry, name,
                                  recordPageNumber);
  endPendingSearch(volume->pageCache, zoneNumber);
  return result;
}

/**
 * Search a cached record page for a chunk name, reading the page into the
 * cache if necessary.  A pending search is begun here and ended before
 * returning (the caller must not already have one).
 *
 * @param volume            the volume
 * @param request           the request originating the search; may be NULL
 * @param name              the name of the block or chunk
 * @param chapter           the physical chapter to search
 * @param recordPageNumber  the record page number from the chapter index
 *                          (NO_CHAPTER_INDEX_ENTRY means no record exists)
 * @param duplicate         receives the metadata of a matching record
 * @param found             set to true iff a matching record was found
 *
 * @return UDS_SUCCESS or an error code
 **/
int searchCachedRecordPage(Volume             *volume,
                           Request            *request,
                           const UdsChunkName *name,
                           unsigned int        chapter,
                           int                 recordPageNumber,
                           UdsChunkData       *duplicate,
                           bool               *found)
{
  *found = false;

  if (recordPageNumber == NO_CHAPTER_INDEX_ENTRY) {
    // No record for that name can exist in the chapter.
    return UDS_SUCCESS;
  }

  Geometry *geometry = volume->geometry;
  int result = ASSERT(((recordPageNumber >= 0)
                       && ((unsigned int) recordPageNumber
                           < geometry->recordPagesPerChapter)),
                      "0 <= %d <= %u",
                      recordPageNumber, geometry->recordPagesPerChapter);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Record pages follow the index pages within each chapter.
  unsigned int pageNumber = geometry->indexPagesPerChapter + recordPageNumber;

  unsigned int zoneNumber = getZoneNumber(request);
  // Use unsigned int for the physical page, consistent with the other page
  // computations in this file (this was previously declared int).
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, chapter, pageNumber);

  /*
   * Make sure the invalidate counter is updated before we try and read from
   * the page map. This prevents this thread from reading a page in the page
   * map which has already been marked for invalidation by the reader thread,
   * before the reader thread has noticed that the invalidateCounter has been
   * incremented.
   */
  beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);

  CachedPage *recordPage;
  result = getPageProtected(volume, request, physicalPage,
                            cacheProbeType(request, false), &recordPage);
  if (result != UDS_SUCCESS) {
    endPendingSearch(volume->pageCache, zoneNumber);
    return result;
  }

  if (searchRecordPage(getPageData(&recordPage->cp_pageData), name, geometry,
                       duplicate)) {
    *found = true;
  }
  endPendingSearch(volume->pageCache, zoneNumber);
  return UDS_SUCCESS;
}

/**
 * Read all the index pages of a chapter directly from the volume and
 * initialize the corresponding DeltaIndexPage views.
 *
 * @param volume          the volume
 * @param virtualChapter  the virtual chapter number to read
 * @param volumePages     array receiving the raw index page data
 * @param indexPages      array receiving the initialized index page views
 *
 * @return UDS_SUCCESS or the first error encountered
 **/
int readChapterIndexFromVolume(const Volume       *volume,
                               uint64_t            virtualChapter,
                               struct volume_page  volumePages[],
                               DeltaIndexPage      indexPages[])
{
  const Geometry *geometry = volume->geometry;
  unsigned int physicalChapter = mapToPhysicalChapter(geometry,
                                                      virtualChapter);
  int physicalPage = mapToPhysicalPage(geometry, physicalChapter, 0);
  prefetchVolumePages(&volume->volumeStore, physicalPage,
                      geometry->indexPagesPerChapter);

  struct volume_page volumePage;
  int result = initializeVolumePage(geometry, &volumePage);
  if (result != UDS_SUCCESS) {
    // Previously this result was never checked; fail fast instead.
    return result;
  }

  // Note: "result" must not be redeclared inside the loop.  A shadowed
  // declaration here previously caused read and initialization errors to be
  // discarded, making the function return UDS_SUCCESS after a failed read.
  unsigned int i;
  for (i = 0; i < geometry->indexPagesPerChapter; i++) {
    result = readVolumePage(&volume->volumeStore, physicalPage + i,
                            &volumePages[i]);
    if (result != UDS_SUCCESS) {
      break;
    }
    byte *indexPage = getPageData(&volumePages[i]);
    result = initChapterIndexPage(volume, indexPage, physicalChapter, i,
                                  &indexPages[i]);
    if (result != UDS_SUCCESS) {
      break;
    }
  }
  destroyVolumePage(&volumePage);
  return result;
}

/**
 * Look up a chunk name in the page cache view of a chapter: consult the
 * index page map to find the right index page, search that page, then
 * search the record page it identifies.
 *
 * @param volume          the volume
 * @param request         the originating request; may be NULL
 * @param name            the name of the block or chunk
 * @param virtualChapter  the virtual chapter to search
 * @param metadata        receives the metadata of a matching record
 * @param found           set to true iff a matching record was found
 *
 * @return UDS_SUCCESS or an error code
 **/
int searchVolumePageCache(Volume             *volume,
                          Request            *request,
                          const UdsChunkName *name,
                          uint64_t            virtualChapter,
                          UdsChunkData       *metadata,
                          bool               *found)
{
  // Determine which index page of the chapter could contain the name.
  unsigned int physicalChapter
    = mapToPhysicalChapter(volume->geometry, virtualChapter);
  unsigned int indexPageNumber;
  int result = findIndexPageNumber(volume->indexPageMap, name,
                                   physicalChapter, &indexPageNumber);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Search the chapter index page, then the record page it points at.
  int recordPageNumber;
  result = searchCachedIndexPage(volume, request, name, physicalChapter,
                                 indexPageNumber, &recordPageNumber);
  if (result != UDS_SUCCESS) {
    return result;
  }

  return searchCachedRecordPage(volume, request, name, physicalChapter,
                                recordPageNumber, metadata, found);
}

/**
 * Invalidate all cached pages of a chapter, e.g. when it is being
 * overwritten or has been found invalid.
 *
 * @param volume          the volume
 * @param virtualChapter  the virtual chapter to forget
 * @param reason          why the chapter is being invalidated
 *
 * @return UDS_SUCCESS or an error code
 **/
int forgetChapter(Volume             *volume,
                  uint64_t            virtualChapter,
                  InvalidationReason  reason)
{
  logDebug("forgetting chapter %llu", virtualChapter);
  unsigned int physicalChapter
    = mapToPhysicalChapter(volume->geometry, virtualChapter);

  // Invalidation must be serialized with the reader threads.
  lockMutex(&volume->readThreadsMutex);
  int result = invalidatePageCacheForChapter(volume->pageCache,
                                             physicalChapter,
                                             volume->geometry->pagesPerChapter,
                                             reason);
  unlockMutex(&volume->readThreadsMutex);
  return result;
}

/**
 * Donate index page data to the page cache for an index page that was just
 * written to the volume.  The caller must already hold the reader thread
 * mutex.
 *
 * @param volume           the volume
 * @param physicalChapter  the physical chapter number of the index page
 * @param indexPageNumber  the chapter page number of the index page
 * @param scratchPage      the index page data (swapped into the cache page)
 *
 * @return UDS_SUCCESS or an error code
 **/
static int donateIndexPageLocked(Volume             *volume,
                                 unsigned int        physicalChapter,
                                 unsigned int        indexPageNumber,
                                 struct volume_page *scratchPage)
{
  unsigned int physicalPage
    = mapToPhysicalPage(volume->geometry, physicalChapter, indexPageNumber);

  // Find a place to put the page.
  CachedPage *page = NULL;
  int result = selectVictimInCache(volume->pageCache, &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Exchange the scratch page with the cache page
  swapVolumePages(&page->cp_pageData, scratchPage);

  result = initChapterIndexPage(volume, getPageData(&page->cp_pageData),
                                physicalChapter, indexPageNumber,
                                &page->cp_indexPage);
  if (result != UDS_SUCCESS) {
    // Fixed log message grammar (was "Error initialize..."), matching the
    // wording used by the reader thread.
    logWarning("Error initializing chapter index page");
    cancelPageInCache(volume->pageCache, physicalPage, page);
    return result;
  }

  result = putPageInCache(volume->pageCache, physicalPage, page);
  if (result != UDS_SUCCESS) {
    logWarning("Error putting page %u in cache", physicalPage);
    cancelPageInCache(volume->pageCache, physicalPage, page);
    return result;
  }

  return UDS_SUCCESS;
}

/**
 * Write the index pages of a closed chapter to the volume.  Each page is
 * packed from the open chapter index, written to the volume store, copied
 * to the caller's page array (if provided), recorded in the index page
 * map, and finally donated to the page cache.
 *
 * @param volume        the volume
 * @param physicalPage  the physical page number of the chapter's first page
 * @param chapterIndex  the open chapter index being written out
 * @param pages         if not NULL, an array receiving a copy of each
 *                      index page's data
 *
 * @return UDS_SUCCESS or an error code
 **/
int writeIndexPages(Volume            *volume,
                    int                physicalPage,
                    OpenChapterIndex  *chapterIndex,
                    byte             **pages)
{
  Geometry *geometry = volume->geometry;
  unsigned int physicalChapterNumber
    = mapToPhysicalChapter(geometry, chapterIndex->virtualChapterNumber);
  // Number of the next delta list to pack; advanced page by page.
  unsigned int deltaListNumber = 0;

  unsigned int indexPageNumber;
  for (indexPageNumber = 0;
       indexPageNumber < geometry->indexPagesPerChapter;
       indexPageNumber++) {
    int result = prepareToWriteVolumePage(&volume->volumeStore,
                                          physicalPage + indexPageNumber,
                                          &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result, "failed to prepare index page");
    }

    // Pack as many delta lists into the index page as will fit.
    unsigned int listsPacked;
    bool lastPage = ((indexPageNumber + 1) == geometry->indexPagesPerChapter);
    result = packOpenChapterIndexPage(chapterIndex,
                                      getPageData(&volume->scratchPage),
                                      deltaListNumber, lastPage, &listsPacked);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result, "failed to pack index page");
    }

    result = writeVolumePage(&volume->volumeStore,
                             physicalPage + indexPageNumber,
                             &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to write chapter index page");
    }

    if (pages != NULL) {
      memcpy(pages[indexPageNumber], getPageData(&volume->scratchPage),
             geometry->bytesPerPage);
    }

    // Tell the index page map the list number of the last delta list that was
    // packed into the index page.
    // NOTE(review): if listsPacked is 0 on the very first page,
    // deltaListNumber - 1 wraps around (unsigned) — confirm the map treats
    // this as the intended "empty page" encoding.
    if (listsPacked == 0) {
      logDebug("no delta lists packed on chapter %u page %u",
               physicalChapterNumber, indexPageNumber);
    } else {
      deltaListNumber += listsPacked;
    }
    result = updateIndexPageMap(volume->indexPageMap,
                                chapterIndex->virtualChapterNumber,
                                physicalChapterNumber,
                                indexPageNumber, deltaListNumber - 1);
    if (result != UDS_SUCCESS) {
      return logErrorWithStringError(result,
                                     "failed to update index page map");
    }

    // Donate the page data for the index page to the page cache.
    lockMutex(&volume->readThreadsMutex);
    result = donateIndexPageLocked(volume, physicalChapterNumber,
                                   indexPageNumber, &volume->scratchPage);
    unlockMutex(&volume->readThreadsMutex);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }
  return UDS_SUCCESS;
}

/**
 * Write the record pages of a closed chapter to the volume, encoding each
 * page of records as a binary tree in heap order.
 *
 * @param volume        the volume
 * @param physicalPage  the physical page number of the chapter's first page
 * @param records       the chapter's records (1-based array)
 * @param pages         if not NULL, an array receiving a copy of each
 *                      record page's data
 *
 * @return UDS_SUCCESS or an error code
 **/
int writeRecordPages(Volume                *volume,
                     int                    physicalPage,
                     const UdsChunkRecord   records[],
                     byte                 **pages)
{
  Geometry *geometry = volume->geometry;
  // Record pages are laid out after the chapter's index pages.
  int firstRecordPage = physicalPage + geometry->indexPagesPerChapter;
  // The record array from the open chapter is 1-based.
  const UdsChunkRecord *nextRecord = &records[1];

  unsigned int pageNumber;
  for (pageNumber = 0;
       pageNumber < geometry->recordPagesPerChapter;
       pageNumber++) {
    int targetPage = firstRecordPage + pageNumber;
    int result = prepareToWriteVolumePage(&volume->volumeStore, targetPage,
                                          &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to prepare record page");
    }

    // Sort the next page of records and copy them to the record page as a
    // binary tree stored in heap order.
    result = encodeRecordPage(volume, nextRecord,
                              getPageData(&volume->scratchPage));
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to encode record page %u",
                                       pageNumber);
    }
    nextRecord += geometry->recordsPerPage;

    result = writeVolumePage(&volume->volumeStore, targetPage,
                             &volume->scratchPage);
    if (result != UDS_SUCCESS) {
      return logWarningWithStringError(result,
                                       "failed to write chapter record page");
    }

    if (pages != NULL) {
      memcpy(pages[pageNumber], getPageData(&volume->scratchPage),
             geometry->bytesPerPage);
    }
  }
  return UDS_SUCCESS;
}

/**********************************************************************/
int writeChapter(Volume                 *volume,
                 OpenChapterIndex       *chapterIndex,
                 const UdsChunkRecord    records[])
{
  // Locate the chapter's slot in the volume file.
  Geometry *geometry = volume->geometry;
  unsigned int chapterSlot
    = mapToPhysicalChapter(geometry, chapterIndex->virtualChapterNumber);
  int firstPage = mapToPhysicalPage(geometry, chapterSlot, 0);

  // Pack and write the delta chapter index pages, then sort and write the
  // record pages. A failure in either step aborts the chapter write.
  int result = writeIndexPages(volume, firstPage, chapterIndex, NULL);
  if (result == UDS_SUCCESS) {
    result = writeRecordPages(volume, firstPage, records, NULL);
  }
  if (result != UDS_SUCCESS) {
    return result;
  }

  releaseVolumePage(&volume->scratchPage);
  // Make sure the chapter is durable before returning.
  return syncVolumeStore(&volume->volumeStore);
}

/**********************************************************************/
size_t getCacheSize(Volume *volume)
{
  // The sparse cache exists only for sparse geometries, so only count its
  // memory when it is present.
  size_t total = getPageCacheSize(volume->pageCache);
  return (isSparse(volume->geometry)
          ? total + getSparseCacheMemorySize(volume->sparseCache)
          : total);
}

/**********************************************************************/
/**
 * Check that all the index pages of a physical chapter are consistent with
 * each other, and determine the virtual chapter number they claim to hold.
 *
 * @param volume                The volume to probe
 * @param chapterNumber         The physical chapter number to check
 * @param virtualChapterNumber  Set to the chapter's virtual chapter number
 *
 * @return UDS_SUCCESS, UDS_CORRUPT_COMPONENT if the chapter is inconsistent,
 *         or another error from reading a page
 **/
static int probeChapter(Volume       *volume,
                        unsigned int  chapterNumber,
                        uint64_t     *virtualChapterNumber)
{
  const Geometry *geometry = volume->geometry;
  unsigned int expectedListNumber = 0;
  uint64_t lastVCN = UINT64_MAX;

  prefetchVolumePages(&volume->volumeStore,
                      mapToPhysicalPage(geometry, chapterNumber, 0),
                      geometry->indexPagesPerChapter);

  unsigned int i;
  for (i = 0; i < geometry->indexPagesPerChapter; ++i) {
    DeltaIndexPage *page;
    int result = getPage(volume, chapterNumber, i, CACHE_PROBE_INDEX_FIRST,
                         NULL, &page);
    if (result != UDS_SUCCESS) {
      return result;
    }

    // Every index page in the chapter must agree on the virtual chapter
    // number; remember the first one seen and compare the rest against it.
    // Use PRIu64 consistently: %llu is a type mismatch for uint64_t on
    // platforms where uint64_t is unsigned long.
    uint64_t vcn = page->virtualChapterNumber;
    if (lastVCN == UINT64_MAX) {
      lastVCN = vcn;
    } else if (vcn != lastVCN) {
      logError("inconsistent chapter %u index page %u: expected vcn %"
               PRIu64 ", got vcn %" PRIu64,
               chapterNumber, i, lastVCN, vcn);
      return UDS_CORRUPT_COMPONENT;
    }

    // The delta lists must be packed contiguously across the index pages.
    if (expectedListNumber != page->lowestListNumber) {
      logError("inconsistent chapter %u index page %u: expected list number %u"
               ", got list number %u",
               chapterNumber, i, expectedListNumber, page->lowestListNumber);
      return UDS_CORRUPT_COMPONENT;
    }
    expectedListNumber = page->highestListNumber + 1;

    result = validateChapterIndexPage(page, geometry);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  if (lastVCN == UINT64_MAX) {
    logError("no chapter %u virtual chapter number determined", chapterNumber);
    return UDS_CORRUPT_COMPONENT;
  }
  // The virtual chapter number must map back to this physical chapter.
  if (chapterNumber != lastVCN % geometry->chaptersPerVolume) {
    logError("chapter %u vcn %" PRIu64 " is out of phase (%u)",
             chapterNumber, lastVCN, geometry->chaptersPerVolume);
    return UDS_CORRUPT_COMPONENT;
  }
  *virtualChapterNumber = lastVCN;
  return UDS_SUCCESS;
}

/**********************************************************************/
static int probeWrapper(void         *aux,
                        unsigned int  chapterNumber,
                        uint64_t     *virtualChapterNumber)
{
  Volume *volume = aux;
  int result = probeChapter(volume, chapterNumber, virtualChapterNumber);
  if ((result != UDS_CORRUPT_COMPONENT) && (result != UDS_CORRUPT_DATA)) {
    return result;
  }
  // Report a corrupt chapter as "no valid VCN" rather than as an error.
  *virtualChapterNumber = UINT64_MAX;
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Find where the valid data at the end of the volume stops, given that any
 * corrupt region is at the tail.
 *
 * @param volume    The volume to probe
 * @param limit     One more than the highest chapter number to consider
 * @param limitPtr  If not NULL, set to the number of chapters before the
 *                  corrupt tail region (0 if nothing valid was found)
 *
 * @return UDS_SUCCESS, or an error if a chapter probe failed for a reason
 *         other than corruption
 **/
static int findRealEndOfVolume(Volume       *volume,
                               unsigned int  limit,
                               unsigned int *limitPtr)
{
  /*
   * Start checking from the end of the volume. As long as we hit corrupt
   * data, start skipping larger and larger amounts until we find real data.
   * If we find real data, reduce the span and try again until we find
   * the exact boundary.
   */
  unsigned int span = 1;
  unsigned int tries = 0;
  while (limit > 0) {
    // Probe the chapter 'span' chapters below the current limit (clamped
    // to chapter 0).
    unsigned int chapter = (span > limit) ? 0 : limit - span;
    uint64_t vcn = 0;
    int result = probeChapter(volume, chapter, &vcn);
    if (result == UDS_SUCCESS) {
      if (span == 1) {
        // The chapter just below the limit is good, so the limit is exact.
        break;
      }
      // We jumped past the boundary into good data; narrow the search.
      span /= 2;
      tries = 0;
    } else if (result == UDS_CORRUPT_COMPONENT) {
      // Still corrupt here; everything at or above this chapter is suspect.
      limit = chapter;
      // Only widen the stride after repeated corrupt probes at this size.
      if (++tries > 1) {
        span *= 2;
      }
    } else {
      // A non-corruption failure (e.g. an I/O error) is fatal.
      return logErrorWithStringError(result, "cannot determine end of volume");
    }
  }

  if (limitPtr != NULL) {
    *limitPtr = limit;
  }
  return UDS_SUCCESS;
}

/**********************************************************************/
int findVolumeChapterBoundaries(Volume   *volume,
                                uint64_t *lowestVCN,
                                uint64_t *highestVCN,
                                bool     *isEmpty)
{
  unsigned int chapterLimit = volume->geometry->chaptersPerVolume;

  int result = findRealEndOfVolume(volume, chapterLimit, &chapterLimit);
  if (result != UDS_SUCCESS) {
    return logErrorWithStringError(result, "cannot find end of volume");
  }

  if (chapterLimit > 0) {
    // There is valid data; scan it for the lowest and highest VCNs.
    *isEmpty = false;
    return findVolumeChapterBoundariesImpl(chapterLimit, MAX_BAD_CHAPTERS,
                                           lowestVCN, highestVCN, probeWrapper,
                                           volume);
  }

  // Nothing readable at all: report an empty volume.
  *lowestVCN = 0;
  *highestVCN = 0;
  *isEmpty = true;
  return UDS_SUCCESS;
}

/**********************************************************************/
int findVolumeChapterBoundariesImpl(unsigned int  chapterLimit,
                                    unsigned int  maxBadChapters,
                                    uint64_t     *lowestVCN,
                                    uint64_t     *highestVCN,
                                    int (*probeFunc)(void         *aux,
                                                     unsigned int  chapter,
                                                     uint64_t     *vcn),
                                    void *aux)
{
  if (chapterLimit == 0) {
    *lowestVCN = 0;
    *highestVCN = 0;
    return UDS_SUCCESS;
  }

  /*
   * This method assumes there is at most one run of contiguous bad chapters
   * caused by unflushed writes. Either the bad spot is at the beginning and
   * end, or somewhere in the middle. Wherever it is, the highest and lowest
   * VCNs are adjacent to it. Otherwise the volume is cleanly saved and
   * somewhere in the middle of it the highest VCN immediately precedes the
   * lowest one.
   */

  uint64_t firstVCN = UINT64_MAX;

  // doesn't matter if this results in a bad spot (UINT64_MAX)
  int result = (*probeFunc)(aux, 0, &firstVCN);
  if (result != UDS_SUCCESS) {
    // A real probe failure must be propagated, not swallowed: corruption has
    // already been mapped to UINT64_MAX by the probe function, so any error
    // here is something like an I/O failure, and the output parameters have
    // not been set. (Previously this returned UDS_SUCCESS, leaving
    // *lowestVCN/*highestVCN uninitialized.)
    return result;
  }

  /*
   * Binary search for end of the discontinuity in the monotonically
   * increasing virtual chapter numbers; bad spots are treated as a span of
   * UINT64_MAX values. In effect we're searching for the index of the
   * smallest value less than firstVCN. In the case we go off the end it means
   * that chapter 0 has the lowest vcn.
   */

  unsigned int leftChapter = 0;
  unsigned int rightChapter = chapterLimit;

  while (leftChapter < rightChapter) {
    unsigned int chapter = (leftChapter + rightChapter) / 2;
    uint64_t probeVCN;

    result = (*probeFunc)(aux, chapter, &probeVCN);
    if (result != UDS_SUCCESS) {
      return result;
    }
    if (firstVCN <= probeVCN) {
      leftChapter = chapter + 1;
    } else {
      rightChapter = chapter;
    }
  }

  uint64_t lowest = UINT64_MAX;
  uint64_t highest = UINT64_MAX;

  result = ASSERT(leftChapter == rightChapter, "leftChapter == rightChapter");
  if (result != UDS_SUCCESS) {
    return result;
  }

  leftChapter %= chapterLimit;  // in case we're at the end

  // At this point, leftChapter is the chapter with the lowest virtual chapter
  // number.

  result = (*probeFunc)(aux, leftChapter, &lowest);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = ASSERT((lowest != UINT64_MAX), "invalid lowest chapter");
  if (result != UDS_SUCCESS) {
    return result;
  }

  // We now circularly scan backwards, moving over any bad chapters until we
  // find the chapter with the highest vcn (the first good chapter we
  // encounter).

  unsigned int badChapters = 0;

  for (;;) {
    rightChapter = (rightChapter + chapterLimit - 1) % chapterLimit;
    result = (*probeFunc)(aux, rightChapter, &highest);
    if (result != UDS_SUCCESS) {
      return result;
    }
    if (highest != UINT64_MAX) {
      break;
    }
    if (++badChapters >= maxBadChapters) {
      logError("too many bad chapters in volume: %u", badChapters);
      return UDS_CORRUPT_COMPONENT;
    }
  }

  *lowestVCN = lowest;
  *highestVCN = highest;
  return UDS_SUCCESS;
}

/**
 * Allocate a volume.
 *
 * @param config            The configuration to use
 * @param layout            The index layout
 * @param readQueueMaxSize  The maximum size of the read queue
 * @param zoneCount         The number of zones to use
 * @param newVolume         A pointer to hold the new volume
 *
 * @return UDS_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int allocateVolume(const Configuration  *config,
                          IndexLayout          *layout,
                          unsigned int          readQueueMaxSize,
                          unsigned int          zoneCount,
                          Volume              **newVolume)
{
  Volume *volume;
  int result = ALLOCATE(1, Volume, "volume", &volume);
  if (result != UDS_SUCCESS) {
    return result;
  }
  volume->nonce = getVolumeNonce(layout);
  // From here on, freeVolume can clean up and close whatever part of the
  // volume has been set up so far.

  result = copyGeometry(config->geometry, &volume->geometry);
  if (result != UDS_SUCCESS) {
    freeVolume(volume);
    return logWarningWithStringError(result,
                                     "failed to allocate geometry: error");
  }

  // The volume store needs one buffer per page cache entry, one for the
  // chapter writer, and (for sparse indexes) one per sparse cache entry.
  unsigned int reservedBuffers
    = 1 + (config->cacheChapters * config->geometry->recordPagesPerChapter);
  if (isSparse(volume->geometry)) {
    reservedBuffers
      += config->cacheChapters * config->geometry->indexPagesPerChapter;
  }
  result = openVolumeStore(&volume->volumeStore, layout, reservedBuffers,
                           config->geometry->bytesPerPage);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  result = initializeVolumePage(config->geometry, &volume->scratchPage);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  result = makeRadixSorter(config->geometry->recordsPerPage,
                           &volume->radixSorter);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  result = ALLOCATE(config->geometry->recordsPerPage, const UdsChunkRecord *,
                    "record pointers", &volume->recordPointers);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  if (isSparse(volume->geometry)) {
    result = makeSparseCache(volume->geometry, config->cacheChapters,
                             zoneCount, &volume->sparseCache);
    if (result != UDS_SUCCESS) {
      goto fail;
    }
  }

  result = makePageCache(volume->geometry, config->cacheChapters,
                         readQueueMaxSize, zoneCount, &volume->pageCache);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  result = makeIndexPageMap(volume->geometry, &volume->indexPageMap);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  *newVolume = volume;
  return UDS_SUCCESS;

fail:
  freeVolume(volume);
  return result;
}

/**********************************************************************/
int makeVolume(const Configuration          *config,
               IndexLayout                  *layout,
               const struct uds_parameters  *userParams,
               unsigned int                  readQueueMaxSize,
               unsigned int                  zoneCount,
               Volume                      **newVolume)
{
  unsigned int volumeReadThreads = getReadThreads(userParams);

  // The read queue must be able to hold more entries than the threads that
  // consume it.
  if (readQueueMaxSize <= volumeReadThreads) {
    logError("Number of read threads must be smaller than read queue");
    return UDS_INVALID_ARGUMENT;
  }

  Volume *volume = NULL;
  int result = allocateVolume(config, layout, readQueueMaxSize, zoneCount,
                              &volume);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = initMutex(&volume->readThreadsMutex);
  if (result != UDS_SUCCESS) {
    goto fail;
  }
  result = initCond(&volume->readThreadsReadDoneCond);
  if (result != UDS_SUCCESS) {
    goto fail;
  }
  result = initCond(&volume->readThreadsCond);
  if (result != UDS_SUCCESS) {
    goto fail;
  }

  // Start the reader threads.  If this allocation succeeds, freeVolume knows
  // that it needs to try and stop those threads.
  result = ALLOCATE(volumeReadThreads, Thread, "reader threads",
                    &volume->readerThreads);
  if (result != UDS_SUCCESS) {
    goto fail;
  }
  unsigned int i;
  for (i = 0; i < volumeReadThreads; i++) {
    result = createThread(readThreadFunction, (void *) volume, "reader",
                          &volume->readerThreads[i]);
    if (result != UDS_SUCCESS) {
      goto fail;
    }
    // Record each successful start so only those threads get stopped.
    volume->numReadThreads = i + 1;
  }

  *newVolume = volume;
  return UDS_SUCCESS;

fail:
  freeVolume(volume);
  return result;
}

/**********************************************************************/
/**
 * Clean up and free a volume.  Safe to call on a partially-constructed
 * volume (as allocateVolume and makeVolume do on failure) or on NULL.
 **/
void freeVolume(Volume *volume)
{
  if (volume == NULL) {
    return;
  }

  // If readerThreads is NULL, then we haven't set up the reader threads.
  if (volume->readerThreads != NULL) {
    // Stop the reader threads.  It is ok if there aren't any of them.
    lockMutex(&volume->readThreadsMutex);
    volume->readerState |= READER_STATE_EXIT;
    broadcastCond(&volume->readThreadsCond);
    unlockMutex(&volume->readThreadsMutex);
    // Only join the threads that were actually started (numReadThreads).
    unsigned int i;
    for (i = 0; i < volume->numReadThreads; i++) {
      joinThreads(volume->readerThreads[i]);
    }
    FREE(volume->readerThreads);
    volume->readerThreads = NULL;
  }

  // Must close the volume store AFTER freeing the scratch page and the caches
  destroyVolumePage(&volume->scratchPage);
  freePageCache(volume->pageCache);
  freeSparseCache(volume->sparseCache);
  closeVolumeStore(&volume->volumeStore);

  // Tear down the synchronization primitives and remaining allocations.
  destroyCond(&volume->readThreadsCond);
  destroyCond(&volume->readThreadsReadDoneCond);
  destroyMutex(&volume->readThreadsMutex);
  freeIndexPageMap(volume->indexPageMap);
  freeRadixSorter(volume->radixSorter);
  FREE(volume->geometry);
  FREE(volume->recordPointers);
  FREE(volume);
}