Blob Blame History Raw
/* RTP Retransmission receiver element for GStreamer
 *
 * gstrtprtxreceive.c:
 *
 * Copyright (C) 2013 Collabora Ltd.
 *   @author Julien Isorce <julien.isorce@collabora.co.uk>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-rtprtxreceive
 * @see_also: rtprtxsend, rtpsession, rtpjitterbuffer
 *
 * rtprtxreceive listens to the retransmission events from the
 * downstream rtpjitterbuffer and remembers the SSRC (ssrc1) of the stream and
 * the sequence number that was requested. When it receives a packet with
 * a sequence number equal to one of the ones stored and with a different SSRC,
 * it identifies the new SSRC (ssrc2) as the retransmission stream of ssrc1.
 * From this point on, it replaces ssrc2 with ssrc1 in all packets of the
 * ssrc2 stream and flags them as retransmissions, so that rtpjitterbuffer
 * can reconstruct the original stream.
 *
 * This algorithm is implemented as specified in RFC 4588.
 *
 * This element is meant to be used with rtprtxsend on the sender side.
 * See #GstRtpRtxSend
 *
 * Below you can see some examples that illustrate how rtprtxreceive and
 * rtprtxsend fit among the other rtp elements and how they work internally.
 * Normally, hoewever, you should avoid using such pipelines and use
 * rtpbin instead, with its #GstRtpBin::request-aux-sender and
 * #GstRtpBin::request-aux-receiver signals. See #GstRtpBin.
 *
 * # Example pipelines
 * |[
 * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
 *     audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! \
 *         rtprtxsend payload-type-map="application/x-rtp-pt-map,96=(uint)97" ! \
 *         rtpsession.send_rtp_sink \
 *     rtpsession.send_rtp_src ! identity drop-probability=0.01 ! \
 *         udpsink host="127.0.0.1" port=5000 \
 *     udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
 *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 \
 *         sync=false async=false
 * ]| Send audio stream through port 5000 (5001 and 5002 are just the rtcp
 * link with the receiver)
 * |[
 * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
 *     udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)96" ! \
 *         rtpsession.recv_rtp_sink \
 *     rtpsession.recv_rtp_src ! \
 *         rtprtxreceive payload-type-map="application/x-rtp-pt-map,96=(uint)97" ! \
 *         rtpssrcdemux ! rtpjitterbuffer do-retransmission=true ! \
 *         rtpopusdepay ! opusdec ! audioconvert ! audioresample ! autoaudiosink \
 *     rtpsession.send_rtcp_src ! \
 *         udpsink host="127.0.0.1" port=5001 sync=false async=false \
 *     udpsrc port=5002 ! rtpsession.recv_rtcp_sink
 * ]| Receive audio stream from port 5000 (5001 and 5002 are just the rtcp
 * link with the sender)
 *
 * In this example we can see a simple streaming of an OPUS stream with some
 * of the packets being artificially dropped by the identity element.
 * Thanks to retransmission, you should still hear a clear sound when setting
 * drop-probability to something greater than 0.
 *
 * Internally, the rtpjitterbuffer will generate a custom upstream event,
 * GstRTPRetransmissionRequest, when it detects that one packet is missing.
 * Then this request is translated to a FB NACK in the rtcp link by rtpsession.
 * Finally the rtpsession of the sender side will re-convert it in a
 * GstRTPRetransmissionRequest that will be handled by rtprtxsend. rtprtxsend
 * will then re-send the missing packet with a new srrc and a different payload
 * type (here, 97), but with the same original sequence number. On the receiver
 * side, rtprtxreceive will associate this new stream with the original and
 * forward the retransmission packets to rtpjitterbuffer with the original
 * ssrc and payload type.
 *
 * |[
 * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
 *     audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 seqnum-offset=1 ! \
 *         rtprtxsend payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
 *         funnel name=f ! rtpsession.send_rtp_sink \
 *     audiotestsrc freq=660.0 is-live=true ! opusenc ! \
 *         rtpopuspay pt=97 seqnum-offset=100 ! \
 *         rtprtxsend payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
 *         f. \
 *     rtpsession.send_rtp_src ! identity drop-probability=0.01 ! \
 *         udpsink host="127.0.0.1" port=5000 \
 *     udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
 *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 \
 *         sync=false async=false
 * ]| Send two audio streams to port 5000.
 * |[
 * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
 *     udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)97" ! \
 *         rtpsession.recv_rtp_sink \
 *     rtpsession.recv_rtp_src ! \
 *         rtprtxreceive payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
 *         rtpssrcdemux name=demux \
 *     demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpopusdepay ! \
 *         opusdec ! audioconvert ! autoaudiosink \
 *     demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpopusdepay ! \
 *         opusdec ! audioconvert ! autoaudiosink \
 *     udpsrc port=5002 ! rtpsession.recv_rtcp_sink \
 *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5001 \
 *         sync=false async=false
 * ]| Receive two audio streams from port 5000.
 *
 * In this example we are streaming two streams of the same type through the
 * same port. They, however, are using a different SSRC (ssrc is randomly
 * generated on each payloader - rtpopuspay in this example), so they can be
 * identified and demultiplexed by rtpssrcdemux on the receiver side. This is
 * an example of SSRC-multiplexing.
 *
 * It is important here to use a different starting sequence number
 * (seqnum-offset), since this is the only means of identification that
 * rtprtxreceive uses the very first time to identify retransmission streams.
 * It is an error, according to RFC4588 to have two retransmission requests for
 * packets belonging to two different streams but with the same sequence number.
 * Note that the default seqnum-offset value (-1, which means random) would
 * work just fine, but it is overriden here for illustration purposes.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gst/gst.h>
#include <gst/rtp/gstrtpbuffer.h>
#include <string.h>
#include <stdlib.h>

#include "gstrtprtxreceive.h"

#define ASSOC_TIMEOUT (GST_SECOND)

GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_receive_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_receive_debug

enum
{
  PROP_0,
  PROP_PAYLOAD_TYPE_MAP,
  PROP_NUM_RTX_REQUESTS,
  PROP_NUM_RTX_PACKETS,
  PROP_NUM_RTX_ASSOC_PACKETS
};

static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );

static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );

static gboolean gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static GstFlowReturn gst_rtp_rtx_receive_chain (GstPad * pad,
    GstObject * parent, GstBuffer * buffer);

static GstStateChangeReturn gst_rtp_rtx_receive_change_state (GstElement *
    element, GstStateChange transition);

static void gst_rtp_rtx_receive_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_receive_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_receive_finalize (GObject * object);

G_DEFINE_TYPE (GstRtpRtxReceive, gst_rtp_rtx_receive, GST_TYPE_ELEMENT);

static void
gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
{
  GObjectClass *gobject_class;
  GstElementClass *gstelement_class;

  gobject_class = (GObjectClass *) klass;
  gstelement_class = (GstElementClass *) klass;

  gobject_class->get_property = gst_rtp_rtx_receive_get_property;
  gobject_class->set_property = gst_rtp_rtx_receive_set_property;
  gobject_class->finalize = gst_rtp_rtx_receive_finalize;

  g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
      g_param_spec_boxed ("payload-type-map", "Payload Type Map",
          "Map of original payload types to their retransmission payload types",
          GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
      g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
          "Number of retransmission events received", 0, G_MAXUINT,
          0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
      g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
          " Number of retransmission packets received", 0, G_MAXUINT,
          0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_NUM_RTX_ASSOC_PACKETS,
      g_param_spec_uint ("num-rtx-assoc-packets",
          "Num RTX Associated Packets", "Number of retransmission packets "
          "correctly associated with retransmission requests", 0, G_MAXUINT,
          0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
  gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);

  gst_element_class_set_static_metadata (gstelement_class,
      "RTP Retransmission receiver", "Codec",
      "Receive retransmitted RTP packets according to RFC4588",
      "Julien Isorce <julien.isorce@collabora.co.uk>");

  gstelement_class->change_state =
      GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_change_state);
}

static void
gst_rtp_rtx_receive_reset (GstRtpRtxReceive * rtx)
{
  GST_OBJECT_LOCK (rtx);
  g_hash_table_remove_all (rtx->ssrc2_ssrc1_map);
  g_hash_table_remove_all (rtx->seqnum_ssrc1_map);
  rtx->num_rtx_requests = 0;
  rtx->num_rtx_packets = 0;
  rtx->num_rtx_assoc_packets = 0;
  GST_OBJECT_UNLOCK (rtx);
}

static void
gst_rtp_rtx_receive_finalize (GObject * object)
{
  GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);

  g_hash_table_unref (rtx->ssrc2_ssrc1_map);
  g_hash_table_unref (rtx->seqnum_ssrc1_map);
  g_hash_table_unref (rtx->rtx_pt_map);
  if (rtx->rtx_pt_map_structure)
    gst_structure_free (rtx->rtx_pt_map_structure);

  G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object);
}

typedef struct
{
  guint32 ssrc;
  GstClockTime time;
} SsrcAssoc;

static SsrcAssoc *
ssrc_assoc_new (guint32 ssrc, GstClockTime time)
{
  SsrcAssoc *assoc = g_slice_new (SsrcAssoc);

  assoc->ssrc = ssrc;
  assoc->time = time;

  return assoc;
}

static void
ssrc_assoc_free (SsrcAssoc * assoc)
{
  g_slice_free (SsrcAssoc, assoc);
}

static void
gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx)
{
  GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);

  rtx->srcpad =
      gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
          "src"), "src");
  GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
  GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
  gst_pad_set_event_function (rtx->srcpad,
      GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_src_event));
  gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);

  rtx->sinkpad =
      gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
          "sink"), "sink");
  GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
  GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
  gst_pad_set_chain_function (rtx->sinkpad,
      GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_chain));
  gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);

  rtx->ssrc2_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal);
  rtx->seqnum_ssrc1_map = g_hash_table_new_full (g_direct_hash, g_direct_equal,
      NULL, (GDestroyNotify) ssrc_assoc_free);

  rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
}

static gboolean
gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event)
{
  GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent);
  gboolean res;

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_CUSTOM_UPSTREAM:
    {
      const GstStructure *s = gst_event_get_structure (event);

      /* This event usually comes from the downstream gstrtpjitterbuffer */
      if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
        guint seqnum = 0;
        guint ssrc = 0;
        gpointer ssrc2 = 0;

        /* retrieve seqnum of the packet that need to be retransmitted */
        if (!gst_structure_get_uint (s, "seqnum", &seqnum))
          seqnum = -1;

        /* retrieve ssrc of the packet that need to be retransmitted
         * it's useful when reconstructing the original packet from the rtx packet */
        if (!gst_structure_get_uint (s, "ssrc", &ssrc))
          ssrc = -1;

        GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
            seqnum, ssrc);

        GST_OBJECT_LOCK (rtx);

        /* increase number of seen requests for our statistics */
        ++rtx->num_rtx_requests;

        /* First, we lookup in our map to see if we have already associate this
         * master stream ssrc with its retransmitted stream.
         * Every ssrc are unique so we can use the same hash table
         * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
         */
        if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
                GUINT_TO_POINTER (ssrc), NULL, &ssrc2)
            && GPOINTER_TO_UINT (ssrc2) != GPOINTER_TO_UINT (ssrc)) {
          GST_TRACE_OBJECT (rtx, "Retransmited stream %X already associated "
              "to its master, %X", GPOINTER_TO_UINT (ssrc2), ssrc);
        } else {
          SsrcAssoc *assoc;

          /* not already associated but also we have to check that we have not
           * already considered this request.
           */
          if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
                  GUINT_TO_POINTER (seqnum), NULL, (gpointer *) & assoc)) {
            if (assoc->ssrc == ssrc) {
              /* same seqnum, same ssrc */

              /* do nothing because we have already considered this request
               * The jitter may be too impatient of the rtx packet has been
               * lost too.
               * It does not mean we reject the event, we still want to forward
               * the request to the gstrtpsession to be translater into a FB NACK
               */
              GST_LOG_OBJECT (rtx, "Duplicate request: seqnum: %u, ssrc: %X",
                  seqnum, ssrc);
            } else {
              /* same seqnum, different ssrc */

              /* If the association attempt is larger than ASSOC_TIMEOUT,
               * then we give up on it, and try this one.
               */
              if (!GST_CLOCK_TIME_IS_VALID (rtx->last_time) ||
                  !GST_CLOCK_TIME_IS_VALID (assoc->time) ||
                  assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
                /* From RFC 4588:
                 * the receiver MUST NOT have two outstanding requests for the
                 * same packet sequence number in two different original streams
                 * before the association is resolved. Otherwise it's impossible
                 * to associate a rtx stream and its master stream
                 */

                /* remove seqnum in order to reuse the spot */
                g_hash_table_remove (rtx->seqnum_ssrc1_map,
                    GUINT_TO_POINTER (seqnum));
                goto retransmit;
              } else {
                GST_INFO_OBJECT (rtx, "rejecting request for seqnum %u"
                    " of master stream %X; there is already a pending request "
                    "for the same seqnum on ssrc %X that has not expired",
                    seqnum, ssrc, assoc->ssrc);

                /* do not forward the event as we are rejecting this request */
                GST_OBJECT_UNLOCK (rtx);
                gst_event_unref (event);
                return TRUE;
              }
            }
          } else {
          retransmit:
            /* the request has not been already considered
             * insert it for the first time */
            g_hash_table_insert (rtx->seqnum_ssrc1_map,
                GUINT_TO_POINTER (seqnum),
                ssrc_assoc_new (ssrc, rtx->last_time));
          }
        }

        GST_DEBUG_OBJECT (rtx, "packet number %u of master stream %X"
            " needs to be retransmitted", seqnum, ssrc);

        GST_OBJECT_UNLOCK (rtx);
      }

      /* Transfer event upstream so that the request can acutally by translated
       * through gstrtpsession through the network */
      res = gst_pad_event_default (pad, parent, event);
      break;
    }
    default:
      res = gst_pad_event_default (pad, parent, event);
      break;
  }
  return res;
}

/* Copy fixed header and extension. Replace current ssrc by ssrc1,
 * remove OSN and replace current seq num by OSN.
 * Copy memory to avoid to manually copy each rtp buffer field.
 */
static GstBuffer *
_gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1,
    guint16 orign_seqnum, guint8 origin_payload_type)
{
  GstMemory *mem = NULL;
  GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
  GstBuffer *new_buffer = gst_buffer_new ();
  GstMapInfo map;
  guint payload_len = 0;

  /* copy fixed header */
  mem = gst_memory_copy (rtp->map[0].memory,
      (guint8 *) rtp->data[0] - rtp->map[0].data, rtp->size[0]);
  gst_buffer_append_memory (new_buffer, mem);

  /* copy extension if any */
  if (rtp->size[1]) {
    mem = gst_memory_copy (rtp->map[1].memory,
        (guint8 *) rtp->data[1] - rtp->map[1].data, rtp->size[1]);
    gst_buffer_append_memory (new_buffer, mem);
  }

  /* copy payload and remove OSN */
  payload_len = rtp->size[2] - 2;
  mem = gst_allocator_alloc (NULL, payload_len, NULL);

  gst_memory_map (mem, &map, GST_MAP_WRITE);
  if (rtp->size[2])
    memcpy (map.data, (guint8 *) rtp->data[2] + 2, payload_len);
  gst_memory_unmap (mem, &map);
  gst_buffer_append_memory (new_buffer, mem);

  /* the sender always constructs rtx packets without padding,
   * But the receiver can still receive rtx packets with padding.
   * So just copy it.
   */
  if (rtp->size[3]) {
    guint pad_len = rtp->size[3];

    mem = gst_allocator_alloc (NULL, pad_len, NULL);

    gst_memory_map (mem, &map, GST_MAP_WRITE);
    map.data[pad_len - 1] = pad_len;
    gst_memory_unmap (mem, &map);

    gst_buffer_append_memory (new_buffer, mem);
  }

  /* set ssrc and seq num */
  gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
  gst_rtp_buffer_set_ssrc (&new_rtp, ssrc1);
  gst_rtp_buffer_set_seq (&new_rtp, orign_seqnum);
  gst_rtp_buffer_set_payload_type (&new_rtp, origin_payload_type);
  gst_rtp_buffer_unmap (&new_rtp);

  gst_buffer_copy_into (new_buffer, rtp->buffer,
      GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
  GST_BUFFER_FLAG_SET (new_buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION);

  return new_buffer;
}

static GstFlowReturn
gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
  GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (parent);
  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
  GstFlowReturn ret = GST_FLOW_OK;
  GstBuffer *new_buffer = NULL;
  guint32 ssrc = 0;
  gpointer ssrc1 = 0;
  guint32 ssrc2 = 0;
  guint16 seqnum = 0;
  guint16 orign_seqnum = 0;
  guint8 payload_type = 0;
  gpointer payload = NULL;
  guint8 origin_payload_type = 0;
  gboolean is_rtx;
  gboolean drop = FALSE;

  /* map current rtp packet to parse its header */
  if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
    goto invalid_buffer;

  ssrc = gst_rtp_buffer_get_ssrc (&rtp);
  seqnum = gst_rtp_buffer_get_seq (&rtp);
  payload_type = gst_rtp_buffer_get_payload_type (&rtp);

  /* check if we have a retransmission packet (this information comes from SDP) */
  GST_OBJECT_LOCK (rtx);

  is_rtx =
      g_hash_table_lookup_extended (rtx->rtx_pt_map,
      GUINT_TO_POINTER (payload_type), NULL, NULL);

  if (is_rtx) {
    payload = gst_rtp_buffer_get_payload (&rtp);

    if (!payload || gst_rtp_buffer_get_payload_len (&rtp) < 2) {
      GST_OBJECT_UNLOCK (rtx);
      gst_rtp_buffer_unmap (&rtp);
      goto invalid_buffer;
    }
  }

  rtx->last_time = GST_BUFFER_PTS (buffer);

  if (g_hash_table_size (rtx->seqnum_ssrc1_map) > 0) {
    GHashTableIter iter;
    gpointer key, value;

    g_hash_table_iter_init (&iter, rtx->seqnum_ssrc1_map);
    while (g_hash_table_iter_next (&iter, &key, &value)) {
      SsrcAssoc *assoc = value;

      /* remove association request if it is too old */
      if (GST_CLOCK_TIME_IS_VALID (rtx->last_time) &&
          GST_CLOCK_TIME_IS_VALID (assoc->time) &&
          assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
        g_hash_table_iter_remove (&iter);
      }
    }
  }

  /* if the current packet is from a retransmission stream */
  if (is_rtx) {
    /* increase our statistic */
    ++rtx->num_rtx_packets;

    /* read OSN in the rtx payload */
    orign_seqnum = GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp));
    origin_payload_type =
        GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
            GUINT_TO_POINTER (payload_type)));

    GST_DEBUG_OBJECT (rtx, "Got rtx packet: rtx seqnum %u, rtx ssrc %X, "
        "rtx pt %u, orig seqnum %u, orig pt %u", seqnum, ssrc, payload_type,
        orign_seqnum, origin_payload_type);

    /* first we check if we already have associated this retransmission stream
     * to a master stream */
    if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
            GUINT_TO_POINTER (ssrc), NULL, &ssrc1)) {
      GST_TRACE_OBJECT (rtx,
          "packet is from retransmission stream %X already associated to "
          "master stream %X", ssrc, GPOINTER_TO_UINT (ssrc1));
      ssrc2 = ssrc;
    } else {
      SsrcAssoc *assoc;

      /* the current retransmitted packet has its rtx stream not already
       * associated to a master stream, so retrieve it from our request
       * history */
      if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
              GUINT_TO_POINTER (orign_seqnum), NULL, (gpointer *) & assoc)) {
        GST_LOG_OBJECT (rtx,
            "associating retransmitted stream %X to master stream %X thanks "
            "to rtx packet %u (orig seqnum %u)", ssrc, assoc->ssrc, seqnum,
            orign_seqnum);
        ssrc1 = GUINT_TO_POINTER (assoc->ssrc);
        ssrc2 = ssrc;

        /* just put a guard */
        if (GPOINTER_TO_UINT (ssrc1) == ssrc2)
          GST_WARNING_OBJECT (rtx, "RTX receiver ssrc2_ssrc1_map bad state, "
              "master and rtx SSRCs are the same (%X)\n", ssrc);

        /* free the spot so that this seqnum can be used to do another
         * association */
        g_hash_table_remove (rtx->seqnum_ssrc1_map,
            GUINT_TO_POINTER (orign_seqnum));

        /* actually do the association between rtx stream and master stream */
        g_hash_table_insert (rtx->ssrc2_ssrc1_map, GUINT_TO_POINTER (ssrc2),
            ssrc1);

        /* also do the association between master stream and rtx stream
         * every ssrc are unique so we can use the same hash table
         * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
         */
        g_hash_table_insert (rtx->ssrc2_ssrc1_map, ssrc1,
            GUINT_TO_POINTER (ssrc2));

      } else {
        /* we are not able to associate this rtx packet with a master stream */
        GST_INFO_OBJECT (rtx,
            "dropping rtx packet %u because its orig seqnum (%u) is not in our"
            " pending retransmission requests", seqnum, orign_seqnum);
        drop = TRUE;
      }
    }
  }

  /* if not dropped the packet was successfully associated */
  if (is_rtx && !drop)
    ++rtx->num_rtx_assoc_packets;

  GST_OBJECT_UNLOCK (rtx);

  /* just drop the packet if the association could not have been made */
  if (drop) {
    gst_rtp_buffer_unmap (&rtp);
    gst_buffer_unref (buffer);
    return GST_FLOW_OK;
  }

  /* create the retransmission packet */
  if (is_rtx)
    new_buffer =
        _gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1),
        orign_seqnum, origin_payload_type);

  gst_rtp_buffer_unmap (&rtp);

  /* push the packet */
  if (is_rtx) {
    gst_buffer_unref (buffer);
    GST_LOG_OBJECT (rtx, "pushing packet seqnum:%u from restransmission "
        "stream ssrc: %X (master ssrc %X)", orign_seqnum, ssrc2,
        GPOINTER_TO_UINT (ssrc1));
    ret = gst_pad_push (rtx->srcpad, new_buffer);
  } else {
    GST_TRACE_OBJECT (rtx, "pushing packet seqnum:%u from master stream "
        "ssrc: %X", seqnum, ssrc);
    ret = gst_pad_push (rtx->srcpad, buffer);
  }

  return ret;

invalid_buffer:
  {
    GST_ELEMENT_WARNING (rtx, STREAM, DECODE, (NULL),
        ("Received invalid RTP payload, dropping"));
    gst_buffer_unref (buffer);
    return GST_FLOW_OK;
  }
}

static void
gst_rtp_rtx_receive_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);

  switch (prop_id) {
    case PROP_PAYLOAD_TYPE_MAP:
      GST_OBJECT_LOCK (rtx);
      g_value_set_boxed (value, rtx->rtx_pt_map_structure);
      GST_OBJECT_UNLOCK (rtx);
      break;
    case PROP_NUM_RTX_REQUESTS:
      GST_OBJECT_LOCK (rtx);
      g_value_set_uint (value, rtx->num_rtx_requests);
      GST_OBJECT_UNLOCK (rtx);
      break;
    case PROP_NUM_RTX_PACKETS:
      GST_OBJECT_LOCK (rtx);
      g_value_set_uint (value, rtx->num_rtx_packets);
      GST_OBJECT_UNLOCK (rtx);
      break;
    case PROP_NUM_RTX_ASSOC_PACKETS:
      GST_OBJECT_LOCK (rtx);
      g_value_set_uint (value, rtx->num_rtx_assoc_packets);
      GST_OBJECT_UNLOCK (rtx);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static gboolean
structure_to_hash_table_inv (GQuark field_id, const GValue * value,
    gpointer hash)
{
  const gchar *field_str;
  guint field_uint;
  guint value_uint;

  field_str = g_quark_to_string (field_id);
  field_uint = atoi (field_str);
  value_uint = g_value_get_uint (value);
  g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (value_uint),
      GUINT_TO_POINTER (field_uint));

  return TRUE;
}

