1 # Copyright 2012 OpenStack Foundation
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
23 from oslo_utils import uuidutils
25 from neutron._i18n import _
26 from neutron.api.v2 import attributes
27 from neutron.common import constants
28 from neutron.common import exceptions as n_exc
29 from neutron import context
30 from neutron.tests import base
31 from neutron.tests import tools
34 class TestAttributes(base.BaseTestCase):
36 def _construct_dict_and_constraints(self):
37 """Constructs a test dictionary and a definition of constraints.
38 :return: A (dictionary, constraint) tuple
40 constraints = {'key1': {'type:values': ['val1', 'val2'],
42 'key2': {'type:string': None,
44 'key3': {'type:dict': {'k4': {'type:string': None,
48 dictionary = {'key1': 'val1',
49 'key2': 'a string value',
50 'key3': {'k4': 'a string value'}}
52 return dictionary, constraints
54 def test_is_attr_set(self):
55 data = attributes.ATTR_NOT_SPECIFIED
56 self.assertIs(attributes.is_attr_set(data), False)
59 self.assertIs(attributes.is_attr_set(data), False)
62 self.assertIs(attributes.is_attr_set(data), True)
def test_validate_values(self):
    """Check membership validation against list and tuple candidates."""
    # A value contained in the allowed collection produces no message.
    for allowed in ([4, 6], (4, 6)):
        self.assertIsNone(attributes._validate_values(4, allowed))

    # A value outside the collection produces a message that quotes the
    # collection using its own repr (list vs. tuple brackets).
    rejected = (
        ([4, 6], "'7' is not in [4, 6]"),
        ((4, 6), "'7' is not in (4, 6)"),
    )
    for allowed, expected in rejected:
        self.assertEqual(expected, attributes._validate_values(7, allowed))
def test_validate_not_empty_string(self):
    """A whitespace-only string is reported as blank."""
    blank = ' '
    result = attributes._validate_not_empty_string(blank, None)
    self.assertEqual(u"' ' Blank strings are not permitted", result)
def test_validate_not_empty_string_or_none(self):
    """Blank strings are rejected while None is accepted."""
    validate = attributes._validate_not_empty_string_or_none

    self.assertEqual(u"' ' Blank strings are not permitted",
                     validate(' ', None))

    self.assertIsNone(validate(None, None))
def test_validate_string_or_none(self):
    """Plain strings and None both pass the string-or-none validator.

    The assertions previously called
    ``_validate_not_empty_string_or_none``, so this test never exercised
    the validator it is named after.
    """
    msg = attributes._validate_string_or_none('test', None)
    self.assertIsNone(msg)

    msg = attributes._validate_string_or_none(None, None)
    self.assertIsNone(msg)
def test_validate_string(self):
    """Exercise string validation across type and length boundaries."""
    # None is not accepted as a string.
    self.assertEqual("'None' is not a valid string",
                     attributes._validate_string(None, None))

    # (data, max_len) pairs expected to pass.
    within_limit = (
        ("", 0),             # 0 == len(data) == max_len
        ("", 9),             # 0 == len(data) < max_len
        ("123456789", 10),   # 0 < len(data) < max_len
        ("123456789", 9),    # 0 < len(data) == max_len
    )
    for data, max_len in within_limit:
        self.assertIsNone(attributes._validate_string(data, max_len))

    # 0 < max_len < len(data)
    self.assertEqual("'1234567890' exceeds maximum length of 9",
                     attributes._validate_string("1234567890", 9))

    # max_len passed as None
    self.assertIsNone(attributes._validate_string("123456789", None))
122 def test_validate_list_of_unique_strings(self):
124 msg = attributes.validate_list_of_unique_strings(data, None)
125 self.assertEqual("'TEST' is not a list", msg)
127 data = ["TEST01", "TEST02", "TEST01"]
128 msg = attributes.validate_list_of_unique_strings(data, None)
130 "Duplicate items in the list: 'TEST01, TEST02, TEST01'", msg)
132 data = ["12345678", "123456789"]
133 msg = attributes.validate_list_of_unique_strings(data, 8)
134 self.assertEqual("'123456789' exceeds maximum length of 8", msg)
136 data = ["TEST01", "TEST02", "TEST03"]
137 msg = attributes.validate_list_of_unique_strings(data, None)
138 self.assertIsNone(msg)
140 def test_validate_no_whitespace(self):
141 data = 'no_white_space'
142 result = attributes._validate_no_whitespace(data)
143 self.assertEqual(data, result)
145 self.assertRaises(n_exc.InvalidInput,
146 attributes._validate_no_whitespace,
149 self.assertRaises(n_exc.InvalidInput,
150 attributes._validate_no_whitespace,
151 'i\thave\twhitespace')
153 for ws in string.whitespace:
154 self.assertRaises(n_exc.InvalidInput,
155 attributes._validate_no_whitespace,
156 '%swhitespace-at-head' % ws)
157 self.assertRaises(n_exc.InvalidInput,
158 attributes._validate_no_whitespace,
159 'whitespace-at-tail%s' % ws)
def test_validate_range(self):
    """Run the range validator over a table of (value, limits, message).

    An expected message of ``None`` means the value must be accepted.
    """
    unlimited = attributes.UNLIMITED
    too_large = "'10' is too large - must be no larger than '9'"
    cases = (
        # Bounds given as a list, both ends inclusive.
        (1, [1, 9], None),
        (5, [1, 9], None),
        (9, [1, 9], None),
        # Bounds given as a tuple, both ends inclusive.
        (1, (1, 9), None),
        (5, (1, 9), None),
        (9, (1, 9), None),
        # Out-of-range and non-integer values.
        (0, [1, 9], "'0' is too small - must be at least '1'"),
        (10, (1, 9), too_large),
        ("bogus", (1, 9), "'bogus' is not an integer"),
        # UNLIMITED used for one or both bounds.
        (10, (unlimited, unlimited), None),
        (10, (1, unlimited), None),
        (1, (unlimited, 9), None),
        (-1, (0, unlimited), "'-1' is too small - must be at least '0'"),
        (10, (unlimited, 9), too_large),
    )
    for value, limits, expected in cases:
        msg = attributes._validate_range(value, limits)
        if expected is None:
            self.assertIsNone(msg)
        else:
            self.assertEqual(expected, msg)
205 def _test_validate_mac_address(self, validator, allow_none=False):
206 mac_addr = "ff:16:3e:4f:00:00"
207 msg = validator(mac_addr)
208 self.assertIsNone(msg)
210 mac_addr = "ffa:16:3e:4f:00:00"
211 msg = validator(mac_addr)
212 err_msg = "'%s' is not a valid MAC address"
213 self.assertEqual(err_msg % mac_addr, msg)
215 for invalid_mac_addr in constants.INVALID_MAC_ADDRESSES:
216 msg = validator(invalid_mac_addr)
217 self.assertEqual(err_msg % invalid_mac_addr, msg)
220 msg = validator(mac_addr)
221 self.assertEqual(err_msg % mac_addr, msg)
224 msg = validator(mac_addr)
226 self.assertIsNone(msg)
228 self.assertEqual(err_msg % mac_addr, msg)
230 mac_addr = "ff:16:3e:4f:00:00\r"
231 msg = validator(mac_addr)
232 self.assertEqual(err_msg % mac_addr, msg)
def test_validate_mac_address(self):
    """Run the shared MAC address checks with ``_validate_mac_address``."""
    validator = attributes._validate_mac_address
    self._test_validate_mac_address(validator)
def test_validate_mac_address_or_none(self):
    """Run the shared MAC address checks with ``allow_none=True``."""
    validator = attributes._validate_mac_address_or_none
    self._test_validate_mac_address(validator, allow_none=True)
241 def test_validate_ip_address(self):
243 msg = attributes._validate_ip_address(ip_addr)
244 self.assertIsNone(msg)
246 ip_addr = '1111.1.1.1'
247 msg = attributes._validate_ip_address(ip_addr)
248 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
250 # Depending on platform to run UTs, this case might or might not be
251 # an equivalent to test_validate_ip_address_bsd.
253 msg = attributes._validate_ip_address(ip_addr)
254 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
256 ip_addr = '1.1.1.1 has whitespace'
257 msg = attributes._validate_ip_address(ip_addr)
258 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
260 ip_addr = '111.1.1.1\twhitespace'
261 msg = attributes._validate_ip_address(ip_addr)
262 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
264 ip_addr = '111.1.1.1\nwhitespace'
265 msg = attributes._validate_ip_address(ip_addr)
266 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
268 for ws in string.whitespace:
269 ip_addr = '%s111.1.1.1' % ws
270 msg = attributes._validate_ip_address(ip_addr)
271 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
273 for ws in string.whitespace:
274 ip_addr = '111.1.1.1%s' % ws
275 msg = attributes._validate_ip_address(ip_addr)
276 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
278 def test_validate_ip_address_with_leading_zero(self):
280 expected_msg = ("'%(data)s' is not an accepted IP address, "
281 "'%(ip)s' is recommended")
282 msg = attributes._validate_ip_address(ip_addr)
283 self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.1'},
286 ip_addr = '1.1.1.011'
287 msg = attributes._validate_ip_address(ip_addr)
288 self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.11'},
292 msg = attributes._validate_ip_address(ip_addr)
293 self.assertEqual(expected_msg % {"data": ip_addr, "ip": '1.1.1.9'},
296 ip_addr = "fe80:0:0:0:0:0:0:0001"
297 msg = attributes._validate_ip_address(ip_addr)
298 self.assertIsNone(msg)
300 def test_validate_ip_address_bsd(self):
301 # NOTE(yamamoto): On NetBSD and OS X, netaddr.IPAddress() accepts
302 # '1' * 59 as a valid address. The behaviour is inherited from
303 # libc behaviour there. This test ensures that our validator reject
304 # such addresses on such platforms by mocking netaddr to emulate
307 with mock.patch('netaddr.IPAddress') as ip_address_cls:
308 msg = attributes._validate_ip_address(ip_addr)
309 ip_address_cls.assert_called_once_with(ip_addr,
310 flags=netaddr.core.ZEROFILL)
311 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
313 def test_validate_ip_pools(self):
314 pools = [[{'end': '10.0.0.254'}],
315 [{'start': '10.0.0.254'}],
316 [{'start': '1000.0.0.254',
318 [{'start': '10.0.0.2', 'end': '10.0.0.254',
320 [{'start': '10.0.0.2', 'end': '10.0.0.254'},
321 {'end': '10.0.0.254'}],
325 msg = attributes._validate_ip_pools(pool)
326 self.assertIsNotNone(msg)
328 pools = [[{'end': '10.0.0.254', 'start': '10.0.0.2'},
329 {'start': '11.0.0.2', 'end': '11.1.1.1'}],
330 [{'start': '11.0.0.2', 'end': '11.0.0.100'}]]
332 msg = attributes._validate_ip_pools(pool)
333 self.assertIsNone(msg)
335 invalid_ip = '10.0.0.2\r'
336 pools = [[{'end': '10.0.0.254', 'start': invalid_ip}]]
338 msg = attributes._validate_ip_pools(pool)
339 self.assertEqual("'%s' is not a valid IP address" % invalid_ip,
342 def test_validate_fixed_ips(self):
344 {'data': [{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
345 'ip_address': '1111.1.1.1'}],
346 'error_msg': "'1111.1.1.1' is not a valid IP address"},
347 {'data': [{'subnet_id': 'invalid',
348 'ip_address': '1.1.1.1'}],
349 'error_msg': "'invalid' is not a valid UUID"},
351 'error_msg': "Invalid data format for fixed IP: 'None'"},
353 'error_msg': "Invalid data format for fixed IP: '1.1.1.1'"},
354 {'data': ['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1'],
355 'error_msg': "Invalid data format for fixed IP: "
356 "'00000000-ffff-ffff-ffff-000000000000'"},
357 {'data': [['00000000-ffff-ffff-ffff-000000000000', '1.1.1.1']],
358 'error_msg': "Invalid data format for fixed IP: "
359 "'['00000000-ffff-ffff-ffff-000000000000', "
361 {'data': [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
362 'ip_address': '1.1.1.1'},
363 {'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
364 'ip_address': '1.1.1.1'}],
365 'error_msg': "Duplicate IP address '1.1.1.1'"}]
366 for fixed in fixed_ips:
367 msg = attributes._validate_fixed_ips(fixed['data'])
368 self.assertEqual(fixed['error_msg'], msg)
370 fixed_ips = [[{'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
371 'ip_address': '1.1.1.1'}],
372 [{'subnet_id': '00000000-0fff-ffff-ffff-000000000000',
373 'ip_address': '1.1.1.1'},
374 {'subnet_id': '00000000-ffff-ffff-ffff-000000000000',
375 'ip_address': '1.1.1.2'}]]
376 for fixed in fixed_ips:
377 msg = attributes._validate_fixed_ips(fixed)
378 self.assertIsNone(msg)
380 def test_validate_nameservers(self):
381 ns_pools = [['1.1.1.2', '1.1.1.2'],
382 ['www.hostname.com', 'www.hostname.com'],
384 ['www.hostname.com'],
385 ['www.great.marathons.to.travel'],
389 ['www.internal.hostname.com'],
393 msg = attributes._validate_nameservers(ns, None)
394 self.assertIsNotNone(msg)
396 ns_pools = [['100.0.0.2'],
397 ['1.1.1.1', '1.1.1.2']]
400 msg = attributes._validate_nameservers(ns, None)
401 self.assertIsNone(msg)
403 def test_validate_hostroutes(self):
404 hostroute_pools = [[{'destination': '100.0.0.0/24'}],
405 [{'nexthop': '10.0.2.20'}],
406 [{'nexthop': '10.0.2.20',
408 'destination': '100.0.0.0/8'}],
409 [{'nexthop': '1110.0.2.20',
410 'destination': '100.0.0.0/8'}],
411 [{'nexthop': '10.0.2.20',
412 'destination': '100.0.0.0'}],
413 [{'nexthop': '10.0.2.20',
414 'destination': '100.0.0.0/8'},
415 {'nexthop': '10.0.2.20',
416 'destination': '100.0.0.0/8'}],
419 for host_routes in hostroute_pools:
420 msg = attributes._validate_hostroutes(host_routes, None)
421 self.assertIsNotNone(msg)
423 hostroute_pools = [[{'destination': '100.0.0.0/24',
424 'nexthop': '10.0.2.20'}],
425 [{'nexthop': '10.0.2.20',
426 'destination': '100.0.0.0/8'},
427 {'nexthop': '10.0.2.20',
428 'destination': '101.0.0.0/8'}]]
429 for host_routes in hostroute_pools:
430 msg = attributes._validate_hostroutes(host_routes, None)
431 self.assertIsNone(msg)
433 def test_validate_ip_address_or_none(self):
435 msg = attributes._validate_ip_address_or_none(ip_addr)
436 self.assertIsNone(msg)
439 msg = attributes._validate_ip_address_or_none(ip_addr)
440 self.assertIsNone(msg)
442 ip_addr = '1111.1.1.1'
443 msg = attributes._validate_ip_address_or_none(ip_addr)
444 self.assertEqual("'%s' is not a valid IP address" % ip_addr, msg)
446 def test_uuid_pattern(self):
448 msg = attributes._validate_regex(data, attributes.UUID_PATTERN)
449 self.assertIsNotNone(msg)
451 data = '00000000-ffff-ffff-ffff-000000000000'
452 msg = attributes._validate_regex(data, attributes.UUID_PATTERN)
453 self.assertIsNone(msg)
def test_mac_pattern(self):
    """Check MAC_PATTERN against accepted and rejected address strings."""
    accepted = (
        "fa:16:3e:00:00:00",
        "fa:16:3e:4f:00:00",
    )
    for base_mac in accepted:
        msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN)
        self.assertIsNone(msg)

    rejected = (
        "01:16:3e:4f:00:00",   # Invalid - not unicast
        "a:16:3e:4f:00:00",    # Invalid - invalid format
        "ffa:16:3e:4f:00:00",  # Invalid - invalid format
        "01163e4f0000",        # Invalid - invalid format
        "01-16-3e-4f-00-00",   # Invalid - invalid format
        "00:16:3:f:00:00",     # Invalid - invalid format
        "12:3:4:5:67:89ab",    # Invalid - invalid format
    )
    for base_mac in rejected:
        msg = attributes._validate_regex(base_mac, attributes.MAC_PATTERN)
        self.assertIsNotNone(msg)
510 def _test_validate_subnet(self, validator, allow_none=False):
513 msg = validator(cidr, None)
514 self.assertIsNone(msg)
516 # Valid - IPv6 without final octets
518 msg = validator(cidr, None)
519 self.assertIsNone(msg)
521 # Valid - IPv6 with final octets
523 msg = validator(cidr, None)
524 self.assertIsNone(msg)
526 # Valid - uncompressed ipv6 address
527 cidr = "fe80:0:0:0:0:0:0:0/128"
528 msg = validator(cidr, None)
529 self.assertIsNone(msg)
531 # Valid - ipv6 address with multiple consecutive zero
532 cidr = "2001:0db8:0:0:1::1/128"
533 msg = validator(cidr, None)
534 self.assertIsNone(msg)
536 # Valid - ipv6 address with multiple consecutive zero
537 cidr = "2001:0db8::1:0:0:1/128"
538 msg = validator(cidr, None)
539 self.assertIsNone(msg)
541 # Valid - ipv6 address with multiple consecutive zero
542 cidr = "2001::0:1:0:0:1100/120"
543 msg = validator(cidr, None)
544 self.assertIsNone(msg)
546 # Invalid - abbreviated ipv4 address
548 msg = validator(cidr, None)
549 error = _("'%(data)s' isn't a recognized IP subnet cidr,"
550 " '%(cidr)s' is recommended") % {"data": cidr,
551 "cidr": "10.0.0.0/24"}
552 self.assertEqual(error, msg)
554 # Invalid - IPv4 missing mask
556 msg = validator(cidr, None)
557 error = _("'%(data)s' isn't a recognized IP subnet cidr,"
558 " '%(cidr)s' is recommended") % {"data": cidr,
559 "cidr": "10.0.2.0/32"}
560 self.assertEqual(error, msg)
562 # Valid - IPv4 with non-zero masked bits is ok
563 for i in range(1, 255):
564 cidr = "192.168.1.%s/24" % i
565 msg = validator(cidr, None)
566 self.assertIsNone(msg)
568 # Invalid - IPv6 without final octets, missing mask
570 msg = validator(cidr, None)
571 error = _("'%(data)s' isn't a recognized IP subnet cidr,"
572 " '%(cidr)s' is recommended") % {"data": cidr,
573 "cidr": "fe80::/128"}
574 self.assertEqual(error, msg)
576 # Invalid - IPv6 with final octets, missing mask
578 msg = validator(cidr, None)
579 error = _("'%(data)s' isn't a recognized IP subnet cidr,"
580 " '%(cidr)s' is recommended") % {"data": cidr,
581 "cidr": "fe80::/128"}
582 self.assertEqual(error, msg)
584 # Invalid - Address format error
586 msg = validator(cidr, None)
587 error = "'%s' is not a valid IP subnet" % cidr
588 self.assertEqual(error, msg)
591 msg = validator(cidr, None)
593 self.assertIsNone(msg)
595 error = "'%s' is not a valid IP subnet" % cidr
596 self.assertEqual(error, msg)
598 # Invalid - IPv4 with trailing CR
599 cidr = "10.0.2.0/24\r"
600 msg = validator(cidr, None)
601 error = "'%s' is not a valid IP subnet" % cidr
602 self.assertEqual(error, msg)
def test_validate_subnet(self):
    """Run the shared subnet checks with ``_validate_subnet``."""
    validator = attributes._validate_subnet
    self._test_validate_subnet(validator)
607 def test_validate_subnet_or_none(self):
608 self._test_validate_subnet(attributes._validate_subnet_or_none,
611 def _test_validate_regex(self, validator, allow_none=False):
615 msg = validator(data, pattern)
617 self.assertIsNone(msg)
619 self.assertEqual("'None' is not a valid input", msg)
622 msg = validator(data, pattern)
623 self.assertEqual("'%s' is not a valid input" % data, msg)
626 msg = validator(data, pattern)
627 self.assertIsNone(msg)
630 msg = validator(data, pattern)
631 self.assertIsNone(msg)
def test_validate_regex(self):
    """Run the shared regex checks with ``_validate_regex``."""
    validator = attributes._validate_regex
    self._test_validate_regex(validator)
636 def test_validate_regex_or_none(self):
637 self._test_validate_regex(attributes._validate_regex_or_none,
640 def test_validate_uuid(self):
641 invalid_uuids = [None,
644 't5069610-744b-42a7-8bd8-ceac1a229cd4',
645 'e5069610-744bb-42a7-8bd8-ceac1a229cd4']
646 for uuid in invalid_uuids:
647 msg = attributes._validate_uuid(uuid)
648 error = "'%s' is not a valid UUID" % uuid
649 self.assertEqual(error, msg)
651 msg = attributes._validate_uuid('00000000-ffff-ffff-ffff-000000000000')
652 self.assertIsNone(msg)
654 def test__validate_list_of_items(self):
658 'e5069610-744b-42a7-8bd8-ceac1a229cd4',
659 '12345678123456781234567812345678',
660 {'uuid': 'e5069610-744b-42a7-8bd8-ceac1a229cd4'}]
662 msg = attributes._validate_list_of_items(mock.Mock(), item)
663 error = "'%s' is not a list" % item
664 self.assertEqual(error, msg)
666 # check duplicate items in a list
667 duplicate_items = ['e5069610-744b-42a7-8bd8-ceac1a229cd4',
668 'f3eeab00-8367-4524-b662-55e64d4cacb5',
669 'e5069610-744b-42a7-8bd8-ceac1a229cd4']
670 msg = attributes._validate_list_of_items(mock.Mock(), duplicate_items)
671 error = ("Duplicate items in the list: "
672 "'%s'" % ', '.join(duplicate_items))
673 self.assertEqual(error, msg)
679 for list_obj in valid_lists:
680 msg = attributes._validate_list_of_items(
681 mock.Mock(return_value=None), list_obj)
682 self.assertIsNone(msg)
def test_validate_dict_type(self):
    """Non-dictionary inputs are rejected with a descriptive message."""
    non_dicts = (None, True, '1', [])
    for candidate in non_dicts:
        expected = "'%s' is not a dictionary" % candidate
        actual = attributes._validate_dict(candidate)
        self.assertEqual(expected, actual)
def test_validate_dict_without_constraints(self):
    """Dictionaries validate cleanly when no constraints are passed."""
    # First an empty dictionary, then a populated one.
    for unconstrained in ({}, {'key': 'value'}):
        self.assertIsNone(attributes._validate_dict(unconstrained))
def test_validate_a_valid_dict_with_constraints(self):
    """The fixture dictionary satisfies the fixture constraints."""
    data, specs = self._construct_dict_and_constraints()

    result = attributes._validate_dict(data, specs)
    self.assertIsNone(result, 'Validation of a valid dictionary failed.')
def test_validate_dict_with_invalid_validator(self):
    """An unknown validator name in the constraints is reported."""
    data, specs = self._construct_dict_and_constraints()

    # Swap the 'key1' constraint for one naming a validator type that
    # the expected message says does not exist.
    specs['key1'] = {'type:unsupported': None, 'required': True}
    self.assertEqual("Validator 'type:unsupported' does not exist.",
                     attributes._validate_dict(data, specs))
def test_validate_dict_not_required_keys(self):
    """Removing 'key2' from the fixture must not cause a validation error."""
    dictionary, constraints = self._construct_dict_and_constraints()

    del dictionary['key2']
    msg = attributes._validate_dict(dictionary, constraints)
    # The two literals are joined implicitly; the trailing space keeps the
    # failure message from reading "wasrequired".
    self.assertIsNone(msg, 'Field that was not required by the specs was '
                           'required by the validator.')
def test_validate_dict_required_keys(self):
    """Removing 'key1' from the fixture yields an 'Expected keys:' error."""
    data, specs = self._construct_dict_and_constraints()

    data.pop('key1')
    result = attributes._validate_dict(data, specs)
    self.assertIn('Expected keys:', result)
def test_validate_dict_wrong_values(self):
    """Setting 'key1' to 'UNSUPPORTED' makes validation return a message."""
    data, specs = self._construct_dict_and_constraints()

    data['key1'] = 'UNSUPPORTED'
    self.assertIsNotNone(attributes._validate_dict(data, specs))
732 def test_validate_dict_convert_boolean(self):
733 dictionary, constraints = self._construct_dict_and_constraints()
735 constraints['key_bool'] = {
736 'type:boolean': None,
738 'convert_to': attributes.convert_to_boolean}
739 dictionary['key_bool'] = 'true'
740 msg = attributes._validate_dict(dictionary, constraints)
741 self.assertIsNone(msg)
742 # Explicitly comparing with literal 'True' as assertTrue
743 # succeeds also for 'true'
744 self.assertIs(True, dictionary['key_bool'])
def test_subdictionary(self):
    """Replacing nested key 'k4' with 'k5' yields an 'Expected keys:' error."""
    data, specs = self._construct_dict_and_constraints()

    nested = data['key3']
    del nested['k4']
    nested['k5'] = 'a string value'

    result = attributes._validate_dict(data, specs)
    self.assertIn('Expected keys:', result)
def test_validate_dict_or_none(self):
    """Both None and the fixture dictionary pass ``_validate_dict_or_none``."""
    data, specs = self._construct_dict_and_constraints()
    validate = attributes._validate_dict_or_none

    # None must be accepted.
    none_result = validate(None, specs)
    self.assertIsNone(none_result,
                      'Validation of a None dictionary failed.')

    # A regular dictionary must be accepted as well.
    dict_result = validate(data, specs)
    self.assertIsNone(dict_result,
                      'Validation of a valid dictionary failed.')
def test_validate_dict_or_empty(self):
    """Both {} and the fixture dictionary pass ``_validate_dict_or_empty``.

    The regular-dictionary check previously called
    ``_validate_dict_or_none``, so the non-empty path of the validator
    under test was never exercised; it also repeated its final assertion.
    """
    dictionary, constraints = self._construct_dict_and_constraints()

    # Check whether an empty dictionary is valid.
    msg = attributes._validate_dict_or_empty({}, constraints)
    self.assertIsNone(msg, 'Validation of an empty dictionary failed.')

    # Check validation of a regular dictionary.
    msg = attributes._validate_dict_or_empty(dictionary, constraints)
    self.assertIsNone(msg, 'Validation of a valid dictionary failed.')
def test_validate_non_negative(self):
    """Negative inputs are rejected; zero and positive ones are accepted."""
    negatives = (-1, '-2')
    for candidate in negatives:
        expected = "'%s' should be non-negative" % candidate
        actual = attributes._validate_non_negative(candidate)
        self.assertEqual(expected, actual)

    # Integers, a numeric string and both booleans are all accepted.
    non_negatives = (0, 1, '2', True, False)
    for candidate in non_negatives:
        self.assertIsNone(attributes._validate_non_negative(candidate))
787 class TestConvertToBoolean(base.BaseTestCase):
def test_convert_to_boolean_bool(self):
    """Boolean inputs come back as the very same boolean singleton."""
    for flag in (True, False):
        self.assertIs(attributes.convert_to_boolean(flag), flag)
793 def test_convert_to_boolean_int(self):
794 self.assertIs(attributes.convert_to_boolean(0), False)
795 self.assertIs(attributes.convert_to_boolean(1), True)
796 self.assertRaises(n_exc.InvalidInput,
797 attributes.convert_to_boolean,
800 def test_convert_to_boolean_str(self):
801 self.assertIs(attributes.convert_to_boolean('True'), True)
802 self.assertIs(attributes.convert_to_boolean('true'), True)
803 self.assertIs(attributes.convert_to_boolean('False'), False)
804 self.assertIs(attributes.convert_to_boolean('false'), False)
805 self.assertIs(attributes.convert_to_boolean('0'), False)
806 self.assertIs(attributes.convert_to_boolean('1'), True)
807 self.assertRaises(n_exc.InvalidInput,
808 attributes.convert_to_boolean,
812 class TestConvertToInt(base.BaseTestCase):
def test_convert_to_int_int(self):
    """Integer inputs are returned with the same value."""
    for number in (-1, 0, 1):
        self.assertEqual(number, attributes.convert_to_int(number))
def test_convert_to_int_if_not_none(self):
    """Integers keep their value and None stays None."""
    convert = attributes.convert_to_int_if_not_none

    for number in (-1, 0, 1):
        self.assertEqual(number, convert(number))

    self.assertIsNone(convert(None))
825 def test_convert_to_int_str(self):
826 self.assertEqual(4, attributes.convert_to_int('4'))
827 self.assertEqual(6, attributes.convert_to_int('6'))
828 self.assertRaises(n_exc.InvalidInput,
829 attributes.convert_to_int,
832 def test_convert_to_int_none(self):
833 self.assertRaises(n_exc.InvalidInput,
834 attributes.convert_to_int,
def test_convert_none_to_empty_list_none(self):
    """None is converted to an empty list."""
    converted = attributes.convert_none_to_empty_list(None)
    self.assertEqual([], converted)
def test_convert_none_to_empty_dict(self):
    """None is converted to an empty dictionary."""
    converted = attributes.convert_none_to_empty_dict(None)
    self.assertEqual({}, converted)
843 def test_convert_none_to_empty_list_value(self):
844 values = ['1', 3, [], [1], {}, {'a': 3}]
847 value, attributes.convert_none_to_empty_list(value))
850 class TestConvertToFloat(base.BaseTestCase):
# NOTE: the routine being tested here is a plugin-specific extension
# module. As the plugin split proceeds towards its second phase, this
# test should either be removed, or the validation routine moved into
# neutron.api.v2.attributes
856 def test_convert_to_float_positve_value(self):
858 1.111, attributes.convert_to_positive_float_or_none(1.111))
859 self.assertEqual(1, attributes.convert_to_positive_float_or_none(1))
860 self.assertEqual(0, attributes.convert_to_positive_float_or_none(0))
862 def test_convert_to_float_negative_value(self):
863 self.assertRaises(n_exc.InvalidInput,
864 attributes.convert_to_positive_float_or_none,
867 def test_convert_to_float_string(self):
868 self.assertEqual(4, attributes.convert_to_positive_float_or_none('4'))
870 4.44, attributes.convert_to_positive_float_or_none('4.44'))
871 self.assertRaises(n_exc.InvalidInput,
872 attributes.convert_to_positive_float_or_none,
def test_convert_to_float_none_value(self):
    """None passes through the positive-float converter as None."""
    converted = attributes.convert_to_positive_float_or_none(None)
    self.assertIsNone(converted)
class TestConvertKvp(base.BaseTestCase):
    """Tests for the ``key=value`` string and list conversion helpers."""

    def test_convert_kvp_list_to_dict_succeeds_for_missing_values(self):
        """The single entry 'True' converts to an empty dict."""
        converted = attributes.convert_kvp_list_to_dict(['True'])
        self.assertEqual({}, converted)

    def test_convert_kvp_list_to_dict_succeeds_for_multiple_values(self):
        """Repeated keys collect their distinct values in any order."""
        pairs = ['a=b', 'a=c', 'a=c', 'b=a']
        converted = attributes.convert_kvp_list_to_dict(pairs)
        wanted = {'a': tools.UnorderedList(['c', 'b']), 'b': ['a']}
        self.assertEqual(wanted, converted)

    def test_convert_kvp_list_to_dict_succeeds_for_values(self):
        """Each key maps to a list holding its value."""
        pairs = ['a=b', 'c=d']
        converted = attributes.convert_kvp_list_to_dict(pairs)
        self.assertEqual({'a': ['b'], 'c': ['d']}, converted)

    def test_convert_kvp_str_to_list_fails_for_missing_key(self):
        """A pair with an empty key raises InvalidInput."""
        expected_failure = testtools.ExpectedException(n_exc.InvalidInput)
        with expected_failure:
            attributes.convert_kvp_str_to_list('=a')

    def test_convert_kvp_str_to_list_fails_for_missing_equals(self):
        """A string without '=' raises InvalidInput."""
        expected_failure = testtools.ExpectedException(n_exc.InvalidInput)
        with expected_failure:
            attributes.convert_kvp_str_to_list('a')

    def test_convert_kvp_str_to_list_succeeds_for_one_equals(self):
        """A trailing '=' produces an empty-string value."""
        converted = attributes.convert_kvp_str_to_list('a=')
        self.assertEqual(['a', ''], converted)

    def test_convert_kvp_str_to_list_succeeds_for_two_equals(self):
        """Only the first '=' separates key from value."""
        converted = attributes.convert_kvp_str_to_list('a=a=a')
        self.assertEqual(['a', 'a=a'], converted)
class TestConvertToList(base.BaseTestCase):
    """Tests for ``attributes.convert_to_list``."""

    def test_convert_to_empty_list(self):
        """None and empty containers all convert to an empty list."""
        empties = (None, [], (), {})
        for empty in empties:
            self.assertEqual([], attributes.convert_to_list(empty))

    def test_convert_to_list_string(self):
        """A string, even an empty one, is wrapped as a single element."""
        for text in ('', 'foo'):
            wrapped = [text]
            self.assertEqual(wrapped, attributes.convert_to_list(text))

    def test_convert_to_list_iterable(self):
        """Lists, tuples and sets convert to a list of their elements."""
        iterables = ([None], [1, 2, 3], (1, 2, 3), {1, 2, 3}, ['foo'])
        for iterable in iterables:
            as_list = list(iterable)
            self.assertEqual(as_list, attributes.convert_to_list(iterable))

    def test_convert_to_list_non_iterable(self):
        """Scalars and plain objects are wrapped as a single element."""
        scalars = (True, False, 1, 1.2, object())
        for scalar in scalars:
            wrapped = [scalar]
            self.assertEqual(wrapped, attributes.convert_to_list(scalar))
931 class TestResDict(base.BaseTestCase):
932 class _MyException(Exception):
934 _EXC_CLS = _MyException
def _test_fill_default_value(self, attr_info, expected, res_dict):
    """Call ``fill_default_value`` and compare ``res_dict`` to ``expected``.

    The return value of ``fill_default_value`` is not used; the check is
    made on ``res_dict`` as it stands after the call.
    """
    fill = attributes.fill_default_value
    fill(attr_info, res_dict)
    self.assertEqual(expected, res_dict)
940 def test_fill_default_value(self):
944 'default': attributes.ATTR_NOT_SPECIFIED,
947 self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
948 self._test_fill_default_value(
949 attr_info, {'key': attributes.ATTR_NOT_SPECIFIED}, {})
956 self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
957 self.assertRaises(ValueError, self._test_fill_default_value,
958 attr_info, {'key': 'X'}, {})
959 self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
960 attr_info, {}, self._EXC_CLS)
966 self.assertRaises(ValueError, self._test_fill_default_value,
967 attr_info, {'key': 'X'}, {'key': 'X'})
968 self._test_fill_default_value(attr_info, {}, {})
969 self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
970 attr_info, {'key': 'X'}, self._EXC_CLS)
def _test_convert_value(self, attr_info, expected, res_dict):
    """Call ``convert_value`` and compare ``res_dict`` to ``expected``.

    The return value of ``convert_value`` is not used; the check is made
    on ``res_dict`` as it stands after the call.
    """
    convert = attributes.convert_value
    convert(attr_info, res_dict)
    self.assertEqual(expected, res_dict)
976 def test_convert_value(self):
981 self._test_convert_value(attr_info,
982 {'key': attributes.ATTR_NOT_SPECIFIED},
983 {'key': attributes.ATTR_NOT_SPECIFIED})
984 self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
985 self._test_convert_value(attr_info,
986 {'other_key': 'X'}, {'other_key': 'X'})
990 'convert_to': attributes.convert_to_int,
993 self._test_convert_value(attr_info,
994 {'key': attributes.ATTR_NOT_SPECIFIED},
995 {'key': attributes.ATTR_NOT_SPECIFIED})
996 self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
997 self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
998 self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
999 attr_info, {'key': 1}, {'key': 'a'})
1003 'validate': {'type:uuid': None},
1006 self._test_convert_value(attr_info,
1007 {'key': attributes.ATTR_NOT_SPECIFIED},
1008 {'key': attributes.ATTR_NOT_SPECIFIED})
1009 uuid_str = '01234567-1234-1234-1234-1234567890ab'
1010 self._test_convert_value(attr_info,
1011 {'key': uuid_str}, {'key': uuid_str})
1012 self.assertRaises(ValueError, self._test_convert_value,
1013 attr_info, {'key': 1}, {'key': 1})
1014 self.assertRaises(self._EXC_CLS, attributes.convert_value,
1015 attr_info, {'key': 1}, self._EXC_CLS)
1017 def test_populate_tenant_id(self):
1018 tenant_id_1 = uuidutils.generate_uuid()
1019 tenant_id_2 = uuidutils.generate_uuid()
1020 # apart from the admin, nobody can create a res on behalf of another
1022 ctx = context.Context(user_id=None, tenant_id=tenant_id_1)
1023 res_dict = {'tenant_id': tenant_id_2}
1024 self.assertRaises(webob.exc.HTTPBadRequest,
1025 attributes.populate_tenant_id,
1026 ctx, res_dict, None, None)
1028 self.assertIsNone(attributes.populate_tenant_id(ctx, res_dict,
1031 # for each create request, the tenant_id should be added to the
1034 attributes.populate_tenant_id(ctx, res_dict2, None, True)
1035 self.assertEqual({'tenant_id': ctx.tenant_id}, res_dict2)
1037 # if the tenant_id is mandatory for the resource and not specified
1038 # in the request nor in the context, an exception should be raised
1040 attr_info = {'tenant_id': {'allow_post': True}, }
1041 ctx.tenant_id = None
1042 self.assertRaises(webob.exc.HTTPBadRequest,
1043 attributes.populate_tenant_id,
1044 ctx, res_dict3, attr_info, True)
class TestHelpers(base.DietTestCase):
    """Tests for the collection and resource info lookup helpers."""

    def _verify_port_attributes(self, attrs):
        """Assert that each expected port attribute name is in ``attrs``."""
        expected_names = ('id', 'name', 'mac_address', 'network_id',
                          'tenant_id', 'fixed_ips', 'status')
        for name in expected_names:
            self.assertIn(name, attrs)

    def test_get_collection_info(self):
        """Looking up 'ports' returns the port attribute names."""
        self._verify_port_attributes(
            attributes.get_collection_info('ports'))

    def test_get_collection_info_missing(self):
        """An unknown collection name gives a falsy result."""
        info = attributes.get_collection_info('meh')
        self.assertFalse(info)

    def test_get_resource_info(self):
        """Looking up 'port' works without a pre-existing reverse mapping."""
        # Drop any existing entry so the lookup cannot rely on it.
        attributes.REVERSED_PLURALS.pop('port', None)
        self._verify_port_attributes(attributes.get_resource_info('port'))
        # verify side effect
        self.assertIn('port', attributes.REVERSED_PLURALS)

    def test_get_resource_info_missing(self):
        """An unknown resource name gives a falsy result."""
        info = attributes.get_resource_info('meh')
        self.assertFalse(info)

    def test_get_resource_info_cached(self):
        """With the reverse mapping preset, PLURALS.items is never called."""
        patcher = mock.patch('neutron.api.v2.attributes.PLURALS')
        with patcher as mock_plurals:
            attributes.REVERSED_PLURALS['port'] = 'ports'
            info = attributes.get_resource_info('port')
            self._verify_port_attributes(info)
        self.assertEqual(0, mock_plurals.items.call_count)