summaryrefslogtreecommitdiffstats
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/Event.hpp7
-rw-r--r--src/server/PreProcessor.cpp18
-rw-r--r--src/server/events/Delta.cpp29
-rw-r--r--src/server/events/Delta.hpp4
4 files changed, 38 insertions, 20 deletions
diff --git a/src/server/Event.hpp b/src/server/Event.hpp
index 203e5d1d..a0adb564 100644
--- a/src/server/Event.hpp
+++ b/src/server/Event.hpp
@@ -58,7 +58,12 @@ public:
enum class Mode { NORMAL, UNDO, REDO };
/** Execution mode for events that block and unblock preprocessing. */
- enum class Execution { NORMAL, BLOCK, UNBLOCK };
+ enum class Execution {
+ NORMAL, ///< Normal pipelined execution
+ ATOMIC, ///< Block pre-processing until this event is executed
+ BLOCK, ///< Begin atomic block of events
+ UNBLOCK ///< Finish atomic executed block of events
+ };
/** Pre-process event before execution (non-realtime). */
virtual bool pre_process(PreProcessContext& ctx) = 0;
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index 07c51fee..b5153436 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -117,9 +117,17 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit)
// Execute event
ev->execute(context);
+ ++n_processed;
+
+ // Unblock pre-processing if this is a non-bundled atomic event
+ if (ev->get_execution() == Event::Execution::ATOMIC &&
+ _block_state.load() == BlockState::PRE_BLOCKED) {
+ _block_state = BlockState::UNBLOCKED;
+ }
+
+ // Move to next event
last = ev;
ev = ev->next();
- ++n_processed;
if (_block_state != BlockState::PROCESSING &&
limit && n_processed >= limit) {
@@ -209,6 +217,14 @@ PreProcessor::run()
switch (ev->get_execution()) {
case Event::Execution::NORMAL:
break;
+ case Event::Execution::ATOMIC:
+ assert(_block_state == BlockState::UNBLOCKED);
+ _block_state = BlockState::PRE_BLOCKED;
+ while (_block_state != BlockState::UNBLOCKED) {
+ // Wait for process thread to execute event
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ break;
case Event::Execution::BLOCK:
assert(_block_state == BlockState::UNBLOCKED);
_block_state = BlockState::PRE_BLOCKED;
diff --git a/src/server/events/Delta.cpp b/src/server/events/Delta.cpp
index c623d51e..3ee17ee2 100644
--- a/src/server/events/Delta.cpp
+++ b/src/server/events/Delta.cpp
@@ -65,7 +65,7 @@ Delta::Delta(Engine& engine,
, _state(NULL)
, _context(context)
, _type(type)
- , _poly_lock(engine.store()->mutex(), std::defer_lock)
+ , _block(false)
{
if (context != Resource::Graph::DEFAULT) {
for (auto& p : _properties) {
@@ -136,7 +136,6 @@ Delta::pre_process(PreProcessContext& ctx)
const bool is_client = (_subject == "ingen:/clients/this");
const bool is_engine = (_subject == "ingen:/");
const bool is_file = (_subject.substr(0, 5) == "file:");
- bool poly_changed = false;
if (_type == Type::PUT && is_file) {
// Ensure type is Preset, the only supported file put
@@ -175,8 +174,7 @@ Delta::pre_process(PreProcessContext& ctx)
}
}
- // Take a writer lock while we modify the store
- std::unique_lock<Store::Mutex> lock(_engine.store()->mutex());
+ std::lock_guard<Store::Mutex> lock(_engine.store()->mutex());
_object = is_graph_object
? static_cast<Ingen::Resource*>(_engine.store()->get(Node::uri_to_path(_subject)))
@@ -362,8 +360,8 @@ Delta::pre_process(PreProcessContext& ctx)
if (value.get<int32_t>() < 1 || value.get<int32_t>() > 128) {
_status = Status::INVALID_POLY;
} else {
- poly_changed = true;
- op = SpecialType::POLYPHONY;
+ _block = true;
+ op = SpecialType::POLYPHONY;
_graph->prepare_internal_poly(
*_engine.buffer_factory(), value.get<int32_t>());
}
@@ -380,8 +378,8 @@ Delta::pre_process(PreProcessContext& ctx)
} else if (value.type() != uris.forge.Bool) {
_status = Status::BAD_VALUE_TYPE;
} else {
- poly_changed = true;
- op = SpecialType::POLYPHONIC;
+ _block = true;
+ op = SpecialType::POLYPHONIC;
obj->set_property(key, value, value.context());
BlockImpl* block = dynamic_cast<BlockImpl*>(obj);
if (block) {
@@ -427,11 +425,6 @@ Delta::pre_process(PreProcessContext& ctx)
s->pre_process(ctx);
}
- if (poly_changed) {
- lock.unlock();
- _poly_lock.lock();
- }
-
return Event::pre_process_done(
_status == Status::NOT_PREPARED ? Status::SUCCESS : _status,
_subject);
@@ -536,10 +529,6 @@ Delta::execute(RunContext& context)
void
Delta::post_process()
{
- if (_poly_lock.owns_lock()) {
- _poly_lock.unlock();
- }
-
if (_state) {
BlockImpl* block = dynamic_cast<BlockImpl*>(_object);
if (block) {
@@ -620,6 +609,12 @@ Delta::undo(Interface& target)
}
}
+Event::Execution
+Delta::get_execution() const
+{
+ return _block ? Execution::ATOMIC : Execution::NORMAL;
+}
+
} // namespace Events
} // namespace Server
} // namespace Ingen
diff --git a/src/server/events/Delta.hpp b/src/server/events/Delta.hpp
index ca9d0276..d2326cd9 100644
--- a/src/server/events/Delta.hpp
+++ b/src/server/events/Delta.hpp
@@ -78,6 +78,8 @@ public:
void post_process();
void undo(Interface& target);
+ Execution get_execution() const;
+
private:
enum class SpecialType {
NONE,
@@ -115,7 +117,7 @@ private:
boost::optional<Resource> _preset;
- std::unique_lock<Store::Mutex> _poly_lock; ///< Long-term lock for poly changes
+ bool _block;
};
} // namespace Events