diff options
Diffstat (limited to 'gst/rtpmanager/gstrtpbin.c')
-rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 285 |
1 files changed, 273 insertions, 12 deletions
diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index c1ec7130..d63321f6 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -43,6 +43,10 @@ #include "gstrtpbin.h" +GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug); +#define GST_CAT_DEFAULT gst_rtp_bin_debug + + /* elementfactory information */ static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin", "Filter/Editor/Video", @@ -98,6 +102,7 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d", struct _GstRTPBinPrivate { + guint foo; }; /* signals and args */ @@ -113,30 +118,72 @@ enum }; /* helper objects */ -typedef struct +typedef struct _GstRTPBinSession GstRTPBinSession; +typedef struct _GstRTPBinStream GstRTPBinStream; +typedef struct _GstRTPBinClient GstRTPBinClient; + +/* Manages the RTP stream for one SSRC. + * + * We pipe the stream (comming from the SSRC demuxer) into a jitterbuffer. + * If we see an SDES RTCP packet that links multiple SSRCs together based on a + * common CNAME, we create a GstRTPBinClient structure to group the SSRCs + * together (see below). + */ +struct _GstRTPBinStream +{ + /* the SSRC of this stream */ + guint32 ssrc; + /* parent bin */ + GstRTPBin *bin; + /* the session this SSRC belongs to */ + GstRTPBinSession *session; + /* the jitterbuffer of the SSRC */ + GstElement *buffer; + /* the PT demuxer of the SSRC */ + GstElement *demux; + gulong demux_newpad_sig; +}; + +/* Manages the receiving end of the packets. + * + * There is one such structure for each RTP session (audio/video/...). + * We get the RTP/RTCP packets and stuff them into the session manager. From + * there they are pushed into an SSRC demuxer that splits the stream based on + * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with + * the GstRTPBinStream above). + */ +struct _GstRTPBinSession { /* session id */ gint id; + /* the parent bin */ + GstRTPBin *bin; /* the session element */ GstElement *session; /* the SSRC demuxer */ - GstElement *ssrcdemux; + GstElement *demux; + gulong demux_newpad_sig; + + /* list of GstRTPBinStream */ + GSList *streams; /* the pads of the session */ GstPad *recv_rtp_sink; + GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; + GstPad *recv_rtcp_src; GstPad *send_rtp_sink; + GstPad *send_rtp_src; GstPad *rtcp_src; - -} GstRTPBinSession; +}; /* find a session with the given id */ static GstRTPBinSession * find_session_by_id (GstRTPBin * rtpbin, gint id) { - GList *walk; + GSList *walk; - for (walk = rtpbin->sessions; walk; walk = g_list_next (walk)) { + for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) { GstRTPBinSession *sess = (GstRTPBinSession *) walk->data; if (sess->id == id) @@ -150,14 +197,25 @@ static GstRTPBinSession * create_session (GstRTPBin * rtpbin, gint id) { GstRTPBinSession *sess; - GstElement *elem; + GstElement *elem, *demux; if (!(elem = gst_element_factory_make ("rtpsession", NULL))) goto no_session; + if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) + goto no_demux; + sess = g_new0 (GstRTPBinSession, 1); sess->id = id; + sess->bin = rtpbin; sess->session = elem; + sess->demux = demux; + rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); + + gst_bin_add (GST_BIN_CAST (rtpbin), elem); + gst_element_set_state (elem, GST_STATE_PLAYING); + gst_bin_add (GST_BIN_CAST (rtpbin), demux); + gst_element_set_state (demux, GST_STATE_PLAYING); return sess; @@ -167,8 +225,85 @@ no_session: g_warning ("rtpbin: could not create rtpsession element"); return NULL; } +no_demux: + { + gst_object_unref (elem); + g_warning ("rtpbin: could not create rtpssrcdemux element"); + return NULL; + } +} + +#if 0 +static GstRTPBinStream * +find_stream_by_ssrc (GstRTPBinSession * session, guint32 ssrc) +{ + GSList *walk; + + for (walk = session->streams; walk; walk = g_slist_next (walk)) { + GstRTPBinStream *stream = (GstRTPBinStream *) walk->data; + + if (stream->ssrc == ssrc) + return stream; + } + return NULL; +} +#endif + +static GstRTPBinStream * +create_stream (GstRTPBinSession * session, guint32 ssrc) +{ + GstElement *buffer, *demux; + GstRTPBinStream *stream; + + if (!(buffer = gst_element_factory_make ("rtpjitterbuffer", NULL))) + goto no_jitterbuffer; + + if (!(demux = gst_element_factory_make ("rtpptdemux", NULL))) + goto no_demux; + + stream = g_new0 (GstRTPBinStream, 1); + stream->ssrc = ssrc; + stream->bin = session->bin; + stream->session = session; + stream->buffer = buffer; + stream->demux = demux; + session->streams = g_slist_prepend (session->streams, stream); + + gst_bin_add (GST_BIN_CAST (session->bin), buffer); + gst_element_set_state (buffer, GST_STATE_PLAYING); + gst_bin_add (GST_BIN_CAST (session->bin), demux); + gst_element_set_state (demux, GST_STATE_PLAYING); + + /* link stuff */ + gst_element_link (buffer, demux); + + return stream; + + /* ERRORS */ +no_jitterbuffer: + { + g_warning ("rtpbin: could not create rtpjitterbuffer element"); + return NULL; + } +no_demux: + { + gst_object_unref (buffer); + g_warning ("rtpbin: could not create rtpptdemux element"); + return NULL; + } } +/* Manages the RTP streams that come from one client and should therefore be + * synchronized. + */ +struct _GstRTPBinClient +{ + /* the common CNAME for the streams */ + gchar *cname; + /* the streams */ + GSList *streams; +}; + /* GObject vmethods */ static void gst_rtp_bin_finalize (GObject * object); static void gst_rtp_bin_set_property (GObject * object, guint prop_id, @@ -230,6 +365,8 @@ gst_rtp_bin_class_init (GstRTPBinClass * klass) gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_release_pad); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_bin_debug, "rtpbin", 0, "RTP bin"); } static void @@ -312,22 +449,88 @@ gst_rtp_bin_change_state (GstElement * element, GstStateChange transition) return res; } +/* a new pad (SSRC) was created in @session */ +static void +new_payload_found (GstElement * element, guint pt, GstPad * pad, + GstRTPBinStream * stream) +{ + GstRTPBin *rtpbin; + GstElementClass *klass; + GstPadTemplate *templ; + gchar *padname; + GstPad *gpad; + + rtpbin = stream->bin; + + GST_DEBUG ("new payload pad %d", pt); + + /* ghost the pad to the parent */ + klass = GST_ELEMENT_GET_CLASS (rtpbin); + templ = gst_element_class_get_pad_template (klass, "recv_rtp_src_%d_%d_%d"); + padname = g_strdup_printf ("recv_rtp_src_%d_%u_%d", + stream->session->id, stream->ssrc, pt); + gpad = gst_ghost_pad_new_from_template (padname, pad, templ); + g_free (padname); + + gst_pad_set_active (gpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), gpad); +} + +/* a new pad (SSRC) was created in @session */ +static void +new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, + GstRTPBinSession * session) +{ + GstRTPBinStream *stream; + GstPad *sinkpad; + + GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); + + /* create new stream */ + stream = create_stream (session, ssrc); + if (!stream) + goto no_stream; + + /* get pad and link */ + GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); + sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); + gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + + /* connect to the new-pad signal of the payload demuxer */ + stream->demux_newpad_sig = g_signal_connect (stream->demux, + "new-payload-type", (GCallback) new_payload_found, stream); + + return; + + /* ERRORS */ +no_stream: + { + GST_DEBUG ("could not create stream"); + return; + } +} + /* Create a pad for receiving RTP for the session in @name */ static GstPad * create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) { - GstPad *result; + GstPad *result, *sinkdpad; guint sessid; GstRTPBinSession *session; + GstPadLinkReturn lres; /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtp_sink_%d", &sessid) != 1) goto no_name; + GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); + /* get or create session */ session = find_session_by_id (rtpbin, sessid); if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %d", sessid); /* create session now */ session = create_session (rtpbin, sessid); if (session == NULL) @@ -337,18 +540,37 @@ create_recv_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) if (session->recv_rtp_sink != NULL) goto existed; + GST_DEBUG_OBJECT (rtpbin, "getting RTP sink pad"); /* get recv_rtp pad and store */ session->recv_rtp_sink = gst_element_get_request_pad (session->session, "recv_rtp_sink"); if (session->recv_rtp_sink == NULL) goto pad_failed; + GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad"); + /* get srcpad, link to SSRCDemux */ + session->recv_rtp_src = + gst_element_get_static_pad (session->session, "recv_rtp_src"); + if (session->recv_rtp_src == NULL) + goto pad_failed; + + GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad"); + sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + lres = gst_pad_link (session->recv_rtp_src, sinkdpad); + gst_object_unref (sinkdpad); + if (lres != GST_PAD_LINK_OK) + goto link_failed; + + /* connect to the new-ssrc-pad signal of the demuxer */ + session->demux_newpad_sig = g_signal_connect (session->demux, + "new-ssrc-pad", (GCallback) new_ssrc_pad_found, session); + + GST_DEBUG_OBJECT (rtpbin, "ghosting session sink pad"); result = gst_ghost_pad_new_from_template (name, session->recv_rtp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); - /* FIXME, get srcpad, link to SSRCDemux */ - return result; /* ERRORS */ @@ -372,6 +594,11 @@ pad_failed: g_warning ("rtpbin: failed to get session pad"); return NULL; } +link_failed: + { + g_warning ("rtpbin: failed to link pads"); + return NULL; + } } /* Create a pad for receiving RTCP for the session in @name @@ -384,10 +611,17 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, guint sessid; GstRTPBinSession *session; +#if 0 + GstPad *sinkdpad; + GstPadLinkReturn lres; +#endif + /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1) goto no_name; + GST_DEBUG_OBJECT (rtpbin, "finding session %d", sessid); + /* get the session, it must exist or we error */ session = find_session_by_id (rtpbin, sessid); if (!session) @@ -397,18 +631,35 @@ create_recv_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, if (session->recv_rtcp_sink != NULL) goto existed; + GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); + /* get recv_rtp pad and store */ session->recv_rtcp_sink = gst_element_get_request_pad (session->session, "recv_rtcp_sink"); if (session->recv_rtcp_sink == NULL) goto pad_failed; +#if 0 + /* get srcpad, link to SSRCDemux */ + GST_DEBUG_OBJECT (rtpbin, "getting sync src pad"); + session->recv_rtcp_src = + gst_element_get_static_pad (session->session, "sync_src"); + if (session->recv_rtcp_src == NULL) + goto pad_failed; + + GST_DEBUG_OBJECT (rtpbin, "linking sync to demux"); + sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + lres = gst_pad_link (session->recv_rtcp_src, sinkdpad); + gst_object_unref (sinkdpad); + if (lres != GST_PAD_LINK_OK) + goto link_failed; +#endif + result = gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); - /* FIXME, get srcpad, link to SSRCDemux */ - return result; /* ERRORS */ @@ -433,6 +684,13 @@ pad_failed: g_warning ("rtpbin: failed to get session pad"); return NULL; } +#if 0 +link_failed: + { + g_warning ("rtpbin: failed to link pads"); + return NULL; + } +#endif } /* Create a pad for sending RTP for the session in @name @@ -471,6 +729,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) result = gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); /* get srcpad */ @@ -484,6 +743,7 @@ create_send_rtp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) templ = gst_element_class_get_pad_template (klass, "send_rtp_src_%d"); srcghost = gst_ghost_pad_new_from_template (gname, session->send_rtp_sink, templ); + gst_pad_set_active (srcghost, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), srcghost); g_free (gname); @@ -546,6 +806,7 @@ create_rtcp (GstRTPBin * rtpbin, GstPadTemplate * templ, const gchar * name) goto pad_failed; result = gst_ghost_pad_new_from_template (name, session->rtcp_src, templ); + gst_pad_set_active (result, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), result); return result; |