From fcce4aff924da9dc2f7c86a3a93dfdc1b2cd1d93 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 3 Sep 2007 21:19:34 +0000 Subject: gst/rtpmanager/: Updated example pipelines in docs. Original commit message from CVS: * gst/rtpmanager/gstrtpbin-marshal.list: * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_get_client), (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (create_stream), (gst_rtp_bin_init), (caps_changed), (new_ssrc_pad_found), (create_recv_rtp), (create_recv_rtcp), (create_send_rtp): * gst/rtpmanager/gstrtpbin.h: Updated example pipelines in docs. Handle sync_rtcp buffers from the SSRC demuxer to perform lip-sync. Set the default latency correctly. Add some more points where we can get caps. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_class_init), (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_query), (gst_rtp_jitter_buffer_set_property), (gst_rtp_jitter_buffer_get_property): Add ts-offset property to control timestamping. * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), (gst_rtp_session_init), (gst_rtp_session_set_property), (gst_rtp_session_get_property), (get_current_ntp_ns_time), (rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state), (gst_rtp_session_send_rtcp), (gst_rtp_session_sync_rtcp), (gst_rtp_session_cache_caps), (gst_rtp_session_clock_rate), (gst_rtp_session_sink_setcaps), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_event_send_rtp_sink), (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), (create_recv_rtcp_sink), (create_send_rtp_sink), (create_send_rtcp_src): Various cleanups. Feed rtpsession manager with NTP time based on pipeline clock when handling RTP packets and RTCP timeouts. Perform all RTCP with the system clock. Set caps on RTCP outgoing buffers. * gst/rtpmanager/gstrtpssrcdemux.c: (find_demux_pad_for_ssrc), (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_base_init), (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event), (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain): * gst/rtpmanager/gstrtpssrcdemux.h: Also demux RTCP messages. * gst/rtpmanager/rtpsession.c: (rtp_session_set_callbacks), (update_arrival_stats), (rtp_session_process_rtp), (rtp_session_process_rb), (rtp_session_process_sr), (rtp_session_process_rr), (rtp_session_process_rtcp), (rtp_session_send_rtp), (rtp_session_send_bye), (session_start_rtcp), (session_report_blocks), (session_cleanup), (rtp_session_on_timeout): * gst/rtpmanager/rtpsession.h: Remove the get_time callback, the GStreamer part will feed us with enough timing information. Split sync timing and RTCP timing information. Factor out common RB handling for SR and RR. Send out SR RTCP packets for lip-sync. Move SR and RR packet info generation to the source. * gst/rtpmanager/rtpsource.c: (rtp_source_init), (rtp_source_update_caps), (get_clock_rate), (calculate_jitter), (rtp_source_process_rtp), (rtp_source_send_rtp), (rtp_source_process_sr), (rtp_source_process_rb), (rtp_source_get_new_sr), (rtp_source_get_new_rb), (rtp_source_get_last_sr): * gst/rtpmanager/rtpsource.h: * gst/rtpmanager/rtpstats.h: Use caps on incomming buffers to get timing information when they are there. Calculate clock scew of the receiver compared to the sender and adjust the rtp timestamps. Calculate the round trip in sources. Do SR and RR calculations in the source. --- ChangeLog | 76 ++++++ gst/rtpmanager/gstrtpbin-marshal.list | 1 + gst/rtpmanager/gstrtpbin.c | 470 ++++++++++++++++++++++++++++++---- gst/rtpmanager/gstrtpbin.h | 3 + gst/rtpmanager/gstrtpjitterbuffer.c | 121 +++++++-- gst/rtpmanager/gstrtpsession.c | 229 +++++++++++------ gst/rtpmanager/gstrtpssrcdemux.c | 210 +++++++++++++-- gst/rtpmanager/gstrtpssrcdemux.h | 3 +- gst/rtpmanager/rtpsession.c | 295 +++++++-------------- gst/rtpmanager/rtpsession.h | 38 +-- gst/rtpmanager/rtpsource.c | 334 +++++++++++++++++++++--- gst/rtpmanager/rtpsource.h | 35 ++- gst/rtpmanager/rtpstats.h | 2 + 13 files changed, 1382 insertions(+), 435 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2de3cb0f..af636638 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,79 @@ +2007-09-03 Wim Taymans + + * gst/rtpmanager/gstrtpbin-marshal.list: + * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_get_client), + (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (create_stream), + (gst_rtp_bin_init), (caps_changed), (new_ssrc_pad_found), + (create_recv_rtp), (create_recv_rtcp), (create_send_rtp): + * gst/rtpmanager/gstrtpbin.h: + Updated example pipelines in docs. + Handle sync_rtcp buffers from the SSRC demuxer to perform lip-sync. + Set the default latency correctly. + Add some more points where we can get caps. + + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_class_init), + (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_loop), + (gst_rtp_jitter_buffer_query), + (gst_rtp_jitter_buffer_set_property), + (gst_rtp_jitter_buffer_get_property): + Add ts-offset property to control timestamping. + + * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), + (gst_rtp_session_init), (gst_rtp_session_set_property), + (gst_rtp_session_get_property), (get_current_ntp_ns_time), + (rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state), + (gst_rtp_session_send_rtcp), (gst_rtp_session_sync_rtcp), + (gst_rtp_session_cache_caps), (gst_rtp_session_clock_rate), + (gst_rtp_session_sink_setcaps), (gst_rtp_session_chain_recv_rtp), + (gst_rtp_session_event_send_rtp_sink), + (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), + (create_recv_rtcp_sink), (create_send_rtp_sink), + (create_send_rtcp_src): + Various cleanups. + Feed rtpsession manager with NTP time based on pipeline clock when + handling RTP packets and RTCP timeouts. + Perform all RTCP with the system clock. + Set caps on RTCP outgoing buffers. + + * gst/rtpmanager/gstrtpssrcdemux.c: (find_demux_pad_for_ssrc), + (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_base_init), + (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event), + (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), + (gst_rtp_ssrc_demux_rtcp_chain): + * gst/rtpmanager/gstrtpssrcdemux.h: + Also demux RTCP messages. + + * gst/rtpmanager/rtpsession.c: (rtp_session_set_callbacks), + (update_arrival_stats), (rtp_session_process_rtp), + (rtp_session_process_rb), (rtp_session_process_sr), + (rtp_session_process_rr), (rtp_session_process_rtcp), + (rtp_session_send_rtp), (rtp_session_send_bye), + (session_start_rtcp), (session_report_blocks), (session_cleanup), + (rtp_session_on_timeout): + * gst/rtpmanager/rtpsession.h: + Remove the get_time callback, the GStreamer part will feed us with + enough timing information. + Split sync timing and RTCP timing information. + Factor out common RB handling for SR and RR. + Send out SR RTCP packets for lip-sync. + Move SR and RR packet info generation to the source. + + * gst/rtpmanager/rtpsource.c: (rtp_source_init), + (rtp_source_update_caps), (get_clock_rate), (calculate_jitter), + (rtp_source_process_rtp), (rtp_source_send_rtp), + (rtp_source_process_sr), (rtp_source_process_rb), + (rtp_source_get_new_sr), (rtp_source_get_new_rb), + (rtp_source_get_last_sr): + * gst/rtpmanager/rtpsource.h: + * gst/rtpmanager/rtpstats.h: + Use caps on incomming buffers to get timing information when they are + there. + Calculate clock scew of the receiver compared to the sender and adjust + the rtp timestamps. + Calculate the round trip in sources. + Do SR and RR calculations in the source. + 2007-09-03 Renato Filho * configure.ac: diff --git a/gst/rtpmanager/gstrtpbin-marshal.list b/gst/rtpmanager/gstrtpbin-marshal.list index ca760d82..e5c5fc42 100644 --- a/gst/rtpmanager/gstrtpbin-marshal.list +++ b/gst/rtpmanager/gstrtpbin-marshal.list @@ -3,3 +3,4 @@ BOXED:UINT BOXED:UINT,UINT VOID:UINT,OBJECT VOID:UINT,UINT +VOID:OBJECT,OBJECT diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index cb2a9f0b..a4ba67c3 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -79,12 +79,12 @@ * * gst-launch gstrtpbin name=rtpbin \ * v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \ - * rtpbin.send_rtp_src_0 ! udpsink port=5000 \ - * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false \ - * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \ - * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \ - * rtpbin.send_rtp_src_1 ! udpsink port=5002 \ - * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false \ + * rtpbin.send_rtp_src_0 ! udpsink port=5000 \ + * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false \ + * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \ + * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \ + * rtpbin.send_rtp_src_1 ! udpsink port=5002 \ + * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false \ * udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1 * * Encode and payload H263 video captured from a v4l2src. Encode and payload AMR @@ -94,21 +94,22 @@ * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003. * RTCP packets for session 0 are received on port 5005 and RTCP for session 1 * is received on port 5007. Since RTCP packets from the sender should be sent - * as soon as possible, sync=false is configured on udpsink. + * as soon as possible and do not participate in preroll, sync=false and + * async=false is configured on udpsink * * * - * gst-launch -v gstrtpbin name=rtpbin \ + * gst-launch -v gstrtpbin name=rtpbin \ * udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \ - * port=5000 ! rtpbin.recv_rtp_sink_0 \ - * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \ - * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \ - * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false \ - * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \ - * port=5002 ! rtpbin.recv_rtp_sink_1 \ - * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \ - * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \ - * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false + * port=5000 ! rtpbin.recv_rtp_sink_0 \ + * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \ + * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \ + * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false \ + * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \ + * port=5002 ! rtpbin.recv_rtp_sink_1 \ + * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \ + * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \ + * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false * * Receive H263 on port 5000, send it through rtpbin in session 0, depayload, * decode and display the video. @@ -122,7 +123,7 @@ * * * - * Last reviewed on 2007-08-28 (0.10.6) + * Last reviewed on 2007-08-30 (0.10.6) */ #ifdef HAVE_CONFIG_H @@ -130,6 +131,9 @@ #endif #include +#include +#include + #include "gstrtpbin-marshal.h" #include "gstrtpbin.h" @@ -187,6 +191,14 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", GST_STATIC_CAPS ("application/x-rtp") ); +/* padtemplate for the internal pad */ +static GstStaticPadTemplate rtpbin_sync_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink_%d", + GST_PAD_SINK, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtcp") + ); + #define GST_RTP_BIN_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) @@ -242,16 +254,37 @@ struct _GstRtpBinStream { /* the SSRC of this stream */ guint32 ssrc; + /* parent bin */ GstRtpBin *bin; + /* the session this SSRC belongs to */ GstRtpBinSession *session; + /* the jitterbuffer of the SSRC */ GstElement *buffer; + /* the PT demuxer of the SSRC */ GstElement *demux; gulong demux_newpad_sig; gulong demux_ptreq_sig; + + /* the internal pad we use to get RTCP sync messages */ + GstPad *sync_pad; + gboolean have_sync; + guint64 last_unix; + guint64 last_extrtptime; + + /* mapping to local RTP and NTP time */ + guint64 local_rtp; + guint64 local_unix; + gint64 unix_delta; + + /* for lip-sync */ + guint64 clock_base; + gint clock_rate; + gint64 ts_offset; + gint64 prev_ts_offset; }; #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock) @@ -289,12 +322,28 @@ struct _GstRtpBinSession GstPad *recv_rtp_sink; GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; - GstPad *recv_rtcp_src; + GstPad *sync_src; GstPad *send_rtp_sink; GstPad *send_rtp_src; GstPad *send_rtcp_src; }; +/* Manages the RTP streams that come from one client and should therefore be + * synchronized. + */ +struct _GstRtpBinClient +{ + /* the common CNAME for the streams */ + gchar *cname; + guint cname_len; + + /* the streams */ + guint nstreams; + GSList *streams; + + gint64 min_delta; +}; + /* find a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * find_session_by_id (GstRtpBin * rtpbin, gint id) @@ -513,6 +562,271 @@ gst_rtp_bin_clear_pt_map (GstRtpBin * bin) GST_RTP_BIN_UNLOCK (bin); } +static GstRtpBinClient * +gst_rtp_bin_get_client (GstRtpBin * bin, guint8 len, guint8 * data, + gboolean * created) +{ + GstRtpBinClient *result = NULL; + GSList *walk; + + for (walk = bin->clients; walk; walk = g_slist_next (walk)) { + GstRtpBinClient *client = (GstRtpBinClient *) walk->data; + + if (len != client->cname_len) + continue; + + if (!strncmp ((gchar *) data, client->cname, client->cname_len)) { + GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client, + client->cname); + result = client; + break; + } + } + + /* nothing found, create one */ + if (result == NULL) { + result = g_new0 (GstRtpBinClient, 1); + result->cname = g_strndup ((gchar *) data, len); + result->cname_len = len; + result->min_delta = G_MAXINT64; + bin->clients = g_slist_prepend (bin->clients, result); + GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result, + result->cname); + } + return result; +} + +/* associate a stream to the given CNAME. This will make sure all streams for + * that CNAME are synchronized together. */ +static void +gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, + guint8 * data) +{ + GstRtpBinClient *client; + gboolean created; + GSList *walk; + + /* first find or create the CNAME */ + client = gst_rtp_bin_get_client (bin, len, data, &created); + + /* find stream in the client */ + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + if (ostream == stream) + break; + } + /* not found, add it to the list */ + if (walk == NULL) { + GST_DEBUG_OBJECT (bin, + "new association of SSRC %08x with client %p with CNAME %s", + stream->ssrc, client, client->cname); + client->streams = g_slist_prepend (client->streams, stream); + client->nstreams++; + } else { + GST_DEBUG_OBJECT (bin, + "found association of SSRC %08x with client %p with CNAME %s", + stream->ssrc, client, client->cname); + } + + /* we can only continue if we know the local clock-base and clock-rate */ + if (stream->clock_base == -1) + goto no_clock_base; + if (stream->clock_rate <= 0) + goto no_clock_rate; + + /* map last RTP time to local timeline using our clock-base */ + stream->local_rtp = stream->last_extrtptime - stream->clock_base; + + GST_DEBUG_OBJECT (bin, + "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT + ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base, + stream->last_extrtptime, stream->local_rtp, stream->clock_rate); + + /* calculate local NTP time in gstreamer timestamp */ + stream->local_unix = + gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND, + stream->clock_rate); + /* calculate delta between server and receiver */ + stream->unix_delta = stream->last_unix - stream->local_unix; + + GST_DEBUG_OBJECT (bin, + "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT + ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix, + stream->unix_delta); + + /* recalc inter stream playout offset, but only if there are more than one + * stream. */ + if (client->nstreams > 1) { + gint64 min; + + /* calculate the min of all deltas */ + min = G_MAXINT64; + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + if (ostream->unix_delta < min) + min = ostream->unix_delta; + } + + GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client, + min); + + /* calculate offsets for each stream */ + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + ostream->ts_offset = ostream->unix_delta - min; + + /* delta changed, see how much */ + if (ostream->prev_ts_offset != ostream->ts_offset) { + gint64 diff; + + if (ostream->prev_ts_offset > ostream->ts_offset) + diff = ostream->prev_ts_offset - ostream->ts_offset; + else + diff = ostream->ts_offset - ostream->prev_ts_offset; + + /* only change diff when it changed more than 1 millisecond. This + * compensates for rounding errors in NTP to RTP timestamp + * conversions */ + if (diff > GST_MSECOND) + g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL); + + ostream->prev_ts_offset = ostream->ts_offset; + } + GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, + ostream->ssrc, ostream->ts_offset); + } + } + return; + +no_clock_base: + { + GST_WARNING_OBJECT (bin, "we have no clock-base"); + return; + } +no_clock_rate: + { + GST_WARNING_OBJECT (bin, "we have no clock-rate"); + return; + } +} + +#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \ + for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \ + (b) = gst_rtcp_packet_move_to_next ((packet))) + +#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \ + for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \ + (b) = gst_rtcp_packet_sdes_next_item ((packet))) + +#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \ + for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \ + (b) = gst_rtcp_packet_sdes_next_entry ((packet))) + +static GstFlowReturn +gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstRtpBinStream *stream; + GstRtpBin *bin; + GstRTCPPacket packet; + guint32 ssrc; + guint64 ntptime; + guint32 rtptime; + gboolean have_sr, have_sdes; + gboolean more; + + stream = gst_pad_get_element_private (pad); + bin = stream->bin; + + GST_DEBUG_OBJECT (bin, "received sync packet"); + + if (!gst_rtcp_buffer_validate (buffer)) + goto invalid_rtcp; + + have_sr = FALSE; + have_sdes = FALSE; + GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) { + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + /* only parse first. There is only supposed to be one SR in the packet + * but we will deal with malformed packets gracefully */ + if (have_sr) + break; + /* get NTP and RTP times */ + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime, + NULL, NULL); + + GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc); + /* ignore SR that is not ours */ + if (ssrc != stream->ssrc) + continue; + + have_sr = TRUE; + + /* store values in the stream */ + stream->have_sync = TRUE; + stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); + /* use extended timestamp */ + gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime); + break; + case GST_RTCP_TYPE_SDES: + { + gboolean more_items, more_entries; + + /* only deal with first SDES, there is only supposed to be one SDES in + * the RTCP packet but we deal with bad packets gracefully. Also bail + * out if we have not seen an SR item yet. */ + if (have_sdes || !have_sr) + break; + + GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) { + /* skip items that are not about the SSRC of the sender */ + if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc) + continue; + + /* find the CNAME entry */ + GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) { + GstRTCPSDESType type; + guint8 len; + guint8 *data; + + gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data); + + if (type == GST_RTCP_SDES_CNAME) { + stream->clock_base = GST_BUFFER_OFFSET (buffer); + /* associate the stream to CNAME */ + gst_rtp_bin_associate (bin, stream, len, data); + } + } + } + have_sdes = TRUE; + break; + } + default: + /* we can ignore these packets */ + break; + } + } + + gst_buffer_unref (buffer); + + return ret; + + /* ERRORS */ +invalid_rtcp: + { + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL), + ("invalid RTCP packet received")); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } +} + /* create a new stream with @ssrc in @session. Must be called with * RTP_SESSION_LOCK. */ static GstRtpBinStream * @@ -520,6 +834,8 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) { GstElement *buffer, *demux; GstRtpBinStream *stream; + GstPadTemplate *templ; + gchar *padname; if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL))) goto no_jitterbuffer; @@ -533,8 +849,22 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->session = session; stream->buffer = buffer; stream->demux = demux; + stream->last_extrtptime = -1; + stream->have_sync = FALSE; session->streams = g_slist_prepend (session->streams, stream); + /* make an internal sinkpad for RTCP sync packets. Take ownership of the + * pad. We will link this pad later. */ + padname = g_strdup_printf ("sync_%d", ssrc); + templ = gst_static_pad_template_get (&rtpbin_sync_sink_template); + stream->sync_pad = gst_pad_new_from_template (templ, padname); + gst_object_unref (templ); + gst_object_ref (stream->sync_pad); + gst_object_sink (stream->sync_pad); + gst_pad_set_element_private (stream->sync_pad, stream); + gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain); + gst_pad_set_active (stream->sync_pad, TRUE); + /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", (GCallback) pt_map_requested, session); @@ -566,17 +896,6 @@ no_demux: } } -/* Manages the RTP streams that come from one client and should therefore be - * synchronized. - */ -struct _GstRtpBinClient -{ - /* the common CNAME for the streams */ - gchar *cname; - /* the streams */ - GSList *streams; -}; - /* GObject vmethods */ static void gst_rtp_bin_finalize (GObject * object); static void gst_rtp_bin_set_property (GObject * object, guint prop_id, @@ -762,6 +1081,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass) rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); rtpbin->priv->bin_lock = g_mutex_new (); rtpbin->provided_clock = gst_system_clock_obtain (); + rtpbin->latency = DEFAULT_LATENCY_MS; } static void @@ -908,13 +1228,45 @@ no_caps: } } +/* emited when caps changed for the session */ +static void +caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session) +{ + GstRtpBin *bin; + GstCaps *caps; + gint payload; + const GstStructure *s; + + bin = session->bin; + + g_object_get (pad, "caps", &caps, NULL); + + if (caps == NULL) + return; + + GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps); + + s = gst_caps_get_structure (caps, 0); + + /* get payload, finish when it's not there */ + if (!gst_structure_get_int (s, "payload", &payload)) + return; + + GST_RTP_SESSION_LOCK (session); + GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload); + g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps); + GST_RTP_SESSION_UNLOCK (session); +} + /* a new pad (SSRC) was created in @session */ static void new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, GstRtpBinSession * session) { GstRtpBinStream *stream; - GstPad *sinkpad; + GstPad *sinkpad, *srcpad; + gchar *padname; + GstCaps *caps; GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); @@ -925,12 +1277,38 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, if (!stream) goto no_stream; + /* get the caps of the pad, we need the clock-rate and base_time if any. */ + if ((caps = gst_pad_get_caps (pad))) { + const GstStructure *s; + guint val; + + GST_DEBUG_OBJECT (session->bin, "pad has caps %" GST_PTR_FORMAT, caps); + + s = gst_caps_get_structure (caps, 0); + + if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate)) + stream->clock_rate = -1; + + if (gst_structure_get_uint (s, "clock-base", &val)) + stream->clock_base = val; + else + stream->clock_base = -1; + } + /* get pad and link */ GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); gst_pad_link (pad, sinkpad); gst_object_unref (sinkpad); + /* get the RTCP sync pad */ + GST_DEBUG_OBJECT (session->bin, "linking sync pad"); + padname = g_strdup_printf ("rtcp_src_%d", ssrc); + srcpad = gst_element_get_pad (element, padname); + g_free (padname); + gst_pad_link (srcpad, stream->sync_pad); + gst_object_unref (srcpad); + /* connect to the new-pad signal of the payload demuxer, this will expose the * new pad by ghosting it. */ stream->demux_newpad_sig = g_signal_connect (stream->demux, @@ -992,6 +1370,9 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_sink == NULL) goto pad_failed; + g_signal_connect (session->recv_rtp_sink, "notify::caps", + (GCallback) caps_changed, session); + GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad"); /* get srcpad, link to SSRCDemux */ session->recv_rtp_src = @@ -999,8 +1380,9 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_src == NULL) goto pad_failed; - GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad"); + GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad"); sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad"); lres = gst_pad_link (session->recv_rtp_src, sinkdpad); gst_object_unref (sinkdpad); if (lres != GST_PAD_LINK_OK) @@ -1057,11 +1439,8 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, GstPad *result; guint sessid; GstRtpBinSession *session; - -#if 0 GstPad *sinkdpad; GstPadLinkReturn lres; -#endif /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1) @@ -1083,29 +1462,25 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink != NULL) goto existed; - GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); - /* get recv_rtp pad and store */ + GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); session->recv_rtcp_sink = gst_element_get_request_pad (session->session, "recv_rtcp_sink"); if (session->recv_rtcp_sink == NULL) goto pad_failed; -#if 0 /* get srcpad, link to SSRCDemux */ GST_DEBUG_OBJECT (rtpbin, "getting sync src pad"); - session->recv_rtcp_src = - gst_element_get_static_pad (session->session, "sync_src"); - if (session->recv_rtcp_src == NULL) + session->sync_src = gst_element_get_static_pad (session->session, "sync_src"); + if (session->sync_src == NULL) goto pad_failed; - GST_DEBUG_OBJECT (rtpbin, "linking sync to demux"); - sinkdpad = gst_element_get_static_pad (session->demux, "sink"); - lres = gst_pad_link (session->recv_rtcp_src, sinkdpad); + GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad"); + sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink"); + lres = gst_pad_link (session->sync_src, sinkdpad); gst_object_unref (sinkdpad); if (lres != GST_PAD_LINK_OK) goto link_failed; -#endif result = gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ); @@ -1136,13 +1511,11 @@ pad_failed: g_warning ("gstrtpbin: failed to get session pad"); return NULL; } -#if 0 link_failed: { g_warning ("gstrtpbin: failed to link pads"); return NULL; } -#endif } /* Create a pad for sending RTP for the session in @name. Must be called with @@ -1180,6 +1553,9 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->send_rtp_sink == NULL) goto pad_failed; + g_signal_connect (session->send_rtp_sink, "notify::caps", + (GCallback) caps_changed, session); + result = gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); gst_pad_set_active (result, TRUE); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 4dd755f1..874167cc 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -48,6 +48,9 @@ struct _GstRtpBin { /* clock we provide */ GstClock *provided_clock; + /* a list of clients, these are streams with the same CNAME */ + GSList *clients; + /*< private >*/ GstRtpBinPrivate *priv; }; diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index bd1c07ea..a23fbb87 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -99,12 +99,14 @@ enum #define DEFAULT_LATENCY_MS 200 #define DEFAULT_DROP_ON_LATENCY FALSE +#define DEFAULT_TS_OFFSET 0 enum { PROP_0, PROP_LATENCY, - PROP_DROP_ON_LATENCY + PROP_DROP_ON_LATENCY, + PROP_TS_OFFSET }; #define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock)) @@ -137,6 +139,7 @@ struct _GstRtpJitterBufferPrivate /* properties */ guint latency_ms; gboolean drop_on_latency; + gint64 ts_offset; /* the last seqnum we pushed out */ guint32 last_popped_seqnum; @@ -150,6 +153,7 @@ struct _GstRtpJitterBufferPrivate gint32 clock_rate; gint64 clock_base; guint64 exttimestamp; + gint64 prev_ts_offset; /* when we are shutting down */ GstFlowReturn srcresult; @@ -277,6 +281,16 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) "Drop buffers when maximum latency is reached", "Tells the jitterbuffer to never exceed the given latency in size", DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE)); + /** + * GstRtpJitterBuffer::ts-offset: + * + * Adjust RTP timestamps in the jitterbuffer with offset. + */ + g_object_class_install_property (gobject_class, PROP_TS_OFFSET, + g_param_spec_int64 ("ts-offset", + "Timestamp Offset", + "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64, + G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE)); /** * GstRtpJitterBuffer::request-pt-map: * @buffer: the object which received the signal @@ -421,7 +435,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, { GstRtpJitterBufferPrivate *priv; GstStructure *caps_struct; - const GValue *value; + guint val; priv = jitterbuffer->priv; @@ -443,22 +457,22 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, /* gah, clock-base is uint. If we don't have a base, we will use the first * buffer timestamp as the base time. This will screw up sync but it's better * than nothing. */ - value = gst_structure_get_value (caps_struct, "clock-base"); - if (value && G_VALUE_HOLDS_UINT (value)) { - priv->clock_base = g_value_get_uint (value); - GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, - priv->clock_base); - } else + if (gst_structure_get_uint (caps_struct, "clock-base", &val)) + priv->clock_base = val; + else priv->clock_base = -1; + GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, + priv->clock_base); + /* first expected seqnum */ - value = gst_structure_get_value (caps_struct, "seqnum-base"); - if (value && G_VALUE_HOLDS_UINT (value)) { - priv->next_seqnum = g_value_get_uint (value); - GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum); - } else + if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) + priv->next_seqnum = val; + else priv->next_seqnum = -1; + GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum); + return TRUE; /* ERRORS */ @@ -929,6 +943,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) GstClockTime timestamp; gint64 running_time; guint64 exttimestamp; + gint ts_offset_rtp; priv = jitterbuffer->priv; @@ -996,8 +1011,11 @@ again: exttimestamp, priv->clock_base); /* if no clock_base was given, take first ts as base */ - if (priv->clock_base == -1) + if (priv->clock_base == -1) { + GST_DEBUG_OBJECT (jitterbuffer, + "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp); priv->clock_base = exttimestamp; + } /* take rtp timestamp offset into account, this can wrap around */ exttimestamp -= priv->clock_base; @@ -1089,6 +1107,34 @@ push_buffer: outbuf = gst_buffer_make_metadata_writable (outbuf); GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); } + + /* apply the timestamp offset */ + if (priv->ts_offset > 0) + ts_offset_rtp = + gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate, + GST_SECOND); + else if (priv->ts_offset < 0) + ts_offset_rtp = + -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate, + GST_SECOND); + else + ts_offset_rtp = 0; + + if (ts_offset_rtp != 0) { + guint32 timestamp; + + /* if the offset changed, mark with discont */ + if (priv->ts_offset != priv->prev_ts_offset) { + GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + priv->prev_ts_offset = priv->ts_offset; + } + + timestamp = gst_rtp_buffer_get_timestamp (outbuf); + timestamp += ts_offset_rtp; + gst_rtp_buffer_set_timestamp (outbuf, timestamp); + } + /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; @@ -1158,6 +1204,7 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) GstClockTime min_latency, max_latency; gboolean us_live; GstPad *peer; + GstClockTime our_latency; if ((peer = gst_pad_get_peer (priv->sinkpad))) { if ((res = gst_pad_query (peer, query))) { @@ -1172,11 +1219,16 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query) priv->peer_latency = min_latency; JBUF_UNLOCK (priv); - min_latency += priv->latency_ms * GST_MSECOND; + our_latency = ((guint64) priv->latency_ms) * GST_MSECOND; + + GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT, + GST_TIME_ARGS (our_latency)); + + min_latency += our_latency; /* max_latency can be -1, meaning there is no upper limit for the * latency. */ if (max_latency != -1) - max_latency += priv->latency_ms * GST_MSECOND; + max_latency += our_latency * GST_MSECOND; GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, @@ -1199,7 +1251,11 @@ static void gst_rtp_jitter_buffer_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (object); + priv = jitterbuffer->priv; switch (prop_id) { case PROP_LATENCY: @@ -1208,23 +1264,29 @@ gst_rtp_jitter_buffer_set_property (GObject * object, /* FIXME, not threadsafe */ new_latency = g_value_get_uint (value); - old_latency = jitterbuffer->priv->latency_ms; + old_latency = priv->latency_ms; - jitterbuffer->priv->latency_ms = new_latency; + priv->latency_ms = new_latency; /* post message if latency changed, this will inform the parent pipeline * that a latency reconfiguration is possible/needed. */ if (new_latency != old_latency) { + GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_latency * GST_MSECOND)); + gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer))); } break; } case PROP_DROP_ON_LATENCY: - { - jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value); + priv->drop_on_latency = g_value_get_boolean (value); + break; + case PROP_TS_OFFSET: + JBUF_LOCK (priv); + priv->ts_offset = g_value_get_int64 (value); + JBUF_UNLOCK (priv); break; - } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1235,14 +1297,23 @@ static void gst_rtp_jitter_buffer_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + + jitterbuffer = GST_RTP_JITTER_BUFFER (object); + priv = jitterbuffer->priv; switch (prop_id) { case PROP_LATENCY: - g_value_set_uint (value, jitterbuffer->priv->latency_ms); + g_value_set_uint (value, priv->latency_ms); break; case PROP_DROP_ON_LATENCY: - g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency); + g_value_set_boolean (value, priv->drop_on_latency); + break; + case PROP_TS_OFFSET: + JBUF_LOCK (priv); + g_value_set_int64 (value, priv->ts_offset); + JBUF_UNLOCK (priv); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 985a3713..e716682c 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -132,6 +132,8 @@ #include "config.h" #endif +#include + #include "gstrtpbin-marshal.h" #include "gstrtpsession.h" #include "rtpsession.h" @@ -214,7 +216,8 @@ enum enum { - PROP_0 + PROP_0, + PROP_NTP_NS_BASE }; #define GST_RTP_SESSION_GET_PRIVATE(obj) \ @@ -234,8 +237,10 @@ struct _GstRtpSessionPrivate GThread *thread; /* caps mapping */ - guint8 pt; - gint clock_rate; + GHashTable *ptmap; + + /* NTP base time */ + guint64 ntpnsbase; }; /* callbacks to handle actions from the session manager */ @@ -245,18 +250,18 @@ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gpointer user_data); -static GstClockTime gst_rtp_session_get_time (RTPSession * sess, - gpointer user_data); static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); static RTPSessionCallbacks callbacks = { gst_rtp_session_process_rtp, gst_rtp_session_send_rtp, gst_rtp_session_send_rtcp, + gst_rtp_session_sync_rtcp, gst_rtp_session_clock_rate, - gst_rtp_session_get_time, gst_rtp_session_reconsider }; @@ -363,6 +368,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; + /** * GstRtpSession::request-pt-map: * @sess: the object which received the signal @@ -490,6 +496,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) (GCallback) on_bye_timeout, rtpsession); g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); + rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); } static void @@ -513,6 +520,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, rtpsession = GST_RTP_SESSION (object); switch (prop_id) { + case PROP_NTP_NS_BASE: + rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -528,26 +538,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, rtpsession = GST_RTP_SESSION (object); switch (prop_id) { + case PROP_NTP_NS_BASE: + g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static guint64 +get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) +{ + guint64 ntpnstime; + + if (clock) { + /* get current NTP time */ + ntpnstime = gst_clock_get_time (clock); + /* convert to running time */ + ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); + /* add NTP base offset */ + ntpnstime += rtpsession->priv->ntpnsbase; + } else + ntpnstime = -1; + + return ntpnstime; +} + static void rtcp_thread (GstRtpSession * rtpsession) { - GstClock *clock; + GstClock *sysclock, *clock; GstClockID id; GstClockTime current_time; GstClockTime next_timeout; + guint64 ntpnstime; - /* RTCP timeouts we use the system clock */ - clock = gst_system_clock_obtain (); - if (clock == NULL) - goto no_clock; + /* for RTCP timeouts we use the system clock */ + sysclock = gst_system_clock_obtain (); + if (sysclock == NULL) + goto no_sysclock; + + /* to get the current NTP time, we use the pipeline clock */ + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - current_time = gst_clock_get_time (clock); + current_time = gst_clock_get_time (sysclock); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); @@ -568,7 +603,7 @@ rtcp_thread (GstRtpSession * rtpsession) break; id = rtpsession->priv->id = - gst_clock_new_single_shot_id (clock, next_timeout); + gst_clock_new_single_shot_id (sysclock, next_timeout); GST_RTP_SESSION_UNLOCK (rtpsession); res = gst_clock_id_wait (id, NULL); @@ -581,7 +616,10 @@ rtcp_thread (GstRtpSession * rtpsession) break; /* update current time */ - current_time = gst_clock_get_time (clock); + current_time = gst_clock_get_time (sysclock); + + /* get current NTP time */ + ntpnstime = get_current_ntp_ns_time (rtpsession, clock); /* we get unlocked because we need to perform reconsideration, don't perform * the timeout but get a new reporting estimate. */ @@ -590,18 +628,18 @@ rtcp_thread (GstRtpSession * rtpsession) /* perform actions, we ignore result. Release lock because it might push. */ GST_RTP_SESSION_UNLOCK (rtpsession); - rtp_session_on_timeout (rtpsession->priv->session, current_time); + rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime); GST_RTP_SESSION_LOCK (rtpsession); } GST_RTP_SESSION_UNLOCK (rtpsession); - gst_object_unref (clock); + gst_object_unref (sysclock); GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); return; /* ERRORS */ -no_clock: +no_sysclock: { GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL), ("Could not get system clock")); @@ -662,7 +700,6 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - priv->clock_rate = -1; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; @@ -677,17 +714,9 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - { - GstClockTime base_time; - - base_time = GST_ELEMENT_CAST (rtpsession)->base_time; - - rtp_session_set_base_time (priv->session, base_time); - if (!start_rtcp_thread (rtpsession)) goto failed_thread; break; - } case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -774,6 +803,15 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, priv = rtpsession->priv; if (rtpsession->send_rtcp_src) { + GstCaps *caps; + + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->send_rtcp_src, caps); + gst_caps_unref (caps); + } + gst_buffer_set_caps (buffer, caps); GST_DEBUG_OBJECT (rtpsession, "sending RTCP"); result = gst_pad_push (rtpsession->send_rtcp_src, buffer); } else { @@ -784,30 +822,59 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, return result; } -static gboolean -gst_rtp_session_parse_caps (GstRtpSession * rtpsession, GstCaps * caps) +/* called when the session manager has an SR RTCP packet ready for handling + * inter stream synchronisation */ +static GstFlowReturn +gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data) { + GstFlowReturn result; + GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; - const GstStructure *caps_struct; + rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; - GST_DEBUG_OBJECT (rtpsession, "parsing caps"); + if (rtpsession->sync_src) { + GstCaps *caps; - caps_struct = gst_caps_get_structure (caps, 0); - if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate)) - goto no_clock_rate; + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->sync_src, caps); + gst_caps_unref (caps); + } + gst_buffer_set_caps (buffer, caps); + GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP"); + result = gst_pad_push (rtpsession->sync_src, buffer); + } else { + GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad"); + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +static void +gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps) +{ + GstRtpSessionPrivate *priv; + const GstStructure *s; + gint payload; - GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", priv->clock_rate); + priv = rtpsession->priv; - return TRUE; + GST_DEBUG_OBJECT (rtpsession, "parsing caps"); - /* ERRORS */ -no_clock_rate: - { - GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); - return FALSE; - } + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "payload", &payload)) + return; + + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + return; + + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps); } /* called when the session manager needs the clock rate */ @@ -821,13 +888,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, GValue ret = { 0 }; GValue args[2] = { {0}, {0} }; GstCaps *caps; + const GstStructure *s; rtpsession = GST_RTP_SESSION_CAST (user_data); priv = rtpsession->priv; - /* if we have it, return it */ - if (priv->clock_rate != -1) - return priv->clock_rate; + GST_RTP_SESSION_LOCK (rtpsession); + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + goto done; g_value_init (&args[0], GST_TYPE_ELEMENT); g_value_set_object (&args[0], rtpsession); @@ -844,10 +913,16 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, if (!caps) goto no_caps; - if (!gst_rtp_session_parse_caps (rtpsession, caps)) - goto parse_failed; + gst_rtp_session_cache_caps (rtpsession, caps); + + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "clock-rate", &result)) + goto no_clock_rate; + + GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result); - result = priv->clock_rate; +done: + GST_RTP_SESSION_UNLOCK (rtpsession); return result; @@ -855,35 +930,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, no_caps: { GST_DEBUG_OBJECT (rtpsession, "could not get caps"); - return -1; + goto done; } -parse_failed: +no_clock_rate: { - GST_DEBUG_OBJECT (rtpsession, "failed to parse caps"); - return -1; + GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); + goto done; } } -/* called when the session manager needs the time of clock */ -static GstClockTime -gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) -{ - GstClockTime result; - GstRtpSession *rtpsession; - GstClock *clock; - - rtpsession = GST_RTP_SESSION_CAST (user_data); - - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - if (clock) { - result = gst_clock_get_time (clock); - gst_object_unref (clock); - } else - result = GST_CLOCK_TIME_NONE; - - return result; -} - /* called when the session manager asks us to reconsider the timeout */ static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data) @@ -925,18 +980,19 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) static gboolean gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps) { - gboolean res; GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; - res = gst_rtp_session_parse_caps (rtpsession, caps); + GST_RTP_SESSION_LOCK (rtpsession); + gst_rtp_session_cache_caps (rtpsession, caps); + GST_RTP_SESSION_UNLOCK (rtpsession); gst_object_unref (rtpsession); - return res; + return TRUE; } /* receive a packet from a sender, send it to the RTP session manager and @@ -948,13 +1004,17 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstFlowReturn ret; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_process_rtp (priv->session, buffer); + ntpnstime = + get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); + + ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); gst_object_unref (rtpsession); @@ -1051,8 +1111,6 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) gst_segment_set_newsegment_full (segment, update, rate, arate, format, start, stop, time); - rtp_session_set_timestamp_sync (priv->session, start); - /* push event forward */ ret = gst_pad_push_event (rtpsession->send_rtp_src, event); break; @@ -1075,13 +1133,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstFlowReturn ret; + GstClockTime timestamp; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_send_rtp (priv->session, buffer); + /* get NTP time when this packet was captured, this depends on the timestamp. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* convert to running time using the segment start value. */ + ntpnstime = timestamp - rtpsession->send_rtp_seg.start; + ntpnstime += priv->ntpnsbase; + } else + ntpnstime = -1; + + ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime); gst_object_unref (rtpsession); @@ -1113,6 +1182,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); @@ -1142,6 +1212,7 @@ create_recv_rtcp_sink (GstRtpSession * rtpsession) rtpsession->sync_src = gst_pad_new_from_static_template (&rtpsession_sync_src_template, "sync_src"); + gst_pad_use_fixed_caps (rtpsession->sync_src); gst_pad_set_active (rtpsession->sync_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); @@ -1172,6 +1243,7 @@ create_send_rtp_sink (GstRtpSession * rtpsession) rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, "send_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtp_src); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); @@ -1190,6 +1262,7 @@ create_send_rtcp_src (GstRtpSession * rtpsession) rtpsession->send_rtcp_src = gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, "send_rtcp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtcp_src); gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtcp_src); diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 539b03c7..5457bc35 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -52,6 +52,7 @@ #include #include +#include #include "gstrtpbin-marshal.h" #include "gstrtpssrcdemux.h" @@ -67,6 +68,13 @@ GST_STATIC_PAD_TEMPLATE ("sink", GST_STATIC_CAPS ("application/x-rtp") ); +static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtcp") + ); + static GstStaticPadTemplate rtp_ssrc_demux_src_template = GST_STATIC_PAD_TEMPLATE ("src_%d", GST_PAD_SRC, @@ -74,6 +82,13 @@ GST_STATIC_PAD_TEMPLATE ("src_%d", GST_STATIC_CAPS ("application/x-rtp") ); +static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtcp") + ); + static GstElementDetails gst_rtp_ssrc_demux_details = { "RTP SSRC Demux", "Demux/Network/RTP", @@ -103,6 +118,11 @@ static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement * static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event); +static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, + GstBuffer * buf); +static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, + GstEvent * event); + /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); @@ -113,59 +133,78 @@ static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; */ struct _GstRtpSsrcDemuxPad { - GstPad *pad; guint32 ssrc; + GstPad *rtp_pad; GstCaps *caps; + GstPad *rtcp_pad; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found */ -static GstPad * -find_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +static GstRtpSsrcDemuxPad * +find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GSList *walk; - for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) { + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; if (pad->ssrc == ssrc) - return pad->pad; + return pad; } return NULL; } -static GstPad * -create_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +static GstRtpSsrcDemuxPad * +create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { - GstPad *result; + GstPad *rtp_pad, *rtcp_pad; GstElementClass *klass; GstPadTemplate *templ; gchar *padname; GstRtpSsrcDemuxPad *demuxpad; + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + klass = GST_ELEMENT_GET_CLASS (demux); templ = gst_element_class_get_pad_template (klass, "src_%d"); padname = g_strdup_printf ("src_%d", ssrc); - result = gst_pad_new_from_template (templ, padname); + rtp_pad = gst_pad_new_from_template (templ, padname); + g_free (padname); + + templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d"); + padname = g_strdup_printf ("rtcp_src_%d", ssrc); + rtcp_pad = gst_pad_new_from_template (templ, padname); g_free (padname); /* wrap in structure and add to list */ demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; - demuxpad->pad = result; - demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad); + demuxpad->rtp_pad = rtp_pad; + demuxpad->rtcp_pad = rtcp_pad; + + demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + GST_OBJECT_UNLOCK (demux); /* copy caps from input */ - gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink)); + gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink)); + gst_pad_use_fixed_caps (rtp_pad); + gst_pad_set_caps (rtcp_pad, GST_PAD_CAPS (demux->rtcp_sink)); + gst_pad_use_fixed_caps (rtcp_pad); - gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event); - gst_pad_set_active (result, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (demux), result); + gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); + gst_pad_set_active (rtp_pad, TRUE); + gst_pad_set_active (rtcp_pad, TRUE); + + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); g_signal_emit (G_OBJECT (demux), - gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result); + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); + + GST_OBJECT_LOCK (demux); - return result; + return demuxpad; } static void @@ -175,8 +214,12 @@ gst_rtp_ssrc_demux_base_init (gpointer g_class) gst_element_class_add_pad_template (gstelement_klass, gst_static_pad_template_get (&rtp_ssrc_demux_sink_template)); + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template)); gst_element_class_add_pad_template (gstelement_klass, gst_static_pad_template_get (&rtp_ssrc_demux_src_template)); + gst_element_class_add_pad_template (gstelement_klass, + gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template)); gst_element_class_set_details (gstelement_klass, &gst_rtp_ssrc_demux_details); } @@ -226,6 +269,14 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux, gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain); gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink); + + demux->rtcp_sink = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "rtcp_sink"), "rtcp_sink"); + gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain); + gst_pad_set_event_function (demux->rtcp_sink, + gst_rtp_ssrc_demux_rtcp_sink_event); + gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); } static void @@ -249,21 +300,63 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_NEWSEGMENT: default: - res = gst_pad_event_default (pad, event); + { + GSList *walk; + + res = TRUE; + GST_OBJECT_LOCK (demux); + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; + + gst_event_ref (event); + res &= gst_pad_push_event (pad->rtp_pad, event); + } + GST_OBJECT_UNLOCK (demux); + gst_event_unref (event); break; + } } gst_object_unref (demux); return res; } +static gboolean +gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event) +{ + GstRtpSsrcDemux *demux; + gboolean res = FALSE; + + demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_NEWSEGMENT: + default: + { + GSList *walk; + + res = TRUE; + GST_OBJECT_LOCK (demux); + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; + + res &= gst_pad_push_event (pad->rtcp_pad, event); + } + GST_OBJECT_UNLOCK (demux); + break; + } + } + gst_object_unref (demux); + return res; +} + static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) { GstFlowReturn ret; GstRtpSsrcDemux *demux; guint32 ssrc; - GstPad *srcpad; + GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad)); @@ -274,16 +367,16 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); - srcpad = find_rtp_pad_for_ssrc (demux, ssrc); - if (srcpad == NULL) { - GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); - srcpad = create_rtp_pad_for_ssrc (demux, ssrc); - if (!srcpad) + GST_OBJECT_LOCK (demux); + dpad = find_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) { + if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) goto create_failed; } + GST_OBJECT_UNLOCK (demux); /* push to srcpad */ - ret = gst_pad_push (srcpad, buf); + ret = gst_pad_push (dpad->rtp_pad, buf); return ret; @@ -298,9 +391,74 @@ invalid_payload: } create_failed: { - /* this is not fatal yet */ GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); + GST_OBJECT_UNLOCK (demux); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +} + +static GstFlowReturn +gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) +{ + GstFlowReturn ret; + GstRtpSsrcDemux *demux; + guint32 ssrc; + GstRtpSsrcDemuxPad *dpad; + GstRTCPPacket packet; + + demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad)); + + if (!gst_rtcp_buffer_validate (buf)) + goto invalid_rtcp; + + if (!gst_rtcp_buffer_get_first_packet (buf, &packet)) + goto invalid_rtcp; + + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, + NULL); + break; + case GST_RTCP_TYPE_RR: + ssrc = gst_rtcp_packet_rr_get_ssrc (&packet); + break; + default: + goto invalid_rtcp; + } + + GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc); + + GST_OBJECT_LOCK (demux); + dpad = find_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) { + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) + goto create_failed; + } + GST_OBJECT_UNLOCK (demux); + + /* push to srcpad */ + ret = gst_pad_push (dpad->rtcp_pad, buf); + + return ret; + + /* ERRORS */ +invalid_rtcp: + { + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), + ("Dropping invalid RTCP packet")); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +create_failed: + { + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), + ("Could not create new pad")); + GST_OBJECT_UNLOCK (demux); gst_buffer_unref (buf); return GST_FLOW_ERROR; } diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index 5d93330d..bea2769d 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -37,7 +37,8 @@ struct _GstRtpSsrcDemux GstElement parent; GstPad *rtp_sink; - GSList *rtp_srcpads; + GstPad *rtcp_sink; + GSList *srcpads; }; struct _GstRtpSsrcDemuxClass diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 275e7c74..e7f72b40 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -23,6 +23,8 @@ #include #include +#include "gstrtpbin-marshal.h" + #include "rtpsession.h" GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); @@ -332,8 +334,8 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, sess->callbacks.process_rtp = callbacks->process_rtp; sess->callbacks.send_rtp = callbacks->send_rtp; sess->callbacks.send_rtcp = callbacks->send_rtcp; + sess->callbacks.sync_rtcp = callbacks->sync_rtcp; sess->callbacks.clock_rate = callbacks->clock_rate; - sess->callbacks.get_time = callbacks->get_time; sess->callbacks.reconsider = callbacks->reconsider; sess->user_data = user_data; } @@ -911,13 +913,14 @@ rtp_session_create_source (RTPSession * sess) */ static void update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, - gboolean rtp, GstBuffer * buffer) + gboolean rtp, GstBuffer * buffer, guint64 ntpnstime) { - /* get time or arrival */ - if (sess->callbacks.get_time) - arrival->time = sess->callbacks.get_time (sess, sess->user_data); - else - arrival->time = GST_CLOCK_TIME_NONE; + GTimeVal current; + + /* get time of arrival */ + g_get_current_time (¤t); + arrival->time = GST_TIMEVAL_TO_TIME (current); + arrival->ntpnstime = ntpnstime; /* get packet size including header overhead */ arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len; @@ -941,6 +944,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * rtp_session_process_rtp: * @sess: and #RTPSession * @buffer: an RTP buffer + * @ntpnstime: the NTP arrival time in nanoseconds * * Process an RTP buffer in the session manager. This function takes ownership * of @buffer. @@ -948,7 +952,8 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) +rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer, + guint64 ntpnstime) { GstFlowReturn result; guint32 ssrc; @@ -965,7 +970,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, TRUE, buffer); + update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime); /* ignore more RTP packets when we left the session */ if (sess->source->received_bye) @@ -1047,6 +1052,33 @@ ignore: } } +static void +rtp_session_process_rb (RTPSession * sess, RTPSource * source, + GstRTCPPacket * packet, RTPArrivalStats * arrival) +{ + guint count, i; + + count = gst_rtcp_packet_get_rb_count (packet); + for (i = 0; i < count; i++) { + guint32 ssrc, exthighestseq, jitter, lsr, dlsr; + guint8 fractionlost; + gint32 packetslost; + + gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, + &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + + GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter); + + if (ssrc == sess->source->ssrc) { + /* only deal with report blocks for our session, we update the stats of + * the sender of the RTCP message. We could also compare our stats against + * the other sender to see if we are better or worse. */ + rtp_source_process_rb (source, arrival->time, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); + } + } +} + /* A Sender report contains statistics about how the sender is doing. This * includes timing informataion such as the relation between RTP and NTP * timestamps and the number of packets/bytes it sent to us. @@ -1062,7 +1094,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, { guint32 senderssrc, rtptime, packet_count, octet_count; guint64 ntptime; - guint count, i; RTPSource *source; gboolean created, prevsender; @@ -1074,11 +1105,13 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + GST_BUFFER_OFFSET (packet->buffer) = source->clock_base; + prevsender = RTP_SOURCE_IS_SENDER (source); /* first update the source */ - rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count, - arrival->time); + rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count, + octet_count); if (prevsender != RTP_SOURCE_IS_SENDER (source)) { sess->stats.sender_sources++; @@ -1089,25 +1122,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - count = gst_rtcp_packet_get_rb_count (packet); - for (i = 0; i < count; i++) { - guint32 ssrc, exthighestseq, jitter, lsr, dlsr; - guint8 fractionlost; - gint32 packetslost; - - gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, - &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); - - GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter); - - if (ssrc == sess->source->ssrc) { - /* only deal with report blocks for our session, we update the stats of - * the sender of the RTCP message. We could also compare our stats against - * the other sender to see if we are better or worse. */ - rtp_source_process_rb (source, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - } - } + rtp_session_process_rb (sess, source, packet, arrival); } /* A receiver report contains statistics about how a receiver is doing. It @@ -1121,7 +1136,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, RTPArrivalStats * arrival) { guint32 senderssrc; - guint count, i; RTPSource *source; gboolean created; @@ -1134,20 +1148,7 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, if (created) on_new_ssrc (sess, source); - count = gst_rtcp_packet_get_rb_count (packet); - for (i = 0; i < count; i++) { - guint32 ssrc, exthighestseq, jitter, lsr, dlsr; - guint8 fractionlost; - gint32 packetslost; - - gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, - &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); - - if (ssrc == sess->source->ssrc) { - rtp_source_process_rb (source, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); - } - } + rtp_session_process_rb (sess, source, packet, arrival); } /* FIXME, we're just printing this for now... */ @@ -1280,7 +1281,8 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, * @sess: and #RTPSession * @buffer: an RTCP buffer * - * Process an RTCP buffer in the session manager. + * Process an RTCP buffer in the session manager. This function takes ownership + * of @buffer. * * Returns: a #GstFlowReturn. */ @@ -1288,8 +1290,9 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) { GstRTCPPacket packet; - gboolean more, is_bye = FALSE; + gboolean more, is_bye = FALSE, is_sr = FALSE; RTPArrivalStats arrival; + GstFlowReturn result = GST_FLOW_OK; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1301,7 +1304,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) RTP_SESSION_LOCK (sess); /* update arrival stats */ - update_arrival_stats (sess, &arrival, FALSE, buffer); + update_arrival_stats (sess, &arrival, FALSE, buffer, -1); if (sess->sent_bye) goto ignore; @@ -1322,6 +1325,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) switch (type) { case GST_RTCP_TYPE_SR: rtp_session_process_sr (sess, &packet, &arrival); + is_sr = TRUE; break; case GST_RTCP_TYPE_RR: rtp_session_process_rr (sess, &packet, &arrival); @@ -1357,14 +1361,20 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) } RTP_SESSION_UNLOCK (sess); - gst_buffer_unref (buffer); + /* notify caller of sr packets in the callback */ + if (is_sr && sess->callbacks.sync_rtcp) + result = sess->callbacks.sync_rtcp (sess, sess->source, buffer, + sess->user_data); + else + gst_buffer_unref (buffer); - return GST_FLOW_OK; + return result; /* ERRORS */ invalid_packet: { GST_DEBUG ("invalid RTCP packet received"); + gst_buffer_unref (buffer); return GST_FLOW_OK; } ignore: @@ -1380,6 +1390,7 @@ ignore: * rtp_session_send_rtp: * @sess: an #RTPSession * @buffer: an RTP buffer + * @ntptime: the NTP time of when this buffer was captured. * * Send the RTP buffer in the session manager. This function takes ownership of * @buffer. @@ -1387,11 +1398,12 @@ ignore: * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) +rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime) { GstFlowReturn result; RTPSource *source; gboolean prevsender; + GTimeVal current; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); @@ -1405,14 +1417,13 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) source = sess->source; /* update last activity */ - if (sess->callbacks.get_time) - source->last_rtp_activity = - sess->callbacks.get_time (sess, sess->user_data); + g_get_current_time (¤t); + source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current); prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ - result = rtp_source_send_rtp (sess->source, buffer); + result = rtp_source_send_rtp (sess->source, buffer, ntptime); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; @@ -1429,36 +1440,6 @@ invalid_packet: } } -/** - * rtp_session_set_send_sync - * @sess: an #RTPSession - * @base_time: the clock base time - * @start_time: the timestamp start time - * - * Establish a relation between the times returned by the get_time callback and - * the buffer timestamps. This information is used to convert the NTP times to - * RTP timestamps. - */ -void -rtp_session_set_base_time (RTPSession * sess, GstClockTime base_time) -{ - g_return_if_fail (RTP_IS_SESSION (sess)); - - RTP_SESSION_LOCK (sess); - sess->base_time = base_time; - RTP_SESSION_UNLOCK (sess); -} - -void -rtp_session_set_timestamp_sync (RTPSession * sess, GstClockTime start_timestamp) -{ - g_return_if_fail (RTP_IS_SESSION (sess)); - - RTP_SESSION_LOCK (sess); - sess->start_timestamp = start_timestamp; - RTP_SESSION_UNLOCK (sess); -} - static GstClockTime calculate_rtcp_interval (RTPSession * sess, gboolean deterministic, gboolean first) @@ -1498,6 +1479,7 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) GstFlowReturn result = GST_FLOW_OK; RTPSource *source; GstClockTime current, interval; + GTimeVal curtv; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); @@ -1518,10 +1500,8 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason) sess->sent_bye = FALSE; /* get current time */ - if (sess->callbacks.get_time) - current = sess->callbacks.get_time (sess, sess->user_data); - else - current = 0; + g_get_current_time (&curtv); + current = GST_TIMEVAL_TO_TIME (curtv); /* reschedule transmission */ sess->last_rtcp_send_time = current; @@ -1543,12 +1523,12 @@ done: /** * rtp_session_next_timeout: * @sess: an #RTPSession - * @time: the current time + * @time: the current system time * * Get the next time we should perform session maintenance tasks. * * Returns: a time when rtp_session_on_timeout() should be called with the - * current time. + * current system time. */ GstClockTime rtp_session_next_timeout (RTPSession * sess, GstClockTime time) @@ -1588,6 +1568,7 @@ typedef struct RTPSession *sess; GstBuffer *rtcp; GstClockTime time; + guint64 ntpnstime; GstClockTime interval; GstRTCPPacket packet; gboolean is_bye; @@ -1605,60 +1586,22 @@ session_start_rtcp (RTPSession * sess, ReportData * data) if (RTP_SOURCE_IS_SENDER (own)) { guint64 ntptime; guint32 rtptime; - GstClockTime running_time; - GstClockTimeDiff diff; + guint32 packet_count, octet_count; /* we are a sender, create SR */ GST_DEBUG ("create SR for SSRC %08x", own->ssrc); gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet); - /* use the sync params to interpollate the date->time member to rtptime. We - * use the last sent timestamp and rtptime as reference points. We assume - * that the slope of the rtptime vs timestamp curve is 1, which is certainly - * sufficient for the frequency at which we report SR and the rate we send - * out RTP packets. */ - rtptime = own->last_rtptime; - GST_DEBUG ("last_timestamp %" GST_TIME_FORMAT ", last_rtptime %" - G_GUINT32_FORMAT, GST_TIME_ARGS (own->last_timestamp), rtptime); - - if (own->clock_rate != -1) { - /* Start by calculating the running_time of the timestamp, this is a result - * in nanoseconds. */ - running_time = - (own->last_timestamp - sess->start_timestamp) + sess->base_time; - - /* get the diff with the SR time */ - diff = GST_CLOCK_DIFF (running_time, data->time); - - /* now translate the diff to RTP time, handle positive and negative cases. - * If there is no diff, we already set rtptime correctly above. */ - if (diff > 0) { - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); - rtptime += gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND); - } else { - diff = -diff; - GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, - GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff)); - rtptime -= gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND); - } - } else { - GST_WARNING ("no clock-rate, cannot interpollate rtp time"); - } - - /* convert clock time to NTP time. upper 32 bits should contain the seconds - * and the lower 32 bits, the fractions of a second. */ - ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND); - /* conversion from unix timestamp (seconds since 1970) to NTP (seconds - * since 1900). FIXME nothing says that the time is in unix timestamps. */ - ntptime += (2208988800LL << 32); + /* get latest stats */ + rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime, + &packet_count, &octet_count); + /* store stats */ + rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count, + octet_count); - GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, - (guint32) (ntptime >> 32), (guint32) (ntptime & 0xffffffff), rtptime); - - /* fill in sender report info, FIXME RTP timestamps missing */ + /* fill in sender report info */ gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc, - ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent); + ntptime, rtptime, packet_count, octet_count); } else { /* we are only receiver, create RR */ GST_DEBUG ("create RR for SSRC %08x", own->ssrc); @@ -1681,63 +1624,18 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data) if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) { /* only report about other sender sources */ if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) { - RTPSourceStats *stats; - guint64 extended_max, expected; - guint64 expected_interval, received_interval, ntptime; - gint64 lost, lost_interval; - guint32 fraction, LSR, DLSR; - GstClockTime time; - - stats = &source->stats; - - extended_max = stats->cycles + stats->max_seq; - expected = extended_max - stats->base_seq + 1; - - GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT - ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT, - extended_max, expected, stats->packets_received, stats->base_seq); + guint8 fractionlost; + gint32 packetslost; + guint32 exthighestseq, jitter; + guint32 lsr, dlsr; - lost = expected - stats->packets_received; - lost = CLAMP (lost, -0x800000, 0x7fffff); - - expected_interval = expected - stats->prev_expected; - stats->prev_expected = expected; - received_interval = stats->packets_received - stats->prev_received; - stats->prev_received = stats->packets_received; - - lost_interval = expected_interval - received_interval; - - if (expected_interval == 0 || lost_interval <= 0) - fraction = 0; - else - fraction = (lost_interval << 8) / expected_interval; - - GST_DEBUG ("add RR for SSRC %08x", source->ssrc); - /* we scaled the jitter up for additional precision */ - GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT - ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost, - extended_max, stats->jitter >> 4); - - if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) { - GstClockTime diff; - - /* LSR is middle bits of the last ntptime */ - LSR = (ntptime >> 16) & 0xffffffff; - diff = data->time - time; - GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff)); - /* DLSR, delay since last SR is expressed in 1/65536 second units */ - DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND); - } else { - /* No valid SR received, LSR/DLSR are set to 0 then */ - GST_DEBUG ("no valid SR received"); - LSR = 0; - DLSR = 0; - } - GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR); + /* get new stats */ + rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost, + &exthighestseq, &jitter, &lsr, &dlsr); /* packet is not yet filled, add report block for this source. */ - gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost, - extended_max, stats->jitter >> 4, LSR, DLSR); + gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost, + exthighestseq, jitter, lsr, dlsr); } } } @@ -1784,7 +1682,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data) if (is_sender) { if (data->time > source->last_rtp_activity) { interval = MAX (data->interval * 2, 5 * GST_SECOND); - if (data->time - source->last_rtp_activity > interval) { GST_DEBUG ("sender source %08x timed out and became receiver, last %" GST_TIME_FORMAT, source->ssrc, @@ -1897,6 +1794,8 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) /** * rtp_session_on_timeout: * @sess: an #RTPSession + * @time: the current system time + * @ntpnstime: the current NTP time in nanoseconds * * Perform maintenance actions after the timeout obtained with * rtp_session_next_timeout() expired. @@ -1910,21 +1809,23 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data) * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_on_timeout (RTPSession * sess, GstClockTime time) +rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; ReportData data; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT, + GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime)); + data.sess = sess; data.rtcp = NULL; data.time = time; + data.ntpnstime = ntpnstime; data.is_bye = FALSE; data.has_sdes = FALSE; - GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time)); - RTP_SESSION_LOCK (sess); /* get a new interval, we need this for various cleanups etc */ data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp); diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 9380b55e..d7dbb784 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -82,28 +82,30 @@ typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, Gs typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); /** - * RTPSessionClockRate: + * RTPSessionSyncRTCP: * @sess: an #RTPSession - * @payload: the payload + * @src: the #RTPSource + * @buffer: the RTCP buffer ready for sending * @user_data: user data specified when registering * - * This callback will be called when @sess needs the clock-rate of @payload. + * This callback will be called when @sess has and SR @buffer ready for doing + * synchronisation between streams. * - * Returns: the clock-rate of @pt. + * Returns: a #GstFlowReturn. */ -typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data); +typedef GstFlowReturn (*RTPSessionSyncRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); /** - * RTPSessionGetTime: + * RTPSessionClockRate: * @sess: an #RTPSession + * @payload: the payload * @user_data: user data specified when registering * - * This callback will be called when @sess needs the current time in - * nanoseconds. + * This callback will be called when @sess needs the clock-rate of @payload. * - * Returns: a #GstClockTime with the current time in nanoseconds. + * Returns: the clock-rate of @pt. */ -typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data); +typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data); /** * RTPSessionReconsider: @@ -121,7 +123,7 @@ typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data); * @RTPSessionProcessRTP: callback to process RTP packets * @RTPSessionSendRTP: callback for sending RTP packets * @RTPSessionSendRTCP: callback for sending RTCP packets - * @RTPSessionGetTime: callback for returning the current time + * @RTPSessionSyncRTCP: callback for handling SR packets * @RTPSessionReconsider: callback for reconsidering the timeout * * These callbacks can be installed on the session manager to get notification @@ -132,8 +134,8 @@ typedef struct { RTPSessionProcessRTP process_rtp; RTPSessionSendRTP send_rtp; RTPSessionSendRTCP send_rtcp; + RTPSessionSyncRTCP sync_rtcp; RTPSessionClockRate clock_rate; - RTPSessionGetTime get_time; RTPSessionReconsider reconsider; } RTPSessionCallbacks; @@ -190,8 +192,7 @@ struct _RTPSession { RTPSessionStats stats; - /* for mapping RTP time to NTP time */ - GstClockTime start_timestamp; + /* for mapping clock time to NTP time */ GstClockTime base_time; }; @@ -250,18 +251,17 @@ RTPSource* rtp_session_get_source_by_cname (RTPSession *sess, const gcha RTPSource* rtp_session_create_source (RTPSession *sess); /* processing packets from receivers */ -GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer); +GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer, guint64 ntpnstime); GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer *buffer); /* processing packets for sending */ -GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer); -void rtp_session_set_base_time (RTPSession *sess, GstClockTime base_time); -void rtp_session_set_timestamp_sync (RTPSession *sess, GstClockTime start_timestamp); +GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer, guint64 ntptime); + /* stopping the session */ GstFlowReturn rtp_session_send_bye (RTPSession *sess, const gchar *reason); /* get interval for next RTCP interval */ GstClockTime rtp_session_next_timeout (RTPSession *sess, GstClockTime time); -GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time); +GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time, guint64 ntpnstime); #endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index 24bb8466..63543358 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -68,7 +68,12 @@ rtp_source_init (RTPSource * src) src->payload = 0; src->clock_rate = -1; + src->clock_base = -1; + src->skew_base_ntpnstime = -1; + src->ext_rtptime = -1; + src->prev_ext_rtptime = -1; src->packets = g_queue_new (); + src->seqnum_base = -1; src->stats.cycles = -1; src->stats.jitter = 0; @@ -111,6 +116,44 @@ rtp_source_new (guint32 ssrc) return src; } +/** + * rtp_source_update_caps: + * @src: an #RTPSource + * @caps: a #GstCaps + * + * Parse @caps and store all relevant information in @source. + */ +void +rtp_source_update_caps (RTPSource * src, GstCaps * caps) +{ + GstStructure *s; + guint val; + gint ival; + + /* nothing changed, return */ + if (src->caps == caps) + return; + + s = gst_caps_get_structure (caps, 0); + + if (gst_structure_get_int (s, "payload", &ival)) + src->payload = ival; + GST_DEBUG ("got payload %d", src->payload); + + gst_structure_get_int (s, "clock-rate", &src->clock_rate); + GST_DEBUG ("got clock-rate %d", src->clock_rate); + + if (gst_structure_get_uint (s, "clock-base", &val)) + src->clock_base = val; + GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base); + + if (gst_structure_get_uint (s, "seqnum-base", &val)) + src->seqnum_base = val; + GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base); + + gst_caps_replace (&src->caps, caps); +} + /** * rtp_source_set_callbacks: * @src: an #RTPSource @@ -207,7 +250,7 @@ push_packet (RTPSource * src, GstBuffer * buffer) static gint get_clock_rate (RTPSource * src, guint8 payload) { - if (payload != src->payload) { + if (src->clock_rate == -1) { gint clock_rate = -1; if (src->callbacks.clock_rate) @@ -216,8 +259,9 @@ get_clock_rate (RTPSource * src, guint8 payload) GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate); src->clock_rate = clock_rate; - src->payload = payload; } + src->payload = payload; + return src->clock_rate; } @@ -225,14 +269,17 @@ static void calculate_jitter (RTPSource * src, GstBuffer * buffer, RTPArrivalStats * arrival) { - GstClockTime current; + guint64 ntpnstime; guint32 rtparrival, transit, rtptime; + guint64 ext_rtptime; gint32 diff; gint clock_rate; guint8 pt; + guint64 rtpdiff, ntpdiff; + gint64 skew; /* get arrival time */ - if ((current = arrival->time) == GST_CLOCK_TIME_NONE) + if ((ntpnstime = arrival->ntpnstime) == GST_CLOCK_TIME_NONE) goto no_time; pt = gst_rtp_buffer_get_payload_type (buffer); @@ -243,8 +290,56 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer, rtptime = gst_rtp_buffer_get_timestamp (buffer); - /* convert arrival time to RTP timestamp units */ - rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND); + /* convert to extended timestamp right away */ + ext_rtptime = gst_rtp_buffer_ext_timestamp (&src->ext_rtptime, rtptime); + + /* no clock-base, take first rtptime as base */ + if (src->clock_base == -1) { + GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime); + src->clock_base = rtptime; + } + + if (src->skew_base_ntpnstime == -1) { + /* lock on first observed NTP and RTP time, they should increment in-sync or + * we have a clock skew. */ + GST_DEBUG ("using base_ntpnstime of %" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime)); + src->skew_base_ntpnstime = ntpnstime; + src->skew_base_rtptime = rtptime; + src->prev_ext_rtptime = ext_rtptime; + src->avg_skew = 0; + } else if (src->prev_ext_rtptime < ext_rtptime) { + /* get elapsed rtptime but only when the previous rtptime was stricly smaller + * than the new one. */ + rtpdiff = ext_rtptime - src->skew_base_rtptime; + /* get NTP diff and convert to RTP time, this is always positive */ + ntpdiff = ntpnstime - src->skew_base_ntpnstime; + ntpdiff = gst_util_uint64_scale_int (ntpdiff, clock_rate, GST_SECOND); + + /* see how the NTP and RTP relate any deviation from 0 means that they drift + * out of sync and we must compensate. */ + skew = ntpdiff - rtpdiff; + /* average out the skew to get a smooth value. */ + src->avg_skew = (31 * src->avg_skew + skew) / 32; + + GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew, + src->avg_skew); + if (src->avg_skew != 0) { + guint32 timestamp; + + /* patch the buffer RTP timestamp with the skew */ + GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew); + timestamp = gst_rtp_buffer_get_timestamp (buffer); + timestamp += src->avg_skew; + gst_rtp_buffer_set_timestamp (buffer, timestamp); + } + /* store previous extended timestamp */ + src->prev_ext_rtptime = ext_rtptime; + } + + /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't + * care about the absolute value, just the difference. */ + rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND); /* transit time is difference with RTP timestamp */ transit = rtparrival - rtptime; @@ -324,6 +419,8 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, seqnr = gst_rtp_buffer_get_seq (buffer); + rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); + if (stats->cycles == -1) { GST_DEBUG ("received first buffer"); /* first time we heard of this source */ @@ -389,6 +486,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, } } else { /* duplicate or reordered packet, will be filtered by jitterbuffer. */ + GST_WARNING ("duplicate or reordered packet"); } src->stats.octets_received += arrival->payload_len; @@ -401,7 +499,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, GST_DEBUG ("seq %d, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, seqnr, src->stats.packets_received, src->stats.octets_received); - /* calculate jitter */ + /* calculate jitter and perform skew correction */ calculate_jitter (src, buffer, arrival); /* we're ready to push the RTP packet now */ @@ -444,25 +542,27 @@ rtp_source_process_bye (RTPSource * src, const gchar * reason) * rtp_source_send_rtp: * @src: an #RTPSource * @buffer: an RTP buffer + * @ntpnstime: the NTP time when this buffer was captured in nanoseconds * * Send an RTP @buffer originating from @src. This will make @src a sender. * This function takes ownership of @buffer and modifies the SSRC in the RTP - * packet to that of @src. + * packet to that of @src when needed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) +rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) { GstFlowReturn result = GST_FLOW_OK; guint len; - GstClockTime timestamp; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); len = gst_rtp_buffer_get_payload_len (buffer); + rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); + /* we are a sender now */ src->is_sender = TRUE; @@ -471,18 +571,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) src->stats.octets_sent += len; /* we keep track of the last received RTP timestamp and the corresponding - * GStreamer timestamp so that we can convert NTP time to RTP time when - * sending SR reports */ + * NTP timestamp so that we can use this info when constructing SR reports */ src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer); - - /* the timestamp can be undefined, in that case we use any previously - * received timestamp */ - timestamp = GST_BUFFER_TIMESTAMP (buffer); - if (timestamp != -1) - src->last_timestamp = timestamp; - - if (src->clock_rate == -1) - get_clock_rate (src, gst_rtp_buffer_get_payload_type (buffer)); + src->last_ntpnstime = ntpnstime; /* push packet */ if (src->callbacks.push_rtp) { @@ -496,7 +587,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) * get the correct SSRC. */ buffer = gst_buffer_make_writable (buffer); - GST_DEBUG ("updating SSRC from %u to %u", ssrc, src->ssrc); + GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc); gst_rtp_buffer_set_ssrc (buffer, src->ssrc); } GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT, @@ -513,17 +604,17 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) /** * rtp_source_process_sr: * @src: an #RTPSource + * @time: time of packet arrival * @ntptime: the NTP time * @rtptime: the RTP time * @packet_count: the packet count * @octet_count: the octect count - * @time: time of packet arrival * * Update the sender report in @src. */ void -rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, - guint32 packet_count, guint32 octet_count, GstClockTime time) +rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime, + guint32 rtptime, guint32 packet_count, guint32 octet_count) { RTPSenderReport *curr; gint curridx; @@ -556,6 +647,7 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, /** * rtp_source_process_rb: * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 * @fractionlost: fraction lost since last SR/RR * @packetslost: the cumululative number of packets lost * @exthighestseq: the extended last sequence number received @@ -566,18 +658,20 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, * Update the report block in @src. */ void -rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, - guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr) +rtp_source_process_rb (RTPSource * src, GstClockTime time, guint8 fractionlost, + gint32 packetslost, guint32 exthighestseq, guint32 jitter, guint32 lsr, + guint32 dlsr) { RTPReceiverReport *curr; gint curridx; + guint32 ntp, A; g_return_if_fail (RTP_IS_SOURCE (src)); - GST_DEBUG ("got RB packet: SSRC %08x, FL %" G_GUINT32_FORMAT "" - ", PL %d, HS %" G_GUINT32_FORMAT ", JITTER %" G_GUINT32_FORMAT - ", LSR %08x, DLSR %08x", src->ssrc, fractionlost, packetslost, - exthighestseq, jitter, lsr, dlsr); + GST_DEBUG ("got RB packet: SSRC %08x, FL %2x, PL %d, HS %" G_GUINT32_FORMAT + ", jitter %" G_GUINT32_FORMAT ", LSR %04x:%04x, DLSR %04x:%04x", + src->ssrc, fractionlost, packetslost, exthighestseq, jitter, lsr >> 16, + lsr & 0xffff, dlsr >> 16, dlsr & 0xffff); curridx = src->stats.curr_rr ^ 1; curr = &src->stats.rr[curridx]; @@ -591,26 +685,198 @@ rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, curr->lsr = lsr; curr->dlsr = dlsr; + /* calculate round trip */ + ntp = (gst_rtcp_unix_to_ntp (time) >> 16) & 0xffffffff; + A = ntp - dlsr; + A -= lsr; + curr->round_trip = A; + + GST_DEBUG ("NTP %04x:%04x, round trip %04x:%04x", ntp >> 16, ntp & 0xffff, + A >> 16, A & 0xffff); + /* make current */ src->stats.curr_rr = curridx; } /** - * rtp_source_get_last_sr: + * rtp_source_get_new_sr: * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 * @ntptime: the NTP time * @rtptime: the RTP time * @packet_count: the packet count * @octet_count: the octect count + * + * Get new values to put into a new SR report from this source. + * + * Returns: %TRUE on success. + */ +gboolean +rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime, + guint64 * ntptime, guint32 * rtptime, guint32 * packet_count, + guint32 * octet_count) +{ + guint32 t_rtp; + guint64 t_current_ntp; + GstClockTimeDiff diff; + + g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE); + + /* use the sync params to interpollate the date->time member to rtptime. We + * use the last sent timestamp and rtptime as reference points. We assume + * that the slope of the rtptime vs timestamp curve is 1, which is certainly + * sufficient for the frequency at which we report SR and the rate we send + * out RTP packets. */ + t_rtp = src->last_rtptime; + + GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %" + G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp); + + if (src->clock_rate != -1) { + /* get the diff with the SR time */ + diff = GST_CLOCK_DIFF (src->last_ntpnstime, ntpnstime); + + /* now translate the diff to RTP time, handle positive and negative cases. + * If there is no diff, we already set rtptime correctly above. */ + if (diff > 0) { + GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff)); + t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); + } else { + diff = -diff; + GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT, + GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff)); + t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND); + } + } else { + GST_WARNING ("no clock-rate, cannot interpollate rtp time"); + } + + t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND); + + GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT, + (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff), + t_rtp); + + if (ntptime) + *ntptime = t_current_ntp; + if (rtptime) + *rtptime = t_rtp; + if (packet_count) + *packet_count = src->stats.packets_sent; + if (octet_count) + *octet_count = src->stats.octets_sent; + + return TRUE; +} + +/** + * rtp_source_get_new_rb: + * @src: an #RTPSource + * @time: the current time in nanoseconds since 1970 + * @fractionlost: fraction lost since last SR/RR + * @packetslost: the cumululative number of packets lost + * @exthighestseq: the extended last sequence number received + * @jitter: the interarrival jitter + * @lsr: the last SR packet from this source + * @dlsr: the delay since last SR packet + * + * Get the values of the last RB report set with rtp_source_process_rb(). + * + * Returns: %TRUE on success. + */ +gboolean +rtp_source_get_new_rb (RTPSource * src, GstClockTime time, + guint8 * fractionlost, gint32 * packetslost, guint32 * exthighestseq, + guint32 * jitter, guint32 * lsr, guint32 * dlsr) +{ + RTPSourceStats *stats; + guint64 extended_max, expected; + guint64 expected_interval, received_interval, ntptime; + gint64 lost, lost_interval; + guint32 fraction, LSR, DLSR; + GstClockTime sr_time; + + stats = &src->stats; + + extended_max = stats->cycles + stats->max_seq; + expected = extended_max - stats->base_seq + 1; + + GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT + ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT, + extended_max, expected, stats->packets_received, stats->base_seq); + + lost = expected - stats->packets_received; + lost = CLAMP (lost, -0x800000, 0x7fffff); + + expected_interval = expected - stats->prev_expected; + stats->prev_expected = expected; + received_interval = stats->packets_received - stats->prev_received; + stats->prev_received = stats->packets_received; + + lost_interval = expected_interval - received_interval; + + if (expected_interval == 0 || lost_interval <= 0) + fraction = 0; + else + fraction = (lost_interval << 8) / expected_interval; + + GST_DEBUG ("add RR for SSRC %08x", src->ssrc); + /* we scaled the jitter up for additional precision */ + GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT + ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost, + extended_max, stats->jitter >> 4); + + if (rtp_source_get_last_sr (src, &sr_time, &ntptime, NULL, NULL, NULL)) { + GstClockTime diff; + + /* LSR is middle 32 bits of the last ntptime */ + LSR = (ntptime >> 16) & 0xffffffff; + diff = time - sr_time; + GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff)); + /* DLSR, delay since last SR is expressed in 1/65536 second units */ + DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND); + } else { + /* No valid SR received, LSR/DLSR are set to 0 then */ + GST_DEBUG ("no valid SR received"); + LSR = 0; + DLSR = 0; + } + GST_DEBUG ("LSR %04x:%04x, DLSR %04x:%04x", LSR >> 16, LSR & 0xffff, + DLSR >> 16, DLSR & 0xffff); + + if (fractionlost) + *fractionlost = fraction; + if (packetslost) + *packetslost = lost; + if (exthighestseq) + *exthighestseq = extended_max; + if (jitter) + *jitter = stats->jitter >> 4; + if (lsr) + *lsr = LSR; + if (dlsr) + *dlsr = DLSR; + + return TRUE; +} + +/** + * rtp_source_get_last_sr: + * @src: an #RTPSource * @time: time of packet arrival + * @ntptime: the NTP time + * @rtptime: the RTP time + * @packet_count: the packet count + * @octet_count: the octect count * * Get the values of the last sender report as set with rtp_source_process_sr(). * * Returns: %TRUE if there was a valid SR report. */ gboolean -rtp_source_get_last_sr (RTPSource * src, guint64 * ntptime, guint32 * rtptime, - guint32 * packet_count, guint32 * octet_count, GstClockTime * time) +rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime, + guint32 * rtptime, guint32 * packet_count, guint32 * octet_count) { RTPSenderReport *curr; diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index 7920b6f4..be793461 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -134,13 +134,25 @@ struct _RTPSource { GstNetAddress rtcp_from; guint8 payload; + GstCaps *caps; gint clock_rate; + gint32 seqnum_base; + + gint64 clock_base; + + /* to calculate the clock skew */ + guint64 skew_base_ntpnstime; + guint64 skew_base_rtptime; + gint64 avg_skew; + guint64 ext_rtptime; + guint64 prev_ext_rtptime; GstClockTime bye_time; GstClockTime last_activity; GstClockTime last_rtp_activity; - GstClockTime last_timestamp; + GstClockTime last_rtptime; + GstClockTime last_ntpnstime; GQueue *packets; @@ -158,6 +170,7 @@ GType rtp_source_get_type (void); /* managing lifetime of sources */ RTPSource* rtp_source_new (guint32 ssrc); +void rtp_source_update_caps (RTPSource *src, GstCaps *caps); void rtp_source_set_callbacks (RTPSource *src, RTPSourceCallbacks *cb, gpointer data); void rtp_source_set_as_csrc (RTPSource *src); @@ -168,18 +181,24 @@ void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *addres /* handling RTP */ GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); -GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer); +GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer, guint64 ntpnstime); /* RTCP messages */ void rtp_source_process_bye (RTPSource *src, const gchar *reason); -void rtp_source_process_sr (RTPSource *src, guint64 ntptime, guint32 rtptime, - guint32 packet_count, guint32 octet_count, GstClockTime time); -void rtp_source_process_rb (RTPSource *src, guint8 fractionlost, gint32 packetslost, - guint32 exthighestseq, guint32 jitter, +void rtp_source_process_sr (RTPSource *src, GstClockTime time, guint64 ntptime, + guint32 rtptime, guint32 packet_count, guint32 octet_count); +void rtp_source_process_rb (RTPSource *src, GstClockTime time, guint8 fractionlost, + gint32 packetslost, guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr); -gboolean rtp_source_get_last_sr (RTPSource *src, guint64 *ntptime, guint32 *rtptime, - guint32 *packet_count, guint32 *octet_count, GstClockTime *time); +gboolean rtp_source_get_new_sr (RTPSource *src, GstClockTime time, guint64 *ntptime, + guint32 *rtptime, guint32 *packet_count, guint32 *octet_count); +gboolean rtp_source_get_new_rb (RTPSource *src, GstClockTime time, guint8 *fractionlost, + gint32 *packetslost, guint32 *exthighestseq, guint32 *jitter, + guint32 *lsr, guint32 *dlsr); + +gboolean rtp_source_get_last_sr (RTPSource *src, GstClockTime *time, guint64 *ntptime, + guint32 *rtptime, guint32 *packet_count, guint32 *octet_count); gboolean rtp_source_get_last_rb (RTPSource *src, guint8 *fractionlost, gint32 *packetslost, guint32 *exthighestseq, guint32 *jitter, guint32 *lsr, guint32 *dlsr); diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h index 0ee1ed1e..e2e4e397 100644 --- a/gst/rtpmanager/rtpstats.h +++ b/gst/rtpmanager/rtpstats.h @@ -51,6 +51,7 @@ typedef struct { guint32 jitter; guint32 lsr; guint32 dlsr; + guint32 round_trip; } RTPReceiverReport; /** @@ -64,6 +65,7 @@ typedef struct { */ typedef struct { GstClockTime time; + guint64 ntpnstime; gboolean have_address; GstNetAddress address; guint bytes; -- cgit v1.2.1