Blob Blame History Raw
/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/vdoPageCache.c#11 $
 */

#include "vdoPageCacheInternals.h"

#if __KERNEL__
#include <linux/ratelimit.h>
#endif

#include "errors.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "permassert.h"

#include "adminState.h"
#include "constants.h"
#include "numUtils.h"
#include "readOnlyNotifier.h"
#include "statusCodes.h"
#include "types.h"
#include "vio.h"

enum {
  // reportCachePressure() logs only when its report counter is a multiple of
  // this value.
  LOG_INTERVAL                = 4000,
  // reportCachePressure() resets its report counter to zero once the counter
  // reaches this value.
  DISPLAY_INTERVAL            = 100000,
};

/**********************************************************************/
static char *getPageBuffer(PageInfo *info)
{
  // An info's index in the cache's infos array is also the index of its
  // VDO_BLOCK_SIZE-byte slot in the cache's pages buffer.
  VDOPageCache *owner = info->cache;
  return owner->pages + ((info - owner->infos) * VDO_BLOCK_SIZE);
}

/**
 * Allocate the separately allocated parts of a cache: the array of page
 * infos, the buffer holding the page data, and the map from page number to
 * page info. Nothing is released here on failure; the caller must clean up
 * whatever was allocated before the error.
 *
 * @param cache  The cache being constructed
 *
 * @return VDO_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int allocateCacheComponents(VDOPageCache *cache)
{
  int status = ALLOCATE(cache->pageCount, PageInfo, "page infos",
                        &cache->infos);
  if (status != UDS_SUCCESS) {
    return status;
  }

  // One VDO_BLOCK_SIZE-byte buffer per page, in a single allocation. The
  // multiplication is done in 64 bits.
  uint64_t bytes = cache->pageCount * (uint64_t) VDO_BLOCK_SIZE;
  status = allocateMemory(bytes, VDO_BLOCK_SIZE, "cache pages",
                          &cache->pages);
  if (status == UDS_SUCCESS) {
    status = makeIntMap(cache->pageCount, 0, &cache->pageMap);
  }

  return status;
}

/**
 * Set up every page info in the cache as a free page with no page number and
 * place it on the cache's free list. If the layer provides a
 * createMetadataVIO method, each info also gets a VIO which uses that info's
 * page buffer.
 *
 * @param cache  The cache to initialize
 *
 * @return VDO_SUCCESS or an error
 **/
static int initializeInfo(VDOPageCache *cache)
{
  initializeRing(&cache->freeList);

  PageInfo *end = cache->infos + cache->pageCount;
  PageInfo *page;
  for (page = cache->infos; page < end; ++page) {
    page->cache = cache;
    page->state = PS_FREE;
    page->pbn   = NO_PAGE;

    if (cache->layer->createMetadataVIO != NULL) {
      int status = createVIO(cache->layer, VIO_TYPE_BLOCK_MAP,
                             VIO_PRIORITY_METADATA, page, getPageBuffer(page),
                             &page->vio);
      if (status != VDO_SUCCESS) {
        return status;
      }

      // The thread ID should never change.
      page->vio->completion.callbackThreadID = cache->zone->threadID;
    }

    initializeRing(&page->lruNode);
    initializeRing(&page->listNode);
    pushRingNode(&cache->freeList, &page->listNode);
  }

  // Every page starts out free.
  relaxedStore64(&cache->stats.counts.freePages, cache->pageCount);
  return VDO_SUCCESS;
}

/**********************************************************************/
static void writeDirtyPagesCallback(RingNode *node, void *context);

/**********************************************************************/
int makeVDOPageCache(PhysicalLayer         *layer,
                     PageCount              pageCount,
                     VDOPageReadFunction   *readHook,
                     VDOPageWriteFunction  *writeHook,
                     size_t                 pageContextSize,
                     BlockCount             maximumAge,
                     BlockMapZone          *zone,
                     VDOPageCache         **cachePtr)
{
  int result = ASSERT(pageContextSize <= MAX_PAGE_CONTEXT_SIZE,
                      "page context size %zu cannot exceed %u bytes",
                      pageContextSize, MAX_PAGE_CONTEXT_SIZE);
  if (result != VDO_SUCCESS) {
    return result;
  }

  VDOPageCache *newCache;
  result = ALLOCATE(1, VDOPageCache, "page cache", &newCache);
  if (result != UDS_SUCCESS) {
    return result;
  }

  newCache->layer     = layer;
  newCache->pageCount = pageCount;
  newCache->readHook  = readHook;
  newCache->writeHook = writeHook;
  newCache->zone      = zone;

  // Run the remaining construction steps in order, stopping at the first
  // failure; a partially built cache is torn down in one place below.
  result = allocateCacheComponents(newCache);
  if (result == VDO_SUCCESS) {
    result = initializeInfo(newCache);
  }

  if (result == VDO_SUCCESS) {
    result = makeDirtyLists(maximumAge, writeDirtyPagesCallback, newCache,
                            &newCache->dirtyLists);
  }

  if (result != VDO_SUCCESS) {
    freeVDOPageCache(&newCache);
    return result;
  }

  // The LRU and outgoing rings both start out empty.
  initializeRing(&newCache->lruList);
  initializeRing(&newCache->outgoingList);

  *cachePtr = newCache;
  return VDO_SUCCESS;
}

/**********************************************************************/
void freeVDOPageCache(VDOPageCache **cachePtr)
{
  VDOPageCache *doomed = *cachePtr;
  if (doomed == NULL) {
    return;
  }

  // Release the VIO of every page info before the info array itself goes.
  if (doomed->infos != NULL) {
    PageInfo *end = doomed->infos + doomed->pageCount;
    PageInfo *page;
    for (page = doomed->infos; page < end; ++page) {
      freeVIO(&page->vio);
    }
  }

  freeDirtyLists(&doomed->dirtyLists);
  freeIntMap(&doomed->pageMap);
  FREE(doomed->infos);
  FREE(doomed->pages);
  FREE(doomed);

  // Clear the caller's reference so it cannot be used after the free.
  *cachePtr = NULL;
}

/**********************************************************************/
void setVDOPageCacheInitialPeriod(VDOPageCache *cache, SequenceNumber period)
{
  // Pass the period straight through to the cache's dirty lists.
  setCurrentPeriod(cache->dirtyLists, period);
}

/**********************************************************************/
void setVDOPageCacheRebuildMode(VDOPageCache *cache, bool rebuilding)
{
  // launchPageLoad() consults this flag to choose which read error handler
  // to register (handleRebuildReadError vs. handleLoadError).
  cache->rebuilding = rebuilding;
}

/**
 * Assert that a function has been called on the VDO page cache's thread,
 * i.e. that the current callback thread ID equals the thread ID of the
 * cache's zone. A violation is reported via ASSERT_LOG_ONLY; no error is
 * returned to the caller.
 *
 * @param cache         the page cache
 * @param functionName  the name of the function, used in the failure message
 **/
static inline void assertOnCacheThread(VDOPageCache *cache,
                                       const char   *functionName)
{
  ThreadID threadID = getCallbackThreadID();
  ASSERT_LOG_ONLY((threadID == cache->zone->threadID),
                  "%s() must only be called on cache thread %d, not thread %d",
                  functionName, cache->zone->threadID, threadID);
}

/**
 * Assert that a page cache may issue I/O, i.e. that the admin state of the
 * cache's zone is not quiescent. A violation is reported via
 * ASSERT_LOG_ONLY; no error is returned to the caller.
 *
 * @param cache  the page cache
 **/
static inline void assertIOAllowed(VDOPageCache *cache)
{
  ASSERT_LOG_ONLY(!isQuiescent(&cache->zone->state),
                  "VDO page cache may issue I/O");
}

/**
 * Count an instance of cache pressure and, while there are more waiters than
 * pages in the cache, periodically log the running pressure count.
 *
 * The pressure statistic is always incremented. Logging happens only when
 * the waiter count exceeds the page count, and then only once per
 * LOG_INTERVAL reports; the report counter wraps to zero at
 * DISPLAY_INTERVAL.
 *
 * @param cache         the page cache
 **/
static void reportCachePressure(VDOPageCache *cache)
{
  relaxedAdd64(&cache->stats.cachePressure, 1);

  if (cache->waiterCount <= cache->pageCount) {
    return;
  }

  if ((cache->pressureReport % LOG_INTERVAL) == 0) {
    logInfo("page cache pressure %llu",
            relaxedLoad64(&cache->stats.cachePressure));
  }

  cache->pressureReport++;
  if (cache->pressureReport >= DISPLAY_INTERVAL) {
    cache->pressureReport = 0;
  }
}

/**********************************************************************/
const char *vpcPageStateName(PageState state)
{
  // Indexed by PageState value; the static assertion below only checks that
  // the table has PAGE_STATE_COUNT entries (the ordering is presumably that
  // of the PageState enumeration, which is declared elsewhere).
  static const char *stateNames[] = {
    "FREE",
    "INCOMING",
    "FAILED",
    "RESIDENT",
    "DIRTY",
    "OUTGOING"
  };
  STATIC_ASSERT(COUNT_OF(stateNames) == PAGE_STATE_COUNT);

  int inRange = ASSERT(state < COUNT_OF(stateNames),
                       "Unknown PageState value %d", state);
  return ((inRange == UDS_SUCCESS)
          ? stateNames[state] : "[UNKNOWN PAGE STATE]");
}

/**
 * Adjust the statistics counter which tracks pages in the info's current
 * state. A state with no counter is silently ignored.
 *
 * @param info   the page info to count
 * @param delta  the delta to apply to the counter
 **/
static void updateCounter(PageInfo *info, int32_t delta)
{
  VDOPageCache *cache   = info->cache;
  PageState     current = info->state;

  if (current == PS_FREE) {
    relaxedAdd64(&cache->stats.counts.freePages, delta);
  } else if (current == PS_INCOMING) {
    relaxedAdd64(&cache->stats.counts.incomingPages, delta);
  } else if (current == PS_OUTGOING) {
    relaxedAdd64(&cache->stats.counts.outgoingPages, delta);
  } else if (current == PS_FAILED) {
    relaxedAdd64(&cache->stats.counts.failedPages, delta);
  } else if (current == PS_RESIDENT) {
    // Resident pages are reported as "clean" in the statistics.
    relaxedAdd64(&cache->stats.counts.cleanPages, delta);
  } else if (current == PS_DIRTY) {
    relaxedAdd64(&cache->stats.counts.dirtyPages, delta);
  }
}

/**
 * Update the LRU information for an active page by pushing its LRU node onto
 * the cache's LRU ring, unless that node is already the ring's prev entry.
 *
 * @param info  the page info which was just used
 **/
static void updateLru(PageInfo *info)
{
  VDOPageCache *cache = info->cache;

  if (cache->lruList.prev == &info->lruNode) {
    // Already in the position a push would put it (presumably the most
    // recently used end of the ring); nothing to do.
    return;
  }

  pushRingNode(&cache->lruList, &info->lruNode);
}

/**
 * Change the state of a PageInfo, moving one count from the old state's
 * statistic to the new state's, and put the info on the list which goes with
 * the new state. Does nothing if the state is unchanged.
 *
 * @param info      the PageInfo to modify
 * @param newState  the new state for the PageInfo
 **/
static void setInfoState(PageInfo *info, PageState newState)
{
  if (info->state == newState) {
    return;
  }

  updateCounter(info, -1);
  info->state = newState;
  updateCounter(info, 1);

  VDOPageCache *cache   = info->cache;
  PageState     current = info->state;
  if ((current == PS_FREE) || (current == PS_FAILED)) {
    pushRingNode(&cache->freeList, &info->listNode);
  } else if (current == PS_OUTGOING) {
    pushRingNode(&cache->outgoingList, &info->listNode);
  } else if (current != PS_DIRTY) {
    // Every other state except dirty is on no list at all; a dirty page's
    // list node is deliberately left untouched here.
    unspliceRingNode(&info->listNode);
  }
}

/**
 * Set the pbn for an info, keeping the cache's page map in step: any existing
 * mapping for the info's old page number is removed, and a mapping for the
 * new page number is added unless it is NO_PAGE.
 *
 * @param info  The page info
 * @param pbn   The physical block number to set
 *
 * @return VDO_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int setInfoPBN(PageInfo *info, PhysicalBlockNumber pbn)
{
  // Either the new or the old page number must be NO_PAGE.
  int result = ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
                      "Must free a page before reusing it.");
  if (result != VDO_SUCCESS) {
    return result;
  }

  if (info->pbn != NO_PAGE) {
    intMapRemove(info->cache->pageMap, info->pbn);
  }

  info->pbn = pbn;
  if (pbn == NO_PAGE) {
    return VDO_SUCCESS;
  }

  result = intMapPut(info->cache->pageMap, pbn, info, true, NULL);
  if (result != UDS_SUCCESS) {
    return result;
  }

  return VDO_SUCCESS;
}

/**
 * Reset page info to represent an unallocated page: no page number, free
 * state, and not on the LRU ring. The page must not be busy and must have no
 * waiters.
 *
 * @param info  the page info to reset
 *
 * @return the failing assertion's result if the page is busy or has waiters
 *         (in which case nothing is changed), otherwise the result of
 *         clearing the page number
 **/
static int resetPageInfo(PageInfo *info)
{
  int status = ASSERT(info->busy == 0, "VDO Page must not be busy");
  if (status == UDS_SUCCESS) {
    status = ASSERT(!hasWaiters(&info->waiting),
                    "VDO Page must not have waiters");
  }

  if (status != UDS_SUCCESS) {
    return status;
  }

  // The state and LRU changes are made even if clearing the pbn fails.
  status = setInfoPBN(info, NO_PAGE);
  setInfoState(info, PS_FREE);
  unspliceRingNode(&info->lruNode);
  return status;
}

/**
 * Take the first page off the cache's free list, if there is one.
 *
 * @param cache         the page cache
 *
 * @return a pointer to the page info structure (if found), NULL otherwise
 **/
__attribute__((warn_unused_result))
static PageInfo *findFreePage(VDOPageCache *cache)
{
  // The ring head pointing at itself means the free list is empty.
  if (cache->freeList.next != &cache->freeList) {
    PageInfo *candidate = pageInfoFromListNode(cache->freeList.next);
    unspliceRingNode(&candidate->listNode);
    return candidate;
  }

  return NULL;
}

/**********************************************************************/
PageInfo *vpcFindPage(VDOPageCache *cache, PhysicalBlockNumber pbn)
{
  // Consult the page map only when the most recently found info is not the
  // one wanted; the map lookup's result (possibly NULL) is remembered for
  // the next call.
  PageInfo *previous = cache->lastFound;
  if ((previous == NULL) || (previous->pbn != pbn)) {
    cache->lastFound = intMapGet(cache->pageMap, pbn);
  }

  return cache->lastFound;
}

/**
 * Determine which page is least recently used.
 *
 * @param cache         the page cache structure
 *
 * @return a pointer to the info structure for a relevant page,
 *         or NULL if no such page can be found. The page can be
 *         dirty or resident.
 *
 * @note Walks the LRU ring from its front and picks the first entry
 *       which is neither busy nor in flight.
 *       Since a page is moved to the end of the ring whenever it is
 *       marked busy, it is unlikely (but not impossible) for entries
 *       at the front to be busy unless the ring is very short.
 **/
__attribute__((warn_unused_result))
static PageInfo *selectLRUPage(VDOPageCache *cache)
{
  PageInfoNode *node = cache->lruList.next;
  while (node != &cache->lruList) {
    PageInfo *candidate = pageInfoFromLRUNode(node);
    if ((candidate->busy == 0) && !isInFlight(candidate)) {
      return candidate;
    }

    node = node->next;
  }

  return NULL;
}

/**********************************************************************/
AtomicPageCacheStatistics *getVDOPageCacheStatistics(VDOPageCache *cache)
{
  // Returns a pointer to the statistics embedded in the cache, not a copy.
  return &cache->stats;
}

// ASYNCHRONOUS INTERFACE BEYOND THIS POINT

/**
 * Helper to complete the VDO Page Completion request successfully.
 *
 * A writable request requires the page to be present and a read-only request
 * requires it to be valid; if the page does not qualify, the completion is
 * finished with VDO_BAD_PAGE instead.
 *
 * @param info          the page info representing the result page
 * @param vdoPageComp   the VDO page completion to complete
 **/
static void completeWithPage(PageInfo *info, VDOPageCompletion *vdoPageComp)
{
  bool        usable;
  const char *requirement;
  if (vdoPageComp->writable) {
    usable      = isPresent(info);
    requirement = "present";
  } else {
    usable      = isValid(info);
    requirement = "valid";
  }

  if (usable) {
    vdoPageComp->info  = info;
    vdoPageComp->ready = true;
    finishCompletion(&vdoPageComp->completion, VDO_SUCCESS);
    return;
  }

  logErrorWithStringError(VDO_BAD_PAGE,
                          "Requested cache page %llu in state %s is"
                          " not %s",
                          info->pbn, vpcPageStateName(info->state),
                          requirement);
  finishCompletion(&vdoPageComp->completion, VDO_BAD_PAGE);
}

/**
 * Finish a waiting page completion with an error code. Implements
 * WaiterCallback.
 *
 * @param waiter        The page completion, as a waiter
 * @param resultPtr     A pointer to the (int) error code.
 **/
static void completeWaiterWithError(Waiter *waiter, void *resultPtr)
{
  VDOPageCompletion *pageCompletion = pageCompletionFromWaiter(waiter);
  int                errorCode      = *((int *) resultPtr);
  finishCompletion(&pageCompletion->completion, errorCode);
}

/**
 * Complete a queue of VDOPageCompletions with an error code, by notifying
 * every waiter on the queue with completeWaiterWithError().
 *
 * @param [in]      result      the error result
 * @param [in, out] queue       a pointer to the queue
 *
 * @note upon completion the queue will be empty
 **/
static void distributeErrorOverQueue(int result, WaitQueue *queue)
{
  // The callback receives the error through a pointer to this function's own
  // copy of the result.
  notifyAllWaiters(queue, completeWaiterWithError, &result);
}

/**
 * Finish a waiting page completion with a page. Implements WaiterCallback.
 *
 * @param waiter        The page completion, as a waiter
 * @param pageInfo      The page info (a PageInfo *) to complete with
 **/
static void completeWaiterWithPage(Waiter *waiter, void *pageInfo)
{
  PageInfo *page = pageInfo;
  completeWithPage(page, pageCompletionFromWaiter(waiter));
}

/**
 * Hand a page to every VDOPageCompletion waiting on a queue, after refreshing
 * the page's position in the LRU ring.
 *
 * @param [in]      info        the page info describing the page
 * @param [in, out] queue       a pointer to a queue of waiters
 *
 * @return the number of waiters the page was distributed to
 *
 * @note upon completion the queue will be empty
 *
 **/
