def router(self, name='router1', admin_state_up=True,
fmt=None, tenant_id=_uuid(),
external_gateway_info=None, set_context=False,
- no_delete=False,
**kwargs):
router = self._make_router(fmt or self.fmt, tenant_id, name,
admin_state_up, external_gateway_info,
set_context, **kwargs)
yield router
- if not no_delete:
- self._delete('routers', router['router']['id'])
def _set_net_external(self, net_id):
self._update('networks', net_id,
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
- self._router_interface_action(
- 'remove', r['router']['id'],
- private_sub['subnet']['id'], None)
- self._remove_external_gateway_from_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
@contextlib.contextmanager
def floatingip_no_assoc_with_public_sub(
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
- self._router_interface_action('remove', r['router']['id'],
- private_sub['subnet']['id'],
- None)
- self._remove_external_gateway_from_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
@contextlib.contextmanager
def floatingip_no_assoc(self, private_sub, fmt=None, set_context=False):
self.assertEqual(
s['subnet']['network_id'],
router['router']['external_gateway_info']['network_id'])
- self._delete('routers', router['router']['id'])
def test_router_create_with_gwinfo_ext_ip(self):
with self.subnet() as s:
external_gateway_info=ext_info
)
router = self.deserialize(self.fmt, res)
- self._delete('routers', router['router']['id'])
self.assertEqual(
[{'ip_address': '10.0.0.99', 'subnet_id': s['subnet']['id']}],
router['router']['external_gateway_info'][
ext_ips = router['router']['external_gateway_info'][
'external_fixed_ips']
- self._delete('routers', router['router']['id'])
self.assertEqual(
[{'subnet_id': s['subnet']['id'],
'ip_address': mock.ANY}], ext_ips)
ipv6_ra_mode=uc['ra_mode'],
ipv6_address_mode=uc['address_mode']) as s:
self._test_router_add_interface_subnet(r, s, uc['msg'])
- self._delete('subnets', s['subnet']['id'])
def test_router_add_iface_ipv6_ext_ra_subnet_returns_400(self):
"""Test router-interface-add for in-valid ipv6 subnets.
None,
expected_code=exp_code,
msg=uc['msg'])
- self._delete('subnets', s['subnet']['id'])
def test_router_add_interface_ipv6_subnet_without_gateway_ip(self):
with self.router() as r:
s['subnet']['id'],
None,
err_code)
- tdict.return_value = admin_context
- body = self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
def test_router_add_interface_subnet_with_port_from_other_tenant(self):
tenant_id = _uuid()
None,
tenant_id=tenant_id)
self.assertIn('port_id', body)
- self._router_interface_action(
- 'remove',
- r['router']['id'],
- s1['subnet']['id'],
- None,
- tenant_id=tenant_id)
- body = self._router_interface_action(
- 'remove',
- r['router']['id'],
- s2['subnet']['id'],
- None)
def test_router_add_interface_port(self):
with self.router() as r:
body = self._show('ports', p['port']['id'])
self.assertEqual(body['port']['device_id'], r['router']['id'])
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
-
def test_router_add_interface_empty_port_and_subnet_ids(self):
with self.router() as r:
self._router_interface_action('add', r['router']['id'],
p['port']['id'],
err_code)
- tdict.return_value = admin_context
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
-
def test_router_add_interface_dup_subnet1_returns_400(self):
with self.router() as r:
with self.subnet() as s:
None,
expected_code=exc.
HTTPBadRequest.code)
- self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
def test_router_add_interface_dup_subnet2_returns_400(self):
with self.router() as r:
p2['port']['id'],
expected_code=exc.
HTTPBadRequest.code)
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p1['port']['id'])
def test_router_add_interface_overlapped_cidr_returns_400(self):
with self.router() as r:
try_overlapped_cidr('10.0.1.0/24')
# another subnet with overlapped cidr including s1
try_overlapped_cidr('10.0.0.0/16')
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- s1['subnet']['id'],
- None)
def test_router_add_interface_no_data_returns_400(self):
with self.router() as r:
r['router']['id'],
s['subnet']['network_id'],
expected_code=exc.HTTPBadRequest.code)
- self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
def test_router_add_gateway_dup_subnet2_returns_400(self):
with self.router() as r:
None,
expected_code=exc.
HTTPBadRequest.code)
- self._remove_external_gateway_from_router(
- r['router']['id'],
- s['subnet']['network_id'])
- def test_router_add_gateway(self):
+ def test_router_add_and_remove_gateway(self):
with self.router() as r:
with self.subnet() as s:
self._set_net_external(s['subnet']['network_id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
- def test_router_add_gateway_tenant_ctx(self):
+ def test_router_add_and_remove_gateway_tenant_ctx(self):
with self.router(tenant_id='noadmin',
set_context=True) as r:
with self.subnet() as s:
self._update('ports', port['port']['id'], data,
neutron_context=neutron_context,
expected_code=exc.HTTPConflict.code)
- self._delete('ports', port['port']['id'])
def test_update_port_device_id_to_different_tenants_router(self):
with self.router() as admin_router:
self._update('ports', port['port_id'], data,
neutron_context=neutron_context,
expected_code=exc.HTTPConflict.code)
- self._router_interface_action(
- 'remove', tenant_router['router']['id'],
- s['subnet']['id'], None, tenant_id='tenant_a')
def test_router_add_gateway_invalid_network_returns_400(self):
with self.router() as r:
self._delete('routers', r['router']['id'],
expected_code=exc.HTTPConflict.code)
- # remove interface so test can exit without errors
- self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
-
def test_router_remove_interface_callback_failure_returns_409(self):
with contextlib.nested(
self.router(),
s['subnet']['id'],
None,
exc.HTTPConflict.code)
- # remove properly to clean-up
- self._router_interface_action(
- 'remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
def test_router_clear_gateway_callback_failure_returns_409(self):
with contextlib.nested(
s['subnet']['id'],
p['port']['id'],
exc.HTTPBadRequest.code)
- #remove properly to clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
def test_router_remove_interface_nothing_returns_400(self):
with self.router() as r:
None,
p2['port']['id'],
exc.HTTPNotFound.code)
- # remove correct interface to cleanup
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
- # remove extra port created
- self._delete('ports', p2['port']['id'])
def test_router_delete(self):
with self.router() as router:
None)
self._delete('routers', router['router']['id'],
exc.HTTPConflict.code)
- self._router_interface_action('remove',
- router['router']['id'],
- subnet['subnet']['id'],
- None)
- self._delete('routers', router['router']['id'])
def test_router_delete_with_floatingip_existed_returns_409(self):
with self.port() as p:
self.fmt, public_sub['subnet']['network_id'],
port_id=p['port']['id'])
self.assertEqual(res.status_int, exc.HTTPCreated.code)
- floatingip = self.deserialize(self.fmt, res)
self._delete('routers', r['router']['id'],
expected_code=exc.HTTPConflict.code)
- # Cleanup
- self._delete('floatingips', floatingip['floatingip']['id'])
- self._router_interface_action('remove', r['router']['id'],
- private_sub['subnet']['id'],
- None)
- self._delete('routers', r['router']['id'])
def test_router_show(self):
name = 'router1'
self._update('networks', s1['subnet']['network_id'],
{'network': {external_net.EXTERNAL: False}},
expected_code=exc.HTTPConflict.code)
- self._remove_external_gateway_from_router(
- r['router']['id'],
- s1['subnet']['network_id'])
def test_network_update_external(self):
with self.router() as r:
s1['subnet']['network_id'])
self._update('networks', testnet['network']['id'],
{'network': {external_net.EXTERNAL: False}})
- self._remove_external_gateway_from_router(
- r['router']['id'],
- s1['subnet']['network_id'])
def test_floatingip_crd_ops(self):
with self.floatingip_with_assoc() as fip:
if (p['device_owner'] ==
l3_constants.DEVICE_OWNER_FLOATINGIP):
self.fail('garbage port is not deleted')
- self._remove_external_gateway_from_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
- self._router_interface_action('remove',
- r['router']['id'],
- private_sub['subnet']['id'],
- None)
def test_floatingip_with_assoc_fails(self):
self._test_floatingip_with_assoc_fails(
body = self._show('ports', p['port']['id'],
expected_code=exc.HTTPNotFound.code)
- for fip in [fip1, fip2]:
- self._delete('floatingips',
- fip['floatingip']['id'])
-
- self._router_interface_action(
- 'remove', r['router']['id'],
- private_sub['subnet']['id'], None)
-
- self._remove_external_gateway_from_router(
- r['router']['id'],
- public_sub['subnet']['network_id'])
-
def test_floatingip_update_different_fixed_ip_same_port(self):
with self.subnet() as s:
ip_range = list(netaddr.IPNetwork(s['subnet']['cidr']))
port_id=p['port']['id'])
self.assertEqual(res.status_int,
exc.HTTPBadRequest.code)
- self._router_interface_action('remove',
- r['router']['id'],
- private_sub
- ['subnet']['id'],
- None)
- self._delete('routers', r['router']['id'])
def test_floatingip_with_invalid_create_port(self):
self._test_floatingip_with_invalid_create_port(
public_network['network']['id'],
port_id=private_port['port']['id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
- # cleanup
- self._router_interface_action('remove',
- r['router']['id'],
- private_sub['subnet']['id'],
- None)
def test_create_floatingip_invalid_floating_network_id_returns_400(self):
# API-level test - no need to create all objects for l3 plugin
fp1 = self._make_floatingip(self.fmt, network_id1)
fp2 = self._make_floatingip(self.fmt, network_id2)
fp3 = self._make_floatingip(self.fmt, network_id3)
- try:
- self._test_list_with_sort('floatingip', (fp3, fp2, fp1),
- [('floating_ip_address', 'desc')])
- finally:
- self._delete('floatingips', fp1['floatingip']['id'])
- self._delete('floatingips', fp2['floatingip']['id'])
- self._delete('floatingips', fp3['floatingip']['id'])
+ self._test_list_with_sort('floatingip', (fp3, fp2, fp1),
+ [('floating_ip_address', 'desc')])
def test_floatingip_list_with_port_id(self):
with self.floatingip_with_assoc() as fip:
fp1 = self._make_floatingip(self.fmt, network_id1)
fp2 = self._make_floatingip(self.fmt, network_id2)
fp3 = self._make_floatingip(self.fmt, network_id3)
- try:
- self._test_list_with_pagination(
- 'floatingip', (fp1, fp2, fp3),
- ('floating_ip_address', 'asc'), 2, 2)
- finally:
- self._delete('floatingips', fp1['floatingip']['id'])
- self._delete('floatingips', fp2['floatingip']['id'])
- self._delete('floatingips', fp3['floatingip']['id'])
+ self._test_list_with_pagination(
+ 'floatingip', (fp1, fp2, fp3),
+ ('floating_ip_address', 'asc'), 2, 2)
def test_floatingip_list_with_pagination_reverse(self):
with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
fp1 = self._make_floatingip(self.fmt, network_id1)
fp2 = self._make_floatingip(self.fmt, network_id2)
fp3 = self._make_floatingip(self.fmt, network_id3)
- try:
- self._test_list_with_pagination_reverse(
- 'floatingip', (fp1, fp2, fp3),
- ('floating_ip_address', 'asc'), 2, 2)
- finally:
- self._delete('floatingips', fp1['floatingip']['id'])
- self._delete('floatingips', fp2['floatingip']['id'])
- self._delete('floatingips', fp3['floatingip']['id'])
+ self._test_list_with_pagination_reverse(
+ 'floatingip', (fp1, fp2, fp3),
+ ('floating_ip_address', 'asc'), 2, 2)
def test_floatingip_multi_external_one_internal(self):
with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
# subnet cannot be deleted as it's attached to a router
self._delete('subnets', s['subnet']['id'],
expected_code=exc.HTTPConflict.code)
- # remove interface so test can exit without errors
- self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
def _ipv6_subnet(self, mode):
return self.subnet(cidr='fd00::1/64', gateway_ip='fd00::1',
self._set_net_external(network_id)
fp = self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.0.10')
- try:
- self.assertEqual(fp['floatingip']['floating_ip_address'],
- '10.0.0.10')
- finally:
- self._delete('floatingips', fp['floatingip']['id'])
+ self.assertEqual(fp['floatingip']['floating_ip_address'],
+ '10.0.0.10')
def test_create_floatingip_with_specific_ip_out_of_allocation(self):
with self.subnet(cidr='10.0.0.0/24',
self._set_net_external(network_id)
fp = self._make_floatingip(self.fmt, network_id,
floating_ip='10.0.0.30')
- try:
- self.assertEqual(fp['floatingip']['floating_ip_address'],
- '10.0.0.30')
- finally:
- self._delete('floatingips', fp['floatingip']['id'])
+ self.assertEqual(fp['floatingip']['floating_ip_address'],
+ '10.0.0.30')
def test_create_floatingip_with_specific_ip_non_admin(self):
ctx = context.Context('user_id', 'tenant_id')
with self.subnet(cidr='10.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
- fp1 = self._make_floatingip(self.fmt, network_id,
- floating_ip='10.0.0.10')
+ self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.0.10')
- try:
- self._make_floatingip(self.fmt, network_id,
- floating_ip='10.0.0.10',
- http_status=exc.HTTPConflict.code)
- finally:
- self._delete('floatingips', fp1['floatingip']['id'])
+ self._make_floatingip(self.fmt, network_id,
+ floating_ip='10.0.0.10',
+ http_status=exc.HTTPConflict.code)
class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
subnet_id = subnets[0]['id']
wanted_subnetid = p['port']['fixed_ips'][0]['subnet_id']
self.assertEqual(wanted_subnetid, subnet_id)
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
def test_l3_agent_routers_query_ignore_interfaces_with_moreThanOneIp(self):
with self.router() as r:
self.assertEqual(1, len(routers))
interfaces = routers[0].get(l3_constants.INTERFACE_KEY, [])
self.assertEqual(1, len(interfaces))
- # clean-up
- self._router_interface_action('remove',
- r['router']['id'],
- None,
- p['port']['id'])
def test_l3_agent_routers_query_gateway(self):
with self.router() as r:
def _test_floatingips_op_agent(self, notifyApi):
with self.floatingip_with_assoc():
pass
- # add gateway, add interface, associate, deletion of floatingip,
- # delete gateway, delete interface
- self.assertEqual(6, notifyApi.routers_updated.call_count)
+ # add gateway, add interface, associate, deletion of floatingip
+ self.assertEqual(4, notifyApi.routers_updated.call_count)
def test_floatingips_op_agent(self):
self._test_notify_op_agent(self._test_floatingips_op_agent)
s2['subnet']['network_id'])
self._assert_router_on_agent(r['router']['id'], 'host2')
- self._remove_external_gateway_from_router(
- r['router']['id'],
- s2['subnet']['network_id'])
-
def test_update_gateway_agent_exists_supporting_multiple_network(self):
with contextlib.nested(self.router(),
self.subnet(),
s2['subnet']['network_id'])
self._assert_router_on_agent(r['router']['id'], 'host2')
- self._remove_external_gateway_from_router(
- r['router']['id'],
- s2['subnet']['network_id'])
-
def test_router_update_gateway_no_eligible_l3_agent(self):
with self.router() as r:
with self.subnet() as s1: