summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/rtpsession.c
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2007-04-27 15:09:12 +0000
committerWim Taymans <wim.taymans@gmail.com>2007-04-27 15:09:12 +0000
commita468f02d2aeabcf7e31d9e4cf576ab3474e8a1f7 (patch)
tree05189a20db17aeeace5ceeae55977bceeaf46667 /gst/rtpmanager/rtpsession.c
parente72bd2abf931cb60b32ae8f0e8e702e6fdbc1500 (diff)
downloadgst-plugins-bad-a468f02d2aeabcf7e31d9e4cf576ab3474e8a1f7.tar.gz
gst-plugins-bad-a468f02d2aeabcf7e31d9e4cf576ab3474e8a1f7.tar.bz2
gst-plugins-bad-a468f02d2aeabcf7e31d9e4cf576ab3474e8a1f7.zip
gst/rtpmanager/gstrtpsession.c: Move reconsideration code to the rtpsession object.
Original commit message from CVS: * gst/rtpmanager/gstrtpsession.c: (rtcp_thread), (gst_rtp_session_send_rtcp), (gst_rtp_session_reconsider): Move reconsideration code to the rtpsession object. Simplify timout handling and add reconsideration. * gst/rtpmanager/rtpsession.c: (rtp_session_class_init), (rtp_session_init), (rtp_session_finalize), (on_bye_ssrc), (on_bye_timeout), (on_timeout), (rtp_session_set_callbacks), (obtain_source), (rtp_session_create_source), (update_arrival_stats), (rtp_session_process_rtp), (rtp_session_process_sr), (rtp_session_process_rr), (rtp_session_process_bye), (rtp_session_process_rtcp), (calculate_rtcp_interval), (rtp_session_send_bye), (rtp_session_next_timeout), (session_start_rtcp), (session_report_blocks), (session_cleanup), (session_sdes), (session_bye), (is_rtcp_time), (rtp_session_on_timeout): * gst/rtpmanager/rtpsession.h: Handle timeout of inactive sources and senders. Implement BYE scheduling. * gst/rtpmanager/rtpsource.c: (calculate_jitter), (rtp_source_process_sr), (rtp_source_get_last_sr), (rtp_source_get_last_rb): * gst/rtpmanager/rtpsource.h: Add members to check for timeouts. * gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults), (rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter), (rtp_stats_calculate_bye_interval): * gst/rtpmanager/rtpstats.h: Use RFC algorithm for calculating the reporting interval.
Diffstat (limited to 'gst/rtpmanager/rtpsession.c')
-rw-r--r--gst/rtpmanager/rtpsession.c541
1 files changed, 470 insertions, 71 deletions
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c
index 1f6e1ebc..7244f5fb 100644
--- a/gst/rtpmanager/rtpsession.c
+++ b/gst/rtpmanager/rtpsession.c
@@ -35,6 +35,8 @@ enum
SIGNAL_ON_SSRC_COLLISION,
SIGNAL_ON_SSRC_VALIDATED,
SIGNAL_ON_BYE_SSRC,
+ SIGNAL_ON_BYE_TIMEOUT,
+ SIGNAL_ON_TIMEOUT,
LAST_SIGNAL
};
@@ -46,6 +48,14 @@ enum
PROP_0
};
+/* update average packet size, we keep this scaled by 16 to keep enough
+ * precision. */
+#define UPDATE_AVG(avg, val) \
+ if ((avg) == 0) \
+ (avg) = (val) << 4; \
+ else \
+ (avg) = ((val) + (15 * (avg))) >> 4;
+
/* GObject vmethods */
static void rtp_session_finalize (GObject * object);
static void rtp_session_set_property (GObject * object, guint prop_id,
@@ -119,6 +129,30 @@ rtp_session_class_init (RTPSessionClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc),
NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
G_TYPE_OBJECT);
+ /**
+ * RTPSession::on-bye-timeout:
+ * @session: the object which received the signal
+ * @src: the RTPSource that timed out
+ *
+ * Notify of an SSRC that has timed out because of BYE
+ */
+ rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT] =
+ g_signal_new ("on-bye-timeout", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_timeout),
+ NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+ G_TYPE_OBJECT);
+ /**
+ * RTPSession::on-timeout:
+ * @session: the object which received the signal
+ * @src: the RTPSource that timed out
+ *
+ * Notify of an SSRC that has timed out
+ */
+ rtp_session_signals[SIGNAL_ON_TIMEOUT] =
+ g_signal_new ("on-timeout", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_timeout),
+ NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1,
+ G_TYPE_OBJECT);
GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session");
}
@@ -144,6 +178,7 @@ rtp_session_init (RTPSession * sess)
/* create an active SSRC for this session manager */
sess->source = rtp_session_create_source (sess);
+ sess->source->validated = TRUE;
sess->stats.active_sources++;
/* default UDP header length */
@@ -156,6 +191,8 @@ rtp_session_init (RTPSession * sess)
sess->name = g_strdup (g_get_real_name ());
sess->tool = g_strdup ("GStreamer");
+ sess->first_rtcp = TRUE;
+
GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc);
}
@@ -176,6 +213,7 @@ rtp_session_finalize (GObject * object)
g_free (sess->cname);
g_free (sess->tool);
+ g_free (sess->bye_reason);
G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object);
}
@@ -233,9 +271,22 @@ on_ssrc_validated (RTPSession * sess, RTPSource * source)
static void
on_bye_ssrc (RTPSession * sess, RTPSource * source)
{
+ /* notify app that reconsideration should be performed */
g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source);
}
+static void
+on_bye_timeout (RTPSession * sess, RTPSource * source)
+{
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_TIMEOUT], 0, source);
+}
+
+static void
+on_timeout (RTPSession * sess, RTPSource * source)
+{
+ g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_TIMEOUT], 0, source);
+}
+
/**
* rtp_session_new:
*
@@ -272,6 +323,7 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
sess->callbacks.send_rtcp = callbacks->send_rtcp;
sess->callbacks.clock_rate = callbacks->clock_rate;
sess->callbacks.get_time = callbacks->get_time;
+ sess->callbacks.reconsider = callbacks->reconsider;
sess->user_data = user_data;
}
@@ -657,6 +709,11 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created,
if (check_collision (sess, source, arrival))
on_ssrc_collision (sess, source);
}
+ /* update last activity */
+ source->last_activity = arrival->time;
+ if (rtp)
+ source->last_rtp_activity = arrival->time;
+
return source;
}
@@ -819,6 +876,7 @@ rtp_session_create_source (RTPSession * sess)
break;
}
source = rtp_source_new (ssrc);
+ g_object_ref (source);
g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc),
source);
/* we have one more source now */
@@ -831,6 +889,7 @@ rtp_session_create_source (RTPSession * sess)
/* update the RTPArrivalStats structure with the current time and other bits
* about the current buffer we are handling.
* This function is typically called when a validated packet is received.
+ * This function should be called with the SESSION_LOCK
*/
static void
update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
@@ -842,9 +901,14 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
else
arrival->time = GST_CLOCK_TIME_NONE;
- /* update sizes */
- arrival->bytes = GST_BUFFER_SIZE (buffer) + 28;
- arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0);
+ /* get packet size including header overhead */
+ arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
+
+ if (rtp) {
+ arrival->payload_len = gst_rtp_buffer_get_payload_len (buffer);
+ } else {
+ arrival->payload_len = 0;
+ }
/* for netbuffer we can store the IP address to check for collisions */
arrival->have_address = GST_IS_NETBUFFER (buffer);
@@ -881,13 +945,16 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
if (!gst_rtp_buffer_validate (buffer))
goto invalid_packet;
+ RTP_SESSION_LOCK (sess);
/* update arrival stats */
update_arrival_stats (sess, &arrival, TRUE, buffer);
+ /* ignore more RTP packets when we left the session */
+ if (sess->source->received_bye)
+ goto ignore;
+
/* get SSRC and look up in session database */
ssrc = gst_rtp_buffer_get_ssrc (buffer);
-
- RTP_SESSION_LOCK (sess);
source = obtain_source (sess, ssrc, &created, &arrival, TRUE);
prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -930,6 +997,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
/* get source */
csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE);
+
if (created) {
GST_DEBUG ("created new CSRC: %08x", csrc);
rtp_source_set_as_csrc (csrc_src);
@@ -948,9 +1016,17 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
/* ERRORS */
invalid_packet:
{
+ gst_buffer_unref (buffer);
GST_DEBUG ("invalid RTP packet received");
return GST_FLOW_OK;
}
+ignore:
+ {
+ gst_buffer_unref (buffer);
+ RTP_SESSION_UNLOCK (sess);
+ GST_DEBUG ("ignoring RTP packet because we are leaving");
+ return GST_FLOW_OK;
+ }
}
/* A Sender report contains statistics about how the sender is doing. This
@@ -977,7 +1053,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
GST_DEBUG ("got SR packet: SSRC %08x", senderssrc);
- RTP_SESSION_LOCK (sess);
source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
prevsender = RTP_SOURCE_IS_SENDER (source);
@@ -1012,7 +1087,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
exthighestseq, jitter, lsr, dlsr);
}
}
- RTP_SESSION_UNLOCK (sess);
}
/* A receiver report contains statistics about how a receiver is doing. It
@@ -1034,7 +1108,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
GST_DEBUG ("got RR packet: SSRC %08x", senderssrc);
- RTP_SESSION_LOCK (sess);
source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
if (created)
@@ -1054,7 +1127,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
exthighestseq, jitter, lsr, dlsr);
}
}
- RTP_SESSION_UNLOCK (sess);
}
/* FIXME, we're just printing this for now... */
@@ -1113,20 +1185,25 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
guint32 ssrc;
RTPSource *source;
gboolean created, prevactive, prevsender;
+ guint pmembers, members;
ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i);
GST_DEBUG ("SSRC: %08x", ssrc);
/* find src and mark bye, no probation when dealing with RTCP */
- RTP_SESSION_LOCK (sess);
source = obtain_source (sess, ssrc, &created, arrival, FALSE);
+ /* store time for when we need to time out this source */
+ source->bye_time = arrival->time;
+
prevactive = RTP_SOURCE_IS_ACTIVE (source);
prevsender = RTP_SOURCE_IS_SENDER (source);
/* let the source handle the rest */
rtp_source_process_bye (source, reason);
+ pmembers = sess->stats.active_sources;
+
if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) {
sess->stats.active_sources--;
GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc,
@@ -1137,12 +1214,34 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet,
GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc,
sess->stats.sender_sources);
}
+ members = sess->stats.active_sources;
+
+ if (!sess->source->received_bye && members < pmembers) {
+ /* some members went away since the previous timeout estimate.
+ * Perform reverse reconsideration but only when we are not scheduling a
+ * BYE ourselves. */
+ if (arrival->time < sess->next_rtcp_check_time) {
+ GstClockTime time_remaining;
+
+ time_remaining = sess->next_rtcp_check_time - arrival->time;
+ sess->next_rtcp_check_time =
+ gst_util_uint64_scale (time_remaining, members, pmembers);
+
+ GST_DEBUG ("reverse reconsideration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (sess->next_rtcp_check_time));
+
+ sess->next_rtcp_check_time += arrival->time;
+
+ /* notify app of reconsideration */
+ if (sess->callbacks.reconsider)
+ sess->callbacks.reconsider (sess, sess->user_data);
+ }
+ }
if (created)
on_new_ssrc (sess, source);
on_bye_ssrc (sess, source);
- RTP_SESSION_UNLOCK (sess);
}
g_free (reason);
}
@@ -1167,9 +1266,8 @@ GstFlowReturn
rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
{
GstRTCPPacket packet;
- gboolean more;
+ gboolean more, is_bye = FALSE;
RTPArrivalStats arrival;
- guint size;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -1177,27 +1275,29 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
if (!gst_rtcp_buffer_validate (buffer))
goto invalid_packet;
- /* update arrival stats */
- update_arrival_stats (sess, &arrival, FALSE, buffer);
-
GST_DEBUG ("received RTCP packet");
- /* get packet size including header overhead */
RTP_SESSION_LOCK (sess);
- size = GST_BUFFER_SIZE (buffer) + sess->header_len;
+ /* update arrival stats */
+ update_arrival_stats (sess, &arrival, FALSE, buffer);
- /* update average RTCP packet size */
- if (sess->stats.avg_rtcp_packet_size == 0)
- sess->stats.avg_rtcp_packet_size = size;
- else
- sess->stats.avg_rtcp_packet_size =
- (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4;
- RTP_SESSION_UNLOCK (sess);
+ if (sess->sent_bye)
+ goto ignore;
/* start processing the compound packet */
more = gst_rtcp_buffer_get_first_packet (buffer, &packet);
while (more) {
- switch (gst_rtcp_packet_get_type (&packet)) {
+ GstRTCPType type;
+
+ type = gst_rtcp_packet_get_type (&packet);
+
+ /* when we are leaving the session, we should ignore all non-BYE messages */
+ if (sess->source->received_bye && type != GST_RTCP_TYPE_BYE) {
+ GST_DEBUG ("ignoring non-BYE RTCP packet because we are leaving");
+ goto next;
+ }
+
+ switch (type) {
case GST_RTCP_TYPE_SR:
rtp_session_process_sr (sess, &packet, &arrival);
break;
@@ -1208,6 +1308,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
rtp_session_process_sdes (sess, &packet, &arrival);
break;
case GST_RTCP_TYPE_BYE:
+ is_bye = TRUE;
rtp_session_process_bye (sess, &packet, &arrival);
break;
case GST_RTCP_TYPE_APP:
@@ -1217,9 +1318,23 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
GST_WARNING ("got unknown RTCP packet");
break;
}
+ next:
more = gst_rtcp_packet_move_to_next (&packet);
}
+ /* if we are scheduling a BYE, we only want to count bye packets, else we
+ * count everything */
+ if (sess->source->received_bye) {
+ if (is_bye) {
+ sess->stats.bye_members++;
+ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+ }
+ } else {
+ /* keep track of average packet size */
+ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, arrival.bytes);
+ }
+ RTP_SESSION_UNLOCK (sess);
+
gst_buffer_unref (buffer);
return GST_FLOW_OK;
@@ -1230,11 +1345,18 @@ invalid_packet:
GST_DEBUG ("invalid RTCP packet received");
return GST_FLOW_OK;
}
+ignore:
+ {
+ gst_buffer_unref (buffer);
+ RTP_SESSION_UNLOCK (sess);
+ GST_DEBUG ("ignoring RTP packet because we left");
+ return GST_FLOW_OK;
+ }
}
/**
* rtp_session_send_rtp:
- * @sess: and #RTPSession
+ * @sess: an #RTPSession
* @buffer: an RTP buffer
*
* Send the RTP buffer in the session manager.
@@ -1266,25 +1388,125 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
return result;
}
+static GstClockTime
+calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
+ gboolean first)
+{
+ GstClockTime result;
+
+ if (sess->source->received_bye) {
+ result = rtp_stats_calculate_rtcp_interval (&sess->stats,
+ RTP_SOURCE_IS_SENDER (sess->source), first);
+ } else {
+ result = rtp_stats_calculate_bye_interval (&sess->stats);
+ }
+
+ GST_DEBUG ("next deterministic interval: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (result));
+
+ if (!deterministic)
+ result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+
+ GST_DEBUG ("next interval: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
+
+ return result;
+}
+
/**
- * rtp_session_get_reporting_interval:
+ * rtp_session_send_bye:
* @sess: an #RTPSession
+ * @reason: a reason or NULL
*
- * Get the interval for sending out the next RTCP packet and doing general
- * maintenance tasks.
+ * Stop the current @sess and schedule a BYE message for the other members.
*
- * Returns: an interval in seconds.
+ * Returns: a #GstFlowReturn.
*/
-gdouble
-rtp_session_get_reporting_interval (RTPSession * sess)
+GstFlowReturn
+rtp_session_send_bye (RTPSession * sess, const gchar * reason)
+{
+ GstFlowReturn result = GST_FLOW_OK;
+ RTPSource *source;
+ GstClockTime current, interval;
+
+ g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+
+ RTP_SESSION_LOCK (sess);
+ source = sess->source;
+
+ /* ignore more BYEs */
+ if (source->received_bye)
+ goto done;
+
+ /* we have BYE now */
+ source->received_bye = TRUE;
+ /* at least one member wants to send a BYE */
+ sess->bye_reason = g_strdup (reason);
+ sess->stats.avg_rtcp_packet_size = 100;
+ sess->stats.bye_members = 1;
+ sess->first_rtcp = TRUE;
+ sess->sent_bye = FALSE;
+
+ /* get current time */
+ if (sess->callbacks.get_time)
+ current = sess->callbacks.get_time (sess, sess->user_data);
+ else
+ current = 0;
+
+ /* reschedule transmission */
+ sess->last_rtcp_send_time = current;
+ interval = calculate_rtcp_interval (sess, FALSE, TRUE);
+ sess->next_rtcp_check_time = current + interval;
+
+ GST_DEBUG ("Schedule BYE for %" GST_TIME_FORMAT ", %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (interval), GST_TIME_ARGS (sess->next_rtcp_check_time));
+
+ /* notify app of reconsideration */
+ if (sess->callbacks.reconsider)
+ sess->callbacks.reconsider (sess, sess->user_data);
+done:
+ RTP_SESSION_UNLOCK (sess);
+
+ return result;
+}
+
+/**
+ * rtp_session_next_timeout:
+ * @sess: an #RTPSession
+ * @time: the current time
+ *
+ * Get the next time we should perform session maintenance tasks.
+ *
+ * Returns: a time when rtp_session_on_timeout() should be called with the
+ * current time.
+ */
+GstClockTime
+rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
{
- gdouble result;
+ GstClockTime result;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
RTP_SESSION_LOCK (sess);
- result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE);
- result = rtp_stats_add_rtcp_jitter (&sess->stats, result);
+
+ result = sess->next_rtcp_check_time;
+
+ if (sess->source->received_bye) {
+ if (sess->sent_bye)
+ result = GST_CLOCK_TIME_NONE;
+ else if (sess->stats.active_sources >= 50)
+ /* reconsider BYE if members >= 50 */
+ result = time + calculate_rtcp_interval (sess, FALSE, TRUE);;
+ } else {
+ if (sess->first_rtcp)
+ /* we are called for the first time */
+ result = time + calculate_rtcp_interval (sess, FALSE, TRUE);
+ else if (sess->next_rtcp_check_time < time)
+ /* get a new timeout when we need to */
+ result = time + calculate_rtcp_interval (sess, FALSE, FALSE);
+ }
+ sess->next_rtcp_check_time = result;
+
+ GST_DEBUG ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (result));
RTP_SESSION_UNLOCK (sess);
return result;
@@ -1295,34 +1517,46 @@ typedef struct
RTPSession *sess;
GstBuffer *rtcp;
GstClockTime time;
+ GstClockTime interval;
GstRTCPPacket packet;
+ gboolean is_bye;
+ gboolean has_sdes;
} ReportData;
static void
+session_start_rtcp (RTPSession * sess, ReportData * data)
+{
+ GstRTCPPacket *packet = &data->packet;
+ RTPSource *own = sess->source;
+
+ data->rtcp = gst_rtcp_buffer_new (sess->mtu);
+
+ if (RTP_SOURCE_IS_SENDER (own)) {
+ /* we are a sender, create SR */
+ GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
+ gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
+
+ /* fill in sender report info, FIXME NTP and RTP timestamps missing */
+ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
+ 0, 0, own->stats.packets_sent, own->stats.octets_sent);
+ } else {
+ /* we are only receiver, create RR */
+ GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
+ gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
+ gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
+ }
+}
+
+/* construct a Sender or Receiver Report */
+static void
session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
{
RTPSession *sess = data->sess;
- RTPSource *own = sess->source;
GstRTCPPacket *packet = &data->packet;
/* create a new buffer if needed */
if (data->rtcp == NULL) {
- data->rtcp = gst_rtcp_buffer_new (sess->mtu);
-
- if (RTP_SOURCE_IS_SENDER (own)) {
- /* we are a sender, create SR */
- GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
- gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
-
- /* fill in sender report info */
- gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
- 0, 0, own->stats.packets_sent, own->stats.octets_sent);
- } else {
- /* we are only receiver, create RR */
- GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
- gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet);
- gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc);
- }
+ session_start_rtcp (sess, data);
}
if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
/* only report about other sender sources */
@@ -1381,16 +1615,85 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
}
}
+/* perform cleanup of sources that timed out */
+static gboolean
+session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
+{
+ gboolean remove = FALSE;
+ gboolean byetimeout = FALSE;
+ gboolean is_sender, is_active;
+ RTPSession *sess = data->sess;
+ GstClockTime interval;
+
+ is_sender = RTP_SOURCE_IS_SENDER (source);
+ is_active = RTP_SOURCE_IS_ACTIVE (source);
+
+ /* check for our own source, we don't want to delete our own source. */
+ if (!(source == sess->source)) {
+ if (source->received_bye) {
+ /* if we received a BYE from the source, remove the source after some
+ * time. */
+ if (data->time > source->bye_time &&
+ data->time - source->bye_time > sess->stats.bye_timeout) {
+ GST_DEBUG ("removing BYE source %08x", source->ssrc);
+ remove = TRUE;
+ byetimeout = TRUE;
+ }
+ }
+ /* sources that were inactive for more than 5 times the deterministic reporting
+ * interval get timed out. the min timeout is 5 seconds. */
+ if (data->time > source->last_activity) {
+ interval = MAX (data->interval * 5, 5 * GST_SECOND);
+ if (data->time - source->last_activity > interval) {
+ GST_DEBUG ("removing timeout source %08x, last %" GST_TIME_FORMAT,
+ source->ssrc, GST_TIME_ARGS (source->last_activity));
+ remove = TRUE;
+ }
+ }
+ }
+
+ /* senders that did not send for a long time become a receiver, this also
+ * holds for our own source. */
+ if (is_sender) {
+ if (data->time > source->last_rtp_activity) {
+ interval = MAX (data->interval * 2, 5 * GST_SECOND);
+
+ if (data->time - source->last_rtp_activity > interval) {
+ GST_DEBUG ("sender source %08x timed out and became receiver, last %"
+ GST_TIME_FORMAT, source->ssrc,
+ GST_TIME_ARGS (source->last_rtp_activity));
+ source->is_sender = FALSE;
+ sess->stats.sender_sources--;
+ }
+ }
+ }
+
+ if (remove) {
+ sess->total_sources--;
+ if (is_sender)
+ sess->stats.sender_sources--;
+ if (is_active)
+ sess->stats.active_sources--;
+
+ if (byetimeout)
+ on_bye_timeout (sess, source);
+ else
+ on_timeout (sess, source);
+
+ }
+ return remove;
+}
+
static void
-session_sdes (RTPSession * sess, GstBuffer * buffer)
+session_sdes (RTPSession * sess, ReportData * data)
{
- GstRTCPPacket packet;
+ GstRTCPPacket *packet = &data->packet;
/* add SDES packet */
- gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_SDES, &packet);
+ gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SDES, packet);
- gst_rtcp_packet_sdes_add_item (&packet, sess->source->ssrc);
- gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_CNAME,
+ gst_rtcp_packet_sdes_add_item (packet, sess->source->ssrc);
+ gst_rtcp_packet_sdes_add_entry (packet, GST_RTCP_SDES_CNAME,
strlen (sess->cname), (guint8 *) sess->cname);
/* other SDES items must only be added at regular intervals and only when the
@@ -1401,20 +1704,87 @@ session_sdes (RTPSession * sess, GstBuffer * buffer)
gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL,
strlen (sess->tool), (guint8 *) sess->tool);
#endif
+
+ data->has_sdes = TRUE;
+}
+
+/* schedule a BYE packet */
+static void
+session_bye (RTPSession * sess, ReportData * data)
+{
+ GstRTCPPacket *packet = &data->packet;
+
+ /* open packet */
+ session_start_rtcp (sess, data);
+
+ /* add SDES */
+ session_sdes (sess, data);
+
+ /* add a BYE packet */
+ gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_BYE, packet);
+ gst_rtcp_packet_bye_add_ssrc (packet, sess->source->ssrc);
+ if (sess->bye_reason)
+ gst_rtcp_packet_bye_set_reason (packet, sess->bye_reason);
+
+ /* we have a BYE packet now */
+ data->is_bye = TRUE;
+}
+
+static gboolean
+is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
+{
+ GstClockTime new_send_time;
+ gboolean result;
+
+ /* no need to check yet */
+ if (sess->next_rtcp_check_time > time) {
+ GST_DEBUG ("no check time yet");
+ return FALSE;
+ }
+
+ /* perform forward reconsideration */
+ new_send_time = rtp_stats_add_rtcp_jitter (&sess->stats, data->interval);
+
+ GST_DEBUG ("forward reconsideration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+
+ new_send_time += sess->last_rtcp_send_time;
+
+ /* check if reconsideration */
+ if (time < new_send_time) {
+ GST_DEBUG ("reconsider RTCP for %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+ result = FALSE;
+ /* store new check time */
+ sess->next_rtcp_check_time = new_send_time;
+ } else {
+ result = TRUE;
+ new_send_time = calculate_rtcp_interval (sess, FALSE, FALSE);
+
+ GST_DEBUG ("can send RTCP now, next interval %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_send_time));
+ sess->next_rtcp_check_time = time + new_send_time;
+ }
+ return result;
}
/**
- * rtp_session_perform_reporting:
+ * rtp_session_on_timeout:
* @sess: an #RTPSession
*
- * Instruct the session manager to generate RTCP packets with current stats.
- * This function will call the #RTPSessionSendRTCP callback, possibly multiple
+ * Perform maintenance actions after the timeout obtained with
+ * rtp_session_next_timeout() expired.
+ *
+ * This function will perform timeouts of receivers and senders, send a BYE
+ * packet or generate RTCP packets with current session stats.
+ *
+ * This function can call the #RTPSessionSendRTCP callback, possibly multiple
* times, for each packet that should be processed.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_perform_reporting (RTPSession * sess)
+rtp_session_on_timeout (RTPSession * sess, GstClockTime time)
{
GstFlowReturn result = GST_FLOW_OK;
ReportData data;
@@ -1423,21 +1793,49 @@ rtp_session_perform_reporting (RTPSession * sess)
data.sess = sess;
data.rtcp = NULL;
+ data.time = time;
+ data.is_bye = FALSE;
+ data.has_sdes = FALSE;
- /* get time so it can be used later */
- data.time = sess->callbacks.get_time (sess, sess->user_data);
+ GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
RTP_SESSION_LOCK (sess);
- /* loop over all known sources and do something */
- g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
- (GHFunc) session_report_blocks, &data);
+ /* get a new interval, we need this for various cleanups etc */
+ data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
+
+ /* first perform cleanups */
+ g_hash_table_foreach_remove (sess->ssrcs[sess->mask_idx],
+ (GHRFunc) session_cleanup, &data);
+
+ /* see if we need to generate SR or RR packets */
+ if (is_rtcp_time (sess, time, &data)) {
+ if (sess->source->received_bye) {
+ /* generate BYE instead */
+ session_bye (sess, &data);
+ sess->sent_bye = TRUE;
+ } else {
+ /* loop over all known sources and do something */
+ g_hash_table_foreach (sess->ssrcs[sess->mask_idx],
+ (GHFunc) session_report_blocks, &data);
+ }
+ }
- /* add SDES for this source */
if (data.rtcp) {
- session_sdes (sess, data.rtcp);
- sess->stats.sent_rtcp = TRUE;
- }
+ guint size;
+
+ /* we keep track of the last report time in order to timeout inactive
+ * receivers or senders */
+ sess->last_rtcp_send_time = data.time;
+ sess->first_rtcp = FALSE;
+ /* add SDES for this source when not already added */
+ if (!data.has_sdes)
+ session_sdes (sess, &data);
+
+ /* update average RTCP size before sending */
+ size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len;
+ UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size);
+ }
RTP_SESSION_UNLOCK (sess);
/* push out the RTCP packet */
@@ -1451,5 +1849,6 @@ rtp_session_perform_reporting (RTPSession * sess)
else
gst_buffer_unref (data.rtcp);
}
+
return result;
}