From: Steven Hardy Date: Thu, 18 Oct 2012 16:13:02 +0000 (+0100) Subject: heat engine : kill running greenthreads on stack_delete X-Git-Tag: 2014.1~1278 X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=b0661fdc8cb289a4d0d531809fcfc399936d3f18;p=openstack-build%2Fheat-build.git heat engine : kill running greenthreads on stack_delete Add logic to track running eventlet greenthreads and kill them when we start a stack_delete. This should avoid errors where long-running greenthreads end up referencing stacks which have subsequently been deleted. Fixes #261 Ref #223 Change-Id: I0d10b6f2dad0efa1caec18a67a3cc66cc693ea24 Signed-off-by: Steven Hardy --- diff --git a/heat/engine/manager.py b/heat/engine/manager.py index 4145813c..9ea57d77 100644 --- a/heat/engine/manager.py +++ b/heat/engine/manager.py @@ -21,6 +21,7 @@ import json import urlparse import httplib import eventlet +import collections from heat import manager from heat.db import api as db_api @@ -59,7 +60,32 @@ class EngineManager(manager.Manager): def __init__(self, *args, **kwargs): """Load configuration options and connect to the hypervisor.""" - pass + + # Maintain a dict mapping stack ids to in-progress greenthreads + # allows us to kill any pending create|update before delete_stack + # + # Currently we should only ever have one outstanding thread, but + # the implementation makes this a dict-of-sets so we could use + # the same method to cancel multiple threads, e.g if long-running + # query actions need to be spawned instead of run immediately + self.stack_threads = collections.defaultdict(set) + + def _gt_done_callback(self, gt, **kwargs): + ''' + Callback function to be passed to GreenThread.link() when we spawn() + Removes the thread ID from the stack_threads set of pending threads + kwargs should contain 'stack_id' + ''' + if not 'stack_id' in kwargs: + logger.error("_gt_done_callback called with no stack_id!") + else: + stack_id = kwargs['stack_id'] + if stack_id in self.stack_threads: + logger.debug("Thread done callback for stack %s, %s" % + (stack_id, gt)) + self.stack_threads[stack_id].discard(gt) + if not len(self.stack_threads[stack_id]): + del self.stack_threads[stack_id] def identify_stack(self, context, stack_name): """ @@ -139,7 +165,12 @@ class EngineManager(manager.Manager): return response stack_id = stack.store() - greenpool.spawn_n(stack.create) + + # Spawn a greenthread to do the create, and register a + # callback to remove the thread from stack_threads when done + gt = greenpool.spawn(stack.create) + gt.link(self._gt_done_callback, stack_id=stack_id) + self.stack_threads[stack_id].add(gt) return dict(stack.identifier()) @@ -176,7 +207,11 @@ class EngineManager(manager.Manager): if response['Description'] != 'Successfully validated': return response - greenpool.spawn_n(current_stack.update, updated_stack) + # Spawn a greenthread to do the update, and register a + # callback to remove the thread from stack_threads when done + gt = greenpool.spawn(current_stack.update, updated_stack) + gt.link(self._gt_done_callback, stack_id=db_stack.id) + self.stack_threads[db_stack.id].add(gt) return dict(current_stack.identifier()) @@ -241,6 +276,17 @@ class EngineManager(manager.Manager): logger.info('deleting stack %s' % st.name) stack = parser.Stack.load(context, st.id) + + # Kill any in-progress create or update threads + if st.id in self.stack_threads: + # Note we must use set.copy() here or we get an error when thread + # rescheduling happens on t.kill() and _gt_done_callback modifies + # stack_threads[st.id] mid-iteration + for t in self.stack_threads[st.id].copy(): + logger.warning("Killing running thread %s for stack %s" % + (t, st.name)) + t.kill() + greenpool.spawn_n(stack.delete) return None diff --git a/heat/tests/test_engine_manager.py b/heat/tests/test_engine_manager.py index f6dde6a4..7e2ef5cd 100644 --- a/heat/tests/test_engine_manager.py +++ b/heat/tests/test_engine_manager.py @@ -81,6 +81,11 @@ def setup_mocks(mocks, stack): meta=None).AndReturn(fc.servers.list()[-1]) +class DummyGreenThread(): + def link(self, gt, **kwargs): + pass + + @attr(tag=['unit', 'stack']) @attr(speed='slow') class stackCreateTest(unittest.TestCase): @@ -159,8 +164,8 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase): self.m.StubOutWithMock(stack, 'validate') stack.validate().AndReturn({'Description': 'Successfully validated'}) - self.m.StubOutWithMock(manager.greenpool, 'spawn_n') - manager.greenpool.spawn_n(stack.create) + self.m.StubOutWithMock(manager.greenpool, 'spawn') + manager.greenpool.spawn(stack.create).AndReturn(DummyGreenThread()) self.m.ReplayAll() @@ -193,7 +198,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase): error = {'Description': 'fubar'} stack.validate().AndReturn(error) - self.m.StubOutWithMock(manager.greenpool, 'spawn_n') + self.m.StubOutWithMock(manager.greenpool, 'spawn') self.m.ReplayAll() @@ -257,8 +262,9 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase): self.m.StubOutWithMock(stack, 'validate') stack.validate().AndReturn({'Description': 'Successfully validated'}) - self.m.StubOutWithMock(manager.greenpool, 'spawn_n') - manager.greenpool.spawn_n(old_stack.update, stack) + self.m.StubOutWithMock(manager.greenpool, 'spawn') + manager.greenpool.spawn(old_stack.update, stack).AndReturn( + DummyGreenThread()) self.m.ReplayAll() @@ -297,7 +303,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase): error = {'Description': 'fubar'} stack.validate().AndReturn(error) - self.m.StubOutWithMock(manager.greenpool, 'spawn_n') + self.m.StubOutWithMock(manager.greenpool, 'spawn') self.m.ReplayAll()