diff --git a/libsoup/soup-websocket-connection.c b/libsoup/soup-websocket-connection.c index 2ed4740..0174f4f 100644 --- a/libsoup/soup-websocket-connection.c +++ b/libsoup/soup-websocket-connection.c @@ -147,6 +147,7 @@ struct _SoupWebsocketConnectionPrivate { }; #define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT 128 * 1024 +#define READ_BUFFER_SIZE 1024 G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT) @@ -155,6 +156,11 @@ static void queue_frame (SoupWebsocketConnection *self, SoupWebsocketQueueFlags static void protocol_error_and_close (SoupWebsocketConnection *self); +static gboolean on_web_socket_input (GObject *pollable_stream, + gpointer user_data); +static gboolean on_web_socket_output (GObject *pollable_stream, + gpointer user_data); + /* Code below is based on g_utf8_validate() implementation, * but handling NULL characters as valid, as expected by * WebSockets and compliant with RFC 3629. @@ -283,7 +289,20 @@ on_iostream_closed (GObject *source, } static void -stop_input (SoupWebsocketConnection *self) +soup_websocket_connection_start_input_source (SoupWebsocketConnection *self) +{ + SoupWebsocketConnectionPrivate *pv = self->pv; + + if (pv->input_source) + return; + + pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL); + g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL); + g_source_attach (pv->input_source, pv->main_context); +} + +static void +soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self) { SoupWebsocketConnectionPrivate *pv = self->pv; @@ -296,7 +315,20 @@ stop_input (SoupWebsocketConnection *self) } static void -stop_output (SoupWebsocketConnection *self) +soup_websocket_connection_start_output_source (SoupWebsocketConnection *self) +{ + SoupWebsocketConnectionPrivate *pv = self->pv; + + if (pv->output_source) + return; + + pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL); + g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL); + g_source_attach (pv->output_source, pv->main_context); +} + +static void +soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self) { SoupWebsocketConnectionPrivate *pv = self->pv; @@ -341,8 +373,8 @@ close_io_stream (SoupWebsocketConnection *self) close_io_stop_timeout (self); if (!pv->io_closing) { - stop_input (self); - stop_output (self); + soup_websocket_connection_stop_input_source (self); + soup_websocket_connection_stop_output_source (self); pv->io_closing = TRUE; g_debug ("closing io stream"); g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT, @@ -359,7 +391,7 @@ shutdown_wr_io_stream (SoupWebsocketConnection *self) GSocket *socket; GError *error = NULL; - stop_output (self); + soup_websocket_connection_stop_output_source (self); if (G_IS_SOCKET_CONNECTION (pv->io_stream)) { socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (pv->io_stream)); @@ -612,9 +644,6 @@ too_big_error_and_close (SoupWebsocketConnection *self, self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client", payload_len, self->pv->max_incoming_payload_size); emit_error_and_close (self, error, TRUE); - - /* The input is in an invalid state now */ - stop_input (self); } static void @@ -981,32 +1010,31 @@ process_incoming (SoupWebsocketConnection *self) ; } -static gboolean -on_web_socket_input (GObject *pollable_stream, - gpointer user_data) +static void +soup_websocket_connection_read (SoupWebsocketConnection *self) { - SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data); SoupWebsocketConnectionPrivate *pv = self->pv; GError *error = NULL; gboolean end = FALSE; gssize count; gsize len; + soup_websocket_connection_stop_input_source (self); + do { len = pv->incoming->len; - g_byte_array_set_size (pv->incoming, len + 1024); + g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE); count = g_pollable_input_stream_read_nonblocking (pv->input, pv->incoming->data + len, - 1024, NULL, &error); - + READ_BUFFER_SIZE, NULL, &error); if (count < 0) { if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_error_free (error); count = 0; } else { emit_error_and_close (self, error, TRUE); - return TRUE; + return; } } else if (count == 0) { end = TRUE; @@ -1026,16 +1054,24 @@ on_web_socket_input (GObject *pollable_stream, } close_io_stream (self); + return; } - return TRUE; + soup_websocket_connection_start_input_source (self); } static gboolean -on_web_socket_output (GObject *pollable_stream, - gpointer user_data) +on_web_socket_input (GObject *pollable_stream, + gpointer user_data) +{ + soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data)); + + return G_SOURCE_REMOVE; +} + +static void +soup_websocket_connection_write (SoupWebsocketConnection *self) { - SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data); SoupWebsocketConnectionPrivate *pv = self->pv; const guint8 *data; GError *error = NULL; @@ -1043,19 +1079,18 @@ on_web_socket_output (GObject *pollable_stream, gssize count; gsize len; + soup_websocket_connection_stop_output_source (self); + if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) { g_debug ("Ignoring message since the connection is closed"); - stop_output (self); - return TRUE; + return; } frame = g_queue_peek_head (&pv->outgoing); /* No more frames to send */ - if (frame == NULL) { - stop_output (self); - return TRUE; - } + if (frame == NULL) + return; data = g_bytes_get_data (frame->data, &len); g_assert (len > 0); @@ -1075,7 +1110,7 @@ on_web_socket_output (GObject *pollable_stream, frame->pending = TRUE; } else { emit_error_and_close (self, error, TRUE); - return FALSE; + return; } } @@ -1093,23 +1128,21 @@ on_web_socket_output (GObject *pollable_stream, } } frame_free (frame); + + if (g_queue_is_empty (&pv->outgoing)) + return; } - return TRUE; + soup_websocket_connection_start_output_source (self); } -static void -start_output (SoupWebsocketConnection *self) +static gboolean +on_web_socket_output (GObject *pollable_stream, + gpointer user_data) { - SoupWebsocketConnectionPrivate *pv = self->pv; - - if (pv->output_source) - return; + soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data)); - g_debug ("starting output source"); - pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL); - g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL); - g_source_attach (pv->output_source, pv->main_context); + return G_SOURCE_REMOVE; } static void @@ -1150,7 +1183,7 @@ queue_frame (SoupWebsocketConnection *self, g_queue_push_tail (&pv->outgoing, frame); } - start_output (self); + soup_websocket_connection_write (self); } static void @@ -1175,9 +1208,7 @@ soup_websocket_connection_constructed (GObject *object) pv->output = G_POLLABLE_OUTPUT_STREAM (os); g_return_if_fail (g_pollable_output_stream_can_poll (pv->output)); - pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL); - g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL); - g_source_attach (pv->input_source, pv->main_context); + soup_websocket_connection_start_input_source (self); } static void diff --git a/tests/websocket-test.c b/tests/websocket-test.c index 3caa3b2..b30576b 100644 --- a/tests/websocket-test.c +++ b/tests/websocket-test.c @@ -714,6 +714,7 @@ close_after_close_server_thread (gpointer user_data) const char frames[] = "\x88\x09\x03\xe8""reason1" "\x88\x09\x03\xe8""reason2"; + GSocket *socket; GError *error = NULL; g_mutex_lock (&test->mutex); @@ -723,7 +724,8 @@ close_after_close_server_thread (gpointer user_data) frames, sizeof (frames) -1, &written, NULL, &error); g_assert_no_error (error); g_assert_cmpuint (written, ==, sizeof (frames) - 1); - g_io_stream_close (test->raw_server, NULL, &error); + socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server)); + g_socket_shutdown (socket, FALSE, TRUE, &error); g_assert_no_error (error); return NULL; @@ -747,6 +749,7 @@ test_close_after_close (Test *test, WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED); g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL); g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1"); + g_io_stream_close (test->raw_server, NULL, NULL); } static gpointer