fwp = firewall_policy['firewall_policy']
with context.session.begin(subtransactions=True):
fwp_db = self._get_firewall_policy(context, id)
+ # check tenant ids are same for fw and fwp or not
+ if not fwp.get('shared', True) and fwp_db.firewalls:
+ for fw in fwp_db['firewalls']:
+ if fwp_db['tenant_id'] != fw['tenant_id']:
+ raise firewall.FirewallPolicyInUse(
+ firewall_policy_id=id)
# check any existing rules are not shared
if 'shared' in fwp and 'firewall_rules' not in fwp:
self._check_unshared_rules_for_policy(fwp_db, fwp)
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+ def test_update_firewall_policy_assoc_with_other_tenant_firewall(self):
+ with self.firewall_policy(shared=True, tenant_id='tenant1') as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ with self.firewall(firewall_policy_id=fwp_id):
+ data = {'firewall_policy': {'shared': False}}
+ req = self.new_update_request('firewall_policies', data,
+ fwp['firewall_policy']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
+
def test_delete_firewall_policy(self):
ctx = context.get_admin_context()
with self.firewall_policy(do_delete=False) as fwp: