/*
* Farstream - Farstream RTP Conference Implementation
*
* Copyright 2007 Collabora Ltd.
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* Copyright 2007 Nokia Corp.
*
* fs-rtp-conference.c - RTP implementation for Farstream Conference Gstreamer
* Elements
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* SECTION:element-fsrtpconference
* @short_description: Farstream RTP Conference Gstreamer Elements
*
* This is the core gstreamer element for a RTP conference. It must be added
* to your pipeline before anything else is done. Then you create the session,
* participants and streams according to the #FsConference interface.
*
* The various sdes property allow you to set the content of the SDES packet
* in the sent RTCP reports.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "fs-rtp-conference.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "fs-rtp-session.h"
#include "fs-rtp-stream.h"
#include "fs-rtp-participant.h"
GST_DEBUG_CATEGORY (fsrtpconference_debug);
GST_DEBUG_CATEGORY (fsrtpconference_disco);
GST_DEBUG_CATEGORY (fsrtpconference_nego);
#define GST_CAT_DEFAULT fsrtpconference_debug
/* Signals */
enum
{
LAST_SIGNAL
};
/* Properties */
enum
{
PROP_0,
PROP_SDES,
};
static GstStaticPadTemplate fs_rtp_conference_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_SINK,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
static GstStaticPadTemplate fs_rtp_conference_src_template =
GST_STATIC_PAD_TEMPLATE ("src_%u_%u_%u",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
#define FS_RTP_CONFERENCE_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), FS_TYPE_RTP_CONFERENCE, FsRtpConferencePrivate))
struct _FsRtpConferencePrivate
{
gboolean disposed;
/* Protected by GST_OBJECT_LOCK */
GList *sessions;
guint sessions_cookie;
guint max_session_id;
GList *participants;
/* Array of all internal threads, as GThreads */
GPtrArray *threads;
};
G_DEFINE_TYPE (FsRtpConference, fs_rtp_conference, FS_TYPE_CONFERENCE);
static void fs_rtp_conference_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec);
static void fs_rtp_conference_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec);
static void fs_rtp_conference_finalize (GObject *object);
static FsSession *fs_rtp_conference_new_session (FsConference *conf,
FsMediaType media_type,
GError **error);
static FsParticipant *fs_rtp_conference_new_participant (FsConference *conf,
GError **error);
static FsRtpSession *fs_rtp_conference_get_session_by_id_locked (
FsRtpConference *self, guint session_id);
static FsRtpSession *fs_rtp_conference_get_session_by_id (
FsRtpConference *self, guint session_id);
static GstCaps *_rtpbin_request_pt_map (GstElement *element,
guint session_id,
guint pt,
gpointer user_data);
static void _rtpbin_pad_added (GstElement *rtpbin,
GstPad *new_pad,
gpointer user_data);
static void _rtpbin_on_bye_ssrc (GstElement *rtpbin,
guint session_id,
guint ssrc,
gpointer user_data);
static void _rtpbin_on_ssrc_validated (GstElement *rtpbin,
guint session_id,
guint ssrc,
gpointer user_data);
static void
_remove_session (gpointer user_data,
GObject *where_the_object_was);
static void
_remove_participant (gpointer user_data,
GObject *where_the_object_was);
static void fs_rtp_conference_handle_message (
GstBin * bin,
GstMessage * message);
static GstStateChangeReturn fs_rtp_conference_change_state (
GstElement *element,
GstStateChange transition);
static void
fs_rtp_conference_dispose (GObject * object)
{
FsRtpConference *self = FS_RTP_CONFERENCE (object);
GList *item;
if (self->priv->disposed)
return;
if (self->rtpbin) {
gst_object_unref (self->rtpbin);
self->rtpbin = NULL;
}
for (item = g_list_first (self->priv->sessions);
item;
item = g_list_next (item))
g_object_weak_unref (G_OBJECT (item->data), _remove_session, self);
g_list_free (self->priv->sessions);
self->priv->sessions = NULL;
self->priv->sessions_cookie++;
for (item = g_list_first (self->priv->participants);
item;
item = g_list_next (item))
g_object_weak_unref (G_OBJECT (item->data), _remove_participant, self);
g_list_free (self->priv->participants);
self->priv->participants = NULL;
self->priv->disposed = TRUE;
G_OBJECT_CLASS (fs_rtp_conference_parent_class)->dispose (object);
}
static void
fs_rtp_conference_finalize (GObject * object)
{
FsRtpConference *self = FS_RTP_CONFERENCE (object);
/* Peek will always succeed here because we 'refed the class in the _init */
g_type_class_unref (g_type_class_peek (FS_TYPE_RTP_SUB_STREAM));
g_ptr_array_free (self->priv->threads, TRUE);
G_OBJECT_CLASS (fs_rtp_conference_parent_class)->finalize (object);
}
static void
fs_rtp_conference_class_init (FsRtpConferenceClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
FsConferenceClass *baseconf_class = FS_CONFERENCE_CLASS (klass);
GstBinClass *gstbin_class = GST_BIN_CLASS (klass);
g_type_class_add_private (klass, sizeof (FsRtpConferencePrivate));
GST_DEBUG_CATEGORY_INIT (fsrtpconference_debug, "fsrtpconference", 0,
"Farstream RTP Conference Element");
GST_DEBUG_CATEGORY_INIT (fsrtpconference_disco, "fsrtpconference_disco",
0, "Farstream RTP Codec Discovery");
GST_DEBUG_CATEGORY_INIT (fsrtpconference_nego, "fsrtpconference_nego",
0, "Farstream RTP Codec Negotiation");
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&fs_rtp_conference_sink_template));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&fs_rtp_conference_src_template));
gst_element_class_set_metadata (gstelement_class,
"Farstream RTP Conference",
"Generic/Bin/RTP",
"A Farstream RTP Conference",
"Olivier Crete <olivier.crete@collabora.co.uk>");
baseconf_class->new_session =
GST_DEBUG_FUNCPTR (fs_rtp_conference_new_session);
baseconf_class->new_participant =
GST_DEBUG_FUNCPTR (fs_rtp_conference_new_participant);
gstbin_class->handle_message =
GST_DEBUG_FUNCPTR (fs_rtp_conference_handle_message);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (fs_rtp_conference_change_state);
gobject_class->finalize = GST_DEBUG_FUNCPTR (fs_rtp_conference_finalize);
gobject_class->dispose = GST_DEBUG_FUNCPTR (fs_rtp_conference_dispose);
gobject_class->set_property =
GST_DEBUG_FUNCPTR (fs_rtp_conference_set_property);
gobject_class->get_property =
GST_DEBUG_FUNCPTR (fs_rtp_conference_get_property);
g_object_class_install_property (gobject_class, PROP_SDES,
g_param_spec_boxed ("sdes", "SDES Items for this conference",
"SDES items to use for sessions in this conference",
GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
fs_rtp_conference_init (FsRtpConference *conf)
{
GST_DEBUG_OBJECT (conf, "fs_rtp_conference_init");
conf->priv = FS_RTP_CONFERENCE_GET_PRIVATE (conf);
conf->priv->disposed = FALSE;
conf->priv->max_session_id = 1;
conf->priv->threads = g_ptr_array_new ();
conf->rtpbin = gst_element_factory_make ("rtpbin", NULL);
if (!conf->rtpbin) {
GST_ERROR_OBJECT (conf, "Could not create Rtpbin element");
return;
}
if (!gst_bin_add (GST_BIN (conf), conf->rtpbin)) {
GST_ERROR_OBJECT (conf, "Could not add Rtpbin element");
gst_object_unref (conf->rtpbin);
conf->rtpbin = NULL;
return;
}
gst_object_ref (conf->rtpbin);
g_signal_connect (conf->rtpbin, "request-pt-map",
G_CALLBACK (_rtpbin_request_pt_map), conf);
g_signal_connect (conf->rtpbin, "pad-added",
G_CALLBACK (_rtpbin_pad_added), conf);
g_signal_connect (conf->rtpbin, "on-bye-ssrc",
G_CALLBACK (_rtpbin_on_bye_ssrc), conf);
g_signal_connect (conf->rtpbin, "on-ssrc-validated",
G_CALLBACK (_rtpbin_on_ssrc_validated), conf);
/* We have to ref the class here because the class initialization
* in GLib is not thread safe
* http://bugzilla.gnome.org/show_bug.cgi?id=349410
* http://bugzilla.gnome.org/show_bug.cgi?id=64764
*/
g_type_class_ref (FS_TYPE_RTP_SUB_STREAM);
}
static void
fs_rtp_conference_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
FsRtpConference *self = FS_RTP_CONFERENCE (object);
if (!self->rtpbin)
return;
switch (prop_id)
{
case PROP_SDES:
g_object_get_property (G_OBJECT (self->rtpbin), "sdes", value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
fs_rtp_conference_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
FsRtpConference *self = FS_RTP_CONFERENCE (object);
if (!self->rtpbin)
return;
switch (prop_id)
{
case PROP_SDES:
g_object_set_property (G_OBJECT (self->rtpbin), "sdes", value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static GstCaps *
_rtpbin_request_pt_map (GstElement *element, guint session_id,
guint pt, gpointer user_data)
{
FsRtpConference *self = FS_RTP_CONFERENCE (user_data);
FsRtpSession *session = NULL;
GstCaps *caps = NULL;
session = fs_rtp_conference_get_session_by_id (self, session_id);
if (session) {
caps = fs_rtp_session_request_pt_map (session, pt);
g_object_unref (session);
} else {
GST_WARNING_OBJECT (self,"Rtpbin %p tried to request the caps for "
" payload type %u for non-existent session %u",
element, pt, session_id);
}
return caps;
}
static void
_rtpbin_pad_added (GstElement *rtpbin, GstPad *new_pad,
gpointer user_data)
{
FsRtpConference *self = FS_RTP_CONFERENCE (user_data);
gchar *name;
GST_DEBUG_OBJECT (self, "pad %s:%s added", GST_DEBUG_PAD_NAME (new_pad));
name = gst_pad_get_name (new_pad);
if (g_str_has_prefix (name, "recv_rtp_src_"))
{
guint session_id, ssrc, pt;
if (sscanf (name, "recv_rtp_src_%u_%u_%u",
&session_id, &ssrc, &pt) == 3 && ssrc <= G_MAXUINT32)
{
FsRtpSession *session =
fs_rtp_conference_get_session_by_id (self, session_id);
if (session)
{
fs_rtp_session_new_recv_pad (session, new_pad, ssrc, pt);
g_object_unref (session);
}
}
}
g_free (name);
}
static void
_rtpbin_on_bye_ssrc (GstElement *rtpbin,
guint session_id,
guint ssrc,
gpointer user_data)
{
FsRtpConference *self = FS_RTP_CONFERENCE (user_data);
FsRtpSession *session =
fs_rtp_conference_get_session_by_id (self, session_id);
if (session)
{
fs_rtp_session_bye_ssrc (session, ssrc);
g_object_unref (session);
}
}
/**
* fs_rtp_conference_get_session_by_id_locked
* @self: The #FsRtpConference
* @session_id: The session id
*
* Gets the #FsRtpSession from a list of sessions or NULL if it doesnt exist
* You have to hold the GST_OBJECT_LOCK to call this function.
*
* Return value: A #FsRtpSession (unref after use) or NULL if it doesn't exist
*/
static FsRtpSession *
fs_rtp_conference_get_session_by_id_locked (FsRtpConference *self,
guint session_id)
{
GList *item = NULL;
for (item = g_list_first (self->priv->sessions);
item;
item = g_list_next (item)) {
FsRtpSession *session = item->data;
if (session->id == session_id) {
g_object_ref (session);
break;
}
}
if (item)
return FS_RTP_SESSION (item->data);
else
return NULL;
}
/**
* fs_rtp_conference_get_session_by_id
* @self: The #FsRtpConference
* @session_id: The session id
*
* Gets the #FsRtpSession from a list of sessions or NULL if it doesnt exist
*
* Return value: A #FsRtpSession (unref after use) or NULL if it doesn't exist
*/
static FsRtpSession *
fs_rtp_conference_get_session_by_id (FsRtpConference *self, guint session_id)
{
FsRtpSession *session = NULL;
GST_OBJECT_LOCK (self);
session = fs_rtp_conference_get_session_by_id_locked (self, session_id);
GST_OBJECT_UNLOCK (self);
return session;
}
static void
_remove_session (gpointer user_data,
GObject *where_the_object_was)
{
FsRtpConference *self = FS_RTP_CONFERENCE (user_data);
GST_OBJECT_LOCK (self);
self->priv->sessions =
g_list_remove_all (self->priv->sessions, where_the_object_was);
self->priv->sessions_cookie++;
GST_OBJECT_UNLOCK (self);
}
static void
_remove_participant (gpointer user_data,
GObject *where_the_object_was)
{
FsRtpConference *self = FS_RTP_CONFERENCE (user_data);
GST_OBJECT_LOCK (self);
self->priv->participants =
g_list_remove_all (self->priv->participants, where_the_object_was);
GST_OBJECT_UNLOCK (self);
}
static FsSession *
fs_rtp_conference_new_session (FsConference *conf,
FsMediaType media_type,
GError **error)
{
FsRtpConference *self = FS_RTP_CONFERENCE (conf);
FsSession *new_session = NULL;
guint id;
if (!self->rtpbin)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not create Rtpbin");
return NULL;
}
GST_OBJECT_LOCK (self);
do {
id = self->priv->max_session_id++;
} while (fs_rtp_conference_get_session_by_id_locked (self, id));
GST_OBJECT_UNLOCK (self);
new_session = FS_SESSION_CAST (fs_rtp_session_new (media_type, self, id,
error));
if (!new_session) {
return NULL;
}
GST_OBJECT_LOCK (self);
self->priv->sessions = g_list_append (self->priv->sessions, new_session);
self->priv->sessions_cookie++;
GST_OBJECT_UNLOCK (self);
g_object_weak_ref (G_OBJECT (new_session), _remove_session, self);
return new_session;
}
static FsParticipant *
fs_rtp_conference_new_participant (FsConference *conf,
GError **error)
{
FsRtpConference *self = FS_RTP_CONFERENCE (conf);
FsParticipant *new_participant = NULL;
if (!self->rtpbin)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not create Rtpbin");
return NULL;
}
new_participant = FS_PARTICIPANT_CAST (fs_rtp_participant_new ());
GST_OBJECT_LOCK (self);
self->priv->participants = g_list_append (self->priv->participants,
new_participant);
GST_OBJECT_UNLOCK (self);
g_object_weak_ref (G_OBJECT (new_participant), _remove_participant, self);
return new_participant;
}
static void
fs_rtp_conference_handle_message (
GstBin * bin,
GstMessage * message)
{
FsRtpConference *self = FS_RTP_CONFERENCE (bin);
if (!self->rtpbin)
goto out;
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ELEMENT:
{
const GstStructure *s = gst_message_get_structure (message);
/* we change the structure name and add the session ID to it */
if (gst_structure_has_name (s, "application/x-rtp-source-sdes") &&
gst_structure_has_field_typed (s, "session", G_TYPE_UINT) &&
gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
gst_structure_has_field_typed (s, "cname", G_TYPE_STRING))
{
guint session_id;
guint ssrc;
const GValue *val;
FsRtpSession *session;
const gchar *cname;
val = gst_structure_get_value (s, "session");
session_id = g_value_get_uint (val);
val = gst_structure_get_value (s, "ssrc");
ssrc = g_value_get_uint (val);
cname = gst_structure_get_string (s, "cname");
if (!ssrc || !cname)
{
GST_WARNING_OBJECT (self,
"Got GstRTPBinSDES without a ssrc or a cname (ssrc:%u cname:%p)",
ssrc, cname);
break;
}
session = fs_rtp_conference_get_session_by_id (self, session_id);
if (session) {
fs_rtp_session_associate_ssrc_cname (session, ssrc, cname);
g_object_unref (session);
} else {
GST_WARNING_OBJECT (self,"Our RtpBin announced a new association"
"for non-existent session %u for ssrc: %u and cname %s",
session_id, ssrc, cname);
}
}
else if (gst_structure_has_name (s, "dtmf-event-processed") ||
gst_structure_has_name (s, "dtmf-event-dropped"))
{
GList *item;
guint cookie;
GST_OBJECT_LOCK (self);
restart:
cookie = self->priv->sessions_cookie;
for (item = self->priv->sessions; item; item = item->next)
{
GST_OBJECT_UNLOCK (self);
if (fs_rtp_session_handle_dtmf_event_message (item->data, message))
{
gst_message_unref (message);
message = NULL;
goto out;
}
GST_OBJECT_LOCK (self);
if (cookie != self->priv->sessions_cookie)
goto restart;
}
GST_OBJECT_UNLOCK (self);
}
}
break;
case GST_MESSAGE_STREAM_STATUS:
{
GstStreamStatusType type;
guint i;
gst_message_parse_stream_status (message, &type, NULL);
switch (type)
{
case GST_STREAM_STATUS_TYPE_ENTER:
GST_OBJECT_LOCK (self);
for (i = 0; i < self->priv->threads->len; i++)
{
if (g_ptr_array_index (self->priv->threads, i) ==
g_thread_self ())
goto done;
}
g_ptr_array_add (self->priv->threads, g_thread_self ());
done:
GST_OBJECT_UNLOCK (self);
break;
case GST_STREAM_STATUS_TYPE_LEAVE:
GST_OBJECT_LOCK (self);
while (g_ptr_array_remove_fast (self->priv->threads,
g_thread_self ()));
GST_OBJECT_UNLOCK (self);
break;
default:
/* Do nothing */
break;
}
}
break;
default:
break;
}
out:
/* forward all messages to the parent */
if (message)
GST_BIN_CLASS (fs_rtp_conference_parent_class)->handle_message (bin,
message);
}
static GstStateChangeReturn
fs_rtp_conference_change_state (GstElement *element, GstStateChange transition)
{
FsRtpConference *self = FS_RTP_CONFERENCE (element);
GstStateChangeReturn result;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
if (!self->rtpbin)
{
GST_ERROR_OBJECT (element, "Could not create the RtpBin subelement");
result = GST_STATE_CHANGE_FAILURE;
goto failure;
}
break;
default:
break;
}
if ((result =
GST_ELEMENT_CLASS (fs_rtp_conference_parent_class)->change_state (
element, transition)) == GST_STATE_CHANGE_FAILURE)
goto failure;
return result;
failure:
{
GST_ERROR_OBJECT (element, "parent failed state change");
return result;
}
}
/**
* fs_codec_to_gst_caps
* @codec: A #FsCodec to be converted
*
* This function converts a #FsCodec to a fixed #GstCaps with media type
* application/x-rtp.
*
* Return value: A newly-allocated #GstCaps or %NULL if the codec was %NULL
*/
GstCaps *
fs_codec_to_gst_caps (const FsCodec *codec)
{
GstCaps *caps;
GstStructure *structure;
GList *item;
if (codec == NULL)
return NULL;
caps = gst_caps_new_empty_simple ("application/x-rtp");
structure = gst_caps_get_structure (caps, 0);
if (codec->encoding_name)
{
gchar *encoding_name = g_ascii_strup (codec->encoding_name, -1);
gst_structure_set (structure,
"encoding-name", G_TYPE_STRING, encoding_name,
NULL);
g_free (encoding_name);
}
if (codec->clock_rate)
gst_structure_set (structure,
"clock-rate", G_TYPE_INT, codec->clock_rate, NULL);
if (fs_media_type_to_string (codec->media_type))
gst_structure_set (structure, "media", G_TYPE_STRING,
fs_media_type_to_string (codec->media_type), NULL);
if (codec->id >= 0 && codec->id < 128)
gst_structure_set (structure, "payload", G_TYPE_INT, codec->id, NULL);
if (codec->channels)
{
gchar tmp[11];
snprintf (tmp, 11, "%u", codec->channels);
gst_structure_set (structure,
"channels", G_TYPE_INT, codec->channels,
"encoding-params", G_TYPE_STRING, tmp,
NULL);
}
for (item = codec->optional_params;
item;
item = g_list_next (item))
{
FsCodecParameter *param = item->data;
gchar *lower_name = g_ascii_strdown (param->name, -1);
if (!strcmp (lower_name, "ptime") || !strcmp (lower_name, "maxptime"))
gst_structure_set (structure, lower_name, G_TYPE_UINT,
atoi (param->value), NULL);
else
gst_structure_set (structure, lower_name, G_TYPE_STRING, param->value,
NULL);
g_free (lower_name);
}
for (item = codec->feedback_params;
item;
item = g_list_next (item))
{
FsFeedbackParameter *param = item->data;
gchar *lower_type = g_ascii_strdown (param->type, -1);
gchar *rtcpfb_name;
if (param->subtype[0])
{
gchar *lower_subt = g_ascii_strdown (param->subtype, -1);
rtcpfb_name = g_strdup_printf ("rtcp-fb-%s-%s", lower_type, lower_subt);
g_free (lower_subt);
}
else
{
rtcpfb_name = g_strdup_printf ("rtcp-fb-%s", lower_type);
}
gst_structure_set (structure,
rtcpfb_name, G_TYPE_STRING, param->extra_params, NULL);
g_free (lower_type);
g_free (rtcpfb_name);
}
return caps;
}
static void
_rtpbin_on_ssrc_validated (GstElement *rtpbin,
guint session_id,
guint ssrc,
gpointer user_data)
{
FsRtpConference *self = FS_RTP_CONFERENCE (user_data);
FsRtpSession *session =
fs_rtp_conference_get_session_by_id (self, session_id);
if (session)
{
fs_rtp_session_ssrc_validated (session, ssrc);
g_object_unref (session);
}
}
gboolean
fs_rtp_conference_is_internal_thread (FsRtpConference *self)
{
guint i;
gboolean ret = FALSE;
GST_OBJECT_LOCK (self);
for (i = 0; i < self->priv->threads->len; i++)
{
if (g_ptr_array_index (self->priv->threads, i) == g_thread_self ())
{
ret = TRUE;
break;
}
}
GST_OBJECT_UNLOCK (self);
return ret;
}