/*
* Farstream - Farstream RTP Sub Stream
*
* Copyright 2007-2009 Collabora Ltd.
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* Copyright 2007-2009 Nokia Corp.
*
* fs-rtp-substream.c - A Farstream RTP Substream gobject
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "fs-rtp-substream.h"
#include <farstream/fs-stream.h>
#include <farstream/fs-session.h>
#include "fs-rtp-stream.h"
#define GST_CAT_DEFAULT fsrtpconference_debug
/*
* SECTION:fs-rtp-sub-stream
* @short_description: The receive codec bin for a ssrc and a pt
*
* This object controls a part of the receive pipeline, with the following shape
*
* rtpbin_pad -> input_valve -> capsfilter -> codecbin -> output_valve -> output_ghostpad
*
*/
/* signals */
enum
{
NO_RTCP_TIMEDOUT,
SRC_PAD_ADDED,
CODEC_CHANGED,
ERROR_SIGNAL,
GET_CODEC_BIN,
UNLINKED,
LAST_SIGNAL
};
/* props */
enum
{
PROP_0,
PROP_CONFERENCE,
PROP_SESSION,
PROP_STREAM,
PROP_RTPBIN_PAD,
PROP_SSRC,
PROP_PT,
PROP_CODEC,
PROP_RECEIVING,
PROP_OUTPUT_GHOSTPAD,
PROP_NO_RTCP_TIMEOUT
};
#define DEFAULT_NO_RTCP_TIMEOUT (7000)
/* Private instance data of a FsRtpSubStream */
struct _FsRtpSubStreamPrivate {
/* These are only pointers, we don't own references */
FsRtpConference *conference;
FsRtpSession *session;
FsRtpStream *stream; /* only set once, protected by session lock */
/* Unlike the pointers above, a reference to this pad is held: it is taken
 * when the "rtpbin-pad" property is set and dropped in dispose */
GstPad *rtpbin_pad;
/* Handler id of the "unlinked" signal connection on rtpbin_pad */
gulong rtpbin_unlinked_sig;
GstElement *input_valve;
GstElement *output_valve;
GstElement *capsfilter;
/* This only exists if the codec is valid,
* otherwise the rtpbin_pad is blocked */
/* Protected by the session mutex */
GstElement *codecbin;
guint builder_hash;
/* This is only created when the substream is associated with a FsRtpStream */
GstPad *output_ghostpad;
/* Set to TRUE if the ghostpad is already being added */
/* Protected by the session mutex */
gboolean adding_output_ghostpad;
/* The id of the pad probe used to block the stream while the recv codec
* is changed
* Protected by the session mutex
*/
gulong blocking_id;
/* The id of the pad probe that waits for a caps event the pad accepts */
gulong check_caps_id;
/* This is protected by the session lock... the caller takes the lock
* before updating the property.. yea nasty I know
*/
gboolean receiving;
/* The fields below are protected by this mutex */
GMutex mutex;
GstClockID no_rtcp_timeout_id;
GstClockTime next_no_rtcp_timeout;
GThread *no_rtcp_timeout_thread;
/* "stopped" can only be used while holding stopped_lock */
GRWLock stopped_lock;
gboolean stopped;
/* Error recorded by the constructed() vfunc; read by fs_rtp_sub_stream_new() */
GError *construction_error;
};
/* Assigned in class_init from the G_DEFINE_TYPE-generated parent class
 * pointer */
static GObjectClass *parent_class = NULL;
/* Signal ids returned by g_signal_new(), indexed by the signal enum */
static guint signals[LAST_SIGNAL] = { 0 };
G_DEFINE_TYPE(FsRtpSubStream, fs_rtp_sub_stream, G_TYPE_OBJECT);
/* Fetches the private instance structure of a FsRtpSubStream */
#define FS_RTP_SUB_STREAM_GET_PRIVATE(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RTP_SUB_STREAM, \
FsRtpSubStreamPrivate))
/* Lock/unlock the substream's own mutex (priv->mutex); this is distinct
 * from the session lock */
#define FS_RTP_SUB_STREAM_LOCK(substream) \
g_mutex_lock (&substream->priv->mutex)
#define FS_RTP_SUB_STREAM_UNLOCK(substream) \
g_mutex_unlock (&substream->priv->mutex)
/* GObject virtual method implementations */
static void fs_rtp_sub_stream_dispose (GObject *object);
static void fs_rtp_sub_stream_finalize (GObject *object);
static void fs_rtp_sub_stream_constructed (GObject *object);
static void fs_rtp_sub_stream_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
static void fs_rtp_sub_stream_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
/* Emits the "error" signal on the substream */
static void
fs_rtp_sub_stream_emit_error (FsRtpSubStream *substream,
gint error_no,
gchar *error_msg,
gchar *debug_msg);
/*
 * Takes the "stopped" reader lock and checks whether the substream has been
 * stopped.
 *
 * Returns: TRUE if the substream is stopped; the reader lock has already been
 * released in that case. FALSE otherwise, with the reader lock still held:
 * the caller must then release it with fs_rtp_sub_stream_has_stopped_exit().
 */
static gboolean
fs_rtp_sub_stream_has_stopped_enter (FsRtpSubStream *self)
{
  FsRtpSubStreamPrivate *priv = self->priv;

  g_rw_lock_reader_lock (&priv->stopped_lock);

  if (!priv->stopped)
    return FALSE;

  g_rw_lock_reader_unlock (&priv->stopped_lock);
  return TRUE;
}
/*
 * Releases the "stopped" reader lock. Must only be called after
 * fs_rtp_sub_stream_has_stopped_enter() returned FALSE.
 */
static void
fs_rtp_sub_stream_has_stopped_exit (FsRtpSubStream *self)
{
  FsRtpSubStreamPrivate *priv = self->priv;

  g_rw_lock_reader_unlock (&priv->stopped_lock);
}
/*
 * Class initializer: hooks up the GObject virtual methods, installs the
 * properties and registers the signals of FsRtpSubStream.
 */
