From 160490d3afbf2de25edb2bb4d20f27672b523395 Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Mon, 29 Apr 2013 11:28:37 +0200 Subject: [PATCH] Add a scheduler module Utility routines for running coroutine-based tasks. Change-Id: I907d889a564a1118b7c9caa30f3ad65dfac76534 --- heat/engine/scheduler.py | 141 +++++++++++++++++++++ heat/tests/test_scheduler.py | 229 +++++++++++++++++++++++++++++++++++ 2 files changed, 370 insertions(+) create mode 100644 heat/engine/scheduler.py create mode 100644 heat/tests/test_scheduler.py diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py new file mode 100644 index 00000000..986c9c3b --- /dev/null +++ b/heat/engine/scheduler.py @@ -0,0 +1,141 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import types + +from heat.openstack.common import log as logging + +logger = logging.getLogger(__name__) + + +def task_description(task): + """ + Return a human-readable string description of a task suitable for logging + the status of the task. + """ + if isinstance(task, types.MethodType): + name = getattr(task, '__name__') + obj = getattr(task, '__self__') + if name is not None and obj is not None: + return '%s from %s' % (name, obj) + return repr(task) + + +class TaskRunner(object): + """ + Wrapper for a resumable task (co-routine). + """ + + def __init__(self, task, *args, **kwargs): + """ + Initialise with a task function, and arguments to be passed to it when + it is started. + + The task function may be a co-routine that yields control flow between + steps. + """ + assert callable(task), "Task is not callable" + + self._task = task + self._args = args + self._kwargs = kwargs + self._runner = None + self._done = False + self.name = task_description(task) + + def __str__(self): + """Return a human-readable string representation of the task.""" + return 'Task %s' % self.name + + def _sleep(self, wait_time): + """Sleep for the specified number of seconds.""" + if wait_time is not None: + logger.debug('%s sleeping' % str(self)) + eventlet.sleep(wait_time) + + def __call__(self, wait_time=1): + """ + Start and run the task to completion. + + The task will sleep for `wait_time` seconds between steps. To avoid + sleeping, pass `None` for `wait_time`. + """ + self.start() + self.run_to_completion(wait_time=wait_time) + + def start(self): + """ + Initialise the task and run its first step. + """ + assert self._runner is None, "Task already started" + + logger.debug('%s starting' % str(self)) + + result = self._task(*self._args, **self._kwargs) + if isinstance(result, types.GeneratorType): + self._runner = result + self.step() + else: + self._runner = False + self._done = True + logger.debug('%s done (not resumable)' % str(self)) + + def step(self): + """ + Run another step of the task, and return True if the task is complete; + False otherwise. + """ + if not self.done(): + assert self._runner is not None, "Task not started" + + logger.debug('%s running' % str(self)) + + try: + next(self._runner) + except StopIteration: + self._done = True + logger.debug('%s complete' % str(self)) + + return self._done + + def run_to_completion(self, wait_time=1): + """ + Run the task to completion. + + The task will sleep for `wait_time` seconds between steps. To avoid + sleeping, pass `None` for `wait_time`. + """ + while not self.step(): + self._sleep(wait_time) + + def cancel(self): + """Cancel the task if it is running.""" + if self.started() and not self.done(): + logger.debug('%s cancelled' % str(self)) + self._runner.close() + self._done = True + + def started(self): + """Return True if the task has been started.""" + return self._runner is not None + + def done(self): + """Return True if the task is complete.""" + return self._done + + def __nonzero__(self): + """Return True if there are steps remaining.""" + return not self.done() diff --git a/heat/tests/test_scheduler.py b/heat/tests/test_scheduler.py new file mode 100644 index 00000000..ad75b6e2 --- /dev/null +++ b/heat/tests/test_scheduler.py @@ -0,0 +1,229 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mox +from nose.plugins.attrib import attr + +import eventlet + +from heat.engine import scheduler + + +class DummyTask(object): + def __init__(self, num_steps=3): + self.num_steps = num_steps + + def __call__(self, *args, **kwargs): + for i in range(1, self.num_steps + 1): + self.do_step(i) + yield + + def do_step(self, step_num): + print self, step_num + + +@attr(tag=['unit', 'scheduler']) +@attr(speed='fast') +class TaskTest(mox.MoxTestBase): + + def test_run(self): + task = DummyTask() + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(3).AndReturn(None) + + self.mox.ReplayAll() + + scheduler.TaskRunner(task)() + + def test_run_wait_time(self): + task = DummyTask() + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + scheduler.TaskRunner._sleep(42).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(42).AndReturn(None) + task.do_step(3).AndReturn(None) + + self.mox.ReplayAll() + + scheduler.TaskRunner(task)(wait_time=42) + + def test_start_run(self): + task = DummyTask() + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(1).AndReturn(None) + task.do_step(3).AndReturn(None) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + runner.run_to_completion() + + def test_start_run_wait_time(self): + task = DummyTask() + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + scheduler.TaskRunner._sleep(24).AndReturn(None) + task.do_step(2).AndReturn(None) + scheduler.TaskRunner._sleep(24).AndReturn(None) + task.do_step(3).AndReturn(None) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + runner.run_to_completion(wait_time=24) + + def test_sleep(self): + sleep_time = 42 + self.mox.StubOutWithMock(eventlet, 'sleep') + eventlet.sleep(sleep_time).MultipleTimes().AndReturn(None) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(DummyTask()) + runner(wait_time=sleep_time) + + def test_sleep_zero(self): + self.mox.StubOutWithMock(eventlet, 'sleep') + eventlet.sleep(0).MultipleTimes().AndReturn(None) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(DummyTask()) + runner(wait_time=0) + + def test_sleep_none(self): + self.mox.StubOutWithMock(eventlet, 'sleep') + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(DummyTask()) + runner(wait_time=None) + + def test_args(self): + args = ['foo', 'bar'] + kwargs = {'baz': 'quux', 'blarg': 'wibble'} + + self.mox.StubOutWithMock(DummyTask, '__call__') + task = DummyTask() + + task(*args, **kwargs) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task, *args, **kwargs) + runner(wait_time=None) + + def test_non_callable(self): + self.assertRaises(AssertionError, scheduler.TaskRunner, object()) + + def test_stepping(self): + task = DummyTask() + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + task.do_step(2).AndReturn(None) + task.do_step(3).AndReturn(None) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + + self.assertFalse(runner.step()) + self.assertTrue(runner) + self.assertFalse(runner.step()) + self.assertTrue(runner.step()) + self.assertFalse(runner) + + def test_start_no_steps(self): + task = DummyTask(0) + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + runner.start() + + self.assertTrue(runner.done()) + self.assertTrue(runner.step()) + + def test_start_only(self): + task = DummyTask() + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + task.do_step(1).AndReturn(None) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + + self.assertFalse(runner.started()) + runner.start() + self.assertTrue(runner.started()) + + def test_double_start(self): + runner = scheduler.TaskRunner(DummyTask()) + + runner.start() + self.assertRaises(AssertionError, runner.start) + + def test_call_double_start(self): + runner = scheduler.TaskRunner(DummyTask()) + + runner(wait_time=None) + self.assertRaises(AssertionError, runner.start) + + def test_start_function(self): + def task(): + pass + + runner = scheduler.TaskRunner(task) + + runner.start() + self.assertTrue(runner.started()) + self.assertTrue(runner.done()) + self.assertTrue(runner.step()) + + def test_repeated_done(self): + task = DummyTask(0) + self.mox.StubOutWithMock(task, 'do_step') + self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep') + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + + runner.start() + self.assertTrue(runner.step()) + self.assertTrue(runner.step()) -- 2.45.2