aboutsummaryrefslogtreecommitdiffstats
path: root/Runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'Runner.py')
-rw-r--r--Runner.py611
1 files changed, 611 insertions, 0 deletions
diff --git a/Runner.py b/Runner.py
new file mode 100644
index 00000000..261084d2
--- /dev/null
+++ b/Runner.py
@@ -0,0 +1,611 @@
+#!/usr/bin/env python
+# encoding: utf-8
+# Thomas Nagy, 2005-2018 (ita)
+
+"""
+Runner.py: Task scheduling and execution
+"""
+
+import heapq, traceback
+try:
+ from queue import Queue, PriorityQueue
+except ImportError:
+ from Queue import Queue
+ try:
+ from Queue import PriorityQueue
+ except ImportError:
+ class PriorityQueue(Queue):
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.queue = []
+ def _put(self, item):
+ heapq.heappush(self.queue, item)
+ def _get(self):
+ return heapq.heappop(self.queue)
+
+from waflib import Utils, Task, Errors, Logs
+
+GAP = 5
+"""
+Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
+"""
+
+class PriorityTasks(object):
+ def __init__(self):
+ self.lst = []
+ def __len__(self):
+ return len(self.lst)
+ def __iter__(self):
+ return iter(self.lst)
+ def clear(self):
+ self.lst = []
+ def append(self, task):
+ heapq.heappush(self.lst, task)
+ def appendleft(self, task):
+ "Deprecated, do not use"
+ heapq.heappush(self.lst, task)
+ def pop(self):
+ return heapq.heappop(self.lst)
+ def extend(self, lst):
+ if self.lst:
+ for x in lst:
+ self.append(x)
+ else:
+ if isinstance(lst, list):
+ self.lst = lst
+ heapq.heapify(lst)
+ else:
+ self.lst = lst.lst
+
+class Consumer(Utils.threading.Thread):
+ """
+ Daemon thread object that executes a task. It shares a semaphore with
+ the coordinator :py:class:`waflib.Runner.Spawner`. There is one
+ instance per task to consume.
+ """
+ def __init__(self, spawner, task):
+ Utils.threading.Thread.__init__(self)
+ self.task = task
+ """Task to execute"""
+ self.spawner = spawner
+ """Coordinator object"""
+ self.setDaemon(1)
+ self.start()
+ def run(self):
+ """
+ Processes a single task
+ """
+ try:
+ if not self.spawner.master.stop:
+ self.spawner.master.process_task(self.task)
+ finally:
+ self.spawner.sem.release()
+ self.spawner.master.out.put(self.task)
+ self.task = None
+ self.spawner = None
+
+class Spawner(Utils.threading.Thread):
+ """
+ Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
+ spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
+ :py:class:`waflib.Task.Task` instance.
+ """
+ def __init__(self, master):
+ Utils.threading.Thread.__init__(self)
+ self.master = master
+ """:py:class:`waflib.Runner.Parallel` producer instance"""
+ self.sem = Utils.threading.Semaphore(master.numjobs)
+ """Bounded semaphore that prevents spawning more than *n* concurrent consumers"""
+ self.setDaemon(1)
+ self.start()
+ def run(self):
+ """
+ Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
+ """
+ try:
+ self.loop()
+ except Exception:
+ # Python 2 prints unnecessary messages when shutting down
+ # we also want to stop the thread properly
+ pass
+ def loop(self):
+ """
+ Consumes task objects from the producer; ends when the producer has no more
+ task to provide.
+ """
+ master = self.master
+ while 1:
+ task = master.ready.get()
+ self.sem.acquire()
+ if not master.stop:
+ task.log_display(task.generator.bld)
+ Consumer(self, task)
+
+class Parallel(object):
+ """
+ Schedule the tasks obtained from the build context for execution.
+ """
+ def __init__(self, bld, j=2):
+ """
+ The initialization requires a build context reference
+ for computing the total number of jobs.
+ """
+
+ self.numjobs = j
+ """
+ Amount of parallel consumers to use
+ """
+
+ self.bld = bld
+ """
+ Instance of :py:class:`waflib.Build.BuildContext`
+ """
+
+ self.outstanding = PriorityTasks()
+ """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""
+
+ self.postponed = PriorityTasks()
+ """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
+
+ self.incomplete = set()
+ """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""
+
+ self.ready = PriorityQueue(0)
+ """List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
+
+ self.out = Queue(0)
+ """List of :py:class:`waflib.Task.Task` returned by the task consumers"""
+
+ self.count = 0
+ """Amount of tasks that may be processed by :py:class:`waflib.Runner.TaskConsumer`"""
+
+ self.processed = 0
+ """Amount of tasks processed"""
+
+ self.stop = False
+ """Error flag to stop the build"""
+
+ self.error = []
+ """Tasks that could not be executed"""
+
+ self.biter = None
+ """Task iterator which must give groups of parallelizable tasks when calling ``next()``"""
+
+ self.dirty = False
+ """
+ Flag that indicates that the build cache must be saved when a task was executed
+ (calls :py:meth:`waflib.Build.BuildContext.store`)"""
+
+ self.revdeps = Utils.defaultdict(set)
+ """
+ The reverse dependency graph of dependencies obtained from Task.run_after
+ """
+
+ self.spawner = Spawner(self)
+ """
+ Coordinating daemon thread that spawns thread consumers
+ """
+
+ def get_next_task(self):
+ """
+ Obtains the next Task instance to run
+
+ :rtype: :py:class:`waflib.Task.Task`
+ """
+ if not self.outstanding:
+ return None
+ return self.outstanding.pop()
+
+ def postpone(self, tsk):
+ """
+ Adds the task to the list :py:attr:`waflib.Runner.Parallel.postponed`.
+ The order is scrambled so as to consume as many tasks in parallel as possible.
+
+ :param tsk: task instance
+ :type tsk: :py:class:`waflib.Task.Task`
+ """
+ self.postponed.append(tsk)
+
+ def refill_task_list(self):
+ """
+ Pulls a next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.
+ Ensures that all tasks in the current build group are complete before processing the next one.
+ """
+ while self.count > self.numjobs * GAP:
+ self.get_out()
+
+ while not self.outstanding:
+ if self.count:
+ self.get_out()
+ if self.outstanding:
+ break
+ elif self.postponed:
+ try:
+ cond = self.deadlock == self.processed
+ except AttributeError:
+ pass
+ else:
+ if cond:
+ # The most common reason is conflicting build order declaration
+ # for example: "X run_after Y" and "Y run_after X"
+ # Another can be changing "run_after" dependencies while the build is running
+ # for example: updating "tsk.run_after" in the "runnable_status" method
+ lst = []
+ for tsk in self.postponed:
+ deps = [id(x) for x in tsk.run_after if not x.hasrun]
+ lst.append('%s\t-> %r' % (repr(tsk), deps))
+ if not deps:
+ lst.append('\n task %r dependencies are done, check its *runnable_status*?' % id(tsk))
+ raise Errors.WafError('Deadlock detected: check the task build order%s' % ''.join(lst))
+ self.deadlock = self.processed
+
+ if self.postponed:
+ self.outstanding.extend(self.postponed)
+ self.postponed.clear()
+ elif not self.count:
+ if self.incomplete:
+ for x in self.incomplete:
+ for k in x.run_after:
+ if not k.hasrun:
+ break
+ else:
+ # dependency added after the build started without updating revdeps
+ self.incomplete.remove(x)
+ self.outstanding.append(x)
+ break
+ else:
+ raise Errors.WafError('Broken revdeps detected on %r' % self.incomplete)
+ else:
+ tasks = next(self.biter)
+ ready, waiting = self.prio_and_split(tasks)
+ self.outstanding.extend(ready)
+ self.incomplete.update(waiting)
+ self.total = self.bld.total()
+ break
+
+ def add_more_tasks(self, tsk):
+ """
+ If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
+ in that list are added to the current build and will be processed before the next build group.
+
+ The priorities for dependent tasks are not re-calculated globally
+
+ :param tsk: task instance
+ :type tsk: :py:attr:`waflib.Task.Task`
+ """
+ if getattr(tsk, 'more_tasks', None):
+ more = set(tsk.more_tasks)
+ groups_done = set()
+ def iteri(a, b):
+ for x in a:
+ yield x
+ for x in b:
+ yield x
+
+ # Update the dependency tree
+ # this assumes that task.run_after values were updated
+ for x in iteri(self.outstanding, self.incomplete):
+ for k in x.run_after:
+ if isinstance(k, Task.TaskGroup):
+ if k not in groups_done:
+ groups_done.add(k)
+ for j in k.prev & more:
+ self.revdeps[j].add(k)
+ elif k in more:
+ self.revdeps[k].add(x)
+
+ ready, waiting = self.prio_and_split(tsk.more_tasks)
+ self.outstanding.extend(ready)
+ self.incomplete.update(waiting)
+ self.total += len(tsk.more_tasks)
+
+ def mark_finished(self, tsk):
+ def try_unfreeze(x):
+ # DAG ancestors are likely to be in the incomplete set
+ # This assumes that the run_after contents have not changed
+ # after the build starts, else a deadlock may occur
+ if x in self.incomplete:
+ # TODO remove dependencies to free some memory?
+ # x.run_after.remove(tsk)
+ for k in x.run_after:
+ if not k.hasrun:
+ break
+ else:
+ self.incomplete.remove(x)
+ self.outstanding.append(x)
+
+ if tsk in self.revdeps:
+ for x in self.revdeps[tsk]:
+ if isinstance(x, Task.TaskGroup):
+ x.prev.remove(tsk)
+ if not x.prev:
+ for k in x.next:
+ # TODO necessary optimization?
+ k.run_after.remove(x)
+ try_unfreeze(k)
+ # TODO necessary optimization?
+ x.next = []
+ else:
+ try_unfreeze(x)
+ del self.revdeps[tsk]
+
+ if hasattr(tsk, 'semaphore'):
+ sem = tsk.semaphore
+ sem.release(tsk)
+ while sem.waiting and not sem.is_locked():
+ # take a frozen task, make it ready to run
+ x = sem.waiting.pop()
+ self._add_task(x)
+
+ def get_out(self):
+ """
+ Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
+ Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`.
+
+ :rtype: :py:attr:`waflib.Task.Task`
+ """
+ tsk = self.out.get()
+ if not self.stop:
+ self.add_more_tasks(tsk)
+ self.mark_finished(tsk)
+
+ self.count -= 1
+ self.dirty = True
+ return tsk
+
+ def add_task(self, tsk):
+ """
+ Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.
+
+ :param tsk: task instance
+ :type tsk: :py:attr:`waflib.Task.Task`
+ """
+ # TODO change in waf 2.1
+ self.ready.put(tsk)
+
+ def _add_task(self, tsk):
+ if hasattr(tsk, 'semaphore'):
+ sem = tsk.semaphore
+ try:
+ sem.acquire(tsk)
+ except IndexError:
+ sem.waiting.add(tsk)
+ return
+
+ self.count += 1
+ self.processed += 1
+ if self.numjobs == 1:
+ tsk.log_display(tsk.generator.bld)
+ try:
+ self.process_task(tsk)
+ finally:
+ self.out.put(tsk)
+ else:
+ self.add_task(tsk)
+
+ def process_task(self, tsk):
+ """
+ Processes a task and attempts to stop the build in case of errors
+ """
+ tsk.process()
+ if tsk.hasrun != Task.SUCCESS:
+ self.error_handler(tsk)
+
+ def skip(self, tsk):
+ """
+ Mark a task as skipped/up-to-date
+ """
+ tsk.hasrun = Task.SKIPPED
+ self.mark_finished(tsk)
+
+ def cancel(self, tsk):
+ """
+ Mark a task as failed because of unsatisfiable dependencies
+ """
+ tsk.hasrun = Task.CANCELED
+ self.mark_finished(tsk)
+
+ def error_handler(self, tsk):
+ """
+ Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set,
+ unless the build is executed with::
+
+ $ waf build -k
+
+ :param tsk: task instance
+ :type tsk: :py:attr:`waflib.Task.Task`
+ """
+ if not self.bld.keep:
+ self.stop = True
+ self.error.append(tsk)
+
+ def task_status(self, tsk):
+ """
+ Obtains the task status to decide whether to run it immediately or not.
+
+ :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
+ :rtype: integer
+ """
+ try:
+ return tsk.runnable_status()
+ except Exception:
+ self.processed += 1
+ tsk.err_msg = traceback.format_exc()
+ if not self.stop and self.bld.keep:
+ self.skip(tsk)
+ if self.bld.keep == 1:
+ # if -k stop on the first exception, if -kk try to go as far as possible
+ if Logs.verbose > 1 or not self.error:
+ self.error.append(tsk)
+ self.stop = True
+ else:
+ if Logs.verbose > 1:
+ self.error.append(tsk)
+ return Task.EXCEPTION
+
+ tsk.hasrun = Task.EXCEPTION
+ self.error_handler(tsk)
+
+ return Task.EXCEPTION
+
+ def start(self):
+ """
+ Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
+ :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
+ has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
+ and marks the build as failed by setting the ``stop`` flag.
+ If only one job is used, then executes the tasks one by one, without consumers.
+ """
+ self.total = self.bld.total()
+
+ while not self.stop:
+
+ self.refill_task_list()
+
+ # consider the next task
+ tsk = self.get_next_task()
+ if not tsk:
+ if self.count:
+ # tasks may add new ones after they are run
+ continue
+ else:
+ # no tasks to run, no tasks running, time to exit
+ break
+
+ if tsk.hasrun:
+ # if the task is marked as "run", just skip it
+ self.processed += 1
+ continue
+
+ if self.stop: # stop immediately after a failure is detected
+ break
+
+ st = self.task_status(tsk)
+ if st == Task.RUN_ME:
+ self._add_task(tsk)
+ elif st == Task.ASK_LATER:
+ self.postpone(tsk)
+ elif st == Task.SKIP_ME:
+ self.processed += 1
+ self.skip(tsk)
+ self.add_more_tasks(tsk)
+ elif st == Task.CANCEL_ME:
+ # A dependency problem has occurred, and the
+ # build is most likely run with `waf -k`
+ if Logs.verbose > 1:
+ self.error.append(tsk)
+ self.processed += 1
+ self.cancel(tsk)
+
+ # self.count represents the tasks that have been made available to the consumer threads
+ # collect all the tasks after an error else the message may be incomplete
+ while self.error and self.count:
+ self.get_out()
+
+ self.ready.put(None)
+ if not self.stop:
+ assert not self.count
+ assert not self.postponed
+ assert not self.incomplete
+
+ def prio_and_split(self, tasks):
+ """
+ Label input tasks with priority values, and return a pair containing
+ the tasks that are ready to run and the tasks that are necessarily
+ waiting for other tasks to complete.
+
+ The priority system is really meant as an optional layer for optimization:
+ dependency cycles are found quickly, and builds should be more efficient.
+ A high priority number means that a task is processed first.
+
+ This method can be overridden to disable the priority system::
+
+ def prio_and_split(self, tasks):
+ return tasks, []
+
+ :return: A pair of task lists
+ :rtype: tuple
+ """
+ # to disable:
+ #return tasks, []
+ for x in tasks:
+ x.visited = 0
+
+ reverse = self.revdeps
+
+ groups_done = set()
+ for x in tasks:
+ for k in x.run_after:
+ if isinstance(k, Task.TaskGroup):
+ if k not in groups_done:
+ groups_done.add(k)
+ for j in k.prev:
+ reverse[j].add(k)
+ else:
+ reverse[k].add(x)
+
+ # the priority number is not the tree depth
+ def visit(n):
+ if isinstance(n, Task.TaskGroup):
+ return sum(visit(k) for k in n.next)
+
+ if n.visited == 0:
+ n.visited = 1
+
+ if n in reverse:
+ rev = reverse[n]
+ n.prio_order = n.tree_weight + len(rev) + sum(visit(k) for k in rev)
+ else:
+ n.prio_order = n.tree_weight
+
+ n.visited = 2
+ elif n.visited == 1:
+ raise Errors.WafError('Dependency cycle found!')
+ return n.prio_order
+
+ for x in tasks:
+ if x.visited != 0:
+ # must visit all to detect cycles
+ continue
+ try:
+ visit(x)
+ except Errors.WafError:
+ self.debug_cycles(tasks, reverse)
+
+ ready = []
+ waiting = []
+ for x in tasks:
+ for k in x.run_after:
+ if not k.hasrun:
+ waiting.append(x)
+ break
+ else:
+ ready.append(x)
+ return (ready, waiting)
+
+ def debug_cycles(self, tasks, reverse):
+ tmp = {}
+ for x in tasks:
+ tmp[x] = 0
+
+ def visit(n, acc):
+ if isinstance(n, Task.TaskGroup):
+ for k in n.next:
+ visit(k, acc)
+ return
+ if tmp[n] == 0:
+ tmp[n] = 1
+ for k in reverse.get(n, []):
+ visit(k, [n] + acc)
+ tmp[n] = 2
+ elif tmp[n] == 1:
+ lst = []
+ for tsk in acc:
+ lst.append(repr(tsk))
+ if tsk is n:
+ # exclude prior nodes, we want the minimum cycle
+ break
+ raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
+ for x in tasks:
+ visit(x, [])
+