/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */
/*
* GData Client
* Copyright (C) Philip Withnall 2009 <philip@tecnocode.co.uk>
*
* GData Client is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* GData Client is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with GData Client. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* SECTION:gdata-download-stream
* @short_description: GData download stream object
* @stability: Stable
* @include: gdata/gdata-download-stream.h
*
* #GDataDownloadStream is a #GInputStream subclass to allow downloading of files from GData services with authorization from a #GDataService under
* the given #GDataAuthorizationDomain. If authorization is not required to perform the download, a #GDataAuthorizationDomain doesn't have to be
* specified.
*
* Once a #GDataDownloadStream is instantiated with gdata_download_stream_new(), the standard #GInputStream API can be used on the stream to download
* the file. Network communication may not actually begin until the first call to g_input_stream_read(), so having a #GDataDownloadStream around is no
* guarantee that the file is being downloaded.
*
* The content type and length of the file being downloaded are made available through #GDataDownloadStream:content-type and
* #GDataDownloadStream:content-length as soon as the appropriate data is received from the server. Connect to the
* #GObject::notify <code type="literal">content-type</code> and <code type="literal">content-length</code> details to be notified as
* soon as the data is available.
*
* The entire download operation can be cancelled using the #GCancellable instance provided to gdata_download_stream_new(), or returned by
* gdata_download_stream_get_cancellable(). Cancelling this at any time will cause all future #GInputStream method calls to return
* %G_IO_ERROR_CANCELLED. If any #GInputStream methods are in the process of being called, they will be cancelled and return %G_IO_ERROR_CANCELLED as
* soon as possible.
*
* Note that cancelling an individual method call (such as a call to g_input_stream_read()) using the #GCancellable parameter of the method will not
* cancel the download as a whole — just that particular method call. In the case of g_input_stream_read(), this will cause it to successfully return
* any data that it has in memory at the moment (up to the requested number of bytes), or return a %G_IO_ERROR_CANCELLED if it was blocking on receiving
* data from the network. This is also the behaviour of g_input_stream_read() when the download operation as a whole is cancelled.
*
* In the case of g_input_stream_close(), the call will return immediately if network activity hasn't yet started. If it has, the network activity will
* be cancelled, regardless of whether the call to g_input_stream_close() is cancelled. Cancelling a pending call to g_input_stream_close() (either
* using the method's #GCancellable, or by cancelling the download stream as a whole) will cause it to stop waiting for the network activity to finish,
* and return %G_IO_ERROR_CANCELLED immediately. Network activity will continue to be shut down in the background.
*
* If the server returns an error message (for example, if the user is not correctly authenticated/authorized or doesn't have suitable permissions to
* download from the given URI), it will be returned as a #GDataServiceError by the first call to g_input_stream_read().
*
* <example>
* <title>Downloading to a File</title>
* <programlisting>
* GDataService *service;
* GDataAuthorizationDomain *domain;
* GCancellable *cancellable;
* GInputStream *download_stream;
* GOutputStream *output_stream;
*
* /<!-- -->* Create the download stream *<!-- -->/
* service = create_my_service ();
* domain = get_my_authorization_domain_from_service (service);
* cancellable = g_cancellable_new (); /<!-- -->* cancel this to cancel the entire download operation *<!-- -->/
* download_stream = gdata_download_stream_new (service, domain, download_uri, cancellable);
* output_stream = create_file_and_return_output_stream ();
*
* /<!-- -->* Perform the download asynchronously *<!-- -->/
* g_output_stream_splice_async (output_stream, download_stream, G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
* G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) download_splice_cb, NULL);
*
* g_object_unref (output_stream);
* g_object_unref (download_stream);
* g_object_unref (cancellable);
* g_object_unref (domain);
* g_object_unref (service);
*
* static void
* download_splice_cb (GOutputStream *output_stream, GAsyncResult *result, gpointer user_data)
* {
* GError *error = NULL;
*
* g_output_stream_splice_finish (output_stream, result, &error);
*
* if (error != NULL && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) == FALSE) {
* /<!-- -->* Error downloading the file; potentially an I/O error (GIOError), or an error response from the server
* * (GDataServiceError). You might want to delete the newly created file because of the error. *<!-- -->/
* g_error ("Error downloading file: %s", error->message);
* g_error_free (error);
* }
* }
* </programlisting>
* </example>
*
* Since: 0.5.0
*/
#include <config.h>
#include <glib.h>
#include <glib/gi18n-lib.h>
#include "gdata-download-stream.h"
#include "gdata-buffer.h"
#include "gdata-private.h"
/* Forward declarations. */

/* GSeekable interface setup */
static void gdata_download_stream_seekable_iface_init (GSeekableIface *seekable_iface);
/* GObject construction, destruction and property accessors */
static GObject *gdata_download_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params);
static void gdata_download_stream_dispose (GObject *object);
static void gdata_download_stream_finalize (GObject *object);
static void gdata_download_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec);
static void gdata_download_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec);
/* GInputStream virtual methods */
static gssize gdata_download_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error);
static gboolean gdata_download_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error);
/* GSeekable virtual methods */
static goffset gdata_download_stream_tell (GSeekable *seekable);
static gboolean gdata_download_stream_can_seek (GSeekable *seekable);
static gboolean gdata_download_stream_seek (GSeekable *seekable, goffset offset, GSeekType type, GCancellable *cancellable, GError **error);
static gboolean gdata_download_stream_can_truncate (GSeekable *seekable);
static gboolean gdata_download_stream_truncate (GSeekable *seekable, goffset offset, GCancellable *cancellable, GError **error);
/* Network thread management helpers */
static void create_network_thread (GDataDownloadStream *self, GError **error);
static void reset_network_thread (GDataDownloadStream *self);
/*
* The GDataDownloadStream can be in one of several states:
* 1. Pre-network activity. This is the state that the stream is created in. @network_thread and @buffer are both %NULL, and @finished is %FALSE.
* @cancellable and @network_cancellable are already valid, as both are set up in the constructor.
* The stream will remain in this state until gdata_download_stream_read() or gdata_download_stream_seek() are called for the first time.
* @content_type and @content_length are at their default values (NULL and -1, respectively).
* 2. Network activity. This state is entered when gdata_download_stream_read() is called for the first time.
* @network_thread and @buffer are created, while @finished remains %FALSE.
* As soon as the headers are downloaded, which is guaranteed to be before the first call to gdata_download_stream_read() returns, @content_type
* and @content_length are set from the headers. From this point onwards, they are immutable.
* 3. Reset network activity. This state is entered only if case 3 is encountered in a call to gdata_download_stream_seek(): a seek to an offset which
* has already been read out of the buffer. In this state, @buffer is freed and set to %NULL, @network_thread is cancelled (then set to %NULL),
* and @offset is set to the seeked-to offset. @finished remains at %FALSE.
* When the next call to gdata_download_stream_read() is made, the download stream will go back to state 2 as if this was the first call to
* gdata_download_stream_read().
* 4. Post-network activity. This state is reached once the download thread finishes downloading, due to having downloaded everything.
* @buffer is non-%NULL, @network_thread is non-%NULL, but meaningless; @cancellable is still a valid #GCancellable instance; and @finished is set
* to %TRUE. At the same time, @finished_cond is signalled.
* This state can be exited either by making a call to gdata_download_stream_seek(), in which case the stream will go back to state 3; or by
* calling gdata_download_stream_close(), in which case the stream will return errors for all operations, as the underlying %GInputStream will be
* marked as closed.
*/
/* Private instance data for #GDataDownloadStream. */
struct _GDataDownloadStreamPrivate {
gchar *download_uri; /* value of the download-uri property; owned, freed in finalize */
GDataService *service; /* value of the service property; a reference is held until dispose */
GDataAuthorizationDomain *authorization_domain; /* value of the authorization-domain property; may be NULL */
SoupSession *session; /* obtained from the service via _gdata_service_get_session() when the service property is set */
SoupMessage *message; /* the GET request for download_uri, built in the constructor */
GDataBuffer *buffer; /* NULL until the network thread is started */
goffset offset; /* current position in the stream */
GThread *network_thread; /* NULL until the first read() starts network activity */
GCancellable *cancellable; /* cancels the entire download operation; created in the constructor if not supplied */
GCancellable *network_cancellable; /* see the comment in gdata_download_stream_constructor() about the relationship between these two */
gulong network_cancellable_id; /* handler ID of the connection from ->cancellable to ->network_cancellable */
gboolean finished; /* TRUE once the download thread has finished; guarded by ->finished_mutex */
GCond finished_cond; /* signalled when ->finished changes or a pending close() is cancelled */
GMutex finished_mutex; /* mutex protecting ->finished; used together with ->finished_cond */
/* Cached data from the SoupMessage */
gchar *content_type; /* NULL until successful response headers are received */
gssize content_length; /* -1 until successful response headers are received */
GMutex content_mutex; /* mutex to protect them */
};
/* GObject property IDs. The numeric values are spelled out explicitly; 0 is deliberately unused. */
enum {
	PROP_SERVICE              = 1,
	PROP_DOWNLOAD_URI         = 2,
	PROP_CONTENT_TYPE         = 3,
	PROP_CONTENT_LENGTH       = 4,
	PROP_CANCELLABLE          = 5,
	PROP_AUTHORIZATION_DOMAIN = 6
};
/* Register GDataDownloadStream as a GInputStream subclass which also implements the GSeekable interface
 * (set up by gdata_download_stream_seekable_iface_init()). */
G_DEFINE_TYPE_WITH_CODE (GDataDownloadStream, gdata_download_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE, gdata_download_stream_seekable_iface_init))
/* Class initialiser: reserves space for the private data, installs the GObject and GInputStream virtual method overrides,
 * and registers the object's properties. */
