summaryrefslogtreecommitdiffstats
path: root/src/server/PreProcessor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/PreProcessor.cpp')
-rw-r--r--src/server/PreProcessor.cpp54
1 files changed, 49 insertions, 5 deletions
diff --git a/src/server/PreProcessor.cpp b/src/server/PreProcessor.cpp
index e3e5475f..2ea4ac30 100644
--- a/src/server/PreProcessor.cpp
+++ b/src/server/PreProcessor.cpp
@@ -38,6 +38,7 @@ PreProcessor::PreProcessor(Engine& engine)
, _head(NULL)
, _prepared_back(NULL)
, _tail(NULL)
+ , _block_state(BlockState::UNBLOCKED)
, _exit_flag(false)
, _thread(&PreProcessor::run, this)
{}
@@ -87,16 +88,44 @@ PreProcessor::process(RunContext& context, PostProcessor& dest, size_t limit)
size_t n_processed = 0;
Event* ev = head;
Event* last = ev;
- while (ev && ev->is_prepared() && ev->time() < context.end()) {
- if (ev->time() < context.start()) {
- // Didn't get around to executing in time, oh well...
- ev->set_time(context.start());
+ while (ev && ev->is_prepared()) {
+ switch (_block_state) {
+ case BlockState::UNBLOCKED:
+ break;
+ case BlockState::PRE_BLOCKED:
+ if (ev->get_execution() == Event::Execution::BLOCK) {
+ _block_state = BlockState::BLOCKED;
+ }
+ break;
+ case BlockState::BLOCKED:
+ break;
+ case BlockState::PRE_UNBLOCKED:
+ assert(ev->get_execution() == Event::Execution::BLOCK);
+ _block_state = BlockState::PROCESSING;
+ break;
+ case BlockState::PROCESSING:
+ if (ev->get_execution() == Event::Execution::UNBLOCK) {
+ _block_state = BlockState::UNBLOCKED;
+ }
+ }
+
+ if (_block_state == BlockState::BLOCKED) {
+ break; // Waiting for PRE_UNBLOCKED
+ } else if (ev->time() < context.start()) {
+ ev->set_time(context.start()); // Too late, nudge to context start
+ } else if (_block_state != BlockState::PROCESSING &&
+ ev->time() >= context.end()) {
+ break; // Event is for a future cycle
}
+
+ // Execute event
ev->execute(context);
last = ev;
ev = ev->next();
++n_processed;
- if (limit && n_processed >= limit) {
+
+ if (_block_state != BlockState::PROCESSING &&
+ limit && n_processed >= limit) {
break;
}
}
@@ -158,6 +187,21 @@ PreProcessor::run()
}
assert(ev->is_prepared());
+ switch (ev->get_execution()) {
+ case Event::Execution::NORMAL:
+ break;
+ case Event::Execution::BLOCK:
+ assert(_block_state == BlockState::UNBLOCKED);
+ _block_state = BlockState::PRE_BLOCKED;
+ break;
+ case Event::Execution::UNBLOCK:
+ while (_block_state != BlockState::BLOCKED) {
+ // Wait for process thread to reach blocking event
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ _block_state = BlockState::PRE_UNBLOCKED;
+ }
+
_prepared_back = (Event*)ev->next();
}
}