/*
* Farstream - Farstream Raw Conference Implementation
*
* Copyright 2008 Richard Spiers <richard.spiers@gmail.com>
* Copyright 2007 Nokia Corp.
* Copyright 2007-2010 Collabora Ltd.
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* @author: Mike Ruprecht <mike.ruprecht@collabora.co.uk>
*
* fs-raw-conference.c - Raw implementation for Farstream Conference Gstreamer
* Elements
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* SECTION:element-fsrawconference
* @short_description: Farstream Raw Conference Gstreamer Elements Base class
*
* This element implements a raw content stream over which any Gstreamer
* content may travel.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "fs-raw-conference.h"
#include "fs-raw-session.h"
#include "fs-raw-participant.h"
GST_DEBUG_CATEGORY (fsrawconference_debug);
#define GST_CAT_DEFAULT fsrawconference_debug
/* Signals */
enum
{
LAST_SIGNAL
};
/* Properties */
enum
{
PROP_0
};
static GstStaticPadTemplate fs_raw_conference_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%d",
GST_PAD_SINK,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate fs_raw_conference_src_template =
GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
#define FS_RAW_CONFERENCE_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), FS_TYPE_RAW_CONFERENCE, \
FsRawConferencePrivate))
struct _FsRawConferencePrivate
{
gboolean disposed;
/* Protected by GST_OBJECT_LOCK */
GList *sessions;
guint max_session_id;
GList *participants;
/* Array of all internal threads, as GThreads */
GPtrArray *threads;
};
G_DEFINE_TYPE (FsRawConference, fs_raw_conference, FS_TYPE_CONFERENCE);
static FsSession *fs_raw_conference_new_session (FsConference *conf,
FsMediaType media_type,
GError **error);
static FsParticipant *fs_raw_conference_new_participant (FsConference *conf,
GError **error);
static void _remove_session (gpointer user_data,
GObject *where_the_object_was);
static void _remove_participant (gpointer user_data,
GObject *where_the_object_was);
static void fs_raw_conference_handle_message (
GstBin * bin,
GstMessage * message);
static void
fs_raw_conference_dispose (GObject * object)
{
FsRawConference *self = FS_RAW_CONFERENCE (object);
GList *item;
if (self->priv->disposed)
return;
for (item = g_list_first (self->priv->participants);
item;
item = g_list_next (item))
g_object_weak_unref (G_OBJECT (item->data), _remove_participant, self);
g_list_free (self->priv->participants);
self->priv->participants = NULL;
self->priv->disposed = TRUE;
G_OBJECT_CLASS (fs_raw_conference_parent_class)->dispose (object);
}
static void
fs_raw_conference_finalize (GObject * object)
{
FsRawConference *self = FS_RAW_CONFERENCE (object);
g_ptr_array_free (self->priv->threads, TRUE);
G_OBJECT_CLASS (fs_raw_conference_parent_class)->finalize (object);
}
static void
fs_raw_conference_class_init (FsRawConferenceClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
FsConferenceClass *baseconf_class = FS_CONFERENCE_CLASS (klass);
GstBinClass *gstbin_class = GST_BIN_CLASS (klass);
g_type_class_add_private (klass, sizeof (FsRawConferencePrivate));
GST_DEBUG_CATEGORY_INIT (fsrawconference_debug, "fsrawconference", 0,
"Farstream Raw Conference Element");
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&fs_raw_conference_sink_template));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&fs_raw_conference_src_template));
baseconf_class->new_session =
GST_DEBUG_FUNCPTR (fs_raw_conference_new_session);
baseconf_class->new_participant =
GST_DEBUG_FUNCPTR (fs_raw_conference_new_participant);
gstbin_class->handle_message =
GST_DEBUG_FUNCPTR (fs_raw_conference_handle_message);
gobject_class->finalize = GST_DEBUG_FUNCPTR (fs_raw_conference_finalize);
gobject_class->dispose = GST_DEBUG_FUNCPTR (fs_raw_conference_dispose);
}
static void
fs_raw_conference_init (FsRawConference *conf)
{
GST_DEBUG_OBJECT (conf, "fs_raw_conference_init");
conf->priv = FS_RAW_CONFERENCE_GET_PRIVATE (conf);
conf->priv->max_session_id = 1;
conf->priv->threads = g_ptr_array_new ();
}
/**
* fs_rtp_conference_get_session_by_id_locked
* @self: The #FsRawConference
* @session_id: The session id
*
* Gets the #FsRawSession from a list of sessions or NULL if it doesnt exist
* You have to hold the GST_OBJECT_LOCK to call this function.
*
* Return value: A #FsRawSession (unref after use) or NULL if it doesn't exist
*/
static FsRawSession *
fs_raw_conference_get_session_by_id_locked (FsRawConference *self,
guint session_id)
{
GList *item = NULL;
for (item = g_list_first (self->priv->sessions);
item;
item = g_list_next (item)) {
FsRawSession *session = item->data;
if (session->id == session_id) {
g_object_ref (session);
break;
}
}
if (item)
return FS_RAW_SESSION (item->data);
else
return NULL;
}
static void
_remove_session (gpointer user_data,
GObject *where_the_object_was)
{
FsRawConference *self = FS_RAW_CONFERENCE (user_data);
GST_OBJECT_LOCK (self);
self->priv->sessions =
g_list_remove_all (self->priv->sessions, where_the_object_was);
GST_OBJECT_UNLOCK (self);
}
static void
_remove_participant (gpointer user_data,
GObject *where_the_object_was)
{
FsRawConference *self = FS_RAW_CONFERENCE (user_data);
GST_OBJECT_LOCK (self);
self->priv->participants =
g_list_remove_all (self->priv->participants, where_the_object_was);
GST_OBJECT_UNLOCK (self);
}
static FsSession *
fs_raw_conference_new_session (FsConference *conf,
FsMediaType media_type,
GError **error)
{
FsRawConference *self = FS_RAW_CONFERENCE (conf);
FsRawSession *new_session = NULL;
guint id;
GST_OBJECT_LOCK (self);
do {
id = self->priv->max_session_id++;
} while (fs_raw_conference_get_session_by_id_locked (self, id));
GST_OBJECT_UNLOCK (self);
new_session = fs_raw_session_new (media_type, self, id, error);
if (new_session == NULL)
return NULL;
GST_OBJECT_LOCK (self);
self->priv->sessions = g_list_append (self->priv->sessions, new_session);
GST_OBJECT_UNLOCK (self);
g_object_weak_ref (G_OBJECT (new_session), _remove_session, self);
return FS_SESSION (new_session);
}
static FsParticipant *
fs_raw_conference_new_participant (FsConference *conf,
GError **error)
{
FsRawConference *self = FS_RAW_CONFERENCE (conf);
FsParticipant *new_participant = NULL;
new_participant = FS_PARTICIPANT_CAST (fs_raw_participant_new ());
GST_OBJECT_LOCK (self);
self->priv->participants = g_list_append (self->priv->participants,
new_participant);
GST_OBJECT_UNLOCK (self);
g_object_weak_ref (G_OBJECT (new_participant), _remove_participant, self);
return new_participant;
}
static void
fs_raw_conference_handle_message (
GstBin * bin,
GstMessage * message)
{
FsRawConference *self = FS_RAW_CONFERENCE (bin);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_STREAM_STATUS:
{
GstStreamStatusType type;
guint i;
gst_message_parse_stream_status (message, &type, NULL);
switch (type)
{
case GST_STREAM_STATUS_TYPE_ENTER:
GST_OBJECT_LOCK (self);
for (i = 0; i < self->priv->threads->len; i++)
{
if (g_ptr_array_index (self->priv->threads, i) ==
g_thread_self ())
goto done;
}
g_ptr_array_add (self->priv->threads, g_thread_self ());
done:
GST_OBJECT_UNLOCK (self);
break;
case GST_STREAM_STATUS_TYPE_LEAVE:
GST_OBJECT_LOCK (self);
while (g_ptr_array_remove_fast (self->priv->threads,
g_thread_self ()));
GST_OBJECT_UNLOCK (self);
break;
default:
/* Do nothing */
break;
}
}
break;
default:
break;
}
/* forward all messages to the parent */
GST_BIN_CLASS (fs_raw_conference_parent_class)->handle_message (bin,
message);
}
/**
* fs_codec_to_gst_caps
* @codec: A #FsCodec to be converted
*
* This function converts a #FsCodec to a fixed #GstCaps.
*
* Return value: A newly-allocated #GstCaps or %NULL if the codec was %NULL
*/
GstCaps *
fs_raw_codec_to_gst_caps (const FsCodec *codec)
{
GstCaps *caps;
if (codec == NULL || codec->encoding_name == NULL)
return NULL;
caps = gst_caps_from_string (codec->encoding_name);
if (!caps)
return NULL;
if (gst_caps_is_fixed (caps))
return caps;
gst_caps_unref (caps);
return NULL;
}
gboolean
fs_raw_conference_is_internal_thread (FsRawConference *self)
{
guint i;
gboolean ret = FALSE;
GST_OBJECT_LOCK (self);
for (i = 0; i < self->priv->threads->len; i++)
{
if (g_ptr_array_index (self->priv->threads, i) == g_thread_self ())
{
ret = TRUE;
break;
}
}
GST_OBJECT_UNLOCK (self);
return ret;
}