/*
This file is part of Ingen.
Copyright 2007-2012 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see .
*/
#include
#include
#include "ingen/Interface.hpp"
#include "ingen/AtomReader.hpp"
#include "ingen/URIMap.hpp"
#include "ingen/World.hpp"
#include "sord/sordmm.hpp"
#include "sratom/sratom.h"
#include "SocketReader.hpp"
#define LOG(s) s << "[SocketReader] "
namespace Ingen {
namespace Socket {
SocketReader::SocketReader(Ingen::World& world,
Interface& iface,
SharedPtr sock)
: Raul::Thread("SocketReader")
, _world(world)
, _iface(iface)
, _inserter(NULL)
, _msg_node(NULL)
, _socket(sock)
{
start();
}
SocketReader::~SocketReader()
{
stop();
join();
}
SerdStatus
SocketReader::set_base_uri(SocketReader* iface,
const SerdNode* uri_node)
{
return sord_inserter_set_base_uri(iface->_inserter, uri_node);
}
SerdStatus
SocketReader::set_prefix(SocketReader* iface,
const SerdNode* name,
const SerdNode* uri_node)
{
return sord_inserter_set_prefix(iface->_inserter, name, uri_node);
}
SerdStatus
SocketReader::write_statement(SocketReader* iface,
SerdStatementFlags flags,
const SerdNode* graph,
const SerdNode* subject,
const SerdNode* predicate,
const SerdNode* object,
const SerdNode* object_datatype,
const SerdNode* object_lang)
{
if (!iface->_msg_node) {
iface->_msg_node = sord_node_from_serd_node(
iface->_world.rdf_world()->c_obj(), iface->_env, subject, 0, 0);
}
return sord_inserter_write_statement(
iface->_inserter, flags, graph,
subject, predicate, object,
object_datatype, object_lang);
}
void
SocketReader::_run()
{
Sord::World* world = _world.rdf_world();
LV2_URID_Map* map = &_world.uri_map().urid_map_feature()->urid_map;
// Use as base URI so e.g. will be a path
SordNode* base_uri = sord_new_uri(
world->c_obj(), (const uint8_t*)"ingen:root/");
// Set up sratom and a forge to build LV2 atoms from model
Sratom* sratom = sratom_new(map);
SerdChunk chunk = { NULL, 0 };
LV2_Atom_Forge forge;
lv2_atom_forge_init(&forge, map);
lv2_atom_forge_set_sink(
&forge, sratom_forge_sink, sratom_forge_deref, &chunk);
// Make a model and reader to parse the next Turtle message
_env = world->prefixes().c_obj();
SordModel* model = sord_new(world->c_obj(), SORD_SPO, false);
_inserter = sord_inserter_new(model, _env);
SerdReader* reader = serd_reader_new(
SERD_TURTLE, this, NULL,
(SerdBaseSink)set_base_uri,
(SerdPrefixSink)set_prefix,
(SerdStatementSink)write_statement,
NULL);
serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri));
// Read directly from the connection with serd
FILE* f = fdopen(_socket->fd(), "r");
if (!f) {
LOG(Raul::error) << "Failed to open connection "
<< "(" << strerror(errno) << ")" << std::endl;
// Connection gone, exit
_socket.reset();
return;
}
serd_reader_start_stream(reader, f, (const uint8_t*)"(socket)", false);
// Make an AtomReader to call Ingen Interface methods based on Atom
AtomReader ar(_world.uri_map(), _world.uris(), _world.forge(), _iface);
struct pollfd pfd;
pfd.fd = _socket->fd();
pfd.events = POLLIN;
pfd.revents = 0;
while (!_exit_flag) {
if (feof(f)) {
break; // Lost connection
}
// Wait for input to arrive at socket
int ret = poll(&pfd, 1, -1);
if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) {
break; // Hangup
} else if (!ret) {
continue; // No data, shouldn't happen
}
// Read until the next '.'
SerdStatus st = serd_reader_read_chunk(reader);
if (st == SERD_FAILURE) {
continue; // Read nothing, e.g. just whitespace
} else if (st) {
LOG(Raul::error) << "Read error: " << serd_strerror(st) << std::endl;
continue;
} else if (!_msg_node) {
LOG(Raul::error) << "Received empty message" << std::endl;
continue;
}
// Build an LV2_Atom at chunk.buf from the message
sratom_read(sratom, &forge, world->c_obj(), model, _msg_node);
// Call _iface methods based on atom content
ar.write((const LV2_Atom*)chunk.buf);
// Reset everything for the next iteration
chunk.len = 0;
sord_node_free(world->c_obj(), _msg_node);
_msg_node = NULL;
}
fclose(f);
sord_inserter_free(_inserter);
serd_reader_end_stream(reader);
sratom_free(sratom);
serd_reader_free(reader);
sord_free(model);
delete this;
}
} // namespace Ingen
} // namespace Socket