Blob Blame History Raw
/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */
/*
 * GData Client
 * Copyright (C) Philip Withnall 2009 <philip@tecnocode.co.uk>
 *
 * GData Client is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * GData Client is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with GData Client.  If not, see <http://www.gnu.org/licenses/>.
 */

/*
 * SECTION:gdata-buffer
 * @short_description: GData buffer to allow threadsafe buffering
 * @stability: Stable
 * @include: gdata/gdata-buffer.h
 *
 * #GDataBuffer is a simple object which allows threadsafe buffering of data meaning, for example, data can be received from
 * the network in a "push" fashion, buffered, then sent out to an output stream in a "pull" fashion.
 */

#include <config.h>
#include <glib.h>
#include <string.h>

#include "gdata-buffer.h"

struct _GDataBufferChunk {
	/*< private >*/
	guint8 *data;
	gsize length;
	GDataBufferChunk *next;
	/* Note: the data is actually allocated in the same memory block, so it's inside this comment right now.
	 * We simply set chunk->data to point to chunk + sizeof (GDataBufferChunk). */
};

/**
 * gdata_buffer_new:
 *
 * Creates a new empty #GDataBuffer.
 *
 * Return value: a new #GDataBuffer; free with gdata_buffer_free()
 *
 * Since: 0.5.0
 */
GDataBuffer *
gdata_buffer_new (void)
{
	GDataBuffer *buffer = g_slice_new0 (GDataBuffer);

	g_mutex_init (&(buffer->mutex));
	g_cond_init (&(buffer->cond));

	return buffer;
}

/**
 * gdata_buffer_free:
 *
 * Frees a #GDataBuffer. The function isn't threadsafe, so should only be called once
 * use of the buffer has been reduced to only one thread (the reading thread, after
 * the EOF has been reached).
 *
 * Since: 0.5.0
 */
void
gdata_buffer_free (GDataBuffer *self)
{
	GDataBufferChunk *chunk, *next_chunk;

	g_return_if_fail (self != NULL);

	for (chunk = self->head; chunk != NULL; chunk = next_chunk) {
		next_chunk = chunk->next;
		g_free (chunk);
	}

	g_cond_clear (&(self->cond));
	g_mutex_clear (&(self->mutex));

	g_slice_free (GDataBuffer, self);
}

/**
 * gdata_buffer_push_data:
 * @self: a #GDataBuffer
 * @data: the data to push onto the buffer
 * @length: the length of @data
 *
 * Pushes @length bytes of @data onto the buffer, taking a copy of the data. If @data is %NULL and @length is <code class="literal">0</code>,
 * the buffer will be marked as having reached the EOF, and subsequent calls to gdata_buffer_push_data()
 * will fail and return %FALSE.
 *
 * Assuming the buffer hasn't reached EOF, this operation is guaranteed to succeed (unless memory allocation fails).
 *
 * This function holds the lock on the #GDataBuffer, and signals any waiting calls to gdata_buffer_pop_data() once
 * the new data has been pushed onto the buffer. This function is threadsafe.
 *
 * Return value: %TRUE on success, %FALSE otherwise
 *
 * Since: 0.5.0
 */
gboolean
gdata_buffer_push_data (GDataBuffer *self, const guint8 *data, gsize length)
{
	GDataBufferChunk *chunk;

	g_return_val_if_fail (self != NULL, 0);

	g_mutex_lock (&(self->mutex));

	if (G_UNLIKELY (self->reached_eof == TRUE)) {
		/* If we're marked as having reached EOF, don't accept any more data */
		g_mutex_unlock (&(self->mutex));
		return FALSE;
	} else if (G_UNLIKELY (data == NULL && length == 0)) {
		/* If @data is NULL and @length is 0, mark the buffer as having reached EOF,
		 * and signal any waiting threads. */
		self->reached_eof = TRUE;
		g_cond_signal (&(self->cond));
		g_mutex_unlock (&(self->mutex));
		return FALSE;
	}

	/* Create the chunk */
	chunk = g_malloc (sizeof (GDataBufferChunk) + length);
	chunk->data = (guint8*) ((guint8*) chunk + sizeof (GDataBufferChunk)); /* pointer arithmetic in terms of bytes here */
	chunk->length = length;
	chunk->next = NULL;

	/* Copy the data to the chunk */
	if (G_LIKELY (data != NULL))
		memcpy (chunk->data, data, length);

	/* Add it to the buffer's tail */
	if (self->tail != NULL)
		*(self->tail) = chunk;
	else
		self->head = chunk;
	self->tail = &(chunk->next);
	self->total_length += length;

	/* Signal any threads waiting to pop that data is available */
	g_cond_signal (&(self->cond));

	g_mutex_unlock (&(self->mutex));

	return TRUE;
}

