/*
* Farstream - Farstream Multicast UDP Transmitter
*
* Copyright 2007-2008 Collabora Ltd.
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* Copyright 2007-2008 Nokia Corp.
*
* fs-multicast-transmitter.c - A Farstream multicast UDP transmitter
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/**
* SECTION:fs-multicast-transmitter
* @short_description: A transmitter for multicast UDP
*
* This transmitter provides multicast udp
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "fs-multicast-transmitter.h"
#include "fs-multicast-stream-transmitter.h"
#include <farstream/fs-conference.h>
#include <farstream/fs-plugin.h>
#include <string.h>
#include <sys/types.h>
#include <gio/gio.h>
#ifdef HAVE_UNISTD_H
# include <unistd.h>
#endif
#ifdef G_OS_WIN32
# include <ws2tcpip.h>
# define close closesocket
#else /*G_OS_WIN32*/
# include <sys/socket.h>
# include <netinet/in.h>
# include <arpa/inet.h>
#endif /*G_OS_WIN32*/
GST_DEBUG_CATEGORY (fs_multicast_transmitter_debug);
#define GST_CAT_DEFAULT fs_multicast_transmitter_debug
/* Signals */
enum
{
LAST_SIGNAL
};
/* props */
enum
{
PROP_0,
PROP_GST_SINK,
PROP_GST_SRC,
PROP_COMPONENTS,
PROP_TYPE_OF_SERVICE,
PROP_DO_TIMESTAMP
};
struct _FsMulticastTransmitterPrivate
{
/* We hold references to this element */
GstElement *gst_sink;
GstElement *gst_src;
/* We don't hold a reference to these elements, they are owned
by the bins */
/* They are tables of pointers, one per component */
GstElement **udpsrc_funnels;
GstElement **udpsink_tees;
GMutex mutex;
GList **udpsocks;
gint type_of_service;
gboolean do_timestamp;
gboolean disposed;
};
#define FS_MULTICAST_TRANSMITTER_GET_PRIVATE(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), FS_TYPE_MULTICAST_TRANSMITTER, \
FsMulticastTransmitterPrivate))
#define FS_MULTICAST_TRANSMITTER_LOCK(self) \
g_mutex_lock (&(self)->priv->mutex);
#define FS_MULTICAST_TRANSMITTER_UNLOCK(self) \
g_mutex_unlock (&(self)->priv->mutex);
static void fs_multicast_transmitter_class_init (
FsMulticastTransmitterClass *klass);
static void fs_multicast_transmitter_init (FsMulticastTransmitter *self);
static void fs_multicast_transmitter_constructed (GObject *object);
static void fs_multicast_transmitter_dispose (GObject *object);
static void fs_multicast_transmitter_finalize (GObject *object);
static void fs_multicast_transmitter_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec);
static void fs_multicast_transmitter_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec);
static FsStreamTransmitter *fs_multicast_transmitter_new_stream_transmitter (
FsTransmitter *transmitter, FsParticipant *participant,
guint n_parameters, GParameter *parameters, GError **error);
static GType fs_multicast_transmitter_get_stream_transmitter_type (
FsTransmitter *transmitter);
static void fs_multicast_transmitter_set_type_of_service (
FsMulticastTransmitter *self,
gint tos);
static GObjectClass *parent_class = NULL;
//static guint signals[LAST_SIGNAL] = { 0 };
/*
* Lets register the plugin
*/
static GType type = 0;
GType
fs_multicast_transmitter_get_type (void)
{
g_assert (type);
return type;
}
static GType
fs_multicast_transmitter_register_type (FsPlugin *module)
{
static const GTypeInfo info = {
sizeof (FsMulticastTransmitterClass),
NULL,
NULL,
(GClassInitFunc) fs_multicast_transmitter_class_init,
NULL,
NULL,
sizeof (FsMulticastTransmitter),
0,
(GInstanceInitFunc) fs_multicast_transmitter_init
};
GST_DEBUG_CATEGORY_INIT (fs_multicast_transmitter_debug,
"fsmulticasttransmitter", 0,
"Farstream multicast UDP transmitter");
fs_multicast_stream_transmitter_register_type (module);
type = g_type_register_static (FS_TYPE_TRANSMITTER,
"FsMulticastTransmitter", &info, 0);
return type;
}
FS_INIT_PLUGIN (multicast, transmitter)
static void
fs_multicast_transmitter_class_init (FsMulticastTransmitterClass *klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
FsTransmitterClass *transmitter_class = FS_TRANSMITTER_CLASS (klass);
parent_class = g_type_class_peek_parent (klass);
gobject_class->set_property = fs_multicast_transmitter_set_property;
gobject_class->get_property = fs_multicast_transmitter_get_property;
gobject_class->constructed = fs_multicast_transmitter_constructed;
g_object_class_override_property (gobject_class, PROP_GST_SRC, "gst-src");
g_object_class_override_property (gobject_class, PROP_GST_SINK, "gst-sink");
g_object_class_override_property (gobject_class, PROP_COMPONENTS,
"components");
g_object_class_override_property (gobject_class, PROP_TYPE_OF_SERVICE,
"tos");
g_object_class_override_property (gobject_class, PROP_DO_TIMESTAMP,
"do-timestamp");
transmitter_class->new_stream_transmitter =
fs_multicast_transmitter_new_stream_transmitter;
transmitter_class->get_stream_transmitter_type =
fs_multicast_transmitter_get_stream_transmitter_type;
gobject_class->dispose = fs_multicast_transmitter_dispose;
gobject_class->finalize = fs_multicast_transmitter_finalize;
g_type_class_add_private (klass, sizeof (FsMulticastTransmitterPrivate));
}
static void
fs_multicast_transmitter_init (FsMulticastTransmitter *self)
{
/* member init */
self->priv = FS_MULTICAST_TRANSMITTER_GET_PRIVATE (self);
self->priv->disposed = FALSE;
self->components = 2;
g_mutex_init (&self->priv->mutex);
self->priv->do_timestamp = TRUE;
}
static void
fs_multicast_transmitter_constructed (GObject *object)
{
FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER_CAST (object);
FsTransmitter *trans = FS_TRANSMITTER_CAST (self);
GstPad *pad = NULL, *pad2 = NULL;
GstPad *ghostpad = NULL;
gchar *padname;
GstPadLinkReturn ret;
int c; /* component_id */
/* We waste one space in order to have the index be the component_id */
self->priv->udpsrc_funnels = g_new0 (GstElement *, self->components+1);
self->priv->udpsink_tees = g_new0 (GstElement *, self->components+1);
self->priv->udpsocks = g_new0 (GList *, self->components+1);
/* First we need the src elemnet */
self->priv->gst_src = gst_bin_new (NULL);
if (!self->priv->gst_src) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not build the transmitter src bin");
return;
}
gst_object_ref (self->priv->gst_src);
/* Second, we do the sink element */
self->priv->gst_sink = gst_bin_new (NULL);
if (!self->priv->gst_sink) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not build the transmitter sink bin");
return;
}
g_object_set (G_OBJECT (self->priv->gst_sink),
"async-handling", TRUE,
NULL);
gst_object_ref (self->priv->gst_sink);
for (c = 1; c <= self->components; c++) {
GstElement *fakesink = NULL;
/* Lets create the RTP source funnel */
self->priv->udpsrc_funnels[c] = gst_element_factory_make ("funnel", NULL);
if (!self->priv->udpsrc_funnels[c]) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not make the funnel element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->gst_src),
self->priv->udpsrc_funnels[c])) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the funnel element to the transmitter src bin");
}
pad = gst_element_get_static_pad (self->priv->udpsrc_funnels[c], "src");
padname = g_strdup_printf ("src_%u", c);
ghostpad = gst_ghost_pad_new (padname, pad);
g_free (padname);
gst_object_unref (pad);
gst_pad_set_active (ghostpad, TRUE);
gst_element_add_pad (self->priv->gst_src, ghostpad);
/* Lets create the RTP sink tee */
self->priv->udpsink_tees[c] = gst_element_factory_make ("tee", NULL);
if (!self->priv->udpsink_tees[c]) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not make the tee element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->gst_sink),
self->priv->udpsink_tees[c])) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the tee element to the transmitter sink bin");
}
pad = gst_element_get_static_pad (self->priv->udpsink_tees[c], "sink");
padname = g_strdup_printf ("sink_%u", c);
ghostpad = gst_ghost_pad_new (padname, pad);
g_free (padname);
gst_object_unref (pad);
gst_pad_set_active (ghostpad, TRUE);
gst_element_add_pad (self->priv->gst_sink, ghostpad);
fakesink = gst_element_factory_make ("fakesink", NULL);
if (!fakesink) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not make the fakesink element");
return;
}
if (!gst_bin_add (GST_BIN (self->priv->gst_sink), fakesink))
{
gst_object_unref (fakesink);
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not add the fakesink element to the transmitter sink bin");
return;
}
g_object_set (fakesink,
"async", FALSE,
"sync" , FALSE,
NULL);
pad = gst_element_get_request_pad (self->priv->udpsink_tees[c], "src_%u");
pad2 = gst_element_get_static_pad (fakesink, "sink");
ret = gst_pad_link (pad, pad2);
gst_object_unref (pad2);
gst_object_unref (pad);
if (GST_PAD_LINK_FAILED(ret)) {
trans->construction_error = g_error_new (FS_ERROR,
FS_ERROR_CONSTRUCTION,
"Could not link the tee to the fakesink");
return;
}
}
GST_CALL_PARENT (G_OBJECT_CLASS, constructed, (object));
}
static void
fs_multicast_transmitter_dispose (GObject *object)
{
FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);
if (self->priv->disposed) {
/* If dispose did already run, return. */
return;
}
if (self->priv->gst_src) {
gst_object_unref (self->priv->gst_src);
self->priv->gst_src = NULL;
}
if (self->priv->gst_sink) {
gst_object_unref (self->priv->gst_sink);
self->priv->gst_sink = NULL;
}
/* Make sure dispose does not run twice. */
self->priv->disposed = TRUE;
parent_class->dispose (object);
}
static void
fs_multicast_transmitter_finalize (GObject *object)
{
FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);
if (self->priv->udpsrc_funnels) {
g_free (self->priv->udpsrc_funnels);
self->priv->udpsrc_funnels = NULL;
}
if (self->priv->udpsink_tees) {
g_free (self->priv->udpsink_tees);
self->priv->udpsink_tees = NULL;
}
if (self->priv->udpsocks) {
g_free (self->priv->udpsocks);
self->priv->udpsocks = NULL;
}
g_mutex_clear (&self->priv->mutex);
parent_class->finalize (object);
}
static void
fs_multicast_transmitter_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);
switch (prop_id) {
case PROP_GST_SINK:
g_value_set_object (value, self->priv->gst_sink);
break;
case PROP_GST_SRC:
g_value_set_object (value, self->priv->gst_src);
break;
case PROP_COMPONENTS:
g_value_set_uint (value, self->components);
break;
case PROP_TYPE_OF_SERVICE:
FS_MULTICAST_TRANSMITTER_LOCK (self);
g_value_set_uint (value, self->priv->type_of_service);
FS_MULTICAST_TRANSMITTER_UNLOCK (self);
break;
case PROP_DO_TIMESTAMP:
g_value_set_boolean (value, self->priv->do_timestamp);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
fs_multicast_transmitter_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (object);
switch (prop_id) {
case PROP_COMPONENTS:
self->components = g_value_get_uint (value);
break;
case PROP_TYPE_OF_SERVICE:
fs_multicast_transmitter_set_type_of_service (self,
g_value_get_uint (value));
break;
case PROP_DO_TIMESTAMP:
self->priv->do_timestamp = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/**
* fs_multicast_transmitter_new_stream_multicast_transmitter:
* @transmitter: a #FsTranmitter
* @participant: the #FsParticipant for which the #FsStream using this
* new #FsStreamTransmitter is created
*
* This function will create a new #FsStreamTransmitter element for a
* specific participant for this #FsMulticastTransmitter
*
* Returns: a new #FsStreamTransmitter
*/
static FsStreamTransmitter *
fs_multicast_transmitter_new_stream_transmitter (FsTransmitter *transmitter,
FsParticipant *participant, guint n_parameters, GParameter *parameters,
GError **error)
{
FsMulticastTransmitter *self = FS_MULTICAST_TRANSMITTER (transmitter);
return FS_STREAM_TRANSMITTER (fs_multicast_stream_transmitter_newv (
self, n_parameters, parameters, error));
}
/*
* The UdpSock structure is a ref-counted pseudo-object use to represent
* one local_ip:port:multicast_ip trio on which we listen and send,
* so it includes a udpsrc and a multiudpsink. It represents one BSD socket.
* The TTL used is the max TTL requested by any stream.
*/
struct _UdpSock {
GstElement *udpsrc;
GstPad *udpsrc_requested_pad;
GstElement *udpsink;
GstPad *udpsink_requested_pad;
gchar *local_ip;
gchar *multicast_ip;
guint16 port;
/* Protected by the transmitter mutex */
guint8 current_ttl;
gint fd;
GSocket *socket;
/* Protected by the transmitter mutex */
GByteArray *ttls;
/* These are just convenience pointers to our parent transmitter */
GstElement *funnel;
GstElement *tee;
guint component_id;
volatile gint sendcount;
};
static gboolean
_ip_string_into_sockaddr_in (const gchar *ip_as_string,
struct sockaddr_in *sockaddr_in, GError **error)
{
GInetAddress *inetaddr;
GSocketAddress *socket_addr;
gboolean ret;
inetaddr = g_inet_address_new_from_string (ip_as_string);
if (!inetaddr) {
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Invalid IP address %s passed", ip_as_string);
return FALSE;
}
if (g_inet_address_get_family (inetaddr) != G_SOCKET_FAMILY_IPV4) {
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"IP address %s passed is not IPv4", ip_as_string);
g_object_unref (inetaddr);
return 0;
}
socket_addr = g_inet_socket_address_new (inetaddr, 1);
ret = g_socket_address_to_native (socket_addr, sockaddr_in,
sizeof (struct sockaddr_in), error);
g_object_unref (socket_addr);
g_object_unref (inetaddr);
return ret;
}
static gint
_bind_port (
const gchar *local_ip,
const gchar *multicast_ip,
guint16 port,
guchar ttl,
int type_of_service,
GError **error)
{
int sock = -1;
struct sockaddr_in address;
int retval;
guchar loop = 1;
int reuseaddr = 1;
#ifdef HAVE_IP_MREQN
struct ip_mreqn mreq;
#else
struct ip_mreq mreq;
#endif
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
g_assert (multicast_ip);
if (!_ip_string_into_sockaddr_in (multicast_ip, &address, error))
goto error;
memcpy (&mreq.imr_multiaddr, &address.sin_addr,
sizeof (mreq.imr_multiaddr));
if (local_ip)
{
struct sockaddr_in tmpaddr;
if (!_ip_string_into_sockaddr_in (local_ip, &tmpaddr, error))
goto error;
#ifdef HAVE_IP_MREQN
memcpy (&mreq.imr_address, &tmpaddr.sin_addr, sizeof (mreq.imr_address));
#else
memcpy (&mreq.imr_interface, &tmpaddr.sin_addr, sizeof (mreq.imr_interface));
#endif
}
else
{
#ifdef HAVE_IP_MREQN
mreq.imr_address.s_addr = INADDR_ANY;
#else
mreq.imr_interface.s_addr = INADDR_ANY;
#endif
}
#ifdef HAVE_IP_MREQN
mreq.imr_ifindex = 0;
#endif
if ((sock = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP)) <= 0) {
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Error creating socket: %s", g_strerror (errno));
goto error;
}
if (setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL, (const void *)&ttl,
sizeof (ttl)) < 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Error setting the multicast TTL: %s",
g_strerror (errno));
goto error;
}
if (setsockopt (sock, IPPROTO_IP, IP_MULTICAST_LOOP, (const void *)&loop,
sizeof (loop)) < 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Error setting the multicast loop to FALSE: %s",
g_strerror (errno));
goto error;
}
if (setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, (const void *)&reuseaddr,
sizeof (reuseaddr)) < 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Error setting reuseaddr to TRUE: %s",
g_strerror (errno));
goto error;
}
#ifdef SO_REUSEPORT
if (setsockopt (sock, SOL_SOCKET, SO_REUSEPORT, (const void *)&reuseaddr,
sizeof (reuseaddr)) < 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Error setting reuseaddr to TRUE: %s",
g_strerror (errno));
goto error;
}
#endif
if (setsockopt (sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(const void *)&mreq, sizeof (mreq)) < 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Could not join the socket to the multicast group: %s",
g_strerror (errno));
goto error;
}
if (setsockopt (sock, IPPROTO_IP, IP_TOS,
&type_of_service, sizeof (type_of_service)) < 0)
GST_WARNING ("could not set socket ToS: %s", g_strerror (errno));
#ifdef IPV6_TCLASS
if (setsockopt (sock, IPPROTO_IPV6, IPV6_TCLASS,
&type_of_service, sizeof (type_of_service)) < 0)
GST_WARNING ("could not set TCLASS: %s", g_strerror (errno));
#endif
address.sin_port = htons (port);
retval = bind (sock, (struct sockaddr *) &address, sizeof (address));
if (retval != 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_NETWORK,
"Could not bind to port %d", port);
goto error;
}
return sock;
error:
if (sock >= 0)
close (sock);
return -1;
}
static GstElement *
_create_sinksource (gchar *elementname, GstBin *bin,
GstElement *teefunnel, GSocket *socket,
GstPadDirection direction, GstPad **requested_pad, GError **error)
{
GstElement *elem;
GstPadLinkReturn ret = GST_PAD_LINK_OK;
GstPad *elempad = NULL;
GstStateChangeReturn state_ret;
g_assert (direction == GST_PAD_SINK || direction == GST_PAD_SRC);
elem = gst_element_factory_make (elementname, NULL);
if (!elem) {
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not create the %s element", elementname);
return NULL;
}
g_object_set (elem,
"close-socket", FALSE,
"socket", socket,
"auto-multicast", FALSE,
NULL);
if (!gst_bin_add (bin, elem)) {
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not add the %s element to the gst %s bin", elementname,
(direction == GST_PAD_SINK) ? "sink" : "src");
gst_object_unref (elem);
return NULL;
}
if (direction == GST_PAD_SINK)
*requested_pad = gst_element_get_request_pad (teefunnel, "src_%u");
else
*requested_pad = gst_element_get_request_pad (teefunnel, "sink_%u");
if (!*requested_pad) {
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not get the %s request pad from the %s",
(direction == GST_PAD_SINK) ? "src" : "sink",
(direction == GST_PAD_SINK) ? "tee" : "funnel");
goto error;
}
if (direction == GST_PAD_SINK)
elempad = gst_element_get_static_pad (elem, "sink");
else
elempad = gst_element_get_static_pad (elem, "src");
if (direction != GST_PAD_SINK)
ret = gst_pad_link (elempad, *requested_pad);
if (GST_PAD_LINK_FAILED(ret))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link the new element %s (%d)", elementname, ret);
goto error;
}
if (!gst_element_sync_state_with_parent (elem)) {
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not sync the state of the new %s with its parent",
elementname);
goto error;
}
if (direction == GST_PAD_SINK)
ret = gst_pad_link (*requested_pad, elempad);
if (GST_PAD_LINK_FAILED(ret))
{
g_set_error (error, FS_ERROR, FS_ERROR_CONSTRUCTION,
"Could not link the new element %s (%d)", elementname, ret);
goto error;
}
gst_object_unref (elempad);
return elem;
error:
gst_element_set_locked_state (elem, TRUE);
state_ret = gst_element_set_state (elem, GST_STATE_NULL);
if (state_ret != GST_STATE_CHANGE_SUCCESS)
GST_ERROR ("On error, could not reset %s to state NULL (%s)", elementname,
gst_element_state_change_return_get_name (state_ret));
if (!gst_bin_remove (bin, elem))
GST_ERROR ("Could not remove element %s from bin on error", elementname);
if (elempad)
gst_object_unref (elempad);
return NULL;
}
static UdpSock *
fs_multicast_transmitter_get_udpsock_locked (FsMulticastTransmitter *trans,
guint component_id,
const gchar *local_ip,
const gchar *multicast_ip,
guint16 port,
guint8 ttl,
gboolean sending,
GError **error)
{
UdpSock *udpsock;
GList *udpsock_e;
for (udpsock_e = g_list_first (trans->priv->udpsocks[component_id]);
udpsock_e;
udpsock_e = g_list_next (udpsock_e))
{
udpsock = udpsock_e->data;
if (port == udpsock->port &&
!strcmp (multicast_ip, udpsock->multicast_ip) &&
((local_ip == NULL && udpsock->local_ip == NULL) ||
(local_ip && udpsock->local_ip &&
!strcmp (local_ip, udpsock->local_ip))))
{
if (ttl > udpsock->current_ttl)
{
if (setsockopt (udpsock->fd, IPPROTO_IP, IP_MULTICAST_TTL,
(const void *)&ttl, sizeof (ttl)) < 0)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Error setting the multicast TTL: %s",
g_strerror (errno));
return NULL;
}
udpsock->current_ttl = ttl;
}
g_byte_array_append (udpsock->ttls, &ttl, 1);
return udpsock;
}
}
return NULL;
}
UdpSock *
fs_multicast_transmitter_get_udpsock (FsMulticastTransmitter *trans,
guint component_id,
const gchar *local_ip,
const gchar *multicast_ip,
guint16 port,
guint8 ttl,
gboolean sending,
GError **error)
{
UdpSock *udpsock;
UdpSock *tmpudpsock;
GError *local_error = NULL;
int tos;
/* First lets check if we already have one */
if (component_id > trans->components)
{
g_set_error (error, FS_ERROR, FS_ERROR_INVALID_ARGUMENTS,
"Invalid component %d > %d", component_id, trans->components);
return NULL;
}
FS_MULTICAST_TRANSMITTER_LOCK (trans);
udpsock = fs_multicast_transmitter_get_udpsock_locked (trans, component_id,
local_ip, multicast_ip, port, ttl, sending, &local_error);
tos = trans->priv->type_of_service;
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
if (local_error)
{
g_propagate_error (error, local_error);
return NULL;
}
if (udpsock)
{
if (sending)
fs_multicast_transmitter_udpsock_inc_sending (udpsock);
return udpsock;
}
udpsock = g_slice_new0 (UdpSock);
udpsock->local_ip = g_strdup (local_ip);
udpsock->multicast_ip = g_strdup (multicast_ip);
udpsock->fd = -1;
udpsock->component_id = component_id;
udpsock->port = port;
udpsock->current_ttl = ttl;
udpsock->ttls = g_byte_array_new ();
g_byte_array_append (udpsock->ttls, &ttl, 1);
/* Now lets bind both ports */
udpsock->fd = _bind_port (local_ip, multicast_ip, port, ttl, tos, error);
if (udpsock->fd < 0)
goto error;
udpsock->socket = g_socket_new_from_fd (udpsock->fd, error);
if (!udpsock->socket)
goto error;
/* Now lets create the elements */
udpsock->tee = trans->priv->udpsink_tees[component_id];
udpsock->funnel = trans->priv->udpsrc_funnels[component_id];
udpsock->udpsrc = _create_sinksource ("udpsrc",
GST_BIN (trans->priv->gst_src), udpsock->funnel, udpsock->socket,
GST_PAD_SRC, &udpsock->udpsrc_requested_pad, error);
if (!udpsock->udpsrc)
goto error;
udpsock->udpsink = _create_sinksource ("multiudpsink",
GST_BIN (trans->priv->gst_sink), udpsock->tee,
udpsock->socket, GST_PAD_SINK, &udpsock->udpsink_requested_pad, error);
if (!udpsock->udpsink)
goto error;
g_object_set (udpsock->udpsink,
"async", FALSE,
"sync", FALSE,
NULL);
FS_MULTICAST_TRANSMITTER_LOCK (trans);
/* Check if someone else has added the same thing at the same time */
tmpudpsock = fs_multicast_transmitter_get_udpsock_locked (trans, component_id,
local_ip, multicast_ip, port, ttl, sending, &local_error);
if (tmpudpsock || local_error)
{
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
fs_multicast_transmitter_put_udpsock (trans, udpsock, ttl);
if (local_error)
{
g_propagate_error (error, local_error);
goto error;
}
if (sending)
fs_multicast_transmitter_udpsock_inc_sending (udpsock);
return tmpudpsock;
}
trans->priv->udpsocks[component_id] =
g_list_prepend (trans->priv->udpsocks[component_id], udpsock);
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
if (sending)
fs_multicast_transmitter_udpsock_inc_sending (udpsock);
return udpsock;
error:
fs_multicast_transmitter_put_udpsock (trans, udpsock, ttl);
return NULL;
}
void
fs_multicast_transmitter_put_udpsock (FsMulticastTransmitter *trans,
UdpSock *udpsock, guint8 ttl)
{
guint i;
FS_MULTICAST_TRANSMITTER_LOCK (trans);
for (i = udpsock->ttls->len - 1;; i--)
{
if (udpsock->ttls->data[i] == ttl)
{
g_byte_array_remove_index_fast (udpsock->ttls, i);
break;
}
g_return_if_fail (i > 0);
}
if (udpsock->ttls->len > 0)
{
g_assert (udpsock->fd >= 0);
/* If we were the max, check if there is a new max */
if (udpsock->current_ttl == ttl && ttl > 1)
{
guint8 max = 1;
for (i = 0; i < udpsock->ttls->len; i++)
{
if (udpsock->ttls->data[i] > max)
max = udpsock->ttls->data[i];
}
if (max != udpsock->current_ttl)
{
if (setsockopt (udpsock->fd, IPPROTO_IP, IP_MULTICAST_TTL,
(const void *)&max, sizeof (max)) < 0)
{
GST_WARNING ("Error setting the multicast TTL to %u: %s", max,
g_strerror (errno));
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
return;
}
udpsock->current_ttl = max;
}
}
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
return;
}
trans->priv->udpsocks[udpsock->component_id] =
g_list_remove (trans->priv->udpsocks[udpsock->component_id], udpsock);
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
if (udpsock->udpsrc)
{
GstStateChangeReturn ret;
gst_element_set_locked_state (udpsock->udpsrc, TRUE);
ret = gst_element_set_state (udpsock->udpsrc, GST_STATE_NULL);
if (ret != GST_STATE_CHANGE_SUCCESS)
GST_ERROR ("Error changing state of udpsrc: %s",
gst_element_state_change_return_get_name (ret));
if (!gst_bin_remove (GST_BIN (trans->priv->gst_src), udpsock->udpsrc))
GST_ERROR ("Could not remove udpsrc element from transmitter source");
}
if (udpsock->udpsrc_requested_pad)
{
gst_element_release_request_pad (udpsock->funnel,
udpsock->udpsrc_requested_pad);
gst_object_unref (udpsock->udpsrc_requested_pad);
}
if (udpsock->udpsink_requested_pad)
{
gst_element_release_request_pad (udpsock->tee,
udpsock->udpsink_requested_pad);
gst_object_unref (udpsock->udpsink_requested_pad);
}
if (udpsock->udpsink)
{
GstStateChangeReturn ret;
gst_element_set_locked_state (udpsock->udpsink, TRUE);
ret = gst_element_set_state (udpsock->udpsink, GST_STATE_NULL);
if (ret != GST_STATE_CHANGE_SUCCESS)
GST_ERROR ("Error changing state of udpsink: %s",
gst_element_state_change_return_get_name (ret));
if (!gst_bin_remove (GST_BIN (trans->priv->gst_sink), udpsock->udpsink))
GST_ERROR ("Could not remove udpsink element from transmitter source");
}
if (udpsock->socket)
g_object_unref (udpsock->socket);
if (udpsock->fd >= 0)
close (udpsock->fd);
g_byte_array_free (udpsock->ttls, TRUE);
g_free (udpsock->multicast_ip);
g_free (udpsock->local_ip);
g_slice_free (UdpSock, udpsock);
}
void
fs_multicast_transmitter_udpsock_inc_sending (UdpSock *udpsock)
{
if (g_atomic_int_add (&udpsock->sendcount, 1) == 0)
{
g_signal_emit_by_name (udpsock->udpsink, "add", udpsock->multicast_ip,
udpsock->port);
gst_element_send_event (udpsock->udpsink,
gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstForceKeyUnit",
"all-headers", G_TYPE_BOOLEAN, TRUE,
NULL)));
}
}
void
fs_multicast_transmitter_udpsock_dec_sending (UdpSock *udpsock)
{
if (g_atomic_int_dec_and_test (&udpsock->sendcount))
{
g_signal_emit_by_name (udpsock->udpsink, "remove", udpsock->multicast_ip,
udpsock->port);
}
}
static GType
fs_multicast_transmitter_get_stream_transmitter_type (
FsTransmitter *transmitter)
{
return FS_TYPE_MULTICAST_STREAM_TRANSMITTER;
}
void
fs_multicast_transmitter_udpsock_ref (FsMulticastTransmitter *trans,
UdpSock *udpsock, guint8 ttl)
{
FS_MULTICAST_TRANSMITTER_LOCK (trans);
g_byte_array_append (udpsock->ttls, &ttl, 1);
FS_MULTICAST_TRANSMITTER_UNLOCK (trans);
}
static void
fs_multicast_transmitter_set_type_of_service (FsMulticastTransmitter *self,
gint tos)
{
gint i;
FS_MULTICAST_TRANSMITTER_LOCK (self);
if (self->priv->type_of_service == tos)
goto out;
self->priv->type_of_service = tos;
for (i = 0; i < self->components; i++)
{
GList *item;
for (item = self->priv->udpsocks[i]; item; item = item->next)
{
UdpSock *udpsock = item->data;
if (setsockopt (udpsock->fd, IPPROTO_IP, IP_TOS,
&tos, sizeof (tos)) < 0)
GST_WARNING ( "could not set socket tos: %s", g_strerror (errno));
#ifdef IPV6_TCLASS
if (setsockopt (udpsock->fd, IPPROTO_IPV6, IPV6_TCLASS,
&tos, sizeof (tos)) < 0)
GST_WARNING ("could not set TCLASS: %s", g_strerror (errno));
#endif
}
}
out:
FS_MULTICAST_TRANSMITTER_UNLOCK (self);
}