Blob Blame History Raw
/*
 * Copyright (C) 2010-2011 Red Hat, Inc.
 *
 * Author: Angus Salkeld <asalkeld@redhat.com>
 *
 * This file is part of libqb.
 *
 * libqb is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 2.1 of the License, or
 * (at your option) any later version.
 *
 * libqb is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "ringbuffer_int.h"
#include <qb/qbdefs.h>
#include "atomic_int.h"

#define QB_RB_FILE_HEADER_VERSION 1

/*
 * #define CRAZY_DEBUG_PRINTFS 1
 */
#ifdef CRAZY_DEBUG_PRINTFS
#define DEBUG_PRINTF(format, args...)	\
do {				\
	printf(format, ##args);	\
} while(0)
#else
#define DEBUG_PRINTF(format, args...)
#endif /* CRAZY_DEBUG_PRINTFS */

/*
 * move the write pointer to the next 128 byte boundary
 * write_pt goes in 4 bytes (sizeof(uint32_t))
 * #define USE_CACHE_LINE_ALIGNMENT 1
 */
#ifdef USE_CACHE_LINE_ALIGNMENT
#define QB_CACHE_LINE_SIZE 128
#define QB_CACHE_LINE_WORDS (QB_CACHE_LINE_SIZE/sizeof(uint32_t))
#define idx_cache_line_step(idx)	\
do {					\
	if (idx % QB_CACHE_LINE_WORDS) {			\
		idx += (QB_CACHE_LINE_WORDS - (idx % QB_CACHE_LINE_WORDS));	\
	}				\
	if (idx > (rb->shared_hdr->word_size - 1)) {		\
		idx = ((idx) % (rb->shared_hdr->word_size));	\
	}						\
} while (0)
#else
#define QB_CACHE_LINE_SIZE 0
#define QB_CACHE_LINE_WORDS 0
#define idx_cache_line_step(idx)			\
do {							\
	if (idx > (rb->shared_hdr->word_size - 1)) {		\
		idx = ((idx) % (rb->shared_hdr->word_size));	\
	}						\
} while (0)
#endif


/* the chunk header is two words
 * 1) the chunk data size
 * 2) the magic number
 */
#define QB_RB_CHUNK_HEADER_WORDS 2
#define QB_RB_CHUNK_HEADER_SIZE (sizeof(uint32_t) * QB_RB_CHUNK_HEADER_WORDS)
/*
 * margin is the gap we leave when checking to see if we have enough
 * space for a new chunk.
 * So:
 * qb_rb_space_free() >= QB_RB_CHUNK_MARGIN + new data chunk
 * The extra word size is to allow for non word sized data chunks.
 * QB_CACHE_LINE_WORDS is to make sure we have space to align the
 * chunk.
 */
#define QB_RB_WORD_ALIGN 1
#define QB_RB_CHUNK_MARGIN (sizeof(uint32_t) * (QB_RB_CHUNK_HEADER_WORDS +\
						QB_RB_WORD_ALIGN +\
						QB_CACHE_LINE_WORDS))
#define QB_RB_CHUNK_MAGIC		0xA1A1A1A1
#define QB_RB_CHUNK_MAGIC_DEAD		0xD0D0D0D0
#define QB_RB_CHUNK_MAGIC_ALLOC		0xA110CED0
#define QB_RB_CHUNK_SIZE_GET(rb, pointer) rb->shared_data[pointer]
#define QB_RB_CHUNK_MAGIC_GET(rb, pointer) \
	qb_atomic_int_get_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \
                             QB_ATOMIC_ACQUIRE)
#define QB_RB_CHUNK_MAGIC_SET(rb, pointer, new_val) \
	qb_atomic_int_set_ex((int32_t*)&rb->shared_data[(pointer + 1) % rb->shared_hdr->word_size], \
			     new_val, QB_ATOMIC_RELEASE)
#define QB_RB_CHUNK_DATA_GET(rb, pointer) \
	&rb->shared_data[(pointer + QB_RB_CHUNK_HEADER_WORDS) % rb->shared_hdr->word_size]

#define QB_MAGIC_ASSERT(_ptr_) \
do {							\
	uint32_t chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, _ptr_); \
	if (chunk_magic != QB_RB_CHUNK_MAGIC) print_header(rb); \
	assert(chunk_magic == QB_RB_CHUNK_MAGIC); \
} while (0)

#define idx_step(idx)					\
do {							\
	if (idx > (rb->shared_hdr->word_size - 1)) {		\
		idx = ((idx) % (rb->shared_hdr->word_size));	\
	}						\
} while (0)

static void print_header(struct qb_ringbuffer_s * rb);
static int _rb_chunk_reclaim(struct qb_ringbuffer_s * rb);

qb_ringbuffer_t *
qb_rb_open(const char *name, size_t size, uint32_t flags,
	   size_t shared_user_data_size)
{
	return qb_rb_open_2(name, size, flags, shared_user_data_size, NULL);
}

qb_ringbuffer_t *
qb_rb_open_2(const char *name, size_t size, uint32_t flags,
	     size_t shared_user_data_size,
	     struct qb_rb_notifier *notifiers)
{
	struct qb_ringbuffer_s *rb;
	size_t real_size;
	size_t shared_size;
	char path[PATH_MAX];
	int32_t fd_hdr;
	int32_t fd_data;
	uint32_t file_flags = O_RDWR;
	char filename[PATH_MAX];
	int32_t error = 0;
	void *shm_addr;
	long page_size = sysconf(_SC_PAGESIZE);

#ifdef QB_ARCH_HPPA
	page_size = QB_MAX(page_size, 0x00400000); /* align to page colour */
#elif defined(QB_FORCE_SHM_ALIGN)
	page_size = QB_MAX(page_size, 16 * 1024);
#endif /* QB_FORCE_SHM_ALIGN */
	/* The user of this api expects the 'size' parameter passed into this function
	 * to be reflective of the max size single write we can do to the 
	 * ringbuffer.  This means we have to add both the 'margin' space used
	 * to calculate if there is enough space for a new chunk as well as the '+1' that
	 * prevents overlap of the read/write pointers */
	size += QB_RB_CHUNK_MARGIN + 1;
	real_size = QB_ROUNDUP(size, page_size);

	shared_size =
	    sizeof(struct qb_ringbuffer_shared_s) + shared_user_data_size;

	if (flags & QB_RB_FLAG_CREATE) {
		file_flags |= O_CREAT | O_TRUNC | O_EXCL;
	}

	rb = calloc(1, sizeof(struct qb_ringbuffer_s));
	if (rb == NULL) {
		return NULL;
	}

	/*
	 * Create a shared_hdr memory segment for the header.
	 */
	snprintf(filename, PATH_MAX, "%s-header", name);
	fd_hdr = qb_sys_mmap_file_open(path, filename,
				       shared_size, file_flags);
	if (fd_hdr < 0) {
		error = fd_hdr;
		qb_util_log(LOG_ERR, "couldn't create file for mmap");
		goto cleanup_hdr;
	}

	rb->shared_hdr = mmap(0,
			      shared_size,
			      PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);

	if (rb->shared_hdr == MAP_FAILED) {
		error = -errno;
		qb_util_log(LOG_ERR, "couldn't create mmap for header");
		goto cleanup_hdr;
	}
	qb_atomic_init();

	rb->flags = flags;

	/*
	 * create the semaphore
	 */
	if (flags & QB_RB_FLAG_CREATE) {
		rb->shared_data = NULL;
		/* rb->shared_hdr->word_size tracks data by ints and not bytes/chars. */
		rb->shared_hdr->word_size = real_size / sizeof(uint32_t);
		rb->shared_hdr->write_pt = 0;
		rb->shared_hdr->read_pt = 0;
		(void)strlcpy(rb->shared_hdr->hdr_path, path, PATH_MAX);
	}
	if (notifiers && notifiers->post_fn) {
		error = 0;
		memcpy(&rb->notifier,
		       notifiers,
		       sizeof(struct qb_rb_notifier));
	} else {
		error = qb_rb_sem_create(rb, flags);
	}
	if (error < 0) {
		errno = -error;
		qb_util_perror(LOG_ERR, "couldn't create a semaphore");
		goto cleanup_hdr;
	}

	/* Create the shared_data memory segment for the actual ringbuffer.
	 * They have to be separate.
	 */
	if (flags & QB_RB_FLAG_CREATE) {
		snprintf(filename, PATH_MAX, "%s-data", name);
		fd_data = qb_sys_mmap_file_open(path,
						filename,
						real_size, file_flags);
		(void)strlcpy(rb->shared_hdr->data_path, path, PATH_MAX);
	} else {
		fd_data = qb_sys_mmap_file_open(path,
						rb->shared_hdr->data_path,
						real_size, file_flags);
	}
	if (fd_data < 0) {
		error = fd_data;
		qb_util_log(LOG_ERR, "couldn't create file for mmap");
		goto cleanup_hdr;
	}

	qb_util_log(LOG_DEBUG,
		    "shm size:%ld; real_size:%ld; rb->word_size:%d", size,
		    real_size, rb->shared_hdr->word_size);

	/* this function closes fd_data */
	error = qb_sys_circular_mmap(fd_data, &shm_addr, real_size);
	rb->shared_data = shm_addr;
	if (error != 0) {
		qb_util_log(LOG_ERR, "couldn't create circular mmap on %s",
			    rb->shared_hdr->data_path);
		goto cleanup_data;
	}

	if (flags & QB_RB_FLAG_CREATE) {
		memset(rb->shared_data, 0, real_size);
		rb->shared_data[rb->shared_hdr->word_size] = 5;
		rb->shared_hdr->ref_count = 1;
	} else {
		qb_atomic_int_inc(&rb->shared_hdr->ref_count);
	}

	close(fd_hdr);
	return rb;

cleanup_data:
	if (flags & QB_RB_FLAG_CREATE) {
		unlink(rb->shared_hdr->data_path);
	}

cleanup_hdr:
	if (fd_hdr >= 0) {
		close(fd_hdr);
	}
	if (rb && (rb->shared_hdr != MAP_FAILED) && (flags & QB_RB_FLAG_CREATE)) {
		unlink(rb->shared_hdr->hdr_path);
		if (rb->notifier.destroy_fn) {
			(void)rb->notifier.destroy_fn(rb->notifier.instance);
		}
	}
	if (rb && (rb->shared_hdr != MAP_FAILED && rb->shared_hdr != NULL)) {
		munmap(rb->shared_hdr, sizeof(struct qb_ringbuffer_shared_s));
	}
	free(rb);
	errno = -error;
	return NULL;
}


void
qb_rb_close(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return;
	}
	qb_enter();

	(void)qb_atomic_int_dec_and_test(&rb->shared_hdr->ref_count);
	(void)qb_rb_close_helper(rb, rb->flags & QB_RB_FLAG_CREATE, QB_FALSE);
}

void
qb_rb_force_close(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return;
	}
	qb_enter();

	qb_atomic_int_set(&rb->shared_hdr->ref_count, -1);
	(void)qb_rb_close_helper(rb, QB_TRUE, QB_TRUE);
}

char *
qb_rb_name_get(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return NULL;
	}
	return rb->shared_hdr->hdr_path;
}

void *
qb_rb_shared_user_data_get(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return NULL;
	}
	return rb->shared_hdr->user_data;
}

int32_t
qb_rb_refcount_get(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return -EINVAL;
	}
	return qb_atomic_int_get(&rb->shared_hdr->ref_count);
}

ssize_t
qb_rb_space_free(struct qb_ringbuffer_s * rb)
{
	uint32_t write_size;
	uint32_t read_size;
	size_t space_free = 0;

	if (rb == NULL) {
		return -EINVAL;
	}
	if (rb->notifier.space_used_fn) {
		return (rb->shared_hdr->word_size * sizeof(uint32_t)) -
			rb->notifier.space_used_fn(rb->notifier.instance);
	}
	write_size = rb->shared_hdr->write_pt;
	read_size = rb->shared_hdr->read_pt;

	if (write_size > read_size) {
		space_free =
		    (read_size - write_size + rb->shared_hdr->word_size) - 1;
	} else if (write_size < read_size) {
		space_free = (read_size - write_size) - 1;
	} else {
		if (rb->notifier.q_len_fn && rb->notifier.q_len_fn(rb->notifier.instance) > 0) {
			space_free = 0;
		} else {
			space_free = rb->shared_hdr->word_size;
		}
	}

	/* word -> bytes */
	return (space_free * sizeof(uint32_t));
}

