From: Zane Bitter Date: Fri, 21 Jun 2013 16:01:32 +0000 (+0200) Subject: Refactor and partly parallelise stack update code X-Git-Tag: 2014.1~438^2 X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=46edefa36cfd3ff6d31a54c7e8eccd059887badb;p=openstack-build%2Fheat-build.git Refactor and partly parallelise stack update code Make this easier to read, and also use co-routines to make it run in parallel where possible. Change-Id: Ib37f4c04bd250cb15604ba9adb4cca8a97a2c4fb --- diff --git a/heat/engine/parser.py b/heat/engine/parser.py index 92ded021..554d8c81 100644 --- a/heat/engine/parser.py +++ b/heat/engine/parser.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet import functools import re @@ -26,6 +25,7 @@ from heat.engine import resources from heat.engine import scheduler from heat.engine import template from heat.engine import timestamp +from heat.engine import update from heat.engine.parameters import Parameters from heat.engine.template import Template from heat.engine.clients import Clients @@ -351,104 +351,48 @@ class Stack(object): for r in self: r.cache_template() - # Now make the resources match the new stack definition - with eventlet.Timeout(self.timeout_secs()) as tmo: + try: + update_task = update.StackUpdate(self, newstack) + updater = scheduler.TaskRunner(update_task) try: - # First delete any resources which are not in newstack - for res in reversed(self): - if res.name not in newstack.keys(): - logger.debug("resource %s not found in updated stack" - % res.name + " definition, deleting") - # res.destroy raises exception.ResourceFailure on error - res.destroy() - del self.resources[res.name] - self.dependencies = self._get_dependencies( - self.resources.itervalues()) - - # Then create any which are defined in newstack but not self - for res in newstack: - if res.name not in self.keys(): - logger.debug("resource %s not found in current stack" - % res.name + " definition, adding") - res.stack = self - self[res.name] = res - self.dependencies = self._get_dependencies( - self.resources.itervalues()) - # res.create raises exception.ResourceFailure on error - scheduler.TaskRunner(res.create)() - - # Now (the hard part :) update existing resources - # The Resource base class allows equality-test of resources, - # based on the parsed template snippet for the resource. - # If this test fails, we call the underlying resource.update - # - # Currently many resources have a default handle_update method - # which raises exception.ResourceReplace - # optionally they may implement non-interruptive logic and - # return UPDATE_COMPLETE. If resources do not implement the - # handle_update method at all, update will fail. - for res in newstack: - # Compare resolved pre/post update resource snippets, - # note the new resource snippet is resolved in the context - # of the existing stack (which is the stack being updated) - old_snippet = self[res.name].parsed_template(cached=True) - new_snippet = self.resolve_runtime_data(res.t) - - if old_snippet != new_snippet: - # res.update raises exception.ResourceFailure on error - # or exception.ResourceReplace if update requires - # replacement - try: - self[res.name].update(new_snippet) - except resource.UpdateReplace: - # Resource requires replacement for update - self[res.name].destroy() - res.stack = self - self[res.name] = res - self.dependencies = self._get_dependencies( - self.resources.itervalues()) - scheduler.TaskRunner(res.create)() - else: - logger.info("Resource %s for stack %s updated" % - (res.name, self.name)) - - if action == self.UPDATE: - reason = 'Stack successfully updated' - else: - reason = 'Stack rollback completed' - stack_status = self.COMPLETE - - except eventlet.Timeout as t: - if t is tmo: - stack_status = self.FAILED - reason = 'Timed out waiting for %s' % str(res) - else: - # not my timeout - raise - except exception.ResourceFailure as e: - reason = str(e) or "Error : %s" % type(e) - - stack_status = self.FAILED - if action == self.UPDATE: - # If rollback is enabled, we do another update, with the - # existing template, so we roll back to the original state - if not self.disable_rollback: - oldstack = Stack(self.context, self.name, self.t, - self.env) - self.update(oldstack, action=self.ROLLBACK) - return - - self.state_set(action, stack_status, reason) - - # flip the template & environment to the newstack values - # Note we do this on success and failure, so the current - # stack resources are stored, even if one is in a failed - # state (otherwise we won't remove them on delete) - self.t = newstack.t - self.env = newstack.env - template_outputs = self.t[template.OUTPUTS] - self.outputs = self.resolve_static_data(template_outputs) - self.store() + updater(timeout=self.timeout_secs()) + finally: + cur_deps = self._get_dependencies(self.resources.itervalues()) + self.dependencies = cur_deps + + if action == self.UPDATE: + reason = 'Stack successfully updated' + else: + reason = 'Stack rollback completed' + stack_status = self.COMPLETE + + except scheduler.Timeout: + stack_status = self.FAILED + reason = 'Timed out' + except exception.ResourceFailure as e: + reason = str(e) + + stack_status = self.FAILED + if action == self.UPDATE: + # If rollback is enabled, we do another update, with the + # existing template, so we roll back to the original state + if not self.disable_rollback: + oldstack = Stack(self.context, self.name, self.t, + self.env) + self.update(oldstack, action=self.ROLLBACK) + return + + self.state_set(action, stack_status, reason) + + # flip the template & environment to the newstack values + # Note we do this on success and failure, so the current + # stack resources are stored, even if one is in a failed + # state (otherwise we won't remove them on delete) + self.t = newstack.t + self.env = newstack.env + template_outputs = self.t[template.OUTPUTS] + self.outputs = self.resolve_static_data(template_outputs) + self.store() def delete(self, action=DELETE): ''' diff --git a/heat/engine/update.py b/heat/engine/update.py new file mode 100644 index 00000000..a9a39890 --- /dev/null +++ b/heat/engine/update.py @@ -0,0 +1,104 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from heat.engine import resource +from heat.engine import scheduler + +from heat.openstack.common import log as logging + +logger = logging.getLogger(__name__) + + +class StackUpdate(object): + """ + A Task to perform the update of an existing stack to a new template. + """ + + def __init__(self, existing_stack, new_stack): + """Initialise with the existing stack and the new stack.""" + self.existing_stack = existing_stack + self.new_stack = new_stack + + def __str__(self): + return '%s Update' % str(self.existing_stack) + + @scheduler.wrappertask + def __call__(self): + """Return a co-routine that updates the stack.""" + + existing_deps = self.existing_stack.dependencies + new_deps = self.new_stack.dependencies + + cleanup = scheduler.DependencyTaskGroup(existing_deps, + self._remove_old_resource, + reverse=True) + create_new = scheduler.DependencyTaskGroup(new_deps, + self._create_new_resource) + update = scheduler.DependencyTaskGroup(new_deps, + self._update_resource) + + yield cleanup() + yield create_new() + yield update() + + @scheduler.wrappertask + def _remove_old_resource(self, existing_res): + res_name = existing_res.name + if res_name not in self.new_stack: + logger.debug("resource %s not found in updated stack" + % res_name + " definition, deleting") + yield existing_res.destroy() + del self.existing_stack.resources[res_name] + + @scheduler.wrappertask + def _create_new_resource(self, new_res): + res_name = new_res.name + if res_name not in self.existing_stack: + logger.debug("resource %s not found in current stack" + % res_name + " definition, adding") + new_res.stack = self.existing_stack + self.existing_stack[res_name] = new_res + yield new_res.create() + + @scheduler.wrappertask + def _replace_resource(self, new_res): + res_name = new_res.name + yield self.existing_stack[res_name].destroy() + new_res.stack = self.existing_stack + self.existing_stack[res_name] = new_res + yield new_res.create() + + @scheduler.wrappertask + def _update_resource(self, new_res): + res_name = new_res.name + + if res_name not in self.existing_stack: + return + + # Compare resolved pre/post update resource snippets, + # note the new resource snippet is resolved in the context + # of the existing stack (which is the stack being updated) + existing_snippet = self.existing_stack[res_name].\ + parsed_template(cached=True) + new_snippet = self.existing_stack.resolve_runtime_data(new_res.t) + + if new_snippet != existing_snippet: + try: + yield self.existing_stack[res_name].update(new_snippet) + except resource.UpdateReplace: + yield self._replace_resource(new_res) + else: + logger.info("Resource %s for stack %s updated" % + (res_name, self.existing_stack.name)) diff --git a/heat/tests/test_parser.py b/heat/tests/test_parser.py index 13788fe2..d6322921 100644 --- a/heat/tests/test_parser.py +++ b/heat/tests/test_parser.py @@ -651,6 +651,10 @@ class StackTest(HeatTestCase): def test_update_add(self): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl)) self.stack.store() @@ -674,6 +678,10 @@ class StackTest(HeatTestCase): 'AResource': {'Type': 'GenericResourceType'}, 'BResource': {'Type': 'GenericResourceType'}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl)) self.stack.store() @@ -695,6 +703,10 @@ class StackTest(HeatTestCase): tmpl = {'Description': 'ATemplate', 'Resources': {'AResource': {'Type': 'GenericResourceType'}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl)) self.stack.store() @@ -721,6 +733,10 @@ class StackTest(HeatTestCase): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl)) self.stack.store() @@ -754,6 +770,10 @@ class StackTest(HeatTestCase): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=True) @@ -795,6 +815,10 @@ class StackTest(HeatTestCase): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=True) @@ -835,6 +859,10 @@ class StackTest(HeatTestCase): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=True) @@ -867,6 +895,9 @@ class StackTest(HeatTestCase): def test_update_add_failed_create(self): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl)) self.stack.store() @@ -905,6 +936,10 @@ class StackTest(HeatTestCase): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=False) @@ -945,6 +980,10 @@ class StackTest(HeatTestCase): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType', 'Properties': {'Foo': 'abc'}}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=False) @@ -979,6 +1018,10 @@ class StackTest(HeatTestCase): def test_update_rollback_add(self): tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=False) @@ -1012,6 +1055,10 @@ class StackTest(HeatTestCase): 'AResource': {'Type': 'GenericResourceType'}, 'BResource': {'Type': 'GenericResourceType'}}} + self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() + mox.Replay(scheduler.TaskRunner._sleep) + self.stack = parser.Stack(self.ctx, 'update_test_stack', template.Template(tmpl), disable_rollback=False) @@ -1066,7 +1113,7 @@ class StackTest(HeatTestCase): template.Template(tmpl)) self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') - scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None) + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() self.m.ReplayAll() self.stack.store() @@ -1126,7 +1173,7 @@ class StackTest(HeatTestCase): disable_rollback=False) self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') - scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None) + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() self.m.ReplayAll() self.stack.store() @@ -1196,7 +1243,7 @@ class StackTest(HeatTestCase): disable_rollback=False) self.m.StubOutWithMock(scheduler.TaskRunner, '_sleep') - scheduler.TaskRunner._sleep(mox.IsA(int)).AndReturn(None) + scheduler.TaskRunner._sleep(mox.IsA(int)).MultipleTimes() self.m.ReplayAll() self.stack.store()