]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Add initial suspend logic to engine
authorSteven Hardy <shardy@redhat.com>
Wed, 15 May 2013 18:01:39 +0000 (19:01 +0100)
committerSteven Hardy <shardy@redhat.com>
Mon, 24 Jun 2013 13:54:55 +0000 (14:54 +0100)
blueprint: stack-suspend-resume
Change-Id: Icf81672534de6b07d938785e659b7f6c733eacc4

heat/engine/parser.py
heat/engine/resource.py
heat/engine/service.py
heat/rpc/client.py
heat/tests/generic_resource.py
heat/tests/test_engine_service.py
heat/tests/test_parser.py
heat/tests/test_resource.py
heat/tests/test_rpc_client.py

index 92ded0210d48a19fcba4af53c49db4b15f67c21e..46e70659ab3b97511a3174f911fa080df9ae80b4 100644 (file)
@@ -41,8 +41,8 @@ logger = logging.getLogger(__name__)
 
 class Stack(object):
 
-    ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK
-               ) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK')
+    ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND
+               ) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND')
 
     STATUSES = (IN_PROGRESS, FAILED, COMPLETE
                 ) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')
@@ -483,6 +483,47 @@ class Stack(object):
             db_api.stack_delete(self.context, self.id)
             self.id = None
 
+    def suspend(self):
+        '''
+        Suspend the stack, which invokes handle_suspend for all stack resources
+        waits for all resources to become SUSPEND_COMPLETE then declares the
+        stack SUSPEND_COMPLETE.
+        Note the default implementation for all resources is to do nothing
+        other than move to SUSPEND_COMPLETE, so the resources must implement
+        handle_suspend for this to have any effect.
+        '''
+        sus_task = scheduler.TaskRunner(self.suspend_task)
+        sus_task(timeout=self.timeout_secs())
+
+    @scheduler.wrappertask
+    def suspend_task(self):
+        '''
+        A task to suspend the stack, suspends each resource in reverse
+        dependency order
+        '''
+        logger.info("Stack %s suspend started" % self.name)
+        self.state_set(self.SUSPEND, self.IN_PROGRESS, 'Stack suspend started')
+
+        stack_status = self.COMPLETE
+        reason = 'Stack suspend complete'
+
+        def resource_suspend(r):
+            return r.suspend()
+
+        sus_task = scheduler.DependencyTaskGroup(self.dependencies,
+                                                 resource_suspend,
+                                                 reverse=True)
+        try:
+            yield sus_task()
+        except exception.ResourceFailure as ex:
+            stack_status = self.FAILED
+            reason = 'Resource failed: %s' % str(ex)
+        except scheduler.Timeout:
+            stack_status = self.FAILED
+            reason = 'Suspend timed out'
+
+        self.state_set(self.SUSPEND, stack_status, reason)
+
     def output(self, key):
         '''
         Get the value of the specified stack output.
index a77c20ea40334dc033c6370c45b81959c32b67ba..0f15d6cf4b2b1df9d22c91c45db7820f5529e34f 100644 (file)
@@ -102,8 +102,8 @@ class Metadata(object):
 
 
 class Resource(object):
-    ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK
-               ) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK')
+    ACTIONS = (CREATE, DELETE, UPDATE, ROLLBACK, SUSPEND
+               ) = ('CREATE', 'DELETE', 'UPDATE', 'ROLLBACK', 'SUSPEND')
 
     STATUSES = (IN_PROGRESS, FAILED, COMPLETE
                 ) = ('IN_PROGRESS', 'FAILED', 'COMPLETE')
@@ -394,6 +394,16 @@ class Resource(object):
         '''
         return True
 
+    def check_suspend_complete(self, suspend_data):
+        '''
+        Check if the resource is suspended
+        By default this happens as soon as the handle_suspend() method
+        has completed successfully, but subclasses may customise this by
+        overriding this function. The return value of handle_suspend() is
+        passed in to this function each time it is called.
+        '''
+        return True
+
     def update(self, json_snippet=None):
         '''
         update the resource. Subclasses should provide a handle_update() method
@@ -431,6 +441,24 @@ class Resource(object):
             self.t = self.stack.resolve_static_data(json_snippet)
             self.state_set(self.UPDATE, self.COMPLETE)
 
+    def suspend(self):
+        '''
+        Suspend the resource.  Subclasses should provide a handle_suspend()
+        method to implement suspend, the base-class handle_update does nothing
+        Note this uses the same coroutine logic as create() since suspending
+        instances is a non-immediate operation and we want to paralellize
+        '''
+        # Don't try to suspend the resource unless it's in a stable state
+        if self.state not in ((self.CREATE, self.COMPLETE),
+                              (self.UPDATE, self.COMPLETE),
+                              (self.ROLLBACK, self.COMPLETE)):
+            exc = exception.Error('State %s invalid for suspend'
+                                  % str(self.state))
+            raise exception.ResourceFailure(exc)
+
+        logger.info('suspending %s' % str(self))
+        return self._do_action(self.SUSPEND)
+
     def physical_resource_name(self):
         if self.id is None:
             return None
index 9c12b9feaa930265fb30140763b6c1fe3a2379ee..571229b4211272f4cab521517552df034042e85b 100644 (file)
@@ -473,6 +473,22 @@ class EngineService(service.Service):
         return [api.format_stack_resource(resource, detail=False)
                 for resource in stack if resource.id is not None]
 
+    @request_context
+    def stack_suspend(self, cnxt, stack_identity):
+        '''
+        Handle request to perform an action on an existing stack
+        actions are non-lifecycle operations which manipulate the
+        state of the stack but not the definition
+        '''
+        def _stack_suspend(stack):
+            logger.debug("suspending stack %s" % stack.name)
+            stack.suspend()
+
+        s = self._get_stack(cnxt, stack_identity)
+
+        stack = parser.Stack.load(cnxt, stack=s)
+        self._start_in_thread(stack.id, _stack_suspend, stack)
+
     @request_context
     def metadata_update(self, cnxt, stack_identity,
                         resource_name, metadata):
index 70d8b0ab5d6c96c04f6ca3966ddd0204dd763946..3fca0af3ec9f77f968581682edf060890cd89617 100644 (file)
@@ -213,6 +213,10 @@ class EngineClient(heat.openstack.common.rpc.proxy.RpcProxy):
         return self.call(ctxt, self.make_msg('list_stack_resources',
                                              stack_identity=stack_identity))
 
+    def stack_suspend(self, ctxt, stack_identity):
+        return self.call(ctxt, self.make_msg('stack_suspend',
+                                             stack_identity=stack_identity))
+
     def metadata_update(self, ctxt, stack_identity, resource_name, metadata):
         """
         Update the metadata for the given resource.
index a6b7dd77358aa702d5af593e141e8a567d367133..8b07803ff6c70fe5097f889c158eb72da9850942 100644 (file)
@@ -35,3 +35,6 @@ class GenericResource(resource.Resource):
 
     def _resolve_attribute(self, name):
         return self.name
+
+    def handle_suspend(self):
+        logger.warning('Suspending generic resource (Type "%s")' % self.type())
index 92cb00936f12a059f88ed4c8e1210a120b6d1ae0..d9d50534d0785f1e0e66a260ed07a1a5f3201958 100644 (file)
@@ -465,6 +465,47 @@ class stackServiceCreateUpdateDeleteTest(HeatTestCase):
         self.m.VerifyAll()
 
 
+class stackServiceSuspendTest(HeatTestCase):
+
+    def setUp(self):
+        super(stackServiceSuspendTest, self).setUp()
+        self.username = 'stack_service_suspend_test_user'
+        self.tenant = 'stack_service_suspend_test_tenant'
+        setup_dummy_db()
+        self.ctx = create_context(self.m, self.username, self.tenant)
+
+        self.man = service.EngineService('a-host', 'a-topic')
+
+    def test_stack_suspend(self):
+        stack_name = 'service_suspend_test_stack'
+        stack = get_wordpress_stack(stack_name, self.ctx)
+        sid = stack.store()
+        s = db_api.stack_get(self.ctx, sid)
+
+        self.m.StubOutWithMock(parser.Stack, 'load')
+        parser.Stack.load(self.ctx, stack=s).AndReturn(stack)
+
+        self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
+        service.EngineService._start_in_thread(sid,
+                                               mox.IgnoreArg(),
+                                               stack).AndReturn(None)
+        self.m.ReplayAll()
+
+        result = self.man.stack_suspend(self.ctx, stack.identifier())
+        self.assertEqual(result, None)
+        self.m.VerifyAll()
+
+    def test_stack_suspend_nonexist(self):
+        stack_name = 'service_suspend_nonexist_test_stack'
+        stack = get_wordpress_stack(stack_name, self.ctx)
+
+        self.m.ReplayAll()
+
+        self.assertRaises(exception.StackNotFound,
+                          self.man.stack_suspend, self.ctx, stack.identifier())
+        self.m.VerifyAll()
+
+
 class stackServiceTest(HeatTestCase):
 
     def setUp(self):
@@ -1160,3 +1201,32 @@ class stackServiceTest(HeatTestCase):
         sl = self.eng.show_stack(self.ctx, None)
 
         self.assertEqual(len(sl), 0)
+
+    def test_stack_suspend(self):
+        stack_name = 'service_suspend_test_stack'
+        stack = get_wordpress_stack(stack_name, self.ctx)
+        sid = stack.store()
+        s = db_api.stack_get(self.ctx, sid)
+
+        self.m.StubOutWithMock(parser.Stack, 'load')
+        parser.Stack.load(self.ctx, stack=s).AndReturn(stack)
+
+        self.m.StubOutWithMock(service.EngineService, '_start_in_thread')
+        service.EngineService._start_in_thread(sid,
+                                               mox.IgnoreArg(),
+                                               stack).AndReturn(None)
+        self.m.ReplayAll()
+
+        result = self.eng.stack_suspend(self.ctx, stack.identifier())
+        self.assertEqual(result, None)
+        self.m.VerifyAll()
+
+    def test_stack_suspend_nonexist(self):
+        stack_name = 'service_suspend_nonexist_test_stack'
+        stack = get_wordpress_stack(stack_name, self.ctx)
+
+        self.m.ReplayAll()
+
+        self.assertRaises(exception.StackNotFound,
+                          self.eng.stack_suspend, self.ctx, stack.identifier())
+        self.m.VerifyAll()
index 13788fe2b5d7198a2fe49d3fb3a650adac196aad..8326f71efac32e60689730e0f52b1caed4c4a758 100644 (file)
@@ -580,6 +580,70 @@ class StackTest(HeatTestCase):
         self.assertEqual(self.stack.state,
                          (parser.Stack.DELETE, parser.Stack.COMPLETE))
 
+    @stack_delete_after
+    def test_suspend(self):
+        self.m.ReplayAll()
+        tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
+        self.stack = parser.Stack(self.ctx, 'suspend_test',
+                                  parser.Template(tmpl))
+        stack_id = self.stack.store()
+        self.stack.create()
+        self.assertEqual(self.stack.state,
+                         (self.stack.CREATE, self.stack.COMPLETE))
+
+        self.stack.suspend()
+
+        self.assertEqual(self.stack.state,
+                         (self.stack.SUSPEND, self.stack.COMPLETE))
+        self.m.VerifyAll()
+
+    @stack_delete_after
+    def test_suspend_fail(self):
+        tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
+        self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
+        exc = exception.ResourceFailure(Exception('foo'))
+        generic_rsrc.GenericResource.handle_suspend().AndRaise(exc)
+        self.m.ReplayAll()
+
+        self.stack = parser.Stack(self.ctx, 'suspend_test_fail',
+                                  parser.Template(tmpl))
+
+        stack_id = self.stack.store()
+        self.stack.create()
+        self.assertEqual(self.stack.state,
+                         (self.stack.CREATE, self.stack.COMPLETE))
+
+        self.stack.suspend()
+
+        self.assertEqual(self.stack.state,
+                         (self.stack.SUSPEND, self.stack.FAILED))
+        self.assertEqual(self.stack.status_reason,
+                         'Resource failed: Exception: foo')
+        self.m.VerifyAll()
+
+    @stack_delete_after
+    def test_suspend_timeout(self):
+        tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
+        self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
+        exc = scheduler.Timeout('foo', 0)
+        generic_rsrc.GenericResource.handle_suspend().AndRaise(exc)
+        self.m.ReplayAll()
+
+        self.stack = parser.Stack(self.ctx, 'suspend_test_fail_timeout',
+                                  parser.Template(tmpl))
+
+        stack_id = self.stack.store()
+        self.stack.create()
+        self.assertEqual(self.stack.state,
+                         (self.stack.CREATE, self.stack.COMPLETE))
+
+        self.stack.suspend()
+
+        self.assertEqual(self.stack.state,
+                         (self.stack.SUSPEND, self.stack.FAILED))
+        self.assertEqual(self.stack.status_reason, 'Suspend timed out')
+        self.m.VerifyAll()
+
     @stack_delete_after
     def test_delete_rollback(self):
         self.stack = parser.Stack(self.ctx, 'delete_rollback_test',
index 3dc2853b7eaa8f9b01460dcddeff8abddd30523a..f00950631308b7cbf6320d57cd6a1d280da1de38 100644 (file)
@@ -12,6 +12,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from eventlet.support import greenlets as greenlet
 
 from heat.common import context
 from heat.common import exception
@@ -386,6 +387,79 @@ class ResourceTest(HeatTestCase):
         self.assertEqual((res.UPDATE, res.FAILED), res.state)
         self.m.VerifyAll()
 
+    def test_suspend_ok(self):
+        # patch in a dummy property schema for GenericResource
+        dummy_schema = {'Foo': {'Type': 'String'}}
+        generic_rsrc.GenericResource.properties_schema = dummy_schema
+
+        tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
+        res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
+        res.update_allowed_keys = ('Properties',)
+        res.update_allowed_properties = ('Foo',)
+        scheduler.TaskRunner(res.create)()
+        self.assertEqual((res.CREATE, res.COMPLETE), res.state)
+        scheduler.TaskRunner(res.suspend)()
+        self.assertEqual((res.SUSPEND, res.COMPLETE), res.state)
+
+    def test_suspend_fail_inprogress(self):
+        # patch in a dummy property schema for GenericResource
+        dummy_schema = {'Foo': {'Type': 'String'}}
+        generic_rsrc.GenericResource.properties_schema = dummy_schema
+
+        tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
+        res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
+        scheduler.TaskRunner(res.create)()
+        self.assertEqual((res.CREATE, res.COMPLETE), res.state)
+
+        res.state_set(res.CREATE, res.IN_PROGRESS)
+        suspend = scheduler.TaskRunner(res.suspend)
+        self.assertRaises(exception.ResourceFailure, suspend)
+
+        res.state_set(res.UPDATE, res.IN_PROGRESS)
+        suspend = scheduler.TaskRunner(res.suspend)
+        self.assertRaises(exception.ResourceFailure, suspend)
+
+        res.state_set(res.DELETE, res.IN_PROGRESS)
+        suspend = scheduler.TaskRunner(res.suspend)
+        self.assertRaises(exception.ResourceFailure, suspend)
+
+    def test_suspend_fail_exit(self):
+        # patch in a dummy property schema for GenericResource
+        dummy_schema = {'Foo': {'Type': 'String'}}
+        generic_rsrc.GenericResource.properties_schema = dummy_schema
+
+        tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
+        res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
+        scheduler.TaskRunner(res.create)()
+        self.assertEqual((res.CREATE, res.COMPLETE), res.state)
+
+        self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
+        generic_rsrc.GenericResource.handle_suspend().AndRaise(
+            greenlet.GreenletExit())
+        self.m.ReplayAll()
+
+        suspend = scheduler.TaskRunner(res.suspend)
+        self.assertRaises(greenlet.GreenletExit, suspend)
+        self.assertEqual((res.SUSPEND, res.FAILED), res.state)
+
+    def test_suspend_fail_exception(self):
+        # patch in a dummy property schema for GenericResource
+        dummy_schema = {'Foo': {'Type': 'String'}}
+        generic_rsrc.GenericResource.properties_schema = dummy_schema
+
+        tmpl = {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}
+        res = generic_rsrc.GenericResource('test_resource', tmpl, self.stack)
+        scheduler.TaskRunner(res.create)()
+        self.assertEqual((res.CREATE, res.COMPLETE), res.state)
+
+        self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
+        generic_rsrc.GenericResource.handle_suspend().AndRaise(Exception())
+        self.m.ReplayAll()
+
+        suspend = scheduler.TaskRunner(res.suspend)
+        self.assertRaises(exception.ResourceFailure, suspend)
+        self.assertEqual((res.SUSPEND, res.FAILED), res.state)
+
 
 class MetadataTest(HeatTestCase):
     def setUp(self):
index e4326bef0b416c30d069367abf262789ca294892..ff06266e7f480fbb96f9aa2fd8ebd7df95e50ae3 100644 (file)
@@ -141,6 +141,10 @@ class EngineRpcAPITestCase(testtools.TestCase):
         self._test_engine_api('list_stack_resources', 'call',
                               stack_identity=self.identity)
 
+    def test_stack_suspend(self):
+        self._test_engine_api('stack_suspend', 'call',
+                              stack_identity=self.identity)
+
     def test_metadata_update(self):
         self._test_engine_api('metadata_update', 'call',
                               stack_identity=self.identity,