summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/gstrtpsession.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtpmanager/gstrtpsession.c')
-rw-r--r--gst/rtpmanager/gstrtpsession.c229
1 files changed, 151 insertions, 78 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
index 985a3713..e716682c 100644
--- a/gst/rtpmanager/gstrtpsession.c
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -132,6 +132,8 @@
#include "config.h"
#endif
+#include <gst/rtp/gstrtpbuffer.h>
+
#include "gstrtpbin-marshal.h"
#include "gstrtpsession.h"
#include "rtpsession.h"
@@ -214,7 +216,8 @@ enum
enum
{
- PROP_0
+ PROP_0,
+ PROP_NTP_NS_BASE
};
#define GST_RTP_SESSION_GET_PRIVATE(obj) \
@@ -234,8 +237,10 @@ struct _GstRtpSessionPrivate
GThread *thread;
/* caps mapping */
- guint8 pt;
- gint clock_rate;
+ GHashTable *ptmap;
+
+ /* NTP base time */
+ guint64 ntpnsbase;
};
/* callbacks to handle actions from the session manager */
@@ -245,18 +250,18 @@ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
gpointer user_data);
-static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
- gpointer user_data);
static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data);
static RTPSessionCallbacks callbacks = {
gst_rtp_session_process_rtp,
gst_rtp_session_send_rtp,
gst_rtp_session_send_rtcp,
+ gst_rtp_session_sync_rtcp,
gst_rtp_session_clock_rate,
- gst_rtp_session_get_time,
gst_rtp_session_reconsider
};
@@ -363,6 +368,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass)
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
+
/**
* GstRtpSession::request-pt-map:
* @sess: the object which received the signal
@@ -490,6 +496,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass)
(GCallback) on_bye_timeout, rtpsession);
g_signal_connect (rtpsession->priv->session, "on-timeout",
(GCallback) on_timeout, rtpsession);
+ rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL);
}
static void
@@ -513,6 +520,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id,
rtpsession = GST_RTP_SESSION (object);
switch (prop_id) {
+ case PROP_NTP_NS_BASE:
+ rtpsession->priv->ntpnsbase = g_value_get_uint64 (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -528,26 +538,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
rtpsession = GST_RTP_SESSION (object);
switch (prop_id) {
+ case PROP_NTP_NS_BASE:
+ g_value_set_uint64 (value, rtpsession->priv->ntpnsbase);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
+static guint64
+get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock)
+{
+ guint64 ntpnstime;
+
+ if (clock) {
+ /* get current NTP time */
+ ntpnstime = gst_clock_get_time (clock);
+ /* convert to running time */
+ ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession));
+ /* add NTP base offset */
+ ntpnstime += rtpsession->priv->ntpnsbase;
+ } else
+ ntpnstime = -1;
+
+ return ntpnstime;
+}
+
static void
rtcp_thread (GstRtpSession * rtpsession)
{
- GstClock *clock;
+ GstClock *sysclock, *clock;
GstClockID id;
GstClockTime current_time;
GstClockTime next_timeout;
+ guint64 ntpnstime;
- /* RTCP timeouts we use the system clock */
- clock = gst_system_clock_obtain ();
- if (clock == NULL)
- goto no_clock;
+ /* for RTCP timeouts we use the system clock */
+ sysclock = gst_system_clock_obtain ();
+ if (sysclock == NULL)
+ goto no_sysclock;
+
+ /* to get the current NTP time, we use the pipeline clock */
+ clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
- current_time = gst_clock_get_time (clock);
+ current_time = gst_clock_get_time (sysclock);
GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
@@ -568,7 +603,7 @@ rtcp_thread (GstRtpSession * rtpsession)
break;
id = rtpsession->priv->id =
- gst_clock_new_single_shot_id (clock, next_timeout);
+ gst_clock_new_single_shot_id (sysclock, next_timeout);
GST_RTP_SESSION_UNLOCK (rtpsession);
res = gst_clock_id_wait (id, NULL);
@@ -581,7 +616,10 @@ rtcp_thread (GstRtpSession * rtpsession)
break;
/* update current time */
- current_time = gst_clock_get_time (clock);
+ current_time = gst_clock_get_time (sysclock);
+
+ /* get current NTP time */
+ ntpnstime = get_current_ntp_ns_time (rtpsession, clock);
/* we get unlocked because we need to perform reconsideration, don't perform
* the timeout but get a new reporting estimate. */
@@ -590,18 +628,18 @@ rtcp_thread (GstRtpSession * rtpsession)
/* perform actions, we ignore result. Release lock because it might push. */
GST_RTP_SESSION_UNLOCK (rtpsession);
- rtp_session_on_timeout (rtpsession->priv->session, current_time);
+ rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime);
GST_RTP_SESSION_LOCK (rtpsession);
}
GST_RTP_SESSION_UNLOCK (rtpsession);
- gst_object_unref (clock);
+ gst_object_unref (sysclock);
GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
return;
/* ERRORS */
-no_clock:
+no_sysclock:
{
GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL),
("Could not get system clock"));
@@ -662,7 +700,6 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
case GST_STATE_CHANGE_NULL_TO_READY:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
- priv->clock_rate = -1;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
@@ -677,17 +714,9 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- {
- GstClockTime base_time;
-
- base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
-
- rtp_session_set_base_time (priv->session, base_time);
-
if (!start_rtcp_thread (rtpsession))
goto failed_thread;
break;
- }
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -774,6 +803,15 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
priv = rtpsession->priv;
if (rtpsession->send_rtcp_src) {
+ GstCaps *caps;
+
+ /* set rtcp caps on output pad */
+ if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) {
+ caps = gst_caps_new_simple ("application/x-rtcp", NULL);
+ gst_pad_set_caps (rtpsession->send_rtcp_src, caps);
+ gst_caps_unref (caps);
+ }
+ gst_buffer_set_caps (buffer, caps);
GST_DEBUG_OBJECT (rtpsession, "sending RTCP");
result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
} else {
@@ -784,30 +822,59 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
return result;
}
-static gboolean
-gst_rtp_session_parse_caps (GstRtpSession * rtpsession, GstCaps * caps)
+/* called when the session manager has an SR RTCP packet ready for handling
+ * inter stream synchronisation */
+static GstFlowReturn
+gst_rtp_session_sync_rtcp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data)
{
+ GstFlowReturn result;
+ GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
- const GstStructure *caps_struct;
+ rtpsession = GST_RTP_SESSION (user_data);
priv = rtpsession->priv;
- GST_DEBUG_OBJECT (rtpsession, "parsing caps");
+ if (rtpsession->sync_src) {
+ GstCaps *caps;
- caps_struct = gst_caps_get_structure (caps, 0);
- if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
- goto no_clock_rate;
+ /* set rtcp caps on output pad */
+ if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) {
+ caps = gst_caps_new_simple ("application/x-rtcp", NULL);
+ gst_pad_set_caps (rtpsession->sync_src, caps);
+ gst_caps_unref (caps);
+ }
+ gst_buffer_set_caps (buffer, caps);
+ GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP");
+ result = gst_pad_push (rtpsession->sync_src, buffer);
+ } else {
+ GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad");
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+static void
+gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps)
+{
+ GstRtpSessionPrivate *priv;
+ const GstStructure *s;
+ gint payload;
- GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", priv->clock_rate);
+ priv = rtpsession->priv;
- return TRUE;
+ GST_DEBUG_OBJECT (rtpsession, "parsing caps");
- /* ERRORS */
-no_clock_rate:
- {
- GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
- return FALSE;
- }
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_int (s, "payload", &payload))
+ return;
+
+ caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload));
+ if (caps)
+ return;
+
+ g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps);
}
/* called when the session manager needs the clock rate */
@@ -821,13 +888,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
GValue ret = { 0 };
GValue args[2] = { {0}, {0} };
GstCaps *caps;
+ const GstStructure *s;
rtpsession = GST_RTP_SESSION_CAST (user_data);
priv = rtpsession->priv;
- /* if we have it, return it */
- if (priv->clock_rate != -1)
- return priv->clock_rate;
+ GST_RTP_SESSION_LOCK (rtpsession);
+ caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload));
+ if (caps)
+ goto done;
g_value_init (&args[0], GST_TYPE_ELEMENT);
g_value_set_object (&args[0], rtpsession);
@@ -844,10 +913,16 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
if (!caps)
goto no_caps;
- if (!gst_rtp_session_parse_caps (rtpsession, caps))
- goto parse_failed;
+ gst_rtp_session_cache_caps (rtpsession, caps);
+
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_int (s, "clock-rate", &result))
+ goto no_clock_rate;
+
+ GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result);
- result = priv->clock_rate;
+done:
+ GST_RTP_SESSION_UNLOCK (rtpsession);
return result;
@@ -855,35 +930,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
no_caps:
{
GST_DEBUG_OBJECT (rtpsession, "could not get caps");
- return -1;
+ goto done;
}
-parse_failed:
+no_clock_rate:
{
- GST_DEBUG_OBJECT (rtpsession, "failed to parse caps");
- return -1;
+ GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!");
+ goto done;
}
}
-/* called when the session manager needs the time of clock */
-static GstClockTime
-gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
-{
- GstClockTime result;
- GstRtpSession *rtpsession;
- GstClock *clock;
-
- rtpsession = GST_RTP_SESSION_CAST (user_data);
-
- clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
- if (clock) {
- result = gst_clock_get_time (clock);
- gst_object_unref (clock);
- } else
- result = GST_CLOCK_TIME_NONE;
-
- return result;
-}
-
/* called when the session manager asks us to reconsider the timeout */
static void
gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data)
@@ -925,18 +980,19 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
static gboolean
gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps)
{
- gboolean res;
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
- res = gst_rtp_session_parse_caps (rtpsession, caps);
+ GST_RTP_SESSION_LOCK (rtpsession);
+ gst_rtp_session_cache_caps (rtpsession, caps);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
gst_object_unref (rtpsession);
- return res;
+ return TRUE;
}
/* receive a packet from a sender, send it to the RTP session manager and
@@ -948,13 +1004,17 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
+ guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- ret = rtp_session_process_rtp (priv->session, buffer);
+ ntpnstime =
+ get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession));
+
+ ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime);
gst_object_unref (rtpsession);
@@ -1051,8 +1111,6 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
gst_segment_set_newsegment_full (segment, update, rate,
arate, format, start, stop, time);
- rtp_session_set_timestamp_sync (priv->session, start);
-
/* push event forward */
ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
break;
@@ -1075,13 +1133,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
GstRtpSession *rtpsession;
GstRtpSessionPrivate *priv;
GstFlowReturn ret;
+ GstClockTime timestamp;
+ guint64 ntpnstime;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- ret = rtp_session_send_rtp (priv->session, buffer);
+ /* get NTP time when this packet was captured, this depends on the timestamp. */
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+ /* convert to running time using the segment start value. */
+ ntpnstime = timestamp - rtpsession->send_rtp_seg.start;
+ ntpnstime += priv->ntpnsbase;
+ } else
+ ntpnstime = -1;
+
+ ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime);
gst_object_unref (rtpsession);
@@ -1113,6 +1182,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
rtpsession->recv_rtp_src =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
"recv_rtp_src");
+ gst_pad_use_fixed_caps (rtpsession->recv_rtp_src);
gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
@@ -1142,6 +1212,7 @@ create_recv_rtcp_sink (GstRtpSession * rtpsession)
rtpsession->sync_src =
gst_pad_new_from_static_template (&rtpsession_sync_src_template,
"sync_src");
+ gst_pad_use_fixed_caps (rtpsession->sync_src);
gst_pad_set_active (rtpsession->sync_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
@@ -1172,6 +1243,7 @@ create_send_rtp_sink (GstRtpSession * rtpsession)
rtpsession->send_rtp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
"send_rtp_src");
+ gst_pad_use_fixed_caps (rtpsession->send_rtp_src);
gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
@@ -1190,6 +1262,7 @@ create_send_rtcp_src (GstRtpSession * rtpsession)
rtpsession->send_rtcp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
"send_rtcp_src");
+ gst_pad_use_fixed_caps (rtpsession->send_rtcp_src);
gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->send_rtcp_src);