/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 *
 * $Id: //eng/vdo-releases/aluminum/src/c++/vdo/base/flush.c#3 $
 */

#include "flush.h"

#include "logger.h"
#include "memoryAlloc.h"

#include "blockAllocator.h"
#include "completion.h"
#include "logicalZone.h"
#include "numUtils.h"
#include "readOnlyNotifier.h"
#include "slabDepot.h"
#include "vdoInternal.h"

struct flusher {
  VDOCompletion   completion;
  /** The VDO to which this flusher belongs */
  VDO            *vdo;
  /** The current flush generation of the VDO */
  SequenceNumber  flushGeneration;
  /** The first unacknowledged flush generation */
  SequenceNumber  firstUnacknowledgedGeneration;
  /** The queue of flush requests waiting to notify other threads */
  WaitQueue       notifiers;
  /** The queue of flush requests waiting for VIOs to complete */
  WaitQueue       pendingFlushes;
  /** The flush generation for which notifications are being sent */
  SequenceNumber  notifyGeneration;
  /** The logical zone to notify next */
  LogicalZone    *logicalZoneToNotify;
  /** The ID of the thread on which flush requests should be made */
  ThreadID        threadID;
};

/**
 * Convert a generic VDOCompletion to a Flusher.
 *
 * The cast is only valid because the completion is the first member of the
 * flusher (checked at compile time); the completion type is asserted as well.
 *
 * @param completion  The completion to convert
 *
 * @return The completion as a Flusher
 **/
static Flusher *asFlusher(VDOCompletion *completion)
{
  STATIC_ASSERT(offsetof(Flusher, completion) == 0);
  assertCompletionType(completion->type, FLUSH_NOTIFICATION_COMPLETION);
  return (Flusher *) completion;
}

/**
 * Convert a VDOFlush's generic wait queue entry back to the VDOFlush.
 *
 * The cast is only valid because the waiter is the first member of the
 * VDOFlush (checked at compile time).
 *
 * @param waiter  The wait queue entry to convert
 *
 * @return The wait queue entry as a VDOFlush
 **/
static VDOFlush *waiterAsFlush(Waiter *waiter)
{
  STATIC_ASSERT(offsetof(VDOFlush, waiter) == 0);
  return (VDOFlush *) waiter;
}

/**
 * Allocate a flusher, store it in vdo->flusher, and initialize its
 * completion. The flusher's thread is the packer zone thread of the VDO's
 * thread configuration.
 *
 * @param vdo  The VDO which will own the flusher
 *
 * @return VDO_SUCCESS, or the error from the allocation or from
 *         initializing the completion
 **/
int makeFlusher(VDO *vdo)
{
  int result = ALLOCATE(1, Flusher, __func__, &vdo->flusher);
  if (result != VDO_SUCCESS) {
    return result;
  }

  Flusher *flusher  = vdo->flusher;
  flusher->vdo      = vdo;
  flusher->threadID = getPackerZoneThread(getThreadConfig(vdo));

  // NOTE(review): if this fails, the flusher stays in vdo->flusher;
  // presumably the caller releases it with freeFlusher() -- confirm.
  return initializeEnqueueableCompletion(&flusher->completion,
                                         FLUSH_NOTIFICATION_COMPLETION,
                                         vdo->layer);
}

/**
 * Destroy a flusher and null out the reference to it. Does nothing if the
 * reference is already NULL.
 *
 * @param flusherPtr  A pointer to the flusher reference to release
 **/
void freeFlusher(Flusher **flusherPtr)
{
  Flusher *flusher = *flusherPtr;
  if (flusher == NULL) {
    return;
  }

  destroyEnqueueable(&flusher->completion);
  FREE(flusher);
  *flusherPtr = NULL;
}

/**
 * Get the ID of the thread on which flush requests should be made.
 *
 * @param flusher  The flusher to query
 *
 * @return The flusher's thread ID
 **/
ThreadID getFlusherThreadID(Flusher *flusher)
{
  return flusher->threadID;
}

/**********************************************************************/
static void notifyFlush(Flusher *flusher);

/**
 * Finish the notification process by checking if any flushes have completed
 * and then starting the notification of the next flush request if one came in
 * while the current notification was in progress. This callback is registered
 * in flushPackerCallback().
 *
 * The flush which was just notified is moved from the notifiers queue to the
 * pendingFlushes queue. If that move fails, the VDO is put into read-only
 * mode, the flush is completed immediately through the layer, and no further
 * notification is started from here.
 *
 * @param completion  The flusher completion
 **/
static void finishNotification(VDOCompletion *completion)
{
  Flusher *flusher = asFlusher(completion);
  ASSERT_LOG_ONLY((getCallbackThreadID() == flusher->threadID),
                  "finishNotification() called from flusher thread");

  Waiter *waiter = dequeueNextWaiter(&flusher->notifiers);
  int     result = enqueueWaiter(&flusher->pendingFlushes, waiter);
  if (result != VDO_SUCCESS) {
    enterReadOnlyMode(flusher->vdo->readOnlyNotifier, result);
    VDOFlush *flush = waiterAsFlush(waiter);
    completion->layer->completeFlush(&flush);
    return;
  }

  completeFlushes(flusher);
  if (hasWaiters(&flusher->notifiers)) {
    notifyFlush(flusher);
  }
}

/**
 * Flush the packer now that all of the logical zones have been notified of
 * the new flush request, then continue with finishNotification() on the
 * flusher thread. This callback is registered in incrementGeneration().
 *
 * @param completion  The flusher completion
 **/
static void flushPackerCallback(VDOCompletion *completion)
{
  Flusher *flusher = asFlusher(completion);
  incrementPackerFlushGeneration(flusher->vdo->packer);
  launchCallback(completion, finishNotification, flusher->threadID);
}

/**
 * Increment the flush generation in a logical zone. If there are more logical
 * zones, go on to the next one on that zone's thread; otherwise, go on to
 * flush the packer on the flusher thread. This callback is registered both in
 * notifyFlush() and in itself.
 *
 * @param completion  The flusher as a completion
 **/
static void incrementGeneration(VDOCompletion *completion)
{
  Flusher *flusher = asFlusher(completion);
  incrementFlushGeneration(flusher->logicalZoneToNotify,
                           flusher->notifyGeneration);

  flusher->logicalZoneToNotify
    = getNextLogicalZone(flusher->logicalZoneToNotify);
  if (flusher->logicalZoneToNotify == NULL) {
    // Every logical zone has been visited.
    launchCallback(completion, flushPackerCallback, flusher->threadID);
    return;
  }

  launchCallback(completion, incrementGeneration,
                 getLogicalZoneThreadID(flusher->logicalZoneToNotify));
}

/**
 * Launch a flush notification for the flush at the head of the notifiers
 * queue, starting with logical zone 0.
 *
 * @param flusher  The flusher doing the notification
 **/
static void notifyFlush(Flusher *flusher)
{
  VDOFlush *flush = waiterAsFlush(getFirstWaiter(&flusher->notifiers));
  flusher->notifyGeneration    = flush->flushGeneration;
  flusher->logicalZoneToNotify = getLogicalZone(flusher->vdo->logicalZones, 0);
  flusher->completion.requeue  = true;
  launchCallback(&flusher->completion, incrementGeneration,
                 getLogicalZoneThreadID(flusher->logicalZoneToNotify));
}

/**
 * Handle a new flush request: assign it the current flush generation
 * (advancing the generation), queue it on the notifiers queue, and start a
 * notification if the queue was empty beforehand.
 *
 * If the request cannot be queued, the VDO is put into read-only mode and the
 * flush is completed immediately through the layer.
 *
 * @param vdo    The VDO being flushed
 * @param flush  The flush request
 **/
void flush(VDO *vdo, VDOFlush *flush)
{
  Flusher *flusher = vdo->flusher;
  ASSERT_LOG_ONLY((getCallbackThreadID() == flusher->threadID),
                  "flush() called from flusher thread");

  flush->flushGeneration = flusher->flushGeneration++;

  // Sample this before enqueueing: a non-empty queue means a notification
  // is already queued, and finishNotification() will start the next one.
  bool mayNotify = !hasWaiters(&flusher->notifiers);

  int result = enqueueWaiter(&flusher->notifiers, &flush->waiter);
  if (result != VDO_SUCCESS) {
    enterReadOnlyMode(vdo->readOnlyNotifier, result);
    flusher->completion.layer->completeFlush(&flush);
    return;
  }

  if (mayNotify) {
    notifyFlush(flusher);
  }
}

/**
 * Complete, in order, every pending flush whose generation is older than the
 * oldest generation still locked in any logical zone.
 *
 * @param flusher  The flusher whose pending flushes should be checked
 **/
void completeFlushes(Flusher *flusher)
{
  ASSERT_LOG_ONLY((getCallbackThreadID() == flusher->threadID),
                  "completeFlushes() called from flusher thread");

  // Find the oldest generation still locked across all logical zones.
  SequenceNumber oldestActiveGeneration = UINT64_MAX;
  for (LogicalZone *zone = getLogicalZone(flusher->vdo->logicalZones, 0);
       zone != NULL;
       zone = getNextLogicalZone(zone)) {
    SequenceNumber oldestInZone = getOldestLockedGeneration(zone);
    oldestActiveGeneration = minSequenceNumber(oldestActiveGeneration,
                                               oldestInZone);
  }

  while (hasWaiters(&flusher->pendingFlushes)) {
    VDOFlush *flush = waiterAsFlush(getFirstWaiter(&flusher->pendingFlushes));
    if (flush->flushGeneration >= oldestActiveGeneration) {
      // The head of the queue is not yet eligible, so stop here.
      return;
    }

    // Both values are SequenceNumbers, so both use the same conversion.
    ASSERT_LOG_ONLY((flush->flushGeneration
                     == flusher->firstUnacknowledgedGeneration),
                    "acknowledged next expected flush, %" PRIu64
                    ", was: %" PRIu64,
                    flusher->firstUnacknowledgedGeneration,
                    flush->flushGeneration);
    dequeueNextWaiter(&flusher->pendingFlushes);
    flusher->completion.layer->completeFlush(&flush);
    flusher->firstUnacknowledgedGeneration++;
  }
}

/**
 * Log the flusher's generation counters and whether each of its two wait
 * queues is empty.
 *
 * @param flusher  The flusher to dump
 **/
void dumpFlusher(const Flusher *flusher)
{
  logInfo("Flusher");
  logInfo(" flushGeneration=%" PRIu64
          " firstUnacknowledgedGeneration=%" PRIu64,
          flusher->flushGeneration, flusher->firstUnacknowledgedGeneration);
  logInfo(" notifiers queue is %s; pendingFlushes queue is %s",
          (hasWaiters(&flusher->notifiers) ? "not empty" : "empty"),
          (hasWaiters(&flusher->pendingFlushes) ? "not empty" : "empty"));
}