diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2007-09-16 19:40:31 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2007-09-16 19:40:31 +0000 |
commit | 04d3b8290698e41034809e8baec11622ca128243 (patch) | |
tree | df6c408b0b15acbf31a1743d7ce7a882b47a9b7d | |
parent | 51990d65dc103c9355bb83ef4bc75f7e1eae0ac4 (diff) | |
download | gst-plugins-bad-04d3b8290698e41034809e8baec11622ca128243.tar.gz gst-plugins-bad-04d3b8290698e41034809e8baec11622ca128243.tar.bz2 gst-plugins-bad-04d3b8290698e41034809e8baec11622ca128243.zip |
gst/rtpmanager/gstrtpbin.c: Use lock to protect variable.
Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_set_property),
(gst_rtp_bin_get_property):
Use lock to protect variable.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_class_init),
(gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_chain),
(convert_rtptime_to_gsttime), (gst_rtp_jitter_buffer_loop):
Reconstruct GST timestamp from RTP timestamps based on measured clock
skew and sync offset.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_init),
(rtp_jitter_buffer_set_tail_changed),
(rtp_jitter_buffer_set_clock_rate),
(rtp_jitter_buffer_get_clock_rate), (calculate_skew),
(rtp_jitter_buffer_insert), (rtp_jitter_buffer_peek):
* gst/rtpmanager/rtpjitterbuffer.h:
Measure clock skew.
Add callback to be notfied when a new packet was inserted at the tail.
* gst/rtpmanager/rtpsource.c: (rtp_source_init),
(calculate_jitter), (rtp_source_send_rtp):
* gst/rtpmanager/rtpsource.h:
Remove clock skew detection, it's move to the jitterbuffer now.
-rw-r--r-- | ChangeLog | 27 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 4 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.c | 153 | ||||
-rw-r--r-- | gst/rtpmanager/rtpjitterbuffer.c | 216 | ||||
-rw-r--r-- | gst/rtpmanager/rtpjitterbuffer.h | 46 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsource.c | 53 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsource.h | 8 |
7 files changed, 368 insertions, 139 deletions
@@ -1,5 +1,32 @@ 2007-09-16 Wim Taymans <wim.taymans@gmail.com> + * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_set_property), + (gst_rtp_bin_get_property): + Use lock to protect variable. + + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_class_init), + (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_chain), + (convert_rtptime_to_gsttime), (gst_rtp_jitter_buffer_loop): + Reconstruct GST timestamp from RTP timestamps based on measured clock + skew and sync offset. + + * gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_init), + (rtp_jitter_buffer_set_tail_changed), + (rtp_jitter_buffer_set_clock_rate), + (rtp_jitter_buffer_get_clock_rate), (calculate_skew), + (rtp_jitter_buffer_insert), (rtp_jitter_buffer_peek): + * gst/rtpmanager/rtpjitterbuffer.h: + Measure clock skew. + Add callback to be notfied when a new packet was inserted at the tail. + + * gst/rtpmanager/rtpsource.c: (rtp_source_init), + (calculate_jitter), (rtp_source_send_rtp): + * gst/rtpmanager/rtpsource.h: + Remove clock skew detection, it's move to the jitterbuffer now. + +2007-09-16 Wim Taymans <wim.taymans@gmail.com> + Patch by: Daniel Charles <dcharles at ti dot com> * ext/amrwb/gstamrwbenc.c: (gst_amrwbenc_bandmode_get_type), diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index eb028fb1..cdbdaf64 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -1183,7 +1183,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: + GST_RTP_BIN_LOCK (rtpbin); rtpbin->latency = g_value_get_uint (value); + GST_RTP_BIN_UNLOCK (rtpbin); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1201,7 +1203,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: + GST_RTP_BIN_LOCK (rtpbin); g_value_set_uint (value, rtpbin->latency); + GST_RTP_BIN_UNLOCK (rtpbin); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 327ff0a3..57be42ec 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -320,7 +320,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map); GST_DEBUG_CATEGORY_INIT - (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer"); + (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer"); } static void @@ -453,6 +453,8 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, if (priv->clock_rate <= 0) goto wrong_rate; + rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate); + GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate); /* gah, clock-base is uint. If we don't have a base, we will use the first @@ -794,6 +796,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) GstRtpJitterBufferPrivate *priv; guint16 seqnum; GstFlowReturn ret = GST_FLOW_OK; + GstClockTime timestamp; jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); @@ -811,10 +814,23 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt); if (priv->clock_rate == -1) goto not_negotiated; + + rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate); } + /* take the timestamp of the buffer. This is the time when the packet was + * received and is used to calculate jitter and clock skew. We will adjust + * this timestamp with the smoothed value after processing it in the + * jitterbuffer. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + /* bring to running time */ + timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, + timestamp); + seqnum = gst_rtp_buffer_get_seq (buffer); - GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum); + GST_DEBUG_OBJECT (jitterbuffer, + "Received packet #%d at time %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (timestamp)); JBUF_LOCK_CHECK (priv, out_flushing); /* don't accept more data on EOS */ @@ -852,7 +868,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ - if (!rtp_jitter_buffer_insert (priv->jbuf, buffer)) + if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp)) goto duplicate; /* signal addition of new buffer */ @@ -926,6 +942,37 @@ duplicate: } } +static GstClockTime +convert_rtptime_to_gsttime (GstRtpJitterBuffer * jitterbuffer, + guint64 exttimestamp) +{ + GstClockTime timestamp; + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + /* construct a timestamp from the RTP timestamp now. We don't apply this + * timestamp to the outgoing buffer yet as the popped buffer might not be the + * one we need to push out right now. */ + timestamp = + gst_util_uint64_scale_int (exttimestamp, GST_SECOND, priv->clock_rate); + + /* apply first observed timestamp */ + timestamp += priv->jbuf->base_time; + + /* apply the current clock skew */ + timestamp += priv->jbuf->skew; + + /* apply the timestamp offset */ + timestamp += priv->ts_offset; + + /* add latency, this includes our own latency and the peer latency. */ + timestamp += (priv->latency_ms * GST_MSECOND); + timestamp += priv->peer_latency; + + return timestamp; +} + /** * This funcion will push out buffers on the source pad. * @@ -942,9 +989,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) guint16 seqnum; guint32 rtp_time; GstClockTime timestamp; - gint64 running_time; guint64 exttimestamp; - gint ts_offset_rtp; priv = jitterbuffer->priv; @@ -968,19 +1013,29 @@ again: /* pop a buffer, we must have a buffer now */ outbuf = rtp_jitter_buffer_pop (priv->jbuf); - seqnum = gst_rtp_buffer_get_seq (outbuf); - /* get the max deadline to wait for the missing packets, this is the time - * of the currently popped packet */ + /* construct extended RTP timestamp from packet */ rtp_time = gst_rtp_buffer_get_timestamp (outbuf); exttimestamp = gst_rtp_buffer_ext_timestamp (&priv->exttimestamp, rtp_time); + /* if no clock_base was given, take first ts as base */ + if (priv->clock_base == -1) { + GST_DEBUG_OBJECT (jitterbuffer, + "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp); + priv->clock_base = exttimestamp; + } + /* subtract the base clock time so that we start counting from 0 */ + exttimestamp -= priv->clock_base; + GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT ", now %d left", seqnum, rtp_time, exttimestamp, rtp_jitter_buffer_num_packets (priv->jbuf)); + /* convert the RTP timestamp to a gstreamer timestamp. */ + timestamp = convert_rtptime_to_gsttime (jitterbuffer, exttimestamp); + /* If we don't know what the next seqnum should be (== -1) we have to wait * because it might be possible that we are not receiving this buffer in-order, * a buffer with a lower seqnum could arrive later and we want to push that @@ -991,7 +1046,7 @@ again: * packet expires. */ if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) { GstClockID id; - GstClockTimeDiff jitter; + GstClockTime sync_time; GstClockReturn ret; GstClock *clock; @@ -1007,34 +1062,6 @@ again: GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum); } - GST_DEBUG_OBJECT (jitterbuffer, - "exttimestamp %" G_GUINT64_FORMAT ", base %" G_GINT64_FORMAT, - exttimestamp, priv->clock_base); - - /* if no clock_base was given, take first ts as base */ - if (priv->clock_base == -1) { - GST_DEBUG_OBJECT (jitterbuffer, - "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp); - priv->clock_base = exttimestamp; - } - - /* take rtp timestamp offset into account, this should not wrap around since - * we are dealing with the extended timestamp here. */ - exttimestamp -= priv->clock_base; - - /* bring timestamp to gst time */ - timestamp = - gst_util_uint64_scale_int (exttimestamp, GST_SECOND, priv->clock_rate); - - GST_DEBUG_OBJECT (jitterbuffer, - "exttimestamp %" G_GUINT64_FORMAT ", clock-rate %u, timestamp %" - GST_TIME_FORMAT, exttimestamp, priv->clock_rate, - GST_TIME_ARGS (timestamp)); - - /* bring to running time */ - running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, - timestamp); - GST_OBJECT_LOCK (jitterbuffer); clock = GST_ELEMENT_CLOCK (jitterbuffer); if (!clock) { @@ -1043,25 +1070,21 @@ again: goto push_buffer; } - /* add latency, this includes our own latency and the peer latency. */ - running_time += (priv->latency_ms * GST_MSECOND); - running_time += priv->peer_latency; - - GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time)); + GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT, + GST_TIME_ARGS (timestamp)); /* prepare for sync against clock */ - running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time; + sync_time = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time; /* create an entry for the clock */ - id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time); + id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); priv->waiting_seqnum = seqnum; GST_OBJECT_UNLOCK (jitterbuffer); /* release the lock so that the other end can push stuff or unlock */ JBUF_UNLOCK (priv); - ret = gst_clock_id_wait (id, &jitter); + ret = gst_clock_id_wait (id, NULL); JBUF_LOCK (priv); /* and free the entry */ @@ -1080,8 +1103,9 @@ again: if (ret == GST_CLOCK_UNSCHEDULED) { GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled, will retry to push with new buffer"); - /* reinsert popped buffer into queue */ - if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) { + /* reinsert popped buffer into queue, no need to recalculate skew, we do + * that when inserting the buffer in the chain function */ + if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf, -1)) { GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; @@ -1089,6 +1113,9 @@ again: } goto again; } + /* After waiting, we might have a better estimate of skew, generate a new + * timestamp before pushing out the buffer */ + timestamp = convert_rtptime_to_gsttime (jitterbuffer, exttimestamp); } push_buffer: /* check if we are pushing something unexpected */ @@ -1105,37 +1132,13 @@ push_buffer: /* update stats */ priv->num_late += dropped; - /* set DISCONT flag */ + /* set DISCONT flag when we missed a packet. */ outbuf = gst_buffer_make_metadata_writable (outbuf); GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); } - /* apply the timestamp offset */ - if (priv->ts_offset > 0) - ts_offset_rtp = - gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate, - GST_SECOND); - else if (priv->ts_offset < 0) - ts_offset_rtp = - -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate, - GST_SECOND); - else - ts_offset_rtp = 0; - - if (ts_offset_rtp != 0) { - guint32 timestamp; - - /* if the offset changed, mark with discont */ - if (priv->ts_offset != priv->prev_ts_offset) { - GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp); - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); - priv->prev_ts_offset = priv->ts_offset; - } - - timestamp = gst_rtp_buffer_get_timestamp (outbuf); - timestamp += ts_offset_rtp; - gst_rtp_buffer_set_timestamp (outbuf, timestamp); - } + /* apply timestamp to buffer now */ + GST_BUFFER_TIMESTAMP (outbuf) = timestamp; /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ diff --git a/gst/rtpmanager/rtpjitterbuffer.c b/gst/rtpmanager/rtpjitterbuffer.c index c36a25c5..7260e9ee 100644 --- a/gst/rtpmanager/rtpjitterbuffer.c +++ b/gst/rtpmanager/rtpjitterbuffer.c @@ -61,7 +61,20 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass) static void rtp_jitter_buffer_init (RTPJitterBuffer * jbuf) { + gint i; + jbuf->packets = g_queue_new (); + jbuf->base_time = -1; + jbuf->base_rtptime = -1; + jbuf->ext_rtptime = -1; + + for (i = 0; i < 100; i++) { + jbuf->window[i] = 0; + } + jbuf->window_pos = 0; + jbuf->window_filling = TRUE; + jbuf->window_min = 0; + jbuf->skew = 0; } static void @@ -94,6 +107,168 @@ rtp_jitter_buffer_new (void) return jbuf; } +void +rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer * jbuf, RTPTailChanged func, + gpointer user_data) +{ + g_return_if_fail (jbuf != NULL); + + jbuf->tail_changed = func; + jbuf->user_data = user_data; +} + +void +rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer * jbuf, gint clock_rate) +{ + g_return_if_fail (jbuf != NULL); + + jbuf->clock_rate = clock_rate; +} + +gint +rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf) +{ + g_return_val_if_fail (jbuf != NULL, 0); + + return jbuf->clock_rate; +} + + +/* For the clock skew we use a windowed low point averaging algorithm as can be + * found in http://www.grame.fr/pub/TR-050601.pdf. The idea is that the jitter is + * composed of: + * + * J = N + n + * + * N : a constant network delay. + * n : random added noise. The noise is concentrated around 0 + * + * In the receiver we can track the elapsed time at the sender with: + * + * send_diff(i) = (Tsi - Ts0); + * + * Tsi : The time at the sender at packet i + * Ts0 : The time at the sender at the first packet + * + * This is the difference between the RTP timestamp in the first received packet + * and the current packet. + * + * At the receiver we have to deal with the jitter introduced by the network. + * + * recv_diff(i) = (Tri - Tr0) + * + * Tri : The time at the receiver at packet i + * Tr0 : The time at the receiver at the first packet + * + * Both of these values contain a jitter Ji, a jitter for packet i, so we can + * write: + * + * recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0)) + * + * Cri : The time of the clock at the receiver for packet i + * D + ni : The jitter when receiving packet i + * + * We see that the network delay is irrelevant here as we can elliminate D: + * + * recv_diff(i) = (Cri + ni) - (Cr0 + n0)) + * + * The drift is now expressed as: + * + * Drift(i) = recv_diff(i) - send_diff(i); + * + * We now keep the W latest values of Drift and find the minimum (this is the + * one with the lowest network jitter and thus the one which is least affected + * by it). We average this lowest value to smooth out the resulting network skew. + * + * Both the window and the weighting used for averaging influence the accuracy + * of the drift estimation. Finding the correct parameters turns out to be a + * compromise between accuracy and inertia. + */ +static void +calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time) +{ + guint64 ext_rtptime; + guint64 send_diff, recv_diff; + gint64 delta; + gint64 old; + gint pos, i; + GstClockTime gstrtptime; + + ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime); + + gstrtptime = + gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate); + + /* first time, lock on to time and gstrtptime */ + if (jbuf->base_time == -1) + jbuf->base_time = time; + if (jbuf->base_rtptime == -1) + jbuf->base_rtptime = gstrtptime; + + /* elapsed time at sender */ + send_diff = gstrtptime - jbuf->base_rtptime; + /* elapsed time at receiver, includes the jitter */ + recv_diff = time - jbuf->base_time; + + /* measure the diff */ + delta = ((gint64) recv_diff) - ((gint64) send_diff); + + pos = jbuf->window_pos; + + if (jbuf->window_filling) { + /* we are filling the window */ + GST_DEBUG ("filling %d %" G_GINT64_FORMAT, pos, delta); + jbuf->window[pos++] = delta; + /* calc the min delta we observed */ + if (pos == 1 || delta < jbuf->window_min) + jbuf->window_min = delta; + + if (pos >= 100) { + /* window filled, fill window with min */ + GST_DEBUG ("min %" G_GINT64_FORMAT, jbuf->window_min); + for (i = 0; i < 100; i++) + jbuf->window[i] = jbuf->window_min; + + /* the skew is initially the min */ + jbuf->skew = jbuf->window_min; + jbuf->window_filling = FALSE; + } + } else { + /* pick old value and store new value. We keep the previous value in order + * to quickly check if the min of the window changed */ + old = jbuf->window[pos]; + jbuf->window[pos++] = delta; + + if (delta <= jbuf->window_min) { + /* if the new value we inserted is smaller or equal to the current min, + * it becomes the new min */ + jbuf->window_min = delta; + } else if (old == jbuf->window_min) { + gint64 min = G_MAXINT64; + + /* if we removed the old min, we have to find a new min */ + for (i = 0; i < 100; i++) { + /* we found another value equal to the old min, we can stop searching now */ + if (jbuf->window[i] == old) { + min = old; + break; + } + if (jbuf->window[i] < min) + min = jbuf->window[i]; + } + jbuf->window_min = min; + } + /* average the min values */ + jbuf->skew = (jbuf->window_min + (15 * jbuf->skew)) / 16; + GST_DEBUG ("new min: %" G_GINT64_FORMAT ", skew %" G_GINT64_FORMAT, + jbuf->window_min, jbuf->skew); + } + /* wrap around in the window */ + if (pos >= 100) + pos = 0; + jbuf->window_pos = pos; +} + static gint compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf) { @@ -115,6 +290,7 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf) * rtp_jitter_buffer_insert: * @jbuf: an #RTPJitterBuffer * @buf: a buffer + * @time: a timestamp when this buffer was received in nanoseconds * * Inserts @buf into the packet queue of @jbuf. The sequence number of the * packet will be used to sort the packets. This function takes ownerhip of @@ -123,10 +299,12 @@ compare_seqnum (GstBuffer * a, GstBuffer * b, RTPJitterBuffer * jbuf) * Returns: %FALSE if a packet with the same number already existed. */ gboolean -rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf) +rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf, + GstClockTime time) { GList *list; gint func_ret = 1; + guint32 rtptime; g_return_val_if_fail (jbuf != NULL, FALSE); g_return_val_if_fail (buf != NULL, FALSE); @@ -142,11 +320,23 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf) if (func_ret == 0) return FALSE; + /* do skew calculation by measuring the difference between rtptime and the + * receive time */ + if (time != -1) { + rtptime = gst_rtp_buffer_get_timestamp (buf); + calculate_skew (jbuf, rtptime, time); + } + if (list) g_queue_insert_before (jbuf->packets, list, buf); - else + else { g_queue_push_tail (jbuf->packets, buf); + /* tail buffer changed, signal callback */ + if (jbuf->tail_changed) + jbuf->tail_changed (jbuf, jbuf->user_data); + } + return TRUE; } @@ -171,6 +361,28 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf) } /** + * rtp_jitter_buffer_peek: + * @jbuf: an #RTPJitterBuffer + * + * Peek the oldest buffer from the packet queue of @jbuf. Register a callback + * with rtp_jitter_buffer_set_tail_changed() to be notified when an older packet + * was inserted in the queue. + * + * Returns: a #GstBuffer or %NULL when there was no packet in the queue. + */ +GstBuffer * +rtp_jitter_buffer_peek (RTPJitterBuffer * jbuf) +{ + GstBuffer *buf; + + g_return_val_if_fail (jbuf != NULL, FALSE); + + buf = g_queue_peek_tail (jbuf->packets); + + return buf; +} + +/** * rtp_jitter_buffer_flush: * @jbuf: an #RTPJitterBuffer * diff --git a/gst/rtpmanager/rtpjitterbuffer.h b/gst/rtpmanager/rtpjitterbuffer.h index 8bff03c8..b67e265f 100644 --- a/gst/rtpmanager/rtpjitterbuffer.h +++ b/gst/rtpmanager/rtpjitterbuffer.h @@ -35,14 +35,38 @@ typedef struct _RTPJitterBufferClass RTPJitterBufferClass; #define RTP_JITTER_BUFFER_CAST(src) ((RTPJitterBuffer *)(src)) /** + * RTPTailChanged: + * @jbuf: an #RTPJitterBuffer + * @user_data: user data specified when registering + * + * This callback will be called when the tail buffer of @jbuf changed. + */ +typedef void (*RTPTailChanged) (RTPJitterBuffer *jbuf, gpointer user_data); + +/** * RTPJitterBuffer: * * A JitterBuffer in the #RTPSession */ struct _RTPJitterBuffer { - GObject object; + GObject object; + + GQueue *packets; - GQueue *packets; + gint clock_rate; + + /* for calculating skew */ + GstClockTime base_time; + GstClockTime base_rtptime; + guint64 ext_rtptime; + gint64 window[100]; + guint window_pos; + gboolean window_filling; + gint64 window_min; + gint64 skew; + + RTPTailChanged tail_changed; + gpointer user_data; }; struct _RTPJitterBufferClass { @@ -52,14 +76,20 @@ struct _RTPJitterBufferClass { GType rtp_jitter_buffer_get_type (void); /* managing lifetime */ -RTPJitterBuffer* rtp_jitter_buffer_new (void); +RTPJitterBuffer* rtp_jitter_buffer_new (void); + +void rtp_jitter_buffer_set_tail_changed (RTPJitterBuffer *jbuf, RTPTailChanged func, + gpointer user_data); + +void rtp_jitter_buffer_set_clock_rate (RTPJitterBuffer *jbuf, gint clock_rate); +gint rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer *jbuf); -gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf); -GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf); +gboolean rtp_jitter_buffer_insert (RTPJitterBuffer *jbuf, GstBuffer *buf, GstClockTime time); +GstBuffer * rtp_jitter_buffer_pop (RTPJitterBuffer *jbuf); -void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf); +void rtp_jitter_buffer_flush (RTPJitterBuffer *jbuf); -guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf); -guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); +guint rtp_jitter_buffer_num_packets (RTPJitterBuffer *jbuf); +guint32 rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer *jbuf); #endif /* __RTP_JITTER_BUFFER_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index c4152474..4ffc6bbc 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -69,9 +69,6 @@ rtp_source_init (RTPSource * src) src->payload = 0; src->clock_rate = -1; src->clock_base = -1; - src->skew_base_ntpnstime = -1; - src->ext_rtptime = -1; - src->prev_ext_rtptime = -1; src->packets = g_queue_new (); src->seqnum_base = -1; src->last_rtptime = -1; @@ -266,18 +263,20 @@ get_clock_rate (RTPSource * src, guint8 payload) return src->clock_rate; } +/* Jitter is the variation in the delay of received packets in a flow. It is + * measured by comparing the interval when RTP packets were sent to the interval + * at which they were received. For instance, if packet #1 and packet #2 leave + * 50 milliseconds apart and arrive 60 milliseconds apart, then the jitter is 10 + * milliseconds. */ static void calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPArrivalStats * arrival) { guint64 ntpnstime; guint32 rtparrival, transit, rtptime; - guint64 ext_rtptime; gint32 diff; gint clock_rate; guint8 pt; - guint64 rtpdiff, ntpdiff; - gint64 skew; /* get arrival time */ if ((ntpnstime = arrival->ntpnstime) == GST_CLOCK_TIME_NONE) @@ -291,50 +290,12 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, rtptime = gst_rtp_buffer_get_timestamp (buffer); - /* convert to extended timestamp right away */ - ext_rtptime = gst_rtp_buffer_ext_timestamp (&src->ext_rtptime, rtptime); - /* no clock-base, take first rtptime as base */ if (src->clock_base == -1) { GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime); src->clock_base = rtptime; } - if (src->skew_base_ntpnstime == -1) { - /* lock on first observed NTP and RTP time, they should increment in-sync or - * we have a clock skew. */ - GST_DEBUG ("using base_ntpnstime of %" GST_TIME_FORMAT, - GST_TIME_ARGS (ntpnstime)); - src->skew_base_ntpnstime = ntpnstime; - src->skew_base_rtptime = rtptime; - src->prev_ext_rtptime = ext_rtptime; - src->avg_skew = 0; - } else if (src->prev_ext_rtptime < ext_rtptime) { - /* get elapsed rtptime but only when the previous rtptime was stricly smaller - * than the new one. */ - rtpdiff = ext_rtptime - src->skew_base_rtptime; - /* get NTP diff and convert to RTP time, this is always positive */ - ntpdiff = ntpnstime - src->skew_base_ntpnstime; - ntpdiff = gst_util_uint64_scale_int (ntpdiff, clock_rate, GST_SECOND); - - /* see how the NTP and RTP relate any deviation from 0 means that they drift - * out of sync and we must compensate. */ - skew = ntpdiff - rtpdiff; - /* average out the skew to get a smooth value. */ - src->avg_skew = (63 * src->avg_skew + skew) / 64; - - GST_DEBUG ("new skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, - src->avg_skew); - /* store previous extended timestamp */ - src->prev_ext_rtptime = ext_rtptime; - } - if (src->avg_skew != 0) { - /* patch the buffer RTP timestamp with the skew */ - GST_DEBUG ("skew timestamp RTP %" G_GUINT32_FORMAT " -> %" G_GINT64_FORMAT, - rtptime, rtptime + src->avg_skew); - gst_rtp_buffer_set_timestamp (buffer, rtptime + src->avg_skew); - } - /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't * care about the absolute value, just the difference. */ rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND); @@ -603,7 +564,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) /* the SSRC of the packet is not correct, make a writable buffer and * update the SSRC. This could involve a complete copy of the packet when * it is not writable. Usually the payloader will use caps negotiation to - * get the correct SSRC. */ + * get the correct SSRC from the session manager before pushing anything. */ buffer = gst_buffer_make_writable (buffer); GST_WARNING ("updating SSRC from %08x to %08x, fix the payloader", ssrc, @@ -614,7 +575,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) src->stats.packets_sent); result = src->callbacks.push_rtp (src, buffer, src->user_data); } else { - GST_DEBUG ("no callback installed"); + GST_WARNING ("no callback installed, dropping packet"); gst_buffer_unref (buffer); } diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index be793461..1952a4c2 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -137,16 +137,8 @@ struct _RTPSource { GstCaps *caps; gint clock_rate; gint32 seqnum_base; - gint64 clock_base; - /* to calculate the clock skew */ - guint64 skew_base_ntpnstime; - guint64 skew_base_rtptime; - gint64 avg_skew; - guint64 ext_rtptime; - guint64 prev_ext_rtptime; - GstClockTime bye_time; GstClockTime last_activity; GstClockTime last_rtp_activity; |