/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/uds-releases/jasper/src/uds/chapterWriter.c#2 $
 */

#include "chapterWriter.h"

#include "errors.h"
#include "index.h"
#include "indexCheckpoint.h"
#include "indexComponent.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "openChapter.h"
#include "threads.h"


/**
 * State shared between the chapter writer thread (closeChapters()) and the
 * threads that hand it chapters to write. makeChapterWriter() allocates the
 * structure with index->zoneCount trailing entries for chapters[].
 **/
struct chapterWriter {
  /* The index to which we belong */
  Index            *index;
  /*
   * The thread to do the writing. stopChapterWriter() resets this to 0 when
   * it claims the thread for joining.
   */
  Thread            thread;
  /* lock protecting the following fields */
  Mutex             mutex;
  /* condition signalled on state changes */
  CondVar           cond;
  /* Set to true (by stopChapterWriter()) to stop the thread */
  bool              stop;
  /* The result from the most recent write, stored by closeChapters() */
  int               result;
  /*
   * The number of bytes allocated by the chapter writer; computed once in
   * makeChapterWriter()
   */
  size_t            memoryAllocated;
  /*
   * The number of zones which have submitted a chapter for writing;
   * incremented by startClosingChapter() and reset to 0 by closeChapters()
   * after each write
   */
  unsigned int      zonesToWrite;
  /* Open chapter index used by closeOpenChapter() */
  OpenChapterIndex *openChapterIndex;
  /* Collated records used by closeOpenChapter() */
  UdsChunkRecord   *collatedRecords;
  /* The chapters to write (one per zone), indexed by zone number */
  OpenChapterZone  *chapters[];
};

/**
 * This is the driver function for the writer thread. It loops until
 * terminated, waiting for a chapter to be provided to close.
 *
 * Each pass waits until every zone has submitted its open chapter (see
 * startClosingChapter()), writes the chapter with the mutex released, and
 * then records the result and wakes all waiters. The thread returns only
 * when writer->stop is set and no zone has a chapter pending.
 *
 * @param arg  The ChapterWriter this thread services
 **/
static void closeChapters(void *arg)
{
  ChapterWriter *writer = arg;
  logDebug("chapter writer starting");
  lockMutex(&writer->mutex);
  for (;;) {
    // Sleep until all zones have handed over their chapter. The predicate is
    // re-checked after every wakeup.
    while (writer->zonesToWrite < writer->index->zoneCount) {
      if (writer->stop && (writer->zonesToWrite == 0)) {
        // We've been told to stop, and all of the zones are in the same
        // open chapter, so we can exit now.
        unlockMutex(&writer->mutex);
        logDebug("chapter writer stopping");
        return;
      }
      waitCond(&writer->cond, &writer->mutex);
    }

    /*
     * Release the lock while closing a chapter. We probably don't need to do
     * this, but it seems safer in principle. It's OK to access the submitted
     * chapters without the lock since those aren't allowed to change until
     * we're done.
     */
    unlockMutex(&writer->mutex);

    if (writer->index->hasSavedOpenChapter) {
      writer->index->hasSavedOpenChapter = false;
      /*
       * Remove the saved open chapter as that chapter is about to be written
       * to the volume.  This matters the first time we close the open chapter
       * after loading from a clean shutdown, or after doing a clean save.
       */
      IndexComponent *oc = findIndexComponent(writer->index->state,
                                              &OPEN_CHAPTER_INFO);
      // A failed discard is neither logged nor propagated; this result is
      // local to the block and does not affect the write result below.
      int result = discardIndexComponent(oc);
      if (result == UDS_SUCCESS) {
        logDebug("Discarding saved open chapter");
      }
    }

    // Write the chapter assembled from all of the zones' submissions.
    int result = closeOpenChapter(writer->chapters,
                                  writer->index->zoneCount,
                                  writer->index->volume,
                                  writer->openChapterIndex,
                                  writer->collatedRecords,
                                  writer->index->newestVirtualChapter);

    // Checkpoint saves are only processed when the chapter write succeeded;
    // a failure here replaces the write result.
    if (result == UDS_SUCCESS) {
      result = processChapterWriterCheckpointSaves(writer->index);
    }


    // Publish the outcome under the lock and wake every thread waiting on
    // the condition (finishPreviousChapter(), waitForIdleChapterWriter()).
    lockMutex(&writer->mutex);
    // Note that the index is totally finished with the writing chapter
    advanceActiveChapters(writer->index);
    writer->result       = result;
    writer->zonesToWrite = 0;
    broadcastCond(&writer->cond);
  }
}

/**********************************************************************/
/**
 * Allocate and initialize a chapter writer for an index, and start its
 * writer thread.
 *
 * Failures before the mutex and condition variable both exist are cleaned up
 * by hand and returned as-is. Later failures tear down through
 * freeChapterWriter() and are returned via makeUnrecoverable().
 *
 * @param index         The index the writer will serve
 * @param indexVersion  Supplies the chapter index header endianness setting
 * @param writerPtr     Receives the new writer; only written on success
 *
 * @return UDS_SUCCESS or an error code
 **/
int makeChapterWriter(Index                       *index,
                      const struct index_version  *indexVersion,
                      ChapterWriter              **writerPtr)
{
  // Room for every record in a chapter, plus one extra record.
  size_t recordsBytes
    = (sizeof(UdsChunkRecord)
       * (1 + index->volume->geometry->recordsPerChapter));

  ChapterWriter *newWriter;
  int status = ALLOCATE_EXTENDED(ChapterWriter,
                                 index->zoneCount, OpenChapterZone *,
                                 "Chapter Writer", &newWriter);
  if (status != UDS_SUCCESS) {
    return status;
  }
  newWriter->index = index;

  status = initMutex(&newWriter->mutex);
  if (status == UDS_SUCCESS) {
    status = initCond(&newWriter->cond);
    if (status != UDS_SUCCESS) {
      destroyMutex(&newWriter->mutex);
    }
  }
  if (status != UDS_SUCCESS) {
    FREE(newWriter);
    return status;
  }

  // From here on the mutex and condition exist, so freeChapterWriter() can
  // be used for all cleanup.
  status = allocateCacheAligned(recordsBytes, "collated records",
                                &newWriter->collatedRecords);
  if (status != UDS_SUCCESS) {
    goto teardown;
  }

  status = makeOpenChapterIndex(&newWriter->openChapterIndex,
                                index->volume->geometry,
                                indexVersion->chapterIndexHeaderNativeEndian,
                                index->volume->nonce);
  if (status != UDS_SUCCESS) {
    goto teardown;
  }

  // Account for the structure, its per-zone chapter slots, the collated
  // records, and whatever the open chapter index reports.
  newWriter->memoryAllocated
    = (sizeof(ChapterWriter)
       + index->zoneCount * sizeof(OpenChapterZone *)
       + recordsBytes
       + getOpenChapterIndexMemoryAllocated(newWriter->openChapterIndex));

  // Everything the thread touches is ready; start it last.
  status = createThread(closeChapters, newWriter, "writer",
                        &newWriter->thread);
  if (status != UDS_SUCCESS) {
    goto teardown;
  }

  *writerPtr = newWriter;
  return UDS_SUCCESS;

teardown:
  freeChapterWriter(newWriter);
  return makeUnrecoverable(status);
}

/**********************************************************************/
/**
 * Stop the writer thread and release everything owned by a chapter writer.
 * A NULL writer is ignored.
 *
 * @param writer  The chapter writer to destroy
 **/
void freeChapterWriter(ChapterWriter *writer)
{
  if (writer != NULL) {
    // The stop result is deliberately dropped; there is no one to report
    // it to during teardown.
    int stopStatus __attribute__((unused)) = stopChapterWriter(writer);

    destroyMutex(&writer->mutex);
    destroyCond(&writer->cond);

    freeOpenChapterIndex(writer->openChapterIndex);
    FREE(writer->collatedRecords);
    FREE(writer);
  }
}

/**********************************************************************/
/**
 * Submit one zone's open chapter to the writer and wake the writer thread.
 *
 * @param writer      The chapter writer
 * @param zoneNumber  The zone submitting its chapter; selects the slot in
 *                    the writer's chapters array
 * @param chapter     The chapter to be written for that zone
 *
 * @return the number of zones that have submitted a chapter so far,
 *         including this one
 **/
unsigned int startClosingChapter(ChapterWriter   *writer,
                                 unsigned int     zoneNumber,
                                 OpenChapterZone *chapter)
{
  unsigned int submittedCount;

  lockMutex(&writer->mutex);
  writer->chapters[zoneNumber] = chapter;
  writer->zonesToWrite += 1;
  submittedCount = writer->zonesToWrite;
  broadcastCond(&writer->cond);
  unlockMutex(&writer->mutex);

  return submittedCount;
}

/**********************************************************************/
/**
 * Block until the index's newest virtual chapter has reached the given
 * chapter number, then report the most recent write result.
 *
 * @param writer                The chapter writer
 * @param currentChapterNumber  The chapter number to wait for
 *
 * @return UDS_SUCCESS, or the logged unrecoverable form of the write error
 **/
int finishPreviousChapter(ChapterWriter *writer, uint64_t currentChapterNumber)
{
  lockMutex(&writer->mutex);
  for (;;) {
    if (writer->index->newestVirtualChapter >= currentChapterNumber) {
      break;
    }
    waitCond(&writer->cond, &writer->mutex);
  }
  int writeStatus = writer->result;
  unlockMutex(&writer->mutex);

  return ((writeStatus == UDS_SUCCESS)
          ? UDS_SUCCESS
          : logUnrecoverable(writeStatus,
                             "Writing of previous open chapter failed"));
}

/**********************************************************************/
/**
 * Block until no zone has a chapter pending with the writer.
 *
 * @param writer  The chapter writer
 **/
void waitForIdleChapterWriter(ChapterWriter *writer)
{
  lockMutex(&writer->mutex);
  for (;;) {
    if (writer->zonesToWrite == 0) {
      break;
    }
    // Chapters are pending, so the writer is either writing now or will
    // wake up and write shortly; wait for it to signal.
    waitCond(&writer->cond, &writer->mutex);
  }
  unlockMutex(&writer->mutex);
}

/**********************************************************************/
/**
 * Ask the writer thread to stop, wait for it to exit, and report the result
 * of the most recent chapter write.
 *
 * The thread handle is claimed under the mutex, so only the first caller
 * joins the thread; later calls just report the stored result.
 *
 * The result is sampled after the join, not before. If the writer thread is
 * in the middle of a write when the stop is requested, it stores that write's
 * result before it exits, and sampling earlier would miss a failure of that
 * final write.
 *
 * @param writer  The chapter writer to stop
 *
 * @return UDS_SUCCESS, or the logged unrecoverable form of the write error
 **/
int stopChapterWriter(ChapterWriter *writer)
{
  Thread writerThread = 0;

  lockMutex(&writer->mutex);
  if (writer->thread != 0) {
    writerThread = writer->thread;
    writer->thread = 0;
    writer->stop = true;
    broadcastCond(&writer->cond);
  }
  unlockMutex(&writer->mutex);

  if (writerThread != 0) {
    joinThreads(writerThread);
  }

  // Sample the result only once the thread we claimed has exited, so the
  // outcome of a write that was still in flight is included.
  lockMutex(&writer->mutex);
  int result = writer->result;
  unlockMutex(&writer->mutex);

  if (result != UDS_SUCCESS) {
    return logUnrecoverable(result, "Writing of previous open chapter failed");
  }
  return UDS_SUCCESS;
}

/**********************************************************************/
/**
 * Report the byte count recorded for this chapter writer when it was made.
 *
 * @param writer  The chapter writer
 *
 * @return the stored number of bytes allocated
 **/
size_t getChapterWriterMemoryAllocated(ChapterWriter *writer)
{
  const size_t bytesAllocated = writer->memoryAllocated;
  return bytesAllocated;
}