# Check in database if rule exists
filters = self._make_security_group_rule_filter_dict(i)
- rules = self.get_security_group_rules(context, filters)
- if rules:
- raise ext_sg.SecurityGroupRuleExists(id=str(rules[0]['id']))
+ db_rules = self.get_security_group_rules(context, filters)
+ # Note(arosen): the call to get_security_group_rules wildcards
+ # values in the filter that have a value of [None]. For
+ # example, filters = {'remote_group_id': [None]} will return
+ # all security group rules regardless of their value of
+ # remote_group_id. Therefore it is not possible to do this
+ # query unless the behavior of _get_collection()
+ # is changed which cannot be because other methods are already
+ # relying on this behavor. Therefore, we do the filtering
+ # below to check for these corner cases.
+ for db_rule in db_rules:
+ # need to remove id from db_rule for matching
+ id = db_rule.pop('id')
+ if (i['security_group_rule'] == db_rule):
+ raise ext_sg.SecurityGroupRuleExists(id=id)
def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+ def test_create_security_group_rule_allow_all_ipv4(self):
+ with self.security_group() as sg:
+ rule = {'security_group_id': sg['security_group']['id'],
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'tenant_id': 'test_tenant'}
+
+ res = self._create_security_group_rule(
+ self.fmt, {'security_group_rule': rule})
+ rule = self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
+ def test_create_security_group_rule_allow_all_ipv4_v6_bulk(self):
+ if self._skip_native_bulk:
+ self.skipTest("Plugin does not support native bulk "
+ "security_group_rule create")
+ with self.security_group() as sg:
+ rule_v4 = {'security_group_id': sg['security_group']['id'],
+ 'direction': 'ingress',
+ 'ethertype': 'IPv4',
+ 'tenant_id': 'test_tenant'}
+ rule_v6 = {'security_group_id': sg['security_group']['id'],
+ 'direction': 'ingress',
+ 'ethertype': 'IPv6',
+ 'tenant_id': 'test_tenant'}
+
+ rules = {'security_group_rules': [rule_v4, rule_v6]}
+ res = self._create_security_group_rule(self.fmt, rules)
+ self.deserialize(self.fmt, res)
+ self.assertEqual(res.status_int, webob.exc.HTTPCreated.code)
+
def test_create_security_group_rule_duplicate_rule_in_post(self):
if self._skip_native_bulk:
self.skipTest("Plugin does not support native bulk "