# under the License.
import eventlet
+import functools
+import itertools
import types
+from heat.openstack.common import excutils
from heat.openstack.common import log as logging
logger = logging.getLogger(__name__)
def __nonzero__(self):
"""Return True if there are steps remaining."""
return not self.done()
+
+
+class PollingTaskGroup(object):
+ """
+ A task which manages a group of subtasks.
+
+ When the task is started, all of its subtasks are also started. The task
+ completes when all subtasks are complete.
+
+ Once started, the subtasks are assumed to be only polling for completion
+ of an asynchronous operation, so no attempt is made to give them equal
+ scheduling slots.
+ """
+
+ def __init__(self, tasks, name=None):
+ """Initialise with a list of tasks"""
+ self._tasks = list(tasks)
+ if name is None:
+ name = ', '.join(task_description(t) for t in self._tasks)
+ self.name = name
+
+ @staticmethod
+ def _args(arg_lists):
+ """Return a list containing the positional args for each subtask."""
+ return zip(*arg_lists)
+
+ @staticmethod
+ def _kwargs(kwarg_lists):
+ """Return a list containing the keyword args for each subtask."""
+ keygroups = (itertools.izip(itertools.repeat(name),
+ arglist)
+ for name, arglist in kwarg_lists.iteritems())
+ return [dict(kwargs) for kwargs in itertools.izip(*keygroups)]
+
+ @classmethod
+ def from_task_with_args(cls, task, *arg_lists, **kwarg_lists):
+ """
+ Return a new PollingTaskGroup where each subtask is identical except
+ for the arguments passed to it.
+
+ Each argument to use should be passed as a list (or iterable) of values
+ such that one is passed in the corresponding position for each subtask.
+ The number of subtasks spawned depends on the length of the argument
+ lists. For example:
+
+ PollingTaskGroup.from_task_with_args(my_task,
+ [1, 2, 3],
+ alpha=['a', 'b', 'c'])
+
+ will start three TaskRunners that will run:
+
+ my_task(1, alpha='a')
+ my_task(2, alpha='b')
+ my_task(3, alpha='c')
+
+ respectively.
+
+ If multiple arguments are supplied, each list should be of the same
+ length. In the case of any discrepancy, the length of the shortest
+ argument list will be used, and any extra arguments discarded.
+ """
+
+ args_list = cls._args(arg_lists)
+ kwargs_list = cls._kwargs(kwarg_lists)
+
+ if kwarg_lists and not arg_lists:
+ args_list = [[]] * len(kwargs_list)
+ elif arg_lists and not kwarg_lists:
+ kwargs_list = [{}] * len(args_list)
+
+ task_args = itertools.izip(args_list, kwargs_list)
+ tasks = (functools.partial(task, *a, **kwa) for a, kwa in task_args)
+
+ return cls(tasks, name=task_description(task))
+
+ def __repr__(self):
+ """Return a string representation of the task group."""
+ return '%s(%s)' % (type(self).__name__, self.name)
+
+ def __call__(self):
+ """Return a co-routine which runs the task group"""
+ runners = [TaskRunner(t) for t in self._tasks]
+
+ try:
+ for r in runners:
+ r.start()
+
+ while runners:
+ yield
+ runners = list(itertools.dropwhile(lambda r: r.step(),
+ runners))
+ except:
+ with excutils.save_and_reraise_exception():
+ for r in runners:
+ r.cancel()
import mox
+import contextlib
import eventlet
from heat.engine import scheduler
def __call__(self, *args, **kwargs):
for i in range(1, self.num_steps + 1):
- self.do_step(i)
+ self.do_step(i, *args, **kwargs)
yield
- def do_step(self, step_num):
+ def do_step(self, step_num, *args, **kwargs):
print self, step_num
+class PollingTaskGroupTest(mox.MoxTestBase):
+
+ def test_group(self):
+ tasks = [DummyTask() for i in range(3)]
+ for t in tasks:
+ self.mox.StubOutWithMock(t, 'do_step')
+
+ self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+ for t in tasks:
+ t.do_step(1).AndReturn(None)
+ for t in tasks:
+ scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+ t.do_step(2).AndReturn(None)
+ scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+ t.do_step(3).AndReturn(None)
+
+ self.mox.ReplayAll()
+
+ tg = scheduler.PollingTaskGroup(tasks)
+ scheduler.TaskRunner(tg)()
+
+ def test_kwargs(self):
+ input_kwargs = {'i': [0, 1, 2],
+ 'i2': [0, 1, 4]}
+
+ output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
+
+ expected_kwargs = [{'i': 0, 'i2': 0},
+ {'i': 1, 'i2': 1},
+ {'i': 2, 'i2': 4}]
+
+ self.assertEqual(list(output_kwargs), expected_kwargs)
+
+ def test_kwargs_short(self):
+ input_kwargs = {'i': [0, 1, 2],
+ 'i2': [0]}
+
+ output_kwargs = scheduler.PollingTaskGroup._kwargs(input_kwargs)
+
+ expected_kwargs = [{'i': 0, 'i2': 0}]
+
+ self.assertEqual(list(output_kwargs), expected_kwargs)
+
+ def test_no_kwargs(self):
+ output_kwargs = scheduler.PollingTaskGroup._kwargs({})
+ self.assertEqual(list(output_kwargs), [])
+
+ def test_args(self):
+ input_args = ([0, 1, 2],
+ [0, 1, 4])
+
+ output_args = scheduler.PollingTaskGroup._args(input_args)
+
+ expected_args = [(0, 0), (1, 1), (2, 4)]
+
+ self.assertEqual(list(output_args), expected_args)
+
+ def test_args_short(self):
+ input_args = ([0, 1, 2],
+ [0])
+
+ output_args = scheduler.PollingTaskGroup._args(input_args)
+
+ expected_args = [(0, 0)]
+
+ self.assertEqual(list(output_args), expected_args)
+
+ def test_no_args(self):
+ output_args = scheduler.PollingTaskGroup._args([])
+ self.assertEqual(list(output_args), [])
+
+ @contextlib.contextmanager
+ def _args_test(self, *arg_lists, **kwarg_lists):
+ dummy = DummyTask(1)
+
+ tg = scheduler.PollingTaskGroup.from_task_with_args(dummy,
+ *arg_lists,
+ **kwarg_lists)
+
+ self.mox.StubOutWithMock(dummy, 'do_step')
+ yield dummy
+
+ self.mox.ReplayAll()
+ scheduler.TaskRunner(tg)(wait_time=None)
+ self.mox.VerifyAll()
+
+ def test_with_all_args(self):
+ with self._args_test([0, 1, 2], [0, 1, 8],
+ i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+ for i in range(3):
+ dummy.do_step(1, i, i * i * i, i=i, i2=i * i)
+
+ def test_with_short_args(self):
+ with self._args_test([0, 1, 2], [0, 1],
+ i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+ for i in range(2):
+ dummy.do_step(1, i, i * i, i=i, i2=i * i)
+
+ def test_with_short_kwargs(self):
+ with self._args_test([0, 1, 2], [0, 1, 8],
+ i=[0, 1], i2=[0, 1, 4]) as dummy:
+ for i in range(2):
+ dummy.do_step(1, i, i * i, i=i, i2=i * i)
+
+ def test_with_empty_args(self):
+ with self._args_test([],
+ i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+ pass
+
+ def test_with_empty_kwargs(self):
+ with self._args_test([0, 1, 2], [0, 1, 8],
+ i=[]) as dummy:
+ pass
+
+ def test_with_no_args(self):
+ with self._args_test(i=[0, 1, 2], i2=[0, 1, 4]) as dummy:
+ for i in range(3):
+ dummy.do_step(1, i=i, i2=i * i)
+
+ def test_with_no_kwargs(self):
+ with self._args_test([0, 1, 2], [0, 1, 4]) as dummy:
+ for i in range(3):
+ dummy.do_step(1, i, i * i)
+
+
class TaskTest(mox.MoxTestBase):
def test_run(self):