summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog76
-rw-r--r--gst/rtpmanager/gstrtpbin-marshal.list1
-rw-r--r--gst/rtpmanager/gstrtpbin.c470
-rw-r--r--gst/rtpmanager/gstrtpbin.h3
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c121
-rw-r--r--gst/rtpmanager/gstrtpsession.c229
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.c210
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.h3
-rw-r--r--gst/rtpmanager/rtpsession.c295
-rw-r--r--gst/rtpmanager/rtpsession.h38
-rw-r--r--gst/rtpmanager/rtpsource.c334
-rw-r--r--gst/rtpmanager/rtpsource.h35
-rw-r--r--gst/rtpmanager/rtpstats.h2
13 files changed, 1382 insertions, 435 deletions
diff --git a/ChangeLog b/ChangeLog
index 2de3cb0f..af636638 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,79 @@
+2007-09-03 Wim Taymans <wim.taymans@gmail.com>
+
+ * gst/rtpmanager/gstrtpbin-marshal.list:
+ * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_get_client),
+ (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (create_stream),
+ (gst_rtp_bin_init), (caps_changed), (new_ssrc_pad_found),
+ (create_recv_rtp), (create_recv_rtcp), (create_send_rtp):
+ * gst/rtpmanager/gstrtpbin.h:
+ Updated example pipelines in docs.
+ Handle sync_rtcp buffers from the SSRC demuxer to perform lip-sync.
+ Set the default latency correctly.
+ Add some more points where we can get caps.
+
+ * gst/rtpmanager/gstrtpjitterbuffer.c:
+ (gst_rtp_jitter_buffer_class_init),
+ (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_loop),
+ (gst_rtp_jitter_buffer_query),
+ (gst_rtp_jitter_buffer_set_property),
+ (gst_rtp_jitter_buffer_get_property):
+ Add ts-offset property to control timestamping.
+
+ * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init),
+ (gst_rtp_session_init), (gst_rtp_session_set_property),
+ (gst_rtp_session_get_property), (get_current_ntp_ns_time),
+ (rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state),
+ (gst_rtp_session_send_rtcp), (gst_rtp_session_sync_rtcp),
+ (gst_rtp_session_cache_caps), (gst_rtp_session_clock_rate),
+ (gst_rtp_session_sink_setcaps), (gst_rtp_session_chain_recv_rtp),
+ (gst_rtp_session_event_send_rtp_sink),
+ (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink),
+ (create_recv_rtcp_sink), (create_send_rtp_sink),
+ (create_send_rtcp_src):
+ Various cleanups.
+ Feed rtpsession manager with NTP time based on pipeline clock when
+ handling RTP packets and RTCP timeouts.
+ Perform all RTCP with the system clock.
+ Set caps on RTCP outgoing buffers.
+
+ * gst/rtpmanager/gstrtpssrcdemux.c: (find_demux_pad_for_ssrc),
+ (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_base_init),
+ (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event),
+ (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain),
+ (gst_rtp_ssrc_demux_rtcp_chain):
+ * gst/rtpmanager/gstrtpssrcdemux.h:
+ Also demux RTCP messages.
+
+ * gst/rtpmanager/rtpsession.c: (rtp_session_set_callbacks),
+ (update_arrival_stats), (rtp_session_process_rtp),
+ (rtp_session_process_rb), (rtp_session_process_sr),
+ (rtp_session_process_rr), (rtp_session_process_rtcp),
+ (rtp_session_send_rtp), (rtp_session_send_bye),
+ (session_start_rtcp), (session_report_blocks), (session_cleanup),
+ (rtp_session_on_timeout):
+ * gst/rtpmanager/rtpsession.h:
+ Remove the get_time callback, the GStreamer part will feed us with
+ enough timing information.
+ Split sync timing and RTCP timing information.
+ Factor out common RB handling for SR and RR.
+ Send out SR RTCP packets for lip-sync.
+ Move SR and RR packet info generation to the source.
+
+ * gst/rtpmanager/rtpsource.c: (rtp_source_init),
+ (rtp_source_update_caps), (get_clock_rate), (calculate_jitter),
+ (rtp_source_process_rtp), (rtp_source_send_rtp),
+ (rtp_source_process_sr), (rtp_source_process_rb),
+ (rtp_source_get_new_sr), (rtp_source_get_new_rb),
+ (rtp_source_get_last_sr):
+ * gst/rtpmanager/rtpsource.h:
+ * gst/rtpmanager/rtpstats.h:
+ Use caps on incomming buffers to get timing information when they are
+ there.
+ Calculate clock scew of the receiver compared to the sender and adjust
+ the rtp timestamps.
+ Calculate the round trip in sources.
+ Do SR and RR calculations in the source.
+
2007-09-03 Renato Filho <renato.filho@indt.org.br>
* configure.ac:
diff --git a/gst/rtpmanager/gstrtpbin-marshal.list b/gst/rtpmanager/gstrtpbin-marshal.list
index ca760d82..e5c5fc42 100644
--- a/gst/rtpmanager/gstrtpbin-marshal.list
+++ b/gst/rtpmanager/gstrtpbin-marshal.list
@@ -3,3 +3,4 @@ BOXED:UINT
BOXED:UINT,UINT
VOID:UINT,OBJECT
VOID:UINT,UINT
+VOID:OBJECT,OBJECT
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
index cb2a9f0b..a4ba67c3 100644
--- a/gst/rtpmanager/gstrtpbin.c
+++ b/gst/rtpmanager/gstrtpbin.c
@@ -79,12 +79,12 @@
* <programlisting>
* gst-launch gstrtpbin name=rtpbin \
* v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
- * rtpbin.send_rtp_src_0 ! udpsink port=5000 \
- * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false \
- * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \
- * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \
- * rtpbin.send_rtp_src_1 ! udpsink port=5002 \
- * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false \
+ * rtpbin.send_rtp_src_0 ! udpsink port=5000 \
+ * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false \
+ * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \
+ * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \
+ * rtpbin.send_rtp_src_1 ! udpsink port=5002 \
+ * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false \
* udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
* </programlisting>
* Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
@@ -94,21 +94,22 @@
* on port 5001 and the audio RTCP packets for session 0 are sent on port 5003.
* RTCP packets for session 0 are received on port 5005 and RTCP for session 1
* is received on port 5007. Since RTCP packets from the sender should be sent
- * as soon as possible, sync=false is configured on udpsink.
+ * as soon as possible and do not participate in preroll, sync=false and
+ * async=false is configured on udpsink
* </para>
* <para>
* <programlisting>
- * gst-launch -v gstrtpbin name=rtpbin \
+ * gst-launch -v gstrtpbin name=rtpbin \
* udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
- * port=5000 ! rtpbin.recv_rtp_sink_0 \
- * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \
- * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \
- * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false \
- * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
- * port=5002 ! rtpbin.recv_rtp_sink_1 \
- * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \
- * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \
- * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false
+ * port=5000 ! rtpbin.recv_rtp_sink_0 \
+ * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \
+ * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \
+ * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false \
+ * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
+ * port=5002 ! rtpbin.recv_rtp_sink_1 \
+ * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \
+ * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \
+ * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
* </programlisting>
* Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
* decode and display the video.
@@ -122,7 +123,7 @@
* </para>
* </refsect2>
*
- * Last reviewed on 2007-08-28 (0.10.6)
+ * Last reviewed on 2007-08-30 (0.10.6)
*/
#ifdef HAVE_CONFIG_H
@@ -130,6 +131,9 @@
#endif
#include <string.h>
+#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
+
#include "gstrtpbin-marshal.h"
#include "gstrtpbin.h"
@@ -187,6 +191,14 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
GST_STATIC_CAPS ("application/x-rtp")
);
+/* padtemplate for the internal pad */
+static GstStaticPadTemplate rtpbin_sync_sink_template =
+GST_STATIC_PAD_TEMPLATE ("sink_%d",
+ GST_PAD_SINK,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
#define GST_RTP_BIN_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
@@ -242,16 +254,37 @@ struct _GstRtpBinStream
{
/* the SSRC of this stream */
guint32 ssrc;
+
/* parent bin */
GstRtpBin *bin;
+
/* the session this SSRC belongs to */
GstRtpBinSession *session;
+
/* the jitterbuffer of the SSRC */
GstElement *buffer;
+
/* the PT demuxer of the SSRC */
GstElement *demux;
gulong demux_newpad_sig;
gulong demux_ptreq_sig;
+
+ /* the internal pad we use to get RTCP sync messages */
+ GstPad *sync_pad;
+ gboolean have_sync;
+ guint64 last_unix;
+ guint64 last_extrtptime;
+
+ /* mapping to local RTP and NTP time */
+ guint64 local_rtp;
+ guint64 local_unix;
+ gint64 unix_delta;
+
+ /* for lip-sync */
+ guint64 clock_base;
+ gint clock_rate;
+ gint64 ts_offset;
+ gint64 prev_ts_offset;
};
#define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock)
@@ -289,12 +322,28 @@ struct _GstRtpBinSession
GstPad *recv_rtp_sink;
GstPad *recv_rtp_src;
GstPad *recv_rtcp_sink;
- GstPad *recv_rtcp_src;
+ GstPad *sync_src;
GstPad *send_rtp_sink;
GstPad *send_rtp_src;
GstPad *send_rtcp_src;
};
+/* Manages the RTP streams that come from one client and should therefore be
+ * synchronized.
+ */
+struct _GstRtpBinClient
+{
+ /* the common CNAME for the streams */
+ gchar *cname;
+ guint cname_len;
+
+ /* the streams */
+ guint nstreams;
+ GSList *streams;
+
+ gint64 min_delta;
+};
+
/* find a session with the given id. Must be called with RTP_BIN_LOCK */
static GstRtpBinSession *
find_session_by_id (GstRtpBin * rtpbin, gint id)
@@ -513,6 +562,271 @@ gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
GST_RTP_BIN_UNLOCK (bin);
}
+static GstRtpBinClient *
+gst_rtp_bin_get_client (GstRtpBin * bin, guint8 len, guint8 * data,
+ gboolean * created)
+{
+ GstRtpBinClient *result = NULL;
+ GSList *walk;
+
+ for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
+ GstRtpBinClient *client = (GstRtpBinClient *) walk->data;
+
+ if (len != client->cname_len)
+ continue;
+
+ if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
+ GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
+ client->cname);
+ result = client;
+ break;
+ }
+ }
+
+ /* nothing found, create one */
+ if (result == NULL) {
+ result = g_new0 (GstRtpBinClient, 1);
+ result->cname = g_strndup ((gchar *) data, len);
+ result->cname_len = len;
+ result->min_delta = G_MAXINT64;
+ bin->clients = g_slist_prepend (bin->clients, result);
+ GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
+ result->cname);
+ }
+ return result;
+}
+
+/* associate a stream to the given CNAME. This will make sure all streams for
+ * that CNAME are synchronized together. */
+static void
+gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
+ guint8 * data)
+{
+ GstRtpBinClient *client;
+ gboolean created;
+ GSList *walk;
+
+ /* first find or create the CNAME */
+ client = gst_rtp_bin_get_client (bin, len, data, &created);
+
+ /* find stream in the client */
+ for (walk = client->streams; walk; walk = g_slist_next (walk)) {
+ GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
+
+ if (ostream == stream)
+ break;
+ }
+ /* not found, add it to the list */
+ if (walk == NULL) {
+ GST_DEBUG_OBJECT (bin,
+ "new association of SSRC %08x with client %p with CNAME %s",
+ stream->ssrc, client, client->cname);
+ client->streams = g_slist_prepend (client->streams, stream);
+ client->nstreams++;
+ } else {
+ GST_DEBUG_OBJECT (bin,
+ "found association of SSRC %08x with client %p with CNAME %s",
+ stream->ssrc, client, client->cname);
+ }
+
+ /* we can only continue if we know the local clock-base and clock-rate */
+ if (stream->clock_base == -1)
+ goto no_clock_base;
+ if (stream->clock_rate <= 0)
+ goto no_clock_rate;
+
+ /* map last RTP time to local timeline using our clock-base */
+ stream->local_rtp = stream->last_extrtptime - stream->clock_base;
+
+ GST_DEBUG_OBJECT (bin,
+ "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
+ ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base,
+ stream->last_extrtptime, stream->local_rtp, stream->clock_rate);
+
+ /* calculate local NTP time in gstreamer timestamp */
+ stream->local_unix =
+ gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND,
+ stream->clock_rate);
+ /* calculate delta between server and receiver */
+ stream->unix_delta = stream->last_unix - stream->local_unix;
+
+ GST_DEBUG_OBJECT (bin,
+ "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
+ ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix,
+ stream->unix_delta);
+
+ /* recalc inter stream playout offset, but only if there are more than one
+ * stream. */
+ if (client->nstreams > 1) {
+ gint64 min;
+
+ /* calculate the min of all deltas */
+ min = G_MAXINT64;
+ for (walk = client->streams; walk; walk = g_slist_next (walk)) {
+ GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
+
+ if (ostream->unix_delta < min)
+ min = ostream->unix_delta;
+ }
+
+ GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
+ min);
+
+ /* calculate offsets for each stream */
+ for (walk = client->streams; walk; walk = g_slist_next (walk)) {
+ GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
+
+ ostream->ts_offset = ostream->unix_delta - min;
+
+ /* delta changed, see how much */
+ if (ostream->prev_ts_offset != ostream->ts_offset) {
+ gint64 diff;
+
+ if (ostream->prev_ts_offset > ostream->ts_offset)
+ diff = ostream->prev_ts_offset - ostream->ts_offset;
+ else
+ diff = ostream->ts_offset - ostream->prev_ts_offset;
+
+ /* only change diff when it changed more than 1 millisecond. This
+ * compensates for rounding errors in NTP to RTP timestamp
+ * conversions */
+ if (diff > GST_MSECOND)
+ g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL);
+
+ ostream->prev_ts_offset = ostream->ts_offset;
+ }
+ GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
+ ostream->ssrc, ostream->ts_offset);
+ }
+ }
+ return;
+
+no_clock_base:
+ {
+ GST_WARNING_OBJECT (bin, "we have no clock-base");
+ return;
+ }
+no_clock_rate:
+ {
+ GST_WARNING_OBJECT (bin, "we have no clock-rate");
+ return;
+ }
+}
+
+#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
+ for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
+ (b) = gst_rtcp_packet_move_to_next ((packet)))
+
+#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
+ for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
+ (b) = gst_rtcp_packet_sdes_next_item ((packet)))
+
+#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
+ for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
+ (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
+
+static GstFlowReturn
+gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstRtpBinStream *stream;
+ GstRtpBin *bin;
+ GstRTCPPacket packet;
+ guint32 ssrc;
+ guint64 ntptime;
+ guint32 rtptime;
+ gboolean have_sr, have_sdes;
+ gboolean more;
+
+ stream = gst_pad_get_element_private (pad);
+ bin = stream->bin;
+
+ GST_DEBUG_OBJECT (bin, "received sync packet");
+
+ if (!gst_rtcp_buffer_validate (buffer))
+ goto invalid_rtcp;
+
+ have_sr = FALSE;
+ have_sdes = FALSE;
+ GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) {
+ /* first packet must be SR or RR or else the validate would have failed */
+ switch (gst_rtcp_packet_get_type (&packet)) {
+ case GST_RTCP_TYPE_SR:
+ /* only parse first. There is only supposed to be one SR in the packet
+ * but we will deal with malformed packets gracefully */
+ if (have_sr)
+ break;
+ /* get NTP and RTP times */
+ gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
+ NULL, NULL);
+
+ GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
+ /* ignore SR that is not ours */
+ if (ssrc != stream->ssrc)
+ continue;
+
+ have_sr = TRUE;
+
+ /* store values in the stream */
+ stream->have_sync = TRUE;
+ stream->last_unix = gst_rtcp_ntp_to_unix (ntptime);
+ /* use extended timestamp */
+ gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime);
+ break;
+ case GST_RTCP_TYPE_SDES:
+ {
+ gboolean more_items, more_entries;
+
+ /* only deal with first SDES, there is only supposed to be one SDES in
+ * the RTCP packet but we deal with bad packets gracefully. Also bail
+ * out if we have not seen an SR item yet. */
+ if (have_sdes || !have_sr)
+ break;
+
+ GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) {
+ /* skip items that are not about the SSRC of the sender */
+ if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc)
+ continue;
+
+ /* find the CNAME entry */
+ GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) {
+ GstRTCPSDESType type;
+ guint8 len;
+ guint8 *data;
+
+ gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
+
+ if (type == GST_RTCP_SDES_CNAME) {
+ stream->clock_base = GST_BUFFER_OFFSET (buffer);
+ /* associate the stream to CNAME */
+ gst_rtp_bin_associate (bin, stream, len, data);
+ }
+ }
+ }
+ have_sdes = TRUE;
+ break;
+ }
+ default:
+ /* we can ignore these packets */
+ break;
+ }
+ }
+
+ gst_buffer_unref (buffer);
+
+ return ret;
+
+ /* ERRORS */
+invalid_rtcp:
+ {
+ /* this is fatal and should be filtered earlier */
+ GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL),
+ ("invalid RTCP packet received"));
+ gst_buffer_unref (buffer);
+ return GST_FLOW_ERROR;
+ }
+}
+
/* create a new stream with @ssrc in @session. Must be called with
* RTP_SESSION_LOCK. */
static GstRtpBinStream *
@@ -520,6 +834,8 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
{
GstElement *buffer, *demux;
GstRtpBinStream *stream;
+ GstPadTemplate *templ;
+ gchar *padname;
if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL)))
goto no_jitterbuffer;
@@ -533,8 +849,22 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
stream->session = session;
stream->buffer = buffer;
stream->demux = demux;
+ stream->last_extrtptime = -1;
+ stream->have_sync = FALSE;
session->streams = g_slist_prepend (session->streams, stream);
+ /* make an internal sinkpad for RTCP sync packets. Take ownership of the
+ * pad. We will link this pad later. */
+ padname = g_strdup_printf ("sync_%d", ssrc);
+ templ = gst_static_pad_template_get (&rtpbin_sync_sink_template);
+ stream->sync_pad = gst_pad_new_from_template (templ, padname);
+ gst_object_unref (templ);
+ gst_object_ref (stream->sync_pad);
+ gst_object_sink (stream->sync_pad);
+ gst_pad_set_element_private (stream->sync_pad, stream);
+ gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain);
+ gst_pad_set_active (stream->sync_pad, TRUE);
+
/* provide clock_rate to the jitterbuffer when needed */
g_signal_connect (buffer, "request-pt-map",
(GCallback) pt_map_requested, session);
@@ -566,17 +896,6 @@ no_demux:
}
}
-/* Manages the RTP streams that come from one client and should therefore be
- * synchronized.
- */
-struct _GstRtpBinClient
-{
- /* the common CNAME for the streams */
- gchar *cname;
- /* the streams */
- GSList *streams;
-};
-
/* GObject vmethods */
static void gst_rtp_bin_finalize (GObject * object);
static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
@@ -762,6 +1081,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass)
rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin);
rtpbin->priv->bin_lock = g_mutex_new ();
rtpbin->provided_clock = gst_system_clock_obtain ();
+ rtpbin->latency = DEFAULT_LATENCY_MS;
}
static void
@@ -908,13 +1228,45 @@ no_caps:
}
}
+/* emited when caps changed for the session */
+static void
+caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session)
+{
+ GstRtpBin *bin;
+ GstCaps *caps;
+ gint payload;
+ const GstStructure *s;
+
+ bin = session->bin;
+
+ g_object_get (pad, "caps", &caps, NULL);
+
+ if (caps == NULL)
+ return;
+
+ GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps);
+
+ s = gst_caps_get_structure (caps, 0);
+
+ /* get payload, finish when it's not there */
+ if (!gst_structure_get_int (s, "payload", &payload))
+ return;
+
+ GST_RTP_SESSION_LOCK (session);
+ GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload);
+ g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps);
+ GST_RTP_SESSION_UNLOCK (session);
+}
+
/* a new pad (SSRC) was created in @session */
static void
new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
GstRtpBinSession * session)
{
GstRtpBinStream *stream;
- GstPad *sinkpad;
+ GstPad *sinkpad, *srcpad;
+ gchar *padname;
+ GstCaps *caps;
GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc);
@@ -925,12 +1277,38 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
if (!stream)
goto no_stream;
+ /* get the caps of the pad, we need the clock-rate and base_time if any. */
+ if ((caps = gst_pad_get_caps (pad))) {
+ const GstStructure *s;
+ guint val;
+
+ GST_DEBUG_OBJECT (session->bin, "pad has caps %" GST_PTR_FORMAT, caps);
+
+ s = gst_caps_get_structure (caps, 0);
+
+ if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate))
+ stream->clock_rate = -1;
+
+ if (gst_structure_get_uint (s, "clock-base", &val))
+ stream->clock_base = val;
+ else
+ stream->clock_base = -1;
+ }
+
/* get pad and link */
GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer");
sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
gst_pad_link (pad, sinkpad);
gst_object_unref (sinkpad);
+ /* get the RTCP sync pad */
+ GST_DEBUG_OBJECT (session->bin, "linking sync pad");
+ padname = g_strdup_printf ("rtcp_src_%d", ssrc);
+ srcpad = gst_element_get_pad (element, padname);
+ g_free (padname);
+ gst_pad_link (srcpad, stream->sync_pad);
+ gst_object_unref (srcpad);
+
/* connect to the new-pad signal of the payload demuxer, this will expose the
* new pad by ghosting it. */
stream->demux_newpad_sig = g_signal_connect (stream->demux,
@@ -992,6 +1370,9 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->recv_rtp_sink == NULL)
goto pad_failed;
+ g_signal_connect (session->recv_rtp_sink, "notify::caps",
+ (GCallback) caps_changed, session);
+
GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
/* get srcpad, link to SSRCDemux */
session->recv_rtp_src =
@@ -999,8 +1380,9 @@ create_recv_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->recv_rtp_src == NULL)
goto pad_failed;
- GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad");
+ GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad");
sinkdpad = gst_element_get_static_pad (session->demux, "sink");
+ GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad");
lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
gst_object_unref (sinkdpad);
if (lres != GST_PAD_LINK_OK)
@@ -1057,11 +1439,8 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
GstPad *result;
guint sessid;
GstRtpBinSession *session;
-
-#if 0
GstPad *sinkdpad;
GstPadLinkReturn lres;
-#endif
/* first get the session number */
if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1)
@@ -1083,29 +1462,25 @@ create_recv_rtcp (GstRtpBin * rtpbin, GstPadTemplate * templ,
if (session->recv_rtcp_sink != NULL)
goto existed;
- GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
-
/* get recv_rtp pad and store */
+ GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
session->recv_rtcp_sink =
gst_element_get_request_pad (session->session, "recv_rtcp_sink");
if (session->recv_rtcp_sink == NULL)
goto pad_failed;
-#if 0
/* get srcpad, link to SSRCDemux */
GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
- session->recv_rtcp_src =
- gst_element_get_static_pad (session->session, "sync_src");
- if (session->recv_rtcp_src == NULL)
+ session->sync_src = gst_element_get_static_pad (session->session, "sync_src");
+ if (session->sync_src == NULL)
goto pad_failed;
- GST_DEBUG_OBJECT (rtpbin, "linking sync to demux");
- sinkdpad = gst_element_get_static_pad (session->demux, "sink");
- lres = gst_pad_link (session->recv_rtcp_src, sinkdpad);
+ GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad");
+ sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink");
+ lres = gst_pad_link (session->sync_src, sinkdpad);
gst_object_unref (sinkdpad);
if (lres != GST_PAD_LINK_OK)
goto link_failed;
-#endif
result =
gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
@@ -1136,13 +1511,11 @@ pad_failed:
g_warning ("gstrtpbin: failed to get session pad");
return NULL;
}
-#if 0
link_failed:
{
g_warning ("gstrtpbin: failed to link pads");
return NULL;
}
-#endif
}
/* Create a pad for sending RTP for the session in @name. Must be called with
@@ -1180,6 +1553,9 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->send_rtp_sink == NULL)
goto pad_failed;
+ g_signal_connect (session->send_rtp_sink, "notify::caps",
+ (GCallback) caps_changed, session);
+
result =
gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
gst_pad_set_active (result, TRUE);
diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h
index 4dd755f1..874167cc 100644
--- a/gst/rtpmanager/gstrtpbin.h
+++ b/gst/rtpmanager/gstrtpbin.h
@@ -48,6 +48,9 @@ struct _GstRtpBin {
/* clock we provide */
GstClock *provided_clock;
+ /* a list of clients, these are streams with the same CNAME */
+ GSList *clients;
+
/*< private >*/
GstRtpBinPrivate *priv;
};
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index bd1c07ea..a23fbb87 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -99,12 +99,14 @@ enum
#define DEFAULT_LATENCY_MS 200
#define DEFAULT_DROP_ON_LATENCY FALSE
+#define DEFAULT_TS_OFFSET 0
enum
{
PROP_0,
PROP_LATENCY,
- PROP_DROP_ON_LATENCY
+ PROP_DROP_ON_LATENCY,
+ PROP_TS_OFFSET
};
#define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock))
@@ -137,6 +139,7 @@ struct _GstRtpJitterBufferPrivate
/* properties */
guint latency_ms;
gboolean drop_on_latency;
+ gint64 ts_offset;
/* the last seqnum we pushed out */
guint32 last_popped_seqnum;
@@ -150,6 +153,7 @@ struct _GstRtpJitterBufferPrivate
gint32 clock_rate;
gint64 clock_base;
guint64 exttimestamp;
+ gint64 prev_ts_offset;
/* when we are shutting down */
GstFlowReturn srcresult;
@@ -278,6 +282,16 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
"Tells the jitterbuffer to never exceed the given latency in size",
DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
/**
+ * GstRtpJitterBuffer::ts-offset:
+ *
+ * Adjust RTP timestamps in the jitterbuffer with offset.
+ */
+ g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
+ g_param_spec_int64 ("ts-offset",
+ "Timestamp Offset",
+ "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64,
+ G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE));
+ /**
* GstRtpJitterBuffer::request-pt-map:
* @buffer: the object which received the signal
* @pt: the pt
@@ -421,7 +435,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
{
GstRtpJitterBufferPrivate *priv;
GstStructure *caps_struct;
- const GValue *value;
+ guint val;
priv = jitterbuffer->priv;
@@ -443,22 +457,22 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
/* gah, clock-base is uint. If we don't have a base, we will use the first
* buffer timestamp as the base time. This will screw up sync but it's better
* than nothing. */
- value = gst_structure_get_value (caps_struct, "clock-base");
- if (value && G_VALUE_HOLDS_UINT (value)) {
- priv->clock_base = g_value_get_uint (value);
- GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
- priv->clock_base);
- } else
+ if (gst_structure_get_uint (caps_struct, "clock-base", &val))
+ priv->clock_base = val;
+ else
priv->clock_base = -1;
+ GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
+ priv->clock_base);
+
/* first expected seqnum */
- value = gst_structure_get_value (caps_struct, "seqnum-base");
- if (value && G_VALUE_HOLDS_UINT (value)) {
- priv->next_seqnum = g_value_get_uint (value);
- GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
- } else
+ if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
+ priv->next_seqnum = val;
+ else
priv->next_seqnum = -1;
+ GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
+
return TRUE;
/* ERRORS */
@@ -929,6 +943,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
GstClockTime timestamp;
gint64 running_time;
guint64 exttimestamp;
+ gint ts_offset_rtp;
priv = jitterbuffer->priv;
@@ -996,8 +1011,11 @@ again:
exttimestamp, priv->clock_base);
/* if no clock_base was given, take first ts as base */
- if (priv->clock_base == -1)
+ if (priv->clock_base == -1) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp);
priv->clock_base = exttimestamp;
+ }
/* take rtp timestamp offset into account, this can wrap around */
exttimestamp -= priv->clock_base;
@@ -1089,6 +1107,34 @@ push_buffer:
outbuf = gst_buffer_make_metadata_writable (outbuf);
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
}
+
+ /* apply the timestamp offset */
+ if (priv->ts_offset > 0)
+ ts_offset_rtp =
+ gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate,
+ GST_SECOND);
+ else if (priv->ts_offset < 0)
+ ts_offset_rtp =
+ -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate,
+ GST_SECOND);
+ else
+ ts_offset_rtp = 0;
+
+ if (ts_offset_rtp != 0) {
+ guint32 timestamp;
+
+ /* if the offset changed, mark with discont */
+ if (priv->ts_offset != priv->prev_ts_offset) {
+ GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp);
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ priv->prev_ts_offset = priv->ts_offset;
+ }
+
+ timestamp = gst_rtp_buffer_get_timestamp (outbuf);
+ timestamp += ts_offset_rtp;
+ gst_rtp_buffer_set_timestamp (outbuf, timestamp);
+ }
+
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
priv->last_popped_seqnum = seqnum;
@@ -1158,6 +1204,7 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
GstClockTime min_latency, max_latency;
gboolean us_live;
GstPad *peer;
+ GstClockTime our_latency;
if ((peer = gst_pad_get_peer (priv->sinkpad))) {
if ((res = gst_pad_query (peer, query))) {
@@ -1172,11 +1219,16 @@ gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
priv->peer_latency = min_latency;
JBUF_UNLOCK (priv);
- min_latency += priv->latency_ms * GST_MSECOND;
+ our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (our_latency));
+
+ min_latency += our_latency;
/* max_latency can be -1, meaning there is no upper limit for the
* latency. */
if (max_latency != -1)
- max_latency += priv->latency_ms * GST_MSECOND;
+ max_latency += our_latency * GST_MSECOND;
GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
@@ -1199,7 +1251,11 @@ static void
gst_rtp_jitter_buffer_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
- GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ GstRtpJitterBuffer *jitterbuffer;
+ GstRtpJitterBufferPrivate *priv;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ priv = jitterbuffer->priv;
switch (prop_id) {
case PROP_LATENCY:
@@ -1208,23 +1264,29 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
/* FIXME, not threadsafe */
new_latency = g_value_get_uint (value);
- old_latency = jitterbuffer->priv->latency_ms;
+ old_latency = priv->latency_ms;
- jitterbuffer->priv->latency_ms = new_latency;
+ priv->latency_ms = new_latency;
/* post message if latency changed, this will inform the parent pipeline
* that a latency reconfiguration is possible/needed. */
if (new_latency != old_latency) {
+ GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (new_latency * GST_MSECOND));
+
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
}
break;
}
case PROP_DROP_ON_LATENCY:
- {
- jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value);
+ priv->drop_on_latency = g_value_get_boolean (value);
+ break;
+ case PROP_TS_OFFSET:
+ JBUF_LOCK (priv);
+ priv->ts_offset = g_value_get_int64 (value);
+ JBUF_UNLOCK (priv);
break;
- }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -1235,14 +1297,23 @@ static void
gst_rtp_jitter_buffer_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
- GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ GstRtpJitterBuffer *jitterbuffer;
+ GstRtpJitterBufferPrivate *priv;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ priv = jitterbuffer->priv;
switch (prop_id) {
case PROP_LATENCY:
- g_value_set_uint (value, jitterbuffer->priv->latency_ms);
+ g_value_set_uint (value, priv->latency_ms);
break;
case PROP_DROP_ON_LATENCY:
- g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency);
+ g_value_set_boolean (value, priv->drop_on_latency);
+ break;
+ case PROP_TS_OFFSET:
+ JBUF_LOCK (priv);
+ g_value_set_int64 (value, priv->ts_offset);
+ JBUF_UNLOCK (priv);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
index 985a3713..e716682c 100644
--- a/gst/rtpmanager/gstrtpsession.c
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -132,6 +132,8 @@
#include "config.h"
#endif
+#include <gst/rtp/gstrtpbuffer.h>
+
#include "gstrtpbin-marshal.h"
#include "gstrtpsession.h"
#include "rtpsession.h"
@@ -214,7 +216,8 @@ enum
enum
{
- PROP_0
+ PROP_0,
+ PROP_NTP_NS_BASE
};
#define GST_RTP_SESSION_GET_PRIVATE(obj) \
@@ -234,8 +237,10 @@ struct _GstRtpSessionPrivate
GThread *thread;
/* caps mapping */
- guint8 pt;
- gint clock_rate;
+ GHashTable *ptmap;
+
+ /* NTP base time */
+ guint64 ntpnsbase;
};
/* callbacks to handle actions from the session manager */
@@ -245,18 +250,18 @@ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
gpointer user_data);
-static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
- gpointer user_data);
static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
static RTPSessionCallbacks callbacks = {
gst_rtp_session_process_rtp,
gst_rtp_session_send_rtp,
gst_rtp_session_send_rtcp,
+ gst_rtp_session_sync_rtcp,
gst_rtp_session_clock_rate,
- gst_rtp_session_get_time,
gst_rtp_session_reconsider
};
@@ -363,6 +368,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
+
/**
* GstRtpSession::request-pt-map:
* @sess: the object which received the signal
@@ -490,6 +496,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
(GCallback) on_bye_timeout, rtpsession);
g_signal_connect (rtpsession->priv->session, "on-timeout",
(GCallback) on_timeout, rtpsession);
+ rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL);
}
static void
@@ -513,6 +520,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
rtpsession = GST_RTP_SESSION (object);
switch (prop_id) {
+ case PROP_NTP_NS_BASE:
+ rtpsession->priv->ntpnsbase = g_value_get_uint64 (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -528,26 +538,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
rtpsession = GST_RTP_SESSION (object);
switch (prop_id) {
+ case PROP_NTP_NS_BASE:
+ g_value_set_uint64 (value, rtpsession->priv->ntpnsbase);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
+static guint64
+get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
+{
+ guint64 ntpnstime;
+
+ if (clock) {
+ /* get current NTP time */
+ ntpnstime = gst_clock_get_time (clock);
+ /* convert to running time */
+ ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession));
+ /* add NTP base offset */
+ ntpnstime += rtpsession->priv->ntpnsbase;
+ } else
+ ntpnstime = -1;
+
+ return ntpnstime;
+}
+
static void
rtcp_thread (GstRtpSession * rtpsession)
{
- GstClock *clock;
+ GstClock *sysclock, *clock;
GstClockID id;
GstClockTime current_time;
GstClockTime next_timeout;
+ guint64 ntpnstime;
- /* RTCP timeouts we use the system clock */
- clock = gst_system_clock_obtain ();
- if (clock == NULL)
- goto no_clock;
+ /* for RTCP timeouts we use the system clock */
+ sysclock = gst_system_clock_obtain ();
+ if (sysclock == NULL)
+ goto no_sysclock;
+
+ /* to get the current NTP time, we use the pipeline clock */
+ clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
- current_time = gst_clock_get_time (clock);
+ current_time = gst_clock_get_time (sysclock);
GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
@@ -568,7 +603,7 @@ rtcp_thread (GstRtpSession * rtpsession)
break;
id = rtpsession->priv->id =
- gst_clock_new_single_shot_id (clock, next_timeout);
+ gst_clock_new_single_shot_id (sysclock, next_timeout);
GST_RTP_SESSION_UNLOCK (rtpsession);
res = gst_clock_id_wait (id, NULL);
@@ -581,7 +616,10 @@ rtcp_thread (GstRtpSession * rtpsession)
break;
/* update current time */
- current_time = gst_clock_get_time (clock);
+ current_time = gst_clock_get_time (sysclock);
+
+ /* get current NTP time */
+ ntpnstime = get_current_ntp_ns_time (rtpsession, clock);
/* we get unlocked because we need to perform reconsideration, don't perform
* the timeout but get a new reporting estimate. */
@@ -590,18 +628,18 @@ rtcp_thread (GstRtpSession * rtpsession)
/* perform actions, we ignore result. Release lock because it might push. */
GST_RTP_SESSION_UNLOCK (rtpsession);
- rtp_session_on_timeout (rtpsession->priv->session, current_time);
+ rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime);
GST_RTP_SESSION_LOCK (rtpsession);
}
GST_RTP_SESSION_UNLOCK (rtpsession);
- gst_object_unref (clock);
+ gst_object_unref (sysclock);
GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
return;
/* ERRORS */
-no_clock:
+no_sysclock:
{
GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL),
("Could not get system clock"));
@@ -662,7 +700,6 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
- priv->clock_rate = -1;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
@@ -677,17 +714,9 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- {
- GstClockTime base_time;
-
- base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
-
- rtp_session_set_base_time (priv->session, base_time);
-
if (!start_rtcp_thread (rtpsession))
goto failed_thread;
break;
- }
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -774,6 +803,15 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
priv = rtpsession->priv;
if (rtpsession->send_rtcp_src) {
+ GstCaps *caps;
+
+ /* set rtcp caps on output pad */
+ if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) {
+ caps = gst_caps_new_simple ("application/x-rtcp", NULL);
+ gst_pad_set_caps (rtpsession->send_rtcp_src, caps);
+ gst_caps_unref (caps);
+ }
+ gst_buffer_set_caps (buffer, caps);
GST_DEBUG_OBJECT (rtpsession, "sending RTCP");
result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
} else {
@@ -784,30 +822,59 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
return result;
}
-static gboolean
-gst_rtp_session_parse_caps (GstRtpSession * rtpsession, GstCaps * caps)
+/* called when the session manager has an SR RTCP packet ready for handling
+ * inter stream synchronisation */
+static GstFlowReturn
+gst_rtp_session_sync_rtcp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data)
{
+ GstFlowReturn result;
+ GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
- const GstStructure *caps_struct;
+ rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
- GST_DEBUG_OBJECT (rtpsession, "parsing caps");
+ if (rtpsession->sync_src) {
+ GstCaps *caps;
- caps_struct = gst_caps_get_structure (caps, 0);
- if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
- goto no_clock_rate;
+ /* set rtcp caps on output pad */
+ if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) {
+ caps = gst_caps_new_simple ("application/x-rtcp", NULL);
+ gst_pad_set_caps (rtpsession->sync_src, caps);
+ gst_caps_unref (caps);
+ }
+ gst_buffer_set_caps (buffer, caps);
+ GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP");
+ result = gst_pad_push (rtpsession->sync_src, buffer);
+ } else {
+ GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+static void
+gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
+{
+ GstRtpSessionPrivate *priv;
+ const GstStructure *s;
+ gint payload;
- GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", priv->clock_rate);
+ priv = rtpsession->priv;
- return TRUE;
+ GST_DEBUG_OBJECT (rtpsession, "parsing caps");
- /* ERRORS */
-no_clock_rate:
- {
- GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
- return FALSE;
- }
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_int (s, "payload", &payload))
+ return;
+
+ caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload));
+ if (caps)
+ return;
+
+ g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps);
}
/* called when the session manager needs the clock rate */
@@ -821,13 +888,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
GValue ret = { 0 };
GValue args[2] = { {0}, {0} };
GstCaps *caps;
+ const GstStructure *s;
rtpsession = GST_RTP_SESSION_CAST (user_data);
priv = rtpsession->priv;
- /* if we have it, return it */
- if (priv->clock_rate != -1)
- return priv->clock_rate;
+ GST_RTP_SESSION_LOCK (rtpsession);
+ caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload));
+ if (caps)
+ goto done;
g_value_init (&args[0], GST_TYPE_ELEMENT);
g_value_set_object (&args[0], rtpsession);
@@ -844,10 +913,16 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
if (!caps)
goto no_caps;
- if (!gst_rtp_session_parse_caps (rtpsession, caps))
- goto parse_failed;
+ gst_rtp_session_cache_caps (rtpsession, caps);
+
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_int (s, "clock-rate", &result))
+ goto no_clock_rate;
+
+ GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
- result = priv->clock_rate;
+done:
+ GST_RTP_SESSION_UNLOCK (rtpsession);
return result;
@@ -855,35 +930,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
no_caps:
{
GST_DEBUG_OBJECT (rtpsession, "could not get caps");
- return -1;
+ goto done;
}
-parse_failed:
+no_clock_rate:
{
- GST_DEBUG_OBJECT (rtpsession, "failed to parse caps");
- return -1;
+ GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
+ goto done;
}
}
-/* called when the session manager needs the time of clock */
-static GstClockTime
-gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
-{
- GstClockTime result;
- GstRtpSession *rtpsession;
- GstClock *clock;
-
- rtpsession = GST_RTP_SESSION_CAST (user_data);
-
- clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
- if (clock) {
- result = gst_clock_get_time (clock);
- gst_object_unref (clock);
- } else
- result = GST_CLOCK_TIME_NONE;
-
- return result;
-}
-
/* called when the session manager asks us to reconsider the timeout */
static void
gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
@@ -925,18 +980,19 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
static gboolean
gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps)
{
- gboolean res;
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
- res = gst_rtp_session_parse_caps (rtpsession, caps);
+ GST_RTP_SESSION_LOCK (rtpsession);
+ gst_rtp_session_cache_caps (rtpsession, caps);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
gst_object_unref (rtpsession);
- return res;
+ return TRUE;
}
/* receive a packet from a sender, send it to the RTP session manager and
@@ -948,13 +1004,17 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
+ guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- ret = rtp_session_process_rtp (priv->session, buffer);
+ ntpnstime =
+ get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession));
+
+ ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
gst_object_unref (rtpsession);
@@ -1051,8 +1111,6 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
gst_segment_set_newsegment_full (segment, update, rate,
arate, format, start, stop, time);
- rtp_session_set_timestamp_sync (priv->session, start);
-
/* push event forward */
ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
break;
@@ -1075,13 +1133,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
+ GstClockTime timestamp;
+ guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- ret = rtp_session_send_rtp (priv->session, buffer);
+ /* get NTP time when this packet was captured, this depends on the timestamp. */
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+ /* convert to running time using the segment start value. */
+ ntpnstime = timestamp - rtpsession->send_rtp_seg.start;
+ ntpnstime += priv->ntpnsbase;
+ } else
+ ntpnstime = -1;
+
+ ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
gst_object_unref (rtpsession);
@@ -1113,6 +1182,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
rtpsession->recv_rtp_src =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
"recv_rtp_src");
+ gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
@@ -1142,6 +1212,7 @@ create_recv_rtcp_sink (GstRtpSession * rtpsession)
rtpsession->sync_src =
gst_pad_new_from_static_template (&rtpsession_sync_src_template,
"sync_src");
+ gst_pad_use_fixed_caps (rtpsession->sync_src);
gst_pad_set_active (rtpsession->sync_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
@@ -1172,6 +1243,7 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
rtpsession->send_rtp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
"send_rtp_src");
+ gst_pad_use_fixed_caps (rtpsession->send_rtp_src);
gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
@@ -1190,6 +1262,7 @@ create_send_rtcp_src (GstRtpSession * rtpsession)
rtpsession->send_rtcp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
"send_rtcp_src");
+ gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->send_rtcp_src);
diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c
index 539b03c7..5457bc35 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.c
+++ b/gst/rtpmanager/gstrtpssrcdemux.c
@@ -52,6 +52,7 @@
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/rtp/gstrtcpbuffer.h>
#include "gstrtpbin-marshal.h"
#include "gstrtpssrcdemux.h"
@@ -67,6 +68,13 @@ GST_STATIC_PAD_TEMPLATE ("sink",
GST_STATIC_CAPS ("application/x-rtp")
);
+static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template =
+GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
static GstStaticPadTemplate rtp_ssrc_demux_src_template =
GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_PAD_SRC,
@@ -74,6 +82,13 @@ GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_STATIC_CAPS ("application/x-rtp")
);
+static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
+ GST_PAD_SRC,
+ GST_PAD_SOMETIMES,
+ GST_STATIC_CAPS ("application/x-rtcp")
+ );
+
static GstElementDetails gst_rtp_ssrc_demux_details = {
"RTP SSRC Demux",
"Demux/Network/RTP",
@@ -103,6 +118,11 @@ static GstStateChangeReturn gst_rtp_ssrc_demux_change_state (GstElement *
static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf);
static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event);
+static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
+ GstBuffer * buf);
+static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
+ GstEvent * event);
+
/* srcpad stuff */
static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
@@ -113,59 +133,78 @@ static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
*/
struct _GstRtpSsrcDemuxPad
{
- GstPad *pad;
guint32 ssrc;
+ GstPad *rtp_pad;
GstCaps *caps;
+ GstPad *rtcp_pad;
};
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found
*/
-static GstPad *
-find_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+static GstRtpSsrcDemuxPad *
+find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
{
GSList *walk;
- for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) {
+ for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
if (pad->ssrc == ssrc)
- return pad->pad;
+ return pad;
}
return NULL;
}
-static GstPad *
-create_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+static GstRtpSsrcDemuxPad *
+create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
{
- GstPad *result;
+ GstPad *rtp_pad, *rtcp_pad;
GstElementClass *klass;
GstPadTemplate *templ;
gchar *padname;
GstRtpSsrcDemuxPad *demuxpad;
+ GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
+
klass = GST_ELEMENT_GET_CLASS (demux);
templ = gst_element_class_get_pad_template (klass, "src_%d");
padname = g_strdup_printf ("src_%d", ssrc);
- result = gst_pad_new_from_template (templ, padname);
+ rtp_pad = gst_pad_new_from_template (templ, padname);
+ g_free (padname);
+
+ templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d");
+ padname = g_strdup_printf ("rtcp_src_%d", ssrc);
+ rtcp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname);
/* wrap in structure and add to list */
demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
demuxpad->ssrc = ssrc;
- demuxpad->pad = result;
- demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad);
+ demuxpad->rtp_pad = rtp_pad;
+ demuxpad->rtcp_pad = rtcp_pad;
+
+ demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+ GST_OBJECT_UNLOCK (demux);
/* copy caps from input */
- gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink));
+ gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink));
+ gst_pad_use_fixed_caps (rtp_pad);
+ gst_pad_set_caps (rtcp_pad, GST_PAD_CAPS (demux->rtcp_sink));
+ gst_pad_use_fixed_caps (rtcp_pad);
- gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event);
- gst_pad_set_active (result, TRUE);
- gst_element_add_pad (GST_ELEMENT_CAST (demux), result);
+ gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+ gst_pad_set_active (rtp_pad, TRUE);
+ gst_pad_set_active (rtcp_pad, TRUE);
+
+ gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
+ gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
g_signal_emit (G_OBJECT (demux),
- gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result);
+ gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
+
+ GST_OBJECT_LOCK (demux);
- return result;
+ return demuxpad;
}
static void
@@ -176,7 +215,11 @@ gst_rtp_ssrc_demux_base_init (gpointer g_class)
gst_element_class_add_pad_template (gstelement_klass,
gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
gst_element_class_add_pad_template (gstelement_klass,
+ gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
+ gst_element_class_add_pad_template (gstelement_klass,
gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
+ gst_element_class_add_pad_template (gstelement_klass,
+ gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
gst_element_class_set_details (gstelement_klass, &gst_rtp_ssrc_demux_details);
}
@@ -226,6 +269,14 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
+
+ demux->rtcp_sink =
+ gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
+ "rtcp_sink"), "rtcp_sink");
+ gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
+ gst_pad_set_event_function (demux->rtcp_sink,
+ gst_rtp_ssrc_demux_rtcp_sink_event);
+ gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
}
static void
@@ -249,21 +300,63 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT:
default:
- res = gst_pad_event_default (pad, event);
+ {
+ GSList *walk;
+
+ res = TRUE;
+ GST_OBJECT_LOCK (demux);
+ for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+ GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
+
+ gst_event_ref (event);
+ res &= gst_pad_push_event (pad->rtp_pad, event);
+ }
+ GST_OBJECT_UNLOCK (demux);
+ gst_event_unref (event);
break;
+ }
}
gst_object_unref (demux);
return res;
}
+static gboolean
+gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
+{
+ GstRtpSsrcDemux *demux;
+ gboolean res = FALSE;
+
+ demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_NEWSEGMENT:
+ default:
+ {
+ GSList *walk;
+
+ res = TRUE;
+ GST_OBJECT_LOCK (demux);
+ for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+ GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
+
+ res &= gst_pad_push_event (pad->rtcp_pad, event);
+ }
+ GST_OBJECT_UNLOCK (demux);
+ break;
+ }
+ }
+ gst_object_unref (demux);
+ return res;
+}
+
static GstFlowReturn
gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
{
GstFlowReturn ret;
GstRtpSsrcDemux *demux;
guint32 ssrc;
- GstPad *srcpad;
+ GstRtpSsrcDemuxPad *dpad;
demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
@@ -274,16 +367,16 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
- srcpad = find_rtp_pad_for_ssrc (demux, ssrc);
- if (srcpad == NULL) {
- GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
- srcpad = create_rtp_pad_for_ssrc (demux, ssrc);
- if (!srcpad)
+ GST_OBJECT_LOCK (demux);
+ dpad = find_demux_pad_for_ssrc (demux, ssrc);
+ if (dpad == NULL) {
+ if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc)))
goto create_failed;
}
+ GST_OBJECT_UNLOCK (demux);
/* push to srcpad */
- ret = gst_pad_push (srcpad, buf);
+ ret = gst_pad_push (dpad->rtp_pad, buf);
return ret;
@@ -298,9 +391,74 @@ invalid_payload:
}
create_failed:
{
- /* this is not fatal yet */
GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
("Could not create new pad"));
+ GST_OBJECT_UNLOCK (demux);
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+}
+
+static GstFlowReturn
+gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
+{
+ GstFlowReturn ret;
+ GstRtpSsrcDemux *demux;
+ guint32 ssrc;
+ GstRtpSsrcDemuxPad *dpad;
+ GstRTCPPacket packet;
+
+ demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+
+ if (!gst_rtcp_buffer_validate (buf))
+ goto invalid_rtcp;
+
+ if (!gst_rtcp_buffer_get_first_packet (buf, &packet))
+ goto invalid_rtcp;
+
+ /* first packet must be SR or RR or else the validate would have failed */
+ switch (gst_rtcp_packet_get_type (&packet)) {
+ case GST_RTCP_TYPE_SR:
+ gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
+ NULL);
+ break;
+ case GST_RTCP_TYPE_RR:
+ ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
+ break;
+ default:
+ goto invalid_rtcp;
+ }
+
+ GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
+
+ GST_OBJECT_LOCK (demux);
+ dpad = find_demux_pad_for_ssrc (demux, ssrc);
+ if (dpad == NULL) {
+ GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
+ if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc)))
+ goto create_failed;
+ }
+ GST_OBJECT_UNLOCK (demux);
+
+ /* push to srcpad */
+ ret = gst_pad_push (dpad->rtcp_pad, buf);
+
+ return ret;
+
+ /* ERRORS */
+invalid_rtcp:
+ {
+ /* this is fatal and should be filtered earlier */
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
+ ("Dropping invalid RTCP packet"));
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+create_failed:
+ {
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
+ ("Could not create new pad"));
+ GST_OBJECT_UNLOCK (demux);
gst_buffer_unref (buf);
return GST_FLOW_ERROR;
}
diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h
index 5d93330d..bea2769d 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.h
+++ b/gst/rtpmanager/gstrtpssrcdemux.h
@@ -37,7 +37,8 @@ struct _GstRtpSsrcDemux
GstElement parent;
GstPad *rtp_sink;
- GSList *rtp_srcpads;
+ GstPad *rtcp_sink;
+ GSList *srcpads;
};
struct _GstRtpSsrcDemuxClass
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c
index 275e7c74..e7f72b40 100644
--- a/gst/rtpmanager/rtpsession.c
+++ b/gst/rtpmanager/rtpsession.c
@@ -23,6 +23,8 @@
#include <gst/rtp/gstrtcpbuffer.h>
#include <gst/netbuffer/gstnetbuffer.h>
+#include "gstrtpbin-marshal.h"
+
#include "rtpsession.h"
GST_DEBUG_CATEGORY_STATIC (rtp_session_debug);
@@ -332,8 +334,8 @@ rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks,
sess->callbacks.process_rtp = callbacks->process_rtp;
sess->callbacks.send_rtp = callbacks->send_rtp;
sess->callbacks.send_rtcp = callbacks->send_rtcp;
+ sess->callbacks.sync_rtcp = callbacks->sync_rtcp;
sess->callbacks.clock_rate = callbacks->clock_rate;
- sess->callbacks.get_time = callbacks->get_time;
sess->callbacks.reconsider = callbacks->reconsider;
sess->user_data = user_data;
}
@@ -911,13 +913,14 @@ rtp_session_create_source (RTPSession * sess)
*/
static void
update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
- gboolean rtp, GstBuffer * buffer)
+ gboolean rtp, GstBuffer * buffer, guint64 ntpnstime)
{
- /* get time or arrival */
- if (sess->callbacks.get_time)
- arrival->time = sess->callbacks.get_time (sess, sess->user_data);
- else
- arrival->time = GST_CLOCK_TIME_NONE;
+ GTimeVal current;
+
+ /* get time of arrival */
+ g_get_current_time (&current);
+ arrival->time = GST_TIMEVAL_TO_TIME (current);
+ arrival->ntpnstime = ntpnstime;
/* get packet size including header overhead */
arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len;
@@ -941,6 +944,7 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
* rtp_session_process_rtp:
* @sess: and #RTPSession
* @buffer: an RTP buffer
+ * @ntpnstime: the NTP arrival time in nanoseconds
*
* Process an RTP buffer in the session manager. This function takes ownership
* of @buffer.
@@ -948,7 +952,8 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival,
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
+rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer,
+ guint64 ntpnstime)
{
GstFlowReturn result;
guint32 ssrc;
@@ -965,7 +970,7 @@ rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer)
RTP_SESSION_LOCK (sess);
/* update arrival stats */
- update_arrival_stats (sess, &arrival, TRUE, buffer);
+ update_arrival_stats (sess, &arrival, TRUE, buffer, ntpnstime);
/* ignore more RTP packets when we left the session */
if (sess->source->received_bye)
@@ -1047,6 +1052,33 @@ ignore:
}
}
+static void
+rtp_session_process_rb (RTPSession * sess, RTPSource * source,
+ GstRTCPPacket * packet, RTPArrivalStats * arrival)
+{
+ guint count, i;
+
+ count = gst_rtcp_packet_get_rb_count (packet);
+ for (i = 0; i < count; i++) {
+ guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
+ guint8 fractionlost;
+ gint32 packetslost;
+
+ gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
+ &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
+
+ GST_DEBUG ("RB %d: SSRC %08x, jitter %" G_GUINT32_FORMAT, i, ssrc, jitter);
+
+ if (ssrc == sess->source->ssrc) {
+ /* only deal with report blocks for our session, we update the stats of
+ * the sender of the RTCP message. We could also compare our stats against
+ * the other sender to see if we are better or worse. */
+ rtp_source_process_rb (source, arrival->time, fractionlost, packetslost,
+ exthighestseq, jitter, lsr, dlsr);
+ }
+ }
+}
+
/* A Sender report contains statistics about how the sender is doing. This
* includes timing informataion such as the relation between RTP and NTP
* timestamps and the number of packets/bytes it sent to us.
@@ -1062,7 +1094,6 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
{
guint32 senderssrc, rtptime, packet_count, octet_count;
guint64 ntptime;
- guint count, i;
RTPSource *source;
gboolean created, prevsender;
@@ -1074,11 +1105,13 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
source = obtain_source (sess, senderssrc, &created, arrival, FALSE);
+ GST_BUFFER_OFFSET (packet->buffer) = source->clock_base;
+
prevsender = RTP_SOURCE_IS_SENDER (source);
/* first update the source */
- rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count,
- arrival->time);
+ rtp_source_process_sr (source, arrival->time, ntptime, rtptime, packet_count,
+ octet_count);
if (prevsender != RTP_SOURCE_IS_SENDER (source)) {
sess->stats.sender_sources++;
@@ -1089,25 +1122,7 @@ rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet,
if (created)
on_new_ssrc (sess, source);
- count = gst_rtcp_packet_get_rb_count (packet);
- for (i = 0; i < count; i++) {
- guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
- guint8 fractionlost;
- gint32 packetslost;
-
- gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
- &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
-
- GST_DEBUG ("RB %d: %08x, %u", i, ssrc, jitter);
-
- if (ssrc == sess->source->ssrc) {
- /* only deal with report blocks for our session, we update the stats of
- * the sender of the RTCP message. We could also compare our stats against
- * the other sender to see if we are better or worse. */
- rtp_source_process_rb (source, fractionlost, packetslost,
- exthighestseq, jitter, lsr, dlsr);
- }
- }
+ rtp_session_process_rb (sess, source, packet, arrival);
}
/* A receiver report contains statistics about how a receiver is doing. It
@@ -1121,7 +1136,6 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
RTPArrivalStats * arrival)
{
guint32 senderssrc;
- guint count, i;
RTPSource *source;
gboolean created;
@@ -1134,20 +1148,7 @@ rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet,
if (created)
on_new_ssrc (sess, source);
- count = gst_rtcp_packet_get_rb_count (packet);
- for (i = 0; i < count; i++) {
- guint32 ssrc, exthighestseq, jitter, lsr, dlsr;
- guint8 fractionlost;
- gint32 packetslost;
-
- gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost,
- &packetslost, &exthighestseq, &jitter, &lsr, &dlsr);
-
- if (ssrc == sess->source->ssrc) {
- rtp_source_process_rb (source, fractionlost, packetslost,
- exthighestseq, jitter, lsr, dlsr);
- }
- }
+ rtp_session_process_rb (sess, source, packet, arrival);
}
/* FIXME, we're just printing this for now... */
@@ -1280,7 +1281,8 @@ rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet,
* @sess: and #RTPSession
* @buffer: an RTCP buffer
*
- * Process an RTCP buffer in the session manager.
+ * Process an RTCP buffer in the session manager. This function takes ownership
+ * of @buffer.
*
* Returns: a #GstFlowReturn.
*/
@@ -1288,8 +1290,9 @@ GstFlowReturn
rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
{
GstRTCPPacket packet;
- gboolean more, is_bye = FALSE;
+ gboolean more, is_bye = FALSE, is_sr = FALSE;
RTPArrivalStats arrival;
+ GstFlowReturn result = GST_FLOW_OK;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -1301,7 +1304,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
RTP_SESSION_LOCK (sess);
/* update arrival stats */
- update_arrival_stats (sess, &arrival, FALSE, buffer);
+ update_arrival_stats (sess, &arrival, FALSE, buffer, -1);
if (sess->sent_bye)
goto ignore;
@@ -1322,6 +1325,7 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
switch (type) {
case GST_RTCP_TYPE_SR:
rtp_session_process_sr (sess, &packet, &arrival);
+ is_sr = TRUE;
break;
case GST_RTCP_TYPE_RR:
rtp_session_process_rr (sess, &packet, &arrival);
@@ -1357,14 +1361,20 @@ rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer)
}
RTP_SESSION_UNLOCK (sess);
- gst_buffer_unref (buffer);
+ /* notify caller of sr packets in the callback */
+ if (is_sr && sess->callbacks.sync_rtcp)
+ result = sess->callbacks.sync_rtcp (sess, sess->source, buffer,
+ sess->user_data);
+ else
+ gst_buffer_unref (buffer);
- return GST_FLOW_OK;
+ return result;
/* ERRORS */
invalid_packet:
{
GST_DEBUG ("invalid RTCP packet received");
+ gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
ignore:
@@ -1380,6 +1390,7 @@ ignore:
* rtp_session_send_rtp:
* @sess: an #RTPSession
* @buffer: an RTP buffer
+ * @ntptime: the NTP time of when this buffer was captured.
*
* Send the RTP buffer in the session manager. This function takes ownership of
* @buffer.
@@ -1387,11 +1398,12 @@ ignore:
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
+rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime)
{
GstFlowReturn result;
RTPSource *source;
gboolean prevsender;
+ GTimeVal current;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -1405,14 +1417,13 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer)
source = sess->source;
/* update last activity */
- if (sess->callbacks.get_time)
- source->last_rtp_activity =
- sess->callbacks.get_time (sess, sess->user_data);
+ g_get_current_time (&current);
+ source->last_rtp_activity = GST_TIMEVAL_TO_TIME (current);
prevsender = RTP_SOURCE_IS_SENDER (source);
/* we use our own source to send */
- result = rtp_source_send_rtp (sess->source, buffer);
+ result = rtp_source_send_rtp (sess->source, buffer, ntptime);
if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
sess->stats.sender_sources++;
@@ -1429,36 +1440,6 @@ invalid_packet:
}
}
-/**
- * rtp_session_set_send_sync
- * @sess: an #RTPSession
- * @base_time: the clock base time
- * @start_time: the timestamp start time
- *
- * Establish a relation between the times returned by the get_time callback and
- * the buffer timestamps. This information is used to convert the NTP times to
- * RTP timestamps.
- */
-void
-rtp_session_set_base_time (RTPSession * sess, GstClockTime base_time)
-{
- g_return_if_fail (RTP_IS_SESSION (sess));
-
- RTP_SESSION_LOCK (sess);
- sess->base_time = base_time;
- RTP_SESSION_UNLOCK (sess);
-}
-
-void
-rtp_session_set_timestamp_sync (RTPSession * sess, GstClockTime start_timestamp)
-{
- g_return_if_fail (RTP_IS_SESSION (sess));
-
- RTP_SESSION_LOCK (sess);
- sess->start_timestamp = start_timestamp;
- RTP_SESSION_UNLOCK (sess);
-}
-
static GstClockTime
calculate_rtcp_interval (RTPSession * sess, gboolean deterministic,
gboolean first)
@@ -1498,6 +1479,7 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason)
GstFlowReturn result = GST_FLOW_OK;
RTPSource *source;
GstClockTime current, interval;
+ GTimeVal curtv;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
@@ -1518,10 +1500,8 @@ rtp_session_send_bye (RTPSession * sess, const gchar * reason)
sess->sent_bye = FALSE;
/* get current time */
- if (sess->callbacks.get_time)
- current = sess->callbacks.get_time (sess, sess->user_data);
- else
- current = 0;
+ g_get_current_time (&curtv);
+ current = GST_TIMEVAL_TO_TIME (curtv);
/* reschedule transmission */
sess->last_rtcp_send_time = current;
@@ -1543,12 +1523,12 @@ done:
/**
* rtp_session_next_timeout:
* @sess: an #RTPSession
- * @time: the current time
+ * @time: the current system time
*
* Get the next time we should perform session maintenance tasks.
*
* Returns: a time when rtp_session_on_timeout() should be called with the
- * current time.
+ * current system time.
*/
GstClockTime
rtp_session_next_timeout (RTPSession * sess, GstClockTime time)
@@ -1588,6 +1568,7 @@ typedef struct
RTPSession *sess;
GstBuffer *rtcp;
GstClockTime time;
+ guint64 ntpnstime;
GstClockTime interval;
GstRTCPPacket packet;
gboolean is_bye;
@@ -1605,60 +1586,22 @@ session_start_rtcp (RTPSession * sess, ReportData * data)
if (RTP_SOURCE_IS_SENDER (own)) {
guint64 ntptime;
guint32 rtptime;
- GstClockTime running_time;
- GstClockTimeDiff diff;
+ guint32 packet_count, octet_count;
/* we are a sender, create SR */
GST_DEBUG ("create SR for SSRC %08x", own->ssrc);
gst_rtcp_buffer_add_packet (data->rtcp, GST_RTCP_TYPE_SR, packet);
- /* use the sync params to interpollate the date->time member to rtptime. We
- * use the last sent timestamp and rtptime as reference points. We assume
- * that the slope of the rtptime vs timestamp curve is 1, which is certainly
- * sufficient for the frequency at which we report SR and the rate we send
- * out RTP packets. */
- rtptime = own->last_rtptime;
- GST_DEBUG ("last_timestamp %" GST_TIME_FORMAT ", last_rtptime %"
- G_GUINT32_FORMAT, GST_TIME_ARGS (own->last_timestamp), rtptime);
-
- if (own->clock_rate != -1) {
- /* Start by calculating the running_time of the timestamp, this is a result
- * in nanoseconds. */
- running_time =
- (own->last_timestamp - sess->start_timestamp) + sess->base_time;
-
- /* get the diff with the SR time */
- diff = GST_CLOCK_DIFF (running_time, data->time);
-
- /* now translate the diff to RTP time, handle positive and negative cases.
- * If there is no diff, we already set rtptime correctly above. */
- if (diff > 0) {
- GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT,
- GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff));
- rtptime += gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND);
- } else {
- diff = -diff;
- GST_DEBUG ("running_time %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT,
- GST_TIME_ARGS (running_time), GST_TIME_ARGS (diff));
- rtptime -= gst_util_uint64_scale (diff, own->clock_rate, GST_SECOND);
- }
- } else {
- GST_WARNING ("no clock-rate, cannot interpollate rtp time");
- }
-
- /* convert clock time to NTP time. upper 32 bits should contain the seconds
- * and the lower 32 bits, the fractions of a second. */
- ntptime = gst_util_uint64_scale (data->time, (1LL << 32), GST_SECOND);
- /* conversion from unix timestamp (seconds since 1970) to NTP (seconds
- * since 1900). FIXME nothing says that the time is in unix timestamps. */
- ntptime += (2208988800LL << 32);
+ /* get latest stats */
+ rtp_source_get_new_sr (own, data->ntpnstime, &ntptime, &rtptime,
+ &packet_count, &octet_count);
+ /* store stats */
+ rtp_source_process_sr (own, data->ntpnstime, ntptime, rtptime, packet_count,
+ octet_count);
- GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT,
- (guint32) (ntptime >> 32), (guint32) (ntptime & 0xffffffff), rtptime);
-
- /* fill in sender report info, FIXME RTP timestamps missing */
+ /* fill in sender report info */
gst_rtcp_packet_sr_set_sender_info (packet, own->ssrc,
- ntptime, rtptime, own->stats.packets_sent, own->stats.octets_sent);
+ ntptime, rtptime, packet_count, octet_count);
} else {
/* we are only receiver, create RR */
GST_DEBUG ("create RR for SSRC %08x", own->ssrc);
@@ -1681,63 +1624,18 @@ session_report_blocks (const gchar * key, RTPSource * source, ReportData * data)
if (gst_rtcp_packet_get_rb_count (packet) < GST_RTCP_MAX_RB_COUNT) {
/* only report about other sender sources */
if (source != sess->source && RTP_SOURCE_IS_SENDER (source)) {
- RTPSourceStats *stats;
- guint64 extended_max, expected;
- guint64 expected_interval, received_interval, ntptime;
- gint64 lost, lost_interval;
- guint32 fraction, LSR, DLSR;
- GstClockTime time;
-
- stats = &source->stats;
-
- extended_max = stats->cycles + stats->max_seq;
- expected = extended_max - stats->base_seq + 1;
-
- GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT
- ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT,
- extended_max, expected, stats->packets_received, stats->base_seq);
+ guint8 fractionlost;
+ gint32 packetslost;
+ guint32 exthighestseq, jitter;
+ guint32 lsr, dlsr;
- lost = expected - stats->packets_received;
- lost = CLAMP (lost, -0x800000, 0x7fffff);
-
- expected_interval = expected - stats->prev_expected;
- stats->prev_expected = expected;
- received_interval = stats->packets_received - stats->prev_received;
- stats->prev_received = stats->packets_received;
-
- lost_interval = expected_interval - received_interval;
-
- if (expected_interval == 0 || lost_interval <= 0)
- fraction = 0;
- else
- fraction = (lost_interval << 8) / expected_interval;
-
- GST_DEBUG ("add RR for SSRC %08x", source->ssrc);
- /* we scaled the jitter up for additional precision */
- GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT
- ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost,
- extended_max, stats->jitter >> 4);
-
- if (rtp_source_get_last_sr (source, &ntptime, NULL, NULL, NULL, &time)) {
- GstClockTime diff;
-
- /* LSR is middle bits of the last ntptime */
- LSR = (ntptime >> 16) & 0xffffffff;
- diff = data->time - time;
- GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff));
- /* DLSR, delay since last SR is expressed in 1/65536 second units */
- DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND);
- } else {
- /* No valid SR received, LSR/DLSR are set to 0 then */
- GST_DEBUG ("no valid SR received");
- LSR = 0;
- DLSR = 0;
- }
- GST_DEBUG ("LSR %08x, DLSR %08x", LSR, DLSR);
+ /* get new stats */
+ rtp_source_get_new_rb (source, data->time, &fractionlost, &packetslost,
+ &exthighestseq, &jitter, &lsr, &dlsr);
/* packet is not yet filled, add report block for this source. */
- gst_rtcp_packet_add_rb (packet, source->ssrc, fraction, lost,
- extended_max, stats->jitter >> 4, LSR, DLSR);
+ gst_rtcp_packet_add_rb (packet, source->ssrc, fractionlost, packetslost,
+ exthighestseq, jitter, lsr, dlsr);
}
}
}
@@ -1784,7 +1682,6 @@ session_cleanup (const gchar * key, RTPSource * source, ReportData * data)
if (is_sender) {
if (data->time > source->last_rtp_activity) {
interval = MAX (data->interval * 2, 5 * GST_SECOND);
-
if (data->time - source->last_rtp_activity > interval) {
GST_DEBUG ("sender source %08x timed out and became receiver, last %"
GST_TIME_FORMAT, source->ssrc,
@@ -1897,6 +1794,8 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
/**
* rtp_session_on_timeout:
* @sess: an #RTPSession
+ * @time: the current system time
+ * @ntpnstime: the current NTP time in nanoseconds
*
* Perform maintenance actions after the timeout obtained with
* rtp_session_next_timeout() expired.
@@ -1910,21 +1809,23 @@ is_rtcp_time (RTPSession * sess, GstClockTime time, ReportData * data)
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_session_on_timeout (RTPSession * sess, GstClockTime time)
+rtp_session_on_timeout (RTPSession * sess, GstClockTime time, guint64 ntpnstime)
{
GstFlowReturn result = GST_FLOW_OK;
ReportData data;
g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR);
+ GST_DEBUG ("reporting at %" GST_TIME_FORMAT ", NTP time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (time), GST_TIME_ARGS (ntpnstime));
+
data.sess = sess;
data.rtcp = NULL;
data.time = time;
+ data.ntpnstime = ntpnstime;
data.is_bye = FALSE;
data.has_sdes = FALSE;
- GST_DEBUG ("reporting at %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
-
RTP_SESSION_LOCK (sess);
/* get a new interval, we need this for various cleanups etc */
data.interval = calculate_rtcp_interval (sess, TRUE, sess->first_rtcp);
diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h
index 9380b55e..d7dbb784 100644
--- a/gst/rtpmanager/rtpsession.h
+++ b/gst/rtpmanager/rtpsession.h
@@ -82,28 +82,30 @@ typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, Gs
typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
/**
- * RTPSessionClockRate:
+ * RTPSessionSyncRTCP:
* @sess: an #RTPSession
- * @payload: the payload
+ * @src: the #RTPSource
+ * @buffer: the RTCP buffer ready for sending
* @user_data: user data specified when registering
*
- * This callback will be called when @sess needs the clock-rate of @payload.
+ * This callback will be called when @sess has and SR @buffer ready for doing
+ * synchronisation between streams.
*
- * Returns: the clock-rate of @pt.
+ * Returns: a #GstFlowReturn.
*/
-typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data);
+typedef GstFlowReturn (*RTPSessionSyncRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data);
/**
- * RTPSessionGetTime:
+ * RTPSessionClockRate:
* @sess: an #RTPSession
+ * @payload: the payload
* @user_data: user data specified when registering
*
- * This callback will be called when @sess needs the current time in
- * nanoseconds.
+ * This callback will be called when @sess needs the clock-rate of @payload.
*
- * Returns: a #GstClockTime with the current time in nanoseconds.
+ * Returns: the clock-rate of @pt.
*/
-typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data);
+typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data);
/**
* RTPSessionReconsider:
@@ -121,7 +123,7 @@ typedef void (*RTPSessionReconsider) (RTPSession *sess, gpointer user_data);
* @RTPSessionProcessRTP: callback to process RTP packets
* @RTPSessionSendRTP: callback for sending RTP packets
* @RTPSessionSendRTCP: callback for sending RTCP packets
- * @RTPSessionGetTime: callback for returning the current time
+ * @RTPSessionSyncRTCP: callback for handling SR packets
* @RTPSessionReconsider: callback for reconsidering the timeout
*
* These callbacks can be installed on the session manager to get notification
@@ -132,8 +134,8 @@ typedef struct {
RTPSessionProcessRTP process_rtp;
RTPSessionSendRTP send_rtp;
RTPSessionSendRTCP send_rtcp;
+ RTPSessionSyncRTCP sync_rtcp;
RTPSessionClockRate clock_rate;
- RTPSessionGetTime get_time;
RTPSessionReconsider reconsider;
} RTPSessionCallbacks;
@@ -190,8 +192,7 @@ struct _RTPSession {
RTPSessionStats stats;
- /* for mapping RTP time to NTP time */
- GstClockTime start_timestamp;
+ /* for mapping clock time to NTP time */
GstClockTime base_time;
};
@@ -250,18 +251,17 @@ RTPSource* rtp_session_get_source_by_cname (RTPSession *sess, const gcha
RTPSource* rtp_session_create_source (RTPSession *sess);
/* processing packets from receivers */
-GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer);
+GstFlowReturn rtp_session_process_rtp (RTPSession *sess, GstBuffer *buffer, guint64 ntpnstime);
GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer *buffer);
/* processing packets for sending */
-GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer);
-void rtp_session_set_base_time (RTPSession *sess, GstClockTime base_time);
-void rtp_session_set_timestamp_sync (RTPSession *sess, GstClockTime start_timestamp);
+GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer, guint64 ntptime);
+
/* stopping the session */
GstFlowReturn rtp_session_send_bye (RTPSession *sess, const gchar *reason);
/* get interval for next RTCP interval */
GstClockTime rtp_session_next_timeout (RTPSession *sess, GstClockTime time);
-GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time);
+GstFlowReturn rtp_session_on_timeout (RTPSession *sess, GstClockTime time, guint64 ntpnstime);
#endif /* __RTP_SESSION_H__ */
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c
index 24bb8466..63543358 100644
--- a/gst/rtpmanager/rtpsource.c
+++ b/gst/rtpmanager/rtpsource.c
@@ -68,7 +68,12 @@ rtp_source_init (RTPSource * src)
src->payload = 0;
src->clock_rate = -1;
+ src->clock_base = -1;
+ src->skew_base_ntpnstime = -1;
+ src->ext_rtptime = -1;
+ src->prev_ext_rtptime = -1;
src->packets = g_queue_new ();
+ src->seqnum_base = -1;
src->stats.cycles = -1;
src->stats.jitter = 0;
@@ -112,6 +117,44 @@ rtp_source_new (guint32 ssrc)
}
/**
+ * rtp_source_update_caps:
+ * @src: an #RTPSource
+ * @caps: a #GstCaps
+ *
+ * Parse @caps and store all relevant information in @source.
+ */
+void
+rtp_source_update_caps (RTPSource * src, GstCaps * caps)
+{
+ GstStructure *s;
+ guint val;
+ gint ival;
+
+ /* nothing changed, return */
+ if (src->caps == caps)
+ return;
+
+ s = gst_caps_get_structure (caps, 0);
+
+ if (gst_structure_get_int (s, "payload", &ival))
+ src->payload = ival;
+ GST_DEBUG ("got payload %d", src->payload);
+
+ gst_structure_get_int (s, "clock-rate", &src->clock_rate);
+ GST_DEBUG ("got clock-rate %d", src->clock_rate);
+
+ if (gst_structure_get_uint (s, "clock-base", &val))
+ src->clock_base = val;
+ GST_DEBUG ("got clock-base %" G_GINT64_FORMAT, src->clock_base);
+
+ if (gst_structure_get_uint (s, "seqnum-base", &val))
+ src->seqnum_base = val;
+ GST_DEBUG ("got seqnum-base %" G_GINT32_FORMAT, src->seqnum_base);
+
+ gst_caps_replace (&src->caps, caps);
+}
+
+/**
* rtp_source_set_callbacks:
* @src: an #RTPSource
* @cb: callback functions
@@ -207,7 +250,7 @@ push_packet (RTPSource * src, GstBuffer * buffer)
static gint
get_clock_rate (RTPSource * src, guint8 payload)
{
- if (payload != src->payload) {
+ if (src->clock_rate == -1) {
gint clock_rate = -1;
if (src->callbacks.clock_rate)
@@ -216,8 +259,9 @@ get_clock_rate (RTPSource * src, guint8 payload)
GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate);
src->clock_rate = clock_rate;
- src->payload = payload;
}
+ src->payload = payload;
+
return src->clock_rate;
}
@@ -225,14 +269,17 @@ static void
calculate_jitter (RTPSource * src, GstBuffer * buffer,
RTPArrivalStats * arrival)
{
- GstClockTime current;
+ guint64 ntpnstime;
guint32 rtparrival, transit, rtptime;
+ guint64 ext_rtptime;
gint32 diff;
gint clock_rate;
guint8 pt;
+ guint64 rtpdiff, ntpdiff;
+ gint64 skew;
/* get arrival time */
- if ((current = arrival->time) == GST_CLOCK_TIME_NONE)
+ if ((ntpnstime = arrival->ntpnstime) == GST_CLOCK_TIME_NONE)
goto no_time;
pt = gst_rtp_buffer_get_payload_type (buffer);
@@ -243,8 +290,56 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
rtptime = gst_rtp_buffer_get_timestamp (buffer);
- /* convert arrival time to RTP timestamp units */
- rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND);
+ /* convert to extended timestamp right away */
+ ext_rtptime = gst_rtp_buffer_ext_timestamp (&src->ext_rtptime, rtptime);
+
+ /* no clock-base, take first rtptime as base */
+ if (src->clock_base == -1) {
+ GST_DEBUG ("using clock-base of %" G_GUINT32_FORMAT, rtptime);
+ src->clock_base = rtptime;
+ }
+
+ if (src->skew_base_ntpnstime == -1) {
+ /* lock on first observed NTP and RTP time, they should increment in-sync or
+ * we have a clock skew. */
+ GST_DEBUG ("using base_ntpnstime of %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (ntpnstime));
+ src->skew_base_ntpnstime = ntpnstime;
+ src->skew_base_rtptime = rtptime;
+ src->prev_ext_rtptime = ext_rtptime;
+ src->avg_skew = 0;
+ } else if (src->prev_ext_rtptime < ext_rtptime) {
+ /* get elapsed rtptime but only when the previous rtptime was stricly smaller
+ * than the new one. */
+ rtpdiff = ext_rtptime - src->skew_base_rtptime;
+ /* get NTP diff and convert to RTP time, this is always positive */
+ ntpdiff = ntpnstime - src->skew_base_ntpnstime;
+ ntpdiff = gst_util_uint64_scale_int (ntpdiff, clock_rate, GST_SECOND);
+
+ /* see how the NTP and RTP relate any deviation from 0 means that they drift
+ * out of sync and we must compensate. */
+ skew = ntpdiff - rtpdiff;
+ /* average out the skew to get a smooth value. */
+ src->avg_skew = (31 * src->avg_skew + skew) / 32;
+
+ GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
+ src->avg_skew);
+ if (src->avg_skew != 0) {
+ guint32 timestamp;
+
+ /* patch the buffer RTP timestamp with the skew */
+ GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew);
+ timestamp = gst_rtp_buffer_get_timestamp (buffer);
+ timestamp += src->avg_skew;
+ gst_rtp_buffer_set_timestamp (buffer, timestamp);
+ }
+ /* store previous extended timestamp */
+ src->prev_ext_rtptime = ext_rtptime;
+ }
+
+ /* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
+ * care about the absolute value, just the difference. */
+ rtparrival = gst_util_uint64_scale_int (ntpnstime, clock_rate, GST_SECOND);
/* transit time is difference with RTP timestamp */
transit = rtparrival - rtptime;
@@ -324,6 +419,8 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
seqnr = gst_rtp_buffer_get_seq (buffer);
+ rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer));
+
if (stats->cycles == -1) {
GST_DEBUG ("received first buffer");
/* first time we heard of this source */
@@ -389,6 +486,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
}
} else {
/* duplicate or reordered packet, will be filtered by jitterbuffer. */
+ GST_WARNING ("duplicate or reordered packet");
}
src->stats.octets_received += arrival->payload_len;
@@ -401,7 +499,7 @@ rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer,
GST_DEBUG ("seq %d, PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT,
seqnr, src->stats.packets_received, src->stats.octets_received);
- /* calculate jitter */
+ /* calculate jitter and perform skew correction */
calculate_jitter (src, buffer, arrival);
/* we're ready to push the RTP packet now */
@@ -444,25 +542,27 @@ rtp_source_process_bye (RTPSource * src, const gchar * reason)
* rtp_source_send_rtp:
* @src: an #RTPSource
* @buffer: an RTP buffer
+ * @ntpnstime: the NTP time when this buffer was captured in nanoseconds
*
* Send an RTP @buffer originating from @src. This will make @src a sender.
* This function takes ownership of @buffer and modifies the SSRC in the RTP
- * packet to that of @src.
+ * packet to that of @src when needed.
*
* Returns: a #GstFlowReturn.
*/
GstFlowReturn
-rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
+rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
{
GstFlowReturn result = GST_FLOW_OK;
guint len;
- GstClockTime timestamp;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
len = gst_rtp_buffer_get_payload_len (buffer);
+ rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer));
+
/* we are a sender now */
src->is_sender = TRUE;
@@ -471,18 +571,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
src->stats.octets_sent += len;
/* we keep track of the last received RTP timestamp and the corresponding
- * GStreamer timestamp so that we can convert NTP time to RTP time when
- * sending SR reports */
+ * NTP timestamp so that we can use this info when constructing SR reports */
src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer);
-
- /* the timestamp can be undefined, in that case we use any previously
- * received timestamp */
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
- if (timestamp != -1)
- src->last_timestamp = timestamp;
-
- if (src->clock_rate == -1)
- get_clock_rate (src, gst_rtp_buffer_get_payload_type (buffer));
+ src->last_ntpnstime = ntpnstime;
/* push packet */
if (src->callbacks.push_rtp) {
@@ -496,7 +587,7 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
* get the correct SSRC. */
buffer = gst_buffer_make_writable (buffer);
- GST_DEBUG ("updating SSRC from %u to %u", ssrc, src->ssrc);
+ GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc);
gst_rtp_buffer_set_ssrc (buffer, src->ssrc);
}
GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT,
@@ -513,17 +604,17 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer)
/**
* rtp_source_process_sr:
* @src: an #RTPSource
+ * @time: time of packet arrival
* @ntptime: the NTP time
* @rtptime: the RTP time
* @packet_count: the packet count
* @octet_count: the octect count
- * @time: time of packet arrival
*
* Update the sender report in @src.
*/
void
-rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
- guint32 packet_count, guint32 octet_count, GstClockTime time)
+rtp_source_process_sr (RTPSource * src, GstClockTime time, guint64 ntptime,
+ guint32 rtptime, guint32 packet_count, guint32 octet_count)
{
RTPSenderReport *curr;
gint curridx;
@@ -556,6 +647,7 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
/**
* rtp_source_process_rb:
* @src: an #RTPSource
+ * @time: the current time in nanoseconds since 1970
* @fractionlost: fraction lost since last SR/RR
* @packetslost: the cumululative number of packets lost
* @exthighestseq: the extended last sequence number received
@@ -566,18 +658,20 @@ rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime,
* Update the report block in @src.
*/
void
-rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost,
- guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr)
+rtp_source_process_rb (RTPSource * src, GstClockTime time, guint8 fractionlost,
+ gint32 packetslost, guint32 exthighestseq, guint32 jitter, guint32 lsr,
+ guint32 dlsr)
{
RTPReceiverReport *curr;
gint curridx;
+ guint32 ntp, A;
g_return_if_fail (RTP_IS_SOURCE (src));
- GST_DEBUG ("got RB packet: SSRC %08x, FL %" G_GUINT32_FORMAT ""
- ", PL %d, HS %" G_GUINT32_FORMAT ", JITTER %" G_GUINT32_FORMAT
- ", LSR %08x, DLSR %08x", src->ssrc, fractionlost, packetslost,
- exthighestseq, jitter, lsr, dlsr);
+ GST_DEBUG ("got RB packet: SSRC %08x, FL %2x, PL %d, HS %" G_GUINT32_FORMAT
+ ", jitter %" G_GUINT32_FORMAT ", LSR %04x:%04x, DLSR %04x:%04x",
+ src->ssrc, fractionlost, packetslost, exthighestseq, jitter, lsr >> 16,
+ lsr & 0xffff, dlsr >> 16, dlsr & 0xffff);
curridx = src->stats.curr_rr ^ 1;
curr = &src->stats.rr[curridx];
@@ -591,26 +685,198 @@ rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost,
curr->lsr = lsr;
curr->dlsr = dlsr;
+ /* calculate round trip */
+ ntp = (gst_rtcp_unix_to_ntp (time) >> 16) & 0xffffffff;
+ A = ntp - dlsr;
+ A -= lsr;
+ curr->round_trip = A;
+
+ GST_DEBUG ("NTP %04x:%04x, round trip %04x:%04x", ntp >> 16, ntp & 0xffff,
+ A >> 16, A & 0xffff);
+
/* make current */
src->stats.curr_rr = curridx;
}
/**
- * rtp_source_get_last_sr:
+ * rtp_source_get_new_sr:
* @src: an #RTPSource
+ * @time: the current time in nanoseconds since 1970
* @ntptime: the NTP time
* @rtptime: the RTP time
* @packet_count: the packet count
* @octet_count: the octect count
+ *
+ * Get new values to put into a new SR report from this source.
+ *
+ * Returns: %TRUE on success.
+ */
+gboolean
+rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
+ guint64 * ntptime, guint32 * rtptime, guint32 * packet_count,
+ guint32 * octet_count)
+{
+ guint32 t_rtp;
+ guint64 t_current_ntp;
+ GstClockTimeDiff diff;
+
+ g_return_val_if_fail (RTP_IS_SOURCE (src), FALSE);
+
+ /* use the sync params to interpollate the date->time member to rtptime. We
+ * use the last sent timestamp and rtptime as reference points. We assume
+ * that the slope of the rtptime vs timestamp curve is 1, which is certainly
+ * sufficient for the frequency at which we report SR and the rate we send
+ * out RTP packets. */
+ t_rtp = src->last_rtptime;
+
+ GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %"
+ G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp);
+
+ if (src->clock_rate != -1) {
+ /* get the diff with the SR time */
+ diff = GST_CLOCK_DIFF (src->last_ntpnstime, ntpnstime);
+
+ /* now translate the diff to RTP time, handle positive and negative cases.
+ * If there is no diff, we already set rtptime correctly above. */
+ if (diff > 0) {
+ GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff));
+ t_rtp += gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
+ } else {
+ diff = -diff;
+ GST_DEBUG ("ntpnstime %" GST_TIME_FORMAT ", diff -%" GST_TIME_FORMAT,
+ GST_TIME_ARGS (ntpnstime), GST_TIME_ARGS (diff));
+ t_rtp -= gst_util_uint64_scale_int (diff, src->clock_rate, GST_SECOND);
+ }
+ } else {
+ GST_WARNING ("no clock-rate, cannot interpollate rtp time");
+ }
+
+ t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND);
+
+ GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT,
+ (guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff),
+ t_rtp);
+
+ if (ntptime)
+ *ntptime = t_current_ntp;
+ if (rtptime)
+ *rtptime = t_rtp;
+ if (packet_count)
+ *packet_count = src->stats.packets_sent;
+ if (octet_count)
+ *octet_count = src->stats.octets_sent;
+
+ return TRUE;
+}
+
+/**
+ * rtp_source_get_new_rb:
+ * @src: an #RTPSource
+ * @time: the current time in nanoseconds since 1970
+ * @fractionlost: fraction lost since last SR/RR
+ * @packetslost: the cumululative number of packets lost
+ * @exthighestseq: the extended last sequence number received
+ * @jitter: the interarrival jitter
+ * @lsr: the last SR packet from this source
+ * @dlsr: the delay since last SR packet
+ *
+ * Get the values of the last RB report set with rtp_source_process_rb().
+ *
+ * Returns: %TRUE on success.
+ */
+gboolean
+rtp_source_get_new_rb (RTPSource * src, GstClockTime time,
+ guint8 * fractionlost, gint32 * packetslost, guint32 * exthighestseq,
+ guint32 * jitter, guint32 * lsr, guint32 * dlsr)
+{
+ RTPSourceStats *stats;
+ guint64 extended_max, expected;
+ guint64 expected_interval, received_interval, ntptime;
+ gint64 lost, lost_interval;
+ guint32 fraction, LSR, DLSR;
+ GstClockTime sr_time;
+
+ stats = &src->stats;
+
+ extended_max = stats->cycles + stats->max_seq;
+ expected = extended_max - stats->base_seq + 1;
+
+ GST_DEBUG ("ext_max %" G_GUINT64_FORMAT ", expected %" G_GUINT64_FORMAT
+ ", received %" G_GUINT64_FORMAT ", base_seq %" G_GUINT32_FORMAT,
+ extended_max, expected, stats->packets_received, stats->base_seq);
+
+ lost = expected - stats->packets_received;
+ lost = CLAMP (lost, -0x800000, 0x7fffff);
+
+ expected_interval = expected - stats->prev_expected;
+ stats->prev_expected = expected;
+ received_interval = stats->packets_received - stats->prev_received;
+ stats->prev_received = stats->packets_received;
+
+ lost_interval = expected_interval - received_interval;
+
+ if (expected_interval == 0 || lost_interval <= 0)
+ fraction = 0;
+ else
+ fraction = (lost_interval << 8) / expected_interval;
+
+ GST_DEBUG ("add RR for SSRC %08x", src->ssrc);
+ /* we scaled the jitter up for additional precision */
+ GST_DEBUG ("fraction %" G_GUINT32_FORMAT ", lost %" G_GINT64_FORMAT
+ ", extseq %" G_GUINT64_FORMAT ", jitter %d", fraction, lost,
+ extended_max, stats->jitter >> 4);
+
+ if (rtp_source_get_last_sr (src, &sr_time, &ntptime, NULL, NULL, NULL)) {
+ GstClockTime diff;
+
+ /* LSR is middle 32 bits of the last ntptime */
+ LSR = (ntptime >> 16) & 0xffffffff;
+ diff = time - sr_time;
+ GST_DEBUG ("last SR time diff %" GST_TIME_FORMAT, GST_TIME_ARGS (diff));
+ /* DLSR, delay since last SR is expressed in 1/65536 second units */
+ DLSR = gst_util_uint64_scale_int (diff, 65536, GST_SECOND);
+ } else {
+ /* No valid SR received, LSR/DLSR are set to 0 then */
+ GST_DEBUG ("no valid SR received");
+ LSR = 0;
+ DLSR = 0;
+ }
+ GST_DEBUG ("LSR %04x:%04x, DLSR %04x:%04x", LSR >> 16, LSR & 0xffff,
+ DLSR >> 16, DLSR & 0xffff);
+
+ if (fractionlost)
+ *fractionlost = fraction;
+ if (packetslost)
+ *packetslost = lost;
+ if (exthighestseq)
+ *exthighestseq = extended_max;
+ if (jitter)
+ *jitter = stats->jitter >> 4;
+ if (lsr)
+ *lsr = LSR;
+ if (dlsr)
+ *dlsr = DLSR;
+
+ return TRUE;
+}
+
+/**
+ * rtp_source_get_last_sr:
+ * @src: an #RTPSource
* @time: time of packet arrival
+ * @ntptime: the NTP time
+ * @rtptime: the RTP time
+ * @packet_count: the packet count
+ * @octet_count: the octect count
*
* Get the values of the last sender report as set with rtp_source_process_sr().
*
* Returns: %TRUE if there was a valid SR report.
*/
gboolean
-rtp_source_get_last_sr (RTPSource * src, guint64 * ntptime, guint32 * rtptime,
- guint32 * packet_count, guint32 * octet_count, GstClockTime * time)
+rtp_source_get_last_sr (RTPSource * src, GstClockTime * time, guint64 * ntptime,
+ guint32 * rtptime, guint32 * packet_count, guint32 * octet_count)
{
RTPSenderReport *curr;
diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h
index 7920b6f4..be793461 100644
--- a/gst/rtpmanager/rtpsource.h
+++ b/gst/rtpmanager/rtpsource.h
@@ -134,13 +134,25 @@ struct _RTPSource {
GstNetAddress rtcp_from;
guint8 payload;
+ GstCaps *caps;
gint clock_rate;
+ gint32 seqnum_base;
+
+ gint64 clock_base;
+
+ /* to calculate the clock skew */
+ guint64 skew_base_ntpnstime;
+ guint64 skew_base_rtptime;
+ gint64 avg_skew;
+ guint64 ext_rtptime;
+ guint64 prev_ext_rtptime;
GstClockTime bye_time;
GstClockTime last_activity;
GstClockTime last_rtp_activity;
- GstClockTime last_timestamp;
+
GstClockTime last_rtptime;
+ GstClockTime last_ntpnstime;
GQueue *packets;
@@ -158,6 +170,7 @@ GType rtp_source_get_type (void);
/* managing lifetime of sources */
RTPSource* rtp_source_new (guint32 ssrc);
+void rtp_source_update_caps (RTPSource *src, GstCaps *caps);
void rtp_source_set_callbacks (RTPSource *src, RTPSourceCallbacks *cb, gpointer data);
void rtp_source_set_as_csrc (RTPSource *src);
@@ -168,18 +181,24 @@ void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *addres
/* handling RTP */
GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival);
-GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer);
+GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer, guint64 ntpnstime);
/* RTCP messages */
void rtp_source_process_bye (RTPSource *src, const gchar *reason);
-void rtp_source_process_sr (RTPSource *src, guint64 ntptime, guint32 rtptime,
- guint32 packet_count, guint32 octet_count, GstClockTime time);
-void rtp_source_process_rb (RTPSource *src, guint8 fractionlost, gint32 packetslost,
- guint32 exthighestseq, guint32 jitter,
+void rtp_source_process_sr (RTPSource *src, GstClockTime time, guint64 ntptime,
+ guint32 rtptime, guint32 packet_count, guint32 octet_count);
+void rtp_source_process_rb (RTPSource *src, GstClockTime time, guint8 fractionlost,
+ gint32 packetslost, guint32 exthighestseq, guint32 jitter,
guint32 lsr, guint32 dlsr);
-gboolean rtp_source_get_last_sr (RTPSource *src, guint64 *ntptime, guint32 *rtptime,
- guint32 *packet_count, guint32 *octet_count, GstClockTime *time);
+gboolean rtp_source_get_new_sr (RTPSource *src, GstClockTime time, guint64 *ntptime,
+ guint32 *rtptime, guint32 *packet_count, guint32 *octet_count);
+gboolean rtp_source_get_new_rb (RTPSource *src, GstClockTime time, guint8 *fractionlost,
+ gint32 *packetslost, guint32 *exthighestseq, guint32 *jitter,
+ guint32 *lsr, guint32 *dlsr);
+
+gboolean rtp_source_get_last_sr (RTPSource *src, GstClockTime *time, guint64 *ntptime,
+ guint32 *rtptime, guint32 *packet_count, guint32 *octet_count);
gboolean rtp_source_get_last_rb (RTPSource *src, guint8 *fractionlost, gint32 *packetslost,
guint32 *exthighestseq, guint32 *jitter,
guint32 *lsr, guint32 *dlsr);
diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h
index 0ee1ed1e..e2e4e397 100644
--- a/gst/rtpmanager/rtpstats.h
+++ b/gst/rtpmanager/rtpstats.h
@@ -51,6 +51,7 @@ typedef struct {
guint32 jitter;
guint32 lsr;
guint32 dlsr;
+ guint32 round_trip;
} RTPReceiverReport;
/**
@@ -64,6 +65,7 @@ typedef struct {
*/
typedef struct {
GstClockTime time;
+ guint64 ntpnstime;
gboolean have_address;
GstNetAddress address;
guint bytes;