]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Fix lack of device ownership enforcement for DVR routers
authorarmando-migliaccio <armamig@gmail.com>
Fri, 6 Feb 2015 00:27:52 +0000 (16:27 -0800)
committerarmando-migliaccio <armamig@gmail.com>
Mon, 9 Feb 2015 20:30:09 +0000 (12:30 -0800)
The enforcement rule was applied to centralized router interfaces, to avoid
a potential security vulnerabilty.

Even though DVR routers are fundamentally different from centralized routers,
there is no good reason as to why the rule should be skipped for DVR interfaces.

This patch sanitizes the insanity a bit and closes this potential loophole by
preventing the operation for DVR routers too.

Related-bug: #1243327
Closes-bug: #1410984

Change-Id: I048e6e3926e1c74cf9ecb63cfb53a0b1afb3c579

neutron/db/db_base_plugin_v2.py
neutron/tests/unit/test_l3_plugin.py

index 9f7b5b46fc8f6b1d4ec302b5309e7da14f5fb0fb..03bbe948f1ec888a31214229bfcd84606a8c27ca 100644 (file)
@@ -1321,9 +1321,9 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
         # NOTE(jkoelker) Get the tenant_id outside of the session to avoid
         #                unneeded db action if the operation raises
         tenant_id = self._get_tenant_id_for_create(context, p)
-        if p.get('device_owner') == constants.DEVICE_OWNER_ROUTER_INTF:
-            self._enforce_device_owner_not_router_intf_or_device_id(context, p,
-                                                                    tenant_id)
+        if p.get('device_owner'):
+            self._enforce_device_owner_not_router_intf_or_device_id(
+                context, p.get('device_owner'), p.get('device_id'), tenant_id)
 
         port_data = dict(tenant_id=tenant_id,
                          name=p['name'],
@@ -1361,30 +1361,22 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
         p = port['port']
 
         changed_ips = False
-        changed_device_id = False
         with context.session.begin(subtransactions=True):
             port = self._get_port(context, id)
-            if 'device_owner' in p:
-                current_device_owner = p['device_owner']
-                changed_device_owner = True
-            else:
-                current_device_owner = port['device_owner']
-                changed_device_owner = False
-            if p.get('device_id') != port['device_id']:
-                changed_device_id = True
-
-            # if the current device_owner is ROUTER_INF and the device_id or
-            # device_owner changed check device_id is not another tenants
-            # router
-            if ((current_device_owner == constants.DEVICE_OWNER_ROUTER_INTF)
-                    and (changed_device_id or changed_device_owner)):
+            changed_owner = 'device_owner' in p
+            current_owner = p.get('device_owner') or port['device_owner']
+            changed_device_id = p.get('device_id') != port['device_id']
+            current_device_id = p.get('device_id') or port['device_id']
+
+            if current_owner and changed_device_id or changed_owner:
                 self._enforce_device_owner_not_router_intf_or_device_id(
-                    context, p, port['tenant_id'], port)
+                    context, current_owner, current_device_id,
+                    port['tenant_id'])
 
             new_mac = p.get('mac_address')
             if new_mac and new_mac != port['mac_address']:
                 self._check_mac_addr_update(
-                    context, port, new_mac, current_device_owner)
+                    context, port, new_mac, current_owner)
 
             # Check if the IPs need to be updated
             network_id = port['network_id']
@@ -1490,16 +1482,15 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
         return self._get_ports_query(context, filters).count()
 
     def _enforce_device_owner_not_router_intf_or_device_id(self, context,
-                                                           port_request,
-                                                           tenant_id,
-                                                           db_port=None):
+                                                           device_owner,
+                                                           device_id,
+                                                           tenant_id):
+        """Prevent tenants from replacing the device id of router ports with
+        a router uuid belonging to another tenant.
+        """
+        if device_owner not in constants.ROUTER_INTERFACE_OWNERS:
+            return
         if not context.is_admin:
-            # find the device_id. If the call was update_port and the
-            # device_id was not passed in we use the device_id from the
-            # db.
-            device_id = port_request.get('device_id')
-            if not device_id and db_port:
-                device_id = db_port.get('device_id')
             # check to make sure device_id does not match another tenants
             # router.
             if device_id:
index f9afd4d8ef7df869e7b3ad267b196c03ea5aad96..a76e82c6115bf1e6a369c4af183038ccb6001b3a 100644 (file)
@@ -1255,13 +1255,14 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
             with self.network(tenant_id='tenant_a',
                               set_context=True) as n:
                 with self.subnet(network=n):
-                    self._create_port(
-                        self.fmt, n['network']['id'],
-                        tenant_id='tenant_a',
-                        device_id=admin_router['router']['id'],
-                        device_owner='network:router_interface',
-                        set_context=True,
-                        expected_res_status=exc.HTTPConflict.code)
+                    for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
+                        self._create_port(
+                            self.fmt, n['network']['id'],
+                            tenant_id='tenant_a',
+                            device_id=admin_router['router']['id'],
+                            device_owner=device_owner,
+                            set_context=True,
+                            expected_res_status=exc.HTTPConflict.code)
 
     def test_create_non_router_port_device_id_of_other_teants_router_update(
         self):
@@ -1272,19 +1273,19 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
             with self.network(tenant_id='tenant_a',
                               set_context=True) as n:
                 with self.subnet(network=n):
-                    port_res = self._create_port(
-                        self.fmt, n['network']['id'],
-                        tenant_id='tenant_a',
-                        device_id=admin_router['router']['id'],
-                        set_context=True)
-                    port = self.deserialize(self.fmt, port_res)
-                    neutron_context = context.Context('', 'tenant_a')
-                    data = {'port': {'device_owner':
-                                     'network:router_interface'}}
-                    self._update('ports', port['port']['id'], data,
-                                 neutron_context=neutron_context,
-                                 expected_code=exc.HTTPConflict.code)
-                    self._delete('ports', port['port']['id'])
+                    for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
+                        port_res = self._create_port(
+                            self.fmt, n['network']['id'],
+                            tenant_id='tenant_a',
+                            device_id=admin_router['router']['id'],
+                            set_context=True)
+                        port = self.deserialize(self.fmt, port_res)
+                        neutron_context = context.Context('', 'tenant_a')
+                        data = {'port': {'device_owner': device_owner}}
+                        self._update('ports', port['port']['id'], data,
+                                     neutron_context=neutron_context,
+                                     expected_code=exc.HTTPConflict.code)
+                        self._delete('ports', port['port']['id'])
 
     def test_update_port_device_id_to_different_tenants_router(self):
         with self.router() as admin_router: