Blob Blame History Raw
/* GStreamer Stream Splitter
 * Copyright (C) 2010 Edward Hervey <edward.hervey@collabora.co.uk>
 *           (C) 2009 Nokia Corporation
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "gststreamsplitter.h"

static GstStaticPadTemplate src_template =
GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC, GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

GST_DEBUG_CATEGORY_STATIC (gst_stream_splitter_debug);
#define GST_CAT_DEFAULT gst_stream_splitter_debug

G_DEFINE_TYPE (GstStreamSplitter, gst_stream_splitter, GST_TYPE_ELEMENT);

#define STREAMS_LOCK(obj) (g_mutex_lock(&obj->lock))
#define STREAMS_UNLOCK(obj) (g_mutex_unlock(&obj->lock))

static void gst_stream_splitter_dispose (GObject * object);
static void gst_stream_splitter_finalize (GObject * object);

static gboolean gst_stream_splitter_sink_setcaps (GstPad * pad, GstCaps * caps);

static GstPad *gst_stream_splitter_request_new_pad (GstElement * element,
    GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
static void gst_stream_splitter_release_pad (GstElement * element,
    GstPad * pad);

static void
gst_stream_splitter_class_init (GstStreamSplitterClass * klass)
{
  GObjectClass *gobject_klass;
  GstElementClass *gstelement_klass;

  gobject_klass = (GObjectClass *) klass;
  gstelement_klass = (GstElementClass *) klass;

  gobject_klass->dispose = gst_stream_splitter_dispose;
  gobject_klass->finalize = gst_stream_splitter_finalize;

  GST_DEBUG_CATEGORY_INIT (gst_stream_splitter_debug, "streamsplitter", 0,
      "Stream Splitter");

  gst_element_class_add_static_pad_template (gstelement_klass, &src_template);
  gst_element_class_add_static_pad_template (gstelement_klass, &sink_template);

  gstelement_klass->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_stream_splitter_request_new_pad);
  gstelement_klass->release_pad =
      GST_DEBUG_FUNCPTR (gst_stream_splitter_release_pad);

  gst_element_class_set_static_metadata (gstelement_klass,
      "streamsplitter", "Generic",
      "Splits streams based on their media type",
      "Edward Hervey <edward.hervey@collabora.co.uk>");
}

static void
gst_stream_splitter_dispose (GObject * object)
{
  GstStreamSplitter *stream_splitter = (GstStreamSplitter *) object;

  g_list_foreach (stream_splitter->pending_events, (GFunc) gst_event_unref,
      NULL);
  g_list_free (stream_splitter->pending_events);
  stream_splitter->pending_events = NULL;

  G_OBJECT_CLASS (gst_stream_splitter_parent_class)->dispose (object);
}

static void
gst_stream_splitter_finalize (GObject * object)
{
  GstStreamSplitter *stream_splitter = (GstStreamSplitter *) object;

  g_mutex_clear (&stream_splitter->lock);

  G_OBJECT_CLASS (gst_stream_splitter_parent_class)->finalize (object);
}

static void
gst_stream_splitter_push_pending_events (GstStreamSplitter * splitter,
    GstPad * srcpad)
{
  GList *tmp;
  GST_DEBUG_OBJECT (srcpad, "Pushing out pending events");

  for (tmp = splitter->pending_events; tmp; tmp = tmp->next) {
    GstEvent *event = (GstEvent *) tmp->data;
    gst_pad_push_event (srcpad, event);
  }
  g_list_free (splitter->pending_events);
  splitter->pending_events = NULL;
}

static GstFlowReturn
gst_stream_splitter_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
  GstStreamSplitter *stream_splitter = (GstStreamSplitter *) parent;
  GstFlowReturn res;
  GstPad *srcpad = NULL;

  STREAMS_LOCK (stream_splitter);
  if (stream_splitter->current)
    srcpad = gst_object_ref (stream_splitter->current);
  STREAMS_UNLOCK (stream_splitter);

  if (G_UNLIKELY (srcpad == NULL))
    goto nopad;

  if (G_UNLIKELY (stream_splitter->pending_events))
    gst_stream_splitter_push_pending_events (stream_splitter, srcpad);

  /* Forward to currently activated stream */
  res = gst_pad_push (srcpad, buf);
  gst_object_unref (srcpad);

  return res;

nopad:
  GST_WARNING_OBJECT (stream_splitter, "No output pad was configured");
  return GST_FLOW_ERROR;
}

static GList *
_flush_events (GstPad * pad, GList * events)
{
  GList *tmp;

  for (tmp = events; tmp; tmp = tmp->next) {
    if (GST_EVENT_TYPE (tmp->data) != GST_EVENT_EOS &&
        GST_EVENT_TYPE (tmp->data) != GST_EVENT_SEGMENT &&
        GST_EVENT_IS_STICKY (tmp->data) && pad != NULL) {
      gst_pad_store_sticky_event (pad, GST_EVENT_CAST (tmp->data));
    }
    gst_event_unref (tmp->data);
  }
  g_list_free (events);

  return NULL;
}

static gboolean
gst_stream_splitter_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event)
{
  GstStreamSplitter *stream_splitter = (GstStreamSplitter *) parent;
  gboolean res = TRUE;
  gboolean toall = FALSE;
  gboolean store = FALSE;
  /* FLUSH_START/STOP : forward to all
   * INBAND events : store to send in chain function to selected chain
   * OUT_OF_BAND events : send to all
   */

  GST_DEBUG_OBJECT (stream_splitter, "Got event %s",
      GST_EVENT_TYPE_NAME (event));

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_CAPS:
    {
      GstCaps *caps;

      gst_event_parse_caps (event, &caps);
      res = gst_stream_splitter_sink_setcaps (pad, caps);

      store = TRUE;
      break;
    }
    case GST_EVENT_FLUSH_STOP:
      toall = TRUE;
      STREAMS_LOCK (stream_splitter);
      stream_splitter->pending_events = _flush_events (stream_splitter->current,
          stream_splitter->pending_events);
      STREAMS_UNLOCK (stream_splitter);
      break;
    case GST_EVENT_FLUSH_START:
      toall = TRUE;
      break;
    case GST_EVENT_EOS:

      if (G_UNLIKELY (stream_splitter->pending_events)) {
        GstPad *srcpad = NULL;

        STREAMS_LOCK (stream_splitter);
        if (stream_splitter->current)
          srcpad = gst_object_ref (stream_splitter->current);
        STREAMS_UNLOCK (stream_splitter);

        if (srcpad) {
          gst_stream_splitter_push_pending_events (stream_splitter, srcpad);
          gst_object_unref (srcpad);
        }
      }

      toall = TRUE;
      break;
    default:
      if (GST_EVENT_TYPE (event) & GST_EVENT_TYPE_SERIALIZED)
        store = TRUE;
  }

  if (store) {
    stream_splitter->pending_events =
        g_list_append (stream_splitter->pending_events, event);
  } else if (toall) {
    GList *tmp;
    guint32 cookie;

    /* Send to all pads */
    STREAMS_LOCK (stream_splitter);
  resync:
    if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
      STREAMS_UNLOCK (stream_splitter);
      /* No source pads */
      gst_event_unref (event);
      res = FALSE;
      goto beach;
    }
    tmp = stream_splitter->srcpads;
    cookie = stream_splitter->cookie;
    while (tmp) {
      GstPad *srcpad = (GstPad *) tmp->data;
      STREAMS_UNLOCK (stream_splitter);
      gst_event_ref (event);
      res = gst_pad_push_event (srcpad, event);
      STREAMS_LOCK (stream_splitter);
      if (G_UNLIKELY (cookie != stream_splitter->cookie))
        goto resync;
      tmp = tmp->next;
    }
    STREAMS_UNLOCK (stream_splitter);
    gst_event_unref (event);
  } else {
    GstPad *pad;

    /* Only send to current pad */

    STREAMS_LOCK (stream_splitter);
    pad = stream_splitter->current;
    STREAMS_UNLOCK (stream_splitter);
    if (pad)
      res = gst_pad_push_event (pad, event);
    else {
      gst_event_unref (event);
      res = FALSE;
    }
  }

