From 8188d017e2532990304263e848bf2757127f6be2 Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Tue, 28 May 2013 10:16:37 +0200 Subject: [PATCH] Add a dependency-aware task group Change-Id: I4524b4b220bdc02bdd7e3f9a5d84b1982cbee99f --- heat/engine/scheduler.py | 62 ++++++++++++++ heat/tests/test_scheduler.py | 160 +++++++++++++++++++++++++++++++++++ 2 files changed, 222 insertions(+) diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py index 11bca52b..a553c8e4 100644 --- a/heat/engine/scheduler.py +++ b/heat/engine/scheduler.py @@ -243,6 +243,68 @@ def wrappertask(task): return wrapper +class DependencyTaskGroup(object): + """ + A task which manages a group of subtasks that have ordering dependencies. + """ + + def __init__(self, dependencies, make_task=lambda o: o, + reverse=False, name=None): + """ + Initialise with the task dependencies and (optionally) a function for + creating a task from each dependency object. + """ + self._runners = dict((o, TaskRunner(make_task(o))) + for o in dependencies) + self._graph = dependencies.graph(reverse=reverse) + + if name is None: + name = '(%s) %s' % (getattr(make_task, '__name__', + task_description(make_task)), + str(dependencies)) + self.name = name + + def __repr__(self): + """Return a string representation of the task.""" + return '%s(%s)' % (type(self).__name__, self.name) + + def __call__(self): + """Return a co-routine which runs the task group.""" + try: + while any(self._runners.itervalues()): + for k, r in self._ready(): + r.start() + + yield + + for k, r in self._running(): + if r.step(): + del self._graph[k] + except: + with excutils.save_and_reraise_exception(): + for r in self._runners.itervalues(): + r.cancel() + + def _ready(self): + """ + Iterate over all subtasks that are ready to start - i.e. all their + dependencies have been satisfied but they have not yet been started. + """ + for k, n in self._graph.iteritems(): + if not n: + runner = self._runners[k] + if not runner.started(): + yield k, runner + + def _running(self): + """ + Iterate over all subtasks that are currently running - i.e. they have + been started but have not yet completed. + """ + return itertools.ifilter(lambda (k, r): r and r.started(), + self._runners.iteritems()) + + class PollingTaskGroup(object): """ A task which manages a group of subtasks. diff --git a/heat/tests/test_scheduler.py b/heat/tests/test_scheduler.py index 6df29fef..6f829393 100644 --- a/heat/tests/test_scheduler.py +++ b/heat/tests/test_scheduler.py @@ -17,6 +17,7 @@ import mox import contextlib import eventlet +from heat.engine import dependencies from heat.engine import scheduler @@ -159,6 +160,165 @@ class PollingTaskGroupTest(mox.MoxTestBase): dummy.do_step(1, i, i * i) +class DependencyTaskGroupTest(mox.MoxTestBase): + + @contextlib.contextmanager + def _dep_test(self, *edges): + dummy = DummyTask() + + class TaskMaker(object): + def __init__(self, name): + self.name = name + + def __repr__(self): + return 'Dummy task "%s"' % self.name + + def __call__(self, *args, **kwargs): + return dummy(self.name, *args, **kwargs) + + deps = dependencies.Dependencies(edges) + + tg = scheduler.DependencyTaskGroup(deps, TaskMaker) + + self.mox.StubOutWithMock(dummy, 'do_step') + + yield dummy + + self.mox.ReplayAll() + scheduler.TaskRunner(tg)(wait_time=None) + self.mox.VerifyAll() + + def test_single_node(self): + with self._dep_test(('only', None)) as dummy: + dummy.do_step(1, 'only').AndReturn(None) + dummy.do_step(2, 'only').AndReturn(None) + dummy.do_step(3, 'only').AndReturn(None) + + def test_disjoint(self): + with self._dep_test(('1', None), ('2', None)) as dummy: + dummy.do_step(1, '1').InAnyOrder('1') + dummy.do_step(1, '2').InAnyOrder('1') + dummy.do_step(2, '1').InAnyOrder('2') + dummy.do_step(2, '2').InAnyOrder('2') + dummy.do_step(3, '1').InAnyOrder('3') + dummy.do_step(3, '2').InAnyOrder('3') + + def test_single_fwd(self): + with self._dep_test(('second', 'first')) as dummy: + dummy.do_step(1, 'first').AndReturn(None) + dummy.do_step(2, 'first').AndReturn(None) + dummy.do_step(3, 'first').AndReturn(None) + dummy.do_step(1, 'second').AndReturn(None) + dummy.do_step(2, 'second').AndReturn(None) + dummy.do_step(3, 'second').AndReturn(None) + + def test_chain_fwd(self): + with self._dep_test(('third', 'second'), + ('second', 'first')) as dummy: + dummy.do_step(1, 'first').AndReturn(None) + dummy.do_step(2, 'first').AndReturn(None) + dummy.do_step(3, 'first').AndReturn(None) + dummy.do_step(1, 'second').AndReturn(None) + dummy.do_step(2, 'second').AndReturn(None) + dummy.do_step(3, 'second').AndReturn(None) + dummy.do_step(1, 'third').AndReturn(None) + dummy.do_step(2, 'third').AndReturn(None) + dummy.do_step(3, 'third').AndReturn(None) + + def test_diamond_fwd(self): + with self._dep_test(('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'first'), ('mid2', 'first')) as dummy: + dummy.do_step(1, 'first').AndReturn(None) + dummy.do_step(2, 'first').AndReturn(None) + dummy.do_step(3, 'first').AndReturn(None) + dummy.do_step(1, 'mid1').InAnyOrder('1') + dummy.do_step(1, 'mid2').InAnyOrder('1') + dummy.do_step(2, 'mid1').InAnyOrder('2') + dummy.do_step(2, 'mid2').InAnyOrder('2') + dummy.do_step(3, 'mid1').InAnyOrder('3') + dummy.do_step(3, 'mid2').InAnyOrder('3') + dummy.do_step(1, 'last').AndReturn(None) + dummy.do_step(2, 'last').AndReturn(None) + dummy.do_step(3, 'last').AndReturn(None) + + def test_complex_fwd(self): + with self._dep_test(('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'mid3'), ('mid1', 'first'), + ('mid3', 'first'), ('mid2', 'first')) as dummy: + dummy.do_step(1, 'first').AndReturn(None) + dummy.do_step(2, 'first').AndReturn(None) + dummy.do_step(3, 'first').AndReturn(None) + dummy.do_step(1, 'mid2').InAnyOrder('1') + dummy.do_step(1, 'mid3').InAnyOrder('1') + dummy.do_step(2, 'mid2').InAnyOrder('2') + dummy.do_step(2, 'mid3').InAnyOrder('2') + dummy.do_step(3, 'mid2').InAnyOrder('3') + dummy.do_step(3, 'mid3').InAnyOrder('3') + dummy.do_step(1, 'mid1').AndReturn(None) + dummy.do_step(2, 'mid1').AndReturn(None) + dummy.do_step(3, 'mid1').AndReturn(None) + dummy.do_step(1, 'last').AndReturn(None) + dummy.do_step(2, 'last').AndReturn(None) + dummy.do_step(3, 'last').AndReturn(None) + + def test_many_edges_fwd(self): + with self._dep_test(('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'e2'), ('mid1', 'mid3'), + ('mid2', 'mid3'), + ('mid3', 'e3')) as dummy: + dummy.do_step(1, 'e1').InAnyOrder('1edges') + dummy.do_step(1, 'e2').InAnyOrder('1edges') + dummy.do_step(1, 'e3').InAnyOrder('1edges') + dummy.do_step(2, 'e1').InAnyOrder('2edges') + dummy.do_step(2, 'e2').InAnyOrder('2edges') + dummy.do_step(2, 'e3').InAnyOrder('2edges') + dummy.do_step(3, 'e1').InAnyOrder('3edges') + dummy.do_step(3, 'e2').InAnyOrder('3edges') + dummy.do_step(3, 'e3').InAnyOrder('3edges') + dummy.do_step(1, 'mid3').AndReturn(None) + dummy.do_step(2, 'mid3').AndReturn(None) + dummy.do_step(3, 'mid3').AndReturn(None) + dummy.do_step(1, 'mid2').InAnyOrder('1mid') + dummy.do_step(1, 'mid1').InAnyOrder('1mid') + dummy.do_step(2, 'mid2').InAnyOrder('2mid') + dummy.do_step(2, 'mid1').InAnyOrder('2mid') + dummy.do_step(3, 'mid2').InAnyOrder('3mid') + dummy.do_step(3, 'mid1').InAnyOrder('3mid') + dummy.do_step(1, 'last').AndReturn(None) + dummy.do_step(2, 'last').AndReturn(None) + dummy.do_step(3, 'last').AndReturn(None) + + def test_dbldiamond_fwd(self): + with self._dep_test(('last', 'a1'), ('last', 'a2'), + ('a1', 'b1'), ('a2', 'b1'), ('a2', 'b2'), + ('b1', 'first'), ('b2', 'first')) as dummy: + dummy.do_step(1, 'first').AndReturn(None) + dummy.do_step(2, 'first').AndReturn(None) + dummy.do_step(3, 'first').AndReturn(None) + dummy.do_step(1, 'b1').InAnyOrder('1b') + dummy.do_step(1, 'b2').InAnyOrder('1b') + dummy.do_step(2, 'b1').InAnyOrder('2b') + dummy.do_step(2, 'b2').InAnyOrder('2b') + dummy.do_step(3, 'b1').InAnyOrder('3b') + dummy.do_step(3, 'b2').InAnyOrder('3b') + dummy.do_step(1, 'a1').InAnyOrder('1a') + dummy.do_step(1, 'a2').InAnyOrder('1a') + dummy.do_step(2, 'a1').InAnyOrder('2a') + dummy.do_step(2, 'a2').InAnyOrder('2a') + dummy.do_step(3, 'a1').InAnyOrder('3a') + dummy.do_step(3, 'a2').InAnyOrder('3a') + dummy.do_step(1, 'last').AndReturn(None) + dummy.do_step(2, 'last').AndReturn(None) + dummy.do_step(3, 'last').AndReturn(None) + + def test_circular_deps(self): + d = dependencies.Dependencies([('first', 'second'), + ('second', 'third'), + ('third', 'first')]) + self.assertRaises(dependencies.CircularDependencyException, + scheduler.DependencyTaskGroup, d) + + class TaskTest(mox.MoxTestBase): def test_run(self): -- 2.45.2