diff options
Diffstat (limited to 'gst/rtpmanager')
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 112 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsession.c | 541 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsession.h | 29 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsource.h | 4 | ||||
-rw-r--r-- | gst/rtpmanager/rtpstats.c | 134 | ||||
-rw-r--r-- | gst/rtpmanager/rtpstats.h | 17 |
6 files changed, 656 insertions, 181 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index bc297235..9545a92d 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -144,13 +144,15 @@ static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gpointer user_data); static GstClockTime gst_rtp_session_get_time (RTPSession * sess, gpointer user_data); +static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); static RTPSessionCallbacks callbacks = { gst_rtp_session_process_rtp, gst_rtp_session_send_rtp, gst_rtp_session_send_rtcp, gst_rtp_session_clock_rate, - gst_rtp_session_get_time + gst_rtp_session_get_time, + gst_rtp_session_reconsider }; /* GObject vmethods */ @@ -293,44 +295,39 @@ rtcp_thread (GstRTPSession * rtpsession) { GstClock *clock; GstClockID id; - gdouble interval; GstClockTime current_time; - GstClockTime next_rtcp_check_time; - GstClockTime new_rtcp_send_time; - GstClockTime last_rtcp_send_time; - GstClockTimeDiff jitter; - guint members, prev_members; + GstClockTime next_timeout; clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); if (clock == NULL) return; + current_time = gst_clock_get_time (clock); + GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); GST_RTP_SESSION_LOCK (rtpsession); - /* get initial estimate */ - interval = rtp_session_get_reporting_interval (rtpsession->priv->session); - current_time = gst_clock_get_time (clock); - last_rtcp_send_time = current_time; - next_rtcp_check_time = current_time + (GST_SECOND * interval); - /* we keep track of members before and after the timeout to do reverse - * reconsideration. */ - prev_members = rtp_session_get_num_active_sources (rtpsession->priv->session); - - GST_DEBUG_OBJECT (rtpsession, "first RTCP interval: %lf seconds", interval); - while (!rtpsession->priv->stop_thread) { GstClockReturn res; + /* get initial estimate */ + next_timeout = + rtp_session_next_timeout (rtpsession->priv->session, current_time); + + GST_DEBUG_OBJECT (rtpsession, "next check time %" GST_TIME_FORMAT, - GST_TIME_ARGS (next_rtcp_check_time)); + GST_TIME_ARGS (next_timeout)); + + /* leave if no more timeouts, the session ended */ + if (next_timeout == GST_CLOCK_TIME_NONE) + break; id = rtpsession->priv->id = - gst_clock_new_single_shot_id (clock, next_rtcp_check_time); + gst_clock_new_single_shot_id (clock, next_timeout); GST_RTP_SESSION_UNLOCK (rtpsession); - res = gst_clock_id_wait (id, &jitter); + res = gst_clock_id_wait (id, NULL); GST_RTP_SESSION_LOCK (rtpsession); gst_clock_id_unref (id); @@ -339,52 +336,16 @@ rtcp_thread (GstRTPSession * rtpsession) if (rtpsession->priv->stop_thread) break; - if (res != GST_CLOCK_UNSCHEDULED) - if (jitter < 0) - current_time = next_rtcp_check_time; - else - current_time = next_rtcp_check_time - jitter; - else - current_time = gst_clock_get_time (clock); - - GST_DEBUG_OBJECT (rtpsession, "unlocked %d, jitter %" G_GINT64_FORMAT - ", current %" GST_TIME_FORMAT, res, jitter, - GST_TIME_ARGS (current_time)); - - members = rtp_session_get_num_active_sources (rtpsession->priv->session); - - if (members < prev_members) { - GstClockTime time_remaining; - - /* some members went away */ - GST_DEBUG_OBJECT (rtpsession, "reverse reconsideration"); - time_remaining = next_rtcp_check_time - current_time; - new_rtcp_send_time = - current_time + (time_remaining * members / prev_members); - } else { - interval = rtp_session_get_reporting_interval (rtpsession->priv->session); - GST_DEBUG_OBJECT (rtpsession, "forward reconsideration: %lf seconds", - interval); - new_rtcp_send_time = (interval * GST_SECOND) + last_rtcp_send_time; - } - prev_members = members; - - if (current_time >= new_rtcp_send_time) { - GST_DEBUG_OBJECT (rtpsession, "sending RTCP now"); - - /* make the session manager produce RTCP, we ignore the result. */ - rtp_session_perform_reporting (rtpsession->priv->session); - - interval = rtp_session_get_reporting_interval (rtpsession->priv->session); - - GST_DEBUG_OBJECT (rtpsession, "next RTCP interval: %lf seconds", - interval); - next_rtcp_check_time = (interval * GST_SECOND) + current_time; - last_rtcp_send_time = current_time; - } else { - GST_DEBUG_OBJECT (rtpsession, "reconsider RTCP"); - next_rtcp_check_time = new_rtcp_send_time; - } + /* update current time */ + current_time = gst_clock_get_time (clock); + + /* we get unlocked because we need to perform reconsideration, don't perform + * the timeout but get a new reporting estimate. */ + GST_DEBUG_OBJECT (rtpsession, "unlocked %d, current %" GST_TIME_FORMAT, + res, GST_TIME_ARGS (current_time)); + + /* perform actions, we ignore result. */ + rtp_session_on_timeout (rtpsession->priv->session, current_time); } GST_RTP_SESSION_UNLOCK (rtpsession); @@ -536,6 +497,8 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GST_DEBUG_OBJECT (rtpsession, "sending RTCP"); + gst_util_dump_mem (GST_BUFFER_DATA (buffer), GST_BUFFER_SIZE (buffer)); + if (rtpsession->send_rtcp_src) { result = gst_pad_push (rtpsession->send_rtcp_src, buffer); } else { @@ -616,6 +579,21 @@ gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) return result; } +/* called when the session manager asks us to reconsider the timeout */ +static void +gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data) +{ + GstRTPSession *rtpsession; + + rtpsession = GST_RTP_SESSION_CAST (user_data); + + GST_RTP_SESSION_LOCK (rtpsession); + GST_DEBUG_OBJECT (rtpsession, "unlock timer for reconsideration"); + if (rtpsession->priv->id) + gst_clock_id_unschedule (rtpsession->priv->id); + GST_RTP_SESSION_UNLOCK (rtpsession); +} + static GstFlowReturn gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) { diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 1f6e1ebc..7244f5fb 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -35,6 +35,8 @@ enum SIGNAL_ON_SSRC_COLLISION, SIGNAL_ON_SSRC_VALIDATED, SIGNAL_ON_BYE_SSRC, + SIGNAL_ON_BYE_TIMEOUT, + SIGNAL_ON_TIMEOUT, LAST_SIGNAL }; @@ -46,6 +48,14 @@ enum PROP_0 }; +/* update average packet size, we keep this scaled by 16 to keep enough + * precision. */ +#define UPDATE_AVG(avg, val) \ + if ((avg) == 0) \ + (avg) = (val) << 4; \ + else \ + (avg) = ((val) + (15 * (avg))) >> 4; + /* GObject vmethods */ static void rtp_session_finalize (GObject * object); static void rtp_session_set_property (GObject * object, guint prop_id, @@ -119,6 +129,30 @@ rtp_session_class_init (RTPSessionClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc), NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, G_TYPE_OBJECT); + /** + * RTPSession::on-bye-timeout: + * @session: the object which received the signal + * @src: the RTPSource that timed out + * + * Notify of an SSRC that has timed out because of BYE + */ + rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] = + g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + G_TYPE_OBJECT); + /** + * RTPSession::on-timeout: + * @session: the object which received the signal + * @src: the RTPSource that timed out + * + * Notify of an SSRC that has timed out + */ + rtp_session_signals[SIGNAL_ON_TIMEOUT] = + g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout), + NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, + G_TYPE_OBJECT); GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session"); } @@ -144,6 +178,7 @@ rtp_session_init (RTPSession * sess) /* create an active SSRC for this session manager */ sess->source = rtp_session_create_source (sess); + sess->source->validated = TRUE; sess->stats.active_sources++; /* default UDP header length */ @@ -156,6 +191,8 @@ rtp_session_init (RTPSession * sess) sess->name = g_strdup (g_get_real_name ()); sess->tool = g_strdup ("GStreamer"); + sess->first_rtcp = TRUE; + GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); } @@ -176,6 +213,7 @@ rtp_session_finalize (GObject * object) g_free (sess->cname); g_free (sess->tool); + g_free (sess->bye_reason); G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); } @@ -233,9 +271,22 @@ on_ssrc_validated (RTPSession * sess, RTPSource * source) static void on_bye_ssrc (RTPSession * sess, RTPSource * source) { + /* notify app that reconsideration should be performed */ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source); } +static void +on_bye_timeout (RTPSession * sess, RTPSource * source) +{ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source); +} + +static void +on_timeout (RTPSession * sess, RTPSource * source) +{ + g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source); +} + /** * rtp_session_new: * @@ -272,6 +323,7 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.send_rtcp = callbacks->send_rtcp; sess->callbacks.clock_rate = callbacks->clock_rate; sess->callbacks.get_time = callbacks->get_time; + sess->callbacks.reconsider = callbacks->reconsider; sess->user_data = user_data; } @@ -657,6 +709,11 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, if (check_collision (sess, source, arrival)) on_ssrc_collision (sess, source); } + /* update last activity */ + source->last_activity = arrival->time; + if (rtp) + source->last_rtp_activity = arrival->time; + return source; } @@ -819,6 +876,7 @@ rtp_session_create_source (RTPSession * sess) break; } source = rtp_source_new (ssrc); + g_object_ref (source); g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), source); /* we have one more source now */ @@ -831,6 +889,7 @@ rtp_session_create_source (RTPSession * sess) /* update the RTPArrivalStats structure with the current time and other bits * about the current buffer we are handling. * This function is typically called when a validated packet is received. + * This function should be called with the SESSION_LOCK */ static void update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, @@ -842,9 +901,14 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, else arrival->time = GST_CLOCK_TIME_NONE; - /* update sizes */ - arrival->bytes = GST_BUFFER_SIZE (buffer) + 28; - arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0); + /* get packet size including header overhead */ + arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len; + + if (rtp) { + arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer); + } else { + arrival->payload_len = 0; + } /* for netbuffer we can store the IP address to check for collisions */ arrival->have_address = GST_IS_NETBUFFER (buffer); @@ -881,13 +945,16 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) if (!gst_rtp_buffer_validate (buffer)) goto invalid_packet; + RTP_SESSION_LOCK (sess); /* update arrival stats */ update_arrival_stats (sess, &arrival, TRUE, buffer); + /* ignore more RTP packets when we left the session */ + if (sess->source->received_bye) + goto ignore; + /* get SSRC and look up in session database */ ssrc = gst_rtp_buffer_get_ssrc (buffer); - - RTP_SESSION_LOCK (sess); source = obtain_source (sess, ssrc, &created, &arrival, TRUE); prevsender = RTP_SOURCE_IS_SENDER (source); @@ -930,6 +997,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) /* get source */ csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE); + if (created) { GST_DEBUG ("created new CSRC: %08x", csrc); rtp_source_set_as_csrc (csrc_src); @@ -948,9 +1016,17 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) /* ERRORS */ invalid_packet: { + gst_buffer_unref (buffer); GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } +ignore: + { + gst_buffer_unref (buffer); + RTP_SESSION_UNLOCK (sess); + GST_DEBUG ("ignoring RTP packet because we are leaving"); + return GST_FLOW_OK; + } } /* A Sender report contains statistics about how the sender is doing. This @@ -977,7 +1053,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("got SR packet: SSRC %08x", senderssrc); - RTP_SESSION_LOCK (sess); source = obtain_source (sess, senderssrc, &created, arrival, FALSE); prevsender = RTP_SOURCE_IS_SENDER (source); @@ -1012,7 +1087,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, exthighestseq, jitter, lsr, dlsr); } } - RTP_SESSION_UNLOCK (sess); } /* A receiver report contains statistics about how a receiver is doing. It @@ -1034,7 +1108,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("got RR packet: SSRC %08x", senderssrc); - RTP_SESSION_LOCK (sess); source = obtain_source (sess, senderssrc, &created, arrival, FALSE); if (created) @@ -1054,7 +1127,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, exthighestseq, jitter, lsr, dlsr); } } - RTP_SESSION_UNLOCK (sess); } /* FIXME, we're just printing this for now... */ @@ -1113,20 +1185,25 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, guint32 ssrc; RTPSource *source; gboolean created, prevactive, prevsender; + guint pmembers, members; ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i); GST_DEBUG ("SSRC: %08x", ssrc); /* find src and mark bye, no probation when dealing with RTCP */ - RTP_SESSION_LOCK (sess); source = obtain_source (sess, ssrc, &created, arrival, FALSE); + /* store time for when we need to time out this source */ + source->bye_time = arrival->time; + prevactive = RTP_SOURCE_IS_ACTIVE (source); prevsender = RTP_SOURCE_IS_SENDER (source); /* let the source handle the rest */ rtp_source_process_bye (source, reason); + pmembers = sess->stats.active_sources; + if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) { sess->stats.active_sources--; GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc, @@ -1137,12 +1214,34 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc, sess->stats.sender_sources); } + members = sess->stats.active_sources; + + if (!sess->source->received_bye && members < pmembers) { + /* some members went away since the previous timeout estimate. + * Perform reverse reconsideration but only when we are not scheduling a + * BYE ourselves. */ + if (arrival->time < sess->next_rtcp_check_time) { + GstClockTime time_remaining; + + time_remaining = sess->next_rtcp_check_time - arrival->time; + sess->next_rtcp_check_time = + gst_util_uint64_scale (time_remaining, members, pmembers); + + GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT, + GST_TIME_ARGS (sess->next_rtcp_check_time)); + + sess->next_rtcp_check_time += arrival->time; + + /* notify app of reconsideration */ + if (sess->callbacks.reconsider) + sess->callbacks.reconsider (sess, sess->user_data); + } + } if (created) on_new_ssrc (sess, source); on_bye_ssrc (sess, source); - RTP_SESSION_UNLOCK (sess); } g_free (reason); } @@ -1167,9 +1266,8 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) { GstRTCPPacket packet; - gboolean more; + gboolean more, is_bye = FALSE; RTPArrivalStats arrival; - guint size; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1177,27 +1275,29 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) if (!gst_rtcp_buffer_validate (buffer)) goto invalid_packet; - /* update arrival stats */ - update_arrival_stats (sess, &arrival, FALSE, buffer); - GST_DEBUG ("received RTCP packet"); - /* get packet size including header overhead */ RTP_SESSION_LOCK (sess); - size = GST_BUFFER_SIZE (buffer) + sess->header_len; + /* update arrival stats */ + update_arrival_stats (sess, &arrival, FALSE, buffer); - /* update average RTCP packet size */ - if (sess->stats.avg_rtcp_packet_size == 0) - sess->stats.avg_rtcp_packet_size = size; - else - sess->stats.avg_rtcp_packet_size = - (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4; - RTP_SESSION_UNLOCK (sess); + if (sess->sent_bye) + goto ignore; /* start processing the compound packet */ more = gst_rtcp_buffer_get_first_packet (buffer, &packet); while (more) { - switch (gst_rtcp_packet_get_type (&packet)) { + GstRTCPType type; + + type = gst_rtcp_packet_get_type (&packet); + + /* when we are leaving the session, we should ignore all non-BYE messages */ + if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) { + GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving"); + goto next; + } + + switch (type) { case GST_RTCP_TYPE_SR: rtp_session_process_sr (sess, &packet, &arrival); break; @@ -1208,6 +1308,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) rtp_session_process_sdes (sess, &packet, &arrival); break; case GST_RTCP_TYPE_BYE: + is_bye = TRUE; rtp_session_process_bye (sess, &packet, &arrival); break; case GST_RTCP_TYPE_APP: @@ -1217,9 +1318,23 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) GST_WARNING ("got unknown RTCP packet"); break; } + next: more = gst_rtcp_packet_move_to_next (&packet); } + /* if we are scheduling a BYE, we only want to count bye packets, else we + * count everything */ + if (sess->source->received_bye) { + if (is_bye) { + sess->stats.bye_members++; + UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes); + } + } else { + /* keep track of average packet size */ + UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes); + } + RTP_SESSION_UNLOCK (sess); + gst_buffer_unref (buffer); return GST_FLOW_OK; @@ -1230,11 +1345,18 @@ invalid_packet: GST_DEBUG ("invalid RTCP packet received"); return GST_FLOW_OK; } +ignore: + { + gst_buffer_unref (buffer); + RTP_SESSION_UNLOCK (sess); + GST_DEBUG ("ignoring RTP packet because we left"); + return GST_FLOW_OK; + } } /** * rtp_session_send_rtp: - * @sess: and #RTPSession + * @sess: an #RTPSession * @buffer: an RTP buffer * * Send the RTP buffer in the session manager. @@ -1266,25 +1388,125 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) return result; } +static GstClockTime +calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, + gboolean first) +{ + GstClockTime result; + + if (sess->source->received_bye) { + result = rtp_stats_calculate_rtcp_interval (&sess->stats, + RTP_SOURCE_IS_SENDER (sess->source), first); + } else { + result = rtp_stats_calculate_bye_interval (&sess->stats); + } + + GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT, + GST_TIME_ARGS (result)); + + if (!deterministic) + result = rtp_stats_add_rtcp_jitter (&sess->stats, result); + + GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result)); + + return result; +} + /** - * rtp_session_get_reporting_interval: + * rtp_session_send_bye: * @sess: an #RTPSession + * @reason: a reason or NULL * - * Get the interval for sending out the next RTCP packet and doing general - * maintenance tasks. + * Stop the current @sess and schedule a BYE message for the other members. * - * Returns: an interval in seconds. + * Returns: a #GstFlowReturn. */ -gdouble -rtp_session_get_reporting_interval (RTPSession * sess) +GstFlowReturn +rtp_session_send_bye (RTPSession * sess, const gchar * reason) +{ + GstFlowReturn result = GST_FLOW_OK; + RTPSource *source; + GstClockTime current, interval; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + + RTP_SESSION_LOCK (sess); + source = sess->source; + + /* ignore more BYEs */ + if (source->received_bye) + goto done; + + /* we have BYE now */ + source->received_bye = TRUE; + /* at least one member wants to send a BYE */ + sess->bye_reason = g_strdup (reason); + sess->stats.avg_rtcp_packet_size = 100; + sess->stats.bye_members = 1; + sess->first_rtcp = TRUE; + sess->sent_bye = FALSE; + + /* get current time */ + if (sess->callbacks.get_time) + current = sess->callbacks.get_time (sess, sess->user_data); + else + current = 0; + + /* reschedule transmission */ + sess->last_rtcp_send_time = current; + interval = calculate_rtcp_interval (sess, FALSE, TRUE); + sess->next_rtcp_check_time = current + interval; + + GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT, + GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time)); + + /* notify app of reconsideration */ + if (sess->callbacks.reconsider) + sess->callbacks.reconsider (sess, sess->user_data); +done: + RTP_SESSION_UNLOCK (sess); + + return result; +} + +/** + * rtp_session_next_timeout: + * @sess: an #RTPSession + * @time: the current time + * + * Get the next time we should perform session maintenance tasks. + * + * Returns: a time when rtp_session_on_timeout() should be called with the + * current time. + */ +GstClockTime +rtp_session_next_timeout (RTPSession * sess, GstClockTime time) { - gdouble result; + GstClockTime result; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); RTP_SESSION_LOCK (sess); - result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE); - result = rtp_stats_add_rtcp_jitter (&sess->stats, result); + + result = sess->next_rtcp_check_time; + + if (sess->source->received_bye) { + if (sess->sent_bye) + result = GST_CLOCK_TIME_NONE; + else if (sess->stats.active_sources >= 50) + /* reconsider BYE if members >= 50 */ + result = time + calculate_rtcp_interval (sess, FALSE, TRUE);; + } else { + if (sess->first_rtcp) + /* we are called for the first time */ + result = time + calculate_rtcp_interval (sess, FALSE, TRUE); + else if (sess->next_rtcp_check_time < time) + /* get a new timeout when we need to */ + result = time + calculate_rtcp_interval (sess, FALSE, FALSE); + } + sess->next_rtcp_check_time = result; + + GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result)); RTP_SESSION_UNLOCK (sess); return result; @@ -1295,34 +1517,46 @@ typedef struct RTPSession *sess; GstBuffer *rtcp; GstClockTime time; + GstClockTime interval; GstRTCPPacket packet; + gboolean is_bye; + gboolean has_sdes; } ReportData; static void +session_start_rtcp (RTPSession * sess, ReportData * data) +{ + GstRTCPPacket *packet = &data->packet; + RTPSource *own = sess->source; + + data->rtcp = gst_rtcp_buffer_new (sess->mtu); + + if (RTP_SOURCE_IS_SENDER (own)) { + /* we are a sender, create SR */ + GST_DEBUG ("create SR for SSRC %08x", own->ssrc); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); + + /* fill in sender report info, FIXME NTP and RTP timestamps missing */ + gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, + 0, 0, own->stats.packets_sent, own->stats.octets_sent); + } else { + /* we are only receiver, create RR */ + GST_DEBUG ("create RR for SSRC %08x", own->ssrc); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet); + gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc); + } +} + +/* construct a Sender or Receiver Report */ +static void session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) { RTPSession *sess = data->sess; - RTPSource *own = sess->source; GstRTCPPacket *packet = &data->packet; /* create a new buffer if needed */ if (data->rtcp == NULL) { - data->rtcp = gst_rtcp_buffer_new (sess->mtu); - - if (RTP_SOURCE_IS_SENDER (own)) { - /* we are a sender, create SR */ - GST_DEBUG ("create SR for SSRC %08x", own->ssrc); - gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); - - /* fill in sender report info */ - gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - 0, 0, own->stats.packets_sent, own->stats.octets_sent); - } else { - /* we are only receiver, create RR */ - GST_DEBUG ("create RR for SSRC %08x", own->ssrc); - gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet); - gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc); - } + session_start_rtcp (sess, data); } if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { /* only report about other sender sources */ @@ -1381,16 +1615,85 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) } } +/* perform cleanup of sources that timed out */ +static gboolean +session_cleanup (const gchar * key, RTPSource * source, ReportData * data) +{ + gboolean remove = FALSE; + gboolean byetimeout = FALSE; + gboolean is_sender, is_active; + RTPSession *sess = data->sess; + GstClockTime interval; + + is_sender = RTP_SOURCE_IS_SENDER (source); + is_active = RTP_SOURCE_IS_ACTIVE (source); + + /* check for our own source, we don't want to delete our own source. */ + if (!(source == sess->source)) { + if (source->received_bye) { + /* if we received a BYE from the source, remove the source after some + * time. */ + if (data->time > source->bye_time && + data->time - source->bye_time > sess->stats.bye_timeout) { + GST_DEBUG ("removing BYE source %08x", source->ssrc); + remove = TRUE; + byetimeout = TRUE; + } + } + /* sources that were inactive for more than 5 times the deterministic reporting + * interval get timed out. the min timeout is 5 seconds. */ + if (data->time > source->last_activity) { + interval = MAX (data->interval * 5, 5 * GST_SECOND); + if (data->time - source->last_activity > interval) { + GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT, + source->ssrc, GST_TIME_ARGS (source->last_activity)); + remove = TRUE; + } + } + } + + /* senders that did not send for a long time become a receiver, this also + * holds for our own source. */ + if (is_sender) { + if (data->time > source->last_rtp_activity) { + interval = MAX (data->interval * 2, 5 * GST_SECOND); + + if (data->time - source->last_rtp_activity > interval) { + GST_DEBUG ("sender source %08x timed out and became receiver, last %" + GST_TIME_FORMAT, source->ssrc, + GST_TIME_ARGS (source->last_rtp_activity)); + source->is_sender = FALSE; + sess->stats.sender_sources--; + } + } + } + + if (remove) { + sess->total_sources--; + if (is_sender) + sess->stats.sender_sources--; + if (is_active) + sess->stats.active_sources--; + + if (byetimeout) + on_bye_timeout (sess, source); + else + on_timeout (sess, source); + + } + return remove; +} + static void -session_sdes (RTPSession * sess, GstBuffer * buffer) +session_sdes (RTPSession * sess, ReportData * data) { - GstRTCPPacket packet; + GstRTCPPacket *packet = &data->packet; /* add SDES packet */ - gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_SDES, &packet); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet); - gst_rtcp_packet_sdes_add_item (&packet, sess->source->ssrc); - gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_CNAME, + gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc); + gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME, strlen (sess->cname), (guint8 *) sess->cname); /* other SDES items must only be added at regular intervals and only when the @@ -1401,20 +1704,87 @@ session_sdes (RTPSession * sess, GstBuffer * buffer) gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL, strlen (sess->tool), (guint8 *) sess->tool); #endif + + data->has_sdes = TRUE; +} + +/* schedule a BYE packet */ +static void +session_bye (RTPSession * sess, ReportData * data) +{ + GstRTCPPacket *packet = &data->packet; + + /* open packet */ + session_start_rtcp (sess, data); + + /* add SDES */ + session_sdes (sess, data); + + /* add a BYE packet */ + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet); + gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc); + if (sess->bye_reason) + gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason); + + /* we have a BYE packet now */ + data->is_bye = TRUE; +} + +static gboolean +is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) +{ + GstClockTime new_send_time; + gboolean result; + + /* no need to check yet */ + if (sess->next_rtcp_check_time > time) { + GST_DEBUG ("no check time yet"); + return FALSE; + } + + /* perform forward reconsideration */ + new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval); + + GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_send_time)); + + new_send_time += sess->last_rtcp_send_time; + + /* check if reconsideration */ + if (time < new_send_time) { + GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_send_time)); + result = FALSE; + /* store new check time */ + sess->next_rtcp_check_time = new_send_time; + } else { + result = TRUE; + new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE); + + GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_send_time)); + sess->next_rtcp_check_time = time + new_send_time; + } + return result; } /** - * rtp_session_perform_reporting: + * rtp_session_on_timeout: * @sess: an #RTPSession * - * Instruct the session manager to generate RTCP packets with current stats. - * This function will call the #RTPSessionSendRTCP callback, possibly multiple + * Perform maintenance actions after the timeout obtained with + * rtp_session_next_timeout() expired. + * + * This function will perform timeouts of receivers and senders, send a BYE + * packet or generate RTCP packets with current session stats. + * + * This function can call the #RTPSessionSendRTCP callback, possibly multiple * times, for each packet that should be processed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_perform_reporting (RTPSession * sess) +rtp_session_on_timeout (RTPSession * sess, GstClockTime time) { GstFlowReturn result = GST_FLOW_OK; ReportData data; @@ -1423,21 +1793,49 @@ rtp_session_perform_reporting (RTPSession * sess) data.sess = sess; data.rtcp = NULL; + data.time = time; + data.is_bye = FALSE; + data.has_sdes = FALSE; - /* get time so it can be used later */ - data.time = sess->callbacks.get_time (sess, sess->user_data); + GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time)); RTP_SESSION_LOCK (sess); - /* loop over all known sources and do something */ - g_hash_table_foreach (sess->ssrcs[sess->mask_idx], - (GHFunc) session_report_blocks, &data); + /* get a new interval, we need this for various cleanups etc */ + data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp); + + /* first perform cleanups */ + g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx], + (GHRFunc) session_cleanup, &data); + + /* see if we need to generate SR or RR packets */ + if (is_rtcp_time (sess, time, &data)) { + if (sess->source->received_bye) { + /* generate BYE instead */ + session_bye (sess, &data); + sess->sent_bye = TRUE; + } else { + /* loop over all known sources and do something */ + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) session_report_blocks, &data); + } + } - /* add SDES for this source */ if (data.rtcp) { - session_sdes (sess, data.rtcp); - sess->stats.sent_rtcp = TRUE; - } + guint size; + + /* we keep track of the last report time in order to timeout inactive + * receivers or senders */ + sess->last_rtcp_send_time = data.time; + sess->first_rtcp = FALSE; + /* add SDES for this source when not already added */ + if (!data.has_sdes) + session_sdes (sess, &data); + + /* update average RTCP size before sending */ + size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len; + UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size); + } RTP_SESSION_UNLOCK (sess); /* push out the RTCP packet */ @@ -1451,5 +1849,6 @@ rtp_session_perform_reporting (RTPSession * sess) else gst_buffer_unref (data.rtcp); } + return result; } diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 3554016f..c9a2114f 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -106,6 +106,17 @@ typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data); /** + * RTPSessionReconsider: + * @sess: an #RTPSession + * @user_data: user data specified when registering + * + * This callback will be called when @sess needs to cancel the previous timeout. + * The currently running timeout should be canceled and a new reporting interval + * should be requested from @sess. + */ +typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data); + +/** * RTPSessionCallbacks: * @RTPSessionProcessRTP: callback to process RTP packets * @RTPSessionSendRTP: callback for sending RTP packets @@ -122,6 +133,7 @@ typedef struct { RTPSessionSendRTCP send_rtcp; RTPSessionClockRate clock_rate; RTPSessionGetTime get_time; + RTPSessionReconsider reconsider; } RTPSessionCallbacks; /** @@ -164,6 +176,14 @@ struct _RTPSession { GHashTable *cnames; guint total_sources; + GstClockTime next_rtcp_check_time; + GstClockTime last_rtcp_send_time; + gboolean first_rtcp; + + GstBuffer *bye_packet; + gchar *bye_reason; + gboolean sent_bye; + RTPSessionCallbacks callbacks; gpointer user_data; @@ -185,6 +205,8 @@ struct _RTPSessionClass { void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source); void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source); void (*on_bye_ssrc) (RTPSession *sess, RTPSource *source); + void (*on_bye_timeout) (RTPSession *sess, RTPSource *source); + void (*on_timeout) (RTPSession *sess, RTPSource *source); }; GType rtp_session_get_type (void); @@ -229,8 +251,11 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer /* processing packets for sending */ GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer); +/* stopping the session */ +GstFlowReturn rtp_session_send_bye (RTPSession *sess, const gchar *reason); + /* get interval for next RTCP interval */ -gdouble rtp_session_get_reporting_interval (RTPSession *sess); -GstFlowReturn rtp_session_perform_reporting (RTPSession *sess); +GstClockTime rtp_session_next_timeout (RTPSession *sess, GstClockTime time); +GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time); #endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index f5ca2a1c..0df03f4f 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -136,6 +136,10 @@ struct _RTPSource { guint8 payload; gint clock_rate; + GstClockTime bye_time; + GstClockTime last_activity; + GstClockTime last_rtp_activity; + GQueue *packets; RTPSourceCallbacks callbacks; diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c index 456ed15f..1e18f45e 100644 --- a/gst/rtpmanager/rtpstats.c +++ b/gst/rtpmanager/rtpstats.c @@ -33,63 +33,77 @@ rtp_stats_init_defaults (RTPSessionStats * stats) stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION; stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH; stats->min_interval = RTP_STATS_MIN_INTERVAL; + stats->bye_timeout = RTP_STATS_BYE_TIMEOUT; } /** * rtp_stats_calculate_rtcp_interval: * @stats: an #RTPSessionStats struct + * @sender: if we are a sender + * @first: if this is the first time * * Calculate the RTCP interval. The result of this function is the amount of - * time to wait (in seconds) before sender a new RTCP message. + * time to wait (in nanoseconds) before sending a new RTCP message. * * Returns: the RTCP interval. */ -gdouble -rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender) +GstClockTime +rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean we_send, + gboolean first) { - gdouble active, senders, receivers, sfraction; - gboolean avg_rtcp; + gdouble members, senders, n; + gdouble avg_rtcp_size, rtcp_bw; gdouble interval; + gdouble rtcp_min_time; - active = stats->active_sources; - /* Try to avoid division by zero */ - if (stats->active_sources == 0) - active += 1.0; - senders = (gdouble) stats->sender_sources; - receivers = (gdouble) (active - senders); - avg_rtcp = (gdouble) stats->avg_rtcp_packet_size; - - sfraction = senders / active; + /* Very first call at application start-up uses half the min + * delay for quicker notification while still allowing some time + * before reporting for randomization and to learn about other + * sources so the report interval will converge to the correct + * interval more quickly. + */ + rtcp_min_time = stats->min_interval; + if (first) + rtcp_min_time /= 2.0; - GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f", - senders, receivers, avg_rtcp, sfraction); + /* Dedicate a fraction of the RTCP bandwidth to senders unless + * the number of senders is large enough that their share is + * more than that fraction. + */ + n = members = stats->active_sources; + senders = (gdouble) stats->sender_sources; + rtcp_bw = stats->rtcp_bandwidth; - if (senders > 0 && sfraction <= stats->sender_fraction) { - if (sender) { - interval = - (avg_rtcp * senders) / (stats->sender_fraction * - stats->rtcp_bandwidth); + if (senders <= members * RTP_STATS_SENDER_FRACTION) { + if (we_send) { + rtcp_bw *= RTP_STATS_SENDER_FRACTION; + n = senders; } else { - interval = - (avg_rtcp * receivers) / ((1.0 - - stats->sender_fraction) * stats->rtcp_bandwidth); + rtcp_bw *= RTP_STATS_RECEIVER_FRACTION; + n -= senders; } - } else { - interval = (avg_rtcp * active) / stats->rtcp_bandwidth; } - if (interval < stats->min_interval) - interval = stats->min_interval; - - if (!stats->sent_rtcp) - interval /= 2.0; + avg_rtcp_size = stats->avg_rtcp_packet_size / 16.0; + /* + * The effective number of sites times the average packet size is + * the total number of octets sent when each site sends a report. + * Dividing this by the effective bandwidth gives the time + * interval over which those packets must be sent in order to + * meet the bandwidth target, with a minimum enforced. In that + * time interval we send one report so this time is also our + * average time between reports. + */ + interval = avg_rtcp_size * n / rtcp_bw; + if (interval < rtcp_min_time) + interval = rtcp_min_time; - return interval; + return interval * GST_SECOND; } /** - * rtp_stats_calculate_rtcp_interval: + * rtp_stats_add_rtcp_jitter: * @stats: an #RTPSessionStats struct * @interval: an RTCP interval * @@ -98,14 +112,62 @@ rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender) * * Returns: the new RTCP interval. */ -gdouble -rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval) +GstClockTime +rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, GstClockTime interval) { + gdouble temp; + /* see RFC 3550 p 30 * To compensate for "unconditional reconsideration" converging to a * value below the intended average. */ #define COMPENSATION (2.71828 - 1.5); - return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION; + temp = (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION; + + return (GstClockTime) temp; +} + + +/** + * rtp_stats_calculate_bye_interval: + * @stats: an #RTPSessionStats struct + * + * Calculate the BYE interval. The result of this function is the amount of + * time to wait (in nanoseconds) before sending a BYE message. + * + * Returns: the BYE interval. + */ +GstClockTime +rtp_stats_calculate_bye_interval (RTPSessionStats * stats) +{ + gdouble members; + gdouble avg_rtcp_size, rtcp_bw; + gdouble interval; + gdouble rtcp_min_time; + + rtcp_min_time = (stats->min_interval) / 2.0; + + /* Dedicate a fraction of the RTCP bandwidth to senders unless + * the number of senders is large enough that their share is + * more than that fraction. + */ + members = stats->bye_members; + rtcp_bw = stats->rtcp_bandwidth * RTP_STATS_RECEIVER_FRACTION; + + avg_rtcp_size = stats->avg_rtcp_packet_size / 16.0; + /* + * The effective number of sites times the average packet size is + * the total number of octets sent when each site sends a report. + * Dividing this by the effective bandwidth gives the time + * interval over which those packets must be sent in order to + * meet the bandwidth target, with a minimum enforced. In that + * time interval we send one report so this time is also our + * average time between reports. + */ + interval = avg_rtcp_size * members / rtcp_bw; + if (interval < rtcp_min_time) + interval = rtcp_min_time; + + return interval * GST_SECOND; } diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h index e8ea9816..0ee1ed1e 100644 --- a/gst/rtpmanager/rtpstats.h +++ b/gst/rtpmanager/rtpstats.h @@ -134,7 +134,7 @@ typedef struct { * a network partition. */ #define RTP_STATS_MIN_INTERVAL 5.0 - /* +/* * Fraction of the RTCP bandwidth to be shared among active * senders. (This fraction was chosen so that in a typical * session with one or two active senders, the computed report @@ -145,6 +145,12 @@ typedef struct { #define RTP_STATS_SENDER_FRACTION (0.25) #define RTP_STATS_RECEIVER_FRACTION (1.0 - RTP_STATS_SENDER_FRACTION) +/* + * When receiving a BYE from a source, remove the source fomr the database + * after this timeout. + */ +#define RTP_STATS_BYE_TIMEOUT (2 * GST_SECOND) + /** * RTPSessionStats: * @@ -156,16 +162,17 @@ typedef struct { gdouble receiver_fraction; gdouble rtcp_bandwidth; gdouble min_interval; + GstClockTime bye_timeout; guint sender_sources; guint active_sources; guint avg_rtcp_packet_size; - guint avg_bye_packet_size; - gboolean sent_rtcp; + guint bye_members; } RTPSessionStats; void rtp_stats_init_defaults (RTPSessionStats *stats); -gdouble rtp_stats_calculate_rtcp_interval (RTPSessionStats *stats, gboolean sender); -gdouble rtp_stats_add_rtcp_jitter (RTPSessionStats *stats, gdouble interval); +GstClockTime rtp_stats_calculate_rtcp_interval (RTPSessionStats *stats, gboolean sender, gboolean first); +GstClockTime rtp_stats_add_rtcp_jitter (RTPSessionStats *stats, GstClockTime interval); +GstClockTime rtp_stats_calculate_bye_interval (RTPSessionStats *stats); #endif /* __RTP_STATS_H__ */ |