From 281bbcc6a7208c28283bc9bdd521c5d6cc48a60f Mon Sep 17 00:00:00 2001 From: David Robillard Date: Thu, 10 May 2012 02:14:55 +0000 Subject: Bidirectional socket communication (GUI once again works remotely). git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4335 a436a847-0d15-0410-975c-d299462d15a1 --- ingen/shared/World.hpp | 4 +- scripts/ingenish | 13 ++-- src/gui/ConnectWindow.hpp | 2 +- src/gui/ingen_gui.ui | 2 +- src/ingen/main.cpp | 4 ++ src/server/ClientBroadcaster.cpp | 10 +-- src/server/EventWriter.cpp | 88 ++++++++++++------------- src/server/EventWriter.hpp | 12 ++-- src/server/ingen_lv2.cpp | 2 +- src/shared/AtomReader.cpp | 16 +++++ src/shared/AtomWriter.cpp | 2 + src/shared/Configuration.cpp | 2 +- src/shared/World.cpp | 4 +- src/socket/Socket.cpp | 132 +++++++++++++++++++++++++++---------- src/socket/Socket.hpp | 62 ++++++++++++----- src/socket/SocketClient.hpp | 52 +++++++++++++++ src/socket/SocketListener.cpp | 22 +++---- src/socket/SocketReader.cpp | 18 ++--- src/socket/SocketReader.hpp | 25 ++++--- src/socket/SocketServer.hpp | 53 +++++++++++++++ src/socket/SocketWriter.cpp | 74 +++++++++++++++++++++ src/socket/SocketWriter.hpp | 65 ++++++++++++++++++ src/socket/ingen_socket_client.cpp | 57 ++++++++++++++++ src/socket/wscript | 18 ++++- 24 files changed, 579 insertions(+), 160 deletions(-) create mode 100644 src/socket/SocketClient.hpp create mode 100644 src/socket/SocketServer.hpp create mode 100644 src/socket/SocketWriter.cpp create mode 100644 src/socket/SocketWriter.hpp create mode 100644 src/socket/ingen_socket_client.cpp diff --git a/ingen/shared/World.hpp b/ingen/shared/World.hpp index f43afaa0..6279e31f 100644 --- a/ingen/shared/World.hpp +++ b/ingen/shared/World.hpp @@ -76,14 +76,14 @@ public: typedef SharedPtr (*InterfaceFactory)( World* world, const std::string& engine_url, - SharedPtr respond_to); + SharedPtr respondee); virtual void add_interface_factory(const std::string& scheme, InterfaceFactory factory); virtual SharedPtr interface( const std::string& engine_url, - SharedPtr respond_to); + SharedPtr respondee); virtual bool run(const std::string& mime_type, const std::string& filename); diff --git a/scripts/ingenish b/scripts/ingenish index a142a392..6e73915a 100755 --- a/scripts/ingenish +++ b/scripts/ingenish @@ -35,7 +35,6 @@ class Client: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) parsed = re.split('[:/]', uri[len('tcp://'):]) addr = (parsed[0], int(parsed[1])) - print addr self.sock.connect(addr) def __del__(self): @@ -49,12 +48,12 @@ class Client: def send(self, msg): self.sock.send(self.msgencode(msg)) - response = self.sock.recv(1024) - if response != self.msgencode('OK'): - print('Error: %s' % response) - return False - else: - return True + #response = self.sock.recv(1024) + # if response != self.msgencode('OK'): + # print('Error: %s' % response) + # return False + # else: + # return True def put(self, path, body): return self.send(''' diff --git a/src/gui/ConnectWindow.hpp b/src/gui/ConnectWindow.hpp index 1f7b8abd..e19b91cc 100644 --- a/src/gui/ConnectWindow.hpp +++ b/src/gui/ConnectWindow.hpp @@ -32,7 +32,7 @@ class App; /** The initially visible "Connect to engine" window. * * This handles actually connecting to the engine and making sure everything - * is ready before really launching the app (eg wait for the root patch). + * is ready before really launching the app. * * \ingroup GUI */ diff --git a/src/gui/ingen_gui.ui b/src/gui/ingen_gui.ui index 16549a27..8a043c30 100644 --- a/src/gui/ingen_gui.ui +++ b/src/gui/ingen_gui.ui @@ -523,7 +523,7 @@ Contributors: True True 28 - osc.udp://localhost:16180 + unix:///tmp/ingen.sock True diff --git a/src/ingen/main.cpp b/src/ingen/main.cpp index d78753c8..401986ea 100644 --- a/src/ingen/main.cpp +++ b/src/ingen/main.cpp @@ -135,6 +135,10 @@ main(int argc, char** argv) if (!engine_interface) { ingen_try(world->load_module("client"), "Unable to load client module"); + #ifdef HAVE_SOCKET + ingen_try(world->load_module("socket_client"), + "Unable to load socket client module"); + #endif const char* const uri = conf.option("connect").get_string(); SharedPtr client(new Client::SigClientInterface()); ingen_try((engine_interface = world->interface(uri, client)), diff --git a/src/server/ClientBroadcaster.cpp b/src/server/ClientBroadcaster.cpp index e44475b1..46e6fb2f 100644 --- a/src/server/ClientBroadcaster.cpp +++ b/src/server/ClientBroadcaster.cpp @@ -39,14 +39,8 @@ void ClientBroadcaster::register_client(const URI& uri, Interface* client) { Glib::Mutex::Lock lock(_clients_mutex); - Clients::iterator i = _clients.find(uri); - - if (i == _clients.end()) { - _clients[uri] = client; - LOG(info) << "Registered client: " << uri << endl; - } else { - LOG(warn) << "Client already registered: " << uri << endl; - } + LOG(info) << "Registered client: " << uri << endl; + _clients[uri] = client; } /** Remove a client from the list of registered clients. diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp index ca8ad196..19a738b0 100644 --- a/src/server/EventWriter.cpp +++ b/src/server/EventWriter.cpp @@ -35,10 +35,9 @@ namespace Ingen { namespace Server { EventWriter::EventWriter(Engine& engine) - : _request_client(NULL) + : _engine(engine) + , _respondee(NULL) , _request_id(-1) - , _engine(engine) - , _in_bundle(false) { } @@ -49,48 +48,32 @@ EventWriter::~EventWriter() SampleCount EventWriter::now() const { - // Exactly one cycle latency (some could run ASAP if we get lucky, but not always, and a slight - // constant latency is far better than jittery lower (average) latency - if (_engine.driver()) - return _engine.driver()->frame_time() + _engine.driver()->block_length(); - else - return 0; + /* Exactly one cycle latency (some could run ASAP if we get lucky, but not + always, and a slight constant latency is far better than jittery lower + (average) latency */ + return (_engine.driver()) + ? _engine.driver()->frame_time() + _engine.driver()->block_length() + : 0; } void EventWriter::set_response_id(int32_t id) { - if (!_request_client) { // Kludge - _request_client = _engine.broadcaster()->client( + if (!_respondee) { // Kludge + _respondee = _engine.broadcaster()->client( "http://drobilla.net/ns/ingen#internal"); } _request_id = id; } -/* *** ServerInterface implementation below here *** */ - -// Bundle commands - -void -EventWriter::bundle_begin() -{ - _in_bundle = true; -} - -void -EventWriter::bundle_end() -{ - _in_bundle = false; -} - -// Object commands - void EventWriter::put(const URI& uri, const Resource::Properties& properties, const Resource::Graph ctx) { - _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); + _engine.enqueue_event( + new Events::SetMetadata(_engine, _respondee, _request_id, now(), + true, ctx, uri, properties)); } void @@ -98,26 +81,31 @@ EventWriter::delta(const URI& uri, const Resource::Properties& remove, const Resource::Properties& add) { - _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); + _engine.enqueue_event( + new Events::SetMetadata(_engine, _respondee, _request_id, now(), + false, Resource::DEFAULT, uri, add, remove)); } void EventWriter::move(const Path& old_path, const Path& new_path) { - _engine.enqueue_event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); + _engine.enqueue_event( + new Events::Move(_engine, _respondee, _request_id, now(), + old_path, new_path)); } void EventWriter::del(const URI& uri) { if (uri == "ingen:engine") { - if (_request_client) { - _request_client->response(_request_id, SUCCESS); + if (_respondee) { + _respondee->response(_request_id, SUCCESS); } _engine.quit(); } else { - _engine.enqueue_event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); + _engine.enqueue_event( + new Events::Delete(_engine, _respondee, _request_id, now(), uri)); } } @@ -125,7 +113,9 @@ void EventWriter::connect(const Path& tail_path, const Path& head_path) { - _engine.enqueue_event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); + _engine.enqueue_event( + new Events::Connect(_engine, _respondee, _request_id, now(), + tail_path, head_path)); } @@ -139,15 +129,18 @@ EventWriter::disconnect(const Path& src, return; } - _engine.enqueue_event(new Events::Disconnect(_engine, _request_client, _request_id, now(), - Path(src.str()), Path(dst.str()))); + _engine.enqueue_event( + new Events::Disconnect(_engine, _respondee, _request_id, now(), + src, dst)); } void EventWriter::disconnect_all(const Path& patch_path, const Path& path) { - _engine.enqueue_event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); + _engine.enqueue_event( + new Events::DisconnectAll(_engine, _respondee, _request_id, now(), + patch_path, path)); } void @@ -159,27 +152,28 @@ EventWriter::set_property(const URI& uri, && value.type() == _engine.world()->forge().Bool) { if (value.get_bool()) { _engine.activate(); - _engine.enqueue_event(new Events::Ping(_engine, _request_client, _request_id, now())); + _engine.enqueue_event( + new Events::Ping(_engine, _respondee, _request_id, now())); } else { - _engine.enqueue_event(new Events::Deactivate(_engine, _request_client, _request_id, now())); + _engine.enqueue_event( + new Events::Deactivate(_engine, _respondee, _request_id, now())); } } else { Resource::Properties remove; remove.insert(make_pair(predicate, _engine.world()->uris()->wildcard)); Resource::Properties add; add.insert(make_pair(predicate, value)); - _engine.enqueue_event(new Events::SetMetadata( - _engine, _request_client, _request_id, now(), false, Resource::DEFAULT, - uri, add, remove)); + _engine.enqueue_event( + new Events::SetMetadata(_engine, _respondee, _request_id, now(), + false, Resource::DEFAULT, uri, add, remove)); } } -// Requests // - void EventWriter::get(const URI& uri) { - _engine.enqueue_event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); + _engine.enqueue_event( + new Events::Get(_engine, _respondee, _request_id, now(), uri)); } } // namespace Server diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp index 1c5d11db..9e334f98 100644 --- a/src/server/EventWriter.hpp +++ b/src/server/EventWriter.hpp @@ -42,13 +42,13 @@ public: Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } - void set_response_interface(Interface* iface) { _request_client = iface; } + void set_respondee(Interface* iface) { _respondee = iface; } virtual void set_response_id(int32_t id); - virtual void bundle_begin(); + virtual void bundle_begin() {} - virtual void bundle_end(); + virtual void bundle_end() {} virtual void put(const Raul::URI& path, const Resource::Properties& properties, @@ -79,13 +79,13 @@ public: virtual void get(const Raul::URI& uri); virtual void response(int32_t id, Status status) {} ///< N/A + virtual void error(const std::string& msg) {} ///< N/A protected: - Interface* _request_client; - int32_t _request_id; Engine& _engine; - bool _in_bundle; ///< True iff a bundle is currently being received + Interface* _respondee; + int32_t _request_id; private: SampleCount now() const; diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 5e418485..733f7a52 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -438,7 +438,7 @@ ingen_instantiate(const LV2_Descriptor* descriptor, LV2Driver* driver = new LV2Driver(*engine.get(), 4096, rate); engine->set_driver(SharedPtr(driver)); - interface->set_response_interface(&driver->writer()); + interface->set_respondee(&driver->writer()); engine->register_client("http://drobilla.net/ns/ingen#internal", &driver->writer()); diff --git a/src/shared/AtomReader.cpp b/src/shared/AtomReader.cpp index 7cafd919..45b60c35 100644 --- a/src/shared/AtomReader.cpp +++ b/src/shared/AtomReader.cpp @@ -188,6 +188,22 @@ AtomReader::write(const LV2_Atom* msg) get_props(remove, remove_props); _iface.delta(subject_uri, remove_props, add_props); + } else if (obj->body.otype == _uris.patch_Response) { + const LV2_Atom* request = NULL; + const LV2_Atom* body = NULL; + lv2_atom_object_get(obj, + (LV2_URID)_uris.patch_request, &request, + (LV2_URID)_uris.patch_body, &body, + 0); + if (!request || request->type != _uris.atom_Int) { + Raul::warn << "Response message has no request" << std::endl; + return; + } else if (!body || body->type != _uris.atom_Int) { + Raul::warn << "Response message body is not integer" << std::endl; + return; + } + _iface.response(((LV2_Atom_Int*)request)->body, + (Ingen::Status)((LV2_Atom_Int*)body)->body); } else { Raul::warn << "Unknown object type <" << _map.unmap_uri(obj->body.otype) diff --git a/src/shared/AtomWriter.cpp b/src/shared/AtomWriter.cpp index c9af3002..8d88ef11 100644 --- a/src/shared/AtomWriter.cpp +++ b/src/shared/AtomWriter.cpp @@ -264,6 +264,8 @@ AtomWriter::response(int32_t id, Status status) lv2_atom_forge_blank(&_forge, &msg, next_id(), _uris.patch_Response); lv2_atom_forge_property_head(&_forge, _uris.patch_request, 0); lv2_atom_forge_int(&_forge, id); + lv2_atom_forge_property_head(&_forge, _uris.patch_body, 0); + lv2_atom_forge_int(&_forge, status); lv2_atom_forge_pop(&_forge, &msg); finish_msg(); } diff --git a/src/shared/Configuration.cpp b/src/shared/Configuration.cpp index f1eda529..aecf0e5b 100644 --- a/src/shared/Configuration.cpp +++ b/src/shared/Configuration.cpp @@ -37,7 +37,7 @@ Configuration::Configuration() " ingen -egl patch.ingen # Run an engine and a GUI and load a patch bundle") { add("client-port", 'C', "Client port", INT, Value()); - add("connect", 'c', "Connect to engine URI", STRING, Value("osc.udp://localhost:16180")); + add("connect", 'c', "Connect to engine URI", STRING, Value("unix:///tmp/ingen.sock")); add("engine", 'e', "Run (JACK) engine", BOOL, Value(false)); add("engine-port", 'E', "Engine listen port", INT, Value(16180)); add("socket", 'S', "Engine socket path", STRING, Value("/tmp/ingen.sock")); diff --git a/src/shared/World.cpp b/src/shared/World.cpp index b99dfd1a..d2d5f74c 100644 --- a/src/shared/World.cpp +++ b/src/shared/World.cpp @@ -273,7 +273,7 @@ World::unload_modules() */ SharedPtr World::interface(const std::string& engine_url, - SharedPtr respond_to) + SharedPtr respondee) { const string scheme = engine_url.substr(0, engine_url.find(":")); const Pimpl::InterfaceFactories::const_iterator i = _impl->interface_factories.find(scheme); @@ -282,7 +282,7 @@ World::interface(const std::string& engine_url, return SharedPtr(); } - return i->second(this, engine_url, respond_to); + return i->second(this, engine_url, respondee); } /** Run a script of type @a mime_type at filename @a filename */ diff --git a/src/socket/Socket.cpp b/src/socket/Socket.cpp index 7fb4d89b..daeb6fa2 100644 --- a/src/socket/Socket.cpp +++ b/src/socket/Socket.cpp @@ -15,11 +15,12 @@ */ #include +#include #include #include +#include #include #include -#include #include #include @@ -33,51 +34,104 @@ namespace Ingen { namespace Socket { -bool -Socket::open_unix(const std::string& uri, const std::string& path) +#ifndef NI_MAXHOST +# define NI_MAXHOST 1025 +#endif +#ifndef NI_MAXSERV +# define NI_MAXSERV 32 +#endif + +Socket::Socket(Type t) + : _type(t) + , _addr(NULL) + , _addr_len(0) + , _sock(-1) { - if ((_sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { - return false; + switch (t) { + case UNIX: + _sock = socket(AF_UNIX, SOCK_STREAM, 0); + break; + case TCP: + _sock = socket(AF_INET, SOCK_STREAM, 0); + break; } +} - struct sockaddr_un* uaddr = (struct sockaddr_un*)calloc( - 1, sizeof(struct sockaddr_un)); - uaddr->sun_family = AF_UNIX; - strncpy(uaddr->sun_path, path.c_str(), sizeof(uaddr->sun_path) - 1); - _uri = uri; - _addr = (sockaddr*)uaddr; - _addr_len = sizeof(struct sockaddr_un); - - return bind(); +Socket::Socket(Type t, + const std::string& uri, + struct sockaddr* addr, + socklen_t addr_len, + int fd) + : _type(t) + , _uri(uri) + , _addr(addr) + , _addr_len(addr_len) + , _sock(fd) +{ } bool -Socket::open_tcp(const std::string& uri, uint16_t port) +Socket::set_addr(const std::string& uri) { - if ((_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - return false; + if (_type == UNIX && uri.substr(0, strlen("unix://")) == "unix://") { + const std::string path = uri.substr(strlen("unix://")); + struct sockaddr_un* uaddr = (struct sockaddr_un*)calloc( + 1, sizeof(struct sockaddr_un)); + uaddr->sun_family = AF_UNIX; + strncpy(uaddr->sun_path, path.c_str(), sizeof(uaddr->sun_path) - 1); + _uri = uri; + _addr = (sockaddr*)uaddr; + _addr_len = sizeof(struct sockaddr_un); + return true; + } else if (_type == TCP && uri.find("://") != std::string::npos) { + const std::string no_scheme = uri.substr(uri.find("://") + 4); + const size_t port_sep = no_scheme.find(':'); + if (port_sep == std::string::npos) { + return false; + } + + const std::string host = no_scheme.substr(0, port_sep); + const std::string port = no_scheme.substr(port_sep + 1).c_str(); + + struct addrinfo* ainfo; + int st = 0; + if ((st = getaddrinfo(host.c_str(), port.c_str(), NULL, &ainfo))) { + LOG(Raul::error) << "Error in getaddrinfo: " + << gai_strerror(st) << std::endl; + return false; + } + + _uri = uri; + _addr = (struct sockaddr*)malloc(ainfo->ai_addrlen); + _addr_len = ainfo->ai_addrlen; + memcpy(_addr, ainfo->ai_addr, ainfo->ai_addrlen); + return true; } + return false; +} - struct sockaddr_in* naddr = (struct sockaddr_in*)calloc( - 1, sizeof(struct sockaddr_in)); - naddr->sin_family = AF_INET; - naddr->sin_port = htons(port); - _uri = uri; - _addr = (sockaddr*)naddr; - _addr_len = sizeof(struct sockaddr_in); - - return bind(); +bool +Socket::bind(const std::string& uri) +{ + if (set_addr(uri) && ::bind(_sock, _addr, _addr_len) != -1) { + return true; + } + + LOG(Raul::error) << "Failed to bind " << _uri + << ": " << strerror(errno) << std::endl; + return false; } bool -Socket::bind() +Socket::connect(const std::string& uri) { - if (::bind(_sock, _addr, _addr_len) == -1) { - LOG(Raul::error) << "Failed to bind " << _uri - << ": " << strerror(errno) << std::endl; - return false; + if (set_addr(uri) && ::connect(_sock, _addr, _addr_len) != -1) { + return true; } - return true; + + LOG(Raul::error) << "Failed to connect " << _uri + << ": " << strerror(errno) << std::endl; + return false; } bool @@ -92,10 +146,9 @@ Socket::listen() } } -int +SharedPtr Socket::accept() { - // Accept connection from client socklen_t client_addr_len = _addr_len; struct sockaddr* client_addr = (struct sockaddr*)calloc( 1, client_addr_len); @@ -104,9 +157,18 @@ Socket::accept() if (conn == -1) { LOG(Raul::error) << "Error accepting connection: " << strerror(errno) << std::endl; + return SharedPtr(); } - return conn; + std::string client_uri = _uri; + char host[NI_MAXHOST]; + if (getnameinfo(client_addr, client_addr_len, + host, sizeof(host), NULL, 0, 0)) { + client_uri = _uri.substr(0, _uri.find(":") + 1) + host; + } + + return SharedPtr( + new Socket(_type, client_uri, client_addr, client_addr_len, conn)); } void diff --git a/src/socket/Socket.hpp b/src/socket/Socket.hpp index fa2b6972..dd62aefa 100644 --- a/src/socket/Socket.hpp +++ b/src/socket/Socket.hpp @@ -14,32 +14,59 @@ along with Ingen. If not, see . */ +#ifndef INGEN_SOCKET_SOCKET_HPP +#define INGEN_SOCKET_SOCKET_HPP + #include #include #include +#include "raul/SharedPtr.hpp" +#include "raul/Noncopyable.hpp" + namespace Ingen { namespace Socket { -class Socket { +/** A safe and simple interface for UNIX or TCP sockets. */ +class Socket : public Raul::Noncopyable { public: - Socket() : _addr(NULL), _addr_len(0), _sock(-1) {} + enum Type { + UNIX, + TCP + }; + + static Type type_from_uri(const std::string& uri) { + if (uri.substr(0, strlen("unix://")) == "unix://") { + return UNIX; + } else { + return TCP; + } + } + + /** Create a new unbound/unconnected socket of a given type. */ + Socket(Type t); + + /** Wrap an existing open socket. */ + Socket(Type t, + const std::string& uri, + struct sockaddr* addr, + socklen_t addr_len, + int fd); + ~Socket() { close(); } - /** Open UNIX socket and bind to address. - * @param uri URI used for identification and log output. - * @param path Socket path. - * @return True on success + /** Bind a server socket to an address. + * @param uri Address URI, e.g. unix:///tmp/foo or tcp://somehost:1234 + * @return True on success. */ - bool open_unix(const std::string& uri, const std::string& path); + bool bind(const std::string& uri); - /** Open TCP socket and bind to address. - * @param uri URI used for identification and log output. - * @param port Port number. - * @return True on success + /** Connect a client socket to a server address. + * @param uri Address URI, e.g. unix:///tmp/foo or tcp://somehost:1234 + * @return True on success. */ - bool open_tcp(const std::string& uri, uint16_t port); + bool connect(const std::string& uri); /** Mark server socket as passive to listen for incoming connections. * @return True on success. @@ -47,19 +74,22 @@ public: bool listen(); /** Accept a connection. - * @return The socket file descriptor, or -1 on error. + * @return An new open socket for the connection. */ - int accept(); + SharedPtr accept(); /** Return the file descriptor for the socket. */ int fd() { return _sock; } + const std::string& uri() const { return _uri; } + /** Close the socket. */ void close(); private: - bool bind(); + bool set_addr(const std::string& uri); + Type _type; std::string _uri; struct sockaddr* _addr; socklen_t _addr_len; @@ -68,3 +98,5 @@ private: } // namespace Socket } // namespace Ingen + +#endif // INGEN_SOCKET_SOCKET_HPP diff --git a/src/socket/SocketClient.hpp b/src/socket/SocketClient.hpp new file mode 100644 index 00000000..22e6eeb4 --- /dev/null +++ b/src/socket/SocketClient.hpp @@ -0,0 +1,52 @@ +/* + This file is part of Ingen. + Copyright 2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_SOCKET_SOCKET_CLIENT_HPP +#define INGEN_SOCKET_SOCKET_CLIENT_HPP + +#include "SocketReader.hpp" +#include "SocketWriter.hpp" + +namespace Ingen { +namespace Socket { + +/** The client side of an Ingen socket connection. */ +class SocketClient : public SocketWriter +{ +public: + SocketClient(Shared::World& world, + const std::string& uri, + SharedPtr sock, + SharedPtr respondee) + : SocketWriter(*world.lv2_uri_map().get(), + *world.uris().get(), + uri, + sock) + , _respondee(respondee) + , _reader(world, *respondee.get(), sock) + { + _reader.start(); + } + +private: + SharedPtr _respondee; + SocketReader _reader; +}; + +} // namespace Socket +} // namespace Ingen + +#endif // INGEN_SOCKET_SOCKET_CLIENT_HPP diff --git a/src/socket/SocketListener.cpp b/src/socket/SocketListener.cpp index 6f41383b..fa58ff44 100644 --- a/src/socket/SocketListener.cpp +++ b/src/socket/SocketListener.cpp @@ -29,7 +29,7 @@ #include "../server/Engine.hpp" #include "../server/EventWriter.hpp" #include "SocketListener.hpp" -#include "SocketReader.hpp" +#include "SocketServer.hpp" #define LOG(s) s << "[SocketListener] " @@ -38,13 +38,15 @@ namespace Socket { SocketListener::SocketListener(Ingen::Shared::World& world) : _world(world) + , _unix_sock(Socket::UNIX) + , _net_sock(Socket::TCP) { set_name("SocketListener"); // Create UNIX socket _unix_path = world.conf()->option("socket").get_string(); const std::string unix_uri = "unix://" + _unix_path; - if (!_unix_sock.open_unix(unix_uri, _unix_path) || !_unix_sock.listen()) { + if (!_unix_sock.bind(unix_uri) || !_unix_sock.listen()) { LOG(Raul::error) << "Failed to create UNIX socket" << std::endl; _unix_sock.close(); } @@ -54,7 +56,7 @@ SocketListener::SocketListener(Ingen::Shared::World& world) std::ostringstream ss; ss << "tcp:///localhost:"; ss << port; - if (!_net_sock.open_tcp(ss.str(), port) || !_net_sock.listen()) { + if (!_net_sock.bind(ss.str()) || !_net_sock.listen()) { LOG(Raul::error) << "Failed to create TCP socket" << std::endl; _net_sock.close(); } @@ -103,18 +105,16 @@ SocketListener::_run() } if (pfds[0].revents & POLLIN) { - int conn = _unix_sock.accept(); - if (conn != -1) { - // Make an new interface/thread to handle the connection - new SocketReader(_world, *engine->interface(), conn); + SharedPtr conn = _unix_sock.accept(); + if (conn) { + new SocketServer(_world, *engine, conn); } } if (pfds[1].revents & POLLIN) { - int conn = _net_sock.accept(); - if (conn != -1) { - // Make an new interface/thread to handle the connection - new SocketReader(_world, *engine->interface(), conn); + SharedPtr conn = _net_sock.accept(); + if (conn) { + new SocketServer(_world, *engine, conn); } } } diff --git a/src/socket/SocketReader.cpp b/src/socket/SocketReader.cpp index 2d1b5c14..74d441ef 100644 --- a/src/socket/SocketReader.cpp +++ b/src/socket/SocketReader.cpp @@ -31,12 +31,12 @@ namespace Socket { SocketReader::SocketReader(Ingen::Shared::World& world, Interface& iface, - int conn) + SharedPtr sock) : _world(world) , _iface(iface) , _inserter(NULL) , _msg_node(NULL) - , _conn(conn) + , _socket(sock) { set_name("SocketReader"); start(); @@ -46,7 +46,6 @@ SocketReader::~SocketReader() { stop(); join(); - close(_conn); } SerdStatus @@ -119,12 +118,12 @@ SocketReader::_run() serd_env_set_base_uri(_env, sord_node_to_serd_node(base_uri)); // Read directly from the connection with serd - FILE* f = fdopen(_conn, "r"); + FILE* f = fdopen(_socket->fd(), "r"); if (!f) { - LOG(Raul::error) << "Failed to open connection " << _conn + LOG(Raul::error) << "Failed to open connection " << "(" << strerror(errno) << ")" << std::endl; // Connection gone, exit - _conn = -1; + _socket.reset(); return; } @@ -137,7 +136,7 @@ SocketReader::_run() _iface); struct pollfd pfd; - pfd.fd = _conn; + pfd.fd = _socket->fd(); pfd.events = POLLIN; pfd.revents = 0; @@ -159,7 +158,7 @@ SocketReader::_run() if (st == SERD_FAILURE) { continue; // Read nothing, e.g. just whitespace } else if (st) { - fprintf(stderr, "Read error: %s\n", serd_strerror(st)); + LOG(Raul::error) << "Read error: " << serd_strerror(st) << std::endl; continue; } else if (!_msg_node) { LOG(Raul::error) << "Received empty message" << std::endl; @@ -172,9 +171,6 @@ SocketReader::_run() // Call _iface methods based on atom content ar.write((LV2_Atom*)chunk.buf); - // Respond and close connection - write(_conn, "OK", 2); - // Reset everything for the next iteration chunk.len = 0; sord_node_free(world->c_obj(), _msg_node); diff --git a/src/socket/SocketReader.hpp b/src/socket/SocketReader.hpp index 141e6216..5e205186 100644 --- a/src/socket/SocketReader.hpp +++ b/src/socket/SocketReader.hpp @@ -20,19 +20,24 @@ #include "raul/Thread.hpp" #include "sord/sord.h" +#include "Socket.hpp" + namespace Ingen { -namespace Shared { -class World; class Interface; -} + +namespace Shared { class World; } namespace Socket { +/** Calls Interface methods based on Turtle messages received via socket. */ class SocketReader : public Raul::Thread { public: - SocketReader(Shared::World& world, Interface& iface, int conn); + SocketReader(Shared::World& world, + Interface& iface, + SharedPtr sock); + ~SocketReader(); private: @@ -54,12 +59,12 @@ private: const SerdNode* object_datatype, const SerdNode* object_lang); - Shared::World& _world; - Interface& _iface; - SerdEnv* _env; - SordInserter* _inserter; - SordNode* _msg_node; - int _conn; + Shared::World& _world; + Interface& _iface; + SerdEnv* _env; + SordInserter* _inserter; + SordNode* _msg_node; + SharedPtr _socket; }; } // namespace Ingen diff --git a/src/socket/SocketServer.hpp b/src/socket/SocketServer.hpp new file mode 100644 index 00000000..8ea0f445 --- /dev/null +++ b/src/socket/SocketServer.hpp @@ -0,0 +1,53 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_SOCKET_SOCKET_SERVER_HPP +#define INGEN_SOCKET_SOCKET_SERVER_HPP + +#include "../server/EventWriter.hpp" +#include "Socket.hpp" +#include "SocketReader.hpp" +#include "SocketWriter.hpp" + +namespace Ingen { +namespace Socket { + +/** The server side of an Ingen socket connection. */ +class SocketServer : public Server::EventWriter, public SocketReader +{ +public: + SocketServer(Shared::World& world, + Server::Engine& engine, + SharedPtr sock) + : Server::EventWriter(engine) + , SocketReader(world, *this, sock) + , _writer(*world.lv2_uri_map().get(), + *world.uris().get(), + sock->uri(), + sock) + { + set_respondee(&_writer); + engine.register_client(sock->uri(), &_writer); + } + +private: + SocketWriter _writer; +}; + +} // namespace Ingen +} // namespace Socket + +#endif // INGEN_SOCKET_SOCKET_SERVER_HPP diff --git a/src/socket/SocketWriter.cpp b/src/socket/SocketWriter.cpp new file mode 100644 index 00000000..29bc018b --- /dev/null +++ b/src/socket/SocketWriter.cpp @@ -0,0 +1,74 @@ +/* + This file is part of Ingen. + Copyright 2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#include "SocketWriter.hpp" + +namespace Ingen { +namespace Socket { + +static size_t +socket_sink(const void* buf, size_t len, void* stream) +{ + SocketWriter* writer = (SocketWriter*)stream; + return write(writer->fd(), buf, len); +} + +SocketWriter::SocketWriter(Shared::LV2URIMap& map, + Shared::URIs& uris, + const Raul::URI& uri, + SharedPtr sock) + : AtomWriter(map, uris, *this) + , _map(map) + , _sratom(sratom_new(&map.urid_map_feature()->urid_map)) + , _uri(uri) + , _socket(sock) +{ + // Use as base URI so e.g. will be a path + _base = serd_node_from_string(SERD_URI, (const uint8_t*)"path:"); + + serd_uri_parse(_base.buf, &_base_uri); + + _env = serd_env_new(&_base); + _writer = serd_writer_new( + SERD_TURTLE, + (SerdStyle)(SERD_STYLE_RESOLVED|SERD_STYLE_ABBREVIATED|SERD_STYLE_CURIED), + _env, + &_base_uri, + socket_sink, + this); + + sratom_set_sink(_sratom, + (const char*)_base.buf, + (SerdStatementSink)serd_writer_write_statement, + (SerdEndSink)serd_writer_end_anon, + _writer); +} + +SocketWriter::~SocketWriter() +{ + sratom_free(_sratom); +} + +void +SocketWriter::write(const LV2_Atom* msg) +{ + sratom_write(_sratom, &_map.urid_unmap_feature()->urid_unmap, 0, + NULL, NULL, msg->type, msg->size, LV2_ATOM_BODY(msg)); + serd_writer_finish(_writer); +} + +} // namespace Socket +} // namespace Ingen diff --git a/src/socket/SocketWriter.hpp b/src/socket/SocketWriter.hpp new file mode 100644 index 00000000..902538f6 --- /dev/null +++ b/src/socket/SocketWriter.hpp @@ -0,0 +1,65 @@ +/* + This file is part of Ingen. + Copyright 2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_SOCKET_SOCKET_WRITER_HPP +#define INGEN_SOCKET_SOCKET_WRITER_HPP + +#include + +#include "ingen/Interface.hpp" +#include "ingen/shared/AtomSink.hpp" +#include "ingen/shared/AtomWriter.hpp" +#include "raul/URI.hpp" +#include "raul/SharedPtr.hpp" +#include "sratom/sratom.h" + +#include "Socket.hpp" + +namespace Ingen { +namespace Socket { + +/** An Interface that writes Turtle messages to a socket. + */ +class SocketWriter : public Shared::AtomWriter, public Shared::AtomSink +{ +public: + SocketWriter(Shared::LV2URIMap& map, + Shared::URIs& uris, + const Raul::URI& uri, + SharedPtr sock); + + ~SocketWriter(); + + void write(const LV2_Atom* msg); + + int fd() { return _socket->fd(); } + Raul::URI uri() const { return _uri; } + +protected: + Shared::LV2URIMap& _map; + Sratom* _sratom; + SerdNode _base; + SerdURI _base_uri; + SerdEnv* _env; + SerdWriter* _writer; + Raul::URI _uri; + SharedPtr _socket; +}; + +} // namespace Socket +} // namespace Ingen + +#endif // INGEN_SOCKET_SOCKET_WRITER_HPP diff --git a/src/socket/ingen_socket_client.cpp b/src/socket/ingen_socket_client.cpp new file mode 100644 index 00000000..8b57683c --- /dev/null +++ b/src/socket/ingen_socket_client.cpp @@ -0,0 +1,57 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#include "ingen/shared/Module.hpp" +#include "ingen/shared/World.hpp" +#include "raul/log.hpp" + +#include "Socket.hpp" +#include "SocketClient.hpp" + +static SharedPtr +new_socket_interface(Ingen::Shared::World* world, + const std::string& url, + SharedPtr respondee) +{ + SharedPtr sock( + new Ingen::Socket::Socket(Ingen::Socket::Socket::type_from_uri(url))); + if (!sock->connect(url)) { + return SharedPtr(); + } + Ingen::Socket::SocketClient* client = new Ingen::Socket::SocketClient( + *world, + url, + sock, + respondee); + return SharedPtr(client); +} + +struct IngenSocketClientModule : public Ingen::Shared::Module { + void load(Ingen::Shared::World* world) { + world->add_interface_factory("unix", &new_socket_interface); + world->add_interface_factory("tcp", &new_socket_interface); + } +}; + +extern "C" { + +Ingen::Shared::Module* +ingen_module_load() +{ + return new IngenSocketClientModule(); +} + +} // extern "C" diff --git a/src/socket/wscript b/src/socket/wscript index e351e0e4..f6705c37 100644 --- a/src/socket/wscript +++ b/src/socket/wscript @@ -4,12 +4,26 @@ from waflib.extras import autowaf as autowaf def build(bld): if bld.is_defined('HAVE_SOCKET'): obj = bld(features = 'cxx cxxshlib', - source = ['SocketReader.cpp', + source = ['Socket.cpp', 'SocketListener.cpp', + 'SocketReader.cpp', + 'SocketWriter.cpp', 'ingen_socket_server.cpp'], includes = ['.', '../..'], name = 'libingen_socket_server', target = 'ingen_socket_server', install_path = '${LIBDIR}', use = 'libingen_server') - autowaf.use_lib(bld, obj, 'RAUL LIBLO') + autowaf.use_lib(bld, obj, 'RAUL') + + obj = bld(features = 'cxx cxxshlib', + source = ['Socket.cpp', + 'SocketReader.cpp', + 'SocketWriter.cpp', + 'ingen_socket_client.cpp'], + includes = ['.', '../..'], + name = 'libingen_socket_client', + target = 'ingen_socket_client', + install_path = '${LIBDIR}', + use = 'libingen_server') + autowaf.use_lib(bld, obj, 'RAUL') -- cgit v1.2.1