/*
* Copyright (c) 2020 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*
* $Id: //eng/uds-releases/jasper/src/uds/util/eventCount.c#2 $
*/
/**
* This EventCount implementation uses a posix semaphore for portability,
* although a futex would be slightly superior to use and easy to substitute.
* It is designed to make signalling as cheap as possible, since that is the
* code path likely triggered on most updates to a lock-free data structure.
* Waiters are likely going to sleep, so optimizing for that case isn't
* necessary.
*
* The critical field is the state, which is really two fields that can be
* atomically updated in unison: an event counter and a waiter count. Every
* call to eventCountPrepare() issues a wait token by atomically incrementing
* the waiter count. The key invariant is a strict accounting of the number of
* tokens issued. Every token returned by eventCountPrepare() is a contract
* that the caller will call acquireSemaphore() and a signaller will call
* releaseSemaphore(), each exactly once. Atomic updates to the state field
* ensure that each token is counted once and that tokens are not lost.
* Cancelling a token attempts to take a fast-path by simply decrementing the
* waiters field, but if the token has already been claimed by a signaller,
* the canceller must still wait on the semaphore to consume the transferred
* token.
*
* The state field is 64 bits, partitioned into a 16-bit waiter field and a
* 48-bit counter. We are unlikely to have 2^16 threads, much less 2^16
* threads waiting on any single event transition. 2^48 microseconds is
* several years, so a token holder would have to wait that long for the
* counter to wrap around, and then call eventCountWait() at the exact right
* time to see the re-used counter, in order to lose a wakeup due to counter
* wrap-around. Using a 32-bit state field would greatly increase that chance,
* but if forced to do so, the implementation could likely tolerate it since
* callers are supposed to hold tokens for miniscule periods of time.
* Fortunately, x64 has 64-bit compare-and-swap, and the performance of
* interlocked 64-bit operations appears to be about the same as for 32-bit
* ones, so being paranoid and using 64 bits costs us nothing.
*
* Here are some sequences of calls and state transitions:
*
* action postcondition
* counter waiters semaphore
* initialized 0 0 0
* prepare 0 1 0
* wait (blocks) 0 1 0
* signal 1 0 1
* wait (unblocks) 1 0 0
*
* signal (fast-path) 1 0 0
* signal (fast-path) 1 0 0
*
* prepare A 1 1 0
* prepare B 1 2 0
* signal 2 0 2
* wait B (fast-path) 2 0 1
* wait A (fast-path) 2 0 0
*
* prepare 2 1 0
* cancel (fast-path) 2 0 0
*
* prepare 2 1 0
* signal 3 0 1
* cancel (must wait) 3 0 0
*
* The EventCount structure is aligned, sized, and allocated to cache line
* boundaries to avoid any false sharing between the EventCount and other
* shared state. The state field and semaphore should fit on a single cache
* line. The instrumentation counters increase the size of the structure so it
* rounds up to use two (64-byte x86) cache lines.
*
* XXX Need interface to access or display instrumentation counters.
**/
#include "eventCount.h"
#include "atomicDefs.h"
#include "common.h"
#include "compiler.h"
#include "cpu.h"
#include "logger.h"
#include "memoryAlloc.h"
#include "threads.h"
// Layout of the 64-bit state field: the low 16 bits count outstanding wait
// tokens (waiters) and the high 48 bits hold the event counter.
enum {
  ONE_WAITER   = 1,               // value used to increment the waiters field
  ONE_EVENT    = (1 << 16),       // value used to increment the event counter
  WAITERS_MASK = (ONE_EVENT - 1), // bit mask to access the waiters field
  // As an int constant this is ~0xFFFF (negative); when combined with a
  // 64-bit token it is sign-extended to 0xFFFFFFFFFFFF0000, which covers the
  // full 48-bit event counter as intended.
  EVENTS_MASK  = ~WAITERS_MASK,   // bit mask to access the event counter
};
struct eventCount {
  // Atomically mutable state:
  //   low  16 bits: the number of wait tokens not posted to the semaphore
  //   high 48 bits: current event counter
  atomic64_t state;

