summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/socket/SocketInterface.cpp179
-rw-r--r--src/socket/SocketInterface.hpp24
2 files changed, 148 insertions, 55 deletions
diff --git a/src/socket/SocketInterface.cpp b/src/socket/SocketInterface.cpp
index d9994e3e..430f0962 100644
--- a/src/socket/SocketInterface.cpp
+++ b/src/socket/SocketInterface.cpp
@@ -15,9 +15,7 @@
*/
#include <errno.h>
-#include <sys/fcntl.h>
-#include <sys/socket.h>
-#include <sys/un.h>
+#include <poll.h>
#include "ingen/Interface.hpp"
#include "ingen/shared/World.hpp"
@@ -38,19 +36,18 @@ namespace Socket {
SocketInterface::SocketInterface(Ingen::Shared::World& world, int conn)
: _world(world)
, _iface(*(Server::Engine*)world.local_engine().get(), *this)
+ , _inserter(NULL)
+ , _msg_node(NULL)
, _event(NULL)
, _conn(conn)
{
- // Set connection to non-blocking so parser can read until EOF
- // and not block indefinitely waiting for more input
- fcntl(_conn, F_SETFL, fcntl(_conn, F_GETFL, 0) | O_NONBLOCK);
-
set_name("SocketInterface");
start();
}
SocketInterface::~SocketInterface()
{
+ std::cerr << "SOCKET INTERFACE EXITING" << std::endl;
stop();
join();
close(_conn);
@@ -59,9 +56,14 @@ SocketInterface::~SocketInterface()
void
SocketInterface::event(Server::Event* ev)
{
+ if (_event) {
+ std::cerr << "DUAL EVENTS" << std::endl;
+ return;
+ }
assert(!_event);
ev->pre_process();
_event = ev;
+ _event->next(NULL);
}
bool
@@ -74,74 +76,143 @@ SocketInterface::process(Server::PostProcessor& dest,
dest.append(_event, _event);
_event = NULL;
}
- if (_conn == -1) {
- return false;
+ return (_conn != -1);
+}
+
+SerdStatus
+SocketInterface::set_base_uri(SocketInterface* iface,
+ const SerdNode* uri_node)
+{
+ return sord_inserter_set_base_uri(iface->_inserter, uri_node);
+}
+
+SerdStatus
+SocketInterface::set_prefix(SocketInterface* iface,
+ const SerdNode* name,
+ const SerdNode* uri_node)
+{
+ return sord_inserter_set_prefix(iface->_inserter, name, uri_node);
+}
+
+SerdStatus
+SocketInterface::write_statement(SocketInterface* iface,
+ SerdStatementFlags flags,
+ const SerdNode* graph,
+ const SerdNode* subject,
+ const SerdNode* predicate,
+ const SerdNode* object,
+ const SerdNode* object_datatype,
+ const SerdNode* object_lang)
+{
+ if (!iface->_msg_node) {
+ iface->_msg_node = sord_node_from_serd_node(
+ iface->_world.rdf_world()->c_obj(), iface->_env, subject, 0, 0);
}
- return true;
+
+ return sord_inserter_write_statement(
+ iface->_inserter, flags, graph,
+ subject, predicate, object,
+ object_datatype, object_lang);
}
void
SocketInterface::_run()
{
Thread::set_context(Server::THREAD_PRE_PROCESS);
+
+ Sord::World* world = _world.rdf_world();
+ LV2_URID_Map* map = &_world.lv2_uri_map()->urid_map_feature()->urid_map;
+
+ // Use <path:> as base URI so e.g. </foo/bar> will be a path
+ SordNode* base_uri = sord_new_uri(
+ world->c_obj(), (const uint8_t*)"path:");
+
+ // Set up sratom and a forge to build LV2 atoms from model
+ Sratom* sratom = sratom_new(map);
+ SerdChunk chunk = { NULL, 0 };
+ LV2_Atom_Forge forge;
+ lv2_atom_forge_init(&forge, map);
+ lv2_atom_forge_set_sink(
+ &forge, sratom_forge_sink, sratom_forge_deref, &chunk);
+
+ // Make a model and reader to parse the next Turtle message
+ _env = world->prefixes().c_obj();
+ SordModel* model = sord_new(world->c_obj(), SORD_SPO, false);
+
+ _inserter = sord_inserter_new(model, _env);
+
+ SerdReader* reader = serd_reader_new(
+ SERD_TURTLE, this, NULL,
+ (SerdBaseSink)set_base_uri,
+ (SerdPrefixSink)set_prefix,
+ (SerdStatementSink)write_statement,
+ NULL);
+
+ serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri));
+
+ // Read directly from the connection with serd
+ FILE* f = fdopen(_conn, "r");
+ if (!f) {
+ LOG(Raul::error) << "Failed to open connection " << _conn
+ << "(" << strerror(errno) << ")" << std::endl;
+ // Connection gone, exit
+ _conn = -1;
+ return;
+ }
+
+ serd_reader_start_stream(reader, f, (const uint8_t*)"(socket)", false);
+
+ // Make an AtomReader to call Ingen Interface methods based on Atom
+ Shared::AtomReader ar(*_world.lv2_uri_map().get(),
+ *_world.uris().get(),
+ _world.forge(),
+ _iface);
+
+ struct pollfd pfd;
+ pfd.fd = _conn;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
while (!_exit_flag) {
- // Set up a reader to parse the Turtle message into a model
- Sord::World* world = _world.rdf_world();
- SerdEnv* env = world->prefixes().c_obj();
- SordModel* model = sord_new(world->c_obj(), SORD_SPO, false);
- SerdReader* reader = sord_new_reader(model, env, SERD_TURTLE, NULL);
- // Set base URI to path: so e.g. </foo/bar> will be a path
- SordNode* base_uri = sord_new_uri(
- world->c_obj(), (const uint8_t*)"path:");
- serd_env_set_base_uri(env, sord_node_to_serd_node(base_uri));
-
- LV2_URID_Map* map = &_world.lv2_uri_map()->urid_map_feature()->urid_map;
-
- // Set up sratom to build an LV2_Atom from the model
- Sratom* sratom = sratom_new(map);
- SerdChunk chunk = { NULL, 0 };
- LV2_Atom_Forge forge;
- lv2_atom_forge_init(&forge, map);
- lv2_atom_forge_set_sink(
- &forge, sratom_forge_sink, sratom_forge_deref, &chunk);
-
- // Read directly from the connection with serd
- FILE* f = fdopen(_conn, "r");
- if (!f) {
- LOG(Raul::error) << "Failed to open connection " << _conn
- << "(" << strerror(errno) << ")" << std::endl;
- // Connection gone, exit
- break;
+ // Wait for input to arrive at socket
+ int ret = poll(&pfd, 1, -1);
+ if (ret == -1 || (pfd.revents & (POLLHUP | POLLNVAL))) {
+ break; // Hangup
+ } else if (!ret) {
+ continue; // No data, shouldn't happen
}
- serd_reader_read_file_handle(reader, f, (const uint8_t*)"(socket)");
-
- // FIXME: Sratom needs work to be able to read resources
- SordNode* msg_node = sord_new_blank(
- world->c_obj(), (const uint8_t*)"genid1");
+ // Read until the next '.'
+ SerdStatus st = serd_reader_read_chunk(reader);
+ if (st == SERD_FAILURE) {
+ continue; // Read nothing, e.g. just whitespace
+ } else if (st) {
+ fprintf(stderr, "Read error: %s\n", serd_strerror(st));
+ }
// Build an LV2_Atom at chunk.buf from the message
- sratom_read(sratom, &forge, world->c_obj(), model, msg_node);
-
- // Make an AtomReader to read that atom and do Ingen things
- Shared::AtomReader ar(*_world.lv2_uri_map().get(),
- *_world.uris().get(),
- _world.forge(),
- _iface);
+ sratom_read(sratom, &forge, world->c_obj(), model, _msg_node);
// Call _iface methods based on atom content
ar.write((LV2_Atom*)chunk.buf);
// Respond and close connection
write(_conn, "OK", 2);
- fclose(f);
- sratom_free(sratom);
- sord_node_free(world->c_obj(), msg_node);
- serd_reader_free(reader);
- sord_free(model);
+ // Reset everything for the next iteration
+ chunk.len = 0;
+ sord_node_free(world->c_obj(), _msg_node);
+ _msg_node = NULL;
}
+ fclose(f);
+
+ sord_inserter_free(_inserter);
+ serd_reader_end_stream(reader);
+ sratom_free(sratom);
+ serd_reader_free(reader);
+ sord_free(model);
+
_conn = -1;
}
diff --git a/src/socket/SocketInterface.hpp b/src/socket/SocketInterface.hpp
index 2a23cc64..b8d339f8 100644
--- a/src/socket/SocketInterface.hpp
+++ b/src/socket/SocketInterface.hpp
@@ -16,9 +16,10 @@
#include <string>
+#include "ingen/Interface.hpp"
#include "raul/SharedPtr.hpp"
#include "raul/Thread.hpp"
-#include "ingen/Interface.hpp"
+#include "sord/sord.h"
#include "../server/EventSink.hpp"
#include "../server/EventSource.hpp"
@@ -45,11 +46,32 @@ public:
~SocketInterface();
+ SordInserter* inserter() { return _inserter; }
+
private:
virtual void _run();
+ static SerdStatus set_base_uri(SocketInterface* iface,
+ const SerdNode* uri_node);
+
+ static SerdStatus set_prefix(SocketInterface* iface,
+ const SerdNode* name,
+ const SerdNode* uri_node);
+
+ static SerdStatus write_statement(SocketInterface* iface,
+ SerdStatementFlags flags,
+ const SerdNode* graph,
+ const SerdNode* subject,
+ const SerdNode* predicate,
+ const SerdNode* object,
+ const SerdNode* object_datatype,
+ const SerdNode* object_lang);
+
Shared::World& _world;
Server::EventWriter _iface;
+ SerdEnv* _env;
+ SordInserter* _inserter;
+ SordNode* _msg_node;
Server::Event* _event;
int _conn;
};