/* Blob Blame History Raw */
/*
 * Farstream - Farstream RAW UDP with STUN Transmitter
 *
 * Copyright 2007 Collabora Ltd.
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 * Copyright 2007 Nokia Corp.
 *
 * fs-rawudp-transmitter.h - A Farstream UDP transmitter with STUN
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */

/**
 * SECTION:fs-rawudp-transmitter
 * @short_description: A transmitter for raw udp (with STUN)
 *
 * This transmitter provides RAW udp (with stun)
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "fs-rawudp-transmitter.h"
#include "fs-rawudp-stream-transmitter.h"

#include <farstream/fs-conference.h>
#include <farstream/fs-plugin.h>

#include <gio/gio.h>

#include <string.h>
#include <sys/types.h>

#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif

GST_DEBUG_CATEGORY (fs_rawudp_transmitter_debug);
#define GST_CAT_DEFAULT fs_rawudp_transmitter_debug

/* Signals: this class declares no signal ids of its own, so LAST_SIGNAL is
 * the only enumerator (value 0). */
enum
{
  LAST_SIGNAL
};

/* Property ids handled by fs_rawudp_transmitter_get_property() and
 * fs_rawudp_transmitter_set_property(). PROP_0 takes the value 0 so that the
 * real ids start at 1. */
enum
{
  PROP_0,
  PROP_GST_SINK,
  PROP_GST_SRC,
  PROP_COMPONENTS,
  PROP_TYPE_OF_SERVICE,
  PROP_DO_TIMESTAMP
};

/* Private instance data of the raw UDP transmitter */
struct _FsRawUdpTransmitterPrivate
{
  /* We hold references to this element */
  /* Both are bins created in the constructed handler */
  GstElement *gst_sink;
  GstElement *gst_src;

  /* We don't hold a reference to these elements, they are owned
     by the bins */
  /* They are tables of pointers, one per component */
  /* The tables have components+1 entries; index 0 is unused so that the
     index is the component id */
  GstElement **udpsrc_funnels;
  GstElement **udpsink_tees;

  GMutex mutex;
  /* Protected by the mutex */
  /* Table (indexed by component id) of lists of UdpPort */
  GList **udpports;

  /* Value applied with setsockopt() (IP_TOS / IPV6_TCLASS) to the sockets;
     read and written with the mutex held */
  gint type_of_service;
  /* Value given to the "do-timestamp" property of new udpsrc elements */
  gboolean do_timestamp;

  /* Set once dispose has run, to make it a no-op the second time */
  gboolean disposed;
};

/* Looks up the FsRawUdpTransmitterPrivate area of instance @o; used once in
 * the instance init function to fill in self->priv */
#define FS_RAWUDP_TRANSMITTER_GET_PRIVATE(o)                            \
  (G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_RAWUDP_TRANSMITTER,        \
      FsRawUdpTransmitterPrivate))

/* Forward declarations of the GObject type machinery and vfuncs */
static void fs_rawudp_transmitter_class_init (FsRawUdpTransmitterClass *klass);
static void fs_rawudp_transmitter_init (FsRawUdpTransmitter *self);
static void fs_rawudp_transmitter_constructed (GObject *object);
static void fs_rawudp_transmitter_dispose (GObject *object);
static void fs_rawudp_transmitter_finalize (GObject *object);

/* GObject property accessors */
static void fs_rawudp_transmitter_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec);
static void fs_rawudp_transmitter_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec);

/* FsTransmitter vfunc implementations */
static FsStreamTransmitter *fs_rawudp_transmitter_new_stream_transmitter (
    FsTransmitter *transmitter,
    FsParticipant *participant,
    guint n_parameters,
    GParameter *parameters,
    GError **error);
static GType fs_rawudp_transmitter_get_stream_transmitter_type (
    FsTransmitter *transmitter);

/* Setter behind the "tos" property */
static void fs_rawudp_transmitter_set_type_of_service (
    FsRawUdpTransmitter *self,
    gint tos);

/* Parent class, stored by class_init so that dispose and finalize can
 * chain up */
static GObjectClass *parent_class = NULL;
//static guint signals[LAST_SIGNAL] = { 0 };


/*
 * Let's register the plugin
 */

/* GType of FsRawUdpTransmitter; stays 0 until
 * fs_rawudp_transmitter_register_type() has registered it */
static GType type = 0;

/*
 * Returns the GType of FsRawUdpTransmitter.
 *
 * The type is not registered lazily here: it must already have been
 * registered by fs_rawudp_transmitter_register_type(), otherwise the
 * assertion below fails.
 */
GType
fs_rawudp_transmitter_get_type (void)
{
  g_assert (type);
  return type;
}

/*
 * Registers the FsRawUdpTransmitter type (and, before it, the matching
 * stream transmitter type) and initialises the debug category.
 *
 * @module: the plugin being loaded, forwarded to the stream transmitter
 *   registration
 *
 * Returns: the newly registered GType, which is also stored in the
 * file-level "type" variable.
 */
static GType
fs_rawudp_transmitter_register_type (FsPlugin *module)
{
  /* Members left out here are zero/NULL */
  static const GTypeInfo transmitter_type_info = {
    .class_size = sizeof (FsRawUdpTransmitterClass),
    .class_init = (GClassInitFunc) fs_rawudp_transmitter_class_init,
    .instance_size = sizeof (FsRawUdpTransmitter),
    .n_preallocs = 0,
    .instance_init = (GInstanceInitFunc) fs_rawudp_transmitter_init
  };

  GST_DEBUG_CATEGORY_INIT (fs_rawudp_transmitter_debug,
      "fsrawudptransmitter", 0,
      "Farstream raw UDP transmitter");

  fs_rawudp_stream_transmitter_register_type (module);

  type = g_type_register_static (FS_TYPE_TRANSMITTER,
      "FsRawUdpTransmitter", &transmitter_type_info, 0);

  return type;
}


/* NOTE(review): FS_INIT_PLUGIN comes from fs-plugin.h (not visible here);
 * presumably it generates the plugin entry point that ends up calling
 * fs_rawudp_transmitter_register_type() -- confirm against the header. */
FS_INIT_PLUGIN (rawudp, transmitter)

/*
 * Class initialiser: remembers the parent class, reserves the private
 * instance area, installs the GObject and FsTransmitter virtual methods and
 * overrides the properties this class handles.
 */
static void
fs_rawudp_transmitter_class_init (FsRawUdpTransmitterClass *klass)
{
  GObjectClass *object_class = (GObjectClass *) klass;
  FsTransmitterClass *trans_class = FS_TRANSMITTER_CLASS (klass);

  parent_class = g_type_class_peek_parent (klass);

  g_type_class_add_private (klass, sizeof (FsRawUdpTransmitterPrivate));

  /* GObject life cycle */
  object_class->constructed = fs_rawudp_transmitter_constructed;
  object_class->dispose = fs_rawudp_transmitter_dispose;
  object_class->finalize = fs_rawudp_transmitter_finalize;

  /* GObject property accessors */
  object_class->get_property = fs_rawudp_transmitter_get_property;
  object_class->set_property = fs_rawudp_transmitter_set_property;

  /* FsTransmitter virtual methods */
  trans_class->new_stream_transmitter =
    fs_rawudp_transmitter_new_stream_transmitter;
  trans_class->get_stream_transmitter_type =
    fs_rawudp_transmitter_get_stream_transmitter_type;

  g_object_class_override_property (object_class, PROP_GST_SRC, "gst-src");
  g_object_class_override_property (object_class, PROP_GST_SINK, "gst-sink");
  g_object_class_override_property (object_class, PROP_COMPONENTS,
      "components");
  g_object_class_override_property (object_class, PROP_TYPE_OF_SERVICE,
      "tos");
  g_object_class_override_property (object_class, PROP_DO_TIMESTAMP,
      "do-timestamp");
}

/*
 * Instance initialiser: hooks up the private area and sets the defaults
 * (two components, timestamping enabled, not disposed).
 */
static void
fs_rawudp_transmitter_init (FsRawUdpTransmitter *self)
{
  FsRawUdpTransmitterPrivate *priv = FS_RAWUDP_TRANSMITTER_GET_PRIVATE (self);

  self->priv = priv;
  self->components = 2;

  priv->disposed = FALSE;
  priv->do_timestamp = TRUE;
  g_mutex_init (&priv->mutex);
}

/*
 * GObject "constructed" handler: builds the two bins of the transmitter.
 *
 * It allocates the per-component tables (components+1 entries, index 0
 * unused), creates the source and sink bins, and then for every component
 * id c in 1..components:
 *  - adds a funnel to the source bin and exposes its "src" pad as the
 *    ghost pad "src_<c>" of the bin;
 *  - adds a tee to the sink bin and exposes its "sink" pad as the ghost pad
 *    "sink_<c>" of the bin;
 *  - adds a fakesink (async=FALSE, sync=FALSE) to the sink bin and links it
 *    to a request pad of the tee.
 *
 * On any failure the error is stored in the construction_error field of the
 * FsTransmitter and the function returns right away, without chaining up to
 * the parent's constructed handler.
 */
static void
fs_rawudp_transmitter_constructed (GObject *object)
{
  FsRawUdpTransmitter *self = FS_RAWUDP_TRANSMITTER_CAST (object);
  FsTransmitter *trans = FS_TRANSMITTER_CAST (self);
  GstPad *pad = NULL, *pad2 = NULL;
  GstPad *ghostpad = NULL;
  gchar *padname;
  GstPadLinkReturn ret;
  int c; /* component_id */


  /* We waste one space in order to have the index be the component_id */
  self->priv->udpsrc_funnels = g_new0 (GstElement *, self->components+1);
  self->priv->udpsink_tees = g_new0 (GstElement *, self->components+1);
  self->priv->udpports = g_new0 (GList *, self->components+1);

  /* First we need the src element */

  self->priv->gst_src = gst_bin_new (NULL);

  if (!self->priv->gst_src)
  {
    trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not build the transmitter src bin");
    return;
  }

  gst_object_ref (self->priv->gst_src);


  /* Second, we do the sink element */

  self->priv->gst_sink = gst_bin_new (NULL);

  if (!self->priv->gst_sink)
  {
    trans->construction_error = g_error_new (FS_ERROR,
        FS_ERROR_CONSTRUCTION,
        "Could not build the transmitter sink bin");
    return;
  }

  g_object_set (G_OBJECT (self->priv->gst_sink),
      "async-handling", TRUE,
      NULL);

  gst_object_ref (self->priv->gst_sink);

  for (c = 1; c <= self->components; c++)
  {
    GstElement *fakesink = NULL;

    /* Lets create the RTP source funnel */

    self->priv->udpsrc_funnels[c] = gst_element_factory_make ("funnel", NULL);

    if (!self->priv->udpsrc_funnels[c])
    {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not make the funnel element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_src),
            self->priv->udpsrc_funnels[c]))
    {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not add the funnel element to the transmitter src bin");
      /* The funnel is not in the bin: stop here instead of building pads on
       * it, and forget the pointer so nobody uses it later.
       * NOTE(review): whether the element still needs an unref after a
       * failed gst_bin_add() depends on the GStreamer version -- confirm. */
      self->priv->udpsrc_funnels[c] = NULL;
      return;
    }

    pad = gst_element_get_static_pad (self->priv->udpsrc_funnels[c], "src");
    padname = g_strdup_printf ("src_%u", c);
    ghostpad = gst_ghost_pad_new (padname, pad);
    g_free (padname);
    gst_object_unref (pad);

    gst_pad_set_active (ghostpad, TRUE);
    gst_element_add_pad (self->priv->gst_src, ghostpad);


    /* Lets create the RTP sink tee */

    self->priv->udpsink_tees[c] = gst_element_factory_make ("tee", NULL);

    if (!self->priv->udpsink_tees[c])
    {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not make the tee element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_sink),
            self->priv->udpsink_tees[c]))
    {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not add the tee element to the transmitter sink bin");
      /* Same reasoning as for the funnel above */
      self->priv->udpsink_tees[c] = NULL;
      return;
    }

    pad = gst_element_get_static_pad (self->priv->udpsink_tees[c], "sink");
    padname = g_strdup_printf ("sink_%u", c);
    ghostpad = gst_ghost_pad_new (padname, pad);
    g_free (padname);
    gst_object_unref (pad);

    gst_pad_set_active (ghostpad, TRUE);
    gst_element_add_pad (self->priv->gst_sink, ghostpad);

    /* Link a fakesink to a request pad of the tee */
    fakesink = gst_element_factory_make ("fakesink", NULL);

    if (!fakesink)
    {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not make the fakesink element");
      return;
    }

    if (!gst_bin_add (GST_BIN (self->priv->gst_sink), fakesink))
    {
      gst_object_unref (fakesink);
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not add the fakesink element to the transmitter sink bin");
      return;
    }

    g_object_set (fakesink,
        "async", FALSE,
        "sync", FALSE,
        NULL);

    pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src_%u");
    pad2 = gst_element_get_static_pad (fakesink, "sink");

    ret = gst_pad_link (pad, pad2);

    gst_object_unref (pad2);
    gst_object_unref (pad);

    if (GST_PAD_LINK_FAILED(ret))
    {
      trans->construction_error = g_error_new (FS_ERROR,
          FS_ERROR_CONSTRUCTION,
          "Could not link the tee to the fakesink");
      return;
    }
  }

  GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}

/*
 * GObject dispose handler: drops the references held on the source and sink
 * bins, then chains up. The "disposed" flag turns any later call into a
 * no-op.
 */
static void
fs_rawudp_transmitter_dispose (GObject *object)
{
  FsRawUdpTransmitter *self = FS_RAWUDP_TRANSMITTER (object);
  FsRawUdpTransmitterPrivate *priv = self->priv;

  /* Nothing to do when dispose already ran */
  if (priv->disposed)
    return;

  if (priv->gst_src != NULL)
  {
    gst_object_unref (priv->gst_src);
    priv->gst_src = NULL;
  }

  if (priv->gst_sink != NULL)
  {
    gst_object_unref (priv->gst_sink);
    priv->gst_sink = NULL;
  }

  /* Remember that we ran, so a second dispose does nothing */
  priv->disposed = TRUE;

  parent_class->dispose (object);
}

/*
 * GObject finalize handler: frees the three per-component tables (only the
 * tables themselves, not what they point to), clears the mutex and chains
 * up.
 */
static void
fs_rawudp_transmitter_finalize (GObject *object)
{
  FsRawUdpTransmitter *self = FS_RAWUDP_TRANSMITTER (object);
  FsRawUdpTransmitterPrivate *priv = self->priv;

  /* g_free() accepts NULL, so no guard is needed around these */
  g_free (priv->udpsrc_funnels);
  priv->udpsrc_funnels = NULL;

  g_free (priv->udpsink_tees);
  priv->udpsink_tees = NULL;

  g_free (priv->udpports);
  priv->udpports = NULL;

  g_mutex_clear (&priv->mutex);

  parent_class->finalize (object);
}

/*
 * GObject get_property handler. The type of service is read with the
 * transmitter mutex held; the other values are read directly.
 */
static void
fs_rawudp_transmitter_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec)
{
  FsRawUdpTransmitter *self = FS_RAWUDP_TRANSMITTER (object);
  FsRawUdpTransmitterPrivate *priv = self->priv;

  switch (prop_id)
  {
    case PROP_GST_SRC:
      g_value_set_object (value, priv->gst_src);
      break;
    case PROP_GST_SINK:
      g_value_set_object (value, priv->gst_sink);
      break;
    case PROP_COMPONENTS:
      g_value_set_uint (value, self->components);
      break;
    case PROP_DO_TIMESTAMP:
      g_value_set_boolean (value, priv->do_timestamp);
      break;
    case PROP_TYPE_OF_SERVICE:
    {
      gint tos;

      /* Take a snapshot under the lock, publish it afterwards */
      g_mutex_lock (&priv->mutex);
      tos = priv->type_of_service;
      g_mutex_unlock (&priv->mutex);

      g_value_set_uint (value, tos);
      break;
    }
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/*
 * GObject set_property handler for the writable properties: the number of
 * components, the type of service (delegated to
 * fs_rawudp_transmitter_set_type_of_service()) and the do-timestamp flag.
 */
static void
fs_rawudp_transmitter_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec)
{
  FsRawUdpTransmitter *self = FS_RAWUDP_TRANSMITTER (object);

  switch (prop_id)
  {
    case PROP_DO_TIMESTAMP:
      self->priv->do_timestamp = g_value_get_boolean (value);
      break;
    case PROP_TYPE_OF_SERVICE:
    {
      const guint new_tos = g_value_get_uint (value);

      fs_rawudp_transmitter_set_type_of_service (self, new_tos);
      break;
    }
    case PROP_COMPONENTS:
      self->components = g_value_get_uint (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}


/**
 * fs_rawudp_transmitter_new_stream_transmitter:
 * @transmitter: a #FsTransmitter, which must be a #FsRawUdpTransmitter
 * @participant: the #FsParticipant for which the #FsStream using this
 * new #FsStreamTransmitter is created (not used by this implementation)
 * @n_parameters: the number of entries in @parameters
 * @parameters: construction parameters handed to the new stream transmitter
 * @error: location of a #GError, or %NULL
 *
 * This function will create a new #FsStreamTransmitter element for this
 * #FsRawUdpTransmitter by calling fs_rawudp_stream_transmitter_newv().
 *
 * Returns: a new #FsStreamTransmitter
 */

static FsStreamTransmitter *
fs_rawudp_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
    FsParticipant *participant,
    guint n_parameters,
    GParameter *parameters,
    GError **error)
{
  FsRawUdpTransmitter *self = FS_RAWUDP_TRANSMITTER (transmitter);
  gpointer stream_transmitter;

  stream_transmitter = fs_rawudp_stream_transmitter_newv (self,
      n_parameters, parameters, error);

  return FS_STREAM_TRANSMITTER (stream_transmitter);
}


/*
 * The UdpPort structure is a ref-counted pseudo-object used to represent
 * one ip:port combo on which we listen and send, so it includes a udpsrc
 * and a multiudpsink
 */

struct _UdpPort {
  /* Protected by the transmitter mutex */
  gint refcount;

  /* The udpsrc element and the request pad of the funnel it is linked to */
  GstElement *udpsrc;
  GstPad *udpsrc_requested_pad;

  /* The multiudpsink element and the request pad of the tee it is linked to */
  GstElement *udpsink;
  GstPad *udpsink_requested_pad;

  /* What the caller asked for; together they are the key used to find an
   * existing UdpPort (requested_ip may be NULL) */
  gchar *requested_ip;
  guint requested_port;

  /* The port that was actually bound, which can differ from requested_port */
  guint port;

  /* The bound socket, given to both the udpsrc and the udpsink */
  GSocket *socket;

  /* These are just convenience pointers to our parent transmitter */
  GstElement *funnel;
  GstElement *tee;

  guint component_id;

  /* Everything below is protected by the mutex */
  GMutex mutex;
  /* Zero-terminated array of struct KnownAddress */
  GArray *known_addresses;
};

/* One entry of UdpPort::known_addresses: an address registered through
 * fs_rawudp_transmitter_udpport_add_known_address() */
struct KnownAddress {
  /* Called with the new uniqueness state when it changes for this address */
  FsRawUdpAddressUniqueCallbackFunc callback;
  /* Passed back to the callback; with the callback it identifies the entry */
  gpointer user_data;
  /* The entry holds a reference on the address */
  GSocketAddress *addr;
};

/*
 * Creates a UDP socket and binds it to @ip, starting at @port.
 *
 * @ip: textual IP address to bind to, or NULL to bind to the IPv4 "any"
 *   address
 * @port: first port to try; when binding fails the port is increased by 2
 *   and tried again, until it exceeds 65535
 * @used_port: (out) the port that was finally bound
 * @tos: value set as IP_TOS (and IPV6_TCLASS where available) on the
 *   socket; a failure to set it is only logged
 * @error: location of a #GError, or NULL
 *
 * Returns: the bound socket, or NULL (with @error set) if @ip is invalid,
 * the socket could not be created or no port could be bound.
 */
static GSocket *
_bind_port (
    const gchar *ip,
    guint port,
    guint *used_port,
    int tos,
    GError **error)
{
  GSocketAddress *socket_addr;
  GInetAddress *addr;
  GSocket *socket;
  int fd;

  if (ip)
  {
    addr = g_inet_address_new_from_string (ip);
    if (!addr) {
      g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
          "Invalid IP address %s passed", ip);
      return NULL;
    }
  }
  else
  {
    addr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
  }

  socket = g_socket_new (g_inet_address_get_family (addr),
      G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, error);
  if (!socket)
  {
    /* Do not leak the address on this early exit */
    g_object_unref (addr);
    return NULL;
  }

  for (;;) {
    socket_addr = g_inet_socket_address_new (addr, port);

    if (g_socket_bind (socket, socket_addr, FALSE, NULL))
      break;

    g_object_unref (socket_addr);

    GST_INFO ("could not bind port %u", port);
    port += 2;
    if (port > 65535)
    {
      g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
          "Could not bind the socket to a port");
      g_socket_close (socket, NULL);
      g_object_unref (socket);
      g_object_unref (addr);
      return NULL;
    }
  }

  g_object_unref (socket_addr);
  g_object_unref (addr);

  *used_port = port;

  fd = g_socket_get_fd (socket);

  /* Best effort: the socket stays usable when these options cannot be set */
  if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
    GST_WARNING ("could not set socket ToS: %s", g_strerror (errno));

#ifdef IPV6_TCLASS
  if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
    GST_WARNING ("could not set TCLASS: %s", g_strerror (errno));
#endif

  return socket;
}

static GstElement *
_create_sinksource (
    gchar *elementname,
    GstBin *bin,
    GstElement *teefunnel,
    GstElement *filter,
    GSocket *socket,
    GstPadDirection direction,
    gboolean do_timestamp,
    GstPad **requested_pad,
    GError **error)
{
  GstElement *elem;
  GstPadLinkReturn ret = GST_PAD_LINK_OK;
  GstPad *elempad = NULL;
  GstStateChangeReturn state_ret;

  g_assert (direction == GST_PAD_SINK || direction == GST_PAD_SRC);

  elem = gst_element_factory_make (elementname, NULL);
  if (!elem)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not create the %s element", elementname);
    return NULL;
  }

  g_object_set (elem,
      "auto-multicast", FALSE,
      "close-socket", FALSE,
      "socket", socket,
      NULL);

  if (direction == GST_PAD_SINK)
    g_object_set (elem,
        "async", FALSE,
        "sync", FALSE,
        NULL);
  else
    g_object_set (elem,
        "do-timestamp", do_timestamp,
        NULL);

  if (!gst_bin_add (bin, elem))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not add the %s element to the gst %s bin", elementname,
        (direction == GST_PAD_SINK) ? "sink" : "src");
    gst_object_unref (elem);
    return NULL;
  }

  if (direction == GST_PAD_SINK)
    *requested_pad = gst_element_get_request_pad (teefunnel, "src_%u");
  else
    *requested_pad = gst_element_get_request_pad (teefunnel, "sink_%u");

  if (!*requested_pad)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not get the %s request pad from the %s",
        (direction == GST_PAD_SINK) ? "src" : "sink",
        (direction == GST_PAD_SINK) ? "tee" : "funnel");
    goto error;
  }

  if (direction == GST_PAD_SINK)
    elempad = gst_element_get_static_pad (elem, "sink");
  else
    elempad = gst_element_get_static_pad (elem, "src");

  if (filter)
  {
    GstPad *filterpad = NULL;

    if (!gst_bin_add (bin, filter))
    {
      g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
          "Could not add the filter element to the gst %s bin",
          (direction == GST_PAD_SINK) ? "sink" : "src");
      goto error;
    }

    if (direction == GST_PAD_SINK)
      filterpad = gst_element_get_static_pad (filter, "src");
    else
      filterpad = gst_element_get_static_pad (filter, "sink");

    if (direction == GST_PAD_SINK)
      ret = gst_pad_link (filterpad, elempad);
    else
      ret = gst_pad_link (elempad, filterpad);

    gst_object_unref (elempad);
    gst_object_unref (filterpad);
    elempad = NULL;

    if (GST_PAD_LINK_FAILED(ret))
    {
      g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
          "Could not link the new element %s (%d)", elementname, ret);
      goto error;
    }

    if (direction == GST_PAD_SINK)
      elempad = gst_element_get_static_pad (filter, "sink");
    else
      elempad = gst_element_get_static_pad (filter, "src");


    if (!gst_element_sync_state_with_parent (filter))
    {
      g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
          "Could not sync the state of the new filte rwith its parent");
      goto error;
    }
  }

  if (direction != GST_PAD_SINK)
    ret = gst_pad_link (elempad, *requested_pad);

  if (GST_PAD_LINK_FAILED(ret))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the new element %s (%d)", elementname, ret);
    goto error;
  }

  if (!gst_element_sync_state_with_parent (elem))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not sync the state of the new %s with its parent",
        elementname);
    goto error;
  }

  if (direction == GST_PAD_SINK)
    ret = gst_pad_link (*requested_pad, elempad);

  if (GST_PAD_LINK_FAILED(ret))
  {
    g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
        "Could not link the new element %s (%d)", elementname, ret);
    goto error;
  }

  gst_object_unref (elempad);

  return elem;

 error:

  gst_element_set_locked_state (elem, TRUE);
  state_ret = gst_element_set_state (elem, GST_STATE_NULL);
  if (state_ret != GST_STATE_CHANGE_SUCCESS)
    GST_ERROR ("On error, could not reset %s to state NULL (%s)", elementname,
        gst_element_state_change_return_get_name (state_ret));
  if (!gst_bin_remove (bin, elem))
    GST_ERROR ("Could not remove element %s from bin on error", elementname);

  if (elempad)
    gst_object_unref (elempad);

  return NULL;
}


/*
 * Looks for an existing UdpPort of @component_id that was created for the
 * same requested ip (both NULL, or equal strings) and the same requested
 * port. Must be called with the transmitter mutex held.
 *
 * Returns: the matching UdpPort with its refcount increased by one, or NULL
 * if there is none
 */
static UdpPort *
fs_rawudp_transmitter_get_udpport_locked (FsRawUdpTransmitter *trans,
    guint component_id,
    const gchar *requested_ip,
    guint requested_port)
{
  GList *item = g_list_first (trans->priv->udpports[component_id]);

  while (item != NULL)
  {
    UdpPort *candidate = item->data;
    gboolean same_ip;

    item = g_list_next (item);

    if (candidate->requested_port != requested_port)
      continue;

    /* Two unset ips match each other; an unset ip never matches a set one */
    if (requested_ip == NULL || candidate->requested_ip == NULL)
      same_ip = (requested_ip == NULL && candidate->requested_ip == NULL);
    else
      same_ip = (strcmp (requested_ip, candidate->requested_ip) == 0);

    if (!same_ip)
      continue;

    GST_LOG ("Got port refcount %d->%d", candidate->refcount,
        candidate->refcount+1);
    candidate->refcount++;
    return candidate;
  }

  return NULL;
}


/*
 * Returns a UdpPort for the given component and requested ip:port,
 * reusing an existing one when there is a match and creating (socket,
 * udpsrc and multiudpsink) a new one otherwise.
 *
 * @trans: the transmitter
 * @component_id: the component, from 1 to the number of components
 * @requested_ip: the ip to bind to, or NULL for any
 * @requested_port: the first port to try to bind
 * @error: location of a #GError, or NULL
 *
 * The transmitter mutex is not held while the socket and the elements are
 * created, so the list is searched a second time before the new port is
 * inserted; if an identical port appeared in the meantime, the new one is
 * thrown away and the existing one is returned.
 *
 * Returns: a UdpPort on which the caller holds one reference, to be given
 * back with fs_rawudp_transmitter_put_udpport(), or NULL with @error set
 */
UdpPort *
fs_rawudp_transmitter_get_udpport (FsRawUdpTransmitter *trans,
    guint component_id,
    const gchar *requested_ip,
    guint requested_port,
    GError **error)
{
  UdpPort *udpport;
  UdpPort *tmpudpport;
  int tos;

  /* Slot 0 of the per-component tables is never filled, so component 0 has
   * neither a tee nor a funnel */
  if (component_id == 0)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Invalid component 0, component ids start at 1");
    return NULL;
  }

  if (component_id > trans->components)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
        "Invalid component %d > %d", component_id, trans->components);
    return NULL;
  }

  /* First lets check if we already have one */
  g_mutex_lock (&trans->priv->mutex);
  udpport = fs_rawudp_transmitter_get_udpport_locked (trans, component_id,
      requested_ip, requested_port);
  tos = trans->priv->type_of_service;
  g_mutex_unlock (&trans->priv->mutex);

  if (udpport)
    return udpport;

  GST_DEBUG ("Make new UdpPort for component %u requesting %s:%u", component_id,
      requested_ip ? requested_ip : "ANY", requested_port);

  udpport = g_slice_new0 (UdpPort);

  udpport->refcount = 1;
  udpport->requested_ip = g_strdup (requested_ip);
  udpport->requested_port = requested_port;
  udpport->component_id = component_id;
  g_mutex_init (&udpport->mutex);
  udpport->known_addresses = g_array_new (TRUE, FALSE,
      sizeof (struct KnownAddress));

  /* Now lets bind the port */

  udpport->socket = _bind_port (requested_ip, requested_port, &udpport->port,
      tos, error);
  if (!udpport->socket)
    goto error;

  /* Now lets create the elements */

  udpport->tee = trans->priv->udpsink_tees[component_id];
  udpport->funnel = trans->priv->udpsrc_funnels[component_id];

  udpport->udpsrc = _create_sinksource ("udpsrc",
      GST_BIN (trans->priv->gst_src), udpport->funnel, NULL,
      udpport->socket, GST_PAD_SRC, trans->priv->do_timestamp,
      &udpport->udpsrc_requested_pad, error);
  if (!udpport->udpsrc)
    goto error;

  udpport->udpsink = _create_sinksource ("multiudpsink",
      GST_BIN (trans->priv->gst_sink), udpport->tee, NULL,
      udpport->socket, GST_PAD_SINK, FALSE, &udpport->udpsink_requested_pad,
      error);
  if (!udpport->udpsink)
    goto error;

  g_mutex_lock (&trans->priv->mutex);

  /* Check if someone else added the same port at the same time */
  tmpudpport = fs_rawudp_transmitter_get_udpport_locked (trans, component_id,
      requested_ip, requested_port);

  if (tmpudpport)
  {
    g_mutex_unlock (&trans->priv->mutex);
    /* Ours holds a single reference, so this tears it down completely */
    fs_rawudp_transmitter_put_udpport (trans, udpport);
    return tmpudpport;
  }

  trans->priv->udpports[component_id] =
    g_list_prepend (trans->priv->udpports[component_id], udpport);
  g_mutex_unlock (&trans->priv->mutex);

  return udpport;

 error:
  /* Releases whatever part of the port was already set up */
  fs_rawudp_transmitter_put_udpport (trans, udpport);
  return NULL;
}

/*
 * Gives back one reference on @udpport.
 *
 * While other references remain, only the refcount is decreased. When the
 * last one is dropped, the port is taken out of the transmitter's list and
 * torn down: the udpsrc is stopped and removed from the source bin, both
 * request pads are released, the udpsink is stopped and removed from the
 * sink bin, the socket is closed and the known addresses are freed.
 *
 * It also works on a partially built UdpPort (every member is checked
 * before use), which is how fs_rawudp_transmitter_get_udpport() cleans up
 * after a failure.
 */
void
fs_rawudp_transmitter_put_udpport (FsRawUdpTransmitter *trans,
  UdpPort *udpport)
{
  g_mutex_lock (&trans->priv->mutex);

  /* The refcount is protected by the transmitter mutex, so it is only read
   * (even for logging) once the mutex is held */
  GST_LOG ("Put port refcount %d->%d", udpport->refcount, udpport->refcount-1);

  if (udpport->refcount > 1)
  {
    udpport->refcount--;
    g_mutex_unlock (&trans->priv->mutex);
    return;
  }

  /* Last reference: from now on nobody can find this port anymore */
  trans->priv->udpports[udpport->component_id] =
    g_list_remove (trans->priv->udpports[udpport->component_id], udpport);

  g_mutex_unlock (&trans->priv->mutex);

  if (udpport->udpsrc)
  {
    GstStateChangeReturn ret;
    gst_element_set_locked_state (udpport->udpsrc, TRUE);
    ret = gst_element_set_state (udpport->udpsrc, GST_STATE_NULL);
    if (ret != GST_STATE_CHANGE_SUCCESS)
      GST_ERROR ("Error changing state of udpsrc: %s",
          gst_element_state_change_return_get_name (ret));
    if (!gst_bin_remove (GST_BIN (trans->priv->gst_src), udpport->udpsrc))
      GST_ERROR ("Could not remove udpsrc element from transmitter source");
  }

  if (udpport->udpsrc_requested_pad)
  {
    gst_element_release_request_pad (udpport->funnel,
        udpport->udpsrc_requested_pad);
    gst_object_unref (udpport->udpsrc_requested_pad);
  }

  if (udpport->udpsink_requested_pad)
  {
    gst_element_release_request_pad (udpport->tee,
        udpport->udpsink_requested_pad);
    gst_object_unref (udpport->udpsink_requested_pad);
  }

  if (udpport->udpsink)
  {
    GstStateChangeReturn ret;
    gst_element_set_locked_state (udpport->udpsink, TRUE);
    ret = gst_element_set_state (udpport->udpsink, GST_STATE_NULL);
    if (ret != GST_STATE_CHANGE_SUCCESS)
      GST_ERROR ("Error changing state of udpsink: %s",
          gst_element_state_change_return_get_name (ret));
    if (!gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpport->udpsink))
      GST_ERROR ("Could not remove udpsink element from transmitter sink");
  }

  /* The elements were created with close-socket=FALSE, so close it here */
  if (udpport->socket)
    g_socket_close (udpport->socket, NULL);
  g_clear_object (&udpport->socket);

  if (udpport->known_addresses)
  {
    guint i;
    for (i = 0; i < udpport->known_addresses->len; i++)
      g_object_unref (g_array_index (udpport->known_addresses,
              struct KnownAddress, i).addr);
    g_array_free (udpport->known_addresses, TRUE);
  }

  g_free (udpport->requested_ip);
  g_mutex_clear (&udpport->mutex);
  g_slice_free (UdpPort, udpport);
}

/*
 * Adds @ip:@port as a destination of the port's udpsink (through its "add"
 * action signal) and then sends a custom upstream "GstForceKeyUnit" event
 * with all-headers set to TRUE from the udpsink.
 */
void
fs_rawudp_transmitter_udpport_add_dest (UdpPort *udpport,
    const gchar *ip,
    gint port)
{
  GstStructure *force_key_unit;
  GstEvent *event;

  GST_DEBUG ("Adding dest %s:%d", ip, port);
  g_signal_emit_by_name (udpport->udpsink, "add", ip, port);

  force_key_unit = gst_structure_new ("GstForceKeyUnit",
      "all-headers", G_TYPE_BOOLEAN, TRUE,
      NULL);
  event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, force_key_unit);

  gst_element_send_event (udpport->udpsink, event);
}


/*
 * Removes @ip:@port from the destinations of the port's udpsink, through
 * its "remove" action signal.
 */
void
fs_rawudp_transmitter_udpport_remove_dest (UdpPort *udpport,
    const gchar *ip,
    gint port)
{
  GstElement *sink = udpport->udpsink;

  g_signal_emit_by_name (sink, "remove", ip, port);
}

/*
 * Sends @len bytes of @msg to the native socket address @to through the
 * socket of @udpport.
 *
 * @udpport: the port whose socket is used
 * @msg: the data to send
 * @len: the number of bytes in @msg
 * @to: the destination, as a native socket address
 * @tolen: the size of @to
 * @error: location of a #GError, or NULL
 *
 * Returns: %TRUE if the datagram was sent, %FALSE (with @error set) if the
 * address could not be converted or the send failed
 */
gboolean
fs_rawudp_transmitter_udpport_sendto (UdpPort *udpport,
    gchar *msg,
    size_t len,
    const struct sockaddr *to,
    socklen_t tolen,
    GError **error)
{
  GSocketAddress *addr;
  gssize sent;

  addr = g_socket_address_new_from_native ((gpointer) to, tolen);
  if (!addr)
  {
    g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
        "Could not convert the destination address to a GSocketAddress");
    return FALSE;
  }

  /* g_socket_send_to() returns the number of bytes sent, or -1 on error;
   * -1 must not be handed back as a (true) gboolean */
  sent = g_socket_send_to (udpport->socket, addr, msg, len, NULL, error);
  g_object_unref (addr);

  return sent >= 0;
}

/*
 * Installs @callback as a buffer probe on the "src" pad of the port's
 * udpsrc.
 *
 * Returns: the probe id, to be passed to
 * fs_rawudp_transmitter_udpport_disconnect_recv()
 */
gulong
fs_rawudp_transmitter_udpport_connect_recv (UdpPort *udpport,
    GstPadProbeCallback callback,
    gpointer user_data)
{
  GstPad *srcpad = gst_element_get_static_pad (udpport->udpsrc, "src");
  gulong probe_id = gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BUFFER,
      callback, user_data, NULL);

  gst_object_unref (srcpad);

  return probe_id;
}


/*
 * Removes the probe @id from the "src" pad of the port's udpsrc; @id is the
 * value returned by fs_rawudp_transmitter_udpport_connect_recv().
 */
void
fs_rawudp_transmitter_udpport_disconnect_recv (UdpPort *udpport,
    gulong id)
{
  GstPad *srcpad;

  srcpad = gst_element_get_static_pad (udpport->udpsrc, "src");
  gst_pad_remove_probe (srcpad, id);
  gst_object_unref (srcpad);
}

/*
 * Tells whether @pad is the "src" pad of the port's udpsrc.
 *
 * Returns: %TRUE if it is the same pad, %FALSE otherwise
 */
gboolean
fs_rawudp_transmitter_udpport_is_pad (UdpPort *udpport,
    GstPad *pad)
{
  GstPad *srcpad = gst_element_get_static_pad (udpport->udpsrc, "src");
  const gboolean is_same = (srcpad == pad);

  /* Only the pointer value was needed, the reference can go */
  gst_object_unref (srcpad);

  return is_same;
}


/*
 * Returns the port that @udpport is actually bound to (which is not
 * necessarily the port that was requested).
 */
gint
fs_rawudp_transmitter_udpport_get_port (UdpPort *udpport)
{
  const guint bound_port = udpport->port;

  return bound_port;
}


/*
 * FsTransmitter vfunc: returns the GType of the stream transmitters of this
 * transmitter. @transmitter is not used.
 */
static GType
fs_rawudp_transmitter_get_stream_transmitter_type (FsTransmitter *transmitter)
{
  const GType stream_transmitter_type = FS_TYPE_RAWUDP_STREAM_TRANSMITTER;

  return stream_transmitter_type;
}

/**
 * fs_rawudp_transmitter_udpport_add_known_address:
 * @udpport: a #UdpPort
 * @address: the new #GSocketAddress that we know
 * @callback: a Callback that will be called if the uniqueness of an address
 *   changes
 * @user_data: data passed back to the callback
 *
 * This function stores the passed address (taking a reference on it) and
 * tells the caller if it was unique or not. The callback is called when the
 * uniqueness changes.
 *
 * If exactly one entry with an equal address was already registered, that
 * entry stops being unique, so its callback is called with %FALSE. The
 * callback is invoked with the port's mutex held.
 *
 * Registering the same @callback / @user_data pair twice for the same
 * address is a programming error (it trips an assertion).
 *
 * Returns: %TRUE if the new address is unique, %FALSE otherwise
 */

gboolean
fs_rawudp_transmitter_udpport_add_known_address (UdpPort *udpport,
    GSocketAddress *address,
    FsRawUdpAddressUniqueCallbackFunc callback,
    gpointer user_data)
{
  guint i;
  gboolean unique = FALSE;
  struct KnownAddress newka = {0};
  guint counter = 0;
  struct KnownAddress *prev_ka = NULL;

  g_mutex_lock (&udpport->mutex);

  /* Walk the array by its length: relying on the zero terminator would stop
   * at the first entry that was registered with a NULL callback */
  for (i = 0; i < udpport->known_addresses->len; i++)
  {
    struct KnownAddress *ka = &g_array_index (udpport->known_addresses,
        struct KnownAddress, i);
    if (fs_g_inet_socket_address_equal (address, ka->addr))
    {
      g_assert (!(ka->callback == callback && ka->user_data == user_data));

      prev_ka = ka;
      counter++;
    }
  }

  if (counter == 0)
  {
    unique = TRUE;
  }
  else if (counter == 1)
  {
    /* The one existing entry just lost its uniqueness */
    if (prev_ka->callback)
      prev_ka->callback (FALSE, prev_ka->addr, prev_ka->user_data);
  }

  newka.addr = g_object_ref (address);
  newka.callback = callback;
  newka.user_data = user_data;

  g_array_append_val (udpport->known_addresses, newka);

  g_mutex_unlock (&udpport->mutex);

  return unique;
}

/**
 * fs_rawudp_transmitter_udpport_remove_known_address:
 * @udpport: a #UdpPort
 * @address: the address to remove
 * @callback: the callback passed to the corresponding
 *  fs_rawudp_transmitter_udpport_add_known_address() call
 * @user_data: the user_data passed to the corresponding
 *  fs_rawudp_transmitter_udpport_add_known_address() call
 *
 * Removes a known address from the list and calls the notifiers if another
 * address becomes unique.
 *
 * The entry to remove is the one with an equal address and the same
 * @callback and @user_data. If exactly one other entry with an equal
 * address remains, its callback is called with %TRUE (with the port's mutex
 * held). If no entry matches, an error is logged and nothing is changed.
 */

void
fs_rawudp_transmitter_udpport_remove_known_address (UdpPort *udpport,
    GSocketAddress *address,
    FsRawUdpAddressUniqueCallbackFunc callback,
    gpointer user_data)
{
  guint i;
  gint remove_i = -1;
  guint counter = 0;
  struct KnownAddress *prev_ka = NULL;

  g_mutex_lock (&udpport->mutex);

  /* Walk the array by its length: relying on the zero terminator would stop
   * at the first entry that was registered with a NULL callback */
  for (i = 0; i < udpport->known_addresses->len; i++)
  {
    struct KnownAddress *ka = &g_array_index (udpport->known_addresses,
        struct KnownAddress, i);
    if (fs_g_inet_socket_address_equal (address, ka->addr))
    {
      if (ka->callback == callback && ka->user_data == user_data)
      {
        remove_i = i;
      }
      else
      {
        counter++;
        prev_ka = ka;
      }
    }
  }

  if (remove_i == -1)
  {
    GST_ERROR ("Tried to remove unknown known address");
    goto out;
  }

  /* The single remaining entry for this address is unique again; notify it
   * before the array is modified, while prev_ka still points at it */
  if (counter == 1 && prev_ka->callback)
    prev_ka->callback (TRUE, prev_ka->addr, prev_ka->user_data);

  g_object_unref (g_array_index (udpport->known_addresses,
          struct KnownAddress, remove_i).addr);
  g_array_remove_index_fast (udpport->known_addresses, remove_i);

 out:

  g_mutex_unlock (&udpport->mutex);
}

/*
 * Stores the new type of service and applies it (IP_TOS, plus IPV6_TCLASS
 * where available) to the socket of every existing UdpPort of every
 * component. Nothing is done when the value does not change. A failure to
 * set the option on a socket is only logged.
 *
 * The whole operation runs with the transmitter mutex held.
 */
static void
fs_rawudp_transmitter_set_type_of_service (FsRawUdpTransmitter *self,
    gint tos)
{
  gint i;

  g_mutex_lock (&self->priv->mutex);
  if (self->priv->type_of_service == tos)
    goto out;

  self->priv->type_of_service = tos;

  /* The table is only allocated in the constructed handler; before that
   * there is no socket to update and the stored value is picked up when
   * ports get created */
  if (!self->priv->udpports)
    goto out;

  /* The table is indexed by component id: slot 0 is unused and the valid
   * slots are 1..components inclusive */
  for (i = 1; i <= self->components; i++)
  {
    GList *item;

    for (item = self->priv->udpports[i]; item; item = item->next)
    {
      UdpPort *udpport = item->data;
      int fd = g_socket_get_fd (udpport->socket);

      if (setsockopt (fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0)
        GST_WARNING ( "could not set socket ToS: %s", g_strerror (errno));

#ifdef IPV6_TCLASS
      if (setsockopt (fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, sizeof (tos)) < 0)
        GST_WARNING ("could not set TCLASS: %s", g_strerror (errno));
#endif
    }
  }

 out:
  g_mutex_unlock (&self->priv->mutex);
}


/* TEMPORARY: should be in Glib */
/*
 * Compares two socket addresses.
 *
 * Returns: %TRUE if both are inet socket addresses with the same port and
 * equal inet addresses, %FALSE otherwise (including when either of them is
 * not an inet socket address)
 */
gboolean
fs_g_inet_socket_address_equal (GSocketAddress *addr1, GSocketAddress *addr2)
{
  GInetSocketAddress *lhs;
  GInetSocketAddress *rhs;

  if (!G_IS_INET_SOCKET_ADDRESS (addr1))
    return FALSE;
  if (!G_IS_INET_SOCKET_ADDRESS (addr2))
    return FALSE;

  lhs = G_INET_SOCKET_ADDRESS (addr1);
  rhs = G_INET_SOCKET_ADDRESS (addr2);

  /* Ports are compared first; the addresses only matter when they agree */
  if (g_inet_socket_address_get_port (lhs) !=
      g_inet_socket_address_get_port (rhs))
    return FALSE;

  if (!g_inet_address_equal (g_inet_socket_address_get_address (lhs),
          g_inet_socket_address_get_address (rhs)))
    return FALSE;

  return TRUE;
}