From 458f1df58f7ef248da60310f3b2642eb621a3f46 Mon Sep 17 00:00:00 2001 From: Sumit Naiksatam Date: Thu, 22 Aug 2013 14:27:45 -0700 Subject: [PATCH] Adds support for Neutron Firewall Introduces the following Neutron Firewall as a Service (FWaaS) components to Heat resources: * Firewall * FirewallPolicy * FirewallRule Change-Id: I646a1402a336e7cc7c88d555cc836614bda9dca2 Implements: blueprint fwaas-heat --- heat/engine/resources/neutron/firewall.py | 229 +++++++++++ heat/tests/test_neutron_firewall.py | 480 ++++++++++++++++++++++ 2 files changed, 709 insertions(+) create mode 100644 heat/engine/resources/neutron/firewall.py create mode 100644 heat/tests/test_neutron_firewall.py diff --git a/heat/engine/resources/neutron/firewall.py b/heat/engine/resources/neutron/firewall.py new file mode 100644 index 00000000..0d9eb6c7 --- /dev/null +++ b/heat/engine/resources/neutron/firewall.py @@ -0,0 +1,229 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from heat.engine import clients +from heat.engine.resources.neutron import neutron +from heat.engine import scheduler + +if clients.neutronclient is not None: + from neutronclient.common.exceptions import NeutronClientException + +from heat.openstack.common import log as logging + +logger = logging.getLogger(__name__) + + +class Firewall(neutron.NeutronResource): + """ + A resource for the Firewall resource in Neutron FWaaS. + """ + + properties_schema = {'name': {'Type': 'String'}, + 'description': {'Type': 'String'}, + 'admin_state_up': {'Type': 'Boolean', + 'Default': True}, + 'firewall_policy_id': {'Type': 'String', + 'Required': True}} + + attributes_schema = { + 'id': 'unique identifier for the Firewall', + 'name': 'name for the Firewall', + 'description': 'description of the Firewall', + 'admin_state_up': 'the administrative state of the Firewall', + 'firewall_policy_id': 'unique identifier of the FirewallPolicy used to' + 'create the Firewall', + 'status': 'the status of the Firewall', + 'tenant_id': 'Id of the tenant owning the Firewall' + } + + update_allowed_keys = ('Properties',) + update_allowed_properties = ('name', 'description', 'admin_state_up', + 'firewall_policy_id') + + def _show_resource(self): + return self.neutron().show_firewall(self.resource_id)['firewall'] + + def handle_create(self): + props = self.prepare_properties( + self.properties, + self.physical_resource_name()) + firewall = self.neutron().create_firewall({'firewall': props})[ + 'firewall'] + self.resource_id_set(firewall['id']) + + def handle_update(self, json_snippet, tmpl_diff, prop_diff): + if prop_diff: + self.neutron().update_firewall( + self.resource_id, {'firewall': prop_diff}) + + def handle_delete(self): + client = self.neutron() + try: + client.delete_firewall(self.resource_id) + except NeutronClientException as ex: + if ex.status_code != 404: + raise ex + else: + return scheduler.TaskRunner(self._confirm_delete)() + + +class FirewallPolicy(neutron.NeutronResource): + """ + A resource for the FirewallPolicy resource in Neutron FWaaS. + """ + + properties_schema = {'name': {'Type': 'String'}, + 'description': {'Type': 'String'}, + 'shared': {'Type': 'Boolean', + 'Default': False}, + 'audited': {'Type': 'Boolean', + 'Default': False}, + 'firewall_rules': {'Type': 'List', + 'Required': True}} + + attributes_schema = { + 'id': 'unique identifier for the FirewallPolicy', + 'name': 'name for the FirewallPolicy', + 'description': 'description of the FirewallPolicy', + 'firewall_rules': 'list of FirewallRules in this FirewallPolicy', + 'shared': 'shared status of this FirewallPolicy', + 'audited': 'audit status of this FirewallPolicy', + 'tenant_id': 'Id of the tenant owning the FirewallPolicy' + } + + update_allowed_keys = ('Properties',) + update_allowed_properties = ('name', 'description', 'shared', + 'audited', 'firewall_rules') + + def _show_resource(self): + return self.neutron().show_firewall_policy(self.resource_id)[ + 'firewall_policy'] + + def handle_create(self): + props = self.prepare_properties( + self.properties, + self.physical_resource_name()) + firewall_policy = self.neutron().create_firewall_policy( + {'firewall_policy': props})['firewall_policy'] + self.resource_id_set(firewall_policy['id']) + + def handle_update(self, json_snippet, tmpl_diff, prop_diff): + if prop_diff: + self.neutron().update_firewall_policy( + self.resource_id, {'firewall_policy': prop_diff}) + + def handle_delete(self): + client = self.neutron() + try: + client.delete_firewall_policy(self.resource_id) + except NeutronClientException as ex: + if ex.status_code != 404: + raise ex + else: + return scheduler.TaskRunner(self._confirm_delete)() + + +class FirewallRule(neutron.NeutronResource): + """ + A resource for the FirewallRule resource in Neutron FWaaS. + """ + + properties_schema = {'name': {'Type': 'String'}, + 'description': {'Type': 'String'}, + 'shared': {'Type': 'Boolean', + 'Default': False}, + 'protocol': {'Type': 'String', + 'AllowedValues': ['tcp', 'udp', 'icmp', + None], + 'Default': None}, + 'ip_version': {'Type': 'String', + 'AllowedValues': ['4', '6'], + 'Default': '4'}, + 'source_ip_address': {'Type': 'String', + 'Default': None}, + 'destination_ip_address': {'Type': 'String', + 'Default': None}, + 'source_port': {'Type': 'String', + 'Default': None}, + 'destination_port': {'Type': 'String', + 'Default': None}, + 'action': {'Type': 'String', + 'AllowedValues': ['allow', 'deny'], + 'Default': 'deny'}, + 'enabled': {'Type': 'Boolean', + 'Default': True}} + + attributes_schema = { + 'id': 'unique identifier for the FirewallRule', + 'name': 'name for the FirewallRule', + 'description': 'description of the FirewallRule', + 'firewall_policy_id': 'unique identifier of the FirewallPolicy to' + 'which this FirewallRule belongs', + 'shared': 'shared status of this FirewallRule', + 'protocol': 'protocol value for this FirewallRule', + 'ip_version': 'ip_version for this FirewallRule', + 'source_ip_address': 'source ip_address for this FirewallRule', + 'destination_ip_address': 'destination ip_address for this' + 'FirewallRule', + 'source_port': 'source port range for this FirewallRule', + 'destination_port': 'destination port range for this FirewallRule', + 'action': 'allow or deny action for this FirewallRule', + 'enabled': 'indicates whether this FirewallRule is enabled or not', + 'position': 'position of the rule within the FirewallPolicy', + 'tenant_id': 'Id of the tenant owning the Firewall' + } + + update_allowed_keys = ('Properties',) + update_allowed_properties = ('name', 'description', 'shared', + 'protocol', 'ip_version', 'source_ip_address', + 'destination_ip_address', 'source_port', + 'destination_port', 'action', 'enabled') + + def _show_resource(self): + return self.neutron().show_firewall_rule( + self.resource_id)['firewall_rule'] + + def handle_create(self): + props = self.prepare_properties( + self.properties, + self.physical_resource_name()) + firewall_rule = self.neutron().create_firewall_rule( + {'firewall_rule': props})['firewall_rule'] + self.resource_id_set(firewall_rule['id']) + + def handle_update(self, json_snippet, tmpl_diff, prop_diff): + if prop_diff: + self.neutron().update_firewall_rule( + self.resource_id, {'firewall_rule': prop_diff}) + + def handle_delete(self): + client = self.neutron() + try: + client.delete_firewall_rule(self.resource_id) + except NeutronClientException as ex: + if ex.status_code != 404: + raise ex + else: + return scheduler.TaskRunner(self._confirm_delete)() + + +def resource_mapping(): + if clients.neutronclient is None: + return {} + + return { + 'OS::Neutron::Firewall': Firewall, + 'OS::Neutron::FirewallPolicy': FirewallPolicy, + 'OS::Neutron::FirewallRule': FirewallRule, + } diff --git a/heat/tests/test_neutron_firewall.py b/heat/tests/test_neutron_firewall.py new file mode 100644 index 00000000..fc532adb --- /dev/null +++ b/heat/tests/test_neutron_firewall.py @@ -0,0 +1,480 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +from testtools import skipIf + +from heat.common import exception +from heat.common import template_format +from heat.engine import clients +from heat.engine import scheduler +from heat.engine.resources.neutron import firewall +from heat.openstack.common.importutils import try_import +from heat.tests import fakes +from heat.tests import utils +from heat.tests.common import HeatTestCase + +neutronclient = try_import('neutronclient.v2_0.client') + +firewall_template = ''' +{ + "AWSTemplateFormatVersion" : "2010-09-09", + "Description" : "Template to test neutron firewall resource", + "Parameters" : {}, + "Resources" : { + "firewall": { + "Type": "OS::Neutron::Firewall", + "Properties": { + "name": "test-firewall", + "firewall_policy_id": "policy-id", + "admin_state_up": True, + } + } + } +} +''' + +firewall_policy_template = ''' +{ + "AWSTemplateFormatVersion" : "2010-09-09", + "Description" : "Template to test neutron firewall policy resource", + "Parameters" : {}, + "Resources" : { + "firewall_policy": { + "Type": "OS::Neutron::FirewallPolicy", + "Properties": { + "name": "test-firewall-policy", + "shared": True, + "audited": True, + "firewall_rules": ['rule-id-1', 'rule-id-2'], + } + } + } +} +''' + +firewall_rule_template = ''' +{ + "AWSTemplateFormatVersion" : "2010-09-09", + "Description" : "Template to test neutron firewall rule resource", + "Parameters" : {}, + "Resources" : { + "firewall_rule": { + "Type": "OS::Neutron::FirewallRule", + "Properties": { + "name": "test-firewall-rule", + "shared": True, + "protocol": "tcp", + "action": "allow", + "enabled": True, + "ip_version": "4", + } + } + } +} +''' + + +@skipIf(neutronclient is None, 'neutronclient unavailable') +class FirewallTest(HeatTestCase): + + def setUp(self): + super(FirewallTest, self).setUp() + self.m.StubOutWithMock(neutronclient.Client, 'create_firewall') + self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall') + self.m.StubOutWithMock(neutronclient.Client, 'show_firewall') + self.m.StubOutWithMock(neutronclient.Client, 'update_firewall') + self.m.StubOutWithMock(clients.OpenStackClients, 'keystone') + utils.setup_dummy_db() + + def create_firewall(self): + clients.OpenStackClients.keystone().AndReturn( + fakes.FakeKeystoneClient()) + neutronclient.Client.create_firewall({ + 'firewall': { + 'name': 'test-firewall', 'admin_state_up': True, + 'firewall_policy_id': 'policy-id'}} + ).AndReturn({'firewall': {'id': '5678'}}) + + snippet = template_format.parse(firewall_template) + stack = utils.parse_stack(snippet) + return firewall.Firewall( + 'firewall', snippet['Resources']['firewall'], stack) + + def test_create(self): + rsrc = self.create_firewall() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_create_failed(self): + clients.OpenStackClients.keystone().AndReturn( + fakes.FakeKeystoneClient()) + neutronclient.Client.create_firewall({ + 'firewall': { + 'name': 'test-firewall', 'admin_state_up': True, + 'firewall_policy_id': 'policy-id'}} + ).AndRaise(firewall.NeutronClientException()) + self.m.ReplayAll() + + snippet = template_format.parse(firewall_template) + stack = utils.parse_stack(snippet) + rsrc = firewall.Firewall( + 'firewall', snippet['Resources']['firewall'], stack) + + error = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(rsrc.create)) + self.assertEqual( + 'NeutronClientException: An unknown exception occurred.', + str(error)) + self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state) + self.m.VerifyAll() + + def test_delete(self): + neutronclient.Client.delete_firewall('5678') + neutronclient.Client.show_firewall('5678').AndRaise( + firewall.NeutronClientException(status_code=404)) + + rsrc = self.create_firewall() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + scheduler.TaskRunner(rsrc.delete)() + self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_delete_already_gone(self): + neutronclient.Client.delete_firewall('5678').AndRaise( + firewall.NeutronClientException(status_code=404)) + + rsrc = self.create_firewall() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + scheduler.TaskRunner(rsrc.delete)() + self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_delete_failed(self): + neutronclient.Client.delete_firewall('5678').AndRaise( + firewall.NeutronClientException(status_code=400)) + + rsrc = self.create_firewall() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + error = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(rsrc.delete)) + self.assertEqual( + 'NeutronClientException: An unknown exception occurred.', + str(error)) + self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state) + self.m.VerifyAll() + + def test_attribute(self): + rsrc = self.create_firewall() + neutronclient.Client.show_firewall('5678').MultipleTimes( + ).AndReturn( + {'firewall': {'admin_state_up': True, + 'firewall_policy_id': 'policy-id'}}) + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + self.assertEqual(True, rsrc.FnGetAtt('admin_state_up')) + self.assertEqual('policy-id', rsrc.FnGetAtt('firewall_policy_id')) + self.m.VerifyAll() + + def test_attribute_failed(self): + rsrc = self.create_firewall() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + error = self.assertRaises(exception.InvalidTemplateAttribute, + rsrc.FnGetAtt, 'subnet_id') + self.assertEqual( + 'The Referenced Attribute (firewall subnet_id) is ' + 'incorrect.', str(error)) + self.m.VerifyAll() + + def test_update(self): + rsrc = self.create_firewall() + neutronclient.Client.update_firewall( + '5678', {'firewall': {'admin_state_up': False}}) + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + + update_template = copy.deepcopy(rsrc.t) + update_template['Properties']['admin_state_up'] = False + self.assertEqual(None, rsrc.update(update_template)) + + self.m.VerifyAll() + + +@skipIf(neutronclient is None, 'neutronclient unavailable') +class FirewallPolicyTest(HeatTestCase): + + def setUp(self): + super(FirewallPolicyTest, self).setUp() + self.m.StubOutWithMock(neutronclient.Client, 'create_firewall_policy') + self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall_policy') + self.m.StubOutWithMock(neutronclient.Client, 'show_firewall_policy') + self.m.StubOutWithMock(neutronclient.Client, 'update_firewall_policy') + self.m.StubOutWithMock(clients.OpenStackClients, 'keystone') + utils.setup_dummy_db() + + def create_firewall_policy(self): + clients.OpenStackClients.keystone().AndReturn( + fakes.FakeKeystoneClient()) + neutronclient.Client.create_firewall_policy({ + 'firewall_policy': { + 'name': 'test-firewall-policy', 'shared': True, + 'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']}} + ).AndReturn({'firewall_policy': {'id': '5678'}}) + + snippet = template_format.parse(firewall_policy_template) + stack = utils.parse_stack(snippet) + return firewall.FirewallPolicy( + 'firewall_policy', snippet['Resources']['firewall_policy'], stack) + + def test_create(self): + rsrc = self.create_firewall_policy() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_create_failed(self): + clients.OpenStackClients.keystone().AndReturn( + fakes.FakeKeystoneClient()) + neutronclient.Client.create_firewall_policy({ + 'firewall_policy': { + 'name': 'test-firewall-policy', 'shared': True, + 'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']}} + ).AndRaise(firewall.NeutronClientException()) + self.m.ReplayAll() + + snippet = template_format.parse(firewall_policy_template) + stack = utils.parse_stack(snippet) + rsrc = firewall.FirewallPolicy( + 'firewall_policy', snippet['Resources']['firewall_policy'], stack) + + error = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(rsrc.create)) + self.assertEqual( + 'NeutronClientException: An unknown exception occurred.', + str(error)) + self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state) + self.m.VerifyAll() + + def test_delete(self): + neutronclient.Client.delete_firewall_policy('5678') + neutronclient.Client.show_firewall_policy('5678').AndRaise( + firewall.NeutronClientException(status_code=404)) + + rsrc = self.create_firewall_policy() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + scheduler.TaskRunner(rsrc.delete)() + self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_delete_already_gone(self): + neutronclient.Client.delete_firewall_policy('5678').AndRaise( + firewall.NeutronClientException(status_code=404)) + + rsrc = self.create_firewall_policy() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + scheduler.TaskRunner(rsrc.delete)() + self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_delete_failed(self): + neutronclient.Client.delete_firewall_policy('5678').AndRaise( + firewall.NeutronClientException(status_code=400)) + + rsrc = self.create_firewall_policy() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + error = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(rsrc.delete)) + self.assertEqual( + 'NeutronClientException: An unknown exception occurred.', + str(error)) + self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state) + self.m.VerifyAll() + + def test_attribute(self): + rsrc = self.create_firewall_policy() + neutronclient.Client.show_firewall_policy('5678').MultipleTimes( + ).AndReturn( + {'firewall_policy': {'audited': True, 'shared': True}}) + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + self.assertEqual(True, rsrc.FnGetAtt('audited')) + self.assertEqual(True, rsrc.FnGetAtt('shared')) + self.m.VerifyAll() + + def test_attribute_failed(self): + rsrc = self.create_firewall_policy() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + error = self.assertRaises(exception.InvalidTemplateAttribute, + rsrc.FnGetAtt, 'subnet_id') + self.assertEqual( + 'The Referenced Attribute (firewall_policy subnet_id) is ' + 'incorrect.', str(error)) + self.m.VerifyAll() + + def test_update(self): + rsrc = self.create_firewall_policy() + neutronclient.Client.update_firewall_policy( + '5678', {'firewall_policy': {'firewall_rules': ['3', '4']}}) + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + + update_template = copy.deepcopy(rsrc.t) + update_template['Properties']['firewall_rules'] = ['3', '4'] + self.assertEqual(None, rsrc.update(update_template)) + + self.m.VerifyAll() + + +@skipIf(neutronclient is None, 'neutronclient unavailable') +class FirewallRuleTest(HeatTestCase): + + def setUp(self): + super(FirewallRuleTest, self).setUp() + self.m.StubOutWithMock(neutronclient.Client, 'create_firewall_rule') + self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall_rule') + self.m.StubOutWithMock(neutronclient.Client, 'show_firewall_rule') + self.m.StubOutWithMock(neutronclient.Client, 'update_firewall_rule') + self.m.StubOutWithMock(clients.OpenStackClients, 'keystone') + utils.setup_dummy_db() + + def create_firewall_rule(self): + clients.OpenStackClients.keystone().AndReturn( + fakes.FakeKeystoneClient()) + neutronclient.Client.create_firewall_rule({ + 'firewall_rule': { + 'name': 'test-firewall-rule', 'shared': True, + 'action': 'allow', 'protocol': 'tcp', 'enabled': True, + 'ip_version': "4"}} + ).AndReturn({'firewall_rule': {'id': '5678'}}) + + snippet = template_format.parse(firewall_rule_template) + stack = utils.parse_stack(snippet) + return firewall.FirewallRule( + 'firewall_rule', snippet['Resources']['firewall_rule'], stack) + + def test_create(self): + rsrc = self.create_firewall_rule() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_create_failed(self): + clients.OpenStackClients.keystone().AndReturn( + fakes.FakeKeystoneClient()) + neutronclient.Client.create_firewall_rule({ + 'firewall_rule': { + 'name': 'test-firewall-rule', 'shared': True, + 'action': 'allow', 'protocol': 'tcp', 'enabled': True, + 'ip_version': "4"}} + ).AndRaise(firewall.NeutronClientException()) + self.m.ReplayAll() + + snippet = template_format.parse(firewall_rule_template) + stack = utils.parse_stack(snippet) + rsrc = firewall.FirewallRule( + 'firewall_rule', snippet['Resources']['firewall_rule'], stack) + + error = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(rsrc.create)) + self.assertEqual( + 'NeutronClientException: An unknown exception occurred.', + str(error)) + self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state) + self.m.VerifyAll() + + def test_delete(self): + neutronclient.Client.delete_firewall_rule('5678') + neutronclient.Client.show_firewall_rule('5678').AndRaise( + firewall.NeutronClientException(status_code=404)) + + rsrc = self.create_firewall_rule() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + scheduler.TaskRunner(rsrc.delete)() + self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_delete_already_gone(self): + neutronclient.Client.delete_firewall_rule('5678').AndRaise( + firewall.NeutronClientException(status_code=404)) + + rsrc = self.create_firewall_rule() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + scheduler.TaskRunner(rsrc.delete)() + self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state) + self.m.VerifyAll() + + def test_delete_failed(self): + neutronclient.Client.delete_firewall_rule('5678').AndRaise( + firewall.NeutronClientException(status_code=400)) + + rsrc = self.create_firewall_rule() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + error = self.assertRaises(exception.ResourceFailure, + scheduler.TaskRunner(rsrc.delete)) + self.assertEqual( + 'NeutronClientException: An unknown exception occurred.', + str(error)) + self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state) + self.m.VerifyAll() + + def test_attribute(self): + rsrc = self.create_firewall_rule() + neutronclient.Client.show_firewall_rule('5678').MultipleTimes( + ).AndReturn( + {'firewall_rule': {'protocol': 'tcp', 'shared': True}}) + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + self.assertEqual('tcp', rsrc.FnGetAtt('protocol')) + self.assertEqual(True, rsrc.FnGetAtt('shared')) + self.m.VerifyAll() + + def test_attribute_failed(self): + rsrc = self.create_firewall_rule() + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + error = self.assertRaises(exception.InvalidTemplateAttribute, + rsrc.FnGetAtt, 'subnet_id') + self.assertEqual( + 'The Referenced Attribute (firewall_rule subnet_id) is ' + 'incorrect.', str(error)) + self.m.VerifyAll() + + def test_update(self): + rsrc = self.create_firewall_rule() + neutronclient.Client.update_firewall_rule( + '5678', {'firewall_rule': {'protocol': 'icmp'}}) + self.m.ReplayAll() + scheduler.TaskRunner(rsrc.create)() + + update_template = copy.deepcopy(rsrc.t) + update_template['Properties']['protocol'] = 'icmp' + self.assertEqual(None, rsrc.update(update_template)) + + self.m.VerifyAll() -- 2.45.2