]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Added policy checks for add interface and remove interface
authorNachi Ueno <nachi@nttmcl.com>
Wed, 5 Sep 2012 20:29:27 +0000 (20:29 +0000)
committerNachi Ueno <nachi@nttmcl.com>
Sun, 9 Sep 2012 16:50:55 +0000 (16:50 +0000)
Fixes bug 1042037
admin_only policy didn't works, so policy checks
for add interface and remove interface needed.
Also updated policy.json

Change-Id: Ifec281250ccbe1680a3e634f4efdb7ba7ef3ec94

etc/policy.json
quantum/db/l3_db.py
quantum/tests/unit/test_l3_plugin.py

index 6cbb374febf7c30703bcfd1c0d48aa6967b029fb..c975189796c17bde34745ef9ec1eb5efd9d40279 100644 (file)
@@ -12,6 +12,8 @@
 
     "extension:router:view": [["rule:regular_user"]],
     "extension:router:set": [["rule:admin_only"]],
+    "extension:router:add_router_interface": [["rule:admin_only"]],
+    "extension:router:remove_router_interface": [["rule:admin_or_owner"]],
 
     "subnets:private:read": [["rule:admin_or_owner"]],
     "subnets:private:write": [["rule:admin_or_owner"]],
index f65fca81bf1962bbf037dd44a51a365fcbab7a63..980fb0e5d67e11189971abd982380147e229b238 100644 (file)
@@ -274,12 +274,19 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
             pass
 
     def add_router_interface(self, context, router_id, interface_info):
-        # make sure router exists - will raise if not
-        self._get_router(context, router_id)
+        # make sure router exists
+        router = self._get_router(context, router_id)
         if not interface_info:
             msg = "Either subnet_id or port_id must be specified"
             raise q_exc.BadRequest(resource='router', msg=msg)
 
+        try:
+            policy.enforce(context,
+                           "extension:router:add_router_interface",
+                           self._make_router_dict(router))
+        except q_exc.PolicyNotAuthorized:
+            raise l3.RouterNotFound(router_id=router_id)
+
         if 'port_id' in interface_info:
             if 'subnet_id' in interface_info:
                 msg = "cannot specify both subnet-id and port-id"
@@ -327,6 +334,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
     def remove_router_interface(self, context, router_id, interface_info):
         # make sure router exists
         router = self._get_router(context, router_id)
+        try:
+            policy.enforce(context,
+                           "extension:router:remove_router_interface",
+                           self._make_router_dict(router))
+        except q_exc.PolicyNotAuthorized:
+            raise l3.RouterNotFound(router_id=router_id)
 
         if not interface_info:
             msg = "Either subnet_id or port_id must be specified"
index b96dffe03f19c8cda00420d9c5bd271659906586..fae16af41223379e6db656d0f6669e73c20734f4 100644 (file)
@@ -329,8 +329,9 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
         return self.deserialize('json', res)
 
     @contextlib.contextmanager
-    def router(self, name='router1', admin_status_up=True, fmt='json'):
-        res = self._create_router(fmt, _uuid(), name=name,
+    def router(self, name='router1', admin_status_up=True,
+               fmt='json', tenant_id=_uuid()):
+        res = self._create_router(fmt, tenant_id, name=name,
                                   admin_state_up=admin_status_up)
         router = self.deserialize(fmt, res)
         yield router
@@ -387,6 +388,41 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
                 body = self._show('ports', r_port_id,
                                   expected_code=exc.HTTPNotFound.code)
 
+    def test_router_add_interface_subnet_with_bad_tenant(self):
+        with mock.patch('quantum.context.Context.to_dict') as tdict:
+            tenant_id = _uuid()
+            admin_context = {'roles': ['admin']}
+            tenant_context = {'tenant_id': 'bad_tenant',
+                              'roles': []}
+            tdict.return_value = admin_context
+            with self.router(tenant_id=tenant_id) as r:
+                with self.network(tenant_id=tenant_id) as n:
+                    with self.subnet(network=n) as s:
+                        tdict.return_value = tenant_context
+                        err_code = exc.HTTPNotFound.code
+                        self._router_interface_action('add',
+                                                      r['router']['id'],
+                                                      s['subnet']['id'],
+                                                      None,
+                                                      err_code)
+                        tdict.return_value = admin_context
+                        body = self._router_interface_action('add',
+                                                             r['router']['id'],
+                                                             s['subnet']['id'],
+                                                             None)
+                        self.assertTrue('port_id' in body)
+                        tdict.return_value = tenant_context
+                        self._router_interface_action('remove',
+                                                      r['router']['id'],
+                                                      s['subnet']['id'],
+                                                      None,
+                                                      err_code)
+                        tdict.return_value = admin_context
+                        body = self._router_interface_action('remove',
+                                                             r['router']['id'],
+                                                             s['subnet']['id'],
+                                                             None)
+
     def test_router_add_interface_port(self):
         with self.router() as r:
             with self.port(no_delete=True) as p:
@@ -407,6 +443,42 @@ class L3NatDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
                                               None,
                                               p['port']['id'])
 
+    def test_router_add_interface_port_bad_tenant(self):
+        with mock.patch('quantum.context.Context.to_dict') as tdict:
+            tenant_id = _uuid()
+            admin_context = {'roles': ['admin']}
+            tenant_context = {'tenant_id': 'bad_tenant',
+                              'roles': []}
+            tdict.return_value = admin_context
+            with self.router() as r:
+                with self.port(no_delete=True) as p:
+                    tdict.return_value = tenant_context
+                    err_code = exc.HTTPNotFound.code
+                    body = self._router_interface_action('add',
+                                                         r['router']['id'],
+                                                         None,
+                                                         p['port']['id'],
+                                                         err_code)
+                    tdict.return_value = admin_context
+                    body = self._router_interface_action('add',
+                                                         r['router']['id'],
+                                                         None,
+                                                         p['port']['id'])
+
+                    tdict.return_value = tenant_context
+                    # clean-up
+                    self._router_interface_action('remove',
+                                                  r['router']['id'],
+                                                  None,
+                                                  p['port']['id'],
+                                                  err_code)
+
+                    tdict.return_value = admin_context
+                    self._router_interface_action('remove',
+                                                  r['router']['id'],
+                                                  None,
+                                                  p['port']['id'])
+
     def test_router_add_interface_dup_subnet1(self):
         with self.router() as r:
             with self.subnet() as s: