diff options
-rw-r--r-- | ingen/EngineBase.hpp | 5 | ||||
-rw-r--r-- | ingen/Interface.hpp | 7 | ||||
-rw-r--r-- | ingen/client/ThreadedSigClientInterface.hpp | 33 | ||||
-rw-r--r-- | src/client/ThreadedSigClientInterface.cpp | 74 | ||||
-rw-r--r-- | src/client/wscript | 1 | ||||
-rw-r--r-- | src/gui/App.cpp | 2 | ||||
-rw-r--r-- | src/gui/ConnectWindow.cpp | 15 | ||||
-rw-r--r-- | src/ingen/main.cpp | 4 | ||||
-rw-r--r-- | src/server/ClientBroadcaster.cpp | 10 | ||||
-rw-r--r-- | src/server/ClientBroadcaster.hpp | 6 | ||||
-rw-r--r-- | src/server/Engine.cpp | 2 | ||||
-rw-r--r-- | src/server/Engine.hpp | 3 | ||||
-rw-r--r-- | src/server/EventWriter.cpp | 27 | ||||
-rw-r--r-- | src/server/EventWriter.hpp | 14 | ||||
-rw-r--r-- | src/server/ingen_lv2.cpp | 6 | ||||
-rw-r--r-- | src/socket/SocketClient.hpp | 8 | ||||
-rw-r--r-- | src/socket/SocketServer.hpp | 20 | ||||
-rw-r--r-- | src/socket/SocketWriter.cpp | 10 |
18 files changed, 120 insertions, 127 deletions
diff --git a/ingen/EngineBase.hpp b/ingen/EngineBase.hpp index df65c5d0..51af6de5 100644 --- a/ingen/EngineBase.hpp +++ b/ingen/EngineBase.hpp @@ -20,6 +20,7 @@ #include <stdint.h> #include "raul/URI.hpp" +#include "raul/SharedPtr.hpp" namespace Ingen { @@ -74,8 +75,8 @@ public: /** Register a client to receive updates about engine changes. */ - virtual void register_client(const Raul::URI& uri, - Interface* client) = 0; + virtual void register_client(const Raul::URI& uri, + SharedPtr<Interface> client) = 0; /** Unregister a client. diff --git a/ingen/Interface.hpp b/ingen/Interface.hpp index 0a3411a1..96a438a9 100644 --- a/ingen/Interface.hpp +++ b/ingen/Interface.hpp @@ -19,6 +19,7 @@ #include "ingen/Resource.hpp" #include "ingen/Status.hpp" +#include "raul/SharedPtr.hpp" namespace Raul { class Atom; @@ -40,6 +41,12 @@ public: virtual Raul::URI uri() const = 0; + virtual SharedPtr<Interface> respondee() const { + return SharedPtr<Interface>(); + } + + virtual void set_respondee(SharedPtr<Interface> respondee) {} + /** Begin an atomic bundle */ virtual void bundle_begin() = 0; diff --git a/ingen/client/ThreadedSigClientInterface.hpp b/ingen/client/ThreadedSigClientInterface.hpp index 096920bd..e8b46504 100644 --- a/ingen/client/ThreadedSigClientInterface.hpp +++ b/ingen/client/ThreadedSigClientInterface.hpp @@ -106,10 +106,39 @@ public: { push_sig(sigc::bind(property_change_slot, subject, key, value)); } /** Process all queued events - Called from GTK thread to emit signals. */ - bool emit_signals(); + bool emit_signals() { + // Process a limited number of events, to prevent locking the GTK + // thread indefinitely while processing continually arriving events + + size_t num_processed = 0; + while (!_sigs.empty() && num_processed++ < (_sigs.capacity() * 3 / 4)) { + Closure& ev = _sigs.front(); + ev(); + ev.disconnect(); + _sigs.pop(); + } + + _mutex.lock(); + _cond.broadcast(); + _mutex.unlock(); + + return true; + } private: - void push_sig(Closure ev); + void push_sig(Closure ev) { + bool success = false; + while (!success) { + success = _sigs.push(ev); + if (!success) { + Raul::warn << "Client event queue full. Waiting..." << std::endl; + _mutex.lock(); + _cond.wait(_mutex); + _mutex.unlock(); + Raul::warn << "Queue drained, continuing" << std::endl; + } + } + } Glib::Mutex _mutex; Glib::Cond _cond; diff --git a/src/client/ThreadedSigClientInterface.cpp b/src/client/ThreadedSigClientInterface.cpp deleted file mode 100644 index 2679724c..00000000 --- a/src/client/ThreadedSigClientInterface.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "raul/log.hpp" -#include "ingen/Patch.hpp" -#include "ingen/Plugin.hpp" -#include "ingen/Port.hpp" -#include "ingen/client/ThreadedSigClientInterface.hpp" - -using namespace std; -using namespace Raul; - -namespace Ingen { -namespace Client { - -/** Push an event (from the engine, ie 'new patch') on to the queue. - */ -void -ThreadedSigClientInterface::push_sig(Closure ev) -{ - bool success = false; - while (!success) { - success = _sigs.push(ev); - if (!success) { - warn << "Client event queue full. Waiting..." << endl; - _mutex.lock(); - _cond.wait(_mutex); - _mutex.unlock(); - warn << "Queue drained, continuing" << endl; - } - } -} - -/** Process all queued events that came from the OSC thread. - * - * This function should be called from the Gtk thread to emit signals and cause - * the connected methods to execute. - */ -bool -ThreadedSigClientInterface::emit_signals() -{ - // Process a limited number of events, to prevent locking the GTK - // thread indefinitely while processing continually arriving events - - size_t num_processed = 0; - while (!_sigs.empty() && num_processed++ < (_sigs.capacity() * 3 / 4)) { - Closure& ev = _sigs.front(); - ev(); - ev.disconnect(); - _sigs.pop(); - } - - _mutex.lock(); - _cond.broadcast(); - _mutex.unlock(); - - return true; -} - -} // namespace Client -} // namespace Ingen diff --git a/src/client/wscript b/src/client/wscript index 453bc122..c3830506 100644 --- a/src/client/wscript +++ b/src/client/wscript @@ -19,6 +19,5 @@ def build(bld): PluginModel.cpp PluginUI.cpp PortModel.cpp - ThreadedSigClientInterface.cpp ingen_client.cpp ''' diff --git a/src/gui/App.cpp b/src/gui/App.cpp index 4d108c71..525a5885 100644 --- a/src/gui/App.cpp +++ b/src/gui/App.cpp @@ -162,7 +162,7 @@ App::attach(SharedPtr<SigClientInterface> client) assert(!_loader); if (_world->local_engine()) { - _world->local_engine()->register_client(client->uri(), client.get()); + _world->local_engine()->register_client(client->uri(), client); } _client = client; diff --git a/src/gui/ConnectWindow.cpp b/src/gui/ConnectWindow.cpp index 4d26e61c..b08efb85 100644 --- a/src/gui/ConnectWindow.cpp +++ b/src/gui/ConnectWindow.cpp @@ -157,13 +157,19 @@ ConnectWindow::connect(bool existing) uri = user_uri; } - if (existing) + if (existing) { uri = world->engine()->uri().str(); + } - // Create client-side listener - SharedPtr<ThreadedSigClientInterface> tsci(new ThreadedSigClientInterface(1024)); + SharedPtr<ThreadedSigClientInterface> tsci; + if (world->engine()) { + tsci = PtrCast<ThreadedSigClientInterface>( + world->engine()->respondee()); + } - world->set_engine(world->interface(uri, tsci)); + if (!tsci) { + world->set_engine(world->interface(uri, tsci)); + } _app->attach(tsci); _app->register_callbacks(); @@ -203,6 +209,7 @@ ConnectWindow::connect(bool existing) SharedPtr<SigClientInterface> client(new SigClientInterface()); + world->engine()->set_respondee(client); _app->attach(client); _app->register_callbacks(); diff --git a/src/ingen/main.cpp b/src/ingen/main.cpp index 401986ea..2878ebc7 100644 --- a/src/ingen/main.cpp +++ b/src/ingen/main.cpp @@ -44,7 +44,7 @@ #include "ingen/shared/Configuration.hpp" #include "ingen/shared/World.hpp" #include "ingen/shared/runtime_paths.hpp" -#include "ingen/client/SigClientInterface.hpp" +#include "ingen/client/ThreadedSigClientInterface.hpp" #ifdef WITH_BINDINGS #include "bindings/ingen_bindings.hpp" #endif @@ -140,7 +140,7 @@ main(int argc, char** argv) "Unable to load socket client module"); #endif const char* const uri = conf.option("connect").get_string(); - SharedPtr<Interface> client(new Client::SigClientInterface()); + SharedPtr<Interface> client(new Client::ThreadedSigClientInterface(1024)); ingen_try((engine_interface = world->interface(uri, client)), (string("Unable to create interface to `") + uri + "'").c_str()); } diff --git a/src/server/ClientBroadcaster.cpp b/src/server/ClientBroadcaster.cpp index 46e6fb2f..42aad778 100644 --- a/src/server/ClientBroadcaster.cpp +++ b/src/server/ClientBroadcaster.cpp @@ -36,7 +36,7 @@ namespace Server { /** Register a client to receive messages over the notification band. */ void -ClientBroadcaster::register_client(const URI& uri, Interface* client) +ClientBroadcaster::register_client(const URI& uri, SharedPtr<Interface> client) { Glib::Mutex::Lock lock(_clients_mutex); LOG(info) << "Registered client: " << uri << endl; @@ -65,7 +65,7 @@ ClientBroadcaster::unregister_client(const URI& uri) /** Looks up the client with the given source @a uri (which is used as the * unique identifier for registered clients). */ -Interface* +SharedPtr<Interface> ClientBroadcaster::client(const URI& uri) { Glib::Mutex::Lock lock(_clients_mutex); @@ -73,7 +73,7 @@ ClientBroadcaster::client(const URI& uri) if (i != _clients.end()) { return (*i).second; } else { - return NULL; + return SharedPtr<Interface>(); } } @@ -82,7 +82,7 @@ ClientBroadcaster::send_plugins(const NodeFactory::Plugins& plugins) { Glib::Mutex::Lock lock(_clients_mutex); for (Clients::const_iterator c = _clients.begin(); c != _clients.end(); ++c) { - send_plugins_to((*c).second, plugins); + send_plugins_to((*c).second.get(), plugins); } } @@ -109,7 +109,7 @@ ClientBroadcaster::send_object(const GraphObjectImpl* o, bool recursive) { Glib::Mutex::Lock lock(_clients_mutex); for (Clients::const_iterator i = _clients.begin(); i != _clients.end(); ++i) { - ObjectSender::send_object((*i).second, o, recursive); + ObjectSender::send_object((*i).second.get(), o, recursive); } } diff --git a/src/server/ClientBroadcaster.hpp b/src/server/ClientBroadcaster.hpp index c095b960..5eea1093 100644 --- a/src/server/ClientBroadcaster.hpp +++ b/src/server/ClientBroadcaster.hpp @@ -49,10 +49,10 @@ class ConnectionImpl; class ClientBroadcaster : public Interface { public: - void register_client(const Raul::URI& uri, Interface* client); + void register_client(const Raul::URI& uri, SharedPtr<Interface> client); bool unregister_client(const Raul::URI& uri); - Interface* client(const Raul::URI& uri); + SharedPtr<Interface> client(const Raul::URI& uri); void send_plugins(const NodeFactory::Plugins& plugin_list); void send_plugins_to(Interface*, const NodeFactory::Plugins& plugin_list); @@ -118,7 +118,7 @@ public: void error(const std::string& msg) { BROADCAST(error, msg); } private: - typedef std::map<Raul::URI, Interface*> Clients; + typedef std::map< Raul::URI, SharedPtr<Interface> > Clients; Glib::Mutex _clients_mutex; Clients _clients; diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index f2f088f5..f3b5e476 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -278,7 +278,7 @@ Engine::process_events(ProcessContext& context) } void -Engine::register_client(const Raul::URI& uri, Interface* client) +Engine::register_client(const Raul::URI& uri, SharedPtr<Interface> client) { _broadcaster->register_client(uri, client); } diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 5111f52c..9600e8a2 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -69,7 +69,8 @@ public: virtual void run(uint32_t sample_count); virtual void quit(); virtual bool main_iteration(); - virtual void register_client(const Raul::URI& uri, Interface* client); + virtual void register_client(const Raul::URI& uri, + SharedPtr<Interface> client); virtual bool unregister_client(const Raul::URI& uri); void set_driver(SharedPtr<Driver> driver); diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp index 19a738b0..4ec0756b 100644 --- a/src/server/EventWriter.cpp +++ b/src/server/EventWriter.cpp @@ -36,7 +36,6 @@ namespace Server { EventWriter::EventWriter(Engine& engine) : _engine(engine) - , _respondee(NULL) , _request_id(-1) { } @@ -59,10 +58,6 @@ EventWriter::now() const void EventWriter::set_response_id(int32_t id) { - if (!_respondee) { // Kludge - _respondee = _engine.broadcaster()->client( - "http://drobilla.net/ns/ingen#internal"); - } _request_id = id; } @@ -72,7 +67,7 @@ EventWriter::put(const URI& uri, const Resource::Graph ctx) { _engine.enqueue_event( - new Events::SetMetadata(_engine, _respondee, _request_id, now(), + new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(), true, ctx, uri, properties)); } @@ -82,7 +77,7 @@ EventWriter::delta(const URI& uri, const Resource::Properties& add) { _engine.enqueue_event( - new Events::SetMetadata(_engine, _respondee, _request_id, now(), + new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } @@ -91,7 +86,7 @@ EventWriter::move(const Path& old_path, const Path& new_path) { _engine.enqueue_event( - new Events::Move(_engine, _respondee, _request_id, now(), + new Events::Move(_engine, _respondee.get(), _request_id, now(), old_path, new_path)); } @@ -105,7 +100,7 @@ EventWriter::del(const URI& uri) _engine.quit(); } else { _engine.enqueue_event( - new Events::Delete(_engine, _respondee, _request_id, now(), uri)); + new Events::Delete(_engine, _respondee.get(), _request_id, now(), uri)); } } @@ -114,7 +109,7 @@ EventWriter::connect(const Path& tail_path, const Path& head_path) { _engine.enqueue_event( - new Events::Connect(_engine, _respondee, _request_id, now(), + new Events::Connect(_engine, _respondee.get(), _request_id, now(), tail_path, head_path)); } @@ -130,7 +125,7 @@ EventWriter::disconnect(const Path& src, } _engine.enqueue_event( - new Events::Disconnect(_engine, _respondee, _request_id, now(), + new Events::Disconnect(_engine, _respondee.get(), _request_id, now(), src, dst)); } @@ -139,7 +134,7 @@ EventWriter::disconnect_all(const Path& patch_path, const Path& path) { _engine.enqueue_event( - new Events::DisconnectAll(_engine, _respondee, _request_id, now(), + new Events::DisconnectAll(_engine, _respondee.get(), _request_id, now(), patch_path, path)); } @@ -153,10 +148,10 @@ EventWriter::set_property(const URI& uri, if (value.get_bool()) { _engine.activate(); _engine.enqueue_event( - new Events::Ping(_engine, _respondee, _request_id, now())); + new Events::Ping(_engine, _respondee.get(), _request_id, now())); } else { _engine.enqueue_event( - new Events::Deactivate(_engine, _respondee, _request_id, now())); + new Events::Deactivate(_engine, _respondee.get(), _request_id, now())); } } else { Resource::Properties remove; @@ -164,7 +159,7 @@ EventWriter::set_property(const URI& uri, Resource::Properties add; add.insert(make_pair(predicate, value)); _engine.enqueue_event( - new Events::SetMetadata(_engine, _respondee, _request_id, now(), + new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } } @@ -173,7 +168,7 @@ void EventWriter::get(const URI& uri) { _engine.enqueue_event( - new Events::Get(_engine, _respondee, _request_id, now(), uri)); + new Events::Get(_engine, _respondee.get(), _request_id, now(), uri)); } } // namespace Server diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp index 9e334f98..2a080be4 100644 --- a/src/server/EventWriter.hpp +++ b/src/server/EventWriter.hpp @@ -42,7 +42,13 @@ public: Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } - void set_respondee(Interface* iface) { _respondee = iface; } + virtual SharedPtr<Interface> respondee() const { + return _respondee; + } + + virtual void set_respondee(SharedPtr<Interface> respondee) { + _respondee = respondee; + } virtual void set_response_id(int32_t id); @@ -83,9 +89,9 @@ public: virtual void error(const std::string& msg) {} ///< N/A protected: - Engine& _engine; - Interface* _respondee; - int32_t _request_id; + Engine& _engine; + SharedPtr<Interface> _respondee; + int32_t _request_id; private: SampleCount now() const; diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 733f7a52..5e21f679 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -438,9 +438,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor, LV2Driver* driver = new LV2Driver(*engine.get(), 4096, rate); engine->set_driver(SharedPtr<Ingen::Server::Driver>(driver)); - interface->set_respondee(&driver->writer()); - engine->register_client("http://drobilla.net/ns/ingen#internal", - &driver->writer()); + SharedPtr<Interface> client(&driver->writer(), NullDeleter<Interface>); + interface->set_respondee(client); + engine->register_client("http://drobilla.net/ns/ingen#internal", client); engine->activate(); Server::ThreadManager::single_threaded = true; diff --git a/src/socket/SocketClient.hpp b/src/socket/SocketClient.hpp index 22e6eeb4..d75288e0 100644 --- a/src/socket/SocketClient.hpp +++ b/src/socket/SocketClient.hpp @@ -41,6 +41,14 @@ public: _reader.start(); } + virtual SharedPtr<Interface> respondee() const { + return _respondee; + } + + virtual void set_respondee(SharedPtr<Interface> respondee) { + _respondee = respondee; + } + private: SharedPtr<Interface> _respondee; SocketReader _reader; diff --git a/src/socket/SocketServer.hpp b/src/socket/SocketServer.hpp index 8ea0f445..55434030 100644 --- a/src/socket/SocketServer.hpp +++ b/src/socket/SocketServer.hpp @@ -34,17 +34,23 @@ public: SharedPtr<Socket> sock) : Server::EventWriter(engine) , SocketReader(world, *this, sock) - , _writer(*world.lv2_uri_map().get(), - *world.uris().get(), - sock->uri(), - sock) + , _engine(engine) + , _writer(new SocketWriter(*world.lv2_uri_map().get(), + *world.uris().get(), + sock->uri(), + sock)) { - set_respondee(&_writer); - engine.register_client(sock->uri(), &_writer); + set_respondee(_writer); + engine.register_client(_writer->uri(), _writer); + } + + ~SocketServer() { + _engine.unregister_client(_writer->uri()); } private: - SocketWriter _writer; + Server::Engine& _engine; + SharedPtr<SocketWriter> _writer; }; } // namespace Ingen diff --git a/src/socket/SocketWriter.cpp b/src/socket/SocketWriter.cpp index 29bc018b..808d62cf 100644 --- a/src/socket/SocketWriter.cpp +++ b/src/socket/SocketWriter.cpp @@ -14,6 +14,10 @@ along with Ingen. If not, see <http://www.gnu.org/licenses/>. */ +#include <errno.h> +#include <sys/types.h> +#include <sys/socket.h> + #include "SocketWriter.hpp" namespace Ingen { @@ -23,7 +27,11 @@ static size_t socket_sink(const void* buf, size_t len, void* stream) { SocketWriter* writer = (SocketWriter*)stream; - return write(writer->fd(), buf, len); + ssize_t ret = send(writer->fd(), buf, len, MSG_NOSIGNAL); + if (ret < 0) { + return 0; + } + return ret; } SocketWriter::SocketWriter(Shared::LV2URIMap& map, |