Blame gst/playback/gststreamsynchronizer.c

Packit 971217
/* GStreamer
Packit 971217
 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
Packit 971217
 *
Packit 971217
 * This library is free software; you can redistribute it and/or
Packit 971217
 * modify it under the terms of the GNU Library General Public
Packit 971217
 * License as published by the Free Software Foundation; either
Packit 971217
 * version 2 of the License, or (at your option) any later version.
Packit 971217
 *
Packit 971217
 * This library is distributed in the hope that it will be useful,
Packit 971217
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 971217
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit 971217
 * Library General Public License for more details.
Packit 971217
 *
Packit 971217
 * You should have received a copy of the GNU Library General Public
Packit 971217
 * License along with this library; if not, write to the
Packit 971217
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
Packit 971217
 * Boston, MA 02110-1301, USA.
Packit 971217
 */
Packit 971217
Packit 971217
#ifdef HAVE_CONFIG_H
Packit 971217
#include "config.h"
Packit 971217
#endif
Packit 971217
Packit 971217
#include "gststreamsynchronizer.h"
Packit 971217
Packit 971217
/* Debug category used by the GST_*_OBJECT logging calls in this file. */
GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
#define GST_CAT_DEFAULT stream_synchronizer_debug

/* Acquire the synchronizer's mutex. The calling thread is traced once
 * before the lock attempt and once after the lock has been obtained. */
#define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                 \
    GST_TRACE_OBJECT (obj, "locking from thread %p", g_thread_self ());  \
    g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);             \
    GST_TRACE_OBJECT (obj, "locked from thread %p", g_thread_self ());   \
} G_STMT_END

/* Release the synchronizer's mutex, tracing the calling thread first. */
#define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                \
    GST_TRACE_OBJECT (obj, "unlocking from thread %p", g_thread_self ()); \
    g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);            \
} G_STMT_END
Packit 971217
Packit 971217
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
Packit 971217
    GST_PAD_SRC,
Packit 971217
    GST_PAD_SOMETIMES,
Packit 971217
    GST_STATIC_CAPS_ANY);
Packit 971217
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
Packit 971217
    GST_PAD_SINK,
Packit 971217
    GST_PAD_REQUEST,
Packit 971217
    GST_STATIC_CAPS_ANY);
Packit 971217
Packit 971217
#define gst_stream_synchronizer_parent_class parent_class
Packit 971217
G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
Packit 971217
    GST_TYPE_ELEMENT);
Packit 971217
Packit 971217
typedef struct
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *transform;
Packit 971217
  guint stream_number;
Packit 971217
  GstPad *srcpad;
Packit 971217
  GstPad *sinkpad;
Packit 971217
  GstSegment segment;
Packit 971217
Packit 971217
  gboolean wait;                /* TRUE if waiting/blocking */
Packit 971217
  gboolean is_eos;              /* TRUE if EOS was received */
Packit 971217
  gboolean eos_sent;            /* when EOS was sent downstream */
Packit 971217
  gboolean flushing;            /* set after flush-start and before flush-stop */
Packit 971217
  gboolean seen_data;
Packit 971217
  gboolean send_gap_event;
Packit 971217
  GstClockTime gap_duration;
Packit 971217
Packit 971217
  GstStreamFlags flags;
Packit 971217
Packit 971217
  GCond stream_finish_cond;
Packit 971217
Packit 971217
  /* seqnum of the previously received STREAM_START
Packit 971217
   * default: G_MAXUINT32 */
Packit 971217
  guint32 stream_start_seqnum;
Packit 971217
  guint32 segment_seqnum;
Packit 971217
  guint group_id;
Packit 971217
} GstSyncStream;
Packit 971217
Packit 971217
/* Must be called with lock! */
Packit 971217
static inline GstPad *
Packit 971217
gst_stream_get_other_pad (GstSyncStream * stream, GstPad * pad)
Packit 971217
{
Packit 971217
  if (stream->sinkpad == pad)
Packit 971217
    return gst_object_ref (stream->srcpad);
Packit 971217
  if (stream->srcpad == pad)
Packit 971217
    return gst_object_ref (stream->sinkpad);
Packit 971217
Packit 971217
  return NULL;
Packit 971217
}
Packit 971217
Packit 971217
static GstPad *
Packit 971217
gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
Packit 971217
{
Packit 971217
  GstSyncStream *stream;
Packit 971217
  GstPad *opad = NULL;
Packit 971217
Packit 971217
  GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
  stream = gst_pad_get_element_private (pad);
Packit 971217
  if (!stream)
Packit 971217
    goto out;
Packit 971217
Packit 971217
  opad = gst_stream_get_other_pad (stream, pad);
Packit 971217
Packit 971217
out:
Packit 971217
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
Packit 971217
  if (!opad)
Packit 971217
    GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
Packit 971217
Packit 971217
  return opad;
Packit 971217
}
Packit 971217
Packit 971217
/* Generic pad functions */
Packit 971217
static GstIterator *
Packit 971217
gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
Packit 971217
    GstObject * parent)
Packit 971217
{
Packit 971217
  GstIterator *it = NULL;
Packit 971217
  GstPad *opad;
Packit 971217
Packit 971217
  opad =
Packit 971217
      gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
Packit 971217
  if (opad) {
Packit 971217
    GValue value = { 0, };
Packit 971217
Packit 971217
    g_value_init (&value, GST_TYPE_PAD);
Packit 971217
    g_value_set_object (&value, opad);
Packit 971217
    it = gst_iterator_new_single (GST_TYPE_PAD, &value);
Packit 971217
    g_value_unset (&value);
Packit 971217
    gst_object_unref (opad);
Packit 971217
  }
Packit 971217
Packit 971217
  return it;
Packit 971217
}
Packit 971217
Packit 971217
/* srcpad functions */
Packit 971217
static gboolean
Packit 971217
gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
Packit 971217
    GstEvent * event)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
Packit 971217
  gboolean ret = FALSE;
Packit 971217
Packit 971217
  GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
Packit 971217
      GST_EVENT_TYPE_NAME (event), event);
Packit 971217
Packit 971217
  switch (GST_EVENT_TYPE (event)) {
Packit 971217
    case GST_EVENT_QOS:{
Packit 971217
      gdouble proportion;
Packit 971217
      GstClockTimeDiff diff;
Packit 971217
      GstClockTime timestamp;
Packit 971217
      gint64 running_time_diff = -1;
Packit 971217
      GstSyncStream *stream;
Packit 971217
Packit 971217
      gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
Packit 971217
      gst_event_unref (event);
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      stream = gst_pad_get_element_private (pad);
Packit 971217
      if (stream)
Packit 971217
        running_time_diff = stream->segment.base;
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
Packit 971217
      if (running_time_diff == -1) {
Packit 971217
        GST_WARNING_OBJECT (pad, "QOS event before group start");
Packit 971217
        goto out;
Packit 971217
      }
Packit 971217
      if (timestamp < running_time_diff) {
Packit 971217
        GST_DEBUG_OBJECT (pad, "QOS event from previous group");
Packit 971217
        goto out;
Packit 971217
      }
Packit 971217
Packit 971217
      GST_LOG_OBJECT (pad,
Packit 971217
          "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
Packit 971217
          GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
Packit 971217
          GST_TIME_ARGS (running_time_diff),
Packit 971217
          GST_TIME_ARGS (timestamp - running_time_diff));
Packit 971217
Packit 971217
      timestamp -= running_time_diff;
Packit 971217
Packit 971217
      /* That case is invalid for QoS events */
Packit 971217
      if (diff < 0 && -diff > timestamp) {
Packit 971217
        GST_DEBUG_OBJECT (pad, "QOS event from previous group");
Packit 971217
        ret = TRUE;
Packit 971217
        goto out;
Packit 971217
      }
Packit 971217
Packit 971217
      event =
Packit 971217
          gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
Packit 971217
          timestamp);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    default:
Packit 971217
      break;
Packit 971217
  }
Packit 971217
Packit 971217
  ret = gst_pad_event_default (pad, parent, event);
Packit 971217
Packit 971217
out:
Packit 971217
  return ret;
Packit 971217
}
Packit 971217
Packit 971217
/* must be called with the STREAM_SYNCHRONIZER_LOCK */
Packit 971217
static gboolean
Packit 971217
gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
Packit 971217
{
Packit 971217
  gboolean ret = FALSE;
Packit 971217
  GstSyncStream *stream;
Packit 971217
Packit 971217
  while (!self->eos && !self->flushing) {
Packit 971217
    stream = gst_pad_get_element_private (pad);
Packit 971217
    if (!stream) {
Packit 971217
      GST_WARNING_OBJECT (pad, "unknown stream");
Packit 971217
      return ret;
Packit 971217
    }
Packit 971217
    if (stream->flushing) {
Packit 971217
      GST_DEBUG_OBJECT (pad, "Flushing");
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    if (!stream->wait) {
Packit 971217
      GST_DEBUG_OBJECT (pad, "Stream not waiting anymore");
Packit 971217
      break;
Packit 971217
    }
Packit 971217
Packit 971217
    if (stream->send_gap_event) {
Packit 971217
      GstEvent *event;
Packit 971217
Packit 971217
      if (!GST_CLOCK_TIME_IS_VALID (stream->segment.position)) {
Packit 971217
        GST_WARNING_OBJECT (pad, "Have no position and can't send GAP event");
Packit 971217
        stream->send_gap_event = FALSE;
Packit 971217
        continue;
Packit 971217
      }
Packit 971217
Packit 971217
      event =
Packit 971217
          gst_event_new_gap (stream->segment.position, stream->gap_duration);
Packit 971217
      GST_DEBUG_OBJECT (pad,
Packit 971217
          "Send GAP event, position: %" GST_TIME_FORMAT " duration: %"
Packit 971217
          GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.position),
Packit 971217
          GST_TIME_ARGS (stream->gap_duration));
Packit 971217
Packit 971217
      /* drop lock when sending GAP event, which may block in e.g. preroll */
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      ret = gst_pad_push_event (pad, event);
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      if (!ret) {
Packit 971217
        return ret;
Packit 971217
      }
Packit 971217
      stream->send_gap_event = FALSE;
Packit 971217
Packit 971217
      /* force a check on the loop conditions as we unlocked a
Packit 971217
       * few lines above and those variables could have changed */
Packit 971217
      continue;
Packit 971217
    }
Packit 971217
Packit 971217
    g_cond_wait (&stream->stream_finish_cond, &self->lock);
Packit 971217
  }
Packit 971217
Packit 971217
  return TRUE;
Packit 971217
}
Packit 971217
Packit 971217
/* sinkpad functions */
Packit 971217
static gboolean
Packit 971217
gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
Packit 971217
    GstEvent * event)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
Packit 971217
  gboolean ret = FALSE;
Packit 971217
Packit 971217
  GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
Packit 971217
      GST_EVENT_TYPE_NAME (event), event);
Packit 971217
Packit 971217
  switch (GST_EVENT_TYPE (event)) {
Packit 971217
    case GST_EVENT_STREAM_START:
Packit 971217
    {
Packit 971217
      GstSyncStream *stream, *ostream;
Packit 971217
      guint32 seqnum = gst_event_get_seqnum (event);
Packit 971217
      guint group_id;
Packit 971217
      gboolean have_group_id;
Packit 971217
      GList *l;
Packit 971217
      gboolean all_wait = TRUE;
Packit 971217
      gboolean new_stream = TRUE;
Packit 971217
Packit 971217
      have_group_id = gst_event_parse_group_id (event, &group_id);
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      self->have_group_id &= have_group_id;
Packit 971217
      have_group_id = self->have_group_id;
Packit 971217
Packit 971217
      stream = gst_pad_get_element_private (pad);
Packit 971217
Packit 971217
      if (!stream) {
Packit 971217
        GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
Packit 971217
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
        break;
Packit 971217
      }
Packit 971217
Packit 971217
      gst_event_parse_stream_flags (event, &stream->flags);
Packit 971217
Packit 971217
      if ((have_group_id && stream->group_id != group_id) || (!have_group_id
Packit 971217
              && stream->stream_start_seqnum != seqnum)) {
Packit 971217
        stream->is_eos = FALSE;
Packit 971217
        stream->eos_sent = FALSE;
Packit 971217
        stream->flushing = FALSE;
Packit 971217
        stream->stream_start_seqnum = seqnum;
Packit 971217
        stream->group_id = group_id;
Packit 971217
Packit 971217
        if (!have_group_id) {
Packit 971217
          /* Check if this belongs to a stream that is already there,
Packit 971217
           * e.g. we got the visualizations for an audio stream */
Packit 971217
          for (l = self->streams; l; l = l->next) {
Packit 971217
            ostream = l->data;
Packit 971217
Packit 971217
            if (ostream != stream && ostream->stream_start_seqnum == seqnum
Packit 971217
                && !ostream->wait) {
Packit 971217
              new_stream = FALSE;
Packit 971217
              break;
Packit 971217
            }
Packit 971217
          }
Packit 971217
Packit 971217
          if (!new_stream) {
Packit 971217
            GST_DEBUG_OBJECT (pad,
Packit 971217
                "Stream %d belongs to running stream %d, no waiting",
Packit 971217
                stream->stream_number, ostream->stream_number);
Packit 971217
            stream->wait = FALSE;
Packit 971217
Packit 971217
            GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
            break;
Packit 971217
          }
Packit 971217
        } else if (group_id == self->group_id) {
Packit 971217
          GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
Packit 971217
              "no waiting", stream->stream_number, group_id);
Packit 971217
          GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
          break;
Packit 971217
        }
Packit 971217
Packit 971217
        GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
Packit 971217
Packit 971217
        stream->wait = TRUE;
Packit 971217
Packit 971217
        for (l = self->streams; l; l = l->next) {
Packit 971217
          GstSyncStream *ostream = l->data;
Packit 971217
Packit 971217
          all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
Packit 971217
              || (ostream->wait && (!have_group_id
Packit 971217
                      || ostream->group_id == group_id)));
Packit 971217
          if (!all_wait)
Packit 971217
            break;
Packit 971217
        }
Packit 971217
Packit 971217
        if (all_wait) {
Packit 971217
          gint64 position = 0;
Packit 971217
Packit 971217
          if (have_group_id)
Packit 971217
            GST_DEBUG_OBJECT (self,
Packit 971217
                "All streams have changed to group id %u -- unblocking",
Packit 971217
                group_id);
Packit 971217
          else
Packit 971217
            GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
Packit 971217
Packit 971217
          self->group_id = group_id;
Packit 971217
Packit 971217
          for (l = self->streams; l; l = l->next) {
Packit 971217
            GstSyncStream *ostream = l->data;
Packit 971217
            gint64 stop_running_time;
Packit 971217
            gint64 position_running_time;
Packit 971217
Packit 971217
            ostream->wait = FALSE;
Packit 971217
Packit 971217
            if (ostream->segment.format == GST_FORMAT_TIME) {
Packit 971217
              if (ostream->segment.rate > 0)
Packit 971217
                stop_running_time =
Packit 971217
                    gst_segment_to_running_time (&ostream->segment,
Packit 971217
                    GST_FORMAT_TIME, ostream->segment.stop);
Packit 971217
              else
Packit 971217
                stop_running_time =
Packit 971217
                    gst_segment_to_running_time (&ostream->segment,
Packit 971217
                    GST_FORMAT_TIME, ostream->segment.start);
Packit 971217
Packit 971217
              position_running_time =
Packit 971217
                  gst_segment_to_running_time (&ostream->segment,
Packit 971217
                  GST_FORMAT_TIME, ostream->segment.position);
Packit 971217
Packit 971217
              position_running_time =
Packit 971217
                  MAX (position_running_time, stop_running_time);
Packit 971217
Packit 971217
              if (ostream->segment.rate > 0)
Packit 971217
                position_running_time -=
Packit 971217
                    gst_segment_to_running_time (&ostream->segment,
Packit 971217
                    GST_FORMAT_TIME, ostream->segment.start);
Packit 971217
              else
Packit 971217
                position_running_time -=
Packit 971217
                    gst_segment_to_running_time (&ostream->segment,
Packit 971217
                    GST_FORMAT_TIME, ostream->segment.stop);
Packit 971217
Packit 971217
              position_running_time = MAX (0, position_running_time);
Packit 971217
Packit 971217
              position = MAX (position, position_running_time);
Packit 971217
            }
Packit 971217
          }
Packit 971217
Packit 971217
          self->group_start_time += position;
Packit 971217
Packit 971217
          GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
Packit 971217
              GST_TIME_ARGS (self->group_start_time));
Packit 971217
Packit 971217
          for (l = self->streams; l; l = l->next) {
Packit 971217
            GstSyncStream *ostream = l->data;
Packit 971217
            ostream->wait = FALSE;
Packit 971217
            g_cond_broadcast (&ostream->stream_finish_cond);
Packit 971217
          }
Packit 971217
        }
