/* -*- Mode: C; indent-tabs-mode: t; c-basic-offset: 8; tab-width: 8 -*- */
/*
* GData Client
* Copyright (C) Philip Withnall 2009 <philip@tecnocode.co.uk>
*
* GData Client is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* GData Client is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with GData Client. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* SECTION:gdata-upload-stream
* @short_description: GData upload stream object
* @stability: Stable
* @include: gdata/gdata-upload-stream.h
*
* #GDataUploadStream is a #GOutputStream subclass to allow uploading of files to GData services with authorization from a #GDataService under
* the given #GDataAuthorizationDomain. If authorization is not required to perform the upload, a #GDataAuthorizationDomain doesn't have to be
* specified.
*
* Once a #GDataUploadStream is instantiated with gdata_upload_stream_new(), the standard #GOutputStream API can be used on the stream to upload
* the file. Network communication may not actually begin until the first call to g_output_stream_write(), so having a #GDataUploadStream around is no
* guarantee that data is being uploaded.
*
* Uploads of a file, or a file with associated metadata (a #GDataEntry) should use #GDataUploadStream, but if you want to simply upload a single
* #GDataEntry, use gdata_service_insert_entry() instead. #GDataUploadStream is for large streaming uploads.
*
* Once an upload is complete, the server's response can be retrieved from the #GDataUploadStream using gdata_upload_stream_get_response(). In order
* for network communication to be guaranteed to have stopped (and thus the response definitely available), g_output_stream_close() must be called
* on the #GDataUploadStream first. Otherwise, gdata_upload_stream_get_response() may return saying that the operation is still in progress.
*
* If the server returns an error instead of a success response, the error will be returned by g_output_stream_close() as a #GDataServiceError.
*
* The entire upload operation can be cancelled using the #GCancellable instance provided to gdata_upload_stream_new(), or returned by
* gdata_upload_stream_get_cancellable(). Cancelling this at any time will cause all future #GOutputStream method calls to return
* %G_IO_ERROR_CANCELLED. If any #GOutputStream methods are in the process of being called, they will be cancelled and return %G_IO_ERROR_CANCELLED as
* soon as possible.
*
* Note that cancelling an individual method call (such as a call to g_output_stream_write()) using the #GCancellable parameter of the method will not
* cancel the upload as a whole — just that particular method call. In the case of g_output_stream_write(), this will cause it to return the number of
* bytes it has successfully written up to the point of cancellation (up to the requested number of bytes), or return a %G_IO_ERROR_CANCELLED if it
* had not managed to write any bytes to the network by that point. This is also the behaviour of g_output_stream_write() when the upload operation as
* a whole is cancelled.
*
* In the case of g_output_stream_close(), the call will return immediately if network activity hasn't yet started. If it has, the network activity
* will be cancelled, regardless of whether the call to g_output_stream_close() is cancelled. Cancelling a pending call to g_output_stream_close()
* (either using the method's #GCancellable, or by cancelling the upload stream as a whole) will cause it to stop waiting for the network activity to
* finish, and return %G_IO_ERROR_CANCELLED immediately. Network activity will continue to be shut down in the background.
*
* Any outstanding data is guaranteed to be written to the network successfully even if a call to g_output_stream_close() is cancelled. However, if
* the upload stream as a whole is cancelled using #GDataUploadStream:cancellable, no more data will be sent over the network, and the network
* connection will be closed immediately. i.e. #GDataUploadStream will do its best to instruct the server to cancel the upload and any associated
* server-side changes of state.
*
* If the server returns an error message (for example, if the user is not correctly authenticated/authorized or doesn't have suitable permissions
* to upload to the given URI), it will be returned as a #GDataServiceError by g_output_stream_close().
*
* <example>
* <title>Uploading from a File</title>
* <programlisting>
* GDataService *service;
* GDataAuthorizationDomain *domain;
* GCancellable *cancellable;
* GInputStream *input_stream;
* GOutputStream *upload_stream;
* GFile *file;
* GFileInfo *file_info;
* GError *error = NULL;
*
* /<!-- -->* Get the file to upload *<!-- -->/
* file = get_file_to_upload ();
* file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME "," G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
* G_FILE_ATTRIBUTE_STANDARD_SIZE,
* G_FILE_QUERY_INFO_NONE, NULL, &error);
*
* if (file_info == NULL) {
* g_error ("Error getting file info: %s", error->message);
* g_error_free (error);
* g_object_unref (file);
* return;
* }
*
* input_stream = g_file_read (file, NULL, &error);
* g_object_unref (file);
*
* if (input_stream == NULL) {
* g_error ("Error getting file input stream: %s", error->message);
* g_error_free (error);
* g_object_unref (file_info);
* return;
* }
*
* /<!-- -->* Create the upload stream *<!-- -->/
* service = create_my_service ();
* domain = get_my_authorization_domain_from_service (service);
* cancellable = g_cancellable_new (); /<!-- -->* cancel this to cancel the entire upload operation *<!-- -->/
* upload_stream = gdata_upload_stream_new_resumable (service, domain, SOUP_METHOD_POST, upload_uri, NULL,
* g_file_info_get_display_name (file_info), g_file_info_get_content_type (file_info),
* g_file_info_get_size (file_info), cancellable);
* g_object_unref (file_info);
*
* /<!-- -->* Perform the upload asynchronously *<!-- -->/
* g_output_stream_splice_async (upload_stream, input_stream, G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
* G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) upload_splice_cb, NULL);
*
* g_object_unref (upload_stream);
* g_object_unref (input_stream);
* g_object_unref (cancellable);
* g_object_unref (domain);
* g_object_unref (service);
*
* static void
* upload_splice_cb (GOutputStream *upload_stream, GAsyncResult *result, gpointer user_data)
* {
* gssize length;
* GError *error = NULL;
*
* g_output_stream_splice_finish (upload_stream, result, &error);
*
* if (error != NULL && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) == FALSE) {
* /<!-- -->* Error uploading the file; potentially an I/O error (GIOError), or an error response from the server
* * (GDataServiceError). *<!-- -->/
* g_error ("Error uploading file: %s", error->message);
* g_error_free (error);
* }
*
* /<!-- -->* If the upload was successful, carry on to parse the result. Note that this will normally be handled by methods like
* * gdata_youtube_service_finish_video_upload(), gdata_picasaweb_service_finish_file_upload() and
* * gdata_documents_service_finish_upload() *<!-- -->/
* parse_server_result (gdata_upload_stream_get_response (GDATA_UPLOAD_STREAM (upload_stream), &length), length);
* }
* </programlisting>
* </example>
*
* Since: 0.5.0
*/
/*
* We have a network thread which does all the uploading work. We send the message encoded as chunks, but cannot use the SoupMessageBody as a
* data buffer, since it can only ever be touched by the network thread. Instead, we pass data to the network thread through a GDataBuffer, with
* the main thread pushing it on as and when write() is called. The network thread cannot block on popping data off the buffer, as it requests fixed-
* size chunks, and there's no way to notify it that we've reached EOF; so when it gets to popping the last chunk off the buffer, which may well be
* smaller than its chunk size, it would block for more data and therefore hang. Consequently, the network thread instead pops as much data as it can
* off the buffer, up to its chunk size, which is a non-blocking operation.
*
* The write() and close() operations on the output stream are synchronised with the network thread, so that the write() call only returns once the
* network thread has written at least as many bytes as were passed to the write() call, and the close() call only returns once all network activity
* has finished (including receiving the response from the server). Async versions of these calls are provided by GOutputStream.
*
* The number of bytes in the various buffers are recorded using:
* • message_bytes_outstanding: the number of bytes in the GDataBuffer which are waiting to be written to the SoupMessageBody
* • network_bytes_outstanding: the number of bytes which have been written to the SoupMessageBody, and are waiting to be written to the network
* • network_bytes_written: the total number of bytes which have been successfully written to the network
*
* Mutex locking order:
* 1. response_mutex
* 2. write_mutex
*/
#include <config.h>
#include <glib.h>
#include <glib/gi18n-lib.h>
#include <string.h>
#include "gdata-upload-stream.h"
#include "gdata-buffer.h"
#include "gdata-private.h"
/* MIME boundary separating the entry metadata part from the file data part in non-resumable multipart/related uploads. */
#define BOUNDARY_STRING "0003Z5W789deadbeefRTE456KlemsnoZV"
/* Upper bound on ->chunk_size for resumable uploads (applied in gdata_upload_stream_constructed()). */
#define MAX_RESUMABLE_CHUNK_SIZE (512 * 1024) /* bytes = 512 KiB */
/* GObject virtual methods, hooked up in gdata_upload_stream_class_init(). */
static void gdata_upload_stream_constructed (GObject *object);
static void gdata_upload_stream_dispose (GObject *object);
static void gdata_upload_stream_finalize (GObject *object);
static void gdata_upload_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec);
static void gdata_upload_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec);
/* Synchronous GOutputStream virtual methods; the asynchronous variants are left to GOutputStream's default implementations. */
static gssize gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error);
static gboolean gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GError **error);
static gboolean gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error);
/* Creates the network thread. Callers detect failure by checking whether ->network_thread is still NULL afterwards. */
static void create_network_thread (GDataUploadStream *self, GError **error);
/* Progress of the upload, stored in ->state. gdata_upload_stream_constructed() starts non-resumable uploads in STATE_DATA_REQUESTS and
 * resumable uploads in STATE_INITIAL_REQUEST. */
typedef enum {
STATE_INITIAL_REQUEST, /* initial POST request to the resumable-create-media link (unused for non-resumable uploads) */
STATE_DATA_REQUESTS, /* one or more subsequent PUT requests (only state used for non-resumable uploads) */
STATE_FINISHED, /* finished successfully or in error */
} UploadState;
/* Private instance data for #GDataUploadStream. */
struct _GDataUploadStreamPrivate {
/* Values of the construct-only properties, stored by gdata_upload_stream_set_property(). */
gchar *method;
gchar *upload_uri;
GDataService *service;
GDataAuthorizationDomain *authorization_domain;
GDataEntry *entry;
gchar *slug;
gchar *content_type;
goffset content_length; /* -1 for non-resumable uploads; 0 or greater for resumable ones */
/* Session obtained from ->service via _gdata_service_get_session() when the service property is set. */
SoupSession *session;
/* Request message; built in gdata_upload_stream_constructed() and unreffed in dispose(). */
SoupMessage *message;
/* Created in init() and freed in finalize(); data passed to write() is pushed onto this buffer. */
GDataBuffer *buffer;
/* Cancellable for the whole upload; constructed() creates one if none was supplied at construction time. */
GCancellable *cancellable;
/* Checked for NULL by write() and flush() to decide whether the network thread still has to be created. */
GThread *network_thread;
UploadState state; /* protected by write_mutex */
GMutex write_mutex; /* mutex guarding ->state and the waits on ->write_cond */
/* This persists across all resumable upload chunks. Note that it doesn't count bytes from the entry XML. */
gsize total_network_bytes_written; /* the number of bytes which have been written to the network in STATE_DATA_REQUESTS */
/* All of the following apply only to the current resumable upload chunk. */
gsize message_bytes_outstanding; /* the number of bytes which have been written to the buffer but not libsoup (signalled by write_cond) */
gsize network_bytes_outstanding; /* the number of bytes which have been written to libsoup but not the network (signalled by write_cond) */
gsize network_bytes_written; /* the number of bytes which have been written to the network (signalled by write_cond) */
gsize chunk_size; /* the size of the current chunk (in bytes); 0 iff content_length <= 0; must be <= MAX_RESUMABLE_CHUNK_SIZE */
GCond write_cond; /* signalled when a chunk has been written (protected by write_mutex) */
GCond finished_cond; /* signalled when sending the message (and receiving the response) is finished (protected by response_mutex) */
guint response_status; /* set once we finish receiving the response (SOUP_STATUS_NONE otherwise) (protected by response_mutex) */
GError *response_error; /* error asynchronously set by the network thread, and picked up by the main thread when appropriate */
GMutex response_mutex; /* mutex for ->response_error, ->response_status and ->finished_cond */
};
/* Property IDs for the properties installed in gdata_upload_stream_class_init(). The IDs start at 1 and are not listed in the same order as
 * the installation calls. */
enum {
PROP_SERVICE = 1,
PROP_UPLOAD_URI,
PROP_ENTRY,
PROP_SLUG,
PROP_CONTENT_TYPE,
PROP_METHOD,
PROP_CANCELLABLE,
PROP_AUTHORIZATION_DOMAIN,
PROP_CONTENT_LENGTH,
};
/* Registers GDataUploadStream as a GOutputStream subclass; this also provides gdata_upload_stream_parent_class, used when chaining up. */
G_DEFINE_TYPE (GDataUploadStream, gdata_upload_stream, G_TYPE_OUTPUT_STREAM)
/* Class initialiser: reserves the private data, wires up the GObject and GOutputStream virtual methods and installs the properties.
 * Every property is readable, but can only be set at construction time. */
static void
gdata_upload_stream_class_init (GDataUploadStreamClass *klass)
{
	GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (klass);
	GObjectClass *object_class = G_OBJECT_CLASS (klass);

	g_type_class_add_private (klass, sizeof (GDataUploadStreamPrivate));

	/* GObject lifecycle and property accessors */
	object_class->constructed = gdata_upload_stream_constructed;
	object_class->dispose = gdata_upload_stream_dispose;
	object_class->finalize = gdata_upload_stream_finalize;
	object_class->get_property = gdata_upload_stream_get_property;
	object_class->set_property = gdata_upload_stream_set_property;

	/* Only the synchronous stream operations are implemented here. The default async implementations
	 * are kept; they run these synchronous functions in a thread. */
	output_stream_class->write_fn = gdata_upload_stream_write;
	output_stream_class->flush = gdata_upload_stream_flush;
	output_stream_class->close_fn = gdata_upload_stream_close;

	/**
	 * GDataUploadStream:service:
	 *
	 * The service which is used to authorize the upload, and to which the upload relates.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_SERVICE,
	                                 g_param_spec_object ("service",
	                                                      "Service",
	                                                      "The service which is used to authorize the upload.",
	                                                      GDATA_TYPE_SERVICE,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:authorization-domain:
	 *
	 * The authorization domain for the upload, against which the #GDataService:authorizer for the #GDataUploadStream:service should be
	 * authorized. This may be %NULL if authorization is not needed for the upload.
	 *
	 * Since: 0.9.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_AUTHORIZATION_DOMAIN,
	                                 g_param_spec_object ("authorization-domain",
	                                                      "Authorization domain",
	                                                      "The authorization domain for the upload.",
	                                                      GDATA_TYPE_AUTHORIZATION_DOMAIN,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:method:
	 *
	 * The HTTP request method to use when uploading the file.
	 *
	 * Since: 0.7.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_METHOD,
	                                 g_param_spec_string ("method",
	                                                      "Method",
	                                                      "The HTTP request method to use when uploading the file.",
	                                                      NULL,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:upload-uri:
	 *
	 * The URI to upload the data and metadata to. This must be HTTPS.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_UPLOAD_URI,
	                                 g_param_spec_string ("upload-uri",
	                                                      "Upload URI",
	                                                      "The URI to upload the data and metadata to.",
	                                                      NULL,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:entry:
	 *
	 * The entry used for metadata to upload.
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_ENTRY,
	                                 g_param_spec_object ("entry",
	                                                      "Entry",
	                                                      "The entry used for metadata to upload.",
	                                                      GDATA_TYPE_ENTRY,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:slug:
	 *
	 * The slug of the file being uploaded. This is usually the display name of the file (i.e. as returned by g_file_info_get_display_name()).
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_SLUG,
	                                 g_param_spec_string ("slug",
	                                                      "Slug",
	                                                      "The slug of the file being uploaded.",
	                                                      NULL,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:content-length:
	 *
	 * The content length (in bytes) of the file being uploaded (i.e. as returned by g_file_info_get_size()). Note that this does not include the
	 * length of the XML serialisation of #GDataUploadStream:entry, if set.
	 *
	 * If this is <code class="literal">-1</code> the upload will be non-resumable; if it is non-negative, the upload will be resumable.
	 *
	 * Since: 0.13.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_CONTENT_LENGTH,
	                                 g_param_spec_int64 ("content-length",
	                                                     "Content length",
	                                                     "The content length (in bytes) of the file being uploaded.",
	                                                     -1, G_MAXINT64, -1,
	                                                     G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:content-type:
	 *
	 * The content type of the file being uploaded (i.e. as returned by g_file_info_get_content_type()).
	 *
	 * Since: 0.5.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_CONTENT_TYPE,
	                                 g_param_spec_string ("content-type",
	                                                      "Content type",
	                                                      "The content type of the file being uploaded.",
	                                                      NULL,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

	/**
	 * GDataUploadStream:cancellable:
	 *
	 * An optional cancellable used to cancel the entire upload operation. If a #GCancellable instance isn't provided for this property at
	 * construction time (i.e. to gdata_upload_stream_new()), one will be created internally and can be retrieved using
	 * gdata_upload_stream_get_cancellable() and used to cancel the upload operation with g_cancellable_cancel() just as if it was passed to
	 * gdata_upload_stream_new().
	 *
	 * If the upload operation is cancelled using this #GCancellable, any ongoing network activity will be stopped, and any pending or future calls
	 * to #GOutputStream API on the #GDataUploadStream will return %G_IO_ERROR_CANCELLED. Note that the #GCancellable objects which can be passed
	 * to individual #GOutputStream operations will not cancel the upload operation proper if cancelled — they will merely cancel that API call.
	 * The only way to cancel the upload operation completely is using #GDataUploadStream:cancellable.
	 *
	 * Since: 0.8.0
	 */
	g_object_class_install_property (object_class,
	                                 PROP_CANCELLABLE,
	                                 g_param_spec_object ("cancellable",
	                                                      "Cancellable",
	                                                      "An optional cancellable used to cancel the entire upload operation.",
	                                                      G_TYPE_CANCELLABLE,
	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
/* Instance initialiser: looks up the private data, allocates the data buffer shared with the network thread and initialises the
 * mutexes and condition variables (all of which are torn down again in gdata_upload_stream_finalize()). */
static void
gdata_upload_stream_init (GDataUploadStream *self)
{
	GDataUploadStreamPrivate *priv;

	priv = G_TYPE_INSTANCE_GET_PRIVATE (self, GDATA_TYPE_UPLOAD_STREAM, GDataUploadStreamPrivate);
	self->priv = priv;

	priv->buffer = gdata_buffer_new ();

	g_mutex_init (&priv->write_mutex);
	g_cond_init (&priv->write_cond);
	g_cond_init (&priv->finished_cond);
	g_mutex_init (&priv->response_mutex);
}
/* Create a new #SoupMessage for @method on @upload_uri, with the port overridden by _gdata_service_get_https_port() and with request
 * body accumulation turned off. @self is currently unused. The caller owns the returned message. */
static SoupMessage *
build_message (GDataUploadStream *self, const gchar *method, const gchar *upload_uri)
{
	SoupURI *target_uri = soup_uri_new (upload_uri);
	SoupMessage *message;

	soup_uri_set_port (target_uri, _gdata_service_get_https_port ());
	message = soup_message_new_from_uri (method, target_uri);
	soup_uri_free (target_uri);

	/* Chunks which have been sent must not be kept around in the request body */
	soup_message_body_set_accumulate (message->request_body, FALSE);

	return message;
}
static void
gdata_upload_stream_constructed (GObject *object)
{
GDataUploadStreamPrivate *priv;
GDataServiceClass *service_klass;
SoupURI *uri = NULL;
/* Chain up to the parent class */
G_OBJECT_CLASS (gdata_upload_stream_parent_class)->constructed (object);
priv = GDATA_UPLOAD_STREAM (object)->priv;
/* The upload URI must be HTTPS. */
uri = soup_uri_new (priv->upload_uri);
g_assert_cmpstr (soup_uri_get_scheme (uri), ==, SOUP_URI_SCHEME_HTTPS);
soup_uri_free (uri);
/* Create a #GCancellable for the entire upload operation if one wasn't specified for #GDataUploadStream:cancellable during construction */
if (priv->cancellable == NULL)
priv->cancellable = g_cancellable_new ();
/* Build the message */
priv->message = build_message (GDATA_UPLOAD_STREAM (object), priv->method, priv->upload_uri);
if (priv->slug != NULL)
soup_message_headers_append (priv->message->request_headers, "Slug", priv->slug);
if (priv->content_length == -1) {
/* Non-resumable upload */
soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CHUNKED);
/* The Content-Type should be multipart/related if we're also uploading the metadata (entry != NULL),
* and the given content_type otherwise. */
if (priv->entry != NULL) {
gchar *first_part_header, *upload_data;
gchar *second_part_header;
GDataParsableClass *parsable_klass;
parsable_klass = GDATA_PARSABLE_GET_CLASS (priv->entry);
g_assert (parsable_klass->get_content_type != NULL);
soup_message_headers_set_content_type (priv->message->request_headers, "multipart/related; boundary=" BOUNDARY_STRING, NULL);
if (g_strcmp0 (parsable_klass->get_content_type (), "application/json") == 0) {
upload_data = gdata_parsable_get_json (GDATA_PARSABLE (priv->entry));
} else {
upload_data = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
}
/* Start by writing out the entry; then the thread has something to write to the network when it's created */
first_part_header = g_strdup_printf ("--" BOUNDARY_STRING "\n"
"Content-Type: %s; charset=UTF-8\n\n",
parsable_klass->get_content_type ());
second_part_header = g_strdup_printf ("\n--" BOUNDARY_STRING "\n"
"Content-Type: %s\n"
"Content-Transfer-Encoding: binary\n\n",
priv->content_type);
/* Push the message parts onto the message body; we can skip the buffer, since the network thread hasn't yet been created,
* so we're the sole thread accessing the SoupMessage. */
soup_message_body_append (priv->message->request_body,
SOUP_MEMORY_TAKE,
first_part_header,
strlen (first_part_header));
soup_message_body_append (priv->message->request_body,
SOUP_MEMORY_TAKE, upload_data,
strlen (upload_data));
soup_message_body_append (priv->message->request_body,
SOUP_MEMORY_TAKE,
second_part_header,
strlen (second_part_header));
first_part_header = NULL;
upload_data = NULL;
second_part_header = NULL;
priv->network_bytes_outstanding = priv->message->request_body->length;
} else {
soup_message_headers_set_content_type (priv->message->request_headers, priv->content_type, NULL);
}
/* Non-resumable uploads start with the data requests immediately. */
priv->state = STATE_DATA_REQUESTS;
} else {
gchar *content_length_str;
/* Resumable upload's initial request */
soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CONTENT_LENGTH);
soup_message_headers_replace (priv->message->request_headers, "X-Upload-Content-Type", priv->content_type);
content_length_str = g_strdup_printf ("%" G_GOFFSET_FORMAT, priv->content_length);
soup_message_headers_replace (priv->message->request_headers, "X-Upload-Content-Length", content_length_str);
g_free (content_length_str);
if (priv->entry != NULL) {
GDataParsableClass *parsable_klass;
gchar *content_type, *upload_data;
parsable_klass = GDATA_PARSABLE_GET_CLASS (priv->entry);
g_assert (parsable_klass->get_content_type != NULL);
if (g_strcmp0 (parsable_klass->get_content_type (), "application/json") == 0) {
upload_data = gdata_parsable_get_json (GDATA_PARSABLE (priv->entry));
} else {
upload_data = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
}
content_type = g_strdup_printf ("%s; charset=UTF-8",
parsable_klass->get_content_type ());
soup_message_headers_set_content_type (priv->message->request_headers,
content_type,
NULL);
g_free (content_type);
soup_message_body_append (priv->message->request_body,
SOUP_MEMORY_TAKE,
upload_data,
strlen (upload_data));
upload_data = NULL;
priv->network_bytes_outstanding = priv->message->request_body->length;
} else {
soup_message_headers_set_content_length (priv->message->request_headers, 0);
}
/* Resumable uploads always start with an initial request, which either contains the XML or is empty. */
priv->state = STATE_INITIAL_REQUEST;
priv->chunk_size = MIN (priv->content_length, MAX_RESUMABLE_CHUNK_SIZE);
}
/* Make sure the headers are set. HACK: This should actually be in build_message(), but we have to work around
* http://code.google.com/a/google.com/p/apps-api-issues/issues/detail?id=3033 in GDataDocumentsService's append_query_headers(). */
service_klass = GDATA_SERVICE_GET_CLASS (priv->service);
if (service_klass->append_query_headers != NULL) {
service_klass->append_query_headers (priv->service, priv->authorization_domain, priv->message);
}
/* If the entry exists and has an ETag, we assume we're updating the entry, so we can set the If-Match header */
if (priv->entry != NULL && gdata_entry_get_etag (priv->entry) != NULL)
soup_message_headers_append (priv->message->request_headers, "If-Match", gdata_entry_get_etag (priv->entry));
/* Uploading doesn't actually start until the first call to write() */
}
/* GObject::dispose implementation: closes the stream, then drops the references held on other objects. Each pointer is reset to NULL
 * once released. */
static void
gdata_upload_stream_dispose (GObject *object)
{
	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (object)->priv;

	/* The stream has to be closed before things like priv->service are unreffed, otherwise unreffing the stream in the middle of
	 * network operations leads to crashes like bgo#602156 */
	g_output_stream_close (G_OUTPUT_STREAM (object), NULL, NULL);

	if (priv->cancellable != NULL) {
		g_object_unref (priv->cancellable);
		priv->cancellable = NULL;
	}

	if (priv->service != NULL) {
		g_object_unref (priv->service);
		priv->service = NULL;
	}

	if (priv->authorization_domain != NULL) {
		g_object_unref (priv->authorization_domain);
		priv->authorization_domain = NULL;
	}

	if (priv->message != NULL) {
		g_object_unref (priv->message);
		priv->message = NULL;
	}

	if (priv->entry != NULL) {
		g_object_unref (priv->entry);
		priv->entry = NULL;
	}

	/* Chain up to the parent class */
	G_OBJECT_CLASS (gdata_upload_stream_parent_class)->dispose (object);
}
/* GObject::finalize implementation: clears the synchronisation primitives and frees the buffer, any pending response error and the
 * strings owned by the stream. */
static void
gdata_upload_stream_finalize (GObject *object)
{
	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (object)->priv;

	/* Mutexes and condition variables set up in gdata_upload_stream_init() */
	g_mutex_clear (&priv->response_mutex);
	g_cond_clear (&priv->finished_cond);
	g_cond_clear (&priv->write_cond);
	g_mutex_clear (&priv->write_mutex);

	gdata_buffer_free (priv->buffer);
	g_clear_error (&priv->response_error);

	/* String copies made in gdata_upload_stream_set_property() */
	g_free (priv->upload_uri);
	g_free (priv->method);
	g_free (priv->slug);
	g_free (priv->content_type);

	/* Chain up to the parent class */
	G_OBJECT_CLASS (gdata_upload_stream_parent_class)->finalize (object);
}
/* GObject::get_property implementation: copies the private field backing @property_id into @value, and warns about unknown IDs. */
static void
gdata_upload_stream_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec)
{
	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (object)->priv;

	switch (property_id) {
	/* Object-valued properties */
	case PROP_SERVICE:
		g_value_set_object (value, priv->service);
		break;
	case PROP_ENTRY:
		g_value_set_object (value, priv->entry);
		break;
	case PROP_CANCELLABLE:
		g_value_set_object (value, priv->cancellable);
		break;
	case PROP_AUTHORIZATION_DOMAIN:
		g_value_set_object (value, priv->authorization_domain);
		break;

	/* String-valued properties */
	case PROP_UPLOAD_URI:
		g_value_set_string (value, priv->upload_uri);
		break;
	case PROP_SLUG:
		g_value_set_string (value, priv->slug);
		break;
	case PROP_CONTENT_TYPE:
		g_value_set_string (value, priv->content_type);
		break;
	case PROP_METHOD:
		g_value_set_string (value, priv->method);
		break;

	/* Integer-valued properties */
	case PROP_CONTENT_LENGTH:
		g_value_set_int64 (value, priv->content_length);
		break;

	default:
		/* Not one of our properties */
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
		break;
	}
}
/* GObject::set_property implementation: stores a new reference to, or a copy of, @value in the private field backing @property_id.
 * All of the properties are construct-only. Unknown IDs produce a warning. */
