Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / api / test_ports.py
1 # Copyright 2014 OpenStack Foundation
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 import socket
17
18 import netaddr
19 from tempest_lib.common.utils import data_utils
20
21 from neutron.tests.api import base
22 from neutron.tests.api import base_security_groups as sec_base
23 from neutron.tests.tempest.common import custom_matchers
24 from neutron.tests.tempest import config
25 from neutron.tests.tempest import test
26
27 CONF = config.CONF
28
29
class PortsTestJSON(sec_base.BaseSecGroupTest):
    """Exercise the port API with the regular (non-admin) client.

    Test the following operations for ports:

        port create
        port delete
        port list
        port show
        port update
    """

    @classmethod
    def resource_setup(cls):
        """Create the network and the port shared by the tests below."""
        super(PortsTestJSON, cls).resource_setup()
        cls.network = cls.create_network()
        cls.port = cls.create_port(cls.network)

    def _delete_port(self, port_id):
        """Delete ``port_id`` and assert it no longer shows up in the list.

        :param port_id: identifier of the port to remove.
        """
        self.client.delete_port(port_id)
        body = self.client.list_ports()
        ports_list = body['ports']
        self.assertNotIn(port_id, [n['id'] for n in ports_list])

    @test.attr(type='smoke')
    @test.idempotent_id('c72c1c0c-2193-4aca-aaa4-b1442640f51c')
    def test_create_update_delete_port(self):
        """Create a port, rename and disable it, then delete it on cleanup."""
        # Verify port creation
        body = self.client.create_port(network_id=self.network['id'])
        port = body['port']
        # Schedule port deletion with verification upon test completion
        self.addCleanup(self._delete_port, port['id'])
        self.assertTrue(port['admin_state_up'])
        # Verify port update
        new_name = "New_Port"
        body = self.client.update_port(port['id'],
                                       name=new_name,
                                       admin_state_up=False)
        updated_port = body['port']
        self.assertEqual(updated_port['name'], new_name)
        self.assertFalse(updated_port['admin_state_up'])

    @test.idempotent_id('67f1b811-f8db-43e2-86bd-72c074d4a42c')
    def test_create_bulk_port(self):
        """Create one port on each of two networks with a single request."""
        network1 = self.network
        name = data_utils.rand_name('network-')
        network2 = self.create_network(network_name=name)
        network_list = [network1['id'], network2['id']]
        port_list = [{'network_id': net_id} for net_id in network_list]
        body = self.client.create_bulk_port(port_list)
        created_ports = body['ports']
        # Register the cleanups before indexing into the response, so that a
        # short or malformed reply cannot leak the ports that were created.
        for created_port in created_ports:
            self.addCleanup(self._delete_port, created_port['id'])
        # Fail with a readable assertion instead of an IndexError below.
        self.assertEqual(len(network_list), len(created_ports))
        port1 = created_ports[0]
        port2 = created_ports[1]
        self.assertEqual(port1['network_id'], network1['id'])
        self.assertEqual(port2['network_id'], network2['id'])
        self.assertTrue(port1['admin_state_up'])
        self.assertTrue(port2['admin_state_up'])

    @classmethod
    def _get_ipaddress_from_tempest_conf(cls):
        """Return the configured tenant network CIDR as an IPAddress.

        The CIDR option is picked according to ``cls._ip_version`` and the
        resulting ``netaddr.IPNetwork`` is converted with
        ``netaddr.IPAddress``.

        NOTE(review): this used to be documented as returning the "first
        subnet gateway", but no gateway offset is applied here; callers add
        their own offsets to the returned address.

        :returns: a ``netaddr.IPAddress`` built from the configured CIDR.
        :raises ValueError: if ``cls._ip_version`` is neither 4 nor 6.
        """
        if cls._ip_version == 4:
            cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
        elif cls._ip_version == 6:
            cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
        else:
            # Without this branch ``cidr`` stayed unbound and the method
            # died with an UnboundLocalError that hid the real problem.
            raise ValueError(
                "Unsupported IP version: %r" % (cls._ip_version,))

        return netaddr.IPAddress(cidr)

    @test.attr(type='smoke')
    @test.idempotent_id('0435f278-40ae-48cb-a404-b8a087bc09b1')
    def test_create_port_in_allowed_allocation_pools(self):
        """Check a new port gets its IP from the subnet allocation pool."""
        network = self.create_network()
        net_id = network['id']
        address = self._get_ipaddress_from_tempest_conf()
        # The pool spans the 4th to the 6th address after the base address.
        start_ip_address = str(address + 4)
        end_ip_address = str(address + 6)
        allocation_pools = {'allocation_pools': [{'start': start_ip_address,
                                                  'end': end_ip_address}]}
        subnet = self.create_subnet(network, **allocation_pools)
        self.addCleanup(self.client.delete_subnet, subnet['id'])
        body = self.client.create_port(network_id=net_id)
        port = body['port']
        self.addCleanup(self.client.delete_port, port['id'])
        ip_address = port['fixed_ips'][0]['ip_address']
        ip_range = netaddr.IPRange(start_ip_address, end_ip_address)
        self.assertIn(ip_address, ip_range)

    @test.attr(type='smoke')
    @test.idempotent_id('c9a685bd-e83f-499c-939f-9f7863ca259f')
    def test_show_port(self):
        """Check that showing the shared port returns what create returned."""
        # Verify the details of port
        body = self.client.show_port(self.port['id'])
        port = body['port']
        self.assertIn('id', port)
        # TODO(Santosh)- This is a temporary workaround to compare create_port
        # and show_port dict elements.Remove this once extra_dhcp_opts issue
        # gets fixed in neutron.( bug - 1365341.)
        self.assertThat(self.port,
                        custom_matchers.MatchesDictExceptForKeys
                        (port, excluded_keys=['extra_dhcp_opts']))

    @test.attr(type='smoke')
    @test.idempotent_id('45fcdaf2-dab0-4c13-ac6c-fcddfb579dbd')
    def test_show_port_fields(self):
        """Check that ``fields`` limits the keys returned by show_port."""
        # Verify specific fields of a port
        fields = ['id', 'mac_address']
        body = self.client.show_port(self.port['id'],
                                     fields=fields)
        port = body['port']
        self.assertEqual(sorted(port.keys()), sorted(fields))
        for field_name in fields:
            self.assertEqual(port[field_name], self.port[field_name])

    @test.attr(type='smoke')
    @test.idempotent_id('cf95b358-3e92-4a29-a148-52445e1ac50e')
    def test_list_ports(self):
        """Check that the shared port is part of the port listing."""
        # Verify the port exists in the list of all ports
        body = self.client.list_ports()
        ports = [port['id'] for port in body['ports']
                 if port['id'] == self.port['id']]
        self.assertNotEmpty(ports, "Created port not found in the list")

    @test.attr(type='smoke')
    @test.idempotent_id('5ad01ed0-0e6e-4c5d-8194-232801b15c72')
    def test_port_list_filter_by_router_id(self):
        """Check that listing ports by ``device_id`` finds a router port."""
        # Create a router
        network = self.create_network()
        self.addCleanup(self.client.delete_network, network['id'])
        subnet = self.create_subnet(network)
        self.addCleanup(self.client.delete_subnet, subnet['id'])
        router = self.create_router(data_utils.rand_name('router-'))
        self.addCleanup(self.client.delete_router, router['id'])
        port = self.client.create_port(network_id=network['id'])
        # Add router interface to port created above
        self.client.add_router_interface_with_port_id(
            router['id'], port['port']['id'])
        self.addCleanup(self.client.remove_router_interface_with_port_id,
                        router['id'], port['port']['id'])
        # List ports filtered by router_id
        port_list = self.client.list_ports(device_id=router['id'])
        ports = port_list['ports']
        self.assertEqual(len(ports), 1)
        self.assertEqual(ports[0]['id'], port['port']['id'])
        self.assertEqual(ports[0]['device_id'], router['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('ff7f117f-f034-4e0e-abff-ccef05c454b4')
    def test_list_ports_fields(self):
        """Check that ``fields`` limits the keys of every listed port."""
        # Verify specific fields of ports
        fields = ['id', 'mac_address']
        body = self.client.list_ports(fields=fields)
        ports = body['ports']
        self.assertNotEmpty(ports, "Port list returned is empty")
        # Asserting the fields returned are correct
        for port in ports:
            self.assertEqual(sorted(fields), sorted(port.keys()))

    @test.attr(type='smoke')
    @test.idempotent_id('63aeadd4-3b49-427f-a3b1-19ca81f06270')
    def test_create_update_port_with_second_ip(self):
        """Create a port with two fixed IPs, drop one, then add it back."""
        # Create a network with two subnets
        network = self.create_network()
        self.addCleanup(self.client.delete_network, network['id'])
        subnet_1 = self.create_subnet(network)
        self.addCleanup(self.client.delete_subnet, subnet_1['id'])
        subnet_2 = self.create_subnet(network)
        self.addCleanup(self.client.delete_subnet, subnet_2['id'])
        fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
        fixed_ip_2 = [{'subnet_id': subnet_2['id']}]

        fixed_ips = fixed_ip_1 + fixed_ip_2

        # Create a port with multiple IP addresses
        port = self.create_port(network,
                                fixed_ips=fixed_ips)
        self.addCleanup(self.client.delete_port, port['id'])
        self.assertEqual(2, len(port['fixed_ips']))
        check_fixed_ips = [subnet_1['id'], subnet_2['id']]
        for item in port['fixed_ips']:
            self.assertIn(item['subnet_id'], check_fixed_ips)

        # Update the port to return to a single IP address
        port = self.update_port(port, fixed_ips=fixed_ip_1)
        self.assertEqual(1, len(port['fixed_ips']))

        # Update the port with a second IP address from second subnet
        port = self.update_port(port, fixed_ips=fixed_ips)
        self.assertEqual(2, len(port['fixed_ips']))

    def _update_port_with_security_groups(self, security_groups_names):
        """Create a port, then update it with new groups and attributes.

        A security group is created for every entry of
        ``security_groups_names``. A port is created on the shared network
        with a separate, randomly named group and is then updated to use the
        new groups, a new name, ``admin_state_up=False`` and a fixed IP on a
        second subnet. The update response is checked against the request.

        :param security_groups_names: names of the security groups to create
            and apply to the port during the update.
        """
        subnet_1 = self.create_subnet(self.network)
        self.addCleanup(self.client.delete_subnet, subnet_1['id'])
        fixed_ip_1 = [{'subnet_id': subnet_1['id']}]

        security_groups_list = []
        for name in security_groups_names:
            group_create_body = self.client.create_security_group(
                name=name)
            group_id = group_create_body['security_group']['id']
            self.addCleanup(self.client.delete_security_group, group_id)
            security_groups_list.append(group_id)
        # Create a port
        sec_grp_name = data_utils.rand_name('secgroup')
        security_group = self.client.create_security_group(name=sec_grp_name)
        self.addCleanup(self.client.delete_security_group,
                        security_group['security_group']['id'])
        post_body = {
            "name": data_utils.rand_name('port-'),
            "security_groups": [security_group['security_group']['id']],
            "network_id": self.network['id'],
            "admin_state_up": True,
            "fixed_ips": fixed_ip_1}
        body = self.client.create_port(**post_body)
        self.addCleanup(self.client.delete_port, body['port']['id'])
        port = body['port']

        # Update the port with security groups
        # NOTE(review): unlike subnet_1, subnet_2 gets no explicit cleanup
        # here - presumably the base class tears it down; confirm.
        subnet_2 = self.create_subnet(self.network)
        fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
        update_body = {"name": data_utils.rand_name('port-'),
                       "admin_state_up": False,
                       "fixed_ips": fixed_ip_2,
                       "security_groups": security_groups_list}
        body = self.client.update_port(port['id'], **update_body)
        port_show = body['port']
        # Verify the security groups and other attributes updated to port
        # Keys present on only one side are excluded from the dict match;
        # fixed_ips and security_groups are checked separately below.
        exclude_keys = set(port_show).symmetric_difference(update_body)
        exclude_keys.add('fixed_ips')
        exclude_keys.add('security_groups')
        self.assertThat(port_show, custom_matchers.MatchesDictExceptForKeys(
                        update_body, exclude_keys))
        self.assertEqual(fixed_ip_2[0]['subnet_id'],
                         port_show['fixed_ips'][0]['subnet_id'])

        for group_id in security_groups_list:
            self.assertIn(group_id, port_show['security_groups'])

    @test.attr(type='smoke')
    @test.idempotent_id('58091b66-4ff4-4cc1-a549-05d60c7acd1a')
    def test_update_port_with_security_group_and_extra_attributes(self):
        """Update a port with one new security group plus other fields."""
        self._update_port_with_security_groups(
            [data_utils.rand_name('secgroup')])

    @test.attr(type='smoke')
    @test.idempotent_id('edf6766d-3d40-4621-bc6e-2521a44c257d')
    def test_update_port_with_two_security_groups_and_extra_attributes(self):
        """Update a port with two new security groups plus other fields."""
        self._update_port_with_security_groups(
            [data_utils.rand_name('secgroup'),
             data_utils.rand_name('secgroup')])

    @test.attr(type='smoke')
    @test.idempotent_id('13e95171-6cbd-489c-9d7c-3f9c58215c18')
    def test_create_show_delete_port_user_defined_mac(self):
        """Create a port with a caller-chosen MAC and read it back."""
        # Create a port for a legal mac
        body = self.client.create_port(network_id=self.network['id'])
        old_port = body['port']
        free_mac_address = old_port['mac_address']
        # Deleting the port frees its MAC so that it can be requested again.
        self.client.delete_port(old_port['id'])
        # Create a new port with user defined mac
        body = self.client.create_port(network_id=self.network['id'],
                                       mac_address=free_mac_address)
        self.addCleanup(self.client.delete_port, body['port']['id'])
        port = body['port']
        body = self.client.show_port(port['id'])
        show_port = body['port']
        self.assertEqual(free_mac_address,
                         show_port['mac_address'])

    @test.attr(type='smoke')
    @test.idempotent_id('4179dcb9-1382-4ced-84fe-1b91c54f5735')
    def test_create_port_with_no_securitygroups(self):
        """Create a port with an explicitly empty security group list."""
        network = self.create_network()
        self.addCleanup(self.client.delete_network, network['id'])
        subnet = self.create_subnet(network)
        self.addCleanup(self.client.delete_subnet, subnet['id'])
        port = self.create_port(network, security_groups=[])
        self.addCleanup(self.client.delete_port, port['id'])
        self.assertIsNotNone(port['security_groups'])
        self.assertEmpty(port['security_groups'])
313
314
class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest):
    """Check the ``binding:*`` port attributes through the admin client."""

    @classmethod
    def resource_setup(cls):
        """Look up the configured tenant and create the shared network.

        The local host name is stored as ``cls.host_id`` and used by the
        tests as the value for ``binding:host_id``.
        """
        super(PortsAdminExtendedAttrsTestJSON, cls).resource_setup()
        identity = cls._get_identity_admin_client()
        cls.identity_client = identity
        cls.tenant = identity.get_tenant_by_name(CONF.identity.tenant_name)
        cls.network = cls.create_network()
        cls.host_id = socket.gethostname()

    @test.attr(type='smoke')
    @test.idempotent_id('8e8569c1-9ac7-44db-8bc1-f5fb2814f29b')
    def test_create_port_binding_ext_attr(self):
        """Create a port with ``binding:host_id`` and check it is echoed."""
        # "binding:host_id" is not a valid keyword name, hence the dict.
        request = {"network_id": self.network['id'],
                   "binding:host_id": self.host_id}
        created = self.admin_client.create_port(**request)['port']
        self.addCleanup(self.admin_client.delete_port, created['id'])
        bound_host = created['binding:host_id']
        self.assertIsNotNone(bound_host)
        self.assertEqual(self.host_id, bound_host)

    @test.attr(type='smoke')
    @test.idempotent_id('6f6c412c-711f-444d-8502-0ac30fbf5dd5')
    def test_update_port_binding_ext_attr(self):
        """Set ``binding:host_id`` on an existing port via update."""
        created = self.admin_client.create_port(
            network_id=self.network['id'])['port']
        self.addCleanup(self.admin_client.delete_port, created['id'])
        binding = {"binding:host_id": self.host_id}
        response = self.admin_client.update_port(created['id'], **binding)
        bound_host = response['port']['binding:host_id']
        self.assertIsNotNone(bound_host)
        self.assertEqual(self.host_id, bound_host)

    @test.attr(type='smoke')
    @test.idempotent_id('1c82a44a-6c6e-48ff-89e1-abe7eaf8f9f8')
    def test_list_ports_binding_ext_attr(self):
        """Check a bound port is listed once with the right host binding."""
        # Start from a fresh port on the shared network.
        new_port = self.admin_client.create_port(
            network_id=self.network['id'])['port']
        self.addCleanup(self.admin_client.delete_port, new_port['id'])

        # Bind the port to this host.
        binding = {"binding:host_id": self.host_id}
        self.admin_client.update_port(new_port['id'], **binding)

        # The port must appear in the full listing, exactly once, and carry
        # the host it was bound to.
        all_ports = self.admin_client.list_ports()['ports']
        listed_ids = [entry['id'] for entry in all_ports]
        self.assertIn(new_port['id'], listed_ids)
        matches = [entry for entry in all_ports
                   if entry['id'] == new_port['id']]
        failure_msg = ('Multiple ports listed with id %s in ports listing: '
                       '%s' % (new_port['id'], all_ports))
        self.assertEqual(1, len(matches), failure_msg)
        self.assertEqual(self.host_id, matches[0]['binding:host_id'])

    @test.attr(type='smoke')
    @test.idempotent_id('b54ac0ff-35fc-4c79-9ca3-c7dbd4ea4f13')
    def test_show_port_binding_ext_attr(self):
        """Check show_port reports the same bindings as create_port."""
        created = self.admin_client.create_port(
            network_id=self.network['id'])['port']
        self.addCleanup(self.admin_client.delete_port, created['id'])
        shown = self.admin_client.show_port(created['id'])['port']
        # Compared in this fixed order so the first mismatch is reported.
        for attribute in ('binding:host_id',
                          'binding:vif_type',
                          'binding:vif_details'):
            self.assertEqual(created[attribute], shown[attribute])
392
393
class PortsIpV6TestJSON(PortsTestJSON):
    """IPv6 variant of PortsTestJSON; inherits all of its test methods."""

    # Select IPv6 and take the matching CIDR and prefix length from the
    # tempest network configuration. These are presumably consumed by the
    # base class helpers when creating subnets - confirm against the base.
    _ip_version = 6
    _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
    _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
398
399
class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON):
    """IPv6 variant of PortsAdminExtendedAttrsTestJSON.

    Inherits every test method and only overrides the IP settings below.
    """

    # Select IPv6 and take the matching CIDR and prefix length from the
    # tempest network configuration. These are presumably consumed by the
    # base class helpers when creating subnets - confirm against the base.
    _ip_version = 6
    _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
    _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits