From e88984ccbd4b2b7b5527cbc3ff488c5c85bb1e97 Mon Sep 17 00:00:00 2001 From: Vincent Penquerc'h Date: Sun, 15 Feb 2009 18:35:04 +0000 Subject: add new Kate plugin, for Kate overlay streams katedec: Kate decoder (text only) kateenc: Kate encoder (text and DVD SPU only) katetag: Kate tagger kateparse: Kate parser tiger: Kate renderer using the Tiger rendering library Fixes #525743. --- ext/kate/gstkateparse.c | 613 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 613 insertions(+) create mode 100644 ext/kate/gstkateparse.c (limited to 'ext/kate/gstkateparse.c') diff --git a/ext/kate/gstkateparse.c b/ext/kate/gstkateparse.c new file mode 100644 index 00000000..84a752b7 --- /dev/null +++ b/ext/kate/gstkateparse.c @@ -0,0 +1,613 @@ +/* GStreamer + * Copyright (C) <2004> Thomas Vander Stichele + * Copyright (C) 2006 Andy Wingo + * Copyright (C) 2008 Vincent Penquerc'h + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +/** + * SECTION:element-kateparse + * @short_description: parses kate streams + * @see_also: katedec, vorbisparse, oggdemux, theoraparse + * + * + * + * The kateparse element will parse the header packets of the Kate + * stream and put them as the streamheader in the caps. This is used in the + * multifdsink case where you want to stream live kate streams to multiple + * clients, each client has to receive the streamheaders first before they can + * consume the kate packets. + * + * + * This element also makes sure that the buffers that it pushes out are properly + * timestamped and that their offset and offset_end are set. The buffers that + * kateparse outputs have all of the metadata that oggmux expects to receive, + * which allows you to (for example) remux an ogg/kate file. + * + * Example pipelines + * + * + * gst-launch -v filesrc location=kate.ogg ! oggdemux ! kateparse ! fakesink + * + * This pipeline shows that the streamheader is set in the caps, and that each + * buffer has the timestamp, duration, offset, and offset_end set. + * + * + * + * gst-launch filesrc location=kate.ogg ! oggdemux ! kateparse \ + * ! oggmux ! filesink location=kate-remuxed.ogg + * + * This pipeline shows remuxing. kate-remuxed.ogg might not be exactly the same + * as kate.ogg, but they should produce exactly the same decoded data. + * + * + * + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "gstkate.h" +#include "gstkateutil.h" +#include "gstkateparse.h" + +GST_DEBUG_CATEGORY_EXTERN (gst_kateparse_debug); +#define GST_CAT_DEFAULT gst_kateparse_debug + +static const GstElementDetails gst_kate_parse_details = +GST_ELEMENT_DETAILS ("Kate stream parser", + "Codec/Parser/Subtitle", + "parse raw kate streams", + "Vincent Penquerc'h "); + +static GstStaticPadTemplate gst_kate_parse_sink_factory = +GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (GST_KATE_MIME_TYPE) + ); + +static GstStaticPadTemplate gst_kate_parse_src_factory = +GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (GST_KATE_MIME_TYPE) + ); + +GST_BOILERPLATE (GstKateParse, gst_kate_parse, GstElement, GST_TYPE_ELEMENT); + +static GstFlowReturn gst_kate_parse_chain (GstPad * pad, GstBuffer * buffer); +static GstStateChangeReturn gst_kate_parse_change_state (GstElement * element, + GstStateChange transition); +static gboolean gst_kate_parse_sink_event (GstPad * pad, GstEvent * event); +static gboolean gst_kate_parse_src_query (GstPad * pad, GstQuery * query); +#if 0 +static gboolean gst_kate_parse_convert (GstPad * pad, + GstFormat src_format, gint64 src_value, + GstFormat * dest_format, gint64 * dest_value); +#endif +static GstFlowReturn gst_kate_parse_parse_packet (GstKateParse * parse, + GstBuffer * buf); + +static void +gst_kate_parse_base_init (gpointer g_class) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_kate_parse_src_factory)); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&gst_kate_parse_sink_factory)); + gst_element_class_set_details (element_class, &gst_kate_parse_details); +} + +static void +gst_kate_parse_class_init (GstKateParseClass * klass) +{ + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gstelement_class->change_state = gst_kate_parse_change_state; + + klass->parse_packet = GST_DEBUG_FUNCPTR (gst_kate_parse_parse_packet); +} + +static void +gst_kate_parse_init (GstKateParse * parse, GstKateParseClass * g_class) +{ + parse->sinkpad = + gst_pad_new_from_static_template (&gst_kate_parse_sink_factory, "sink"); + gst_pad_set_chain_function (parse->sinkpad, + GST_DEBUG_FUNCPTR (gst_kate_parse_chain)); + gst_pad_set_event_function (parse->sinkpad, + GST_DEBUG_FUNCPTR (gst_kate_parse_sink_event)); + gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad); + + parse->srcpad = + gst_pad_new_from_static_template (&gst_kate_parse_src_factory, "src"); + gst_pad_set_query_function (parse->srcpad, + GST_DEBUG_FUNCPTR (gst_kate_parse_src_query)); + gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad); +} + +static void +gst_kate_parse_drain_event_queue (GstKateParse * parse) +{ + while (parse->event_queue->length) { + GstEvent *event; + + event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue)); + gst_pad_event_default (parse->sinkpad, event); + } +} + +static GstFlowReturn +gst_kate_parse_push_headers (GstKateParse * parse) +{ + /* mark and put on caps */ + GstCaps *caps; + GstBuffer *outbuf; + kate_packet packet; + GList *headers, *outbuf_list = NULL; + int ret; + gboolean res; + + /* get the headers into the caps, passing them to kate as we go */ + caps = + gst_kate_util_set_header_on_caps (&parse->element, + gst_pad_get_caps (parse->srcpad), parse->streamheader); + if (G_UNLIKELY (!caps)) { + GST_ERROR_OBJECT (parse, "Failed to set headers on caps"); + return GST_FLOW_ERROR; + } + + GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps); + res = gst_pad_set_caps (parse->srcpad, caps); + gst_caps_unref (caps); + if (G_UNLIKELY (!res)) { + GST_WARNING_OBJECT (parse, "Failed to set pad caps"); + return GST_FLOW_ERROR; + } + + headers = parse->streamheader; + while (headers) { + outbuf = GST_BUFFER_CAST (headers->data); + kate_packet_wrap (&packet, GST_BUFFER_SIZE (outbuf), + GST_BUFFER_DATA (outbuf)); + ret = kate_decode_headerin (&parse->ki, &parse->kc, &packet); + if (G_UNLIKELY (ret < 0)) { + GST_WARNING_OBJECT (parse, "kate_decode_headerin returned %d", ret); + } + outbuf_list = g_list_append (outbuf_list, outbuf); + headers = headers->next; + } + + /* first process queued events */ + gst_kate_parse_drain_event_queue (parse); + + /* push out buffers, ignoring return value... */ + headers = outbuf_list; + while (headers) { + outbuf = GST_BUFFER_CAST (headers->data); + gst_buffer_set_caps (outbuf, GST_PAD_CAPS (parse->srcpad)); + gst_pad_push (parse->srcpad, outbuf); + headers = headers->next; + } + + g_list_free (outbuf_list); + + parse->streamheader_sent = TRUE; + + return GST_FLOW_OK; +} + +static void +gst_kate_parse_clear_queue (GstKateParse * parse) +{ + GST_DEBUG_OBJECT (parse, "Clearing queue"); + while (parse->buffer_queue->length) { + GstBuffer *buf; + + buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue)); + gst_buffer_unref (buf); + } + while (parse->event_queue->length) { + GstEvent *event; + + event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue)); + gst_event_unref (event); + } +} + +static GstFlowReturn +gst_kate_parse_push_buffer (GstKateParse * parse, GstBuffer * buf, + gint64 granulepos) +{ + GST_LOG_OBJECT (parse, "granulepos %16llx", granulepos); + if (granulepos < 0) { + /* packets coming not from Ogg won't have a granpos in the offset end, + so we have to synthesize one here - only problem is we don't know + the backlink - pretend there's none for now */ + GST_INFO_OBJECT (parse, "No granulepos on buffer, synthesizing one"); + granulepos = + kate_duration_granule (&parse->ki, + GST_BUFFER_TIMESTAMP (buf) / + (double) GST_SECOND) << kate_granule_shift (&parse->ki); + } + GST_BUFFER_OFFSET (buf) = + kate_granule_time (&parse->ki, granulepos) * GST_SECOND; + GST_BUFFER_OFFSET_END (buf) = granulepos; + GST_BUFFER_TIMESTAMP (buf) = GST_BUFFER_OFFSET (buf); + + /* Hack to flush each packet on its own page - taken off the CMML encoder element */ + /* TODO: this is shite and needs to go once I find a way to tell Ogg to flush + as it messes up Matroska's track duration */ + GST_BUFFER_DURATION (buf) = G_MAXINT64; + + gst_buffer_set_caps (buf, GST_PAD_CAPS (parse->srcpad)); + + return gst_pad_push (parse->srcpad, buf); +} + +static GstFlowReturn +gst_kate_parse_drain_queue_prematurely (GstKateParse * parse) +{ + GstFlowReturn ret = GST_FLOW_OK; + + /* got an EOS event, make sure to push out any buffers that were in the queue + * -- won't normally be the case, but this catches the + * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous + * stream. */ + + /* if we got EOS before any buffers came, go ahead and push the other events + * first */ + gst_kate_parse_drain_event_queue (parse); + + while (!g_queue_is_empty (parse->buffer_queue)) { + GstBuffer *buf; + gint64 granpos; + + buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue)); + + granpos = GST_BUFFER_OFFSET_END (buf); + ret = gst_kate_parse_push_buffer (parse, buf, granpos); + + if (ret != GST_FLOW_OK) + goto done; + } + + g_assert (g_queue_is_empty (parse->buffer_queue)); + +done: + return ret; +} + +static GstFlowReturn +gst_kate_parse_drain_queue (GstKateParse * parse, gint64 granulepos) +{ + GstFlowReturn ret = GST_FLOW_OK; + + if (!g_queue_is_empty (parse->buffer_queue)) { + GstBuffer *buf; + buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue)); + ret = gst_kate_parse_push_buffer (parse, buf, granulepos); + + if (ret != GST_FLOW_OK) + goto done; + } + g_assert (g_queue_is_empty (parse->buffer_queue)); + +done: + return ret; +} + +static GstFlowReturn +gst_kate_parse_queue_buffer (GstKateParse * parse, GstBuffer * buf) +{ + GstFlowReturn ret = GST_FLOW_OK; + gint64 granpos; + + buf = gst_buffer_make_metadata_writable (buf); + + /* oggdemux stores the granule pos in the offset end */ + granpos = GST_BUFFER_OFFSET_END (buf); + GST_LOG_OBJECT (parse, "granpos %16llx", granpos); + g_queue_push_tail (parse->buffer_queue, buf); + +#if 1 + /* if getting buffers from matroska, we won't have a granpos here... */ + //if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) { + ret = gst_kate_parse_drain_queue (parse, granpos); + //} +#else + if (granpos >= 0) { + ret = gst_kate_parse_drain_queue (parse, granpos); + } else { + GST_WARNING_OBJECT (parse, "granulepos < 0 (%lld)", granpos); + ret = GST_FLOW_ERROR; + } +#endif + + return ret; +} + +static GstFlowReturn +gst_kate_parse_parse_packet (GstKateParse * parse, GstBuffer * buf) +{ + GstFlowReturn ret = GST_FLOW_OK; + + g_assert (parse); + + parse->packetno++; + + GST_LOG_OBJECT (parse, "Got packet %02x, %u bytes", + GST_BUFFER_SIZE (buf) ? GST_BUFFER_DATA (buf)[0] : -1, + GST_BUFFER_SIZE (buf)); + + if (GST_BUFFER_SIZE (buf) > 0 && GST_BUFFER_DATA (buf)[0] & 0x80) { + GST_DEBUG_OBJECT (parse, "Found header %02x", GST_BUFFER_DATA (buf)[0]); + /* if 0x80 is set, it's streamheader, + * so put it on the streamheader list and return */ + parse->streamheader = g_list_append (parse->streamheader, buf); + ret = GST_FLOW_OK; + } else { + if (!parse->streamheader_sent) { + GST_DEBUG_OBJECT (parse, "Found non header, pushing headers seen so far"); + ret = gst_kate_parse_push_headers (parse); + } + + if (ret == GST_FLOW_OK) { + ret = gst_kate_parse_queue_buffer (parse, buf); + } + } + + return ret; +} + +static GstFlowReturn +gst_kate_parse_chain (GstPad * pad, GstBuffer * buffer) +{ + GstKateParseClass *klass; + GstKateParse *parse; + + parse = GST_KATE_PARSE (GST_PAD_PARENT (pad)); + klass = GST_KATE_PARSE_CLASS (G_OBJECT_GET_CLASS (parse)); + + g_assert (klass->parse_packet != NULL); + + return klass->parse_packet (parse, buffer); +} + +static gboolean +gst_kate_parse_queue_event (GstKateParse * parse, GstEvent * event) +{ + GstFlowReturn ret = TRUE; + + g_queue_push_tail (parse->event_queue, event); + + return ret; +} + +static gboolean +gst_kate_parse_sink_event (GstPad * pad, GstEvent * event) +{ + gboolean ret; + GstKateParse *parse; + + parse = GST_KATE_PARSE (gst_pad_get_parent (pad)); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + gst_kate_parse_clear_queue (parse); + ret = gst_pad_event_default (pad, event); + break; + case GST_EVENT_EOS: + if (!parse->streamheader_sent) { + GST_DEBUG_OBJECT (parse, "Got EOS, pushing headers seen so far"); + ret = gst_kate_parse_push_headers (parse); + if (ret != GST_FLOW_OK) + goto done; + } + gst_kate_parse_drain_queue_prematurely (parse); + ret = gst_pad_event_default (pad, event); + break; + default: + if (!parse->streamheader_sent && GST_EVENT_IS_SERIALIZED (event)) + ret = gst_kate_parse_queue_event (parse, event); + else + ret = gst_pad_event_default (pad, event); + break; + } + +done: + gst_object_unref (parse); + + return ret; +} + +#if 0 +static gboolean +gst_kate_parse_convert (GstPad * pad, + GstFormat src_format, gint64 src_value, + GstFormat * dest_format, gint64 * dest_value) +{ + gboolean res = TRUE; + GstKateParse *parse; + + parse = GST_KATE_PARSE (GST_PAD_PARENT (pad)); + + /* fixme: assumes atomic access to lots of instance variables modified from + * the streaming thread, including 64-bit variables */ + + if (!parse->streamheader_sent) + return FALSE; + + if (src_format == *dest_format) { + *dest_value = src_value; + return TRUE; + } + + if (parse->sinkpad == pad && + (src_format == GST_FORMAT_BYTES || *dest_format == GST_FORMAT_BYTES)) + return FALSE; + + switch (src_format) { + case GST_FORMAT_TIME: + switch (*dest_format) { + default: + res = FALSE; + } + break; + case GST_FORMAT_DEFAULT: + switch (*dest_format) { + case GST_FORMAT_TIME: + *dest_value = kate_granule_time (&parse->ki, src_value) * GST_SECOND; + break; + default: + res = FALSE; + } + break; + default: + res = FALSE; + } + + return res; +} +#endif + +static gboolean +gst_kate_parse_src_query (GstPad * pad, GstQuery * query) +{ +#if 1 + // TODO + GST_WARNING ("gst_kate_parse_src_query"); + return FALSE; +#else + gint64 granulepos; + GstKateParse *parse; + gboolean res = FALSE; + + parse = GST_KATE_PARSE (GST_PAD_PARENT (pad)); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_POSITION: + { + GstFormat format; + gint64 value; + + granulepos = parse->prev_granulepos; + + gst_query_parse_position (query, &format, NULL); + + /* and convert to the final format */ + if (!(res = + gst_kate_parse_convert (pad, GST_FORMAT_DEFAULT, granulepos, + &format, &value))) + goto error; + + /* fixme: support segments + value = (value - parse->segment_start) + parse->segment_time; + */ + + gst_query_set_position (query, format, value); + + GST_LOG_OBJECT (parse, "query %p: peer returned granulepos: %" + G_GUINT64_FORMAT " - we return %" G_GUINT64_FORMAT " (format %u)", + query, granulepos, value, format); + + break; + } + case GST_QUERY_DURATION: + { + /* fixme: not threadsafe */ + /* query peer for total length */ + if (!gst_pad_is_linked (parse->sinkpad)) { + GST_WARNING_OBJECT (parse, "sink pad %" GST_PTR_FORMAT " is not linked", + parse->sinkpad); + goto error; + } + if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query))) + goto error; + break; + } + case GST_QUERY_CONVERT: + { + GstFormat src_fmt, dest_fmt; + gint64 src_val, dest_val; + + gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val); + if (!(res = + gst_kate_parse_convert (pad, src_fmt, src_val, &dest_fmt, + &dest_val))) + goto error; + gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val); + break; + } + default: + res = gst_pad_query_default (pad, query); + break; + } + return res; + +error: + { + GST_WARNING_OBJECT (parse, "error handling query"); + return res; + } +#endif +} + +static GstStateChangeReturn +gst_kate_parse_change_state (GstElement * element, GstStateChange transition) +{ + GstKateParse *parse = GST_KATE_PARSE (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + kate_info_init (&parse->ki); + kate_comment_init (&parse->kc); + parse->packetno = 0; + parse->streamheader_sent = FALSE; + parse->streamheader = NULL; + parse->buffer_queue = g_queue_new (); + parse->event_queue = g_queue_new (); + break; + default: + break; + } + + ret = parent_class->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + kate_info_clear (&parse->ki); + kate_comment_clear (&parse->kc); + + gst_kate_parse_clear_queue (parse); + g_queue_free (parse->buffer_queue); + parse->buffer_queue = NULL; + g_queue_free (parse->event_queue); + parse->event_queue = NULL; + break; + + default: + break; + } + + return ret; +} -- cgit v1.2.1