Blame src/openpa/src/opa_queue.h

Packit 0848f5
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
Packit 0848f5
/*  
Packit 0848f5
 *  (C) 2008 by Argonne National Laboratory.
Packit 0848f5
 *      See COPYRIGHT in top-level directory.
Packit 0848f5
 */
Packit 0848f5
Packit 0848f5
/* Implements a fast, lockfree, multi-producer, single-consumer queue.  It's
Packit 0848f5
 * important to note the *single-consumer* piece of this, since multithreaded
Packit 0848f5
 * consumption will surely lead to data corruption and/or other problems. */
Packit 0848f5
Packit 0848f5
#ifndef OPA_QUEUE_H_INCLUDED
Packit 0848f5
#define OPA_QUEUE_H_INCLUDED
Packit 0848f5
Packit 0848f5
#include "opa_primitives.h"
Packit 0848f5
#ifdef OPA_HAVE_STDDEF_H
Packit 0848f5
#include <stddef.h>
Packit 0848f5
#endif /* OPA_HAVE_STDDEF_H */
Packit 0848f5
Packit 0848f5
/* This value is used to indicate NULL in the OPA_Shm_asymm_base_addr
Packit 0848f5
   variable.  It is non-zero because one of the likely base addresses is zero
Packit 0848f5
   (indicating memory is actually symmetrically mapped).  The value 64 was used
Packit 0848f5
   because it is unlikely that mmap will choose an address this low for a
Packit 0848f5
   mapping.  */
Packit 0848f5
/* XXX DJG TODO put some conditionally compiled error checking in that uses this */
Packit 0848f5
#define OPA_SHM_ASYMM_NULL_VAL    64
Packit 0848f5
Packit 0848f5
extern char *OPA_Shm_asymm_base_addr;
Packit 0848f5
Packit 0848f5
/* Used to initialize the base address for relative pointers.  This interface
Packit 0848f5
   assumes that there is only one shared memory segment.  If this turns out to
Packit 0848f5
   not be the case in the future, we should probably add support for multiple
Packit 0848f5
   shm segments.
Packit 0848f5
   
Packit 0848f5
   This function will return an error if it has already been called. */
Packit 0848f5
int OPA_Shm_asymm_init(char *base);
Packit 0848f5
Packit 0848f5
/* Relative addressing macros.  These are for manipulating addresses relative
Packit 0848f5
   to the start of a shared memory region. */
Packit 0848f5
/* Offset value that encodes "no address" in a relative pointer. */
#define OPA_SHM_REL_NULL (0x0)

/* Nonzero when the offset loaded from rel_ptr equals OPA_SHM_REL_NULL. */
#define OPA_SHM_IS_REL_NULL(rel_ptr) \
    (OPA_load_ptr(&(rel_ptr).offset) == OPA_SHM_REL_NULL)

/* Stores OPA_SHM_REL_NULL into the offset of rel_ptr. */
#define OPA_SHM_SET_REL_NULL(rel_ptr) \
    (OPA_store_ptr(&(rel_ptr).offset, OPA_SHM_REL_NULL))

/* Nonzero when the offsets loaded from both relative pointers compare equal. */
#define OPA_SHM_REL_ARE_EQUAL(rel_ptr1, rel_ptr2) \
    (OPA_load_ptr(&(rel_ptr1).offset) == OPA_load_ptr(&(rel_ptr2).offset))
Packit 0848f5
Packit 0848f5
/* This structure exists such that it is possible to expand the expressiveness
Packit 0848f5
   of a relative address at some point in the future.  It also provides a
Packit 0848f5
   modicum of type safety to help prevent certain flavors of errors.
Packit 0848f5
   
Packit 0848f5
   For example, instead of referencing an offset from a global base address, it
Packit 0848f5
   might make sense for there to be multiple base addresses.  These base
Packit 0848f5
   addresses could correspond to the start of a segment or region of shared
Packit 0848f5
   memory.  This structure could store the segment number that is used to lookup
Packit 0848f5
   a base address in a non-shared table.  Note that you would have to be very
Packit 0848f5
   careful about all of this because if you add the segment number as a separate
Packit 0848f5
   field you can no longer (compare and) swap a relative address atomically.  So
Packit 0848f5
   you'll either have to shave bits from the pointer or make some sort of
Packit 0848f5
   requirement that relative addresses can only be swapped within the same
Packit 0848f5
   segment.  */
Packit 0848f5
typedef struct OPA_Shm_rel_addr_t {
Packit 0848f5
    OPA_ptr_t offset;
Packit 0848f5
} OPA_Shm_rel_addr_t;
Packit 0848f5
Packit 0848f5
/* converts a relative pointer to an absolute pointer */
Packit 0848f5
static _opa_inline
Packit 0848f5
void *OPA_Shm_rel_to_abs(OPA_Shm_rel_addr_t r)
Packit 0848f5
{
Packit 0848f5
    void *offset = OPA_load_ptr(&r.offset);
Packit 0848f5
    OPA_assert((size_t)OPA_Shm_asymm_base_addr != OPA_SHM_ASYMM_NULL_VAL);
Packit 0848f5
    return (void*)(OPA_Shm_asymm_base_addr + (size_t)offset);
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
/* converts an absolute pointer to a relative pointer */
Packit 0848f5
static _opa_inline
Packit 0848f5
OPA_Shm_rel_addr_t OPA_Shm_abs_to_rel(void *a)
Packit 0848f5
{
Packit 0848f5
    OPA_Shm_rel_addr_t ret;
Packit 0848f5
Packit 0848f5
    OPA_assert((size_t)OPA_Shm_asymm_base_addr != OPA_SHM_ASYMM_NULL_VAL);
Packit 0848f5
    OPA_store_ptr(&ret.offset, (void*)((size_t)a - (size_t)OPA_Shm_asymm_base_addr));
Packit 0848f5
    return ret;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
static _opa_inline
Packit 0848f5
OPA_Shm_rel_addr_t OPA_Shm_swap_rel(OPA_Shm_rel_addr_t *addr, OPA_Shm_rel_addr_t newv) {
Packit 0848f5
    OPA_Shm_rel_addr_t oldv;
Packit 0848f5
    OPA_store_ptr(&oldv.offset, OPA_swap_ptr(&addr->offset, OPA_load_ptr(&newv.offset)));
Packit 0848f5
    return oldv;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
/* Compare the relative pointer to oldv and swap in (relative) null if equal.  Prevents
Packit 0848f5
   the guts of the _rel_addr_t abstraction from bleeding up into the
Packit 0848f5
   enqueue/dequeue operations. */
Packit 0848f5
static _opa_inline
Packit 0848f5
OPA_Shm_rel_addr_t OPA_Shm_cas_rel_null(OPA_Shm_rel_addr_t *addr, OPA_Shm_rel_addr_t oldv) {
Packit 0848f5
    OPA_Shm_rel_addr_t prev;
Packit 0848f5
    OPA_store_ptr(&prev.offset, OPA_cas_ptr(&(addr->offset), OPA_load_ptr(&oldv.offset), (void*)OPA_SHM_REL_NULL));
Packit 0848f5
    return prev;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
/* The shadow head pointer makes this queue implementation perform much better
Packit 0848f5
   than a standard queue.  Unfortunately, it also makes it a bit non-intuitive
Packit 0848f5
   when reading the code.  The following is an excerpt from "Design and Evaluation
Packit 0848f5
   of Nemesis,  a Scalable, Low-Latency, Message-Passing Communication
Packit 0848f5
   Subsystem" by D. Buntinas, G.  Mercier, and W. Gropp that gives an
Packit 0848f5
   explanation:
Packit 0848f5
Packit 0848f5
      A process must access both the head and tail of the queue when it is
Packit 0848f5
      enqueuing an element on an empty queue or when it is dequeuing an element
Packit 0848f5
      that is the last element in the queue. In these cases, if the head and
Packit 0848f5
      tail were in the same cache line, only one L2 cache miss would be
Packit 0848f5
      encountered. If the queue has more elements in it, however, then the
Packit 0848f5
      enqueuer only needs to access the tail, and the dequeuer only needs to
Packit 0848f5
      access the head. If the head and tail were in the same cache line, then
Packit 0848f5
      there would be L2 misses encountered as a result of false sharing each
Packit 0848f5
      time a process enqueues an element after another has been dequeued from
Packit 0848f5
      the same queue, and vice versa. In this case it would be better if the
Packit 0848f5
      head and tail were in separate cache lines.
Packit 0848f5
Packit 0848f5
      Our solution is to put the head and tail in the same cache line and have a
Packit 0848f5
      shadow head pointer in a separate cache line. The shadow head is
Packit 0848f5
      initialized to NULL. The dequeuer uses the shadow head in place of the
Packit 0848f5
      real head except when the shadow head is NULL, meaning that the queue has
Packit 0848f5
      become empty. If the shadow head is NULL when the dequeuer tries to
Packit 0848f5
      dequeue, it checks the value of the real head. If the real head is not
Packit 0848f5
      NULL, meaning that an element has been enqueued on the queue since the
Packit 0848f5
      last time the queue became empty, the dequeuer initializes its shadow head
Packit 0848f5
      to the value of the real head and sets the real head to NULL. In this way,
Packit 0848f5
      only one L2 cache miss is encountered when enqueuing onto an empty queue
Packit 0848f5
      or dequeuing from a queue with one element. And because the tail and
Packit 0848f5
      shadow head are in separate cache lines, there are no L2 cache misses from
Packit 0848f5
      false sharing. 
Packit 0848f5
Packit 0848f5
      We found that using a shadow head pointer reduced one-way latency by about
Packit 0848f5
      200 ns on a dual 2 GHz Xeon node.
Packit 0848f5
*/
Packit 0848f5
Packit 0848f5
/* Pick an arbitrary cacheline size for now, we can setup a mechanism to detect
Packit 0848f5
   it at build time later on.  This should work well on most intel systems at
Packit 0848f5
   the very least. */
Packit 0848f5
#define OPA_QUEUE_CACHELINE_PADDING 128
Packit 0848f5
Packit 0848f5
/* All absolute and relative pointers point to the start of the enclosing element. */
Packit 0848f5
typedef struct OPA_Queue_info_t {
Packit 0848f5
    OPA_Shm_rel_addr_t head; /* holds the offset pointer, not the abs ptr */
Packit 0848f5
    OPA_Shm_rel_addr_t tail; /* holds the offset pointer, not the abs ptr */
Packit 0848f5
    char padding1[OPA_QUEUE_CACHELINE_PADDING-2*sizeof(OPA_Shm_rel_addr_t)];
Packit 0848f5
    OPA_Shm_rel_addr_t  shadow_head; /* holds the offset pointer, not the abs ptr */
Packit 0848f5
    char padding2[OPA_QUEUE_CACHELINE_PADDING-sizeof(OPA_Shm_rel_addr_t)];
Packit 0848f5
} OPA_Queue_info_t;
Packit 0848f5
Packit 0848f5
/* Using this header struct even though it's just one element gives us the
Packit 0848f5
   opportunity to vary the implementation more easily in the future without
Packit 0848f5
   updating all callers. */
Packit 0848f5
typedef struct OPA_Queue_element_hdr_t {
Packit 0848f5
    OPA_Shm_rel_addr_t next; /* holds the offset pointer, not the abs ptr */
Packit 0848f5
} OPA_Queue_element_hdr_t;
Packit 0848f5
Packit 0848f5
Packit 0848f5
/* Used to initialize a queue structure. */
Packit 0848f5
void OPA_Queue_init(OPA_Queue_info_t *qhead);
Packit 0848f5
Packit 0848f5
/* Used to initialize a queue element header. */
Packit 0848f5
void OPA_Queue_header_init(OPA_Queue_element_hdr_t *hdr);
Packit 0848f5
Packit 0848f5
static _opa_inline
Packit 0848f5
int OPA_Queue_is_empty(OPA_Queue_info_t *qhead)
Packit 0848f5
{
Packit 0848f5
    int __ret = 0;
Packit 0848f5
    if (OPA_SHM_IS_REL_NULL (qhead->shadow_head)) {
Packit 0848f5
        if (OPA_SHM_IS_REL_NULL(qhead->head)) {
Packit 0848f5
            __ret = 1;
Packit 0848f5
        }
Packit 0848f5
        else {
Packit 0848f5
            qhead->shadow_head = qhead->head;
Packit 0848f5
            OPA_SHM_SET_REL_NULL(qhead->head); /* reset it for next time */
Packit 0848f5
        }
Packit 0848f5
    }
Packit 0848f5
    return __ret;
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
/* Returns a pointer to the element at the head of the queue.  The current
Packit 0848f5
   implementation of these queue algorithms imposes several notable
Packit 0848f5
   restrictions on the use of this function:
Packit 0848f5
    - The caller must currently hold the critical section, just as if you were
Packit 0848f5
      calling OPA_Queue_dequeue.  Failure to do so could easily produce incorrect
Packit 0848f5
      results.
Packit 0848f5
    - OPA_Queue_is_empty must be called on this queue prior to calling this
Packit 0848f5
      function.  Furthermore, there must be no intervening calls to any
Packit 0848f5
      OPA_Queue_* functions (for this queue) between _is_empty and _peek_head.
Packit 0848f5
      Failure to do so will produce incorrect results.
Packit 0848f5
Packit 0848f5
   This operation is effectively the same as the dequeue operation (insofar as
Packit 0848f5
   it shares the same calling restrictions) but it does not disturb the actual
Packit 0848f5
   contents of the queue. */
Packit 0848f5
static _opa_inline
Packit 0848f5
void *OPA_Queue_peek_head(OPA_Queue_info_t *qhead_ptr)
Packit 0848f5
{
Packit 0848f5
    OPA_assert(qhead_ptr != NULL);
Packit 0848f5
Packit 0848f5
    if (OPA_SHM_IS_REL_NULL(qhead_ptr->shadow_head)) {
Packit 0848f5
        return NULL;
Packit 0848f5
    }
Packit 0848f5
    return OPA_Shm_rel_to_abs(qhead_ptr->shadow_head);
Packit 0848f5
}
Packit 0848f5
Packit 0848f5
/* This macro atomically enqueues an element (elt for short) into the queue
Packit 0848f5
   indicated by qhead_ptr.  You need to pass several arguments:
Packit 0848f5
     qhead_ptr - a pointer to a OPA_Queue_info_t structure to which the
Packit 0848f5
                 element should be enqueued
Packit 0848f5
     elt_ptr - a pointer to an element structure type that is to be enqueued
Packit 0848f5
     elt_type - The base type of elt_ptr.  That is, if elt_ptr is a
Packit 0848f5
                '(struct example_t *)' then elt_type is 'struct example_t'.
Packit 0848f5
     elt_hdr_field - This is the member name of elt_type that is a
Packit 0848f5
                     OPA_Queue_element_hdr_t.
Packit 0848f5
Packit 0848f5
    This queue implementation is loosely modeled after the linked lists used in
Packit 0848f5
    the linux kernel.  You put the list header structure inside of the client
Packit 0848f5
    structure, rather than the other way around.
Packit 0848f5
   */
Packit 0848f5
/* Expansion notes:
     - qhead_ptr and elt_ptr are each evaluated exactly once and cached in
       block-local pointers, so argument expressions with side effects are
       not repeated.
     - Every local carries an opa_enq_ prefix to make it unlikely that it
       shadows a caller variable that appears in one of the arguments.
     - The sequence of steps is significant; do not reorder it. */
#define OPA_Queue_enqueue(qhead_ptr, elt_ptr, elt_type, elt_hdr_field)    \
do {                                                                      \
    OPA_Queue_info_t *opa_enq_q_ = (qhead_ptr);                           \
    elt_type *opa_enq_elt_ = (elt_ptr);                                   \
    OPA_Shm_rel_addr_t opa_enq_r_prev_;                                   \
    OPA_Shm_rel_addr_t opa_enq_r_elt_ = OPA_Shm_abs_to_rel(opa_enq_elt_); \
                                                                          \
    /* the new element has no successor yet */                            \
    OPA_SHM_SET_REL_NULL(opa_enq_elt_->elt_hdr_field.next);               \
                                                                          \
    /* barrier between clearing next and publishing via the tail swap */  \
    OPA_write_barrier();                                                  \
                                                                          \
    /* install the new element as tail and fetch the previous tail */     \
    opa_enq_r_prev_ = OPA_Shm_swap_rel(&(opa_enq_q_->tail),               \
                                       opa_enq_r_elt_);                   \
                                                                          \
    if (OPA_SHM_IS_REL_NULL(opa_enq_r_prev_)) {                           \
        /* there was no previous tail: the element is also the head */    \
        opa_enq_q_->head = opa_enq_r_elt_;                                \
    }                                                                     \
    else {                                                                \
        /* link the previous tail to the new element */                   \
        elt_type *opa_enq_abs_prev_ =                                     \
            (elt_type *)OPA_Shm_rel_to_abs(opa_enq_r_prev_);              \
        opa_enq_abs_prev_->elt_hdr_field.next = opa_enq_r_elt_;           \
    }                                                                     \
} while (0)
Packit 0848f5
Packit 0848f5
/* This macro atomically dequeues an element (elt for short) from the queue
Packit 0848f5
   indicated by qhead_ptr.  You need to pass several arguments:
Packit 0848f5
     qhead_ptr - a pointer to a OPA_Queue_info_t structure from which the
Packit 0848f5
                 element should be dequeued
Packit 0848f5
     elt_ptr - A pointer to an element structure type that should be populated
Packit 0848f5
               with the dequeued value.  Must be an lvalue.
Packit 0848f5
     elt_type - The base type of elt_ptr.  That is, if elt_ptr is a
Packit 0848f5
                '(struct example_t *)' then elt_type is 'struct example_t'.
Packit 0848f5
     elt_hdr_field - This is the member name of elt_type that is a
Packit 0848f5
                     OPA_Queue_element_hdr_t.
Packit 0848f5
Packit 0848f5
    This queue implementation is loosely modeled after the linked lists used in
Packit 0848f5
    the linux kernel.  You put the list header structure inside of the client
Packit 0848f5
    structure, rather than the other way around.
Packit 0848f5
Packit 0848f5
    NOTE: you must *always* call _is_empty() prior to this function */
Packit 0848f5
/* Expansion notes:
     - qhead_ptr is evaluated exactly once and cached; elt_ptr is evaluated
       exactly once, as the target of the final assignment.
     - Every local carries an opa_deq_ prefix to make it unlikely that it
       shadows the caller's elt_ptr lvalue (a caller variable with the same
       name as a macro local would otherwise never receive the result).
     - The result of OPA_Shm_rel_to_abs is cast to (elt_type *), matching
       OPA_Queue_enqueue.
     - The sequence of steps is significant; do not reorder it. */
#define OPA_Queue_dequeue(qhead_ptr, elt_ptr, elt_type, elt_hdr_field)        \
do {                                                                          \
    OPA_Queue_info_t *opa_deq_q_ = (qhead_ptr);                               \
    elt_type *opa_deq_elt_;                                                   \
    OPA_Shm_rel_addr_t opa_deq_r_elt_;                                        \
                                                                              \
    /* the element to hand out is the one the shadow head refers to */        \
    opa_deq_r_elt_ = opa_deq_q_->shadow_head;                                 \
    opa_deq_elt_ = (elt_type *)OPA_Shm_rel_to_abs(opa_deq_r_elt_);            \
                                                                              \
    if (!OPA_SHM_IS_REL_NULL(opa_deq_elt_->elt_hdr_field.next)) {             \
        /* a successor is already linked: advance the shadow head to it */    \
        opa_deq_q_->shadow_head = opa_deq_elt_->elt_hdr_field.next;           \
    }                                                                         \
    else {                                                                    \
        OPA_Shm_rel_addr_t opa_deq_old_tail_;                                 \
                                                                              \
        OPA_SHM_SET_REL_NULL(opa_deq_q_->shadow_head);                        \
                                                                              \
        /* reset tail to null only if it still refers to this element */      \
        opa_deq_old_tail_ = OPA_Shm_cas_rel_null(&(opa_deq_q_->tail),         \
                                                 opa_deq_r_elt_);             \
                                                                              \
        if (!OPA_SHM_REL_ARE_EQUAL(opa_deq_old_tail_, opa_deq_r_elt_)) {      \
            /* tail had already moved on: spin until next gets linked */      \
            while (OPA_SHM_IS_REL_NULL(opa_deq_elt_->elt_hdr_field.next)) {   \
                OPA_busy_wait();                                              \
            }                                                                 \
            opa_deq_q_->shadow_head = opa_deq_elt_->elt_hdr_field.next;       \
        }                                                                     \
    }                                                                         \
    /* detach the element from the queue before handing it to the caller */   \
    OPA_SHM_SET_REL_NULL(opa_deq_elt_->elt_hdr_field.next);                   \
    OPA_read_barrier();                                                       \
    (elt_ptr) = opa_deq_elt_;                                                 \
} while (0)
Packit 0848f5
Packit 0848f5
#endif /* OPA_QUEUE_H_INCLUDED */