Blame platform-demos/C/samples/example-custom-gsource.c

Packit 1470ea
/**
Packit 1470ea
 * MessageQueueSource:
Packit 1470ea
 *
Packit 1470ea
 * This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a
Packit 1470ea
 * message can be pulled off the queue. Messages can be enqueued from any
Packit 1470ea
 * thread.
Packit 1470ea
 *
Packit 1470ea
 * The callbacks dispatched by a #MessageQueueSource have type
Packit 1470ea
 * #MessageQueueSourceFunc.
Packit 1470ea
 *
Packit 1470ea
 * #MessageQueueSource supports adding a #GCancellable child source which will
Packit 1470ea
 * additionally dispatch if a provided #GCancellable is cancelled.
Packit 1470ea
 */
Packit 1470ea
typedef struct {
Packit 1470ea
  GSource         parent;
Packit 1470ea
  GAsyncQueue    *queue;  /* owned */
Packit 1470ea
  GDestroyNotify  destroy_message;
Packit 1470ea
} MessageQueueSource;
Packit 1470ea
Packit 1470ea
/**
Packit 1470ea
 * MessageQueueSourceFunc:
Packit 1470ea
 * @message: (transfer full) (nullable): message pulled off the queue
Packit 1470ea
 * @user_data: user data provided to g_source_set_callback()
Packit 1470ea
 *
Packit 1470ea
 * Callback function type for #MessageQueueSource.
Packit 1470ea
 */
Packit 1470ea
typedef gboolean (*MessageQueueSourceFunc) (gpointer message,
Packit 1470ea
                                            gpointer user_data);
Packit 1470ea
Packit 1470ea
static gboolean
Packit 1470ea
message_queue_source_prepare (GSource *source,
Packit 1470ea
                              gint    *timeout_)
Packit 1470ea
{
Packit 1470ea
  MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
Packit 1470ea
Packit 1470ea
  return (g_async_queue_length (message_queue_source->queue) > 0);
Packit 1470ea
}
Packit 1470ea
Packit 1470ea
static gboolean
Packit 1470ea
message_queue_source_dispatch (GSource     *source,
Packit 1470ea
                               GSourceFunc  callback,
Packit 1470ea
                               gpointer     user_data)
Packit 1470ea
{
Packit 1470ea
  MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
Packit 1470ea
  gpointer message;
Packit 1470ea
  MessageQueueSourceFunc func = (MessageQueueSourceFunc) callback;
Packit 1470ea
Packit 1470ea
  /* Pop a message off the queue. */
Packit 1470ea
  message = g_async_queue_try_pop (message_queue_source->queue);
Packit 1470ea
Packit 1470ea
  /* If there was no message, bail. */
Packit 1470ea
  if (message == NULL)
Packit 1470ea
    {
Packit 1470ea
      /* Keep the source around to handle the next message. */
Packit 1470ea
      return TRUE;
Packit 1470ea
    }
Packit 1470ea
Packit 1470ea
  /* @func may be %NULL if no callback was specified.
Packit 1470ea
   * If so, drop the message. */
Packit 1470ea
  if (func == NULL)
Packit 1470ea
    {
Packit 1470ea
      if (message_queue_source->destroy_message != NULL)
Packit 1470ea
        {
Packit 1470ea
          message_queue_source->destroy_message (message);
Packit 1470ea
        }
Packit 1470ea
Packit 1470ea
      /* Keep the source around to consume the next message. */
Packit 1470ea
      return TRUE;
Packit 1470ea
    }
Packit 1470ea
Packit 1470ea
  return func (message, user_data);
Packit 1470ea
}
Packit 1470ea
Packit 1470ea
static void
Packit 1470ea
message_queue_source_finalize (GSource *source)
Packit 1470ea
{
Packit 1470ea
  MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
Packit 1470ea
Packit 1470ea
  g_async_queue_unref (message_queue_source->queue);
Packit 1470ea
}
Packit 1470ea
Packit 1470ea
static gboolean
Packit 1470ea
message_queue_source_closure_callback (gpointer message,
Packit 1470ea
                                       gpointer user_data)
