summaryrefslogtreecommitdiffstats
path: root/src/SocketReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/SocketReader.cpp')
-rw-r--r--src/SocketReader.cpp119
1 files changed, 47 insertions, 72 deletions
diff --git a/src/SocketReader.cpp b/src/SocketReader.cpp
index 443c418f..273c9e49 100644
--- a/src/SocketReader.cpp
+++ b/src/SocketReader.cpp
@@ -21,10 +21,11 @@
#include "ingen/Log.hpp"
#include "ingen/URIMap.hpp"
#include "ingen/World.hpp"
+#include "ingen/types.hpp"
#include "lv2/atom/forge.h"
#include "lv2/urid/urid.h"
#include "raul/Socket.hpp"
-#include "sord/sordmm.hpp"
+#include "serd/serd.hpp"
#include <cerrno>
#include <cstdint>
@@ -42,9 +43,6 @@ SocketReader::SocketReader(ingen::World& world,
SPtr<Raul::Socket> sock)
: _world(world)
, _iface(iface)
- , _env()
- , _inserter(nullptr)
- , _msg_node(nullptr)
, _socket(std::move(sock))
, _exit_flag(false)
, _thread(&SocketReader::run, this)
@@ -57,46 +55,10 @@ SocketReader::~SocketReader()
_thread.join();
}
-SerdStatus
-SocketReader::set_base_uri(SocketReader* iface,
- const SerdNode* uri_node)
-{
- return sord_inserter_set_base_uri(iface->_inserter, uri_node);
-}
-
-SerdStatus
-SocketReader::set_prefix(SocketReader* iface,
- const SerdNode* name,
- const SerdNode* uri_node)
-{
- return sord_inserter_set_prefix(iface->_inserter, name, uri_node);
-}
-
-SerdStatus
-SocketReader::write_statement(SocketReader* iface,
- SerdStatementFlags flags,
- const SerdNode* graph,
- const SerdNode* subject,
- const SerdNode* predicate,
- const SerdNode* object,
- const SerdNode* object_datatype,
- const SerdNode* object_lang)
-{
- if (!iface->_msg_node) {
- iface->_msg_node = sord_node_from_serd_node(
- iface->_world.rdf_world()->c_obj(), iface->_env, subject, nullptr, nullptr);
- }
-
- return sord_inserter_write_statement(
- iface->_inserter, flags, graph,
- subject, predicate, object,
- object_datatype, object_lang);
-}
-
void
SocketReader::run()
{
- Sord::World* world = _world.rdf_world();
+ serd::World& world = _world.rdf_world();
LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map;
// Open socket as a FILE for reading directly with serd
@@ -111,33 +73,51 @@ SocketReader::run()
}
// Set up a forge to build LV2 atoms from model
- SordNode* base_uri = nullptr;
- SordModel* model = nullptr;
- AtomForge forge(map);
+ AtomForge forge(world, map);
+ serd::Optional<serd::Node> base_uri;
+ serd::Optional<serd::Model> model;
+ serd::Env env;
+ UPtr<serd::Inserter> inserter;
+ serd::Optional<serd::Node> msg_node;
{
// Lock RDF world
std::lock_guard<std::mutex> lock(_world.rdf_mutex());
// Use <ingen:/> as base URI, so relative URIs are like bundle paths
- base_uri = sord_new_uri(world->c_obj(), (const uint8_t*)"ingen:/");
+ base_uri = serd::make_uri("ingen:/");
// Make a model and reader to parse the next Turtle message
- _env = world->prefixes().c_obj();
- model = sord_new(world->c_obj(), SORD_SPO, false);
+ env = _world.env();
+ model = serd::Model(world, serd::ModelFlag::index_SPO);
// Create an inserter for writing incoming triples to model
- _inserter = sord_inserter_new(model, _env);
+ inserter = UPtr<serd::Inserter>{new serd::Inserter(*model, env)};
}
- SerdReader* reader = serd_reader_new(
- SERD_TURTLE, this, nullptr,
- (SerdBaseSink)set_base_uri,
- (SerdPrefixSink)set_prefix,
- (SerdStatementSink)write_statement,
- nullptr);
+ serd::Sink sink;
+
+ sink.set_base_func([&](const serd::Node& uri) {
+ return inserter->sink().base(uri);
+ });
+
+ sink.set_prefix_func([&](const serd::Node& name, const serd::Node& uri) {
+ return inserter->sink().prefix(name, uri);
+ });
+
+ sink.set_statement_func([&](const serd::StatementFlags flags,
+ const serd::Statement& statement) {
+ if (!msg_node) {
+ msg_node = statement.subject();
+ }
+
+ return inserter->sink().statement(flags, statement);
+ });
+
+ serd::Reader reader(world, serd::Syntax::Turtle, {}, sink, 4096);
- serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri));
- serd_reader_start_stream(reader, f.get(), (const uint8_t*)"(socket)", false);
+ serd::Node name = serd::make_string("(socket)");
+ env.set_base_uri(*base_uri);
+ reader.start_stream(f.get(), name, 1);
// Make an AtomReader to call Ingen Interface methods based on Atom
AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface);
@@ -165,24 +145,22 @@ SocketReader::run()
std::lock_guard<std::mutex> lock(_world.rdf_mutex());
// Read until the next '.'
- SerdStatus st = serd_reader_read_chunk(reader);
- if (st == SERD_FAILURE || !_msg_node) {
- continue; // Read nothing, e.g. just whitespace
- } else if (st) {
- _world.log().error("Read error: %1%\n", serd_strerror(st));
+ auto st = reader.read_chunk();
+ if (st == serd::Status::failure || !msg_node) {
+ continue; // Read no node (e.g. a directive)
+ } else if (st != serd::Status::success) {
+ _world.log().error("Read error: %1%\n", serd::strerror(st));
continue;
}
- // Build an LV2_Atom at chunk.buf from the message
- forge.read(*world, model, _msg_node);
+ // Build an LV2_Atom from the message
+ auto atom = forge.read(*model, *msg_node);
- // Call _iface methods based on atom content
- ar.write(forge.atom());
+ // Call _iface methods with forged atom
+ ar.write(atom);
// Reset everything for the next iteration
- forge.clear();
- sord_node_free(world->c_obj(), _msg_node);
- _msg_node = nullptr;
+ msg_node.reset();
}
// Lock RDF world
@@ -190,10 +168,7 @@ SocketReader::run()
// Destroy everything
f.reset();
- sord_inserter_free(_inserter);
- serd_reader_end_stream(reader);
- serd_reader_free(reader);
- sord_free(model);
+ reader.finish();
_socket.reset();
}