/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */ /* * GData Client * Copyright (C) Philip Withnall 2009 * * GData Client is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * GData Client is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with GData Client. If not, see . */ /** * SECTION:gdata-download-stream * @short_description: GData download stream object * @stability: Stable * @include: gdata/gdata-download-stream.h * * #GDataDownloadStream is a #GInputStream subclass to allow downloading of files from GData services with authorization from a #GDataService under * the given #GDataAuthorizationDomain. If authorization is not required to perform the download, a #GDataAuthorizationDomain doesn't have to be * specified. * * Once a #GDataDownloadStream is instantiated with gdata_download_stream_new(), the standard #GInputStream API can be used on the stream to download * the file. Network communication may not actually begin until the first call to g_input_stream_read(), so having a #GDataDownloadStream around is no * guarantee that the file is being downloaded. * * The content type and length of the file being downloaded are made available through #GDataDownloadStream:content-type and * #GDataDownloadStream:content-length as soon as the appropriate data is received from the server. Connect to the * #GObject::notify content-type and content-length details to be notified as * soon as the data is available. 
* * The entire download operation can be cancelled using the #GCancellable instance provided to gdata_download_stream_new(), or returned by * gdata_download_stream_get_cancellable(). Cancelling this at any time will cause all future #GInputStream method calls to return * %G_IO_ERROR_CANCELLED. If any #GInputStream methods are in the process of being called, they will be cancelled and return %G_IO_ERROR_CANCELLED as * soon as possible. * * Note that cancelling an individual method call (such as a call to g_input_stream_read()) using the #GCancellable parameter of the method will not * cancel the download as a whole — just that particular method call. In the case of g_input_stream_read(), this will cause it to successfully return * any data that it has in memory at the moment (up to the requested number of bytes), or return a %G_IO_ERROR_CANCELLED if it was blocking on receiving * data from the network. This is also the behaviour of g_input_stream_read() when the download operation as a whole is cancelled. * * In the case of g_input_stream_close(), the call will return immediately if network activity hasn't yet started. If it has, the network activity will * be cancelled, regardless of whether the call to g_input_stream_close() is cancelled. Cancelling a pending call to g_input_stream_close() (either * using the method's #GCancellable, or by cancelling the download stream as a whole) will cause it to stop waiting for the network activity to finish, * and return %G_IO_ERROR_CANCELLED immediately. Network activity will continue to be shut down in the background. * * If the server returns an error message (for example, if the user is not correctly authenticated/authorized or doesn't have suitable permissions to * download from the given URI), it will be returned as a #GDataServiceError by the first call to g_input_stream_read(). 
* * * Downloading to a File * * GDataService *service; * GDataAuthorizationDomain *domain; * GCancellable *cancellable; * GInputStream *download_stream; * GOutputStream *output_stream; * * /* Create the download stream */ * service = create_my_service (); * domain = get_my_authorization_domain_from_service (service); * cancellable = g_cancellable_new (); /* cancel this to cancel the entire download operation */ * download_stream = gdata_download_stream_new (service, domain, download_uri, cancellable); * output_stream = create_file_and_return_output_stream (); * * /* Perform the download asynchronously */ * g_output_stream_splice_async (output_stream, download_stream, G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET, * G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) download_splice_cb, NULL); * * g_object_unref (output_stream); * g_object_unref (download_stream); * g_object_unref (cancellable); * g_object_unref (domain); * g_object_unref (service); * * static void * download_splice_cb (GOutputStream *output_stream, GAsyncResult *result, gpointer user_data) * { * GError *error = NULL; * * g_output_stream_splice_finish (output_stream, result, &error); * * if (error != NULL && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) == FALSE) { * /* Error downloading the file; potentially an I/O error (GIOError), or an error response from the server * * (GDataServiceError). You might want to delete the newly created file because of the error. 
*/ * g_error ("Error downloading file: %s", error->message); * g_error_free (error); * } * } * * * * Since: 0.5.0 */ #include #include #include #include "gdata-download-stream.h" #include "gdata-buffer.h" #include "gdata-private.h" static void gdata_download_stream_seekable_iface_init (GSeekableIface *seekable_iface); static GObject *gdata_download_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params); static void gdata_download_stream_dispose (GObject *object); static void gdata_download_stream_finalize (GObject *object); static void gdata_download_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec); static void gdata_download_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec); static gssize gdata_download_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error); static gboolean gdata_download_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error); static goffset gdata_download_stream_tell (GSeekable *seekable); static gboolean gdata_download_stream_can_seek (GSeekable *seekable); static gboolean gdata_download_stream_seek (GSeekable *seekable, goffset offset, GSeekType type, GCancellable *cancellable, GError **error); static gboolean gdata_download_stream_can_truncate (GSeekable *seekable); static gboolean gdata_download_stream_truncate (GSeekable *seekable, goffset offset, GCancellable *cancellable, GError **error); static void create_network_thread (GDataDownloadStream *self, GError **error); static void reset_network_thread (GDataDownloadStream *self); /* * The GDataDownloadStream can be in one of several states: * 1. Pre-network activity. This is the state that the stream is created in. @network_thread and @cancellable are both %NULL, and @finished is %FALSE. 
* The stream will remain in this state until gdata_download_stream_read() or gdata_download_stream_seek() are called for the first time. * @content_type and @content_length are at their default values (NULL and -1, respectively). * 2. Network activity. This state is entered when gdata_download_stream_read() is called for the first time. * @network_thread, @buffer and @cancellable are created, while @finished remains %FALSE. * As soon as the headers are downloaded, which is guaranteed to be before the first call to gdata_download_stream_read() returns, @content_type * and @content_length are set from the headers. From this point onwards, they are immutable. * 3. Reset network activity. This state is entered only if case 3 is encountered in a call to gdata_download_stream_seek(): a seek to an offset which * has already been read out of the buffer. In this state, @buffer is freed and set to %NULL, @network_thread is cancelled (then set to %NULL), * and @offset is set to the seeked-to offset. @finished remains at %FALSE. * When the next call to gdata_download_stream_read() is made, the download stream will go back to state 2 as if this was the first call to * gdata_download_stream_read(). * 4. Post-network activity. This state is reached once the download thread finishes downloading, due to having downloaded everything. * @buffer is non-%NULL, @network_thread is non-%NULL, but meaningless; @cancellable is still a valid #GCancellable instance; and @finished is set * to %TRUE. At the same time, @finished_cond is signalled. * This state can be exited either by making a call to gdata_download_stream_seek(), in which case the stream will go back to state 3; or by * calling gdata_download_stream_close(), in which case the stream will return errors for all operations, as the underlying %GInputStream will be * marked as closed. 
*/ struct _GDataDownloadStreamPrivate { gchar *download_uri; GDataService *service; GDataAuthorizationDomain *authorization_domain; SoupSession *session; SoupMessage *message; GDataBuffer *buffer; goffset offset; /* current position in the stream */ GThread *network_thread; GCancellable *cancellable; GCancellable *network_cancellable; /* see the comment in gdata_download_stream_constructor() about the relationship between these two */ gulong network_cancellable_id; gboolean finished; GCond finished_cond; GMutex finished_mutex; /* mutex for ->finished, protected by ->finished_cond */ /* Cached data from the SoupMessage */ gchar *content_type; gssize content_length; GMutex content_mutex; /* mutex to protect them */ }; enum { PROP_SERVICE = 1, PROP_DOWNLOAD_URI, PROP_CONTENT_TYPE, PROP_CONTENT_LENGTH, PROP_CANCELLABLE, PROP_AUTHORIZATION_DOMAIN, }; G_DEFINE_TYPE_WITH_CODE (GDataDownloadStream, gdata_download_stream, G_TYPE_INPUT_STREAM, G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE, gdata_download_stream_seekable_iface_init)) static void gdata_download_stream_class_init (GDataDownloadStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass); g_type_class_add_private (klass, sizeof (GDataDownloadStreamPrivate)); gobject_class->constructor = gdata_download_stream_constructor; gobject_class->dispose = gdata_download_stream_dispose; gobject_class->finalize = gdata_download_stream_finalize; gobject_class->get_property = gdata_download_stream_get_property; gobject_class->set_property = gdata_download_stream_set_property; /* We use the default implementations of the async functions, which just run * our implementation of the sync function in a thread. */ stream_class->read_fn = gdata_download_stream_read; stream_class->close_fn = gdata_download_stream_close; /** * GDataDownloadStream:service: * * The service which is used to authorize the download, and to which the download relates. 
* * Since: 0.5.0 */ g_object_class_install_property (gobject_class, PROP_SERVICE, g_param_spec_object ("service", "Service", "The service which is used to authorize the download.", GDATA_TYPE_SERVICE, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GDataDownloadStream:authorization-domain: * * The authorization domain for the download, against which the #GDataService:authorizer for the #GDataDownloadStream:service should be * authorized. This may be %NULL if authorization is not needed for the download. * * Since: 0.9.0 */ g_object_class_install_property (gobject_class, PROP_AUTHORIZATION_DOMAIN, g_param_spec_object ("authorization-domain", "Authorization domain", "The authorization domain for the download.", GDATA_TYPE_AUTHORIZATION_DOMAIN, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GDataDownloadStream:download-uri: * * The URI of the file to download. This must be HTTPS. * * Since: 0.5.0 */ g_object_class_install_property (gobject_class, PROP_DOWNLOAD_URI, g_param_spec_string ("download-uri", "Download URI", "The URI of the file to download.", NULL, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GDataDownloadStream:content-type: * * The content type of the file being downloaded. This will initially be %NULL, and will be populated as soon as the appropriate header is * received from the server. Its value will never change after this. * * Note that change notifications for this property (#GObject::notify emissions) may be emitted in threads other than the one which created * the #GDataDownloadStream. It is the client's responsibility to ensure that any notification signal handlers are either multi-thread safe * or marshal the notification to the thread which owns the #GDataDownloadStream as appropriate. 
* * Since: 0.5.0 */ g_object_class_install_property (gobject_class, PROP_CONTENT_TYPE, g_param_spec_string ("content-type", "Content type", "The content type of the file being downloaded.", NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); /** * GDataDownloadStream:content-length: * * The length (in bytes) of the file being downloaded. This will initially be -1, and will be populated as soon as * the appropriate header is received from the server. Its value will never change after this. * * Note that change notifications for this property (#GObject::notify emissions) may be emitted in threads other than the one which created * the #GDataDownloadStream. It is the client's responsibility to ensure that any notification signal handlers are either multi-thread safe * or marshal the notification to the thread which owns the #GDataDownloadStream as appropriate. * * Since: 0.5.0 */ g_object_class_install_property (gobject_class, PROP_CONTENT_LENGTH, g_param_spec_long ("content-length", "Content length", "The length (in bytes) of the file being downloaded.", -1, G_MAXSSIZE, -1, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); /** * GDataDownloadStream:cancellable: * * An optional cancellable used to cancel the entire download operation. If a #GCancellable instance isn't provided for this property at * construction time (i.e. to gdata_download_stream_new()), one will be created internally and can be retrieved using * gdata_download_stream_get_cancellable() and used to cancel the download operation with g_cancellable_cancel() just as if it was passed to * gdata_download_stream_new(). * * If the download operation is cancelled using this #GCancellable, any ongoing network activity will be stopped, and any pending or future * calls to #GInputStream API on the #GDataDownloadStream will return %G_IO_ERROR_CANCELLED. 
Note that the #GCancellable objects which can be * passed to individual #GInputStream operations will not cancel the download operation proper if cancelled — they will merely cancel that API * call. The only way to cancel the download operation completely is using #GDataDownloadStream:cancellable. * * Since: 0.8.0 */ g_object_class_install_property (gobject_class, PROP_CANCELLABLE, g_param_spec_object ("cancellable", "Cancellable", "An optional cancellable used to cancel the entire download operation.", G_TYPE_CANCELLABLE, G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void gdata_download_stream_seekable_iface_init (GSeekableIface *seekable_iface) { seekable_iface->tell = gdata_download_stream_tell; seekable_iface->can_seek = gdata_download_stream_can_seek; seekable_iface->seek = gdata_download_stream_seek; seekable_iface->can_truncate = gdata_download_stream_can_truncate; seekable_iface->truncate_fn = gdata_download_stream_truncate; } static void gdata_download_stream_init (GDataDownloadStream *self) { self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, GDATA_TYPE_DOWNLOAD_STREAM, GDataDownloadStreamPrivate); self->priv->buffer = NULL; /* created when the network thread is started and destroyed when the stream is closed */ self->priv->finished = FALSE; g_cond_init (&(self->priv->finished_cond)); g_mutex_init (&(self->priv->finished_mutex)); self->priv->content_type = NULL; self->priv->content_length = -1; g_mutex_init (&(self->priv->content_mutex)); } static void cancellable_cancel_cb (GCancellable *cancellable, GCancellable *network_cancellable) { g_cancellable_cancel (network_cancellable); } static GObject * gdata_download_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params) { GDataDownloadStreamPrivate *priv; GDataServiceClass *klass; GObject *object; SoupURI *_uri; /* Chain up to the parent class */ object = G_OBJECT_CLASS (gdata_download_stream_parent_class)->constructor (type, 
n_construct_params, construct_params); priv = GDATA_DOWNLOAD_STREAM (object)->priv; /* Create a #GCancellable for the network. Cancellation of ->cancellable is chained to this one, so that if ->cancellable is cancelled, * ->network_cancellable is also cancelled. However, if ->network_cancellable is cancelled, the cancellation doesn't propagate back upwards * to ->cancellable. This allows closing the stream part-way through a download to be implemented by cancelling ->network_cancellable, without * causing ->cancellable to be unnecessarily cancelled (which would be a nasty side-effect of closing the stream early otherwise). */ priv->network_cancellable = g_cancellable_new (); /* Create a #GCancellable for the entire download operation if one wasn't specified for #GDataDownloadStream:cancellable during construction */ if (priv->cancellable == NULL) priv->cancellable = g_cancellable_new (); priv->network_cancellable_id = g_cancellable_connect (priv->cancellable, (GCallback) cancellable_cancel_cb, priv->network_cancellable, NULL); /* Build the message. The URI must be HTTPS. 
*/ _uri = soup_uri_new (priv->download_uri); soup_uri_set_port (_uri, _gdata_service_get_https_port ()); g_assert_cmpstr (soup_uri_get_scheme (_uri), ==, SOUP_URI_SCHEME_HTTPS); priv->message = soup_message_new_from_uri (SOUP_METHOD_GET, _uri); soup_uri_free (_uri); /* Make sure the headers are set */ klass = GDATA_SERVICE_GET_CLASS (priv->service); if (klass->append_query_headers != NULL) { klass->append_query_headers (priv->service, priv->authorization_domain, priv->message); } /* We don't want to accumulate chunks */ soup_message_body_set_accumulate (priv->message->request_body, FALSE); /* Downloading doesn't actually start until the first call to read() */ return object; } static void gdata_download_stream_dispose (GObject *object) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv; /* Block on closing the stream */ g_input_stream_close (G_INPUT_STREAM (object), NULL, NULL); if (priv->cancellable != NULL) { if (priv->network_cancellable_id != 0) { g_cancellable_disconnect (priv->cancellable, priv->network_cancellable_id); } g_object_unref (priv->cancellable); } priv->network_cancellable_id = 0; priv->cancellable = NULL; if (priv->network_cancellable != NULL) g_object_unref (priv->network_cancellable); priv->network_cancellable = NULL; if (priv->authorization_domain != NULL) g_object_unref (priv->authorization_domain); priv->authorization_domain = NULL; if (priv->service != NULL) g_object_unref (priv->service); priv->service = NULL; if (priv->message != NULL) g_object_unref (priv->message); priv->message = NULL; /* Chain up to the parent class */ G_OBJECT_CLASS (gdata_download_stream_parent_class)->dispose (object); } static void gdata_download_stream_finalize (GObject *object) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv; reset_network_thread (GDATA_DOWNLOAD_STREAM (object)); g_cond_clear (&(priv->finished_cond)); g_mutex_clear (&(priv->finished_mutex)); g_mutex_clear (&(priv->content_mutex)); g_free 
(priv->download_uri); g_free (priv->content_type); /* Chain up to the parent class */ G_OBJECT_CLASS (gdata_download_stream_parent_class)->finalize (object); } static void gdata_download_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv; switch (property_id) { case PROP_SERVICE: g_value_set_object (value, priv->service); break; case PROP_AUTHORIZATION_DOMAIN: g_value_set_object (value, priv->authorization_domain); break; case PROP_DOWNLOAD_URI: g_value_set_string (value, priv->download_uri); break; case PROP_CONTENT_TYPE: g_mutex_lock (&(priv->content_mutex)); g_value_set_string (value, priv->content_type); g_mutex_unlock (&(priv->content_mutex)); break; case PROP_CONTENT_LENGTH: g_mutex_lock (&(priv->content_mutex)); g_value_set_long (value, priv->content_length); g_mutex_unlock (&(priv->content_mutex)); break; case PROP_CANCELLABLE: g_value_set_object (value, priv->cancellable); break; default: /* We don't have any other property... */ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void gdata_download_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (object)->priv; switch (property_id) { case PROP_SERVICE: priv->service = g_value_dup_object (value); priv->session = _gdata_service_get_session (priv->service); break; case PROP_AUTHORIZATION_DOMAIN: priv->authorization_domain = g_value_dup_object (value); break; case PROP_DOWNLOAD_URI: priv->download_uri = g_value_dup_string (value); break; case PROP_CANCELLABLE: /* Construction only */ priv->cancellable = g_value_dup_object (value); break; default: /* We don't have any other property... 
*/ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); break; } } static void read_cancelled_cb (GCancellable *cancellable, GCancellable *child_cancellable) { g_cancellable_cancel (child_cancellable); } static gssize gdata_download_stream_read (GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (stream)->priv; gssize length_read = -1; gboolean reached_eof = FALSE; gulong cancelled_signal = 0, global_cancelled_signal = 0; GCancellable *child_cancellable; GError *child_error = NULL; /* Listen for cancellation from either @cancellable or @priv->cancellable. We have to multiplex cancellation signals from the two sources * into a single #GCancellabel, @child_cancellable. */ child_cancellable = g_cancellable_new (); global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) read_cancelled_cb, child_cancellable, NULL); if (cancellable != NULL) cancelled_signal = g_cancellable_connect (cancellable, (GCallback) read_cancelled_cb, child_cancellable, NULL); /* We're lazy about starting the network operation so we don't end up with a massive buffer */ if (priv->network_thread == NULL) { /* Handle early cancellation so that we don't create the network thread unnecessarily */ if (g_cancellable_set_error_if_cancelled (child_cancellable, &child_error) == TRUE) { length_read = -1; goto done; } /* Create the network thread */ create_network_thread (GDATA_DOWNLOAD_STREAM (stream), &child_error); if (priv->network_thread == NULL) { length_read = -1; goto done; } } /* Read the data off the buffer. If the operation is cancelled, it'll probably still return a positive number of bytes read — if it does, we * can return without error. Iff it returns a non-positive number of bytes should we return an error. 
*/ g_assert (priv->buffer != NULL); length_read = (gssize) gdata_buffer_pop_data (priv->buffer, buffer, count, &reached_eof, child_cancellable); if (length_read < 1 && g_cancellable_set_error_if_cancelled (child_cancellable, &child_error) == TRUE) { /* Handle cancellation */ length_read = -1; goto done; } else if (SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code) == FALSE) { GDataServiceClass *klass = GDATA_SERVICE_GET_CLASS (priv->service); /* Set an appropriate error */ g_assert (klass->parse_error_response != NULL); klass->parse_error_response (priv->service, GDATA_OPERATION_DOWNLOAD, priv->message->status_code, priv->message->reason_phrase, NULL, 0, &child_error); length_read = -1; goto done; } done: /* Disconnect from the cancelled signals. */ if (cancelled_signal != 0) g_cancellable_disconnect (cancellable, cancelled_signal); if (global_cancelled_signal != 0) g_cancellable_disconnect (priv->cancellable, global_cancelled_signal); g_object_unref (child_cancellable); g_assert ((reached_eof == FALSE && length_read > 0 && length_read <= (gssize) count && child_error == NULL) || (reached_eof == TRUE && length_read >= 0 && length_read <= (gssize) count && child_error == NULL) || (length_read == -1 && child_error != NULL)); if (child_error != NULL) g_propagate_error (error, child_error); /* Update our internal offset */ if (length_read > 0) { priv->offset += length_read; } return length_read; } typedef struct { GDataDownloadStream *download_stream; gboolean *cancelled; } CancelledData; static void close_cancelled_cb (GCancellable *cancellable, CancelledData *data) { GDataDownloadStreamPrivate *priv = data->download_stream->priv; g_mutex_lock (&(priv->finished_mutex)); *(data->cancelled) = TRUE; g_cond_signal (&(priv->finished_cond)); g_mutex_unlock (&(priv->finished_mutex)); } /* Even though calling g_input_stream_close() multiple times on this stream is guaranteed to call gdata_download_stream_close() at most once, other * GIO methods (notably 
g_output_stream_splice()) can call gdata_download_stream_close() directly. Consequently, we need to be careful to be idempotent * after the first call. * * If the network thread hasn't yet been started (i.e. gdata_download_stream_read() hasn't been called at all yet), %TRUE will be returned immediately. * * If the global cancellable, ->cancellable, or @cancellable are cancelled before the call to gdata_download_stream_close(), * gdata_download_stream_close() should return immediately with %G_IO_ERROR_CANCELLED. If they're cancelled during the call, * gdata_download_stream_close() should stop waiting for any outstanding network activity to finish and return %G_IO_ERROR_CANCELLED (though the * operation to finish off network activity and close the stream will still continue). * * If the call to gdata_download_stream_close() is not cancelled by any #GCancellable, it will cancel the ongoing network activity, and wait until * the operation has been cleaned up before returning success. */ static gboolean gdata_download_stream_close (GInputStream *stream, GCancellable *cancellable, GError **error) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (stream)->priv; gulong cancelled_signal = 0, global_cancelled_signal = 0; gboolean cancelled = FALSE; /* must only be touched with ->finished_mutex held */ gboolean success = TRUE; CancelledData data; GError *child_error = NULL; /* If the operation was never started, return successfully immediately */ if (priv->network_thread == NULL) { goto done; } /* Allow cancellation */ data.download_stream = GDATA_DOWNLOAD_STREAM (stream); data.cancelled = &cancelled; global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) close_cancelled_cb, &data, NULL); if (cancellable != NULL) cancelled_signal = g_cancellable_connect (cancellable, (GCallback) close_cancelled_cb, &data, NULL); g_mutex_lock (&(priv->finished_mutex)); /* If the operation has started but hasn't already finished, cancel the network thread and 
wait for it to finish before returning */ if (priv->finished == FALSE) { g_cancellable_cancel (priv->network_cancellable); /* Allow the close() call to be cancelled by cancelling either @cancellable or ->cancellable. Note that this won't prevent the stream * from continuing to be closed in the background — it'll just stop waiting on the operation to finish being cancelled. */ if (cancelled == FALSE) { g_cond_wait (&(priv->finished_cond), &(priv->finished_mutex)); } } /* Error handling */ if (priv->finished == FALSE && cancelled == TRUE) { /* Cancelled? If ->finished is TRUE, the network activity finished before the gdata_download_stream_close() operation was cancelled, * so we don't need to return an error. */ g_assert (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == TRUE || g_cancellable_set_error_if_cancelled (priv->cancellable, &child_error) == TRUE); success = FALSE; } g_mutex_unlock (&(priv->finished_mutex)); /* Disconnect from the signal handlers. Note that we have to do this without @finished_mutex held, as g_cancellable_disconnect() blocks * until any outstanding cancellation callbacks return, and they will block on @finished_mutex. 
*/ if (cancelled_signal != 0) g_cancellable_disconnect (cancellable, cancelled_signal); if (global_cancelled_signal != 0) g_cancellable_disconnect (priv->cancellable, global_cancelled_signal); done: /* If we were successful, tidy up various bits of state */ g_mutex_lock (&(priv->finished_mutex)); if (success == TRUE && priv->finished == TRUE) { reset_network_thread (GDATA_DOWNLOAD_STREAM (stream)); } g_mutex_unlock (&(priv->finished_mutex)); g_assert ((success == TRUE && child_error == NULL) || (success == FALSE && child_error != NULL)); if (child_error != NULL) g_propagate_error (error, child_error); return success; } static goffset gdata_download_stream_tell (GSeekable *seekable) { return GDATA_DOWNLOAD_STREAM (seekable)->priv->offset; } static gboolean gdata_download_stream_can_seek (GSeekable *seekable) { return TRUE; } static gboolean gdata_download_stream_seek (GSeekable *seekable, goffset offset, GSeekType type, GCancellable *cancellable, GError **error) { GDataDownloadStreamPrivate *priv = GDATA_DOWNLOAD_STREAM (seekable)->priv; GError *child_error = NULL; if (type == G_SEEK_END && priv->content_length == -1) { /* If we don't have the Content-Length, we can't calculate the offset from the start of the stream properly, so just give up. * We could technically use a HEAD request to get the Content-Length, but this hasn't been tried yet (FIXME). */ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, "G_SEEK_END not currently supported"); return FALSE; } if (g_input_stream_set_pending (G_INPUT_STREAM (seekable), error) == FALSE) { return FALSE; } /* Ensure that offset is relative to the start of the stream. */ switch (type) { case G_SEEK_CUR: offset += priv->offset; break; case G_SEEK_SET: /* Nothing needs doing */ break; case G_SEEK_END: offset += priv->content_length; break; default: g_assert_not_reached (); } /* There are three cases to consider: * 1. The network thread hasn't been started. In this case, we need to set the offset and do nothing. 
When the network thread is started * (in the next read() call), a Range header will be set on it which will give the correct seek. * 2. The network thread has been started and the seek is to a position greater than our current position (i.e. one which already does, or * will soon, exist in the buffer). In this case, we need to pop the intervening bytes off the buffer (which may block) and update the * offset. * 3. The network thread has been started and the seek is to a position which has already been popped off the buffer. In this case, we need * to set the offset and cancel the network thread. When the network thread is restarted (in the next read() call), a Range header will * be set on it which will give the correct seek. */ if (priv->network_thread == NULL) { /* Case 1. Set the offset and we're done. */ priv->offset = offset; goto done; } /* Cases 2 and 3. The network thread has already been started. */ if (offset >= priv->offset) { goffset num_intervening_bytes; gssize length_read; /* Case 2. Pop off the intervening bytes and update the offset. If we can't pop enough bytes off, we throw an error. */ num_intervening_bytes = offset - priv->offset; g_assert (priv->buffer != NULL); length_read = (gssize) gdata_buffer_pop_data (priv->buffer, NULL, num_intervening_bytes, NULL, cancellable); if (length_read != num_intervening_bytes) { if (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == FALSE) { /* Tried to seek too far */ g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, _("Invalid seek request")); } goto done; } /* Update the offset */ priv->offset = offset; goto done; } else { /* Case 3. Cancel the current network thread. Note that we don't allow cancellation of this call, as we depend on it waiting for * the network thread to join. 
*/ if (gdata_download_stream_close (G_INPUT_STREAM (seekable), NULL, &child_error) == FALSE) { goto done; } /* Update the offset */ priv->offset = offset; /* Mark the thread as unfinished */ g_mutex_lock (&(priv->finished_mutex)); priv->finished = FALSE; g_mutex_unlock (&(priv->finished_mutex)); goto done; } done: g_input_stream_clear_pending (G_INPUT_STREAM (seekable)); if (child_error != NULL) { g_propagate_error (error, child_error); return FALSE; } return TRUE; } static gboolean gdata_download_stream_can_truncate (GSeekable *seekable) { return FALSE; } static gboolean gdata_download_stream_truncate (GSeekable *seekable, goffset offset, GCancellable *cancellable, GError **error) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, "Truncate not allowed on input stream"); return FALSE; } static void got_headers_cb (SoupMessage *message, GDataDownloadStream *self) { goffset end; goffset start; goffset total_length; /* Don't get the client's hopes up by setting the Content-Type or -Length if the response * is actually unsuccessful. 
*/ if (SOUP_STATUS_IS_SUCCESSFUL (message->status_code) == FALSE) return; g_mutex_lock (&(self->priv->content_mutex)); self->priv->content_type = g_strdup (soup_message_headers_get_content_type (message->response_headers, NULL)); self->priv->content_length = soup_message_headers_get_content_length (message->response_headers); if (soup_message_headers_get_content_range (message->response_headers, &start, &end, &total_length)) { self->priv->content_length = (gssize) total_length; } g_mutex_unlock (&(self->priv->content_mutex)); /* Emit the notifications for the Content-Length and -Type properties */ g_object_freeze_notify (G_OBJECT (self)); g_object_notify (G_OBJECT (self), "content-length"); g_object_notify (G_OBJECT (self), "content-type"); g_object_thaw_notify (G_OBJECT (self)); } static void got_chunk_cb (SoupMessage *message, SoupBuffer *buffer, GDataDownloadStream *self) { /* Ignore the chunk if the response is unsuccessful or it has zero length */ if (SOUP_STATUS_IS_SUCCESSFUL (message->status_code) == FALSE || buffer->length == 0) return; /* Push the data onto the buffer immediately */ g_assert (self->priv->buffer != NULL); gdata_buffer_push_data (self->priv->buffer, (const guint8*) buffer->data, buffer->length); } static gpointer download_thread (GDataDownloadStream *self) { GDataDownloadStreamPrivate *priv = self->priv; g_object_ref (self); g_assert (priv->network_cancellable != NULL); /* Connect to the got-headers signal so we can notify clients of the values of content-type and content-length */ g_signal_connect (priv->message, "got-headers", (GCallback) got_headers_cb, self); g_signal_connect (priv->message, "got-chunk", (GCallback) got_chunk_cb, self); /* Set a Range header if our starting offset is non-zero */ if (priv->offset > 0) { soup_message_headers_set_range (priv->message->request_headers, priv->offset, -1); } else { soup_message_headers_remove (priv->message->request_headers, "Range"); } _gdata_service_actually_send_message (priv->session, 
priv->message, priv->network_cancellable, NULL); /* Mark the buffer as having reached EOF */ g_assert (priv->buffer != NULL); gdata_buffer_push_data (priv->buffer, NULL, 0); /* Mark the download as finished */ g_mutex_lock (&(priv->finished_mutex)); priv->finished = TRUE; g_cond_signal (&(priv->finished_cond)); g_mutex_unlock (&(priv->finished_mutex)); g_object_unref (self); return NULL; } static void create_network_thread (GDataDownloadStream *self, GError **error) { GDataDownloadStreamPrivate *priv = self->priv; g_assert (priv->buffer == NULL); priv->buffer = gdata_buffer_new (); g_assert (priv->network_thread == NULL); priv->network_thread = g_thread_try_new ("download-thread", (GThreadFunc) download_thread, self, error); } static void reset_network_thread (GDataDownloadStream *self) { GDataDownloadStreamPrivate *priv = self->priv; priv->network_thread = NULL; if (priv->buffer != NULL) { gdata_buffer_free (priv->buffer); priv->buffer = NULL; } if (priv->message != NULL) { soup_session_cancel_message (priv->session, priv->message, SOUP_STATUS_CANCELLED); g_signal_handlers_disconnect_by_func (priv->message, got_headers_cb, self); g_signal_handlers_disconnect_by_func (priv->message, got_chunk_cb, self); } priv->offset = 0; if (priv->network_cancellable != NULL) { g_cancellable_reset (priv->network_cancellable); } } /** * gdata_download_stream_new: * @service: a #GDataService * @domain: (allow-none): the #GDataAuthorizationDomain to authorize the download, or %NULL * @download_uri: the URI to download; this must be HTTPS * @cancellable: (allow-none): a #GCancellable for the entire download stream, or %NULL * * Creates a new #GDataDownloadStream, allowing a file to be downloaded from a GData service using standard #GInputStream API. * * As well as the standard GIO errors, calls to the #GInputStream API on a #GDataDownloadStream can also return any relevant specific error from * #GDataServiceError, or %GDATA_SERVICE_ERROR_PROTOCOL_ERROR in the general case. 
* * If a #GCancellable is provided in @cancellable, the download operation may be cancelled at any time from another thread using g_cancellable_cancel(). * In this case, any ongoing network activity will be stopped, and any pending or future calls to #GInputStream API on the #GDataDownloadStream will * return %G_IO_ERROR_CANCELLED. Note that the #GCancellable objects which can be passed to individual #GInputStream operations will not cancel the * download operation proper if cancelled — they will merely cancel that API call. The only way to cancel the download operation completely is using * this @cancellable. * * Return value: a new #GInputStream, or %NULL; unref with g_object_unref() * * Since: 0.9.0 */ GInputStream * gdata_download_stream_new (GDataService *service, GDataAuthorizationDomain *domain, const gchar *download_uri, GCancellable *cancellable) { g_return_val_if_fail (GDATA_IS_SERVICE (service), NULL); g_return_val_if_fail (domain == NULL || GDATA_IS_AUTHORIZATION_DOMAIN (domain), NULL); g_return_val_if_fail (download_uri != NULL, NULL); g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL); return G_INPUT_STREAM (g_object_new (GDATA_TYPE_DOWNLOAD_STREAM, "download-uri", download_uri, "service", service, "authorization-domain", domain, "cancellable", cancellable, NULL)); } /** * gdata_download_stream_get_service: * @self: a #GDataDownloadStream * * Gets the service used to authorize the download, as passed to gdata_download_stream_new(). * * Return value: (transfer none): the #GDataService used to authorize the download * * Since: 0.5.0 */ GDataService * gdata_download_stream_get_service (GDataDownloadStream *self) { g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL); return self->priv->service; } /** * gdata_download_stream_get_authorization_domain: * @self: a #GDataDownloadStream * * Gets the authorization domain used to authorize the download, as passed to gdata_download_stream_new(). 
It may be %NULL if authorization is not * needed for the download. * * Return value: (transfer none) (allow-none): the #GDataAuthorizationDomain used to authorize the download, or %NULL * * Since: 0.9.0 */ GDataAuthorizationDomain * gdata_download_stream_get_authorization_domain (GDataDownloadStream *self) { g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL); return self->priv->authorization_domain; } /** * gdata_download_stream_get_download_uri: * @self: a #GDataDownloadStream * * Gets the URI of the file being downloaded, as passed to gdata_download_stream_new(). * * Return value: the URI of the file being downloaded * * Since: 0.5.0 */ const gchar * gdata_download_stream_get_download_uri (GDataDownloadStream *self) { g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL); return self->priv->download_uri; } /** * gdata_download_stream_get_content_type: * @self: a #GDataDownloadStream * * Gets the content type of the file being downloaded. If the Content-Type header has not yet * been received, %NULL will be returned. * * Return value: the content type of the file being downloaded, or %NULL * * Since: 0.5.0 */ const gchar * gdata_download_stream_get_content_type (GDataDownloadStream *self) { const gchar *content_type; g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL); g_mutex_lock (&(self->priv->content_mutex)); content_type = self->priv->content_type; g_mutex_unlock (&(self->priv->content_mutex)); /* It's safe to return this, even though we're not taking a copy of it, as it's immutable once set. */ return content_type; } /** * gdata_download_stream_get_content_length: * @self: a #GDataDownloadStream * * Gets the length (in bytes) of the file being downloaded. If the Content-Length header has not yet * been received from the server, -1 will be returned. 
* * Return value: the content length of the file being downloaded, or -1 * * Since: 0.5.0 */ gssize gdata_download_stream_get_content_length (GDataDownloadStream *self) { gssize content_length; g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), -1); g_mutex_lock (&(self->priv->content_mutex)); content_length = self->priv->content_length; g_mutex_unlock (&(self->priv->content_mutex)); g_assert (content_length >= -1); return content_length; } /** * gdata_download_stream_get_cancellable: * @self: a #GDataDownloadStream * * Gets the #GCancellable for the entire download operation, #GDataDownloadStream:cancellable. * * Return value: (transfer none): the #GCancellable for the entire download operation * * Since: 0.8.0 */ GCancellable * gdata_download_stream_get_cancellable (GDataDownloadStream *self) { g_return_val_if_fail (GDATA_IS_DOWNLOAD_STREAM (self), NULL); g_assert (self->priv->cancellable != NULL); return self->priv->cancellable; }