diff options
Diffstat (limited to 'gst/rtpmanager/gstrtpsession.c')
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 138 |
1 files changed, 123 insertions, 15 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 87948a4b..70833655 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -214,6 +214,8 @@ enum LAST_SIGNAL }; +#define DEFAULT_NTP_NS_BASE 0 + enum { PROP_0, @@ -462,6 +464,11 @@ gst_rtp_session_class_init (GstRtpSessionClass * klass) G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSessionClass, on_timeout), NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); + g_object_class_install_property (gobject_class, PROP_NTP_NS_BASE, + g_param_spec_uint64 ("ntp-ns-base", "NTP base time", + "The NTP base time corresponding to running_time 0", 0, + G_MAXUINT64, DEFAULT_NTP_NS_BASE, G_PARAM_READWRITE)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -497,6 +504,9 @@ gst_rtp_session_init (GstRtpSession * rtpsession, GstRtpSessionClass * klass) g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); + + gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); + gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED); } static void @@ -521,7 +531,11 @@ gst_rtp_session_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_NTP_NS_BASE: + GST_OBJECT_LOCK (rtpsession); rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); + GST_DEBUG_OBJECT (rtpsession, "setting NTP base to %" GST_TIME_FORMAT, + GST_TIME_ARGS (rtpsession->priv->ntpnsbase)); + GST_OBJECT_UNLOCK (rtpsession); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -539,7 +553,9 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_NTP_NS_BASE: + GST_OBJECT_LOCK (rtpsession); g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); + GST_OBJECT_UNLOCK (rtpsession); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -548,19 +564,31 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, } static guint64 -get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) +get_current_ntp_ns_time (GstRtpSession * rtpsession) { guint64 ntpnstime; + GstClock *clock; + GstClockTime base_time, ntpnsbase; + + GST_OBJECT_LOCK (rtpsession); + if ((clock = GST_ELEMENT_CLOCK (rtpsession))) { + base_time = GST_ELEMENT_CAST (rtpsession)->base_time; + ntpnsbase = rtpsession->priv->ntpnsbase; + gst_object_ref (clock); + GST_OBJECT_UNLOCK (rtpsession); - if (clock) { /* get current NTP time */ ntpnstime = gst_clock_get_time (clock); /* convert to running time */ - ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); + ntpnstime -= base_time; /* add NTP base offset */ - ntpnstime += rtpsession->priv->ntpnsbase; - } else + ntpnstime += ntpnsbase; + + gst_object_unref (clock); + } else { + GST_OBJECT_UNLOCK (rtpsession); ntpnstime = -1; + } return ntpnstime; } @@ -568,7 +596,7 @@ get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) static void rtcp_thread (GstRtpSession * rtpsession) { - GstClock *sysclock, *clock; + GstClock *sysclock; GstClockID id; GstClockTime current_time; GstClockTime next_timeout; @@ -579,9 +607,6 @@ rtcp_thread (GstRtpSession * rtpsession) if (sysclock == NULL) goto no_sysclock; - /* to get the current NTP time, we use the pipeline clock */ - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - current_time = gst_clock_get_time (sysclock); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); @@ -619,7 +644,7 @@ rtcp_thread (GstRtpSession * rtpsession) current_time = gst_clock_get_time (sysclock); /* get current NTP time */ - ntpnstime = get_current_ntp_ns_time (rtpsession, clock); + ntpnstime = get_current_ntp_ns_time (rtpsession); /* we get unlocked because we need to perform reconsideration, don't perform * the timeout but get a new reporting estimate. */ @@ -969,6 +994,41 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) GST_EVENT_TYPE_NAME (event)); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_STOP: + gst_segment_init (&rtpsession->recv_rtp_seg, GST_FORMAT_UNDEFINED); + break; + case GST_EVENT_NEWSEGMENT: + { + gboolean update; + gdouble rate, arate; + GstFormat format; + gint64 start, stop, time; + GstSegment *segment; + + segment = &rtpsession->recv_rtp_seg; + + /* the newsegment event is needed to convert the RTP timestamp to + * running_time, which is needed to generate a mapping from RTP to NTP + * timestamps in SR reports */ + gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, + &start, &stop, &time); + + GST_DEBUG_OBJECT (rtpsession, + "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, " + "format GST_FORMAT_TIME, " + "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT + ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT, + update, rate, arate, GST_TIME_ARGS (segment->start), + GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time), + GST_TIME_ARGS (segment->accum)); + + gst_segment_set_newsegment_full (segment, update, rate, + arate, format, start, stop, time); + + /* push event forward */ + ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); + break; + } default: ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); break; @@ -976,6 +1036,31 @@ gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) gst_object_unref (rtpsession); return ret; + +} +static GList * +gst_rtp_session_internal_links (GstPad * pad) +{ + GstRtpSession *rtpsession; + GstRtpSessionPrivate *priv; + GList *res = NULL; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; + + if (pad == rtpsession->recv_rtp_src) { + res = g_list_prepend (res, rtpsession->recv_rtp_sink); + } else if (pad == rtpsession->recv_rtp_sink) { + res = g_list_prepend (res, rtpsession->recv_rtp_src); + } else if (pad == rtpsession->send_rtp_src) { + res = g_list_prepend (res, rtpsession->send_rtp_sink); + } else if (pad == rtpsession->send_rtp_sink) { + res = g_list_prepend (res, rtpsession->send_rtp_src); + } + + gst_object_unref (rtpsession); + + return res; } static gboolean @@ -1006,14 +1091,25 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) GstRtpSessionPrivate *priv; GstFlowReturn ret; guint64 ntpnstime; + GstClockTime timestamp; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ntpnstime = - get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); + /* get NTP time when this packet was captured, this depends on the timestamp. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* convert to running time using the segment values */ + ntpnstime = + gst_segment_to_running_time (&rtpsession->recv_rtp_seg, GST_FORMAT_TIME, + timestamp); + /* add constant to convert running time to NTP time */ + ntpnstime += priv->ntpnsbase; + } else { + ntpnstime = get_current_ntp_ns_time (rtpsession); + } ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); @@ -1084,6 +1180,9 @@ gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) GST_DEBUG_OBJECT (rtpsession, "received event"); switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_STOP: + gst_segment_init (&rtpsession->send_rtp_seg, GST_FORMAT_UNDEFINED); + break; case GST_EVENT_NEWSEGMENT: { gboolean update; @@ -1146,7 +1245,10 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) timestamp = GST_BUFFER_TIMESTAMP (buffer); if (GST_CLOCK_TIME_IS_VALID (timestamp)) { /* convert to running time using the segment start value. */ - ntpnstime = timestamp - rtpsession->send_rtp_seg.start; + ntpnstime = + gst_segment_to_running_time (&rtpsession->send_rtp_seg, GST_FORMAT_TIME, + timestamp); + /* convert to NTP time by adding the NTP base */ ntpnstime += priv->ntpnsbase; } else ntpnstime = -1; @@ -1175,6 +1277,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) gst_rtp_session_event_recv_rtp_sink); gst_pad_set_setcaps_function (rtpsession->recv_rtp_sink, gst_rtp_session_sink_setcaps); + gst_pad_set_internal_link_function (rtpsession->recv_rtp_sink, + gst_rtp_session_internal_links); gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_sink); @@ -1183,6 +1287,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession) rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); + gst_pad_set_internal_link_function (rtpsession->recv_rtp_src, + gst_rtp_session_internal_links); gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); @@ -1235,8 +1341,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession) gst_rtp_session_chain_send_rtp); gst_pad_set_event_function (rtpsession->send_rtp_sink, gst_rtp_session_event_send_rtp_sink); - gst_pad_set_setcaps_function (rtpsession->send_rtp_sink, - gst_rtp_session_sink_setcaps); + gst_pad_set_internal_link_function (rtpsession->send_rtp_sink, + gst_rtp_session_internal_links); gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_sink); @@ -1245,6 +1351,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession) gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, "send_rtp_src"); gst_pad_use_fixed_caps (rtpsession->send_rtp_src); + gst_pad_set_internal_link_function (rtpsession->send_rtp_src, + gst_rtp_session_internal_links); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); |