/*
This file is part of Ingen.
Copyright 2007-2012 David Robillard
Ingen is free software: you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or any later version.
Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
You should have received a copy of the GNU Affero General Public License
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
#include <errno.h>
#include <poll.h>
#include <stdlib.h>

#include "ingen/Interface.hpp"
#include "ingen/shared/World.hpp"
#include "ingen/shared/AtomReader.hpp"
#include "sord/sordmm.hpp"
#include "sratom/sratom.h"

#include "SocketInterface.hpp"
#include "../server/Event.hpp"
#include "../server/PostProcessor.hpp"
#include "../server/ThreadManager.hpp"
// Write this file's "[SocketInterface] " tag to the stream `s` and yield the
// stream, so a message can be appended: LOG(stream) << "message" << std::endl;
#define LOG(s) s << "[SocketInterface] "
namespace Ingen {
namespace Socket {
/**
 * Construct an interface for the connection `conn`, then name and start it.
 *
 * @param world World stored in _world; its local engine is handed to _iface.
 * @param conn  Descriptor of the connection; read from in _run() and passed to
 *              close() in the destructor.
 *
 * Note that _env is not initialised here; it is assigned at the top of _run().
 */
SocketInterface::SocketInterface(Ingen::Shared::World& world, int conn)
: _world(world)
// The world's local engine is cast to Server::Engine; this object is passed
// as the second argument
, _iface(*(Server::Engine*)world.local_engine().get(), *this)
, _inserter(NULL)
, _msg_node(NULL)
, _event(NULL)
, _conn(conn)
{
set_name("SocketInterface");
// Called last, once every member above has been initialised (presumably this
// launches the thread that executes _run())
start();
}
/**
 * Announce the exit on stderr, stop and join this object, then close the
 * connection descriptor.
 */
SocketInterface::~SocketInterface()
{
std::cerr << "SOCKET INTERFACE EXITING" << std::endl;
// stop()/join() come before close() so _run() is presumably finished before
// the descriptor it reads from is released
stop();
join();
// NOTE(review): _run() sets _conn to -1 before returning, so by this point
// this may be close(-1) — confirm that is intended
close(_conn);
}
/**
 * Pre-process an event and stage it for execution by process().
 *
 * Only one event can be pending at a time.  If a previously staged event has
 * not yet been consumed by process(), "DUAL EVENTS" is printed to stderr and
 * the new event is ignored.
 *
 * @param ev Event to pre-process and stage.
 */
void
SocketInterface::event(Server::Event* ev)
{
	if (_event) {
		// NOTE(review): the rejected event is neither executed nor released
		// here — confirm who owns it in this case.
		std::cerr << "DUAL EVENTS" << std::endl;
		return;
	}

	ev->pre_process();

	// Finish setting the event up through `ev` BEFORE publishing it in
	// _event.  process() consumes the event and resets _event to NULL, so
	// touching it through _event after publication is unsafe if process()
	// runs concurrently (presumably it is called from the audio thread).
	ev->next(NULL);
	_event = ev;
}
/**
 * Execute the staged event, if there is one, and pass it to `dest`.
 *
 * @param dest    Post processor the executed event is appended to.
 * @param context Context the event is executed in.
 * @param limit   Not used by this implementation.
 * @return true as long as the connection descriptor is not -1.
 */
bool
SocketInterface::process(Server::PostProcessor&  dest,
                         Server::ProcessContext& context,
                         bool                    limit)
{
	Server::Event* const pending = _event;
	if (pending != NULL) {
		pending->execute(context);

		// A single event is both the head and the tail of the appended list
		dest.append(pending, pending);

		// Free the slot so event() can stage the next one
		_event = NULL;
	}

	const bool connected = (_conn != -1);
	return connected;
}
/**
 * Base URI sink for the Serd reader: pass the new base URI to the inserter.
 *
 * @param iface    Interface whose inserter receives the base URI.
 * @param uri_node New base URI.
 * @return Status reported by sord_inserter_set_base_uri().
 */
SerdStatus
SocketInterface::set_base_uri(SocketInterface* iface,
                              const SerdNode*  uri_node)
{
	SordInserter* const inserter = iface->_inserter;

	return sord_inserter_set_base_uri(inserter, uri_node);
}
/**
 * Prefix sink for the Serd reader: pass the namespace prefix to the inserter.
 *
 * @param iface    Interface whose inserter receives the prefix.
 * @param name     Prefix name.
 * @param uri_node URI the prefix expands to.
 * @return Status reported by sord_inserter_set_prefix().
 */
