diff options
Diffstat (limited to 'ext/neon/gstneonhttpsrc.c')
-rw-r--r-- | ext/neon/gstneonhttpsrc.c | 202 |
1 files changed, 71 insertions, 131 deletions
diff --git a/ext/neon/gstneonhttpsrc.c b/ext/neon/gstneonhttpsrc.c index 1f9fad7e..319dc26b 100644 --- a/ext/neon/gstneonhttpsrc.c +++ b/ext/neon/gstneonhttpsrc.c @@ -49,11 +49,8 @@ enum PROP_PROXY }; -static void request_dispatch (void *data); static void oom_callback (); -static int accept_response (void *userdata, ne_request * req, - const ne_status * st); -static void block_reader (void *userdata, const char *buf, size_t len); + static void size_header_handler (void *userdata, const char *value); static gboolean set_proxy (const char *uri, ne_uri * parsed, @@ -165,11 +162,7 @@ gst_neonhttp_src_init (GstNeonhttpSrc * this, GstNeonhttpSrcClass * g_class) set_uri (NULL, &this->uri, &this->ishttps, &this->uristr, TRUE); set_proxy (NULL, &this->proxy, TRUE); - this->lock = g_mutex_new (); this->adapter = gst_adapter_new (); - this->task = gst_task_create (request_dispatch, this); - g_static_rec_mutex_init (&this->tasklock); - gst_task_set_lock (this->task, &this->tasklock); gst_base_src_set_live (GST_BASE_SRC (this), TRUE); @@ -200,22 +193,80 @@ gst_neonhttp_src_finalize (GObject * gobject) g_object_unref (this->adapter); } - if (this->lock) { - g_mutex_free (this->lock); + if (this->uristr) { + ne_free (this->uristr); + } + +} + +int +request_dispatch (GstNeonhttpSrc * src, GstBuffer * outbuf) +{ + + GstPad *peer; + int ret; + int read = 0; + int sizetoread = GST_BUFFER_SIZE (outbuf); + + /* Loop sending the request: + * Retry whilst authentication fails and we supply it. */ + + ssize_t len = 0; + + while (sizetoread > 0) { + + if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) { + GST_BUFFER_SIZE (outbuf) = read; + return read; + } + len = ne_read_response_block (src->request, + (char *) GST_BUFFER_DATA (outbuf) + read, sizetoread); + if (len > 0) { + read += len; + sizetoread -= len; + } else { + break; + } + + } + + GST_BUFFER_SIZE (outbuf) = read; + + if (len < 0) { + read = -2; + goto done; + } else if (len == 0) { + ret = ne_end_request (src->request); + if (ret != NE_RETRY) { + if (ret == NE_OK) { + GST_DEBUG ("Returning EOS"); + peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src)); + if (!gst_pad_send_event (peer, gst_event_new_eos ())) { + ret = GST_FLOW_ERROR; + } + gst_object_unref (peer); + } else { + read = -3; + GST_ERROR ("Request failed. code:%d, desc: %s\n", ret, + ne_get_error (src->session)); + } + } + goto done; } - gst_object_unref (this->task); +done: - g_static_rec_mutex_free (&this->tasklock); + return read; } + static GstFlowReturn gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstNeonhttpSrc *src; GstFlowReturn ret = GST_FLOW_OK; - guint avail; + int read; src = GST_NEONHTTP_SRC (psrc); @@ -224,46 +275,19 @@ gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_LOG_OBJECT (src, "asked for a buffer"); - while (1) { - g_mutex_lock (src->lock); - if ((avail = gst_adapter_available (src->adapter))) { - g_mutex_unlock (src->lock); - break; - } else if (src->eos) { - GstPad *peer; + *outbuf = gst_buffer_new_and_alloc (GST_BASE_SRC (psrc)->blocksize); - g_mutex_unlock (src->lock); + read = request_dispatch (src, *outbuf); + if (read > 0) { - *outbuf = NULL; - GST_DEBUG ("Returning EOS"); - peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src)); - if (!gst_pad_send_event (peer, gst_event_new_eos ())) { - ret = GST_FLOW_ERROR; - } - gst_object_unref (peer); - goto done; + if (*outbuf) { + gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src))); } - g_mutex_unlock (src->lock); - usleep (250000); - } - - g_mutex_lock (src->lock); - - avail = gst_adapter_available (src->adapter); - avail = avail > (4 * 1024) ? (4 * 1024) : avail; - *outbuf = gst_buffer_new_and_alloc (avail); - memcpy (GST_BUFFER_DATA (*outbuf), gst_adapter_peek (src->adapter, avail), - avail); - gst_adapter_flush (src->adapter, avail); - - g_mutex_unlock (src->lock); - if (*outbuf) { - gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src))); + } else if (read < 0) { + return GST_FLOW_ERROR; } -done: - return ret; wrong_state: @@ -516,17 +540,12 @@ gst_neonhttp_src_start (GstBaseSrc * bsrc) ne_add_response_header_handler (src->request, "Content-Length", size_header_handler, src); - ne_add_response_body_reader (src->request, accept_response, block_reader, - src); - if (NE_OK != ne_begin_request (src->request)) { ret = FALSE; goto done; } - src->eos = FALSE; GST_OBJECT_FLAG_SET (src, GST_NEONHTTP_SRC_OPEN); - gst_task_start (src->task); done: @@ -546,9 +565,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc) GST_OBJECT_FLAG_UNSET (src, GST_NEONHTTP_SRC_OPEN); - gst_task_stop (src->task); - gst_task_join (src->task); - if (src->request) { ne_request_destroy (src->request); src->request = NULL; @@ -563,54 +579,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc) return TRUE; } -void -request_dispatch (void *data) -{ - - int ret; - - GstNeonhttpSrc *src; - - src = GST_NEONHTTP_SRC (data); - - /* Loop sending the request: - * Retry whilst authentication fails and we supply it. */ - - do { - ssize_t len; - - do { - if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) { - return; - } - len = ne_read_response_block (src->request, - src->respbuf, sizeof (src->respbuf)); - } while (len > 0); - - if (len < 0) { - ret = NE_ERROR; - break; - } - - ret = ne_end_request (src->request); - - } while (ret == NE_RETRY); - - if (ret != NE_OK) { - GST_ERROR ("Request failed. code:%d, desc: %s\n", ret, - ne_get_error (src->session)); - } - - g_mutex_lock (src->lock); - src->eos = TRUE; - g_mutex_unlock (src->lock); - gst_task_stop (src->task); - - return; - -} - - /* entry point to initialize the plug-in * initialize the plug-in itself * register the element factories and pad templates @@ -682,39 +650,11 @@ oom_callback () GST_ERROR ("memory exeception in neon\n"); } -static int -accept_response (void *userdata, ne_request * req, const ne_status * st) -{ - GST_LOG ("ne_accept_response called code = %d phrase %s\n", st->code, - st->reason_phrase); - return ne_accept_2xx (userdata, req, st); -} - -static void -block_reader (void *userdata, const char *buf, size_t len) -{ - - if (len) { - GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata); - GstBuffer *buffer = gst_buffer_new_and_alloc (len); - - memcpy (GST_BUFFER_DATA (buffer), buf, len); - - g_mutex_lock (src->lock); - gst_adapter_push (src->adapter, buffer); - src->current_size += len; - g_mutex_unlock (src->lock); - - } -} - void size_header_handler (void *userdata, const char *value) { GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata); - g_mutex_lock (src->lock); src->content_size = atoi (value); - g_mutex_unlock (src->lock); } |