Blob Blame History Raw
/*
 * Farstream - Farstream RTP Packet modder
 *
 * Copyright 2010 Collabora Ltd.
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 * Copyright 2010 Nokia Corp.
 *
 * fs-rtp-packet-modder.c - Filter to modify RTP packets
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */


#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "fs-rtp-packet-modder.h"

GST_DEBUG_CATEGORY_STATIC (fs_rtp_packet_modder_debug);
#define GST_CAT_DEFAULT fs_rtp_packet_modder_debug

static GstStaticPadTemplate fs_rtp_packet_modder_sink_template =
    GST_STATIC_PAD_TEMPLATE ("sink",
        GST_PAD_SINK,
        GST_PAD_ALWAYS,
        GST_STATIC_CAPS ("application/x-rtp"));

static GstStaticPadTemplate fs_rtp_packet_modder_src_template =
    GST_STATIC_PAD_TEMPLATE ("src",
        GST_PAD_SRC,
        GST_PAD_ALWAYS,
        GST_STATIC_CAPS ("application/x-rtp"));

G_DEFINE_TYPE (FsRtpPacketModder, fs_rtp_packet_modder, GST_TYPE_ELEMENT);

static GstFlowReturn fs_rtp_packet_modder_chain (GstPad *pad,
    GstObject *parent, GstBuffer *buffer);
static GstCaps *fs_rtp_packet_modder_getcaps (FsRtpPacketModder *self,
    GstPad *pad, GstCaps *filter);
static gboolean fs_rtp_packet_modder_sink_event (GstPad *pad,
    GstObject *parent,
    GstEvent *event);
static gboolean fs_rtp_packet_modder_query (GstPad *pad,
    GstObject *parent,
    GstQuery *query);
static GstStateChangeReturn fs_rtp_packet_modder_change_state (
  GstElement *element, GstStateChange transition);


static void
fs_rtp_packet_modder_class_init (FsRtpPacketModderClass *klass)
{
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);

  GST_DEBUG_CATEGORY_INIT
      (fs_rtp_packet_modder_debug, "fsrtppacketmodder", 0,
          "fsrtppacketmodder element");

  gst_element_class_set_details_simple (gstelement_class,
      "Farstream RTP Packet modder",
      "Generic",
      "Filter that can modify RTP packets",
      "Olivier Crete <olivier.crete@collabora.co.uk>");

  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&fs_rtp_packet_modder_sink_template));
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&fs_rtp_packet_modder_src_template));

  gstelement_class->change_state = fs_rtp_packet_modder_change_state;
}

static void
fs_rtp_packet_modder_init (FsRtpPacketModder *self)
{
  gst_segment_init (&self->segment, GST_FORMAT_TIME);

  self->sinkpad = gst_pad_new_from_static_template (
    &fs_rtp_packet_modder_sink_template, "sink");
  gst_pad_set_chain_function (self->sinkpad, fs_rtp_packet_modder_chain);
  gst_pad_set_query_function (self->sinkpad, fs_rtp_packet_modder_query);
  gst_pad_set_event_function (self->sinkpad, fs_rtp_packet_modder_sink_event);
  GST_PAD_SET_PROXY_CAPS (self->sinkpad);
  gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);

  self->srcpad = gst_pad_new_from_static_template (
    &fs_rtp_packet_modder_src_template, "src");
  gst_pad_set_query_function (self->srcpad, fs_rtp_packet_modder_query);
  gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
}

FsRtpPacketModder *
fs_rtp_packet_modder_new (FsRtpPacketModderFunc modder_func,
    FsRtpPacketModderSyncTimeFunc sync_func,
    gpointer user_data)
{
  FsRtpPacketModder *self;

  g_return_val_if_fail (modder_func != NULL, NULL);
  g_return_val_if_fail (sync_func != NULL, NULL);

  self = g_object_new (FS_TYPE_RTP_PACKET_MODDER, NULL);

  self->modder_func = modder_func;
  self->sync_func = sync_func;
  self->user_data = user_data;

  return self;
}

static void
fs_rtp_packet_modder_sync_to_clock (FsRtpPacketModder *self,
  GstClockTime buffer_ts)
{
  GstClockTime running_time;
  GstClockTime sync_time;
  GstClockID id;
  GstClock *clock;
  GstClockReturn clockret;

  GST_OBJECT_LOCK (self);
  running_time = gst_segment_to_running_time (&self->segment, GST_FORMAT_TIME,
     buffer_ts);

  do {
    sync_time = running_time + GST_ELEMENT_CAST (self)->base_time +
        self->peer_latency;

    clock = GST_ELEMENT_CLOCK (self);
    if (!clock) {
      GST_OBJECT_UNLOCK (self);
      /* let's just push if there is no clock */
      GST_LOG_OBJECT (self, "No clock, push right away");
      return;
    }

    GST_LOG_OBJECT (self, "sync to running timestamp %" GST_TIME_FORMAT,
        GST_TIME_ARGS (running_time));

    id = self->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
    self->unscheduled = FALSE;
    GST_OBJECT_UNLOCK (self);

    clockret = gst_clock_id_wait (id, NULL);

    GST_OBJECT_LOCK (self);
    gst_clock_id_unref (id);
    self->clock_id = NULL;

  } while  (clockret == GST_CLOCK_UNSCHEDULED && !self->unscheduled);
  GST_OBJECT_UNLOCK (self);
}