ssize_t
qb_rb_space_used(struct qb_ringbuffer_s * rb)
{
	uint32_t write_size;
	uint32_t read_size;
	size_t space_used;

	if (rb == NULL) {
		return -EINVAL;
	}
	if (rb->notifier.space_used_fn) {
		return rb->notifier.space_used_fn(rb->notifier.instance);
	}
	write_size = rb->shared_hdr->write_pt;
	read_size = rb->shared_hdr->read_pt;

	if (write_size > read_size) {
		space_used = write_size - read_size;
	} else if (write_size < read_size) {
		space_used =
		    (write_size - read_size + rb->shared_hdr->word_size) - 1;
	} else {
		space_used = 0;
	}
	/* word -> bytes */
	return (space_used * sizeof(uint32_t));
}

ssize_t
qb_rb_chunks_used(struct qb_ringbuffer_s *rb)
{
	if (rb == NULL) {
		return -EINVAL;
	}
	if (rb->notifier.q_len_fn) {
		return rb->notifier.q_len_fn(rb->notifier.instance);
	}
	return -ENOTSUP;
}

void *
qb_rb_chunk_alloc(struct qb_ringbuffer_s * rb, size_t len)
{
	uint32_t write_pt;

	if (rb == NULL) {
		errno = EINVAL;
		return NULL;
	}
	/*
	 * Reclaim data if we are over writing and we need space
	 */
	if (rb->flags & QB_RB_FLAG_OVERWRITE) {
		while (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) {
			int rc = _rb_chunk_reclaim(rb);
			if (rc != 0) {
				errno = rc;
				return NULL;
			}
		}
	} else {
		if (qb_rb_space_free(rb) < (len + QB_RB_CHUNK_MARGIN)) {
			errno = EAGAIN;
			return NULL;
		}
	}

	write_pt = rb->shared_hdr->write_pt;
	/*
	 * insert the chunk header
	 */
	rb->shared_data[write_pt] = 0;
	QB_RB_CHUNK_MAGIC_SET(rb, write_pt, QB_RB_CHUNK_MAGIC_ALLOC);

	/*
	 * return a pointer to the beginning of the chunk data
	 */
	return (void *)QB_RB_CHUNK_DATA_GET(rb, write_pt);

}

static uint32_t
qb_rb_chunk_step(struct qb_ringbuffer_s * rb, uint32_t pointer)
{
	uint32_t chunk_size = QB_RB_CHUNK_SIZE_GET(rb, pointer);
	/*
	 * skip over the chunk header
	 */
	pointer += QB_RB_CHUNK_HEADER_WORDS;

	/*
	 * skip over the user's data.
	 */
	pointer += (chunk_size / sizeof(uint32_t));
	/* make allowance for non-word sizes */
	if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) {
		pointer++;
	}

	idx_cache_line_step(pointer);
	return pointer;
}

int32_t
qb_rb_chunk_commit(struct qb_ringbuffer_s * rb, size_t len)
{
	uint32_t old_write_pt;

	if (rb == NULL) {
		return -EINVAL;
	}
	/*
	 * commit the magic & chunk_size
	 */
	old_write_pt = rb->shared_hdr->write_pt;
	rb->shared_data[old_write_pt] = len;

	/*
	 * commit the new write pointer
	 */
	rb->shared_hdr->write_pt = qb_rb_chunk_step(rb, old_write_pt);
	QB_RB_CHUNK_MAGIC_SET(rb, old_write_pt, QB_RB_CHUNK_MAGIC);

	DEBUG_PRINTF("commit [%zd] read: %u, write: %u -> %u (%u)\n",
		     (rb->notifier.q_len_fn ?
		      rb->notifier.q_len_fn(rb->notifier.instance) : 0),
		     rb->shared_hdr->read_pt,
		     old_write_pt,
		     rb->shared_hdr->write_pt,
		     rb->shared_hdr->word_size);

	/*
	 * post the notification to the reader
	 */
	if (rb->notifier.post_fn) {
		return rb->notifier.post_fn(rb->notifier.instance, len);
	}
	return 0;
}

ssize_t
qb_rb_chunk_write(struct qb_ringbuffer_s * rb, const void *data, size_t len)
{
	char *dest = qb_rb_chunk_alloc(rb, len);
	int32_t res = 0;

	if (rb == NULL) {
		return -EINVAL;
	}

	if (dest == NULL) {
		return -errno;
	}

	memcpy(dest, data, len);

	res = qb_rb_chunk_commit(rb, len);
	if (res < 0) {
		return res;
	}

	return len;
}

static int
_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
{
	uint32_t old_read_pt;
	uint32_t new_read_pt;
	uint32_t old_chunk_size;
	uint32_t chunk_magic;
	int rc = 0;

	old_read_pt = rb->shared_hdr->read_pt;
	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, old_read_pt);
	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
		return -EINVAL;
	}

	old_chunk_size = QB_RB_CHUNK_SIZE_GET(rb, old_read_pt);
	new_read_pt = qb_rb_chunk_step(rb, old_read_pt);

	/*
	 * clear the header
	 */
	rb->shared_data[old_read_pt] = 0;
	QB_RB_CHUNK_MAGIC_SET(rb, old_read_pt, QB_RB_CHUNK_MAGIC_DEAD);

	/*
	 * set the new read pointer after clearing the header
	 * to prevent a situation where a fast writer will write their
	 * new chunk between setting the new read pointer and clearing the
	 * header.
	 */
	rb->shared_hdr->read_pt = new_read_pt;

	if (rb->notifier.reclaim_fn) {
		rc = rb->notifier.reclaim_fn(rb->notifier.instance,
						 old_chunk_size);
		if (rc < 0) {
			errno = -rc;
			qb_util_perror(LOG_WARNING, "reclaim_fn");
		}
	}

	DEBUG_PRINTF("reclaim [%zd]: read: %u -> %u, write: %u\n",
		     (rb->notifier.q_len_fn ?
		      rb->notifier.q_len_fn(rb->notifier.instance) : 0),
		     old_read_pt,
		     rb->shared_hdr->read_pt,
		     rb->shared_hdr->write_pt);

	return rc;
}

void
qb_rb_chunk_reclaim(struct qb_ringbuffer_s * rb)
{
	if (rb == NULL) {
		return;
	}
	_rb_chunk_reclaim(rb);
}

ssize_t
qb_rb_chunk_peek(struct qb_ringbuffer_s * rb, void **data_out, int32_t timeout)
{
	uint32_t read_pt;
	uint32_t chunk_size;
	uint32_t chunk_magic;
	int32_t res = 0;

	if (rb == NULL) {
		return -EINVAL;
	}
	if (rb->notifier.timedwait_fn) {
		res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
	}
	if (res < 0 && res != -EIDRM) {
		if (res == -ETIMEDOUT) {
			return 0;
		} else {
			errno = -res;
			qb_util_perror(LOG_ERR, "sem_timedwait");
		}
		return res;
	}
	read_pt = rb->shared_hdr->read_pt;
	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);
	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
		if (rb->notifier.post_fn) {
			(void)rb->notifier.post_fn(rb->notifier.instance, res);
		}
#ifdef EBADMSG
		return -EBADMSG;
#else
		return -EINVAL;
#endif
	}
	chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
	*data_out = QB_RB_CHUNK_DATA_GET(rb, read_pt);
	return chunk_size;
}

ssize_t
qb_rb_chunk_read(struct qb_ringbuffer_s * rb, void *data_out, size_t len,
		 int32_t timeout)
{
	uint32_t read_pt;
	uint32_t chunk_size;
	uint32_t chunk_magic;
	int32_t res = 0;

	if (rb == NULL) {
		return -EINVAL;
	}
	if (rb->notifier.timedwait_fn) {
		res = rb->notifier.timedwait_fn(rb->notifier.instance, timeout);
	}
	if (res < 0 && res != -EIDRM) {
		if (res != -ETIMEDOUT) {
			errno = -res;
			qb_util_perror(LOG_ERR, "sem_timedwait");
		}
		return res;
	}

	read_pt = rb->shared_hdr->read_pt;
	chunk_magic = QB_RB_CHUNK_MAGIC_GET(rb, read_pt);

	if (chunk_magic != QB_RB_CHUNK_MAGIC) {
		if (rb->notifier.timedwait_fn == NULL) {
			return -ETIMEDOUT;
		} else {
			(void)rb->notifier.post_fn(rb->notifier.instance, res);
#ifdef EBADMSG
			return -EBADMSG;
#else
			return -EINVAL;
#endif
		}
	}

	chunk_size = QB_RB_CHUNK_SIZE_GET(rb, read_pt);
	if (len < chunk_size) {
		qb_util_log(LOG_ERR,
			    "trying to recv chunk of size %d but %d available",
			    len, chunk_size);
		if (rb->notifier.post_fn) {
			(void)rb->notifier.post_fn(rb->notifier.instance, chunk_size);
		}
		return -ENOBUFS;
	}

	memcpy(data_out,
	       QB_RB_CHUNK_DATA_GET(rb, read_pt),
	       chunk_size);

	_rb_chunk_reclaim(rb);

	return chunk_size;
}

static void
print_header(struct qb_ringbuffer_s * rb)
{
	printf("Ringbuffer: \n");
	if (rb->flags & QB_RB_FLAG_OVERWRITE) {
		printf(" ->OVERWRITE\n");
	} else {
		printf(" ->NORMAL\n");
	}
#ifndef S_SPLINT_S
	printf(" ->write_pt [%" PRIu32 "]\n", rb->shared_hdr->write_pt);
	printf(" ->read_pt [%" PRIu32 "]\n", rb->shared_hdr->read_pt);
	printf(" ->size [%" PRIu32 " words]\n", rb->shared_hdr->word_size);
	printf(" =>free [%zd bytes]\n", qb_rb_space_free(rb));
	printf(" =>used [%zd bytes]\n", qb_rb_space_used(rb));
#endif /* S_SPLINT_S */
}

/*
 * FILE HEADER ORDER
 * 1. word_size
 * 2. write_pt
 * 3. read_pt
 * 4. version
 * 5. header_hash
 *
 * 6. data
 */

ssize_t
qb_rb_write_to_file(struct qb_ringbuffer_s * rb, int32_t fd)
{
	ssize_t result;
	ssize_t written_size = 0;
	uint32_t hash = 0;
	uint32_t version = QB_RB_FILE_HEADER_VERSION;

	if (rb == NULL) {
		return -EINVAL;
	}
	print_header(rb);

	/*
 	 * 1. word_size
 	 */
	result = write(fd, &rb->shared_hdr->word_size, sizeof(uint32_t));
	if (result != sizeof(uint32_t)) {
		return -errno;
	}
	written_size += result;

	/*
	 * 2. 3. store the read & write pointers
	 */
	result = write(fd, (void *)&rb->shared_hdr->write_pt, sizeof(uint32_t));
	if (result != sizeof(uint32_t)) {
		return -errno;
	}
	written_size += result;
	result = write(fd, (void *)&rb->shared_hdr->read_pt, sizeof(uint32_t));
	if (result != sizeof(uint32_t)) {
		return -errno;
	}
	written_size += result;

	/*
	 * 4. version used
	 */
	result = write(fd, &version, sizeof(uint32_t));
	if (result != sizeof(uint32_t)) {
		return -errno;
	}
	written_size += result;

	/*
	 * 5. hash helps us verify header is not corrupted on file read
	 */
	hash = rb->shared_hdr->word_size + rb->shared_hdr->write_pt + rb->shared_hdr->read_pt + QB_RB_FILE_HEADER_VERSION;
	result = write(fd, &hash, sizeof(uint32_t));
	if (result != sizeof(uint32_t)) {
		return -errno;
	}
	written_size += result;

	result = write(fd, rb->shared_data,
		       rb->shared_hdr->word_size * sizeof(uint32_t));
	if (result != rb->shared_hdr->word_size * sizeof(uint32_t)) {
		return -errno;
	}
	written_size += result;

	qb_util_log(LOG_DEBUG, " writing total of: %zd\n", written_size);

	return written_size;
}

