From fe7b1e82ee5fb0a99cdd2249a0e9513afa45bf88 Mon Sep 17 00:00:00 2001 From: Olivier Crete Date: Tue, 11 Mar 2008 12:40:58 +0000 Subject: gst/rtpmanager/rtpsession.*: Implement collision and loop detection in rtpmanager. Original commit message from CVS: Patch by: Olivier Crete * gst/rtpmanager/rtpsession.c: (find_add_conflicting_addresses), (check_collision), (obtain_source), (rtp_session_create_new_ssrc), (rtp_session_create_source), (rtp_session_process_rtp), (rtp_session_process_sr), (rtp_session_process_rr), (rtp_session_process_sdes), (rtp_session_process_bye), (rtp_session_send_bye_locked), (rtp_session_send_bye), (rtp_session_on_timeout): * gst/rtpmanager/rtpsession.h: Implement collision and loop detection in rtpmanager. Fixes #520626. * gst/rtpmanager/rtpsource.c: (rtp_source_reset), (rtp_source_init): * gst/rtpmanager/rtpsource.h: Add method to reset stats. --- ChangeLog | 20 ++++ gst/rtpmanager/rtpsession.c | 230 ++++++++++++++++++++++++++++++++++++++++---- gst/rtpmanager/rtpsession.h | 19 ++++ gst/rtpmanager/rtpsource.c | 24 ++++- gst/rtpmanager/rtpsource.h | 2 + 5 files changed, 273 insertions(+), 22 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3eddfb22..d065acd9 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,23 @@ +2008-03-11 Wim Taymans + + Patch by: Olivier Crete + + * gst/rtpmanager/rtpsession.c: (find_add_conflicting_addresses), + (check_collision), (obtain_source), (rtp_session_create_new_ssrc), + (rtp_session_create_source), (rtp_session_process_rtp), + (rtp_session_process_sr), (rtp_session_process_rr), + (rtp_session_process_sdes), (rtp_session_process_bye), + (rtp_session_send_bye_locked), (rtp_session_send_bye), + (rtp_session_on_timeout): + * gst/rtpmanager/rtpsession.h: + Implement collision and loop detection in rtpmanager. + Fixes #520626. + + * gst/rtpmanager/rtpsource.c: (rtp_source_reset), + (rtp_source_init): + * gst/rtpmanager/rtpsource.h: + Add method to reset stats. + 2008-03-11 Wim Taymans Based on patch by: Ole André Vadla Ravnås diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 3b2951a1..0eaf3e6f 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -82,6 +82,11 @@ enum else \ (avg) = ((val) + (15 * (avg))) >> 4; +/* The number RTCP intervals after which to timeout entries in the + * collision table + */ +#define RTCP_INTERVAL_COLLISION_TIMEOUT 10 + /* GObject vmethods */ static void rtp_session_finalize (GObject * object); static void rtp_session_set_property (GObject * object, guint prop_id, @@ -95,6 +100,10 @@ G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, RTPArrivalStats * arrival, gboolean rtp); +static GstFlowReturn rtp_session_send_bye_locked (RTPSession * sess, + const gchar * reason); +static GstClockTime calculate_rtcp_interval (RTPSession * sess, + gboolean deterministic, gboolean first); static void rtp_session_class_init (RTPSessionClass * klass) @@ -852,14 +861,110 @@ static RTPSourceCallbacks callbacks = { (RTPSourceClockRate) source_clock_rate, }; +/** + * find_add_conflicting_addresses: + * @sess: The session to check in + * @arrival: The arrival stats for the buffer + * + * Checks if an address which has a conflict is already known, + * otherwise remembers it to prevent loops. + * + * Returns: TRUE if it was a known conflict, FALSE otherwise + */ + static gboolean -check_collision (RTPSession * sess, RTPSource * source, - RTPArrivalStats * arrival) +find_add_conflicting_addresses (RTPSession * sess, RTPArrivalStats * arrival) { - /* FIXME, do collision check */ + GList *item; + RTPConflictingAddress *new_conflict; + + for (item = g_list_first (sess->conflicting_addresses); + item; item = g_list_next (item)) { + RTPConflictingAddress *known_conflict = item->data; + + if (gst_netaddress_equal (&arrival->address, &known_conflict->address)) { + known_conflict->time = arrival->time; + return TRUE; + } + } + + new_conflict = g_new0 (RTPConflictingAddress, 1); + + memcpy (&new_conflict->address, &arrival->address, sizeof (GstNetAddress)); + new_conflict->time = arrival->time; + + sess->conflicting_addresses = g_list_prepend (sess->conflicting_addresses, + new_conflict); + return FALSE; } +static gboolean +check_collision (RTPSession * sess, RTPSource * source, + RTPArrivalStats * arrival, gboolean rtp) +{ + /* If we have not arrival address, we can't do collision checking */ + if (!arrival->have_address) { + return FALSE; + } + + if (sess->source != source) { + /* This is not our local source, but lets check if two remote + * source collide + */ + + if (rtp) { + if (source->have_rtp_from) { + if (gst_netaddress_equal (&source->rtp_from, &arrival->address)) + /* Address is the same */ + return FALSE; + } else { + /* We don't already have a from address for RTP, just set it */ + rtp_source_set_rtp_from (source, &arrival->address); + return FALSE; + } + } else { + if (source->have_rtcp_from) { + if (gst_netaddress_equal (&source->rtcp_from, &arrival->address)) + /* Address is the same */ + return FALSE; + } else { + /* We don't already have a from address for RTCP, just set it */ + rtp_source_set_rtcp_from (source, &arrival->address); + return FALSE; + } + } + + /* In this case, we have third-party collision or loop */ + + /* FIXME: Log 3rd party collision somehow + * Maybe should be done in upper layer, only the SDES can tell us + * if its a collision or a loop + */ + } else { + /* This is sending with our ssrc, is it an address we already know */ + + if (find_add_conflicting_addresses (sess, arrival)) { + /* Its a known conflict, its probably a loop, not a collision + * lets just drop the incoming packet + */ + GST_DEBUG ("Our packets are being looped back to us, dropping"); + } else { + /* Its a new collision, lets change our SSRC */ + + GST_DEBUG ("Collision for SSRC %x", rtp_source_get_ssrc (source)); + on_ssrc_collision (sess, source); + + rtp_session_send_bye_locked (sess, "SSRC Collision"); + + sess->change_ssrc = TRUE; + } + } + + return TRUE; +} + + /* must be called with the session lock */ static RTPSource * obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, @@ -901,8 +1006,9 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, } else { *created = FALSE; /* check for collision, this updates the address when not previously set */ - if (check_collision (sess, source, arrival)) - on_ssrc_collision (sess, source); + if (check_collision (sess, source, arrival, rtp)) { + return NULL; + } } /* update last activity */ source->last_activity = arrival->time; @@ -1066,6 +1172,24 @@ rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname) return result; } +static guint32 +rtp_session_create_new_ssrc (RTPSession * sess) +{ + guint32 ssrc; + + while (TRUE) { + ssrc = g_random_int (); + + /* see if it exists in the session, we're done if it doesn't */ + if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (ssrc)) == NULL) + break; + } + + return ssrc; +} + + /** * rtp_session_create_source: * @sess: an #RTPSession @@ -1082,14 +1206,7 @@ rtp_session_create_source (RTPSession * sess) RTPSource *source; RTP_SESSION_LOCK (sess); - while (TRUE) { - ssrc = g_random_int (); - - /* see if it exists in the session, we're done if it doesn't */ - if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx], - GINT_TO_POINTER (ssrc)) == NULL) - break; - } + ssrc = rtp_session_create_new_ssrc (sess); source = rtp_source_new (ssrc); g_object_ref (source); rtp_source_set_callbacks (source, &callbacks, sess); @@ -1176,6 +1293,9 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, ssrc = gst_rtp_buffer_get_ssrc (buffer); source = obtain_source (sess, ssrc, &created, &arrival, TRUE); + if (!source) + goto collision; + prevsender = RTP_SOURCE_IS_SENDER (source); prevactive = RTP_SOURCE_IS_ACTIVE (source); @@ -1246,6 +1366,13 @@ ignore: GST_DEBUG ("ignoring RTP packet because we are leaving"); return GST_FLOW_OK; } +collision: + { + gst_buffer_unref (buffer); + RTP_SESSION_UNLOCK (sess); + GST_DEBUG ("ignoring packet because its collisioning"); + return GST_FLOW_OK; + } } static void @@ -1303,6 +1430,9 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + if (!source) + return; + GST_BUFFER_OFFSET (packet->buffer) = source->clock_base; prevsender = RTP_SOURCE_IS_SENDER (source); @@ -1343,6 +1473,9 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + if (!source) + return; + if (created) on_new_ssrc (sess, source); @@ -1375,6 +1508,9 @@ rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, source = obtain_source (sess, ssrc, &created, arrival, FALSE); changed = FALSE; + if (!source) + return; + more_entries = gst_rtcp_packet_sdes_first_entry (packet); j = 0; while (more_entries) { @@ -1428,6 +1564,9 @@ rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, /* find src and mark bye, no probation when dealing with RTCP */ source = obtain_source (sess, ssrc, &created, arrival, FALSE); + if (!source) + return; + /* store time for when we need to time out this source */ source->bye_time = arrival->time; @@ -1677,16 +1816,18 @@ calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, } /** - * rtp_session_send_bye: + * rtp_session_send_bye_locked: * @sess: an #RTPSession * @reason: a reason or NULL * * Stop the current @sess and schedule a BYE message for the other members. * + * One must have the session lock to call this function + * * Returns: a #GstFlowReturn. */ -GstFlowReturn -rtp_session_send_bye (RTPSession * sess, const gchar * reason) +static GstFlowReturn +rtp_session_send_bye_locked (RTPSession * sess, const gchar * reason) { GstFlowReturn result = GST_FLOW_OK; RTPSource *source; @@ -1695,7 +1836,6 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); - RTP_SESSION_LOCK (sess); source = sess->source; /* ignore more BYEs */ @@ -1728,6 +1868,30 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) if (sess->callbacks.reconsider) sess->callbacks.reconsider (sess, sess->reconsider_user_data); done: + + return result; +} + +/** + * rtp_session_send_bye: + * @sess: an #RTPSession + * @reason: a reason or NULL + * + * Stop the current @sess and schedule a BYE message for the other members. + * + * One must have the session lock to call this function + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_send_bye (RTPSession * sess, const gchar * reason) +{ + GstFlowReturn result = GST_FLOW_OK; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + + RTP_SESSION_LOCK (sess); + result = rtp_session_send_bye_locked (sess, reason); RTP_SESSION_UNLOCK (sess); return result; @@ -2051,6 +2215,7 @@ GstFlowReturn rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; + GList *item; ReportData data; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -2102,6 +2267,37 @@ rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime) size = GST_BUFFER_SIZE (data.rtcp) + sess->header_len; UPDATE_AVG (sess->stats.avg_rtcp_packet_size, size); } + + /* check for outdated collisions */ + item = g_list_first (sess->conflicting_addresses); + while (item) { + RTPConflictingAddress *known_conflict = item->data; + GList *next_item = g_list_next (item); + + if (known_conflict->time < time - (data.interval * + RTCP_INTERVAL_COLLISION_TIMEOUT)) { + sess->conflicting_addresses = + g_list_delete_link (sess->conflicting_addresses, item); + g_free (known_conflict); + } + item = next_item; + } + + if (sess->change_ssrc) { + g_hash_table_steal (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (sess->source->ssrc)); + + sess->source->ssrc = rtp_session_create_new_ssrc (sess); + rtp_source_reset (sess->source); + + g_hash_table_insert (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (sess->source->ssrc), sess->source); + + g_free (sess->bye_reason); + sess->bye_reason = NULL; + sess->sent_bye = FALSE; + sess->change_ssrc = FALSE; + } RTP_SESSION_UNLOCK (sess); /* push out the RTCP packet */ diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 59703300..e14e2da3 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -139,6 +139,20 @@ typedef struct { RTPSessionReconsider reconsider; } RTPSessionCallbacks; +/** + * RTPConflictingAddress: + * @address: #GstNetAddress which conflicted + * @last_conflict_time: time when the last conflict was seen + * + * This structure is used to account for addresses that have conflicted to find + * loops. + */ + +typedef struct { + GstNetAddress address; + GstClockTime time; +} RTPConflictingAddress; + /** * RTPSession: * @lock: lock to protect the session @@ -149,6 +163,8 @@ typedef struct { * @activecount: the number of active sources * @callbacks: callbacks * @user_data: user data passed in callbacks + * @stats: session statistics + * @conflicting_addresses: GList of conflicting addresses * * The RTP session manager object */ @@ -187,6 +203,9 @@ struct _RTPSession { RTPSessionStats stats; + GList *conflicting_addresses; + gboolean change_ssrc; + /* for mapping clock time to NTP time */ GstClockTime base_time; }; diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 1938324f..938a1d56 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -142,6 +142,24 @@ rtp_source_class_init (RTPSourceClass * klass) GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); } +/** + * rtp_source_reset: + * @src: an #RTPSource + * + * Reset the stats of @src. + */ +void +rtp_source_reset (RTPSource * src) +{ + src->received_bye = FALSE; + + src->stats.cycles = -1; + src->stats.jitter = 0; + src->stats.transit = -1; + src->stats.curr_sr = 0; + src->stats.curr_rr = 0; +} + static void rtp_source_init (RTPSource * src) { @@ -157,11 +175,7 @@ rtp_source_init (RTPSource * src) src->seqnum_base = -1; src->last_rtptime = -1; - src->stats.cycles = -1; - src->stats.jitter = 0; - src->stats.transit = -1; - src->stats.curr_sr = 0; - src->stats.curr_rr = 0; + rtp_source_reset (src); } static void diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index b731ae6a..1eae0c18 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -215,4 +215,6 @@ gboolean rtp_source_get_last_rb (RTPSource *src, guint8 *fraction guint32 *exthighestseq, guint32 *jitter, guint32 *lsr, guint32 *dlsr); +void rtp_source_reset (RTPSource * src); + #endif /* __RTP_SOURCE_H__ */ -- cgit v1.2.1