From 6e4a3a9ea93155e74da4b00fd7cf1b88b2d08f5f Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sun, 11 Dec 2011 20:58:38 +0000 Subject: Add SerdBulkSink for writing bulk output and corresponding serdi -B option. git-svn-id: http://svn.drobilla.net/serd/trunk@242 490d8e77-9747-427b-9fa3-0b8f29cee8a0 --- ChangeLog | 1 + serd/serd.h | 52 +++++++++++++++++++++++++++---- src/reader.c | 23 ++++++-------- src/serd_internal.h | 2 ++ src/serdi.c | 18 ++++++++++- src/sink.c | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++++ wscript | 3 +- 7 files changed, 168 insertions(+), 21 deletions(-) create mode 100644 src/sink.c diff --git a/ChangeLog b/ChangeLog index e18d37f0..86395ad7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -6,6 +6,7 @@ serd (UNRELEASED) unstable; urgency=low Unicode replacement character * Report reason for failure to open file in serdi * Improve write performance by doing bulk writes for unescaped substrings + * Add SerdBulkSink for writing bulk output and corresponding serdi -B option -- David Robillard (UNRELEASED) diff --git a/serd/serd.h b/serd/serd.h index 79f164ea..94499a4d 100644 --- a/serd/serd.h +++ b/serd/serd.h @@ -267,10 +267,55 @@ serd_strlen(const uint8_t* str, size_t* n_bytes, SerdNodeFlags* flags); /** @} - @name URI + @name Sink @{ */ +/** + Sink function for raw string output. +*/ +typedef size_t (*SerdSink)(const void* buf, size_t len, void* stream); + +/** + Sink adapter that writes blocks to the target sink. + + This is itself a SerdSink which can be used with any SerdSink as a target to + transparently write chunked I/O to the output sink. This can significantly + improve write performance when the target is a file or similar resource. +*/ +typedef struct SerdBulkSinkImpl SerdBulkSink; + +/** + Create a new bulk sink adapter. + @param sink Target sink where completed blocks will be written. + @param stream Stream parameter for target sink. + @param block_size Size of blocks to write, and internal buffer size. +*/ +SERD_API +SerdBulkSink* +serd_bulk_sink_new(SerdSink sink, void* stream, size_t block_size); + +/** + Free a bulk sink adapter. +*/ +SERD_API +void +serd_bulk_sink_free(SerdBulkSink* bsink); + +/** + Write data to a bulk sink adapter. + + This function may safely be cast to SerdSink. +*/ +SERD_API +size_t +serd_bulk_sink_write(const void* buf, size_t len, SerdBulkSink* bsink); + +/** + @} + @name URI + @{ +*/ static const SerdURI SERD_URI_NULL = {{0,0},{0,0},{0,0},{0,0},{0,0},{0,0}}; /** @@ -301,11 +346,6 @@ SERD_API void serd_uri_resolve(const SerdURI* uri, const SerdURI* base, SerdURI* out); -/** - Sink function for raw string output. -*/ -typedef size_t (*SerdSink)(const void* buf, size_t len, void* stream); - /** Serialise @c uri with a series of calls to @c sink. */ diff --git a/src/reader.c b/src/reader.c index d718d255..42c5093f 100644 --- a/src/reader.c +++ b/src/reader.c @@ -34,9 +34,6 @@ #define TRY_THROW(exp) if (!(exp)) goto except; #define TRY_RET(exp) if (!(exp)) return 0; -#define STACK_PAGE_SIZE 4096 -#define READ_BUF_LEN 4096 - typedef struct { const uint8_t* filename; unsigned line; @@ -120,7 +117,7 @@ page(SerdReader* reader) { assert(reader->from_file); reader->read_head = 0; - const size_t n_read = fread(reader->read_buf, 1, READ_BUF_LEN, reader->fd); + const size_t n_read = fread(reader->read_buf, 1, SERD_PAGE_SIZE, reader->fd); reader->read_buf[n_read] = '\0'; if (n_read == 0) { reader->eof = true; @@ -133,7 +130,7 @@ peek_string(SerdReader* reader, uint8_t* pre, int n) { uint8_t* ptr = reader->read_buf + reader->read_head; for (int i = 0; i < n; ++i) { - if (reader->from_file && (reader->read_head + i >= READ_BUF_LEN)) { + if (reader->from_file && (reader->read_head + i >= SERD_PAGE_SIZE)) { if (!page(reader)) { return false; } @@ -168,9 +165,9 @@ eat_byte(SerdReader* reader, const uint8_t byte) if (c != byte) { return error(reader, "expected `%c', not `%c'\n", byte, c); } - if (reader->from_file && (reader->read_head == READ_BUF_LEN)) { + if (reader->from_file && (reader->read_head == SERD_PAGE_SIZE)) { TRY_RET(page(reader)); - assert(reader->read_head < READ_BUF_LEN); + assert(reader->read_head < SERD_PAGE_SIZE); } if (reader->read_buf[reader->read_head] == '\0') { reader->eof = true; @@ -1439,7 +1436,7 @@ serd_reader_new(SerdSyntax syntax, me->statement_sink = statement_sink; me->end_sink = end_sink; me->fd = 0; - me->stack = serd_stack_new(STACK_PAGE_SIZE); + me->stack = serd_stack_new(SERD_PAGE_SIZE); me->syntax = syntax; me->cur = cur; me->bprefix = NULL; @@ -1537,21 +1534,21 @@ serd_reader_read_file_handle(SerdReader* me, FILE* file, const uint8_t* name) me->from_file = true; me->eof = false; #ifdef HAVE_POSIX_MEMALIGN - posix_memalign((void**)&me->read_buf, 4096, READ_BUF_LEN * 2); + posix_memalign((void**)&me->read_buf, 4096, SERD_PAGE_SIZE * 2); #else - me->read_buf = (uint8_t*)malloc(READ_BUF_LEN * 2); + me->read_buf = (uint8_t*)malloc(SERD_PAGE_SIZE * 2); #endif /* Read into the second page of the buffer. Occasionally peek_string will move the read_head to before this point when readahead causes a page fault. */ - memset(me->read_buf, '\0', READ_BUF_LEN * 2); - me->read_buf += READ_BUF_LEN; + memset(me->read_buf, '\0', SERD_PAGE_SIZE * 2); + me->read_buf += SERD_PAGE_SIZE; const bool ret = !page(me) || read_turtleDoc(me); - free(me->read_buf - READ_BUF_LEN); + free(me->read_buf - SERD_PAGE_SIZE); me->fd = 0; me->read_buf = NULL; return ret ? SERD_SUCCESS : SERD_ERR_UNKNOWN; diff --git a/src/serd_internal.h b/src/serd_internal.h index 6c00fea1..f5f0f3b4 100644 --- a/src/serd_internal.h +++ b/src/serd_internal.h @@ -22,6 +22,8 @@ #include "serd/serd.h" +#define SERD_PAGE_SIZE 4096 + /** A dynamic stack in memory. */ typedef struct { uint8_t* buf; ///< Stack memory diff --git a/src/serdi.c b/src/serdi.c index 939b63d0..c8bb9242 100644 --- a/src/serdi.c +++ b/src/serdi.c @@ -20,7 +20,9 @@ #include #include "serd/serd.h" + #include "serd-config.h" +#include "serd_internal.h" typedef struct { SerdEnv* env; @@ -45,6 +47,7 @@ print_usage(const char* name, bool error) fprintf(os, "Usage: %s [OPTION]... INPUT [BASE_URI]\n", name); fprintf(os, "Read and write RDF syntax.\n"); fprintf(os, "Use - for INPUT to read from standard input.\n\n"); + fprintf(os, " -B Fast bulk output for large serialisations.\n"); fprintf(os, " -h Display this help and exit\n"); fprintf(os, " -i SYNTAX Input syntax (`turtle' or `ntriples')\n"); fprintf(os, " -o SYNTAX Output syntax (`turtle' or `ntriples')\n"); @@ -87,6 +90,7 @@ main(int argc, char** argv) SerdSyntax input_syntax = SERD_TURTLE; SerdSyntax output_syntax = SERD_NTRIPLES; bool from_file = true; + bool bulk_write = false; const uint8_t* in_name = NULL; const uint8_t* add_prefix = NULL; const uint8_t* chop_prefix = NULL; @@ -96,6 +100,8 @@ main(int argc, char** argv) in_name = (const uint8_t*)"(stdin)"; in_fd = stdin; break; + } else if (argv[a][1] == 'B') { + bulk_write = true; } else if (argv[a][1] == 'h') { return print_usage(argv[0], false); } else if (argv[a][1] == 'v') { @@ -201,8 +207,17 @@ main(int argc, char** argv) output_style |= SERD_STYLE_ABBREVIATED|SERD_STYLE_CURIED; } + SerdSink sink = file_sink; + void* stream = out_fd; + SerdBulkSink* bulk_sink = NULL; + if (bulk_write) { + bulk_sink = serd_bulk_sink_new(sink, stream, SERD_PAGE_SIZE); + sink = (SerdSink)serd_bulk_sink_write; + stream = bulk_sink; + } + SerdWriter* writer = serd_writer_new( - output_syntax, output_style, env, &base_uri, file_sink, out_fd); + output_syntax, output_style, env, &base_uri, sink, stream); if (chop_prefix) { serd_writer_chop_blank_prefix(writer, chop_prefix); @@ -233,6 +248,7 @@ main(int argc, char** argv) serd_writer_finish(state.writer); serd_writer_free(state.writer); + serd_bulk_sink_free(bulk_sink); serd_env_free(state.env); serd_node_free(&base_uri_node); diff --git a/src/sink.c b/src/sink.c new file mode 100644 index 00000000..3eb44b9a --- /dev/null +++ b/src/sink.c @@ -0,0 +1,90 @@ +/* + Copyright 2011 David Robillard + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#define _POSIX_C_SOURCE 201112L /* for posix_memalign */ + +#include +#include + +#include "serd_internal.h" +#include "serd-config.h" + +#ifndef MIN +# define MIN(a, b) (((a) < (b)) ? (a) : (b)) +#endif + +struct SerdBulkSinkImpl { + SerdSink sink; + void* stream; + uint8_t* buf; + size_t size; + size_t block_size; +}; + +SERD_API +SerdBulkSink* +serd_bulk_sink_new(SerdSink sink, void* stream, size_t block_size) +{ + SerdBulkSink* bsink = (SerdBulkSink*)malloc(sizeof(SerdBulkSink)); + bsink->sink = sink; + bsink->stream = stream; + bsink->size = 0; + bsink->block_size = block_size; +#ifdef HAVE_POSIX_MEMALIGN + posix_memalign((void**)&bsink->buf, block_size, block_size); +#else + bsink->buf = (uint8_t*)malloc(block_size); +#endif + return bsink; +} + +SERD_API +void +serd_bulk_sink_free(SerdBulkSink* bsink) +{ + if (bsink) { + // Flush any remaining output + if (bsink->size > 0) { + bsink->sink(bsink->buf, bsink->size, bsink->stream); + } + free(bsink->buf); + free(bsink); + } +} + +SERD_API +size_t +serd_bulk_sink_write(const void* buf, size_t len, SerdBulkSink* bsink) +{ + const size_t orig_len = len; + while (len > 0) { + const size_t space = bsink->block_size - bsink->size; + const size_t n = MIN(space, len); + + // Write as much as possible into the remaining buffer space + memcpy(bsink->buf + bsink->size, buf, n); + bsink->size += n; + buf = (uint8_t*)buf + n; + len -= n; + + // Flush page if buffer is full + if (bsink->size == bsink->block_size) { + bsink->sink(bsink->buf, bsink->block_size, bsink->stream); + bsink->size = 0; + } + } + return orig_len; +} diff --git a/wscript b/wscript index 225b52b9..9bc371d3 100644 --- a/wscript +++ b/wscript @@ -10,7 +10,7 @@ from waflib.extras import autowaf as autowaf import waflib.Logs as Logs, waflib.Options as Options # Version of this package (even if built as a child) -SERD_VERSION = '0.5.0' +SERD_VERSION = '0.6.0' SERD_MAJOR_VERSION = '0' # Library version (UNIX style major, minor, micro) @@ -91,6 +91,7 @@ def build(bld): src/env.c src/node.c src/reader.c + src/sink.c src/string.c src/uri.c src/writer.c -- cgit v1.2.1