qb_ringbuffer_t *
qb_rb_create_from_file(int32_t fd, uint32_t flags)
{
	ssize_t n_read;
	size_t n_required;
	size_t total_read = 0;
	uint32_t read_pt;
	uint32_t write_pt;
	struct qb_ringbuffer_s *rb;
	uint32_t word_size = 0;
	uint32_t version = 0;
	uint32_t hash = 0;
	uint32_t calculated_hash = 0;

	if (fd < 0) {
		return NULL;
	}

	/*
	 * 1. word size
	 */
	n_required = sizeof(uint32_t);
	n_read = read(fd, &word_size, n_required);
	if (n_read != n_required) {
		qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
		return NULL;
	}
	total_read += n_read;

	/*
	 * 2. 3. read & write pointers
	 */
	n_read = read(fd, &write_pt, sizeof(uint32_t));
	assert(n_read == sizeof(uint32_t));
	total_read += n_read;

	n_read = read(fd, &read_pt, sizeof(uint32_t));
	assert(n_read == sizeof(uint32_t));
	total_read += n_read;

	/*
	 * 4. version
	 */
	n_required = sizeof(uint32_t);
	n_read = read(fd, &version, n_required);
	if (n_read != n_required) {
		qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
		return NULL;
	}
	total_read += n_read;

	/*
	 * 5. Hash
	 */
	n_required = sizeof(uint32_t);
	n_read = read(fd, &hash, n_required);
	if (n_read != n_required) {
		qb_util_perror(LOG_ERR, "Unable to read blackbox file header");
		return NULL;
	}
	total_read += n_read;

	calculated_hash = word_size + write_pt + read_pt + version;
	if (hash != calculated_hash) {
		qb_util_log(LOG_ERR, "Corrupt blackbox: File header hash (%d) does not match calculated hash (%d)", hash, calculated_hash);
		return NULL;
	} else if (version != QB_RB_FILE_HEADER_VERSION) {
		qb_util_log(LOG_ERR, "Wrong file header version. Expected %d got %d",
			QB_RB_FILE_HEADER_VERSION, version);
		return NULL;
	}

	/*
	 * 6. data
	 */
	n_required = (word_size * sizeof(uint32_t));

	/*
	 * qb_rb_open adds QB_RB_CHUNK_MARGIN + 1 to the requested size.
	 */
	rb = qb_rb_open("create_from_file", n_required - (QB_RB_CHUNK_MARGIN + 1),
			QB_RB_FLAG_CREATE | QB_RB_FLAG_NO_SEMAPHORE, 0);
	if (rb == NULL) {
		return NULL;
	}
	rb->shared_hdr->read_pt = read_pt;
	rb->shared_hdr->write_pt = write_pt;

	n_read = read(fd, rb->shared_data, n_required);
	if (n_read < 0) {
		qb_util_perror(LOG_ERR, "Unable to read blackbox file data");
		goto cleanup_fail;
	}
	total_read += n_read;

	if (n_read != n_required) {
		qb_util_log(LOG_WARNING, "read %zd bytes, but expected %zu",
			    n_read, n_required);
		goto cleanup_fail;
	}

	qb_util_log(LOG_DEBUG, "read total of: %zd", total_read);
	print_header(rb);

	return rb;

cleanup_fail:
	qb_rb_close(rb);
	return NULL;
}

int32_t
qb_rb_chown(struct qb_ringbuffer_s * rb, uid_t owner, gid_t group)
{
	int32_t res;

	if (rb == NULL) {
		return -EINVAL;
	}
	res = chown(rb->shared_hdr->data_path, owner, group);
	if (res < 0 && errno != EPERM) {
		return -errno;
	}
	res = chown(rb->shared_hdr->hdr_path, owner, group);
	if (res < 0 && errno != EPERM) {
		return -errno;
	}
	return 0;
}

int32_t
qb_rb_chmod(qb_ringbuffer_t * rb, mode_t mode)
{
	int32_t res;

	if (rb == NULL) {
		return -EINVAL;
	}
	res = chmod(rb->shared_hdr->data_path, mode);
	if (res < 0) {
		return -errno;
	}
	res = chmod(rb->shared_hdr->hdr_path, mode);
	if (res < 0) {
		return -errno;
	}
	return 0;
}