Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / api / admin / test_shared_network_extension.py
1 # Copyright 2015 Hewlett-Packard Development Company, L.P.dsvsv
2 # Copyright 2015 OpenStack Foundation
3 # All Rights Reserved.
4 #
5 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
6 #    not use this file except in compliance with the License. You may obtain
7 #    a copy of the License at
8 #
9 #         http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #    Unless required by applicable law or agreed to in writing, software
12 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 #    License for the specific language governing permissions and limitations
15 #    under the License.
16
17 import uuid
18
19 from tempest_lib import exceptions as lib_exc
20 import testtools
21
22 from neutron.tests.api import base
23 from neutron.tests.api import clients
24 from neutron.tests.tempest import config
25 from neutron.tests.tempest import test
26 from tempest_lib.common.utils import data_utils
27
28 CONF = config.CONF
29
30
31 class SharedNetworksTest(base.BaseAdminNetworkTest):
32
33     @classmethod
34     def resource_setup(cls):
35         super(SharedNetworksTest, cls).resource_setup()
36         cls.shared_network = cls.create_shared_network()
37
38     @test.idempotent_id('6661d219-b96d-4597-ad10-55766123421a')
39     def test_filtering_shared_networks(self):
40         # this test is necessary because the 'shared' column does not actually
41         # exist on networks so the filter function has to translate it into
42         # queries against the RBAC table
43         self.create_network()
44         self._check_shared_correct(
45             self.client.list_networks(shared=True)['networks'], True)
46         self._check_shared_correct(
47             self.admin_client.list_networks(shared=True)['networks'], True)
48         self._check_shared_correct(
49             self.client.list_networks(shared=False)['networks'], False)
50         self._check_shared_correct(
51             self.admin_client.list_networks(shared=False)['networks'], False)
52
53     def _check_shared_correct(self, items, shared):
54         self.assertNotEmpty(items)
55         self.assertTrue(all(n['shared'] == shared for n in items))
56
57     @test.idempotent_id('6661d219-b96d-4597-ad10-51672353421a')
58     def test_filtering_shared_subnets(self):
59         # shared subnets need to be tested because their shared status isn't
60         # visible as a regular API attribute and it's solely dependent on the
61         # parent network
62         reg = self.create_network()
63         priv = self.create_subnet(reg, client=self.client)
64         shared = self.create_subnet(self.shared_network,
65                                     client=self.admin_client)
66         self.assertIn(shared, self.client.list_subnets(shared=True)['subnets'])
67         self.assertIn(shared,
68             self.admin_client.list_subnets(shared=True)['subnets'])
69         self.assertNotIn(priv,
70             self.client.list_subnets(shared=True)['subnets'])
71         self.assertNotIn(priv,
72             self.admin_client.list_subnets(shared=True)['subnets'])
73         self.assertIn(priv, self.client.list_subnets(shared=False)['subnets'])
74         self.assertIn(priv,
75             self.admin_client.list_subnets(shared=False)['subnets'])
76         self.assertNotIn(shared,
77             self.client.list_subnets(shared=False)['subnets'])
78         self.assertNotIn(shared,
79             self.admin_client.list_subnets(shared=False)['subnets'])
80
81     @test.idempotent_id('6661d219-b96d-4597-ad10-55766ce4abf7')
82     def test_create_update_shared_network(self):
83         shared_network = self.create_shared_network()
84         net_id = shared_network['id']
85         self.assertEqual('ACTIVE', shared_network['status'])
86         self.assertIsNotNone(shared_network['id'])
87         self.assertTrue(self.shared_network['shared'])
88         new_name = "New_shared_network"
89         body = self.admin_client.update_network(net_id, name=new_name,
90                                                 admin_state_up=False,
91                                                 shared=False)
92         updated_net = body['network']
93         self.assertEqual(new_name, updated_net['name'])
94         self.assertFalse(updated_net['shared'])
95         self.assertFalse(updated_net['admin_state_up'])
96
97     @test.idempotent_id('9c31fabb-0181-464f-9ace-95144fe9ca77')
98     def test_create_port_shared_network_as_non_admin_tenant(self):
99         # create a port as non admin
100         body = self.client.create_port(network_id=self.shared_network['id'])
101         port = body['port']
102         self.addCleanup(self.admin_client.delete_port, port['id'])
103         # verify the tenant id of admin network and non admin port
104         self.assertNotEqual(self.shared_network['tenant_id'],
105                             port['tenant_id'])
106
107     @test.idempotent_id('3e39c4a6-9caf-4710-88f1-d20073c6dd76')
108     def test_create_bulk_shared_network(self):
109         # Creates 2 networks in one request
110         net_nm = [data_utils.rand_name('network'),
111                   data_utils.rand_name('network')]
112         body = self.admin_client.create_bulk_network(net_nm, shared=True)
113         created_networks = body['networks']
114         for net in created_networks:
115             self.addCleanup(self.admin_client.delete_network, net['id'])
116             self.assertIsNotNone(net['id'])
117             self.assertTrue(net['shared'])
118
119     def _list_shared_networks(self, user):
120         body = user.list_networks(shared=True)
121         networks_list = [net['id'] for net in body['networks']]
122         self.assertIn(self.shared_network['id'], networks_list)
123         self.assertTrue(self.shared_network['shared'])
124
125     @test.idempotent_id('a064a9fd-e02f-474a-8159-f828cd636a28')
126     def test_list_shared_networks(self):
127         # List the shared networks and confirm that
128         # shared network extension attribute is returned for those networks
129         # that are created as shared
130         self._list_shared_networks(self.admin_client)
131         self._list_shared_networks(self.client)
132
133     def _show_shared_network(self, user):
134         body = user.show_network(self.shared_network['id'])
135         show_shared_net = body['network']
136         self.assertEqual(self.shared_network['name'], show_shared_net['name'])
137         self.assertEqual(self.shared_network['id'], show_shared_net['id'])
138         self.assertTrue(show_shared_net['shared'])
139
140     @test.idempotent_id('e03c92a2-638d-4bfa-b50a-b1f66f087e58')
141     def test_show_shared_networks_attribute(self):
142         # Show a shared network and confirm that
143         # shared network extension attribute is returned.
144         self._show_shared_network(self.admin_client)
145         self._show_shared_network(self.client)
146
147
148 class AllowedAddressPairSharedNetworkTest(base.BaseAdminNetworkTest):
149     allowed_address_pairs = [{'ip_address': '1.1.1.1'}]
150
151     @classmethod
152     def skip_checks(cls):
153         super(AllowedAddressPairSharedNetworkTest, cls).skip_checks()
154         if not test.is_extension_enabled('allowed-address-pairs', 'network'):
155             msg = "Allowed Address Pairs extension not enabled."
156             raise cls.skipException(msg)
157
158     @classmethod
159     def resource_setup(cls):
160         super(AllowedAddressPairSharedNetworkTest, cls).resource_setup()
161         cls.network = cls.create_shared_network()
162         cls.create_subnet(cls.network, client=cls.admin_client)
163
164     @test.attr(type='smoke')
165     @test.idempotent_id('86c3529b-1231-40de-803c-ffffffff1fff')
166     def test_create_with_address_pair_blocked_on_other_network(self):
167         with testtools.ExpectedException(lib_exc.Forbidden):
168             self.create_port(self.network,
169                              allowed_address_pairs=self.allowed_address_pairs)
170
171     @test.attr(type='smoke')
172     @test.idempotent_id('86c3529b-1231-40de-803c-ffffffff2fff')
173     def test_update_with_address_pair_blocked_on_other_network(self):
174         port = self.create_port(self.network)
175         with testtools.ExpectedException(lib_exc.Forbidden):
176             self.update_port(
177                 port, allowed_address_pairs=self.allowed_address_pairs)
178
179
180 class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
181
182     force_tenant_isolation = True
183
184     @classmethod
185     def resource_setup(cls):
186         super(RBACSharedNetworksTest, cls).resource_setup()
187         if not test.is_extension_enabled('rbac_policies', 'network'):
188             msg = "rbac extension not enabled."
189             raise cls.skipException(msg)
190         creds = cls.isolated_creds.get_alt_creds()
191         cls.client2 = clients.Manager(credentials=creds).network_client
192
193     def _make_admin_net_and_subnet_shared_to_tenant_id(self, tenant_id):
194         net = self.admin_client.create_network(
195             name=data_utils.rand_name('test-network-'))['network']
196         self.addCleanup(self.admin_client.delete_network, net['id'])
197         subnet = self.create_subnet(net, client=self.admin_client)
198         # network is shared to first unprivileged client by default
199         pol = self.admin_client.create_rbac_policy(
200             object_type='network', object_id=net['id'],
201             action='access_as_shared', target_tenant=tenant_id
202         )['rbac_policy']
203         return {'network': net, 'subnet': subnet, 'policy': pol}
204
205     @test.attr(type='smoke')
206     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff1fff')
207     def test_network_only_visible_to_policy_target(self):
208         net = self._make_admin_net_and_subnet_shared_to_tenant_id(
209             self.client.tenant_id)['network']
210         self.client.show_network(net['id'])
211         with testtools.ExpectedException(lib_exc.NotFound):
212             # client2 has not been granted access
213             self.client2.show_network(net['id'])
214
215     @test.attr(type='smoke')
216     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2fff')
217     def test_subnet_on_network_only_visible_to_policy_target(self):
218         sub = self._make_admin_net_and_subnet_shared_to_tenant_id(
219             self.client.tenant_id)['subnet']
220         self.client.show_subnet(sub['id'])
221         with testtools.ExpectedException(lib_exc.NotFound):
222             # client2 has not been granted access
223             self.client2.show_subnet(sub['id'])
224
225     @test.attr(type='smoke')
226     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2eee')
227     def test_policy_target_update(self):
228         res = self._make_admin_net_and_subnet_shared_to_tenant_id(
229             self.client.tenant_id)
230         # change to client2
231         update_res = self.admin_client.update_rbac_policy(
232                 res['policy']['id'], target_tenant=self.client2.tenant_id)
233         self.assertEqual(self.client2.tenant_id,
234                          update_res['rbac_policy']['target_tenant'])
235         # make sure everything else stayed the same
236         res['policy'].pop('target_tenant')
237         update_res['rbac_policy'].pop('target_tenant')
238         self.assertEqual(res['policy'], update_res['rbac_policy'])
239
240     @test.attr(type='smoke')
241     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff3fff')
242     def test_port_presence_prevents_network_rbac_policy_deletion(self):
243         res = self._make_admin_net_and_subnet_shared_to_tenant_id(
244             self.client.tenant_id)
245         port = self.client.create_port(network_id=res['network']['id'])['port']
246         # a port on the network should prevent the deletion of a policy
247         # required for it to exist
248         with testtools.ExpectedException(lib_exc.Conflict):
249             self.admin_client.delete_rbac_policy(res['policy']['id'])
250
251         # a wildcard policy should allow the specific policy to be deleted
252         # since it allows the remaining port
253         wild = self.admin_client.create_rbac_policy(
254             object_type='network', object_id=res['network']['id'],
255             action='access_as_shared', target_tenant='*')['rbac_policy']
256         self.admin_client.delete_rbac_policy(res['policy']['id'])
257
258         # now that wildcard is the only remaining, it should be subjected to
259         # to the same restriction
260         with testtools.ExpectedException(lib_exc.Conflict):
261             self.admin_client.delete_rbac_policy(wild['id'])
262         # similarly, we can't update the policy to a different tenant
263         with testtools.ExpectedException(lib_exc.Conflict):
264             self.admin_client.update_rbac_policy(
265                 wild['id'], target_tenant=self.client2.tenant_id)
266
267         self.client.delete_port(port['id'])
268         # anchor is gone, delete should pass
269         self.admin_client.delete_rbac_policy(wild['id'])
270
271     @test.attr(type='smoke')
272     @test.idempotent_id('86c3529b-1231-40de-803c-beefbeefbeef')
273     def test_tenant_can_delete_port_on_own_network(self):
274         # TODO(kevinbenton): make adjustments to the db lookup to
275         # make this work.
276         msg = "Non-admin cannot currently delete other's ports."
277         raise self.skipException(msg)
278         # pylint: disable=unreachable
279         net = self.create_network()  # owned by self.client
280         self.client.create_rbac_policy(
281             object_type='network', object_id=net['id'],
282             action='access_as_shared', target_tenant=self.client2.tenant_id)
283         port = self.client2.create_port(network_id=net['id'])['port']
284         self.client.delete_port(port['id'])
285
286     @test.attr(type='smoke')
287     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff4fff')
288     def test_regular_client_shares_to_another_regular_client(self):
289         net = self.create_network()  # owned by self.client
290         with testtools.ExpectedException(lib_exc.NotFound):
291             self.client2.show_network(net['id'])
292         pol = self.client.create_rbac_policy(
293             object_type='network', object_id=net['id'],
294             action='access_as_shared', target_tenant=self.client2.tenant_id)
295         self.client2.show_network(net['id'])
296
297         self.assertIn(pol['rbac_policy'],
298                       self.client.list_rbac_policies()['rbac_policies'])
299         # ensure that 'client2' can't see the policy sharing the network to it
300         # because the policy belongs to 'client'
301         self.assertNotIn(pol['rbac_policy']['id'],
302             [p['id']
303              for p in self.client2.list_rbac_policies()['rbac_policies']])
304
305     @test.attr(type='smoke')
306     @test.idempotent_id('bf5052b8-b11e-407c-8e43-113447404d3e')
307     def test_filter_fields(self):
308         net = self.create_network()
309         self.client.create_rbac_policy(
310             object_type='network', object_id=net['id'],
311             action='access_as_shared', target_tenant=self.client2.tenant_id)
312         field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
313                       ('tenant_id', 'target_tenant'))
314         for fields in field_args:
315             res = self.client.list_rbac_policies(fields=fields)
316             self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))
317
318     @test.attr(type='smoke')
319     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff5fff')
320     def test_policy_show(self):
321         res = self._make_admin_net_and_subnet_shared_to_tenant_id(
322             self.client.tenant_id)
323         p1 = res['policy']
324         p2 = self.admin_client.create_rbac_policy(
325             object_type='network', object_id=res['network']['id'],
326             action='access_as_shared',
327             target_tenant='*')['rbac_policy']
328
329         self.assertEqual(
330             p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
331         self.assertEqual(
332             p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
333
334     @test.attr(type='smoke')
335     @test.idempotent_id('e7bcb1ea-4877-4266-87bb-76f68b421f31')
336     def test_filter_policies(self):
337         net = self.create_network()
338         pol1 = self.client.create_rbac_policy(
339             object_type='network', object_id=net['id'],
340             action='access_as_shared',
341             target_tenant=self.client2.tenant_id)['rbac_policy']
342         pol2 = self.client.create_rbac_policy(
343             object_type='network', object_id=net['id'],
344             action='access_as_shared',
345             target_tenant=self.client.tenant_id)['rbac_policy']
346         res1 = self.client.list_rbac_policies(id=pol1['id'])['rbac_policies']
347         res2 = self.client.list_rbac_policies(id=pol2['id'])['rbac_policies']
348         self.assertEqual(1, len(res1))
349         self.assertEqual(1, len(res2))
350         self.assertEqual(pol1['id'], res1[0]['id'])
351         self.assertEqual(pol2['id'], res2[0]['id'])
352
353     @test.attr(type='smoke')
354     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff6fff')
355     def test_regular_client_blocked_from_sharing_anothers_network(self):
356         net = self._make_admin_net_and_subnet_shared_to_tenant_id(
357             self.client.tenant_id)['network']
358         with testtools.ExpectedException(lib_exc.BadRequest):
359             self.client.create_rbac_policy(
360                 object_type='network', object_id=net['id'],
361                 action='access_as_shared', target_tenant=self.client.tenant_id)
362
363     @test.attr(type='smoke')
364     @test.idempotent_id('c5f8f785-ce8d-4430-af7e-a236205862fb')
365     def test_rbac_policy_quota(self):
366         if not test.is_extension_enabled('quotas', 'network'):
367             msg = "quotas extension not enabled."
368             raise self.skipException(msg)
369         quota = self.client.show_quotas(self.client.tenant_id)['quota']
370         max_policies = quota['rbac_policy']
371         self.assertGreater(max_policies, 0)
372         net = self.client.create_network(
373             name=data_utils.rand_name('test-network-'))['network']
374         self.addCleanup(self.client.delete_network, net['id'])
375         with testtools.ExpectedException(lib_exc.Conflict):
376             for i in range(0, max_policies + 1):
377                 self.admin_client.create_rbac_policy(
378                     object_type='network', object_id=net['id'],
379                     action='access_as_shared',
380                     target_tenant=str(uuid.uuid4()).replace('-', ''))
381
382     @test.attr(type='smoke')
383     @test.idempotent_id('86c3529b-1231-40de-803c-afffffff7fff')
384     def test_regular_client_blocked_from_sharing_with_wildcard(self):
385         net = self.create_network()
386         with testtools.ExpectedException(lib_exc.Forbidden):
387             self.client.create_rbac_policy(
388                 object_type='network', object_id=net['id'],
389                 action='access_as_shared', target_tenant='*')
390         # ensure it works on update as well
391         pol = self.client.create_rbac_policy(
392             object_type='network', object_id=net['id'],
393             action='access_as_shared', target_tenant=self.client2.tenant_id)
394         with testtools.ExpectedException(lib_exc.Forbidden):
395             self.client.update_rbac_policy(pol['rbac_policy']['id'],
396                                            target_tenant='*')
397
398     @test.attr(type='smoke')
399     @test.idempotent_id('86c3529b-1231-40de-803c-aeeeeeee7fff')
400     def test_filtering_works_with_rbac_records_present(self):
401         resp = self._make_admin_net_and_subnet_shared_to_tenant_id(
402             self.client.tenant_id)
403         net = resp['network']['id']
404         sub = resp['subnet']['id']
405         self.admin_client.create_rbac_policy(
406             object_type='network', object_id=net,
407             action='access_as_shared', target_tenant='*')
408         self._assert_shared_object_id_listing_presence('subnets', False, sub)
409         self._assert_shared_object_id_listing_presence('subnets', True, sub)
410         self._assert_shared_object_id_listing_presence('networks', False, net)
411         self._assert_shared_object_id_listing_presence('networks', True, net)
412
413     def _assert_shared_object_id_listing_presence(self, resource, shared, oid):
414         lister = getattr(self.admin_client, 'list_%s' % resource)
415         objects = [o['id'] for o in lister(shared=shared)[resource]]
416         if shared:
417             self.assertIn(oid, objects)
418         else:
419             self.assertNotIn(oid, objects)