Blame tests/check/elements/rtpstorage.c

Packit 1f69a5
/* GStreamer plugin for forward error correction
Packit 1f69a5
 * Copyright (C) 2017 Pexip
Packit 1f69a5
 *
Packit 1f69a5
 * This library is free software; you can redistribute it and/or
Packit 1f69a5
 * modify it under the terms of the GNU Lesser General Public
Packit 1f69a5
 * License as published by the Free Software Foundation; either
Packit 1f69a5
 * version 2.1 of the License, or (at your option) any later version.
Packit 1f69a5
 *
Packit 1f69a5
 * This library is distributed in the hope that it will be useful,
Packit 1f69a5
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 1f69a5
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit 1f69a5
 * Lesser General Public License for more details.
Packit 1f69a5
 *
Packit 1f69a5
 * You should have received a copy of the GNU Lesser General Public
Packit 1f69a5
 * License along with this library; if not, write to the Free Software
Packit 1f69a5
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
Packit 1f69a5
 *
Packit 1f69a5
 * Author: Mikhail Fludkov <misha@pexip.com>
Packit 1f69a5
 */
Packit 1f69a5
Packit 1f69a5
#include <gst/rtp/gstrtpbuffer.h>
Packit 1f69a5
#include <gst/check/gstcheck.h>
Packit 1f69a5
#include <gst/check/gstharness.h>
Packit 1f69a5
Packit 1f69a5
#include "../../../gst/rtp/rtpstorage.h"
Packit 1f69a5
#include "../../../gst/rtp/rtpstoragestream.c"
Packit 1f69a5
#include "../../../gst/rtp/rtpstorage.c"
Packit 1f69a5
Packit 1f69a5
#define RTP_CLOCK_RATE (90000)
Packit 1f69a5
#define RTP_FRAME_DUR (RTP_CLOCK_RATE / 30)
Packit 1f69a5
Packit 1f69a5
#define RTP_TSTAMP_BASE (0x11111111)
Packit 1f69a5
#define GST_TSTAMP_BASE (0x22222222)
Packit 1f69a5
#define RTP_TSTAMP(i) (RTP_FRAME_DUR * (i) + RTP_TSTAMP_BASE)
Packit 1f69a5
#define GST_TSTAMP(i) (RTP_PACKET_DUR * (i) + GST_TSTAMP_BASE)
Packit 1f69a5
Packit 1f69a5
#define RTP_PACKET_DUR (10 * GST_MSECOND)
Packit 1f69a5
Packit 1f69a5
static GstBufferList *
Packit 1f69a5
get_packets_for_recovery (GstHarness * h, gint fec_pt, guint32 ssrc,
Packit 1f69a5
    guint16 lost_seq)
