diff options
Diffstat (limited to 'gst/rtpmanager/gstrtpsession.c')
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 325 |
1 files changed, 305 insertions, 20 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index cdad7e9b..03b0802b 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -39,7 +39,10 @@ #ifdef HAVE_CONFIG_H #include "config.h" #endif + +#include "gstrtpbin-marshal.h" #include "gstrtpsession.h" +#include "rtpsession.h" GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug); #define GST_CAT_DEFAULT gst_rtp_session_debug @@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src", GST_STATIC_CAPS ("application/x-rtp") ); -static GstStaticPadTemplate rtpsession_rtcp_src_template = -GST_STATIC_PAD_TEMPLATE ("rtcp_src", +static GstStaticPadTemplate rtpsession_send_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtcp_src", GST_PAD_SRC, GST_PAD_REQUEST, GST_STATIC_CAPS ("application/x-rtcp") @@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src", /* signals and args */ enum { - /* FILL ME */ + SIGNAL_REQUEST_PT_MAP, LAST_SIGNAL }; @@ -123,6 +126,31 @@ enum struct _GstRTPSessionPrivate { GMutex *lock; + RTPSession *session; + /* thread for sending out RTCP */ + GstClockID id; + gboolean stop_thread; + GThread *thread; +}; + +/* callbacks to handle actions from the session manager */ +static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); +static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, + gpointer user_data); +static GstClockTime gst_rtp_session_get_time (RTPSession * sess, + gpointer user_data); + +static RTPSessionCallbacks callbacks = { + gst_rtp_session_process_rtp, + gst_rtp_session_send_rtp, + gst_rtp_session_send_rtcp, + gst_rtp_session_clock_rate, + gst_rtp_session_get_time }; /* GObject vmethods */ @@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name); static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad); -/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */ +static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT); @@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass) gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&rtpsession_send_rtp_src_template)); gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&rtpsession_rtcp_src_template)); + gst_static_pad_template_get (&rtpsession_send_rtcp_src_template)); gst_element_class_set_details (element_class, &rtpsession_details); } @@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass) gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; + /** + * GstRTPSession::request-pt-map: + * @sess: the object which received the signal + * @pt: the pt + * + * Request the payload type as #GstCaps for @pt. + */ + gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] = + g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map), + NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1, + G_TYPE_UINT); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_session_change_state); gstelement_class->request_new_pad = @@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass) { rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession); rtpsession->priv->lock = g_mutex_new (); + rtpsession->priv->session = rtp_session_new (); + /* configure callbacks */ + rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession); } static void @@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object) rtpsession = GST_RTP_SESSION (object); g_mutex_free (rtpsession->priv->lock); + g_object_unref (rtpsession->priv->session); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id, } } +static void +rtcp_thread (GstRTPSession * rtpsession) +{ + GstClock *clock; + GstClockID id; + + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + if (clock == NULL) + return; + + GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); + + GST_RTP_SESSION_LOCK (rtpsession); + while (!rtpsession->priv->stop_thread) { + gdouble timeout; + GstClockTime target; + + timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session); + GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout); + + target = gst_clock_get_time (clock); + target += GST_SECOND * timeout; + id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target); + GST_RTP_SESSION_UNLOCK (rtpsession); + + gst_clock_id_wait (id, NULL); + + GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout"); + + /* make the session manager produce RTCP, we ignore the result. */ + rtp_session_produce_rtcp (rtpsession->priv->session); + + GST_RTP_SESSION_LOCK (rtpsession); + gst_clock_id_unref (id); + rtpsession->priv->id = NULL; + } + GST_RTP_SESSION_UNLOCK (rtpsession); + + gst_object_unref (clock); + + GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); +} + +static gboolean +start_rtcp_thread (GstRTPSession * rtpsession) +{ + GError *error = NULL; + gboolean res; + + GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread"); + + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->stop_thread = FALSE; + rtpsession->priv->thread = + g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error); + GST_RTP_SESSION_UNLOCK (rtpsession); + + if (error != NULL) { + res = FALSE; + GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message); + g_error_free (error); + } else { + res = TRUE; + } + return res; +} + +static void +stop_rtcp_thread (GstRTPSession * rtpsession) +{ + GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread"); + + GST_RTP_SESSION_LOCK (rtpsession); + rtpsession->priv->stop_thread = TRUE; + if (rtpsession->priv->id) + gst_clock_id_unschedule (rtpsession->priv->id); + GST_RTP_SESSION_UNLOCK (rtpsession); + + g_thread_join (rtpsession->priv->thread); +} + static GstStateChangeReturn gst_rtp_session_change_state (GstElement * element, GstStateChange transition) { @@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + stop_rtcp_thread (rtpsession); default: break; } @@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) res = parent_class->change_state (element, transition); switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + if (!start_rtcp_thread (rtpsession)) + goto failed_thread; + break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) break; } return res; + + /* ERRORS */ +failed_thread: + { + return GST_STATE_CHANGE_FAILURE; + } +} + +/* called when the session manager has an RTP packet ready for further + * processing */ +static GstFlowReturn +gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, + GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->recv_rtp_src) { + result = gst_pad_push (rtpsession->recv_rtp_src, buffer); + } else { + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +/* called when the session manager has an RTP packet ready for further + * sending */ +static GstFlowReturn +gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, + GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->send_rtp_src) { + result = gst_pad_push (rtpsession->send_rtp_src, buffer); + } else { + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + +/* called when the session manager has an RTCP packet ready for further + * sending */ +static GstFlowReturn +gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, + GstBuffer * buffer, gpointer user_data) +{ + GstFlowReturn result; + GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; + + rtpsession = GST_RTP_SESSION (user_data); + priv = rtpsession->priv; + + if (rtpsession->send_rtcp_src) { + result = gst_pad_push (rtpsession->send_rtcp_src, buffer); + } else { + gst_buffer_unref (buffer); + result = GST_FLOW_OK; + } + return result; +} + + +/* called when the session manager needs the clock rate */ +static gint +gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, + gpointer user_data) +{ + gint result = -1; + GstRTPSession *rtpsession; + GValue ret = { 0 }; + GValue args[2] = { {0}, {0} }; + GstCaps *caps; + const GstStructure *caps_struct; + + rtpsession = GST_RTP_SESSION_CAST (user_data); + + g_value_init (&args[0], GST_TYPE_ELEMENT); + g_value_set_object (&args[0], rtpsession); + g_value_init (&args[1], G_TYPE_UINT); + g_value_set_uint (&args[1], payload); + + g_value_init (&ret, GST_TYPE_CAPS); + g_value_set_boxed (&ret, NULL); + + g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0, + &ret); + + caps = (GstCaps *) g_value_get_boxed (&ret); + if (!caps) + goto no_caps; + + caps_struct = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (caps_struct, "clock-rate", &result)) + goto no_clock_rate; + + return result; + + /* ERRORS */ +no_caps: + { + GST_DEBUG_OBJECT (rtpsession, "could not get caps"); + return -1; + } +no_clock_rate: + { + GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps"); + return -1; + } +} + +/* called when the session manager needs the time of clock */ +static GstClockTime +gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) +{ + GstClockTime result; + GstRTPSession *rtpsession; + GstClock *clock; + + rtpsession = GST_RTP_SESSION_CAST (user_data); + + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + if (clock) { + result = gst_clock_get_time (clock); + gst_object_unref (clock); + } else + result = GST_CLOCK_TIME_NONE; + + return result; } static GstFlowReturn gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; gboolean ret = FALSE; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received event %s", GST_EVENT_TYPE_NAME (event)); @@ -305,14 +580,15 @@ static GstFlowReturn gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; GstFlowReturn ret; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - /* FIXME, do something */ - ret = gst_pad_push (rtpsession->recv_rtp_src, buffer); + ret = rtp_session_process_rtp (priv->session, buffer); gst_object_unref (rtpsession); @@ -323,9 +599,11 @@ static GstFlowReturn gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; gboolean ret = FALSE; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received event %s", GST_EVENT_TYPE_NAME (event)); @@ -347,14 +625,15 @@ static GstFlowReturn gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; GstFlowReturn ret; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; - /* FIXME, do something */ GST_DEBUG_OBJECT (rtpsession, "received RTCP packet"); - ret = gst_pad_push (rtpsession->sync_src, buffer); + ret = rtp_session_process_rtcp (priv->session, buffer); gst_object_unref (rtpsession); @@ -365,9 +644,11 @@ static GstFlowReturn gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; gboolean ret = FALSE; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received event"); @@ -388,14 +669,15 @@ static GstFlowReturn gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) { GstRTPSession *rtpsession; + GstRTPSessionPrivate *priv; GstFlowReturn ret; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + priv = rtpsession->priv; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - /* FIXME, do something */ - ret = gst_pad_push (rtpsession->send_rtp_src, buffer); + ret = rtp_session_send_rtp (priv->session, buffer); gst_object_unref (rtpsession); @@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession) * RTCP packets. */ static GstPad * -create_rtcp_src (GstRTPSession * rtpsession) +create_send_rtcp_src (GstRTPSession * rtpsession) { GST_DEBUG_OBJECT (rtpsession, "creating pad"); - rtpsession->rtcp_src = - gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL); - gst_pad_set_active (rtpsession->rtcp_src, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src); + rtpsession->send_rtcp_src = + gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, + NULL); + gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), + rtpsession->send_rtcp_src); - return rtpsession->rtcp_src; + return rtpsession->send_rtcp_src; } static GstPad * @@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element, goto exists; result = create_send_rtp_sink (rtpsession); - } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) { - if (rtpsession->rtcp_src != NULL) + } else if (templ == gst_element_class_get_pad_template (klass, + "send_rtcp_src")) { + if (rtpsession->send_rtcp_src != NULL) goto exists; - result = create_rtcp_src (rtpsession); + result = create_send_rtcp_src (rtpsession); } else goto wrong_template; |