]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Add a dependency-aware task group
authorZane Bitter <zbitter@redhat.com>
Tue, 28 May 2013 08:16:37 +0000 (10:16 +0200)
committerZane Bitter <zbitter@redhat.com>
Tue, 28 May 2013 08:16:37 +0000 (10:16 +0200)
Change-Id: I4524b4b220bdc02bdd7e3f9a5d84b1982cbee99f

heat/engine/scheduler.py
heat/tests/test_scheduler.py

index 11bca52bfa8d3b69525f7b6f09f28344ecda6216..a553c8e482f3b170b88a9fb9b3c5fe7251fdb0e9 100644 (file)
@@ -243,6 +243,68 @@ def wrappertask(task):
     return wrapper
 
 
+class DependencyTaskGroup(object):
+    """
+    A task which manages a group of subtasks that have ordering dependencies.
+    """
+
+    def __init__(self, dependencies, make_task=lambda o: o,
+                 reverse=False, name=None):
+        """
+        Initialise with the task dependencies and (optionally) a function for
+        creating a task from each dependency object.
+        """
+        self._runners = dict((o, TaskRunner(make_task(o)))
+                             for o in dependencies)
+        self._graph = dependencies.graph(reverse=reverse)
+
+        if name is None:
+            name = '(%s) %s' % (getattr(make_task, '__name__',
+                                        task_description(make_task)),
+                                str(dependencies))
+        self.name = name
+
+    def __repr__(self):
+        """Return a string representation of the task."""
+        return '%s(%s)' % (type(self).__name__, self.name)
+
+    def __call__(self):
+        """Return a co-routine which runs the task group."""
+        try:
+            while any(self._runners.itervalues()):
+                for k, r in self._ready():
+                    r.start()
+
+                yield
+
+                for k, r in self._running():
+                    if r.step():
+                        del self._graph[k]
+        except:
+            with excutils.save_and_reraise_exception():
+                for r in self._runners.itervalues():
+                    r.cancel()
+
+    def _ready(self):
+        """
+        Iterate over all subtasks that are ready to start - i.e. all their
+        dependencies have been satisfied but they have not yet been started.
+        """
+        for k, n in self._graph.iteritems():
+            if not n:
+                runner = self._runners[k]
+                if not runner.started():
+                    yield k, runner
+
+    def _running(self):
+        """
+        Iterate over all subtasks that are currently running - i.e. they have
+        been started but have not yet completed.
+        """
+        return itertools.ifilter(lambda (k, r): r and r.started(),
+                                 self._runners.iteritems())
+
+
 class PollingTaskGroup(object):
     """
     A task which manages a group of subtasks.
index 6df29fef0f7d0e1554799ca8460ac960dd9e1a74..6f82939391b4f35e584e5710c7879b30cdd71819 100644 (file)
@@ -17,6 +17,7 @@ import mox
 import contextlib
 import eventlet
 
+from heat.engine import dependencies
 from heat.engine import scheduler
 
 
@@ -159,6 +160,165 @@ class PollingTaskGroupTest(mox.MoxTestBase):
                 dummy.do_step(1, i, i * i)
 
 
+class DependencyTaskGroupTest(mox.MoxTestBase):
+
+    @contextlib.contextmanager
+    def _dep_test(self, *edges):
+        dummy = DummyTask()
+
+        class TaskMaker(object):
+            def __init__(self, name):
+                self.name = name
+
+            def __repr__(self):
+                return 'Dummy task "%s"' % self.name
+
+            def __call__(self, *args, **kwargs):
+                return dummy(self.name, *args, **kwargs)
+
+        deps = dependencies.Dependencies(edges)
+
+        tg = scheduler.DependencyTaskGroup(deps, TaskMaker)
+
+        self.mox.StubOutWithMock(dummy, 'do_step')
+
+        yield dummy
+
+        self.mox.ReplayAll()
+        scheduler.TaskRunner(tg)(wait_time=None)
+        self.mox.VerifyAll()
+
+    def test_single_node(self):
+        with self._dep_test(('only', None)) as dummy:
+            dummy.do_step(1, 'only').AndReturn(None)
+            dummy.do_step(2, 'only').AndReturn(None)
+            dummy.do_step(3, 'only').AndReturn(None)
+
+    def test_disjoint(self):
+        with self._dep_test(('1', None), ('2', None)) as dummy:
+            dummy.do_step(1, '1').InAnyOrder('1')
+            dummy.do_step(1, '2').InAnyOrder('1')
+            dummy.do_step(2, '1').InAnyOrder('2')
+            dummy.do_step(2, '2').InAnyOrder('2')
+            dummy.do_step(3, '1').InAnyOrder('3')
+            dummy.do_step(3, '2').InAnyOrder('3')
+
+    def test_single_fwd(self):
+        with self._dep_test(('second', 'first')) as dummy:
+            dummy.do_step(1, 'first').AndReturn(None)
+            dummy.do_step(2, 'first').AndReturn(None)
+            dummy.do_step(3, 'first').AndReturn(None)
+            dummy.do_step(1, 'second').AndReturn(None)
+            dummy.do_step(2, 'second').AndReturn(None)
+            dummy.do_step(3, 'second').AndReturn(None)
+
+    def test_chain_fwd(self):
+        with self._dep_test(('third', 'second'),
+                            ('second', 'first')) as dummy:
+            dummy.do_step(1, 'first').AndReturn(None)
+            dummy.do_step(2, 'first').AndReturn(None)
+            dummy.do_step(3, 'first').AndReturn(None)
+            dummy.do_step(1, 'second').AndReturn(None)
+            dummy.do_step(2, 'second').AndReturn(None)
+            dummy.do_step(3, 'second').AndReturn(None)
+            dummy.do_step(1, 'third').AndReturn(None)
+            dummy.do_step(2, 'third').AndReturn(None)
+            dummy.do_step(3, 'third').AndReturn(None)
+
+    def test_diamond_fwd(self):
+        with self._dep_test(('last', 'mid1'), ('last', 'mid2'),
+                            ('mid1', 'first'), ('mid2', 'first')) as dummy:
+            dummy.do_step(1, 'first').AndReturn(None)
+            dummy.do_step(2, 'first').AndReturn(None)
+            dummy.do_step(3, 'first').AndReturn(None)
+            dummy.do_step(1, 'mid1').InAnyOrder('1')
+            dummy.do_step(1, 'mid2').InAnyOrder('1')
+            dummy.do_step(2, 'mid1').InAnyOrder('2')
+            dummy.do_step(2, 'mid2').InAnyOrder('2')
+            dummy.do_step(3, 'mid1').InAnyOrder('3')
+            dummy.do_step(3, 'mid2').InAnyOrder('3')
+            dummy.do_step(1, 'last').AndReturn(None)
+            dummy.do_step(2, 'last').AndReturn(None)
+            dummy.do_step(3, 'last').AndReturn(None)
+
+    def test_complex_fwd(self):
+        with self._dep_test(('last', 'mid1'), ('last', 'mid2'),
+                            ('mid1', 'mid3'), ('mid1', 'first'),
+                            ('mid3', 'first'), ('mid2', 'first')) as dummy:
+            dummy.do_step(1, 'first').AndReturn(None)
+            dummy.do_step(2, 'first').AndReturn(None)
+            dummy.do_step(3, 'first').AndReturn(None)
+            dummy.do_step(1, 'mid2').InAnyOrder('1')
+            dummy.do_step(1, 'mid3').InAnyOrder('1')
+            dummy.do_step(2, 'mid2').InAnyOrder('2')
+            dummy.do_step(2, 'mid3').InAnyOrder('2')
+            dummy.do_step(3, 'mid2').InAnyOrder('3')
+            dummy.do_step(3, 'mid3').InAnyOrder('3')
+            dummy.do_step(1, 'mid1').AndReturn(None)
+            dummy.do_step(2, 'mid1').AndReturn(None)
+            dummy.do_step(3, 'mid1').AndReturn(None)
+            dummy.do_step(1, 'last').AndReturn(None)
+            dummy.do_step(2, 'last').AndReturn(None)
+            dummy.do_step(3, 'last').AndReturn(None)
+
+    def test_many_edges_fwd(self):
+        with self._dep_test(('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
+                            ('mid1', 'e2'), ('mid1', 'mid3'),
+                            ('mid2', 'mid3'),
+                            ('mid3', 'e3')) as dummy:
+            dummy.do_step(1, 'e1').InAnyOrder('1edges')
+            dummy.do_step(1, 'e2').InAnyOrder('1edges')
+            dummy.do_step(1, 'e3').InAnyOrder('1edges')
+            dummy.do_step(2, 'e1').InAnyOrder('2edges')
+            dummy.do_step(2, 'e2').InAnyOrder('2edges')
+            dummy.do_step(2, 'e3').InAnyOrder('2edges')
+            dummy.do_step(3, 'e1').InAnyOrder('3edges')
+            dummy.do_step(3, 'e2').InAnyOrder('3edges')
+            dummy.do_step(3, 'e3').InAnyOrder('3edges')
+            dummy.do_step(1, 'mid3').AndReturn(None)
+            dummy.do_step(2, 'mid3').AndReturn(None)
+            dummy.do_step(3, 'mid3').AndReturn(None)
+            dummy.do_step(1, 'mid2').InAnyOrder('1mid')
+            dummy.do_step(1, 'mid1').InAnyOrder('1mid')
+            dummy.do_step(2, 'mid2').InAnyOrder('2mid')
+            dummy.do_step(2, 'mid1').InAnyOrder('2mid')
+            dummy.do_step(3, 'mid2').InAnyOrder('3mid')
+            dummy.do_step(3, 'mid1').InAnyOrder('3mid')
+            dummy.do_step(1, 'last').AndReturn(None)
+            dummy.do_step(2, 'last').AndReturn(None)
+            dummy.do_step(3, 'last').AndReturn(None)
+
+    def test_dbldiamond_fwd(self):
+        with self._dep_test(('last', 'a1'), ('last', 'a2'),
+                            ('a1', 'b1'), ('a2', 'b1'), ('a2', 'b2'),
+                            ('b1', 'first'), ('b2', 'first')) as dummy:
+            dummy.do_step(1, 'first').AndReturn(None)
+            dummy.do_step(2, 'first').AndReturn(None)
+            dummy.do_step(3, 'first').AndReturn(None)
+            dummy.do_step(1, 'b1').InAnyOrder('1b')
+            dummy.do_step(1, 'b2').InAnyOrder('1b')
+            dummy.do_step(2, 'b1').InAnyOrder('2b')
+            dummy.do_step(2, 'b2').InAnyOrder('2b')
+            dummy.do_step(3, 'b1').InAnyOrder('3b')
+            dummy.do_step(3, 'b2').InAnyOrder('3b')
+            dummy.do_step(1, 'a1').InAnyOrder('1a')
+            dummy.do_step(1, 'a2').InAnyOrder('1a')
+            dummy.do_step(2, 'a1').InAnyOrder('2a')
+            dummy.do_step(2, 'a2').InAnyOrder('2a')
+            dummy.do_step(3, 'a1').InAnyOrder('3a')
+            dummy.do_step(3, 'a2').InAnyOrder('3a')
+            dummy.do_step(1, 'last').AndReturn(None)
+            dummy.do_step(2, 'last').AndReturn(None)
+            dummy.do_step(3, 'last').AndReturn(None)
+
+    def test_circular_deps(self):
+        d = dependencies.Dependencies([('first', 'second'),
+                                       ('second', 'third'),
+                                       ('third', 'first')])
+        self.assertRaises(dependencies.CircularDependencyException,
+                          scheduler.DependencyTaskGroup, d)
+
+
 class TaskTest(mox.MoxTestBase):
 
     def test_run(self):