static void
gdata_download_stream_class_init (GDataDownloadStreamClass *klass)
{
	GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
	GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

	g_type_class_add_private (klass, sizeof (GDataDownloadStreamPrivate));

	/* Only the synchronous stream methods are overridden. The default asynchronous implementations are kept, which simply run
	 * the synchronous versions in a thread. */
	stream_class->close_fn = gdata_download_stream_close;
	stream_class->read_fn = gdata_download_stream_read;

	gobject_class->set_property = gdata_download_stream_set_property;
	gobject_class->get_property = gdata_download_stream_get_property;
	gobject_class->finalize = gdata_download_stream_finalize;
	gobject_class->dispose = gdata_download_stream_dispose;
	gobject_class->constructor = gdata_download_stream_constructor;

	/**
	 * GDataDownloadStream:service:
	 *
	 * The service which is used to authorize the download, and to which the download relates.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (gobject_class,
	                                 PROP_SERVICE,
	                                 g_param_spec_object ("service",
	                                                      "Service",
	                                                      "The service which is used to authorize the download.",
	                                                      GDATA_TYPE_SERVICE,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataDownloadStream:authorization-domain:
	 *
	 * The authorization domain for the download, against which the #GDataService:authorizer for the #GDataDownloadStream:service should be
	 * authorized. This may be %NULL if authorization is not needed for the download.
	 *
	 * Since: 0.9.0
	 */
	g_object_class_install_property (gobject_class,
	                                 PROP_AUTHORIZATION_DOMAIN,
	                                 g_param_spec_object ("authorization-domain",
	                                                      "Authorization domain",
	                                                      "The authorization domain for the download.",
	                                                      GDATA_TYPE_AUTHORIZATION_DOMAIN,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataDownloadStream:download-uri:
	 *
	 * The URI of the file to download. This must be HTTPS.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (gobject_class,
	                                 PROP_DOWNLOAD_URI,
	                                 g_param_spec_string ("download-uri",
	                                                      "Download URI",
	                                                      "The URI of the file to download.",
	                                                      NULL,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataDownloadStream:content-type:
	 *
	 * The content type of the file being downloaded. This will initially be %NULL, and will be populated as soon as the appropriate header is
	 * received from the server. Its value will never change after this.
	 *
	 * Note that change notifications for this property (#GObject::notify emissions) may be emitted in threads other than the one which created
	 * the #GDataDownloadStream. It is the client's responsibility to ensure that any notification signal handlers are either multi-thread safe
	 * or marshal the notification to the thread which owns the #GDataDownloadStream as appropriate.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (gobject_class,
	                                 PROP_CONTENT_TYPE,
	                                 g_param_spec_string ("content-type",
	                                                      "Content type",
	                                                      "The content type of the file being downloaded.",
	                                                      NULL,
	                                                      G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataDownloadStream:content-length:
	 *
	 * The length (in bytes) of the file being downloaded. This will initially be <code class="literal">-1</code>, and will be populated as soon as
	 * the appropriate header is received from the server. Its value will never change after this.
	 *
	 * Note that change notifications for this property (#GObject::notify emissions) may be emitted in threads other than the one which created
	 * the #GDataDownloadStream. It is the client's responsibility to ensure that any notification signal handlers are either multi-thread safe
	 * or marshal the notification to the thread which owns the #GDataDownloadStream as appropriate.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (gobject_class,
	                                 PROP_CONTENT_LENGTH,
	                                 g_param_spec_long ("content-length",
	                                                    "Content length",
	                                                    "The length (in bytes) of the file being downloaded.",
	                                                    -1, G_MAXSSIZE, -1,
	                                                    G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataDownloadStream:cancellable:
	 *
	 * An optional cancellable used to cancel the entire download operation. If a #GCancellable instance isn't provided for this property at
	 * construction time (i.e. to gdata_download_stream_new()), one will be created internally and can be retrieved using
	 * gdata_download_stream_get_cancellable() and used to cancel the download operation with g_cancellable_cancel() just as if it was passed to
	 * gdata_download_stream_new().
	 *
	 * If the download operation is cancelled using this #GCancellable, any ongoing network activity will be stopped, and any pending or future
	 * calls to #GInputStream API on the #GDataDownloadStream will return %G_IO_ERROR_CANCELLED. Note that the #GCancellable objects which can be
	 * passed to individual #GInputStream operations will not cancel the download operation proper if cancelled — they will merely cancel that API
	 * call. The only way to cancel the download operation completely is using #GDataDownloadStream:cancellable.
	 *
	 * Since: 0.8.0
	 */
	g_object_class_install_property (gobject_class,
	                                 PROP_CANCELLABLE,
	                                 g_param_spec_object ("cancellable",
	                                                      "Cancellable",
	                                                      "An optional cancellable used to cancel the entire download operation.",
	                                                      G_TYPE_CANCELLABLE,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
/* Fill in the #GSeekable virtual method table with this class's implementations. */
static void
gdata_download_stream_seekable_iface_init (GSeekableIface *seekable_iface)
{
	/* Seeking */
	seekable_iface->can_seek = gdata_download_stream_can_seek;
	seekable_iface->seek = gdata_download_stream_seek;
	seekable_iface->tell = gdata_download_stream_tell;

	/* Truncation */
	seekable_iface->can_truncate = gdata_download_stream_can_truncate;
	seekable_iface->truncate_fn = gdata_download_stream_truncate;
}
/* Instance initialiser: binds the private data and puts every field which isn't set from a construction property into its initial state. */
static void
gdata_download_stream_init (GDataDownloadStream *self)
{
	GDataDownloadStreamPrivate *priv;

	priv = G_TYPE_INSTANCE_GET_PRIVATE (self, GDATA_TYPE_DOWNLOAD_STREAM, GDataDownloadStreamPrivate);
	self->priv = priv;

	/* The buffer is created when the network thread is started and destroyed when the stream is closed */
	priv->buffer = NULL;

	/* Completion tracking for the network thread */
	priv->finished = FALSE;
	g_cond_init (&(priv->finished_cond));
	g_mutex_init (&(priv->finished_mutex));

	/* Header values are unknown until a response arrives */
	priv->content_type = NULL;
	priv->content_length = -1;
	g_mutex_init (&(priv->content_mutex));
}
/* Cancellation handler connected to ->cancellable in the constructor: forwards the cancellation to @network_cancellable.
 * @cancellable itself is not used. */
static void
cancellable_cancel_cb (GCancellable *cancellable, GCancellable *network_cancellable)
{
g_cancellable_cancel (network_cancellable);
}
/*
 * GObject constructor. Chains up to build the instance (which applies the construct-only properties), then:
 *  - creates ->network_cancellable and chains ->cancellable to it (creating ->cancellable first if the caller didn't supply one);
 *  - builds the GET #SoupMessage for ->download_uri, which must be an HTTPS URI (this is asserted);
 *  - lets the service class append its query headers to the message;
 *  - disables accumulation of the response body.
 *
 * No network activity is started here; that is deferred until the first read.
 *
 * Returns: the newly constructed object
 */
static GObject *
gdata_download_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params)
{
	GDataDownloadStreamPrivate *priv;
	GDataServiceClass *klass;
	GObject *object;
	SoupURI *_uri;

	/* Chain up to the parent class */
	object = G_OBJECT_CLASS (gdata_download_stream_parent_class)->constructor (type, n_construct_params, construct_params);
	priv = GDATA_DOWNLOAD_STREAM (object)->priv;

	/* Create a #GCancellable for the network. Cancellation of ->cancellable is chained to this one, so that if ->cancellable is cancelled,
	 * ->network_cancellable is also cancelled. However, if ->network_cancellable is cancelled, the cancellation doesn't propagate back upwards
	 * to ->cancellable. This allows closing the stream part-way through a download to be implemented by cancelling ->network_cancellable, without
	 * causing ->cancellable to be unnecessarily cancelled (which would be a nasty side-effect of closing the stream early otherwise). */
	priv->network_cancellable = g_cancellable_new ();

	/* Create a #GCancellable for the entire download operation if one wasn't specified for #GDataDownloadStream:cancellable during construction */
	if (priv->cancellable == NULL)
		priv->cancellable = g_cancellable_new ();
	priv->network_cancellable_id = g_cancellable_connect (priv->cancellable, (GCallback) cancellable_cancel_cb, priv->network_cancellable, NULL);

	/* Build the message. The URI must be HTTPS. */
	_uri = soup_uri_new (priv->download_uri);
	soup_uri_set_port (_uri, _gdata_service_get_https_port ());
	g_assert_cmpstr (soup_uri_get_scheme (_uri), ==, SOUP_URI_SCHEME_HTTPS);
	priv->message = soup_message_new_from_uri (SOUP_METHOD_GET, _uri);
	soup_uri_free (_uri);

	/* Make sure the headers are set */
	klass = GDATA_SERVICE_GET_CLASS (priv->service);
	if (klass->append_query_headers != NULL) {
		klass->append_query_headers (priv->service, priv->authorization_domain, priv->message);
	}

	/* We don't want to accumulate chunks: got_chunk_cb() already copies every chunk into ->buffer, so letting the message keep its own
	 * copy as well would make memory usage grow with the size of the download. This has to be set on the *response* body, as that is where
	 * downloaded data ends up; a GET request has no request body, so disabling accumulation there would have no effect. */
	soup_message_body_set_accumulate (priv->message->response_body, FALSE);

	/* Downloading doesn't actually start until the first call to read() */
	return object;
}
/* GObject dispose: closes the stream (blocking), detaches the cancellation chain and drops every object reference held by the private data.
 * All pointers are reset to %NULL afterwards. */
static void
gdata_download_stream_dispose (GObject *object)
{
	GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv;

	/* Closing may block until the network activity has stopped */
	g_input_stream_close (G_INPUT_STREAM (object), NULL, NULL);

	/* Break the ->cancellable → ->network_cancellable chain before releasing either end of it */
	if (priv->cancellable != NULL && priv->network_cancellable_id != 0) {
		g_cancellable_disconnect (priv->cancellable, priv->network_cancellable_id);
	}
	priv->network_cancellable_id = 0;

	if (priv->cancellable != NULL) {
		g_object_unref (priv->cancellable);
		priv->cancellable = NULL;
	}

	if (priv->network_cancellable != NULL) {
		g_object_unref (priv->network_cancellable);
		priv->network_cancellable = NULL;
	}

	if (priv->authorization_domain != NULL) {
		g_object_unref (priv->authorization_domain);
		priv->authorization_domain = NULL;
	}

	if (priv->service != NULL) {
		g_object_unref (priv->service);
		priv->service = NULL;
	}

	if (priv->message != NULL) {
		g_object_unref (priv->message);
		priv->message = NULL;
	}

	/* Chain up to the parent class */
	G_OBJECT_CLASS (gdata_download_stream_parent_class)->dispose (object);
}
/* GObject finalize: resets the network thread state, then releases the owned strings and the synchronisation primitives. */
static void
gdata_download_stream_finalize (GObject *object)
{
	GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv;

	reset_network_thread (GDATA_DOWNLOAD_STREAM (object));

	/* Owned strings */
	g_free (priv->content_type);
	g_free (priv->download_uri);

	/* Synchronisation primitives set up in gdata_download_stream_init() */
	g_mutex_clear (&(priv->content_mutex));
	g_mutex_clear (&(priv->finished_mutex));
	g_cond_clear (&(priv->finished_cond));

	/* Chain up to the parent class */
	G_OBJECT_CLASS (gdata_download_stream_parent_class)->finalize (object);
}
/* GObject property getter. The content-type and content-length values are copied out while holding ->content_mutex. */
static void
gdata_download_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec)
{
	GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv;

	if (property_id == PROP_SERVICE) {
		g_value_set_object (value, priv->service);
	} else if (property_id == PROP_AUTHORIZATION_DOMAIN) {
		g_value_set_object (value, priv->authorization_domain);
	} else if (property_id == PROP_DOWNLOAD_URI) {
		g_value_set_string (value, priv->download_uri);
	} else if (property_id == PROP_CONTENT_TYPE) {
		g_mutex_lock (&(priv->content_mutex));
		g_value_set_string (value, priv->content_type);
		g_mutex_unlock (&(priv->content_mutex));
	} else if (property_id == PROP_CONTENT_LENGTH) {
		g_mutex_lock (&(priv->content_mutex));
		g_value_set_long (value, priv->content_length);
		g_mutex_unlock (&(priv->content_mutex));
	} else if (property_id == PROP_CANCELLABLE) {
		g_value_set_object (value, priv->cancellable);
	} else {
		/* Not one of our properties */
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
	}
}
/* GObject property setter. Every writable property is construct-only, so each value is stored exactly once. Setting the service also
 * caches the session returned by _gdata_service_get_session(). */
static void
gdata_download_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec)
{
	GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv;

	if (property_id == PROP_SERVICE) {
		priv->service = g_value_dup_object (value);
		priv->session = _gdata_service_get_session (priv->service);
	} else if (property_id == PROP_AUTHORIZATION_DOMAIN) {
		priv->authorization_domain = g_value_dup_object (value);
	} else if (property_id == PROP_DOWNLOAD_URI) {
		priv->download_uri = g_value_dup_string (value);
	} else if (property_id == PROP_CANCELLABLE) {
		/* Construction only */
		priv->cancellable = g_value_dup_object (value);
	} else {
		/* Not one of our writable properties */
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
	}
}
/* Cancellation handler used by gdata_download_stream_read(): forwards cancellation of either source cancellable to @child_cancellable.
 * @cancellable itself is not used. */
static void
read_cancelled_cb (GCancellable *cancellable, GCancellable *child_cancellable)
{
g_cancellable_cancel (child_cancellable);
}
/* GInputStream read implementation. Starts the network thread if it isn't already running, then pops up to @count bytes from ->buffer
 * into @buffer, advancing ->offset by the number of bytes read.
 *
 * Cancelling either @cancellable or ->cancellable cancels the read. If some bytes were still popped, they are returned without error.
 *
 * Returns the number of bytes read (between 1 and @count; or between 0 and @count once EOF has been reached), or -1 with @error set if the
 * read was cancelled without reading anything, the network thread couldn't be created, or the server's response was unsuccessful. */
static gssize
gdata_download_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error)
{
GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (stream)->priv;
gssize length_read = -1;
gboolean reached_eof = FALSE;
gulong cancelled_signal = 0, global_cancelled_signal = 0;
GCancellable *child_cancellable;
GError *child_error = NULL;
/* Listen for cancellation from either @cancellable or @priv->cancellable. We have to multiplex cancellation signals from the two sources
* into a single #GCancellable, @child_cancellable. */
child_cancellable = g_cancellable_new ();
global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) read_cancelled_cb, child_cancellable, NULL);
if (cancellable != NULL)
cancelled_signal = g_cancellable_connect (cancellable, (GCallback) read_cancelled_cb, child_cancellable, NULL);
/* We're lazy about starting the network operation so we don't end up with a massive buffer */
if (priv->network_thread == NULL) {
/* Handle early cancellation so that we don't create the network thread unnecessarily */
if (g_cancellable_set_error_if_cancelled (child_cancellable, &child_error) == TRUE) {
length_read = -1;
goto done;
}
/* Create the network thread */
create_network_thread (GDATA_DOWNLOAD_STREAM (stream), &child_error);
if (priv->network_thread == NULL) {
/* Thread creation failed; @child_error is expected to have been set by create_network_thread() (asserted below) */
length_read = -1;
goto done;
}
}
/* Read the data off the buffer. If the operation is cancelled, it'll probably still return a positive number of bytes read — if it does, we
* can return without error. Iff it returns a non-positive number of bytes should we return an error. */
g_assert (priv->buffer != NULL);
length_read = (gssize) gdata_buffer_pop_data (priv->buffer, buffer, count, &reached_eof, child_cancellable);
if (length_read < 1 && g_cancellable_set_error_if_cancelled (child_cancellable, &child_error) == TRUE) {
/* Handle cancellation */
length_read = -1;
goto done;
} else if (SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code) == FALSE) {
GDataServiceClass *klass = GDATA_SERVICE_GET_CLASS (priv->service);
/* Set an appropriate error. No response body is passed to the parser, only the status code and reason phrase. */
g_assert (klass->parse_error_response != NULL);
klass->parse_error_response (priv->service, GDATA_OPERATION_DOWNLOAD, priv->message->status_code, priv->message->reason_phrase,
NULL, 0, &child_error);
length_read = -1;
goto done;
}
done:
/* Disconnect from the cancelled signals. */
if (cancelled_signal != 0)
g_cancellable_disconnect (cancellable, cancelled_signal);
if (global_cancelled_signal != 0)
g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);
g_object_unref (child_cancellable);
/* Exactly one of three outcomes is allowed: data read before EOF, a (possibly empty) read at EOF, or failure with an error set */
g_assert ((reached_eof == FALSE && length_read > 0 && length_read <= (gssize) count && child_error == NULL) ||
(reached_eof == TRUE && length_read >= 0 && length_read <= (gssize) count && child_error == NULL) ||
(length_read == -1 && child_error != NULL));
if (child_error != NULL)
g_propagate_error (error, child_error);
/* Update our internal offset */
if (length_read > 0) {
priv->offset += length_read;
}
return length_read;
}
/* Closure data handed to close_cancelled_cb() by gdata_download_stream_close(). */
typedef struct {
GDataDownloadStream *download_stream; /* the stream being closed; its ->finished_mutex and ->finished_cond are used by the callback */
gboolean *cancelled; /* flag set to TRUE by the callback; points to a local variable in gdata_download_stream_close() */
} CancelledData;
/* Cancellation handler used by gdata_download_stream_close(): records that the close operation was cancelled and wakes up the thread
 * waiting on ->finished_cond. The flag is written with ->finished_mutex held, as the waiting thread reads it under the same mutex. */
static void
close_cancelled_cb (GCancellable *cancellable, CancelledData *data)
{
GDataDownloadStreamPrivate *priv = data->download_stream->priv;
g_mutex_lock (&(priv->finished_mutex));
*(data->cancelled) = TRUE;
g_cond_signal (&(priv->finished_cond));
g_mutex_unlock (&(priv->finished_mutex));
}
/* Even though calling g_input_stream_close() multiple times on this stream is guaranteed to call gdata_download_stream_close() at most once, other
 * GIO methods (notably g_output_stream_splice()) can call gdata_download_stream_close() directly. Consequently, we need to be careful to be idempotent
 * after the first call.
 *
 * If the network thread hasn't yet been started (i.e. gdata_download_stream_read() hasn't been called at all yet), %TRUE will be returned immediately.
 *
 * If the global cancellable, ->cancellable, or @cancellable are cancelled before the call to gdata_download_stream_close(),
 * gdata_download_stream_close() should return immediately with %G_IO_ERROR_CANCELLED. If they're cancelled during the call,
 * gdata_download_stream_close() should stop waiting for any outstanding network activity to finish and return %G_IO_ERROR_CANCELLED (though the
 * operation to finish off network activity and close the stream will still continue).
 *
 * If the call to gdata_download_stream_close() is not cancelled by any #GCancellable, it will cancel the ongoing network activity, and wait until
 * the operation has been cleaned up before returning success.
 *
 * Returns %TRUE on success, or %FALSE with @error set to %G_IO_ERROR_CANCELLED if the close operation itself was cancelled before the network
 * activity had finished. */
static gboolean
gdata_download_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error)
{
	GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (stream)->priv;
	gulong cancelled_signal = 0, global_cancelled_signal = 0;
	gboolean cancelled = FALSE; /* must only be touched with ->finished_mutex held */
	gboolean success = TRUE;
	CancelledData data;
	GError *child_error = NULL;

	/* If the operation was never started, return successfully immediately */
	if (priv->network_thread == NULL) {
		goto done;
	}

	/* Allow cancellation. If either cancellable has already been cancelled, g_cancellable_connect() runs the callback immediately, so
	 * @cancelled will already be TRUE by the time we check it below. */
	data.download_stream = GDATA_DOWNLOAD_STREAM (stream);
	data.cancelled = &cancelled;

	global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) close_cancelled_cb, &data, NULL);
	if (cancellable != NULL)
		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) close_cancelled_cb, &data, NULL);

	g_mutex_lock (&(priv->finished_mutex));

	/* If the operation has started but hasn't already finished, cancel the network thread and wait for it to finish before returning */
	if (priv->finished == FALSE) {
		g_cancellable_cancel (priv->network_cancellable);

		/* Allow the close() call to be cancelled by cancelling either @cancellable or ->cancellable. Note that this won't prevent the stream
		 * from continuing to be closed in the background — it'll just stop waiting on the operation to finish being cancelled.
		 *
		 * g_cond_wait() may wake up spuriously, so the wait has to be repeated until one of the two conditions we're actually waiting
		 * for holds. Otherwise we could return success (and skip the state reset below) while the network thread is still running. */
		while (priv->finished == FALSE && cancelled == FALSE) {
			g_cond_wait (&(priv->finished_cond), &(priv->finished_mutex));
		}
	}

	/* Error handling */
	if (priv->finished == FALSE && cancelled == TRUE) {
		gboolean error_set;

		/* Cancelled? If ->finished is TRUE, the network activity finished before the gdata_download_stream_close() operation was cancelled,
		 * so we don't need to return an error.
		 *
		 * The calls which set the error are made outside g_assert() so that they still happen when assertions are compiled out. */
		error_set = (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == TRUE ||
		             g_cancellable_set_error_if_cancelled (priv->cancellable, &child_error) == TRUE);
		g_assert (error_set == TRUE);
		(void) error_set; /* silence unused-variable warnings when assertions are disabled */

		success = FALSE;
	}

	g_mutex_unlock (&(priv->finished_mutex));

	/* Disconnect from the signal handlers. Note that we have to do this without @finished_mutex held, as g_cancellable_disconnect() blocks
	 * until any outstanding cancellation callbacks return, and they will block on @finished_mutex. */
	if (cancelled_signal != 0)
		g_cancellable_disconnect (cancellable, cancelled_signal);
	if (global_cancelled_signal != 0)
		g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);

