/*
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA. 
 *
 * $Id: //eng/uds-releases/jasper/src/uds/util/eventCount.c#2 $
 */

/**
 * This EventCount implementation uses a POSIX semaphore for portability,
 * although a futex would be slightly more efficient and is easy to substitute.
 * It is designed to make signalling as cheap as possible, since that is the
 * code path likely triggered on most updates to a lock-free data structure.
 * Waiters are likely going to sleep, so optimizing for that case isn't
 * necessary.
 *
 * The critical field is the state, which is really two fields that can be
 * atomically updated in unison: an event counter and a waiter count. Every
 * call to eventCountPrepare() issues a wait token by atomically incrementing
 * the waiter count. The key invariant is a strict accounting of the number of
 * tokens issued. Every token returned by eventCountPrepare() is a contract
 * that the caller will call acquireSemaphore() and a signaller will call
 * releaseSemaphore(), each exactly once. Atomic updates to the state field
 * ensure that each token is counted once and that tokens are not lost.
 * Cancelling a token attempts to take a fast-path by simply decrementing the
 * waiters field, but if the token has already been claimed by a signaller,
 * the canceller must still wait on the semaphore to consume the transferred
 * token.
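 *
 * A typical waiter therefore brackets its blocking with eventCountPrepare()
 * and either eventCountWait() or eventCountCancel(), re-checking its
 * condition in between. This sketch assumes a hypothetical queue and
 * queueIsEmpty() predicate:
 *
 *   while (queueIsEmpty(queue)) {
 *     EventToken token = eventCountPrepare(ec);
 *     if (!queueIsEmpty(queue)) {
 *       // The condition changed before we blocked, so return the token.
 *       eventCountCancel(ec, token);
 *       break;
 *     }
 *     eventCountWait(ec, token, NULL);
 *   }
 *
 * The signalling side just publishes its change (an enqueue here) and then
 * calls eventCountBroadcast(ec).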
 *
 * The state field is 64 bits, partitioned into a 16-bit waiter field and a
 * 48-bit counter. We are unlikely to have 2^16 threads, much less 2^16
 * threads waiting on any single event transition. 2^48 microseconds is
 * about 8.9 years, so even with one event per microsecond, a token holder
 * would have to wait that long for the counter to wrap around, and then
 * call eventCountWait() at exactly the right time to see the re-used
 * counter, in order to lose a wakeup due to counter wrap-around. Using a
 * 32-bit state field would greatly increase that chance, but if forced to
 * do so, the implementation could likely tolerate it since callers are
 * supposed to hold tokens for minuscule periods of time. Fortunately, x64
 * has a 64-bit compare-and-swap, and the performance of interlocked 64-bit
 * operations appears to be about the same as for 32-bit ones, so being
 * paranoid and using 64 bits costs us nothing.
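 *
 * As a sketch of the field arithmetic (using the constants defined below),
 * a broadcast claims every outstanding wait token and bumps the event
 * counter in a single atomic update:
 *
 *   uint64_t waiters  = (state & WAITERS_MASK);  // extract the low 16 bits
 *   uint64_t newState = (state & EVENTS_MASK) + ONE_EVENT;  // zero waiters,
 *                                                           // bump counter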
 *
 * Here are some sequences of calls and state transitions:
 *
 *    action                    postcondition
 *                        counter   waiters   semaphore
 *    initialized           0          0          0
 *    prepare               0          1          0
 *    wait (blocks)         0          1          0
 *    signal                1          0          1
 *    wait (unblocks)       1          0          0
 *
 *    signal (fast-path)    1          0          0
 *    signal (fast-path)    1          0          0
 *
 *    prepare A             1          1          0
 *    prepare B             1          2          0
 *    signal                2          0          2
 *    wait B (fast-path)    2          0          1
 *    wait A (fast-path)    2          0          0
 *
 *    prepare               2          1          0
 *    cancel (fast-path)    2          0          0
 *
 *    prepare               2          1          0
 *    signal                3          0          1
 *    cancel (must wait)    3          0          0
 *
 * The EventCount structure is aligned, sized, and allocated to cache line
 * boundaries to avoid any false sharing between the EventCount and other
 * shared state. The state field and semaphore should fit on a single cache
 * line. If instrumentation counters are added (see the XXX note below),
 * they will increase the size of the structure so it rounds up to use two
 * (64-byte x86) cache lines.
 *
 * XXX Need interface to access or display instrumentation counters.
 **/

#include "eventCount.h"

#include "atomicDefs.h"
#include "common.h"
#include "compiler.h"
#include "cpu.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "threads.h"

enum {
  ONE_WAITER   = 1,               // value used to increment the waiters field
  ONE_EVENT    = (1 << 16),       // value used to increment the event counter
  WAITERS_MASK = (ONE_EVENT - 1), // bit mask to access the waiters field
  EVENTS_MASK  = ~WAITERS_MASK,   // bit mask to access the event counter
};

struct eventCount {
  // Atomically mutable state:
  // low  16 bits: the number of wait tokens not posted to the semaphore
  // high 48 bits: current event counter
  atomic64_t state;

  // Semaphore used to block threads when waiting is required.
  Semaphore semaphore;

  // Instrumentation counters would go here (see the XXX note above).

  // Declare alignment so we don't share a cache line.
} __attribute__((aligned(CACHE_LINE_BYTES)));

/**
 * Test the event fields of two tokens for equality.
 *
 * @param token1  the first token
 * @param token2  the second token
 *
 * @return true iff the tokens contain the same event field value
 **/
static INLINE bool sameEvent(EventToken token1, EventToken token2)
{
  return ((token1 & EVENTS_MASK) == (token2 & EVENTS_MASK));
}

/**********************************************************************/
void eventCountBroadcast(EventCount *ec)
{
  // Even if there are no waiters (yet), we will need a memory barrier.
  smp_mb();

  uint64_t waiters;
  uint64_t state = atomic64_read(&ec->state);
  uint64_t oldState = state;
  do {
    // Check if there are any tokens that have not yet been transferred
    // to the semaphore. This is the fast no-waiters path.
    waiters = (state & WAITERS_MASK);
    if (waiters == 0) {
      // Fast path first time through--no need to signal or post if there are
      // no observers.
      return;
    }

    /*
     * Attempt to atomically claim all the wait tokens and bump the event count
     * using an atomic compare-and-swap.  This operation contains a memory
     * barrier.
     */
    EventToken newState = ((state & ~WAITERS_MASK) + ONE_EVENT);
    oldState = state;
    state = atomic64_cmpxchg(&ec->state, oldState, newState);
    // The cmpxchg fails when we lose a race with a new waiter or another
    // signaller, so try again.
  } while (unlikely(state != oldState));

  /*
   * Wake the waiters by posting to the semaphore. This effectively transfers
   * the wait tokens to the semaphore. There's sadly no bulk post for posix
   * semaphores, so we've got to loop to do them all.
   */
  while (waiters-- > 0) {
    releaseSemaphore(&ec->semaphore);
  }
}

/**
 * Attempt to cancel a prepared wait token by decrementing the
 * number of waiters in the current state. This can only be done
 * safely if the event count hasn't been bumped.
 *
 * @param ec     the event count on which the wait token was issued
 * @param token  the wait token to cancel
 *
 * @return true if the wait was cancelled, false if the caller must
 *         still wait on the semaphore
 **/
static INLINE bool fastCancel(EventCount *ec, EventToken token)
{
  EventToken currentToken = atomic64_read(&ec->state);
  while (sameEvent(currentToken, token)) {
    // Try to decrement the waiter count via compare-and-swap as if we had
    // never prepared to wait.
    EventToken et = atomic64_cmpxchg(&ec->state, currentToken,
                                     currentToken - ONE_WAITER);
    if (et == currentToken) {
      return true;
    }
    currentToken = et;
  }
  return false;
}

/**
 * Consume a token from the semaphore, waiting (with an optional timeout) if
 * one is not currently available. This is also the natural place to count
 * how often we actually have to wait because no tokens (permits) are
 * available in the semaphore, and how often the wait times out, should
 * instrumentation counters be added.
 *
 * @param ec       the event count instance
 * @param timeout  an optional timeout value to pass to attemptSemaphore()
 *
 * @return true if a token was consumed; false only if a timeout was
 *         specified and the wait timed out
 **/
static bool consumeWaitToken(EventCount *ec, const RelTime *timeout)
{
  // Try to grab a token without waiting.
  if (attemptSemaphore(&ec->semaphore, 0)) {
    return true;
  }

  // No token was free, so wait for one, limited by the timeout if one
  // was supplied.
  if (timeout == NULL) {
    acquireSemaphore(&ec->semaphore);
  } else if (!attemptSemaphore(&ec->semaphore, *timeout)) {
    return false;
  }
  return true;
}

/**********************************************************************/
int makeEventCount(EventCount **ecPtr)
{
  // The event count will be allocated on a cache line boundary so there will
  // not be false sharing of the line with any other data structure.
  EventCount *ec = NULL;
  int result = ALLOCATE(1, EventCount, "event count", &ec);
  if (result != UDS_SUCCESS) {
    return result;
  }

  atomic64_set(&ec->state, 0);
  result = initializeSemaphore(&ec->semaphore, 0);
  if (result != UDS_SUCCESS) {
    FREE(ec);
    return result;
  }

  *ecPtr = ec;
  return UDS_SUCCESS;
}

/**********************************************************************/
void freeEventCount(EventCount *ec)
{
  if (ec == NULL) {
    return;
  }
  destroySemaphore(&ec->semaphore);
  FREE(ec);
}

/**********************************************************************/
EventToken eventCountPrepare(EventCount *ec)
{
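  // Atomically add a waiter and return the resulting state as the wait
  // token; only the event counter field of the token is examined later.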
  return atomic64_add_return(ONE_WAITER, &ec->state);
}

/**********************************************************************/
void eventCountCancel(EventCount *ec, EventToken token)
{
  // Decrement the waiter count if the event hasn't been signalled.
  if (fastCancel(ec, token)) {
    return;
  }
  // A signaller has already transferred (or promised to transfer) our token
  // to the semaphore, so we must consume it from the semaphore by waiting.
  eventCountWait(ec, token, NULL);
}

/**********************************************************************/
bool eventCountWait(EventCount *ec, EventToken token, const RelTime *timeout)
{
  for (;;) {
    // Wait for a signaller to transfer our wait token to the semaphore.
    if (!consumeWaitToken(ec, timeout)) {
      // The wait timed out, so we must cancel the token instead. Try to
      // decrement the waiter count if the event hasn't been signalled.
      if (fastCancel(ec, token)) {
        return false;
      }
      /*
       * We timed out, but a signaller came in before we could cancel the
       * wait. We have no choice but to wait for the semaphore to be posted.
       * Since the signaller has promised to do it, the wait will be short. The
       * timeout and the signal happened at about the same time, so either
       * outcome could be returned. It's simpler to ignore the timeout.
       */
      timeout = NULL;
      continue;
    }

    // A wait token has now been consumed from the semaphore.

    // Stop waiting if the count has changed since the token was acquired.
    if (!sameEvent(token, atomic64_read(&ec->state))) {
      return true;
    }

    // We consumed someone else's wait token. Put it back in the semaphore,
    // which will wake another waiter, hopefully one who can stop waiting.
    releaseSemaphore(&ec->semaphore);

    // Attempt to give an earlier waiter a shot at the semaphore.
    yieldScheduler();
  }
}