/* Farstream unit tests for FsShmTransmitter
*
* Copyright (C) 2009 Collabora Ltd
* @author: Olivier Crete <olivier.crete@collabora.co.uk>
* Copyright (C) 2009 Nokia
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <gst/check/gstcheck.h>
#include <farstream/fs-transmitter.h>
#include <farstream/fs-conference.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include "check-threadsafe.h"
#include "generic.h"
gint buffer_count[2] = {0, 0};
gboolean got_candidates[2];
gboolean got_prepared[2];
GstElement *pipeline = NULL;
gboolean src_setup[2] = {FALSE, FALSE};
guint received_known[2] = {0, 0};
gboolean associate_on_source = TRUE;
GMutex test_mutex;
GCond cond;
gboolean done = FALSE;
guint connected_count;
enum {
FLAG_NO_SOURCE = 1 << 2,
FLAG_NOT_SENDING = 1 << 3,
FLAG_LOCAL_CANDIDATES = 1 << 5
};
#define RTP_PORT 9828
#define RTCP_PORT 9829
GST_START_TEST (test_shmtransmitter_new)
{
gchar **transmitters;
gint i;
gboolean found_it = FALSE;
transmitters = fs_transmitter_list_available ();
for (i=0; transmitters[i]; i++)
{
if (!strcmp ("shm", transmitters[i]))
{
found_it = TRUE;
break;
}
}
g_strfreev (transmitters);
ts_fail_unless (found_it, "Did not find shm transmitter");
test_transmitter_creation ("shm");
test_transmitter_creation ("shm");
}
GST_END_TEST;
static void
_new_local_candidate (FsStreamTransmitter *st, FsCandidate *candidate,
gpointer user_data)
{
ts_fail_if (candidate == NULL, "Passed NULL candidate");
ts_fail_unless (candidate->ip != NULL, "Null IP in candidate");
ts_fail_unless (candidate->proto == FS_NETWORK_PROTOCOL_UDP,
"Protocol is not UDP");
ts_fail_unless (candidate->type == FS_CANDIDATE_TYPE_HOST,
"Candidate is not host");
ts_fail_unless (got_candidates[candidate->component_id-1] == FALSE);
got_candidates[candidate->component_id-1] = TRUE;
GST_DEBUG ("New local candidate %s of type %d for component %d",
candidate->ip, candidate->type, candidate->component_id);
}
static void
_candidate_prepared (FsStreamTransmitter *st,gpointer user_data)
{
GST_DEBUG ("Local candidates prepared");
fail_unless (got_candidates[0] == TRUE || got_candidates[1] == TRUE);
if (got_candidates[0])
got_prepared[0] = TRUE;
if (got_candidates[1])
got_prepared[1] = TRUE;
}
static void
_state_changed (FsStreamTransmitter *st, guint component_id,
FsStreamState state, gpointer user_data)
{
g_mutex_lock (&test_mutex);
connected_count++;
g_mutex_unlock (&test_mutex);
g_cond_signal (&cond);
}
static void
_handoff_handler (GstElement *element, GstBuffer *buffer, GstPad *pad,
gpointer user_data)
{
gint component_id = GPOINTER_TO_INT (user_data);
ts_fail_unless (gst_buffer_get_size (buffer) == component_id * 10,
"Buffer is size %d but component_id is %d", gst_buffer_get_size (buffer),
component_id);
buffer_count[component_id-1]++;
GST_LOG ("Buffer %d component: %d size: %" G_GSIZE_FORMAT,
buffer_count[component_id-1], component_id, gst_buffer_get_size (buffer));
ts_fail_if (buffer_count[component_id-1] > 20,
"Too many buffers %d > 20 for component",
buffer_count[component_id-1], component_id);
if (buffer_count[0] == 20 && buffer_count[1] == 20) {
GST_DEBUG ("Test complete, got 20 buffers twice");
/* TEST OVER */
if (associate_on_source)
ts_fail_unless (buffer_count[0] == received_known[0] &&
buffer_count[1] == received_known[1], "Some known buffers from known"
" sources have not been reported (%d != %u || %d != %u)",
buffer_count[0], received_known[0],
buffer_count[1], received_known[1]);
else
ts_fail_unless (received_known[0] == 0 && received_known[1] == 0,
"Got a known-source-packet-received signal when we shouldn't have");
g_mutex_lock (&test_mutex);
done = TRUE;
g_mutex_unlock (&test_mutex);
g_cond_signal (&cond);
}
}
static void
_known_source_packet_received (FsStreamTransmitter *st, guint component_id,
GstBuffer *buffer, gpointer user_data)
{
ts_fail_unless (associate_on_source == TRUE,
"Got known-source-packet-received when we shouldn't have");
ts_fail_unless (component_id == 1 || component_id == 2,
"Invalid component id %u", component_id);
ts_fail_unless (GST_IS_BUFFER (buffer), "Invalid buffer received at %p",
buffer);
received_known[component_id - 1]++;
GST_LOG ("Known source buffer %d component: %d size: %" G_GSIZE_FORMAT,
received_known[component_id-1], component_id,
gst_buffer_get_size (buffer));
}
void
sync_error_handler (GstBus *bus, GstMessage *message, gpointer blob)
{
GError *error = NULL;
gchar *debug;
gst_message_parse_error (message, &error, &debug);
g_error ("bus sync error %s debug: %s", error->message, debug);
}
static void
run_shm_transmitter_test (gint flags)
{
GError *error = NULL;
FsTransmitter *trans;
FsStreamTransmitter *st;
GstBus *bus = NULL;
GParameter params[1];
GList *local_cands = NULL;
GstStateChangeReturn ret;
FsCandidate *cand;
GList *remote_cands = NULL;
int param_count = 0;
gint bus_source;
done = FALSE;
connected_count = 0;
g_cond_init (&cond);
g_mutex_init (&test_mutex);
buffer_count[0] = 0;
buffer_count[1] = 0;
received_known[0] = 0;
received_known[1] = 0;
got_candidates[0] = FALSE;
got_candidates[1] = FALSE;
got_prepared[0] = FALSE;
got_prepared[1] = FALSE;
if (unlink ("/tmp/src1") < 0 && errno != ENOENT)
fail ("Could not unlink /tmp/src1: %s", strerror (errno));
if (unlink ("/tmp/src2") < 0 && errno != ENOENT)
fail ("Could not unlink /tmp/src2: %s", strerror (errno));
local_cands = g_list_append (local_cands, fs_candidate_new (NULL, 1,
FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, "/tmp/src1", 0));
local_cands = g_list_append (local_cands, fs_candidate_new (NULL, 2,
FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, "/tmp/src2", 0));
if (flags & FLAG_LOCAL_CANDIDATES)
{
memset (params, 0, sizeof (GParameter));
params[0].name = "preferred-local-candidates";
g_value_init (¶ms[0].value, FS_TYPE_CANDIDATE_LIST);
g_value_take_boxed (¶ms[0].value, local_cands);
param_count = 1;
}
associate_on_source = !(flags & FLAG_NO_SOURCE);
if (flags & FLAG_NOT_SENDING)
{
buffer_count[0] = 20;
received_known[0] = 20;
}
trans = fs_transmitter_new ("shm", 2, 0, &error);
if (error)
ts_fail ("Error creating transmitter: (%s:%d) %s",
g_quark_to_string (error->domain), error->code, error->message);
ts_fail_if (trans == NULL, "No transmitter create, yet error is still NULL");
g_clear_error (&error);
pipeline = setup_pipeline (trans, G_CALLBACK (_handoff_handler));
bus = gst_element_get_bus (pipeline);
bus_source = gst_bus_add_watch (bus, bus_error_callback, NULL);
gst_bus_enable_sync_message_emission (bus);
g_signal_connect (bus, "sync-message::error",
G_CALLBACK (sync_error_handler), NULL);
gst_object_unref (bus);
st = fs_transmitter_new_stream_transmitter (trans, NULL,
param_count, params, &error);
if (param_count)
g_value_unset (¶ms[0].value);
if (error)
ts_fail ("Error creating stream transmitter: (%s:%d) %s",
g_quark_to_string (error->domain), error->code, error->message);
ts_fail_if (st == NULL, "No stream transmitter created, yet error is NULL");
g_clear_error (&error);
g_object_set (st, "sending", !(flags & FLAG_NOT_SENDING), NULL);
ts_fail_unless (g_signal_connect (st, "new-local-candidate",
G_CALLBACK (_new_local_candidate), trans),
"Could not connect new-local-candidate signal");
ts_fail_unless (g_signal_connect (st, "local-candidates-prepared",
G_CALLBACK (_candidate_prepared), NULL),
"Could not connect local-candidates-prepared signal");
ts_fail_unless (g_signal_connect (st, "error",
G_CALLBACK (stream_transmitter_error), NULL),
"Could not connect error signal");
ts_fail_unless (g_signal_connect (st, "known-source-packet-received",
G_CALLBACK (_known_source_packet_received), NULL),
"Could not connect known-source-packet-received signal");
ts_fail_unless (g_signal_connect (st, "state-changed",
G_CALLBACK (_state_changed), NULL),
"Could not connect state-changed signal");
if (!fs_stream_transmitter_gather_local_candidates (st, &error))
{
if (error)
ts_fail ("Could not start gathering local candidates (%s:%d) %s",
g_quark_to_string (error->domain), error->code, error->message);
else
ts_fail ("Could not start gathering candidates"
" (without a specified error)");
}
else
{
ts_fail_unless (error == NULL);
}
g_clear_error (&error);
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
ts_fail_if (ret == GST_STATE_CHANGE_FAILURE,
"Could not set the pipeline to playing");
if (!(flags & FLAG_LOCAL_CANDIDATES))
{
ret = fs_stream_transmitter_force_remote_candidates (st, local_cands,
&error);
fs_candidate_list_destroy (local_cands);
if (error)
ts_fail ("Error while adding candidate: (%s:%d) %s",
g_quark_to_string (error->domain), error->code, error->message);
ts_fail_unless (ret == TRUE, "No detailed error from add_remote_candidate");
}
else
{
ts_fail_unless (error == NULL);
}
g_clear_error (&error);
cand = fs_candidate_new (NULL, 1,
FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, NULL, 0);
cand->username = g_strdup ("/tmp/src1");
remote_cands = g_list_prepend (remote_cands, cand);
cand = fs_candidate_new (NULL, 2,
FS_CANDIDATE_TYPE_HOST, FS_NETWORK_PROTOCOL_UDP, NULL, 0);
cand->username = g_strdup ("/tmp/src2");
remote_cands = g_list_prepend (remote_cands, cand);
ret = fs_stream_transmitter_force_remote_candidates (st, remote_cands, &error);
fs_candidate_list_destroy (remote_cands);
if (error)
ts_fail ("Error while adding candidate: (%s:%d) %s",
g_quark_to_string (error->domain), error->code, error->message);
ts_fail_unless (ret == TRUE, "No detailed error from add_remote_candidate");
g_clear_error (&error);
g_mutex_lock (&test_mutex);
while (connected_count < 2)
g_cond_wait (&cond, &test_mutex);
g_mutex_unlock (&test_mutex);
setup_fakesrc (trans, pipeline, 1);
setup_fakesrc (trans, pipeline, 2);
g_mutex_lock (&test_mutex);
while (!done)
g_cond_wait (&cond, &test_mutex);
g_mutex_unlock (&test_mutex);
fail_unless (got_prepared[0] == TRUE);
fail_unless (got_prepared[1] == TRUE);
fail_unless (got_candidates[0] == TRUE);
fail_unless (got_candidates[1] == TRUE);
gst_element_set_state (pipeline, GST_STATE_NULL);
if (st)
{
fs_stream_transmitter_stop (st);
g_object_unref (st);
}
g_object_unref (trans);
g_source_remove (bus_source);
gst_object_unref (pipeline);
g_cond_clear (&cond);
g_mutex_clear (&test_mutex);
}
GST_START_TEST (test_shmtransmitter_run_basic)
{
run_shm_transmitter_test (0);
}
GST_END_TEST;
GST_START_TEST (test_shmtransmitter_sending_half)
{
run_shm_transmitter_test (FLAG_NOT_SENDING);
}
GST_END_TEST;
GST_START_TEST (test_shmtransmitter_local_cands)
{
run_shm_transmitter_test (FLAG_LOCAL_CANDIDATES);
}
GST_END_TEST;
static Suite *
shmtransmitter_suite (void)
{
Suite *s = suite_create ("shmtransmitter");
TCase *tc_chain;
GLogLevelFlags fatal_mask;
fatal_mask = g_log_set_always_fatal (G_LOG_FATAL_MASK);
fatal_mask |= G_LOG_LEVEL_WARNING | G_LOG_LEVEL_CRITICAL;
g_log_set_always_fatal (fatal_mask);
tc_chain = tcase_create ("shmtransmitter_new");
tcase_add_test (tc_chain, test_shmtransmitter_new);
suite_add_tcase (s, tc_chain);
tc_chain = tcase_create ("shmtransmitter_basic");
tcase_add_test (tc_chain, test_shmtransmitter_run_basic);
suite_add_tcase (s, tc_chain);
tc_chain = tcase_create ("shmtransmitter-sending-half");
tcase_add_test (tc_chain, test_shmtransmitter_sending_half);
suite_add_tcase (s, tc_chain);
tc_chain = tcase_create ("shmtransmitter-local-candidates");
tcase_add_test (tc_chain, test_shmtransmitter_local_cands);
suite_add_tcase (s, tc_chain);
return s;
}
GST_CHECK_MAIN (shmtransmitter);