|
Packit |
1470ea |
/**
|
|
Packit |
1470ea |
* MessageQueueSource:
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a
|
|
Packit |
1470ea |
* message can be pulled off the queue. Messages can be enqueued from any
|
|
Packit |
1470ea |
* thread.
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* The callbacks dispatched by a #MessageQueueSource have type
|
|
Packit |
1470ea |
* #MessageQueueSourceFunc.
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* #MessageQueueSource supports adding a #GCancellable child source which will
|
|
Packit |
1470ea |
* additionally dispatch if a provided #GCancellable is cancelled.
|
|
Packit |
1470ea |
*/
|
|
Packit |
1470ea |
typedef struct {
|
|
Packit |
1470ea |
GSource parent;
|
|
Packit |
1470ea |
GAsyncQueue *queue; /* owned */
|
|
Packit |
1470ea |
GDestroyNotify destroy_message;
|
|
Packit |
1470ea |
} MessageQueueSource;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/**
|
|
Packit |
1470ea |
* MessageQueueSourceFunc:
|
|
Packit |
1470ea |
* @message: (transfer full) (nullable): message pulled off the queue
|
|
Packit |
1470ea |
* @user_data: user data provided to g_source_set_callback()
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* Callback function type for #MessageQueueSource.
|
|
Packit |
1470ea |
*/
|
|
Packit |
1470ea |
typedef gboolean (*MessageQueueSourceFunc) (gpointer message,
|
|
Packit |
1470ea |
gpointer user_data);
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
static gboolean
|
|
Packit |
1470ea |
message_queue_source_prepare (GSource *source,
|
|
Packit |
1470ea |
gint *timeout_)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
return (g_async_queue_length (message_queue_source->queue) > 0);
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
static gboolean
|
|
Packit |
1470ea |
message_queue_source_dispatch (GSource *source,
|
|
Packit |
1470ea |
GSourceFunc callback,
|
|
Packit |
1470ea |
gpointer user_data)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
|
|
Packit |
1470ea |
gpointer message;
|
|
Packit |
1470ea |
MessageQueueSourceFunc func = (MessageQueueSourceFunc) callback;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* Pop a message off the queue. */
|
|
Packit |
1470ea |
message = g_async_queue_try_pop (message_queue_source->queue);
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* If there was no message, bail. */
|
|
Packit |
1470ea |
if (message == NULL)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
/* Keep the source around to handle the next message. */
|
|
Packit |
1470ea |
return TRUE;
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* @func may be %NULL if no callback was specified.
|
|
Packit |
1470ea |
* If so, drop the message. */
|
|
Packit |
1470ea |
if (func == NULL)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
if (message_queue_source->destroy_message != NULL)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
message_queue_source->destroy_message (message);
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* Keep the source around to consume the next message. */
|
|
Packit |
1470ea |
return TRUE;
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
return func (message, user_data);
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
static void
|
|
Packit |
1470ea |
message_queue_source_finalize (GSource *source)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
g_async_queue_unref (message_queue_source->queue);
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
static gboolean
|
|
Packit |
1470ea |
message_queue_source_closure_callback (gpointer message,
|
|
Packit |
1470ea |
gpointer user_data)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
GClosure *closure = user_data;
|
|
Packit |
1470ea |
GValue param_value = G_VALUE_INIT;
|
|
Packit |
1470ea |
GValue result_value = G_VALUE_INIT;
|
|
Packit |
1470ea |
gboolean retval;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* The invoked function is responsible for freeing @message. */
|
|
Packit |
1470ea |
g_value_init (&result_value, G_TYPE_BOOLEAN);
|
|
Packit |
1470ea |
g_value_init (¶m_value, G_TYPE_POINTER);
|
|
Packit |
1470ea |
g_value_set_pointer (¶m_value, message);
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
g_closure_invoke (closure, &result_value, 1, ¶m_value, NULL);
|
|
Packit |
1470ea |
retval = g_value_get_boolean (&result_value);
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
g_value_unset (¶m_value);
|
|
Packit |
1470ea |
g_value_unset (&result_value);
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
return retval;
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
static GSourceFuncs message_queue_source_funcs =
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
message_queue_source_prepare,
|
|
Packit |
1470ea |
NULL, /* check */
|
|
Packit |
1470ea |
message_queue_source_dispatch,
|
|
Packit |
1470ea |
message_queue_source_finalize,
|
|
Packit |
1470ea |
(GSourceFunc) message_queue_source_closure_callback,
|
|
Packit |
1470ea |
NULL,
|
|
Packit |
1470ea |
};
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/**
|
|
Packit |
1470ea |
* message_queue_source_new:
|
|
Packit |
1470ea |
* @queue: the queue to check
|
|
Packit |
1470ea |
* @destroy_message: (nullable): function to free a message, or %NULL
|
|
Packit |
1470ea |
* @cancellable: (nullable): a #GCancellable, or %NULL
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* Create a new #MessageQueueSource, a type of #GSource which dispatches for
|
|
Packit |
1470ea |
* each message queued to it.
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* If a callback function of type #MessageQueueSourceFunc is connected to the
|
|
Packit |
1470ea |
* returned #GSource using g_source_set_callback(), it will be invoked for each
|
|
Packit |
1470ea |
* message, with the message passed as its first argument. It is responsible for
|
|
Packit |
1470ea |
* freeing the message. If no callback is set, messages are automatically freed
|
|
Packit |
1470ea |
* as they are queued.
|
|
Packit |
1470ea |
*
|
|
Packit |
1470ea |
* Returns: (transfer full): a new #MessageQueueSource
|
|
Packit |
1470ea |
*/
|
|
Packit |
1470ea |
GSource *
|
|
Packit |
1470ea |
message_queue_source_new (GAsyncQueue *queue,
|
|
Packit |
1470ea |
GDestroyNotify destroy_message,
|
|
Packit |
1470ea |
GCancellable *cancellable)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
GSource *source; /* alias of @message_queue_source */
|
|
Packit |
1470ea |
MessageQueueSource *message_queue_source; /* alias of @source */
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
g_return_val_if_fail (queue != NULL, NULL);
|
|
Packit |
1470ea |
g_return_val_if_fail (cancellable == NULL ||
|
|
Packit |
1470ea |
G_IS_CANCELLABLE (cancellable), NULL);
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
source = g_source_new (&message_queue_source_funcs,
|
|
Packit |
1470ea |
sizeof (MessageQueueSource));
|
|
Packit |
1470ea |
message_queue_source = (MessageQueueSource *) source;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* The caller can overwrite this name with something more useful later. */
|
|
Packit |
1470ea |
g_source_set_name (source, "MessageQueueSource");
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
message_queue_source->queue = g_async_queue_ref (queue);
|
|
Packit |
1470ea |
message_queue_source->destroy_message = destroy_message;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
/* Add a cancellable source. */
|
|
Packit |
1470ea |
if (cancellable != NULL)
|
|
Packit |
1470ea |
{
|
|
Packit |
1470ea |
GSource *cancellable_source;
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
cancellable_source = g_cancellable_source_new (cancellable);
|
|
Packit |
1470ea |
g_source_set_dummy_callback (cancellable_source);
|
|
Packit |
1470ea |
g_source_add_child_source (source, cancellable_source);
|
|
Packit |
1470ea |
g_source_unref (cancellable_source);
|
|
Packit |
1470ea |
}
|
|
Packit |
1470ea |
|
|
Packit |
1470ea |
return source;
|
|
Packit |
1470ea |
}
|