Blame source/uds/request.c

Packit b55c50
/*
Packit b55c50
 * Copyright (c) 2020 Red Hat, Inc.
Packit b55c50
 *
Packit b55c50
 * This program is free software; you can redistribute it and/or
Packit b55c50
 * modify it under the terms of the GNU General Public License
Packit b55c50
 * as published by the Free Software Foundation; either version 2
Packit b55c50
 * of the License, or (at your option) any later version.
Packit b55c50
 * 
Packit b55c50
 * This program is distributed in the hope that it will be useful,
Packit b55c50
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit b55c50
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit b55c50
 * GNU General Public License for more details.
Packit b55c50
 * 
Packit b55c50
 * You should have received a copy of the GNU General Public License
Packit b55c50
 * along with this program; if not, write to the Free Software
Packit b55c50
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit b55c50
 * 02110-1301, USA. 
Packit b55c50
 *
Packit b55c50
 * $Id: //eng/uds-releases/jasper/src/uds/request.c#6 $
Packit b55c50
 */
Packit b55c50
Packit b55c50
#include "request.h"
Packit b55c50
Packit b55c50
#include "indexRouter.h"
Packit b55c50
#include "indexSession.h"
Packit b55c50
#include "logger.h"
Packit b55c50
#include "memoryAlloc.h"
Packit b55c50
#include "permassert.h"
Packit b55c50
#include "requestQueue.h"
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
int udsStartChunkOperation(UdsRequest *udsRequest)
Packit b55c50
{
Packit b55c50
  if (udsRequest->callback == NULL) {
Packit b55c50
    return UDS_CALLBACK_REQUIRED;
Packit b55c50
  }
Packit b55c50
  switch (udsRequest->type) {
Packit b55c50
  case UDS_DELETE:
Packit b55c50
  case UDS_POST:
Packit b55c50
  case UDS_QUERY:
Packit b55c50
  case UDS_UPDATE:
Packit b55c50
    break;
Packit b55c50
  default:
Packit b55c50
    return UDS_INVALID_OPERATION_TYPE;
Packit b55c50
  }
Packit b55c50
  memset(udsRequest->private, 0, sizeof(udsRequest->private));
Packit b55c50
  Request *request = (Request *)udsRequest;
Packit b55c50
Packit b55c50
  int result = getIndexSession(request->session);
Packit b55c50
  if (result != UDS_SUCCESS) {
Packit b55c50
    return sansUnrecoverable(result);
Packit b55c50
  }
Packit b55c50
Packit b55c50
  request->found            = false;
Packit b55c50
  request->action           = (RequestAction) request->type;
Packit b55c50
  request->isControlMessage = false;
Packit b55c50
  request->unbatched        = false;
Packit b55c50
  request->router           = request->session->router;
Packit b55c50
Packit b55c50
  enqueueRequest(request, STAGE_TRIAGE);
Packit b55c50
  return UDS_SUCCESS;
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
int launchZoneControlMessage(RequestAction  action,
Packit b55c50
                             ZoneMessage    message,
Packit b55c50
                             unsigned int   zone,
Packit b55c50
                             IndexRouter   *router)
Packit b55c50
{
Packit b55c50
  Request *request;
Packit b55c50
  int result = ALLOCATE(1, Request, __func__, &request);
Packit b55c50
  if (result != UDS_SUCCESS) {
Packit b55c50
    return result;
Packit b55c50
  }
Packit b55c50
Packit b55c50
  request->router           = router;
Packit b55c50
  request->isControlMessage = true;
Packit b55c50
  request->unbatched        = true;
Packit b55c50
  request->action           = action;
Packit b55c50
  request->zoneNumber       = zone;
Packit b55c50
  request->zoneMessage      = message;
Packit b55c50
Packit b55c50
  enqueueRequest(request, STAGE_INDEX);
Packit b55c50
  return UDS_SUCCESS;
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void freeRequest(Request *request)
Packit b55c50
{
Packit b55c50
  if (request != NULL) {
Packit b55c50
    FREE(request);
Packit b55c50
  }
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
static RequestQueue *getNextStageQueue(Request      *request,
Packit b55c50
                                       RequestStage  nextStage)
Packit b55c50
{
Packit b55c50
  if (nextStage == STAGE_CALLBACK) {
Packit b55c50
    return request->session->callbackQueue;
Packit b55c50
  }
Packit b55c50
Packit b55c50
  // Local and remote index routers handle the rest of the pipeline
Packit b55c50
  // differently, so delegate the choice of queue to the router.
Packit b55c50
  return selectIndexRouterQueue(request->router, request, nextStage);
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
static void handleRequestErrors(Request *request)
Packit b55c50
{
Packit b55c50
  // XXX Use the router's callback function to hand back the error
Packit b55c50
  // and clean up the request? (Possible thread issues doing that.)
Packit b55c50
Packit b55c50
  freeRequest(request);
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void enqueueRequest(Request *request, RequestStage nextStage)
Packit b55c50
{
Packit b55c50
  RequestQueue *nextQueue = getNextStageQueue(request, nextStage);
Packit b55c50
  if (nextQueue == NULL) {
Packit b55c50
    handleRequestErrors(request);
Packit b55c50
    return;
Packit b55c50
  }
Packit b55c50
Packit b55c50
  requestQueueEnqueue(nextQueue, request);
Packit b55c50
}
Packit b55c50
Packit b55c50
/*
Packit b55c50
 * This function pointer allows unit test code to intercept the slow-lane
Packit b55c50
 * requeuing of a request.
Packit b55c50
 */
Packit b55c50
static RequestRestarter requestRestarter = NULL;
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void restartRequest(Request *request)
Packit b55c50
{
Packit b55c50
  request->requeued = true;
Packit b55c50
  if (requestRestarter == NULL) {
Packit b55c50
    enqueueRequest(request, STAGE_INDEX);
Packit b55c50
  } else {
Packit b55c50
    requestRestarter(request);
Packit b55c50
  }
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void setRequestRestarter(RequestRestarter restarter)
Packit b55c50
{
Packit b55c50
  requestRestarter = restarter;
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
static INLINE void increment_once(uint64_t *countPtr)
Packit b55c50
{
Packit b55c50
  WRITE_ONCE(*countPtr, READ_ONCE(*countPtr) + 1);
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void updateRequestContextStats(Request *request)
Packit b55c50
{
Packit b55c50
  /*
Packit b55c50
   * We don't need any synchronization since the context stats are only
Packit b55c50
   *  modified from the single callback thread.
Packit b55c50
   *
Packit b55c50
   * We increment either 2 or 3 counters in this method.
Packit b55c50
   *
Packit b55c50
   * XXX We always increment the "requests" counter.  But there is no code
Packit b55c50
   *     that uses the value stored in this counter.
Packit b55c50
   *
Packit b55c50
   * We always increment exactly one of these counters (unless there is an
Packit b55c50
   * error in the code, which never happens):
Packit b55c50
   *     postsFound      postsNotFound
Packit b55c50
   *     updatesFound    updatesNotFound
Packit b55c50
   *     deletionsFound  deletionsNotFound
Packit b55c50
   *     queriesFound    queriesNotFound
Packit b55c50
   *
Packit b55c50
   * XXX In the case of post request that were found in the index, we increment
Packit b55c50
   *     exactly one of these counters.  But there is no code that uses the
Packit b55c50
   *     value stored in these counters.
Packit b55c50
   *          inMemoryPostsFound
Packit b55c50
   *          densePostsFound
Packit b55c50
   *          sparsePostsFound
Packit b55c50
   */
Packit b55c50
Packit b55c50
  SessionStats *sessionStats = &request->session->stats;
Packit b55c50
Packit b55c50
  increment_once(&sessionStats->requests);
Packit b55c50
  bool found = (request->location != LOC_UNAVAILABLE);
Packit b55c50
Packit b55c50
  switch (request->action) {
Packit b55c50
  case REQUEST_INDEX:
Packit b55c50
    if (found) {
Packit b55c50
      increment_once(&sessionStats->postsFound);
Packit b55c50
Packit b55c50
      if (request->location == LOC_IN_OPEN_CHAPTER) {
Packit b55c50
        increment_once(&sessionStats->postsFoundOpenChapter);
Packit b55c50
      } else if (request->location == LOC_IN_DENSE) {
Packit b55c50
        increment_once(&sessionStats->postsFoundDense);
Packit b55c50
      } else if (request->location == LOC_IN_SPARSE) {
Packit b55c50
        increment_once(&sessionStats->postsFoundSparse);
Packit b55c50
      }
Packit b55c50
    } else {
Packit b55c50
      increment_once(&sessionStats->postsNotFound);
Packit b55c50
    }
Packit b55c50
    break;
Packit b55c50
Packit b55c50
  case REQUEST_UPDATE:
Packit b55c50
    if (found) {
Packit b55c50
      increment_once(&sessionStats->updatesFound);
Packit b55c50
    } else {
Packit b55c50
      increment_once(&sessionStats->updatesNotFound);
Packit b55c50
    }
Packit b55c50
    break;
Packit b55c50
Packit b55c50
  case REQUEST_DELETE:
Packit b55c50
    if (found) {
Packit b55c50
      increment_once(&sessionStats->deletionsFound);
Packit b55c50
    } else {
Packit b55c50
      increment_once(&sessionStats->deletionsNotFound);
Packit b55c50
    }
Packit b55c50
    break;
Packit b55c50
Packit b55c50
  case REQUEST_QUERY:
Packit b55c50
    if (found) {
Packit b55c50
      increment_once(&sessionStats->queriesFound);
Packit b55c50
    } else {
Packit b55c50
      increment_once(&sessionStats->queriesNotFound);
Packit b55c50
    }
Packit b55c50
    break;
Packit b55c50
Packit b55c50
  default:
Packit b55c50
    request->status = ASSERT(false, "unknown next action in request: %d",
Packit b55c50
                             request->action);
Packit b55c50
  }
Packit b55c50
}
Packit b55c50
Packit b55c50
/**********************************************************************/
Packit b55c50
void enterCallbackStage(Request *request)
Packit b55c50
{
Packit b55c50
  if (!request->isControlMessage) {
Packit b55c50
    if (isUnrecoverable(request->status)) {
Packit b55c50
      // Unrecoverable errors must disable the index session
Packit b55c50
      disableIndexSession(request->session);
Packit b55c50
      // The unrecoverable state is internal and must not sent to the client.
Packit b55c50
      request->status = sansUnrecoverable(request->status);
Packit b55c50
    }
Packit b55c50
Packit b55c50
    // Handle asynchronous client callbacks in the designated thread.
Packit b55c50
    enqueueRequest(request, STAGE_CALLBACK);
Packit b55c50
  } else {
Packit b55c50
    /*
Packit b55c50
     * Asynchronous control messages are complete when they are executed.
Packit b55c50
     * There should be nothing they need to do on the callback thread. The
Packit b55c50
     * message has been completely processed, so just free it.
Packit b55c50
     */
Packit b55c50
    freeRequest(request);
Packit b55c50
  }
Packit b55c50
}