From 8c3b308803ef5094ba1962d616e54318b3550d2a Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Mon, 13 May 2013 17:55:41 +0200 Subject: [PATCH] Add a timeout option to the scheduler Change-Id: Ia796c5027c8faeefb9ddf48d88583d3d7901448d --- heat/engine/scheduler.py | 67 ++++++++++++++++++++++++++----- heat/tests/test_scheduler.py | 76 ++++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 9 deletions(-) diff --git a/heat/engine/scheduler.py b/heat/engine/scheduler.py index e0dbf4eb..d6a48c59 100644 --- a/heat/engine/scheduler.py +++ b/heat/engine/scheduler.py @@ -18,6 +18,7 @@ import functools import itertools import sys import types +from time import time as wallclock from heat.openstack.common import excutils from heat.openstack.common import log as logging @@ -38,6 +39,35 @@ def task_description(task): return repr(task) +class Timeout(BaseException): + """ + Timeout exception, raised within a task when it has exceeded its allotted + (wallclock) running time. + + This allows the task to perform any necessary cleanup, as well as use a + different exception to notify the controlling task if appropriate. If the + task supresses the exception altogether, it will be cancelled but the + controlling task will not be notified of the timeout. + """ + + def __init__(self, task_runner, timeout): + """ + Initialise with the TaskRunner and a timeout period in seconds. + """ + message = _('%s Timed out') % task_runner + super(Timeout, self).__init__(message) + + # Note that we don't attempt to handle leap seconds or large clock + # jumps here. The latter are assumed to be rare and the former + # negligible in the context of the timeout. Time zone adjustments, + # Daylight Savings and the like *are* handled. PEP 418 adds a proper + # monotonic clock, but only in Python 3.3. + self._endtime = wallclock() + timeout + + def expired(self): + return wallclock() > self._endtime + + class TaskRunner(object): """ Wrapper for a resumable task (co-routine). @@ -58,6 +88,7 @@ class TaskRunner(object): self._kwargs = kwargs self._runner = None self._done = False + self._timeout = None self.name = task_description(task) def __str__(self): @@ -70,24 +101,31 @@ class TaskRunner(object): logger.debug('%s sleeping' % str(self)) eventlet.sleep(wait_time) - def __call__(self, wait_time=1): + def __call__(self, wait_time=1, timeout=None): """ Start and run the task to completion. The task will sleep for `wait_time` seconds between steps. To avoid sleeping, pass `None` for `wait_time`. """ - self.start() + self.start(timeout=timeout) self.run_to_completion(wait_time=wait_time) - def start(self): + def start(self, timeout=None): """ Initialise the task and run its first step. + + If a timeout is specified, any attempt to step the task after that + number of seconds has elapsed will result in a Timeout being + raised inside the task. """ assert self._runner is None, "Task already started" logger.debug('%s starting' % str(self)) + if timeout is not None: + self._timeout = Timeout(self, timeout) + result = self._task(*self._args, **self._kwargs) if isinstance(result, types.GeneratorType): self._runner = result @@ -105,13 +143,24 @@ class TaskRunner(object): if not self.done(): assert self._runner is not None, "Task not started" - logger.debug('%s running' % str(self)) + if self._timeout is not None and self._timeout.expired(): + logger.info('%s timed out' % str(self)) - try: - next(self._runner) - except StopIteration: - self._done = True - logger.debug('%s complete' % str(self)) + try: + self._runner.throw(self._timeout) + except StopIteration: + self._done = True + else: + # Clean up in case task swallows exception without exiting + self.cancel() + else: + logger.debug('%s running' % str(self)) + + try: + next(self._runner) + except StopIteration: + self._done = True + logger.debug('%s complete' % str(self)) return self._done diff --git a/heat/tests/test_scheduler.py b/heat/tests/test_scheduler.py index 206f24f7..6df29fef 100644 --- a/heat/tests/test_scheduler.py +++ b/heat/tests/test_scheduler.py @@ -352,6 +352,82 @@ class TaskTest(mox.MoxTestBase): self.assertTrue(runner.step()) self.assertTrue(runner.step()) + def test_timeout(self): + st = scheduler.wallclock() + + def task(): + while True: + yield + + self.mox.StubOutWithMock(scheduler, 'wallclock') + scheduler.wallclock().AndReturn(st) + scheduler.wallclock().AndReturn(st + 0.5) + scheduler.wallclock().AndReturn(st + 1.5) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + + runner.start(timeout=1) + self.assertTrue(runner) + self.assertRaises(scheduler.Timeout, runner.step) + + self.mox.VerifyAll() + + def test_timeout_return(self): + st = scheduler.wallclock() + + def task(): + while True: + try: + yield + except scheduler.Timeout: + return + + self.mox.StubOutWithMock(scheduler, 'wallclock') + scheduler.wallclock().AndReturn(st) + scheduler.wallclock().AndReturn(st + 0.5) + scheduler.wallclock().AndReturn(st + 1.5) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + + runner.start(timeout=1) + self.assertTrue(runner) + self.assertTrue(runner.step()) + self.assertFalse(runner) + + self.mox.VerifyAll() + + def test_timeout_swallowed(self): + st = scheduler.wallclock() + + def task(): + while True: + try: + yield + except scheduler.Timeout: + yield + self.fail('Task still running') + + self.mox.StubOutWithMock(scheduler, 'wallclock') + scheduler.wallclock().AndReturn(st) + scheduler.wallclock().AndReturn(st + 0.5) + scheduler.wallclock().AndReturn(st + 1.5) + + self.mox.ReplayAll() + + runner = scheduler.TaskRunner(task) + + runner.start(timeout=1) + self.assertTrue(runner) + self.assertTrue(runner.step()) + self.assertFalse(runner) + self.assertTrue(runner.step()) + + self.mox.VerifyAll() + class WrapperTaskTest(mox.MoxTestBase): -- 2.45.2