return scheduler.TaskRunner(self._confirm_delete)()
+class IPsecPolicy(neutron.NeutronResource):
+ """
+ A resource for IPsec policy in Neutron.
+ """
+
+ lifetime_schema = {
+ 'units': {'Type': 'String', 'AllowedValues': ['seconds', 'kilobytes'],
+ 'Default': 'seconds'},
+ 'value': {'Type': 'Integer', 'Default': 3600},
+ }
+
+ properties_schema = {'name': {'Type': 'String'},
+ 'description': {'Type': 'String'},
+ 'transform_protocol': {'Type': 'String',
+ 'AllowedValues': ['esp', 'ah',
+ 'ah-esp'],
+ 'Default': 'esp'},
+ 'encapsulation_mode': {'Type': 'String',
+ 'AllowedValues': ['tunnel',
+ 'transport'],
+ 'Default': 'tunnel'},
+ 'auth_algorithm': {'Type': 'String',
+ 'AllowedValues': ['sha1'],
+ 'Default': 'sha1'},
+ 'encryption_algorithm': {'Type': 'String',
+ 'AllowedValues': ['3des',
+ 'aes-128',
+ 'aes-192',
+ 'aes-256'],
+ 'Default': 'aes-128'},
+ 'lifetime': {'Type': 'Map',
+ 'Schema': lifetime_schema},
+ 'pfs': {'Type': 'String',
+ 'AllowedValues': ['group2', 'group5',
+ 'group14'],
+ 'Default': 'group5'}}
+
+ attributes_schema = {
+ 'auth_algorithm': 'authentication hash algorithm used by the ipsec'
+ ' policy',
+ 'description': 'description of the ipsec policy',
+ 'encapsulation_mode': 'encapsulation mode for the ipsec policy',
+ 'encryption_algorithm': 'encryption algorithm for the ipsec policy',
+ 'id': 'unique identifier for this ipsec policy',
+ 'lifetime': 'configuration of safety assessment lifetime for the ipsec'
+ ' policy',
+ 'name': 'name for the ipsec policy',
+ 'pfs': 'perfect forward secrecy for the ipsec policy',
+ 'tenant_id': 'tenant owning the ipsec policy',
+ 'transform_protocol': 'transform protocol for the ipsec policy'
+ }
+
+ update_allowed_keys = ('Properties',)
+
+ update_allowed_properties = ('name', 'description',)
+
+ def _show_resource(self):
+ return self.neutron().show_ipsecpolicy(self.resource_id)['ipsecpolicy']
+
+ def handle_create(self):
+ props = self.prepare_properties(
+ self.properties,
+ self.physical_resource_name())
+ ipsecpolicy = self.neutron().create_ipsecpolicy(
+ {'ipsecpolicy': props})['ipsecpolicy']
+ self.resource_id_set(ipsecpolicy['id'])
+
+ def handle_update(self, json_snippet, tmpl_diff, prop_diff):
+ if prop_diff:
+ self.neutron().update_ipsecpolicy(self.resource_id,
+ {'ipsecpolicy': prop_diff})
+
+ def handle_delete(self):
+ client = self.neutron()
+ try:
+ client.delete_ipsecpolicy(self.resource_id)
+ except NeutronClientException as ex:
+ if ex.status_code != 404:
+ raise ex
+ else:
+ return scheduler.TaskRunner(self._confirm_delete)()
+
+
def resource_mapping():
if clients.neutronclient is None:
return {}
return {
'OS::Neutron::VPNService': VPNService,
- 'OS::Neutron::IKEPolicy': IKEPolicy
+ 'OS::Neutron::IKEPolicy': IKEPolicy,
+ 'OS::Neutron::IPsecPolicy': IPsecPolicy
}
}
'''
+ipsecpolicy_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "Template to test IPsec policy resource",
+ "Parameters" : {},
+ "Resources" : {
+ "IPsecPolicy" : {
+ "Type" : "OS::Neutron::IPsecPolicy",
+ "Properties" : {
+ "name" : "IPsecPolicy",
+ "description" : "My new IPsec policy",
+ "transform_protocol": "esp",
+ "encapsulation_mode" : "tunnel",
+ "auth_algorithm" : "sha1",
+ "encryption_algorithm" : "3des",
+ "lifetime" : {
+ "units" : "seconds",
+ "value" : 3600
+ },
+ "pfs" : "group5"
+ }
+ }
+ }
+}
+'''
+
@skipIf(neutronclient is None, 'neutronclient unavailable')
class VPNServiceTest(HeatTestCase):
update_template['Properties']['name'] = 'New IKEPolicy'
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
+
+
+@skipIf(neutronclient is None, 'neutronclient unavailable')
+class IPsecPolicyTest(HeatTestCase):
+
+ IPSEC_POLICY_CONF = {
+ 'ipsecpolicy': {
+ 'name': 'IPsecPolicy',
+ 'description': 'My new IPsec policy',
+ 'transform_protocol': 'esp',
+ 'encapsulation_mode': 'tunnel',
+ 'auth_algorithm': 'sha1',
+ 'encryption_algorithm': '3des',
+ 'lifetime': {
+ 'units': 'seconds',
+ 'value': 3600
+ },
+ 'pfs': 'group5'
+ }
+ }
+
+ def setUp(self):
+ super(IPsecPolicyTest, self).setUp()
+ self.m.StubOutWithMock(neutronclient.Client, 'create_ipsecpolicy')
+ self.m.StubOutWithMock(neutronclient.Client, 'delete_ipsecpolicy')
+ self.m.StubOutWithMock(neutronclient.Client, 'show_ipsecpolicy')
+ self.m.StubOutWithMock(neutronclient.Client, 'update_ipsecpolicy')
+ self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
+ utils.setup_dummy_db()
+
+ def create_ipsecpolicy(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_ipsecpolicy(
+ self.IPSEC_POLICY_CONF).AndReturn(
+ {'ipsecpolicy': {'id': 'ips123'}})
+ snippet = template_format.parse(ipsecpolicy_template)
+ self.stack = utils.parse_stack(snippet)
+ return vpnservice.IPsecPolicy('ipsecpolicy',
+ snippet['Resources']['IPsecPolicy'],
+ self.stack)
+
+ @utils.stack_delete_after
+ def test_create(self):
+ rsrc = self.create_ipsecpolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_create_failed(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_ipsecpolicy(
+ self.IPSEC_POLICY_CONF).AndRaise(
+ vpnservice.NeutronClientException())
+ self.m.ReplayAll()
+ snippet = template_format.parse(ipsecpolicy_template)
+ self.stack = utils.parse_stack(snippet)
+ rsrc = vpnservice.IPsecPolicy(
+ 'ipsecpolicy',
+ snippet['Resources']['IPsecPolicy'],
+ self.stack)
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.create))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete(self):
+ neutronclient.Client.delete_ipsecpolicy('ips123')
+ neutronclient.Client.show_ipsecpolicy('ips123').AndRaise(
+ vpnservice.NeutronClientException(status_code=404))
+ rsrc = self.create_ipsecpolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete_already_gone(self):
+ neutronclient.Client.delete_ipsecpolicy('ips123').AndRaise(
+ vpnservice.NeutronClientException(status_code=404))
+ rsrc = self.create_ipsecpolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete_failed(self):
+ neutronclient.Client.delete_ipsecpolicy('ips123').AndRaise(
+ vpnservice.NeutronClientException(status_code=400))
+ rsrc = self.create_ipsecpolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.delete))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_attribute(self):
+ rsrc = self.create_ipsecpolicy()
+ neutronclient.Client.show_ipsecpolicy(
+ 'ips123').MultipleTimes().AndReturn(self.IPSEC_POLICY_CONF)
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual('IPsecPolicy', rsrc.FnGetAtt('name'))
+ self.assertEqual('My new IPsec policy', rsrc.FnGetAtt('description'))
+ self.assertEqual('esp', rsrc.FnGetAtt('transform_protocol'))
+ self.assertEqual('tunnel', rsrc.FnGetAtt('encapsulation_mode'))
+ self.assertEqual('sha1', rsrc.FnGetAtt('auth_algorithm'))
+ self.assertEqual('3des', rsrc.FnGetAtt('encryption_algorithm'))
+ self.assertEqual('seconds', rsrc.FnGetAtt('lifetime')['units'])
+ self.assertEqual(3600, rsrc.FnGetAtt('lifetime')['value'])
+ self.assertEqual('group5', rsrc.FnGetAtt('pfs'))
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_attribute_failed(self):
+ rsrc = self.create_ipsecpolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.InvalidTemplateAttribute,
+ rsrc.FnGetAtt, 'non-existent_property')
+ self.assertEqual(
+ 'The Referenced Attribute (ipsecpolicy non-existent_property) is '
+ 'incorrect.',
+ str(error))
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_update(self):
+ rsrc = self.create_ipsecpolicy()
+ neutronclient.Client.update_ipsecpolicy(
+ 'ips123',
+ {'ipsecpolicy': {'name': 'New IPsecPolicy'}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ update_template = copy.deepcopy(rsrc.t)
+ update_template['Properties']['name'] = 'New IPsecPolicy'
+ scheduler.TaskRunner(rsrc.update, update_template)()
+ self.m.VerifyAll()