Blame source/uds/volume.c

Packit Service 310c69
/*
Packit Service 310c69
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 310c69
 *
Packit Service 310c69
 * This program is free software; you can redistribute it and/or
Packit Service 310c69
 * modify it under the terms of the GNU General Public License
Packit Service 310c69
 * as published by the Free Software Foundation; either version 2
Packit Service 310c69
 * of the License, or (at your option) any later version.
Packit Service 310c69
 * 
Packit Service 310c69
 * This program is distributed in the hope that it will be useful,
Packit Service 310c69
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 310c69
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 310c69
 * GNU General Public License for more details.
Packit Service 310c69
 * 
Packit Service 310c69
 * You should have received a copy of the GNU General Public License
Packit Service 310c69
 * along with this program; if not, write to the Free Software
Packit Service 310c69
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 310c69
 * 02110-1301, USA. 
Packit Service 310c69
 *
Packit Service 310c69
 * $Id: //eng/uds-releases/jasper/src/uds/volume.c#23 $
Packit Service 310c69
 */
Packit Service 310c69
Packit Service 310c69
#include "volume.h"
Packit Service 310c69
Packit Service 310c69
#include "cacheCounters.h"
Packit Service 310c69
#include "chapterIndex.h"
Packit Service 310c69
#include "compiler.h"
Packit Service 310c69
#include "errors.h"
Packit Service 310c69
#include "geometry.h"
Packit Service 310c69
#include "hashUtils.h"
Packit Service 310c69
#include "indexConfig.h"
Packit Service 310c69
#include "logger.h"
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
#include "permassert.h"
Packit Service 310c69
#include "recordPage.h"
Packit Service 310c69
#include "request.h"
Packit Service 310c69
#include "sparseCache.h"
Packit Service 310c69
#include "stringUtils.h"
Packit Service 310c69
#include "threads.h"
Packit Service 310c69
Packit Service 310c69
/* Tuning limits used by the volume code in this file. */
enum {
  // max number of contiguous bad chapters
  MAX_BAD_CHAPTERS            = 100,
  // default number of reader threads
  DEFAULT_VOLUME_READ_THREADS = 2,
  // maximum number of reader threads
  MAX_VOLUME_READ_THREADS     = 16,
};
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static unsigned int getReadThreads(const struct uds_parameters *userParams)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int readThreads = (userParams == NULL
Packit Service 310c69
                              ? DEFAULT_VOLUME_READ_THREADS
Packit Service 310c69
                              : userParams->read_threads);
Packit Service 310c69
  if (readThreads < 1) {
Packit Service 310c69
    readThreads = 1;
Packit Service 310c69
  }
Packit Service 310c69
  if (readThreads > MAX_VOLUME_READ_THREADS) {
Packit Service 310c69
    readThreads = MAX_VOLUME_READ_THREADS;
Packit Service 310c69
  }
Packit Service 310c69
  return readThreads;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Convert a physical page number into the page's position within its
 * chapter.
 *
 * @param geometry      the geometry supplying pagesPerChapter
 * @param physicalPage  the physical page number (numbering is offset by
 *                      one; see mapToPhysicalPage())
 *
 * @return the zero-based page number within the chapter
 **/
