diff options
Diffstat (limited to 'gst/rtpmanager')
-rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 285 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpbin.h | 4 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpmanager.c | 4 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpptdemux.c | 50 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 102 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpssrcdemux.c | 50 | ||||
-rw-r--r-- | gst/rtpmanager/gstrtpssrcdemux.h | 7 |
7 files changed, 446 insertions, 56 deletions
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index c1ec7130..d63321f6 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -43,6 +43,10 @@ #include "gstrtpbin.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug); +#define GST_CAT_DEFAULT gst_rtp_bin_debug + + /* elementfactory information */ static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin", "Filter/Editor/Video", @@ -98,6 +102,7 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", struct _GstRTPBinPrivate { + guint foo; }; /* signals and args */ @@ -113,30 +118,72 @@ enum }; /* helper objects */ -typedef struct +typedef struct _GstRTPBinSession GstRTPBinSession; +typedef struct _GstRTPBinStream GstRTPBinStream; +typedef struct _GstRTPBinClient GstRTPBinClient; + +/* Manages the RTP stream for one SSRC. + * + * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer. + * If we see an SDES RTCP packet that links multiple SSRCs together based on a + * common CNAME, we create a GstRTPBinClient structure to group the SSRCs + * together (see below). + */ +struct _GstRTPBinStream +{ + /* the SSRC of this stream */ + guint32 ssrc; + /* parent bin */ + GstRTPBin *bin; + /* the session this SSRC belongs to */ + GstRTPBinSession *session; + /* the jitterbuffer of the SSRC */ + GstElement *buffer; + /* the PT demuxer of the SSRC */ + GstElement *demux; + gulong demux_newpad_sig; +}; + +/* Manages the receiving end of the packets. + * + * There is one such structure for each RTP session (audio/video/...). + * We get the RTP/RTCP packets and stuff them into the session manager. From + * there they are pushed into an SSRC demuxer that splits the stream based on + * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with + * the GstRTPBinStream above). + */ +struct _GstRTPBinSession { /* session id */ gint id; + /* the parent bin */ + GstRTPBin *bin; /* the session element */ GstElement *session; /* the SSRC demuxer */ - GstElement *ssrcdemux; + GstElement *demux; + gulong demux_newpad_sig; + + /* list of GstRTPBinStream */ + GSList *streams; /* the pads of the session */ GstPad *recv_rtp_sink; + GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; + GstPad *recv_rtcp_src; GstPad *send_rtp_sink; + GstPad *send_rtp_src; GstPad *rtcp_src; - -} GstRTPBinSession; +}; /* find a session with the given id */ static GstRTPBinSession * find_session_by_id (GstRTPBin * rtpbin, gint id) { - GList *walk; + GSList *walk; - for (walk = rtpbin->sessions; walk; walk = g_list_next (walk)) { + for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) { GstRTPBinSession *sess = (GstRTPBinSession *) walk->data; if (sess->id == id) @@ -150,14 +197,25 @@ static GstRTPBinSession * create_session (GstRTPBin * rtpbin, gint id) { GstRTPBinSession *sess; - GstElement *elem; + GstElement *elem, *demux; if (!(elem = gst_element_factory_make ("rtpsession", NULL))) goto no_session; + if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) + goto no_demux; + sess = g_new0 (GstRTPBinSession, 1); sess->id = id; + sess->bin = rtpbin; sess->session = elem; + sess->demux = demux; + rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); + + gst_bin_add (GST_BIN_CAST (rtpbin), elem); + gst_element_set_state (elem, GST_STATE_PLAYING); + gst_bin_add (GST_BIN_CAST (rtpbin), demux); + gst_element_set_state (demux, GST_STATE_PLAYING); return sess; @@ -167,8 +225,85 @@ no_session: g_warning ("rtpbin: could not create rtpsession element"); return NULL; } +no_demux: + { + gst_object_unref (elem); + g_warning ("rtpbin: could not create rtpssrcdemux element"); + return NULL; + } +} + +#if 0 +static GstRTPBinStream * +find_stream_by_ssrc (GstRTPBinSession * session, guint32 ssrc) +{ + GSList *walk; + + for (walk = session->streams; walk; walk = g_slist_next (walk)) { + GstRTPBinStream *stream = (GstRTPBinStream *) walk->data; + + if (stream->ssrc == ssrc) + return stream; + } + return NULL; +} +#endif + +static GstRTPBinStream * +create_stream (GstRTPBinSession * session, guint32 ssrc) +{ + GstElement *buffer, *demux; + GstRTPBinStream *stream; + + if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL))) + goto no_jitterbuffer; + + if (!(demux = gst_element_factory_make ("rtpptdemux", NULL))) + goto no_demux; + + stream = g_new0 (GstRTPBinStream, 1); + stream->ssrc = ssrc; + stream->bin = session->bin; + stream->session = session; + stream->buffer = buffer; + stream->demux = demux; + session->streams = g_slist_prepend (session->streams, stream); + + gst_bin_add (GST_BIN_CAST (session->bin), buffer); + gst_element_set_state (buffer, GST_STATE_PLAYING); + gst_bin_add (GST_BIN_CAST (session->bin), demux); + gst_element_set_state (demux, GST_STATE_PLAYING); + + /* link stuff */ + gst_element_link (buffer, demux); + + return stream; + + /* ERRORS */ +no_jitterbuffer: + { + g_warning ("rtpbin: could not create rtpjitterbuffer element"); + return NULL; + } +no_demux: + { + gst_object_unref (buffer); + g_warning ("rtpbin: could not create rtpptdemux element"); + return NULL; + } } +/* Manages the RTP streams that come from one client and should therefore be + * synchronized. + */ +struct _GstRTPBinClient +{ + /* the common CNAME for the streams */ + gchar *cname; + /* the streams */ + GSList *streams; +}; + /* GObject vmethods */ static void gst_rtp_bin_finalize (GObject * object); static void gst_rtp_bin_set_property (GObject * object, guint prop_id, @@ -230,6 +365,8 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass) gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin"); } static void @@ -312,22 +449,88 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) return res; } +/* a new pad (SSRC) was created in @session */ +static void +new_payload_found (GstElement * element, guint pt, GstPad * pad, + GstRTPBinStream * stream) +{ + GstRTPBin *rtpbin; + GstElementClass *klass; + GstPadTemplate *templ; + gchar *padname; + GstPad *gpad; + + rtpbin = stream->bin; + + GST_DEBUG ("new payload pad %d", pt); + + /* ghost the pad to the parent */ + klass = GST_ELEMENT_GET_CLASS (rtpbin); + templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d"); + padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d", + stream->session->id, stream->ssrc, pt); + gpad = gst_ghost_pad_new_from_template (padname, pad, templ); + g_free (padname); + + gst_pad_set_active (gpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); +} + +/* a new pad (SSRC) was created in @session */ +static void +new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, + GstRTPBinSession * session) +{ + GstRTPBinStream *stream; + GstPad *sinkpad; + + GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); + + /* create new stream */ + stream = create_stream (session, ssrc); + if (!stream) + goto no_stream; + + /* get pad and link */ + GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); + sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); + gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + + /* connect to the new-pad signal of the payload demuxer */ + stream->demux_newpad_sig = g_signal_connect (stream->demux, + "new-payload-type", (GCallback) new_payload_found, stream); + + return; + + /* ERRORS */ +no_stream: + { + GST_DEBUG ("could not create stream"); + return; + } +} + /* Create a pad for receiving RTP for the session in @name */ static GstPad * create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) { - GstPad *result; + GstPad *result, *sinkdpad; guint sessid; GstRTPBinSession *session; + GstPadLinkReturn lres; /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1) goto no_name; + GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); + /* get or create session */ session = find_session_by_id (rtpbin, sessid); if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); /* create session now */ session = create_session (rtpbin, sessid); if (session == NULL) @@ -337,18 +540,37 @@ create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_sink != NULL) goto existed; + GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad"); /* get recv_rtp pad and store */ session->recv_rtp_sink = gst_element_get_request_pad (session->session, "recv_rtp_sink"); if (session->recv_rtp_sink == NULL) goto pad_failed; + GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad"); + /* get srcpad, link to SSRCDemux */ + session->recv_rtp_src = + gst_element_get_static_pad (session->session, "recv_rtp_src"); + if (session->recv_rtp_src == NULL) + goto pad_failed; + + GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad"); + sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + lres = gst_pad_link (session->recv_rtp_src, sinkdpad); + gst_object_unref (sinkdpad); + if (lres != GST_PAD_LINK_OK) + goto link_failed; + + /* connect to the new-ssrc-pad signal of the demuxer */ + session->demux_newpad_sig = g_signal_connect (session->demux, + "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session); + + GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); result = gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); - /* FIXME, get srcpad, link to SSRCDemux */ - return result; /* ERRORS */ @@ -372,6 +594,11 @@ pad_failed: g_warning ("rtpbin: failed to get session pad"); return NULL; } +link_failed: + { + g_warning ("rtpbin: failed to link pads"); + return NULL; + } } /* Create a pad for receiving RTCP for the session in @name @@ -384,10 +611,17 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, guint sessid; GstRTPBinSession *session; +#if 0 + GstPad *sinkdpad; + GstPadLinkReturn lres; +#endif + /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1) goto no_name; + GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); + /* get the session, it must exist or we error */ session = find_session_by_id (rtpbin, sessid); if (!session) @@ -397,18 +631,35 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink != NULL) goto existed; + GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); + /* get recv_rtp pad and store */ session->recv_rtcp_sink = gst_element_get_request_pad (session->session, "recv_rtcp_sink"); if (session->recv_rtcp_sink == NULL) goto pad_failed; +#if 0 + /* get srcpad, link to SSRCDemux */ + GST_DEBUG_OBJECT (rtpbin, "getting sync src pad"); + session->recv_rtcp_src = + gst_element_get_static_pad (session->session, "sync_src"); + if (session->recv_rtcp_src == NULL) + goto pad_failed; + + GST_DEBUG_OBJECT (rtpbin, "linking sync to demux"); + sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + lres = gst_pad_link (session->recv_rtcp_src, sinkdpad); + gst_object_unref (sinkdpad); + if (lres != GST_PAD_LINK_OK) + goto link_failed; +#endif + result = gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); - /* FIXME, get srcpad, link to SSRCDemux */ - return result; /* ERRORS */ @@ -433,6 +684,13 @@ pad_failed: g_warning ("rtpbin: failed to get session pad"); return NULL; } +#if 0 +link_failed: + { + g_warning ("rtpbin: failed to link pads"); + return NULL; + } +#endif } /* Create a pad for sending RTP for the session in @name @@ -471,6 +729,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) result = gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); /* get srcpad */ @@ -484,6 +743,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d"); srcghost = gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ); + gst_pad_set_active (srcghost, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost); g_free (gname); @@ -546,6 +806,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) goto pad_failed; result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); return result; diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index 517c1178..ccd57a78 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -38,10 +38,10 @@ typedef struct _GstRTPBinClass GstRTPBinClass; typedef struct _GstRTPBinPrivate GstRTPBinPrivate; struct _GstRTPBin { - GstBin element; + GstBin bin; /* a list of session */ - GList *sessions; + GSList *sessions; /*< private >*/ GstRTPBinPrivate *priv; diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index d71d850c..1490c4cb 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -21,6 +21,7 @@ #include "config.h" #endif +#include "gstrtpbin.h" #include "gstrtpclient.h" #include "gstrtpjitterbuffer.h" #include "gstrtpptdemux.h" @@ -30,6 +31,9 @@ static gboolean plugin_init (GstPlugin * plugin) { + if (!gst_element_register (plugin, "rtpbin", GST_RANK_NONE, GST_TYPE_RTP_BIN)) + return FALSE; + if (!gst_element_register (plugin, "rtpclient", GST_RANK_NONE, GST_TYPE_RTP_CLIENT)) return FALSE; diff --git a/gst/rtpmanager/gstrtpptdemux.c b/gst/rtpmanager/gstrtpptdemux.c index 5950f619..d7ff34d9 100644 --- a/gst/rtpmanager/gstrtpptdemux.c +++ b/gst/rtpmanager/gstrtpptdemux.c @@ -56,15 +56,15 @@ static GstStaticPadTemplate rtp_pt_demux_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/x-rtp, " - "payload = (int) [ 0, 255 ], " "clock-rate = (int) [ 0, 2147483647 ]") + GST_STATIC_CAPS ("application/x-rtp") ); static GstStaticPadTemplate rtp_pt_demux_src_template = -GST_STATIC_PAD_TEMPLATE ("src%d", +GST_STATIC_PAD_TEMPLATE ("src_%d", GST_PAD_SRC, GST_PAD_SOMETIMES, - GST_STATIC_CAPS_ANY); + GST_STATIC_CAPS ("application/x-rtp, " "payload = (int) [ 0, 255 ]") + ); GST_DEBUG_CATEGORY_STATIC (gst_rtp_pt_demux_debug); #define GST_CAT_DEFAULT gst_rtp_pt_demux_debug @@ -191,10 +191,13 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) rtpdemux = GST_RTP_PT_DEMUX (GST_OBJECT_PARENT (pad)); - g_return_val_if_fail (gst_rtp_buffer_validate (buf), GST_FLOW_ERROR); + if (!gst_rtp_buffer_validate (buf)) + goto invalid_buffer; pt = gst_rtp_buffer_get_payload_type (buf); + GST_DEBUG_OBJECT (rtpdemux, "received buffer for pt %d", pt); + srcpad = find_pad_for_pt (rtpdemux, pt); if (srcpad == NULL) { /* new PT, create a src pad */ @@ -205,15 +208,14 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) GstRTPPtDemuxPad *rtpdemuxpad; klass = GST_ELEMENT_GET_CLASS (rtpdemux); - templ = gst_element_class_get_pad_template (klass, "src%d"); - padname = g_strdup_printf ("src%d", pt); + templ = gst_element_class_get_pad_template (klass, "src_%d"); + padname = g_strdup_printf ("src_%d", pt); srcpad = gst_pad_new_from_template (templ, padname); g_free (padname); caps = gst_pad_get_caps (srcpad); caps = gst_caps_make_writable (caps); - gst_caps_append_structure (caps, - gst_structure_new ("payload", "payload", G_TYPE_INT, pt, NULL)); + gst_caps_set_simple (caps, "payload", G_TYPE_INT, pt, NULL); gst_pad_set_caps (srcpad, caps); /* XXX: set _link () function */ @@ -221,17 +223,15 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) gst_pad_set_active (srcpad, TRUE); gst_element_add_pad (element, srcpad); - if (srcpad) { - GST_DEBUG ("Adding pt=%d to the list.", pt); - rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1); - rtpdemuxpad->pt = pt; - rtpdemuxpad->pad = srcpad; - rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad); + GST_DEBUG ("Adding pt=%d to the list.", pt); + rtpdemuxpad = g_new0 (GstRTPPtDemuxPad, 1); + rtpdemuxpad->pt = pt; + rtpdemuxpad->pad = srcpad; + rtpdemux->srcpads = g_slist_append (rtpdemux->srcpads, rtpdemuxpad); - GST_DEBUG ("emitting new-payload_type for pt %d", pt); - g_signal_emit (G_OBJECT (rtpdemux), - gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad); - } + GST_DEBUG ("emitting new-payload_type for pt %d", pt); + g_signal_emit (G_OBJECT (rtpdemux), + gst_rtp_pt_demux_signals[SIGNAL_NEW_PAYLOAD_TYPE], 0, pt, srcpad); } if (pt != rtpdemux->last_pt) { @@ -246,9 +246,19 @@ gst_rtp_pt_demux_chain (GstPad * pad, GstBuffer * buf) /* push to srcpad */ if (srcpad) - gst_pad_push (srcpad, GST_BUFFER (buf)); + ret = gst_pad_push (srcpad, GST_BUFFER (buf)); return ret; + + /* ERRORS */ +invalid_buffer: + { + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (rtpdemux, STREAM, DECODE, (NULL), + ("Dropping invalid RTP payload")); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } } static GstCaps * diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index 47df756f..5d7508a6 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -41,6 +41,9 @@ #endif #include "gstrtpsession.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug); +#define GST_CAT_DEFAULT gst_rtp_session_debug + /* elementfactory information */ static const GstElementDetails rtpsession_details = GST_ELEMENT_DETAILS ("RTP Session", @@ -174,6 +177,9 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_session_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_session_release_pad); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_session_debug, + "rtpsession", 0, "RTP Session"); } static void @@ -255,6 +261,26 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition) return res; } +static GstFlowReturn +gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event) +{ + GstRTPSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (rtpsession, "received event"); + + switch (GST_EVENT_TYPE (event)) { + default: + ret = gst_pad_push_event (rtpsession->recv_rtp_src, event); + break; + } + gst_object_unref (rtpsession); + + return ret; +} + /* receive a packet from a sender, send it to the RTP session manager and * forward the packet on the rtp_src pad */ @@ -266,6 +292,8 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); + /* FIXME, do something */ ret = gst_pad_push (rtpsession->recv_rtp_src, buffer); @@ -274,6 +302,26 @@ gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer) return ret; } +static GstFlowReturn +gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event) +{ + GstRTPSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (rtpsession, "received event"); + + switch (GST_EVENT_TYPE (event)) { + default: + ret = gst_pad_push_event (rtpsession->sync_src, event); + break; + } + gst_object_unref (rtpsession); + + return ret; +} + /* Receive an RTCP packet from a sender, send it to the RTP session manager and * forward the SR packets to the sync_src pad. */ @@ -286,6 +334,8 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); /* FIXME, do something */ + GST_DEBUG_OBJECT (rtpsession, "received RTCP packet"); + ret = gst_pad_push (rtpsession->sync_src, buffer); gst_object_unref (rtpsession); @@ -293,6 +343,26 @@ gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer) return ret; } +static GstFlowReturn +gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event) +{ + GstRTPSession *rtpsession; + gboolean ret = FALSE; + + rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + + GST_DEBUG_OBJECT (rtpsession, "received event"); + + switch (GST_EVENT_TYPE (event)) { + default: + ret = gst_pad_push_event (rtpsession->send_rtp_src, event); + break; + } + gst_object_unref (rtpsession); + + return ret; +} + /* Recieve an RTP packet to be send to the receivers, send to RTP session * manager and forward to send_rtp_src. */ @@ -304,6 +374,8 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); + GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); + /* FIXME, do something */ ret = gst_pad_push (rtpsession->send_rtp_src, buffer); @@ -319,17 +391,24 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) static GstPad * create_recv_rtp_sink (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating RTP sink pad"); + rtpsession->recv_rtp_sink = gst_pad_new_from_static_template (&rtpsession_recv_rtp_sink_template, NULL); gst_pad_set_chain_function (rtpsession->recv_rtp_sink, gst_rtp_session_chain_recv_rtp); + gst_pad_set_event_function (rtpsession->recv_rtp_sink, + gst_rtp_session_event_recv_rtp_sink); + gst_pad_set_active (rtpsession->recv_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_sink); + GST_DEBUG_OBJECT (rtpsession, "creating RTP src pad"); rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, - NULL); + "recv_rtp_src"); + gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); return rtpsession->recv_rtp_sink; @@ -341,16 +420,24 @@ create_recv_rtp_sink (GstRTPSession * rtpsession) static GstPad * create_recv_rtcp_sink (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating RTCP sink pad"); + rtpsession->recv_rtcp_sink = gst_pad_new_from_static_template (&rtpsession_recv_rtcp_sink_template, NULL); gst_pad_set_chain_function (rtpsession->recv_rtcp_sink, gst_rtp_session_chain_recv_rtcp); + gst_pad_set_event_function (rtpsession->recv_rtcp_sink, + gst_rtp_session_event_recv_rtcp_sink); + gst_pad_set_active (rtpsession->recv_rtcp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtcp_sink); + GST_DEBUG_OBJECT (rtpsession, "creating sync src pad"); rtpsession->sync_src = - gst_pad_new_from_static_template (&rtpsession_sync_src_template, NULL); + gst_pad_new_from_static_template (&rtpsession_sync_src_template, + "sync_src"); + gst_pad_set_active (rtpsession->sync_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); return rtpsession->recv_rtcp_sink; @@ -362,17 +449,23 @@ create_recv_rtcp_sink (GstRTPSession * rtpsession) static GstPad * create_send_rtp_sink (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating pad"); + rtpsession->send_rtp_sink = gst_pad_new_from_static_template (&rtpsession_send_rtp_sink_template, NULL); gst_pad_set_chain_function (rtpsession->send_rtp_sink, gst_rtp_session_chain_send_rtp); + gst_pad_set_event_function (rtpsession->send_rtp_sink, + gst_rtp_session_event_send_rtp_sink); + gst_pad_set_active (rtpsession->send_rtp_sink, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtcp_sink); rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, NULL); + gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); return rtpsession->send_rtp_sink; @@ -385,8 +478,11 @@ create_send_rtp_sink (GstRTPSession * rtpsession) static GstPad * create_rtcp_src (GstRTPSession * rtpsession) { + GST_DEBUG_OBJECT (rtpsession, "creating pad"); + rtpsession->rtcp_src = gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL); + gst_pad_set_active (rtpsession->rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src); return rtpsession->rtcp_src; @@ -406,6 +502,8 @@ gst_rtp_session_request_new_pad (GstElement * element, rtpsession = GST_RTP_SESSION (element); klass = GST_ELEMENT_GET_CLASS (element); + GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name)); + /* figure out the template */ if (templ == gst_element_class_get_pad_template (klass, "recv_rtp_sink")) { if (rtpsession->recv_rtp_sink != NULL) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index fe6f1bee..3237100c 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -28,6 +28,9 @@ #include "gstrtpssrcdemux.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug); +#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug + /* generic templates */ static GstStaticPadTemplate rtp_ssrc_demux_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", @@ -50,12 +53,10 @@ static GstElementDetails gst_rtp_ssrc_demux_details = { "Wim Taymans <wim@fluendo.com>" }; -GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug); -#define GST_CAT_DEFAULT gst_rtp_ssrc_demux_debug - /* signals */ enum { + SIGNAL_NEW_SSRC_PAD, LAST_SIGNAL }; @@ -77,7 +78,7 @@ static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event); /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); -/* static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; */ +static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; /** * Item for storing GstPad <-> SSRC pairs. @@ -91,11 +92,11 @@ struct _GstRTPSsrcDemuxPad /* find a src pad for a given SSRC, returns NULL if the SSRC was not found */ static GstPad * -find_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) +find_rtp_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) { GSList *walk; - for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) { GstRTPSsrcDemuxPad *pad = (GstRTPSsrcDemuxPad *) walk->data; if (pad->ssrc == ssrc) @@ -105,7 +106,7 @@ find_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) } static GstPad * -create_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) +create_rtp_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) { GstPad *result; GstElementClass *klass; @@ -123,15 +124,18 @@ create_pad_for_ssrc (GstRTPSsrcDemux * demux, guint32 ssrc) demuxpad = g_new0 (GstRTPSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; demuxpad->pad = result; - demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad); /* copy caps from input */ - gst_pad_set_caps (result, GST_PAD_CAPS (demux->sinkpad)); + gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink)); gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event); gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (demux), result); + g_signal_emit (G_OBJECT (demux), + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result); + return result; } @@ -159,6 +163,13 @@ gst_rtp_ssrc_demux_class_init (GstRTPSsrcDemuxClass * klass) gobject_klass->finalize = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_finalize); + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] = + g_signal_new ("new-ssrc-pad", + G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstRTPSsrcDemuxClass, new_ssrc_pad), + NULL, NULL, g_cclosure_marshal_VOID__UINT_POINTER, + G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_PAD); + gstelement_klass->change_state = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state); @@ -172,12 +183,12 @@ gst_rtp_ssrc_demux_init (GstRTPSsrcDemux * demux, { GstElementClass *klass = GST_ELEMENT_GET_CLASS (demux); - demux->sinkpad = + demux->rtp_sink = gst_pad_new_from_template (gst_element_class_get_pad_template (klass, "sink"), "sink"); - gst_pad_set_chain_function (demux->sinkpad, gst_rtp_ssrc_demux_chain); - gst_pad_set_event_function (demux->sinkpad, gst_rtp_ssrc_demux_sink_event); - gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->sinkpad); + gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain); + gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event); + gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink); } static void @@ -224,9 +235,12 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) ssrc = gst_rtp_buffer_get_ssrc (buf); - srcpad = find_pad_for_ssrc (demux, ssrc); + GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); + + srcpad = find_rtp_pad_for_ssrc (demux, ssrc); if (srcpad == NULL) { - srcpad = create_pad_for_ssrc (demux, ssrc); + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + srcpad = create_rtp_pad_for_ssrc (demux, ssrc); if (!srcpad) goto create_failed; } @@ -239,11 +253,11 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) /* ERRORS */ invalid_payload: { - /* this is not fatal yet */ - GST_ELEMENT_WARNING (demux, STREAM, DECODE, (NULL), + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Dropping invalid RTP payload")); gst_buffer_unref (buf); - return GST_FLOW_OK; + return GST_FLOW_ERROR; } create_failed: { diff --git a/gst/rtpmanager/gstrtpssrcdemux.h b/gst/rtpmanager/gstrtpssrcdemux.h index 6e1c2303..475d2f54 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.h +++ b/gst/rtpmanager/gstrtpssrcdemux.h @@ -36,13 +36,16 @@ struct _GstRTPSsrcDemux { GstElement parent; - GstPad *sinkpad; - GSList *srcpads; + GstPad *rtp_sink; + GSList *rtp_srcpads; }; struct _GstRTPSsrcDemuxClass { GstElementClass parent_class; + + /* signals */ + void (*new_ssrc_pad) (GstElement *element, guint32 ssrc, GstPad *pad); }; GType gst_rtp_ssrc_demux_get_type (void); |