summaryrefslogtreecommitdiffstats
path: root/src/server/PreProcessor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/PreProcessor.cpp')
-rw-r--r--src/server/PreProcessor.cpp52
1 files changed, 28 insertions, 24 deletions
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index b5153436..4e2ac364 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -92,13 +92,17 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit)
case BlockState::PRE_BLOCKED:
if (ev->get_execution() == Event::Execution::BLOCK) {
_block_state = BlockState::BLOCKED;
+ } else if (ev->get_execution() == Event::Execution::ATOMIC) {
+ _block_state = BlockState::PROCESSING;
}
break;
case BlockState::BLOCKED:
break;
case BlockState::PRE_UNBLOCKED:
assert(ev->get_execution() == Event::Execution::BLOCK);
- _block_state = BlockState::PROCESSING;
+ if (ev->get_execution() == Event::Execution::BLOCK) {
+ _block_state = BlockState::PROCESSING;
+ }
break;
case BlockState::PROCESSING:
if (ev->get_execution() == Event::Execution::UNBLOCK) {
@@ -120,8 +124,8 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit)
++n_processed;
// Unblock pre-processing if this is a non-bundled atomic event
- if (ev->get_execution() == Event::Execution::ATOMIC &&
- _block_state.load() == BlockState::PRE_BLOCKED) {
+ if (ev->get_execution() == Event::Execution::ATOMIC) {
+ assert(_block_state.load() == BlockState::PROCESSING);
_block_state = BlockState::UNBLOCKED;
}
@@ -194,6 +198,24 @@ PreProcessor::run()
continue;
}
+ // Set block state before enqueueing event
+ switch (ev->get_execution()) {
+ case Event::Execution::NORMAL:
+ break;
+ case Event::Execution::ATOMIC:
+ assert(_block_state == BlockState::UNBLOCKED);
+ _block_state = BlockState::PRE_BLOCKED;
+ break;
+ case Event::Execution::BLOCK:
+ assert(_block_state == BlockState::UNBLOCKED);
+ _block_state = BlockState::PRE_BLOCKED;
+ break;
+ case Event::Execution::UNBLOCK:
+ wait_for_block_state(BlockState::BLOCKED);
+ _block_state = BlockState::PRE_UNBLOCKED;
+ }
+
+ // Prepare event, allowing it to be processed
assert(!ev->is_prepared());
if (ev->pre_process(ctx)) {
switch (ev->get_mode()) {
@@ -214,27 +236,9 @@ PreProcessor::run()
}
assert(ev->is_prepared());
- switch (ev->get_execution()) {
- case Event::Execution::NORMAL:
- break;
- case Event::Execution::ATOMIC:
- assert(_block_state == BlockState::UNBLOCKED);
- _block_state = BlockState::PRE_BLOCKED;
- while (_block_state != BlockState::UNBLOCKED) {
- // Wait for process thread to execute event
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
- break;
- case Event::Execution::BLOCK:
- assert(_block_state == BlockState::UNBLOCKED);
- _block_state = BlockState::PRE_BLOCKED;
- break;
- case Event::Execution::UNBLOCK:
- while (_block_state != BlockState::BLOCKED) {
- // Wait for process thread to reach blocking event
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- }
- _block_state = BlockState::PRE_UNBLOCKED;
+ // Wait for process() if necessary
+ if (ev->get_execution() == Event::Execution::ATOMIC) {
+ wait_for_block_state(BlockState::UNBLOCKED);
}
back = (Event*)ev->next();