Blame tests/examples/streamiddemux/streamiddemux-stream.c

Packit Service 963350
#include <gst/gst.h>
Packit Service 963350
Packit Service 963350
#define NUM_STREAM 13
Packit Service 963350
Packit Service 963350
typedef struct _App App;
Packit Service 963350
Packit Service 963350
struct _App
Packit Service 963350
{
Packit Service 963350
  GstElement *pipeline;
Packit Service 963350
  GstElement *audiotestsrc[NUM_STREAM];
Packit Service 963350
  GstElement *audioconvert[NUM_STREAM];
Packit Service 963350
  GstElement *capsfilter[NUM_STREAM];
Packit Service 963350
  GstElement *vorbisenc[NUM_STREAM];
Packit Service 963350
  GstElement *oggmux[NUM_STREAM];
Packit Service 963350
  GstElement *funnel;
Packit Service 963350
  GstElement *demux;
Packit Service 963350
  GstElement *stream_synchronizer;
Packit Service 963350
  GstElement *queue[NUM_STREAM];
Packit Service 963350
  GstElement *filesink[NUM_STREAM];
Packit Service 963350
Packit Service 963350
  gboolean pad_blocked[NUM_STREAM];
Packit Service 963350
  GstPad *queue_srcpad[NUM_STREAM];
Packit Service 963350
  gulong blocked_id[NUM_STREAM];
Packit Service 963350
};
Packit Service 963350
Packit Service 963350
App s_app;
Packit Service 963350
Packit Service 963350
gint pad_added_cnt = 0;
Packit Service 963350
Packit Service 963350
static gboolean
Packit Service 963350
bus_call (GstBus * bus, GstMessage * msg, gpointer data)
Packit Service 963350
{
Packit Service 963350
  GMainLoop *loop = (GMainLoop *) data;
Packit Service 963350
Packit Service 963350
  switch (GST_MESSAGE_TYPE (msg)) {
Packit Service 963350
    case GST_MESSAGE_EOS:{
Packit Service 963350
      g_main_loop_quit (loop);
Packit Service 963350
      break;
Packit Service 963350
    }
Packit Service 963350
    case GST_MESSAGE_ERROR:{
Packit Service 963350
      g_main_loop_quit (loop);
Packit Service 963350
      break;
Packit Service 963350
    }
Packit Service 963350
    default:
Packit Service 963350
      break;
Packit Service 963350
  }
Packit Service 963350
  return TRUE;
Packit Service 963350
}
Packit Service 963350
Packit Service 963350
static void
Packit Service 963350
set_blocked (App * app, gboolean blocked)
Packit Service 963350
{
Packit Service 963350
  gint i = 0;
Packit Service 963350
Packit Service 963350
  for (i = 0; i < NUM_STREAM; i++) {
Packit Service 963350
    gst_pad_remove_probe (app->queue_srcpad[i], app->blocked_id[i]);
Packit Service 963350
  }
Packit Service 963350
}
Packit Service 963350
Packit Service 963350
static void
Packit Service 963350
sink_do_reconfigure (App * app)
Packit Service 963350
{
Packit Service 963350
  gint i = 0;
Packit Service 963350
  GstPad *filesink_sinkpad[NUM_STREAM];
Packit Service 963350
  GstPad *sync_sinkpad[NUM_STREAM];
Packit Service 963350
  GstPad *sync_srcpad[NUM_STREAM];
Packit Service 963350
  GstIterator *it;
Packit Service 963350
  GValue item = G_VALUE_INIT;
Packit Service 963350
Packit Service 963350
  for (i = 0; i < NUM_STREAM; i++) {
Packit Service 963350
    sync_sinkpad[i] =
Packit Service 963350
        gst_element_get_request_pad (app->stream_synchronizer, "sink_%u");
Packit Service 963350
    it = gst_pad_iterate_internal_links (sync_sinkpad[i]);
Packit Service 963350
    g_assert (it);
Packit Service 963350
    gst_iterator_next (it, &item);
Packit Service 963350
    sync_srcpad[i] = g_value_dup_object (&item);
Packit Service 963350
    g_value_unset (&item);
Packit Service 963350
Packit Service 963350
    filesink_sinkpad[i] = gst_element_get_static_pad (app->filesink[i], "sink");
Packit Service 963350
Packit Service 963350
    gst_pad_link_full (app->queue_srcpad[i], sync_sinkpad[i],
Packit Service 963350
        GST_PAD_LINK_CHECK_NOTHING);
Packit Service 963350
    gst_pad_link_full (sync_srcpad[i], filesink_sinkpad[i],
Packit Service 963350
        GST_PAD_LINK_CHECK_NOTHING);
Packit Service 963350
  }
Packit Service 963350
  gst_iterator_free (it);
Packit Service 963350
Packit Service 963350
}
Packit Service 963350
Packit Service 963350
static GstPadProbeReturn
Packit Service 963350
blocked_cb (GstPad * blockedpad, GstPadProbeInfo * info, gpointer user_data)
Packit Service 963350
{
Packit Service 963350
  App *app = user_data;
Packit Service 963350
  gint i = 0;
Packit Service 963350
  gboolean all_pads_blocked = TRUE;
Packit Service 963350
Packit Service 963350
  for (i = 0; i < NUM_STREAM; i++) {
Packit Service 963350
    if (blockedpad == app->queue_srcpad[i])
Packit Service 963350
      app->pad_blocked[i] = TRUE;
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  for (i = 0; i < NUM_STREAM; i++) {
Packit Service 963350
    if (app->queue_srcpad[i] == FALSE) {
Packit Service 963350
      all_pads_blocked = FALSE;
Packit Service 963350
      break;
Packit Service 963350
    }
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  if (all_pads_blocked == TRUE) {
Packit Service 963350
    sink_do_reconfigure (app);
Packit Service 963350
    set_blocked (app, FALSE);
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  return GST_PAD_PROBE_OK;
Packit Service 963350
}
Packit Service 963350
Packit Service 963350
static void
Packit Service 963350
src_pad_added_cb (GstElement * demux, GstPad * pad, App * app)
Packit Service 963350
{
Packit Service 963350
  GstPad *queue_sinkpad[NUM_STREAM];
Packit Service 963350
Packit Service 963350
  queue_sinkpad[pad_added_cnt] =
Packit Service 963350
      gst_element_get_static_pad (app->queue[pad_added_cnt], "sink");
Packit Service 963350
  gst_pad_link_full (pad, queue_sinkpad[pad_added_cnt],
Packit Service 963350
      GST_PAD_LINK_CHECK_NOTHING);
Packit Service 963350
Packit Service 963350
  app->queue_srcpad[pad_added_cnt] =
Packit Service 963350
      gst_element_get_static_pad (app->queue[pad_added_cnt], "src");
Packit Service 963350
  app->blocked_id[pad_added_cnt] =
Packit Service 963350
      gst_pad_add_probe (app->queue_srcpad[pad_added_cnt],
Packit Service 963350
      GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, blocked_cb, app, NULL);
Packit Service 963350
Packit Service 963350
  pad_added_cnt++;
Packit Service 963350
}
Packit Service 963350
Packit Service 963350
gint
Packit Service 963350
main (gint argc, gchar * argv[])
Packit Service 963350
{
Packit Service 963350
  App *app = &s_app;
Packit Service 963350
Packit Service 963350
  GMainLoop *loop = NULL;
Packit Service 963350
  GstBus *bus;
Packit Service 963350
  guint bus_watch_id;
Packit Service 963350
Packit Service 963350
  GstPad *funnel_sinkpad[NUM_STREAM];
Packit Service 963350
  GstPad *funnel_srcpad;
Packit Service 963350
  GstPad *demux_sinkpad;
Packit Service 963350
  GstPad *oggmux_srcpad[NUM_STREAM];
Packit Service 963350
Packit Service 963350
  guint stream_cnt = 0;
Packit Service 963350
  GstCaps *caps;
Packit Service 963350
Packit Service 963350
  gst_init (&argc, &argv);
Packit Service 963350
Packit Service 963350
  app->pipeline = gst_pipeline_new ("pipeline");
Packit Service 963350
Packit Service 963350
  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
Packit Service 963350
    app->audiotestsrc[stream_cnt] =
Packit Service 963350
        gst_element_factory_make ("audiotestsrc", NULL);
Packit Service 963350
    app->audioconvert[stream_cnt] =
Packit Service 963350
        gst_element_factory_make ("audioconvert", NULL);
Packit Service 963350
    app->capsfilter[stream_cnt] = gst_element_factory_make ("capsfilter", NULL);
Packit Service 963350
    app->vorbisenc[stream_cnt] = gst_element_factory_make ("vorbisenc", NULL);
Packit Service 963350
    app->oggmux[stream_cnt] = gst_element_factory_make ("oggmux", NULL);
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  app->funnel = gst_element_factory_make ("funnel", NULL);
Packit Service 963350
  app->demux = gst_element_factory_make ("streamiddemux", NULL);
Packit Service 963350
  app->stream_synchronizer =
Packit Service 963350
      gst_element_factory_make ("streamsynchronizer", NULL);
Packit Service 963350
Packit Service 963350
  caps = gst_caps_from_string ("audio/x-raw,channels=1;");
Packit Service 963350
Packit Service 963350
  stream_cnt = 0;
Packit Service 963350
Packit Service 963350
  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
Packit Service 963350
    app->queue[stream_cnt] = gst_element_factory_make ("queue", NULL);
Packit Service 963350
    app->filesink[stream_cnt] = gst_element_factory_make ("filesink", NULL);
Packit Service 963350
Packit Service 963350
    g_object_set (app->audiotestsrc[stream_cnt], "wave", stream_cnt,
Packit Service 963350
        "num-buffers", 2000, NULL);
Packit Service 963350
    g_object_set (app->capsfilter[stream_cnt], "caps", caps, NULL);
Packit Service 963350
    g_object_set (app->filesink[stream_cnt], "location",
Packit Service 963350
        g_strdup_printf ("filesink_%d.ogg", stream_cnt), NULL);
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  stream_cnt = 0;
Packit Service 963350
Packit Service 963350
  g_signal_connect (app->demux, "pad-added", G_CALLBACK (src_pad_added_cb),
Packit Service 963350
      app);
Packit Service 963350
Packit Service 963350
  loop = g_main_loop_new (NULL, FALSE);
Packit Service 963350
Packit Service 963350
  bus = gst_element_get_bus (app->pipeline);
Packit Service 963350
  bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
Packit Service 963350
  g_object_unref (bus);
Packit Service 963350
Packit Service 963350
  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
Packit Service 963350
    gst_bin_add_many (GST_BIN (app->pipeline), app->audiotestsrc[stream_cnt],
Packit Service 963350
        app->audioconvert[stream_cnt], app->capsfilter[stream_cnt],
Packit Service 963350
        app->vorbisenc[stream_cnt], app->oggmux[stream_cnt],
Packit Service 963350
        app->queue[stream_cnt], app->filesink[stream_cnt], NULL);
Packit Service 963350
    if (stream_cnt == 0) {
Packit Service 963350
      gst_bin_add_many (GST_BIN (app->pipeline), app->funnel, app->demux,
Packit Service 963350
          app->stream_synchronizer, NULL);
Packit Service 963350
    }
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  stream_cnt = 0;
Packit Service 963350
Packit Service 963350
  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
Packit Service 963350
    gst_element_link_many (app->audiotestsrc[stream_cnt],
Packit Service 963350
        app->audioconvert[stream_cnt], app->capsfilter[stream_cnt],
Packit Service 963350
        app->vorbisenc[stream_cnt], app->oggmux[stream_cnt], NULL);
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  stream_cnt = 0;
Packit Service 963350
Packit Service 963350
  for (stream_cnt = 0; stream_cnt < NUM_STREAM; stream_cnt++) {
Packit Service 963350
    funnel_sinkpad[stream_cnt] =
Packit Service 963350
        gst_element_get_request_pad (app->funnel, "sink_%u");
Packit Service 963350
    oggmux_srcpad[stream_cnt] =
Packit Service 963350
        gst_element_get_static_pad (app->oggmux[stream_cnt], "src");
Packit Service 963350
    gst_pad_link (oggmux_srcpad[stream_cnt], funnel_sinkpad[stream_cnt]);
Packit Service 963350
  }
Packit Service 963350
Packit Service 963350
  funnel_srcpad = gst_element_get_static_pad (app->funnel, "src");
Packit Service 963350
Packit Service 963350
  demux_sinkpad = gst_element_get_static_pad (app->demux, "sink");
Packit Service 963350
  gst_pad_link (funnel_srcpad, demux_sinkpad);
Packit Service 963350
Packit Service 963350
  gst_element_set_state (app->pipeline, GST_STATE_PLAYING);
Packit Service 963350
  g_main_loop_run (loop);
Packit Service 963350
Packit Service 963350
  gst_element_set_state (app->pipeline, GST_STATE_NULL);
Packit Service 963350
  g_object_unref (app->pipeline);
Packit Service 963350
  g_source_remove (bus_watch_id);
Packit Service 963350
  g_main_loop_unref (loop);
Packit Service 963350
Packit Service 963350
  return 0;
Packit Service 963350
}