Blob Blame History Raw
/*
 * %Copyright%
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/uds-releases/jasper/src/uds/indexSession.c#10 $
 */

#include "indexSession.h"

#include "indexCheckpoint.h"
#include "indexRouter.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "requestQueue.h"

/**********************************************************************/
/**
 * Fill a caller-supplied statistics structure from the session's request
 * counters, stamping it with the current CLOCK_REALTIME time.
 *
 * @param indexSession  The index session whose counters are sampled
 * @param stats         The structure that receives the sampled values
 **/
static void collectStats(const struct uds_index_session *indexSession,
                         UdsContextStats                *stats)
{
  const SessionStats *counters = &indexSession->stats;

  stats->currentTime = asTimeT(currentTime(CLOCK_REALTIME));

  // Post counters: the total found, then the breakdown by where the match
  // was located, then the misses.
  stats->postsFound         = READ_ONCE(counters->postsFound);
  stats->inMemoryPostsFound = READ_ONCE(counters->postsFoundOpenChapter);
  stats->densePostsFound    = READ_ONCE(counters->postsFoundDense);
  stats->sparsePostsFound   = READ_ONCE(counters->postsFoundSparse);
  stats->postsNotFound      = READ_ONCE(counters->postsNotFound);

  // Update counters.
  stats->updatesFound       = READ_ONCE(counters->updatesFound);
  stats->updatesNotFound    = READ_ONCE(counters->updatesNotFound);

  // Deletion counters.
  stats->deletionsFound     = READ_ONCE(counters->deletionsFound);
  stats->deletionsNotFound  = READ_ONCE(counters->deletionsNotFound);

  // Query counters.
  stats->queriesFound       = READ_ONCE(counters->queriesFound);
  stats->queriesNotFound    = READ_ONCE(counters->queriesNotFound);

  // Overall request count.
  stats->requests           = READ_ONCE(counters->requests);
}

/**********************************************************************/
/**
 * Handler for the session's callback request queue: record statistics for
 * a successful request, then either hand the request to its callback or
 * free it.
 *
 * @param request  The completed request
 **/
static void handleCallbacks(Request *request)
{
  if (request->status == UDS_SUCCESS) {
    // Fold this request (including its turnaround time) into the context's
    // statistics counters.
    updateRequestContextStats(request);
  }

  if (request->callback == NULL) {
    // Should not get here, because this is either a control message or it
    // has a callback method.
    freeRequest(request);
    return;
  }

  // The request supplied its own callback and does not expect to be freed.
  // The session pointer is read before the callback runs (presumably because
  // the callback may dispose of or reuse the request -- TODO confirm).
  struct uds_index_session *session = request->session;
  request->found = (request->location != LOC_UNAVAILABLE);
  request->callback((UdsRequest *) request);

  // The release must follow the callback because of the contract of the
  // udsFlushIndexSession method.
  releaseIndexSession(session);
}

/**********************************************************************/
/**
 * Translate the session's current state flags into a status code.
 *
 * @param indexSession  The index session to examine
 *
 * @return UDS_SUCCESS if the state is exactly IS_FLAG_LOADED,
 *         UDS_DISABLED if the disabled flag is set,
 *         UDS_SUSPENDED if the session is loading, suspended, or waiting,
 *         and UDS_NO_INDEXSESSION otherwise
 **/
int checkIndexSession(struct uds_index_session *indexSession)
{
  // Take a snapshot of the flags under the mutex; all decisions below are
  // made on the snapshot.
  lockMutex(&indexSession->requestMutex);
  unsigned int flags = indexSession->state;
  unlockMutex(&indexSession->requestMutex);

  if (flags == IS_FLAG_LOADED) {
    return UDS_SUCCESS;
  }

  if ((flags & IS_FLAG_DISABLED) != 0) {
    return UDS_DISABLED;
  }

  if ((flags & (IS_FLAG_LOADING | IS_FLAG_SUSPENDED | IS_FLAG_WAITING))
      != 0) {
    return UDS_SUSPENDED;
  }

  return UDS_NO_INDEXSESSION;
}

/**********************************************************************/
/**
 * Register a new in-progress request against the session, provided the
 * session is in a usable state.
 *
 * @param indexSession  The index session to acquire
 *
 * @return UDS_SUCCESS if the request was counted, otherwise the status
 *         reported by checkIndexSession() (the count is then restored)
 **/
int getIndexSession(struct uds_index_session *indexSession)
{
  // Count the request first, then validate the session state.
  lockMutex(&indexSession->requestMutex);
  indexSession->requestCount++;
  unlockMutex(&indexSession->requestMutex);

  int status = checkIndexSession(indexSession);
  if (status != UDS_SUCCESS) {
    // The session is not usable; undo the count taken above.
    releaseIndexSession(indexSession);
  }
  return status;
}

/**********************************************************************/
/**
 * Drop one in-progress request from the session's count, waking all waiters
 * on the request condition when the count reaches zero.
 *
 * @param indexSession  The index session to release
 **/
void releaseIndexSession(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);
  indexSession->requestCount--;
  if (indexSession->requestCount == 0) {
    broadcastCond(&indexSession->requestCond);
  }
  unlockMutex(&indexSession->requestMutex);
}

/**********************************************************************/
/**
 * Mark the session as loading an index, if it is currently idle.
 *
 * @param indexSession  The index session about to load an index
 *
 * @return UDS_SUCCESS if the loading flag was set,
 *         UDS_SUSPENDED if the session is suspended, or
 *         UDS_INDEXSESSION_IN_USE if any other state flag is already set
 **/
int startLoadingIndexSession(struct uds_index_session *indexSession)
{
  int result = UDS_SUCCESS;

  lockMutex(&indexSession->requestMutex);
  if (indexSession->state & IS_FLAG_SUSPENDED) {
    result = UDS_SUSPENDED;
  } else if (indexSession->state != 0) {
    result = UDS_INDEXSESSION_IN_USE;
  } else {
    // No flags are set, so the session is free to begin loading.
    indexSession->state |= IS_FLAG_LOADING;
  }
  unlockMutex(&indexSession->requestMutex);

  return result;
}

/**********************************************************************/
/**
 * Record the end of an index load: clear the loading flag, set the loaded
 * flag if the load succeeded, and wake everything waiting on the request
 * condition.
 *
 * @param indexSession  The index session that was loading
 * @param result        The result of the load operation
 **/
void finishLoadingIndexSession(struct uds_index_session *indexSession,
                               int                       result)
{
  lockMutex(&indexSession->requestMutex);

  unsigned int flags = indexSession->state & ~IS_FLAG_LOADING;
  if (result == UDS_SUCCESS) {
    flags |= IS_FLAG_LOADED;
  }
  indexSession->state = flags;

  broadcastCond(&indexSession->requestCond);
  unlockMutex(&indexSession->requestMutex);
}

/**********************************************************************/
/**
 * Set the disabled flag on the session. Once set, checkIndexSession()
 * reports UDS_DISABLED unless the state is exactly IS_FLAG_LOADED.
 *
 * @param indexSession  The index session to disable
 **/
void disableIndexSession(struct uds_index_session *indexSession)
{
  // The state flags are only modified while holding the request mutex.
  lockMutex(&indexSession->requestMutex);
  indexSession->state |= IS_FLAG_DISABLED;
  unlockMutex(&indexSession->requestMutex);
}

/**********************************************************************/
/**
 * Allocate an index session and initialize its synchronization objects and
 * callback queue. On any failure, everything initialized so far is torn
 * down in reverse order and the session is freed.
 *
 * @param indexSessionPtr  Where to store the new session on success
 *
 * @return UDS_SUCCESS or the error code of the step that failed
 **/
int makeEmptyIndexSession(struct uds_index_session **indexSessionPtr)
{
  struct uds_index_session *session;
  int result = ALLOCATE(1, struct uds_index_session, __func__, &session);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = initMutex(&session->requestMutex);
  if (result != UDS_SUCCESS) {
    goto freeSession;
  }

  result = initCond(&session->requestCond);
  if (result != UDS_SUCCESS) {
    goto destroyRequestMutex;
  }

  result = initMutex(&session->loadContext.mutex);
  if (result != UDS_SUCCESS) {
    goto destroyRequestCond;
  }

  result = initCond(&session->loadContext.cond);
  if (result != UDS_SUCCESS) {
    goto destroyLoadMutex;
  }

  result = makeRequestQueue("callbackW", &handleCallbacks,
                            &session->callbackQueue);
  if (result != UDS_SUCCESS) {
    goto destroyLoadCond;
  }

  *indexSessionPtr = session;
  return UDS_SUCCESS;

  // Failure unwinding: each label undoes one initialization step and falls
  // through to undo all of the earlier ones.
 destroyLoadCond:
  destroyCond(&session->loadContext.cond);
 destroyLoadMutex:
  destroyMutex(&session->loadContext.mutex);
 destroyRequestCond:
  destroyCond(&session->requestCond);
 destroyRequestMutex:
  destroyMutex(&session->requestMutex);
 freeSession:
  FREE(session);
  return result;
}

/**********************************************************************/
/**
 * Suspend an index session. Depending on the session state this either
 * marks the session suspended immediately, saves the index first, or asks
 * an in-progress index load to pause and waits for it to do so.
 *
 * @param session  The index session to suspend
 * @param save     Whether to save the index before suspending; only
 *                 consulted when the session has a loaded index
 *
 * @return UDS_SUCCESS, EBUSY if another suspend/resume is in progress or
 *         the session is being destroyed, or the result of udsSaveIndex()
 *         when a save was requested
 **/
int udsSuspendIndexSession(struct uds_index_session *session, bool save)
{
  int result;
  // Set when the index must be saved after dropping the request mutex.
  bool saveIndex = false;
  // Set when an in-progress load must be told to suspend.
  bool suspendIndex = false;
  lockMutex(&session->requestMutex);
  // Wait for any pending close operation to complete.
  while (session->state & IS_FLAG_CLOSING) {
    waitCond(&session->requestCond, &session->requestMutex);
  }
  if ((session->state & IS_FLAG_WAITING)
      || (session->state & IS_FLAG_DESTROYING)) {
    // Another suspend/resume is underway, or the session is going away.
    result = EBUSY;
  } else if (session->state & IS_FLAG_SUSPENDED) {
    // Already suspended; nothing to do.
    result = UDS_SUCCESS;
  } else if (session->state & IS_FLAG_LOADING) {
    // A load is running: claim the waiting flag and coordinate with the
    // load context below, outside the request mutex.
    session->state |= IS_FLAG_WAITING;
    suspendIndex = true;
    result = UDS_SUCCESS;
  } else if (!(session->state & IS_FLAG_LOADED)) {
    // No index is loaded or loading, so just mark the session suspended.
    session->state |= IS_FLAG_SUSPENDED;
    broadcastCond(&session->requestCond);
    result = UDS_SUCCESS;
  } else {
    // An index is loaded. Either save it first (marking the session as
    // waiting meanwhile) or suspend immediately.
    saveIndex = save;
    if (saveIndex) {
      session->state |= IS_FLAG_WAITING;
    } else {
      session->state |= IS_FLAG_SUSPENDED;
      broadcastCond(&session->requestCond);
    }
    result = UDS_SUCCESS;
  }
  unlockMutex(&session->requestMutex);

  // Every case that needs no further work has already been fully handled.
  if (!saveIndex && !suspendIndex) {
    return result;
  }

  if (saveIndex) {
    // Save without holding the request mutex, then trade the waiting flag
    // for the suspended flag regardless of the save result.
    result = udsSaveIndex(session);
    lockMutex(&session->requestMutex);
    session->state &= ~IS_FLAG_WAITING;
    session->state |= IS_FLAG_SUSPENDED;
    broadcastCond(&session->requestCond);
    unlockMutex(&session->requestMutex);
    return result;
  }

  // Ask the in-progress load to suspend, using the load context's own lock.
  lockMutex(&session->loadContext.mutex);
  switch (session->loadContext.status) {
  case INDEX_OPENING:
    session->loadContext.status = INDEX_SUSPENDING;

    // Wait until the index indicates that it is not replaying.
    while ((session->loadContext.status != INDEX_SUSPENDED)
           && (session->loadContext.status != INDEX_READY)) {
      waitCond(&session->loadContext.cond,
               &session->loadContext.mutex);
    }
    break;

  case INDEX_READY:
    // Index load does not need to be suspended.
    break;

  case INDEX_SUSPENDED:
  case INDEX_SUSPENDING:
  case INDEX_FREEING:
  default:
    // These cases should not happen.
    ASSERT_LOG_ONLY(false, "Bad load context state %u",
                    session->loadContext.status);
    break;
  }
  unlockMutex(&session->loadContext.mutex);

  // The load is paused (or finished); trade the waiting flag for the
  // suspended flag and wake anyone waiting on the request condition.
  lockMutex(&session->requestMutex);
  session->state &= ~IS_FLAG_WAITING;
  session->state |= IS_FLAG_SUSPENDED;
  broadcastCond(&session->requestCond);
  unlockMutex(&session->requestMutex);
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Resume a suspended index session. If an index load was paused by the
 * suspend, the load is told to continue.
 *
 * @param session  The index session to resume
 *
 * @return UDS_SUCCESS, or EBUSY if another suspend/resume is in progress
 **/
int udsResumeIndexSession(struct uds_index_session *session)
{
  int  result     = UDS_SUCCESS;
  bool resumeLoad = false;

  lockMutex(&session->requestMutex);
  if (session->state & IS_FLAG_WAITING) {
    result = EBUSY;
  } else if (session->state & IS_FLAG_SUSPENDED) {
    if (session->state & IS_FLAG_LOADING) {
      // A paused load must be restarted; claim the waiting flag while the
      // load context is updated outside the request mutex.
      session->state |= IS_FLAG_WAITING;
      resumeLoad = true;
    } else {
      session->state &= ~IS_FLAG_SUSPENDED;
    }
  }
  // A session that is not suspended needs no work; the call just succeeds.
  unlockMutex(&session->requestMutex);

  if (!resumeLoad) {
    return result;
  }

  lockMutex(&session->loadContext.mutex);
  if (session->loadContext.status == INDEX_SUSPENDED) {
    session->loadContext.status = INDEX_OPENING;
    // Notify the index to start replaying again.
    broadcastCond(&session->loadContext.cond);
  } else if (session->loadContext.status != INDEX_READY) {
    // INDEX_READY means there is no index rebuild to resume. Any other
    // status (INDEX_OPENING, INDEX_SUSPENDING, INDEX_FREEING, or an unknown
    // value) should not happen; log it and do nothing.
    ASSERT_LOG_ONLY(false, "Bad load context state %u",
                    session->loadContext.status);
  }
  unlockMutex(&session->loadContext.mutex);

  lockMutex(&session->requestMutex);
  session->state &= ~(IS_FLAG_WAITING | IS_FLAG_SUSPENDED);
  broadcastCond(&session->requestCond);
  unlockMutex(&session->requestMutex);
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Block the caller until the session's in-progress request count is no
 * longer positive.
 *
 * @param indexSession  The index session to wait on
 **/
static void waitForNoRequestsInProgress(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);
  // releaseIndexSession() broadcasts requestCond when the count reaches
  // zero; re-check the count after every wakeup.
  while (indexSession->requestCount > 0) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }
  unlockMutex(&indexSession->requestMutex);
}

/**********************************************************************/
/**
 * Save the session's index (unless the session is suspended), free it, and
 * reset the session's per-index state so a future index starts clean.
 *
 * @param indexSession  The index session whose index is being closed
 *
 * @return UDS_SUCCESS if there was no index or the session was suspended,
 *         otherwise the result of saveIndexRouter()
 **/
int saveAndFreeIndex(struct uds_index_session *indexSession)
{
  IndexRouter *router = indexSession->router;
  if (router == NULL) {
    // There is no index to save or free.
    logDebug("Closed index");
    return UDS_SUCCESS;
  }

  lockMutex(&indexSession->requestMutex);
  bool isSuspended = (indexSession->state & IS_FLAG_SUSPENDED);
  unlockMutex(&indexSession->requestMutex);

  int saveResult = UDS_SUCCESS;
  if (!isSuspended) {
    saveResult = saveIndexRouter(router);
    if (saveResult != UDS_SUCCESS) {
      // A failed save is logged but does not prevent freeing the index.
      logWarningWithStringError(saveResult,
                                "ignoring error from saveIndexRouter");
    }
  }
  freeIndexRouter(router);
  indexSession->router = NULL;

  // Reset all index state that happens to be in the index session, so it
  // doesn't affect any future index.
  lockMutex(&indexSession->loadContext.mutex);
  indexSession->loadContext.status = INDEX_OPENING;
  unlockMutex(&indexSession->loadContext.mutex);

  // Only the suspend bit will remain relevant; every other flag is cleared.
  lockMutex(&indexSession->requestMutex);
  indexSession->state &= IS_FLAG_SUSPENDED;
  unlockMutex(&indexSession->requestMutex);

  logDebug("Closed index");
  return saveResult;
}

/**********************************************************************/
/**
 * Close the index held by a session: wait for outstanding requests, save
 * and free the index, and leave the session itself alive.
 *
 * @param indexSession  The index session whose index should be closed
 *
 * @return UDS_SUSPENDED if the session is suspended,
 *         UDS_NO_INDEXSESSION if there is no loaded index or the session
 *         is being destroyed, otherwise the result of saveAndFreeIndex()
 **/
int udsCloseIndex(struct uds_index_session *indexSession)
{
  int result;

  lockMutex(&indexSession->requestMutex);

  // Wait for any pending suspend, resume or close operations to complete.
  while ((indexSession->state & (IS_FLAG_WAITING | IS_FLAG_CLOSING)) != 0) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }

  if (indexSession->state & IS_FLAG_SUSPENDED) {
    result = UDS_SUSPENDED;
  } else if ((indexSession->state & IS_FLAG_DESTROYING)
             || !(indexSession->state & IS_FLAG_LOADED)) {
    // The index doesn't exist, hasn't finished loading, or is being destroyed.
    result = UDS_NO_INDEXSESSION;
  } else {
    // Claim the close so that concurrent operations wait for it.
    indexSession->state |= IS_FLAG_CLOSING;
    result = UDS_SUCCESS;
  }
  unlockMutex(&indexSession->requestMutex);

  if (result != UDS_SUCCESS) {
    return result;
  }

  logDebug("Closing index");
  waitForNoRequestsInProgress(indexSession);
  result = saveAndFreeIndex(indexSession);

  // Release the close claim and wake anything that was waiting on it.
  lockMutex(&indexSession->requestMutex);
  indexSession->state &= ~IS_FLAG_CLOSING;
  broadcastCond(&indexSession->requestCond);
  unlockMutex(&indexSession->requestMutex);
  return result;
}

/**********************************************************************/
/**
 * Destroy an index session: stop any suspended index load, wait for
 * outstanding requests, save and free the index, then tear down the
 * session's queue and synchronization objects and free the session.
 * The session pointer must not be used after a call that gets past the
 * EBUSY check, because the session is freed.
 *
 * @param indexSession  The index session to destroy
 *
 * @return EBUSY if the session is already being destroyed, otherwise the
 *         result of saveAndFreeIndex()
 **/
int udsDestroyIndexSession(struct uds_index_session *indexSession)
{
  logDebug("Destroying index session");

  bool loadPending = false;
  lockMutex(&indexSession->requestMutex);

  // Wait for any pending suspend, resume, or close operations to complete.
  while ((indexSession->state & IS_FLAG_WAITING)
         || (indexSession->state & IS_FLAG_CLOSING)) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }

  // Only one caller may destroy the session.
  if (indexSession->state & IS_FLAG_DESTROYING) {
    unlockMutex(&indexSession->requestMutex);
    return EBUSY;
  }

  indexSession->state |= IS_FLAG_DESTROYING;
  // A load that is both in progress and suspended must be told to exit.
  loadPending = ((indexSession->state & IS_FLAG_LOADING)
                 && (indexSession->state & IS_FLAG_SUSPENDED));
  unlockMutex(&indexSession->requestMutex);

  if (loadPending) {
    // Tell the index to terminate the rebuild.
    lockMutex(&indexSession->loadContext.mutex);
    if (indexSession->loadContext.status == INDEX_SUSPENDED) {
      indexSession->loadContext.status = INDEX_FREEING;
      broadcastCond(&indexSession->loadContext.cond);
    }
    unlockMutex(&indexSession->loadContext.mutex);

    // Wait until the load exits before proceeding.
    // finishLoadingIndexSession() clears the loading flag and broadcasts.
    lockMutex(&indexSession->requestMutex);
    while (indexSession->state & IS_FLAG_LOADING) {
      waitCond(&indexSession->requestCond, &indexSession->requestMutex);
    }
    unlockMutex(&indexSession->requestMutex);
  }

  waitForNoRequestsInProgress(indexSession);
  int result = saveAndFreeIndex(indexSession);
  // Tear down in the reverse of the order used by makeEmptyIndexSession().
  requestQueueFinish(indexSession->callbackQueue);
  indexSession->callbackQueue = NULL;
  destroyCond(&indexSession->loadContext.cond);
  destroyMutex(&indexSession->loadContext.mutex);
  destroyCond(&indexSession->requestCond);
  destroyMutex(&indexSession->requestMutex);
  logDebug("Destroyed index session");
  FREE(indexSession);
  return result;
}

/**********************************************************************/
int udsFlushIndexSession(struct uds_index_session *indexSession)
{
  waitForNoRequestsInProgress(indexSession);
  // Wait until any open chapter writes are complete
  waitForIdleIndexRouter(indexSession->router);
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Wait for all in-progress requests to finish, then save the session's
 * index.
 *
 * @param indexSession  The index session to save
 *
 * @return the result of saveIndexRouter()
 **/
int udsSaveIndex(struct uds_index_session *indexSession)
{
  waitForNoRequestsInProgress(indexSession);

  // saveIndexRouter waits for open chapter writes to complete.
  int result = saveIndexRouter(indexSession->router);
  return result;
}

/**********************************************************************/
int udsSetCheckpointFrequency(struct uds_index_session *indexSession,
                              unsigned int              frequency)
{
  setIndexCheckpointFrequency(indexSession->router->index->checkpoint,
                              frequency);
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Allocate a configuration structure and fill it with a copy of the
 * session's user configuration.
 *
 * @param indexSession  The index session to query
 * @param conf          Where to store the newly allocated configuration;
 *                      presumably the caller is responsible for releasing
 *                      it (TODO confirm against the public API docs)
 *
 * @return UDS_SUCCESS, UDS_CONF_PTR_REQUIRED if conf is NULL, or an
 *         allocation error
 **/
int udsGetIndexConfiguration(struct uds_index_session *indexSession,
                             UdsConfiguration         *conf)
{
  if (conf == NULL) {
    return logErrorWithStringError(UDS_CONF_PTR_REQUIRED,
                                   "received a NULL config pointer");
  }

  int result = ALLOCATE(1, struct udsConfiguration, __func__, conf);
  if (result != UDS_SUCCESS) {
    return result;
  }

  **conf = indexSession->userConfig;
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Fetch the statistics of the session's index.
 *
 * @param indexSession  The index session to query
 * @param stats         The structure that receives the index statistics
 *
 * @return UDS_SUCCESS, UDS_INDEX_STATS_PTR_REQUIRED if stats is NULL, or
 *         UDS_NO_INDEXSESSION if the session currently has no index
 **/
int udsGetIndexStats(struct uds_index_session *indexSession,
                     UdsIndexStats            *stats)
{
  if (stats == NULL) {
    return logErrorWithStringError(UDS_INDEX_STATS_PTR_REQUIRED,
                                   "received a NULL index stats pointer");
  }

  // The router is NULL once saveAndFreeIndex() has run (e.g. after
  // udsCloseIndex()), so it must be checked before being dereferenced.
  // NOTE(review): the router is read without holding a lock, so this does
  // not protect against a close racing with this call.
  IndexRouter *router = indexSession->router;
  if (router == NULL) {
    return UDS_NO_INDEXSESSION;
  }

  getIndexStats(router->index, stats);
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Fetch the session's request statistics.
 *
 * @param indexSession  The index session to query
 * @param stats         The structure that receives the session statistics
 *
 * @return UDS_SUCCESS, or UDS_CONTEXT_STATS_PTR_REQUIRED if stats is NULL
 **/
int udsGetIndexSessionStats(struct uds_index_session *indexSession,
                            UdsContextStats          *stats)
{
  // Unlike the other NULL-pointer checks in this file, this one is logged
  // as a warning rather than an error.
  if (stats == NULL) {
    return logWarningWithStringError(UDS_CONTEXT_STATS_PTR_REQUIRED,
                                     "received a NULL context stats pointer");
  }
  collectStats(indexSession, stats);
  return UDS_SUCCESS;
}