SerdStatus
SocketInterface::set_prefix(SocketInterface* iface,
                            const SerdNode*  name,
                            const SerdNode*  uri_node)
{
	SordInserter* const inserter = iface->_inserter;

	return sord_inserter_set_prefix(inserter, name, uri_node);
}
/**
 * Statement sink for the Serd reader: record the message subject and insert
 * the statement via the interface's inserter.
 *
 * If no message node is set yet, it is created from the subject of this
 * statement; _run() later reads the message starting at that node and resets
 * it before the next message.
 *
 * @param iface           Interface the statement is read for.
 * @param flags           Statement flags, forwarded unchanged.
 * @param graph           Graph node, forwarded unchanged.
 * @param subject         Subject node; also used for the message node.
 * @param predicate       Predicate node, forwarded unchanged.
 * @param object          Object node, forwarded unchanged.
 * @param object_datatype Datatype of the object, forwarded unchanged.
 * @param object_lang     Language of the object, forwarded unchanged.
 * @return Status reported by sord_inserter_write_statement().
 */
SerdStatus
SocketInterface::write_statement(SocketInterface*   iface,
                                 SerdStatementFlags flags,
                                 const SerdNode*    graph,
                                 const SerdNode*    subject,
                                 const SerdNode*    predicate,
                                 const SerdNode*    object,
                                 const SerdNode*    object_datatype,
                                 const SerdNode*    object_lang)
{
	SocketInterface& self = *iface;

	if (self._msg_node == NULL) {
		// First statement since the last reset: remember its subject
		SordWorld* const sord_world = self._world.rdf_world()->c_obj();
		self._msg_node = sord_node_from_serd_node(
			sord_world, self._env, subject, 0, 0);
	}

	const SerdStatus status = sord_inserter_write_statement(
		self._inserter, flags, graph,
		subject, predicate, object,
		object_datatype, object_lang);

	return status;
}
void
SocketInterface::_run()
{
Thread::set_context(Server::THREAD_PRE_PROCESS);
Sord::World* world = _world.rdf_world();
LV2_URID_Map* map = &_world.lv2_uri_map()->urid_map_feature()->urid_map;
// Use as base URI so e.g. will be a path
SordNode* base_uri = sord_new_uri(
world->c_obj(), (const uint8_t*)"path:");
// Set up sratom and a forge to build LV2 atoms from model
Sratom* sratom = sratom_new(map);
SerdChunk chunk = { NULL, 0 };
LV2_Atom_Forge forge;
lv2_atom_forge_init(&forge, map);
lv2_atom_forge_set_sink(
&forge, sratom_forge_sink, sratom_forge_deref, &chunk);
// Make a model and reader to parse the next Turtle message
_env = world->prefixes().c_obj();
SordModel* model = sord_new(world->c_obj(), SORD_SPO, false);
_inserter = sord_inserter_new(model, _env);
SerdReader* reader = serd_reader_new(
SERD_TURTLE, this, NULL,
(SerdBaseSink)set_base_uri,
(SerdPrefixSink)set_prefix,
(SerdStatementSink)write_statement,
NULL);
serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri));
// Read directly from the connection with serd
FILE* f = fdopen(_conn, "r");
if (!f) {
LOG(Raul::error) << "Failed to open connection " << _conn
<< "(" << strerror(errno) << ")" << std::endl;
// Connection gone, exit
_conn = -1;
return;
}
serd_reader_start_stream(reader, f, (const uint8_t*)"(socket)", false);
// Make an AtomReader to call Ingen Interface methods based on Atom
Shared::AtomReader ar(*_world.lv2_uri_map().get(),
*_world.uris().get(),
_world.forge(),
_iface);
struct pollfd pfd;
pfd.fd = _conn;
pfd.events = POLLIN;
pfd.revents = 0;
while (!_exit_flag) {
// Wait for input to arrive at socket
int ret = poll(&pfd, 1, -1);
if (ret == -1 || (pfd.revents & (POLLHUP | POLLNVAL))) {
break; // Hangup
} else if (!ret) {
continue; // No data, shouldn't happen
}
// Read until the next '.'
SerdStatus st = serd_reader_read_chunk(reader);
if (st == SERD_FAILURE) {
continue; // Read nothing, e.g. just whitespace
} else if (st) {
fprintf(stderr, "Read error: %s\n", serd_strerror(st));
}
// Build an LV2_Atom at chunk.buf from the message
sratom_read(sratom, &forge, world->c_obj(), model, _msg_node);
// Call _iface methods based on atom content
ar.write((LV2_Atom*)chunk.buf);
// Respond and close connection
write(_conn, "OK", 2);
// Reset everything for the next iteration
chunk.len = 0;
sord_node_free(world->c_obj(), _msg_node);
_msg_node = NULL;
}
fclose(f);
sord_inserter_free(_inserter);
serd_reader_end_stream(reader);
sratom_free(sratom);
serd_reader_free(reader);
sord_free(model);
_conn = -1;
}
} // namespace Socket
} // namespace Ingen