diff options
author | Branko Subasic <branko.subasic at axis.com> | 2009-06-19 19:09:19 +0200 |
---|---|---|
committer | Wim Taymans <wim.taymans@collabora.co.uk> | 2009-06-19 19:10:35 +0200 |
commit | c70dbe94b5ff9a0993d852605d40c21020c59552 (patch) | |
tree | d603a4337beda1641cba8ac5644e056ee0577124 /gst | |
parent | 11dc33bea09febc98474fcb0cb3fbd8769dd21ae (diff) | |
download | gst-plugins-bad-c70dbe94b5ff9a0993d852605d40c21020c59552.tar.gz gst-plugins-bad-c70dbe94b5ff9a0993d852605d40c21020c59552.tar.bz2 gst-plugins-bad-c70dbe94b5ff9a0993d852605d40c21020c59552.zip |
rtpbin: add support for buffer-list
Add support for sending buffer-lists.
Add unit test for testing that the buffer-list passed through rtpbin.
fixes #585839
Diffstat (limited to 'gst')
-rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 62 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsession.c | 37 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsession.h | 4 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsource.c | 118 | ||||
-rw-r--r-- | gst/rtpmanager/rtpsource.h | 2 |
5 files changed, 161 insertions, 62 deletions
diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index c33fdfc6..9407ee52 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -259,7 +259,7 @@ struct _GstRtpSessionPrivate static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, - RTPSource * src, GstBuffer * buffer, gpointer user_data); + RTPSource * src, gpointer data, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data); static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, @@ -1032,8 +1032,8 @@ gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession) g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL); } -/* called when the session manager has an RTP packet ready for further - * processing */ +/* called when the session manager has an RTP packet or a list of packets + * ready for further processing */ static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data) @@ -1060,7 +1060,7 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, * sending */ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, - GstBuffer * buffer, gpointer user_data) + gpointer data, gpointer user_data) { GstFlowReturn result; GstRtpSession *rtpsession; @@ -1069,12 +1069,17 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; - GST_LOG_OBJECT (rtpsession, "sending RTP packet"); - if (rtpsession->send_rtp_src) { - result = gst_pad_push (rtpsession->send_rtp_src, buffer); + if (GST_IS_BUFFER (data)) { + GST_LOG_OBJECT (rtpsession, "sending RTP packet"); + result = gst_pad_push (rtpsession->send_rtp_src, GST_BUFFER_CAST (data)); + } else { + GST_LOG_OBJECT (rtpsession, "sending RTP list"); + result = gst_pad_push_list (rtpsession->send_rtp_src, + GST_BUFFER_LIST_CAST (data)); + } } else { - gst_buffer_unref (buffer); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); result = GST_FLOW_OK; } return result; @@ -1642,11 +1647,12 @@ gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstCaps * caps) return TRUE; } -/* Recieve an RTP packet to be send to the receivers, send to RTP session - * manager and forward to send_rtp_src. +/* Recieve an RTP packet or a list of packets to be send to the receivers, + * send to RTP session manager and forward to send_rtp_src. */ static GstFlowReturn -gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) +gst_rtp_session_chain_send_rtp_common (GstPad * pad, gpointer data, + gboolean is_list) { GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; @@ -1658,10 +1664,22 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; - GST_LOG_OBJECT (rtpsession, "received RTP packet"); + GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet"); /* get NTP time when this packet was captured, this depends on the timestamp. */ - timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (is_list) { + GstBuffer *buffer = NULL; + + /* All groups in an list have the same timestamp. + * So, just take it from the first group. */ + buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0, 0); + if (buffer) + timestamp = GST_BUFFER_TIMESTAMP (buffer); + else + timestamp = -1; + } else { + timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (data)); + } if (GST_CLOCK_TIME_IS_VALID (timestamp)) { /* convert to running time using the segment start value. */ ntpnstime = @@ -1676,7 +1694,9 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) } current_time = gst_clock_get_time (priv->sysclock); - ret = rtp_session_send_rtp (priv->session, buffer, current_time, ntpnstime); + ret = + rtp_session_send_rtp (priv->session, data, is_list, current_time, + ntpnstime); if (ret != GST_FLOW_OK) goto push_error; @@ -1694,6 +1714,18 @@ push_error: } } +static GstFlowReturn +gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) +{ + return gst_rtp_session_chain_send_rtp_common (pad, buffer, FALSE); +} + +static GstFlowReturn +gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstBufferList * list) +{ + return gst_rtp_session_chain_send_rtp_common (pad, list, TRUE); +} + /* Create sinkpad to receive RTP packets from senders. This will also create a * srcpad for the RTP packets. */ @@ -1817,6 +1849,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession) "send_rtp_sink"); gst_pad_set_chain_function (rtpsession->send_rtp_sink, gst_rtp_session_chain_send_rtp); + gst_pad_set_chain_list_function (rtpsession->send_rtp_sink, + gst_rtp_session_chain_send_rtp_list); gst_pad_set_getcaps_function (rtpsession->send_rtp_sink, gst_rtp_session_getcaps_send_rtp); gst_pad_set_setcaps_function (rtpsession->send_rtp_sink, diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 219aacf1..cda04182 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -958,7 +958,7 @@ rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type) } static GstFlowReturn -source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) +source_push_rtp (RTPSource * source, gpointer data, RTPSession * session) { GstFlowReturn result = GST_FLOW_OK; @@ -969,21 +969,21 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) if (session->callbacks.send_rtp) result = - session->callbacks.send_rtp (session, source, buffer, + session->callbacks.send_rtp (session, source, data, session->send_rtp_user_data); - else - gst_buffer_unref (buffer); - + else { + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + } } else { GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc); RTP_SESSION_UNLOCK (session); if (session->callbacks.process_rtp) result = - session->callbacks.process_rtp (session, source, buffer, - session->process_rtp_user_data); + session->callbacks.process_rtp (session, source, + GST_BUFFER_CAST (data), session->process_rtp_user_data); else - gst_buffer_unref (buffer); + gst_buffer_unref (GST_BUFFER_CAST (data)); } RTP_SESSION_LOCK (session); @@ -1962,7 +1962,7 @@ ignore: /** * rtp_session_send_rtp: * @sess: an #RTPSession - * @buffer: an RTP buffer + * @data: pointer to either an RTP buffer or a list of RTP buffers * @current_time: the current system time * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured. * This is the buffer timestamp converted to NTP time. @@ -1973,20 +1973,27 @@ ignore: * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, +rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, GstClockTime current_time, guint64 ntpnstime) { GstFlowReturn result; RTPSource *source; gboolean prevsender; + gboolean valid_packet; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR); - if (!gst_rtp_buffer_validate (buffer)) + if (is_list) { + valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data)); + } else { + valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data)); + } + + if (!valid_packet) goto invalid_packet; - GST_LOG ("received RTP packet for sending"); + GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet"); RTP_SESSION_LOCK (sess); source = sess->source; @@ -1997,7 +2004,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ - result = rtp_source_send_rtp (source, buffer, ntpnstime); + result = rtp_source_send_rtp (source, data, is_list, ntpnstime); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; @@ -2008,7 +2015,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, /* ERRORS */ invalid_packet: { - gst_buffer_unref (buffer); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 7e327a7d..6312f1c1 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -65,7 +65,7 @@ typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, * * Returns: a #GstFlowReturn. */ -typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); +typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, gpointer data, gpointer user_data); /** * RTPSessionSendRTCP: @@ -288,7 +288,7 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer GstClockTime current_time); /* processing packets for sending */ -GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer, +GstFlowReturn rtp_session_send_rtp (RTPSession *sess, gpointer data, gboolean is_list, GstClockTime current_time, guint64 ntpnstime); /* stopping the session */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index ed080717..209c17b5 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -26,7 +26,7 @@ GST_DEBUG_CATEGORY_STATIC (rtp_source_debug); #define GST_CAT_DEFAULT rtp_source_debug -#define RTP_MAX_PROBATION_LEN 32 +#define RTP_MAX_PROBATION_LEN 32 /* signals and args */ enum @@ -1091,41 +1091,73 @@ rtp_source_process_bye (RTPSource * src, const gchar * reason) src->received_bye = TRUE; } +static GstBufferListItem +set_ssrc (GstBuffer ** buffer, guint group, guint idx, RTPSource * src) +{ + *buffer = gst_buffer_make_writable (*buffer); + gst_rtp_buffer_set_ssrc (*buffer, src->ssrc); + return GST_BUFFER_LIST_SKIP_GROUP; +} + /** * rtp_source_send_rtp: * @src: an #RTPSource - * @buffer: an RTP buffer + * @data: an RTP buffer or a list of RTP buffers + * @is_list: if @data is a buffer or list * @ntpnstime: the NTP time when this buffer was captured in nanoseconds. This * is the buffer timestamp converted to NTP time. * - * Send an RTP @buffer originating from @src. This will make @src a sender. - * This function takes ownership of @buffer and modifies the SSRC in the RTP - * packet to that of @src when needed. + * Send @data (an RTP buffer or list of buffers) originating from @src. + * This will make @src a sender. This function takes ownership of @data and + * modifies the SSRC in the RTP packet to that of @src when needed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) +rtp_source_send_rtp (RTPSource * src, gpointer data, gboolean is_list, + guint64 ntpnstime) { - GstFlowReturn result = GST_FLOW_OK; + GstFlowReturn result; guint len; guint32 rtptime; guint64 ext_rtptime; guint64 ntp_diff, rtp_diff; guint64 elapsed; + GstBufferList *list = NULL; + GstBuffer *buffer = NULL; + guint packets; + guint32 ssrc; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR); - len = gst_rtp_buffer_get_payload_len (buffer); + if (is_list) { + list = GST_BUFFER_LIST_CAST (data); + /* We can grab the caps from the first group, since all + * groups of a buffer list have same caps. */ + buffer = gst_buffer_list_get (list, 0, 0); + if (!buffer) + goto no_buffer; + } else { + buffer = GST_BUFFER_CAST (data); + } rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); /* we are a sender now */ src->is_sender = TRUE; + if (is_list) { + /* Each group makes up a network packet. */ + packets = gst_buffer_list_n_groups (list); + len = gst_rtp_buffer_list_get_payload_len (list); + } else { + packets = 1; + len = gst_rtp_buffer_get_payload_len (buffer); + } + /* update stats for the SR */ - src->stats.packets_sent++; + src->stats.packets_sent += packets; src->stats.octets_sent += len; src->bytes_sent += len; @@ -1156,7 +1188,11 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) src->bitrate = 0; } - rtptime = gst_rtp_buffer_get_timestamp (buffer); + if (is_list) { + rtptime = gst_rtp_buffer_list_get_timestamp (list); + } else { + rtptime = gst_rtp_buffer_get_timestamp (buffer); + } ext_rtptime = src->last_rtptime; ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); @@ -1180,31 +1216,53 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) src->last_ntpnstime = ntpnstime; /* push packet */ - if (src->callbacks.push_rtp) { - guint32 ssrc; + if (!src->callbacks.push_rtp) + goto no_callback; + if (is_list) { + ssrc = gst_rtp_buffer_list_get_ssrc (list); + } else { ssrc = gst_rtp_buffer_get_ssrc (buffer); - if (ssrc != src->ssrc) { - /* the SSRC of the packet is not correct, make a writable buffer and - * update the SSRC. This could involve a complete copy of the packet when - * it is not writable. Usually the payloader will use caps negotiation to - * get the correct SSRC from the session manager before pushing anything. */ - buffer = gst_buffer_make_writable (buffer); - - /* FIXME, we don't want to warn yet because we can't inform any payloader - * of the changes SSRC yet because we don't implement pad-alloc. */ - GST_LOG ("updating SSRC from %08x to %08x, fix the payloader", ssrc, - src->ssrc); - gst_rtp_buffer_set_ssrc (buffer, src->ssrc); + } + + if (ssrc != src->ssrc) { + /* the SSRC of the packet is not correct, make a writable buffer and + * update the SSRC. This could involve a complete copy of the packet when + * it is not writable. Usually the payloader will use caps negotiation to + * get the correct SSRC from the session manager before pushing anything. */ + + /* FIXME, we don't want to warn yet because we can't inform any payloader + * of the changes SSRC yet because we don't implement pad-alloc. */ + GST_LOG ("updating SSRC from %08x to %08x, fix the payloader", ssrc, + src->ssrc); + + if (is_list) { + list = gst_buffer_list_make_writable (list); + gst_buffer_list_foreach (list, (GstBufferListFunc) set_ssrc, src); + } else { + set_ssrc (&buffer, 0, 0, src); } - GST_LOG ("pushing RTP packet %" G_GUINT64_FORMAT, src->stats.packets_sent); - result = src->callbacks.push_rtp (src, buffer, src->user_data); - } else { - GST_WARNING ("no callback installed, dropping packet"); - gst_buffer_unref (buffer); } + GST_LOG ("pushing RTP %s %" G_GUINT64_FORMAT, is_list ? "list" : "packet", + src->stats.packets_sent); + + result = src->callbacks.push_rtp (src, data, src->user_data); return result; + + /* ERRORS */ +no_buffer: + { + GST_WARNING ("no buffers in buffer list"); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + return GST_FLOW_OK; + } +no_callback: + { + GST_WARNING ("no callback installed, dropping packet"); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + return GST_FLOW_OK; + } } /** diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index a44ac1cd..8286f2ec 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -194,7 +194,7 @@ void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *a /* handling RTP */ GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); -GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer, guint64 ntpnstime); +GstFlowReturn rtp_source_send_rtp (RTPSource *src, gpointer data, gboolean is_list, guint64 ntpnstime); /* RTCP messages */ void rtp_source_process_bye (RTPSource *src, const gchar *reason); |