f9bc4b2165503b7ca6448c119122db29ab9130f6
[openstack-build/neutron-build.git] / neutron / tests / unit / common / test_utils.py
1 # Copyright (c) 2012 OpenStack Foundation.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import errno
16 import re
17
18 import eventlet
19 import mock
20 import netaddr
21 import six
22 import testtools
23
24 from neutron.common import constants
25 from neutron.common import exceptions as n_exc
26 from neutron.common import utils
27 from neutron.plugins.common import constants as p_const
28 from neutron.plugins.common import utils as plugin_utils
29 from neutron.tests import base
30 from neutron.tests.common import helpers
31
32 from oslo_log import log as logging
33
34
class TestParseMappings(base.BaseTestCase):
    """Accept/reject checks for utils.parse_mappings."""

    def parse(self, mapping_list, unique_values=True):
        """Return utils.parse_mappings(mapping_list, unique_values)."""
        return utils.parse_mappings(mapping_list, unique_values)

    def _assert_rejected(self, mapping_list):
        """Expect parsing ``mapping_list`` to raise ValueError."""
        with testtools.ExpectedException(ValueError):
            self.parse(mapping_list)

    def test_parse_mappings_fails_for_missing_separator(self):
        self._assert_rejected(['key'])

    def test_parse_mappings_fails_for_missing_key(self):
        self._assert_rejected([':val'])

    def test_parse_mappings_fails_for_missing_value(self):
        self._assert_rejected(['key:'])

    def test_parse_mappings_fails_for_extra_separator(self):
        self._assert_rejected(['key:val:junk'])

    def test_parse_mappings_fails_for_duplicate_key(self):
        self._assert_rejected(['key:val1', 'key:val2'])

    def test_parse_mappings_fails_for_duplicate_value(self):
        # unique_values keeps its default (True) here.
        self._assert_rejected(['key1:val', 'key2:val'])

    def test_parse_mappings_succeeds_for_one_mapping(self):
        parsed = self.parse(['key:val'])
        self.assertEqual(parsed, {'key': 'val'})

    def test_parse_mappings_succeeds_for_n_mappings(self):
        parsed = self.parse(['key1:val1', 'key2:val2'])
        self.assertEqual(parsed, {'key1': 'val1', 'key2': 'val2'})

    def test_parse_mappings_succeeds_for_duplicate_value(self):
        # Same input as the duplicate-value failure case, but with
        # unique_values passed as False.
        parsed = self.parse(['key1:val', 'key2:val'], False)
        self.assertEqual(parsed, {'key1': 'val', 'key2': 'val'})

    def test_parse_mappings_succeeds_for_no_mappings(self):
        parsed = self.parse([''])
        self.assertEqual({}, parsed)
76
77
78 class TestParseTunnelRangesMixin(object):
79     TUN_MIN = None
80     TUN_MAX = None
81     TYPE = None
82     _err_prefix = "Invalid network tunnel range: '%d:%d' - "
83     _err_suffix = "%s is not a valid %s identifier."
84     _err_range = "End of tunnel range is less than start of tunnel range."
85
86     def _build_invalid_tunnel_range_msg(self, t_range_tuple, n):
87         bad_id = t_range_tuple[n - 1]
88         return (self._err_prefix % t_range_tuple) + (self._err_suffix
89                                                  % (bad_id, self.TYPE))
90
91     def _build_range_reversed_msg(self, t_range_tuple):
92         return (self._err_prefix % t_range_tuple) + self._err_range
93
94     def _verify_range(self, tunnel_range):
95         return plugin_utils.verify_tunnel_range(tunnel_range, self.TYPE)
96
97     def _check_range_valid_ranges(self, tunnel_range):
98         self.assertIsNone(self._verify_range(tunnel_range))
99
100     def _check_range_invalid_ranges(self, bad_range, which):
101         expected_msg = self._build_invalid_tunnel_range_msg(bad_range, which)
102         err = self.assertRaises(n_exc.NetworkTunnelRangeError,
103                                 self._verify_range, bad_range)
104         self.assertEqual(expected_msg, str(err))
105
106     def _check_range_reversed(self, bad_range):
107         err = self.assertRaises(n_exc.NetworkTunnelRangeError,
108                                 self._verify_range, bad_range)
109         expected_msg = self._build_range_reversed_msg(bad_range)
110         self.assertEqual(expected_msg, str(err))
111
112     def test_range_tunnel_id_valid(self):
113             self._check_range_valid_ranges((self.TUN_MIN, self.TUN_MAX))
114
115     def test_range_tunnel_id_invalid(self):
116             self._check_range_invalid_ranges((-1, self.TUN_MAX), 1)
117             self._check_range_invalid_ranges((self.TUN_MIN,
118                                               self.TUN_MAX + 1), 2)
119             self._check_range_invalid_ranges((self.TUN_MIN - 1,
120                                               self.TUN_MAX + 1), 1)
121
122     def test_range_tunnel_id_reversed(self):
123             self._check_range_reversed((self.TUN_MAX, self.TUN_MIN))
124
125
class TestGreTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
                                    base.BaseTestCase):
    """Run the TestParseTunnelRangesMixin checks with the GRE constants."""

    TYPE = p_const.TYPE_GRE
    TUN_MAX = p_const.MAX_GRE_ID
    TUN_MIN = p_const.MIN_GRE_ID
131
132
class TestVxlanTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
                                      base.BaseTestCase):
    """Run the TestParseTunnelRangesMixin checks with the VXLAN constants."""

    TYPE = p_const.TYPE_VXLAN
    TUN_MAX = p_const.MAX_VXLAN_VNI
    TUN_MIN = p_const.MIN_VXLAN_VNI
138
139
class UtilTestParseVlanRanges(base.BaseTestCase):
    """Base class holding expected-message builders for VLAN range tests."""

    # Every expected message is _err_prefix + <range text> + one tail.
    _err_prefix = "Invalid network VLAN range: '"
    _err_too_few = "' - 'need more than 2 values to unpack'."
    _err_too_many_prefix = "' - 'too many values to unpack"
    _err_not_int = "' - 'invalid literal for int() with base 10: '%s''."
    _err_bad_vlan = "' - '%s is not a valid VLAN tag'."
    _err_range = "' - 'End of VLAN range is less than start of VLAN range'."

    def _range_too_few_err(self, nv_range):
        """Expected message for an entry with too few fields."""
        return ''.join((self._err_prefix, nv_range, self._err_too_few))

    def _range_too_many_err_prefix(self, nv_range):
        """Leading part of the expected message for too many fields."""
        return ''.join((self._err_prefix, nv_range,
                        self._err_too_many_prefix))

    def _vlan_not_int_err(self, nv_range, vlan):
        """Expected message when ``vlan`` is not an integer literal."""
        tail = self._err_not_int % vlan
        return self._err_prefix + nv_range + tail

    def _nrange_invalid_vlan(self, nv_range, n):
        """Expected bad-VLAN message for a 'net:start:end' string.

        ``n`` indexes the colon-separated fields of ``nv_range``; the
        message quotes only the fields after the first one.
        """
        fields = nv_range.split(':')
        bad_vlan = fields[n]
        vlan_part = ':'.join(fields[1:])
        return self._err_prefix + vlan_part + (self._err_bad_vlan % bad_vlan)

    def _vrange_invalid_vlan(self, v_range_tuple, n):
        """Expected bad-VLAN message for a (start, end) tuple.

        ``n`` is 1-based: 1 selects the start, 2 the end.
        """
        bad_vlan = v_range_tuple[n - 1]
        range_text = '%d:%d' % v_range_tuple
        return self._err_prefix + range_text + (self._err_bad_vlan % bad_vlan)

    def _vrange_invalid(self, v_range_tuple):
        """Expected message for a tuple whose end is below its start."""
        range_text = '%d:%d' % v_range_tuple
        return ''.join((self._err_prefix, range_text, self._err_range))
170
171
class TestVlanNetworkNameValid(base.BaseTestCase):
    """Entries lacking a physical network name must be rejected."""

    def parse_vlan_ranges(self, vlan_range):
        """Return plugin_utils.parse_network_vlan_ranges(vlan_range)."""
        return plugin_utils.parse_network_vlan_ranges(vlan_range)

    def test_validate_provider_phynet_name_mixed(self):
        entries = ['', ':23:30', 'physnet1', 'tenant_net:100:200']
        self.assertRaises(n_exc.PhysicalNetworkNameError,
                          self.parse_vlan_ranges, entries)

    def test_validate_provider_phynet_name_bad(self):
        entries = [':1:34']
        self.assertRaises(n_exc.PhysicalNetworkNameError,
                          self.parse_vlan_ranges, entries)
186
187
class TestVlanRangeVerifyValid(UtilTestParseVlanRanges):
    """Tests for plugin_utils.verify_vlan_range on (start, end) tuples."""

    def verify_range(self, vlan_range):
        """Return plugin_utils.verify_vlan_range(vlan_range)."""
        return plugin_utils.verify_vlan_range(vlan_range)

    def test_range_valid_ranges(self):
        accepted = ((1, 2), (1, 1999), (100, 100), (100, 200),
                    (4001, 4094), (1, 4094))
        for vlan_range in accepted:
            self.assertIsNone(self.verify_range(vlan_range))

    def check_one_vlan_invalid(self, bad_range, which):
        """Assert ``bad_range`` is rejected, blaming element ``which``.

        ``which`` is 1 for the start of the range and 2 for the end.
        """
        expected_msg = self._vrange_invalid_vlan(bad_range, which)
        raised = self.assertRaises(n_exc.NetworkVlanRangeError,
                                   self.verify_range, bad_range)
        self.assertEqual(str(raised), expected_msg)

    # --- only the first element is out of bounds ---

    def test_range_first_vlan_invalid_negative(self):
        self.check_one_vlan_invalid((-1, 199), 1)

    def test_range_first_vlan_invalid_zero(self):
        self.check_one_vlan_invalid((0, 199), 1)

    def test_range_first_vlan_invalid_limit_plus_one(self):
        self.check_one_vlan_invalid((4095, 199), 1)

    def test_range_first_vlan_invalid_too_big(self):
        self.check_one_vlan_invalid((9999, 199), 1)

    # --- only the second element is out of bounds ---

    def test_range_second_vlan_invalid_negative(self):
        self.check_one_vlan_invalid((299, -1), 2)

    def test_range_second_vlan_invalid_zero(self):
        self.check_one_vlan_invalid((299, 0), 2)

    def test_range_second_vlan_invalid_limit_plus_one(self):
        self.check_one_vlan_invalid((299, 4095), 2)

    def test_range_second_vlan_invalid_too_big(self):
        self.check_one_vlan_invalid((299, 9999), 2)

    # --- both out of bounds: the message is expected to name the first ---

    def test_range_both_vlans_invalid_01(self):
        self.check_one_vlan_invalid((-1, 0), 1)

    def test_range_both_vlans_invalid_02(self):
        self.check_one_vlan_invalid((0, 4095), 1)

    def test_range_both_vlans_invalid_03(self):
        self.check_one_vlan_invalid((4095, 9999), 1)

    def test_range_both_vlans_invalid_04(self):
        self.check_one_vlan_invalid((9999, -1), 1)

    def test_range_reversed(self):
        reversed_range = (95, 10)
        expected_msg = self._vrange_invalid(reversed_range)
        raised = self.assertRaises(n_exc.NetworkVlanRangeError,
                                   self.verify_range, reversed_range)
        self.assertEqual(str(raised), expected_msg)
248
249
class TestParseOneVlanRange(UtilTestParseVlanRanges):
    """Tests for plugin_utils.parse_network_vlan_range on single entries."""

    def parse_one(self, cfg_entry):
        """Return plugin_utils.parse_network_vlan_range(cfg_entry)."""
        return plugin_utils.parse_network_vlan_range(cfg_entry)

    def _assert_parse_error(self, entry, expected_msg):
        """Expect NetworkVlanRangeError whose text equals ``expected_msg``."""
        raised = self.assertRaises(n_exc.NetworkVlanRangeError,
                                   self.parse_one, entry)
        self.assertEqual(str(raised), expected_msg)

    def test_parse_one_net_no_vlan_range(self):
        entry = "net1"
        self.assertEqual(self.parse_one(entry), ("net1", None))

    def test_parse_one_net_and_vlan_range(self):
        entry = "net1:100:199"
        self.assertEqual(self.parse_one(entry), ("net1", (100, 199)))

    def test_parse_one_net_incomplete_range(self):
        entry = "net1:100"
        self._assert_parse_error(entry, self._range_too_few_err(entry))

    def test_parse_one_net_range_too_many(self):
        entry = "net1:100:150:200"
        msg_start = self._range_too_many_err_prefix(entry)
        raised = self.assertRaises(n_exc.NetworkVlanRangeError,
                                   self.parse_one, entry)
        # Python 2 and Python 3 word this error differently; on Python 3 the
        # text depends on how many values were unpacked, so only the leading
        # part of the message is compared.
        self.assertTrue(str(raised).startswith(msg_start))

    def test_parse_one_net_vlan1_not_int(self):
        entry = "net1:foo:199"
        self._assert_parse_error(entry,
                                 self._vlan_not_int_err(entry, 'foo'))

    def test_parse_one_net_vlan2_not_int(self):
        entry = "net1:100:bar"
        self._assert_parse_error(entry,
                                 self._vlan_not_int_err(entry, 'bar'))

    def test_parse_one_net_and_max_range(self):
        entry = "net1:1:4094"
        self.assertEqual(self.parse_one(entry), ("net1", (1, 4094)))

    def test_parse_one_net_range_bad_vlan1(self):
        entry = "net1:9000:150"
        self._assert_parse_error(entry,
                                 self._nrange_invalid_vlan(entry, 1))

    def test_parse_one_net_range_bad_vlan2(self):
        entry = "net1:4000:4999"
        self._assert_parse_error(entry,
                                 self._nrange_invalid_vlan(entry, 2))
313
314
class TestParseVlanRangeList(UtilTestParseVlanRanges):
    """Tests for plugin_utils.parse_network_vlan_ranges on entry lists."""

    def parse_list(self, cfg_entries):
        """Return plugin_utils.parse_network_vlan_ranges(cfg_entries)."""
        return plugin_utils.parse_network_vlan_ranges(cfg_entries)

    def _assert_list_error(self, entries, expected_msg):
        """Expect NetworkVlanRangeError whose text equals ``expected_msg``."""
        raised = self.assertRaises(n_exc.NetworkVlanRangeError,
                                   self.parse_list, entries)
        self.assertEqual(str(raised), expected_msg)

    def test_parse_list_one_net_no_vlan_range(self):
        parsed = self.parse_list(["net1"])
        self.assertEqual(parsed, {"net1": []})

    def test_parse_list_one_net_vlan_range(self):
        parsed = self.parse_list(["net1:100:199"])
        self.assertEqual(parsed, {"net1": [(100, 199)]})

    def test_parse_two_nets_no_vlan_range(self):
        parsed = self.parse_list(["net1", "net2"])
        self.assertEqual(parsed, {"net1": [], "net2": []})

    def test_parse_two_nets_range_and_no_range(self):
        parsed = self.parse_list(["net1:100:199", "net2"])
        self.assertEqual(parsed, {"net1": [(100, 199)], "net2": []})

    def test_parse_two_nets_no_range_and_range(self):
        parsed = self.parse_list(["net1", "net2:200:299"])
        self.assertEqual(parsed, {"net1": [], "net2": [(200, 299)]})

    def test_parse_two_nets_bad_vlan_range1(self):
        entries = ["net1:100", "net2:200:299"]
        # The first entry is the malformed one.
        self._assert_list_error(entries,
                                self._range_too_few_err(entries[0]))

    def test_parse_two_nets_vlan_not_int2(self):
        entries = ["net1:100:199", "net2:200:0x200"]
        # The second entry carries the non-decimal VLAN text.
        self._assert_list_error(
            entries, self._vlan_not_int_err(entries[1], '0x200'))

    def test_parse_two_nets_and_append_1_2(self):
        entries = ["net1:100:199", "net1:1000:1099", "net2:200:299"]
        wanted = {
            "net1": [(100, 199), (1000, 1099)],
            "net2": [(200, 299)],
        }
        self.assertEqual(self.parse_list(entries), wanted)

    def test_parse_two_nets_and_append_1_3(self):
        entries = ["net1:100:199", "net2:200:299", "net1:1000:1099"]
        wanted = {
            "net1": [(100, 199), (1000, 1099)],
            "net2": [(200, 299)],
        }
        self.assertEqual(self.parse_list(entries), wanted)
383
384
class TestDictUtils(base.BaseTestCase):
    """Tests for dict2str, str2dict and diff_list_of_dict in utils."""

    def test_dict2str(self):
        source = {"key1": "value1", "key2": "value2", "key3": "value3"}
        rendered = utils.dict2str(source)
        self.assertEqual(rendered, "key1=value1,key2=value2,key3=value3")

    def test_str2dict(self):
        text = "key1=value1,key2=value2,key3=value3"
        wanted = {"key1": "value1", "key2": "value2", "key3": "value3"}
        self.assertEqual(utils.str2dict(text), wanted)

    def test_dict_str_conversion(self):
        # Round trip: dict -> str -> dict.
        source = {"key1": "value1", "key2": "value2"}
        round_tripped = utils.str2dict(utils.dict2str(source))
        self.assertEqual(round_tripped, source)

    def test_diff_list_of_dict(self):
        before = [{"key1": "value1"},
                  {"key2": "value2"},
                  {"key3": "value3"}]
        after = [{"key1": "value1"},
                 {"key2": "value2"},
                 {"key4": "value4"}]
        added, removed = utils.diff_list_of_dict(before, after)
        self.assertEqual(added, [{"key4": "value4"}])
        self.assertEqual(removed, [{"key3": "value3"}])
410
411
class _CachingDecorator(object):
    """Fixture whose ``func`` is wrapped by utils.cache_method_results."""

    def __init__(self):
        # Mock stand-in for the cache so tests can inspect get/set calls.
        self._cache = mock.Mock()
        self.func_retval = 'bar'

    @utils.cache_method_results
    def func(self, *args, **kwargs):
        """Ignore the arguments and return ``self.func_retval``."""
        return self.func_retval
420
421
class TestCachingDecorator(base.BaseTestCase):
    """Tests for utils.cache_method_results via the _CachingDecorator stub."""

    def setUp(self):
        super(TestCachingDecorator, self).setUp()
        self.decor = _CachingDecorator()
        name_params = dict(module=self.__module__)
        self.func_name = '%(module)s._CachingDecorator.func' % name_params
        # Value taken from the object behind func.func.__self__; the tests
        # use it as the "nothing cached" marker passed to and returned from
        # the cache's get().
        wrapper = self.decor.func.func.__self__
        self.not_cached = wrapper._not_cached

    def test_cache_miss(self):
        cache_key = (self.func_name, 1, 2, ('foo', 'bar'))
        self.decor._cache.get.return_value = self.not_cached
        result = self.decor.func(1, 2, foo='bar')
        self.decor._cache.set.assert_called_once_with(
            cache_key, self.decor.func_retval, None)
        self.assertEqual(self.decor.func_retval, result)

    def test_cache_hit(self):
        cache_key = (self.func_name, 1, 2, ('foo', 'bar'))
        # get() is left as a plain Mock, so whatever it returns is the value
        # the decorated call is expected to hand back.
        result = self.decor.func(1, 2, foo='bar')
        self.assertFalse(self.decor._cache.set.called)
        self.assertEqual(self.decor._cache.get.return_value, result)
        self.decor._cache.get.assert_called_once_with(cache_key,
                                                      self.not_cached)

    def test_get_unhashable(self):
        cache_key = (self.func_name, [1], 2)
        self.decor._cache.get.side_effect = TypeError
        result = self.decor.func([1], 2)
        self.assertFalse(self.decor._cache.set.called)
        self.assertEqual(self.decor.func_retval, result)
        self.decor._cache.get.assert_called_once_with(cache_key,
                                                      self.not_cached)

    def test_missing_cache(self):
        del self.decor._cache
        self.assertRaises(NotImplementedError, self.decor.func, (1, 2))

    def test_no_cache(self):
        self.decor._cache = False
        result = self.decor.func((1, 2))
        self.assertEqual(self.decor.func_retval, result)
468
469
class TestDict2Tuples(base.BaseTestCase):
    """Test for utils.dict2tuple."""

    def test_dict(self):
        source = {'foo': 'bar', '42': 'baz', 'aaa': 'zzz'}
        # The expected pairs are listed in ascending key order.
        wanted = (('42', 'baz'), ('aaa', 'zzz'), ('foo', 'bar'))
        self.assertEqual(wanted, utils.dict2tuple(source))
476
477
class TestExceptionLogger(base.BaseTestCase):
    """Tests for utils.exception_logger, called directly and via eventlet."""

    def test_normal_call(self):
        value = "Result"

        @utils.exception_logger()
        def func():
            return value

        self.assertEqual(value, func())

    def test_raise(self):
        text = "Result"

        @utils.exception_logger()
        def func():
            raise RuntimeError(text)

        self.assertRaises(RuntimeError, func)

    def test_spawn_normal(self):
        value = "Result"
        log_mock = mock.Mock()

        @utils.exception_logger(logger=log_mock)
        def func():
            return value

        green_thread = eventlet.spawn(func)
        self.assertEqual(value, green_thread.wait())
        self.assertFalse(log_mock.called)

    def test_spawn_raise(self):
        text = "Result"
        log_mock = mock.Mock()

        @utils.exception_logger(logger=log_mock)
        def func():
            raise RuntimeError(text)

        green_thread = eventlet.spawn(func)
        self.assertRaises(RuntimeError, green_thread.wait)
        self.assertTrue(log_mock.called)

    def test_pool_spawn_normal(self):
        log_mock = mock.Mock()
        recorder = mock.Mock()

        @utils.exception_logger(logger=log_mock)
        def func(i):
            recorder(i)

        green_pool = eventlet.GreenPool(4)
        for index in range(4):
            green_pool.spawn(func, index)
        green_pool.waitall()

        wanted_calls = [mock.call(0), mock.call(1),
                        mock.call(2), mock.call(3)]
        recorder.assert_has_calls(wanted_calls, any_order=True)
        self.assertFalse(log_mock.called)

    def test_pool_spawn_raise(self):
        log_mock = mock.Mock()
        recorder = mock.Mock()

        @utils.exception_logger(logger=log_mock)
        def func(i):
            # Worker 2 fails; the others record their index.
            if i == 2:
                raise RuntimeError(2)
            recorder(i)

        green_pool = eventlet.GreenPool(4)
        for index in range(4):
            green_pool.spawn(func, index)
        green_pool.waitall()

        wanted_calls = [mock.call(0), mock.call(1), mock.call(3)]
        recorder.assert_has_calls(wanted_calls, any_order=True)
        self.assertTrue(log_mock.called)
558
559
class TestDvrServices(base.BaseTestCase):
    """Tests for utils.is_dvr_serviced across device owner constants."""

    def _test_is_dvr_serviced(self, device_owner, expected):
        """Compare utils.is_dvr_serviced(device_owner) with ``expected``."""
        actual = utils.is_dvr_serviced(device_owner)
        self.assertEqual(expected, actual)

    def test_is_dvr_serviced_with_lb_port(self):
        owner = constants.DEVICE_OWNER_LOADBALANCER
        self._test_is_dvr_serviced(owner, True)

    def test_is_dvr_serviced_with_lbv2_port(self):
        owner = constants.DEVICE_OWNER_LOADBALANCERV2
        self._test_is_dvr_serviced(owner, True)

    def test_is_dvr_serviced_with_dhcp_port(self):
        owner = constants.DEVICE_OWNER_DHCP
        self._test_is_dvr_serviced(owner, True)

    def test_is_dvr_serviced_with_vm_port(self):
        owner = constants.DEVICE_OWNER_COMPUTE_PREFIX
        self._test_is_dvr_serviced(owner, True)
