Blame gdata/gdata-buffer.c

Packit 4b6dd7
/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */
Packit 4b6dd7
/*
Packit 4b6dd7
 * GData Client
Packit 4b6dd7
 * Copyright (C) Philip Withnall 2009 <philip@tecnocode.co.uk>
Packit 4b6dd7
 *
Packit 4b6dd7
 * GData Client is free software; you can redistribute it and/or
Packit 4b6dd7
 * modify it under the terms of the GNU Lesser General Public
Packit 4b6dd7
 * License as published by the Free Software Foundation; either
Packit 4b6dd7
 * version 2.1 of the License, or (at your option) any later version.
Packit 4b6dd7
 *
Packit 4b6dd7
 * GData Client is distributed in the hope that it will be useful,
Packit 4b6dd7
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 4b6dd7
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit 4b6dd7
 * Lesser General Public License for more details.
Packit 4b6dd7
 *
Packit 4b6dd7
 * You should have received a copy of the GNU Lesser General Public
Packit 4b6dd7
 * License along with GData Client.  If not, see <http://www.gnu.org/licenses/>.
Packit 4b6dd7
 */
Packit 4b6dd7
Packit 4b6dd7
/*
Packit 4b6dd7
 * SECTION:gdata-buffer
Packit 4b6dd7
 * @short_description: GData buffer to allow threadsafe buffering
Packit 4b6dd7
 * @stability: Stable
Packit 4b6dd7
 * @include: gdata/gdata-buffer.h
Packit 4b6dd7
 *
Packit 4b6dd7
 * #GDataBuffer is a simple object which allows threadsafe buffering of data meaning, for example, data can be received from
Packit 4b6dd7
 * the network in a "push" fashion, buffered, then sent out to an output stream in a "pull" fashion.
Packit 4b6dd7
 */
Packit 4b6dd7
Packit 4b6dd7
#include <config.h>
Packit 4b6dd7
#include <glib.h>
Packit 4b6dd7
#include <string.h>
Packit 4b6dd7
Packit 4b6dd7
#include "gdata-buffer.h"
Packit 4b6dd7
Packit 4b6dd7
struct _GDataBufferChunk {
Packit 4b6dd7
	/*< private >*/
Packit 4b6dd7
	guint8 *data;
Packit 4b6dd7
	gsize length;
Packit 4b6dd7
	GDataBufferChunk *next;
Packit 4b6dd7
	/* Note: the data is actually allocated in the same memory block, so it's inside this comment right now.
Packit 4b6dd7
	 * We simply set chunk->data to point to chunk + sizeof (GDataBufferChunk). */
Packit 4b6dd7
};
Packit 4b6dd7
Packit 4b6dd7
/**
Packit 4b6dd7
 * gdata_buffer_new:
Packit 4b6dd7
 *
Packit 4b6dd7
 * Creates a new empty #GDataBuffer.
Packit 4b6dd7
 *
Packit 4b6dd7
 * Return value: a new #GDataBuffer; free with gdata_buffer_free()
Packit 4b6dd7
 *
Packit 4b6dd7
 * Since: 0.5.0
Packit 4b6dd7
 */
Packit 4b6dd7
GDataBuffer *
Packit 4b6dd7
gdata_buffer_new (void)
Packit 4b6dd7
{
Packit 4b6dd7
	GDataBuffer *buffer = g_slice_new0 (GDataBuffer);
Packit 4b6dd7
Packit 4b6dd7
	g_mutex_init (&(buffer->mutex));
Packit 4b6dd7
	g_cond_init (&(buffer->cond));
Packit 4b6dd7
Packit 4b6dd7
	return buffer;
Packit 4b6dd7
}
Packit 4b6dd7
Packit 4b6dd7
/**
Packit 4b6dd7
 * gdata_buffer_free:
Packit 4b6dd7
 *
Packit 4b6dd7
 * Frees a #GDataBuffer. The function isn't threadsafe, so should only be called once
Packit 4b6dd7
 * use of the buffer has been reduced to only one thread (the reading thread, after
Packit 4b6dd7
 * the EOF has been reached).
Packit 4b6dd7
 *
Packit 4b6dd7
 * Since: 0.5.0
Packit 4b6dd7
 */
