summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ingen/EngineBase.hpp5
-rw-r--r--ingen/Interface.hpp7
-rw-r--r--ingen/client/ThreadedSigClientInterface.hpp33
-rw-r--r--src/client/ThreadedSigClientInterface.cpp74
-rw-r--r--src/client/wscript1
-rw-r--r--src/gui/App.cpp2
-rw-r--r--src/gui/ConnectWindow.cpp15
-rw-r--r--src/ingen/main.cpp4
-rw-r--r--src/server/ClientBroadcaster.cpp10
-rw-r--r--src/server/ClientBroadcaster.hpp6
-rw-r--r--src/server/Engine.cpp2
-rw-r--r--src/server/Engine.hpp3
-rw-r--r--src/server/EventWriter.cpp27
-rw-r--r--src/server/EventWriter.hpp14
-rw-r--r--src/server/ingen_lv2.cpp6
-rw-r--r--src/socket/SocketClient.hpp8
-rw-r--r--src/socket/SocketServer.hpp20
-rw-r--r--src/socket/SocketWriter.cpp10
18 files changed, 120 insertions, 127 deletions
diff --git a/ingen/EngineBase.hpp b/ingen/EngineBase.hpp
index df65c5d0..51af6de5 100644
--- a/ingen/EngineBase.hpp
+++ b/ingen/EngineBase.hpp
@@ -20,6 +20,7 @@
#include <stdint.h>
#include "raul/URI.hpp"
+#include "raul/SharedPtr.hpp"
namespace Ingen {
@@ -74,8 +75,8 @@ public:
/**
Register a client to receive updates about engine changes.
*/
- virtual void register_client(const Raul::URI& uri,
- Interface* client) = 0;
+ virtual void register_client(const Raul::URI& uri,
+ SharedPtr<Interface> client) = 0;
/**
Unregister a client.
diff --git a/ingen/Interface.hpp b/ingen/Interface.hpp
index 0a3411a1..96a438a9 100644
--- a/ingen/Interface.hpp
+++ b/ingen/Interface.hpp
@@ -19,6 +19,7 @@
#include "ingen/Resource.hpp"
#include "ingen/Status.hpp"
+#include "raul/SharedPtr.hpp"
namespace Raul {
class Atom;
@@ -40,6 +41,12 @@ public:
virtual Raul::URI uri() const = 0;
+ virtual SharedPtr<Interface> respondee() const {
+ return SharedPtr<Interface>();
+ }
+
+ virtual void set_respondee(SharedPtr<Interface> respondee) {}
+
/** Begin an atomic bundle */
virtual void bundle_begin() = 0;
diff --git a/ingen/client/ThreadedSigClientInterface.hpp b/ingen/client/ThreadedSigClientInterface.hpp
index 096920bd..e8b46504 100644
--- a/ingen/client/ThreadedSigClientInterface.hpp
+++ b/ingen/client/ThreadedSigClientInterface.hpp
@@ -106,10 +106,39 @@ public:
{ push_sig(sigc::bind(property_change_slot, subject, key, value)); }
/** Process all queued events - Called from GTK thread to emit signals. */
- bool emit_signals();
+ bool emit_signals() {
+ // Process a limited number of events, to prevent locking the GTK
+ // thread indefinitely while processing continually arriving events
+
+ size_t num_processed = 0;
+ while (!_sigs.empty() && num_processed++ < (_sigs.capacity() * 3 / 4)) {
+ Closure& ev = _sigs.front();
+ ev();
+ ev.disconnect();
+ _sigs.pop();
+ }
+
+ _mutex.lock();
+ _cond.broadcast();
+ _mutex.unlock();
+
+ return true;
+ }
private:
- void push_sig(Closure ev);
+ void push_sig(Closure ev) {
+ bool success = false;
+ while (!success) {
+ success = _sigs.push(ev);
+ if (!success) {
+ Raul::warn << "Client event queue full. Waiting..." << std::endl;
+ _mutex.lock();
+ _cond.wait(_mutex);
+ _mutex.unlock();
+ Raul::warn << "Queue drained, continuing" << std::endl;
+ }
+ }
+ }
Glib::Mutex _mutex;
Glib::Cond _cond;
diff --git a/src/client/ThreadedSigClientInterface.cpp b/src/client/ThreadedSigClientInterface.cpp
deleted file mode 100644
index 2679724c..00000000
--- a/src/client/ThreadedSigClientInterface.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- This file is part of Ingen.
- Copyright 2007-2012 David Robillard <http://drobilla.net/>
-
- Ingen is free software: you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free
- Software Foundation, either version 3 of the License, or any later version.
-
- Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
-
- You should have received a copy of the GNU Affero General Public License
- along with Ingen. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "raul/log.hpp"
-#include "ingen/Patch.hpp"
-#include "ingen/Plugin.hpp"
-#include "ingen/Port.hpp"
-#include "ingen/client/ThreadedSigClientInterface.hpp"
-
-using namespace std;
-using namespace Raul;
-
-namespace Ingen {
-namespace Client {
-
-/** Push an event (from the engine, ie 'new patch') on to the queue.
- */
-void
-ThreadedSigClientInterface::push_sig(Closure ev)
-{
- bool success = false;
- while (!success) {
- success = _sigs.push(ev);
- if (!success) {
- warn << "Client event queue full. Waiting..." << endl;
- _mutex.lock();
- _cond.wait(_mutex);
- _mutex.unlock();
- warn << "Queue drained, continuing" << endl;
- }
- }
-}
-
-/** Process all queued events that came from the OSC thread.
- *
- * This function should be called from the Gtk thread to emit signals and cause
- * the connected methods to execute.
- */
-bool
-ThreadedSigClientInterface::emit_signals()
-{
- // Process a limited number of events, to prevent locking the GTK
- // thread indefinitely while processing continually arriving events
-
- size_t num_processed = 0;
- while (!_sigs.empty() && num_processed++ < (_sigs.capacity() * 3 / 4)) {
- Closure& ev = _sigs.front();
- ev();
- ev.disconnect();
- _sigs.pop();
- }
-
- _mutex.lock();
- _cond.broadcast();
- _mutex.unlock();
-
- return true;
-}
-
-} // namespace Client
-} // namespace Ingen
diff --git a/src/client/wscript b/src/client/wscript
index 453bc122..c3830506 100644
--- a/src/client/wscript
+++ b/src/client/wscript
@@ -19,6 +19,5 @@ def build(bld):
PluginModel.cpp
PluginUI.cpp
PortModel.cpp
- ThreadedSigClientInterface.cpp
ingen_client.cpp
'''
diff --git a/src/gui/App.cpp b/src/gui/App.cpp
index 4d108c71..525a5885 100644
--- a/src/gui/App.cpp
+++ b/src/gui/App.cpp
@@ -162,7 +162,7 @@ App::attach(SharedPtr<SigClientInterface> client)
assert(!_loader);
if (_world->local_engine()) {
- _world->local_engine()->register_client(client->uri(), client.get());
+ _world->local_engine()->register_client(client->uri(), client);
}
_client = client;
diff --git a/src/gui/ConnectWindow.cpp b/src/gui/ConnectWindow.cpp
index 4d26e61c..b08efb85 100644
--- a/src/gui/ConnectWindow.cpp
+++ b/src/gui/ConnectWindow.cpp
@@ -157,13 +157,19 @@ ConnectWindow::connect(bool existing)
uri = user_uri;
}
- if (existing)
+ if (existing) {
uri = world->engine()->uri().str();
+ }
- // Create client-side listener
- SharedPtr<ThreadedSigClientInterface> tsci(new ThreadedSigClientInterface(1024));
+ SharedPtr<ThreadedSigClientInterface> tsci;
+ if (world->engine()) {
+ tsci = PtrCast<ThreadedSigClientInterface>(
+ world->engine()->respondee());
+ }
- world->set_engine(world->interface(uri, tsci));
+ if (!tsci) {
+ world->set_engine(world->interface(uri, tsci));
+ }
_app->attach(tsci);
_app->register_callbacks();
@@ -203,6 +209,7 @@ ConnectWindow::connect(bool existing)
SharedPtr<SigClientInterface> client(new SigClientInterface());
+ world->engine()->set_respondee(client);
_app->attach(client);
_app->register_callbacks();
diff --git a/src/ingen/main.cpp b/src/ingen/main.cpp
index 401986ea..2878ebc7 100644
--- a/src/ingen/main.cpp
+++ b/src/ingen/main.cpp
@@ -44,7 +44,7 @@
#include "ingen/shared/Configuration.hpp"
#include "ingen/shared/World.hpp"
#include "ingen/shared/runtime_paths.hpp"
-#include "ingen/client/SigClientInterface.hpp"
+#include "ingen/client/ThreadedSigClientInterface.hpp"
#ifdef WITH_BINDINGS
#include "bindings/ingen_bindings.hpp"
#endif
@@ -140,7 +140,7 @@ main(int argc, char** argv)
"Unable to load socket client module");
#endif
const char* const uri = conf.option("connect").get_string();
- SharedPtr<Interface> client(new Client::SigClientInterface());
+ SharedPtr<Interface> client(new Client::ThreadedSigClientInterface(1024));
ingen_try((engine_interface = world->interface(uri, client)),
(string("Unable to create interface to `") + uri + "'").c_str());
}
diff --git a/src/server/ClientBroadcaster.cpp b/src/server/ClientBroadcaster.cpp
index 46e6fb2f..42aad778 100644
--- a/src/server/ClientBroadcaster.cpp
+++ b/src/server/ClientBroadcaster.cpp
@@ -36,7 +36,7 @@ namespace Server {
/** Register a client to receive messages over the notification band.
*/
void
-ClientBroadcaster::register_client(const URI& uri, Interface* client)
+ClientBroadcaster::register_client(const URI& uri, SharedPtr<Interface> client)
{
Glib::Mutex::Lock lock(_clients_mutex);
LOG(info) << "Registered client: " << uri << endl;
@@ -65,7 +65,7 @@ ClientBroadcaster::unregister_client(const URI& uri)
/** Looks up the client with the given source @a uri (which is used as the
* unique identifier for registered clients).
*/
-Interface*
+SharedPtr<Interface>
ClientBroadcaster::client(const URI& uri)
{
Glib::Mutex::Lock lock(_clients_mutex);
@@ -73,7 +73,7 @@ ClientBroadcaster::client(const URI& uri)
if (i != _clients.end()) {
return (*i).second;
} else {
- return NULL;
+ return SharedPtr<Interface>();
}
}
@@ -82,7 +82,7 @@ ClientBroadcaster::send_plugins(const NodeFactory::Plugins& plugins)
{
Glib::Mutex::Lock lock(_clients_mutex);
for (Clients::const_iterator c = _clients.begin(); c != _clients.end(); ++c) {
- send_plugins_to((*c).second, plugins);
+ send_plugins_to((*c).second.get(), plugins);
}
}
@@ -109,7 +109,7 @@ ClientBroadcaster::send_object(const GraphObjectImpl* o, bool recursive)
{
Glib::Mutex::Lock lock(_clients_mutex);
for (Clients::const_iterator i = _clients.begin(); i != _clients.end(); ++i) {
- ObjectSender::send_object((*i).second, o, recursive);
+ ObjectSender::send_object((*i).second.get(), o, recursive);
}
}
diff --git a/src/server/ClientBroadcaster.hpp b/src/server/ClientBroadcaster.hpp
index c095b960..5eea1093 100644
--- a/src/server/ClientBroadcaster.hpp
+++ b/src/server/ClientBroadcaster.hpp
@@ -49,10 +49,10 @@ class ConnectionImpl;
class ClientBroadcaster : public Interface
{
public:
- void register_client(const Raul::URI& uri, Interface* client);
+ void register_client(const Raul::URI& uri, SharedPtr<Interface> client);
bool unregister_client(const Raul::URI& uri);
- Interface* client(const Raul::URI& uri);
+ SharedPtr<Interface> client(const Raul::URI& uri);
void send_plugins(const NodeFactory::Plugins& plugin_list);
void send_plugins_to(Interface*, const NodeFactory::Plugins& plugin_list);
@@ -118,7 +118,7 @@ public:
void error(const std::string& msg) { BROADCAST(error, msg); }
private:
- typedef std::map<Raul::URI, Interface*> Clients;
+ typedef std::map< Raul::URI, SharedPtr<Interface> > Clients;
Glib::Mutex _clients_mutex;
Clients _clients;
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index f2f088f5..f3b5e476 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -278,7 +278,7 @@ Engine::process_events(ProcessContext& context)
}
void
-Engine::register_client(const Raul::URI& uri, Interface* client)
+Engine::register_client(const Raul::URI& uri, SharedPtr<Interface> client)
{
_broadcaster->register_client(uri, client);
}
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index 5111f52c..9600e8a2 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -69,7 +69,8 @@ public:
virtual void run(uint32_t sample_count);
virtual void quit();
virtual bool main_iteration();
- virtual void register_client(const Raul::URI& uri, Interface* client);
+ virtual void register_client(const Raul::URI& uri,
+ SharedPtr<Interface> client);
virtual bool unregister_client(const Raul::URI& uri);
void set_driver(SharedPtr<Driver> driver);
diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp
index 19a738b0..4ec0756b 100644
--- a/src/server/EventWriter.cpp
+++ b/src/server/EventWriter.cpp
@@ -36,7 +36,6 @@ namespace Server {
EventWriter::EventWriter(Engine& engine)
: _engine(engine)
- , _respondee(NULL)
, _request_id(-1)
{
}
@@ -59,10 +58,6 @@ EventWriter::now() const
void
EventWriter::set_response_id(int32_t id)
{
- if (!_respondee) { // Kludge
- _respondee = _engine.broadcaster()->client(
- "http://drobilla.net/ns/ingen#internal");
- }
_request_id = id;
}
@@ -72,7 +67,7 @@ EventWriter::put(const URI& uri,
const Resource::Graph ctx)
{
_engine.enqueue_event(
- new Events::SetMetadata(_engine, _respondee, _request_id, now(),
+ new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(),
true, ctx, uri, properties));
}
@@ -82,7 +77,7 @@ EventWriter::delta(const URI& uri,
const Resource::Properties& add)
{
_engine.enqueue_event(
- new Events::SetMetadata(_engine, _respondee, _request_id, now(),
+ new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(),
false, Resource::DEFAULT, uri, add, remove));
}
@@ -91,7 +86,7 @@ EventWriter::move(const Path& old_path,
const Path& new_path)
{
_engine.enqueue_event(
- new Events::Move(_engine, _respondee, _request_id, now(),
+ new Events::Move(_engine, _respondee.get(), _request_id, now(),
old_path, new_path));
}
@@ -105,7 +100,7 @@ EventWriter::del(const URI& uri)
_engine.quit();
} else {
_engine.enqueue_event(
- new Events::Delete(_engine, _respondee, _request_id, now(), uri));
+ new Events::Delete(_engine, _respondee.get(), _request_id, now(), uri));
}
}
@@ -114,7 +109,7 @@ EventWriter::connect(const Path& tail_path,
const Path& head_path)
{
_engine.enqueue_event(
- new Events::Connect(_engine, _respondee, _request_id, now(),
+ new Events::Connect(_engine, _respondee.get(), _request_id, now(),
tail_path, head_path));
}
@@ -130,7 +125,7 @@ EventWriter::disconnect(const Path& src,
}
_engine.enqueue_event(
- new Events::Disconnect(_engine, _respondee, _request_id, now(),
+ new Events::Disconnect(_engine, _respondee.get(), _request_id, now(),
src, dst));
}
@@ -139,7 +134,7 @@ EventWriter::disconnect_all(const Path& patch_path,
const Path& path)
{
_engine.enqueue_event(
- new Events::DisconnectAll(_engine, _respondee, _request_id, now(),
+ new Events::DisconnectAll(_engine, _respondee.get(), _request_id, now(),
patch_path, path));
}
@@ -153,10 +148,10 @@ EventWriter::set_property(const URI& uri,
if (value.get_bool()) {
_engine.activate();
_engine.enqueue_event(
- new Events::Ping(_engine, _respondee, _request_id, now()));
+ new Events::Ping(_engine, _respondee.get(), _request_id, now()));
} else {
_engine.enqueue_event(
- new Events::Deactivate(_engine, _respondee, _request_id, now()));
+ new Events::Deactivate(_engine, _respondee.get(), _request_id, now()));
}
} else {
Resource::Properties remove;
@@ -164,7 +159,7 @@ EventWriter::set_property(const URI& uri,
Resource::Properties add;
add.insert(make_pair(predicate, value));
_engine.enqueue_event(
- new Events::SetMetadata(_engine, _respondee, _request_id, now(),
+ new Events::SetMetadata(_engine, _respondee.get(), _request_id, now(),
false, Resource::DEFAULT, uri, add, remove));
}
}
@@ -173,7 +168,7 @@ void
EventWriter::get(const URI& uri)
{
_engine.enqueue_event(
- new Events::Get(_engine, _respondee, _request_id, now(), uri));
+ new Events::Get(_engine, _respondee.get(), _request_id, now(), uri));
}
} // namespace Server
diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp
index 9e334f98..2a080be4 100644
--- a/src/server/EventWriter.hpp
+++ b/src/server/EventWriter.hpp
@@ -42,7 +42,13 @@ public:
Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; }
- void set_respondee(Interface* iface) { _respondee = iface; }
+ virtual SharedPtr<Interface> respondee() const {
+ return _respondee;
+ }
+
+ virtual void set_respondee(SharedPtr<Interface> respondee) {
+ _respondee = respondee;
+ }
virtual void set_response_id(int32_t id);
@@ -83,9 +89,9 @@ public:
virtual void error(const std::string& msg) {} ///< N/A
protected:
- Engine& _engine;
- Interface* _respondee;
- int32_t _request_id;
+ Engine& _engine;
+ SharedPtr<Interface> _respondee;
+ int32_t _request_id;
private:
SampleCount now() const;
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index 733f7a52..5e21f679 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -438,9 +438,9 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
LV2Driver* driver = new LV2Driver(*engine.get(), 4096, rate);
engine->set_driver(SharedPtr<Ingen::Server::Driver>(driver));
- interface->set_respondee(&driver->writer());
- engine->register_client("http://drobilla.net/ns/ingen#internal",
- &driver->writer());
+ SharedPtr<Interface> client(&driver->writer(), NullDeleter<Interface>);
+ interface->set_respondee(client);
+ engine->register_client("http://drobilla.net/ns/ingen#internal", client);
engine->activate();
Server::ThreadManager::single_threaded = true;
diff --git a/src/socket/SocketClient.hpp b/src/socket/SocketClient.hpp
index 22e6eeb4..d75288e0 100644
--- a/src/socket/SocketClient.hpp
+++ b/src/socket/SocketClient.hpp
@@ -41,6 +41,14 @@ public:
_reader.start();
}
+ virtual SharedPtr<Interface> respondee() const {
+ return _respondee;
+ }
+
+ virtual void set_respondee(SharedPtr<Interface> respondee) {
+ _respondee = respondee;
+ }
+
private:
SharedPtr<Interface> _respondee;
SocketReader _reader;
diff --git a/src/socket/SocketServer.hpp b/src/socket/SocketServer.hpp
index 8ea0f445..55434030 100644
--- a/src/socket/SocketServer.hpp
+++ b/src/socket/SocketServer.hpp
@@ -34,17 +34,23 @@ public:
SharedPtr<Socket> sock)
: Server::EventWriter(engine)
, SocketReader(world, *this, sock)
- , _writer(*world.lv2_uri_map().get(),
- *world.uris().get(),
- sock->uri(),
- sock)
+ , _engine(engine)
+ , _writer(new SocketWriter(*world.lv2_uri_map().get(),
+ *world.uris().get(),
+ sock->uri(),
+ sock))
{
- set_respondee(&_writer);
- engine.register_client(sock->uri(), &_writer);
+ set_respondee(_writer);
+ engine.register_client(_writer->uri(), _writer);
+ }
+
+ ~SocketServer() {
+ _engine.unregister_client(_writer->uri());
}
private:
- SocketWriter _writer;
+ Server::Engine& _engine;
+ SharedPtr<SocketWriter> _writer;
};
} // namespace Ingen
diff --git a/src/socket/SocketWriter.cpp b/src/socket/SocketWriter.cpp
index 29bc018b..808d62cf 100644
--- a/src/socket/SocketWriter.cpp
+++ b/src/socket/SocketWriter.cpp
@@ -14,6 +14,10 @@
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
#include "SocketWriter.hpp"
namespace Ingen {
@@ -23,7 +27,11 @@ static size_t
socket_sink(const void* buf, size_t len, void* stream)
{
SocketWriter* writer = (SocketWriter*)stream;
- return write(writer->fd(), buf, len);
+ ssize_t ret = send(writer->fd(), buf, len, MSG_NOSIGNAL);
+ if (ret < 0) {
+ return 0;
+ }
+ return ret;
}
SocketWriter::SocketWriter(Shared::LV2URIMap& map,