Blob Blame History Raw
/*
 * Farstream - Farstream Multicast UDP Transmitter
 *
 * Copyright 2007-2008 Collabora Ltd.
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 * Copyright 2007-2008 Nokia Corp.
 *
 * fs-multicast-transmitter.c - A Farstream multicast UDP transmitter
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */

/**
 * SECTION:fs-multicast-transmitter
 * @short_description: A transmitter for multicast UDP
 *
 * This transmitter provides multicast udp
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "fs-multicast-transmitter.h"
#include "fs-multicast-stream-transmitter.h"

#include <farstream/fs-conference.h>
#include <farstream/fs-plugin.h>

#include <string.h>
#include <sys/types.h>

#include <gio/gio.h>

#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif

#ifdef G_OS_WIN32
# include <ws2tcpip.h>
# define close closesocket
#else /*G_OS_WIN32*/
# include <sys/socket.h>
# include <netinet/in.h>
# include <arpa/inet.h>
#endif /*G_OS_WIN32*/

GST_DEBUG_CATEGORY (fs_multicast_transmitter_debug);
#define GST_CAT_DEFAULT fs_multicast_transmitter_debug

/* Signals */
enum
{
  LAST_SIGNAL
};

/* props */
enum
{
  PROP_0,
  PROP_GST_SINK,
  PROP_GST_SRC,
  PROP_COMPONENTS,
  PROP_TYPE_OF_SERVICE,
  PROP_DO_TIMESTAMP
};

struct _FsMulticastTransmitterPrivate
{
  /* We hold references to this element */
  GstElement *gst_sink;
  GstElement *gst_src;

  /* We don't hold a reference to these elements, they are owned
     by the bins */
  /* They are tables of pointers, one per component */
  GstElement **udpsrc_funnels;
  GstElement **udpsink_tees;

  GMutex mutex;
  GList **udpsocks;

  gint type_of_service;
  gboolean do_timestamp;

  gboolean disposed;
};

#define FS_MULTICAST_TRANSMITTER_GET_PRIVATE(o)  \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_MULTICAST_TRANSMITTER, \
    FsMulticastTransmitterPrivate))

#define FS_MULTICAST_TRANSMITTER_LOCK(self) \
  g_mutex_lock (&(self)->priv->mutex);
#define FS_MULTICAST_TRANSMITTER_UNLOCK(self) \
  g_mutex_unlock (&(self)->priv->mutex);

static void fs_multicast_transmitter_class_init (
    FsMulticastTransmitterClass *klass);
static void fs_multicast_transmitter_init (FsMulticastTransmitter *self);
static void fs_multicast_transmitter_constructed (GObject *object);
static void fs_multicast_transmitter_dispose (GObject *object);
static void fs_multicast_transmitter_finalize (GObject *object);

static void fs_multicast_transmitter_get_property (GObject *object,
                                                guint prop_id,
                                                GValue *value,
                                                GParamSpec *pspec);
static void fs_multicast_transmitter_set_property (GObject *object,
                                                guint prop_id,
                                                const GValue *value,
                                                GParamSpec *pspec);

static FsStreamTransmitter *fs_multicast_transmitter_new_stream_transmitter (
    FsTransmitter *transmitter, FsParticipant *participant,
    guint n_parameters, GParameter *parameters, GError **error);
static GType fs_multicast_transmitter_get_stream_transmitter_type (
    FsTransmitter *transmitter);

static void fs_multicast_transmitter_set_type_of_service (
    FsMulticastTransmitter *self,
    gint tos);


static GObjectClass *parent_class = NULL;
//static guint signals[LAST_SIGNAL] = { 0 };


/*
 * Lets register the plugin
 */

static GType type = 0;

GType
fs_multicast_transmitter_get_type (void)
{
  g_assert (type);
  return type;
}

static GType
fs_multicast_transmitter_register_type (FsPlugin *module)
{
  static const GTypeInfo info = {
    sizeof (FsMulticastTransmitterClass),
    NULL,
    NULL,
    (GClassInitFunc) fs_multicast_transmitter_class_init,
    NULL,
    NULL,
    sizeof (FsMulticastTransmitter),
    0,
    (GInstanceInitFunc) fs_multicast_transmitter_init
  };

  GST_DEBUG_CATEGORY_INIT (fs_multicast_transmitter_debug,
      "fsmulticasttransmitter", 0,
      "Farstream multicast UDP transmitter");

  fs_multicast_stream_transmitter_register_type (module);

  type = g_type_register_static (FS_TYPE_TRANSMITTER,
      "FsMulticastTransmitter", &info, 0);

  return type;
}

FS_INIT_PLUGIN (multicast, transmitter)

static void
fs_multicast_transmitter_class_init (FsMulticastTransmitterClass *klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  FsTransmitterClass *transmitter_class = FS_TRANSMITTER_CLASS (klass);

  parent_class = g_type_class_peek_parent (klass);

  gobject_class->set_property = fs_multicast_transmitter_set_property;
  gobject_class->get_property = fs_multicast_transmitter_get_property;

  gobject_class->constructed = fs_multicast_transmitter_constructed;

  g_object_class_override_property (gobject_class, PROP_GST_SRC, "gst-src");
  g_object_class_override_property (gobject_class, PROP_GST_SINK, "gst-sink");
  g_object_class_override_property (gobject_class, PROP_COMPONENTS,
    "components");
  g_object_class_override_property (gobject_class, PROP_TYPE_OF_SERVICE,
    "tos");
  g_object_class_override_property (gobject_class, PROP_DO_TIMESTAMP,
    "do-timestamp");

  transmitter_class->new_stream_transmitter =
    fs_multicast_transmitter_new_stream_transmitter;
  transmitter_class->get_stream_transmitter_type =
    fs_multicast_transmitter_get_stream_transmitter_type;

  gobject_class->dispose = fs_multicast_transmitter_dispose;
  gobject_class->finalize = fs_multicast_transmitter_finalize;

  g_type_class_add_private (klass, sizeof (FsMulticastTransmitterPrivate));
}

static void
fs_multicast_transmitter_init (FsMulticastTransmitter *self)
{

  /* member init */
  self->priv = FS_MULTICAST_TRANSMITTER_GET_PRIVATE (self);
  self->priv->disposed = FALSE;

  self->components = 2;
  g_mutex_init (&self->priv->mutex);
  self->priv->do_timestamp = TRUE;
}

static void
fs_multicast_transmitter_constructed (GObject *object)
{
  FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER_CAST (object);
  FsTransmitter *trans = FS_TRANSMITTER_CAST (self);
  GstPad *pad = NULL, *pad2 = NULL;
  GstPad *ghostpad = NULL;
  gchar *padname;
  GstPadLinkReturn ret;
  int c; /* component_id */


  /* We waste one space in order to have the index be the component_id */
  self->priv->udpsrc_funnels = g_new0 (GstElement *, self->components+1);
  self->priv->udpsink_tees = g_new0 (GstElement *, self->components+1);
  self->priv->udpsocks = g_new0 (GList *, self->components+1);

  /* First we need the src elemnet */

  self->priv->gst_src = gst_bin_new (NULL);

  if (!self->priv->gst_src) {
    trans->construction_error = g_error_new (FS_ERROR,
      FS_ERROR_CONSTRUCTION,
      "Could not build the transmitter src bin");
    return;
  }

  gst_object_ref (self->priv->gst_src);


  /* Second, we do the sink element */

  self->priv->gst_sink = gst_bin_new (NULL);

  if (!self->priv->gst_sink) {
    trans->construction_error = g_error_new (FS_ERROR,
      FS_ERROR_CONSTRUCTION,
      "Could not build the transmitter sink bin");
    return;
  }

  g_object_set (G_OBJECT (self->priv->gst_sink),
      "async-handling", TRUE,
      NULL);

  gst_object_ref (self->priv->gst_sink);

  for (c = 1; c <= self->components; c++) {
    GstElement *fakesink = NULL;

    /* Lets create the RTP source funnel */

    self->priv->udpsrc_funnels[c] = gst_element_factory_make ("funnel", NULL);

    if (!self->priv->udpsrc_funnels[c]) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make the funnel element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_src),
        self->priv->udpsrc_funnels[c])) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not add the funnel element to the transmitter src bin");
    }

    pad = gst_element_get_static_pad (self->priv->udpsrc_funnels[c], "src");
    padname = g_strdup_printf ("src_%u", c);
    ghostpad = gst_ghost_pad_new (padname, pad);
    g_free (padname);
    gst_object_unref (pad);

    gst_pad_set_active (ghostpad, TRUE);
    gst_element_add_pad (self->priv->gst_src, ghostpad);


    /* Lets create the RTP sink tee */

    self->priv->udpsink_tees[c] = gst_element_factory_make ("tee", NULL);

    if (!self->priv->udpsink_tees[c]) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make the tee element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_sink),
        self->priv->udpsink_tees[c])) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not add the tee element to the transmitter sink bin");
    }

    pad = gst_element_get_static_pad (self->priv->udpsink_tees[c], "sink");
    padname = g_strdup_printf ("sink_%u", c);
    ghostpad = gst_ghost_pad_new (padname, pad);
    g_free (padname);
    gst_object_unref (pad);

    gst_pad_set_active (ghostpad, TRUE);
    gst_element_add_pad (self->priv->gst_sink, ghostpad);

    fakesink = gst_element_factory_make ("fakesink", NULL);

    if (!fakesink) {
      trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not make the fakesink element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_sink), fakesink))
    {
      gst_object_unref (fakesink);
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not add the fakesink element to the transmitter sink bin");
      return;
    }

    g_object_set (fakesink,
        "async", FALSE,
        "sync" , FALSE,
        NULL);

    pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src_%u");
    pad2 = gst_element_get_static_pad (fakesink, "sink");

    ret = gst_pad_link (pad, pad2);

    gst_object_unref (pad2);
    gst_object_unref (pad);

    if (GST_PAD_LINK_FAILED(ret)) {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not link the tee to the fakesink");
      return;
    }
  }

  GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}

static void
fs_multicast_transmitter_dispose (GObject *object)
{
  FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);

  if (self->priv->disposed) {
    /* If dispose did already run, return. */
    return;
  }

  if (self->priv->gst_src) {
    gst_object_unref (self->priv->gst_src);
    self->priv->gst_src = NULL;
  }

  if (self->priv->gst_sink) {
    gst_object_unref (self->priv->gst_sink);
    self->priv->gst_sink = NULL;
  }

  /* Make sure dispose does not run twice. */
  self->priv->disposed = TRUE;

  parent_class->dispose (object);
}

static void
fs_multicast_transmitter_finalize (GObject *object)
{
  FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);

  if (self->priv->udpsrc_funnels) {
    g_free (self->priv->udpsrc_funnels);
    self->priv->udpsrc_funnels = NULL;
  }

  if (self->priv->udpsink_tees) {
    g_free (self->priv->udpsink_tees);
    self->priv->udpsink_tees = NULL;
  }

  if (self->priv->udpsocks) {
    g_free (self->priv->udpsocks);
    self->priv->udpsocks = NULL;
  }

  g_mutex_clear (&self->priv->mutex);

  parent_class->finalize (object);
}

static void
fs_multicast_transmitter_get_property (GObject *object,
                             guint prop_id,
                             GValue *value,
                             GParamSpec *pspec)
{
  FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);

  switch (prop_id) {
    case PROP_GST_SINK:
      g_value_set_object (value, self->priv->gst_sink);
      break;
    case PROP_GST_SRC:
      g_value_set_object (value, self->priv->gst_src);
      break;
    case PROP_COMPONENTS:
      g_value_set_uint (value, self->components);
      break;
    case PROP_TYPE_OF_SERVICE:
      FS_MULTICAST_TRANSMITTER_LOCK (self);
      g_value_set_uint (value, self->priv->type_of_service);
      FS_MULTICAST_TRANSMITTER_UNLOCK (self);
      break;
    case PROP_DO_TIMESTAMP:
      g_value_set_boolean (value, self->priv->do_timestamp);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
fs_multicast_transmitter_set_property (GObject *object,
                                    guint prop_id,
                                    const GValue *value,
                                    GParamSpec *pspec)
{
  FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);

  switch (prop_id) {
    case PROP_COMPONENTS:
      self->components = g_value_get_uint (value);
      break;
    case PROP_TYPE_OF_SERVICE:
      fs_multicast_transmitter_set_type_of_service (self,
          g_value_get_uint (value));
      break;
    case PROP_DO_TIMESTAMP:
      self->priv->do_timestamp = g_value_get_boolean (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}


/**
 * fs_multicast_transmitter_new_stream_multicast_transmitter:
 * @transmitter: a #FsTranmitter
 * @participant: the #FsParticipant for which the #FsStream using this
 * new #FsStreamTransmitter is created
 *
 * This function will create a new #FsStreamTransmitter element for a
 * specific participant for this #FsMulticastTransmitter
 *
 * Returns: a new #FsStreamTransmitter
 */

static FsStreamTransmitter *
fs_multicast_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
  FsParticipant *participant, guint n_parameters, GParameter *parameters,
  GError **error)
{
  FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (transmitter);

  return FS_STREAM_TRANSMITTER (fs_multicast_stream_transmitter_newv (
        self, n_parameters, parameters, error));
}


/*
 * The UdpSock structure is a ref-counted pseudo-object use to represent
 * one local_ip:port:multicast_ip trio on which we listen and send,
 * so it includes a udpsrc and a multiudpsink. It represents one BSD socket.
 * The TTL used is the max TTL requested by any stream.
 */

struct _UdpSock {

  GstElement *udpsrc;
  GstPad *udpsrc_requested_pad;

  GstElement *udpsink;
  GstPad *udpsink_requested_pad;

  gchar *local_ip;
  gchar *multicast_ip;
  guint16 port;
  /* Protected by the transmitter mutex */
  guint8 current_ttl;

  gint fd;
  GSocket *socket;

  /* Protected by the transmitter mutex */
  GByteArray *ttls;

  /* These are just convenience pointers to our parent transmitter */
  GstElement *funnel;
  GstElement *tee;

  guint component_id;

  volatile gint sendcount;
};

static gboolean
_ip_string_into_sockaddr_in (const gchar *ip_as_string,
    struct sockaddr_in *sockaddr_in, GError **error)
{
  GInetAddress *inetaddr;
  GSocketAddress *socket_addr;
  gboolean ret;

  inetaddr = g_inet_address_new_from_string (ip_as_string);

  if (!inetaddr) {
    g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
        "Invalid IP address %s passed", ip_as_string);
    return FALSE;
  }

  if (g_inet_address_get_family (inetaddr) != G_SOCKET_FAMILY_IPV4) {
    g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
        "IP address %s passed is not IPv4", ip_as_string);
    g_object_unref (inetaddr);
    return 0;
  }

  socket_addr = g_inet_socket_address_new (inetaddr, 1);

  ret = g_socket_address_to_native (socket_addr, sockaddr_in,
      sizeof (struct sockaddr_in), error);

  g_object_unref (socket_addr);
  g_object_unref (inetaddr);

  return ret;
}

static gint
_bind_port (
    const gchar *local_ip,
    const gchar *multicast_ip,
    guint16 port,
    guchar ttl,
    int type_of_service,
    GError **error)
{
  int sock = -1;
  struct sockaddr_in address;
  int retval;
  guchar loop = 1;
  int reuseaddr = 1;
#ifdef HAVE_IP_MREQN
  struct ip_mreqn mreq;
#else
  struct ip_mreq mreq;
#endif

  address.sin_family = AF_INET;
  address.sin_addr.s_addr = INADDR_ANY;

  g_assert (multicast_ip);

  if (!_ip_string_into_sockaddr_in (multicast_ip, &address, error))
    goto error;
  memcpy (&mreq.imr_multiaddr, &address.sin_addr,
      sizeof (mreq.imr_multiaddr));

  if (local_ip)
  {
    struct sockaddr_in tmpaddr;
    if (!_ip_string_into_sockaddr_in (local_ip, &tmpaddr, error))
      goto error;
#ifdef HAVE_IP_MREQN
    memcpy (&mreq.imr_address, &tmpaddr.sin_addr, sizeof (mreq.imr_address));
#else
    memcpy (&mreq.imr_interface, &tmpaddr.sin_addr, sizeof (mreq.imr_interface));
#endif
  }
  else
  {
#ifdef HAVE_IP_MREQN
    mreq.imr_address.s_addr = INADDR_ANY;
#else
    mreq.imr_interface.s_addr = INADDR_ANY;
#endif
  }

#ifdef HAVE_IP_MREQN
  mreq.imr_ifindex = 0;
#endif

  if ((sock = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP)) <= 0) {
    g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
      "Error creating socket: %s", g_strerror (errno));
    goto error;
  }

  if (setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL, (const void *)&ttl,
          sizeof (ttl)) < 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Error setting the multicast TTL: %s",
        g_strerror (errno));
    goto error;
  }

  if (setsockopt (sock, IPPROTO_IP, IP_MULTICAST_LOOP, (const void *)&loop,
          sizeof (loop)) < 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Error setting the multicast loop to FALSE: %s",
        g_strerror (errno));
    goto error;
  }

  if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, (const void *)&reuseaddr,
          sizeof (reuseaddr)) < 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Error setting reuseaddr to TRUE: %s",
        g_strerror (errno));
    goto error;
  }

#ifdef SO_REUSEPORT
  if (setsockopt (sock, SOL_SOCKET, SO_REUSEPORT, (const void *)&reuseaddr,
          sizeof (reuseaddr)) < 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Error setting reuseaddr to TRUE: %s",
        g_strerror (errno));
    goto error;
  }
#endif

  if (setsockopt (sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
          (const void *)&mreq, sizeof (mreq)) < 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Could not join the socket to the multicast group: %s",
        g_strerror (errno));
    goto error;
  }

  if (setsockopt (sock, IPPROTO_IP, IP_TOS,
          &type_of_service, sizeof (type_of_service)) < 0)
    GST_WARNING ("could not set socket ToS: %s", g_strerror (errno));

#ifdef IPV6_TCLASS
  if (setsockopt (sock, IPPROTO_IPV6, IPV6_TCLASS,
          &type_of_service, sizeof (type_of_service)) < 0)
    GST_WARNING ("could not set TCLASS: %s", g_strerror (errno));
#endif

  address.sin_port = htons (port);
  retval = bind (sock, (struct sockaddr *) &address, sizeof (address));
  if (retval != 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
        "Could not bind to port %d", port);
    goto error;
  }

  return sock;

 error:
  if (sock >= 0)
    close (sock);
  return -1;
}

static GstElement *
_create_sinksource (gchar *elementname, GstBin *bin,
    GstElement *teefunnel, GSocket *socket,
    GstPadDirection direction, GstPad **requested_pad, GError **error)
{
  GstElement *elem;
  GstPadLinkReturn ret = GST_PAD_LINK_OK;
  GstPad *elempad = NULL;
  GstStateChangeReturn state_ret;

  g_assert (direction == GST_PAD_SINK || direction == GST_PAD_SRC);

  elem = gst_element_factory_make (elementname, NULL);
  if (!elem) {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
      "Could not create the %s element", elementname);
    return NULL;
  }

  g_object_set (elem,
    "close-socket", FALSE,
    "socket", socket,
    "auto-multicast", FALSE,
    NULL);

  if (!gst_bin_add (bin, elem)) {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
      "Could not add the %s element to the gst %s bin", elementname,
      (direction == GST_PAD_SINK) ? "sink" : "src");
    gst_object_unref (elem);
    return NULL;
  }

  if (direction == GST_PAD_SINK)
    *requested_pad = gst_element_get_request_pad (teefunnel, "src_%u");
  else
    *requested_pad = gst_element_get_request_pad (teefunnel, "sink_%u");

  if (!*requested_pad) {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
      "Could not get the %s request pad from the %s",
      (direction == GST_PAD_SINK) ? "src" : "sink",
      (direction == GST_PAD_SINK) ? "tee" : "funnel");
    goto error;
  }

  if (direction == GST_PAD_SINK)
    elempad = gst_element_get_static_pad (elem, "sink");
  else
    elempad = gst_element_get_static_pad (elem, "src");

  if (direction != GST_PAD_SINK)
    ret = gst_pad_link (elempad, *requested_pad);

  if (GST_PAD_LINK_FAILED(ret))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
      "Could not link the new element %s (%d)", elementname, ret);
    goto error;
  }

  if (!gst_element_sync_state_with_parent (elem)) {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
      "Could not sync the state of the new %s with its parent",
      elementname);
    goto error;
  }

  if (direction == GST_PAD_SINK)
    ret = gst_pad_link (*requested_pad, elempad);

  if (GST_PAD_LINK_FAILED(ret))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the new element %s (%d)", elementname, ret);
    goto error;
  }

  gst_object_unref (elempad);

  return elem;

 error:

  gst_element_set_locked_state (elem, TRUE);
  state_ret = gst_element_set_state (elem, GST_STATE_NULL);
  if (state_ret != GST_STATE_CHANGE_SUCCESS)
    GST_ERROR ("On error, could not reset %s to state NULL (%s)", elementname,
      gst_element_state_change_return_get_name (state_ret));
  if (!gst_bin_remove (bin, elem))
    GST_ERROR ("Could not remove element %s from bin on error", elementname);

  if (elempad)
    gst_object_unref (elempad);

  return NULL;
}

static UdpSock *
fs_multicast_transmitter_get_udpsock_locked (FsMulticastTransmitter *trans,
    guint component_id,
    const gchar *local_ip,
    const gchar *multicast_ip,
    guint16 port,
    guint8 ttl,
    gboolean sending,
    GError **error)
{
  UdpSock *udpsock;
  GList *udpsock_e;

  for (udpsock_e = g_list_first (trans->priv->udpsocks[component_id]);
       udpsock_e;
       udpsock_e = g_list_next (udpsock_e))
  {
    udpsock = udpsock_e->data;

    if (port == udpsock->port &&
        !strcmp (multicast_ip, udpsock->multicast_ip) &&
        ((local_ip == NULL && udpsock->local_ip == NULL) ||
            (local_ip && udpsock->local_ip &&
                !strcmp (local_ip, udpsock->local_ip))))
    {
      if (ttl > udpsock->current_ttl)
      {

        if (setsockopt (udpsock->fd, IPPROTO_IP, IP_MULTICAST_TTL,
                (const void *)&ttl, sizeof (ttl)) < 0)
        {
          g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
              "Error setting the multicast TTL: %s",
              g_strerror (errno));
          return NULL;
        }
        udpsock->current_ttl = ttl;
      }
      g_byte_array_append (udpsock->ttls, &ttl, 1);

      return udpsock;
    }
  }
  return NULL;
}

UdpSock *
fs_multicast_transmitter_get_udpsock (FsMulticastTransmitter *trans,
    guint component_id,
    const gchar *local_ip,
    const gchar *multicast_ip,
    guint16 port,
    guint8 ttl,
    gboolean sending,
    GError **error)
{
  UdpSock *udpsock;
  UdpSock *tmpudpsock;
  GError *local_error = NULL;
  int tos;

  /* First lets check if we already have one */
  if (component_id > trans->components)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
      "Invalid component %d > %d", component_id, trans->components);
    return NULL;
  }

  FS_MULTICAST_TRANSMITTER_LOCK (trans);
  udpsock = fs_multicast_transmitter_get_udpsock_locked (trans, component_id,
      local_ip, multicast_ip, port, ttl, sending, &local_error);
  tos = trans->priv->type_of_service;
  FS_MULTICAST_TRANSMITTER_UNLOCK (trans);

  if (local_error)
  {
    g_propagate_error (error, local_error);
    return NULL;
  }

  if (udpsock)
  {
    if (sending)
      fs_multicast_transmitter_udpsock_inc_sending (udpsock);
    return udpsock;
  }

  udpsock = g_slice_new0 (UdpSock);

  udpsock->local_ip = g_strdup (local_ip);
  udpsock->multicast_ip = g_strdup (multicast_ip);
  udpsock->fd = -1;
  udpsock->component_id = component_id;
  udpsock->port = port;
  udpsock->current_ttl = ttl;
  udpsock->ttls = g_byte_array_new ();
  g_byte_array_append (udpsock->ttls, &ttl, 1);

  /* Now lets bind both ports */

  udpsock->fd = _bind_port (local_ip, multicast_ip, port, ttl, tos, error);
  if (udpsock->fd < 0)
    goto error;

  udpsock->socket = g_socket_new_from_fd (udpsock->fd, error);
  if (!udpsock->socket)
    goto error;

  /* Now lets create the elements */

  udpsock->tee = trans->priv->udpsink_tees[component_id];
  udpsock->funnel = trans->priv->udpsrc_funnels[component_id];

  udpsock->udpsrc = _create_sinksource ("udpsrc",
      GST_BIN (trans->priv->gst_src), udpsock->funnel, udpsock->socket,
      GST_PAD_SRC, &udpsock->udpsrc_requested_pad, error);
  if (!udpsock->udpsrc)
    goto error;

  udpsock->udpsink = _create_sinksource ("multiudpsink",
      GST_BIN (trans->priv->gst_sink), udpsock->tee,
      udpsock->socket, GST_PAD_SINK, &udpsock->udpsink_requested_pad, error);
  if (!udpsock->udpsink)
    goto error;

  g_object_set (udpsock->udpsink,
      "async", FALSE,
      "sync", FALSE,
      NULL);

  FS_MULTICAST_TRANSMITTER_LOCK (trans);
  /* Check if someone else has added the same thing at the same time */
  tmpudpsock = fs_multicast_transmitter_get_udpsock_locked (trans, component_id,
      local_ip, multicast_ip, port, ttl, sending, &local_error);

  if (tmpudpsock || local_error)
  {
    FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
    fs_multicast_transmitter_put_udpsock (trans, udpsock, ttl);
    if (local_error)
    {
      g_propagate_error (error, local_error);
      goto error;
    }
    if (sending)
      fs_multicast_transmitter_udpsock_inc_sending (udpsock);
    return tmpudpsock;
  }

  trans->priv->udpsocks[component_id] =
    g_list_prepend (trans->priv->udpsocks[component_id], udpsock);
  FS_MULTICAST_TRANSMITTER_UNLOCK (trans);

  if (sending)
    fs_multicast_transmitter_udpsock_inc_sending (udpsock);

  return udpsock;

 error:

  fs_multicast_transmitter_put_udpsock (trans, udpsock, ttl);

  return NULL;
}

void
fs_multicast_transmitter_put_udpsock (FsMulticastTransmitter *trans,
    UdpSock *udpsock, guint8 ttl)
{
  guint i;

  FS_MULTICAST_TRANSMITTER_LOCK (trans);
  for (i = udpsock->ttls->len - 1;; i--)
  {
    if (udpsock->ttls->data[i] == ttl)
    {
      g_byte_array_remove_index_fast (udpsock->ttls, i);
      break;
    }

    g_return_if_fail (i > 0);
  }

  if (udpsock->ttls->len > 0)
  {
    g_assert (udpsock->fd >= 0);

    /* If we were the max, check if there is a new max */
    if (udpsock->current_ttl == ttl && ttl > 1)
    {
      guint8 max = 1;
      for (i = 0; i < udpsock->ttls->len; i++)
      {
        if (udpsock->ttls->data[i] > max)
          max = udpsock->ttls->data[i];
      }

      if (max != udpsock->current_ttl)
      {

        if (setsockopt (udpsock->fd, IPPROTO_IP, IP_MULTICAST_TTL,
                (const void *)&max, sizeof (max)) < 0)
        {
          GST_WARNING ("Error setting the multicast TTL to %u: %s", max,
              g_strerror (errno));
          FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
          return;
        }
        udpsock->current_ttl = max;
      }
    }
    FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
    return;
  }

  trans->priv->udpsocks[udpsock->component_id] =
    g_list_remove (trans->priv->udpsocks[udpsock->component_id], udpsock);

  FS_MULTICAST_TRANSMITTER_UNLOCK (trans);

  if (udpsock->udpsrc)
  {
    GstStateChangeReturn ret;
    gst_element_set_locked_state (udpsock->udpsrc, TRUE);
    ret = gst_element_set_state (udpsock->udpsrc, GST_STATE_NULL);
    if (ret != GST_STATE_CHANGE_SUCCESS)
      GST_ERROR ("Error changing state of udpsrc: %s",
          gst_element_state_change_return_get_name (ret));
    if (!gst_bin_remove (GST_BIN (trans->priv->gst_src), udpsock->udpsrc))
      GST_ERROR ("Could not remove udpsrc element from transmitter source");
  }

  if (udpsock->udpsrc_requested_pad)
  {
    gst_element_release_request_pad (udpsock->funnel,
      udpsock->udpsrc_requested_pad);
    gst_object_unref (udpsock->udpsrc_requested_pad);
  }

  if (udpsock->udpsink_requested_pad)
  {
    gst_element_release_request_pad (udpsock->tee,
      udpsock->udpsink_requested_pad);
    gst_object_unref (udpsock->udpsink_requested_pad);
  }

  if (udpsock->udpsink)
  {
    GstStateChangeReturn ret;
    gst_element_set_locked_state (udpsock->udpsink, TRUE);
    ret = gst_element_set_state (udpsock->udpsink, GST_STATE_NULL);
    if (ret != GST_STATE_CHANGE_SUCCESS)
      GST_ERROR ("Error changing state of udpsink: %s",
          gst_element_state_change_return_get_name (ret));
    if (!gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpsock->udpsink))
      GST_ERROR ("Could not remove udpsink element from transmitter source");
  }

  if (udpsock->socket)
    g_object_unref (udpsock->socket);

  if (udpsock->fd >= 0)
    close (udpsock->fd);

  g_byte_array_free (udpsock->ttls, TRUE);
  g_free (udpsock->multicast_ip);
  g_free (udpsock->local_ip);
  g_slice_free (UdpSock, udpsock);
}

