Blob Blame History Raw
/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/uds-releases/jasper/src/uds/request.c#6 $
 */

#include "request.h"

#include "indexRouter.h"
#include "indexSession.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "permassert.h"
#include "requestQueue.h"

/**********************************************************************/
int udsStartChunkOperation(UdsRequest *udsRequest)
{
  if (udsRequest->callback == NULL) {
    return UDS_CALLBACK_REQUIRED;
  }
  switch (udsRequest->type) {
  case UDS_DELETE:
  case UDS_POST:
  case UDS_QUERY:
  case UDS_UPDATE:
    break;
  default:
    return UDS_INVALID_OPERATION_TYPE;
  }
  memset(udsRequest->private, 0, sizeof(udsRequest->private));
  Request *request = (Request *)udsRequest;

  int result = getIndexSession(request->session);
  if (result != UDS_SUCCESS) {
    return sansUnrecoverable(result);
  }

  request->found            = false;
  request->action           = (RequestAction) request->type;
  request->isControlMessage = false;
  request->unbatched        = false;
  request->router           = request->session->router;

  enqueueRequest(request, STAGE_TRIAGE);
  return UDS_SUCCESS;
}

/**********************************************************************/
int launchZoneControlMessage(RequestAction  action,
                             ZoneMessage    message,
                             unsigned int   zone,
                             IndexRouter   *router)
{
  Request *request;
  int result = ALLOCATE(1, Request, __func__, &request);
  if (result != UDS_SUCCESS) {
    return result;
  }

  request->router           = router;
  request->isControlMessage = true;
  request->unbatched        = true;
  request->action           = action;
  request->zoneNumber       = zone;
  request->zoneMessage      = message;

  enqueueRequest(request, STAGE_INDEX);
  return UDS_SUCCESS;
}

/**********************************************************************/
void freeRequest(Request *request)
{
  if (request != NULL) {
    FREE(request);
  }
}

/**********************************************************************/
static RequestQueue *getNextStageQueue(Request      *request,
                                       RequestStage  nextStage)
{
  if (nextStage == STAGE_CALLBACK) {
    return request->session->callbackQueue;
  }

  // Local and remote index routers handle the rest of the pipeline
  // differently, so delegate the choice of queue to the router.
  return selectIndexRouterQueue(request->router, request, nextStage);
}

/**********************************************************************/
static void handleRequestErrors(Request *request)
{
  // XXX Use the router's callback function to hand back the error
  // and clean up the request? (Possible thread issues doing that.)

  freeRequest(request);
}

/**********************************************************************/
void enqueueRequest(Request *request, RequestStage nextStage)
{
  RequestQueue *nextQueue = getNextStageQueue(request, nextStage);
  if (nextQueue == NULL) {
    handleRequestErrors(request);
    return;
  }

  requestQueueEnqueue(nextQueue, request);
}

/*
 * This function pointer allows unit test code to intercept the slow-lane
 * requeuing of a request.
 */
static RequestRestarter requestRestarter = NULL;

/**********************************************************************/
void restartRequest(Request *request)
{
  request->requeued = true;
  if (requestRestarter == NULL) {
    enqueueRequest(request, STAGE_INDEX);
  } else {
    requestRestarter(request);
  }
}

/**********************************************************************/
void setRequestRestarter(RequestRestarter restarter)
{
  requestRestarter = restarter;
}

/**********************************************************************/
static INLINE void increment_once(uint64_t *countPtr)
{
  WRITE_ONCE(*countPtr, READ_ONCE(*countPtr) + 1);
}

/**********************************************************************/
void updateRequestContextStats(Request *request)
{
  /*
   * We don't need any synchronization since the context stats are only
   *  modified from the single callback thread.
   *
   * We increment either 2 or 3 counters in this method.
   *
   * XXX We always increment the "requests" counter.  But there is no code
   *     that uses the value stored in this counter.
   *
   * We always increment exactly one of these counters (unless there is an
   * error in the code, which never happens):
   *     postsFound      postsNotFound
   *     updatesFound    updatesNotFound
   *     deletionsFound  deletionsNotFound
   *     queriesFound    queriesNotFound
   *
   * XXX In the case of post request that were found in the index, we increment
   *     exactly one of these counters.  But there is no code that uses the
   *     value stored in these counters.
   *          inMemoryPostsFound
   *          densePostsFound
   *          sparsePostsFound
   */

  SessionStats *sessionStats = &request->session->stats;

  increment_once(&sessionStats->requests);
  bool found = (request->location != LOC_UNAVAILABLE);

  switch (request->action) {
  case REQUEST_INDEX:
    if (found) {
      increment_once(&sessionStats->postsFound);

      if (request->location == LOC_IN_OPEN_CHAPTER) {
        increment_once(&sessionStats->postsFoundOpenChapter);
      } else if (request->location == LOC_IN_DENSE) {
        increment_once(&sessionStats->postsFoundDense);
      } else if (request->location == LOC_IN_SPARSE) {
        increment_once(&sessionStats->postsFoundSparse);
      }
    } else {
      increment_once(&sessionStats->postsNotFound);
    }
    break;

  case REQUEST_UPDATE:
    if (found) {
      increment_once(&sessionStats->updatesFound);
    } else {
      increment_once(&sessionStats->updatesNotFound);
    }
    break;

  case REQUEST_DELETE:
    if (found) {
      increment_once(&sessionStats->deletionsFound);
    } else {
      increment_once(&sessionStats->deletionsNotFound);
    }
    break;

  case REQUEST_QUERY:
    if (found) {
      increment_once(&sessionStats->queriesFound);
    } else {
      increment_once(&sessionStats->queriesNotFound);
    }
    break;

  default:
    request->status = ASSERT(false, "unknown next action in request: %d",
                             request->action);
  }
}

/**********************************************************************/
void enterCallbackStage(Request *request)
{
  if (!request->isControlMessage) {
    if (isUnrecoverable(request->status)) {
      // Unrecoverable errors must disable the index session
      disableIndexSession(request->session);
      // The unrecoverable state is internal and must not sent to the client.
      request->status = sansUnrecoverable(request->status);
    }

    // Handle asynchronous client callbacks in the designated thread.
    enqueueRequest(request, STAGE_CALLBACK);
  } else {
    /*
     * Asynchronous control messages are complete when they are executed.
     * There should be nothing they need to do on the callback thread. The
     * message has been completely processed, so just free it.
     */
    freeRequest(request);
  }
}