/*
* dLeyna
*
* Copyright (C) 2012-2017 Intel Corporation. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU Lesser General Public License,
* version 2.1, as published by the Free Software Foundation.
*
* This program is distributed in the hope it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
*
* Regis Merlino <regis.merlino@intel.com>
*
*/
#include <stdlib.h>
#include <string.h>
#include "log.h"
#include "task-processor.h"
struct dleyna_task_processor_t_ {
GHashTable *task_queues;
guint running_tasks;
gboolean quitting;
GSourceFunc on_quit_cb;
};
struct dleyna_task_queue_t_ {
GPtrArray *tasks;
dleyna_task_process_cb_t task_process_cb;
dleyna_task_cancel_cb_t task_cancel_cb;
dleyna_task_delete_cb_t task_delete_cb;
dleyna_task_finally_cb_t task_queue_finally_cb;
dleyna_task_atom_t *current_task;
guint idle_id;
gboolean defer_remove;
guint32 flags;
gpointer user_data;
gboolean cancelled;
};
typedef struct dleyna_task_queue_t_ dleyna_task_queue_t;
struct dleyna_task_queue_key_t_ {
dleyna_task_processor_t *processor;
gchar *source;
gchar *sink;
};
static guint prv_key_hash_cb(gconstpointer ptr)
{
const dleyna_task_queue_key_t *queue_key = ptr;
guint hash;
hash = g_str_hash(queue_key->source);
hash ^= g_str_hash(queue_key->sink);
return hash;
}
static gboolean prv_key_equal_cb(gconstpointer ptr1, gconstpointer ptr2)
{
const dleyna_task_queue_key_t *queue_key1 = ptr1;
const dleyna_task_queue_key_t *queue_key2 = ptr2;
return !strcmp(queue_key1->source, queue_key2->source) &&
!strcmp(queue_key1->sink, queue_key2->sink);
}
static void prv_key_free_cb(gpointer ptr)
{
dleyna_task_queue_key_t *queue_key = ptr;
g_free(queue_key->source);
g_free(queue_key->sink);
g_free(queue_key);
}
static void prv_task_free_cb(gpointer data, gpointer user_data)
{
dleyna_task_queue_t *task_queue = user_data;
task_queue->task_delete_cb(data, task_queue->user_data);
}
static void prv_free_cb(gpointer data)
{
dleyna_task_queue_t *task_queue = data;
DLEYNA_LOG_DEBUG("Enter");
if (task_queue->idle_id) {
g_source_remove(task_queue->idle_id);
task_queue->idle_id = 0;
}
g_ptr_array_foreach(task_queue->tasks, prv_task_free_cb, task_queue);
g_ptr_array_unref(task_queue->tasks);
if (task_queue->task_queue_finally_cb)
task_queue->task_queue_finally_cb(task_queue->cancelled,
task_queue->user_data);
g_free(task_queue);
DLEYNA_LOG_DEBUG("Exit");
}
dleyna_task_processor_t *dleyna_task_processor_new(GSourceFunc on_quit_cb)
{
dleyna_task_processor_t *processor;
DLEYNA_LOG_DEBUG("Enter");
processor = g_malloc(sizeof(*processor));
processor->task_queues = g_hash_table_new_full(prv_key_hash_cb,
prv_key_equal_cb,
prv_key_free_cb,
prv_free_cb);
processor->running_tasks = 0;
processor->quitting = FALSE;
processor->on_quit_cb = on_quit_cb;
DLEYNA_LOG_DEBUG("Exit");
return processor;
}
void dleyna_task_processor_free(dleyna_task_processor_t *processor)
{
DLEYNA_LOG_DEBUG("Enter");
g_hash_table_unref(processor->task_queues);
g_free(processor);
DLEYNA_LOG_DEBUG("Exit");
}
const dleyna_task_queue_key_t *dleyna_task_processor_add_queue(
dleyna_task_processor_t *processor,
const gchar *source,
const gchar *sink,
guint32 flags,
dleyna_task_process_cb_t task_process_cb,
dleyna_task_cancel_cb_t task_cancel_cb,
dleyna_task_delete_cb_t task_delete_cb)
{
dleyna_task_queue_t *queue;
dleyna_task_queue_key_t *key;
DLEYNA_LOG_DEBUG("Enter - queue <%s,%s>", source, sink);
key = g_malloc(sizeof(*key));
key->processor = processor;
key->source = g_strdup(source);
key->sink = g_strdup(sink);
queue = g_malloc0(sizeof(*queue));
queue->task_process_cb = task_process_cb;
queue->task_cancel_cb = task_cancel_cb;
queue->task_delete_cb = task_delete_cb;
queue->tasks = g_ptr_array_new();
queue->flags = flags;
g_hash_table_insert(processor->task_queues, key, queue);
DLEYNA_LOG_DEBUG("Exit");
return key;
}
static void prv_task_cancel_and_free_cb(gpointer data, gpointer user_data)
{
dleyna_task_queue_t *task_queue = user_data;
task_queue->task_cancel_cb(data, task_queue->user_data);
task_queue->task_delete_cb(data, task_queue->user_data);
}
static gboolean prv_cancel_only(const dleyna_task_queue_key_t *queue_id,
dleyna_task_queue_t *task_queue)
{
gboolean remove_queue = FALSE;
if (task_queue->cancelled)
goto out;
task_queue->cancelled = TRUE;
g_ptr_array_foreach(task_queue->tasks, prv_task_cancel_and_free_cb,
task_queue);
g_ptr_array_set_size(task_queue->tasks, 0);
if (task_queue->idle_id) {
(void) g_source_remove(task_queue->idle_id);
task_queue->idle_id = 0;
}
if (task_queue->current_task)
task_queue->task_cancel_cb(task_queue->current_task,
task_queue->user_data);
else
remove_queue = task_queue->flags &
DLEYNA_TASK_QUEUE_FLAG_AUTO_REMOVE;
out:
return remove_queue;
}
static void prv_cancel(const dleyna_task_queue_key_t *queue_id,
dleyna_task_queue_t *task_queue)
{
if (prv_cancel_only(queue_id, task_queue)) {
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
queue_id->source, queue_id->sink);
g_hash_table_remove(queue_id->processor->task_queues, queue_id);
}
}
static gboolean prv_cancel_cb(gpointer key, gpointer value, gpointer user_data)
{
dleyna_task_queue_key_t *queue_id = key;
dleyna_task_queue_t *task_queue = value;
gboolean retval = prv_cancel_only(queue_id, task_queue);
#if DLEYNA_LOG_LEVEL & DLEYNA_LOG_LEVEL_DEBUG
if (retval)
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>", queue_id->source,
queue_id->sink);
#endif
return retval;
}
static void prv_cancel_all_queues(dleyna_task_processor_t *processor)
{
DLEYNA_LOG_DEBUG("Enter");
g_hash_table_foreach_remove(processor->task_queues, prv_cancel_cb,
NULL);
DLEYNA_LOG_DEBUG("Exit");
}
void dleyna_task_processor_set_quitting(dleyna_task_processor_t *processor)
{
DLEYNA_LOG_DEBUG("Enter");
processor->quitting = TRUE;
prv_cancel_all_queues(processor);
if (processor->running_tasks == 0) {
g_idle_add(processor->on_quit_cb, NULL);
g_hash_table_remove_all(processor->task_queues);
}
DLEYNA_LOG_DEBUG("Exit");
}
void dleyna_task_processor_cancel_queue(const dleyna_task_queue_key_t *queue_id)
{
dleyna_task_queue_t *queue;
DLEYNA_LOG_DEBUG("Cancel queue <%s,%s>", queue_id->source,
queue_id->sink);
queue = g_hash_table_lookup(queue_id->processor->task_queues, queue_id);
prv_cancel(queue_id, queue);
DLEYNA_LOG_DEBUG("Exit");
}
static gboolean prv_free_queue_for_source(gpointer key, gpointer value,
gpointer user_data)
{
dleyna_task_queue_key_t *queue_key = key;
dleyna_task_queue_t *queue = value;
const gchar *source = user_data;
gboolean ret_val = FALSE;
if (!strcmp(source, queue_key->source) && !queue->defer_remove) {
queue->defer_remove = (queue->current_task != NULL);
prv_cancel_only(queue_key, queue);
if (!queue->defer_remove) {
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
queue_key->source, queue_key->sink);
ret_val = TRUE;
}
}
return ret_val;
}
void dleyna_task_processor_remove_queues_for_source(
dleyna_task_processor_t *processor,
const gchar *source)
{
DLEYNA_LOG_DEBUG("Enter - Source <%s>", source);
g_hash_table_foreach_remove(processor->task_queues,
prv_free_queue_for_source,
(gpointer)source);
DLEYNA_LOG_DEBUG("Exit");
}
static gboolean prv_free_queue_for_sink(gpointer key, gpointer value,
gpointer user_data)
{
dleyna_task_queue_key_t *queue_key = key;
dleyna_task_queue_t *queue = value;
const gchar *sink = user_data;
gboolean ret_val = FALSE;
if (!strcmp(sink, queue_key->sink) && !queue->defer_remove) {
queue->defer_remove = (queue->current_task != NULL);
prv_cancel_only(queue_key, queue);
if (!queue->defer_remove) {
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
queue_key->source, queue_key->sink);
ret_val = TRUE;
}
}
return ret_val;
}
void dleyna_task_processor_remove_queues_for_sink(
dleyna_task_processor_t *processor,
const gchar *sink)
{
DLEYNA_LOG_DEBUG("Enter - Sink <%s>", sink);
g_hash_table_foreach_remove(processor->task_queues,
prv_free_queue_for_sink,
(gpointer)sink);
DLEYNA_LOG_DEBUG("Exit");
}
const dleyna_task_queue_key_t *dleyna_task_processor_lookup_queue(
const dleyna_task_processor_t *processor,
const gchar *source,
const gchar *sink)
{
dleyna_task_queue_key_t key;
dleyna_task_queue_key_t *orig_key = NULL;
dleyna_task_queue_t *queue;
key.source = (gchar *)source;
key.sink = (gchar *)sink;
g_hash_table_lookup_extended(processor->task_queues,
&key,
(gpointer *)&orig_key,
(gpointer *)&queue);
return orig_key;
}
static gboolean prv_process_task(gpointer user_data)
{
dleyna_task_queue_key_t *queue_id = user_data;
dleyna_task_queue_t *queue;
DLEYNA_LOG_DEBUG("Enter - Start task processing for queue <%s,%s>",
queue_id->source, queue_id->sink);
queue = g_hash_table_lookup(queue_id->processor->task_queues, queue_id);
queue->cancelled = FALSE;
queue->idle_id = 0;
queue->current_task = g_ptr_array_index(queue->tasks, 0);
g_ptr_array_remove_index(queue->tasks, 0);
queue_id->processor->running_tasks++;
queue->task_process_cb(queue->current_task, queue->user_data);
DLEYNA_LOG_DEBUG("Exit");
return FALSE;
}
void dleyna_task_queue_start(const dleyna_task_queue_key_t *queue_id)
{
dleyna_task_queue_t *queue;
dleyna_task_processor_t *processor = queue_id->processor;
DLEYNA_LOG_DEBUG("Enter - Starting queue <%s,%s>", queue_id->source,
queue_id->sink);
if (processor->quitting)
goto exit;
queue = g_hash_table_lookup(processor->task_queues, queue_id);
if (queue->defer_remove)
goto exit;
if (queue->tasks->len > 0) {
if (!queue->current_task && !queue->idle_id)
queue->idle_id = g_idle_add(prv_process_task,
(gpointer)queue_id);
} else if (queue->flags & DLEYNA_TASK_QUEUE_FLAG_AUTO_REMOVE) {
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
queue_id->source, queue_id->sink);
g_hash_table_remove(processor->task_queues, queue_id);
}
exit:
DLEYNA_LOG_DEBUG("Exit");
}
void dleyna_task_queue_add_task(const dleyna_task_queue_key_t *queue_id,
dleyna_task_atom_t *task)
{
dleyna_task_queue_t *queue;
DLEYNA_LOG_DEBUG("Enter - Task added to queue <%s,%s>",
queue_id->source, queue_id->sink);
queue = g_hash_table_lookup(queue_id->processor->task_queues, queue_id);
task->queue_id = queue_id;
g_ptr_array_add(queue->tasks, task);
if (queue->defer_remove)
goto exit;
if ((queue->flags & DLEYNA_TASK_QUEUE_FLAG_AUTO_START) &&
(!queue->current_task && !queue->idle_id))
queue->idle_id = g_idle_add(prv_process_task,
(gpointer)queue_id);
exit:
DLEYNA_LOG_DEBUG("Exit");
}
void dleyna_task_queue_task_completed(const dleyna_task_queue_key_t *queue_id)
{
dleyna_task_queue_t *queue;
dleyna_task_processor_t *processor = queue_id->processor;
DLEYNA_LOG_DEBUG("Enter - Task completed for queue <%s,%s>",
queue_id->source, queue_id->sink);
queue = g_hash_table_lookup(processor->task_queues, queue_id);
if (queue->current_task) {
queue->task_delete_cb(queue->current_task, queue->user_data);
queue->current_task = NULL;
}
processor->running_tasks--;
if (processor->quitting && !processor->running_tasks) {
g_idle_add(processor->on_quit_cb, NULL);
g_hash_table_remove_all(processor->task_queues);
} else if (queue->defer_remove) {
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
queue_id->source, queue_id->sink);
g_hash_table_remove(processor->task_queues, queue_id);
} else if (queue->tasks->len > 0) {
queue->idle_id = g_idle_add(prv_process_task,
(gpointer)queue_id);
} else if (queue->flags & DLEYNA_TASK_QUEUE_FLAG_AUTO_REMOVE) {
DLEYNA_LOG_DEBUG("Removing queue <%s,%s>",
queue_id->source, queue_id->sink);
g_hash_table_remove(processor->task_queues, queue_id);
}
DLEYNA_LOG_DEBUG("Exit");
}
void dleyna_task_queue_set_finally(const dleyna_task_queue_key_t *queue_id,
dleyna_task_finally_cb_t finally_cb)
{
dleyna_task_queue_t *queue;
dleyna_task_processor_t *processor = queue_id->processor;
queue = g_hash_table_lookup(processor->task_queues, queue_id);
queue->task_queue_finally_cb = finally_cb;
}
void dleyna_task_queue_set_user_data(const dleyna_task_queue_key_t *queue_id,
gpointer user_data)
{
dleyna_task_queue_t *queue;
dleyna_task_processor_t *processor = queue_id->processor;
queue = g_hash_table_lookup(processor->task_queues, queue_id);
queue->user_data = user_data;
}
gpointer dleyna_task_queue_get_user_data(
const dleyna_task_queue_key_t *queue_id)
{
dleyna_task_queue_t *queue;
dleyna_task_processor_t *processor = queue_id->processor;
queue = g_hash_table_lookup(processor->task_queues, queue_id);
return queue->user_data;
}
const gchar *dleyna_task_queue_get_source(
const dleyna_task_queue_key_t *queue_id)
{
return queue_id->source;
}