/** * MessageQueueSource: * * This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a * message can be pulled off the queue. Messages can be enqueued from any * thread. * * The callbacks dispatched by a #MessageQueueSource have type * #MessageQueueSourceFunc. * * #MessageQueueSource supports adding a #GCancellable child source which will * additionally dispatch if a provided #GCancellable is cancelled. */ typedef struct { GSource parent; GAsyncQueue *queue; /* owned */ GDestroyNotify destroy_message; } MessageQueueSource; /** * MessageQueueSourceFunc: * @message: (transfer full) (nullable): message pulled off the queue * @user_data: user data provided to g_source_set_callback() * * Callback function type for #MessageQueueSource. */ typedef gboolean (*MessageQueueSourceFunc) (gpointer message, gpointer user_data); static gboolean message_queue_source_prepare (GSource *source, gint *timeout_) { MessageQueueSource *message_queue_source = (MessageQueueSource *) source; return (g_async_queue_length (message_queue_source->queue) > 0); } static gboolean message_queue_source_dispatch (GSource *source, GSourceFunc callback, gpointer user_data) { MessageQueueSource *message_queue_source = (MessageQueueSource *) source; gpointer message; MessageQueueSourceFunc func = (MessageQueueSourceFunc) callback; /* Pop a message off the queue. */ message = g_async_queue_try_pop (message_queue_source->queue); /* If there was no message, bail. */ if (message == NULL) { /* Keep the source around to handle the next message. */ return TRUE; } /* @func may be %NULL if no callback was specified. * If so, drop the message. */ if (func == NULL) { if (message_queue_source->destroy_message != NULL) { message_queue_source->destroy_message (message); } /* Keep the source around to consume the next message. */ return TRUE; } return func (message, user_data); } static void message_queue_source_finalize (GSource *source) { MessageQueueSource *message_queue_source = (MessageQueueSource *) source; g_async_queue_unref (message_queue_source->queue); } static gboolean message_queue_source_closure_callback (gpointer message, gpointer user_data) { GClosure *closure = user_data; GValue param_value = G_VALUE_INIT; GValue result_value = G_VALUE_INIT; gboolean retval; /* The invoked function is responsible for freeing @message. */ g_value_init (&result_value, G_TYPE_BOOLEAN); g_value_init (¶m_value, G_TYPE_POINTER); g_value_set_pointer (¶m_value, message); g_closure_invoke (closure, &result_value, 1, ¶m_value, NULL); retval = g_value_get_boolean (&result_value); g_value_unset (¶m_value); g_value_unset (&result_value); return retval; } static GSourceFuncs message_queue_source_funcs = { message_queue_source_prepare, NULL, /* check */ message_queue_source_dispatch, message_queue_source_finalize, (GSourceFunc) message_queue_source_closure_callback, NULL, }; /** * message_queue_source_new: * @queue: the queue to check * @destroy_message: (nullable): function to free a message, or %NULL * @cancellable: (nullable): a #GCancellable, or %NULL * * Create a new #MessageQueueSource, a type of #GSource which dispatches for * each message queued to it. * * If a callback function of type #MessageQueueSourceFunc is connected to the * returned #GSource using g_source_set_callback(), it will be invoked for each * message, with the message passed as its first argument. It is responsible for * freeing the message. If no callback is set, messages are automatically freed * as they are queued. * * Returns: (transfer full): a new #MessageQueueSource */ GSource * message_queue_source_new (GAsyncQueue *queue, GDestroyNotify destroy_message, GCancellable *cancellable) { GSource *source; /* alias of @message_queue_source */ MessageQueueSource *message_queue_source; /* alias of @source */ g_return_val_if_fail (queue != NULL, NULL); g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL); source = g_source_new (&message_queue_source_funcs, sizeof (MessageQueueSource)); message_queue_source = (MessageQueueSource *) source; /* The caller can overwrite this name with something more useful later. */ g_source_set_name (source, "MessageQueueSource"); message_queue_source->queue = g_async_queue_ref (queue); message_queue_source->destroy_message = destroy_message; /* Add a cancellable source. */ if (cancellable != NULL) { GSource *cancellable_source; cancellable_source = g_cancellable_source_new (cancellable); g_source_set_dummy_callback (cancellable_source); g_source_add_child_source (source, cancellable_source); g_source_unref (cancellable_source); } return source; }