done:
	/* If we were successful, tidy up various bits of state */
	g_mutex_lock (&(priv->finished_mutex));

	if (success == TRUE && priv->finished == TRUE) {
		reset_network_thread (GDATA_DOWNLOAD_STREAM (stream));
	}

	g_mutex_unlock (&(priv->finished_mutex));

	g_assert ((success == TRUE && child_error == NULL) || (success == FALSE && child_error != NULL));

	if (child_error != NULL)
		g_propagate_error (error, child_error);

	return success;
}
/* GSeekable tell implementation: reports the stream's current position, as tracked in ->offset. */
static goffset
gdata_download_stream_tell (GSeekable *seekable)
{
	goffset current_offset;

	current_offset = GDATA_DOWNLOAD_STREAM (seekable)->priv->offset;

	return current_offset;
}
/* GSeekable can_seek implementation: download streams always report themselves as seekable. */
static gboolean
gdata_download_stream_can_seek (GSeekable *seekable)
{
return TRUE;
}
/* GSeekable seek implementation.
 *
 * @offset is interpreted according to @type and converted to an absolute position from the start of the stream. %G_SEEK_END is only
 * supported once the content length is known; before that, %G_IO_ERROR_NOT_SUPPORTED is returned. An absolute position before the start of
 * the stream is rejected with %G_IO_ERROR_INVALID_ARGUMENT, as is a forward seek past the data which can be popped off the buffer.
 *
 * The stream is marked as pending for the duration of the seek, so the call fails if another operation is already pending on it.
 * @cancellable only affects forward seeks on a started download (which may block while the intervening bytes are popped off the buffer);
 * a backwards seek on a started download closes the network thread and cannot be cancelled.
 *
 * Returns %TRUE if the stream position was updated, or %FALSE with @error set otherwise. */
