summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2012-05-09 01:14:30 +0000
committerDavid Robillard <d@drobilla.net>2012-05-09 01:14:30 +0000
commitbaeb3c1872a989b69eb89fae04f93c59b06f258e (patch)
tree83720f9c7bad7ab26c910180d8caea2fd4cb7e70 /src/server
parent4d46a232b30be99bc34e581cbc636345f77c6bc4 (diff)
downloadingen-baeb3c1872a989b69eb89fae04f93c59b06f258e.tar.gz
ingen-baeb3c1872a989b69eb89fae04f93c59b06f258e.tar.bz2
ingen-baeb3c1872a989b69eb89fae04f93c59b06f258e.zip
Simply event interface design and make only one pre-process thread.
This makes event pre-processing actually safe for multiple interfaces since multiple events will never be pre-processed simultaneously and the pre-process order is definitely the same as the execute order. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4323 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src/server')
-rw-r--r--src/server/Engine.cpp41
-rw-r--r--src/server/Engine.hpp15
-rw-r--r--src/server/EventSink.hpp37
-rw-r--r--src/server/EventSource.hpp58
-rw-r--r--src/server/EventWriter.cpp26
-rw-r--r--src/server/EventWriter.hpp11
-rw-r--r--src/server/PreProcessor.cpp (renamed from src/server/EventQueue.cpp)24
-rw-r--r--src/server/PreProcessor.hpp (renamed from src/server/EventQueue.hpp)42
-rw-r--r--src/server/ingen_engine.cpp9
-rw-r--r--src/server/ingen_lv2.cpp16
-rw-r--r--src/server/wscript2
11 files changed, 83 insertions, 198 deletions
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp
index 7138d2ea..c2cc7cab 100644
--- a/src/server/Engine.cpp
+++ b/src/server/Engine.cpp
@@ -36,12 +36,12 @@
#include "Engine.hpp"
#include "EngineStore.hpp"
#include "Event.hpp"
-#include "EventSource.hpp"
#include "EventWriter.hpp"
#include "MessageContext.hpp"
#include "NodeFactory.hpp"
#include "PatchImpl.hpp"
#include "PostProcessor.hpp"
+#include "PreProcessor.hpp"
#include "ProcessContext.hpp"
#include "ThreadManager.hpp"
@@ -60,7 +60,9 @@ Engine::Engine(Ingen::Shared::World* a_world)
, _maid(new Raul::Maid(event_queue_size()))
, _message_context(new MessageContext(*this))
, _node_factory(new NodeFactory(a_world))
+ , _pre_processor(new PreProcessor())
, _post_processor(new PostProcessor(*this))
+ , _event_writer(new EventWriter(*this))
, _quit_flag(false)
{
if (a_world->store()) {
@@ -123,15 +125,6 @@ Engine::main_iteration()
}
void
-Engine::add_event_source(SharedPtr<EventSource> source)
-{
- // FIXME: Not thread-safe
- _maid->manage(source);
- source->_next = _event_sources;
- _event_sources = source;
-}
-
-void
Engine::set_driver(SharedPtr<Driver> driver)
{
_driver = driver;
@@ -235,24 +228,24 @@ Engine::deactivate()
ThreadManager::single_threaded = true;
}
+bool
+Engine::pending_events()
+{
+ return !_pre_processor->empty();
+}
+
+void
+Engine::enqueue_event(Event* ev)
+{
+ ThreadManager::assert_not_thread(THREAD_PROCESS);
+ _pre_processor->event(ev);
+}
+
void
Engine::process_events(ProcessContext& context)
{
ThreadManager::assert_thread(THREAD_PROCESS);
-
- SharedPtr<EventSource> src = _event_sources;
- SharedPtr<EventSource> prev = src;
- for (; src; src = src->_next) {
- if (!src->process(*_post_processor, context)) {
- // Source is finished, remove
- if (src == _event_sources) {
- _event_sources = src->_next;
- } else {
- prev->_next = src->_next;
- }
- }
- prev = src;
- }
+ _pre_processor->process(*_post_processor, context);
}
void
diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp
index 5f218078..042264e4 100644
--- a/src/server/Engine.hpp
+++ b/src/server/Engine.hpp
@@ -17,8 +17,6 @@
#ifndef INGEN_ENGINE_ENGINE_HPP
#define INGEN_ENGINE_ENGINE_HPP
-#include <vector>
-
#include <boost/utility.hpp>
#include "ingen/EngineBase.hpp"
@@ -38,10 +36,12 @@ class ClientBroadcaster;
class ControlBindings;
class Driver;
class EngineStore;
-class EventSource;
+class Event;
+class EventWriter;
class MessageContext;
class NodeFactory;
class PostProcessor;
+class PreProcessor;
class ProcessContext;
/**
@@ -69,11 +69,14 @@ public:
virtual bool unregister_client(const Raul::URI& uri);
void set_driver(SharedPtr<Driver> driver);
- void add_event_source(SharedPtr<EventSource> source);
+
+ bool pending_events();
+ void enqueue_event(Event* ev);
void process_events(ProcessContext& context);
Ingen::Shared::World* world() const { return _world; }
+ EventWriter* interface() const { return _event_writer; }
ClientBroadcaster* broadcaster() const { return _broadcaster; }
BufferFactory* buffer_factory() const { return _buffer_factory; }
ControlBindings* control_bindings() const { return _control_bindings; }
@@ -97,9 +100,9 @@ private:
Raul::Maid* _maid;
MessageContext* _message_context;
NodeFactory* _node_factory;
+ PreProcessor* _pre_processor;
PostProcessor* _post_processor;
-
- SharedPtr<EventSource> _event_sources; ///< Intrusive linked list
+ EventWriter* _event_writer;
bool _quit_flag;
};
diff --git a/src/server/EventSink.hpp b/src/server/EventSink.hpp
deleted file mode 100644
index e24fa5e9..00000000
--- a/src/server/EventSink.hpp
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- This file is part of Ingen.
- Copyright 2007-2012 David Robillard <http://drobilla.net/>
-
- Ingen is free software: you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free
- Software Foundation, either version 3 of the License, or any later version.
-
- Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
-
- You should have received a copy of the GNU Affero General Public License
- along with Ingen. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef INGEN_ENGINE_EVENTSINK_HPP
-#define INGEN_ENGINE_EVENTSINK_HPP
-
-namespace Ingen {
-namespace Server {
-
-class Event;
-
-class EventSink
-{
-public:
- virtual ~EventSink() {}
-
- virtual void event(Event* ev) = 0;
-};
-
-} // namespace Server
-} // namespace Ingen
-
-#endif // INGEN_ENGINE_EVENTSINK_HPP
-
diff --git a/src/server/EventSource.hpp b/src/server/EventSource.hpp
deleted file mode 100644
index 320b6b7a..00000000
--- a/src/server/EventSource.hpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- This file is part of Ingen.
- Copyright 2007-2012 David Robillard <http://drobilla.net/>
-
- Ingen is free software: you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free
- Software Foundation, either version 3 of the License, or any later version.
-
- Ingen is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for details.
-
- You should have received a copy of the GNU Affero General Public License
- along with Ingen. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef INGEN_ENGINE_EVENTSOURCE_HPP
-#define INGEN_ENGINE_EVENTSOURCE_HPP
-
-#include <raul/SharedPtr.hpp>
-#include <raul/Deletable.hpp>
-
-namespace Ingen {
-namespace Server {
-
-class Event;
-class PostProcessor;
-class ProcessContext;
-
-/** Source for events to run in the audio thread.
- *
- * The Driver gets events from an EventQueue in the process callback and
- * executes them, then they are sent to the PostProcessor and finalised
- * (post-processing thread).
- */
-class EventSource : public Raul::Deletable
-{
-public:
- EventSource() {}
- virtual ~EventSource() {}
-
- /** Process events for a cycle.
- * @return False iff this source is finished and should be removed.
- */
- virtual bool process(PostProcessor& dest,
- ProcessContext& context,
- bool limit = true) = 0;
-
-private:
- friend class Engine;
- SharedPtr<EventSource> _next; ///< Intrusive linked list for Engine
-};
-
-} // namespace Server
-} // namespace Ingen
-
-#endif // INGEN_ENGINE_EVENTSOURCE_HPP
-
diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp
index 4725835b..ca8ad196 100644
--- a/src/server/EventWriter.cpp
+++ b/src/server/EventWriter.cpp
@@ -23,7 +23,6 @@
#include "ClientBroadcaster.hpp"
#include "Driver.hpp"
#include "Engine.hpp"
-#include "EventQueue.hpp"
#include "EventWriter.hpp"
#include "events.hpp"
@@ -35,9 +34,8 @@ using namespace Raul;
namespace Ingen {
namespace Server {
-EventWriter::EventWriter(Engine& engine, EventSink& sink)
+EventWriter::EventWriter(Engine& engine)
: _request_client(NULL)
- , _sink(sink)
, _request_id(-1)
, _engine(engine)
, _in_bundle(false)
@@ -92,7 +90,7 @@ EventWriter::put(const URI& uri,
const Resource::Properties& properties,
const Resource::Graph ctx)
{
- _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties));
+ _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties));
}
void
@@ -100,14 +98,14 @@ EventWriter::delta(const URI& uri,
const Resource::Properties& remove,
const Resource::Properties& add)
{
- _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove));
+ _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove));
}
void
EventWriter::move(const Path& old_path,
const Path& new_path)
{
- _sink.event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path));
+ _engine.enqueue_event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path));
}
void
@@ -119,7 +117,7 @@ EventWriter::del(const URI& uri)
}
_engine.quit();
} else {
- _sink.event(new Events::Delete(_engine, _request_client, _request_id, now(), uri));
+ _engine.enqueue_event(new Events::Delete(_engine, _request_client, _request_id, now(), uri));
}
}
@@ -127,7 +125,7 @@ void
EventWriter::connect(const Path& tail_path,
const Path& head_path)
{
- _sink.event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path));
+ _engine.enqueue_event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path));
}
@@ -141,7 +139,7 @@ EventWriter::disconnect(const Path& src,
return;
}
- _sink.event(new Events::Disconnect(_engine, _request_client, _request_id, now(),
+ _engine.enqueue_event(new Events::Disconnect(_engine, _request_client, _request_id, now(),
Path(src.str()), Path(dst.str())));
}
@@ -149,7 +147,7 @@ void
EventWriter::disconnect_all(const Path& patch_path,
const Path& path)
{
- _sink.event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path));
+ _engine.enqueue_event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path));
}
void
@@ -161,16 +159,16 @@ EventWriter::set_property(const URI& uri,
&& value.type() == _engine.world()->forge().Bool) {
if (value.get_bool()) {
_engine.activate();
- _sink.event(new Events::Ping(_engine, _request_client, _request_id, now()));
+ _engine.enqueue_event(new Events::Ping(_engine, _request_client, _request_id, now()));
} else {
- _sink.event(new Events::Deactivate(_engine, _request_client, _request_id, now()));
+ _engine.enqueue_event(new Events::Deactivate(_engine, _request_client, _request_id, now()));
}
} else {
Resource::Properties remove;
remove.insert(make_pair(predicate, _engine.world()->uris()->wildcard));
Resource::Properties add;
add.insert(make_pair(predicate, value));
- _sink.event(new Events::SetMetadata(
+ _engine.enqueue_event(new Events::SetMetadata(
_engine, _request_client, _request_id, now(), false, Resource::DEFAULT,
uri, add, remove));
}
@@ -181,7 +179,7 @@ EventWriter::set_property(const URI& uri,
void
EventWriter::get(const URI& uri)
{
- _sink.event(new Events::Get(_engine, _request_client, _request_id, now(), uri));
+ _engine.enqueue_event(new Events::Get(_engine, _request_client, _request_id, now(), uri));
}
} // namespace Server
diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp
index 771d22f5..1c5d11db 100644
--- a/src/server/EventWriter.hpp
+++ b/src/server/EventWriter.hpp
@@ -31,17 +31,13 @@ namespace Ingen {
namespace Server {
class Engine;
-class EventSink;
-/** An Interface that creates and writes Events to an EventSink.
- *
- * This is where Interface calls get turned into Events which are actually
- * processed by the engine to do things.
+/** An Interface that creates and enqueues Events for the Engine to execute.
*/
class EventWriter : public Interface
{
public:
- explicit EventWriter(Engine& engine, EventSink& sink);
+ explicit EventWriter(Engine& engine);
virtual ~EventWriter();
Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; }
@@ -87,7 +83,6 @@ public:
protected:
Interface* _request_client;
- EventSink& _sink;
int32_t _request_id;
Engine& _engine;
bool _in_bundle; ///< True iff a bundle is currently being received
@@ -99,4 +94,4 @@ private:
} // namespace Server
} // namespace Ingen
-#endif // INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP
+#endif // INGEN_ENGINE_EVENTWRITER_HPP
diff --git a/src/server/EventQueue.cpp b/src/server/PreProcessor.cpp
index 585eaa34..a88b74e5 100644
--- a/src/server/EventQueue.cpp
+++ b/src/server/PreProcessor.cpp
@@ -15,8 +15,8 @@
*/
#include "Event.hpp"
-#include "EventQueue.hpp"
#include "PostProcessor.hpp"
+#include "PreProcessor.hpp"
#include "ProcessContext.hpp"
#include "ThreadManager.hpp"
@@ -25,23 +25,24 @@ using namespace std;
namespace Ingen {
namespace Server {
-EventQueue::EventQueue()
+PreProcessor::PreProcessor()
{
Thread::set_context(THREAD_PRE_PROCESS);
- set_name("EventQueue");
+ set_name("PreProcessor");
start();
}
-EventQueue::~EventQueue()
+PreProcessor::~PreProcessor()
{
stop();
}
-/** Push an unprepared event onto the queue.
- */
void
-EventQueue::event(Event* const ev)
+PreProcessor::event(Event* const ev)
{
+ // TODO: Probably possible to make this lock-free with CAS
+ Glib::Mutex::Lock lock(_mutex);
+
assert(!ev->is_prepared());
assert(!ev->next());
@@ -63,12 +64,8 @@ EventQueue::event(Event* const ev)
whip();
}
-/** Process all events for a cycle.
- *
- * Executed events will be pushed to @a dest.
- */
bool
-EventQueue::process(PostProcessor& dest, ProcessContext& context, bool limit)
+PreProcessor::process(PostProcessor& dest, ProcessContext& context, bool limit)
{
ThreadManager::assert_thread(THREAD_PROCESS);
@@ -111,7 +108,7 @@ EventQueue::process(PostProcessor& dest, ProcessContext& context, bool limit)
/** Pre-process a single event */
void
-EventQueue::_whipped()
+PreProcessor::_whipped()
{
Event* ev = _prepared_back.get();
if (!ev)
@@ -126,4 +123,3 @@ EventQueue::_whipped()
} // namespace Server
} // namespace Ingen
-
diff --git a/src/server/EventQueue.hpp b/src/server/PreProcessor.hpp
index edd6c6b4..4fd1c91e 100644
--- a/src/server/EventQueue.hpp
+++ b/src/server/PreProcessor.hpp
@@ -14,14 +14,12 @@
along with Ingen. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef INGEN_ENGINE_EVENTQUEUE_HPP
-#define INGEN_ENGINE_EVENTQUEUE_HPP
+#ifndef INGEN_ENGINE_PREPROCESSOR_HPP
+#define INGEN_ENGINE_PREPROCESSOR_HPP
-#include "raul/AtomicPtr.hpp"
-#include "raul/Slave.hpp"
+#include <glibmm/thread.h>
-#include "EventSink.hpp"
-#include "EventSource.hpp"
+#include "raul/Slave.hpp"
namespace Ingen {
namespace Server {
@@ -30,27 +28,33 @@ class Event;
class PostProcessor;
class ProcessContext;
-/** An EventSource which prepares events in its own thread.
- */
-class EventQueue : public EventSource
- , public EventSink
- , public Raul::Slave
+class PreProcessor : public Raul::Slave
{
public:
- explicit EventQueue();
- virtual ~EventQueue();
+ explicit PreProcessor();
- bool process(PostProcessor& dest, ProcessContext& context, bool limit=true);
+ ~PreProcessor();
- inline bool unprepared_events() const { return _prepared_back.get(); }
- inline bool empty() const { return !_head.get(); }
+ /** Return true iff no events are enqueued. */
+ inline bool empty() const { return !_head.get(); }
-protected:
+ /** Enqueue an event.
+ * This is safe to call from any non-realtime thread (it locks).
+ */
void event(Event* ev);
- virtual void _whipped(); ///< Prepare 1 event
+ /** Process events for a cycle.
+ * @return False iff this source is finished and should be removed.
+ */
+ bool process(PostProcessor& dest,
+ ProcessContext& context,
+ bool limit = true);
+
+protected:
+ virtual void _whipped(); ///< Prepare 1 event
private:
+ Glib::Mutex _mutex;
Raul::AtomicPtr<Event> _head;
Raul::AtomicPtr<Event> _prepared_back;
Raul::AtomicPtr<Event> _tail;
@@ -59,5 +63,5 @@ private:
} // namespace Server
} // namespace Ingen
-#endif // INGEN_ENGINE_EVENTQUEUE_HPP
+#endif // INGEN_ENGINE_PREPROCESSOR_HPP
diff --git a/src/server/ingen_engine.cpp b/src/server/ingen_engine.cpp
index 28659ec6..917e83cf 100644
--- a/src/server/ingen_engine.cpp
+++ b/src/server/ingen_engine.cpp
@@ -18,7 +18,6 @@
#include "ingen/shared/World.hpp"
#include "Engine.hpp"
#include "EventWriter.hpp"
-#include "EventQueue.hpp"
#include "util.hpp"
using namespace Ingen;
@@ -28,11 +27,9 @@ struct IngenEngineModule : public Ingen::Shared::Module {
Server::set_denormal_flags();
SharedPtr<Server::Engine> engine(new Server::Engine(world));
world->set_local_engine(engine);
- SharedPtr<Server::EventQueue> queue(new Server::EventQueue());
- SharedPtr<Server::EventWriter> interface(
- new Server::EventWriter(*engine.get(), *queue.get()));
- world->set_engine(interface);
- engine->add_event_source(queue);
+ if (!world->engine()) {
+ world->set_engine(SharedPtr<Interface>(engine->interface(), NullDeleter<Interface>));
+ }
assert(world->local_engine() == engine);
}
};
diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp
index 94e7344d..731c2f6f 100644
--- a/src/server/ingen_lv2.cpp
+++ b/src/server/ingen_lv2.cpp
@@ -45,7 +45,6 @@
#include "AudioBuffer.hpp"
#include "Driver.hpp"
#include "Engine.hpp"
-#include "EventQueue.hpp"
#include "EventWriter.hpp"
#include "PatchImpl.hpp"
#include "PostProcessor.hpp"
@@ -426,12 +425,10 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
plugin->main = new MainThread(engine);
plugin->main->set_name("Main");
- SharedPtr<Server::EventQueue> queue(new Server::EventQueue());
- SharedPtr<Server::EventWriter> interface(
- new Server::EventWriter(*engine.get(), *queue.get()));
+ SharedPtr<EventWriter> interface =
+ SharedPtr<EventWriter>(engine->interface(), NullDeleter<EventWriter>);
plugin->world->set_engine(interface);
- engine->add_event_source(queue);
Raul::Thread::get().set_context(Server::THREAD_PRE_PROCESS);
Server::ThreadManager::single_threaded = true;
@@ -451,23 +448,20 @@ ingen_instantiate(const LV2_Descriptor* descriptor,
context.locate(0, UINT_MAX, 0);
engine->post_processor()->set_end_time(UINT_MAX);
-
- queue->process(*engine->post_processor(), context, false);
+ engine->process_events(context);
engine->post_processor()->process();
plugin->world->parser()->parse_file(plugin->world,
plugin->world->engine().get(),
patch->filename);
- while (!queue->empty()) {
- queue->process(*engine->post_processor(), context, false);
+ while (engine->pending_events()) {
+ engine->process_events(context);
engine->post_processor()->process();
}
engine->deactivate();
- //plugin->world->load_module("osc_server");
-
return (LV2_Handle)plugin;
}
diff --git a/src/server/wscript b/src/server/wscript
index 823583ba..a043c0fa 100644
--- a/src/server/wscript
+++ b/src/server/wscript
@@ -13,7 +13,6 @@ def build(bld):
Engine.cpp
EngineStore.cpp
Event.cpp
- EventQueue.cpp
EventWriter.cpp
GraphObjectImpl.cpp
InputPort.cpp
@@ -31,6 +30,7 @@ def build(bld):
PluginImpl.cpp
PortImpl.cpp
PostProcessor.cpp
+ PreProcessor.cpp
ProcessContext.cpp
ProcessSlave.cpp
events/Connect.cpp