Blob Blame History Raw
/*
 * Farstream - Farstream RTP TFRC support
 *
 * Copyright 2010 Collabora Ltd.
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 * Copyright 2010 Nokia Corp.
 *
 * fs-rtp-tfrc.c - Rate control for Farstream RTP sessions
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "fs-rtp-tfrc.h"

#include <string.h>

#include "fs-rtp-packet-modder.h"
#include "farstream/fs-rtp.h"
#include "fs-rtp-codec-negotiation.h"

#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>

#define ONE_32BIT_CYCLE ((guint64) (((guint64)0xffffffff) + ((guint64)1)))


GST_DEBUG_CATEGORY_STATIC (fsrtpconference_tfrc);
#define GST_CAT_DEFAULT fsrtpconference_tfrc

G_DEFINE_TYPE (FsRtpTfrc, fs_rtp_tfrc, GST_TYPE_OBJECT);

/* props */
enum
{
  PROP_0,
  PROP_BITRATE,
  PROP_SENDING
};

static void fs_rtp_tfrc_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec);
static void fs_rtp_tfrc_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec);
static void fs_rtp_tfrc_dispose (GObject *object);

static void fs_rtp_tfrc_update_sender_timer_locked (
  FsRtpTfrc *self,
  struct TrackedSource *src,
  guint64 now);

static gboolean feedback_timer_expired (GstClock *clock, GstClockTime time,
    GstClockID id, gpointer user_data);

static void fs_rtp_tfrc_clear_sender (FsRtpTfrc *self);

static void
fs_rtp_tfrc_class_init (FsRtpTfrcClass *klass)
{
  GObjectClass *gobject_class;

  gobject_class = (GObjectClass *) klass;

  gobject_class->get_property = fs_rtp_tfrc_get_property;
  gobject_class->set_property = fs_rtp_tfrc_set_property;
  gobject_class->dispose = fs_rtp_tfrc_dispose;

  g_object_class_install_property (gobject_class,
      PROP_BITRATE,
      g_param_spec_uint ("bitrate",
          "The bitrate at which data should be sent",
          "The bitrate that the session should try to send at in bits/sec",
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

 g_object_class_install_property (gobject_class,
      PROP_SENDING,
      g_param_spec_boolean ("sending",
          "The bitrate at which data should be sent",
          "The bitrate that the session should try to send at in bits/sec",
          FALSE, G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
}


static struct TrackedSource *
tracked_src_new (FsRtpTfrc *self)
{
  struct TrackedSource *src;

  src = g_slice_new0 (struct TrackedSource);
  src->self = self;
  src->next_feedback_timer = G_MAXUINT64;

  return src;
}

static void
tracked_src_free (struct TrackedSource *src)
{
  if (src->sender_id)
  {
    gst_clock_id_unschedule (src->sender_id);
    gst_clock_id_unref (src->sender_id);
  }

  if (src->receiver_id)
  {
    gst_clock_id_unschedule (src->receiver_id);
    gst_clock_id_unref (src->receiver_id);
  }

  if (src->rtpsource)
    g_object_unref (src->rtpsource);

  if (src->sender)
    tfrc_sender_free (src->sender);
  if (src->receiver)
    tfrc_receiver_free (src->receiver);

  if (src->idl)
    tfrc_is_data_limited_free (src->idl);

  g_slice_free (struct TrackedSource, src);
}

static void
fs_rtp_tfrc_init (FsRtpTfrc *self)
{
  GST_DEBUG_CATEGORY_INIT (fsrtpconference_tfrc,
      "fsrtpconference_tfrc", 0,
      "Farstream RTP Conference Element Rate Control logic");

  /* member init */

  self->tfrc_sources = g_hash_table_new_full (g_direct_hash,
      g_direct_equal, NULL, (GDestroyNotify) tracked_src_free);

  fs_rtp_tfrc_clear_sender (self);
  self->send_bitrate = tfrc_sender_get_send_rate (NULL)  * 8;

  self->extension_type = EXTENSION_NONE;
  self->extension_id = 0;
  memset (self->pts, 0, 128);

  self->systemclock = gst_system_clock_obtain ();
}

void
fs_rtp_tfrc_destroy (FsRtpTfrc *self)
{
  GST_OBJECT_LOCK (self);

  if (self->modder_check_probe_id)
    gst_pad_remove_probe (self->in_rtp_pad, self->modder_check_probe_id);
  self->modder_check_probe_id = 0;

  if (self->in_rtp_probe_id)
    gst_pad_remove_probe (self->in_rtp_pad, self->in_rtp_probe_id);
  self->in_rtp_probe_id = 0;
  if (self->in_rtcp_probe_id)
    gst_pad_remove_probe (self->in_rtcp_pad, self->in_rtcp_probe_id);
  self->in_rtcp_probe_id = 0;


  if (self->on_ssrc_validated_id)
    g_signal_handler_disconnect (self->rtpsession, self->on_ssrc_validated_id);
  self->on_ssrc_validated_id = 0;
  if (self->on_sending_rtcp_id)
    g_signal_handler_disconnect (self->rtpsession, self->on_sending_rtcp_id);
  self->on_sending_rtcp_id = 0;

  g_hash_table_destroy (g_hash_table_ref (self->tfrc_sources));

  self->fsrtpsession = NULL;

  GST_OBJECT_UNLOCK (self);
}

static void
fs_rtp_tfrc_dispose (GObject *object)
{
  FsRtpTfrc *self = FS_RTP_TFRC (object);

  GST_OBJECT_LOCK (self);

  if (self->tfrc_sources)
    g_hash_table_destroy (self->tfrc_sources);
  self->tfrc_sources = NULL;
  self->last_src = NULL;

  if (self->initial_src)
    tracked_src_free (self->initial_src);
  self->initial_src = NULL;

  if (self->packet_modder)
  {
    gst_bin_remove (self->parent_bin, self->packet_modder);
    gst_element_set_state (self->packet_modder, GST_STATE_NULL);
    g_object_unref (self->packet_modder);
  }

  if (self->rtpsession)
      g_object_unref (self->rtpsession);
  if (self->in_rtp_pad)
    g_object_unref (self->in_rtp_pad);
  if (self->in_rtcp_pad)
    g_object_unref (self->in_rtcp_pad);
  if (self->out_rtp_pad)
    g_object_unref (self->out_rtp_pad);

  if (self->parent_bin)
    gst_object_unref (self->parent_bin);

  gst_object_unref (self->systemclock);
  self->systemclock = NULL;

  GST_OBJECT_UNLOCK (self);

  if (G_OBJECT_CLASS (fs_rtp_tfrc_parent_class)->dispose)
    G_OBJECT_CLASS (fs_rtp_tfrc_parent_class)->dispose (object);
}


static void
fs_rtp_tfrc_get_property (GObject *object,
    guint prop_id,
    GValue *value,
    GParamSpec *pspec)
{
  FsRtpTfrc *self = FS_RTP_TFRC (object);

  switch (prop_id)
  {
    case PROP_BITRATE:
      GST_OBJECT_LOCK (self);
      g_value_set_uint (value, self->send_bitrate);
      GST_OBJECT_UNLOCK (self);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

gboolean
clear_sender (gpointer key, gpointer value, gpointer user_data)
{
  FsRtpTfrc *self = FS_RTP_TFRC (user_data);
  struct TrackedSource *src = value;

  src->send_ts_base = 0;
  src->send_ts_cycles = 0;
  src->fb_last_ts = 0;
  src->fb_ts_cycles = 0;

  if (src->sender_id)
  {
    gst_clock_id_unschedule (src->sender_id);
    gst_clock_id_unref (src->sender_id);
    src->sender_id = 0;
  }

  if (src->sender)
    tfrc_sender_free (src->sender);
  src->sender = NULL;

  if (src->idl)
  {
    tfrc_is_data_limited_free (src->idl);
    src->idl = NULL;
  }

  if (self->last_src == src)
    self->last_src = NULL;

  if (src->receiver)
    return FALSE;
  else
    return TRUE;
}

static void
fs_rtp_tfrc_clear_sender (FsRtpTfrc *self)
{
  g_hash_table_foreach_remove (self->tfrc_sources, clear_sender, self);
  if (self->initial_src)
    if (clear_sender (NULL, self->initial_src, self))
      self->initial_src = NULL;

  self->last_sent_ts = GST_CLOCK_TIME_NONE;
  self->byte_reservoir = 1500; /* About one packet */
}

static void
fs_rtp_tfrc_set_property (GObject *object,
    guint prop_id,
    const GValue *value,
    GParamSpec *pspec)
{
  FsRtpTfrc *self = FS_RTP_TFRC (object);

  switch (prop_id)
  {
    case PROP_SENDING:
      GST_OBJECT_LOCK (self);
      self->sending = g_value_get_boolean (value);
      if (!self->sending)
        fs_rtp_tfrc_clear_sender (self);
      GST_OBJECT_UNLOCK (self);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}


static gboolean
fs_rtp_tfrc_update_bitrate_locked (FsRtpTfrc *self, const gchar *source)
{
  guint byterate;
  guint new_bitrate;
  gboolean ret;

  if (self->last_src && self->last_src->sender)
    byterate = tfrc_sender_get_send_rate (self->last_src->sender);
  else
    byterate = tfrc_sender_get_send_rate (NULL);

  if (G_LIKELY (byterate < G_MAXUINT / 8))
    new_bitrate = byterate * 8;
  else
    new_bitrate = G_MAXUINT;

  ret = self->send_bitrate != new_bitrate;

  if (ret)
    GST_DEBUG_OBJECT (self, "Send rate changed (%s): %u -> %u", source,
        self->send_bitrate, new_bitrate);

  self->send_bitrate = new_bitrate;

  return ret;
}

static guint64
fs_rtp_tfrc_get_now (FsRtpTfrc *self)
{
  return GST_TIME_AS_USECONDS (gst_clock_get_time (self->systemclock));
}


static struct TrackedSource *
fs_rtp_tfrc_get_remote_ssrc_locked (FsRtpTfrc *self, guint ssrc,
  GObject *rtpsource)
{
  struct TrackedSource *src;

  src = g_hash_table_lookup (self->tfrc_sources, GUINT_TO_POINTER (ssrc));

  if (G_LIKELY (src))
  {
    if (G_UNLIKELY (rtpsource && !src->rtpsource))
      src->rtpsource = g_object_ref (rtpsource);

    return src;
  }

  if (self->initial_src)
  {
    src = self->initial_src;
    self->initial_src = NULL;
    src->ssrc = ssrc;
    if (rtpsource && !src->rtpsource)
      src->rtpsource = g_object_ref (rtpsource);
    g_hash_table_insert (self->tfrc_sources, GUINT_TO_POINTER (ssrc), src);
    return src;
  }

  src = tracked_src_new (self);
  src->ssrc = ssrc;
  if (rtpsource)
    src->rtpsource = g_object_ref (rtpsource);

  if (!self->last_src)
    self->last_src = src;

  g_hash_table_insert (self->tfrc_sources, GUINT_TO_POINTER (ssrc), src);

  return src;
}

static void
rtpsession_on_ssrc_validated (GObject *rtpsession, GObject *rtpsource,
    FsRtpTfrc *self)
{
  guint32 ssrc;

  g_object_get (rtpsource, "ssrc", &ssrc, NULL);

  GST_DEBUG_OBJECT (self, "ssrc validate: %X", ssrc);

  GST_OBJECT_LOCK (self);
  fs_rtp_tfrc_get_remote_ssrc_locked (self, ssrc, rtpsource);
  GST_OBJECT_UNLOCK (self);
}

struct TimerData
{
  FsRtpTfrc *self;
  guint ssrc;
};

static struct TimerData *
build_timer_data (FsRtpTfrc *self, guint ssrc)
{
  struct TimerData *td = g_slice_new0 (struct TimerData);

  td->self = g_object_ref (self);
  td->ssrc = ssrc;

  return td;
}

static void
free_timer_data (gpointer data)
{
  struct TimerData *td = data;
  g_object_unref (td->self);
  g_slice_free (struct TimerData, td);
}

static void
fs_rtp_tfrc_set_receiver_timer_locked (FsRtpTfrc *self,
    struct TrackedSource *src, guint64 now)
{
  guint64 expiry = tfrc_receiver_get_feedback_timer_expiry (src->receiver);
  GstClockReturn cret;

  if (expiry == 0)
    return;

  if (src->receiver_id)
  {
    if (src->next_feedback_timer <= expiry)
      return;

    gst_clock_id_unschedule (src->receiver_id);
    gst_clock_id_unref (src->receiver_id);
    src->receiver_id = NULL;
  }
  src->next_feedback_timer = expiry;

  g_assert (expiry != now);

  src->receiver_id = gst_clock_new_single_shot_id (self->systemclock,
      expiry * GST_USECOND);

  cret = gst_clock_id_wait_async (src->receiver_id, feedback_timer_expired,
      build_timer_data (self, src->ssrc), free_timer_data);
  if (cret != GST_CLOCK_OK)
    GST_ERROR_OBJECT (self,
        "Could not schedule feedback time for %" G_GUINT64_FORMAT
        " (now %" G_GUINT64_FORMAT ") error: %d", expiry, now, cret);
}

static void
fs_rtp_tfrc_receiver_timer_func_locked (FsRtpTfrc *self,
    struct TrackedSource *src, guint64 now)
{
  guint64 expiry;

  if (src->receiver_id)
  {
    gst_clock_id_unschedule (src->receiver_id);
    gst_clock_id_unref (src->receiver_id);
    src->receiver_id = NULL;
  }

  expiry = tfrc_receiver_get_feedback_timer_expiry (src->receiver);

  if (expiry <= now &&
      tfrc_receiver_feedback_timer_expired (src->receiver, now))
  {
    src->send_feedback = TRUE;
    g_signal_emit_by_name (self->rtpsession, "send-rtcp", (guint64) 0);
  }
  else
  {
    fs_rtp_tfrc_set_receiver_timer_locked (self, src, now);
  }
}

static gboolean
feedback_timer_expired (GstClock *clock, GstClockTime time, GstClockID id,
  gpointer user_data)
{
  struct TimerData *td = user_data;
  struct TrackedSource *src;
  guint64 now;

  if (time == GST_CLOCK_TIME_NONE)
    return FALSE;

  GST_OBJECT_LOCK (td->self);

  src = g_hash_table_lookup (td->self->tfrc_sources,
      GUINT_TO_POINTER (td->ssrc));

  now = fs_rtp_tfrc_get_now (td->self);

  if (G_LIKELY (src && src->receiver_id == id))
    fs_rtp_tfrc_receiver_timer_func_locked (td->self, src, now);

  GST_OBJECT_UNLOCK (td->self);

  return FALSE;
}


struct SendingRtcpData {
  FsRtpTfrc *self;
  GstRTCPBuffer rtcpbuffer;
  gboolean ret;
  guint32 ssrc;
  gboolean have_ssrc;
};

static void
tfrc_sources_process (gpointer key, gpointer value, gpointer user_data)
{
  struct SendingRtcpData *data = user_data;
  struct TrackedSource *src = value;
  GstRTCPPacket packet;
  guint8 *pdata;
  guint64 now;
  gdouble loss_event_rate;
  guint receive_rate;

  if (!src->receiver)
    return;

  if (src->got_nohdr_pkt)
    return;

  now = fs_rtp_tfrc_get_now (data->self);

  if (!src->send_feedback)
    goto done;

  if (!gst_rtcp_buffer_add_packet (&data->rtcpbuffer, GST_RTCP_TYPE_RTPFB,
          &packet))
    goto done;

  if (!gst_rtcp_packet_fb_set_fci_length (&packet, 4))
  {
    gst_rtcp_packet_remove (&packet);
    goto done;
  }

  if (!tfrc_receiver_send_feedback (src->receiver, now, &loss_event_rate,
          &receive_rate))
  {
    gst_rtcp_packet_remove (&packet);
    goto done;
  }

  if (!data->have_ssrc)
    g_object_get (data->self->rtpsession, "internal-ssrc", &data->ssrc, NULL);
  data->have_ssrc = TRUE;

  /* draft-ietf-avt-tfrc-profile-10 defines the type as 2 */
  gst_rtcp_packet_fb_set_type (&packet, 2);
  gst_rtcp_packet_fb_set_sender_ssrc (&packet, data->ssrc);
  gst_rtcp_packet_fb_set_media_ssrc (&packet, src->ssrc);
  pdata = gst_rtcp_packet_fb_get_fci (&packet);

  GST_WRITE_UINT32_BE (pdata, src->last_ts);
  GST_WRITE_UINT32_BE (pdata + 4, now - src->last_now);
  GST_WRITE_UINT32_BE (pdata + 8, receive_rate);
  GST_WRITE_UINT32_BE (pdata + 12, loss_event_rate * G_MAXUINT);

  GST_LOG_OBJECT (data->self, "Sending RTCP report last_ts: %d delay: %"
      G_GINT64_FORMAT", x_recv: %d, rate: %f",
      src->last_ts, now - src->last_now, receive_rate, loss_event_rate);

  src->send_feedback = FALSE;

  data->ret = TRUE;

done:
  fs_rtp_tfrc_set_receiver_timer_locked (data->self, src, now);
}

static gboolean
rtpsession_sending_rtcp (GObject *rtpsession, GstBuffer *buffer,
    gboolean is_early, FsRtpTfrc *self)
{
  struct SendingRtcpData data = {NULL, GST_RTCP_BUFFER_INIT};

  gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &data.rtcpbuffer);

  data.self = self;
  data.ret = FALSE;
  data.have_ssrc = FALSE;


  GST_OBJECT_LOCK (self);
  g_hash_table_foreach (self->tfrc_sources, tfrc_sources_process, &data);
  GST_OBJECT_UNLOCK (self);

  gst_rtcp_buffer_unmap (&data.rtcpbuffer);

  /* Return TRUE if something was added */
  return data.ret;
}

static GstPadProbeReturn
incoming_rtp_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  FsRtpTfrc *self = FS_RTP_TFRC (user_data);
  GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
  guint32 ssrc;
  guint8 *data;
  guint size;
  gboolean got_header = FALSE;
  struct TrackedSource *src = NULL;
  guint32 rtt, seq;
  gint64 ts_delta;
  guint64 ts;
  gboolean send_rtcp = FALSE;
  guint64 now;
  guint8 pt;
  gint seq_delta;
  GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;

  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
    return GST_PAD_PROBE_OK;

  GST_OBJECT_LOCK (self);

  if (!self->fsrtpsession)
    goto out_no_header_unmap;

  ssrc = gst_rtp_buffer_get_ssrc (&rtpbuffer);
  pt = gst_rtp_buffer_get_payload_type (&rtpbuffer);
  seq = gst_rtp_buffer_get_seq (&rtpbuffer);

  if (pt >= 128 || !self->pts[pt])
    goto out_no_header_unmap;

  if (self->extension_type == EXTENSION_NONE)
    goto out_no_header_unmap;
  else if (self->extension_type == EXTENSION_ONE_BYTE)
    got_header = gst_rtp_buffer_get_extension_onebyte_header (&rtpbuffer,
        self->extension_id, 0, (gpointer *) &data, &size);
  else if (self->extension_type == EXTENSION_TWO_BYTES)
    got_header = gst_rtp_buffer_get_extension_twobytes_header (&rtpbuffer,
        NULL, self->extension_id, 0, (gpointer *) &data, &size);

  gst_rtp_buffer_unmap (&rtpbuffer);

  src = fs_rtp_tfrc_get_remote_ssrc_locked (self, ssrc, NULL);

  if (src->rtpsource == NULL)
  {
    GST_WARNING_OBJECT (self, "Got packet from unconfirmed source %X ?", ssrc);
    goto out;
  }

  if (!got_header || size != 7)
    goto out_no_header;

  src->got_nohdr_pkt = FALSE;

  now =  fs_rtp_tfrc_get_now (self);

  rtt = GST_READ_UINT24_BE (data);
  ts = GST_READ_UINT32_BE (data + 3);


  if (!src->receiver)
  {
    src->receiver = tfrc_receiver_new (now);
  }
  else if (rtt == 0 && src->last_rtt != 0)
  {
    /* Detect sender reset */
    src->seq_cycles = 0;
    src->last_seq = 0;
    src->ts_cycles = 0;
    src->last_now = 0;
    src->last_rtt = 0;
    tfrc_receiver_free (src->receiver);
    src->receiver = tfrc_receiver_new (now);
    if (src->receiver_id)
    {
      gst_clock_id_unschedule (src->receiver_id);
      gst_clock_id_unref (src->receiver_id);
      src->receiver_id = NULL;
    }
  }
  seq_delta = seq - src->last_seq;

  if (seq < src->last_seq && seq_delta < -3000)
    src->seq_cycles += 1 << 16;
  src->last_seq = seq;
  seq += src->seq_cycles;

  ts_delta = ts - src->last_ts;
  /* We declare there has been a cycle if the difference is more than
   * 5 minutes
   */
  if (ts < src->last_ts && ts_delta < -(5 * 60 * 1000 * 1000))
    src->ts_cycles += ONE_32BIT_CYCLE;
  src->last_ts = ts;
  ts += src->ts_cycles;

  send_rtcp = tfrc_receiver_got_packet (src->receiver, ts, now, seq, rtt,
      gst_rtp_buffer_get_packet_len (&rtpbuffer));

  GST_LOG_OBJECT (self, "Got RTP packet");

  if (rtt && src->last_rtt == 0)
    fs_rtp_tfrc_receiver_timer_func_locked (self, src, now);

  src->last_now = now;
  src->last_rtt = rtt;

out:

  if (send_rtcp)
  {
    src->send_feedback = TRUE;
    GST_OBJECT_UNLOCK (self);
    g_signal_emit_by_name (src->self->rtpsession, "send-rtcp", (guint64) 0);
  }
  else
  {
    GST_OBJECT_UNLOCK (self);
  }

  return GST_PAD_PROBE_OK;

out_no_header_unmap:

  gst_rtp_buffer_unmap (&rtpbuffer);

out_no_header:
  if (src)
    src->got_nohdr_pkt = TRUE;
  goto out;
}

static gboolean
no_feedback_timer_expired (GstClock *clock, GstClockTime time, GstClockID id,
  gpointer user_data)
{
  struct TimerData *td = user_data;
  struct TrackedSource *src;
  guint64 now;
  gboolean notify = FALSE;

  if (time == GST_CLOCK_TIME_NONE)
    return FALSE;

  GST_OBJECT_LOCK (td->self);

  if (!td->self->sending)
    goto out;

  src = g_hash_table_lookup (td->self->tfrc_sources,
      GUINT_TO_POINTER (td->ssrc));

  if (!src)
    goto out;

  if (src->sender_id != id)
    goto out;

  now = fs_rtp_tfrc_get_now (td->self);

  fs_rtp_tfrc_update_sender_timer_locked (td->self, src, now);

  if (fs_rtp_tfrc_update_bitrate_locked (td->self, "tm"))
    notify = TRUE;

out:

  GST_OBJECT_UNLOCK (td->self);

  if (notify)
    g_object_notify (G_OBJECT (td->self), "bitrate");

  return FALSE;
}

static void
fs_rtp_tfrc_update_sender_timer_locked (FsRtpTfrc *self,
    struct TrackedSource *src, guint64 now)
{
  guint64 expiry;
  GstClockReturn cret;

  if (src->sender_id)
  {
    gst_clock_id_unschedule (src->sender_id);
    gst_clock_id_unref (src->sender_id);
    src->sender_id = NULL;
  }

  if (src->sender == NULL)
    return;

  expiry = tfrc_sender_get_no_feedback_timer_expiry (src->sender);

  if (expiry <= now)
  {
    tfrc_sender_no_feedback_timer_expired (src->sender, now);
    expiry = tfrc_sender_get_no_feedback_timer_expiry (src->sender);
  }

  src->sender_id = gst_clock_new_single_shot_id (self->systemclock,
      expiry * GST_USECOND);

  cret = gst_clock_id_wait_async (src->sender_id,
      no_feedback_timer_expired, build_timer_data (self, src->ssrc),
      free_timer_data);
  if (cret != GST_CLOCK_OK)
    GST_ERROR_OBJECT (self,
        "Could not schedule feedback time for %" G_GUINT64_FORMAT
        " (now %" G_GUINT64_FORMAT ") error: %d",
        expiry, now, cret);
}

static void
tracked_src_add_sender (struct TrackedSource *src, guint64 now,
  guint initial_rate)
{
  src->sender = tfrc_sender_new (1460, now, initial_rate);
  src->idl = tfrc_is_data_limited_new (now);
  src->send_ts_base = now;
}

static GstPadProbeReturn
incoming_rtcp_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  FsRtpTfrc *self = FS_RTP_TFRC (user_data);
  GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
  GstRTCPBuffer rtcpbuffer = GST_RTCP_BUFFER_INIT;
  GstRTCPPacket packet;
  gboolean notify = FALSE;

  if (!gst_rtcp_buffer_validate (buffer))
    return GST_PAD_PROBE_OK;

  gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcpbuffer);

  if (!gst_rtcp_buffer_get_first_packet (&rtcpbuffer, &packet))
    goto out;

  do {
    if (gst_rtcp_packet_get_type (&packet) == GST_RTCP_TYPE_RTPFB &&
        gst_rtcp_packet_fb_get_type (&packet) == 2 &&
        gst_rtcp_packet_get_length (&packet) == 6)
    {
      /* We have a TFRC packet */
      guint32 media_ssrc;
      guint32 sender_ssrc;
      guint64 ts;
      guint32 delay;
      guint32 x_recv;
      gdouble loss_event_rate;
      guint8 *buf = rtcpbuffer.map.data + packet.offset;
      struct TrackedSource *src;
      guint64 now;
      guint64 rtt;
      guint32 local_ssrc;
      gboolean is_data_limited;

      media_ssrc = gst_rtcp_packet_fb_get_media_ssrc (&packet);

      g_object_get (self->rtpsession, "internal-ssrc", &local_ssrc, NULL);

      if (media_ssrc != local_ssrc)
        continue;

      sender_ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet);

      buf += 4 * 3; /* skip the header, ssrc of sender and media sender */

      ts = GST_READ_UINT32_BE (buf);
      buf += 4;
      delay = GST_READ_UINT32_BE (buf);
      buf += 4;
      x_recv = GST_READ_UINT32_BE (buf);
      buf += 4;
      loss_event_rate = (gdouble) GST_READ_UINT32_BE (buf) / (gdouble) G_MAXUINT;
      GST_LOG_OBJECT (self, "Got RTCP TFRC packet last_sent_ts: %"
          G_GUINT64_FORMAT " delay: %u x_recv: %u loss_event_rate: %f", ts,
          delay, x_recv, loss_event_rate);

      GST_OBJECT_LOCK (self);

      if (!self->fsrtpsession || !self->sending)
        goto done;

      src = fs_rtp_tfrc_get_remote_ssrc_locked (self, sender_ssrc,
          NULL);

      now = fs_rtp_tfrc_get_now (self);

      if (G_UNLIKELY (!src->sender))
        tracked_src_add_sender (src, now, self->send_bitrate);

      /* Make sure we only use the RTT from the most recent packets from
       * the remote side, ignore anything that got delayed in between.
       */
      if (ts < src->fb_last_ts)
      {
        if (src->fb_ts_cycles + ONE_32BIT_CYCLE == src->send_ts_cycles)
        {
          src->fb_ts_cycles = src->send_ts_cycles;
        }
        else
        {
          GST_DEBUG_OBJECT (self, "Ignoring packet because the timestamp is "
              "older than one that has already been received,"
              " probably reordered.");
          goto done;
        }
      }

      src->fb_last_ts = ts;
      ts += src->fb_ts_cycles + src->send_ts_base;

      if (ts > now || now - ts < delay)
      {
        GST_ERROR_OBJECT (self, "Ignoring packet because ts > now ||"
            " now - ts < delay (ts: %" G_GUINT64_FORMAT
            " now: %" G_GUINT64_FORMAT " delay:%u",
            ts, now, delay);
        goto done;
      }

      rtt = now - ts - delay;

      if (rtt == 0)
        rtt = 1;

      if (rtt > 10 * 1000 * 1000)
      {
        GST_WARNING_OBJECT (self, "Impossible RTT %" G_GUINT64_FORMAT
            " ms, ignoring", rtt);
        goto done;
      }

      GST_LOG_OBJECT (self, "rtt: %" G_GUINT64_FORMAT
          " = now %" G_GUINT64_FORMAT
          " - ts %"G_GUINT64_FORMAT" - delay %u",
          rtt, now, ts, delay);

      if (G_UNLIKELY (tfrc_sender_get_averaged_rtt (src->sender) == 0))
        tfrc_sender_on_first_rtt (src->sender, now);

      is_data_limited =
          tfrc_is_data_limited_received_feedback (src->idl, now, ts,
              tfrc_sender_get_averaged_rtt (src->sender));

      tfrc_sender_on_feedback_packet (src->sender, now, rtt, x_recv,
          loss_event_rate, is_data_limited);

      fs_rtp_tfrc_update_sender_timer_locked (self, src, now);

      self->last_src = src;

      if (fs_rtp_tfrc_update_bitrate_locked (self, "fb"))
        notify = TRUE;

    done:
      GST_OBJECT_UNLOCK (self);
    }
  } while (gst_rtcp_packet_move_to_next (&packet));

  if (notify)
    g_object_notify (G_OBJECT (self), "bitrate");

out:

  gst_rtcp_buffer_unmap (&rtcpbuffer);

  return GST_PAD_PROBE_OK;
}

static GstClockTime
fs_rtp_tfrc_get_sync_time (FsRtpPacketModder *modder,
    GstBuffer *buffer, gpointer user_data)
{
  FsRtpTfrc *self = FS_RTP_TFRC (user_data);
  GstClockTime sync_time = GST_BUFFER_TIMESTAMP (buffer);
  gint bytes_for_one_rtt = 0;
  guint size = 0;
  guint send_rate;

  GST_OBJECT_LOCK (self);

  if (self->extension_type == EXTENSION_NONE || !self->sending)
  {
    GST_OBJECT_UNLOCK (self);
    return GST_CLOCK_TIME_NONE;
  }

  if (self->last_src && self->last_src->sender)
  {
    send_rate = tfrc_sender_get_send_rate (self->last_src->sender);
    bytes_for_one_rtt = send_rate *
        tfrc_sender_get_averaged_rtt (self->last_src->sender);
  }
  else
  {
    send_rate = tfrc_sender_get_send_rate (NULL);
    bytes_for_one_rtt = 0;
  }

  size = gst_buffer_get_size (buffer) + 10;

  if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
  {
    if (GST_CLOCK_TIME_IS_VALID (self->last_sent_ts) &&
        self->last_sent_ts < GST_BUFFER_TIMESTAMP (buffer))
      self->byte_reservoir +=
        gst_util_uint64_scale (
            (GST_BUFFER_TIMESTAMP (buffer) - self->last_sent_ts),
            send_rate,
            GST_SECOND);
    self->last_sent_ts = GST_BUFFER_TIMESTAMP (buffer);

    if (bytes_for_one_rtt &&
        self->byte_reservoir > bytes_for_one_rtt)
      self->byte_reservoir = bytes_for_one_rtt;
  }

  self->byte_reservoir -= size;

  if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
      self->byte_reservoir < 0)
  {
    GstClockTimeDiff diff = 0;

    diff = gst_util_uint64_scale_int (GST_SECOND,
        -self->byte_reservoir, send_rate);
    g_assert (diff > 0);


    GST_LOG_OBJECT (self, "Delaying packet by %"GST_TIME_FORMAT
        " = 1sec * bytes %d / rate %u",
        GST_TIME_ARGS (diff), self->byte_reservoir,
        send_rate);

    GST_BUFFER_TIMESTAMP (buffer) += diff;
  }

  GST_OBJECT_UNLOCK (self);


  return sync_time;
}


static GstBuffer *
fs_rtp_tfrc_outgoing_packets (FsRtpPacketModder *modder,
    GstBuffer *buffer, GstClockTime buffer_ts, gpointer user_data)
{
  FsRtpTfrc *self = FS_RTP_TFRC (user_data);
  gchar data[7];
  guint64 now;
  GstBuffer *headerbuf;
  GstBuffer *newbuf;
  gboolean is_data_limited;
  gsize header_size;
  gsize new_header_size;
  GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;

  if (!GST_CLOCK_TIME_IS_VALID (buffer_ts))
    return buffer;

  GST_OBJECT_LOCK (self);

  if (!self->fsrtpsession || self->extension_type == EXTENSION_NONE ||
      !self->sending)
  {
    GST_OBJECT_UNLOCK (self);
    return buffer;
  }

  now = fs_rtp_tfrc_get_now (self);

  if (G_UNLIKELY (self->last_src == NULL))
    self->initial_src = self->last_src = tracked_src_new (self);

  if (G_UNLIKELY (self->last_src->sender == NULL))
  {
    tracked_src_add_sender (self->last_src, now, self->send_bitrate);
    fs_rtp_tfrc_update_sender_timer_locked (self, self->last_src, now);
  }

  GST_WRITE_UINT24_BE (data,
      tfrc_sender_get_averaged_rtt (self->last_src->sender));
  GST_WRITE_UINT32_BE (data+3, now - self->last_src->send_ts_base);

  if (now - self->last_src->send_ts_base > self->last_src->send_ts_cycles +
      ONE_32BIT_CYCLE)
    self->last_src->send_ts_cycles += ONE_32BIT_CYCLE;

  is_data_limited = (GST_BUFFER_PTS (buffer) == buffer_ts);

  gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer);
  header_size = gst_rtp_buffer_get_header_len (&rtpbuffer);
  gst_rtp_buffer_unmap (&rtpbuffer);

  headerbuf = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL, 0,
      header_size);
  headerbuf = gst_buffer_make_writable (headerbuf);
  gst_buffer_set_size (headerbuf, header_size + 16);

  gst_rtp_buffer_map (headerbuf, GST_MAP_READWRITE, &rtpbuffer);

  if (self->extension_type == EXTENSION_ONE_BYTE)
  {
    if (!gst_rtp_buffer_add_extension_onebyte_header (&rtpbuffer,
            self->extension_id, data, 7))
      GST_WARNING_OBJECT (self,
          "Could not add extension to RTP header buf %p", headerbuf);
  }
  else if (self->extension_type == EXTENSION_TWO_BYTES)
  {
    if (!gst_rtp_buffer_add_extension_twobytes_header (&rtpbuffer, 0,
            self->extension_id, data, 7))
      GST_WARNING_OBJECT (self,
          "Could not add extension to RTP header in list %p", headerbuf);
  }

  /* FIXME:
   * This will break if any padding is applied
   */
  new_header_size = gst_rtp_buffer_get_header_len (&rtpbuffer);

  gst_rtp_buffer_unmap (&rtpbuffer);
  gst_buffer_set_size (headerbuf, new_header_size);

  /* append_region eats a ref */
  gst_buffer_ref (buffer);
  newbuf = gst_buffer_append_region (headerbuf, buffer, header_size, -1);

  GST_LOG_OBJECT (self, "Sending RTP");

  if (g_hash_table_size (self->tfrc_sources))
  {
    GHashTableIter ht_iter;
    struct TrackedSource *src;

    g_hash_table_iter_init (&ht_iter, self->tfrc_sources);

    while (g_hash_table_iter_next (&ht_iter, NULL,
            (gpointer *) &src))
    {
      if (src->sender)
      {
        if (!is_data_limited)
          tfrc_is_data_limited_not_limited_now (src->idl, now);
        tfrc_sender_sending_packet (src->sender, gst_buffer_get_size (newbuf));
      }
    }
  }
  if (self->initial_src)
  {
    if (!is_data_limited)
      tfrc_is_data_limited_not_limited_now (self->initial_src->idl, now);
    tfrc_sender_sending_packet (self->initial_src->sender,
        gst_buffer_get_size (newbuf));
  }


  GST_OBJECT_UNLOCK (self);

  gst_buffer_unref (buffer);

  return newbuf;
}

static GstPadProbeReturn
send_rtp_pad_blocked (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  FsRtpTfrc *self = user_data;
  gboolean need_modder;
  GstPad *peer = NULL;

  GST_OBJECT_LOCK (self);
  self->modder_check_probe_id = 0;
  need_modder = self->extension_type != EXTENSION_NONE;

  if (!self->fsrtpsession || !!self->packet_modder == need_modder)
    goto out;

  GST_DEBUG ("Pad blocked to possibly %s the tfrc packet modder",
      need_modder ? "add" : "remove");

  if (need_modder)
  {
    GstPadLinkReturn linkret;
    GstPad *modder_pad;

    self->packet_modder = GST_ELEMENT (fs_rtp_packet_modder_new (
          fs_rtp_tfrc_outgoing_packets, fs_rtp_tfrc_get_sync_time, self));
    g_object_ref (self->packet_modder);

    if (!gst_bin_add (self->parent_bin, self->packet_modder))
    {
      fs_session_emit_error (FS_SESSION (self->fsrtpsession),
          FS_ERROR_CONSTRUCTION,
          "Could not add tfrc packet modder to the pipeline");
      goto adding_failed;
    }

    peer = gst_pad_get_peer (pad);
    gst_pad_unlink (pad, peer);

    modder_pad = gst_element_get_static_pad (self->packet_modder, "src");
    linkret = gst_pad_link (modder_pad, peer);
    gst_object_unref (modder_pad);
    if (GST_PAD_LINK_FAILED (linkret))
    {
      fs_session_emit_error (FS_SESSION (self->fsrtpsession),
          FS_ERROR_CONSTRUCTION,
          "Could not link tfrc packet modder to rtp muxer");
      goto linking_failed;
    }

    modder_pad = gst_element_get_static_pad (self->packet_modder, "sink");
    linkret = gst_pad_link (pad, modder_pad);
    gst_object_unref (modder_pad);
    if (GST_PAD_LINK_FAILED (linkret))
    {
      fs_session_emit_error (FS_SESSION (self->fsrtpsession),
          FS_ERROR_CONSTRUCTION,
          "Could not link tfrc packet modder to the rtpbin");
      goto linking_failed;
    }

    if (gst_element_set_state (self->packet_modder, GST_STATE_PLAYING) ==
        GST_STATE_CHANGE_FAILURE)
    {
      fs_session_emit_error (FS_SESSION (self->fsrtpsession),
          FS_ERROR_CONSTRUCTION,
          "Could not set the TFRC packet modder to playing");
      goto linking_failed;
    }
  }
  else
  {
    GstPadLinkReturn linkret;
    GstPad *modder_src_pad;

    modder_src_pad = gst_element_get_static_pad (self->packet_modder, "src");
    peer = gst_pad_get_peer (modder_src_pad);
    gst_object_unref (modder_src_pad);

    gst_bin_remove (self->parent_bin, self->packet_modder);
    gst_element_set_state (self->packet_modder, GST_STATE_NULL);
    gst_object_unref (self->packet_modder);
    self->packet_modder = NULL;

    linkret = gst_pad_link (pad, peer);
    if (GST_PAD_LINK_FAILED (linkret))
      fs_session_emit_error (FS_SESSION (self->fsrtpsession),
          FS_ERROR_CONSTRUCTION,
          "Could not re-link after removing tfrc packet modder");
  }

out:
  gst_object_unref (peer);
  GST_OBJECT_UNLOCK (self);

  return GST_PAD_PROBE_REMOVE;

linking_failed:
  gst_bin_remove (self->parent_bin, self->packet_modder);
  gst_pad_link (pad, peer);
adding_failed:
  gst_object_unref (self->packet_modder);
  self->packet_modder = NULL;
  goto out;
}

static void
fs_rtp_tfrc_check_modder_locked (FsRtpTfrc *self)
{
  gboolean need_modder;

  need_modder = self->extension_type != EXTENSION_NONE;

  if (!!self->packet_modder == need_modder)
    return;

  if (self->modder_check_probe_id != 0)
    return;

  self->modder_check_probe_id =
      gst_pad_add_probe (self->out_rtp_pad,
          GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
          send_rtp_pad_blocked,
          g_object_ref (self), (GDestroyNotify) g_object_unref);
}


FsRtpTfrc *
fs_rtp_tfrc_new (FsRtpSession *fsrtpsession)
{
  FsRtpTfrc *self;
  GstElement *rtpmuxer;

  g_return_val_if_fail (fsrtpsession, NULL);

  self = g_object_new (FS_TYPE_RTP_TFRC, NULL);

  self->fsrtpsession = fsrtpsession;
  self->sending = FALSE;

  self->rtpsession = fs_rtp_session_get_rtpbin_internal_session (fsrtpsession);
  self->parent_bin = GST_BIN (fs_rtp_session_get_conference (fsrtpsession));
  self->in_rtp_pad = fs_rtp_session_get_rtpbin_recv_rtp_sink (fsrtpsession);;
  self->in_rtcp_pad = fs_rtp_session_get_rtpbin_recv_rtcp_sink (fsrtpsession);;

  rtpmuxer = fs_rtp_session_get_rtpmuxer (fsrtpsession);
  self->out_rtp_pad = gst_element_get_static_pad (rtpmuxer, "src");
  gst_object_unref (rtpmuxer);

  self->in_rtp_probe_id = gst_pad_add_probe (self->in_rtp_pad,
      GST_PAD_PROBE_TYPE_BUFFER, incoming_rtp_probe, self, NULL);
  self->in_rtcp_probe_id = gst_pad_add_probe (self->in_rtcp_pad,
      GST_PAD_PROBE_TYPE_BUFFER, incoming_rtcp_probe, self, NULL);


  self->on_ssrc_validated_id = g_signal_connect_object (self->rtpsession,
      "on-ssrc-validated", G_CALLBACK (rtpsession_on_ssrc_validated), self, 0);
  self->on_sending_rtcp_id = g_signal_connect_object (self->rtpsession,
      "on-sending-rtcp", G_CALLBACK (rtpsession_sending_rtcp), self, 0);

  return self;
}

