diff options
Diffstat (limited to 'gst/rtpmanager/rtpsession.c')
-rw-r--r-- | gst/rtpmanager/rtpsession.c | 295 |
1 files changed, 98 insertions, 197 deletions
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 275e7c74..e7f72b40 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -23,6 +23,8 @@ #include <gst/rtp/gstrtcpbuffer.h> #include <gst/netbuffer/gstnetbuffer.h> +#include "gstrtpbin-marshal.h" + #include "rtpsession.h" GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); @@ -332,8 +334,8 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.process_rtp = callbacks->process_rtp; sess->callbacks.send_rtp = callbacks->send_rtp; sess->callbacks.send_rtcp = callbacks->send_rtcp; + sess->callbacks.sync_rtcp = callbacks->sync_rtcp; sess->callbacks.clock_rate = callbacks->clock_rate; - sess->callbacks.get_time = callbacks->get_time; sess->callbacks.reconsider = callbacks->reconsider; sess->user_data = user_data; } @@ -911,13 +913,14 @@ rtp_session_create_source (RTPSession * sess) */ static void update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, - gboolean rtp, GstBuffer * buffer) + gboolean rtp, GstBuffer * buffer, guint64 ntpnstime) { - /* get time or arrival */ - if (sess->callbacks.get_time) - arrival->time = sess->callbacks.get_time (sess, sess->user_data); - else - arrival->time = GST_CLOCK_TIME_NONE; + GTimeVal current; + + /* get time of arrival */ + g_get_current_time (¤t); + arrival->time = GST_TIMEVAL_TO_TIME (current); + arrival->ntpnstime = ntpnstime; /* get packet size including header overhead */ arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len; @@ -941,6 +944,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * rtp_session_process_rtp: * @sess: and #RTPSession * @buffer: an RTP buffer + * @ntpnstime: the NTP arrival time in nanoseconds * * Process an RTP buffer in the session manager. This function takes ownership * of @buffer. @@ -948,7 +952,8 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) +rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, + guint64 ntpnstime) { GstFlowReturn result; guint32 ssrc; @@ -965,7 +970,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, TRUE, buffer); + update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime); /* ignore more RTP packets when we left the session */ if (sess->source->received_bye) @@ -1047,6 +1052,33 @@ ignore: } } +static void +rtp_session_process_rb (RTPSession * sess, RTPSource * source, + GstRTCPPacket * packet, RTPArrivalStats * arrival) +{ + guint count, i; + + count = gst_rtcp_packet_get_rb_count (packet); + for (i = 0; i < count; i++) { + guint32 ssrc, exthighestseq, jitter, lsr, dlsr; + guint8 fractionlost; + gint32 packetslost; + + gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, + &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + + GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter); + + if (ssrc == sess->source->ssrc) { + /* only deal with report blocks for our session, we update the stats of + * the sender of the RTCP message. We could also compare our stats against + * the other sender to see if we are better or worse. */ + rtp_source_process_rb (source, arrival->time, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); + } + } +} + /* A Sender report contains statistics about how the sender is doing. This * includes timing informataion such as the relation between RTP and NTP * timestamps and the number of packets/bytes it sent to us. @@ -1062,7 +1094,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, { guint32 senderssrc, rtptime, packet_count, octet_count; guint64 ntptime; - guint count, i; RTPSource *source; gboolean created, prevsender; @@ -1074,11 +1105,13 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + GST_BUFFER_OFFSET (packet->buffer) = source->clock_base; + prevsender = RTP_SOURCE_IS_SENDER (source); /* first update the source */ - rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count, - arrival->time); + rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count, + octet_count); if (prevsender != RTP_SOURCE_IS_SENDER (source)) { sess->stats.sender_sources++; @@ -1089,25 +1122,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - count = gst_rtcp_packet_get_rb_count (packet); - for (i = 0; i < count; i++) { - guint32 ssrc, exthighestseq, jitter, lsr, dlsr; - guint8 fractionlost; - gint32 packetslost; - - gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, - &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); - - GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter); - - if (ssrc == sess->source->ssrc) { - /* only deal with report blocks for our session, we update the stats of - * the sender of the RTCP message. We could also compare our stats against - * the other sender to see if we are better or worse. */ - rtp_source_process_rb (source, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - } - } + rtp_session_process_rb (sess, source, packet, arrival); } /* A receiver report contains statistics about how a receiver is doing. It @@ -1121,7 +1136,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, RTPArrivalStats * arrival) { guint32 senderssrc; - guint count, i; RTPSource *source; gboolean created; @@ -1134,20 +1148,7 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - count = gst_rtcp_packet_get_rb_count (packet); - for (i = 0; i < count; i++) { - guint32 ssrc, exthighestseq, jitter, lsr, dlsr; - guint8 fractionlost; - gint32 packetslost; - - gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, - &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); - - if (ssrc == sess->source->ssrc) { - rtp_source_process_rb (source, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - } - } + rtp_session_process_rb (sess, source, packet, arrival); } /* FIXME, we're just printing this for now... */ @@ -1280,7 +1281,8 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, * @sess: and #RTPSession * @buffer: an RTCP buffer * - * Process an RTCP buffer in the session manager. + * Process an RTCP buffer in the session manager. This function takes ownership + * of @buffer. * * Returns: a #GstFlowReturn. */ @@ -1288,8 +1290,9 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) { GstRTCPPacket packet; - gboolean more, is_bye = FALSE; + gboolean more, is_bye = FALSE, is_sr = FALSE; RTPArrivalStats arrival; + GstFlowReturn result = GST_FLOW_OK; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1301,7 +1304,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, FALSE, buffer); + update_arrival_stats (sess, &arrival, FALSE, buffer, -1); if (sess->sent_bye) goto ignore; @@ -1322,6 +1325,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) switch (type) { case GST_RTCP_TYPE_SR: rtp_session_process_sr (sess, &packet, &arrival); + is_sr = TRUE; break; case GST_RTCP_TYPE_RR: rtp_session_process_rr (sess, &packet, &arrival); @@ -1357,14 +1361,20 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) } RTP_SESSION_UNLOCK (sess); - gst_buffer_unref (buffer); + /* notify caller of sr packets in the callback */ + if (is_sr && sess->callbacks.sync_rtcp) + result = sess->callbacks.sync_rtcp (sess, sess->source, buffer, + sess->user_data); + else + gst_buffer_unref (buffer); - return GST_FLOW_OK; + return result; /* ERRORS */ invalid_packet: { GST_DEBUG ("invalid RTCP packet received"); + gst_buffer_unref (buffer); return GST_FLOW_OK; } ignore: @@ -1380,6 +1390,7 @@ ignore: * rtp_session_send_rtp: * @sess: an #RTPSession * @buffer: an RTP buffer + * @ntptime: the NTP time of when this buffer was captured. * * Send the RTP buffer in the session manager. This function takes ownership of * @buffer. @@ -1387,11 +1398,12 @@ ignore: * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) +rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime) { GstFlowReturn result; RTPSource *source; gboolean prevsender; + GTimeVal current; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1405,14 +1417,13 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) source = sess->source; /* update last activity */ - if (sess->callbacks.get_time) - source->last_rtp_activity = - sess->callbacks.get_time (sess, sess->user_data); + g_get_current_time (¤t); + source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current); prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ - result = rtp_source_send_rtp (sess->source, buffer); + result = rtp_source_send_rtp (sess->source, buffer, ntptime); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; @@ -1429,36 +1440,6 @@ invalid_packet: } } -/** - * rtp_session_set_send_sync - * @sess: an #RTPSession - * @base_time: the clock base time - * @start_time: the timestamp start time - * - * Establish a relation between the times returned by the get_time callback and - * the buffer timestamps. This information is used to convert the NTP times to - * RTP timestamps. - */ -void -rtp_session_set_base_time (RTPSession * sess, GstClockTime base_time) -{ - g_return_if_fail (RTP_IS_SESSION (sess)); - - RTP_SESSION_LOCK (sess); - sess->base_time = base_time; - RTP_SESSION_UNLOCK (sess); -} - -void -rtp_session_set_timestamp_sync (RTPSession * sess, GstClockTime start_timestamp) -{ - g_return_if_fail (RTP_IS_SESSION (sess)); - - RTP_SESSION_LOCK (sess); - sess->start_timestamp = start_timestamp; - RTP_SESSION_UNLOCK (sess); -} - static GstClockTime calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, gboolean first) @@ -1498,6 +1479,7 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) GstFlowReturn result = GST_FLOW_OK; RTPSource *source; GstClockTime current, interval; + GTimeVal curtv; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -1518,10 +1500,8 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) sess->sent_bye = FALSE; /* get current time */ - if (sess->callbacks.get_time) - current = sess->callbacks.get_time (sess, sess->user_data); - else - current = 0; + g_get_current_time (&curtv); + current = GST_TIMEVAL_TO_TIME (curtv); /* reschedule transmission */ sess->last_rtcp_send_time = current; @@ -1543,12 +1523,12 @@ done: /** * rtp_session_next_timeout: * @sess: an #RTPSession - * @time: the current time + * @time: the current system time * * Get the next time we should perform session maintenance tasks. * * Returns: a time when rtp_session_on_timeout() should be called with the - * current time. + * current system time. */ GstClockTime rtp_session_next_timeout (RTPSession * sess, GstClockTime time) @@ -1588,6 +1568,7 @@ typedef struct RTPSession *sess; GstBuffer *rtcp; GstClockTime time; + guint64 ntpnstime; GstClockTime interval; GstRTCPPacket packet; gboolean is_bye; @@ -1605,60 +1586,22 @@ session_start_rtcp (RTPSession * sess, ReportData * data) if (RTP_SOURCE_IS_SENDER (own)) { guint64 ntptime; guint32 rtptime; - GstClockTime running_time; - GstClockTimeDiff diff; + guint32 packet_count, octet_count; /* we are a sender, create SR */ GST_DEBUG ("create SR for SSRC %08x", own->ssrc); gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); - /* use the sync params to interpollate the date->time member to rtptime. We - * use the last sent timestamp and rtptime as reference points. We assume - * that the slope of the rtptime vs timestamp curve is 1, which is certainly - * sufficient for the frequency at which we report SR and the rate we send - * out RTP packets. */ - rtptime = own->last_rtptime; - GST_DEBUG ("last_timestamp %" GST_TIME_FORMAT ", last_rtptime %" - G_GUINT32_FORMAT, GST_TIME_ARGS (own->last_timestamp), rtptime); - - if (own->clock_rate != -1) { - /* Start by calculating the running_time of the timestamp, this is a result - * in nanoseconds. */ - running_time = - (own->last_timestamp - sess->start_timestamp) + sess->base_time; - - /* get the diff with the SR time */ - diff = GST_CLOCK_DIFF (running_time, data->time); - - /* now translate the diff to RTP time, handle positive and negative cases. - * If there is no diff, we already set rtptime correctly above. */ - if (diff > 0) { - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); - rtptime += gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND); - } else { - diff = -diff; - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); - rtptime -= gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND); - } - } else { - GST_WARNING ("no clock-rate, cannot interpollate rtp time"); - } - - /* convert clock time to NTP time. upper 32 bits should contain the seconds - * and the lower 32 bits, the fractions of a second. */ - ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND); - /* conversion from unix timestamp (seconds since 1970) to NTP (seconds - * since 1900). FIXME nothing says that the time is in unix timestamps. */ - ntptime += (2208988800LL << 32); + /* get latest stats */ + rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime, + &packet_count, &octet_count); + /* store stats */ + rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count, + octet_count); - GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, - (guint32) (ntptime >> 32), (guint32) (ntptime & 0xffffffff), rtptime); - - /* fill in sender report info, FIXME RTP timestamps missing */ + /* fill in sender report info */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent); + ntptime, rtptime, packet_count, octet_count); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); @@ -1681,63 +1624,18 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { /* only report about other sender sources */ if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) { - RTPSourceStats *stats; - guint64 extended_max, expected; - guint64 expected_interval, received_interval, ntptime; - gint64 lost, lost_interval; - guint32 fraction, LSR, DLSR; - GstClockTime time; - - stats = &source->stats; - - extended_max = stats->cycles + stats->max_seq; - expected = extended_max - stats->base_seq + 1; - - GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT - ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT, - extended_max, expected, stats->packets_received, stats->base_seq); + guint8 fractionlost; + gint32 packetslost; + guint32 exthighestseq, jitter; + guint32 lsr, dlsr; - lost = expected - stats->packets_received; - lost = CLAMP (lost, -0x800000, 0x7fffff); - - expected_interval = expected - stats->prev_expected; - stats->prev_expected = expected; - received_interval = stats->packets_received - stats->prev_received; - stats->prev_received = stats->packets_received; - - lost_interval = expected_interval - received_interval; - - if (expected_interval == 0 || lost_interval <= 0) - fraction = 0; - else - fraction = (lost_interval << 8) / expected_interval; - - GST_DEBUG ("add RR for SSRC %08x", source->ssrc); - /* we scaled the jitter up for additional precision */ - GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT - ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost, - extended_max, stats->jitter >> 4); - - if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) { - GstClockTime diff; - - /* LSR is middle bits of the last ntptime */ - LSR = (ntptime >> 16) & 0xffffffff; - diff = data->time - time; - GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff)); - /* DLSR, delay since last SR is expressed in 1/65536 second units */ - DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND); - } else { - /* No valid SR received, LSR/DLSR are set to 0 then */ - GST_DEBUG ("no valid SR received"); - LSR = 0; - DLSR = 0; - } - GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR); + /* get new stats */ + rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost, + &exthighestseq, &jitter, &lsr, &dlsr); /* packet is not yet filled, add report block for this source. */ - gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost, - extended_max, stats->jitter >> 4, LSR, DLSR); + gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); } } } @@ -1784,7 +1682,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (is_sender) { if (data->time > source->last_rtp_activity) { interval = MAX (data->interval * 2, 5 * GST_SECOND); - if (data->time - source->last_rtp_activity > interval) { GST_DEBUG ("sender source %08x timed out and became receiver, last %" GST_TIME_FORMAT, source->ssrc, @@ -1897,6 +1794,8 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) /** * rtp_session_on_timeout: * @sess: an #RTPSession + * @time: the current system time + * @ntpnstime: the current NTP time in nanoseconds * * Perform maintenance actions after the timeout obtained with * rtp_session_next_timeout() expired. @@ -1910,21 +1809,23 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_on_timeout (RTPSession * sess, GstClockTime time) +rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; ReportData data; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT, + GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime)); + data.sess = sess; data.rtcp = NULL; data.time = time; + data.ntpnstime = ntpnstime; data.is_bye = FALSE; data.has_sdes = FALSE; - GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time)); - RTP_SESSION_LOCK (sess); /* get a new interval, we need this for various cleanups etc */ data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp); |