summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog48
-rw-r--r--gst/rtpmanager/gstrtpbin.c31
-rw-r--r--gst/rtpmanager/gstrtpjitterbuffer.c2
-rw-r--r--gst/rtpmanager/gstrtpsession.c138
-rw-r--r--gst/rtpmanager/gstrtpsession.h1
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.c116
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.h2
-rw-r--r--gst/rtpmanager/rtpsession.c2
-rw-r--r--gst/rtpmanager/rtpsource.c53
9 files changed, 354 insertions, 39 deletions
diff --git a/ChangeLog b/ChangeLog
index 007eecd8..815f7c9f 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,51 @@
+2007-09-12 Wim Taymans <wim.taymans@gmail.com>
+
+ * gst/rtpmanager/gstrtpbin.c: (calc_ntp_ns_base),
+ (gst_rtp_bin_change_state), (new_payload_found), (create_send_rtp):
+ Calculate and configure the NTP base time so that we can generate better
+ NTP times in SR packets.
+ Set caps on new ghostpad.
+
+ * gst/rtpmanager/gstrtpjitterbuffer.c:
+ (gst_rtp_jitter_buffer_loop):
+ Clean debug statement.
+
+ * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init),
+ (gst_rtp_session_init), (gst_rtp_session_set_property),
+ (gst_rtp_session_get_property), (get_current_ntp_ns_time),
+ (rtcp_thread), (gst_rtp_session_event_recv_rtp_sink),
+ (gst_rtp_session_internal_links), (gst_rtp_session_chain_recv_rtp),
+ (gst_rtp_session_event_send_rtp_sink),
+ (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink),
+ (create_send_rtp_sink):
+ * gst/rtpmanager/gstrtpsession.h:
+ Add ntp-ns-base property to convert running_time to NTP time.
+ Handle NEWSEGMENT events on send and recv RTP pads so that we can
+ calculate the running time and thus NTP time of the packets.
+ Simplify getting the current NTP time using the pipeline clock.
+ Implement internal links functions.
+ Use the buffer timestamp to calculate the NTP time instead of the clock.
+
+ * gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc),
+ (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event),
+ (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain),
+ (gst_rtp_ssrc_demux_internal_links),
+ (gst_rtp_ssrc_demux_src_query):
+ * gst/rtpmanager/gstrtpssrcdemux.h:
+ Implement internal links function.
+ Calculate the diff between different streams, this might be used later
+ to get the inter stream latency.
+
+ * gst/rtpmanager/rtpsession.c: (rtp_session_send_rtp):
+ Simple cleanup.
+
+ * gst/rtpmanager/rtpsource.c: (rtp_source_init),
+ (calculate_jitter), (rtp_source_send_rtp), (rtp_source_get_new_sr):
+ Make the clock skew window a little bigger.
+ Apply the clock skew to all buffers, not just one with a new timestamp.
+ Calculate and debug sender clock drift.
+ Use extended last timestamp to interpollate for SR reports.
+
2007-09-12 Tim-Philipp Müller <tim at centricular dot net>
Patch by: Peter Kjellerstedt <pkj at axis com>
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
index a4ba67c3..7d5fb5d1 100644
--- a/gst/rtpmanager/gstrtpbin.c
+++ b/gst/rtpmanager/gstrtpbin.c
@@ -208,6 +208,8 @@ GST_STATIC_PAD_TEMPLATE ("sink_%d",
struct _GstRtpBinPrivate
{
GMutex *bin_lock;
+
+ GstClockTime ntp_ns_base;
};
/* signals and args */
@@ -1142,6 +1144,30 @@ gst_rtp_bin_provide_clock (GstElement * element)
return GST_CLOCK_CAST (gst_object_ref (rtpbin->provided_clock));
}
+static void
+calc_ntp_ns_base (GstRtpBin * bin)
+{
+ GstClockTime now;
+ GTimeVal current;
+ GSList *walk;
+
+ /* get the current time and convert it to NTP time in nanoseconds */
+ g_get_current_time (&current);
+ now = GST_TIMEVAL_TO_TIME (current);
+ now += (2208988800LL * GST_SECOND);
+
+ GST_RTP_BIN_LOCK (bin);
+ bin->priv->ntp_ns_base = now;
+ for (walk = bin->sessions; walk; walk = g_slist_next (walk)) {
+ GstRtpBinSession *session = (GstRtpBinSession *) walk->data;
+
+ g_object_set (session->session, "ntp-ns-base", now, NULL);
+ }
+ GST_RTP_BIN_UNLOCK (bin);
+
+ return;
+}
+
static GstStateChangeReturn
gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
{
@@ -1156,6 +1182,7 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ calc_ntp_ns_base (rtpbin);
break;
default:
break;
@@ -1199,6 +1226,7 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,
gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
g_free (padname);
+ gst_pad_set_caps (gpad, GST_PAD_CAPS (pad));
gst_pad_set_active (gpad, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
}
@@ -1553,9 +1581,6 @@ create_send_rtp (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->send_rtp_sink == NULL)
goto pad_failed;
- g_signal_connect (session->send_rtp_sink, "notify::caps",
- (GCallback) caps_changed, session);
-
result =
gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
gst_pad_set_active (result, TRUE);
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c
index a23fbb87..08a55f2b 100644
--- a/gst/rtpmanager/gstrtpjitterbuffer.c
+++ b/gst/rtpmanager/gstrtpjitterbuffer.c
@@ -977,7 +977,7 @@ again:
GST_DEBUG_OBJECT (jitterbuffer,
"Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT
- ",now %d left", seqnum, rtp_time, exttimestamp,
+ ", now %d left", seqnum, rtp_time, exttimestamp,
rtp_jitter_buffer_num_packets (priv->jbuf));
/* If we don't know what the next seqnum should be (== -1) we have to wait
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
index 87948a4b..70833655 100644
--- a/gst/rtpmanager/gstrtpsession.c
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -214,6 +214,8 @@ enum
LAST_SIGNAL
};
+#define DEFAULT_NTP_NS_BASE 0
+
enum
{
PROP_0,
@@ -462,6 +464,11 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout),
NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE,
+ g_param_spec_uint64 ("ntp-ns-base", "NTP base time",
+ "The NTP base time corresponding to running_time 0", 0,
+ G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE));
+
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
gstelement_class->request_new_pad =
@@ -497,6 +504,9 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
g_signal_connect (rtpsession->priv->session, "on-timeout",
(GCallback) on_timeout, rtpsession);
rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL);
+
+ gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+ gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
}
static void
@@ -521,7 +531,11 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_NTP_NS_BASE:
+ GST_OBJECT_LOCK (rtpsession);
rtpsession->priv->ntpnsbase = g_value_get_uint64 (value);
+ GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (rtpsession->priv->ntpnsbase));
+ GST_OBJECT_UNLOCK (rtpsession);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -539,7 +553,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_NTP_NS_BASE:
+ GST_OBJECT_LOCK (rtpsession);
g_value_set_uint64 (value, rtpsession->priv->ntpnsbase);
+ GST_OBJECT_UNLOCK (rtpsession);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -548,19 +564,31 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
}
static guint64
-get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
+get_current_ntp_ns_time (GstRtpSession * rtpsession)
{
guint64 ntpnstime;
+ GstClock *clock;
+ GstClockTime base_time, ntpnsbase;
+
+ GST_OBJECT_LOCK (rtpsession);
+ if ((clock = GST_ELEMENT_CLOCK (rtpsession))) {
+ base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
+ ntpnsbase = rtpsession->priv->ntpnsbase;
+ gst_object_ref (clock);
+ GST_OBJECT_UNLOCK (rtpsession);
- if (clock) {
/* get current NTP time */
ntpnstime = gst_clock_get_time (clock);
/* convert to running time */
- ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession));
+ ntpnstime -= base_time;
/* add NTP base offset */
- ntpnstime += rtpsession->priv->ntpnsbase;
- } else
+ ntpnstime += ntpnsbase;
+
+ gst_object_unref (clock);
+ } else {
+ GST_OBJECT_UNLOCK (rtpsession);
ntpnstime = -1;
+ }
return ntpnstime;
}
@@ -568,7 +596,7 @@ get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
static void
rtcp_thread (GstRtpSession * rtpsession)
{
- GstClock *sysclock, *clock;
+ GstClock *sysclock;
GstClockID id;
GstClockTime current_time;
GstClockTime next_timeout;
@@ -579,9 +607,6 @@ rtcp_thread (GstRtpSession * rtpsession)
if (sysclock == NULL)
goto no_sysclock;
- /* to get the current NTP time, we use the pipeline clock */
- clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
-
current_time = gst_clock_get_time (sysclock);
GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
@@ -619,7 +644,7 @@ rtcp_thread (GstRtpSession * rtpsession)
current_time = gst_clock_get_time (sysclock);
/* get current NTP time */
- ntpnstime = get_current_ntp_ns_time (rtpsession, clock);
+ ntpnstime = get_current_ntp_ns_time (rtpsession);
/* we get unlocked because we need to perform reconsideration, don't perform
* the timeout but get a new reporting estimate. */
@@ -969,6 +994,41 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
GST_EVENT_TYPE_NAME (event));
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_STOP:
+ gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED);
+ break;
+ case GST_EVENT_NEWSEGMENT:
+ {
+ gboolean update;
+ gdouble rate, arate;
+ GstFormat format;
+ gint64 start, stop, time;
+ GstSegment *segment;
+
+ segment = &rtpsession->recv_rtp_seg;
+
+ /* the newsegment event is needed to convert the RTP timestamp to
+ * running_time, which is needed to generate a mapping from RTP to NTP
+ * timestamps in SR reports */
+ gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
+ &start, &stop, &time);
+
+ GST_DEBUG_OBJECT (rtpsession,
+ "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
+ "format GST_FORMAT_TIME, "
+ "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
+ ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
+ update, rate, arate, GST_TIME_ARGS (segment->start),
+ GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
+ GST_TIME_ARGS (segment->accum));
+
+ gst_segment_set_newsegment_full (segment, update, rate,
+ arate, format, start, stop, time);
+
+ /* push event forward */
+ ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
+ break;
+ }
default:
ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
break;
@@ -976,6 +1036,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
gst_object_unref (rtpsession);
return ret;
+
+}
+static GList *
+gst_rtp_session_internal_links (GstPad * pad)
+{
+ GstRtpSession *rtpsession;
+ GstRtpSessionPrivate *priv;
+ GList *res = NULL;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
+
+ if (pad == rtpsession->recv_rtp_src) {
+ res = g_list_prepend (res, rtpsession->recv_rtp_sink);
+ } else if (pad == rtpsession->recv_rtp_sink) {
+ res = g_list_prepend (res, rtpsession->recv_rtp_src);
+ } else if (pad == rtpsession->send_rtp_src) {
+ res = g_list_prepend (res, rtpsession->send_rtp_sink);
+ } else if (pad == rtpsession->send_rtp_sink) {
+ res = g_list_prepend (res, rtpsession->send_rtp_src);
+ }
+
+ gst_object_unref (rtpsession);
+
+ return res;
}
static gboolean
@@ -1006,14 +1091,25 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
guint64 ntpnstime;
+ GstClockTime timestamp;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- ntpnstime =
- get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession));
+ /* get NTP time when this packet was captured, this depends on the timestamp. */
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+ /* convert to running time using the segment values */
+ ntpnstime =
+ gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME,
+ timestamp);
+ /* add constant to convert running time to NTP time */
+ ntpnstime += priv->ntpnsbase;
+ } else {
+ ntpnstime = get_current_ntp_ns_time (rtpsession);
+ }
ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
@@ -1084,6 +1180,9 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
GST_DEBUG_OBJECT (rtpsession, "received event");
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_STOP:
+ gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED);
+ break;
case GST_EVENT_NEWSEGMENT:
{
gboolean update;
@@ -1146,7 +1245,10 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
timestamp = GST_BUFFER_TIMESTAMP (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* convert to running time using the segment start value. */
- ntpnstime = timestamp - rtpsession->send_rtp_seg.start;
+ ntpnstime =
+ gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME,
+ timestamp);
+ /* convert to NTP time by adding the NTP base */
ntpnstime += priv->ntpnsbase;
} else
ntpnstime = -1;
@@ -1175,6 +1277,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
gst_rtp_session_event_recv_rtp_sink);
gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink,
gst_rtp_session_sink_setcaps);
+ gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink,
+ gst_rtp_session_internal_links);
gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->recv_rtp_sink);
@@ -1183,6 +1287,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
rtpsession->recv_rtp_src =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
"recv_rtp_src");
+ gst_pad_set_internal_link_function (rtpsession->recv_rtp_src,
+ gst_rtp_session_internal_links);
gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
@@ -1235,8 +1341,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
gst_rtp_session_chain_send_rtp);
gst_pad_set_event_function (rtpsession->send_rtp_sink,
gst_rtp_session_event_send_rtp_sink);
- gst_pad_set_setcaps_function (rtpsession->send_rtp_sink,
- gst_rtp_session_sink_setcaps);
+ gst_pad_set_internal_link_function (rtpsession->send_rtp_sink,
+ gst_rtp_session_internal_links);
gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->send_rtp_sink);
@@ -1245,6 +1351,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
"send_rtp_src");
gst_pad_use_fixed_caps (rtpsession->send_rtp_src);
+ gst_pad_set_internal_link_function (rtpsession->send_rtp_src,
+ gst_rtp_session_internal_links);
gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h
index 09565ac1..3fffb067 100644
--- a/gst/rtpmanager/gstrtpsession.h
+++ b/gst/rtpmanager/gstrtpsession.h
@@ -43,6 +43,7 @@ struct _GstRtpSession {
/*< private >*/
GstPad *recv_rtp_sink;
+ GstSegment recv_rtp_seg;
GstPad *recv_rtcp_sink;
GstPad *send_rtp_sink;
GstSegment send_rtp_seg;
diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c
index 5457bc35..c728a6d5 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.c
+++ b/gst/rtpmanager/gstrtpssrcdemux.c
@@ -125,6 +125,8 @@ static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
/* srcpad stuff */
static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
+static GList *gst_rtp_ssrc_demux_internal_links (GstPad * pad);
+static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query);
static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
@@ -137,6 +139,7 @@ struct _GstRtpSsrcDemuxPad
GstPad *rtp_pad;
GstCaps *caps;
GstPad *rtcp_pad;
+ GstClockTime first_ts;
};
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found
@@ -156,7 +159,8 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
}
static GstRtpSsrcDemuxPad *
-create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
+ GstClockTime timestamp)
{
GstPad *rtp_pad, *rtcp_pad;
GstElementClass *klass;
@@ -177,13 +181,27 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
rtcp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname);
+ /* we use the first timestamp received to calculate the difference between
+ * timestamps on all streams */
+ GST_DEBUG_OBJECT (demux, "SSRC %08x, first timestamp %" GST_TIME_FORMAT,
+ ssrc, GST_TIME_ARGS (timestamp));
+
/* wrap in structure and add to list */
demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1);
demuxpad->ssrc = ssrc;
demuxpad->rtp_pad = rtp_pad;
demuxpad->rtcp_pad = rtcp_pad;
+ demuxpad->first_ts = timestamp;
+
+ GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timestamp));
+
+ gst_pad_set_element_private (rtp_pad, demuxpad);
+ gst_pad_set_element_private (rtcp_pad, demuxpad);
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+
+ /* unlock to perform the remainder and to fire our signal */
GST_OBJECT_UNLOCK (demux);
/* copy caps from input */
@@ -193,7 +211,13 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+ gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
+ gst_pad_set_internal_link_function (rtp_pad,
+ gst_rtp_ssrc_demux_internal_links);
gst_pad_set_active (rtp_pad, TRUE);
+
+ gst_pad_set_internal_link_function (rtcp_pad,
+ gst_rtp_ssrc_demux_internal_links);
gst_pad_set_active (rtcp_pad, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
@@ -277,6 +301,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux,
gst_pad_set_event_function (demux->rtcp_sink,
gst_rtp_ssrc_demux_rtcp_sink_event);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
+
+ gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
}
static void
@@ -298,6 +324,9 @@ gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_STOP:
+ gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
+ break;
case GST_EVENT_NEWSEGMENT:
default:
{
@@ -370,7 +399,9 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
GST_OBJECT_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) {
- if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc)))
+ if (!(dpad =
+ create_demux_pad_for_ssrc (demux, ssrc,
+ GST_BUFFER_TIMESTAMP (buf))))
goto create_failed;
}
GST_OBJECT_UNLOCK (demux);
@@ -419,6 +450,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
/* first packet must be SR or RR or else the validate would have failed */
switch (gst_rtcp_packet_get_type (&packet)) {
case GST_RTCP_TYPE_SR:
+ /* get the ssrc so that we can route it to the right source pad */
gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
NULL);
break;
@@ -435,7 +467,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) {
GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
- if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc)))
+ if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1)))
goto create_failed;
}
GST_OBJECT_UNLOCK (demux);
@@ -482,6 +514,84 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
return res;
}
+static GList *
+gst_rtp_ssrc_demux_internal_links (GstPad * pad)
+{
+ GstRtpSsrcDemux *demux;
+ GList *res = NULL;
+ GSList *walk;
+
+ demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+
+ GST_OBJECT_LOCK (demux);
+ for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+ GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+
+ if (pad == demux->rtp_sink) {
+ res = g_list_prepend (res, dpad->rtp_pad);
+ } else if (pad == demux->rtcp_sink) {
+ res = g_list_prepend (res, dpad->rtcp_pad);
+ } else if (pad == dpad->rtp_pad) {
+ res = g_list_prepend (res, demux->rtp_sink);
+ break;
+ } else if (pad == dpad->rtcp_pad) {
+ res = g_list_prepend (res, demux->rtcp_sink);
+ break;
+ }
+ }
+ GST_OBJECT_UNLOCK (demux);
+
+ gst_object_unref (demux);
+ return res;
+}
+
+static gboolean
+gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
+{
+ GstRtpSsrcDemux *demux;
+ gboolean res = FALSE;
+
+ demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+
+ switch (GST_QUERY_TYPE (query)) {
+ case GST_QUERY_LATENCY:
+ {
+
+ if ((res = gst_pad_peer_query (demux->rtp_sink, query))) {
+ gboolean live;
+ GstClockTime min_latency, max_latency;
+ GstRtpSsrcDemuxPad *demuxpad;
+
+ demuxpad = gst_pad_get_element_private (pad);
+
+ gst_query_parse_latency (query, &live, &min_latency, &max_latency);
+
+ GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency));
+
+ GST_DEBUG_OBJECT (demux,
+ "latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc,
+ GST_TIME_ARGS (demuxpad->first_ts));
+
+#if 0
+ min_latency += demuxpad->first_ts;
+ if (max_latency != -1)
+ max_latency += demuxpad->first_ts;
+#endif
+
+ gst_query_set_latency (query, live, min_latency, max_latency);
+ }
+ break;
+ }
+ default:
+ res = gst_pad_query_default (pad, query);
+ break;
+ }
+ gst_object_unref (demux);
+
+ return res;
+}
+
static GstStateChangeReturn
gst_rtp_ssrc_demux_change_state (GstElement * element,
GstStateChange transition)
diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h
index bea2769d..88aeed80 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.h
+++ b/gst/rtpmanager/gstrtpssrcdemux.h
@@ -36,6 +36,8 @@ struct _GstRtpSsrcDemux
{
GstElement parent;
+ GstSegment segment;
+
GstPad *rtp_sink;
GstPad *rtcp_sink;
GSList *srcpads;
diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c
index e7f72b40..724fa24a 100644
--- a/gst/rtpmanager/rtpsession.c
+++ b/gst/rtpmanager/rtpsession.c
@@ -1423,7 +1423,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, guint64 ntptime)
prevsender = RTP_SOURCE_IS_SENDER (source);
/* we use our own source to send */
- result = rtp_source_send_rtp (sess->source, buffer, ntptime);
+ result = rtp_source_send_rtp (source, buffer, ntptime);
if (RTP_SOURCE_IS_SENDER (source) && !prevsender)
sess->stats.sender_sources++;
diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c
index 63543358..c4152474 100644
--- a/gst/rtpmanager/rtpsource.c
+++ b/gst/rtpmanager/rtpsource.c
@@ -74,6 +74,7 @@ rtp_source_init (RTPSource * src)
src->prev_ext_rtptime = -1;
src->packets = g_queue_new ();
src->seqnum_base = -1;
+ src->last_rtptime = -1;
src->stats.cycles = -1;
src->stats.jitter = 0;
@@ -320,22 +321,19 @@ calculate_jitter (RTPSource * src, GstBuffer * buffer,
* out of sync and we must compensate. */
skew = ntpdiff - rtpdiff;
/* average out the skew to get a smooth value. */
- src->avg_skew = (31 * src->avg_skew + skew) / 32;
+ src->avg_skew = (63 * src->avg_skew + skew) / 64;
- GST_DEBUG ("skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
+ GST_DEBUG ("new skew %" G_GINT64_FORMAT ", avg %" G_GINT64_FORMAT, skew,
src->avg_skew);
- if (src->avg_skew != 0) {
- guint32 timestamp;
-
- /* patch the buffer RTP timestamp with the skew */
- GST_DEBUG ("adjusting timestamp %" G_GINT64_FORMAT, src->avg_skew);
- timestamp = gst_rtp_buffer_get_timestamp (buffer);
- timestamp += src->avg_skew;
- gst_rtp_buffer_set_timestamp (buffer, timestamp);
- }
/* store previous extended timestamp */
src->prev_ext_rtptime = ext_rtptime;
}
+ if (src->avg_skew != 0) {
+ /* patch the buffer RTP timestamp with the skew */
+ GST_DEBUG ("skew timestamp RTP %" G_GUINT32_FORMAT " -> %" G_GINT64_FORMAT,
+ rtptime, rtptime + src->avg_skew);
+ gst_rtp_buffer_set_timestamp (buffer, rtptime + src->avg_skew);
+ }
/* convert arrival time to RTP timestamp units, truncate to 32 bits, we don't
* care about the absolute value, just the difference. */
@@ -555,6 +553,9 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
{
GstFlowReturn result = GST_FLOW_OK;
guint len;
+ guint32 rtptime;
+ guint64 ext_rtptime;
+ guint64 ntp_diff, rtp_diff;
g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
@@ -570,9 +571,27 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
src->stats.packets_sent++;
src->stats.octets_sent += len;
+ rtptime = gst_rtp_buffer_get_timestamp (buffer);
+ ext_rtptime = src->last_rtptime;
+ ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
+
+ GST_DEBUG ("SSRC %08x, RTP %" G_GUINT64_FORMAT ", NTP %" GST_TIME_FORMAT,
+ src->ssrc, ext_rtptime, GST_TIME_ARGS (ntpnstime));
+
+ if (ext_rtptime > src->last_rtptime) {
+ rtp_diff = ext_rtptime - src->last_rtptime;
+ ntp_diff = ntpnstime - src->last_ntpnstime;
+
+ /* calc the diff so we can detect drift at the sender. This can also be used
+ * to guestimate the clock rate if the NTP time is locked to the RTP
+ * timestamps (as is the case when the capture device is providing the clock). */
+ GST_DEBUG ("SSRC %08x, diff RTP %" G_GUINT64_FORMAT ", diff NTP %"
+ GST_TIME_FORMAT, src->ssrc, rtp_diff, GST_TIME_ARGS (ntp_diff));
+ }
+
/* we keep track of the last received RTP timestamp and the corresponding
* NTP timestamp so that we can use this info when constructing SR reports */
- src->last_rtptime = gst_rtp_buffer_get_timestamp (buffer);
+ src->last_rtptime = ext_rtptime;
src->last_ntpnstime = ntpnstime;
/* push packet */
@@ -587,7 +606,8 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime)
* get the correct SSRC. */
buffer = gst_buffer_make_writable (buffer);
- GST_DEBUG ("updating SSRC from %08x to %08x", ssrc, src->ssrc);
+ GST_WARNING ("updating SSRC from %08x to %08x, fix the payloader", ssrc,
+ src->ssrc);
gst_rtp_buffer_set_ssrc (buffer, src->ssrc);
}
GST_DEBUG ("pushing RTP packet %" G_GUINT64_FORMAT,
@@ -716,7 +736,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
guint64 * ntptime, guint32 * rtptime, guint32 * packet_count,
guint32 * octet_count)
{
- guint32 t_rtp;
+ guint64 t_rtp;
guint64 t_current_ntp;
GstClockTimeDiff diff;
@@ -730,7 +750,7 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
t_rtp = src->last_rtptime;
GST_DEBUG ("last_ntpnstime %" GST_TIME_FORMAT ", last_rtptime %"
- G_GUINT32_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp);
+ G_GUINT64_FORMAT, GST_TIME_ARGS (src->last_ntpnstime), t_rtp);
if (src->clock_rate != -1) {
/* get the diff with the SR time */
@@ -752,11 +772,12 @@ rtp_source_get_new_sr (RTPSource * src, GstClockTime ntpnstime,
GST_WARNING ("no clock-rate, cannot interpollate rtp time");
}
+ /* convert the NTP time in nanoseconds to 32.32 fixed point */
t_current_ntp = gst_util_uint64_scale (ntpnstime, (1LL << 32), GST_SECOND);
GST_DEBUG ("NTP %08x:%08x, RTP %" G_GUINT32_FORMAT,
(guint32) (t_current_ntp >> 32), (guint32) (t_current_ntp & 0xffffffff),
- t_rtp);
+ (guint32) t_rtp);
if (ntptime)
*ntptime = t_current_ntp;