diff options
author | David Robillard <d@drobilla.net> | 2017-01-18 15:28:04 -0500 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2017-01-18 15:28:04 -0500 |
commit | 02ae3e9d8bf3f6a5e844706721aad8c0ac9f4340 (patch) | |
tree | 5bbfb3eba51024529e3970103bfb0065b26bc8ec | |
parent | 4e1c349bd3687b866597afda56dc5f1b0c4be4ef (diff) | |
download | ingen-02ae3e9d8bf3f6a5e844706721aad8c0ac9f4340.tar.gz ingen-02ae3e9d8bf3f6a5e844706721aad8c0ac9f4340.tar.bz2 ingen-02ae3e9d8bf3f6a5e844706721aad8c0ac9f4340.zip |
Fix invalid cross-thread use of mutex
Instead of abusing store mutex for this purpose, extend blocking mechanism of
the PreProcessor (designed for atomic bundle execution) to support execution of
individual atomic events which must be executed before the next event can be
pre-processed.
-rw-r--r-- | src/server/Event.hpp | 7 | ||||
-rw-r--r-- | src/server/PreProcessor.cpp | 18 | ||||
-rw-r--r-- | src/server/events/Delta.cpp | 29 | ||||
-rw-r--r-- | src/server/events/Delta.hpp | 4 |
4 files changed, 38 insertions, 20 deletions
diff --git a/src/server/Event.hpp b/src/server/Event.hpp index 203e5d1d..a0adb564 100644 --- a/src/server/Event.hpp +++ b/src/server/Event.hpp @@ -58,7 +58,12 @@ public: enum class Mode { NORMAL, UNDO, REDO }; /** Execution mode for events that block and unblock preprocessing. */ - enum class Execution { NORMAL, BLOCK, UNBLOCK }; + enum class Execution { + NORMAL, ///< Normal pipelined execution + ATOMIC, ///< Block pre-processing until this event is executed + BLOCK, ///< Begin atomic block of events + UNBLOCK ///< Finish atomic executed block of events + }; /** Pre-process event before execution (non-realtime). */ virtual bool pre_process(PreProcessContext& ctx) = 0; diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index 07c51fee..b5153436 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -117,9 +117,17 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) // Execute event ev->execute(context); + ++n_processed; + + // Unblock pre-processing if this is a non-bundled atomic event + if (ev->get_execution() == Event::Execution::ATOMIC && + _block_state.load() == BlockState::PRE_BLOCKED) { + _block_state = BlockState::UNBLOCKED; + } + + // Move to next event last = ev; ev = ev->next(); - ++n_processed; if (_block_state != BlockState::PROCESSING && limit && n_processed >= limit) { @@ -209,6 +217,14 @@ PreProcessor::run() switch (ev->get_execution()) { case Event::Execution::NORMAL: break; + case Event::Execution::ATOMIC: + assert(_block_state == BlockState::UNBLOCKED); + _block_state = BlockState::PRE_BLOCKED; + while (_block_state != BlockState::UNBLOCKED) { + // Wait for process thread to execute event + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + break; case Event::Execution::BLOCK: assert(_block_state == BlockState::UNBLOCKED); _block_state = BlockState::PRE_BLOCKED; diff --git a/src/server/events/Delta.cpp b/src/server/events/Delta.cpp index c623d51e..3ee17ee2 100644 --- a/src/server/events/Delta.cpp +++ b/src/server/events/Delta.cpp @@ -65,7 +65,7 @@ Delta::Delta(Engine& engine, , _state(NULL) , _context(context) , _type(type) - , _poly_lock(engine.store()->mutex(), std::defer_lock) + , _block(false) { if (context != Resource::Graph::DEFAULT) { for (auto& p : _properties) { @@ -136,7 +136,6 @@ Delta::pre_process(PreProcessContext& ctx) const bool is_client = (_subject == "ingen:/clients/this"); const bool is_engine = (_subject == "ingen:/"); const bool is_file = (_subject.substr(0, 5) == "file:"); - bool poly_changed = false; if (_type == Type::PUT && is_file) { // Ensure type is Preset, the only supported file put @@ -175,8 +174,7 @@ Delta::pre_process(PreProcessContext& ctx) } } - // Take a writer lock while we modify the store - std::unique_lock<Store::Mutex> lock(_engine.store()->mutex()); + std::lock_guard<Store::Mutex> lock(_engine.store()->mutex()); _object = is_graph_object ? static_cast<Ingen::Resource*>(_engine.store()->get(Node::uri_to_path(_subject))) @@ -362,8 +360,8 @@ Delta::pre_process(PreProcessContext& ctx) if (value.get<int32_t>() < 1 || value.get<int32_t>() > 128) { _status = Status::INVALID_POLY; } else { - poly_changed = true; - op = SpecialType::POLYPHONY; + _block = true; + op = SpecialType::POLYPHONY; _graph->prepare_internal_poly( *_engine.buffer_factory(), value.get<int32_t>()); } @@ -380,8 +378,8 @@ Delta::pre_process(PreProcessContext& ctx) } else if (value.type() != uris.forge.Bool) { _status = Status::BAD_VALUE_TYPE; } else { - poly_changed = true; - op = SpecialType::POLYPHONIC; + _block = true; + op = SpecialType::POLYPHONIC; obj->set_property(key, value, value.context()); BlockImpl* block = dynamic_cast<BlockImpl*>(obj); if (block) { @@ -427,11 +425,6 @@ Delta::pre_process(PreProcessContext& ctx) s->pre_process(ctx); } - if (poly_changed) { - lock.unlock(); - _poly_lock.lock(); - } - return Event::pre_process_done( _status == Status::NOT_PREPARED ? Status::SUCCESS : _status, _subject); @@ -536,10 +529,6 @@ Delta::execute(RunContext& context) void Delta::post_process() { - if (_poly_lock.owns_lock()) { - _poly_lock.unlock(); - } - if (_state) { BlockImpl* block = dynamic_cast<BlockImpl*>(_object); if (block) { @@ -620,6 +609,12 @@ Delta::undo(Interface& target) } } +Event::Execution +Delta::get_execution() const +{ + return _block ? Execution::ATOMIC : Execution::NORMAL; +} + } // namespace Events } // namespace Server } // namespace Ingen diff --git a/src/server/events/Delta.hpp b/src/server/events/Delta.hpp index ca9d0276..d2326cd9 100644 --- a/src/server/events/Delta.hpp +++ b/src/server/events/Delta.hpp @@ -78,6 +78,8 @@ public: void post_process(); void undo(Interface& target); + Execution get_execution() const; + private: enum class SpecialType { NONE, @@ -115,7 +117,7 @@ private: boost::optional<Resource> _preset; - std::unique_lock<Store::Mutex> _poly_lock; ///< Long-term lock for poly changes + bool _block; }; } // namespace Events |