Blame source/uds/util/funnelQueue.c

Packit Service 310c69
/*
Packit Service 310c69
 * Copyright (c) 2020 Red Hat, Inc.
Packit Service 310c69
 *
Packit Service 310c69
 * This program is free software; you can redistribute it and/or
Packit Service 310c69
 * modify it under the terms of the GNU General Public License
Packit Service 310c69
 * as published by the Free Software Foundation; either version 2
Packit Service 310c69
 * of the License, or (at your option) any later version.
Packit Service 310c69
 * 
Packit Service 310c69
 * This program is distributed in the hope that it will be useful,
Packit Service 310c69
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service 310c69
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service 310c69
 * GNU General Public License for more details.
Packit Service 310c69
 * 
Packit Service 310c69
 * You should have received a copy of the GNU General Public License
Packit Service 310c69
 * along with this program; if not, write to the Free Software
Packit Service 310c69
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit Service 310c69
 * 02110-1301, USA. 
Packit Service 310c69
 *
Packit Service 310c69
 * $Id: //eng/uds-releases/jasper/src/uds/util/funnelQueue.c#2 $
Packit Service 310c69
 */
Packit Service 310c69
Packit Service 310c69
#include "funnelQueue.h"
Packit Service 310c69
Packit Service 310c69
#include "memoryAlloc.h"
Packit Service 310c69
#include "permassert.h"
Packit Service 310c69
#include "uds.h"
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
int makeFunnelQueue(FunnelQueue **queuePtr)
{
  /*
   * Create an empty funnel queue and hand it back through queuePtr.
   *
   * Returns UDS_SUCCESS once *queuePtr has been set; otherwise returns the
   * error code from the allocation and leaves *queuePtr untouched.
   *
   * The allocation is intended to land on a cache line boundary so that the
   * producer and consumer fields of the structure end up on separate cache
   * lines.
   */
  FunnelQueue *newQueue;
  int status = ALLOCATE(1, FunnelQueue, "funnel queue", &newQueue);

  if (status == UDS_SUCCESS) {
    // The stub is the sole initial entry. Pointing both ends of the queue at
    // it establishes the invariant that neither newest nor oldest is ever
    // null.
    FunnelQueueEntry *stub = &newQueue->stub;
    stub->next       = NULL;
    newQueue->newest = stub;
    newQueue->oldest = stub;

    *queuePtr = newQueue;
  }

  return status;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
void freeFunnelQueue(FunnelQueue *queue)
{
  // Release the queue structure itself. Nothing here walks the list, so any
  // entries still linked into the queue are not touched by this call.
  FREE(queue);
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
static FunnelQueueEntry *getOldest(FunnelQueue *queue)
{
  /*
   * Find the entry that the consumer may dequeue next, without removing it.
   * Returns NULL when nothing can be retrieved right now, either because the
   * queue is logically empty or because a producer is part-way through
   * adding an entry.
   *
   * Barrier requirements: a read barrier is needed between loading a "next"
   * pointer and reading anything through it. It pairs with the barrier in
   * funnelQueuePut that sits between the caller setting up the entry and the
   * entry becoming visible.
   */
  FunnelQueueEntry *candidate = queue->oldest;
  FunnelQueueEntry *successor = candidate->next;

  if (candidate == &queue->stub) {
    if (successor == NULL) {
      // Only the stub is present and nothing follows it: logically empty.
      return NULL;
    }

    // Something follows the stub, so the stub can be stepped over and
    // dropped from the list without breaking the queue invariants.
    candidate     = successor;
    queue->oldest = candidate;
    smp_read_barrier_depends();
    successor = candidate->next;
  }

  // From here on the candidate is a real (non-stub) entry.
  if (successor != NULL) {
    return candidate;
  }

  // The candidate has no successor yet, so the stub has to be re-queued
  // behind it before it can be handed out. That is only safe if the
  // candidate is still the newest entry.
  if (candidate != queue->newest) {
    // A producer has already swung queue->newest but has not yet stored
    // previous->next. Treat the queue as still empty.
    return NULL;
  }

  // Re-queue the stub so that a successor will eventually become visible.
  funnelQueuePut(queue, &queue->stub);

  // Look for the successor once more. If it is still missing, a producer
  // swapped queue->newest ahead of us and has not yet updated
  // previous->next; the caller should try again later.
  return (candidate->next == NULL) ? NULL : candidate;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
FunnelQueueEntry *funnelQueuePoll(FunnelQueue *queue)
{
  /*
   * Remove and return the oldest retrievable entry, or NULL if there is none
   * available at the moment.
   *
   * Only one consumer thread may call this function. queue->oldest belongs
   * to the consumer, and a producer never touches an entry's "next" field
   * again once it has been swung from NULL to non-NULL, so the unlink itself
   * needs no locking or atomic operations.
   */
  FunnelQueueEntry *entry = getOldest(queue);

  if (entry != NULL) {
    // Unlink: the consumer's end of the queue advances to the successor.
    queue->oldest = entry->next;

    // Make sure the caller sees the data stored for this entry. Because the
    // pointer just stored in queue->oldest has already been fetched, this
    // also covers the dependent data read on entry to the next call.
    smp_rmb();

    // A very light-weight work item means the next poll will come very
    // soon, so start pulling in the following entry now.
    prefetchAddress(queue->oldest, true);

    entry->next = NULL;
  }

  return entry;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool isFunnelQueueEmpty(FunnelQueue *queue)
{
  // The queue counts as empty whenever getOldest() finds no entry that could
  // be dequeued right now. Note that getOldest() may update queue->oldest or
  // re-queue the stub as part of looking.
  FunnelQueueEntry *retrievable = getOldest(queue);
  return retrievable == NULL;
}
Packit Service 310c69
Packit Service 310c69
/**********************************************************************/
Packit Service 310c69
bool isFunnelQueueIdle(FunnelQueue *queue)
{
  /*
   * The queue is idle only when both of its ends still refer to the stub.
   *
   * First test, queue->oldest: if it is not the stub, there is another entry
   * in the list, even though it cannot be retrieved yet when its "next"
   * field is still NULL.
   *
   * Second test, queue->newest (only reached when oldest is the stub): if it
   * is not the stub, _put() has updated it. Either a retrievable entry is in
   * the list, or the list is officially empty but in the intermediate state
   * of having an entry added. Whether anything is retrievable depends on
   * whether stub.next has been updated and become visible here, but that
   * does not matter for idleness. Because of the memory ordering in _put(),
   * the update to newest would be visible at the same time or sooner.
   */
  return (queue->oldest == &queue->stub) && (queue->newest == &queue->stub);
}