static unsigned int distributePageOverQueue(PageInfo *info, WaitQueue *queue)
{
  updateLru(info);

  size_t waiterTotal = countWaiters(queue);

  /*
   * Add the whole batch to the busy count before notifying anyone, so the
   * page cannot stop being busy until every pending completion has been
   * processed (VDO-83).
   */
  info->busy += waiterTotal;

  notifyAllWaiters(queue, completeWaiterWithPage, info);
  return waiterTotal;
}

/**
 * Set a persistent error which all requests will receive in the future.
 *
 * Unless the result is VDO_READ_ONLY or the VDO is already read-only, the
 * error is logged and the VDO is put into read-only mode. In every case, all
 * completions waiting for a free page and all completions waiting on any
 * individual page are finished with the error.
 *
 * @param cache         the page cache
 * @param context       a string describing what triggered the error
 * @param result        the error result
 *
 * Once triggered, all enqueued completions will get this error.
 * Any future requests will result in this error as well.
 **/
static void setPersistentError(VDOPageCache *cache,
                               const char   *context,
                               int           result)
{
  // If we're already read-only, there's no need to log.
  ReadOnlyNotifier *notifier = cache->zone->readOnlyNotifier;
  if ((result != VDO_READ_ONLY) && !isReadOnly(notifier)) {
    logErrorWithStringError(result, "VDO Page Cache persistent error: %s",
                            context);
    enterReadOnlyMode(notifier, result);
  }

  assertOnCacheThread(cache, __func__);

  // Fail everyone waiting for a free page; no waiters remain afterwards.
  distributeErrorOverQueue(result, &cache->freeWaiters);
  cache->waiterCount = 0;

  // Fail everyone waiting on a specific page.
  PageInfo *info;
  for (info = cache->infos; info < cache->infos + cache->pageCount; ++info) {
    distributeErrorOverQueue(result, &info->waiting);
  }
}

/**********************************************************************/
void initVDOPageCompletion(VDOPageCompletion   *pageCompletion,
                           VDOPageCache        *cache,
                           PhysicalBlockNumber  pbn,
                           bool                 writable,
                           void                *parent,
                           VDOAction           *callback,
                           VDOAction           *errorHandler)
{
  ASSERT_LOG_ONLY((pageCompletion->waiter.nextWaiter == NULL),
                  "New page completion was not already on a wait queue");

  // Start from a structure in which every field not named here is zeroed.
  VDOPageCompletion fresh = {
    .cache    = cache,
    .pbn      = pbn,
    .writable = writable,
  };
  *pageCompletion = fresh;

  // The embedded completion will run its callbacks on the cache zone's
  // thread.
  initializeCompletion(&pageCompletion->completion, VDO_PAGE_COMPLETION,
                       cache->layer);
  prepareCompletion(&pageCompletion->completion, callback, errorHandler,
                    cache->zone->threadID, parent);
}

/**
 * Helper function to check that a completion represents a successfully
 * completed VDO Page Completion referring to a valid page.
 *
 * The checks run in order and stop at the first failure: the completion is
 * ready, it has a page info, the info's pbn matches the requested pbn, the
 * page is valid, and (only if a writable page is required) the completion
 * was requested as writable.
 *
 * @param completion    a VDO completion
 * @param writable      whether a writable page is required
 *
 * @return the embedding completion if valid, NULL if not
 **/
__attribute__((warn_unused_result))
static VDOPageCompletion *validateCompletedPage(VDOCompletion *completion,
                                                bool           writable)
{
  VDOPageCompletion *vpc = asVDOPageCompletion(completion);

  if (ASSERT(vpc->ready, "VDO Page completion not ready") != UDS_SUCCESS) {
    return NULL;
  }

  if (ASSERT(vpc->info != NULL, "VDO Page Completion must be complete")
      != UDS_SUCCESS) {
    return NULL;
  }

  if (ASSERT(vpc->info->pbn == vpc->pbn,
             "VDO Page Completion pbn must be consistent")
      != UDS_SUCCESS) {
    return NULL;
  }

  if (ASSERT(isValid(vpc->info), "VDO Page Completion page must be valid")
      != UDS_SUCCESS) {
    return NULL;
  }

  if (writable
      && (ASSERT(vpc->writable, "VDO Page Completion is writable")
          != UDS_SUCCESS)) {
    return NULL;
  }

  return vpc;
}

/**********************************************************************/
bool isPageCacheActive(VDOPageCache *cache)
{
  // The cache is idle only when no reads and no writes are outstanding.
  bool idle = ((cache->outstandingReads == 0)
               && (cache->outstandingWrites == 0));
  return !idle;
}

/**
 * VIO callback used when a page has been loaded. Marks the page resident,
 * hands it to every completion waiting on it, and then accounts for the
 * finished read.
 *
 * @param completion  A completion for the VIO, the parent of which is a
 *                    PageInfo.
 **/
static void pageIsLoaded(VDOCompletion *completion)
{
  PageInfo     *info   = completion->parent;
  VDOPageCache *cache  = info->cache;
  assertOnCacheThread(cache, __func__);

  setInfoState(info, PS_RESIDENT);
  distributePageOverQueue(info, &info->waiting);

  /*
   * Don't decrement until right before calling checkForDrainComplete() to
   * ensure that the above work can't cause the page cache to be freed out from
   * under us.
   */
  cache->outstandingReads--;
  checkForDrainComplete(cache->zone);
}

/**
 * Handle page load errors. Puts the VDO into read-only mode, counts the
 * failed read, fails every completion waiting on the page with the read's
 * result, and resets the page info.
 *
 * @param completion  The page read VIO
 **/
static void handleLoadError(VDOCompletion *completion)
{
  int           result = completion->result;
  PageInfo     *info   = completion->parent;
  VDOPageCache *cache  = info->cache;
  assertOnCacheThread(cache, __func__);

  enterReadOnlyMode(cache->zone->readOnlyNotifier, result);
  relaxedAdd64(&cache->stats.failedReads, 1);
  setInfoState(info, PS_FAILED);
  distributeErrorOverQueue(result, &info->waiting);
  // NOTE(review): the result of resetPageInfo() is not checked here.
  resetPageInfo(info);

  /*
   * Don't decrement until right before calling checkForDrainComplete() to
   * ensure that the above work can't cause the page cache to be freed out from
   * under us.
   */
  cache->outstandingReads--;
  checkForDrainComplete(cache->zone);
}

