Blob Blame History Raw
/*
 * Farstream - Farstream Raw Session
 *
 * Copyright 2008 Richard Spiers <richard.spiers@gmail.com>
 * Copyright 2007 Nokia Corp.
 * Copyright 2007-2010 Collabora Ltd.
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 *  @author: Youness Alaoui <youness.alaoui@collabora.co.uk>
 *  @author: Mike Ruprecht <mike.ruprecht@collabora.co.uk>
 *
 * fs-raw-session.c - A Farstream Raw Session gobject
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */

/**
 * SECTION:fs-raw-session
 * @short_description: A Raw session in a #FsRawConference
 *
 * The transmitter parameters to the fs_session_new_stream() function are
 * used to set the initial value of the construct properties of the stream
 * object.
 *
 * The codec preferences cannot be modified. Each codec should have its
 * encoding_name property set to the value returned by gst_caps_to_string().
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "fs-raw-session.h"

#include <string.h>

#include <gst/gst.h>
#include <farstream/fs-transmitter.h>

#include "fs-raw-stream.h"
#include "fs-raw-participant.h"

#define GST_CAT_DEFAULT fsrawconference_debug

/* Signals */
/* No signal ids are declared here; LAST_SIGNAL is the only enumerator. */
enum
{
  LAST_SIGNAL
};

/* props */
/*
 * Property ids used by fs_raw_session_class_init() when overriding the
 * properties, and dispatched on by fs_raw_session_get_property() and
 * fs_raw_session_set_property(). PROP_0 occupies value 0 so that the real
 * ids start at 1.
 */
enum
{
  PROP_0,
  PROP_MEDIA_TYPE,
  PROP_ID,
  PROP_SINK_PAD,
  PROP_CODEC_PREFERENCES,
  PROP_CODECS,
  PROP_CODECS_WITHOUT_CONFIG,
  PROP_CURRENT_SEND_CODEC,
  PROP_CONFERENCE,
  PROP_TOS
};



/*
 * Private instance data of an FsRawSession.
 *
 * Most pointer members are read and cleared while holding the conference's
 * GST_OBJECT_LOCK; the "conference" pointer itself is guarded by "mutex".
 */
struct _FsRawSessionPrivate
{
  /* Set from the "media-type" property; selects the transform bin that
   * _create_transform_bin() builds */
  FsMediaType media_type;

  /* Owned reference taken in set_property, handed over in dispose */
  FsRawConference *conference;
  /* The single stream of this session; not referenced by the session */
  FsRawStream *stream;

  /* Filled in by fs_raw_session_constructed() on failure and propagated by
   * fs_raw_session_new() */
  GError *construction_error;

  /* Ghost pad "sink_<id>" on the conference, targeting send_valve's sink */
  GstPad *media_sink_pad;
  /* Its "caps" are set from the remote codecs */
  GstElement *send_capsfilter;
  /* Remote codec list obtained from the stream's "remote-codecs" */
  GList *codecs;
  /* Copy of the codec exposed as "current-send-codec" */
  FsCodec *send_codec;
  /* TRUE once the transmitter's sink has been added to the conference */
  gboolean transmitter_sink_added;

  GstElement *send_tee;
  /* Request pad of send_tee that feeds the transform bin */
  GstPad *send_tee_pad;

  GstElement *transform_bin;
  /* Linked to a second src pad of send_tee */
  GstElement *fakesink;

  /* Created with "drop" set to TRUE; toggled by update_direction */
  GstElement *send_valve;

  /* NOTE(review): the receive-side members below are only torn down in the
   * code visible here; presumably they are created elsewhere in this file */
  GstElement *recv_capsfilter;
  GstElement *recv_valve;
  gulong transmitter_recv_probe_id;
  GstPad *transmitter_src_pad;
  GstPad *src_ghost_pad;

  FsTransmitter *transmitter;

  guint tos; /* Protected by conf lock */

  GMutex mutex; /* protects the conference */

#ifdef DEBUG_MUTEXES
  /* Lock depth checked by the DEBUG_MUTEXES lock/unlock macros */
  guint count;
#endif
};

G_DEFINE_TYPE (FsRawSession, fs_raw_session, FS_TYPE_SESSION);

#define FS_RAW_SESSION_GET_PRIVATE(o)                                   \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RAW_SESSION, FsRawSessionPrivate))

/*
 * Session lock helpers. The session mutex protects priv->conference.
 *
 * With DEBUG_MUTEXES defined the macros additionally assert that the lock is
 * never taken recursively and never released while not held.
 *
 * The multi-statement variants deliberately do NOT end in a semicolon: the
 * caller supplies it, which keeps "if (x) FS_RAW_SESSION_LOCK (s); else ..."
 * well-formed.
 */
#ifdef DEBUG_MUTEXES

#define FS_RAW_SESSION_LOCK(session)                            \
  do {                                                          \
    g_mutex_lock (&FS_RAW_SESSION (session)->priv->mutex);      \
    g_assert (FS_RAW_SESSION (session)->priv->count == 0);      \
    FS_RAW_SESSION (session)->priv->count++;                    \
  } while (0)
#define FS_RAW_SESSION_UNLOCK(session)                          \
  do {                                                          \
    g_assert (FS_RAW_SESSION (session)->priv->count == 1);      \
    FS_RAW_SESSION (session)->priv->count--;                    \
    g_mutex_unlock (&FS_RAW_SESSION (session)->priv->mutex);    \
  } while (0)
#define FS_RAW_SESSION_GET_LOCK(session)        \
  (FS_RAW_SESSION (session)->priv->mutex)
#else
#define FS_RAW_SESSION_LOCK(session)            \
  g_mutex_lock (&(session)->priv->mutex)
#define FS_RAW_SESSION_UNLOCK(session)          \
  g_mutex_unlock (&(session)->priv->mutex)
#define FS_RAW_SESSION_GET_LOCK(session)        \
  ((session)->priv->mutex)
#endif

/* GObject life-cycle and property handlers, installed in class_init */
static void fs_raw_session_dispose (GObject *object);
static void fs_raw_session_finalize (GObject *object);

static void fs_raw_session_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec);
static void fs_raw_session_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec);

static void fs_raw_session_constructed (GObject *object);

/* FsSession virtual method implementations, installed in class_init */
static FsStream *fs_raw_session_new_stream (FsSession *session,
    FsParticipant *participant,
    FsStreamDirection direction,
    GError **error);

static gchar **fs_raw_session_list_transmitters (FsSession *session);

static GType fs_raw_session_get_stream_transmitter_type (FsSession *session,
    const gchar *transmitter);


/* Builds the conversion element chosen by the session's media type */
static GstElement *_create_transform_bin (FsRawSession *self, GError **error);

/* Callback handed to fs_raw_stream_new() by fs_raw_session_new_stream() */
static FsStreamTransmitter *_stream_get_stream_transmitter (FsRawStream *stream,
    const gchar *transmitter_name,
    FsParticipant *participant,
    GParameter *parameters,
    guint n_parameters,
    GError **error,
    gpointer user_data);


/*
 * Class initialiser: reserves the private instance area, installs the GObject
 * and FsSession virtual methods and overrides the FsSession properties with
 * this class' property ids.
 */
static void
fs_raw_session_class_init (FsRawSessionClass *klass)
{
  GObjectClass *object_class = (GObjectClass *) klass;
  FsSessionClass *fs_session_class = FS_SESSION_CLASS (klass);

  g_type_class_add_private (klass, sizeof (FsRawSessionPrivate));

  /* GObject virtual methods */
  object_class->constructed = fs_raw_session_constructed;
  object_class->dispose = fs_raw_session_dispose;
  object_class->finalize = fs_raw_session_finalize;
  object_class->get_property = fs_raw_session_get_property;
  object_class->set_property = fs_raw_session_set_property;

  /* FsSession virtual methods */
  fs_session_class->new_stream = fs_raw_session_new_stream;
  fs_session_class->list_transmitters = fs_raw_session_list_transmitters;
  fs_session_class->get_stream_transmitter_type =
      fs_raw_session_get_stream_transmitter_type;

  /* Properties; the registration order is kept as it always was */
  g_object_class_override_property (object_class, PROP_MEDIA_TYPE,
      "media-type");
  g_object_class_override_property (object_class, PROP_ID, "id");
  g_object_class_override_property (object_class, PROP_SINK_PAD, "sink-pad");
  g_object_class_override_property (object_class, PROP_CODEC_PREFERENCES,
      "codec-preferences");
  g_object_class_override_property (object_class, PROP_CODECS, "codecs");
  g_object_class_override_property (object_class, PROP_CODECS_WITHOUT_CONFIG,
      "codecs-without-config");
  g_object_class_override_property (object_class, PROP_CURRENT_SEND_CODEC,
      "current-send-codec");
  g_object_class_override_property (object_class, PROP_TOS, "tos");
  g_object_class_override_property (object_class, PROP_CONFERENCE,
      "conference");
}

/*
 * Instance initialiser: binds the private area, creates the session mutex
 * and starts with no construction error. The media type starts at
 * FS_MEDIA_TYPE_LAST + 1 until the "media-type" property is set.
 */
static void
fs_raw_session_init (FsRawSession *self)
{
  FsRawSessionPrivate *priv = FS_RAW_SESSION_GET_PRIVATE (self);

  self->priv = priv;

  priv->construction_error = NULL;
  g_mutex_init (&priv->mutex);
  priv->media_type = FS_MEDIA_TYPE_LAST + 1;
}


/*
 * Returns a new reference to the session's conference, taken under the
 * session lock, or NULL (with @error set to FS_ERROR_DISPOSED) when the
 * conference pointer has already been cleared. The caller owns the returned
 * reference.
 */
static FsRawConference *
fs_raw_session_get_conference (FsRawSession *self, GError **error)
{
  FsRawConference *conf = NULL;

  FS_RAW_SESSION_LOCK (self);
  if (self->priv->conference != NULL)
    conf = g_object_ref (self->priv->conference);
  FS_RAW_SESSION_UNLOCK (self);

  if (conf != NULL)
    return conf;

  g_set_error (error, FS_ERROR, FS_ERROR_DISPOSED,
      "Called function after session has been disposed");
  return NULL;
}

static void
fs_raw_session_dispose (GObject *object)
{
  FsRawSession *self = FS_RAW_SESSION (object);
  GstBin *conferencebin = NULL;
  FsRawConference *conference = NULL;
  GstElement *send_valve = NULL;
  GstElement *send_capsfilter = NULL;
  GstElement *transform = NULL;
  GstElement *send_tee = NULL;
  GstElement *fakesink = NULL;
  GstPad *send_tee_pad = NULL;
  FsTransmitter *transmitter = NULL;
  GstPad *media_sink_pad = NULL;

  FS_RAW_SESSION_LOCK (self);
  conference = self->priv->conference;
  self->priv->conference = NULL;
  FS_RAW_SESSION_UNLOCK (self);

  if (!conference)
    goto out;

  conferencebin = GST_BIN (conference);

  if (!conferencebin)
    goto out;

  GST_OBJECT_LOCK (conference);
  send_valve = self->priv->send_valve;
  self->priv->send_valve = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (send_valve)
  {
    gst_element_set_locked_state (send_valve, TRUE);
    gst_bin_remove (conferencebin, send_valve);
    gst_element_set_state (send_valve, GST_STATE_NULL);
    gst_object_unref (send_valve);
  }

  GST_OBJECT_LOCK (conference);
  send_capsfilter = self->priv->send_capsfilter;
  self->priv->send_capsfilter = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (send_capsfilter)
  {
    gst_element_set_locked_state (send_capsfilter, TRUE);
    gst_bin_remove (conferencebin, send_capsfilter);
    gst_element_set_state (send_capsfilter, GST_STATE_NULL);
    gst_object_unref (send_capsfilter);
  }

  if (self->priv->stream)
  {
    FsStream *stream = FS_STREAM (self->priv->stream);
    fs_raw_session_remove_stream(self, stream);
    fs_stream_destroy (stream);
  }

  GST_OBJECT_LOCK (conference);
  transmitter = self->priv->transmitter;
  self->priv->transmitter = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (transmitter)
  {
    g_object_unref (transmitter);
  }

  GST_OBJECT_LOCK (conference);
  media_sink_pad = self->priv->media_sink_pad;
  self->priv->media_sink_pad = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (media_sink_pad)
  {
    gst_element_remove_pad (GST_ELEMENT (conference), media_sink_pad);
    gst_pad_set_active (media_sink_pad, FALSE);
    gst_object_unref (media_sink_pad);
  }

  GST_OBJECT_LOCK (conference);
  transform = self->priv->transform_bin;
  self->priv->transform_bin = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (transform)
  {
    gst_element_set_locked_state (transform, TRUE);
    gst_bin_remove (conferencebin, transform);
    gst_element_set_state (transform, GST_STATE_NULL);
    gst_object_unref (transform);
  }

  GST_OBJECT_LOCK (conference);
  fakesink = self->priv->fakesink;
  self->priv->fakesink = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (fakesink)
  {
    gst_element_set_locked_state (fakesink, TRUE);
    gst_bin_remove (conferencebin, fakesink);
    gst_element_set_state (fakesink, GST_STATE_NULL);
    gst_object_unref (fakesink);
  }

  GST_OBJECT_LOCK (conference);
  send_tee = self->priv->send_tee;
  self->priv->send_tee = NULL;

  send_tee_pad = self->priv->send_tee_pad;
  self->priv->send_tee_pad = NULL;
  GST_OBJECT_UNLOCK (conference);

  if (send_tee)
  {
    gst_element_set_locked_state (send_tee, TRUE);
    gst_bin_remove (conferencebin, send_tee);
    gst_element_set_state (send_tee, GST_STATE_NULL);
    gst_object_unref (send_tee);
  }

  if (send_tee_pad != NULL)
    gst_object_unref (send_tee_pad);


  gst_object_unref (conference);

out:

  G_OBJECT_CLASS (fs_raw_session_parent_class)->dispose (object);
}

/*
 * GObject finalize handler: frees the codec list and the send codec copy,
 * clears the session mutex and chains up.
 */
static void
fs_raw_session_finalize (GObject *object)
{
  FsRawSessionPrivate *priv = FS_RAW_SESSION (object)->priv;

  if (priv->codecs != NULL)
    fs_codec_list_destroy (priv->codecs);

  if (priv->send_codec != NULL)
    fs_codec_destroy (priv->send_codec);

  g_mutex_clear (&priv->mutex);

  G_OBJECT_CLASS (fs_raw_session_parent_class)->finalize (object);
}

/*
 * GObject get_property handler. Values are read while holding the
 * conference object lock; when the conference is no longer available the
 * call returns without touching @value.
 */
static void
fs_raw_session_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec)
{
  FsRawSession *self = FS_RAW_SESSION (object);
  FsRawSessionPrivate *priv = self->priv;
  FsRawConference *conf = fs_raw_session_get_conference (self, NULL);

  if (conf == NULL)
    return;

  GST_OBJECT_LOCK (conf);

  switch (prop_id)
  {
    case PROP_ID:
      g_value_set_uint (value, self->id);
      break;
    case PROP_MEDIA_TYPE:
      g_value_set_enum (value, priv->media_type);
      break;
    case PROP_TOS:
      g_value_set_uint (value, priv->tos);
      break;
    case PROP_CONFERENCE:
      g_value_set_object (value, priv->conference);
      break;
    case PROP_SINK_PAD:
      g_value_set_object (value, priv->media_sink_pad);
      break;
    case PROP_CODECS:
    case PROP_CODECS_WITHOUT_CONFIG:
      /* Both properties expose the same list */
      g_value_set_boxed (value, priv->codecs);
      break;
    case PROP_CURRENT_SEND_CODEC:
      g_value_set_boxed (value, priv->send_codec);
      break;
    case PROP_CODEC_PREFERENCES:
      /* There are no preferences, so the value is left untouched */
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_OBJECT_UNLOCK (conf);
  gst_object_unref (conf);
}

/*
 * GObject set_property handler.
 *
 * When the conference is not available, only construct-only properties are
 * accepted (and are then written without taking the conference lock); every
 * other write is silently dropped. Otherwise the write happens under the
 * conference object lock.
 */
static void
fs_raw_session_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec)
{
  FsRawSession *self = FS_RAW_SESSION (object);
  FsRawSessionPrivate *priv = self->priv;
  FsRawConference *conf = fs_raw_session_get_conference (self, NULL);

  if (conf == NULL)
  {
    if ((pspec->flags & G_PARAM_CONSTRUCT_ONLY) == 0)
      return;
  }
  else
  {
    GST_OBJECT_LOCK (conf);
  }

  switch (prop_id)
  {
    case PROP_ID:
      self->id = g_value_get_uint (value);
      break;
    case PROP_MEDIA_TYPE:
      priv->media_type = g_value_get_enum (value);
      break;
    case PROP_CONFERENCE:
      /* The session keeps its own reference on the conference */
      priv->conference = FS_RAW_CONFERENCE (g_value_dup_object (value));
      break;
    case PROP_TOS:
      priv->tos = g_value_get_uint (value);
      /* Forward the new value to the transmitter, if there is one */
      if (priv->transmitter)
        g_object_set (priv->transmitter, "tos", priv->tos, NULL);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  if (conf != NULL)
  {
    GST_OBJECT_UNLOCK (conf);
    gst_object_unref (conf);
  }
}

/*
 * GObject "constructed" handler: builds the sending half of the session
 * inside the conference bin.
 *
 * Elements are linked as follows:
 *
 *   sink_<id> (ghost pad) -> send_valve -> send_tee -+-> transform_bin
 *                                                    |     -> send_capsfilter
 *                                                    +-> fakesink
 *
 * The valve starts with "drop" set to TRUE.
 *
 * On failure a GError is stored in priv->construction_error and the function
 * returns early; fs_raw_session_new() propagates that error. Elements that
 * were successfully added to the conference stay referenced from priv and
 * are removed by fs_raw_session_dispose().
 */
static void
fs_raw_session_constructed (GObject *object)
{
  FsRawSession *self = FS_RAW_SESSION (object);
  FsRawSessionPrivate *priv = self->priv;
  GstBin *bin;
  GstPad *pad;
  gchar *tmp;

  if (self->id == 0)
  {
    g_error ("You can not instantiate this element directly, you MUST"
        " call fs_raw_session_new ()");
    return;
  }

  g_assert (priv->conference);
  bin = GST_BIN (priv->conference);

  /* --- send capsfilter --- */
  tmp = g_strdup_printf ("send_capsfilter_%u", self->id);
  priv->send_capsfilter = gst_element_factory_make ("capsfilter", tmp);
  g_free (tmp);

  if (!priv->send_capsfilter)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make send capsfilter");
    return;
  }

  /* Keep our own reference in addition to the one the bin will hold */
  gst_object_ref_sink (priv->send_capsfilter);

  if (!gst_bin_add (bin, priv->send_capsfilter))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add capsfilter to conference");
    gst_object_unref (priv->send_capsfilter);
    priv->send_capsfilter = NULL;
    return;
  }

  if (!gst_element_sync_state_with_parent (priv->send_capsfilter))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not sync the send capsfilter's state with its parent");
    return;
  }

  /* --- transform bin --- */
  priv->transform_bin = _create_transform_bin (self, NULL);

  if (priv->transform_bin == NULL)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not create transform bin");
    return;
  }

  gst_object_ref_sink (priv->transform_bin);
  if (!gst_bin_add (bin, priv->transform_bin))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add transform bin to conference");
    gst_object_unref (priv->transform_bin);
    priv->transform_bin = NULL;
    return;
  }

  if (!gst_element_link_pads (priv->transform_bin, "src",
          priv->send_capsfilter, "sink"))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not link send transformbin and capsfilter");
    return;
  }

  if (!gst_element_sync_state_with_parent (priv->transform_bin))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not sync the send transformbin's state with its parent");
    return;
  }

  /* --- send tee --- */
  tmp = g_strdup_printf ("send_tee_%u", self->id);
  priv->send_tee = gst_element_factory_make ("tee", tmp);
  g_free (tmp);

  if (!priv->send_tee)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not make send tee");
    return;
  }

  gst_object_ref_sink (priv->send_tee);

  if (!gst_bin_add (bin, priv->send_tee))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add send tee to conference");
    gst_object_unref (priv->send_tee);
    priv->send_tee = NULL;
    return;
  }

  /* This request pad is remembered because the transform bin is replaced
   * (and re-linked to it) whenever the remote codecs change */
  priv->send_tee_pad = gst_element_get_request_pad (priv->send_tee,
    "src_%u");

  if (priv->send_tee_pad == NULL)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not create send tee pad");
    return;
  }

  pad = gst_element_get_static_pad (priv->transform_bin, "sink");
  if (pad == NULL)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not get transformbin sink pad");
    return;
  }

  if (GST_PAD_LINK_FAILED (gst_pad_link (priv->send_tee_pad, pad)))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not link send tee and transformbin");
    gst_object_unref (pad);
    return;
  }

  gst_object_unref (pad);

  /* --- fakesink on a second tee branch --- */
  priv->fakesink = gst_element_factory_make ("fakesink", NULL);
  if (!priv->fakesink)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not make fakesink");
    return;
  }

  g_object_set (priv->fakesink,
      "sync", FALSE,
      "async", FALSE,
      NULL);

  gst_object_ref_sink (priv->fakesink);
  if (!gst_bin_add (bin, priv->fakesink))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add fakesink to conference");
    gst_object_unref (priv->fakesink);
    priv->fakesink = NULL;
    return;
  }

  if (!gst_element_link_pads (priv->send_tee, "src_%u",
          priv->fakesink, "sink"))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not link send tee and fakesink");
    return;
  }

  if (!gst_element_sync_state_with_parent (priv->fakesink))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not sync the fakesinks state with its parent");
    return;
  }

  if (!gst_element_sync_state_with_parent (priv->send_tee))
  {
    /* The tee is left in the bin, like every other element on a failure
     * path: priv->send_tee is still set, so fs_raw_session_dispose() removes
     * it exactly once. */
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not sync the send tee's state with its parent");
    return;
  }

  /* --- send valve --- */
  tmp = g_strdup_printf ("send_valve_%u", self->id);
  priv->send_valve = gst_element_factory_make ("valve", tmp);
  g_free (tmp);

  if (!priv->send_valve)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not make send valve");
    return;
  }

  gst_object_ref_sink (priv->send_valve);

  if (!gst_bin_add (bin, priv->send_valve))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add valve to conference");
    gst_object_unref (priv->send_valve);
    priv->send_valve = NULL;
    return;
  }

  /* Nothing is sent until fs_raw_session_update_direction() opens the valve */
  g_object_set (G_OBJECT (priv->send_valve), "drop", TRUE, NULL);

  if (!gst_element_sync_state_with_parent (priv->send_valve))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not sync the send valve's state with its parent");
    return;
  }

  if (!gst_element_link_pads (priv->send_valve, "src",
          priv->send_tee, "sink"))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not link send valve and send tee");
    return;
  }

  /* --- ghost sink pad on the conference --- */
  pad = gst_element_get_static_pad (priv->send_valve, "sink");
  tmp = g_strdup_printf ("sink_%u", self->id);
  priv->media_sink_pad = gst_ghost_pad_new (tmp, pad);
  g_free (tmp);
  gst_object_unref (pad);

  if (!priv->media_sink_pad)
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not create sink ghost pad");
    return;
  }

  gst_object_ref_sink (priv->media_sink_pad);

  gst_pad_set_active (priv->media_sink_pad, TRUE);
  if (!gst_element_add_pad (GST_ELEMENT (priv->conference),
          priv->media_sink_pad))
  {
    priv->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION, "Could not add sink pad to conference");
    gst_object_unref (priv->media_sink_pad);
    priv->media_sink_pad = NULL;
    return;
  }

  if (G_OBJECT_CLASS (fs_raw_session_parent_class)->constructed)
    G_OBJECT_CLASS (fs_raw_session_parent_class)->constructed (object);
}

/*
 * Creates the element that converts the application's media before it
 * reaches the send capsfilter:
 *
 *  - audio:       "audioconvert ! audioresample ! audioconvert"
 *  - video:       "videoconvert ! videoscale"
 *  - application: a pass-through "identity" element
 *
 * Returns: a new (floating) element, or NULL with @error set when the media
 * type is not supported or the element could not be created.
 */
static GstElement *
_create_transform_bin (FsRawSession *self, GError **error)
{
  FsMediaType mtype = self->priv->media_type;

  if (mtype == FS_MEDIA_TYPE_AUDIO)
    return gst_parse_bin_from_description_full (
      "audioconvert ! audioresample ! audioconvert", TRUE, NULL,
      GST_PARSE_FLAG_NONE, error);
  else if (mtype == FS_MEDIA_TYPE_VIDEO)
    return gst_parse_bin_from_description_full ("videoconvert ! videoscale",
        TRUE, NULL, GST_PARSE_FLAG_NONE, error);
  else if (mtype == FS_MEDIA_TYPE_APPLICATION)
  {
    /* The pass-through element is called "identity" */
    GstElement *identity = gst_element_factory_make ("identity", NULL);

    /* gst_element_factory_make() has no GError, so report failure here */
    if (identity == NULL)
      g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
          "Could not create identity element");

    return identity;
  }

  g_set_error (error, FS_ERROR, FS_ERROR_NOT_IMPLEMENTED,
    "No transform bin for this media type");
  return NULL;
}

/*
 * "notify::remote-codecs" handler of the session's stream.
 *
 * Replaces the transform bin with a fresh one, applies the caps of the
 * chosen codec to the send capsfilter, stores the new codec list and send
 * codec, updates the receive capsfilter (if any), re-applies the stream
 * direction and emits the relevant notifications.
 *
 * Codec selection: when exactly two remote codecs are given, the second one
 * is used for sending; otherwise the first one is. The first codec is always
 * the one applied to the receive capsfilter.
 *
 * Every failure is reported through fs_session_emit_error(); all temporary
 * references (conference, codec list, error, transform, pad) are released on
 * every path.
 */
static void
_stream_remote_codecs_changed (FsRawStream *stream, GParamSpec *pspec,
    FsRawSession *self)
{
  GList *codecs = NULL;
  GstCaps *caps;
  FsCodec *codec = NULL;
  GstPad *transform_pad;
  GError *error = NULL;
  GstElement *transform = NULL;
  gboolean transform_added = FALSE;
  FsRawConference *conference = fs_raw_session_get_conference (self, &error);
  gboolean changed;
  gboolean linked;
  FsStreamDirection direction;

  if (conference == NULL)
    goto error;

  g_object_get (stream, "remote-codecs", &codecs, "direction", &direction,
      NULL);

  if (!codecs)
  {
    /* Nothing to do, but the conference reference must still be dropped */
    gst_object_unref (conference);
    return;
  }

  if (g_list_length (codecs) == 2)
    codec = codecs->next->data;
  else
    codec = codecs->data;

  GST_OBJECT_LOCK (conference);
  transform = self->priv->transform_bin;
  self->priv->transform_bin = NULL;
  GST_OBJECT_UNLOCK (conference);

  /* Replace the transform bin */
  if (transform != NULL)
  {
    gst_element_set_locked_state (transform, TRUE);
    gst_element_set_state (transform, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (conference), transform);
    g_object_unref (transform);
  }

  transform = _create_transform_bin (self, &error);

  if (transform == NULL)
    goto error;
  gst_object_ref_sink (transform);

  if (!gst_bin_add (GST_BIN (conference), transform))
    goto error;
  transform_added = TRUE;

  caps = fs_raw_codec_to_gst_caps (codec);
  if (self->priv->send_capsfilter)
    g_object_set (self->priv->send_capsfilter, "caps", caps, NULL);
  gst_caps_unref (caps);

  if (!gst_element_link_pads (transform, "src",
          self->priv->send_capsfilter, "sink"))
    goto error;

  if (!gst_element_sync_state_with_parent (transform))
    goto error;

  transform_pad = gst_element_get_static_pad (transform, "sink");
  if (transform_pad == NULL)
    goto error;

  /* gst_element_get_static_pad() returned a reference; release it whether
   * or not the link succeeds */
  linked = !GST_PAD_LINK_FAILED (gst_pad_link (self->priv->send_tee_pad,
      transform_pad));
  gst_object_unref (transform_pad);

  if (!linked)
    goto error;

  GST_OBJECT_LOCK (conference);
  self->priv->transform_bin = transform;
  transform = NULL;

  /* From here on the codec list is owned by the session */
  if (self->priv->codecs)
    fs_codec_list_destroy (self->priv->codecs);
  self->priv->codecs = codecs;

  if ((changed = !fs_codec_are_equal (self->priv->send_codec, codec)))
  {
    if (self->priv->send_codec)
      fs_codec_destroy (self->priv->send_codec);

    self->priv->send_codec = fs_codec_copy (codec);
  }

  /* The first codec is the one used on the receive side.
   * NOTE(review): this also becomes the "codec" posted in the
   * farstream-send-codec-changed message below, which differs from
   * priv->send_codec when two codecs were given - confirm this is intended. */
  codec = codecs->data;

  if (self->priv->recv_capsfilter)
  {
    GstElement *capsfilter = gst_object_ref (self->priv->recv_capsfilter);
    GstCaps *recv_caps;

    recv_caps = fs_raw_codec_to_gst_caps (codec);
    /* Setting the property is done without holding the object lock */
    GST_OBJECT_UNLOCK (conference);
    g_object_set (capsfilter, "caps", recv_caps, NULL);
    gst_object_unref (capsfilter);
    GST_OBJECT_LOCK (conference);
    gst_caps_unref (recv_caps);
  }

  GST_OBJECT_UNLOCK (conference);

  fs_raw_session_update_direction (self, direction);

  if (changed)
  {
    g_object_notify (G_OBJECT (self), "current-send-codec");
    /* Use the reference taken above rather than priv->conference, which
     * may be cleared by a concurrent dispose */
    gst_element_post_message (GST_ELEMENT (conference),
        gst_message_new_element (GST_OBJECT (conference),
            gst_structure_new ("farstream-send-codec-changed",
                "session", FS_TYPE_SESSION, self,
                "codec", FS_TYPE_CODEC, codec,
                "secondary-codecs", FS_TYPE_CODEC_LIST, NULL,
                NULL)));
  }

  g_object_notify (G_OBJECT (self), "codecs");

  gst_object_unref (conference);
  return;

error:
  if (error != NULL)
    fs_session_emit_error (FS_SESSION (self), error->code, error->message);
  else
    fs_session_emit_error (FS_SESSION (self), FS_ERROR_INTERNAL,
        "Unable to change transform bin");

  g_clear_error (&error);

  /* The list was never handed over to the session on this path */
  if (codecs != NULL)
    fs_codec_list_destroy (codecs);

  if (transform != NULL)
  {
    /* Do not leave a half-installed transform behind in the conference */
    if (transform_added)
    {
      gst_element_set_locked_state (transform, TRUE);
      gst_element_set_state (transform, GST_STATE_NULL);
      gst_bin_remove (GST_BIN (conference), transform);
    }
    gst_object_unref (transform);
  }

  if (conference != NULL)
    gst_object_unref (conference);
}

/*
 * Detaches @stream from the session and tears down the transmitter and the
 * receive-side elements that belong to it.
 *
 * The send valve is closed first. The session's stream pointer is cleared
 * only when it matches @stream; the transmitter, however, is taken over and
 * destroyed in any case. Does nothing when the session has already been
 * disposed.
 */
void
fs_raw_session_remove_stream (FsRawSession *self,
    FsStream *stream)
{
  FsRawConference *conference = fs_raw_session_get_conference (self, NULL);
  FsTransmitter *transmitter = NULL;
  GstElement *src = NULL;
  GstElement *sink = NULL;

  if (!conference)
    return;

  /* The valve may be missing when construction failed part-way */
  if (self->priv->send_valve)
    g_object_set (G_OBJECT (self->priv->send_valve), "drop", TRUE, NULL);

  GST_OBJECT_LOCK (conference);
  if (self->priv->stream == (FsRawStream *) stream)
  {
    self->priv->stream = NULL;
  }
  transmitter = self->priv->transmitter;
  self->priv->transmitter = NULL;
  /* The transmitter's sink is removed from the conference below, so a
   * later transmitter must have its own sink added again */
  if (transmitter)
    self->priv->transmitter_sink_added = FALSE;
  GST_OBJECT_UNLOCK (conference);

  if (!transmitter)
    goto out;

  g_object_get (transmitter,
      "gst-src", &src,
      "gst-sink", &sink,
      NULL);

  if (self->priv->transmitter_recv_probe_id)
  {
    if (self->priv->transmitter_src_pad)
      gst_pad_remove_probe (self->priv->transmitter_src_pad,
          self->priv->transmitter_recv_probe_id);
    self->priv->transmitter_recv_probe_id = 0;
  }

  if (src)
  {
    gst_element_set_locked_state (src, TRUE);
    gst_element_set_state (src, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (conference), src);
  }

  /* The sink is only in the conference once sending has been enabled */
  if (sink &&
      gst_object_has_ancestor (GST_OBJECT (sink), GST_OBJECT (conference)))
  {
    gst_element_set_locked_state (sink, TRUE);
    gst_element_set_state (sink, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (conference), sink);
  }

  if (self->priv->transmitter_src_pad)
  {
    gst_object_unref (self->priv->transmitter_src_pad);
    self->priv->transmitter_src_pad = NULL;
  }

  if (self->priv->recv_valve)
  {
    gst_element_set_locked_state (self->priv->recv_valve, TRUE);
    gst_bin_remove (GST_BIN (conference), self->priv->recv_valve);
    gst_element_set_state (self->priv->recv_valve, GST_STATE_NULL);
    gst_object_unref (self->priv->recv_valve);
    self->priv->recv_valve = NULL;
  }

  if (self->priv->recv_capsfilter)
  {
    gst_element_set_locked_state (self->priv->recv_capsfilter, TRUE);
    gst_bin_remove (GST_BIN (conference), self->priv->recv_capsfilter);
    gst_element_set_state (self->priv->recv_capsfilter, GST_STATE_NULL);
    gst_object_unref (self->priv->recv_capsfilter);
    self->priv->recv_capsfilter = NULL;
  }

  if (self->priv->src_ghost_pad)
  {
    gst_element_remove_pad (GST_ELEMENT (conference),
        self->priv->src_ghost_pad);
    gst_pad_set_active (self->priv->src_ghost_pad, FALSE);
    gst_object_unref (self->priv->src_ghost_pad);
    self->priv->src_ghost_pad = NULL;
  }

  if (src)
    gst_object_unref (src);
  if (sink)
    gst_object_unref (sink);
  g_object_unref (transmitter);

out:
  /* Always drop the reference taken by fs_raw_session_get_conference() */
  gst_object_unref (conference);
}

/*
 * Adds @transmitter_sink to the conference bin, syncs its state with the
 * bin and links the send capsfilter to it.
 *
 * If a step after the add fails, the sink is removed from the bin again.
 *
 * Returns: TRUE on success, FALSE with @error set otherwise.
 */
static gboolean
_add_transmitter_sink (FsRawSession *self,
    GstElement *transmitter_sink,
    GError **error)
{
  GstBin *bin = GST_BIN (self->priv->conference);

  if (!gst_bin_add (bin, transmitter_sink))
  {
    /* self->id is a guint, hence %u */
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add the transmitter's sink element"
        " for session %u to the conference bin", self->id);
    return FALSE;
  }

  if (!gst_element_sync_state_with_parent (transmitter_sink))
  {
    gst_bin_remove (bin, transmitter_sink);
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not sync the transmitter's sink element"
        " with its parent for session %u", self->id);
    return FALSE;
  }

  if (!gst_element_link (self->priv->send_capsfilter, transmitter_sink))
  {
    gst_bin_remove (bin, transmitter_sink);
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the capsfilter and transmitter's"
        " sink element for session %u", self->id);
    return FALSE;
  }

  return TRUE;
}

/*
 * Applies @direction to the session's elements.
 *
 * Nothing happens until the session has codecs. When sending is requested
 * and the transmitter's sink has not been added yet, it is added to the
 * conference first. The receive valve (if any) drops unless
 * FS_DIRECTION_RECV is set; the send valve drops unless FS_DIRECTION_SEND is
 * set. Failures are reported through fs_session_emit_error().
 *
 * The conference object lock is released around every call that sets a
 * property or modifies the bin.
 */
void
fs_raw_session_update_direction (FsRawSession *self,
  FsStreamDirection direction)
{
  FsRawSessionPrivate *priv = self->priv;
  GError *err = NULL;
  FsRawConference *conf = fs_raw_session_get_conference (self, &err);

  if (conf == NULL)
  {
    fs_session_emit_error (FS_SESSION (self), err->code, err->message);
    g_clear_error (&err);
    return;
  }

  GST_OBJECT_LOCK (conf);

  /* Don't start sending before we have codecs */
  if (priv->codecs == NULL)
  {
    GST_OBJECT_UNLOCK (conf);
    goto out;
  }

  if (priv->transmitter != NULL &&
      !priv->transmitter_sink_added &&
      (direction & FS_DIRECTION_SEND))
  {
    GstElement *sink_element;
    gboolean sink_ok;

    GST_OBJECT_UNLOCK (conf);

    g_object_get (priv->transmitter, "gst-sink", &sink_element, NULL);
    if (sink_element == NULL)
    {
      fs_session_emit_error (FS_SESSION (self), FS_ERROR_CONSTRUCTION,
          "Unable to get the sink element from the FsTransmitter");
      goto out;
    }

    sink_ok = _add_transmitter_sink (self, sink_element, &err);
    gst_object_unref (sink_element);

    if (!sink_ok)
    {
      fs_session_emit_error (FS_SESSION (self), err->code, err->message);
      g_clear_error (&err);
      goto out;
    }

    GST_OBJECT_LOCK (conf);
    priv->transmitter_sink_added = TRUE;
  }

  if (priv->recv_valve != NULL)
  {
    /* Hold a reference so the valve survives while the lock is released */
    GstElement *recv = g_object_ref (priv->recv_valve);

    GST_OBJECT_UNLOCK (conf);
    g_object_set (recv,
        "drop", ! (direction & FS_DIRECTION_RECV), NULL);
    g_object_unref (recv);
    GST_OBJECT_LOCK (conf);
  }

  GST_OBJECT_UNLOCK (conf);

  g_object_set (priv->send_valve,
      "drop", ! (direction & FS_DIRECTION_SEND), NULL);

out:
  gst_object_unref (conf);
}

/**
 * fs_raw_session_new_stream:
 * @session: an #FsRawSession
 * @participant: #FsParticipant of a participant for the new stream
 * @direction: #FsStreamDirection describing the direction of the new stream
 * that will be created for this participant
 * @error: location of a #GError, or NULL if no error occurred
 *
 * This function creates a stream for the given participant into the active
 * session. A raw session holds at most one stream at a time.
 *
 * Returns: the new #FsStream that has been created. User must unref the
 * #FsStream when the stream is ended. If an error occurred, returns NULL.
 */
static FsStream *
fs_raw_session_new_stream (FsSession *session,
    FsParticipant *participant,
    FsStreamDirection direction,
    GError **error)
{
  FsRawSession *self = FS_RAW_SESSION (session);
  FsStream *new_stream = NULL;
  FsRawConference *conference;

  if (!FS_IS_RAW_PARTICIPANT (participant))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "You have to provide a participant of type RAW");
    return NULL;
  }

  conference = fs_raw_session_get_conference (self, error);
  if (!conference)
    return NULL;

  /* Cheap early check before going through stream creation */
  GST_OBJECT_LOCK (conference);
  if (self->priv->stream)
    goto already_have_stream;
  GST_OBJECT_UNLOCK (conference);

  new_stream = FS_STREAM_CAST (fs_raw_stream_new (self,
          FS_RAW_PARTICIPANT (participant),
          direction, conference,
          _stream_get_stream_transmitter, self));

  if (new_stream == NULL)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not create the stream");
    goto done;
  }

  /* The lock was released while creating the stream, so another thread may
   * have registered one in the meantime: check again */
  GST_OBJECT_LOCK (conference);
  if (self->priv->stream)
    goto already_have_stream;


  self->priv->stream = (FsRawStream *) new_stream;

  GST_OBJECT_UNLOCK (conference);

  g_signal_connect_object (new_stream, "notify::remote-codecs",
      G_CALLBACK (_stream_remote_codecs_changed), self, 0);


done:
  gst_object_unref (conference);
  return new_stream;

already_have_stream:
  GST_OBJECT_UNLOCK (conference);
  /* A stream created after losing the race must not be returned together
   * with an error; release it (outside of the object lock) */
  if (new_stream != NULL)
  {
    g_object_unref (new_stream);
    new_stream = NULL;
  }
  g_set_error (error, FS_ERROR, FS_ERROR_ALREADY_EXISTS,
      "There already is a stream in this session");
  goto done;
}

/*
 * fs_raw_session_new:
 * @media_type: value for the "media-type" construct property
 * @conference: value for the "conference" construct property
 * @id: value for the "id" construct property
 * @error: location to store a #GError, or NULL
 *
 * Instantiates an #FsRawSession. If construction recorded an error in
 * priv->construction_error, that error is handed to the caller and the
 * half-built session is released.
 *
 * Returns: the new session, or NULL (with @error set) on failure.
 */
FsRawSession *
fs_raw_session_new (FsMediaType media_type,
    FsRawConference *conference,
    guint id,
    GError **error)
{
  FsRawSession *session = g_object_new (FS_TYPE_RAW_SESSION,
      "media-type", media_type,
      "conference", conference,
      "id", id,
      NULL);

  if (!session)
  {
    /* g_set_error() tolerates a NULL @error, unlike a raw "*error = ..." */
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not create object");
    return NULL;
  }

  if (session->priv->construction_error)
  {
    /* g_propagate_error() takes ownership of the source error (it is either
     * moved into @error or freed), so the session must forget its pointer
     * before it is unreffed */
    g_propagate_error (error, session->priv->construction_error);
    session->priv->construction_error = NULL;
    g_object_unref (session);
    return NULL;
  }

  return session;
}

/*
 * Returns whatever fs_transmitter_list_available() reports; the @session
 * argument is not consulted.
 */
static gchar **
fs_raw_session_list_transmitters (FsSession *session)
{
  gchar **available = fs_transmitter_list_available ();

  return available;
}

/*
 * Looks up the GType of the stream transmitters produced by the transmitter
 * named @transmitter. A temporary FsTransmitter is created for the query and
 * released before returning. The @session argument is not consulted.
 *
 * Returns G_TYPE_NONE when the transmitter cannot be created.
 */
static GType
fs_raw_session_get_stream_transmitter_type (FsSession *session,
    const gchar *transmitter)
{
  GType result = G_TYPE_NONE;
  FsTransmitter *probe = fs_transmitter_new (transmitter, 1, 0, NULL);

  if (probe != NULL)
  {
    result = fs_transmitter_get_stream_transmitter_type (probe);
    g_object_unref (probe);
  }

  return result;
}

/*
 * Pad probe callback (installed on the transmitter source pad by
 * _stream_get_stream_transmitter()).
 *
 * While the session has no codecs, no recv capsfilter, or the probe id has
 * already been cleared, the probe answers GST_PAD_PROBE_DROP. Otherwise it
 * clears transmitter_recv_probe_id, wraps the capsfilter's "src" pad in a
 * ghost pad named "src_<id>", adds it to the conference, records it in
 * priv->src_ghost_pad and announces it through
 * fs_stream_emit_src_pad_added() with a copy of the first codec.
 *
 * Returns GST_PAD_PROBE_REMOVE when the conference is gone and after every
 * attempt to expose the pad, whether it succeeded or not.
 */
static GstPadProbeReturn
_transmitter_pad_have_data_callback (GstPad *pad, GstPadProbeInfo *info,
    gpointer user_data)
{
  FsRawSession *self = FS_RAW_SESSION (user_data);
  FsRawConference *conference = fs_raw_session_get_conference (self, NULL);
  GstElement *filter = NULL;
  GstPadTemplate *src_template;
  FsRawStream *stream;
  FsCodec *first_codec;
  GstPad *filter_src;
  GstPad *ghost;
  gchar *ghost_name;
  gboolean ready;

  if (conference == NULL)
    return GST_PAD_PROBE_REMOVE;

  GST_OBJECT_LOCK (conference);
  ready = (self->priv->codecs &&
      self->priv->recv_capsfilter &&
      self->priv->transmitter_recv_probe_id);
  if (!ready)
  {
    GST_OBJECT_UNLOCK (conference);
    gst_object_unref (conference);
    return GST_PAD_PROBE_DROP;
  }

  /* Snapshot what we need while the lock is held */
  filter = gst_object_ref (self->priv->recv_capsfilter);
  self->priv->transmitter_recv_probe_id = 0;
  first_codec = fs_codec_copy (self->priv->codecs->data);
  GST_OBJECT_UNLOCK (conference);

  filter_src = gst_element_get_static_pad (filter, "src");
  if (filter_src == NULL)
  {
    GST_WARNING ("Unable to get recv_capsfilter (%p) srcpad", filter);
    goto cleanup;
  }

  src_template = gst_element_class_get_pad_template (
      GST_ELEMENT_GET_CLASS (self->priv->conference), "src_%d");
  ghost_name = g_strdup_printf ("src_%d", self->id);
  ghost = gst_ghost_pad_new_from_template (ghost_name, filter_src,
      src_template);
  gst_object_unref (filter_src);
  g_free (ghost_name);

  /* Extra reference that ends up stored in priv->src_ghost_pad */
  gst_object_ref (ghost);

  if (!gst_pad_set_active (ghost, TRUE))
    GST_WARNING ("Unable to set ghost pad active");

  if (!gst_element_add_pad (GST_ELEMENT (self->priv->conference), ghost))
  {
    GST_WARNING ("Unable to add ghost pad to conference");

    gst_object_unref (ghost);
    gst_object_unref (ghost);
    goto cleanup;
  }

  GST_OBJECT_LOCK (conference);
  self->priv->src_ghost_pad = ghost;
  stream = g_object_ref (self->priv->stream);
  GST_OBJECT_UNLOCK (conference);

  /* Emit without holding the conference lock */
  fs_stream_emit_src_pad_added (FS_STREAM (stream), ghost, first_codec);
  g_object_unref (stream);

cleanup:
  fs_codec_destroy (first_codec);
  gst_object_unref (conference);
  gst_object_unref (filter);

  return GST_PAD_PROBE_REMOVE;
}




