summaryrefslogtreecommitdiffstats
path: root/gst/gdp
diff options
context:
space:
mode:
Diffstat (limited to 'gst/gdp')
-rw-r--r--gst/gdp/gstgdpdepay.c269
-rw-r--r--gst/gdp/gstgdppay.c307
-rw-r--r--gst/gdp/gstgdppay.h1
3 files changed, 379 insertions, 198 deletions
diff --git a/gst/gdp/gstgdpdepay.c b/gst/gdp/gstgdpdepay.c
index 4b0266fa..ba391c40 100644
--- a/gst/gdp/gstgdpdepay.c
+++ b/gst/gdp/gstgdpdepay.c
@@ -56,7 +56,6 @@ GST_ELEMENT_DETAILS ("GDP Depayloader",
enum
{
PROP_0,
- /* FILL ME */
};
static GstStaticPadTemplate gdp_depay_sink_template =
@@ -82,6 +81,8 @@ GST_BOILERPLATE_FULL (GstGDPDepay, gst_gdp_depay, GstElement,
GST_TYPE_ELEMENT, _do_init);
static gboolean gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event);
+static gboolean gst_gdp_depay_src_event (GstPad * pad, GstEvent * event);
+
static GstFlowReturn gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer);
static GstStateChangeReturn gst_gdp_depay_change_state (GstElement *
@@ -129,6 +130,8 @@ gst_gdp_depay_init (GstGDPDepay * gdpdepay, GstGDPDepayClass * g_class)
gdpdepay->srcpad =
gst_pad_new_from_static_template (&gdp_depay_src_template, "src");
+ gst_pad_set_event_function (gdpdepay->srcpad,
+ GST_DEBUG_FUNCPTR (gst_gdp_depay_src_event));
/* our caps will always be decided by the incoming GDP caps buffers */
gst_pad_use_fixed_caps (gdpdepay->srcpad);
gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->srcpad);
@@ -144,19 +147,13 @@ gst_gdp_depay_finalize (GObject * gobject)
this = GST_GDP_DEPAY (gobject);
if (this->caps)
gst_caps_unref (this->caps);
- if (this->header)
- g_free (this->header);
+ g_free (this->header);
gst_adapter_clear (this->adapter);
g_object_unref (this->adapter);
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
}
-/* we ignore most events because the only events we want to output are those
- * found in the gdp data stream.
- * An exception is the EOS event, if we get that on the sinkpad, we certainly
- * are not going to produce more data.
- */
static gboolean
gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
{
@@ -166,10 +163,20 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH_START:
+ case GST_EVENT_FLUSH_STOP:
+ /* forward flush start and stop */
+ res = gst_pad_push_event (this->srcpad, event);
+ break;
case GST_EVENT_EOS:
+ /* after EOS, we don't expect to output anything anymore */
res = gst_pad_push_event (this->srcpad, event);
break;
+ case GST_EVENT_NEWSEGMENT:
+ case GST_EVENT_TAG:
+ case GST_EVENT_BUFFERSIZE:
default:
+ /* we unref most events as we take them from the datastream */
gst_event_unref (event);
break;
}
@@ -178,6 +185,32 @@ gst_gdp_depay_sink_event (GstPad * pad, GstEvent * event)
return res;
}
+static gboolean
+gst_gdp_depay_src_event (GstPad * pad, GstEvent * event)
+{
+ GstGDPDepay *this;
+ gboolean res = TRUE;
+
+ this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEEK:
+ /* we refuse seek for now. */
+ gst_event_unref (event);
+ res = FALSE;
+ break;
+ case GST_EVENT_QOS:
+ case GST_EVENT_NAVIGATION:
+ default:
+ /* everything else is passed */
+ res = gst_pad_push_event (this->sinkpad, event);
+ break;
+ }
+ gst_object_unref (this);
+
+ return res;
+}
+
static GstFlowReturn
gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
{
@@ -186,108 +219,124 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GstCaps *caps;
GstBuffer *buf;
GstEvent *event;
- guint8 *header = NULL;
- guint8 *payload = NULL;
guint available;
- gboolean running = TRUE;
this = GST_GDP_DEPAY (gst_pad_get_parent (pad));
+ /* On DISCONT, get rid of accumulated data. We assume a buffer after the
+ * DISCONT contains (part of) a new valid header, if not we error because we
+ * lost sync */
+ if (GST_BUFFER_IS_DISCONT (buffer)) {
+ gst_adapter_clear (this->adapter);
+ this->state = GST_GDP_DEPAY_STATE_HEADER;
+ }
gst_adapter_push (this->adapter, buffer);
- while (running) {
+ while (TRUE) {
switch (this->state) {
case GST_GDP_DEPAY_STATE_HEADER:
+ {
+ guint8 *header;
+
+ /* collect a complete header, validate and store the header. Figure out
+ * the payload length and switch to the PAYLOAD state */
available = gst_adapter_available (this->adapter);
- if (available < GST_DP_HEADER_LENGTH) {
- running = FALSE;
- break;
- }
+ if (available < GST_DP_HEADER_LENGTH)
+ goto done;
- if (this->header)
- g_free (this->header);
GST_LOG_OBJECT (this, "reading GDP header from adapter");
header = gst_adapter_take (this->adapter, GST_DP_HEADER_LENGTH);
- if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header))
+ if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) {
+ g_free (header);
goto header_validate_error;
+ }
+ /* store types and payload length. Also store the header, which we need
+ * to make the payload. */
this->payload_length = gst_dp_header_payload_length (header);
this->payload_type = gst_dp_header_payload_type (header);
+ /* free previous header and store new one. */
+ g_free (this->header);
this->header = header;
+
GST_LOG_OBJECT (this,
"read GDP header, payload size %d, switching to state PAYLOAD",
this->payload_length);
this->state = GST_GDP_DEPAY_STATE_PAYLOAD;
break;
-
+ }
case GST_GDP_DEPAY_STATE_PAYLOAD:
+ {
+ /* in this state we wait for all the payload data to be available in the
+ * adapter. Then we switch to the state where we actually process the
+ * payload. */
available = gst_adapter_available (this->adapter);
- if (available < this->payload_length) {
- running = FALSE;
- break;
- }
+ if (available < this->payload_length)
+ goto done;
/* change state based on type */
- if (this->payload_type == GST_DP_PAYLOAD_BUFFER) {
- GST_LOG_OBJECT (this, "switching to state BUFFER");
- this->state = GST_GDP_DEPAY_STATE_BUFFER;
- } else if (this->payload_type == GST_DP_PAYLOAD_CAPS) {
- GST_LOG_OBJECT (this, "switching to state CAPS");
- this->state = GST_GDP_DEPAY_STATE_CAPS;
- } else if (this->payload_type >= GST_DP_PAYLOAD_EVENT_NONE) {
- GST_LOG_OBJECT (this, "switching to state EVENT");
- this->state = GST_GDP_DEPAY_STATE_EVENT;
- } else
- goto wrong_type;
+ switch (this->payload_type) {
+ case GST_DP_PAYLOAD_BUFFER:
+ GST_LOG_OBJECT (this, "switching to state BUFFER");
+ this->state = GST_GDP_DEPAY_STATE_BUFFER;
+ break;
+ case GST_DP_PAYLOAD_CAPS:
+ GST_LOG_OBJECT (this, "switching to state CAPS");
+ this->state = GST_GDP_DEPAY_STATE_CAPS;
+ break;
+ case GST_DP_PAYLOAD_EVENT_NONE:
+ GST_LOG_OBJECT (this, "switching to state EVENT");
+ this->state = GST_GDP_DEPAY_STATE_EVENT;
+ break;
+ default:
+ goto wrong_type;
+ }
break;
-
+ }
case GST_GDP_DEPAY_STATE_BUFFER:
- if (!this->caps) {
- GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
- ("Received a buffer without first receiving caps"));
- ret = GST_FLOW_NOT_NEGOTIATED;
- goto done;
- }
+ {
+
+ /* if we receive a buffer without caps first, we error out */
+ if (!this->caps)
+ goto no_caps;
GST_LOG_OBJECT (this, "reading GDP buffer from adapter");
buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, this->header);
- if (!buf) {
- GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
- ("could not create buffer from GDP packet"));
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ if (!buf)
+ goto buffer_failed;
- payload = gst_adapter_take (this->adapter, this->payload_length);
- memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
- g_free (payload);
- payload = NULL;
+ /* now take the payload if there is any */
+ if (this->payload_length > 0) {
+ guint8 *payload;
+
+ payload = gst_adapter_take (this->adapter, this->payload_length);
+ memcpy (GST_BUFFER_DATA (buf), payload, this->payload_length);
+ g_free (payload);
+ }
+ /* set caps and push */
gst_buffer_set_caps (buf, this->caps);
ret = gst_pad_push (this->srcpad, buf);
- if (ret != GST_FLOW_OK) {
- GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d",
- ret);
- goto done;
- }
+ if (ret != GST_FLOW_OK)
+ goto push_error;
GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER;
break;
-
+ }
case GST_GDP_DEPAY_STATE_CAPS:
+ {
+ guint8 *payload;
+
+ /* take the payload of the caps */
GST_LOG_OBJECT (this, "reading GDP caps from adapter");
payload = gst_adapter_take (this->adapter, this->payload_length);
caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, this->header,
payload);
g_free (payload);
- payload = NULL;
- if (!caps) {
- GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
- ("could not create caps from GDP packet"));
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ if (!caps)
+ goto caps_failed;
+
GST_DEBUG_OBJECT (this, "read caps %" GST_PTR_FORMAT, caps);
gst_caps_replace (&(this->caps), caps);
gst_pad_set_caps (this->srcpad, caps);
@@ -297,24 +346,24 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER;
break;
-
+ }
case GST_GDP_DEPAY_STATE_EVENT:
+ {
+ guint8 *payload;
+
GST_LOG_OBJECT (this, "reading GDP event from adapter");
+
/* adapter doesn't like 0 length payload */
if (this->payload_length > 0)
payload = gst_adapter_take (this->adapter, this->payload_length);
+ else
+ payload = NULL;
event = gst_dp_event_from_packet (GST_DP_HEADER_LENGTH, this->header,
payload);
- if (payload) {
- g_free (payload);
- payload = NULL;
- }
- if (!event) {
- GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
- ("could not create event from GDP packet"));
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ g_free (payload);
+ if (!event)
+ goto event_failed;
+
GST_DEBUG_OBJECT (this, "sending deserialized event %p of type %s",
event, gst_event_type_get_name (event->type));
gst_pad_push_event (this->srcpad, event);
@@ -322,27 +371,62 @@ gst_gdp_depay_chain (GstPad * pad, GstBuffer * buffer)
GST_LOG_OBJECT (this, "switching to state HEADER");
this->state = GST_GDP_DEPAY_STATE_HEADER;
break;
+ }
}
}
- goto done;
-
-header_validate_error:
- GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
- ("GDP packet header does not validate"));
- g_free (header);
- ret = GST_FLOW_ERROR;
- goto done;
-
-wrong_type:
- GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
- ("GDP packet header is of wrong type"));
- g_free (header);
- ret = GST_FLOW_ERROR;
- goto done;
done:
gst_object_unref (this);
return ret;
+
+ /* ERRORS */
+header_validate_error:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+ ("GDP packet header does not validate"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
+wrong_type:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+ ("GDP packet header is of wrong type"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
+no_caps:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+ ("Received a buffer without first receiving caps"));
+ ret = GST_FLOW_NOT_NEGOTIATED;
+ goto done;
+ }
+buffer_failed:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+ ("could not create buffer from GDP packet"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
+push_error:
+ {
+ GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d", ret);
+ goto done;
+ }
+caps_failed:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+ ("could not create caps from GDP packet"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
+event_failed:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
+ ("could not create event from GDP packet"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
}
static GstStateChangeReturn
@@ -354,7 +438,7 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
- case GST_STATE_CHANGE_READY_TO_NULL:
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
if (this->caps) {
gst_caps_unref (this->caps);
this->caps = NULL;
@@ -363,7 +447,6 @@ gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
default:
break;
}
-
return ret;
}
diff --git a/gst/gdp/gstgdppay.c b/gst/gdp/gstgdppay.c
index 26eaab6e..e164b503 100644
--- a/gst/gdp/gstgdppay.c
+++ b/gst/gdp/gstgdppay.c
@@ -85,8 +85,12 @@ enum
GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
GST_TYPE_ELEMENT, _do_init);
+static void gst_gdp_pay_reset (GstGDPPay * this);
static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
+
+static gboolean gst_gdp_pay_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
+
static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
element, GstStateChange transition);
@@ -95,7 +99,7 @@ static void gst_gdp_pay_set_property (GObject * object, guint prop_id,
static void gst_gdp_pay_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static void gst_gdp_pay_dispose (GObject * gobject);
+static void gst_gdp_pay_finalize (GObject * gobject);
static void
gst_gdp_pay_base_init (gpointer g_class)
@@ -121,8 +125,7 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property);
- gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_gdp_pay_dispose);
- gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
+ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_gdp_pay_finalize);
g_object_class_install_property (gobject_class, PROP_CRC_HEADER,
g_param_spec_boolean ("crc-header", "CRC Header",
@@ -136,6 +139,8 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
g_param_spec_enum ("version", "Version",
"Version of the GStreamer Data Protocol",
GST_TYPE_DP_VERSION, DEFAULT_VERSION, G_PARAM_READWRITE));
+
+ gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
}
static void
@@ -151,30 +156,61 @@ gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
gdppay->srcpad =
gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
+ gst_pad_set_event_function (gdppay->srcpad,
+ GST_DEBUG_FUNCPTR (gst_gdp_pay_src_event));
gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
- gdppay->offset = 0;
-
gdppay->crc_header = DEFAULT_CRC_HEADER;
gdppay->crc_payload = DEFAULT_CRC_PAYLOAD;
gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload;
gdppay->version = DEFAULT_VERSION;
+ gdppay->offset = 0;
+
+ gdppay->packetizer = gst_dp_packetizer_new (gdppay->version);
}
static void
-gst_gdp_pay_dispose (GObject * gobject)
+gst_gdp_pay_finalize (GObject * gobject)
{
GstGDPPay *this = GST_GDP_PAY (gobject);
+ gst_gdp_pay_reset (this);
+ gst_dp_packetizer_free (this->packetizer);
+
+ GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
+}
+
+static void
+gst_gdp_pay_reset (GstGDPPay * this)
+{
+ /* clear the queued buffers */
+ while (this->queue) {
+ GstBuffer *buffer;
+
+ buffer = GST_BUFFER_CAST (this->queue->data);
+ GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
+
+ /* delete buffer from queue now */
+ this->queue = g_list_delete_link (this->queue, this->queue);
+ }
+ if (this->caps) {
+ gst_caps_unref (this->caps);
+ this->caps = NULL;
+ }
if (this->caps_buf) {
gst_buffer_unref (this->caps_buf);
this->caps_buf = NULL;
}
+ if (this->tag_buf) {
+ gst_buffer_unref (this->tag_buf);
+ this->tag_buf = NULL;
+ }
if (this->new_segment_buf) {
gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = NULL;
}
- GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (gobject));
+ this->sent_streamheader = FALSE;
+ this->offset = 0;
}
/* set OFFSET and OFFSET_END with running count */
@@ -195,10 +231,8 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
guint len;
if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len,
- &header, &payload)) {
- GST_WARNING_OBJECT (this, "could not create GDP header from caps");
- return NULL;
- }
+ &header, &payload))
+ goto packet_failed;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
headerbuf = gst_buffer_new ();
@@ -211,6 +245,13 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf);
+
+ /* ERRORS */
+packet_failed:
+ {
+ GST_WARNING_OBJECT (this, "could not create GDP header from caps");
+ return NULL;
+ }
}
static GstBuffer *
@@ -221,10 +262,8 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
guint len;
if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len,
- &header)) {
- GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
- return NULL;
- }
+ &header))
+ goto no_buffer;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
headerbuf = gst_buffer_new ();
@@ -233,7 +272,15 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
/* we do not want to lose the ref on the incoming buffer */
gst_buffer_ref (buffer);
+
return gst_buffer_join (headerbuf, buffer);
+
+ /* ERRORS */
+no_buffer:
+ {
+ GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
+ return NULL;
+ }
}
static GstBuffer *
@@ -248,12 +295,8 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
ret =
this->packetizer->packet_from_event (event, this->header_flag, &len,
&header, &payload);
-
- if (!ret) {
- GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
- gst_event_type_get_name (event->type), event->type);
- return NULL;
- }
+ if (!ret)
+ goto no_event;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
headerbuf = gst_buffer_new ();
@@ -266,6 +309,14 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf);
+
+ /* ERRORS */
+no_event:
+ {
+ GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
+ gst_event_type_get_name (event->type), event->type);
+ return NULL;
+ }
}
@@ -341,14 +392,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
bufval = &g_array_index (buffers, GValue, i);
buffer = g_value_peek_pointer (bufval);
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
- if (outbuffer) {
- g_value_init (&value, GST_TYPE_BUFFER);
- gst_value_set_buffer (&value, outbuffer);
- gst_value_array_append_value (&array, &value);
- g_value_unset (&value);
- }
- /* FIXME: if one or more in this loop fail to produce an outbuffer,
- * should we error out ? Once ? Every time ? */
+ if (!outbuffer)
+ goto no_buffer;
+
+ g_value_init (&value, GST_TYPE_BUFFER);
+ gst_value_set_buffer (&value, outbuffer);
+ gst_value_array_append_value (&array, &value);
+ g_value_unset (&value);
}
}
@@ -406,38 +456,50 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
this->sent_streamheader = TRUE;
GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
g_list_length (this->queue));
- if (this->queue) {
- GList *l;
-
- for (l = this->queue; l; l = g_list_next (l)) {
- GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", l->data);
- gst_buffer_set_caps (l->data, caps);
- r = gst_pad_push (this->srcpad, l->data);
- if (r != GST_FLOW_OK) {
- GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
- return r;
- }
+ while (this->queue) {
+ GstBuffer *buffer;
+
+ buffer = GST_BUFFER_CAST (this->queue->data);
+ GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
+
+ /* delete buffer from queue now */
+ this->queue = g_list_delete_link (this->queue, this->queue);
+
+ /* set caps en push */
+ gst_buffer_set_caps (buffer, caps);
+ r = gst_pad_push (this->srcpad, buffer);
+ if (r != GST_FLOW_OK) {
+ GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
+ return r;
}
}
-
return r;
+
+ /* ERRORS */
+no_buffer:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
+ ("failed to create GDP buffer from streamheader"));
+ return GST_FLOW_ERROR;
+ }
}
/* queue a buffer internally if we haven't sent streamheader buffers yet;
- * otherwise, just push on */
+ * otherwise, just push on, this takes ownership of the buffer. */
static GstFlowReturn
gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
{
if (this->sent_streamheader) {
- GST_LOG_OBJECT (this, "Pushing GDP buffer %p", buffer);
- GST_LOG_OBJECT (this, "set caps %" GST_PTR_FORMAT, this->caps);
+ GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT,
+ buffer, this->caps);
return gst_pad_push (this->srcpad, buffer);
}
- /* store it on an internal queue */
+ /* store it on an internal queue. buffer remains reffed. */
this->queue = g_list_append (this->queue, buffer);
GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued",
buffer, g_list_length (this->queue));
+
return GST_FLOW_OK;
}
@@ -466,12 +528,7 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
if (!outbuffer) {
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from new segment event"));
-/*
- ret = GST_FLOW_ERROR;
- goto done;
-*/
} else {
-
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
GST_BUFFER_DURATION (outbuffer) = 0;
@@ -483,26 +540,16 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* make sure we've received caps before */
caps = gst_buffer_get_caps (buffer);
- if (!this->caps && !caps) {
- GST_WARNING_OBJECT (this, "first received buffer does not have caps set");
- if (caps)
- gst_caps_unref (caps);
- ret = GST_FLOW_NOT_NEGOTIATED;
- goto done;
- }
+ if (!this->caps && !caps)
+ goto no_caps;
/* if the caps have changed, process caps first */
if (caps && !gst_caps_is_equal (this->caps, caps)) {
GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
gst_caps_replace (&(this->caps), caps);
outbuffer = gst_gdp_buffer_from_caps (this, caps);
- if (!outbuffer) {
- GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
- ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
- gst_caps_unref (caps);
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ if (!outbuffer)
+ goto no_caps_buffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@@ -515,12 +562,8 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* create a GDP header packet,
* then create a GST buffer of the header packet and the buffer contents */
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
- if (!outbuffer) {
- GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
- ("Could not create GDP buffer from buffer"));
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ if (!outbuffer)
+ goto no_buffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@@ -532,6 +575,34 @@ done:
gst_buffer_unref (buffer);
gst_object_unref (this);
return ret;
+
+ /* ERRORS */
+no_caps:
+ {
+ /* when returning a fatal error as a GstFlowReturn we must post an error
+ * message */
+ GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
+ ("first received buffer does not have caps set"));
+ if (caps)
+ gst_caps_unref (caps);
+ ret = GST_FLOW_NOT_NEGOTIATED;
+ goto done;
+ }
+no_caps_buffer:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
+ ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
+ gst_caps_unref (caps);
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
+no_buffer:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
+ ("Could not create GDP buffer from buffer"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
}
static gboolean
@@ -547,13 +618,9 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
/* now turn the event into a buffer */
outbuffer = gst_gdp_buffer_from_event (this, event);
- if (!outbuffer) {
- GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
- ("Could not create GDP buffer from received event (type %s)",
- gst_event_type_get_name (event->type)));
- ret = FALSE;
- goto done;
- }
+ if (!outbuffer)
+ goto no_outbuffer;
+
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
GST_BUFFER_DURATION (outbuffer) = 0;
@@ -562,47 +629,87 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
* and not send it on */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT:
- GST_DEBUG_OBJECT (this, "received new_segment event");
- if (this->new_segment_buf) {
- gst_buffer_unref (this->new_segment_buf);
- }
GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf",
outbuffer);
+
+ if (this->new_segment_buf)
+ gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = outbuffer;
+
gst_gdp_pay_reset_streamheader (this);
break;
case GST_EVENT_TAG:
- GST_DEBUG_OBJECT (this, "received tag event");
- if (this->tag_buf) {
- gst_buffer_unref (this->tag_buf);
- }
GST_DEBUG_OBJECT (this, "Storing buffer %p as tag_buf", outbuffer);
+
+ if (this->tag_buf)
+ gst_buffer_unref (this->tag_buf);
this->tag_buf = outbuffer;
+
gst_gdp_pay_reset_streamheader (this);
break;
default:
GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer,
event);
flowret = gst_gdp_queue_buffer (this, outbuffer);
- if (flowret != GST_FLOW_OK) {
- GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d",
- flowret);
- ret = FALSE;
- goto done;
- }
+ if (flowret != GST_FLOW_OK)
+ goto push_error;
break;
}
/* if we have EOS, we should send on EOS ourselves */
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
- return gst_pad_push_event (this->srcpad, event);
- };
+ /* ref, we unref later again */
+ ret = gst_pad_push_event (this->srcpad, gst_event_ref (event));
+ }
done:
- gst_object_unref (this);
gst_event_unref (event);
+ gst_object_unref (this);
+
return ret;
+
+ /* ERRORS */
+no_outbuffer:
+ {
+ GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
+ ("Could not create GDP buffer from received event (type %s)",
+ gst_event_type_get_name (event->type)));
+ ret = FALSE;
+ goto done;
+ }
+push_error:
+ {
+ GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", flowret);
+ ret = FALSE;
+ goto done;
+ }
+}
+
+static gboolean
+gst_gdp_pay_src_event (GstPad * pad, GstEvent * event)
+{
+ GstGDPPay *this;
+ gboolean res = TRUE;
+
+ this = GST_GDP_PAY (gst_pad_get_parent (pad));
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEEK:
+ /* we refuse seek for now. */
+ gst_event_unref (event);
+ res = FALSE;
+ break;
+ case GST_EVENT_QOS:
+ case GST_EVENT_NAVIGATION:
+ default:
+ /* everything else is passed */
+ res = gst_pad_push_event (this->sinkpad, event);
+ break;
+ }
+ gst_object_unref (this);
+
+ return res;
}
static void
@@ -667,7 +774,6 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
- this->packetizer = gst_dp_packetizer_new (this->version);
break;
default:
break;
@@ -677,16 +783,7 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
- if (this->packetizer) {
- gst_dp_packetizer_free (this->packetizer);
- this->packetizer = NULL;
- }
- break;
- case GST_STATE_CHANGE_READY_TO_NULL:
- if (this->caps) {
- gst_caps_unref (this->caps);
- this->caps = NULL;
- }
+ gst_gdp_pay_reset (this);
break;
default:
break;
diff --git a/gst/gdp/gstgdppay.h b/gst/gdp/gstgdppay.h
index 5bde0829..22bbcd29 100644
--- a/gst/gdp/gstgdppay.h
+++ b/gst/gdp/gstgdppay.h
@@ -41,6 +41,7 @@ typedef struct _GstGDPPayClass GstGDPPayClass;
struct _GstGDPPay
{
GstElement element;
+
GstPad *sinkpad;
GstPad *srcpad;