typedef struct {
	GDataBuffer *buffer;
	gboolean *cancelled;
} CancelledData;

static void
pop_cancelled_cb (GCancellable *cancellable, CancelledData *data)
{
	/* Signal the pop_data function that it should stop blocking and cancel */
	g_mutex_lock (&(data->buffer->mutex));
	*(data->cancelled) = TRUE;
	g_cond_signal (&(data->buffer->cond));
	g_mutex_unlock (&(data->buffer->mutex));
}

/**
 * gdata_buffer_pop_data:
 * @self: a #GDataBuffer
 * @data: (allow-none): return location for the popped data, or %NULL to just drop the data
 * @length_requested: the number of bytes of data requested
 * @reached_eof: return location for a value which is %TRUE when we've reached EOF, %FALSE otherwise, or %NULL
 * @cancellable: (allow-none): a #GCancellable, or %NULL
 *
 * Pops up to @length_requested bytes off the head of the buffer and copies them to @data, which must be allocated by
 * the caller and have enough space to store at most @length_requested bytes of output.
 *
 * If the buffer contains enough data to satisfy @length_requested, this function returns immediately.
 * Otherwise, this function blocks until data is pushed onto the head of the buffer with gdata_buffer_pop_data(). If
 * the buffer is marked as having reached the EOF, this function will not block, and will instead return the
 * remaining data in the buffer.
 *
 * This function holds the lock on the #GDataBuffer, and will automatically be signalled of new data pushed onto the
 * buffer if it's blocking.
 *
 * If @cancellable is provided, calling g_cancellable_cancel() on it from another thread will cause the call to
 * gdata_buffer_pop_data() to return immediately with whatever data it can find.
 *
 * Return value: the number of bytes returned in @data
 *
 * Since: 0.5.0
 */