  // Semaphore used to block threads when waiting is required; wait tokens
  // claimed by a signaller are transferred to it as posts.
  Semaphore semaphore;

  // NOTE(review): the file header comment mentions instrumentation counters,
  // but none are declared in this version of the structure -- confirm whether
  // the header text is stale.

  // Declare alignment so we don't share a cache line.
} __attribute__((aligned(CACHE_LINE_BYTES)));
/**
 * Test the event counter fields of two tokens for equality, ignoring the
 * waiter bits.
 *
 * @return true iff the tokens contain the same event field value
 **/
static INLINE bool sameEvent(EventToken token1, EventToken token2)
{
  // XOR cancels every bit the tokens share, so the event fields are equal
  // exactly when no bit in the counter portion of the result is set.
  return (((token1 ^ token2) & EVENTS_MASK) == 0);
}
/**********************************************************************/
void eventCountBroadcast(EventCount *ec)
{
  // Even if there are no waiters (yet), we will need a memory barrier so the
  // data-structure update being signalled is visible before any waiter wakes.
  smp_mb();

  uint64_t waiters;
  uint64_t state = atomic64_read(&ec->state);
  uint64_t oldState = state;
  do {
    // Check if there are any tokens that have not yet been transferred
    // to the semaphore. This is the fast no-waiters path.
    waiters = (state & WAITERS_MASK);
    if (waiters == 0) {
      // Fast path first time through--no need to signal or post if there are
      // no observers.
      return;
    }

    /*
     * Attempt to atomically claim all the wait tokens and bump the event
     * count using an atomic compare-and-swap. This operation contains a
     * memory barrier.
     */
    EventToken newState = ((state & ~WAITERS_MASK) + ONE_EVENT);
    oldState = state;
    state = atomic64_cmpxchg(&ec->state, oldState, newState);
    // The cmpxchg fails when we lose a race with a new waiter or another
    // signaller, so try again with the freshly-observed state.
  } while (unlikely(state != oldState));

  /*
   * Wake the waiters by posting to the semaphore. This effectively transfers
   * the wait tokens to the semaphore. There's sadly no bulk post for posix
   * semaphores, so we've got to loop to do them all.
   */
  while (waiters-- > 0) {
    releaseSemaphore(&ec->semaphore);
  }
}
/**
 * Attempt to cancel a prepared wait token by decrementing the number of
 * waiters in the current state. This can only be done safely while the
 * event counter still matches the one captured in the token.
 *
 * @param ec    the event count on which the wait token was issued
 * @param token the wait token to cancel
 *
 * @return true if the wait was cancelled, false if the caller must
 *         still wait on the semaphore
 **/
static INLINE bool fastCancel(EventCount *ec, EventToken token)
{
  EventToken observed = atomic64_read(&ec->state);
  for (;;) {
    if (!sameEvent(observed, token)) {
      // The event counter was bumped, so a signaller has already claimed
      // this token; the caller must consume it from the semaphore.
      return false;
    }
    // Try to decrement the waiter count via compare-and-swap as if we had
    // never prepared to wait.
    EventToken previous = atomic64_cmpxchg(&ec->state, observed,
                                           observed - 1);
    if (previous == observed) {
      return true;
    }
    // Lost a race with another state change; retry with the new value.
    observed = previous;
  }
}
/**
 * Consume a token from the semaphore, waiting (with an optional timeout) if
 * one is not currently available.
 *
 * @param ec      the event count instance
 * @param timeout an optional timeout value to pass to attemptSemaphore()
 *
 * @return true if a token was consumed, otherwise false only if a timeout
 *         was specified and we timed out
 **/