static gboolean
gdata_download_stream_seek (GSeekable *seekable, goffset offset, GSeekType type, GCancellable *cancellable, GError **error)
{
	GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (seekable)->priv;
	GError *child_error = NULL;

	if (type == G_SEEK_END && priv->content_length == -1) {
		/* If we don't have the Content-Length, we can't calculate the offset from the start of the stream properly, so just give up.
		 * We could technically use a HEAD request to get the Content-Length, but this hasn't been tried yet (FIXME). */
		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, "G_SEEK_END not currently supported");
		return FALSE;
	}

	if (g_input_stream_set_pending (G_INPUT_STREAM (seekable), error) == FALSE) {
		return FALSE;
	}

	/* Ensure that offset is relative to the start of the stream. */
	switch (type) {
		case G_SEEK_CUR:
			offset += priv->offset;
			break;
		case G_SEEK_SET:
			/* Nothing needs doing */
			break;
		case G_SEEK_END:
			offset += priv->content_length;
			break;
		default:
			g_assert_not_reached ();
	}

	/* A position before the start of the stream is never valid. Reject it here rather than storing a negative value in ->offset, which
	 * every later read and seek would then be based on. */
	if (offset < 0) {
		g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, _("Invalid seek request"));
		goto done;
	}

	/* There are three cases to consider:
	 *  1. The network thread hasn't been started. In this case, we need to set the offset and do nothing. When the network thread is started
	 *     (in the next read() call), a Range header will be set on it which will give the correct seek.
	 *  2. The network thread has been started and the seek is to a position greater than our current position (i.e. one which already does, or
	 *     will soon, exist in the buffer). In this case, we need to pop the intervening bytes off the buffer (which may block) and update the
	 *     offset.
	 *  3. The network thread has been started and the seek is to a position which has already been popped off the buffer. In this case, we need
	 *     to set the offset and cancel the network thread. When the network thread is restarted (in the next read() call), a Range header will
	 *     be set on it which will give the correct seek.
	 */
	if (priv->network_thread == NULL) {
		/* Case 1. Set the offset and we're done. */
		priv->offset = offset;
		goto done;
	}

	/* Cases 2 and 3. The network thread has already been started. */
	if (offset >= priv->offset) {
		goffset num_intervening_bytes;
		gssize length_read;

		/* Case 2. Pop off the intervening bytes and update the offset. If we can't pop enough bytes off, we throw an error. */
		num_intervening_bytes = offset - priv->offset;
		g_assert (priv->buffer != NULL);
		length_read = (gssize) gdata_buffer_pop_data (priv->buffer, NULL, num_intervening_bytes, NULL, cancellable);

		if (length_read != num_intervening_bytes) {
			if (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == FALSE) {
				/* Tried to seek too far */
				g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, _("Invalid seek request"));
			}

			goto done;
		}

		/* Update the offset */
		priv->offset = offset;

		goto done;
	} else {
		/* Case 3. Cancel the current network thread. Note that we don't allow cancellation of this call, as we depend on it waiting for
		 * the network thread to join. */
		if (gdata_download_stream_close (G_INPUT_STREAM (seekable), NULL, &child_error) == FALSE) {
			goto done;
		}

		/* Update the offset */
		priv->offset = offset;

		/* Mark the thread as unfinished */
		g_mutex_lock (&(priv->finished_mutex));
		priv->finished = FALSE;
		g_mutex_unlock (&(priv->finished_mutex));

		goto done;
	}

