From: armando-migliaccio Date: Fri, 6 Feb 2015 00:27:52 +0000 (-0800) Subject: Fix lack of device ownership enforcement for DVR routers X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=89025a8dd93918ac2967726ec7bb8ee5aa22d924;p=openstack-build%2Fneutron-build.git Fix lack of device ownership enforcement for DVR routers The enforcement rule was applied to centralized router interfaces, to avoid a potential security vulnerabilty. Even though DVR routers are fundamentally different from centralized routers, there is no good reason as to why the rule should be skipped for DVR interfaces. This patch sanitizes the insanity a bit and closes this potential loophole by preventing the operation for DVR routers too. Related-bug: #1243327 Closes-bug: #1410984 Change-Id: I048e6e3926e1c74cf9ecb63cfb53a0b1afb3c579 --- diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py index 9f7b5b46f..03bbe948f 100644 --- a/neutron/db/db_base_plugin_v2.py +++ b/neutron/db/db_base_plugin_v2.py @@ -1321,9 +1321,9 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, # NOTE(jkoelker) Get the tenant_id outside of the session to avoid # unneeded db action if the operation raises tenant_id = self._get_tenant_id_for_create(context, p) - if p.get('device_owner') == constants.DEVICE_OWNER_ROUTER_INTF: - self._enforce_device_owner_not_router_intf_or_device_id(context, p, - tenant_id) + if p.get('device_owner'): + self._enforce_device_owner_not_router_intf_or_device_id( + context, p.get('device_owner'), p.get('device_id'), tenant_id) port_data = dict(tenant_id=tenant_id, name=p['name'], @@ -1361,30 +1361,22 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, p = port['port'] changed_ips = False - changed_device_id = False with context.session.begin(subtransactions=True): port = self._get_port(context, id) - if 'device_owner' in p: - current_device_owner = p['device_owner'] - changed_device_owner = True - else: - current_device_owner = port['device_owner'] - changed_device_owner = False - if p.get('device_id') != port['device_id']: - changed_device_id = True - - # if the current device_owner is ROUTER_INF and the device_id or - # device_owner changed check device_id is not another tenants - # router - if ((current_device_owner == constants.DEVICE_OWNER_ROUTER_INTF) - and (changed_device_id or changed_device_owner)): + changed_owner = 'device_owner' in p + current_owner = p.get('device_owner') or port['device_owner'] + changed_device_id = p.get('device_id') != port['device_id'] + current_device_id = p.get('device_id') or port['device_id'] + + if current_owner and changed_device_id or changed_owner: self._enforce_device_owner_not_router_intf_or_device_id( - context, p, port['tenant_id'], port) + context, current_owner, current_device_id, + port['tenant_id']) new_mac = p.get('mac_address') if new_mac and new_mac != port['mac_address']: self._check_mac_addr_update( - context, port, new_mac, current_device_owner) + context, port, new_mac, current_owner) # Check if the IPs need to be updated network_id = port['network_id'] @@ -1490,16 +1482,15 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2, return self._get_ports_query(context, filters).count() def _enforce_device_owner_not_router_intf_or_device_id(self, context, - port_request, - tenant_id, - db_port=None): + device_owner, + device_id, + tenant_id): + """Prevent tenants from replacing the device id of router ports with + a router uuid belonging to another tenant. + """ + if device_owner not in constants.ROUTER_INTERFACE_OWNERS: + return if not context.is_admin: - # find the device_id. If the call was update_port and the - # device_id was not passed in we use the device_id from the - # db. - device_id = port_request.get('device_id') - if not device_id and db_port: - device_id = db_port.get('device_id') # check to make sure device_id does not match another tenants # router. if device_id: diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index f9afd4d8e..a76e82c61 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -1255,13 +1255,14 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): with self.network(tenant_id='tenant_a', set_context=True) as n: with self.subnet(network=n): - self._create_port( - self.fmt, n['network']['id'], - tenant_id='tenant_a', - device_id=admin_router['router']['id'], - device_owner='network:router_interface', - set_context=True, - expected_res_status=exc.HTTPConflict.code) + for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS: + self._create_port( + self.fmt, n['network']['id'], + tenant_id='tenant_a', + device_id=admin_router['router']['id'], + device_owner=device_owner, + set_context=True, + expected_res_status=exc.HTTPConflict.code) def test_create_non_router_port_device_id_of_other_teants_router_update( self): @@ -1272,19 +1273,19 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): with self.network(tenant_id='tenant_a', set_context=True) as n: with self.subnet(network=n): - port_res = self._create_port( - self.fmt, n['network']['id'], - tenant_id='tenant_a', - device_id=admin_router['router']['id'], - set_context=True) - port = self.deserialize(self.fmt, port_res) - neutron_context = context.Context('', 'tenant_a') - data = {'port': {'device_owner': - 'network:router_interface'}} - self._update('ports', port['port']['id'], data, - neutron_context=neutron_context, - expected_code=exc.HTTPConflict.code) - self._delete('ports', port['port']['id']) + for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS: + port_res = self._create_port( + self.fmt, n['network']['id'], + tenant_id='tenant_a', + device_id=admin_router['router']['id'], + set_context=True) + port = self.deserialize(self.fmt, port_res) + neutron_context = context.Context('', 'tenant_a') + data = {'port': {'device_owner': device_owner}} + self._update('ports', port['port']['id'], data, + neutron_context=neutron_context, + expected_code=exc.HTTPConflict.code) + self._delete('ports', port['port']['id']) def test_update_port_device_id_to_different_tenants_router(self): with self.router() as admin_router: