/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/flush.c#3 $
*/
#include "flush.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "blockAllocator.h"
#include "completion.h"
#include "logicalZone.h"
#include "numUtils.h"
#include "readOnlyNotifier.h"
#include "slabDepot.h"
#include "vdoInternal.h"
/**
 * The state used to process flush requests for a VDO: each request is
 * assigned a flush generation, every logical zone and then the packer are
 * told about that generation, and the request is completed once no logical
 * zone reports an older or equal generation as still locked.
 **/
struct flusher {
/**
 * The completion used to run the notification callbacks. It must remain the
 * first field: asFlusher() casts a VDOCompletion pointer directly to a
 * Flusher pointer.
 **/
VDOCompletion completion;
/** The VDO to which this flusher belongs */
VDO *vdo;
/** The current flush generation of the VDO */
SequenceNumber flushGeneration;
/** The first unacknowledged flush generation */
SequenceNumber firstUnacknowledgedGeneration;
/** The queue of flush requests waiting to notify other threads */
WaitQueue notifiers;
/** The queue of flush requests waiting for VIOs to complete */
WaitQueue pendingFlushes;
/** The flush generation for which notifications are being sent */
SequenceNumber notifyGeneration;
/** The logical zone to notify next */
LogicalZone *logicalZoneToNotify;
/** The ID of the thread on which flush requests should be made */
ThreadID threadID;
};
/**
 * Recover the Flusher which embeds a given generic completion.
 *
 * @param completion  The completion to convert; its type is asserted to be
 *                    FLUSH_NOTIFICATION_COMPLETION
 *
 * @return The Flusher whose first member is the completion
 **/
static Flusher *asFlusher(VDOCompletion *completion)
{
  // The cast below is only valid while the completion is the first member.
  STATIC_ASSERT(offsetof(Flusher, completion) == 0);
  assertCompletionType(completion->type, FLUSH_NOTIFICATION_COMPLETION);

  Flusher *owner = (Flusher *) completion;
  return owner;
}
/**
 * Recover the VDOFlush which embeds a given wait queue entry.
 *
 * @param waiter  The wait queue entry embedded in a VDOFlush
 *
 * @return The VDOFlush whose first member is the waiter
 **/
static VDOFlush *waiterAsFlush(Waiter *waiter)
{
  // The cast below is only valid while the waiter is the first member.
  STATIC_ASSERT(offsetof(VDOFlush, waiter) == 0);

  VDOFlush *request = (VDOFlush *) waiter;
  return request;
}
/**********************************************************************/
/**
 * Allocate a flusher, store it in vdo->flusher, and initialize it.
 *
 * @param vdo  The VDO which will own the flusher
 *
 * @return The result of the allocation if it did not succeed, otherwise the
 *         result of initializing the flusher's completion
 **/
int makeFlusher(VDO *vdo)
{
  int allocationResult = ALLOCATE(1, Flusher, __func__, &vdo->flusher);
  if (allocationResult == VDO_SUCCESS) {
    Flusher *created  = vdo->flusher;
    created->vdo      = vdo;
    // Flush requests are handled on the packer zone's thread.
    created->threadID = getPackerZoneThread(getThreadConfig(vdo));
    return initializeEnqueueableCompletion(&created->completion,
                                           FLUSH_NOTIFICATION_COMPLETION,
                                           vdo->layer);
  }

  return allocationResult;
}
/**********************************************************************/
/**
 * Destroy a flusher and null out the reference to it. Does nothing if the
 * reference is already NULL.
 *
 * @param flusherPtr  A pointer to the flusher reference to release
 **/
void freeFlusher(Flusher **flusherPtr)
{
  Flusher *doomed = *flusherPtr;
  if (doomed != NULL) {
    destroyEnqueueable(&doomed->completion);
    FREE(doomed);
    *flusherPtr = NULL;
  }
}
/**********************************************************************/
/**
 * Get the ID of the thread recorded in a flusher.
 *
 * @param flusher  The flusher to query
 *
 * @return The flusher's thread ID
 **/
ThreadID getFlusherThreadID(Flusher *flusher)
{
  ThreadID flusherThread = flusher->threadID;
  return flusherThread;
}
/**********************************************************************/
// Forward declaration: finishNotification() calls notifyFlush(), which is
// defined further down in this file.
static void notifyFlush(Flusher *flusher);
/**
 * Wrap up a notification: move the flush which was just notified from the
 * notifiers queue to the pendingFlushes queue, complete whatever flushes are
 * now finished, and begin notifying for the next queued flush request if
 * there is one. If the flush cannot be queued as pending, the VDO is put
 * into read-only mode and the flush is completed immediately. This callback
 * is registered in flushPackerCallback().
 *
 * @param completion  The flusher completion
 **/
static void finishNotification(VDOCompletion *completion)
{
  Flusher *flusher = asFlusher(completion);
  ASSERT_LOG_ONLY((getCallbackThreadID() == flusher->threadID),
                  "finishNotification() called from flusher thread");

  Waiter *notified = dequeueNextWaiter(&flusher->notifiers);
  int     result   = enqueueWaiter(&flusher->pendingFlushes, notified);
  if (result == VDO_SUCCESS) {
    completeFlushes(flusher);
    if (hasWaiters(&flusher->notifiers)) {
      // Another request arrived while this notification was in progress.
      notifyFlush(flusher);
    }
    return;
  }

  // The flush could not be tracked as pending.
  enterReadOnlyMode(flusher->vdo->readOnlyNotifier, result);
  VDOFlush *failedFlush = waiterAsFlush(notified);
  completion->layer->completeFlush(&failedFlush);
}
/**
 * Advance the packer's flush generation and then hand the completion to
 * finishNotification() on the flusher's thread. This callback is registered
 * in incrementGeneration(), which launches it once there are no more logical
 * zones to notify.
 *
 * @param completion  The flusher completion
 **/
static void flushPackerCallback(VDOCompletion *completion)
{
  Flusher *flusher = asFlusher(completion);
  VDO     *vdo     = flusher->vdo;

  incrementPackerFlushGeneration(vdo->packer);
  launchCallback(completion, finishNotification, flusher->threadID);
}
/**
 * Tell the current logical zone about the generation being notified, then
 * advance to the next logical zone. If there is another zone, this callback
 * is relaunched on that zone's thread; otherwise flushPackerCallback() is
 * launched on the flusher's thread. This callback is registered both in
 * notifyFlush() and in itself.
 *
 * @param completion  The flusher as a completion
 **/
static void incrementGeneration(VDOCompletion *completion)
{
  Flusher *flusher = asFlusher(completion);
  incrementFlushGeneration(flusher->logicalZoneToNotify,
                           flusher->notifyGeneration);

  LogicalZone *nextZone = getNextLogicalZone(flusher->logicalZoneToNotify);
  flusher->logicalZoneToNotify = nextZone;

  if (nextZone != NULL) {
    launchCallback(completion, incrementGeneration,
                   getLogicalZoneThreadID(nextZone));
  } else {
    // Every logical zone has been visited.
    launchCallback(completion, flushPackerCallback, flusher->threadID);
  }
}
/**
 * Launch a flush notification for the flush request at the head of the
 * notifiers queue, starting with logical zone 0.
 *
 * @param flusher  The flusher doing the notification
 **/
static void notifyFlush(Flusher *flusher)
{
  Waiter   *head    = getFirstWaiter(&flusher->notifiers);
  VDOFlush *request = waiterAsFlush(head);
  flusher->notifyGeneration = request->flushGeneration;

  LogicalZone *firstZone = getLogicalZone(flusher->vdo->logicalZones, 0);
  flusher->logicalZoneToNotify = firstZone;

  flusher->completion.requeue = true;
  launchCallback(&flusher->completion, incrementGeneration,
                 getLogicalZoneThreadID(firstZone));
}
/**********************************************************************/
/**
 * Assign the next flush generation to a flush request and queue it for
 * notification. A notification is launched only if no other request was
 * already queued for notification. If the request cannot be queued, the VDO
 * is put into read-only mode and the request is completed immediately.
 *
 * @param vdo    The VDO receiving the flush
 * @param flush  The flush request
 **/
void flush(VDO *vdo, VDOFlush *flush)
{
  Flusher *flusher = vdo->flusher;
  ASSERT_LOG_ONLY((getCallbackThreadID() == flusher->threadID),
                  "flush() called from flusher thread");

  flush->flushGeneration = flusher->flushGeneration;
  flusher->flushGeneration++;

  // Must be sampled before this request joins the queue.
  const bool alreadyNotifying = hasWaiters(&flusher->notifiers);

  int result = enqueueWaiter(&flusher->notifiers, &flush->waiter);
  if (result == VDO_SUCCESS) {
    if (!alreadyNotifying) {
      notifyFlush(flusher);
    }
    return;
  }

  enterReadOnlyMode(vdo->readOnlyNotifier, result);
  flusher->completion.layer->completeFlush(&flush);
}
/**********************************************************************/
/**
 * Complete, in queue order, every pending flush whose generation is older
 * than the oldest generation still reported as locked by any logical zone.
 * Stops at the first pending flush which is not yet older than that
 * generation.
 *
 * @param flusher  The flusher whose pending flushes should be checked
 **/
void completeFlushes(Flusher *flusher)
{
  ASSERT_LOG_ONLY((getCallbackThreadID() == flusher->threadID),
                  "completeFlushes() called from flusher thread");

  // Take the minimum over all logical zones; UINT64_MAX means no zone has
  // reported anything smaller.
  SequenceNumber oldestActiveGeneration = UINT64_MAX;
  for (LogicalZone *zone = getLogicalZone(flusher->vdo->logicalZones, 0);
       zone != NULL;
       zone = getNextLogicalZone(zone)) {
    SequenceNumber oldestInZone = getOldestLockedGeneration(zone);
    oldestActiveGeneration = minSequenceNumber(oldestActiveGeneration,
                                               oldestInZone);
  }

  while (hasWaiters(&flusher->pendingFlushes)) {
    VDOFlush *flush = waiterAsFlush(getFirstWaiter(&flusher->pendingFlushes));
    if (flush->flushGeneration >= oldestActiveGeneration) {
      // This flush, and everything queued behind it, must keep waiting.
      return;
    }

    // Both values are SequenceNumbers, so both use the same conversion.
    ASSERT_LOG_ONLY((flush->flushGeneration
                     == flusher->firstUnacknowledgedGeneration),
                    "acknowledged next expected flush, %" PRIu64
                    ", was: %" PRIu64,
                    flusher->firstUnacknowledgedGeneration,
                    flush->flushGeneration);
    dequeueNextWaiter(&flusher->pendingFlushes);
    flusher->completion.layer->completeFlush(&flush);
    flusher->firstUnacknowledgedGeneration++;
  }
}
/**********************************************************************/
/**
 * Log the flusher's generation counters and whether each of its two wait
 * queues is empty.
 *
 * @param flusher  The flusher to dump
 **/
void dumpFlusher(const Flusher *flusher)
{
  logInfo("Flusher");
  // Both counters are SequenceNumbers, so both use the same conversion.
  logInfo(" flushGeneration=%" PRIu64
          " firstUnacknowledgedGeneration=%" PRIu64,
          flusher->flushGeneration, flusher->firstUnacknowledgedGeneration);
  logInfo(" notifiers queue is %s; pendingFlushes queue is %s",
          (hasWaiters(&flusher->notifiers) ? "not empty" : "empty"),
          (hasWaiters(&flusher->pendingFlushes) ? "not empty" : "empty"));
}