Packit 1f69a5
{
Packit 1f69a5
  GstBufferList *res;
Packit 1f69a5
  RtpStorage *internal_storage;
Packit 1f69a5
Packit 1f69a5
  g_object_get (h->element, "internal-storage", &internal_storage, NULL);
Packit 1f69a5
Packit 1f69a5
  res =
Packit 1f69a5
      rtp_storage_get_packets_for_recovery (internal_storage, fec_pt, ssrc,
Packit 1f69a5
      lost_seq);
Packit 1f69a5
Packit 1f69a5
  g_object_unref (internal_storage);
Packit 1f69a5
Packit 1f69a5
  return res;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
static void
Packit 1f69a5
put_recovered_packet (GstHarness * h, GstBuffer * buffer, guint8 pt,
Packit 1f69a5
    guint32 ssrc, guint16 seq)
Packit 1f69a5
{
Packit 1f69a5
  RtpStorage *internal_storage;
Packit 1f69a5
Packit 1f69a5
  g_object_get (h->element, "internal-storage", &internal_storage, NULL);
Packit 1f69a5
Packit 1f69a5
  rtp_storage_do_put_recovered_packet (internal_storage, buffer, pt, ssrc, seq);
Packit 1f69a5
Packit 1f69a5
  g_object_unref (internal_storage);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
static GstBuffer *
Packit 1f69a5
create_rtp_packet (guint8 pt, guint32 ssrc, guint32 timestamp, guint16 seq)
Packit 1f69a5
{
Packit 1f69a5
  GstBuffer *buf = gst_rtp_buffer_new_allocate (0, 0, 0);
Packit 1f69a5
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
Packit 1f69a5
Packit 1f69a5
  fail_unless (gst_rtp_buffer_map (buf, GST_MAP_WRITE, &rtp));
Packit 1f69a5
  gst_rtp_buffer_set_ssrc (&rtp, ssrc);
Packit 1f69a5
  gst_rtp_buffer_set_payload_type (&rtp, pt);
Packit 1f69a5
  gst_rtp_buffer_set_timestamp (&rtp, timestamp);
Packit 1f69a5
  gst_rtp_buffer_set_seq (&rtp, seq);
Packit 1f69a5
  GST_BUFFER_DTS (buf) = GST_TSTAMP (seq);
Packit 1f69a5
  gst_rtp_buffer_unmap (&rtp;;
Packit 1f69a5
Packit 1f69a5
  return buf;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_up_and_down)
Packit 1f69a5
{
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_resize)
Packit 1f69a5
{
Packit 1f69a5
  guint i, j;
Packit 1f69a5
  GstBuffer *bufin, *bufout, *bufs[10];
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
Packit 1f69a5
  g_object_set (h->element, "size-time", 0, NULL);
Packit 1f69a5
  bufin = create_rtp_packet (96, 0xabe2b0b, 0x111111, 0);
Packit 1f69a5
  bufout = gst_harness_push_and_pull (h, bufin);
Packit 1f69a5
  fail_unless (bufin == bufout);
Packit 1f69a5
  fail_unless (gst_buffer_is_writable (bufout));
Packit 1f69a5
Packit 1f69a5
  g_object_set (h->element, "size-time",
Packit 1f69a5
      (G_N_ELEMENTS (bufs) - 1) * RTP_PACKET_DUR, NULL);
Packit 1f69a5
Packit 1f69a5
  // Pushing 10 buffers all of them should have ref. count =2
Packit 1f69a5
  for (i = 0; i < G_N_ELEMENTS (bufs); ++i) {
Packit 1f69a5
    bufs[i] =
Packit 1f69a5
        gst_harness_push_and_pull (h, create_rtp_packet (96, 0xabe2b0b,
Packit 1f69a5
            0x111111, i));
Packit 1f69a5
    for (j = 0; j <= i; ++j)
Packit 1f69a5
      fail_unless (!gst_buffer_is_writable (bufs[j]));
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  // The next 10 buffers should expel the first 10
Packit 1f69a5
  for (i = 0; i < G_N_ELEMENTS (bufs); ++i) {
Packit 1f69a5
    gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
                0xabe2b0b, 0x111111, G_N_ELEMENTS (bufs) + i)));
Packit 1f69a5
    for (j = 0; j <= i; ++j)
Packit 1f69a5
      fail_unless (gst_buffer_is_writable (bufs[j]));
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  for (i = 0; i < G_N_ELEMENTS (bufs); ++i)
Packit 1f69a5
    gst_buffer_unref (bufs[i]);
Packit 1f69a5
  gst_buffer_unref (bufout);
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_stop_redundant_packets)
Packit 1f69a5
{
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  GstBuffer *bufinp;
Packit 1f69a5
Packit 1f69a5
  g_object_set (h->element, "size-time", 2 * RTP_PACKET_DUR, NULL);
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
Packit 1f69a5
  bufinp = create_rtp_packet (96, 0xabe2b0b, 0x111111, 0);
Packit 1f69a5
  GST_BUFFER_FLAG_SET (bufinp, GST_RTP_BUFFER_FLAG_REDUNDANT);
Packit 1f69a5
  gst_harness_push (h, bufinp);
Packit 1f69a5
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, 0x111111, 1)));
Packit 1f69a5
Packit 1f69a5
  fail_unless_equals_int (gst_harness_buffers_received (h), 1);
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_unknown_ssrc)
Packit 1f69a5
{
Packit 1f69a5
  GstBufferList *bufs_out;
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  g_object_set (h->element, "size-time", RTP_PACKET_DUR, NULL);
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
Packit 1f69a5
  /* No packets has been pushed through yet */
Packit 1f69a5
  bufs_out = get_packets_for_recovery (h, 100, 0xabe2b0b, 0);
Packit 1f69a5
  fail_unless (NULL == bufs_out);
Packit 1f69a5
Packit 1f69a5
  /* 1 packet with ssrc=0xabe2bob pushed. Asking for ssrc=0xdeadbeef */
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, 0x111111, 0)));
Packit 1f69a5
  bufs_out = get_packets_for_recovery (h, 100, 0xdeadbeef, 0);
