import contextlib
import logging
+import mock
import webob.exc
from neutron.api import extensions as api_ext
from neutron.db.firewall import firewall_db as fdb
import neutron.extensions
from neutron.extensions import firewall
+from neutron import manager
from neutron.openstack.common import importutils
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
+from neutron.services.firewall import fwaas_plugin
from neutron.tests.unit import test_db_plugin
DB_FW_PLUGIN_KLASS = (
"neutron.db.firewall.firewall_db.Firewall_db_mixin"
)
+FWAAS_PLUGIN = 'neutron.services.firewall.fwaas_plugin'
+DELETEFW_PATH = FWAAS_PLUGIN + '.FirewallAgentApi.delete_firewall'
extensions_path = ':'.join(neutron.extensions.__path__)
DESCRIPTION = 'default description'
SHARED = True
ADMIN_STATE_UP = True
+class FakeAgentApi(fwaas_plugin.FirewallCallbacks):
+ """
+ This class used to mock the AgentAPI delete method inherits from
+ FirewallCallbacks because it needs access to the firewall_deleted method.
+ The delete_firewall method belongs to the FirewallAgentApi, which has
+ no access to the firewall_deleted method normally because it's not
+ responsible for deleting the firewall from the DB. However, it needs
+ to in the unit tests since there is no agent to call back.
+ """
+ def __init__(self):
+ pass
+
+ def delete_firewall(self, context, firewall, **kwargs):
+ self.plugin = manager.NeutronManager.get_service_plugins()['FIREWALL']
+ self.firewall_deleted(context, firewall['id'], **kwargs)
+
+
class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
resource_prefix_map = dict(
(k, constants.COMMON_PREFIXES[constants.FIREWALL])
)
def setUp(self, core_plugin=None, fw_plugin=None, ext_mgr=None):
+ self.agentapi_delf_p = mock.patch(DELETEFW_PATH, create=True,
+ new=FakeAgentApi().delete_firewall)
+ self.agentapi_delf_p.start()
if not fw_plugin:
fw_plugin = DB_FW_PLUGIN_KLASS
service_plugins = {'fw_plugin_name': fw_plugin}
name = "firewall_policy1"
attrs = self._get_test_firewall_policy_attrs(name)
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3')) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
attrs['firewall_rules'] = fw_rule_ids
with self.firewall_policy(name=name, shared=SHARED,
firewall_rules=fw_rule_ids,
- audited=AUDITED,
- no_delete=True) as fwp:
+ audited=AUDITED) as fwp:
for k, v in attrs.iteritems():
self.assertEqual(fwp['firewall_policy'][k], v)
def test_update_firewall_policy_with_rules(self):
attrs = self._get_test_firewall_policy_attrs()
- with self.firewall_policy() as fwp:
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3')) as fr:
+ with self.firewall_policy() as fwp:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
attrs['firewall_rules'] = fw_rule_ids
data = {'firewall_policy':
def test_update_firewall_policy_replace_rules(self):
attrs = self._get_test_firewall_policy_attrs()
- with self.firewall_policy() as fwp:
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True)) as fr1:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3'),
+ self.firewall_rule(name='fwr4')) as frs:
+ fr1 = frs[0:2]
+ fr2 = frs[2:4]
+ with self.firewall_policy() as fwp:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
req = self.new_update_request('firewall_policies', data,
fwp['firewall_policy']['id'])
req.get_response(self.ext_api)
- with contextlib.nested(self.firewall_rule(name='fwr3',
- no_delete=True),
- self.firewall_rule(name='fwr4',
- no_delete=True)) as fr2:
+
fw_rule_ids = [r['firewall_rule']['id'] for r in fr2]
attrs['firewall_rules'] = fw_rule_ids
- data = {'firewall_policy':
- {'firewall_rules': fw_rule_ids}}
- req = self.new_update_request('firewall_policies', data,
+ new_data = {'firewall_policy':
+ {'firewall_rules': fw_rule_ids}}
+ req = self.new_update_request('firewall_policies', new_data,
fwp['firewall_policy']['id'])
res = self.deserialize(self.fmt,
req.get_response(self.ext_api))
def test_update_firewall_policy_reorder_rules(self):
attrs = self._get_test_firewall_policy_attrs()
- with self.firewall_policy() as fwp:
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True),
- self.firewall_rule(name='fwr4',
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3'),
+ self.firewall_rule(name='fwr4')) as fr:
+ with self.firewall_policy() as fwp:
fw_rule_ids = [fr[2]['firewall_rule']['id'],
fr[3]['firewall_rule']['id']]
data = {'firewall_policy':
def test_update_firewall_policy_with_non_existing_rule(self):
attrs = self._get_test_firewall_policy_attrs()
- with self.firewall_policy() as fwp:
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2')) as fr:
+ with self.firewall_policy() as fwp:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
# appending non-existent rule
fw_rule_ids.append(uuidutils.generate_uuid())
attrs = self._get_test_firewall_policy_attrs()
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall_rule(name='fwr1', no_delete=True) as fr:
+ with self.firewall_rule(name='fwr1') as fr:
fr_id = fr['firewall_rule']['id']
fw_rule_ids = [fr_id]
attrs['firewall_rules'] = fw_rule_ids
def test_delete_firewall_policy_with_firewall_association(self):
attrs = self._get_test_firewall_attrs()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
def test_show_firewall_rule_with_fw_policy_associated(self):
attrs = self._get_test_firewall_rule_attrs()
- with self.firewall_policy(no_delete=True) as fwp:
- fwp_id = fwp['firewall_policy']['id']
- attrs['firewall_policy_id'] = fwp_id
- with self.firewall_rule(no_delete=True) as fw_rule:
+ with self.firewall_rule() as fw_rule:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ attrs['firewall_policy_id'] = fwp_id
data = {'firewall_policy':
{'firewall_rules':
[fw_rule['firewall_rule']['id']]}}
self.assertEqual(res['firewall_rule'][k], v)
def test_list_firewall_rules(self):
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3')) as fr:
query_params = 'protocol=tcp'
self._test_list_resources('firewall_rule', fr,
query_params=query_params)
attrs['source_port'] = '10:20'
attrs['destination_port'] = '30:40'
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': '10:20',
'destination_port': '30:40'}}
attrs['source_port'] = '10000'
attrs['destination_port'] = '80'
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': 10000,
'destination_port': 80}}
attrs['source_port'] = '10000'
attrs['destination_port'] = '80'
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': '10000',
'destination_port': '80'}}
attrs['source_port'] = None
attrs['destination_port'] = None
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule() as fwr:
data = {'firewall_rule': {'name': name,
'source_port': None,
'destination_port': None}}
def test_update_firewall_rule_with_policy_associated(self):
name = "new_firewall_rule1"
attrs = self._get_test_firewall_rule_attrs(name)
- with self.firewall_policy(no_delete=True) as fwp:
- fwp_id = fwp['firewall_policy']['id']
- attrs['firewall_policy_id'] = fwp_id
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule() as fwr:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ attrs['firewall_policy_id'] = fwp_id
fwr_id = fwr['firewall_rule']['id']
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
req = self.new_update_request('firewall_policies', data,
def test_delete_firewall_rule_with_policy_associated(self):
attrs = self._get_test_firewall_rule_attrs()
- with self.firewall_policy(no_delete=True) as fwp:
- fwp_id = fwp['firewall_policy']['id']
- attrs['firewall_policy_id'] = fwp_id
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule() as fwr:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ attrs['firewall_policy_id'] = fwp_id
fwr_id = fwr['firewall_rule']['id']
data = {'firewall_policy': {'firewall_rules': [fwr_id]}}
req = self.new_update_request('firewall_policies', data,
name = "firewall1"
attrs = self._get_test_firewall_attrs(name)
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(name=name,
name = "firewall1"
attrs = self._get_test_firewall_attrs(name)
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(name=name,
self.assertEqual(res['firewall'][k], v)
def test_list_firewalls(self):
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with contextlib.nested(self.firewall(name='fw1',
firewall_policy_id=fwp_id,
def test_delete_firewall(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
no_delete=True) as fw:
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
- with self.firewall_policy() as fwp:
- fwp_id = fwp['firewall_policy']['id']
- attrs['id'] = fwp_id
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True)) as fr1:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3')) as frs:
+ fr1 = frs[0:2]
+ fwr3 = frs[2]
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ attrs['id'] = fwp_id
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
insert_after=None,
expected_code=webob.exc.HTTPConflict.code,
expected_body=None)
- with self.firewall_rule(name='fwr3', no_delete=True) as fwr3:
- fwr3_id = fwr3['firewall_rule']['id']
- attrs['firewall_rules'].insert(0, fwr3_id)
- self._rule_action('insert', fwp_id, fwr3_id,
- insert_before=fw_rule_ids[0],
- insert_after=None,
- expected_code=webob.exc.HTTPOk.code,
- expected_body=attrs)
+ fwr3_id = fwr3['firewall_rule']['id']
+ attrs['firewall_rules'].insert(0, fwr3_id)
+ self._rule_action('insert', fwp_id, fwr3_id,
+ insert_before=fw_rule_ids[0],
+ insert_after=None,
+ expected_code=webob.exc.HTTPOk.code,
+ expected_body=attrs)
def test_insert_rule_in_policy_failures(self):
- with self.firewall_policy() as fwp:
- fwp_id = fwp['firewall_policy']['id']
- with self.firewall_rule(name='fwr1', no_delete=True) as fr1:
+ with self.firewall_rule(name='fwr1') as fr1:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
fr1_id = fr1['firewall_rule']['id']
fw_rule_ids = [fr1_id]
data = {'firewall_policy':
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
- with self.firewall_policy() as fwp:
- fwp_id = fwp['firewall_policy']['id']
- attrs['id'] = fwp_id
- with contextlib.nested(self.firewall_rule(name='fwr0',
- no_delete=True),
- self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True),
- self.firewall_rule(name='fwr4',
- no_delete=True),
- self.firewall_rule(name='fwr5',
- no_delete=True),
- self.firewall_rule(name='fwr6',
- no_delete=True)) as fwr:
+ with contextlib.nested(self.firewall_rule(name='fwr0'),
+ self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3'),
+ self.firewall_rule(name='fwr4'),
+ self.firewall_rule(name='fwr5'),
+ self.firewall_rule(name='fwr6')) as fwr:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ attrs['id'] = fwp_id
# test insert when rule list is empty
fwr0_id = fwr[0]['firewall_rule']['id']
attrs['firewall_rules'].insert(0, fwr0_id)
attrs = self._get_test_firewall_policy_attrs()
attrs['audited'] = False
attrs['firewall_list'] = []
- with self.firewall_policy() as fwp:
- fwp_id = fwp['firewall_policy']['id']
- attrs['id'] = fwp_id
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True)) as fr1:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3')) as fr1:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
+ attrs['id'] = fwp_id
fw_rule_ids = [r['firewall_rule']['id'] for r in fr1]
attrs['firewall_rules'] = fw_rule_ids[:]
data = {'firewall_policy':
expected_body=None)
def test_remove_rule_from_policy_failures(self):
- with self.firewall_policy() as fwp:
- fwp_id = fwp['firewall_policy']['id']
- with self.firewall_rule(name='fwr1', no_delete=True) as fr1:
+ with self.firewall_rule(name='fwr1') as fr1:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [fr1['firewall_rule']['id']]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
def test_set_firewall_status(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=
def test_firewall_deleted(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
def test_firewall_deleted_error(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
- with self.firewall(firewall_policy_id=fwp_id,
- admin_state_up=test_db_firewall.ADMIN_STATE_UP,
- no_delete=True) as fw:
+ with self.firewall(
+ firewall_policy_id=fwp_id,
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP,
+ ) as fw:
fw_id = fw['firewall']['id']
res = self.callbacks.firewall_deleted(ctx, fw_id,
host='dummy')
def test_get_firewall_for_tenant(self):
tenant_id = 'test-tenant'
ctx = context.Context('', tenant_id)
- with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp:
- fwp_id = fwp['firewall_policy']['id']
- with contextlib.nested(self.firewall_rule(name='fwr1',
- tenant_id=tenant_id,
- no_delete=True),
- self.firewall_rule(name='fwr2',
- tenant_id=tenant_id,
- no_delete=True),
- self.firewall_rule(name='fwr3',
- tenant_id=tenant_id,
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1',
+ tenant_id=tenant_id),
+ self.firewall_rule(name='fwr2',
+ tenant_id=tenant_id),
+ self.firewall_rule(name='fwr3',
+ tenant_id=tenant_id)
+ ) as fr:
+ with self.firewall_policy(tenant_id=tenant_id) as fwp:
+ fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
with self.firewall(firewall_policy_id=fwp_id,
tenant_id=tenant_id,
admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- no_delete=True) as fw:
+ test_db_firewall.ADMIN_STATE_UP) as fw:
fw_id = fw['firewall']['id']
res = self.callbacks.get_firewalls_for_tenant(ctx,
host='dummy')
def test_get_firewall_for_tenant_without_rules(self):
tenant_id = 'test-tenant'
ctx = context.Context('', tenant_id)
- with self.firewall_policy(tenant_id=tenant_id, no_delete=True) as fwp:
+ with self.firewall_policy(tenant_id=tenant_id) as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id,
- admin_state_up=test_db_firewall.ADMIN_STATE_UP,
- no_delete=True) as fw:
+ admin_state_up=test_db_firewall.ADMIN_STATE_UP
+ ) as fw:
fw_list = [fw['firewall']]
f = self.callbacks.get_firewalls_for_tenant_without_rules
res = f(ctx, host='dummy')
self.callbacks = self.plugin.callbacks
def test_create_second_firewall_not_permitted(self):
- with self.firewall(no_delete=True):
+ with self.firewall():
res = self._create_firewall(
None, 'firewall2', description='test',
firewall_policy_id=None, admin_state_up=True)
name = "new_firewall1"
attrs = self._get_test_firewall_attrs(name)
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
name = "new_firewall1"
attrs = self._get_test_firewall_attrs(name)
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
name = "new_firewall1"
attrs = self._get_test_firewall_attrs(name)
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
self.assertEqual(res.status_int, 409)
def test_update_firewall_rule_fails_when_firewall_pending(self):
- with self.firewall_policy(no_delete=True) as fwp:
- fwp_id = fwp['firewall_policy']['id']
- with self.firewall_rule(name='fwr1', no_delete=True) as fr:
+ with self.firewall_rule(name='fwr1') as fr:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
fr_id = fr['firewall_rule']['id']
fw_rule_ids = [fr_id]
data = {'firewall_policy':
req.get_response(self.ext_api)
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- no_delete=True):
+ test_db_firewall.ADMIN_STATE_UP):
data = {'firewall_rule': {'protocol': 'udp'}}
req = self.new_update_request('firewall_rules',
data, fr_id)
def test_delete_firewall(self):
ctx = context.get_admin_context()
attrs = self._get_test_firewall_attrs()
-
- with self.firewall_policy(no_delete=True) as fwp:
+ # stop the AgentRPC patch for this one to test pending states
+ self.agentapi_delf_p.stop()
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
fw_db = self.plugin._get_firewall(ctx, fw_id)
for k, v in attrs.iteritems():
self.assertEqual(fw_db[k], v)
+ # cleanup the pending firewall
+ self.plugin.callbacks.firewall_deleted(ctx, fw_id)
def test_delete_firewall_after_agent_delete(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
no_delete=True) as fw:
fw_id = fw['firewall']['id']
- with ctx.session.begin(subtransactions=True):
- req = self.new_delete_request('firewalls', fw_id)
- res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, 204)
- self.plugin.callbacks.firewall_deleted(ctx, fw_id)
- self.assertRaises(firewall.FirewallNotFound,
- self.plugin.get_firewall,
- ctx, fw_id)
+ req = self.new_delete_request('firewalls', fw_id)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+ self.assertRaises(firewall.FirewallNotFound,
+ self.plugin.get_firewall,
+ ctx, fw_id)
def test_make_firewall_dict_with_in_place_rules(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
- fwp_id = fwp['firewall_policy']['id']
- with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
- self.firewall_rule(name='fwr2',
- no_delete=True),
- self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ with contextlib.nested(self.firewall_rule(name='fwr1'),
+ self.firewall_rule(name='fwr2'),
+ self.firewall_rule(name='fwr3')) as fr:
+ with self.firewall_policy() as fwp:
+ fwp_id = fwp['firewall_policy']['id']
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
data = {'firewall_policy':
{'firewall_rules': fw_rule_ids}}
attrs['firewall_policy_id'] = fwp_id
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=
- test_db_firewall.ADMIN_STATE_UP,
- no_delete=True) as fw:
+ test_db_firewall.ADMIN_STATE_UP) as fw:
fw_id = fw['firewall']['id']
fw_rules = (
self.plugin._make_firewall_dict_with_rules(ctx,
def test_make_firewall_dict_with_in_place_rules_no_policy(self):
ctx = context.get_admin_context()
- with self.firewall(no_delete=True) as fw:
+ with self.firewall() as fw:
fw_id = fw['firewall']['id']
fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id)
self.assertEqual(fw_rules['firewall_rule_list'], [])
def test_list_firewalls(self):
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name='fw1', firewall_policy_id=fwp_id,
description='fw') as fwalls: