summaryrefslogtreecommitdiffstats
path: root/gst
diff options
context:
space:
mode:
Diffstat (limited to 'gst')
-rw-r--r--gst/rtpmanager/gstrtpbin.c285
-rw-r--r--gst/rtpmanager/gstrtpbin.h4
-rw-r--r--gst/rtpmanager/gstrtpmanager.c4
-rw-r--r--gst/rtpmanager/gstrtpptdemux.c50
-rw-r--r--gst/rtpmanager/gstrtpsession.c102
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.c50
-rw-r--r--gst/rtpmanager/gstrtpssrcdemux.h7
7 files changed, 446 insertions, 56 deletions
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c
index c1ec7130..d63321f6 100644
--- a/gst/rtpmanager/gstrtpbin.c
+++ b/gst/rtpmanager/gstrtpbin.c
@@ -43,6 +43,10 @@
#include "gstrtpbin.h"
+GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
+#define GST_CAT_DEFAULT gst_rtp_bin_debug
+
+
/* elementfactory information */
static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
"Filter/Editor/Video",
@@ -98,6 +102,7 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
struct _GstRTPBinPrivate
{
+ guint foo;
};
/* signals and args */
@@ -113,30 +118,72 @@ enum
};
/* helper objects */
-typedef struct
+typedef struct _GstRTPBinSession GstRTPBinSession;
+typedef struct _GstRTPBinStream GstRTPBinStream;
+typedef struct _GstRTPBinClient GstRTPBinClient;
+
+/* Manages the RTP stream for one SSRC.
+ *
+ * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer.
+ * If we see an SDES RTCP packet that links multiple SSRCs together based on a
+ * common CNAME, we create a GstRTPBinClient structure to group the SSRCs
+ * together (see below).
+ */
+struct _GstRTPBinStream
+{
+ /* the SSRC of this stream */
+ guint32 ssrc;
+ /* parent bin */
+ GstRTPBin *bin;
+ /* the session this SSRC belongs to */
+ GstRTPBinSession *session;
+ /* the jitterbuffer of the SSRC */
+ GstElement *buffer;
+ /* the PT demuxer of the SSRC */
+ GstElement *demux;
+ gulong demux_newpad_sig;
+};
+
+/* Manages the receiving end of the packets.
+ *
+ * There is one such structure for each RTP session (audio/video/...).
+ * We get the RTP/RTCP packets and stuff them into the session manager. From
+ * there they are pushed into an SSRC demuxer that splits the stream based on
+ * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
+ * the GstRTPBinStream above).
+ */
+struct _GstRTPBinSession
{
/* session id */
gint id;
+ /* the parent bin */
+ GstRTPBin *bin;
/* the session element */
GstElement *session;
/* the SSRC demuxer */
- GstElement *ssrcdemux;
+ GstElement *demux;
+ gulong demux_newpad_sig;
+
+ /* list of GstRTPBinStream */
+ GSList *streams;
/* the pads of the session */
GstPad *recv_rtp_sink;
+ GstPad *recv_rtp_src;
GstPad *recv_rtcp_sink;
+ GstPad *recv_rtcp_src;
GstPad *send_rtp_sink;
+ GstPad *send_rtp_src;
GstPad *rtcp_src;
-
-} GstRTPBinSession;
+};
/* find a session with the given id */
static GstRTPBinSession *
find_session_by_id (GstRTPBin * rtpbin, gint id)
{
- GList *walk;
+ GSList *walk;
- for (walk = rtpbin->sessions; walk; walk = g_list_next (walk)) {
+ for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
GstRTPBinSession *sess = (GstRTPBinSession *) walk->data;
if (sess->id == id)
@@ -150,14 +197,25 @@ static GstRTPBinSession *
create_session (GstRTPBin * rtpbin, gint id)
{
GstRTPBinSession *sess;
- GstElement *elem;
+ GstElement *elem, *demux;
if (!(elem = gst_element_factory_make ("rtpsession", NULL)))
goto no_session;
+ if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL)))
+ goto no_demux;
+
sess = g_new0 (GstRTPBinSession, 1);
sess->id = id;
+ sess->bin = rtpbin;
sess->session = elem;
+ sess->demux = demux;
+ rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);
+
+ gst_bin_add (GST_BIN_CAST (rtpbin), elem);
+ gst_element_set_state (elem, GST_STATE_PLAYING);
+ gst_bin_add (GST_BIN_CAST (rtpbin), demux);
+ gst_element_set_state (demux, GST_STATE_PLAYING);
return sess;
@@ -167,8 +225,85 @@ no_session:
g_warning ("rtpbin: could not create rtpsession element");
return NULL;
}
+no_demux:
+ {
+ gst_object_unref (elem);
+ g_warning ("rtpbin: could not create rtpssrcdemux element");
+ return NULL;
+ }
+}
+
+#if 0
+static GstRTPBinStream *
+find_stream_by_ssrc (GstRTPBinSession * session, guint32 ssrc)
+{
+ GSList *walk;
+
+ for (walk = session->streams; walk; walk = g_slist_next (walk)) {
+ GstRTPBinStream *stream = (GstRTPBinStream *) walk->data;
+
+ if (stream->ssrc == ssrc)
+ return stream;
+ }
+ return NULL;
+}
+#endif
+
+static GstRTPBinStream *
+create_stream (GstRTPBinSession * session, guint32 ssrc)
+{
+ GstElement *buffer, *demux;
+ GstRTPBinStream *stream;
+
+ if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL)))
+ goto no_jitterbuffer;
+
+ if (!(demux = gst_element_factory_make ("rtpptdemux", NULL)))
+ goto no_demux;
+
+ stream = g_new0 (GstRTPBinStream, 1);
+ stream->ssrc = ssrc;
+ stream->bin = session->bin;
+ stream->session = session;
+ stream->buffer = buffer;
+ stream->demux = demux;
+ session->streams = g_slist_prepend (session->streams, stream);
+
+ gst_bin_add (GST_BIN_CAST (session->bin), buffer);
+ gst_element_set_state (buffer, GST_STATE_PLAYING);
+ gst_bin_add (GST_BIN_CAST (session->bin), demux);
+ gst_element_set_state (demux, GST_STATE_PLAYING);
+
+ /* link stuff */
+ gst_element_link (buffer, demux);
+
+ return stream;
+
+ /* ERRORS */
+no_jitterbuffer:
+ {
+ g_warning ("rtpbin: could not create rtpjitterbuffer element");
+ return NULL;
+ }
+no_demux:
+ {
+ gst_object_unref (buffer);
+ g_warning ("rtpbin: could not create rtpptdemux element");
+ return NULL;
+ }
}
+/* Manages the RTP streams that come from one client and should therefore be
+ * synchronized.
+ */
+struct _GstRTPBinClient
+{
+ /* the common CNAME for the streams */
+ gchar *cname;
+ /* the streams */
+ GSList *streams;
+};
+
/* GObject vmethods */
static void gst_rtp_bin_finalize (GObject * object);
static void gst_rtp_bin_set_property (GObject * object, guint prop_id,
@@ -230,6 +365,8 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass)
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad);
+
+ GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin");
}
static void
@@ -312,22 +449,88 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition)
return res;
}
+/* a new pad (SSRC) was created in @session */
+static void
+new_payload_found (GstElement * element, guint pt, GstPad * pad,
+ GstRTPBinStream * stream)
+{
+ GstRTPBin *rtpbin;
+ GstElementClass *klass;
+ GstPadTemplate *templ;
+ gchar *padname;
+ GstPad *gpad;
+
+ rtpbin = stream->bin;
+
+ GST_DEBUG ("new payload pad %d", pt);
+
+ /* ghost the pad to the parent */
+ klass = GST_ELEMENT_GET_CLASS (rtpbin);
+ templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d");
+ padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d",
+ stream->session->id, stream->ssrc, pt);
+ gpad = gst_ghost_pad_new_from_template (padname, pad, templ);
+ g_free (padname);
+
+ gst_pad_set_active (gpad, TRUE);
+ gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad);
+}
+
+/* a new pad (SSRC) was created in @session */
+static void
+new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
+ GstRTPBinSession * session)
+{
+ GstRTPBinStream *stream;
+ GstPad *sinkpad;
+
+ GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc);
+
+ /* create new stream */
+ stream = create_stream (session, ssrc);
+ if (!stream)
+ goto no_stream;
+
+ /* get pad and link */
+ GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer");
+ sinkpad = gst_element_get_static_pad (stream->buffer, "sink");
+ gst_pad_link (pad, sinkpad);
+ gst_object_unref (sinkpad);
+
+ /* connect to the new-pad signal of the payload demuxer */
+ stream->demux_newpad_sig = g_signal_connect (stream->demux,
+ "new-payload-type", (GCallback) new_payload_found, stream);
+
+ return;
+
+ /* ERRORS */
+no_stream:
+ {
+ GST_DEBUG ("could not create stream");
+ return;
+ }
+}
+
/* Create a pad for receiving RTP for the session in @name
*/
static GstPad *
create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
{
- GstPad *result;
+ GstPad *result, *sinkdpad;
guint sessid;
GstRTPBinSession *session;
+ GstPadLinkReturn lres;
/* first get the session number */
if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1)
goto no_name;
+ GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
+
/* get or create session */
session = find_session_by_id (rtpbin, sessid);
if (!session) {
+ GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid);
/* create session now */
session = create_session (rtpbin, sessid);
if (session == NULL)
@@ -337,18 +540,37 @@ create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
if (session->recv_rtp_sink != NULL)
goto existed;
+ GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad");
/* get recv_rtp pad and store */
session->recv_rtp_sink =
gst_element_get_request_pad (session->session, "recv_rtp_sink");
if (session->recv_rtp_sink == NULL)
goto pad_failed;
+ GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad");
+ /* get srcpad, link to SSRCDemux */
+ session->recv_rtp_src =
+ gst_element_get_static_pad (session->session, "recv_rtp_src");
+ if (session->recv_rtp_src == NULL)
+ goto pad_failed;
+
+ GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad");
+ sinkdpad = gst_element_get_static_pad (session->demux, "sink");
+ lres = gst_pad_link (session->recv_rtp_src, sinkdpad);
+ gst_object_unref (sinkdpad);
+ if (lres != GST_PAD_LINK_OK)
+ goto link_failed;
+
+ /* connect to the new-ssrc-pad signal of the demuxer */
+ session->demux_newpad_sig = g_signal_connect (session->demux,
+ "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session);
+
+ GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad");
result =
gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ);
+ gst_pad_set_active (result, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
- /* FIXME, get srcpad, link to SSRCDemux */
-
return result;
/* ERRORS */
@@ -372,6 +594,11 @@ pad_failed:
g_warning ("rtpbin: failed to get session pad");
return NULL;
}
+link_failed:
+ {
+ g_warning ("rtpbin: failed to link pads");
+ return NULL;
+ }
}
/* Create a pad for receiving RTCP for the session in @name
@@ -384,10 +611,17 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ,
guint sessid;
GstRTPBinSession *session;
+#if 0
+ GstPad *sinkdpad;
+ GstPadLinkReturn lres;
+#endif
+
/* first get the session number */
if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1)
goto no_name;
+ GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid);
+
/* get the session, it must exist or we error */
session = find_session_by_id (rtpbin, sessid);
if (!session)
@@ -397,18 +631,35 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ,
if (session->recv_rtcp_sink != NULL)
goto existed;
+ GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad");
+
/* get recv_rtp pad and store */
session->recv_rtcp_sink =
gst_element_get_request_pad (session->session, "recv_rtcp_sink");
if (session->recv_rtcp_sink == NULL)
goto pad_failed;
+#if 0
+ /* get srcpad, link to SSRCDemux */
+ GST_DEBUG_OBJECT (rtpbin, "getting sync src pad");
+ session->recv_rtcp_src =
+ gst_element_get_static_pad (session->session, "sync_src");
+ if (session->recv_rtcp_src == NULL)
+ goto pad_failed;
+
+ GST_DEBUG_OBJECT (rtpbin, "linking sync to demux");
+ sinkdpad = gst_element_get_static_pad (session->demux, "sink");
+ lres = gst_pad_link (session->recv_rtcp_src, sinkdpad);
+ gst_object_unref (sinkdpad);
+ if (lres != GST_PAD_LINK_OK)
+ goto link_failed;
+#endif
+
result =
gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ);
+ gst_pad_set_active (result, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
- /* FIXME, get srcpad, link to SSRCDemux */
-
return result;
/* ERRORS */
@@ -433,6 +684,13 @@ pad_failed:
g_warning ("rtpbin: failed to get session pad");
return NULL;
}
+#if 0
+link_failed:
+ {
+ g_warning ("rtpbin: failed to link pads");
+ return NULL;
+ }
+#endif
}
/* Create a pad for sending RTP for the session in @name
@@ -471,6 +729,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
result =
gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ);
+ gst_pad_set_active (result, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
/* get srcpad */
@@ -484,6 +743,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d");
srcghost =
gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ);
+ gst_pad_set_active (srcghost, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost);
g_free (gname);
@@ -546,6 +806,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name)
goto pad_failed;
result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ);
+ gst_pad_set_active (result, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result);
return result;
diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h
index 517c1178..ccd57a78 100644
--- a/gst/rtpmanager/gstrtpbin.h
+++ b/gst/rtpmanager/gstrtpbin.h
@@ -38,10 +38,10 @@ typedef struct _GstRTPBinClass GstRTPBinClass;
typedef struct _GstRTPBinPrivate GstRTPBinPrivate;
struct _GstRTPBin {
- GstBin element;
+ GstBin bin;
/* a list of session */
- GList *sessions;
+ GSList *sessions;
/*< private >*/
GstRTPBinPrivate *priv;
diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c
index d71d850c..1490c4cb 100644
--- a/gst/rtpmanager/gstrtpmanager.c
+++ b/gst/rtpmanager/gstrtpmanager.c
@@ -21,6 +21,7 @@
#include "config.h"
#endif
+#include "gstrtpbin.h"
#include "gstrtpclient.h"
#include "gstrtpjitterbuffer.h"
#include "gstrtpptdemux.h"
@@ -30,6 +31,9 @@
static gboolean
plugin_init (GstPlugin * plugin)
{
+ if (!gst_element_register (plugin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN))
+ return FALSE;
+
if (!gst_element_register (plugin, "rtpclient", GST_RANK_NONE,
GST_TYPE_RTP_CLIENT))
return FALSE;
diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c
index 5950f619..d7ff34d9 100644
--- a/gst/rtpmanager/gstrtpptdemux.c
+++ b/gst/rtpmanager/gstrtpptdemux.c
@@ -56,15 +56,15 @@ static GstStaticPadTemplate rtp_pt_demux_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
- GST_STATIC_CAPS ("application/x-rtp, "
- "payload = (int) [ 0, 255 ], " "clock-rate = (int) [ 0, 2147483647 ]")
+ GST_STATIC_CAPS ("application/x-rtp")
);
static GstStaticPadTemplate rtp_pt_demux_src_template =
-GST_STATIC_PAD_TEMPLATE ("src%d",
+GST_STATIC_PAD_TEMPLATE ("src_%d",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
- GST_STATIC_CAPS_ANY);
+ GST_STATIC_CAPS ("application/x-rtp, " "payload = (int) [ 0, 255 ]")
+ );
GST_DEBUG_CATEGORY_STATIC (gst_rtp_pt_demux_debug);
#define GST_CAT_DEFAULT gst_rtp_pt_demux_debug
@@ -191,10 +191,13 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
rtpdemux = GST_RTP_PT_DEMUX (GST_OBJECT_PARENT (pad));
- g_return_val_if_fail (gst_rtp_buffer_validate (buf), GST_FLOW_ERROR);
+ if (!gst_rtp_buffer_validate (buf))
+ goto invalid_buffer;
pt = gst_rtp_buffer_get_payload_type (buf);
+ GST_DEBUG_OBJECT (rtpdemux, "received buffer for pt %d", pt);
+
srcpad = find_pad_for_pt (rtpdemux, pt);
if (srcpad == NULL) {
/* new PT, create a src pad */
@@ -205,15 +208,14 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
GstRTPPtDemuxPad *rtpdemuxpad;
klass = GST_ELEMENT_GET_CLASS (rtpdemux);
- templ = gst_element_class_get_pad_template (klass, "src%d");
- padname = g_strdup_printf ("src%d", pt);
+ templ = gst_element_class_get_pad_template (klass, "src_%d");
+ padname = g_strdup_printf ("src_%d", pt);
srcpad = gst_pad_new_from_template (templ, padname);
g_free (padname);
caps = gst_pad_get_caps (srcpad);
caps = gst_caps_make_writable (caps);
- gst_caps_append_structure (caps,
- gst_structure_new ("payload", "payload", G_TYPE_INT, pt, NULL));
+ gst_caps_set_simple (caps, "payload", G_TYPE_INT, pt, NULL);
gst_pad_set_caps (srcpad, caps);
/* XXX: set _link () function */
@@ -221,17 +223,15 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
gst_pad_set_active (srcpad, TRUE);
gst_element_add_pad (element, srcpad);
- if (srcpad) {
- GST_DEBUG ("Adding pt=%d to the list.", pt);
- rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1);
- rtpdemuxpad->pt = pt;
- rtpdemuxpad->pad = srcpad;
- rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad);
+ GST_DEBUG ("Adding pt=%d to the list.", pt);
+ rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1);
+ rtpdemuxpad->pt = pt;
+ rtpdemuxpad->pad = srcpad;
+ rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad);
- GST_DEBUG ("emitting new-payload_type for pt %d", pt);
- g_signal_emit (G_OBJECT (rtpdemux),
- gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad);
- }
+ GST_DEBUG ("emitting new-payload_type for pt %d", pt);
+ g_signal_emit (G_OBJECT (rtpdemux),
+ gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad);
}
if (pt != rtpdemux->last_pt) {
@@ -246,9 +246,19 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf)
/* push to srcpad */
if (srcpad)
- gst_pad_push (srcpad, GST_BUFFER (buf));
+ ret = gst_pad_push (srcpad, GST_BUFFER (buf));
return ret;
+
+ /* ERRORS */
+invalid_buffer:
+ {
+ /* this is fatal and should be filtered earlier */
+ GST_ELEMENT_ERROR (rtpdemux, STREAM, DECODE, (NULL),
+ ("Dropping invalid RTP payload"));
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
}
static GstCaps *
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c
index 47df756f..5d7508a6 100644
--- a/gst/rtpmanager/gstrtpsession.c
+++ b/gst/rtpmanager/gstrtpsession.c
@@ -41,6 +41,9 @@
#endif
#include "gstrtpsession.h"
+GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);
+#define GST_CAT_DEFAULT gst_rtp_session_debug
+
/* elementfactory information */
static const GstElementDetails rtpsession_details =
GST_ELEMENT_DETAILS ("RTP Session",
@@ -174,6 +177,9 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)
GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad);
gstelement_class->release_pad =
GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad);
+
+ GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug,
+ "rtpsession", 0, "RTP Session");
}
static void
@@ -255,6 +261,26 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)
return res;
}
+static GstFlowReturn
+gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)
+{
+ GstRTPSession *rtpsession;
+ gboolean ret = FALSE;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+ GST_DEBUG_OBJECT (rtpsession, "received event");
+
+ switch (GST_EVENT_TYPE (event)) {
+ default:
+ ret = gst_pad_push_event (rtpsession->recv_rtp_src, event);
+ break;
+ }
+ gst_object_unref (rtpsession);
+
+ return ret;
+}
+
/* receive a packet from a sender, send it to the RTP session manager and
* forward the packet on the rtp_src pad
*/
@@ -266,6 +292,8 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
+
/* FIXME, do something */
ret = gst_pad_push (rtpsession->recv_rtp_src, buffer);
@@ -274,6 +302,26 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)
return ret;
}
+static GstFlowReturn
+gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)
+{
+ GstRTPSession *rtpsession;
+ gboolean ret = FALSE;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+ GST_DEBUG_OBJECT (rtpsession, "received event");
+
+ switch (GST_EVENT_TYPE (event)) {
+ default:
+ ret = gst_pad_push_event (rtpsession->sync_src, event);
+ break;
+ }
+ gst_object_unref (rtpsession);
+
+ return ret;
+}
+
/* Receive an RTCP packet from a sender, send it to the RTP session manager and
* forward the SR packets to the sync_src pad.
*/
@@ -286,6 +334,8 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
/* FIXME, do something */
+ GST_DEBUG_OBJECT (rtpsession, "received RTCP packet");
+
ret = gst_pad_push (rtpsession->sync_src, buffer);
gst_object_unref (rtpsession);
@@ -293,6 +343,26 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)
return ret;
}
+static GstFlowReturn
+gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)
+{
+ GstRTPSession *rtpsession;
+ gboolean ret = FALSE;
+
+ rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+
+ GST_DEBUG_OBJECT (rtpsession, "received event");
+
+ switch (GST_EVENT_TYPE (event)) {
+ default:
+ ret = gst_pad_push_event (rtpsession->send_rtp_src, event);
+ break;
+ }
+ gst_object_unref (rtpsession);
+
+ return ret;
+}
+
/* Recieve an RTP packet to be send to the receivers, send to RTP session
* manager and forward to send_rtp_src.
*/
@@ -304,6 +374,8 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad));
+ GST_DEBUG_OBJECT (rtpsession, "received RTP packet");
+
/* FIXME, do something */
ret = gst_pad_push (rtpsession->send_rtp_src, buffer);
@@ -319,17 +391,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)
static GstPad *
create_recv_rtp_sink (GstRTPSession * rtpsession)
{
+ GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad");
+
rtpsession->recv_rtp_sink =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template,
NULL);
gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
gst_rtp_session_chain_recv_rtp);
+ gst_pad_set_event_function (rtpsession->recv_rtp_sink,
+ gst_rtp_session_event_recv_rtp_sink);
+ gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->recv_rtp_sink);
+ GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad");
rtpsession->recv_rtp_src =
gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template,
- NULL);
+ "recv_rtp_src");
+ gst_pad_set_active (rtpsession->recv_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src);
return rtpsession->recv_rtp_sink;
@@ -341,16 +420,24 @@ create_recv_rtp_sink (GstRTPSession * rtpsession)
static GstPad *
create_recv_rtcp_sink (GstRTPSession * rtpsession)
{
+ GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad");
+
rtpsession->recv_rtcp_sink =
gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template,
NULL);
gst_pad_set_chain_function (rtpsession->recv_rtcp_sink,
gst_rtp_session_chain_recv_rtcp);
+ gst_pad_set_event_function (rtpsession->recv_rtcp_sink,
+ gst_rtp_session_event_recv_rtcp_sink);
+ gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->recv_rtcp_sink);
+ GST_DEBUG_OBJECT (rtpsession, "creating sync src pad");
rtpsession->sync_src =
- gst_pad_new_from_static_template (&rtpsession_sync_src_template, NULL);
+ gst_pad_new_from_static_template (&rtpsession_sync_src_template,
+ "sync_src");
+ gst_pad_set_active (rtpsession->sync_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src);
return rtpsession->recv_rtcp_sink;
@@ -362,17 +449,23 @@ create_recv_rtcp_sink (GstRTPSession * rtpsession)
static GstPad *
create_send_rtp_sink (GstRTPSession * rtpsession)
{
+ GST_DEBUG_OBJECT (rtpsession, "creating pad");
+
rtpsession->send_rtp_sink =
gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template,
NULL);
gst_pad_set_chain_function (rtpsession->send_rtp_sink,
gst_rtp_session_chain_send_rtp);
+ gst_pad_set_event_function (rtpsession->send_rtp_sink,
+ gst_rtp_session_event_send_rtp_sink);
+ gst_pad_set_active (rtpsession->send_rtp_sink, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession),
rtpsession->recv_rtcp_sink);
rtpsession->send_rtp_src =
gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template,
NULL);
+ gst_pad_set_active (rtpsession->send_rtp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src);
return rtpsession->send_rtp_sink;
@@ -385,8 +478,11 @@ create_send_rtp_sink (GstRTPSession * rtpsession)
static GstPad *
create_rtcp_src (GstRTPSession * rtpsession)
{
+ GST_DEBUG_OBJECT (rtpsession, "creating pad");
+
rtpsession->rtcp_src =
gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL);
+ gst_pad_set_active (rtpsession->rtcp_src, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src);
return rtpsession->rtcp_src;
@@ -406,6 +502,8 @@ gst_rtp_session_request_new_pad (GstElement * element,
rtpsession = GST_RTP_SESSION (element);
klass = GST_ELEMENT_GET_CLASS (element);
+ GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
+
/* figure out the template */
if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) {
if (rtpsession->recv_rtp_sink != NULL)
diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c
index fe6f1bee..3237100c 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.c
+++ b/gst/rtpmanager/gstrtpssrcdemux.c
@@ -28,6 +28,9 @@
#include "gstrtpssrcdemux.h"
+GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
+#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
+
/* generic templates */
static GstStaticPadTemplate rtp_ssrc_demux_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
@@ -50,12 +53,10 @@ static GstElementDetails gst_rtp_ssrc_demux_details = {
"Wim Taymans <wim@fluendo.com>"
};
-GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
-#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug
-
/* signals */
enum
{
+ SIGNAL_NEW_SSRC_PAD,
LAST_SIGNAL
};
@@ -77,7 +78,7 @@ static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event);
/* srcpad stuff */
static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
-/* static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; */
+static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
/**
* Item for storing GstPad <-> SSRC pairs.
@@ -91,11 +92,11 @@ struct _GstRTPSsrcDemuxPad
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found
*/
static GstPad *
-find_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc)
+find_rtp_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc)
{
GSList *walk;
- for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+ for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) {
GstRTPSsrcDemuxPad *pad = (GstRTPSsrcDemuxPad *) walk->data;
if (pad->ssrc == ssrc)
@@ -105,7 +106,7 @@ find_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc)
}
static GstPad *
-create_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc)
+create_rtp_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc)
{
GstPad *result;
GstElementClass *klass;
@@ -123,15 +124,18 @@ create_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc)
demuxpad = g_new0 (GstRTPSsrcDemuxPad, 1);
demuxpad->ssrc = ssrc;
demuxpad->pad = result;
- demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+ demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad);
/* copy caps from input */
- gst_pad_set_caps (result, GST_PAD_CAPS (demux->sinkpad));
+ gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink));
gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event);
gst_pad_set_active (result, TRUE);
gst_element_add_pad (GST_ELEMENT_CAST (demux), result);
+ g_signal_emit (G_OBJECT (demux),
+ gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result);
+
return result;
}
@@ -159,6 +163,13 @@ gst_rtp_ssrc_demux_class_init (GstRTPSsrcDemuxClass * klass)
gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_finalize);
+ gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
+ g_signal_new ("new-ssrc-pad",
+ G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstRTPSsrcDemuxClass, new_ssrc_pad),
+ NULL, NULL, g_cclosure_marshal_VOID__UINT_POINTER,
+ G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_PAD);
+
gstelement_klass->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
@@ -172,12 +183,12 @@ gst_rtp_ssrc_demux_init (GstRTPSsrcDemux * demux,
{
GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux);
- demux->sinkpad =
+ demux->rtp_sink =
gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
"sink"), "sink");
- gst_pad_set_chain_function (demux->sinkpad, gst_rtp_ssrc_demux_chain);
- gst_pad_set_event_function (demux->sinkpad, gst_rtp_ssrc_demux_sink_event);
- gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->sinkpad);
+ gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain);
+ gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event);
+ gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink);
}
static void
@@ -224,9 +235,12 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
ssrc = gst_rtp_buffer_get_ssrc (buf);
- srcpad = find_pad_for_ssrc (demux, ssrc);
+ GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
+
+ srcpad = find_rtp_pad_for_ssrc (demux, ssrc);
if (srcpad == NULL) {
- srcpad = create_pad_for_ssrc (demux, ssrc);
+ GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
+ srcpad = create_rtp_pad_for_ssrc (demux, ssrc);
if (!srcpad)
goto create_failed;
}
@@ -239,11 +253,11 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
/* ERRORS */
invalid_payload:
{
- /* this is not fatal yet */
- GST_ELEMENT_WARNING (demux, STREAM, DECODE, (NULL),
+ /* this is fatal and should be filtered earlier */
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
("Dropping invalid RTP payload"));
gst_buffer_unref (buf);
- return GST_FLOW_OK;
+ return GST_FLOW_ERROR;
}
create_failed:
{
diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h
index 6e1c2303..475d2f54 100644
--- a/gst/rtpmanager/gstrtpssrcdemux.h
+++ b/gst/rtpmanager/gstrtpssrcdemux.h
@@ -36,13 +36,16 @@ struct _GstRTPSsrcDemux
{
GstElement parent;
- GstPad *sinkpad;
- GSList *srcpads;
+ GstPad *rtp_sink;
+ GSList *rtp_srcpads;
};
struct _GstRTPSsrcDemuxClass
{
GstElementClass parent_class;
+
+ /* signals */
+ void (*new_ssrc_pad) (GstElement *element, guint32 ssrc, GstPad *pad);
};
GType gst_rtp_ssrc_demux_get_type (void);