static bool consumeWaitToken(EventCount *ec, const RelTime *timeout)
{
  // Poll first so an already-posted token is taken without blocking.
  if (attemptSemaphore(&ec->semaphore, 0)) {
    return true;
  }

  if (timeout != NULL) {
    // Bounded wait: report whether a token arrived before the deadline.
    return attemptSemaphore(&ec->semaphore, *timeout);
  }

  // Unbounded wait: block until a signaller posts our token.
  acquireSemaphore(&ec->semaphore);
  return true;
}
/**********************************************************************/
int makeEventCount(EventCount **ecPtr)
{
  // The event count is allocated on a cache line boundary so it will not
  // falsely share its line with any other data structure.
  EventCount *count = NULL;
  int result = ALLOCATE(1, EventCount, "event count", &count);
  if (result != UDS_SUCCESS) {
    return result;
  }

  // Start with zero waiters, a zero event counter, and an empty semaphore.
  atomic64_set(&count->state, 0);
  result = initializeSemaphore(&count->semaphore, 0);
  if (result != UDS_SUCCESS) {
    FREE(count);
    return result;
  }

  *ecPtr = count;
  return UDS_SUCCESS;
}
/**********************************************************************/
void freeEventCount(EventCount *ec)
{
  // Tolerate a NULL event count so callers can free unconditionally.
  if (ec != NULL) {
    destroySemaphore(&ec->semaphore);
    FREE(ec);
  }
}
/**********************************************************************/
EventToken eventCountPrepare(EventCount *ec)
{
  // Issue a wait token by atomically bumping the waiter count; the returned
  // state snapshot serves as the token, capturing the event counter at the
  // moment the wait was prepared.
  EventToken token = atomic64_add_return(ONE_WAITER, &ec->state);
  return token;
}
/**********************************************************************/
void eventCountCancel(EventCount *ec, EventToken token)
{
  // Fast path: retract the token by decrementing the waiter count, which
  // succeeds only if the event hasn't been signalled since the token was
  // issued.
  if (!fastCancel(ec, token)) {
    // A signaller has already transferred (or promised to transfer) our
    // token to the semaphore, so we must consume it by waiting (with no
    // timeout).
    eventCountWait(ec, token, NULL);
  }
}
/**********************************************************************/
/**
 * Wait until the event counter has changed from the value captured in the
 * token, optionally bounded by a timeout.
 *
 * @param ec      the event count on which the token was issued
 * @param token   the wait token returned by eventCountPrepare()
 * @param timeout an optional timeout; NULL means wait indefinitely
 *
 * @return true if the event counter changed, or false if the timeout
 *         expired first and the token was successfully cancelled
 **/
bool eventCountWait(EventCount *ec, EventToken token, const RelTime *timeout)
{
  for (;;) {
    // Wait for a signaller to transfer our wait token to the semaphore.
    if (!consumeWaitToken(ec, timeout)) {
      // The wait timed out, so we must cancel the token instead. Try to
      // decrement the waiter count if the event hasn't been signalled.
      if (fastCancel(ec, token)) {
        return false;
      }
      /*
       * We timed out, but a signaller came in before we could cancel the
       * wait. We have no choice but to wait for the semaphore to be posted.
       * Since the signaller has promised to do it, the wait will be short.
       * The timeout and the signal happened at about the same time, so
       * either outcome could be returned. It's simpler to ignore the
       * timeout.
       */
      timeout = NULL;
      continue;
    }

    // A wait token has now been consumed from the semaphore.
    // Stop waiting if the count has changed since the token was acquired.
    if (!sameEvent(token, atomic64_read(&ec->state))) {
      return true;
    }

    // We consumed someone else's wait token. Put it back in the semaphore,
    // which will wake another waiter, hopefully one who can stop waiting.
    releaseSemaphore(&ec->semaphore);

    // Attempt to give an earlier waiter a shot at the semaphore.
    yieldScheduler();
  }
}