diff options
-rw-r--r-- | src/server/Event.hpp | 7 | ||||
-rw-r--r-- | src/server/PreProcessor.cpp | 18 | ||||
-rw-r--r-- | src/server/events/Delta.cpp | 29 | ||||
-rw-r--r-- | src/server/events/Delta.hpp | 4 |
4 files changed, 38 insertions, 20 deletions
diff --git a/src/server/Event.hpp b/src/server/Event.hpp index 203e5d1d..a0adb564 100644 --- a/src/server/Event.hpp +++ b/src/server/Event.hpp @@ -58,7 +58,12 @@ public: enum class Mode { NORMAL, UNDO, REDO }; /** Execution mode for events that block and unblock preprocessing. */ - enum class Execution { NORMAL, BLOCK, UNBLOCK }; + enum class Execution { + NORMAL, ///< Normal pipelined execution + ATOMIC, ///< Block pre-processing until this event is executed + BLOCK, ///< Begin atomic block of events + UNBLOCK ///< Finish atomic executed block of events + }; /** Pre-process event before execution (non-realtime). */ virtual bool pre_process(PreProcessContext& ctx) = 0; diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index 07c51fee..b5153436 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -117,9 +117,17 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) // Execute event ev->execute(context); + ++n_processed; + + // Unblock pre-processing if this is a non-bundled atomic event + if (ev->get_execution() == Event::Execution::ATOMIC && + _block_state.load() == BlockState::PRE_BLOCKED) { + _block_state = BlockState::UNBLOCKED; + } + + // Move to next event last = ev; ev = ev->next(); - ++n_processed; if (_block_state != BlockState::PROCESSING && limit && n_processed >= limit) { @@ -209,6 +217,14 @@ PreProcessor::run() switch (ev->get_execution()) { case Event::Execution::NORMAL: break; + case Event::Execution::ATOMIC: + assert(_block_state == BlockState::UNBLOCKED); + _block_state = BlockState::PRE_BLOCKED; + while (_block_state != BlockState::UNBLOCKED) { + // Wait for process thread to execute event + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + break; case Event::Execution::BLOCK: assert(_block_state == BlockState::UNBLOCKED); _block_state = BlockState::PRE_BLOCKED; diff --git a/src/server/events/Delta.cpp b/src/server/events/Delta.cpp index c623d51e..3ee17ee2 100644 --- a/src/server/events/Delta.cpp +++ b/src/server/events/Delta.cpp @@ -65,7 +65,7 @@ Delta::Delta(Engine& engine, , _state(NULL) , _context(context) , _type(type) - , _poly_lock(engine.store()->mutex(), std::defer_lock) + , _block(false) { if (context != Resource::Graph::DEFAULT) { for (auto& p : _properties) { @@ -136,7 +136,6 @@ Delta::pre_process(PreProcessContext& ctx) const bool is_client = (_subject == "ingen:/clients/this"); const bool is_engine = (_subject == "ingen:/"); const bool is_file = (_subject.substr(0, 5) == "file:"); - bool poly_changed = false; if (_type == Type::PUT && is_file) { // Ensure type is Preset, the only supported file put @@ -175,8 +174,7 @@ Delta::pre_process(PreProcessContext& ctx) } } - // Take a writer lock while we modify the store - std::unique_lock<Store::Mutex> lock(_engine.store()->mutex()); + std::lock_guard<Store::Mutex> lock(_engine.store()->mutex()); _object = is_graph_object ? static_cast<Ingen::Resource*>(_engine.store()->get(Node::uri_to_path(_subject))) @@ -362,8 +360,8 @@ Delta::pre_process(PreProcessContext& ctx) if (value.get<int32_t>() < 1 || value.get<int32_t>() > 128) { _status = Status::INVALID_POLY; } else { - poly_changed = true; - op = SpecialType::POLYPHONY; + _block = true; + op = SpecialType::POLYPHONY; _graph->prepare_internal_poly( *_engine.buffer_factory(), value.get<int32_t>()); } @@ -380,8 +378,8 @@ Delta::pre_process(PreProcessContext& ctx) } else if (value.type() != uris.forge.Bool) { _status = Status::BAD_VALUE_TYPE; } else { - poly_changed = true; - op = SpecialType::POLYPHONIC; + _block = true; + op = SpecialType::POLYPHONIC; obj->set_property(key, value, value.context()); BlockImpl* block = dynamic_cast<BlockImpl*>(obj); if (block) { @@ -427,11 +425,6 @@ Delta::pre_process(PreProcessContext& ctx) s->pre_process(ctx); } - if (poly_changed) { - lock.unlock(); - _poly_lock.lock(); - } - return Event::pre_process_done( _status == Status::NOT_PREPARED ? Status::SUCCESS : _status, _subject); @@ -536,10 +529,6 @@ Delta::execute(RunContext& context) void Delta::post_process() { - if (_poly_lock.owns_lock()) { - _poly_lock.unlock(); - } - if (_state) { BlockImpl* block = dynamic_cast<BlockImpl*>(_object); if (block) { @@ -620,6 +609,12 @@ Delta::undo(Interface& target) } } +Event::Execution +Delta::get_execution() const +{ + return _block ? Execution::ATOMIC : Execution::NORMAL; +} + } // namespace Events } // namespace Server } // namespace Ingen diff --git a/src/server/events/Delta.hpp b/src/server/events/Delta.hpp index ca9d0276..d2326cd9 100644 --- a/src/server/events/Delta.hpp +++ b/src/server/events/Delta.hpp @@ -78,6 +78,8 @@ public: void post_process(); void undo(Interface& target); + Execution get_execution() const; + private: enum class SpecialType { NONE, @@ -115,7 +117,7 @@ private: boost::optional<Resource> _preset; - std::unique_lock<Store::Mutex> _poly_lock; ///< Long-term lock for poly changes + bool _block; }; } // namespace Events |