From a2a6ea8645b6270564ab3a1f4b448f0cc378d9fd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 1 Oct 2007 11:43:09 +0000 Subject: Added SDP demuxer element. Fixes #426657. Original commit message from CVS: * configure.ac: * gst/sdp/gstsdpdemux.c: (_do_init), (gst_sdp_demux_base_init), (gst_sdp_demux_class_init), (gst_sdp_demux_init), (gst_sdp_demux_finalize), (gst_sdp_demux_set_property), (gst_sdp_demux_get_property), (find_stream_by_id), (find_stream_by_pt), (find_stream_by_udpsrc), (find_stream), (gst_sdp_demux_stream_free), (gst_sdp_demux_create_stream), (gst_sdp_demux_cleanup), (get_default_rate_for_pt), (gst_sdp_demux_parse_rtpmap), (gst_sdp_demux_media_to_caps), (new_session_pad), (request_pt_map), (gst_sdp_demux_do_stream_eos), (on_bye_ssrc), (on_timeout), (gst_sdp_demux_configure_manager), (gst_sdp_demux_stream_configure_udp), (gst_sdp_demux_stream_configure_udp_sink), (gst_sdp_demux_combine_flows), (gst_sdp_demux_stream_push_event), (gst_sdp_demux_handle_message), (gst_sdp_demux_start), (gst_sdp_demux_sink_event), (gst_sdp_demux_sink_chain), (gst_sdp_demux_change_state): * gst/sdp/gstsdpdemux.h: * gst/sdp/gstsdpelem.c: (plugin_init): Added SDP demuxer element. Fixes #426657. --- gst/sdp/gstsdpdemux.c | 1378 +++++++++++++++++++++++++++++++++++++++++++++++++ gst/sdp/gstsdpdemux.h | 114 ++++ gst/sdp/gstsdpelem.c | 40 ++ 3 files changed, 1532 insertions(+) create mode 100644 gst/sdp/gstsdpdemux.c create mode 100644 gst/sdp/gstsdpdemux.h create mode 100644 gst/sdp/gstsdpelem.c (limited to 'gst/sdp') diff --git a/gst/sdp/gstsdpdemux.c b/gst/sdp/gstsdpdemux.c new file mode 100644 index 00000000..b574eadf --- /dev/null +++ b/gst/sdp/gstsdpdemux.c @@ -0,0 +1,1378 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +/** + * SECTION:element-sdpdemux + * + * + * + * sdpdemux currently understands SDP as the input format of the session description. + * For each stream listed in the SDP a new rtp_stream%d pad will be created + * with caps derived from the SDP media description. This is a caps of mime type + * "application/x-rtp" that can be connected to any available RTP depayloader + * element. + * + * + * sdpdemux will internally instantiate an RTP session manager element + * that will handle the RTCP messages to and from the server, jitter removal, + * packet reordering along with providing a clock for the pipeline. + * + * + * sdpdemux acts like a live element and will therefore only generate data in the + * PLAYING state. + * + * Example launch line + * + * + * gst-launch gnomevfssrc location=http://some.server/session.sdp ! sdpdemux ! fakesink + * + * Establish a connection to an HTTP server that contains an SDP session description + * that gets parsed by sdpdemux and send the raw RTP packets to a fakesink. + * + * + * + * Last reviewed on 2007-10-01 (0.10.6) + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include +#include +#include +#include + +#include + +#include "gstsdpdemux.h" + +GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug); +#define GST_CAT_DEFAULT (sdpdemux_debug) + +/* elementfactory information */ +static const GstElementDetails gst_sdp_demux_details = +GST_ELEMENT_DETAILS ("SDP session setup", + "Codec/Demuxer/Network/RTP", + "Receive data over the network via SDP", + "Wim Taymans "); + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/sdp")); + +static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream%d", + GST_PAD_SRC, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtp")); + +enum +{ + /* FILL ME */ + LAST_SIGNAL +}; + +#define DEFAULT_DEBUG FALSE +#define DEFAULT_TIMEOUT 10000000 +#define DEFAULT_LATENCY_MS 200 + +enum +{ + PROP_0, + PROP_DEBUG, + PROP_TIMEOUT, + PROP_LATENCY +}; + +static void gst_sdp_demux_base_init (gpointer g_class); +static void gst_sdp_demux_finalize (GObject * object); + +static void gst_sdp_demux_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_sdp_demux_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static GstCaps *gst_sdp_demux_media_to_caps (gint pt, + const GstSDPMedia * media); + +static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element, + GstStateChange transition); +static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message); + +static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux, + GstSDPStream * stream, GstEvent * event); + +static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event); +static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad, + GstBuffer * buffer); + +/*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */ + +static void +_do_init (GType sdp_demux_type) +{ + GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux"); +} + +GST_BOILERPLATE_FULL (GstSDPDemux, gst_sdp_demux, GstBin, GST_TYPE_BIN, + _do_init); + +static void +gst_sdp_demux_base_init (gpointer g_class) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&sinktemplate)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&rtptemplate)); + + gst_element_class_set_details (element_class, &gst_sdp_demux_details); +} + +static void +gst_sdp_demux_class_init (GstSDPDemuxClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBinClass *gstbin_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbin_class = (GstBinClass *) klass; + + gobject_class->set_property = gst_sdp_demux_set_property; + gobject_class->get_property = gst_sdp_demux_get_property; + + gobject_class->finalize = gst_sdp_demux_finalize; + + g_object_class_install_property (gobject_class, PROP_DEBUG, + g_param_spec_boolean ("debug", "Debug", + "Dump request and response messages to stdout", + DEFAULT_DEBUG, G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (gobject_class, PROP_TIMEOUT, + g_param_spec_uint64 ("timeout", "Timeout", + "Fail transport after UDP timeout microseconds (0 = disabled)", + 0, G_MAXUINT64, DEFAULT_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (gobject_class, PROP_LATENCY, + g_param_spec_uint ("latency", "Buffer latency in ms", + "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); + + gstelement_class->change_state = gst_sdp_demux_change_state; + + gstbin_class->handle_message = gst_sdp_demux_handle_message; +} + +static void +gst_sdp_demux_init (GstSDPDemux * demux, GstSDPDemuxClass * g_class) +{ + demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); + gst_pad_set_event_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event)); + gst_pad_set_chain_function (demux->sinkpad, + GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain)); + gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); + + /* protects the streaming thread in interleaved mode or the polling + * thread in UDP mode. */ + demux->stream_rec_lock = g_new (GStaticRecMutex, 1); + g_static_rec_mutex_init (demux->stream_rec_lock); + + demux->adapter = gst_adapter_new (); +} + +static void +gst_sdp_demux_finalize (GObject * object) +{ + GstSDPDemux *demux; + + demux = GST_SDP_DEMUX (object); + + /* free locks */ + g_static_rec_mutex_free (demux->stream_rec_lock); + g_free (demux->stream_rec_lock); + + g_object_unref (demux->adapter); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_sdp_demux_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstSDPDemux *demux; + + demux = GST_SDP_DEMUX (object); + + switch (prop_id) { + case PROP_DEBUG: + demux->debug = g_value_get_boolean (value); + break; + case PROP_TIMEOUT: + demux->udp_timeout = g_value_get_uint64 (value); + break; + case PROP_LATENCY: + demux->latency = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstSDPDemux *demux; + + demux = GST_SDP_DEMUX (object); + + switch (prop_id) { + case PROP_DEBUG: + g_value_set_boolean (value, demux->debug); + break; + case PROP_TIMEOUT: + g_value_set_uint64 (value, demux->udp_timeout); + break; + case PROP_LATENCY: + g_value_set_uint (value, demux->latency); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gint +find_stream_by_id (GstSDPStream * stream, gconstpointer a) +{ + gint id = GPOINTER_TO_INT (a); + + if (stream->id == id) + return 0; + + return -1; +} + +static gint +find_stream_by_pt (GstSDPStream * stream, gconstpointer a) +{ + gint pt = GPOINTER_TO_INT (a); + + if (stream->pt == pt) + return 0; + + return -1; +} + +static gint +find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a) +{ + GstElement *src = (GstElement *) a; + + if (stream->udpsrc[0] == src) + return 0; + if (stream->udpsrc[1] == src) + return 0; + + return -1; +} + +GstSDPStream * +find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func) +{ + GList *lstream; + + /* find and get stream */ + if ((lstream = + g_list_find_custom (demux->streams, data, (GCompareFunc) func))) + return (GstSDPStream *) lstream->data; + + return NULL; +} + +static void +gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream) +{ + gint i; + + GST_DEBUG_OBJECT (demux, "free stream %p", stream); + + if (stream->caps) + gst_caps_unref (stream->caps); + + for (i = 0; i < 2; i++) { + GstElement *udpsrc = stream->udpsrc[i]; + + if (udpsrc) { + gst_element_set_state (udpsrc, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (demux), udpsrc); + stream->udpsrc[i] = NULL; + } + } + if (stream->udpsink) { + gst_element_set_state (stream->udpsink, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (demux), stream->udpsink); + stream->udpsink = NULL; + } + if (stream->srcpad) { + gst_pad_set_active (stream->srcpad, FALSE); + if (stream->added) { + gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad); + stream->added = FALSE; + } + stream->srcpad = NULL; + } + g_free (stream); +} + +static GstSDPStream * +gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx) +{ + GstSDPStream *stream; + const gchar *payload, *rtcp; + const GstSDPMedia *media; + const GstSDPConnection *conn; + + /* get media, should not return NULL */ + media = gst_sdp_message_get_media (sdp, idx); + if (media == NULL) + return NULL; + + stream = g_new0 (GstSDPStream, 1); + stream->parent = demux; + /* we mark the pad as not linked, we will mark it as OK when we add the pad to + * the element. */ + stream->last_ret = GST_FLOW_OK; + stream->added = FALSE; + stream->disabled = FALSE; + stream->id = demux->numstreams++; + stream->eos = FALSE; + + /* we must have a payload. No payload means we cannot create caps */ + /* FIXME, handle multiple formats. */ + if ((payload = gst_sdp_media_get_format (media, 0))) { + stream->pt = atoi (payload); + /* convert caps */ + stream->caps = gst_sdp_demux_media_to_caps (stream->pt, media); + + if (stream->pt >= 96) { + /* If we have a dynamic payload type, see if we have a stream with the + * same payload number. If there is one, they are part of the same + * container and we only need to add one pad. */ + if (find_stream (demux, GINT_TO_POINTER (stream->pt), + (gpointer) find_stream_by_pt)) { + stream->container = TRUE; + } + } + } + if (!(conn = gst_sdp_media_get_connection (media, 0))) { + if (!(conn = gst_sdp_message_get_connection (sdp))) + goto no_connection; + } + + stream->destination = conn->address; + stream->ttl = conn->ttl; + + stream->rtp_port = gst_sdp_media_get_port (media); + if ((rtcp = gst_sdp_media_get_attribute_val (media, "rtcp"))) { + /* FIXME, RFC 3605 */ + stream->rtcp_port = stream->rtp_port + 1; + } else { + stream->rtcp_port = stream->rtp_port + 1; + } + + GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream); + GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt); + GST_DEBUG_OBJECT (demux, " container: %d", stream->container); + GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps); + + /* we keep track of all streams */ + demux->streams = g_list_append (demux->streams, stream); + + return stream; + + /* ERRORS */ +no_connection: + { + gst_sdp_demux_stream_free (demux, stream); + return NULL; + } +} + +static void +gst_sdp_demux_cleanup (GstSDPDemux * demux) +{ + GList *walk; + + GST_DEBUG_OBJECT (demux, "cleanup"); + + for (walk = demux->streams; walk; walk = g_list_next (walk)) { + GstSDPStream *stream = (GstSDPStream *) walk->data; + + gst_sdp_demux_stream_free (demux, stream); + } + g_list_free (demux->streams); + demux->streams = NULL; + if (demux->session) { + if (demux->session_sig_id) { + g_signal_handler_disconnect (demux->session, demux->session_sig_id); + demux->session_sig_id = 0; + } + gst_element_set_state (demux->session, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (demux), demux->session); + demux->session = NULL; + } + demux->numstreams = 0; +} + +/* FIXME, this should go somewhere else, ideally + */ +static guint +get_default_rate_for_pt (gint pt, gchar * name, gchar * params) +{ + switch (pt) { + case 0: + case 3: + case 4: + case 5: + case 7: + case 8: + case 9: + case 12: + case 13: + case 15: + case 18: + return 8000; + case 16: + return 11025; + case 17: + return 22050; + case 6: + return 16000; + case 10: + case 11: + return 44100; + case 14: + case 25: + case 26: + case 28: + case 31: + case 32: + case 33: + case 34: + return 90000; + default: + { + if (g_str_has_prefix (name, "x-pn-real")) + return 1000; + return -1; + } + } +} + +#define PARSE_INT(p, del, res) \ +G_STMT_START { \ + gchar *t = p; \ + p = strstr (p, del); \ + if (p == NULL) \ + res = -1; \ + else { \ + *p = '\0'; \ + p++; \ + res = atoi (t); \ + } \ +} G_STMT_END + +#define PARSE_STRING(p, del, res) \ +G_STMT_START { \ + gchar *t = p; \ + p = strstr (p, del); \ + if (p == NULL) { \ + res = NULL; \ + p = t; \ + } \ + else { \ + *p = '\0'; \ + p++; \ + res = t; \ + } \ +} G_STMT_END + +#define SKIP_SPACES(p) \ + while (*p && g_ascii_isspace (*p)) \ + p++; + +/* rtpmap contains: + * + * /[/] + */ +static gboolean +gst_sdp_demux_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name, + gint * rate, gchar ** params) +{ + gchar *p, *t; + + t = p = (gchar *) rtpmap; + + PARSE_INT (p, " ", *payload); + if (*payload == -1) + return FALSE; + + SKIP_SPACES (p); + if (*p == '\0') + return FALSE; + + PARSE_STRING (p, "/", *name); + if (*name == NULL) { + GST_DEBUG ("no rate, name %s", p); + /* no rate, assume -1 then */ + *name = p; + *rate = -1; + return TRUE; + } + + t = p; + p = strstr (p, "/"); + if (p == NULL) { + *rate = atoi (t); + return TRUE; + } + *p = '\0'; + p++; + *rate = atoi (t); + + t = p; + if (*p == '\0') + return TRUE; + *params = t; + + return TRUE; +} + +/* + * Mapping of caps to and from SDP fields: + * + * m= RTP/AVP + * a=rtpmap: /[/] + * a=fmtp: [=];... + */ +static GstCaps * +gst_sdp_demux_media_to_caps (gint pt, const GstSDPMedia * media) +{ + GstCaps *caps; + const gchar *rtpmap; + const gchar *fmtp; + gchar *name = NULL; + gint rate = -1; + gchar *params = NULL; + gchar *tmp; + GstStructure *s; + gint payload = 0; + gboolean ret; + + /* get and parse rtpmap */ + if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) { + ret = gst_sdp_demux_parse_rtpmap (rtpmap, &payload, &name, &rate, ¶ms); + if (ret) { + if (payload != pt) { + /* we ignore the rtpmap if the payload type is different. */ + g_warning ("rtpmap of wrong payload type, ignoring"); + name = NULL; + rate = -1; + params = NULL; + } + } else { + /* if we failed to parse the rtpmap for a dynamic payload type, we have an + * error */ + if (pt >= 96) + goto no_rtpmap; + /* else we can ignore */ + g_warning ("error parsing rtpmap, ignoring"); + } + } else { + /* dynamic payloads need rtpmap or we fail */ + if (pt >= 96) + goto no_rtpmap; + } + /* check if we have a rate, if not, we need to look up the rate from the + * default rates based on the payload types. */ + if (rate == -1) { + rate = get_default_rate_for_pt (pt, name, params); + /* we fail if we cannot find one */ + if (rate == -1) + goto no_rate; + } + + tmp = g_ascii_strdown (media->media, -1); + caps = gst_caps_new_simple ("application/x-rtp", + "media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL); + g_free (tmp); + s = gst_caps_get_structure (caps, 0); + + gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL); + + /* encoding name must be upper case */ + if (name != NULL) { + tmp = g_ascii_strup (name, -1); + gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL); + g_free (tmp); + } + + /* params must be lower case */ + if (params != NULL) { + tmp = g_ascii_strdown (params, -1); + gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL); + g_free (tmp); + } + + /* parse optional fmtp: field */ + if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) { + gchar *p; + gint payload = 0; + + p = (gchar *) fmtp; + + /* p is now of the format [=];... */ + PARSE_INT (p, " ", payload); + if (payload != -1 && payload == pt) { + gchar **pairs; + gint i; + + /* [=] are separated with ';' */ + pairs = g_strsplit (p, ";", 0); + for (i = 0; pairs[i]; i++) { + gchar *valpos; + gchar *val, *key; + + /* the key may not have a '=', the value can have other '='s */ + valpos = strstr (pairs[i], "="); + if (valpos) { + /* we have a '=' and thus a value, remove the '=' with \0 */ + *valpos = '\0'; + /* value is everything between '=' and ';'. FIXME, strip? */ + val = g_strstrip (valpos + 1); + } else { + /* simple ;.. is translated into =1;... */ + val = "1"; + } + /* strip the key of spaces, convert key to lowercase but not the value. */ + key = g_strstrip (pairs[i]); + if (strlen (key) > 1) { + tmp = g_ascii_strdown (key, -1); + gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL); + g_free (tmp); + } + } + g_strfreev (pairs); + } + } + return caps; + + /* ERRORS */ +no_rtpmap: + { + g_warning ("rtpmap type not given for dynamic payload %d", pt); + return NULL; + } +no_rate: + { + g_warning ("rate unknown for payload type %d", pt); + return NULL; + } +} + +/* this callback is called when the session manager generated a new src pad with + * payloaded RTP packets. We simply ghost the pad here. */ +static void +new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux) +{ + gchar *name; + GstPadTemplate *template; + gint id, ssrc, pt; + GList *lstream; + GstSDPStream *stream; + gboolean all_added; + + GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad); + + GST_SDP_STREAM_LOCK (demux); + /* find stream */ + name = gst_object_get_name (GST_OBJECT_CAST (pad)); + if (sscanf (name, "recv_rtp_src_%d_%d_%d", &id, &ssrc, &pt) != 3) + goto unknown_stream; + + GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %d, PT %d", id, ssrc, pt); + + stream = + find_stream (demux, GINT_TO_POINTER (id), (gpointer) find_stream_by_id); + if (stream == NULL) + goto unknown_stream; + + /* no need for a timeout anymore now */ + g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL); + + /* create a new pad we will use to stream to */ + template = gst_static_pad_template_get (&rtptemplate); + stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template); + gst_object_unref (template); + g_free (name); + + stream->added = TRUE; + gst_pad_set_active (stream->srcpad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad); + + /* check if we added all streams */ + all_added = TRUE; + for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) { + stream = (GstSDPStream *) lstream->data; + /* a container stream only needs one pad added. Also disabled streams don't + * count */ + if (!stream->container && !stream->disabled && !stream->added) { + all_added = FALSE; + break; + } + } + GST_SDP_STREAM_UNLOCK (demux); + + if (all_added) { + GST_DEBUG_OBJECT (demux, "We added all streams"); + /* when we get here, all stream are added and we can fire the no-more-pads + * signal. */ + gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); + } + + return; + + /* ERRORS */ +unknown_stream: + { + GST_DEBUG_OBJECT (demux, "ignoring unknown stream"); + GST_SDP_STREAM_UNLOCK (demux); + g_free (name); + return; + } +} + +static GstCaps * +request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux) +{ + GstSDPStream *stream; + GstCaps *caps; + + GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt, + session); + + GST_SDP_STREAM_LOCK (demux); + stream = + find_stream (demux, GINT_TO_POINTER (session), + (gpointer) find_stream_by_id); + if (!stream) + goto unknown_stream; + + caps = stream->caps; + GST_SDP_STREAM_UNLOCK (demux); + + return caps; + +unknown_stream: + { + GST_DEBUG_OBJECT (demux, "unknown stream %d", session); + GST_SDP_STREAM_UNLOCK (demux); + return NULL; + } +} + +static void +gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session) +{ + GstSDPStream *stream; + + GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session); + + /* get stream for session */ + stream = + find_stream (demux, GINT_TO_POINTER (session), + (gpointer) find_stream_by_id); + if (!stream) + goto unknown_stream; + + if (stream->eos) + goto was_eos; + + stream->eos = TRUE; + gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ()); + return; + + /* ERRORS */ +unknown_stream: + { + GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session); + return; + } +was_eos: + { + GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session); + return; + } +} + +static void +on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc, + GstSDPDemux * demux) +{ + GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc, + session); + + gst_sdp_demux_do_stream_eos (demux, session); +} + +static void +on_timeout (GstElement * manager, guint session, guint32 ssrc, + GstSDPDemux * demux) +{ + GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session); + + gst_sdp_demux_do_stream_eos (demux, session); +} + +/* try to get and configure a manager */ +static gboolean +gst_sdp_demux_configure_manager (GstSDPDemux * demux) +{ + GstStateChangeReturn ret; + + /* configure the session manager */ + if (!(demux->session = gst_element_factory_make ("gstrtpbin", NULL))) + goto manager_failed; + + /* we manage this element */ + gst_bin_add (GST_BIN_CAST (demux), demux->session); + + ret = gst_element_set_state (demux->session, GST_STATE_PAUSED); + if (ret == GST_STATE_CHANGE_FAILURE) + goto start_session_failure; + + g_object_set (demux->session, "latency", demux->latency, NULL); + + /* connect to signals if we did not already do so */ + GST_DEBUG_OBJECT (demux, "connect to signals on session manager"); + demux->session_sig_id = + g_signal_connect (demux->session, "pad-added", + (GCallback) new_session_pad, demux); + demux->session_ptmap_id = + g_signal_connect (demux->session, "request-pt-map", + (GCallback) request_pt_map, demux); + g_signal_connect (demux->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, + demux); + g_signal_connect (demux->session, "on-bye-timeout", (GCallback) on_timeout, + demux); + g_signal_connect (demux->session, "on-timeout", (GCallback) on_timeout, + demux); + + return TRUE; + + /* ERRORS */ +manager_failed: + { + GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found"); + return FALSE; + } +start_session_failure: + { + GST_DEBUG_OBJECT (demux, "could not start session"); + gst_element_set_state (demux->session, GST_STATE_NULL); + gst_bin_remove (GST_BIN_CAST (demux), demux->session); + demux->session = NULL; + return FALSE; + } +} + +static gboolean +gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream) +{ + gchar *uri, *name; + GstPad *pad; + + GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast"); + + /* creating UDP source */ + if (stream->rtp_port != -1) { + GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", stream->destination, + stream->rtp_port); + + uri = g_strdup_printf ("udp://%s:%d", stream->destination, + stream->rtp_port); + stream->udpsrc[0] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); + g_free (uri); + if (stream->udpsrc[0] == NULL) + goto no_element; + + /* take ownership */ + gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]); + + GST_DEBUG_OBJECT (demux, + "setting up UDP source with timeout %" G_GINT64_FORMAT, + demux->udp_timeout); + + /* configure a timeout on the UDP port. When the timeout message is + * posted, we assume UDP transport is not possible. */ + g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", demux->udp_timeout, + NULL); + + /* get output pad of the UDP source. */ + pad = gst_element_get_pad (stream->udpsrc[0], "src"); + + name = g_strdup_printf ("recv_rtp_sink_%d", stream->id); + stream->channelpad[0] = gst_element_get_request_pad (demux->session, name); + g_free (name); + + GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager"); + /* configure for UDP delivery, we need to connect the UDP pads to + * the session plugin. */ + gst_pad_link (pad, stream->channelpad[0]); + gst_object_unref (pad); + + /* change state */ + gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED); + } + + /* creating another UDP source */ + if (stream->rtcp_port != -1) { + GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", stream->destination, + stream->rtcp_port); + uri = + g_strdup_printf ("udp://%s:%d", stream->destination, stream->rtcp_port); + stream->udpsrc[1] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); + g_free (uri); + if (stream->udpsrc[1] == NULL) + goto no_element; + + /* take ownership */ + gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]); + + GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager"); + + name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id); + stream->channelpad[1] = gst_element_get_request_pad (demux->session, name); + g_free (name); + + pad = gst_element_get_pad (stream->udpsrc[1], "src"); + gst_pad_link (pad, stream->channelpad[1]); + gst_object_unref (pad); + + gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED); + } + return TRUE; + + /* ERRORS */ +no_element: + { + GST_DEBUG_OBJECT (demux, "no UDP source element found"); + return FALSE; + } +} + +/* configure the UDP sink back to the server for status reports */ +static gboolean +gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux, + GstSDPStream * stream) +{ + GstPad *pad, *sinkpad; + gint port, sockfd = -1; + gchar *destination, *uri, *name; + + /* get destination and port */ + port = stream->rtcp_port; + destination = stream->destination; + + GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port); + + uri = g_strdup_printf ("udp://%s:%d", destination, port); + stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL); + g_free (uri); + if (stream->udpsink == NULL) + goto no_sink_element; + + /* no sync needed */ + g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL); + /* no async state changes needed */ + g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL); + + if (stream->udpsrc[1]) { + /* configure socket, we give it the same UDP socket as the udpsrc for RTCP + * because some servers check the port number of where it sends RTCP to identify + * the RTCP packets it receives */ + g_object_get (G_OBJECT (stream->udpsrc[1]), "sock", &sockfd, NULL); + GST_DEBUG_OBJECT (demux, "UDP src has sock %d", sockfd); + /* configure socket and make sure udpsink does not close it when shutting + * down, it belongs to udpsrc after all. */ + g_object_set (G_OBJECT (stream->udpsink), "sockfd", sockfd, NULL); + g_object_set (G_OBJECT (stream->udpsink), "closefd", FALSE, NULL); + } + + /* we keep this playing always */ + gst_element_set_locked_state (stream->udpsink, TRUE); + gst_element_set_state (stream->udpsink, GST_STATE_PLAYING); + + gst_bin_add (GST_BIN_CAST (demux), stream->udpsink); + + /* get session RTCP pad */ + name = g_strdup_printf ("send_rtcp_src_%d", stream->id); + pad = gst_element_get_request_pad (demux->session, name); + g_free (name); + + /* and link */ + if (pad) { + sinkpad = gst_element_get_pad (stream->udpsink, "sink"); + gst_pad_link (pad, sinkpad); + gst_object_unref (sinkpad); + } else { + /* not very fatal, we just won't be able to send RTCP */ + GST_WARNING_OBJECT (demux, "could not get session RTCP pad"); + } + + + return TRUE; + + /* ERRORS */ +no_sink_element: + { + GST_DEBUG_OBJECT (demux, "no UDP sink element found"); + return FALSE; + } +} + +static GstFlowReturn +gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream, + GstFlowReturn ret) +{ + GList *streams; + + /* store the value */ + stream->last_ret = ret; + + /* if it's success we can return the value right away */ + if (GST_FLOW_IS_SUCCESS (ret)) + goto done; + + /* any other error that is not-linked can be returned right + * away */ + if (ret != GST_FLOW_NOT_LINKED) + goto done; + + /* only return NOT_LINKED if all other pads returned NOT_LINKED */ + for (streams = demux->streams; streams; streams = g_list_next (streams)) { + GstSDPStream *ostream = (GstSDPStream *) streams->data; + + ret = ostream->last_ret; + /* some other return value (must be SUCCESS but we can return + * other values as well) */ + if (ret != GST_FLOW_NOT_LINKED) + goto done; + } + /* if we get here, all other pads were unlinked and we return + * NOT_LINKED then */ +done: + return ret; +} + +static void +gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream, + GstEvent * event) +{ + /* only streams that have a connection to the outside world */ + if (stream->srcpad == NULL) + goto done; + + if (stream->channelpad[0]) { + gst_event_ref (event); + gst_pad_send_event (stream->channelpad[0], event); + } + + if (stream->channelpad[1]) { + gst_event_ref (event); + gst_pad_send_event (stream->channelpad[1], event); + } + +done: + gst_event_unref (event); +} + +static void +gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message) +{ + GstSDPDemux *demux; + + demux = GST_SDP_DEMUX (bin); + + switch (GST_MESSAGE_TYPE (message)) { + case GST_MESSAGE_ELEMENT: + { + const GstStructure *s = gst_message_get_structure (message); + + if (gst_structure_has_name (s, "GstUDPSrcTimeout")) { + gboolean ignore_timeout; + + GST_DEBUG_OBJECT (bin, "timeout on UDP port"); + + GST_OBJECT_LOCK (demux); + ignore_timeout = demux->ignore_timeout; + demux->ignore_timeout = TRUE; + GST_OBJECT_UNLOCK (demux); + + /* we only act on the first udp timeout message, others are irrelevant + * and can be ignored. */ + if (ignore_timeout) + gst_message_unref (message); + else { + GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL), + ("Could not receive any UDP packets for %.4f seconds, maybe your " + "firewall is blocking it.", + gst_guint64_to_gdouble (demux->udp_timeout / 1000000))); + } + return; + } + GST_BIN_CLASS (parent_class)->handle_message (bin, message); + break; + } + case GST_MESSAGE_ERROR: + { + GstObject *udpsrc; + GstSDPStream *stream; + GstFlowReturn ret; + + udpsrc = GST_MESSAGE_SRC (message); + + GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc)); + + stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc); + /* fatal but not our message, forward */ + if (!stream) + goto forward; + + /* we ignore the RTCP udpsrc */ + if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc)) + goto done; + + /* if we get error messages from the udp sources, that's not a problem as + * long as not all of them error out. We also don't really know what the + * problem is, the message does not give enough detail... */ + ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED); + GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret)); + if (ret != GST_FLOW_OK) + goto forward; + + done: + gst_message_unref (message); + break; + + forward: + GST_BIN_CLASS (parent_class)->handle_message (bin, message); + break; + } + default: + { + GST_BIN_CLASS (parent_class)->handle_message (bin, message); + break; + } + } +} + +static gboolean +gst_sdp_demux_start (GstSDPDemux * demux) +{ + guint8 *data; + guint size; + gint i, n_streams; + GstSDPMessage sdp = { 0 }; + GstSDPStream *stream = NULL; + GList *walk; + + /* grab the lock so that no state change can interfere */ + GST_SDP_STREAM_LOCK (demux); + + GST_DEBUG_OBJECT (demux, "parse SDP..."); + + size = gst_adapter_available (demux->adapter); + data = gst_adapter_take (demux->adapter, size); + + gst_sdp_message_init (&sdp); + if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK) + goto could_not_parse; + + if (demux->debug) + gst_sdp_message_dump (&sdp); + + /* try to get and configure a manager */ + if (!gst_sdp_demux_configure_manager (demux)) + goto no_manager; + + /* create streams with UDP sources and sinks */ + n_streams = gst_sdp_message_medias_len (&sdp); + for (i = 0; i < n_streams; i++) { + stream = gst_sdp_demux_create_stream (demux, &sdp, i); + + GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream); + + if (!gst_sdp_demux_stream_configure_udp (demux, stream)) + goto transport_failed; + if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream)) + goto transport_failed; + } + + /* set target state on session manager */ + gst_element_set_state (demux->session, demux->target); + + /* activate all streams */ + for (walk = demux->streams; walk; walk = g_list_next (walk)) { + stream = (GstSDPStream *) walk->data; + + /* configure target state on udp sources */ + gst_element_set_state (stream->udpsrc[0], demux->target); + gst_element_set_state (stream->udpsrc[1], demux->target); + } + GST_SDP_STREAM_UNLOCK (demux); + + return TRUE; + + /* ERRORS */ +transport_failed: + { + GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL), + ("Could not create RTP stream transport.")); + GST_SDP_STREAM_UNLOCK (demux); + return FALSE; + } +no_manager: + { + GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL), + ("Could not create RTP session manager.")); + GST_SDP_STREAM_UNLOCK (demux); + return FALSE; + } +could_not_parse: + { + gst_sdp_message_uninit (&sdp); + GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL), + ("Could not parse SDP message.")); + GST_SDP_STREAM_UNLOCK (demux); + return FALSE; + } +} + +static gboolean +gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event) +{ + GstSDPDemux *demux; + gboolean res = TRUE; + + demux = GST_SDP_DEMUX (gst_pad_get_parent (pad)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + /* when we get EOS, start parsing the SDP */ + res = gst_sdp_demux_start (demux); + gst_event_unref (event); + break; + default: + gst_event_unref (event); + break; + } + gst_object_unref (demux); + + return res; +} + +static GstFlowReturn +gst_sdp_demux_sink_chain (GstPad * pad, GstBuffer * buffer) +{ + GstSDPDemux *demux; + + demux = GST_SDP_DEMUX (gst_pad_get_parent (pad)); + + /* push the SDP message in an adapter, we start doing something with it when + * we receive EOS */ + gst_adapter_push (demux->adapter, buffer); + + gst_object_unref (demux); + + return GST_FLOW_OK; +} + +static GstStateChangeReturn +gst_sdp_demux_change_state (GstElement * element, GstStateChange transition) +{ + GstSDPDemux *demux; + GstStateChangeReturn ret; + + demux = GST_SDP_DEMUX (element); + + GST_SDP_STREAM_LOCK (demux); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + /* first attempt, don't ignore timeouts */ + gst_adapter_clear (demux->adapter); + demux->ignore_timeout = FALSE; + demux->target = GST_STATE_PAUSED; + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + demux->target = GST_STATE_PLAYING; + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + if (ret == GST_STATE_CHANGE_FAILURE) + goto done; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + ret = GST_STATE_CHANGE_NO_PREROLL; + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + ret = GST_STATE_CHANGE_NO_PREROLL; + demux->target = GST_STATE_PAUSED; + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_sdp_demux_cleanup (demux); + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + +done: + GST_SDP_STREAM_UNLOCK (demux); + + return ret; +} diff --git a/gst/sdp/gstsdpdemux.h b/gst/sdp/gstsdpdemux.h new file mode 100644 index 00000000..1c184419 --- /dev/null +++ b/gst/sdp/gstsdpdemux.h @@ -0,0 +1,114 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __GST_SDP_DEMUX_H__ +#define __GST_SDP_DEMUX_H__ + +#include +#include + +G_BEGIN_DECLS + +#define GST_TYPE_SDP_DEMUX \ + (gst_sdp_demux_get_type()) +#define GST_SDP_DEMUX(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SDP_DEMUX,GstSDPDemux)) +#define GST_SDP_DEMUX_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SDP_DEMUX,GstSDPDemuxClass)) +#define GST_IS_SDP_DEMUX(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SDP_DEMUX)) +#define GST_IS_SDP_DEMUX_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SDP_DEMUX)) +#define GST_SDP_DEMUX_CAST(obj) \ + ((GstSDPDemux *)(obj)) + +typedef struct _GstSDPDemux GstSDPDemux; +typedef struct _GstSDPDemuxClass GstSDPDemuxClass; + +#define GST_SDP_STREAM_GET_LOCK(sdp) (GST_SDP_DEMUX_CAST(sdp)->stream_rec_lock) +#define GST_SDP_STREAM_LOCK(sdp) (g_static_rec_mutex_lock (GST_SDP_STREAM_GET_LOCK(sdp))) +#define GST_SDP_STREAM_UNLOCK(sdp) (g_static_rec_mutex_unlock (GST_SDP_STREAM_GET_LOCK(sdp))) + +typedef struct _GstSDPStream GstSDPStream; + +struct _GstSDPStream { + gint id; + + GstSDPDemux *parent; /* parent, no extra ref to parent is taken */ + + /* pad we expose or NULL when it does not have an actual pad */ + GstPad *srcpad; + GstFlowReturn last_ret; + gboolean added; + gboolean disabled; + GstCaps *caps; + gboolean eos; + + /* our udp sources */ + GstElement *udpsrc[2]; + GstPad *channelpad[2]; + guint rtp_port; + guint rtcp_port; + + gchar *destination; + guint ttl; + + /* our udp sink back to the server */ + GstElement *udpsink; + GstPad *rtcppad; + + /* state */ + guint8 pt; + gboolean container; +}; + +struct _GstSDPDemux { + GstBin parent; + + GstPad *sinkpad; + GstAdapter *adapter; + GstState target; + + /* task for UDP loop */ + gboolean ignore_timeout; + + gint numstreams; + GStaticRecMutex *stream_rec_lock; + GList *streams; + + /* properties */ + gboolean debug; + guint64 udp_timeout; + guint latency; + + /* session management */ + GstElement *session; + gulong session_sig_id; + gulong session_ptmap_id; +}; + +struct _GstSDPDemuxClass { + GstBinClass parent_class; +}; + +GType gst_sdp_demux_get_type(void); + +G_END_DECLS + +#endif /* __GST_SDP_DEMUX_H__ */ diff --git a/gst/sdp/gstsdpelem.c b/gst/sdp/gstsdpelem.c new file mode 100644 index 00000000..8a0c8d9f --- /dev/null +++ b/gst/sdp/gstsdpelem.c @@ -0,0 +1,40 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstsdpdemux.h" + +static gboolean +plugin_init (GstPlugin * plugin) +{ + if (!gst_element_register (plugin, "sdpdemux", GST_RANK_SECONDARY, + GST_TYPE_SDP_DEMUX)) + return FALSE; + + return TRUE; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + "sdp", + "configure streaming sessions using SDP", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) -- cgit v1.2.1