from oslo_utils import encodeutils
from oslo_utils import excutils
from oslo_utils import units
+from six.moves import range
from cinder.backup import driver
from cinder import exception
else:
zeroes = '\0' * length
chunks = int(length / self.chunk_size)
- for chunk in xrange(0, chunks):
+ for chunk in range(0, chunks):
LOG.debug("Writing zeroes chunk %d", chunk)
volume.write(zeroes)
volume.flush()
LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred",
{'chunks': chunks, 'bytes': self.chunk_size})
- for chunk in xrange(0, chunks):
+ for chunk in range(0, chunks):
before = time.time()
data = src.read(self.chunk_size)
# If we have reach end of source, discard any extraneous bytes from
"""Implementation of paginate query."""
from oslo_log import log as logging
+from six.moves import range
import sqlalchemy
from cinder import exception
# Build up an array of sort criteria as in the docstring
criteria_list = []
- for i in xrange(0, len(sort_keys)):
+ for i in range(0, len(sort_keys)):
crit_attrs = []
- for j in xrange(0, i):
+ for j in range(0, i):
model_attr = getattr(model, sort_keys[j])
crit_attrs.append((model_attr == marker_values[j]))
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import six
+from six.moves import range
from six.moves import urllib
from cinder import exception
glanceclient.exc.CommunicationError)
num_attempts = 1 + CONF.glance_num_retries
- for attempt in xrange(1, num_attempts + 1):
+ for attempt in range(1, num_attempts + 1):
client = self.client or self._create_onetime_client(context,
version)
try:
self.assertIsNone(request.cached_resource_by_id('r-0'))
resources = []
- for x in xrange(3):
+ for x in range(3):
resources.append({'id': 'r-%s' % x})
# Cache an empty list of resources using the default name
r = wsgi.Request.blank('/foo')
resources = []
- for x in xrange(3):
+ for x in range(3):
resources.append({'id': 'id%s' % x})
# Store 2
from oslo_serialization import jsonutils
import six
from six.moves import http_client
+from six.moves import range
import webob
from cinder.api.v1 import limits
def _check(self, num, verb, url, username=None):
"""Check and yield results from checks."""
- for x in xrange(num):
+ for x in range(num):
yield self.limiter.check_for_delay(verb, url, username)[0]
def _check_sum(self, num, verb, url, username=None):
from oslo_serialization import jsonutils
import six
from six.moves import http_client
+from six.moves import range
import webob
from cinder.api.v2 import limits
def _check(self, num, verb, url, username=None):
"""Check and yield results from checks."""
- for x in xrange(num):
+ for x in range(num):
yield self.limiter.check_for_delay(verb, url, username)[0]
def _check_sum(self, num, verb, url, username=None):
from oslo_config import cfg
from oslo_utils import timeutils
import six
+from six.moves import range
from six.moves import urllib
import webob
filters=None,
viewable_admin_meta=False):
vols = [stubs.stub_volume(i)
- for i in xrange(CONF.osapi_max_limit)]
+ for i in range(CONF.osapi_max_limit)]
if limit is None or limit >= len(vols):
return vols
return vols[:limit]
filters=None,
viewable_admin_meta=False):
vols = [stubs.stub_volume(i)
- for i in xrange(100)]
+ for i in range(100)]
if limit is None or limit >= len(vols):
return vols
return vols[:limit]
filters=None,
viewable_admin_meta=False):
vols = [stubs.stub_volume(i)
- for i in xrange(CONF.osapi_max_limit + 100)]
+ for i in range(CONF.osapi_max_limit + 100)]
if limit is None or limit >= len(vols):
return vols
return vols[:limit]
mock.Mock(return_value=mock_remotefsclient))
# Remove tempdir.
self.addCleanup(shutil.rmtree, self.temp_dir)
- for _i in xrange(0, 128):
+ for _i in range(0, 128):
self.volume_file.write(os.urandom(1024))
def test_backup_uncompressed(self):
# the ChanceWeigher
hm = host_manager.HostManager()
fake_hosts = [host_manager.HostState('fake_host%s' % x)
- for x in xrange(1, 5)]
+ for x in range(1, 5)]
weighed_hosts = hm.get_weighed_hosts(fake_hosts, {}, 'ChanceWeigher')
self.assertEqual(4, len(weighed_hosts))
super(HostManagerTestCase, self).setUp()
self.host_manager = host_manager.HostManager()
self.fake_hosts = [host_manager.HostState('fake_host%s' % x)
- for x in xrange(1, 5)]
+ for x in range(1, 5)]
def test_choose_host_filters_not_found(self):
self.flags(scheduler_default_filters='FakeFilterClass3')
# Get host_state_map and make sure we have the first 4 hosts
host_state_map = self.host_manager.host_state_map
self.assertEqual(len(host_state_map), 3)
- for i in xrange(3):
+ for i in range(3):
volume_node = services[i]
host = volume_node['host']
self.assertEqual(host_state_map[host].service, volume_node)
# down, host4 is missing capabilities)
host_state_map = self.host_manager.host_state_map
self.assertEqual(len(host_state_map), 2)
- for i in xrange(2):
+ for i in range(2):
volume_node = services[i]
host = volume_node['host']
self.assertEqual(host_state_map[host].service,
from oslo_log import log as logging
from oslo_serialization import jsonutils
import six
+from six.moves import range
from cinder.backup import driver
from cinder.backup.drivers import ceph
# Create a file with some data in it.
self.volume_file = tempfile.NamedTemporaryFile()
self.addCleanup(self.volume_file.close)
- for _i in xrange(0, self.num_chunks):
+ for _i in range(0, self.num_chunks):
data = os.urandom(self.chunk_size)
self.checksum.update(data)
self.volume_file.write(data)
checksum = hashlib.sha256()
test_file.seek(0)
- for _c in xrange(0, self.num_chunks):
+ for _c in range(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
checksum = hashlib.sha256()
test_file.seek(0)
- for _c in xrange(0, self.num_chunks):
+ for _c in range(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
checksum = hashlib.sha256()
test_file.seek(0)
- for _c in xrange(0, self.num_chunks):
+ for _c in range(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
self.addCleanup(self.volume_file.close)
# Remove tempdir.
self.addCleanup(shutil.rmtree, self.temp_dir)
- for _i in xrange(0, 128):
+ for _i in range(0, 128):
self.volume_file.write(os.urandom(1024))
def test_backup_swift_url(self):
self.assertEqual(attachment['attached_host'], host_name)
def test_volume_data_get_for_host(self):
- for i in xrange(THREE):
- for j in xrange(THREE):
+ for i in range(THREE):
+ for j in range(THREE):
db.volume_create(self.ctxt, {'host': 'h%d' % i,
'size': ONE_HUNDREDS})
- for i in xrange(THREE):
+ for i in range(THREE):
self.assertEqual((THREE, THREE_HUNDREDS),
db.volume_data_get_for_host(
self.ctxt, 'h%d' % i))
def test_volume_data_get_for_host_for_multi_backend(self):
- for i in xrange(THREE):
- for j in xrange(THREE):
+ for i in range(THREE):
+ for j in range(THREE):
db.volume_create(self.ctxt, {'host':
'h%d@lvmdriver-1#lvmdriver-1' % i,
'size': ONE_HUNDREDS})
- for i in xrange(THREE):
+ for i in range(THREE):
self.assertEqual((THREE, THREE_HUNDREDS),
db.volume_data_get_for_host(
self.ctxt, 'h%d@lvmdriver-1' % i))
def test_volume_data_get_for_project(self):
- for i in xrange(THREE):
- for j in xrange(THREE):
+ for i in range(THREE):
+ for j in range(THREE):
db.volume_create(self.ctxt, {'project_id': 'p%d' % i,
'size': ONE_HUNDREDS,
'host': 'h-%d-%d' % (i, j),
})
- for i in xrange(THREE):
+ for i in range(THREE):
self.assertEqual((THREE, THREE_HUNDREDS),
db.volume_data_get_for_project(
self.ctxt, 'p%d' % i))
def test_volume_get_all(self):
volumes = [db.volume_create(self.ctxt,
{'host': 'h%d' % i, 'size': i})
- for i in xrange(3)]
+ for i in range(3)]
self._assertEqualListsOfObjects(volumes, db.volume_get_all(
self.ctxt, None, None, ['host'], None))
def test_volume_get_all_by_host(self):
volumes = []
- for i in xrange(3):
+ for i in range(3):
volumes.append([db.volume_create(self.ctxt, {'host': 'h%d' % i})
- for j in xrange(3)])
- for i in xrange(3):
+ for j in range(3)])
+ for i in range(3):
self._assertEqualListsOfObjects(volumes[i],
db.volume_get_all_by_host(
self.ctxt, 'h%d' % i))
def test_volume_get_all_by_host_with_pools(self):
volumes = []
vol_on_host_wo_pool = [db.volume_create(self.ctxt, {'host': 'foo'})
- for j in xrange(3)]
+ for j in range(3)]
vol_on_host_w_pool = [db.volume_create(
self.ctxt, {'host': 'foo#pool0'})]
volumes.append((vol_on_host_wo_pool +
def test_volume_get_all_by_group(self):
volumes = []
- for i in xrange(3):
+ for i in range(3):
volumes.append([db.volume_create(self.ctxt, {
- 'consistencygroup_id': 'g%d' % i}) for j in xrange(3)])
- for i in xrange(3):
+ 'consistencygroup_id': 'g%d' % i}) for j in range(3)])
+ for i in range(3):
self._assertEqualListsOfObjects(volumes[i],
db.volume_get_all_by_group(
self.ctxt, 'g%d' % i))
def test_volume_get_all_by_project(self):
volumes = []
- for i in xrange(3):
+ for i in range(3):
volumes.append([db.volume_create(self.ctxt, {
- 'project_id': 'p%d' % i}) for j in xrange(3)])
- for i in xrange(3):
+ 'project_id': 'p%d' % i}) for j in range(3)])
+ for i in range(3):
self._assertEqualListsOfObjects(volumes[i],
db.volume_get_all_by_project(
self.ctxt, 'p%d' % i, None,
{'project_id': 'g1',
'display_name': 'name_%d' % i,
'size': 1})
- for i in xrange(2)])
+ for i in range(2)])
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g1',
'display_name': 'name_%d' % i,
'size': 2})
- for i in xrange(2)])
+ for i in range(2)])
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g1',
'display_name': 'name_%d' % i})
- for i in xrange(2)])
+ for i in range(2)])
vols.extend([db.volume_create(self.ctxt,
{'project_id': 'g2',
'display_name': 'name_%d' % i,
'size': 1})
- for i in xrange(2)])
+ for i in range(2)])
# By project, filter on size and name
filters = {'size': '1'}
from oslo_utils import timeutils
import paramiko
import six
+from six.moves import range
import cinder
from cinder import exception
expected_sleep_arg = []
- for i in xrange(retries):
+ for i in range(retries):
if i > 0:
interval *= backoff_rate
expected_sleep_arg.append(float(interval))
# FIXME(jdg): What is this actually testing?
# We never call the internal _check method?
- for _index in xrange(100):
+ for _index in range(100):
tests_utils.create_volume(self.context, **self.volume_params)
for volume_id in volume_ids:
self.volume.delete_volume(self.context, volume_id)
def _attach_volume(self):
"""Attach volumes to an instance."""
volume_id_list = []
- for index in xrange(3):
+ for index in range(3):
vol = {}
vol['size'] = 0
vol_ref = db.volume_create(self.context, vol)
from oslo_utils import excutils
from oslo_utils import timeutils
import six
+from six.moves import range
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow import task
LOG.info(_LI("initiator_auto_registration: False. "
"Initiator auto registration is not enabled. "
"Please register initiator manually."))
- self.hlu_set = set(xrange(1, self.max_luns_per_sg + 1))
+ self.hlu_set = set(range(1, self.max_luns_per_sg + 1))
self._client = CommandLineHelper(self.configuration)
conf_pools = self.configuration.safe_get("storage_vnx_pool_names")
self.storage_pools = self._get_managed_storage_pools(conf_pools)
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
+from six.moves import range
from cinder import exception
from cinder.i18n import _, _LE, _LW, _LI
"""
lines = [line for line in out if line != '']
# Every record has 2 lines
- for i in xrange(0, len(lines), 2):
+ for i in range(0, len(lines), 2):
try:
int(lines[i][0])
# sanity check
import random
from oslo_log import log as logging
+from six.moves import range
from cinder import exception
from cinder.i18n import _
from oslo_utils import excutils
from oslo_utils import units
import six
+from six.moves import range
from cinder.brick.local_dev import lvm
from cinder import exception
sleep_time = self._sleep_factor
exc_info = None
- for attempt in xrange(self._count):
+ for attempt in range(self._count):
if attempt != 0:
LOG.warning(_LW('Retrying failed call to %(func)s, '
'attempt %(attempt)i.'),
LOG.debug("Entering _wait_for_export_config loop: state=%s.",
state)
- for node_id in xrange(2):
+ for node_id in range(2):
resp = mg_conns[node_id].basic.get_node_values(bn)
if state and len(resp.keys()):
status[node_id] = True
from oslo_log import log as logging
from oslo_utils import units
+from six.moves import range
from cinder import context
from cinder.db.sqlalchemy import models
output = []
for w in wwns:
output.append('wwn.{0}'.format(
- ':'.join(w[x:x + 2] for x in xrange(0, len(w), 2))))
+ ':'.join(w[x:x + 2] for x in range(0, len(w), 2))))
return output
def _convert_wwns_vmem_to_openstack(self, wwns):
LOG.debug("Entering _wait_for_targetstate loop: target=%s.",
target_name)
- for node_id in xrange(2):
+ for node_id in range(2):
resp = mg_conns[node_id].basic.get_node_values(bn)
if len(resp.keys()):
status[node_id] = True
from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import units
+from six.moves import range
from cinder.brick.local_dev import lvm as brick_lvm
from cinder import db
# then fill with random characters from all symbol groups
symbols = ''.join(symbolgroups)
- password.extend([random.choice(symbols) for _i in xrange(length)])
+ password.extend([random.choice(symbols) for _i in range(length)])
# finally shuffle to ensure first x characters aren't from a
# predictable group
# remove the item that was added in the constructor, since I'm tired of
# reading through docutils for the proper way to construct an empty list
lists = []
- for i in xrange(5):
+ for i in range(5):
lists.append(nodes.bullet_list("", nodes.Text('', '')))
lists[i].remove(lists[i][0])
lists[i]['classes'].append('todo_list')