rule = rule_dict['security_group_rule']
tenant_id = self._get_tenant_id_for_create(context, rule)
db = SecurityGroupRule(
- id=uuidutils.generate_uuid(), tenant_id=tenant_id,
+ id=(rule.get('id') or uuidutils.generate_uuid()),
+ tenant_id=tenant_id,
security_group_id=rule['security_group_id'],
direction=rule['direction'],
remote_group_id=rule.get('remote_group_id'),
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
+ def test_create_security_group_rule_with_specific_id(self):
+ neutron_context = context.Context('', 'test-tenant')
+ specified_id = "4cd70774-cc67-4a87-9b39-7d1db38eb087"
+ with self.security_group() as sg:
+ rule = self._build_security_group_rule(
+ sg['security_group']['id'], 'ingress', const.PROTO_NUM_TCP)
+ rule['security_group_rule'].update({'id': specified_id,
+ 'port_range_min': None,
+ 'port_range_max': None,
+ 'remote_ip_prefix': None,
+ 'remote_group_id': None})
+ result = self.plugin.create_security_group_rule(
+ neutron_context, rule)
+ self.assertEqual(specified_id, result['id'])
+
class TestConvertIPPrefixToCIDR(base.BaseTestCase):