static GstFlowReturn
fs_rtp_packet_modder_chain (GstPad *pad, GstObject *parent, GstBuffer *buffer)
{
  FsRtpPacketModder *self = FS_RTP_PACKET_MODDER (parent);
  GstFlowReturn ret = GST_FLOW_ERROR;
  GstClockTime buffer_ts = GST_BUFFER_TIMESTAMP (buffer);

  if (GST_CLOCK_TIME_IS_VALID (buffer_ts))
    buffer_ts = self->sync_func (self, buffer, self->user_data);

  if (GST_CLOCK_TIME_IS_VALID (buffer_ts))
    fs_rtp_packet_modder_sync_to_clock (self, buffer_ts);

  buffer = self->modder_func (self, buffer, buffer_ts, self->user_data);

  if (!buffer)
  {
    GST_LOG_OBJECT (self, "Got NULL from FsRtpPacketModderFunc");
    goto invalid;
  }

  ret = gst_pad_push (self->srcpad, buffer);

invalid:

  return ret;
}


static GstCaps *
fs_rtp_packet_modder_getcaps (FsRtpPacketModder *self, GstPad *pad,
  GstCaps *filter)
{
  GstPad *peer;
  GstCaps *caps;
  GstPad *otherpad = self->sinkpad == pad ? self->srcpad : self->sinkpad;

  peer = gst_pad_get_peer (otherpad);

  if (peer)
  {
    GstCaps *peercaps;
    peercaps = gst_pad_query_caps (peer, filter);
    caps = gst_caps_intersect (peercaps, gst_pad_get_pad_template_caps (pad));
    gst_caps_unref (peercaps);
    gst_object_unref (peer);
  }
  else
  {
    caps = gst_caps_intersect (gst_pad_get_pad_template_caps (pad), filter);
  }

  return caps;
}

static gboolean
fs_rtp_packet_modder_sink_event (GstPad *pad, GstObject *parent,
    GstEvent *event)
{
  FsRtpPacketModder *self = FS_RTP_PACKET_MODDER (parent);

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_SEGMENT:
    {
      gst_event_copy_segment (event, &self->segment);

      if (self->segment.format != GST_FORMAT_TIME)
        goto newseg_wrong_format;
      break;
    }
    case GST_EVENT_FLUSH_START:
      GST_OBJECT_LOCK (self);
      if (self->clock_id)
      {
        gst_clock_id_unschedule (self->clock_id);
        self->unscheduled = TRUE;
      }
      GST_OBJECT_UNLOCK (self);
      break;
    default:
      break;
  }

  return gst_pad_push_event (self->srcpad, event);

newseg_wrong_format:

  GST_DEBUG_OBJECT (self, "received non TIME segment");
  gst_event_unref (event);
  return FALSE;
}

static GstStateChangeReturn
fs_rtp_packet_modder_change_state (GstElement *element,
    GstStateChange transition)
{
  FsRtpPacketModder *self;
  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;

  self = FS_RTP_PACKET_MODDER (element);

  switch (transition) {
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      GST_OBJECT_LOCK (self);
      /* reset negotiated values */
      self->peer_latency = 0;
      GST_OBJECT_UNLOCK (self);
      break;
    default:
      break;
  }

  ret = GST_ELEMENT_CLASS (fs_rtp_packet_modder_parent_class)->change_state (
    element, transition);

  switch (transition) {
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      /* we are a live element because we sync to the clock, which we can only
       * do in the PLAYING state */
      if (ret != GST_STATE_CHANGE_FAILURE)
        ret = GST_STATE_CHANGE_NO_PREROLL;
      break;
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
      GST_OBJECT_LOCK (self);
      if (self->clock_id)
      {
        gst_clock_id_unschedule (self->clock_id);
        self->unscheduled = TRUE;
      }
      GST_OBJECT_UNLOCK (self);
      break;
   default:
      break;
  }

  return ret;
}


static gboolean
fs_rtp_packet_modder_query (GstPad *pad, GstObject *parent, GstQuery *query)
{
  FsRtpPacketModder *self = FS_RTP_PACKET_MODDER (parent);
  gboolean res = FALSE;

  switch (GST_QUERY_TYPE (query)) {
    case GST_QUERY_CAPS:
    {
      GstCaps *caps, *filter;

      gst_query_parse_caps (query, &filter);
      caps = fs_rtp_packet_modder_getcaps (self, pad, filter);
      gst_query_set_caps_result (query, caps);
      gst_caps_unref (caps);
      res = TRUE;
      break;
    }
    case GST_QUERY_LATENCY:
    {
      /* We need to send the query upstream and add the returned latency to our
       * own */
      GstClockTime min_latency, max_latency;
      gboolean us_live;

      if ((res = gst_pad_peer_query (self->sinkpad, query))) {
        gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);

        GST_DEBUG_OBJECT (self, "Peer latency: min %"
            GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
            GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));

        /* store this so that we can safely sync on the peer buffers. */
        GST_OBJECT_LOCK (self);
        self->peer_latency = min_latency;
        if (self->clock_id)
          gst_clock_id_unschedule (self->clock_id);
        GST_OBJECT_UNLOCK (self);

        /* we add some latency but can buffer an infinite amount of time */

        GST_DEBUG_OBJECT (self, "Calculated total latency : min %"
            GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
            GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));

        gst_query_set_latency (query, TRUE, min_latency, max_latency);
      }
      break;
    }
    default:
      res = gst_pad_query_default (pad, parent, query);
      break;
  }

  return res;
}