/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/kernel/udsIndex.c#16 $
*/
#include "udsIndex.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "murmur/MurmurHash3.h"
#include "numeric.h"
#include "stringUtils.h"
#include "uds-block.h"
/*****************************************************************************/
typedef struct udsAttribute {
struct attribute attr;
const char *(*showString)(DedupeIndex *);
} UDSAttribute;
/*****************************************************************************/
enum { UDS_Q_ACTION };
/*****************************************************************************/
// These are the values in the atomic dedupeContext.requestState field
enum {
// The UdsRequest object is not in use.
UR_IDLE = 0,
// The UdsRequest object is in use, and VDO is waiting for the result.
UR_BUSY = 1,
// The UdsRequest object is in use, but has timed out.
UR_TIMED_OUT = 2,
};
/*****************************************************************************/
typedef enum {
// The UDS index is closed
IS_CLOSED = 0,
// The UDS index session is opening or closing
IS_CHANGING = 1,
// The UDS index is open. There is a UDS index session.
IS_OPENED = 2,
} IndexState;
/*****************************************************************************/
typedef struct udsIndex {
DedupeIndex common;
struct kobject dedupeObject;
RegisteredThread allocatingThread;
char *indexName;
UdsConfiguration configuration;
struct uds_parameters udsParams;
struct uds_index_session *indexSession;
atomic_t active;
// This spinlock protects the state fields and the starting of dedupe
// requests.
spinlock_t stateLock;
KvdoWorkItem workItem; // protected by stateLock
KvdoWorkQueue *udsQueue; // protected by stateLock
unsigned int maximum; // protected by stateLock
IndexState indexState; // protected by stateLock
IndexState indexTarget; // protected by stateLock
bool changing; // protected by stateLock
bool createFlag; // protected by stateLock
bool dedupeFlag; // protected by stateLock
bool deduping; // protected by stateLock
bool errorFlag; // protected by stateLock
bool suspended; // protected by stateLock
// This spinlock protects the pending list, the pending flag in each KVIO,
// and the timeout list.
spinlock_t pendingLock;
struct list_head pendingHead; // protected by pendingLock
struct timer_list pendingTimer; // protected by pendingLock
bool startedTimer; // protected by pendingLock
} UDSIndex;
/*****************************************************************************/
// Version 1: user space albireo index (limited to 32 bytes)
// Version 2: kernel space albireo index (limited to 16 bytes)
enum {
UDS_ADVICE_VERSION = 2,
// version byte + state byte + 64-bit little-endian PBN
UDS_ADVICE_SIZE = 1 + 1 + sizeof(uint64_t),
};
/*****************************************************************************/
// We want to ensure that there is only one copy of the following constants.
static const char *CLOSED = "closed";
static const char *CLOSING = "closing";
static const char *ERROR = "error";
static const char *OFFLINE = "offline";
static const char *ONLINE = "online";
static const char *OPENING = "opening";
static const char *SUSPENDED = "suspended";
static const char *UNKNOWN = "unknown";
/*****************************************************************************/
static const char *indexStateToString(UDSIndex *index, IndexState state)
{
if (index->suspended) {
return SUSPENDED;
}
switch (state) {
case IS_CLOSED:
// Closed. The errorFlag tells if it is because of an error.
return index->errorFlag ? ERROR : CLOSED;
case IS_CHANGING:
// The indexTarget tells if we are opening or closing the index.
return index->indexTarget == IS_OPENED ? OPENING : CLOSING;
case IS_OPENED:
// Opened. The dedupeFlag tells if we are online or offline.
return index->dedupeFlag ? ONLINE : OFFLINE;
default:
return UNKNOWN;
}
}
/**
* Encode VDO duplicate advice into the newMetadata field of a UDS request.
*
* @param request The UDS request to receive the encoding
* @param advice The advice to encode
**/
static void encodeUDSAdvice(UdsRequest *request, DataLocation advice)
{
size_t offset = 0;
struct udsChunkData *encoding = &request->newMetadata;
encoding->data[offset++] = UDS_ADVICE_VERSION;
encoding->data[offset++] = advice.state;
encodeUInt64LE(encoding->data, &offset, advice.pbn);
BUG_ON(offset != UDS_ADVICE_SIZE);
}
/**
* Decode VDO duplicate advice from the oldMetadata field of a UDS request.
*
* @param request The UDS request containing the encoding
* @param advice The DataLocation to receive the decoded advice
*
* @return <code>true</code> if valid advice was found and decoded
**/
static bool decodeUDSAdvice(const UdsRequest *request, DataLocation *advice)
{
if ((request->status != UDS_SUCCESS) || !request->found) {
return false;
}
size_t offset = 0;
const struct udsChunkData *encoding = &request->oldMetadata;
byte version = encoding->data[offset++];
if (version != UDS_ADVICE_VERSION) {
logError("invalid UDS advice version code %u", version);
return false;
}
advice->state = encoding->data[offset++];
decodeUInt64LE(encoding->data, &offset, &advice->pbn);
BUG_ON(offset != UDS_ADVICE_SIZE);
return true;
}
/*****************************************************************************/
static void finishIndexOperation(UdsRequest *udsRequest)
{
DataKVIO *dataKVIO = container_of(udsRequest, DataKVIO,
dedupeContext.udsRequest);
DedupeContext *dedupeContext = &dataKVIO->dedupeContext;
if (compareAndSwap32(&dedupeContext->requestState, UR_BUSY, UR_IDLE)) {
KVIO *kvio = dataKVIOAsKVIO(dataKVIO);
UDSIndex *index = container_of(kvio->layer->dedupeIndex, UDSIndex, common);
spin_lock_bh(&index->pendingLock);
if (dedupeContext->isPending) {
list_del(&dedupeContext->pendingList);
dedupeContext->isPending = false;
}
spin_unlock_bh(&index->pendingLock);
dedupeContext->status = udsRequest->status;
if ((udsRequest->type == UDS_POST) || (udsRequest->type == UDS_QUERY)) {
DataLocation advice;
if (decodeUDSAdvice(udsRequest, &advice)) {
setDedupeAdvice(dedupeContext, &advice);
} else {
setDedupeAdvice(dedupeContext, NULL);
}
}
invokeDedupeCallback(dataKVIO);
atomic_dec(&index->active);
} else {
compareAndSwap32(&dedupeContext->requestState, UR_TIMED_OUT, UR_IDLE);
}
}
/*****************************************************************************/
static void startExpirationTimer(UDSIndex *index, DataKVIO *dataKVIO)
{
if (!index->startedTimer) {
index->startedTimer = true;
mod_timer(&index->pendingTimer,
getAlbireoTimeout(dataKVIO->dedupeContext.submissionTime));
}
}
/*****************************************************************************/
static void startIndexOperation(KvdoWorkItem *item)
{
KVIO *kvio = workItemAsKVIO(item);
DataKVIO *dataKVIO = kvioAsDataKVIO(kvio);
UDSIndex *index = container_of(kvio->layer->dedupeIndex, UDSIndex, common);
DedupeContext *dedupeContext = &dataKVIO->dedupeContext;
spin_lock_bh(&index->pendingLock);
list_add_tail(&dedupeContext->pendingList, &index->pendingHead);
dedupeContext->isPending = true;
startExpirationTimer(index, dataKVIO);
spin_unlock_bh(&index->pendingLock);
UdsRequest *udsRequest = &dedupeContext->udsRequest;
int status = udsStartChunkOperation(udsRequest);
if (status != UDS_SUCCESS) {
udsRequest->status = status;
finishIndexOperation(udsRequest);
}
}
/*****************************************************************************/
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,15,0)
static void timeoutIndexOperations(struct timer_list *t)
#else
static void timeoutIndexOperations(unsigned long arg)
#endif
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,15,0)
UDSIndex *index = from_timer(index, t, pendingTimer);
#else
UDSIndex *index = (UDSIndex *) arg;
#endif
LIST_HEAD(expiredHead);
uint64_t timeoutJiffies = msecs_to_jiffies(albireoTimeoutInterval);
unsigned long earliestSubmissionAllowed = jiffies - timeoutJiffies;
spin_lock_bh(&index->pendingLock);
index->startedTimer = false;
while (!list_empty(&index->pendingHead)) {
DataKVIO *dataKVIO = list_first_entry(&index->pendingHead, DataKVIO,
dedupeContext.pendingList);
DedupeContext *dedupeContext = &dataKVIO->dedupeContext;
if (earliestSubmissionAllowed <= dedupeContext->submissionTime) {
startExpirationTimer(index, dataKVIO);
break;
}
list_del(&dedupeContext->pendingList);
dedupeContext->isPending = false;
list_add_tail(&dedupeContext->pendingList, &expiredHead);
}
spin_unlock_bh(&index->pendingLock);
while (!list_empty(&expiredHead)) {
DataKVIO *dataKVIO = list_first_entry(&expiredHead, DataKVIO,
dedupeContext.pendingList);
DedupeContext *dedupeContext = &dataKVIO->dedupeContext;
list_del(&dedupeContext->pendingList);
if (compareAndSwap32(&dedupeContext->requestState,
UR_BUSY, UR_TIMED_OUT)) {
dedupeContext->status = ETIMEDOUT;
invokeDedupeCallback(dataKVIO);
atomic_dec(&index->active);
kvdoReportDedupeTimeout(dataKVIOAsKVIO(dataKVIO)->layer, 1);
}
}
}
/*****************************************************************************/
static void enqueueIndexOperation(DataKVIO *dataKVIO,
UdsCallbackType operation)
{
KVIO *kvio = dataKVIOAsKVIO(dataKVIO);
DedupeContext *dedupeContext = &dataKVIO->dedupeContext;
UDSIndex *index = container_of(kvio->layer->dedupeIndex, UDSIndex, common);
dedupeContext->status = UDS_SUCCESS;
dedupeContext->submissionTime = jiffies;
if (compareAndSwap32(&dedupeContext->requestState, UR_IDLE, UR_BUSY)) {
UdsRequest *udsRequest = &dataKVIO->dedupeContext.udsRequest;
udsRequest->chunkName = *dedupeContext->chunkName;
udsRequest->callback = finishIndexOperation;
udsRequest->session = index->indexSession;
udsRequest->type = operation;
udsRequest->update = true;
if ((operation == UDS_POST) || (operation == UDS_UPDATE)) {
encodeUDSAdvice(udsRequest, getDedupeAdvice(dedupeContext));
}
setupWorkItem(&kvio->enqueueable.workItem, startIndexOperation, NULL,
UDS_Q_ACTION);
spin_lock(&index->stateLock);
if (index->deduping) {
enqueueWorkQueue(index->udsQueue, &kvio->enqueueable.workItem);
unsigned int active = atomic_inc_return(&index->active);
if (active > index->maximum) {
index->maximum = active;
}
kvio = NULL;
} else {
atomicStore32(&dedupeContext->requestState, UR_IDLE);
}
spin_unlock(&index->stateLock);
} else {
// A previous user of the KVIO had a dedupe timeout
// and its request is still outstanding.
atomic64_inc(&kvio->layer->dedupeContextBusy);
}
if (kvio != NULL) {
invokeDedupeCallback(dataKVIO);
}
}
/*****************************************************************************/
static void closeIndex(UDSIndex *index)
{
// Change the index state so that getIndexStatistics will not try to
// use the index session we are closing.
index->indexState = IS_CHANGING;
spin_unlock(&index->stateLock);
int result = udsCloseIndex(index->indexSession);
if (result != UDS_SUCCESS) {
logErrorWithStringError(result, "Error closing index %s",
index->indexName);
}
spin_lock(&index->stateLock);
index->indexState = IS_CLOSED;
index->errorFlag |= result != UDS_SUCCESS;
// ASSERTION: We leave in IS_CLOSED state.
}
/*****************************************************************************/
static void openIndex(UDSIndex *index)
{
// ASSERTION: We enter in IS_CLOSED state.
bool createFlag = index->createFlag;
index->createFlag = false;
// Change the index state so that the it will be reported to the outside
// world as "opening".
index->indexState = IS_CHANGING;
index->errorFlag = false;
// Open the index session, while not holding the stateLock
spin_unlock(&index->stateLock);
int result = udsOpenIndex(createFlag ? UDS_CREATE : UDS_LOAD,
index->indexName, &index->udsParams,
index->configuration, index->indexSession);
if (result != UDS_SUCCESS) {
logErrorWithStringError(result, "Error opening index %s",
index->indexName);
}
spin_lock(&index->stateLock);
if (!createFlag) {
switch (result) {
case UDS_CORRUPT_COMPONENT:
case UDS_NO_INDEX:
// Either there is no index, or there is no way we can recover the index.
// We will be called again and try to create a new index.
index->indexState = IS_CLOSED;
index->createFlag = true;
return;
default:
break;
}
}
if (result == UDS_SUCCESS) {
index->indexState = IS_OPENED;
} else {
index->indexState = IS_CLOSED;
index->indexTarget = IS_CLOSED;
index->errorFlag = true;
spin_unlock(&index->stateLock);
logInfo("Setting UDS index target state to error");
spin_lock(&index->stateLock);
}
// ASSERTION: On success, we leave in IS_OPEN state.
// ASSERTION: On failure, we leave in IS_CLOSED state.
}
/*****************************************************************************/
static void changeDedupeState(KvdoWorkItem *item)
{
UDSIndex *index = container_of(item, UDSIndex, workItem);
spin_lock(&index->stateLock);
// Loop until the index is in the target state and the create flag is
// clear.
while (!index->suspended &&
((index->indexState != index->indexTarget) ||
index->createFlag)) {
if (index->indexState == IS_OPENED) {
closeIndex(index);
} else {
openIndex(index);
}
}
index->changing = false;
index->deduping = index->dedupeFlag && (index->indexState == IS_OPENED);
spin_unlock(&index->stateLock);
}
/*****************************************************************************/
static void launchDedupeStateChange(UDSIndex *index)
{
// ASSERTION: We enter with the state_lock held.
if (index->changing || index->suspended) {
// Either a change is already in progress, or changes are
// not allowed.
return;
}
if (index->createFlag ||
(index->indexState != index->indexTarget)) {
index->changing = true;
index->deduping = false;
setupWorkItem(&index->workItem,
changeDedupeState,
NULL,
UDS_Q_ACTION);
enqueueWorkQueue(index->udsQueue, &index->workItem);
return;
}
// Online vs. offline changes happen immediately
index->deduping = (index->dedupeFlag && !index->suspended &&
(index->indexState == IS_OPENED));
// ASSERTION: We exit with the state_lock held.
}
/*****************************************************************************/
static void setTargetState(UDSIndex *index,
IndexState target,
bool changeDedupe,
bool dedupe,
bool setCreate)
{
spin_lock(&index->stateLock);
const char *oldState = indexStateToString(index, index->indexTarget);
if (changeDedupe) {
index->dedupeFlag = dedupe;
}
if (setCreate) {
index->createFlag = true;
}
index->indexTarget = target;
launchDedupeStateChange(index);
const char *newState = indexStateToString(index, index->indexTarget);
spin_unlock(&index->stateLock);
if (oldState != newState) {
logInfo("Setting UDS index target state to %s", newState);
}
}
/*****************************************************************************/
static void suspendUDSIndex(DedupeIndex *dedupeIndex, bool saveFlag)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
spin_lock(&index->stateLock);
index->suspended = true;
IndexState indexState = index->indexState;
spin_unlock(&index->stateLock);
if (indexState != IS_CLOSED) {
int result = udsSuspendIndexSession(index->indexSession, saveFlag);
if (result != UDS_SUCCESS) {
logErrorWithStringError(result, "Error suspending dedupe index");
}
}
}
/*****************************************************************************/
static void resumeUDSIndex(DedupeIndex *dedupeIndex)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
int result = udsResumeIndexSession(index->indexSession);
if (result != UDS_SUCCESS) {
logErrorWithStringError(result, "Error resuming dedupe index");
}
spin_lock(&index->stateLock);
index->suspended = false;
launchDedupeStateChange(index);
spin_unlock(&index->stateLock);
}
/*****************************************************************************/
/*****************************************************************************/
static void dumpUDSIndex(DedupeIndex *dedupeIndex, bool showQueue)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
spin_lock(&index->stateLock);
const char *state = indexStateToString(index, index->indexState);
const char *target = (index->changing
? indexStateToString(index, index->indexTarget)
: NULL);
spin_unlock(&index->stateLock);
logInfo("UDS index: state: %s", state);
if (target != NULL) {
logInfo("UDS index: changing to state: %s", target);
}
if (showQueue) {
dumpWorkQueue(index->udsQueue);
}
}
/*****************************************************************************/
static void finishUDSIndex(DedupeIndex *dedupeIndex)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
setTargetState(index, IS_CLOSED, false, false, false);
udsDestroyIndexSession(index->indexSession);
finishWorkQueue(index->udsQueue);
}
/*****************************************************************************/
static void freeUDSIndex(DedupeIndex *dedupeIndex)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
freeWorkQueue(&index->udsQueue);
spin_lock_bh(&index->pendingLock);
if (index->startedTimer) {
del_timer_sync(&index->pendingTimer);
}
spin_unlock_bh(&index->pendingLock);
kobject_put(&index->dedupeObject);
}
/*****************************************************************************/
static const char *getUDSStateName(DedupeIndex *dedupeIndex)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
spin_lock(&index->stateLock);
const char *state = indexStateToString(index, index->indexState);
spin_unlock(&index->stateLock);
return state;
}
/*****************************************************************************/
static void getUDSStatistics(DedupeIndex *dedupeIndex, IndexStatistics *stats)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
spin_lock(&index->stateLock);
IndexState indexState = index->indexState;
stats->maxDedupeQueries = index->maximum;
spin_unlock(&index->stateLock);
stats->currDedupeQueries = atomic_read(&index->active);
if (indexState == IS_OPENED) {
UdsIndexStats indexStats;
int result = udsGetIndexStats(index->indexSession, &indexStats);
if (result == UDS_SUCCESS) {
stats->entriesIndexed = indexStats.entriesIndexed;
} else {
logErrorWithStringError(result, "Error reading index stats");
}
UdsContextStats contextStats;
result = udsGetIndexSessionStats(index->indexSession, &contextStats);
if (result == UDS_SUCCESS) {
stats->postsFound = contextStats.postsFound;
stats->postsNotFound = contextStats.postsNotFound;
stats->queriesFound = contextStats.queriesFound;
stats->queriesNotFound = contextStats.queriesNotFound;
stats->updatesFound = contextStats.updatesFound;
stats->updatesNotFound = contextStats.updatesNotFound;
} else {
logErrorWithStringError(result, "Error reading context stats");
}
}
}
/*****************************************************************************/
static int processMessage(DedupeIndex *dedupeIndex, const char *name)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
if (strcasecmp(name, "index-close") == 0) {
setTargetState(index, IS_CLOSED, false, false, false);
return 0;
} else if (strcasecmp(name, "index-create") == 0) {
setTargetState(index, IS_OPENED, false, false, true);
return 0;
} else if (strcasecmp(name, "index-disable") == 0) {
setTargetState(index, IS_OPENED, true, false, false);
return 0;
} else if (strcasecmp(name, "index-enable") == 0) {
setTargetState(index, IS_OPENED, true, true, false);
return 0;
}
return -EINVAL;
}
/*****************************************************************************/
static void udsPost(DataKVIO *dataKVIO)
{
enqueueIndexOperation(dataKVIO, UDS_POST);
}
/*****************************************************************************/
static void udsQuery(DataKVIO *dataKVIO)
{
enqueueIndexOperation(dataKVIO, UDS_QUERY);
}
/*****************************************************************************/
static void startUDSIndex(DedupeIndex *dedupeIndex, bool createFlag)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
setTargetState(index, IS_OPENED, true, true, createFlag);
}
/*****************************************************************************/
static void stopUDSIndex(DedupeIndex *dedupeIndex)
{
UDSIndex *index = container_of(dedupeIndex, UDSIndex, common);
setTargetState(index, IS_CLOSED, false, false, false);
}
/*****************************************************************************/
static void udsUpdate(DataKVIO *dataKVIO)
{
enqueueIndexOperation(dataKVIO, UDS_UPDATE);
}
/*****************************************************************************/
static void dedupeKobjRelease(struct kobject *kobj)
{
UDSIndex *index = container_of(kobj, UDSIndex, dedupeObject);
udsFreeConfiguration(index->configuration);
FREE(index->indexName);
FREE(index);
}
/*****************************************************************************/
static ssize_t dedupeStatusShow(struct kobject *kobj,
struct attribute *attr,
char *buf)
{
UDSAttribute *ua = container_of(attr, UDSAttribute, attr);
UDSIndex *index = container_of(kobj, UDSIndex, dedupeObject);
if (ua->showString != NULL) {
return sprintf(buf, "%s\n", ua->showString(&index->common));
} else {
return -EINVAL;
}
}
/*****************************************************************************/
static ssize_t dedupeStatusStore(struct kobject *kobj,
struct attribute *attr,
const char *buf,
size_t length)
{
return -EINVAL;
}
/*****************************************************************************/
static struct sysfs_ops dedupeSysfsOps = {
.show = dedupeStatusShow,
.store = dedupeStatusStore,
};
static UDSAttribute dedupeStatusAttribute = {
.attr = {.name = "status", .mode = 0444, },
.showString = getUDSStateName,
};
static struct attribute *dedupeAttributes[] = {
&dedupeStatusAttribute.attr,
NULL,
};
static struct kobj_type dedupeKobjType = {
.release = dedupeKobjRelease,
.sysfs_ops = &dedupeSysfsOps,
.default_attrs = dedupeAttributes,
};
/*****************************************************************************/
static void startUDSQueue(void *ptr)
{
/*
* Allow the UDS dedupe worker thread to do memory allocations. It will
* only do allocations during the UDS calls that open or close an index,
* but those allocations can safely sleep while reserving a large amount
* of memory. We could use an allocationsAllowed boolean (like the base
* threads do), but it would be an unnecessary embellishment.
*/
UDSIndex *index = ptr;
registerAllocatingThread(&index->allocatingThread, NULL);
}
/*****************************************************************************/
static void finishUDSQueue(void *ptr)
{
unregisterAllocatingThread();
}
/*****************************************************************************/
int makeUDSIndex(KernelLayer *layer, DedupeIndex **indexPtr)
{
UDSIndex *index;
int result = ALLOCATE(1, UDSIndex, "UDS index data", &index);
if (result != UDS_SUCCESS) {
return result;
}
result = allocSprintf("index name", &index->indexName,
"dev=%s offset=4096 size=%llu",
layer->deviceConfig->parentDeviceName,
getIndexRegionSize(layer->geometry) * VDO_BLOCK_SIZE);
if (result != UDS_SUCCESS) {
logError("Creating index name failed (%d)", result);
FREE(index);
return result;
}
index->udsParams = (struct uds_parameters) UDS_PARAMETERS_INITIALIZER;
indexConfigToUdsParameters(&layer->geometry.indexConfig, &index->udsParams);
result = indexConfigToUdsConfiguration(&layer->geometry.indexConfig,
&index->configuration);
if (result != VDO_SUCCESS) {
FREE(index->indexName);
FREE(index);
return result;
}
udsConfigurationSetNonce(index->configuration,
(UdsNonce) layer->geometry.nonce);
result = udsCreateIndexSession(&index->indexSession);
if (result != UDS_SUCCESS) {
udsFreeConfiguration(index->configuration);
FREE(index->indexName);
FREE(index);
return result;
}
static const KvdoWorkQueueType udsQueueType = {
.start = startUDSQueue,
.finish = finishUDSQueue,
.actionTable = {
{ .name = "uds_action", .code = UDS_Q_ACTION, .priority = 0 },
},
};
result = makeWorkQueue(layer->threadNamePrefix, "dedupeQ",
&layer->wqDirectory, layer, index, &udsQueueType, 1,
&index->udsQueue);
if (result != VDO_SUCCESS) {
logError("UDS index queue initialization failed (%d)", result);
udsDestroyIndexSession(index->indexSession);
udsFreeConfiguration(index->configuration);
FREE(index->indexName);
FREE(index);
return result;
}
kobject_init(&index->dedupeObject, &dedupeKobjType);
result = kobject_add(&index->dedupeObject, &layer->kobj, "dedupe");
if (result != VDO_SUCCESS) {
freeWorkQueue(&index->udsQueue);
udsDestroyIndexSession(index->indexSession);
udsFreeConfiguration(index->configuration);
FREE(index->indexName);
FREE(index);
return result;
}
index->common.dump = dumpUDSIndex;
index->common.free = freeUDSIndex;
index->common.getDedupeStateName = getUDSStateName;
index->common.getStatistics = getUDSStatistics;
index->common.message = processMessage;
index->common.post = udsPost;
index->common.query = udsQuery;
index->common.resume = resumeUDSIndex;
index->common.start = startUDSIndex;
index->common.stop = stopUDSIndex;
index->common.suspend = suspendUDSIndex;
index->common.finish = finishUDSIndex;
index->common.update = udsUpdate;
INIT_LIST_HEAD(&index->pendingHead);
spin_lock_init(&index->pendingLock);
spin_lock_init(&index->stateLock);
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,15,0)
timer_setup(&index->pendingTimer, timeoutIndexOperations, 0);
#else
setup_timer(&index->pendingTimer, timeoutIndexOperations,
(unsigned long) index);
#endif
*indexPtr = &index->common;
return VDO_SUCCESS;
}