with self.subnet(network=net) as sub:
with self.port(
subnet=sub,
- do_delete=False,
device_owner=constants.DEVICE_OWNER_ROUTER_INTF
) as port:
# router ports should be immediately active
@contextlib.contextmanager
def metering_label(self, name='label', description='desc',
- fmt=None, do_delete=True, **kwargs):
+ fmt=None, **kwargs):
if not fmt:
fmt = self.fmt
metering_label = self._make_metering_label(fmt, name,
description, **kwargs)
yield metering_label
- if do_delete:
- self._delete('metering-labels',
- metering_label['metering_label']['id'])
@contextlib.contextmanager
def metering_label_rule(self, metering_label_id=None, direction='ingress',
remote_ip_prefix='10.0.0.0/24',
- excluded='false', fmt=None, do_delete=True):
+ excluded='false', fmt=None):
if not fmt:
fmt = self.fmt
metering_label_rule = self._make_metering_label_rule(fmt,
remote_ip_prefix,
excluded)
yield metering_label_rule
- if do_delete:
- self._delete('metering-label-rules',
- metering_label_rule['metering_label_rule']['id'])
class MeteringPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
name = 'my label'
description = 'my metering label'
- with self.metering_label(name, description,
- do_delete=False) as metering_label:
+ with self.metering_label(name, description) as metering_label:
metering_label_id = metering_label['metering_label']['id']
self._delete('metering-labels', metering_label_id, 204)
with self.metering_label_rule(metering_label_id,
direction,
remote_ip_prefix,
- excluded,
- do_delete=False) as label_rule:
+ excluded) as label_rule:
rule_id = label_rule['metering_label_rule']['id']
self._delete('metering-label-rules', rule_id, 204)
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
with contextlib.nested(
- self.port(do_delete=False),
+ self.port(),
mock.patch.object(l3plugin, 'disassociate_floatingips'),
mock.patch.object(l3plugin, 'notify_routers_updated')
) as (port, disassociate_floatingips, notify):
mock.patch.object(manager.NeutronManager,
'get_service_plugins',
return_value=self.service_plugins),
- self.port(do_delete=False,
- device_owner=device_owner),
+ self.port(device_owner=device_owner),
mock.patch.object(self.l3plugin, 'notify_routers_updated'),
mock.patch.object(self.l3plugin, 'disassociate_floatingips',
return_value=fip_set),
mock.patch.object(manager.NeutronManager,
'get_service_plugins',
return_value=self.service_plugins),
- self.port(do_delete=False,
- device_owner='compute:None'),
+ self.port(device_owner='compute:None'),
mock.patch.object(self.l3plugin, 'dvr_deletens_if_no_port',
return_value=[ns_to_delete]),
mock.patch.object(self.l3plugin, 'remove_router_from_l3_agent',
return self.deserialize(fmt, res)
@contextlib.contextmanager
- def packet_filter_on_network(self, network=None, fmt=None, do_delete=True,
- **kwargs):
+ def packet_filter_on_network(self, network=None, fmt=None, **kwargs):
with test_plugin.optional_ctx(network, self.network) as network_to_use:
net_id = network_to_use['network']['id']
pf = self._make_packet_filter(fmt or self.fmt, net_id, **kwargs)
yield pf
- if do_delete:
- self._delete('packet_filters', pf['packet_filter']['id'])
if not network:
self._delete('networks', network_to_use['network']['id'])
@contextlib.contextmanager
- def packet_filter_on_port(self, port=None, fmt=None, do_delete=True,
- set_portinfo=True, **kwargs):
+ def packet_filter_on_port(self, port=None, fmt=None, set_portinfo=True,
+ **kwargs):
with test_plugin.optional_ctx(port, self.port) as port_to_use:
net_id = port_to_use['port']['network_id']
port_id = port_to_use['port']['id']
pf = self._make_packet_filter(fmt or self.fmt, net_id, **kwargs)
self.assertEqual(port_id, pf['packet_filter']['in_port'])
yield pf
- if do_delete:
- self._delete('packet_filters', pf['packet_filter']['id'])
class TestNecPluginPacketFilter(TestNecPluginPacketFilterBase):
with self.packet_filter_on_port() as pf:
pf_id = pf['packet_filter']['id']
self.assertEqual(pf['packet_filter']['status'], 'ACTIVE')
+ self._delete('packet_filters', pf_id)
ctx = mock.ANY
pf_dict = mock.ANY
self.assertEqual(self.ofc.delete_ofc_packet_filter.call_count, 2)
def test_auto_delete_pf_in_network_deletion(self):
- with self.packet_filter_on_network(admin_state_up=False,
- do_delete=False) as pf:
+ with self.packet_filter_on_network(admin_state_up=False) as pf:
pf_id = pf['packet_filter']['id']
self._show('packet_filters', pf_id,
network = self._show('networks', port['port']['network_id'])
with self.packet_filter_on_network(network=network) as pfn:
- with self.packet_filter_on_port(port=port, do_delete=False,
+ with self.packet_filter_on_port(port=port,
set_portinfo=False) as pf:
pf_id = pf['packet_filter']['id']
in_port_id = pf['packet_filter']['in_port']
@contextlib.contextmanager
def netpartition(self, name='netpartition1',
- do_delete=True,
fmt=None,
**kwargs):
netpart = self._make_netpartition(fmt or self.fmt, name)
yield netpart
- if do_delete:
- self._del_netpartition(netpart['net_partition']['id'])
def test_create_netpartition(self):
name = 'netpart1'
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
- set_context=True):
+ set_context=True) as label:
self.mock_add.assert_called_with(self.ctx, expected)
+ self._delete('metering-labels',
+ label['metering_label']['id'])
self.mock_remove.assert_called_with(self.ctx, expected)
def test_remove_one_metering_label_rpc_call(self):
set_context=True):
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=self.tenant_id,
- set_context=True):
+ set_context=True) as label:
self.mock_add.assert_called_with(self.ctx, expected_add)
+ self._delete('metering-labels',
+ label['metering_label']['id'])
self.mock_remove.assert_called_with(self.ctx, expected_remove)
def test_update_metering_label_rules_rpc_call(self):
l = label['metering_label']
with self.metering_label_rule(l['id']):
self.mock_uuid.return_value = second_uuid
- with self.metering_label_rule(l['id'], direction='egress'):
+ with self.metering_label_rule(l['id'],
+ direction='egress') as rule:
self.mock_update.assert_called_with(self.ctx,
expected_add)
+ self._delete('metering-label-rules',
+ rule['metering_label_rule']['id'])
self.mock_update.assert_called_with(self.ctx,
expected_del)
def test_delete_metering_label_does_not_clear_router_tenant_id(self):
tenant_id = '654f6b9d-0f36-4ae5-bd1b-01616794ca60'
- with self.metering_label(tenant_id=tenant_id,
- do_delete=False) as metering_label:
+ with self.metering_label(tenant_id=tenant_id) as metering_label:
with self.router(tenant_id=tenant_id, set_context=True) as r:
router = self._show('routers', r['router']['id'])
self.assertEqual(tenant_id, router['router']['tenant_id'])
@contextlib.contextmanager
def security_group(self, name='webservers', description='webservers',
- fmt=None, do_delete=True):
+ fmt=None):
if not fmt:
fmt = self.fmt
security_group = self._make_security_group(fmt, name, description)
yield security_group
- if do_delete:
- self._delete('security-groups',
- security_group['security_group']['id'])
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
direction='ingress', protocol=const.PROTO_NAME_TCP,
port_range_min='22', port_range_max='22',
remote_ip_prefix=None, remote_group_id=None,
- fmt=None, do_delete=True, ethertype=const.IPv4):
+ fmt=None, ethertype=const.IPv4):
if not fmt:
fmt = self.fmt
rule = self._build_security_group_rule(security_group_id,
ethertype=ethertype)
security_group_rule = self._make_security_group_rule(self.fmt, rule)
yield security_group_rule
- if do_delete:
- self._delete('security-group-rules',
- security_group_rule['security_group_rule']['id'])
def _delete_default_security_group_egress_rules(self, security_group_id):
"""Deletes default egress rules given a security group ID."""
def test_delete_security_group(self):
name = 'webservers'
description = 'my webservers'
- with self.security_group(name, description, do_delete=False) as sg:
+ with self.security_group(name, description) as sg:
remote_group_id = sg['security_group']['id']
self._delete('security-groups', remote_group_id,
webob.exc.HTTPNoContent.code)
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
- with self.security_group(name, description) as sg2:
+ with self.security_group(name, description):
security_group_id = sg['security_group']['id']
- direction = "ingress"
- remote_group_id = sg2['security_group']['id']
- protocol = const.PROTO_NAME_TCP
- port_range_min = 88
- port_range_max = 88
- with self.security_group_rule(security_group_id, direction,
- protocol, port_range_min,
- port_range_max,
- remote_group_id=remote_group_id
- ):
- pass
+
+ rule = self._build_security_group_rule(
+ security_group_id,
+ direction='ingress',
+ proto=const.PROTO_NAME_TCP)
+ security_group_rule = self._make_security_group_rule(self.fmt,
+ rule)
+ self._delete('security-group-rules',
+ security_group_rule['security_group_rule']['id'])
+
self.notifier.assert_has_calls(
[mock.call.security_groups_rule_updated(mock.ANY,
[security_group_id]),
@contextlib.contextmanager
def qos_queue(self, name='foo', min='0', max='10',
- qos_marking=None, dscp='0', default=None, do_delete=True):
+ qos_marking=None, dscp='0', default=None):
body = {'qos_queue': {'tenant_id': 'tenant',
'name': name,
yield qos_queue
- if do_delete:
- self._delete('qos-queues',
- qos_queue['qos_queue']['id'])
-
def test_create_qos_queue(self):
with self.qos_queue(name='fake_lqueue', min=34, max=44,
qos_marking='untrusted', default=False) as q:
self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
def test_create_shared_queue_networks(self):
- with self.qos_queue(default=True, do_delete=False) as q1:
+ with self.qos_queue(default=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
self._delete('ports', port2['port']['id'])
def test_remove_queue_in_use_fail(self):
- with self.qos_queue(do_delete=False) as q1:
+ with self.qos_queue() as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
new_q['qos_queue']['id'])
def test_update_port_adding_device_id(self):
- with self.qos_queue(do_delete=False) as q1:
+ with self.qos_queue() as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
def test_router_add_interface_port_removes_security_group(self):
with self.router() as r:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
body = self._router_interface_action('add',
r['router']['id'],
None,