Blob Blame History Raw
/*
 * Farstream - Farstream Shm UDP Transmitter
 *
 * Copyright 2007-2008 Collabora Ltd.
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 * Copyright 2007-2008 Nokia Corp.
 *
 * fs-shm-transmitter.c - A Farstream shm UDP transmitter
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */

/**
 * SECTION:fs-shm-transmitter
 * @short_description: A transmitter for shm UDP
 *
 * This transmitter provides shm udp
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "fs-shm-transmitter.h"
#include "fs-shm-stream-transmitter.h"

#include <farstream/fs-conference.h>
#include <farstream/fs-plugin.h>

#include <string.h>

GST_DEBUG_CATEGORY (fs_shm_transmitter_debug);
#define GST_CAT_DEFAULT fs_shm_transmitter_debug

/* Signals */
enum
{
  LAST_SIGNAL
};

/* props */
enum
{
  PROP_0,
  PROP_GST_SINK,
  PROP_GST_SRC,
  PROP_COMPONENTS,
  PROP_DO_TIMESTAMP,
};

struct _FsShmTransmitterPrivate
{
  /* We hold references to this element */
  GstElement *gst_sink;
  GstElement *gst_src;

  /* We don't hold a reference to these elements, they are owned
     by the bins */
  /* They are tables of pointers, one per component */
  GstElement **funnels;
  GstElement **tees;

  gboolean do_timestamp;
};

#define FS_SHM_TRANSMITTER_GET_PRIVATE(o)  \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_SHM_TRANSMITTER,   \
      FsShmTransmitterPrivate))

static void fs_shm_transmitter_class_init (
    FsShmTransmitterClass *klass);
static void fs_shm_transmitter_init (FsShmTransmitter *self);
static void fs_shm_transmitter_constructed (GObject *object);
static void fs_shm_transmitter_dispose (GObject *object);
static void fs_shm_transmitter_finalize (GObject *object);

static void fs_shm_transmitter_get_property (GObject *object,
                                                guint prop_id,
                                                GValue *value,
                                                GParamSpec *pspec);
static void fs_shm_transmitter_set_property (GObject *object,
                                                guint prop_id,
                                                const GValue *value,
                                                GParamSpec *pspec);

static FsStreamTransmitter *fs_shm_transmitter_new_stream_transmitter (
    FsTransmitter *transmitter, FsParticipant *participant,
    guint n_parameters, GParameter *parameters, GError **error);
static GType fs_shm_transmitter_get_stream_transmitter_type (
    FsTransmitter *transmitter);


static GObjectClass *parent_class = NULL;
//static guint signals[LAST_SIGNAL] = { 0 };

/*
 * Private bin subclass
 */

enum {
  BIN_SIGNAL_READY,
  BIN_SIGNAL_DISCONNECTED,
  BIN_LAST_SIGNAL
};

static guint bin_signals[BIN_LAST_SIGNAL] = { 0 };
static GType shm_bin_type = 0;
gpointer shm_bin_parent_class = NULL;

typedef struct _FsShmBin
{
  GstBin parent;
} FsShmBin;

typedef struct _FsShmBinClass
{
  GstBinClass parent_class;
} FsShmBinClass;

static void fs_shm_bin_init (FsShmBin *self)
{
}

static GstElement *
fs_shm_bin_new (void)
{
  return g_object_new (shm_bin_type, NULL);
}

static void
fs_shm_bin_handle_message (GstBin *bin, GstMessage *message)
{
  GstState old, new, pending;
  GError *gerror;
  gchar *msg;

  switch (GST_MESSAGE_TYPE (message))
  {
    case GST_MESSAGE_STATE_CHANGED:
      gst_message_parse_state_changed (message, &old, &new, &pending);

      if (old == GST_STATE_PAUSED && new == GST_STATE_PLAYING)
        g_signal_emit (bin, bin_signals[BIN_SIGNAL_READY], 0,
            GST_MESSAGE_SRC (message));
      break;
    case GST_MESSAGE_ERROR:
      gst_message_parse_error (message, &gerror, &msg);

      if (g_error_matches (gerror, GST_RESOURCE_ERROR,
              GST_RESOURCE_ERROR_READ))
      {
        g_signal_emit (bin, bin_signals[BIN_SIGNAL_DISCONNECTED], 0,
            GST_MESSAGE_SRC (message));
        gst_message_unref (message);
        return;
      }
      break;
    default:
      break;
  }

  GST_BIN_CLASS (shm_bin_parent_class)->handle_message (bin, message);
}

static void fs_shm_bin_class_init (FsShmBinClass *klass)
{
  GstBinClass *bin_class = GST_BIN_CLASS (klass);

  shm_bin_parent_class = g_type_class_peek_parent (klass);

  bin_signals[BIN_SIGNAL_READY] =
    g_signal_new ("ready", G_TYPE_FROM_CLASS (klass),
        G_SIGNAL_RUN_LAST, 0, NULL, NULL,
        g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);

  bin_signals[BIN_SIGNAL_DISCONNECTED] =
    g_signal_new ("disconnected", G_TYPE_FROM_CLASS (klass),
        G_SIGNAL_RUN_LAST, 0, NULL, NULL,
        g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);

  bin_class->handle_message = GST_DEBUG_FUNCPTR (fs_shm_bin_handle_message);
}

/*
 * Lets register the plugin
 */

static GType type = 0;

GType
fs_shm_transmitter_get_type (void)
{
  g_assert (type);
  return type;
}

static GType
fs_shm_transmitter_register_type (FsPlugin *module)
{
  static const GTypeInfo info = {
    sizeof (FsShmTransmitterClass),
    NULL,
    NULL,
    (GClassInitFunc) fs_shm_transmitter_class_init,
    NULL,
    NULL,
    sizeof (FsShmTransmitter),
    0,
    (GInstanceInitFunc) fs_shm_transmitter_init
  };

  static const GTypeInfo bin_info = {
    sizeof (FsShmBinClass),
    NULL,
    NULL,
    (GClassInitFunc) fs_shm_bin_class_init,
    NULL,
    NULL,
    sizeof (FsShmBin),
    0,
    (GInstanceInitFunc) fs_shm_bin_init
  };


  GST_DEBUG_CATEGORY_INIT (fs_shm_transmitter_debug,
      "fsshmtransmitter", 0,
      "Farstream shm UDP transmitter");

  fs_shm_stream_transmitter_register_type (module);

  type = g_type_register_static (FS_TYPE_TRANSMITTER, "FsShmTransmitter",
      &info, 0);

  shm_bin_type = g_type_register_static (
    GST_TYPE_BIN, "FsShmBin", &bin_info, 0);

  return type;
}

FS_INIT_PLUGIN (shm, transmitter)

static void
fs_shm_transmitter_class_init (FsShmTransmitterClass *klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  FsTransmitterClass *transmitter_class = FS_TRANSMITTER_CLASS (klass);

  parent_class = g_type_class_peek_parent (klass);

  gobject_class->set_property = fs_shm_transmitter_set_property;
  gobject_class->get_property = fs_shm_transmitter_get_property;

  gobject_class->constructed = fs_shm_transmitter_constructed;

  g_object_class_override_property (gobject_class, PROP_GST_SRC, "gst-src");
  g_object_class_override_property (gobject_class, PROP_GST_SINK, "gst-sink");
  g_object_class_override_property (gobject_class, PROP_COMPONENTS,
    "components");
  g_object_class_override_property (gobject_class, PROP_DO_TIMESTAMP,
    "do-timestamp");

  transmitter_class->new_stream_transmitter =
    fs_shm_transmitter_new_stream_transmitter;
  transmitter_class->get_stream_transmitter_type =
    fs_shm_transmitter_get_stream_transmitter_type;

  gobject_class->dispose = fs_shm_transmitter_dispose;
  gobject_class->finalize = fs_shm_transmitter_finalize;

  g_type_class_add_private (klass, sizeof (FsShmTransmitterPrivate));
}

static void
fs_shm_transmitter_init (FsShmTransmitter *self)
{

  /* member init */
  self->priv = FS_SHM_TRANSMITTER_GET_PRIVATE (self);

  self->components = 2;
  self->priv->do_timestamp = TRUE;
}

static void
fs_shm_transmitter_constructed (GObject *object)
{
  FsShmTransmitter *self = FS_SHM_TRANSMITTER_CAST (object);
  FsTransmitter *trans = FS_TRANSMITTER_CAST (self);
  GstPad *pad = NULL, *pad2 = NULL;
  GstPad *ghostpad = NULL;
  gchar *padname;
  GstPadLinkReturn ret;
  int c; /* component_id */


  /* We waste one space in order to have the index be the component_id */
  self->priv->funnels = g_new0 (GstElement *, self->components+1);
  self->priv->tees = g_new0 (GstElement *, self->components+1);

  /* First we need the src elemnet */

  self->priv->gst_src = fs_shm_bin_new ();

  if (!self->priv->gst_src) {
    trans->construction_error = g_error_new (FS_ERROR,
      FS_ERROR_CONSTRUCTION,
      "Could not build the transmitter src bin");
    return;
  }

  gst_object_ref (self->priv->gst_src);


  /* Second, we do the sink element */

  self->priv->gst_sink = fs_shm_bin_new ();

  if (!self->priv->gst_sink) {
    trans->construction_error = g_error_new (FS_ERROR,
      FS_ERROR_CONSTRUCTION,
      "Could not build the transmitter sink bin");
    return;
  }

  g_object_set (G_OBJECT (self->priv->gst_sink),
      "async-handling", TRUE,
      NULL);

  gst_object_ref (self->priv->gst_sink);

  for (c = 1; c <= self->components; c++) {
    GstElement *fakesink = NULL;

    /* Lets create the RTP source funnel */

    self->priv->funnels[c] = gst_element_factory_make ("funnel", NULL);

    if (!self->priv->funnels[c]) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make the funnel element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_src),
        self->priv->funnels[c])) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not add the funnel element to the transmitter src bin");
    }

    pad = gst_element_get_static_pad (self->priv->funnels[c], "src");
    padname = g_strdup_printf ("src_%u", c);
    ghostpad = gst_ghost_pad_new (padname, pad);
    g_free (padname);
    gst_object_unref (pad);

    gst_pad_set_active (ghostpad, TRUE);
    gst_element_add_pad (self->priv->gst_src, ghostpad);


    /* Lets create the RTP sink tee */

    self->priv->tees[c] = gst_element_factory_make ("tee", NULL);

    if (!self->priv->tees[c]) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make the tee element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_sink),
        self->priv->tees[c])) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not add the tee element to the transmitter sink bin");
    }

    pad = gst_element_get_static_pad (self->priv->tees[c], "sink");
    padname = g_strdup_printf ("sink_%u", c);
    ghostpad = gst_ghost_pad_new (padname, pad);
    g_free (padname);
    gst_object_unref (pad);

    gst_pad_set_active (ghostpad, TRUE);
    gst_element_add_pad (self->priv->gst_sink, ghostpad);

    fakesink = gst_element_factory_make ("fakesink", NULL);

    if (!fakesink) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make the fakesink element");
      return;
    }

    g_object_set (fakesink,
        "async", FALSE,
        "sync" , FALSE,
        NULL);

    if (!gst_bin_add (GST_BIN (self->priv->gst_sink), fakesink))
    {
      gst_object_unref (fakesink);
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not add the fakesink element to the transmitter sink bin");
      return;
    }

    pad = gst_element_get_request_pad (self->priv->tees[c], "src_%u");
    pad2 = gst_element_get_static_pad (fakesink, "sink");

    ret = gst_pad_link (pad, pad2);

    gst_object_unref (pad2);
    gst_object_unref (pad);

    if (GST_PAD_LINK_FAILED(ret)) {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not link the tee to the fakesink");
      return;
    }
  }

  GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}

static void
fs_shm_transmitter_dispose (GObject *object)
{
  FsShmTransmitter *self = FS_SHM_TRANSMITTER (object);

  if (self->priv->gst_src) {
    gst_object_unref (self->priv->gst_src);
    self->priv->gst_src = NULL;
  }

  if (self->priv->gst_sink) {
    gst_object_unref (self->priv->gst_sink);
    self->priv->gst_sink = NULL;
  }

  parent_class->dispose (object);
}

static void
fs_shm_transmitter_finalize (GObject *object)
{
  FsShmTransmitter *self = FS_SHM_TRANSMITTER (object);

  if (self->priv->funnels) {
    g_free (self->priv->funnels);
    self->priv->funnels = NULL;
  }

  if (self->priv->tees) {
    g_free (self->priv->tees);
    self->priv->tees = NULL;
  }

  parent_class->finalize (object);
}

static void
fs_shm_transmitter_get_property (GObject *object,
                             guint prop_id,
                             GValue *value,
                             GParamSpec *pspec)
{
  FsShmTransmitter *self = FS_SHM_TRANSMITTER (object);

  switch (prop_id) {
    case PROP_GST_SINK:
      g_value_set_object (value, self->priv->gst_sink);
      break;
    case PROP_GST_SRC:
      g_value_set_object (value, self->priv->gst_src);
      break;
    case PROP_COMPONENTS:
      g_value_set_uint (value, self->components);
      break;
    case PROP_DO_TIMESTAMP:
      g_value_set_boolean (value, self->priv->do_timestamp);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
fs_shm_transmitter_set_property (GObject *object,
                                    guint prop_id,
                                    const GValue *value,
                                    GParamSpec *pspec)
{
  FsShmTransmitter *self = FS_SHM_TRANSMITTER (object);

  switch (prop_id) {
    case PROP_COMPONENTS:
      self->components = g_value_get_uint (value);
      break;
    case PROP_DO_TIMESTAMP:
      self->priv->do_timestamp = g_value_get_boolean (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}


/**
 * fs_shm_transmitter_new_stream_shm_transmitter:
 * @transmitter: a #FsTranmitter
 * @participant: the #FsParticipant for which the #FsStream using this
 * new #FsStreamTransmitter is created
 *
 * This function will create a new #FsStreamTransmitter element for a
 * specific participant for this #FsShmTransmitter
 *
 * Returns: a new #FsStreamTransmitter
 */

static FsStreamTransmitter *
fs_shm_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
  FsParticipant *participant, guint n_parameters, GParameter *parameters,
  GError **error)
{
  FsShmTransmitter *self = FS_SHM_TRANSMITTER (transmitter);

  return FS_STREAM_TRANSMITTER (fs_shm_stream_transmitter_newv (
        self, n_parameters, parameters, error));
}

static GType
fs_shm_transmitter_get_stream_transmitter_type (
    FsTransmitter *transmitter)
{
  return FS_TYPE_SHM_STREAM_TRANSMITTER;
}


struct _ShmSrc {
  guint component;
  gchar *path;
  GstElement *src;
  GstPad *funnelpad;

  got_buffer got_buffer_func;
  connection disconnected_func;
  gpointer cb_data;
  gulong buffer_probe;
};


static GstPadProbeReturn
src_buffer_probe_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  ShmSrc *shm = user_data;
  GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);

  shm->got_buffer_func (buffer, shm->component, shm->cb_data);

  return TRUE;
}


static void
disconnected_cb (GstBin *bin, GstElement *elem, ShmSrc *shm)
{
  if (elem != shm->src)
    return;

  shm->disconnected_func (shm->component, 0, shm->cb_data);
}


ShmSrc *
fs_shm_transmitter_get_shm_src (FsShmTransmitter *self,
    guint component,
    const gchar *path,
    got_buffer got_buffer_func,
    connection disconnected_func,
    gpointer cb_data,
    GError **error)
{
  ShmSrc *shm = g_slice_new0 (ShmSrc);
  GstElement *elem;
  GstPad *pad;

  shm->component = component;
  shm->got_buffer_func = got_buffer_func;
  shm->disconnected_func = disconnected_func;
  shm->cb_data = cb_data;

  shm->path = g_strdup (path);

  elem = gst_element_factory_make ("shmsrc", NULL);
  if (!elem)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not make shmsrc");
    goto error;
  }

  g_object_set (elem,
      "socket-path", path,
      "do-timestamp", self->priv->do_timestamp,
      "is-live", TRUE,
      NULL);

  if (shm->disconnected_func)
    g_signal_connect (self->priv->gst_src, "disconnected",
        G_CALLBACK (disconnected_cb), shm);

  if (!gst_bin_add (GST_BIN (self->priv->gst_src), elem))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add recvonly filter to bin");
    gst_object_unref (elem);
    goto error;
  }

  shm->src = elem;

  shm->funnelpad = gst_element_get_request_pad (self->priv->funnels[component],
      "sink_%u");

  if (!shm->funnelpad)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not get funnelpad");
    goto error;
  }

  pad = gst_element_get_static_pad (shm->src, "src");
  if (GST_PAD_LINK_FAILED (gst_pad_link (pad, shm->funnelpad)))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link tee"
        " and valve");
    gst_object_unref (pad);
    goto error;
  }

  gst_object_unref (pad);

  if (got_buffer_func)
    shm->buffer_probe = gst_pad_add_probe (shm->funnelpad,
        GST_PAD_PROBE_TYPE_BUFFER,
        src_buffer_probe_cb, shm, NULL);

  if (!gst_element_sync_state_with_parent (shm->src))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not sync the state of the new shmsrc with its parent");
    goto error;
  }

  return shm;

 error:
  fs_shm_transmitter_check_shm_src (self, shm, NULL);
  return NULL;
}

/*
 * Returns: %TRUE if the path is the same, other %FALSE and freeds the ShmSrc
 */

gboolean
fs_shm_transmitter_check_shm_src (FsShmTransmitter *self, ShmSrc *shm,
    const gchar *path)
{
  if (path && !strcmp (path, shm->path))
    return TRUE;

  if (shm->buffer_probe)
    gst_pad_remove_probe (shm->funnelpad, shm->buffer_probe);
  shm->buffer_probe = 0;

  if (shm->funnelpad) {
    gst_element_release_request_pad (self->priv->funnels[shm->component],
        shm->funnelpad);
    gst_object_unref (shm->funnelpad);
  }
  shm->funnelpad = NULL;

  if (shm->src)
  {
    gst_element_set_locked_state (shm->src, TRUE);
    gst_element_set_state (shm->src, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (self->priv->gst_src), shm->src);
  }
  shm->src = NULL;

  g_free (shm->path);
  g_slice_free (ShmSrc, shm);

  return FALSE;
}



struct _ShmSink {
  guint component;
  gchar *path;
  GstElement *sink;
  GstElement *recvonly_filter;
  GstPad *teepad;

  ready ready_func;
  connection connected_func;
  gpointer cb_data;
};


static void
ready_cb (GstBin *bin, GstElement *elem, ShmSink *shm)
{
  gchar *path = NULL;

  if (elem != shm->sink)
    return;

  g_object_get (elem, "socket-path", &path, NULL);
  shm->ready_func (shm->component, path, shm->cb_data);
  g_free (path);
}


static void
connected_cb (GstBin *bin, gint id, ShmSink *shm)
{
  shm->connected_func (shm->component, id, shm->cb_data);
}

