/* Farstream unit tests for FsRtpConference
*
* Copyright (C) 2007 Collabora, Nokia
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <gst/check/gstcheck.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <farstream/fs-conference.h>
#include <farstream/fs-element-added-notifier.h>
#include "check-threadsafe.h"
/* Value written to the sender's "num-buffers" for its second run, and the
 * upper bound that handoff_handler() asserts on buffer_count. */
#define SEND_BUFFER_COUNT 100
/* Value of the sender's "num-buffers" for its first run, and the number of
 * received buffers the test waits for in each run. */
#define BUFFER_COUNT 20
/* Held around every access to buffer_count. */
GMutex count_mutex;
/* Broadcast by handoff_handler() when buffer_count reaches BUFFER_COUNT. */
GCond count_cond;
/* Number of fakesink handoffs seen since the counter was last reset to 0. */
guint buffer_count = 0;
/*
 * "handoff" signal handler connected to the receiving fakesink.
 *
 * Under count_mutex it increments buffer_count, broadcasts count_cond when
 * the count is exactly BUFFER_COUNT, and asserts that the count has not gone
 * past SEND_BUFFER_COUNT.  None of the signal arguments are used.
 */
static void
handoff_handler (GstElement *fakesink, GstBuffer *buffer, GstPad *pad,
    gpointer user_data)
{
  g_mutex_lock (&count_mutex);

  buffer_count++;
  /* buffer_count is a guint, so it must be formatted as unsigned. */
  GST_LOG ("buffer %u", buffer_count);

  if (buffer_count == BUFFER_COUNT)
    g_cond_broadcast (&count_cond);

  ts_fail_unless (buffer_count <= SEND_BUFFER_COUNT);

  g_mutex_unlock (&count_mutex);
}
/*
 * "src-pad-added" handler for the receiving FsStream.
 *
 * Creates a fakesink with "sync" and "signal-handoffs" enabled, connects
 * handoff_handler() to it, adds it to @pipeline, sets it to PLAYING and links
 * the new stream @pad to its sink pad.
 *
 * @self: the stream that emitted the signal (unused)
 * @pad: the newly added source pad
 * @codec: the codec associated with the pad (unused)
 * @pipeline: user data; the bin the fakesink is added to
 */
static void
src_pad_added_cb (FsStream *self,
    GstPad *pad,
    FsCodec *codec,
    GstElement *pipeline)
{
  GstElement *sink;
  GstPad *sinkpad;

  sink = gst_element_factory_make ("fakesink", NULL);
  /* Report a missing fakesink explicitly rather than handing NULL to the
   * property, signal and bin calls below. */
  ts_fail_unless (sink != NULL, "Could not create the fakesink element");

  g_object_set (sink, "sync", TRUE,
      "signal-handoffs", TRUE,
      NULL);
  g_signal_connect (sink, "handoff", G_CALLBACK (handoff_handler), NULL);

  fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
  gst_element_set_state (sink, GST_STATE_PLAYING);

  sinkpad = gst_element_get_static_pad (sink, "sink");
  fail_unless (GST_PAD_LINK_SUCCESSFUL (gst_pad_link (pad, sinkpad)));
  gst_object_unref (sinkpad);

  GST_DEBUG ("Pad added");
}
/* Number of "theoradec" elements reported by element_added() so far. */
static guint decoder_count = 0;

/*
 * "element-added" handler for the FsElementAddedNotifier.
 *
 * Elements without a factory, or whose factory is not named "theoradec", are
 * ignored.  For a theoradec it asserts that none was counted before and then
 * bumps decoder_count, so a second decoder fails the test.
 */
static void
element_added (FsElementAddedNotifier *notif, GstBin *bin, GstElement *element,
    gpointer user_data)
{
  GstElementFactory *factory = gst_element_get_factory (element);
  const gchar *factory_name;

  if (factory == NULL)
    return;

  factory_name = gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory));

  if (strcmp (factory_name, "theoradec") == 0)
  {
    ts_fail_unless (decoder_count == 0);
    decoder_count++;
  }
}
/*
 * "notify::caps" handler for the payloader's source pad (also called once
 * directly by the test).
 *
 * Reads the pad's current caps; if there are none it does nothing.
 * Otherwise it builds a THEORA codec (payload type 96, clock rate 90000),
 * copies the "configuration" string from the first caps structure into it as
 * an optional parameter when present, and sets it as the only remote codec
 * on @stream.
 *
 * @pad: the pad whose caps are inspected
 * @spec: the notified property spec (unused; NULL on the direct call)
 * @stream: user data; the stream that receives the remote codec
 */
static void
caps_changed (GstPad *pad, GParamSpec *spec, FsStream *stream)
{
  GstCaps *caps;
  GstStructure *s;
  FsCodec *codec;
  GList *codecs;
  const gchar *config;
  GError *error = NULL;

  g_object_get (pad, "caps", &caps, NULL);

  if (!caps)
    return;

  s = gst_caps_get_structure (caps, 0);

  codec = fs_codec_new (96, "THEORA", FS_MEDIA_TYPE_VIDEO, 90000);

  /* config points into the caps, so the caps must stay alive while it is
   * in use. */
  config = gst_structure_get_string (s, "configuration");
  if (config)
    fs_codec_add_optional_parameter (codec, "configuration", config);

  codecs = g_list_prepend (NULL, codec);
  fail_unless (fs_stream_set_remote_codecs (stream, codecs, &error),
      "Unable to set remote codec: %s",
      error ? error->message : "UNKNOWN");
  fs_codec_list_destroy (codecs);

  /* g_object_get() handed us our own reference to the caps; release it now
   * that nothing points into them any more. */
  gst_caps_unref (caps);
}
/*
 * Buffer pad probe that drops Theora configuration packets.
 *
 * Maps the probed buffer as RTP, reads the 32-bit big-endian Theora payload
 * header and extracts the TDT field.  Returns GST_PAD_PROBE_DROP when TDT is
 * 1 (config) and GST_PAD_PROBE_OK otherwise.  A buffer that cannot be mapped
 * as RTP fails the test; a payload shorter than the 4-byte header cannot be a
 * configuration packet and is passed through.
 */
static GstPadProbeReturn
drop_theora_config (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
  GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
  gboolean mapped;
  guchar TDT = 0;

  mapped = gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer);
  ts_fail_unless (mapped, "Could not map the probed buffer as RTP");
  /* Never touch the payload of a buffer that failed to map. */
  if (!mapped)
    return GST_PAD_PROBE_OK;

  /* Only read the header when the payload really holds all 4 bytes of it. */
  if (gst_rtp_buffer_get_payload_len (&rtpbuffer) >= 4)
  {
    guint8 *payload = gst_rtp_buffer_get_payload (&rtpbuffer);
    guint32 header = GST_READ_UINT32_BE (payload);

    /*
     *  0                   1                   2                   3
     *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     * |                     Ident                     | F |TDT|# pkts.|
     * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     *
     * F: Fragment type (0=none, 1=start, 2=cont, 3=end)
     * TDT: Theora data type (0=theora, 1=config, 2=comment, 3=reserved)
     * pkts: number of packets.
     */
    TDT = (header & 0x30) >> 4;
  }

  gst_rtp_buffer_unmap (&rtpbuffer);

  if (TDT == 1)
    return GST_PAD_PROBE_DROP;
  else
    return GST_PAD_PROBE_OK;
}
GST_START_TEST (test_rtprecv_inband_config_data)
{
FsParticipant *participant = NULL;
FsStream *stream = NULL;
GstElement *src;
GstElement *pipeline;
GstElement *sink;
GstBus *bus;
GstMessage *msg;
guint port = 0;
GError *error = NULL;
GList *codecs = NULL;
GstElement *fspipeline;
GstElement *conference;
FsSession *session;
GList *item;
GstPad *pad;
GstElement *pay;
FsElementAddedNotifier *notif;
g_mutex_init (&count_mutex);
g_cond_init (&count_cond);
buffer_count = 0;
decoder_count = 0;
fspipeline = gst_pipeline_new (NULL);
notif = fs_element_added_notifier_new ();
fs_element_added_notifier_add (notif, GST_BIN (fspipeline));
g_signal_connect (notif, "element-added", G_CALLBACK (element_added), NULL);
conference = gst_element_factory_make ("fsrtpconference", NULL);
fail_unless (gst_bin_add (GST_BIN (fspipeline), conference));
session = fs_conference_new_session (FS_CONFERENCE (conference),
FS_MEDIA_TYPE_VIDEO, &error);
if (error)
fail ("Error while creating new session (%d): %s",
error->code, error->message);
fail_if (session == NULL, "Could not make session, but no GError!");
g_object_set (session, "no-rtcp-timeout", 0, NULL);
g_object_get (session, "codecs-without-config", &codecs, NULL);
for (item = codecs; item; item = item->next)
{
FsCodec *codec = item->data;
if (!g_ascii_strcasecmp ("THEORA", codec->encoding_name))
break;
}
fs_codec_list_destroy (codecs);
if (!item)
{
GST_INFO ("Skipping %s because THEORA is not detected", G_STRFUNC);
fs_session_destroy (session);
g_object_unref (session);
gst_object_unref (fspipeline);
return;
}
participant = fs_conference_new_participant (
FS_CONFERENCE (conference), &error);
if (error)
ts_fail ("Error while creating new participant (%d): %s",
error->code, error->message);
ts_fail_if (participant == NULL,
"Could not make participant, but no GError!");
stream = fs_session_new_stream (session, participant, FS_DIRECTION_RECV,
&error);
if (error)
ts_fail ("Error while creating new stream (%d): %s",
error->code, error->message);
ts_fail_if (stream == NULL, "Could not make stream, but no GError!");
fail_unless (fs_stream_set_transmitter (stream, "rawudp", NULL, 0, &error));
fail_unless (error == NULL);
g_signal_connect (stream, "src-pad-added",
G_CALLBACK (src_pad_added_cb), fspipeline);
codecs = g_list_prepend (NULL, fs_codec_new (96, "THEORA",
FS_MEDIA_TYPE_VIDEO, 90000));
fail_unless (fs_stream_set_remote_codecs (stream, codecs,
&error),
"Unable to set remote codec: %s",
error ? error->message : "UNKNOWN");
fs_codec_list_destroy (codecs);
pipeline = gst_parse_launch (
"videotestsrc is-live=1 name=src num-buffers="G_STRINGIFY (BUFFER_COUNT) " !"
" video/x-raw, framerate=(fraction)30/1 ! theoraenc !"
" rtptheorapay name=pay config-interval=0 name=pay !"
" application/x-rtp, payload=96, ssrc=(uint)12345678 !"
" udpsink host=127.0.0.1 name=sink", NULL);
gst_element_set_state (fspipeline, GST_STATE_PLAYING);
bus = gst_element_get_bus (fspipeline);
while (port == 0)
{
const GstStructure *s;
msg = gst_bus_timed_pop_filtered (bus, GST_CLOCK_TIME_NONE,
GST_MESSAGE_ELEMENT);
fail_unless (msg != NULL);
s = gst_message_get_structure (msg);
fail_if (gst_structure_has_name (s, "farstream-local-candidates-prepared"));
if (gst_structure_has_name (s, "farstream-new-local-candidate"))
{
const GValue *value;
FsCandidate *candidate;
ts_fail_unless (
gst_structure_has_field_typed (s, "candidate", FS_TYPE_CANDIDATE),
"farstream-new-local-candidate structure has no candidate field");
value = gst_structure_get_value (s, "candidate");
candidate = g_value_get_boxed (value);
if (candidate->type == FS_CANDIDATE_TYPE_HOST)
port = candidate->port;
GST_DEBUG ("Got port %u", port);
}
gst_message_unref (msg);
}
gst_object_unref (bus);
sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink");
g_object_set (sink, "port", port, NULL);
gst_object_unref (sink);
pay = gst_bin_get_by_name (GST_BIN (pipeline), "pay");
ts_fail_unless (pay != NULL);
pad = gst_element_get_static_pad (pay, "src");
ts_fail_unless (pad != NULL);
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
drop_theora_config, NULL, NULL);
g_signal_connect (pad, "notify::caps", G_CALLBACK (caps_changed), stream);
caps_changed (pad, NULL, stream);
gst_object_unref (pad);
gst_object_unref (pay);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
g_mutex_lock (&count_mutex);
while (buffer_count < BUFFER_COUNT)
g_cond_wait (&count_cond, &count_mutex);
buffer_count = 0;
g_mutex_unlock (&count_mutex);
gst_element_set_state (pipeline, GST_STATE_NULL);
src = gst_bin_get_by_name (GST_BIN (pipeline), "src");
g_object_set (src, "num-buffers", SEND_BUFFER_COUNT, NULL);
gst_object_unref (src);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
g_mutex_lock (&count_mutex);
while (buffer_count < BUFFER_COUNT)
g_cond_wait (&count_cond, &count_mutex);
buffer_count = 0;
g_mutex_unlock (&count_mutex);
gst_element_set_state (pipeline, GST_STATE_NULL);
bus = gst_element_get_bus (fspipeline);
msg = gst_bus_pop_filtered (bus, GST_MESSAGE_ERROR);
if (msg)
{
GError *error;
gchar *debug;
gst_message_parse_error (msg, &error, &debug);
ts_fail ("Got an error on the BUS (%d): %s (%s)", error->code,
error->message, debug);
g_error_free (error);
g_free (debug);
gst_message_unref (msg);
}
gst_object_unref (bus);
gst_object_unref (pipeline);
gst_object_unref (participant);
gst_object_unref (stream);
gst_object_unref (session);
gst_element_set_state (fspipeline, GST_STATE_NULL);
gst_object_unref (fspipeline);
g_mutex_clear (&count_mutex);
g_cond_clear (&count_cond);
}
GST_END_TEST;
/*
 * Builds the "fsrtprecvcodecs" suite.
 *
 * Adds warnings and criticals to GLib's always-fatal log levels, then
 * registers test_rtprecv_inband_config_data in a test case named
 * "fsrtprecv_inband_config_data".
 *
 * Returns: the new suite.
 */
static Suite *
fsrtprecvcodecs_suite (void)
{
  Suite *suite = suite_create ("fsrtprecvcodecs");
  TCase *inband_case;
  GLogLevelFlags previous_fatal;

  /* Passing the default mask returns the mask that was in force before; it
   * is then set again with warnings and criticals added. */
  previous_fatal = g_log_set_always_fatal (G_LOG_FATAL_MASK);
  g_log_set_always_fatal (
      previous_fatal | G_LOG_LEVEL_WARNING | G_LOG_LEVEL_CRITICAL);

  inband_case = tcase_create ("fsrtprecv_inband_config_data");
  tcase_add_test (inband_case, test_rtprecv_inband_config_data);
  suite_add_tcase (suite, inband_case);

  return suite;
}

GST_CHECK_MAIN (fsrtprecvcodecs);