fwr = firewall_rule['firewall_rule']
self._validate_fwr_protocol_parameters(fwr)
tenant_id = self._get_tenant_id_for_create(context, fwr)
+ if not fwr['protocol'] and (fwr['source_port'] or
+ fwr['destination_port']):
+ raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
src_port_min, src_port_max = self._get_min_max_ports_from_range(
fwr['source_port'])
dst_port_min, dst_port_max = self._get_min_max_ports_from_range(
del fwr['destination_port']
with context.session.begin(subtransactions=True):
fwr_db = self._get_firewall_rule(context, id)
+ protocol = fwr.get('protocol', fwr_db['protocol'])
+ if not protocol:
+ sport = fwr.get('source_port_range_min',
+ fwr_db['source_port_range_min'])
+ dport = fwr.get('destination_port_range_min',
+ fwr_db['destination_port_range_min'])
+ if sport or dport:
+ raise firewall.FirewallRuleWithPortWithoutProtocolInvalid()
fwr_db.update(fwr)
if fwr_db.firewall_policy_id:
fwp_db = self._get_firewall_policy(context,
for k, v in attrs.iteritems():
self.assertEqual(firewall_rule['firewall_rule'][k], v)
+ def test_create_firewall_rule_without_protocol_with_dport(self):
+ attrs = self._get_test_firewall_rule_attrs()
+ attrs['protocol'] = None
+ attrs['source_port'] = None
+ res = self._create_firewall_rule(self.fmt, **attrs)
+ self.assertEqual(400, res.status_int)
+
+ def test_create_firewall_rule_without_protocol_with_sport(self):
+ attrs = self._get_test_firewall_rule_attrs()
+ attrs['protocol'] = None
+ attrs['destination_port'] = None
+ res = self._create_firewall_rule(self.fmt, **attrs)
+ self.assertEqual(400, res.status_int)
+
def test_show_firewall_rule_with_fw_policy_not_associated(self):
attrs = self._get_test_firewall_rule_attrs()
with self.firewall_rule() as fw_rule:
for k, v in attrs.iteritems():
self.assertEqual(res['firewall_rule'][k], v)
+ def test_update_firewall_rule_with_port_and_no_proto(self):
+ with self.firewall_rule() as fwr:
+ data = {'firewall_rule': {'protocol': None,
+ 'destination_port': 80}}
+ req = self.new_update_request('firewall_rules', data,
+ fwr['firewall_rule']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(400, res.status_int)
+
+ def test_update_firewall_rule_without_ports_and_no_proto(self):
+ with self.firewall_rule() as fwr:
+ data = {'firewall_rule': {'protocol': None,
+ 'destination_port': None,
+ 'source_port': None}}
+ req = self.new_update_request('firewall_rules', data,
+ fwr['firewall_rule']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(200, res.status_int)
+
+ def test_update_firewall_rule_with_port(self):
+ with self.firewall_rule(source_port=None,
+ destination_port=None,
+ protocol=None) as fwr:
+ data = {'firewall_rule': {'destination_port': 80}}
+ req = self.new_update_request('firewall_rules', data,
+ fwr['firewall_rule']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(400, res.status_int)
+
+ def test_update_firewall_rule_with_port_and_protocol(self):
+ with self.firewall_rule(source_port=None,
+ destination_port=None,
+ protocol=None) as fwr:
+ data = {'firewall_rule': {'destination_port': 80,
+ 'protocol': 'tcp'}}
+ req = self.new_update_request('firewall_rules', data,
+ fwr['firewall_rule']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(200, res.status_int)
+
def test_update_firewall_rule_with_policy_associated(self):
name = "new_firewall_rule1"
attrs = self._get_test_firewall_rule_attrs(name)