try:
rtsroot = rtslib.root.RTSRoot()
except rtslib.utils.RTSLibError:
- print _('Ensure that configfs is mounted at /sys/kernel/config.')
+ print(_('Ensure that configfs is mounted at /sys/kernel/config.'))
raise
# Look to see if BlockStorageObject already exists
try:
rtslib.NetworkPortal(tpg_new, '0.0.0.0', 3260, mode='any')
except rtslib.utils.RTSLibError:
- print _('Error creating NetworkPortal: ensure port 3260 '
- 'is not in use by another service.')
+ print(_('Error creating NetworkPortal: ensure port 3260 '
+ 'is not in use by another service.'))
raise
try:
try:
rtsroot = rtslib.root.RTSRoot()
except rtslib.utils.RTSLibError:
- print _('Ensure that configfs is mounted at /sys/kernel/config.')
+ print(_('Ensure that configfs is mounted at /sys/kernel/config.'))
raise
# Look for the target
def usage():
- print "Usage:"
- print sys.argv[0], \
- "create [device] [name] [userid] [password]", \
- "<initiator_iqn,iqn2,iqn3,...>"
- print sys.argv[0], \
- "add-initiator [target_iqn] [userid] [password] [initiator_iqn]"
- print sys.argv[0], "get-targets"
- print sys.argv[0], "delete [iqn]"
- print sys.argv[0], "verify"
+ print("Usage:")
+ print(sys.argv[0] +
+ " create [device] [name] [userid] [password]" +
+ " <initiator_iqn,iqn2,iqn3,...>")
+ print(sys.argv[0] +
+ " add-initiator [target_iqn] [userid] [password] [initiator_iqn]")
+ print(sys.argv[0] + " get-targets")
+ print(sys.argv[0] + " delete [iqn]")
+ print(sys.argv[0] + " verify")
sys.exit(1)
self.fake_random(reset=True)
host_state = {'host': 'host.example.com', 'free_capacity_gb': 99999}
weight = weigher._weigh_object(host_state, None)
- self.assertEquals(1.0, weight)
+ self.assertEqual(1.0, weight)
weight = weigher._weigh_object(host_state, None)
- self.assertEquals(2.0, weight)
+ self.assertEqual(2.0, weight)
weight = weigher._weigh_object(host_state, None)
- self.assertEquals(3.0, weight)
+ self.assertEqual(3.0, weight)
def test_host_manager_choosing_chance_weigher(self):
# ensure HostManager can load the ChanceWeigher
# via the entry points mechanism
hm = host_manager.HostManager()
weighers = hm._choose_host_weighers('ChanceWeigher')
- self.assertEquals(1, len(weighers))
- self.assertEquals(weighers[0], ChanceWeigher)
+ self.assertEqual(1, len(weighers))
+ self.assertEqual(weighers[0], ChanceWeigher)
def test_use_of_chance_weigher_via_host_manager(self):
# ensure we don't lose any hosts when weighing with
fake_hosts = [host_manager.HostState('fake_host%s' % x)
for x in xrange(1, 5)]
weighed_hosts = hm.get_weighed_hosts(fake_hosts, {}, 'ChanceWeigher')
- self.assertEquals(4, len(weighed_hosts))
+ self.assertEqual(4, len(weighed_hosts))
self.service.backup(backup, rbd_io)
- self.assertEquals(self.called, ['list', 'popen_init', 'read',
- 'popen_init', 'write',
- 'stdout_close', 'communicate'])
+ self.assertEqual(self.called, ['list', 'popen_init', 'read',
+ 'popen_init', 'write',
+ 'stdout_close', 'communicate'])
# Ensure the files are equal
self.assertEqual(checksum.digest(), self.checksum.digest())
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
- self.assertEquals(self.called, ['remove'])
+ self.assertEqual(self.called, ['remove'])
def test_try_delete_base_image(self):
# don't create volume db entry since it should not be required
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
- self.assertEquals(self.called, ['remove'])
+ self.assertEqual(self.called, ['remove'])
def test_try_delete_base_image_busy(self):
"""This should induce retries then raise rbd.ImageBusy."""
self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
self._setup_mock_popen(self, ['out', 'err'])
self.service._piped_execute(['foo'], ['bar'])
- self.assertEquals(self.called, ['popen_init', 'popen_init',
- 'stdout_close', 'communicate'])
+ self.assertEqual(self.called, ['popen_init', 'popen_init',
+ 'stdout_close', 'communicate'])
def tearDown(self):
self.volume_file.close()
compute_catalog = [{u'type': u'compute', u'name': u'nova'}]
ctxt = context.RequestContext('111', '222',
service_catalog=service_catalog)
- self.assertEquals(ctxt.service_catalog, compute_catalog)
+ self.assertEqual(ctxt.service_catalog, compute_catalog)
db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata})
db.volume_metadata_delete(self.ctxt, 1, 'c')
metadata.pop('c')
- self.assertEquals(metadata, db.volume_metadata_get(self.ctxt, 1))
+ self.assertEqual(metadata, db.volume_metadata_get(self.ctxt, 1))
class DBAPISnapshotTestCase(BaseTest):
inf = image_utils.qemu_img_info(TEST_PATH)
- self.assertEquals(inf.image, 'qemu.qcow2')
- self.assertEquals(inf.backing_file, 'qemu.qcow2')
- self.assertEquals(inf.file_format, 'qcow2')
- self.assertEquals(inf.virtual_size, 52428800)
- self.assertEquals(inf.cluster_size, 65536)
- self.assertEquals(inf.disk_size, 200704)
-
- self.assertEquals(inf.snapshots[0]['id'], '1')
- self.assertEquals(inf.snapshots[0]['tag'], 'snap1')
- self.assertEquals(inf.snapshots[0]['vm_size'], '1.7G')
- self.assertEquals(inf.snapshots[0]['date'], '2011-10-04')
- self.assertEquals(inf.snapshots[0]['vm_clock'],
- '19:04:00 32:06:34.974')
-
- self.assertEquals(str(inf), TEST_STR)
+ self.assertEqual(inf.image, 'qemu.qcow2')
+ self.assertEqual(inf.backing_file, 'qemu.qcow2')
+ self.assertEqual(inf.file_format, 'qcow2')
+ self.assertEqual(inf.virtual_size, 52428800)
+ self.assertEqual(inf.cluster_size, 65536)
+ self.assertEqual(inf.disk_size, 200704)
+
+ self.assertEqual(inf.snapshots[0]['id'], '1')
+ self.assertEqual(inf.snapshots[0]['tag'], 'snap1')
+ self.assertEqual(inf.snapshots[0]['vm_size'], '1.7G')
+ self.assertEqual(inf.snapshots[0]['date'], '2011-10-04')
+ self.assertEqual(inf.snapshots[0]['vm_clock'],
+ '19:04:00 32:06:34.974')
+
+ self.assertEqual(str(inf), TEST_STR)
def _test_fetch_to_raw(self, has_qemu=True, src_inf=None, dest_inf=None):
mox = self._mox
)
for value, result in values_to_test:
- self.assertEquals(utils.str2size(value), result)
+ self.assertEqual(utils.str2size(value), result)
# Invalid format value
self.assertRaises(ValueError, utils.str2size, 'A')
'/rest/nms/')),
)
for url, result in urls:
- self.assertEquals(utils.parse_nms_url(url), result)
+ self.assertEqual(utils.parse_nms_url(url), result)
volume = self._generate_vol_info(None, None)
volume['volume_type_id'] = None
self.driver.create_volume(volume)
- self.assertNotEquals(cap['extent_size'], self.driver._extent_size)
+ self.assertNotEqual(cap['extent_size'], self.driver._extent_size)
self.driver.migrate_volume(ctxt, volume, host)
self.driver.delete_volume(volume)
fp = open(tmpfilename2, 'r+')
runs = fp.read()
fp.close()
- self.assertNotEquals(runs.strip(), 'failure', 'stdin did not '
- 'always get passed '
- 'correctly')
+ self.assertNotEqual(runs.strip(), 'failure', 'stdin did not '
+ 'always get passed '
+ 'correctly')
runs = int(runs.strip())
self.assertEqual(runs, 10, 'Ran %d times instead of 10.' % (runs,))
finally:
# Install bounded pep8/pyflakes first, then let flake8 install
-hacking>=0.5.6,<0.8
+hacking>=0.8.0,<0.9
coverage>=3.6
discover
fixtures>=0.3.14