The function `xrange` was renamed to `range` in Python 3.
* Remove `xrange` occurences so that Python 3 tests can pass. Use
`six.moves.range` instead to get the right function in both cases.
* Generalize the use of the efficient `range` (ex-`xrange`) in
critical sections (when iterating over large lists).
* Simplify code.
* Add a hacking check to prevent future usage of `xrange`.
Change-Id: I080acaaa1d4753619fbbb76dddba6d946d84e73f
Partially implements: blueprint neutron-python3
- [N322] Detect common errors with assert_called_once_with
- [N323] Enforce namespace-less imports for oslo libraries
- [N324] Prevent use of deprecated contextlib.nested.
+- [N325] Python 3: Do not use xrange.
Creating Unit Tests
-------------------
criteria_list = []
for i, sort in enumerate(sorts):
crit_attrs = [(getattr(model, sorts[j][0]) == marker_values[j])
- for j in moves.xrange(i)]
+ for j in moves.range(i)]
model_attr = getattr(model, sort[0])
if sort[1]:
crit_attrs.append((model_attr > marker_values[i]))
yield(0, msg)
+def check_python3_xrange(logical_line):
+ if re.search(r"\bxrange\s*\(", logical_line):
+ yield(0, "N325: Do not use xrange. Use range, or six.moves.range for "
+ "large loops.")
+
+
def factory(register):
register(validate_log_translations)
register(use_jsonutils)
register(no_translate_debug_logs)
register(check_oslo_namespace_imports)
register(check_no_contextlib_nested)
+ register(check_python3_xrange)
min_vlan_search = vlan_id or MIN_VLAN
max_vlan_search = (vlan_id + 1) if vlan_id else MAX_VLAN
- for vlan in moves.xrange(min_vlan_search, max_vlan_search):
+ for vlan in moves.range(min_vlan_search, max_vlan_search):
if vlan not in self.vlans:
self.vlans.add(vlan)
return vlan
return False
test_iface = None
- for seg_id in moves.xrange(1, p_const.MAX_VXLAN_VNI + 1):
+ for seg_id in moves.range(1, p_const.MAX_VXLAN_VNI + 1):
if not ip_lib.device_exists(
self.get_vxlan_device_name(seg_id)):
test_iface = self.ensure_vxlan(seg_id)
"%(tun_min)s:%(tun_max)s"),
{'tun_min': tun_min, 'tun_max': tun_max})
else:
- gre_ids |= set(moves.xrange(tun_min, tun_max + 1))
+ gre_ids |= set(moves.range(tun_min, tun_max + 1))
session = db_api.get_session()
try:
# this physical network
vlan_ids = set()
for vlan_min, vlan_max in vlan_ranges:
- vlan_ids |= set(moves.xrange(vlan_min, vlan_max + 1))
+ vlan_ids |= set(moves.range(vlan_min, vlan_max + 1))
# remove from table unallocated vlans not currently
# allocatable
"%(tun_min)s:%(tun_max)s"),
{'tun_min': tun_min, 'tun_max': tun_max})
else:
- vxlan_vnis |= set(moves.xrange(tun_min, tun_max + 1))
+ vxlan_vnis |= set(moves.range(tun_min, tun_max + 1))
session = db_api.get_session()
with session.begin(subtransactions=True):
super(OVSNeutronAgent, self).__init__()
self.use_veth_interconnection = use_veth_interconnection
self.veth_mtu = veth_mtu
- self.available_local_vlans = set(moves.xrange(p_const.MIN_VLAN_TAG,
- p_const.MAX_VLAN_TAG))
+ self.available_local_vlans = set(moves.range(p_const.MIN_VLAN_TAG,
+ p_const.MAX_VLAN_TAG))
self.use_call = True
self.tunnel_types = tunnel_types or []
self.l2_pop = l2_population
import eventlet
-from six import moves
-
from neutron.agent.linux import async_process
from neutron.tests import base
def setUp(self):
super(AsyncProcessTestFramework, self).setUp()
self.test_file_path = self.get_temp_file_path('test_async_process.tmp')
- self.data = [str(x) for x in moves.xrange(4)]
+ self.data = [str(x) for x in range(4)]
with file(self.test_file_path, 'w') as f:
f.writelines('%s\n' % item for item in self.data)
def spawn_n_children(self, n, service=None):
self._child_processes = []
- for child_number in moves.xrange(n):
+ for child_number in moves.range(n):
uuid = self._child_uuid(child_number)
_callback = self._make_cmdline_callback(uuid)
pm = external_process.ProcessManager(
import copy
import hashlib
-from six.moves import http_client as httplib
import json
import posixpath
import re
import OpenSSL
from oslo_log import log as logging
from six import moves
+from six.moves import http_client as httplib
from tempest_lib import exceptions as lib_exc
from neutron.tests.tempest import exceptions as exc
# Also try Subject Alternative Names for a match
san_list = None
- for i in moves.xrange(x509.get_extension_count()):
+ for i in moves.range(x509.get_extension_count()):
ext = x509.get_extension(i)
if ext.get_short_name() == 'subjectAltName':
san_list = str(ext)
import netaddr
from oslo_log import log
import oslo_messaging
+from six import moves
from testtools import matchers
from neutron.agent.common import config as agent_config
ipv6_subnet_modes = [subnet_mode_none] * count
elif len(ipv6_subnet_modes) != count:
ipv6_subnet_modes.extend([subnet_mode_none for i in
- xrange(len(ipv6_subnet_modes), count)])
+ moves.range(len(ipv6_subnet_modes),
+ count)])
if ip_version == 4:
ip_pool = '35.4.%i.4'
fixed_ips, subnets = [], []
num_existing_subnets = len(subnets)
- for i in xrange(count):
+ for i in moves.range(count):
subnet_id = _uuid()
fixed_ips.append(
{'ip_address': ip_pool % (i + num_existing_subnets),
import mock
from oslo_config import cfg
+from six import moves
import six.moves.urllib.parse as urlparse
import webob
from webob import exc
resource += 's'
num = len(initial_input[resource]) if initial_input and isinstance(
initial_input[resource], list) else 1
- expected = [expected_item for x in xrange(num)]
+ expected = [expected_item for x in moves.range(num)]
self.assertEqual(expected, dhcp_notifier.call_args_list)
self.assertEqual(num, dhcp_notifier.call_count)
self.assertEqual(expected_code, res.status_int)
class HackingTestCase(base.BaseTestCase):
+ def assertLinePasses(self, func, line):
+ with testtools.ExpectedException(StopIteration):
+ next(func(line))
+
+ def assertLineFails(self, func, line):
+ self.assertIsInstance(next(func(line)), tuple)
+
def test_log_translations(self):
expected_marks = {
'error': '_LE',
"neutron/tests/test_assert.py"))))
def test_check_oslo_namespace_imports(self):
- def check(s, fail=True):
- func = checks.check_oslo_namespace_imports
- if fail:
- self.assertIsInstance(next(func(s)), tuple)
- else:
- with testtools.ExpectedException(StopIteration):
- next(func(s))
-
- check('from oslo_utils import importutils', fail=False)
- check('import oslo_messaging', fail=False)
- check('from oslo.utils import importutils')
- check('from oslo import messaging')
- check('import oslo.messaging')
+ f = checks.check_oslo_namespace_imports
+ self.assertLinePasses(f, 'from oslo_utils import importutils')
+ self.assertLinePasses(f, 'import oslo_messaging')
+ self.assertLineFails(f, 'from oslo.utils import importutils')
+ self.assertLineFails(f, 'from oslo import messaging')
+ self.assertLineFails(f, 'import oslo.messaging')
+
+ def test_check_python3_xrange(self):
+ f = checks.check_python3_xrange
+ self.assertLineFails(f, 'a = xrange(1000)')
+ self.assertLineFails(f, 'b =xrange ( 42 )')
+ self.assertLineFails(f, 'c = xrange(1, 10, 2)')
+ self.assertLinePasses(f, 'd = range(1000)')
+ self.assertLinePasses(f, 'e = six.moves.range(1337)')
def test_vlan_pool(self):
vlan_ids = set()
- for x in moves.xrange(VLAN_MIN, VLAN_MAX + 1):
+ for x in moves.range(VLAN_MIN, VLAN_MAX + 1):
(physical_network, seg_type,
vlan_id, m_ip) = n1kv_db_v2.reserve_vlan(self.session, self.net_p)
self.assertEqual(physical_network, PHYS_NET)
def test_vxlan_pool(self):
vxlan_ids = set()
- for x in moves.xrange(VXLAN_MIN, VXLAN_MAX + 1):
+ for x in moves.range(VXLAN_MIN, VXLAN_MAX + 1):
vxlan = n1kv_db_v2.reserve_vxlan(self.session, self.net_p)
vxlan_id = vxlan[2]
self.assertThat(vxlan_id, matchers.GreaterThan(VXLAN_MIN - 1))
api.PHYSICAL_NETWORK: 'None',
api.SEGMENTATION_ID: None}
- for x in moves.xrange(TUN_MIN, TUN_MAX + 1):
+ for x in moves.range(TUN_MIN, TUN_MAX + 1):
segment = self.driver.reserve_provider_segment(self.session,
specs)
self.assertEqual(self.TYPE, segment[api.NETWORK_TYPE])
def test_allocate_tenant_segment(self):
tunnel_ids = set()
- for x in moves.xrange(TUN_MIN, TUN_MAX + 1):
+ for x in moves.range(TUN_MIN, TUN_MAX + 1):
segment = self.driver.allocate_tenant_segment(self.session)
self.assertThat(segment[api.SEGMENTATION_ID],
matchers.GreaterThan(TUN_MIN - 1))
import mock
from oslo_config import cfg
+from six import moves
import testtools
from neutron.agent.common import ovs_lib
# Ensure vif_ports_scenario is longer than DAEMON_LOOP_COUNT
if len(self.vif_ports_scenario) < DAEMON_LOOP_COUNT:
self.vif_ports_scenario.extend(
- [] for _i in xrange(DAEMON_LOOP_COUNT -
- len(self.vif_ports_scenario)))
+ [] for _i in moves.range(DAEMON_LOOP_COUNT -
+ len(self.vif_ports_scenario)))
with contextlib.nested(
mock.patch.object(time, 'sleep', side_effect=sleep_mock),
tb = root_tb
tracebacks = [tb]
- for x in moves.xrange(len(ignored_bit_array) - 1):
+ for x in moves.range(len(ignored_bit_array) - 1):
tb.tb_next = mock.Mock()
tb = tb.tb_next
tracebacks.append(tb)