]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Add a timeout option to the scheduler
authorZane Bitter <zbitter@redhat.com>
Mon, 13 May 2013 15:55:41 +0000 (17:55 +0200)
committerZane Bitter <zbitter@redhat.com>
Mon, 13 May 2013 15:55:46 +0000 (17:55 +0200)
Change-Id: Ia796c5027c8faeefb9ddf48d88583d3d7901448d

heat/engine/scheduler.py
heat/tests/test_scheduler.py

index e0dbf4ebe77518c64f6719e37267a3b01c7bc06e..d6a48c59abdf8ddc0ad28197b386956b3cdc5ebb 100644 (file)
@@ -18,6 +18,7 @@ import functools
 import itertools
 import sys
 import types
+from time import time as wallclock
 
 from heat.openstack.common import excutils
 from heat.openstack.common import log as logging
@@ -38,6 +39,35 @@ def task_description(task):
     return repr(task)
 
 
+class Timeout(BaseException):
+    """
+    Timeout exception, raised within a task when it has exceeded its allotted
+    (wallclock) running time.
+
+    This allows the task to perform any necessary cleanup, as well as use a
+    different exception to notify the controlling task if appropriate. If the
+    task supresses the exception altogether, it will be cancelled but the
+    controlling task will not be notified of the timeout.
+    """
+
+    def __init__(self, task_runner, timeout):
+        """
+        Initialise with the TaskRunner and a timeout period in seconds.
+        """
+        message = _('%s Timed out') % task_runner
+        super(Timeout, self).__init__(message)
+
+        # Note that we don't attempt to handle leap seconds or large clock
+        # jumps here. The latter are assumed to be rare and the former
+        # negligible in the context of the timeout. Time zone adjustments,
+        # Daylight Savings and the like *are* handled. PEP 418 adds a proper
+        # monotonic clock, but only in Python 3.3.
+        self._endtime = wallclock() + timeout
+
+    def expired(self):
+        return wallclock() > self._endtime
+
+
 class TaskRunner(object):
     """
     Wrapper for a resumable task (co-routine).
@@ -58,6 +88,7 @@ class TaskRunner(object):
         self._kwargs = kwargs
         self._runner = None
         self._done = False
+        self._timeout = None
         self.name = task_description(task)
 
     def __str__(self):
@@ -70,24 +101,31 @@ class TaskRunner(object):
             logger.debug('%s sleeping' % str(self))
             eventlet.sleep(wait_time)
 
-    def __call__(self, wait_time=1):
+    def __call__(self, wait_time=1, timeout=None):
         """
         Start and run the task to completion.
 
         The task will sleep for `wait_time` seconds between steps. To avoid
         sleeping, pass `None` for `wait_time`.
         """
-        self.start()
+        self.start(timeout=timeout)
         self.run_to_completion(wait_time=wait_time)
 
-    def start(self):
+    def start(self, timeout=None):
         """
         Initialise the task and run its first step.
+
+        If a timeout is specified, any attempt to step the task after that
+        number of seconds has elapsed will result in a Timeout being
+        raised inside the task.
         """
         assert self._runner is None, "Task already started"
 
         logger.debug('%s starting' % str(self))
 
+        if timeout is not None:
+            self._timeout = Timeout(self, timeout)
+
         result = self._task(*self._args, **self._kwargs)
         if isinstance(result, types.GeneratorType):
             self._runner = result
@@ -105,13 +143,24 @@ class TaskRunner(object):
         if not self.done():
             assert self._runner is not None, "Task not started"
 
-            logger.debug('%s running' % str(self))
+            if self._timeout is not None and self._timeout.expired():
+                logger.info('%s timed out' % str(self))
 
-            try:
-                next(self._runner)
-            except StopIteration:
-                self._done = True
-                logger.debug('%s complete' % str(self))
+                try:
+                    self._runner.throw(self._timeout)
+                except StopIteration:
+                    self._done = True
+                else:
+                    # Clean up in case task swallows exception without exiting
+                    self.cancel()
+            else:
+                logger.debug('%s running' % str(self))
+
+                try:
+                    next(self._runner)
+                except StopIteration:
+                    self._done = True
+                    logger.debug('%s complete' % str(self))
 
         return self._done
 
index 206f24f78414896b1a460a8a07a7ff5d4c34c789..6df29fef0f7d0e1554799ca8460ac960dd9e1a74 100644 (file)
@@ -352,6 +352,82 @@ class TaskTest(mox.MoxTestBase):
         self.assertTrue(runner.step())
         self.assertTrue(runner.step())
 
+    def test_timeout(self):
+        st = scheduler.wallclock()
+
+        def task():
+            while True:
+                yield
+
+        self.mox.StubOutWithMock(scheduler, 'wallclock')
+        scheduler.wallclock().AndReturn(st)
+        scheduler.wallclock().AndReturn(st + 0.5)
+        scheduler.wallclock().AndReturn(st + 1.5)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+
+        runner.start(timeout=1)
+        self.assertTrue(runner)
+        self.assertRaises(scheduler.Timeout, runner.step)
+
+        self.mox.VerifyAll()
+
+    def test_timeout_return(self):
+        st = scheduler.wallclock()
+
+        def task():
+            while True:
+                try:
+                    yield
+                except scheduler.Timeout:
+                    return
+
+        self.mox.StubOutWithMock(scheduler, 'wallclock')
+        scheduler.wallclock().AndReturn(st)
+        scheduler.wallclock().AndReturn(st + 0.5)
+        scheduler.wallclock().AndReturn(st + 1.5)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+
+        runner.start(timeout=1)
+        self.assertTrue(runner)
+        self.assertTrue(runner.step())
+        self.assertFalse(runner)
+
+        self.mox.VerifyAll()
+
+    def test_timeout_swallowed(self):
+        st = scheduler.wallclock()
+
+        def task():
+            while True:
+                try:
+                    yield
+                except scheduler.Timeout:
+                    yield
+                    self.fail('Task still running')
+
+        self.mox.StubOutWithMock(scheduler, 'wallclock')
+        scheduler.wallclock().AndReturn(st)
+        scheduler.wallclock().AndReturn(st + 0.5)
+        scheduler.wallclock().AndReturn(st + 1.5)
+
+        self.mox.ReplayAll()
+
+        runner = scheduler.TaskRunner(task)
+
+        runner.start(timeout=1)
+        self.assertTrue(runner)
+        self.assertTrue(runner.step())
+        self.assertFalse(runner)
+        self.assertTrue(runner.step())
+
+        self.mox.VerifyAll()
+
 
 class WrapperTaskTest(mox.MoxTestBase):