]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
heat engine : kill running greenthreads on stack_delete
authorSteven Hardy <shardy@redhat.com>
Thu, 18 Oct 2012 16:13:02 +0000 (17:13 +0100)
committerSteven Hardy <shardy@redhat.com>
Tue, 23 Oct 2012 10:59:09 +0000 (11:59 +0100)
Add logic to track running eventlet greenthreads and kill
them when we start a stack_delete.  This should avoid errors
where long-running greenthreads end up referencing stacks which
have subsequently been deleted.

Fixes #261
Ref #223

Change-Id: I0d10b6f2dad0efa1caec18a67a3cc66cc693ea24
Signed-off-by: Steven Hardy <shardy@redhat.com>
heat/engine/manager.py
heat/tests/test_engine_manager.py

index 4145813c6b7386e0daca97959bcc53134341cb15..9ea57d771d56f3a7d675d37d8c79916ef63ce77b 100644 (file)
@@ -21,6 +21,7 @@ import json
 import urlparse
 import httplib
 import eventlet
+import collections
 
 from heat import manager
 from heat.db import api as db_api
@@ -59,7 +60,32 @@ class EngineManager(manager.Manager):
 
     def __init__(self, *args, **kwargs):
         """Load configuration options and connect to the hypervisor."""
-        pass
+
+        # Maintain a dict mapping stack ids to in-progress greenthreads
+        # allows us to kill any pending create|update before delete_stack
+        #
+        # Currently we should only ever have one outstanding thread, but
+        # the implementation makes this a dict-of-sets so we could use
+        # the same method to cancel multiple threads, e.g if long-running
+        # query actions need to be spawned instead of run immediately
+        self.stack_threads = collections.defaultdict(set)
+
+    def _gt_done_callback(self, gt, **kwargs):
+        '''
+        Callback function to be passed to GreenThread.link() when we spawn()
+        Removes the thread ID from the stack_threads set of pending threads
+        kwargs should contain 'stack_id'
+        '''
+        if not 'stack_id' in kwargs:
+            logger.error("_gt_done_callback called with no stack_id!")
+        else:
+            stack_id = kwargs['stack_id']
+            if stack_id in self.stack_threads:
+                logger.debug("Thread done callback for stack %s, %s" %
+                             (stack_id, gt))
+                self.stack_threads[stack_id].discard(gt)
+                if not len(self.stack_threads[stack_id]):
+                    del self.stack_threads[stack_id]
 
     def identify_stack(self, context, stack_name):
         """
@@ -139,7 +165,12 @@ class EngineManager(manager.Manager):
             return response
 
         stack_id = stack.store()
-        greenpool.spawn_n(stack.create)
+
+        # Spawn a greenthread to do the create, and register a
+        # callback to remove the thread from stack_threads when done
+        gt = greenpool.spawn(stack.create)
+        gt.link(self._gt_done_callback, stack_id=stack_id)
+        self.stack_threads[stack_id].add(gt)
 
         return dict(stack.identifier())
 
@@ -176,7 +207,11 @@ class EngineManager(manager.Manager):
         if response['Description'] != 'Successfully validated':
             return response
 
-        greenpool.spawn_n(current_stack.update, updated_stack)
+        # Spawn a greenthread to do the update, and register a
+        # callback to remove the thread from stack_threads when done
+        gt = greenpool.spawn(current_stack.update, updated_stack)
+        gt.link(self._gt_done_callback, stack_id=db_stack.id)
+        self.stack_threads[db_stack.id].add(gt)
 
         return dict(current_stack.identifier())
 
@@ -241,6 +276,17 @@ class EngineManager(manager.Manager):
         logger.info('deleting stack %s' % st.name)
 
         stack = parser.Stack.load(context, st.id)
+
+        # Kill any in-progress create or update threads
+        if st.id in self.stack_threads:
+            # Note we must use set.copy() here or we get an error when thread
+            # rescheduling happens on t.kill() and _gt_done_callback modifies
+            # stack_threads[st.id] mid-iteration
+            for t in self.stack_threads[st.id].copy():
+                logger.warning("Killing running thread %s for stack %s" %
+                               (t, st.name))
+                t.kill()
+
         greenpool.spawn_n(stack.delete)
         return None
 
index f6dde6a4f3eb722f3d9b44156b0f0b78a258fdce..7e2ef5cd8db1bd7d1fcc69fb4fe7a9a409b852d6 100644 (file)
@@ -81,6 +81,11 @@ def setup_mocks(mocks, stack):
                       meta=None).AndReturn(fc.servers.list()[-1])
 
 
+class DummyGreenThread():
+    def link(self, gt, **kwargs):
+        pass
+
+
 @attr(tag=['unit', 'stack'])
 @attr(speed='slow')
 class stackCreateTest(unittest.TestCase):
@@ -159,8 +164,8 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.StubOutWithMock(stack, 'validate')
         stack.validate().AndReturn({'Description': 'Successfully validated'})
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
-        manager.greenpool.spawn_n(stack.create)
+        self.m.StubOutWithMock(manager.greenpool, 'spawn')
+        manager.greenpool.spawn(stack.create).AndReturn(DummyGreenThread())
 
         self.m.ReplayAll()
 
@@ -193,7 +198,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         error = {'Description': 'fubar'}
         stack.validate().AndReturn(error)
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+        self.m.StubOutWithMock(manager.greenpool, 'spawn')
 
         self.m.ReplayAll()
 
@@ -257,8 +262,9 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         self.m.StubOutWithMock(stack, 'validate')
         stack.validate().AndReturn({'Description': 'Successfully validated'})
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
-        manager.greenpool.spawn_n(old_stack.update, stack)
+        self.m.StubOutWithMock(manager.greenpool, 'spawn')
+        manager.greenpool.spawn(old_stack.update, stack).AndReturn(
+                                DummyGreenThread())
 
         self.m.ReplayAll()
 
@@ -297,7 +303,7 @@ class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
         error = {'Description': 'fubar'}
         stack.validate().AndReturn(error)
 
-        self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+        self.m.StubOutWithMock(manager.greenpool, 'spawn')
 
         self.m.ReplayAll()