/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/uds-releases/jasper/src/uds/indexRouter.c#7 $
*/
#include "indexRouter.h"
#include "compiler.h"
#include "indexCheckpoint.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "requestQueue.h"
#include "zone.h"
/**
* This is the request processing function invoked by the zone's RequestQueue
* worker thread.
*
* @param request the request to be indexed or executed by the zone worker
**/
static void executeZoneRequest(Request *request)
{
executeIndexRouterRequest(request->router, request);
}
/**
* Construct and enqueue asynchronous control messages to add the chapter
* index for a given virtual chapter to the sparse chapter index cache.
*
* @param router the router containing the relevant queues
* @param index the index with the relevant cache and chapter
* @param virtualChapter the virtual chapter number of the chapter to cache
**/
static void enqueueBarrierMessages(IndexRouter *router,
Index *index,
uint64_t virtualChapter)
{
ZoneMessage barrier = {
.index = index,
.data = {
.barrier = {
.virtualChapter = virtualChapter,
}
}
};
unsigned int zone;
for (zone = 0; zone < router->zoneCount; zone++) {
int result = launchZoneControlMessage(REQUEST_SPARSE_CACHE_BARRIER,
barrier, zone, router);
ASSERT_LOG_ONLY((result == UDS_SUCCESS), "barrier message allocation");
}
}
/**
* This is the request processing function for the triage stage queue. Each
* request is resolved in the master index, determining if it is a hook or
* not, and if a hook, what virtual chapter (if any) it might be found in. If
* a virtual chapter is found, this enqueues a sparse chapter cache barrier in
* every zone before enqueueing the request in its zone.
*
* @param request the request to triage
**/
static void triageRequest(Request *request)
{
IndexRouter *router = request->router;
Index *index = router->index;
// Check if the name is a hook in the index pointing at a sparse chapter.
uint64_t sparseVirtualChapter = triageIndexRequest(index, request);
if (sparseVirtualChapter != UINT64_MAX) {
// Generate and place a barrier request on every zone queue.
enqueueBarrierMessages(router, index, sparseVirtualChapter);
}
enqueueRequest(request, STAGE_INDEX);
}
/**
* Initialize the zone queues and the triage queue.
*
* @param router the router containing the queues
* @param geometry the geometry governing the indexes
*
* @return UDS_SUCCESS or error code
**/
static int initializeLocalIndexQueues(IndexRouter *router,
const Geometry *geometry)
{
unsigned int i;
for (i = 0; i < router->zoneCount; i++) {
int result = makeRequestQueue("indexW", &executeZoneRequest,
&router->zoneQueues[i]);
if (result != UDS_SUCCESS) {
return result;
}
}
// The triage queue is only needed for sparse multi-zone indexes.
if ((router->zoneCount > 1) && isSparse(geometry)) {
int result = makeRequestQueue("triageW", &triageRequest,
&router->triageQueue);
if (result != UDS_SUCCESS) {
return result;
}
}
return UDS_SUCCESS;
}
/**********************************************************************/
static INLINE RequestQueue *getZoneQueue(IndexRouter *router,
unsigned int zoneNumber)
{
return router->zoneQueues[zoneNumber];
}
/**********************************************************************/
int makeIndexRouter(IndexLayout *layout,
const Configuration *config,
const struct uds_parameters *userParams,
LoadType loadType,
IndexLoadContext *loadContext,
IndexRouterCallback callback,
IndexRouter **routerPtr)
{
unsigned int zoneCount = getZoneCount(userParams);
IndexRouter *router;
int result = ALLOCATE_EXTENDED(IndexRouter, zoneCount, RequestQueue *,
"index router", &router);
if (result != UDS_SUCCESS) {
return result;
}
router->callback = callback;
router->zoneCount = zoneCount;
result = initializeLocalIndexQueues(router, config->geometry);
if (result != UDS_SUCCESS) {
freeIndexRouter(router);
return result;
}
result = makeIndex(layout, config, userParams, router->zoneCount, loadType,
loadContext, &router->index);
if (result != UDS_SUCCESS) {
freeIndexRouter(router);
return logErrorWithStringError(result, "failed to create index");
}
router->needToSave = (router->index->loadedType != LOAD_LOAD);
*routerPtr = router;
return UDS_SUCCESS;
}
/**********************************************************************/
int saveIndexRouter(IndexRouter *router)
{
if (!router->needToSave) {
return UDS_SUCCESS;
}
int result = saveIndex(router->index);
router->needToSave = (result != UDS_SUCCESS);
return result;
}
/**********************************************************************/
void freeIndexRouter(IndexRouter *router)
{
if (router == NULL) {
return;
}
requestQueueFinish(router->triageQueue);
unsigned int i;
for (i = 0; i < router->zoneCount; i++) {
requestQueueFinish(router->zoneQueues[i]);
}
freeIndex(router->index);
FREE(router);
}
/**********************************************************************/
RequestQueue *selectIndexRouterQueue(IndexRouter *router,
Request *request,
RequestStage nextStage)
{
if (request->isControlMessage) {
return getZoneQueue(router, request->zoneNumber);
}
if (nextStage == STAGE_TRIAGE) {
// The triage queue is only needed for multi-zone sparse indexes and won't
// be allocated by the router if not needed, so simply check for NULL.
if (router->triageQueue != NULL) {
return router->triageQueue;
}
// Dense index or single zone, so route it directly to the zone queue.
} else if (nextStage != STAGE_INDEX) {
ASSERT_LOG_ONLY(false, "invalid index stage: %d", nextStage);
return NULL;
}
Index *index = router->index;
request->zoneNumber = getMasterIndexZone(index->masterIndex,
&request->chunkName);
return getZoneQueue(router, request->zoneNumber);
}
/**********************************************************************/
void executeIndexRouterRequest(IndexRouter *router, Request *request)
{
if (request->isControlMessage) {
int result = dispatchIndexZoneControlRequest(request);
if (result != UDS_SUCCESS) {
logErrorWithStringError(result, "error executing control message: %d",
request->action);
}
request->status = result;
enterCallbackStage(request);
return;
}
router->needToSave = true;
if (request->requeued && !isSuccessful(request->status)) {
request->status = makeUnrecoverable(request->status);
router->callback(request);
return;
}
Index *index = router->index;
int result = dispatchIndexRequest(index, request);
if (result == UDS_QUEUED) {
// Take the request off the pipeline.
return;
}
request->status = result;
router->callback(request);
}