static void
gdata_upload_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec)
{
	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (object)->priv;

	switch (property_id) {
	/* Object-valued properties */
	case PROP_SERVICE:
		priv->service = g_value_dup_object (value);
		/* The session is looked up from the service as soon as the service is known */
		priv->session = _gdata_service_get_session (priv->service);
		break;
	case PROP_ENTRY:
		priv->entry = g_value_dup_object (value);
		break;
	case PROP_CANCELLABLE:
		/* Construction only */
		priv->cancellable = g_value_dup_object (value);
		break;
	case PROP_AUTHORIZATION_DOMAIN:
		priv->authorization_domain = g_value_dup_object (value);
		break;

	/* String-valued properties */
	case PROP_UPLOAD_URI:
		priv->upload_uri = g_value_dup_string (value);
		break;
	case PROP_SLUG:
		priv->slug = g_value_dup_string (value);
		break;
	case PROP_CONTENT_TYPE:
		priv->content_type = g_value_dup_string (value);
		break;
	case PROP_METHOD:
		priv->method = g_value_dup_string (value);
		break;

	/* Integer-valued properties */
	case PROP_CONTENT_LENGTH:
		priv->content_length = g_value_get_int64 (value);
		break;

	default:
		/* Not one of our properties */
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
		break;
	}
}
/* User data for the cancellation callbacks connected by write() and flush(). Both callers allocate it on their own stack and point
 * @cancelled at a local flag. */
typedef struct {
/* The stream whose ->write_mutex and ->write_cond the callback uses. */
GDataUploadStream *upload_stream;
/* Flag set to TRUE by the callback; it is written with ->write_mutex held. */
gboolean *cancelled;
} CancelledData;
/* Cancellation callback connected by gdata_upload_stream_write(): with ->write_mutex held, raise the caller's cancelled flag and
 * signal ->write_cond so that a write() blocked on it wakes up and notices the cancellation. */
static void
write_cancelled_cb (GCancellable *cancellable, CancelledData *data)
{
	GDataUploadStreamPrivate *stream_priv = data->upload_stream->priv;

	g_mutex_lock (&stream_priv->write_mutex);

	*(data->cancelled) = TRUE;
	g_cond_signal (&stream_priv->write_cond);

	g_mutex_unlock (&stream_priv->write_mutex);
}
static gssize
gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error_out)
{
GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
gssize length_written = -1;
gulong cancelled_signal = 0, global_cancelled_signal = 0;
gboolean cancelled = FALSE; /* must only be touched with ->write_mutex held */
gsize old_total_network_bytes_written;
CancelledData data;
GError *error = NULL;
/* Listen for cancellation events */
data.upload_stream = GDATA_UPLOAD_STREAM (stream);
data.cancelled = &cancelled;
global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) write_cancelled_cb, &data, NULL);
if (cancellable != NULL)
cancelled_signal = g_cancellable_connect (cancellable, (GCallback) write_cancelled_cb, &data, NULL);
/* Check for an error and return if necessary */
g_mutex_lock (&(priv->write_mutex));
if (cancelled == TRUE) {
g_assert (g_cancellable_set_error_if_cancelled (cancellable, &error) == TRUE ||
g_cancellable_set_error_if_cancelled (priv->cancellable, &error) == TRUE);
g_mutex_unlock (&(priv->write_mutex));
length_written = -1;
goto done;
}
g_mutex_unlock (&(priv->write_mutex));
/* Increment the number of bytes outstanding for the new write, and keep a record of the old number written so we know if the write's
* finished before we reach write_cond. */
old_total_network_bytes_written = priv->total_network_bytes_written;
priv->message_bytes_outstanding += count;
/* Handle the more common case of the network thread already having been created first */
if (priv->network_thread != NULL) {
/* Push the new data into the buffer */
gdata_buffer_push_data (priv->buffer, buffer, count);
goto write;
}
/* Write out the first chunk of data, so there's guaranteed to be something in the buffer */
gdata_buffer_push_data (priv->buffer, buffer, count);
/* Create the thread and let the writing commence! */
create_network_thread (GDATA_UPLOAD_STREAM (stream), &error);
if (priv->network_thread == NULL) {
length_written = -1;
goto done;
}
write:
g_mutex_lock (&(priv->write_mutex));
/* Wait for it to be written */
while (priv->total_network_bytes_written - old_total_network_bytes_written < count && cancelled == FALSE && priv->state != STATE_FINISHED) {
g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
}
length_written = MIN (count, priv->total_network_bytes_written - old_total_network_bytes_written);
/* Check for an error and return if necessary */
if (cancelled == TRUE && length_written < 1) {
/* Cancellation. */
g_assert (g_cancellable_set_error_if_cancelled (cancellable, &error) == TRUE ||
g_cancellable_set_error_if_cancelled (priv->cancellable, &error) == TRUE);
length_written = -1;
} else if (priv->state == STATE_FINISHED && (length_written < 0 || (gsize) length_written < count)) {
/* Resumable upload error. */
g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error received from server after uploading a resumable upload chunk."));
length_written = -1;
}
g_mutex_unlock (&(priv->write_mutex));
done:
/* Disconnect from the cancelled signals. Note that we have to do this with @write_mutex not held, as g_cancellable_disconnect() blocks
* until any outstanding cancellation callbacks return, and they will block on @write_mutex. */
if (cancelled_signal != 0)
g_cancellable_disconnect (cancellable, cancelled_signal);
if (global_cancelled_signal != 0)
g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);
g_assert (error != NULL || length_written > 0);
if (error != NULL) {
g_propagate_error (error_out, error);
}
return length_written;
}
/* Cancellation handler connected by gdata_upload_stream_flush() to both the per-call and the stream-wide #GCancellable.
 * It raises the caller's cancelled flag and wakes whoever is waiting on ->write_cond; both happen with ->write_mutex held. */
static void
flush_cancelled_cb (GCancellable *cancellable, CancelledData *data)
{
	GDataUploadStream *upload_stream = data->upload_stream;
	GMutex *mutex = &(upload_stream->priv->write_mutex);
	GCond *cond = &(upload_stream->priv->write_cond);

	g_mutex_lock (mutex);

	/* Tell gdata_upload_stream_flush() to stop blocking and report cancellation */
	*(data->cancelled) = TRUE;
	g_cond_signal (cond);

	g_mutex_unlock (mutex);
}
/* Block until ->network_bytes_outstanding reaches zero. Cancelling the cancellable passed to gdata_upload_stream_flush() breaks out of the wait(),
 * but doesn't stop the network thread from continuing to write the remaining bytes to the network.
 * The wrapper function, g_output_stream_flush(), calls g_output_stream_set_pending() before calling this function, and calls
 * g_output_stream_clear_pending() afterwards, so we don't need to worry about other operations happening concurrently. We also don't need to worry
 * about being called after the stream has been closed (though the network activity could finish before or during this function).
 *
 * Returns %TRUE once all outstanding bytes have been written. Returns %FALSE with @error set if the network thread couldn't be created, if either
 * @cancellable or the stream-wide ->cancellable was cancelled, or if the upload finished while bytes were still outstanding. */
static gboolean
gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GError **error)
{
	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
	gulong cancelled_signal = 0, global_cancelled_signal = 0;
	gboolean cancelled = FALSE; /* must only be touched with ->write_mutex held */
	gboolean success = TRUE;
	CancelledData data;

	/* Listen for cancellation events */
	data.upload_stream = GDATA_UPLOAD_STREAM (stream);
	data.cancelled = &cancelled;

	global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) flush_cancelled_cb, &data, NULL);

	if (cancellable != NULL)
		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) flush_cancelled_cb, &data, NULL);

	/* Create the thread if it hasn't been created already. This can happen if flush() is called immediately after creating the stream. */
	if (priv->network_thread == NULL) {
		create_network_thread (GDATA_UPLOAD_STREAM (stream), error);
		if (priv->network_thread == NULL) {
			success = FALSE;
			goto done;
		}
	}

	/* Start the flush operation proper */
	g_mutex_lock (&(priv->write_mutex));

	/* Wait for all outstanding bytes to be written to the network */
	while (priv->network_bytes_outstanding > 0 && cancelled == FALSE && priv->state != STATE_FINISHED) {
		g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
	}

	/* Check for an error and return if necessary */
	if (cancelled == TRUE) {
		/* The calls which actually set @error must not live inside g_assert(): with G_DISABLE_ASSERT they would be compiled out and
		 * we would return FALSE without an error set. Only the "one of them must have been cancelled" check is an assertion. */
		if (g_cancellable_set_error_if_cancelled (cancellable, error) == FALSE &&
		    g_cancellable_set_error_if_cancelled (priv->cancellable, error) == FALSE) {
			g_assert_not_reached ();
		}

		success = FALSE;
	} else if (priv->state == STATE_FINISHED && priv->network_bytes_outstanding > 0) {
		/* Resumable upload error. */
		g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error received from server after uploading a resumable upload chunk."));
		success = FALSE;
	}

	g_mutex_unlock (&(priv->write_mutex));

