/* This file is part of Ingen. Copyright 2007-2012 David Robillard Ingen is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or any later version. Ingen is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. You should have received a copy of the GNU Affero General Public License along with Ingen. If not, see . */ #include #include #include "ingen/Interface.hpp" #include "ingen/shared/World.hpp" #include "ingen/shared/AtomReader.hpp" #include "sord/sordmm.hpp" #include "sratom/sratom.h" #include "SocketInterface.hpp" #include "../server/Event.hpp" #include "../server/PostProcessor.hpp" #include "../server/ThreadManager.hpp" #define LOG(s) s << "[SocketInterface] " namespace Ingen { namespace Socket { SocketInterface::SocketInterface(Ingen::Shared::World& world, int conn) : _world(world) , _iface(*(Server::Engine*)world.local_engine().get(), *this) , _inserter(NULL) , _msg_node(NULL) , _event(NULL) , _conn(conn) { set_name("SocketInterface"); start(); } SocketInterface::~SocketInterface() { std::cerr << "SOCKET INTERFACE EXITING" << std::endl; stop(); join(); close(_conn); } void SocketInterface::event(Server::Event* ev) { if (_event) { std::cerr << "DUAL EVENTS" << std::endl; return; } assert(!_event); ev->pre_process(); _event = ev; _event->next(NULL); } bool SocketInterface::process(Server::PostProcessor& dest, Server::ProcessContext& context, bool limit) { if (_event) { _event->execute(context); dest.append(_event, _event); _event = NULL; } return (_conn != -1); } SerdStatus SocketInterface::set_base_uri(SocketInterface* iface, const SerdNode* uri_node) { return sord_inserter_set_base_uri(iface->_inserter, uri_node); } SerdStatus SocketInterface::set_prefix(SocketInterface* iface, const SerdNode* name, const SerdNode* uri_node) { return sord_inserter_set_prefix(iface->_inserter, name, uri_node); } SerdStatus SocketInterface::write_statement(SocketInterface* iface, SerdStatementFlags flags, const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate, const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang) { if (!iface->_msg_node) { iface->_msg_node = sord_node_from_serd_node( iface->_world.rdf_world()->c_obj(), iface->_env, subject, 0, 0); } return sord_inserter_write_statement( iface->_inserter, flags, graph, subject, predicate, object, object_datatype, object_lang); } void SocketInterface::_run() { Thread::set_context(Server::THREAD_PRE_PROCESS); Sord::World* world = _world.rdf_world(); LV2_URID_Map* map = &_world.lv2_uri_map()->urid_map_feature()->urid_map; // Use as base URI so e.g. will be a path SordNode* base_uri = sord_new_uri( world->c_obj(), (const uint8_t*)"path:"); // Set up sratom and a forge to build LV2 atoms from model Sratom* sratom = sratom_new(map); SerdChunk chunk = { NULL, 0 }; LV2_Atom_Forge forge; lv2_atom_forge_init(&forge, map); lv2_atom_forge_set_sink( &forge, sratom_forge_sink, sratom_forge_deref, &chunk); // Make a model and reader to parse the next Turtle message _env = world->prefixes().c_obj(); SordModel* model = sord_new(world->c_obj(), SORD_SPO, false); _inserter = sord_inserter_new(model, _env); SerdReader* reader = serd_reader_new( SERD_TURTLE, this, NULL, (SerdBaseSink)set_base_uri, (SerdPrefixSink)set_prefix, (SerdStatementSink)write_statement, NULL); serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri)); // Read directly from the connection with serd FILE* f = fdopen(_conn, "r"); if (!f) { LOG(Raul::error) << "Failed to open connection " << _conn << "(" << strerror(errno) << ")" << std::endl; // Connection gone, exit _conn = -1; return; } serd_reader_start_stream(reader, f, (const uint8_t*)"(socket)", false); // Make an AtomReader to call Ingen Interface methods based on Atom Shared::AtomReader ar(*_world.lv2_uri_map().get(), *_world.uris().get(), _world.forge(), _iface); struct pollfd pfd; pfd.fd = _conn; pfd.events = POLLIN; pfd.revents = 0; while (!_exit_flag) { // Wait for input to arrive at socket int ret = poll(&pfd, 1, -1); if (ret == -1 || (pfd.revents & (POLLHUP | POLLNVAL))) { break; // Hangup } else if (!ret) { continue; // No data, shouldn't happen } // Read until the next '.' SerdStatus st = serd_reader_read_chunk(reader); if (st == SERD_FAILURE) { continue; // Read nothing, e.g. just whitespace } else if (st) { fprintf(stderr, "Read error: %s\n", serd_strerror(st)); } // Build an LV2_Atom at chunk.buf from the message sratom_read(sratom, &forge, world->c_obj(), model, _msg_node); // Call _iface methods based on atom content ar.write((LV2_Atom*)chunk.buf); // Respond and close connection write(_conn, "OK", 2); // Reset everything for the next iteration chunk.len = 0; sord_node_free(world->c_obj(), _msg_node); _msg_node = NULL; } fclose(f); sord_inserter_free(_inserter); serd_reader_end_stream(reader); sratom_free(sratom); serd_reader_free(reader); sord_free(model); _conn = -1; } } // namespace Ingen } // namespace Socket