ShmSink *
fs_shm_transmitter_get_shm_sink (FsShmTransmitter *self,
    guint component,
    const gchar *path,
    ready ready_func,
    connection connected_func,
    gpointer cb_data,
    GError **error)
{
  ShmSink *shm = g_slice_new0 (ShmSink);
  GstElement *elem;
  GstPad *pad;

  GST_DEBUG ("Trying to add shm sink for c:%u path %s", component, path);

  shm->component = component;

  shm->path = g_strdup (path);

  shm->ready_func = ready_func;
  shm->connected_func = connected_func;
  shm->cb_data = cb_data;

  /* First add the sink */

  elem = gst_element_factory_make ("shmsink", NULL);
  if (!elem)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not make shmsink");
    goto error;
  }
  g_object_set (elem,
      "socket-path", path,
      "wait-for-connection", FALSE,
      "async", FALSE,
      "sync" , FALSE,
      NULL);

  if (ready_func)
    g_signal_connect (self->priv->gst_sink, "ready", G_CALLBACK (ready_cb),
        shm);

  if (connected_func)
    g_signal_connect (elem, "client-connected", G_CALLBACK (connected_cb), shm);

  if (!gst_bin_add (GST_BIN (self->priv->gst_sink), elem))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add shmsink to bin");
    gst_object_unref (elem);
    goto error;
  }

  shm->sink = elem;

  /* Second add the recvonly filter */

  elem = gst_element_factory_make ("valve", NULL);
  if (!elem)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not make valve");
    goto error;
  }

  if (!gst_bin_add (GST_BIN (self->priv->gst_sink), elem))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add recvonly filter to bin");
    gst_object_unref (elem);
    goto error;
  }

  shm->recvonly_filter = elem;

  /* Third connect these */

  if (!gst_element_link (shm->recvonly_filter, shm->sink))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link recvonly filter and shmsink");
    goto error;
  }

  if (!gst_element_sync_state_with_parent (shm->sink))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not sync the state of the new shmsink with its parent");
    goto error;
  }

  if (!gst_element_sync_state_with_parent (shm->recvonly_filter))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not sync the state of the new recvonly filter  with its parent");
    goto error;
  }

  shm->teepad = gst_element_get_request_pad (self->priv->tees[component],
      "src_%u");

  if (!shm->teepad)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not get teepad");
    goto error;
  }

  pad = gst_element_get_static_pad (shm->recvonly_filter, "sink");
  if (GST_PAD_LINK_FAILED (gst_pad_link (shm->teepad, pad)))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION, "Could not link tee"
        " and valve");
    gst_object_unref (pad);
    goto error;
  }
  gst_object_unref (pad);

  return shm;

 error:
  fs_shm_transmitter_check_shm_sink (self, shm, NULL);

  return NULL;
}

gboolean
fs_shm_transmitter_check_shm_sink (FsShmTransmitter *self, ShmSink *shm,
    const gchar *path)
{
  if (path && !strcmp (path, shm->path))
    return TRUE;

  if (path)
    GST_DEBUG ("Replacing shm socket %s with %s", shm->path, path);
  else
    GST_DEBUG ("Freeing shm socket %s", shm->path);

  if (shm->teepad)
  {
    gst_element_release_request_pad (self->priv->tees[shm->component],
        shm->teepad);
    gst_object_unref (shm->teepad);
  }
  shm->teepad = NULL;

  if (shm->sink)
  {
    gst_element_set_locked_state (shm->sink, TRUE);
    gst_element_set_state (shm->sink, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (self->priv->gst_sink), shm->sink);
  }
  shm->sink = NULL;

  if (shm->recvonly_filter)
  {
    gst_element_set_locked_state (shm->recvonly_filter, TRUE);
    gst_element_set_state (shm->recvonly_filter, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (self->priv->gst_sink), shm->recvonly_filter);
  }
  shm->recvonly_filter = NULL;

  g_free (shm->path);
  g_slice_free (ShmSink, shm);

  return FALSE;
}


void
fs_shm_transmitter_sink_set_sending (FsShmTransmitter *self, ShmSink *shm,
    gboolean sending)
{
  GObjectClass *klass = G_OBJECT_GET_CLASS (shm->recvonly_filter);

  if (g_object_class_find_property (klass, "drop"))
    g_object_set (shm->recvonly_filter, "drop", !sending, NULL);

  if (sending)
    gst_element_send_event (shm->sink,
        gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
            gst_structure_new ("GstForceKeyUnit",
              "all-headers", G_TYPE_BOOLEAN, TRUE,
              NULL)));
}