summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/gstrtpjitterbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtpmanager/gstrtpjitterbuffer.c')
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c253
1 files changed, 116 insertions, 137 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index 79d06788..fe85f87f 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -4,7 +4,7 @@
* Copyright 2007 Collabora Ltd,
* Copyright 2007 Nokia Corporation
* @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
- * Copyright 2007 Wim Taymans <wim@fluendo.com>
+ * Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@@ -72,7 +72,7 @@
#include "gstrtpbin-marshal.h"
#include "gstrtpjitterbuffer.h"
-#include "async_jitter_queue.h"
+#include "rtpjitterbuffer.h"
GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
#define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
@@ -87,7 +87,7 @@ GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
"Filter/Network/RTP",
"A buffer that deals with network jitter and other transmission faults",
"Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
- "Wim Taymans <wim@fluendo.com>");
+ "Wim Taymans <wim.taymans@gmail.com>");
/* RTPJitterBuffer signals and args */
enum
@@ -107,11 +107,32 @@ enum
PROP_DROP_ON_LATENCY
};
+#define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock))
+
+#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
+ JBUF_LOCK (priv); \
+ if (priv->srcresult != GST_FLOW_OK) \
+ goto label; \
+} G_STMT_END
+
+#define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
+#define JBUF_WAIT(priv) (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
+
+#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
+ JBUF_WAIT(priv); \
+ if (priv->srcresult != GST_FLOW_OK) \
+ goto label; \
+} G_STMT_END
+
+#define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
+
struct _GstRTPJitterBufferPrivate
{
GstPad *sinkpad, *srcpad;
- AsyncJitterQueue *queue;
+ RTPJitterBuffer *jbuf;
+ GMutex *jbuf_lock;
+ GCond *jbuf_cond;
/* properties */
guint latency_ms;
@@ -122,12 +143,16 @@ struct _GstRTPJitterBufferPrivate
/* the next expected seqnum */
guint32 next_seqnum;
+ /* state */
+ gboolean eos;
+
/* clock rate and rtp timestamp offset */
gint32 clock_rate;
gint64 clock_base;
/* when we are shutting down */
GstFlowReturn srcresult;
+ gboolean blocked;
/* for sync */
GstSegment segment;
@@ -292,9 +317,9 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer,
priv->latency_ms = DEFAULT_LATENCY_MS;
priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
- priv->queue = async_jitter_queue_new ();
- async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD);
- async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD);
+ priv->jbuf = rtp_jitter_buffer_new ();
+ priv->jbuf_lock = g_mutex_new ();
+ priv->jbuf_cond = g_cond_new ();
priv->waiting_seqnum = -1;
@@ -332,9 +357,9 @@ gst_rtp_jitter_buffer_dispose (GObject * object)
GstRTPJitterBuffer *jitterbuffer;
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
- if (jitterbuffer->priv->queue) {
- async_jitter_queue_unref (jitterbuffer->priv->queue);
- jitterbuffer->priv->queue = NULL;
+ if (jitterbuffer->priv->jbuf) {
+ g_object_unref (jitterbuffer->priv->jbuf);
+ jitterbuffer->priv->jbuf = NULL;
}
G_OBJECT_CLASS (parent_class)->dispose (object);
@@ -430,9 +455,6 @@ gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer,
} else
priv->next_seqnum = -1;
- async_jitter_queue_set_max_queue_length (priv->queue,
- priv->latency_ms * priv->clock_rate / 1000);
-
return TRUE;
/* ERRORS */
@@ -470,34 +492,24 @@ gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
}
static void
-free_func (gpointer data, GstRTPJitterBuffer * user_data)
-{
- if (GST_IS_BUFFER (data))
- gst_buffer_unref (GST_BUFFER_CAST (data));
- else
- gst_event_unref (GST_EVENT_CAST (data));
-}
-
-static void
gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer)
{
GstRTPJitterBufferPrivate *priv;
priv = jitterbuffer->priv;
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
/* mark ourselves as flushing */
priv->srcresult = GST_FLOW_WRONG_STATE;
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */
- async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue,
- (GFunc) free_func, jitterbuffer);
+ JBUF_SIGNAL (priv);
+ rtp_jitter_buffer_flush (priv->jbuf);
/* unlock clock, we just unschedule, the entry will be released by the
* locking streaming thread. */
if (priv->clock_id)
gst_clock_id_unschedule (priv->clock_id);
-
- async_jitter_queue_unlock (priv->queue);
+ JBUF_UNLOCK (priv);
}
static void
@@ -507,7 +519,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
priv = jitterbuffer->priv;
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
/* Mark as non flushing */
priv->srcresult = GST_FLOW_OK;
@@ -515,9 +527,8 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer)
priv->last_popped_seqnum = -1;
priv->next_seqnum = -1;
priv->clock_rate = -1;
- /* allow pops from the src pad task */
- async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue);
- async_jitter_queue_unlock (priv->queue);
+ priv->eos = FALSE;
+ JBUF_UNLOCK (priv);
}
static gboolean
@@ -566,21 +577,20 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
/* reset negotiated values */
priv->clock_rate = -1;
priv->clock_base = -1;
/* block until we go to PLAYING */
- async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
- TRUE);
- async_jitter_queue_unlock (priv->queue);
+ priv->blocked = TRUE;
+ JBUF_UNLOCK (priv);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
/* unblock to allow streaming in PLAYING */
- async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
- FALSE);
- async_jitter_queue_unlock (priv->queue);
+ priv->blocked = FALSE;
+ JBUF_SIGNAL (priv);
+ JBUF_UNLOCK (priv);
break;
default:
break;
@@ -596,11 +606,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
/* block to stop streaming when PAUSED */
- async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue,
- TRUE);
- async_jitter_queue_unlock (priv->queue);
+ priv->blocked = TRUE;
+ JBUF_UNLOCK (priv);
if (ret != GST_STATE_CHANGE_FAILURE)
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
@@ -630,30 +639,6 @@ priv_compare_rtp_seq_lt (guint16 a, guint16 b)
}
}
-/**
- * gets the seqnum from the buffers and compare them
- */
-static gint
-compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b)
-{
- gint ret;
-
- if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) {
- /* two buffers */
- ret = priv_compare_rtp_seq_lt
- (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)),
- gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b)));
- } else {
- /* one of them is an event, the event always goes before the other element
- * so we return -1. */
- if (GST_IS_EVENT (a))
- ret = -1;
- else
- ret = 1;
- }
- return ret;
-}
-
static gboolean
gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
{
@@ -707,16 +692,20 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
case GST_EVENT_EOS:
{
/* push EOS in queue. We always push it at the head */
- async_jitter_queue_lock (priv->queue);
- GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
+ JBUF_LOCK (priv);
/* check for flushing, we need to discard the event and return FALSE when
* we are flushing */
ret = priv->srcresult == GST_FLOW_OK;
- if (ret)
- async_jitter_queue_push_unlocked (priv->queue, event);
- else
+ if (ret) {
+ GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
+ priv->eos = TRUE;
+ JBUF_SIGNAL (priv);
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
+ gst_flow_get_name (priv->srcresult));
gst_event_unref (event);
- async_jitter_queue_unlock (priv->queue);
+ }
+ JBUF_UNLOCK (priv);
break;
}
default:
@@ -780,7 +769,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
GstRTPJitterBuffer *jitterbuffer;
GstRTPJitterBufferPrivate *priv;
guint16 seqnum;
- GstFlowReturn ret;
+ GstFlowReturn ret = GST_FLOW_OK;
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
@@ -803,10 +792,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
seqnum = gst_rtp_buffer_get_seq (buffer);
GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
- async_jitter_queue_lock (priv->queue);
- ret = priv->srcresult;
- if (ret != GST_FLOW_OK)
- goto out_flushing;
+ JBUF_LOCK_CHECK (priv, out_flushing);
+ /* don't accept more data on EOS */
+ if (priv->eos)
+ goto have_eos;
/* let's check if this buffer is too late, we cannot accept packets with
* bigger seqnum than the one we already pushed. */
@@ -818,14 +807,18 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
/* let's drop oldest packet if the queue is already full and drop-on-latency
* is set. */
if (priv->drop_on_latency) {
- if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >=
- priv->latency_ms * priv->clock_rate / 1000) {
+ guint64 latency_ts;
+
+ latency_ts =
+ gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
+
+ if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
GstBuffer *old_buf;
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
seqnum);
- old_buf = async_jitter_queue_pop_unlocked (priv->queue);
+ old_buf = rtp_jitter_buffer_pop (priv->jbuf);
gst_buffer_unref (old_buf);
}
}
@@ -833,10 +826,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
- if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer,
- (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL))
+ if (!rtp_jitter_buffer_insert (priv->jbuf, buffer))
goto duplicate;
+ /* signal addition of new buffer */
+ JBUF_SIGNAL (priv);
+
/* let's unschedule and unblock any waiting buffers. We only want to do this
* if there is a currently waiting newer (> seqnum) buffer */
if (priv->clock_id) {
@@ -846,11 +841,11 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
}
}
- GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d",
- seqnum, async_jitter_queue_length_unlocked (priv->queue));
+ GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
+ seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
finished:
- async_jitter_queue_unlock (priv->queue);
+ JBUF_UNLOCK (priv);
gst_object_unref (jitterbuffer);
@@ -875,10 +870,18 @@ not_negotiated:
}
out_flushing:
{
+ ret = priv->srcresult;
GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
gst_buffer_unref (buffer);
goto finished;
}
+have_eos:
+ {
+ ret = GST_FLOW_UNEXPECTED;
+ GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
too_late:
{
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
@@ -908,8 +911,7 @@ static void
gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
{
GstRTPJitterBufferPrivate *priv;
- gpointer elem;
- GstBuffer *outbuf;
+ GstBuffer *outbuf = NULL;
GstFlowReturn result;
guint16 seqnum;
guint32 rtp_time;
@@ -918,44 +920,24 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer)
priv = jitterbuffer->priv;
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK_CHECK (priv, flushing);
again:
GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
- /* pop a buffer, we will get NULL if the queue was shut down */
- elem = async_jitter_queue_pop_unlocked (priv->queue);
- if (!elem)
- goto no_elem;
-
- /* special code for events */
- if (G_UNLIKELY (GST_IS_EVENT (elem))) {
- GstEvent *event = GST_EVENT_CAST (elem);
-
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_EOS:
- GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue");
- /* we don't expect more data now, makes upstream perform EOS actions */
- priv->srcresult = GST_FLOW_UNEXPECTED;
- break;
- default:
- GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue",
- GST_EVENT_TYPE_NAME (event));
- break;
- }
- async_jitter_queue_unlock (priv->queue);
-
- /* push event */
- gst_pad_push_event (priv->srcpad, event);
- return;
+ /* wait if we are blocked or don't have a packet and eos */
+ while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf)
+ || priv->eos)) {
+ JBUF_WAIT_CHECK (priv, flushing);
}
+ if (priv->eos)
+ goto do_eos;
- /* we know it's a buffer now */
- outbuf = GST_BUFFER_CAST (elem);
+ /* pop a buffer, we must have a buffer now */
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf);
seqnum = gst_rtp_buffer_get_seq (outbuf);
- GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d",
- gst_rtp_buffer_get_seq (outbuf),
- async_jitter_queue_length_unlocked (priv->queue));
+ GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, now %d left",
+ seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
/* If we don't know what the next seqnum should be (== -1) we have to wait
* because it might be possible that we are not receiving this buffer in-order,
@@ -1032,11 +1014,11 @@ again:
GST_OBJECT_UNLOCK (jitterbuffer);
/* release the lock so that the other end can push stuff or unlock */
- async_jitter_queue_unlock (priv->queue);
+ JBUF_UNLOCK (priv);
ret = gst_clock_id_wait (id, &jitter);
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
@@ -1054,8 +1036,7 @@ again:
GST_DEBUG_OBJECT (jitterbuffer,
"Wait got unscheduled, will retry to push with new buffer");
/* reinserting popped buffer into queue */
- if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf,
- (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) {
+ if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) {
GST_DEBUG_OBJECT (jitterbuffer,
"Duplicate packet #%d detected, dropping", seqnum);
priv->num_duplicates++;
@@ -1087,7 +1068,7 @@ push_buffer:
* so the other end can push stuff in the queue again. */
priv->last_popped_seqnum = seqnum;
priv->next_seqnum = (seqnum + 1) & 0xffff;
- async_jitter_queue_unlock (priv->queue);
+ JBUF_UNLOCK (priv);
/* push buffer */
GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
@@ -1098,20 +1079,22 @@ push_buffer:
return;
/* ERRORS */
-no_elem:
+do_eos:
{
/* store result, we are flushing now */
- GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing");
- priv->srcresult = GST_FLOW_WRONG_STATE;
+ GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
+ priv->srcresult = GST_FLOW_UNEXPECTED;
gst_pad_pause_task (priv->srcpad);
- async_jitter_queue_unlock (priv->queue);
+ gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
+ JBUF_UNLOCK (priv);
return;
}
flushing:
{
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- gst_buffer_unref (outbuf);
- async_jitter_queue_unlock (priv->queue);
+ if (outbuf)
+ gst_buffer_unref (outbuf);
+ JBUF_UNLOCK (priv);
return;
}
pause:
@@ -1120,13 +1103,13 @@ pause:
GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
- async_jitter_queue_lock (priv->queue);
+ JBUF_LOCK (priv);
/* store result */
priv->srcresult = result;
/* we don't post errors or anything because upstream will do that for us
* when we pass the return value upstream. */
gst_pad_pause_task (priv->srcpad);
- async_jitter_queue_unlock (priv->queue);
+ JBUF_UNLOCK (priv);
return;
}
}
@@ -1194,11 +1177,7 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
old_latency = jitterbuffer->priv->latency_ms;
jitterbuffer->priv->latency_ms = new_latency;
- if (jitterbuffer->priv->clock_rate != -1) {
- async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue,
- gst_util_uint64_scale_int (new_latency,
- jitterbuffer->priv->clock_rate, 1000));
- }
+
/* post message if latency changed, this will infor the parent pipeline
* that a latency reconfiguration is possible. */
if (new_latency != old_latency) {