Blame nptl/pthread_rwlock_common.c

Packit 6c4009
/* POSIX reader--writer lock: core parts.
Packit 6c4009
   Copyright (C) 2016-2018 Free Software Foundation, Inc.
Packit 6c4009
   This file is part of the GNU C Library.
Packit 6c4009
Packit 6c4009
   The GNU C Library is free software; you can redistribute it and/or
Packit 6c4009
   modify it under the terms of the GNU Lesser General Public
Packit 6c4009
   License as published by the Free Software Foundation; either
Packit 6c4009
   version 2.1 of the License, or (at your option) any later version.
Packit 6c4009
Packit 6c4009
   The GNU C Library is distributed in the hope that it will be useful,
Packit 6c4009
   but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 6c4009
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
Packit 6c4009
   Lesser General Public License for more details.
Packit 6c4009
Packit 6c4009
   You should have received a copy of the GNU Lesser General Public
Packit 6c4009
   License along with the GNU C Library; if not, see
Packit 6c4009
   <http://www.gnu.org/licenses/>.  */
Packit 6c4009
Packit 6c4009
#include <errno.h>
Packit 6c4009
#include <sysdep.h>
Packit 6c4009
#include <pthread.h>
Packit 6c4009
#include <pthreadP.h>
Packit 6c4009
#include <sys/time.h>
Packit 6c4009
#include <stap-probe.h>
Packit 6c4009
#include <atomic.h>
Packit 6c4009
#include <futex-internal.h>
Packit 6c4009
Packit 6c4009
Packit 6c4009
/* A reader--writer lock that fulfills the POSIX requirements (but operations
Packit 6c4009
   on this lock are not necessarily full barriers, as one may interpret the
Packit 6c4009
   POSIX requirement about "synchronizing memory").  All critical sections are
Packit 6c4009
   in a total order, writers synchronize with prior writers and readers, and
Packit 6c4009
   readers synchronize with prior writers.
Packit 6c4009
Packit 6c4009
   A thread is allowed to acquire a read lock recursively (i.e., have rdlock
Packit 6c4009
   critical sections that overlap in sequenced-before) unless the kind of the
Packit Service c9814d
   rwlock is set to PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP.
Packit 6c4009
Packit 6c4009
   This lock is built so that workloads of mostly readers can be executed with
Packit 6c4009
   low runtime overheads.  This matches that the default kind of the lock is
Packit 6c4009
   PTHREAD_RWLOCK_PREFER_READER_NP.  Acquiring a read lock requires a single
Packit 6c4009
   atomic addition if the lock is or was previously acquired by other
Packit 6c4009
   readers; releasing the lock is a single CAS if there are no concurrent
Packit 6c4009
   writers.
Packit 6c4009
   Workloads consisting of mostly writers are of secondary importance.
Packit 6c4009
   An uncontended write lock acquisition is as fast as for a normal
Packit 6c4009
   exclusive mutex but writer contention is somewhat more costly due to
Packit 6c4009
   keeping track of the exact number of writers.  If the rwlock kind requests
Packit Service c9814d
   writers to be preferred (i.e., PTHREAD_RWLOCK_PREFER_WRITER_NP or the
Packit 6c4009
   no-recursive-readers variant of it), then writer--to--writer lock ownership
Packit 6c4009
   hand-over is fairly fast and bypasses lock acquisition attempts by readers.
Packit 6c4009
   The costs of lock ownership transfer between readers and writers vary.  If
Packit 6c4009
   the program asserts that there are no recursive readers and writers are
Packit 6c4009
   preferred, then write lock acquisition attempts will block subsequent read
Packit 6c4009
   lock acquisition attempts, so that new incoming readers do not prolong a
Packit 6c4009
   phase in which readers have acquired the lock.
Packit 6c4009
Packit 6c4009
   The main components of the rwlock are a writer-only lock that allows only
Packit 6c4009
   one of the concurrent writers to be the primary writer, and a
Packit 6c4009
   single-writer-multiple-readers lock that decides between read phases, in
Packit 6c4009
   which readers have acquired the rwlock, and write phases in which a primary
Packit 6c4009
   writer or a sequence of different primary writers have acquired the rwlock.
Packit 6c4009
Packit 6c4009
   The single-writer-multiple-readers lock is the central piece of state
Packit 6c4009
   describing the rwlock and is encoded in the __readers field (see below for
Packit 6c4009
   a detailed explanation):
Packit 6c4009
Packit 6c4009
   State WP  WL  R   RW  Notes
Packit 6c4009
   ---------------------------
Packit 6c4009
   #1    0   0   0   0   Lock is idle (and in a read phase).
Packit 6c4009
   #2    0   0   >0  0   Readers have acquired the lock.
Packit 6c4009
   #3    0   1   0   0   Lock is not acquired; a writer will try to start a
Packit 6c4009
			 write phase.
Packit 6c4009
   #4    0   1   >0  0   Readers have acquired the lock; a writer is waiting
Packit 6c4009
			 and explicit hand-over to the writer is required.
Packit 6c4009
   #4a   0   1   >0  1   Same as #4 except that there are further readers
Packit 6c4009
			 waiting because the writer is to be preferred.
Packit 6c4009
   #5    1   0   0   0   Lock is idle (and in a write phase).
Packit 6c4009
   #6    1   0   >0  0   Write phase; readers will try to start a read phase
Packit 6c4009
			 (requires explicit hand-over to all readers that
Packit 6c4009
			 do not start the read phase).
Packit 6c4009
   #7    1   1   0   0   Lock is acquired by a writer.
Packit 6c4009
   #8    1   1   >0  0   Lock acquired by a writer and readers are waiting;
Packit 6c4009
			 explicit hand-over to the readers is required.
Packit 6c4009
Packit 6c4009
   WP (PTHREAD_RWLOCK_WRPHASE) is true if the lock is in a write phase, so
Packit 6c4009
   potentially acquired by a primary writer.
Packit 6c4009
   WL (PTHREAD_RWLOCK_WRLOCKED) is true if there is a primary writer (i.e.,
Packit 6c4009
   the thread that was able to set this bit from false to true).
Packit 6c4009
   R (all bits in __readers except the number of least-significant bits
Packit 6c4009
   denoted in PTHREAD_RWLOCK_READER_SHIFT) is the number of readers that have
Packit 6c4009
   or are trying to acquired the lock.  There may be more readers waiting if
Packit 6c4009
   writers are preferred and there will be no recursive readers, in which
Packit 6c4009
   case RW (PTHREAD_RWLOCK_RWAITING) is true in state #4a.
Packit 6c4009
Packit 6c4009
   We want to block using futexes but using __readers as a futex word directly
Packit 6c4009
   is not a good solution.  First, we want to wait on different conditions
Packit 6c4009
   such as waiting for a phase change vs. waiting for the primary writer to
Packit 6c4009
   release the writer-only lock.  Second, the number of readers could change
Packit 6c4009
   frequently, which would make it likely that a writer's futex_wait fails
Packit 6c4009
   frequently too because the expected value does not match the value of
Packit 6c4009
   __readers anymore.
Packit 6c4009
   Therefore, we split out the futex words into the __wrphase_futex and
Packit 6c4009
   __writers_futex fields.  The former tracks the value of the WP bit and is
Packit 6c4009
   changed after changing WP by the thread that changes WP.  However, because
Packit 6c4009
   of the POSIX requirements regarding mutex/rwlock destruction (i.e., that
Packit 6c4009
   destroying a rwlock is allowed as soon as no thread has acquired or will
Packit 6c4009
   acquire the lock), we have to be careful and hand over lock ownership (via
Packit 6c4009
   a phase change) carefully to those threads waiting.  Specifically, we must
Packit 6c4009
   prevent a situation in which we are not quite sure whether we still have
Packit 6c4009
   to unblock another thread through a change to memory (executing a
Packit 6c4009
   futex_wake on a former futex word that is now used for something else is
Packit 6c4009
   fine).
Packit 6c4009
   The scheme we use for __wrphase_futex is that waiting threads that may
Packit 6c4009
   use the futex word to block now all have to use the futex word to block; it
Packit 6c4009
   is not allowed to take the short-cut and spin-wait on __readers because
Packit 6c4009
   then the waking thread cannot just make one final change to memory to
Packit 6c4009
   unblock all potentially waiting threads.  If, for example, a reader
Packit 6c4009
   increments R in states #7 or #8, it has to then block until __wrphase_futex
Packit 6c4009
   is 0 and it can confirm that the value of 0 was stored by the primary
Packit 6c4009
   writer; in turn, the primary writer has to change to a read phase too when
Packit 6c4009
   releasing WL (i.e., to state #2), and it must change __wrphase_futex to 0
Packit 6c4009
   as the next step.  This ensures that the waiting reader will not be able to
Packit 6c4009
   acquire, release, and then destroy the lock concurrently with the pending
Packit 6c4009
   futex unblock operations by the former primary writer.  This scheme is
Packit 6c4009
   called explicit hand-over in what follows.
Packit 6c4009
   Note that waiting threads can cancel waiting only if explicit hand-over has
Packit 6c4009
   not yet started (e.g., if __readers is still in states #7 or #8 in the
Packit 6c4009
   example above).
Packit 6c4009
Packit 6c4009
   Writers determine the primary writer through WL.  Blocking using futexes
Packit 6c4009
   is performed using __writers_futex as a futex word; primary writers will
Packit 6c4009
   enable waiting on this futex by setting it to 1 after they acquired the WL
Packit 6c4009
   bit and will disable waiting by setting it to 0 before they release WL.
Packit 6c4009
   This leaves small windows where blocking using futexes is not possible
Packit 6c4009
   although a primary writer exists, but in turn decreases complexity of the
Packit 6c4009
   writer--writer synchronization and does not affect correctness.
Packit 6c4009
   If writers are preferred, writers can hand over WL directly to other
Packit 6c4009
   waiting writers that registered by incrementing __writers:  If the primary
Packit 6c4009
   writer can CAS __writers from a non-zero value to the same value with the
Packit 6c4009
   PTHREAD_RWLOCK_WRHANDOVER bit set, it effectively transfers WL ownership
Packit 6c4009
   to one of the registered waiting writers and does not reset WL; in turn,
Packit 6c4009
   a registered writer that can clear PTHREAD_RWLOCK_WRHANDOVER using a CAS
Packit 6c4009
   then takes over WL.  Note that registered waiting writers can cancel
Packit 6c4009
   waiting by decrementing __writers, but the last writer to unregister must
Packit 6c4009
   become the primary writer if PTHREAD_RWLOCK_WRHANDOVER is set.
Packit 6c4009
   Also note that adding another state/bit to signal potential writer--writer
Packit 6c4009
   contention (e.g., as done in the normal mutex algorithm) would not be
Packit 6c4009
   helpful because we would have to conservatively assume that there is in
Packit 6c4009
   fact no other writer, and wake up readers too.
Packit 6c4009
Packit 6c4009
   To avoid having to call futex_wake when no thread uses __wrphase_futex or
Packit 6c4009
   __writers_futex, threads will set the PTHREAD_RWLOCK_FUTEX_USED bit in the
Packit 6c4009
   respective futex words before waiting on it (using a CAS so it will only be
Packit 6c4009
   set if in a state in which waiting would be possible).  In the case of
Packit 6c4009
   __writers_futex, we wake only one thread but several threads may share
Packit 6c4009
   PTHREAD_RWLOCK_FUTEX_USED, so we must assume that there are still others.
Packit 6c4009
   This is similar to what we do in pthread_mutex_lock.  We do not need to
Packit 6c4009
   do this for __wrphase_futex because there, we always wake all waiting
Packit 6c4009
   threads.
Packit 6c4009
Packit 6c4009
   Blocking in the state #4a simply uses __readers as futex word.  This
Packit 6c4009
   simplifies the algorithm but suffers from some of the drawbacks discussed
Packit 6c4009
   before, though not to the same extent because R can only decrease in this
Packit 6c4009
   state, so the number of potentially failing futex_wait attempts will be
Packit 6c4009
   bounded.  All threads moving from state #4a to another state must wake
Packit 6c4009
   up threads blocked on the __readers futex.
Packit 6c4009
Packit 6c4009
   The ordering invariants that we have to take care of in the implementation
Packit 6c4009
   are primarily those necessary for a reader--writer lock; this is rather
Packit 6c4009
   straightforward and happens during write/read phase switching (potentially
Packit 6c4009
   through explicit hand-over), and between writers through synchronization
Packit 6c4009
   involving the PTHREAD_RWLOCK_WRLOCKED or PTHREAD_RWLOCK_WRHANDOVER bits.
Packit 6c4009
   Additionally, we need to take care that modifications of __writers_futex
Packit 6c4009
   and __wrphase_futex (e.g., by otherwise unordered readers) take place in
Packit 6c4009
   the writer critical sections or read/write phases, respectively, and that
Packit 6c4009
   explicit hand-over observes stores from the previous phase.  How this is
Packit 6c4009
   done is explained in more detail in comments in the code.
Packit 6c4009
Packit 6c4009
   Many of the accesses to the futex words just need relaxed MO.  This is
Packit 6c4009
   possible because we essentially drive both the core rwlock synchronization
Packit 6c4009
   and the futex synchronization in parallel.  For example, an unlock will
Packit 6c4009
   unlock the rwlock and take part in the futex synchronization (using
Packit 6c4009
   PTHREAD_RWLOCK_FUTEX_USED, see above); even if they are not tightly
Packit 6c4009
   ordered in some way, the futex synchronization ensures that there are no
Packit 6c4009
   lost wake-ups, and woken threads will then eventually see the most recent
Packit 6c4009
   state of the rwlock.  IOW, waiting threads will always be woken up, while
Packit 6c4009
   not being able to wait using futexes (which can happen) is harmless; in
Packit 6c4009
   turn, this means that waiting threads don't need special ordering wrt.
Packit 6c4009
   waking threads.
Packit 6c4009
Packit 6c4009
   The futex synchronization consists of the three-state futex word:
Packit 6c4009
   (1) cannot block on it, (2) can block on it, and (3) there might be a
Packit 6c4009
   thread blocked on it (i.e., with PTHREAD_RWLOCK_FUTEX_USED set).
Packit 6c4009
   Relaxed-MO atomic read-modify-write operations are sufficient to maintain
Packit 6c4009
   this (e.g., using a CAS to go from (2) to (3) but not from (1) to (3)),
Packit 6c4009
   but we need ordering of the futex word modifications by the waking threads
Packit 6c4009
   so that they collectively make correct state changes between (1)-(3).
Packit 6c4009
   The futex-internal synchronization (i.e., the conceptual critical sections
Packit 6c4009
   around futex operations in the kernel) then ensures that even an
Packit 6c4009
   unconstrained load (i.e., relaxed MO) inside of futex_wait will not lead to
Packit 6c4009
   lost wake-ups because either the waiting thread will see the change from
Packit 6c4009
   (3) to (1) when a futex_wake came first, or this futex_wake will wake this
Packit 6c4009
   waiting thread because the waiting thread came first.
Packit 6c4009
Packit 6c4009
Packit 6c4009
   POSIX allows but does not require rwlock acquisitions to be a cancellation
Packit 6c4009
   point.  We do not support cancellation.
Packit 6c4009
Packit 6c4009
   TODO We do not try to elide any read or write lock acquisitions currently.
Packit 6c4009
   While this would be possible, it is unclear whether HTM performance is
Packit 6c4009
   currently predictable enough and our runtime tuning is good enough at
Packit 6c4009
   deciding when to use elision so that enabling it would lead to consistently
Packit 6c4009
   better performance.  */
Packit 6c4009
Packit 6c4009
Packit 6c4009
static int
Packit 6c4009
__pthread_rwlock_get_private (pthread_rwlock_t *rwlock)
Packit 6c4009
{
Packit 6c4009
  return rwlock->__data.__shared != 0 ? FUTEX_SHARED : FUTEX_PRIVATE;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
static __always_inline void
Packit 6c4009
__pthread_rwlock_rdunlock (pthread_rwlock_t *rwlock)
Packit 6c4009
{
Packit 6c4009
  int private = __pthread_rwlock_get_private (rwlock);
Packit 6c4009
  /* We decrease the number of readers, and if we are the last reader and
Packit 6c4009
     there is a primary writer, we start a write phase.  We use a CAS to
Packit 6c4009
     make this atomic so that it is clear whether we must hand over ownership
Packit 6c4009
     explicitly.  */
Packit 6c4009
  unsigned int r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
  unsigned int rnew;
Packit 6c4009
  for (;;)
Packit 6c4009
    {
Packit 6c4009
      rnew = r - (1 << PTHREAD_RWLOCK_READER_SHIFT);
Packit 6c4009
      /* If we are the last reader, we also need to unblock any readers
Packit 6c4009
	 that are waiting for a writer to go first (PTHREAD_RWLOCK_RWAITING)
Packit 6c4009
	 so that they can register while the writer is active.  */
Packit 6c4009
      if ((rnew >> PTHREAD_RWLOCK_READER_SHIFT) == 0)
Packit 6c4009
	{
Packit 6c4009
	  if ((rnew & PTHREAD_RWLOCK_WRLOCKED) != 0)
Packit 6c4009
	    rnew |= PTHREAD_RWLOCK_WRPHASE;
Packit 6c4009
	  rnew &= ~(unsigned int) PTHREAD_RWLOCK_RWAITING;
Packit 6c4009
	}
Packit 6c4009
      /* We need release MO here for three reasons.  First, so that we
Packit 6c4009
	 synchronize with subsequent writers.  Second, we might have been the
Packit 6c4009
	 first reader and set __wrphase_futex to 0, so we need to synchronize
Packit 6c4009
	 with the last reader that will set it to 1 (note that we will always
Packit 6c4009
	 change __readers before the last reader, or we are the last reader).
Packit 6c4009
	 Third, a writer that takes part in explicit hand-over needs to see
Packit 6c4009
	 the first reader's store to __wrphase_futex (or a later value) if
Packit 6c4009
	 the writer observes that a write phase has been started.  */
Packit 6c4009
      if (atomic_compare_exchange_weak_release (&rwlock->__data.__readers,
Packit Service c9814d
						&r, rnew))
Packit 6c4009
	break;
Packit 6c4009
      /* TODO Back-off.  */
Packit 6c4009
    }
Packit 6c4009
  if ((rnew & PTHREAD_RWLOCK_WRPHASE) != 0)
Packit 6c4009
    {
Packit 6c4009
      /* We need to do explicit hand-over.  We need the acquire MO fence so
Packit 6c4009
	 that our modification of _wrphase_futex happens after a store by
Packit 6c4009
	 another reader that started a read phase.  Relaxed MO is sufficient
Packit 6c4009
	 for the modification of __wrphase_futex because it is just used
Packit 6c4009
	 to delay acquisition by a writer until all threads are unblocked
Packit 6c4009
	 irrespective of whether they are looking at __readers or
Packit 6c4009
	 __wrphase_futex; any other synchronizes-with relations that are
Packit 6c4009
	 necessary are established through __readers.  */
Packit 6c4009
      atomic_thread_fence_acquire ();
Packit 6c4009
      if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 1)
Packit 6c4009
	   & PTHREAD_RWLOCK_FUTEX_USED) != 0)
Packit 6c4009
	futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
Packit 6c4009
    }
Packit 6c4009
  /* Also wake up waiting readers if we did reset the RWAITING flag.  */
Packit 6c4009
  if ((r & PTHREAD_RWLOCK_RWAITING) != (rnew & PTHREAD_RWLOCK_RWAITING))
Packit 6c4009
    futex_wake (&rwlock->__data.__readers, INT_MAX, private);
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
static __always_inline int
Packit 6c4009
__pthread_rwlock_rdlock_full (pthread_rwlock_t *rwlock,
Packit 6c4009
    const struct timespec *abstime)
Packit 6c4009
{
Packit 6c4009
  unsigned int r;
Packit 6c4009
Packit 6c4009
  /* Make sure we are not holding the rwlock as a writer.  This is a deadlock
Packit 6c4009
     situation we recognize and report.  */
Packit 6c4009
  if (__glibc_unlikely (atomic_load_relaxed (&rwlock->__data.__cur_writer)
Packit Service c9814d
			== THREAD_GETMEM (THREAD_SELF, tid)))
Packit 6c4009
    return EDEADLK;
Packit 6c4009
Packit 6c4009
  /* If we prefer writers, recursive rdlock is disallowed, we are in a read
Packit 6c4009
     phase, and there are other readers present, we try to wait without
Packit 6c4009
     extending the read phase.  We will be unblocked by either one of the
Packit 6c4009
     other active readers, or if the writer gives up WRLOCKED (e.g., on
Packit 6c4009
     timeout).
Packit 6c4009
     If there are no other readers, we simply race with any existing primary
Packit 6c4009
     writer; it would have been a race anyway, and changing the odds slightly
Packit 6c4009
     will likely not make a big difference.  */
Packit 6c4009
  if (rwlock->__data.__flags == PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
Packit 6c4009
    {
Packit 6c4009
      r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit Service c9814d
      while ((r & PTHREAD_RWLOCK_WRPHASE) == 0
Packit Service c9814d
	     && (r & PTHREAD_RWLOCK_WRLOCKED) != 0
Packit Service c9814d
	     && (r >> PTHREAD_RWLOCK_READER_SHIFT) > 0)
Packit 6c4009
	{
Packit 6c4009
	  /* TODO Spin first.  */
Packit 6c4009
	  /* Try setting the flag signaling that we are waiting without having
Packit 6c4009
	     incremented the number of readers.  Relaxed MO is fine because
Packit 6c4009
	     this is just about waiting for a state change in __readers.  */
Packit 6c4009
	  if (atomic_compare_exchange_weak_relaxed
Packit 6c4009
	      (&rwlock->__data.__readers, &r, r | PTHREAD_RWLOCK_RWAITING))
Packit 6c4009
	    {
Packit 6c4009
	      /* Wait for as long as the flag is set.  An ABA situation is
Packit 6c4009
		 harmless because the flag is just about the state of
Packit 6c4009
		 __readers, and all threads set the flag under the same
Packit 6c4009
		 conditions.  */
Packit Service a21b76
	      while (((r = atomic_load_relaxed (&rwlock->__data.__readers))
Packit Service c9814d
		      & PTHREAD_RWLOCK_RWAITING) != 0)
Packit 6c4009
		{
Packit 6c4009
		  int private = __pthread_rwlock_get_private (rwlock);
Packit 6c4009
		  int err = futex_abstimed_wait (&rwlock->__data.__readers,
Packit Service c9814d
						 r, abstime, private);
Packit 6c4009
		  /* We ignore EAGAIN and EINTR.  On time-outs, we can just
Packit 6c4009
		     return because we don't need to clean up anything.  */
Packit 6c4009
		  if (err == ETIMEDOUT)
Packit 6c4009
		    return err;
Packit 6c4009
		}
Packit 6c4009
	      /* It makes sense to not break out of the outer loop here
Packit 6c4009
		 because we might be in the same situation again.  */
Packit 6c4009
	    }
Packit 6c4009
	  else
Packit 6c4009
	    {
Packit 6c4009
	      /* TODO Back-off.  */
Packit 6c4009
	    }
Packit 6c4009
	}
Packit 6c4009
    }
Packit 6c4009
  /* Register as a reader, using an add-and-fetch so that R can be used as
Packit 6c4009
     expected value for future operations.  Acquire MO so we synchronize with
Packit 6c4009
     prior writers as well as the last reader of the previous read phase (see
Packit 6c4009
     below).  */
Packit Service c9814d
  r = (atomic_fetch_add_acquire (&rwlock->__data.__readers,
Packit Service c9814d
				 (1 << PTHREAD_RWLOCK_READER_SHIFT))
Packit Service c9814d
       + (1 << PTHREAD_RWLOCK_READER_SHIFT));
Packit 6c4009
Packit 6c4009
  /* Check whether there is an overflow in the number of readers.  We assume
Packit 6c4009
     that the total number of threads is less than half the maximum number
Packit 6c4009
     of readers that we have bits for in __readers (i.e., with 32-bit int and
Packit 6c4009
     PTHREAD_RWLOCK_READER_SHIFT of 3, we assume there are less than
Packit 6c4009
     1 << (32-3-1) concurrent threads).
Packit 6c4009
     If there is an overflow, we use a CAS to try to decrement the number of
Packit 6c4009
     readers if there still is an overflow situation.  If so, we return
Packit 6c4009
     EAGAIN; if not, we are not a thread causing an overflow situation, and so
Packit 6c4009
     we just continue.  Using a fetch-add instead of the CAS isn't possible
Packit 6c4009
     because other readers might release the lock concurrently, which could
Packit 6c4009
     make us the last reader and thus responsible for handing ownership over
Packit 6c4009
     to writers (which requires a CAS too to make the decrement and ownership
Packit 6c4009
     transfer indivisible).  */
Packit 6c4009
  while (__glibc_unlikely (r >= PTHREAD_RWLOCK_READER_OVERFLOW))
Packit 6c4009
    {
Packit 6c4009
      /* Relaxed MO is okay because we just want to undo our registration and
Packit 6c4009
	 cannot have changed the rwlock state substantially if the CAS
Packit 6c4009
	 succeeds.  */
Packit Service c9814d
      if (atomic_compare_exchange_weak_relaxed
Packit Service c9814d
	  (&rwlock->__data.__readers,
Packit Service c9814d
	   &r, r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
Packit 6c4009
	return EAGAIN;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* We have registered as a reader, so if we are in a read phase, we have
Packit 6c4009
     acquired a read lock.  This is also the reader--reader fast-path.
Packit 6c4009
     Even if there is a primary writer, we just return.  If writers are to
Packit 6c4009
     be preferred and we are the only active reader, we could try to enter a
Packit 6c4009
     write phase to let the writer proceed.  This would be okay because we
Packit 6c4009
     cannot have acquired the lock previously as a reader (which could result
Packit 6c4009
     in deadlock if we would wait for the primary writer to run).  However,
Packit 6c4009
     this seems to be a corner case and handling it specially not be worth the
Packit 6c4009
     complexity.  */
Packit 6c4009
  if (__glibc_likely ((r & PTHREAD_RWLOCK_WRPHASE) == 0))
Packit 6c4009
    return 0;
Packit 6c4009
  /* Otherwise, if we were in a write phase (states #6 or #8), we must wait
Packit 6c4009
     for explicit hand-over of the read phase; the only exception is if we
Packit 6c4009
     can start a read phase if there is no primary writer currently.  */
Packit Service c9814d
  while ((r & PTHREAD_RWLOCK_WRPHASE) != 0
Packit Service c9814d
	 && (r & PTHREAD_RWLOCK_WRLOCKED) == 0)
Packit 6c4009
    {
Packit Service c9814d
      /* Try to enter a read phase: If the CAS below succeeds, we have
Packit 6c4009
	 ownership; if it fails, we will simply retry and reassess the
Packit 6c4009
	 situation.
Packit 6c4009
	 Acquire MO so we synchronize with prior writers.  */
Packit 6c4009
      if (atomic_compare_exchange_weak_acquire (&rwlock->__data.__readers, &r,
Packit Service c9814d
						r ^ PTHREAD_RWLOCK_WRPHASE))
Packit 6c4009
	{
Packit 6c4009
	  /* We started the read phase, so we are also responsible for
Packit 6c4009
	     updating the write-phase futex.  Relaxed MO is sufficient.
Packit 6c4009
	     We have to do the same steps as a writer would when handing
Packit 6c4009
	     over the read phase to us because other readers cannot
Packit 6c4009
	     distinguish between us and the writer; this includes
Packit 6c4009
	     explicit hand-over and potentially having to wake other readers
Packit 6c4009
	     (but we can pretend to do the setting and unsetting of WRLOCKED
Packit 6c4009
	     atomically, and thus can skip this step).  */
Packit 6c4009
	  if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
Packit Service c9814d
	       & PTHREAD_RWLOCK_FUTEX_USED) != 0)
Packit 6c4009
	    {
Packit 6c4009
	      int private = __pthread_rwlock_get_private (rwlock);
Packit 6c4009
	      futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
Packit 6c4009
	    }
Packit 6c4009
	  return 0;
Packit 6c4009
	}
Packit 6c4009
      else
Packit 6c4009
	{
Packit 6c4009
	  /* TODO Back off before retrying.  Also see above.  */
Packit 6c4009
	}
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* We were in a write phase but did not install the read phase.  We cannot
Packit 6c4009
     distinguish between a writer and another reader starting the read phase,
Packit 6c4009
     so we must wait for explicit hand-over via __wrphase_futex.
Packit 6c4009
     However, __wrphase_futex might not have been set to 1 yet (either
Packit 6c4009
     because explicit hand-over to the writer is still ongoing, or because
Packit 6c4009
     the writer has started the write phase but has not yet updated
Packit 6c4009
     __wrphase_futex).  The least recent value of __wrphase_futex we can
Packit 6c4009
     read from here is the modification of the last read phase (because
Packit 6c4009
     we synchronize with the last reader in this read phase through
Packit 6c4009
     __readers; see the use of acquire MO on the fetch_add above).
Packit 6c4009
     Therefore, if we observe a value of 0 for __wrphase_futex, we need
Packit 6c4009
     to subsequently check that __readers now indicates a read phase; we
Packit 6c4009
     need to use acquire MO for this so that if we observe a read phase,
Packit 6c4009
     we will also see the modification of __wrphase_futex by the previous
Packit 6c4009
     writer.  We then need to load __wrphase_futex again and continue to
Packit 6c4009
     wait if it is not 0, so that we do not skip explicit hand-over.
Packit 6c4009
     Relaxed MO is sufficient for the load from __wrphase_futex because
Packit 6c4009
     we just use it as an indicator for when we can proceed; we use
Packit 6c4009
     __readers and the acquire MO accesses to it to eventually read from
Packit 6c4009
     the proper stores to __wrphase_futex.  */
Packit 6c4009
  unsigned int wpf;
Packit 6c4009
  bool ready = false;
Packit 6c4009
  for (;;)
Packit 6c4009
    {
Packit 6c4009
      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
Packit Service c9814d
	      | PTHREAD_RWLOCK_FUTEX_USED) == (1 | PTHREAD_RWLOCK_FUTEX_USED))
Packit 6c4009
	{
Packit 6c4009
	  int private = __pthread_rwlock_get_private (rwlock);
Packit 6c4009
	  if (((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0)
Packit Service c9814d
	      && (!atomic_compare_exchange_weak_relaxed
Packit 6c4009
		  (&rwlock->__data.__wrphase_futex,
Packit Service c9814d
		   &wpf, wpf | PTHREAD_RWLOCK_FUTEX_USED)))
Packit 6c4009
	    continue;
Packit 6c4009
	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
Packit Service c9814d
					 1 | PTHREAD_RWLOCK_FUTEX_USED,
Packit Service c9814d
					 abstime, private);
Packit 6c4009
	  if (err == ETIMEDOUT)
Packit 6c4009
	    {
Packit 6c4009
	      /* If we timed out, we need to unregister.  If no read phase
Packit 6c4009
		 has been installed while we waited, we can just decrement
Packit 6c4009
		 the number of readers.  Otherwise, we just acquire the
Packit 6c4009
		 lock, which is allowed because we give no precise timing
Packit 6c4009
		 guarantees, and because the timeout is only required to
Packit 6c4009
		 be in effect if we would have had to wait for other
Packit 6c4009
		 threads (e.g., if futex_wait would time-out immediately
Packit 6c4009
		 because the given absolute time is in the past).  */
Packit 6c4009
	      r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
	      while ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
Packit 6c4009
		{
Packit 6c4009
		  /* We don't need to make anything else visible to
Packit 6c4009
		     others besides unregistering, so relaxed MO is
Packit 6c4009
		     sufficient.  */
Packit 6c4009
		  if (atomic_compare_exchange_weak_relaxed
Packit 6c4009
		      (&rwlock->__data.__readers, &r,
Packit 6c4009
		       r - (1 << PTHREAD_RWLOCK_READER_SHIFT)))
Packit 6c4009
		    return ETIMEDOUT;
Packit 6c4009
		  /* TODO Back-off.  */
Packit 6c4009
		}
Packit 6c4009
	      /* Use the acquire MO fence to mirror the steps taken in the
Packit 6c4009
		 non-timeout case.  Note that the read can happen both
Packit 6c4009
		 in the atomic_load above as well as in the failure case
Packit 6c4009
		 of the CAS operation.  */
Packit 6c4009
	      atomic_thread_fence_acquire ();
Packit 6c4009
	      /* We still need to wait for explicit hand-over, but we must
Packit 6c4009
		 not use futex_wait anymore because we would just time out
Packit 6c4009
		 in this case and thus make the spin-waiting we need
Packit 6c4009
		 unnecessarily expensive.  */
Packit 6c4009
	      while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
Packit Service c9814d
		      | PTHREAD_RWLOCK_FUTEX_USED)
Packit Service c9814d
		     == (1 | PTHREAD_RWLOCK_FUTEX_USED))
Packit 6c4009
		{
Packit 6c4009
		  /* TODO Back-off?  */
Packit 6c4009
		}
Packit 6c4009
	      ready = true;
Packit 6c4009
	      break;
Packit 6c4009
	    }
Packit 6c4009
	  /* If we got interrupted (EINTR) or the futex word does not have the
Packit 6c4009
	     expected value (EAGAIN), retry.  */
Packit 6c4009
	}
Packit 6c4009
      if (ready)
Packit 6c4009
	/* See below.  */
Packit 6c4009
	break;
Packit 6c4009
      /* We need acquire MO here so that we synchronize with the lock
Packit 6c4009
	 release of the writer, and so that we observe a recent value of
Packit 6c4009
	 __wrphase_futex (see below).  */
Packit 6c4009
      if ((atomic_load_acquire (&rwlock->__data.__readers)
Packit Service c9814d
	   & PTHREAD_RWLOCK_WRPHASE) == 0)
Packit 6c4009
	/* We are in a read phase now, so the least recent modification of
Packit 6c4009
	   __wrphase_futex we can read from is the store by the writer
Packit 6c4009
	   with value 1.  Thus, only now we can assume that if we observe
Packit 6c4009
	   a value of 0, explicit hand-over is finished. Retry the loop
Packit 6c4009
	   above one more time.  */
Packit 6c4009
	ready = true;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  return 0;
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
static __always_inline void
Packit 6c4009
__pthread_rwlock_wrunlock (pthread_rwlock_t *rwlock)
Packit 6c4009
{
Packit 6c4009
  int private = __pthread_rwlock_get_private (rwlock);
Packit 6c4009
Packit 6c4009
  atomic_store_relaxed (&rwlock->__data.__cur_writer, 0);
Packit 6c4009
  /* Disable waiting by writers.  We will wake up after we decided how to
Packit 6c4009
     proceed.  */
Packit Service c9814d
  bool wake_writers
Packit Service c9814d
    = ((atomic_exchange_relaxed (&rwlock->__data.__writers_futex, 0)
Packit Service c9814d
	& PTHREAD_RWLOCK_FUTEX_USED) != 0);
Packit 6c4009
Packit 6c4009
  if (rwlock->__data.__flags != PTHREAD_RWLOCK_PREFER_READER_NP)
Packit 6c4009
    {
Packit 6c4009
      /* First, try to hand over to another writer.  */
Packit 6c4009
      unsigned int w = atomic_load_relaxed (&rwlock->__data.__writers);
Packit 6c4009
      while (w != 0)
Packit 6c4009
	{
Packit 6c4009
	  /* Release MO so that another writer that gets WRLOCKED from us will
Packit 6c4009
	     synchronize with us and thus can take over our view of
Packit 6c4009
	     __readers (including, for example, whether we are in a write
Packit 6c4009
	     phase or not).  */
Packit Service c9814d
	  if (atomic_compare_exchange_weak_release
Packit Service c9814d
	      (&rwlock->__data.__writers, &w, w | PTHREAD_RWLOCK_WRHANDOVER))
Packit 6c4009
	    /* Another writer will take over.  */
Packit 6c4009
	    goto done;
Packit 6c4009
	  /* TODO Back-off.  */
Packit 6c4009
	}
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* We have done everything we needed to do to prefer writers, so now we
Packit 6c4009
     either hand over explicitly to readers if there are any, or we simply
Packit 6c4009
     stay in a write phase.  See pthread_rwlock_rdunlock for more details.  */
Packit 6c4009
  unsigned int r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
  /* Release MO so that subsequent readers or writers synchronize with us.  */
Packit 6c4009
  while (!atomic_compare_exchange_weak_release
Packit Service c9814d
	 (&rwlock->__data.__readers, &r,
Packit Service c9814d
	  ((r ^ PTHREAD_RWLOCK_WRLOCKED)
Packit Service c9814d
	   ^ ((r >> PTHREAD_RWLOCK_READER_SHIFT) == 0 ? 0
Packit Service c9814d
	      : PTHREAD_RWLOCK_WRPHASE))))
Packit 6c4009
    {
Packit 6c4009
      /* TODO Back-off.  */
Packit 6c4009
    }
Packit 6c4009
  if ((r >> PTHREAD_RWLOCK_READER_SHIFT) != 0)
Packit 6c4009
    {
Packit 6c4009
      /* We must hand over explicitly through __wrphase_futex.  Relaxed MO is
Packit 6c4009
	 sufficient because it is just used to delay acquisition by a writer;
Packit 6c4009
	 any other synchronizes-with relations that are necessary are
Packit 6c4009
	 established through __readers.  */
Packit 6c4009
      if ((atomic_exchange_relaxed (&rwlock->__data.__wrphase_futex, 0)
Packit 6c4009
	   & PTHREAD_RWLOCK_FUTEX_USED) != 0)
Packit 6c4009
	futex_wake (&rwlock->__data.__wrphase_futex, INT_MAX, private);
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
 done:
Packit 6c4009
  /* We released WRLOCKED in some way, so wake a writer.  */
Packit 6c4009
  if (wake_writers)
Packit 6c4009
    futex_wake (&rwlock->__data.__writers_futex, 1, private);
Packit 6c4009
}
Packit 6c4009
Packit 6c4009
Packit 6c4009
static __always_inline int
Packit 6c4009
__pthread_rwlock_wrlock_full (pthread_rwlock_t *rwlock,
Packit 6c4009
    const struct timespec *abstime)
Packit 6c4009
{
Packit 6c4009
  /* Make sure we are not holding the rwlock as a writer.  This is a deadlock
Packit 6c4009
     situation we recognize and report.  */
Packit 6c4009
  if (__glibc_unlikely (atomic_load_relaxed (&rwlock->__data.__cur_writer)
Packit Service c9814d
			== THREAD_GETMEM (THREAD_SELF, tid)))
Packit 6c4009
    return EDEADLK;
Packit 6c4009
Packit 6c4009
  /* First we try to acquire the role of primary writer by setting WRLOCKED;
Packit 6c4009
     if it was set before, there already is a primary writer.  Acquire MO so
Packit 6c4009
     that we synchronize with previous primary writers.
Packit 6c4009
Packit 6c4009
     We do not try to change to a write phase right away using a fetch_or
Packit 6c4009
     because we would have to reset it again and wake readers if there are
Packit 6c4009
     readers present (some readers could try to acquire the lock more than
Packit 6c4009
     once, so setting a write phase in the middle of this could cause
Packit 6c4009
     deadlock).  Changing to a write phase eagerly would only speed up the
Packit 6c4009
     transition from a read phase to a write phase in the uncontended case,
Packit 6c4009
     but it would slow down the contended case if readers are preferred (which
Packit 6c4009
     is the default).
Packit 6c4009
     We could try to CAS from a state with no readers to a write phase, but
Packit 6c4009
     this could be less scalable if readers arrive and leave frequently.  */
Packit 6c4009
  bool may_share_futex_used_flag = false;
Packit 6c4009
  unsigned int r = atomic_fetch_or_acquire (&rwlock->__data.__readers,
Packit Service c9814d
					    PTHREAD_RWLOCK_WRLOCKED);
Packit 6c4009
  if (__glibc_unlikely ((r & PTHREAD_RWLOCK_WRLOCKED) != 0))
Packit 6c4009
    {
Packit 6c4009
      /* There is another primary writer.  */
Packit Service c9814d
      bool prefer_writer
Packit Service c9814d
	= (rwlock->__data.__flags != PTHREAD_RWLOCK_PREFER_READER_NP);
Packit 6c4009
      if (prefer_writer)
Packit 6c4009
	{
Packit 6c4009
	  /* We register as a waiting writer, so that we can make use of
Packit 6c4009
	     writer--writer hand-over.  Relaxed MO is fine because we just
Packit 6c4009
	     want to register.  We assume that the maximum number of threads
Packit 6c4009
	     is less than the capacity in __writers.  */
Packit 6c4009
	  atomic_fetch_add_relaxed (&rwlock->__data.__writers, 1);
Packit 6c4009
	}
Packit 6c4009
      for (;;)
Packit 6c4009
	{
Packit 6c4009
	  /* TODO Spin until WRLOCKED is 0 before trying the CAS below.
Packit 6c4009
	     But pay attention to not delay trying writer--writer hand-over
Packit 6c4009
	     for too long (which we must try eventually anyway).  */
Packit 6c4009
	  if ((r & PTHREAD_RWLOCK_WRLOCKED) == 0)
Packit 6c4009
	    {
Packit 6c4009
	      /* Try to become the primary writer or retry.  Acquire MO as in
Packit 6c4009
		 the fetch_or above.  */
Packit 6c4009
	      if (atomic_compare_exchange_weak_acquire
Packit Service c9814d
		  (&rwlock->__data.__readers, &r, r | PTHREAD_RWLOCK_WRLOCKED))
Packit 6c4009
		{
Packit 6c4009
		  if (prefer_writer)
Packit 6c4009
		    {
Packit 6c4009
		      /* Unregister as a waiting writer.  Note that because we
Packit 6c4009
			 acquired WRLOCKED, WRHANDOVER will not be set.
Packit 6c4009
			 Acquire MO on the CAS above ensures that
Packit 6c4009
			 unregistering happens after the previous writer;
Packit 6c4009
			 this sorts the accesses to __writers by all
Packit 6c4009
			 primary writers in a useful way (e.g., any other
Packit 6c4009
			 primary writer acquiring after us or getting it from
Packit 6c4009
			 us through WRHANDOVER will see both our changes to
Packit 6c4009
			 __writers).
Packit 6c4009
			 ??? Perhaps this is not strictly necessary for
Packit 6c4009
			 reasons we do not yet know of.  */
Packit Service c9814d
		      atomic_fetch_add_relaxed (&rwlock->__data.__writers, -1);
Packit 6c4009
		    }
Packit 6c4009
		  break;
Packit 6c4009
		}
Packit 6c4009
	      /* Retry if the CAS fails (r will have been updated).  */
Packit 6c4009
	      continue;
Packit 6c4009
	    }
Packit 6c4009
	  /* If writer--writer hand-over is available, try to become the
Packit 6c4009
	     primary writer this way by grabbing the WRHANDOVER token.  If we
Packit 6c4009
	     succeed, we own WRLOCKED.  */
Packit 6c4009
	  if (prefer_writer)
Packit 6c4009
	    {
Packit Service c9814d
	      unsigned int w = atomic_load_relaxed (&rwlock->__data.__writers);
Packit 6c4009
	      if ((w & PTHREAD_RWLOCK_WRHANDOVER) != 0)
Packit 6c4009
		{
Packit 6c4009
		  /* Acquire MO is required here so that we synchronize with
Packit 6c4009
		     the writer that handed over WRLOCKED.  We also need this
Packit 6c4009
		     for the reload of __readers below because our view of
Packit 6c4009
		     __readers must be at least as recent as the view of the
Packit 6c4009
		     writer that handed over WRLOCKED; we must avoid an ABA
Packit 6c4009
		     through WRHANDOVER, which could, for example, lead to us
Packit 6c4009
		     assuming we are still in a write phase when in fact we
Packit 6c4009
		     are not.  */
Packit 6c4009
		  if (atomic_compare_exchange_weak_acquire
Packit 6c4009
		      (&rwlock->__data.__writers,
Packit 6c4009
		       &w, (w - PTHREAD_RWLOCK_WRHANDOVER - 1)))
Packit 6c4009
		    {
Packit 6c4009
		      /* Reload so our view is consistent with the view of
Packit 6c4009
			 the previous owner of WRLOCKED.  See above.  */
Packit 6c4009
		      r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
		      break;
Packit 6c4009
		    }
Packit 6c4009
		  /* We do not need to reload __readers here.  We should try
Packit 6c4009
		     to perform writer--writer hand-over if possible; if it
Packit 6c4009
		     is not possible anymore, we will reload __readers
Packit 6c4009
		     elsewhere in this loop.  */
Packit 6c4009
		  continue;
Packit 6c4009
		}
Packit 6c4009
	    }
Packit 6c4009
	  /* We did not acquire WRLOCKED nor were able to use writer--writer
Packit 6c4009
	     hand-over, so we block on __writers_futex.  */
Packit 6c4009
	  int private = __pthread_rwlock_get_private (rwlock);
Packit Service c9814d
	  unsigned int wf
Packit Service c9814d
	    = atomic_load_relaxed (&rwlock->__data.__writers_futex);
Packit 6c4009
	  if (((wf & ~(unsigned int) PTHREAD_RWLOCK_FUTEX_USED) != 1)
Packit 6c4009
	      || ((wf != (1 | PTHREAD_RWLOCK_FUTEX_USED))
Packit Service c9814d
		  && (!atomic_compare_exchange_weak_relaxed
Packit 6c4009
		      (&rwlock->__data.__writers_futex, &wf,
Packit Service c9814d
		       1 | PTHREAD_RWLOCK_FUTEX_USED))))
Packit 6c4009
	    {
Packit 6c4009
	      /* If we cannot block on __writers_futex because there is no
Packit 6c4009
		 primary writer, or we cannot set PTHREAD_RWLOCK_FUTEX_USED,
Packit 6c4009
		 we retry.  We must reload __readers here in case we cannot
Packit 6c4009
		 block on __writers_futex so that we can become the primary
Packit 6c4009
		 writer and are not stuck in a loop that just continuously
Packit 6c4009
		 fails to block on __writers_futex.  */
Packit 6c4009
	      r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
	      continue;
Packit 6c4009
	    }
Packit 6c4009
	  /* We set the flag that signals that the futex is used, or we could
Packit 6c4009
	     have set it if we had been faster than other waiters.  As a
Packit 6c4009
	     result, we may share the flag with an unknown number of other
Packit 6c4009
	     writers.  Therefore, we must keep this flag set when we acquire
Packit 6c4009
	     the lock.  We do not need to do this when we do not reach this
Packit 6c4009
	     point here because then we are not part of the group that may
Packit 6c4009
	     share the flag, and another writer will wake one of the writers
Packit 6c4009
	     in this group.  */
Packit 6c4009
	  may_share_futex_used_flag = true;
Packit 6c4009
	  int err = futex_abstimed_wait (&rwlock->__data.__writers_futex,
Packit Service c9814d
					 1 | PTHREAD_RWLOCK_FUTEX_USED,
Packit Service c9814d
					 abstime, private);
Packit 6c4009
	  if (err == ETIMEDOUT)
Packit 6c4009
	    {
Packit 6c4009
	      if (prefer_writer)
Packit 6c4009
		{
Packit 6c4009
		  /* We need to unregister as a waiting writer.  If we are the
Packit 6c4009
		     last writer and writer--writer hand-over is available,
Packit 6c4009
		     we must make use of it because nobody else will reset
Packit 6c4009
		     WRLOCKED otherwise.  (If we use it, we simply pretend
Packit 6c4009
		     that this happened before the timeout; see
Packit 6c4009
		     pthread_rwlock_rdlock_full for the full reasoning.)
Packit 6c4009
		     Also see the similar code above.  */
Packit Service c9814d
		  unsigned int w
Packit Service c9814d
		    = atomic_load_relaxed (&rwlock->__data.__writers);
Packit 6c4009
		  while (!atomic_compare_exchange_weak_acquire
Packit Service c9814d
			 (&rwlock->__data.__writers, &w,
Packit 6c4009
			  (w == PTHREAD_RWLOCK_WRHANDOVER + 1 ? 0 : w - 1)))
Packit 6c4009
		    {
Packit 6c4009
		      /* TODO Back-off.  */
Packit 6c4009
		    }
Packit 6c4009
		  if (w == PTHREAD_RWLOCK_WRHANDOVER + 1)
Packit 6c4009
		    {
Packit 6c4009
		      /* We must continue as primary writer.  See above.  */
Packit 6c4009
		      r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
		      break;
Packit 6c4009
		    }
Packit 6c4009
		}
Packit 6c4009
	      /* We cleaned up and cannot have stolen another waiting writer's
Packit 6c4009
		 futex wake-up, so just return.  */
Packit 6c4009
	      return ETIMEDOUT;
Packit 6c4009
	    }
Packit 6c4009
	  /* If we got interrupted (EINTR) or the futex word does not have the
Packit 6c4009
	     expected value (EAGAIN), retry after reloading __readers.  */
Packit 6c4009
	  r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
	}
Packit 6c4009
      /* Our snapshot of __readers is up-to-date at this point because we
Packit 6c4009
	 either set WRLOCKED using a CAS (and update r accordingly below,
Packit 6c4009
	 which was used as expected value for the CAS) or got WRLOCKED from
Packit 6c4009
	 another writer whose snapshot of __readers we inherit.  */
Packit 6c4009
      r |= PTHREAD_RWLOCK_WRLOCKED;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* We are the primary writer; enable blocking on __writers_futex.  Relaxed
Packit 6c4009
     MO is sufficient for futex words; acquire MO on the previous
Packit 6c4009
     modifications of __readers ensures that this store happens after the
Packit 6c4009
     store of value 0 by the previous primary writer.  */
Packit 6c4009
  atomic_store_relaxed (&rwlock->__data.__writers_futex,
Packit Service c9814d
			1 | (may_share_futex_used_flag
Packit Service c9814d
			     ? PTHREAD_RWLOCK_FUTEX_USED : 0));
Packit 6c4009
Packit 6c4009
  /* If we are in a write phase, we have acquired the lock.  */
Packit 6c4009
  if ((r & PTHREAD_RWLOCK_WRPHASE) != 0)
Packit 6c4009
    goto done;
Packit 6c4009
Packit 6c4009
  /* If we are in a read phase and there are no readers, try to start a write
Packit 6c4009
     phase.  */
Packit Service c9814d
  while ((r & PTHREAD_RWLOCK_WRPHASE) == 0
Packit Service c9814d
	 && (r >> PTHREAD_RWLOCK_READER_SHIFT) == 0)
Packit 6c4009
    {
Packit 6c4009
      /* Acquire MO so that we synchronize with prior writers and do
Packit 6c4009
	 not interfere with their updates to __writers_futex, as well
Packit 6c4009
	 as regarding prior readers and their updates to __wrphase_futex,
Packit 6c4009
	 respectively.  */
Packit 6c4009
      if (atomic_compare_exchange_weak_acquire (&rwlock->__data.__readers,
Packit Service c9814d
						&r, r | PTHREAD_RWLOCK_WRPHASE))
Packit 6c4009
	{
Packit 6c4009
	  /* We have started a write phase, so need to enable readers to wait.
Packit 6c4009
	     See the similar case in __pthread_rwlock_rdlock_full.  Unlike in
Packit 6c4009
	     that similar case, we are the (only) primary writer and so do
Packit 6c4009
	     not need to wake another writer.  */
Packit 6c4009
	  atomic_store_relaxed (&rwlock->__data.__wrphase_futex, 1);
Packit 6c4009
Packit 6c4009
	  goto done;
Packit 6c4009
	}
Packit 6c4009
      /* TODO Back-off.  */
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
  /* We became the primary writer in a read phase and there were readers when
Packit 6c4009
     we did (because of the previous loop).  Thus, we have to wait for
Packit 6c4009
     explicit hand-over from one of these readers.
Packit 6c4009
     We basically do the same steps as for the similar case in
Packit 6c4009
     __pthread_rwlock_rdlock_full, except that we additionally might try
Packit 6c4009
     to directly hand over to another writer and need to wake up
Packit 6c4009
     other writers or waiting readers (i.e., PTHREAD_RWLOCK_RWAITING).  */
Packit 6c4009
  unsigned int wpf;
Packit 6c4009
  bool ready = false;
Packit 6c4009
  for (;;)
Packit 6c4009
    {
Packit 6c4009
      while (((wpf = atomic_load_relaxed (&rwlock->__data.__wrphase_futex))
Packit Service c9814d
	      | PTHREAD_RWLOCK_FUTEX_USED) == PTHREAD_RWLOCK_FUTEX_USED)
Packit 6c4009
	{
Packit 6c4009
	  int private = __pthread_rwlock_get_private (rwlock);
Packit Service c9814d
	  if ((wpf & PTHREAD_RWLOCK_FUTEX_USED) == 0
Packit Service c9814d
	      && (!atomic_compare_exchange_weak_relaxed
Packit 6c4009
		  (&rwlock->__data.__wrphase_futex, &wpf,
Packit Service c9814d
		   PTHREAD_RWLOCK_FUTEX_USED)))
Packit 6c4009
	    continue;
Packit 6c4009
	  int err = futex_abstimed_wait (&rwlock->__data.__wrphase_futex,
Packit Service c9814d
					 PTHREAD_RWLOCK_FUTEX_USED,
Packit Service c9814d
					 abstime, private);
Packit 6c4009
	  if (err == ETIMEDOUT)
Packit 6c4009
	    {
Packit Service c9814d
	      if (rwlock->__data.__flags != PTHREAD_RWLOCK_PREFER_READER_NP)
Packit 6c4009
		{
Packit 6c4009
		  /* We try writer--writer hand-over.  */
Packit Service c9814d
		  unsigned int w
Packit Service c9814d
		    = atomic_load_relaxed (&rwlock->__data.__writers);
Packit 6c4009
		  if (w != 0)
Packit 6c4009
		    {
Packit 6c4009
		      /* We are about to hand over WRLOCKED, so we must
Packit 6c4009
			 release __writers_futex too; otherwise, we'd have
Packit 6c4009
			 a pending store, which could at least prevent
Packit 6c4009
			 other threads from waiting using the futex
Packit 6c4009
			 because it could interleave with the stores
Packit 6c4009
			 by subsequent writers.  In turn, this means that
Packit 6c4009
			 we have to clean up when we do not hand over
Packit 6c4009
			 WRLOCKED.
Packit 6c4009
			 Release MO so that another writer that gets
Packit 6c4009
			 WRLOCKED from us can take over our view of
Packit 6c4009
			 __readers.  */
Packit Service c9814d
		      unsigned int wf
Packit Service c9814d
			= atomic_exchange_relaxed (&rwlock->__data.__writers_futex, 0);
Packit 6c4009
		      while (w != 0)
Packit 6c4009
			{
Packit 6c4009
			  if (atomic_compare_exchange_weak_release
Packit 6c4009
			      (&rwlock->__data.__writers, &w,
Packit Service c9814d
			       w | PTHREAD_RWLOCK_WRHANDOVER))
Packit 6c4009
			    {
Packit 6c4009
			      /* Wake other writers.  */
Packit 6c4009
			      if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
Packit 6c4009
				futex_wake (&rwlock->__data.__writers_futex,
Packit 6c4009
					    1, private);
Packit 6c4009
			      return ETIMEDOUT;
Packit 6c4009
			    }
Packit 6c4009
			  /* TODO Back-off.  */
Packit 6c4009
			}
Packit 6c4009
		      /* We still own WRLOCKED and someone else might set
Packit 6c4009
			 a write phase concurrently, so enable waiting
Packit 6c4009
			 again.  Make sure we don't loose the flag that
Packit 6c4009
			 signals whether there are threads waiting on
Packit 6c4009
			 this futex.  */
Packit Service c9814d
		      atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);
Packit 6c4009
		    }
Packit 6c4009
		}
Packit 6c4009
	      /* If we timed out and we are not in a write phase, we can
Packit 6c4009
		 just stop being a primary writer.  Otherwise, we just
Packit 6c4009
		 acquire the lock.  */
Packit 6c4009
	      r = atomic_load_relaxed (&rwlock->__data.__readers);
Packit 6c4009
	      if ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
Packit 6c4009
		{
Packit 6c4009
		  /* We are about to release WRLOCKED, so we must release
Packit 6c4009
		     __writers_futex too; see the handling of
Packit 6c4009
		     writer--writer hand-over above.  */
Packit Service c9814d
		  unsigned int wf
Packit Service c9814d
		    = atomic_exchange_relaxed (&rwlock->__data.__writers_futex, 0);
Packit 6c4009
		  while ((r & PTHREAD_RWLOCK_WRPHASE) == 0)
Packit 6c4009
		    {
Packit 6c4009
		      /* While we don't need to make anything from a
Packit 6c4009
			 caller's critical section visible to other
Packit 6c4009
			 threads, we need to ensure that our changes to
Packit 6c4009
			 __writers_futex are properly ordered.
Packit 6c4009
			 Therefore, use release MO to synchronize with
Packit 6c4009
			 subsequent primary writers.  Also wake up any
Packit 6c4009
			 waiting readers as they are waiting because of
Packit 6c4009
			 us.  */
Packit 6c4009
		      if (atomic_compare_exchange_weak_release
Packit 6c4009
			  (&rwlock->__data.__readers, &r,
Packit 6c4009
			   (r ^ PTHREAD_RWLOCK_WRLOCKED)
Packit 6c4009
			   & ~(unsigned int) PTHREAD_RWLOCK_RWAITING))
Packit 6c4009
			{
Packit 6c4009
			  /* Wake other writers.  */
Packit 6c4009
			  if ((wf & PTHREAD_RWLOCK_FUTEX_USED) != 0)
Packit 6c4009
			    futex_wake (&rwlock->__data.__writers_futex,
Packit Service c9814d
					1, private);
Packit 6c4009
			  /* Wake waiting readers.  */
Packit 6c4009
			  if ((r & PTHREAD_RWLOCK_RWAITING) != 0)
Packit 6c4009
			    futex_wake (&rwlock->__data.__readers,
Packit Service c9814d
					INT_MAX, private);
Packit 6c4009
			  return ETIMEDOUT;
Packit 6c4009
			}
Packit 6c4009
		    }
Packit 6c4009
		  /* We still own WRLOCKED and someone else might set a
Packit 6c4009
		     write phase concurrently, so enable waiting again.
Packit 6c4009
		     Make sure we don't loose the flag that signals
Packit 6c4009
		     whether there are threads waiting on this futex.  */
Packit 6c4009
		  atomic_store_relaxed (&rwlock->__data.__writers_futex, wf);
Packit 6c4009
		}
Packit 6c4009
	      /* Use the acquire MO fence to mirror the steps taken in the
Packit 6c4009
		 non-timeout case.  Note that the read can happen both
Packit 6c4009
		 in the atomic_load above as well as in the failure case
Packit 6c4009
		 of the CAS operation.  */
Packit 6c4009
	      atomic_thread_fence_acquire ();
Packit 6c4009
	      /* We still need to wait for explicit hand-over, but we must
Packit 6c4009
		 not use futex_wait anymore.  */
Packit Service c9814d
	      while ((atomic_load_relaxed (&rwlock->__data.__wrphase_futex)
Packit Service c9814d
		      | PTHREAD_RWLOCK_FUTEX_USED)
Packit Service c9814d
		     == PTHREAD_RWLOCK_FUTEX_USED)
Packit 6c4009
		{
Packit 6c4009
		  /* TODO Back-off.  */
Packit 6c4009
		}
Packit 6c4009
	      ready = true;
Packit 6c4009
	      break;
Packit 6c4009
	    }
Packit 6c4009
	  /* If we got interrupted (EINTR) or the futex word does not have
Packit 6c4009
	     the expected value (EAGAIN), retry.  */
Packit 6c4009
	}
Packit 6c4009
      /* See pthread_rwlock_rdlock_full.  */
Packit 6c4009
      if (ready)
Packit 6c4009
	break;
Packit 6c4009
      if ((atomic_load_acquire (&rwlock->__data.__readers)
Packit Service c9814d
	   & PTHREAD_RWLOCK_WRPHASE) != 0)
Packit 6c4009
	ready = true;
Packit 6c4009
    }
Packit 6c4009
Packit 6c4009
 done:
Packit 6c4009
  atomic_store_relaxed (&rwlock->__data.__cur_writer,
Packit Service c9814d
			THREAD_GETMEM (THREAD_SELF, tid));
Packit 6c4009
  return 0;
Packit 6c4009
}