/*
This file is part of Ingen.
Copyright 2007-2017 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see .
*/
#include "ingen/SocketReader.hpp"
#include "ingen/AtomForge.hpp"
#include "ingen/AtomReader.hpp"
#include "ingen/Log.hpp"
#include "ingen/URIMap.hpp"
#include "ingen/World.hpp"
#include "ingen/types.hpp"
#include "lv2/atom/forge.h"
#include "lv2/urid/urid.h"
#include "raul/Socket.hpp"
#include "serd/serd.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
namespace ingen {
SocketReader::SocketReader(ingen::World& world,
Interface& iface,
SPtr sock)
: _world(world)
, _iface(iface)
, _socket(std::move(sock))
, _exit_flag(false)
, _thread(&SocketReader::run, this)
{}
SocketReader::~SocketReader()
{
_exit_flag = true;
_socket->shutdown();
_thread.join();
}
void
SocketReader::run()
{
serd::World& world = _world.rdf_world();
LV2_URID_Map& map = _world.uri_map().urid_map_feature()->urid_map;
// Open socket as a FILE for reading directly with serd
std::unique_ptr f{fdopen(_socket->fd(), "r"),
&fclose};
if (!f) {
_world.log().error("Failed to open connection (%1%)\n",
strerror(errno));
// Connection gone, exit
_socket.reset();
return;
}
// Set up a forge to build LV2 atoms from model
AtomForge forge(world, map);
serd::Optional base_uri;
serd::Optional model;
serd::Env env;
UPtr inserter;
serd::Optional msg_node;
{
// Lock RDF world
std::lock_guard lock(_world.rdf_mutex());
// Use as base URI, so relative URIs are like bundle paths
base_uri = serd::make_uri("ingen:/");
// Make a model and reader to parse the next Turtle message
env = _world.env();
model = serd::Model(world, serd::ModelFlag::index_SPO);
// Create an inserter for writing incoming triples to model
inserter = UPtr{new serd::Inserter(*model, env)};
}
serd::Sink sink;
sink.set_base_func([&](const serd::Node& uri) {
return inserter->sink().base(uri);
});
sink.set_prefix_func([&](const serd::Node& name, const serd::Node& uri) {
return inserter->sink().prefix(name, uri);
});
sink.set_statement_func([&](const serd::StatementFlags flags,
const serd::Statement& statement) {
if (!msg_node) {
msg_node = statement.subject();
}
return inserter->sink().statement(flags, statement);
});
serd::Reader reader(world, serd::Syntax::Turtle, {}, sink, 4096);
serd::Node name = serd::make_string("(socket)");
env.set_base_uri(*base_uri);
reader.start_stream(f.get(), name, 1);
// Make an AtomReader to call Ingen Interface methods based on Atom
AtomReader ar(_world.uri_map(), _world.uris(), _world.log(), _iface);
struct pollfd pfd{};
pfd.fd = _socket->fd();
pfd.events = POLLIN;
pfd.revents = 0;
while (!_exit_flag) {
if (feof(f.get())) {
break; // Lost connection
}
// Wait for input to arrive at socket
int ret = poll(&pfd, 1, -1);
if (ret == -1 || (pfd.revents & (POLLERR|POLLHUP|POLLNVAL))) {
on_hangup();
break; // Hangup
} else if (!ret) {
continue; // No data, shouldn't happen
}
// Lock RDF world
std::lock_guard lock(_world.rdf_mutex());
// Read until the next '.'
auto st = reader.read_chunk();
if (st == serd::Status::failure || !msg_node) {
continue; // Read no node (e.g. a directive)
} else if (st != serd::Status::success) {
_world.log().error("Read error: %1%\n", serd::strerror(st));
continue;
}
// Build an LV2_Atom from the message
auto atom = forge.read(*model, *msg_node);
// Call _iface methods with forged atom
ar.write(atom);
// Reset everything for the next iteration
msg_node.reset();
}
// Lock RDF world
std::lock_guard lock(_world.rdf_mutex());
// Destroy everything
f.reset();
reader.finish();
_socket.reset();
}
} // namespace ingen