From da468f24388d7f0f574c6e4dd4022e05d47a9db2 Mon Sep 17 00:00:00 2001 From: David Robillard Date: Thu, 10 May 2012 03:23:11 +0000 Subject: Use SharedPtr references to Interfaces to keep things sane. Fix double register when using GUI with a remote engine. Avoid signal when writing to dead socket by using send with MSG_NOSIGNAL. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4336 a436a847-0d15-0410-975c-d299462d15a1 --- src/client/ThreadedSigClientInterface.cpp | 74 ------------------------------- src/client/wscript | 1 - src/gui/App.cpp | 2 +- src/gui/ConnectWindow.cpp | 15 +++++-- src/ingen/main.cpp | 4 +- src/server/ClientBroadcaster.cpp | 10 ++--- src/server/ClientBroadcaster.hpp | 6 +-- src/server/Engine.cpp | 2 +- src/server/Engine.hpp | 3 +- src/server/EventWriter.cpp | 27 +++++------ src/server/EventWriter.hpp | 14 ++++-- src/server/ingen_lv2.cpp | 6 +-- src/socket/SocketClient.hpp | 8 ++++ src/socket/SocketServer.hpp | 20 ++++++--- src/socket/SocketWriter.cpp | 10 ++++- 15 files changed, 79 insertions(+), 123 deletions(-) delete mode 100644 src/client/ThreadedSigClientInterface.cpp (limited to 'src') diff --git a/src/client/ThreadedSigClientInterface.cpp b/src/client/ThreadedSigClientInterface.cpp deleted file mode 100644 index 2679724c..00000000 --- a/src/client/ThreadedSigClientInterface.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#include "raul/log.hpp" -#include "ingen/Patch.hpp" -#include "ingen/Plugin.hpp" -#include "ingen/Port.hpp" -#include "ingen/client/ThreadedSigClientInterface.hpp" - -using namespace std; -using namespace Raul; - -namespace Ingen { -namespace Client { - -/** Push an event (from the engine, ie 'new patch') on to the queue. - */ -void -ThreadedSigClientInterface::push_sig(Closure ev) -{ - bool success = false; - while (!success) { - success = _sigs.push(ev); - if (!success) { - warn << "Client event queue full. Waiting..." << endl; - _mutex.lock(); - _cond.wait(_mutex); - _mutex.unlock(); - warn << "Queue drained, continuing" << endl; - } - } -} - -/** Process all queued events that came from the OSC thread. - * - * This function should be called from the Gtk thread to emit signals and cause - * the connected methods to execute. - */ -bool -ThreadedSigClientInterface::emit_signals() -{ - // Process a limited number of events, to prevent locking the GTK - // thread indefinitely while processing continually arriving events - - size_t num_processed = 0; - while (!_sigs.empty() && num_processed++ < (_sigs.capacity() * 3 / 4)) { - Closure& ev = _sigs.front(); - ev(); - ev.disconnect(); - _sigs.pop(); - } - - _mutex.lock(); - _cond.broadcast(); - _mutex.unlock(); - - return true; -} - -} // namespace Client -} // namespace Ingen diff --git a/src/client/wscript b/src/client/wscript index 453bc122..c3830506 100644 --- a/src/client/wscript +++ b/src/client/wscript @@ -19,6 +19,5 @@ def build(bld): PluginModel.cpp PluginUI.cpp PortModel.cpp - ThreadedSigClientInterface.cpp ingen_client.cpp ''' diff --git a/src/gui/App.cpp b/src/gui/App.cpp index 4d108c71..525a5885 100644 --- a/src/gui/App.cpp +++ b/src/gui/App.cpp @@ -162,7 +162,7 @@ App::attach(SharedPtr client) assert(!_loader); if (_world->local_engine()) { - _world->local_engine()->register_client(client->uri(), client.get()); + _world->local_engine()->register_client(client->uri(), client); } _client = client; diff --git a/src/gui/ConnectWindow.cpp b/src/gui/ConnectWindow.cpp index 4d26e61c..b08efb85 100644 --- a/src/gui/ConnectWindow.cpp +++ b/src/gui/ConnectWindow.cpp @@ -157,13 +157,19 @@ ConnectWindow::connect(bool existing) uri = user_uri; } - if (existing) + if (existing) { uri = world->engine()->uri().str(); + } - // Create client-side listener - SharedPtr tsci(new ThreadedSigClientInterface(1024)); + SharedPtr tsci; + if (world->engine()) { + tsci = PtrCast( + world->engine()->respondee()); + } - world->set_engine(world->interface(uri, tsci)); + if (!tsci) { + world->set_engine(world->interface(uri, tsci)); + } _app->attach(tsci); _app->register_callbacks(); @@ -203,6 +209,7 @@ ConnectWindow::connect(bool existing) SharedPtr client(new SigClientInterface()); + world->engine()->set_respondee(client); _app->attach(client); _app->register_callbacks(); diff --git a/src/ingen/main.cpp b/src/ingen/main.cpp index 401986ea..2878ebc7 100644 --- a/src/ingen/main.cpp +++ b/src/ingen/main.cpp @@ -44,7 +44,7 @@ #include "ingen/shared/Configuration.hpp" #include "ingen/shared/World.hpp" #include "ingen/shared/runtime_paths.hpp" -#include "ingen/client/SigClientInterface.hpp" +#include "ingen/client/ThreadedSigClientInterface.hpp" #ifdef WITH_BINDINGS #include "bindings/ingen_bindings.hpp" #endif @@ -140,7 +140,7 @@ main(int argc, char** argv) "Unable to load socket client module"); #endif const char* const uri = conf.option("connect").get_string(); - SharedPtr client(new Client::SigClientInterface()); + SharedPtr client(new Client::ThreadedSigClientInterface(1024)); ingen_try((engine_interface = world->interface(uri, client)), (string("Unable to create interface to `") + uri + "'").c_str()); } diff --git a/src/server/ClientBroadcaster.cpp b/src/server/ClientBroadcaster.cpp index 46e6fb2f..42aad778 100644 --- a/src/server/ClientBroadcaster.cpp +++ b/src/server/ClientBroadcaster.cpp @@ -36,7 +36,7 @@ namespace Server { /** Register a client to receive messages over the notification band. */ void -ClientBroadcaster::register_client(const URI& uri, Interface* client) +ClientBroadcaster::register_client(const URI& uri, SharedPtr client) { Glib::Mutex::Lock lock(_clients_mutex); LOG(info) << "Registered client: " << uri << endl; @@ -65,7 +65,7 @@ ClientBroadcaster::unregister_client(const URI& uri) /** Looks up the client with the given source @a uri (which is used as the * unique identifier for registered clients). */ -Interface* +SharedPtr ClientBroadcaster::client(const URI& uri) { Glib::Mutex::Lock lock(_clients_mutex); @@ -73,7 +73,7 @@ ClientBroadcaster::client(const URI& uri) if (i != _clients.end()) { return (*i).second; } else { - return NULL; + return SharedPtr(); } } @@ -82,7 +82,7 @@ ClientBroadcaster::send_plugins(const NodeFactory::Plugins& plugins) { Glib::Mutex::Lock lock(_clients_mutex); for (Clients::const_iterator c = _clients.begin(); c != _clients.end(); ++c) { - send_plugins_to((*c).second, plugins); + send_plugins_to((*c).second.get(), plugins); } } @@ -109,7 +109,7 @@ ClientBroadcaster::send_object(const GraphObjectImpl* o, bool recursive) { Glib::Mutex::Lock lock(_clients_mutex); for (Clients::const_iterator i = _clients.begin(); i != _clients.end(); ++i) { - ObjectSender::send_object((*i).second, o, recursive); + ObjectSender::send_object((*i).second.get(), o, recursive); } } diff --git a/src/server/ClientBroadcaster.hpp b/src/server/ClientBroadcaster.hpp index c095b960..5eea1093 100644 --- a/src/server/ClientBroadcaster.hpp +++ b/src/server/ClientBroadcaster.hpp @@ -49,10 +49,10 @@ class ConnectionImpl; class ClientBroadcaster : public Interface { public: - void register_client(const Raul::URI& uri, Interface* client); + void register_client(const Raul::URI& uri, SharedPtr client); bool unregister_client(const Raul::URI& uri); - Interface* client(const Raul::URI& uri); + SharedPtr client(const Raul::URI& uri); void send_plugins(const NodeFactory::Plugins& plugin_list); void send_plugins_to(Interface*, const NodeFactory::Plugins& plugin_list); @@ -118,7 +118,7 @@ public: void error(const std::string& msg) { BROADCAST(error, msg); } private: - typedef std::map Clients; + typedef std::map< Raul::URI, SharedPtr > Clients; Glib::Mutex _clients_mutex; Clients _clients; diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index f2f088f5..f3b5e476 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -278,7 +278,7 @@ Engine::process_events(ProcessContext& context) } void -Engine::register_client(const Raul::URI& uri, Interface* client) +Engine::register_client(const Raul::URI& uri, SharedPtr client) { _broadcaster->register_client(uri, client); } diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 5111f52c..9600e8a2 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -69,7 +69,8 @@ public: virtual void run(uint32_t sample_count); virtual void quit(); virtual bool main_iteration(); - virtual void register_client(const Raul::URI& uri, Interface* client); + virtual void register_client(const Raul::URI& uri, + SharedPtr client); virtual bool unregister_client(const Raul::URI& uri); void set_driver(SharedPtr driver); diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp index 19a738b0..4ec0756b 100644 --- a/src/server/EventWriter.cpp +++ b/src/server/EventWriter.cpp @@ -36,7 +36,6 @@ namespace Server { EventWriter::EventWriter(Engine& engine) : _engine(engine) - , _respondee(NULL) , _request_id(-1) { } @@ -59,10 +58,6 @@ EventWriter::now() const void EventWriter::set_response_id(int32_t id) { - if (!_respondee) { // Kludge - _respondee = _engine.broadcaster()->client( - "http://drobilla.net/ns/ingen#internal"); - } _request_id = id; } @@ -72,7 +67,7 @@ EventWriter::put(const URI& uri, const Resource::Graph ctx) { _engine.enqueue_event( - new Events::SetMetadata(_engine, _respondee, _request_id, now(), + new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(), true, ctx, uri, properties)); } @@ -82,7 +77,7 @@ EventWriter::delta(const URI& uri, const Resource::Properties& add) { _engine.enqueue_event( - new Events::SetMetadata(_engine, _respondee, _request_id, now(), + new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } @@ -91,7 +86,7 @@ EventWriter::move(const Path& old_path, const Path& new_path) { _engine.enqueue_event( - new Events::Move(_engine, _respondee, _request_id, now(), + new Events::Move(_engine, _respondee.get(), _request_id, now(), old_path, new_path)); } @@ -105,7 +100,7 @@ EventWriter::del(const URI& uri) _engine.quit(); } else { _engine.enqueue_event( - new Events::Delete(_engine, _respondee, _request_id, now(), uri)); + new Events::Delete(_engine, _respondee.get(), _request_id, now(), uri)); } } @@ -114,7 +109,7 @@ EventWriter::connect(const Path& tail_path, const Path& head_path) { _engine.enqueue_event( - new Events::Connect(_engine, _respondee, _request_id, now(), + new Events::Connect(_engine, _respondee.get(), _request_id, now(), tail_path, head_path)); } @@ -130,7 +125,7 @@ EventWriter::disconnect(const Path& src, } _engine.enqueue_event( - new Events::Disconnect(_engine, _respondee, _request_id, now(), + new Events::Disconnect(_engine, _respondee.get(), _request_id, now(), src, dst)); } @@ -139,7 +134,7 @@ EventWriter::disconnect_all(const Path& patch_path, const Path& path) { _engine.enqueue_event( - new Events::DisconnectAll(_engine, _respondee, _request_id, now(), + new Events::DisconnectAll(_engine, _respondee.get(), _request_id, now(), patch_path, path)); } @@ -153,10 +148,10 @@ EventWriter::set_property(const URI& uri, if (value.get_bool()) { _engine.activate(); _engine.enqueue_event( - new Events::Ping(_engine, _respondee, _request_id, now())); + new Events::Ping(_engine, _respondee.get(), _request_id, now())); } else { _engine.enqueue_event( - new Events::Deactivate(_engine, _respondee, _request_id, now())); + new Events::Deactivate(_engine, _respondee.get(), _request_id, now())); } } else { Resource::Properties remove; @@ -164,7 +159,7 @@ EventWriter::set_property(const URI& uri, Resource::Properties add; add.insert(make_pair(predicate, value)); _engine.enqueue_event( - new Events::SetMetadata(_engine, _respondee, _request_id, now(), + new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } } @@ -173,7 +168,7 @@ void EventWriter::get(const URI& uri) { _engine.enqueue_event( - new Events::Get(_engine, _respondee, _request_id, now(), uri)); + new Events::Get(_engine, _respondee.get(), _request_id, now(), uri)); } } // namespace Server diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp index 9e334f98..2a080be4 100644 --- a/src/server/EventWriter.hpp +++ b/src/server/EventWriter.hpp @@ -42,7 +42,13 @@ public: Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } - void set_respondee(Interface* iface) { _respondee = iface; } + virtual SharedPtr respondee() const { + return _respondee; + } + + virtual void set_respondee(SharedPtr respondee) { + _respondee = respondee; + } virtual void set_response_id(int32_t id); @@ -83,9 +89,9 @@ public: virtual void error(const std::string& msg) {} ///< N/A protected: - Engine& _engine; - Interface* _respondee; - int32_t _request_id; + Engine& _engine; + SharedPtr _respondee; + int32_t _request_id; private: SampleCount now() const; diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 733f7a52..5e21f679 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -438,9 +438,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor, LV2Driver* driver = new LV2Driver(*engine.get(), 4096, rate); engine->set_driver(SharedPtr(driver)); - interface->set_respondee(&driver->writer()); - engine->register_client("http://drobilla.net/ns/ingen#internal", - &driver->writer()); + SharedPtr client(&driver->writer(), NullDeleter); + interface->set_respondee(client); + engine->register_client("http://drobilla.net/ns/ingen#internal", client); engine->activate(); Server::ThreadManager::single_threaded = true; diff --git a/src/socket/SocketClient.hpp b/src/socket/SocketClient.hpp index 22e6eeb4..d75288e0 100644 --- a/src/socket/SocketClient.hpp +++ b/src/socket/SocketClient.hpp @@ -41,6 +41,14 @@ public: _reader.start(); } + virtual SharedPtr respondee() const { + return _respondee; + } + + virtual void set_respondee(SharedPtr respondee) { + _respondee = respondee; + } + private: SharedPtr _respondee; SocketReader _reader; diff --git a/src/socket/SocketServer.hpp b/src/socket/SocketServer.hpp index 8ea0f445..55434030 100644 --- a/src/socket/SocketServer.hpp +++ b/src/socket/SocketServer.hpp @@ -34,17 +34,23 @@ public: SharedPtr sock) : Server::EventWriter(engine) , SocketReader(world, *this, sock) - , _writer(*world.lv2_uri_map().get(), - *world.uris().get(), - sock->uri(), - sock) + , _engine(engine) + , _writer(new SocketWriter(*world.lv2_uri_map().get(), + *world.uris().get(), + sock->uri(), + sock)) { - set_respondee(&_writer); - engine.register_client(sock->uri(), &_writer); + set_respondee(_writer); + engine.register_client(_writer->uri(), _writer); + } + + ~SocketServer() { + _engine.unregister_client(_writer->uri()); } private: - SocketWriter _writer; + Server::Engine& _engine; + SharedPtr _writer; }; } // namespace Ingen diff --git a/src/socket/SocketWriter.cpp b/src/socket/SocketWriter.cpp index 29bc018b..808d62cf 100644 --- a/src/socket/SocketWriter.cpp +++ b/src/socket/SocketWriter.cpp @@ -14,6 +14,10 @@ along with Ingen. If not, see . */ +#include +#include +#include + #include "SocketWriter.hpp" namespace Ingen { @@ -23,7 +27,11 @@ static size_t socket_sink(const void* buf, size_t len, void* stream) { SocketWriter* writer = (SocketWriter*)stream; - return write(writer->fd(), buf, len); + ssize_t ret = send(writer->fd(), buf, len, MSG_NOSIGNAL); + if (ret < 0) { + return 0; + } + return ret; } SocketWriter::SocketWriter(Shared::LV2URIMap& map, -- cgit v1.2.1