static INLINE unsigned int mapToPageNumber(Geometry     *geometry,
                                           unsigned int  physicalPage)
{
  unsigned int zeroBased = physicalPage - 1;
  return (zeroBased % geometry->pagesPerChapter);
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Convert a physical page number into the number of the chapter that
 * contains it.
 *
 * @param geometry      the geometry supplying pagesPerChapter
 * @param physicalPage  the physical page number (numbering is offset by
 *                      one; see mapToPhysicalPage())
 *
 * @return the zero-based chapter number
 **/
static INLINE unsigned int mapToChapterNumber(Geometry     *geometry,
                                              unsigned int  physicalPage)
{
  unsigned int zeroBased = physicalPage - 1;
  return (zeroBased / geometry->pagesPerChapter);
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Report whether a physical page is a record page rather than an index
 * page. Within a chapter, the first indexPagesPerChapter pages are index
 * pages and every page after them is treated as a record page.
 *
 * @param geometry      the geometry of the volume
 * @param physicalPage  the physical page number to classify
 *
 * @return true if the page lies past the chapter's index pages
 **/
static INLINE bool isRecordPage(Geometry *geometry, unsigned int physicalPage)
{
  unsigned int zeroBased = physicalPage - 1;
  return ((zeroBased % geometry->pagesPerChapter)
          >= geometry->indexPagesPerChapter);
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Get the zone number associated with a request.
 *
 * @param request  the request, which may be NULL
 *
 * @return the request's zoneNumber, or zero when there is no request
 **/
static INLINE unsigned int getZoneNumber(Request *request)
{
  if (request == NULL) {
    return 0;
  }
  return request->zoneNumber;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int mapToPhysicalPage(const Geometry *geometry, int chapter, int page)
Packit Service 310c69
{
Packit Service 310c69
  // Page zero is the header page, so the first index page in the
Packit Service 310c69
  // first chapter is physical page one.
Packit Service 310c69
  return (1 + (geometry->pagesPerChapter * chapter) + page);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Wait until the page cache's read queue is no longer full.
 *
 * This waits on readThreadsReadDoneCond using readThreadsMutex, so it is
 * presumably called with that mutex already held (not verifiable from
 * this function alone).
 *
 * @param volume   the volume whose read queue is full
 * @param request  the request that needs a queue entry (may be NULL, in
 *                 which case zone zero is used)
 **/
static void waitForReadQueueNotFull(Volume *volume, Request *request)
{
  unsigned int      zone     = getZoneNumber(request);
  InvalidateCounter snapshot = getInvalidateCounter(volume->pageCache, zone);
  bool              wasSearching = searchPending(snapshot);

  if (wasSearching) {
    // Leave the pending-search state while we wait. Otherwise the reader
    // threads could be stuck waiting on this zone's counter while the
    // index thread is stuck waiting for room in the read queue, and
    // neither would ever make progress.
    endPendingSearch(volume->pageCache, zone);
  }

  while (readQueueIsFull(volume->pageCache)) {
    logDebug("Waiting until read queue not full");
    // Wake a reader so the queue drains, then sleep until a read completes.
    signalCond(&volume->readThreadsCond);
    waitCond(&volume->readThreadsReadDoneCond, &volume->readThreadsMutex);
  }

  if (wasSearching) {
    // Re-enter the pending-search state for the page that was being
    // searched, which bumps the counter back to an odd value.
    beginPendingSearch(volume->pageCache, pageBeingSearched(snapshot), zone);
  }
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Queue an asynchronous read of a volume page on behalf of a request.
 *
 * @param volume        the volume to read from
 * @param request       the request waiting on the page
 * @param physicalPage  the physical page number to read
 *
 * @return UDS_SHUTTINGDOWN if the reader threads have been told to exit,
 *         otherwise the first result from enqueueRead() that is not
 *         UDS_SUCCESS (UDS_QUEUED when the read was queued)
 **/
int enqueuePageRead(Volume *volume, Request *request, int physicalPage)
{
  // Refuse new work once shutdown has begun. Requests already in the
  // pipeline are still processed by the reader threads.
  if ((volume->readerState & READER_STATE_EXIT) != 0) {
    logInfo("failed to queue read while shutting down");
    return UDS_SHUTTINGDOWN;
  }

  // Record the page as queued in the volume cache so that chapter
  // invalidation is able to cancel the read. A UDS_SUCCESS result from
  // enqueueRead() is handled here as "the queues are full": wait for the
  // readers to drain them and then try again.
  int result;
  for (;;) {
    result = enqueueRead(volume->pageCache, request, physicalPage);
    if (result != UDS_SUCCESS) {
      break;
    }
    logDebug("Read queues full, waiting for reads to finish");
    waitForReadQueueNotFull(volume, request);
  }

  if (result == UDS_QUEUED) {
    // There is new work in the queue, so wake a reader thread.
    signalCond(&volume->readThreadsCond);
  }

  return result;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static INLINE void waitToReserveReadQueueEntry(Volume        *volume,
Packit Service 310c69
                                               unsigned int  *queuePos,
Packit Service 310c69
                                               Request      **requestList,
Packit Service 310c69
                                               unsigned int  *physicalPage,
Packit Service 310c69
                                               bool          *invalid)
Packit Service 310c69
{
Packit Service 310c69
  while (((volume->readerState & READER_STATE_EXIT) == 0)
Packit Service 310c69
         && (((volume->readerState & READER_STATE_STOP) != 0)
Packit Service 310c69
             || !reserveReadQueueEntry(volume->pageCache, queuePos,
Packit Service 310c69
                                       requestList, physicalPage, invalid))) {
Packit Service 310c69
    waitCond(&volume->readThreadsCond, &volume->readThreadsMutex);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static int initChapterIndexPage(const Volume   *volume,
Packit Service 310c69
                                byte           *indexPage,
Packit Service 310c69
                                unsigned int    chapter,
Packit Service 310c69
                                unsigned int    indexPageNumber,
Packit Service 310c69
                                DeltaIndexPage *chapterIndexPage)
Packit Service 310c69
{
Packit Service 310c69
  Geometry *geometry = volume->geometry;
Packit Service 310c69
Packit Service 310c69
  int result = initializeChapterIndexPage(chapterIndexPage, geometry,
Packit Service 310c69
                                          indexPage, volume->nonce);
Packit Service 310c69
  if (volume->lookupMode == LOOKUP_FOR_REBUILD) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return logErrorWithStringError(result,
Packit Service 310c69
                                   "Reading chapter index page for chapter %u"
Packit Service 310c69
                                   " page %u",
Packit Service 310c69
                                   chapter, indexPageNumber);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  IndexPageBounds bounds;
Packit Service 310c69
  result = getListNumberBounds(volume->indexPageMap, chapter,
Packit Service 310c69
                               indexPageNumber, &bounds);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  uint64_t     ciVirtual = chapterIndexPage->virtualChapterNumber;
Packit Service 310c69
  unsigned int ciChapter = mapToPhysicalChapter(geometry, ciVirtual);
Packit Service 310c69
  if ((chapter == ciChapter)
Packit Service 310c69
      && (bounds.lowestList == chapterIndexPage->lowestListNumber)
Packit Service 310c69
      && (bounds.highestList == chapterIndexPage->highestListNumber)) {
Packit Service 310c69
    return UDS_SUCCESS;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  logWarning("Index page map updated to %llu",
Packit Service 310c69
             getLastUpdate(volume->indexPageMap));
Packit Service 310c69
  logWarning("Page map expects that chapter %u page %u has range %u to %u, "
Packit Service 310c69
             "but chapter index page has chapter %" PRIu64
Packit Service 310c69
             " with range %u to %u",
Packit Service 310c69
             chapter, indexPageNumber, bounds.lowestList, bounds.highestList,
Packit Service 310c69
             ciVirtual, chapterIndexPage->lowestListNumber,
Packit Service 310c69
             chapterIndexPage->highestListNumber);
Packit Service 310c69
  return ASSERT_WITH_ERROR_CODE(false,
Packit Service 310c69
                                UDS_CORRUPT_DATA,
Packit Service 310c69
                                "index page map mismatch with chapter index");
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static int initializeIndexPage(const Volume *volume,
Packit Service 310c69
                               unsigned int  physicalPage,
Packit Service 310c69
                               CachedPage   *page)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int chapter = mapToChapterNumber(volume->geometry, physicalPage);
Packit Service 310c69
  unsigned int indexPageNumber = mapToPageNumber(volume->geometry,
Packit Service 310c69
                                                 physicalPage);
Packit Service 310c69
  int result = initChapterIndexPage(volume, getPageData(&page->cp_pageData),
Packit Service 310c69
                                    chapter, indexPageNumber,
Packit Service 310c69
                                    &page->cp_indexPage);
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static void readThreadFunction(void *arg)
Packit Service 310c69
{
Packit Service 310c69
  Volume       *volume  = arg;
Packit Service 310c69
  unsigned int  queuePos;
Packit Service 310c69
  Request      *requestList;
Packit Service 310c69
  unsigned int  physicalPage;
Packit Service 310c69
  bool          invalid = false;
Packit Service 310c69
Packit Service 310c69
  logDebug("reader starting");
Packit Service 310c69
  lockMutex(&volume->readThreadsMutex);
Packit Service 310c69
  while (true) {
Packit Service 310c69
    waitToReserveReadQueueEntry(volume, &queuePos, &requestList, &physicalPage,
Packit Service 310c69
                                &invalid);
Packit Service 310c69
    if ((volume->readerState & READER_STATE_EXIT) != 0) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    volume->busyReaderThreads++;
Packit Service 310c69
Packit Service 310c69
    bool recordPage = isRecordPage(volume->geometry, physicalPage);
Packit Service 310c69
Packit Service 310c69
    CachedPage *page = NULL;
Packit Service 310c69
    int result = UDS_SUCCESS;
Packit Service 310c69
    if (!invalid) {
Packit Service 310c69
      // Find a place to put the read queue page we reserved above.
Packit Service 310c69
      result = selectVictimInCache(volume->pageCache, &page);
Packit Service 310c69
      if (result == UDS_SUCCESS) {
Packit Service 310c69
        unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
        result = readVolumePage(&volume->volumeStore, physicalPage,
Packit Service 310c69
                                &page->cp_pageData);
Packit Service 310c69
        if (result != UDS_SUCCESS) {
Packit Service 310c69
          logWarning("Error reading page %u from volume", physicalPage);
Packit Service 310c69
          cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
        }
Packit Service 310c69
        lockMutex(&volume->readThreadsMutex);
Packit Service 310c69
      } else {
Packit Service 310c69
        logWarning("Error selecting cache victim for page read");
Packit Service 310c69
      }
Packit Service 310c69
Packit Service 310c69
      if (result == UDS_SUCCESS) {
Packit Service 310c69
        if (!volume->pageCache->readQueue[queuePos].invalid) {
Packit Service 310c69
          if (!recordPage) {
Packit Service 310c69
            result = initializeIndexPage(volume, physicalPage, page);
Packit Service 310c69
            if (result != UDS_SUCCESS) {
Packit Service 310c69
              logWarning("Error initializing chapter index page");
Packit Service 310c69
              cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
            }
Packit Service 310c69
          }
Packit Service 310c69
Packit Service 310c69
          if (result == UDS_SUCCESS) {
Packit Service 310c69
            result = putPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
            if (result != UDS_SUCCESS) {
Packit Service 310c69
              logWarning("Error putting page %u in cache", physicalPage);
Packit Service 310c69
              cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
            }
Packit Service 310c69
          }
Packit Service 310c69
        } else {
Packit Service 310c69
          logWarning("Page %u invalidated after read", physicalPage);
Packit Service 310c69
          cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
          invalid = true;
Packit Service 310c69
        }
Packit Service 310c69
      }
Packit Service 310c69
    } else {
Packit Service 310c69
      logDebug("Requeuing requests for invalid page");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    if (invalid) {
Packit Service 310c69
      result = UDS_SUCCESS;
Packit Service 310c69
      page = NULL;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    while (requestList != NULL) {
Packit Service 310c69
      Request *request = requestList;
Packit Service 310c69
      requestList = request->nextRequest;
Packit Service 310c69
Packit Service 310c69
      /*
Packit Service 310c69
       * If we've read in a record page, we're going to do an immediate search,
Packit Service 310c69
       * in an attempt to speed up processing when we requeue the request, so
Packit Service 310c69
       * that it doesn't have to go back into the getRecordFromZone code again.
Packit Service 310c69
       * However, if we've just read in an index page, we don't want to search.
Packit Service 310c69
       * We want the request to be processed again and getRecordFromZone to be
Packit Service 310c69
       * run.  We have added new fields in request to allow the index code to
Packit Service 310c69
       * know whether it can stop processing before getRecordFromZone is called
Packit Service 310c69
       * again.
Packit Service 310c69
       */
Packit Service 310c69
      if ((result == UDS_SUCCESS) && (page != NULL) && recordPage) {
Packit Service 310c69
        if (searchRecordPage(getPageData(&page->cp_pageData),
Packit Service 310c69
                             &request->chunkName, volume->geometry,
Packit Service 310c69
                             &request->oldMetadata)) {
Packit Service 310c69
          request->slLocation = LOC_IN_DENSE;
Packit Service 310c69
        } else {
Packit Service 310c69
          request->slLocation = LOC_UNAVAILABLE;
Packit Service 310c69
        }
Packit Service 310c69
        request->slLocationKnown = true;
Packit Service 310c69
      }
Packit Service 310c69
Packit Service 310c69
      // reflect any read failures in the request status
Packit Service 310c69
      request->status = result;
Packit Service 310c69
      restartRequest(request);
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    releaseReadQueueEntry(volume->pageCache, queuePos);
Packit Service 310c69
Packit Service 310c69
    volume->busyReaderThreads--;
Packit Service 310c69
    broadcastCond(&volume->readThreadsReadDoneCond);
Packit Service 310c69
  }
Packit Service 310c69
  unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
  logDebug("reader done");
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static int readPageLocked(Volume        *volume,
Packit Service 310c69
                          Request       *request,
Packit Service 310c69
                          unsigned int   physicalPage,
Packit Service 310c69
                          bool           syncRead,
Packit Service 310c69
                          CachedPage   **pagePtr)
Packit Service 310c69
{
Packit Service 310c69
  syncRead |= ((volume->lookupMode == LOOKUP_FOR_REBUILD)
Packit Service 310c69
               || (request == NULL)
Packit Service 310c69
               || (request->session == NULL));
Packit Service 310c69
Packit Service 310c69
  int result = UDS_SUCCESS;
Packit Service 310c69
Packit Service 310c69
  CachedPage *page = NULL;
Packit Service 310c69
  if (syncRead) {
Packit Service 310c69
    // Find a place to put the page.
Packit Service 310c69
    result = selectVictimInCache(volume->pageCache, &page);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      logWarning("Error selecting cache victim for page read");
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
    result = readVolumePage(&volume->volumeStore, physicalPage,
Packit Service 310c69
                            &page->cp_pageData);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      logWarning("Error reading page %u from volume", physicalPage);
Packit Service 310c69
      cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
    if (!isRecordPage(volume->geometry, physicalPage)) {
Packit Service 310c69
      result = initializeIndexPage(volume, physicalPage, page);
Packit Service 310c69
      if (result != UDS_SUCCESS) {
Packit Service 310c69
        if (volume->lookupMode != LOOKUP_FOR_REBUILD) {
Packit Service 310c69
          logWarning("Corrupt index page %u", physicalPage);
Packit Service 310c69
        }
Packit Service 310c69
        cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
        return result;
Packit Service 310c69
      }
Packit Service 310c69
    }
Packit Service 310c69
    result = putPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      logWarning("Error putting page %u in cache", physicalPage);
Packit Service 310c69
      cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  } else {
Packit Service 310c69
    result = enqueuePageRead(volume, request, physicalPage);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *pagePtr = page;
Packit Service 310c69
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int getPageLocked(Volume          *volume,
Packit Service 310c69
                  Request         *request,
Packit Service 310c69
                  unsigned int     physicalPage,
Packit Service 310c69
                  CacheProbeType   probeType,
Packit Service 310c69
                  CachedPage     **pagePtr)
Packit Service 310c69
{
Packit Service 310c69
  CachedPage *page = NULL;
Packit Service 310c69
  int result = getPageFromCache(volume->pageCache, physicalPage, probeType,
Packit Service 310c69
                                &page);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  if (page == NULL) {
Packit Service 310c69
    result = readPageLocked(volume, request, physicalPage, true, &page);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  } else if (getZoneNumber(request) == 0) {
Packit Service 310c69
    // Only 1 zone is responsible for updating LRU
Packit Service 310c69
    makePageMostRecent(volume->pageCache, page);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *pagePtr = page;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int getPageProtected(Volume          *volume,
Packit Service 310c69
                     Request         *request,
Packit Service 310c69
                     unsigned int     physicalPage,
Packit Service 310c69
                     CacheProbeType   probeType,
Packit Service 310c69
                     CachedPage     **pagePtr)
Packit Service 310c69
{
Packit Service 310c69
  CachedPage *page = NULL;
Packit Service 310c69
  int result = getPageFromCache(volume->pageCache, physicalPage,
Packit Service 310c69
                                probeType | CACHE_PROBE_IGNORE_FAILURE,
Packit Service 310c69
                                &page);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  unsigned int zoneNumber = getZoneNumber(request);
Packit Service 310c69
  // If we didn't find a page we need to enqueue a read for it, in which
Packit Service 310c69
  // case we need to grab the mutex.
Packit Service 310c69
  if (page == NULL) {
Packit Service 310c69
    endPendingSearch(volume->pageCache, zoneNumber);
Packit Service 310c69
    lockMutex(&volume->readThreadsMutex);
Packit Service 310c69
Packit Service 310c69
    /*
Packit Service 310c69
     * Do the lookup again while holding the read mutex (no longer the fast
Packit Service 310c69
     * case so this should be ok to repeat). We need to do this because an
Packit Service 310c69
     * page may have been added to the page map by the reader thread between
Packit Service 310c69
     * the time searched above and the time we went to actually try to enqueue
Packit Service 310c69
     * it below. This could result in us enqueuing another read for an page
Packit Service 310c69
     * which is already in the cache, which would mean we end up with two
Packit Service 310c69
     * entries in the cache for the same page.
Packit Service 310c69
     */
Packit Service 310c69
    result
Packit Service 310c69
      = getPageFromCache(volume->pageCache, physicalPage, probeType, &page);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      /*
Packit Service 310c69
       * In non-success cases (anything not UDS_SUCCESS, meaning both
Packit Service 310c69
       * UDS_QUEUED and "real" errors), the caller doesn't get a
Packit Service 310c69
       * handle on a cache page, so it can't continue the search, and
Packit Service 310c69
       * we don't need to prevent other threads from messing with the
Packit Service 310c69
       * cache.
Packit Service 310c69
       *
Packit Service 310c69
       * However, we do need to set the "search pending" flag because
Packit Service 310c69
       * the callers expect it to always be set on return, even if
Packit Service 310c69
       * they can't actually do the search.
Packit Service 310c69
       *
Packit Service 310c69
       * Doing the calls in this order ought to be faster, since we
Packit Service 310c69
       * let other threads have the reader thread mutex (which can
Packit Service 310c69
       * require a syscall) ASAP, and set the "search pending" state
Packit Service 310c69
       * that can block the reader thread as the last thing.
Packit Service 310c69
       */
Packit Service 310c69
      unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // If we found the page now, we can release the mutex and proceed
Packit Service 310c69
    // as if this were the fast case.
Packit Service 310c69
    if (page != NULL) {
Packit Service 310c69
      /*
Packit Service 310c69
       * If we found a page (*pagePtr != NULL and return
Packit Service 310c69
       * UDS_SUCCESS), then we're telling the caller where to look for
Packit Service 310c69
       * the cache page, and need to switch to "reader thread
Packit Service 310c69
       * unlocked" and "search pending" state in careful order so no
Packit Service 310c69
       * other thread can mess with the data before our caller gets to
Packit Service 310c69
       * look at it.
Packit Service 310c69
       */
Packit Service 310c69
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
Packit Service 310c69
      unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (page == NULL) {
Packit Service 310c69
    result = readPageLocked(volume, request, physicalPage, false, &page);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      /*
Packit Service 310c69
       * This code path is used frequently in the UDS_QUEUED case, so
Packit Service 310c69
       * the performance gain from unlocking first, while "search
Packit Service 310c69
       * pending" mode is off, turns out to be significant in some
Packit Service 310c69
       * cases.
Packit Service 310c69
       */
Packit Service 310c69
      unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
      beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // See above re: ordering requirement.
Packit Service 310c69
    beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
Packit Service 310c69
    unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
  } else {
Packit Service 310c69
    if (getZoneNumber(request) == 0 ) {
Packit Service 310c69
      // Only 1 zone is responsible for updating LRU
Packit Service 310c69
      makePageMostRecent(volume->pageCache, page);
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *pagePtr = page;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int getPage(Volume          *volume,
Packit Service 310c69
            unsigned int     chapter,
Packit Service 310c69
            unsigned int     pageNumber,
Packit Service 310c69
            CacheProbeType   probeType,
Packit Service 310c69
            byte           **dataPtr,
Packit Service 310c69
            DeltaIndexPage **indexPagePtr)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int physicalPage
Packit Service 310c69
    = mapToPhysicalPage(volume->geometry, chapter, pageNumber);
Packit Service 310c69
Packit Service 310c69
  lockMutex(&volume->readThreadsMutex);
Packit Service 310c69
  CachedPage *page = NULL;
Packit Service 310c69
  int result = getPageLocked(volume, NULL, physicalPage, probeType, &page);
Packit Service 310c69
  unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
Packit Service 310c69
  if (dataPtr != NULL) {
Packit Service 310c69
    *dataPtr = (page != NULL) ? getPageData(&page->cp_pageData) : NULL;
Packit Service 310c69
  }
Packit Service 310c69
  if (indexPagePtr != NULL) {
Packit Service 310c69
    *indexPagePtr = (page != NULL) ? &page->cp_indexPage : NULL;
Packit Service 310c69
  }
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Search for a chunk name in a cached index page or chapter index, returning
Packit Service 310c69
 * the record page number from a chapter index match.
Packit Service 310c69
 *
Packit Service 310c69
 * @param volume           the volume containing the index page to search
Packit Service 310c69
 * @param request          the request originating the search (may be NULL for
Packit Service 310c69
 *                         a direct query from volume replay)
Packit Service 310c69
 * @param name             the name of the block or chunk
Packit Service 310c69
 * @param chapter          the chapter to search
Packit Service 310c69
 * @param indexPageNumber  the index page number of the page to search
Packit Service 310c69
 * @param recordPageNumber pointer to return the chapter record page number
Packit Service 310c69
 *                         (value will be NO_CHAPTER_INDEX_ENTRY if the name
Packit Service 310c69
 *                         was not found)
Packit Service 310c69
 *
Packit Service 310c69
 * @return UDS_SUCCESS or an error code
Packit Service 310c69
 **/
Packit Service 310c69
static int searchCachedIndexPage(Volume             *volume,
Packit Service 310c69
                                 Request            *request,
Packit Service 310c69
                                 const UdsChunkName *name,
Packit Service 310c69
                                 unsigned int        chapter,
Packit Service 310c69
                                 unsigned int        indexPageNumber,
Packit Service 310c69
                                 int                *recordPageNumber)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int zoneNumber = getZoneNumber(request);
Packit Service 310c69
  unsigned int physicalPage
Packit Service 310c69
    = mapToPhysicalPage(volume->geometry, chapter, indexPageNumber);
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Make sure the invalidate counter is updated before we try and read from
Packit Service 310c69
   * the page map.  This prevents this thread from reading a page in the
Packit Service 310c69
   * page map which has already been marked for invalidation by the reader
Packit Service 310c69
   * thread, before the reader thread has noticed that the invalidateCounter
Packit Service 310c69
   * has been incremented.
Packit Service 310c69
   */
Packit Service 310c69
  beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
Packit Service 310c69
Packit Service 310c69
  CachedPage *page = NULL;
Packit Service 310c69
  int result = getPageProtected(volume, request, physicalPage,
Packit Service 310c69
                                cacheProbeType(request, true), &page);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    endPendingSearch(volume->pageCache, zoneNumber);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result
Packit Service 310c69
    = ASSERT_LOG_ONLY(searchPending(getInvalidateCounter(volume->pageCache,
Packit Service 310c69
                                                         zoneNumber)),
Packit Service 310c69
                      "Search is pending for zone %u", zoneNumber);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = searchChapterIndexPage(&page->cp_indexPage, volume->geometry, name,
Packit Service 310c69
                                  recordPageNumber);
Packit Service 310c69
  endPendingSearch(volume->pageCache, zoneNumber);
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int searchCachedRecordPage(Volume             *volume,
Packit Service 310c69
                           Request            *request,
Packit Service 310c69
                           const UdsChunkName *name,
Packit Service 310c69
                           unsigned int        chapter,
Packit Service 310c69
                           int                 recordPageNumber,
Packit Service 310c69
                           UdsChunkData       *duplicate,
Packit Service 310c69
                           bool               *found)
Packit Service 310c69
{
Packit Service 310c69
  *found = false;
Packit Service 310c69
Packit Service 310c69
  if (recordPageNumber == NO_CHAPTER_INDEX_ENTRY) {
Packit Service 310c69
    // No record for that name can exist in the chapter.
Packit Service 310c69
    return UDS_SUCCESS;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  Geometry *geometry = volume->geometry;
Packit Service 310c69
  int result = ASSERT(((recordPageNumber >= 0)
Packit Service 310c69
                       && ((unsigned int) recordPageNumber
Packit Service 310c69
                           < geometry->recordPagesPerChapter)),
Packit Service 310c69
                      "0 <= %d <= %u",
Packit Service 310c69
                      recordPageNumber, geometry->recordPagesPerChapter);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  unsigned int pageNumber = geometry->indexPagesPerChapter + recordPageNumber;
Packit Service 310c69
Packit Service 310c69
  unsigned int zoneNumber = getZoneNumber(request);
Packit Service 310c69
  int physicalPage
Packit Service 310c69
    = mapToPhysicalPage(volume->geometry, chapter, pageNumber);
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Make sure the invalidate counter is updated before we try and read from
Packit Service 310c69
   * the page map. This prevents this thread from reading a page in the page
Packit Service 310c69
   * map which has already been marked for invalidation by the reader thread,
Packit Service 310c69
   * before the reader thread has noticed that the invalidateCounter has been
Packit Service 310c69
   * incremented.
Packit Service 310c69
   */
Packit Service 310c69
  beginPendingSearch(volume->pageCache, physicalPage, zoneNumber);
Packit Service 310c69
Packit Service 310c69
  CachedPage *recordPage;
Packit Service 310c69
  result = getPageProtected(volume, request, physicalPage,
Packit Service 310c69
                            cacheProbeType(request, false), &recordPage);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    endPendingSearch(volume->pageCache, zoneNumber);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (searchRecordPage(getPageData(&recordPage->cp_pageData), name, geometry,
Packit Service 310c69
                       duplicate)) {
Packit Service 310c69
    *found = true;
Packit Service 310c69
  }
Packit Service 310c69
  endPendingSearch(volume->pageCache, zoneNumber);
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int readChapterIndexFromVolume(const Volume       *volume,
Packit Service 310c69
                               uint64_t            virtualChapter,
Packit Service 310c69
                               struct volume_page  volumePages[],
Packit Service 310c69
                               DeltaIndexPage      indexPages[])
Packit Service 310c69
{
Packit Service 310c69
  const Geometry *geometry = volume->geometry;
Packit Service 310c69
  unsigned int physicalChapter = mapToPhysicalChapter(geometry,
Packit Service 310c69
                                                      virtualChapter);
Packit Service 310c69
  int physicalPage = mapToPhysicalPage(geometry, physicalChapter, 0);
Packit Service 310c69
  prefetchVolumePages(&volume->volumeStore, physicalPage,
Packit Service 310c69
                      geometry->indexPagesPerChapter);
Packit Service 310c69
Packit Service 310c69
  unsigned int i;
Packit Service 310c69
  struct volume_page volumePage;
Packit Service 310c69
  int result = initializeVolumePage(geometry, &volumePage);
Packit Service 310c69
  for (i = 0; i < geometry->indexPagesPerChapter; i++) {
Packit Service 310c69
    int result = readVolumePage(&volume->volumeStore, physicalPage + i,
Packit Service 310c69
                                &volumePages[i]);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
    byte *indexPage = getPageData(&volumePages[i]);
Packit Service 310c69
    result = initChapterIndexPage(volume, indexPage, physicalChapter, i,
Packit Service 310c69
                                  &indexPages[i]);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
  destroyVolumePage(&volumePage);
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int searchVolumePageCache(Volume             *volume,
Packit Service 310c69
                          Request            *request,
Packit Service 310c69
                          const UdsChunkName *name,
Packit Service 310c69
                          uint64_t            virtualChapter,
Packit Service 310c69
                          UdsChunkData       *metadata,
Packit Service 310c69
                          bool               *found)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int physicalChapter
Packit Service 310c69
    = mapToPhysicalChapter(volume->geometry, virtualChapter);
Packit Service 310c69
  unsigned int indexPageNumber;
Packit Service 310c69
  int result = findIndexPageNumber(volume->indexPageMap, name, physicalChapter,
Packit Service 310c69
                                   &indexPageNumber);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  int recordPageNumber;
Packit Service 310c69
  result = searchCachedIndexPage(volume, request, name, physicalChapter,
Packit Service 310c69
                                 indexPageNumber, &recordPageNumber);
Packit Service 310c69
  if (result == UDS_SUCCESS) {
Packit Service 310c69
    result = searchCachedRecordPage(volume, request, name, physicalChapter,
Packit Service 310c69
                                    recordPageNumber, metadata, found);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int forgetChapter(Volume             *volume,
Packit Service 310c69
                  uint64_t            virtualChapter,
Packit Service 310c69
                  InvalidationReason  reason)
Packit Service 310c69
{
Packit Service 310c69
  logDebug("forgetting chapter %llu", virtualChapter);
Packit Service 310c69
  unsigned int physicalChapter
Packit Service 310c69
    = mapToPhysicalChapter(volume->geometry, virtualChapter);
Packit Service 310c69
  lockMutex(&volume->readThreadsMutex);
Packit Service 310c69
  int result
Packit Service 310c69
    = invalidatePageCacheForChapter(volume->pageCache, physicalChapter,
Packit Service 310c69
                                    volume->geometry->pagesPerChapter,
Packit Service 310c69
                                    reason);
Packit Service 310c69
  unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Donate index page data to the page cache for an index page that was just
Packit Service 310c69
 * written to the volume.  The caller must already hold the reader thread
Packit Service 310c69
 * mutex.
Packit Service 310c69
 *
Packit Service 310c69
 * @param volume           the volume
Packit Service 310c69
 * @param physicalChapter  the physical chapter number of the index page
Packit Service 310c69
 * @param indexPageNumber  the chapter page number of the index page
Packit Service 310c69
 * @param scratchPage      the index page data
Packit Service 310c69
 **/
Packit Service 310c69
static int donateIndexPageLocked(Volume             *volume,
Packit Service 310c69
                                 unsigned int        physicalChapter,
Packit Service 310c69
                                 unsigned int        indexPageNumber,
Packit Service 310c69
                                 struct volume_page *scratchPage)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int physicalPage
Packit Service 310c69
    = mapToPhysicalPage(volume->geometry, physicalChapter, indexPageNumber);
Packit Service 310c69
Packit Service 310c69
  // Find a place to put the page.
Packit Service 310c69
  CachedPage *page = NULL;
Packit Service 310c69
  int result = selectVictimInCache(volume->pageCache, &page);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Exchange the scratch page with the cache page
Packit Service 310c69
  swapVolumePages(&page->cp_pageData, scratchPage);
Packit Service 310c69
  
Packit Service 310c69
  result = initChapterIndexPage(volume, getPageData(&page->cp_pageData),
Packit Service 310c69
                                physicalChapter, indexPageNumber,
Packit Service 310c69
                                &page->cp_indexPage);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    logWarning("Error initialize chapter index page");
Packit Service 310c69
    cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = putPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    logWarning("Error putting page %u in cache", physicalPage);
Packit Service 310c69
    cancelPageInCache(volume->pageCache, physicalPage, page);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int writeIndexPages(Volume            *volume,
Packit Service 310c69
                    int                physicalPage,
Packit Service 310c69
                    OpenChapterIndex  *chapterIndex,
Packit Service 310c69
                    byte             **pages)
Packit Service 310c69
{
Packit Service 310c69
  Geometry *geometry = volume->geometry;
Packit Service 310c69
  unsigned int physicalChapterNumber
Packit Service 310c69
    = mapToPhysicalChapter(geometry, chapterIndex->virtualChapterNumber);
Packit Service 310c69
  unsigned int deltaListNumber = 0;
Packit Service 310c69
Packit Service 310c69
  unsigned int indexPageNumber;
Packit Service 310c69
  for (indexPageNumber = 0;
Packit Service 310c69
       indexPageNumber < geometry->indexPagesPerChapter;
Packit Service 310c69
       indexPageNumber++) {
Packit Service 310c69
    int result = prepareToWriteVolumePage(&volume->volumeStore,
Packit Service 310c69
                                          physicalPage + indexPageNumber,
Packit Service 310c69
                                          &volume->scratchPage);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logWarningWithStringError(result, "failed to prepare index page");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // Pack as many delta lists into the index page as will fit.
Packit Service 310c69
    unsigned int listsPacked;
Packit Service 310c69
    bool lastPage = ((indexPageNumber + 1) == geometry->indexPagesPerChapter);
Packit Service 310c69
    result = packOpenChapterIndexPage(chapterIndex,
Packit Service 310c69
                                      getPageData(&volume->scratchPage),
Packit Service 310c69
                                      deltaListNumber, lastPage, &listsPacked);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logWarningWithStringError(result, "failed to pack index page");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    result = writeVolumePage(&volume->volumeStore,
Packit Service 310c69
                             physicalPage + indexPageNumber,
Packit Service 310c69
                             &volume->scratchPage);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logWarningWithStringError(result,
Packit Service 310c69
                                       "failed to write chapter index page");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    if (pages != NULL) {
Packit Service 310c69
      memcpy(pages[indexPageNumber], getPageData(&volume->scratchPage),
Packit Service 310c69
             geometry->bytesPerPage);
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // Tell the index page map the list number of the last delta list that was
Packit Service 310c69
    // packed into the index page.
Packit Service 310c69
    if (listsPacked == 0) {
Packit Service 310c69
      logDebug("no delta lists packed on chapter %u page %u",
Packit Service 310c69
               physicalChapterNumber, indexPageNumber);
Packit Service 310c69
    } else {
Packit Service 310c69
      deltaListNumber += listsPacked;
Packit Service 310c69
    }
Packit Service 310c69
    result = updateIndexPageMap(volume->indexPageMap,
Packit Service 310c69
                                chapterIndex->virtualChapterNumber,
Packit Service 310c69
                                physicalChapterNumber,
Packit Service 310c69
                                indexPageNumber, deltaListNumber - 1);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logErrorWithStringError(result,
Packit Service 310c69
                                     "failed to update index page map");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // Donate the page data for the index page to the page cache.
Packit Service 310c69
    lockMutex(&volume->readThreadsMutex);
Packit Service 310c69
    result = donateIndexPageLocked(volume, physicalChapterNumber,
Packit Service 310c69
                                   indexPageNumber, &volume->scratchPage);
Packit Service 310c69
    unlockMutex(&volume->readThreadsMutex);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int writeRecordPages(Volume                *volume,
Packit Service 310c69
                     int                    physicalPage,
Packit Service 310c69
                     const UdsChunkRecord   records[],
Packit Service 310c69
                     byte                 **pages)
Packit Service 310c69
{
Packit Service 310c69
  Geometry *geometry = volume->geometry;
Packit Service 310c69
  // Skip over the index pages, which come before the record pages
Packit Service 310c69
  physicalPage += geometry->indexPagesPerChapter;
Packit Service 310c69
  // The record array from the open chapter is 1-based.
Packit Service 310c69
  const UdsChunkRecord *nextRecord = &records[1];
Packit Service 310c69
Packit Service 310c69
  unsigned int recordPageNumber;
Packit Service 310c69
  for (recordPageNumber = 0;
Packit Service 310c69
       recordPageNumber < geometry->recordPagesPerChapter;
Packit Service 310c69
       recordPageNumber++) {
Packit Service 310c69
    int result = prepareToWriteVolumePage(&volume->volumeStore,
Packit Service 310c69
                                          physicalPage + recordPageNumber,
Packit Service 310c69
                                          &volume->scratchPage);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logWarningWithStringError(result,
Packit Service 310c69
                                       "failed to prepare record page");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // Sort the next page of records and copy them to the record page as a
Packit Service 310c69
    // binary tree stored in heap order.
Packit Service 310c69
    result = encodeRecordPage(volume, nextRecord,
Packit Service 310c69
                              getPageData(&volume->scratchPage));
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logWarningWithStringError(result,
Packit Service 310c69
                                       "failed to encode record page %u",
Packit Service 310c69
                                       recordPageNumber);
Packit Service 310c69
    }
Packit Service 310c69
    nextRecord += geometry->recordsPerPage;
Packit Service 310c69
Packit Service 310c69
    result = writeVolumePage(&volume->volumeStore,
Packit Service 310c69
                             physicalPage + recordPageNumber,
Packit Service 310c69
                             &volume->scratchPage);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return logWarningWithStringError(result,
Packit Service 310c69
                                       "failed to write chapter record page");
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    if (pages != NULL) {
Packit Service 310c69
      memcpy(pages[recordPageNumber], getPageData(&volume->scratchPage),
Packit Service 310c69
             geometry->bytesPerPage);
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int writeChapter(Volume                 *volume,
Packit Service 310c69
                 OpenChapterIndex       *chapterIndex,
Packit Service 310c69
                 const UdsChunkRecord    records[])
Packit Service 310c69
{
Packit Service 310c69
  // Determine the position of the virtual chapter in the volume file.
Packit Service 310c69
  Geometry *geometry = volume->geometry;
Packit Service 310c69
  unsigned int physicalChapterNumber
Packit Service 310c69
    = mapToPhysicalChapter(geometry, chapterIndex->virtualChapterNumber);
Packit Service 310c69
  int physicalPage = mapToPhysicalPage(geometry, physicalChapterNumber, 0);
Packit Service 310c69
Packit Service 310c69
  // Pack and write the delta chapter index pages to the volume.
Packit Service 310c69
  int result = writeIndexPages(volume, physicalPage, chapterIndex, NULL);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  // Sort and write the record pages to the volume.
Packit Service 310c69
  result = writeRecordPages(volume, physicalPage, records, NULL);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  releaseVolumePage(&volume->scratchPage);
Packit Service 310c69
  // Flush the data to permanent storage.
Packit Service 310c69
  return syncVolumeStore(&volume->volumeStore);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Report the size of the volume's caches: the page cache size, plus the
 * sparse cache memory size when the geometry is sparse.
 *
 * @param volume  the volume
 *
 * @return the combined cache size
 **/
size_t getCacheSize(Volume *volume)
{
  size_t pageCacheSize = getPageCacheSize(volume->pageCache);
  if (!isSparse(volume->geometry)) {
    return pageCacheSize;
  }
  return pageCacheSize + getSparseCacheMemorySize(volume->sparseCache);
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Examine every index page of a physical chapter and determine the virtual
 * chapter number stored there.  All index pages must carry the same virtual
 * chapter number, their delta list ranges must be contiguous starting at
 * list 0, each page must pass validateChapterIndexPage(), and the virtual
 * chapter number must map onto this physical chapter.
 *
 * @param volume                the volume to probe
 * @param chapterNumber         the physical chapter number to probe
 * @param virtualChapterNumber  set to the chapter's virtual chapter number
 *                              on success
 *
 * @return UDS_SUCCESS, UDS_CORRUPT_COMPONENT if one of the consistency
 *         checks made here fails, or an error from reading or validating a
 *         page
 **/
static int probeChapter(Volume       *volume,
                        unsigned int  chapterNumber,
                        uint64_t     *virtualChapterNumber)
{
  const Geometry *geometry = volume->geometry;
  unsigned int expectedListNumber = 0;
  // UINT64_MAX means that no index page has been examined yet.
  uint64_t lastVCN = UINT64_MAX;

  prefetchVolumePages(&volume->volumeStore,
                      mapToPhysicalPage(geometry, chapterNumber, 0),
                      geometry->indexPagesPerChapter);

  unsigned int i;
  for (i = 0; i < geometry->indexPagesPerChapter; ++i) {
    DeltaIndexPage *page = NULL;
    int result = getPage(volume, chapterNumber, i, CACHE_PROBE_INDEX_FIRST,
                         NULL, &page);
    if (result != UDS_SUCCESS) {
      return result;
    }

    // Every index page of the chapter must agree on the virtual chapter.
    uint64_t vcn = page->virtualChapterNumber;
    if (lastVCN == UINT64_MAX) {
      lastVCN = vcn;
    } else if (vcn != lastVCN) {
      // uint64_t is not necessarily unsigned long long; cast to match %llu.
      logError("inconsistent chapter %u index page %u: expected vcn %"
               PRIu64 ", got vcn %llu",
               chapterNumber, i, lastVCN, (unsigned long long) vcn);
      return UDS_CORRUPT_COMPONENT;
    }

    // Each page must start with the list following the previous page's last.
    if (expectedListNumber != page->lowestListNumber) {
      logError("inconsistent chapter %u index page %u: expected list number %u"
               ", got list number %u",
               chapterNumber, i, expectedListNumber, page->lowestListNumber);
      return UDS_CORRUPT_COMPONENT;
    }
    expectedListNumber = page->highestListNumber + 1;

    result = validateChapterIndexPage(page, geometry);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  if (lastVCN == UINT64_MAX) {
    logError("no chapter %u virtual chapter number determined", chapterNumber);
    return UDS_CORRUPT_COMPONENT;
  }
  // The virtual chapter number must land on this physical chapter.
  if (chapterNumber != lastVCN % geometry->chaptersPerVolume) {
    logError("chapter %u vcn %llu is out of phase (%u)",
             chapterNumber, (unsigned long long) lastVCN,
             geometry->chaptersPerVolume);
    return UDS_CORRUPT_COMPONENT;
  }
  *virtualChapterNumber = lastVCN;
  return UDS_SUCCESS;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static int probeWrapper(void         *aux,
Packit Service 310c69
                        unsigned int  chapterNumber,
Packit Service 310c69
                        uint64_t     *virtualChapterNumber)
Packit Service 310c69
{
Packit Service 310c69
  Volume *volume = aux;
Packit Service 310c69
  int result = probeChapter(volume, chapterNumber, virtualChapterNumber);
Packit Service 310c69
  if ((result == UDS_CORRUPT_COMPONENT) || (result == UDS_CORRUPT_DATA)) {
Packit Service 310c69
    *virtualChapterNumber = UINT64_MAX;
Packit Service 310c69
    return UDS_SUCCESS;
Packit Service 310c69
  }
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Find the number of chapters at the start of the volume that precede the
 * trailing run of corrupt chapters, by probing backwards from the given
 * limit.
 *
 * @param volume    the volume to examine
 * @param limit     the number of chapters to consider
 * @param limitPtr  if not NULL, receives the adjusted chapter limit on
 *                  success (0 if no valid chapter was found)
 *
 * @return UDS_SUCCESS, or the error logged when a probe fails with anything
 *         other than UDS_CORRUPT_COMPONENT
 **/
static int findRealEndOfVolume(Volume       *volume,
                               unsigned int  limit,
                               unsigned int *limitPtr)
{
  /*
   * Start checking from the end of the volume. As long as we hit corrupt
   * data, start skipping larger and larger amounts until we find real data.
   * If we find real data, reduce the span and try again until we find
   * the exact boundary.
   */
  unsigned int span = 1;
  unsigned int tries = 0;
  while (limit > 0) {
    // Clamp to chapter 0 when the span reaches past the start of the volume.
    unsigned int chapter = (span > limit) ? 0 : limit - span;
    uint64_t vcn = 0;
    int result = probeChapter(volume, chapter, &vcn);
    if (result == UDS_SUCCESS) {
      if (span == 1) {
        // The chapter just below the limit is valid: boundary found.
        break;
      }
      span /= 2;
      tries = 0;
    } else if (result == UDS_CORRUPT_COMPONENT) {
      limit = chapter;
      // Only widen the span from the second consecutive corrupt probe on.
      if (++tries > 1) {
        span *= 2;
      }
    } else {
      return logErrorWithStringError(result, "cannot determine end of volume");
    }
  }

  if (limitPtr != NULL) {
    *limitPtr = limit;
  }
  return UDS_SUCCESS;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Determine the lowest and highest virtual chapter numbers stored in a
 * volume, and whether the volume holds any usable chapters at all.
 *
 * @param volume      The volume to examine
 * @param lowestVCN   Receives the lowest virtual chapter number (0 if empty)
 * @param highestVCN  Receives the highest virtual chapter number (0 if empty)
 * @param isEmpty     Receives true when no usable chapter was found
 *
 * @return UDS_SUCCESS or an error code
 **/
int findVolumeChapterBoundaries(Volume   *volume,
                                uint64_t *lowestVCN,
                                uint64_t *highestVCN,
                                bool     *isEmpty)
{
  // Trim the nominal chapter count down to the part that is really there.
  unsigned int usableChapters = 0;
  int status = findRealEndOfVolume(volume,
                                   volume->geometry->chaptersPerVolume,
                                   &usableChapters);
  if (status != UDS_SUCCESS) {
    return logErrorWithStringError(status, "cannot find end of volume");
  }

  bool empty = (usableChapters == 0);
  *isEmpty = empty;
  if (empty) {
    // Nothing to search; report a zeroed range.
    *lowestVCN  = 0;
    *highestVCN = 0;
    return UDS_SUCCESS;
  }

  // Delegate the actual search, probing chapters of this volume.
  return findVolumeChapterBoundariesImpl(usableChapters, MAX_BAD_CHAPTERS,
                                         lowestVCN, highestVCN, probeWrapper,
                                         volume);
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int findVolumeChapterBoundariesImpl(unsigned int  chapterLimit,
Packit Service 310c69
                                    unsigned int  maxBadChapters,
Packit Service 310c69
                                    uint64_t     *lowestVCN,
Packit Service 310c69
                                    uint64_t     *highestVCN,
Packit Service 310c69
                                    int (*probeFunc)(void         *aux,
Packit Service 310c69
                                                     unsigned int  chapter,
Packit Service 310c69
                                                     uint64_t     *vcn),
Packit Service 310c69
                                    void *aux)
Packit Service 310c69
{
Packit Service 310c69
  if (chapterLimit == 0) {
Packit Service 310c69
    *lowestVCN = 0;
Packit Service 310c69
    *highestVCN = 0;
Packit Service 310c69
    return UDS_SUCCESS;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * This method assumes there is at most one run of contiguous bad chapters
Packit Service 310c69
   * caused by unflushed writes. Either the bad spot is at the beginning and
Packit Service 310c69
   * end, or somewhere in the middle. Wherever it is, the highest and lowest
Packit Service 310c69
   * VCNs are adjacent to it. Otherwise the volume is cleanly saved and
Packit Service 310c69
   * somewhere in the middle of it the highest VCN immediately preceeds the
Packit Service 310c69
   * lowest one.
Packit Service 310c69
   */
Packit Service 310c69
Packit Service 310c69
  uint64_t firstVCN = UINT64_MAX;
Packit Service 310c69
Packit Service 310c69
  // doesn't matter if this results in a bad spot (UINT64_MAX)
Packit Service 310c69
  int result = (*probeFunc)(aux, 0, &firstVCN);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return UDS_SUCCESS;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Binary search for end of the discontinuity in the monotonically
Packit Service 310c69
   * increasing virtual chapter numbers; bad spots are treated as a span of
Packit Service 310c69
   * UINT64_MAX values. In effect we're searching for the index of the
Packit Service 310c69
   * smallest value less than firstVCN. In the case we go off the end it means
Packit Service 310c69
   * that chapter 0 has the lowest vcn.
Packit Service 310c69
   */
Packit Service 310c69
Packit Service 310c69
  unsigned int leftChapter = 0;
Packit Service 310c69
  unsigned int rightChapter = chapterLimit;
Packit Service 310c69
Packit Service 310c69
  while (leftChapter < rightChapter) {
Packit Service 310c69
    unsigned int chapter = (leftChapter + rightChapter) / 2;
Packit Service 310c69
    uint64_t probeVCN;
Packit Service 310c69
Packit Service 310c69
    result = (*probeFunc)(aux, chapter, &probeVCN);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
    if (firstVCN <= probeVCN) {
Packit Service 310c69
      leftChapter = chapter + 1;
Packit Service 310c69
    } else {
Packit Service 310c69
      rightChapter = chapter;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  uint64_t lowest = UINT64_MAX;
Packit Service 310c69
  uint64_t highest = UINT64_MAX;
Packit Service 310c69
Packit Service 310c69
  result = ASSERT(leftChapter == rightChapter, "leftChapter == rightChapter");
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  leftChapter %= chapterLimit;  // in case we're at the end
Packit Service 310c69
Packit Service 310c69
  // At this point, leftChapter is the chapter with the lowest virtual chapter
Packit Service 310c69
  // number.
Packit Service 310c69
Packit Service 310c69
  result = (*probeFunc)(aux, leftChapter, &lowest);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = ASSERT((lowest != UINT64_MAX), "invalid lowest chapter");
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // We now circularly scan backwards, moving over any bad chapters until we
Packit Service 310c69
  // find the chapter with the highest vcn (the first good chapter we
Packit Service 310c69
  // encounter).
Packit Service 310c69
Packit Service 310c69
  unsigned int badChapters = 0;
Packit Service 310c69
Packit Service 310c69
  for (;;) {
Packit Service 310c69
    rightChapter = (rightChapter + chapterLimit - 1) % chapterLimit;
Packit Service 310c69
    result = (*probeFunc)(aux, rightChapter, &highest);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
    if (highest != UINT64_MAX) {
Packit Service 310c69
      break;
Packit Service 310c69
    }
Packit Service 310c69
    if (++badChapters >= maxBadChapters) {
Packit Service 310c69
      logError("too many bad chapters in volume: %u", badChapters);
Packit Service 310c69
      return UDS_CORRUPT_COMPONENT;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *lowestVCN = lowest;
Packit Service 310c69
  *highestVCN = highest;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Allocate a volume.
Packit Service 310c69
 *
Packit Service 310c69
 * @param config            The configuration to use
Packit Service 310c69
 * @param layout            The index layout
Packit Service 310c69
 * @param readQueueMaxSize  The maximum size of the read queue
Packit Service 310c69
 * @param zoneCount         The number of zones to use
Packit Service 310c69
 * @param newVolume         A pointer to hold the new volume
Packit Service 310c69
 *
Packit Service 310c69
 * @return UDS_SUCCESS or an error code
Packit Service 310c69
 **/
Packit Service 310c69
__attribute__((warn_unused_result))
Packit Service 310c69
static int allocateVolume(const Configuration  *config,
Packit Service 310c69
                          IndexLayout          *layout,
Packit Service 310c69
                          unsigned int          readQueueMaxSize,
Packit Service 310c69
                          unsigned int          zoneCount,
Packit Service 310c69
                          Volume              **newVolume)
Packit Service 310c69
{
Packit Service 310c69
  Volume *volume;
Packit Service 310c69
  int result = ALLOCATE(1, Volume, "volume", &volume);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  volume->nonce = getVolumeNonce(layout);
Packit Service 310c69
  // It is safe to call freeVolume now to clean up and close the volume
Packit Service 310c69
Packit Service 310c69
  result = copyGeometry(config->geometry, &volume->geometry);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return logWarningWithStringError(result,
Packit Service 310c69
                                     "failed to allocate geometry: error");
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Need a buffer for each entry in the page cache
Packit Service 310c69
  unsigned int reservedBuffers
Packit Service 310c69
    = config->cacheChapters * config->geometry->recordPagesPerChapter;
Packit Service 310c69
  // And a buffer for the chapter writer
Packit Service 310c69
  reservedBuffers += 1;
Packit Service 310c69
  // And a buffer for each entry in the sparse cache
Packit Service 310c69
  if (isSparse(volume->geometry)) {
Packit Service 310c69
    reservedBuffers
Packit Service 310c69
      += config->cacheChapters * config->geometry->indexPagesPerChapter;
Packit Service 310c69
  }
Packit Service 310c69
  result = openVolumeStore(&volume->volumeStore, layout, reservedBuffers,
Packit Service 310c69
                           config->geometry->bytesPerPage);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  result = initializeVolumePage(config->geometry, &volume->scratchPage);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = makeRadixSorter(config->geometry->recordsPerPage,
Packit Service 310c69
                           &volume->radixSorter);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = ALLOCATE(config->geometry->recordsPerPage, const UdsChunkRecord *,
Packit Service 310c69
                    "record pointers", &volume->recordPointers);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (isSparse(volume->geometry)) {
Packit Service 310c69
    result = makeSparseCache(volume->geometry, config->cacheChapters,
Packit Service 310c69
                             zoneCount, &volume->sparseCache);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      freeVolume(volume);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
  result = makePageCache(volume->geometry, config->cacheChapters,
Packit Service 310c69
                         readQueueMaxSize, zoneCount, &volume->pageCache);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  result = makeIndexPageMap(volume->geometry, &volume->indexPageMap);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *newVolume = volume;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makeVolume(const Configuration          *config,
Packit Service 310c69
               IndexLayout                  *layout,
Packit Service 310c69
               const struct uds_parameters  *userParams,
Packit Service 310c69
               unsigned int                  readQueueMaxSize,
Packit Service 310c69
               unsigned int                  zoneCount,
Packit Service 310c69
               Volume                      **newVolume)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int volumeReadThreads = getReadThreads(userParams);
Packit Service 310c69
Packit Service 310c69
  if (readQueueMaxSize <= volumeReadThreads) {
Packit Service 310c69
    logError("Number of read threads must be smaller than read queue");
Packit Service 310c69
    return UDS_INVALID_ARGUMENT;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  Volume *volume = NULL;
Packit Service 310c69
  int result = allocateVolume(config, layout, readQueueMaxSize, zoneCount,
Packit Service 310c69
                              &volume);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  result = initMutex(&volume->readThreadsMutex);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  result = initCond(&volume->readThreadsReadDoneCond);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  result = initCond(&volume->readThreadsCond);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // Start the reader threads.  If this allocation succeeds, freeVolume knows
Packit Service 310c69
  // that it needs to try and stop those threads.
Packit Service 310c69
  result = ALLOCATE(volumeReadThreads, Thread, "reader threads",
Packit Service 310c69
                    &volume->readerThreads);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeVolume(volume);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
  unsigned int i;
Packit Service 310c69
  for (i = 0; i < volumeReadThreads; i++) {
Packit Service 310c69
    result = createThread(readThreadFunction, (void *) volume, "reader",
Packit Service 310c69
                          &volume->readerThreads[i]);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      freeVolume(volume);
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
    // We only stop as many threads as actually got started.
Packit Service 310c69
    volume->numReadThreads = i + 1;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *newVolume = volume;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
/**
 * Stop a volume's reader threads (if any were set up) and release
 * everything the volume owns, then the volume itself. A NULL volume is
 * ignored.
 *
 * @param volume  The volume to destroy, or NULL
 **/
void freeVolume(Volume *volume)
{
  if (volume == NULL) {
    return;
  }

  // A NULL readerThreads array means the reader threads were never set up.
  if (volume->readerThreads != NULL) {
    // Flag the exit request under the mutex and wake every waiting reader.
    // This is fine even when no thread was actually started.
    lockMutex(&volume->readThreadsMutex);
    volume->readerState |= READER_STATE_EXIT;
    broadcastCond(&volume->readThreadsCond);
    unlockMutex(&volume->readThreadsMutex);

    // Join only the threads recorded as started.
    unsigned int joined = 0;
    while (joined < volume->numReadThreads) {
      joinThreads(volume->readerThreads[joined]);
      joined++;
    }

    FREE(volume->readerThreads);
    volume->readerThreads = NULL;
  }

  // The scratch page and both caches must be released BEFORE the volume
  // store is closed.
  destroyVolumePage(&volume->scratchPage);
  freePageCache(volume->pageCache);
  freeSparseCache(volume->sparseCache);
  closeVolumeStore(&volume->volumeStore);

  // Reader synchronization objects.
  destroyCond(&volume->readThreadsCond);
  destroyCond(&volume->readThreadsReadDoneCond);
  destroyMutex(&volume->readThreadsMutex);

  // Remaining owned allocations, and finally the volume itself.
  freeIndexPageMap(volume->indexPageMap);
  freeRadixSorter(volume->radixSorter);
  FREE(volume->geometry);
  FREE(volume->recordPointers);
  FREE(volume);
}