pass
def add_router_interface(self, context, router_id, interface_info):
- # make sure router exists - will raise if not
- self._get_router(context, router_id)
+ # make sure router exists
+ router = self._get_router(context, router_id)
if not interface_info:
msg = "Either subnet_id or port_id must be specified"
raise q_exc.BadRequest(resource='router', msg=msg)
+ try:
+ policy.enforce(context,
+ "extension:router:add_router_interface",
+ self._make_router_dict(router))
+ except q_exc.PolicyNotAuthorized:
+ raise l3.RouterNotFound(router_id=router_id)
+
if 'port_id' in interface_info:
if 'subnet_id' in interface_info:
msg = "cannot specify both subnet-id and port-id"
def remove_router_interface(self, context, router_id, interface_info):
# make sure router exists
router = self._get_router(context, router_id)
+ try:
+ policy.enforce(context,
+ "extension:router:remove_router_interface",
+ self._make_router_dict(router))
+ except q_exc.PolicyNotAuthorized:
+ raise l3.RouterNotFound(router_id=router_id)
if not interface_info:
msg = "Either subnet_id or port_id must be specified"
return self.deserialize('json', res)
@contextlib.contextmanager
- def router(self, name='router1', admin_status_up=True, fmt='json'):
- res = self._create_router(fmt, _uuid(), name=name,
+ def router(self, name='router1', admin_status_up=True,
+ fmt='json', tenant_id=_uuid()):
+ res = self._create_router(fmt, tenant_id, name=name,
admin_state_up=admin_status_up)
router = self.deserialize(fmt, res)
yield router
body = self._show('ports', r_port_id,
expected_code=exc.HTTPNotFound.code)
+ def test_router_add_interface_subnet_with_bad_tenant(self):
+ with mock.patch('quantum.context.Context.to_dict') as tdict:
+ tenant_id = _uuid()
+ admin_context = {'roles': ['admin']}
+ tenant_context = {'tenant_id': 'bad_tenant',
+ 'roles': []}
+ tdict.return_value = admin_context
+ with self.router(tenant_id=tenant_id) as r:
+ with self.network(tenant_id=tenant_id) as n:
+ with self.subnet(network=n) as s:
+ tdict.return_value = tenant_context
+ err_code = exc.HTTPNotFound.code
+ self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None,
+ err_code)
+ tdict.return_value = admin_context
+ body = self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
+ self.assertTrue('port_id' in body)
+ tdict.return_value = tenant_context
+ self._router_interface_action('remove',
+ r['router']['id'],
+ s['subnet']['id'],
+ None,
+ err_code)
+ tdict.return_value = admin_context
+ body = self._router_interface_action('remove',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
+
def test_router_add_interface_port(self):
with self.router() as r:
with self.port(no_delete=True) as p:
None,
p['port']['id'])
+ def test_router_add_interface_port_bad_tenant(self):
+ with mock.patch('quantum.context.Context.to_dict') as tdict:
+ tenant_id = _uuid()
+ admin_context = {'roles': ['admin']}
+ tenant_context = {'tenant_id': 'bad_tenant',
+ 'roles': []}
+ tdict.return_value = admin_context
+ with self.router() as r:
+ with self.port(no_delete=True) as p:
+ tdict.return_value = tenant_context
+ err_code = exc.HTTPNotFound.code
+ body = self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'],
+ err_code)
+ tdict.return_value = admin_context
+ body = self._router_interface_action('add',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
+ tdict.return_value = tenant_context
+ # clean-up
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'],
+ err_code)
+
+ tdict.return_value = admin_context
+ self._router_interface_action('remove',
+ r['router']['id'],
+ None,
+ p['port']['id'])
+
def test_router_add_interface_dup_subnet1(self):
with self.router() as r:
with self.subnet() as s: