]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Add a PollingTaskGroup task
authorZane Bitter <zbitter@redhat.com>
Tue, 7 May 2013 13:10:09 +0000 (15:10 +0200)
committerZane Bitter <zbitter@redhat.com>
Tue, 7 May 2013 13:10:09 +0000 (15:10 +0200)
A PollingTaskGroup can monitor a collection of subtasks.

Change-Id: I5db4e7a9d32647397f3229e7654aa0fb68d794ce

heat/engine/scheduler.py
heat/tests/test_scheduler.py

index 986c9c3b022e6cc7fdb0dc28576f3fee1a25090a..a321d3e82f6d5e1f577ebfae1af4051f64a8e0d1 100644 (file)
 #    under the License.
 
 import eventlet
+import functools
+import itertools
 import types
 
+from heat.openstack.common import excutils
 from heat.openstack.common import log as logging
 
 logger = logging.getLogger(__name__)
@@ -139,3 +142,98 @@ class TaskRunner(object):
     def __nonzero__(self):
         """Return True if there are steps remaining."""
         return not self.done()
+
+
+class PollingTaskGroup(object):
+    """
+    A task which manages a group of subtasks.
+
+    When the task is started, all of its subtasks are also started. The task
+    completes when all subtasks are complete.
+
+    Once started, the subtasks are assumed to be only polling for completion
+    of an asynchronous operation, so no attempt is made to give them equal
+    scheduling slots.
+    """
+
+    def __init__(self, tasks, name=None):
+        """Initialise with a list of tasks"""
+        self._tasks = list(tasks)
+        if name is None:
+            name = ', '.join(task_description(t) for t in self._tasks)
+        self.name = name
+
+    @staticmethod
+    def _args(arg_lists):
+        """Return a list containing the positional args for each subtask."""
+        return zip(*arg_lists)
+
+    @staticmethod
+    def _kwargs(kwarg_lists):
+        """Return a list containing the keyword args for each subtask."""
+        keygroups = (itertools.izip(itertools.repeat(name),
+                                    arglist)
+                     for name, arglist in kwarg_lists.iteritems())
+        return [dict(kwargs) for kwargs in itertools.izip(*keygroups)]
+
+    @classmethod
+    def from_task_with_args(cls, task, *arg_lists, **kwarg_lists):
+        """
+        Return a new PollingTaskGroup where each subtask is identical except
+        for the arguments passed to it.
+
+        Each argument to use should be passed as a list (or iterable) of values
+        such that one is passed in the corresponding position for each subtask.
+        The number of subtasks spawned depends on the length of the argument
+        lists. For example:
+
+            PollingTaskGroup.from_task_with_args(my_task,
+                                                 [1, 2, 3],
+                                                 alpha=['a', 'b', 'c'])
+
+        will start three TaskRunners that will run:
+
+            my_task(1, alpha='a')
+            my_task(2, alpha='b')
+            my_task(3, alpha='c')
+
+        respectively.
+
+        If multiple arguments are supplied, each list should be of the same
+        length. In the case of any discrepancy, the length of the shortest
+        argument list will be used, and any extra arguments discarded.
+        """
+
+        args_list = cls._args(arg_lists)
+        kwargs_list = cls._kwargs(kwarg_lists)
+
+        if kwarg_lists and not arg_lists:
+            args_list = [[]] * len(kwargs_list)
+        elif arg_lists and not kwarg_lists:
+            kwargs_list = [{}] * len(args_list)
+
+        task_args = itertools.izip(args_list, kwargs_list)
+        tasks = (functools.partial(task, *a, **kwa) for a, kwa in task_args)
+
+        return cls(tasks, name=task_description(task))
+
+    def __repr__(self):
+        """Return a string representation of the task group."""
+        return '%s(%s)' % (type(self).__name__, self.name)
+
+    def __call__(self):
+        """Return a co-routine which runs the task group"""
+        runners = [TaskRunner(t) for t in self._tasks]
+
+        try:
+            for r in runners:
+                r.start()
+
+            while runners:
+                yield
+                runners = list(itertools.dropwhile(lambda r: r.step(),
+                                                   runners))
+        except:
+            with excutils.save_and_reraise_exception():
+                for r in runners:
+                    r.cancel()
index 8b72aa2aca39543343495d2c91ae02c9cef02021..8fd2630febe58b7de25d323315a26cfd40ac50f6 100644 (file)
@@ -14,6 +14,7 @@
 
 import mox
 
+import contextlib
 import eventlet
 
 from heat.engine import scheduler
@@ -25,13 +26,139 @@ class DummyTask(object):
 
     def __call__(self, *args, **kwargs):
         for i in range(1, self.num_steps + 1):
-            self.do_step(i)
+            self.do_step(i, *args, **kwargs)
             yield
 
-    def do_step(self, step_num):
+    def do_step(self, step_num, *args, **kwargs):
         print self, step_num
 
 
+class PollingTaskGroupTest(mox.MoxTestBase):
+
+    def test_group(self):
+        tasks = [DummyTask() for i in range(3)]
+        for t in tasks:
+            self.mox.StubOutWithMock(t, 'do_step')
+
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        for t in tasks:
+            t.do_step(1).AndReturn(None)
+        for t in tasks:
+            scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+            t.do_step(2).AndReturn(None)
+            scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+            t.do_step(3).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        tg = scheduler.PollingTaskGroup(tasks)
+        scheduler.TaskRunner(tg)()
+
+    def test_kwargs(self):
+        input_kwargs = {'i': [0, 1, 2],
+                        'i2': [0, 1, 4]}
+
+        output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
+
+        expected_kwargs = [{'i': 0, 'i2': 0},
+                           {'i': 1, 'i2': 1},
+                           {'i': 2, 'i2': 4}]
+
+        self.assertEqual(list(output_kwargs), expected_kwargs)
+
+    def test_kwargs_short(self):
+        input_kwargs = {'i': [0, 1, 2],
+                        'i2': [0]}
+
+        output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
+
+        expected_kwargs = [{'i': 0, 'i2': 0}]
+
+        self.assertEqual(list(output_kwargs), expected_kwargs)
+
+    def test_no_kwargs(self):
+        output_kwargs = scheduler.PollingTaskGroup._kwargs({})
+        self.assertEqual(list(output_kwargs), [])
+
+    def test_args(self):
+        input_args = ([0, 1, 2],
+                      [0, 1, 4])
+
+        output_args = scheduler.PollingTaskGroup._args(input_args)
+
+        expected_args = [(0, 0), (1, 1), (2, 4)]
+
+        self.assertEqual(list(output_args), expected_args)
+
+    def test_args_short(self):
+        input_args = ([0, 1, 2],
+                      [0])
+
+        output_args = scheduler.PollingTaskGroup._args(input_args)
+
+        expected_args = [(0, 0)]
+
+        self.assertEqual(list(output_args), expected_args)
+
+    def test_no_args(self):
+        output_args = scheduler.PollingTaskGroup._args([])
+        self.assertEqual(list(output_args), [])
+
+    @contextlib.contextmanager
+    def _args_test(self, *arg_lists, **kwarg_lists):
+        dummy = DummyTask(1)
+
+        tg = scheduler.PollingTaskGroup.from_task_with_args(dummy,
+                                                            *arg_lists,
+                                                            **kwarg_lists)
+
+        self.mox.StubOutWithMock(dummy, 'do_step')
+        yield dummy
+
+        self.mox.ReplayAll()
+        scheduler.TaskRunner(tg)(wait_time=None)
+        self.mox.VerifyAll()
+
+    def test_with_all_args(self):
+        with self._args_test([0, 1, 2], [0, 1, 8],
+                             i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+            for i in range(3):
+                dummy.do_step(1, i, i * i * i, i=i, i2=i * i)
+
+    def test_with_short_args(self):
+        with self._args_test([0, 1, 2], [0, 1],
+                             i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+            for i in range(2):
+                dummy.do_step(1, i, i * i, i=i, i2=i * i)
+
+    def test_with_short_kwargs(self):
+        with self._args_test([0, 1, 2], [0, 1, 8],
+                             i=[0, 1], i2=[0, 1, 4]) as dummy:
+            for i in range(2):
+                dummy.do_step(1, i, i * i, i=i, i2=i * i)
+
+    def test_with_empty_args(self):
+        with self._args_test([],
+                             i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+            pass
+
+    def test_with_empty_kwargs(self):
+        with self._args_test([0, 1, 2], [0, 1, 8],
+                             i=[]) as dummy:
+            pass
+
+    def test_with_no_args(self):
+        with self._args_test(i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+            for i in range(3):
+                dummy.do_step(1, i=i, i2=i * i)
+
+    def test_with_no_kwargs(self):
+        with self._args_test([0, 1, 2], [0, 1, 4]) as dummy:
+            for i in range(3):
+                dummy.do_step(1, i, i * i)
+
+
 class TaskTest(mox.MoxTestBase):
 
     def test_run(self):