From a7f83c70733288ee9efee5c08330fbdf638446db Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sun, 18 Sep 2016 21:29:30 -0400 Subject: Atomic bundle execution --- src/Configuration.cpp | 1 + src/server/Engine.cpp | 1 + src/server/Engine.hpp | 2 ++ src/server/Event.hpp | 6 +++++ src/server/PreProcessor.cpp | 54 ++++++++++++++++++++++++++++++++++++++++----- src/server/UndoStack.cpp | 10 +++++---- src/server/UndoStack.hpp | 4 ++-- src/server/events/Mark.cpp | 27 +++++++++++++++++++++-- src/server/events/Mark.hpp | 3 +++ 9 files changed, 95 insertions(+), 13 deletions(-) diff --git a/src/Configuration.cpp b/src/Configuration.cpp index 37a5c59c..dfb52781 100644 --- a/src/Configuration.cpp +++ b/src/Configuration.cpp @@ -47,6 +47,7 @@ Configuration::Configuration(Forge& forge) " ingen -eg foo.ingen # Run engine and GUI and load a graph") , _max_name_length(0) { + add("atomicBundles", "atomic-bundles", 'a', "Execute bundles atomically", SESSION, forge.Bool, forge.make(false)); add("clientPort", "client-port", 'C', "Client port", SESSION, forge.Int, Atom()); add("connect", "connect", 'c', "Connect to engine URI", SESSION, forge.String, forge.alloc("unix:///tmp/ingen.sock")); add("engine", "engine", 'e', "Run (JACK) engine", SESSION, forge.Bool, forge.make(false)); diff --git a/src/server/Engine.cpp b/src/server/Engine.cpp index c7ad5c09..097b38fc 100644 --- a/src/server/Engine.cpp +++ b/src/server/Engine.cpp @@ -88,6 +88,7 @@ Engine::Engine(Ingen::World* world) , _uniform_dist(0.0f, 1.0f) , _quit_flag(false) , _direct_driver(true) + , _atomic_bundles(world->conf().option("atomic-bundles").get()) { if (!world->store()) { world->set_store(SPtr(new Store())); diff --git a/src/server/Engine.hpp b/src/server/Engine.hpp index 32f284e0..16699293 100644 --- a/src/server/Engine.hpp +++ b/src/server/Engine.hpp @@ -120,6 +120,7 @@ public: SPtr store() const; size_t event_queue_size() const; + bool atomic_bundles() const { return _atomic_bundles; } private: Ingen::World* _world; @@ -150,6 +151,7 @@ private: bool _quit_flag; bool _direct_driver; + bool _atomic_bundles; }; } // namespace Server diff --git a/src/server/Event.hpp b/src/server/Event.hpp index 11bffeb3..8ed25c0f 100644 --- a/src/server/Event.hpp +++ b/src/server/Event.hpp @@ -56,6 +56,9 @@ public: /** Event mode to distinguish normal events from undo events. */ enum class Mode { NORMAL, UNDO, REDO }; + /** Execution mode for events that block and unblock preprocessing. */ + enum class Execution { NORMAL, BLOCK, UNBLOCK }; + /** Pre-process event before execution (non-realtime). */ virtual bool pre_process() = 0; @@ -86,6 +89,9 @@ public: /** Return the status (success or error code) of this event. */ Status status() const { return _status; } + /** Return the blocking behaviour of this event (after pre_process()) */ + virtual Execution get_execution() const { return Execution::NORMAL; } + /** Return undo mode of this event. */ Mode get_mode() const { return _mode; } diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index e3e5475f..2ea4ac30 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -38,6 +38,7 @@ PreProcessor::PreProcessor(Engine& engine) , _head(NULL) , _prepared_back(NULL) , _tail(NULL) + , _block_state(BlockState::UNBLOCKED) , _exit_flag(false) , _thread(&PreProcessor::run, this) {} @@ -87,16 +88,44 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) size_t n_processed = 0; Event* ev = head; Event* last = ev; - while (ev && ev->is_prepared() && ev->time() < context.end()) { - if (ev->time() < context.start()) { - // Didn't get around to executing in time, oh well... - ev->set_time(context.start()); + while (ev && ev->is_prepared()) { + switch (_block_state) { + case BlockState::UNBLOCKED: + break; + case BlockState::PRE_BLOCKED: + if (ev->get_execution() == Event::Execution::BLOCK) { + _block_state = BlockState::BLOCKED; + } + break; + case BlockState::BLOCKED: + break; + case BlockState::PRE_UNBLOCKED: + assert(ev->get_execution() == Event::Execution::BLOCK); + _block_state = BlockState::PROCESSING; + break; + case BlockState::PROCESSING: + if (ev->get_execution() == Event::Execution::UNBLOCK) { + _block_state = BlockState::UNBLOCKED; + } + } + + if (_block_state == BlockState::BLOCKED) { + break; // Waiting for PRE_UNBLOCKED + } else if (ev->time() < context.start()) { + ev->set_time(context.start()); // Too late, nudge to context start + } else if (_block_state != BlockState::PROCESSING && + ev->time() >= context.end()) { + break; // Event is for a future cycle } + + // Execute event ev->execute(context); last = ev; ev = ev->next(); ++n_processed; - if (limit && n_processed >= limit) { + + if (_block_state != BlockState::PROCESSING && + limit && n_processed >= limit) { break; } } @@ -158,6 +187,21 @@ PreProcessor::run() } assert(ev->is_prepared()); + switch (ev->get_execution()) { + case Event::Execution::NORMAL: + break; + case Event::Execution::BLOCK: + assert(_block_state == BlockState::UNBLOCKED); + _block_state = BlockState::PRE_BLOCKED; + break; + case Event::Execution::UNBLOCK: + while (_block_state != BlockState::BLOCKED) { + // Wait for process thread to reach blocking event + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + _block_state = BlockState::PRE_UNBLOCKED; + } + _prepared_back = (Event*)ev->next(); } } diff --git a/src/server/UndoStack.cpp b/src/server/UndoStack.cpp index 77d26871..87391bc7 100644 --- a/src/server/UndoStack.cpp +++ b/src/server/UndoStack.cpp @@ -32,7 +32,7 @@ namespace Ingen { namespace Server { -void +int UndoStack::start_entry() { if (_depth == 0) { @@ -40,7 +40,7 @@ UndoStack::start_entry() time(&now); _stack.push_back(Entry(now)); } - ++_depth; + return ++_depth; } bool @@ -80,11 +80,11 @@ UndoStack::ignore_later_event(const LV2_Atom* first, return false; } -void +int UndoStack::finish_entry() { if (--_depth > 0) { - return; + return _depth; } else if (_stack.back().events.empty()) { // Disregard empty entry _stack.pop_back(); @@ -98,6 +98,8 @@ UndoStack::finish_entry() } } } + + return _depth; } UndoStack::Entry diff --git a/src/server/UndoStack.hpp b/src/server/UndoStack.hpp index 4511d70d..d8d60e84 100644 --- a/src/server/UndoStack.hpp +++ b/src/server/UndoStack.hpp @@ -77,9 +77,9 @@ public: UndoStack(URIs& uris, URIMap& map) : _uris(uris), _map(map), _depth(0) {} - void start_entry(); + int start_entry(); bool write(const LV2_Atom* msg); - void finish_entry(); + int finish_entry(); bool empty() const { return _stack.empty(); } Entry pop(); diff --git a/src/server/events/Mark.cpp b/src/server/events/Mark.cpp index 32648649..11690487 100644 --- a/src/server/events/Mark.cpp +++ b/src/server/events/Mark.cpp @@ -29,6 +29,7 @@ Mark::Mark(Engine& engine, Type type) : Event(engine, client, id, timestamp) , _type(type) + , _depth(0) {} bool @@ -40,10 +41,10 @@ Mark::pre_process() switch (_type) { case Type::BUNDLE_START: - stack->start_entry(); + _depth = stack->start_entry(); break; case Type::BUNDLE_END: - stack->finish_entry(); + _depth = stack->finish_entry(); break; } @@ -60,6 +61,28 @@ Mark::post_process() respond(); } +Event::Execution +Mark::get_execution() const +{ + if (!_engine.atomic_bundles()) { + return Execution::NORMAL; + } + + switch (_type) { + case Type::BUNDLE_START: + if (_depth == 1) { + return Execution::BLOCK; + } + break; + case Type::BUNDLE_END: + if (_depth == 0) { + return Execution::UNBLOCK; + } + break; + } + return Execution::NORMAL; +} + } // namespace Events } // namespace Server } // namespace Ingen diff --git a/src/server/events/Mark.hpp b/src/server/events/Mark.hpp index db5550a0..a68e3b3b 100644 --- a/src/server/events/Mark.hpp +++ b/src/server/events/Mark.hpp @@ -48,8 +48,11 @@ public: void execute(RunContext& context); void post_process(); + Execution get_execution() const override; + private: Type _type; + int _depth; }; } // namespace Events -- cgit v1.2.1