Packit 4b6dd7
void
Packit 4b6dd7
gdata_buffer_free (GDataBuffer *self)
Packit 4b6dd7
{
Packit 4b6dd7
	GDataBufferChunk *chunk, *next_chunk;
Packit 4b6dd7
Packit 4b6dd7
	g_return_if_fail (self != NULL);
Packit 4b6dd7
Packit 4b6dd7
	for (chunk = self->head; chunk != NULL; chunk = next_chunk) {
Packit 4b6dd7
		next_chunk = chunk->next;
Packit 4b6dd7
		g_free (chunk);
Packit 4b6dd7
	}
Packit 4b6dd7
Packit 4b6dd7
	g_cond_clear (&(self->cond));
Packit 4b6dd7
	g_mutex_clear (&(self->mutex));
Packit 4b6dd7
Packit 4b6dd7
	g_slice_free (GDataBuffer, self);
Packit 4b6dd7
}
Packit 4b6dd7
Packit 4b6dd7
/**
Packit 4b6dd7
 * gdata_buffer_push_data:
Packit 4b6dd7
 * @self: a #GDataBuffer
Packit 4b6dd7
 * @data: the data to push onto the buffer
Packit 4b6dd7
 * @length: the length of @data
Packit 4b6dd7
 *
Packit 4b6dd7
 * Pushes @length bytes of @data onto the buffer, taking a copy of the data. If @data is %NULL and @length is 0,
Packit 4b6dd7
 * the buffer will be marked as having reached the EOF, and subsequent calls to gdata_buffer_push_data()
Packit 4b6dd7
 * will fail and return %FALSE.
Packit 4b6dd7
 *
Packit 4b6dd7
 * Assuming the buffer hasn't reached EOF, this operation is guaranteed to succeed (unless memory allocation fails).
Packit 4b6dd7
 *
Packit 4b6dd7
 * This function holds the lock on the #GDataBuffer, and signals any waiting calls to gdata_buffer_pop_data() once
Packit 4b6dd7
 * the new data has been pushed onto the buffer. This function is threadsafe.
Packit 4b6dd7
 *
Packit 4b6dd7
 * Return value: %TRUE on success, %FALSE otherwise
Packit 4b6dd7
 *
Packit 4b6dd7
 * Since: 0.5.0
Packit 4b6dd7
 */
Packit 4b6dd7
gboolean
Packit 4b6dd7
gdata_buffer_push_data (GDataBuffer *self, const guint8 *data, gsize length)
Packit 4b6dd7
{
Packit 4b6dd7
	GDataBufferChunk *chunk;
Packit 4b6dd7
Packit 4b6dd7
	g_return_val_if_fail (self != NULL, 0);
Packit 4b6dd7
Packit 4b6dd7
	g_mutex_lock (&(self->mutex));
Packit 4b6dd7
Packit 4b6dd7
	if (G_UNLIKELY (self->reached_eof == TRUE)) {
Packit 4b6dd7
		/* If we're marked as having reached EOF, don't accept any more data */
Packit 4b6dd7
		g_mutex_unlock (&(self->mutex));
Packit 4b6dd7
		return FALSE;
Packit 4b6dd7
	} else if (G_UNLIKELY (data == NULL && length == 0)) {
Packit 4b6dd7
		/* If @data is NULL and @length is 0, mark the buffer as having reached EOF,
Packit 4b6dd7
		 * and signal any waiting threads. */
Packit 4b6dd7
		self->reached_eof = TRUE;
Packit 4b6dd7
		g_cond_signal (&(self->cond));
Packit 4b6dd7
		g_mutex_unlock (&(self->mutex));
Packit 4b6dd7
		return FALSE;
Packit 4b6dd7
	}
Packit 4b6dd7
Packit 4b6dd7
	/* Create the chunk */
Packit 4b6dd7
	chunk = g_malloc (sizeof (GDataBufferChunk) + length);
Packit 4b6dd7
	chunk->data = (guint8*) ((guint8*) chunk + sizeof (GDataBufferChunk)); /* pointer arithmetic in terms of bytes here */
Packit 4b6dd7
	chunk->length = length;
Packit 4b6dd7
	chunk->next = NULL;
Packit 4b6dd7
Packit 4b6dd7
	/* Copy the data to the chunk */
Packit 4b6dd7
	if (G_LIKELY (data != NULL))
Packit 4b6dd7
		memcpy (chunk->data, data, length);
Packit 4b6dd7
Packit 4b6dd7
	/* Add it to the buffer's tail */
Packit 4b6dd7
	if (self->tail != NULL)
Packit 4b6dd7
		*(self->tail) = chunk;
Packit 4b6dd7
	else
Packit 4b6dd7
		self->head = chunk;
Packit 4b6dd7
	self->tail = &(chunk->next);
Packit 4b6dd7
	self->total_length += length;
Packit 4b6dd7
Packit 4b6dd7
	/* Signal any threads waiting to pop that data is available */
Packit 4b6dd7
	g_cond_signal (&(self->cond));
Packit 4b6dd7
Packit 4b6dd7
	g_mutex_unlock (&(self->mutex));
Packit 4b6dd7
Packit 4b6dd7
	return TRUE;
Packit 4b6dd7
}
Packit 4b6dd7
Packit 4b6dd7
typedef struct {
Packit 4b6dd7
	GDataBuffer *buffer;
Packit 4b6dd7
	gboolean *cancelled;
Packit 4b6dd7
} CancelledData;
Packit 4b6dd7
Packit 4b6dd7
static void
Packit 4b6dd7
pop_cancelled_cb (GCancellable *cancellable, CancelledData *data)
Packit 4b6dd7
{
Packit 4b6dd7
	/* Signal the pop_data function that it should stop blocking and cancel */
Packit 4b6dd7
	g_mutex_lock (&(data->buffer->mutex));
Packit 4b6dd7
	*(data->cancelled) = TRUE;
Packit 4b6dd7
	g_cond_signal (&(data->buffer->cond));
Packit 4b6dd7
	g_mutex_unlock (&(data->buffer->mutex));
Packit 4b6dd7
}
Packit 4b6dd7
Packit 4b6dd7
/**
Packit 4b6dd7
 * gdata_buffer_pop_data:
Packit 4b6dd7
 * @self: a #GDataBuffer
Packit 4b6dd7
 * @data: (allow-none): return location for the popped data, or %NULL to just drop the data
Packit 4b6dd7
 * @length_requested: the number of bytes of data requested
Packit 4b6dd7
 * @reached_eof: return location for a value which is %TRUE when we've reached EOF, %FALSE otherwise, or %NULL
Packit 4b6dd7
 * @cancellable: (allow-none): a #GCancellable, or %NULL
Packit 4b6dd7
 *
Packit 4b6dd7
 * Pops up to @length_requested bytes off the head of the buffer and copies them to @data, which must be allocated by
Packit 4b6dd7
 * the caller and have enough space to store at most @length_requested bytes of output.
Packit 4b6dd7
 *
Packit 4b6dd7
 * If the buffer contains enough data to satisfy @length_requested, this function returns immediately.
Packit 4b6dd7
 * Otherwise, this function blocks until data is pushed onto the head of the buffer with gdata_buffer_pop_data(). If
Packit 4b6dd7
 * the buffer is marked as having reached the EOF, this function will not block, and will instead return the
Packit 4b6dd7
 * remaining data in the buffer.
Packit 4b6dd7
 *
Packit 4b6dd7
 * This function holds the lock on the #GDataBuffer, and will automatically be signalled of new data pushed onto the
Packit 4b6dd7
 * buffer if it's blocking.
Packit 4b6dd7
 *
Packit 4b6dd7
 * If @cancellable is provided, calling g_cancellable_cancel() on it from another thread will cause the call to
Packit 4b6dd7
 * gdata_buffer_pop_data() to return immediately with whatever data it can find.
Packit 4b6dd7
 *
Packit 4b6dd7
 * Return value: the number of bytes returned in @data
Packit 4b6dd7
 *
Packit 4b6dd7
 * Since: 0.5.0
Packit 4b6dd7
 */
Packit 4b6dd7
gsize
Packit 4b6dd7
gdata_buffer_pop_data (GDataBuffer *self, guint8 *data, gsize length_requested, gboolean *reached_eof, GCancellable *cancellable)
Packit 4b6dd7
{
Packit 4b6dd7
	GDataBufferChunk *chunk;
Packit 4b6dd7
	gsize return_length = 0, length_remaining;
Packit 4b6dd7
	gulong cancelled_signal = 0;
Packit 4b6dd7
	gboolean cancelled = FALSE;
Packit 4b6dd7
Packit 4b6dd7
	g_return_val_if_fail (self != NULL, 0);
Packit 4b6dd7
	g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), 0);
Packit 4b6dd7
Packit 4b6dd7
	/* In the case:
Packit 4b6dd7
	 *  - length_requested < amount available: return length_requested
Packit 4b6dd7
	 *  - length_requested > amount available: block until more is available, return length_requested
Packit 4b6dd7
	 *  - length_requested > amount available and we've reached EOF: don't block, return all remaining data
Packit 4b6dd7
	 *  - length_requested is a whole number of chunks: remove those chunks, return length_requested
Packit 4b6dd7
	 *  - length_requested is less than one chunk: remove no chunks, return length_requested, set head_read_offset
Packit 4b6dd7
	 *  - length_requested is a fraction of multiple chunks: remove whole chunks, return length_requested, set head_read_offset
Packit 4b6dd7
	 *    for remaining fraction */
Packit 4b6dd7
Packit 4b6dd7
	/* Set up a handler so we can stop if we're cancelled. This must be done before we lock @self->mutex, or deadlock could occur if the
Packit 4b6dd7
	 * cancellable has already been cancelled — g_cancellable_connect() would call pop_cancelled_cb() directly, and it would attempt to lock
Packit 4b6dd7
	 * @self->mutex again. */
Packit 4b6dd7
	if (cancellable != NULL) {
Packit 4b6dd7
		CancelledData cancelled_data;
Packit 4b6dd7
Packit 4b6dd7
		cancelled_data.buffer = self;
Packit 4b6dd7
		cancelled_data.cancelled = &cancelled;
Packit 4b6dd7
Packit 4b6dd7
		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) pop_cancelled_cb, &cancelled_data, NULL);
Packit 4b6dd7
	}
Packit 4b6dd7
Packit 4b6dd7
	g_mutex_lock (&(self->mutex));
Packit 4b6dd7
Packit 4b6dd7
	if (self->reached_eof == TRUE && length_requested > self->total_length) {
Packit 4b6dd7
		/* Return data up to the EOF */
Packit 4b6dd7
		return_length = self->total_length;
Packit 4b6dd7
	} else if (length_requested > self->total_length) {
Packit 4b6dd7
		/* Block until more data is available */
Packit 4b6dd7
		while (length_requested > self->total_length) {
Packit 4b6dd7
			/* If we've already been cancelled, don't wait on @self->cond, since it'll never be signalled again. */
Packit 4b6dd7
			if (cancelled == FALSE) {
Packit 4b6dd7
				g_cond_wait (&(self->cond), &(self->mutex));
Packit 4b6dd7
			}
Packit 4b6dd7
Packit 4b6dd7
			/* If the g_cond_wait() returned because it was signalled from the GCancellable callback (rather than from
Packit 4b6dd7
			 * data being pushed into the buffer), stop blocking for data and make do with what we have so far. */
Packit 4b6dd7
			if (cancelled == TRUE || self->reached_eof == TRUE) {
Packit 4b6dd7
				return_length = MIN (length_requested, self->total_length);
Packit 4b6dd7
				break;
Packit 4b6dd7
			} else {
Packit 4b6dd7
				return_length = length_requested;
Packit 4b6dd7
			}
Packit 4b6dd7
		}
Packit 4b6dd7
	} else {
Packit 4b6dd7
		return_length = length_requested;
Packit 4b6dd7
	}
Packit 4b6dd7
Packit 4b6dd7
	/* Set reached_eof */
Packit 4b6dd7
	if (reached_eof != NULL)
Packit 4b6dd7
		*reached_eof = self->reached_eof && length_requested >= self->total_length;
Packit 4b6dd7
Packit 4b6dd7
	/* Return if we haven't got any data to pop (i.e. if we were cancelled before even one chunk arrived) */
Packit 4b6dd7
	if (return_length == 0)
Packit 4b6dd7
		goto done;
Packit 4b6dd7
Packit 4b6dd7
	/* Otherwise, get on with things */
Packit 4b6dd7
	length_remaining = return_length;
Packit 4b6dd7
Packit 4b6dd7
	/* We can't assume we'll have enough data, since we may have reached EOF */
Packit 4b6dd7
	chunk = self->head;
Packit 4b6dd7
	while (chunk != NULL && self->head_read_offset + length_remaining >= chunk->length) {
Packit 4b6dd7
		GDataBufferChunk *next_chunk;
Packit 4b6dd7
		gsize chunk_length = chunk->length - self->head_read_offset;
Packit 4b6dd7
Packit 4b6dd7
		/* Copy the data to the output */
Packit 4b6dd7
		length_remaining -= chunk_length;
Packit 4b6dd7
		if (data != NULL) {
Packit 4b6dd7
			memcpy (data, chunk->data + self->head_read_offset, chunk_length);
Packit 4b6dd7
			data += chunk_length;
Packit 4b6dd7
		}
Packit 4b6dd7
Packit 4b6dd7
		/* Free the chunk and move on */
Packit 4b6dd7
		next_chunk = chunk->next;
Packit 4b6dd7
		g_free (chunk);
Packit 4b6dd7
		chunk = next_chunk;
Packit 4b6dd7
Packit 4b6dd7
		/* Reset the head read offset, since we've processed at least the first chunk now */
Packit 4b6dd7
		self->head_read_offset = 0;
Packit 4b6dd7
	}
Packit 4b6dd7
Packit 4b6dd7
	/* If the requested length is still > 0, it must be < chunk->length, and chunk must != NULL (if it does, the cached total_length has
Packit 4b6dd7
	 * been corrupted somewhere). */
Packit 4b6dd7
	if (G_LIKELY (length_remaining > 0)) {
Packit 4b6dd7
		g_assert (chunk != NULL);
Packit 4b6dd7
		g_assert_cmpuint (length_remaining, <=, chunk->length);
Packit 4b6dd7
Packit 4b6dd7
		/* Copy the requested data to the output */
Packit 4b6dd7
		if (data != NULL) {
Packit 4b6dd7
			memcpy (data, chunk->data + self->head_read_offset, length_remaining);
Packit 4b6dd7
		}
Packit 4b6dd7
		self->head_read_offset += length_remaining;
Packit 4b6dd7
	}
Packit 4b6dd7
Packit 4b6dd7
	self->head = chunk;
Packit 4b6dd7
	if (self->head == NULL)
Packit 4b6dd7
		self->tail = NULL;
Packit 4b6dd7
	self->total_length -= return_length;
Packit 4b6dd7
Packit 4b6dd7
done:
Packit 4b6dd7
	g_mutex_unlock (&(self->mutex));
Packit 4b6dd7
Packit 4b6dd7
	/* Disconnect from the cancelled signal. Note that this has to be done without @self->mutex held, or deadlock can occur.
Packit 4b6dd7
	 * (g_cancellable_disconnect() waits for any in-progress signal handler call to finish, which can't happen until the mutex is released.) */
Packit 4b6dd7
	if (cancelled_signal != 0)
Packit 4b6dd7
		g_cancellable_disconnect (cancellable, cancelled_signal);
Packit 4b6dd7
Packit 4b6dd7
	return return_length;
Packit 4b6dd7
}
Packit 4b6dd7
Packit 4b6dd7
/**
Packit 4b6dd7
 * gdata_buffer_pop_all_data:
Packit 4b6dd7
 * @self: a #GDataBuffer
Packit 4b6dd7
 * @data: return location for the popped data
Packit 4b6dd7
 * @maximum_length: the maximum number of bytes to return
Packit 4b6dd7
 * @reached_eof: return location for a value which is %TRUE when we've reached EOF, %FALSE otherwise, or %NULL
Packit 4b6dd7
 *
Packit 4b6dd7
 * Pops as much data as possible off the #GDataBuffer, up to a limit of @maximum_length bytes. If fewer bytes exist
Packit 4b6dd7
 * in the buffer, fewer bytes will be returned. If more bytes exist in the buffer, @maximum_length bytes will be returned.
Packit 4b6dd7
 *
Packit 4b6dd7
 * If 0 bytes exist in the buffer, this function will block until data is available. Otherwise, it will never block.
Packit 4b6dd7
 *
Packit 4b6dd7
 * Return value: the number of bytes returned in @data (guaranteed to be more than 0 and at most @maximum_length)
Packit 4b6dd7
 *
Packit 4b6dd7
 * Since: 0.5.0
Packit 4b6dd7
 */
Packit 4b6dd7
gsize
Packit 4b6dd7
gdata_buffer_pop_data_limited (GDataBuffer *self, guint8 *data, gsize maximum_length, gboolean *reached_eof)
Packit 4b6dd7
{
Packit 4b6dd7
	g_return_val_if_fail (self != NULL, 0);
Packit 4b6dd7
	g_return_val_if_fail (data != NULL, 0);
Packit 4b6dd7
	g_return_val_if_fail (maximum_length > 0, 0);
Packit 4b6dd7
Packit 4b6dd7
	/* If there's no data in the buffer, block until some is available */
Packit 4b6dd7
	g_mutex_lock (&(self->mutex));
Packit 4b6dd7
	if (self->total_length == 0 && self->reached_eof == FALSE) {
Packit 4b6dd7
		g_cond_wait (&(self->cond), &(self->mutex));
Packit 4b6dd7
	}
Packit 4b6dd7
	g_mutex_unlock (&(self->mutex));
Packit 4b6dd7
Packit 4b6dd7
	return gdata_buffer_pop_data (self, data, MIN (maximum_length, self->total_length), reached_eof, NULL);
Packit 4b6dd7
}