import itertools
import sys
import types
+from time import time as wallclock
from heat.openstack.common import excutils
from heat.openstack.common import log as logging
return repr(task)
+class Timeout(BaseException):
+ """
+ Timeout exception, raised within a task when it has exceeded its allotted
+ (wallclock) running time.
+
+ This allows the task to perform any necessary cleanup, as well as use a
+ different exception to notify the controlling task if appropriate. If the
+ task supresses the exception altogether, it will be cancelled but the
+ controlling task will not be notified of the timeout.
+ """
+
+ def __init__(self, task_runner, timeout):
+ """
+ Initialise with the TaskRunner and a timeout period in seconds.
+ """
+ message = _('%s Timed out') % task_runner
+ super(Timeout, self).__init__(message)
+
+ # Note that we don't attempt to handle leap seconds or large clock
+ # jumps here. The latter are assumed to be rare and the former
+ # negligible in the context of the timeout. Time zone adjustments,
+ # Daylight Savings and the like *are* handled. PEP 418 adds a proper
+ # monotonic clock, but only in Python 3.3.
+ self._endtime = wallclock() + timeout
+
+ def expired(self):
+ return wallclock() > self._endtime
+
+
class TaskRunner(object):
"""
Wrapper for a resumable task (co-routine).
self._kwargs = kwargs
self._runner = None
self._done = False
+ self._timeout = None
self.name = task_description(task)
def __str__(self):
logger.debug('%s sleeping' % str(self))
eventlet.sleep(wait_time)
- def __call__(self, wait_time=1):
+ def __call__(self, wait_time=1, timeout=None):
"""
Start and run the task to completion.
The task will sleep for `wait_time` seconds between steps. To avoid
sleeping, pass `None` for `wait_time`.
"""
- self.start()
+ self.start(timeout=timeout)
self.run_to_completion(wait_time=wait_time)
- def start(self):
+ def start(self, timeout=None):
"""
Initialise the task and run its first step.
+
+ If a timeout is specified, any attempt to step the task after that
+ number of seconds has elapsed will result in a Timeout being
+ raised inside the task.
"""
assert self._runner is None, "Task already started"
logger.debug('%s starting' % str(self))
+ if timeout is not None:
+ self._timeout = Timeout(self, timeout)
+
result = self._task(*self._args, **self._kwargs)
if isinstance(result, types.GeneratorType):
self._runner = result
if not self.done():
assert self._runner is not None, "Task not started"
- logger.debug('%s running' % str(self))
+ if self._timeout is not None and self._timeout.expired():
+ logger.info('%s timed out' % str(self))
- try:
- next(self._runner)
- except StopIteration:
- self._done = True
- logger.debug('%s complete' % str(self))
+ try:
+ self._runner.throw(self._timeout)
+ except StopIteration:
+ self._done = True
+ else:
+ # Clean up in case task swallows exception without exiting
+ self.cancel()
+ else:
+ logger.debug('%s running' % str(self))
+
+ try:
+ next(self._runner)
+ except StopIteration:
+ self._done = True
+ logger.debug('%s complete' % str(self))
return self._done
self.assertTrue(runner.step())
self.assertTrue(runner.step())
+ def test_timeout(self):
+ st = scheduler.wallclock()
+
+ def task():
+ while True:
+ yield
+
+ self.mox.StubOutWithMock(scheduler, 'wallclock')
+ scheduler.wallclock().AndReturn(st)
+ scheduler.wallclock().AndReturn(st + 0.5)
+ scheduler.wallclock().AndReturn(st + 1.5)
+
+ self.mox.ReplayAll()
+
+ runner = scheduler.TaskRunner(task)
+
+ runner.start(timeout=1)
+ self.assertTrue(runner)
+ self.assertRaises(scheduler.Timeout, runner.step)
+
+ self.mox.VerifyAll()
+
+ def test_timeout_return(self):
+ st = scheduler.wallclock()
+
+ def task():
+ while True:
+ try:
+ yield
+ except scheduler.Timeout:
+ return
+
+ self.mox.StubOutWithMock(scheduler, 'wallclock')
+ scheduler.wallclock().AndReturn(st)
+ scheduler.wallclock().AndReturn(st + 0.5)
+ scheduler.wallclock().AndReturn(st + 1.5)
+
+ self.mox.ReplayAll()
+
+ runner = scheduler.TaskRunner(task)
+
+ runner.start(timeout=1)
+ self.assertTrue(runner)
+ self.assertTrue(runner.step())
+ self.assertFalse(runner)
+
+ self.mox.VerifyAll()
+
+ def test_timeout_swallowed(self):
+ st = scheduler.wallclock()
+
+ def task():
+ while True:
+ try:
+ yield
+ except scheduler.Timeout:
+ yield
+ self.fail('Task still running')
+
+ self.mox.StubOutWithMock(scheduler, 'wallclock')
+ scheduler.wallclock().AndReturn(st)
+ scheduler.wallclock().AndReturn(st + 0.5)
+ scheduler.wallclock().AndReturn(st + 1.5)
+
+ self.mox.ReplayAll()
+
+ runner = scheduler.TaskRunner(task)
+
+ runner.start(timeout=1)
+ self.assertTrue(runner)
+ self.assertTrue(runner.step())
+ self.assertFalse(runner)
+ self.assertTrue(runner.step())
+
+ self.mox.VerifyAll()
+
class WrapperTaskTest(mox.MoxTestBase):