beach:
  return res;
}

static GstCaps *
gst_stream_splitter_sink_getcaps (GstPad * pad, GstCaps * filter)
{
  GstStreamSplitter *stream_splitter =
      (GstStreamSplitter *) GST_PAD_PARENT (pad);
  guint32 cookie;
  GList *tmp;
  GstCaps *res = NULL;

  /* Return the combination of all downstream caps */

  STREAMS_LOCK (stream_splitter);

resync:
  if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
    res = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
    goto beach;
  }

  res = NULL;
  cookie = stream_splitter->cookie;
  tmp = stream_splitter->srcpads;

  while (tmp) {
    GstPad *srcpad = (GstPad *) tmp->data;

    /* Ensure srcpad doesn't get destroyed while we query peer */
    gst_object_ref (srcpad);
    STREAMS_UNLOCK (stream_splitter);
    if (res) {
      GstCaps *peercaps = gst_pad_peer_query_caps (srcpad, filter);
      if (peercaps)
        res = gst_caps_merge (res, peercaps);
    } else {
      res = gst_pad_peer_query_caps (srcpad, filter);
    }
    STREAMS_LOCK (stream_splitter);
    gst_object_unref (srcpad);

    if (G_UNLIKELY (cookie != stream_splitter->cookie)) {
      if (res)
        gst_caps_unref (res);
      goto resync;
    }
    tmp = tmp->next;
  }

beach:
  STREAMS_UNLOCK (stream_splitter);
  return res;
}

static gboolean
gst_stream_splitter_sink_acceptcaps (GstPad * pad, GstCaps * caps)
{
  GstStreamSplitter *stream_splitter =
      (GstStreamSplitter *) GST_PAD_PARENT (pad);
  guint32 cookie;
  GList *tmp;
  gboolean res = FALSE;

  /* check if one of the downstream elements accepts the caps */
  STREAMS_LOCK (stream_splitter);

resync:
  res = FALSE;

  if (G_UNLIKELY (stream_splitter->srcpads == NULL))
    goto beach;

  cookie = stream_splitter->cookie;
  tmp = stream_splitter->srcpads;

  while (tmp) {
    GstPad *srcpad = (GstPad *) tmp->data;

    /* Ensure srcpad doesn't get destroyed while we query peer */
    gst_object_ref (srcpad);
    STREAMS_UNLOCK (stream_splitter);

    res = gst_pad_peer_query_accept_caps (srcpad, caps);

    STREAMS_LOCK (stream_splitter);
    gst_object_unref (srcpad);

    if (G_UNLIKELY (cookie != stream_splitter->cookie))
      goto resync;

    if (res)
      break;

    tmp = tmp->next;
  }

beach:
  STREAMS_UNLOCK (stream_splitter);
  return res;
}

static gboolean
gst_stream_splitter_sink_query (GstPad * pad, GstObject * parent,
    GstQuery * query)
{
  gboolean res;

  switch (GST_QUERY_TYPE (query)) {
    case GST_QUERY_CAPS:
    {
      GstCaps *filter, *caps;

      gst_query_parse_caps (query, &filter);
      caps = gst_stream_splitter_sink_getcaps (pad, filter);
      gst_query_set_caps_result (query, caps);
      gst_caps_unref (caps);
      res = TRUE;
      break;
    }
    case GST_QUERY_ACCEPT_CAPS:
    {
      GstCaps *caps;
      gboolean result;

      gst_query_parse_accept_caps (query, &caps);
      result = gst_stream_splitter_sink_acceptcaps (pad, caps);
      gst_query_set_accept_caps_result (query, result);
      res = TRUE;
      break;
    }
    default:
      res = gst_pad_query_default (pad, parent, query);
      break;
  }
  return res;
}

static gboolean
gst_stream_splitter_sink_setcaps (GstPad * pad, GstCaps * caps)
{
  GstStreamSplitter *stream_splitter =
      (GstStreamSplitter *) GST_PAD_PARENT (pad);
  guint32 cookie;
  GList *tmp;
  gboolean res;

  GST_DEBUG_OBJECT (stream_splitter, "caps %" GST_PTR_FORMAT, caps);

  /* Try on all pads, choose the one that succeeds as the current stream */
  STREAMS_LOCK (stream_splitter);

resync:
  if (G_UNLIKELY (stream_splitter->srcpads == NULL)) {
    res = FALSE;
    goto beach;
  }

  res = FALSE;
  tmp = stream_splitter->srcpads;
  cookie = stream_splitter->cookie;

  while (tmp) {
    GstPad *srcpad = (GstPad *) tmp->data;
    GstCaps *peercaps;

    STREAMS_UNLOCK (stream_splitter);
    peercaps = gst_pad_peer_query_caps (srcpad, NULL);
    if (peercaps) {
      res = gst_caps_can_intersect (caps, peercaps);
      gst_caps_unref (peercaps);
    }
    STREAMS_LOCK (stream_splitter);

    if (G_UNLIKELY (cookie != stream_splitter->cookie))
      goto resync;

    if (res) {
      /* FIXME : we need to switch properly */
      GST_DEBUG_OBJECT (srcpad, "Setting caps on this pad was successful");
      stream_splitter->current = srcpad;
      goto beach;
    }
    tmp = tmp->next;
  }

beach:
  STREAMS_UNLOCK (stream_splitter);
  return res;
}

static void
gst_stream_splitter_init (GstStreamSplitter * stream_splitter)
{
  stream_splitter->sinkpad =
      gst_pad_new_from_static_template (&sink_template, "sink");
  gst_pad_set_chain_function (stream_splitter->sinkpad,
      gst_stream_splitter_chain);
  gst_pad_set_event_function (stream_splitter->sinkpad,
      gst_stream_splitter_sink_event);
  gst_pad_set_query_function (stream_splitter->sinkpad,
      gst_stream_splitter_sink_query);
  gst_element_add_pad (GST_ELEMENT (stream_splitter), stream_splitter->sinkpad);

  g_mutex_init (&stream_splitter->lock);
}

static GstPad *
gst_stream_splitter_request_new_pad (GstElement * element,
    GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
{
  GstStreamSplitter *stream_splitter = (GstStreamSplitter *) element;
  GstPad *srcpad;

  srcpad = gst_pad_new_from_static_template (&src_template, name);

  STREAMS_LOCK (stream_splitter);
  stream_splitter->srcpads = g_list_append (stream_splitter->srcpads, srcpad);
  gst_pad_set_active (srcpad, TRUE);
  gst_element_add_pad (element, srcpad);
  stream_splitter->cookie++;
  STREAMS_UNLOCK (stream_splitter);

  return srcpad;
}

static void
gst_stream_splitter_release_pad (GstElement * element, GstPad * pad)
{
  GstStreamSplitter *stream_splitter = (GstStreamSplitter *) element;
  GList *tmp;

  STREAMS_LOCK (stream_splitter);
  tmp = g_list_find (stream_splitter->srcpads, pad);
  if (tmp) {
    GstPad *pad = (GstPad *) tmp->data;

    stream_splitter->srcpads =
        g_list_delete_link (stream_splitter->srcpads, tmp);
    stream_splitter->cookie++;

    if (pad == stream_splitter->current) {
      /* Deactivate current flow */
      GST_DEBUG_OBJECT (element, "Removed pad was the current one");
      stream_splitter->current = NULL;
    }

    gst_element_remove_pad (element, pad);
  }
  STREAMS_UNLOCK (stream_splitter);

  return;
}