summaryrefslogtreecommitdiffstats
path: root/gst/rtpmanager/gstrtpsession.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtpmanager/gstrtpsession.c')
-rw-r--r--gst/rtpmanager/gstrtpsession.c325
1 files changed, 305 insertions, 20 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
index cdad7e9b..03b0802b 100644
--- a/gst/rtpmanager/gstrtpsession.c
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -39,7 +39,10 @@
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
+
+#include "gstrtpbin-marshal.h"
#include "gstrtpsession.h"
+#include "rtpsession.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
#define GST_CAT_DEFAULT gst_rtp_session_debug
@@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src",
GST_STATIC_CAPS ("application/x-rtp")
);
-static GstStaticPadTemplate rtpsession_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src",
+static GstStaticPadTemplate rtpsession_send_rtcp_src_template =
+GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",
GST_PAD_SRC,
GST_PAD_REQUEST,
GST_STATIC_CAPS ("application/x-rtcp")
@@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",
/* signals and args */
enum
{
- /* FILL ME */
+ SIGNAL_REQUEST_PT_MAP,
LAST_SIGNAL
};
@@ -123,6 +126,31 @@ enum
struct _GstRTPSessionPrivate
{
GMutex *lock;
+ RTPSession *session;
+ /* thread for sending out RTCP */
+ GstClockID id;
+ gboolean stop_thread;
+ GThread *thread;
+};
+
+/* callbacks to handle actions from the session manager */
+static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess,
+ RTPSource * src, GstBuffer * buffer, gpointer user_data);
+static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
+ gpointer user_data);
+static GstClockTime gst_rtp_session_get_time (RTPSession * sess,
+ gpointer user_data);
+
+static RTPSessionCallbacks callbacks = {
+ gst_rtp_session_process_rtp,
+ gst_rtp_session_send_rtp,
+ gst_rtp_session_send_rtcp,
+ gst_rtp_session_clock_rate,
+ gst_rtp_session_get_time
};
/* GObject vmethods */
@@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name);
static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad);
-/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */
+static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };
GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT);
@@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass)
gst_element_class_add_pad_template (element_class,
gst_static_pad_template_get (&rtpsession_send_rtp_src_template));
gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&rtpsession_rtcp_src_template));
+ gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));
gst_element_class_set_details (element_class, &rtpsession_details);
}
@@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
gobject_class->set_property = gst_rtp_session_set_property;
gobject_class->get_property = gst_rtp_session_get_property;
+ /**
+ * GstRTPSession::request-pt-map:
+ * @sess: the object which received the signal
+ * @pt: the pt
+ *
+ * Request the payload type as #GstCaps for @pt.
+ */
+ gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] =
+ g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map),
+ NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1,
+ G_TYPE_UINT);
+
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);
gstelement_class->request_new_pad =
@@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)
{
rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);
rtpsession->priv->lock = g_mutex_new ();
+ rtpsession->priv->session = rtp_session_new ();
+ /* configure callbacks */
+ rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);
}
static void
@@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object)
rtpsession = GST_RTP_SESSION (object);
g_mutex_free (rtpsession->priv->lock);
+ g_object_unref (rtpsession->priv->session);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,
}
}
+static void
+rtcp_thread (GstRTPSession * rtpsession)
+{
+ GstClock *clock;
+ GstClockID id;
+
+ clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+ if (clock == NULL)
+ return;
+
+ GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread");
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ while (!rtpsession->priv->stop_thread) {
+ gdouble timeout;
+ GstClockTime target;
+
+ timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session);
+ GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout);
+
+ target = gst_clock_get_time (clock);
+ target += GST_SECOND * timeout;
+ id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ gst_clock_id_wait (id, NULL);
+
+ GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout");
+
+ /* make the session manager produce RTCP, we ignore the result. */
+ rtp_session_produce_rtcp (rtpsession->priv->session);
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ gst_clock_id_unref (id);
+ rtpsession->priv->id = NULL;
+ }
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ gst_object_unref (clock);
+
+ GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread");
+}
+
+static gboolean
+start_rtcp_thread (GstRTPSession * rtpsession)
+{
+ GError *error = NULL;
+ gboolean res;
+
+ GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread");
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ rtpsession->priv->stop_thread = FALSE;
+ rtpsession->priv->thread =
+ g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ if (error != NULL) {
+ res = FALSE;
+ GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message);
+ g_error_free (error);
+ } else {
+ res = TRUE;
+ }
+ return res;
+}
+
+static void
+stop_rtcp_thread (GstRTPSession * rtpsession)
+{
+ GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread");
+
+ GST_RTP_SESSION_LOCK (rtpsession);
+ rtpsession->priv->stop_thread = TRUE;
+ if (rtpsession->priv->id)
+ gst_clock_id_unschedule (rtpsession->priv->id);
+ GST_RTP_SESSION_UNLOCK (rtpsession);
+
+ g_thread_join (rtpsession->priv->thread);
+}
+
static GstStateChangeReturn
gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
{
@@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ stop_rtcp_thread (rtpsession);
default:
break;
}
@@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
res = parent_class->change_state (element, transition);
switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ if (!start_rtcp_thread (rtpsession))
+ goto failed_thread;
+ break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
@@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
break;
}
return res;
+
+ /* ERRORS */
+failed_thread:
+ {
+ return GST_STATE_CHANGE_FAILURE;
+ }
+}
+
+/* called when the session manager has an RTP packet ready for further
+ * processing */
+static GstFlowReturn
+gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
+ GstBuffer * buffer, gpointer user_data)
+{
+ GstFlowReturn result;
+ GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
+
+ rtpsession = GST_RTP_SESSION (user_data);
+ priv = rtpsession->priv;
+
+ if (rtpsession->recv_rtp_src) {
+ result = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+/* called when the session manager has an RTP packet ready for further
+ * sending */
+static GstFlowReturn
+gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src,
+ GstBuffer * buffer, gpointer user_data)
+{
+ GstFlowReturn result;
+ GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
+
+ rtpsession = GST_RTP_SESSION (user_data);
+ priv = rtpsession->priv;
+
+ if (rtpsession->send_rtp_src) {
+ result = gst_pad_push (rtpsession->send_rtp_src, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+/* called when the session manager has an RTCP packet ready for further
+ * sending */
+static GstFlowReturn
+gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src,
+ GstBuffer * buffer, gpointer user_data)
+{
+ GstFlowReturn result;
+ GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
+
+ rtpsession = GST_RTP_SESSION (user_data);
+ priv = rtpsession->priv;
+
+ if (rtpsession->send_rtcp_src) {
+ result = gst_pad_push (rtpsession->send_rtcp_src, buffer);
+ } else {
+ gst_buffer_unref (buffer);
+ result = GST_FLOW_OK;
+ }
+ return result;
+}
+
+
+/* called when the session manager needs the clock rate */
+static gint
+gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload,
+ gpointer user_data)
+{
+ gint result = -1;
+ GstRTPSession *rtpsession;
+ GValue ret = { 0 };
+ GValue args[2] = { {0}, {0} };
+ GstCaps *caps;
+ const GstStructure *caps_struct;
+
+ rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+ g_value_init (&args[0], GST_TYPE_ELEMENT);
+ g_value_set_object (&args[0], rtpsession);
+ g_value_init (&args[1], G_TYPE_UINT);
+ g_value_set_uint (&args[1], payload);
+
+ g_value_init (&ret, GST_TYPE_CAPS);
+ g_value_set_boxed (&ret, NULL);
+
+ g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0,
+ &ret);
+
+ caps = (GstCaps *) g_value_get_boxed (&ret);
+ if (!caps)
+ goto no_caps;
+
+ caps_struct = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_get_int (caps_struct, "clock-rate", &result))
+ goto no_clock_rate;
+
+ return result;
+
+ /* ERRORS */
+no_caps:
+ {
+ GST_DEBUG_OBJECT (rtpsession, "could not get caps");
+ return -1;
+ }
+no_clock_rate:
+ {
+ GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps");
+ return -1;
+ }
+}
+
+/* called when the session manager needs the time of clock */
+static GstClockTime
+gst_rtp_session_get_time (RTPSession * sess, gpointer user_data)
+{
+ GstClockTime result;
+ GstRTPSession *rtpsession;
+ GstClock *clock;
+
+ rtpsession = GST_RTP_SESSION_CAST (user_data);
+
+ clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession));
+ if (clock) {
+ result = gst_clock_get_time (clock);
+ gst_object_unref (clock);
+ } else
+ result = GST_CLOCK_TIME_NONE;
+
+ return result;
}
static GstFlowReturn
gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
@@ -305,14 +580,15 @@ static GstFlowReturn
gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- /* FIXME, do something */
- ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
+ ret = rtp_session_process_rtp (priv->session, buffer);
gst_object_unref (rtpsession);
@@ -323,9 +599,11 @@ static GstFlowReturn
gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event %s",
GST_EVENT_TYPE_NAME (event));
@@ -347,14 +625,15 @@ static GstFlowReturn
gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
- /* FIXME, do something */
GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
- ret = gst_pad_push (rtpsession->sync_src, buffer);
+ ret = rtp_session_process_rtcp (priv->session, buffer);
gst_object_unref (rtpsession);
@@ -365,9 +644,11 @@ static GstFlowReturn
gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
gboolean ret = FALSE;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received event");
@@ -388,14 +669,15 @@ static GstFlowReturn
gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
{
GstRTPSession *rtpsession;
+ GstRTPSessionPrivate *priv;
GstFlowReturn ret;
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ priv = rtpsession->priv;
GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
- /* FIXME, do something */
- ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
+ ret = rtp_session_send_rtp (priv->session, buffer);
gst_object_unref (rtpsession);
@@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession)
* RTCP packets.
*/
static GstPad *
-create_rtcp_src (GstRTPSession * rtpsession)
+create_send_rtcp_src (GstRTPSession * rtpsession)
{
GST_DEBUG_OBJECT (rtpsession, "creating pad");
- rtpsession->rtcp_src =
- gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
- gst_pad_set_active (rtpsession->rtcp_src, TRUE);
- gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
+ rtpsession->send_rtcp_src =
+ gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template,
+ NULL);
+ gst_pad_set_active (rtpsession->send_rtcp_src, TRUE);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
+ rtpsession->send_rtcp_src);
- return rtpsession->rtcp_src;
+ return rtpsession->send_rtcp_src;
}
static GstPad *
@@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element,
goto exists;
result = create_send_rtp_sink (rtpsession);
- } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) {
- if (rtpsession->rtcp_src != NULL)
+ } else if (templ == gst_element_class_get_pad_template (klass,
+ "send_rtcp_src")) {
+ if (rtpsession->send_rtcp_src != NULL)
goto exists;
- result = create_rtcp_src (rtpsession);
+ result = create_send_rtcp_src (rtpsession);
} else
goto wrong_template;