/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ /* * Copyright (C) 2012 Igalia, S.L. */ #ifdef HAVE_CONFIG_H #include #endif #include #include "soup-cache-input-stream.h" #include "soup-message-body.h" /* properties */ enum { PROP_0, PROP_OUTPUT_STREAM, LAST_PROP }; enum { CACHING_FINISHED, LAST_SIGNAL }; static guint signals[LAST_SIGNAL] = { 0 }; struct _SoupCacheInputStreamPrivate { GOutputStream *output_stream; GCancellable *cancellable; gsize bytes_written; gboolean read_finished; SoupBuffer *current_writing_buffer; GQueue *buffer_queue; }; static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data); G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM, G_ADD_PRIVATE (SoupCacheInputStream) G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, soup_cache_input_stream_pollable_init)) static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream); static inline void notify_and_clear (SoupCacheInputStream *istream, GError *error) { SoupCacheInputStreamPrivate *priv = istream->priv; g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error); g_clear_object (&priv->cancellable); g_clear_object (&priv->output_stream); g_clear_error (&error); } static inline void try_write_next_buffer (SoupCacheInputStream *istream) { SoupCacheInputStreamPrivate *priv = istream->priv; if (priv->current_writing_buffer == NULL && priv->buffer_queue->length) soup_cache_input_stream_write_next_buffer (istream); else if (priv->read_finished) notify_and_clear (istream, NULL); else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) { GError *error = NULL; g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED, _("Network stream unexpectedly closed")); notify_and_clear (istream, error); } } static void file_replaced_cb (GObject *source, GAsyncResult *res, gpointer user_data) { SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data); SoupCacheInputStreamPrivate *priv = istream->priv; GError *error = NULL; priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error); if (error) notify_and_clear (istream, error); else try_write_next_buffer (istream); g_object_unref (istream); } static void soup_cache_input_stream_init (SoupCacheInputStream *self) { SoupCacheInputStreamPrivate *priv = soup_cache_input_stream_get_instance_private (self); priv->buffer_queue = g_queue_new (); self->priv = priv; } static void soup_cache_input_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec) { SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object); SoupCacheInputStreamPrivate *priv = self->priv; switch (property_id) { case PROP_OUTPUT_STREAM: g_value_set_object (value, priv->output_stream); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void soup_cache_input_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec) { SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object); SoupCacheInputStreamPrivate *priv = self->priv; switch (property_id) { case PROP_OUTPUT_STREAM: priv->output_stream = g_value_dup_object (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void soup_cache_input_stream_finalize (GObject *object) { SoupCacheInputStream *self = (SoupCacheInputStream *)object; SoupCacheInputStreamPrivate *priv = self->priv; g_clear_object (&priv->cancellable); g_clear_object (&priv->output_stream); g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free); g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free); G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object); } static void write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream) { GOutputStream *ostream = G_OUTPUT_STREAM (source); SoupCacheInputStreamPrivate *priv = istream->priv; gssize write_size; gsize pending; GError *error = NULL; write_size = g_output_stream_write_finish (ostream, result, &error); if (error) { notify_and_clear (istream, error); g_object_unref (istream); return; } /* Check that we have written everything */ pending = priv->current_writing_buffer->length - write_size; if (pending) { SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer, write_size, pending); g_queue_push_head (priv->buffer_queue, subbuffer); } priv->bytes_written += write_size; g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free); try_write_next_buffer (istream); g_object_unref (istream); } static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream) { SoupCacheInputStreamPrivate *priv = istream->priv; SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue); int priority; g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream)); g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free); priv->current_writing_buffer = buffer; if (priv->buffer_queue->length > 10) priority = G_PRIORITY_DEFAULT; else priority = G_PRIORITY_LOW; g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length, priority, priv->cancellable, (GAsyncReadyCallback) write_ready_cb, g_object_ref (istream)); } static gssize read_internal (GInputStream *stream, void *buffer, gsize count, gboolean blocking, GCancellable *cancellable, GError **error) { SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream); SoupCacheInputStreamPrivate *priv = istream->priv; GInputStream *base_stream; gssize nread; base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream)); nread = g_pollable_stream_read (base_stream, buffer, count, blocking, cancellable, error); if (G_UNLIKELY (nread == -1 || priv->read_finished)) return nread; if (nread == 0) { priv->read_finished = TRUE; if (priv->current_writing_buffer == NULL && priv->output_stream) notify_and_clear (istream, NULL); } else { SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread); g_queue_push_tail (priv->buffer_queue, soup_buffer); if (priv->current_writing_buffer == NULL && priv->output_stream) soup_cache_input_stream_write_next_buffer (istream); } return nread; } static gssize soup_cache_input_stream_read_fn (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error) { return read_internal (stream, buffer, count, TRUE, cancellable, error); } static gssize soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer, gsize count, GError **error) { return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE, NULL, error); } static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data) { pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking; } static gboolean soup_cache_input_stream_close_fn (GInputStream *stream, GCancellable *cancellable, GError **error) { SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream); SoupCacheInputStreamPrivate *priv = istream->priv; if (!priv->read_finished) { if (priv->output_stream) { /* Cancel any pending write operation or return an error if none. */ if (g_output_stream_has_pending (priv->output_stream)) g_cancellable_cancel (priv->cancellable); else { GError *notify_error = NULL; g_set_error_literal (¬ify_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT, _("Failed to completely cache the resource")); notify_and_clear (istream, notify_error); } } else if (priv->cancellable) /* The file_replace_async() hasn't finished yet */ g_cancellable_cancel (priv->cancellable); } return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error); } static void soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass); gobject_class->get_property = soup_cache_input_stream_get_property; gobject_class->set_property = soup_cache_input_stream_set_property; gobject_class->finalize = soup_cache_input_stream_finalize; istream_class->read_fn = soup_cache_input_stream_read_fn; istream_class->close_fn = soup_cache_input_stream_close_fn; g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM, g_param_spec_object ("output-stream", "Output stream", "the output stream where to write.", G_TYPE_OUTPUT_STREAM, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); signals[CACHING_FINISHED] = g_signal_new ("caching-finished", G_OBJECT_CLASS_TYPE (gobject_class), G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished), NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_ERROR); } GInputStream * soup_cache_input_stream_new (GInputStream *base_stream, GFile *file) { SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM, "base-stream", base_stream, "close-base-stream", FALSE, NULL); istream->priv->cancellable = g_cancellable_new (); g_file_replace_async (file, NULL, FALSE, G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION, G_PRIORITY_DEFAULT, istream->priv->cancellable, file_replaced_cb, g_object_ref (istream)); return (GInputStream *) istream; }