/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
/*
* soup-message-queue.c: Message queue
*
* Copyright (C) 2003 Novell, Inc.
* Copyright (C) 2008 Red Hat, Inc.
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "soup-message-queue.h"
#include "soup.h"
/* This is an internal structure used by #SoupSession and its
* subclasses to keep track of the status of messages currently being
* processed.
*
* The #SoupMessageQueue itself is mostly just a linked list of
* #SoupMessageQueueItem, with some added cleverness to allow the list
* to be walked safely while other threads / re-entrant loops are
* adding items to and removing items from it. In particular, this is
* handled by refcounting items and then keeping "removed" items in
* the list until their ref_count drops to 0, but skipping over the
* "removed" ones when walking the queue.
**/
struct _SoupMessageQueue {
SoupSession *session;
GMutex mutex;
SoupMessageQueueItem *head, *tail;
};
SoupMessageQueue *
soup_message_queue_new (SoupSession *session)
{
SoupMessageQueue *queue;
queue = g_slice_new0 (SoupMessageQueue);
queue->session = session;
g_mutex_init (&queue->mutex);
return queue;
}
void
soup_message_queue_destroy (SoupMessageQueue *queue)
{
g_return_if_fail (queue->head == NULL);
g_mutex_clear (&queue->mutex);
g_slice_free (SoupMessageQueue, queue);
}
static void
queue_message_restarted (SoupMessage *msg, gpointer user_data)
{
SoupMessageQueueItem *item = user_data;
g_cancellable_reset (item->cancellable);
}
/**
* soup_message_queue_append:
* @queue: a #SoupMessageQueue
* @msg: a #SoupMessage
* @callback: the callback for @msg
* @user_data: the data to pass to @callback
*
* Creates a new #SoupMessageQueueItem and appends it to @queue.
*
* Return value: the new item, which you must unref with
* soup_message_queue_unref_item() when you are done with.
**/
SoupMessageQueueItem *
soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
SoupSessionCallback callback, gpointer user_data)
{
SoupMessageQueueItem *item;
item = g_slice_new0 (SoupMessageQueueItem);
item->session = g_object_ref (queue->session);
item->async_context = soup_session_get_async_context (item->session);
if (item->async_context)
g_main_context_ref (item->async_context);
item->queue = queue;
item->msg = g_object_ref (msg);
item->callback = callback;
item->callback_data = user_data;
item->cancellable = g_cancellable_new ();
item->priority = soup_message_get_priority (msg);
g_signal_connect (msg, "restarted",
G_CALLBACK (queue_message_restarted), item);
/* Note: the initial ref_count of 1 represents the caller's
* ref; the queue's own ref is indicated by the absence of the
* "removed" flag.
*/
item->ref_count = 1;
g_mutex_lock (&queue->mutex);
if (queue->head) {
SoupMessageQueueItem *it = queue->head;
while (it && it->priority >= item->priority)
it = it->next;
if (!it) {
if (queue->tail) {
queue->tail->next = item;
item->prev = queue->tail;
} else
queue->head = item;
queue->tail = item;
} else {
if (it != queue->head)
it->prev->next = item;
else
queue->head = item;
item->prev = it->prev;
it->prev = item;
item->next = it;
}
} else
queue->head = queue->tail = item;
g_mutex_unlock (&queue->mutex);
return item;
}
/**
* soup_message_queue_item_ref:
* @item: a #SoupMessageQueueItem
*
* Refs @item.
**/
void
soup_message_queue_item_ref (SoupMessageQueueItem *item)
{
g_mutex_lock (&item->queue->mutex);
item->ref_count++;
g_mutex_unlock (&item->queue->mutex);
}
/**
* soup_message_queue_item_unref:
* @item: a #SoupMessageQueueItem
*
* Unrefs @item; use this on a #SoupMessageQueueItem that you are done
* with (but that you aren't passing to
* soup_message_queue_item_next()).
**/
void
soup_message_queue_item_unref (SoupMessageQueueItem *item)
{
g_mutex_lock (&item->queue->mutex);
/* Decrement the ref_count; if it's still non-zero OR if the
* item is still in the queue, then return.
*/
if (--item->ref_count || !item->removed) {
g_mutex_unlock (&item->queue->mutex);
return;
}
g_warn_if_fail (item->conn == NULL);
/* OK, @item is dead. Rewrite @queue around it */
if (item->prev)
item->prev->next = item->next;
else
item->queue->head = item->next;
if (item->next)
item->next->prev = item->prev;
else
item->queue->tail = item->prev;
g_mutex_unlock (&item->queue->mutex);
/* And free it */
g_signal_handlers_disconnect_by_func (item->msg,
queue_message_restarted, item);
g_object_unref (item->session);
g_object_unref (item->msg);
g_object_unref (item->cancellable);
g_clear_error (&item->error);
g_clear_object (&item->task);
g_clear_pointer (&item->async_context, g_main_context_unref);
if (item->io_source) {
g_source_destroy (item->io_source);
g_source_unref (item->io_source);
}
g_slice_free (SoupMessageQueueItem, item);
}
/**
* soup_message_queue_lookup:
* @queue: a #SoupMessageQueue
* @msg: a #SoupMessage
*
* Finds the #SoupMessageQueueItem for @msg in @queue. You must unref
* the item with soup_message_queue_unref_item() when you are done
* with it.
*
* Return value: (nullable): the queue item for @msg, or %NULL
**/
SoupMessageQueueItem *
soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg)
{
SoupMessageQueueItem *item;
g_mutex_lock (&queue->mutex);
item = queue->tail;
while (item && (item->removed || item->msg != msg))
item = item->prev;
if (item)
item->ref_count++;
g_mutex_unlock (&queue->mutex);
return item;
}
/**
* soup_message_queue_first:
* @queue: a #SoupMessageQueue
*
* Gets the first item in @queue. You must unref the item by calling
* soup_message_queue_unref_item() on it when you are done.
* (soup_message_queue_next() does this for you automatically, so you
* only need to unref the item yourself if you are not going to
* finishing walking the queue.)
*
* Return value: the first item in @queue.
**/
SoupMessageQueueItem *
soup_message_queue_first (SoupMessageQueue *queue)
{
SoupMessageQueueItem *item;
g_mutex_lock (&queue->mutex);
item = queue->head;
while (item && item->removed)
item = item->next;
if (item)
item->ref_count++;
g_mutex_unlock (&queue->mutex);
return item;
}
/**
* soup_message_queue_next:
* @queue: a #SoupMessageQueue
* @item: a #SoupMessageQueueItem
*
* Unrefs @item and gets the next item after it in @queue. As with
* soup_message_queue_first(), you must unref the returned item
* yourself with soup_message_queue_unref_item() if you do not finish
* walking the queue.
*
* Return value: the next item in @queue.
**/
SoupMessageQueueItem *
soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueItem *item)
{
SoupMessageQueueItem *next;
g_mutex_lock (&queue->mutex);
next = item->next;
while (next && next->removed)
next = next->next;
if (next)
next->ref_count++;
g_mutex_unlock (&queue->mutex);
soup_message_queue_item_unref (item);
return next;
}
/**
* soup_message_queue_remove:
* @queue: a #SoupMessageQueue
* @item: a #SoupMessageQueueItem
*
* Removes @item from @queue. Note that you probably also need to call
* soup_message_queue_unref_item() after this.
**/
void
soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueItem *item)
{
g_return_if_fail (!item->removed);
g_mutex_lock (&queue->mutex);
item->removed = TRUE;
g_mutex_unlock (&queue->mutex);
}