with self.subnet(network=net, do_delete=False) as sub:
with self.port(
subnet=sub,
- no_delete=True,
+ do_delete=False,
device_owner=constants.DEVICE_OWNER_ROUTER_INTF
) as port:
# router ports should be immediately active
with self.router() as r:
with self.subnet() as s:
with self.subnet(cidr='10.0.10.0/24') as s1:
- with self.port(subnet=s1, no_delete=True) as p:
+ with self.port(subnet=s1, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_router_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_wrong_subnet_returns_400(self):
with self.router() as r:
with self.subnet(cidr='10.0.10.0/24') as s:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet(cidr='10.0.10.0/24'):
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def firewall_policy(self, fmt=None, name='firewall_policy1',
description=DESCRIPTION, shared=True,
firewall_rules=None, audited=True,
- no_delete=False, **kwargs):
+ do_delete=True, **kwargs):
if firewall_rules is None:
firewall_rules = []
if not fmt:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_policy = self.deserialize(fmt or self.fmt, res)
yield firewall_policy
- if not no_delete:
+ if do_delete:
self._delete('firewall_policies',
firewall_policy['firewall_policy']['id'])
source_port=SOURCE_PORT,
destination_port=DESTINATION_PORT,
action=ACTION, enabled=ENABLED,
- no_delete=False, **kwargs):
+ do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_firewall_rule(fmt, name, shared, protocol,
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_rule = self.deserialize(fmt or self.fmt, res)
yield firewall_rule
- if not no_delete:
+ if do_delete:
self._delete('firewall_rules',
firewall_rule['firewall_rule']['id'])
@contextlib.contextmanager
def firewall(self, fmt=None, name='firewall_1', description=DESCRIPTION,
firewall_policy_id=None, admin_state_up=True,
- no_delete=False, **kwargs):
+ do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_firewall(fmt, name, description, firewall_policy_id,
raise webob.exc.HTTPClientError(code=res.status_int)
firewall = self.deserialize(fmt or self.fmt, res)
yield firewall
- if not no_delete:
+ if do_delete:
self._delete('firewalls', firewall['firewall']['id'])
def _rule_action(self, action, id, firewall_rule_id, insert_before=None,
def test_delete_firewall_policy(self):
ctx = context.get_admin_context()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy(do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
req = self.new_delete_request('firewall_policies', fwp_id)
res = req.get_response(self.ext_api)
def test_delete_firewall_policy_with_rule(self):
ctx = context.get_admin_context()
attrs = self._get_test_firewall_policy_attrs()
- with self.firewall_policy(no_delete=True) as fwp:
+ with self.firewall_policy(do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall_rule(name='fwr1') as fr:
fr_id = fr['firewall_rule']['id']
def test_delete_firewall_rule(self):
ctx = context.get_admin_context()
- with self.firewall_rule(no_delete=True) as fwr:
+ with self.firewall_rule(do_delete=False) as fwr:
fwr_id = fwr['firewall_rule']['id']
req = self.new_delete_request('firewall_rules', fwr_id)
res = req.get_response(self.ext_api)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
- no_delete=True) as fw:
+ do_delete=False) as fw:
fw_id = fw['firewall']['id']
req = self.new_delete_request('firewalls', fw_id)
res = req.get_response(self.ext_api)
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None, subnet=None,
protocol='HTTP', protocol_port=80, admin_state_up=True,
- no_delete=False, **kwargs):
+ do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
)
vip = self.deserialize(fmt or self.fmt, res)
yield vip
- if not no_delete:
+ if do_delete:
self._delete('vips', vip['vip']['id'])
@contextlib.contextmanager
def pool(self, fmt=None, name='pool1', lb_method='ROUND_ROBIN',
- protocol='HTTP', admin_state_up=True, no_delete=False,
+ protocol='HTTP', admin_state_up=True, do_delete=True,
**kwargs):
if not fmt:
fmt = self.fmt
)
pool = self.deserialize(fmt or self.fmt, res)
yield pool
- if not no_delete:
+ if do_delete:
self._delete('pools', pool['pool']['id'])
@contextlib.contextmanager
def member(self, fmt=None, address='192.168.1.100', protocol_port=80,
- admin_state_up=True, no_delete=False, **kwargs):
+ admin_state_up=True, do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_member(fmt,
)
member = self.deserialize(fmt or self.fmt, res)
yield member
- if not no_delete:
+ if do_delete:
self._delete('members', member['member']['id'])
@contextlib.contextmanager
def health_monitor(self, fmt=None, type='TCP',
delay=30, timeout=10, max_retries=3,
admin_state_up=True,
- no_delete=False, **kwargs):
+ do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_health_monitor(fmt,
for arg in http_related_attributes:
self.assertIsNone(the_health_monitor.get(arg))
yield health_monitor
- if not no_delete:
+ if do_delete:
self._delete('health_monitors', the_health_monitor['id'])
def test_delete_vip(self):
with self.pool():
- with self.vip(no_delete=True) as vip:
+ with self.vip(do_delete=False) as vip:
req = self.new_delete_request('vips',
vip['vip']['id'])
res = req.get_response(self.ext_api)
self._delete('members', member1['member']['id'])
def test_delete_pool(self):
- with self.pool(no_delete=True) as pool:
- with self.member(no_delete=True,
+ with self.pool(do_delete=False) as pool:
+ with self.member(do_delete=False,
pool_id=pool['pool']['id']):
req = self.new_delete_request('pools',
pool['pool']['id'])
self.assertEqual(res.status_int, webob.exc.HTTPNoContent.code)
def test_delete_pool_preserve_state(self):
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
with self.vip(pool=pool):
req = self.new_delete_request('pools',
pool['pool']['id'])
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(pool_id=pool_id,
- no_delete=True) as member:
+ do_delete=False) as member:
req = self.new_delete_request('members',
member['member']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res['health_monitor'][k], v)
def test_delete_healthmonitor(self):
- with self.health_monitor(no_delete=True) as monitor:
+ with self.health_monitor(do_delete=False) as monitor:
ctx = context.get_admin_context()
qry = ctx.session.query(ldb.HealthMonitor)
qry = qry.filter_by(id=monitor['health_monitor']['id'])
@contextlib.contextmanager
def metering_label(self, name='label', description='desc',
- fmt=None, no_delete=False, **kwargs):
+ fmt=None, do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
metering_label = self._make_metering_label(fmt, name,
description, **kwargs)
yield metering_label
- if not no_delete:
+ if do_delete:
self._delete('metering-labels',
metering_label['metering_label']['id'])
@contextlib.contextmanager
def metering_label_rule(self, metering_label_id=None, direction='ingress',
remote_ip_prefix='10.0.0.0/24',
- excluded='false', fmt=None, no_delete=False):
+ excluded='false', fmt=None, do_delete=True):
if not fmt:
fmt = self.fmt
metering_label_rule = self._make_metering_label_rule(fmt,
remote_ip_prefix,
excluded)
yield metering_label_rule
- if not no_delete:
+ if do_delete:
self._delete('metering-label-rules',
metering_label_rule['metering_label_rule']['id'])
description = 'my metering label'
with self.metering_label(name, description,
- no_delete=True) as metering_label:
+ do_delete=False) as metering_label:
metering_label_id = metering_label['metering_label']['id']
self._delete('metering-labels', metering_label_id, 204)
direction,
remote_ip_prefix,
excluded,
- no_delete=True) as label_rule:
+ do_delete=False) as label_rule:
rule_id = label_rule['metering_label_rule']['id']
self._delete('metering-label-rules', rule_id, 204)
lifetime_value=3600,
ike_version='v1',
pfs='group5',
- no_delete=False,
+ do_delete=True,
**kwargs):
if not fmt:
fmt = self.fmt
raise webob.exc.HTTPClientError(code=res.status_int)
ikepolicy = self.deserialize(fmt or self.fmt, res)
yield ikepolicy
- if not no_delete:
+ if do_delete:
self._delete('ikepolicies', ikepolicy['ikepolicy']['id'])
def _create_ipsecpolicy(self, fmt,
lifetime_units='seconds',
lifetime_value=3600,
pfs='group5',
- no_delete=False, **kwargs):
+ do_delete=True, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_ipsecpolicy(fmt,
raise webob.exc.HTTPClientError(code=res.status_int)
ipsecpolicy = self.deserialize(fmt or self.fmt, res)
yield ipsecpolicy
- if not no_delete:
+ if do_delete:
self._delete('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'])
def _create_vpnservice(self, fmt, name,
subnet=None,
router=None,
admin_state_up=True,
- no_delete=False,
+ do_delete=True,
plug_subnet=True,
external_subnet_cidr='192.168.100.0/24',
external_router=True,
if res.status_int < 400:
yield vpnservice
- if not no_delete and vpnservice.get('vpnservice'):
+ if do_delete and vpnservice.get('vpnservice'):
self._delete('vpnservices',
vpnservice['vpnservice']['id'])
if plug_subnet:
vpnservice=None,
ikepolicy=None,
ipsecpolicy=None,
- admin_state_up=True, no_delete=False,
+ admin_state_up=True, do_delete=True,
**kwargs):
if not fmt:
fmt = self.fmt
)
yield ipsec_site_connection
- if not no_delete:
+ if do_delete:
self._delete(
'ipsec-site-connections',
ipsec_site_connection[
def test_delete_ikepolicy(self):
"""Test case to delete an ikepolicy."""
- with self.ikepolicy(no_delete=True) as ikepolicy:
+ with self.ikepolicy(do_delete=False) as ikepolicy:
req = self.new_delete_request('ikepolicies',
ikepolicy['ikepolicy']['id'])
res = req.get_response(self.ext_api)
def test_delete_ipsecpolicy(self):
"""Test case to delete an ipsecpolicy."""
- with self.ipsecpolicy(no_delete=True) as ipsecpolicy:
+ with self.ipsecpolicy(do_delete=False) as ipsecpolicy:
req = self.new_delete_request('ipsecpolicies',
ipsecpolicy['ipsecpolicy']['id'])
res = req.get_response(self.ext_api)
def test_delete_vpnservice(self):
"""Test case to delete a vpnservice."""
with self.vpnservice(name='vpnserver',
- no_delete=True) as vpnservice:
+ do_delete=False) as vpnservice:
req = self.new_delete_request('vpnservices',
vpnservice['vpnservice']['id'])
res = req.get_response(self.ext_api)
def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection."""
with self.ipsec_site_connection(
- no_delete=True) as ipsec_site_connection:
+ do_delete=False) as ipsec_site_connection:
req = self.new_delete_request(
'ipsec-site-connections',
ipsec_site_connection['ipsec_site_connection']['id']
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
with contextlib.nested(
- self.port(no_delete=True),
+ self.port(do_delete=False),
mock.patch.object(l3plugin, 'disassociate_floatingips'),
mock.patch.object(l3plugin, 'notify_routers_updated')
) as (port, disassociate_floatingips, notify):
def _test_delete_port_for_disappeared_ofc_port(self, raised_exc):
self.ofc.set_raise_exc('delete_ofc_port', raised_exc)
- with self.port(no_delete=True) as port:
+ with self.port(do_delete=False) as port:
port_id = port['port']['id']
portinfo = {'id': port_id, 'port_no': 123}
expected_code=webob.exc.HTTPNotFound.code)
def test_auto_delete_pf_in_port_deletion(self):
- with self.port(no_delete=True) as port:
+ with self.port(do_delete=False) as port:
network = self._show('networks', port['port']['network_id'])
with self.packet_filter_on_network(network=network) as pfn:
def test_router_update_with_dup_destination_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
cidr='10.0.1.0/24',
do_delete=False) as subnet1:
pass
- with self.port(subnet=subnet1, no_delete=True) as port2:
+ with self.port(subnet=subnet1, do_delete=False) as port2:
pass
dhcp_agents = self._list_dhcp_agents_hosting_network(
port2['port']['network_id'])
do_delete=False) as subnet1:
if owner:
with self.port(subnet=subnet1,
- no_delete=True,
+ do_delete=False,
device_owner=owner) as port:
return [net1, subnet1, port]
else:
with self.port(subnet=subnet1,
- no_delete=True) as port:
+ do_delete=False) as port:
return [net1, subnet1, port]
def _notification_mocks(self, hosts, net, subnet, port):
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
- no_delete=True) as fw:
+ do_delete=False) as fw:
fw_id = fw['firewall']['id']
with ctx.session.begin(subtransactions=True):
fw_db = self.plugin._get_firewall(ctx, fw_id)
with self.firewall_policy() as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(firewall_policy_id=fwp_id,
- no_delete=True) as fw:
+ do_delete=False) as fw:
fw_id = fw['firewall']['id']
req = self.new_delete_request('firewalls', fw_id)
res = req.get_response(self.ext_api)
self.skip("App cookie persistence not supported.")
def test_pool_port(self):
- with self.port(no_delete=True) as port:
+ with self.port(do_delete=False) as port:
with self.pool() as pool:
h_db.add_pool_port(context.get_admin_context(),
pool['pool']['id'], port['port']['id'])
"""Test the rest call failure handling by Exception raising."""
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
- with self.pool(no_delete=True,
+ with self.pool(do_delete=False,
provider='radware',
subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
def test_update_vip(self):
with self.subnet() as subnet:
with self.pool(provider='radware',
- no_delete=True,
+ do_delete=False,
subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
- with self.pool(no_delete=True,
+ with self.pool(do_delete=False,
provider='radware',
subnet_id=subnet['subnet']['id']) as pool:
with contextlib.nested(
self.member(pool_id=pool['pool']['id'],
- no_delete=True),
+ do_delete=False),
self.member(pool_id=pool['pool']['id'],
address='192.168.1.101',
- no_delete=True),
- self.health_monitor(no_delete=True),
- self.vip(pool=pool, subnet=subnet, no_delete=True)
+ do_delete=False),
+ self.health_monitor(do_delete=False),
+ self.vip(pool=pool, subnet=subnet, do_delete=False)
) as (mem1, mem2, hm, vip):
plugin.create_pool_health_monitor(
def test_delete_vip(self):
with self.subnet() as subnet:
with self.pool(provider='radware',
- no_delete=True,
+ do_delete=False,
subnet_id=subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
with self.subnet(cidr='10.0.0.0/24') as subnet:
with self.subnet(cidr='10.0.1.0/24') as pool_subnet:
with self.pool(provider='radware',
- no_delete=True,
+ do_delete=False,
subnet_id=pool_subnet['subnet']['id']) as pool:
vip_data = {
'name': 'vip1',
def test_delete_pool_with_vip(self):
with self.subnet() as subnet:
with self.pool(provider='radware',
- no_delete=True,
+ do_delete=False,
subnet_id=subnet['subnet']['id']) as pool:
with self.vip(pool=pool, subnet=subnet):
self.assertRaises(loadbalancer.PoolInUse,
with self.pool(provider='radware',
subnet_id=subnet['subnet']['id']) as p:
with self.member(pool_id=p['pool']['id'],
- no_delete=True) as m:
+ do_delete=False) as m:
with self.vip(pool=p, subnet=subnet):
# Reset mock and
def test_delete_member_without_vip(self):
with self.subnet():
with self.pool(provider='radware') as p:
- with self.member(pool_id=p['pool']['id'], no_delete=True) as m:
+ with self.member(pool_id=p['pool']['id'],
+ do_delete=False) as m:
self.plugin_instance.delete_member(
context.get_admin_context(), m['member']['id']
)
def test_delete_pool_hm_with_vip(self):
with self.subnet() as subnet:
- with self.health_monitor(no_delete=True) as hm:
+ with self.health_monitor(do_delete=False) as hm:
with self.pool(provider='radware',
subnet_id=subnet['subnet']['id']) as pool:
with self.vip(pool=pool, subnet=subnet):
def test_delete_vip(self):
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
- with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
+ with self.vip(pool=pool, subnet=subnet,
+ do_delete=False) as vip:
ctx = context.get_admin_context()
self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
vip['vip']['status'] = 'PENDING_DELETE'
mock.ANY, old_pool, updated, 'host')
def test_delete_pool(self):
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
req = self.new_delete_request('pools',
pool['pool']['id'])
res = req.get_response(self.ext_api)
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(pool_id=pool_id,
- no_delete=True) as member:
+ do_delete=False) as member:
req = self.new_delete_request('members',
member['member']['id'])
res = req.get_response(self.ext_api)
def test_pool_unscheduling_on_pool_deletion(self):
self._register_agent_states(lbaas_agents=True)
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
lbaas_agent = self._get_lbaas_agent_hosting_pool(
pool['pool']['id'])
self.assertIsNotNone(lbaas_agent)
def test_delete_metering_label_does_not_clear_router_tenant_id(self):
tenant_id = '654f6b9d-0f36-4ae5-bd1b-01616794ca60'
with self.metering_label(tenant_id=tenant_id,
- no_delete=True) as metering_label:
+ do_delete=False) as metering_label:
with self.router(tenant_id=tenant_id, set_context=True) as r:
router = self._show('routers', r['router']['id'])
self.assertEqual(tenant_id, router['router']['tenant_id'])
admin_state_up, **kwargs)
yield network
if do_delete:
- # The do_delete parameter allows you to control whether the
- # created network is immediately deleted again. Therefore, this
- # function is also usable in tests, which require the creation
- # of many networks.
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
- def port(self, subnet=None, fmt=None, no_delete=False,
+ def port(self, subnet=None, fmt=None, do_delete=True,
**kwargs):
with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
yield port
- if not no_delete:
+ if do_delete:
self._delete('ports', port['port']['id'])
def _test_list_with_sort(self, resource,
self.assertEqual(port['port']['id'], sport['port']['id'])
def test_delete_port(self):
- with self.port(no_delete=True) as port:
+ with self.port(do_delete=False) as port:
self._delete('ports', port['port']['id'])
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
- self.port(subnet=subnet, device_id='owner1', no_delete=True),
- self.port(subnet=subnet, device_id='owner1', no_delete=True),
+ self.port(subnet=subnet, device_id='owner1', do_delete=False),
+ self.port(subnet=subnet, device_id='owner1', do_delete=False),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
network_id = subnet['subnet']['network_id']
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
- self.port(subnet=subnet, device_id='owner1', no_delete=True),
+ self.port(subnet=subnet, device_id='owner1', do_delete=False),
self.port(subnet=subnet, device_id='owner1'),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._routes_update_prepare(r['router']['id'],
None, p['port']['id'], routes)
body = self._update('routers', r['router']['id'],
'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
self.router(),
self.subnet(cidr='10.0.0.0/24')) as (r1, r2, s):
with contextlib.nested(
- self.port(subnet=s, no_delete=True),
- self.port(subnet=s, no_delete=True)) as (p1, p2):
+ self.port(subnet=s, do_delete=False),
+ self.port(subnet=s, do_delete=False)) as (p1, p2):
body = self._routes_update_prepare(r1['router']['id'],
None, p1['port']['id'],
routes1)
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes_orig)
def _test_malformed_route(self, routes):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_nexthop_is_port_ip(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_too_many_routes(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_dup_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_invalid_ip_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_invalid_nexthop_ip(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_nexthop_is_outside_port_subnet(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, no_delete=True) as p:
+ with self.port(subnet=s, do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
@contextlib.contextmanager
def security_group(self, name='webservers', description='webservers',
- fmt=None, no_delete=False):
+ fmt=None, do_delete=True):
if not fmt:
fmt = self.fmt
security_group = self._make_security_group(fmt, name, description)
yield security_group
- if not no_delete:
+ if do_delete:
self._delete('security-groups',
security_group['security_group']['id'])
direction='ingress', protocol=const.PROTO_NAME_TCP,
port_range_min='22', port_range_max='22',
remote_ip_prefix=None, remote_group_id=None,
- fmt=None, no_delete=False, ethertype=const.IPv4):
+ fmt=None, do_delete=True, ethertype=const.IPv4):
if not fmt:
fmt = self.fmt
rule = self._build_security_group_rule(security_group_id,
ethertype=ethertype)
security_group_rule = self._make_security_group_rule(self.fmt, rule)
yield security_group_rule
- if not no_delete:
+ if do_delete:
self._delete('security-group-rules',
security_group_rule['security_group_rule']['id'])
def test_delete_security_group(self):
name = 'webservers'
description = 'my webservers'
- with self.security_group(name, description, no_delete=True) as sg:
+ with self.security_group(name, description, do_delete=False) as sg:
remote_group_id = sg['security_group']['id']
self._delete('security-groups', remote_group_id,
webob.exc.HTTPNoContent.code)
def test_router_add_interface_port(self):
with self.router() as r:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
'roles': []}
tdict.return_value = admin_context
with self.router() as r:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
tdict.return_value = tenant_context
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
def test_router_add_interface_dup_subnet2_returns_400(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(subnet=s, no_delete=True) as p1:
+ with self.port(subnet=s, do_delete=False) as p1:
with self.port(subnet=s) as p2:
self._router_interface_action('add',
r['router']['id'],
def test_router_remove_interface_wrong_subnet_returns_400(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_returns_200(self):
with self.router() as r:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet():
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_l3_agent_routers_query_interfaces(self):
with self.router() as r:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
with self.router() as r:
with self.subnet(cidr='9.0.1.0/24') as subnet:
with self.port(subnet=subnet,
- no_delete=True,
+ do_delete=False,
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
self._router_interface_action('add',
r['router']['id'],
self._test_notify_op_agent(self._test_router_gateway_op_agent)
def _test_interfaces_op_agent(self, r, notifyApi):
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
self._router_interface_action('add',
r['router']['id'],
None,
@contextlib.contextmanager
def qos_queue(self, name='foo', min='0', max='10',
- qos_marking=None, dscp='0', default=None, no_delete=False):
+ qos_marking=None, dscp='0', default=None, do_delete=True):
body = {'qos_queue': {'tenant_id': 'tenant',
'name': name,
yield qos_queue
- if not no_delete:
+ if do_delete:
self._delete('qos-queues',
qos_queue['qos_queue']['id'])
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
- with self.port(device_id=device_id, do_delete=False) as p:
+ with self.port(device_id=device_id) as p:
self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
def test_create_shared_queue_networks(self):
- with self.qos_queue(default=True, no_delete=True) as q1:
+ with self.qos_queue(default=True, do_delete=False) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
self._delete('ports', port2['port']['id'])
def test_remove_queue_in_use_fail(self):
- with self.qos_queue(no_delete=True) as q1:
+ with self.qos_queue(do_delete=False) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
new_q['qos_queue']['id'])
def test_update_port_adding_device_id(self):
- with self.qos_queue(no_delete=True) as q1:
+ with self.qos_queue(do_delete=False) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
def test_router_add_interface_port_removes_security_group(self):
with self.router() as r:
- with self.port(no_delete=True) as p:
+ with self.port(do_delete=False) as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
ctx = context.get_admin_context()
name = 'firewall'
with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr2',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ do_delete=False)) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
with self.firewall_policy(firewall_rules=fw_rule_ids,
- no_delete=True) as fwp:
+ do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name=name,
firewall_policy_id=fwp_id) as firewall:
ctx = context.get_admin_context()
name = 'new_firewall'
with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr2',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ do_delete=False)) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
with self.firewall_policy(firewall_rules=fw_rule_ids,
- no_delete=True) as fwp:
+ do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name=name,
firewall_policy_id=fwp_id) as firewall:
ctx = context.get_admin_context()
name = 'firewall'
with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr2',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr3',
- no_delete=True)) as fr:
+ do_delete=False)) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
with self.firewall_policy(firewall_rules=fw_rule_ids,
- no_delete=True) as fwp:
+ do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name=name,
firewall_policy_id=fwp_id) as firewall:
ctx = context.get_admin_context()
name = 'new_firewall'
with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True)) as fr:
+ do_delete=False)) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
with self.firewall_policy(firewall_rules=fw_rule_ids,
- no_delete=True) as fwp:
+ do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name=name,
firewall_policy_id=fwp_id) as firewall:
ctx = context.get_admin_context()
name = 'new_firewall'
with contextlib.nested(self.firewall_rule(name='fwr1',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr2',
- no_delete=True)) as fr:
+ do_delete=False)) as fr:
fw_rule_ids = [r['firewall_rule']['id'] for r in fr]
with self.firewall_policy(firewall_rules=fw_rule_ids,
- no_delete=True) as fwp:
+ do_delete=False) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name=name,
firewall_policy_id=fwp_id) as firewall:
ctx, fw_create['id'])
self.driver.update_firewall(ctx, VSE_ID, fw_create)
with contextlib.nested(self.firewall_rule(name='fwr0',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr1',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr2',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr3',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr4',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(name='fwr5',
- no_delete=True),
+ do_delete=False),
self.firewall_rule(
name='fwr6',
- no_delete=True)) as fwr:
+ do_delete=False)) as fwr:
# test insert when rule list is empty
fwr0_id = fwr[0]['firewall_rule']['id']
self._rule_action('insert', fwp_id, fwr0_id,
firewall_policy_id=fwp_id,
router_id=self._create_and_get_router(),
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
- no_delete=True) as fw:
+ do_delete=False) as fw:
fw_id = fw['firewall']['id']
with ctx.session.begin(subtransactions=True):
req = self.new_delete_request('firewalls', fw_id)
)
with self.vip(
router_id=self._create_and_get_router(),
- pool=pool, subnet=subnet, no_delete=True) as vip:
+ pool=pool, subnet=subnet, do_delete=False) as vip:
req = self.new_delete_request('vips', vip['vip']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
router_id=self._create_and_get_router(),
pool=pool, subnet=subnet):
with self.member(pool_id=pool_id,
- no_delete=True) as member:
+ do_delete=False) as member:
req = self.new_delete_request('members',
member['member']['id'])
res = req.get_response(self.ext_api)
def test_create_and_get_vip(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
self.pool_id = pool['pool']['id']
POOL_MAP_INFO['pool_id'] = pool['pool']['id']
vcns_db.add_vcns_edge_pool_binding(ctx.session, POOL_MAP_INFO)
def test_create_two_vips_with_same_name(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
self.pool_id = pool['pool']['id']
POOL_MAP_INFO['pool_id'] = pool['pool']['id']
vcns_db.add_vcns_edge_pool_binding(ctx.session, POOL_MAP_INFO)
def test_update_vip(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
self.pool_id = pool['pool']['id']
POOL_MAP_INFO['pool_id'] = pool['pool']['id']
vcns_db.add_vcns_edge_pool_binding(ctx.session, POOL_MAP_INFO)
def test_delete_vip(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as pool:
+ with self.pool(do_delete=False) as pool:
self.pool_id = pool['pool']['id']
POOL_MAP_INFO['pool_id'] = pool['pool']['id']
vcns_db.add_vcns_edge_pool_binding(ctx.session, POOL_MAP_INFO)
#Test Pool Operation
def test_create_and_get_pool(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as p:
+ with self.pool(do_delete=False) as p:
self.pool_id = p['pool']['id']
pool_create = p['pool']
self.driver.create_pool(ctx, VSE_ID, pool_create, [])
def test_create_two_pools_with_same_name(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as p:
+ with self.pool(do_delete=False) as p:
self.pool_id = p['pool']['id']
pool_create = p['pool']
self.driver.create_pool(ctx, VSE_ID, pool_create, [])
def test_update_pool(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as p:
+ with self.pool(do_delete=False) as p:
self.pool_id = p['pool']['id']
pool_create = p['pool']
self.driver.create_pool(ctx, VSE_ID, pool_create, [])
def test_delete_pool(self):
ctx = context.get_admin_context()
- with self.pool(no_delete=True) as p:
+ with self.pool(do_delete=False) as p:
self.pool_id = p['pool']['id']
pool_create = p['pool']
self.driver.create_pool(ctx, VSE_ID, pool_create, [])
def test_create_and_get_monitor(self):
ctx = context.get_admin_context()
- with self.health_monitor(no_delete=True) as m:
+ with self.health_monitor(do_delete=False) as m:
monitor_create = m['health_monitor']
self.driver.create_health_monitor(ctx, VSE_ID, monitor_create)
monitor_get = self.driver.get_health_monitor(
def test_update_health_monitor(self):
ctx = context.get_admin_context()
- with self.health_monitor(no_delete=True) as m:
+ with self.health_monitor(do_delete=False) as m:
monitor_create = m['health_monitor']
self.driver.create_health_monitor(
ctx, VSE_ID, monitor_create)
def test_delete_health_monitor(self):
ctx = context.get_admin_context()
- with self.health_monitor(no_delete=True) as m:
+ with self.health_monitor(do_delete=False) as m:
monitor_create = m['health_monitor']
self.driver.create_health_monitor(ctx, VSE_ID, monitor_create)
self.driver.delete_health_monitor(
with self.vpnservice(name='vpnservice',
subnet=subnet,
router=router,
- no_delete=True) as vpnservice:
+ do_delete=False) as vpnservice:
req = self.new_delete_request(
'vpnservices', vpnservice['vpnservice']['id'])
res = req.get_response(self.ext_api)
def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection."""
with self.ipsec_site_connection(
- no_delete=True) as ipsec_site_connection:
+ do_delete=False) as ipsec_site_connection:
req = self.new_delete_request(
'ipsec-site-connections',
ipsec_site_connection['ipsec_site_connection']['id']