gboolean
validate_ca_for_tfrc (CodecAssociation *ca, gpointer user_data)
{
  return codec_association_is_valid_for_sending (ca, TRUE) &&
      fs_codec_get_feedback_parameter (ca->codec, "tfrc", "",  "");
}

void
fs_rtp_tfrc_filter_codecs (GList **codec_associations,
    GList **header_extensions)
{
  gboolean has_header_ext = FALSE;
  gboolean has_codec_rtcpfb = FALSE;
  GList *item;

  has_codec_rtcpfb = !!lookup_codec_association_custom (*codec_associations,
      validate_ca_for_tfrc, NULL);

  for (item = *header_extensions; item;)
  {
    FsRtpHeaderExtension *hdrext = item->data;
    GList *next = item->next;

    if (!strcmp (hdrext->uri, "urn:ietf:params:rtp-hdrext:rtt-sendts"))
    {
      if (has_header_ext || !has_codec_rtcpfb)
      {
        GST_WARNING ("Removing rtt-sendts hdrext because matching tfrc"
            " feedback parameter not found or because rtp-hdrext"
            " is duplicated");
        fs_rtp_header_extension_destroy (item->data);
        *header_extensions = g_list_remove_link (*header_extensions, item);
      }
      else if (hdrext->direction == FS_DIRECTION_BOTH)
      {
        has_header_ext = TRUE;
      }
    }
    item = next;
  }

  if (!has_codec_rtcpfb || has_header_ext)
    return;

  for (item = *codec_associations; item; item = item->next)
  {
    CodecAssociation *ca = item->data;
    GList *item2;

    for (item2 = ca->codec->feedback_params; item2;)
    {
      GList *next2 = item2->next;
      FsFeedbackParameter *p = item2->data;

      if (!g_ascii_strcasecmp (p->type, "tfrc"))
      {
        GST_WARNING ("Removing tfrc from codec because no hdrext:rtt-sendts: "
            FS_CODEC_FORMAT, FS_CODEC_ARGS (ca->codec));
        fs_codec_remove_feedback_parameter (ca->codec, item2);
      }

      item2 = next2;
    }
  }

}

