/* Farstream unit tests for FsRtpConference
 *
 * Copyright (C) 2007 Collabora, Nokia
 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
*/

#ifdef HAVE_CONFIG_H
# include <config.h>
#endif

#include <gst/check/gstcheck.h>
#include <gst/rtp/gstrtpbuffer.h>

#include <farstream/fs-conference.h>

#include "check-threadsafe.h"
#include "generic.h"

/* Main loop created and run by one_way(); quitting it ends the test */
GMainLoop *loop = NULL;

/* Payload type the telephone-event codec is forced to in set_codecs();
 * one_way() starts it at 105 and start_stop_sending_dtmf() increments it
 * when the codec is changed */
guint dtmf_id = 0;
/* DTMF event currently (or most recently) being sent, 0 before the first */
gint digit = 0;
/* TRUE between a start_telephony_event and the matching stop */
gboolean sending = FALSE;
/* Set by the receiving side once the current digit has been observed */
gboolean received = FALSE;
/* TRUE once the bus reported a send codec change that carries
 * telephone-event with the expected id (or right away when
 * telephone-event is filtered out) */
gboolean ready_to_send = FALSE;
/* When TRUE, start_stop_sending_dtmf() re-negotiates the codecs with a new
 * dtmf_id once after the last digit instead of quitting */
gboolean change_codec = FALSE;
/* When TRUE, set_codecs() leaves telephone-event out of the remote codecs */
gboolean filter_telephone_event = FALSE;

/* Conference and stream of the test currently running, set by one_way() */
struct SimpleTestConference *dat = NULL;
FsStream *stream = NULL;

/*
 * Idle callback that switches the conference pipeline passed in @user_data
 * to PLAYING and marks the conference as started.
 *
 * Always returns FALSE, so the idle source it was attached with runs once.
 */
static gboolean
_start_pipeline (gpointer user_data)
{
  struct SimpleTestConference *conf = user_data;
  GstStateChangeReturn ret;

  GST_DEBUG ("%d: Starting pipeline", conf->id);

  ret = gst_element_set_state (conf->pipeline, GST_STATE_PLAYING);
  ts_fail_if (ret == GST_STATE_CHANGE_FAILURE,
      "Could not set the pipeline to playing");

  conf->started = TRUE;

  return FALSE;
}

/*
 * Bus watch installed on the conference pipeline by one_way().
 *
 * - "farstream-error" element messages are validated and then fail the test
 *   with the error details.
 * - "farstream-send-codec-changed" element messages must carry a codec; when
 *   telephone-event is not filtered out, the secondary codec list must
 *   contain an 8000 Hz telephone-event codec whose id equals dtmf_id, which
 *   sets ready_to_send.
 * - GST_MESSAGE_ERROR fails the test, GST_MESSAGE_WARNING is only logged.
 *
 * @user_data is the struct SimpleTestConference the watch belongs to.
 * Always returns TRUE so the watch stays installed.
 */
static gboolean
_bus_callback (GstBus *bus, GstMessage *message, gpointer user_data)
{
  struct SimpleTestConference *dat = user_data;

  switch (GST_MESSAGE_TYPE (message))
  {

    case GST_MESSAGE_ELEMENT:
      {
        const GstStructure *s = gst_message_get_structure (message);

        if (gst_structure_has_name (s, "farstream-error"))
        {
          const GValue *value;
          FsError errorno;
          const gchar *error;
          GEnumClass *enumclass = NULL;
          GEnumValue *enumvalue = NULL;

          ts_fail_unless (
            FS_IS_CONFERENCE (GST_MESSAGE_SRC (message)),
            "Received farstream-error from non-farstream element");

          ts_fail_unless (
              gst_structure_has_field_typed (s, "src-object", G_TYPE_OBJECT),
              "farstream-error structure has no src-object field");
          ts_fail_unless (
              gst_structure_has_field_typed (s, "error-no", FS_TYPE_ERROR),
              "farstream-error structure has no error-no field");
          ts_fail_unless (
              gst_structure_has_field_typed (s, "error-msg", G_TYPE_STRING),
              "farstream-error structure has no error-msg field");

          value = gst_structure_get_value (s, "error-no");
          errorno = g_value_get_enum (value);
          error = gst_structure_get_string (s, "error-msg");

          enumclass = g_type_class_ref (FS_TYPE_ERROR);
          /* NULL when errorno is not a registered FsError value */
          enumvalue = g_enum_get_value (enumclass, errorno);
          ts_fail ("Error on BUS %s (%d, %s) %s",
              enumvalue ? enumvalue->value_name : "(unknown)", errorno,
              enumvalue ? enumvalue->value_nick : "(unknown)",
              error);
          g_type_class_unref (enumclass);
        }
        else if (gst_structure_has_name (s, "farstream-send-codec-changed"))
        {
          FsCodec *codec = NULL;
          GList *secondary_codec_list = NULL;
          GList *item;

          ts_fail_unless (gst_structure_get ((GstStructure *) s,
                  "secondary-codecs", FS_TYPE_CODEC_LIST, &secondary_codec_list,
                  "codec", FS_TYPE_CODEC, &codec,
                  NULL));

          ts_fail_unless (codec != NULL);
          if (!filter_telephone_event)
          {
            ts_fail_unless (secondary_codec_list != NULL);

            for (item = secondary_codec_list; item; item = item->next)
            {
              FsCodec *secondary = item->data;

              if (secondary->clock_rate == 8000 &&
                  !g_ascii_strcasecmp ("telephone-event",
                      secondary->encoding_name))
              {
                /* The sender must use the payload type we negotiated */
                ts_fail_unless (secondary->id == dtmf_id);
                ready_to_send = TRUE;
              }
            }

            ts_fail_unless (ready_to_send == TRUE);
          }

          /* gst_structure_get() handed us copies of both boxed values */
          fs_codec_list_destroy (secondary_codec_list);
          fs_codec_destroy (codec);
        }

      }
      break;
    case GST_MESSAGE_ERROR:
      {
        GError *error = NULL;
        gchar *debug = NULL;
        gst_message_parse_error (message, &error, &debug);

        ts_fail ("Got an error on the BUS (%d): %s (%s)", error->code,
            error->message, debug);
        g_error_free (error);
        g_free (debug);
      }
      break;
    case GST_MESSAGE_WARNING:
      {
        GError *error = NULL;
        gchar *debug = NULL;
        gst_message_parse_warning (message, &error, &debug);

        GST_WARNING ("%d: Got a warning on the BUS (%d): %s (%s)", dat->id,
            error->code,
            error->message, debug);
        g_error_free (error);
        g_free (debug);
      }
      break;
    default:
      break;
  }

  return TRUE;
}

/*
 * Builds and starts a "udpsrc ! fakesink" pipeline used as the receiving end
 * of a test.
 *
 * @buffer_handler: buffer probe installed on the fakesink sink pad
 * @data: user data passed to @buffer_handler
 * @port: (out) receives the value of the udpsrc "port" property, read after
 *   the pipeline has been set to PLAYING
 *
 * Returns the new pipeline; the caller owns the reference.
 */
static GstElement *
build_recv_pipeline (GstPadProbeCallback buffer_handler, gpointer data,
    gint *port)
{
  GstElement *pipeline;
  GstElement *src;
  GstElement *sink;
  GstPad *pad = NULL;

  pipeline = gst_pipeline_new (NULL);

  src = gst_element_factory_make ("udpsrc", NULL);
  sink = gst_element_factory_make ("fakesink", NULL);

  /* Validate the elements before touching any of them */
  ts_fail_unless (pipeline && src && sink, "Could not make pipeline(%p)"
      " or src(%p) or sink(%p)", pipeline, src, sink);

  g_object_set (sink, "sync", FALSE, NULL);

  gst_bin_add_many (GST_BIN (pipeline), src, sink, NULL);

  ts_fail_unless (gst_element_link (src, sink), "Could not link udpsrc"
      " and fakesink");

  pad = gst_element_get_static_pad (sink, "sink");
  ts_fail_unless (pad != NULL, "Could not get the fakesink sink pad");

  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER, buffer_handler, data,
      NULL);

  /* gst_element_get_static_pad() returned a new reference; drop it now that
   * the probe is installed (the element keeps its own reference) */
  gst_object_unref (pad);

  ts_fail_if (gst_element_set_state (pipeline, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE, "Could not start recv pipeline");

  g_object_get (G_OBJECT (src), "port", port, NULL);

  return pipeline;
}

/*
 * Sets the remote codecs of @stream from the "codecs-without-config" list of
 * @dat's session.
 *
 * Only the codec with id 0 and, unless filter_telephone_event is set, the
 * 8000 Hz telephone-event codec are passed on.  The telephone-event codec
 * must exist exactly once; its id is overwritten with dtmf_id before the
 * list is handed to the stream.  Any failure fails the test.
 */
static void
set_codecs (struct SimpleTestConference *dat, FsStream *stream)
{
  GList *all_codecs = NULL;
  GList *remote_codecs = NULL;
  GList *walk = NULL;
  GError *err = NULL;
  FsCodec *telephone_event = NULL;
  gboolean accepted;

  g_object_get (dat->session, "codecs-without-config", &all_codecs, NULL);

  ts_fail_if (all_codecs == NULL, "Could not get the local codecs");

  walk = g_list_first (all_codecs);
  while (walk)
  {
    FsCodec *candidate = walk->data;
    gboolean keep = FALSE;

    if (candidate->id == 0)
    {
      keep = TRUE;
    }
    else if (candidate->clock_rate == 8000 &&
        g_ascii_strcasecmp (candidate->encoding_name, "telephone-event") == 0)
    {
      ts_fail_unless (telephone_event == NULL,
          "More than one copy of telephone-event");
      telephone_event = candidate;
      keep = !filter_telephone_event;
    }

    if (keep)
      remote_codecs = g_list_append (remote_codecs, candidate);

    walk = g_list_next (walk);
  }

  ts_fail_if (remote_codecs == NULL, "PCMA and PCMU are not in the codecs"
      " you must install gst-plugins-good");

  ts_fail_unless (telephone_event != NULL);
  /* The codec object is shared with remote_codecs, so this changes the id
   * that gets negotiated as well */
  telephone_event->id = dtmf_id;

  accepted = fs_stream_set_remote_codecs (stream, remote_codecs, &err);
  if (!accepted)
  {
    if (err != NULL)
    {
      ts_fail ("Could not set the remote codecs on stream (%d): %s",
          err->code,
          err->message);
    }
    else
    {
      ts_fail ("Could not set the remote codecs on stream"
          " and we did NOT get a GError!!");
    }
  }

  /* remote_codecs only borrows the codecs owned by all_codecs */
  g_list_free (remote_codecs);
  fs_codec_list_destroy (all_codecs);
}

/*
 * Runs one send-only test: resets the shared state, creates a conference
 * with one participant and one sending stream over "rawudp", points the
 * stream at 127.0.0.1:@port, sets the codecs, and runs the main loop until
 * something quits it.
 *
 * @recv_pipeline: receiving pipeline listening on @port; this function sets
 *   it to NULL state and drops the caller's reference when done
 * @port: UDP port the sending stream is directed to
 */
static void
one_way (GstElement *recv_pipeline, gint port)
{
  FsParticipant *participant = NULL;
  GError *error = NULL;
  GList *candidates = NULL;
  GstBus *bus = NULL;

  dtmf_id = 105;
  digit = 0;
  sending = FALSE;
  received = FALSE;
  /* Without telephone-event there is no codec-changed condition to wait for */
  ready_to_send = filter_telephone_event;

  loop = g_main_loop_new (NULL, FALSE);

  dat = setup_simple_conference (1, "fsrtpconference", "tester@123445");

  bus = gst_element_get_bus (dat->pipeline);
  gst_bus_add_watch (bus, _bus_callback, dat);
  gst_object_unref (bus);

  g_idle_add (_start_pipeline, dat);

  participant = fs_conference_new_participant (
      FS_CONFERENCE (dat->conference), &error);
  if (error)
    ts_fail ("Error while creating new participant (%d): %s",
        error->code, error->message);
  ts_fail_if (participant == NULL,
      "Could not make participant, but no GError!");

  stream = fs_session_new_stream (dat->session, participant,
      FS_DIRECTION_SEND, &error);
  if (error)
    ts_fail ("Error while creating new stream (%d): %s",
        error->code, error->message);
  ts_fail_if (stream == NULL, "Could not make stream, but no GError!");

  fail_unless (fs_stream_set_transmitter (stream, "rawudp", NULL, 0, &error));
  fail_unless (error == NULL);

  GST_DEBUG ("port is %d", port);

  candidates = g_list_prepend (NULL,
      fs_candidate_new ("1", FS_COMPONENT_RTP, FS_CANDIDATE_TYPE_HOST,
          FS_NETWORK_PROTOCOL_UDP, "127.0.0.1", port));
  ts_fail_unless (fs_stream_force_remote_candidates (stream, candidates,
          &error),
      "Could not set remote candidate");
  fs_candidate_list_destroy (candidates);

  set_codecs (dat, stream);

  setup_fakesrc (dat);

  g_main_loop_run (loop);

  gst_element_set_state (dat->pipeline, GST_STATE_NULL);
  gst_element_set_state (recv_pipeline, GST_STATE_NULL);

  /* The stream and participant were created here, so release them here;
   * clear the globals so nothing can use the freed objects afterwards */
  g_object_unref (stream);
  stream = NULL;
  g_object_unref (participant);

  cleanup_simple_conference (dat);
  dat = NULL;
  gst_object_unref (recv_pipeline);

  g_main_loop_unref (loop);
  loop = NULL;
}


