diff options
Diffstat (limited to 'gst/asfmux/gstrtpasfpay.c')
-rw-r--r-- | gst/asfmux/gstrtpasfpay.c | 469 |
1 files changed, 469 insertions, 0 deletions
diff --git a/gst/asfmux/gstrtpasfpay.c b/gst/asfmux/gstrtpasfpay.c new file mode 100644 index 00000000..ffcfd85a --- /dev/null +++ b/gst/asfmux/gstrtpasfpay.c @@ -0,0 +1,469 @@ +/* ASF RTP Payloader plugin for GStreamer + * Copyright (C) 2009 Thiago Santos <thiagoss@embedded.ufcg.edu.br> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/* FIXME + * - this element doesn't follow (max/min) time properties, + * is it possible to do it with a container format? + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <gst/rtp/gstrtpbuffer.h> +#include <string.h> + +#include "gstrtpasfpay.h" + +GST_DEBUG_CATEGORY_STATIC (rtpasfpay_debug); +#define GST_CAT_DEFAULT (rtpasfpay_debug) + +/* elementfactory information */ +static const GstElementDetails gst_rtp_asf_pay_details = +GST_ELEMENT_DETAILS ("RTP ASF payloader", + "Codec/Payloader/Network", + "Payload-encodes ASF into RTP packets (MS_RTSP)", + "Thiago Santos <thiagoss@embedded.ufcg.edu.br>"); + +static GstStaticPadTemplate gst_rtp_asf_pay_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("video/x-ms-asf, " "parsed = (boolean) true") + ); + +static GstStaticPadTemplate gst_rtp_asf_pay_src_template = +GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp, " + "media = (string) {\"audio\", \"video\", \"application\"}, " + "clock-rate = (int) 1000, " "encoding-name = (string) \"X-ASF-PF\"") + ); + +static GstFlowReturn +gst_rtp_asf_pay_handle_buffer (GstBaseRTPPayload * rtppay, GstBuffer * buffer); +static gboolean +gst_rtp_asf_pay_set_caps (GstBaseRTPPayload * rtppay, GstCaps * caps); + +GST_BOILERPLATE (GstRtpAsfPay, gst_rtp_asf_pay, GstBaseRTPPayload, + GST_TYPE_BASE_RTP_PAYLOAD); + +static void +gst_rtp_asf_pay_init (GstRtpAsfPay * rtpasfpay, GstRtpAsfPayClass * klass) +{ + rtpasfpay->first_ts = 0; + rtpasfpay->config = NULL; + rtpasfpay->packets_count = 0; + rtpasfpay->state = ASF_NOT_STARTED; + rtpasfpay->headers = NULL; + rtpasfpay->current = NULL; +} + +static void +gst_rtp_asf_pay_finalize (GObject * object) +{ + GstRtpAsfPay *rtpasfpay; + rtpasfpay = GST_RTP_ASF_PAY (object); + g_free (rtpasfpay->config); + if (rtpasfpay->headers) + gst_buffer_unref (rtpasfpay->headers); + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtp_asf_pay_base_init (gpointer klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_rtp_asf_pay_sink_template)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_rtp_asf_pay_src_template)); + gst_element_class_set_details (element_class, &gst_rtp_asf_pay_details); +} + +static void +gst_rtp_asf_pay_class_init (GstRtpAsfPayClass * klass) +{ + GObjectClass *gobject_class; + GstBaseRTPPayloadClass *gstbasertppayload_class; + + gobject_class = (GObjectClass *) klass; + gstbasertppayload_class = (GstBaseRTPPayloadClass *) klass; + + gobject_class->finalize = gst_rtp_asf_pay_finalize; + + gstbasertppayload_class->handle_buffer = gst_rtp_asf_pay_handle_buffer; + gstbasertppayload_class->set_caps = gst_rtp_asf_pay_set_caps; + + GST_DEBUG_CATEGORY_INIT (rtpasfpay_debug, "rtpasfpay", 0, + "ASF RTP Payloader"); +} + +static gboolean +gst_rtp_asf_pay_set_caps (GstBaseRTPPayload * rtppay, GstCaps * caps) +{ + /* FIXME change application for the actual content */ + gst_basertppayload_set_options (rtppay, "application", TRUE, "X-ASF-PF", + 1000); + return TRUE; +} + +static GstFlowReturn +gst_rtp_asf_pay_handle_packet (GstRtpAsfPay * rtpasfpay, GstBuffer * buffer) +{ + GstBaseRTPPayload *rtppay; + GstAsfPacketInfo *packetinfo; + guint8 flags; + guint8 *data; + guint32 packet_util_size; + guint32 packet_offset; + guint32 size_left; + GstFlowReturn ret = GST_FLOW_OK; + + rtppay = GST_BASE_RTP_PAYLOAD (rtpasfpay); + packetinfo = &rtpasfpay->packetinfo; + + if (!gst_asf_parse_packet (buffer, packetinfo, TRUE)) { + GST_ERROR_OBJECT (rtpasfpay, "Error while parsing asf packet"); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } + + if (packetinfo->packet_size == 0) + packetinfo->packet_size = rtpasfpay->asfinfo.packet_size; + + GST_LOG_OBJECT (rtpasfpay, "Packet size: %" G_GUINT32_FORMAT + ", padding: %" G_GUINT32_FORMAT, packetinfo->packet_size, + packetinfo->padding); + + /* update padding field to 0 */ + if (packetinfo->padding > 0) { + GstAsfPacketInfo info; + /* find padding field offset */ + guint offset = packetinfo->err_cor_len + 2 + + gst_asf_get_var_size_field_len (packetinfo->packet_field_type) + + gst_asf_get_var_size_field_len (packetinfo->seq_field_type); + buffer = gst_buffer_make_writable (buffer); + switch (packetinfo->padd_field_type) { + case ASF_FIELD_TYPE_DWORD: + GST_WRITE_UINT32_LE (&(GST_BUFFER_DATA (buffer)[offset]), 0); + break; + case ASF_FIELD_TYPE_WORD: + GST_WRITE_UINT16_LE (&(GST_BUFFER_DATA (buffer)[offset]), 0); + break; + case ASF_FIELD_TYPE_BYTE: + GST_BUFFER_DATA (buffer)[offset] = 0; + break; + case ASF_FIELD_TYPE_NONE: + default: + break; + } + gst_asf_parse_packet (buffer, &info, FALSE); + } + + packet_util_size = packetinfo->packet_size - packetinfo->padding; + packet_offset = 0; + while (packet_util_size > 0) { + /* Even if we don't fill completely an output buffer we + * push it when we add an fragment. Because it seems that + * it is not possible to determine where a asf packet + * fragment ends inside a rtp packet payload. + * This flag tells us to push the packet. + */ + gboolean force_push = FALSE; + + /* we have no output buffer pending, create one */ + if (rtpasfpay->current == NULL) { + GST_LOG_OBJECT (rtpasfpay, "Creating new output buffer"); + rtpasfpay->current = + gst_rtp_buffer_new_allocate_len (GST_BASE_RTP_PAYLOAD_MTU (rtpasfpay), + 0, 0); + rtpasfpay->cur_off = gst_rtp_buffer_get_header_len (rtpasfpay->current); + rtpasfpay->has_ts = FALSE; + rtpasfpay->marker = FALSE; + } + data = GST_BUFFER_DATA (rtpasfpay->current) + rtpasfpay->cur_off; + size_left = GST_BUFFER_SIZE (rtpasfpay->current) - rtpasfpay->cur_off; + + GST_DEBUG_OBJECT (rtpasfpay, "Input buffer bytes consumed: %" + G_GUINT32_FORMAT "/%" G_GUINT32_FORMAT, packet_offset, + GST_BUFFER_SIZE (buffer)); + + GST_DEBUG_OBJECT (rtpasfpay, "Output rtpbuffer status"); + GST_DEBUG_OBJECT (rtpasfpay, "Current offset: %" G_GUINT32_FORMAT, + rtpasfpay->cur_off); + GST_DEBUG_OBJECT (rtpasfpay, "Size left: %" G_GUINT32_FORMAT, size_left); + GST_DEBUG_OBJECT (rtpasfpay, "Has ts: %s", + rtpasfpay->has_ts ? "yes" : "no"); + if (rtpasfpay->has_ts) { + GST_DEBUG_OBJECT (rtpasfpay, "Ts: %" G_GUINT32_FORMAT, rtpasfpay->ts); + } + + flags = 0; + if (packetinfo->has_keyframe) { + flags = flags | 0x80; + } + flags = flags | 0x20; /* Relative timestamp is present */ + + if (!rtpasfpay->has_ts) { + /* this is the first asf packet, its send time is the + * rtp packet timestamp */ + rtpasfpay->has_ts = TRUE; + rtpasfpay->ts = packetinfo->send_time; + } + + if (GST_BUFFER_SIZE (rtpasfpay->current) - rtpasfpay->cur_off >= + packet_util_size + 8) { + /* enough space for the rest of the packet */ + if (packet_offset == 0) { + flags = flags | 0x40; + GST_WRITE_UINT24_BE (data + 1, packet_util_size); + } else { + GST_WRITE_UINT24_BE (data + 1, packet_offset); + force_push = TRUE; + } + data[0] = flags; + GST_WRITE_UINT32_BE (data + 4, + (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts); + memcpy (data + 8, GST_BUFFER_DATA (buffer) + packet_offset, + packet_util_size); + + /* updating status variables */ + rtpasfpay->cur_off += 8 + packet_util_size; + size_left -= packet_util_size + 8; + packet_offset += packet_util_size; + packet_util_size = 0; + rtpasfpay->marker = TRUE; + } else { + /* fragment packet */ + data[0] = flags; + GST_WRITE_UINT24_BE (data + 1, packet_offset); + GST_WRITE_UINT32_BE (data + 4, + (gint32) (packetinfo->send_time) - (gint32) rtpasfpay->ts); + memcpy (data + 8, GST_BUFFER_DATA (buffer) + packet_offset, + size_left - 8); + + /* updating status variables */ + rtpasfpay->cur_off += size_left; + packet_offset += size_left - 8; + packet_util_size -= size_left - 8; + size_left = 0; + force_push = TRUE; + } + + /* there is not enough room for any more buffers */ + if (force_push || size_left <= 8) { + + if (size_left != 0) { + /* trim remaining bytes not used */ + GstBuffer *aux = gst_buffer_create_sub (rtpasfpay->current, 0, + GST_BUFFER_SIZE (rtpasfpay->current) - size_left); + gst_buffer_unref (rtpasfpay->current); + rtpasfpay->current = aux; + } + gst_rtp_buffer_set_ssrc (rtpasfpay->current, rtppay->current_ssrc); + gst_rtp_buffer_set_marker (rtpasfpay->current, rtpasfpay->marker); + gst_rtp_buffer_set_payload_type (rtpasfpay->current, + GST_BASE_RTP_PAYLOAD_PT (rtppay)); + gst_rtp_buffer_set_seq (rtpasfpay->current, rtppay->seqnum + 1); + gst_rtp_buffer_set_timestamp (rtpasfpay->current, packetinfo->send_time); + + GST_BUFFER_TIMESTAMP (rtpasfpay->current) = GST_BUFFER_TIMESTAMP (buffer); + + gst_buffer_set_caps (rtpasfpay->current, + GST_PAD_CAPS (GST_BASE_RTP_PAYLOAD_SRCPAD (rtppay))); + + rtppay->seqnum++; + rtppay->timestamp = packetinfo->send_time; + + GST_DEBUG_OBJECT (rtpasfpay, "Pushing rtp buffer"); + ret = + gst_pad_push (GST_BASE_RTP_PAYLOAD_SRCPAD (rtppay), + rtpasfpay->current); + rtpasfpay->current = NULL; + if (ret != GST_FLOW_OK) { + gst_buffer_unref (buffer); + return ret; + } + } + } + gst_buffer_unref (buffer); + return ret; +} + +static GstFlowReturn +gst_rtp_asf_pay_parse_headers (GstRtpAsfPay * rtpasfpay) +{ + GstFlowReturn ret = GST_FLOW_OK; + gchar *maxps; + g_return_val_if_fail (rtpasfpay->headers, GST_FLOW_ERROR); + + if (!gst_asf_parse_headers (rtpasfpay->headers, &rtpasfpay->asfinfo)) + goto error; + + GST_DEBUG_OBJECT (rtpasfpay, "Packets number: %" G_GUINT64_FORMAT, + rtpasfpay->asfinfo.packets_count); + GST_DEBUG_OBJECT (rtpasfpay, "Packets size: %" G_GUINT32_FORMAT, + rtpasfpay->asfinfo.packet_size); + GST_DEBUG_OBJECT (rtpasfpay, "Broadcast mode: %s", + rtpasfpay->asfinfo.broadcast ? "true" : "false"); + + /* get the config for caps */ + g_free (rtpasfpay->config); + rtpasfpay->config = g_base64_encode (GST_BUFFER_DATA (rtpasfpay->headers), + GST_BUFFER_SIZE (rtpasfpay->headers)); + GST_DEBUG_OBJECT (rtpasfpay, "Serialized headers to base64 string %s", + rtpasfpay->config); + + g_assert (rtpasfpay->config != NULL); + GST_DEBUG_OBJECT (rtpasfpay, "Setting optional caps values: maxps=%" + G_GUINT32_FORMAT " and config=%s", rtpasfpay->asfinfo.packet_size, + rtpasfpay->config); + maxps = + g_strdup_printf ("%" G_GUINT32_FORMAT, rtpasfpay->asfinfo.packet_size); + gst_basertppayload_set_outcaps (GST_BASE_RTP_PAYLOAD (rtpasfpay), "maxps", + G_TYPE_STRING, maxps, "config", G_TYPE_STRING, rtpasfpay->config, NULL); + g_free (maxps); + + return GST_FLOW_OK; + +error: + ret = GST_FLOW_ERROR; + GST_ERROR_OBJECT (rtpasfpay, "Error while parsing headers"); + return GST_FLOW_ERROR; +} + +static GstFlowReturn +gst_rtp_asf_pay_handle_buffer (GstBaseRTPPayload * rtppay, GstBuffer * buffer) +{ + GstRtpAsfPay *rtpasfpay = GST_RTP_ASF_PAY_CAST (rtppay); + + if (G_UNLIKELY (rtpasfpay->state == ASF_END)) { + GST_LOG_OBJECT (rtpasfpay, + "Dropping buffer as we already pushed all packets"); + gst_buffer_unref (buffer); + return GST_FLOW_UNEXPECTED; /* we already finished our job */ + } + + /* receive headers + * we only accept if they are in a single buffer */ + if (G_UNLIKELY (rtpasfpay->state == ASF_NOT_STARTED)) { + guint64 header_size; + + if (GST_BUFFER_SIZE (buffer) < 24) { /* guid+object size size */ + GST_ERROR_OBJECT (rtpasfpay, + "Buffer too small, smaller than a Guid and object size"); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } + + header_size = gst_asf_match_and_peek_obj_size (GST_BUFFER_DATA (buffer), + &(guids[ASF_HEADER_OBJECT_INDEX])); + if (header_size > 0) { + GST_DEBUG_OBJECT (rtpasfpay, "ASF header guid received, size %" + G_GUINT64_FORMAT, header_size); + + if (GST_BUFFER_SIZE (buffer) < header_size) { + GST_ERROR_OBJECT (rtpasfpay, "Headers should be contained in a single" + " buffer"); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } else { + rtpasfpay->state = ASF_DATA_OBJECT; + + /* clear previous headers, if any */ + if (rtpasfpay->headers) { + gst_buffer_unref (rtpasfpay->headers); + } + + GST_DEBUG_OBJECT (rtpasfpay, "Storing headers"); + if (GST_BUFFER_SIZE (buffer) == header_size) { + rtpasfpay->headers = buffer; + return GST_FLOW_OK; + } else { + /* headers are a subbuffer of thie buffer */ + GstBuffer *aux = gst_buffer_create_sub (buffer, header_size, + GST_BUFFER_SIZE (buffer) - header_size); + rtpasfpay->headers = gst_buffer_create_sub (buffer, 0, header_size); + gst_buffer_replace (&buffer, aux); + } + } + } else { + GST_ERROR_OBJECT (rtpasfpay, "Missing ASF header start"); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } + } + + if (G_UNLIKELY (rtpasfpay->state == ASF_DATA_OBJECT)) { + if (GST_BUFFER_SIZE (buffer) != ASF_DATA_OBJECT_SIZE) { + GST_ERROR_OBJECT (rtpasfpay, "Received buffer of different size of " + "the data object header"); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } + + if (gst_asf_match_guid (GST_BUFFER_DATA (buffer), + &(guids[ASF_DATA_OBJECT_INDEX]))) { + GST_DEBUG_OBJECT (rtpasfpay, "Received data object header"); + rtpasfpay->headers = gst_buffer_join (rtpasfpay->headers, buffer); + rtpasfpay->state = ASF_PACKETS; + + return gst_rtp_asf_pay_parse_headers (rtpasfpay); + } else { + GST_ERROR_OBJECT (rtpasfpay, "Unexpected object received (was expecting " + "data object)"); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; + } + } + + if (G_LIKELY (rtpasfpay->state == ASF_PACKETS)) { + /* in broadcast mode we can't trust the packets count information + * from the headers + * We assume that if this is on broadcast mode it is a live stream + * and we are going to keep receiving packets indefinitely + */ + if (rtpasfpay->asfinfo.broadcast || + rtpasfpay->packets_count < rtpasfpay->asfinfo.packets_count) { + GST_DEBUG_OBJECT (rtpasfpay, "Received packet %" + G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT, + rtpasfpay->packets_count, rtpasfpay->asfinfo.packets_count); + rtpasfpay->packets_count++; + return gst_rtp_asf_pay_handle_packet (rtpasfpay, buffer); + } else { + GST_INFO_OBJECT (rtpasfpay, "Packets ended"); + rtpasfpay->state = ASF_END; + gst_buffer_unref (buffer); + return GST_FLOW_UNEXPECTED; + } + } + + gst_buffer_unref (buffer); + return GST_FLOW_OK; +} + +gboolean +gst_rtp_asf_pay_plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "rtpasfpay", + GST_RANK_NONE, GST_TYPE_RTP_ASF_PAY); +} |