/*
* Farstream - Farstream RTP Session
*
* Copyright 2007 Collabora Ltd.
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* Copyright 2007 Nokia Corp.
*
* fs-rtp-session.c - A Farstream RTP Session gobject
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* SECTION:fs-rtp-session
* @short_description: A RTP session in a #FsRtpConference
*
* This object represents one session, it is created by called
* fs_conference_new_session() on a #FsRtpConference. It can be either
* Audio or Video. It also represents data send with one and only one
* SSRC (although if there is a SSRC collision, that SSRC may change).
*
* <refsect2><title>Codec profiles</title>
* <para>
* It is possible to define "codec profiles", that is non-autodetected
* encoding and decoding pipelines for codecs. It is even possible to declare
* entirely new codecs using this method.
*
* To create a profile for a codec, add it to the codec-preferences with
* special optional parameters called "farstream-send-profile" and
* "farstream-recv-profile", these should contain gst-launch style descriptions
* of the encoding or decoding bin.
*
* As a special case, encoding profiles can have more than one unconnected
* source pad, all of these pads should produce application/x-rtp of some kind.
* The profile will be ignored if not ALL pads match currently negotiated
* codecs.
*
* Also, it is possible to declare profiles with only a decoding pipeline,
* you will only be able to receive from this codec, the encoding may be a
* secondary pad of some other codec.
* </para>
* </refsect2>
* <refsect2><title>SRTP signature and encryption</title>
* <para>
*
* To tell #FsRtpSession to authenticate encrypt the media it is
* sending using SRTP, one must set the parameters using a
* #GstStructure named "FarstreamSRTP" and passing it to
* fs_session_set_encryption_parameters().
*
* The cipher, auth, and key must be specified:
* <refsect3 id="FarstreamSRTP"><title>FarstreamSRTP</title>
* <table>
* <tr>
* <td><code>"rtp-cipher" and "rtcp-cipher"</code></td>
* <td>gchar *</td>
* <td>
* <para>Encryption algorithm</para>
* <para>Possible values: "null", "aes-128-icm" or "aes-256-icm"</para>
* </td>
* </tr>
* <tr>
* <td><code>"cipher"</code></td>
* <td>gchar *</td>
* <td><para>Default value for "rtp-cipher" and "rtcp-cipher"</para>
* <para>Possible values: "null", "aes-128-icm" or "aes-256-icm"</para>
* </td>
* </tr>
* <tr>
* <td><code>"rtp-auth" and "rtcp-auth"</code></td>
* <td>gchar *</td>
* <td>
* <para>Authentication algorithm, can never be null</para>
* <para>Possible values: "hmac-sha1-32" or "hmac-sha1-80"</para>
* </td>
* </tr>
* <tr>
* <td><code>"auth"</code></td>
* <td>gchar *</td>
* <td><para>Default value for "rtp-auth" and "rtcp-auth"</para>
* <para>Possible values: "hmac-sha1-32" or "hmac-sha1-80"</para>
* </td>
* </tr>
* <tr>
* <td><code>"key"</code></td>
* <td>#GstBuffer</td>
* <td>Size must be 30 if cipher is "aes-128-icm" and 46 if cipher is
* "aes-256-icm" </td>
* </tr>
* </table>
* </refsect3>
* </para>
* </refsect2>
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "fs-rtp-session.h"
#include <string.h>
#include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>
#include <farstream/fs-transmitter.h>
#include "farstream/fs-utils.h"
#include <farstream/fs-rtp.h>
#include "fs-rtp-bitrate-adapter.h"
#include "fs-rtp-stream.h"
#include "fs-rtp-participant.h"
#include "fs-rtp-discover-codecs.h"
#include "fs-rtp-codec-negotiation.h"
#include "fs-rtp-substream.h"
#include "fs-rtp-special-source.h"
#include "fs-rtp-codec-specific.h"
#include "fs-rtp-tfrc.h"
#define GST_CAT_DEFAULT fsrtpconference_debug
/* Signals */
enum
{
LAST_SIGNAL
};
/* props */
enum
{
PROP_0,
PROP_CONFERENCE,
PROP_MEDIA_TYPE,
PROP_ID,
PROP_SINK_PAD,
PROP_CODEC_PREFERENCES,
PROP_CODECS,
PROP_CODECS_WITHOUT_CONFIG,
PROP_CURRENT_SEND_CODEC,
PROP_NO_RTCP_TIMEOUT,
PROP_SSRC,
PROP_TOS,
PROP_SEND_BITRATE,
PROP_RTP_HEADER_EXTENSIONS,
PROP_RTP_HEADER_EXTENSION_PREFERENCES,
PROP_ALLOWED_SINK_CAPS,
PROP_ALLOWED_SRC_CAPS,
PROP_ENCRYPTION_PARAMETERS,
PROP_INTERNAL_SESSION
};
#define DEFAULT_NO_RTCP_TIMEOUT (7000)
struct _FsRtpSessionPrivate
{
FsMediaType media_type;
/* We hold a ref to this, needs the lock to access it */
FsRtpConference *conference;
GHashTable *transmitters;
/* We keep references to these elements
*/
GstElement *media_sink_valve;
GstElement *send_bitrate_adapter;
GstElement *send_tee;
GstElement *send_capsfilter;
GstElement *transmitter_rtp_tee;
GstElement *transmitter_rtcp_tee;
GstElement *transmitter_rtp_funnel;
GstElement *transmitter_rtcp_funnel;
GstElement *rtpmuxer;
GstElement *srtpenc;
GstElement *srtpdec;
GObject *rtpbin_internal_session;
/* Request pads that are disposed of when the tee is disposed of */
GstPad *send_tee_media_pad;
GstPad *send_tee_discovery_pad;
GstElement *discovery_valve;
/* We dont keep explicit references to the pads, the Bin does that for us
* only this element's methods can add/remote it
*/
GstPad *media_sink_pad;
/* The discovery elements are only created when codec parameter discovery is
* under progress.
* They are normally destroyed when the caps are found but may be destroyed
* by the dispose function too, we hold refs to them
* These three elements can only be modified from the streaming threads
* and are protected by the stream lock
*/
GstElement *discovery_fakesink;
GstElement *discovery_capsfilter;
GstElement *discovery_codecbin;
/* This one is protected by the session lock */
FsCodec *discovery_codec;
/* Request pad to release on dispose */
GstPad *rtpbin_send_rtp_sink;
GstPad *rtpbin_send_rtcp_src;
GstPad *rtpbin_recv_rtp_sink;
GstPad *rtpbin_recv_rtcp_sink;
/* Protected by the session mutex */
/* The codec bin is owned implicitely by the Conference bin for us */
FsCodec *current_send_codec;
FsCodec *requested_send_codec;
/* Can only be modified by the streaming thread with the pad blocked */
GstElement *send_codecbin;
GList *extra_send_capsfilters;
/* These lists are protected by the session mutex */
GList *streams;
guint streams_cookie;
GList *free_substreams;
guint streams_sending;
/* The static list of all the blueprints */
GList *blueprints;
GList *codec_preferences;
guint codec_preferences_generation;
/* These are protected by the session mutex */
GList *codec_associations;
GList *hdrext_negotiated;
GList *hdrext_preferences;
/* Protected by the session mutex */
gint no_rtcp_timeout;
GQueue telephony_events;
GstObject *running_telephony_src;
gboolean telephony_event_running;
GList *extra_sources;
/* This is a ht of ssrc->streams
* It is protected by the session mutex */
GHashTable *ssrc_streams;
GHashTable *ssrc_streams_manual;
GError *construction_error;
gulong send_pad_block_id;
gulong discovery_pad_block_id;
/* IP Type of Service, protext by session mutex */
guint tos;
/* Protected by session mutex */
guint send_bitrate;
GstStructure *encryption_parameters;
/* Protected by session mutex */
guint caps_generation;
GstCaps *input_caps;
GstCaps *output_caps;
/* Set at construction time, can not change */
FsRtpTfrc *rtp_tfrc;
FsRtpKeyunitManager *keyunit_manager;
/* Can only be used while using the lock */
GRWLock disposed_lock;
gboolean disposed;
};
G_DEFINE_TYPE (FsRtpSession, fs_rtp_session, FS_TYPE_SESSION);
#define FS_RTP_SESSION_GET_PRIVATE(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RTP_SESSION, FsRtpSessionPrivate))
static void fs_rtp_session_dispose (GObject *object);
static void fs_rtp_session_finalize (GObject *object);
static void fs_rtp_session_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec);
static void fs_rtp_session_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec);
static void fs_rtp_session_constructed (GObject *object);
static FsStream *fs_rtp_session_new_stream (FsSession *session,
FsParticipant *participant,
FsStreamDirection direction,
GError **error);
static gboolean fs_rtp_session_start_telephony_event (FsSession *session,
guint8 event,
guint8 volume);
static gboolean fs_rtp_session_stop_telephony_event (FsSession *session);
static gboolean fs_rtp_session_set_send_codec (FsSession *session,
FsCodec *send_codec,
GError **error);
static gboolean fs_rtp_session_set_codec_preferences (FsSession *session,
GList *codec_preferences,
GError **error);
static void fs_rtp_session_verify_send_codec_bin_locked (FsRtpSession *self);
static gchar **fs_rtp_session_list_transmitters (FsSession *session);
static GType fs_rtp_session_get_stream_transmitter_type (FsSession *session,
const gchar *transmitter);
static void _substream_no_rtcp_timedout_cb (FsRtpSubStream *substream,
FsRtpSession *session);
static GstElement *_substream_get_codec_bin (FsRtpSubStream *substream,
FsRtpStream *stream, FsCodec **new_codec,
guint current_builder_hash, guint *new_builder_hash,
GError **error, FsRtpSession *session);
static gboolean _stream_new_remote_codecs (FsRtpStream *stream,
GList *codecs, GError **error, gpointer user_data);
static FsStreamTransmitter* _stream_get_new_stream_transmitter (
FsRtpStream *stream,
FsParticipant *participant,
const gchar *transmitter_name,
GParameter *parameters,
guint n_parameters,
GError **error,
gpointer user_data);
static GList *fs_rtp_session_get_codecs_need_resend (FsSession *session,
GList *old_codecs, GList *new_codecs);
static void _remove_stream (gpointer user_data,
GObject *where_the_object_was);
static gboolean
fs_rtp_session_update_codecs (FsRtpSession *session,
FsRtpStream *stream,
GList *remote_codecs,
GError **error);
static CodecAssociation *
fs_rtp_session_get_recv_codec_locked (FsRtpSession *session,
guint pt,
FsRtpStream *stream,
FsCodec **recv_codec,
GError **error);
static void
fs_rtp_session_start_codec_param_gathering_locked (FsRtpSession *session);
static void
fs_rtp_session_stop_codec_param_gathering_unlock (FsRtpSession *session);
static void
fs_rtp_session_associate_free_substreams (FsRtpSession *session,
FsRtpStream *stream, guint32 ssrc);
static void
_send_caps_changed (GstPad *pad, GParamSpec *pspec, FsRtpSession *session);
static GstPadProbeReturn
_discovery_pad_blocked_callback (GstPad *pad, GstPadProbeInfo *info,
gpointer user_data);
static void
fs_rtp_session_set_send_bitrate (FsRtpSession *self, guint bitrate);
static gboolean
codecbin_set_bitrate (GstElement *codecbin, guint bitrate);
static gboolean
fs_rtp_session_set_allowed_caps (FsSession *session, GstCaps *sink_caps,
GstCaps *src_caps, GError **error);
static gboolean
fs_rtp_session_set_encryption_parameters (FsSession *session,
GstStructure *parameters, GError **error);
static GstCaps *
_srtpdec_request_key (GstElement *srtpdec, guint ssrc, gpointer user_data);
static gboolean
_stream_decrypt_clear_locked_cb (FsRtpStream *stream, gpointer user_data);
//static guint signals[LAST_SIGNAL] = { 0 };
static void
fs_rtp_session_class_init (FsRtpSessionClass *klass)
{
GObjectClass *gobject_class;
FsSessionClass *session_class;
gobject_class = (GObjectClass *) klass;
session_class = FS_SESSION_CLASS (klass);
gobject_class->set_property = fs_rtp_session_set_property;
gobject_class->get_property = fs_rtp_session_get_property;
gobject_class->constructed = fs_rtp_session_constructed;
session_class->new_stream = fs_rtp_session_new_stream;
session_class->start_telephony_event = fs_rtp_session_start_telephony_event;
session_class->stop_telephony_event = fs_rtp_session_stop_telephony_event;
session_class->set_send_codec = fs_rtp_session_set_send_codec;
session_class->set_codec_preferences =
fs_rtp_session_set_codec_preferences;
session_class->list_transmitters = fs_rtp_session_list_transmitters;
session_class->get_stream_transmitter_type =
fs_rtp_session_get_stream_transmitter_type;
session_class->codecs_need_resend = fs_rtp_session_get_codecs_need_resend;
session_class->set_allowed_caps = fs_rtp_session_set_allowed_caps;
session_class->set_encryption_parameters =
fs_rtp_session_set_encryption_parameters;
g_object_class_override_property (gobject_class,
PROP_CONFERENCE, "conference");
g_object_class_override_property (gobject_class,
PROP_MEDIA_TYPE, "media-type");
g_object_class_override_property (gobject_class,
PROP_ID, "id");
g_object_class_override_property (gobject_class,
PROP_SINK_PAD, "sink-pad");
g_object_class_override_property (gobject_class,
PROP_CODEC_PREFERENCES, "codec-preferences");
g_object_class_override_property (gobject_class,
PROP_CODECS, "codecs");
g_object_class_override_property (gobject_class,
PROP_CODECS_WITHOUT_CONFIG, "codecs-without-config");
g_object_class_override_property (gobject_class,
PROP_CURRENT_SEND_CODEC, "current-send-codec");
g_object_class_override_property (gobject_class,
PROP_TOS, "tos");
g_object_class_override_property (gobject_class,
PROP_ALLOWED_SINK_CAPS, "allowed-sink-caps");
g_object_class_override_property (gobject_class,
PROP_ALLOWED_SRC_CAPS, "allowed-src-caps");
g_object_class_override_property (gobject_class,
PROP_ENCRYPTION_PARAMETERS, "encryption-parameters");
g_object_class_install_property (gobject_class,
PROP_NO_RTCP_TIMEOUT,
g_param_spec_int ("no-rtcp-timeout",
"The timeout (in ms) before no RTCP is assumed",
"This is the time (in ms) after which data received without RTCP"
" is attached the FsStream, this only works if there is only one"
" FsStream. -1 will wait forever. 0 will not wait for RTCP and"
" attach it immediataly to the FsStream and prohibit the creation"
" of a second FsStream",
-1, G_MAXINT, DEFAULT_NO_RTCP_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_SSRC,
g_param_spec_uint ("ssrc",
"The SSRC of the sent data",
"This is the current SSRC used to send data"
" (defaults to a random value)",
0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_SEND_BITRATE,
g_param_spec_uint ("send-bitrate",
"The bitrate at which data will be sent",
"The bitrate that the session will try to send at in bits/sec",
0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_RTP_HEADER_EXTENSIONS,
g_param_spec_boxed ("rtp-header-extensions",
"Currently negotiated RTP header extensions",
"GList of RTP Header extensions that have been negotiated and will"
" be used when sending of receiving RTP packets",
FS_TYPE_RTP_HEADER_EXTENSION_LIST,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_RTP_HEADER_EXTENSION_PREFERENCES,
g_param_spec_boxed ("rtp-header-extension-preferences",
"Desired RTP header extensions",
"GList of RTP Header extensions that are locally supported and"
" desired by the application",
FS_TYPE_RTP_HEADER_EXTENSION_LIST,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_INTERNAL_SESSION,
g_param_spec_object ("internal-session",
"Internal RTP Session",
"Internal RTPSession object from rtpbin",
G_TYPE_OBJECT,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
gobject_class->dispose = fs_rtp_session_dispose;
gobject_class->finalize = fs_rtp_session_finalize;
g_type_class_add_private (klass, sizeof (FsRtpSessionPrivate));
}
static void
fs_rtp_session_init (FsRtpSession *self)
{
/* member init */
self->priv = FS_RTP_SESSION_GET_PRIVATE (self);
self->priv->disposed = FALSE;
self->priv->construction_error = NULL;
self->priv->transmitters = g_hash_table_new_full (g_str_hash, g_str_equal,
g_free, g_object_unref);
g_mutex_init (&self->mutex);
g_rw_lock_init (&self->priv->disposed_lock);
self->priv->media_type = FS_MEDIA_TYPE_LAST + 1;
self->priv->no_rtcp_timeout = DEFAULT_NO_RTCP_TIMEOUT;
self->priv->ssrc_streams = g_hash_table_new (g_direct_hash, g_direct_equal);
self->priv->ssrc_streams_manual = g_hash_table_new (g_direct_hash,
g_direct_equal);
g_queue_init (&self->priv->telephony_events);
}
static void
_remove_transmitter (gpointer key, gpointer value, gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
FsTransmitter *transmitter = FS_TRANSMITTER (value);
GstElement *src, *sink;
g_object_get (transmitter, "gst-sink", &sink, "gst-src", &src, NULL);
gst_element_set_locked_state (src, TRUE);
gst_element_set_state (src, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), src);
gst_element_set_locked_state (sink, TRUE);
gst_element_set_state (sink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), sink);
gst_object_unref (src);
gst_object_unref (sink);
}
static void
_stop_transmitter_elem (gpointer key, gpointer value, gpointer elem_name)
{
FsTransmitter *transmitter = FS_TRANSMITTER (value);
GstElement *elem = NULL;
g_object_get (transmitter, elem_name, &elem, NULL);
gst_element_set_locked_state (elem, TRUE);
gst_element_set_state (elem, GST_STATE_NULL);
gst_object_unref (elem);
}
static void
stop_and_remove (GstBin *conf, GstElement **element, gboolean unref)
{
if (*element == NULL)
return;
gst_element_set_locked_state (*element, TRUE);
if (gst_element_set_state (*element, GST_STATE_NULL) !=
GST_STATE_CHANGE_SUCCESS)
{
gchar *elemname = gst_element_get_name (*element);
GST_WARNING ("Could not set %s to GST_STATE_NULL", elemname);
g_free (elemname);
}
if (!gst_bin_remove (conf, *element))
{
gchar *binname = gst_element_get_name (conf);
gchar *elemname = gst_element_get_name (*element);
GST_WARNING ("Could not remove %s from %s", binname, elemname);
g_free (binname);
g_free (elemname);
}
if (unref)
gst_object_unref (*element);
*element = NULL;
}
static void
fs_rtp_session_dispose (GObject *obj)
{
FsRtpSession *self = FS_RTP_SESSION (obj);
GList *item = NULL;
GstBin *conferencebin = NULL;
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
if (fs_rtp_conference_is_internal_thread (self->priv->conference))
{
g_critical ("You MUST call fs_session_destroy() from your main thread, "
"this FsSession may now be leaked");
fs_rtp_session_has_disposed_exit (self);
return;
}
fs_rtp_session_has_disposed_exit (self);
g_rw_lock_writer_lock (&self->priv->disposed_lock);
if (self->priv->disposed)
{
g_rw_lock_writer_unlock (&self->priv->disposed_lock);
return;
}
self->priv->disposed = TRUE;
g_rw_lock_writer_unlock (&self->priv->disposed_lock);
conferencebin = GST_BIN (self->priv->conference);
if (self->priv->rtpbin_internal_session)
g_object_unref (self->priv->rtpbin_internal_session);
self->priv->rtpbin_internal_session = NULL;
if (self->priv->keyunit_manager)
g_object_unref (self->priv->keyunit_manager);
self->priv->keyunit_manager = NULL;
/* Lets stop all of the elements sink to source */
/* First the send pipeline */
if (self->priv->transmitters)
g_hash_table_foreach (self->priv->transmitters, _stop_transmitter_elem,
"gst-sink");
stop_and_remove (conferencebin, &self->priv->transmitter_rtp_tee, TRUE);
stop_and_remove (conferencebin, &self->priv->transmitter_rtcp_tee, TRUE);
if (self->priv->rtpbin_send_rtcp_src)
gst_pad_set_active (self->priv->rtpbin_send_rtcp_src, FALSE);
if (self->priv->rtpbin_send_rtp_sink)
gst_pad_set_active (self->priv->rtpbin_send_rtp_sink, FALSE);
if (self->priv->rtp_tfrc)
{
fs_rtp_tfrc_destroy (self->priv->rtp_tfrc);
g_object_unref (self->priv->rtp_tfrc);
}
self->priv->rtp_tfrc = NULL;
FS_RTP_SESSION_LOCK (self);
fs_rtp_session_stop_codec_param_gathering_unlock (self);
if (self->priv->discovery_valve)
g_object_set (self->priv->discovery_valve, "drop", TRUE, NULL);
stop_and_remove (conferencebin, &self->priv->discovery_valve, FALSE);
if (self->priv->send_tee_discovery_pad)
{
gst_object_unref (self->priv->send_tee_discovery_pad);
self->priv->send_tee_discovery_pad = NULL;
}
if (self->priv->send_tee_media_pad)
{
gst_object_unref (self->priv->send_tee_media_pad);
self->priv->send_tee_media_pad = NULL;
}
if (self->priv->send_capsfilter && self->priv->rtpmuxer)
{
GstPad *srcpad = gst_element_get_static_pad (self->priv->send_capsfilter,
"src");
if (srcpad)
{
GstPad *otherpad = gst_pad_get_peer (srcpad);
if (otherpad)
{
gst_element_release_request_pad (self->priv->rtpmuxer, otherpad);
gst_object_unref (otherpad);
}
gst_object_unref (srcpad);
}
}
for (item = self->priv->extra_send_capsfilters;
item;
item = g_list_next (item))
{
GstElement *cf = item->data;
GstPad *ourpad = gst_element_get_static_pad (cf, "src");
GstPad *pad = NULL;
if (ourpad)
{
pad = gst_pad_get_peer (ourpad);
if (pad)
{
gst_element_release_request_pad (self->priv->rtpmuxer, pad);
gst_object_unref (pad);
}
gst_object_unref (ourpad);
}
}
stop_and_remove (conferencebin, &self->priv->rtpmuxer, TRUE);
stop_and_remove (conferencebin, &self->priv->send_capsfilter, TRUE);
while (self->priv->extra_send_capsfilters)
{
GstElement *cf = self->priv->extra_send_capsfilters->data;
stop_and_remove (conferencebin, &cf, FALSE);
self->priv->extra_send_capsfilters = g_list_delete_link (
self->priv->extra_send_capsfilters,
self->priv->extra_send_capsfilters);
}
stop_and_remove (conferencebin, &self->priv->send_codecbin, FALSE);
stop_and_remove (conferencebin, &self->priv->media_sink_valve, TRUE);
stop_and_remove (conferencebin, &self->priv->send_tee, TRUE);
stop_and_remove (conferencebin, &self->priv->send_bitrate_adapter, FALSE);
if (self->priv->media_sink_pad)
gst_pad_set_active (self->priv->media_sink_pad, FALSE);
/* Now the recv pipeline */
if (self->priv->free_substreams)
g_list_foreach (self->priv->free_substreams, (GFunc) fs_rtp_sub_stream_stop,
NULL);
if (self->priv->rtpbin_recv_rtp_sink)
gst_pad_set_active (self->priv->rtpbin_recv_rtp_sink, FALSE);
if (self->priv->rtpbin_recv_rtcp_sink)
gst_pad_set_active (self->priv->rtpbin_recv_rtcp_sink, FALSE);
stop_and_remove (conferencebin, &self->priv->transmitter_rtp_funnel, TRUE);
stop_and_remove (conferencebin, &self->priv->transmitter_rtcp_funnel, TRUE);
if (self->priv->transmitters)
g_hash_table_foreach (self->priv->transmitters, _stop_transmitter_elem,
"gst-src");
self->priv->extra_sources =
fs_rtp_special_sources_destroy (self->priv->extra_sources);
if (self->priv->running_telephony_src)
gst_object_unref (self->priv->running_telephony_src);
/* Now they should all be stopped, we can remove them in peace */
if (self->priv->media_sink_pad)
{
gst_pad_set_active (self->priv->media_sink_pad, FALSE);
gst_element_remove_pad (GST_ELEMENT (self->priv->conference),
self->priv->media_sink_pad);
self->priv->media_sink_pad = NULL;
}
if (self->priv->rtpbin_send_rtcp_src)
{
gst_pad_set_active (self->priv->rtpbin_send_rtcp_src, FALSE);
gst_element_release_request_pad (self->priv->conference->rtpbin,
self->priv->rtpbin_send_rtcp_src);
gst_object_unref (self->priv->rtpbin_send_rtcp_src);
self->priv->rtpbin_send_rtcp_src = NULL;
}
if (self->priv->rtpbin_send_rtp_sink)
{
gst_pad_set_active (self->priv->rtpbin_send_rtp_sink, FALSE);
gst_element_release_request_pad (self->priv->conference->rtpbin,
self->priv->rtpbin_send_rtp_sink);
gst_object_unref (self->priv->rtpbin_send_rtp_sink);
self->priv->rtpbin_send_rtp_sink = NULL;
}
if (self->priv->rtpbin_recv_rtp_sink)
{
gst_pad_set_active (self->priv->rtpbin_recv_rtp_sink, FALSE);
gst_element_release_request_pad (self->priv->conference->rtpbin,
self->priv->rtpbin_recv_rtp_sink);
gst_object_unref (self->priv->rtpbin_recv_rtp_sink);
self->priv->rtpbin_recv_rtp_sink = NULL;
}
if (self->priv->rtpbin_recv_rtcp_sink)
{
gst_pad_set_active (self->priv->rtpbin_recv_rtcp_sink, FALSE);
gst_element_release_request_pad (self->priv->conference->rtpbin,
self->priv->rtpbin_recv_rtcp_sink);
gst_object_unref (self->priv->rtpbin_recv_rtcp_sink);
self->priv->rtpbin_recv_rtcp_sink = NULL;
}
g_clear_object (&self->priv->srtpenc);
g_clear_object (&self->priv->srtpdec);
if (self->priv->transmitters)
{
g_hash_table_foreach (self->priv->transmitters, _remove_transmitter,
self);
}
if (self->priv->free_substreams)
{
g_list_foreach (self->priv->free_substreams, (GFunc) g_object_unref, NULL);
g_list_free (self->priv->free_substreams);
self->priv->free_substreams = NULL;
}
if (self->priv->conference)
{
g_object_unref (self->priv->conference);
self->priv->conference = NULL;
}
for (item = g_list_first (self->priv->streams);
item;
item = g_list_next (item))
{
g_object_weak_unref (G_OBJECT (item->data), _remove_stream, self);
fs_stream_destroy (item->data);
}
g_list_free (self->priv->streams);
self->priv->streams = NULL;
self->priv->streams_cookie++;
g_hash_table_remove_all (self->priv->ssrc_streams);
g_hash_table_remove_all (self->priv->ssrc_streams_manual);
if (self->priv->transmitters)
{
g_hash_table_destroy (self->priv->transmitters);
self->priv->transmitters = NULL;
}
G_OBJECT_CLASS (fs_rtp_session_parent_class)->dispose (obj);
}
static void
fs_rtp_session_finalize (GObject *object)
{
FsRtpSession *self = FS_RTP_SESSION (object);
g_mutex_clear (&self->mutex);
if (self->priv->blueprints)
{
fs_rtp_blueprints_unref (self->priv->media_type);
self->priv->blueprints = NULL;
}
g_list_free_full (self->priv->codec_preferences,
(GDestroyNotify) codec_preference_destroy);
codec_association_list_destroy (self->priv->codec_associations);
fs_rtp_header_extension_list_destroy (self->priv->hdrext_preferences);
fs_rtp_header_extension_list_destroy (self->priv->hdrext_negotiated);
if (self->priv->current_send_codec)
fs_codec_destroy (self->priv->current_send_codec);
if (self->priv->requested_send_codec)
fs_codec_destroy (self->priv->requested_send_codec);
if (self->priv->ssrc_streams)
g_hash_table_destroy (self->priv->ssrc_streams);
if (self->priv->ssrc_streams_manual)
g_hash_table_destroy (self->priv->ssrc_streams_manual);
gst_caps_unref (self->priv->input_caps);
gst_caps_unref (self->priv->output_caps);
g_queue_foreach (&self->priv->telephony_events, (GFunc) gst_event_unref,
NULL);
if (self->priv->encryption_parameters)
gst_structure_free (self->priv->encryption_parameters);
g_rw_lock_clear (&self->priv->disposed_lock);
G_OBJECT_CLASS (fs_rtp_session_parent_class)->finalize (object);
}
gboolean
fs_rtp_session_has_disposed_enter (FsRtpSession *self, GError **error)
{
g_rw_lock_reader_lock (&self->priv->disposed_lock);
if (self->priv->disposed)
{
g_rw_lock_reader_unlock (&self->priv->disposed_lock);
g_set_error (error, FS_ERROR, FS_ERROR_DISPOSED,
"Called function after session has been disposed");
return TRUE;
}
return FALSE;
}
void
fs_rtp_session_has_disposed_exit (FsRtpSession *self)
{
g_rw_lock_reader_unlock (&self->priv->disposed_lock);
}
static void
fs_rtp_session_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
FsRtpSession *self = FS_RTP_SESSION (object);
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
switch (prop_id)
{
case PROP_MEDIA_TYPE:
g_value_set_enum (value, self->priv->media_type);
break;
case PROP_ID:
g_value_set_uint (value, self->id);
break;
case PROP_SINK_PAD:
g_value_set_object (value, self->priv->media_sink_pad);
break;
case PROP_CODEC_PREFERENCES:
{
GQueue tmpqueue = G_QUEUE_INIT;
GList *item;
FS_RTP_SESSION_LOCK (self);
for (item = self->priv->codec_preferences; item; item = item->next)
{
CodecPreference *cp = item->data;
g_queue_push_tail (&tmpqueue, fs_codec_copy (cp->codec));
}
g_value_take_boxed (value, tmpqueue.head);
FS_RTP_SESSION_UNLOCK (self);
break;
}
case PROP_CODECS:
{
GList *codecs = NULL;
GList *item = NULL;
FS_RTP_SESSION_LOCK (self);
for (item = g_list_first (self->priv->codec_associations);
item;
item = g_list_next (item))
{
CodecAssociation *ca = item->data;
if (!ca->disable && ca->need_config)
break;
}
if (item == NULL)
codecs = codec_associations_to_codecs (self->priv->codec_associations,
TRUE);
FS_RTP_SESSION_UNLOCK (self);
g_value_take_boxed (value, codecs);
}
break;
case PROP_CODECS_WITHOUT_CONFIG:
{
GList *codecs = NULL;
FS_RTP_SESSION_LOCK (self);
codecs = codec_associations_to_codecs (self->priv->codec_associations,
FALSE);
FS_RTP_SESSION_UNLOCK (self);
g_value_take_boxed (value, codecs);
}
break;
case PROP_CONFERENCE:
g_value_set_object (value, self->priv->conference);
break;
case PROP_CURRENT_SEND_CODEC:
FS_RTP_SESSION_LOCK (self);
g_value_set_boxed (value, self->priv->current_send_codec);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_NO_RTCP_TIMEOUT:
FS_RTP_SESSION_LOCK (self);
g_value_set_int (value, self->priv->no_rtcp_timeout);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_SSRC:
if (self->priv->rtpbin_send_rtp_sink)
{
GstCaps *caps = NULL;
g_object_get (self->priv->rtpbin_send_rtp_sink, "caps", &caps, NULL);
if (caps)
{
if (gst_caps_get_size (caps) > 0)
{
const GstStructure *s = gst_caps_get_structure (caps, 0);
guint ssrc;
if (gst_structure_get_uint (s, "ssrc", &ssrc))
g_value_set_uint (value, ssrc);
}
gst_caps_unref (caps);
}
break;
}
case PROP_TOS:
FS_RTP_SESSION_LOCK (self);
g_value_set_uint (value, self->priv->tos);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_SEND_BITRATE:
FS_RTP_SESSION_LOCK (self);
g_value_set_uint (value, self->priv->send_bitrate);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_RTP_HEADER_EXTENSIONS:
FS_RTP_SESSION_LOCK (self);
g_value_set_boxed (value, self->priv->hdrext_negotiated);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_RTP_HEADER_EXTENSION_PREFERENCES:
FS_RTP_SESSION_LOCK (self);
g_value_set_boxed (value, self->priv->hdrext_preferences);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_ALLOWED_SINK_CAPS:
FS_RTP_SESSION_LOCK (self);
g_value_set_boxed (value, self->priv->input_caps);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_ALLOWED_SRC_CAPS:
FS_RTP_SESSION_LOCK (self);
g_value_set_boxed (value, self->priv->output_caps);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_ENCRYPTION_PARAMETERS:
FS_RTP_SESSION_LOCK (self);
g_value_set_boxed (value, self->priv->encryption_parameters);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_INTERNAL_SESSION:
g_value_set_object (value, self->priv->rtpbin_internal_session);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
fs_rtp_session_has_disposed_exit (self);
}
static void
set_tos (gpointer key, gpointer val, gpointer user_data)
{
FsTransmitter *trans = val;
guint tos = GPOINTER_TO_UINT (user_data);
g_object_set (trans, "tos", tos, NULL);
}
static void
fs_rtp_session_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
FsRtpSession *self = FS_RTP_SESSION (object);
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
switch (prop_id)
{
case PROP_MEDIA_TYPE:
self->priv->media_type = g_value_get_enum (value);
break;
case PROP_ID:
self->id = g_value_get_uint (value);
break;
case PROP_CONFERENCE:
self->priv->conference = FS_RTP_CONFERENCE (g_value_dup_object (value));
break;
case PROP_NO_RTCP_TIMEOUT:
FS_RTP_SESSION_LOCK (self);
self->priv->no_rtcp_timeout = g_value_get_int (value);
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_SSRC:
g_object_set_property (G_OBJECT (self->priv->rtpbin_internal_session),
"internal-ssrc", value);
break;
case PROP_TOS:
FS_RTP_SESSION_LOCK (self);
self->priv->tos = g_value_get_uint (value);
g_hash_table_foreach (self->priv->transmitters, set_tos,
GUINT_TO_POINTER (self->priv->tos));
FS_RTP_SESSION_UNLOCK (self);
break;
case PROP_SEND_BITRATE:
fs_rtp_session_set_send_bitrate (self, g_value_get_uint (value));
break;
case PROP_RTP_HEADER_EXTENSION_PREFERENCES:
FS_RTP_SESSION_LOCK (self);
fs_rtp_header_extension_list_destroy (self->priv->hdrext_preferences);
self->priv->hdrext_preferences = g_value_dup_boxed (value);
FS_RTP_SESSION_UNLOCK (self);
/* This call can't fail because the codecs do NOT change */
fs_rtp_session_update_codecs (self, NULL, NULL, NULL);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
fs_rtp_session_has_disposed_exit (self);
}
static void
_rtpbin_internal_session_notify_internal_ssrc (GObject *internal_session,
GParamSpec *pspec, gpointer self)
{
g_object_notify (G_OBJECT (self), "ssrc");
}
static void
_rtpbin_send_rtp_sink_notify_caps (GstPad *pad, GParamSpec *param, gpointer self)
{
g_object_notify (G_OBJECT (self), "ssrc");
}
static void
_rtp_tfrc_bitrate_changed (GObject *rtp_tfrc, GParamSpec *pspec,
FsRtpSession *self)
{
guint bitrate;
g_object_get (rtp_tfrc, "bitrate", &bitrate, NULL);
g_debug ("setting bitrate to: %u", bitrate);
fs_rtp_session_set_send_bitrate (self, bitrate);
}
static GstElement *
_rtpbin_request_encoder (GstElement *rtpbin, guint session_id,
gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
if (self->id == session_id && self->priv->srtpenc) {
return gst_object_ref (self->priv->srtpenc);
} else {
return NULL;
}
}
static GstElement *
_rtpbin_request_decoder (GstElement *rtpbin, guint session_id,
gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
if (self->id == session_id && self->priv->srtpdec)
return gst_object_ref (self->priv->srtpdec);
else
return NULL;
}
static void
fs_rtp_session_constructed (GObject *object)
{
FsRtpSession *self = FS_RTP_SESSION_CAST (object);
GstElement *valve = NULL;
GstElement *capsfilter = NULL;
GstElement *tee = NULL;
GstElement *funnel = NULL;
GstElement *muxer = NULL;
GstPad *tee_sink_pad = NULL;
GstPad *valve_sink_pad = NULL;
GstPad *funnel_src_pad = NULL;
GstPad *muxer_src_pad = NULL;
GstPad *transmitter_rtcp_tee_sink_pad;
GstPad *pad;
GstPadLinkReturn ret;
gchar *tmp;
gulong request_rtp_encoder_id = 0;
gulong request_rtp_decoder_id = 0;
gulong request_rtcp_encoder_id = 0;
gulong request_rtcp_decoder_id = 0;
if (self->id == 0)
{
g_error ("You can no instantiate this element directly, you MUST"
" call fs_rtp_session_new ()");
return;
}
self->priv->blueprints = fs_rtp_blueprints_get (self->priv->media_type,
&self->priv->construction_error);
if (!self->priv->blueprints)
{
if (!self->priv->construction_error)
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_INTERNAL,
"Unknown error while trying to discover codecs");
return;
}
if (self->priv->media_type == FS_MEDIA_TYPE_AUDIO)
self->priv->input_caps = gst_caps_new_empty_simple ("audio/x-raw");
else if (self->priv->media_type == FS_MEDIA_TYPE_VIDEO)
self->priv->input_caps = gst_caps_new_empty_simple ("video/x-raw");
else if (self->priv->media_type == FS_MEDIA_TYPE_APPLICATION)
self->priv->input_caps = gst_caps_new_any ();
else
g_assert_not_reached ();
self->priv->output_caps = gst_caps_ref (self->priv->input_caps);
tmp = g_strdup_printf ("send_tee_%u", self->id);
tee = gst_element_factory_make ("tee", tmp);
g_free (tmp);
if (!tee)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the tee element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), tee))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the tee element to the FsRtpConference");
gst_object_unref (tee);
return;
}
gst_element_set_state (tee, GST_STATE_PLAYING);
self->priv->send_tee = gst_object_ref (tee);
if (self->priv->media_type == FS_MEDIA_TYPE_VIDEO)
{
GstElement *bitrate_adapter = fs_rtp_bitrate_adapter_new ();
if (!gst_bin_add (GST_BIN (self->priv->conference), bitrate_adapter))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the bitrate adapter to the FsRtpConference");
gst_object_unref (bitrate_adapter);
return;
}
if (!gst_element_link (bitrate_adapter, tee))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION, "Could not link bitrate adapter to tee");
gst_object_unref (bitrate_adapter);
return;
}
gst_element_set_state (bitrate_adapter, GST_STATE_PLAYING);
self->priv->send_bitrate_adapter = bitrate_adapter;
tee_sink_pad = gst_element_get_static_pad (bitrate_adapter, "sink");
}
else
{
tee_sink_pad = gst_element_get_static_pad (tee, "sink");
}
tmp = g_strdup_printf ("sink_%u", self->id);
self->priv->media_sink_pad = gst_ghost_pad_new (tmp, tee_sink_pad);
g_free (tmp);
if (!self->priv->media_sink_pad)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create ghost pad for tee's sink pad");
return;
}
gst_pad_set_active (self->priv->media_sink_pad, TRUE);
if (!gst_element_add_pad (GST_ELEMENT (self->priv->conference),
self->priv->media_sink_pad))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add ghost pad to the conference bin");
gst_object_unref (self->priv->media_sink_pad);
self->priv->media_sink_pad = NULL;
return;
}
gst_object_unref (tee_sink_pad);
/* Create the SRTP encoder & decoder */
tmp = g_strdup_printf ("srtpenc_%u", self->id);
self->priv->srtpenc = gst_element_factory_make ("srtpenc", tmp);
g_free (tmp);
if (self->priv->srtpenc) {
GstPad *tmppad;
gst_object_ref_sink (self->priv->srtpenc);
g_object_set (self->priv->srtpenc,
"rtp-cipher", 0, "rtp-auth", 0, "rtcp-cipher", 0, "rtcp-auth", 0, NULL);
tmp = g_strdup_printf ("rtp_sink_%u", self->id);
tmppad = gst_element_get_request_pad (self->priv->srtpenc, tmp);
gst_object_unref (tmppad);
g_free (tmp);
tmp = g_strdup_printf ("rtcp_sink_%u", self->id);
tmppad = gst_element_get_request_pad (self->priv->srtpenc, tmp);
gst_object_unref (tmppad);
g_free (tmp);
}
tmp = g_strdup_printf ("srtpdec_%u", self->id);
self->priv->srtpdec = gst_element_factory_make ("srtpdec", tmp);
g_free (tmp);
if (self->priv->srtpdec) {
gst_object_ref_sink (self->priv->srtpdec);
g_signal_connect_object (self->priv->srtpdec, "request-key",
G_CALLBACK (_srtpdec_request_key), self, 0);
}
request_rtp_encoder_id =
g_signal_connect (self->priv->conference->rtpbin, "request-rtp-encoder",
G_CALLBACK (_rtpbin_request_encoder), self);
request_rtp_decoder_id =
g_signal_connect (self->priv->conference->rtpbin, "request-rtp-decoder",
G_CALLBACK (_rtpbin_request_decoder), self);
request_rtcp_encoder_id =
g_signal_connect (self->priv->conference->rtpbin, "request-rtcp-encoder",
G_CALLBACK (_rtpbin_request_encoder), self);
request_rtcp_decoder_id =
g_signal_connect (self->priv->conference->rtpbin, "request-rtcp-decoder",
G_CALLBACK (_rtpbin_request_decoder), self);
/* Request the parts of rtpbin */
tmp = g_strdup_printf ("recv_rtp_sink_%u", self->id);
self->priv->rtpbin_recv_rtp_sink =
gst_element_get_request_pad (self->priv->conference->rtpbin, tmp);
g_free (tmp);
tmp = g_strdup_printf ("recv_rtcp_sink_%u", self->id);
self->priv->rtpbin_recv_rtcp_sink =
gst_element_get_request_pad (self->priv->conference->rtpbin,
tmp);
g_free (tmp);
tmp = g_strdup_printf ("send_rtp_sink_%u", self->id);
self->priv->rtpbin_send_rtp_sink =
gst_element_get_request_pad (self->priv->conference->rtpbin, tmp);
g_free (tmp);
tmp = g_strdup_printf ("send_rtcp_src_%u", self->id);
self->priv->rtpbin_send_rtcp_src =
gst_element_get_request_pad (self->priv->conference->rtpbin, tmp);
g_free (tmp);
g_signal_handler_disconnect (self->priv->conference->rtpbin,
request_rtp_encoder_id);
g_signal_handler_disconnect (self->priv->conference->rtpbin,
request_rtp_decoder_id);
g_signal_handler_disconnect (self->priv->conference->rtpbin,
request_rtcp_encoder_id);
g_signal_handler_disconnect (self->priv->conference->rtpbin,
request_rtcp_decoder_id);
if (!self->priv->rtpbin_recv_rtp_sink)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not get recv_rtp_sink_%u request pad from the rtpbin",
self->id);
return;
}
if (!self->priv->rtpbin_recv_rtcp_sink)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not get recv_rtcp_sink_%u request pad from the rtpbin",
self->id);
return;
}
if (!self->priv->rtpbin_send_rtp_sink)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not get send_rtp_sink_%u request pad from the rtpbin",
self->id);
return;
}
if (!self->priv->rtpbin_send_rtcp_src)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not get send_rtcp_src_%u request pad from the rtpbin",
self->id);
return;
}
self->priv->send_tee_discovery_pad = gst_element_get_request_pad (tee,
"src_%u");
self->priv->send_tee_media_pad = gst_element_get_request_pad (tee,
"src_%u");
if (!self->priv->send_tee_discovery_pad || !self->priv->send_tee_media_pad)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the send tee request src pads");
}
tmp = g_strdup_printf ("valve_discovery_%u", self->id);
self->priv->discovery_valve = gst_element_factory_make ("valve", tmp);
g_free (tmp);
if (!self->priv->discovery_valve)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the valve element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference),
self->priv->discovery_valve))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the valve element to the FsRtpConference");
gst_object_unref (valve);
return;
}
if (!gst_element_sync_state_with_parent (self->priv->discovery_valve))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not sync the discovery valve's state with its parent");
return;
}
pad = gst_element_get_static_pad (self->priv->discovery_valve, "sink");
ret = gst_pad_link (self->priv->send_tee_discovery_pad, pad);
gst_object_unref (pad);
if (GST_PAD_LINK_FAILED (ret))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link discovery pad to discovery valve");
return;
}
tmp = g_strdup_printf ("valve_send_%u", self->id);
valve = gst_element_factory_make ("valve", tmp);
g_free (tmp);
if (!valve)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the valve element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), valve))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the valve element to the FsRtpConference");
gst_object_unref (valve);
return;
}
g_object_set (G_OBJECT (valve), "drop", TRUE, NULL);
gst_element_set_state (valve, GST_STATE_PLAYING);
self->priv->media_sink_valve = gst_object_ref (valve);
valve_sink_pad = gst_element_get_static_pad (valve, "sink");
if (GST_PAD_LINK_FAILED (gst_pad_link (self->priv->send_tee_media_pad,
valve_sink_pad)))
{
gst_object_unref (valve_sink_pad);
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link send tee and valve");
return;
}
gst_object_unref (valve_sink_pad);
/* Now create the transmitter RTP funnel */
tmp = g_strdup_printf ("recv_rtp_funnel_%u", self->id);
funnel = gst_element_factory_make ("funnel", tmp);
g_free (tmp);
if (!funnel)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the rtp funnel element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), funnel))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the rtp funnel element to the FsRtpConference");
gst_object_unref (funnel);
return;
}
self->priv->transmitter_rtp_funnel = gst_object_ref (funnel);
funnel_src_pad = gst_element_get_static_pad (funnel, "src");
ret = gst_pad_link (funnel_src_pad, self->priv->rtpbin_recv_rtp_sink);
if (GST_PAD_LINK_FAILED (ret))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link pad %s with pad %s",
GST_PAD_NAME (funnel_src_pad),
GST_PAD_NAME (self->priv->rtpbin_recv_rtp_sink));
gst_object_unref (funnel_src_pad);
return;
}
gst_object_unref (funnel_src_pad);
gst_element_set_state (funnel, GST_STATE_PLAYING);
/* Now create the transmitter RTCP funnel */
tmp = g_strdup_printf ("recv_rtcp_funnel_%u", self->id);
funnel = gst_element_factory_make ("funnel", tmp);
g_free (tmp);
if (!funnel)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the rtcp funnel element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), funnel))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the rtcp funnel element to the FsRtcpConference");
gst_object_unref (funnel);
return;
}
self->priv->transmitter_rtcp_funnel = gst_object_ref (funnel);
funnel_src_pad = gst_element_get_static_pad (funnel, "src");
ret = gst_pad_link (funnel_src_pad, self->priv->rtpbin_recv_rtcp_sink);
if (GST_PAD_LINK_FAILED (ret))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link pad %s with pad %s",
GST_PAD_NAME (funnel_src_pad),
GST_PAD_NAME (self->priv->rtpbin_recv_rtcp_sink));
gst_object_unref (funnel_src_pad);
return;
}
gst_object_unref (funnel_src_pad);
gst_element_set_state (funnel, GST_STATE_PLAYING);
/* Lets get the internal RTP session */
g_signal_emit_by_name (self->priv->conference->rtpbin,
"get-internal-session", self->id, &self->priv->rtpbin_internal_session);
if (!self->priv->rtpbin_internal_session)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not get the rtpbin's internal session");
return;
}
g_signal_connect (self->priv->rtpbin_internal_session,
"notify::internal-ssrc",
G_CALLBACK (_rtpbin_internal_session_notify_internal_ssrc), self);
g_object_set (self->priv->rtpbin_internal_session,
"favor-new", TRUE,
"bandwidth", (gdouble) 0,
"rtcp-fraction", (gdouble) 0.05,
NULL);
/* Lets now create the RTP muxer */
tmp = g_strdup_printf ("send_rtp_muxer_%u", self->id);
muxer = gst_element_factory_make ("rtpdtmfmux", tmp);
g_free (tmp);
if (!muxer)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the rtp muxer element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), muxer))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the rtp muxer element to the FsRtpConference");
gst_object_unref (muxer);
return;
}
self->priv->rtpmuxer = gst_object_ref (muxer);
g_signal_connect_object (self->priv->rtpbin_send_rtp_sink, "notify::caps",
G_CALLBACK (_rtpbin_send_rtp_sink_notify_caps), self, 0);
muxer_src_pad = gst_element_get_static_pad (muxer, "src");
ret = gst_pad_link (muxer_src_pad, self->priv->rtpbin_send_rtp_sink);
if (GST_PAD_LINK_FAILED (ret))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link pad %s with pad %s",
GST_PAD_NAME (muxer_src_pad),
GST_PAD_NAME (self->priv->rtpbin_send_rtp_sink));
gst_object_unref (muxer_src_pad);
return;
}
gst_object_unref (muxer_src_pad);
gst_element_set_state (muxer, GST_STATE_PLAYING);
if (self->priv->media_type == FS_MEDIA_TYPE_VIDEO)
{
self->priv->rtp_tfrc = fs_rtp_tfrc_new (self);
g_signal_connect_object (self->priv->rtp_tfrc, "notify::bitrate",
G_CALLBACK (_rtp_tfrc_bitrate_changed), self, 0);
}
self->priv->keyunit_manager = fs_rtp_keyunit_manager_new (
self->priv->rtpbin_internal_session);
/* Now create the transmitter RTP tee */
tmp = g_strdup_printf ("send_rtp_tee_%u", self->id);
tee = gst_element_factory_make ("tee", tmp);
g_free (tmp);
if (!tee)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the rtp tee element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), tee))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the rtp tee element to the FsRtpConference");
gst_object_unref (tee);
return;
}
gst_element_set_state (tee, GST_STATE_PLAYING);
self->priv->transmitter_rtp_tee = gst_object_ref (tee);
tmp = g_strdup_printf ("send_rtp_src_%u", self->id);
if (!gst_element_link_pads (
self->priv->conference->rtpbin, tmp,
self->priv->transmitter_rtp_tee, "sink"))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link rtpbin %s pad to tee sink", tmp);
g_free (tmp);
return;
}
g_free (tmp);
/* Now create the transmitter RTCP tee */
tmp = g_strdup_printf ("send_rtcp_tee_%u", self->id);
tee = gst_element_factory_make ("tee", tmp);
g_free (tmp);
if (!tee)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the rtcp tee element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), tee))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the rtcp tee element to the FsRtpConference");
gst_object_unref (tee);
return;
}
gst_element_set_state (tee, GST_STATE_PLAYING);
self->priv->transmitter_rtcp_tee = gst_object_ref (tee);
transmitter_rtcp_tee_sink_pad =
gst_element_get_static_pad (self->priv->transmitter_rtcp_tee, "sink");
g_assert (transmitter_rtcp_tee_sink_pad);
ret = gst_pad_link (self->priv->rtpbin_send_rtcp_src,
transmitter_rtcp_tee_sink_pad);
gst_object_unref (transmitter_rtcp_tee_sink_pad);
if (GST_PAD_LINK_FAILED (ret))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link rtpbin network rtcp src to tee");
return;
}
/* Lets now do the send_capsfilter */
tmp = g_strdup_printf ("send_rtp_capsfilter_%u", self->id);
capsfilter = gst_element_factory_make ("capsfilter", tmp);
g_free (tmp);
if (!capsfilter)
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not create the rtp capsfilter element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->conference), capsfilter))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the rtp capsfilter element to the FsRtpConference");
gst_object_unref (capsfilter);
return;
}
pad = gst_element_get_static_pad (capsfilter, "src");
g_signal_connect (pad, "notify::caps", G_CALLBACK (_send_caps_changed),
self);
gst_object_unref (pad);
self->priv->send_capsfilter = gst_object_ref (capsfilter);
if (!gst_element_link_pads (capsfilter, "src", muxer, "sink_%u"))
{
self->priv->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link pad capsfilter src pad to the rtpmux");
return;
}
gst_element_set_state (capsfilter, GST_STATE_PLAYING);
if (!fs_rtp_session_update_codecs (self, NULL, NULL,
&self->priv->construction_error))
{
g_assert (self->priv->construction_error);
return;
}
if (G_OBJECT_CLASS (fs_rtp_session_parent_class)->constructed)
G_OBJECT_CLASS (fs_rtp_session_parent_class)->constructed(object);
}
#define GET_MEMBER(Type, name) \
Type * \
fs_rtp_session_get_##name (FsRtpSession *self) \
{ \
Type *tmp = self->priv->name; \
\
if (tmp) \
gst_object_ref (tmp); \
return tmp; \
}
GET_MEMBER (FsRtpConference, conference)
GET_MEMBER (GstPad, rtpbin_recv_rtp_sink)
GET_MEMBER (GstPad, rtpbin_recv_rtcp_sink)
GET_MEMBER (GObject, rtpbin_internal_session)
GET_MEMBER (GstElement, rtpmuxer)
#undef GET_MEMBER
static gboolean
fs_rtp_session_add_ssrc_stream_locked (FsRtpSession *self, guint32 ssrc,
FsRtpStream *stream)
{
if (!g_hash_table_lookup (self->priv->ssrc_streams, GUINT_TO_POINTER (ssrc)))
{
g_hash_table_insert (self->priv->ssrc_streams, GUINT_TO_POINTER (ssrc),
stream);
if (self->priv->srtpdec)
g_signal_emit_by_name (self->priv->srtpdec, "remove-key", ssrc);
return TRUE;
} else {
return FALSE;
}
}
static void
_stream_known_source_packet_received (FsRtpStream *stream, guint component,
GstBuffer *buffer, gpointer user_data)
{
guint32 ssrc;
FsRtpSession *self = FS_RTP_SESSION_CAST (user_data);
gboolean valid = FALSE;
GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
{
ssrc = gst_rtp_buffer_get_ssrc (&rtpbuffer);
gst_rtp_buffer_unmap (&rtpbuffer);
valid = TRUE;
}
else
{
GstRTCPPacket rtcppacket;
GstRTCPBuffer rtcpbuffer = GST_RTCP_BUFFER_INIT;
if (gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcpbuffer))
{
if (gst_rtcp_buffer_get_first_packet (&rtcpbuffer, &rtcppacket))
{
GstRTCPType type;
do {
type = gst_rtcp_packet_get_type (&rtcppacket);
switch (type) {
case GST_RTCP_TYPE_RR:
ssrc = gst_rtcp_packet_rr_get_ssrc (&rtcppacket);
valid = TRUE;
break;
case GST_RTCP_TYPE_SR:
gst_rtcp_packet_sr_get_sender_info (&rtcppacket, &ssrc, NULL,
NULL, NULL, NULL);
valid = TRUE;
break;
case GST_RTCP_TYPE_SDES:
ssrc = gst_rtcp_packet_sdes_get_ssrc (&rtcppacket);
valid = TRUE;
break;
default:
break;
}
} while (valid == FALSE && type != GST_RTCP_TYPE_INVALID &&
gst_rtcp_packet_move_to_next (&rtcppacket));
}
gst_rtcp_buffer_unmap (&rtcpbuffer);
}
}
if (!valid)
{
fs_rtp_session_has_disposed_exit (self);
return;
}
FS_RTP_SESSION_LOCK (self);
if (fs_rtp_session_add_ssrc_stream_locked (self, ssrc, stream))
{
FS_RTP_SESSION_UNLOCK (self);
fs_rtp_session_associate_free_substreams (self, stream, ssrc);
}
else
{
FS_RTP_SESSION_UNLOCK (self);
}
fs_rtp_session_has_disposed_exit (self);
}
static void
_stream_sending_changed_locked (FsRtpStream *stream, gboolean sending,
gpointer user_data)
{
FsRtpSession *session = user_data;
if (sending)
session->priv->streams_sending++;
else
session->priv->streams_sending--;
if (fs_rtp_session_has_disposed_enter (session, NULL))
return;
if (session->priv->streams_sending && session->priv->send_codecbin &&
g_hash_table_size (session->priv->transmitters))
g_object_set (session->priv->media_sink_valve, "drop", FALSE, NULL);
else
g_object_set (session->priv->media_sink_valve, "drop", TRUE, NULL);
if (session->priv->rtp_tfrc)
g_object_set (session->priv->rtp_tfrc, "sending",
(session->priv->streams_sending > 0), NULL);
fs_rtp_session_has_disposed_exit (session);
}
static void
_stream_ssrc_added_cb (FsRtpStream *stream, guint32 ssrc, gpointer user_data)
{
FsRtpSession *self = user_data;
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
FS_RTP_SESSION_LOCK (self);
fs_rtp_session_add_ssrc_stream_locked (self, ssrc, stream);
g_hash_table_insert (self->priv->ssrc_streams_manual, GUINT_TO_POINTER (ssrc),
stream);
FS_RTP_SESSION_UNLOCK (self);
fs_rtp_session_associate_free_substreams (self, stream, ssrc);
fs_rtp_session_has_disposed_exit (self);
}
static gboolean
_remove_stream_from_ht (gpointer key, gpointer value, gpointer user_data)
{
return (value == user_data);
}
static void
_remove_stream (gpointer user_data,
GObject *where_the_object_was)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
FS_RTP_SESSION_LOCK (self);
self->priv->streams =
g_list_remove_all (self->priv->streams, where_the_object_was);
self->priv->streams_cookie++;
g_hash_table_foreach_remove (self->priv->ssrc_streams, _remove_stream_from_ht,
where_the_object_was);
g_hash_table_foreach_remove (self->priv->ssrc_streams_manual,
_remove_stream_from_ht, where_the_object_was);
FS_RTP_SESSION_UNLOCK (self);
fs_rtp_session_has_disposed_exit (self);
}
/**
* fs_rtp_session_new_stream:
* @session: an #FsRtpSession
* @participant: #FsParticipant of a participant for the new stream
* @direction: #FsStreamDirection describing the direction of the new stream that will
* be created for this participant
* @error: location of a #GError, or NULL if no error occured
*
* This function creates a stream for the given participant into the active session.
*
* Returns: the new #FsStream that has been created. User must unref the
* #FsStream when the stream is ended. If an error occured, returns NULL.
*/
static FsStream *
fs_rtp_session_new_stream (FsSession *session,
FsParticipant *participant,
FsStreamDirection direction,
GError **error)
{
FsRtpSession *self = FS_RTP_SESSION (session);
FsRtpParticipant *rtpparticipant = NULL;
FsStream *new_stream = NULL;
if (!FS_IS_RTP_PARTICIPANT (participant))
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"You have to provide a participant of type RTP");
return NULL;
}
if (fs_rtp_session_has_disposed_enter (self, error))
return NULL;
rtpparticipant = FS_RTP_PARTICIPANT (participant);
new_stream = FS_STREAM_CAST (fs_rtp_stream_new (self, rtpparticipant,
direction, _stream_new_remote_codecs,
_stream_known_source_packet_received,
_stream_sending_changed_locked,
_stream_ssrc_added_cb,
_stream_get_new_stream_transmitter,
_stream_decrypt_clear_locked_cb,
self));
if (new_stream)
{
FS_RTP_SESSION_LOCK (self);
self->priv->streams = g_list_append (self->priv->streams, new_stream);
self->priv->streams_cookie++;
FS_RTP_SESSION_UNLOCK (self);
}
g_object_weak_ref (G_OBJECT (new_stream), _remove_stream, self);
fs_rtp_session_has_disposed_exit (self);
return new_stream;
}
static GstEvent *
fs_rtp_session_set_next_telephony_method (FsRtpSession *self,
gint method)
{
GstEvent *event;
GstStructure *s;
gboolean start;
FS_RTP_SESSION_LOCK (self);
event = g_queue_peek_tail (&self->priv->telephony_events);
if (gst_structure_get_boolean (gst_event_get_structure (event),
"start", &start) && !start)
goto out;
g_queue_pop_tail (&self->priv->telephony_events);
event = GST_EVENT (gst_mini_object_make_writable (GST_MINI_OBJECT (event)));
s = (GstStructure *) gst_event_get_structure (event);
gst_structure_set (s, "method", G_TYPE_INT, method, NULL);
g_queue_push_tail (&self->priv->telephony_events, event);
out:
gst_event_ref (event);
self->priv->telephony_event_running = TRUE;
FS_RTP_SESSION_UNLOCK (self);
return event;
}
static void
fs_rtp_session_try_sending_dtmf_event (FsRtpSession *self)
{
GstElement *rtpmuxer = NULL;
GstPad *pad;
gboolean ret = FALSE;
GstEvent *event;
FS_RTP_SESSION_LOCK (self);
if (self->priv->telephony_event_running ||
g_queue_get_length (&self->priv->telephony_events) == 0)
{
FS_RTP_SESSION_UNLOCK (self);
return;
}
g_assert (self->priv->rtpmuxer);
rtpmuxer = gst_object_ref (self->priv->rtpmuxer);
FS_RTP_SESSION_UNLOCK (self);
pad = gst_element_get_static_pad (rtpmuxer, "src");
event = fs_rtp_session_set_next_telephony_method (self, 1);
ret = gst_pad_send_event (pad, event);
if (!ret)
{
event = fs_rtp_session_set_next_telephony_method (self, 2);
ret = gst_pad_send_event (pad, event);
}
if (!ret)
{
FS_RTP_SESSION_LOCK (self);
self->priv->telephony_event_running = FALSE;
FS_RTP_SESSION_UNLOCK (self);
}
gst_object_unref (pad);
gst_object_unref (rtpmuxer);
}
static gboolean
fs_rtp_session_check_telephony_event_queue_start_locked (FsRtpSession *self,
gboolean desired_start)
{
GstEvent *event = g_queue_peek_head (&self->priv->telephony_events);
if (event)
{
const GstStructure *s = gst_event_get_structure (event);
gboolean start;
if (gst_structure_get_boolean (s, "start", &start) &&
start != desired_start)
{
GST_WARNING ("Tried to start an event while another is playing");
return FALSE;
}
}
return TRUE;
}
/**
* fs_rtp_session_start_telephony_event:
* @session: an #FsRtpSession
* @event: A #FsStreamDTMFEvent or another number defined at
* http://www.iana.org/assignments/audio-telephone-event-registry
* @volume: The volume in dBm0 without the negative sign. Should be between
* 0 and 36. Higher values mean lower volume
*
* This function will start sending a telephony event (such as a DTMF
* tone) on the #FsRtpSession. You have to call the function
* #fs_rtp_session_stop_telephony_event () to stop it.
*
* Returns: %TRUE if sucessful, it can return %FALSE if the #FsStream
* does not support this telephony event.
*/
static gboolean
fs_rtp_session_start_telephony_event (FsSession *session, guint8 event,
guint8 volume)
{
FsRtpSession *self = FS_RTP_SESSION (session);
gboolean ret = FALSE;
if (fs_rtp_session_has_disposed_enter (self, NULL))
return FALSE;
FS_RTP_SESSION_LOCK (self);
if (!fs_rtp_session_check_telephony_event_queue_start_locked (self, FALSE))
{
GST_WARNING ("Tried to start an event without stopping the previous one");
goto out;
}
GST_DEBUG ("sending telephony event %d", event);
g_queue_push_head (&self->priv->telephony_events,
gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("dtmf-event",
"number", G_TYPE_INT, event,
"volume", G_TYPE_INT, volume,
"start", G_TYPE_BOOLEAN, TRUE,
"type", G_TYPE_INT, 1,
NULL)));
ret = TRUE;
out:
FS_RTP_SESSION_UNLOCK (self);
if (ret)
fs_rtp_session_try_sending_dtmf_event (self);
fs_rtp_session_has_disposed_exit (self);
return ret;
}
/**
* fs_rtp_session_stop_telephony_event:
* @session: an #FsRtpSession
*
* This function will stop sending a telephony event started by
* #fs_rtp_session_start_telephony_event (). If the event was being sent
* for less than 50ms, it will be sent for 50ms minimum. If the
* duration was a positive and the event is not over, it will cut it
* short.
*
* Returns: %TRUE if sucessful, it can return %FALSE if the #FsRtpSession
* does not support telephony events or if no telephony event is being sent
*/
static gboolean
fs_rtp_session_stop_telephony_event (FsSession *session)
{
FsRtpSession *self = FS_RTP_SESSION (session);
gboolean ret = FALSE;
if (fs_rtp_session_has_disposed_enter (self, NULL))
return FALSE;
FS_RTP_SESSION_LOCK (self);
if (!fs_rtp_session_check_telephony_event_queue_start_locked (self, TRUE))
{
GST_WARNING ("Tried to stop a telephony event without starting one first");
goto out;
}
GST_DEBUG ("stopping telephony event");
g_queue_push_head (&self->priv->telephony_events,
gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("dtmf-event",
"start", G_TYPE_BOOLEAN, FALSE,
"type", G_TYPE_INT, 1,
NULL)));
ret = TRUE;
out:
FS_RTP_SESSION_UNLOCK (self);
if (ret)
fs_rtp_session_try_sending_dtmf_event (self);
fs_rtp_session_has_disposed_exit (self);
return ret;
}
/**
* fs_rtp_session_set_send_codec:
* @session: an #FsRtpSession
* @send_codec: an #FsCodec representing the codec to send
* @error: location of a #GError, or NULL if no error occured
*
* This function will set the currently being sent codec for all streams in this
* session. The given #FsCodec must be taken directly from the #FsSession:codecs
* property of the session. If the given codec is not in the codecs
* list, @error will be set and %FALSE will be returned. The @send_codec will be
* copied so it must be free'd using fs_codec_destroy () when done.
*
* Returns: %FALSE if the send codec couldn't be set.
*/
static gboolean
fs_rtp_session_set_send_codec (FsSession *session, FsCodec *send_codec,
GError **error)
{
FsRtpSession *self = FS_RTP_SESSION (session);
gboolean ret;
if (fs_rtp_session_has_disposed_enter (self, error))
return FALSE;
FS_RTP_SESSION_LOCK (self);
if (lookup_codec_association_by_codec_for_sending (
self->priv->codec_associations, send_codec))
{
if (self->priv->requested_send_codec)
fs_codec_destroy (self->priv->requested_send_codec);
self->priv->requested_send_codec = fs_codec_copy (send_codec);
fs_rtp_session_verify_send_codec_bin_locked (self);
ret = TRUE;
}
else
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"The passed codec is not part of the list of codecs");
ret = FALSE;
}
FS_RTP_SESSION_UNLOCK (self);
fs_rtp_session_has_disposed_exit (self);
return ret;
}
static gboolean
fs_rtp_session_set_codec_preferences (FsSession *session,
GList *codec_preferences,
GError **error)
{
FsRtpSession *self = FS_RTP_SESSION (session);
GList *old_codec_prefs = NULL;
GList *new_codec_prefs = NULL;
gboolean ret;
guint current_generation;
if (fs_rtp_session_has_disposed_enter (self, error))
return FALSE;
new_codec_prefs =
validate_codecs_configuration (
self->priv->media_type, self->priv->blueprints,
codec_preferences);
if (new_codec_prefs == NULL)
GST_DEBUG ("None of the new codec preferences passed are usable,"
" this will restore the original list of detected codecs");
FS_RTP_SESSION_LOCK (self);
old_codec_prefs = self->priv->codec_preferences;
self->priv->codec_preferences = new_codec_prefs;
current_generation = self->priv->codec_preferences_generation;
self->priv->codec_preferences_generation++;
FS_RTP_SESSION_UNLOCK (self);
ret = fs_rtp_session_update_codecs (self, NULL, NULL, error);
if (ret)
{
g_list_free_full (old_codec_prefs,
(GDestroyNotify) codec_preference_destroy);
g_object_notify ((GObject*) self, "codec-preferences");
}
else
{
FS_RTP_SESSION_LOCK (self);
if (self->priv->codec_preferences_generation == current_generation)
{
g_list_free_full (self->priv->codec_preferences,
(GDestroyNotify) codec_preference_destroy);
self->priv->codec_preferences = old_codec_prefs;
self->priv->codec_preferences_generation++;
}
else
{
g_list_free_full (old_codec_prefs,
(GDestroyNotify) codec_preference_destroy);
}
FS_RTP_SESSION_UNLOCK (self);
GST_WARNING ("Invalid new codec preferences");
}
fs_rtp_session_has_disposed_exit (self);
return ret;
}
FsRtpSession *
fs_rtp_session_new (FsMediaType media_type, FsRtpConference *conference,
guint id, GError **error)
{
FsRtpSession *session = g_object_new (FS_TYPE_RTP_SESSION,
"media-type", media_type,
"conference", conference,
"id", id,
NULL);
if (session->priv->construction_error)
{
g_propagate_error (error, session->priv->construction_error);
g_object_unref (session);
return NULL;
}
return session;
}
GstCaps *
fs_rtp_session_request_pt_map (FsRtpSession *session, guint pt)
{
GstCaps *caps = NULL;
CodecAssociation *ca = NULL;
if (fs_rtp_session_has_disposed_enter (session, NULL))
return NULL;
FS_RTP_SESSION_LOCK (session);
ca = lookup_codec_association_by_pt (
session->priv->codec_associations, pt);
if (ca)
{
FsCodec *tmpcodec = codec_copy_filtered (ca->codec, FS_PARAM_TYPE_CONFIG);
caps = fs_codec_to_gst_caps (tmpcodec);
fs_codec_destroy (tmpcodec);
}
FS_RTP_SESSION_UNLOCK (session);
if (!caps)
GST_WARNING ("Could not get caps for payload type %u in session %d",
pt, session->id);
fs_rtp_session_has_disposed_exit (session);
return caps;
}
static gboolean
_get_request_pad_and_link (GstElement *tee_funnel, const gchar *tee_funnel_name,
GstElement *sinksrc, const gchar *sinksrc_padname, GstPadDirection direction,
GError **error)
{
GstPad *requestpad = NULL;
GstPad *transpad = NULL;
GstPadLinkReturn ret;
gchar *requestpad_name = (direction == GST_PAD_SINK) ? "src_%u" : "sink_%u";
/* The transmitter will only be removed when the whole session is disposed,
* then the
*/
requestpad = gst_element_get_request_pad (tee_funnel, requestpad_name);
if (!requestpad)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Can not get the %s pad from the transmitter %s element",
requestpad_name, tee_funnel_name);
return FALSE;
}
transpad = gst_element_get_static_pad (sinksrc, sinksrc_padname);
if (direction == GST_PAD_SINK)
ret = gst_pad_link (requestpad, transpad);
else
ret = gst_pad_link (transpad, requestpad);
gst_object_unref (requestpad);
gst_object_unref (transpad);
if (GST_PAD_LINK_FAILED (ret))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Can not link the %s to the transmitter %s", tee_funnel_name,
(direction == GST_PAD_SINK) ? "sink" : "src");
return FALSE;
}
return TRUE;
}
static void
_transmitter_error (
FsStreamTransmitter *stream_transmitter,
gint errorno,
gchar *error_msg,
gpointer user_data)
{
FsSession *session = FS_SESSION (user_data);
fs_session_emit_error (session, errorno, error_msg);
}
static gboolean
fs_rtp_session_add_transmitter_gst_sink (FsRtpSession *self,
FsTransmitter *transmitter,
GError **error)
{
GstElement *sink;
g_object_get (transmitter, "gst-sink", &sink, NULL);
if (!gst_bin_add (GST_BIN (self->priv->conference), sink))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the transmitter sink for %s to the conference",
G_OBJECT_TYPE_NAME(transmitter));
goto error;
}
gst_element_sync_state_with_parent (sink);
if (!_get_request_pad_and_link (self->priv->transmitter_rtp_tee,
"rtp tee", sink, "sink_1", GST_PAD_SINK, error))
goto error;
if (!_get_request_pad_and_link (self->priv->transmitter_rtcp_tee,
"rtcp tee", sink, "sink_2", GST_PAD_SINK, error))
goto error;
gst_object_unref (sink);
return TRUE;
error:
if (sink)
gst_object_unref (sink);
return FALSE;
}
/**
* fs_rtp_session_get_transmitter:
* @self: a #FsRtpSession
* @transmitter_name: The name of the transmitter
* @error: a #GError or %NULL
*
* Returns the requested #FsTransmitter, possibly creating it if it
* does not exist.
*
* Returns: a #FsTransmitter or %NULL on error
*/
static FsTransmitter *
fs_rtp_session_get_transmitter (FsRtpSession *self,
const gchar *transmitter_name,
GError **error)
{
FsTransmitter *transmitter;
GstElement *src = NULL;
guint tos;
FS_RTP_SESSION_LOCK (self);
transmitter = g_hash_table_lookup (self->priv->transmitters,
transmitter_name);
if (transmitter)
{
g_object_ref (transmitter);
FS_RTP_SESSION_UNLOCK (self);
return transmitter;
}
tos = self->priv->tos;
FS_RTP_SESSION_UNLOCK (self);
transmitter = fs_transmitter_new (transmitter_name, 2, tos, error);
if (!transmitter)
return NULL;
g_signal_connect (transmitter, "error", G_CALLBACK (_transmitter_error),
self);
if (!fs_rtp_session_add_transmitter_gst_sink (self, transmitter, error))
goto error;
g_object_get (transmitter, "gst-src", &src, NULL);
if (!gst_bin_add (GST_BIN (self->priv->conference), src))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the transmitter src for %s to the conference",
transmitter_name);
goto error;
}
if (!_get_request_pad_and_link (self->priv->transmitter_rtp_funnel,
"rtp funnel", src, "src_1", GST_PAD_SRC, error))
goto error;
if (!_get_request_pad_and_link (self->priv->transmitter_rtcp_funnel,
"rtcp funnel", src, "src_2", GST_PAD_SRC, error))
goto error;
gst_element_sync_state_with_parent (src);
FS_RTP_SESSION_LOCK (self);
/* Check if two were added at the same time */
if (g_hash_table_lookup (self->priv->transmitters, transmitter_name))
{
FS_RTP_SESSION_UNLOCK (self);
gst_element_set_locked_state (src, TRUE);
gst_element_set_state (src, GST_STATE_NULL);
goto error;
}
g_object_ref (transmitter);
g_hash_table_insert (self->priv->transmitters, g_strdup (transmitter_name),
transmitter);
FS_RTP_SESSION_UNLOCK (self);
gst_object_unref (src);
return transmitter;
/*
* TODO:
* The transmitters sink/sources should be cleanly removed if there is
* an error
*/
error:
if (src)
gst_object_unref (src);
if (transmitter)
g_object_unref (transmitter);
return NULL;
}
static FsStreamTransmitter *
_stream_get_new_stream_transmitter (FsRtpStream *stream,
FsParticipant *participant,
const gchar *transmitter_name,
GParameter *parameters,
guint n_parameters,
GError **error,
gpointer user_data)
{
FsTransmitter *transmitter;
FsStreamTransmitter *st = NULL;
FsRtpSession *self = user_data;
if (fs_rtp_session_has_disposed_enter (self, error))
return NULL;
transmitter = fs_rtp_session_get_transmitter (self, transmitter_name, error);
if (!transmitter)
{
fs_rtp_session_has_disposed_exit (self);
return NULL;
}
st = fs_transmitter_new_stream_transmitter (transmitter, participant,
n_parameters, parameters, error);
g_object_unref (transmitter);
fs_rtp_session_has_disposed_exit (self);
return st;
}
/**
* fs_rtp_session_get_stream_by_ssrc_locked
* @self: The #FsRtpSession
* @stream_ssrc: The stream ssrc
*
* Gets the #FsRtpStream for the SSRC or %NULL if it doesnt exist
*
* Return value: A #FsRtpStream (unref after use) or %NULL if it doesn't exist
*/
static FsRtpStream *
fs_rtp_session_get_stream_by_ssrc_locked (FsRtpSession *self,
guint32 ssrc)
{
FsRtpStream *stream = NULL;
stream = g_hash_table_lookup (self->priv->ssrc_streams,
GUINT_TO_POINTER (ssrc));
if (stream)
g_object_ref (stream);
return stream;
}
/**
* fs_rtp_session_verify_recv_codecs_locked
* @session: A #FsRtpSession
*
* Verifies that the various substreams still have the right codec, otherwise
* re-sets it.
*/
static void
fs_rtp_session_verify_recv_codecs_locked (FsRtpSession *session)
{
GList *item, *item2;
for (item = g_list_first (session->priv->free_substreams);
item;
item = g_list_next (item))
fs_rtp_sub_stream_verify_codec_locked (item->data);
for (item = g_list_first (session->priv->streams);
item;
item = g_list_next (item))
{
FsRtpStream *stream = item->data;
for (item2 = g_list_first (stream->substreams);
item2;
item2 = g_list_next (item2))
fs_rtp_sub_stream_verify_codec_locked (item2->data);
}
}
/**
* fs_rtp_session_distribute_recv_codecs_locked:
* @session: a #FsRtpSession
* @force_stream: The #FsRtpStream to which the new remote codecs belong
* @forced_remote_codecs: The #GList of remote codecs to use for that stream
*
* This function distributes the codecs to the streams including their
* own config data.
*
* If a stream is specified, it will use the specified remote codecs
* instead of the ones currently in the stream.
*/
static void
fs_rtp_session_distribute_recv_codecs_locked (FsRtpSession *session,
FsRtpStream *force_stream,
GList *forced_remote_codecs)
{
GList *item = NULL;
guint cookie;
restart:
cookie = session->priv->streams_cookie;
for (item = session->priv->streams;
item;
item = g_list_next (item))
{
FsRtpStream *stream = item->data;
GList *remote_codecs = NULL;
if (stream == force_stream)
remote_codecs = forced_remote_codecs;
else
remote_codecs = stream->remote_codecs;
if (remote_codecs)
{
GList *new_codecs = codec_associations_to_codecs (
session->priv->codec_associations, FALSE);
GList *item2 = NULL;
for (item2 = new_codecs;
item2;
item2 = g_list_next (item2))
{
FsCodec *codec = item2->data;
GList *item3 = NULL;
FsCodec *remote_codec = NULL;
for (item3 = remote_codecs; item3; item3 = g_list_next (item3))
{
FsCodec *tmpcodec = NULL;
remote_codec = item3->data;
tmpcodec = sdp_negotiate_codec (codec, FS_PARAM_TYPE_RECV,
remote_codec, FS_PARAM_TYPE_RECV | FS_PARAM_TYPE_CONFIG);
if (tmpcodec)
{
fs_codec_destroy (tmpcodec);
break;
}
}
if (item3 == NULL)
remote_codec = NULL;
GST_LOG ("Adding codec to stream %p " FS_CODEC_FORMAT, stream,
FS_CODEC_ARGS (codec));
if (remote_codec)
{
for (item3 = remote_codec->optional_params; item3;
item3 = g_list_next (item3))
{
FsCodecParameter *param = item3->data;
if (codec_has_config_data_named (codec, param->name))
{
GST_LOG ("Adding parameter to stream %p %s=%s", stream,
param->name, param->value);
fs_codec_add_optional_parameter (codec, param->name,
param->value);
}
}
}
}
/* This function unlocks the lock, so we have to check the cookie
* when we come back */
g_object_ref (stream);
fs_rtp_stream_set_negotiated_codecs_unlock (stream, new_codecs);
g_object_unref (stream);
FS_RTP_SESSION_LOCK (session);
if (cookie != session->priv->streams_cookie)
goto restart;
}
}
}
/**
* fs_rtp_session_negotiate_codecs_locked:
* @session: a #FsRtpSession
* @stream: The #FsRtpStream to which the new remote codecs belong
* @remote_codecs: The #GList of remote codecs to use for that stream
* @has_remotes: Set to %TRUE if at least one stream has remote codecs
* set to %FALSE otherwise
* @is_new: Set to %TRUE if the codecs associations have changed
*
* Negotiates the codecs using the current (stored) codecs
* and the remote codecs from each stream.
* If a stream is specified, it will use the specified remote codecs
* instead of the ones currently in the stream
*
* Returns: %TRUE if a new list could be negotiated, otherwise %FALSE and sets
* @error
*/
static gboolean
fs_rtp_session_negotiate_codecs_locked (FsRtpSession *session,
FsRtpStream *stream,
GList *remote_codecs,
gboolean *has_remotes,
gboolean *is_new,
GError **error)
{
gint streams_with_codecs = 0;
gboolean has_many_streams = FALSE;
GList *new_negotiated_codec_associations = NULL;
GList *item;
guint8 hdrext_used_ids[8];
GList *new_hdrexts = NULL;
*has_remotes = FALSE;
for (item = g_list_first (session->priv->streams);
item;
item = g_list_next (item))
{
FsRtpStream *mystream = item->data;
if (mystream == stream)
{
if (remote_codecs)
streams_with_codecs ++;
}
else if (mystream->remote_codecs)
{
streams_with_codecs ++;
}
}
if (streams_with_codecs >= 2)
has_many_streams = TRUE;
new_negotiated_codec_associations = create_local_codec_associations (
session->priv->blueprints, session->priv->codec_preferences,
session->priv->codec_associations, session->priv->input_caps,
session->priv->output_caps);
if (!new_negotiated_codec_associations)
{
g_set_error (error, FS_ERROR, FS_ERROR_NO_CODECS_LEFT,
"Codec config would leave no valid local codecs");
goto error;
}
new_hdrexts = create_local_header_extensions (
session->priv->hdrext_negotiated, session->priv->hdrext_preferences,
hdrext_used_ids);
for (item = g_list_first (session->priv->streams);
item;
item = g_list_next (item))
{
FsRtpStream *mystream = item->data;
GList *codecs = NULL;
if (mystream == stream)
codecs = remote_codecs;
else
codecs = mystream->remote_codecs;
if (codecs)
{
GList *tmp_codec_associations = NULL;
*has_remotes = TRUE;
tmp_codec_associations = negotiate_stream_codecs (codecs,
new_negotiated_codec_associations, has_many_streams);
codec_association_list_destroy (new_negotiated_codec_associations);
new_negotiated_codec_associations = tmp_codec_associations;
if (!new_negotiated_codec_associations)
break;
new_hdrexts = negotiate_stream_header_extensions (new_hdrexts,
mystream->hdrext, !has_many_streams, hdrext_used_ids);
}
}
if (!new_negotiated_codec_associations)
{
g_set_error (error, FS_ERROR, FS_ERROR_NEGOTIATION_FAILED,
"There was no intersection between the remote codecs"
" and the local ones");
goto error;
}
new_negotiated_codec_associations = finish_codec_negotiation (
session->priv->codec_associations,
new_negotiated_codec_associations);
new_negotiated_codec_associations =
fs_rtp_special_sources_negotiation_filter (
new_negotiated_codec_associations);
fs_rtp_tfrc_filter_codecs (&new_negotiated_codec_associations,
&new_hdrexts);
if (session->priv->codec_associations)
*is_new = ! codec_associations_list_are_equal (
session->priv->codec_associations, new_negotiated_codec_associations);
codec_association_list_destroy (session->priv->codec_associations);
session->priv->codec_associations = new_negotiated_codec_associations;
new_hdrexts = finish_header_extensions_nego (new_hdrexts, hdrext_used_ids);
fs_rtp_header_extension_list_destroy (session->priv->hdrext_negotiated);
session->priv->hdrext_negotiated = new_hdrexts;
return TRUE;
error:
fs_rtp_header_extension_list_destroy (new_hdrexts);
return FALSE;
}
/**
* fs_rtp_session_update_codecs:
* @session: a #FsRtpSession
* @stream: The #FsRtpStream to which the new remote codecs belong
* @remote_codecs: The #GList of remote codecs to use for that stream
*
* Negotiates the codecs using the current (stored) codecs
* and the remote codecs from each stream.
* If a stream is specified, it will use the specified remote codecs
* instead of the ones currently in the stream
*
* MT safe
*
* Returns: TRUE if the negotiation succeeds, FALSE otherwise
*/
static gboolean
fs_rtp_session_update_codecs (FsRtpSession *session,
FsRtpStream *stream,
GList *remote_codecs,
GError **error)
{
gboolean is_new = TRUE;
gboolean has_remotes = FALSE;
FS_RTP_SESSION_LOCK (session);
if (!fs_rtp_session_negotiate_codecs_locked (
session, stream, remote_codecs, &has_remotes, &is_new, error))
{
FS_RTP_SESSION_UNLOCK (session);
return FALSE;
}
if (session->priv->rtp_tfrc)
fs_rtp_tfrc_codecs_updated (session->priv->rtp_tfrc,
session->priv->codec_associations,
session->priv->hdrext_negotiated);
fs_rtp_session_distribute_recv_codecs_locked (session, stream, remote_codecs);
fs_rtp_session_verify_recv_codecs_locked (session);
if (is_new)
g_signal_emit_by_name (session->priv->conference->rtpbin,
"clear-pt-map");
fs_rtp_session_start_codec_param_gathering_locked (session);
if (has_remotes)
{
fs_rtp_session_verify_send_codec_bin_locked (session);
}
FS_RTP_SESSION_UNLOCK (session);
if (is_new)
{
g_object_notify (G_OBJECT (session), "codecs");
g_object_notify (G_OBJECT (session), "codecs-without-config");
gst_element_post_message (GST_ELEMENT (session->priv->conference),
gst_message_new_element (GST_OBJECT (session->priv->conference),
gst_structure_new ("farstream-codecs-changed",
"session", FS_TYPE_SESSION, session,
NULL)));
}
return TRUE;
}
static gboolean
_stream_new_remote_codecs (FsRtpStream *stream,
GList *codecs, GError **error, gpointer user_data)
{
FsRtpSession *session = FS_RTP_SESSION_CAST (user_data);
gboolean ret;
if (fs_rtp_session_has_disposed_enter (session, error))
return FALSE;
ret = fs_rtp_session_update_codecs (session, stream, codecs, error);
fs_rtp_session_has_disposed_exit (session);
return ret;
}
static void
_substream_error (FsRtpSubStream *substream,
gint errorno,
gchar *error_msg,
gchar *debug_msg,
gpointer user_data)
{
FsSession *session = FS_SESSION (user_data);
fs_session_emit_error (session, errorno, error_msg);
}
static void
fs_rtp_session_update_minimum_rtcp_interval (FsRtpSession *self,
FsRtpSubStream *skip_substream)
{
guint min_interval = 5000;
GList *item, *item2;
FS_RTP_SESSION_LOCK (self);
if (self->priv->current_send_codec)
min_interval = MIN (min_interval,
self->priv->current_send_codec->minimum_reporting_interval);
for (item = self->priv->free_substreams; item; item = item->next)
{
FsRtpSubStream *substream = item->data;
if (substream == skip_substream)
continue;
if (substream->codec)
min_interval = MIN (min_interval,
substream->codec->minimum_reporting_interval);
}
for (item2 = self->priv->streams; item2; item2 = item2->next)
{
FsRtpStream *stream = item2->data;
for (item = stream->substreams; item; item = item->next)
{
FsRtpSubStream *substream = item->data;
if (substream == skip_substream)
continue;
if (substream->codec)
min_interval = MIN (min_interval,
substream->codec->minimum_reporting_interval);
}
}
FS_RTP_SESSION_UNLOCK (self);
g_object_set (self->priv->rtpbin_internal_session,
"rtcp-min-interval", (guint64) min_interval * GST_MSECOND, NULL);
}
static void
_substream_unlinked (FsRtpSubStream *substream, gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
fs_rtp_session_update_minimum_rtcp_interval (self, substream);
FS_RTP_SESSION_LOCK (self);
if (g_list_find (self->priv->free_substreams, substream))
{
self->priv->free_substreams = g_list_remove (self->priv->free_substreams,
substream);
FS_RTP_SESSION_UNLOCK (self);
fs_rtp_sub_stream_stop (substream);
g_object_unref (substream);
}
else
{
FS_RTP_SESSION_UNLOCK (self);
}
fs_rtp_session_has_disposed_exit (self);
}
static void
_substream_codec_changed (FsRtpSubStream *substream, FsRtpSession *self)
{
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
fs_rtp_session_update_minimum_rtcp_interval (self, NULL);
fs_rtp_session_has_disposed_exit (self);
}
/**
* fs_rtp_session_new_recv_pad:
* @session: a #FsSession
* @new_pad: the newly created pad
* @ssrc: the ssrc for this new pad
* @pt: the pt for this new pad
*
* This function is called by the #FsRtpConference when a new src pad appears.
* It can will be called on the streaming thread.
*
* MT safe.
*/
void
fs_rtp_session_new_recv_pad (FsRtpSession *session, GstPad *new_pad,
guint32 ssrc, guint pt)
{
FsRtpSubStream *substream = NULL;
FsRtpStream *stream = NULL;
GError *error = NULL;
gint no_rtcp_timeout;
if (fs_rtp_session_has_disposed_enter (session, NULL))
return;
FS_RTP_SESSION_LOCK (session);
no_rtcp_timeout = session->priv->no_rtcp_timeout;
FS_RTP_SESSION_UNLOCK (session);
substream = fs_rtp_sub_stream_new (session->priv->conference, session,
new_pad, ssrc, pt, no_rtcp_timeout, &error);
if (substream == NULL)
{
g_prefix_error (&error, "Could not create a substream for the new pad: ");
fs_session_emit_error (FS_SESSION (session),
error ? error->code : FS_ERROR_CONSTRUCTION,
error ? error->message : "No error details returned");
g_clear_error (&error);
fs_rtp_session_has_disposed_exit (session);
return;
}
g_signal_connect_object (substream, "get-codec-bin",
G_CALLBACK (_substream_get_codec_bin), session, 0);
g_signal_connect_object (substream, "unlinked",
G_CALLBACK (_substream_unlinked), session, 0);
g_signal_connect_object (substream, "codec-changed",
G_CALLBACK (_substream_codec_changed), session, 0);
/* Lets find the FsRtpStream for this substream, if no Stream claims it
* then we just store it
*/
FS_RTP_SESSION_LOCK (session);
stream = fs_rtp_session_get_stream_by_ssrc_locked (session, ssrc);
if (stream)
GST_DEBUG ("Already have a stream with SSRC %x, using it", ssrc);
/* Add the substream directly if the no_rtcp_timeout is 0 and
* there is only one stream */
if (!stream)
{
if (no_rtcp_timeout == 0 &&
g_list_length (session->priv->streams) == 1)
{
stream = g_object_ref (g_list_first (session->priv->streams)->data);
GST_DEBUG ("No RTCP timeout and only one stream, giving it substream"
" for SSRC %x in session %u", ssrc, session->id);
}
else
{
session->priv->free_substreams =
g_list_prepend (session->priv->free_substreams, substream);
g_signal_connect_object (substream, "error",
G_CALLBACK (_substream_error), session, 0);
if (no_rtcp_timeout > 0)
{
g_signal_connect_object (substream, "no-rtcp-timedout",
G_CALLBACK (_substream_no_rtcp_timedout_cb), session, 0);
GST_DEBUG ("No stream for SSRC %x, waiting for %d ms before associating"
"in session %u", ssrc, no_rtcp_timeout, session->id);
}
else if (no_rtcp_timeout < 0)
{
GST_DEBUG ("No RTCP timeout is < 0, we will wait forever for an"
" RTCP SDES to arrive for SSRC %x in session %u",
ssrc, session->id);
}
else
{
GST_WARNING ("No RTCP timeout is 0, but there is more than one stream,"
" we will wait forever for an RTCP SDES to arrive for SSRC %u in"
" session %u", ssrc, session->id);
}
}
}
if (stream)
{
if (!fs_rtp_stream_add_substream_unlock (stream, substream, &error))
{
g_prefix_error (&error,
"Could not add the output ghostpad to the new substream: ");
fs_session_emit_error (FS_SESSION (session), error->code,
error->message);
}
g_clear_error (&error);
}
else
{
fs_rtp_sub_stream_verify_codec_locked (substream);
FS_RTP_SESSION_UNLOCK (session);
}
if (stream)
g_object_unref (stream);
fs_rtp_session_has_disposed_exit (session);
}
static gboolean
validate_src_pads (const GValue *item, GValue *ret, gpointer user_data)
{
GstPad *pad = g_value_get_object (item);
GList *codecs = user_data;
GstCaps *caps;
GList *listitem = NULL;
gboolean retval = FALSE;
caps = gst_pad_query_caps (pad, NULL);
if (gst_caps_is_empty (caps))
{
GST_WARNING_OBJECT (pad, "Caps on pad are empty");
goto error;
}
for (listitem = codecs; listitem; listitem = g_list_next (listitem))
{
FsCodec *codec = listitem->data;
GstCaps *tmpcaps = fs_codec_to_gst_caps (codec);
if (gst_caps_can_intersect (tmpcaps, caps))
{
GST_LOG_OBJECT (pad, "Pad matches " FS_CODEC_FORMAT,
FS_CODEC_ARGS (codec));
retval = TRUE;
}
gst_caps_unref (tmpcaps);
if (retval)
break;
}
error:
gst_caps_unref (caps);
if (!retval)
g_value_set_boolean (ret, FALSE);
return retval;
}
static GstElement *
_create_codec_bin (const CodecAssociation *ca, const FsCodec *codec,
const gchar *name, FsStreamDirection direction, GList *codecs,
guint current_builder_hash, guint *new_builder_hash, GError **error)
{
GstElement *codec_bin = NULL;
const gchar *direction_str;
gchar *profile = NULL;
if (direction == FS_DIRECTION_SEND)
{
direction_str = "send";
profile = ca->send_profile;
}
else if (direction == FS_DIRECTION_RECV)
{
direction_str = "receive";
profile = ca->recv_profile;
}
else
{
g_assert_not_reached ();
}
if (profile)
{
GError *tmperror = NULL;
guint src_pad_count = 0, sink_pad_count = 0;
/* Return nothing if the builder hash is the same, it would just return
* the same thing
*/
if (new_builder_hash)
{
*new_builder_hash = g_str_hash (profile);
if (*new_builder_hash == current_builder_hash)
{
GST_DEBUG ("profile builder hash is the same for "FS_CODEC_FORMAT,
FS_CODEC_ARGS (ca->codec));
return NULL;
}
GST_DEBUG ("profile builder hash is different (new: %u != old: %u)"
" for " FS_CODEC_FORMAT,
*new_builder_hash, current_builder_hash, FS_CODEC_ARGS (ca->codec));
}
codec_bin = parse_bin_from_description_all_linked (profile, direction,
&src_pad_count, &sink_pad_count, &tmperror);
if (codec_bin)
{
if (sink_pad_count != 1 || src_pad_count == 0)
{
GST_ERROR ("Invalid pad count (src:%u sink:%u)"
" from codec profile: %s", src_pad_count, sink_pad_count, profile);
gst_object_unref (codec_bin);
codec_bin = NULL;
goto try_factory;
}
if (codecs && src_pad_count > 1)
{
GstIterator *iter;
GValue valid = {0};
GstIteratorResult res;
iter = gst_element_iterate_src_pads (codec_bin);
g_value_init (&valid, G_TYPE_BOOLEAN);
g_value_set_boolean (&valid, TRUE);
res = gst_iterator_fold (iter, validate_src_pads, &valid,
codecs);
gst_iterator_free (iter);
if (!g_value_get_boolean (&valid) || res == GST_ITERATOR_ERROR)
{
gst_object_unref (codec_bin);
codec_bin = NULL;
goto try_factory;
}
}
GST_DEBUG ("creating %s codec bin for id %d, profile: %s",
direction_str, codec->id, profile);
gst_element_set_name (codec_bin, name);
return codec_bin;
}
else if (!codec_blueprint_has_factory (ca->blueprint, direction))
{
g_propagate_error (error, tmperror);
return NULL;
}
}
try_factory:
if (new_builder_hash)
{
/* If its the same blueprint, it will be the same result,
* so return NULL without an error.
*/
*new_builder_hash = g_direct_hash (ca->blueprint);
if (ca->blueprint && current_builder_hash == *new_builder_hash)
{
GST_DEBUG ("blueprint builder hash is the same for "FS_CODEC_FORMAT,
FS_CODEC_ARGS (ca->codec));
return NULL;
}
GST_DEBUG ("blueprint builder hash is different (new: %u != old: %u)"
" for " FS_CODEC_FORMAT,
*new_builder_hash, current_builder_hash, FS_CODEC_ARGS (ca->codec));
}
if (!ca->blueprint) {
g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL,
"Codec Association has neither blueprint nor profile");
return NULL;
}
return create_codec_bin_from_blueprint (codec, ca->blueprint, name,
direction, error);
}
/**
* fs_rtp_session_get_recv_codec_locked:
* @session: a #FsRtpSession
* @pt: The payload type to find the codec for
* @stream: an optional #FsRtpStream for which this data is received
* @recv_codec: The codec one wants to receive one
*
* This function returns the #CodecAssociation that will be used to receive
* data on a specific payload type, optionally from a specific stream.
*
* MUST be called with the FsRtpSession lock held
*
* Returns: a #CodecAssociation, the caller doesn't own it
*/
static CodecAssociation *
fs_rtp_session_get_recv_codec_locked (FsRtpSession *session,
guint pt,
FsRtpStream *stream,
FsCodec **recv_codec,
GError **error)
{
FsCodec *recv_codec_tmp = NULL;
CodecAssociation *ca = NULL;
GList *item = NULL;
if (!session->priv->codec_associations)
{
g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL,
"No codecs yet");
return NULL;
}
ca = lookup_codec_association_by_pt (session->priv->codec_associations, pt);
if (!ca)
{
g_set_error (error, FS_ERROR, FS_ERROR_UNKNOWN_CODEC,
"There is no negotiated codec with pt %d", pt);
return NULL;
}
if (stream)
{
for (item = stream->negotiated_codecs; item; item = g_list_next (item))
{
recv_codec_tmp = item->data;
if (recv_codec_tmp->id == pt)
break;
}
if (item)
{
GST_DEBUG ("Receiving on stream codec " FS_CODEC_FORMAT,
FS_CODEC_ARGS (recv_codec_tmp));
}
else
{
GST_DEBUG ("Have stream, but it does not have negotiatied codec");
recv_codec_tmp = NULL;
}
}
if (recv_codec_tmp)
{
*recv_codec = fs_codec_copy (recv_codec_tmp);
}
else
{
*recv_codec = codec_copy_filtered (ca->codec, FS_PARAM_TYPE_CONFIG);
GST_DEBUG ("Receiving on session codec " FS_CODEC_FORMAT,
FS_CODEC_ARGS (ca->codec));
}
return ca;
}
/**
* fs_rtp_session_select_send_codec_locked:
* @session: the #FsRtpSession
*
* This function selects the codec to send using either the user preference
* or the remote preference (from the negotiation).
*
* You MUST own the FsRtpSession mutex to call this function
*
* Returns: a #CodecAssociation, the caller doesn't own it
*/
static CodecAssociation *
fs_rtp_session_select_send_codec_locked (FsRtpSession *session, GError **error)
{
CodecAssociation *ca = NULL;
GList *ca_e = NULL;
if (!session->priv->codec_associations)
{
g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL,
"Tried to call fs_rtp_session_select_send_codec_bin before the codec"
" negotiation has taken place");
return NULL;
}
if (session->priv->requested_send_codec)
{
ca = lookup_codec_association_by_codec_for_sending (
session->priv->codec_associations,
session->priv->requested_send_codec);
if (ca)
return ca;
/* The requested send codec no longer exists */
fs_codec_destroy (session->priv->requested_send_codec);
session->priv->requested_send_codec = NULL;
GST_WARNING_OBJECT (session->priv->conference,
"The current requested codec no longer exists, resetting");
}
/*
* We don't have a requested codec, or it was not valid, lets use the first
* codec from the list
*/
for (ca_e = g_list_first (session->priv->codec_associations);
ca_e;
ca_e = g_list_next (ca_e))
{
if (codec_association_is_valid_for_sending (ca_e->data, TRUE))
{
ca = ca_e->data;
break;
}
}
if (ca == NULL)
g_set_error (error, FS_ERROR, FS_ERROR_NEGOTIATION_FAILED,
"Could not get a valid send codec");
return ca;
}
struct link_data {
FsRtpSession *session;
GstCaps *caps;
FsCodec *codec;
GList *all_codecs;
GList *other_codecs;
GError **error;
};
/*
* This is a GstIteratorFoldFunction
* It returns FALSE when it wants to stop the iteration
*/
static gboolean
link_main_pad (const GValue *item, GValue *ret, gpointer user_data)
{
GstPad *pad = g_value_get_object (item);
struct link_data *data = user_data;
GstCaps *caps;
GstPad *other_pad;
caps = gst_pad_query_caps (pad, data->caps);
if (!gst_caps_can_intersect (caps, data->caps))
{
gst_caps_unref (caps);
return TRUE;
}
gst_caps_unref (caps);
other_pad = gst_element_get_static_pad (data->session->priv->send_capsfilter,
"sink");
if (!other_pad)
{
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not get the sink pad of the send capsfilter");
goto error;
}
if (GST_PAD_LINK_SUCCESSFUL(gst_pad_link (pad, other_pad)))
g_value_set_boolean (ret, TRUE);
else
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link the send codec bin for pt %d to the send capsfilter",
data->codec->id);
error:
gst_object_unref (other_pad);
return FALSE;
}
/*
* This is a GstIteratorFoldFunction
* It returns FALSE when it wants to stop the iteration
*/
static gboolean
link_other_pads (const GValue *item, GValue *ret, gpointer user_data)
{
GstPad *pad = g_value_get_object (item);
struct link_data *data = user_data;
GstCaps *caps;
GstCaps *filter_caps = NULL;
GList *listitem;
GstElement *capsfilter;
GstPad *otherpad;
if (gst_pad_is_linked (pad))
{
return TRUE;
}
caps = gst_pad_query_caps (pad, NULL);
if (gst_caps_is_empty (caps))
{
GST_WARNING_OBJECT (pad, "Caps on pad are empty");
return TRUE;
}
for (listitem = data->all_codecs; listitem; listitem = g_list_next (listitem))
{
FsCodec *codec = listitem->data;
filter_caps = fs_codec_to_gst_caps (codec);
if (gst_caps_can_intersect (filter_caps, caps))
{
GST_LOG_OBJECT (pad, "Pad matches " FS_CODEC_FORMAT,
FS_CODEC_ARGS (codec));
break;
}
gst_caps_unref (filter_caps);
}
gst_caps_unref (caps);
if (!listitem)
{
g_set_error (data->error, FS_ERROR, FS_ERROR_INTERNAL,
"Could not find codec that matches the src pad");
g_value_set_boolean (ret, FALSE);
return FALSE;
}
capsfilter = gst_element_factory_make ("capsfilter", NULL);
g_object_set (capsfilter, "caps", filter_caps, NULL);
if (!gst_bin_add (GST_BIN (data->session->priv->conference), capsfilter))
{
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add send capsfilter to the conference");
gst_object_unref (capsfilter);
goto error;
}
data->session->priv->extra_send_capsfilters =
g_list_append (data->session->priv->extra_send_capsfilters,
capsfilter);
otherpad = gst_element_get_static_pad (capsfilter, "sink");
if (!otherpad)
{
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not get sink pad on capsfilter");
goto error;
}
if (GST_PAD_LINK_FAILED (gst_pad_link (pad, otherpad)))
{
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not get sink pad on capsfilter");
gst_object_unref (otherpad);
goto error;
}
gst_object_unref (otherpad);
if (!gst_element_link_pads (capsfilter, NULL,
data->session->priv->rtpmuxer, "sink_%u"))
{
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not an extra capsfilter to the muxer");
goto error;
}
if (!gst_element_sync_state_with_parent (capsfilter))
{
g_set_error (data->error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not sync the state of the extra send capsfilter"
" with the state of the conference");
goto error;
}
data->other_codecs = g_list_append (data->other_codecs, filter_caps);
return TRUE;
error:
g_value_set_boolean (ret, FALSE);
gst_bin_remove (GST_BIN (data->session->priv->conference), capsfilter);
data->session->priv->extra_send_capsfilters =
g_list_remove (data->session->priv->extra_send_capsfilters,
capsfilter);
gst_caps_unref (filter_caps);
return FALSE;
}
static void
special_source_stopped (FsRtpSpecialSource *source, gpointer data)
{
FsRtpSession *self = FS_RTP_SESSION (data);
if (fs_rtp_session_has_disposed_enter (self, NULL))
return;
fs_rtp_special_sources_remove_finish (&self->priv->extra_sources,
FS_RTP_SESSION_GET_LOCK (self),
source);
fs_rtp_session_has_disposed_exit (self);
}
/*
* @codec: The currently selected codec for sending (but not the send_codec)
*/
static gboolean
fs_rtp_session_remove_send_codec_bin (FsRtpSession *self,
FsCodec *codec,
GstElement *send_codecbin,
gboolean error_emit)
{
FS_RTP_SESSION_LOCK (self);
if (self->priv->send_codecbin || send_codecbin)
{
GstElement *codecbin = self->priv->send_codecbin;
self->priv->send_codecbin = NULL;
FS_RTP_SESSION_UNLOCK (self);
if (!codecbin)
codecbin = send_codecbin;
gst_element_set_locked_state (codecbin, TRUE);
if (gst_element_set_state (codecbin, GST_STATE_NULL) !=
GST_STATE_CHANGE_SUCCESS)
{
gst_element_set_locked_state (codecbin, FALSE);
GST_ERROR ("Could not stop the codec bin, setting it to NULL did not"
" succeed");
if (error_emit)
fs_session_emit_error (FS_SESSION (self), FS_ERROR_INTERNAL,
"Setting the codec bin to NULL did not succeed");
return FALSE;
}
gst_bin_remove (GST_BIN (self->priv->conference), codecbin);
FS_RTP_SESSION_LOCK (self);
}
fs_codec_destroy (self->priv->current_send_codec);
self->priv->current_send_codec = NULL;
FS_RTP_SESSION_UNLOCK (self);
while (self->priv->extra_send_capsfilters)
{
GstElement *cf = self->priv->extra_send_capsfilters->data;
GstPad *ourpad = gst_element_get_static_pad (cf, "src");
GstPad *pad = NULL;
if (ourpad)
{
pad = gst_pad_get_peer (ourpad);
if (pad)
{
gst_pad_set_active (pad, FALSE);
gst_element_release_request_pad (self->priv->rtpmuxer, pad);
gst_object_unref (pad);
}
gst_object_unref (ourpad);
}
gst_element_set_locked_state (cf, TRUE);
gst_element_set_state (cf, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), cf);
self->priv->extra_send_capsfilters = g_list_delete_link (
self->priv->extra_send_capsfilters,
self->priv->extra_send_capsfilters);
}
if (codec)
fs_rtp_special_sources_remove (
&self->priv->extra_sources,
&self->priv->codec_associations,
FS_RTP_SESSION_GET_LOCK (self),
codec,
special_source_stopped, self);
return TRUE;
}
/**
* fs_rtp_session_add_send_codec_bin_unlock:
* @session: a #FsRtpSession
* @ca: the #CodecAssociation to use
*
* This function creates, adds and links a codec bin for the current send remote
* codec
*
* Needs the Session lock to be held. and releases it
*
* Returns: The new codec bin (or NULL if there is an error)
*/
static GstElement *
fs_rtp_session_add_send_codec_bin_unlock (FsRtpSession *session,
const CodecAssociation *ca,
GList **other_codecs,
GError **error)
{
GstElement *codecbin = NULL;
gchar *name;
GstCaps *sendcaps;
GList *codecs;
GstIterator *iter;
GValue link_rv = {0};
struct link_data data;
FsCodec *send_codec_copy = fs_codec_copy (ca->send_codec);
FsCodec *codec_copy = fs_codec_copy (ca->codec);
GST_DEBUG ("Trying to add send codecbin for " FS_CODEC_FORMAT,
FS_CODEC_ARGS (ca->send_codec));
name = g_strdup_printf ("send_%u_%u", session->id, ca->send_codec->id);
codecs = codec_associations_to_send_codecs (
session->priv->codec_associations);
codecbin = _create_codec_bin (ca, ca->send_codec, name, FS_DIRECTION_SEND,
codecs, 0, NULL, error);
g_free (name);
sendcaps = fs_codec_to_gst_caps (ca->send_codec);
if (session->priv->rtp_tfrc &&
fs_rtp_tfrc_is_enabled (session->priv->rtp_tfrc, ca->codec->id))
{
guint bitrate;
g_object_get (session->priv->rtp_tfrc, "bitrate", &bitrate, NULL);
session->priv->send_bitrate = bitrate;
}
if (codecbin)
codecbin_set_bitrate (codecbin, session->priv->send_bitrate);
FS_RTP_SESSION_UNLOCK (session);
if (!codecbin)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not create codec bin for : " FS_CODEC_FORMAT,
FS_CODEC_ARGS (send_codec_copy));
fs_codec_destroy (send_codec_copy);
fs_codec_destroy (codec_copy);
fs_codec_list_destroy (codecs);
return NULL;
}
gst_element_set_locked_state (codecbin, TRUE);
if (!gst_bin_add (GST_BIN (session->priv->conference), codecbin))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the send codec bin for: " FS_CODEC_FORMAT,
FS_CODEC_ARGS (send_codec_copy));
gst_object_unref (codecbin);
fs_codec_list_destroy (codecs);
fs_codec_destroy (send_codec_copy);
fs_codec_destroy (codec_copy);
gst_caps_unref (sendcaps);
return NULL;
}
fs_rtp_keyunit_manager_codecbin_changed (session->priv->keyunit_manager,
codecbin, send_codec_copy);
if (!gst_element_link_pads (session->priv->media_sink_valve, "src",
codecbin, "sink"))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link the send codec bin sink pad");
gst_bin_remove (GST_BIN (session->priv->conference), (codecbin));
fs_codec_list_destroy (codecs);
gst_caps_unref (sendcaps);
fs_codec_destroy (codec_copy);
fs_codec_destroy (send_codec_copy);
return NULL;
}
g_object_set (G_OBJECT (session->priv->send_capsfilter),
"caps", sendcaps, NULL);
iter = gst_element_iterate_src_pads (codecbin);
g_value_init (&link_rv, G_TYPE_BOOLEAN);
g_value_set_boolean (&link_rv, FALSE);
data.session = session;
data.caps = sendcaps;
data.all_codecs = codecs;
data.error = error;
data.other_codecs = NULL;
data.codec = send_codec_copy;
if (gst_iterator_fold (iter, link_main_pad, &link_rv, &data) ==
GST_ITERATOR_ERROR)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not iterate over the src pads of the send codec bin to link"
" the main pad for: " FS_CODEC_FORMAT, FS_CODEC_ARGS (send_codec_copy));
gst_iterator_free (iter);
goto error;
}
gst_caps_unref (sendcaps);
if (!g_value_get_boolean (&link_rv))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link codec bin's main pads for : " FS_CODEC_FORMAT,
FS_CODEC_ARGS (send_codec_copy));
gst_iterator_free (iter);
goto error;
}
gst_iterator_resync (iter);
if (gst_iterator_fold (iter, link_other_pads, &link_rv, &data) ==
GST_ITERATOR_ERROR)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not iterate over the src pads of the send codec bin to link"
" the main pad for: " FS_CODEC_FORMAT,
FS_CODEC_ARGS (send_codec_copy));
gst_iterator_free (iter);
goto error;
}
gst_iterator_free (iter);
if (!g_value_get_boolean (&link_rv)) {
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link codec bin's other pads for : " FS_CODEC_FORMAT,
FS_CODEC_ARGS (send_codec_copy));
goto error;
}
gst_element_set_locked_state (codecbin, FALSE);
if (!gst_element_sync_state_with_parent (codecbin))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not sync the state of the codec bin with parent for pt: "
FS_CODEC_FORMAT, FS_CODEC_ARGS (send_codec_copy));
goto error;
}
FS_RTP_SESSION_LOCK (session);
/* Re-set it here in case in changed while we were unlocked */
codecbin_set_bitrate (codecbin, session->priv->send_bitrate);
if (session->priv->streams_sending &&
g_hash_table_size (session->priv->transmitters))
g_object_set (session->priv->media_sink_valve, "drop", FALSE, NULL);
while (data.other_codecs)
{
FsCodec *other_send_codec = data.other_codecs->data;
CodecAssociation *ca;
data.other_codecs = g_list_remove (data.other_codecs, other_send_codec);
ca = lookup_codec_association_by_pt (session->priv->codec_associations,
other_send_codec->id);
if (ca)
*other_codecs = g_list_append (*other_codecs,
fs_codec_copy (ca->codec));
}
session->priv->send_codecbin = codecbin;
session->priv->current_send_codec = codec_copy;
FS_RTP_SESSION_UNLOCK (session);
fs_codec_list_destroy (codecs);
fs_codec_destroy (send_codec_copy);
return codecbin;
error:
g_list_free (data.other_codecs);
fs_rtp_session_remove_send_codec_bin (session, NULL, codecbin, FALSE);
fs_codec_list_destroy (codecs);
fs_codec_destroy (codec_copy);
fs_codec_destroy (send_codec_copy);
return NULL;
}
/**
* _send_src_pad_blocked_callback:
*
* This is the callback for the pad blocking on the media src pad
* It is used to replace the codec bin when the send codec has been changed.
*/
static GstPadProbeReturn
_send_src_pad_blocked_callback (GstPad *pad, GstPadProbeInfo *info,
gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
CodecAssociation *ca = NULL;
FsCodec *send_codec_copy = NULL;
FsCodec *codec_copy = NULL;
GError *error = NULL;
gboolean changed = FALSE;
GList *other_codecs = NULL;
if (fs_rtp_session_has_disposed_enter (self, NULL))
{
FS_RTP_SESSION_LOCK (self);
self->priv->send_pad_block_id = 0;
FS_RTP_SESSION_UNLOCK (self);
return GST_PAD_PROBE_REMOVE;
}
FS_RTP_SESSION_LOCK (self);
self->priv->send_pad_block_id = 0;
ca = fs_rtp_session_select_send_codec_locked (self, &error);
if (!ca)
{
g_prefix_error (&error, "Could not select a new send codec: ");
fs_session_emit_error (FS_SESSION (self), error->code, error->message);
goto done_locked;
}
g_clear_error (&error);
send_codec_copy = fs_codec_copy (ca->send_codec);
if (fs_codec_are_equal (ca->codec, self->priv->current_send_codec))
{
codec_copy = fs_codec_copy (ca->codec);
FS_RTP_SESSION_UNLOCK (self);
/* If the main codec has not changed, the special codecs could still
* have changed, so lets try to see if it is necessary to do something
* about it.
*/
changed |= fs_rtp_special_sources_remove (
&self->priv->extra_sources,
&self->priv->codec_associations,
FS_RTP_SESSION_GET_LOCK (self),
codec_copy,
special_source_stopped, self);
goto skip_main_codec;
}
FS_RTP_SESSION_UNLOCK (self);
g_object_set (self->priv->media_sink_valve, "drop", TRUE, NULL);
if (!fs_rtp_session_remove_send_codec_bin (self, send_codec_copy, NULL, TRUE))
goto done;
FS_RTP_SESSION_LOCK (self);
/* We have to re-fetch the ca because we lifted the lock */
fs_codec_destroy (send_codec_copy);
send_codec_copy = NULL;
ca = fs_rtp_session_select_send_codec_locked (self, &error);
if (!ca)
{
g_prefix_error (&error, "Could not select a new send codec: ");
fs_session_emit_error (FS_SESSION (self), error->code,
error->message);
goto done_locked;
}
g_clear_error (&error);
send_codec_copy = fs_codec_copy (ca->send_codec);
codec_copy = fs_codec_copy (ca->codec);
if (!fs_rtp_session_add_send_codec_bin_unlock (self, ca, &other_codecs,
&error))
{
g_prefix_error (&error, "Could not build a new send codec bin: ");
fs_session_emit_error (FS_SESSION (self), error->code,
error->message);
}
changed = TRUE;
skip_main_codec:
changed |= fs_rtp_special_sources_create (
&self->priv->extra_sources,
&self->priv->codec_associations,
FS_RTP_SESSION_GET_LOCK (self),
codec_copy,
GST_ELEMENT (self->priv->conference),
self->priv->rtpmuxer);
if (changed && !error)
{
GList *secondary_codecs;
FS_RTP_SESSION_LOCK (self);
secondary_codecs = fs_rtp_special_sources_get_codecs_locked (
self->priv->extra_sources, self->priv->codec_associations,
codec_copy);
FS_RTP_SESSION_UNLOCK (self);
secondary_codecs = g_list_concat (secondary_codecs, other_codecs);
g_object_notify (G_OBJECT (self), "current-send-codec");
gst_element_post_message (GST_ELEMENT (self->priv->conference),
gst_message_new_element (GST_OBJECT (self->priv->conference),
gst_structure_new ("farstream-send-codec-changed",
"session", FS_TYPE_SESSION, self,
"codec", FS_TYPE_CODEC, codec_copy,
"secondary-codecs", FS_TYPE_CODEC_LIST, secondary_codecs,
NULL)));
fs_codec_list_destroy (secondary_codecs);
fs_rtp_session_try_sending_dtmf_event (self);
}
done:
g_clear_error (&error);
fs_codec_destroy (send_codec_copy);
fs_codec_destroy (codec_copy);
/* If we have a codec bin, the required/preferred caps may have changed,
* in this case, we need to drop the current buffer and wait for a buffer
* with the right caps to come in. Only then can we drop the pad block
*/
fs_rtp_session_has_disposed_exit (self);
return GST_PAD_PROBE_REMOVE;
done_locked:
FS_RTP_SESSION_UNLOCK (self);
goto done;
}
/**
* fs_rtp_session_verify_send_codec_bin_locked:
*
* Verify that the current send codec is still valid and if it is not
* do whats required to have the right one be used.
*
* Does not care about locks
*
*/
static void
fs_rtp_session_verify_send_codec_bin_locked (FsRtpSession *self)
{
if (self->priv->send_pad_block_id == 0)
self->priv->send_pad_block_id =
gst_pad_add_probe (self->priv->send_tee_media_pad,
GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
_send_src_pad_blocked_callback, g_object_ref (self), g_object_unref);
}
/*
* This callback is called when the pad of a substream has been locked because
* the codec needs to be changed.
*
* It will return a new codecbin if it needs changing. If there is an error,
* the GError * will be set.
*/
static GstElement *
_substream_get_codec_bin (FsRtpSubStream *substream,
FsRtpStream *stream, FsCodec **new_codec,
guint current_builder_hash, guint *new_builder_hash,
GError **error, FsRtpSession *session)
{
GstElement *codecbin = NULL;
gchar *name;
CodecAssociation *ca = NULL;
if (fs_rtp_session_has_disposed_enter (session, NULL))
return NULL;
FS_RTP_SESSION_LOCK (session);
ca = fs_rtp_session_get_recv_codec_locked (session, substream->pt, stream,
new_codec, error);
if (!ca)
goto out;
name = g_strdup_printf ("recv_%u_%u_%u", session->id, substream->ssrc,
substream->pt);
codecbin = _create_codec_bin (ca, *new_codec, name, FS_DIRECTION_RECV, NULL,
current_builder_hash, new_builder_hash, error);
g_free (name);
out:
fs_rtp_session_has_disposed_exit (session);
FS_RTP_SESSION_UNLOCK (session);
return codecbin;
}
static void
fs_rtp_session_associate_free_substreams (FsRtpSession *session,
FsRtpStream *stream, guint32 ssrc)
{
gboolean added = FALSE;
FS_RTP_SESSION_LOCK (session);
for (;;)
{
FsRtpSubStream *substream = NULL;
GList *item = NULL;
GError *error = NULL;
for (item = g_list_first (session->priv->free_substreams);
item;
item = g_list_next (item))
{
FsRtpSubStream *localsubstream = item->data;
GST_LOG ("Have substream with ssrc %x, looking for %x",
localsubstream->ssrc, ssrc);
if (ssrc == localsubstream->ssrc)
{
substream = localsubstream;
session->priv->free_substreams = g_list_delete_link (
session->priv->free_substreams, item);
break;
}
}
if (!substream)
break;
added = TRUE;
while (
g_signal_handlers_disconnect_by_func (substream, "error", session) > 0);
while (
g_signal_handlers_disconnect_by_func (substream, "no-rtcp-timedout",
session) > 0);
if (fs_rtp_stream_add_substream_unlock (stream, substream, &error))
{
GST_DEBUG ("Associated SSRC %x in session %u", ssrc, session->id);
}
else
{
GST_ERROR ("Could not associate a substream with its stream : %s",
error->message);
g_prefix_error (&error,
"Could not associate a substream with its stream: ");
fs_session_emit_error (FS_SESSION (session), error->code,
error->message);
}
g_clear_error (&error);
FS_RTP_SESSION_LOCK (session);
}
FS_RTP_SESSION_UNLOCK (session);
if (added == FALSE)
GST_DEBUG ("No free substream with SSRC %x in session %u",
ssrc, session->id);
}
void
fs_rtp_session_associate_ssrc_cname (FsRtpSession *session,
guint32 ssrc,
const gchar *cname)
{
FsRtpStream *stream = NULL;
GList *item = NULL;
if (fs_rtp_session_has_disposed_enter (session, NULL))
return;
FS_RTP_SESSION_LOCK (session);
if (!session->priv->free_substreams)
{
FS_RTP_SESSION_UNLOCK (session);
fs_rtp_session_has_disposed_exit (session);
return;
}
for (item = g_list_first (session->priv->streams);
item;
item = g_list_next (item))
{
FsRtpStream *localstream = item->data;
gchar *localcname = NULL;
g_object_get (localstream->participant, "cname", &localcname, NULL);
if (localcname && !strcmp (localcname, cname))
{
stream = localstream;
g_free (localcname);
break;
}
g_free (localcname);
}
if (!stream)
{
GST_LOG ("There is no participant with cname %s, but"
" we have streams of unknown origin", cname);
FS_RTP_SESSION_UNLOCK (session);
fs_rtp_session_has_disposed_exit (session);
return;
}
fs_rtp_session_add_ssrc_stream_locked (session, ssrc, stream);
g_object_ref (stream);
FS_RTP_SESSION_UNLOCK (session);
fs_rtp_session_associate_free_substreams (session, stream, ssrc);
g_object_unref (stream);
fs_rtp_session_has_disposed_exit (session);
}
static void
_substream_no_rtcp_timedout_cb (FsRtpSubStream *substream,
FsRtpSession *session)
{
GError *error = NULL;
FsRtpStream *first_stream = NULL;
if (fs_rtp_session_has_disposed_enter (session, NULL))
return;
FS_RTP_SESSION_LOCK (session);
if (g_list_length (session->priv->streams) != 1)
{
GST_WARNING ("The substream for SSRC %x and pt %u did not receive RTCP"
" for %d milliseconds, but we have more than one stream so we can"
" not associate it.", substream->ssrc, substream->pt,
substream->no_rtcp_timeout);
FS_RTP_SESSION_UNLOCK (session);
fs_rtp_session_has_disposed_exit (session);
return;
}
if (!g_list_find (session->priv->free_substreams, substream))
{
GST_WARNING ("Could not find substream %p in the list of free substreams",
substream);
FS_RTP_SESSION_UNLOCK (session);
fs_rtp_session_has_disposed_exit (session);
return;
}
session->priv->free_substreams =
g_list_remove (session->priv->free_substreams,
substream);
while (
g_signal_handlers_disconnect_by_func (substream, "error", session) > 0);
while (
g_signal_handlers_disconnect_by_func (substream, "no-rtcp-timedout",
session) > 0);
first_stream = g_list_first (session->priv->streams)->data;
g_object_ref (first_stream);
if (!fs_rtp_stream_add_substream_unlock (first_stream, substream, &error))
{
g_prefix_error (&error,
"Could not link the substream to a stream: ");
fs_session_emit_error (FS_SESSION (session),
error ? error->code : FS_ERROR_INTERNAL,
error ? error->message : "No error message");
}
g_clear_error (&error);
g_object_unref (first_stream);
fs_rtp_session_has_disposed_exit (session);
}
/**
* fs_rtp_session_bye_ssrc:
* @session: a #FsRtpSession
* @ssrc: The ssrc
*
* This function is called when a RTCP BYE is received
*/
void
fs_rtp_session_bye_ssrc (FsRtpSession *session,
guint32 ssrc)
{
if (fs_rtp_session_has_disposed_enter (session, NULL))
return;
/* First remove it from the known SSRCs */
FS_RTP_SESSION_LOCK (session);
if (!g_hash_table_lookup (session->priv->ssrc_streams_manual,
GUINT_TO_POINTER (ssrc)))
g_hash_table_remove (session->priv->ssrc_streams, GUINT_TO_POINTER (ssrc));
FS_RTP_SESSION_UNLOCK (session);
/*
* TODO:
*
* Remove running substreams with that SSRC .. lets also check if they
* come from the right ip/port/etc ??
*/
fs_rtp_session_has_disposed_exit (session);
}
static gboolean
gather_caps_parameters (CodecAssociation *ca, GstCaps *caps)
{
GstStructure *s = NULL;
int i;
gboolean new_config = FALSE;
s = gst_caps_get_structure (caps, 0);
for (i = 0; i < gst_structure_n_fields (s); i++)
{
const gchar *name = gst_structure_nth_field_name (s, i);
if (name)
{
const gchar *value = gst_structure_get_string (s, name);
if (value)
{
if (codec_has_config_data_named (ca->codec, name))
{
GList *item = NULL;
for (item = ca->codec->optional_params; item;
item = g_list_next (item))
{
FsCodecParameter *param = item->data;
if (!g_ascii_strcasecmp (param->name, name))
{
if (!g_ascii_strcasecmp (param->value, value))
break;
GST_DEBUG ("%d/%s: replacing param %s=%s with %s",
ca->codec->id, ca->codec->encoding_name, name, param->value, value);
/* replace the value if its different */
fs_codec_remove_optional_parameter (ca->codec, param);
fs_codec_add_optional_parameter (ca->codec, name, value);
new_config = TRUE;
break;
}
}
/* Add it if it wasn't there */
if (item == NULL)
{
GST_DEBUG ("%d/%s: adding param %s=%s",
ca->codec->id, ca->codec->encoding_name, name, value);
fs_codec_add_optional_parameter (ca->codec, name, value);
new_config = TRUE;
}
}
}
}
}
ca->need_config = FALSE;
return new_config;
}
static void
_send_caps_changed (GstPad *pad, GParamSpec *pspec, FsRtpSession *session)
{
GstCaps *caps = NULL;
CodecAssociation *ca = NULL;
g_object_get (pad, "caps", &caps, NULL);
if (!caps)
return;
g_return_if_fail (GST_CAPS_IS_SIMPLE(caps));
if (fs_rtp_session_has_disposed_enter (session, NULL))
{
gst_caps_unref (caps);
return;
}
FS_RTP_SESSION_LOCK (session);
if (!session->priv->current_send_codec)
goto out;
ca = lookup_codec_association_by_codec (session->priv->codec_associations,
session->priv->current_send_codec);
if (!ca)
goto out;
/*
* Emit farstream-codecs-changed if the sending thread finds the config
* for the last codec that needed it
*/
if (gather_caps_parameters (ca, caps))
{
GList *item = NULL;
for (item = g_list_first (session->priv->codec_associations);
item;
item = g_list_next (item))
{
ca = item->data;
if (ca->need_config)
break;
}
if (!item)
{
FS_RTP_SESSION_UNLOCK (session);
g_object_notify (G_OBJECT (session), "codecs");
gst_element_post_message (GST_ELEMENT (session->priv->conference),
gst_message_new_element (GST_OBJECT (session->priv->conference),
gst_structure_new ("farstream-codecs-changed",
"session", FS_TYPE_SESSION, session,
NULL)));
goto out_unlocked;
}
}
out:
FS_RTP_SESSION_UNLOCK (session);
out_unlocked:
gst_caps_unref (caps);
fs_rtp_session_has_disposed_exit (session);
}
static void
_discovery_caps_changed (GstPad *pad, GParamSpec *pspec, FsRtpSession *session)
{
CodecAssociation *ca = NULL;
GstCaps *caps = NULL;
gboolean block = TRUE;
g_object_get (pad, "caps", &caps, NULL);
if (!caps)
return;
g_return_if_fail (GST_CAPS_IS_SIMPLE(caps));
if (fs_rtp_session_has_disposed_enter (session, NULL))
{
gst_caps_unref (caps);
return;
}
FS_RTP_SESSION_LOCK (session);
/* If there is no codec, its because we're shutting down */
if (!session->priv->discovery_codec)
{
GST_DEBUG ("Got caps while discovery is stopping");
goto out;
}
ca = lookup_codec_association_by_codec_for_sending (
session->priv->codec_associations,
session->priv->discovery_codec);
if (ca && ca->need_config)
{
gather_caps_parameters (ca, caps);
fs_codec_destroy (session->priv->discovery_codec);
session->priv->discovery_codec = fs_codec_copy (ca->codec);
block = !ca->need_config;
}
out:
gst_caps_unref (caps);
if (block && session->priv->discovery_pad_block_id == 0)
session->priv->discovery_pad_block_id =
gst_pad_add_probe (session->priv->send_tee_discovery_pad,
GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
_discovery_pad_blocked_callback,
g_object_ref (session), g_object_unref);
FS_RTP_SESSION_UNLOCK (session);
fs_rtp_session_has_disposed_exit (session);
}
/**
* fs_rtp_session_get_codec_params_unlock:
* @session: a #FsRtpSession
* @ca: the #CodecAssociaton to get params for
*
* Gets the parameters for the specified #CodecAssociation
*
* Returns: %TRUE on success, %FALSE on error
*/
static gboolean
fs_rtp_session_get_codec_params_unlock (FsRtpSession *session,
CodecAssociation *ca, GError **error)
{
GstPad *pad = NULL;
gchar *tmp;
GstCaps *caps;
FsCodec *discovery_codec = fs_codec_copy (ca->codec);
FsCodec *send_codec = fs_codec_copy (ca->send_codec);
GstElement *codecbin = NULL;
GST_LOG ("Gathering params for codec " FS_CODEC_FORMAT,
FS_CODEC_ARGS (ca->send_codec));
fs_codec_destroy (session->priv->discovery_codec);
session->priv->discovery_codec = NULL;
tmp = g_strdup_printf ("discoverAA_%u_%u", session->id, ca->send_codec->id);
codecbin = _create_codec_bin (ca, ca->send_codec, tmp, FS_DIRECTION_SEND,
NULL,
0, NULL, error);
g_free (tmp);
FS_RTP_SESSION_UNLOCK (session);
/* Invalidate CA because we've just unlocked */
ca = NULL;
if (session->priv->discovery_codecbin)
{
gst_element_set_locked_state (session->priv->discovery_codecbin, TRUE);
gst_element_set_state (session->priv->discovery_codecbin, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_codecbin);
session->priv->discovery_codecbin = NULL;
}
/* They must both exist or neither exists, anything else is wrong */
if ((session->priv->discovery_fakesink == NULL ||
session->priv->discovery_capsfilter == NULL) &&
session->priv->discovery_fakesink != session->priv->discovery_capsfilter)
{
g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL,
"Capsfilter and fakesink not synchronized, fakesink:%p capsfilter:%p",
session->priv->discovery_fakesink, session->priv->discovery_capsfilter);
goto error;
}
if (session->priv->discovery_fakesink == NULL &&
session->priv->discovery_capsfilter == NULL)
{
tmp = g_strdup_printf ("discovery_fakesink_%u", session->id);
session->priv->discovery_fakesink =
gst_element_factory_make ("fakesink", tmp);
g_free (tmp);
if (!session->priv->discovery_fakesink)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not make fakesink element");
goto error;
}
g_object_set (session->priv->discovery_fakesink,
"sync", FALSE,
"async", FALSE,
NULL);
if (!gst_bin_add (GST_BIN (session->priv->conference),
session->priv->discovery_fakesink))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the discovery fakesink to the bin");
goto error;
}
if (!gst_element_sync_state_with_parent (session->priv->discovery_fakesink))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not sync the discovery fakesink's state with its parent");
goto error;
}
tmp = g_strdup_printf ("discovery_capsfilter_%u", session->id);
session->priv->discovery_capsfilter =
gst_element_factory_make ("capsfilter", tmp);
g_free (tmp);
if (!session->priv->discovery_capsfilter)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not make capsfilter element");
goto error;
}
if (!gst_bin_add (GST_BIN (session->priv->conference),
session->priv->discovery_capsfilter))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the discovery capsfilter to the bin");
goto error;
}
if (!gst_element_sync_state_with_parent (
session->priv->discovery_capsfilter))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not sync the discovery capsfilter's state with its parent");
goto error;
}
if (!gst_element_link_pads (session->priv->discovery_capsfilter, "src",
session->priv->discovery_fakesink, "sink"))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link discovery capsfilter and fakesink");
goto error;
}
pad = gst_element_get_static_pad (session->priv->discovery_capsfilter,
"src");
g_signal_connect_object (pad, "notify::caps",
G_CALLBACK (_discovery_caps_changed), session, 0);
gst_object_unref (pad);
}
if (!codecbin)
goto error;
session->priv->discovery_codecbin = codecbin;
codecbin = NULL;
if (!gst_bin_add (GST_BIN (session->priv->conference),
session->priv->discovery_codecbin))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the discovery codecbin to the bin");
goto error;
}
if (!gst_element_sync_state_with_parent (session->priv->discovery_codecbin))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not sync the discovery codecbin's state with its parent");
goto error;
}
caps = fs_codec_to_gst_caps (send_codec);
g_object_set (session->priv->discovery_capsfilter,
"caps", caps,
NULL);
gst_caps_unref (caps);
if (!gst_element_link_pads (session->priv->discovery_codecbin, "src",
session->priv->discovery_capsfilter, "sink"))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link discovery codecbin and capsfilter");
goto error;
}
if (!gst_element_link (session->priv->discovery_valve,
session->priv->discovery_codecbin))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link the valve and the discovery codecbin");
gst_object_unref (pad);
goto error;
}
g_object_set (session->priv->discovery_valve, "drop", FALSE, NULL);
fs_codec_destroy (send_codec);
session->priv->discovery_codec = discovery_codec;
return TRUE;
error:
g_object_set (session->priv->discovery_valve, "drop", TRUE, NULL);
fs_codec_destroy (send_codec);
fs_codec_destroy (discovery_codec);
if (codecbin)
gst_object_unref (codecbin);
if (session->priv->discovery_fakesink)
{
gst_element_set_locked_state (session->priv->discovery_fakesink, TRUE);
gst_element_set_state (session->priv->discovery_fakesink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_fakesink);
session->priv->discovery_fakesink = NULL;
}
if (session->priv->discovery_capsfilter)
{
gst_element_set_locked_state (session->priv->discovery_capsfilter, TRUE);
gst_element_set_state (session->priv->discovery_capsfilter, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_capsfilter);
session->priv->discovery_capsfilter = NULL;
}
if (session->priv->discovery_codecbin)
{
gst_element_set_locked_state (session->priv->discovery_codecbin, TRUE);
gst_element_set_state (session->priv->discovery_codecbin, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_codecbin);
session->priv->discovery_codecbin = NULL;
}
return FALSE;
}
/**
* _discovery_pad_blocked_callback:
*
* This is the callback to change the discovery codecbin
*/
static GstPadProbeReturn
_discovery_pad_blocked_callback (GstPad *pad, GstPadProbeInfo *info,
gpointer user_data)
{
FsRtpSession *session = user_data;
GError *error = NULL;
GList *item = NULL;
CodecAssociation *ca = NULL;
if (fs_rtp_session_has_disposed_enter (session, NULL))
{
FS_RTP_SESSION_LOCK (session);
session->priv->discovery_pad_block_id = 0;
FS_RTP_SESSION_UNLOCK (session);
return GST_PAD_PROBE_REMOVE;
}
FS_RTP_SESSION_LOCK (session);
session->priv->discovery_pad_block_id = 0;
/* Find out if there is a codec that needs the config to be fetched */
for (item = g_list_first (session->priv->codec_associations);
item;
item = g_list_next (item))
{
ca = item->data;
if (ca->need_config)
break;
}
if (!item)
{
fs_rtp_session_stop_codec_param_gathering_unlock (session);
g_object_notify (G_OBJECT (session), "codecs");
gst_element_post_message (GST_ELEMENT (session->priv->conference),
gst_message_new_element (GST_OBJECT (session->priv->conference),
gst_structure_new ("farstream-codecs-changed",
"session", FS_TYPE_SESSION, session,
NULL)));
goto out_unlocked;
}
if (fs_codec_are_equal (ca->codec, session->priv->discovery_codec))
goto out_locked;
if (!fs_rtp_session_get_codec_params_unlock (session, ca, &error))
{
FS_RTP_SESSION_LOCK (session);
fs_rtp_session_stop_codec_param_gathering_unlock (session);
g_prefix_error (&error,
"Error while discovering codec data, discovery cancelled: ");
fs_session_emit_error (FS_SESSION (session), error->code,
error->message);
}
g_clear_error (&error);
out_unlocked:
fs_rtp_session_has_disposed_exit (session);
return GST_PAD_PROBE_REMOVE;
out_locked:
FS_RTP_SESSION_UNLOCK (session);
goto out_unlocked;
}
/**
* fs_rtp_session_start_codec_param_gathering_locked
* @session: a #FsRtpSession
*
* Check if there is any codec associations that requires codec discovery and
* if there is, starts the gathering process by adding a pad block to the
* tee's discovery src pad
*/
static void
fs_rtp_session_start_codec_param_gathering_locked (FsRtpSession *session)
{
GList *item = NULL;
/* Find out if there is a codec that needs the config to be fetched */
for (item = g_list_first (session->priv->codec_associations);
item;
item = g_list_next (item))
{
CodecAssociation *ca = item->data;
if (ca->need_config)
break;
}
if (!item)
return;
GST_DEBUG ("Starting Codec Param discovery for session %d", session->id);
if (session->priv->discovery_pad_block_id == 0)
session->priv->discovery_pad_block_id =
gst_pad_add_probe (session->priv->send_tee_discovery_pad,
GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
_discovery_pad_blocked_callback, g_object_ref (session), g_object_unref);
}
/**
* fs_rtp_session_stop_codec_param_gathering_unlock:
* @session: a #FsRtpSession
*
* Stop the codec config gathering and remove the elements used for that
*/
static void
fs_rtp_session_stop_codec_param_gathering_unlock (FsRtpSession *session)
{
GST_DEBUG ("Stopping Codec Param discovery for session %d", session->id);
if (session->priv->discovery_codec)
{
fs_codec_destroy (session->priv->discovery_codec);
session->priv->discovery_codec = NULL;
}
if (session->priv->discovery_valve)
g_object_set (session->priv->discovery_valve, "drop", TRUE, NULL);
FS_RTP_SESSION_UNLOCK (session);
if (session->priv->discovery_fakesink)
{
gst_element_set_locked_state (session->priv->discovery_fakesink, TRUE);
gst_element_set_state (session->priv->discovery_fakesink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_fakesink);
session->priv->discovery_fakesink = NULL;
}
if (session->priv->discovery_capsfilter)
{
gst_element_set_locked_state (session->priv->discovery_capsfilter, TRUE);
gst_element_set_state (session->priv->discovery_capsfilter, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_capsfilter);
session->priv->discovery_capsfilter = NULL;
}
if (session->priv->discovery_codecbin)
{
gst_element_set_locked_state (session->priv->discovery_codecbin, TRUE);
gst_element_set_state (session->priv->discovery_codecbin, GST_STATE_NULL);
gst_bin_remove (GST_BIN (session->priv->conference),
session->priv->discovery_codecbin);
session->priv->discovery_codecbin = NULL;
}
}
static gchar **
fs_rtp_session_list_transmitters (FsSession *session)
{
gchar **rv;
g_return_val_if_fail (FS_IS_RTP_SESSION (session), NULL);
rv = fs_transmitter_list_available ();
if (!rv)
rv = g_malloc0 (1);
return rv;
}
static GType
fs_rtp_session_get_stream_transmitter_type (FsSession *session,
const gchar *transmitter)
{
FsRtpSession *self = FS_RTP_SESSION (session);
GType st_type = 0;
FsTransmitter *trans;
trans = fs_rtp_session_get_transmitter (self, transmitter, NULL);
if (transmitter)
st_type = fs_transmitter_get_stream_transmitter_type (trans);
g_object_unref (trans);
return st_type;
}
void
fs_rtp_session_ssrc_validated (FsRtpSession *session,
guint32 ssrc)
{
if (fs_rtp_session_has_disposed_enter (session, NULL))
return;
gst_element_send_event (session->priv->rtpmuxer,
gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstForceKeyUnit",
"all-headers", G_TYPE_BOOLEAN, TRUE,
NULL)));
fs_rtp_session_has_disposed_exit (session);
}
struct CodecBinSetBitrateData
{
guint bitrate;
gboolean ret;
};
static void
codecbin_set_bitrate_func (const GValue *item, gpointer user_data)
{
GstElement *elem = g_value_get_object (item);
struct CodecBinSetBitrateData *data = user_data;
if (g_object_class_find_property (G_OBJECT_GET_CLASS (elem), "bitrate"))
{
fs_utils_set_bitrate (elem, data->bitrate);
data->ret = TRUE;
}
}
static gboolean
codecbin_set_bitrate (GstElement *codecbin, guint bitrate)
{
GstIterator *it;
struct CodecBinSetBitrateData data;
if (bitrate == 0)
return FALSE;
GST_DEBUG ("Setting bitrate to %u bits/sec", bitrate);
data.bitrate = bitrate;
data.ret = FALSE;
it = gst_bin_iterate_recurse (GST_BIN (codecbin));
gst_iterator_foreach (it, codecbin_set_bitrate_func, &data);
gst_iterator_free (it);
return data.ret;
}
static void
fs_rtp_session_set_send_bitrate (FsRtpSession *self, guint bitrate)
{
FS_RTP_SESSION_LOCK (self);
if (bitrate)
self->priv->send_bitrate = bitrate;
if (self->priv->send_codecbin)
codecbin_set_bitrate (self->priv->send_codecbin, bitrate);
if (self->priv->send_bitrate_adapter)
g_object_set (self->priv->send_bitrate_adapter, "bitrate", bitrate, NULL);
FS_RTP_SESSION_UNLOCK (self);
}
static GList *
fs_rtp_session_get_codecs_need_resend (FsSession *session,
GList *old_codecs, GList *new_codecs)
{
g_return_val_if_fail (FS_IS_RTP_SESSION (session), FALSE);
return codecs_list_has_codec_config_changed (old_codecs, new_codecs);
}
/*
* TODO: This is horribly too complicated.
* What is need is a real async API on dtmfsrc and rtpdtmfsrc
*/
gboolean
fs_rtp_session_handle_dtmf_event_message (FsRtpSession *self,
GstMessage *message)
{
GstEvent *event;
const GstStructure *ms;
const GstStructure *es;
gboolean m_start, e_start;
gint m_method, e_method;
gint m_number = -1, e_number = -1;
gint m_volume;
gboolean matching;
GstMessage *post_message = NULL;
FS_RTP_SESSION_LOCK (self);
if (g_queue_get_length (&self->priv->telephony_events) == 0 ||
!fs_rtp_special_sources_claim_message_locked (
self->priv->extra_sources, message))
{
FS_RTP_SESSION_UNLOCK (self);
return FALSE;
}
event = g_queue_peek_tail (&self->priv->telephony_events);
ms = gst_message_get_structure (message);
es = gst_event_get_structure (event);
if (!gst_structure_get_boolean (ms, "start", &m_start))
goto invalid;
gst_structure_get_boolean (es, "start", &e_start);
if (!gst_structure_get_int (ms, "method", &m_method))
goto invalid;
gst_structure_get_int (es, "method", &e_method);
if (m_start)
{
if (!gst_structure_get_int (ms, "number", &m_number))
goto invalid;
gst_structure_get_int (es, "number", &e_number);
if (!gst_structure_get_int (ms, "volume", &m_volume))
goto invalid;
}
matching = ((!m_start && !e_start) ||
(m_start == e_start && m_method == e_method && m_number == e_number));
if (gst_structure_has_name (ms, "dtmf-event-processed"))
{
if (matching)
{
if (m_start)
{
if (self->priv->running_telephony_src)
{
GST_WARNING ("Got a second start from %s",
self->priv->running_telephony_src == GST_MESSAGE_SRC (message) ?
"the same source" : "a different source");
gst_object_unref (self->priv->running_telephony_src);
}
self->priv->running_telephony_src = gst_object_ref (
GST_MESSAGE_SRC (message));
}
else /* is a stop */
{
if (self->priv->running_telephony_src)
{
if (self->priv->running_telephony_src == GST_MESSAGE_SRC (message))
{
gst_object_unref (self->priv->running_telephony_src);
self->priv->running_telephony_src = NULL;
}
else
{
GST_DEBUG ("Received stop event from another source, ignoring");
return TRUE;
}
}
}
g_queue_pop_tail (&self->priv->telephony_events);
gst_event_unref (event);
self->priv->telephony_event_running = FALSE;
GST_DEBUG ("Got processed telepathy event %s for %d",
m_start ? "start" : "stop", m_number);
if (m_start)
post_message = gst_message_new_element (
GST_OBJECT (self->priv->conference),
gst_structure_new ("farstream-telephony-event-started",
"session", FS_TYPE_SESSION, self,
"method", FS_TYPE_DTMF_METHOD, m_method,
"event", FS_TYPE_DTMF_EVENT, m_number,
"volume", G_TYPE_UCHAR, m_volume,
NULL));
else
post_message = gst_message_new_element (
GST_OBJECT (self->priv->conference),
gst_structure_new ("farstream-telephony-event-stopped",
"session", FS_TYPE_SESSION, self,
"method", FS_TYPE_DTMF_METHOD, m_method,
NULL));
}
else
{
GST_WARNING ("Got dtmf-event-processed message that does not match the"
" currently running event, ignoring");
}
}
else if (gst_structure_has_name (ms, "dtmf-event-dropped"))
{
if (m_start == FALSE && e_start == FALSE)
{
if (self->priv->running_telephony_src == GST_MESSAGE_SRC (message))
{
gst_object_unref (self->priv->running_telephony_src);
self->priv->running_telephony_src = NULL;
}
g_queue_pop_tail (&self->priv->telephony_events);
gst_event_unref (event);
self->priv->telephony_event_running = FALSE;
post_message = gst_message_new_element (
GST_OBJECT (self->priv->conference),
gst_structure_new ("farstream-telephony-event-stopped",
"session", FS_TYPE_SESSION, self,
"type", G_TYPE_INT, 1,
"method", G_TYPE_INT, m_method,
NULL));
}
else if (matching)
{
self->priv->telephony_event_running = FALSE;
}
else
{
GST_WARNING ("Got dtmf-event-dropped message that does not match the"
" currently running event");
}
}
invalid:
FS_RTP_SESSION_UNLOCK (self);
if (post_message)
gst_element_post_message (GST_ELEMENT (self->priv->conference),
post_message);
fs_rtp_session_try_sending_dtmf_event (self);
return TRUE;
}
static gboolean
fs_rtp_session_set_allowed_caps (FsSession *session, GstCaps *sink_caps,
GstCaps *src_caps, GError **error)
{
FsRtpSession *self = FS_RTP_SESSION (session);
GstCaps *old_input_caps = NULL;
GstCaps *old_output_caps = NULL;
gboolean ret;
guint current_generation;
if (fs_rtp_session_has_disposed_enter (self, error))
return FALSE;
FS_RTP_SESSION_LOCK (self);
if (sink_caps)
{
old_input_caps = gst_caps_ref (self->priv->input_caps);
gst_caps_replace (&self->priv->input_caps, sink_caps);
}
if (src_caps)
{
old_output_caps = gst_caps_ref (self->priv->output_caps);
gst_caps_replace (&self->priv->output_caps, src_caps);
}
current_generation = self->priv->caps_generation;
self->priv->caps_generation++;
FS_RTP_SESSION_UNLOCK (self);
ret = fs_rtp_session_update_codecs (self, NULL, NULL, error);
if (ret)
{
if (sink_caps)
g_object_notify ((GObject *) self, "allowed-sink-caps");
if (src_caps)
g_object_notify ((GObject *) self, "allowed-src-caps");
}
else
{
FS_RTP_SESSION_LOCK (self);
if (self->priv->caps_generation == current_generation)
{
if (old_input_caps)
gst_caps_replace (&self->priv->input_caps, old_input_caps);
if (old_output_caps)
gst_caps_replace (&self->priv->output_caps, old_output_caps);
self->priv->caps_generation++;
}
FS_RTP_SESSION_UNLOCK (self);
GST_WARNING ("Invalid new codec preferences");
}
gst_caps_replace (&old_input_caps, NULL);
gst_caps_replace (&old_output_caps, NULL);
fs_rtp_session_has_disposed_exit (self);
return ret;
}
static gboolean
fs_rtp_session_set_encryption_parameters (FsSession *session,
GstStructure *parameters, GError **error)
{
FsRtpSession *self = FS_RTP_SESSION (session);
gboolean ret = FALSE;
GstBuffer *key;
gint rtp_cipher;
gint rtcp_cipher;
gint rtp_auth;
gint rtcp_auth;
guint replay_window_size;
g_return_val_if_fail (FS_IS_RTP_SESSION (session), FALSE);
g_return_val_if_fail (parameters == NULL ||
GST_IS_STRUCTURE (parameters), FALSE);
if (!validate_srtp_parameters (parameters, &rtp_cipher, &rtcp_cipher,
&rtp_auth, &rtcp_auth, &key, &replay_window_size, error))
return FALSE;
if (fs_rtp_session_has_disposed_enter (self, error))
return FALSE;
if (!self->priv->srtpenc)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Can't set encryption because srtpenc is not installed");
goto done;
}
FS_RTP_SESSION_LOCK (self);
if (self->priv->encryption_parameters)
gst_structure_free (self->priv->encryption_parameters);
if (parameters)
self->priv->encryption_parameters = gst_structure_copy (parameters);
else
self->priv->encryption_parameters = NULL;
FS_RTP_SESSION_UNLOCK (self);
g_object_set (self->priv->srtpenc,
"replay-window-size", replay_window_size,
"rtp-auth", rtp_auth, "rtcp-auth", rtcp_auth,
"rtp-cipher", rtp_cipher, "rtcp-cipher", rtcp_cipher, "key", key, NULL);
ret = TRUE;
done:
fs_rtp_session_has_disposed_exit (self);
return ret;
}
static GstCaps *
_srtpdec_request_key (GstElement *srtpdec, guint ssrc, gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
FsRtpStream *stream;
GstCaps *caps = NULL;
if (fs_rtp_session_has_disposed_enter (self, NULL))
return NULL;
FS_RTP_SESSION_LOCK (self);
stream = fs_rtp_session_get_stream_by_ssrc_locked (self, ssrc);
if (stream)
{
caps = fs_rtp_stream_get_srtp_caps_locked (stream);
g_object_unref (stream);
}
else
{
GList *item;
gboolean no_crypto = TRUE;
for (item = self->priv->streams; item; item = item->next)
if (fs_rtp_stream_requires_crypto_locked (item->data))
{
no_crypto = FALSE;
break;
}
if (no_crypto)
{
GST_DEBUG ("No stream found for SSRC %x, none of the streams require"
" crypto, so letting through", ssrc);
caps = gst_caps_new_simple ("application/x-srtp",
"srtp-cipher", G_TYPE_STRING, "null",
"srtcp-cipher", G_TYPE_STRING, "null",
"srtp-auth", G_TYPE_STRING, "null",
"srtcp-auth", G_TYPE_STRING, "null",
NULL);
}
else
{
GST_DEBUG ("Some streams require crypto, dropping packets");
}
}
FS_RTP_SESSION_UNLOCK (self);
fs_rtp_session_has_disposed_exit (self);
return caps;
}
static gboolean
_stream_decrypt_clear_locked_cb (FsRtpStream *stream, gpointer user_data)
{
FsRtpSession *self = FS_RTP_SESSION (user_data);
GHashTableIter iter;
gpointer key, value;
if (!self->priv->srtpdec)
return FALSE;
g_hash_table_iter_init (&iter, self->priv->ssrc_streams);
while (g_hash_table_iter_next (&iter, &key, &value))
{
guint32 ssrc = GPOINTER_TO_UINT (key);
FsRtpStream *tmp_stream = value;
if (tmp_stream == stream)
g_signal_emit_by_name (self->priv->srtpdec, "remove-key", ssrc);
}
return TRUE;
}