From fbcd082d9c736d4be9d0b35bc92402cc5a016fdd Mon Sep 17 00:00:00 2001 From: Wouter Cloetens Date: Fri, 22 Feb 2008 07:20:03 +0000 Subject: Implement zero-copy and make the buffer size configurable. Original commit message from CVS: Patch by: Wouter Cloetens * configure.ac: * ext/soup/gstsouphttpsrc.c: (gst_soup_http_src_cancel_message), (gst_soup_http_src_finished_cb), (gst_soup_http_src_chunk_free), (gst_soup_http_src_chunk_allocator), (gst_soup_http_src_got_chunk_cb), (gst_soup_http_src_create), (gst_soup_http_src_start), (gst_soup_http_src_set_proxy): * ext/soup/gstsouphttpsrc.h: Implement zero-copy and make the buffer size configurable. Prefix proxy URIs with "http://" if they don't start with it already and catch errors earlier, fixes hanging in some situations. Fixes bug #514948. --- ext/soup/gstsouphttpsrc.c | 137 ++++++++++++++++++++++++++++++++++------------ ext/soup/gstsouphttpsrc.h | 1 - 2 files changed, 103 insertions(+), 35 deletions(-) (limited to 'ext/soup') diff --git a/ext/soup/gstsouphttpsrc.c b/ext/soup/gstsouphttpsrc.c index 86e6aca9..af50eaa2 100644 --- a/ext/soup/gstsouphttpsrc.c +++ b/ext/soup/gstsouphttpsrc.c @@ -124,7 +124,7 @@ enum PROP_IRADIO_TITLE }; -#define DEFAULT_USER_AGENT "GStreamer souphttpsrc" +#define DEFAULT_USER_AGENT "GStreamer souphttpsrc " static void gst_soup_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data); @@ -163,6 +163,9 @@ static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src); static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src); static void gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src); +static void gst_soup_http_src_chunk_free (gpointer gstbuf); +static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg, + gsize max_len, gpointer user_data); static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk, GstSoupHTTPSrc * src); static void gst_soup_http_src_response_cb (SoupSession * session, @@ -459,7 +462,8 @@ gst_soup_http_src_unicodify (const gchar * str) static void gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src) { - soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED); + if (src->msg != NULL) + soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED); src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE; src->msg = NULL; } @@ -645,25 +649,88 @@ gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src) GST_DEBUG_OBJECT (src, "finished, but not for current message"); return; } + GST_DEBUG_OBJECT (src, "finished"); if (G_UNLIKELY (src->session_io_status != GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) { - /* Probably a redirect. */ - return; + GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, + ("%s", msg->reason_phrase), + ("libsoup status code %d", msg->status_code)); } - GST_DEBUG_OBJECT (src, "finished"); src->ret = GST_FLOW_UNEXPECTED; if (src->loop) g_main_loop_quit (src->loop); } +/* Buffer lifecycle management. + * + * gst_soup_http_src_create() runs the GMainLoop for this element, to let + * Soup take control. + * A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and + * associated with a SoupBuffer. + * Soup reads HTTP data in the GstBuffer's data buffer. + * The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer. + * That sets gst_soup_http_src_create()'s return argument to the GstBuffer, + * increments its refcount (to 2), pauses the flow of data from the HTTP + * source to prevent gst_soup_http_src_got_chunk_cb() from being called + * again and breaks out of the GMainLoop. + * Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the + * SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the + * refcount (to 1). + * gst_soup_http_src_create() returns the GstBuffer. It will be freed by a + * downstream element. + * If Soup fails to read HTTP data, it does not call + * gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and + * calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's + * refcount to 0, freeing it. + */ + +static void +gst_soup_http_src_chunk_free (gpointer gstbuf) +{ + gst_buffer_unref (GST_BUFFER_CAST (gstbuf)); +} + +static SoupBuffer * +gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len, + gpointer user_data) +{ + GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data; + GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src); + GstBuffer *gstbuf; + SoupBuffer *soupbuf; + gsize length; + GstFlowReturn rc; + + if (max_len) + length = MIN (basesrc->blocksize, max_len); + else + length = basesrc->blocksize; + GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT, + length, max_len); + + rc = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc), + GST_BUFFER_OFFSET_NONE, length, + GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), &gstbuf); + if (G_UNLIKELY (rc != GST_FLOW_OK)) { + /* Failed to allocate buffer. Stall SoupSession and return error code + * to create(). */ + src->ret = rc; + g_main_loop_quit (src->loop); + return NULL; + } + + soupbuf = soup_buffer_new_with_owner (GST_BUFFER_DATA (gstbuf), length, + gstbuf, gst_soup_http_src_chunk_free); + + return soupbuf; +} + static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk, GstSoupHTTPSrc * src) { GstBaseSrc *basesrc; guint64 new_position; - const char *data; - gsize length; if (G_UNLIKELY (msg != src->msg)) { GST_DEBUG_OBJECT (src, "got chunk, but not for current message"); @@ -675,22 +742,22 @@ gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk, return; } basesrc = GST_BASE_SRC_CAST (src); - data = chunk->data; - length = chunk->length; - GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes", length); - - /* Create the buffer. */ - src->ret = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc), - basesrc->segment.last_stop, length, - GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), src->outbuf); - if (G_LIKELY (src->ret == GST_FLOW_OK)) { - memcpy (GST_BUFFER_DATA (*src->outbuf), data, length); - new_position = src->read_position + length; - if (G_LIKELY (src->request_position == src->read_position)) - src->request_position = new_position; - src->read_position = new_position; - } - + GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes", + chunk->length); + + /* Extract the GstBuffer from the SoupBuffer and set its fields. */ + *src->outbuf = GST_BUFFER_CAST (soup_buffer_get_owner (chunk)); + gst_buffer_ref (*src->outbuf); + GST_BUFFER_SIZE (*src->outbuf) = chunk->length; + GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.last_stop; + gst_buffer_set_caps (*src->outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc))); + + new_position = src->read_position + chunk->length; + if (G_LIKELY (src->request_position == src->read_position)) + src->request_position = new_position; + src->read_position = new_position; + + src->ret = GST_FLOW_OK; g_main_loop_quit (src->loop); gst_soup_http_src_session_pause_message (src); } @@ -789,10 +856,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE; soup_message_headers_append (src->msg->request_headers, "Connection", "close"); - if (src->user_agent) { - soup_message_headers_append (src->msg->request_headers, "User-Agent", - src->user_agent); - } if (src->iradio_mode) { soup_message_headers_append (src->msg->request_headers, "icy-metadata", "1"); @@ -808,6 +871,8 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) G_CALLBACK (gst_soup_http_src_got_chunk_cb), src); soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS | (src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT)); + soup_message_set_chunk_allocator (src->msg, + gst_soup_http_src_chunk_allocator, src, NULL); gst_soup_http_src_add_range_header (src, src->request_position); } @@ -828,10 +893,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_DEBUG_OBJECT (src, "Queueing connection request"); gst_soup_http_src_queue_message (src); break; - case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED: - GST_DEBUG_OBJECT (src, "Connection closed"); - gst_soup_http_src_cancel_message (src); - break; case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED: break; case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING: @@ -874,11 +935,12 @@ gst_soup_http_src_start (GstBaseSrc * bsrc) if (src->proxy == NULL) src->session = soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT, - src->context, NULL); + src->context, SOUP_SESSION_USER_AGENT, src->user_agent, NULL); else src->session = soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT, - src->context, SOUP_SESSION_PROXY_URI, src->proxy, NULL); + src->context, SOUP_SESSION_PROXY_URI, src->proxy, + SOUP_SESSION_USER_AGENT, src->user_agent, NULL); if (!src->session) { GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), ("Failed to create async session")); @@ -996,7 +1058,14 @@ gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src, const gchar * uri) soup_uri_free (src->proxy); src->proxy = NULL; } - src->proxy = soup_uri_new (uri); + if (g_str_has_prefix (uri, "http://")) { + src->proxy = soup_uri_new (uri); + } else { + gchar *new_uri = g_strconcat ("http://", uri, NULL); + + src->proxy = soup_uri_new (new_uri); + g_free (new_uri); + } return TRUE; } diff --git a/ext/soup/gstsouphttpsrc.h b/ext/soup/gstsouphttpsrc.h index 14891d90..9e7d81cb 100644 --- a/ext/soup/gstsouphttpsrc.h +++ b/ext/soup/gstsouphttpsrc.h @@ -42,7 +42,6 @@ typedef enum { GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE, GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED, GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING, - GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED, } GstSoupHTTPSrcSessionIOStatus; struct _GstSoupHTTPSrc { -- cgit v1.2.1