return scheduler.TaskRunner(self._confirm_delete)()
+class IKEPolicy(neutron.NeutronResource):
+ """
+ A resource for IKE policy in Neutron.
+ """
+
+ lifetime_schema = {
+ 'units': {'Type': 'String', 'AllowedValues': ['seconds', 'kilobytes'],
+ 'Default': 'seconds'},
+ 'value': {'Type': 'Integer', 'Default': 3600},
+ }
+
+ properties_schema = {'name': {'Type': 'String'},
+ 'description': {'Type': 'String'},
+ 'auth_algorithm': {'Type': 'String',
+ 'AllowedValues': ['sha1'],
+ 'Default': 'sha1'},
+ 'encryption_algorithm': {'Type': 'String',
+ 'AllowedValues': ['3des',
+ 'aes-128',
+ 'aes-192',
+ 'aes-256'],
+ 'Default': 'aes-128'},
+ 'phase1_negotiation_mode': {'Type': 'String',
+ 'AllowedValues': ['main'],
+ 'Default': 'main'},
+ 'lifetime': {'Type': 'Map',
+ 'Schema': lifetime_schema},
+ 'pfs': {'Type': 'String',
+ 'AllowedValues': ['group2', 'group5',
+ 'group14'],
+ 'Default': 'group5'},
+ 'ike_version': {'Type': 'String',
+ 'AllowedValues': ['v1', 'v2'],
+ 'Default': 'v1'}}
+
+ attributes_schema = {
+ 'auth_algorithm': 'authentication hash algorithm used by the ike'
+ ' policy',
+ 'description': 'description of the ike policy',
+ 'encryption_algorithm': 'encryption algorithm used by the ike policy',
+ 'id': 'unique identifier for the ike policy',
+ 'ike_version': 'version of the ike policy',
+ 'lifetime': 'configuration of safety assessment lifetime for the ike'
+ ' policy',
+ 'name': 'name for the ike policy',
+ 'pfs': 'perfect forward secrecy for the ike policy',
+ 'phase1_negotiation_mode': 'negotiation mode for the ike policy',
+ 'tenant_id': 'tenant owning the ike policy',
+ }
+
+ update_allowed_keys = ('Properties',)
+
+ update_allowed_properties = ('name', 'description',)
+
+ def _show_resource(self):
+ return self.neutron().show_ikepolicy(self.resource_id)['ikepolicy']
+
+ def handle_create(self):
+ props = self.prepare_properties(
+ self.properties,
+ self.physical_resource_name())
+ ikepolicy = self.neutron().create_ikepolicy({'ikepolicy': props})[
+ 'ikepolicy']
+ self.resource_id_set(ikepolicy['id'])
+
+ def handle_update(self, json_snippet, tmpl_diff, prop_diff):
+ if prop_diff:
+ self.neutron().update_ikepolicy(self.resource_id,
+ {'ikepolicy': prop_diff})
+
+ def handle_delete(self):
+ client = self.neutron()
+ try:
+ client.delete_ikepolicy(self.resource_id)
+ except NeutronClientException as ex:
+ if ex.status_code != 404:
+ raise ex
+ else:
+ return scheduler.TaskRunner(self._confirm_delete)()
+
+
def resource_mapping():
if clients.neutronclient is None:
return {}
return {
'OS::Neutron::VPNService': VPNService,
+ 'OS::Neutron::IKEPolicy': IKEPolicy
}
}
'''
+ikepolicy_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "Template to test IKE policy resource",
+ "Parameters" : {},
+ "Resources" : {
+ "IKEPolicy" : {
+ "Type" : "OS::Neutron::IKEPolicy",
+ "Properties" : {
+ "name" : "IKEPolicy",
+ "description" : "My new IKE policy",
+ "auth_algorithm" : "sha1",
+ "encryption_algorithm" : "3des",
+ "phase1_negotiation_mode" : "main",
+ "lifetime" : {
+ "units" : "seconds",
+ "value" : 3600
+ },
+ "pfs" : "group5",
+ "ike_version" : "v1"
+ }
+ }
+ }
+}
+'''
+
@skipIf(neutronclient is None, 'neutronclient unavailable')
class VPNServiceTest(HeatTestCase):
update_template['Properties']['admin_state_up'] = False
scheduler.TaskRunner(rsrc.update, update_template)()
self.m.VerifyAll()
+
+
+@skipIf(neutronclient is None, 'neutronclient unavailable')
+class IKEPolicyTest(HeatTestCase):
+
+ IKE_POLICY_CONF = {
+ 'ikepolicy': {
+ 'name': 'IKEPolicy',
+ 'description': 'My new IKE policy',
+ 'auth_algorithm': 'sha1',
+ 'encryption_algorithm': '3des',
+ 'phase1_negotiation_mode': 'main',
+ 'lifetime': {
+ 'units': 'seconds',
+ 'value': 3600
+ },
+ 'pfs': 'group5',
+ 'ike_version': 'v1'
+ }
+ }
+
+ def setUp(self):
+ super(IKEPolicyTest, self).setUp()
+ self.m.StubOutWithMock(neutronclient.Client, 'create_ikepolicy')
+ self.m.StubOutWithMock(neutronclient.Client, 'delete_ikepolicy')
+ self.m.StubOutWithMock(neutronclient.Client, 'show_ikepolicy')
+ self.m.StubOutWithMock(neutronclient.Client, 'update_ikepolicy')
+ self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
+ utils.setup_dummy_db()
+
+ def create_ikepolicy(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_ikepolicy(
+ self.IKE_POLICY_CONF).AndReturn(
+ {'ikepolicy': {'id': 'ike123'}})
+ snippet = template_format.parse(ikepolicy_template)
+ self.stack = utils.parse_stack(snippet)
+ return vpnservice.IKEPolicy('ikepolicy',
+ snippet['Resources']['IKEPolicy'],
+ self.stack)
+
+ @utils.stack_delete_after
+ def test_create(self):
+ rsrc = self.create_ikepolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_create_failed(self):
+ clients.OpenStackClients.keystone().AndReturn(
+ fakes.FakeKeystoneClient())
+ neutronclient.Client.create_ikepolicy(
+ self.IKE_POLICY_CONF).AndRaise(
+ vpnservice.NeutronClientException())
+ self.m.ReplayAll()
+ snippet = template_format.parse(ikepolicy_template)
+ self.stack = utils.parse_stack(snippet)
+ rsrc = vpnservice.IKEPolicy(
+ 'ikepolicy',
+ snippet['Resources']['IKEPolicy'],
+ self.stack)
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.create))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.CREATE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete(self):
+ neutronclient.Client.delete_ikepolicy('ike123')
+ neutronclient.Client.show_ikepolicy('ike123').AndRaise(
+ vpnservice.NeutronClientException(status_code=404))
+ rsrc = self.create_ikepolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete_already_gone(self):
+ neutronclient.Client.delete_ikepolicy('ike123').AndRaise(
+ vpnservice.NeutronClientException(status_code=404))
+ rsrc = self.create_ikepolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ scheduler.TaskRunner(rsrc.delete)()
+ self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_delete_failed(self):
+ neutronclient.Client.delete_ikepolicy('ike123').AndRaise(
+ vpnservice.NeutronClientException(status_code=400))
+ rsrc = self.create_ikepolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.ResourceFailure,
+ scheduler.TaskRunner(rsrc.delete))
+ self.assertEqual(
+ 'NeutronClientException: An unknown exception occurred.',
+ str(error))
+ self.assertEqual((rsrc.DELETE, rsrc.FAILED), rsrc.state)
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_attribute(self):
+ rsrc = self.create_ikepolicy()
+ neutronclient.Client.show_ikepolicy(
+ 'ike123').MultipleTimes().AndReturn(self.IKE_POLICY_CONF)
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ self.assertEqual('IKEPolicy', rsrc.FnGetAtt('name'))
+ self.assertEqual('My new IKE policy', rsrc.FnGetAtt('description'))
+ self.assertEqual('sha1', rsrc.FnGetAtt('auth_algorithm'))
+ self.assertEqual('3des', rsrc.FnGetAtt('encryption_algorithm'))
+ self.assertEqual('main', rsrc.FnGetAtt('phase1_negotiation_mode'))
+ self.assertEqual('seconds', rsrc.FnGetAtt('lifetime')['units'])
+ self.assertEqual(3600, rsrc.FnGetAtt('lifetime')['value'])
+ self.assertEqual('group5', rsrc.FnGetAtt('pfs'))
+ self.assertEqual('v1', rsrc.FnGetAtt('ike_version'))
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_attribute_failed(self):
+ rsrc = self.create_ikepolicy()
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ error = self.assertRaises(exception.InvalidTemplateAttribute,
+ rsrc.FnGetAtt, 'non-existent_property')
+ self.assertEqual(
+ 'The Referenced Attribute (ikepolicy non-existent_property) is '
+ 'incorrect.',
+ str(error))
+ self.m.VerifyAll()
+
+ @utils.stack_delete_after
+ def test_update(self):
+ rsrc = self.create_ikepolicy()
+ neutronclient.Client.update_ikepolicy('ike123',
+ {'ikepolicy': {
+ 'name': 'New IKEPolicy'}})
+ self.m.ReplayAll()
+ scheduler.TaskRunner(rsrc.create)()
+ update_template = copy.deepcopy(rsrc.t)
+ update_template['Properties']['name'] = 'New IKEPolicy'
+ scheduler.TaskRunner(rsrc.update, update_template)()
+ self.m.VerifyAll()