void
fs_rtp_tfrc_codecs_updated (FsRtpTfrc *self,
    GList *codec_associations,
    GList *header_extensions)
{
  GList *item;
  FsRtpHeaderExtension *hdrext;

  GST_OBJECT_LOCK (self);

  memset (self->pts, 0, 128);
  for (item = codec_associations; item; item = item->next)
  {
    CodecAssociation *ca = item->data;

    /* Also require nack/pli for tfrc to work, we really need to disable
     * automatic keyframes
     */

    if (fs_codec_get_feedback_parameter (ca->codec, "tfrc", NULL, NULL) &&
        fs_rtp_keyunit_manager_has_key_request_feedback (ca->codec))
    self->pts[ca->codec->id] = TRUE;
  }

  for (item = header_extensions; item; item = item->next)
  {
    hdrext = item->data;
    if (!strcmp (hdrext->uri, "urn:ietf:params:rtp-hdrext:rtt-sendts") &&
        hdrext->direction == FS_DIRECTION_BOTH)
      break;
  }

  if (!item)
  {
    self->extension_type = EXTENSION_NONE;
    goto out;
  }

  if (hdrext->id > 15)
    self->extension_type = EXTENSION_TWO_BYTES;
  else
    self->extension_type = EXTENSION_ONE_BYTE;

  self->extension_id = hdrext->id;

out:
  fs_rtp_tfrc_check_modder_locked (self);

  GST_OBJECT_UNLOCK (self);
}


gboolean
fs_rtp_tfrc_is_enabled (FsRtpTfrc *self, guint pt)
{
  gboolean is_enabled;

  g_return_val_if_fail (pt < 128, FALSE);

  GST_OBJECT_LOCK (self);
  is_enabled = (self->extension_type != EXTENSION_NONE) &&
      self->pts[pt];
  GST_OBJECT_UNLOCK (self);

  return is_enabled;
}