aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2011-12-11 20:58:38 +0000
committerDavid Robillard <d@drobilla.net>2011-12-11 20:58:38 +0000
commit6e4a3a9ea93155e74da4b00fd7cf1b88b2d08f5f (patch)
treed0c8a0018719d957c10e73b2b0cc5765f09b11e9
parent4f2be9d332594f1fdd494437e6f7d5468c1c6022 (diff)
downloadserd-6e4a3a9ea93155e74da4b00fd7cf1b88b2d08f5f.tar.gz
serd-6e4a3a9ea93155e74da4b00fd7cf1b88b2d08f5f.tar.bz2
serd-6e4a3a9ea93155e74da4b00fd7cf1b88b2d08f5f.zip
Add SerdBulkSink for writing bulk output and corresponding serdi -B option.
git-svn-id: http://svn.drobilla.net/serd/trunk@242 490d8e77-9747-427b-9fa3-0b8f29cee8a0
-rw-r--r--ChangeLog1
-rw-r--r--serd/serd.h52
-rw-r--r--src/reader.c23
-rw-r--r--src/serd_internal.h2
-rw-r--r--src/serdi.c18
-rw-r--r--src/sink.c90
-rw-r--r--wscript3
7 files changed, 168 insertions, 21 deletions
diff --git a/ChangeLog b/ChangeLog
index e18d37f0..86395ad7 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -6,6 +6,7 @@ serd (UNRELEASED) unstable; urgency=low
Unicode replacement character
* Report reason for failure to open file in serdi
* Improve write performance by doing bulk writes for unescaped substrings
+ * Add SerdBulkSink for writing bulk output and corresponding serdi -B option
-- David Robillard <d@drobilla.net> (UNRELEASED)
diff --git a/serd/serd.h b/serd/serd.h
index 79f164ea..94499a4d 100644
--- a/serd/serd.h
+++ b/serd/serd.h
@@ -267,10 +267,55 @@ serd_strlen(const uint8_t* str, size_t* n_bytes, SerdNodeFlags* flags);
/**
@}
- @name URI
+ @name Sink
@{
*/
+/**
+ Sink function for raw string output.
+*/
+typedef size_t (*SerdSink)(const void* buf, size_t len, void* stream);
+
+/**
+ Sink adapter that writes blocks to the target sink.
+
+ This is itself a SerdSink which can be used with any SerdSink as a target to
+ transparently write chunked I/O to the output sink. This can significantly
+ improve write performance when the target is a file or similar resource.
+*/
+typedef struct SerdBulkSinkImpl SerdBulkSink;
+
+/**
+ Create a new bulk sink adapter.
+ @param sink Target sink where completed blocks will be written.
+ @param stream Stream parameter for target sink.
+ @param block_size Size of blocks to write, and internal buffer size.
+*/
+SERD_API
+SerdBulkSink*
+serd_bulk_sink_new(SerdSink sink, void* stream, size_t block_size);
+
+/**
+ Free a bulk sink adapter.
+*/
+SERD_API
+void
+serd_bulk_sink_free(SerdBulkSink* bsink);
+
+/**
+ Write data to a bulk sink adapter.
+
+ This function may safely be cast to SerdSink.
+*/
+SERD_API
+size_t
+serd_bulk_sink_write(const void* buf, size_t len, SerdBulkSink* bsink);
+
+/**
+ @}
+ @name URI
+ @{
+*/
static const SerdURI SERD_URI_NULL = {{0,0},{0,0},{0,0},{0,0},{0,0},{0,0}};
/**
@@ -302,11 +347,6 @@ void
serd_uri_resolve(const SerdURI* uri, const SerdURI* base, SerdURI* out);
/**
- Sink function for raw string output.
-*/
-typedef size_t (*SerdSink)(const void* buf, size_t len, void* stream);
-
-/**
Serialise @c uri with a series of calls to @c sink.
*/
SERD_API
diff --git a/src/reader.c b/src/reader.c
index d718d255..42c5093f 100644
--- a/src/reader.c
+++ b/src/reader.c
@@ -34,9 +34,6 @@
#define TRY_THROW(exp) if (!(exp)) goto except;
#define TRY_RET(exp) if (!(exp)) return 0;
-#define STACK_PAGE_SIZE 4096
-#define READ_BUF_LEN 4096
-
typedef struct {
const uint8_t* filename;
unsigned line;
@@ -120,7 +117,7 @@ page(SerdReader* reader)
{
assert(reader->from_file);
reader->read_head = 0;
- const size_t n_read = fread(reader->read_buf, 1, READ_BUF_LEN, reader->fd);
+ const size_t n_read = fread(reader->read_buf, 1, SERD_PAGE_SIZE, reader->fd);
reader->read_buf[n_read] = '\0';
if (n_read == 0) {
reader->eof = true;
@@ -133,7 +130,7 @@ peek_string(SerdReader* reader, uint8_t* pre, int n)
{
uint8_t* ptr = reader->read_buf + reader->read_head;
for (int i = 0; i < n; ++i) {
- if (reader->from_file && (reader->read_head + i >= READ_BUF_LEN)) {
+ if (reader->from_file && (reader->read_head + i >= SERD_PAGE_SIZE)) {
if (!page(reader)) {
return false;
}
@@ -168,9 +165,9 @@ eat_byte(SerdReader* reader, const uint8_t byte)
if (c != byte) {
return error(reader, "expected `%c', not `%c'\n", byte, c);
}
- if (reader->from_file && (reader->read_head == READ_BUF_LEN)) {
+ if (reader->from_file && (reader->read_head == SERD_PAGE_SIZE)) {
TRY_RET(page(reader));
- assert(reader->read_head < READ_BUF_LEN);
+ assert(reader->read_head < SERD_PAGE_SIZE);
}
if (reader->read_buf[reader->read_head] == '\0') {
reader->eof = true;
@@ -1439,7 +1436,7 @@ serd_reader_new(SerdSyntax syntax,
me->statement_sink = statement_sink;
me->end_sink = end_sink;
me->fd = 0;
- me->stack = serd_stack_new(STACK_PAGE_SIZE);
+ me->stack = serd_stack_new(SERD_PAGE_SIZE);
me->syntax = syntax;
me->cur = cur;
me->bprefix = NULL;
@@ -1537,21 +1534,21 @@ serd_reader_read_file_handle(SerdReader* me, FILE* file, const uint8_t* name)
me->from_file = true;
me->eof = false;
#ifdef HAVE_POSIX_MEMALIGN
- posix_memalign((void**)&me->read_buf, 4096, READ_BUF_LEN * 2);
+ posix_memalign((void**)&me->read_buf, 4096, SERD_PAGE_SIZE * 2);
#else
- me->read_buf = (uint8_t*)malloc(READ_BUF_LEN * 2);
+ me->read_buf = (uint8_t*)malloc(SERD_PAGE_SIZE * 2);
#endif
/* Read into the second page of the buffer. Occasionally peek_string
will move the read_head to before this point when readahead causes
a page fault.
*/
- memset(me->read_buf, '\0', READ_BUF_LEN * 2);
- me->read_buf += READ_BUF_LEN;
+ memset(me->read_buf, '\0', SERD_PAGE_SIZE * 2);
+ me->read_buf += SERD_PAGE_SIZE;
const bool ret = !page(me) || read_turtleDoc(me);
- free(me->read_buf - READ_BUF_LEN);
+ free(me->read_buf - SERD_PAGE_SIZE);
me->fd = 0;
me->read_buf = NULL;
return ret ? SERD_SUCCESS : SERD_ERR_UNKNOWN;
diff --git a/src/serd_internal.h b/src/serd_internal.h
index 6c00fea1..f5f0f3b4 100644
--- a/src/serd_internal.h
+++ b/src/serd_internal.h
@@ -22,6 +22,8 @@
#include "serd/serd.h"
+#define SERD_PAGE_SIZE 4096
+
/** A dynamic stack in memory. */
typedef struct {
uint8_t* buf; ///< Stack memory
diff --git a/src/serdi.c b/src/serdi.c
index 939b63d0..c8bb9242 100644
--- a/src/serdi.c
+++ b/src/serdi.c
@@ -20,7 +20,9 @@
#include <string.h>
#include "serd/serd.h"
+
#include "serd-config.h"
+#include "serd_internal.h"
typedef struct {
SerdEnv* env;
@@ -45,6 +47,7 @@ print_usage(const char* name, bool error)
fprintf(os, "Usage: %s [OPTION]... INPUT [BASE_URI]\n", name);
fprintf(os, "Read and write RDF syntax.\n");
fprintf(os, "Use - for INPUT to read from standard input.\n\n");
+ fprintf(os, " -B Fast bulk output for large serialisations.\n");
fprintf(os, " -h Display this help and exit\n");
fprintf(os, " -i SYNTAX Input syntax (`turtle' or `ntriples')\n");
fprintf(os, " -o SYNTAX Output syntax (`turtle' or `ntriples')\n");
@@ -87,6 +90,7 @@ main(int argc, char** argv)
SerdSyntax input_syntax = SERD_TURTLE;
SerdSyntax output_syntax = SERD_NTRIPLES;
bool from_file = true;
+ bool bulk_write = false;
const uint8_t* in_name = NULL;
const uint8_t* add_prefix = NULL;
const uint8_t* chop_prefix = NULL;
@@ -96,6 +100,8 @@ main(int argc, char** argv)
in_name = (const uint8_t*)"(stdin)";
in_fd = stdin;
break;
+ } else if (argv[a][1] == 'B') {
+ bulk_write = true;
} else if (argv[a][1] == 'h') {
return print_usage(argv[0], false);
} else if (argv[a][1] == 'v') {
@@ -201,8 +207,17 @@ main(int argc, char** argv)
output_style |= SERD_STYLE_ABBREVIATED|SERD_STYLE_CURIED;
}
+ SerdSink sink = file_sink;
+ void* stream = out_fd;
+ SerdBulkSink* bulk_sink = NULL;
+ if (bulk_write) {
+ bulk_sink = serd_bulk_sink_new(sink, stream, SERD_PAGE_SIZE);
+ sink = (SerdSink)serd_bulk_sink_write;
+ stream = bulk_sink;
+ }
+
SerdWriter* writer = serd_writer_new(
- output_syntax, output_style, env, &base_uri, file_sink, out_fd);
+ output_syntax, output_style, env, &base_uri, sink, stream);
if (chop_prefix) {
serd_writer_chop_blank_prefix(writer, chop_prefix);
@@ -233,6 +248,7 @@ main(int argc, char** argv)
serd_writer_finish(state.writer);
serd_writer_free(state.writer);
+ serd_bulk_sink_free(bulk_sink);
serd_env_free(state.env);
serd_node_free(&base_uri_node);
diff --git a/src/sink.c b/src/sink.c
new file mode 100644
index 00000000..3eb44b9a
--- /dev/null
+++ b/src/sink.c
@@ -0,0 +1,90 @@
+/*
+ Copyright 2011 David Robillard <http://drobilla.net>
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+
+#define _POSIX_C_SOURCE 201112L /* for posix_memalign */
+
+#include <stdlib.h>
+#include <string.h>
+
+#include "serd_internal.h"
+#include "serd-config.h"
+
+#ifndef MIN
+# define MIN(a, b) (((a) < (b)) ? (a) : (b))
+#endif
+
+struct SerdBulkSinkImpl {
+ SerdSink sink;
+ void* stream;
+ uint8_t* buf;
+ size_t size;
+ size_t block_size;
+};
+
+SERD_API
+SerdBulkSink*
+serd_bulk_sink_new(SerdSink sink, void* stream, size_t block_size)
+{
+ SerdBulkSink* bsink = (SerdBulkSink*)malloc(sizeof(SerdBulkSink));
+ bsink->sink = sink;
+ bsink->stream = stream;
+ bsink->size = 0;
+ bsink->block_size = block_size;
+#ifdef HAVE_POSIX_MEMALIGN
+ posix_memalign((void**)&bsink->buf, block_size, block_size);
+#else
+ bsink->buf = (uint8_t*)malloc(block_size);
+#endif
+ return bsink;
+}
+
+SERD_API
+void
+serd_bulk_sink_free(SerdBulkSink* bsink)
+{
+ if (bsink) {
+ // Flush any remaining output
+ if (bsink->size > 0) {
+ bsink->sink(bsink->buf, bsink->size, bsink->stream);
+ }
+ free(bsink->buf);
+ free(bsink);
+ }
+}
+
+SERD_API
+size_t
+serd_bulk_sink_write(const void* buf, size_t len, SerdBulkSink* bsink)
+{
+ const size_t orig_len = len;
+ while (len > 0) {
+ const size_t space = bsink->block_size - bsink->size;
+ const size_t n = MIN(space, len);
+
+ // Write as much as possible into the remaining buffer space
+ memcpy(bsink->buf + bsink->size, buf, n);
+ bsink->size += n;
+ buf = (uint8_t*)buf + n;
+ len -= n;
+
+ // Flush page if buffer is full
+ if (bsink->size == bsink->block_size) {
+ bsink->sink(bsink->buf, bsink->block_size, bsink->stream);
+ bsink->size = 0;
+ }
+ }
+ return orig_len;
+}
diff --git a/wscript b/wscript
index 225b52b9..9bc371d3 100644
--- a/wscript
+++ b/wscript
@@ -10,7 +10,7 @@ from waflib.extras import autowaf as autowaf
import waflib.Logs as Logs, waflib.Options as Options
# Version of this package (even if built as a child)
-SERD_VERSION = '0.5.0'
+SERD_VERSION = '0.6.0'
SERD_MAJOR_VERSION = '0'
# Library version (UNIX style major, minor, micro)
@@ -91,6 +91,7 @@ def build(bld):
src/env.c
src/node.c
src/reader.c
+ src/sink.c
src/string.c
src/uri.c
src/writer.c