summaryrefslogtreecommitdiffstats
path: root/src/server/PreProcessor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/PreProcessor.cpp')
-rw-r--r--src/server/PreProcessor.cpp18
1 files changed, 17 insertions, 1 deletions
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index 07c51fee..b5153436 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -117,9 +117,17 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit)
// Execute event
ev->execute(context);
+ ++n_processed;
+
+ // Unblock pre-processing if this is a non-bundled atomic event
+ if (ev->get_execution() == Event::Execution::ATOMIC &&
+ _block_state.load() == BlockState::PRE_BLOCKED) {
+ _block_state = BlockState::UNBLOCKED;
+ }
+
+ // Move to next event
last = ev;
ev = ev->next();
- ++n_processed;
if (_block_state != BlockState::PROCESSING &&
limit && n_processed >= limit) {
@@ -209,6 +217,14 @@ PreProcessor::run()
switch (ev->get_execution()) {
case Event::Execution::NORMAL:
break;
+ case Event::Execution::ATOMIC:
+ assert(_block_state == BlockState::UNBLOCKED);
+ _block_state = BlockState::PRE_BLOCKED;
+ while (_block_state != BlockState::UNBLOCKED) {
+ // Wait for process thread to execute event
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ break;
case Event::Execution::BLOCK:
assert(_block_state == BlockState::UNBLOCKED);
_block_state = BlockState::PRE_BLOCKED;