LOG.debug(_("set_firewall_status() called"))
with context.session.begin(subtransactions=True):
fw_db = self.plugin._get_firewall(context, firewall_id)
+ # ignore changing status if firewall expects to be deleted
+ # That case means that while some pending operation has been
+ # performed on the backend, neutron server received delete request
+ # and changed firewall status to const.PENDING_DELETE
+ if fw_db.status == const.PENDING_DELETE:
+ LOG.debug(_("Firewall %(fw_id)s in PENDING_DELETE state, "
+ "not changing to %(status)s"),
+ {'fw_id': firewall_id, 'status': status})
+ return False
#TODO(xuhanp): Remove INACTIVE status and use DOWN to
# be consistent with other network resources
if status in (const.ACTIVE, const.INACTIVE, const.DOWN):
LOG.debug(_("firewall_deleted() called"))
with context.session.begin(subtransactions=True):
fw_db = self.plugin._get_firewall(context, firewall_id)
- if fw_db.status == const.PENDING_DELETE:
+ # allow to delete firewalls in ERROR state
+ if fw_db.status in (const.PENDING_DELETE, const.ERROR):
self.plugin.delete_db_firewall_object(context, firewall_id)
return True
else:
self.assertEqual(fw_db['status'], const.ERROR)
self.assertFalse(res)
+ def test_set_firewall_status_pending_delete(self):
+ ctx = context.get_admin_context()
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ with self.firewall(firewall_policy_id=fwp_id,
+ admin_state_up=
+ test_db_firewall.ADMIN_STATE_UP) as fw:
+ fw_id = fw['firewall']['id']
+ fw_db = self.plugin._get_firewall(ctx, fw_id)
+ fw_db['status'] = const.PENDING_DELETE
+ ctx.session.flush()
+ res = self.callbacks.set_firewall_status(ctx, fw_id,
+ const.ACTIVE,
+ host='dummy')
+ fw_db = self.plugin.get_firewall(ctx, fw_id)
+ self.assertEqual(fw_db['status'], const.PENDING_DELETE)
+ self.assertFalse(res)
+
def test_firewall_deleted(self):
ctx = context.get_admin_context()
with self.firewall_policy() as fwp: