Blame tests/check/libs/collectpads.c

Packit f546b1
/*
Packit f546b1
 * collectpads.c - GstCollectPads testsuite
Packit f546b1
 * Copyright (C) 2006 Alessandro Decina <alessandro.d@gmail.com>
Packit f546b1
 *
Packit f546b1
 * Authors:
Packit f546b1
 *   Alessandro Decina <alessandro.d@gmail.com>
Packit f546b1
 *
Packit f546b1
 * This library is free software; you can redistribute it and/or
Packit f546b1
 * modify it under the terms of the GNU Library General Public
Packit f546b1
 * License as published by the Free Software Foundation; either
Packit f546b1
 * version 2 of the License, or (at your option) any later version.
Packit f546b1
 *
Packit f546b1
 * This library is distributed in the hope that it will be useful,
Packit f546b1
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit f546b1
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit f546b1
 * Library General Public License for more details.
Packit f546b1
 *
Packit f546b1
 * You should have received a copy of the GNU Library General Public
Packit f546b1
 * License along with this library; if not, write to the
Packit f546b1
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
Packit f546b1
 * Boston, MA 02110-1301, USA.
Packit f546b1
 */
Packit f546b1
Packit f546b1
#ifdef HAVE_CONFIG_H
Packit f546b1
#  include "config.h"
Packit f546b1
#endif
Packit f546b1
Packit f546b1
#include <gst/check/gstcheck.h>
Packit f546b1
#include <gst/base/gstcollectpads.h>
Packit f546b1
Packit f546b1
/* dummy collectpads based element */
Packit f546b1
Packit f546b1
#define GST_TYPE_AGGREGATOR            (gst_aggregator_get_type ())
Packit f546b1
#define GST_AGGREGATOR(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_AGGREGATOR, GstAggregator))
Packit f546b1
#define GST_AGGREGATOR_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_AGGREGATOR, GstAggregatorClass))
Packit f546b1
#define GST_AGGREGATOR_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_AGGREGATOR, GstAggregatorClass))
Packit f546b1
Packit f546b1
typedef struct _GstAggregator GstAggregator;
Packit f546b1
typedef struct _GstAggregatorClass GstAggregatorClass;
Packit f546b1
Packit f546b1
struct _GstAggregator
Packit f546b1
{
Packit f546b1
  GstElement parent;
Packit f546b1
  GstCollectPads *collect;
Packit f546b1
  GstPad *srcpad;
Packit f546b1
  GstPad *sinkpad[2];
Packit f546b1
  gint padcount;
Packit f546b1
  gboolean first;
Packit f546b1
};
Packit f546b1
struct _GstAggregatorClass
Packit f546b1
{
Packit f546b1
  GstElementClass parent_class;
Packit f546b1
};
Packit f546b1
Packit f546b1
static GType gst_aggregator_get_type (void);
Packit f546b1
Packit f546b1
G_DEFINE_TYPE (GstAggregator, gst_aggregator, GST_TYPE_ELEMENT);
Packit f546b1
Packit f546b1
static GstStaticPadTemplate gst_aggregator_src_template =
Packit f546b1
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
Packit f546b1
    GST_STATIC_CAPS_ANY);
Packit f546b1
Packit f546b1
static GstStaticPadTemplate gst_aggregator_sink_template =
Packit f546b1
GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
Packit f546b1
    GST_STATIC_CAPS_ANY);
Packit f546b1
Packit f546b1
static GstFlowReturn
Packit f546b1
gst_agregator_collected (GstCollectPads * pads, gpointer user_data)
Packit f546b1
{
Packit f546b1
  GstAggregator *aggregator = GST_AGGREGATOR (user_data);
Packit f546b1
  GstBuffer *inbuf;
Packit f546b1
  GstCollectData *collect_data = NULL;
Packit f546b1
  guint outsize = 0;
Packit f546b1
  GSList *walk;
Packit f546b1
Packit f546b1
  walk = pads->data;
Packit f546b1
  for (walk = pads->data; walk; walk = walk->next) {
Packit f546b1
    GstCollectData *tmp = (GstCollectData *) walk->data;
Packit f546b1
    if (tmp->buffer) {
Packit f546b1
      collect_data = tmp;
Packit f546b1
      break;
Packit f546b1
    }
Packit f546b1
  }
Packit f546b1
Packit f546b1
  /* can only happen when no pads to collect or all EOS */
Packit f546b1
  if (collect_data == NULL)
Packit f546b1
    goto eos;
Packit f546b1
Packit f546b1
  outsize = gst_buffer_get_size (collect_data->buffer);
Packit f546b1
  inbuf = gst_collect_pads_take_buffer (pads, collect_data, outsize);
Packit f546b1
  if (!inbuf)
Packit f546b1
    goto eos;
Packit f546b1
Packit f546b1
  if (aggregator->first) {
Packit f546b1
    GstSegment segment;
Packit f546b1
Packit f546b1
    gst_segment_init (&segment, GST_FORMAT_BYTES);
Packit f546b1
    gst_pad_push_event (aggregator->srcpad,
Packit f546b1
        gst_event_new_stream_start ("test"));
Packit f546b1
    gst_pad_push_event (aggregator->srcpad, gst_event_new_segment (&segment));
Packit f546b1
    aggregator->first = FALSE;
Packit f546b1
  }
Packit f546b1
Packit f546b1
  /* just forward the first buffer */
Packit f546b1
  GST_DEBUG_OBJECT (aggregator, "forward buffer %p", inbuf);
Packit f546b1
  return gst_pad_push (aggregator->srcpad, inbuf);
Packit f546b1
  /* ERRORS */
Packit f546b1
eos:
Packit f546b1
  {
Packit f546b1
    GST_DEBUG_OBJECT (aggregator, "no data available, must be EOS");
Packit f546b1
    gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
Packit f546b1
    return GST_FLOW_EOS;
Packit f546b1
  }
Packit f546b1
}
Packit f546b1
Packit f546b1
static GstPad *
Packit f546b1
gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ,
Packit f546b1
    const gchar * unused, const GstCaps * caps)
