Blame libdleyna/core/task-processor.c

Packit Service f7c0e4
/*
Packit Service f7c0e4
 * dLeyna
Packit Service f7c0e4
 *
Packit Service f7c0e4
 * Copyright (C) 2012-2017 Intel Corporation. All rights reserved.
Packit Service f7c0e4
 *
Packit Service f7c0e4
 * This program is free software; you can redistribute it and/or modify it
Packit Service f7c0e4
 * under the terms and conditions of the GNU Lesser General Public License,
Packit Service f7c0e4
 * version 2.1, as published by the Free Software Foundation.
Packit Service f7c0e4
 *
Packit Service f7c0e4
 * This program is distributed in the hope it will be useful, but WITHOUT
Packit Service f7c0e4
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
Packit Service f7c0e4
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
Packit Service f7c0e4
 * for more details.
Packit Service f7c0e4
 *
Packit Service f7c0e4
 * You should have received a copy of the GNU Lesser General Public License
Packit Service f7c0e4
 * along with this program; if not, write to the Free Software Foundation, Inc.,
Packit Service f7c0e4
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
Packit Service f7c0e4
 *
Packit Service f7c0e4
 * Regis Merlino <regis.merlino@intel.com>
Packit Service f7c0e4
 *
Packit Service f7c0e4
 */
Packit Service f7c0e4
Packit Service f7c0e4
#include <stdlib.h>
Packit Service f7c0e4
#include <string.h>
Packit Service f7c0e4
Packit Service f7c0e4
#include "log.h"
Packit Service f7c0e4
#include "task-processor.h"
Packit Service f7c0e4
Packit Service f7c0e4
struct dleyna_task_processor_t_ {
Packit Service f7c0e4
	GHashTable *task_queues;
Packit Service f7c0e4
	guint running_tasks;
Packit Service f7c0e4
	gboolean quitting;
Packit Service f7c0e4
	GSourceFunc on_quit_cb;
Packit Service f7c0e4
};
Packit Service f7c0e4
Packit Service f7c0e4
struct dleyna_task_queue_t_ {
Packit Service f7c0e4
	GPtrArray *tasks;
Packit Service f7c0e4
	dleyna_task_process_cb_t task_process_cb;
Packit Service f7c0e4
	dleyna_task_cancel_cb_t task_cancel_cb;
Packit Service f7c0e4
	dleyna_task_delete_cb_t task_delete_cb;
Packit Service f7c0e4
	dleyna_task_finally_cb_t task_queue_finally_cb;
Packit Service f7c0e4
	dleyna_task_atom_t *current_task;
Packit Service f7c0e4
	guint idle_id;
Packit Service f7c0e4
	gboolean defer_remove;
Packit Service f7c0e4
	guint32 flags;
Packit Service f7c0e4
	gpointer user_data;
Packit Service f7c0e4
	gboolean cancelled;
Packit Service f7c0e4
};
Packit Service f7c0e4
typedef struct dleyna_task_queue_t_ dleyna_task_queue_t;
Packit Service f7c0e4
Packit Service f7c0e4
struct dleyna_task_queue_key_t_ {
Packit Service f7c0e4
	dleyna_task_processor_t *processor;
Packit Service f7c0e4
	gchar *source;
Packit Service f7c0e4
	gchar *sink;
Packit Service f7c0e4
};
Packit Service f7c0e4
Packit Service f7c0e4
static guint prv_key_hash_cb(gconstpointer ptr)
Packit Service f7c0e4
{
Packit Service f7c0e4
	const dleyna_task_queue_key_t *queue_key = ptr;
Packit Service f7c0e4
	guint hash;
Packit Service f7c0e4
Packit Service f7c0e4
	hash = g_str_hash(queue_key->source);
Packit Service f7c0e4
	hash ^= g_str_hash(queue_key->sink);
Packit Service f7c0e4
Packit Service f7c0e4
	return hash;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static gboolean prv_key_equal_cb(gconstpointer ptr1, gconstpointer ptr2)
Packit Service f7c0e4
{
Packit Service f7c0e4
	const dleyna_task_queue_key_t *queue_key1 = ptr1;
Packit Service f7c0e4
	const dleyna_task_queue_key_t *queue_key2 = ptr2;
Packit Service f7c0e4
Packit Service f7c0e4
	return !strcmp(queue_key1->source, queue_key2->source) &&
Packit Service f7c0e4
	       !strcmp(queue_key1->sink, queue_key2->sink);
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static void prv_key_free_cb(gpointer ptr)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_key_t *queue_key = ptr;
Packit Service f7c0e4
Packit Service f7c0e4
	g_free(queue_key->source);
Packit Service f7c0e4
	g_free(queue_key->sink);
Packit Service f7c0e4
	g_free(queue_key);
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static void prv_task_free_cb(gpointer data, gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *task_queue = user_data;
Packit Service f7c0e4
Packit Service f7c0e4
	task_queue->task_delete_cb(data, task_queue->user_data);
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static void prv_free_cb(gpointer data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *task_queue = data;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter");
Packit Service f7c0e4
Packit Service a4cd49
	if (task_queue->idle_id) {
Packit Service a4cd49
		g_source_remove(task_queue->idle_id);
Packit Service a4cd49
		task_queue->idle_id = 0;
Packit Service a4cd49
	}
Packit Service a4cd49
Packit Service f7c0e4
	g_ptr_array_foreach(task_queue->tasks, prv_task_free_cb, task_queue);
Packit Service f7c0e4
	g_ptr_array_unref(task_queue->tasks);
Packit Service f7c0e4
Packit Service f7c0e4
	if (task_queue->task_queue_finally_cb)
Packit Service f7c0e4
		task_queue->task_queue_finally_cb(task_queue->cancelled,
Packit Service f7c0e4
						  task_queue->user_data);
Packit Service f7c0e4
Packit Service f7c0e4
	g_free(task_queue);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
dleyna_task_processor_t *dleyna_task_processor_new(GSourceFunc on_quit_cb)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_processor_t *processor;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter");
Packit Service f7c0e4
Packit Service f7c0e4
	processor = g_malloc(sizeof(*processor));
Packit Service f7c0e4
Packit Service f7c0e4
	processor->task_queues = g_hash_table_new_full(prv_key_hash_cb,
Packit Service f7c0e4
						       prv_key_equal_cb,
Packit Service f7c0e4
						       prv_key_free_cb,
Packit Service f7c0e4
						       prv_free_cb);
Packit Service f7c0e4
	processor->running_tasks = 0;
Packit Service f7c0e4
	processor->quitting = FALSE;
Packit Service f7c0e4
	processor->on_quit_cb = on_quit_cb;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
Packit Service f7c0e4
	return processor;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_processor_free(dleyna_task_processor_t *processor)
Packit Service f7c0e4
{
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter");
Packit Service f7c0e4
Packit Service f7c0e4
	g_hash_table_unref(processor->task_queues);
Packit Service f7c0e4
	g_free(processor);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
const dleyna_task_queue_key_t *dleyna_task_processor_add_queue(
Packit Service f7c0e4
				dleyna_task_processor_t *processor,
Packit Service f7c0e4
				const gchar *source,
Packit Service f7c0e4
				const gchar *sink,
Packit Service f7c0e4
				guint32 flags,
Packit Service f7c0e4
				dleyna_task_process_cb_t task_process_cb,
Packit Service f7c0e4
				dleyna_task_cancel_cb_t task_cancel_cb,
Packit Service f7c0e4
				dleyna_task_delete_cb_t task_delete_cb)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
	dleyna_task_queue_key_t *key;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - queue <%s,%s>", source, sink);
Packit Service f7c0e4
Packit Service f7c0e4
	key = g_malloc(sizeof(*key));
Packit Service f7c0e4
	key->processor = processor;
Packit Service f7c0e4
	key->source = g_strdup(source);
Packit Service f7c0e4
	key->sink = g_strdup(sink);
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_malloc0(sizeof(*queue));
Packit Service f7c0e4
	queue->task_process_cb = task_process_cb;
Packit Service f7c0e4
	queue->task_cancel_cb = task_cancel_cb;
Packit Service f7c0e4
	queue->task_delete_cb = task_delete_cb;
Packit Service f7c0e4
	queue->tasks = g_ptr_array_new();
Packit Service f7c0e4
	queue->flags = flags;
Packit Service f7c0e4
Packit Service f7c0e4
	g_hash_table_insert(processor->task_queues, key, queue);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
Packit Service f7c0e4
	return key;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static void prv_task_cancel_and_free_cb(gpointer data, gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *task_queue = user_data;
Packit Service f7c0e4
Packit Service f7c0e4
	task_queue->task_cancel_cb(data, task_queue->user_data);
Packit Service f7c0e4
	task_queue->task_delete_cb(data, task_queue->user_data);
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static gboolean prv_cancel_only(const dleyna_task_queue_key_t *queue_id,
Packit Service f7c0e4
				dleyna_task_queue_t *task_queue)
Packit Service f7c0e4
{
Packit Service f7c0e4
	gboolean remove_queue = FALSE;
Packit Service f7c0e4
Packit Service f7c0e4
	if (task_queue->cancelled)
Packit Service f7c0e4
		goto out;
Packit Service f7c0e4
Packit Service f7c0e4
	task_queue->cancelled = TRUE;
Packit Service f7c0e4
Packit Service f7c0e4
	g_ptr_array_foreach(task_queue->tasks, prv_task_cancel_and_free_cb,
Packit Service f7c0e4
			    task_queue);
Packit Service f7c0e4
	g_ptr_array_set_size(task_queue->tasks, 0);
Packit Service f7c0e4
Packit Service f7c0e4
	if (task_queue->idle_id) {
Packit Service f7c0e4
		(void) g_source_remove(task_queue->idle_id);
Packit Service f7c0e4
		task_queue->idle_id = 0;
Packit Service f7c0e4
	}
Packit Service f7c0e4
Packit Service f7c0e4
	if (task_queue->current_task)
Packit Service f7c0e4
		task_queue->task_cancel_cb(task_queue->current_task,
Packit Service f7c0e4
					   task_queue->user_data);
Packit Service f7c0e4
	else
Packit Service f7c0e4
		remove_queue = task_queue->flags &
Packit Service f7c0e4
			DLEYNA_TASK_QUEUE_FLAG_AUTO_REMOVE;
Packit Service f7c0e4
Packit Service f7c0e4
out:
Packit Service f7c0e4
Packit Service f7c0e4
	return remove_queue;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
Packit Service f7c0e4
static void prv_cancel(const dleyna_task_queue_key_t *queue_id,
Packit Service f7c0e4
		       dleyna_task_queue_t *task_queue)
Packit Service f7c0e4
{
Packit Service f7c0e4
	if (prv_cancel_only(queue_id, task_queue)) {
Packit Service f7c0e4
		DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
Packit Service f7c0e4
				 queue_id->source, queue_id->sink);
Packit Service f7c0e4
		g_hash_table_remove(queue_id->processor->task_queues, queue_id);
Packit Service f7c0e4
	}
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static gboolean prv_cancel_cb(gpointer key, gpointer value, gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_key_t *queue_id = key;
Packit Service f7c0e4
	dleyna_task_queue_t *task_queue = value;
Packit Service f7c0e4
	gboolean retval = prv_cancel_only(queue_id, task_queue);
Packit Service f7c0e4
Packit Service f7c0e4
#if DLEYNA_LOG_LEVEL & DLEYNA_LOG_LEVEL_DEBUG
Packit Service f7c0e4
	if (retval)
Packit Service f7c0e4
		DLEYNA_LOG_DEBUG("Removing queue <%s,%s>", queue_id->source,
Packit Service f7c0e4
				 queue_id->sink);
Packit Service f7c0e4
#endif
Packit Service f7c0e4
Packit Service f7c0e4
	return retval;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static void prv_cancel_all_queues(dleyna_task_processor_t *processor)
Packit Service f7c0e4
{
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter");
Packit Service f7c0e4
Packit Service f7c0e4
	g_hash_table_foreach_remove(processor->task_queues, prv_cancel_cb,
Packit Service f7c0e4
				    NULL);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_processor_set_quitting(dleyna_task_processor_t *processor)
Packit Service f7c0e4
{
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter");
Packit Service f7c0e4
Packit Service f7c0e4
	processor->quitting = TRUE;
Packit Service a4cd49
	prv_cancel_all_queues(processor);
Packit Service f7c0e4
Packit Service a4cd49
	if (processor->running_tasks == 0) {
Packit Service f7c0e4
		g_idle_add(processor->on_quit_cb, NULL);
Packit Service a4cd49
		g_hash_table_remove_all(processor->task_queues);
Packit Service a4cd49
	}
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_processor_cancel_queue(const dleyna_task_queue_key_t *queue_id)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Cancel queue <%s,%s>", queue_id->source,
Packit Service f7c0e4
			 queue_id->sink);
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(queue_id->processor->task_queues, queue_id);
Packit Service f7c0e4
	prv_cancel(queue_id, queue);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static gboolean prv_free_queue_for_source(gpointer key, gpointer value,
Packit Service f7c0e4
					  gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_key_t *queue_key = key;
Packit Service f7c0e4
	dleyna_task_queue_t *queue = value;
Packit Service f7c0e4
	const gchar *source = user_data;
Packit Service f7c0e4
	gboolean ret_val = FALSE;
Packit Service f7c0e4
Packit Service f7c0e4
	if (!strcmp(source, queue_key->source) && !queue->defer_remove) {
Packit Service f7c0e4
		queue->defer_remove = (queue->current_task != NULL);
Packit Service 01c219
		prv_cancel_only(queue_key, queue);
Packit Service f7c0e4
Packit Service f7c0e4
		if (!queue->defer_remove) {
Packit Service f7c0e4
			DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
Packit Service f7c0e4
					 queue_key->source, queue_key->sink);
Packit Service f7c0e4
			ret_val = TRUE;
Packit Service f7c0e4
		}
Packit Service f7c0e4
	}
Packit Service f7c0e4
Packit Service f7c0e4
	return ret_val;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_processor_remove_queues_for_source(
Packit Service f7c0e4
					dleyna_task_processor_t *processor,
Packit Service f7c0e4
					const gchar *source)
Packit Service f7c0e4
{
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - Source <%s>", source);
Packit Service f7c0e4
Packit Service f7c0e4
	g_hash_table_foreach_remove(processor->task_queues,
Packit Service f7c0e4
				    prv_free_queue_for_source,
Packit Service f7c0e4
				    (gpointer)source);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static gboolean prv_free_queue_for_sink(gpointer key, gpointer value,
Packit Service f7c0e4
					gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_key_t *queue_key = key;
Packit Service f7c0e4
	dleyna_task_queue_t *queue = value;
Packit Service f7c0e4
	const gchar *sink = user_data;
Packit Service f7c0e4
	gboolean ret_val = FALSE;
Packit Service f7c0e4
Packit Service f7c0e4
	if (!strcmp(sink, queue_key->sink) && !queue->defer_remove) {
Packit Service f7c0e4
		queue->defer_remove = (queue->current_task != NULL);
Packit Service 01c219
		prv_cancel_only(queue_key, queue);
Packit Service f7c0e4
Packit Service f7c0e4
		if (!queue->defer_remove) {
Packit Service f7c0e4
			DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
Packit Service f7c0e4
					 queue_key->source, queue_key->sink);
Packit Service f7c0e4
			ret_val = TRUE;
Packit Service f7c0e4
		}
Packit Service f7c0e4
	}
Packit Service f7c0e4
Packit Service f7c0e4
	return ret_val;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_processor_remove_queues_for_sink(
Packit Service f7c0e4
					dleyna_task_processor_t *processor,
Packit Service f7c0e4
					const gchar *sink)
Packit Service f7c0e4
{
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - Sink <%s>", sink);
Packit Service f7c0e4
Packit Service f7c0e4
	g_hash_table_foreach_remove(processor->task_queues,
Packit Service f7c0e4
				    prv_free_queue_for_sink,
Packit Service f7c0e4
				    (gpointer)sink);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
const dleyna_task_queue_key_t *dleyna_task_processor_lookup_queue(
Packit Service f7c0e4
				const dleyna_task_processor_t *processor,
Packit Service f7c0e4
				const gchar *source,
Packit Service f7c0e4
				const gchar *sink)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_key_t key;
Packit Service f7c0e4
	dleyna_task_queue_key_t *orig_key = NULL;
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
Packit Service f7c0e4
	key.source = (gchar *)source;
Packit Service f7c0e4
	key.sink = (gchar *)sink;
Packit Service f7c0e4
Packit Service f7c0e4
	g_hash_table_lookup_extended(processor->task_queues,
Packit Service f7c0e4
				     &key,
Packit Service f7c0e4
				     (gpointer *)&orig_key,
Packit Service f7c0e4
				     (gpointer *)&queue);
Packit Service f7c0e4
Packit Service f7c0e4
	return orig_key;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
static gboolean prv_process_task(gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_key_t *queue_id = user_data;
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - Start task processing for queue <%s,%s>",
Packit Service f7c0e4
			 queue_id->source, queue_id->sink);
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(queue_id->processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	queue->cancelled = FALSE;
Packit Service f7c0e4
	queue->idle_id = 0;
Packit Service f7c0e4
	queue->current_task = g_ptr_array_index(queue->tasks, 0);
Packit Service f7c0e4
	g_ptr_array_remove_index(queue->tasks, 0);
Packit Service f7c0e4
	queue_id->processor->running_tasks++;
Packit Service f7c0e4
	queue->task_process_cb(queue->current_task, queue->user_data);
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
Packit Service f7c0e4
	return FALSE;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_queue_start(const dleyna_task_queue_key_t *queue_id)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
	dleyna_task_processor_t *processor = queue_id->processor;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - Starting queue <%s,%s>", queue_id->source,
Packit Service f7c0e4
			 queue_id->sink);
Packit Service f7c0e4
Packit Service f7c0e4
	if (processor->quitting)
Packit Service f7c0e4
		goto exit;
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	if (queue->defer_remove)
Packit Service f7c0e4
		goto exit;
Packit Service f7c0e4
Packit Service f7c0e4
	if (queue->tasks->len > 0) {
Packit Service f7c0e4
		if (!queue->current_task && !queue->idle_id)
Packit Service f7c0e4
			queue->idle_id = g_idle_add(prv_process_task,
Packit Service f7c0e4
						    (gpointer)queue_id);
Packit Service f7c0e4
	} else if (queue->flags & DLEYNA_TASK_QUEUE_FLAG_AUTO_REMOVE) {
Packit Service f7c0e4
			DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
Packit Service f7c0e4
					 queue_id->source, queue_id->sink);
Packit Service f7c0e4
			g_hash_table_remove(processor->task_queues, queue_id);
Packit Service f7c0e4
	}
Packit Service f7c0e4
Packit Service f7c0e4
exit:
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_queue_add_task(const dleyna_task_queue_key_t *queue_id,
Packit Service f7c0e4
				dleyna_task_atom_t *task)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - Task added to queue <%s,%s>",
Packit Service f7c0e4
			 queue_id->source, queue_id->sink);
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(queue_id->processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	task->queue_id = queue_id;
Packit Service f7c0e4
	g_ptr_array_add(queue->tasks, task);
Packit Service f7c0e4
Packit Service f7c0e4
	if (queue->defer_remove)
Packit Service f7c0e4
		goto exit;
Packit Service f7c0e4
Packit Service f7c0e4
	if ((queue->flags & DLEYNA_TASK_QUEUE_FLAG_AUTO_START) &&
Packit Service f7c0e4
	    (!queue->current_task && !queue->idle_id))
Packit Service f7c0e4
		queue->idle_id = g_idle_add(prv_process_task,
Packit Service f7c0e4
					    (gpointer)queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
exit:
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_queue_task_completed(const dleyna_task_queue_key_t *queue_id)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
	dleyna_task_processor_t *processor = queue_id->processor;
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Enter - Task completed for queue <%s,%s>",
Packit Service f7c0e4
			 queue_id->source, queue_id->sink);
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	if (queue->current_task) {
Packit Service f7c0e4
		queue->task_delete_cb(queue->current_task, queue->user_data);
Packit Service f7c0e4
		queue->current_task = NULL;
Packit Service f7c0e4
	}
Packit Service f7c0e4
Packit Service f7c0e4
	processor->running_tasks--;
Packit Service f7c0e4
Packit Service f7c0e4
	if (processor->quitting && !processor->running_tasks) {
Packit Service f7c0e4
		g_idle_add(processor->on_quit_cb, NULL);
Packit Service f7c0e4
		g_hash_table_remove_all(processor->task_queues);
Packit Service f7c0e4
	} else if (queue->defer_remove) {
Packit Service f7c0e4
		DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
Packit Service f7c0e4
				 queue_id->source, queue_id->sink);
Packit Service f7c0e4
		g_hash_table_remove(processor->task_queues, queue_id);
Packit Service f7c0e4
	} else if (queue->tasks->len > 0) {
Packit Service f7c0e4
		queue->idle_id = g_idle_add(prv_process_task,
Packit Service f7c0e4
					    (gpointer)queue_id);
Packit Service f7c0e4
	} else if (queue->flags & DLEYNA_TASK_QUEUE_FLAG_AUTO_REMOVE) {
Packit Service f7c0e4
		DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
Packit Service f7c0e4
				 queue_id->source, queue_id->sink);
Packit Service f7c0e4
		g_hash_table_remove(processor->task_queues, queue_id);
Packit Service f7c0e4
	}
Packit Service f7c0e4
Packit Service f7c0e4
	DLEYNA_LOG_DEBUG("Exit");
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_queue_set_finally(const dleyna_task_queue_key_t *queue_id,
Packit Service f7c0e4
				   dleyna_task_finally_cb_t finally_cb)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
	dleyna_task_processor_t *processor = queue_id->processor;
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	queue->task_queue_finally_cb = finally_cb;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
void dleyna_task_queue_set_user_data(const dleyna_task_queue_key_t *queue_id,
Packit Service f7c0e4
				     gpointer user_data)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
	dleyna_task_processor_t *processor = queue_id->processor;
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	queue->user_data = user_data;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
gpointer dleyna_task_queue_get_user_data(
Packit Service f7c0e4
					const dleyna_task_queue_key_t *queue_id)
Packit Service f7c0e4
{
Packit Service f7c0e4
	dleyna_task_queue_t *queue;
Packit Service f7c0e4
	dleyna_task_processor_t *processor = queue_id->processor;
Packit Service f7c0e4
Packit Service f7c0e4
	queue = g_hash_table_lookup(processor->task_queues, queue_id);
Packit Service f7c0e4
Packit Service f7c0e4
	return queue->user_data;
Packit Service f7c0e4
}
Packit Service f7c0e4
Packit Service f7c0e4
const gchar *dleyna_task_queue_get_source(
Packit Service f7c0e4
					const dleyna_task_queue_key_t *queue_id)
Packit Service f7c0e4
{
Packit Service f7c0e4
	return queue_id->source;
Packit Service f7c0e4
}