Packit 971217
      }
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    case GST_EVENT_SEGMENT:{
Packit 971217
      GstSyncStream *stream;
Packit 971217
      GstSegment segment;
Packit 971217
Packit 971217
      gst_event_copy_segment (event, &segment);
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
Packit 971217
      gst_stream_synchronizer_wait (self, pad);
Packit 971217
Packit 971217
      if (self->shutdown) {
Packit 971217
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
        gst_event_unref (event);
Packit 971217
        goto done;
Packit 971217
      }
Packit 971217
Packit 971217
      stream = gst_pad_get_element_private (pad);
Packit 971217
      if (stream && segment.format == GST_FORMAT_TIME) {
Packit 971217
        GST_DEBUG_OBJECT (pad,
Packit 971217
            "New stream, updating base from %" GST_TIME_FORMAT " to %"
Packit 971217
            GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
Packit 971217
            GST_TIME_ARGS (segment.base + self->group_start_time));
Packit 971217
        segment.base += self->group_start_time;
Packit 971217
Packit 971217
        GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
Packit 971217
            &stream->segment);
Packit 971217
        gst_segment_copy_into (&segment, &stream->segment);
Packit 971217
        GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
Packit 971217
            &stream->segment);
Packit 971217
        stream->segment_seqnum = gst_event_get_seqnum (event);
Packit 971217
Packit 971217
        GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
Packit 971217
            GST_TIME_ARGS (stream->segment.base));
Packit 971217
        {
Packit 971217
          GstEvent *tmpev;
Packit 971217
Packit 971217
          tmpev = gst_event_new_segment (&stream->segment);
Packit 971217
          gst_event_set_seqnum (tmpev, stream->segment_seqnum);
Packit 971217
          gst_event_unref (event);
Packit 971217
          event = tmpev;
Packit 971217
        }
Packit 971217
      } else if (stream) {
Packit 971217
        GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
Packit 971217
            gst_format_get_name (segment.format));
Packit 971217
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
Packit 971217
      }
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    case GST_EVENT_FLUSH_START:{
Packit 971217
      GstSyncStream *stream;
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      stream = gst_pad_get_element_private (pad);
Packit 971217
      self->eos = FALSE;
Packit 971217
      if (stream) {
Packit 971217
        GST_DEBUG_OBJECT (pad, "Flushing streams");
Packit 971217
        stream->flushing = TRUE;
Packit 971217
        g_cond_broadcast (&stream->stream_finish_cond);
Packit 971217
      }
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    case GST_EVENT_FLUSH_STOP:{
Packit 971217
      GstSyncStream *stream;
Packit 971217
      GList *l;
Packit 971217
      GstClockTime new_group_start_time = 0;
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      stream = gst_pad_get_element_private (pad);
Packit 971217
      if (stream) {
Packit 971217
        GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
Packit 971217
            stream->stream_number);
Packit 971217
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
Packit 971217
Packit 971217
        stream->is_eos = FALSE;
Packit 971217
        stream->eos_sent = FALSE;
Packit 971217
        stream->flushing = FALSE;
Packit 971217
        stream->wait = FALSE;
Packit 971217
        g_cond_broadcast (&stream->stream_finish_cond);
Packit 971217
      }
Packit 971217
Packit 971217
      for (l = self->streams; l; l = l->next) {
Packit 971217
        GstSyncStream *ostream = l->data;
Packit 971217
        GstClockTime start_running_time;
Packit 971217
Packit 971217
        if (ostream == stream || ostream->flushing)
Packit 971217
          continue;
Packit 971217
Packit 971217
        if (ostream->segment.format == GST_FORMAT_TIME) {
Packit 971217
          if (ostream->segment.rate > 0)
Packit 971217
            start_running_time =
Packit 971217
                gst_segment_to_running_time (&ostream->segment,
Packit 971217
                GST_FORMAT_TIME, ostream->segment.start);
Packit 971217
          else
Packit 971217
            start_running_time =
Packit 971217
                gst_segment_to_running_time (&ostream->segment,
Packit 971217
                GST_FORMAT_TIME, ostream->segment.stop);
Packit 971217
Packit 971217
          new_group_start_time = MAX (new_group_start_time, start_running_time);
Packit 971217
        }
Packit 971217
      }
Packit 971217
Packit 971217
      GST_DEBUG_OBJECT (pad,
Packit 971217
          "Updating group start time from %" GST_TIME_FORMAT " to %"
Packit 971217
          GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
Packit 971217
          GST_TIME_ARGS (new_group_start_time));
Packit 971217
      self->group_start_time = new_group_start_time;
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
      /* unblocking EOS wait when track switch. */
Packit 971217
    case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:{
Packit 971217
      if (gst_event_has_name (event, "playsink-custom-video-flush")
Packit 971217
          || gst_event_has_name (event, "playsink-custom-audio-flush")
Packit 971217
          || gst_event_has_name (event, "playsink-custom-subtitle-flush")) {
Packit 971217
        GstSyncStream *stream;
Packit 971217
Packit 971217
        GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
        stream = gst_pad_get_element_private (pad);
Packit 971217
        if (stream) {
Packit 971217
          stream->is_eos = FALSE;
Packit 971217
          stream->eos_sent = FALSE;
Packit 971217
          stream->wait = FALSE;
Packit 971217
          g_cond_broadcast (&stream->stream_finish_cond);
Packit 971217
        }
Packit 971217
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      }
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    case GST_EVENT_EOS:{
Packit 971217
      GstSyncStream *stream;
Packit 971217
      GList *l;
Packit 971217
      gboolean all_eos = TRUE;
Packit 971217
      gboolean seen_data;
Packit 971217
      GSList *pads = NULL;
Packit 971217
      GstPad *srcpad;
Packit 971217
      GstClockTime timestamp;
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      stream = gst_pad_get_element_private (pad);
Packit 971217
      if (!stream) {
Packit 971217
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
        GST_WARNING_OBJECT (pad, "EOS for unknown stream");
Packit 971217
        break;
Packit 971217
      }
Packit 971217
Packit 971217
      GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
Packit 971217
      stream->is_eos = TRUE;
Packit 971217
Packit 971217
      seen_data = stream->seen_data;
Packit 971217
      srcpad = gst_object_ref (stream->srcpad);
Packit 971217
Packit 971217
      if (seen_data && stream->segment.position != -1)
Packit 971217
        timestamp = stream->segment.position;
Packit 971217
      else if (stream->segment.rate < 0.0 || stream->segment.stop == -1)
Packit 971217
        timestamp = stream->segment.start;
Packit 971217
      else
Packit 971217
        timestamp = stream->segment.stop;
Packit 971217
Packit 971217
      stream->segment.position = timestamp;
Packit 971217
Packit 971217
      for (l = self->streams; l; l = l->next) {
Packit 971217
        GstSyncStream *ostream = l->data;
Packit 971217
Packit 971217
        all_eos = all_eos && ostream->is_eos;
Packit 971217
        if (!all_eos)
Packit 971217
          break;
Packit 971217
      }
Packit 971217
Packit 971217
      if (all_eos) {
Packit 971217
        GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
Packit 971217
        self->eos = TRUE;
Packit 971217
        for (l = self->streams; l; l = l->next) {
Packit 971217
          GstSyncStream *ostream = l->data;
Packit 971217
          /* local snapshot of current pads */
Packit 971217
          gst_object_ref (ostream->srcpad);
Packit 971217
          pads = g_slist_prepend (pads, ostream->srcpad);
Packit 971217
        }
Packit 971217
      }
Packit 971217
      if (pads) {
Packit 971217
        GstPad *pad;
Packit 971217
        GSList *epad;
Packit 971217
        GstSyncStream *ostream;
Packit 971217
Packit 971217
        ret = TRUE;
Packit 971217
        epad = pads;
Packit 971217
        while (epad) {
Packit 971217
          pad = epad->data;
Packit 971217
          ostream = gst_pad_get_element_private (pad);
Packit 971217
          if (ostream) {
Packit 971217
            g_cond_broadcast (&ostream->stream_finish_cond);
Packit 971217
          }
Packit 971217
Packit 971217
          gst_object_unref (pad);
Packit 971217
          epad = g_slist_next (epad);
Packit 971217
        }
Packit 971217
        g_slist_free (pads);
Packit 971217
      } else {
Packit 971217
        if (seen_data) {
Packit 971217
          stream->send_gap_event = TRUE;
Packit 971217
          stream->gap_duration = GST_CLOCK_TIME_NONE;
Packit 971217
          stream->wait = TRUE;
Packit 971217
          ret = gst_stream_synchronizer_wait (self, srcpad);
Packit 971217
        }
Packit 971217
      }
Packit 971217
Packit 971217
      /* send eos if haven't seen data. seen_data will be true if data buffer
Packit 971217
       * of the track have received in anytime. sink is ready if seen_data is
Packit 971217
       * true, so can send GAP event. Will send EOS if sink isn't ready. The
Packit 971217
       * scenario for the case is one track haven't any media data and then
Packit 971217
       * send EOS. Or no any valid media data in one track, so decoder can't
Packit 971217
       * get valid CAPS for the track. sink can't ready without received CAPS.*/
Packit 971217
      if (!seen_data || self->eos) {
Packit 971217
        GST_DEBUG_OBJECT (pad, "send EOS event");
Packit 971217
        /* drop lock when sending eos, which may block in e.g. preroll */
Packit 971217
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
        ret = gst_pad_push_event (srcpad, gst_event_new_eos ());
Packit 971217
        GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
        stream = gst_pad_get_element_private (pad);
Packit 971217
        if (stream) {
Packit 971217
          stream->eos_sent = TRUE;
Packit 971217
        }
Packit 971217
      }
Packit 971217
Packit 971217
      gst_object_unref (srcpad);
Packit 971217
      gst_event_unref (event);
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      goto done;
Packit 971217
    }
Packit 971217
    default:
Packit 971217
      break;
Packit 971217
  }
Packit 971217
Packit 971217
  ret = gst_pad_event_default (pad, parent, event);
Packit 971217
Packit 971217
done:
Packit 971217
Packit 971217
  return ret;
Packit 971217
}
Packit 971217
Packit 971217
static GstFlowReturn
Packit 971217
gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
Packit 971217
    GstBuffer * buffer)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
Packit 971217
  GstPad *opad;
Packit 971217
  GstFlowReturn ret = GST_FLOW_ERROR;
Packit 971217
  GstSyncStream *stream;
Packit 971217
  GstClockTime duration = GST_CLOCK_TIME_NONE;
Packit 971217
  GstClockTime timestamp = GST_CLOCK_TIME_NONE;
Packit 971217
  GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;
Packit 971217
Packit 971217
  GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
Packit 971217
      ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
Packit 971217
      " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
Packit 971217
      buffer, gst_buffer_get_size (buffer),
Packit 971217
      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
Packit 971217
      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
Packit 971217
      GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));
Packit 971217
Packit 971217
  timestamp = GST_BUFFER_TIMESTAMP (buffer);
Packit 971217
  duration = GST_BUFFER_DURATION (buffer);
Packit 971217
  if (GST_CLOCK_TIME_IS_VALID (timestamp)
Packit 971217
      && GST_CLOCK_TIME_IS_VALID (duration))
Packit 971217
    timestamp_end = timestamp + duration;
Packit 971217
Packit 971217
  GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
  stream = gst_pad_get_element_private (pad);
Packit 971217
Packit 971217
  if (stream) {
Packit 971217
    stream->seen_data = TRUE;
Packit 971217
    if (stream->segment.format == GST_FORMAT_TIME
Packit 971217
        && GST_CLOCK_TIME_IS_VALID (timestamp)) {
Packit 971217
      GST_LOG_OBJECT (pad,
Packit 971217
          "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
Packit 971217
          GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
Packit 971217
      if (stream->segment.rate > 0.0)
Packit 971217
        stream->segment.position = timestamp;
Packit 971217
      else
Packit 971217
        stream->segment.position = timestamp_end;
Packit 971217
    }
Packit 971217
  }
Packit 971217
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
Packit 971217
  opad = gst_stream_get_other_pad_from_pad (self, pad);
Packit 971217
  if (opad) {
Packit 971217
    ret = gst_pad_push (opad, buffer);
Packit 971217
    gst_object_unref (opad);
Packit 971217
  }
Packit 971217
Packit 971217
  GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
Packit 971217
  if (ret == GST_FLOW_OK) {
Packit 971217
    GList *l;
Packit 971217
Packit 971217
    GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
    stream = gst_pad_get_element_private (pad);
Packit 971217
    if (stream && stream->segment.format == GST_FORMAT_TIME) {
Packit 971217
      GstClockTime position;
Packit 971217
Packit 971217
      if (stream->segment.rate > 0.0)
Packit 971217
        position = timestamp_end;
Packit 971217
      else
Packit 971217
        position = timestamp;
Packit 971217
Packit 971217
      if (GST_CLOCK_TIME_IS_VALID (position)) {
Packit 971217
        GST_LOG_OBJECT (pad,
Packit 971217
            "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
Packit 971217
            GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (position));
Packit 971217
        stream->segment.position = position;
Packit 971217
      }
Packit 971217
    }
Packit 971217
Packit 971217
    /* Advance EOS streams if necessary. For non-EOS
Packit 971217
     * streams the demuxers should already do this! */
Packit 971217
    if (!GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
Packit 971217
        GST_CLOCK_TIME_IS_VALID (timestamp)) {
Packit 971217
      timestamp_end = timestamp + GST_SECOND;
Packit 971217
    }
Packit 971217
Packit 971217
    for (l = self->streams; l; l = l->next) {
Packit 971217
      GstSyncStream *ostream = l->data;
Packit 971217
      gint64 position;
Packit 971217
Packit 971217
      if (!ostream->is_eos || ostream->eos_sent ||
Packit 971217
          ostream->segment.format != GST_FORMAT_TIME)
Packit 971217
        continue;
Packit 971217
Packit 971217
      if (ostream->segment.position != -1)
Packit 971217
        position = ostream->segment.position;
Packit 971217
      else
Packit 971217
        position = ostream->segment.start;
Packit 971217
Packit 971217
      /* Is there a 1 second lag? */
Packit 971217
      if (position != -1 && GST_CLOCK_TIME_IS_VALID (timestamp_end) &&
Packit 971217
          position + GST_SECOND < timestamp_end) {
Packit 971217
        gint64 new_start;
Packit 971217
Packit 971217
        new_start = timestamp_end - GST_SECOND;
Packit 971217
Packit 971217
        GST_DEBUG_OBJECT (ostream->sinkpad,
Packit 971217
            "Advancing stream %u from %" GST_TIME_FORMAT " to %"
Packit 971217
            GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
Packit 971217
            GST_TIME_ARGS (new_start));
Packit 971217
Packit 971217
        ostream->segment.position = new_start;
Packit 971217
Packit 971217
        ostream->send_gap_event = TRUE;
Packit 971217
        ostream->gap_duration = new_start - position;
Packit 971217
        g_cond_broadcast (&ostream->stream_finish_cond);
Packit 971217
      }
Packit 971217
    }
Packit 971217
    GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
  }
Packit 971217
Packit 971217
  return ret;
Packit 971217
}
Packit 971217
Packit 971217
/* GstElement vfuncs */
Packit 971217
static GstPad *
Packit 971217
gst_stream_synchronizer_request_new_pad (GstElement * element,
Packit 971217
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
Packit 971217
  GstSyncStream *stream;
Packit 971217
  gchar *tmp;
Packit 971217
Packit 971217
  GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
  GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
Packit 971217
      self->current_stream_number);
Packit 971217
Packit 971217
  stream = g_slice_new0 (GstSyncStream);
Packit 971217
  stream->transform = self;
Packit 971217
  stream->stream_number = self->current_stream_number;
Packit 971217
  g_cond_init (&stream->stream_finish_cond);