Packit f546b1
{
Packit f546b1
  GstAggregator *aggregator = GST_AGGREGATOR (element);
Packit f546b1
  gchar *name;
Packit f546b1
  GstPad *newpad;
Packit f546b1
  gint padcount;
Packit f546b1
Packit f546b1
  if (templ->direction != GST_PAD_SINK)
Packit f546b1
    return NULL;
Packit f546b1
Packit f546b1
  /* create new pad */
Packit f546b1
  padcount = g_atomic_int_add (&aggregator->padcount, 1);
Packit f546b1
  name = g_strdup_printf ("sink_%u", padcount);
Packit f546b1
  newpad = gst_pad_new_from_template (templ, name);
Packit f546b1
  g_free (name);
Packit f546b1
Packit f546b1
  gst_collect_pads_add_pad (aggregator->collect, newpad,
Packit f546b1
      sizeof (GstCollectData), NULL, TRUE);
Packit f546b1
Packit f546b1
  /* takes ownership of the pad */
Packit f546b1
  if (!gst_element_add_pad (GST_ELEMENT (aggregator), newpad))
Packit f546b1
    goto could_not_add;
Packit f546b1
Packit f546b1
  GST_DEBUG_OBJECT (aggregator, "added new pad %s", GST_OBJECT_NAME (newpad));
Packit f546b1
  return newpad;
Packit f546b1
Packit f546b1
  /* errors */
Packit f546b1
could_not_add:
Packit f546b1
  {
Packit f546b1
    GST_DEBUG_OBJECT (aggregator, "could not add pad");
Packit f546b1
    gst_collect_pads_remove_pad (aggregator->collect, newpad);
Packit f546b1
    gst_object_unref (newpad);
Packit f546b1
    return NULL;
Packit f546b1
  }
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
gst_aggregator_release_pad (GstElement * element, GstPad * pad)
Packit f546b1
{
Packit f546b1
  GstAggregator *aggregator = GST_AGGREGATOR (element);
Packit f546b1
Packit f546b1
  if (aggregator->collect)
Packit f546b1
    gst_collect_pads_remove_pad (aggregator->collect, pad);
Packit f546b1
  gst_element_remove_pad (element, pad);
Packit f546b1
}
Packit f546b1
Packit f546b1
static GstStateChangeReturn
Packit f546b1
gst_aggregator_change_state (GstElement * element, GstStateChange transition)
Packit f546b1
{
Packit f546b1
  GstAggregator *aggregator = GST_AGGREGATOR (element);
Packit f546b1
  GstStateChangeReturn ret;
Packit f546b1
Packit f546b1
  switch (transition) {
Packit f546b1
    case GST_STATE_CHANGE_NULL_TO_READY:
Packit f546b1
      break;
Packit f546b1
    case GST_STATE_CHANGE_READY_TO_PAUSED:
Packit f546b1
      gst_collect_pads_start (aggregator->collect);
Packit f546b1
      break;
Packit f546b1
    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
Packit f546b1
      break;
Packit f546b1
    case GST_STATE_CHANGE_PAUSED_TO_READY:
Packit f546b1
      /* need to unblock the collectpads before calling the
Packit f546b1
       * parent change_state so that streaming can finish */
Packit f546b1
      gst_collect_pads_stop (aggregator->collect);
Packit f546b1
      break;
Packit f546b1
    default:
Packit f546b1
      break;
Packit f546b1
  }
Packit f546b1
Packit f546b1
  ret =
Packit f546b1
      GST_ELEMENT_CLASS (gst_aggregator_parent_class)->change_state (element,
Packit f546b1
      transition);
Packit f546b1
Packit f546b1
  switch (transition) {
Packit f546b1
    default:
Packit f546b1
      break;
Packit f546b1
  }
Packit f546b1
Packit f546b1
  return ret;
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
gst_aggregator_dispose (GObject * object)
Packit f546b1
{
Packit f546b1
  GstAggregator *aggregator = GST_AGGREGATOR (object);
Packit f546b1
Packit f546b1
  if (aggregator->collect) {
Packit f546b1
    gst_object_unref (aggregator->collect);
Packit f546b1
    aggregator->collect = NULL;
Packit f546b1
  }
Packit f546b1
Packit f546b1
  G_OBJECT_CLASS (gst_aggregator_parent_class)->dispose (object);
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
gst_aggregator_class_init (GstAggregatorClass * klass)
Packit f546b1
{
Packit f546b1
  GObjectClass *gobject_class = (GObjectClass *) klass;
Packit f546b1
  GstElementClass *gstelement_class = (GstElementClass *) klass;
Packit f546b1
Packit f546b1
  gobject_class->dispose = gst_aggregator_dispose;
Packit f546b1
Packit f546b1
  gst_element_class_add_static_pad_template (gstelement_class,
Packit f546b1
      &gst_aggregator_src_template);
Packit f546b1
  gst_element_class_add_static_pad_template (gstelement_class,
Packit f546b1
      &gst_aggregator_sink_template);
Packit f546b1
  gst_element_class_set_static_metadata (gstelement_class, "Aggregator",
Packit f546b1
      "Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>");
Packit f546b1
Packit f546b1
  gstelement_class->request_new_pad =
Packit f546b1
      GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
Packit f546b1
  gstelement_class->release_pad =
Packit f546b1
      GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
Packit f546b1
  gstelement_class->change_state =
Packit f546b1
      GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
gst_aggregator_init (GstAggregator * agregator)
Packit f546b1
{
Packit f546b1
  GstPadTemplate *template;
Packit f546b1
Packit f546b1
  template = gst_static_pad_template_get (&gst_aggregator_src_template);
Packit f546b1
  agregator->srcpad = gst_pad_new_from_template (template, "src");
Packit f546b1
  gst_object_unref (template);
Packit f546b1
Packit f546b1
  GST_PAD_SET_PROXY_CAPS (agregator->srcpad);
Packit f546b1
  gst_element_add_pad (GST_ELEMENT (agregator), agregator->srcpad);
Packit f546b1
Packit f546b1
  /* keep track of the sinkpads requested */
Packit f546b1
  agregator->collect = gst_collect_pads_new ();
Packit f546b1
  gst_collect_pads_set_function (agregator->collect,
Packit f546b1
      GST_DEBUG_FUNCPTR (gst_agregator_collected), agregator);
Packit f546b1
Packit f546b1
  agregator->first = TRUE;
Packit f546b1
}
Packit f546b1
Packit f546b1
static gboolean
Packit f546b1
gst_agregator_plugin_init (GstPlugin * plugin)
Packit f546b1
{
Packit f546b1
  return gst_element_register (plugin, "aggregator", GST_RANK_NONE,
Packit f546b1
      GST_TYPE_AGGREGATOR);
Packit f546b1
}
Packit f546b1
Packit f546b1
static gboolean
Packit f546b1
gst_agregator_plugin_register (void)
Packit f546b1
{
Packit f546b1
  return gst_plugin_register_static (GST_VERSION_MAJOR,
Packit f546b1
      GST_VERSION_MINOR,
Packit f546b1
      "aggregator",
Packit f546b1
      "Combine buffers",
Packit f546b1
      gst_agregator_plugin_init,
Packit f546b1
      VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);
Packit f546b1
}
Packit f546b1
Packit f546b1
Packit f546b1
#define fail_unless_collected(expected)           \
Packit f546b1
G_STMT_START {                                    \
Packit f546b1
  g_mutex_lock (&lock);                           \
Packit f546b1
  while (expected == TRUE && collected == FALSE)  \
Packit f546b1
    g_cond_wait (&cond, &lock);                   \
Packit f546b1
  fail_unless_equals_int (collected, expected);   \
Packit f546b1
  g_mutex_unlock (&lock);                         \
Packit f546b1
} G_STMT_END;
Packit f546b1
Packit f546b1
typedef struct
Packit f546b1
{
Packit f546b1
  char foo;
Packit f546b1
} BadCollectData;
Packit f546b1
Packit f546b1
typedef struct
Packit f546b1
{
Packit f546b1
  GstCollectData data;
Packit f546b1
  GstPad *pad;
Packit f546b1
  GstBuffer *buffer;
Packit f546b1
  GstEvent *event;
Packit f546b1
  GstFlowReturn expected_result;
Packit f546b1
} TestData;
Packit f546b1
Packit f546b1
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
Packit f546b1
    GST_PAD_SRC,
Packit f546b1
    GST_PAD_ALWAYS,
Packit f546b1
    GST_STATIC_CAPS_ANY);
Packit f546b1
Packit f546b1
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
Packit f546b1
    GST_PAD_SINK,
Packit f546b1
    GST_PAD_ALWAYS,
Packit f546b1
    GST_STATIC_CAPS_ANY);
Packit f546b1
Packit f546b1
static GstCollectPads *collect;
Packit f546b1
static gboolean collected;
Packit f546b1
static GstPad *agg_srcpad, *srcpad1, *srcpad2;
Packit f546b1
static GstPad *sinkpad1, *sinkpad2;
Packit f546b1
static TestData *data1, *data2;
Packit f546b1
static GstBuffer *outbuf1, *outbuf2;
Packit f546b1
static GstElement *agg;
Packit f546b1
gboolean fail_seek;
Packit f546b1
gint flush_start_events, flush_stop_events;
Packit f546b1
Packit f546b1
static GMutex lock;
Packit f546b1
static GCond cond;
Packit f546b1
Packit f546b1
static GstFlowReturn
Packit f546b1
collected_cb (GstCollectPads * pads, gpointer user_data)
Packit f546b1
{
Packit f546b1
  outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
Packit f546b1
  outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
Packit f546b1
Packit f546b1
  g_mutex_lock (&lock);
Packit f546b1
  collected = TRUE;
Packit f546b1
  g_cond_signal (&cond;;
Packit f546b1
  g_mutex_unlock (&lock);
Packit f546b1
Packit f546b1
  return GST_FLOW_OK;
Packit f546b1
}
Packit f546b1
Packit f546b1
static GstFlowReturn
Packit f546b1
handle_buffer_cb (GstCollectPads * pads, GstCollectData * data,
Packit f546b1
    GstBuffer * buf, gpointer user_data)
Packit f546b1
{
Packit f546b1
  GST_DEBUG ("collected buffers via callback");
Packit f546b1
Packit f546b1
  outbuf1 = gst_collect_pads_pop (pads, (GstCollectData *) data1);
Packit f546b1
  outbuf2 = gst_collect_pads_pop (pads, (GstCollectData *) data2);
Packit f546b1
Packit f546b1
  g_mutex_lock (&lock);
Packit f546b1
  collected = TRUE;
Packit f546b1
  g_cond_signal (&cond;;
Packit f546b1
  g_mutex_unlock (&lock);
Packit f546b1
Packit f546b1
  return GST_FLOW_OK;
Packit f546b1
}
Packit f546b1
Packit f546b1
static gpointer
Packit f546b1
push_buffer (gpointer user_data)
Packit f546b1
{
Packit f546b1
  GstFlowReturn flow;
Packit f546b1
  GstCaps *caps;
Packit f546b1
  TestData *test_data = (TestData *) user_data;
Packit f546b1
  GstSegment segment;
Packit f546b1
Packit f546b1
  gst_pad_push_event (test_data->pad, gst_event_new_stream_start ("test"));
Packit f546b1
Packit f546b1
  caps = gst_caps_new_empty_simple ("foo/x-bar");
Packit f546b1
  gst_pad_push_event (test_data->pad, gst_event_new_caps (caps));
Packit f546b1
  gst_caps_unref (caps);
Packit f546b1
Packit f546b1
  gst_segment_init (&segment, GST_FORMAT_TIME);
Packit f546b1
  gst_pad_push_event (test_data->pad, gst_event_new_segment (&segment));
Packit f546b1
Packit f546b1
  flow = gst_pad_push (test_data->pad, test_data->buffer);
Packit f546b1
  fail_unless (flow == test_data->expected_result, "got flow %s instead of OK",
Packit f546b1
      gst_flow_get_name (flow));
Packit f546b1
Packit f546b1
  return NULL;
Packit f546b1
}
Packit f546b1
Packit f546b1
static gpointer
Packit f546b1
push_event (gpointer user_data)
Packit f546b1
{
Packit f546b1
  TestData *test_data = (TestData *) user_data;
Packit f546b1
Packit f546b1
  fail_unless (gst_pad_push_event (test_data->pad, test_data->event) == TRUE);
Packit f546b1
Packit f546b1
  return NULL;
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
setup_default (void)
Packit f546b1
{
Packit f546b1
  collect = gst_collect_pads_new ();
Packit f546b1
Packit f546b1
  srcpad1 = gst_pad_new_from_static_template (&srctemplate, "src1");
Packit f546b1
  srcpad2 = gst_pad_new_from_static_template (&srctemplate, "src2");
Packit f546b1
  sinkpad1 = gst_pad_new_from_static_template (&sinktemplate, "sink1");
Packit f546b1
  sinkpad2 = gst_pad_new_from_static_template (&sinktemplate, "sink2");
Packit f546b1
  fail_unless (gst_pad_link (srcpad1, sinkpad1) == GST_PAD_LINK_OK);
Packit f546b1
  fail_unless (gst_pad_link (srcpad2, sinkpad2) == GST_PAD_LINK_OK);
Packit f546b1
Packit f546b1
  gst_pad_set_active (sinkpad1, TRUE);
Packit f546b1
  gst_pad_set_active (sinkpad2, TRUE);
Packit f546b1
  gst_pad_set_active (srcpad1, TRUE);
Packit f546b1
  gst_pad_set_active (srcpad2, TRUE);
Packit f546b1
Packit f546b1
  data1 = NULL;
Packit f546b1
  data2 = NULL;
Packit f546b1
  outbuf1 = NULL;
Packit f546b1
  outbuf2 = NULL;
Packit f546b1
  collected = FALSE;
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
setup (void)
Packit f546b1
{
Packit f546b1
  setup_default ();
Packit f546b1
  gst_collect_pads_set_function (collect, collected_cb, NULL);
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
setup_buffer_cb (void)
Packit f546b1
{
Packit f546b1
  setup_default ();
Packit f546b1
  gst_collect_pads_set_buffer_function (collect, handle_buffer_cb, NULL);
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
teardown (void)
Packit f546b1
{
Packit f546b1
  gst_object_unref (srcpad1);
Packit f546b1
  gst_object_unref (srcpad2);
Packit f546b1
  gst_object_unref (sinkpad1);
Packit f546b1
  gst_object_unref (sinkpad2);
Packit f546b1
  gst_object_unref (collect);
Packit f546b1
  srcpad1 = srcpad2 = NULL;
Packit f546b1
  sinkpad1 = sinkpad2 = NULL;
Packit f546b1
  collect = NULL;
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_START_TEST (test_pad_add_remove)
Packit f546b1
{
Packit f546b1
  ASSERT_CRITICAL (gst_collect_pads_add_pad (collect, sinkpad1,
Packit f546b1
          sizeof (BadCollectData), NULL, TRUE));
Packit f546b1
Packit f546b1
  data1 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad1, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data1 != NULL);
Packit f546b1
Packit f546b1
  fail_unless (gst_collect_pads_remove_pad (collect, sinkpad2) == FALSE);
Packit f546b1
  fail_unless (gst_collect_pads_remove_pad (collect, sinkpad1) == TRUE);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
GST_START_TEST (test_collect)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf1, *buf2;
Packit f546b1
  GThread *thread1, *thread2;
Packit f546b1
Packit f546b1
  data1 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad1, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data1 != NULL);
Packit f546b1
Packit f546b1
  data2 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad2, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data2 != NULL);
Packit f546b1
Packit f546b1
  buf1 = gst_buffer_new ();
Packit f546b1
  buf2 = gst_buffer_new ();
Packit f546b1
Packit f546b1
  /* start collect pads */
Packit f546b1
  gst_collect_pads_start (collect);
Packit f546b1
Packit f546b1
  /* push buffers on the pads */
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
  /* here thread1 is blocked and srcpad1 has a queued buffer */
Packit f546b1
  fail_unless_collected (FALSE);
Packit f546b1
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->buffer = buf2;
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
Packit f546b1
Packit f546b1
  /* now both pads have a buffer */
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  fail_unless (outbuf1 == buf1);
Packit f546b1
  fail_unless (outbuf2 == buf2);
Packit f546b1
Packit f546b1
  /* these will return immediately as at this point the threads have been
Packit f546b1
   * unlocked and are finished */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
Packit f546b1
  gst_collect_pads_stop (collect);
Packit f546b1
Packit f546b1
  gst_buffer_unref (buf1);
Packit f546b1
  gst_buffer_unref (buf2);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
Packit f546b1
GST_START_TEST (test_collect_eos)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf1;
Packit f546b1
  GThread *thread1, *thread2;
Packit f546b1
Packit f546b1
  data1 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad1, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data1 != NULL);
Packit f546b1
Packit f546b1
  data2 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad2, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data2 != NULL);
Packit f546b1
Packit f546b1
  buf1 = gst_buffer_new ();
Packit f546b1
Packit f546b1
  /* start collect pads */
Packit f546b1
  gst_collect_pads_start (collect);
Packit f546b1
Packit f546b1
  /* push a buffer on srcpad1 and EOS on srcpad2 */
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
  /* here thread1 is blocked and srcpad1 has a queued buffer */
Packit f546b1
  fail_unless_collected (FALSE);
Packit f546b1
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->event = gst_event_new_eos ();
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
Packit f546b1
  /* now sinkpad1 has a buffer and sinkpad2 has EOS */
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  fail_unless (outbuf1 == buf1);
Packit f546b1
  /* sinkpad2 has EOS so a NULL buffer is returned */
Packit f546b1
  fail_unless (outbuf2 == NULL);
Packit f546b1
Packit f546b1
  /* these will return immediately as when the data is popped the threads are
Packit f546b1
   * unlocked and will terminate */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
Packit f546b1
  gst_collect_pads_stop (collect);
Packit f546b1
Packit f546b1
  gst_buffer_unref (buf1);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
GST_START_TEST (test_collect_twice)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf1, *buf2;
Packit f546b1
  GThread *thread1, *thread2;
Packit f546b1
Packit f546b1
  data1 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad1, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data1 != NULL);
Packit f546b1
Packit f546b1
  data2 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad2, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data2 != NULL);
Packit f546b1
Packit f546b1
  GST_INFO ("round 1");
Packit f546b1
Packit f546b1
  buf1 = gst_buffer_new ();
Packit f546b1
Packit f546b1
  /* start collect pads */
Packit f546b1
  gst_collect_pads_start (collect);
Packit f546b1
Packit f546b1
  /* queue a buffer */
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
  /* here thread1 is blocked and srcpad1 has a queued buffer */
Packit f546b1
  fail_unless_collected (FALSE);
Packit f546b1
Packit f546b1
  /* push EOS on the other pad */
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->event = gst_event_new_eos ();
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
Packit f546b1
Packit f546b1
  /* one of the pads has a buffer, the other has EOS */
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  fail_unless (outbuf1 == buf1);
Packit f546b1
  /* there's nothing to pop from the one which received EOS */
Packit f546b1
  fail_unless (outbuf2 == NULL);
Packit f546b1
Packit f546b1
  /* these will return immediately as at this point the threads have been
Packit f546b1
   * unlocked and are finished */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
Packit f546b1
  gst_collect_pads_stop (collect);
Packit f546b1
  collected = FALSE;
Packit f546b1
Packit f546b1
  GST_INFO ("round 2");
Packit f546b1
Packit f546b1
  buf2 = gst_buffer_new ();
Packit f546b1
Packit f546b1
  /* clear EOS from pads */
Packit f546b1
  gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE));
Packit f546b1
  gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
Packit f546b1
Packit f546b1
  /* start collect pads */
Packit f546b1
  gst_collect_pads_start (collect);
Packit f546b1
Packit f546b1
  /* push buffers on the pads */
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
  /* here thread1 is blocked and srcpad1 has a queued buffer */
Packit f546b1
  fail_unless_collected (FALSE);
Packit f546b1
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->buffer = buf2;
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
Packit f546b1
Packit f546b1
  /* now both pads have a buffer */
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  /* these will return immediately as at this point the threads have been
Packit f546b1
   * unlocked and are finished */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
Packit f546b1
  gst_collect_pads_stop (collect);
Packit f546b1
Packit f546b1
  gst_buffer_unref (buf1);
Packit f546b1
  gst_buffer_unref (buf2);
Packit f546b1
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
Packit f546b1
/* Test the default collected buffer func */
Packit f546b1
GST_START_TEST (test_collect_default)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf1, *buf2;
Packit f546b1
  GThread *thread1, *thread2;
Packit f546b1
Packit f546b1
  data1 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad1, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data1 != NULL);
Packit f546b1
Packit f546b1
  data2 = (TestData *) gst_collect_pads_add_pad (collect,
Packit f546b1
      sinkpad2, sizeof (TestData), NULL, TRUE);
Packit f546b1
  fail_unless (data2 != NULL);
Packit f546b1
Packit f546b1
  buf1 = gst_buffer_new ();
Packit f546b1
  GST_BUFFER_TIMESTAMP (buf1) = 0;
Packit f546b1
  buf2 = gst_buffer_new ();
Packit f546b1
  GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
Packit f546b1
Packit f546b1
  /* start collect pads */
Packit f546b1
  gst_collect_pads_start (collect);
Packit f546b1
Packit f546b1
  /* push buffers on the pads */
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
  /* here thread1 is blocked and srcpad1 has a queued buffer */
Packit f546b1
  fail_unless_collected (FALSE);
Packit f546b1
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->buffer = buf2;
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
Packit f546b1
Packit f546b1
  /* now both pads have a buffer */
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  /* The default callback should have popped the buffer with lower timestamp,
Packit f546b1
   * and this should therefore be NULL: */
Packit f546b1
  fail_unless (outbuf1 == NULL);
Packit f546b1
  /* While this one should still be pending: */
Packit f546b1
  fail_unless (outbuf2 == buf2);