/*
 * Buffer probe on the receiving pipeline for the telephone-event tests.
 *
 * Every buffer must map as RTP.  Buffers whose payload type is not dtmf_id
 * are ignored.  For the others, the first payload byte is compared with the
 * digit currently being sent: lower values (still the previous digit) are
 * ignored, an exact match sets received, anything else fails the test.
 *
 * Always returns GST_PAD_PROBE_OK so the buffer continues downstream.
 */
static GstPadProbeReturn
send_dmtf_buffer_handler (GstPad *pad, GstPadProbeInfo *info,
    gpointer user_data)
{
  GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
  GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
  /* Payload bytes are unsigned; a plain char could compare as negative */
  const guint8 *data;

  ts_fail_unless (gst_rtp_buffer_map (buf, GST_MAP_READ, &rtpbuf),
    "Buffer is not valid rtp");
  if (gst_rtp_buffer_get_payload_type (&rtpbuf) != dtmf_id)
    goto out;

  /* Do not read data[0] from a packet that carries no payload */
  ts_fail_unless (gst_rtp_buffer_get_payload_len (&rtpbuf) >= 1,
      "telephone-event buffer has an empty payload");

  data = gst_rtp_buffer_get_payload (&rtpbuf);

  /* Check if still on previous digit */
  if (data[0] < digit)
    goto out;

  GST_LOG ("Got digit %d", data[0]);

  ts_fail_if (data[0] != digit, "Not sending the right digit"
      " (sending %d, should be %d)", data[0], digit);

  received = TRUE;

out:
  gst_rtp_buffer_unmap (&rtpbuf);
  return GST_PAD_PROBE_OK;
}


/*
 * Timeout callback driving the DTMF tests; alternates between starting and
 * stopping a telephony event on dat->session.
 *
 * Nothing happens until the conference exists, its pipeline has reached
 * PLAYING and ready_to_send is set.  Before each new digit the previous one
 * must have been received.  After FS_DTMF_EVENT_D the callback either
 * re-negotiates the codecs with the next dtmf_id (once, if change_codec is
 * set) and starts over at digit 0, or quits the main loop.
 *
 * Returns FALSE only when the main loop is quit, TRUE otherwise.
 */
static gboolean
start_stop_sending_dtmf (gpointer data)
{
  GstState current;
  GstStateChangeReturn state_ret;

  if (dat == NULL || dat->pipeline == NULL || dat->session == NULL)
    return TRUE;

  state_ret = gst_element_get_state (dat->pipeline, &current, NULL, 0);
  ts_fail_if (state_ret == GST_STATE_CHANGE_FAILURE);

  if (state_ret != GST_STATE_CHANGE_SUCCESS)
    return TRUE;
  if (current != GST_STATE_PLAYING)
    return TRUE;

  if (!ready_to_send)
    return TRUE;

  /* An event is in progress: this tick only stops it */
  if (sending)
  {
    ts_fail_unless (fs_session_stop_telephony_event (dat->session),
        "Could not stop telephony event");
    sending = FALSE;
    return TRUE;
  }

  if (digit != 0)
  {
    ts_fail_unless (received == TRUE,
        "Did not receive any buffer for digit %d", digit);
  }

  if (digit >= FS_DTMF_EVENT_D)
  {
    if (!change_codec)
    {
      g_main_loop_quit (loop);
      return FALSE;
    }

    /* Run through all digits once more with a new payload type */
    digit = 0;
    dtmf_id++;
    ready_to_send = FALSE;
    change_codec = FALSE;
    set_codecs (dat, stream);
    return TRUE;
  }

  digit++;
  received = FALSE;

  ts_fail_unless (fs_session_start_telephony_event (dat->session,
          digit, digit),
      "Could not start telephony event");
  sending = TRUE;

  return TRUE;
}

