Blame uds/indexRouter.c

Packit Service 310c69
/*
Packit Service 310c69
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 310c69
 *
Packit Service 310c69
 * This program is free software; you can redistribute it and/or
Packit Service 310c69
 * modify it under the terms of the GNU General Public License
Packit Service 310c69
 * as published by the Free Software Foundation; either version 2
Packit Service 310c69
 * of the License, or (at your option) any later version.
Packit Service 310c69
 * 
Packit Service 310c69
 * This program is distributed in the hope that it will be useful,
Packit Service 310c69
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 310c69
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 310c69
 * GNU General Public License for more details.
Packit Service 310c69
 * 
Packit Service 310c69
 * You should have received a copy of the GNU General Public License
Packit Service 310c69
 * along with this program; if not, write to the Free Software
Packit Service 310c69
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 310c69
 * 02110-1301, USA. 
Packit Service 310c69
 *
Packit Service 310c69
 * $Id: //eng/uds-releases/jasper/src/uds/indexRouter.c#7 $
Packit Service 310c69
 */
Packit Service 310c69
Packit Service 310c69
#include "indexRouter.h"
Packit Service 310c69
Packit Service 310c69
#include "compiler.h"
Packit Service 310c69
#include "indexCheckpoint.h"
Packit Service 310c69
#include "logger.h"
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
#include "requestQueue.h"
Packit Service 310c69
#include "zone.h"
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This is the request processing function invoked by the zone's RequestQueue
Packit Service 310c69
 * worker thread.
Packit Service 310c69
 *
Packit Service 310c69
 * @param request  the request to be indexed or executed by the zone worker
Packit Service 310c69
 **/