Packit 971217
  stream->stream_start_seqnum = G_MAXUINT32;
Packit 971217
  stream->segment_seqnum = G_MAXUINT32;
Packit 971217
  stream->group_id = G_MAXUINT;
Packit 971217
  stream->seen_data = FALSE;
Packit 971217
  stream->send_gap_event = FALSE;
Packit 971217
Packit 971217
  tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
Packit 971217
  stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
Packit 971217
  g_free (tmp);
Packit 971217
  gst_pad_set_element_private (stream->sinkpad, stream);
Packit 971217
  gst_pad_set_iterate_internal_links_function (stream->sinkpad,
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
Packit 971217
  gst_pad_set_event_function (stream->sinkpad,
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
Packit 971217
  gst_pad_set_chain_function (stream->sinkpad,
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));
Packit 971217
  GST_PAD_SET_PROXY_CAPS (stream->sinkpad);
Packit 971217
  GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
Packit 971217
  GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
Packit 971217
Packit 971217
  tmp = g_strdup_printf ("src_%u", self->current_stream_number);
Packit 971217
  stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
Packit 971217
  g_free (tmp);
Packit 971217
  gst_pad_set_element_private (stream->srcpad, stream);
Packit 971217
  gst_pad_set_iterate_internal_links_function (stream->srcpad,
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
Packit 971217
  gst_pad_set_event_function (stream->srcpad,
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));
Packit 971217
  GST_PAD_SET_PROXY_CAPS (stream->srcpad);
Packit 971217
  GST_PAD_SET_PROXY_ALLOCATION (stream->srcpad);
Packit 971217
  GST_PAD_SET_PROXY_SCHEDULING (stream->srcpad);
Packit 971217
Packit 971217
  gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
Packit 971217
Packit 971217
  self->streams = g_list_prepend (self->streams, stream);
Packit 971217
  self->current_stream_number++;
Packit 971217
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
Packit 971217
  /* Add pads and activate unless we're going to NULL */
Packit 971217
  g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
Packit 971217
  if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
Packit 971217
    gst_pad_set_active (stream->srcpad, TRUE);
Packit 971217
    gst_pad_set_active (stream->sinkpad, TRUE);
Packit 971217
  }
Packit 971217
  gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
Packit 971217
  gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
Packit 971217
  g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
Packit 971217
Packit 971217
  return stream->sinkpad;
Packit 971217
}
Packit 971217
Packit 971217
/* Must be called with lock! */
Packit 971217
static void
Packit 971217
gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
Packit 971217
    GstSyncStream * stream)
Packit 971217
{
Packit 971217
  GList *l;
Packit 971217
Packit 971217
  GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);
Packit 971217
Packit 971217
  for (l = self->streams; l; l = l->next) {
Packit 971217
    if (l->data == stream) {
Packit 971217
      self->streams = g_list_delete_link (self->streams, l);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
  }
Packit 971217
  g_assert (l != NULL);
Packit 971217
  if (self->streams == NULL) {
Packit 971217
    self->have_group_id = TRUE;
Packit 971217
    self->group_id = G_MAXUINT;
Packit 971217
  }
Packit 971217
Packit 971217
  /* we can drop the lock, since stream exists now only local.
Packit 971217
   * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
Packit 971217
   * (due to reverse lock order) when deactivating pads */
Packit 971217
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
Packit 971217
  gst_pad_set_element_private (stream->srcpad, NULL);
Packit 971217
  gst_pad_set_element_private (stream->sinkpad, NULL);
Packit 971217
  gst_pad_set_active (stream->srcpad, FALSE);
Packit 971217
  gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
Packit 971217
  gst_pad_set_active (stream->sinkpad, FALSE);
Packit 971217
  gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
Packit 971217
Packit 971217
  g_cond_clear (&stream->stream_finish_cond);
Packit 971217
  g_slice_free (GstSyncStream, stream);
Packit 971217
Packit 971217
  /* NOTE: In theory we have to check here if all streams
Packit 971217
   * are EOS but the one that was removed wasn't and then
Packit 971217
   * send EOS downstream. But due to the way how playsink
Packit 971217
   * works this is not necessary and will only cause problems
Packit 971217
   * for gapless playback. playsink will only add/remove pads
Packit 971217
   * when it's reconfigured, which happens when the streams
Packit 971217
   * change
Packit 971217
   */
Packit 971217
Packit 971217
  /* lock for good measure, since the caller had it */
Packit 971217
  GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
}
Packit 971217
Packit 971217
static void
Packit 971217
gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
Packit 971217
  GstSyncStream *stream;
Packit 971217
Packit 971217
  GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
  stream = gst_pad_get_element_private (pad);
Packit 971217
  if (stream) {
Packit 971217
    g_assert (stream->sinkpad == pad);
Packit 971217
Packit 971217
    gst_stream_synchronizer_release_stream (self, stream);
Packit 971217
  }
Packit 971217
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
}
Packit 971217
Packit 971217
static GstStateChangeReturn
Packit 971217
gst_stream_synchronizer_change_state (GstElement * element,
Packit 971217
    GstStateChange transition)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
Packit 971217
  GstStateChangeReturn ret;
