def create_address_scope(self, context, address_scope):
"""Create an address scope."""
a_s = address_scope['address_scope']
- tenant_id = self._get_tenant_id_for_create(context, a_s)
address_scope_id = a_s.get('id') or uuidutils.generate_uuid()
with context.session.begin(subtransactions=True):
- pool_args = {'tenant_id': tenant_id,
+ pool_args = {'tenant_id': a_s['tenant_id'],
'id': address_scope_id,
'name': a_s['name'],
'shared': a_s['shared'],
from sqlalchemy import or_
from sqlalchemy import sql
-from neutron._i18n import _
-from neutron.common import exceptions as n_exc
from neutron.db import sqlalchemyutils
if key in fields))
return resource
- def _get_tenant_id_for_create(self, context, resource):
- if context.is_admin and 'tenant_id' in resource:
- tenant_id = resource['tenant_id']
- elif ('tenant_id' in resource and
- resource['tenant_id'] != context.tenant_id):
- reason = _('Cannot create resource for another tenant')
- raise n_exc.AdminRequired(reason=reason)
- else:
- tenant_id = context.tenant_id
- return tenant_id
-
def _get_by_id(self, context, model, id):
query = self._model_query(context, model)
return query.filter(model.id == id).one()
n = network['network']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
- tenant_id = self._get_tenant_id_for_create(context, n)
+ tenant_id = n['tenant_id']
with context.session.begin(subtransactions=True):
args = {'tenant_id': tenant_id,
'id': n.get('id') or uuidutils.generate_uuid(),
net = netaddr.IPNetwork(s['cidr'])
subnet['subnet']['cidr'] = '%s/%s' % (net.network, net.prefixlen)
- s['tenant_id'] = self._get_tenant_id_for_create(context, s)
subnetpool_id = self._get_subnetpool_id(context, s)
if subnetpool_id:
self.ipam.validate_pools_with_subnetpool(s)
self._validate_address_scope_id(context, sp_reader.address_scope_id,
id, sp_reader.prefixes,
sp_reader.ip_version)
- tenant_id = self._get_tenant_id_for_create(context, sp)
with context.session.begin(subtransactions=True):
- pool_args = {'tenant_id': tenant_id,
+ pool_args = {'tenant_id': sp['tenant_id'],
'id': sp_reader.id,
'name': sp_reader.name,
'ip_version': sp_reader.ip_version,
network_id = p['network_id']
# NOTE(jkoelker) Get the tenant_id outside of the session to avoid
# unneeded db action if the operation raises
- tenant_id = self._get_tenant_id_for_create(context, p)
+ tenant_id = p['tenant_id']
if p.get('device_owner'):
self._enforce_device_owner_not_router_intf_or_device_id(
context, p.get('device_owner'), p.get('device_id'), tenant_id)
def create_router(self, context, router):
r = router['router']
gw_info = r.pop(EXTERNAL_GW_INFO, None)
- tenant_id = self._get_tenant_id_for_create(context, r)
- router_db = self._create_router_db(context, r, tenant_id)
+ router_db = self._create_router_db(context, r, r['tenant_id'])
try:
if gw_info:
self._update_router_gw_info(context, router_db['id'],
def _create_floatingip(self, context, floatingip,
initial_status=l3_constants.FLOATINGIP_STATUS_ACTIVE):
fip = floatingip['floatingip']
- tenant_id = self._get_tenant_id_for_create(context, fip)
fip_id = uuidutils.generate_uuid()
f_net_id = fip['floating_network_id']
floating_ip_address = floating_fixed_ip['ip_address']
floatingip_db = FloatingIP(
id=fip_id,
- tenant_id=tenant_id,
+ tenant_id=fip['tenant_id'],
status=initial_status,
floating_network_id=fip['floating_network_id'],
floating_ip_address=floating_ip_address,
floating_port_id=external_port['id'])
- fip['tenant_id'] = tenant_id
# Update association with internal port
# and define external IP address
self._update_fip_assoc(context, fip,
def create_metering_label(self, context, metering_label):
m = metering_label['metering_label']
- tenant_id = self._get_tenant_id_for_create(context, m)
with context.session.begin(subtransactions=True):
metering_db = MeteringLabel(id=uuidutils.generate_uuid(),
description=m['description'],
- tenant_id=tenant_id,
+ tenant_id=m['tenant_id'],
name=m['name'],
shared=m['shared'])
context.session.add(metering_db)
except c_exc.CallbackFailure as e:
raise n_exc.InvalidInput(error_message=e)
dbmodel = models.get_type_model_map()[e['object_type']]
- tenant_id = self._get_tenant_id_for_create(context, e)
with context.session.begin(subtransactions=True):
db_entry = dbmodel(object_id=e['object_id'],
target_tenant=e['target_tenant'],
action=e['action'],
- tenant_id=tenant_id)
+ tenant_id=e['tenant_id'])
context.session.add(db_entry)
return self._make_rbac_policy_dict(db_entry)
except exceptions.CallbackFailure as e:
raise ext_sg.SecurityGroupConflict(reason=e)
- tenant_id = self._get_tenant_id_for_create(context, s)
+ tenant_id = s['tenant_id']
if not default_sg:
self._ensure_default_security_group(context, tenant_id)
except exceptions.CallbackFailure as e:
raise ext_sg.SecurityGroupConflict(reason=e)
- tenant_id = self._get_tenant_id_for_create(context, rule_dict)
with context.session.begin(subtransactions=True):
db = SecurityGroupRule(
id=(rule_dict.get('id') or uuidutils.generate_uuid()),
- tenant_id=tenant_id,
+ tenant_id=rule_dict['tenant_id'],
security_group_id=rule_dict['security_group_id'],
direction=rule_dict['direction'],
remote_group_id=rule_dict.get('remote_group_id'),
port = port['port']
if port.get('device_owner') and utils.is_port_trusted(port):
return
- tenant_id = self._get_tenant_id_for_create(context, port)
- default_sg = self._ensure_default_security_group(context, tenant_id)
+ default_sg = self._ensure_default_security_group(context,
+ port['tenant_id'])
if not attributes.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
port[ext_sg.SECURITYGROUPS] = [default_sg]
def _create_network_db(self, context, network):
net_data = network[attributes.NETWORK]
- tenant_id = self._get_tenant_id_for_create(context, net_data)
+ tenant_id = net_data['tenant_id']
session = context.session
with session.begin(subtransactions=True):
self._ensure_default_security_group(context, tenant_id)
return agent
def _create_router(self, name):
- router = {'name': name, 'admin_state_up': True}
+ router = {'name': name, 'admin_state_up': True,
+ 'tenant_id': self.adminContext.tenant_id}
return self.l3_plugin.create_router(
self.adminContext, {'router': router})
def _create_router(self, az_hints, ha):
router = {'name': 'router1', 'admin_state_up': True,
- 'availability_zone_hints': az_hints}
+ 'availability_zone_hints': az_hints,
+ 'tenant_id': self._tenant_id}
if ha:
router['ha'] = True
return self.l3_plugin.create_router(
# framework
kwargs.setdefault('admin_state_up', True)
kwargs.setdefault('shared', False)
+ kwargs.setdefault('tenant_id', self.ctx.tenant_id)
data = dict(network=kwargs)
result = self.plugin.create_network(self.ctx, data)
return base.AttributeDict(result)
def _prepare_network(self):
network = {'network': {'name': 'abc',
'shared': False,
+ 'tenant_id': 'tenant_id',
'admin_state_up': True}}
return self.plugin.create_network(self.ctx, network)
def _prepare_ipv6_pd_subnet(self):
subnet = {'subnet': {'network_id': self.network['id'],
+ 'tenant_id': 'tenant_id',
'cidr': None,
'ip_version': 6,
'name': 'ipv6_pd',
import mock
import netaddr
+import webob.exc
+
+from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
+from neutron import context
from neutron.tests import base
from neutron.tests import tools
attr_info, {'key': 1}, {'key': 1})
self.assertRaises(self._EXC_CLS, attributes.convert_value,
attr_info, {'key': 1}, self._EXC_CLS)
+
+ def test_populate_tenant_id(self):
+ tenant_id_1 = uuidutils.generate_uuid()
+ tenant_id_2 = uuidutils.generate_uuid()
+ # apart from the admin, nobody can create a res on behalf of another
+ # tenant
+ ctx = context.Context(user_id=None, tenant_id=tenant_id_1)
+ res_dict = {'tenant_id': tenant_id_2}
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ attributes.populate_tenant_id,
+ ctx, res_dict, None, None)
+ ctx.is_admin = True
+ self.assertIsNone(attributes.populate_tenant_id(ctx, res_dict,
+ None, None))
+
+ # for each create request, the tenant_id should be added to the
+ # req body
+ res_dict2 = {}
+ attributes.populate_tenant_id(ctx, res_dict2, None, True)
+ self.assertEqual({'tenant_id': ctx.tenant_id}, res_dict2)
+
+ # if the tenant_id is mandatory for the resource and not specified
+ # in the request nor in the context, an exception should be raised
+ res_dict3 = {}
+ attr_info = {'tenant_id': {'allow_post': True}, }
+ ctx.tenant_id = None
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ attributes.populate_tenant_id,
+ ctx, res_dict3, attr_info, True)
self._set_net_external(net_id)
router = {'name': 'router1',
'admin_state_up': True,
+ 'tenant_id': 'tenant_id',
'external_gateway_info': {'network_id': net_id},
'distributed': True}
r = self.l3plugin.create_router(
router = {'name': 'router1',
'external_gateway_info': {'network_id': net_id},
+ 'tenant_id': 'tenant_id',
'admin_state_up': True,
'distributed': True}
r = self.l3plugin.create_router(self.adminContext,
self._set_net_external(net_id)
router = {'name': 'router1',
+ 'tenant_id': 'tenant_id',
'admin_state_up': True,
'distributed': True}
r = self.l3plugin.create_router(self.adminContext,
router = {'name': 'router1',
'external_gateway_info': {'network_id': net_id},
+ 'tenant_id': 'tenant_id',
'admin_state_up': True,
'distributed': True}
r = self.l3plugin.create_router(self.adminContext,
router = {'name': 'router1',
'external_gateway_info': {'network_id': net_id},
+ 'tenant_id': 'tenant_id',
'admin_state_up': True,
'distributed': True}
r = self.l3plugin.create_router(self.adminContext,
if ctx is None:
ctx = self.admin_ctx
ctx.tenant_id = tenant_id
- router = {'name': 'router1', 'admin_state_up': True}
+ router = {'name': 'router1',
+ 'admin_state_up': True,
+ 'tenant_id': tenant_id}
if ha is not None:
router['ha'] = ha
if distributed is not None:
plugin = manager.NeutronManager.get_service_plugins()[
service_constants.L3_ROUTER_NAT]
router_req = {'router': {'id': _uuid(), 'name': 'router',
+ 'tenant_id': 'foo',
'admin_state_up': True}}
result = plugin.create_router(context.Context('', 'foo'), router_req)
self.assertEqual(result['id'], router_req['router']['id'])
with self.network() as n:
data = {'router': {
'name': 'router1', 'admin_state_up': True,
+ 'tenant_id': ctx.tenant_id,
'external_gateway_info': {'network_id': n['network']['id']}}}
self.assertRaises(MyException, plugin.create_router, ctx, data)
supported_extension_aliases = ["security-group", "port-security"]
def create_network(self, context, network):
- tenant_id = self._get_tenant_id_for_create(context, network['network'])
+ tenant_id = network['network'].get('tenant_id')
self._ensure_default_security_group(context, tenant_id)
with context.session.begin(subtransactions=True):
neutron_db = super(PortSecurityTestPlugin, self).create_network(
supported_extension_aliases = ["security-group"]
def create_port(self, context, port):
- tenant_id = self._get_tenant_id_for_create(context, port['port'])
+ tenant_id = port['port']['tenant_id']
default_sg = self._ensure_default_security_group(context, tenant_id)
if not attr.is_attr_set(port['port'].get(ext_sg.SECURITYGROUPS)):
port['port'][ext_sg.SECURITYGROUPS] = [default_sg]
return port
def create_network(self, context, network):
- tenant_id = self._get_tenant_id_for_create(context, network['network'])
- self._ensure_default_security_group(context, tenant_id)
+ self._ensure_default_security_group(context,
+ network['network']['tenant_id'])
return super(SecurityGroupTestPlugin, self).create_network(context,
network)
p_const.L3_ROUTER_NAT]
r = plugin.create_router(
self.context,
- {'router': {'name': 'router', 'admin_state_up': True}})
+ {'router': {'name': 'router', 'admin_state_up': True,
+ 'tenant_id': self.context.tenant_id}})
with self.subnet() as s:
p = plugin.add_router_interface(self.context, r['id'],
{'subnet_id': s['subnet']['id']})
def _create_ha_router(self, ha=True, tenant_id='tenant1', az_hints=None):
self.adminContext.tenant_id = tenant_id
- router = {'name': 'router1', 'admin_state_up': True}
+ router = {'name': 'router1', 'admin_state_up': True,
+ 'tenant_id': tenant_id}
if ha is not None:
router['ha'] = ha
if az_hints is None: