From baeb3c1872a989b69eb89fae04f93c59b06f258e Mon Sep 17 00:00:00 2001 From: David Robillard Date: Wed, 9 May 2012 01:14:30 +0000 Subject: Simply event interface design and make only one pre-process thread. This makes event pre-processing actually safe for multiple interfaces since multiple events will never be pre-processed simultaneously and the pre-process order is definitely the same as the execute order. git-svn-id: http://svn.drobilla.net/lad/trunk/ingen@4323 a436a847-0d15-0410-975c-d299462d15a1 --- src/server/Engine.cpp | 41 +++++------- src/server/Engine.hpp | 15 +++-- src/server/EventQueue.cpp | 129 ------------------------------------- src/server/EventQueue.hpp | 63 ------------------ src/server/EventSink.hpp | 37 ----------- src/server/EventSource.hpp | 58 ----------------- src/server/EventWriter.cpp | 26 ++++---- src/server/EventWriter.hpp | 11 +--- src/server/PreProcessor.cpp | 125 +++++++++++++++++++++++++++++++++++ src/server/PreProcessor.hpp | 67 +++++++++++++++++++ src/server/ingen_engine.cpp | 9 +-- src/server/ingen_lv2.cpp | 16 ++--- src/server/wscript | 2 +- src/socket/SocketInterface.cpp | 43 ++----------- src/socket/SocketInterface.hpp | 31 ++------- src/socket/SocketListener.cpp | 6 +- src/socket/ingen_socket_server.cpp | 1 - 17 files changed, 258 insertions(+), 422 deletions(-) delete mode 100644 src/server/EventQueue.cpp delete mode 100644 src/server/EventQueue.hpp delete mode 100644 src/server/EventSink.hpp delete mode 100644 src/server/EventSource.hpp create mode 100644 src/server/PreProcessor.cpp create mode 100644 src/server/PreProcessor.hpp (limited to 'src') diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index 7138d2ea..c2cc7cab 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -36,12 +36,12 @@ #include "Engine.hpp" #include "EngineStore.hpp" #include "Event.hpp" -#include "EventSource.hpp" #include "EventWriter.hpp" #include "MessageContext.hpp" #include "NodeFactory.hpp" #include "PatchImpl.hpp" #include "PostProcessor.hpp" +#include "PreProcessor.hpp" #include "ProcessContext.hpp" #include "ThreadManager.hpp" @@ -60,7 +60,9 @@ Engine::Engine(Ingen::Shared::World* a_world) , _maid(new Raul::Maid(event_queue_size())) , _message_context(new MessageContext(*this)) , _node_factory(new NodeFactory(a_world)) + , _pre_processor(new PreProcessor()) , _post_processor(new PostProcessor(*this)) + , _event_writer(new EventWriter(*this)) , _quit_flag(false) { if (a_world->store()) { @@ -122,15 +124,6 @@ Engine::main_iteration() return !_quit_flag; } -void -Engine::add_event_source(SharedPtr source) -{ - // FIXME: Not thread-safe - _maid->manage(source); - source->_next = _event_sources; - _event_sources = source; -} - void Engine::set_driver(SharedPtr driver) { @@ -235,24 +228,24 @@ Engine::deactivate() ThreadManager::single_threaded = true; } +bool +Engine::pending_events() +{ + return !_pre_processor->empty(); +} + +void +Engine::enqueue_event(Event* ev) +{ + ThreadManager::assert_not_thread(THREAD_PROCESS); + _pre_processor->event(ev); +} + void Engine::process_events(ProcessContext& context) { ThreadManager::assert_thread(THREAD_PROCESS); - - SharedPtr src = _event_sources; - SharedPtr prev = src; - for (; src; src = src->_next) { - if (!src->process(*_post_processor, context)) { - // Source is finished, remove - if (src == _event_sources) { - _event_sources = src->_next; - } else { - prev->_next = src->_next; - } - } - prev = src; - } + _pre_processor->process(*_post_processor, context); } void diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 5f218078..042264e4 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -17,8 +17,6 @@ #ifndef INGEN_ENGINE_ENGINE_HPP #define INGEN_ENGINE_ENGINE_HPP -#include - #include #include "ingen/EngineBase.hpp" @@ -38,10 +36,12 @@ class ClientBroadcaster; class ControlBindings; class Driver; class EngineStore; -class EventSource; +class Event; +class EventWriter; class MessageContext; class NodeFactory; class PostProcessor; +class PreProcessor; class ProcessContext; /** @@ -69,11 +69,14 @@ public: virtual bool unregister_client(const Raul::URI& uri); void set_driver(SharedPtr driver); - void add_event_source(SharedPtr source); + + bool pending_events(); + void enqueue_event(Event* ev); void process_events(ProcessContext& context); Ingen::Shared::World* world() const { return _world; } + EventWriter* interface() const { return _event_writer; } ClientBroadcaster* broadcaster() const { return _broadcaster; } BufferFactory* buffer_factory() const { return _buffer_factory; } ControlBindings* control_bindings() const { return _control_bindings; } @@ -97,9 +100,9 @@ private: Raul::Maid* _maid; MessageContext* _message_context; NodeFactory* _node_factory; + PreProcessor* _pre_processor; PostProcessor* _post_processor; - - SharedPtr _event_sources; ///< Intrusive linked list + EventWriter* _event_writer; bool _quit_flag; }; diff --git a/src/server/EventQueue.cpp b/src/server/EventQueue.cpp deleted file mode 100644 index 585eaa34..00000000 --- a/src/server/EventQueue.cpp +++ /dev/null @@ -1,129 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#include "Event.hpp" -#include "EventQueue.hpp" -#include "PostProcessor.hpp" -#include "ProcessContext.hpp" -#include "ThreadManager.hpp" - -using namespace std; - -namespace Ingen { -namespace Server { - -EventQueue::EventQueue() -{ - Thread::set_context(THREAD_PRE_PROCESS); - set_name("EventQueue"); - start(); -} - -EventQueue::~EventQueue() -{ - stop(); -} - -/** Push an unprepared event onto the queue. - */ -void -EventQueue::event(Event* const ev) -{ - assert(!ev->is_prepared()); - assert(!ev->next()); - - Event* const head = _head.get(); - Event* const tail = _tail.get(); - - if (!head) { - _head = ev; - _tail = ev; - } else { - _tail = ev; - tail->next(ev); - } - - if (!_prepared_back.get()) { - _prepared_back = ev; - } - - whip(); -} - -/** Process all events for a cycle. - * - * Executed events will be pushed to @a dest. - */ -bool -EventQueue::process(PostProcessor& dest, ProcessContext& context, bool limit) -{ - ThreadManager::assert_thread(THREAD_PROCESS); - - if (!_head.get()) - return true; - - /* Limit the maximum number of queued events to process per cycle. This - makes the process callback (more) realtime-safe by preventing being - choked by events coming in faster than they can be processed. - FIXME: test this and figure out a good value - */ - const size_t MAX_QUEUED_EVENTS = context.nframes() / 32; - - size_t num_events_processed = 0; - - Event* ev = _head.get(); - Event* last = ev; - - while (ev && ev->is_prepared() && ev->time() < context.end()) { - ev->execute(context); - last = ev; - ev = (Event*)ev->next(); - ++num_events_processed; - if (limit && (num_events_processed > MAX_QUEUED_EVENTS)) - break; - } - - if (num_events_processed > 0) { - Event* next = (Event*)last->next(); - last->next(NULL); - assert(!last->next()); - dest.append(_head.get(), last); - _head = next; - if (!next) - _tail = NULL; - } - - return true; -} - -/** Pre-process a single event */ -void -EventQueue::_whipped() -{ - Event* ev = _prepared_back.get(); - if (!ev) - return; - - assert(!ev->is_prepared()); - ev->pre_process(); - assert(ev->is_prepared()); - - _prepared_back = (Event*)ev->next(); -} - -} // namespace Server -} // namespace Ingen - diff --git a/src/server/EventQueue.hpp b/src/server/EventQueue.hpp deleted file mode 100644 index edd6c6b4..00000000 --- a/src/server/EventQueue.hpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#ifndef INGEN_ENGINE_EVENTQUEUE_HPP -#define INGEN_ENGINE_EVENTQUEUE_HPP - -#include "raul/AtomicPtr.hpp" -#include "raul/Slave.hpp" - -#include "EventSink.hpp" -#include "EventSource.hpp" - -namespace Ingen { -namespace Server { - -class Event; -class PostProcessor; -class ProcessContext; - -/** An EventSource which prepares events in its own thread. - */ -class EventQueue : public EventSource - , public EventSink - , public Raul::Slave -{ -public: - explicit EventQueue(); - virtual ~EventQueue(); - - bool process(PostProcessor& dest, ProcessContext& context, bool limit=true); - - inline bool unprepared_events() const { return _prepared_back.get(); } - inline bool empty() const { return !_head.get(); } - -protected: - void event(Event* ev); - - virtual void _whipped(); ///< Prepare 1 event - -private: - Raul::AtomicPtr _head; - Raul::AtomicPtr _prepared_back; - Raul::AtomicPtr _tail; -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_EVENTQUEUE_HPP - diff --git a/src/server/EventSink.hpp b/src/server/EventSink.hpp deleted file mode 100644 index e24fa5e9..00000000 --- a/src/server/EventSink.hpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#ifndef INGEN_ENGINE_EVENTSINK_HPP -#define INGEN_ENGINE_EVENTSINK_HPP - -namespace Ingen { -namespace Server { - -class Event; - -class EventSink -{ -public: - virtual ~EventSink() {} - - virtual void event(Event* ev) = 0; -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_EVENTSINK_HPP - diff --git a/src/server/EventSource.hpp b/src/server/EventSource.hpp deleted file mode 100644 index 320b6b7a..00000000 --- a/src/server/EventSource.hpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - This file is part of Ingen. - Copyright 2007-2012 David Robillard - - Ingen is free software: you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free - Software Foundation, either version 3 of the License, or any later version. - - Ingen is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. - - You should have received a copy of the GNU Affero General Public License - along with Ingen. If not, see . -*/ - -#ifndef INGEN_ENGINE_EVENTSOURCE_HPP -#define INGEN_ENGINE_EVENTSOURCE_HPP - -#include -#include - -namespace Ingen { -namespace Server { - -class Event; -class PostProcessor; -class ProcessContext; - -/** Source for events to run in the audio thread. - * - * The Driver gets events from an EventQueue in the process callback and - * executes them, then they are sent to the PostProcessor and finalised - * (post-processing thread). - */ -class EventSource : public Raul::Deletable -{ -public: - EventSource() {} - virtual ~EventSource() {} - - /** Process events for a cycle. - * @return False iff this source is finished and should be removed. - */ - virtual bool process(PostProcessor& dest, - ProcessContext& context, - bool limit = true) = 0; - -private: - friend class Engine; - SharedPtr _next; ///< Intrusive linked list for Engine -}; - -} // namespace Server -} // namespace Ingen - -#endif // INGEN_ENGINE_EVENTSOURCE_HPP - diff --git a/src/server/EventWriter.cpp b/src/server/EventWriter.cpp index 4725835b..ca8ad196 100644 --- a/src/server/EventWriter.cpp +++ b/src/server/EventWriter.cpp @@ -23,7 +23,6 @@ #include "ClientBroadcaster.hpp" #include "Driver.hpp" #include "Engine.hpp" -#include "EventQueue.hpp" #include "EventWriter.hpp" #include "events.hpp" @@ -35,9 +34,8 @@ using namespace Raul; namespace Ingen { namespace Server { -EventWriter::EventWriter(Engine& engine, EventSink& sink) +EventWriter::EventWriter(Engine& engine) : _request_client(NULL) - , _sink(sink) , _request_id(-1) , _engine(engine) , _in_bundle(false) @@ -92,7 +90,7 @@ EventWriter::put(const URI& uri, const Resource::Properties& properties, const Resource::Graph ctx) { - _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); + _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), true, ctx, uri, properties)); } void @@ -100,14 +98,14 @@ EventWriter::delta(const URI& uri, const Resource::Properties& remove, const Resource::Properties& add) { - _sink.event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); + _engine.enqueue_event(new Events::SetMetadata(_engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } void EventWriter::move(const Path& old_path, const Path& new_path) { - _sink.event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); + _engine.enqueue_event(new Events::Move(_engine, _request_client, _request_id, now(), old_path, new_path)); } void @@ -119,7 +117,7 @@ EventWriter::del(const URI& uri) } _engine.quit(); } else { - _sink.event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); + _engine.enqueue_event(new Events::Delete(_engine, _request_client, _request_id, now(), uri)); } } @@ -127,7 +125,7 @@ void EventWriter::connect(const Path& tail_path, const Path& head_path) { - _sink.event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); + _engine.enqueue_event(new Events::Connect(_engine, _request_client, _request_id, now(), tail_path, head_path)); } @@ -141,7 +139,7 @@ EventWriter::disconnect(const Path& src, return; } - _sink.event(new Events::Disconnect(_engine, _request_client, _request_id, now(), + _engine.enqueue_event(new Events::Disconnect(_engine, _request_client, _request_id, now(), Path(src.str()), Path(dst.str()))); } @@ -149,7 +147,7 @@ void EventWriter::disconnect_all(const Path& patch_path, const Path& path) { - _sink.event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); + _engine.enqueue_event(new Events::DisconnectAll(_engine, _request_client, _request_id, now(), patch_path, path)); } void @@ -161,16 +159,16 @@ EventWriter::set_property(const URI& uri, && value.type() == _engine.world()->forge().Bool) { if (value.get_bool()) { _engine.activate(); - _sink.event(new Events::Ping(_engine, _request_client, _request_id, now())); + _engine.enqueue_event(new Events::Ping(_engine, _request_client, _request_id, now())); } else { - _sink.event(new Events::Deactivate(_engine, _request_client, _request_id, now())); + _engine.enqueue_event(new Events::Deactivate(_engine, _request_client, _request_id, now())); } } else { Resource::Properties remove; remove.insert(make_pair(predicate, _engine.world()->uris()->wildcard)); Resource::Properties add; add.insert(make_pair(predicate, value)); - _sink.event(new Events::SetMetadata( + _engine.enqueue_event(new Events::SetMetadata( _engine, _request_client, _request_id, now(), false, Resource::DEFAULT, uri, add, remove)); } @@ -181,7 +179,7 @@ EventWriter::set_property(const URI& uri, void EventWriter::get(const URI& uri) { - _sink.event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); + _engine.enqueue_event(new Events::Get(_engine, _request_client, _request_id, now(), uri)); } } // namespace Server diff --git a/src/server/EventWriter.hpp b/src/server/EventWriter.hpp index 771d22f5..1c5d11db 100644 --- a/src/server/EventWriter.hpp +++ b/src/server/EventWriter.hpp @@ -31,17 +31,13 @@ namespace Ingen { namespace Server { class Engine; -class EventSink; -/** An Interface that creates and writes Events to an EventSink. - * - * This is where Interface calls get turned into Events which are actually - * processed by the engine to do things. +/** An Interface that creates and enqueues Events for the Engine to execute. */ class EventWriter : public Interface { public: - explicit EventWriter(Engine& engine, EventSink& sink); + explicit EventWriter(Engine& engine); virtual ~EventWriter(); Raul::URI uri() const { return "http://drobilla.net/ns/ingen#internal"; } @@ -87,7 +83,6 @@ public: protected: Interface* _request_client; - EventSink& _sink; int32_t _request_id; Engine& _engine; bool _in_bundle; ///< True iff a bundle is currently being received @@ -99,4 +94,4 @@ private: } // namespace Server } // namespace Ingen -#endif // INGEN_ENGINE_QUEUEDENGINEINTERFACE_HPP +#endif // INGEN_ENGINE_EVENTWRITER_HPP diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp new file mode 100644 index 00000000..a88b74e5 --- /dev/null +++ b/src/server/PreProcessor.cpp @@ -0,0 +1,125 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#include "Event.hpp" +#include "PostProcessor.hpp" +#include "PreProcessor.hpp" +#include "ProcessContext.hpp" +#include "ThreadManager.hpp" + +using namespace std; + +namespace Ingen { +namespace Server { + +PreProcessor::PreProcessor() +{ + Thread::set_context(THREAD_PRE_PROCESS); + set_name("PreProcessor"); + start(); +} + +PreProcessor::~PreProcessor() +{ + stop(); +} + +void +PreProcessor::event(Event* const ev) +{ + // TODO: Probably possible to make this lock-free with CAS + Glib::Mutex::Lock lock(_mutex); + + assert(!ev->is_prepared()); + assert(!ev->next()); + + Event* const head = _head.get(); + Event* const tail = _tail.get(); + + if (!head) { + _head = ev; + _tail = ev; + } else { + _tail = ev; + tail->next(ev); + } + + if (!_prepared_back.get()) { + _prepared_back = ev; + } + + whip(); +} + +bool +PreProcessor::process(PostProcessor& dest, ProcessContext& context, bool limit) +{ + ThreadManager::assert_thread(THREAD_PROCESS); + + if (!_head.get()) + return true; + + /* Limit the maximum number of queued events to process per cycle. This + makes the process callback (more) realtime-safe by preventing being + choked by events coming in faster than they can be processed. + FIXME: test this and figure out a good value + */ + const size_t MAX_QUEUED_EVENTS = context.nframes() / 32; + + size_t num_events_processed = 0; + + Event* ev = _head.get(); + Event* last = ev; + + while (ev && ev->is_prepared() && ev->time() < context.end()) { + ev->execute(context); + last = ev; + ev = (Event*)ev->next(); + ++num_events_processed; + if (limit && (num_events_processed > MAX_QUEUED_EVENTS)) + break; + } + + if (num_events_processed > 0) { + Event* next = (Event*)last->next(); + last->next(NULL); + assert(!last->next()); + dest.append(_head.get(), last); + _head = next; + if (!next) + _tail = NULL; + } + + return true; +} + +/** Pre-process a single event */ +void +PreProcessor::_whipped() +{ + Event* ev = _prepared_back.get(); + if (!ev) + return; + + assert(!ev->is_prepared()); + ev->pre_process(); + assert(ev->is_prepared()); + + _prepared_back = (Event*)ev->next(); +} + +} // namespace Server +} // namespace Ingen diff --git a/src/server/PreProcessor.hpp b/src/server/PreProcessor.hpp new file mode 100644 index 00000000..4fd1c91e --- /dev/null +++ b/src/server/PreProcessor.hpp @@ -0,0 +1,67 @@ +/* + This file is part of Ingen. + Copyright 2007-2012 David Robillard + + Ingen is free software: you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or any later version. + + Ingen is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for details. + + You should have received a copy of the GNU Affero General Public License + along with Ingen. If not, see . +*/ + +#ifndef INGEN_ENGINE_PREPROCESSOR_HPP +#define INGEN_ENGINE_PREPROCESSOR_HPP + +#include + +#include "raul/Slave.hpp" + +namespace Ingen { +namespace Server { + +class Event; +class PostProcessor; +class ProcessContext; + +class PreProcessor : public Raul::Slave +{ +public: + explicit PreProcessor(); + + ~PreProcessor(); + + /** Return true iff no events are enqueued. */ + inline bool empty() const { return !_head.get(); } + + /** Enqueue an event. + * This is safe to call from any non-realtime thread (it locks). + */ + void event(Event* ev); + + /** Process events for a cycle. + * @return False iff this source is finished and should be removed. + */ + bool process(PostProcessor& dest, + ProcessContext& context, + bool limit = true); + +protected: + virtual void _whipped(); ///< Prepare 1 event + +private: + Glib::Mutex _mutex; + Raul::AtomicPtr _head; + Raul::AtomicPtr _prepared_back; + Raul::AtomicPtr _tail; +}; + +} // namespace Server +} // namespace Ingen + +#endif // INGEN_ENGINE_PREPROCESSOR_HPP + diff --git a/src/server/ingen_engine.cpp b/src/server/ingen_engine.cpp index 28659ec6..917e83cf 100644 --- a/src/server/ingen_engine.cpp +++ b/src/server/ingen_engine.cpp @@ -18,7 +18,6 @@ #include "ingen/shared/World.hpp" #include "Engine.hpp" #include "EventWriter.hpp" -#include "EventQueue.hpp" #include "util.hpp" using namespace Ingen; @@ -28,11 +27,9 @@ struct IngenEngineModule : public Ingen::Shared::Module { Server::set_denormal_flags(); SharedPtr engine(new Server::Engine(world)); world->set_local_engine(engine); - SharedPtr queue(new Server::EventQueue()); - SharedPtr interface( - new Server::EventWriter(*engine.get(), *queue.get())); - world->set_engine(interface); - engine->add_event_source(queue); + if (!world->engine()) { + world->set_engine(SharedPtr(engine->interface(), NullDeleter)); + } assert(world->local_engine() == engine); } }; diff --git a/src/server/ingen_lv2.cpp b/src/server/ingen_lv2.cpp index 94e7344d..731c2f6f 100644 --- a/src/server/ingen_lv2.cpp +++ b/src/server/ingen_lv2.cpp @@ -45,7 +45,6 @@ #include "AudioBuffer.hpp" #include "Driver.hpp" #include "Engine.hpp" -#include "EventQueue.hpp" #include "EventWriter.hpp" #include "PatchImpl.hpp" #include "PostProcessor.hpp" @@ -426,12 +425,10 @@ ingen_instantiate(const LV2_Descriptor* descriptor, plugin->main = new MainThread(engine); plugin->main->set_name("Main"); - SharedPtr queue(new Server::EventQueue()); - SharedPtr interface( - new Server::EventWriter(*engine.get(), *queue.get())); + SharedPtr interface = + SharedPtr(engine->interface(), NullDeleter); plugin->world->set_engine(interface); - engine->add_event_source(queue); Raul::Thread::get().set_context(Server::THREAD_PRE_PROCESS); Server::ThreadManager::single_threaded = true; @@ -451,23 +448,20 @@ ingen_instantiate(const LV2_Descriptor* descriptor, context.locate(0, UINT_MAX, 0); engine->post_processor()->set_end_time(UINT_MAX); - - queue->process(*engine->post_processor(), context, false); + engine->process_events(context); engine->post_processor()->process(); plugin->world->parser()->parse_file(plugin->world, plugin->world->engine().get(), patch->filename); - while (!queue->empty()) { - queue->process(*engine->post_processor(), context, false); + while (engine->pending_events()) { + engine->process_events(context); engine->post_processor()->process(); } engine->deactivate(); - //plugin->world->load_module("osc_server"); - return (LV2_Handle)plugin; } diff --git a/src/server/wscript b/src/server/wscript index 823583ba..a043c0fa 100644 --- a/src/server/wscript +++ b/src/server/wscript @@ -13,7 +13,6 @@ def build(bld): Engine.cpp EngineStore.cpp Event.cpp - EventQueue.cpp EventWriter.cpp GraphObjectImpl.cpp InputPort.cpp @@ -31,6 +30,7 @@ def build(bld): PluginImpl.cpp PortImpl.cpp PostProcessor.cpp + PreProcessor.cpp ProcessContext.cpp ProcessSlave.cpp events/Connect.cpp diff --git a/src/socket/SocketInterface.cpp b/src/socket/SocketInterface.cpp index 430f0962..8b9aab66 100644 --- a/src/socket/SocketInterface.cpp +++ b/src/socket/SocketInterface.cpp @@ -24,21 +24,18 @@ #include "sratom/sratom.h" #include "SocketInterface.hpp" -#include "../server/Event.hpp" -#include "../server/PostProcessor.hpp" -#include "../server/ThreadManager.hpp" - #define LOG(s) s << "[SocketInterface] " namespace Ingen { namespace Socket { -SocketInterface::SocketInterface(Ingen::Shared::World& world, int conn) +SocketInterface::SocketInterface(Ingen::Shared::World& world, + Interface& iface, + int conn) : _world(world) - , _iface(*(Server::Engine*)world.local_engine().get(), *this) + , _iface(iface) , _inserter(NULL) , _msg_node(NULL) - , _event(NULL) , _conn(conn) { set_name("SocketInterface"); @@ -47,38 +44,11 @@ SocketInterface::SocketInterface(Ingen::Shared::World& world, int conn) SocketInterface::~SocketInterface() { - std::cerr << "SOCKET INTERFACE EXITING" << std::endl; stop(); join(); close(_conn); } -void -SocketInterface::event(Server::Event* ev) -{ - if (_event) { - std::cerr << "DUAL EVENTS" << std::endl; - return; - } - assert(!_event); - ev->pre_process(); - _event = ev; - _event->next(NULL); -} - -bool -SocketInterface::process(Server::PostProcessor& dest, - Server::ProcessContext& context, - bool limit) -{ - if (_event) { - _event->execute(context); - dest.append(_event, _event); - _event = NULL; - } - return (_conn != -1); -} - SerdStatus SocketInterface::set_base_uri(SocketInterface* iface, const SerdNode* uri_node) @@ -118,8 +88,6 @@ SocketInterface::write_statement(SocketInterface* iface, void SocketInterface::_run() { - Thread::set_context(Server::THREAD_PRE_PROCESS); - Sord::World* world = _world.rdf_world(); LV2_URID_Map* map = &_world.lv2_uri_map()->urid_map_feature()->urid_map; @@ -206,14 +174,13 @@ SocketInterface::_run() } fclose(f); - sord_inserter_free(_inserter); serd_reader_end_stream(reader); sratom_free(sratom); serd_reader_free(reader); sord_free(model); - _conn = -1; + delete this; } } // namespace Ingen diff --git a/src/socket/SocketInterface.hpp b/src/socket/SocketInterface.hpp index b8d339f8..5e4daaf2 100644 --- a/src/socket/SocketInterface.hpp +++ b/src/socket/SocketInterface.hpp @@ -14,17 +14,10 @@ along with Ingen. If not, see . */ -#include - #include "ingen/Interface.hpp" -#include "raul/SharedPtr.hpp" #include "raul/Thread.hpp" #include "sord/sord.h" -#include "../server/EventSink.hpp" -#include "../server/EventSource.hpp" -#include "../server/EventWriter.hpp" - namespace Ingen { namespace Shared { class World; } @@ -32,18 +25,9 @@ namespace Shared { class World; } namespace Socket { class SocketInterface : public Raul::Thread - , public Server::EventSink - , public Server::EventSource { public: - SocketInterface(Shared::World& world, int conn); - - void event(Server::Event* ev); - - bool process(Server::PostProcessor& dest, - Server::ProcessContext& context, - bool limit = true); - + SocketInterface(Shared::World& world, Interface& iface, int conn); ~SocketInterface(); SordInserter* inserter() { return _inserter; } @@ -67,13 +51,12 @@ private: const SerdNode* object_datatype, const SerdNode* object_lang); - Shared::World& _world; - Server::EventWriter _iface; - SerdEnv* _env; - SordInserter* _inserter; - SordNode* _msg_node; - Server::Event* _event; - int _conn; + Shared::World& _world; + Interface& _iface; + SerdEnv* _env; + SordInserter* _inserter; + SordNode* _msg_node; + int _conn; }; } // namespace Ingen diff --git a/src/socket/SocketListener.cpp b/src/socket/SocketListener.cpp index c2ff1c10..aeb50c5f 100644 --- a/src/socket/SocketListener.cpp +++ b/src/socket/SocketListener.cpp @@ -25,6 +25,7 @@ #include "sratom/sratom.h" #include "../server/Engine.hpp" +#include "../server/EventWriter.hpp" #include "SocketListener.hpp" #include "SocketInterface.hpp" @@ -36,6 +37,8 @@ namespace Socket { SocketListener::SocketListener(Ingen::Shared::World& world) : _world(world) { + set_name("SocketListener"); + // Create server socket _sock = socket(AF_UNIX, SOCK_STREAM, 0); if (_sock == -1) { @@ -91,8 +94,7 @@ SocketListener::_run() // Make an new interface/thread to handle the connection Server::Engine* engine = (Server::Engine*)_world.local_engine().get(); - SharedPtr iface(new SocketInterface(_world, conn)); - engine->add_event_source(iface); + new SocketInterface(_world, *engine->interface(), conn); } } diff --git a/src/socket/ingen_socket_server.cpp b/src/socket/ingen_socket_server.cpp index 71b8ad64..5bf679bb 100644 --- a/src/socket/ingen_socket_server.cpp +++ b/src/socket/ingen_socket_server.cpp @@ -21,7 +21,6 @@ #include "../server/Engine.hpp" #include "../server/EventWriter.hpp" -#include "../server/EventQueue.hpp" #include "SocketListener.hpp" -- cgit v1.2.1