Packit Service 310c69
static void executeZoneRequest(Request *request)
Packit Service 310c69
{
Packit Service 310c69
  executeIndexRouterRequest(request->router, request);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Construct and enqueue asynchronous control messages to add the chapter
Packit Service 310c69
 * index for a given virtual chapter to the sparse chapter index cache.
Packit Service 310c69
 *
Packit Service 310c69
 * @param router          the router containing the relevant queues
Packit Service 310c69
 * @param index           the index with the relevant cache and chapter
Packit Service 310c69
 * @param virtualChapter  the virtual chapter number of the chapter to cache
Packit Service 310c69
 **/
Packit Service 310c69
static void enqueueBarrierMessages(IndexRouter *router,
Packit Service 310c69
                                   Index       *index,
Packit Service 310c69
                                   uint64_t     virtualChapter)
Packit Service 310c69
{
Packit Service 310c69
  ZoneMessage barrier = {
Packit Service 310c69
    .index = index,
Packit Service 310c69
    .data = {
Packit Service 310c69
      .barrier = {
Packit Service 310c69
        .virtualChapter = virtualChapter,
Packit Service 310c69
      }
Packit Service 310c69
    }
Packit Service 310c69
  };
Packit Service 310c69
  unsigned int zone;
Packit Service 310c69
  for (zone = 0; zone < router->zoneCount; zone++) {
Packit Service 310c69
    int result = launchZoneControlMessage(REQUEST_SPARSE_CACHE_BARRIER,
Packit Service 310c69
                                          barrier, zone, router);
Packit Service 310c69
    ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This is the request processing function for the triage stage queue. Each
Packit Service 310c69
 * request is resolved in the master index, determining if it is a hook or
Packit Service 310c69
 * not, and if a hook, what virtual chapter (if any) it might be found in. If
Packit Service 310c69
 * a virtual chapter is found, this enqueues a sparse chapter cache barrier in
Packit Service 310c69
 * every zone before enqueueing the request in its zone.
Packit Service 310c69
 *
Packit Service 310c69
 * @param request  the request to triage
Packit Service 310c69
 **/
Packit Service 310c69
static void triageRequest(Request *request)
Packit Service 310c69
{
Packit Service 310c69
  IndexRouter *router = request->router;
Packit Service 310c69
  Index *index = router->index;
Packit Service 310c69
Packit Service 310c69
  // Check if the name is a hook in the index pointing at a sparse chapter.
Packit Service 310c69
  uint64_t sparseVirtualChapter = triageIndexRequest(index, request);
Packit Service 310c69
  if (sparseVirtualChapter != UINT64_MAX) {
Packit Service 310c69
    // Generate and place a barrier request on every zone queue.
Packit Service 310c69
    enqueueBarrierMessages(router, index, sparseVirtualChapter);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  enqueueRequest(request, STAGE_INDEX);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Initialize the zone queues and the triage queue.
Packit Service 310c69
 *
Packit Service 310c69
 * @param router    the router containing the queues
Packit Service 310c69
 * @param geometry  the geometry governing the indexes
Packit Service 310c69
 *
Packit Service 310c69
 * @return  UDS_SUCCESS or error code
Packit Service 310c69
 **/
Packit Service 310c69
static int initializeLocalIndexQueues(IndexRouter    *router,
Packit Service 310c69
                                      const Geometry *geometry)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int i;
Packit Service 310c69
  for (i = 0; i < router->zoneCount; i++) {
Packit Service 310c69
    int result = makeRequestQueue("indexW", &executeZoneRequest,
Packit Service 310c69
                                  &router->zoneQueues[i]);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  // The triage queue is only needed for sparse multi-zone indexes.
Packit Service 310c69
  if ((router->zoneCount > 1) && isSparse(geometry)) {
Packit Service 310c69
    int result = makeRequestQueue("triageW", &triageRequest,
Packit Service 310c69
                                  &router->triageQueue);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      return result;
Packit Service 310c69
    }
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static INLINE RequestQueue *getZoneQueue(IndexRouter  *router,
Packit Service 310c69
                                         unsigned int  zoneNumber)
Packit Service 310c69
{
Packit Service 310c69
  return router->zoneQueues[zoneNumber];
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makeIndexRouter(IndexLayout                  *layout,
Packit Service 310c69
                    const Configuration          *config,
Packit Service 310c69
                    const struct uds_parameters  *userParams,
Packit Service 310c69
                    LoadType                      loadType,
Packit Service 310c69
                    IndexLoadContext             *loadContext,
Packit Service 310c69
                    IndexRouterCallback           callback,
Packit Service 310c69
                    IndexRouter                 **routerPtr)
Packit Service 310c69
{
Packit Service 310c69
  unsigned int zoneCount = getZoneCount(userParams);
Packit Service 310c69
  IndexRouter *router;
Packit Service 310c69
  int result = ALLOCATE_EXTENDED(IndexRouter, zoneCount, RequestQueue *,
Packit Service 310c69
                                 "index router", &router);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  router->callback  = callback;
Packit Service 310c69
  router->zoneCount = zoneCount;
Packit Service 310c69
Packit Service 310c69
  result = initializeLocalIndexQueues(router, config->geometry);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeIndexRouter(router);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  result = makeIndex(layout, config, userParams, router->zoneCount, loadType,
Packit Service 310c69
                     loadContext, &router->index);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    freeIndexRouter(router);
Packit Service 310c69
    return logErrorWithStringError(result, "failed to create index");
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  router->needToSave = (router->index->loadedType != LOAD_LOAD);
Packit Service 310c69
  *routerPtr = router;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int saveIndexRouter(IndexRouter *router)
Packit Service 310c69
{
Packit Service 310c69
  if (!router->needToSave) {
Packit Service 310c69
    return UDS_SUCCESS;
Packit Service 310c69
  }
Packit Service 310c69
  int result = saveIndex(router->index);
Packit Service 310c69
  router->needToSave = (result != UDS_SUCCESS);
Packit Service 310c69
  return result;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void freeIndexRouter(IndexRouter *router)
Packit Service 310c69
{
Packit Service 310c69
  if (router == NULL) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
  requestQueueFinish(router->triageQueue);
Packit Service 310c69
  unsigned int i;
Packit Service 310c69
  for (i = 0; i < router->zoneCount; i++) {
Packit Service 310c69
    requestQueueFinish(router->zoneQueues[i]);
Packit Service 310c69
  }
Packit Service 310c69
  freeIndex(router->index);
Packit Service 310c69
  FREE(router);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
RequestQueue *selectIndexRouterQueue(IndexRouter  *router,
Packit Service 310c69
                                     Request      *request,
Packit Service 310c69
                                     RequestStage  nextStage)
Packit Service 310c69
{
Packit Service 310c69
  if (request->isControlMessage) {
Packit Service 310c69
    return getZoneQueue(router, request->zoneNumber);
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  if (nextStage == STAGE_TRIAGE) {
Packit Service 310c69
    // The triage queue is only needed for multi-zone sparse indexes and won't
Packit Service 310c69
    // be allocated by the router if not needed, so simply check for NULL.
Packit Service 310c69
    if (router->triageQueue != NULL) {
Packit Service 310c69
      return router->triageQueue;
Packit Service 310c69
    }
Packit Service 310c69
    // Dense index or single zone, so route it directly to the zone queue.
Packit Service 310c69
  } else if (nextStage != STAGE_INDEX) {
Packit Service 310c69
    ASSERT_LOG_ONLY(false, "invalid index stage: %d", nextStage);
Packit Service 310c69
    return NULL;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  Index *index = router->index;
Packit Service 310c69
  request->zoneNumber = getMasterIndexZone(index->masterIndex,
Packit Service 310c69
                                           &request->chunkName);
Packit Service 310c69
  return getZoneQueue(router, request->zoneNumber);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void executeIndexRouterRequest(IndexRouter *router, Request *request)
Packit Service 310c69
{
Packit Service 310c69
  if (request->isControlMessage) {
Packit Service 310c69
    int result = dispatchIndexZoneControlRequest(request);
Packit Service 310c69
    if (result != UDS_SUCCESS) {
Packit Service 310c69
      logErrorWithStringError(result, "error executing control message: %d",
Packit Service 310c69
                              request->action);
Packit Service 310c69
    }
Packit Service 310c69
    request->status = result;
Packit Service 310c69
    enterCallbackStage(request);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  router->needToSave = true;
Packit Service 310c69
  if (request->requeued && !isSuccessful(request->status)) {
Packit Service 310c69
    request->status = makeUnrecoverable(request->status);
Packit Service 310c69
    router->callback(request);
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  Index *index = router->index;
Packit Service 310c69
  int result = dispatchIndexRequest(index, request);
Packit Service 310c69
  if (result == UDS_QUEUED) {
Packit Service 310c69
    // Take the request off the pipeline.
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  request->status = result;
Packit Service 310c69
  router->callback(request);
Packit Service 310c69
}