Packit f546b1
Packit f546b1
  /* these will return immediately as at this point the threads have been
Packit f546b1
   * unlocked and are finished */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
Packit f546b1
  gst_collect_pads_stop (collect);
Packit f546b1
Packit f546b1
  gst_buffer_unref (buf1);
Packit f546b1
  gst_buffer_unref (buf2);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
Packit f546b1
#define NUM_BUFFERS 3
Packit f546b1
static void
Packit f546b1
handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
Packit f546b1
{
Packit f546b1
  *count = *count + 1;
Packit f546b1
}
Packit f546b1
Packit f546b1
/* Test a linear pipeline using aggregator */
Packit f546b1
GST_START_TEST (test_linear_pipeline)
Packit f546b1
{
Packit f546b1
  GstElement *pipeline, *src, *agg, *sink;
Packit f546b1
  GstBus *bus;
Packit f546b1
  GstMessage *msg;
Packit f546b1
  gint count = 0;
Packit f546b1
Packit f546b1
  pipeline = gst_pipeline_new ("pipeline");
Packit f546b1
  src = gst_check_setup_element ("fakesrc");
Packit f546b1
  g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
Packit f546b1
      NULL);
Packit f546b1
  agg = gst_check_setup_element ("aggregator");
Packit f546b1
  sink = gst_check_setup_element ("fakesink");