done:
	g_input_stream_clear_pending (G_INPUT_STREAM (seekable));

	if (child_error != NULL) {
		g_propagate_error (error, child_error);
		return FALSE;
	}

	return TRUE;
}
/* Reports whether the stream can be truncated. A download stream never can, so this is FALSE regardless of @seekable. */
static gboolean
gdata_download_stream_can_truncate (GSeekable *seekable)
{
	return FALSE;
}
/* Truncation handler: unconditionally fails, setting %G_IO_ERROR_NOT_SUPPORTED in @error. @seekable, @offset and @cancellable are
 * not consulted. */
static gboolean
gdata_download_stream_truncate (GSeekable *seekable, goffset offset, GCancellable *cancellable, GError **error)
{
	g_set_error_literal (error,
	                     G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
	                     "Truncate not allowed on input stream");

	return FALSE;
}
/*
 * got_headers_cb:
 * @message: the #SoupMessage whose response headers have just been received
 * @self: the #GDataDownloadStream the message belongs to
 *
 * Handler for the message's "got-headers" signal (connected in download_thread()). For successful responses it records the content type and
 * content length on the stream, under content_mutex, and then emits notifications for the "content-length" and "content-type" properties.
 * Unsuccessful responses are ignored entirely.
 */
static void
got_headers_cb (SoupMessage *message, GDataDownloadStream *self)
{
	goffset end;
	goffset start;
	goffset total_length;

	/* Don't get the client's hopes up by setting the Content-Type or -Length if the response
	 * is actually unsuccessful. */
	if (SOUP_STATUS_IS_SUCCESSFUL (message->status_code) == FALSE)
		return;

	g_mutex_lock (&(self->priv->content_mutex));

	/* This handler is reconnected every time the network thread is (re)started, so it can run more than once for the same stream
	 * (e.g. after a backwards seek). Only store the content type the first time: overwriting it would leak the previous copy, and
	 * freeing the previous copy would invalidate pointers already handed out by gdata_download_stream_get_content_type(), which
	 * relies on the string being immutable once set. */
	if (self->priv->content_type == NULL) {
		self->priv->content_type = g_strdup (soup_message_headers_get_content_type (message->response_headers, NULL));
	}

	self->priv->content_length = soup_message_headers_get_content_length (message->response_headers);

	/* For a ranged response, Content-Length only covers the returned range; prefer the total length from Content-Range. */
	if (soup_message_headers_get_content_range (message->response_headers, &start, &end, &total_length)) {
		self->priv->content_length = (gssize) total_length;
	}

	g_mutex_unlock (&(self->priv->content_mutex));

	/* Emit the notifications for the Content-Length and -Type properties, batched between freeze and thaw */
	g_object_freeze_notify (G_OBJECT (self));
	g_object_notify (G_OBJECT (self), "content-length");
	g_object_notify (G_OBJECT (self), "content-type");
	g_object_thaw_notify (G_OBJECT (self));
}
/*
 * got_chunk_cb:
 * @message: the #SoupMessage a body chunk has arrived for
 * @buffer: the chunk of body data
 * @self: the #GDataDownloadStream the message belongs to
 *
 * Handler for the message's "got-chunk" signal (connected in download_thread()). Appends the chunk to the stream's buffer, unless the response
 * is unsuccessful or the chunk is empty.
 */
