Set lock_path correctly.
[openstack-build/neutron-build.git] / neutron / tests / unit / api / v2 / test_attributes.py
1 # Copyright 2012 OpenStack Foundation
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 import string
17 import testtools
18
19 import mock
20 import netaddr
21 import webob.exc
22
23 from oslo_utils import uuidutils
24
25 from neutron._i18n import _
26 from neutron.api.v2 import attributes
27 from neutron.common import constants
28 from neutron.common import exceptions as n_exc
29 from neutron import context
30 from neutron.tests import base
31 from neutron.tests import tools
32
33
34 class TestAttributes(base.BaseTestCase):
35
36     def _construct_dict_and_constraints(self):
37         """Constructs a test dictionary and a definition of constraints.
38         :return: A (dictionary, constraint) tuple
39         """
40         constraints = {'key1': {'type:values': ['val1', 'val2'],
41                                 'required': True},
42                        'key2': {'type:string': None,
43                                 'required': False},
44                        'key3': {'type:dict': {'k4': {'type:string': None,
45                                                      'required': True}},
46                                 'required': True}}
47
48         dictionary = {'key1': 'val1',
49                       'key2': 'a string value',
50                       'key3': {'k4': 'a string value'}}
51
52         return dictionary, constraints
53
54     def test_is_attr_set(self):
55         data = attributes.ATTR_NOT_SPECIFIED
56         self.assertIs(attributes.is_attr_set(data), False)
57
58         data = None
59         self.assertIs(attributes.is_attr_set(data), False)
60
61         data = "I'm set"
62         self.assertIs(attributes.is_attr_set(data), True)
63
64     def test_validate_values(self):
65         msg = attributes._validate_values(4, [4, 6])
66         self.assertIsNone(msg)
67
68         msg = attributes._validate_values(4, (4, 6))
69         self.assertIsNone(msg)
70
71         msg = attributes._validate_values(7, [4, 6])
72         self.assertEqual("'7' is not in [4, 6]", msg)
73
74         msg = attributes._validate_values(7, (4, 6))
75         self.assertEqual("'7' is not in (4, 6)", msg)
76
77     def test_validate_not_empty_string(self):
78         msg = attributes._validate_not_empty_string('    ', None)
79         self.assertEqual(u"'    ' Blank strings are not permitted", msg)
80
81     def test_validate_not_empty_string_or_none(self):
82         msg = attributes._validate_not_empty_string_or_none('    ', None)
83         self.assertEqual(u"'    ' Blank strings are not permitted", msg)
84
85         msg = attributes._validate_not_empty_string_or_none(None, None)
86         self.assertIsNone(msg)
87
88     def test_validate_string_or_none(self):
89         msg = attributes._validate_not_empty_string_or_none('test', None)
90         self.assertIsNone(msg)
91
92         msg = attributes._validate_not_empty_string_or_none(None, None)
93         self.assertIsNone(msg)
94
95     def test_validate_string(self):
96         msg = attributes._validate_string(None, None)
97         self.assertEqual("'None' is not a valid string", msg)
98
99         # 0 == len(data) == max_len
100         msg = attributes._validate_string("", 0)
101         self.assertIsNone(msg)
102
103         # 0 == len(data) < max_len
104         msg = attributes._validate_string("", 9)
105         self.assertIsNone(msg)
106
107         # 0 < len(data) < max_len
108         msg = attributes._validate_string("123456789", 10)
109         self.assertIsNone(msg)
110
111         # 0 < len(data) == max_len
112         msg = attributes._validate_string("123456789", 9)
113         self.assertIsNone(msg)
114
115         # 0 < max_len < len(data)
116         msg = attributes._validate_string("1234567890", 9)
117         self.assertEqual("'1234567890' exceeds maximum length of 9", msg)
118
119         msg = attributes._validate_string("123456789", None)
120         self.assertIsNone(msg)
121
122     def test_validate_list_of_unique_strings(self):
123         data = "TEST"
124         msg = attributes.validate_list_of_unique_strings(data, None)
125         self.assertEqual("'TEST' is not a list", msg)
126
127         data = ["TEST01", "TEST02", "TEST01"]
128         msg = attributes.validate_list_of_unique_strings(data, None)
129         self.assertEqual(
130             "Duplicate items in the list: 'TEST01, TEST02, TEST01'", msg)
131
132         data = ["12345678", "123456789"]
133         msg = attributes.validate_list_of_unique_strings(data, 8)
134         self.assertEqual("'123456789' exceeds maximum length of 8", msg)
135
136         data = ["TEST01", "TEST02", "TEST03"]
137         msg = attributes.validate_list_of_unique_strings(data, None)
138         self.assertIsNone(msg)
139
140     def test_validate_no_whitespace(self):
141         data = 'no_white_space'
142         result = attributes._validate_no_whitespace(data)
143         self.assertEqual(data, result)
144
145         self.assertRaises(n_exc.InvalidInput,
146                           attributes._validate_no_whitespace,
147                           'i have whitespace')
148
149         self.assertRaises(n_exc.InvalidInput,
150                           attributes._validate_no_whitespace,
151                           'i\thave\twhitespace')
152
153         for ws in string.whitespace:
154             self.assertRaises(n_exc.InvalidInput,
155                               attributes._validate_no_whitespace,
156                               '%swhitespace-at-head' % ws)
157             self.assertRaises(n_exc.InvalidInput,
158                               attributes._validate_no_whitespace,
159                               'whitespace-at-tail%s' % ws)
160
161     def test_validate_range(self):
162         msg = attributes._validate_range(1, [1, 9])
163         self.assertIsNone(msg)
164
165         msg = attributes._validate_range(5, [1, 9])
166         self.assertIsNone(msg)
167
168         msg = attributes._validate_range(9, [1, 9])
169         self.assertIsNone(msg)
170
171         msg = attributes._validate_range(1, (1, 9))
172         self.assertIsNone(msg)
173
174         msg = attributes._validate_range(5, (1, 9))
175         self.assertIsNone(msg)
176
177         msg = attributes._validate_range(9, (1, 9))
178         self.assertIsNone(msg)
179
180         msg = attributes._validate_range(0, [1, 9])
181         self.assertEqual("'0' is too small - must be at least '1'", msg)
182
183         msg = attributes._validate_range(10, (1, 9))
184         self.assertEqual("'10' is too large - must be no larger than '9'", msg)
185
186         msg = attributes._validate_range("bogus", (1, 9))
187         self.assertEqual("'bogus' is not an integer", msg)
188
189         msg = attributes._validate_range(10, (attributes.UNLIMITED,
190                                               attributes.UNLIMITED))
191         self.assertIsNone(msg)
192
193         msg = attributes._validate_range(10, (1, attributes.UNLIMITED))
194         self.assertIsNone(msg)
195
196         msg = attributes._validate_range(1, (attributes.UNLIMITED, 9))
197         self.assertIsNone(msg)
198
199         msg = attributes._validate_range(-1, (0, attributes.UNLIMITED))
200         self.assertEqual("'-1' is too small - must be at least '0'", msg)
201
202         msg = attributes._validate_range(10, (attributes.UNLIMITED, 9))
203         self.assertEqual("'10' is too large - must be no larger than '9'", msg)
204
205     def _test_validate_mac_address(self, validator, allow_none=False):
206         mac_addr = "ff:16:3e:4f:00:00"
207         msg = validator(mac_addr)
208         self.assertIsNone(msg)
209
210         mac_addr = "ffa:16:3e:4f:00:00"
211         msg = validator(mac_addr)
212         err_msg = "'%s' is not a valid MAC address"
213         self.assertEqual(err_msg % mac_addr, msg)
214
215         for invalid_mac_addr in constants.INVALID_MAC_ADDRESSES:
216             msg = validator(invalid_mac_addr)
217             self.assertEqual(err_msg % invalid_mac_addr, msg)
218
219         mac_addr = "123"
220         msg = validator(mac_addr)
221         self.assertEqual(err_msg % mac_addr, msg)
222
223         mac_addr = None
224         msg = validator(mac_addr)
225         if allow_none:
226             self.assertIsNone(msg)
227         else:
228             self.assertEqual(err_msg % mac_addr, msg)
229
230         mac_addr = "ff:16:3e:4f:00:00\r"
231         msg = validator(mac_addr)
232         self.assertEqual(err_msg % mac_addr, msg)
233
234     def test_validate_mac_address(self):
235         self._test_validate_mac_address(attributes._validate_mac_address)
236
237     def test_validate_mac_address_or_none(self):
238         self._test_validate_mac_address(
239             attributes._validate_mac_address_or_none, allow_none=True)
240
241     def test_validate_ip_address(self):
242         ip_addr = '1.1.1.1'
243         msg = attributes._validate_ip_address(ip_addr)
244         self.assertIsNone(msg)
245
246         ip_addr = '1111.1.1.1'
247         msg = attributes._validate_ip_address(ip_addr)
248         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
249
250         # Depending on platform to run UTs, this case might or might not be
251         # an equivalent to test_validate_ip_address_bsd.
252         ip_addr = '1' * 59
253         msg = attributes._validate_ip_address(ip_addr)
254         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
255
256         ip_addr = '1.1.1.1 has whitespace'
257         msg = attributes._validate_ip_address(ip_addr)
258         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
259
260         ip_addr = '111.1.1.1\twhitespace'
261         msg = attributes._validate_ip_address(ip_addr)
262         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
263
264         ip_addr = '111.1.1.1\nwhitespace'
265         msg = attributes._validate_ip_address(ip_addr)
266         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
267
268         for ws in string.whitespace:
269             ip_addr = '%s111.1.1.1' % ws
270             msg = attributes._validate_ip_address(ip_addr)
271             self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
272
273         for ws in string.whitespace:
274             ip_addr = '111.1.1.1%s' % ws
275             msg = attributes._validate_ip_address(ip_addr)
276             self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
277
278     def test_validate_ip_address_with_leading_zero(self):
279         ip_addr = '1.1.1.01'
280         expected_msg = ("'%(data)s' is not an accepted IP address, "
281                         "'%(ip)s' is recommended")
282         msg = attributes._validate_ip_address(ip_addr)
283         self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.1'},
284                          msg)
285
286         ip_addr = '1.1.1.011'
287         msg = attributes._validate_ip_address(ip_addr)
288         self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.11'},
289                          msg)
290
291         ip_addr = '1.1.1.09'
292         msg = attributes._validate_ip_address(ip_addr)
293         self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.9'},
294                          msg)
295
296         ip_addr = "fe80:0:0:0:0:0:0:0001"
297         msg = attributes._validate_ip_address(ip_addr)
298         self.assertIsNone(msg)
299
300     def test_validate_ip_address_bsd(self):
301         # NOTE(yamamoto):  On NetBSD and OS X, netaddr.IPAddress() accepts
302         # '1' * 59 as a valid address.  The behaviour is inherited from
303         # libc behaviour there.  This test ensures that our validator reject
304         # such addresses on such platforms by mocking netaddr to emulate
305         # the behaviour.
306         ip_addr = '1' * 59
307         with mock.patch('netaddr.IPAddress') as ip_address_cls:
308             msg = attributes._validate_ip_address(ip_addr)
309         ip_address_cls.assert_called_once_with(ip_addr,
310                                                flags=netaddr.core.ZEROFILL)
311         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
312
313     def test_validate_ip_pools(self):
314         pools = [[{'end': '10.0.0.254'}],
315                  [{'start': '10.0.0.254'}],
316                  [{'start': '1000.0.0.254',
317                    'end': '1.1.1.1'}],
318                  [{'start': '10.0.0.2', 'end': '10.0.0.254',
319                    'forza': 'juve'}],
320                  [{'start': '10.0.0.2', 'end': '10.0.0.254'},
321                   {'end': '10.0.0.254'}],
322                  [None],
323                  None]
324         for pool in pools:
325             msg = attributes._validate_ip_pools(pool)
326             self.assertIsNotNone(msg)
327
328         pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'},
329                   {'start': '11.0.0.2', 'end': '11.1.1.1'}],
330                  [{'start': '11.0.0.2', 'end': '11.0.0.100'}]]
331         for pool in pools:
332             msg = attributes._validate_ip_pools(pool)
333             self.assertIsNone(msg)
334
335         invalid_ip = '10.0.0.2\r'
336         pools = [[{'end': '10.0.0.254', 'start': invalid_ip}]]
337         for pool in pools:
338             msg = attributes._validate_ip_pools(pool)
339             self.assertEqual("'%s' is not a valid IP address" % invalid_ip,
340                              msg)
341
342     def test_validate_fixed_ips(self):
343         fixed_ips = [
344             {'data': [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
345                        'ip_address': '1111.1.1.1'}],
346              'error_msg': "'1111.1.1.1' is not a valid IP address"},
347             {'data': [{'subnet_id': 'invalid',
348                        'ip_address': '1.1.1.1'}],
349              'error_msg': "'invalid' is not a valid UUID"},
350             {'data': None,
351              'error_msg': "Invalid data format for fixed IP: 'None'"},
352             {'data': "1.1.1.1",
353              'error_msg': "Invalid data format for fixed IP: '1.1.1.1'"},
354             {'data': ['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1'],
355              'error_msg': "Invalid data format for fixed IP: "
356                           "'00000000-ffff-ffff-ffff-000000000000'"},
357             {'data': [['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1']],
358              'error_msg': "Invalid data format for fixed IP: "
359                           "'['00000000-ffff-ffff-ffff-000000000000', "
360                           "'1.1.1.1']'"},
361             {'data': [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
362                        'ip_address': '1.1.1.1'},
363                       {'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
364                        'ip_address': '1.1.1.1'}],
365              'error_msg': "Duplicate IP address '1.1.1.1'"}]
366         for fixed in fixed_ips:
367             msg = attributes._validate_fixed_ips(fixed['data'])
368             self.assertEqual(fixed['error_msg'], msg)
369
370         fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
371                        'ip_address': '1.1.1.1'}],
372                      [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
373                        'ip_address': '1.1.1.1'},
374                       {'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
375                        'ip_address': '1.1.1.2'}]]
376         for fixed in fixed_ips:
377             msg = attributes._validate_fixed_ips(fixed)
378             self.assertIsNone(msg)
379
380     def test_validate_nameservers(self):
381         ns_pools = [['1.1.1.2', '1.1.1.2'],
382                     ['www.hostname.com', 'www.hostname.com'],
383                     ['1000.0.0.1'],
384                     ['www.hostname.com'],
385                     ['www.great.marathons.to.travel'],
386                     ['valid'],
387                     ['77.hostname.com'],
388                     ['1' * 59],
389                     ['www.internal.hostname.com'],
390                     None]
391
392         for ns in ns_pools:
393             msg = attributes._validate_nameservers(ns, None)
394             self.assertIsNotNone(msg)
395
396         ns_pools = [['100.0.0.2'],
397                     ['1.1.1.1', '1.1.1.2']]
398
399         for ns in ns_pools:
400             msg = attributes._validate_nameservers(ns, None)
401             self.assertIsNone(msg)
402
403     def test_validate_hostroutes(self):
404         hostroute_pools = [[{'destination': '100.0.0.0/24'}],
405                            [{'nexthop': '10.0.2.20'}],
406                            [{'nexthop': '10.0.2.20',
407                              'forza': 'juve',
408                              'destination': '100.0.0.0/8'}],
409                            [{'nexthop': '1110.0.2.20',
410                              'destination': '100.0.0.0/8'}],
411                            [{'nexthop': '10.0.2.20',
412                              'destination': '100.0.0.0'}],
413                            [{'nexthop': '10.0.2.20',
414                              'destination': '100.0.0.0/8'},
415                             {'nexthop': '10.0.2.20',
416                              'destination': '100.0.0.0/8'}],
417                            [None],
418                            None]
419         for host_routes in hostroute_pools:
420             msg = attributes._validate_hostroutes(host_routes, None)
421             self.assertIsNotNone(msg)
422
423         hostroute_pools = [[{'destination': '100.0.0.0/24',
424                              'nexthop': '10.0.2.20'}],
425                            [{'nexthop': '10.0.2.20',
426                              'destination': '100.0.0.0/8'},
427                             {'nexthop': '10.0.2.20',
428                              'destination': '101.0.0.0/8'}]]
429         for host_routes in hostroute_pools:
430             msg = attributes._validate_hostroutes(host_routes, None)
431             self.assertIsNone(msg)
432
433     def test_validate_ip_address_or_none(self):
434         ip_addr = None
435         msg = attributes._validate_ip_address_or_none(ip_addr)
436         self.assertIsNone(msg)
437
438         ip_addr = '1.1.1.1'
439         msg = attributes._validate_ip_address_or_none(ip_addr)
440         self.assertIsNone(msg)
441
442         ip_addr = '1111.1.1.1'
443         msg = attributes._validate_ip_address_or_none(ip_addr)
444         self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
445
446     def test_uuid_pattern(self):
447         data = 'garbage'
448         msg = attributes._validate_regex(data, attributes.UUID_PATTERN)
449         self.assertIsNotNone(msg)
450
451         data = '00000000-ffff-ffff-ffff-000000000000'
452         msg = attributes._validate_regex(data, attributes.UUID_PATTERN)
453         self.assertIsNone(msg)
454
455     def test_mac_pattern(self):
456         # Valid - 3 octets
457         base_mac = "fa:16:3e:00:00:00"
458         msg = attributes._validate_regex(base_mac,
459                                          attributes.MAC_PATTERN)
460         self.assertIsNone(msg)
461
462         # Valid - 4 octets
463         base_mac = "fa:16:3e:4f:00:00"
464         msg = attributes._validate_regex(base_mac,
465                                          attributes.MAC_PATTERN)
466         self.assertIsNone(msg)
467
468         # Invalid - not unicast
469         base_mac = "01:16:3e:4f:00:00"
470         msg = attributes._validate_regex(base_mac,
471                                          attributes.MAC_PATTERN)
472         self.assertIsNotNone(msg)
473
474         # Invalid - invalid format
475         base_mac = "a:16:3e:4f:00:00"
476         msg = attributes._validate_regex(base_mac,
477                                          attributes.MAC_PATTERN)
478         self.assertIsNotNone(msg)
479
480         # Invalid - invalid format
481         base_mac = "ffa:16:3e:4f:00:00"
482         msg = attributes._validate_regex(base_mac,
483                                          attributes.MAC_PATTERN)
484         self.assertIsNotNone(msg)
485
486         # Invalid - invalid format
487         base_mac = "01163e4f0000"
488         msg = attributes._validate_regex(base_mac,
489                                          attributes.MAC_PATTERN)
490         self.assertIsNotNone(msg)
491
492         # Invalid - invalid format
493         base_mac = "01-16-3e-4f-00-00"
494         msg = attributes._validate_regex(base_mac,
495                                          attributes.MAC_PATTERN)
496         self.assertIsNotNone(msg)
497
498         # Invalid - invalid format
499         base_mac = "00:16:3:f:00:00"
500         msg = attributes._validate_regex(base_mac,
501                                          attributes.MAC_PATTERN)
502         self.assertIsNotNone(msg)
503
504         # Invalid - invalid format
505         base_mac = "12:3:4:5:67:89ab"
506         msg = attributes._validate_regex(base_mac,
507                                          attributes.MAC_PATTERN)
508         self.assertIsNotNone(msg)
509
510     def _test_validate_subnet(self, validator, allow_none=False):
511         # Valid - IPv4
512         cidr = "10.0.2.0/24"
513         msg = validator(cidr, None)
514         self.assertIsNone(msg)
515
516         # Valid - IPv6 without final octets
517         cidr = "fe80::/24"
518         msg = validator(cidr, None)
519         self.assertIsNone(msg)
520
521         # Valid - IPv6 with final octets
522         cidr = "fe80::/24"
523         msg = validator(cidr, None)
524         self.assertIsNone(msg)
525
526         # Valid - uncompressed ipv6 address
527         cidr = "fe80:0:0:0:0:0:0:0/128"
528         msg = validator(cidr, None)
529         self.assertIsNone(msg)
530
531         # Valid - ipv6 address with multiple consecutive zero
532         cidr = "2001:0db8:0:0:1::1/128"
533         msg = validator(cidr, None)
534         self.assertIsNone(msg)
535
536         # Valid - ipv6 address with multiple consecutive zero
537         cidr = "2001:0db8::1:0:0:1/128"
538         msg = validator(cidr, None)
539         self.assertIsNone(msg)
540
541         # Valid - ipv6 address with multiple consecutive zero
542         cidr = "2001::0:1:0:0:1100/120"
543         msg = validator(cidr, None)
544         self.assertIsNone(msg)
545
546         # Invalid - abbreviated ipv4 address
547         cidr = "10/24"
548         msg = validator(cidr, None)
549         error = _("'%(data)s' isn't a recognized IP subnet cidr,"
550                   " '%(cidr)s' is recommended") % {"data": cidr,
551                                                    "cidr": "10.0.0.0/24"}
552         self.assertEqual(error, msg)
553
554         # Invalid - IPv4 missing mask
555         cidr = "10.0.2.0"
556         msg = validator(cidr, None)
557         error = _("'%(data)s' isn't a recognized IP subnet cidr,"
558                   " '%(cidr)s' is recommended") % {"data": cidr,
559                                                    "cidr": "10.0.2.0/32"}
560         self.assertEqual(error, msg)
561
562         # Valid - IPv4 with non-zero masked bits is ok
563         for i in range(1, 255):
564             cidr = "192.168.1.%s/24" % i
565             msg = validator(cidr, None)
566             self.assertIsNone(msg)
567
568         # Invalid - IPv6 without final octets, missing mask
569         cidr = "fe80::"
570         msg = validator(cidr, None)
571         error = _("'%(data)s' isn't a recognized IP subnet cidr,"
572                   " '%(cidr)s' is recommended") % {"data": cidr,
573                                                    "cidr": "fe80::/128"}
574         self.assertEqual(error, msg)
575
576         # Invalid - IPv6 with final octets, missing mask
577         cidr = "fe80::0"
578         msg = validator(cidr, None)
579         error = _("'%(data)s' isn't a recognized IP subnet cidr,"
580                   " '%(cidr)s' is recommended") % {"data": cidr,
581                                                    "cidr": "fe80::/128"}
582         self.assertEqual(error, msg)
583
584         # Invalid - Address format error
585         cidr = 'invalid'
586         msg = validator(cidr, None)
587         error = "'%s' is not a valid IP subnet" % cidr
588         self.assertEqual(error, msg)
589
590         cidr = None
591         msg = validator(cidr, None)
592         if allow_none:
593             self.assertIsNone(msg)
594         else:
595             error = "'%s' is not a valid IP subnet" % cidr
596             self.assertEqual(error, msg)
597
598         # Invalid - IPv4 with trailing CR
599         cidr = "10.0.2.0/24\r"
600         msg = validator(cidr, None)
601         error = "'%s' is not a valid IP subnet" % cidr
602         self.assertEqual(error, msg)
603
604     def test_validate_subnet(self):
605         self._test_validate_subnet(attributes._validate_subnet)
606
607     def test_validate_subnet_or_none(self):
608         self._test_validate_subnet(attributes._validate_subnet_or_none,
609                                    allow_none=True)
610
611     def _test_validate_regex(self, validator, allow_none=False):
612         pattern = '[hc]at'
613
614         data = None
615         msg = validator(data, pattern)
616         if allow_none:
617             self.assertIsNone(msg)
618         else:
619             self.assertEqual("'None' is not a valid input", msg)
620
621         data = 'bat'
622         msg = validator(data, pattern)
623         self.assertEqual("'%s' is not a valid input" % data, msg)
624
625         data = 'hat'
626         msg = validator(data, pattern)
627         self.assertIsNone(msg)
628
629         data = 'cat'
630         msg = validator(data, pattern)
631         self.assertIsNone(msg)
632
633     def test_validate_regex(self):
634         self._test_validate_regex(attributes._validate_regex)
635
636     def test_validate_regex_or_none(self):
637         self._test_validate_regex(attributes._validate_regex_or_none,
638                                   allow_none=True)
639
640     def test_validate_uuid(self):
641         invalid_uuids = [None,
642                          123,
643                          '123',
644                          't5069610-744b-42a7-8bd8-ceac1a229cd4',
645                          'e5069610-744bb-42a7-8bd8-ceac1a229cd4']
646         for uuid in invalid_uuids:
647             msg = attributes._validate_uuid(uuid)
648             error = "'%s' is not a valid UUID" % uuid
649             self.assertEqual(error, msg)
650
651         msg = attributes._validate_uuid('00000000-ffff-ffff-ffff-000000000000')
652         self.assertIsNone(msg)
653
654     def test__validate_list_of_items(self):
655         # check not a list
656         items = [None,
657                  123,
658                  'e5069610-744b-42a7-8bd8-ceac1a229cd4',
659                  '12345678123456781234567812345678',
660                  {'uuid': 'e5069610-744b-42a7-8bd8-ceac1a229cd4'}]
661         for item in items:
662             msg = attributes._validate_list_of_items(mock.Mock(), item)
663             error = "'%s' is not a list" % item
664             self.assertEqual(error, msg)
665
666         # check duplicate items in a list
667         duplicate_items = ['e5069610-744b-42a7-8bd8-ceac1a229cd4',
668                            'f3eeab00-8367-4524-b662-55e64d4cacb5',
669                            'e5069610-744b-42a7-8bd8-ceac1a229cd4']
670         msg = attributes._validate_list_of_items(mock.Mock(), duplicate_items)
671         error = ("Duplicate items in the list: "
672                  "'%s'" % ', '.join(duplicate_items))
673         self.assertEqual(error, msg)
674
675         # check valid lists
676         valid_lists = [[],
677                        [1, 2, 3],
678                        ['a', 'b', 'c']]
679         for list_obj in valid_lists:
680             msg = attributes._validate_list_of_items(
681                 mock.Mock(return_value=None), list_obj)
682             self.assertIsNone(msg)
683
684     def test_validate_dict_type(self):
685         for value in (None, True, '1', []):
686             self.assertEqual("'%s' is not a dictionary" % value,
687                              attributes._validate_dict(value))
688
689     def test_validate_dict_without_constraints(self):
690         msg = attributes._validate_dict({})
691         self.assertIsNone(msg)
692
693         # Validate a dictionary without constraints.
694         msg = attributes._validate_dict({'key': 'value'})
695         self.assertIsNone(msg)
696
697     def test_validate_a_valid_dict_with_constraints(self):
698         dictionary, constraints = self._construct_dict_and_constraints()
699
700         msg = attributes._validate_dict(dictionary, constraints)
701         self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
702
703     def test_validate_dict_with_invalid_validator(self):
704         dictionary, constraints = self._construct_dict_and_constraints()
705
706         constraints['key1'] = {'type:unsupported': None, 'required': True}
707         msg = attributes._validate_dict(dictionary, constraints)
708         self.assertEqual("Validator 'type:unsupported' does not exist.", msg)
709
710     def test_validate_dict_not_required_keys(self):
711         dictionary, constraints = self._construct_dict_and_constraints()
712
713         del dictionary['key2']
714         msg = attributes._validate_dict(dictionary, constraints)
715         self.assertIsNone(msg, 'Field that was not required by the specs was'
716                                'required by the validator.')
717
718     def test_validate_dict_required_keys(self):
719         dictionary, constraints = self._construct_dict_and_constraints()
720
721         del dictionary['key1']
722         msg = attributes._validate_dict(dictionary, constraints)
723         self.assertIn('Expected keys:', msg)
724
725     def test_validate_dict_wrong_values(self):
726         dictionary, constraints = self._construct_dict_and_constraints()
727
728         dictionary['key1'] = 'UNSUPPORTED'
729         msg = attributes._validate_dict(dictionary, constraints)
730         self.assertIsNotNone(msg)
731
732     def test_validate_dict_convert_boolean(self):
733         dictionary, constraints = self._construct_dict_and_constraints()
734
735         constraints['key_bool'] = {
736             'type:boolean': None,
737             'required': False,
738             'convert_to': attributes.convert_to_boolean}
739         dictionary['key_bool'] = 'true'
740         msg = attributes._validate_dict(dictionary, constraints)
741         self.assertIsNone(msg)
742         # Explicitly comparing with literal 'True' as assertTrue
743         # succeeds also for 'true'
744         self.assertIs(True, dictionary['key_bool'])
745
746     def test_subdictionary(self):
747         dictionary, constraints = self._construct_dict_and_constraints()
748
749         del dictionary['key3']['k4']
750         dictionary['key3']['k5'] = 'a string value'
751         msg = attributes._validate_dict(dictionary, constraints)
752         self.assertIn('Expected keys:', msg)
753
754     def test_validate_dict_or_none(self):
755         dictionary, constraints = self._construct_dict_and_constraints()
756
757         # Check whether None is a valid value.
758         msg = attributes._validate_dict_or_none(None, constraints)
759         self.assertIsNone(msg, 'Validation of a None dictionary failed.')
760
761         # Check validation of a regular dictionary.
762         msg = attributes._validate_dict_or_none(dictionary, constraints)
763         self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
764
765     def test_validate_dict_or_empty(self):
766         dictionary, constraints = self._construct_dict_and_constraints()
767
768         # Check whether an empty dictionary is valid.
769         msg = attributes._validate_dict_or_empty({}, constraints)
770         self.assertIsNone(msg, 'Validation of a None dictionary failed.')
771
772         # Check validation of a regular dictionary.
773         msg = attributes._validate_dict_or_none(dictionary, constraints)
774         self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
775         self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
776
777     def test_validate_non_negative(self):
778         for value in (-1, '-2'):
779             self.assertEqual("'%s' should be non-negative" % value,
780                              attributes._validate_non_negative(value))
781
782         for value in (0, 1, '2', True, False):
783             msg = attributes._validate_non_negative(value)
784             self.assertIsNone(msg)
785
786
787 class TestConvertToBoolean(base.BaseTestCase):
788
789     def test_convert_to_boolean_bool(self):
790         self.assertIs(attributes.convert_to_boolean(True), True)
791         self.assertIs(attributes.convert_to_boolean(False), False)
792
793     def test_convert_to_boolean_int(self):
794         self.assertIs(attributes.convert_to_boolean(0), False)
795         self.assertIs(attributes.convert_to_boolean(1), True)
796         self.assertRaises(n_exc.InvalidInput,
797                           attributes.convert_to_boolean,
798                           7)
799
800     def test_convert_to_boolean_str(self):
801         self.assertIs(attributes.convert_to_boolean('True'), True)
802         self.assertIs(attributes.convert_to_boolean('true'), True)
803         self.assertIs(attributes.convert_to_boolean('False'), False)
804         self.assertIs(attributes.convert_to_boolean('false'), False)
805         self.assertIs(attributes.convert_to_boolean('0'), False)
806         self.assertIs(attributes.convert_to_boolean('1'), True)
807         self.assertRaises(n_exc.InvalidInput,
808                           attributes.convert_to_boolean,
809                           '7')
810
811
812 class TestConvertToInt(base.BaseTestCase):
813
814     def test_convert_to_int_int(self):
815         self.assertEqual(-1, attributes.convert_to_int(-1))
816         self.assertEqual(0, attributes.convert_to_int(0))
817         self.assertEqual(1, attributes.convert_to_int(1))
818
819     def test_convert_to_int_if_not_none(self):
820         self.assertEqual(-1, attributes.convert_to_int_if_not_none(-1))
821         self.assertEqual(0, attributes.convert_to_int_if_not_none(0))
822         self.assertEqual(1, attributes.convert_to_int_if_not_none(1))
823         self.assertIsNone(attributes.convert_to_int_if_not_none(None))
824
825     def test_convert_to_int_str(self):
826         self.assertEqual(4, attributes.convert_to_int('4'))
827         self.assertEqual(6, attributes.convert_to_int('6'))
828         self.assertRaises(n_exc.InvalidInput,
829                           attributes.convert_to_int,
830                           'garbage')
831
832     def test_convert_to_int_none(self):
833         self.assertRaises(n_exc.InvalidInput,
834                           attributes.convert_to_int,
835                           None)
836
837     def test_convert_none_to_empty_list_none(self):
838         self.assertEqual([], attributes.convert_none_to_empty_list(None))
839
840     def test_convert_none_to_empty_dict(self):
841         self.assertEqual({}, attributes.convert_none_to_empty_dict(None))
842
843     def test_convert_none_to_empty_list_value(self):
844         values = ['1', 3, [], [1], {}, {'a': 3}]
845         for value in values:
846             self.assertEqual(
847                 value, attributes.convert_none_to_empty_list(value))
848
849
850 class TestConvertToFloat(base.BaseTestCase):
851     # NOTE: the routine being tested here is a plugin-specific extension
852     # module. As the plugin split proceed towards its second phase this
853     # test should either be remove, or the validation routine moved into
854     # neutron.api.v2.attributes
855
856     def test_convert_to_float_positve_value(self):
857         self.assertEqual(
858             1.111, attributes.convert_to_positive_float_or_none(1.111))
859         self.assertEqual(1, attributes.convert_to_positive_float_or_none(1))
860         self.assertEqual(0, attributes.convert_to_positive_float_or_none(0))
861
862     def test_convert_to_float_negative_value(self):
863         self.assertRaises(n_exc.InvalidInput,
864                           attributes.convert_to_positive_float_or_none,
865                           -1.11)
866
867     def test_convert_to_float_string(self):
868         self.assertEqual(4, attributes.convert_to_positive_float_or_none('4'))
869         self.assertEqual(
870             4.44, attributes.convert_to_positive_float_or_none('4.44'))
871         self.assertRaises(n_exc.InvalidInput,
872                           attributes.convert_to_positive_float_or_none,
873                           'garbage')
874
875     def test_convert_to_float_none_value(self):
876         self.assertIsNone(attributes.convert_to_positive_float_or_none(None))
877
878
879 class TestConvertKvp(base.BaseTestCase):
880
881     def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
882         result = attributes.convert_kvp_list_to_dict(['True'])
883         self.assertEqual({}, result)
884
885     def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self):
886         result = attributes.convert_kvp_list_to_dict(
887             ['a=b', 'a=c', 'a=c', 'b=a'])
888         expected = {'a': tools.UnorderedList(['c', 'b']), 'b': ['a']}
889         self.assertEqual(expected, result)
890
891     def test_convert_kvp_list_to_dict_succeeds_for_values(self):
892         result = attributes.convert_kvp_list_to_dict(['a=b', 'c=d'])
893         self.assertEqual({'a': ['b'], 'c': ['d']}, result)
894
895     def test_convert_kvp_str_to_list_fails_for_missing_key(self):
896         with testtools.ExpectedException(n_exc.InvalidInput):
897             attributes.convert_kvp_str_to_list('=a')
898
899     def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
900         with testtools.ExpectedException(n_exc.InvalidInput):
901             attributes.convert_kvp_str_to_list('a')
902
903     def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
904         result = attributes.convert_kvp_str_to_list('a=')
905         self.assertEqual(['a', ''], result)
906
907     def test_convert_kvp_str_to_list_succeeds_for_two_equals(self):
908         result = attributes.convert_kvp_str_to_list('a=a=a')
909         self.assertEqual(['a', 'a=a'], result)
910
911
912 class TestConvertToList(base.BaseTestCase):
913
914     def test_convert_to_empty_list(self):
915         for item in (None, [], (), {}):
916             self.assertEqual([], attributes.convert_to_list(item))
917
918     def test_convert_to_list_string(self):
919         for item in ('', 'foo'):
920             self.assertEqual([item], attributes.convert_to_list(item))
921
922     def test_convert_to_list_iterable(self):
923         for item in ([None], [1, 2, 3], (1, 2, 3), set([1, 2, 3]), ['foo']):
924             self.assertEqual(list(item), attributes.convert_to_list(item))
925
926     def test_convert_to_list_non_iterable(self):
927         for item in (True, False, 1, 1.2, object()):
928             self.assertEqual([item], attributes.convert_to_list(item))
929
930
931 class TestResDict(base.BaseTestCase):
932     class _MyException(Exception):
933         pass
934     _EXC_CLS = _MyException
935
936     def _test_fill_default_value(self, attr_info, expected, res_dict):
937         attributes.fill_default_value(attr_info, res_dict)
938         self.assertEqual(expected, res_dict)
939
940     def test_fill_default_value(self):
941         attr_info = {
942             'key': {
943                 'allow_post': True,
944                 'default': attributes.ATTR_NOT_SPECIFIED,
945             },
946         }
947         self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
948         self._test_fill_default_value(
949             attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {})
950
951         attr_info = {
952             'key': {
953                 'allow_post': True,
954             },
955         }
956         self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
957         self.assertRaises(ValueError, self._test_fill_default_value,
958                           attr_info, {'key': 'X'}, {})
959         self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
960                           attr_info, {}, self._EXC_CLS)
961         attr_info = {
962             'key': {
963                 'allow_post': False,
964             },
965         }
966         self.assertRaises(ValueError, self._test_fill_default_value,
967                           attr_info, {'key': 'X'}, {'key': 'X'})
968         self._test_fill_default_value(attr_info, {}, {})
969         self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
970                           attr_info, {'key': 'X'}, self._EXC_CLS)
971
972     def _test_convert_value(self, attr_info, expected, res_dict):
973         attributes.convert_value(attr_info, res_dict)
974         self.assertEqual(expected, res_dict)
975
976     def test_convert_value(self):
977         attr_info = {
978             'key': {
979             },
980         }
981         self._test_convert_value(attr_info,
982                                  {'key': attributes.ATTR_NOT_SPECIFIED},
983                                  {'key': attributes.ATTR_NOT_SPECIFIED})
984         self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
985         self._test_convert_value(attr_info,
986                                  {'other_key': 'X'}, {'other_key': 'X'})
987
988         attr_info = {
989             'key': {
990                 'convert_to': attributes.convert_to_int,
991             },
992         }
993         self._test_convert_value(attr_info,
994                                  {'key': attributes.ATTR_NOT_SPECIFIED},
995                                  {'key': attributes.ATTR_NOT_SPECIFIED})
996         self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
997         self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
998         self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
999                           attr_info, {'key': 1}, {'key': 'a'})
1000
1001         attr_info = {
1002             'key': {
1003                 'validate': {'type:uuid': None},
1004             },
1005         }
1006         self._test_convert_value(attr_info,
1007                                  {'key': attributes.ATTR_NOT_SPECIFIED},
1008                                  {'key': attributes.ATTR_NOT_SPECIFIED})
1009         uuid_str = '01234567-1234-1234-1234-1234567890ab'
1010         self._test_convert_value(attr_info,
1011                                  {'key': uuid_str}, {'key': uuid_str})
1012         self.assertRaises(ValueError, self._test_convert_value,
1013                           attr_info, {'key': 1}, {'key': 1})
1014         self.assertRaises(self._EXC_CLS, attributes.convert_value,
1015                           attr_info, {'key': 1}, self._EXC_CLS)
1016
1017     def test_populate_tenant_id(self):
1018         tenant_id_1 = uuidutils.generate_uuid()
1019         tenant_id_2 = uuidutils.generate_uuid()
1020         # apart from the admin, nobody can create a res on behalf of another
1021         # tenant
1022         ctx = context.Context(user_id=None, tenant_id=tenant_id_1)
1023         res_dict = {'tenant_id': tenant_id_2}
1024         self.assertRaises(webob.exc.HTTPBadRequest,
1025                           attributes.populate_tenant_id,
1026                           ctx, res_dict, None, None)
1027         ctx.is_admin = True
1028         self.assertIsNone(attributes.populate_tenant_id(ctx, res_dict,
1029                                                         None, None))
1030
1031         # for each create request, the tenant_id should be added to the
1032         # req body
1033         res_dict2 = {}
1034         attributes.populate_tenant_id(ctx, res_dict2, None, True)
1035         self.assertEqual({'tenant_id': ctx.tenant_id}, res_dict2)
1036
1037         # if the tenant_id is mandatory for the resource and not specified
1038         # in the request nor in the context, an exception should be raised
1039         res_dict3 = {}
1040         attr_info = {'tenant_id': {'allow_post': True}, }
1041         ctx.tenant_id = None
1042         self.assertRaises(webob.exc.HTTPBadRequest,
1043                           attributes.populate_tenant_id,
1044                           ctx, res_dict3, attr_info, True)
1045
1046
1047 class TestHelpers(base.DietTestCase):
1048
1049     def _verify_port_attributes(self, attrs):
1050         for test_attribute in ('id', 'name', 'mac_address', 'network_id',
1051                                'tenant_id', 'fixed_ips', 'status'):
1052             self.assertIn(test_attribute, attrs)
1053
1054     def test_get_collection_info(self):
1055         attrs = attributes.get_collection_info('ports')
1056         self._verify_port_attributes(attrs)
1057
1058     def test_get_collection_info_missing(self):
1059         self.assertFalse(attributes.get_collection_info('meh'))
1060
1061     def test_get_resource_info(self):
1062         attributes.REVERSED_PLURALS.pop('port', None)
1063         attrs = attributes.get_resource_info('port')
1064         self._verify_port_attributes(attrs)
1065         # verify side effect
1066         self.assertIn('port', attributes.REVERSED_PLURALS)
1067
1068     def test_get_resource_info_missing(self):
1069         self.assertFalse(attributes.get_resource_info('meh'))
1070
1071     def test_get_resource_info_cached(self):
1072         with mock.patch('neutron.api.v2.attributes.PLURALS') as mock_plurals:
1073             attributes.REVERSED_PLURALS['port'] = 'ports'
1074             attrs = attributes.get_resource_info('port')
1075             self._verify_port_attributes(attrs)
1076         self.assertEqual(0, mock_plurals.items.call_count)