#ifndef _LTTNG_RING_BUFFER_BACKEND_INTERNAL_H #define _LTTNG_RING_BUFFER_BACKEND_INTERNAL_H /* * libringbuffer/backend_internal.h * * Ring buffer backend (internal helpers). * * Copyright (C) 2005-2012 Mathieu Desnoyers * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; only * version 2.1 of the License. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include "backend_types.h" #include "frontend_types.h" #include "shm.h" /* Ring buffer backend API presented to the frontend */ /* Ring buffer and channel backend create/free */ int lib_ring_buffer_backend_create(struct lttng_ust_lib_ring_buffer_backend *bufb, struct channel_backend *chan, int cpu, struct lttng_ust_shm_handle *handle, struct shm_object *shmobj); void channel_backend_unregister_notifiers(struct channel_backend *chanb); void lib_ring_buffer_backend_free(struct lttng_ust_lib_ring_buffer_backend *bufb); int channel_backend_init(struct channel_backend *chanb, const char *name, const struct lttng_ust_lib_ring_buffer_config *config, size_t subbuf_size, size_t num_subbuf, struct lttng_ust_shm_handle *handle, const int *stream_fds); void channel_backend_free(struct channel_backend *chanb, struct lttng_ust_shm_handle *handle); void lib_ring_buffer_backend_reset(struct lttng_ust_lib_ring_buffer_backend *bufb, struct lttng_ust_shm_handle *handle); void channel_backend_reset(struct channel_backend *chanb); int lib_ring_buffer_backend_init(void); void lib_ring_buffer_backend_exit(void); extern void _lib_ring_buffer_write(struct lttng_ust_lib_ring_buffer_backend *bufb, size_t offset, const void *src, size_t len, ssize_t pagecpy); /* * Subbuffer ID bits for overwrite mode. Need to fit within a single word to be * exchanged atomically. * * Top half word, except lowest bit, belongs to "offset", which is used to keep * to count the produced buffers. For overwrite mode, this provides the * consumer with the capacity to read subbuffers in order, handling the * situation where producers would write up to 2^15 buffers (or 2^31 for 64-bit * systems) concurrently with a single execution of get_subbuf (between offset * sampling and subbuffer ID exchange). */ #define HALF_ULONG_BITS (CAA_BITS_PER_LONG >> 1) #define SB_ID_OFFSET_SHIFT (HALF_ULONG_BITS + 1) #define SB_ID_OFFSET_COUNT (1UL << SB_ID_OFFSET_SHIFT) #define SB_ID_OFFSET_MASK (~(SB_ID_OFFSET_COUNT - 1)) /* * Lowest bit of top word half belongs to noref. Used only for overwrite mode. */ #define SB_ID_NOREF_SHIFT (SB_ID_OFFSET_SHIFT - 1) #define SB_ID_NOREF_COUNT (1UL << SB_ID_NOREF_SHIFT) #define SB_ID_NOREF_MASK SB_ID_NOREF_COUNT /* * In overwrite mode: lowest half of word is used for index. * Limit of 2^16 subbuffers per buffer on 32-bit, 2^32 on 64-bit. * In producer-consumer mode: whole word used for index. */ #define SB_ID_INDEX_SHIFT 0 #define SB_ID_INDEX_COUNT (1UL << SB_ID_INDEX_SHIFT) #define SB_ID_INDEX_MASK (SB_ID_NOREF_COUNT - 1) /* * Construct the subbuffer id from offset, index and noref. Use only the index * for producer-consumer mode (offset and noref are only used in overwrite * mode). */ static inline unsigned long subbuffer_id(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long offset, unsigned long noref, unsigned long index) { if (config->mode == RING_BUFFER_OVERWRITE) return (offset << SB_ID_OFFSET_SHIFT) | (noref << SB_ID_NOREF_SHIFT) | index; else return index; } /* * Compare offset with the offset contained within id. Return 1 if the offset * bits are identical, else 0. */ static inline int subbuffer_id_compare_offset(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long id, unsigned long offset) { return (id & SB_ID_OFFSET_MASK) == (offset << SB_ID_OFFSET_SHIFT); } static inline unsigned long subbuffer_id_get_index(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long id) { if (config->mode == RING_BUFFER_OVERWRITE) return id & SB_ID_INDEX_MASK; else return id; } static inline unsigned long subbuffer_id_is_noref(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long id) { if (config->mode == RING_BUFFER_OVERWRITE) return !!(id & SB_ID_NOREF_MASK); else return 1; } /* * Only used by reader on subbuffer ID it has exclusive access to. No volatile * needed. */ static inline void subbuffer_id_set_noref(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long *id) { if (config->mode == RING_BUFFER_OVERWRITE) *id |= SB_ID_NOREF_MASK; } static inline void subbuffer_id_set_noref_offset(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long *id, unsigned long offset) { unsigned long tmp; if (config->mode == RING_BUFFER_OVERWRITE) { tmp = *id; tmp &= ~SB_ID_OFFSET_MASK; tmp |= offset << SB_ID_OFFSET_SHIFT; tmp |= SB_ID_NOREF_MASK; /* Volatile store, read concurrently by readers. */ CMM_ACCESS_ONCE(*id) = tmp; } } /* No volatile access, since already used locally */ static inline void subbuffer_id_clear_noref(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long *id) { if (config->mode == RING_BUFFER_OVERWRITE) *id &= ~SB_ID_NOREF_MASK; } /* * For overwrite mode, cap the number of subbuffers per buffer to: * 2^16 on 32-bit architectures * 2^32 on 64-bit architectures * This is required to fit in the index part of the ID. Return 0 on success, * -EPERM on failure. */ static inline int subbuffer_id_check_index(const struct lttng_ust_lib_ring_buffer_config *config, unsigned long num_subbuf) { if (config->mode == RING_BUFFER_OVERWRITE) return (num_subbuf > (1UL << HALF_ULONG_BITS)) ? -EPERM : 0; else return 0; } static inline void subbuffer_count_record(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, struct lttng_ust_shm_handle *handle) { unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, shmp_index(handle, bufb->buf_wsb, idx)->id); v_inc(config, &shmp(handle, shmp_index(handle, bufb->array, sb_bindex)->shmp)->records_commit); } /* * Reader has exclusive subbuffer access for record consumption. No need to * perform the decrement atomically. */ static inline void subbuffer_consume_record(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, struct lttng_ust_shm_handle *handle) { unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); CHAN_WARN_ON(shmp(handle, bufb->chan), !v_read(config, &shmp(handle, shmp_index(handle, bufb->array, sb_bindex)->shmp)->records_unread)); /* Non-atomic decrement protected by exclusive subbuffer access */ _v_dec(config, &shmp(handle, shmp_index(handle, bufb->array, sb_bindex)->shmp)->records_unread); v_inc(config, &bufb->records_read); } static inline unsigned long subbuffer_get_records_count( const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, struct lttng_ust_shm_handle *handle) { unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, shmp_index(handle, bufb->buf_wsb, idx)->id); return v_read(config, &shmp(handle, shmp_index(handle, bufb->array, sb_bindex)->shmp)->records_commit); } /* * Must be executed at subbuffer delivery when the writer has _exclusive_ * subbuffer access. See lib_ring_buffer_check_deliver() for details. * lib_ring_buffer_get_records_count() must be called to get the records * count before this function, because it resets the records_commit * count. */ static inline unsigned long subbuffer_count_records_overrun( const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, struct lttng_ust_shm_handle *handle) { struct lttng_ust_lib_ring_buffer_backend_pages_shmp *pages; unsigned long overruns, sb_bindex; sb_bindex = subbuffer_id_get_index(config, shmp_index(handle, bufb->buf_wsb, idx)->id); pages = shmp_index(handle, bufb->array, sb_bindex); overruns = v_read(config, &shmp(handle, pages->shmp)->records_unread); v_set(config, &shmp(handle, pages->shmp)->records_unread, v_read(config, &shmp(handle, pages->shmp)->records_commit)); v_set(config, &shmp(handle, pages->shmp)->records_commit, 0); return overruns; } static inline void subbuffer_set_data_size(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, unsigned long data_size, struct lttng_ust_shm_handle *handle) { struct lttng_ust_lib_ring_buffer_backend_pages_shmp *pages; unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, shmp_index(handle, bufb->buf_wsb, idx)->id); pages = shmp_index(handle, bufb->array, sb_bindex); shmp(handle, pages->shmp)->data_size = data_size; } static inline unsigned long subbuffer_get_read_data_size( const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, struct lttng_ust_shm_handle *handle) { struct lttng_ust_lib_ring_buffer_backend_pages_shmp *pages; unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); pages = shmp_index(handle, bufb->array, sb_bindex); return shmp(handle, pages->shmp)->data_size; } static inline unsigned long subbuffer_get_data_size( const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, struct lttng_ust_shm_handle *handle) { struct lttng_ust_lib_ring_buffer_backend_pages_shmp *pages; unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, shmp_index(handle, bufb->buf_wsb, idx)->id); pages = shmp_index(handle, bufb->array, sb_bindex); return shmp(handle, pages->shmp)->data_size; } static inline void subbuffer_inc_packet_count(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, struct lttng_ust_shm_handle *handle) { shmp_index(handle, bufb->buf_cnt, idx)->seq_cnt++; } /** * lib_ring_buffer_clear_noref - Clear the noref subbuffer flag, called by * writer. */ static inline void lib_ring_buffer_clear_noref(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, struct lttng_ust_shm_handle *handle) { unsigned long id, new_id; if (config->mode != RING_BUFFER_OVERWRITE) return; /* * Performing a volatile access to read the sb_pages, because we want to * read a coherent version of the pointer and the associated noref flag. */ id = CMM_ACCESS_ONCE(shmp_index(handle, bufb->buf_wsb, idx)->id); for (;;) { /* This check is called on the fast path for each record. */ if (caa_likely(!subbuffer_id_is_noref(config, id))) { /* * Store after load dependency ordering the writes to * the subbuffer after load and test of the noref flag * matches the memory barrier implied by the cmpxchg() * in update_read_sb_index(). */ return; /* Already writing to this buffer */ } new_id = id; subbuffer_id_clear_noref(config, &new_id); new_id = uatomic_cmpxchg(&shmp_index(handle, bufb->buf_wsb, idx)->id, id, new_id); if (caa_likely(new_id == id)) break; id = new_id; } } /** * lib_ring_buffer_set_noref_offset - Set the noref subbuffer flag and offset, * called by writer. */ static inline void lib_ring_buffer_set_noref_offset(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, unsigned long idx, unsigned long offset, struct lttng_ust_shm_handle *handle) { if (config->mode != RING_BUFFER_OVERWRITE) return; /* * Because ring_buffer_set_noref() is only called by a single thread * (the one which updated the cc_sb value), there are no concurrent * updates to take care of: other writers have not updated cc_sb, so * they cannot set the noref flag, and concurrent readers cannot modify * the pointer because the noref flag is not set yet. * The smp_wmb() in ring_buffer_commit() takes care of ordering writes * to the subbuffer before this set noref operation. * subbuffer_set_noref() uses a volatile store to deal with concurrent * readers of the noref flag. */ CHAN_WARN_ON(shmp(handle, bufb->chan), subbuffer_id_is_noref(config, shmp_index(handle, bufb->buf_wsb, idx)->id)); /* * Memory barrier that ensures counter stores are ordered before set * noref and offset. */ cmm_smp_mb(); subbuffer_id_set_noref_offset(config, &shmp_index(handle, bufb->buf_wsb, idx)->id, offset); } /** * update_read_sb_index - Read-side subbuffer index update. */ static inline int update_read_sb_index(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer_backend *bufb, struct channel_backend *chanb, unsigned long consumed_idx, unsigned long consumed_count, struct lttng_ust_shm_handle *handle) { unsigned long old_id, new_id; if (config->mode == RING_BUFFER_OVERWRITE) { /* * Exchange the target writer subbuffer with our own unused * subbuffer. No need to use CMM_ACCESS_ONCE() here to read the * old_wpage, because the value read will be confirmed by the * following cmpxchg(). */ old_id = shmp_index(handle, bufb->buf_wsb, consumed_idx)->id; if (caa_unlikely(!subbuffer_id_is_noref(config, old_id))) return -EAGAIN; /* * Make sure the offset count we are expecting matches the one * indicated by the writer. */ if (caa_unlikely(!subbuffer_id_compare_offset(config, old_id, consumed_count))) return -EAGAIN; CHAN_WARN_ON(shmp(handle, bufb->chan), !subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id, consumed_count); new_id = uatomic_cmpxchg(&shmp_index(handle, bufb->buf_wsb, consumed_idx)->id, old_id, bufb->buf_rsb.id); if (caa_unlikely(old_id != new_id)) return -EAGAIN; bufb->buf_rsb.id = new_id; } else { /* No page exchange, use the writer page directly */ bufb->buf_rsb.id = shmp_index(handle, bufb->buf_wsb, consumed_idx)->id; } return 0; } #ifndef inline_memcpy #define inline_memcpy(dest, src, n) memcpy(dest, src, n) #endif /* * Use the architecture-specific memcpy implementation for constant-sized * inputs, but rely on an inline memcpy for length statically unknown. * The function call to memcpy is just way too expensive for a fast path. */ #define lib_ring_buffer_do_copy(config, dest, src, len) \ do { \ size_t __len = (len); \ if (__builtin_constant_p(len)) \ memcpy(dest, src, __len); \ else \ inline_memcpy(dest, src, __len); \ } while (0) /* * write len bytes to dest with c */ static inline void lib_ring_buffer_do_memset(char *dest, int c, unsigned long len) { unsigned long i; for (i = 0; i < len; i++) dest[i] = c; } /* arch-agnostic implementation */ static inline int lttng_ust_fls(unsigned int x) { int r = 32; if (!x) return 0; if (!(x & 0xFFFF0000U)) { x <<= 16; r -= 16; } if (!(x & 0xFF000000U)) { x <<= 8; r -= 8; } if (!(x & 0xF0000000U)) { x <<= 4; r -= 4; } if (!(x & 0xC0000000U)) { x <<= 2; r -= 2; } if (!(x & 0x80000000U)) { /* No need to bit shift on last operation */ r -= 1; } return r; } static inline int get_count_order(unsigned int count) { int order; order = lttng_ust_fls(count) - 1; if (count & (count - 1)) order++; return order; } #endif /* _LTTNG_RING_BUFFER_BACKEND_INTERNAL_H */