static void
gst_rtp_rtx_receive_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{
  GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE (object);

  switch (prop_id) {
    case PROP_PAYLOAD_TYPE_MAP:
      GST_OBJECT_LOCK (rtx);
      if (rtx->rtx_pt_map_structure)
        gst_structure_free (rtx->rtx_pt_map_structure);
      rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
      g_hash_table_remove_all (rtx->rtx_pt_map);
      gst_structure_foreach (rtx->rtx_pt_map_structure,
          structure_to_hash_table_inv, rtx->rtx_pt_map);
      GST_OBJECT_UNLOCK (rtx);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static GstStateChangeReturn
gst_rtp_rtx_receive_change_state (GstElement * element,
    GstStateChange transition)
{
  GstStateChangeReturn ret;
  GstRtpRtxReceive *rtx;

  rtx = GST_RTP_RTX_RECEIVE (element);

  switch (transition) {
    default:
      break;
  }

  ret =
      GST_ELEMENT_CLASS (gst_rtp_rtx_receive_parent_class)->change_state
      (element, transition);

  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      gst_rtp_rtx_receive_reset (rtx);
      break;
    default:
      break;
  }

  return ret;
}

gboolean
gst_rtp_rtx_receive_plugin_init (GstPlugin * plugin)
{
  GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_receive_debug, "rtprtxreceive", 0,
      "rtp retransmission receiver");

  return gst_element_register (plugin, "rtprtxreceive", GST_RANK_NONE,
      GST_TYPE_RTP_RTX_RECEIVE);
}