diff options
Diffstat (limited to 'gst/rtpmanager/rtpsource.c')
-rw-r--r-- | gst/rtpmanager/rtpsource.c | 334 |
1 files changed, 300 insertions, 34 deletions
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 24bb8466..63543358 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -68,7 +68,12 @@ rtp_source_init (RTPSource * src) src->payload = 0; src->clock_rate = -1; + src->clock_base = -1; + src->skew_base_ntpnstime = -1; + src->ext_rtptime = -1; + src->prev_ext_rtptime = -1; src->packets = g_queue_new (); + src->seqnum_base = -1; src->stats.cycles = -1; src->stats.jitter = 0; @@ -112,6 +117,44 @@ rtp_source_new (guint32 ssrc) } /** + * rtp_source_update_caps: + * @src: an #RTPSource + * @caps: a #GstCaps + * + * Parse @caps and store all relevant information in @source. + */ +void +rtp_source_update_caps (RTPSource * src, GstCaps * caps) +{ + GstStructure *s; + guint val; + gint ival; + + /* nothing changed, return */ + if (src->caps == caps) + return; + + s = gst_caps_get_structure (caps, 0); + + if (gst_structure_get_int (s, "payload", &ival)) + src->payload = ival; + GST_DEBUG ("got payload %d", src->payload); + + gst_structure_get_int (s, "clock-rate", &src->clock_rate); + GST_DEBUG ("got clock-rate %d", src->clock_rate); + + if (gst_structure_get_uint (s, "clock-base", &val)) + src->clock_base = val; + GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base); + + if (gst_structure_get_uint (s, "seqnum-base", &val)) + src->seqnum_base = val; + GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base); + + gst_caps_replace (&src->caps, caps); +} + +/** * rtp_source_set_callbacks: * @src: an #RTPSource * @cb: callback functions @@ -207,7 +250,7 @@ push_packet (RTPSource * src, GstBuffer * buffer) static gint get_clock_rate (RTPSource * src, guint8 payload) { - if (payload != src->payload) { + if (src->clock_rate == -1) { gint clock_rate = -1; if (src->callbacks.clock_rate) @@ -216,8 +259,9 @@ get_clock_rate (RTPSource * src, guint8 payload) GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate); src->clock_rate = clock_rate; - src->payload = payload; } + src->payload = payload; + return src->clock_rate; } @@ -225,14 +269,17 @@ static void calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPArrivalStats * arrival) { - GstClockTime current; + guint64 ntpnstime; guint32 rtparrival, transit, rtptime; + guint64 ext_rtptime; gint32 diff; gint clock_rate; guint8 pt; + guint64 rtpdiff, ntpdiff; + gint64 skew; /* get arrival time */ - if ((current = arrival->time) == GST_CLOCK_TIME_NONE) + if ((ntpnstime = arrival->ntpnstime) == GST_CLOCK_TIME_NONE) goto no_time; pt = gst_rtp_buffer_get_payload_type (buffer); @@ -243,8 +290,56 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, rtptime = gst_rtp_buffer_get_timestamp (buffer); - /* convert arrival time to RTP timestamp units */ - rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND); + /* convert to extended timestamp right away */ + ext_rtptime = gst_rtp_buffer_ext_timestamp (&src->ext_rtptime, rtptime); + + /* no clock-base, take first rtptime as base */ + if (src->clock_base == -1) { + GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime); + src->clock_base = rtptime; + } + + if (src->skew_base_ntpnstime == -1) { + /* lock on first observed NTP and RTP time, they should increment in-sync or + * we have a clock skew. */ + GST_DEBUG ("using base_ntpnstime of %" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime)); + src->skew_base_ntpnstime = ntpnstime; + src->skew_base_rtptime = rtptime; + src->prev_ext_rtptime = ext_rtptime; + src->avg_skew = 0; + } else if (src->prev_ext_rtptime < ext_rtptime) { + /* get elapsed rtptime but only when the previous rtptime was stricly smaller + * than the new one. */ + rtpdiff = ext_rtptime - src->skew_base_rtptime; + /* get NTP diff and convert to RTP time, this is always positive */ + ntpdiff = ntpnstime - src->skew_base_ntpnstime; + ntpdiff = gst_util_uint64_scale_int (ntpdiff, clock_rate, GST_SECOND); + + /* see how the NTP and RTP relate any deviation from 0 means that they drift + * out of sync and we must compensate. */ + skew = ntpdiff - rtpdiff; + /* average out the skew to get a smooth value. */ + src->avg_skew = (31 * src->avg_skew + skew) / 32; + + GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, + src->avg_skew); + if (src->avg_skew != 0) { + guint32 timestamp; + + /* patch the buffer RTP timestamp with the skew */ + GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew); + timestamp = gst_rtp_buffer_get_timestamp (buffer); + timestamp += src->avg_skew; + gst_rtp_buffer_set_timestamp (buffer, timestamp); + } + /* store previous extended timestamp */ + src->prev_ext_rtptime = ext_rtptime; + } + + /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't + * care about the absolute value, just the difference. */ + rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND); /* transit time is difference with RTP timestamp */ transit = rtparrival - rtptime; @@ -324,6 +419,8 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, seqnr = gst_rtp_buffer_get_seq (buffer); + rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); + if (stats->cycles == -1) { GST_DEBUG ("received first buffer"); /* first time we heard of this source */ @@ -389,6 +486,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, } } else { /* duplicate or reordered packet, will be filtered by jitterbuffer. */ + GST_WARNING ("duplicate or reordered packet"); } src->stats.octets_received += arrival->payload_len; @@ -401,7 +499,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, GST_DEBUG ("seq %d, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, seqnr, src->stats.packets_received, src->stats.octets_received); - /* calculate jitter */ + /* calculate jitter and perform skew correction */ calculate_jitter (src, buffer, arrival); /* we're ready to push the RTP packet now */ @@ -444,25 +542,27 @@ rtp_source_process_bye (RTPSource * src, const gchar * reason) * rtp_source_send_rtp: * @src: an #RTPSource * @buffer: an RTP buffer + * @ntpnstime: the NTP time when this buffer was captured in nanoseconds * * Send an RTP @buffer originating from @src. This will make @src a sender. * This function takes ownership of @buffer and modifies the SSRC in the RTP - * packet to that of @src. + * packet to that of @src when needed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) +rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; guint len; - GstClockTime timestamp; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); len = gst_rtp_buffer_get_payload_len (buffer); + rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); + /* we are a sender now */ src->is_sender = TRUE; @@ -471,18 +571,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) src->stats.octets_sent += len; /* we keep track of the last received RTP timestamp and the corresponding - * GStreamer timestamp so that we can convert NTP time to RTP time when - * sending SR reports */ + * NTP timestamp so that we can use this info when constructing SR reports */ src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer); - - /* the timestamp can be undefined, in that case we use any previously - * received timestamp */ - timestamp = GST_BUFFER_TIMESTAMP (buffer); - if (timestamp != -1) - src->last_timestamp = timestamp; - - if (src->clock_rate == -1) - get_clock_rate (src, gst_rtp_buffer_get_payload_type (buffer)); + src->last_ntpnstime = ntpnstime; /* push packet */ if (src->callbacks.push_rtp) { @@ -496,7 +587,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) * get the correct SSRC. */ buffer = gst_buffer_make_writable (buffer); - GST_DEBUG ("updating SSRC from %u to %u", ssrc, src->ssrc); + GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc); gst_rtp_buffer_set_ssrc (buffer, src->ssrc); } GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT, @@ -513,17 +604,17 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) /** * rtp_source_process_sr: * @src: an #RTPSource + * @time: time of packet arrival * @ntptime: the NTP time * @rtptime: the RTP time * @packet_count: the packet count * @octet_count: the octect count - * @time: time of packet arrival * * Update the sender report in @src. */ void -rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, - guint32 packet_count, guint32 octet_count, GstClockTime time) +rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime, + guint32 rtptime, guint32 packet_count, guint32 octet_count) { RTPSenderReport *curr; gint curridx; @@ -556,6 +647,7 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, /** * rtp_source_process_rb: * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 * @fractionlost: fraction lost since last SR/RR * @packetslost: the cumululative number of packets lost * @exthighestseq: the extended last sequence number received @@ -566,18 +658,20 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, * Update the report block in @src. */ void -rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, - guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr) +rtp_source_process_rb (RTPSource * src, GstClockTime time, guint8 fractionlost, + gint32 packetslost, guint32 exthighestseq, guint32 jitter, guint32 lsr, + guint32 dlsr) { RTPReceiverReport *curr; gint curridx; + guint32 ntp, A; g_return_if_fail (RTP_IS_SOURCE (src)); - GST_DEBUG ("got RB packet: SSRC %08x, FL %" G_GUINT32_FORMAT "" - ", PL %d, HS %" G_GUINT32_FORMAT ", JITTER %" G_GUINT32_FORMAT - ", LSR %08x, DLSR %08x", src->ssrc, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); + GST_DEBUG ("got RB packet: SSRC %08x, FL %2x, PL %d, HS %" G_GUINT32_FORMAT + ", jitter %" G_GUINT32_FORMAT ", LSR %04x:%04x, DLSR %04x:%04x", + src->ssrc, fractionlost, packetslost, exthighestseq, jitter, lsr >> 16, + lsr & 0xffff, dlsr >> 16, dlsr & 0xffff); curridx = src->stats.curr_rr ^ 1; curr = &src->stats.rr[curridx]; @@ -591,26 +685,198 @@ rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, curr->lsr = lsr; curr->dlsr = dlsr; + /* calculate round trip */ + ntp = (gst_rtcp_unix_to_ntp (time) >> 16) & 0xffffffff; + A = ntp - dlsr; + A -= lsr; + curr->round_trip = A; + + GST_DEBUG ("NTP %04x:%04x, round trip %04x:%04x", ntp >> 16, ntp & 0xffff, + A >> 16, A & 0xffff); + /* make current */ src->stats.curr_rr = curridx; } /** - * rtp_source_get_last_sr: + * rtp_source_get_new_sr: * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 * @ntptime: the NTP time * @rtptime: the RTP time * @packet_count: the packet count * @octet_count: the octect count + * + * Get new values to put into a new SR report from this source. + * + * Returns: %TRUE on success. + */ +gboolean +rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime, + guint64 * ntptime, guint32 * rtptime, guint32 * packet_count, + guint32 * octet_count) +{ + guint32 t_rtp; + guint64 t_current_ntp; + GstClockTimeDiff diff; + + g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE); + + /* use the sync params to interpollate the date->time member to rtptime. We + * use the last sent timestamp and rtptime as reference points. We assume + * that the slope of the rtptime vs timestamp curve is 1, which is certainly + * sufficient for the frequency at which we report SR and the rate we send + * out RTP packets. */ + t_rtp = src->last_rtptime; + + GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %" + G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp); + + if (src->clock_rate != -1) { + /* get the diff with the SR time */ + diff = GST_CLOCK_DIFF (src->last_ntpnstime, ntpnstime); + + /* now translate the diff to RTP time, handle positive and negative cases. + * If there is no diff, we already set rtptime correctly above. */ + if (diff > 0) { + GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff)); + t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); + } else { + diff = -diff; + GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff)); + t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); + } + } else { + GST_WARNING ("no clock-rate, cannot interpollate rtp time"); + } + + t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND); + + GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, + (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff), + t_rtp); + + if (ntptime) + *ntptime = t_current_ntp; + if (rtptime) + *rtptime = t_rtp; + if (packet_count) + *packet_count = src->stats.packets_sent; + if (octet_count) + *octet_count = src->stats.octets_sent; + + return TRUE; +} + +/** + * rtp_source_get_new_rb: + * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 + * @fractionlost: fraction lost since last SR/RR + * @packetslost: the cumululative number of packets lost + * @exthighestseq: the extended last sequence number received + * @jitter: the interarrival jitter + * @lsr: the last SR packet from this source + * @dlsr: the delay since last SR packet + * + * Get the values of the last RB report set with rtp_source_process_rb(). + * + * Returns: %TRUE on success. + */ +gboolean +rtp_source_get_new_rb (RTPSource * src, GstClockTime time, + guint8 * fractionlost, gint32 * packetslost, guint32 * exthighestseq, + guint32 * jitter, guint32 * lsr, guint32 * dlsr) +{ + RTPSourceStats *stats; + guint64 extended_max, expected; + guint64 expected_interval, received_interval, ntptime; + gint64 lost, lost_interval; + guint32 fraction, LSR, DLSR; + GstClockTime sr_time; + + stats = &src->stats; + + extended_max = stats->cycles + stats->max_seq; + expected = extended_max - stats->base_seq + 1; + + GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT + ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT, + extended_max, expected, stats->packets_received, stats->base_seq); + + lost = expected - stats->packets_received; + lost = CLAMP (lost, -0x800000, 0x7fffff); + + expected_interval = expected - stats->prev_expected; + stats->prev_expected = expected; + received_interval = stats->packets_received - stats->prev_received; + stats->prev_received = stats->packets_received; + + lost_interval = expected_interval - received_interval; + + if (expected_interval == 0 || lost_interval <= 0) + fraction = 0; + else + fraction = (lost_interval << 8) / expected_interval; + + GST_DEBUG ("add RR for SSRC %08x", src->ssrc); + /* we scaled the jitter up for additional precision */ + GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT + ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost, + extended_max, stats->jitter >> 4); + + if (rtp_source_get_last_sr (src, &sr_time, &ntptime, NULL, NULL, NULL)) { + GstClockTime diff; + + /* LSR is middle 32 bits of the last ntptime */ + LSR = (ntptime >> 16) & 0xffffffff; + diff = time - sr_time; + GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff)); + /* DLSR, delay since last SR is expressed in 1/65536 second units */ + DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND); + } else { + /* No valid SR received, LSR/DLSR are set to 0 then */ + GST_DEBUG ("no valid SR received"); + LSR = 0; + DLSR = 0; + } + GST_DEBUG ("LSR %04x:%04x, DLSR %04x:%04x", LSR >> 16, LSR & 0xffff, + DLSR >> 16, DLSR & 0xffff); + + if (fractionlost) + *fractionlost = fraction; + if (packetslost) + *packetslost = lost; + if (exthighestseq) + *exthighestseq = extended_max; + if (jitter) + *jitter = stats->jitter >> 4; + if (lsr) + *lsr = LSR; + if (dlsr) + *dlsr = DLSR; + + return TRUE; +} + +/** + * rtp_source_get_last_sr: + * @src: an #RTPSource * @time: time of packet arrival + * @ntptime: the NTP time + * @rtptime: the RTP time + * @packet_count: the packet count + * @octet_count: the octect count * * Get the values of the last sender report as set with rtp_source_process_sr(). * * Returns: %TRUE if there was a valid SR report. */ gboolean -rtp_source_get_last_sr (RTPSource * src, guint64 * ntptime, guint32 * rtptime, - guint32 * packet_count, guint32 * octet_count, GstClockTime * time) +rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime, + guint32 * rtptime, guint32 * packet_count, guint32 * octet_count) { RTPSenderReport *curr; |