]> review.fuel-infra Code Review - openstack-build/heat-build.git/commitdiff
Refactor and partly parallelise stack update code
authorZane Bitter <zbitter@redhat.com>
Fri, 21 Jun 2013 16:01:32 +0000 (18:01 +0200)
committerZane Bitter <zbitter@redhat.com>
Fri, 21 Jun 2013 16:17:20 +0000 (18:17 +0200)
Make this easier to read, and also use co-routines to make it run in
parallel where possible.

Change-Id: Ib37f4c04bd250cb15604ba9adb4cca8a97a2c4fb

heat/engine/parser.py
heat/engine/update.py [new file with mode: 0644]
heat/tests/test_parser.py

index 92ded0210d48a19fcba4af53c49db4b15f67c21e..554d8c81e052ed98159e668d1ac95eb9e10207ef 100644 (file)
@@ -13,7 +13,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import eventlet
 import functools
 import re
 
@@ -26,6 +25,7 @@ from heat.engine import resources
 from heat.engine import scheduler
 from heat.engine import template
 from heat.engine import timestamp
+from heat.engine import update
 from heat.engine.parameters import Parameters
 from heat.engine.template import Template
 from heat.engine.clients import Clients
@@ -351,104 +351,48 @@ class Stack(object):
         for r in self:
             r.cache_template()
 
-        # Now make the resources match the new stack definition
-        with eventlet.Timeout(self.timeout_secs()) as tmo:
+        try:
+            update_task = update.StackUpdate(self, newstack)
+            updater = scheduler.TaskRunner(update_task)
             try:
-                # First delete any resources which are not in newstack
-                for res in reversed(self):
-                    if res.name not in newstack.keys():
-                        logger.debug("resource %s not found in updated stack"
-                                     % res.name + " definition, deleting")
-                        # res.destroy raises exception.ResourceFailure on error
-                        res.destroy()
-                        del self.resources[res.name]
-                        self.dependencies = self._get_dependencies(
-                            self.resources.itervalues())
-
-                # Then create any which are defined in newstack but not self
-                for res in newstack:
-                    if res.name not in self.keys():
-                        logger.debug("resource %s not found in current stack"
-                                     % res.name + " definition, adding")
-                        res.stack = self
-                        self[res.name] = res
-                        self.dependencies = self._get_dependencies(
-                            self.resources.itervalues())
-                        # res.create raises exception.ResourceFailure on error
-                        scheduler.TaskRunner(res.create)()
-
-                # Now (the hard part :) update existing resources
-                # The Resource base class allows equality-test of resources,
-                # based on the parsed template snippet for the resource.
-                # If this  test fails, we call the underlying resource.update
-                #
-                # Currently many resources have a default handle_update method
-                # which raises exception.ResourceReplace
-                # optionally they may implement non-interruptive logic and
-                # return UPDATE_COMPLETE. If resources do not implement the
-                # handle_update method at all, update will fail.
-                for res in newstack:
-                    # Compare resolved pre/post update resource snippets,
-                    # note the new resource snippet is resolved in the context
-                    # of the existing stack (which is the stack being updated)
-                    old_snippet = self[res.name].parsed_template(cached=True)
-                    new_snippet = self.resolve_runtime_data(res.t)
-
-                    if old_snippet != new_snippet:
-                        # res.update raises exception.ResourceFailure on error
-                        # or exception.ResourceReplace if update requires
-                        # replacement
-                        try:
-                            self[res.name].update(new_snippet)
-                        except resource.UpdateReplace:
-                            # Resource requires replacement for update
-                            self[res.name].destroy()
-                            res.stack = self
-                            self[res.name] = res
-                            self.dependencies = self._get_dependencies(
-                                self.resources.itervalues())
-                            scheduler.TaskRunner(res.create)()
-                        else:
-                            logger.info("Resource %s for stack %s updated" %
-                                        (res.name, self.name))
-
-                if action == self.UPDATE:
-                    reason = 'Stack successfully updated'
-                else:
-                    reason = 'Stack rollback completed'
-                stack_status = self.COMPLETE
-
-            except eventlet.Timeout as t:
-                if t is tmo:
-                    stack_status = self.FAILED
-                    reason = 'Timed out waiting for %s' % str(res)
-                else:
-                    # not my timeout
-                    raise
-            except exception.ResourceFailure as e:
-                reason = str(e) or "Error : %s" % type(e)
-
-                stack_status = self.FAILED
-                if action == self.UPDATE:
-                    # If rollback is enabled, we do another update, with the
-                    # existing template, so we roll back to the original state
-                    if not self.disable_rollback:
-                        oldstack = Stack(self.context, self.name, self.t,
-                                         self.env)
-                        self.update(oldstack, action=self.ROLLBACK)
-                        return
-
-            self.state_set(action, stack_status, reason)
-
-            # flip the template & environment to the newstack values
-            # Note we do this on success and failure, so the current
-            # stack resources are stored, even if one is in a failed
-            # state (otherwise we won't remove them on delete)
-            self.t = newstack.t
-            self.env = newstack.env
-            template_outputs = self.t[template.OUTPUTS]
-            self.outputs = self.resolve_static_data(template_outputs)
-            self.store()
+                updater(timeout=self.timeout_secs())
+            finally:
+                cur_deps = self._get_dependencies(self.resources.itervalues())
+                self.dependencies = cur_deps
+
+            if action == self.UPDATE:
+                reason = 'Stack successfully updated'
+            else:
+                reason = 'Stack rollback completed'
+            stack_status = self.COMPLETE
+
+        except scheduler.Timeout:
+            stack_status = self.FAILED
+            reason = 'Timed out'
+        except exception.ResourceFailure as e:
+            reason = str(e)
+
+            stack_status = self.FAILED
+            if action == self.UPDATE:
+                # If rollback is enabled, we do another update, with the
+                # existing template, so we roll back to the original state
+                if not self.disable_rollback:
+                    oldstack = Stack(self.context, self.name, self.t,
+                                     self.env)
+                    self.update(oldstack, action=self.ROLLBACK)
+                    return
+
+        self.state_set(action, stack_status, reason)
+
+        # flip the template & environment to the newstack values
+        # Note we do this on success and failure, so the current
+        # stack resources are stored, even if one is in a failed
+        # state (otherwise we won't remove them on delete)
+        self.t = newstack.t
+        self.env = newstack.env
+        template_outputs = self.t[template.OUTPUTS]
+        self.outputs = self.resolve_static_data(template_outputs)
+        self.store()
 
     def delete(self, action=DELETE):
         '''
diff --git a/heat/engine/update.py b/heat/engine/update.py
new file mode 100644 (file)
index 0000000..a9a3989
--- /dev/null
@@ -0,0 +1,104 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from heat.engine import resource
+from heat.engine import scheduler
+
+from heat.openstack.common import log as logging
+
+logger = logging.getLogger(__name__)
+
+
+class StackUpdate(object):
+    """
+    A Task to perform the update of an existing stack to a new template.
+    """
+
+    def __init__(self, existing_stack, new_stack):
+        """Initialise with the existing stack and the new stack."""
+        self.existing_stack = existing_stack
+        self.new_stack = new_stack
+
+    def __str__(self):
+        return '%s Update' % str(self.existing_stack)
+
+    @scheduler.wrappertask
+    def __call__(self):
+        """Return a co-routine that updates the stack."""
+
+        existing_deps = self.existing_stack.dependencies
+        new_deps = self.new_stack.dependencies
+
+        cleanup = scheduler.DependencyTaskGroup(existing_deps,
+                                                self._remove_old_resource,
+                                                reverse=True)
+        create_new = scheduler.DependencyTaskGroup(new_deps,
+                                                   self._create_new_resource)
+        update = scheduler.DependencyTaskGroup(new_deps,
+                                               self._update_resource)
+
+        yield cleanup()
+        yield create_new()
+        yield update()
+
+    @scheduler.wrappertask
+    def _remove_old_resource(self, existing_res):
+        res_name = existing_res.name
+        if res_name not in self.new_stack:
+            logger.debug("resource %s not found in updated stack"
+                         % res_name + " definition, deleting")
+            yield existing_res.destroy()
+            del self.existing_stack.resources[res_name]
+
+    @scheduler.wrappertask
+    def _create_new_resource(self, new_res):
+        res_name = new_res.name
+        if res_name not in self.existing_stack:
+            logger.debug("resource %s not found in current stack"
+                         % res_name + " definition, adding")
+            new_res.stack = self.existing_stack
+            self.existing_stack[res_name] = new_res
+            yield new_res.create()
+
+    @scheduler.wrappertask
+    def _replace_resource(self, new_res):
+        res_name = new_res.name
+        yield self.existing_stack[res_name].destroy()
+        new_res.stack = self.existing_stack
+        self.existing_stack[res_name] = new_res
+        yield new_res.create()
+
+    @scheduler.wrappertask
+    def _update_resource(self, new_res):
+        res_name = new_res.name
+
+        if res_name not in self.existing_stack:
+            return
+
+        # Compare resolved pre/post update resource snippets,
+        # note the new resource snippet is resolved in the context
+        # of the existing stack (which is the stack being updated)
+        existing_snippet = self.existing_stack[res_name].\
+            parsed_template(cached=True)
+        new_snippet = self.existing_stack.resolve_runtime_data(new_res.t)
+
+        if new_snippet != existing_snippet:
+            try:
+                yield self.existing_stack[res_name].update(new_snippet)
+            except resource.UpdateReplace:
+                yield self._replace_resource(new_res)
+            else:
+                logger.info("Resource %s for stack %s updated" %
+                            (res_name, self.existing_stack.name))
index 13788fe2b5d7198a2fe49d3fb3a650adac196aad..d632292127f5c9d465b64e6335a950bc6421677c 100644 (file)
@@ -651,6 +651,10 @@ class StackTest(HeatTestCase):
     def test_update_add(self):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl))
         self.stack.store()
@@ -674,6 +678,10 @@ class StackTest(HeatTestCase):
                 'AResource': {'Type': 'GenericResourceType'},
                 'BResource': {'Type': 'GenericResourceType'}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl))
         self.stack.store()
@@ -695,6 +703,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Description': 'ATemplate',
                 'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl))
         self.stack.store()
@@ -721,6 +733,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType',
                                             'Properties': {'Foo': 'abc'}}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl))
         self.stack.store()
@@ -754,6 +770,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType',
                                             'Properties': {'Foo': 'abc'}}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=True)
@@ -795,6 +815,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType',
                                             'Properties': {'Foo': 'abc'}}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=True)
@@ -835,6 +859,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType',
                                             'Properties': {'Foo': 'abc'}}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=True)
@@ -867,6 +895,9 @@ class StackTest(HeatTestCase):
     def test_update_add_failed_create(self):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl))
         self.stack.store()
@@ -905,6 +936,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType',
                                             'Properties': {'Foo': 'abc'}}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=False)
@@ -945,6 +980,10 @@ class StackTest(HeatTestCase):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType',
                                             'Properties': {'Foo': 'abc'}}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=False)
@@ -979,6 +1018,10 @@ class StackTest(HeatTestCase):
     def test_update_rollback_add(self):
         tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=False)
@@ -1012,6 +1055,10 @@ class StackTest(HeatTestCase):
                 'AResource': {'Type': 'GenericResourceType'},
                 'BResource': {'Type': 'GenericResourceType'}}}
 
+        self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
+        mox.Replay(scheduler.TaskRunner._sleep)
+
         self.stack = parser.Stack(self.ctx, 'update_test_stack',
                                   template.Template(tmpl),
                                   disable_rollback=False)
@@ -1066,7 +1113,7 @@ class StackTest(HeatTestCase):
                                   template.Template(tmpl))
 
         self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
-        scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
         self.m.ReplayAll()
 
         self.stack.store()
@@ -1126,7 +1173,7 @@ class StackTest(HeatTestCase):
                                   disable_rollback=False)
 
         self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
-        scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
         self.m.ReplayAll()
 
         self.stack.store()
@@ -1196,7 +1243,7 @@ class StackTest(HeatTestCase):
                                   disable_rollback=False)
 
         self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep')
-        scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None)
+        scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes()
         self.m.ReplayAll()
 
         self.stack.store()