Blame source/uds/util/eventCount.c

Packit Service 310c69
/*
Packit Service 310c69
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 310c69
 *
Packit Service 310c69
 * This program is free software; you can redistribute it and/or
Packit Service 310c69
 * modify it under the terms of the GNU General Public License
Packit Service 310c69
 * as published by the Free Software Foundation; either version 2
Packit Service 310c69
 * of the License, or (at your option) any later version.
Packit Service 310c69
 * 
Packit Service 310c69
 * This program is distributed in the hope that it will be useful,
Packit Service 310c69
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 310c69
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 310c69
 * GNU General Public License for more details.
Packit Service 310c69
 * 
Packit Service 310c69
 * You should have received a copy of the GNU General Public License
Packit Service 310c69
 * along with this program; if not, write to the Free Software
Packit Service 310c69
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 310c69
 * 02110-1301, USA. 
Packit Service 310c69
 *
Packit Service 310c69
 * $Id: //eng/uds-releases/jasper/src/uds/util/eventCount.c#2 $
Packit Service 310c69
 */
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * This EventCount implementation uses a posix semaphore for portability,
Packit Service 310c69
 * although a futex would be slightly superior to use and easy to substitute.
Packit Service 310c69
 * It is designed to make signalling as cheap as possible, since that is the
Packit Service 310c69
 * code path likely triggered on most updates to a lock-free data structure.
Packit Service 310c69
 * Waiters are likely going to sleep, so optimizing for that case isn't
Packit Service 310c69
 * necessary.
Packit Service 310c69
 *
Packit Service 310c69
 * The critical field is the state, which is really two fields that can be
Packit Service 310c69
 * atomically updated in unison: an event counter and a waiter count. Every
Packit Service 310c69
 * call to eventCountPrepare() issues a wait token by atomically incrementing
Packit Service 310c69
 * the waiter count. The key invariant is a strict accounting of the number of
Packit Service 310c69
 * tokens issued. Every token returned by eventCountPrepare() is a contract
Packit Service 310c69
 * that the caller will call acquireSemaphore() and a signaller will call
Packit Service 310c69
 * releaseSemaphore(), each exactly once. Atomic updates to the state field
Packit Service 310c69
 * ensure that each token is counted once and that tokens are not lost.
Packit Service 310c69
 * Cancelling a token attempts to take a fast-path by simply decrementing the
Packit Service 310c69
 * waiters field, but if the token has already been claimed by a signaller,
Packit Service 310c69
 * the canceller must still wait on the semaphore to consume the transferred
Packit Service 310c69
 * token.
Packit Service 310c69
 *
Packit Service 310c69
 * The state field is 64 bits, partitioned into a 16-bit waiter field and a
Packit Service 310c69
 * 48-bit counter. We are unlikely to have 2^16 threads, much less 2^16
Packit Service 310c69
 * threads waiting on any single event transition. 2^48 microseconds is
Packit Service 310c69
 * several years, so a token holder would have to wait that long for the
Packit Service 310c69
 * counter to wrap around, and then call eventCountWait() at the exact right
Packit Service 310c69
 * time to see the re-used counter, in order to lose a wakeup due to counter
Packit Service 310c69
 * wrap-around. Using a 32-bit state field would greatly increase that chance,
Packit Service 310c69
 * but if forced to do so, the implementation could likely tolerate it since
Packit Service 310c69
 * callers are supposed to hold tokens for miniscule periods of time.
Packit Service 310c69
 * Fortunately, x64 has 64-bit compare-and-swap, and the performance of
Packit Service 310c69
 * interlocked 64-bit operations appears to be about the same as for 32-bit
Packit Service 310c69
 * ones, so being paranoid and using 64 bits costs us nothing.
Packit Service 310c69
 *
Packit Service 310c69
 * Here are some sequences of calls and state transitions:
Packit Service 310c69
 *
Packit Service 310c69
 *    action                    postcondition
Packit Service 310c69
 *                        counter   waiters   semaphore
Packit Service 310c69
 *    initialized           0          0          0
Packit Service 310c69
 *    prepare               0          1          0
Packit Service 310c69
 *    wait (blocks)         0          1          0
Packit Service 310c69
 *    signal                1          0          1
Packit Service 310c69
 *    wait (unblocks)       1          0          0
Packit Service 310c69
 *
Packit Service 310c69
 *    signal (fast-path)    1          0          0
Packit Service 310c69
 *    signal (fast-path)    1          0          0
Packit Service 310c69
 *
Packit Service 310c69
 *    prepare A             1          1          0
Packit Service 310c69
 *    prepare B             1          2          0
Packit Service 310c69
 *    signal                2          0          2
Packit Service 310c69
 *    wait B (fast-path)    2          0          1
Packit Service 310c69
 *    wait A (fast-path)    2          0          0
Packit Service 310c69
 *
Packit Service 310c69
 *    prepare               2          1          0
Packit Service 310c69
 *    cancel (fast-path)    2          0          0
Packit Service 310c69
 *
Packit Service 310c69
 *    prepare               2          1          0
Packit Service 310c69
 *    signal                3          0          1
Packit Service 310c69
 *    cancel (must wait)    3          0          0
Packit Service 310c69
 *
Packit Service 310c69
 * The EventCount structure is aligned, sized, and allocated to cache line
Packit Service 310c69
 * boundaries to avoid any false sharing between the EventCount and other
Packit Service 310c69
 * shared state. The state field and semaphore should fit on a single cache
Packit Service 310c69
 * line. The instrumentation counters increase the size of the structure so it
Packit Service 310c69
 * rounds up to use two (64-byte x86) cache lines.
Packit Service 310c69
 *
Packit Service 310c69
 * XXX Need interface to access or display instrumentation counters.
Packit Service 310c69
 **/
Packit Service 310c69
Packit Service 310c69
#include "eventCount.h"
Packit Service 310c69
Packit Service 310c69
#include "atomicDefs.h"
Packit Service 310c69
#include "common.h"
Packit Service 310c69
#include "compiler.h"
Packit Service 310c69
#include "cpu.h"
Packit Service 310c69
#include "logger.h"
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
#include "threads.h"
Packit Service 310c69
Packit Service 310c69
enum {
Packit Service 310c69
  ONE_WAITER   = 1,               // value used to increment the waiters field
Packit Service 310c69
  ONE_EVENT    = (1 << 16),       // value used to increment the event counter
Packit Service 310c69
  WAITERS_MASK = (ONE_EVENT - 1), // bit mask to access the waiters field
Packit Service 310c69
  EVENTS_MASK  = ~WAITERS_MASK,   // bit mask to access the event counter
Packit Service 310c69
};
Packit Service 310c69
Packit Service 310c69
struct eventCount {
Packit Service 310c69
  // Atomically mutable state:
Packit Service 310c69
  // low  16 bits: the number of wait tokens not posted to the semaphore
Packit Service 310c69
  // high 48 bits: current event counter
Packit Service 310c69
  atomic64_t state;
Packit Service 310c69
Packit Service 310c69
  // Semaphore used to block threads when waiting is required.
Packit Service 310c69
  Semaphore semaphore;
Packit Service 310c69
Packit Service 310c69
  // Instrumentation counters.
Packit Service 310c69
Packit Service 310c69
  // Declare alignment so we don't share a cache line.
Packit Service 310c69
} __attribute__((aligned(CACHE_LINE_BYTES)));
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Test the event field in two tokens for equality.
Packit Service 310c69
 *
Packit Service 310c69
 * @return  true iff the tokens contain the same event field value
Packit Service 310c69
 **/
