diff options
Diffstat (limited to 'src/server/PreProcessor.cpp')
-rw-r--r-- | src/server/PreProcessor.cpp | 54 |
1 files changed, 49 insertions, 5 deletions
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp index e3e5475f..2ea4ac30 100644 --- a/src/server/PreProcessor.cpp +++ b/src/server/PreProcessor.cpp @@ -38,6 +38,7 @@ PreProcessor::PreProcessor(Engine& engine) , _head(NULL) , _prepared_back(NULL) , _tail(NULL) + , _block_state(BlockState::UNBLOCKED) , _exit_flag(false) , _thread(&PreProcessor::run, this) {} @@ -87,16 +88,44 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit) size_t n_processed = 0; Event* ev = head; Event* last = ev; - while (ev && ev->is_prepared() && ev->time() < context.end()) { - if (ev->time() < context.start()) { - // Didn't get around to executing in time, oh well... - ev->set_time(context.start()); + while (ev && ev->is_prepared()) { + switch (_block_state) { + case BlockState::UNBLOCKED: + break; + case BlockState::PRE_BLOCKED: + if (ev->get_execution() == Event::Execution::BLOCK) { + _block_state = BlockState::BLOCKED; + } + break; + case BlockState::BLOCKED: + break; + case BlockState::PRE_UNBLOCKED: + assert(ev->get_execution() == Event::Execution::BLOCK); + _block_state = BlockState::PROCESSING; + break; + case BlockState::PROCESSING: + if (ev->get_execution() == Event::Execution::UNBLOCK) { + _block_state = BlockState::UNBLOCKED; + } + } + + if (_block_state == BlockState::BLOCKED) { + break; // Waiting for PRE_UNBLOCKED + } else if (ev->time() < context.start()) { + ev->set_time(context.start()); // Too late, nudge to context start + } else if (_block_state != BlockState::PROCESSING && + ev->time() >= context.end()) { + break; // Event is for a future cycle } + + // Execute event ev->execute(context); last = ev; ev = ev->next(); ++n_processed; - if (limit && n_processed >= limit) { + + if (_block_state != BlockState::PROCESSING && + limit && n_processed >= limit) { break; } } @@ -158,6 +187,21 @@ PreProcessor::run() } assert(ev->is_prepared()); + switch (ev->get_execution()) { + case Event::Execution::NORMAL: + break; + case Event::Execution::BLOCK: + assert(_block_state == BlockState::UNBLOCKED); + _block_state = BlockState::PRE_BLOCKED; + break; + case Event::Execution::UNBLOCK: + while (_block_state != BlockState::BLOCKED) { + // Wait for process thread to reach blocking event + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + _block_state = BlockState::PRE_UNBLOCKED; + } + _prepared_back = (Event*)ev->next(); } } |