From 3d21cdb263c14a06c14208169e72d565fc0424d5 Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Tue, 7 May 2013 15:10:09 +0200 Subject: [PATCH] Add a PollingTaskGroup task A PollingTaskGroup can monitor a collection of subtasks. Change-Id: I5db4e7a9d32647397f3229e7654aa0fb68d794ce --- heat/engine/scheduler.py | 98 ++++++++++++++++++++++++++ heat/tests/test_scheduler.py | 131 ++++++++++++++++++++++++++++++++++- 2 files changed, 227 insertions(+), 2 deletions(-) diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py index 986c9c3b..a321d3e8 100644 --- a/heat/engine/scheduler.py +++ b/heat/engine/scheduler.py @@ -14,8 +14,11 @@ # under the License. import eventlet +import functools +import itertools import types +from heat.openstack.common import excutils from heat.openstack.common import log as logging logger = logging.getLogger(__name__) @@ -139,3 +142,98 @@ class TaskRunner(object): def __nonzero__(self): """Return True if there are steps remaining.""" return not self.done() + + +class PollingTaskGroup(object): + """ + A task which manages a group of subtasks. + + When the task is started, all of its subtasks are also started. The task + completes when all subtasks are complete. + + Once started, the subtasks are assumed to be only polling for completion + of an asynchronous operation, so no attempt is made to give them equal + scheduling slots. + """ + + def __init__(self, tasks, name=None): + """Initialise with a list of tasks""" + self._tasks = list(tasks) + if name is None: + name = ', '.join(task_description(t) for t in self._tasks) + self.name = name + + @staticmethod + def _args(arg_lists): + """Return a list containing the positional args for each subtask.""" + return zip(*arg_lists) + + @staticmethod + def _kwargs(kwarg_lists): + """Return a list containing the keyword args for each subtask.""" + keygroups = (itertools.izip(itertools.repeat(name), + arglist) + for name, arglist in kwarg_lists.iteritems()) + return [dict(kwargs) for kwargs in itertools.izip(*keygroups)] + + @classmethod + def from_task_with_args(cls, task, *arg_lists, **kwarg_lists): + """ + Return a new PollingTaskGroup where each subtask is identical except + for the arguments passed to it. + + Each argument to use should be passed as a list (or iterable) of values + such that one is passed in the corresponding position for each subtask. + The number of subtasks spawned depends on the length of the argument + lists. For example: + + PollingTaskGroup.from_task_with_args(my_task, + [1, 2, 3], + alpha=['a', 'b', 'c']) + + will start three TaskRunners that will run: + + my_task(1, alpha='a') + my_task(2, alpha='b') + my_task(3, alpha='c') + + respectively. + + If multiple arguments are supplied, each list should be of the same + length. In the case of any discrepancy, the length of the shortest + argument list will be used, and any extra arguments discarded. + """ + + args_list = cls._args(arg_lists) + kwargs_list = cls._kwargs(kwarg_lists) + + if kwarg_lists and not arg_lists: + args_list = [[]] * len(kwargs_list) + elif arg_lists and not kwarg_lists: + kwargs_list = [{}] * len(args_list) + + task_args = itertools.izip(args_list, kwargs_list) + tasks = (functools.partial(task, *a, **kwa) for a, kwa in task_args) + + return cls(tasks, name=task_description(task)) + + def __repr__(self): + """Return a string representation of the task group.""" + return '%s(%s)' % (type(self).__name__, self.name) + + def __call__(self): + """Return a co-routine which runs the task group""" + runners = [TaskRunner(t) for t in self._tasks] + + try: + for r in runners: + r.start() + + while runners: + yield + runners = list(itertools.dropwhile(lambda r: r.step(), + runners)) + except: + with excutils.save_and_reraise_exception(): + for r in runners: + r.cancel() diff --git a/heat/tests/test_scheduler.py b/heat/tests/test_scheduler.py index 8b72aa2a..8fd2630f 100644 --- a/heat/tests/test_scheduler.py +++ b/heat/tests/test_scheduler.py @@ -14,6 +14,7 @@ import mox +import contextlib import eventlet from heat.engine import scheduler @@ -25,13 +26,139 @@ class DummyTask(object): def __call__(self, *args, **kwargs): for i in range(1, self.num_steps + 1): - self.do_step(i) + self.do_step(i, *args, **kwargs) yield - def do_step(self, step_num): + def do_step(self, step_num, *args, **kwargs): print self, step_num +class PollingTaskGroupTest(mox.MoxTestBase): + + def test_group(self): + tasks = [DummyTask() for i in range(3)] + for t in tasks: + self.mox.StubOutWithMock(t, 'do_step') + + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + for t in tasks: + t.do_step(1).AndReturn(None) + for t in tasks: + scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None) + t.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None) + t.do_step(3).AndReturn(None) + + self.mox.ReplayAll() + + tg = scheduler.PollingTaskGroup(tasks) + scheduler.TaskRunner(tg)() + + def test_kwargs(self): + input_kwargs = {'i': [0, 1, 2], + 'i2': [0, 1, 4]} + + output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs) + + expected_kwargs = [{'i': 0, 'i2': 0}, + {'i': 1, 'i2': 1}, + {'i': 2, 'i2': 4}] + + self.assertEqual(list(output_kwargs), expected_kwargs) + + def test_kwargs_short(self): + input_kwargs = {'i': [0, 1, 2], + 'i2': [0]} + + output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs) + + expected_kwargs = [{'i': 0, 'i2': 0}] + + self.assertEqual(list(output_kwargs), expected_kwargs) + + def test_no_kwargs(self): + output_kwargs = scheduler.PollingTaskGroup._kwargs({}) + self.assertEqual(list(output_kwargs), []) + + def test_args(self): + input_args = ([0, 1, 2], + [0, 1, 4]) + + output_args = scheduler.PollingTaskGroup._args(input_args) + + expected_args = [(0, 0), (1, 1), (2, 4)] + + self.assertEqual(list(output_args), expected_args) + + def test_args_short(self): + input_args = ([0, 1, 2], + [0]) + + output_args = scheduler.PollingTaskGroup._args(input_args) + + expected_args = [(0, 0)] + + self.assertEqual(list(output_args), expected_args) + + def test_no_args(self): + output_args = scheduler.PollingTaskGroup._args([]) + self.assertEqual(list(output_args), []) + + @contextlib.contextmanager + def _args_test(self, *arg_lists, **kwarg_lists): + dummy = DummyTask(1) + + tg = scheduler.PollingTaskGroup.from_task_with_args(dummy, + *arg_lists, + **kwarg_lists) + + self.mox.StubOutWithMock(dummy, 'do_step') + yield dummy + + self.mox.ReplayAll() + scheduler.TaskRunner(tg)(wait_time=None) + self.mox.VerifyAll() + + def test_with_all_args(self): + with self._args_test([0, 1, 2], [0, 1, 8], + i=[0, 1, 2], i2=[0, 1, 4]) as dummy: + for i in range(3): + dummy.do_step(1, i, i * i * i, i=i, i2=i * i) + + def test_with_short_args(self): + with self._args_test([0, 1, 2], [0, 1], + i=[0, 1, 2], i2=[0, 1, 4]) as dummy: + for i in range(2): + dummy.do_step(1, i, i * i, i=i, i2=i * i) + + def test_with_short_kwargs(self): + with self._args_test([0, 1, 2], [0, 1, 8], + i=[0, 1], i2=[0, 1, 4]) as dummy: + for i in range(2): + dummy.do_step(1, i, i * i, i=i, i2=i * i) + + def test_with_empty_args(self): + with self._args_test([], + i=[0, 1, 2], i2=[0, 1, 4]) as dummy: + pass + + def test_with_empty_kwargs(self): + with self._args_test([0, 1, 2], [0, 1, 8], + i=[]) as dummy: + pass + + def test_with_no_args(self): + with self._args_test(i=[0, 1, 2], i2=[0, 1, 4]) as dummy: + for i in range(3): + dummy.do_step(1, i=i, i2=i * i) + + def test_with_no_kwargs(self): + with self._args_test([0, 1, 2], [0, 1, 4]) as dummy: + for i in range(3): + dummy.do_step(1, i, i * i) + + class TaskTest(mox.MoxTestBase): def test_run(self): -- 2.45.2