Packit Service 310c69
static INLINE bool sameEvent(EventToken token1, EventToken token2)
Packit Service 310c69
{
Packit Service 310c69
  return ((token1 & EVENTS_MASK) == (token2 & EVENTS_MASK));
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void eventCountBroadcast(EventCount *ec)
Packit Service 310c69
{
Packit Service 310c69
Packit Service 310c69
  // Even if there are no waiters (yet), we will need a memory barrier.
Packit Service 310c69
  smp_mb();
Packit Service 310c69
Packit Service 310c69
  uint64_t waiters;
Packit Service 310c69
  uint64_t state = atomic64_read(&ec->state);
Packit Service 310c69
  uint64_t oldState = state;
Packit Service 310c69
  do {
Packit Service 310c69
    // Check if there are any tokens that have not yet been been transferred
Packit Service 310c69
    // to the semaphore. This is the fast no-waiters path.
Packit Service 310c69
    waiters = (state & WAITERS_MASK);
Packit Service 310c69
    if (waiters == 0) {
Packit Service 310c69
      // Fast path first time through--no need to signal or post if there are
Packit Service 310c69
      // no observers.
Packit Service 310c69
      return;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    /*
Packit Service 310c69
     * Attempt to atomically claim all the wait tokens and bump the event count
Packit Service 310c69
     * using an atomic compare-and-swap.  This operation contains a memory
Packit Service 310c69
     * barrier.
Packit Service 310c69
     */
Packit Service 310c69
    EventToken newState = ((state & ~WAITERS_MASK) + ONE_EVENT);
Packit Service 310c69
    oldState = state;
Packit Service 310c69
    state = atomic64_cmpxchg(&ec->state, oldState, newState);
Packit Service 310c69
    // The cmpxchg fails when we lose a race with a new waiter or another
Packit Service 310c69
    // signaller, so try again.
Packit Service 310c69
  } while (unlikely(state != oldState));
Packit Service 310c69
Packit Service 310c69
Packit Service 310c69
  /*
Packit Service 310c69
   * Wake the waiters by posting to the semaphore. This effectively transfers
Packit Service 310c69
   * the wait tokens to the semaphore. There's sadly no bulk post for posix
Packit Service 310c69
   * semaphores, so we've got to loop to do them all.
Packit Service 310c69
   */
Packit Service 310c69
  while (waiters-- > 0) {
Packit Service 310c69
    releaseSemaphore(&ec->semaphore);
Packit Service 310c69
  }
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Attempt to cancel a prepared wait token by decrementing the
Packit Service 310c69
 * number of waiters in the current state. This can only be done
Packit Service 310c69
 * safely if the event count hasn't been bumped.
Packit Service 310c69
 *
Packit Service 310c69
 * @param ec     the event count on which the wait token was issued
Packit Service 310c69
 * @param token  the wait to cancel
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if the wait was cancelled, false if the caller must
Packit Service 310c69
 *         still wait on the semaphore
Packit Service 310c69
 **/
Packit Service 310c69
static INLINE bool fastCancel(EventCount *ec, EventToken token)
Packit Service 310c69
{
Packit Service 310c69
  EventToken currentToken = atomic64_read(&ec->state);
Packit Service 310c69
  while (sameEvent(currentToken, token)) {
Packit Service 310c69
    // Try to decrement the waiter count via compare-and-swap as if we had
Packit Service 310c69
    // never prepared to wait.
Packit Service 310c69
    EventToken et = atomic64_cmpxchg(&ec->state, currentToken,
Packit Service 310c69
                                     currentToken - 1);
Packit Service 310c69
    if (et == currentToken) {
Packit Service 310c69
      return true;
Packit Service 310c69
    }
Packit Service 310c69
    currentToken = et;
Packit Service 310c69
  }
Packit Service 310c69
  return false;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**
Packit Service 310c69
 * Consume a token from the semaphore, waiting (with an optional timeout) if
Packit Service 310c69
 * one is not currently available. Also attempts to count the number of times
Packit Service 310c69
 * we'll actually have to wait because there are no tokens (permits) available
Packit Service 310c69
 * in the semaphore, and the number of times the wait times out.
Packit Service 310c69
 *
Packit Service 310c69
 * @param ec       the event count instance
Packit Service 310c69
 * @param timeout  an optional timeout value to pass to attemptSemaphore()
Packit Service 310c69
 *
Packit Service 310c69
 * @return true if a token was consumed, otherwise false only if a timeout
Packit Service 310c69
 *         was specified and we timed out
Packit Service 310c69
 **/
Packit Service 310c69
static bool consumeWaitToken(EventCount *ec, const RelTime *timeout)
Packit Service 310c69
{
Packit Service 310c69
  // Try to grab a token without waiting.
Packit Service 310c69
  if (attemptSemaphore(&ec->semaphore, 0)) {
Packit Service 310c69
    return true;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
Packit Service 310c69
  if (timeout == NULL) {
Packit Service 310c69
    acquireSemaphore(&ec->semaphore);
Packit Service 310c69
  } else if (!attemptSemaphore(&ec->semaphore, *timeout)) {
Packit Service 310c69
    return false;
Packit Service 310c69
  }
Packit Service 310c69
  return true;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makeEventCount(EventCount **ecPtr)
Packit Service 310c69
{
Packit Service 310c69
  // The event count will be allocated on a cache line boundary so there will
Packit Service 310c69
  // not be false sharing of the line with any other data structure.
Packit Service 310c69
  EventCount *ec = NULL;
Packit Service 310c69
  int result = ALLOCATE(1, EventCount, "event count", &ec);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  atomic64_set(&ec->state, 0);
Packit Service 310c69
  result = initializeSemaphore(&ec->semaphore, 0);
Packit Service 310c69
  if (result != UDS_SUCCESS) {
Packit Service 310c69
    FREE(ec);
Packit Service 310c69
    return result;
Packit Service 310c69
  }
Packit Service 310c69
Packit Service 310c69
  *ecPtr = ec;
Packit Service 310c69
  return UDS_SUCCESS;
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void freeEventCount(EventCount *ec)
Packit Service 310c69
{
Packit Service 310c69
  if (ec == NULL) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
  destroySemaphore(&ec->semaphore);
Packit Service 310c69
  FREE(ec);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
EventToken eventCountPrepare(EventCount *ec)
Packit Service 310c69
{
Packit Service 310c69
  return atomic64_add_return(ONE_WAITER, &ec->state);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void eventCountCancel(EventCount *ec, EventToken token)
Packit Service 310c69
{
Packit Service 310c69
  // Decrement the waiter count if the event hasn't been signalled.
Packit Service 310c69
  if (fastCancel(ec, token)) {
Packit Service 310c69
    return;
Packit Service 310c69
  }
Packit Service 310c69
  // A signaller has already transferred (or promised to transfer) our token
Packit Service 310c69
  // to the semaphore, so we must consume it from the semaphore by waiting.
Packit Service 310c69
  eventCountWait(ec, token, NULL);
Packit Service 310c69
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool eventCountWait(EventCount *ec, EventToken token, const RelTime *timeout)
Packit Service 310c69
{
Packit Service 310c69
Packit Service 310c69
  for (;;) {
Packit Service 310c69
    // Wait for a signaller to transfer our wait token to the semaphore.
Packit Service 310c69
    if (!consumeWaitToken(ec, timeout)) {
Packit Service 310c69
      // The wait timed out, so we must cancel the token instead. Try to
Packit Service 310c69
      // decrement the waiter count if the event hasn't been signalled.
Packit Service 310c69
      if (fastCancel(ec, token)) {
Packit Service 310c69
        return false;
Packit Service 310c69
      }
Packit Service 310c69
      /*
Packit Service 310c69
       * We timed out, but a signaller came in before we could cancel the
Packit Service 310c69
       * wait. We have no choice but to wait for the semaphore to be posted.
Packit Service 310c69
       * Since signaller has promised to do it, the wait will be short. The
Packit Service 310c69
       * timeout and the signal happened at about the same time, so either
Packit Service 310c69
       * outcome could be returned. It's simpler to ignore the timeout.
Packit Service 310c69
       */
Packit Service 310c69
      timeout = NULL;
Packit Service 310c69
      continue;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // A wait token has now been consumed from the semaphore.
Packit Service 310c69
Packit Service 310c69
    // Stop waiting if the count has changed since the token was acquired.
Packit Service 310c69
    if (!sameEvent(token, atomic64_read(&ec->state))) {
Packit Service 310c69
      return true;
Packit Service 310c69
    }
Packit Service 310c69
Packit Service 310c69
    // We consumed someone else's wait token. Put it back in the semaphore,
Packit Service 310c69
    // which will wake another waiter, hopefully one who can stop waiting.
Packit Service 310c69
    releaseSemaphore(&ec->semaphore);
Packit Service 310c69
Packit Service 310c69
    // Attempt to give an earlier waiter a shot at the semaphore.
Packit Service 310c69
    yieldScheduler();
Packit Service 310c69
  }
Packit Service 310c69
}