gsize
gdata_buffer_pop_data (GDataBuffer *self, guint8 *data, gsize length_requested, gboolean *reached_eof, GCancellable *cancellable)
{
	GDataBufferChunk *chunk;
	gsize return_length = 0, length_remaining;
	gulong cancelled_signal = 0;
	gboolean cancelled = FALSE;

	g_return_val_if_fail (self != NULL, 0);
	g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), 0);

	/* In the case:
	 *  - length_requested < amount available: return length_requested
	 *  - length_requested > amount available: block until more is available, return length_requested
	 *  - length_requested > amount available and we've reached EOF: don't block, return all remaining data
	 *  - length_requested is a whole number of chunks: remove those chunks, return length_requested
	 *  - length_requested is less than one chunk: remove no chunks, return length_requested, set head_read_offset
	 *  - length_requested is a fraction of multiple chunks: remove whole chunks, return length_requested, set head_read_offset
	 *    for remaining fraction */

	/* Set up a handler so we can stop if we're cancelled. This must be done before we lock @self->mutex, or deadlock could occur if the
	 * cancellable has already been cancelled — g_cancellable_connect() would call pop_cancelled_cb() directly, and it would attempt to lock
	 * @self->mutex again. */
	if (cancellable != NULL) {
		CancelledData cancelled_data;

		cancelled_data.buffer = self;
		cancelled_data.cancelled = &cancelled;

		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) pop_cancelled_cb, &cancelled_data, NULL);
	}

	g_mutex_lock (&(self->mutex));

	if (self->reached_eof == TRUE && length_requested > self->total_length) {
		/* Return data up to the EOF */
		return_length = self->total_length;
	} else if (length_requested > self->total_length) {
		/* Block until more data is available */
		while (length_requested > self->total_length) {
			/* If we've already been cancelled, don't wait on @self->cond, since it'll never be signalled again. */
			if (cancelled == FALSE) {
				g_cond_wait (&(self->cond), &(self->mutex));
			}

			/* If the g_cond_wait() returned because it was signalled from the GCancellable callback (rather than from
			 * data being pushed into the buffer), stop blocking for data and make do with what we have so far. */
			if (cancelled == TRUE || self->reached_eof == TRUE) {
				return_length = MIN (length_requested, self->total_length);
				break;
			} else {
				return_length = length_requested;
			}
		}
	} else {
		return_length = length_requested;
	}

	/* Set reached_eof */
	if (reached_eof != NULL)
		*reached_eof = self->reached_eof && length_requested >= self->total_length;

	/* Return if we haven't got any data to pop (i.e. if we were cancelled before even one chunk arrived) */
	if (return_length == 0)
		goto done;

	/* Otherwise, get on with things */
	length_remaining = return_length;

	/* We can't assume we'll have enough data, since we may have reached EOF */
	chunk = self->head;
	while (chunk != NULL && self->head_read_offset + length_remaining >= chunk->length) {
		GDataBufferChunk *next_chunk;
		gsize chunk_length = chunk->length - self->head_read_offset;

		/* Copy the data to the output */
		length_remaining -= chunk_length;
		if (data != NULL) {
			memcpy (data, chunk->data + self->head_read_offset, chunk_length);
			data += chunk_length;
		}

		/* Free the chunk and move on */
		next_chunk = chunk->next;
		g_free (chunk);
		chunk = next_chunk;

		/* Reset the head read offset, since we've processed at least the first chunk now */
		self->head_read_offset = 0;
	}

	/* If the requested length is still > 0, it must be < chunk->length, and chunk must != NULL (if it does, the cached total_length has
	 * been corrupted somewhere). */
	if (G_LIKELY (length_remaining > 0)) {
		g_assert (chunk != NULL);
		g_assert_cmpuint (length_remaining, <=, chunk->length);

		/* Copy the requested data to the output */
		if (data != NULL) {
			memcpy (data, chunk->data + self->head_read_offset, length_remaining);
		}
		self->head_read_offset += length_remaining;
	}

	self->head = chunk;
	if (self->head == NULL)
		self->tail = NULL;
	self->total_length -= return_length;

done:
	g_mutex_unlock (&(self->mutex));

	/* Disconnect from the cancelled signal. Note that this has to be done without @self->mutex held, or deadlock can occur.
	 * (g_cancellable_disconnect() waits for any in-progress signal handler call to finish, which can't happen until the mutex is released.) */
	if (cancelled_signal != 0)
		g_cancellable_disconnect (cancellable, cancelled_signal);

	return return_length;
}

/**
 * gdata_buffer_pop_all_data:
 * @self: a #GDataBuffer
 * @data: return location for the popped data
 * @maximum_length: the maximum number of bytes to return
 * @reached_eof: return location for a value which is %TRUE when we've reached EOF, %FALSE otherwise, or %NULL
 *
 * Pops as much data as possible off the #GDataBuffer, up to a limit of @maximum_length bytes. If fewer bytes exist
 * in the buffer, fewer bytes will be returned. If more bytes exist in the buffer, @maximum_length bytes will be returned.
 *
 * If <code class="literal">0</code> bytes exist in the buffer, this function will block until data is available. Otherwise, it will never block.
 *
 * Return value: the number of bytes returned in @data (guaranteed to be more than <code class="literal">0</code> and at most @maximum_length)
 *
 * Since: 0.5.0
 */
gsize
gdata_buffer_pop_data_limited (GDataBuffer *self, guint8 *data, gsize maximum_length, gboolean *reached_eof)
{
	g_return_val_if_fail (self != NULL, 0);
	g_return_val_if_fail (data != NULL, 0);
	g_return_val_if_fail (maximum_length > 0, 0);

	/* If there's no data in the buffer, block until some is available */
	g_mutex_lock (&(self->mutex));
	if (self->total_length == 0 && self->reached_eof == FALSE) {
		g_cond_wait (&(self->cond), &(self->mutex));
	}
	g_mutex_unlock (&(self->mutex));

	return gdata_buffer_pop_data (self, data, MIN (maximum_length, self->total_length), reached_eof, NULL);
}