Blob Blame History Raw
/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
 * Copyright 2013 Red Hat, Inc.
 */

#include "test-utils.h"

static void
force_io_streams_init (void)
{
	SoupServer *server;
	SoupSession *session;
	SoupURI *base_uri;
	SoupMessage *msg;

	/* Poke libsoup enough to cause SoupBodyInputStream and
	 * SoupBodyOutputStream to get defined, so we can find them
	 * via g_type_from_name() later.
	 */

	server = soup_test_server_new (TRUE);
	base_uri = soup_test_server_get_uri (server, "http", NULL);

	session = soup_test_session_new (SOUP_TYPE_SESSION, NULL);
	msg = soup_message_new_from_uri ("POST", base_uri);
	soup_session_send_message (session, msg);
	g_object_unref (msg);
	soup_test_session_abort_unref (session);

	soup_uri_free (base_uri);
	soup_test_server_quit_unref (server);
}

typedef struct {
	GFilterInputStream grandparent;

	gpointer *soup_filter_input_stream_private;

	gboolean is_readable;
} SlowInputStream;

typedef struct {
	GFilterInputStreamClass grandparent;
} SlowInputStreamClass;

GType slow_input_stream_get_type (void);
static void slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface,
					     gpointer interface_data);

G_DEFINE_TYPE_WITH_CODE (SlowInputStream, slow_input_stream,
			 g_type_from_name ("SoupFilterInputStream"),
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, slow_pollable_input_stream_init);
			 )

static void
slow_input_stream_init (SlowInputStream *sis)
{
}

static gssize
slow_input_stream_read (GInputStream  *stream,
			void          *buffer,
			gsize          count,
			GCancellable  *cancellable,
			GError       **error)
{
	return g_input_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream,
				    buffer, 1, cancellable, error);
}

static void
slow_input_stream_class_init (SlowInputStreamClass *sisclass)
{
	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (sisclass);

	input_stream_class->read_fn = slow_input_stream_read;
}

static gboolean
slow_input_stream_is_readable (GPollableInputStream *stream)
{
	return ((SlowInputStream *)stream)->is_readable;
}

static gssize
slow_input_stream_read_nonblocking (GPollableInputStream  *stream,
				    void                  *buffer,
				    gsize                  count,
				    GError               **error)
{
	if (((SlowInputStream *)stream)->is_readable) {
		((SlowInputStream *)stream)->is_readable = FALSE;
		return slow_input_stream_read (G_INPUT_STREAM (stream), buffer, count,
					       NULL, error);
	} else {
		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
				     "would block");
		return -1;
	}
}

static GSource *
slow_input_stream_create_source (GPollableInputStream *stream,
				 GCancellable *cancellable)
{
	GSource *base_source, *pollable_source;

	((SlowInputStream *)stream)->is_readable = TRUE;
	base_source = g_timeout_source_new (0);
	g_source_set_dummy_callback (base_source);

	pollable_source = g_pollable_source_new (G_OBJECT (stream));
	g_source_add_child_source (pollable_source, base_source);
	g_source_unref (base_source);

	return pollable_source;
}

static void
slow_pollable_input_stream_init (GPollableInputStreamInterface *pollable_interface,
				 gpointer interface_data)
{
	pollable_interface->is_readable = slow_input_stream_is_readable;
	pollable_interface->read_nonblocking = slow_input_stream_read_nonblocking;
	pollable_interface->create_source = slow_input_stream_create_source;
}

typedef struct {
	GFilterOutputStream parent;

	gboolean is_writable;
} SlowOutputStream;

typedef struct {
	GFilterOutputStreamClass parent;
} SlowOutputStreamClass;

GType slow_output_stream_get_type (void);

static void slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
					      gpointer interface_data);

G_DEFINE_TYPE_WITH_CODE (SlowOutputStream, slow_output_stream,
			 g_type_from_name ("GFilterOutputStream"),
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, slow_pollable_output_stream_init);
			 )

static void
slow_output_stream_init (SlowOutputStream *sis)
{
}

static gssize
slow_output_stream_write (GOutputStream  *stream,
			  const void     *buffer,
			  gsize           count,
			  GCancellable   *cancellable,
			  GError        **error)
{
	return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream,
				      buffer, 1, cancellable, error);
}

static void
slow_output_stream_class_init (SlowOutputStreamClass *sisclass)
{
	GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass);

	output_stream_class->write_fn = slow_output_stream_write;
}

static gboolean
slow_output_stream_is_writable (GPollableOutputStream *stream)
{
	return ((SlowOutputStream *)stream)->is_writable;
}

static gssize
slow_output_stream_write_nonblocking (GPollableOutputStream  *stream,
				      const void             *buffer,
				      gsize                   count,
				      GError                **error)
{
	if (((SlowOutputStream *)stream)->is_writable) {
		((SlowOutputStream *)stream)->is_writable = FALSE;
		return slow_output_stream_write (G_OUTPUT_STREAM (stream), buffer, count,
						 NULL, error);
	} else {
		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
				     "would block");
		return -1;
	}
}

static GSource *
slow_output_stream_create_source (GPollableOutputStream *stream,
				  GCancellable *cancellable)
{
	GSource *base_source, *pollable_source;

	((SlowOutputStream *)stream)->is_writable = TRUE;
	base_source = g_timeout_source_new (0);
	g_source_set_dummy_callback (base_source);

	pollable_source = g_pollable_source_new (G_OBJECT (stream));
	g_source_add_child_source (pollable_source, base_source);
	g_source_unref (base_source);

	return pollable_source;
}

static void
slow_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
				  gpointer interface_data)
{
	pollable_interface->is_writable = slow_output_stream_is_writable;
	pollable_interface->write_nonblocking = slow_output_stream_write_nonblocking;
	pollable_interface->create_source = slow_output_stream_create_source;
}

typedef struct {
	GFilterOutputStream parent;

	gboolean is_broken;
} BreakingOutputStream;

typedef struct {
	GFilterOutputStreamClass parent;
} BreakingOutputStreamClass;

GType breaking_output_stream_get_type (void);

static void breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
						  gpointer interface_data);

G_DEFINE_TYPE_WITH_CODE (BreakingOutputStream, breaking_output_stream,
			 g_type_from_name ("GFilterOutputStream"),
			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, breaking_pollable_output_stream_init);
			 )

static void
breaking_output_stream_init (BreakingOutputStream *sis)
{
}

static gssize
breaking_output_stream_write (GOutputStream  *stream,
			      const void     *buffer,
			      gsize           count,
			      GCancellable   *cancellable,
			      GError        **error)
{
	if (((BreakingOutputStream *)stream)->is_broken) {
		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed");
		return -1;
	}

	if (count > 128) {
		((BreakingOutputStream *)stream)->is_broken = TRUE;
		count /= 2;
	}
	return g_output_stream_write (G_FILTER_OUTPUT_STREAM (stream)->base_stream,
				      buffer, count, cancellable, error);
}

static void
breaking_output_stream_class_init (BreakingOutputStreamClass *sisclass)
{
	GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (sisclass);

	output_stream_class->write_fn = breaking_output_stream_write;
}

static gboolean
breaking_output_stream_is_writable (GPollableOutputStream *stream)
{
	return TRUE;
}

static gssize
breaking_output_stream_write_nonblocking (GPollableOutputStream  *stream,
					  const void             *buffer,
					  gsize                   count,
					  GError                **error)
{
	if (((BreakingOutputStream *)stream)->is_broken) {
		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, "failed");
		return -1;
	}

	if (count > 128) {
		((BreakingOutputStream *)stream)->is_broken = TRUE;
		count /= 2;
	}
	return g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (G_FILTER_OUTPUT_STREAM (stream)->base_stream),
							   buffer, count, NULL, error);
}

static GSource *
breaking_output_stream_create_source (GPollableOutputStream *stream,
				      GCancellable *cancellable)
{
	GSource *base_source, *pollable_source;

	base_source = g_timeout_source_new (0);
	g_source_set_dummy_callback (base_source);

	pollable_source = g_pollable_source_new (G_OBJECT (stream));
	g_source_add_child_source (pollable_source, base_source);
	g_source_unref (base_source);

	return pollable_source;
}

static void
breaking_pollable_output_stream_init (GPollableOutputStreamInterface *pollable_interface,
				  gpointer interface_data)
{
	pollable_interface->is_writable = breaking_output_stream_is_writable;
	pollable_interface->write_nonblocking = breaking_output_stream_write_nonblocking;
	pollable_interface->create_source = breaking_output_stream_create_source;
}

#define CHUNK_SIZE 1024

static GString *
chunkify (const char *str, gsize length)
{
	GString *gstr;
	int i, size;

	gstr = g_string_new (NULL);
	for (i = 0; i < length; i += CHUNK_SIZE) {
		size = MIN (CHUNK_SIZE, length - i);
		g_string_append_printf (gstr, "%x\r\n", size);
		g_string_append_len (gstr, str + i, size);
		g_string_append (gstr, "\r\n");
	}
	g_string_append (gstr, "0\r\n\r\n");

	return gstr;
}

static void
do_io_tests (void)
{
	GInputStream *imem, *islow, *in;
	GOutputStream *omem, *oslow, *out;
	GMemoryOutputStream *mem;
	SoupBuffer *raw_contents;
	char *buf;
	GString *chunkified;
	GError *error = NULL;
	gssize nread, nwrote, total;
	gssize chunk_length, chunk_total;

	raw_contents = soup_test_get_index ();
	chunkified = chunkify (raw_contents->data, raw_contents->length);

	debug_printf (1, "  sync read\n");

	imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL);
	islow = g_object_new (slow_input_stream_get_type (),
			      "base-stream", imem,
			      "close-base-stream", TRUE,
			      NULL);
	in = g_object_new (g_type_from_name ("SoupBodyInputStream"),
			   "base-stream", islow,
			   "close-base-stream", TRUE,
			   "encoding", SOUP_ENCODING_CHUNKED,
			   NULL);
	g_object_unref (imem);
	g_object_unref (islow);

	buf = g_malloc (raw_contents->length);
	total = 0;
	while (TRUE) {
		nread = g_input_stream_read (in, buf + total,
					     raw_contents->length - total,
					     NULL, &error);
		g_assert_no_error (error);
		g_clear_error (&error);
		if (nread > 0)
			total += nread;
		else
			break;
	}

	g_input_stream_close (in, NULL, &error);
	g_assert_no_error (error);
	g_clear_error (&error);
	g_object_unref (in);

	soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length);
	g_free (buf);

	debug_printf (1, "  async read\n");

	imem = g_memory_input_stream_new_from_data (chunkified->str, chunkified->len, NULL);
	islow = g_object_new (slow_input_stream_get_type (),
			      "base-stream", imem,
			      "close-base-stream", TRUE,
			      NULL);
	in = g_object_new (g_type_from_name ("SoupBodyInputStream"),
			   "base-stream", islow,
			   "close-base-stream", TRUE,
			   "encoding", SOUP_ENCODING_CHUNKED,
			   NULL);
	g_object_unref (imem);
	g_object_unref (islow);

	buf = g_malloc (raw_contents->length);
	total = 0;
	while (TRUE) {
		nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (in),
								  buf + total,
								  raw_contents->length - total,
								  NULL, &error);
		if (nread == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
			GSource *source;

			g_clear_error (&error);
			source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (in), NULL);
			g_source_set_dummy_callback (source);
			g_source_attach (source, NULL);
			while (!g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (in)))
				g_main_context_iteration (NULL, TRUE);
			g_source_destroy (source);
			g_source_unref (source);
			continue;
		} else if (nread == -1) {
			g_assert_no_error (error);
			g_clear_error (&error);
			break;
		} else if (nread == 0)
			break;
		else
			total += nread;
	}

	g_input_stream_close (in, NULL, &error);
	g_assert_no_error (error);
	g_clear_error (&error);
	g_object_unref (in);

	soup_assert_cmpmem (buf, total, raw_contents->data, raw_contents->length);
	g_free (buf);

	debug_printf (1, "  sync write\n");

	buf = g_malloc (chunkified->len);
	omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
	oslow = g_object_new (slow_output_stream_get_type (),
			      "base-stream", omem,
			      "close-base-stream", TRUE,
			      NULL);
	out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
			    "base-stream", oslow,
			    "close-base-stream", TRUE,
			    "encoding", SOUP_ENCODING_CHUNKED,
			    NULL);
	g_object_unref (omem);
	g_object_unref (oslow);

	total = chunk_length = chunk_total = 0;
	while (total < raw_contents->length) {
		if (chunk_total == chunk_length) {
			chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total);
			chunk_total = 0;
		}
		nwrote = g_output_stream_write (out, raw_contents->data + total,
						chunk_length - chunk_total, NULL, &error);
		g_assert_no_error (error);
		g_clear_error (&error);
		if (nwrote > 0) {
			total += nwrote;
			chunk_total += nwrote;
		} else
			break;
	}

	g_output_stream_close (out, NULL, &error);
	g_assert_no_error (error);
	g_clear_error (&error);

	mem = G_MEMORY_OUTPUT_STREAM (omem);
	soup_assert_cmpmem (g_memory_output_stream_get_data (mem),
			    g_memory_output_stream_get_data_size (mem),
			    chunkified->str, chunkified->len);

	g_object_unref (out);
	g_free (buf);

	debug_printf (1, "  async write\n");

	buf = g_malloc (chunkified->len);
	omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
	oslow = g_object_new (slow_output_stream_get_type (),
			      "base-stream", omem,
			      "close-base-stream", TRUE,
			      NULL);
	out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
			    "base-stream", oslow,
			    "close-base-stream", TRUE,
			    "encoding", SOUP_ENCODING_CHUNKED,
			    NULL);
	g_object_unref (omem);
	g_object_unref (oslow);

	total = chunk_length = chunk_total = 0;
	while (total < raw_contents->length) {
		if (chunk_total == chunk_length) {
			chunk_length = MIN (CHUNK_SIZE, raw_contents->length - total);
			chunk_total = 0;
		}
		nwrote = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (out),
								     raw_contents->data + total,
								     chunk_length - chunk_total,
								     NULL, &error);
		if (nwrote == -1 && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
			GSource *source;

			g_clear_error (&error);
			source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (out), NULL);
			g_source_set_dummy_callback (source);
			g_source_attach (source, NULL);
			while (!g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (out)))
				g_main_context_iteration (NULL, TRUE);
			g_source_destroy (source);
			g_source_unref (source);
			continue;
		} else if (nwrote == -1) {
			g_assert_no_error (error);
			g_clear_error (&error);
			break;
		} else {
			total += nwrote;
			chunk_total += nwrote;
		}
	}

	g_output_stream_close (out, NULL, &error);
	g_assert_no_error (error);
	g_clear_error (&error);

	mem = G_MEMORY_OUTPUT_STREAM (omem);
	soup_assert_cmpmem (g_memory_output_stream_get_data (mem),
			    g_memory_output_stream_get_data_size (mem),
			    chunkified->str, chunkified->len);

	g_object_unref (out);
	g_free (buf);

	debug_printf (1, "  failed write\n");
	/* this succeeds if it doesn't critical */

	buf = g_malloc (chunkified->len);
	omem = g_memory_output_stream_new (buf, chunkified->len, NULL, NULL);
	oslow = g_object_new (breaking_output_stream_get_type (),
			      "base-stream", omem,
			      "close-base-stream", TRUE,
			      NULL);
	out = g_object_new (g_type_from_name ("SoupBodyOutputStream"),
			    "base-stream", oslow,
			    "close-base-stream", TRUE,
			    "encoding", SOUP_ENCODING_CHUNKED,
			    NULL);
	g_object_unref (omem);
	g_object_unref (oslow);

	total = 0;
	while (total < raw_contents->length) {
		nwrote = g_output_stream_write (out, raw_contents->data + total,
						raw_contents->length - total, NULL, NULL);
		if (nwrote == -1)
			break;
		else
			total += nwrote;
	}

	g_assert_cmpint (total, !=, raw_contents->length);

	g_output_stream_close (out, NULL, NULL);
	g_object_unref (out);

	g_free (buf);

	g_string_free (chunkified, TRUE);
}

int
main (int argc, char **argv)
{
	int ret;

	test_init (argc, argv, NULL);

	force_io_streams_init ();

	g_test_add_func ("/chunk-io", do_io_tests);

	ret = g_test_run ();

	test_cleanup ();
	return ret;
}