576
577
class TestIpToCidr(base.BaseTestCase):
    """Tests for utils.ip_to_cidr with IPv4 and IPv6 input."""

    def test_ip_to_cidr_ipv4_default(self):
        cidr = utils.ip_to_cidr('15.1.2.3')
        self.assertEqual('15.1.2.3/32', cidr)

    def test_ip_to_cidr_ipv4_prefix(self):
        cidr = utils.ip_to_cidr('15.1.2.3', 24)
        self.assertEqual('15.1.2.3/24', cidr)

    def test_ip_to_cidr_ipv4_netaddr(self):
        # A netaddr.IPAddress is passed instead of a string.
        address = netaddr.IPAddress('15.1.2.3')
        cidr = utils.ip_to_cidr(address)
        self.assertEqual('15.1.2.3/32', cidr)

    def test_ip_to_cidr_ipv4_bad_prefix(self):
        self.assertRaises(netaddr.core.AddrFormatError,
                          utils.ip_to_cidr, '15.1.2.3', 33)

    def test_ip_to_cidr_ipv6_default(self):
        cidr = utils.ip_to_cidr('::1')
        self.assertEqual('::1/128', cidr)

    def test_ip_to_cidr_ipv6_prefix(self):
        cidr = utils.ip_to_cidr('::1', 64)
        self.assertEqual('::1/64', cidr)

    def test_ip_to_cidr_ipv6_bad_prefix(self):
        self.assertRaises(netaddr.core.AddrFormatError,
                          utils.ip_to_cidr, '2000::1', 129)
602
603
class TestCidrIsHost(base.BaseTestCase):
    """Tests for utils.is_cidr_host."""

    def test_is_cidr_host_ipv4(self):
        is_host = utils.is_cidr_host('15.1.2.3/32')
        self.assertTrue(is_host)

    def test_is_cidr_host_ipv4_not_cidr(self):
        # A bare address with no prefix length.
        self.assertRaises(ValueError, utils.is_cidr_host, '15.1.2.3')

    def test_is_cidr_host_ipv6(self):
        is_host = utils.is_cidr_host('2000::1/128')
        self.assertTrue(is_host)

    def test_is_cidr_host_ipv6_netaddr(self):
        network = netaddr.IPNetwork("2000::1")
        self.assertTrue(utils.is_cidr_host(network))

    def test_is_cidr_host_ipv6_32(self):
        is_host = utils.is_cidr_host('2000::1/32')
        self.assertFalse(is_host)

    def test_is_cidr_host_ipv6_not_cidr(self):
        # A bare address with no prefix length.
        self.assertRaises(ValueError, utils.is_cidr_host, '2000::1')

    def test_is_cidr_host_ipv6_not_cidr_netaddr(self):
        # A netaddr.IPAddress rather than an IPNetwork.
        address = netaddr.IPAddress("2000::3")
        self.assertRaises(ValueError, utils.is_cidr_host, address)
633
634
class TestIpVersionFromInt(base.BaseTestCase):
    """Tests for utils.ip_version_from_int."""

    def test_ip_version_from_int_ipv4(self):
        version = utils.ip_version_from_int(4)
        self.assertEqual(version, constants.IPv4)

    def test_ip_version_from_int_ipv6(self):
        version = utils.ip_version_from_int(6)
        self.assertEqual(version, constants.IPv6)

    def test_ip_version_from_int_illegal_int(self):
        self.assertRaises(ValueError, utils.ip_version_from_int, 8)
