Blame libsoup/soup-cache-input-stream.c

rpm-build 4f3c61
/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
rpm-build 4f3c61
/*
rpm-build 4f3c61
 * Copyright (C) 2012 Igalia, S.L.
rpm-build 4f3c61
 */
rpm-build 4f3c61
rpm-build 4f3c61
#ifdef HAVE_CONFIG_H
rpm-build 4f3c61
#include <config.h>
rpm-build 4f3c61
#endif
rpm-build 4f3c61
rpm-build 4f3c61
#include <glib/gi18n-lib.h>
rpm-build 4f3c61
#include "soup-cache-input-stream.h"
rpm-build 4f3c61
#include "soup-message-body.h"
rpm-build 4f3c61
rpm-build 4f3c61
/* properties */
rpm-build 4f3c61
enum {
rpm-build 4f3c61
	PROP_0,
rpm-build 4f3c61
rpm-build 4f3c61
	PROP_OUTPUT_STREAM,
rpm-build 4f3c61
rpm-build 4f3c61
	LAST_PROP
rpm-build 4f3c61
};
rpm-build 4f3c61
rpm-build 4f3c61
enum {
rpm-build 4f3c61
	CACHING_FINISHED,
rpm-build 4f3c61
rpm-build 4f3c61
	LAST_SIGNAL
rpm-build 4f3c61
};
rpm-build 4f3c61
rpm-build 4f3c61
static guint signals[LAST_SIGNAL] = { 0 };
rpm-build 4f3c61
rpm-build 4f3c61
struct _SoupCacheInputStreamPrivate
rpm-build 4f3c61
{
rpm-build 4f3c61
	GOutputStream *output_stream;
rpm-build 4f3c61
	GCancellable *cancellable;
rpm-build 4f3c61
	gsize bytes_written;
rpm-build 4f3c61
rpm-build 4f3c61
	gboolean read_finished;
rpm-build 4f3c61
	SoupBuffer *current_writing_buffer;
rpm-build 4f3c61
	GQueue *buffer_queue;
rpm-build 4f3c61
};
rpm-build 4f3c61
rpm-build 4f3c61
static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
rpm-build 4f3c61
rpm-build 4f3c61
G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
rpm-build 4f3c61
                         G_ADD_PRIVATE (SoupCacheInputStream)
rpm-build 4f3c61
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
rpm-build 4f3c61
						soup_cache_input_stream_pollable_init))
rpm-build 4f3c61
rpm-build 4f3c61
rpm-build 4f3c61
static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
rpm-build 4f3c61
rpm-build 4f3c61
static inline void
rpm-build 4f3c61
notify_and_clear (SoupCacheInputStream *istream, GError *error)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
rpm-build 4f3c61
	g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error);
rpm-build 4f3c61
rpm-build 4f3c61
	g_clear_object (&priv->cancellable);
rpm-build 4f3c61
	g_clear_object (&priv->output_stream);
rpm-build 4f3c61
	g_clear_error (&error);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static inline void
rpm-build 4f3c61
try_write_next_buffer (SoupCacheInputStream *istream)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
rpm-build 4f3c61
	if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
rpm-build 4f3c61
		soup_cache_input_stream_write_next_buffer (istream);
rpm-build 4f3c61
	else if (priv->read_finished)
rpm-build 4f3c61
		notify_and_clear (istream, NULL);
rpm-build 4f3c61
	else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
rpm-build 4f3c61
		GError *error = NULL;
rpm-build 4f3c61
		g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
rpm-build 4f3c61
				     _("Network stream unexpectedly closed"));
rpm-build 4f3c61
		notify_and_clear (istream, error);
rpm-build 4f3c61
	}
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
file_replaced_cb (GObject      *source,
rpm-build 4f3c61
		  GAsyncResult *res,
rpm-build 4f3c61
		  gpointer      user_data)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
	GError *error = NULL;
rpm-build 4f3c61
rpm-build 4f3c61
	priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
rpm-build 4f3c61
rpm-build 4f3c61
	if (error)
rpm-build 4f3c61
		notify_and_clear (istream, error);
rpm-build 4f3c61
	else
rpm-build 4f3c61
		try_write_next_buffer (istream);
rpm-build 4f3c61
rpm-build 4f3c61
	g_object_unref (istream);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_init (SoupCacheInputStream *self)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = soup_cache_input_stream_get_instance_private (self);
rpm-build 4f3c61
rpm-build 4f3c61
	priv->buffer_queue = g_queue_new ();
rpm-build 4f3c61
	self->priv = priv;
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_get_property (GObject *object,
rpm-build 4f3c61
				      guint property_id, GValue *value, GParamSpec *pspec)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = self->priv;
rpm-build 4f3c61
rpm-build 4f3c61
	switch (property_id) {
rpm-build 4f3c61
	case PROP_OUTPUT_STREAM:
rpm-build 4f3c61
		g_value_set_object (value, priv->output_stream);
rpm-build 4f3c61
		break;
rpm-build 4f3c61
	default:
rpm-build 4f3c61
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
rpm-build 4f3c61
		break;
rpm-build 4f3c61
	}
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_set_property (GObject *object,
rpm-build 4f3c61
				      guint property_id, const GValue *value, GParamSpec *pspec)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = self->priv;
rpm-build 4f3c61
rpm-build 4f3c61
	switch (property_id) {
rpm-build 4f3c61
	case PROP_OUTPUT_STREAM:
rpm-build 4f3c61
		priv->output_stream = g_value_dup_object (value);
rpm-build 4f3c61
		break;
rpm-build 4f3c61
	default:
rpm-build 4f3c61
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
rpm-build 4f3c61
		break;
rpm-build 4f3c61
	}
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_finalize (GObject *object)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *self = (SoupCacheInputStream *)object;
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = self->priv;
rpm-build 4f3c61
rpm-build 4f3c61
	g_clear_object (&priv->cancellable);
rpm-build 4f3c61
	g_clear_object (&priv->output_stream);
rpm-build 4f3c61
	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
rpm-build 4f3c61
	g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
rpm-build 4f3c61
rpm-build 4f3c61
	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
rpm-build 4f3c61
{
rpm-build 4f3c61
	GOutputStream *ostream = G_OUTPUT_STREAM (source);
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
	gssize write_size;
rpm-build 4f3c61
	gsize pending;
rpm-build 4f3c61
	GError *error = NULL;
rpm-build 4f3c61
rpm-build 4f3c61
	write_size = g_output_stream_write_finish (ostream, result, &error);
rpm-build 4f3c61
	if (error) {
rpm-build 4f3c61
		notify_and_clear (istream, error);
rpm-build 4f3c61
		g_object_unref (istream);
rpm-build 4f3c61
		return;
rpm-build 4f3c61
	}
rpm-build 4f3c61
rpm-build 4f3c61
	/* Check that we have written everything */
rpm-build 4f3c61
	pending = priv->current_writing_buffer->length - write_size;
rpm-build 4f3c61
	if (pending) {
rpm-build 4f3c61
		SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
rpm-build 4f3c61
								   write_size, pending);
rpm-build 4f3c61
		g_queue_push_head (priv->buffer_queue, subbuffer);
rpm-build 4f3c61
	}
rpm-build 4f3c61
rpm-build 4f3c61
	priv->bytes_written += write_size;
rpm-build 4f3c61
	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
rpm-build 4f3c61
rpm-build 4f3c61
	try_write_next_buffer (istream);
rpm-build 4f3c61
	g_object_unref (istream);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
	SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
rpm-build 4f3c61
	int priority;
rpm-build 4f3c61
rpm-build 4f3c61
	g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
rpm-build 4f3c61
rpm-build 4f3c61
	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
rpm-build 4f3c61
	priv->current_writing_buffer = buffer;
