diff options
Diffstat (limited to 'gst/rtpmanager/gstrtpsession.c')
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 229 |
1 files changed, 151 insertions, 78 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 985a3713..e716682c 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -132,6 +132,8 @@ #include "config.h" #endif +#include <gst/rtp/gstrtpbuffer.h> + #include "gstrtpbin-marshal.h" #include "gstrtpsession.h" #include "rtpsession.h" @@ -214,7 +216,8 @@ enum enum { - PROP_0 + PROP_0, + PROP_NTP_NS_BASE }; #define GST_RTP_SESSION_GET_PRIVATE(obj) \ @@ -234,8 +237,10 @@ struct _GstRtpSessionPrivate GThread *thread; /* caps mapping */ - guint8 pt; - gint clock_rate; + GHashTable *ptmap; + + /* NTP base time */ + guint64 ntpnsbase; }; /* callbacks to handle actions from the session manager */ @@ -245,18 +250,18 @@ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gpointer user_data); -static GstClockTime gst_rtp_session_get_time (RTPSession * sess, - gpointer user_data); static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); static RTPSessionCallbacks callbacks = { gst_rtp_session_process_rtp, gst_rtp_session_send_rtp, gst_rtp_session_send_rtcp, + gst_rtp_session_sync_rtcp, gst_rtp_session_clock_rate, - gst_rtp_session_get_time, gst_rtp_session_reconsider }; @@ -363,6 +368,7 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; + /** * GstRtpSession::request-pt-map: * @sess: the object which received the signal @@ -490,6 +496,7 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) (GCallback) on_bye_timeout, rtpsession); g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); + rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); } static void @@ -513,6 +520,9 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, rtpsession = GST_RTP_SESSION (object); switch (prop_id) { + case PROP_NTP_NS_BASE: + rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -528,26 +538,51 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, rtpsession = GST_RTP_SESSION (object); switch (prop_id) { + case PROP_NTP_NS_BASE: + g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static guint64 +get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) +{ + guint64 ntpnstime; + + if (clock) { + /* get current NTP time */ + ntpnstime = gst_clock_get_time (clock); + /* convert to running time */ + ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); + /* add NTP base offset */ + ntpnstime += rtpsession->priv->ntpnsbase; + } else + ntpnstime = -1; + + return ntpnstime; +} + static void rtcp_thread (GstRtpSession * rtpsession) { - GstClock *clock; + GstClock *sysclock, *clock; GstClockID id; GstClockTime current_time; GstClockTime next_timeout; + guint64 ntpnstime; - /* RTCP timeouts we use the system clock */ - clock = gst_system_clock_obtain (); - if (clock == NULL) - goto no_clock; + /* for RTCP timeouts we use the system clock */ + sysclock = gst_system_clock_obtain (); + if (sysclock == NULL) + goto no_sysclock; + + /* to get the current NTP time, we use the pipeline clock */ + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - current_time = gst_clock_get_time (clock); + current_time = gst_clock_get_time (sysclock); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); @@ -568,7 +603,7 @@ rtcp_thread (GstRtpSession * rtpsession) break; id = rtpsession->priv->id = - gst_clock_new_single_shot_id (clock, next_timeout); + gst_clock_new_single_shot_id (sysclock, next_timeout); GST_RTP_SESSION_UNLOCK (rtpsession); res = gst_clock_id_wait (id, NULL); @@ -581,7 +616,10 @@ rtcp_thread (GstRtpSession * rtpsession) break; /* update current time */ - current_time = gst_clock_get_time (clock); + current_time = gst_clock_get_time (sysclock); + + /* get current NTP time */ + ntpnstime = get_current_ntp_ns_time (rtpsession, clock); /* we get unlocked because we need to perform reconsideration, don't perform * the timeout but get a new reporting estimate. */ @@ -590,18 +628,18 @@ rtcp_thread (GstRtpSession * rtpsession) /* perform actions, we ignore result. Release lock because it might push. */ GST_RTP_SESSION_UNLOCK (rtpsession); - rtp_session_on_timeout (rtpsession->priv->session, current_time); + rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime); GST_RTP_SESSION_LOCK (rtpsession); } GST_RTP_SESSION_UNLOCK (rtpsession); - gst_object_unref (clock); + gst_object_unref (sysclock); GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); return; /* ERRORS */ -no_clock: +no_sysclock: { GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL), ("Could not get system clock")); @@ -662,7 +700,6 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - priv->clock_rate = -1; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; @@ -677,17 +714,9 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - { - GstClockTime base_time; - - base_time = GST_ELEMENT_CAST (rtpsession)->base_time; - - rtp_session_set_base_time (priv->session, base_time); - if (!start_rtcp_thread (rtpsession)) goto failed_thread; break; - } case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -774,6 +803,15 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, priv = rtpsession->priv; if (rtpsession->send_rtcp_src) { + GstCaps *caps; + + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->send_rtcp_src, caps); + gst_caps_unref (caps); + } + gst_buffer_set_caps (buffer, caps); GST_DEBUG_OBJECT (rtpsession, "sending RTCP"); result = gst_pad_push (rtpsession->send_rtcp_src, buffer); } else { @@ -784,30 +822,59 @@ gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, return result; } -static gboolean -gst_rtp_session_parse_caps (GstRtpSession * rtpsession, GstCaps * caps) +/* called when the session manager has an SR RTCP packet ready for handling + * inter stream synchronisation */ +static GstFlowReturn +gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data) { + GstFlowReturn result; + GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; - const GstStructure *caps_struct; + rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; - GST_DEBUG_OBJECT (rtpsession, "parsing caps"); + if (rtpsession->sync_src) { + GstCaps *caps; - caps_struct = gst_caps_get_structure (caps, 0); - if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate)) - goto no_clock_rate; + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->sync_src, caps); + gst_caps_unref (caps); + } + gst_buffer_set_caps (buffer, caps); + GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP"); + result = gst_pad_push (rtpsession->sync_src, buffer); + } else { + GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad"); + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +static void +gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps) +{ + GstRtpSessionPrivate *priv; + const GstStructure *s; + gint payload; - GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", priv->clock_rate); + priv = rtpsession->priv; - return TRUE; + GST_DEBUG_OBJECT (rtpsession, "parsing caps"); - /* ERRORS */ -no_clock_rate: - { - GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); - return FALSE; - } + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "payload", &payload)) + return; + + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + return; + + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps); } /* called when the session manager needs the clock rate */ @@ -821,13 +888,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, GValue ret = { 0 }; GValue args[2] = { {0}, {0} }; GstCaps *caps; + const GstStructure *s; rtpsession = GST_RTP_SESSION_CAST (user_data); priv = rtpsession->priv; - /* if we have it, return it */ - if (priv->clock_rate != -1) - return priv->clock_rate; + GST_RTP_SESSION_LOCK (rtpsession); + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + goto done; g_value_init (&args[0], GST_TYPE_ELEMENT); g_value_set_object (&args[0], rtpsession); @@ -844,10 +913,16 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, if (!caps) goto no_caps; - if (!gst_rtp_session_parse_caps (rtpsession, caps)) - goto parse_failed; + gst_rtp_session_cache_caps (rtpsession, caps); + + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "clock-rate", &result)) + goto no_clock_rate; + + GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result); - result = priv->clock_rate; +done: + GST_RTP_SESSION_UNLOCK (rtpsession); return result; @@ -855,35 +930,15 @@ gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, no_caps: { GST_DEBUG_OBJECT (rtpsession, "could not get caps"); - return -1; + goto done; } -parse_failed: +no_clock_rate: { - GST_DEBUG_OBJECT (rtpsession, "failed to parse caps"); - return -1; + GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); + goto done; } } -/* called when the session manager needs the time of clock */ -static GstClockTime -gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) -{ - GstClockTime result; - GstRtpSession *rtpsession; - GstClock *clock; - - rtpsession = GST_RTP_SESSION_CAST (user_data); - - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - if (clock) { - result = gst_clock_get_time (clock); - gst_object_unref (clock); - } else - result = GST_CLOCK_TIME_NONE; - - return result; -} - /* called when the session manager asks us to reconsider the timeout */ static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data) @@ -925,18 +980,19 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) static gboolean gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps) { - gboolean res; GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; - res = gst_rtp_session_parse_caps (rtpsession, caps); + GST_RTP_SESSION_LOCK (rtpsession); + gst_rtp_session_cache_caps (rtpsession, caps); + GST_RTP_SESSION_UNLOCK (rtpsession); gst_object_unref (rtpsession); - return res; + return TRUE; } /* receive a packet from a sender, send it to the RTP session manager and @@ -948,13 +1004,17 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstFlowReturn ret; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_process_rtp (priv->session, buffer); + ntpnstime = + get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); + + ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); gst_object_unref (rtpsession); @@ -1051,8 +1111,6 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) gst_segment_set_newsegment_full (segment, update, rate, arate, format, start, stop, time); - rtp_session_set_timestamp_sync (priv->session, start); - /* push event forward */ ret = gst_pad_push_event (rtpsession->send_rtp_src, event); break; @@ -1075,13 +1133,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; GstFlowReturn ret; + GstClockTime timestamp; + guint64 ntpnstime; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_send_rtp (priv->session, buffer); + /* get NTP time when this packet was captured, this depends on the timestamp. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* convert to running time using the segment start value. */ + ntpnstime = timestamp - rtpsession->send_rtp_seg.start; + ntpnstime += priv->ntpnsbase; + } else + ntpnstime = -1; + + ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime); gst_object_unref (rtpsession); @@ -1113,6 +1182,7 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); @@ -1142,6 +1212,7 @@ create_recv_rtcp_sink (GstRtpSession * rtpsession) rtpsession->sync_src = gst_pad_new_from_static_template (&rtpsession_sync_src_template, "sync_src"); + gst_pad_use_fixed_caps (rtpsession->sync_src); gst_pad_set_active (rtpsession->sync_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); @@ -1172,6 +1243,7 @@ create_send_rtp_sink (GstRtpSession * rtpsession) rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, "send_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtp_src); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); @@ -1190,6 +1262,7 @@ create_send_rtcp_src (GstRtpSession * rtpsession) rtpsession->send_rtcp_src = gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, "send_rtcp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtcp_src); gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtcp_src); |