648
649
class TestDelayedStringRenderer(base.BaseTestCase):
    """Tests that utils.DelayedStringRenderer defers calling its function."""

    def test_call_deferred_until_str(self):
        producer = mock.MagicMock(return_value='Brie cheese!')
        delayed = utils.DelayedStringRenderer(producer, 1, 2, key_arg=44)
        # Constructing the renderer alone must not call the function.
        self.assertFalse(producer.called)
        rendered = "Type: %s" % delayed
        producer.assert_called_once_with(1, 2, key_arg=44)
        self.assertEqual("Type: Brie cheese!", rendered)

    def test_not_called_with_low_log_level(self):
        log = logging.getLogger(__name__)
        # Restore the logger's effective level once the test finishes.
        original_level = log.logger.getEffectiveLevel()
        self.addCleanup(log.logger.setLevel, original_level)

        producer = mock.MagicMock()
        delayed = utils.DelayedStringRenderer(producer)

        # At WARNING a debug message should not be rendered.
        log.logger.setLevel(logging.logging.WARNING)
        log.debug("Hello %s", delayed)
        self.assertFalse(producer.called)

        # At DEBUG the message is rendered, so the function must be called.
        log.logger.setLevel(logging.logging.DEBUG)
        log.debug("Hello %s", delayed)
        self.assertTrue(producer.called)
677
678
class TestEnsureDir(base.BaseTestCase):
    """Tests for utils.ensure_dir with os.makedirs patched out."""

    @mock.patch('os.makedirs')
    def test_ensure_dir_no_fail_if_exists(self, makedirs):
        # makedirs raises EEXIST; the test passes if ensure_dir does not
        # let the error propagate.
        already_exists = OSError()
        already_exists.errno = errno.EEXIST
        makedirs.side_effect = already_exists
        utils.ensure_dir("/etc/create/concurrently")

    @mock.patch('os.makedirs')
    def test_ensure_dir_calls_makedirs(self, makedirs):
        target = "/etc/create/directory"
        utils.ensure_dir(target)
        makedirs.assert_called_once_with(target, 0o755)
691
692
class TestCamelize(base.BaseTestCase):
    """Test for utils.camelize."""

    def test_camelize(self):
        # Maps each input string to the camelized form expected for it.
        cases = {
            'bandwidth_limit': 'BandwidthLimit',
            'test': 'Test',
            'some__more__dashes': 'SomeMoreDashes',
            'a_penguin_walks_into_a_bar': 'APenguinWalksIntoABar',
        }

        for snake, wanted in cases.items():
            self.assertEqual(wanted, utils.camelize(snake))
702
703
class TestRoundVal(base.BaseTestCase):
    """Test for utils.round_val."""

    def test_round_val_ok(self):
        # Each pair is (expected result, input value).
        cases = ((0, 0), (0, 0.1), (1, 0.5), (1, 1.49), (2, 1.5))
        for wanted, raw in cases:
            self.assertEqual(wanted, utils.round_val(raw))
712
713
class TestGetRandomString(base.BaseTestCase):
    """Test for utils.get_random_string."""

    def test_get_random_string(self):
        wanted_length = 127
        generated = utils.get_random_string(wanted_length)
        self.assertEqual(wanted_length, len(generated))
        # The whole string must consist of hexadecimal digits.
        hex_match = re.match('^[0-9a-fA-F]+$', generated)
        self.assertIsNotNone(hex_match)
721
722
class TestSafeDecodeUtf8(base.BaseTestCase):
    """Tests for utils.safe_decode_utf8, gated per Python major version."""

    @helpers.requires_py2
    def test_py2_does_nothing(self):
        original = 'test-py2'
        # The very same object is expected back.
        self.assertIs(original, utils.safe_decode_utf8(original))

    @helpers.requires_py3
    def test_py3_decoded_valid_bytes(self):
        raw = bytes('test-py2', 'utf-8')
        decoded = utils.safe_decode_utf8(raw)
        self.assertIsInstance(decoded, six.text_type)
        # Re-encoding must give back the original bytes.
        self.assertEqual(raw, decoded.encode('utf-8'))

    @helpers.requires_py3
    def test_py3_decoded_invalid_bytes(self):
        # The input bytes are UTF-16 encoded; only the result type is
        # checked.
        raw = bytes('test-py2', 'utf_16')
        decoded = utils.safe_decode_utf8(raw)
        self.assertIsInstance(decoded, six.text_type)