diff options
author | David Robillard <d@drobilla.net> | 2012-05-09 01:14:30 +0000 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2012-05-09 01:14:30 +0000 |
commit | baeb3c1872a989b69eb89fae04f93c59b06f258e (patch) | |
tree | 83720f9c7bad7ab26c910180d8caea2fd4cb7e70 /src/server | |
parent | 4d46a232b30be99bc34e581cbc636345f77c6bc4 (diff) | |
download | ingen-baeb3c1872a989b69eb89fae04f93c59b06f258e.tar.gz ingen-baeb3c1872a989b69eb89fae04f93c59b06f258e.tar.bz2 ingen-baeb3c1872a989b69eb89fae04f93c59b06f258e.zip |
Simply event interface design and make only one pre-process thread.
This makes event pre-processing actually safe for multiple interfaces since multiple events will never be pre-processed simultaneously and the pre-process order is definitely the same as the execute order.
git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4323 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/Engine.cpp | 41 | ||||
-rw-r--r-- | src/server/Engine.hpp | 15 | ||||
-rw-r--r-- | src/server/EventSink.hpp | 37 | ||||
-rw-r--r-- | src/server/EventSource.hpp | 58 | ||||
-rw-r--r-- | src/server/EventWriter.cpp | 26 | ||||
-rw-r--r-- | src/server/EventWriter.hpp | 11 | ||||
-rw-r--r-- | src/server/PreProcessor.cpp (renamed from src/server/EventQueue.cpp) | 24 | ||||
-rw-r--r-- | src/server/PreProcessor.hpp (renamed from src/server/EventQueue.hpp) | 42 | ||||
-rw-r--r-- | src/server/ingen_engine.cpp | 9 | ||||
-rw-r--r-- | src/server/ingen_lv2.cpp | 16 | ||||
-rw-r--r-- | src/server/wscript | 2 |
11 files changed, 83 insertions, 198 deletions
diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 7138d2ea..c2cc7cab 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -36,12 +36,12 @@ #include "Engine.hpp" #include "EngineStore.hpp" #include "Event.hpp" -#include "EventSource.hpp" #include "EventWriter.hpp" #include "MessageContext.hpp" #include "NodeFactory.hpp" #include "PatchImpl.hpp" #include "PostProcessor.hpp" +#include "PreProcessor.hpp" #include "ProcessContext.hpp" #include "ThreadManager.hpp" @@ -60,7 +60,9 @@ Engine::Engine(Ingen::Shared::World* a_world) , _maid(new Raul::Maid(event_queue_size())) , _message_context(new MessageContext(*this)) , _node_factory(new NodeFactory(a_world)) + , _pre_processor(new PreProcessor()) , _post_processor(new PostProcessor(*this)) + , _event_writer(new EventWriter(*this)) , _quit_flag(false) { if (a_world->store()) { @@ -123,15 +125,6 @@ Engine::main_iteration() } void -Engine::add_event_source(SharedPtr<EventSource> source) -{ - // FIXME: Not thread-safe - _maid->manage(source); - source->_next = _event_sources; - _event_sources = source; -} - -void Engine::set_driver(SharedPtr<Driver> driver) { _driver = driver; @@ -235,24 +228,24 @@ Engine::deactivate() ThreadManager::single_threaded = true; } +bool +Engine::pending_events() +{ + return !_pre_processor->empty(); +} + +void +Engine::enqueue_event(Event* ev) +{ + ThreadManager::assert_not_thread(THREAD_PROCESS); + _pre_processor->event(ev); +} + void Engine::process_events(ProcessContext& context) { ThreadManager::assert_thread(THREAD_PROCESS); - - SharedPtr<EventSource> src = _event_sources; - SharedPtr<EventSource> prev = src; - for (; src; src = src->_next) { - if (!src->process(*_post_processor, context)) { - // Source is finished, remove - if (src == _event_sources) { - _event_sources = src->_next; - } else { - prev->_next = src->_next; - } - } - prev = src; - } + _pre_processor->process(*_post_processor, context); } void diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 5f218078..042264e4 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -17,8 +17,6 @@ #ifndef INGEN_ENGINE_ENGINE_HPP #define INGEN_ENGINE_ENGINE_HPP -#include <vector> - #include <boost/utility.hpp> #include "ingen/EngineBase.hpp" @@ -38,10 +36,12 @@ class ClientBroadcaster; class ControlBindings; class Driver; class EngineStore; -class EventSource; +class Event; +class EventWriter; class MessageContext; class NodeFactory; class PostProcessor; +class PreProcessor; class ProcessContext; /** @@ -69,11 +69,14 @@ public: virtual bool unregister_client(const Raul::URI& uri); void set_driver(SharedPtr<Driver> driver); - void add_event_source(SharedPtr<EventSource> source); + + bool pending_events(); + void enqueue_event(Event* ev); void process_events(ProcessContext& context); Ingen::Shared::World* world() const { return _world; } + EventWriter* interface() const { return _event_writer; } ClientBroadcaster* broadcaster() const { return _broadcaster; } BufferFactory* buffer_factory() const { return _buffer_factory; } ControlBindings* control_bindings() const { return _control_bindings; } @@ -97,9 +100,9 @@ private: Raul::Maid* _maid; MessageContext* _message_context; NodeFactory* _node_factory; + PreProcessor* _pre_processor; PostProcessor* _post_processor; - - SharedPtr<EventSource> _event_sources; ///< Intrusive linked list + EventWriter* _event_writer; bool _quit_flag; }; diff --git a/src/server/EventSink.hpp b/src/server/EventSink.hpp deleted file mode 100644 index e24fa5e9..00000000 --- a/src/server/EventSink.hpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef INGEN_ENGINE_EVENTSINK_HPP -#define INGEN_ENGINE_EVENTSINK_HPP - -namespace Ingen { -namespace Server { - -class Event; - -class EventSink -{ -public: - virtual ~EventSink() {} - - virtual void event(Event* ev) = 0; -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_EVENTSINK_HPP - diff --git a/src/server/EventSource.hpp b/src/server/EventSource.hpp deleted file mode 100644 index 320b6b7a..00000000 --- a/src/server/EventSource.hpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard <http://drobilla.net/> - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef INGEN_ENGINE_EVENTSOURCE_HPP -#define INGEN_ENGINE_EVENTSOURCE_HPP - -#include <raul/SharedPtr.hpp> -#include <raul/Deletable.hpp> - -namespace Ingen { -namespace Server { - -class Event; -class PostProcessor; -class ProcessContext; - -/** Source for events to run in the audio thread. - * - * The Driver gets events from an EventQueue in the process callback and - * executes them, then they are sent to the PostProcessor and finalised - * (post-processing thread). - */ -class EventSource : public Raul::Deletable -{ -public: - EventSource() {} - virtual ~EventSource() {} - - /** Process events for a cycle. - * @return False iff this source is finished and should be removed. - */ - virtual bool process(PostProcessor& dest, - ProcessContext& context, - bool limit = true) = 0; - -private: - friend class Engine; - SharedPtr<EventSource> _next; ///< Intrusive linked list for Engine -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_EVENTSOURCE_HPP - diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp index 4725835b..ca8ad196 100644 --- a/src/server/EventWriter.cpp +++ b/src/server/EventWriter.cpp @@ -23,7 +23,6 @@ #include "ClientBroadcaster.hpp" #include "Driver.hpp" #include "Engine.hpp" -#include "EventQueue.hpp" #include "EventWriter.hpp" #include "events.hpp" @@ -35,9 +34,8 @@ using namespace Raul; namespace Ingen { namespace Server { -EventWriter::EventWriter(Engine& engine, EventSink& sink) +EventWriter::EventWriter(Engine& engine) : _request_client(NULL) - , _sink(sink) , _request_id(-1) , _engine(engine) , _in_bundle(false) @@ -92,7 +90,7 @@ EventWriter::put(const URI& uri, const Resource::Properties& properties, const Resource::Graph ctx) { - _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); + _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); } void @@ -100,14 +98,14 @@ EventWriter::delta(const URI& uri, const Resource::Properties& remove, const Resource::Properties& add) { - _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); + _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } void EventWriter::move(const Path& old_path, const Path& new_path) { - _sink.event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); + _engine.enqueue_event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); } void @@ -119,7 +117,7 @@ EventWriter::del(const URI& uri) } _engine.quit(); } else { - _sink.event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); + _engine.enqueue_event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); } } @@ -127,7 +125,7 @@ void EventWriter::connect(const Path& tail_path, const Path& head_path) { - _sink.event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); + _engine.enqueue_event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); } @@ -141,7 +139,7 @@ EventWriter::disconnect(const Path& src, return; } - _sink.event(new Events::Disconnect(_engine, _request_client, _request_id, now(), + _engine.enqueue_event(new Events::Disconnect(_engine, _request_client, _request_id, now(), Path(src.str()), Path(dst.str()))); } @@ -149,7 +147,7 @@ void EventWriter::disconnect_all(const Path& patch_path, const Path& path) { - _sink.event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); + _engine.enqueue_event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); } void @@ -161,16 +159,16 @@ EventWriter::set_property(const URI& uri, && value.type() == _engine.world()->forge().Bool) { if (value.get_bool()) { _engine.activate(); - _sink.event(new Events::Ping(_engine, _request_client, _request_id, now())); + _engine.enqueue_event(new Events::Ping(_engine, _request_client, _request_id, now())); } else { - _sink.event(new Events::Deactivate(_engine, _request_client, _request_id, now())); + _engine.enqueue_event(new Events::Deactivate(_engine, _request_client, _request_id, now())); } } else { Resource::Properties remove; remove.insert(make_pair(predicate, _engine.world()->uris()->wildcard)); Resource::Properties add; add.insert(make_pair(predicate, value)); - _sink.event(new Events::SetMetadata( + _engine.enqueue_event(new Events::SetMetadata( _engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } @@ -181,7 +179,7 @@ EventWriter::set_property(const URI& uri, void EventWriter::get(const URI& uri) { - _sink.event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); + _engine.enqueue_event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); } } // namespace Server diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp index 771d22f5..1c5d11db 100644 --- a/src/server/EventWriter.hpp +++ b/src/server/EventWriter.hpp @@ -31,17 +31,13 @@ namespace Ingen { namespace Server { class Engine; -class EventSink; -/** An Interface that creates and writes Events to an EventSink. - * - * This is where Interface calls get turned into Events which are actually - * processed by the engine to do things. +/** An Interface that creates and enqueues Events for the Engine to execute. */ class EventWriter : public Interface { public: - explicit EventWriter(Engine& engine, EventSink& sink); + explicit EventWriter(Engine& engine); virtual ~EventWriter(); Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } @@ -87,7 +83,6 @@ public: protected: Interface* _request_client; - EventSink& _sink; int32_t _request_id; Engine& _engine; bool _in_bundle; ///< True iff a bundle is currently being received @@ -99,4 +94,4 @@ private: } // namespace Server } // namespace Ingen -#endif // INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP +#endif // INGEN_ENGINE_EVENTWRITER_HPP diff --git a/src/server/EventQueue.cpp b/src/server/PreProcessor.cpp index 585eaa34..a88b74e5 100644 --- a/src/server/EventQueue.cpp +++ b/src/server/PreProcessor.cpp @@ -15,8 +15,8 @@ */ #include "Event.hpp" -#include "EventQueue.hpp" #include "PostProcessor.hpp" +#include "PreProcessor.hpp" #include "ProcessContext.hpp" #include "ThreadManager.hpp" @@ -25,23 +25,24 @@ using namespace std; namespace Ingen { namespace Server { -EventQueue::EventQueue() +PreProcessor::PreProcessor() { Thread::set_context(THREAD_PRE_PROCESS); - set_name("EventQueue"); + set_name("PreProcessor"); start(); } -EventQueue::~EventQueue() +PreProcessor::~PreProcessor() { stop(); } -/** Push an unprepared event onto the queue. - */ void -EventQueue::event(Event* const ev) +PreProcessor::event(Event* const ev) { + // TODO: Probably possible to make this lock-free with CAS + Glib::Mutex::Lock lock(_mutex); + assert(!ev->is_prepared()); assert(!ev->next()); @@ -63,12 +64,8 @@ EventQueue::event(Event* const ev) whip(); } -/** Process all events for a cycle. - * - * Executed events will be pushed to @a dest. - */ bool -EventQueue::process(PostProcessor& dest, ProcessContext& context, bool limit) +PreProcessor::process(PostProcessor& dest, ProcessContext& context, bool limit) { ThreadManager::assert_thread(THREAD_PROCESS); @@ -111,7 +108,7 @@ EventQueue::process(PostProcessor& dest, ProcessContext& context, bool limit) /** Pre-process a single event */ void -EventQueue::_whipped() +PreProcessor::_whipped() { Event* ev = _prepared_back.get(); if (!ev) @@ -126,4 +123,3 @@ EventQueue::_whipped() } // namespace Server } // namespace Ingen - diff --git a/src/server/EventQueue.hpp b/src/server/PreProcessor.hpp index edd6c6b4..4fd1c91e 100644 --- a/src/server/EventQueue.hpp +++ b/src/server/PreProcessor.hpp @@ -14,14 +14,12 @@ along with Ingen. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef INGEN_ENGINE_EVENTQUEUE_HPP -#define INGEN_ENGINE_EVENTQUEUE_HPP +#ifndef INGEN_ENGINE_PREPROCESSOR_HPP +#define INGEN_ENGINE_PREPROCESSOR_HPP -#include "raul/AtomicPtr.hpp" -#include "raul/Slave.hpp" +#include <glibmm/thread.h> -#include "EventSink.hpp" -#include "EventSource.hpp" +#include "raul/Slave.hpp" namespace Ingen { namespace Server { @@ -30,27 +28,33 @@ class Event; class PostProcessor; class ProcessContext; -/** An EventSource which prepares events in its own thread. - */ -class EventQueue : public EventSource - , public EventSink - , public Raul::Slave +class PreProcessor : public Raul::Slave { public: - explicit EventQueue(); - virtual ~EventQueue(); + explicit PreProcessor(); - bool process(PostProcessor& dest, ProcessContext& context, bool limit=true); + ~PreProcessor(); - inline bool unprepared_events() const { return _prepared_back.get(); } - inline bool empty() const { return !_head.get(); } + /** Return true iff no events are enqueued. */ + inline bool empty() const { return !_head.get(); } -protected: + /** Enqueue an event. + * This is safe to call from any non-realtime thread (it locks). + */ void event(Event* ev); - virtual void _whipped(); ///< Prepare 1 event + /** Process events for a cycle. + * @return False iff this source is finished and should be removed. + */ + bool process(PostProcessor& dest, + ProcessContext& context, + bool limit = true); + +protected: + virtual void _whipped(); ///< Prepare 1 event private: + Glib::Mutex _mutex; Raul::AtomicPtr<Event> _head; Raul::AtomicPtr<Event> _prepared_back; Raul::AtomicPtr<Event> _tail; @@ -59,5 +63,5 @@ private: } // namespace Server } // namespace Ingen -#endif // INGEN_ENGINE_EVENTQUEUE_HPP +#endif // INGEN_ENGINE_PREPROCESSOR_HPP diff --git a/src/server/ingen_engine.cpp b/src/server/ingen_engine.cpp index 28659ec6..917e83cf 100644 --- a/src/server/ingen_engine.cpp +++ b/src/server/ingen_engine.cpp @@ -18,7 +18,6 @@ #include "ingen/shared/World.hpp" #include "Engine.hpp" #include "EventWriter.hpp" -#include "EventQueue.hpp" #include "util.hpp" using namespace Ingen; @@ -28,11 +27,9 @@ struct IngenEngineModule : public Ingen::Shared::Module { Server::set_denormal_flags(); SharedPtr<Server::Engine> engine(new Server::Engine(world)); world->set_local_engine(engine); - SharedPtr<Server::EventQueue> queue(new Server::EventQueue()); - SharedPtr<Server::EventWriter> interface( - new Server::EventWriter(*engine.get(), *queue.get())); - world->set_engine(interface); - engine->add_event_source(queue); + if (!world->engine()) { + world->set_engine(SharedPtr<Interface>(engine->interface(), NullDeleter<Interface>)); + } assert(world->local_engine() == engine); } }; diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 94e7344d..731c2f6f 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -45,7 +45,6 @@ #include "AudioBuffer.hpp" #include "Driver.hpp" #include "Engine.hpp" -#include "EventQueue.hpp" #include "EventWriter.hpp" #include "PatchImpl.hpp" #include "PostProcessor.hpp" @@ -426,12 +425,10 @@ ingen_instantiate(const LV2_Descriptor* descriptor, plugin->main = new MainThread(engine); plugin->main->set_name("Main"); - SharedPtr<Server::EventQueue> queue(new Server::EventQueue()); - SharedPtr<Server::EventWriter> interface( - new Server::EventWriter(*engine.get(), *queue.get())); + SharedPtr<EventWriter> interface = + SharedPtr<EventWriter>(engine->interface(), NullDeleter<EventWriter>); plugin->world->set_engine(interface); - engine->add_event_source(queue); Raul::Thread::get().set_context(Server::THREAD_PRE_PROCESS); Server::ThreadManager::single_threaded = true; @@ -451,23 +448,20 @@ ingen_instantiate(const LV2_Descriptor* descriptor, context.locate(0, UINT_MAX, 0); engine->post_processor()->set_end_time(UINT_MAX); - - queue->process(*engine->post_processor(), context, false); + engine->process_events(context); engine->post_processor()->process(); plugin->world->parser()->parse_file(plugin->world, plugin->world->engine().get(), patch->filename); - while (!queue->empty()) { - queue->process(*engine->post_processor(), context, false); + while (engine->pending_events()) { + engine->process_events(context); engine->post_processor()->process(); } engine->deactivate(); - //plugin->world->load_module("osc_server"); - return (LV2_Handle)plugin; } diff --git a/src/server/wscript b/src/server/wscript index 823583ba..a043c0fa 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -13,7 +13,6 @@ def build(bld): Engine.cpp EngineStore.cpp Event.cpp - EventQueue.cpp EventWriter.cpp GraphObjectImpl.cpp InputPort.cpp @@ -31,6 +30,7 @@ def build(bld): PluginImpl.cpp PortImpl.cpp PostProcessor.cpp + PreProcessor.cpp ProcessContext.cpp ProcessSlave.cpp events/Connect.cpp |