/**
 * Run the read hook after a page is loaded. This callback is registered in
 * launchPageLoad() when there is a read hook.
 *
 * The completion is re-armed with pageIsLoaded() as its callback before the
 * hook is invoked, and is then continued with the hook's result.
 *
 * @param completion  The page load completion
 **/
static void runReadHook(VDOCompletion *completion)
{
  PageInfo *info       = completion->parent;
  completion->callback = pageIsLoaded;
  resetCompletion(completion);
  // The hook is given the page's data buffer, its pbn, the cache's zone, and
  // the info's context.
  int result = info->cache->readHook(getPageBuffer(info), info->pbn,
                                     info->cache->zone, info->context);
  continueCompletion(completion, result);
}

/**
 * Handle a read error during a read-only rebuild. The failed read is counted,
 * the page buffer is zeroed, and processing continues exactly as it would
 * after a successful load.
 *
 * @param completion  The page load completion
 **/
static void handleRebuildReadError(VDOCompletion *completion)
{
  PageInfo     *info   = completion->parent;
  VDOPageCache *cache  = info->cache;
  assertOnCacheThread(cache, __func__);

  // We are doing a read-only rebuild, so treat this as a successful read
  // of an uninitialized page.
  relaxedAdd64(&cache->stats.failedReads, 1);
  memset(getPageBuffer(info), 0, VDO_BLOCK_SIZE);
  resetCompletion(completion);
  // Follow the same path launchPageLoad() registers for a successful read.
  if (cache->readHook != NULL) {
    runReadHook(completion);
  } else {
    pageIsLoaded(completion);
  }
}

/**
 * Begin the process of loading a page: bind the info to the page number, mark
 * it incoming, and launch a metadata read into its buffer.
 *
 * @param info  the page info representing where to load the page
 * @param pbn   the absolute pbn of the desired page
 *
 * @return VDO_SUCCESS or an error code
 **/
__attribute__((warn_unused_result))
static int launchPageLoad(PageInfo *info, PhysicalBlockNumber pbn)
{
  VDOPageCache *cache = info->cache;
  assertIOAllowed(cache);

  int result = setInfoPBN(info, pbn);
  if (result != VDO_SUCCESS) {
    return result;
  }

  result = ASSERT((info->busy == 0), "Page is not busy before loading.");
  if (result != VDO_SUCCESS) {
    return result;
  }

  setInfoState(info, PS_INCOMING);
  cache->outstandingReads++;
  relaxedAdd64(&cache->stats.pagesLoaded, 1);

  // A successful read goes through the read hook when there is one; a failed
  // read is tolerated only while rebuilding.
  VDOAction *onSuccess
    = ((cache->readHook != NULL) ? runReadHook : pageIsLoaded);
  VDOAction *onError
    = (cache->rebuilding ? handleRebuildReadError : handleLoadError);
  launchReadMetadataVIO(info->vio, pbn, onSuccess, onError);
  return VDO_SUCCESS;
}

/**********************************************************************/
static void writePages(VDOCompletion *completion);

/**
 * Handle errors flushing the layer. The flush result becomes the cache's
 * persistent error, after which the batch of pages is still passed to
 * writePages().
 *
 * @param completion  The flush VIO
 **/
static void handleFlushError(VDOCompletion *completion)
{
  // The flush VIO's parent is the PageInfo which owns the VIO.
  VDOPageCache *cache = ((PageInfo *) completion->parent)->cache;
  setPersistentError(cache, "flush failed", completion->result);
  writePages(completion);
}

/**
 * Attempt to save the outgoing pages by first flushing the layer.
 *
 * Does nothing if a batch is already being flushed or if no pages are waiting
 * to be flushed. Otherwise all waiting pages become the current batch, and
 * the VIO of the first page on the outgoing list is used to issue the flush
 * (or, when the write policy is sync, to go straight to writePages()).
 *
 * @param cache  The cache
 **/
static void savePages(VDOPageCache *cache)
{
  if ((cache->pagesInFlush > 0) || (cache->pagesToFlush == 0)) {
    return;
  }

  assertIOAllowed(cache);

  // Move the whole pending count into the in-flush batch.
  PageInfo *info      = pageInfoFromListNode(cache->outgoingList.next);
  cache->pagesInFlush = cache->pagesToFlush;
  cache->pagesToFlush = 0;
  relaxedAdd64(&cache->stats.flushCount, 1);

  VIO           *vio   = info->vio;
  PhysicalLayer *layer = vio->completion.layer;

  /*
   * We must make sure that the recovery journal entries that changed these
   * pages were successfully persisted, and thus must issue a flush before
   * each batch of pages is written to ensure this. However, in sync mode,
   * every journal block is written with FUA, thus guaranteeing the journal
   * persisted already.
   */
  if (layer->getWritePolicy(layer) != WRITE_POLICY_SYNC) {
    launchFlush(vio, writePages, handleFlushError);
    return;
  }

  writePages(&vio->completion);
}

/**
 * Add a page to the outgoing list of pages waiting to be saved. Once in the
 * list, a page may not be used until it has been written out.
 *
 * A page which is currently busy is not queued; its write status is set to
 * deferred instead.
 *
 * @param info  The page to save
 **/
static void schedulePageSave(PageInfo *info)
{
  if (info->busy > 0) {
    info->writeStatus = WRITE_STATUS_DEFERRED;
    return;
  }

  VDOPageCache *cache = info->cache;
  cache->pagesToFlush++;
  cache->outstandingWrites++;
  setInfoState(info, PS_OUTGOING);
}

/**********************************************************************/
static void writeDirtyPagesCallback(RingNode *expired, void *context)
{
  // Schedule a save for every expired page, emptying the ring as we go.
  while (!isRingEmpty(expired)) {
    PageInfo *page = pageInfoFromListNode(chopRingNode(expired));
    schedulePageSave(page);
  }

  // The context is the cache registered with makeDirtyLists().
  VDOPageCache *cache = context;
  savePages(cache);
}

/**
 * Add a page to outgoing pages waiting to be saved, and then start saving
 * pages if another save is not in progress.
 *
 * @param info  The page to save
 **/
static void launchPageSave(PageInfo *info)
{
  // savePages() itself returns early when a flush is already in progress or
  // nothing is pending.
  schedulePageSave(info);
  savePages(info->cache);
}

/**
 * Determine whether a given VDOPageCompletion (as a waiter) is requesting a
 * given page number. Implements WaiterMatch.
 *
 * @param waiter        The page completion in question
 * @param context       A pointer to the pbn of the desired page
 *
 * @return true if the page completion is for the desired page number
 **/
static bool completionNeedsPage(Waiter *waiter, void *context)
{
  const PhysicalBlockNumber *wanted    = context;
  VDOPageCompletion         *candidate = pageCompletionFromWaiter(waiter);
  return (candidate->pbn == *wanted);
}

/**
 * Allocate a free page to the first completion in the waiting queue,
 * and any other completions that match it in page number.
 *
 * If nobody is waiting for a free page, the page is left alone and any
 * recorded cache pressure is logged as relieved and cleared. Otherwise the
 * page info is reset and a load of the oldest waiter's page is launched into
 * it.
 *
 * @param info  the page info to give away
 **/
static void allocateFreePage(PageInfo *info)
{
  VDOPageCache *cache = info->cache;
  assertOnCacheThread(cache, __func__);

  if (!hasWaiters(&cache->freeWaiters)) {
    if (relaxedLoad64(&cache->stats.cachePressure) > 0) {
      logInfo("page cache pressure relieved");
      relaxedStore64(&cache->stats.cachePressure, 0);
    }
    return;
  }

  int result = resetPageInfo(info);
  if (result != VDO_SUCCESS) {
    setPersistentError(cache, "cannot reset page info", result);
    return;
  }

  Waiter *oldestWaiter = getFirstWaiter(&cache->freeWaiters);
  PhysicalBlockNumber pbn = pageCompletionFromWaiter(oldestWaiter)->pbn;

  // Remove all entries which match the page number in question
  // and push them onto the page info's wait queue.
  dequeueMatchingWaiters(&cache->freeWaiters, completionNeedsPage,
                         &pbn, &info->waiting);
  // Those waiters no longer count as waiting for a free page.
  cache->waiterCount -= countWaiters(&info->waiting);

  result = launchPageLoad(info, pbn);
  if (result != VDO_SUCCESS) {
    // The load never started, so fail the waiters just moved onto this page.
    distributeErrorOverQueue(result, &info->waiting);
  }
}

/**
 * Begin the process of discarding a page.
 *
 * @param cache         the page cache
 *
 * @note If no page is discardable, cache pressure is reported instead. This
 *       is an indication that the cache is not big enough.
 *
 * @note If the selected page is not dirty, immediately allocates the page
 *       to the oldest completion waiting for a free page.
 *
 * @note If the selected page is dirty, it is counted as a pending discard,
 *       its write status is set to discard, and a save of it is launched.
 **/
static void discardAPage(VDOPageCache *cache)
{
  PageInfo *info = selectLRUPage(cache);
  if (info == NULL) {
    reportCachePressure(cache);
  } else if (!isDirty(info)) {
    allocateFreePage(info);
  } else {
    ASSERT_LOG_ONLY(!isInFlight(info),
                    "page selected for discard is not in flight");

    cache->discardCount++;
    info->writeStatus = WRITE_STATUS_DISCARD;
    launchPageSave(info);
  }
}

/**
 * Helper used to trigger a discard so that the completion can get a different
 * page. The completion is counted and queued as a waiter for a free page
 * before the discard is started.
 *
 * @param vdoPageComp   the VDO Page completion
 **/
static void discardPageForCompletion(VDOPageCompletion *vdoPageComp)
{
  VDOPageCache *cache = vdoPageComp->cache;

  cache->waiterCount += 1;

  int status = enqueueWaiter(&cache->freeWaiters, &vdoPageComp->waiter);
  if (status != VDO_SUCCESS) {
    // NOTE(review): after a failed enqueue the discard below is still
    // attempted, and this completion is not itself on the free waiter queue
    // -- confirm this is intended.
    setPersistentError(cache, "cannot enqueue waiter", status);
  }

  discardAPage(cache);
}

/**
 * Helper used to trigger a discard if the cache needs another free page,
 * i.e. if more completions are waiting for a free page than there are
 * discards already pending.
 *
 * @param cache         the page cache
 **/
static void discardPageIfNeeded(VDOPageCache *cache)
{
  if (cache->waiterCount <= cache->discardCount) {
    return;
  }

  discardAPage(cache);
}

/**********************************************************************/
void advanceVDOPageCachePeriod(VDOPageCache *cache, SequenceNumber period)
{
  // Must run on the cache's thread; the period is passed on to the cache's
  // dirty lists.
  assertOnCacheThread(cache, __func__);
  advancePeriod(cache->dirtyLists, period);
}

/**
 * Inform the cache that a write has finished (possibly with an error). The
 * cache's count of outstanding writes is decremented and the page's write
 * status is returned to normal.
 *
 * @param info  The info structure for the page whose write just completed
 *
 * @return <code>true</code> if the page write was a discard
 **/
static bool writeHasFinished(PageInfo *info)
{
  VDOPageCache *cache = info->cache;
  assertOnCacheThread(cache, __func__);
  cache->outstandingWrites--;

  // Note the status before clearing it.
  bool discarded    = (info->writeStatus == WRITE_STATUS_DISCARD);
  info->writeStatus = WRITE_STATUS_NORMAL;
  return discarded;
}

/**
 * Handler for page write errors. The failure is logged (rate-limited in the
 * kernel, and not at all for VDO_READ_ONLY), the page is returned to the
 * dirty state, the failed write is counted, and the error becomes the cache's
 * persistent error.
 *
 * @param completion  The page write VIO
 **/
static void handlePageWriteError(VDOCompletion *completion)
{
  int           result = completion->result;
  PageInfo     *info   = completion->parent;
  VDOPageCache *cache  = info->cache;

  // If we're already read-only, write failures are to be expected.
  if (result != VDO_READ_ONLY) {
#if __KERNEL__
    static DEFINE_RATELIMIT_STATE(errorLimiter, DEFAULT_RATELIMIT_INTERVAL,
                                  DEFAULT_RATELIMIT_BURST);

    if (__ratelimit(&errorLimiter)) {
      logError("failed to write block map page %llu", info->pbn);
    }
#else
    logError("failed to write block map page %llu", info->pbn);
#endif
  }

  setInfoState(info, PS_DIRTY);
  relaxedAdd64(&cache->stats.failedWrites, 1);
  setPersistentError(cache, "cannot write page", result);

  // Only a write which was not a discard triggers a check for whether
  // another discard is needed.
  if (!writeHasFinished(info)) {
    discardPageIfNeeded(cache);
  }

  checkForDrainComplete(cache->zone);
}

