network_id)
for subnet in subnets:
self._check_for_dup_router_subnet(context, router_id,
- network_id, subnet['id'])
+ network_id, subnet['id'],
+ subnet['cidr'])
# Port has no 'tenant-id', as it is hidden from user
gw_port = self.create_port(context.elevated(), {
filters=filters)
def _check_for_dup_router_subnet(self, context, router_id,
- network_id, subnet_id):
+ network_id, subnet_id, subnet_cidr):
try:
rport_qry = context.session.query(models_v2.Port)
rports = rport_qry.filter_by(
device_id=router_id).all()
# its possible these ports on on the same network, but
# different subnet
- new_cidr = self._get_subnet(context, subnet_id)['cidr']
- new_ipnet = netaddr.IPNetwork(new_cidr)
+ new_ipnet = netaddr.IPNetwork(subnet_cidr)
for p in rports:
for ip in p['fixed_ips']:
if ip['subnet_id'] == subnet_id:
msg = ("Router already has a port on subnet %s"
% subnet_id)
raise q_exc.BadRequest(resource='router', msg=msg)
+ sub_id = ip['subnet_id']
cidr = self._get_subnet(context.elevated(),
- ip['subnet_id'])['cidr']
+ sub_id)['cidr']
ipnet = netaddr.IPNetwork(cidr)
match1 = netaddr.all_matching_cidrs(new_ipnet, [cidr])
- match2 = netaddr.all_matching_cidrs(ipnet, [new_cidr])
+ match2 = netaddr.all_matching_cidrs(ipnet, [subnet_cidr])
if match1 or match2:
- msg = (("Cidr %s of subnet %s is overlapped "
- + "with cidr %s of subnet %s")
- % (new_cidr, subnet_id, cidr, ip['subnet_id']))
+ msg = (_("Cidr %(subnet_cidr)s of subnet "
+ "%(subnet_id)s overlaps with cidr %(cidr)s "
+ " of subnet %(sub_id)") % locals())
raise q_exc.BadRequest(resource='router', msg=msg)
except exc.NoResultFound:
pass
if len(fixed_ips) != 1:
msg = 'Router port must have exactly one fixed IP'
raise q_exc.BadRequest(resource='router', msg=msg)
+ subnet = self._get_subnet(context, fixed_ips[0]['subnet_id'])
self._check_for_dup_router_subnet(context, router_id,
port['network_id'],
- fixed_ips[0]['subnet_id'])
+ subnet['id'],
+ subnet['cidr'])
port.update({'device_id': router_id,
'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info:
msg = 'Subnet for router interface must have a gateway IP'
raise q_exc.BadRequest(resource='router', msg=msg)
self._check_for_dup_router_subnet(context, router_id,
- subnet['network_id'], subnet_id)
+ subnet['network_id'],
+ subnet_id,
+ subnet['cidr'])
fixed_ip = {'ip_address': subnet['gateway_ip'],
'subnet_id': subnet['id']}
port = self.create_port(context, {
return self.ext_api
def _delete(self, collection, id,
- expected_code=webob.exc.HTTPNoContent.code):
+ expected_code=webob.exc.HTTPNoContent.code,
+ quantum_context=None):
req = self.new_delete_request(collection, id)
+ if quantum_context:
+ # create a specific auth context for this request
+ req.environ['quantum.context'] = quantum_context
res = req.get_response(self._api_for_resource(collection))
self.assertEqual(res.status_int, expected_code)
- def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code):
+ def _show(self, resource, id,
+ expected_code=webob.exc.HTTPOk.code,
+ quantum_context=None):
req = self.new_show_request(resource, id)
+ if quantum_context:
+ # create a specific auth context for this request
+ req.environ['quantum.context'] = quantum_context
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize('json', res)
def _update(self, resource, id, new_data,
- expected_code=webob.exc.HTTPOk.code):
+ expected_code=webob.exc.HTTPOk.code,
+ quantum_context=None):
req = self.new_update_request(resource, new_data, id)
+ if quantum_context:
+ # create a specific auth context for this request
+ req.environ['quantum.context'] = quantum_context
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(res.status_int, expected_code)
return self.deserialize('json', res)
test_config['extension_manager'] = ext_mgr
super(L3NatDBTestCase, self).setUp()
- def _create_router(self, fmt, tenant_id, name=None, admin_state_up=None):
+ def _create_router(self, fmt, tenant_id, name=None,
+ admin_state_up=None, set_context=False):
data = {'router': {'tenant_id': tenant_id}}
if name:
data['router']['name'] = name
if admin_state_up:
data['router']['admin_state_up'] = admin_state_up
router_req = self.new_create_request('routers', data, fmt)
+ if set_context and tenant_id:
+ # create a specific auth context for this request
+ router_req.environ['quantum.context'] = context.Context(
+ '', tenant_id)
+
return router_req.get_response(self.ext_api)
- def _make_router(self, fmt, tenant_id, name=None, admin_state_up=None):
- res = self._create_router(fmt, tenant_id, name, admin_state_up)
+ def _make_router(self, fmt, tenant_id, name=None,
+ admin_state_up=None, set_context=False):
+ res = self._create_router(fmt, tenant_id, name,
+ admin_state_up, set_context)
return self.deserialize(fmt, res)
def _add_external_gateway_to_router(self, router_id, network_id,
- expected_code=exc.HTTPOk.code):
+ expected_code=exc.HTTPOk.code,
+ quantum_context=None):
return self._update('routers', router_id,
{'router': {'external_gateway_info':
{'network_id': network_id}}},
- expected_code=expected_code)
+ expected_code=expected_code,
+ quantum_context=quantum_context)
def _remove_external_gateway_from_router(self, router_id, network_id,
expected_code=exc.HTTPOk.code):
@contextlib.contextmanager
def router(self, name='router1', admin_status_up=True,
- fmt='json', tenant_id=_uuid()):
- router = self._make_router(fmt, tenant_id, name, admin_status_up)
+ fmt='json', tenant_id=_uuid(), set_context=False):
+ router = self._make_router(fmt, tenant_id, name,
+ admin_status_up, set_context)
try:
yield router
finally:
gw_info = body['router']['external_gateway_info']
self.assertEqual(gw_info, None)
+ def test_router_add_gateway_tenant_ctx(self):
+ with self.router(tenant_id='noadmin',
+ set_context=True) as r:
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ ctx = context.Context('', 'noadmin')
+ self._add_external_gateway_to_router(
+ r['router']['id'],
+ s['subnet']['network_id'],
+ quantum_context=ctx)
+ body = self._show('routers', r['router']['id'])
+ net_id = body['router']['external_gateway_info']['network_id']
+ self.assertEqual(net_id, s['subnet']['network_id'])
+ self._remove_external_gateway_from_router(
+ r['router']['id'],
+ s['subnet']['network_id'])
+ body = self._show('routers', r['router']['id'])
+ gw_info = body['router']['external_gateway_info']
+ self.assertEqual(gw_info, None)
+
def test_router_add_gateway_invalid_network_returns_404(self):
with self.router() as r:
self._add_external_gateway_to_router(
{'network': {l3.EXTERNAL: True}})
def _create_floatingip(self, fmt, network_id, port_id=None,
- fixed_ip=None):
+ fixed_ip=None, set_context=False):
data = {'floatingip': {'floating_network_id': network_id,
'tenant_id': self._tenant_id}}
if port_id:
if fixed_ip:
data['floatingip']['fixed_ip_address'] = fixed_ip
floatingip_req = self.new_create_request('floatingips', data, fmt)
+ if set_context and self._tenant_id:
+ # create a specific auth context for this request
+ floatingip_req.environ['quantum.context'] = context.Context(
+ '', self._tenant_id)
return floatingip_req.get_response(self.ext_api)
def _make_floatingip(self, fmt, network_id, port_id=None,
- fixed_ip=None):
- res = self._create_floatingip(fmt, network_id, port_id, fixed_ip)
+ fixed_ip=None, set_context=False):
+ res = self._create_floatingip(fmt, network_id, port_id,
+ fixed_ip, set_context)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
return self.deserialize(fmt, res)
fip['floatingip']['id'])
@contextlib.contextmanager
- def floatingip_with_assoc(self, port_id=None, fmt='json'):
+ def floatingip_with_assoc(self, port_id=None, fmt='json',
+ set_context=False):
with self.subnet(cidr='11.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port:
floatingip = self._make_floatingip(
fmt,
public_sub['subnet']['network_id'],
- port_id=private_port['port']['id'])
+ port_id=private_port['port']['id'],
+ set_context=False)
yield floatingip
finally:
if floatingip:
public_sub['subnet']['network_id'])
@contextlib.contextmanager
- def floatingip_no_assoc(self, private_sub, fmt='json'):
+ def floatingip_no_assoc(self, private_sub, fmt='json', set_context=False):
with self.subnet(cidr='12.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.router() as r:
floatingip = self._make_floatingip(
fmt,
- public_sub['subnet']['network_id'])
+ public_sub['subnet']['network_id'],
+ set_context=set_context)
yield floatingip
finally:
if floatingip: