--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from heat.engine import clients
+from heat.engine.resources.neutron import neutron
+from heat.engine import scheduler
+
+if clients.neutronclient is not None:
+ from neutronclient.common.exceptions import NeutronClientException
+
+from heat.openstack.common import log as logging
+
+logger = logging.getLogger(__name__)
+
+
+class Firewall(neutron.NeutronResource):
+ """
+ A resource for the Firewall resource in Neutron FWaaS.
+ """
+
+ properties_schema = {'name': {'Type': 'String'},
+ 'description': {'Type': 'String'},
+ 'admin_state_up': {'Type': 'Boolean',
+ 'Default': True},
+ 'firewall_policy_id': {'Type': 'String',
+ 'Required': True}}
+
+ attributes_schema = {
+ 'id': 'unique identifier for the Firewall',
+ 'name': 'name for the Firewall',
+ 'description': 'description of the Firewall',
+ 'admin_state_up': 'the administrative state of the Firewall',
+ 'firewall_policy_id': 'unique identifier of the FirewallPolicy used to'
+ 'create the Firewall',
+ 'status': 'the status of the Firewall',
+ 'tenant_id': 'Id of the tenant owning the Firewall'
+ }
+
+ update_allowed_keys = ('Properties',)
+ update_allowed_properties = ('name', 'description', 'admin_state_up',
+ 'firewall_policy_id')
+
+ def _show_resource(self):
+ return self.neutron().show_firewall(self.resource_id)['firewall']
+
+ def handle_create(self):
+ props = self.prepare_properties(
+ self.properties,
+ self.physical_resource_name())
+ firewall = self.neutron().create_firewall({'firewall': props})[
+ 'firewall']
+ self.resource_id_set(firewall['id'])
+
+ def handle_update(self, json_snippet, tmpl_diff, prop_diff):
+ if prop_diff:
+ self.neutron().update_firewall(
+ self.resource_id, {'firewall': prop_diff})
+
+ def handle_delete(self):
+ client = self.neutron()
+ try:
+ client.delete_firewall(self.resource_id)
+ except NeutronClientException as ex:
+ if ex.status_code != 404:
+ raise ex
+ else:
+ return scheduler.TaskRunner(self._confirm_delete)()
+
+
+class FirewallPolicy(neutron.NeutronResource):
+ """
+ A resource for the FirewallPolicy resource in Neutron FWaaS.
+ """
+
+ properties_schema = {'name': {'Type': 'String'},
+ 'description': {'Type': 'String'},
+ 'shared': {'Type': 'Boolean',
+ 'Default': False},
+ 'audited': {'Type': 'Boolean',
+ 'Default': False},
+ 'firewall_rules': {'Type': 'List',
+ 'Required': True}}
+
+ attributes_schema = {
+ 'id': 'unique identifier for the FirewallPolicy',
+ 'name': 'name for the FirewallPolicy',
+ 'description': 'description of the FirewallPolicy',
+ 'firewall_rules': 'list of FirewallRules in this FirewallPolicy',
+ 'shared': 'shared status of this FirewallPolicy',
+ 'audited': 'audit status of this FirewallPolicy',
+ 'tenant_id': 'Id of the tenant owning the FirewallPolicy'
+ }
+
+ update_allowed_keys = ('Properties',)
+ update_allowed_properties = ('name', 'description', 'shared',
+ 'audited', 'firewall_rules')
+
+ def _show_resource(self):
+ return self.neutron().show_firewall_policy(self.resource_id)[
+ 'firewall_policy']
+
+ def handle_create(self):
+ props = self.prepare_properties(
+ self.properties,
+ self.physical_resource_name())
+ firewall_policy = self.neutron().create_firewall_policy(
+ {'firewall_policy': props})['firewall_policy']
+ self.resource_id_set(firewall_policy['id'])
+
+ def handle_update(self, json_snippet, tmpl_diff, prop_diff):
+ if prop_diff:
+ self.neutron().update_firewall_policy(
+ self.resource_id, {'firewall_policy': prop_diff})
+
+ def handle_delete(self):
+ client = self.neutron()
+ try:
+ client.delete_firewall_policy(self.resource_id)
+ except NeutronClientException as ex:
+ if ex.status_code != 404:
+ raise ex
+ else:
+ return scheduler.TaskRunner(self._confirm_delete)()
+
+
+class FirewallRule(neutron.NeutronResource):
+ """
+ A resource for the FirewallRule resource in Neutron FWaaS.
+ """
+
+ properties_schema = {'name': {'Type': 'String'},
+ 'description': {'Type': 'String'},
+ 'shared': {'Type': 'Boolean',
+ 'Default': False},
+ 'protocol': {'Type': 'String',
+ 'AllowedValues': ['tcp', 'udp', 'icmp',
+ None],
+ 'Default': None},
+ 'ip_version': {'Type': 'String',
+ 'AllowedValues': ['4', '6'],
+ 'Default': '4'},
+ 'source_ip_address': {'Type': 'String',
+ 'Default': None},
+ 'destination_ip_address': {'Type': 'String',
+ 'Default': None},
+ 'source_port': {'Type': 'String',
+ 'Default': None},
+ 'destination_port': {'Type': 'String',
+ 'Default': None},
+ 'action': {'Type': 'String',
+ 'AllowedValues': ['allow', 'deny'],
+ 'Default': 'deny'},
+ 'enabled': {'Type': 'Boolean',
+ 'Default': True}}
+
+ attributes_schema = {
+ 'id': 'unique identifier for the FirewallRule',
+ 'name': 'name for the FirewallRule',
+ 'description': 'description of the FirewallRule',
+ 'firewall_policy_id': 'unique identifier of the FirewallPolicy to'
+ 'which this FirewallRule belongs',
+ 'shared': 'shared status of this FirewallRule',
+ 'protocol': 'protocol value for this FirewallRule',
+ 'ip_version': 'ip_version for this FirewallRule',
+ 'source_ip_address': 'source ip_address for this FirewallRule',
+ 'destination_ip_address': 'destination ip_address for this'
+ 'FirewallRule',
+ 'source_port': 'source port range for this FirewallRule',
+ 'destination_port': 'destination port range for this FirewallRule',
+ 'action': 'allow or deny action for this FirewallRule',
+ 'enabled': 'indicates whether this FirewallRule is enabled or not',
+ 'position': 'position of the rule within the FirewallPolicy',
+ 'tenant_id': 'Id of the tenant owning the Firewall'
+ }
+
+ update_allowed_keys = ('Properties',)
+ update_allowed_properties = ('name', 'description', 'shared',
+ 'protocol', 'ip_version', 'source_ip_address',
+ 'destination_ip_address', 'source_port',
+ 'destination_port', 'action', 'enabled')
+
+ def _show_resource(self):
+ return self.neutron().show_firewall_rule(
+ self.resource_id)['firewall_rule']
+
+ def handle_create(self):
+ props = self.prepare_properties(
+ self.properties,
+ self.physical_resource_name())
+ firewall_rule = self.neutron().create_firewall_rule(
+ {'firewall_rule': props})['firewall_rule']
+ self.resource_id_set(firewall_rule['id'])
+
+ def handle_update(self, json_snippet, tmpl_diff, prop_diff):
+ if prop_diff:
+ self.neutron().update_firewall_rule(
+ self.resource_id, {'firewall_rule': prop_diff})
+
+ def handle_delete(self):
+ client = self.neutron()
+ try:
+ client.delete_firewall_rule(self.resource_id)
+ except NeutronClientException as ex:
+ if ex.status_code != 404:
+ raise ex
+ else:
+ return scheduler.TaskRunner(self._confirm_delete)()
+
+
+def resource_mapping():
+ if clients.neutronclient is None:
+ return {}
+
+ return {
+ 'OS::Neutron::Firewall': Firewall,
+ 'OS::Neutron::FirewallPolicy': FirewallPolicy,
+ 'OS::Neutron::FirewallRule': FirewallRule,
+ }
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from testtools import skipIf
+
+from heat.common import exception
+from heat.common import template_format
+from heat.engine import clients
+from heat.engine import scheduler
+from heat.engine.resources.neutron import firewall
+from heat.openstack.common.importutils import try_import
+from heat.tests import fakes
+from heat.tests import utils
+from heat.tests.common import HeatTestCase
+
+neutronclient = try_import('neutronclient.v2_0.client')
+
+firewall_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "Template to test neutron firewall resource",
+ "Parameters" : {},
+ "Resources" : {
+ "firewall": {
+ "Type": "OS::Neutron::Firewall",
+ "Properties": {
+ "name": "test-firewall",
+ "firewall_policy_id": "policy-id",
+ "admin_state_up": True,
+ }
+ }
+ }
+}
+'''
+
+firewall_policy_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "Template to test neutron firewall policy resource",
+ "Parameters" : {},
+ "Resources" : {
+ "firewall_policy": {
+ "Type": "OS::Neutron::FirewallPolicy",
+ "Properties": {
+ "name": "test-firewall-policy",
+ "shared": True,
+ "audited": True,
+ "firewall_rules": ['rule-id-1', 'rule-id-2'],
+ }
+ }
+ }
+}
+'''
+
+firewall_rule_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "Template to test neutron firewall rule resource",
+ "Parameters" : {},
+ "Resources" : {
+ "firewall_rule": {
+ "Type": "OS::Neutron::FirewallRule",
+ "Properties": {
+ "name": "test-firewall-rule",
+ "shared": True,
+ "protocol": "tcp",
+ "action": "allow",
+ "enabled": True,
+ "ip_version": "4",
+ }
+ }
+ }
+}
+'''
+
+
+@skipIf(neutronclient is None, 'neutronclient unavailable')
+class FirewallTest(HeatTestCase):
+
+ def setUp(self):
+ super(FirewallTest, self).setUp()
+ self.m.StubOutWithMock(neutronclient.Client, 'create_firewall')
+ self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall')
+ self.m.StubOutWithMock(neutronclient.Client, 'show_firewall')
+ self.m.StubOutWithMock(neutronclient.Client, 'update_firewall')
+ self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
+ utils.setup_dummy_db()
+
+ def create_firewall(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_firewall({
+ 'firewall': {
+ 'name': 'test-firewall', 'admin_state_up': True,
+ 'firewall_policy_id': 'policy-id'}}
+ ).AndReturn({'firewall': {'id': '5678'}})
+
+ snippet = template_format.parse(firewall_template)
+ stack = utils.parse_stack(snippet)
+ return firewall.Firewall(
+ 'firewall', snippet['Resources']['firewall'], stack)
+
+ def test_create(self):
+ rsrc = self.create_firewall()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_create_failed(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_firewall({
+ 'firewall': {
+ 'name': 'test-firewall', 'admin_state_up': True,
+ 'firewall_policy_id': 'policy-id'}}
+ ).AndRaise(firewall.NeutronClientException())
+ self.m.ReplayAll()
+
+ snippet = template_format.parse(firewall_template)
+ stack = utils.parse_stack(snippet)
+ rsrc = firewall.Firewall(
+ 'firewall', snippet['Resources']['firewall'], stack)
+
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.create))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete(self):
+ neutronclient.Client.delete_firewall('5678')
+ neutronclient.Client.show_firewall('5678').AndRaise(
+ firewall.NeutronClientException(status_code=404))
+
+ rsrc = self.create_firewall()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete_already_gone(self):
+ neutronclient.Client.delete_firewall('5678').AndRaise(
+ firewall.NeutronClientException(status_code=404))
+
+ rsrc = self.create_firewall()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete_failed(self):
+ neutronclient.Client.delete_firewall('5678').AndRaise(
+ firewall.NeutronClientException(status_code=400))
+
+ rsrc = self.create_firewall()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.delete))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_attribute(self):
+ rsrc = self.create_firewall()
+ neutronclient.Client.show_firewall('5678').MultipleTimes(
+ ).AndReturn(
+ {'firewall': {'admin_state_up': True,
+ 'firewall_policy_id': 'policy-id'}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual(True, rsrc.FnGetAtt('admin_state_up'))
+ self.assertEqual('policy-id', rsrc.FnGetAtt('firewall_policy_id'))
+ self.m.VerifyAll()
+
+ def test_attribute_failed(self):
+ rsrc = self.create_firewall()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.InvalidTemplateAttribute,
+ rsrc.FnGetAtt, 'subnet_id')
+ self.assertEqual(
+ 'The Referenced Attribute (firewall subnet_id) is '
+ 'incorrect.', str(error))
+ self.m.VerifyAll()
+
+ def test_update(self):
+ rsrc = self.create_firewall()
+ neutronclient.Client.update_firewall(
+ '5678', {'firewall': {'admin_state_up': False}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+
+ update_template = copy.deepcopy(rsrc.t)
+ update_template['Properties']['admin_state_up'] = False
+ self.assertEqual(None, rsrc.update(update_template))
+
+ self.m.VerifyAll()
+
+
+@skipIf(neutronclient is None, 'neutronclient unavailable')
+class FirewallPolicyTest(HeatTestCase):
+
+ def setUp(self):
+ super(FirewallPolicyTest, self).setUp()
+ self.m.StubOutWithMock(neutronclient.Client, 'create_firewall_policy')
+ self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall_policy')
+ self.m.StubOutWithMock(neutronclient.Client, 'show_firewall_policy')
+ self.m.StubOutWithMock(neutronclient.Client, 'update_firewall_policy')
+ self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
+ utils.setup_dummy_db()
+
+ def create_firewall_policy(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_firewall_policy({
+ 'firewall_policy': {
+ 'name': 'test-firewall-policy', 'shared': True,
+ 'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']}}
+ ).AndReturn({'firewall_policy': {'id': '5678'}})
+
+ snippet = template_format.parse(firewall_policy_template)
+ stack = utils.parse_stack(snippet)
+ return firewall.FirewallPolicy(
+ 'firewall_policy', snippet['Resources']['firewall_policy'], stack)
+
+ def test_create(self):
+ rsrc = self.create_firewall_policy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_create_failed(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_firewall_policy({
+ 'firewall_policy': {
+ 'name': 'test-firewall-policy', 'shared': True,
+ 'audited': True, 'firewall_rules': ['rule-id-1', 'rule-id-2']}}
+ ).AndRaise(firewall.NeutronClientException())
+ self.m.ReplayAll()
+
+ snippet = template_format.parse(firewall_policy_template)
+ stack = utils.parse_stack(snippet)
+ rsrc = firewall.FirewallPolicy(
+ 'firewall_policy', snippet['Resources']['firewall_policy'], stack)
+
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.create))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete(self):
+ neutronclient.Client.delete_firewall_policy('5678')
+ neutronclient.Client.show_firewall_policy('5678').AndRaise(
+ firewall.NeutronClientException(status_code=404))
+
+ rsrc = self.create_firewall_policy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete_already_gone(self):
+ neutronclient.Client.delete_firewall_policy('5678').AndRaise(
+ firewall.NeutronClientException(status_code=404))
+
+ rsrc = self.create_firewall_policy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete_failed(self):
+ neutronclient.Client.delete_firewall_policy('5678').AndRaise(
+ firewall.NeutronClientException(status_code=400))
+
+ rsrc = self.create_firewall_policy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.delete))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_attribute(self):
+ rsrc = self.create_firewall_policy()
+ neutronclient.Client.show_firewall_policy('5678').MultipleTimes(
+ ).AndReturn(
+ {'firewall_policy': {'audited': True, 'shared': True}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual(True, rsrc.FnGetAtt('audited'))
+ self.assertEqual(True, rsrc.FnGetAtt('shared'))
+ self.m.VerifyAll()
+
+ def test_attribute_failed(self):
+ rsrc = self.create_firewall_policy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.InvalidTemplateAttribute,
+ rsrc.FnGetAtt, 'subnet_id')
+ self.assertEqual(
+ 'The Referenced Attribute (firewall_policy subnet_id) is '
+ 'incorrect.', str(error))
+ self.m.VerifyAll()
+
+ def test_update(self):
+ rsrc = self.create_firewall_policy()
+ neutronclient.Client.update_firewall_policy(
+ '5678', {'firewall_policy': {'firewall_rules': ['3', '4']}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+
+ update_template = copy.deepcopy(rsrc.t)
+ update_template['Properties']['firewall_rules'] = ['3', '4']
+ self.assertEqual(None, rsrc.update(update_template))
+
+ self.m.VerifyAll()
+
+
+@skipIf(neutronclient is None, 'neutronclient unavailable')
+class FirewallRuleTest(HeatTestCase):
+
+ def setUp(self):
+ super(FirewallRuleTest, self).setUp()
+ self.m.StubOutWithMock(neutronclient.Client, 'create_firewall_rule')
+ self.m.StubOutWithMock(neutronclient.Client, 'delete_firewall_rule')
+ self.m.StubOutWithMock(neutronclient.Client, 'show_firewall_rule')
+ self.m.StubOutWithMock(neutronclient.Client, 'update_firewall_rule')
+ self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
+ utils.setup_dummy_db()
+
+ def create_firewall_rule(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_firewall_rule({
+ 'firewall_rule': {
+ 'name': 'test-firewall-rule', 'shared': True,
+ 'action': 'allow', 'protocol': 'tcp', 'enabled': True,
+ 'ip_version': "4"}}
+ ).AndReturn({'firewall_rule': {'id': '5678'}})
+
+ snippet = template_format.parse(firewall_rule_template)
+ stack = utils.parse_stack(snippet)
+ return firewall.FirewallRule(
+ 'firewall_rule', snippet['Resources']['firewall_rule'], stack)
+
+ def test_create(self):
+ rsrc = self.create_firewall_rule()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_create_failed(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_firewall_rule({
+ 'firewall_rule': {
+ 'name': 'test-firewall-rule', 'shared': True,
+ 'action': 'allow', 'protocol': 'tcp', 'enabled': True,
+ 'ip_version': "4"}}
+ ).AndRaise(firewall.NeutronClientException())
+ self.m.ReplayAll()
+
+ snippet = template_format.parse(firewall_rule_template)
+ stack = utils.parse_stack(snippet)
+ rsrc = firewall.FirewallRule(
+ 'firewall_rule', snippet['Resources']['firewall_rule'], stack)
+
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.create))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete(self):
+ neutronclient.Client.delete_firewall_rule('5678')
+ neutronclient.Client.show_firewall_rule('5678').AndRaise(
+ firewall.NeutronClientException(status_code=404))
+
+ rsrc = self.create_firewall_rule()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete_already_gone(self):
+ neutronclient.Client.delete_firewall_rule('5678').AndRaise(
+ firewall.NeutronClientException(status_code=404))
+
+ rsrc = self.create_firewall_rule()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_delete_failed(self):
+ neutronclient.Client.delete_firewall_rule('5678').AndRaise(
+ firewall.NeutronClientException(status_code=400))
+
+ rsrc = self.create_firewall_rule()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.delete))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ def test_attribute(self):
+ rsrc = self.create_firewall_rule()
+ neutronclient.Client.show_firewall_rule('5678').MultipleTimes(
+ ).AndReturn(
+ {'firewall_rule': {'protocol': 'tcp', 'shared': True}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual('tcp', rsrc.FnGetAtt('protocol'))
+ self.assertEqual(True, rsrc.FnGetAtt('shared'))
+ self.m.VerifyAll()
+
+ def test_attribute_failed(self):
+ rsrc = self.create_firewall_rule()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.InvalidTemplateAttribute,
+ rsrc.FnGetAtt, 'subnet_id')
+ self.assertEqual(
+ 'The Referenced Attribute (firewall_rule subnet_id) is '
+ 'incorrect.', str(error))
+ self.m.VerifyAll()
+
+ def test_update(self):
+ rsrc = self.create_firewall_rule()
+ neutronclient.Client.update_firewall_rule(
+ '5678', {'firewall_rule': {'protocol': 'icmp'}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+
+ update_template = copy.deepcopy(rsrc.t)
+ update_template['Properties']['protocol'] = 'icmp'
+ self.assertEqual(None, rsrc.update(update_template))
+
+ self.m.VerifyAll()