/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/uds-releases/jasper/src/uds/pageCache.c#6 $
 */

#include "pageCache.h"

#include "atomicDefs.h"
#include "cacheCounters.h"
#include "chapterIndex.h"
#include "compiler.h"
#include "errors.h"
#include "geometry.h"
#include "hashUtils.h"
#include "indexConfig.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "permassert.h"
#include "recordPage.h"
#include "stringUtils.h"
#include "threads.h"
#include "zone.h"

/**********************************************************************/
int assertPageInCache(PageCache *cache, CachedPage *page)
{
  int result = ASSERT((page->cp_physicalPage < cache->numIndexEntries),
                      "physicalPage %u is valid (< %u)",
                      page->cp_physicalPage, cache->numIndexEntries);
  if (result != UDS_SUCCESS) {
    return result;
  }

  uint16_t pageIndex = cache->index[page->cp_physicalPage];
  return ASSERT((pageIndex < cache->numCacheEntries)
                && (&cache->cache[pageIndex] == page),
                "page is at expected location in cache");
}

/**
 * Clear a cache page.  Note: this does not clear readPending - a read could
 * still be pending and the read thread needs to be able to proceed and
 * restart the requests regardless. This page will still be marked invalid,
 * but it won't get reused (see getLeastRecentPage()) until the readPending
 * flag is cleared. This is a valid case; for example, a chapter may be
 * forgotten and replaced with a new one in the LRU. Restarting the requests
 * will lead them to not find the records in the master index.
 *
 * @param cache   the cache
 * @param page    the cached page to clear
 **/
static void clearPage(PageCache *cache, CachedPage *page)
{
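  // numIndexEntries is one more than the largest valid physical page, so
  // it serves as the "no page" sentinel throughout this file.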
  page->cp_physicalPage = cache->numIndexEntries;
  WRITE_ONCE(page->cp_lastUsed, 0);
}

/**
 * Get a page from the cache without updating the cache statistics.
 *
 * @param cache        the cache
 * @param physicalPage the physical page to get
 * @param queueIndex   the index of the page in the read queue if
 *                     queued, -1 otherwise
 * @param pagePtr      a pointer to hold the page
 *
 * @return UDS_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int getPageNoStats(PageCache     *cache,
                          unsigned int   physicalPage,
                          int           *queueIndex,
                          CachedPage   **pagePtr)
{
  /*
   * ASSERTION: We are either a zone thread holding a searchPendingCounter,
   *            or we are any thread holding the readThreadsMutex.
   *
   * Holding only a searchPendingCounter is the most frequent case.
   */

  int result = ASSERT((physicalPage < cache->numIndexEntries),
                      "physical page %u is valid (< %u)",
                      physicalPage, cache->numIndexEntries);
  if (result != UDS_SUCCESS) {
    return result;
  }

  /*
   * It would be unlikely that the compiler turns the usage of indexValue into
   * two reads of cache->index, but it would be possible and very bad if those
   * reads did not return the same bits.
   */
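  /*
   * An index entry encodes one of three states: a cache slot number (less
   * than numCacheEntries), a read queue position with the
   * VOLUME_CACHE_QUEUED_FLAG bit set, or numCacheEntries itself, meaning
   * the page is neither cached nor queued.
   */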
  uint16_t indexValue = READ_ONCE(cache->index[physicalPage]);
  bool     queued     = (indexValue & VOLUME_CACHE_QUEUED_FLAG) != 0;
  uint16_t index      = indexValue & ~VOLUME_CACHE_QUEUED_FLAG;

  if (!queued && (index < cache->numCacheEntries)) {
    *pagePtr = &cache->cache[index];
    /*
     * We have acquired access to the cached page, but unless we hold the
     * readThreadsMutex, we need a read memory barrier now.  The corresponding
     * write memory barrier is in putPageInCache.
     */
    smp_rmb();
  } else {
    *pagePtr = NULL;
  }
  if (queueIndex != NULL) {
    *queueIndex = queued ? index : -1;
  }
  return UDS_SUCCESS;
}

/**
 * Wait for all pending searches on a page in the cache to complete
 *
 * @param cache         the page cache
 * @param physicalPage  the page to check searches on
 **/
static void waitForPendingSearches(PageCache *cache, unsigned int physicalPage)
{
  /*
   * We hold the readThreadsMutex.  We are waiting for threads that do not hold
   * the readThreadsMutex.  Those threads have "locked" their targeted page by
   * setting the searchPendingCounter.  The corresponding write memory barrier
   * is in beginPendingSearch.
   */
  smp_mb();

  InvalidateCounter initialCounters[MAX_ZONES];
  unsigned int i;
  for (i = 0; i < cache->zoneCount; i++) {
    initialCounters[i] = getInvalidateCounter(cache, i);
  }
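  /*
   * Each zone updates its invalidate counter when it begins or ends a
   * search, so any change from the snapshot taken above means the search
   * we observed has completed.
   */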
  for (i = 0; i < cache->zoneCount; i++) {
    if (searchPending(initialCounters[i])
        && (pageBeingSearched(initialCounters[i]) == physicalPage)) {
      // There is an active search using the physical page.
      // We need to wait for the search to finish.
      while (initialCounters[i] == getInvalidateCounter(cache, i)) {
        yieldScheduler();
      }
    }
  }
}

/**
 * Invalidate a cache page
 *
 * @param cache   the cache
 * @param page    the cached page
 * @param reason  the reason for invalidation, for stats
 *
 * @return UDS_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int invalidatePageInCache(PageCache          *cache,
                                 CachedPage         *page,
                                 InvalidationReason  reason)
{
  // We hold the readThreadsMutex.
  if (page == NULL) {
    return UDS_SUCCESS;
  }

  if (page->cp_physicalPage != cache->numIndexEntries) {
    switch (reason) {
    case INVALIDATION_EVICT:
      cache->counters.evictions++;
      break;
    case INVALIDATION_EXPIRE:
      cache->counters.expirations++;
      break;
    default:
      break;
    }

    if (reason != INVALIDATION_ERROR) {
      int result = assertPageInCache(cache, page);
      if (result != UDS_SUCCESS) {
        return result;
      }
    }

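    /*
     * Clear the mapping before waiting, so that no new search can find
     * the page while the searches already in progress drain.
     */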
    WRITE_ONCE(cache->index[page->cp_physicalPage], cache->numCacheEntries);
    waitForPendingSearches(cache, page->cp_physicalPage);
  }

  clearPage(cache, page);

  return UDS_SUCCESS;
}

/**********************************************************************/
int findInvalidateAndMakeLeastRecent(PageCache          *cache,
                                     unsigned int        physicalPage,
                                     QueuedRead         *readQueue,
                                     InvalidationReason  reason,
                                     bool                mustFind)
{
  // We hold the readThreadsMutex.
  if (cache == NULL) {
    return UDS_SUCCESS;
  }

  CachedPage *page;
  int queuedIndex = -1;
  int result
    = getPageNoStats(cache, physicalPage,
                     ((readQueue != NULL) ? &queuedIndex : NULL), &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  if (page == NULL) {
    result = ASSERT(!mustFind, "found page");
    if (result != UDS_SUCCESS) {
      return result;
    }

    if (queuedIndex > -1) {
      logDebug("setting pending read to invalid");
      readQueue[queuedIndex].invalid = true;
    }
    return UDS_SUCCESS;
  }

  // Invalidate the page and unmap it from the cache.
  result = invalidatePageInCache(cache, page, reason);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Move the cached page to the least recently used end of the list
  // so it will be replaced before any page with valid data.
  WRITE_ONCE(page->cp_lastUsed, 0);

  return UDS_SUCCESS;
}

/**********************************************************************/
__attribute__((warn_unused_result))
static int initializePageCache(PageCache      *cache,
                               const Geometry *geometry,
                               unsigned int    chaptersInCache,
                               unsigned int    readQueueMaxSize,
                               unsigned int    zoneCount)
{
  cache->geometry  = geometry;
  cache->numIndexEntries = geometry->pagesPerVolume + 1;
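  // One extra entry because data pages are numbered starting at 1 (see
  // invalidatePageCacheForChapter); physical page 0 holds the volume header.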
  cache->numCacheEntries = chaptersInCache * geometry->recordPagesPerChapter;
  cache->readQueueMaxSize = readQueueMaxSize;
  cache->zoneCount = zoneCount;
  atomic64_set(&cache->clock, 1);
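  // Start the clock at 1 so that a cp_lastUsed of 0 always marks a page
  // as least recently used (see clearPage).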

  int result = ALLOCATE(readQueueMaxSize, QueuedRead,
                        "volume read queue", &cache->readQueue);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = ALLOCATE(cache->zoneCount, SearchPendingCounter,
                    "Volume Cache Zones", &cache->searchPendingCounters);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = ASSERT((cache->numCacheEntries <= VOLUME_CACHE_MAX_ENTRIES),
                  "requested cache size %u is within the limit %u",
                  cache->numCacheEntries, VOLUME_CACHE_MAX_ENTRIES);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = ALLOCATE(cache->numIndexEntries, uint16_t, "page cache index",
                    &cache->index);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Initialize index values to invalid values.
  unsigned int i;
  for (i = 0; i < cache->numIndexEntries; i++) {
    cache->index[i] = cache->numCacheEntries;
  }

  result = ALLOCATE(cache->numCacheEntries, CachedPage,
                    "page cache cache", &cache->cache);
  if (result != UDS_SUCCESS) {
    return result;
  }

  for (i = 0; i < cache->numCacheEntries; i++) {
    CachedPage *page = &cache->cache[i];
    result = initializeVolumePage(geometry, &page->cp_pageData);
    if (result != UDS_SUCCESS) {
      return result;
    }
    clearPage(cache, page);
  }

  return UDS_SUCCESS;
}

/*********************************************************************/
int makePageCache(const Geometry  *geometry,
                  unsigned int     chaptersInCache,
                  unsigned int     readQueueMaxSize,
                  unsigned int     zoneCount,
                  PageCache      **cachePtr)
{
  if (chaptersInCache < 1) {
    return logWarningWithStringError(UDS_BAD_STATE,
                                     "cache size must be"
                                     " at least one chapter");
  }
  if (readQueueMaxSize == 0) {
    return logWarningWithStringError(UDS_INVALID_ARGUMENT,
                                     "read queue max size must be"
                                     " greater than 0");
  }
  if (zoneCount < 1) {
    return logWarningWithStringError(UDS_INVALID_ARGUMENT,
                                     "cache must have at least one zone");
  }

  PageCache *cache;
  int result = ALLOCATE(1, PageCache, "volume cache", &cache);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = initializePageCache(cache, geometry, chaptersInCache,
                               readQueueMaxSize, zoneCount);
  if (result != UDS_SUCCESS) {
    freePageCache(cache);
    return result;
  }

  *cachePtr = cache;
  return UDS_SUCCESS;
}

/**********************************************************************/
void freePageCache(PageCache *cache)
{
  if (cache == NULL) {
    return;
  }
  if (cache->cache != NULL) {
    unsigned int i;
    for (i = 0; i < cache->numCacheEntries; i++) {
      destroyVolumePage(&cache->cache[i].cp_pageData);
    }
  }
  FREE(cache->index);
  FREE(cache->cache);
  FREE(cache->searchPendingCounters);
  FREE(cache->readQueue);
  FREE(cache);
}

/**********************************************************************/
int invalidatePageCacheForChapter(PageCache          *cache,
                                  unsigned int        chapter,
                                  unsigned int        pagesPerChapter,
                                  InvalidationReason  reason)
{
  // We hold the readThreadsMutex.
  if ((cache == NULL) || (cache->cache == NULL)) {
    return UDS_SUCCESS;
  }

  int result;
  unsigned int i;
  for (i = 0; i < pagesPerChapter; i++) {
    unsigned int physicalPage = 1 + (pagesPerChapter * chapter) + i;
    result = findInvalidateAndMakeLeastRecent(cache, physicalPage,
                                              cache->readQueue, reason, false);
    if (result != UDS_SUCCESS) {
      return result;
    }
  }

  return UDS_SUCCESS;
}

/*********************************************************************/
void makePageMostRecent(PageCache *cache, CachedPage *page)
{
  // ASSERTION: We are either a zone thread holding a searchPendingCounter,
  //            or we are any thread holding the readThreadsMutex.
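  // If the page already holds the current clock value it is already the
  // most recent, so skip the atomic increment in that case.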
  if (atomic64_read(&cache->clock) != READ_ONCE(page->cp_lastUsed)) {
    WRITE_ONCE(page->cp_lastUsed, atomic64_inc_return(&cache->clock));
  }
}

/**
 * Get the least recent valid page from the cache.
 *
 * @param cache    the cache
 * @param pagePtr  a pointer to hold the new page (will be set to NULL
 *                 if the page was not found)
 *
 * @return UDS_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int getLeastRecentPage(PageCache *cache, CachedPage **pagePtr)
{
  // We hold the readThreadsMutex.
  int oldestIndex = 0;
  // Our first candidate is any page that does not have a pending read.  We
  // ensure above that there are more entries than read threads, so there
  // must be one.
  unsigned int i;
  for (i = 0;; i++) {
    if (i >= cache->numCacheEntries) {
      // This should never happen.
      return ASSERT(false, "found a page without a pending read");
    }
    if (!cache->cache[i].cp_readPending) {
      oldestIndex = i;
      break;
    }
  }
  // Now find the least recently used page that does not have a pending read.
  for (i = 0; i < cache->numCacheEntries; i++) {
    if (!cache->cache[i].cp_readPending
        && (READ_ONCE(cache->cache[i].cp_lastUsed)
            <= READ_ONCE(cache->cache[oldestIndex].cp_lastUsed))) {
      oldestIndex = i;
    }
  }
  *pagePtr = &cache->cache[oldestIndex];
  return UDS_SUCCESS;
}

/***********************************************************************/
int getPageFromCache(PageCache     *cache,
                     unsigned int   physicalPage,
                     int            probeType,
                     CachedPage   **pagePtr)
{
  // ASSERTION: We are in a zone thread.
  // ASSERTION: We are holding a searchPendingCounter or the readThreadsMutex.
  if (cache == NULL) {
    return logWarningWithStringError(UDS_BAD_STATE,
                                     "cannot get page with NULL cache");
  }

  // Get the cache page from the index
  CachedPage *page;
  int queueIndex = -1;
  int result = getPageNoStats(cache, physicalPage, &queueIndex, &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  CacheResultKind cacheResult = ((page != NULL)
                                 ? CACHE_RESULT_HIT
                                 : ((queueIndex != -1)
                                    ? CACHE_RESULT_QUEUED
                                    : CACHE_RESULT_MISS));
  incrementCacheCounter(&cache->counters, probeType, cacheResult);

  if (pagePtr != NULL) {
    *pagePtr = page;
  }
  return UDS_SUCCESS;
}

/***********************************************************************/
int enqueueRead(PageCache *cache, Request *request, unsigned int physicalPage)
{
  // We hold the readThreadsMutex.
  uint16_t first = cache->readQueueFirst;
  uint16_t last  = cache->readQueueLast;
  uint16_t next  = (last + 1) % cache->readQueueMaxSize;
  uint16_t readQueuePos;
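  /*
   * The read queue is a ring buffer that keeps one slot empty, so
   * (next == first) means full here, while (lastRead == last) means empty
   * in reserveReadQueueEntry.
   */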

  if ((cache->index[physicalPage] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
    /* Not seen before, add this to the read queue and mark it as queued */
    if (next == first) {
      /* The queue is full, so the request cannot be queued; returning
       * UDS_SUCCESS rather than UDS_QUEUED tells the caller so. */
      return UDS_SUCCESS;
    }
    /* fill the read queue entry */
    cache->readQueue[last].physicalPage = physicalPage;
    cache->readQueue[last].invalid = false;

    /* point the cache index to it */
    readQueuePos = last;
    WRITE_ONCE(cache->index[physicalPage],
               readQueuePos | VOLUME_CACHE_QUEUED_FLAG);
    cache->readQueue[readQueuePos].requestList.first = NULL;
    cache->readQueue[readQueuePos].requestList.last = NULL;
    /* bump the last pointer */
    cache->readQueueLast = next;
  } else {
    /* It's already queued, just add on to it */
    readQueuePos = cache->index[physicalPage] & ~VOLUME_CACHE_QUEUED_FLAG;
  }

  int result = ASSERT((readQueuePos < cache->readQueueMaxSize),
                      "queue is not overfull");
  if (result != UDS_SUCCESS) {
    return result;
  }

  request->nextRequest = NULL;
  if (cache->readQueue[readQueuePos].requestList.first == NULL) {
    cache->readQueue[readQueuePos].requestList.first = request;
  } else {
    cache->readQueue[readQueuePos].requestList.last->nextRequest = request;
  }
  cache->readQueue[readQueuePos].requestList.last = request;
  return UDS_QUEUED;
}

/***********************************************************************/
bool reserveReadQueueEntry(PageCache     *cache,
                           unsigned int  *queuePos,
                           Request      **firstRequest,
                           unsigned int  *physicalPage,
                           bool          *invalid)
{
  // We hold the readThreadsMutex.
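  /*
   * The read queue has three cursors: readQueueFirst is the oldest entry
   * not yet released, readQueueLastRead is the next entry to dequeue, and
   * readQueueLast is the next slot to fill (see enqueueRead and
   * releaseReadQueueEntry).
   */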
  uint16_t lastRead = cache->readQueueLastRead;

  // No items to dequeue
  if (lastRead == cache->readQueueLast) {
    return false;
  }

  unsigned int pageNo    = cache->readQueue[lastRead].physicalPage;
  bool         isInvalid = cache->readQueue[lastRead].invalid;

  uint16_t indexValue = cache->index[pageNo];
  bool     queued     = (indexValue & VOLUME_CACHE_QUEUED_FLAG) != 0;

  // ALB-1429 ... need to check whether it is still queued before resetting
  if (isInvalid && queued) {
    // invalidate cache index slot
    WRITE_ONCE(cache->index[pageNo], cache->numCacheEntries);
  }

  // If a sync read has taken this page, set invalid to true so we don't
  // overwrite it; we simply requeue the requests.
  if (!queued) {
    isInvalid = true;
  }

  cache->readQueue[lastRead].reserved = true;

  *queuePos                = lastRead;
  *firstRequest            = cache->readQueue[lastRead].requestList.first;
  *physicalPage            = pageNo;
  *invalid                 = isInvalid;
  cache->readQueueLastRead = (lastRead  + 1) % cache->readQueueMaxSize;

  return true;
}

/************************************************************************/
void releaseReadQueueEntry(PageCache *cache, unsigned int queuePos)
{
  // We hold the readThreadsMutex.
  cache->readQueue[queuePos].reserved = false;

  uint16_t lastRead = cache->readQueueLastRead;

  // Move the readQueueFirst pointer along when we can
  while ((cache->readQueueFirst != lastRead)
         && (!cache->readQueue[cache->readQueueFirst].reserved)) {
    cache->readQueueFirst =
      (cache->readQueueFirst + 1) % cache->readQueueMaxSize;
  }
}

/***********************************************************************/
int selectVictimInCache(PageCache   *cache,
                        CachedPage **pagePtr)
{
  // We hold the readThreadsMutex.
  if (cache == NULL) {
    return logWarningWithStringError(UDS_BAD_STATE,
                                     "cannot select a victim in a NULL cache");
  }

  CachedPage *page = NULL;
  int result = getLeastRecentPage(cache, &page);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = ASSERT((page != NULL), "least recent page is not NULL");
  if (result != UDS_SUCCESS) {
    return result;
  }

  // If the page is currently being pointed to by the page map, clear
  // it from the page map, and update cache stats
  if (page->cp_physicalPage != cache->numIndexEntries) {
    cache->counters.evictions++;
    WRITE_ONCE(cache->index[page->cp_physicalPage], cache->numCacheEntries);
    waitForPendingSearches(cache, page->cp_physicalPage);
  }

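  // Mark the page as having a read pending so that getLeastRecentPage()
  // will not select it again before putPageInCache() or cancelPageInCache()
  // clears the flag.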
  page->cp_readPending = true;

  *pagePtr = page;

  return UDS_SUCCESS;
}

/***********************************************************************/
int putPageInCache(PageCache    *cache,
                   unsigned int  physicalPage,
                   CachedPage   *page)
{
  // We hold the readThreadsMutex.
  if (cache == NULL) {
    return logWarningWithStringError(UDS_BAD_STATE,
                                     "cannot complete page in NULL cache");
  }

  int result = ASSERT((page != NULL), "page to install exists");
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = ASSERT((page->cp_readPending),
                  "page to install has a pending read");
  if (result != UDS_SUCCESS) {
    return result;
  }

  clearPage(cache, page);

  page->cp_physicalPage = physicalPage;

  // Figure out the index into the cache array using pointer arithmetic
  uint16_t value = page - cache->cache;
  result = ASSERT((value < cache->numCacheEntries), "cache index is valid");
  if (result != UDS_SUCCESS) {
    return result;
  }

  makePageMostRecent(cache, page);

  page->cp_readPending = false;

  /*
   * We hold the readThreadsMutex, but we must have a write memory barrier
   * before making the CachedPage available to the readers that do not hold the
   * mutex.  The corresponding read memory barrier is in getPageNoStats.
   */
  smp_wmb();

  // Point the page map to the new page; this also clears the queued flag.
  WRITE_ONCE(cache->index[physicalPage], value);

  return UDS_SUCCESS;
}

/***********************************************************************/
void cancelPageInCache(PageCache    *cache,
                       unsigned int  physicalPage,
                       CachedPage   *page)
{
  // We hold the readThreadsMutex.
  if (cache == NULL) {
    logWarning("cannot cancel page in NULL cache");
    return;
  }

  int result = ASSERT((page != NULL), "page to cancel exists");
  if (result != UDS_SUCCESS) {
    return;
  }

  result = ASSERT((page->cp_readPending),
                  "page to cancel has a pending read");
  if (result != UDS_SUCCESS) {
    return;
  }

  clearPage(cache, page);
  page->cp_readPending = false;

  // Clear the page map entry for this page; this also clears the queued flag.
  WRITE_ONCE(cache->index[physicalPage], cache->numCacheEntries);
}

/**********************************************************************/
size_t getPageCacheSize(PageCache *cache)
{
  if (cache == NULL) {
    return 0;
  }
  return sizeof(DeltaIndexPage) * cache->numCacheEntries;
}