diff options
Diffstat (limited to 'src/server/PreProcessor.cpp')
-rw-r--r-- | src/server/PreProcessor.cpp | 52 |
1 files changed, 28 insertions, 24 deletions
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index b5153436..4e2ac364 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -92,13 +92,17 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) case BlockState::PRE_BLOCKED: if (ev->get_execution() == Event::Execution::BLOCK) { _block_state = BlockState::BLOCKED; + } else if (ev->get_execution() == Event::Execution::ATOMIC) { + _block_state = BlockState::PROCESSING; } break; case BlockState::BLOCKED: break; case BlockState::PRE_UNBLOCKED: assert(ev->get_execution() == Event::Execution::BLOCK); - _block_state = BlockState::PROCESSING; + if (ev->get_execution() == Event::Execution::BLOCK) { + _block_state = BlockState::PROCESSING; + } break; case BlockState::PROCESSING: if (ev->get_execution() == Event::Execution::UNBLOCK) { @@ -120,8 +124,8 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) ++n_processed; // Unblock pre-processing if this is a non-bundled atomic event - if (ev->get_execution() == Event::Execution::ATOMIC && - _block_state.load() == BlockState::PRE_BLOCKED) { + if (ev->get_execution() == Event::Execution::ATOMIC) { + assert(_block_state.load() == BlockState::PROCESSING); _block_state = BlockState::UNBLOCKED; } @@ -194,6 +198,24 @@ PreProcessor::run() continue; } + // Set block state before enqueueing event + switch (ev->get_execution()) { + case Event::Execution::NORMAL: + break; + case Event::Execution::ATOMIC: + assert(_block_state == BlockState::UNBLOCKED); + _block_state = BlockState::PRE_BLOCKED; + break; + case Event::Execution::BLOCK: + assert(_block_state == BlockState::UNBLOCKED); + _block_state = BlockState::PRE_BLOCKED; + break; + case Event::Execution::UNBLOCK: + wait_for_block_state(BlockState::BLOCKED); + _block_state = BlockState::PRE_UNBLOCKED; + } + + // Prepare event, allowing it to be processed assert(!ev->is_prepared()); if (ev->pre_process(ctx)) { switch (ev->get_mode()) { @@ -214,27 +236,9 @@ PreProcessor::run() } assert(ev->is_prepared()); - switch (ev->get_execution()) { - case Event::Execution::NORMAL: - break; - case Event::Execution::ATOMIC: - assert(_block_state == BlockState::UNBLOCKED); - _block_state = BlockState::PRE_BLOCKED; - while (_block_state != BlockState::UNBLOCKED) { - // Wait for process thread to execute event - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - break; - case Event::Execution::BLOCK: - assert(_block_state == BlockState::UNBLOCKED); - _block_state = BlockState::PRE_BLOCKED; - break; - case Event::Execution::UNBLOCK: - while (_block_state != BlockState::BLOCKED) { - // Wait for process thread to reach blocking event - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - _block_state = BlockState::PRE_UNBLOCKED; + // Wait for process() if necessary + if (ev->get_execution() == Event::Execution::ATOMIC) { + wait_for_block_state(BlockState::UNBLOCKED); } back = (Event*)ev->next(); |