/**
 * VIO callback used when a page has been written out.
 *
 * If the cache has a write hook and it asks for a rewrite, the page is
 * written again and nothing else happens yet. Otherwise the page becomes
 * resident, is handed to any completions waiting on it, and is either kept
 * (reclaimed) or given away to a completion waiting for a free page.
 *
 * @param completion    A completion for the VIO, the parent of which
 *                      is embedded in PageInfo.
 **/
static void pageIsWrittenOut(VDOCompletion *completion)
{
  PageInfo     *info  = completion->parent;
  VDOPageCache *cache = info->cache;

  if (cache->writeHook != NULL) {
    bool rewrite = cache->writeHook(getPageBuffer(info), cache->zone,
                                    info->context);
    if (rewrite) {
      // Write the same page again, returning to this callback when done.
      launchWriteMetadataVIOWithFlush(info->vio, info->pbn, pageIsWrittenOut,
                                      handlePageWriteError, true, false);
      return;
    }
  }

  // The page is reclaimed (kept) unless it was written as a discard and
  // nobody is using or waiting for it.
  bool wasDiscard = writeHasFinished(info);
  bool reclaimed  = (!wasDiscard || (info->busy > 0)
                     || hasWaiters(&info->waiting));

  setInfoState(info, PS_RESIDENT);

  uint32_t reclamations = distributePageOverQueue(info, &info->waiting);
  relaxedAdd64(&cache->stats.reclaimed, reclamations);

  if (wasDiscard) {
    cache->discardCount--;
  }

  if (reclaimed) {
    discardPageIfNeeded(cache);
  } else {
    allocateFreePage(info);
  }

  checkForDrainComplete(cache->zone);
}

/**
 * Write the batch of pages which were covered by the layer flush which just
 * completed. This callback is registered in savePages().
 *
 * Each page of the batch is taken off the outgoing list and written; if the
 * VDO is read-only, the page's completion is instead finished immediately
 * with VDO_READ_ONLY. If more pages were scheduled while this batch was
 * flushing, another save is started afterwards.
 *
 * @param flushCompletion  The flush VIO
 **/
static void writePages(VDOCompletion *flushCompletion)
{
  VDOPageCache *cache = ((PageInfo *) flushCompletion->parent)->cache;

  /*
   * We need to cache these two values on the stack since in the error case
   * below, it is possible for the last page info to cause the page cache to
   * get freed. Hence once we launch the last page, it may be unsafe to
   * dereference the cache [VDO-4724].
   */
  bool      hasUnflushedPages = (cache->pagesToFlush > 0);
  PageCount pagesInFlush      = cache->pagesInFlush;
  cache->pagesInFlush         = 0;
  while (pagesInFlush-- > 0) {
    PageInfo *info = pageInfoFromListNode(chopRingNode(&cache->outgoingList));
    if (isReadOnly(info->cache->zone->readOnlyNotifier)) {
      // Don't issue the write; fail the page's completion through the normal
      // write error path instead.
      VDOCompletion *completion = &info->vio->completion;
      resetCompletion(completion);
      completion->callback     = pageIsWrittenOut;
      completion->errorHandler = handlePageWriteError;
      finishCompletion(completion, VDO_READ_ONLY);
      continue;
    }
    relaxedAdd64(&info->cache->stats.pagesSaved, 1);
    launchWriteMetadataVIO(info->vio, info->pbn, pageIsWrittenOut,
                           handlePageWriteError);
  }

  if (hasUnflushedPages) {
    // If there are unflushed pages, the cache can't have been freed, so this
    // call is safe.
    savePages(cache);
  }
}

/**********************************************************************/
void releaseVDOPageCompletion(VDOCompletion *completion)
{
  // Releasing nothing is a no-op.
  if (completion == NULL) {
    return;
  }

  // Set only when this release drops the page's busy count to zero.
  PageInfo *discardInfo = NULL;
  VDOPageCompletion *pageCompletion;
  if (completion->result == VDO_SUCCESS) {
    pageCompletion = validateCompletedPage(completion, false);
    if (pageCompletion == NULL) {
      // Validation failed. The other callers of validateCompletedPage() in
      // this file treat a NULL return as "do nothing"; do the same here
      // rather than dereferencing a NULL pointer.
      return;
    }

    if (--pageCompletion->info->busy == 0) {
      discardInfo = pageCompletion->info;
    }
  } else {
    // Do not check for errors if the completion was not successful.
    pageCompletion = asVDOPageCompletion(completion);
  }
  ASSERT_LOG_ONLY((pageCompletion->waiter.nextWaiter == NULL),
                  "Page being released after leaving all queues");

  // Fetch the cache before the completion is wiped below.
  VDOPageCache *cache = pageCompletion->cache;
  assertOnCacheThread(cache, __func__);
  memset(pageCompletion, 0, sizeof(VDOPageCompletion));

  if (discardInfo != NULL) {
    // The page is no longer busy; a write that was deferred on it can now
    // be launched.
    if (discardInfo->writeStatus == WRITE_STATUS_DEFERRED) {
      discardInfo->writeStatus = WRITE_STATUS_NORMAL;
      launchPageSave(discardInfo);
    }
    // if there are excess requests for pages (that have not already started
    // discards) we need to discard some page (which may be this one)
    discardPageIfNeeded(cache);
  }
}

/**
 * Queue a VDO page completion on a page info and start loading the
 * requested page into it.
 *
 * If the completion cannot be queued, it is finished with the error. If the
 * load cannot be launched, the error is distributed to everything waiting
 * on the page info.
 *
 * @param info          the page info representing where to load the page
 * @param vdoPageComp   the VDO Page Completion describing the page
 **/
static void loadPageForCompletion(PageInfo          *info,
                                  VDOPageCompletion *vdoPageComp)
{
  int status = enqueueWaiter(&info->waiting, &vdoPageComp->waiter);
  if (status == VDO_SUCCESS) {
    // The requester is queued; kick off the read of its page.
    status = launchPageLoad(info, vdoPageComp->pbn);
    if (status != VDO_SUCCESS) {
      distributeErrorOverQueue(status, &info->waiting);
    }
    return;
  }

  // Could not even queue the requester, so fail it directly.
  finishCompletion(&vdoPageComp->completion, status);
}

