import urlparse
import httplib
import eventlet
+import collections
from heat import manager
from heat.db import api as db_api
def __init__(self, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
- pass
+
+ # Maintain a dict mapping stack ids to in-progress greenthreads
+ # allows us to kill any pending create|update before delete_stack
+ #
+ # Currently we should only ever have one outstanding thread, but
+ # the implementation makes this a dict-of-sets so we could use
+ # the same method to cancel multiple threads, e.g if long-running
+ # query actions need to be spawned instead of run immediately
+ self.stack_threads = collections.defaultdict(set)
+
+ def _gt_done_callback(self, gt, **kwargs):
+ '''
+ Callback function to be passed to GreenThread.link() when we spawn()
+ Removes the thread ID from the stack_threads set of pending threads
+ kwargs should contain 'stack_id'
+ '''
+ if not 'stack_id' in kwargs:
+ logger.error("_gt_done_callback called with no stack_id!")
+ else:
+ stack_id = kwargs['stack_id']
+ if stack_id in self.stack_threads:
+ logger.debug("Thread done callback for stack %s, %s" %
+ (stack_id, gt))
+ self.stack_threads[stack_id].discard(gt)
+ if not len(self.stack_threads[stack_id]):
+ del self.stack_threads[stack_id]
def identify_stack(self, context, stack_name):
"""
return response
stack_id = stack.store()
- greenpool.spawn_n(stack.create)
+
+ # Spawn a greenthread to do the create, and register a
+ # callback to remove the thread from stack_threads when done
+ gt = greenpool.spawn(stack.create)
+ gt.link(self._gt_done_callback, stack_id=stack_id)
+ self.stack_threads[stack_id].add(gt)
return dict(stack.identifier())
if response['Description'] != 'Successfully validated':
return response
- greenpool.spawn_n(current_stack.update, updated_stack)
+ # Spawn a greenthread to do the update, and register a
+ # callback to remove the thread from stack_threads when done
+ gt = greenpool.spawn(current_stack.update, updated_stack)
+ gt.link(self._gt_done_callback, stack_id=db_stack.id)
+ self.stack_threads[db_stack.id].add(gt)
return dict(current_stack.identifier())
logger.info('deleting stack %s' % st.name)
stack = parser.Stack.load(context, st.id)
+
+ # Kill any in-progress create or update threads
+ if st.id in self.stack_threads:
+ # Note we must use set.copy() here or we get an error when thread
+ # rescheduling happens on t.kill() and _gt_done_callback modifies
+ # stack_threads[st.id] mid-iteration
+ for t in self.stack_threads[st.id].copy():
+ logger.warning("Killing running thread %s for stack %s" %
+ (t, st.name))
+ t.kill()
+
greenpool.spawn_n(stack.delete)
return None
meta=None).AndReturn(fc.servers.list()[-1])
+class DummyGreenThread():
+ def link(self, gt, **kwargs):
+ pass
+
+
@attr(tag=['unit', 'stack'])
@attr(speed='slow')
class stackCreateTest(unittest.TestCase):
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn({'Description': 'Successfully validated'})
- self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
- manager.greenpool.spawn_n(stack.create)
+ self.m.StubOutWithMock(manager.greenpool, 'spawn')
+ manager.greenpool.spawn(stack.create).AndReturn(DummyGreenThread())
self.m.ReplayAll()
error = {'Description': 'fubar'}
stack.validate().AndReturn(error)
- self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+ self.m.StubOutWithMock(manager.greenpool, 'spawn')
self.m.ReplayAll()
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn({'Description': 'Successfully validated'})
- self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
- manager.greenpool.spawn_n(old_stack.update, stack)
+ self.m.StubOutWithMock(manager.greenpool, 'spawn')
+ manager.greenpool.spawn(old_stack.update, stack).AndReturn(
+ DummyGreenThread())
self.m.ReplayAll()
error = {'Description': 'fubar'}
stack.validate().AndReturn(error)
- self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
+ self.m.StubOutWithMock(manager.greenpool, 'spawn')
self.m.ReplayAll()