]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Add a scheduler module
authorZane Bitter <zbitter@redhat.com>
Mon, 29 Apr 2013 09:28:37 +0000 (11:28 +0200)
committerZane Bitter <zbitter@redhat.com>
Mon, 29 Apr 2013 09:28:37 +0000 (11:28 +0200)
Utility routines for running coroutine-based tasks.

Change-Id: I907d889a564a1118b7c9caa30f3ad65dfac76534

heat/engine/scheduler.py [new file with mode: 0644]
heat/tests/test_scheduler.py [new file with mode: 0644]

diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py
new file mode 100644 (file)
index 0000000..986c9c3
--- /dev/null
@@ -0,0 +1,141 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import eventlet
+import types
+
+from heat.openstack.common import log as logging
+
+logger = logging.getLogger(__name__)
+
+
+def task_description(task):
+    """
+    Return a human-readable string description of a task suitable for logging
+    the status of the task.
+    """
+    if isinstance(task, types.MethodType):
+        name = getattr(task, '__name__')
+        obj = getattr(task, '__self__')
+        if name is not None and obj is not None:
+            return '%s from %s' % (name, obj)
+    return repr(task)
+
+
+class TaskRunner(object):
+    """
+    Wrapper for a resumable task (co-routine).
+    """
+
+    def __init__(self, task, *args, **kwargs):
+        """
+        Initialise with a task function, and arguments to be passed to it when
+        it is started.
+
+        The task function may be a co-routine that yields control flow between
+        steps.
+        """
+        assert callable(task), "Task is not callable"
+
+        self._task = task
+        self._args = args
+        self._kwargs = kwargs
+        self._runner = None
+        self._done = False
+        self.name = task_description(task)
+
+    def __str__(self):
+        """Return a human-readable string representation of the task."""
+        return 'Task %s' % self.name
+
+    def _sleep(self, wait_time):
+        """Sleep for the specified number of seconds."""
+        if wait_time is not None:
+            logger.debug('%s sleeping' % str(self))
+            eventlet.sleep(wait_time)
+
+    def __call__(self, wait_time=1):
+        """
+        Start and run the task to completion.
+
+        The task will sleep for `wait_time` seconds between steps. To avoid
+        sleeping, pass `None` for `wait_time`.
+        """
+        self.start()
+        self.run_to_completion(wait_time=wait_time)
+
+    def start(self):
+        """
+        Initialise the task and run its first step.
+        """
+        assert self._runner is None, "Task already started"
+
+        logger.debug('%s starting' % str(self))
+
+        result = self._task(*self._args, **self._kwargs)
+        if isinstance(result, types.GeneratorType):
+            self._runner = result
+            self.step()
+        else:
+            self._runner = False
+            self._done = True
+            logger.debug('%s done (not resumable)' % str(self))
+
+    def step(self):
+        """
+        Run another step of the task, and return True if the task is complete;
+        False otherwise.
+        """
+        if not self.done():
+            assert self._runner is not None, "Task not started"
+
+            logger.debug('%s running' % str(self))
+
+            try:
+                next(self._runner)
+            except StopIteration:
+                self._done = True
+                logger.debug('%s complete' % str(self))
+
+        return self._done
+
+    def run_to_completion(self, wait_time=1):
+        """
+        Run the task to completion.
+
+        The task will sleep for `wait_time` seconds between steps. To avoid
+        sleeping, pass `None` for `wait_time`.
+        """
+        while not self.step():
+            self._sleep(wait_time)
+
+    def cancel(self):
+        """Cancel the task if it is running."""
+        if self.started() and not self.done():
+            logger.debug('%s cancelled' % str(self))
+            self._runner.close()
+            self._done = True
+
+    def started(self):
+        """Return True if the task has been started."""
+        return self._runner is not None
+
+    def done(self):
+        """Return True if the task is complete."""
+        return self._done
+
+    def __nonzero__(self):
+        """Return True if there are steps remaining."""
+        return not self.done()
diff --git a/heat/tests/test_scheduler.py b/heat/tests/test_scheduler.py
new file mode 100644 (file)
index 0000000..ad75b6e
--- /dev/null
@@ -0,0 +1,229 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mox
+from nose.plugins.attrib import attr
+
+import eventlet
+
+from heat.engine import scheduler
+
+
+class DummyTask(object):
+    def __init__(self, num_steps=3):
+        self.num_steps = num_steps
+
+    def __call__(self, *args, **kwargs):
+        for i in range(1, self.num_steps + 1):
+            self.do_step(i)
+            yield
+
+    def do_step(self, step_num):
+        print self, step_num
+
+
+@attr(tag=['unit', 'scheduler'])
+@attr(speed='fast')
+class TaskTest(mox.MoxTestBase):
+
+    def test_run(self):
+        task = DummyTask()
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        task.do_step(1).AndReturn(None)
+        scheduler.TaskRunner._sleep(1).AndReturn(None)
+        task.do_step(2).AndReturn(None)
+        scheduler.TaskRunner._sleep(1).AndReturn(None)
+        task.do_step(3).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        scheduler.TaskRunner(task)()
+
+    def test_run_wait_time(self):
+        task = DummyTask()
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        task.do_step(1).AndReturn(None)
+        scheduler.TaskRunner._sleep(42).AndReturn(None)
+        task.do_step(2).AndReturn(None)
+        scheduler.TaskRunner._sleep(42).AndReturn(None)
+        task.do_step(3).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        scheduler.TaskRunner(task)(wait_time=42)
+
+    def test_start_run(self):
+        task = DummyTask()
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        task.do_step(1).AndReturn(None)
+        scheduler.TaskRunner._sleep(1).AndReturn(None)
+        task.do_step(2).AndReturn(None)
+        scheduler.TaskRunner._sleep(1).AndReturn(None)
+        task.do_step(3).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+        runner.start()
+        runner.run_to_completion()
+
+    def test_start_run_wait_time(self):
+        task = DummyTask()
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        task.do_step(1).AndReturn(None)
+        scheduler.TaskRunner._sleep(24).AndReturn(None)
+        task.do_step(2).AndReturn(None)
+        scheduler.TaskRunner._sleep(24).AndReturn(None)
+        task.do_step(3).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+        runner.start()
+        runner.run_to_completion(wait_time=24)
+
+    def test_sleep(self):
+        sleep_time = 42
+        self.mox.StubOutWithMock(eventlet, 'sleep')
+        eventlet.sleep(sleep_time).MultipleTimes().AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(DummyTask())
+        runner(wait_time=sleep_time)
+
+    def test_sleep_zero(self):
+        self.mox.StubOutWithMock(eventlet, 'sleep')
+        eventlet.sleep(0).MultipleTimes().AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(DummyTask())
+        runner(wait_time=0)
+
+    def test_sleep_none(self):
+        self.mox.StubOutWithMock(eventlet, 'sleep')
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(DummyTask())
+        runner(wait_time=None)
+
+    def test_args(self):
+        args = ['foo', 'bar']
+        kwargs = {'baz': 'quux', 'blarg': 'wibble'}
+
+        self.mox.StubOutWithMock(DummyTask, '__call__')
+        task = DummyTask()
+
+        task(*args, **kwargs)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task, *args, **kwargs)
+        runner(wait_time=None)
+
+    def test_non_callable(self):
+        self.assertRaises(AssertionError, scheduler.TaskRunner, object())
+
+    def test_stepping(self):
+        task = DummyTask()
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        task.do_step(1).AndReturn(None)
+        task.do_step(2).AndReturn(None)
+        task.do_step(3).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+        runner.start()
+
+        self.assertFalse(runner.step())
+        self.assertTrue(runner)
+        self.assertFalse(runner.step())
+        self.assertTrue(runner.step())
+        self.assertFalse(runner)
+
+    def test_start_no_steps(self):
+        task = DummyTask(0)
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+        runner.start()
+
+        self.assertTrue(runner.done())
+        self.assertTrue(runner.step())
+
+    def test_start_only(self):
+        task = DummyTask()
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        task.do_step(1).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+
+        self.assertFalse(runner.started())
+        runner.start()
+        self.assertTrue(runner.started())
+
+    def test_double_start(self):
+        runner = scheduler.TaskRunner(DummyTask())
+
+        runner.start()
+        self.assertRaises(AssertionError, runner.start)
+
+    def test_call_double_start(self):
+        runner = scheduler.TaskRunner(DummyTask())
+
+        runner(wait_time=None)
+        self.assertRaises(AssertionError, runner.start)
+
+    def test_start_function(self):
+        def task():
+            pass
+
+        runner = scheduler.TaskRunner(task)
+
+        runner.start()
+        self.assertTrue(runner.started())
+        self.assertTrue(runner.done())
+        self.assertTrue(runner.step())
+
+    def test_repeated_done(self):
+        task = DummyTask(0)
+        self.mox.StubOutWithMock(task, 'do_step')
+        self.mox.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+
+        runner.start()
+        self.assertTrue(runner.step())
+        self.assertTrue(runner.step())