from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
+from neutron import context as ctx
from neutron.db import api as db
from neutron.db import models_v2
from neutron.db import sqlalchemyutils
+from neutron.extensions import l3
+from neutron import manager
from neutron import neutron_plugin_base_v2
from neutron.notifiers import nova
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
+from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
tenant_id = self._get_tenant_id_for_create(context, p)
+ if p.get('device_owner') == constants.DEVICE_OWNER_ROUTER_INTF:
+ self._enforce_device_owner_not_router_intf_or_device_id(context, p,
+ tenant_id)
with context.session.begin(subtransactions=True):
network = self._get_network(context, network_id)
changed_ips = False
with context.session.begin(subtransactions=True):
port = self._get_port(context, id)
+ if 'device_owner' in p:
+ current_device_owner = p['device_owner']
+ changed_device_owner = True
+ else:
+ current_device_owner = port['device_owner']
+ changed_device_owner = False
+ if p.get('device_id') != port['device_id']:
+ changed_device_id = True
+
+ # if the current device_owner is ROUTER_INF and the device_id or
+ # device_owner changed check device_id is not another tenants
+ # router
+ if ((current_device_owner == constants.DEVICE_OWNER_ROUTER_INTF)
+ and (changed_device_id or changed_device_owner)):
+ self._enforce_device_owner_not_router_intf_or_device_id(
+ context, p, port['tenant_id'], port)
+
# Check if the IPs need to be updated
if 'fixed_ips' in p:
changed_ips = True
def get_ports_count(self, context, filters=None):
return self._get_ports_query(context, filters).count()
+
+ def _enforce_device_owner_not_router_intf_or_device_id(self, context,
+ port_request,
+ tenant_id,
+ db_port=None):
+ if not context.is_admin:
+ # find the device_id. If the call was update_port and the
+ # device_id was not passed in we use the device_id from the
+ # db.
+ device_id = port_request.get('device_id')
+ if not device_id and db_port:
+ device_id = db_port.get('device_id')
+ # check to make sure device_id does not match another tenants
+ # router.
+ if device_id:
+ if hasattr(self, 'get_router'):
+ try:
+ ctx_admin = ctx.get_admin_context()
+ router = self.get_router(ctx_admin, device_id)
+ except l3.RouterNotFound:
+ return
+ else:
+ l3plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT))
+ if l3plugin:
+ try:
+ ctx_admin = ctx.get_admin_context()
+ router = l3plugin.get_router(ctx_admin,
+ device_id)
+ except l3.RouterNotFound:
+ return
+ else:
+ # raise as extension doesn't support L3 anyways.
+ raise n_exc.DeviceIDNotOwnedByTenant(
+ device_id=device_id)
+ if tenant_id != router['tenant_id']:
+ raise n_exc.DeviceIDNotOwnedByTenant(device_id=device_id)
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
+ def test_create_router_port_with_device_id_of_other_teants_router(self):
+ with self.router() as admin_router:
+ with self.network(tenant_id='tenant_a',
+ set_context=True) as n:
+ with self.subnet(network=n):
+ self._create_port(
+ self.fmt, n['network']['id'],
+ tenant_id='tenant_a',
+ device_id=admin_router['router']['id'],
+ device_owner='network:router_interface',
+ set_context=True,
+ expected_res_status=exc.HTTPConflict.code)
+
+ def test_create_non_router_port_device_id_of_other_teants_router_update(
+ self):
+ # This tests that HTTPConflict is raised if we create a non-router
+ # port that matches the device_id of another tenants router and then
+ # we change the device_owner to be network:router_interface.
+ with self.router() as admin_router:
+ with self.network(tenant_id='tenant_a',
+ set_context=True) as n:
+ with self.subnet(network=n):
+ port_res = self._create_port(
+ self.fmt, n['network']['id'],
+ tenant_id='tenant_a',
+ device_id=admin_router['router']['id'],
+ set_context=True)
+ port = self.deserialize(self.fmt, port_res)
+ neutron_context = context.Context('', 'tenant_a')
+ data = {'port': {'device_owner':
+ 'network:router_interface'}}
+ self._update('ports', port['port']['id'], data,
+ neutron_context=neutron_context,
+ expected_code=exc.HTTPConflict.code)
+ self._delete('ports', port['port']['id'])
+
+ def test_update_port_device_id_to_different_tenants_router(self):
+ with self.router() as admin_router:
+ with self.router(tenant_id='tenant_a',
+ set_context=True) as tenant_router:
+ with self.network(tenant_id='tenant_a',
+ set_context=True) as n:
+ with self.subnet(network=n) as s:
+ port = self._router_interface_action(
+ 'add', tenant_router['router']['id'],
+ s['subnet']['id'], None, tenant_id='tenant_a')
+ neutron_context = context.Context('', 'tenant_a')
+ data = {'port':
+ {'device_id': admin_router['router']['id']}}
+ self._update('ports', port['port_id'], data,
+ neutron_context=neutron_context,
+ expected_code=exc.HTTPConflict.code)
+ self._router_interface_action(
+ 'remove', tenant_router['router']['id'],
+ s['subnet']['id'], None, tenant_id='tenant_a')
+
def test_router_add_gateway_invalid_network_returns_404(self):
with self.router() as r:
self._add_external_gateway_to_router(