/*
* %Copyright%
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/uds-releases/jasper/src/uds/indexSession.c#10 $
*/
#include "indexSession.h"
#include "indexCheckpoint.h"
#include "indexRouter.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "requestQueue.h"
/**********************************************************************/
/**
 * Copy the session's request counters into a caller-supplied stats structure
 * and stamp the result with the current CLOCK_REALTIME time.
 *
 * No lock is taken; each counter is fetched individually with READ_ONCE, so
 * the values are not captured as a single atomic snapshot.
 *
 * @param indexSession  The index session whose counters are read
 * @param stats         The structure that receives the counter values
 **/
static void collectStats(const struct uds_index_session *indexSession,
                         UdsContextStats                *stats)
{
  const SessionStats *counters = &indexSession->stats;

  stats->currentTime = asTimeT(currentTime(CLOCK_REALTIME));

  // Post results, overall and broken down by where the entry was found.
  stats->postsFound         = READ_ONCE(counters->postsFound);
  stats->inMemoryPostsFound = READ_ONCE(counters->postsFoundOpenChapter);
  stats->densePostsFound    = READ_ONCE(counters->postsFoundDense);
  stats->sparsePostsFound   = READ_ONCE(counters->postsFoundSparse);
  stats->postsNotFound      = READ_ONCE(counters->postsNotFound);

  // Update, delete and query results.
  stats->updatesFound       = READ_ONCE(counters->updatesFound);
  stats->updatesNotFound    = READ_ONCE(counters->updatesNotFound);
  stats->deletionsFound     = READ_ONCE(counters->deletionsFound);
  stats->deletionsNotFound  = READ_ONCE(counters->deletionsNotFound);
  stats->queriesFound       = READ_ONCE(counters->queriesFound);
  stats->queriesNotFound    = READ_ONCE(counters->queriesNotFound);

  // Total number of requests.
  stats->requests           = READ_ONCE(counters->requests);
}
/**********************************************************************/
/**
 * Handler for the session's callback request queue. Successful requests are
 * folded into the statistics; the request is then handed to its callback,
 * or freed if it has none.
 *
 * @param request  The request taken from the callback queue
 **/
static void handleCallbacks(Request *request)
{
  if (request->status == UDS_SUCCESS) {
    // Measure the turnaround time of this request and include that time,
    // along with the rest of the request, in the context's StatCounters.
    updateRequestContextStats(request);
  }

  if (request->callback == NULL) {
    // Should not get here, because this is either a control message or it
    // has a callback method.
    freeRequest(request);
    return;
  }

  // The request has specified its own callback and does not expect to be
  // freed. The session pointer is captured first; the request itself is not
  // touched again once the callback has run.
  struct uds_index_session *session = request->session;
  request->found = (request->location != LOC_UNAVAILABLE);
  request->callback((UdsRequest *) request);

  // The release must come after the callback because of the contract of the
  // udsFlushIndexSession method, which waits for the request count to drain.
  releaseIndexSession(session);
}
/**********************************************************************/
/**
 * Translate the session's current state flags into a status code. The state
 * is sampled once under the request mutex and evaluated after the mutex has
 * been dropped.
 *
 * @param indexSession  The session to examine
 *
 * @return UDS_SUCCESS if the state is exactly IS_FLAG_LOADED,
 *         UDS_DISABLED if the disabled flag is set,
 *         UDS_SUSPENDED if the loading, suspended or waiting flag is set,
 *         UDS_NO_INDEXSESSION in every other case
 **/
int checkIndexSession(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);
  unsigned int state = indexSession->state;
  unlockMutex(&indexSession->requestMutex);

  // Loaded with no other flag set is the only fully usable state.
  if (state == IS_FLAG_LOADED) {
    return UDS_SUCCESS;
  }

  // Disabled takes precedence over the transient states below.
  if (state & IS_FLAG_DISABLED) {
    return UDS_DISABLED;
  }

  if (state & (IS_FLAG_LOADING | IS_FLAG_SUSPENDED | IS_FLAG_WAITING)) {
    return UDS_SUSPENDED;
  }

  return UDS_NO_INDEXSESSION;
}
/**********************************************************************/
/**
 * Register a new request against the session. The request count is raised
 * first and the session state is checked afterwards; if the session is not
 * usable, the count is dropped again before returning.
 *
 * @param indexSession  The session the request will use
 *
 * @return UDS_SUCCESS with the request count held, or the error reported by
 *         checkIndexSession() with the count already released
 **/
int getIndexSession(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);
  indexSession->requestCount++;
  unlockMutex(&indexSession->requestMutex);

  int status = checkIndexSession(indexSession);
  if (status != UDS_SUCCESS) {
    // Undo the increment above; this may wake threads waiting for idle.
    releaseIndexSession(indexSession);
  }
  return status;
}
/**********************************************************************/
/**
 * Drop one request from the session's request count. When the count reaches
 * zero, every thread waiting on the request condition is woken.
 *
 * @param indexSession  The session the request was using
 **/
void releaseIndexSession(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);
  indexSession->requestCount--;
  if (indexSession->requestCount == 0) {
    broadcastCond(&indexSession->requestCond);
  }
  unlockMutex(&indexSession->requestMutex);
}
/**********************************************************************/
/**
 * Claim the session for an index load by setting the loading flag. The
 * claim is only granted when no state flag at all is currently set.
 *
 * @param indexSession  The session that is about to load an index
 *
 * @return UDS_SUCCESS if the loading flag was set,
 *         UDS_SUSPENDED if the session is suspended,
 *         UDS_INDEXSESSION_IN_USE if any other flag is already set
 **/
int startLoadingIndexSession(struct uds_index_session *indexSession)
{
  int result = UDS_SUCCESS;

  lockMutex(&indexSession->requestMutex);
  if (indexSession->state & IS_FLAG_SUSPENDED) {
    result = UDS_SUSPENDED;
  } else if (indexSession->state != 0) {
    result = UDS_INDEXSESSION_IN_USE;
  } else {
    indexSession->state |= IS_FLAG_LOADING;
  }
  unlockMutex(&indexSession->requestMutex);

  return result;
}
/**********************************************************************/
/**
 * Record the end of an index load. The loading flag is always cleared; the
 * loaded flag is set only when the load succeeded. All threads waiting on
 * the request condition are woken in either case.
 *
 * @param indexSession  The session that was loading an index
 * @param result        The outcome of the load; any value other than
 *                      UDS_SUCCESS leaves the loaded flag untouched
 **/
void finishLoadingIndexSession(struct uds_index_session *indexSession,
                               int                       result)
{
  lockMutex(&indexSession->requestMutex);
  if (result == UDS_SUCCESS) {
    indexSession->state
      = (indexSession->state & ~IS_FLAG_LOADING) | IS_FLAG_LOADED;
  } else {
    indexSession->state &= ~IS_FLAG_LOADING;
  }
  broadcastCond(&indexSession->requestCond);
  unlockMutex(&indexSession->requestMutex);
}
/**********************************************************************/
/**
 * Set the disabled flag on the session, after which checkIndexSession()
 * reports UDS_DISABLED. No waiters are woken by this call.
 *
 * @param indexSession  The session to disable
 **/
void disableIndexSession(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);
  indexSession->state = indexSession->state | IS_FLAG_DISABLED;
  unlockMutex(&indexSession->requestMutex);
}
/**********************************************************************/
/**
 * Allocate an index session and initialize its synchronization objects and
 * its callback request queue. If any step fails, everything set up so far
 * is torn down in reverse order and the session memory is freed.
 *
 * @param indexSessionPtr  Receives the new session; written only on success
 *
 * @return UDS_SUCCESS, or the error code from the step that failed
 **/
int makeEmptyIndexSession(struct uds_index_session **indexSessionPtr)
{
  struct uds_index_session *newSession;
  int result = ALLOCATE(1, struct uds_index_session, __func__, &newSession);
  if (result != UDS_SUCCESS) {
    return result;
  }

  result = initMutex(&newSession->requestMutex);
  if (result != UDS_SUCCESS) {
    goto freeSession;
  }

  result = initCond(&newSession->requestCond);
  if (result != UDS_SUCCESS) {
    goto destroyRequestMutex;
  }

  result = initMutex(&newSession->loadContext.mutex);
  if (result != UDS_SUCCESS) {
    goto destroyRequestCond;
  }

  result = initCond(&newSession->loadContext.cond);
  if (result != UDS_SUCCESS) {
    goto destroyLoadMutex;
  }

  // Requests placed on this queue are processed by handleCallbacks().
  result = makeRequestQueue("callbackW", &handleCallbacks,
                            &newSession->callbackQueue);
  if (result != UDS_SUCCESS) {
    goto destroyLoadCond;
  }

  *indexSessionPtr = newSession;
  return UDS_SUCCESS;

  // Error unwinding: each label releases one resource and falls through to
  // release everything that was initialized before it.
destroyLoadCond:
  destroyCond(&newSession->loadContext.cond);
destroyLoadMutex:
  destroyMutex(&newSession->loadContext.mutex);
destroyRequestCond:
  destroyCond(&newSession->requestCond);
destroyRequestMutex:
  destroyMutex(&newSession->requestMutex);
freeSession:
  FREE(newSession);
  return result;
}
/**********************************************************************/
int udsSuspendIndexSession(struct uds_index_session *session, bool save)
{
int result;
bool saveIndex = false;
bool suspendIndex = false;
lockMutex(&session->requestMutex);
// Wait for any pending close operation to complete.
while (session->state & IS_FLAG_CLOSING) {
waitCond(&session->requestCond, &session->requestMutex);
}
if ((session->state & IS_FLAG_WAITING)
|| (session->state & IS_FLAG_DESTROYING)) {
result = EBUSY;
} else if (session->state & IS_FLAG_SUSPENDED) {
result = UDS_SUCCESS;
} else if (session->state & IS_FLAG_LOADING) {
session->state |= IS_FLAG_WAITING;
suspendIndex = true;
result = UDS_SUCCESS;
} else if (!(session->state & IS_FLAG_LOADED)) {
session->state |= IS_FLAG_SUSPENDED;
broadcastCond(&session->requestCond);
result = UDS_SUCCESS;
} else {
saveIndex = save;
if (saveIndex) {
session->state |= IS_FLAG_WAITING;
} else {
session->state |= IS_FLAG_SUSPENDED;
broadcastCond(&session->requestCond);
}
result = UDS_SUCCESS;
}
unlockMutex(&session->requestMutex);
if (!saveIndex && !suspendIndex) {
return result;
}
if (saveIndex) {
result = udsSaveIndex(session);
lockMutex(&session->requestMutex);
session->state &= ~IS_FLAG_WAITING;
session->state |= IS_FLAG_SUSPENDED;
broadcastCond(&session->requestCond);
unlockMutex(&session->requestMutex);
return result;
}
lockMutex(&session->loadContext.mutex);
switch (session->loadContext.status) {
case INDEX_OPENING:
session->loadContext.status = INDEX_SUSPENDING;
// Wait until the index indicates that it is not replaying.
while ((session->loadContext.status != INDEX_SUSPENDED)
&& (session->loadContext.status != INDEX_READY)) {
waitCond(&session->loadContext.cond,
&session->loadContext.mutex);
}
break;
case INDEX_READY:
// Index load does not need to be suspended.
break;
case INDEX_SUSPENDED:
case INDEX_SUSPENDING:
case INDEX_FREEING:
default:
// These cases should not happen.
ASSERT_LOG_ONLY(false, "Bad load context state %u",
session->loadContext.status);
break;
}
unlockMutex(&session->loadContext.mutex);
lockMutex(&session->requestMutex);
session->state &= ~IS_FLAG_WAITING;
session->state |= IS_FLAG_SUSPENDED;
broadcastCond(&session->requestCond);
unlockMutex(&session->requestMutex);
return UDS_SUCCESS;
}
/**********************************************************************/
/**
 * Resume a suspended index session. If an index load was suspended along
 * with the session, the load is told to continue before the suspended flag
 * is cleared.
 *
 * @param session  The session to resume
 *
 * @return EBUSY if the waiting flag is set on entry, otherwise UDS_SUCCESS
 *         (including when the session was not suspended at all)
 **/
int udsResumeIndexSession(struct uds_index_session *session)
{
  int  result     = UDS_SUCCESS;
  bool resumeLoad = false;

  lockMutex(&session->requestMutex);
  if (session->state & IS_FLAG_WAITING) {
    result = EBUSY;
  } else if (session->state & IS_FLAG_SUSPENDED) {
    if (session->state & IS_FLAG_LOADING) {
      // The waiting flag stays set until the load has been notified below.
      session->state |= IS_FLAG_WAITING;
      resumeLoad = true;
    } else {
      session->state &= ~IS_FLAG_SUSPENDED;
    }
  }
  // If not suspended, just succeed.
  unlockMutex(&session->requestMutex);

  if (!resumeLoad) {
    return result;
  }

  lockMutex(&session->loadContext.mutex);
  if (session->loadContext.status == INDEX_SUSPENDED) {
    session->loadContext.status = INDEX_OPENING;
    // Notify the index to start replaying again.
    broadcastCond(&session->loadContext.cond);
  } else if (session->loadContext.status != INDEX_READY) {
    // INDEX_OPENING, INDEX_SUSPENDING, INDEX_FREEING or an unknown value:
    // these cases should not happen; do nothing.
    ASSERT_LOG_ONLY(false, "Bad load context state %u",
                    session->loadContext.status);
  }
  // When the status is INDEX_READY there is no index rebuild to resume.
  unlockMutex(&session->loadContext.mutex);

  lockMutex(&session->requestMutex);
  session->state &= ~(IS_FLAG_WAITING | IS_FLAG_SUSPENDED);
  broadcastCond(&session->requestCond);
  unlockMutex(&session->requestMutex);
  return UDS_SUCCESS;
}
/**********************************************************************/
/**
 * Block the caller until the session's request count is no longer positive.
 * The count is re-tested after every wakeup of the request condition, which
 * releaseIndexSession() broadcasts when the count reaches zero.
 *
 * @param indexSession  The session to wait on
 **/
static void waitForNoRequestsInProgress(struct uds_index_session *indexSession)
{
  lockMutex(&indexSession->requestMutex);

  while (indexSession->requestCount > 0) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }

  unlockMutex(&indexSession->requestMutex);
}
/**********************************************************************/
/**
 * Save the session's index router (unless the session is suspended), free
 * it, and reset the per-index state held in the session. Does nothing other
 * than log when the session has no router.
 *
 * A save failure is logged and does not prevent the router from being
 * freed, but the error code is still returned to the caller.
 *
 * @param indexSession  The session whose index is being released
 *
 * @return UDS_SUCCESS, or the error code from saveIndexRouter()
 **/
int saveAndFreeIndex(struct uds_index_session *indexSession)
{
  int saveResult = UDS_SUCCESS;
  IndexRouter *router = indexSession->router;

  if (router != NULL) {
    // A suspended session skips the save.
    lockMutex(&indexSession->requestMutex);
    bool saveNeeded = !(indexSession->state & IS_FLAG_SUSPENDED);
    unlockMutex(&indexSession->requestMutex);

    if (saveNeeded) {
      saveResult = saveIndexRouter(router);
      if (saveResult != UDS_SUCCESS) {
        logWarningWithStringError(saveResult,
                                  "ignoring error from saveIndexRouter");
      }
    }

    freeIndexRouter(router);
    indexSession->router = NULL;

    // Reset all index state that happens to be in the index session, so it
    // doesn't affect any future index.
    lockMutex(&indexSession->loadContext.mutex);
    indexSession->loadContext.status = INDEX_OPENING;
    unlockMutex(&indexSession->loadContext.mutex);

    // Only the suspend bit will remain relevant; every other flag is
    // cleared by the mask.
    lockMutex(&indexSession->requestMutex);
    indexSession->state &= IS_FLAG_SUSPENDED;
    unlockMutex(&indexSession->requestMutex);
  }

  logDebug("Closed index");
  return saveResult;
}
/**********************************************************************/
/**
 * Close the index held by a session, leaving the session itself intact.
 * Waits for any suspend, resume or close already in progress, marks the
 * session as closing, waits for outstanding requests to drain, and then
 * saves and frees the index.
 *
 * @param indexSession  The session whose index is to be closed
 *
 * @return UDS_SUSPENDED if the session is suspended,
 *         UDS_NO_INDEXSESSION if the session is being destroyed or has no
 *         loaded index, otherwise the result of saveAndFreeIndex()
 **/
int udsCloseIndex(struct uds_index_session *indexSession)
{
  int result;

  lockMutex(&indexSession->requestMutex);
  // Wait for any pending suspend, resume or close operations to complete.
  while (indexSession->state & (IS_FLAG_WAITING | IS_FLAG_CLOSING)) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }

  if (indexSession->state & IS_FLAG_SUSPENDED) {
    result = UDS_SUSPENDED;
  } else if ((indexSession->state & IS_FLAG_DESTROYING)
             || !(indexSession->state & IS_FLAG_LOADED)) {
    // The index doesn't exist, hasn't finished loading, or is being
    // destroyed.
    result = UDS_NO_INDEXSESSION;
  } else {
    // Claim the close so that other operations wait for it to finish.
    indexSession->state |= IS_FLAG_CLOSING;
    result = UDS_SUCCESS;
  }
  unlockMutex(&indexSession->requestMutex);

  if (result != UDS_SUCCESS) {
    return result;
  }

  logDebug("Closing index");
  waitForNoRequestsInProgress(indexSession);
  result = saveAndFreeIndex(indexSession);

  // Drop the closing flag and wake anything that was waiting on it.
  lockMutex(&indexSession->requestMutex);
  indexSession->state &= ~IS_FLAG_CLOSING;
  broadcastCond(&indexSession->requestCond);
  unlockMutex(&indexSession->requestMutex);
  return result;
}
/**********************************************************************/
/**
 * Abort an index load that was suspended part-way through, then wait for
 * the load to finish exiting (signalled by the loading flag being cleared).
 *
 * @param indexSession  The session whose suspended load is to be abandoned
 **/
static void terminateSuspendedLoad(struct uds_index_session *indexSession)
{
  // Tell the index to terminate the rebuild.
  lockMutex(&indexSession->loadContext.mutex);
  if (indexSession->loadContext.status == INDEX_SUSPENDED) {
    indexSession->loadContext.status = INDEX_FREEING;
    broadcastCond(&indexSession->loadContext.cond);
  }
  unlockMutex(&indexSession->loadContext.mutex);

  // Wait until the load exits before proceeding.
  lockMutex(&indexSession->requestMutex);
  while (indexSession->state & IS_FLAG_LOADING) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }
  unlockMutex(&indexSession->requestMutex);
}

/**
 * Destroy an index session: save and free any index it holds, shut down its
 * callback queue, destroy its synchronization objects, and free the session
 * memory. The session pointer must not be used after a non-EBUSY return.
 *
 * @param indexSession  The session to destroy
 *
 * @return EBUSY if the destroying flag is already set, otherwise the result
 *         of saveAndFreeIndex()
 **/
int udsDestroyIndexSession(struct uds_index_session *indexSession)
{
  logDebug("Destroying index session");

  lockMutex(&indexSession->requestMutex);
  // Wait for any pending suspend, resume, or close operations to complete.
  while (indexSession->state & (IS_FLAG_WAITING | IS_FLAG_CLOSING)) {
    waitCond(&indexSession->requestCond, &indexSession->requestMutex);
  }

  if (indexSession->state & IS_FLAG_DESTROYING) {
    unlockMutex(&indexSession->requestMutex);
    return EBUSY;
  }

  indexSession->state |= IS_FLAG_DESTROYING;
  // A load is pending only if it was suspended while still in progress.
  bool loadPending = ((indexSession->state & IS_FLAG_LOADING)
                      && (indexSession->state & IS_FLAG_SUSPENDED));
  unlockMutex(&indexSession->requestMutex);

  if (loadPending) {
    terminateSuspendedLoad(indexSession);
  }

  waitForNoRequestsInProgress(indexSession);
  int result = saveAndFreeIndex(indexSession);

  // Tear down in this order: callback queue first, then the load context
  // objects, then the request objects, and finally the session itself.
  requestQueueFinish(indexSession->callbackQueue);
  indexSession->callbackQueue = NULL;
  destroyCond(&indexSession->loadContext.cond);
  destroyMutex(&indexSession->loadContext.mutex);
  destroyCond(&indexSession->requestCond);
  destroyMutex(&indexSession->requestMutex);
  logDebug("Destroyed index session");
  FREE(indexSession);
  return result;
}
/**********************************************************************/
/**
 * Wait for all outstanding requests on the session to complete and for the
 * index router to become idle.
 *
 * NOTE(review): indexSession->router is passed on unchecked, and
 * saveAndFreeIndex() leaves it NULL after a close; confirm that
 * waitForIdleIndexRouter() tolerates NULL or that callers guarantee a
 * loaded index.
 *
 * @param indexSession  The session to flush
 *
 * @return UDS_SUCCESS always
 **/