Packit f546b1
  g_object_set (sink, "signal-handoffs", TRUE, NULL);
Packit f546b1
  g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
Packit f546b1
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), src));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
Packit f546b1
  fail_unless (gst_element_link (src, agg));
Packit f546b1
  fail_unless (gst_element_link (agg, sink));
Packit f546b1
Packit f546b1
  bus = gst_element_get_bus (pipeline);
Packit f546b1
  fail_if (bus == NULL);
Packit f546b1
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
Packit f546b1
Packit f546b1
  msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
Packit f546b1
  fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
Packit f546b1
  gst_message_unref (msg);
Packit f546b1
Packit f546b1
  fail_unless_equals_int (count, NUM_BUFFERS);
Packit f546b1
Packit f546b1
  gst_element_set_state (pipeline, GST_STATE_NULL);
Packit f546b1
  gst_object_unref (bus);
Packit f546b1
  gst_object_unref (pipeline);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
/* Test a linear pipeline using aggregator */
Packit f546b1
GST_START_TEST (test_branched_pipeline)
Packit f546b1
{
Packit f546b1
  GstElement *pipeline, *src, *tee, *queue[2], *agg, *sink;
Packit f546b1
  GstBus *bus;
Packit f546b1
  GstMessage *msg;
Packit f546b1
  gint count = 0;
Packit f546b1
Packit f546b1
  pipeline = gst_pipeline_new ("pipeline");
Packit f546b1
  src = gst_check_setup_element ("fakesrc");
Packit f546b1
  g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
Packit f546b1
      NULL);
