raise exc.HTTPBadRequest(explanation=msg)
context = req.environ['cinder.context']
import_data = body['backup-record']
- #Verify that body elements are provided
+ # Verify that body elements are provided
try:
backup_service = import_data['backup_service']
backup_url = import_data['backup_url']
force = req.params.get('force', None)
- #convert string to bool type in strict manner
+ # Convert string to bool type in strict manner
force = strutils.bool_from_string(force)
LOG.debug("Delete qos_spec: %(id)s, force: %(force)s" %
{'id': id, 'force': force})
context = req.environ['cinder.context']
authorize(context, 'get_pools')
- #TODO(zhiteng) Add filters support
+ # TODO(zhiteng) Add filters support
detail = req.params.get('detail', False)
pools = self.scheduler_api.get_pools(context, filters=None)
from cinder import wsgi
-#default request size is 112k
+# Default request size is 112k
max_request_body_size_opt = cfg.IntOpt('osapi_max_request_body_size',
default=114688,
help='Max size for body of a request')
self._add_xmlns(node, has_atom)
return node.toxml('UTF-8')
- #NOTE (ameade): the has_atom should be removed after all of the
+ # NOTE (ameade): the has_atom should be removed after all of the
# xml serializers and view builders have been updated to the current
# spec that required all responses include the xmlns:atom, the has_atom
# flag is to prevent current tests from breaking
if xmlns:
result.setAttribute('xmlns', xmlns)
- #TODO(bcwaldon): accomplish this without a type-check
+ # TODO(bcwaldon): accomplish this without a type-check
if isinstance(data, list):
collections = metadata.get('list_collections', {})
if nodename in collections:
for item in data:
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
- #TODO(bcwaldon): accomplish this without a type-check
+ # TODO(bcwaldon): accomplish this without a type-check
elif isinstance(data, dict):
collections = metadata.get('dict_collections', {})
if nodename in collections:
"""Returns a list of snapshots, transformed through entity_maker."""
context = req.environ['cinder.context']
- #pop out limit and offset , they are not search_opts
+ # pop out limit and offset; they are not search_opts
search_opts = req.GET.copy()
search_opts.pop('limit', None)
search_opts.pop('offset', None)
- #filter out invalid option
+ # filter out invalid options
allowed_search_options = ('status', 'volume_id', 'display_name')
utils.remove_invalid_filter_options(context, search_opts,
allowed_search_options)
def _items(self, req, entity_maker):
"""Returns a list of volumes, transformed through entity_maker."""
- #pop out limit and offset , they are not search_opts
+ # pop out limit and offset; they are not search_opts
search_opts = req.GET.copy()
search_opts.pop('limit', None)
search_opts.pop('offset', None)
"""Returns a list of snapshots, transformed through entity_maker."""
context = req.environ['cinder.context']
- #pop out limit and offset , they are not search_opts
+ # pop out limit and offset; they are not search_opts
search_opts = req.GET.copy()
search_opts.pop('limit', None)
search_opts.pop('offset', None)
- #filter out invalid option
+ # filter out invalid options
allowed_search_options = ('status', 'volume_id', 'name')
utils.remove_invalid_filter_options(context, search_opts,
allowed_search_options)
def detail(self, request, qos_spec):
"""Detailed view of a single qos_spec."""
- #TODO(zhiteng) Add associations to detailed view
+ # TODO(zhiteng) Add associations to detailed view
return {
'qos_specs': qos_spec,
'links': self._get_links(request,
def getAttrib(self, obj):
"""Get attribute."""
tmpattrib = {}
- #Now set up all the attributes...
+ # Now set up all the attributes...
for key, value in self.attrib.items():
try:
tmpattrib[key] = value(obj)
tagnameList = self._splitTagName(tagname)
insertIndex = 0
- #If parent is not none and has same tagname
+ # If parent is not None and has the same tagname
if parent is not None:
for i in range(0, len(tagnameList)):
tmpInsertPos = parent.find(tagnameList[i])
if insertIndex >= len(tagnameList):
insertIndex = insertIndex - 1
- #Create root elem
+ # Create root elem
elem = etree.Element(tagnameList[insertIndex], nsmap=nsmap)
rootelem = elem
subelem = elem
- #Create subelem
+ # Create subelem
for i in range((insertIndex + 1), len(tagnameList)):
subelem = etree.SubElement(elem, tagnameList[i])
elem = subelem
# If we have a parent, append the node to the parent
if parent is not None:
- #If we can merge this element, then insert
+ # If we can merge this element, then insert
if insertIndex > 0:
parent.insert(len(list(parent)), rootelem)
else:
finally:
QUOTAS.rollback(context, reservations)
- #TODO(DuncanT): In future, when we have a generic local attach,
- # this can go via the scheduler, which enables
- # better load balancing and isolation of services
+ # TODO(DuncanT): In future, when we have a generic local attach,
+ # this can go via the scheduler, which enables
+ # better load balancing and isolation of services
self.backup_rpcapi.create_backup(context,
backup['host'],
backup['id'],
device_info = {'type': 'block'}
if self.use_multipath:
- #multipath installed, discovering other targets if available
+ # multipath installed, discovering other targets if available
for ip, iqn in self._discover_iscsi_portals(connection_properties):
props = copy.deepcopy(connection_properties)
props['target_portal'] = ip
host_device = next(dev for dev in host_devices if os.path.exists(dev))
if self.use_multipath:
- #we use the multipath device instead of the single path device
+ # we use the multipath device instead of the single path device
self._rescan_multipath()
multipath_device = self._get_multipath_device_name(host_device)
if multipath_device is not None:
"node.session.auth.password",
connection_properties['auth_password'])
- #duplicate logins crash iscsiadm after load,
- #so we scan active sessions to see if the node is logged in.
+ # duplicate logins crash iscsiadm after load,
+ # so we scan active sessions to see if the node is logged in.
out = self._run_iscsiadm_bare(["-m", "session"],
run_as_root=True,
check_exit_code=[0, 1, 21])[0] or ""
("--login",),
check_exit_code=[0, 255])
except putils.ProcessExecutionError as err:
- #as this might be one of many paths,
- #only set successful logins to startup automatically
+ # as this might be one of many paths,
+ # only set successful logins to startup automatically
if err.exit_code in [15]:
self._iscsiadm_update(connection_properties,
"node.startup",
waiting_status = {'tries': 0}
- #NOTE(jbr_): Device path is not always present immediately
+ # NOTE(jbr_): Device path is not always present immediately
def _wait_for_discovery(aoe_path):
if os.path.exists(aoe_path):
raise loopingcall.LoopingCallDone
]
"""
filters = filters or {}
- #TODO(zhiteng) Add filters for 'consumer'
+ # TODO(zhiteng) Add filters for 'consumer'
read_deleted = "yes" if inactive else "no"
rows = model_query(context, models.QualityOfServiceSpecs,
return
try:
- #Set default volumes
+ # Set default volumes
qci = quota_classes.insert()
qci.execute({'created_at': CREATED_AT,
'class_name': CLASS_NAME,
'resource': 'volumes',
'hard_limit': CONF.quota_volumes,
'deleted': False, })
- #Set default snapshots
+ # Set default snapshots
qci.execute({'created_at': CREATED_AT,
'class_name': CLASS_NAME,
'resource': 'snapshots',
'hard_limit': CONF.quota_snapshots,
'deleted': False, })
- #Set default gigabytes
+ # Set default gigabytes
qci.execute({'created_at': CREATED_AT,
'class_name': CLASS_NAME,
'resource': 'gigabytes',
message = _("File %(file_path)s could not be found.")
-#TODO(bcwaldon): EOL this exception!
+# TODO(bcwaldon): EOL this exception!
class Duplicate(CinderException):
pass
message = _("Bad HTTP response status %(status)s")
-#SolidFire
+# SolidFire
class SolidFireAPIException(VolumeBackendAPIException):
message = _("Bad response from SolidFire API")
image_meta, data=None, purge_props=True):
"""Modify the given image with the new data."""
image_meta = self._translate_to_glance(image_meta)
- #NOTE(dosaboy): see comment in bug 1210467
+ # NOTE(dosaboy): see comment in bug 1210467
if CONF.glance_api_version == 1:
image_meta['purge_props'] = purge_props
- #NOTE(bcwaldon): id is not an editable field, but it is likely to be
+ # NOTE(bcwaldon): id is not an editable field, but it is likely to be
# passed in by calling code. Let's be nice and ignore it.
image_meta.pop('id', None)
if data:
image_meta['data'] = data
try:
- #NOTE(dosaboy): the v2 api separates update from upload
+ # NOTE(dosaboy): the v2 api separates update from upload
if data and CONF.glance_api_version > 1:
image_meta = self._client.call(context, 'upload', image_id,
image_meta['data'])
def _extract_attributes(image):
- #NOTE(hdd): If a key is not found, base.Resource.__getattr__() may perform
+ # NOTE(hdd): If a key is not found, base.Resource.__getattr__() may perform
# a get(), resulting in a useless request back to glance. This list is
# therefore sorted, with dependent attributes as the end
# 'deleted_at' depends on 'deleted'
:returns: a tuple of the form (image_service, image_id)
"""
- #NOTE(bcwaldon): If image_href doesn't look like a URI, assume its a
+ # NOTE(bcwaldon): If image_href doesn't look like a URI, assume it's a
# standalone image ID
if '/' not in str(image_href):
image_service = get_default_image_service()
return top_host.obj
def get_pools(self, context, filters):
- #TODO(zhiteng) Add filters support
+ # TODO(zhiteng) Add filters support
return self.host_manager.get_pools(context)
def _post_select_populate_filter_properties(self, filter_properties,
total_space = host_state.total_capacity_gb
if (free_space == 'infinite' or free_space == 'unknown' or
total_space == 'infinite' or total_space == 'unknown'):
- #(zhiteng) 'infinite' and 'unknown' are treated the same
+ # (zhiteng) 'infinite' and 'unknown' are treated the same
# here, for sorting purpose.
# As a partial fix for bug #1350638, 'infinite' and 'unknown' are
test_host = 'test_host'
alt_host = 'strange_host'
empty_service = []
- #service host not match with volume's host
+ # service host does not match the volume's host
host_not_match = [{'availability_zone': "fake_az", 'host': alt_host,
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- #service az not match with volume's az
+ # service az does not match the volume's az
az_not_match = [{'availability_zone': "strange_az", 'host': test_host,
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- #service disabled
+ # service disabled
disabled_service = []
- #dead service that last reported at 20th century
+ # dead service that last reported in the 20th century
dead_service = [{'availability_zone': "fake_az", 'host': alt_host,
'disabled': 0, 'updated_at': '1989-04-16 02:55:44'}]
- #first service's host not match but second one works.
+ # first service's host does not match but the second one works.
multi_services = [{'availability_zone': "fake_az", 'host': alt_host,
'disabled': 0, 'updated_at': timeutils.utcnow()},
{'availability_zone': "fake_az", 'host': test_host,
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- #Setup mock to run through the following service cases
+ # Set up the mock to run through the following service cases
_mock_service_get_all_by_topic.side_effect = [empty_service,
host_not_match,
az_not_match,
host=test_host)['id']
volume = self.volume_api.get(context.get_admin_context(), volume_id)
- #test empty service
+ # test empty service
self.assertEqual(self.backup_api._is_backup_service_enabled(volume,
test_host),
False)
- #test host not match service
+ # test host not match service
self.assertEqual(self.backup_api._is_backup_service_enabled(volume,
test_host),
False)
- #test az not match service
+ # test az not match service
self.assertEqual(self.backup_api._is_backup_service_enabled(volume,
test_host),
False)
- #test disabled service
+ # test disabled service
self.assertEqual(self.backup_api._is_backup_service_enabled(volume,
test_host),
False)
- #test dead service
+ # test dead service
self.assertEqual(self.backup_api._is_backup_service_enabled(volume,
test_host),
False)
- #test multi services and the last service matches
+ # test multi services and the last service matches
self.assertEqual(self.backup_api._is_backup_service_enabled(volume,
test_host),
True)
self.assertEqual(export.item(0).getAttribute('backup_url'),
backup_url)
- #db.backup_destroy(context.get_admin_context(), backup_id)
+ # db.backup_destroy(context.get_admin_context(), backup_id)
def test_export_record_with_bad_backup_id(self):
backup_service = 'fake'
backup_url = 'fake'
- #test with no backup_service
+ # test with no backup_service
req = webob.Request.blank('/v2/fake/backups/import_record')
body = {'backup-record': {'backup_url': backup_url}}
req.body = json.dumps(body)
self.assertEqual(res_dict['badRequest']['message'],
'Incorrect request body format.')
- #test with no backup_url
+ # test with no backup_url
req = webob.Request.blank('/v2/fake/backups/import_record')
body = {'backup-record': {'backup_service': backup_service}}
req.body = json.dumps(body)
self.assertEqual(res_dict['badRequest']['message'],
'Incorrect request body format.')
- #test with no backup_url and backup_url
+ # test with no backup_service and no backup_url
req = webob.Request.blank('/v2/fake/backups/import_record')
body = {'backup-record': {}}
req.body = json.dumps(body)
class FoxInSocksFlavorGooseControllerExtension(wsgi.Controller):
@wsgi.extends
def show(self, req, resp_obj, id):
- #NOTE: This only handles JSON responses.
+ # NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
resp_obj.obj['flavor']['googoose'] = req.GET.get('chewing')
class FoxInSocksFlavorBandsControllerExtension(wsgi.Controller):
@wsgi.extends
def show(self, req, resp_obj, id):
- #NOTE: This only handles JSON responses.
+ # NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
resp_obj.obj['big_bands'] = 'Pig Bands!'
root = etree.XML(output)
xmlutil.validate_schema(root, 'limits')
- #verify absolute limits
+ # verify absolute limits
absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
self.assertEqual(len(absolutes), 4)
for limit in absolutes:
value = limit.get('value')
self.assertEqual(value, str(fixture['limits']['absolute'][name]))
- #verify rate limits
+ # verify rate limits
rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
self.assertEqual(len(rates), 2)
for i, rate in enumerate(rates):
root = etree.XML(output)
xmlutil.validate_schema(root, 'limits')
- #verify absolute limits
+ # verify absolute limits
absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
self.assertEqual(len(absolutes), 0)
- #verify rate limits
+ # verify rate limits
rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
self.assertEqual(len(rates), 0)
req.method = 'POST'
req.headers["content-type"] = "application/json"
- #test for long key
+ # test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for long value
+ # test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for empty key.
+ # test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(2, res['snapshots'][0]['id'])
- #admin case
+ # admin case
list_snapshots_with_limit_and_offset(is_admin=True)
- #non_admin case
+ # non_admin case
list_snapshots_with_limit_and_offset(is_admin=False)
def test_admin_list_snapshots_all_tenants(self):
req.method = 'POST'
req.headers["content-type"] = "application/json"
- #test for long key
+ # test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for long value
+ # test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for empty key.
+ # test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.assertEqual(len(volumes), 1)
self.assertEqual(volumes[0]['id'], 2)
- #admin case
+ # admin case
volume_detail_limit_offset(is_admin=True)
- #non_admin case
+ # non_admin case
volume_detail_limit_offset(is_admin=False)
def test_volume_show_with_admin_metadata(self):
root = etree.XML(output)
xmlutil.validate_schema(root, 'limits')
- #verify absolute limits
+ # verify absolute limits
absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
self.assertEqual(len(absolutes), 4)
for limit in absolutes:
value = limit.get('value')
self.assertEqual(value, str(fixture['limits']['absolute'][name]))
- #verify rate limits
+ # verify rate limits
rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
self.assertEqual(len(rates), 2)
for i, rate in enumerate(rates):
root = etree.XML(output)
xmlutil.validate_schema(root, 'limits')
- #verify absolute limits
+ # verify absolute limits
absolutes = root.xpath('ns:absolute/ns:limit', namespaces=NS)
self.assertEqual(len(absolutes), 0)
- #verify rate limits
+ # verify rate limits
rates = root.xpath('ns:rates/ns:rate', namespaces=NS)
self.assertEqual(len(rates), 0)
req.method = 'POST'
req.headers["content-type"] = "application/json"
- #test for long key
+ # test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for long value
+ # test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for empty key.
+ # test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.assertEqual(1, len(res['snapshots']))
self.assertEqual(2, res['snapshots'][0]['id'])
- #admin case
+ # admin case
list_snapshots_with_limit_and_offset(is_admin=True)
- #non_admin case
+ # non_admin case
list_snapshots_with_limit_and_offset(is_admin=False)
def test_admin_list_snapshots_all_tenants(self):
req.method = 'POST'
req.headers["content-type"] = "application/json"
- #test for long key
+ # test for long key
data = {"metadata": {"a" * 260: "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for long value
+ # test for long value
data = {"metadata": {"key": "v" * 260}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,
self.controller.create, req, self.req_id, data)
- #test for empty key.
+ # test for empty key.
data = {"metadata": {"": "value1"}}
req.body = jsonutils.dumps(data)
self.assertRaises(webob.exc.HTTPBadRequest,
self.configuration.volume_group_name = 'fake-vg'
super(BrickLvmTestCase, self).setUp()
- #Stub processutils.execute for static methods
+ # Stub processutils.execute for static methods
self.stubs.Set(processutils, 'execute',
self.fake_execute)
self.vg = brick.LVM(self.configuration.volume_group_name,
_images = images or []
map(lambda image: self.create(**image), _images)
- #NOTE(bcwaldon): HACK to get client.images.* to work
+ # NOTE(bcwaldon): HACK to get client.images.* to work
self.images = lambda: None
for fn in ('list', 'get', 'data', 'create', 'update', 'delete'):
setattr(self.images, fn, getattr(self, fn))
- #TODO(bcwaldon): implement filters
+ # TODO(bcwaldon): implement filters
def list(self, filters=None, marker=None, limit=30):
if marker is None:
index = 0
self._imagedata = {}
super(_FakeImageService, self).__init__()
- #TODO(bcwaldon): implement optional kwargs such as limit, sort_dir
+ # TODO(bcwaldon): implement optional kwargs such as limit, sort_dir
def detail(self, context, **kwargs):
"""Return list of detailed image information."""
return copy.deepcopy(self.images.values())
def setUp(self):
super(TestGlanceImageService, self).setUp()
- #fakes.stub_out_compute_api_snapshot(self.stubs)
client = glance_stubs.StubGlanceClient()
self.service = self._create_image_service(client)
export['backup_service'] = 'cinder.tests.backup.bad_service'
imported_record = self._create_export_record_db_entry()
- #Test the case where the additional hosts list is empty
+ # Test the case where the additional hosts list is empty
backup_hosts = []
self.assertRaises(exception.ServiceNotFound,
self.backup_mgr.import_record,
export['backup_url'],
backup_hosts)
- #Test that the import backup keeps calling other hosts to find a
- #suitable host for the backup service
+ # Test that the import backup keeps calling other hosts to find a
+ # suitable host for the backup service
backup_hosts = ['fake1', 'fake2']
BackupAPI_import = 'cinder.backup.rpcapi.BackupAPI.import_record'
with mock.patch(BackupAPI_import) as _mock_backup_import:
self.service.delete(self.backup)
self.assertTrue(mock_del_backup_snap.called)
- #self.assertFalse(self.mock_rbd.ImageNotFound.called)
self.assertTrue(self.mock_rbd.RBD.return_value.list.called)
self.assertTrue(self.mock_rbd.RBD.return_value.remove.called)
mock_init):
context = {}
volume = {'id': self.VOLUME.get(u'name')}
- #self.driver.ensure_export(context, volume)
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.ensure_export,
context,
self.configuration.storage_vnx_pool_name = 'unit_test_pool'
self.configuration.san_login = 'sysadmin'
self.configuration.san_password = 'sysadmin'
- #set the timeout to 0.012s = 0.0002 * 60 = 1.2ms
+ # set the timeout to 0.0002 * 60 = 0.012s (12ms)
self.configuration.default_timeout = 0.0002
self.configuration.initiator_auto_registration = True
self.configuration.check_max_pool_luns_threshold = False
'-Deduplication',
'-ThinProvisioning',
'-FAST']
- #case
+ # case
self.driver.create_volume(self.testData.test_volume_with_type)
- #verification
+ # verification
expect_cmd = [
mock.call(*self.testData.LUN_CREATION_CMD(
'vol_with_type', 1,
'-Deduplication',
'-ThinProvisioning',
'-FAST']
- #case
+ # case
self.driver.create_volume(self.testData.test_volume_with_type)
- #verification
+ # verification
expect_cmd = [
mock.call(*self.testData.LUN_CREATION_CMD(
'vol_with_type', 1,
'-Deduplication',
'-ThinProvisioning',
'-FAST']
- #case
+ # case
self.driver.create_volume(self.testData.test_volume_with_type)
- #verification
+ # verification
expect_cmd = [
mock.call(*self.testData.LUN_CREATION_CMD(
'vol_with_type', 1,
'-Deduplication',
'-ThinProvisioning',
'-FAST']
- #case
+ # case
self.driver.create_volume(self.testData.test_volume_with_type)
- #verification
+ # verification
expect_cmd = [
mock.call(*self.testData.LUN_CREATION_CMD(
'vol_with_type', 1,
ret = self.driver.migrate_volume(None, self.testData.test_volume,
fakehost)[0]
self.assertTrue(ret)
- #verification
+ # verification
expect_cmd = [mock.call(*self.testData.MIGRATION_CMD(1, 1),
retry_disable=True,
poll=True),
ret = self.driver.migrate_volume(None, self.testData.test_volume,
fake_host)[0]
self.assertTrue(ret)
- #verification
+ # verification
expect_cmd = [mock.call(*self.testData.MIGRATION_CMD(),
retry_disable=True,
poll=True),
ret = self.driver.migrate_volume(None, self.testData.test_volume5,
fakehost)[0]
self.assertTrue(ret)
- #verification
+ # verification
expect_cmd = [mock.call(*self.testData.MIGRATION_CMD(5, 5),
retry_disable=True,
poll=True),
ret = self.driver.migrate_volume(None, self.testData.test_volume,
fakehost)[0]
self.assertFalse(ret)
- #verification
+ # verification
expect_cmd = [mock.call(*self.testData.MIGRATION_CMD(),
retry_disable=True,
poll=True)]
def test_create_destroy_volume_snapshot(self):
fake_cli = self.driverSetup()
- #case
+ # case
self.driver.create_snapshot(self.testData.test_snapshot)
self.driver.delete_snapshot(self.testData.test_snapshot)
- #verification
+ # verification
expect_cmd = [mock.call(*self.testData.SNAP_CREATE_CMD('snapshot1'),
poll=False),
mock.call(*self.testData.SNAP_DELETE_CMD('snapshot1'),
results = [FAKE_ERROR_RETURN]
fake_cli = self.driverSetup(commands, results)
- #case
+ # case
self.assertRaises(EMCVnxCLICmdError,
self.driver.create_snapshot,
self.testData.test_failed_snapshot)
- #verification
+ # verification
expect_cmd = [
mock.call(
*self.testData.SNAP_CREATE_CMD('failed_snapshot'),
fake_cli.assert_has_calls(expect_cmd)
def test_create_volume_from_snapshot(self):
- #set up
+ # set up
cmd_dest = self.testData.LUN_PROPERTY_ALL_CMD("vol2_dest")
cmd_dest_np = self.testData.LUN_PROPERTY_ALL_CMD("vol2_dest")
output_dest = self.testData.LUN_PROPERTY("vol2_dest")
self.testData.test_pool_name
self.driver = EMCCLIISCSIDriver(configuration=self.configuration)
assert isinstance(self.driver.cli, emc_vnx_cli.EMCVnxCliPool)
- #mock the command executor
+ # mock the command executor
fake_command_execute = self.get_command_execute_simulator(
commands, results)
fake_cli = mock.MagicMock(side_effect=fake_command_execute)
self.configuration.storage_vnx_pool_name = invalid_pool_name
self.driver = EMCCLIISCSIDriver(configuration=self.configuration)
assert isinstance(self.driver.cli, emc_vnx_cli.EMCVnxCliPool)
- #mock the command executor
+ # mock the command executor
fake_command_execute = self.get_command_execute_simulator(
commands, results)
fake_cli = mock.MagicMock(side_effect=fake_command_execute)
self.driver = EMCCLIISCSIDriver(configuration=self.configuration)
assert isinstance(self.driver.cli, emc_vnx_cli.EMCVnxCliPool)
- #mock the command executor
+ # mock the command executor
fake_command_execute = self.get_command_execute_simulator(
commands, results)
fake_cli = mock.MagicMock(side_effect=fake_command_execute)
expected = [mock.call(*get_lun_cmd, poll=True)]
assert get_size == test_size
fake_cli.assert_has_calls(expected)
- #Test the function with invalid reference.
+ # Test the function with invalid reference.
invaild_ref = {'fake': 'fake_ref'}
self.assertRaises(exception.ManageExistingInvalidReference,
self.driver.manage_existing_get_size,
mock_create_client.return_value = mock_client
common = self.driver._login()
- #Setup a single ISCSI IP
+ # Set up a single ISCSI IP
iscsi_ips = ["10.10.220.253"]
self.driver.configuration.hp3par_iscsi_ips = iscsi_ips
mock_create_client.return_value = mock_client
common = self.driver._login()
- #Setup two ISCSI IPs
+ # Set up two ISCSI IPs
iscsi_ips = ["10.10.220.252", "10.10.220.253"]
self.driver.configuration.hp3par_iscsi_ips = iscsi_ips
mock_create_client.return_value = mock_client
common = self.driver._login()
- #Setup two ISCSI IPs
+ # Set up two ISCSI IPs
iscsi_ips = ["10.10.220.252", "10.10.220.253"]
self.driver.configuration.hp3par_iscsi_ips = iscsi_ips
mock_client.getPorts.return_value = PORTS1_RET
mock_client.getVLUNs.return_value = VLUNS5_RET
- #Setup two ISCSI IPs
+ # Set up two ISCSI IPs
iscsi_ips = ["10.10.220.252", "10.10.220.253"]
self.driver.configuration.hp3par_iscsi_ips = iscsi_ips
import cinder.volume.drivers.openvstorage as ovsvd
-#MOCKUPS
+# MOCKUPS
MOCK_hostname = 'test-hostname'
MOCK_mountpoint = '/mnt/test'
MOCK_vdisk_guid = '0000'
INITIATOR = 'iqn.2013-08.org.debian:01:aaaaaaaa'
DATA_IN_VOLUME = {'id': VOLUMEUUID}
DATA_IN_CONNECTOR = {'initiator': INITIATOR}
-## dpl.getpool
+# dpl.getpool
DATA_SERVER_INFO = 0, {
'metadata': {'vendor': 'ProphetStor',
'version': '1.5'}}
'objectType': 'application/cdmi-container',
'percentComplete': 100}
-## dpl.assignvdev
+# dpl.assignvdev
DATA_ASSIGNVDEV = 0, {
'children': [],
'childrenrange': '',
# Make sure that the volumes have been created
self._assert_vol_exists(volume['name'], True)
- #Set up one WWPN that won't match and one that will.
+ # Set up one WWPN that won't match and one that will.
self.driver._state['storage_nodes']['1']['WWPN'] = ['123456789ABCDEF0',
'AABBCCDDEEFF0010']
# Make sure that the volumes have been created
self._assert_vol_exists(volume['name'], True)
- #Set up WWPNs that will not match what is available.
+ # Set up WWPNs that will not match what is available.
self.driver._state['storage_nodes']['1']['WWPN'] = ['123456789ABCDEF0',
'123456789ABCDEF1']
# Make sure that the volumes have been created
self._assert_vol_exists(volume['name'], True)
- #Set up one WWPN.
+ # Set up one WWPN.
self.driver._state['storage_nodes']['1']['WWPN'] = ['AABBCCDDEEFF0012']
wwpns = ['ff00000000000000', 'ff00000000000001']
self.assertEqual(len(ts), 0, 'Unexpected transfers listed.')
def test_delete_transfer_with_deleted_volume(self):
- #create a volume
+ # create a volume
volume = utils.create_volume(self.ctxt, id='1',
updated_at=self.updated_at)
- #create a transfer
+ # create a transfer
tx_api = transfer_api.API()
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
self.assertEqual(t['id'], transfer['id'], 'Unexpected transfer id')
- #force delete volume
+ # force delete volume
db.volume_destroy(context.get_admin_context(), volume['id'])
- #Make sure transfer has been deleted.
+ # Make sure transfer has been deleted.
self.assertRaises(exception.TransferNotFound,
tx_api.get,
self.ctxt,
self.assertDictMatch(expected, res)
def test_volume_types_diff(self):
- #type_ref 1 and 2 have the same extra_specs, while 3 has different
+ # type_ref 1 and 2 have the same extra_specs; 3 has different ones
keyvals1 = {"key1": "val1", "key2": "val2"}
keyvals2 = {"key1": "val0", "key2": "val2"}
type_ref1 = volume_types.create(self.ctxt, "type1", keyvals1)
self.assertEqual(same, False)
self.assertEqual(diff['extra_specs']['key1'], ('val1', 'val0'))
- #qos_ref 1 and 2 have the same specs, while 3 has different
+ # qos_ref 1 and 2 have the same specs; 3 has different ones
qos_keyvals1 = {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}
qos_keyvals2 = {'k1': 'v0', 'k2': 'v2', 'k3': 'v3'}
qos_ref1 = qos_specs.create(self.ctxt, 'qos-specs-1', qos_keyvals1)
raise exception.Invalid()
#################################
-## UNIT TESTS ##
+# UNIT TESTS #
#################################
def test_do_setup(self, mock_req):
self.setup_driver()
if vol_name == vol:
attachments = params['attachments']
if srv in attachments:
- #already attached - ok
+ # already attached - ok
return RUNTIME_VARS['good']
else:
attachments.append(srv)
if params['cg-name'] == cg_name:
snapshots = params['snapshots']
if snap_name in snapshots:
- #already attached
+ # snapshot already exists
return RUNTIME_VARS['bad_volume']
else:
snapshots.append(snap_name)
nfs_info = self._get_nfs_info()
for share in self.shares:
- #export = share.split(':')[1]
if share in nfs_info.keys():
LOG.info(_LI("share: %(share)s -> %(info)s"),
{'share': share, 'info': nfs_info[share]['path']})
'Huawei OceanStor %(product)s series storage arrays.')
% {'protocol': protocol,
'product': product})
- #Map HVS to 18000
+ # Map HVS to 18000
if product in MAPPING:
LOG.warn(_LW("Product name %s is deprecated, update your "
"configuration to the new product name."), product)
LOG = logging.getLogger(__name__)
nas_opts = [
- #TODO(eharney): deprecate nas_ip and change this to nas_host
+ # TODO(eharney): deprecate nas_ip and change this to nas_host
cfg.StrOpt('nas_ip',
default='',
help='IP address or Hostname of NAS system.'),
qos_specs_id = volume_type.get('qos_specs_id')
specs = volume_type.get('extra_specs')
- #NOTE(kmartin): We prefer the qos_specs association
+ # NOTE(kmartin): We prefer the qos_specs association
# and override any existing extra-specs settings
# if present.
if qos_specs_id is not None:
def check_for_setup_error(self):
"""Return error if prerequisites aren't met."""
try:
- #NOTE(francois-charlier) Since 0.24 'collie cluster info -r'
- # gives short output, but for compatibility reason we won't
- # use it and just check if 'running' is in the output.
+ # NOTE(francois-charlier) Since 0.24 'collie cluster info -r'
+ # gives short output, but for compatibility reasons we won't
+ # use it and just check if 'running' is in the output.
(out, _err) = self._execute('collie', 'cluster', 'info')
if 'status: running' not in out:
exception_message = (_("Sheepdog is not working: %s") % out)
# exist, this is expected as it signals that the image_id is missing.
image_meta = self.image_service.show(context, image_id)
- #check whether image is active
+ # check whether image is active
if image_meta['status'] != 'active':
msg = _('Image %(image_id)s is not active.')\
% {'image_id': image_id}
except exception.VolumeTypeNotFoundByName as e:
# Couldn't find volume type with the name in default_volume_type
# flag, record this issue and move on
- #TODO(zhiteng) consider add notification to warn admin
+ # TODO(zhiteng) consider add notification to warn admin
LOG.exception(_LE('Default volume type is not found,'
'please check default_volume_type config: %s') %
six.text_type(e))
# E251 unexpected spaces around keyword / parameter equals
# reason: no improvement in readability
#
-# E265 block comment should start with '# '
-# reason: no improvement in readability
-#
# H402 one line docstring needs punctuation
# reason: removed in hacking (https://review.openstack.org/#/c/101497/)
#
# H302,H405
-ignore = E251,E265,H302,H402,H405,H803,H904
+ignore = E251,H302,H402,H405,H803,H904
exclude = .git,.venv,.tox,dist,tools,doc,common,*egg,build
max-complexity=30