int udsFlushIndexSession(struct uds_index_session *indexSession)
{
  // First let every request that is in progress finish.
  waitForNoRequestsInProgress(indexSession);

  // Wait until any open chapter writes are complete
  waitForIdleIndexRouter(indexSession->router);

  return UDS_SUCCESS;
}
/**********************************************************************/
/**
 * Wait for all outstanding requests on the session to complete, then save
 * the index router.
 *
 * NOTE(review): indexSession->router is passed on unchecked, and
 * saveAndFreeIndex() leaves it NULL after a close; confirm that
 * saveIndexRouter() tolerates NULL or that callers guarantee a loaded
 * index.
 *
 * @param indexSession  The session whose index is to be saved
 *
 * @return The result of saveIndexRouter()
 **/
int udsSaveIndex(struct uds_index_session *indexSession)
{
  waitForNoRequestsInProgress(indexSession);

  // saveIndexRouter waits for open chapter writes to complete
  int saveResult = saveIndexRouter(indexSession->router);
  return saveResult;
}
/**********************************************************************/
/**
 * Set the checkpoint frequency on the checkpoint object of the session's
 * index.
 *
 * NOTE(review): router, index and checkpoint are dereferenced without any
 * NULL check or lock, and saveAndFreeIndex() leaves router NULL after a
 * close; confirm that callers only use this on a session with a loaded
 * index.
 *
 * @param indexSession  The session whose index is to be configured
 * @param frequency     The value handed to setIndexCheckpointFrequency()
 *
 * @return UDS_SUCCESS always
 **/
int udsSetCheckpointFrequency(struct uds_index_session *indexSession,
                              unsigned int              frequency)
{
  setIndexCheckpointFrequency(indexSession->router->index->checkpoint,
                              frequency);

  return UDS_SUCCESS;
}
/**********************************************************************/
/**
 * Return a newly allocated copy of the session's user configuration.
 *
 * @param indexSession  The session to query
 * @param conf          Receives a pointer to the allocated copy; presumably
 *                      the caller is responsible for freeing it (confirm
 *                      against the public API documentation)
 *
 * @return UDS_SUCCESS, UDS_CONF_PTR_REQUIRED (as returned by the logging
 *         call) if conf is NULL, or the allocation error code
 **/
int udsGetIndexConfiguration(struct uds_index_session *indexSession,
                             UdsConfiguration         *conf)
{
  if (conf == NULL) {
    return logErrorWithStringError(UDS_CONF_PTR_REQUIRED,
                                   "received a NULL config pointer");
  }

  int result = ALLOCATE(1, struct udsConfiguration, __func__, conf);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Copy the whole configuration structure by value.
  **conf = indexSession->userConfig;
  return UDS_SUCCESS;
}
/**********************************************************************/
/**
 * Fetch the statistics of the session's index.
 *
 * NOTE(review): router is dereferenced without a NULL check, and
 * saveAndFreeIndex() leaves it NULL after a close; confirm that callers
 * only use this on a session with a loaded index.
 *
 * @param indexSession  The session whose index is queried
 * @param stats         The structure filled in by getIndexStats()
 *
 * @return UDS_SUCCESS, or the value returned by the logging call for
 *         UDS_INDEX_STATS_PTR_REQUIRED if stats is NULL
 **/
int udsGetIndexStats(struct uds_index_session *indexSession,
                     UdsIndexStats            *stats)
{
  if (stats != NULL) {
    getIndexStats(indexSession->router->index, stats);
    return UDS_SUCCESS;
  }

  return logErrorWithStringError(UDS_INDEX_STATS_PTR_REQUIRED,
                                 "received a NULL index stats pointer");
}
/**********************************************************************/
/**
 * Fetch the request statistics accumulated by the session itself.
 *
 * @param indexSession  The session to query
 * @param stats         The structure filled in by collectStats()
 *
 * @return UDS_SUCCESS, or the value returned by the logging call for
 *         UDS_CONTEXT_STATS_PTR_REQUIRED if stats is NULL (this case is
 *         logged as a warning)
 **/
int udsGetIndexSessionStats(struct uds_index_session *indexSession,
                            UdsContextStats          *stats)
{
  if (stats != NULL) {
    collectStats(indexSession, stats);
    return UDS_SUCCESS;
  }

  return logWarningWithStringError(UDS_CONTEXT_STATS_PTR_REQUIRED,
                                   "received a NULL context stats pointer");
}