summaryrefslogtreecommitdiffstats
path: root/ext
diff options
context:
space:
mode:
authorWouter Cloetens <wouter@mind.be>2008-02-22 07:20:03 +0000
committerSebastian Dröge <slomo@circular-chaos.org>2008-02-22 07:20:03 +0000
commitfbcd082d9c736d4be9d0b35bc92402cc5a016fdd (patch)
tree898ae10059fdd7cbc236682bef99cc7d1cb10b52 /ext
parentb4ec0f41568d28f58f41ee1abcb823432126cf5d (diff)
downloadgst-plugins-bad-fbcd082d9c736d4be9d0b35bc92402cc5a016fdd.tar.gz
gst-plugins-bad-fbcd082d9c736d4be9d0b35bc92402cc5a016fdd.tar.bz2
gst-plugins-bad-fbcd082d9c736d4be9d0b35bc92402cc5a016fdd.zip
Implement zero-copy and make the buffer size configurable.
Original commit message from CVS: Patch by: Wouter Cloetens <wouter at mind dot be> * configure.ac: * ext/soup/gstsouphttpsrc.c: (gst_soup_http_src_cancel_message), (gst_soup_http_src_finished_cb), (gst_soup_http_src_chunk_free), (gst_soup_http_src_chunk_allocator), (gst_soup_http_src_got_chunk_cb), (gst_soup_http_src_create), (gst_soup_http_src_start), (gst_soup_http_src_set_proxy): * ext/soup/gstsouphttpsrc.h: Implement zero-copy and make the buffer size configurable. Prefix proxy URIs with "http://" if they don't start with it already and catch errors earlier, fixes hanging in some situations. Fixes bug #514948.
Diffstat (limited to 'ext')
-rw-r--r--ext/soup/gstsouphttpsrc.c137
-rw-r--r--ext/soup/gstsouphttpsrc.h1
2 files changed, 103 insertions, 35 deletions
diff --git a/ext/soup/gstsouphttpsrc.c b/ext/soup/gstsouphttpsrc.c
index 86e6aca9..af50eaa2 100644
--- a/ext/soup/gstsouphttpsrc.c
+++ b/ext/soup/gstsouphttpsrc.c
@@ -124,7 +124,7 @@ enum
PROP_IRADIO_TITLE
};
-#define DEFAULT_USER_AGENT "GStreamer souphttpsrc"
+#define DEFAULT_USER_AGENT "GStreamer souphttpsrc "
static void gst_soup_http_src_uri_handler_init (gpointer g_iface,
gpointer iface_data);
@@ -163,6 +163,9 @@ static void gst_soup_http_src_session_pause_message (GstSoupHTTPSrc * src);
static void gst_soup_http_src_session_close (GstSoupHTTPSrc * src);
static void gst_soup_http_src_parse_status (SoupMessage * msg,
GstSoupHTTPSrc * src);
+static void gst_soup_http_src_chunk_free (gpointer gstbuf);
+static SoupBuffer *gst_soup_http_src_chunk_allocator (SoupMessage * msg,
+ gsize max_len, gpointer user_data);
static void gst_soup_http_src_got_chunk_cb (SoupMessage * msg,
SoupBuffer * chunk, GstSoupHTTPSrc * src);
static void gst_soup_http_src_response_cb (SoupSession * session,
@@ -459,7 +462,8 @@ gst_soup_http_src_unicodify (const gchar * str)
static void
gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
{
- soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
+ if (src->msg != NULL)
+ soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
src->msg = NULL;
}
@@ -645,25 +649,88 @@ gst_soup_http_src_finished_cb (SoupMessage * msg, GstSoupHTTPSrc * src)
GST_DEBUG_OBJECT (src, "finished, but not for current message");
return;
}
+ GST_DEBUG_OBJECT (src, "finished");
if (G_UNLIKELY (src->session_io_status !=
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING)) {
- /* Probably a redirect. */
- return;
+ GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
+ ("%s", msg->reason_phrase),
+ ("libsoup status code %d", msg->status_code));
}
- GST_DEBUG_OBJECT (src, "finished");
src->ret = GST_FLOW_UNEXPECTED;
if (src->loop)
g_main_loop_quit (src->loop);
}
+/* Buffer lifecycle management.
+ *
+ * gst_soup_http_src_create() runs the GMainLoop for this element, to let
+ * Soup take control.
+ * A GstBuffer is allocated in gst_soup_http_src_chunk_allocator() and
+ * associated with a SoupBuffer.
+ * Soup reads HTTP data in the GstBuffer's data buffer.
+ * The gst_soup_http_src_got_chunk_cb() is then called with the SoupBuffer.
+ * That sets gst_soup_http_src_create()'s return argument to the GstBuffer,
+ * increments its refcount (to 2), pauses the flow of data from the HTTP
+ * source to prevent gst_soup_http_src_got_chunk_cb() from being called
+ * again and breaks out of the GMainLoop.
+ * Because the SOUP_MESSAGE_OVERWRITE_CHUNKS flag is set, Soup frees the
+ * SoupBuffer and calls gst_soup_http_src_chunk_free(), which decrements the
+ * refcount (to 1).
+ * gst_soup_http_src_create() returns the GstBuffer. It will be freed by a
+ * downstream element.
+ * If Soup fails to read HTTP data, it does not call
+ * gst_soup_http_src_got_chunk_cb(), but still frees the SoupBuffer and
+ * calls gst_soup_http_src_chunk_free(), which decrements the GstBuffer's
+ * refcount to 0, freeing it.
+ */
+
+static void
+gst_soup_http_src_chunk_free (gpointer gstbuf)
+{
+ gst_buffer_unref (GST_BUFFER_CAST (gstbuf));
+}
+
+static SoupBuffer *
+gst_soup_http_src_chunk_allocator (SoupMessage * msg, gsize max_len,
+ gpointer user_data)
+{
+ GstSoupHTTPSrc *src = (GstSoupHTTPSrc *) user_data;
+ GstBaseSrc *basesrc = GST_BASE_SRC_CAST (src);
+ GstBuffer *gstbuf;
+ SoupBuffer *soupbuf;
+ gsize length;
+ GstFlowReturn rc;
+
+ if (max_len)
+ length = MIN (basesrc->blocksize, max_len);
+ else
+ length = basesrc->blocksize;
+ GST_DEBUG_OBJECT (src, "alloc %" G_GSIZE_FORMAT " bytes <= %" G_GSIZE_FORMAT,
+ length, max_len);
+
+ rc = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc),
+ GST_BUFFER_OFFSET_NONE, length,
+ GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), &gstbuf);
+ if (G_UNLIKELY (rc != GST_FLOW_OK)) {
+ /* Failed to allocate buffer. Stall SoupSession and return error code
+ * to create(). */
+ src->ret = rc;
+ g_main_loop_quit (src->loop);
+ return NULL;
+ }
+
+ soupbuf = soup_buffer_new_with_owner (GST_BUFFER_DATA (gstbuf), length,
+ gstbuf, gst_soup_http_src_chunk_free);
+
+ return soupbuf;
+}
+
static void
gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
GstSoupHTTPSrc * src)
{
GstBaseSrc *basesrc;
guint64 new_position;
- const char *data;
- gsize length;
if (G_UNLIKELY (msg != src->msg)) {
GST_DEBUG_OBJECT (src, "got chunk, but not for current message");
@@ -675,22 +742,22 @@ gst_soup_http_src_got_chunk_cb (SoupMessage * msg, SoupBuffer * chunk,
return;
}
basesrc = GST_BASE_SRC_CAST (src);
- data = chunk->data;
- length = chunk->length;
- GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes", length);
-
- /* Create the buffer. */
- src->ret = gst_pad_alloc_buffer (GST_BASE_SRC_PAD (basesrc),
- basesrc->segment.last_stop, length,
- GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)), src->outbuf);
- if (G_LIKELY (src->ret == GST_FLOW_OK)) {
- memcpy (GST_BUFFER_DATA (*src->outbuf), data, length);
- new_position = src->read_position + length;
- if (G_LIKELY (src->request_position == src->read_position))
- src->request_position = new_position;
- src->read_position = new_position;
- }
-
+ GST_DEBUG_OBJECT (src, "got chunk of %" G_GSIZE_FORMAT " bytes",
+ chunk->length);
+
+ /* Extract the GstBuffer from the SoupBuffer and set its fields. */
+ *src->outbuf = GST_BUFFER_CAST (soup_buffer_get_owner (chunk));
+ gst_buffer_ref (*src->outbuf);
+ GST_BUFFER_SIZE (*src->outbuf) = chunk->length;
+ GST_BUFFER_OFFSET (*src->outbuf) = basesrc->segment.last_stop;
+ gst_buffer_set_caps (*src->outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (basesrc)));
+
+ new_position = src->read_position + chunk->length;
+ if (G_LIKELY (src->request_position == src->read_position))
+ src->request_position = new_position;
+ src->read_position = new_position;
+
+ src->ret = GST_FLOW_OK;
g_main_loop_quit (src->loop);
gst_soup_http_src_session_pause_message (src);
}
@@ -789,10 +856,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
src->session_io_status = GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE;
soup_message_headers_append (src->msg->request_headers, "Connection",
"close");
- if (src->user_agent) {
- soup_message_headers_append (src->msg->request_headers, "User-Agent",
- src->user_agent);
- }
if (src->iradio_mode) {
soup_message_headers_append (src->msg->request_headers, "icy-metadata",
"1");
@@ -808,6 +871,8 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
G_CALLBACK (gst_soup_http_src_got_chunk_cb), src);
soup_message_set_flags (src->msg, SOUP_MESSAGE_OVERWRITE_CHUNKS |
(src->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
+ soup_message_set_chunk_allocator (src->msg,
+ gst_soup_http_src_chunk_allocator, src, NULL);
gst_soup_http_src_add_range_header (src, src->request_position);
}
@@ -828,10 +893,6 @@ gst_soup_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GST_DEBUG_OBJECT (src, "Queueing connection request");
gst_soup_http_src_queue_message (src);
break;
- case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED:
- GST_DEBUG_OBJECT (src, "Connection closed");
- gst_soup_http_src_cancel_message (src);
- break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED:
break;
case GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING:
@@ -874,11 +935,12 @@ gst_soup_http_src_start (GstBaseSrc * bsrc)
if (src->proxy == NULL)
src->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
- src->context, NULL);
+ src->context, SOUP_SESSION_USER_AGENT, src->user_agent, NULL);
else
src->session =
soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
- src->context, SOUP_SESSION_PROXY_URI, src->proxy, NULL);
+ src->context, SOUP_SESSION_PROXY_URI, src->proxy,
+ SOUP_SESSION_USER_AGENT, src->user_agent, NULL);
if (!src->session) {
GST_ELEMENT_ERROR (src, LIBRARY, INIT,
(NULL), ("Failed to create async session"));
@@ -996,7 +1058,14 @@ gst_soup_http_src_set_proxy (GstSoupHTTPSrc * src, const gchar * uri)
soup_uri_free (src->proxy);
src->proxy = NULL;
}
- src->proxy = soup_uri_new (uri);
+ if (g_str_has_prefix (uri, "http://")) {
+ src->proxy = soup_uri_new (uri);
+ } else {
+ gchar *new_uri = g_strconcat ("http://", uri, NULL);
+
+ src->proxy = soup_uri_new (new_uri);
+ g_free (new_uri);
+ }
return TRUE;
}
diff --git a/ext/soup/gstsouphttpsrc.h b/ext/soup/gstsouphttpsrc.h
index 14891d90..9e7d81cb 100644
--- a/ext/soup/gstsouphttpsrc.h
+++ b/ext/soup/gstsouphttpsrc.h
@@ -42,7 +42,6 @@ typedef enum {
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_IDLE,
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_QUEUED,
GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_RUNNING,
- GST_SOUP_HTTP_SRC_SESSION_IO_STATUS_FINISHED,
} GstSoupHTTPSrcSessionIOStatus;
struct _GstSoupHTTPSrc {