static void
got_chunk_cb (SoupMessage *message, SoupBuffer *buffer, GDataDownloadStream *self)
{
	/* Body data of an unsuccessful response is not file content, so drop it */
	if (!SOUP_STATUS_IS_SUCCESSFUL (message->status_code)) {
		return;
	}

	/* Nothing to store for an empty chunk. NOTE(review): presumably this also avoids a zero-length push being mistaken for the EOF
	 * marker which download_thread() pushes — confirm against GDataBuffer. */
	if (buffer->length == 0) {
		return;
	}

	/* Hand the data over to the buffer straight away */
	g_assert (self->priv->buffer != NULL);
	gdata_buffer_push_data (self->priv->buffer, (const guint8*) buffer->data, buffer->length);
}
/*
 * download_thread:
 * @self: the #GDataDownloadStream to download for
 *
 * Entry point of the network thread spawned by create_network_thread(). It sends the stream's message, with got_headers_cb() and
 * got_chunk_cb() connected so that headers and body data are forwarded to the stream, then pushes an EOF marker onto the buffer and flags
 * the download as finished.
 *
 * Returns: always %NULL
 */
static gpointer
download_thread (GDataDownloadStream *self)
{
GDataDownloadStreamPrivate *priv = self->priv;
/* Hold a reference for as long as this function runs; it is dropped just before returning */
g_object_ref (self);
g_assert (priv->network_cancellable != NULL);
/* Connect to the got-headers signal so we can notify clients of the values of content-type and content-length */
g_signal_connect (priv->message, "got-headers", (GCallback) got_headers_cb, self);
/* Connect to the got-chunk signal so that body data is pushed onto the buffer as it arrives */
g_signal_connect (priv->message, "got-chunk", (GCallback) got_chunk_cb, self);
/* Set a Range header if our starting offset is non-zero; otherwise make sure no Range header from an earlier run is left on the message */
if (priv->offset > 0) {
soup_message_headers_set_range (priv->message->request_headers, priv->offset, -1);
} else {
soup_message_headers_remove (priv->message->request_headers, "Range");
}
/* Send the message. NULL is passed for the error argument, so no GError is collected here. */
_gdata_service_actually_send_message (priv->session, priv->message, priv->network_cancellable, NULL);
/* Mark the buffer as having reached EOF */
g_assert (priv->buffer != NULL);
gdata_buffer_push_data (priv->buffer, NULL, 0);
/* Mark the download as finished, and signal finished_cond while still holding finished_mutex */
g_mutex_lock (&(priv->finished_mutex));
priv->finished = TRUE;
g_cond_signal (&(priv->finished_cond));
g_mutex_unlock (&(priv->finished_mutex));
g_object_unref (self);
return NULL;
}
/*
 * create_network_thread:
 * @self: a #GDataDownloadStream which has no network thread and no buffer yet
 * @error: return location for a #GError from thread creation, or %NULL
 *
 * Allocates the stream's buffer and spawns the network thread, which runs download_thread(). On success, priv->network_thread and
 * priv->buffer are both set. On failure, @error is set by g_thread_try_new() and both fields are left as %NULL, so callers detect
 * failure by checking priv->network_thread and may safely call this function again later.
 */
static void
create_network_thread (GDataDownloadStream *self, GError **error)
{
	GDataDownloadStreamPrivate *priv = self->priv;

	g_assert (priv->buffer == NULL);
	priv->buffer = gdata_buffer_new ();

	g_assert (priv->network_thread == NULL);
	priv->network_thread = g_thread_try_new ("download-thread", (GThreadFunc) download_thread, self, error);

	/* If the thread could not be created, nothing will ever use the buffer. Release it so that the stream is back in its
	 * "not started" state; otherwise a retry would trip the buffer assertion above. */
	if (priv->network_thread == NULL) {
		gdata_buffer_free (priv->buffer);
		priv->buffer = NULL;
	}
}
/*
 * reset_network_thread:
 * @self: a #GDataDownloadStream
 *
 * Returns the stream's network state to what it was before the network thread was created: the thread pointer and the buffer are
 * cleared, the message is cancelled and detached from our callbacks, the offset is reset to zero and the network cancellable is reset.
 *
 * Note that the thread is not joined here; the pointer is simply dropped.
 * NOTE(review): presumably callers have already waited for the thread to finish before calling this — confirm against the callers.
 */