/* Sends DTMF and checks each digit on the receiving pipeline through
 * send_dmtf_buffer_handler() */
GST_START_TEST (test_senddtmf_event)
{
  GstElement *receiver;
  gint port;

  receiver = build_recv_pipeline (send_dmtf_buffer_handler, NULL, &port);

  /* Drives the start/stop sequence while one_way() runs the main loop */
  g_timeout_add (350, start_stop_sending_dtmf, NULL);

  one_way (receiver, port);
}
GST_END_TEST;


/*
 * Bus watch for the sound-detecting receive pipeline.
 *
 * Looks for "dtmf-event" element messages and sets received when their
 * "number" field equals the digit currently being sent.  Every other
 * message is ignored.  Always returns TRUE so the watch stays installed.
 */
static gboolean
dtmf_bus_watch (GstBus *bus, GstMessage *message, gpointer data)
{
  const GstStructure *st;
  int number;

  if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ELEMENT)
  {
    st = gst_message_get_structure (message);

    if (gst_structure_has_name (st, "dtmf-event") &&
        gst_structure_get_int (st, "number", &number))
    {
      GST_LOG ("Got digit %d", number);
      if (number == digit)
        received = TRUE;
    }
  }

  return TRUE;
}

/*
 * Builds and starts the receive pipeline for the in-band DTMF test:
 * udpsrc ! rtppcmudepay ! mulawdec ! dtmfdetect ! fakesink, with
 * dtmf_bus_watch() installed on its bus.
 *
 * @port: (out) receives the value of the udpsrc "port" property, read after
 *   the pipeline has been set to PLAYING
 *
 * Returns the new pipeline; the caller owns the reference.
 */
static GstElement *
build_dtmf_sound_recv_pipeline (gint *port)
{
  GstElement *pipeline;
  GstElement *src;
  GstBus *bus;
  GError *error = NULL;

  pipeline = gst_parse_launch_full (
      "udpsrc name=src caps=\"application/x-rtp, payload=0\" !"
      " rtppcmudepay ! mulawdec ! dtmfdetect ! fakesink sync=0", NULL,
      GST_PARSE_FLAG_FATAL_ERRORS, &error);
  /* Report the parser's own message first; it says which element or link
   * is the problem */
  if (error)
    ts_fail ("Could not build the dtmf sound recv pipeline (%d): %s",
        error->code, error->message);
  ts_fail_if (pipeline == NULL,
      "Could not build the dtmf sound recv pipeline, but no GError!");

  bus = gst_element_get_bus (pipeline);
  gst_bus_add_watch (bus, dtmf_bus_watch, NULL);
  gst_object_unref (bus);

  ts_fail_if (gst_element_set_state (pipeline, GST_STATE_PLAYING) ==
      GST_STATE_CHANGE_FAILURE, "Could not start recv pipeline");

  src = gst_bin_get_by_name (GST_BIN (pipeline), "src");
  ts_fail_if (src == NULL, "Could not find the udpsrc in the recv pipeline");
  g_object_get (G_OBJECT (src), "port", port, NULL);
  gst_object_unref (src);

  return pipeline;
}


/* Sends DTMF with telephone-event filtered out of the remote codecs and
 * checks the digits through the dtmfdetect-based receive pipeline */
GST_START_TEST (test_senddtmf_sound)
{
  gint port = 0;
  GstElement *receiver;

  receiver = build_dtmf_sound_recv_pipeline (&port);

  g_timeout_add (350, start_stop_sending_dtmf, NULL);

  /* Only filter telephone-event for the duration of this run */
  filter_telephone_event = TRUE;
  one_way (receiver, port);
  filter_telephone_event = FALSE;
}
GST_END_TEST;


/* Like test_senddtmf_event, but after the last digit the codecs are set
 * again with the next dtmf_id and all digits are sent a second time */
GST_START_TEST (test_senddtmf_change_auto)
{
  GstElement *receiver;
  gint port;

  receiver = build_recv_pipeline (send_dmtf_buffer_handler, NULL, &port);

  change_codec = TRUE;
  g_timeout_add (350, start_stop_sending_dtmf, NULL);

  one_way (receiver, port);
}
GST_END_TEST;

/* Set by change_ssrc_buffer_handler() once a buffer carrying the session's
 * initial ssrc has been verified; reset by test_change_ssrc */
gboolean checked = FALSE;

/*
 * Buffer probe for test_change_ssrc.  It compares the ssrc of every received
 * RTP buffer with the session's "ssrc" property and walks the session
 * through three steps:
 *
 *  1. any other ssrc: must match the session the first time it is seen,
 *     then the session ssrc is set to 12345
 *  2. ssrc 12345: the session ssrc is set to 6789
 *  3. ssrc 6789: must match the session, then the main loop is quit
 *
 * Always returns GST_PAD_PROBE_OK.
 */
static GstPadProbeReturn
change_ssrc_buffer_handler (GstPad *pad, GstPadProbeInfo *info,
    gpointer user_data)
{
  enum { FIRST_SSRC = 12345, SECOND_SSRC = 6789 };
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
  guint session_ssrc;
  guint packet_ssrc;

  ts_fail_unless (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp));
  packet_ssrc = gst_rtp_buffer_get_ssrc (&rtp);
  gst_rtp_buffer_unmap (&rtp);

  g_object_get (dat->session, "ssrc", &session_ssrc, NULL);

  switch (packet_ssrc)
  {
    case FIRST_SSRC:
      /* Step two: the session may already have been moved on to 6789 by an
       * earlier buffer */
      ts_fail_unless (packet_ssrc == session_ssrc ||
          session_ssrc == SECOND_SSRC);
      g_object_set (dat->session, "ssrc", SECOND_SSRC, NULL);
      break;

    case SECOND_SSRC:
      /* Step three: the last change arrived, the test is done */
      ts_fail_unless (packet_ssrc == session_ssrc);
      g_main_loop_quit (loop);
      break;

    default:
      /* Step one: only the first buffer has to match, later ones may still
       * carry the old ssrc after the property was changed */
      ts_fail_unless (checked || packet_ssrc == session_ssrc);
      checked = TRUE;

      if (session_ssrc != FIRST_SSRC)
        g_object_set (dat->session, "ssrc", FIRST_SSRC, NULL);
      break;
  }

  return GST_PAD_PROBE_OK;
}

/* Changes the session ssrc twice while sending and checks that the received
 * buffers follow; change_ssrc_buffer_handler() quits the main loop */
GST_START_TEST (test_change_ssrc)
{
  GstElement *receiver;
  gint port;

  receiver = build_recv_pipeline (change_ssrc_buffer_handler, NULL, &port);

  checked = FALSE;

  one_way (receiver, port);
}
GST_END_TEST;


/*
 * Builds the "fsrtpsendcodecs" suite with one test case per test and makes
 * GLib warnings and criticals fatal on top of the default fatal mask.
 */
static Suite *
fsrtpsendcodecs_suite (void)
{
  Suite *suite = suite_create ("fsrtpsendcodecs");
  TCase *tc_event;
  TCase *tc_sound;
  TCase *tc_change_auto;
  TCase *tc_ssrc;
  GLogLevelFlags always_fatal;

  always_fatal = g_log_set_always_fatal (G_LOG_FATAL_MASK);
  always_fatal |= G_LOG_LEVEL_WARNING | G_LOG_LEVEL_CRITICAL;
  g_log_set_always_fatal (always_fatal);

  tc_event = tcase_create ("fsrtpsenddtmf_event");
  tcase_add_test (tc_event, test_senddtmf_event);
  suite_add_tcase (suite, tc_event);

  tc_sound = tcase_create ("fsrtpsenddtmf_sound");
  tcase_add_test (tc_sound, test_senddtmf_sound);
  suite_add_tcase (suite, tc_sound);

  /* FIXME: This test case is built but deliberately left out of the suite
   * because it fails randomly due to some ordering issue */
  tc_change_auto = tcase_create ("fsrtpsenddtmf_change_auto");
  tcase_add_test (tc_change_auto, test_senddtmf_change_auto);

  tc_ssrc = tcase_create ("fsrtpchangessrc");
  tcase_add_test (tc_ssrc, test_change_ssrc);
  suite_add_tcase (suite, tc_ssrc);

  return suite;
}

/* Test program entry point; the macro argument names the suite built by
 * fsrtpsendcodecs_suite() above */
GST_CHECK_MAIN (fsrtpsendcodecs);