Packit 1f69a5
  fail_unless (NULL == bufs_out);
Packit 1f69a5
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_packet_not_lost)
Packit 1f69a5
{
Packit 1f69a5
  GstBuffer *buf;
Packit 1f69a5
  GstBufferList *bufs_out;
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  g_object_set (h->element, "size-time", 10 * RTP_PACKET_DUR, NULL);
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
Packit 1f69a5
  /* Pushing through 2 frames + 2 FEC */
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, RTP_TSTAMP (0), 0)));
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, (buf =
Packit 1f69a5
              create_rtp_packet (96, 0xabe2b0b, RTP_TSTAMP (1), 1))));
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, RTP_TSTAMP (1), 2)));
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, RTP_TSTAMP (1), 3)));
Packit 1f69a5
Packit 1f69a5
  /* Asking for a packet which was pushed before */
Packit 1f69a5
  bufs_out = get_packets_for_recovery (h, 100, 0xabe2b0b, 1);
Packit 1f69a5
  fail_unless (NULL != bufs_out);
Packit 1f69a5
  fail_unless_equals_int (1, gst_buffer_list_length (bufs_out));
Packit 1f69a5
  fail_unless (gst_buffer_list_get (bufs_out, 0) == buf);
Packit 1f69a5
Packit 1f69a5
  gst_buffer_list_unref (bufs_out);
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (test_rtpstorage_put_recovered_packet)
Packit 1f69a5
{
Packit 1f69a5
  GstBuffer *bufs_in[4];
Packit 1f69a5
  GstBufferList *bufs_out;
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  g_object_set (h->element, "size-time", 10 * RTP_PACKET_DUR, NULL);
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
Packit 1f69a5
  /* Pushing through 2 frames + 2 FEC
Packit 1f69a5
   * Packets with sequence numbers 1 and 2 are lost */
Packit 1f69a5
  bufs_in[0] = create_rtp_packet (96, 0xabe2b0b, RTP_TSTAMP (0), 0);
Packit 1f69a5
  bufs_in[1] = NULL;
Packit 1f69a5
  bufs_in[2] = NULL;
Packit 1f69a5
  bufs_in[3] = create_rtp_packet (100, 0xabe2b0b, RTP_TSTAMP (1), 3);
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, bufs_in[0]));
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, bufs_in[3]));
Packit 1f69a5
Packit 1f69a5
  /* 1 more frame + 1 FEC */
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, RTP_TSTAMP (2), 4)));
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (100,
Packit 1f69a5
              0xabe2b0b, RTP_TSTAMP (2), 5)));
Packit 1f69a5
Packit 1f69a5
  /* Asking for the lost packet seq=1 */
Packit 1f69a5
  bufs_out = get_packets_for_recovery (h, 100, 0xabe2b0b, 1);
Packit 1f69a5
  fail_unless (NULL != bufs_out);
Packit 1f69a5
  fail_unless_equals_int (2, gst_buffer_list_length (bufs_out));
Packit 1f69a5
  fail_unless (gst_buffer_list_get (bufs_out, 0) == bufs_in[0]);
Packit 1f69a5
  fail_unless (gst_buffer_list_get (bufs_out, 1) == bufs_in[3]);
Packit 1f69a5
  gst_buffer_list_unref (bufs_out);
Packit 1f69a5
Packit 1f69a5
  /* During recovery the packet of a new frame has arrived */
Packit 1f69a5
  gst_buffer_unref (gst_harness_push_and_pull (h, create_rtp_packet (96,
Packit 1f69a5
              0xabe2b0b, RTP_TSTAMP (3), 6)));
Packit 1f69a5
Packit 1f69a5
  /* Say we recovered packet with seq=1 and put it back in the storage */
Packit 1f69a5
  bufs_in[1] = create_rtp_packet (96, 0xabe2b0b, RTP_TSTAMP (1), 1);
Packit 1f69a5
  put_recovered_packet (h, bufs_in[1], 96, 0xabe2b0b, 1);
Packit 1f69a5
Packit 1f69a5
  /* Asking for the lost packet seq=2 */
Packit 1f69a5
  bufs_out = get_packets_for_recovery (h, 100, 0xabe2b0b, 2);
Packit 1f69a5
  fail_unless (NULL != bufs_out);
Packit 1f69a5
  fail_unless_equals_int (3, gst_buffer_list_length (bufs_out));
Packit 1f69a5
  fail_unless (gst_buffer_list_get (bufs_out, 0) == bufs_in[0]);
Packit 1f69a5
  fail_unless (gst_buffer_list_get (bufs_out, 1) == bufs_in[1]);
Packit 1f69a5
  fail_unless (gst_buffer_list_get (bufs_out, 2) == bufs_in[3]);
Packit 1f69a5
  gst_buffer_list_unref (bufs_out);
Packit 1f69a5
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
Packit 1f69a5
static void
Packit 1f69a5
_single_ssrc_test (GstHarness * h, guint32 ssrc,
Packit 1f69a5
    guint16 seq_start, guint16 nth_to_loose,
Packit 1f69a5
    gsize expected_buf_size, gsize expected_first_buffer_idx)
Packit 1f69a5
{
Packit 1f69a5
  guint i;
Packit 1f69a5
  GPtrArray *bufs_in =
Packit 1f69a5
      g_ptr_array_new_with_free_func ((GDestroyNotify) gst_buffer_unref);
Packit 1f69a5
  GstBufferList *bufs_out;
Packit 1f69a5
Packit 1f69a5
  /* 2 frames + 2 FEC */
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (0),
Packit 1f69a5
          seq_start + 0));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (1),
Packit 1f69a5
          seq_start + 1));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (100, ssrc, RTP_TSTAMP (1),
Packit 1f69a5
          seq_start + 2));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (100, ssrc, RTP_TSTAMP (1),
Packit 1f69a5
          seq_start + 3));
Packit 1f69a5
  /* 3 frames + 2 FEC */
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (2),
Packit 1f69a5
          seq_start + 4));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (3),
Packit 1f69a5
          seq_start + 5));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (4),
Packit 1f69a5
          seq_start + 6));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (100, ssrc, RTP_TSTAMP (4),
Packit 1f69a5
          seq_start + 7));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (100, ssrc, RTP_TSTAMP (4),
Packit 1f69a5
          seq_start + 8));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (100, ssrc, RTP_TSTAMP (4),
Packit 1f69a5
          seq_start + 9));
Packit 1f69a5
  /* 2 frames + no FEC */
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (5),
Packit 1f69a5
          seq_start + 10));
Packit 1f69a5
  g_ptr_array_add (bufs_in, create_rtp_packet (96, ssrc, RTP_TSTAMP (6),
Packit 1f69a5
          seq_start + 11));
Packit 1f69a5
Packit 1f69a5
  /* Loosing one */
Packit 1f69a5
  g_ptr_array_remove_index (bufs_in, nth_to_loose);
Packit 1f69a5
Packit 1f69a5
  /* Push all of them through */
Packit 1f69a5
  for (i = 0; i < bufs_in->len; ++i)
Packit 1f69a5
    gst_buffer_unref (gst_harness_push_and_pull (h,
Packit 1f69a5
            gst_buffer_ref (g_ptr_array_index (bufs_in, i))));
Packit 1f69a5
Packit 1f69a5
  bufs_out =
Packit 1f69a5
      get_packets_for_recovery (h, 100, ssrc,
Packit 1f69a5
      (guint16) (seq_start + nth_to_loose));
Packit 1f69a5
  if (0 == expected_buf_size) {
Packit 1f69a5
    fail_unless (NULL == bufs_out);
Packit 1f69a5
  } else {
Packit 1f69a5
    fail_unless (NULL != bufs_out);
Packit 1f69a5
    fail_unless_equals_int (expected_buf_size,
Packit 1f69a5
        gst_buffer_list_length (bufs_out));
Packit 1f69a5
    for (i = 0; i < gst_buffer_list_length (bufs_out); ++i)
Packit 1f69a5
      fail_unless (gst_buffer_list_get (bufs_out, i) ==
Packit 1f69a5
          g_ptr_array_index (bufs_in, expected_first_buffer_idx + i));
Packit 1f69a5
    gst_buffer_list_unref (bufs_out);
Packit 1f69a5
  }
Packit 1f69a5
  g_ptr_array_unref (bufs_in);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
static void
Packit 1f69a5
_multiple_ssrcs_test (guint16 nth_to_loose,
Packit 1f69a5
    gsize expected_buf_size, gsize expected_first_buffer_idx)
Packit 1f69a5
{
Packit 1f69a5
  guint16 stream0_seq_start = 200;
Packit 1f69a5
  guint16 stream1_seq_start = 65529;
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  g_object_set (h->element, "size-time", 12 * RTP_PACKET_DUR, NULL);
Packit 1f69a5
  gst_harness_set_src_caps_str (h, "application/x-rtp");
Packit 1f69a5
Packit 1f69a5
  _single_ssrc_test (h, 0x0abe2b0b, stream0_seq_start,
Packit 1f69a5
      nth_to_loose, expected_buf_size, expected_first_buffer_idx);
Packit 1f69a5
  _single_ssrc_test (h, 0xdeadbeef, stream1_seq_start,
Packit 1f69a5
      nth_to_loose, expected_buf_size, expected_first_buffer_idx);
Packit 1f69a5
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern0)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (1, 3, 0);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern1)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (2, 3, 0);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern2)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (3, 6, 3);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern3)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (4, 5, 4);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern4)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (5, 5, 4);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern5)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (6, 5, 4);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern6)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (7, 5, 4);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern7)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (8, 5, 4);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern8)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (9, 0, 0);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_loss_pattern9)
Packit 1f69a5
{
Packit 1f69a5
  _multiple_ssrcs_test (10, 0, 0);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
#define STRESS_TEST_SSRCS (8)
Packit 1f69a5
#define STRESS_TEST_STORAGE_DEPTH (50)
Packit 1f69a5
typedef struct _StressTestData StressTestData;
Packit 1f69a5
struct _StressTestData
Packit 1f69a5
{
Packit 1f69a5
  guint16 seq[STRESS_TEST_SSRCS];
Packit 1f69a5
  guint32 ssrc[STRESS_TEST_SSRCS];
Packit 1f69a5
  gsize count[STRESS_TEST_SSRCS];
Packit 1f69a5
  GRand *rnd;
Packit 1f69a5
};
Packit 1f69a5
Packit 1f69a5
static GstBuffer *
Packit 1f69a5
rtpstorage_stress_prepare_buffer (GstHarness * h, gpointer data)
Packit 1f69a5
{
Packit 1f69a5
  static const guint8 fec_pt = 100;
Packit 1f69a5
  static const guint8 media_pt = 96;
Packit 1f69a5
  StressTestData *test_data = data;
Packit 1f69a5
  gsize ssrc_idx = g_rand_int_range (test_data->rnd, 0, STRESS_TEST_SSRCS);
Packit 1f69a5
  guint16 seq = test_data->seq[ssrc_idx];
Packit 1f69a5
  guint32 ssrc = test_data->ssrc[ssrc_idx];
Packit 1f69a5
  gboolean is_fec = test_data->count[ssrc_idx] > 0 && (seq % 5 == 0
Packit 1f69a5
      || seq % 5 == 1);
Packit 1f69a5
  guint8 pt = is_fec ? fec_pt : media_pt;
Packit 1f69a5
Packit 1f69a5
  GstBuffer *buf = create_rtp_packet (pt, ssrc, RTP_TSTAMP (0), seq);
Packit 1f69a5
Packit 1f69a5
  ++test_data->seq[ssrc_idx];
Packit 1f69a5
  ++test_data->count[ssrc_idx];
Packit 1f69a5
  return buf;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_START_TEST (rtpstorage_stress)
Packit 1f69a5
{
Packit 1f69a5
  GRand *rnd;
Packit 1f69a5
  GTimer *timer;
Packit 1f69a5
  GstCaps *caps;
Packit 1f69a5
  GstSegment segment;
Packit 1f69a5
  GstHarnessThread *ht;
Packit 1f69a5
  StressTestData test_data;
Packit 1f69a5
  guint seed, i, total, requested;
Packit 1f69a5
  GstHarness *h = gst_harness_new ("rtpstorage");
Packit 1f69a5
  g_object_set (h->element, "size-time",
Packit 1f69a5
      STRESS_TEST_STORAGE_DEPTH * RTP_PACKET_DUR, NULL);
Packit 1f69a5
Packit 1f69a5
  /* The stress test pushes buffers with STRESS_TEST_SSRCS different
Packit 1f69a5
   * ssrcs from one thread and requests packets for FEC recovery from
Packit 1f69a5
   * another thread.
Packit 1f69a5
   * */
Packit 1f69a5
  memset (&test_data, 0, sizeof (test_data));
Packit 1f69a5
  seed = g_random_int ();
Packit 1f69a5
  test_data.rnd = g_rand_new_with_seed (seed);
Packit 1f69a5
  for (i = 0; i < STRESS_TEST_SSRCS; ++i) {
Packit 1f69a5
    test_data.ssrc[i] = 0x00112233 + i * 0x01000000;
Packit 1f69a5
    test_data.seq[i] = g_rand_int_range (test_data.rnd, 0, 0x10000);
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  gst_segment_init (&segment, GST_FORMAT_TIME);
Packit 1f69a5
  caps = gst_caps_from_string ("application/x-rtp");
Packit 1f69a5
  rnd = g_rand_copy (test_data.rnd);
Packit 1f69a5
Packit 1f69a5
  GST_INFO ("%u seed", seed);
Packit 1f69a5
  ht = gst_harness_stress_push_buffer_with_cb_start (h, caps, &segment,
Packit 1f69a5
      rtpstorage_stress_prepare_buffer, &test_data, NULL);
Packit 1f69a5
Packit 1f69a5
  requested = 0;
Packit 1f69a5
  timer = g_timer_new ();
Packit 1f69a5
  while (g_timer_elapsed (timer, NULL) < 2) {
Packit 1f69a5
    gsize ssrc_idx = g_rand_int_range (rnd, 0, STRESS_TEST_SSRCS);
Packit 1f69a5
Packit 1f69a5
    /* The following if statement is simply keeping the log
Packit 1f69a5
     * clean from ERROR messages */
Packit 1f69a5
    if (*((volatile gsize *) &test_data.count[ssrc_idx]) > 1) {
Packit 1f69a5
      guint16 lost_seq = *((volatile guint16 *) &test_data.seq[ssrc_idx]) - 5;
Packit 1f69a5
Packit 1f69a5
      GstBufferList *bufs_out = get_packets_for_recovery (h, 100,
Packit 1f69a5
          test_data.ssrc[ssrc_idx], lost_seq);
Packit 1f69a5
      if (bufs_out) {
Packit 1f69a5
        requested += gst_buffer_list_length (bufs_out);
Packit 1f69a5
        gst_buffer_list_unref (bufs_out);
Packit 1f69a5
      }
Packit 1f69a5
    }
Packit 1f69a5
Packit 1f69a5
    /* Having sleep here makes it hard to detect the race, but we need it to
Packit 1f69a5
     * allow another thread to push more buffers when running under valgrind */
Packit 1f69a5
    g_usleep (G_USEC_PER_SEC / 10000);
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  gst_harness_stress_thread_stop (ht);
Packit 1f69a5
  for (i = 0, total = 0; i < STRESS_TEST_SSRCS; ++i) {
Packit 1f69a5
    GST_INFO ("SSRC 0x%08x: %u packets", test_data.ssrc[i],
Packit 1f69a5
        (guint32) test_data.count[i]);
Packit 1f69a5
    total += test_data.count[i];
Packit 1f69a5
  }
Packit 1f69a5
  GST_INFO ("%u packets pushed through, %u requested", total, requested);
Packit 1f69a5
Packit 1f69a5
  g_rand_free (rnd);
Packit 1f69a5
  g_rand_free (test_data.rnd);
Packit 1f69a5
  gst_caps_unref (caps);
Packit 1f69a5
  g_timer_destroy (timer);
Packit 1f69a5
  gst_harness_teardown (h);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_END_TEST;
Packit 1f69a5
Packit 1f69a5
static Suite *
Packit 1f69a5
rtpstorage_suite (void)
Packit 1f69a5
{
Packit 1f69a5
  Suite *s = suite_create ("rtpstorage");
Packit 1f69a5
  TCase *tc_chain = tcase_create ("general");
Packit 1f69a5
Packit 1f69a5
  suite_add_tcase (s, tc_chain);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_up_and_down);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_resize);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_stop_redundant_packets);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_unknown_ssrc);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_packet_not_lost);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern0);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern1);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern2);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern3);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern4);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern5);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern6);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern7);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern8);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_loss_pattern9);
Packit 1f69a5
  tcase_add_test (tc_chain, test_rtpstorage_put_recovered_packet);
Packit 1f69a5
  tcase_add_test (tc_chain, rtpstorage_stress);
Packit 1f69a5
Packit 1f69a5
  return s;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GST_CHECK_MAIN (rtpstorage)