Blob Blame History Raw
/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * Copyright (C) 2012 Igalia, S.L.
 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <glib/gi18n-lib.h>
#include "soup-cache-input-stream.h"
#include "soup-message-body.h"

/*
 * GObject property identifiers for SoupCacheInputStream.
 * PROP_0 is the unused zero slot; PROP_OUTPUT_STREAM backs the
 * "output-stream" property installed in class_init; LAST_PROP marks
 * the end of the list.
 */
enum {
	PROP_0,

	PROP_OUTPUT_STREAM,

	LAST_PROP
};

/*
 * Indices into signals[] below. CACHING_FINISHED is the
 * "caching-finished" signal registered in class_init.
 */
enum {
	CACHING_FINISHED,

	LAST_SIGNAL
};

/* Signal ids returned by g_signal_new(), indexed by the enum above. */
static guint signals[LAST_SIGNAL] = { 0 };

/*
 * Per-instance private state of SoupCacheInputStream.
 */
struct _SoupCacheInputStreamPrivate
{
	/* Stream the read data is copied to. Set either through the
	 * "output-stream" property or when g_file_replace_finish() succeeds;
	 * cleared by notify_and_clear(). */
	GOutputStream *output_stream;
	/* Passed to the async file replace and to every async write;
	 * cancelled from close_fn and cleared by notify_and_clear(). */
	GCancellable *cancellable;
	/* Running total of bytes reported by g_output_stream_write_finish(). */
	gsize bytes_written;

	/* TRUE once a read on the base stream returned 0. */
	gboolean read_finished;
	/* Buffer handed to the async write currently in flight, NULL if none. */
	SoupBuffer *current_writing_buffer;
	/* Copies of the data read so far that still have to be written. */
	GQueue *buffer_queue;
};

/* Forward declaration: referenced by G_IMPLEMENT_INTERFACE below. */
static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);

/*
 * Registers SoupCacheInputStream as a subclass of SoupFilterInputStream
 * with private instance data, implementing GPollableInputStream.
 */
G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
                         G_ADD_PRIVATE (SoupCacheInputStream)
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
						soup_cache_input_stream_pollable_init))


/* Forward declaration: try_write_next_buffer() calls it before its definition. */
static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);

/*
 * Emits "caching-finished" with the number of bytes written so far and
 * @error (NULL when caching succeeded), then drops the output stream and
 * the cancellable.
 *
 * Takes ownership of @error: it is freed before returning.
 */
static inline void
notify_and_clear (SoupCacheInputStream *istream, GError *error)
{
	SoupCacheInputStreamPrivate *priv = istream->priv;

	/* The first signal argument is registered as G_TYPE_INT, so
	 * g_signal_emit() collects an int from the variadic arguments.
	 * bytes_written is a gsize; convert it explicitly so the value
	 * passed matches the registered type even where gsize is wider
	 * than int. */
	g_signal_emit (istream, signals[CACHING_FINISHED], 0,
		       (int) priv->bytes_written, error);

	g_clear_object (&priv->cancellable);
	g_clear_object (&priv->output_stream);
	g_clear_error (&error);
}

/*
 * Decides what to do once the output side is idle:
 *  - start writing the next queued buffer if no write is in flight and
 *    the queue is not empty;
 *  - otherwise report success if the base stream has been fully read;
 *  - otherwise report G_IO_ERROR_CLOSED if the input stream was closed.
 * In any other situation nothing happens.
 */
static inline void
try_write_next_buffer (SoupCacheInputStream *istream)
{
	SoupCacheInputStreamPrivate *priv = istream->priv;
	GError *closed_error = NULL;

	if (priv->current_writing_buffer == NULL && priv->buffer_queue->length != 0) {
		soup_cache_input_stream_write_next_buffer (istream);
		return;
	}

	if (priv->read_finished) {
		notify_and_clear (istream, NULL);
		return;
	}

	if (!g_input_stream_is_closed (G_INPUT_STREAM (istream)))
		return;

	/* Closed before reaching end of stream: caching cannot complete. */
	g_set_error_literal (&closed_error, G_IO_ERROR, G_IO_ERROR_CLOSED,
			     _("Network stream unexpectedly closed"));
	notify_and_clear (istream, closed_error);
}

/*
 * Completion callback for the g_file_replace_async() started in
 * soup_cache_input_stream_new().
 *
 * @source is the GFile, @user_data the SoupCacheInputStream, which was
 * referenced when the operation was started; that reference is released
 * here. On success the resulting stream becomes the output stream and any
 * queued data starts being written; on failure "caching-finished" is
 * emitted with the error.
 */
static void
file_replaced_cb (GObject      *source,
		  GAsyncResult *res,
		  gpointer      user_data)
{
	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (user_data);
	GError *replace_error = NULL;

	self->priv->output_stream =
		(GOutputStream *) g_file_replace_finish (G_FILE (source), res, &replace_error);

	if (replace_error == NULL)
		try_write_next_buffer (self);
	else
		notify_and_clear (self, replace_error);

	g_object_unref (self);
}

/*
 * Instance initializer: hooks up the private data pointer and creates the
 * (initially empty) queue of buffers waiting to be written.
 */
static void
soup_cache_input_stream_init (SoupCacheInputStream *self)
{
	self->priv = soup_cache_input_stream_get_instance_private (self);
	self->priv->buffer_queue = g_queue_new ();
}

/*
 * GObject::get_property implementation. Only "output-stream" is
 * supported; any other id triggers the standard invalid-property warning.
 */
static void
soup_cache_input_stream_get_property (GObject *object,
				      guint property_id, GValue *value, GParamSpec *pspec)
{
	SoupCacheInputStream *stream = SOUP_CACHE_INPUT_STREAM (object);

	if (property_id == PROP_OUTPUT_STREAM) {
		g_value_set_object (value, stream->priv->output_stream);
		return;
	}

	G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}

/*
 * GObject::set_property implementation. "output-stream" stores a new
 * reference to the given stream; any other id triggers the standard
 * invalid-property warning.
 */
static void
soup_cache_input_stream_set_property (GObject *object,
				      guint property_id, const GValue *value, GParamSpec *pspec)
{
	SoupCacheInputStream *stream = SOUP_CACHE_INPUT_STREAM (object);

	if (property_id == PROP_OUTPUT_STREAM) {
		stream->priv->output_stream = g_value_dup_object (value);
		return;
	}

	G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
}

/*
 * GObject::finalize implementation: releases the cancellable, the output
 * stream, the buffer of the in-flight write (if any) and every buffer
 * still queued, then chains up to the parent class.
 */
static void
soup_cache_input_stream_finalize (GObject *object)
{
	SoupCacheInputStream *stream = (SoupCacheInputStream *)object;

	g_clear_object (&stream->priv->cancellable);
	g_clear_object (&stream->priv->output_stream);
	g_clear_pointer (&stream->priv->current_writing_buffer, soup_buffer_free);
	g_queue_free_full (stream->priv->buffer_queue, (GDestroyNotify) soup_buffer_free);

	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
}

/*
 * Completion callback for the async write started in
 * soup_cache_input_stream_write_next_buffer().
 *
 * @source is the output stream; @istream was referenced when the write
 * was started and is unreferenced here on every path. On error
 * "caching-finished" is emitted with that error. Otherwise the byte
 * counter is updated, any unwritten tail of the buffer is put back at
 * the head of the queue, and the next step is scheduled.
 */
static void
write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
{
	SoupCacheInputStreamPrivate *priv = istream->priv;
	GError *write_error = NULL;
	gssize written;
	gsize remaining;

	written = g_output_stream_write_finish (G_OUTPUT_STREAM (source), result, &write_error);

	if (write_error != NULL) {
		notify_and_clear (istream, write_error);
	} else {
		/* Short write: requeue what is left so it goes out first. */
		remaining = priv->current_writing_buffer->length - written;
		if (remaining > 0) {
			g_queue_push_head (priv->buffer_queue,
					   soup_buffer_new_subbuffer (priv->current_writing_buffer,
								      written, remaining));
		}

		priv->bytes_written += written;
		g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);

		try_write_next_buffer (istream);
	}

	g_object_unref (istream);
}

/*
 * Pops the head of the buffer queue, makes it the current writing buffer
 * and starts an async write of it to the output stream. A reference on
 * @istream is taken for the duration of the write and released in
 * write_ready_cb().
 *
 * Callers must ensure the queue is not empty and that an open output
 * stream is set (the latter is asserted).
 */
static void
soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
{
	SoupCacheInputStreamPrivate *priv = istream->priv;
	SoupBuffer *next = g_queue_pop_head (priv->buffer_queue);
	int io_priority;

	g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));

	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
	priv->current_writing_buffer = next;

	/* Use default priority while more than 10 buffers are still
	 * waiting, low priority otherwise. */
	io_priority = (priv->buffer_queue->length > 10) ? G_PRIORITY_DEFAULT : G_PRIORITY_LOW;

	g_output_stream_write_async (priv->output_stream, next->data, next->length,
				     io_priority, priv->cancellable,
				     (GAsyncReadyCallback) write_ready_cb,
				     g_object_ref (istream));
}

/*
 * Shared implementation of the blocking and non-blocking read vfuncs.
 *
 * Reads up to @count bytes from the base stream into @buffer and returns
 * what g_pollable_stream_read() returned. On a successful read, a copy of
 * the data is queued for writing to the output stream, and a write is
 * started if none is in flight. On end of stream, read_finished is set
 * and, if nothing is being written, "caching-finished" is emitted.
 *
 * Once caching has been given up (both the output stream and the
 * cancellable are gone, which is the state notify_and_clear() leaves
 * behind), data is no longer copied into the queue: nothing would ever
 * drain it, so it would only grow until the stream is finalized.
 */
static gssize
read_internal (GInputStream  *stream,
	       void          *buffer,
	       gsize          count,
	       gboolean       blocking,
	       GCancellable  *cancellable,
	       GError       **error)
{
	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
	SoupCacheInputStreamPrivate *priv = istream->priv;
	GInputStream *base_stream;
	gssize nread;

	base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
	nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
					cancellable, error);

	if (G_UNLIKELY (nread == -1 || priv->read_finished))
		return nread;

	if (nread == 0) {
		priv->read_finished = TRUE;

		if (priv->current_writing_buffer == NULL && priv->output_stream)
			notify_and_clear (istream, NULL);

		return nread;
	}

	/* No output stream and no pending file replace (the cancellable is
	 * only alive while one of them can still make progress): there is
	 * nobody left to consume queued data, so do not buffer it. */
	if (priv->output_stream == NULL && priv->cancellable == NULL)
		return nread;

	g_queue_push_tail (priv->buffer_queue,
			   soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread));

	if (priv->current_writing_buffer == NULL && priv->output_stream)
		soup_cache_input_stream_write_next_buffer (istream);

	return nread;
}

/*
 * GInputStream::read_fn implementation: blocking read that forwards to
 * read_internal() with blocking set to TRUE.
 */
static gssize
soup_cache_input_stream_read_fn (GInputStream  *stream,
				 void          *buffer,
				 gsize          count,
				 GCancellable  *cancellable,
				 GError       **error)
{
	return read_internal (stream, buffer, count, TRUE,
			      cancellable, error);
}

/*
 * GPollableInputStream::read_nonblocking implementation: forwards to
 * read_internal() with blocking set to FALSE and no cancellable.
 */
static gssize
soup_cache_input_stream_read_nonblocking (GPollableInputStream  *stream,
					  void                  *buffer,
					  gsize                  count,
					  GError               **error)
{
	return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
			      NULL, error);
}

/*
 * GPollableInputStream interface initializer. Only read_nonblocking is
 * overridden here; the other interface slots are left untouched.
 */
static void
soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
				       gpointer interface_data)
{
	pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
}

/*
 * GInputStream::close_fn implementation.
 *
 * If the base stream has not been read to the end, caching is aborted
 * before chaining up to the parent class:
 *  - no output stream yet but a cancellable exists: cancel it (the file
 *    replace operation has not completed);
 *  - output stream with an operation pending: cancel it;
 *  - output stream idle: emit "caching-finished" with
 *    G_IO_ERROR_PARTIAL_INPUT right away.
 *
 * Returns whatever the parent class close_fn returns.
 */
static gboolean
soup_cache_input_stream_close_fn (GInputStream  *stream,
				  GCancellable  *cancellable,
				  GError       **error)
{
	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
	SoupCacheInputStreamPrivate *priv = istream->priv;

	if (!priv->read_finished) {
		if (priv->output_stream == NULL) {
			/* The file_replace_async() hasn't finished yet */
			if (priv->cancellable)
				g_cancellable_cancel (priv->cancellable);
		} else if (g_output_stream_has_pending (priv->output_stream)) {
			/* A write is in flight: its callback will report the failure. */
			g_cancellable_cancel (priv->cancellable);
		} else {
			/* Nothing pending to cancel, so report the failure directly. */
			GError *partial_error = NULL;

			g_set_error_literal (&partial_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
					     _("Failed to completely cache the resource"));
			notify_and_clear (istream, partial_error);
		}
	}

	return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
}

/*
 * Class initializer: installs the GObject and GInputStream vfuncs,
 * the construct-only "output-stream" property and the
 * "caching-finished" signal.
 *
 * NOTE(review): the signal's first argument is registered as G_TYPE_INT
 * while the byte counter kept in the private struct is a gsize, so
 * counts above G_MAXINT cannot be represented in the signal.
 */
static void
soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
{
	GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
	GObjectClass *object_class = G_OBJECT_CLASS (klass);

	/* GInputStream virtual methods */
	stream_class->read_fn = soup_cache_input_stream_read_fn;
	stream_class->close_fn = soup_cache_input_stream_close_fn;

	/* GObject virtual methods */
	object_class->finalize = soup_cache_input_stream_finalize;
	object_class->set_property = soup_cache_input_stream_set_property;
	object_class->get_property = soup_cache_input_stream_get_property;

	g_object_class_install_property (object_class, PROP_OUTPUT_STREAM,
					 g_param_spec_object ("output-stream", "Output stream",
							      "the output stream where to write.",
							      G_TYPE_OUTPUT_STREAM,
							      G_PARAM_READWRITE |
							      G_PARAM_CONSTRUCT_ONLY |
							      G_PARAM_STATIC_STRINGS));

	/* Handlers receive the number of bytes written and a GError
	 * (NULL on success). */
	signals[CACHING_FINISHED] =
		g_signal_new ("caching-finished",
			      G_OBJECT_CLASS_TYPE (object_class),
			      G_SIGNAL_RUN_FIRST,
			      G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
			      NULL, NULL,
			      NULL,
			      G_TYPE_NONE, 2,
			      G_TYPE_INT, G_TYPE_ERROR);
}

/*
 * soup_cache_input_stream_new:
 * @base_stream: the stream to read from
 * @file: the file the data read from @base_stream is copied to
 *
 * Creates a stream that wraps @base_stream (with "close-base-stream" set
 * to FALSE) and asynchronously replaces @file; data read through the
 * returned stream is queued and written to @file once the replace
 * operation has produced an output stream.
 *
 * An extra reference on the new stream is held for the duration of the
 * replace operation and released in file_replaced_cb().
 *
 * Returns: the new stream, or NULL if @base_stream or @file is invalid.
 */
GInputStream *
soup_cache_input_stream_new (GInputStream *base_stream,
			     GFile        *file)
{
	SoupCacheInputStream *istream;

	/* Reject bad arguments up front: with an invalid @file the async
	 * replace would bail out without ever invoking the callback,
	 * leaking the reference taken below and leaving a stream that
	 * never caches nor emits "caching-finished". */
	g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
	g_return_val_if_fail (G_IS_FILE (file), NULL);

	istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
				"base-stream", base_stream,
				"close-base-stream", FALSE,
				NULL);

	istream->priv->cancellable = g_cancellable_new ();
	g_file_replace_async (file, NULL, FALSE,
			      G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
			      G_PRIORITY_DEFAULT, istream->priv->cancellable,
			      file_replaced_cb, g_object_ref (istream));

	return (GInputStream *) istream;
}