done:
	/* Disconnect from the cancelled signals. Note that we have to do this without @write_mutex held, as g_cancellable_disconnect() blocks
	 * until any outstanding cancellation callbacks return, and they will block on @write_mutex. */
	if (cancelled_signal != 0)
		g_cancellable_disconnect (cancellable, cancelled_signal);
	if (global_cancelled_signal != 0)
		g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);

	return success;
}
/* Cancellation handler connected by gdata_upload_stream_close() to both the per-call and the stream-wide #GCancellable.
 * It raises the caller's cancelled flag and wakes whoever is waiting on ->finished_cond; both happen with ->response_mutex held. */
static void
close_cancelled_cb (GCancellable *cancellable, CancelledData *data)
{
	GDataUploadStream *upload_stream = data->upload_stream;
	GMutex *mutex = &(upload_stream->priv->response_mutex);
	GCond *cond = &(upload_stream->priv->finished_cond);

	g_mutex_lock (mutex);

	/* Tell gdata_upload_stream_close() to stop blocking and report cancellation */
	*(data->cancelled) = TRUE;
	g_cond_signal (cond);

	g_mutex_unlock (mutex);
}
/* It's guaranteed that we have set ->response_status and ->response_error and are done with *all* network activity before this returns, unless it's
 * cancelled. This means that it's safe to call gdata_upload_stream_get_response() once a call to close() has returned without being cancelled.
 *
 * Even though calling g_output_stream_close() multiple times on this stream is guaranteed to call gdata_upload_stream_close() at most once, other
 * GIO methods (notably g_output_stream_splice()) can call gdata_upload_stream_close() directly. Consequently, we need to be careful to be idempotent
 * after the first call.
 *
 * If the network thread hasn't yet been started (i.e. gdata_upload_stream_write() hasn't been called at all yet), %TRUE will be returned immediately.
 *
 * If the global cancellable, ->cancellable, or @cancellable are cancelled before the call to gdata_upload_stream_close(), gdata_upload_stream_close()
 * should return immediately with %G_IO_ERROR_CANCELLED. If they're cancelled during the call, gdata_upload_stream_close() should stop waiting for
 * any outstanding data to be flushed to the network and return %G_IO_ERROR_CANCELLED (though the operation to finish off network activity and close
 * the stream will still continue).
 *
 * If the call to gdata_upload_stream_close() is not cancelled by any #GCancellable, it will wait until all the data has been flushed to the network
 * and a response has been received. At this point, ->response_status and ->response_error have been set (and won't ever change) and we can return
 * either success or an error code.
 *
 * Returns %TRUE on success (or if the upload was never started), and %FALSE with @error set if the stream was already closed, the close was
 * cancelled before the upload finished, or the server's final status code was not successful. */
static gboolean
gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error)
{
	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
	gboolean success = TRUE;
	gboolean cancelled = FALSE; /* must only be touched with ->response_mutex held */
	gulong cancelled_signal = 0, global_cancelled_signal = 0;
	CancelledData data;
	gboolean is_finished;
	GError *child_error = NULL;

	/* If the operation was never started, return successfully immediately */
	if (priv->network_thread == NULL)
		return TRUE;

	/* If we've already closed the stream, return G_IO_ERROR_CLOSED */
	g_mutex_lock (&(priv->response_mutex));

	if (priv->response_status != SOUP_STATUS_NONE) {
		g_mutex_unlock (&(priv->response_mutex));
		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, _("Stream is already closed"));
		return FALSE;
	}

	g_mutex_unlock (&(priv->response_mutex));

	/* Allow cancellation */
	data.upload_stream = GDATA_UPLOAD_STREAM (stream);
	data.cancelled = &cancelled;

	global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) close_cancelled_cb, &data, NULL);

	if (cancellable != NULL)
		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) close_cancelled_cb, &data, NULL);

	g_mutex_lock (&(priv->response_mutex));

	g_mutex_lock (&(priv->write_mutex));
	is_finished = (priv->state == STATE_FINISHED);
	g_mutex_unlock (&(priv->write_mutex));

	/* If an operation is still in progress, the upload thread hasn't finished yet… */
	if (!is_finished) {
		/* We've reached the end of the stream, so append the footer if the entire operation hasn't been cancelled. */
		if (priv->entry != NULL && g_cancellable_is_cancelled (priv->cancellable) == FALSE) {
			const gchar *footer = "\n--" BOUNDARY_STRING "--";
			gsize footer_length = strlen (footer);

			gdata_buffer_push_data (priv->buffer, (const guint8*) footer, footer_length);

			g_mutex_lock (&(priv->write_mutex));
			priv->message_bytes_outstanding += footer_length;
			g_mutex_unlock (&(priv->write_mutex));
		}

		/* Mark the buffer as having reached EOF, and the write operation will close in its own time */
		gdata_buffer_push_data (priv->buffer, NULL, 0);

		/* Wait for the signal that we've finished. Cancelling the call to gdata_upload_stream_close() will cause this wait to be aborted,
		 * but won't actually prevent the stream being closed (i.e. all it means is that the stream isn't guaranteed to have been closed by
		 * the time gdata_upload_stream_close() returns — whereas normally it would be).
		 *
		 * The wait is a predicate loop: g_cond_wait() may wake spuriously, and we must not touch ->message below until the network
		 * thread has really reached STATE_FINISHED. Re-checking the state before each wait also avoids blocking if the thread finished
		 * before we got here. ->state is protected by ->write_mutex, taken after ->response_mutex as above. */
		while (cancelled == FALSE) {
			g_mutex_lock (&(priv->write_mutex));
			is_finished = (priv->state == STATE_FINISHED);
			g_mutex_unlock (&(priv->write_mutex));

			if (is_finished)
				break;

			g_cond_wait (&(priv->finished_cond), &(priv->response_mutex));
		}
	}

	g_assert (priv->response_status == SOUP_STATUS_NONE);
	g_assert (priv->response_error == NULL);

	g_mutex_lock (&(priv->write_mutex));
	is_finished = (priv->state == STATE_FINISHED);
	g_mutex_unlock (&(priv->write_mutex));

	/* Error handling */
	if (!is_finished && cancelled == TRUE) {
		/* Cancelled? If ->state == STATE_FINISHED, the network activity finished before the gdata_upload_stream_close() operation was
		 * cancelled, so we don't need to return an error.
		 *
		 * The calls which set @child_error are kept outside g_assert() so they still run when assertions are compiled out. */
		if (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == FALSE &&
		    g_cancellable_set_error_if_cancelled (priv->cancellable, &child_error) == FALSE) {
			g_assert_not_reached ();
		}

		priv->response_status = SOUP_STATUS_CANCELLED;
		success = FALSE;
	} else if (SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code) == FALSE) {
		GDataServiceClass *klass = GDATA_SERVICE_GET_CLASS (priv->service);

		/* Parse the error */
		g_assert (klass->parse_error_response != NULL);
		klass->parse_error_response (priv->service, GDATA_OPERATION_UPLOAD, priv->message->status_code, priv->message->reason_phrase,
		                             priv->message->response_body->data, priv->message->response_body->length, &child_error);
		priv->response_status = priv->message->status_code;

		success = FALSE;
	} else {
		/* Success! Set the response status */
		priv->response_status = priv->message->status_code;
	}

	g_assert (priv->response_status != SOUP_STATUS_NONE && (SOUP_STATUS_IS_SUCCESSFUL (priv->response_status) || child_error != NULL));

	g_mutex_unlock (&(priv->response_mutex));

	/* Disconnect from the signal handler. Note that we have to do this with @response_mutex not held, as g_cancellable_disconnect() blocks
	 * until any outstanding cancellation callbacks return, and they will block on @response_mutex. */
	if (cancelled_signal != 0)
		g_cancellable_disconnect (cancellable, cancelled_signal);
	if (global_cancelled_signal != 0)
		g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);

	g_assert ((success == TRUE && child_error == NULL) || (success == FALSE && child_error != NULL));

	if (child_error != NULL)
		g_propagate_error (error, child_error);

	return success;
}
/* Runs in the network thread, right after the headers or a body chunk have been written, to hand the next chunk of the request body to libsoup.
 *
 * We deliberately do not return until the data has been pulled out of the buffer: because of
 * http://bugzilla.gnome.org/show_bug.cgi?id=522147 we can't use soup_session_(un)?pause_message() with a SoupSessionSync (messages can be
 * paused but never unpaused). As long as this handler doesn't return, the message is never paused and the bug is never hit.
 * The cost is growing memory use — eventually the whole request body ends up held in memory — but that's unavoidable at this point. */
static void
write_next_chunk (GDataUploadStream *self, SoupMessage *message)
{
	#define CHUNK_SIZE 8192 /* 8KiB */
	GDataUploadStreamPrivate *priv = self->priv;
	guint8 chunk[CHUNK_SIZE];
	gboolean eof = FALSE;
	gboolean bytes_in_flight, nothing_more_to_send;
	gsize popped;

	/* Take a consistent snapshot of the counters and state */
	g_mutex_lock (&(priv->write_mutex));
	bytes_in_flight = (priv->network_bytes_outstanding > 0);
	nothing_more_to_send = (priv->state == STATE_INITIAL_REQUEST ||
	                        (priv->content_length != -1 &&
	                         priv->network_bytes_written + priv->network_bytes_outstanding == priv->chunk_size));
	g_mutex_unlock (&(priv->write_mutex));

	/* libsoup still has bytes of ours to send: don't block fetching more. */
	if (bytes_in_flight) {
		return;
	}

	/* Either this is the initial request of a resumable upload (all of its XML was pushed into the buffer when we started), or the
	 * current resumable chunk is already full. In both cases no new data is pushed onto the network; just finish the body. */
	if (nothing_more_to_send) {
		soup_message_body_complete (priv->message->request_body);
		return;
	}

	/* Pull the next chunk out of the buffer. This may return fewer than CHUNK_SIZE bytes: blocking for a full CHUNK_SIZE could deadlock
	 * at the end of the stream, since write() may be called with fewer bytes and has no way of telling us that no more are coming.
	 *
	 * This must not be done with write_mutex held, or a concurrent flush could deadlock with us (in the case that we don't know the
	 * content length ahead of time). */
	if (priv->content_length != -1) {
		/* Resumable upload. Ensure we don't exceed the chunk size. */
		popped = gdata_buffer_pop_data_limited (priv->buffer, chunk,
		                                        MIN (CHUNK_SIZE, priv->chunk_size - (priv->network_bytes_written +
		                                                                             priv->network_bytes_outstanding)), &eof);
	} else {
		/* Non-resumable upload. */
		popped = gdata_buffer_pop_data_limited (priv->buffer, chunk, CHUNK_SIZE, &eof);
	}

	g_mutex_lock (&(priv->write_mutex));

	/* The bytes move from "waiting in the buffer" to "handed to libsoup" */
	priv->message_bytes_outstanding -= popped;
	priv->network_bytes_outstanding += popped;

	/* Append whatever data was returned */
	if (popped > 0) {
		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_COPY, chunk, popped);
	}

	/* Finish off the request body if we've reached EOF (i.e. the stream has been closed), or if we're doing a resumable upload and we
	 * reach the maximum chunk size. */
	if (eof == TRUE ||
	    (priv->content_length != -1 && priv->network_bytes_written + priv->network_bytes_outstanding == priv->chunk_size)) {
		g_assert (eof == FALSE || priv->message_bytes_outstanding == 0);

		soup_message_body_complete (priv->message->request_body);
	}

	g_mutex_unlock (&(priv->write_mutex));
}
/* Handler for @message's "wrote-headers" signal (connected in upload_thread()). Wakes any waiter on ->write_cond, then queues the first body chunk. */
static void
wrote_headers_cb (SoupMessage *message, GDataUploadStream *self)
{
	GMutex *mutex = &(self->priv->write_mutex);

	/* Let the main thread know that the headers have gone out */
	g_mutex_lock (mutex);
	g_cond_signal (&(self->priv->write_cond));
	g_mutex_unlock (mutex);

	/* Hand the first chunk to libsoup */
	write_next_chunk (self, message);
}
/* Handler for @message's "wrote-body-data" signal (connected in upload_thread()). Moves @buffer's length from the outstanding counter to the
 * written counters, wakes any waiter on ->write_cond, then queues the next body chunk. */
static void
wrote_body_data_cb (SoupMessage *message, SoupBuffer *buffer, GDataUploadStream *self)
{
	GDataUploadStreamPrivate *priv = self->priv;

	g_mutex_lock (&(priv->write_mutex));

	g_assert (priv->network_bytes_outstanding > 0);

	/* Account for the bytes which have just been written */
	priv->network_bytes_outstanding -= buffer->length;
	priv->network_bytes_written += buffer->length;

	/* The running total is only advanced while in STATE_DATA_REQUESTS */
	if (priv->state == STATE_DATA_REQUESTS) {
		priv->total_network_bytes_written += buffer->length;
	}

	/* Let the main thread know that the chunk has gone out */
	g_cond_signal (&(priv->write_cond));

	g_mutex_unlock (&(priv->write_mutex));

	/* Hand the next chunk to libsoup */
	write_next_chunk (self, message);
}
/* Body of the network thread started by create_network_thread(). Sends ->message, and for resumable uploads keeps building and sending follow-up
 * PUT messages (one per chunk) until the server reports completion or an error. On exit it sets ->state to STATE_FINISHED, wakes waiters on
 * ->write_cond and ->finished_cond, and drops the reference on @self which create_network_thread() took on its behalf. Always returns %NULL. */
static gpointer
upload_thread (GDataUploadStream *self)
{
	GDataUploadStreamPrivate *priv = self->priv;

	g_assert (priv->cancellable != NULL);

	while (TRUE) {
		GDataServiceClass *klass;
		gulong wrote_headers_signal, wrote_body_data_signal;
		gchar *new_uri;
		SoupMessage *new_message;
		gsize next_chunk_length;

		/* Connect to the wrote-* signals so we can prepare the next chunk for transmission */
		wrote_headers_signal = g_signal_connect (priv->message, "wrote-headers", (GCallback) wrote_headers_cb, self);
		wrote_body_data_signal = g_signal_connect (priv->message, "wrote-body-data", (GCallback) wrote_body_data_cb, self);

		_gdata_service_actually_send_message (priv->session, priv->message, priv->cancellable, NULL);

		g_mutex_lock (&(priv->write_mutex));

		/* If this is a resumable upload, continue to the next chunk. If it's a non-resumable upload, we're done. We have several cases:
		 *  • Non-resumable upload:
		 *     - Content only: STATE_DATA_REQUESTS → STATE_FINISHED
		 *     - Metadata only: not supported
		 *     - Content and metadata: STATE_DATA_REQUESTS → STATE_FINISHED
		 *  • Resumable upload:
		 *     - Content only:
		 *        * STATE_INITIAL_REQUEST → STATE_DATA_REQUESTS
		 *        * STATE_DATA_REQUESTS → STATE_DATA_REQUESTS
		 *        * STATE_DATA_REQUESTS → STATE_FINISHED
		 *     - Metadata only: STATE_INITIAL_REQUEST → STATE_FINISHED
		 *     - Content and metadata:
		 *        * STATE_INITIAL_REQUEST → STATE_DATA_REQUESTS
		 *        * STATE_DATA_REQUESTS → STATE_DATA_REQUESTS
		 *        * STATE_DATA_REQUESTS → STATE_FINISHED
		 */
		switch (priv->state) {
			case STATE_INITIAL_REQUEST:
				/* We're either a content-only or a content-and-metadata resumable upload. */
				priv->state = STATE_DATA_REQUESTS;

				/* Check the response. On success it should be empty, status 200, with a Location header telling us where to upload
				 * next. If it's an error response, bail out and let the code in gdata_upload_stream_close() parse the error..*/
				if (!SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code)) {
					goto finished;
				} else if (priv->content_length == 0 && priv->message->status_code == SOUP_STATUS_CREATED) {
					/* If this was a metadata-only upload, we're done. */
					goto finished;
				}

				/* Fall out and prepare the next message */
				g_assert (priv->total_network_bytes_written == 0); /* haven't written any data yet */

				break;
			case STATE_DATA_REQUESTS:
				/* Check the response. On completion it should contain the resulting entry's XML, status 201. On continuation it should
				 * be empty, status 308, with a Range header and potentially a Location header telling us what/where to upload next.
				 * If it's an error response, bail out and let the code in gdata_upload_stream_close() parse the error..*/
				if (priv->message->status_code == 308) {
					/* Continuation: fall out and prepare the next message */
					g_assert (priv->content_length == -1 || priv->total_network_bytes_written < (gsize) priv->content_length);
				} else if (SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code)) {
					/* Completion. Check the server isn't misbehaving. */
					g_assert (priv->content_length == -1 || priv->total_network_bytes_written == (gsize) priv->content_length);

					goto finished;
				} else {
					/* Error */
					goto finished;
				}

				/* Fall out and prepare the next message */
				g_assert (priv->total_network_bytes_written > 0);

				break;
			case STATE_FINISHED:
			default:
				g_assert_not_reached ();
		}

		/* Prepare the next message. */
		g_assert (priv->content_length != -1);

		next_chunk_length = MIN (priv->content_length - priv->total_network_bytes_written, MAX_RESUMABLE_CHUNK_SIZE);

		/* Upload to the URI in the Location header if the server gave one; otherwise reuse the current message's URI */
		new_uri = g_strdup (soup_message_headers_get_one (priv->message->response_headers, "Location"));
		if (new_uri == NULL) {
			new_uri = soup_uri_to_string (soup_message_get_uri (priv->message), FALSE);
		}

		new_message = build_message (self, SOUP_METHOD_PUT, new_uri);

		g_free (new_uri);

		soup_message_headers_set_encoding (new_message->request_headers, SOUP_ENCODING_CONTENT_LENGTH);
		soup_message_headers_set_content_type (new_message->request_headers, priv->content_type, NULL);
		soup_message_headers_set_content_length (new_message->request_headers, next_chunk_length);
		soup_message_headers_set_content_range (new_message->request_headers, priv->total_network_bytes_written,
		                                        priv->total_network_bytes_written + next_chunk_length - 1, priv->content_length);

		/* Make sure the headers are set. HACK: This should actually be in build_message(), but we have to work around
		 * http://code.google.com/a/google.com/p/apps-api-issues/issues/detail?id=3033 in GDataDocumentsService's append_query_headers(). */
		klass = GDATA_SERVICE_GET_CLASS (priv->service);
		if (klass->append_query_headers != NULL) {
			klass->append_query_headers (priv->service, priv->authorization_domain, new_message);
		}

		g_signal_handler_disconnect (priv->message, wrote_body_data_signal);
		g_signal_handler_disconnect (priv->message, wrote_headers_signal);

		g_object_unref (priv->message);
		priv->message = new_message;

		/* Reset various counters for the next upload. Note that message_bytes_outstanding may be > 0 at this point, since the client may
		 * have pushed some content into the buffer while we were waiting for the response to this request. */
		g_assert (priv->network_bytes_outstanding == 0);
		priv->chunk_size = next_chunk_length;
		priv->network_bytes_written = 0;

		/* Loop round and upload this chunk now. */
		g_mutex_unlock (&(priv->write_mutex));

		continue;

finished:
		g_mutex_unlock (&(priv->write_mutex));

		goto finished_outer;
	}

finished_outer:
	/* Signal that the operation has finished (either successfully or in error).
	 * Also signal write_cond, just in case we errored out and finished sending in the middle of a write. */
	g_mutex_lock (&(priv->write_mutex));
	priv->state = STATE_FINISHED;
	g_cond_signal (&(priv->write_cond));
	g_mutex_unlock (&(priv->write_mutex));

	/* Signal finished_cond with response_mutex held. gdata_upload_stream_close() checks ->state and then waits on finished_cond while
	 * holding response_mutex; signalling without the mutex could land between its check and its g_cond_wait(), losing the wake-up and
	 * leaving close() blocked forever. write_mutex has already been released, so no lock ordering issue arises here. */
	g_mutex_lock (&(priv->response_mutex));
	g_cond_signal (&(priv->finished_cond));
	g_mutex_unlock (&(priv->response_mutex));

	/* Referenced in create_network_thread(). */
	g_object_unref (self);

	return NULL;
}
/* Start the network thread running upload_thread() for @self. The thread must not already exist.
 *
 * On success ->network_thread is set, and the thread owns a reference on @self which it drops when it exits. On failure ->network_thread is left
 * %NULL and @error is set by g_thread_try_new(); callers detect failure by checking ->network_thread. */
static void
create_network_thread (GDataUploadStream *self, GError **error)
{
	GDataUploadStreamPrivate *priv = self->priv;

	g_assert (priv->network_thread == NULL);

	g_object_ref (self); /* ownership transferred to thread */
	priv->network_thread = g_thread_try_new ("upload-thread", (GThreadFunc) upload_thread, self, error);

	if (priv->network_thread == NULL) {
		/* The thread never started, so upload_thread() will never release the reference taken above; drop it here to avoid
		 * leaking the stream. */
		g_object_unref (self);
	}
}
/**
 * gdata_upload_stream_new:
 * @service: a #GDataService
 * @domain: (allow-none): the #GDataAuthorizationDomain to authorize the upload, or %NULL
 * @method: the HTTP method to use
 * @upload_uri: the URI to upload, which must be HTTPS
 * @entry: (allow-none): the entry to upload as metadata, or %NULL
 * @slug: the file's slug (filename)
 * @content_type: the content type of the file being uploaded
 * @cancellable: (allow-none): a #GCancellable for the entire upload stream, or %NULL
 *
 * Creates a new #GDataUploadStream, through which a file can be uploaded to a GData service using the standard #GOutputStream API.
 *
 * @method gives the HTTP method for the request; it is typically %SOUP_METHOD_POST for insertions or %SOUP_METHOD_PUT for updates, depending
 * on the server and on @upload_uri.
 *
 * When @entry is given, it is attached to the upload as the entry which the uploaded file belongs to. When it is %NULL, only the data written
 * to the stream is uploaded, and the server assigns a default entry.
 *
 * @slug and @content_type describe the file being streamed, so both have to be provided before the upload begins. @slug is the filename for
 * the file; it will typically be stored on the server and offered again when the file is downloaded. @content_type must be the file's correct
 * content type, and should be one of the content types the service accepts.
 *
 * In addition to the standard GIO errors, #GOutputStream calls on a #GDataUploadStream may return any relevant error from #GDataServiceError,
 * or %GDATA_SERVICE_ERROR_PROTOCOL_ERROR in the general case.
 *
 * If @cancellable is non-%NULL, the whole upload may be cancelled at any time from another thread with g_cancellable_cancel(). Ongoing network
 * activity is then stopped, and pending or future #GOutputStream calls on the #GDataUploadStream return %G_IO_ERROR_CANCELLED. The
 * #GCancellable objects passed to individual #GOutputStream operations do not cancel the upload itself — they only cancel that one API call.
 * @cancellable is the only way to cancel the upload operation completely.
 *
 * Note that network communication won't begin until the first call to g_output_stream_write() on the #GDataUploadStream.
 *
 * Return value: a new #GOutputStream, or %NULL; unref with g_object_unref()
 *
 * Since: 0.9.0
 */
GOutputStream *
gdata_upload_stream_new (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri, GDataEntry *entry,
                         const gchar *slug, const gchar *content_type, GCancellable *cancellable)
{
	GObject *upload_stream;

	/* Validate the arguments */
	g_return_val_if_fail (GDATA_IS_SERVICE (service), NULL);
	g_return_val_if_fail (domain == NULL || GDATA_IS_AUTHORIZATION_DOMAIN (domain), NULL);
	g_return_val_if_fail (method != NULL, NULL);
	g_return_val_if_fail (upload_uri != NULL, NULL);
	g_return_val_if_fail (entry == NULL || GDATA_IS_ENTRY (entry), NULL);
	g_return_val_if_fail (slug != NULL, NULL);
	g_return_val_if_fail (content_type != NULL, NULL);
	g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL);

	/* Build the stream object from its properties */
	upload_stream = g_object_new (GDATA_TYPE_UPLOAD_STREAM,
	                              "method", method,
	                              "upload-uri", upload_uri,
	                              "service", service,
	                              "authorization-domain", domain,
	                              "entry", entry,
	                              "slug", slug,
	                              "content-type", content_type,
	                              "cancellable", cancellable,
	                              NULL);

	return G_OUTPUT_STREAM (upload_stream);
}
/**
 * gdata_upload_stream_new_resumable:
 * @service: a #GDataService
 * @domain: (allow-none): the #GDataAuthorizationDomain to authorize the upload, or %NULL
 * @method: the HTTP method to use
 * @upload_uri: the URI to upload
 * @entry: (allow-none): the entry to upload as metadata, or %NULL
 * @slug: the file's slug (filename)
 * @content_type: the content type of the file being uploaded
 * @content_length: the size (in bytes) of the file being uploaded
 * @cancellable: (allow-none): a #GCancellable for the entire upload stream, or %NULL
 *
 * Creates a new resumable #GDataUploadStream, through which a file can be uploaded to a GData service using the standard #GOutputStream API.
 * The upload goes through GData's resumable upload API, so it should be more reliable than a normal upload, especially for large files. See the
 * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html">GData documentation on resumable uploads</ulink> for more
 * information.
 *
 * @method gives the HTTP method for the request; it is typically %SOUP_METHOD_POST for insertions or %SOUP_METHOD_PUT for updates, depending
 * on the server and on @upload_uri.
 *
 * When @entry is given, it is attached to the upload as the entry which the uploaded file belongs to. When it is %NULL, only the data written
 * to the stream is uploaded, and the server assigns a default entry.
 *
 * @slug, @content_type and @content_length describe the file being streamed, so all three have to be provided before the upload begins. @slug
 * is the filename for the file; it will typically be stored on the server and offered again when the file is downloaded. @content_type must be
 * the file's correct content type, and should be one of the content types the service accepts. @content_length must be the size in bytes of the
 * file being uploaded, not counting the XML of any associated #GDataEntry. Zero is accepted if a metadata-only upload is being performed.
 *
 * In addition to the standard GIO errors, #GOutputStream calls on a #GDataUploadStream may return any relevant error from #GDataServiceError,
 * or %GDATA_SERVICE_ERROR_PROTOCOL_ERROR in the general case.
 *
 * If @cancellable is non-%NULL, the whole upload may be cancelled at any time from another thread with g_cancellable_cancel(). Ongoing network
 * activity is then stopped, and pending or future #GOutputStream calls on the #GDataUploadStream return %G_IO_ERROR_CANCELLED. The
 * #GCancellable objects passed to individual #GOutputStream operations do not cancel the upload itself — they only cancel that one API call.
 * @cancellable is the only way to cancel the upload operation completely.
 *
 * Note that network communication won't begin until the first call to g_output_stream_write() on the #GDataUploadStream.
 *
 * Return value: a new #GOutputStream, or %NULL; unref with g_object_unref()
 *
 * Since: 0.13.0
 */
GOutputStream *
gdata_upload_stream_new_resumable (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
                                   GDataEntry *entry, const gchar *slug, const gchar *content_type, goffset content_length, GCancellable *cancellable)
{
	GObject *upload_stream;

	/* Validate the arguments */
	g_return_val_if_fail (GDATA_IS_SERVICE (service), NULL);
	g_return_val_if_fail (domain == NULL || GDATA_IS_AUTHORIZATION_DOMAIN (domain), NULL);
	g_return_val_if_fail (method != NULL, NULL);
	g_return_val_if_fail (upload_uri != NULL, NULL);
	g_return_val_if_fail (entry == NULL || GDATA_IS_ENTRY (entry), NULL);
	g_return_val_if_fail (slug != NULL, NULL);
	g_return_val_if_fail (content_type != NULL, NULL);
	g_return_val_if_fail (content_length >= 0, NULL);
	g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL);

	/* Build the stream object from its properties */
	upload_stream = g_object_new (GDATA_TYPE_UPLOAD_STREAM,
	                              "method", method,
	                              "upload-uri", upload_uri,
	                              "service", service,
	                              "authorization-domain", domain,
	                              "entry", entry,
	                              "slug", slug,
	                              "content-type", content_type,
	                              "content-length", content_length,
	                              "cancellable", cancellable,
	                              NULL);

	return G_OUTPUT_STREAM (upload_stream);
}
/**
 * gdata_upload_stream_get_response:
 * @self: a #GDataUploadStream
 * @length: (allow-none) (out caller-allocates): return location for the length of the response, or %NULL
 *
 * Returns the server's response to the upload operation performed by the #GDataUploadStream. While the operation is still underway, or the
 * server's response hasn't been received yet, %NULL is returned and @length is set to <code class="literal">-1</code>.
 *
 * If the upload operation has completed but resulted in an error, %NULL is returned and @length is set to <code class="literal">0</code>.
 *
 * This function is safe to call from any thread at any time during the network operation. However, the only way to be certain that the response
 * has been set beforehand is to close the #GDataUploadStream with g_output_stream_close() without cancelling the close operation; once the stream
 * has been closed, all network communication is guaranteed to have finished. Note that if a call to g_output_stream_close() is cancelled,
 * g_output_stream_is_closed() will immediately start to return %TRUE, even if the #GDataUploadStream is still attempting to flush the network
 * buffers asynchronously — consequently, gdata_upload_stream_get_response() may still return %NULL and a @length of
 * <code class="literal">-1</code>. In that situation the only reliable way to tell whether the stream has been fully closed is to check the
 * results of gdata_upload_stream_get_response(), rather than g_output_stream_is_closed().
 *
 * Return value: the server's response to the upload, or %NULL
 *
 * Since: 0.5.0
 */
const gchar *
gdata_upload_stream_get_response (GDataUploadStream *self, gssize *length)
{
	GDataUploadStreamPrivate *priv;
	/* Defaults describe the "no response yet" case */
	const gchar *response = NULL;
	gssize response_length = -1;

	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	priv = self->priv;

	g_mutex_lock (&(priv->response_mutex));

	/* We can't touch the message until the network thread has finished using it, since it isn't threadsafe; so only look at it once
	 * a response status has been recorded. */
	if (priv->response_status != SOUP_STATUS_NONE) {
		if (SOUP_STATUS_IS_SUCCESSFUL (priv->response_status)) {
			/* The response has been received, and was successful */
			response_length = priv->message->response_body->length;
			response = priv->message->response_body->data;
		} else {
			/* The response has been received, and was unsuccessful */
			response_length = 0;
		}
	}

	g_mutex_unlock (&(priv->response_mutex));

	if (length != NULL) {
		*length = response_length;
	}

	return response;
}
/**
 * gdata_upload_stream_get_service:
 * @self: a #GDataUploadStream
 *
 * Retrieves the service which authorizes the upload, i.e. the one given to gdata_upload_stream_new().
 *
 * Return value: (transfer none): the #GDataService used to authorize the upload
 *
 * Since: 0.5.0
 */
GDataService *
gdata_upload_stream_get_service (GDataUploadStream *self)
{
	GDataUploadStreamPrivate *priv;

	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	priv = self->priv;

	return priv->service;
}
/**
 * gdata_upload_stream_get_authorization_domain:
 * @self: a #GDataUploadStream
 *
 * Retrieves the authorization domain which authorizes the upload, i.e. the one given to gdata_upload_stream_new(). This is %NULL when the
 * upload doesn't need authorization.
 *
 * Return value: (transfer none) (allow-none): the #GDataAuthorizationDomain used to authorize the upload, or %NULL
 *
 * Since: 0.9.0
 */
GDataAuthorizationDomain *
gdata_upload_stream_get_authorization_domain (GDataUploadStream *self)
{
	GDataUploadStreamPrivate *priv;

	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	priv = self->priv;

	return priv->authorization_domain;
}
/**
 * gdata_upload_stream_get_method:
 * @self: a #GDataUploadStream
 *
 * Retrieves the HTTP request method with which the file is uploaded, i.e. the one given to gdata_upload_stream_new().
 *
 * Return value: the HTTP request method in use
 *
 * Since: 0.7.0
 */
const gchar *
gdata_upload_stream_get_method (GDataUploadStream *self)
{
	GDataUploadStreamPrivate *priv;

	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	priv = self->priv;

	return priv->method;
}
/**
 * gdata_upload_stream_get_upload_uri:
 * @self: a #GDataUploadStream
 *
 * Retrieves the URI which the file is being uploaded to, i.e. the URI which was passed to gdata_upload_stream_new().
 *
 * Return value: the URI which the file is being uploaded to
 *
 * Since: 0.5.0
 */
const gchar *
gdata_upload_stream_get_upload_uri (GDataUploadStream *self)
{
	const gchar *upload_uri;

	/* Reject anything which isn't an upload stream before touching its private data */
	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	/* The string stays owned by the stream */
	upload_uri = self->priv->upload_uri;

	return upload_uri;
}
/**
 * gdata_upload_stream_get_entry:
 * @self: a #GDataUploadStream
 *
 * Gets the entry being used to upload metadata, if one was passed to gdata_upload_stream_new(). If no entry was passed, %NULL is returned.
 *
 * Return value: (transfer none) (allow-none): the entry used for metadata, or %NULL
 *
 * Since: 0.5.0
 */
GDataEntry *
gdata_upload_stream_get_entry (GDataUploadStream *self)
{
	GDataEntry *entry;

	/* Reject anything which isn't an upload stream before touching its private data */
	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	/* May be NULL when the upload carries no metadata; the stream keeps ownership either way */
	entry = self->priv->entry;

	return entry;
}
/**
 * gdata_upload_stream_get_slug:
 * @self: a #GDataUploadStream
 *
 * Retrieves the slug (filename) of the file which is being uploaded.
 *
 * Return value: the slug of the file being uploaded
 *
 * Since: 0.5.0
 */
const gchar *
gdata_upload_stream_get_slug (GDataUploadStream *self)
{
	const gchar *slug;

	/* Reject anything which isn't an upload stream before touching its private data */
	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	/* The string stays owned by the stream */
	slug = self->priv->slug;

	return slug;
}
/**
 * gdata_upload_stream_get_content_type:
 * @self: a #GDataUploadStream
 *
 * Retrieves the content type of the file which is being uploaded.
 *
 * Return value: the content type of the file being uploaded
 *
 * Since: 0.5.0
 */
const gchar *
gdata_upload_stream_get_content_type (GDataUploadStream *self)
{
	const gchar *content_type;

	/* Reject anything which isn't an upload stream before touching its private data */
	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	/* The string stays owned by the stream */
	content_type = self->priv->content_type;

	return content_type;
}
/**
 * gdata_upload_stream_get_content_length:
 * @self: a #GDataUploadStream
 *
 * Retrieves the size (in bytes) of the file which is being uploaded. For a non-resumable upload this is <code class="literal">-1</code>; for a
 * resumable upload it is zero or greater.
 *
 * Return value: the size of the file being uploaded
 *
 * Since: 0.13.0
 */
goffset
gdata_upload_stream_get_content_length (GDataUploadStream *self)
{
	goffset content_length;

	/* Reject anything which isn't an upload stream; -1 is returned in that case */
	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), -1);

	content_length = self->priv->content_length;

	return content_length;
}
/**
 * gdata_upload_stream_get_cancellable:
 * @self: a #GDataUploadStream
 *
 * Retrieves the #GCancellable which covers the entire upload operation, #GDataUploadStream:cancellable.
 *
 * Return value: (transfer none): the #GCancellable for the entire upload operation
 *
 * Since: 0.8.0
 */
GCancellable *
gdata_upload_stream_get_cancellable (GDataUploadStream *self)
{
	GCancellable *cancellable;

	/* Reject anything which isn't an upload stream before touching its private data */
	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);

	/* A NULL cancellable at this point is treated as an internal error rather than a valid return value */
	g_assert (self->priv->cancellable != NULL);

	/* The stream keeps ownership; the caller only borrows the pointer */
	cancellable = self->priv->cancellable;

	return cancellable;
}