# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Copyright 2015 OpenStack Foundation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
import uuid

from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
import testtools

from neutron.tests.api import base
from neutron.tests.api import clients
from neutron.tests.tempest import config
from neutron.tests.tempest import test
class SharedNetworksTest(base.BaseAdminNetworkTest):
    """API tests for shared networks seen by admin and regular clients.

    One shared network is created per class in ``resource_setup`` and is
    reused by the listing, showing, filtering and port-creation tests.
    """

    @classmethod
    def resource_setup(cls):
        """Run the base setup, then create the class-wide shared network."""
        super(SharedNetworksTest, cls).resource_setup()
        cls.shared_network = cls.create_shared_network()

    @test.idempotent_id('6661d219-b96d-4597-ad10-55766123421a')
    def test_filtering_shared_networks(self):
        """Check the ``shared`` list filter for both clients."""
        # this test is necessary because the 'shared' column does not actually
        # exist on networks so the filter function has to translate it into
        # queries against the RBAC table

        # Make sure at least one non-shared network exists; otherwise the
        # shared=False listings below could legitimately be empty and
        # _check_shared_correct would fail on assertNotEmpty.
        self.create_network()
        self._check_shared_correct(
            self.client.list_networks(shared=True)['networks'], True)
        self._check_shared_correct(
            self.admin_client.list_networks(shared=True)['networks'], True)
        self._check_shared_correct(
            self.client.list_networks(shared=False)['networks'], False)
        self._check_shared_correct(
            self.admin_client.list_networks(shared=False)['networks'], False)

    def _check_shared_correct(self, items, shared):
        """Assert ``items`` is non-empty and uniformly has ``shared`` set.

        :param items: list of network dicts, each carrying a 'shared' key.
        :param shared: the value every item's 'shared' key must equal.
        """
        self.assertNotEmpty(items)
        self.assertTrue(all(n['shared'] == shared for n in items))

    @test.idempotent_id('6661d219-b96d-4597-ad10-51672353421a')
    def test_filtering_shared_subnets(self):
        """Check the ``shared`` list filter on subnets for both clients."""
        # shared subnets need to be tested because their shared status isn't
        # visible as a regular API attribute and it's solely dependent on the
        # parent network
        reg = self.create_network()
        priv = self.create_subnet(reg, client=self.client)
        shared = self.create_subnet(self.shared_network,
                                    client=self.admin_client)

        # shared=True: only the subnet on the shared network is listed.
        self.assertIn(shared, self.client.list_subnets(shared=True)['subnets'])
        self.assertIn(shared,
                      self.admin_client.list_subnets(shared=True)['subnets'])
        self.assertNotIn(priv,
                         self.client.list_subnets(shared=True)['subnets'])
        self.assertNotIn(priv,
                         self.admin_client.list_subnets(shared=True)['subnets'])

        # shared=False: only the subnet on the regular network is listed.
        self.assertIn(priv, self.client.list_subnets(shared=False)['subnets'])
        self.assertIn(priv,
                      self.admin_client.list_subnets(shared=False)['subnets'])
        self.assertNotIn(shared,
                         self.client.list_subnets(shared=False)['subnets'])
        self.assertNotIn(
            shared,
            self.admin_client.list_subnets(shared=False)['subnets'])

    @test.idempotent_id('6661d219-b96d-4597-ad10-55766ce4abf7')
    def test_create_update_shared_network(self):
        """Create a shared network, then rename, unshare and disable it."""
        shared_network = self.create_shared_network()
        net_id = shared_network['id']
        self.assertEqual('ACTIVE', shared_network['status'])
        self.assertIsNotNone(shared_network['id'])
        # Check the network created by this test, not the class-wide one.
        self.assertTrue(shared_network['shared'])
        new_name = "New_shared_network"
        body = self.admin_client.update_network(net_id, name=new_name,
                                                admin_state_up=False,
                                                shared=False)
        updated_net = body['network']
        self.assertEqual(new_name, updated_net['name'])
        self.assertFalse(updated_net['shared'])
        self.assertFalse(updated_net['admin_state_up'])

    @test.idempotent_id('9c31fabb-0181-464f-9ace-95144fe9ca77')
    def test_create_port_shared_network_as_non_admin_tenant(self):
        """Create a port on the shared network with the non-admin client."""
        # create a port as non admin
        body = self.client.create_port(network_id=self.shared_network['id'])
        port = body['port']
        self.addCleanup(self.admin_client.delete_port, port['id'])
        # verify the tenant id of admin network and non admin port
        self.assertNotEqual(self.shared_network['tenant_id'],
                            port['tenant_id'])

    @test.idempotent_id('3e39c4a6-9caf-4710-88f1-d20073c6dd76')
    def test_create_bulk_shared_network(self):
        """Bulk-create two shared networks and verify each result."""
        # Creates 2 networks in one request
        net_nm = [data_utils.rand_name('network'),
                  data_utils.rand_name('network')]
        body = self.admin_client.create_bulk_network(net_nm, shared=True)
        created_networks = body['networks']
        for net in created_networks:
            self.addCleanup(self.admin_client.delete_network, net['id'])
            self.assertIsNotNone(net['id'])
            self.assertTrue(net['shared'])

    def _list_shared_networks(self, user):
        """Assert ``user`` sees the class-wide network when listing shared.

        :param user: network client exposing ``list_networks``.
        """
        body = user.list_networks(shared=True)
        networks_list = [net['id'] for net in body['networks']]
        self.assertIn(self.shared_network['id'], networks_list)
        self.assertTrue(self.shared_network['shared'])

    @test.idempotent_id('a064a9fd-e02f-474a-8159-f828cd636a28')
    def test_list_shared_networks(self):
        """List shared networks as admin and as the regular client."""
        # List the shared networks and confirm that
        # shared network extension attribute is returned for those networks
        # that are created as shared
        self._list_shared_networks(self.admin_client)
        self._list_shared_networks(self.client)

    def _show_shared_network(self, user):
        """Assert ``user`` can show the class-wide network as shared.

        :param user: network client exposing ``show_network``.
        """
        body = user.show_network(self.shared_network['id'])
        show_shared_net = body['network']
        self.assertEqual(self.shared_network['name'], show_shared_net['name'])
        self.assertEqual(self.shared_network['id'], show_shared_net['id'])
        self.assertTrue(show_shared_net['shared'])

    @test.idempotent_id('e03c92a2-638d-4bfa-b50a-b1f66f087e58')
    def test_show_shared_networks_attribute(self):
        """Show the shared network as admin and as the regular client."""
        # Show a shared network and confirm that
        # shared network extension attribute is returned.
        self._show_shared_network(self.admin_client)
        self._show_shared_network(self.client)
class AllowedAddressPairSharedNetworkTest(base.BaseAdminNetworkTest):
    """Check that allowed address pairs are refused on a shared network.

    Both tests expect ``lib_exc.Forbidden`` when a port on the shared
    network created in ``resource_setup`` is given allowed address pairs.
    """

    # Address pair payload used by every test in this class.
    allowed_address_pairs = [{'ip_address': '1.1.1.1'}]

    @classmethod
    def skip_checks(cls):
        """Skip the class when 'allowed-address-pairs' is not enabled."""
        super(AllowedAddressPairSharedNetworkTest, cls).skip_checks()
        if not test.is_extension_enabled('allowed-address-pairs', 'network'):
            msg = "Allowed Address Pairs extension not enabled."
            raise cls.skipException(msg)

    @classmethod
    def resource_setup(cls):
        """Create a shared network with one subnet using the admin client."""
        super(AllowedAddressPairSharedNetworkTest, cls).resource_setup()
        cls.network = cls.create_shared_network()
        cls.create_subnet(cls.network, client=cls.admin_client)

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-ffffffff1fff')
    def test_create_with_address_pair_blocked_on_other_network(self):
        """Creating a port with address pairs must raise Forbidden."""
        # NOTE(review): presumably create_port acts as the non-admin
        # tenant here -- confirm against the base class helper.
        with testtools.ExpectedException(lib_exc.Forbidden):
            self.create_port(self.network,
                             allowed_address_pairs=self.allowed_address_pairs)

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-ffffffff2fff')
    def test_update_with_address_pair_blocked_on_other_network(self):
        """Adding address pairs to an existing port must raise Forbidden."""
        port = self.create_port(self.network)
        with testtools.ExpectedException(lib_exc.Forbidden):
            self.update_port(
                port, allowed_address_pairs=self.allowed_address_pairs)
