|
Packit |
1f69a5 |
/* GStreamer plugin for forward error correction
|
|
Packit |
1f69a5 |
* Copyright (C) 2017 Pexip
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* This library is free software; you can redistribute it and/or
|
|
Packit |
1f69a5 |
* modify it under the terms of the GNU Lesser General Public
|
|
Packit |
1f69a5 |
* License as published by the Free Software Foundation; either
|
|
Packit |
1f69a5 |
* version 2.1 of the License, or (at your option) any later version.
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* This library is distributed in the hope that it will be useful,
|
|
Packit |
1f69a5 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
Packit |
1f69a5 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Packit |
1f69a5 |
* Lesser General Public License for more details.
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* You should have received a copy of the GNU Lesser General Public
|
|
Packit |
1f69a5 |
* License along with this library; if not, write to the Free Software
|
|
Packit |
1f69a5 |
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* Author: Mikhail Fludkov <misha@pexip.com>
|
|
Packit |
1f69a5 |
*/
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
#include "rtpstoragestream.h"
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
static RtpStorageItem *
|
|
Packit |
1f69a5 |
rtp_storage_item_new (GstBuffer * buffer, guint8 pt, guint16 seq)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
RtpStorageItem *ret = g_slice_new0 (RtpStorageItem);
|
|
Packit |
1f69a5 |
ret->buffer = buffer;
|
|
Packit |
1f69a5 |
ret->pt = pt;
|
|
Packit |
1f69a5 |
ret->seq = seq;
|
|
Packit |
1f69a5 |
return ret;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
static void
|
|
Packit |
1f69a5 |
rtp_storage_item_free (RtpStorageItem * item)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
g_assert (item->buffer != NULL);
|
|
Packit |
1f69a5 |
gst_buffer_unref (item->buffer);
|
|
Packit |
1f69a5 |
g_slice_free (RtpStorageItem, item);
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
static gint
|
|
Packit |
1f69a5 |
rtp_storage_item_compare (gconstpointer a, gconstpointer b, gpointer userdata)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
gint seq_diff = gst_rtp_buffer_compare_seqnum (
|
|
Packit |
1f69a5 |
((RtpStorageItem const *) a)->seq, ((RtpStorageItem const *) b)->seq);
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
if (seq_diff >= 0)
|
|
Packit |
1f69a5 |
return 0;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
return 1;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
static void
|
|
Packit |
1f69a5 |
rtp_storage_stream_resize (RtpStorageStream * stream, GstClockTime size_time)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
GList *it;
|
|
Packit |
1f69a5 |
guint i, too_old_buffers_num = 0;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
g_assert (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time));
|
|
Packit |
1f69a5 |
g_assert (GST_CLOCK_TIME_IS_VALID (size_time));
|
|
Packit |
1f69a5 |
g_assert_cmpint (size_time, >, 0);
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
/* Iterating from oldest sequence numbers to newest */
|
|
Packit |
1f69a5 |
for (i = 0, it = stream->queue.tail; it; it = it->prev, ++i) {
|
|
Packit |
1f69a5 |
RtpStorageItem *item = it->data;
|
|
Packit |
1f69a5 |
GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (item->buffer);
|
|
Packit |
1f69a5 |
if (GST_CLOCK_TIME_IS_VALID (arrival_time)) {
|
|
Packit |
1f69a5 |
if (stream->max_arrival_time - arrival_time > size_time) {
|
|
Packit |
1f69a5 |
too_old_buffers_num = i + 1;
|
|
Packit |
1f69a5 |
} else
|
|
Packit |
1f69a5 |
break;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
for (i = 0; i < too_old_buffers_num; ++i) {
|
|
Packit |
1f69a5 |
RtpStorageItem *item = g_queue_pop_tail (&stream->queue);
|
|
Packit |
1f69a5 |
rtp_storage_item_free (item);
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
void
|
|
Packit |
1f69a5 |
rtp_storage_stream_resize_and_add_item (RtpStorageStream * stream,
|
|
Packit |
1f69a5 |
GstClockTime size_time, GstBuffer * buffer, guint8 pt, guint16 seq)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (buffer);
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (arrival_time))) {
|
|
Packit |
1f69a5 |
if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time)))
|
|
Packit |
1f69a5 |
stream->max_arrival_time = MAX (stream->max_arrival_time, arrival_time);
|
|
Packit |
1f69a5 |
else
|
|
Packit |
1f69a5 |
stream->max_arrival_time = arrival_time;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
rtp_storage_stream_resize (stream, size_time);
|
|
Packit |
1f69a5 |
rtp_storage_stream_add_item (stream, buffer, pt, seq);
|
|
Packit |
1f69a5 |
} else {
|
|
Packit |
1f69a5 |
rtp_storage_stream_add_item (stream, buffer, pt, seq);
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
RtpStorageStream *
|
|
Packit |
1f69a5 |
rtp_storage_stream_new (guint32 ssrc)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
RtpStorageStream *ret = g_slice_new0 (RtpStorageStream);
|
|
Packit |
1f69a5 |
ret->max_arrival_time = GST_CLOCK_TIME_NONE;
|
|
Packit |
1f69a5 |
ret->ssrc = ssrc;
|
|
Packit |
1f69a5 |
g_mutex_init (&ret->stream_lock);
|
|
Packit |
1f69a5 |
return ret;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
void
|
|
Packit |
1f69a5 |
rtp_storage_stream_free (RtpStorageStream * stream)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
STREAM_LOCK (stream);
|
|
Packit |
1f69a5 |
while (stream->queue.length)
|
|
Packit |
1f69a5 |
rtp_storage_item_free (g_queue_pop_tail (&stream->queue));
|
|
Packit |
1f69a5 |
STREAM_UNLOCK (stream);
|
|
Packit |
1f69a5 |
g_mutex_clear (&stream->stream_lock);
|
|
Packit |
1f69a5 |
g_slice_free (RtpStorageStream, stream);
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
void
|
|
Packit |
1f69a5 |
rtp_storage_stream_add_item (RtpStorageStream * stream, GstBuffer * buffer,
|
|
Packit |
1f69a5 |
guint8 pt, guint16 seq)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
RtpStorageItem *item = rtp_storage_item_new (buffer, pt, seq);
|
|
Packit |
1f69a5 |
GList *sibling = g_queue_find_custom (&stream->queue, item,
|
|
Packit |
1f69a5 |
(GCompareFunc) rtp_storage_item_compare);
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
g_queue_insert_before (&stream->queue, sibling, item);
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
GstBufferList *
|
|
Packit |
1f69a5 |
rtp_storage_stream_get_packets_for_recovery (RtpStorageStream * stream,
|
|
Packit |
1f69a5 |
guint8 pt_fec, guint16 lost_seq)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
guint ret_length = 0;
|
|
Packit |
1f69a5 |
GList *end = NULL;
|
|
Packit |
1f69a5 |
GList *start = NULL;
|
|
Packit |
1f69a5 |
gboolean saw_fec = TRUE; /* To initialize the start pointer in the loop below */
|
|
Packit |
1f69a5 |
GList *it;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
/* Looking for media stream chunk with FEC packets at the end, which could
|
|
Packit |
1f69a5 |
* can have the lost packet. For example:
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* |#10 FEC| |#9 FEC| |#8| ... |#6| |#5 FEC| |#4 FEC| |#3 FEC| |#2| |#1| |#0|
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* Say @lost_seq = 7. Want to return bufferlist with packets [#6 : #10]. Other
|
|
Packit |
1f69a5 |
* packets are not relevant for recovery of packet 7.
|
|
Packit |
1f69a5 |
*
|
|
Packit |
1f69a5 |
* Or the lost packet can be in the storage. In that case single packet is returned.
|
|
Packit |
1f69a5 |
* It can happen if:
|
|
Packit |
1f69a5 |
* - it could have arrived right after it was considered lost (more of a corner case)
|
|
Packit |
1f69a5 |
* - it was recovered together with the other lost packet (most likely)
|
|
Packit |
1f69a5 |
*/
|
|
Packit |
1f69a5 |
for (it = stream->queue.tail; it; it = it->prev) {
|
|
Packit |
1f69a5 |
RtpStorageItem *item = it->data;
|
|
Packit |
1f69a5 |
gboolean found_end = FALSE;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
/* Is the buffer we lost in the storage? */
|
|
Packit |
1f69a5 |
if (item->seq == lost_seq) {
|
|
Packit |
1f69a5 |
start = it;
|
|
Packit |
1f69a5 |
end = it;
|
|
Packit |
1f69a5 |
ret_length = 1;
|
|
Packit |
1f69a5 |
break;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
if (pt_fec == item->pt) {
|
|
Packit |
1f69a5 |
gint seq_diff = gst_rtp_buffer_compare_seqnum (lost_seq, item->seq);
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
if (seq_diff >= 0) {
|
|
Packit |
1f69a5 |
if (it->prev) {
|
|
Packit |
1f69a5 |
gboolean media_next =
|
|
Packit |
1f69a5 |
pt_fec != ((RtpStorageItem *) it->prev->data)->pt;
|
|
Packit |
1f69a5 |
found_end = media_next;
|
|
Packit |
1f69a5 |
} else
|
|
Packit |
1f69a5 |
found_end = TRUE;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
saw_fec = TRUE;
|
|
Packit |
1f69a5 |
} else if (saw_fec) {
|
|
Packit |
1f69a5 |
saw_fec = FALSE;
|
|
Packit |
1f69a5 |
start = it;
|
|
Packit |
1f69a5 |
ret_length = 0;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
++ret_length;
|
|
Packit |
1f69a5 |
if (found_end) {
|
|
Packit |
1f69a5 |
end = it;
|
|
Packit |
1f69a5 |
break;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
if (end && !start)
|
|
Packit |
1f69a5 |
start = end;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
if (start && end) {
|
|
Packit |
1f69a5 |
GstBufferList *ret = gst_buffer_list_new_sized (ret_length);
|
|
Packit |
1f69a5 |
GList *it;
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
for (it = start; it != end->prev; it = it->prev)
|
|
Packit |
1f69a5 |
gst_buffer_list_add (ret,
|
|
Packit |
1f69a5 |
gst_buffer_ref (((RtpStorageItem *) it->data)->buffer));
|
|
Packit |
1f69a5 |
return ret;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
return NULL;
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
|
|
Packit |
1f69a5 |
GstBuffer *
|
|
Packit |
1f69a5 |
rtp_storage_stream_get_redundant_packet (RtpStorageStream * stream,
|
|
Packit |
1f69a5 |
guint16 lost_seq)
|
|
Packit |
1f69a5 |
{
|
|
Packit |
1f69a5 |
GList *it;
|
|
Packit |
1f69a5 |
for (it = stream->queue.head; it; it = it->next) {
|
|
Packit |
1f69a5 |
RtpStorageItem *item = it->data;
|
|
Packit |
1f69a5 |
if (item->seq == lost_seq)
|
|
Packit |
1f69a5 |
return gst_buffer_ref (item->buffer);
|
|
Packit |
1f69a5 |
}
|
|
Packit |
1f69a5 |
return NULL;
|
|
Packit |
1f69a5 |
}
|