summaryrefslogtreecommitdiffstats
path: root/gst/gdp/gstgdppay.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/gdp/gstgdppay.c')
-rw-r--r--gst/gdp/gstgdppay.c307
1 files changed, 202 insertions, 105 deletions
diff --git a/gst/gdp/gstgdppay.c b/gst/gdp/gstgdppay.c
index 26eaab6e..e164b503 100644
--- a/gst/gdp/gstgdppay.c
+++ b/gst/gdp/gstgdppay.c
@@ -85,8 +85,12 @@ enum
GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
GST_TYPE_ELEMENT, _do_init);
+static void gst_gdp_pay_reset (GstGDPPay * this);
static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
+
+static gboolean gst_gdp_pay_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
+
static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
element, GstStateChange transition);
@@ -95,7 +99,7 @@ static void gst_gdp_pay_set_property (GObject * object, guint prop_id,
static void gst_gdp_pay_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static void gst_gdp_pay_dispose (GObject * gobject);
+static void gst_gdp_pay_finalize (GObject * gobject);
static void
gst_gdp_pay_base_init (gpointer g_class)
@@ -121,8 +125,7 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property);
- gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_gdp_pay_dispose);
- gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
+ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_gdp_pay_finalize);
g_object_class_install_property (gobject_class, PROP_CRC_HEADER,
g_param_spec_boolean ("crc-header", "CRC Header",
@@ -136,6 +139,8 @@ gst_gdp_pay_class_init (GstGDPPayClass * klass)
g_param_spec_enum ("version", "Version",
"Version of the GStreamer Data Protocol",
GST_TYPE_DP_VERSION, DEFAULT_VERSION, G_PARAM_READWRITE));
+
+ gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
}
static void
@@ -151,30 +156,61 @@ gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
gdppay->srcpad =
gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
+ gst_pad_set_event_function (gdppay->srcpad,
+ GST_DEBUG_FUNCPTR (gst_gdp_pay_src_event));
gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
- gdppay->offset = 0;
-
gdppay->crc_header = DEFAULT_CRC_HEADER;
gdppay->crc_payload = DEFAULT_CRC_PAYLOAD;
gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload;
gdppay->version = DEFAULT_VERSION;
+ gdppay->offset = 0;
+
+ gdppay->packetizer = gst_dp_packetizer_new (gdppay->version);
}
static void
-gst_gdp_pay_dispose (GObject * gobject)
+gst_gdp_pay_finalize (GObject * gobject)
{
GstGDPPay *this = GST_GDP_PAY (gobject);
+ gst_gdp_pay_reset (this);
+ gst_dp_packetizer_free (this->packetizer);
+
+ GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
+}
+
+static void
+gst_gdp_pay_reset (GstGDPPay * this)
+{
+ /* clear the queued buffers */
+ while (this->queue) {
+ GstBuffer *buffer;
+
+ buffer = GST_BUFFER_CAST (this->queue->data);
+ GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
+
+ /* delete buffer from queue now */
+ this->queue = g_list_delete_link (this->queue, this->queue);
+ }
+ if (this->caps) {
+ gst_caps_unref (this->caps);
+ this->caps = NULL;
+ }
if (this->caps_buf) {
gst_buffer_unref (this->caps_buf);
this->caps_buf = NULL;
}
+ if (this->tag_buf) {
+ gst_buffer_unref (this->tag_buf);
+ this->tag_buf = NULL;
+ }
if (this->new_segment_buf) {
gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = NULL;
}
- GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (gobject));
+ this->sent_streamheader = FALSE;
+ this->offset = 0;
}
/* set OFFSET and OFFSET_END with running count */
@@ -195,10 +231,8 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
guint len;
if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len,
- &header, &payload)) {
- GST_WARNING_OBJECT (this, "could not create GDP header from caps");
- return NULL;
- }
+ &header, &payload))
+ goto packet_failed;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
headerbuf = gst_buffer_new ();
@@ -211,6 +245,13 @@ gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf);
+
+ /* ERRORS */
+packet_failed:
+ {
+ GST_WARNING_OBJECT (this, "could not create GDP header from caps");
+ return NULL;
+ }
}
static GstBuffer *
@@ -221,10 +262,8 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
guint len;
if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len,
- &header)) {
- GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
- return NULL;
- }
+ &header))
+ goto no_buffer;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
headerbuf = gst_buffer_new ();
@@ -233,7 +272,15 @@ gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
/* we do not want to lose the ref on the incoming buffer */
gst_buffer_ref (buffer);
+
return gst_buffer_join (headerbuf, buffer);
+
+ /* ERRORS */
+no_buffer:
+ {
+ GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
+ return NULL;
+ }
}
static GstBuffer *
@@ -248,12 +295,8 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
ret =
this->packetizer->packet_from_event (event, this->header_flag, &len,
&header, &payload);
-
- if (!ret) {
- GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
- gst_event_type_get_name (event->type), event->type);
- return NULL;
- }
+ if (!ret)
+ goto no_event;
GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
headerbuf = gst_buffer_new ();
@@ -266,6 +309,14 @@ gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
return gst_buffer_join (headerbuf, payloadbuf);
+
+ /* ERRORS */
+no_event:
+ {
+ GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
+ gst_event_type_get_name (event->type), event->type);
+ return NULL;
+ }
}
@@ -341,14 +392,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
bufval = &g_array_index (buffers, GValue, i);
buffer = g_value_peek_pointer (bufval);
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
- if (outbuffer) {
- g_value_init (&value, GST_TYPE_BUFFER);
- gst_value_set_buffer (&value, outbuffer);
- gst_value_array_append_value (&array, &value);
- g_value_unset (&value);
- }
- /* FIXME: if one or more in this loop fail to produce an outbuffer,
- * should we error out ? Once ? Every time ? */
+ if (!outbuffer)
+ goto no_buffer;
+
+ g_value_init (&value, GST_TYPE_BUFFER);
+ gst_value_set_buffer (&value, outbuffer);
+ gst_value_array_append_value (&array, &value);
+ g_value_unset (&value);
}
}
@@ -406,38 +456,50 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this)
this->sent_streamheader = TRUE;
GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
g_list_length (this->queue));
- if (this->queue) {
- GList *l;
-
- for (l = this->queue; l; l = g_list_next (l)) {
- GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", l->data);
- gst_buffer_set_caps (l->data, caps);
- r = gst_pad_push (this->srcpad, l->data);
- if (r != GST_FLOW_OK) {
- GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
- return r;
- }
+ while (this->queue) {
+ GstBuffer *buffer;
+
+ buffer = GST_BUFFER_CAST (this->queue->data);
+ GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
+
+ /* delete buffer from queue now */
+ this->queue = g_list_delete_link (this->queue, this->queue);
+
+ /* set caps en push */
+ gst_buffer_set_caps (buffer, caps);
+ r = gst_pad_push (this->srcpad, buffer);
+ if (r != GST_FLOW_OK) {
+ GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
+ return r;
}
}
-
return r;
+
+ /* ERRORS */
+no_buffer:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
+ ("failed to create GDP buffer from streamheader"));
+ return GST_FLOW_ERROR;
+ }
}
/* queue a buffer internally if we haven't sent streamheader buffers yet;
- * otherwise, just push on */
+ * otherwise, just push on, this takes ownership of the buffer. */
static GstFlowReturn
gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
{
if (this->sent_streamheader) {
- GST_LOG_OBJECT (this, "Pushing GDP buffer %p", buffer);
- GST_LOG_OBJECT (this, "set caps %" GST_PTR_FORMAT, this->caps);
+ GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT,
+ buffer, this->caps);
return gst_pad_push (this->srcpad, buffer);
}
- /* store it on an internal queue */
+ /* store it on an internal queue. buffer remains reffed. */
this->queue = g_list_append (this->queue, buffer);
GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued",
buffer, g_list_length (this->queue));
+
return GST_FLOW_OK;
}
@@ -466,12 +528,7 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
if (!outbuffer) {
GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
("Could not create GDP buffer from new segment event"));
-/*
- ret = GST_FLOW_ERROR;
- goto done;
-*/
} else {
-
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
GST_BUFFER_DURATION (outbuffer) = 0;
@@ -483,26 +540,16 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* make sure we've received caps before */
caps = gst_buffer_get_caps (buffer);
- if (!this->caps && !caps) {
- GST_WARNING_OBJECT (this, "first received buffer does not have caps set");
- if (caps)
- gst_caps_unref (caps);
- ret = GST_FLOW_NOT_NEGOTIATED;
- goto done;
- }
+ if (!this->caps && !caps)
+ goto no_caps;
/* if the caps have changed, process caps first */
if (caps && !gst_caps_is_equal (this->caps, caps)) {
GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
gst_caps_replace (&(this->caps), caps);
outbuffer = gst_gdp_buffer_from_caps (this, caps);
- if (!outbuffer) {
- GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
- ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
- gst_caps_unref (caps);
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ if (!outbuffer)
+ goto no_caps_buffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@@ -515,12 +562,8 @@ gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
/* create a GDP header packet,
* then create a GST buffer of the header packet and the buffer contents */
outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
- if (!outbuffer) {
- GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
- ("Could not create GDP buffer from buffer"));
- ret = GST_FLOW_ERROR;
- goto done;
- }
+ if (!outbuffer)
+ goto no_buffer;
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
@@ -532,6 +575,34 @@ done:
gst_buffer_unref (buffer);
gst_object_unref (this);
return ret;
+
+ /* ERRORS */
+no_caps:
+ {
+ /* when returning a fatal error as a GstFlowReturn we must post an error
+ * message */
+ GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
+ ("first received buffer does not have caps set"));
+ if (caps)
+ gst_caps_unref (caps);
+ ret = GST_FLOW_NOT_NEGOTIATED;
+ goto done;
+ }
+no_caps_buffer:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
+ ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
+ gst_caps_unref (caps);
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
+no_buffer:
+ {
+ GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
+ ("Could not create GDP buffer from buffer"));
+ ret = GST_FLOW_ERROR;
+ goto done;
+ }
}
static gboolean
@@ -547,13 +618,9 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
/* now turn the event into a buffer */
outbuffer = gst_gdp_buffer_from_event (this, event);
- if (!outbuffer) {
- GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
- ("Could not create GDP buffer from received event (type %s)",
- gst_event_type_get_name (event->type)));
- ret = FALSE;
- goto done;
- }
+ if (!outbuffer)
+ goto no_outbuffer;
+
gst_gdp_stamp_buffer (this, outbuffer);
GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
GST_BUFFER_DURATION (outbuffer) = 0;
@@ -562,47 +629,87 @@ gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
* and not send it on */
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_NEWSEGMENT:
- GST_DEBUG_OBJECT (this, "received new_segment event");
- if (this->new_segment_buf) {
- gst_buffer_unref (this->new_segment_buf);
- }
GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf",
outbuffer);
+
+ if (this->new_segment_buf)
+ gst_buffer_unref (this->new_segment_buf);
this->new_segment_buf = outbuffer;
+
gst_gdp_pay_reset_streamheader (this);
break;
case GST_EVENT_TAG:
- GST_DEBUG_OBJECT (this, "received tag event");
- if (this->tag_buf) {
- gst_buffer_unref (this->tag_buf);
- }
GST_DEBUG_OBJECT (this, "Storing buffer %p as tag_buf", outbuffer);
+
+ if (this->tag_buf)
+ gst_buffer_unref (this->tag_buf);
this->tag_buf = outbuffer;
+
gst_gdp_pay_reset_streamheader (this);
break;
default:
GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer,
event);
flowret = gst_gdp_queue_buffer (this, outbuffer);
- if (flowret != GST_FLOW_OK) {
- GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d",
- flowret);
- ret = FALSE;
- goto done;
- }
+ if (flowret != GST_FLOW_OK)
+ goto push_error;
break;
}
/* if we have EOS, we should send on EOS ourselves */
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
- return gst_pad_push_event (this->srcpad, event);
- };
+ /* ref, we unref later again */
+ ret = gst_pad_push_event (this->srcpad, gst_event_ref (event));
+ }
done:
- gst_object_unref (this);
gst_event_unref (event);
+ gst_object_unref (this);
+
return ret;
+
+ /* ERRORS */
+no_outbuffer:
+ {
+ GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
+ ("Could not create GDP buffer from received event (type %s)",
+ gst_event_type_get_name (event->type)));
+ ret = FALSE;
+ goto done;
+ }
+push_error:
+ {
+ GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", flowret);
+ ret = FALSE;
+ goto done;
+ }
+}
+
+static gboolean
+gst_gdp_pay_src_event (GstPad * pad, GstEvent * event)
+{
+ GstGDPPay *this;
+ gboolean res = TRUE;
+
+ this = GST_GDP_PAY (gst_pad_get_parent (pad));
+
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_SEEK:
+ /* we refuse seek for now. */
+ gst_event_unref (event);
+ res = FALSE;
+ break;
+ case GST_EVENT_QOS:
+ case GST_EVENT_NAVIGATION:
+ default:
+ /* everything else is passed */
+ res = gst_pad_push_event (this->sinkpad, event);
+ break;
+ }
+ gst_object_unref (this);
+
+ return res;
}
static void
@@ -667,7 +774,6 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
- this->packetizer = gst_dp_packetizer_new (this->version);
break;
default:
break;
@@ -677,16 +783,7 @@ gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
- if (this->packetizer) {
- gst_dp_packetizer_free (this->packetizer);
- this->packetizer = NULL;
- }
- break;
- case GST_STATE_CHANGE_READY_TO_NULL:
- if (this->caps) {
- gst_caps_unref (this->caps);
- this->caps = NULL;
- }
+ gst_gdp_pay_reset (this);
break;
default:
break;