def test_router_port_status_active(self):
# router ports screw up port auto-deletion so it has to be
# disabled for this test
- with self.network(do_delete=False) as net:
- with self.subnet(network=net, do_delete=False) as sub:
+ with self.network() as net:
+ with self.subnet(network=net) as sub:
with self.port(
subnet=sub,
do_delete=False,
with self.port(**kwargs) as port:
self.assertEqual(port['port']['binding:vif_type'],
portbindings.VIF_TYPE_IVS)
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', port['port']['network_id'])
kwargs = {'name': 'name2', 'binding:host_id': 'someotherhost',
'device_id': 'other_dev'}
with self.port(**kwargs) as port:
with self.router() as r:
with self.subnet() as s:
with self.subnet(cidr='10.0.10.0/24') as s1:
- with self.port(subnet=s1, do_delete=False) as p:
+ with self.port(subnet=s1) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_router_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_wrong_subnet_returns_400(self):
with self.router() as r:
with self.subnet(cidr='10.0.10.0/24') as s:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet(cidr='10.0.10.0/24'):
- with self.port(do_delete=False) as p:
+ with self.port() as p:
self._router_interface_action('add',
r['router']['id'],
None,
rv.getresponse.return_value.getheader.return_value = 'HASHHEADER'
rv.getresponse.return_value.status = 200
rv.getresponse.return_value.read.return_value = ''
- with self.network():
+ with self.network() as network:
callheaders = rv.request.mock_calls[0][1][3]
self.assertIn('X-BSN-BVS-HASH-MATCH', callheaders)
# first call will be empty to indicate no previous state hash
self.assertEqual(callheaders['X-BSN-BVS-HASH-MATCH'], '')
# change the header that will be received on delete call
rv.getresponse.return_value.getheader.return_value = 'HASH2'
-
+ self._delete('networks', network['network']['id'])
# net delete should have used header received on create
callheaders = rv.request.mock_calls[1][1][3]
self.assertEqual(callheaders['X-BSN-BVS-HASH-MATCH'], 'HASHHEADER')
rv.getresponse.return_value.getheader.return_value = 'HASHHEADER'
rv.getresponse.return_value.status = 200
rv.getresponse.return_value.read.return_value = ''
- with self.network():
+ with self.network() as net:
# change the header that will be received on delete call
rv.getresponse.return_value.getheader.return_value = 'EVIL'
rv.getresponse.return_value.status = 'GARBAGE'
+ self._delete('networks', net['network']['id'])
# create again should not use header from delete call
with self.network():
self.skipTest("Unsupported test")
super(TestN1kvSubnets, self).setUp()
+ def test_port_prevents_network_deletion(self):
+ self.skipTest("plugin does not return standard conflict code")
+
def test_create_subnet_with_invalid_parameters(self):
"""Test subnet creation with invalid parameters sent to the VSM"""
with self.network() as network:
yield res
if do_delete:
self._delete('ports', port['port']['id'])
+ self._delete('subnets', subnet['subnet']['id'])
+ self._delete('networks', network['network']['id'])
def test_create_ports_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
'device_id': device_id,
'device_owner': DEVICE_OWNER}
with self.port(subnet=subnet, fmt=self.fmt,
- arg_list=arg_list, **port_dict):
+ arg_list=arg_list, **port_dict) as port:
self.assertTrue(self._is_vlan_configured(
vlan_creation_expected=vlan_creation_expected,
add_keyword_expected=False))
self.mock_ncclient.reset_mock()
yield
+ self._delete('ports', port['port']['id'])
# Create network and subnet
with self.network(name=NETWORK_NAME) as network:
if res.status_int >= 400:
raise webob.exc.HTTPClientError(
code=res.status_int, detail=vpnservice)
+ self._delete('subnets', public_sub['subnet']['id'])
+ if not subnet:
+ self._delete('subnets', tmp_subnet['subnet']['id'])
def _create_ipsec_site_connection(self, fmt, name='test',
peer_address='192.168.1.10',
dpd)
except webob.exc.HTTPClientError as ce:
self.assertEqual(ce.code, expected_status_int)
+ self._delete('subnets', subnet['subnet']['id'])
def test_create_ipsec_site_connection(self, **extras):
"""Test case to create an ipsec_site_connection."""
self.assertEqual(v, sorted(actual[k]))
else:
self.assertEqual(v, actual[k])
+ self._delete('networks', subnet['subnet']['network_id'])
def test_show_ipsec_site_connection(self):
"""Test case to show a ipsec_site_connection."""
yield req.get_response(self.api)
if expected_failure:
self._create_deviceowner_mock()
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', network['network']['id'])
def _assertExpectedHTTP(self, status, exc):
"""Confirm that an HTTP status corresponds to an expected exception.
add_keyword_expected=False))
self.mock_ncclient.reset_mock()
yield
+ self._delete('ports', port['port']['id'])
# Create network and subnet
with self.network(name=NETWORK_NAME) as network:
self.callbacks.update_device_up(self.adminContext,
agent_id=HOST,
device=device1)
-
+ self._delete('ports', port2['port']['id'])
p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
expected = {'args':
{'fdb_entries':
self.callbacks.update_device_up(self.adminContext,
agent_id=HOST,
device=device)
-
+ self._delete('ports', port['port']['id'])
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
expected = {'args':
{'fdb_entries':
**profile_arg) as port:
self._check_default_port_binding_profole(
port, expected_vif_type=vif_type)
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', port['port']['network_id'])
def test_create_port_binding_profile_with_empty_dict(self):
profile_arg = {portbindings.PROFILE: {}}
self.rpcapi_update_ports(removed=[port_id])
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertIsNone(self._get_portinfo(port_id))
+ self._delete('ports', port['port']['id'])
# The port and portinfo is expected to delete when exiting with-clause.
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
self.assertEqual(ndb.get_portinfo(self.context.session,
port_id).port_no, 456)
+ self._delete('ports', port['port']['id'])
if not portinfo_change_first:
# The port is expected to delete when exiting with-clause.
with self.network() as network:
net = network['network']
self.assertEqual(network['network']['status'], 'ACTIVE')
+ self._delete('networks', network['network']['id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
with self.network(admin_state_up=False) as network:
net = network['network']
self.assertEqual(network['network']['status'], 'DOWN')
+ self._delete('networks', network['network']['id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
with self.network() as net2:
nets.append(net2['network'])
self.assertEqual(net2['network']['status'], 'ACTIVE')
-
+ self._delete('networks', net2['network']['id'])
+ self._delete('networks', net1['network']['id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
# tearDown(). When OFCManager has failed to create a network on OFC,
# it does not keeps ofc_network entry and will fail to delete this
# network from OFC. Deletion of network is not the scope of this test.
- with self.network(do_delete=False) as network:
+ with self.network() as network:
net = network['network']
self.assertEqual(net['status'], 'ERROR')
net_ref = self._show('networks', net['id'])
net_ref = self._show('networks', net['id'])
self.assertEqual(net_ref['network']['status'], 'ACTIVE')
+ self._delete('networks', network['network']['id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
p1_ref = self._show('ports', p1['id'])
self.assertEqual(p1_ref['port']['status'], 'DOWN')
-
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', port['port']['network_id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
mock.call.create_ofc_tenant(ctx, self._tenant_id),
p1_ref = self._show('ports', p1['id'])
self.assertEqual(p1_ref['port']['status'], 'ACTIVE')
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', port['port']['network_id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
self.rpcapi_update_ports(added=[portinfo])
# In a case of dhcp port, the port is deleted automatically
# when delete_network.
+ self._delete('networks', network['network']['id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
net_id = net['network']['id']
net_ref = self._show('networks', net_id)
self.assertEqual(net_ref['network']['status'], 'ERROR')
-
+ self._delete('networks', net['network']['id'])
ctx = mock.ANY
tenant_id = self._tenant_id
net_name = mock.ANY
self.assertEqual(net_ref['network']['status'], 'ERROR')
self.ofc.set_raise_exc('delete_ofc_network', None)
+ self._delete('networks', net['network']['id'])
ctx = mock.ANY
tenant = mock.ANY
self.ofc.set_raise_exc('delete_ofc_port',
nexc.OFCException(reason='hoge'))
- with self.network(do_delete=False) as net:
+ with self.network() as net:
net_id = net['network']['id']
device_owner = db_base_plugin_v2.AUTO_DELETE_PORT_OWNERS[0]
{'admin_state_up': False})
self.assertEqual(res['status'], 'DOWN')
self.assertEqual(self.ofc.delete_ofc_port.call_count, 1)
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', port['port']['network_id'])
expected = [
mock.call.exists_ofc_tenant(ctx, self._tenant_id),
self.assertEqual(res['port']['status'], 'ACTIVE')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ACTIVE')
-
+ self._delete('ports', port['port']['id'])
ctx = mock.ANY
port = mock.ANY
expected = [
self.assertEqual(res['port']['status'], 'DOWN')
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'DOWN')
+ self._delete('ports', port['port']['id'])
ctx = mock.ANY
port = mock.ANY
self.rpcapi_update_ports(added=[portinfo])
port_ref = self._show('ports', port_id)
self.assertEqual(port_ref['port']['status'], 'ERROR')
+ self._delete('ports', port['port']['id'])
ctx = mock.ANY
port = mock.ANY
self.assertEqual(port_ref['port']['status'], 'ERROR')
self.ofc.set_raise_exc('delete_ofc_port', None)
+ self._delete('ports', port['port']['id'])
ctx = mock.ANY
port = mock.ANY
def _test_delete_port_for_disappeared_ofc_port(self, raised_exc):
self.ofc.set_raise_exc('delete_ofc_port', raised_exc)
- with self.port(do_delete=False) as port:
+ with self.port() as port:
port_id = port['port']['id']
portinfo = {'id': port_id, 'port_no': 123}
yield pf
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
+ if not network:
+ self._delete('networks', network_to_use['network']['id'])
@contextlib.contextmanager
def packet_filter_on_port(self, port=None, fmt=None, do_delete=True,
expected_code=webob.exc.HTTPNotFound.code)
def test_auto_delete_pf_in_port_deletion(self):
- with self.port(do_delete=False) as port:
+ with self.port() as port:
network = self._show('networks', port['port']['network_id'])
with self.packet_filter_on_network(network=network) as pfn:
self.assertEqual(
fip2['floatingip']['port_id'],
body['floatingip']['port_id'])
+ self._delete('ports', p['port']['id'])
# Test that port has been successfully deleted.
body = self._show('ports', p['port']['id'],
def test_router_update_with_dup_destination_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
with self.port() as port1:
dhcp_agents = self._list_dhcp_agents_hosting_network(
port1['port']['network_id'])
+ self._delete('ports', port1['port']['id'])
+ self._delete('networks', port1['port']['network_id'])
self.assertEqual(1, len(dhcp_agents['agents']))
agents = self._list_agents()
self._disable_agent(agents['agents'][0]['id'])
with self.port() as port2:
dhcp_agents = self._list_dhcp_agents_hosting_network(
port2['port']['network_id'])
+ self._delete('ports', port2['port']['id'])
self.assertEqual(0, len(dhcp_agents['agents']))
def test_network_scheduler_with_down_agent(self):
with self.port() as port:
dhcp_agents = self._list_dhcp_agents_hosting_network(
port['port']['network_id'])
+ self._delete('ports', port['port']['id'])
+ self._delete('networks', port['port']['network_id'])
self.assertEqual(1, len(dhcp_agents['agents']))
with mock.patch(is_agent_down_str) as mock_is_agent_down:
mock_is_agent_down.return_value = True
with self.port() as port:
dhcp_agents = self._list_dhcp_agents_hosting_network(
port['port']['network_id'])
+ self._delete('ports', port['port']['id'])
self.assertEqual(0, len(dhcp_agents['agents']))
def test_network_scheduler_with_hosted_network(self):
mock_hosting_agents.return_value = plugin.get_agents_db(
self.adminContext)
- with self.network('test', do_delete=False) as net1:
+ with self.network('test') as net1:
pass
with self.subnet(network=net1,
- cidr='10.0.1.0/24',
- do_delete=False) as subnet1:
+ cidr='10.0.1.0/24') as subnet1:
pass
- with self.port(subnet=subnet1, do_delete=False) as port2:
+ with self.port(subnet=subnet1) as port2:
pass
dhcp_agents = self._list_dhcp_agents_hosting_network(
port2['port']['network_id'])
self._assert_notify(notifications, expected_event_type)
def test_network_remove_from_dhcp_agent_notification(self):
- with self.network(do_delete=False) as net1:
+ with self.network() as net1:
network_id = net1['network']['id']
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
'configurations': {'dhcp_driver': 'dhcp_driver',
'use_namespaces': True, },
'agent_type': constants.AGENT_TYPE_DHCP})
- with self.network(do_delete=False) as net1:
+ with self.network() as net1:
with self.subnet(network=net1,
- gateway_ip=gateway,
- do_delete=False) as subnet1:
+ gateway_ip=gateway) as subnet1:
if owner:
with self.port(subnet=subnet1,
- do_delete=False,
device_owner=owner) as port:
return [net1, subnet1, port]
else:
- with self.port(subnet=subnet1,
- do_delete=False) as port:
+ with self.port(subnet=subnet1) as port:
return [net1, subnet1, port]
def _notification_mocks(self, hosts, net, subnet, port):
self.skip("App cookie persistence not supported.")
def test_pool_port(self):
- with self.port(do_delete=False) as port:
+ with self.port() as port:
with self.pool() as pool:
h_db.add_pool_port(context.get_admin_context(),
pool['pool']['id'], port['port']['id'])
def test_create_vip_failure(self):
"""Test the rest call failure handling by Exception raising."""
- with self.network(do_delete=False) as network:
- with self.subnet(network=network, do_delete=False) as subnet:
+ with self.network() as network:
+ with self.subnet(network=network) as subnet:
with self.pool(do_delete=False,
provider='radware',
subnet_id=subnet['subnet']['id']) as pool:
def test_delete_vip_failure(self):
plugin = self.plugin_instance
- with self.network(do_delete=False) as network:
- with self.subnet(network=network, do_delete=False) as subnet:
+ with self.network() as network:
+ with self.subnet(network=network) as subnet:
with self.pool(do_delete=False,
provider='radware',
subnet_id=subnet['subnet']['id']) as pool:
def network(self, name='net1',
admin_state_up=True,
fmt=None,
- do_delete=True,
**kwargs):
network = self._make_network(fmt or self.fmt, name,
admin_state_up, **kwargs)
yield network
- if do_delete:
- self._delete('networks', network['network']['id'])
@contextlib.contextmanager
def subnet(self, network=None,
dns_nameservers=None,
host_routes=None,
shared=None,
- do_delete=True,
ipv6_ra_mode=None,
ipv6_address_mode=None):
with optional_ctx(network, self.network) as network_to_use:
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
yield subnet
- if do_delete:
- self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
- def port(self, subnet=None, fmt=None, do_delete=True,
- **kwargs):
+ def port(self, subnet=None, fmt=None, **kwargs):
with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
yield port
- if do_delete:
- self._delete('ports', port['port']['id'])
def _test_list_with_sort(self, resource,
items, sorts, resources=None, query_params=''):
self.assertEqual('myname', port['port']['name'])
def test_create_port_as_admin(self):
- with self.network(do_delete=False) as network:
+ with self.network() as network:
self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
self.assertEqual(port['port']['id'], sport['port']['id'])
def test_delete_port(self):
- with self.port(do_delete=False) as port:
+ with self.port() as port:
self._delete('ports', port['port']['id'])
self._show('ports', port['port']['id'],
expected_code=webob.exc.HTTPNotFound.code)
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
- self.port(subnet=subnet, device_id='owner1', do_delete=False),
- self.port(subnet=subnet, device_id='owner1', do_delete=False),
+ self.port(subnet=subnet, device_id='owner1'),
+ self.port(subnet=subnet, device_id='owner1'),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
network_id = subnet['subnet']['network_id']
ctx = context.get_admin_context()
with self.subnet() as subnet:
with contextlib.nested(
- self.port(subnet=subnet, device_id='owner1', do_delete=False),
+ self.port(subnet=subnet, device_id='owner1'),
self.port(subnet=subnet, device_id='owner1'),
self.port(subnet=subnet, device_id='owner2'),
) as (p1, p2, p3):
sorted(expected[k]))
else:
self.assertEqual(subnet['subnet'][k], expected[k])
- return subnet
+ self._delete('subnets', subnet['subnet']['id'])
+ return subnet
def test_create_subnet(self):
gateway_ip = '10.0.0.1'
res = self._create_subnet_bulk(self.fmt, 2,
net['network']['id'],
'test')
+ self._delete('networks', net['network']['id'])
# We expect a 500 as we injected a fault in the plugin
self._validate_behavior_on_bulk_failure(
res, 'subnets', webob.exc.HTTPServerError.code
with self.network() as network:
with contextlib.nested(
self.subnet(network=network),
- self.subnet(network=network, cidr='10.0.1.0/24',
- do_delete=False)) as (subnet1, subnet2):
+ self.subnet(network=network, cidr='10.0.1.0/24'),
+ ) as (subnet1, subnet2):
subnet1_id = subnet1['subnet']['id']
subnet2_id = subnet2['subnet']['id']
with self.port(
set_context=True)
def test_create_subnet_as_admin(self):
- with self.network(do_delete=False) as network:
+ with self.network() as network:
self._create_subnet(self.fmt,
network['network']['id'],
'10.0.2.0/24',
'max_subnet_host_routes',
n_exc.HostRoutesExhausted)
+ def test_port_prevents_network_deletion(self):
+ with self.port() as p:
+ self._delete('networks', p['port']['network_id'],
+ expected_code=webob.exc.HTTPConflict.code)
+
+ def test_port_prevents_subnet_deletion(self):
+ with self.port() as p:
+ self._delete('subnets', p['port']['fixed_ips'][0]['subnet_id'],
+ expected_code=webob.exc.HTTPConflict.code)
+
class DbModelTestCase(base.BaseTestCase):
"""DB model tests."""
'get_service_plugins') as srv_plugins:
l3_mock = mock.Mock()
srv_plugins.return_value = {'L3_ROUTER_NAT': l3_mock}
- with self.network(do_delete=False) as net:
+ with self.network() as net:
req = self.new_delete_request('networks', net['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._routes_update_prepare(r['router']['id'],
None, p['port']['id'], routes)
body = self._update('routers', r['router']['id'],
'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
self.router(),
self.subnet(cidr='10.0.0.0/24')) as (r1, r2, s):
with contextlib.nested(
- self.port(subnet=s, do_delete=False),
- self.port(subnet=s, do_delete=False)) as (p1, p2):
+ self.port(subnet=s),
+ self.port(subnet=s)) as (p1, p2):
body = self._routes_update_prepare(r1['router']['id'],
None, p1['port']['id'],
routes1)
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes_orig)
def _test_malformed_route(self, routes):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_nexthop_is_port_ip(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_too_many_routes(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_dup_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_invalid_ip_address(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_invalid_nexthop_ip(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_update_with_nexthop_is_outside_port_subnet(self):
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
- with self.port(subnet=s, do_delete=False) as p:
+ with self.port(subnet=s) as p:
self._router_interface_action('add',
r['router']['id'],
None,
self.assertEqual(res.status_int, 403)
def test_update_port_security_off_shared_network(self):
- with self.network(shared=True, do_delete=False) as net:
- with self.subnet(network=net, do_delete=False):
+ with self.network(shared=True) as net:
+ with self.subnet(network=net):
res = self._create_port('json', net['network']['id'],
tenant_id='not_network_owner',
set_context=True)
def test_router_add_interface_port(self):
with self.router() as r:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
'roles': []}
tdict.return_value = admin_context
with self.router() as r:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
tdict.return_value = tenant_context
err_code = exc.HTTPNotFound.code
self._router_interface_action('add',
def test_router_add_interface_dup_subnet2_returns_400(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(subnet=s, do_delete=False) as p1:
+ with self.port(subnet=s) as p1:
with self.port(subnet=s) as p2:
self._router_interface_action('add',
r['router']['id'],
def test_router_remove_interface_wrong_subnet_returns_400(self):
with self.router() as r:
with self.subnet() as s:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_returns_200(self):
with self.router() as r:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
body = self._router_interface_action('add',
r['router']['id'],
None,
def test_router_remove_interface_wrong_port_returns_404(self):
with self.router() as r:
with self.subnet():
- with self.port(do_delete=False) as p:
+ with self.port() as p:
self._router_interface_action('add',
r['router']['id'],
None,
self.assertEqual(
body['floatingip']['port_id'],
fip2['floatingip']['port_id'])
-
+ self._delete('ports', p['port']['id'])
# Test that port has been successfully deleted.
body = self._show('ports', p['port']['id'],
expected_code=exc.HTTPNotFound.code)
# note: once this port goes out of scope, the port will be
# deleted, which is what we want to test. We want to confirm
# that the fields are set back to None
+ self._delete('ports', p['port']['id'])
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(body['floatingip']['id'],
fip['floatingip']['id'])
with self.network() as net:
net_id = net['network']['id']
self._set_net_external(net_id)
- with self.subnet(network=net, do_delete=False):
+ with self.subnet(network=net):
self._make_floatingip(self.fmt, net_id)
def test_l3_agent_routers_query_interfaces(self):
with self.router() as r:
- with self.port(do_delete=False) as p:
+ with self.port() as p:
self._router_interface_action('add',
r['router']['id'],
None,
with self.router() as r:
with self.subnet(cidr='9.0.1.0/24') as subnet:
with self.port(subnet=subnet,
- do_delete=False,
fixed_ips=[{'ip_address': '9.0.1.3'}]) as p:
self._router_interface_action('add',
r['router']['id'],
self._test_notify_op_agent(self._test_router_gateway_op_agent)
def _test_interfaces_op_agent(self, r, notifyApi):
- with self.port(do_delete=False) as p:
+ with self.port() as p:
self._router_interface_action('add',
r['router']['id'],
None,
subnets = self._list('subnets')['subnets']
with self.subnet() as s:
with self.port(subnet=s, device_id='1234',
- device_owner=constants.DEVICE_OWNER_DHCP):
+ device_owner=constants.DEVICE_OWNER_DHCP) as port:
subnets = self._list('subnets')['subnets']
self.assertEqual(len(subnets), 1)
self.assertEqual(subnets[0]['host_routes'][0]['nexthop'],
'10.0.0.2')
self.assertEqual(subnets[0]['host_routes'][0]['destination'],
'169.254.169.254/32')
-
+ self._delete('ports', port['port']['id'])
subnets = self._list('subnets')['subnets']
# Test that route is deleted after dhcp port is removed
self.assertEqual(len(subnets[0]['host_routes']), 0)