1 # Copyright (c) 2012 OpenStack Foundation.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
24 from neutron.common import constants
25 from neutron.common import exceptions as n_exc
26 from neutron.common import utils
27 from neutron.plugins.common import constants as p_const
28 from neutron.plugins.common import utils as plugin_utils
29 from neutron.tests import base
30 from neutron.tests.common import helpers
32 from oslo_log import log as logging
35 class TestParseMappings(base.BaseTestCase):
36 def parse(self, mapping_list, unique_values=True):
37 return utils.parse_mappings(mapping_list, unique_values)
39 def test_parse_mappings_fails_for_missing_separator(self):
40 with testtools.ExpectedException(ValueError):
43 def test_parse_mappings_fails_for_missing_key(self):
44 with testtools.ExpectedException(ValueError):
47 def test_parse_mappings_fails_for_missing_value(self):
48 with testtools.ExpectedException(ValueError):
51 def test_parse_mappings_fails_for_extra_separator(self):
52 with testtools.ExpectedException(ValueError):
53 self.parse(['key:val:junk'])
55 def test_parse_mappings_fails_for_duplicate_key(self):
56 with testtools.ExpectedException(ValueError):
57 self.parse(['key:val1', 'key:val2'])
59 def test_parse_mappings_fails_for_duplicate_value(self):
60 with testtools.ExpectedException(ValueError):
61 self.parse(['key1:val', 'key2:val'])
63 def test_parse_mappings_succeeds_for_one_mapping(self):
64 self.assertEqual(self.parse(['key:val']), {'key': 'val'})
66 def test_parse_mappings_succeeds_for_n_mappings(self):
67 self.assertEqual(self.parse(['key1:val1', 'key2:val2']),
68 {'key1': 'val1', 'key2': 'val2'})
70 def test_parse_mappings_succeeds_for_duplicate_value(self):
71 self.assertEqual(self.parse(['key1:val', 'key2:val'], False),
72 {'key1': 'val', 'key2': 'val'})
74 def test_parse_mappings_succeeds_for_no_mappings(self):
75 self.assertEqual({}, self.parse(['']))
78 class TestParseTunnelRangesMixin(object):
82 _err_prefix = "Invalid network tunnel range: '%d:%d' - "
83 _err_suffix = "%s is not a valid %s identifier."
84 _err_range = "End of tunnel range is less than start of tunnel range."
86 def _build_invalid_tunnel_range_msg(self, t_range_tuple, n):
87 bad_id = t_range_tuple[n - 1]
88 return (self._err_prefix % t_range_tuple) + (self._err_suffix
89 % (bad_id, self.TYPE))
91 def _build_range_reversed_msg(self, t_range_tuple):
92 return (self._err_prefix % t_range_tuple) + self._err_range
94 def _verify_range(self, tunnel_range):
95 return plugin_utils.verify_tunnel_range(tunnel_range, self.TYPE)
97 def _check_range_valid_ranges(self, tunnel_range):
98 self.assertIsNone(self._verify_range(tunnel_range))
100 def _check_range_invalid_ranges(self, bad_range, which):
101 expected_msg = self._build_invalid_tunnel_range_msg(bad_range, which)
102 err = self.assertRaises(n_exc.NetworkTunnelRangeError,
103 self._verify_range, bad_range)
104 self.assertEqual(expected_msg, str(err))
106 def _check_range_reversed(self, bad_range):
107 err = self.assertRaises(n_exc.NetworkTunnelRangeError,
108 self._verify_range, bad_range)
109 expected_msg = self._build_range_reversed_msg(bad_range)
110 self.assertEqual(expected_msg, str(err))
112 def test_range_tunnel_id_valid(self):
113 self._check_range_valid_ranges((self.TUN_MIN, self.TUN_MAX))
115 def test_range_tunnel_id_invalid(self):
116 self._check_range_invalid_ranges((-1, self.TUN_MAX), 1)
117 self._check_range_invalid_ranges((self.TUN_MIN,
118 self.TUN_MAX + 1), 2)
119 self._check_range_invalid_ranges((self.TUN_MIN - 1,
120 self.TUN_MAX + 1), 1)
122 def test_range_tunnel_id_reversed(self):
123 self._check_range_reversed((self.TUN_MAX, self.TUN_MIN))
126 class TestGreTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
128 TUN_MIN = p_const.MIN_GRE_ID
129 TUN_MAX = p_const.MAX_GRE_ID
130 TYPE = p_const.TYPE_GRE
133 class TestVxlanTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
135 TUN_MIN = p_const.MIN_VXLAN_VNI
136 TUN_MAX = p_const.MAX_VXLAN_VNI
137 TYPE = p_const.TYPE_VXLAN
140 class UtilTestParseVlanRanges(base.BaseTestCase):
141 _err_prefix = "Invalid network VLAN range: '"
142 _err_too_few = "' - 'need more than 2 values to unpack'."
143 _err_too_many_prefix = "' - 'too many values to unpack"
144 _err_not_int = "' - 'invalid literal for int() with base 10: '%s''."
145 _err_bad_vlan = "' - '%s is not a valid VLAN tag'."
146 _err_range = "' - 'End of VLAN range is less than start of VLAN range'."
148 def _range_too_few_err(self, nv_range):
149 return self._err_prefix + nv_range + self._err_too_few
151 def _range_too_many_err_prefix(self, nv_range):
152 return self._err_prefix + nv_range + self._err_too_many_prefix
154 def _vlan_not_int_err(self, nv_range, vlan):
155 return self._err_prefix + nv_range + (self._err_not_int % vlan)
157 def _nrange_invalid_vlan(self, nv_range, n):
158 vlan = nv_range.split(':')[n]
159 v_range = ':'.join(nv_range.split(':')[1:])
160 return self._err_prefix + v_range + (self._err_bad_vlan % vlan)
162 def _vrange_invalid_vlan(self, v_range_tuple, n):
163 vlan = v_range_tuple[n - 1]
164 v_range_str = '%d:%d' % v_range_tuple
165 return self._err_prefix + v_range_str + (self._err_bad_vlan % vlan)
167 def _vrange_invalid(self, v_range_tuple):
168 v_range_str = '%d:%d' % v_range_tuple
169 return self._err_prefix + v_range_str + self._err_range
172 class TestVlanNetworkNameValid(base.BaseTestCase):
173 def parse_vlan_ranges(self, vlan_range):
174 return plugin_utils.parse_network_vlan_ranges(vlan_range)
176 def test_validate_provider_phynet_name_mixed(self):
177 self.assertRaises(n_exc.PhysicalNetworkNameError,
178 self.parse_vlan_ranges,
179 ['', ':23:30', 'physnet1',
180 'tenant_net:100:200'])
182 def test_validate_provider_phynet_name_bad(self):
183 self.assertRaises(n_exc.PhysicalNetworkNameError,
184 self.parse_vlan_ranges,
188 class TestVlanRangeVerifyValid(UtilTestParseVlanRanges):
189 def verify_range(self, vlan_range):
190 return plugin_utils.verify_vlan_range(vlan_range)
192 def test_range_valid_ranges(self):
193 self.assertIsNone(self.verify_range((1, 2)))
194 self.assertIsNone(self.verify_range((1, 1999)))
195 self.assertIsNone(self.verify_range((100, 100)))
196 self.assertIsNone(self.verify_range((100, 200)))
197 self.assertIsNone(self.verify_range((4001, 4094)))
198 self.assertIsNone(self.verify_range((1, 4094)))
200 def check_one_vlan_invalid(self, bad_range, which):
201 expected_msg = self._vrange_invalid_vlan(bad_range, which)
202 err = self.assertRaises(n_exc.NetworkVlanRangeError,
203 self.verify_range, bad_range)
204 self.assertEqual(str(err), expected_msg)
206 def test_range_first_vlan_invalid_negative(self):
207 self.check_one_vlan_invalid((-1, 199), 1)
209 def test_range_first_vlan_invalid_zero(self):
210 self.check_one_vlan_invalid((0, 199), 1)
212 def test_range_first_vlan_invalid_limit_plus_one(self):
213 self.check_one_vlan_invalid((4095, 199), 1)
215 def test_range_first_vlan_invalid_too_big(self):
216 self.check_one_vlan_invalid((9999, 199), 1)
218 def test_range_second_vlan_invalid_negative(self):
219 self.check_one_vlan_invalid((299, -1), 2)
221 def test_range_second_vlan_invalid_zero(self):
222 self.check_one_vlan_invalid((299, 0), 2)
224 def test_range_second_vlan_invalid_limit_plus_one(self):
225 self.check_one_vlan_invalid((299, 4095), 2)
227 def test_range_second_vlan_invalid_too_big(self):
228 self.check_one_vlan_invalid((299, 9999), 2)
230 def test_range_both_vlans_invalid_01(self):
231 self.check_one_vlan_invalid((-1, 0), 1)
233 def test_range_both_vlans_invalid_02(self):
234 self.check_one_vlan_invalid((0, 4095), 1)
236 def test_range_both_vlans_invalid_03(self):
237 self.check_one_vlan_invalid((4095, 9999), 1)
239 def test_range_both_vlans_invalid_04(self):
240 self.check_one_vlan_invalid((9999, -1), 1)
242 def test_range_reversed(self):
244 expected_msg = self._vrange_invalid(bad_range)
245 err = self.assertRaises(n_exc.NetworkVlanRangeError,
246 self.verify_range, bad_range)
247 self.assertEqual(str(err), expected_msg)
250 class TestParseOneVlanRange(UtilTestParseVlanRanges):
251 def parse_one(self, cfg_entry):
252 return plugin_utils.parse_network_vlan_range(cfg_entry)
254 def test_parse_one_net_no_vlan_range(self):
256 expected_networks = ("net1", None)
257 self.assertEqual(self.parse_one(config_str), expected_networks)
259 def test_parse_one_net_and_vlan_range(self):
260 config_str = "net1:100:199"
261 expected_networks = ("net1", (100, 199))
262 self.assertEqual(self.parse_one(config_str), expected_networks)
264 def test_parse_one_net_incomplete_range(self):
265 config_str = "net1:100"
266 expected_msg = self._range_too_few_err(config_str)
267 err = self.assertRaises(n_exc.NetworkVlanRangeError,
268 self.parse_one, config_str)
269 self.assertEqual(str(err), expected_msg)
271 def test_parse_one_net_range_too_many(self):
272 config_str = "net1:100:150:200"
273 expected_msg_prefix = self._range_too_many_err_prefix(config_str)
274 err = self.assertRaises(n_exc.NetworkVlanRangeError,
275 self.parse_one, config_str)
276 # The error message is not same in Python 2 and Python 3. In Python 3,
277 # it depends on the amount of values used when unpacking, so it cannot
278 # be predicted as a fixed string.
279 self.assertTrue(str(err).startswith(expected_msg_prefix))
281 def test_parse_one_net_vlan1_not_int(self):
282 config_str = "net1:foo:199"
283 expected_msg = self._vlan_not_int_err(config_str, 'foo')
284 err = self.assertRaises(n_exc.NetworkVlanRangeError,
285 self.parse_one, config_str)
286 self.assertEqual(str(err), expected_msg)
288 def test_parse_one_net_vlan2_not_int(self):
289 config_str = "net1:100:bar"
290 expected_msg = self._vlan_not_int_err(config_str, 'bar')
291 err = self.assertRaises(n_exc.NetworkVlanRangeError,
292 self.parse_one, config_str)
293 self.assertEqual(str(err), expected_msg)
295 def test_parse_one_net_and_max_range(self):
296 config_str = "net1:1:4094"
297 expected_networks = ("net1", (1, 4094))
298 self.assertEqual(self.parse_one(config_str), expected_networks)
300 def test_parse_one_net_range_bad_vlan1(self):
301 config_str = "net1:9000:150"
302 expected_msg = self._nrange_invalid_vlan(config_str, 1)
303 err = self.assertRaises(n_exc.NetworkVlanRangeError,
304 self.parse_one, config_str)
305 self.assertEqual(str(err), expected_msg)
307 def test_parse_one_net_range_bad_vlan2(self):
308 config_str = "net1:4000:4999"
309 expected_msg = self._nrange_invalid_vlan(config_str, 2)
310 err = self.assertRaises(n_exc.NetworkVlanRangeError,
311 self.parse_one, config_str)
312 self.assertEqual(str(err), expected_msg)
315 class TestParseVlanRangeList(UtilTestParseVlanRanges):
316 def parse_list(self, cfg_entries):
317 return plugin_utils.parse_network_vlan_ranges(cfg_entries)
319 def test_parse_list_one_net_no_vlan_range(self):
320 config_list = ["net1"]
321 expected_networks = {"net1": []}
322 self.assertEqual(self.parse_list(config_list), expected_networks)
324 def test_parse_list_one_net_vlan_range(self):
325 config_list = ["net1:100:199"]
326 expected_networks = {"net1": [(100, 199)]}
327 self.assertEqual(self.parse_list(config_list), expected_networks)
329 def test_parse_two_nets_no_vlan_range(self):
330 config_list = ["net1",
332 expected_networks = {"net1": [],
334 self.assertEqual(self.parse_list(config_list), expected_networks)
336 def test_parse_two_nets_range_and_no_range(self):
337 config_list = ["net1:100:199",
339 expected_networks = {"net1": [(100, 199)],
341 self.assertEqual(self.parse_list(config_list), expected_networks)
343 def test_parse_two_nets_no_range_and_range(self):
344 config_list = ["net1",
346 expected_networks = {"net1": [],
347 "net2": [(200, 299)]}
348 self.assertEqual(self.parse_list(config_list), expected_networks)
350 def test_parse_two_nets_bad_vlan_range1(self):
351 config_list = ["net1:100",
353 expected_msg = self._range_too_few_err(config_list[0])
354 err = self.assertRaises(n_exc.NetworkVlanRangeError,
355 self.parse_list, config_list)
356 self.assertEqual(str(err), expected_msg)
358 def test_parse_two_nets_vlan_not_int2(self):
359 config_list = ["net1:100:199",
361 expected_msg = self._vlan_not_int_err(config_list[1], '0x200')
362 err = self.assertRaises(n_exc.NetworkVlanRangeError,
363 self.parse_list, config_list)
364 self.assertEqual(str(err), expected_msg)
366 def test_parse_two_nets_and_append_1_2(self):
367 config_list = ["net1:100:199",
370 expected_networks = {"net1": [(100, 199),
372 "net2": [(200, 299)]}
373 self.assertEqual(self.parse_list(config_list), expected_networks)
375 def test_parse_two_nets_and_append_1_3(self):
376 config_list = ["net1:100:199",
379 expected_networks = {"net1": [(100, 199),
381 "net2": [(200, 299)]}
382 self.assertEqual(self.parse_list(config_list), expected_networks)
385 class TestDictUtils(base.BaseTestCase):
386 def test_dict2str(self):
387 dic = {"key1": "value1", "key2": "value2", "key3": "value3"}
388 expected = "key1=value1,key2=value2,key3=value3"
389 self.assertEqual(utils.dict2str(dic), expected)
391 def test_str2dict(self):
392 string = "key1=value1,key2=value2,key3=value3"
393 expected = {"key1": "value1", "key2": "value2", "key3": "value3"}
394 self.assertEqual(utils.str2dict(string), expected)
396 def test_dict_str_conversion(self):
397 dic = {"key1": "value1", "key2": "value2"}
398 self.assertEqual(utils.str2dict(utils.dict2str(dic)), dic)
400 def test_diff_list_of_dict(self):
401 old_list = [{"key1": "value1"},
404 new_list = [{"key1": "value1"},
407 added, removed = utils.diff_list_of_dict(old_list, new_list)
408 self.assertEqual(added, [dict(key4="value4")])
409 self.assertEqual(removed, [dict(key3="value3")])
412 class _CachingDecorator(object):
414 self.func_retval = 'bar'
415 self._cache = mock.Mock()
417 @utils.cache_method_results
418 def func(self, *args, **kwargs):
419 return self.func_retval
422 class TestCachingDecorator(base.BaseTestCase):
424 super(TestCachingDecorator, self).setUp()
425 self.decor = _CachingDecorator()
426 self.func_name = '%(module)s._CachingDecorator.func' % {
427 'module': self.__module__
429 self.not_cached = self.decor.func.func.__self__._not_cached
431 def test_cache_miss(self):
432 expected_key = (self.func_name, 1, 2, ('foo', 'bar'))
434 kwargs = {'foo': 'bar'}
435 self.decor._cache.get.return_value = self.not_cached
436 retval = self.decor.func(*args, **kwargs)
437 self.decor._cache.set.assert_called_once_with(
438 expected_key, self.decor.func_retval, None)
439 self.assertEqual(self.decor.func_retval, retval)
441 def test_cache_hit(self):
442 expected_key = (self.func_name, 1, 2, ('foo', 'bar'))
444 kwargs = {'foo': 'bar'}
445 retval = self.decor.func(*args, **kwargs)
446 self.assertFalse(self.decor._cache.set.called)
447 self.assertEqual(self.decor._cache.get.return_value, retval)
448 self.decor._cache.get.assert_called_once_with(expected_key,
451 def test_get_unhashable(self):
452 expected_key = (self.func_name, [1], 2)
453 self.decor._cache.get.side_effect = TypeError
454 retval = self.decor.func([1], 2)
455 self.assertFalse(self.decor._cache.set.called)
456 self.assertEqual(self.decor.func_retval, retval)
457 self.decor._cache.get.assert_called_once_with(expected_key,
460 def test_missing_cache(self):
461 delattr(self.decor, '_cache')
462 self.assertRaises(NotImplementedError, self.decor.func, (1, 2))
464 def test_no_cache(self):
465 self.decor._cache = False
466 retval = self.decor.func((1, 2))
467 self.assertEqual(self.decor.func_retval, retval)
470 class TestDict2Tuples(base.BaseTestCase):
472 input_dict = {'foo': 'bar', '42': 'baz', 'aaa': 'zzz'}
473 expected = (('42', 'baz'), ('aaa', 'zzz'), ('foo', 'bar'))
474 output_tuple = utils.dict2tuple(input_dict)
475 self.assertEqual(expected, output_tuple)
478 class TestExceptionLogger(base.BaseTestCase):
479 def test_normal_call(self):
482 @utils.exception_logger()
486 self.assertEqual(result, func())
488 def test_raise(self):
491 @utils.exception_logger()
493 raise RuntimeError(result)
495 self.assertRaises(RuntimeError, func)
497 def test_spawn_normal(self):
501 @utils.exception_logger(logger=logger)
505 gt = eventlet.spawn(func)
506 self.assertEqual(result, gt.wait())
507 self.assertFalse(logger.called)
509 def test_spawn_raise(self):
513 @utils.exception_logger(logger=logger)
515 raise RuntimeError(result)
517 gt = eventlet.spawn(func)
518 self.assertRaises(RuntimeError, gt.wait)
519 self.assertTrue(logger.called)
521 def test_pool_spawn_normal(self):
525 @utils.exception_logger(logger=logger)
529 pool = eventlet.GreenPool(4)
530 for i in range(0, 4):
534 calls.assert_has_calls([mock.call(0), mock.call(1),
535 mock.call(2), mock.call(3)],
537 self.assertFalse(logger.called)
539 def test_pool_spawn_raise(self):
543 @utils.exception_logger(logger=logger)
546 raise RuntimeError(2)
550 pool = eventlet.GreenPool(4)
551 for i in range(0, 4):
555 calls.assert_has_calls([mock.call(0), mock.call(1), mock.call(3)],
557 self.assertTrue(logger.called)
560 class TestDvrServices(base.BaseTestCase):
562 def _test_is_dvr_serviced(self, device_owner, expected):
563 self.assertEqual(expected, utils.is_dvr_serviced(device_owner))
565 def test_is_dvr_serviced_with_lb_port(self):
566 self._test_is_dvr_serviced(constants.DEVICE_OWNER_LOADBALANCER, True)
568 def test_is_dvr_serviced_with_lbv2_port(self):
569 self._test_is_dvr_serviced(constants.DEVICE_OWNER_LOADBALANCERV2, True)
571 def test_is_dvr_serviced_with_dhcp_port(self):
572 self._test_is_dvr_serviced(constants.DEVICE_OWNER_DHCP, True)
574 def test_is_dvr_serviced_with_vm_port(self):
575 self._test_is_dvr_serviced(constants.DEVICE_OWNER_COMPUTE_PREFIX, True)
578 class TestIpToCidr(base.BaseTestCase):
579 def test_ip_to_cidr_ipv4_default(self):
580 self.assertEqual('15.1.2.3/32', utils.ip_to_cidr('15.1.2.3'))
582 def test_ip_to_cidr_ipv4_prefix(self):
583 self.assertEqual('15.1.2.3/24', utils.ip_to_cidr('15.1.2.3', 24))
585 def test_ip_to_cidr_ipv4_netaddr(self):
586 ip_address = netaddr.IPAddress('15.1.2.3')
587 self.assertEqual('15.1.2.3/32', utils.ip_to_cidr(ip_address))
589 def test_ip_to_cidr_ipv4_bad_prefix(self):
590 self.assertRaises(netaddr.core.AddrFormatError,
591 utils.ip_to_cidr, '15.1.2.3', 33)
593 def test_ip_to_cidr_ipv6_default(self):
594 self.assertEqual('::1/128', utils.ip_to_cidr('::1'))
596 def test_ip_to_cidr_ipv6_prefix(self):
597 self.assertEqual('::1/64', utils.ip_to_cidr('::1', 64))
599 def test_ip_to_cidr_ipv6_bad_prefix(self):
600 self.assertRaises(netaddr.core.AddrFormatError,
601 utils.ip_to_cidr, '2000::1', 129)
604 class TestCidrIsHost(base.BaseTestCase):
605 def test_is_cidr_host_ipv4(self):
606 self.assertTrue(utils.is_cidr_host('15.1.2.3/32'))
608 def test_is_cidr_host_ipv4_not_cidr(self):
609 self.assertRaises(ValueError,
613 def test_is_cidr_host_ipv6(self):
614 self.assertTrue(utils.is_cidr_host('2000::1/128'))
616 def test_is_cidr_host_ipv6_netaddr(self):
617 net = netaddr.IPNetwork("2000::1")
618 self.assertTrue(utils.is_cidr_host(net))
620 def test_is_cidr_host_ipv6_32(self):
621 self.assertFalse(utils.is_cidr_host('2000::1/32'))
623 def test_is_cidr_host_ipv6_not_cidr(self):
624 self.assertRaises(ValueError,
628 def test_is_cidr_host_ipv6_not_cidr_netaddr(self):
629 ip_address = netaddr.IPAddress("2000::3")
630 self.assertRaises(ValueError,
635 class TestIpVersionFromInt(base.BaseTestCase):
636 def test_ip_version_from_int_ipv4(self):
637 self.assertEqual(utils.ip_version_from_int(4),
640 def test_ip_version_from_int_ipv6(self):
641 self.assertEqual(utils.ip_version_from_int(6),
644 def test_ip_version_from_int_illegal_int(self):
645 self.assertRaises(ValueError,
646 utils.ip_version_from_int,
650 class TestDelayedStringRenderer(base.BaseTestCase):
651 def test_call_deferred_until_str(self):
652 my_func = mock.MagicMock(return_value='Brie cheese!')
653 delayed = utils.DelayedStringRenderer(my_func, 1, 2, key_arg=44)
654 self.assertFalse(my_func.called)
655 string = "Type: %s" % delayed
656 my_func.assert_called_once_with(1, 2, key_arg=44)
657 self.assertEqual("Type: Brie cheese!", string)
659 def test_not_called_with_low_log_level(self):
660 LOG = logging.getLogger(__name__)
661 # make sure we return logging to previous level
662 current_log_level = LOG.logger.getEffectiveLevel()
663 self.addCleanup(LOG.logger.setLevel, current_log_level)
665 my_func = mock.MagicMock()
666 delayed = utils.DelayedStringRenderer(my_func)
668 # set to warning so we shouldn't be logging debug messages
669 LOG.logger.setLevel(logging.logging.WARNING)
670 LOG.debug("Hello %s", delayed)
671 self.assertFalse(my_func.called)
673 # but it should be called with the debug level
674 LOG.logger.setLevel(logging.logging.DEBUG)
675 LOG.debug("Hello %s", delayed)
676 self.assertTrue(my_func.called)
679 class TestEnsureDir(base.BaseTestCase):
680 @mock.patch('os.makedirs')
681 def test_ensure_dir_no_fail_if_exists(self, makedirs):
683 error.errno = errno.EEXIST
684 makedirs.side_effect = error
685 utils.ensure_dir("/etc/create/concurrently")
687 @mock.patch('os.makedirs')
688 def test_ensure_dir_calls_makedirs(self, makedirs):
689 utils.ensure_dir("/etc/create/directory")
690 makedirs.assert_called_once_with("/etc/create/directory", 0o755)
693 class TestCamelize(base.BaseTestCase):
694 def test_camelize(self):
695 data = {'bandwidth_limit': 'BandwidthLimit',
697 'some__more__dashes': 'SomeMoreDashes',
698 'a_penguin_walks_into_a_bar': 'APenguinWalksIntoABar'}
700 for s, expected in data.items():
701 self.assertEqual(expected, utils.camelize(s))
704 class TestRoundVal(base.BaseTestCase):
705 def test_round_val_ok(self):
706 for expected, value in ((0, 0),
711 self.assertEqual(expected, utils.round_val(value))
714 class TestGetRandomString(base.BaseTestCase):
715 def test_get_random_string(self):
717 random_string = utils.get_random_string(length)
718 self.assertEqual(length, len(random_string))
719 regex = re.compile('^[0-9a-fA-F]+$')
720 self.assertIsNotNone(regex.match(random_string))
723 class TestSafeDecodeUtf8(base.BaseTestCase):
725 @helpers.requires_py2
726 def test_py2_does_nothing(self):
728 self.assertIs(s, utils.safe_decode_utf8(s))
730 @helpers.requires_py3
731 def test_py3_decoded_valid_bytes(self):
732 s = bytes('test-py2', 'utf-8')
733 decoded_str = utils.safe_decode_utf8(s)
734 self.assertIsInstance(decoded_str, six.text_type)
735 self.assertEqual(s, decoded_str.encode('utf-8'))
737 @helpers.requires_py3
738 def test_py3_decoded_invalid_bytes(self):
739 s = bytes('test-py2', 'utf_16')
740 decoded_str = utils.safe_decode_utf8(s)
741 self.assertIsInstance(decoded_str, six.text_type)