Packit 1470ea
{
Packit 1470ea
  GClosure *closure = user_data;
Packit 1470ea
  GValue param_value = G_VALUE_INIT;
Packit 1470ea
  GValue result_value = G_VALUE_INIT;
Packit 1470ea
  gboolean retval;
Packit 1470ea
Packit 1470ea
  /* The invoked function is responsible for freeing @message. */
Packit 1470ea
  g_value_init (&result_value, G_TYPE_BOOLEAN);
Packit 1470ea
  g_value_init (&param_value, G_TYPE_POINTER);
Packit 1470ea
  g_value_set_pointer (&param_value, message);
Packit 1470ea
Packit 1470ea
  g_closure_invoke (closure, &result_value, 1, &param_value, NULL);
Packit 1470ea
  retval = g_value_get_boolean (&result_value);
Packit 1470ea
Packit 1470ea
  g_value_unset (&param_value);
Packit 1470ea
  g_value_unset (&result_value);
Packit 1470ea
Packit 1470ea
  return retval;
Packit 1470ea
}
Packit 1470ea
Packit 1470ea
static GSourceFuncs message_queue_source_funcs =
Packit 1470ea
  {
Packit 1470ea
    message_queue_source_prepare,
Packit 1470ea
    NULL,  /* check */
Packit 1470ea
    message_queue_source_dispatch,
Packit 1470ea
    message_queue_source_finalize,
Packit 1470ea
    (GSourceFunc) message_queue_source_closure_callback,
Packit 1470ea
    NULL,
Packit 1470ea
  };
Packit 1470ea
Packit 1470ea
/**
Packit 1470ea
 * message_queue_source_new:
Packit 1470ea
 * @queue: the queue to check
Packit 1470ea
 * @destroy_message: (nullable): function to free a message, or %NULL
Packit 1470ea
 * @cancellable: (nullable): a #GCancellable, or %NULL
Packit 1470ea
 *
Packit 1470ea
 * Create a new #MessageQueueSource, a type of #GSource which dispatches for
Packit 1470ea
 * each message queued to it.
Packit 1470ea
 *
Packit 1470ea
 * If a callback function of type #MessageQueueSourceFunc is connected to the
Packit 1470ea
 * returned #GSource using g_source_set_callback(), it will be invoked for each
Packit 1470ea
 * message, with the message passed as its first argument. It is responsible for
Packit 1470ea
 * freeing the message. If no callback is set, messages are automatically freed
Packit 1470ea
 * as they are queued.
Packit 1470ea
 *
Packit 1470ea
 * Returns: (transfer full): a new #MessageQueueSource
Packit 1470ea
 */
Packit 1470ea
GSource *
Packit 1470ea
message_queue_source_new (GAsyncQueue    *queue,
Packit 1470ea
                          GDestroyNotify  destroy_message,
Packit 1470ea
                          GCancellable   *cancellable)
Packit 1470ea
{
Packit 1470ea
  GSource *source;  /* alias of @message_queue_source */
Packit 1470ea
  MessageQueueSource *message_queue_source;  /* alias of @source */
Packit 1470ea
Packit 1470ea
  g_return_val_if_fail (queue != NULL, NULL);
Packit 1470ea
  g_return_val_if_fail (cancellable == NULL ||
Packit 1470ea
                        G_IS_CANCELLABLE (cancellable), NULL);
Packit 1470ea
Packit 1470ea
  source = g_source_new (&message_queue_source_funcs,
Packit 1470ea
                         sizeof (MessageQueueSource));
Packit 1470ea
  message_queue_source = (MessageQueueSource *) source;
Packit 1470ea
Packit 1470ea
  /* The caller can overwrite this name with something more useful later. */
Packit 1470ea
  g_source_set_name (source, "MessageQueueSource");
Packit 1470ea
Packit 1470ea
  message_queue_source->queue = g_async_queue_ref (queue);
Packit 1470ea
  message_queue_source->destroy_message = destroy_message;
Packit 1470ea
Packit 1470ea
  /* Add a cancellable source. */
Packit 1470ea
  if (cancellable != NULL)
Packit 1470ea
    {
Packit 1470ea
      GSource *cancellable_source;
Packit 1470ea
Packit 1470ea
      cancellable_source = g_cancellable_source_new (cancellable);
Packit 1470ea
      g_source_set_dummy_callback (cancellable_source);
Packit 1470ea
      g_source_add_child_source (source, cancellable_source);
Packit 1470ea
      g_source_unref (cancellable_source);
Packit 1470ea
    }
Packit 1470ea
Packit 1470ea
  return source;
Packit 1470ea
}