void
fs_multicast_transmitter_udpsock_inc_sending (UdpSock *udpsock)
{
  if (g_atomic_int_add (&udpsock->sendcount, 1) == 0)
  {
    g_signal_emit_by_name (udpsock->udpsink, "add", udpsock->multicast_ip,
        udpsock->port);

    gst_element_send_event (udpsock->udpsink,
        gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
            gst_structure_new ("GstForceKeyUnit",
                "all-headers", G_TYPE_BOOLEAN, TRUE,
                NULL)));
  }
}

void
fs_multicast_transmitter_udpsock_dec_sending (UdpSock *udpsock)
{
  if (g_atomic_int_dec_and_test (&udpsock->sendcount))
  {
    g_signal_emit_by_name (udpsock->udpsink, "remove", udpsock->multicast_ip,
        udpsock->port);
  }
}

static GType
fs_multicast_transmitter_get_stream_transmitter_type (
    FsTransmitter *transmitter)
{
  return FS_TYPE_MULTICAST_STREAM_TRANSMITTER;
}

void
fs_multicast_transmitter_udpsock_ref (FsMulticastTransmitter *trans,
    UdpSock *udpsock, guint8 ttl)
{
  FS_MULTICAST_TRANSMITTER_LOCK (trans);
  g_byte_array_append (udpsock->ttls, &ttl, 1);
  FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
}


static void
fs_multicast_transmitter_set_type_of_service (FsMulticastTransmitter *self,
    gint tos)
{
  gint i;

  FS_MULTICAST_TRANSMITTER_LOCK (self);
  if (self->priv->type_of_service == tos)
    goto out;

  self->priv->type_of_service = tos;

  for (i = 0; i < self->components; i++)
  {
    GList *item;

    for (item = self->priv->udpsocks[i]; item; item = item->next)
    {
      UdpSock *udpsock = item->data;

      if (setsockopt (udpsock->fd, IPPROTO_IP, IP_TOS,
              &tos, sizeof (tos)) < 0)
        GST_WARNING ( "could not set socket tos: %s", g_strerror (errno));

#ifdef IPV6_TCLASS
      if (setsockopt (udpsock->fd, IPPROTO_IPV6, IPV6_TCLASS,
              &tos, sizeof (tos)) < 0)
        GST_WARNING ("could not set TCLASS: %s", g_strerror (errno));
#endif
    }
  }

 out:
  FS_MULTICAST_TRANSMITTER_UNLOCK (self);
}