# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p)
- if p.get('device_owner') == constants.DEVICE_OWNER_ROUTER_INTF:
- self._enforce_device_owner_not_router_intf_or_device_id(context, p,
- tenant_id)
+ if p.get('device_owner'):
+ self._enforce_device_owner_not_router_intf_or_device_id(
+ context, p.get('device_owner'), p.get('device_id'), tenant_id)
port_data = dict(tenant_id=tenant_id,
name=p['name'],
p = port['port']
changed_ips = False
- changed_device_id = False
with context.session.begin(subtransactions=True):
port = self._get_port(context, id)
- if 'device_owner' in p:
- current_device_owner = p['device_owner']
- changed_device_owner = True
- else:
- current_device_owner = port['device_owner']
- changed_device_owner = False
- if p.get('device_id') != port['device_id']:
- changed_device_id = True
-
- # if the current device_owner is ROUTER_INF and the device_id or
- # device_owner changed check device_id is not another tenants
- # router
- if ((current_device_owner == constants.DEVICE_OWNER_ROUTER_INTF)
- and (changed_device_id or changed_device_owner)):
+ changed_owner = 'device_owner' in p
+ current_owner = p.get('device_owner') or port['device_owner']
+ changed_device_id = p.get('device_id') != port['device_id']
+ current_device_id = p.get('device_id') or port['device_id']
+
+ if current_owner and changed_device_id or changed_owner:
self._enforce_device_owner_not_router_intf_or_device_id(
- context, p, port['tenant_id'], port)
+ context, current_owner, current_device_id,
+ port['tenant_id'])
new_mac = p.get('mac_address')
if new_mac and new_mac != port['mac_address']:
self._check_mac_addr_update(
- context, port, new_mac, current_device_owner)
+ context, port, new_mac, current_owner)
# Check if the IPs need to be updated
network_id = port['network_id']
return self._get_ports_query(context, filters).count()
def _enforce_device_owner_not_router_intf_or_device_id(self, context,
- port_request,
- tenant_id,
- db_port=None):
+ device_owner,
+ device_id,
+ tenant_id):
+ """Prevent tenants from replacing the device id of router ports with
+ a router uuid belonging to another tenant.
+ """
+ if device_owner not in constants.ROUTER_INTERFACE_OWNERS:
+ return
if not context.is_admin:
- # find the device_id. If the call was update_port and the
- # device_id was not passed in we use the device_id from the
- # db.
- device_id = port_request.get('device_id')
- if not device_id and db_port:
- device_id = db_port.get('device_id')
# check to make sure device_id does not match another tenants
# router.
if device_id:
with self.network(tenant_id='tenant_a',
set_context=True) as n:
with self.subnet(network=n):
- self._create_port(
- self.fmt, n['network']['id'],
- tenant_id='tenant_a',
- device_id=admin_router['router']['id'],
- device_owner='network:router_interface',
- set_context=True,
- expected_res_status=exc.HTTPConflict.code)
+ for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
+ self._create_port(
+ self.fmt, n['network']['id'],
+ tenant_id='tenant_a',
+ device_id=admin_router['router']['id'],
+ device_owner=device_owner,
+ set_context=True,
+ expected_res_status=exc.HTTPConflict.code)
def test_create_non_router_port_device_id_of_other_teants_router_update(
self):
with self.network(tenant_id='tenant_a',
set_context=True) as n:
with self.subnet(network=n):
- port_res = self._create_port(
- self.fmt, n['network']['id'],
- tenant_id='tenant_a',
- device_id=admin_router['router']['id'],
- set_context=True)
- port = self.deserialize(self.fmt, port_res)
- neutron_context = context.Context('', 'tenant_a')
- data = {'port': {'device_owner':
- 'network:router_interface'}}
- self._update('ports', port['port']['id'], data,
- neutron_context=neutron_context,
- expected_code=exc.HTTPConflict.code)
- self._delete('ports', port['port']['id'])
+ for device_owner in l3_constants.ROUTER_INTERFACE_OWNERS:
+ port_res = self._create_port(
+ self.fmt, n['network']['id'],
+ tenant_id='tenant_a',
+ device_id=admin_router['router']['id'],
+ set_context=True)
+ port = self.deserialize(self.fmt, port_res)
+ neutron_context = context.Context('', 'tenant_a')
+ data = {'port': {'device_owner': device_owner}}
+ self._update('ports', port['port']['id'], data,
+ neutron_context=neutron_context,
+ expected_code=exc.HTTPConflict.code)
+ self._delete('ports', port['port']['id'])
def test_update_port_device_id_to_different_tenants_router(self):
with self.router() as admin_router: