Blob Blame History Raw
/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * soup-multipart-input-stream.c
 *
 * Copyright (C) 2012 Collabora Ltd.
 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include <string.h>

#include "soup-body-input-stream.h"
#include "soup-filter-input-stream.h"
#include "soup-enum-types.h"
#include "soup-message.h"
#include "soup-message-private.h"
#include "soup-multipart-input-stream.h"

#define RESPONSE_BLOCK_SIZE 8192

/**
 * SECTION:soup-multipart-input-stream
 * @short_description: Multipart input handling stream
 *
 * This adds support for the multipart responses. For handling the
 * multiple parts the user needs to wrap the #GInputStream obtained by
 * sending the request with a #SoupMultipartInputStream and use
 * soup_multipart_input_stream_next_part() before reading. Responses
 * which are not wrapped will be treated like non-multipart responses.
 *
 * Note that although #SoupMultipartInputStream is a #GInputStream,
 * you should not read directly from it, and the results are undefined
 * if you do.
 *
 * Since: 2.40
 **/

enum {
	PROP_0,

	PROP_MESSAGE,
};

struct _SoupMultipartInputStreamPrivate {
	SoupMessage	        *msg;

	gboolean	         done_with_part;

	GByteArray	        *meta_buf;
	SoupMessageHeaders      *current_headers;

	SoupFilterInputStream   *base_stream;

	char		        *boundary;
	gsize		         boundary_size;

	goffset		        remaining_bytes;
};

static void soup_multipart_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);

G_DEFINE_TYPE_WITH_CODE (SoupMultipartInputStream, soup_multipart_input_stream, G_TYPE_FILTER_INPUT_STREAM,
                         G_ADD_PRIVATE (SoupMultipartInputStream)
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
						soup_multipart_input_stream_pollable_init))

static void
soup_multipart_input_stream_dispose (GObject *object)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);

	g_clear_object (&multipart->priv->msg);
	g_clear_object (&multipart->priv->base_stream);

	G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->dispose (object);
}

static void
soup_multipart_input_stream_finalize (GObject *object)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);

	g_free (multipart->priv->boundary);

	if (multipart->priv->meta_buf)
		g_clear_pointer (&multipart->priv->meta_buf, g_byte_array_unref);

	G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->finalize (object);
}

static void
soup_multipart_input_stream_set_property (GObject *object, guint prop_id,
					  const GValue *value, GParamSpec *pspec)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);

	switch (prop_id) {
	case PROP_MESSAGE:
		multipart->priv->msg = g_value_dup_object (value);
		break;
	default:
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
		break;
	}
}

static void
soup_multipart_input_stream_get_property (GObject *object, guint prop_id,
					  GValue *value, GParamSpec *pspec)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);

	switch (prop_id) {
	case PROP_MESSAGE:
		g_value_set_object (value, multipart->priv->msg);
		break;
	default:
		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
		break;
	}
}

static gssize
soup_multipart_input_stream_read_real (GInputStream	*stream,
				       void		*buffer,
				       gsize		 count,
				       gboolean          blocking,
				       GCancellable	*cancellable,
				       GError          **error)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
	SoupMultipartInputStreamPrivate *priv = multipart->priv;
	gboolean got_boundary = FALSE;
	gssize nread = 0;
	guint8 *buf;

	g_return_val_if_fail (priv->boundary != NULL, -1);

	/* If we have received a Content-Length, and are not yet close to the end of
	 * the part, let's not look for the boundary for now. This optimization is
	 * necessary for keeping CPU usage civil.
	 */
	if (priv->remaining_bytes > priv->boundary_size) {
		goffset bytes_to_read = MIN (count, priv->remaining_bytes - priv->boundary_size);

		nread = g_pollable_stream_read (G_INPUT_STREAM (priv->base_stream),
						buffer, bytes_to_read, blocking,
						cancellable, error);

		if (nread > 0)
			priv->remaining_bytes -= nread;

		return nread;
	}

	if (priv->done_with_part)
		return 0;

	nread = soup_filter_input_stream_read_until (priv->base_stream, buffer, count,
						     priv->boundary, priv->boundary_size,
						     blocking, FALSE, &got_boundary,
						     cancellable, error);

	if (nread <= 0)
		return nread;

	if (!got_boundary)
		return nread;

	priv->done_with_part = TRUE;

	/* Ignore the newline that preceded the boundary. */
	if (nread == 1) {
		buf = ((guint8*)buffer);
		if (!memcmp (buf, "\n", 1))
			nread -= 1;
	} else {
		buf = ((guint8*)buffer) + nread - 2;
		if (!memcmp (buf, "\r\n", 2))
			nread -= 2;
		else if (!memcmp (buf, "\n", 1))
			nread -= 1;
	}

	return nread;
}

static gssize
soup_multipart_input_stream_read (GInputStream	*stream,
				  void		*buffer,
				  gsize		 count,
				  GCancellable	*cancellable,
				  GError       **error)
{
	return soup_multipart_input_stream_read_real (stream, buffer, count,
						      TRUE, cancellable, error);
}

static void
soup_multipart_input_stream_init (SoupMultipartInputStream *multipart)
{
	SoupMultipartInputStreamPrivate *priv;
	priv = multipart->priv = soup_multipart_input_stream_get_instance_private (multipart);

	priv->meta_buf = g_byte_array_sized_new (RESPONSE_BLOCK_SIZE);
	priv->done_with_part = FALSE;
}

static void
soup_multipart_input_stream_constructed (GObject *object)
{
	SoupMultipartInputStream *multipart;
	SoupMultipartInputStreamPrivate *priv;
	GInputStream *base_stream;
	const char* boundary;
	GHashTable *params = NULL;

	multipart = SOUP_MULTIPART_INPUT_STREAM (object);
	priv = multipart->priv;

	base_stream = G_FILTER_INPUT_STREAM (multipart)->base_stream;
	priv->base_stream = SOUP_FILTER_INPUT_STREAM (soup_filter_input_stream_new (base_stream));

	soup_message_headers_get_content_type (priv->msg->response_headers,
					       &params);

	boundary = g_hash_table_lookup (params, "boundary");
	if (boundary) {
		if (g_str_has_prefix (boundary, "--"))
			priv->boundary = g_strdup (boundary);
		else
			priv->boundary = g_strdup_printf ("--%s", boundary);

		priv->boundary_size = strlen (priv->boundary);
	} else {
		g_warning ("No boundary found in message tagged as multipart.");
	}

	g_hash_table_destroy (params);

	if (G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->constructed)
		G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->constructed (object);
}

static gboolean
soup_multipart_input_stream_is_readable (GPollableInputStream *stream)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
	SoupMultipartInputStreamPrivate *priv = multipart->priv;

	return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (priv->base_stream));
}

static gssize
soup_multipart_input_stream_read_nonblocking (GPollableInputStream  *stream,
					      void                  *buffer,
					      gsize                  count,
					      GError               **error)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);

	return soup_multipart_input_stream_read_real (G_INPUT_STREAM (multipart),
						      buffer, count,
						      FALSE, NULL, error);
}

static GSource *
soup_multipart_input_stream_create_source (GPollableInputStream *stream,
					   GCancellable         *cancellable)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
	SoupMultipartInputStreamPrivate *priv = multipart->priv;
	GSource *base_source, *pollable_source;

	base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (priv->base_stream), cancellable);

	pollable_source = g_pollable_source_new_full (stream, base_source, cancellable);
	g_source_unref (base_source);

	return pollable_source;
}

static void
soup_multipart_input_stream_class_init (SoupMultipartInputStreamClass *multipart_class)
{
	GObjectClass *object_class = G_OBJECT_CLASS (multipart_class);
	GInputStreamClass *input_stream_class =
		G_INPUT_STREAM_CLASS (multipart_class);

	object_class->dispose = soup_multipart_input_stream_dispose;
	object_class->finalize = soup_multipart_input_stream_finalize;
	object_class->constructed = soup_multipart_input_stream_constructed;
	object_class->set_property = soup_multipart_input_stream_set_property;
	object_class->get_property = soup_multipart_input_stream_get_property;

	input_stream_class->read_fn = soup_multipart_input_stream_read;

	g_object_class_install_property (
		object_class, PROP_MESSAGE,
		g_param_spec_object ("message",
				     "Message",
				     "The SoupMessage",
				     SOUP_TYPE_MESSAGE,
				     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));

}

static void
soup_multipart_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
					   gpointer                       interface_data)
{
	pollable_interface->is_readable = soup_multipart_input_stream_is_readable;
	pollable_interface->read_nonblocking = soup_multipart_input_stream_read_nonblocking;
	pollable_interface->create_source = soup_multipart_input_stream_create_source;
}

static void
soup_multipart_input_stream_parse_headers (SoupMultipartInputStream *multipart)
{
	SoupMultipartInputStreamPrivate *priv = multipart->priv;
	gboolean success;

	priv->current_headers = soup_message_headers_new (SOUP_MESSAGE_HEADERS_MULTIPART);

	/* The part lacks headers, but is there. */
	if (!priv->meta_buf->len)
		return;

	success = soup_headers_parse ((const char*) priv->meta_buf->data,
				      (int) priv->meta_buf->len,
				      priv->current_headers);

	if (success)
		priv->remaining_bytes = soup_message_headers_get_content_length (priv->current_headers);
	else
		g_clear_pointer (&priv->current_headers, soup_message_headers_free);

	g_byte_array_remove_range (priv->meta_buf, 0, priv->meta_buf->len);
}

static gboolean
soup_multipart_input_stream_read_headers (SoupMultipartInputStream  *multipart,
					  GCancellable		    *cancellable,
					  GError		   **error)
{
	SoupMultipartInputStreamPrivate *priv = multipart->priv;
	guchar read_buf[RESPONSE_BLOCK_SIZE];
	guchar *buf;
	gboolean got_boundary = FALSE;
	gboolean got_lf = FALSE;
	gssize nread = 0;

	g_return_val_if_fail (priv->boundary != NULL, TRUE);

	g_clear_pointer (&priv->current_headers, soup_message_headers_free);

	while (1) {
		nread = soup_filter_input_stream_read_line (priv->base_stream, read_buf, sizeof (read_buf),
							    /* blocking */ TRUE, &got_lf, cancellable, error);

		if (nread <= 0)
			break;

		g_byte_array_append (priv->meta_buf, read_buf, nread);

		/* Need to do this boundary check before checking for the line feed, since we
		 * may get the multipart end indicator without getting a new line.
		 */
		if (!got_boundary &&
		    !strncmp ((char *)priv->meta_buf->data,
			      priv->boundary,
			      priv->boundary_size)) {
			got_boundary = TRUE;

			/* Now check for possible multipart termination. */
			buf = &read_buf[nread - 4];
			if ((nread >= 4 && !memcmp (buf, "--\r\n", 4)) ||
			    (nread >= 3 && !memcmp (buf + 1, "--\n", 3)) ||
			    (nread >= 3 && !memcmp (buf + 2, "--", 2))) {
				g_byte_array_set_size (priv->meta_buf, 0);
				return FALSE;
			}
		}

		g_return_val_if_fail (got_lf, FALSE);

		/* Discard pre-boundary lines. */
		if (!got_boundary) {
			g_byte_array_set_size (priv->meta_buf, 0);
			continue;
		}

		if (nread == 1 &&
		    priv->meta_buf->len >= 2 &&
		    !strncmp ((char *)priv->meta_buf->data +
			      priv->meta_buf->len - 2,
			      "\n\n", 2))
			break;
		else if (nread == 2 &&
			 priv->meta_buf->len >= 3 &&
			 !strncmp ((char *)priv->meta_buf->data +
				   priv->meta_buf->len - 3,
				   "\n\r\n", 3))
			break;
	}

	return TRUE;
}

/* Public APIs */

/**
 * soup_multipart_input_stream_new:
 * @msg: the #SoupMessage the response is related to.
 * @base_stream: the #GInputStream returned by sending the request.
 *
 * Creates a new #SoupMultipartInputStream that wraps the
 * #GInputStream obtained by sending the #SoupRequest. Reads should
 * not be done directly through this object, use the input streams
 * returned by soup_multipart_input_stream_next_part() or its async
 * counterpart instead.
 *
 * Returns: a new #SoupMultipartInputStream
 *
 * Since: 2.40
 **/
SoupMultipartInputStream *
soup_multipart_input_stream_new (SoupMessage  *msg,
				 GInputStream *base_stream)
{
	return g_object_new (SOUP_TYPE_MULTIPART_INPUT_STREAM,
			     "message", msg,
			     "base-stream", base_stream,
			     NULL);
}

/**
 * soup_multipart_input_stream_next_part:
 * @multipart: the #SoupMultipartInputStream
 * @cancellable: a #GCancellable
 * @error: a #GError
 *
 * Obtains an input stream for the next part. When dealing with a
 * multipart response the input stream needs to be wrapped in a
 * #SoupMultipartInputStream and this function or its async
 * counterpart need to be called to obtain the first part for
 * reading.
 *
 * After calling this function,
 * soup_multipart_input_stream_get_headers() can be used to obtain the
 * headers for the first part. A read of 0 bytes indicates the end of
 * the part; a new call to this function should be done at that point,
 * to obtain the next part.
 *
 * Return value: (nullable) (transfer full): a new #GInputStream, or
 * %NULL if there are no more parts
 *
 * Since: 2.40
 */
GInputStream *
soup_multipart_input_stream_next_part (SoupMultipartInputStream  *multipart,
				       GCancellable	         *cancellable,
				       GError                   **error)
{
	if (!soup_multipart_input_stream_read_headers (multipart, cancellable, error))
		return NULL;

	soup_multipart_input_stream_parse_headers (multipart);

	multipart->priv->done_with_part = FALSE;

	return G_INPUT_STREAM (g_object_new (SOUP_TYPE_BODY_INPUT_STREAM,
					     "base-stream", G_INPUT_STREAM (multipart),
					     "close-base-stream", FALSE,
					     "encoding", SOUP_ENCODING_EOF,
					     NULL));

}

static void
soup_multipart_input_stream_next_part_thread (GTask        *task,
					      gpointer      object,
					      gpointer      task_data,
					      GCancellable *cancellable)
{
	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);
	GError *error = NULL;
	GInputStream *new_stream;

	new_stream = soup_multipart_input_stream_next_part (multipart, cancellable, &error);

	g_input_stream_clear_pending (G_INPUT_STREAM (multipart));

	if (error)
		g_task_return_error (task, error);
	else
		g_task_return_pointer (task, new_stream, g_object_unref);
}

/**
 * soup_multipart_input_stream_next_part_async:
 * @multipart: the #SoupMultipartInputStream.
 * @io_priority: the I/O priority for the request.
 * @cancellable: a #GCancellable.
 * @callback: callback to call when request is satisfied.
 * @data: data for @callback
 *
 * Obtains a #GInputStream for the next request. See
 * soup_multipart_input_stream_next_part() for details on the
 * workflow.
 *
 * Since: 2.40
 */
void
soup_multipart_input_stream_next_part_async (SoupMultipartInputStream *multipart,
					     int                       io_priority,
					     GCancellable	      *cancellable,
					     GAsyncReadyCallback       callback,
					     gpointer		       data)
{
	GInputStream *stream = G_INPUT_STREAM (multipart);
	GTask *task;
	GError *error = NULL;

	g_return_if_fail (SOUP_IS_MULTIPART_INPUT_STREAM (multipart));

	task = g_task_new (multipart, cancellable, callback, data);
	g_task_set_priority (task, io_priority);

	if (!g_input_stream_set_pending (stream, &error)) {
		g_task_return_error (task, error);
		g_object_unref (task);
		return;
	}

	g_task_run_in_thread (task, soup_multipart_input_stream_next_part_thread);
	g_object_unref (task);
}

/**
 * soup_multipart_input_stream_next_part_finish:
 * @multipart: a #SoupMultipartInputStream.
 * @result: a #GAsyncResult.
 * @error: a #GError location to store any error, or %NULL to ignore.
 *
 * Finishes an asynchronous request for the next part.
 *
 * Return value: (nullable) (transfer full): a newly created
 * #GInputStream for reading the next part or %NULL if there are no
 * more parts.
 *
 * Since: 2.40
 */
GInputStream *
soup_multipart_input_stream_next_part_finish (SoupMultipartInputStream	*multipart,
					      GAsyncResult		*result,
					      GError		       **error)
{
	g_return_val_if_fail (g_task_is_valid (result, multipart), FALSE);

	return g_task_propagate_pointer (G_TASK (result), error);
}

/**
 * soup_multipart_input_stream_get_headers:
 * @multipart: a #SoupMultipartInputStream.
 *
 * Obtains the headers for the part currently being processed. Note
 * that the #SoupMessageHeaders that are returned are owned by the
 * #SoupMultipartInputStream and will be replaced when a call is made
 * to soup_multipart_input_stream_next_part() or its async
 * counterpart, so if keeping the headers is required, a copy must be
 * made.
 *
 * Note that if a part had no headers at all an empty #SoupMessageHeaders
 * will be returned.
 *
 * Return value: (nullable) (transfer none): a #SoupMessageHeaders
 * containing the headers for the part currently being processed or
 * %NULL if the headers failed to parse.
 *
 * Since: 2.40
 */
SoupMessageHeaders *
soup_multipart_input_stream_get_headers (SoupMultipartInputStream *multipart)
{
	return multipart->priv->current_headers;
}