/*
 * _stream_get_stream_transmitter:
 * @stream: the requesting stream (not used by this function)
 * @transmitter_name: name passed to fs_transmitter_new()
 * @participant: participant passed to the new stream transmitter
 * @parameters: construction parameters for the stream transmitter
 * @n_parameters: number of entries in @parameters
 * @error: location to store a #GError, or NULL
 * @user_data: the #FsRawSession
 *
 * Creates the FsTransmitter and its stream transmitter, adds the
 * transmitter's "gst-src" element to the conference, builds the receive
 * chain (transmitter src_1 -> recv valve -> recv capsfilter), installs a
 * blocking probe on the transmitter source pad and syncs the transmitter
 * source with the conference state.
 *
 * Returns: the new #FsStreamTransmitter, or NULL with @error set.
 */
static FsStreamTransmitter *_stream_get_stream_transmitter (FsRawStream *stream,
    const gchar *transmitter_name,
    FsParticipant *participant,
    GParameter *parameters,
    guint n_parameters,
    GError **error,
    gpointer user_data)
{
  FsRawSession *self = user_data;
  FsTransmitter *fstransmitter = NULL;
  FsTransmitter *stored_transmitter = NULL;
  FsStreamTransmitter *stream_transmitter = NULL;
  GstElement *transmitter_src = NULL;
  FsRawConference *conference;
  gchar *tmp;
  GstElement *capsfilter;
  GstElement *valve;
  GstPad *transmitter_src_pad;

  conference = fs_raw_session_get_conference (self, error);
  if (!conference)
    return NULL;

  fstransmitter = fs_transmitter_new (transmitter_name, 1, 0, error);

  if (!fstransmitter)
    goto error;

  g_object_set (fstransmitter,
      "tos", self->priv->tos,
      "do-timestamp", FALSE,
      NULL);

  stream_transmitter = fs_transmitter_new_stream_transmitter (fstransmitter,
      participant, n_parameters, parameters, error);

  if (!stream_transmitter)
    goto error;

  /* g_object_get() hands us our own reference on the source element; it is
   * released on every exit path below */
  g_object_get (fstransmitter, "gst-src", &transmitter_src, NULL);
  g_assert (transmitter_src);

  if (!gst_bin_add (GST_BIN (conference), transmitter_src))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add the transmitter's source element"
        " for session %d to the conference bin", self->id);
    gst_object_unref (transmitter_src);
    /* Not in the bin: keep the error path from trying to remove it */
    transmitter_src = NULL;
    goto error;
  }

  tmp = g_strdup_printf ("recv_capsfilter_%d", self->id);
  capsfilter = gst_element_factory_make ("capsfilter", tmp);
  g_free (tmp);

  if (!capsfilter)
  {
    /* Nothing to release here: capsfilter is NULL */
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not create a capsfilter element for session %d", self->id);
    goto error;
  }

  /* Extra reference kept in self->priv->recv_capsfilter */
  gst_object_ref (capsfilter);

  if (!gst_bin_add (GST_BIN (conference), capsfilter))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add the capsfilter element for session %d", self->id);
    gst_object_unref (capsfilter);
    gst_object_unref (capsfilter);
    goto error;
  }
  self->priv->recv_capsfilter = capsfilter;

  if (gst_element_set_state (self->priv->recv_capsfilter, GST_STATE_PLAYING) ==
    GST_STATE_CHANGE_FAILURE)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not set the capsfilter element for session %d", self->id);
    goto error;
  }

  tmp = g_strdup_printf ("recv_valve_%d", self->id);
  valve = gst_element_factory_make ("valve", tmp);
  g_free (tmp);

  if (!valve) {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not create a valve element for session %d", self->id);
    goto error;
  }

  /* Extra reference kept in self->priv->recv_valve */
  gst_object_ref (valve);

  if (!gst_bin_add (GST_BIN (conference), valve))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add the valve element for session %d"
        " to the conference bin", self->id);
    gst_object_unref (valve);
    goto error;
  }

  /* Start closed; the valve is opened when the direction includes RECV */
  g_object_set (valve, "drop", TRUE, NULL);

  self->priv->recv_valve = valve;

  if (gst_element_set_state (self->priv->recv_valve, GST_STATE_PLAYING) ==
    GST_STATE_CHANGE_FAILURE) {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not set the valve element for session %d to the playing state",
        self->id);
    goto error;
  }

  if (!gst_element_link (self->priv->recv_valve, self->priv->recv_capsfilter))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the recv valve and the capsfilter");
    goto error;
  }

  if (!gst_element_link_pads (transmitter_src, "src_1",
          valve, "sink"))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the recv_valve to the codec bin");
    goto error;
  }

  transmitter_src_pad = gst_element_get_static_pad (transmitter_src, "src_1");

  /* From here on the session owns the transmitter reference */
  GST_OBJECT_LOCK (conference);
  self->priv->transmitter = fstransmitter;
  self->priv->transmitter_src_pad = transmitter_src_pad;
  GST_OBJECT_UNLOCK (conference);

  /* The probe holds its own reference on the session, dropped through the
   * destroy notify */
  self->priv->transmitter_recv_probe_id = gst_pad_add_probe (
    self->priv->transmitter_src_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
    _transmitter_pad_have_data_callback, g_object_ref (self),
    g_object_unref);

  if (!gst_element_sync_state_with_parent (transmitter_src))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not sync the transmitter's source element"
        " with its parent for session %d", self->id);
    goto error;
  }

  gst_object_unref (transmitter_src);
  gst_object_unref (conference);

  return stream_transmitter;

error:
  /* NOTE(review): the extra references taken above for self->priv on the
   * valve and capsfilter do not appear to be released here, and the elements
   * may still be in the PLAYING state when removed - looks like a leak;
   * confirm against the session teardown code before changing. */
  if (self->priv->recv_valve)
  {
    gst_bin_remove (GST_BIN (conference), self->priv->recv_valve);
    self->priv->recv_valve = NULL;
  }

  if (self->priv->recv_capsfilter)
  {
    gst_bin_remove (GST_BIN (conference), self->priv->recv_capsfilter);
    self->priv->recv_capsfilter = NULL;
  }

  if (transmitter_src)
  {
    gst_bin_remove (GST_BIN (conference), transmitter_src);
    /* Release the reference obtained from g_object_get() */
    gst_object_unref (transmitter_src);
  }

  if (stream_transmitter)
  {
    fs_stream_transmitter_stop (stream_transmitter);
    g_object_unref (stream_transmitter);
  }

  GST_OBJECT_LOCK (conference);
  stored_transmitter = self->priv->transmitter;
  self->priv->transmitter = NULL;
  GST_OBJECT_UNLOCK (conference);

  /* A failure before the transmitter was stored in the session leaves the
   * only reference in the local variable; release it too, but never twice */
  if (fstransmitter && fstransmitter != stored_transmitter)
    g_object_unref (fstransmitter);

  if (stored_transmitter)
    g_object_unref (stored_transmitter);

  gst_object_unref (conference);

  return NULL;

}