static void
reset_network_thread (GDataDownloadStream *self)
{
GDataDownloadStreamPrivate *priv = self->priv;
priv->network_thread = NULL;
/* Throw away any buffered data which has not been read */
if (priv->buffer != NULL) {
gdata_buffer_free (priv->buffer);
priv->buffer = NULL;
}
/* Cancel the message and disconnect the two handlers connected in download_thread(), which connects them again on its next run */
if (priv->message != NULL) {
soup_session_cancel_message (priv->session, priv->message, SOUP_STATUS_CANCELLED);
g_signal_handlers_disconnect_by_func (priv->message, got_headers_cb, self);
g_signal_handlers_disconnect_by_func (priv->message, got_chunk_cb, self);
}
/* Start from the beginning again unless the caller sets a different offset afterwards */
priv->offset = 0;
/* Reset the cancellable so that it can be used for a subsequent network thread */
if (priv->network_cancellable != NULL) {
g_cancellable_reset (priv->network_cancellable);
}
}
/**
* gdata_download_stream_new:
* @service: a #GDataService
* @domain: (allow-none): the #GDataAuthorizationDomain to authorize the download, or %NULL
* @download_uri: the URI to download; this must be HTTPS
* @cancellable: (allow-none): a #GCancellable for the entire download stream, or %NULL
*
* Creates a new #GDataDownloadStream, allowing a file to be downloaded from a GData service using standard #GInputStream API.
*
* As well as the standard GIO errors, calls to the #GInputStream API on a #GDataDownloadStream can also return any relevant specific error from
* #GDataServiceError, or %GDATA_SERVICE_ERROR_PROTOCOL_ERROR in the general case.
*
* If a #GCancellable is provided in @cancellable, the download operation may be cancelled at any time from another thread using g_cancellable_cancel().
* In this case, any ongoing network activity will be stopped, and any pending or future calls to #GInputStream API on the #GDataDownloadStream will
* return %G_IO_ERROR_CANCELLED. Note that the #GCancellable objects which can be passed to individual #GInputStream operations will not cancel the
* download operation proper if cancelled — they will merely cancel that API call. The only way to cancel the download operation completely is using
* this @cancellable.
*
* Return value: a new #GInputStream, or %NULL; unref with g_object_unref()
*
* Since: 0.9.0
*/
GInputStream *
gdata_download_stream_new (GDataService *service, GDataAuthorizationDomain *domain, const gchar *download_uri, GCancellable *cancellable)
{
	gpointer stream;

	/* @service and @download_uri are mandatory; @domain and @cancellable may be NULL */
	g_return_val_if_fail (GDATA_IS_SERVICE (service), NULL);
	g_return_val_if_fail (domain == NULL || GDATA_IS_AUTHORIZATION_DOMAIN (domain), NULL);
	g_return_val_if_fail (download_uri != NULL, NULL);
	g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL);

	/* Everything is passed to the object as properties at construction time */
	stream = g_object_new (GDATA_TYPE_DOWNLOAD_STREAM,
	                       "download-uri", download_uri,
	                       "service", service,
	                       "authorization-domain", domain,
	                       "cancellable", cancellable,
	                       NULL);

	return G_INPUT_STREAM (stream);
}
/**
* gdata_download_stream_get_service:
* @self: a #GDataDownloadStream
*
* Gets the service used to authorize the download, as passed to gdata_download_stream_new().
*
* Return value: (transfer none): the #GDataService used to authorize the download
*
* Since: 0.5.0
*/
GDataService *
gdata_download_stream_get_service (GDataDownloadStream *self)
{
	GDataService *service;

	g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL);

	/* Borrowed pointer: no reference is added for the caller */
	service = self->priv->service;

	return service;
}
/**
* gdata_download_stream_get_authorization_domain:
* @self: a #GDataDownloadStream
*
* Gets the authorization domain used to authorize the download, as passed to gdata_download_stream_new(). It may be %NULL if authorization is not
* needed for the download.
*
* Return value: (transfer none) (allow-none): the #GDataAuthorizationDomain used to authorize the download, or %NULL
*
* Since: 0.9.0
*/
GDataAuthorizationDomain *
gdata_download_stream_get_authorization_domain (GDataDownloadStream *self)
{
	GDataAuthorizationDomain *domain;

	g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL);

	/* Borrowed pointer: no reference is added for the caller */
	domain = self->priv->authorization_domain;

	return domain;
}
/**
* gdata_download_stream_get_download_uri:
* @self: a #GDataDownloadStream
*
* Gets the URI of the file being downloaded, as passed to gdata_download_stream_new().
*
* Return value: the URI of the file being downloaded
*
* Since: 0.5.0
*/
const gchar *
gdata_download_stream_get_download_uri (GDataDownloadStream *self)
{
	const gchar *uri;

	g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL);

	/* The string stays owned by the stream; no copy is made */
	uri = self->priv->download_uri;

	return uri;
}
/**
* gdata_download_stream_get_content_type:
* @self: a #GDataDownloadStream
*
* Gets the content type of the file being downloaded. If the <literal>Content-Type</literal> header has not yet
* been received, %NULL will be returned.
*
* Return value: the content type of the file being downloaded, or %NULL
*
* Since: 0.5.0
*/
const gchar *
gdata_download_stream_get_content_type (GDataDownloadStream *self)
{
	const gchar *type;

	g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL);

	/* Read the pointer under content_mutex, the same mutex held while the field is written */
	g_mutex_lock (&(self->priv->content_mutex));
	type = self->priv->content_type;
	g_mutex_unlock (&(self->priv->content_mutex));

	/* The string is immutable once it has been set, so handing out the stream's own pointer (rather than a copy) is safe. */
	return type;
}
/**
* gdata_download_stream_get_content_length:
* @self: a #GDataDownloadStream
*
* Gets the length (in bytes) of the file being downloaded. If the <literal>Content-Length</literal> header has not yet
* been received from the server, <code class="literal">-1</code> will be returned.
*
* Return value: the content length of the file being downloaded, or <code class="literal">-1</code>
*
* Since: 0.5.0
*/
gssize
gdata_download_stream_get_content_length (GDataDownloadStream *self)
{
	gssize length;

	g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), -1);

	/* Take a snapshot of the value under content_mutex, the same mutex held while the field is written */
	g_mutex_lock (&(self->priv->content_mutex));
	length = self->priv->content_length;
	g_mutex_unlock (&(self->priv->content_mutex));

	/* Any value below -1 would indicate a programming error */
	g_assert (length >= -1);

	return length;
}
/**
* gdata_download_stream_get_cancellable:
* @self: a #GDataDownloadStream
*
* Gets the #GCancellable for the entire download operation, #GDataDownloadStream:cancellable.
*
* Return value: (transfer none): the #GCancellable for the entire download operation
*
* Since: 0.8.0
*/
GCancellable *
gdata_download_stream_get_cancellable (GDataDownloadStream *self)
{
	GCancellable *cancellable;

	g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL);

	/* The stream is expected to always have a cancellable; a NULL one is treated as a programming error */
	cancellable = self->priv->cancellable;
	g_assert (cancellable != NULL);

	return cancellable;
}