summaryrefslogtreecommitdiffstats
path: root/ext/neon/gstneonhttpsrc.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/neon/gstneonhttpsrc.c')
-rw-r--r--ext/neon/gstneonhttpsrc.c202
1 files changed, 71 insertions, 131 deletions
diff --git a/ext/neon/gstneonhttpsrc.c b/ext/neon/gstneonhttpsrc.c
index 1f9fad7e..319dc26b 100644
--- a/ext/neon/gstneonhttpsrc.c
+++ b/ext/neon/gstneonhttpsrc.c
@@ -49,11 +49,8 @@ enum
PROP_PROXY
};
-static void request_dispatch (void *data);
static void oom_callback ();
-static int accept_response (void *userdata, ne_request * req,
- const ne_status * st);
-static void block_reader (void *userdata, const char *buf, size_t len);
+
static void size_header_handler (void *userdata, const char *value);
static gboolean set_proxy (const char *uri, ne_uri * parsed,
@@ -165,11 +162,7 @@ gst_neonhttp_src_init (GstNeonhttpSrc * this, GstNeonhttpSrcClass * g_class)
set_uri (NULL, &this->uri, &this->ishttps, &this->uristr, TRUE);
set_proxy (NULL, &this->proxy, TRUE);
- this->lock = g_mutex_new ();
this->adapter = gst_adapter_new ();
- this->task = gst_task_create (request_dispatch, this);
- g_static_rec_mutex_init (&this->tasklock);
- gst_task_set_lock (this->task, &this->tasklock);
gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
@@ -200,22 +193,80 @@ gst_neonhttp_src_finalize (GObject * gobject)
g_object_unref (this->adapter);
}
- if (this->lock) {
- g_mutex_free (this->lock);
+ if (this->uristr) {
+ ne_free (this->uristr);
+ }
+
+}
+
+int
+request_dispatch (GstNeonhttpSrc * src, GstBuffer * outbuf)
+{
+
+ GstPad *peer;
+ int ret;
+ int read = 0;
+ int sizetoread = GST_BUFFER_SIZE (outbuf);
+
+ /* Loop sending the request:
+ * Retry whilst authentication fails and we supply it. */
+
+ ssize_t len = 0;
+
+ while (sizetoread > 0) {
+
+ if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) {
+ GST_BUFFER_SIZE (outbuf) = read;
+ return read;
+ }
+ len = ne_read_response_block (src->request,
+ (char *) GST_BUFFER_DATA (outbuf) + read, sizetoread);
+ if (len > 0) {
+ read += len;
+ sizetoread -= len;
+ } else {
+ break;
+ }
+
+ }
+
+ GST_BUFFER_SIZE (outbuf) = read;
+
+ if (len < 0) {
+ read = -2;
+ goto done;
+ } else if (len == 0) {
+ ret = ne_end_request (src->request);
+ if (ret != NE_RETRY) {
+ if (ret == NE_OK) {
+ GST_DEBUG ("Returning EOS");
+ peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src));
+ if (!gst_pad_send_event (peer, gst_event_new_eos ())) {
+ ret = GST_FLOW_ERROR;
+ }
+ gst_object_unref (peer);
+ } else {
+ read = -3;
+ GST_ERROR ("Request failed. code:%d, desc: %s\n", ret,
+ ne_get_error (src->session));
+ }
+ }
+ goto done;
}
- gst_object_unref (this->task);
+done:
- g_static_rec_mutex_free (&this->tasklock);
+ return read;
}
+
static GstFlowReturn
gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstNeonhttpSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
- guint avail;
+ int read;
src = GST_NEONHTTP_SRC (psrc);
@@ -224,46 +275,19 @@ gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_LOG_OBJECT (src, "asked for a buffer");
- while (1) {
- g_mutex_lock (src->lock);
- if ((avail = gst_adapter_available (src->adapter))) {
- g_mutex_unlock (src->lock);
- break;
- } else if (src->eos) {
- GstPad *peer;
+ *outbuf = gst_buffer_new_and_alloc (GST_BASE_SRC (psrc)->blocksize);
- g_mutex_unlock (src->lock);
+ read = request_dispatch (src, *outbuf);
+ if (read > 0) {
- *outbuf = NULL;
- GST_DEBUG ("Returning EOS");
- peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src));
- if (!gst_pad_send_event (peer, gst_event_new_eos ())) {
- ret = GST_FLOW_ERROR;
- }
- gst_object_unref (peer);
- goto done;
+ if (*outbuf) {
+ gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
}
- g_mutex_unlock (src->lock);
- usleep (250000);
- }
-
- g_mutex_lock (src->lock);
-
- avail = gst_adapter_available (src->adapter);
- avail = avail > (4 * 1024) ? (4 * 1024) : avail;
- *outbuf = gst_buffer_new_and_alloc (avail);
- memcpy (GST_BUFFER_DATA (*outbuf), gst_adapter_peek (src->adapter, avail),
- avail);
- gst_adapter_flush (src->adapter, avail);
-
- g_mutex_unlock (src->lock);
- if (*outbuf) {
- gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
+ } else if (read < 0) {
+ return GST_FLOW_ERROR;
}
-done:
-
return ret;
wrong_state:
@@ -516,17 +540,12 @@ gst_neonhttp_src_start (GstBaseSrc * bsrc)
ne_add_response_header_handler (src->request, "Content-Length",
size_header_handler, src);
- ne_add_response_body_reader (src->request, accept_response, block_reader,
- src);
-
if (NE_OK != ne_begin_request (src->request)) {
ret = FALSE;
goto done;
}
- src->eos = FALSE;
GST_OBJECT_FLAG_SET (src, GST_NEONHTTP_SRC_OPEN);
- gst_task_start (src->task);
done:
@@ -546,9 +565,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc)
GST_OBJECT_FLAG_UNSET (src, GST_NEONHTTP_SRC_OPEN);
- gst_task_stop (src->task);
- gst_task_join (src->task);
-
if (src->request) {
ne_request_destroy (src->request);
src->request = NULL;
@@ -563,54 +579,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc)
return TRUE;
}
-void
-request_dispatch (void *data)
-{
-
- int ret;
-
- GstNeonhttpSrc *src;
-
- src = GST_NEONHTTP_SRC (data);
-
- /* Loop sending the request:
- * Retry whilst authentication fails and we supply it. */
-
- do {
- ssize_t len;
-
- do {
- if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) {
- return;
- }
- len = ne_read_response_block (src->request,
- src->respbuf, sizeof (src->respbuf));
- } while (len > 0);
-
- if (len < 0) {
- ret = NE_ERROR;
- break;
- }
-
- ret = ne_end_request (src->request);
-
- } while (ret == NE_RETRY);
-
- if (ret != NE_OK) {
- GST_ERROR ("Request failed. code:%d, desc: %s\n", ret,
- ne_get_error (src->session));
- }
-
- g_mutex_lock (src->lock);
- src->eos = TRUE;
- g_mutex_unlock (src->lock);
- gst_task_stop (src->task);
-
- return;
-
-}
-
-
/* entry point to initialize the plug-in
* initialize the plug-in itself
* register the element factories and pad templates
@@ -682,39 +650,11 @@ oom_callback ()
GST_ERROR ("memory exeception in neon\n");
}
-static int
-accept_response (void *userdata, ne_request * req, const ne_status * st)
-{
- GST_LOG ("ne_accept_response called code = %d phrase %s\n", st->code,
- st->reason_phrase);
- return ne_accept_2xx (userdata, req, st);
-}
-
-static void
-block_reader (void *userdata, const char *buf, size_t len)
-{
-
- if (len) {
- GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata);
- GstBuffer *buffer = gst_buffer_new_and_alloc (len);
-
- memcpy (GST_BUFFER_DATA (buffer), buf, len);
-
- g_mutex_lock (src->lock);
- gst_adapter_push (src->adapter, buffer);
- src->current_size += len;
- g_mutex_unlock (src->lock);
-
- }
-}
-
void
size_header_handler (void *userdata, const char *value)
{
GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata);
- g_mutex_lock (src->lock);
src->content_size = atoi (value);
- g_mutex_unlock (src->lock);
}