/** * Copyright (C) Mellanox Technologies Ltd. 2001-2014. ALL RIGHTS RESERVED. * * See file LICENSE for terms. */ #ifndef UCS_MPMC_H #define UCS_MPMC_H #include <ucs/type/status.h> #include <ucs/sys/math.h> #define UCS_MPMC_VALID_SHIFT 31 #define UCS_MPMC_VALUE_MAX UCS_BIT(UCS_MPMC_VALID_SHIFT) /** * A Multi-producer-multi-consumer thread-safe queue. * Every push/pull is a single atomic operation in "good" scenario. * The queue can contain small integers up to UCS_MPMC_VALUE_MAX. * * TODO make the queue resizeable. */ typedef struct ucs_mpmc_queue { uint32_t length; /* Array size. Rounded to power of 2. */ int shift; volatile uint32_t producer; /* Producer index */ volatile uint32_t consumer; /* Consumer index */ uint32_t *queue; /* Array of data */ } ucs_mpmc_queue_t; /** * Initialize MPMC queue. * * @param length Queue length. */ ucs_status_t ucs_mpmc_queue_init(ucs_mpmc_queue_t *mpmc, uint32_t length); /** * Destroy MPMC queue. */ void ucs_mpmc_queue_cleanup(ucs_mpmc_queue_t *mpmc); /** * Atomically push a value to the queue. * * @param value Value to push. * @return UCS_ERR_EXCEEDS_LIMIT if the queue is full. */ ucs_status_t ucs_mpmc_queue_push(ucs_mpmc_queue_t *mpmc, uint32_t value); /** * Atomically pull a value from the queue. * * @param value_p Filled with the value, if successful. * @param UCS_ERR_NO_PROGRESS if there is currently no available item to retrieve, * or another thread removed the current item. */ ucs_status_t ucs_mpmc_queue_pull(ucs_mpmc_queue_t *mpmc, uint32_t *value_p); /** * @retrurn nonzero if queue is empty, 0 if queue *may* be non-empty. */ static inline int ucs_mpmc_queue_is_empty(ucs_mpmc_queue_t *mpmc) { return mpmc->producer == mpmc->consumer; } #endif