/**********************************************************************/
void getVDOPageAsync(VDOCompletion *completion)
{
  VDOPageCompletion *request = asVDOPageCompletion(completion);
  VDOPageCache      *cache   = request->cache;
  assertOnCacheThread(cache, __func__);

  // A writable page may not be handed out once the VDO is read-only.
  if (request->writable && isReadOnly(cache->zone->readOnlyNotifier)) {
    finishCompletion(completion, VDO_READ_ONLY);
    return;
  }

  // Account for the request by kind.
  if (!request->writable) {
    relaxedAdd64(&cache->stats.readCount, 1);
  } else {
    relaxedAdd64(&cache->stats.writeCount, 1);
  }

  PageInfo *cached = vpcFindPage(cache, request->pbn);
  if (cached != NULL) {
    // The page is in the cache already. It can't be used yet if its write
    // is deferred, it is still incoming, or it is outgoing and the
    // requester wants to modify it.
    bool mustWait = ((cached->writeStatus == WRITE_STATUS_DEFERRED)
                     || isIncoming(cached)
                     || (isOutgoing(cached) && request->writable));
    if (mustWait) {
      // The page is unusable until it has finished I/O.
      relaxedAdd64(&cache->stats.waitForPage, 1);
      int result = enqueueWaiter(&cached->waiting, &request->waiter);
      if (result != VDO_SUCCESS) {
        finishCompletion(&request->completion, result);
      }

      return;
    }

    if (!isValid(cached)) {
      // Something horrible has gone wrong. This is only logged; the request
      // then falls through and is treated as a cache miss.
      ASSERT_LOG_ONLY(false, "Info found in a usable state.");
    } else {
      // The page is usable.
      relaxedAdd64(&cache->stats.foundInCache, 1);
      if (!isPresent(cached)) {
        relaxedAdd64(&cache->stats.readOutgoing, 1);
      }
      updateLru(cached);
      ++cached->busy;
      completeWithPage(cached, request);
      return;
    }
  }

  // The page must be fetched.
  PageInfo *freeInfo = findFreePage(cache);
  if (freeInfo == NULL) {
    // The page must wait for a page to be discarded.
    relaxedAdd64(&cache->stats.discardRequired, 1);
    discardPageForCompletion(request);
    return;
  }

  relaxedAdd64(&cache->stats.fetchRequired, 1);
  loadPageForCompletion(freeInfo, request);
}

/**********************************************************************/
void markCompletedVDOPageDirty(VDOCompletion  *completion,
                               SequenceNumber  oldDirtyPeriod,
                               SequenceNumber  newDirtyPeriod)
{
  // Nothing is changed unless the completion validates.
  VDOPageCompletion *validated = validateCompletedPage(completion, true);
  if (validated != NULL) {
    PageInfo *page = validated->info;

    // Mark the page dirty, then file it in the cache's dirty lists under
    // the new period.
    setInfoState(page, PS_DIRTY);
    addToDirtyLists(page->cache->dirtyLists, &page->listNode, oldDirtyPeriod,
                    newDirtyPeriod);
  }
}

/**********************************************************************/
void requestVDOPageWrite(VDOCompletion *completion)
{
  // Nothing is done unless the completion validates.
  VDOPageCompletion *validated = validateCompletedPage(completion, true);
  if (validated != NULL) {
    PageInfo *page = validated->info;

    // Mark the page dirty and start saving it right away.
    setInfoState(page, PS_DIRTY);
    launchPageSave(page);
  }
}

/**
 * Get the cache buffer for the page referred to by a page completion.
 *
 * @param completion  The page completion, which may be NULL
 *
 * @return The buffer of the completion's page info, or NULL if the
 *         completion is NULL
 **/
static void *dereferencePageCompletion(VDOPageCompletion  *completion)
{
  if (completion == NULL) {
    return NULL;
  }

  return getPageBuffer(completion->info);
}

/**********************************************************************/
const void *dereferenceReadableVDOPage(VDOCompletion *completion)
{
  // Validate first; a failed validation yields NULL, which passes through
  // dereferencePageCompletion() unchanged.
  VDOPageCompletion *validated = validateCompletedPage(completion, false);
  return dereferencePageCompletion(validated);
}

/**********************************************************************/
void *dereferenceWritableVDOPage(VDOCompletion *completion)
{
  // Validate first; a failed validation yields NULL, which passes through
  // dereferencePageCompletion() unchanged.
  VDOPageCompletion *validated = validateCompletedPage(completion, true);
  return dereferencePageCompletion(validated);
}

/**********************************************************************/
void *getVDOPageCompletionContext(VDOCompletion *completion)
{
  VDOPageCompletion *pageCompletion = asVDOPageCompletion(completion);
  if (pageCompletion == NULL) {
    return NULL;
  }

  // A context is only reported for a page info which exists and is valid.
  PageInfo *page = pageCompletion->info;
  if ((page == NULL) || !isValid(page)) {
    return NULL;
  }

  return page->context;
}

/**********************************************************************/
void drainVDOPageCache(VDOPageCache *cache)
{
  assertOnCacheThread(cache, __func__);
  ASSERT_LOG_ONLY(isDraining(&cache->zone->state),
                  "drainVDOPageCache() called during block map drain");

  // When the zone is suspending, no pages are flushed or saved.
  if (isSuspending(&cache->zone->state)) {
    return;
  }

  // Otherwise flush the dirty lists and start saving pages.
  flushDirtyLists(cache->dirtyLists);
  savePages(cache);
}

/**********************************************************************/
int invalidateVDOPageCache(VDOPageCache *cache)
{
  assertOnCacheThread(cache, __func__);

  // Refuse to invalidate while any page is dirty, so that no dirty page
  // is thrown away.
  PageInfo *info = cache->infos;
  PageInfo *end  = cache->infos + cache->pageCount;
  while (info < end) {
    int result = ASSERT(!isDirty(info), "cache must have no dirty pages");
    if (result != VDO_SUCCESS) {
      return result;
    }
    info++;
  }

  // Drop the old page map and build a fresh one; the outcome of building
  // it is the function's result.
  freeIntMap(&cache->pageMap);
  return makeIntMap(cache->pageCount, 0, &cache->pageMap);
}