Packit 971217
Packit 971217
  switch (transition) {
Packit 971217
    case GST_STATE_CHANGE_NULL_TO_READY:
Packit 971217
      GST_DEBUG_OBJECT (self, "State change NULL->READY");
Packit 971217
      self->shutdown = FALSE;
Packit 971217
      break;
Packit 971217
    case GST_STATE_CHANGE_READY_TO_PAUSED:
Packit 971217
      GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
Packit 971217
      self->group_start_time = 0;
Packit 971217
      self->have_group_id = TRUE;
Packit 971217
      self->group_id = G_MAXUINT;
Packit 971217
      self->shutdown = FALSE;
Packit 971217
      self->flushing = FALSE;
Packit 971217
      self->eos = FALSE;
Packit 971217
      break;
Packit 971217
    case GST_STATE_CHANGE_PAUSED_TO_READY:{
Packit 971217
      GList *l;
Packit 971217
Packit 971217
      GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      self->flushing = TRUE;
Packit 971217
      self->shutdown = TRUE;
Packit 971217
      for (l = self->streams; l; l = l->next) {
Packit 971217
        GstSyncStream *ostream = l->data;
Packit 971217
        g_cond_broadcast (&ostream->stream_finish_cond);
Packit 971217
      }
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
    }
Packit 971217
    default:
Packit 971217
      break;
Packit 971217
  }
Packit 971217
Packit 971217
  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
Packit 971217
  GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
Packit 971217
  if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
Packit 971217
    return ret;
Packit 971217
Packit 971217
  switch (transition) {
Packit 971217
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{
Packit 971217
      GList *l;
Packit 971217
Packit 971217
      GST_DEBUG_OBJECT (self, "State change PLAYING->PAUSED");
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      for (l = self->streams; l; l = l->next) {
Packit 971217
        GstSyncStream *stream = l->data;
Packit 971217
        /* send GAP event to sink to finished pre-roll. The reason is function
Packit 971217
         * chain () will be blocked on pad_push (), so can't trigger the track
Packit 971217
         * which reach EOS to send GAP event. */
Packit 971217
        if (stream->is_eos && !stream->eos_sent) {
Packit 971217
          stream->send_gap_event = TRUE;
Packit 971217
          stream->gap_duration = GST_CLOCK_TIME_NONE;
Packit 971217
          g_cond_broadcast (&stream->stream_finish_cond);
Packit 971217
        }
Packit 971217
      }
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    case GST_STATE_CHANGE_PAUSED_TO_READY:{
Packit 971217
      GList *l;
Packit 971217
Packit 971217
      GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
Packit 971217
      self->group_start_time = 0;
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      for (l = self->streams; l; l = l->next) {
Packit 971217
        GstSyncStream *stream = l->data;
Packit 971217
Packit 971217
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
Packit 971217
        stream->gap_duration = GST_CLOCK_TIME_NONE;
Packit 971217
        stream->wait = FALSE;
Packit 971217
        stream->is_eos = FALSE;
Packit 971217
        stream->eos_sent = FALSE;
Packit 971217
        stream->flushing = FALSE;
Packit 971217
        stream->send_gap_event = FALSE;
Packit 971217
      }
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    case GST_STATE_CHANGE_READY_TO_NULL:{
Packit 971217
      GST_DEBUG_OBJECT (self, "State change READY->NULL");
Packit 971217
Packit 971217
      GST_STREAM_SYNCHRONIZER_LOCK (self);
Packit 971217
      self->current_stream_number = 0;
Packit 971217
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
Packit 971217
      break;
Packit 971217
    }
Packit 971217
    default:
Packit 971217
      break;
Packit 971217
  }
Packit 971217
Packit 971217
  return ret;
Packit 971217
}
Packit 971217
Packit 971217
/* GObject vfuncs */
Packit 971217
static void
Packit 971217
gst_stream_synchronizer_finalize (GObject * object)
Packit 971217
{
Packit 971217
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);
Packit 971217
Packit 971217
  g_mutex_clear (&self->lock);
Packit 971217
Packit 971217
  G_OBJECT_CLASS (parent_class)->finalize (object);
Packit 971217
}
Packit 971217
Packit 971217
/* GObject type initialization */
Packit 971217
static void
Packit 971217
gst_stream_synchronizer_init (GstStreamSynchronizer * self)
Packit 971217
{
Packit 971217
  g_mutex_init (&self->lock);
Packit 971217
}
Packit 971217
Packit 971217
static void
Packit 971217
gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
Packit 971217
{
Packit 971217
  GObjectClass *gobject_class = (GObjectClass *) klass;
Packit 971217
  GstElementClass *element_class = (GstElementClass *) klass;
Packit 971217
Packit 971217
  gobject_class->finalize = gst_stream_synchronizer_finalize;
Packit 971217
Packit 971217
  gst_element_class_add_static_pad_template (element_class, &srctemplate);
Packit 971217
  gst_element_class_add_static_pad_template (element_class, &sinktemplate);
Packit 971217
Packit 971217
  gst_element_class_set_static_metadata (element_class,
Packit 971217
      "Stream Synchronizer", "Generic",
Packit 971217
      "Synchronizes a group of streams to have equal durations and starting points",
Packit 971217
      "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
Packit 971217
Packit 971217
  element_class->change_state =
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
Packit 971217
  element_class->request_new_pad =
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
Packit 971217
  element_class->release_pad =
Packit 971217
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
Packit 971217
}
Packit 971217
Packit 971217
gboolean
Packit 971217
gst_stream_synchronizer_plugin_init (GstPlugin * plugin)
Packit 971217
{
Packit 971217
  GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
Packit 971217
      "streamsynchronizer", 0, "Stream Synchronizer");
Packit 971217
Packit 971217
  return gst_element_register (plugin, "streamsynchronizer", GST_RANK_NONE,
Packit 971217
      GST_TYPE_STREAM_SYNCHRONIZER);
Packit 971217
}