action = "delete_%s" % self._resource
# Check authz
- obj = self._item(request, id)
+ obj = self._item(request, id)[self._resource]
try:
policy.enforce(request.context, action, obj)
except exceptions.PolicyNotAuthorized:
action = "update_%s" % self._resource
# Check authz
- orig_obj = self._item(request, id)
+ orig_obj = self._item(request, id)[self._resource]
try:
policy.enforce(request.context, action, orig_obj)
except exceptions.PolicyNotAuthorized:
def _populate_tenant_id(self, context, res_dict, is_create):
- if self._resource not in ['network', 'port']:
- return
-
if (('tenant_id' in res_dict and
res_dict['tenant_id'] != context.tenant_id and
not context.is_admin)):
'admin_state_up': {'allow_post': True, 'allow_put': True,
'default': True},
'status': {'allow_post': False, 'allow_put': False},
- 'tenant_id': {'allow_post': True, 'allow_put': True},
+ 'tenant_id': {'allow_post': True, 'allow_put': False},
},
'ports': {
'id': {'allow_post': False, 'allow_put': False},
'host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
'device_id': {'allow_post': True, 'allow_put': True, 'default': ''},
- 'tenant_id': {'allow_post': True, 'allow_put': True},
+ 'tenant_id': {'allow_post': True, 'allow_put': False},
},
'subnets': {
'id': {'allow_post': False, 'allow_put': False},
'default': ATTR_NOT_SPECIFIED},
'additional_host_routes': {'allow_post': True, 'allow_put': True,
'default': ATTR_NOT_SPECIFIED},
+ 'tenant_id': {'allow_post': True, 'allow_put': False},
}
}
query = context.session.query(model)
# NOTE(jkoelker) non-admin queries are scoped to their tenant_id
- if not context.is_admin and hasattr(model.tenant_id):
- query = query.filter(tenant_id=context.tenant_id)
+ if not context.is_admin and hasattr(model, 'tenant_id'):
+ query = query.filter(model.tenant_id == context.tenant_id)
return query
def _make_subnet_dict(self, subnet, fields=None):
res = {'id': subnet['id'],
+ 'tenant_id': subnet['tenant_id'],
'network_id': subnet['network_id'],
'ip_version': subnet['ip_version'],
'cidr': subnet['cidr'],
if s['gateway_ip'] == api_router.ATTR_NOT_SPECIFIED:
s['gateway_ip'] = str(netaddr.IPAddress(net.first + 1))
+ tenant_id = self._get_tenant_id_for_create(context, s)
with context.session.begin():
network = self._get_network(context, s["network_id"])
self._validate_subnet_cidr(network, s['cidr'])
- subnet = models_v2.Subnet(network_id=s['network_id'],
+ subnet = models_v2.Subnet(tenant_id=tenant_id,
+ network_id=s['network_id'],
ip_version=s['ip_version'],
cidr=s['cidr'],
gateway_ip=s['gateway_ip'])
device_id = sa.Column(sa.String(255), nullable=False)
-class Subnet(model_base.BASEV2, HasId):
+class Subnet(model_base.BASEV2, HasId, HasTenant):
"""Represents a quantum subnet.
When a subnet is created the first and last entries will be created. These
# Note: since all resources use the same controller and validation
# logic, we actually get really good coverage from testing just networks.
class JSONV2TestCase(APIv2TestCase):
- def test_list(self):
+
+ def _test_list(self, req_tenant_id, real_tenant_id):
+ env = {}
+ if req_tenant_id:
+ env = {'quantum.context': context.Context('', req_tenant_id)}
input_dict = {'id': str(uuid.uuid4()),
'name': 'net1',
'admin_state_up': True,
'status': "ACTIVE",
- 'tenant_id': str(uuid.uuid4()),
+ 'tenant_id': real_tenant_id,
'subnets': []}
return_value = [input_dict]
instance = self.plugin.return_value
instance.get_networks.return_value = return_value
- res = self.api.get(_get_path('networks'))
+ res = self.api.get(_get_path('networks'), extra_environ=env)
self.assertTrue('networks' in res.json)
- self.assertEqual(len(res.json['networks']), 1)
- output_dict = res.json['networks'][0]
- self.assertEqual(len(input_dict), len(output_dict))
- for k, v in input_dict.iteritems():
- self.assertEqual(v, output_dict[k])
+ if not req_tenant_id or req_tenant_id == real_tenant_id:
+ # expect full list returned
+ self.assertEqual(len(res.json['networks']), 1)
+ output_dict = res.json['networks'][0]
+ self.assertEqual(len(input_dict), len(output_dict))
+ for k, v in input_dict.iteritems():
+ self.assertEqual(v, output_dict[k])
+ else:
+ # expect no results
+ self.assertEqual(len(res.json['networks']), 0)
+
+ def test_list_noauth(self):
+ self._test_list(None, _uuid())
+
+ def test_list_keystone(self):
+ tenant_id = _uuid()
+ self._test_list(tenant_id, tenant_id)
+
+ def test_list_keystone_bad(self):
+ tenant_id = _uuid()
+ self._test_list(tenant_id + "bad", tenant_id)
def test_create(self):
net_id = _uuid()
self.api.get(_get_path('networks', id=str(uuid.uuid4())))
- def test_delete(self):
+ def _test_delete(self, req_tenant_id, real_tenant_id, expected_code,
+ expect_errors=False):
+ env = {}
+ if req_tenant_id:
+ env = {'quantum.context': context.Context('', req_tenant_id)}
instance = self.plugin.return_value
+ instance.get_network.return_value = {'tenant_id': real_tenant_id}
instance.delete_network.return_value = None
- res = self.api.delete(_get_path('networks', id=str(uuid.uuid4())))
- self.assertEqual(res.status_int, exc.HTTPNoContent.code)
+ res = self.api.delete(_get_path('networks', id=str(uuid.uuid4())),
+ extra_environ=env, expect_errors=expect_errors)
+ self.assertEqual(res.status_int, expected_code)
+
+ def test_delete_noauth(self):
+ self._test_delete(None, _uuid(), exc.HTTPNoContent.code)
+
+ def test_delete_keystone(self):
+ tenant_id = _uuid()
+ self._test_delete(tenant_id, tenant_id, exc.HTTPNoContent.code)
+
+ def test_delete_keystone_bad_tenant(self):
+ tenant_id = _uuid()
+ self._test_delete(tenant_id + "bad", tenant_id,
+ exc.HTTPNotFound.code, expect_errors=True)
+
+ def _test_get(self, req_tenant_id, real_tenant_id, expected_code,
+ expect_errors=False):
+ env = {}
+ if req_tenant_id:
+ env = {'quantum.context': context.Context('', req_tenant_id)}
+ data = {'tenant_id': real_tenant_id}
+ instance = self.plugin.return_value
+ instance.get_network.return_value = data
+
+ res = self.api.get(_get_path('networks',
+ id=str(uuid.uuid4())),
+ extra_environ=env,
+ expect_errors=expect_errors)
+ self.assertEqual(res.status_int, expected_code)
+
+ def test_get_noauth(self):
+ self._test_get(None, _uuid(), 200)
- def test_update(self):
+ def test_get_keystone(self):
+ tenant_id = _uuid()
+ self._test_get(tenant_id, tenant_id, 200)
+
+ def test_get_keystone_bad_tenant(self):
+ tenant_id = _uuid()
+ self._test_get(tenant_id + "bad", tenant_id,
+ exc.HTTPNotFound.code, expect_errors=True)
+
+ def _test_update(self, req_tenant_id, real_tenant_id, expected_code,
+ expect_errors=False):
+ env = {}
+ if req_tenant_id:
+ env = {'quantum.context': context.Context('', req_tenant_id)}
# leave out 'name' field intentionally
data = {'network': {'admin_state_up': True}}
return_value = {'subnets': []}
return_value.update(data['network'].copy())
instance = self.plugin.return_value
+ instance.get_network.return_value = {'tenant_id': real_tenant_id}
instance.update_network.return_value = return_value
- self.api.put_json(_get_path('networks',
- id=str(uuid.uuid4())), data)
+ res = self.api.put_json(_get_path('networks',
+ id=str(uuid.uuid4())),
+ data,
+ extra_environ=env,
+ expect_errors=expect_errors)
+ self.assertEqual(res.status_int, expected_code)
+
+ def test_update_noauth(self):
+ self._test_update(None, _uuid(), 200)
+
+ def test_update_keystone(self):
+ tenant_id = _uuid()
+ self._test_update(tenant_id, tenant_id, 200)
+
+ def test_update_keystone_bad_tenant(self):
+ tenant_id = _uuid()
+ self._test_update(tenant_id + "bad", tenant_id,
+ exc.HTTPNotFound.code, expect_errors=True)
def test_update_readonly_field(self):
data = {'network': {'status': "NANANA"}}
from quantum.api.v2.router import APIRouter
from quantum.common import config
from quantum.common import exceptions as q_exc
+from quantum import context
from quantum.db import api as db
from quantum.openstack.common import cfg
from quantum.tests.unit.testlib_api import create_request
network_req = self.new_create_request('networks', data, fmt)
return network_req.get_response(self.api)
- def _create_subnet(self, fmt, net_id, gateway_ip, cidr,
+ def _create_subnet(self, fmt, tenant_id, net_id, gateway_ip, cidr,
allocation_pools=None, ip_version=4):
- data = {'subnet': {'network_id': net_id,
+ data = {'subnet': {'tenant_id': tenant_id,
+ 'network_id': net_id,
'cidr': cidr,
'ip_version': ip_version}}
if gateway_ip:
def _make_subnet(self, fmt, network, gateway, cidr,
allocation_pools=None, ip_version=4):
res = self._create_subnet(fmt,
+ network['network']['tenant_id'],
network['network']['id'],
gateway,
cidr,
self._delete('ports', port['port']['id'])
+class TestBasicGet(QuantumDbPluginV2TestCase):
+
+ def test_single_get_admin(self):
+ plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
+ with self.network() as network:
+ net_id = network['network']['id']
+ ctx = context.get_admin_context()
+ n = plugin._get_network(ctx, net_id)
+ self.assertEqual(net_id, n.id)
+
+ def test_single_get_tenant(self):
+ plugin = quantum.db.db_base_plugin_v2.QuantumDbPluginV2()
+ with self.network() as network:
+ net_id = network['network']['id']
+ ctx = context.get_admin_context()
+ n = plugin._get_network(ctx, net_id)
+ self.assertEqual(net_id, n.id)
+
+
class TestV2HTTPResponse(QuantumDbPluginV2TestCase):
def test_create_returns_201(self):
res = self._create_network('json', 'net2', True)
fmt = 'json'
with self.subnet() as subnet:
# Get a IPv4 and IPv6 address
+ tenant_id = subnet['subnet']['tenant_id']
net_id = subnet['subnet']['network_id']
- res = self._create_subnet(fmt, net_id=net_id,
+ res = self._create_subnet(fmt, tenant_id, net_id=net_id,
cidr='2607:f0d0:1002:51::0/124',
ip_version=6, gateway_ip=None)
subnet2 = self.deserialize(fmt, res)