rpm-build 4f3c61
rpm-build 4f3c61
	if (priv->buffer_queue->length > 10)
rpm-build 4f3c61
		priority = G_PRIORITY_DEFAULT;
rpm-build 4f3c61
	else
rpm-build 4f3c61
		priority = G_PRIORITY_LOW;
rpm-build 4f3c61
rpm-build 4f3c61
	g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
rpm-build 4f3c61
				     priority, priv->cancellable,
rpm-build 4f3c61
				     (GAsyncReadyCallback) write_ready_cb,
rpm-build 4f3c61
				     g_object_ref (istream));
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static gssize
rpm-build 4f3c61
read_internal (GInputStream  *stream,
rpm-build 4f3c61
	       void          *buffer,
rpm-build 4f3c61
	       gsize          count,
rpm-build 4f3c61
	       gboolean       blocking,
rpm-build 4f3c61
	       GCancellable  *cancellable,
rpm-build 4f3c61
	       GError       **error)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
	GInputStream *base_stream;
rpm-build 4f3c61
	gssize nread;
rpm-build 4f3c61
rpm-build 4f3c61
	base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
rpm-build 4f3c61
	nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
rpm-build 4f3c61
					cancellable, error);
rpm-build 4f3c61
rpm-build 4f3c61
	if (G_UNLIKELY (nread == -1 || priv->read_finished))
rpm-build 4f3c61
		return nread;
rpm-build 4f3c61
rpm-build 4f3c61
	if (nread == 0) {
rpm-build 4f3c61
		priv->read_finished = TRUE;
rpm-build 4f3c61
rpm-build 4f3c61
		if (priv->current_writing_buffer == NULL && priv->output_stream)
rpm-build 4f3c61
			notify_and_clear (istream, NULL);
rpm-build 4f3c61
	} else {
rpm-build 4f3c61
		SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
rpm-build 4f3c61
		g_queue_push_tail (priv->buffer_queue, soup_buffer);
rpm-build 4f3c61
rpm-build 4f3c61
		if (priv->current_writing_buffer == NULL && priv->output_stream)
rpm-build 4f3c61
			soup_cache_input_stream_write_next_buffer (istream);
rpm-build 4f3c61
	}
rpm-build 4f3c61
rpm-build 4f3c61
	return nread;
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static gssize
rpm-build 4f3c61
soup_cache_input_stream_read_fn (GInputStream  *stream,
rpm-build 4f3c61
				 void          *buffer,
rpm-build 4f3c61
				 gsize          count,
rpm-build 4f3c61
				 GCancellable  *cancellable,
rpm-build 4f3c61
				 GError       **error)
rpm-build 4f3c61
{
rpm-build 4f3c61
	return read_internal (stream, buffer, count, TRUE,
rpm-build 4f3c61
			      cancellable, error);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static gssize
rpm-build 4f3c61
soup_cache_input_stream_read_nonblocking (GPollableInputStream  *stream,
rpm-build 4f3c61
					  void                  *buffer,
rpm-build 4f3c61
					  gsize                  count,
rpm-build 4f3c61
					  GError               **error)
rpm-build 4f3c61
{
rpm-build 4f3c61
	return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
rpm-build 4f3c61
			      NULL, error);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
rpm-build 4f3c61
				       gpointer interface_data)
rpm-build 4f3c61
{
rpm-build 4f3c61
	pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static gboolean
rpm-build 4f3c61
soup_cache_input_stream_close_fn (GInputStream  *stream,
rpm-build 4f3c61
				  GCancellable  *cancellable,
rpm-build 4f3c61
				  GError       **error)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
rpm-build 4f3c61
	SoupCacheInputStreamPrivate *priv = istream->priv;
rpm-build 4f3c61
rpm-build 4f3c61
	if (!priv->read_finished) {
rpm-build 4f3c61
		if (priv->output_stream) {
rpm-build 4f3c61
			/* Cancel any pending write operation or return an error if none. */
rpm-build 4f3c61
			if (g_output_stream_has_pending (priv->output_stream))
rpm-build 4f3c61
				g_cancellable_cancel (priv->cancellable);
rpm-build 4f3c61
			else {
rpm-build 4f3c61
				GError *notify_error = NULL;
rpm-build 4f3c61
				g_set_error_literal (&notify_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
rpm-build 4f3c61
						     _("Failed to completely cache the resource"));
rpm-build 4f3c61
				notify_and_clear (istream, notify_error);
rpm-build 4f3c61
			}
rpm-build 4f3c61
		} else if (priv->cancellable)
rpm-build 4f3c61
			/* The file_replace_async() hasn't finished yet */
rpm-build 4f3c61
			g_cancellable_cancel (priv->cancellable);
rpm-build 4f3c61
	}
rpm-build 4f3c61
rpm-build 4f3c61
	return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
static void
rpm-build 4f3c61
soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
rpm-build 4f3c61
{
rpm-build 4f3c61
	GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
rpm-build 4f3c61
	GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
rpm-build 4f3c61
rpm-build 4f3c61
	gobject_class->get_property = soup_cache_input_stream_get_property;
rpm-build 4f3c61
	gobject_class->set_property = soup_cache_input_stream_set_property;
rpm-build 4f3c61
	gobject_class->finalize = soup_cache_input_stream_finalize;
rpm-build 4f3c61
rpm-build 4f3c61
	istream_class->read_fn = soup_cache_input_stream_read_fn;
rpm-build 4f3c61
	istream_class->close_fn = soup_cache_input_stream_close_fn;
rpm-build 4f3c61
rpm-build 4f3c61
	g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
rpm-build 4f3c61
					 g_param_spec_object ("output-stream", "Output stream",
rpm-build 4f3c61
							      "the output stream where to write.",
rpm-build 4f3c61
							      G_TYPE_OUTPUT_STREAM,
rpm-build 4f3c61
							      G_PARAM_READWRITE |
rpm-build 4f3c61
							      G_PARAM_CONSTRUCT_ONLY |
rpm-build 4f3c61
							      G_PARAM_STATIC_STRINGS));
rpm-build 4f3c61
rpm-build 4f3c61
	signals[CACHING_FINISHED] =
rpm-build 4f3c61
		g_signal_new ("caching-finished",
rpm-build 4f3c61
			      G_OBJECT_CLASS_TYPE (gobject_class),
rpm-build 4f3c61
			      G_SIGNAL_RUN_FIRST,
rpm-build 4f3c61
			      G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
rpm-build 4f3c61
			      NULL, NULL,
rpm-build 4f3c61
			      NULL,
rpm-build 4f3c61
			      G_TYPE_NONE, 2,
rpm-build 4f3c61
			      G_TYPE_INT, G_TYPE_ERROR);
rpm-build 4f3c61
}
rpm-build 4f3c61
rpm-build 4f3c61
GInputStream *
rpm-build 4f3c61
soup_cache_input_stream_new (GInputStream *base_stream,
rpm-build 4f3c61
			     GFile        *file)
rpm-build 4f3c61
{
rpm-build 4f3c61
	SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
rpm-build 4f3c61
					      "base-stream", base_stream,
rpm-build 4f3c61
					      "close-base-stream", FALSE,
rpm-build 4f3c61
					      NULL);
rpm-build 4f3c61
rpm-build 4f3c61
	istream->priv->cancellable = g_cancellable_new ();
rpm-build 4f3c61
	g_file_replace_async (file, NULL, FALSE,
rpm-build 4f3c61
			      G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
rpm-build 4f3c61
			      G_PRIORITY_DEFAULT, istream->priv->cancellable,
rpm-build 4f3c61
			      file_replaced_cb, g_object_ref (istream));
rpm-build 4f3c61
rpm-build 4f3c61
	return (GInputStream *) istream;
rpm-build 4f3c61
}