Packit f546b1
  tee = gst_check_setup_element ("tee");
Packit f546b1
  queue[0] = gst_check_setup_element ("queue");
Packit f546b1
  gst_object_set_name (GST_OBJECT (queue[0]), "queue0");
Packit f546b1
  queue[1] = gst_check_setup_element ("queue");
Packit f546b1
  gst_object_set_name (GST_OBJECT (queue[1]), "queue1");
Packit f546b1
  agg = gst_check_setup_element ("aggregator");
Packit f546b1
  sink = gst_check_setup_element ("fakesink");
Packit f546b1
  g_object_set (sink, "signal-handoffs", TRUE, NULL);
Packit f546b1
  g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
Packit f546b1
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), src));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), tee));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), queue[0]));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), queue[1]));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
Packit f546b1
  fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
Packit f546b1
  fail_unless (gst_element_link (src, tee));
Packit f546b1
  fail_unless (gst_element_link (tee, queue[0]));
Packit f546b1
  fail_unless (gst_element_link (tee, queue[1]));
Packit f546b1
  fail_unless (gst_element_link (queue[0], agg));
Packit f546b1
  fail_unless (gst_element_link (queue[1], agg));
Packit f546b1
  fail_unless (gst_element_link (agg, sink));
Packit f546b1
Packit f546b1
  bus = gst_element_get_bus (pipeline);
Packit f546b1
  fail_if (bus == NULL);
Packit f546b1
  gst_element_set_state (pipeline, GST_STATE_PLAYING);
Packit f546b1
Packit f546b1
  msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
Packit f546b1
  fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
Packit f546b1
  gst_message_unref (msg);
Packit f546b1
Packit f546b1
  /* we have two branches, but we still only forward buffers from one branch */
Packit f546b1
  fail_unless_equals_int (count, NUM_BUFFERS * 2);
Packit f546b1
Packit f546b1
  gst_element_set_state (pipeline, GST_STATE_NULL);
Packit f546b1
  gst_object_unref (bus);
Packit f546b1
  gst_object_unref (pipeline);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
static GstPadProbeReturn
Packit f546b1
downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
Packit f546b1
{
Packit f546b1
  if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
Packit f546b1
    if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
Packit f546b1
        GST_EVENT_FLUSH_START)
Packit f546b1
      g_atomic_int_inc (&flush_start_events);
Packit f546b1
    else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
Packit f546b1
        GST_EVENT_FLUSH_STOP)
Packit f546b1
      g_atomic_int_inc (&flush_stop_events);
Packit f546b1
  } else if (info->type & GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM) {
Packit f546b1
    g_mutex_lock (&lock);
Packit f546b1
    collected = TRUE;
Packit f546b1
    g_cond_signal (&cond;;
Packit f546b1
    g_mutex_unlock (&lock);
Packit f546b1
  }
Packit f546b1
Packit f546b1
  return GST_PAD_PROBE_DROP;
Packit f546b1
}
Packit f546b1
Packit f546b1
static gboolean
Packit f546b1
src_event (GstPad * pad, GstObject * parent, GstEvent * event)
Packit f546b1
{
Packit f546b1
  gboolean ret = TRUE;
Packit f546b1
  if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK) {
Packit f546b1
    if (g_atomic_int_compare_and_exchange (&fail_seek, TRUE, FALSE) == TRUE) {
Packit f546b1
      ret = FALSE;
Packit f546b1
    }
Packit f546b1
  }
Packit f546b1
Packit f546b1
  gst_event_unref (event);
Packit f546b1
  return ret;
Packit f546b1
}
Packit f546b1
Packit f546b1
static gboolean
Packit f546b1
agg_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
Packit f546b1
{
Packit f546b1
  return gst_collect_pads_src_event_default (GST_AGGREGATOR (parent)->collect,
Packit f546b1
      pad, event);
Packit f546b1
}
Packit f546b1
Packit f546b1
static GstPad *
Packit f546b1
setup_src_pad (GstElement * element,
Packit f546b1
    GstStaticPadTemplate * tmpl, const char *name)
Packit f546b1
{
Packit f546b1
  GstPad *srcpad, *sinkpad;
Packit f546b1
Packit f546b1
  srcpad = gst_pad_new_from_static_template (tmpl, "src");
Packit f546b1
  sinkpad = gst_element_get_request_pad (element, name);
Packit f546b1
  fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK,
Packit f546b1
      "Could not link source and %s sink pads", GST_ELEMENT_NAME (element));
Packit f546b1
  gst_pad_set_event_function (srcpad, src_event);
Packit f546b1
  gst_pad_set_active (srcpad, TRUE);
Packit f546b1
  gst_object_unref (sinkpad);
Packit f546b1
Packit f546b1
  return srcpad;
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
flush_setup (void)
Packit f546b1
{
Packit f546b1
  agg = gst_check_setup_element ("aggregator");
Packit f546b1
  agg_srcpad = gst_element_get_static_pad (agg, "src");
Packit f546b1
  srcpad1 = setup_src_pad (agg, &srctemplate, "sink_0");
Packit f546b1
  srcpad2 = setup_src_pad (agg, &srctemplate, "sink_1");
Packit f546b1
  gst_pad_add_probe (agg_srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
Packit f546b1
      GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM |
Packit f546b1
      GST_PAD_PROBE_TYPE_EVENT_FLUSH, downstream_probe_cb, NULL, NULL);
Packit f546b1
  gst_pad_set_event_function (agg_srcpad, agg_src_event);
Packit f546b1
  data1 = g_new0 (TestData, 1);
Packit f546b1
  data2 = g_new0 (TestData, 1);
Packit f546b1
  g_atomic_int_set (&flush_start_events, 0);
Packit f546b1
  g_atomic_int_set (&flush_stop_events, 0);
Packit f546b1
  gst_element_set_state (agg, GST_STATE_PLAYING);
Packit f546b1
  outbuf1 = NULL;
Packit f546b1
  outbuf2 = NULL;
Packit f546b1
  collected = FALSE;
Packit f546b1
}
Packit f546b1
Packit f546b1
static void
Packit f546b1
flush_teardown (void)
Packit f546b1
{
Packit f546b1
  gst_element_set_state (agg, GST_STATE_NULL);
Packit f546b1
  gst_object_unref (agg);
Packit f546b1
  gst_object_unref (agg_srcpad);
Packit f546b1
  gst_object_unref (srcpad1);
Packit f546b1
  gst_object_unref (srcpad2);
Packit f546b1
  g_free (data1);
Packit f546b1
  g_free (data2);
Packit f546b1
  agg = NULL;
Packit f546b1
  agg_srcpad = NULL;
Packit f546b1
  srcpad1 = srcpad2 = NULL;
Packit f546b1
  data1 = data2 = NULL;
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_START_TEST (test_flushing_seek_failure)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf1, *buf2;
Packit f546b1
  GThread *thread1, *thread2;
Packit f546b1
  GstEvent *event;
Packit f546b1
Packit f546b1
  /* Queue a buffer in agg:sink_1. Do a flushing seek and simulate one upstream
Packit f546b1
   * element failing to handle the seek (see src_event()). Check that the
Packit f546b1
   * flushing seek logic doesn't get triggered by checking that the buffer
Packit f546b1
   * queued on agg:sink_1 doesn't get flushed.
Packit f546b1
   */
Packit f546b1
Packit f546b1
  /* queue a buffer in agg:sink_1 */
Packit f546b1
  buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
Packit f546b1
  GST_BUFFER_TIMESTAMP (buf2) = GST_SECOND;
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->buffer = buf2;
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
Packit f546b1
  fail_unless_collected (FALSE);
Packit f546b1
Packit f546b1
  /* do the seek */
Packit f546b1
  event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
Packit f546b1
      GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
Packit f546b1
  g_atomic_int_set (&fail_seek, TRUE);
Packit f546b1
  fail_if (gst_pad_send_event (agg_srcpad, event));
Packit f546b1
Packit f546b1
  /* flush srcpad1 (pretending it's the upstream that didn't fail to seek) */
Packit f546b1
  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
Packit f546b1
  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
Packit f546b1
Packit f546b1
  /* check that the flush events reached agg:src */
Packit f546b1
  fail_unless_equals_int (flush_start_events, 1);
Packit f546b1
  fail_unless_equals_int (flush_stop_events, 1);
Packit f546b1
Packit f546b1
  /* push a buffer on agg:sink_0. This should trigger a collect since agg:sink_1
Packit f546b1
   * should not have been flushed at this point */
Packit f546b1
  buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
Packit f546b1
  GST_BUFFER_TIMESTAMP (buf1) = 0;
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
  collected = FALSE;
Packit f546b1
Packit f546b1
  /* at this point thread1 must have returned */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
Packit f546b1
  /* push eos on agg:sink_0 so the buffer queued in agg:sink_1 is collected and
Packit f546b1
   * the pushing thread returns */
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->event = gst_event_new_eos ();
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_event, data1, NULL);
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
GST_START_TEST (test_flushing_seek)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf1, *buf2;
Packit f546b1
  GThread *thread1, *thread2;
Packit f546b1
  GstEvent *event;
Packit f546b1
Packit f546b1
  /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
Packit f546b1
   * new flushing seek logic is triggered. On the first FLUSH_START call the
Packit f546b1
   * buffers queued in collectpads should get flushed. Only one FLUSH_START and
Packit f546b1
   * one FLUSH_STOP should be forwarded downstream.
Packit f546b1
   */
Packit f546b1
  buf2 = gst_buffer_new_allocate (NULL, 1, NULL);
Packit f546b1
  GST_BUFFER_TIMESTAMP (buf2) = 0;
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->buffer = buf2;
Packit f546b1
  /* expect this buffer to be flushed */
Packit f546b1
  data2->expected_result = GST_FLOW_FLUSHING;
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_buffer, data2, NULL);
Packit f546b1
Packit f546b1
  /* now do a successful flushing seek */
Packit f546b1
  event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
Packit f546b1
      GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
Packit f546b1
  g_atomic_int_set (&fail_seek, FALSE);
Packit f546b1
  fail_unless (gst_pad_send_event (agg_srcpad, event));
Packit f546b1
Packit f546b1
  /* flushing starts once one of the upstream elements sends the first
Packit f546b1
   * FLUSH_START */
Packit f546b1
  fail_unless_equals_int (flush_start_events, 0);
Packit f546b1
  fail_unless_equals_int (flush_stop_events, 0);
Packit f546b1
Packit f546b1
  /* flush ogg:sink_0. This flushs collectpads, calls ::flush() and sends
Packit f546b1
   * FLUSH_START downstream */
Packit f546b1
  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_start ()));
Packit f546b1
  fail_unless_equals_int (flush_start_events, 1);
Packit f546b1
  fail_unless_equals_int (flush_stop_events, 0);
Packit f546b1
  /* the first FLUSH_STOP is forwarded downstream */
Packit f546b1
  fail_unless (gst_pad_push_event (srcpad1, gst_event_new_flush_stop (TRUE)));
Packit f546b1
  fail_unless_equals_int (flush_start_events, 1);
Packit f546b1
  fail_unless_equals_int (flush_stop_events, 1);
Packit f546b1
  /* at this point even the other pad agg:sink_1 should be flushing so thread2
Packit f546b1
   * should have stopped */
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
Packit f546b1
  /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
Packit f546b1
   * that flushing completes once all the pads have been flushed */
Packit f546b1
  buf1 = gst_buffer_new_allocate (NULL, 1, NULL);
Packit f546b1
  GST_BUFFER_TIMESTAMP (buf1) = GST_SECOND;
Packit f546b1
  data1->pad = srcpad1;
Packit f546b1
  data1->buffer = buf1;
Packit f546b1
  thread1 = g_thread_try_new ("gst-check", push_buffer, data1, NULL);
Packit f546b1
Packit f546b1
  /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
Packit f546b1
   * sent downstream */
Packit f546b1
  gst_pad_push_event (srcpad2, gst_event_new_flush_start ());
Packit f546b1
  gst_pad_push_event (srcpad2, gst_event_new_flush_stop (TRUE));
