diff options
Diffstat (limited to 'gst/rtpmanager/rtpsession.c')
-rw-r--r-- | gst/rtpmanager/rtpsession.c | 487 |
1 files changed, 451 insertions, 36 deletions
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index e13f7d6c..e4925a2a 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -57,6 +57,9 @@ static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); +static RTPSource *obtain_source (RTPSession * sess, guint32 ssrc, + gboolean * created, RTPArrivalStats * arrival, gboolean rtp); + static void rtp_session_class_init (RTPSessionClass * klass) { @@ -123,18 +126,35 @@ rtp_session_class_init (RTPSessionClass * klass) static void rtp_session_init (RTPSession * sess) { + gint i; + sess->lock = g_mutex_new (); - sess->ssrcs = - g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref); + sess->key = g_random_int (); + sess->mask_idx = 0; + sess->mask = 0; + + for (i = 0; i < 32; i++) { + sess->ssrcs[i] = + g_hash_table_new_full (NULL, NULL, NULL, + (GDestroyNotify) g_object_unref); + } sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL); - /* create an SSRC for this session manager */ - sess->source = rtp_session_create_source (sess); - rtp_stats_init_defaults (&sess->stats); + /* create an active SSRC for this session manager */ + sess->source = rtp_session_create_source (sess); + sess->stats.active_sources++; + /* default UDP header length */ sess->header_len = 28; + sess->mtu = 1400; + + /* some default SDES entries */ + //sess->cname = g_strdup_printf ("%s@%s", g_get_user_name (), g_get_host_name ()); + sess->cname = g_strdup_printf ("foo@%s", g_get_host_name ()); + sess->name = g_strdup (g_get_real_name ()); + sess->tool = g_strdup ("GStreamer"); GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); } @@ -143,14 +163,20 @@ static void rtp_session_finalize (GObject * object) { RTPSession *sess; + gint i; sess = RTP_SESSION_CAST (object); g_mutex_free (sess->lock); - g_hash_table_destroy (sess->ssrcs); + for (i = 0; i < 32; i++) + g_hash_table_destroy (sess->ssrcs[i]); + g_hash_table_destroy (sess->cnames); g_object_unref (sess->source); + g_free (sess->cname); + g_free (sess->tool); + G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); } @@ -312,6 +338,230 @@ rtp_session_get_rtcp_bandwidth (RTPSession * sess) return sess->stats.rtcp_bandwidth; } +/** + * rtp_session_set_cname: + * @sess: an #RTPSession + * @cname: a CNAME for the session + * + * Set the CNAME for the session. + */ +void +rtp_session_set_cname (RTPSession * sess, const gchar * cname) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->cname); + sess->cname = g_strdup (cname); +} + +/** + * rtp_session_get_cname: + * @sess: an #RTPSession + * + * Get the currently configured CNAME for the session. + * + * Returns: The CNAME. g_free after usage. + */ +gchar * +rtp_session_get_cname (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->cname); +} + +/** + * rtp_session_set_name: + * @sess: an #RTPSession + * @name: a NAME for the session + * + * Set the NAME for the session. + */ +void +rtp_session_set_name (RTPSession * sess, const gchar * name) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->name); + sess->name = g_strdup (name); +} + +/** + * rtp_session_get_name: + * @sess: an #RTPSession + * + * Get the currently configured NAME for the session. + * + * Returns: The NAME. g_free after usage. + */ +gchar * +rtp_session_get_name (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->name); +} + +/** + * rtp_session_set_email: + * @sess: an #RTPSession + * @email: an EMAIL for the session + * + * Set the EMAIL the session. + */ +void +rtp_session_set_email (RTPSession * sess, const gchar * email) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->email); + sess->email = g_strdup (email); +} + +/** + * rtp_session_get_email: + * @sess: an #RTPSession + * + * Get the currently configured EMAIL of the session. + * + * Returns: The EMAIL. g_free after usage. + */ +gchar * +rtp_session_get_email (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->email); +} + +/** + * rtp_session_set_phone: + * @sess: an #RTPSession + * @phone: a PHONE for the session + * + * Set the PHONE the session. + */ +void +rtp_session_set_phone (RTPSession * sess, const gchar * phone) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->phone); + sess->phone = g_strdup (phone); +} + +/** + * rtp_session_get_location: + * @sess: an #RTPSession + * + * Get the currently configured PHONE of the session. + * + * Returns: The PHONE. g_free after usage. + */ +gchar * +rtp_session_get_phone (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->phone); +} + +/** + * rtp_session_set_location: + * @sess: an #RTPSession + * @location: a LOCATION for the session + * + * Set the LOCATION the session. + */ +void +rtp_session_set_location (RTPSession * sess, const gchar * location) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->location); + sess->location = g_strdup (location); +} + +/** + * rtp_session_get_location: + * @sess: an #RTPSession + * + * Get the currently configured LOCATION of the session. + * + * Returns: The LOCATION. g_free after usage. + */ +gchar * +rtp_session_get_location (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->location); +} + +/** + * rtp_session_set_tool: + * @sess: an #RTPSession + * @tool: a TOOL for the session + * + * Set the TOOL the session. + */ +void +rtp_session_set_tool (RTPSession * sess, const gchar * tool) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->tool); + sess->tool = g_strdup (tool); +} + +/** + * rtp_session_get_tool: + * @sess: an #RTPSession + * + * Get the currently configured TOOL of the session. + * + * Returns: The TOOL. g_free after usage. + */ +gchar * +rtp_session_get_tool (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->tool); +} + +/** + * rtp_session_set_note: + * @sess: an #RTPSession + * @note: a NOTE for the session + * + * Set the NOTE the session. + */ +void +rtp_session_set_note (RTPSession * sess, const gchar * note) +{ + g_return_if_fail (RTP_IS_SESSION (sess)); + + g_free (sess->note); + sess->note = g_strdup (note); +} + +/** + * rtp_session_get_note: + * @sess: an #RTPSession + * + * Get the currently configured NOTE of the session. + * + * Returns: The NOTE. g_free after usage. + */ +gchar * +rtp_session_get_note (RTPSession * sess) +{ + g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + + return g_strdup (sess->note); +} + static GstFlowReturn source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) { @@ -319,6 +569,8 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) if (source == session->source) { GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc); + + if (session->callbacks.send_rtp) result = session->callbacks.send_rtp (session, source, buffer, @@ -371,7 +623,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, { RTPSource *source; - source = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); + source = + g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc)); if (source == NULL) { /* make new Source in probation and insert */ source = rtp_source_new (ssrc); @@ -392,7 +645,8 @@ obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, /* configure a callback on the source */ rtp_source_set_callbacks (source, &callbacks, sess); - g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), + source); /* we have one more source now */ sess->total_sources++; @@ -426,9 +680,12 @@ rtp_session_add_source (RTPSession * sess, RTPSource * src) g_return_val_if_fail (src != NULL, FALSE); RTP_SESSION_LOCK (sess); - find = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (src->ssrc)); + find = + g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (src->ssrc)); if (find == NULL) { - g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (src->ssrc), src); + g_hash_table_insert (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (src->ssrc), src); /* we have one more source now */ sess->total_sources++; result = TRUE; @@ -501,7 +758,8 @@ rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc) g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); RTP_SESSION_LOCK (sess); - result = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); + result = + g_hash_table_lookup (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc)); if (result) g_object_ref (result); RTP_SESSION_UNLOCK (sess); @@ -556,11 +814,13 @@ rtp_session_create_source (RTPSession * sess) ssrc = g_random_int (); /* see if it exists in the session, we're done if it doesn't */ - if (g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)) == NULL) + if (g_hash_table_lookup (sess->ssrcs[sess->mask_idx], + GINT_TO_POINTER (ssrc)) == NULL) break; } source = rtp_source_new (ssrc); - g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + g_hash_table_insert (sess->ssrcs[sess->mask_idx], GINT_TO_POINTER (ssrc), + source); /* we have one more source now */ sess->total_sources++; RTP_SESSION_UNLOCK (sess); @@ -633,6 +893,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) prevsender = RTP_SOURCE_IS_SENDER (source); prevactive = RTP_SOURCE_IS_ACTIVE (source); + gst_buffer_ref (buffer); + /* let source process the packet */ result = rtp_source_process_rtp (source, buffer, &arrival); @@ -652,10 +914,11 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) if (created) on_new_ssrc (sess, source); - /* for validated sources, we add the CSRCs as well */ if (source->validated) { guint8 i, count; + gboolean created; + /* for validated sources, we add the CSRCs as well */ count = gst_rtp_buffer_get_csrc_count (buffer); for (i = 0; i < count; i++) { @@ -675,6 +938,8 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) } } } + gst_buffer_unref (buffer); + RTP_SESSION_UNLOCK (sess); return result; @@ -704,17 +969,27 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, guint64 ntptime; guint count, i; RTPSource *source; - gboolean created; + gboolean created, prevsender; gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime, &packet_count, &octet_count); + GST_DEBUG ("got SR packet: SSRC %08x", senderssrc); + RTP_SESSION_LOCK (sess); source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + prevsender = RTP_SOURCE_IS_SENDER (source); + /* first update the source */ rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count); + if (prevsender != RTP_SOURCE_IS_SENDER (source)) { + sess->stats.sender_sources++; + GST_DEBUG ("source: %08x became sender, %d sender sources", senderssrc, + sess->stats.sender_sources); + } + if (created) on_new_ssrc (sess, source); @@ -785,36 +1060,36 @@ static void rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, RTPArrivalStats * arrival) { - guint chunks, i, j; - gboolean more_chunks, more_items; + guint items, i, j; + gboolean more_items, more_entries; - chunks = gst_rtcp_packet_sdes_get_chunk_count (packet); - GST_DEBUG ("got SDES packet with %d chunks", chunks); + items = gst_rtcp_packet_sdes_get_item_count (packet); + GST_DEBUG ("got SDES packet with %d items", items); - more_chunks = gst_rtcp_packet_sdes_first_chunk (packet); + more_items = gst_rtcp_packet_sdes_first_item (packet); i = 0; - while (more_chunks) { + while (more_items) { guint32 ssrc; ssrc = gst_rtcp_packet_sdes_get_ssrc (packet); - GST_DEBUG ("chunk %d, SSRC %08x", i, ssrc); + GST_DEBUG ("item %d, SSRC %08x", i, ssrc); - more_items = gst_rtcp_packet_sdes_first_item (packet); + more_entries = gst_rtcp_packet_sdes_first_entry (packet); j = 0; - while (more_items) { + while (more_entries) { GstRTCPSDESType type; guint8 len; - gchar *data; + guint8 *data; - gst_rtcp_packet_sdes_get_item (packet, &type, &len, &data); + gst_rtcp_packet_sdes_get_entry (packet, &type, &len, &data); - GST_DEBUG ("item %d, type %d, len %d, data %s", j, type, len, data); + GST_DEBUG ("entry %d, type %d, len %d, data %s", j, type, len, data); - more_items = gst_rtcp_packet_sdes_next_item (packet); + more_entries = gst_rtcp_packet_sdes_next_entry (packet); j++; } - more_chunks = gst_rtcp_packet_sdes_next_chunk (packet); + more_items = gst_rtcp_packet_sdes_next_item (packet); i++; } } @@ -906,6 +1181,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) GST_DEBUG ("received RTCP packet"); /* get packet size including header overhead */ + RTP_SESSION_LOCK (sess); size = GST_BUFFER_SIZE (buffer) + sess->header_len; /* update average RTCP packet size */ @@ -914,6 +1190,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) else sess->stats.avg_rtcp_packet_size = (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4; + RTP_SESSION_UNLOCK (sess); /* start processing the compound packet */ more = gst_rtcp_buffer_get_first_packet (buffer, &packet); @@ -972,6 +1249,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + RTP_SESSION_LOCK (sess); source = sess->source; prevsender = RTP_SOURCE_IS_SENDER (source); @@ -981,20 +1259,22 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; + RTP_SESSION_UNLOCK (sess); return result; } /** - * rtp_session_get_rtcp_interval: + * rtp_session_get_reporting_interval: * @sess: an #RTPSession * - * Get the interval for sending out the next RTCP packet + * Get the interval for sending out the next RTCP packet and doing general + * maintenance tasks. * * Returns: an interval in seconds. */ gdouble -rtp_session_get_rtcp_interval (RTPSession * sess) +rtp_session_get_reporting_interval (RTPSession * sess) { gdouble result; @@ -1008,8 +1288,112 @@ rtp_session_get_rtcp_interval (RTPSession * sess) return result; } +typedef struct +{ + RTPSession *sess; + GstBuffer *rtcp; + GstRTCPPacket packet; +} ReportData; + +static void +session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) +{ + RTPSession *sess = data->sess; + RTPSource *own = sess->source; + GstRTCPPacket *packet = &data->packet; + + /* create a new buffer if needed */ + if (data->rtcp == NULL) { + data->rtcp = gst_rtcp_buffer_new (sess->mtu); + + if (RTP_SOURCE_IS_SENDER (own)) { + /* we are a sender, create SR */ + GST_DEBUG ("create SR for SSRC %08x", own->ssrc); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); + + /* fill in sender report info */ + gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, + 0, 0, own->stats.packets_sent, own->stats.octets_sent); + } else { + /* we are only receiver, create RR */ + GST_DEBUG ("create RR for SSRC %08x", own->ssrc); + gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_RR, packet); + gst_rtcp_packet_rr_set_ssrc (packet, own->ssrc); + } + } + if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { + /* only report about other sources */ + if (source != sess->source) { + RTPSourceStats *stats; + guint32 extended_max, expected; + guint32 expected_interval, received_interval; + guint32 lost, lost_interval, fraction; + + stats = &source->stats; + + extended_max = (stats->cycles << 16) + stats->max_seq; + expected = extended_max - stats->base_seq + 1; + + if (expected > stats->packets_received) { + lost = expected - stats->packets_received; + if (lost > 0x7fffff) + lost = 0x7fffff; + } else { + lost = stats->packets_received - expected; + if (lost > 0x800000) + lost = 0x800000; + else + lost = -lost; + } + + expected_interval = expected - stats->prev_expected; + stats->prev_expected = expected; + received_interval = stats->packets_received - stats->prev_received; + stats->prev_received = stats->packets_received; + + lost_interval = expected_interval - received_interval; + + if (expected_interval == 0 || lost_interval <= 0) + fraction = 0; + else + fraction = (lost_interval << 8) / expected_interval; + + GST_DEBUG ("add RR for SSRC %08x", source->ssrc); + /* we scaled the jitter up for additional precision */ + GST_DEBUG ("fraction %d, lost %d, extseq %u, jitter %d", fraction, lost, + extended_max, stats->jitter >> 4); + + /* packet is not yet filled, add report block for this source. */ + gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost, + extended_max, stats->jitter >> 4, 0, 0); + } + } +} + +static void +session_sdes (RTPSession * sess, GstBuffer * buffer) +{ + GstRTCPPacket packet; + + /* add SDES packet */ + gst_rtcp_buffer_add_packet (buffer, GST_RTCP_TYPE_SDES, &packet); + + gst_rtcp_packet_sdes_add_item (&packet, sess->source->ssrc); + gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_CNAME, + strlen (sess->cname), (guint8 *) sess->cname); + + /* other SDES items must only be added at regular intervals and only when the + * user requests to since it might be a privacy problem */ +#if 0 + gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_NAME, + strlen (sess->name), (guint8 *) sess->name); + gst_rtcp_packet_sdes_add_entry (&packet, GST_RTCP_SDES_TOOL, + strlen (sess->tool), (guint8 *) sess->tool); +#endif +} + /** - * rtp_session_produce_rtcp: + * rtp_session_perform_reporting: * @sess: an #RTPSession * * Instruct the session manager to generate RTCP packets with current stats. @@ -1019,8 +1403,39 @@ rtp_session_get_rtcp_interval (RTPSession * sess) * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_produce_rtcp (RTPSession * sess) +rtp_session_perform_reporting (RTPSession * sess) { - /* FIXME: implement me */ - return GST_FLOW_NOT_SUPPORTED; + GstFlowReturn result = GST_FLOW_OK; + ReportData data; + + g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + + data.sess = sess; + data.rtcp = NULL; + + RTP_SESSION_LOCK (sess); + /* loop over all known sources and do something */ + g_hash_table_foreach (sess->ssrcs[sess->mask_idx], + (GHFunc) session_report_blocks, &data); + + /* add SDES for this source */ + if (data.rtcp) { + session_sdes (sess, data.rtcp); + sess->stats.sent_rtcp = TRUE; + } + + RTP_SESSION_UNLOCK (sess); + + /* push out the RTCP packet */ + if (data.rtcp) { + /* close the RTCP packet */ + gst_rtcp_buffer_end (data.rtcp); + + if (sess->callbacks.send_rtcp) + result = sess->callbacks.send_rtcp (sess, sess->source, data.rtcp, + sess->user_data); + else + gst_buffer_unref (data.rtcp); + } + return result; } |