class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
    """API tests for sharing networks through RBAC policies.

    Three clients take part: ``admin_client``, the regular ``client`` and a
    second regular ``client2`` built from the alternate credentials.
    """

    # NOTE(review): presumably makes the base class allocate dedicated
    # tenants for this test class -- confirm in the base class.
    force_tenant_isolation = True

    @classmethod
    def resource_setup(cls):
        """Skip without the rbac extension, then build ``client2``."""
        super(RBACSharedNetworksTest, cls).resource_setup()
        if not test.is_extension_enabled('rbac_policies', 'network'):
            msg = "rbac extension not enabled."
            raise cls.skipException(msg)
        creds = cls.isolated_creds.get_alt_creds()
        cls.client2 = clients.Manager(credentials=creds).network_client

    def _make_admin_net_and_subnet_shared_to_tenant_id(self, tenant_id):
        """Create an admin network and subnet and share them to a tenant.

        :param tenant_id: target tenant of the 'access_as_shared' policy.
        :returns: dict with the created 'network', 'subnet' and 'policy'.
        """
        net = self.admin_client.create_network(
            name=data_utils.rand_name('test-network-'))['network']
        self.addCleanup(self.admin_client.delete_network, net['id'])
        subnet = self.create_subnet(net, client=self.admin_client)
        # network is shared to first unprivileged client by default
        pol = self.admin_client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared', target_tenant=tenant_id
        )['rbac_policy']
        return {'network': net, 'subnet': subnet, 'policy': pol}

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff1fff')
    def test_network_only_visible_to_policy_target(self):
        """Only the policy's target tenant can show the network."""
        net = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)['network']
        self.client.show_network(net['id'])
        with testtools.ExpectedException(lib_exc.NotFound):
            # client2 has not been granted access
            self.client2.show_network(net['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2fff')
    def test_subnet_on_network_only_visible_to_policy_target(self):
        """Only the policy's target tenant can show the subnet."""
        sub = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)['subnet']
        self.client.show_subnet(sub['id'])
        with testtools.ExpectedException(lib_exc.NotFound):
            # client2 has not been granted access
            self.client2.show_subnet(sub['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2eee')
    def test_policy_target_update(self):
        """Retarget a policy and check nothing else in it changed."""
        res = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)
        # change the target to client2's tenant
        update_res = self.admin_client.update_rbac_policy(
            res['policy']['id'], target_tenant=self.client2.tenant_id)
        self.assertEqual(self.client2.tenant_id,
                         update_res['rbac_policy']['target_tenant'])
        # make sure everything else stayed the same
        res['policy'].pop('target_tenant')
        update_res['rbac_policy'].pop('target_tenant')
        self.assertEqual(res['policy'], update_res['rbac_policy'])

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff3fff')
    def test_port_presence_prevents_network_rbac_policy_deletion(self):
        """A port relying on a policy blocks its deletion and retargeting."""
        res = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)
        port = self.client.create_port(network_id=res['network']['id'])['port']
        # a port on the network should prevent the deletion of a policy
        # required for it to exist
        with testtools.ExpectedException(lib_exc.Conflict):
            self.admin_client.delete_rbac_policy(res['policy']['id'])

        # a wildcard policy should allow the specific policy to be deleted
        # since it allows the remaining port
        wild = self.admin_client.create_rbac_policy(
            object_type='network', object_id=res['network']['id'],
            action='access_as_shared', target_tenant='*')['rbac_policy']
        self.admin_client.delete_rbac_policy(res['policy']['id'])

        # now that wildcard is the only remaining, it should be subjected to
        # to the same restriction
        with testtools.ExpectedException(lib_exc.Conflict):
            self.admin_client.delete_rbac_policy(wild['id'])
        # similarly, we can't update the policy to a different tenant
        with testtools.ExpectedException(lib_exc.Conflict):
            self.admin_client.update_rbac_policy(
                wild['id'], target_tenant=self.client2.tenant_id)

        self.client.delete_port(port['id'])
        # anchor is gone, delete should pass
        self.admin_client.delete_rbac_policy(wild['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-beefbeefbeef')
    def test_tenant_can_delete_port_on_own_network(self):
        """Currently always skipped; the body below never runs."""
        # TODO(kevinbenton): make adjustments to the db lookup to
        # make this work.
        msg = "Non-admin cannot currently delete other's ports."
        raise self.skipException(msg)
        # pylint: disable=unreachable
        net = self.create_network()  # owned by self.client
        self.client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared', target_tenant=self.client2.tenant_id)
        port = self.client2.create_port(network_id=net['id'])['port']
        self.client.delete_port(port['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff4fff')
    def test_regular_client_shares_to_another_regular_client(self):
        """A regular client can share its own network to another tenant."""
        net = self.create_network()  # owned by self.client
        with testtools.ExpectedException(lib_exc.NotFound):
            self.client2.show_network(net['id'])
        pol = self.client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared', target_tenant=self.client2.tenant_id)
        self.client2.show_network(net['id'])

        self.assertIn(pol['rbac_policy'],
                      self.client.list_rbac_policies()['rbac_policies'])
        # ensure that 'client2' can't see the policy sharing the network to it
        # because the policy belongs to 'client'
        self.assertNotIn(
            pol['rbac_policy']['id'],
            [p['id']
             for p in self.client2.list_rbac_policies()['rbac_policies']])

    @test.attr(type='smoke')
    @test.idempotent_id('bf5052b8-b11e-407c-8e43-113447404d3e')
    def test_filter_fields(self):
        """Listing with ``fields`` returns exactly the requested keys."""
        net = self.create_network()
        self.client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared', target_tenant=self.client2.tenant_id)
        field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
                      ('tenant_id', 'target_tenant'))
        for fields in field_args:
            res = self.client.list_rbac_policies(fields=fields)
            self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff5fff')
    def test_policy_show(self):
        """Showing a policy returns the same body that creation returned."""
        res = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)
        p1 = res['policy']
        p2 = self.admin_client.create_rbac_policy(
            object_type='network', object_id=res['network']['id'],
            action='access_as_shared',
            target_tenant='*')['rbac_policy']

        self.assertEqual(
            p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
        self.assertEqual(
            p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])

    @test.attr(type='smoke')
    @test.idempotent_id('e7bcb1ea-4877-4266-87bb-76f68b421f31')
    def test_filter_policies(self):
        """Listing policies filtered by id returns only that policy."""
        net = self.create_network()
        pol1 = self.client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared',
            target_tenant=self.client2.tenant_id)['rbac_policy']
        pol2 = self.client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared',
            target_tenant=self.client.tenant_id)['rbac_policy']
        res1 = self.client.list_rbac_policies(id=pol1['id'])['rbac_policies']
        res2 = self.client.list_rbac_policies(id=pol2['id'])['rbac_policies']
        self.assertEqual(1, len(res1))
        self.assertEqual(1, len(res2))
        self.assertEqual(pol1['id'], res1[0]['id'])
        self.assertEqual(pol2['id'], res2[0]['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff6fff')
    def test_regular_client_blocked_from_sharing_anothers_network(self):
        """A regular client gets BadRequest sharing a network it lacks."""
        net = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)['network']
        with testtools.ExpectedException(lib_exc.BadRequest):
            self.client.create_rbac_policy(
                object_type='network', object_id=net['id'],
                action='access_as_shared', target_tenant=self.client.tenant_id)

    @test.attr(type='smoke')
    @test.idempotent_id('c5f8f785-ce8d-4430-af7e-a236205862fb')
    def test_rbac_policy_quota(self):
        """Creating one policy more than the quota must raise Conflict."""
        if not test.is_extension_enabled('quotas', 'network'):
            msg = "quotas extension not enabled."
            raise self.skipException(msg)
        quota = self.client.show_quotas(self.client.tenant_id)['quota']
        max_policies = quota['rbac_policy']
        self.assertGreater(max_policies, 0)
        net = self.client.create_network(
            name=data_utils.rand_name('test-network-'))['network']
        self.addCleanup(self.client.delete_network, net['id'])
        with testtools.ExpectedException(lib_exc.Conflict):
            # One request past the quota; each targets a fresh random
            # dash-less UUID so no two policies collide.
            for _ in range(max_policies + 1):
                self.admin_client.create_rbac_policy(
                    object_type='network', object_id=net['id'],
                    action='access_as_shared',
                    target_tenant=uuid.uuid4().hex)

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-afffffff7fff')
    def test_regular_client_blocked_from_sharing_with_wildcard(self):
        """A regular client may not create or update to a '*' target."""
        net = self.create_network()
        with testtools.ExpectedException(lib_exc.Forbidden):
            self.client.create_rbac_policy(
                object_type='network', object_id=net['id'],
                action='access_as_shared', target_tenant='*')
        # ensure it works on update as well
        pol = self.client.create_rbac_policy(
            object_type='network', object_id=net['id'],
            action='access_as_shared', target_tenant=self.client2.tenant_id)
        with testtools.ExpectedException(lib_exc.Forbidden):
            self.client.update_rbac_policy(pol['rbac_policy']['id'],
                                           target_tenant='*')

    @test.attr(type='smoke')
    @test.idempotent_id('86c3529b-1231-40de-803c-aeeeeeee7fff')
    def test_filtering_works_with_rbac_records_present(self):
        """The ``shared`` filter stays correct with two policies present."""
        resp = self._make_admin_net_and_subnet_shared_to_tenant_id(
            self.client.tenant_id)
        net = resp['network']['id']
        sub = resp['subnet']['id']
        self.admin_client.create_rbac_policy(
            object_type='network', object_id=net,
            action='access_as_shared', target_tenant='*')
        self._assert_shared_object_id_listing_presence('subnets', False, sub)
        self._assert_shared_object_id_listing_presence('subnets', True, sub)
        self._assert_shared_object_id_listing_presence('networks', False, net)
        self._assert_shared_object_id_listing_presence('networks', True, net)

    def _assert_shared_object_id_listing_presence(self, resource, shared, oid):
        """Assert ``oid`` is listed only when filtering with shared=True.

        :param resource: plural resource name; selects the admin client's
            ``list_<resource>`` method and the key in its response.
        :param shared: value passed as the ``shared`` list filter.
        :param oid: id expected to be present when ``shared`` is true and
            absent otherwise.
        """
        lister = getattr(self.admin_client, 'list_%s' % resource)
        objects = [o['id'] for o in lister(shared=shared)[resource]]
        if shared:
            self.assertIn(oid, objects)
        else:
            self.assertNotIn(oid, objects)