Packit f546b1
Packit f546b1
  /* still, only one FLUSH_START and one FLUSH_STOP are forwarded downstream */
Packit f546b1
  fail_unless_equals_int (flush_start_events, 1);
Packit f546b1
  fail_unless_equals_int (flush_stop_events, 1);
Packit f546b1
Packit f546b1
  /* EOS agg:sink_1 so the buffer queued in agg:sink_0 is collected */
Packit f546b1
  data2->pad = srcpad2;
Packit f546b1
  data2->event = gst_event_new_eos ();
Packit f546b1
  thread2 = g_thread_try_new ("gst-check", push_event, data2, NULL);
Packit f546b1
  fail_unless_collected (TRUE);
Packit f546b1
Packit f546b1
  /* these will return immediately as at this point the threads have been
Packit f546b1
   * unlocked and are finished */
Packit f546b1
  g_thread_join (thread1);
Packit f546b1
  g_thread_join (thread2);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
GST_START_TEST (test_clip_running_time)
Packit f546b1
{
Packit f546b1
  GstBuffer *buf;
Packit f546b1
  GstCollectData data = { 0 };
Packit f546b1
Packit f546b1
  buf = gst_buffer_new ();
Packit f546b1
  data.pad = gst_pad_new ("clip_test", GST_PAD_SRC);
Packit f546b1
Packit f546b1
  GST_BUFFER_PTS (buf) = 0;
Packit f546b1
  GST_BUFFER_DTS (buf) = 0;
Packit f546b1
  gst_segment_init (&data.segment, GST_FORMAT_TIME);
Packit f546b1
Packit f546b1
  gst_collect_pads_clip_running_time (NULL, &data, buf, &buf, NULL);
Packit f546b1
Packit f546b1
  fail_unless (buf != NULL);
Packit f546b1
  fail_unless_equals_uint64 (GST_BUFFER_PTS (buf), 0);
Packit f546b1
  fail_unless_equals_uint64 (GST_BUFFER_DTS (buf), 0);
Packit f546b1
  fail_unless_equals_int64 (GST_COLLECT_PADS_DTS (&data), 0);
Packit f546b1
Packit f546b1
  GST_BUFFER_PTS (buf) = 1000;
Packit f546b1
  GST_BUFFER_DTS (buf) = 0;
Packit f546b1
  data.segment.start = 1000;
Packit f546b1
Packit f546b1
  gst_collect_pads_clip_running_time (NULL, &data, buf, &buf, NULL);
Packit f546b1
Packit f546b1
  fail_unless (buf != NULL);
Packit f546b1
  fail_unless_equals_uint64 (GST_BUFFER_PTS (buf), 0);
Packit f546b1
  fail_unless_equals_uint64 (GST_BUFFER_DTS (buf), GST_CLOCK_TIME_NONE);
Packit f546b1
  fail_unless_equals_int64 (GST_COLLECT_PADS_DTS (&data), -1000);
Packit f546b1
Packit f546b1
  GST_BUFFER_PTS (buf) = 1000;
Packit f546b1
  GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
Packit f546b1
Packit f546b1
  gst_collect_pads_clip_running_time (NULL, &data, buf, &buf, NULL);
Packit f546b1
Packit f546b1
  fail_unless (buf != NULL);
Packit f546b1
  fail_unless_equals_uint64 (GST_BUFFER_PTS (buf), 0);
Packit f546b1
  fail_unless_equals_uint64 (GST_BUFFER_DTS (buf), GST_CLOCK_TIME_NONE);
Packit f546b1
  fail_if (GST_COLLECT_PADS_DTS_IS_VALID (&data));
Packit f546b1
Packit f546b1
  GST_BUFFER_PTS (buf) = 0;
Packit f546b1
  GST_BUFFER_DTS (buf) = 0;
Packit f546b1
Packit f546b1
  gst_collect_pads_clip_running_time (NULL, &data, buf, &buf, NULL);
Packit f546b1
Packit f546b1
  fail_unless (buf == NULL);
Packit f546b1
  gst_object_unref (data.pad);
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_END_TEST;
Packit f546b1
Packit f546b1
Packit f546b1
static Suite *
Packit f546b1
gst_collect_pads_suite (void)
Packit f546b1
{
Packit f546b1
  Suite *suite;
Packit f546b1
  TCase *general, *buffers, *pipeline, *flush;
Packit f546b1
Packit f546b1
  gst_agregator_plugin_register ();
Packit f546b1
Packit f546b1
  suite = suite_create ("GstCollectPads");
Packit f546b1
Packit f546b1
  general = tcase_create ("general");
Packit f546b1
  suite_add_tcase (suite, general);
Packit f546b1
  tcase_add_checked_fixture (general, setup, teardown);
Packit f546b1
  tcase_add_test (general, test_pad_add_remove);
Packit f546b1
Packit f546b1
  tcase_add_test (general, test_collect);
Packit f546b1
  tcase_add_test (general, test_collect_eos);
Packit f546b1
  tcase_add_test (general, test_collect_twice);
Packit f546b1
  tcase_add_test (general, test_clip_running_time);
Packit f546b1
Packit f546b1
  buffers = tcase_create ("buffers");
Packit f546b1
  suite_add_tcase (suite, buffers);
Packit f546b1
  tcase_add_checked_fixture (buffers, setup_buffer_cb, teardown);
Packit f546b1
  tcase_add_test (buffers, test_collect_default);
Packit f546b1
Packit f546b1
  pipeline = tcase_create ("pipeline");
Packit f546b1
  suite_add_tcase (suite, pipeline);
Packit f546b1
  tcase_add_test (pipeline, test_linear_pipeline);
Packit f546b1
  tcase_add_test (pipeline, test_branched_pipeline);
Packit f546b1
Packit f546b1
  flush = tcase_create ("flush");
Packit f546b1
  suite_add_tcase (suite, flush);
Packit f546b1
  tcase_add_checked_fixture (flush, flush_setup, flush_teardown);
Packit f546b1
  tcase_add_test (flush, test_flushing_seek_failure);
Packit f546b1
  tcase_add_test (flush, test_flushing_seek);
Packit f546b1
Packit f546b1
  return suite;
Packit f546b1
}
Packit f546b1
Packit f546b1
GST_CHECK_MAIN (gst_collect_pads);