static void
fs_rtp_sub_stream_class_init (FsRtpSubStreamClass *klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

  parent_class = fs_rtp_sub_stream_parent_class;

  gobject_class->constructed = fs_rtp_sub_stream_constructed;
  gobject_class->dispose = fs_rtp_sub_stream_dispose;
  gobject_class->finalize = fs_rtp_sub_stream_finalize;
  gobject_class->set_property = fs_rtp_sub_stream_set_property;
  gobject_class->get_property = fs_rtp_sub_stream_get_property;

  g_object_class_install_property (gobject_class,
      PROP_CONFERENCE,
      g_param_spec_object ("conference",
          "The FsRtpConference this substream stream refers to",
          "This is a convience pointer for the Conference",
          FS_TYPE_RTP_CONFERENCE,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_SESSION,
      g_param_spec_object ("session",
          "The FsRtpSession this substream stream refers to",
          "This is a convience pointer for the parent FsRtpSession",
          FS_TYPE_RTP_SESSION,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_STREAM,
      g_param_spec_object ("stream",
          "The FsRtpStream this substream stream refers to",
          "This is a convience pointer for the parent FsRtpStream",
          FS_TYPE_RTP_STREAM,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_RTPBIN_PAD,
      g_param_spec_object ("rtpbin-pad",
          "The GstPad this substrea is linked to",
          "This is the pad on which this substream will attach itself",
          GST_TYPE_PAD,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_SSRC,
      g_param_spec_uint ("ssrc",
          "The ssrc this stream is used for",
          "This is the SSRC from the pad",
          0, G_MAXUINT32, 0,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_PT,
      g_param_spec_uint ("pt",
          "The payload type this stream is used for",
          "This is the payload type from the pad",
          0, 128, 0,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_CODEC,
      g_param_spec_boxed ("codec",
          "The FsCodec this substream is received",
          "The FsCodec currently received from this substream",
          FS_TYPE_CODEC,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_RECEIVING,
      g_param_spec_boolean ("receiving",
          "Whether this substream will receive any data",
          "A toggle that prevents the substream from outputting any data",
          TRUE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_OUTPUT_GHOSTPAD,
      g_param_spec_object ("output-ghostpad",
          "The output ghostpad for this substream",
          "The GstPad which is on the outside of the fsrtpconference element"
          " for this substream",
          GST_TYPE_PAD,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class,
      PROP_NO_RTCP_TIMEOUT,
      g_param_spec_int ("no-rtcp-timeout",
          "The timeout (in ms) before no RTCP is assumed",
          "This is the time (in ms) after which data received without RTCP"
          " is attached the FsStream, this only works if there is only one"
          " FsStream. <=0 will do nothing",
          -1, G_MAXINT, DEFAULT_NO_RTCP_TIMEOUT,
          G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * FsRtpSubStream::no-rtcp-timedout:
   * @self: #FsRtpSubStream that emitted the signal
   *
   * This signal is emitted after the timeout specified by
   * #FsRtpSubStream:no-rtcp-timeout if this sub-stream has not been attached
   * to a stream.
   *
   */
  signals[NO_RTCP_TIMEDOUT] = g_signal_new ("no-rtcp-timedout",
      G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST,
      0,
      NULL,
      NULL,
      g_cclosure_marshal_VOID__VOID,
      G_TYPE_NONE, 0);

  /**
   * FsRtpSubStream::src-pad-added:
   * @self: #FsRtpSubStream that emitted the signal
   * @pad: #GstPad of the new source pad
   * @codec: #FsCodec of the codec being received on the new source pad
   *
   * This signal is emitted when a new gst source pad has been created for a
   * specific codec being received. There will be a different source pad for
   * each codec that is received. The user must ref the #GstPad if he wants to
   * keep it. The user should not modify the #FsCodec and must copy it if he
   * wants to use it outside the callback scope.
   *
   * This signal is not emitted on the main thread, but on GStreamer's streaming
   * thread!
   *
   * This is re-emitted by the FsStream
   *
   */
  signals[SRC_PAD_ADDED] = g_signal_new ("src-pad-added",
      G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST,
      0, NULL, NULL, NULL,
      G_TYPE_NONE, 2, GST_TYPE_PAD, FS_TYPE_CODEC);

  /**
   * FsRtpSubStream::error:
   * @self: #FsRtpSubStream that emitted the signal
   * @errorno: The number of the error
   * @error_msg: Error message to be displayed to user
   * @debug_msg: Debugging error message
   *
   * This signal is emitted in any error condition
   *
   */
  signals[ERROR_SIGNAL] = g_signal_new ("error",
      G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST,
      0, NULL, NULL, NULL,
      G_TYPE_NONE, 3, G_TYPE_INT, G_TYPE_STRING, G_TYPE_STRING);

  /**
   * FsRtpSubStream::codec-changed:
   * @self: #FsRtpSubStream that emitted the signal
   *
   * This signal is emitted when the codec for this substream has
   * changed. It can be fetched from the #FsRtpSubStream:codec property
   * This is useful for displaying the current active reception codecs.
   */
  signals[CODEC_CHANGED] = g_signal_new ("codec-changed",
      G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST,
      0,
      NULL,
      NULL,
      g_cclosure_marshal_VOID__VOID,
      G_TYPE_NONE, 0);

  /**
   * FsRtpSubStream::get-codec-bin:
   * @self: #FsRtpSubStream that emitted the signal
   * @stream: the #FsRtpStream this substream is attached to if any (or %NULL)
   * @new_codec: A pointer to a location where the codec can be stored
   * @current_builder_hash: The hash of the current codecbin builder
   * @new_builder_hash: A location to store the hash of the new codecbin
   *  builder
   * @error: The location of a GError where an error can be stored
   *
   * This is emitted when the substream wants to get a codecbin or replace
   * the current one.
   *
   * Returns: The Codec Bin
   */
  signals[GET_CODEC_BIN] = g_signal_new ("get-codec-bin",
      G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST,
      0, NULL, NULL, NULL,
      G_TYPE_POINTER, 5, G_TYPE_POINTER, G_TYPE_POINTER, G_TYPE_UINT,
      G_TYPE_POINTER, G_TYPE_POINTER);

  /**
   * FsRtpSubStream::unlinked:
   * @self: #FsRtpSubStream that emitted the signal
   *
   * This signal is emitted when the rtpbin pad that this substream decodes
   * from is unlinked.
   */
  /* The signal takes no parameters, so no parameter types follow the count */
  signals[UNLINKED] = g_signal_new ("unlinked",
      G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST,
      0,
      NULL,
      NULL,
      g_cclosure_marshal_VOID__VOID,
      G_TYPE_NONE, 0);

  g_type_class_add_private (klass, sizeof (FsRtpSubStreamPrivate));
}
/*
 * Instance initializer: attaches the private structure, initializes the
 * substream mutex and the "stopped" read/write lock, and defaults
 * "receiving" to TRUE.
 */
static void
fs_rtp_sub_stream_init (FsRtpSubStream *self)
{
  FsRtpSubStreamPrivate *priv = FS_RTP_SUB_STREAM_GET_PRIVATE (self);

  self->priv = priv;

  g_mutex_init (&priv->mutex);
  g_rw_lock_init (&priv->stopped_lock);
  priv->receiving = TRUE;
}
/*
 * Thread function of the "no RTCP" timeout thread.
 *
 * Waits on the system clock until priv->next_no_rtcp_timeout, then emits
 * "no-rtcp-timedout" unless next_no_rtcp_timeout was reset to 0 in the
 * meantime (which is how the stop function cancels the timeout).
 *
 * @user_data: the #FsRtpSubStream
 *
 * Returns: always %NULL
 */
static gpointer
no_rtcp_timeout_func (gpointer user_data)
{
  FsRtpSubStream *self = FS_RTP_SUB_STREAM (user_data);
  GstClock *sys_clock = gst_system_clock_obtain ();
  GstClockID wait_id;
  gboolean cancelled;

  if (sys_clock == NULL)
  {
    fs_rtp_sub_stream_emit_error (self, FS_ERROR_INTERNAL,
        "Could not get system clock",
        "Could not get system clock");
    return NULL;
  }

  /* Publish the clock id under the lock so that the wait can be unscheduled
   * from another thread */
  FS_RTP_SUB_STREAM_LOCK(self);
  wait_id = gst_clock_new_single_shot_id (sys_clock,
      self->priv->next_no_rtcp_timeout);
  self->priv->no_rtcp_timeout_id = wait_id;
  FS_RTP_SUB_STREAM_UNLOCK(self);

  /* The wait itself happens without holding the lock */
  gst_clock_id_wait (wait_id, NULL);

  FS_RTP_SUB_STREAM_LOCK(self);
  gst_clock_id_unref (wait_id);
  self->priv->no_rtcp_timeout_id = NULL;
  cancelled = (self->priv->next_no_rtcp_timeout == 0);
  FS_RTP_SUB_STREAM_UNLOCK(self);

  gst_object_unref (sys_clock);

  if (!cancelled)
    g_signal_emit (self, signals[NO_RTCP_TIMEDOUT], 0);

  return NULL;
}
/*
 * Computes the next "no RTCP" deadline (now + no_rtcp_timeout milliseconds on
 * the system clock) and starts the timeout thread if none exists yet.
 *
 * Takes the session lock, then the substream lock.
 *
 * @self: the #FsRtpSubStream
 * @error: location of a #GError, or %NULL
 *
 * Returns: TRUE if a timeout thread exists on return, FALSE (with @error set)
 * otherwise
 */
static gboolean
fs_rtp_sub_stream_start_no_rtcp_timeout_thread (FsRtpSubStream *self,
    GError **error)
{
  GstClock *sys_clock = gst_system_clock_obtain ();
  gboolean started;

  if (!sys_clock)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL,
        "Could not obtain gst system clock");
    return FALSE;
  }

  FS_RTP_SESSION_LOCK (self->priv->session);
  FS_RTP_SUB_STREAM_LOCK(self);

  self->priv->next_no_rtcp_timeout = gst_clock_get_time (sys_clock) +
      (self->no_rtcp_timeout * GST_MSECOND);
  gst_object_unref (sys_clock);

  /* A thread that is already there is reused; a new one is only spawned
   * when the previous one was stopped */
  if (!self->priv->no_rtcp_timeout_thread)
    self->priv->no_rtcp_timeout_thread =
        g_thread_try_new ("no rtcp timeout", no_rtcp_timeout_func, self, error);

  started = (self->priv->no_rtcp_timeout_thread != NULL);

  /* Make sure a failure is always reported with an error */
  if (!started && error != NULL && *error == NULL)
    g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Unknown error creating"
        " thread");

  FS_RTP_SUB_STREAM_UNLOCK(self);
  FS_RTP_SESSION_UNLOCK (self->priv->session);

  return started;
}
/*
 * Cancels the "no RTCP" timeout and waits for its thread to finish.
 *
 * The deadline is reset to 0 (so the thread will not emit the signal) and any
 * pending clock wait is unscheduled. If a thread exists, it is joined with
 * the substream lock released, then the thread pointer is cleared.
 */
static void
fs_rtp_sub_stream_stop_no_rtcp_timeout_thread (FsRtpSubStream *self)
{
  gboolean have_thread;

  FS_RTP_SUB_STREAM_LOCK(self);
  self->priv->next_no_rtcp_timeout = 0;
  if (self->priv->no_rtcp_timeout_id)
    gst_clock_id_unschedule (self->priv->no_rtcp_timeout_id);
  have_thread = (self->priv->no_rtcp_timeout_thread != NULL);
  FS_RTP_SUB_STREAM_UNLOCK(self);

  if (!have_thread)
    return;

  /* Joined outside the lock: the thread itself takes the lock on its way
   * out */
  g_thread_join (self->priv->no_rtcp_timeout_thread);

  FS_RTP_SUB_STREAM_LOCK(self);
  self->priv->no_rtcp_timeout_thread = NULL;
  FS_RTP_SUB_STREAM_UNLOCK(self);
}
/*
 * "unlinked" handler connected to the rtpbin pad: forwards the event as the
 * substream's own "unlinked" signal.
 *
 * @user_data: the #FsRtpSubStream
 */
static void
rtpbin_pad_unlinked (GstPad *pad, GstPad *peer, gpointer user_data)
{
  FsRtpSubStream *substream = user_data;

  g_signal_emit (substream, signals[UNLINKED], 0);
}
/*
 * GObject "constructed" implementation.
 *
 * Builds the static part of the receive pipeline inside the conference bin:
 * creates the output valve (initially dropping), the capsfilter and the input
 * valve, sets them to PLAYING, links input valve -> capsfilter, links the
 * rtpbin pad to the input valve and, if no_rtcp_timeout > 0, starts the
 * "no RTCP" timeout thread.
 *
 * Failures are not reported directly: they are stored in
 * priv->construction_error and the function returns early.
 *
 * The construct-time arguments (conference, session, rtpbin pad) are
 * validated before anything dereferences them.
 */
static void
fs_rtp_sub_stream_constructed (GObject *object)
{
  FsRtpSubStream *self = FS_RTP_SUB_STREAM (object);
  GstPad *valve_sink_pad = NULL;
  GstPadLinkReturn linkret;
  gchar *tmp;

  if (!self->priv->conference) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_INVALID_ARGUMENTS, "A Substream needs a conference object");
    return;
  }

  if (!self->priv->session) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_INVALID_ARGUMENTS, "A Substream needs a session object");
    return;
  }

  if (!self->priv->rtpbin_pad) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_INVALID_ARGUMENTS, "A Substream needs a rtpbin pad");
    return;
  }

  /* Only log once the session is known to be valid: the id is read from it */
  GST_DEBUG ("New substream in session %u for ssrc %x and pt %u",
      self->priv->session->id, self->ssrc, self->pt);

  self->priv->rtpbin_unlinked_sig = g_signal_connect_object (
      self->priv->rtpbin_pad, "unlinked", G_CALLBACK (rtpbin_pad_unlinked),
      self, 0);

  /* Output valve */
  tmp = g_strdup_printf ("output_recv_valve_%u_%u_%u", self->priv->session->id,
      self->ssrc, self->pt);
  self->priv->output_valve = gst_element_factory_make ("valve", tmp);
  g_free (tmp);

  if (!self->priv->output_valve) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not create a valve element for"
        " session substream with ssrc: %u and pt:%d", self->ssrc,
        self->pt);
    return;
  }

  if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->output_valve))
  {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add the valve element for session"
        " substream with ssrc: %u and pt:%d to the conference bin",
        self->ssrc, self->pt);
    return;
  }

  /* We set the valve to dropping, the stream will unblock it when its linked */
  g_object_set (self->priv->output_valve, "drop", TRUE, NULL);

  if (gst_element_set_state (self->priv->output_valve, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not set the valve element for session"
        " substream with ssrc: %u and pt:%d to the playing state",
        self->ssrc, self->pt);
    return;
  }

  /* Capsfilter */
  tmp = g_strdup_printf ("recv_capsfilter_%u_%u_%u", self->priv->session->id,
      self->ssrc, self->pt);
  self->priv->capsfilter = gst_element_factory_make ("capsfilter", tmp);
  g_free (tmp);

  if (!self->priv->capsfilter) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not create a capsfilter element for"
        " session substream with ssrc: %u and pt:%d", self->ssrc,
        self->pt);
    return;
  }

  if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->capsfilter)) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add the capsfilter element for session"
        " substream with ssrc: %u and pt:%d to the conference bin",
        self->ssrc, self->pt);
    return;
  }

  if (gst_element_set_state (self->priv->capsfilter, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not set the capsfilter element for session"
        " substream with ssrc: %u and pt:%d to the playing state",
        self->ssrc, self->pt);
    return;
  }

  /* Input valve */
  tmp = g_strdup_printf ("input_recv_valve_%u_%u_%u", self->priv->session->id,
      self->ssrc, self->pt);
  self->priv->input_valve = gst_element_factory_make ("valve", tmp);
  g_free (tmp);

  if (!self->priv->input_valve) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not create a valve element for"
        " session substream with ssrc: %u and pt:%d", self->ssrc,
        self->pt);
    return;
  }

  if (!gst_bin_add (GST_BIN (self->priv->conference), self->priv->input_valve))
  {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add the valve element for session"
        " substream with ssrc: %u and pt:%d to the conference bin",
        self->ssrc, self->pt);
    return;
  }

  if (gst_element_set_state (self->priv->input_valve, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE) {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not set the valve element for session"
        " substream with ssrc: %u and pt:%d to the playing state",
        self->ssrc, self->pt);
    return;
  }

  /* Linking: rtpbin pad -> input valve -> capsfilter */
  if (!gst_element_link (self->priv->input_valve, self->priv->capsfilter))
  {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not link the input valve"
        " and the capsfilter");
    return;
  }

  valve_sink_pad = gst_element_get_static_pad (self->priv->input_valve,
      "sink");
  if (!valve_sink_pad)
  {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not get the valve's sink pad");
    return;
  }

  linkret = gst_pad_link (self->priv->rtpbin_pad, valve_sink_pad);

  gst_object_unref (valve_sink_pad);

  if (GST_PAD_LINK_FAILED (linkret))
  {
    self->priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not link the rtpbin to the codec bin (%d)", linkret);
    return;
  }

  if (self->no_rtcp_timeout > 0)
    if (!fs_rtp_sub_stream_start_no_rtcp_timeout_thread (self,
            &self->priv->construction_error))
      return;

  GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}
static void
fs_rtp_sub_stream_dispose (GObject *object)
{
FsRtpSubStream *self = FS_RTP_SUB_STREAM (object);
fs_rtp_sub_stream_stop_no_rtcp_timeout_thread (self);
if (self->priv->output_ghostpad) {
gst_element_remove_pad (GST_ELEMENT (self->priv->conference),
self->priv->output_ghostpad);
self->priv->output_ghostpad = NULL;
}
if (self->priv->output_valve) {
gst_element_set_locked_state (self->priv->output_valve, TRUE);
gst_element_set_state (self->priv->output_valve, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), self->priv->output_valve);
self->priv->output_valve = NULL;
}
if (self->priv->codecbin) {
gst_element_set_locked_state (self->priv->codecbin, TRUE);
gst_element_set_state (self->priv->codecbin, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), self->priv->codecbin);
self->priv->codecbin = NULL;
}
if (self->priv->capsfilter) {
gst_element_set_locked_state (self->priv->capsfilter, TRUE);
gst_element_set_state (self->priv->capsfilter, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), self->priv->capsfilter);
self->priv->capsfilter = NULL;
}
if (self->priv->input_valve) {
gst_element_set_locked_state (self->priv->input_valve, TRUE);
gst_element_set_state (self->priv->input_valve, GST_STATE_NULL);
gst_bin_remove (GST_BIN (self->priv->conference), self->priv->input_valve);
self->priv->input_valve = NULL;
}
if (self->priv->rtpbin_pad) {
gst_object_unref (self->priv->rtpbin_pad);
self->priv->rtpbin_pad = NULL;
}
G_OBJECT_CLASS (fs_rtp_sub_stream_parent_class)->dispose (object);
}
/*
 * GObject "finalize" implementation: destroys the current codec, clears the
 * substream mutex and the "stopped" lock, then chains up to the parent class.
 */
static void
fs_rtp_sub_stream_finalize (GObject *object)
{
  FsRtpSubStream *self = FS_RTP_SUB_STREAM (object);
  FsRtpSubStreamPrivate *priv = self->priv;

  fs_codec_destroy (self->codec);

  g_mutex_clear (&priv->mutex);
  g_rw_lock_clear (&priv->stopped_lock);

  G_OBJECT_CLASS (fs_rtp_sub_stream_parent_class)->finalize (object);
}
/*
 * GObject "set_property" implementation.
 *
 * These properties can only be accessed while holding the session lock.
 *
 * Notes:
 *  - conference, session and stream are stored without taking a reference;
 *  - the stream can only be set once, later attempts are ignored with a
 *    warning;
 *  - a reference is taken on the rtpbin pad;
 *  - changing "receiving" also updates the "drop" property of the input
 *    valve, if it exists.
 */
static void
fs_rtp_sub_stream_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec)
{
  FsRtpSubStream *self = FS_RTP_SUB_STREAM (object);
  FsRtpSubStreamPrivate *priv = self->priv;

  switch (prop_id) {
    case PROP_CONFERENCE:
      priv->conference = g_value_get_object (value);
      break;
    case PROP_SESSION:
      priv->session = g_value_get_object (value);
      break;
    case PROP_STREAM:
      if (priv->stream != NULL)
      {
        GST_WARNING ("Stream already set, not re-setting");
        break;
      }
      priv->stream = g_value_get_object (value);
      break;
    case PROP_RTPBIN_PAD:
      priv->rtpbin_pad = GST_PAD (g_value_dup_object (value));
      break;
    case PROP_SSRC:
      self->ssrc = g_value_get_uint (value);
      break;
    case PROP_PT:
      self->pt = g_value_get_uint (value);
      break;
    case PROP_RECEIVING:
      priv->receiving = g_value_get_boolean (value);
      if (priv->input_valve != NULL)
      {
        /* The valve drops exactly when we are not receiving */
        gboolean drop = !priv->receiving;

        g_object_set (G_OBJECT (priv->input_valve), "drop", drop, NULL);
      }
      break;
    case PROP_NO_RTCP_TIMEOUT:
      self->no_rtcp_timeout = g_value_get_int (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}
/*
 * GObject "get_property" implementation.
 *
 * These properties can only be accessed while holding the session lock.
 */
static void
fs_rtp_sub_stream_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec)
{
  FsRtpSubStream *self = FS_RTP_SUB_STREAM (object);
  FsRtpSubStreamPrivate *priv = self->priv;

  switch (prop_id) {
    /* Object-valued properties */
    case PROP_CONFERENCE:
      g_value_set_object (value, priv->conference);
      break;
    case PROP_SESSION:
      g_value_set_object (value, priv->session);
      break;
    case PROP_STREAM:
      g_value_set_object (value, priv->stream);
      break;
    case PROP_RTPBIN_PAD:
      g_value_set_object (value, priv->rtpbin_pad);
      break;
    case PROP_OUTPUT_GHOSTPAD:
      g_value_set_object (value, priv->output_ghostpad);
      break;

    /* Scalar and boxed properties */
    case PROP_SSRC:
      g_value_set_uint (value, self->ssrc);
      break;
    case PROP_PT:
      g_value_set_uint (value, self->pt);
      break;
    case PROP_CODEC:
      g_value_set_boxed (value, self->codec);
      break;
    case PROP_RECEIVING:
      g_value_set_boolean (value, priv->receiving);
      break;
    case PROP_NO_RTCP_TIMEOUT:
      g_value_set_int (value, self->no_rtcp_timeout);
      break;

    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}
/**
 * fs_rtp_sub_stream_set_codecbin:
 * @substream: a #FsRtpSubStream
 * @codecbin: the new codec bin
 * @builder_hash: hash of the builder of @codecbin, stored alongside it
 * @error: location of a #GError, or %NULL
 *
 * Adds and links the codecbin for a given substream
 * (capsfilter -> codecbin -> output valve).
 * Removes any codecbin that was previously there.
 *
 * If the substream is attached to a stream and has no output ghostpad yet,
 * the ghostpad is added as well; otherwise "codec-changed" is emitted.
 *
 * This function will swallow one ref to the codecbin.
 *
 * Must be called without the session lock held: it takes and releases the
 * lock internally.
 *
 * Returns: TRUE on success
 */
static gboolean
fs_rtp_sub_stream_set_codecbin (FsRtpSubStream *substream,
    GstElement *codecbin,
    guint builder_hash,
    GError **error)
{
  GstPad *pad;

  /* Get rid of the previous codec bin, if any */
  if (substream->priv->codecbin)
  {
    gst_element_set_locked_state (substream->priv->codecbin, TRUE);
    if (gst_element_set_state (substream->priv->codecbin, GST_STATE_NULL) !=
        GST_STATE_CHANGE_SUCCESS)
    {
      gst_element_set_locked_state (substream->priv->codecbin, FALSE);
      g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL,
          "Could not set the codec bin for ssrc %u"
          " and payload type %d to the state NULL", substream->ssrc,
          substream->pt);
      gst_object_unref (codecbin);
      return FALSE;
    }

    gst_bin_remove (GST_BIN (substream->priv->conference),
        substream->priv->codecbin);

    FS_RTP_SESSION_LOCK (substream->priv->session);
    substream->priv->codecbin = NULL;
    substream->priv->builder_hash = 0;
    FS_RTP_SESSION_UNLOCK (substream->priv->session);
  }

  if (!gst_bin_add (GST_BIN (substream->priv->conference), codecbin))
  {
    gst_object_unref (codecbin);
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add the codec bin to the conference");
    return FALSE;
  }

  if (gst_element_set_state (codecbin, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not set the codec bin to the playing state");
    goto error;
  }

  if (!gst_element_link_pads (codecbin, "src",
          substream->priv->output_valve, "sink"))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the codec bin to the output_valve");
    goto error;
  }

  if (!gst_element_link_pads (substream->priv->capsfilter, "src",
          codecbin, "sink"))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the receive capsfilter and the codecbin for pt %d",
        substream->pt);
    goto error;
  }

  /* Only used to verify that the codec bin exposes a sink pad */
  pad = gst_element_get_static_pad (codecbin, "sink");
  if (!pad)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INTERNAL, "Could not get sink pad"
        " from codecbin");
    goto error;
  }
  gst_object_unref (pad);

  FS_RTP_SESSION_LOCK (substream->priv->session);
  substream->priv->codecbin = codecbin;
  substream->priv->builder_hash = builder_hash;

  if (substream->priv->stream && !substream->priv->output_ghostpad)
  {
    /* This call releases the session lock on every path */
    if (!fs_rtp_sub_stream_add_output_ghostpad_unlock (substream, error))
      goto error;
  }
  else
  {
    FS_RTP_SESSION_UNLOCK (substream->priv->session);
    g_signal_emit (substream, signals[CODEC_CHANGED], 0);
  }

  return TRUE;

 error:
  /* The session lock is not held on any path that reaches this label.
   * If the new bin was already recorded as the current codec bin (ghostpad
   * failure), forget it before it is removed from the conference, so that no
   * dangling pointer is left behind for stop()/dispose() to use. */
  FS_RTP_SESSION_LOCK (substream->priv->session);
  if (substream->priv->codecbin == codecbin)
  {
    substream->priv->codecbin = NULL;
    substream->priv->builder_hash = 0;
  }
  FS_RTP_SESSION_UNLOCK (substream->priv->session);

  gst_element_set_locked_state (codecbin, TRUE);
  gst_element_set_state (codecbin, GST_STATE_NULL);
  gst_bin_remove (GST_BIN (substream->priv->conference), codecbin);

  return FALSE;
}
/*
 * Creates a new #FsRtpSubStream for the given conference, session, rtpbin
 * pad, SSRC and payload type.
 *
 * If construction recorded an error, it is propagated to @error, the
 * half-built object is released and %NULL is returned.
 *
 * Returns: the new substream, or %NULL on error
 */
FsRtpSubStream *
fs_rtp_sub_stream_new (FsRtpConference *conference,
    FsRtpSession *session,
    GstPad *rtpbin_pad,
    guint32 ssrc,
    guint pt,
    gint no_rtcp_timeout,
    GError **error)
{
  FsRtpSubStream *self;

  self = g_object_new (FS_TYPE_RTP_SUB_STREAM,
      "conference", conference,
      "session", session,
      "rtpbin-pad", rtpbin_pad,
      "ssrc", ssrc,
      "pt", pt,
      "no-rtcp-timeout", no_rtcp_timeout,
      NULL);

  if (self->priv->construction_error == NULL)
    return self;

  g_propagate_error (error, self->priv->construction_error);
  g_object_unref (self);

  return NULL;
}
/**
* fs_rtp_sub_stream_stop:
*
* Stops all of the elements on a #FsRtpSubstream
*/
void
fs_rtp_sub_stream_stop (FsRtpSubStream *substream)
{
substream->priv->stopped = TRUE;
g_rw_lock_writer_lock (&substream->priv->stopped_lock);
substream->priv->stopped = TRUE;
g_rw_lock_writer_unlock (&substream->priv->stopped_lock);
if (substream->priv->rtpbin_unlinked_sig) {
g_signal_handler_disconnect (substream->priv->rtpbin_pad,
substream->priv->rtpbin_unlinked_sig);
substream->priv->rtpbin_unlinked_sig = 0;
}
FS_RTP_SESSION_LOCK (substream->priv->session);
if (substream->priv->blocking_id != 0)
{
gst_pad_remove_probe (substream->priv->rtpbin_pad,
substream->priv->blocking_id);
substream->priv->blocking_id = 0;
}
FS_RTP_SESSION_UNLOCK (substream->priv->session);
if (substream->priv->check_caps_id != 0)
{
gst_pad_remove_probe (substream->priv->rtpbin_pad,
substream->priv->check_caps_id);
substream->priv->check_caps_id = 0;
}
if (substream->priv->output_ghostpad)
gst_pad_set_active (substream->priv->output_ghostpad, FALSE);
if (substream->priv->output_valve)
{
gst_element_set_locked_state (substream->priv->output_valve, TRUE);
gst_element_set_state (substream->priv->output_valve, GST_STATE_NULL);
}
if (substream->priv->codecbin)
{
gst_element_set_locked_state (substream->priv->codecbin, TRUE);
gst_element_set_state (substream->priv->codecbin, GST_STATE_NULL);
}
if (substream->priv->capsfilter)
{
gst_element_set_locked_state (substream->priv->capsfilter, TRUE);
gst_element_set_state (substream->priv->capsfilter, GST_STATE_NULL);
}
if (substream->priv->input_valve)
{
gst_element_set_locked_state (substream->priv->input_valve, TRUE);
gst_element_set_state (substream->priv->input_valve, GST_STATE_NULL);
}
}
/**
* fs_rtp_sub_stream_add_output_ghostpad_unlock:
* @substream: a #FsRtpSubStream
* @error: location of a #GError, or %NULL
*
* Creates and adds an output ghostpad for this substream, targeting the
* source pad of the output valve. On success, emits "src-pad-added" and
* "codec-changed" and opens the output valve.
*
* The caller MUST hold the session lock; this function releases it on every
* path before returning.
*
* If the substream is already stopped, or a ghostpad is already being added,
* nothing is done and TRUE is returned.
*
* Returns: TRUE on Success, FALSE on error
*/
gboolean
fs_rtp_sub_stream_add_output_ghostpad_unlock (FsRtpSubStream *substream,
GError **error)
{
GstPad *valve_srcpad;
gchar *padname = NULL;
GstPad *ghostpad = NULL;
FsCodec *codec = NULL;
/* When this returns TRUE the "stopped" reader lock was already released,
 * so there is no matching _exit() call on this path */
if (fs_rtp_sub_stream_has_stopped_enter (substream))
{
FS_RTP_SESSION_UNLOCK (substream->priv->session);
return TRUE;
}
/* Someone else is already adding (or has added) the ghostpad */
if (substream->priv->adding_output_ghostpad)
{
FS_RTP_SESSION_UNLOCK (substream->priv->session);
goto out;
}
g_assert (substream->priv->output_ghostpad == NULL);
substream->priv->adding_output_ghostpad = TRUE;
padname = g_strdup_printf ("src_%u_%u_%u", substream->priv->session->id,
substream->ssrc,
substream->pt);
/* The pad is built and added to the conference without the session lock */
FS_RTP_SESSION_UNLOCK (substream->priv->session);
valve_srcpad = gst_element_get_static_pad (substream->priv->output_valve,
"src");
g_assert (valve_srcpad);
ghostpad = gst_ghost_pad_new_from_template (padname, valve_srcpad,
gst_element_class_get_pad_template (
GST_ELEMENT_GET_CLASS (substream->priv->conference),
"src_%u_%u_%u"));
gst_object_unref (valve_srcpad);
g_free (padname);
if (!ghostpad)
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not build ghostpad src_%u_%u_%u", substream->priv->session->id,
substream->ssrc, substream->pt);
goto error;
}
if (!gst_pad_set_active (ghostpad, TRUE))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not activate the src_%u_%u_%u", substream->priv->session->id,
substream->ssrc, substream->pt);
gst_object_unref (ghostpad);
goto error;
}
/* NOTE(review): verify whether gst_element_add_pad() already releases the
 * pad when it fails; if so the unref below would be one too many */
if (!gst_element_add_pad (GST_ELEMENT (substream->priv->conference),
ghostpad))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add ghostpad src_%u_%u_%u to the conference",
substream->priv->session->id, substream->ssrc, substream->pt);
gst_object_unref (ghostpad);
goto error;
}
/* Re-take the session lock to publish the pad and copy the codec */
FS_RTP_SESSION_LOCK (substream->priv->session);
substream->priv->output_ghostpad = ghostpad;
GST_DEBUG ("Src pad added on substream for ssrc:%X pt:%u " FS_CODEC_FORMAT,
substream->ssrc, substream->pt,
FS_CODEC_ARGS (substream->codec));
codec = fs_codec_copy (substream->codec);
FS_RTP_SESSION_UNLOCK (substream->priv->session);
/* Signals are emitted without the session lock, using the private copy of
 * the codec */
g_signal_emit (substream, signals[SRC_PAD_ADDED], 0,
ghostpad, codec);
g_signal_emit (substream, signals[CODEC_CHANGED], 0);
fs_codec_destroy (codec);
/* The output valve was dropping until now; let the data flow */
g_object_set (substream->priv->output_valve, "drop", FALSE, NULL);
out:
fs_rtp_sub_stream_has_stopped_exit (substream);
return TRUE;
error:
/* NOTE(review): this flag is documented as protected by the session mutex,
 * but the lock is not held here - confirm whether that is intended */
substream->priv->adding_output_ghostpad = FALSE;
fs_rtp_sub_stream_has_stopped_exit (substream);
return FALSE;
}
/*
 * Pad probe installed on the rtpbin pad when the caps of the new codec could
 * not be set right away.
 *
 * Everything that is not a downstream event is dropped, non-caps events are
 * let through, and on a caps event the probe tries to set those caps on the
 * pad (only if a codecbin and a codec are present). The probe removes itself
 * once that succeeds, or if the session is disposed or the substream stopped;
 * otherwise the caps event is dropped.
 *
 * @user_data: the #FsRtpSubStream
 */
static GstPadProbeReturn
_probe_check_caps (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  FsRtpSubStream *self = FS_RTP_SUB_STREAM (user_data);
  GstPadProbeReturn verdict = GST_PAD_PROBE_DROP;
  GstEvent *event;

  /* drop buffers before we have the right caps */
  if ((GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) == 0)
    return GST_PAD_PROBE_DROP;

  /* let other events like segments through before the caps we like */
  event = GST_PAD_PROBE_INFO_EVENT (info);
  if (GST_EVENT_TYPE (event) != GST_EVENT_CAPS)
    return GST_PAD_PROBE_PASS;

  if (fs_rtp_session_has_disposed_enter (self->priv->session, NULL))
    return GST_PAD_PROBE_REMOVE;

  if (fs_rtp_sub_stream_has_stopped_enter (self))
  {
    fs_rtp_session_has_disposed_exit (self->priv->session);
    return GST_PAD_PROBE_REMOVE;
  }

  FS_RTP_SESSION_LOCK (self->priv->session);
  if (self->priv->codecbin != NULL && self->codec != NULL)
  {
    GstCaps *event_caps = NULL;

    gst_event_parse_caps (event, &event_caps);
    if (gst_pad_set_caps (pad, event_caps))
      verdict = GST_PAD_PROBE_REMOVE;
  }
  FS_RTP_SESSION_UNLOCK (self->priv->session);

  fs_rtp_sub_stream_has_stopped_exit (self);
  fs_rtp_session_has_disposed_exit (self->priv->session);

  return verdict;
}
/*
 * Blocking pad probe installed on the rtpbin pad by
 * fs_rtp_sub_stream_verify_codec_locked().
 *
 * While the pad is blocked, emits "get-codec-bin" to obtain the codec and,
 * optionally, a new codec bin. If the codec changed, it is stored and its
 * caps are set on the capsfilter; if a codec bin was returned, it replaces
 * the current one. Finally the caps are set on the rtpbin pad, falling back
 * to the _probe_check_caps probe if that fails.
 *
 * Errors are reported through the stream (or the session if the substream
 * is not attached to a stream).
 *
 * @user_data: the #FsRtpSubStream
 *
 * Returns: GST_PAD_PROBE_PASS for non-serialized downstream events (the
 * probe stays installed), GST_PAD_PROBE_REMOVE otherwise
 */
static GstPadProbeReturn
_rtpbin_pad_blocked_callback (GstPad *pad, GstPadProbeInfo *info,
    gpointer user_data)
{
  FsRtpSubStream *substream = user_data;
  GError *error = NULL;
  GstElement *codecbin = NULL;
  guint new_builder_hash = 0;
  FsCodec *codec = NULL;
  FsRtpSession *session;
  GstCaps *caps = NULL;

  /* The probe type is a set of flags (it also carries the block flag, among
   * others), so it has to be tested with a mask, not compared for equality.
   * Non-serialized events must not trigger the codec change. */
  if ((GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) &&
      !GST_EVENT_IS_SERIALIZED (GST_PAD_PROBE_INFO_EVENT (info)))
    return GST_PAD_PROBE_PASS;

  /* Every path below returns GST_PAD_PROBE_REMOVE, so forget the probe id */
  FS_RTP_SESSION_LOCK (substream->priv->session);
  substream->priv->blocking_id = 0;
  FS_RTP_SESSION_UNLOCK (substream->priv->session);

  if (fs_rtp_session_has_disposed_enter (substream->priv->session, NULL))
  {
    return GST_PAD_PROBE_REMOVE;
  }

  if (fs_rtp_sub_stream_has_stopped_enter (substream))
  {
    fs_rtp_session_has_disposed_exit (substream->priv->session);
    return GST_PAD_PROBE_REMOVE;
  }

  /* Keep both objects alive for the duration of the callback */
  g_object_ref (substream);
  session = g_object_ref (substream->priv->session);

  GST_DEBUG ("Substream blocked for codec change (session:%d SSRC:%x pt:%d)",
      substream->priv->session->id, substream->ssrc, substream->pt);

  /* The last argument is the location of the signal's return value */
  g_signal_emit (substream, signals[GET_CODEC_BIN], 0,
      substream->priv->stream, &codec,
      substream->priv->builder_hash, &new_builder_hash, &error, &codecbin);

  if (error)
    goto error;

  FS_RTP_SESSION_LOCK (substream->priv->session);
  if (codec &&
      (!substream->codec || !fs_codec_are_equal (codec, substream->codec)))
  {
    gchar *tmp;

    /* The substream takes ownership of the new codec */
    if (substream->codec)
      fs_codec_destroy (substream->codec);
    substream->codec = codec;

    caps = fs_codec_to_gst_caps (codec);
    tmp = gst_caps_to_string (caps);
    GST_DEBUG ("Setting caps %s on recv substream", tmp);
    g_free (tmp);
    g_object_set (substream->priv->capsfilter, "caps", caps, NULL);
  }
  else if (codec)
  {
    /* Same codec as before: the copy we were handed is not needed */
    fs_codec_destroy (codec);
  }
  FS_RTP_SESSION_UNLOCK (substream->priv->session);

  if (codecbin)
  {
    if (!fs_rtp_sub_stream_set_codecbin (substream, codecbin, new_builder_hash,
            &error))
      goto error;
  }

  if (caps)
  {
    if (!gst_pad_set_caps (substream->priv->rtpbin_pad, caps))
    {
      /* Retry on the next caps event; the probe holds its own reference on
       * the substream */
      if (substream->priv->check_caps_id == 0)
        substream->priv->check_caps_id =
            gst_pad_add_probe (substream->priv->rtpbin_pad,
                GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
                _probe_check_caps, g_object_ref (substream), g_object_unref);
    }
    gst_caps_unref (caps);
  }

 out:
  g_clear_error (&error);

  fs_rtp_sub_stream_has_stopped_exit (substream);
  fs_rtp_session_has_disposed_exit (substream->priv->session);

  g_object_unref (substream);
  g_object_unref (session);

  return GST_PAD_PROBE_REMOVE;

 error:
  g_prefix_error (&error, "Could not add the new recv codec bin for"
      " ssrc %u and payload type %d to the state NULL: ", substream->ssrc,
      substream->pt);

  if (substream->priv->stream)
    fs_stream_emit_error (FS_STREAM (substream->priv->stream),
        FS_ERROR_CONSTRUCTION, error->message);
  else
    fs_session_emit_error (FS_SESSION (substream->priv->session),
        FS_ERROR_CONSTRUCTION, error->message);

  /* The success path unrefs the caps itself; this is the only unref on the
   * error path */
  if (caps)
    gst_caps_unref (caps);

  goto out;
}
/**
 * fs_rtp_sub_stream_verify_codec_locked:
 * @substream: A #FsRtpSubStream
 *
 * Starts the process that re-validates the codec of this substream: unless
 * one is already pending, a blocking probe is installed on the rtpbin pad and
 * the actual work happens in its callback. Does nothing if the substream has
 * been stopped.
 *
 * You must hold the session lock to call it.
 */
void
fs_rtp_sub_stream_verify_codec_locked (FsRtpSubStream *substream)
{
  FsRtpSubStreamPrivate *priv = substream->priv;

  if (fs_rtp_sub_stream_has_stopped_enter (substream))
    return;

  GST_LOG ("Starting codec verification process for substream with"
      " SSRC:%x pt:%d", substream->ssrc, substream->pt);

  if (priv->blocking_id == 0)
  {
    /* The probe keeps its own reference on the substream, released when the
     * probe is destroyed */
    priv->blocking_id = gst_pad_add_probe (priv->rtpbin_pad,
        GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
        _rtpbin_pad_blocked_callback, g_object_ref (substream),
        g_object_unref);
  }

  fs_rtp_sub_stream_has_stopped_exit (substream);
}
/*
 * Emits the "error" signal on @substream with the given error number,
 * user-visible message and debug message.
 */
static void
fs_rtp_sub_stream_emit_error (FsRtpSubStream *substream,
    gint error_no,
    gchar *error_msg,
    gchar *debug_msg)
{
  const guint signal_id = signals[ERROR_SIGNAL];

  g_signal_emit (substream, signal_id, 0, error_no, error_msg, debug_msg);
}