Blame gst/rtp/rtpstoragestream.c

Packit 1f69a5
/* GStreamer plugin for forward error correction
Packit 1f69a5
 * Copyright (C) 2017 Pexip
Packit 1f69a5
 *
Packit 1f69a5
 * This library is free software; you can redistribute it and/or
Packit 1f69a5
 * modify it under the terms of the GNU Lesser General Public
Packit 1f69a5
 * License as published by the Free Software Foundation; either
Packit 1f69a5
 * version 2.1 of the License, or (at your option) any later version.
Packit 1f69a5
 *
Packit 1f69a5
 * This library is distributed in the hope that it will be useful,
Packit 1f69a5
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 1f69a5
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Packit 1f69a5
 * Lesser General Public License for more details.
Packit 1f69a5
 *
Packit 1f69a5
 * You should have received a copy of the GNU Lesser General Public
Packit 1f69a5
 * License along with this library; if not, write to the Free Software
Packit 1f69a5
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
Packit 1f69a5
 *
Packit 1f69a5
 * Author: Mikhail Fludkov <misha@pexip.com>
Packit 1f69a5
 */
Packit 1f69a5
Packit 1f69a5
#include "rtpstoragestream.h"
Packit 1f69a5
Packit 1f69a5
static RtpStorageItem *
Packit 1f69a5
rtp_storage_item_new (GstBuffer * buffer, guint8 pt, guint16 seq)
Packit 1f69a5
{
Packit 1f69a5
  RtpStorageItem *ret = g_slice_new0 (RtpStorageItem);
Packit 1f69a5
  ret->buffer = buffer;
Packit 1f69a5
  ret->pt = pt;
Packit 1f69a5
  ret->seq = seq;
Packit 1f69a5
  return ret;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
static void
Packit 1f69a5
rtp_storage_item_free (RtpStorageItem * item)
Packit 1f69a5
{
Packit 1f69a5
  g_assert (item->buffer != NULL);
Packit 1f69a5
  gst_buffer_unref (item->buffer);
Packit 1f69a5
  g_slice_free (RtpStorageItem, item);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
static gint
Packit 1f69a5
rtp_storage_item_compare (gconstpointer a, gconstpointer b, gpointer userdata)
Packit 1f69a5
{
Packit 1f69a5
  gint seq_diff = gst_rtp_buffer_compare_seqnum (
Packit 1f69a5
      ((RtpStorageItem const *) a)->seq, ((RtpStorageItem const *) b)->seq);
Packit 1f69a5
Packit 1f69a5
  if (seq_diff >= 0)
Packit 1f69a5
    return 0;
Packit 1f69a5
Packit 1f69a5
  return 1;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
static void
Packit 1f69a5
rtp_storage_stream_resize (RtpStorageStream * stream, GstClockTime size_time)
Packit 1f69a5
{
Packit 1f69a5
  GList *it;
Packit 1f69a5
  guint i, too_old_buffers_num = 0;
Packit 1f69a5
Packit 1f69a5
  g_assert (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time));
Packit 1f69a5
  g_assert (GST_CLOCK_TIME_IS_VALID (size_time));
Packit 1f69a5
  g_assert_cmpint (size_time, >, 0);
Packit 1f69a5
Packit 1f69a5
  /* Iterating from oldest sequence numbers to newest */
Packit 1f69a5
  for (i = 0, it = stream->queue.tail; it; it = it->prev, ++i) {
Packit 1f69a5
    RtpStorageItem *item = it->data;
Packit 1f69a5
    GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (item->buffer);
Packit 1f69a5
    if (GST_CLOCK_TIME_IS_VALID (arrival_time)) {
Packit 1f69a5
      if (stream->max_arrival_time - arrival_time > size_time) {
Packit 1f69a5
        too_old_buffers_num = i + 1;
Packit 1f69a5
      } else
Packit 1f69a5
        break;
Packit 1f69a5
    }
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  for (i = 0; i < too_old_buffers_num; ++i) {
Packit 1f69a5
    RtpStorageItem *item = g_queue_pop_tail (&stream->queue);
Packit 1f69a5
    rtp_storage_item_free (item);
Packit 1f69a5
  }
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
void
Packit 1f69a5
rtp_storage_stream_resize_and_add_item (RtpStorageStream * stream,
Packit 1f69a5
    GstClockTime size_time, GstBuffer * buffer, guint8 pt, guint16 seq)
Packit 1f69a5
{
Packit 1f69a5
  GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (buffer);
Packit 1f69a5
Packit 1f69a5
  if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (arrival_time))) {
Packit 1f69a5
    if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time)))
Packit 1f69a5
      stream->max_arrival_time = MAX (stream->max_arrival_time, arrival_time);
Packit 1f69a5
    else
Packit 1f69a5
      stream->max_arrival_time = arrival_time;
Packit 1f69a5
Packit 1f69a5
    rtp_storage_stream_resize (stream, size_time);
Packit 1f69a5
    rtp_storage_stream_add_item (stream, buffer, pt, seq);
Packit 1f69a5
  } else {
Packit 1f69a5
    rtp_storage_stream_add_item (stream, buffer, pt, seq);
Packit 1f69a5
  }
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
RtpStorageStream *
Packit 1f69a5
rtp_storage_stream_new (guint32 ssrc)
Packit 1f69a5
{
Packit 1f69a5
  RtpStorageStream *ret = g_slice_new0 (RtpStorageStream);
Packit 1f69a5
  ret->max_arrival_time = GST_CLOCK_TIME_NONE;
Packit 1f69a5
  ret->ssrc = ssrc;
Packit 1f69a5
  g_mutex_init (&ret->stream_lock);
Packit 1f69a5
  return ret;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
void
Packit 1f69a5
rtp_storage_stream_free (RtpStorageStream * stream)
Packit 1f69a5
{
Packit 1f69a5
  STREAM_LOCK (stream);
Packit 1f69a5
  while (stream->queue.length)
Packit 1f69a5
    rtp_storage_item_free (g_queue_pop_tail (&stream->queue));
Packit 1f69a5
  STREAM_UNLOCK (stream);
Packit 1f69a5
  g_mutex_clear (&stream->stream_lock);
Packit 1f69a5
  g_slice_free (RtpStorageStream, stream);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
void
Packit 1f69a5
rtp_storage_stream_add_item (RtpStorageStream * stream, GstBuffer * buffer,
Packit 1f69a5
    guint8 pt, guint16 seq)
Packit 1f69a5
{
Packit 1f69a5
  RtpStorageItem *item = rtp_storage_item_new (buffer, pt, seq);
Packit 1f69a5
  GList *sibling = g_queue_find_custom (&stream->queue, item,
Packit 1f69a5
      (GCompareFunc) rtp_storage_item_compare);
Packit 1f69a5
Packit 1f69a5
  g_queue_insert_before (&stream->queue, sibling, item);
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GstBufferList *
Packit 1f69a5
rtp_storage_stream_get_packets_for_recovery (RtpStorageStream * stream,
Packit 1f69a5
    guint8 pt_fec, guint16 lost_seq)
Packit 1f69a5
{
Packit 1f69a5
  guint ret_length = 0;
Packit 1f69a5
  GList *end = NULL;
Packit 1f69a5
  GList *start = NULL;
Packit 1f69a5
  gboolean saw_fec = TRUE;      /* To initialize the start pointer in the loop below */
Packit 1f69a5
  GList *it;
Packit 1f69a5
Packit 1f69a5
  /* Looking for media stream chunk with FEC packets at the end, which could
Packit 1f69a5
   * can have the lost packet. For example:
Packit 1f69a5
   *
Packit 1f69a5
   *   |#10 FEC|  |#9 FEC|  |#8| ... |#6|  |#5 FEC|  |#4 FEC|  |#3 FEC|  |#2|  |#1|  |#0|
Packit 1f69a5
   *
Packit 1f69a5
   * Say @lost_seq = 7. Want to return bufferlist with packets [#6 : #10]. Other
Packit 1f69a5
   * packets are not relevant for recovery of packet 7.
Packit 1f69a5
   *
Packit 1f69a5
   * Or the lost packet can be in the storage. In that case single packet is returned.
Packit 1f69a5
   * It can happen if:
Packit 1f69a5
   * - it could have arrived right after it was considered lost (more of a corner case)
Packit 1f69a5
   * - it was recovered together with the other lost packet (most likely)
Packit 1f69a5
   */
Packit 1f69a5
  for (it = stream->queue.tail; it; it = it->prev) {
Packit 1f69a5
    RtpStorageItem *item = it->data;
Packit 1f69a5
    gboolean found_end = FALSE;
Packit 1f69a5
Packit 1f69a5
    /* Is the buffer we lost in the storage? */
Packit 1f69a5
    if (item->seq == lost_seq) {
Packit 1f69a5
      start = it;
Packit 1f69a5
      end = it;
Packit 1f69a5
      ret_length = 1;
Packit 1f69a5
      break;
Packit 1f69a5
    }
Packit 1f69a5
Packit 1f69a5
    if (pt_fec == item->pt) {
Packit 1f69a5
      gint seq_diff = gst_rtp_buffer_compare_seqnum (lost_seq, item->seq);
Packit 1f69a5
Packit 1f69a5
      if (seq_diff >= 0) {
Packit 1f69a5
        if (it->prev) {
Packit 1f69a5
          gboolean media_next =
Packit 1f69a5
              pt_fec != ((RtpStorageItem *) it->prev->data)->pt;
Packit 1f69a5
          found_end = media_next;
Packit 1f69a5
        } else
Packit 1f69a5
          found_end = TRUE;
Packit 1f69a5
      }
Packit 1f69a5
      saw_fec = TRUE;
Packit 1f69a5
    } else if (saw_fec) {
Packit 1f69a5
      saw_fec = FALSE;
Packit 1f69a5
      start = it;
Packit 1f69a5
      ret_length = 0;
Packit 1f69a5
    }
Packit 1f69a5
Packit 1f69a5
    ++ret_length;
Packit 1f69a5
    if (found_end) {
Packit 1f69a5
      end = it;
Packit 1f69a5
      break;
Packit 1f69a5
    }
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  if (end && !start)
Packit 1f69a5
    start = end;
Packit 1f69a5
Packit 1f69a5
  if (start && end) {
Packit 1f69a5
    GstBufferList *ret = gst_buffer_list_new_sized (ret_length);
Packit 1f69a5
    GList *it;
Packit 1f69a5
Packit 1f69a5
    for (it = start; it != end->prev; it = it->prev)
Packit 1f69a5
      gst_buffer_list_add (ret,
Packit 1f69a5
          gst_buffer_ref (((RtpStorageItem *) it->data)->buffer));
Packit 1f69a5
    return ret;
Packit 1f69a5
  }
Packit 1f69a5
Packit 1f69a5
  return NULL;
Packit 1f69a5
}
Packit 1f69a5
Packit 1f69a5
GstBuffer *
Packit 1f69a5
rtp_storage_stream_get_redundant_packet (RtpStorageStream * stream,
Packit 1f69a5
    guint16 lost_seq)
Packit 1f69a5
{
Packit 1f69a5
  GList *it;
Packit 1f69a5
  for (it = stream->queue.head; it; it = it->next) {
Packit 1f69a5
    RtpStorageItem *item = it->data;
Packit 1f69a5
    if (item->seq == lost_seq)
Packit 1f69a5
      return gst_buffer_ref (item->buffer);
Packit 1f69a5
  }
Packit 1f69a5
  return NULL;
Packit 1f69a5
}