class Volume_actions(extensions.ExtensionDescriptor):
- """Enable volume actions
- """
+ """Enable volume actions."""
name = "VolumeActions"
alias = "os-volume-actions"
return resources.get(resource_id)
def cache_db_items(self, key, items, item_key='id'):
- """Allow API methods to store objects from a DB query to be
+ """Get cached database items.
+
+ Allow API methods to store objects from a DB query to be
used by API extensions within the same API request.
An instance of this class only lives for the lifetime of a
self.cache_resource(items, item_key, key)
def get_db_items(self, key):
- """Allow an API extension to get previously stored objects within
+ """Get database items.
+
+ Allow an API extension to get previously stored objects within
the same API request.
Note that the object data will be slightly stale.
return self.cached_resource(key)
def get_db_item(self, key, item_key):
- """Allow an API extension to get a previously stored object
+ """Get database item.
+
+ Allow an API extension to get a previously stored object
within the same API request.
Note that the object data will be slightly stale.
"""Determine content type of the request body.
Does not do any body introspection, only checks header
-
"""
if "Content-Type" not in self.headers:
return None
:param listnames: list of XML node names whose subnodes should
be considered list items.
-
"""
if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
return node.childNodes[0].nodeValue
Exceptions derived from webob.exc.HTTPException will be automatically
wrapped in Fault() to provide API friendly error responses.
-
"""
def __init__(self, controller, action_peek=None, **deserializers):
def unquote_header_value(value):
"""Unquotes a header value.
+
This does not use the real unquoting but what browsers are actually
using for quoting.
def parse_options_header(value):
- """Parse a ``Content-Type`` like header into a tuple with the content
+ """Parse 'Content-Type'-like header into a tuple.
+
+ Parse a ``Content-Type`` like header into a tuple with the content
type and the options:
>>> parse_options_header('Content-Type: text/html; mimetype=text/html')
"""
def __init__(self, application, limits=None, limiter=None, **kwargs):
- """Initialize new `RateLimitingMiddleware`, which wraps the given WSGI
- application and sets up the given limits.
+ """Initialize class, wrap WSGI app, and set up given limits.
- @param application: WSGI application to wrap
- @param limits: String describing limits
- @param limiter: String identifying class for representing limits
+ :param application: WSGI application to wrap
+ :param limits: String describing limits
+ :param limiter: String identifying class for representing limits
Other parameters are passed to the constructor for the limiter.
"""
@abc.abstractmethod
def update_container_name(self, backup, container):
- """This method exists so that sub-classes can override the container name
- as it comes in to the driver in the backup object. Implementations
- should return None if no change to the container name is desired.
+ """Allow sub-classes to override container name.
+
+ This method exists so that sub-classes can override the container name
+ as it comes in to the driver in the backup object. Implementations
+ should return None if no change to the container name is desired.
"""
return
@abc.abstractmethod
def get_extra_metadata(self, backup, volume):
- """This method allows for collection of extra metadata in prepare_backup()
- which will be passed to get_object_reader() and get_object_writer().
- Subclass extensions can use this extra information to optimize
- data transfers. Return a json serializable object.
+ """Return extra metadata to use in prepare_backup.
+
+ This method allows for collection of extra metadata in prepare_backup()
+ which will be passed to get_object_reader() and get_object_writer().
+ Subclass extensions can use this extra information to optimize
+ data transfers. Return a json serializable object.
"""
return
return swift_object_names
def get_object_writer(self, container, object_name, extra_metadata=None):
- """Returns a writer object that stores a chunk of volume data in a
- Swift object store.
+ """Return a writer object.
+
+ Returns a writer object that stores a chunk of volume data in a
+ Swift object store.
"""
return self.SwiftObjectWriter(container, object_name, self.conn)
def get_object_reader(self, container, object_name, extra_metadata=None):
- """Returns a reader object that retrieves a chunk of backed-up volume data
- from a Swift object store.
+ """Return reader object.
+
+ Returns a reader object that retrieves a chunk of backed-up volume data
+ from a Swift object store.
"""
return self.SwiftObjectReader(container, object_name, self.conn)
backup.save()
def init_host(self):
- """Do any initialization that needs to be run if this is a
- standalone service.
- """
+ """Run initialization needed for a standalone service."""
ctxt = context.get_admin_context()
for mgr in self.volume_managers.values():
def param2id(object_id):
"""Helper function to convert various id types to internal id.
- args: [object_id], e.g. 'vol-0000000a' or 'volume-0000000a' or '10'
+
+ :param object_id: e.g. 'vol-0000000a' or 'volume-0000000a' or '10'
"""
if uuidutils.is_uuid_like(object_id):
return object_id
@args('--path', required=True, help='Script path')
def script(self, path):
- """Runs the script from the specified path with flags set properly.
- arguments: path
- """
+ """Runs the script from the specified path with flags set properly."""
exec(compile(open(path).read(), path, 'exec'), locals(), globals())
@args('zone', nargs='?', default=None,
help='Availability Zone (default: %(default)s)')
def list(self, zone=None):
- """Show a list of all physical hosts. Filter by zone.
+ """Show a list of all physical hosts.
+
+ Can be filtered by zone.
args: [zone]
"""
print(_("%(host)-25s\t%(zone)-15s") % {'host': 'host', 'zone': 'zone'})
@args('volume_id',
help='Volume ID to be deleted')
def delete(self, volume_id):
- """Delete a volume, bypassing the check that it
- must be available.
- """
+ """Delete a volume, bypassing the check that it must be available."""
ctxt = context.get_admin_context()
volume = db.volume_get(ctxt, param2id(volume_id))
host = vutils.extract_host(volume['host']) if volume['host'] else None
"""Methods for managing backups."""
def list(self):
- """List all backups (including ones in progress) and the host
+ """List all backups.
+
+ List all backups (including ones in progress) and the host
on which the backup operation is running.
"""
ctxt = context.get_admin_context()
def methods_of(obj):
- """Get all callable methods of an object that don't start with underscore
- returns a list of tuples of the form (method_name, method)
+ """Return non-private methods from an object.
+
+ Get all callable methods of an object that don't start with underscore
+ :return: a list of tuples of the form (method_name, method)
"""
result = []
for i in dir(obj):
def volume_type_extra_specs_update_or_create(context,
volume_type_id,
extra_specs):
- """Create or update volume type extra specs. This adds or modifies the
- key/value pairs specified in the extra specs dict argument
+ """Create or update volume type extra specs.
+
+ This adds or modifies the key/value pairs specified in the extra specs dict
+ argument.
"""
return IMPL.volume_type_extra_specs_update_or_create(context,
volume_type_id,
def volume_glance_metadata_copy_from_volume_to_volume(context,
src_volume_id,
volume_id):
- """Update the Glance metadata for a volume by copying all of the key:value
+ """Update the Glance metadata for a volume.
+
+ Update the Glance metadata for a volume by copying all of the key:value
pairs from the originating volume.
This is so that a volume created from the volume (clone) will retain the
@require_context
@require_volume_exists
def volume_glance_metadata_copy_to_volume(context, volume_id, snapshot_id):
- """Update the Glance metadata from a volume (created from a snapshot) by
+ """Update Glance metadata from a volume.
+
+ Update the Glance metadata from a volume (created from a snapshot) by
copying all of the key:value pairs from the originating snapshot.
This is so that the Glance metadata from the original volume is retained.
def downgrade(migrate_engine):
- """Don't delete the 'default' entries at downgrade time.
+ """Downgrade.
+
+ Don't delete the 'default' entries at downgrade time.
We don't know if the user had default entries when we started.
If they did, we wouldn't want to remove them. So, the safest
thing to do is just leave the 'default' entries at downgrade time.
return base_image_meta
def get_location(self, context, image_id):
- """Returns a tuple of the direct url and locations representing the
- backend storage location, or (None, None) if these attributes are not
- shown by Glance.
+ """Get backend storage location url.
+
+ Returns a tuple containing the direct url and locations representing
+ the backend storage location, or (None, None) if these attributes are
+ not shown by Glance.
"""
if CONF.glance_api_version == 1:
# image location not available in v1
class NotImplementedKeyManager(key_mgr.KeyManager):
- """Key Manager Interface that raises NotImplementedError for all operations
- """
+ """Key Manager interface that raises NotImplementedError."""
def create_key(self, ctxt, algorithm='AES', length=256, expiration=None,
**kwargs):
class CinderPersistentObject(object):
"""Mixin class for Persistent objects.
+
This adds the fields that we use in common for all persistent objects.
"""
fields = {
def get_project_quotas(self, context, resources, project_id,
quota_class=None, defaults=True,
usages=True, parent_project_id=None):
- """Given a list of resources, retrieve the quotas for the given
+ """Retrieve quotas for a project.
+
+ Given a list of resources, retrieve the quotas for the given
project.
:param context: The request context, for access checks.
# License for the specific language governing permissions and limitations
# under the License.
-"""
-The FilterScheduler is for creating volumes.
+"""The FilterScheduler is for creating volumes.
+
You can customize this scheduler by specifying your own volume Filters and
Weighing Functions.
"""
self.max_attempts = self._max_attempts()
def schedule(self, context, topic, method, *args, **kwargs):
- """The schedule() contract requires we return the one
- best-suited host for this request.
- """
+ """Schedule contract that returns best-suited host for this request."""
self._schedule(context, topic, *args, **kwargs)
def _get_configuration_options(self):
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
- """Stuff things into filter_properties. Can be overridden in a
- subclass to add more data.
+ """Stuff things into filter_properties.
+
+ Can be overridden in a subclass to add more data.
"""
vol = request_spec['volume_properties']
filter_properties['size'] = vol['size']
def _post_select_populate_filter_properties(self, filter_properties,
host_state):
- """Add additional information to the filter properties after a host has
+ """Populate filter properties with additional information.
+
+ Add additional information to the filter properties after a host has
been selected by the scheduling process.
"""
# Add a retry entry for the selected volume backend:
self._add_retry_host(filter_properties, host_state.host)
def _add_retry_host(self, filter_properties, host):
- """Add a retry entry for the selected volume backend. In the event that
- the request gets re-scheduled, this entry will signal that the given
- backend has already been tried.
+ """Add a retry entry for the selected volume backend.
+
+ In the event that the request gets re-scheduled, this entry will signal
+ that the given backend has already been tried.
"""
retry = filter_properties.get('retry', None)
if not retry:
return max_attempts
def _log_volume_error(self, volume_id, retry):
- """If the request contained an exception from a previous volume
- create operation, log it to aid debugging
- """
+ """Log requests with exceptions from previous volume operations."""
exc = retry.pop('exc', None) # string-ified exception from volume
if not exc:
return # no exception info from a previous attempt, skip
'exc': exc})
def _populate_retry(self, filter_properties, properties):
- """Populate filter properties with history of retries for this
- request. If maximum retries is exceeded, raise NoValidHost.
+ """Populate filter properties with history of retries for request.
+
+ If maximum retries is exceeded, raise NoValidHost.
"""
max_attempts = self.max_attempts
retry = filter_properties.pop('retry', {})
def _get_weighted_candidates(self, context, request_spec,
filter_properties=None):
- """Returns a list of hosts that meet the required specs,
- ordered by their fitness.
+ """Return a list of hosts that meet required specs.
+
+ Returned list is ordered by their fitness.
"""
elevated = context.elevated()
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Weighers that weigh hosts by volume number in backends:
+"""Weighers that weigh hosts by volume number in backends:
1. Volume Number Weigher. Weigh hosts by their volume number.
def _weigh_object(self, host_state, weight_properties):
"""Less volume number weights win.
+
We want spreading to be the default.
"""
context = weight_properties['context']
class MetadataXMLDeserializerTest(test.TestCase):
def test_xml_meta_parsing_special_character(self):
- """Test that when a SaxParser splits a string containing special
+ """Test XML meta parsing with special characters.
+
+ Test that when a SaxParser splits a string containing special
characters into multiple childNodes there are no issues extracting
the text.
"""
def wire_HTTPConnection_to_WSGI(host, app):
- """Monkeypatches HTTPConnection so that if you try to connect to host, you
+ """Monkeypatches HTTPConnection.
+
+ Monkeypatches HTTPConnection so that if you try to connect to host, you
are instead routed straight to the given WSGI app.
After calling this method, when any code calls
can restore the default HTTPConnection interface (for all hosts).
"""
class HTTPConnectionDecorator(object):
- """Wraps the real HTTPConnection class so that when you instantiate
+ """Decorator to mock the HTTPConnection class.
+
+ Wraps the real HTTPConnection class so that when you instantiate
the class you might instead get a fake instance.
"""
return jsonutils.dumps({"verb": verb, "path": path})
def _request(self, verb, url, username=None):
- """Make sure that POSTing to the given url causes the given username
+ """POST request to given url by given username.
+
+ Make sure that POSTing to the given url causes the given username
to perform the given action. Make the internal rate limiter return
delay and make sure that the WSGI app returns the correct response.
"""
def wire_HTTPConnection_to_WSGI(host, app):
- """Monkeypatches HTTPConnection so that if you try to connect to host, you
+ """Monkeypatches HTTPConnection.
+
+ Monkeypatches HTTPConnection so that if you try to connect to host, you
are instead routed straight to the given WSGI app.
After calling this method, when any code calls
can restore the default HTTPConnection interface (for all hosts).
"""
class HTTPConnectionDecorator(object):
- """Wraps the real HTTPConnection class so that when you instantiate
+ """Decorator to mock the HTTPConnection class.
+
+ Wraps the real HTTPConnection class so that when you instantiate
the class you might instead get a fake instance.
"""
class SchedulerDriverBaseTestCase(SchedulerTestCase):
- """Test cases for base scheduler driver class methods
- that can't will fail if the driver is changed.
+ """Test scheduler driver class.
+
+ Test cases for base scheduler driver class methods
+ that will fail if the driver is changed.
"""
def test_unimplemented_schedule(self):
class TestIserAdmDriver(tf.TargetDriverFixture):
- """Unit tests for the deprecated ISERTgtAdm flow
- """
+ """Unit tests for the deprecated ISERTgtAdm flow"""
def setUp(self):
super(TestIserAdmDriver, self).setUp()
class TestIserTgtDriver(tf.TargetDriverFixture):
- """Unit tests for the iSER TGT flow
- """
+ """Unit tests for the iSER TGT flow"""
def setUp(self):
super(TestIserTgtDriver, self).setUp()
class TestIserLioAdmDriver(tf.TargetDriverFixture):
- """Unit tests for the iSER LIO flow
- """
+ """Unit tests for the iSER LIO flow"""
def setUp(self):
super(TestIserLioAdmDriver, self).setUp()
self.configuration.iscsi_protocol = 'iser'
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Tests for Backup code.
-
-"""
+"""Tests for Backup code."""
import ddt
import tempfile
@mock.patch.object(lvm.LVMVolumeDriver, 'delete_snapshot')
@mock.patch.object(lvm.LVMVolumeDriver, 'delete_volume')
def test_init_host(self, mock_delete_volume, mock_delete_snapshot):
- """Make sure stuck volumes and backups are reset to correct
+ """Test stuck volumes and backups.
+
+ Make sure stuck volumes and backups are reset to correct
states when backup_manager.init_host() is called
"""
vol1_id = self._create_volume_db_entry()
self.assertEqual(2, notify.call_count)
def test_restore_backup_with_bad_volume_status(self):
- """Test error handling when restoring a backup to a volume
+ """Test error handling.
+
+ Test error handling when restoring a backup to a volume
with a bad status.
"""
vol_id = self._create_volume_db_entry(status='available', size=1)
self.assertEqual(backup['status'], 'available')
def test_restore_backup_with_bad_backup_status(self):
- """Test error handling when restoring a backup with a backup
+ """Test error handling.
+
+ Test error handling when restoring a backup with a backup
with a bad status.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
self.assertTrue(_mock_volume_restore.called)
def test_restore_backup_with_bad_service(self):
- """Test error handling when attempting a restore of a backup
+ """Test error handling.
+
+ Test error handling when attempting a restore of a backup
with a different service to that used to create the backup.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
self.assertEqual(2, notify.call_count)
def test_delete_backup_with_bad_backup_status(self):
- """Test error handling when deleting a backup with a backup
+ """Test error handling.
+
+ Test error handling when deleting a backup with a backup
with a bad status.
"""
vol_id = self._create_volume_db_entry(size=1)
self.assertEqual(backup['status'], 'error')
def test_delete_backup_with_bad_service(self):
- """Test error handling when attempting a delete of a backup
+ """Test error handling.
+
+ Test error handling when attempting a delete of a backup
with a different service to that used to create the backup.
"""
vol_id = self._create_volume_db_entry(size=1)
self.assertEqual(backup['status'], 'error')
def test_delete_backup_with_no_service(self):
- """Test error handling when attempting a delete of a backup
+ """Test error handling.
+
+ Test error handling when attempting a delete of a backup
with no service defined for that backup, relates to bug #1162908
"""
vol_id = self._create_volume_db_entry(size=1)
self.assertEqual(backups[0].id, b2.id)
def test_backup_get_all_by_project_with_deleted(self):
- """Test deleted backups don't show up in backup_get_all_by_project.
- Unless context.read_deleted is 'yes'.
+ """Test deleted backups.
+
+ Test deleted backups don't show up in backup_get_all_by_project.
+ Unless context.read_deleted is 'yes'.
"""
backups = db.backup_get_all_by_project(self.ctxt, 'fake')
self.assertEqual(len(backups), 0)
self.assertEqual(len(backups), 2)
def test_backup_get_all_by_host_with_deleted(self):
- """Test deleted backups don't show up in backup_get_all_by_project.
- Unless context.read_deleted is 'yes'
+ """Test deleted backups.
+
+ Test deleted backups don't show up in backup_get_all_by_project.
+ Unless context.read_deleted is 'yes'
"""
backups = db.backup_get_all_by_host(self.ctxt, 'testhost')
self.assertEqual(len(backups), 0)
backup_mgr.driver_name)
def test_export_record_with_bad_service(self):
- """Test error handling when attempting an export of a backup
+ """Test error handling.
+
+ Test error handling when attempting an export of a backup
record with a different service to that used to create the backup.
"""
vol_id = self._create_volume_db_entry(size=1)
backup)
def test_export_record_with_bad_backup_status(self):
- """Test error handling when exporting a backup record with a backup
+ """Test error handling.
+
+ Test error handling when exporting a backup record with a backup
with a bad status.
"""
vol_id = self._create_volume_db_entry(status='available',
self.assertEqual(backup['size'], vol_size)
def test_import_record_with_bad_service(self):
- """Test error handling when attempting an import of a backup
+ """Test error handling.
+
+ Test error handling when attempting an import of a backup
record with a different service to that used to create the backup.
"""
export = self._create_exported_record_entry()
backup_hosts_expect)
def test_import_record_with_invalid_backup(self):
- """Test error handling when attempting an import of a backup
+ """Test error handling.
+
+ Test error handling when attempting an import of a backup
record where the backup driver returns an exception.
"""
export = self._create_exported_record_entry()
self.assertEqual(backup['size'], vol_size)
def test_import_record_with_verify_invalid_backup(self):
- """Test error handling when attempting an import of a backup
+ """Test error handling.
+
+ Test error handling when attempting an import of a backup
record where the backup driver returns an exception.
"""
vol_size = 1
'get_volume_type_extra_specs',
return_value={'volume_backend_name': 'FCFAST'})
def test_delete_volume_fast_notfound(self, _mock_volume_type):
- """We do not set the provider location.
- """
+ """Test delete volume with volume not found."""
notfound_delete_vol = {}
notfound_delete_vol['name'] = 'notfound_delete_vol'
notfound_delete_vol['id'] = '10'
@mock.patch('random.randint',
mock.Mock(return_value=0))
def test_initialize_connection_exist(self):
- """A LUN is added to the SG right before the attach,
+ """Test initialize connection when the LUN already exists in the SG.
+
+ A LUN is added to the SG right before the attach,
it may not exists in the first SG query
"""
# Test for auto registration
@mock.patch('random.randint',
mock.Mock(return_value=0))
def test_initialize_connection_no_hlu_left_1(self):
- """There is no hlu per the first SG query
+ """Test initialize connection with no hlu per first SG query.
+
+ There is no hlu per the first SG query
But there are hlu left after the full poll
"""
# Test for auto registration
@mock.patch('random.randint',
mock.Mock(return_value=0))
def test_initialize_connection_no_hlu_left_2(self):
- """There is no usable hlu for the SG
- """
+ """Test initialize connection with no hlu left."""
# Test for auto registration
self.configuration.initiator_auto_registration = True
self.configuration.max_luns_per_storage_group = 2
mock.Mock(return_value={'storagetype:provisioning': 'deduplicated',
'storagetype:pool': 'unit_test_pool'}))
def test_retype_pool_changed_dedup_to_compressed_auto(self):
- """Unit test for retype dedup to compressed and auto tiering
+ """Test retype from dedup to compressed and auto tiering.
+
+ Unit test for retype dedup to compressed and auto tiering
and pool changed
"""
diff_data = {'encryption': {}, 'qos_specs': {},
"get_volume_type_extra_specs",
mock.Mock(return_value={'fast_cache_enabled': 'True'}))
def test_create_volume_with_fastcache(self):
- """Enable fastcache when creating volume."""
+ """Test creating volume with fastcache enabled."""
commands = [self.testData.NDU_LIST_CMD,
self.testData.POOL_PROPERTY_W_FASTCACHE_CMD,
self.testData.LUN_PROPERTY_ALL_CMD('vol_with_type'),
mock_mount.assert_called_once_with(self.TEST_EXPORT1, [])
def test_mount_glusterfs_should_reraise_exception_on_failure(self):
- """_mount_glusterfs should reraise exception if mount fails.
- """
+ """_mount_glusterfs should reraise exception if mount fails."""
drv = self._driver
with mock.patch.object(os_brick.remotefs.remotefs.RemoteFsClient,
self.assertTrue(has_volume)
def test_create_volume_from_snapshot_error_on_non_existing_snapshot(self):
- """Test create_volume_from_snapshot is error on non existing snapshot.
+ """Test create_volume_from_snapshot.
+
+ Test create_volume_from_snapshot is error on non existing snapshot.
"""
volume2 = fake_volume.fake_db_volume(**self._VOLUME2)
snapshot = fake_snapshot.fake_db_snapshot(**self._TEST_SNAPSHOT)
# under the License.
#
-"""
-Tests for the IBM FlashSystem volume driver.
-"""
+"""Tests for the IBM FlashSystem volume driver."""
import mock
from oslo_concurrency import processutils
return six.text_type(num)
def _cmd_lshost(self, **kwargs):
- """svcinfo lshost -delim !
+ """lshost command.
+
+ svcinfo lshost -delim !
svcinfo lshost -delim ! <host>
"""
if 'obj' not in kwargs:
return ('%s' % '\n'.join(objrows), '')
def _cmd_lsnode(self, **kwargs):
- """svcinfo lsnode -delim !
+ """lsnode command.
+
+ svcinfo lsnode -delim !
svcinfo lsnode -delim ! <node>
"""
return ('', '')
def _cmd_mkvdisk(self, **kwargs):
- """svctask mkvdisk -name <name> -mdiskgrp <mdiskgrp> -iogrp <iogrp>
+ """mkvdisk command.
+
+ svctask mkvdisk -name <name> -mdiskgrp <mdiskgrp> -iogrp <iogrp>
-size <size> -unit <unit>
"""
return ('', '')
def _cmd_mkhost(self, **kwargs):
- """svctask mkhost -force -hbawwpn <wwpn> -name <host_name>
+ """mkhost command.
+
+ svctask mkhost -force -hbawwpn <wwpn> -name <host_name>
svctask mkhost -force -iscsiname <initiator> -name <host_name>
"""
return (out, err)
def _cmd_addhostport(self, **kwargs):
- """svctask addhostport -force -hbawwpn <wwpn> <host>
+ """addhostport command.
+
+ svctask addhostport -force -hbawwpn <wwpn> <host>
svctask addhostport -force -iscsiname <initiator> <host>
"""
"Consistency Group created failed")
def test_create_consistencygroup_fail_on_cg_not_empty(self):
- """Test that create_consistencygroup fail
- when consistency group is not empty.
- """
+ """Test create_consistencygroup failing when the group is not empty."""
self.driver.do_setup(None)
'Consistency Group deleted failed')
def test_delete_consistencygroup_fail_on_volume_not_delete(self):
- """Test that delete_consistencygroup return fail
- when the volume can not be deleted.
- """
+ """Test delete_consistencygroup with volume delete failure."""
self.driver.do_setup(None)
self.driver.delete_consistencygroup(ctxt, CONSISTGROUP)
def test_delete_cgsnapshot_fail_on_snapshot_not_delete(self):
- """Test that delete_cgsnapshot return fail
- when the snapshot can not be deleted.
- """
+ """Test delete_cgsnapshot when the snapshot cannot be deleted."""
self.driver.do_setup(None)
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Tests for NetApp volume driver
-
-"""
+"""Tests for NetApp volume driver."""
from lxml import etree
import mock
class NetAppDirect7modeISCSIDriverTestCase_NV(
NetAppDirectCmodeISCSIDriverTestCase):
- """Test case for NetAppISCSIDriver
- No vfiler
- """
+ """Test case for NetAppISCSIDriver without vfiler"""
def setUp(self):
super(NetAppDirect7modeISCSIDriverTestCase_NV, self).setUp()
class NetAppDirect7modeISCSIDriverTestCase_WV(
NetAppDirect7modeISCSIDriverTestCase_NV):
- """Test case for NetAppISCSIDriver
- With vfiler
- """
+ """Test case for NetAppISCSIDriver with vfiler"""
def setUp(self):
super(NetAppDirect7modeISCSIDriverTestCase_WV, self).setUp()
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Tests for NetApp e-series iscsi volume driver.
-"""
+"""Tests for NetApp e-series iscsi volume driver."""
import copy
import json
class FakeEseriesHTTPSession(object):
- """A fake requests.Session for netapp tests.
- """
+ """A fake requests.Session for netapp tests."""
def __init__(self):
self.handler = FakeEseriesServerHandler()
def _create_volume_from_image(self, expected_status, raw=False,
clone_error=False):
- """Try to clone a volume from an image, and check the status
- afterwards.
+ """Try to clone a volume from an image, and check status afterwards.
+
NOTE: if clone_error is True we force the image type to raw otherwise
clone_image is not called
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Unit tests for the Scality Rest Block Volume Driver.
-"""
+"""Unit tests for the Scality Rest Block Volume Driver."""
import mock
from oslo_concurrency import processutils
self._driver.do_setup, None)
def test_volume_create(self):
- """The volume shall be added in the internal
- state through fake_execute
+ """Test volume create.
+
+ The volume will be added in the internal state through fake_execute.
"""
volume = {'name': 'volume-test', 'id': 'test', 'size': 4 * units.Gi}
old_vols = self._volumes
class BrickUtils(test.TestCase):
- """Unit test to test the brick utility
- wrapper functions.
- """
+ """Unit test to test the brick utility wrapper functions."""
@mock.patch('cinder.utils.CONF')
@mock.patch('os_brick.initiator.connector.get_connector_properties')
@mock.patch('socket.gethostbyaddr')
def test_update_volume_stats(self, mock_gethost):
- """Makes a mock query to the backend to collect
- stats on all physical devices.
+ """Test Update Volume Stats.
+
+ Makes a mock query to the backend to collect stats on all physical
+ devices.
"""
def gethostbyaddr(addr):
self.assertIsNone(result)
def test_get_active_fc_targets(self):
- """Makes a mock query to the backend to collect
- all the physical adapters and extract the WWNs
+ """Test Get Active FC Targets.
+
+ Makes a mock query to the backend to collect all the physical
+ adapters and extract the WWNs.
"""
conf = {
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Tests for Volume Code.
-
-"""
+"""Tests for Volume Code."""
import datetime
import os
@mock.patch.object(keymgr, 'API', fake_keymgr.fake_api)
def test_create_volume_from_snapshot_with_encryption(self):
- """Test volume can be created from a snapshot of
- an encrypted volume.
- """
+ """Test volume can be created from a snapshot of an encrypted volume"""
ctxt = context.get_admin_context()
db.volume_type_create(ctxt,
self.volume.delete_volume(self.context, volume['id'])
def test_create_volume_from_image_exception(self):
- """Verify that create volume from a non-existing image, the volume
+ """Test create volume from a non-existing image.
+
+ Verify that create volume from a non-existing image, the volume
status is 'error' and is not bootable.
"""
dst_fd, dst_path = tempfile.mkstemp()
{'_pool0': {'allocated_capacity_gb': 1}})
def test_create_volume_from_exact_sized_image(self):
- """Verify that an image which is exactly the same size as the
+ """Test create volume from an image of the same size.
+
+ Verify that an image which is exactly the same size as the
volume, will work correctly.
"""
try:
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Unit Tests for volume types code
-"""
+"""Unit Tests for volume types code."""
import datetime
conf_fixture.def_vol_type)
def test_default_volume_type_missing_in_db(self):
- """Ensures proper exception raised if default volume type
+ """Test default volume type is missing in database.
+
+ Ensures proper exception raised if default volume type
is not in database.
"""
default_vol_type = volume_types.get_default_volume_type()
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Unit tests for Oracle's ZFSSA Cinder volume driver
-"""
+"""Unit tests for Oracle's ZFSSA Cinder volume driver."""
import json
class FakeZFSSA(object):
- """Fake ZFS SA"""
+ """Fake ZFS SA."""
def __init__(self):
self.user = None
self.host = None
class FakeNFSZFSSA(FakeZFSSA):
- """Fake ZFS SA for the NFS Driver
- """
+ """Fake ZFS SA for the NFS Driver."""
def set_webdav(self, https_path, auth_str):
self.webdavclient = https_path
def deepcopy_return_value_class_decorator(cls):
- """Wraps all 'non-protected' methods of a class with the
+ """Wraps 'non-protected' methods of a class with decorator.
+
+ Wraps all 'non-protected' methods of a class with the
deepcopy_return_value_method_decorator decorator.
"""
class NewClass(cls):
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Mock unit tests for the NetApp E-series iscsi driver
-"""
+"""Mock unit tests for the NetApp E-series iscsi driver."""
import copy
def test_unmap_volume_from_host_volume_mapped_to_host_group_but_not_host(
self):
- """Ensure an error is raised if the specified host is not in the
+ """Test volume mapped to host not in specified host group.
+
+ Ensure an error is raised if the specified host is not in the
host group the volume is mapped to.
"""
fake_eseries_volume = copy.deepcopy(eseries_fakes.VOLUME)
self.assertFalse(self.client.delete_volume_mapping.called)
def test_unmap_volume_from_host_volume_mapped_to_outside_host_group(self):
- """Ensure we raise error when we find a volume is mapped to an unknown
+ """Test volume mapped to host group without host.
+
+ Ensure we raise error when we find a volume is mapped to an unknown
host group that does not have the host.
"""
fake_eseries_volume = copy.deepcopy(eseries_fakes.VOLUME)
def test_unmap_volume_from_host_volume_mapped_to_outside_host_group_w_host(
self):
- """Ensure we raise error when we find a volume is mapped to an unknown
+ """Test volume mapped to host in unknown host group.
+
+ Ensure we raise error when we find a volume is mapped to an unknown
host group that has the host.
"""
fake_eseries_volume = copy.deepcopy(eseries_fakes.VOLUME)
def test_map_volume_to_single_host_volume_mapped_to_multiattach_host_group(
self):
- """Should move mapping to target host if volume is not migrating or
+ """Test map volume to a single host.
+
+ Should move mapping to target host if volume is not migrating or
attached(in-use). If volume is not in use then it should not require a
mapping making it ok to sever the mapping to the host group.
"""
fake_mapping_to_host_group)
def test_map_volume_to_multiple_hosts_volume_mapped_to_another_host(self):
- """Should ensure both existing host and destination host are in
+ """Test that mapping moves to another host group.
+
+ Should ensure both existing host and destination host are in
multiattach host group and move the mapping to the host group.
"""
def test_map_volume_to_multiple_hosts_volume_mapped_to_another_host_with_lun_collision_with_source_host( # noqa
self):
- """Should fail attempting to move source host to multiattach host
+ """Test moving source host to multiattach host group.
+
+ Should fail attempting to move source host to multiattach host
group and raise an error.
"""
def test_map_volume_to_multiple_hosts_volume_mapped_to_another_host_with_lun_collision_with_dest_host( # noqa
self):
- """Should fail attempting to move destination host to multiattach host
+ """Test moving destination host to multiattach host group.
+
+ Should fail attempting to move destination host to multiattach host
group and raise an error.
"""
def test_map_volume_to_multiple_hosts_volume_mapped_to_foreign_host_group(
self):
- """Should raise an error stating the volume is mapped to an
+ """Test a target when the host is in a foreign host group.
+
+ Should raise an error stating the volume is mapped to an
unsupported host group.
"""
fake_eseries_volume = copy.deepcopy(eseries_fakes.VOLUME)
def test_map_volume_to_multiple_hosts_volume_mapped_to_host_in_foreign_host_group( # noqa
self):
- """Should raise an error stating the volume is mapped to a
+ """Test a target when the host is in a foreign host group.
+
+ Should raise an error stating the volume is mapped to a
host that is in an unsupported host group.
"""
fake_eseries_volume = copy.deepcopy(eseries_fakes.VOLUME)
def test_map_volume_to_multiple_hosts_volume_target_host_in_foreign_host_group( # noqa
self):
- """Should raise an error stating the target host is in an
+ """Test a target when the host is in a foreign host group.
+
+ Should raise an error stating the target host is in an
unsupported host group.
"""
fake_eseries_volume = copy.deepcopy(eseries_fakes.VOLUME)
class NetAppEseriesLibraryMultiAttachTestCase(test.TestCase):
- """Test driver behavior when the netapp_enable_multiattach
- configuration option is True.
+ """Test driver when netapp_enable_multiattach is enabled.
+
+ Test driver behavior when the netapp_enable_multiattach configuration
+ option is True.
"""
def setUp(self):
def xhtml_escape(value):
- """Escapes a string so it is valid within XML or XHTML.
-
- """
+ """Escapes a string so it is valid within XML or XHTML."""
return saxutils.escape(value, {'"': '"', "'": '''})
def monkey_patch():
- """If the CONF.monkey_patch set as True,
+ """Patches decorators for all functions in a specified module.
+
+    If CONF.monkey_patch is set to True,
this function patches a decorator
for all functions in specified modules.
Parameters of the decorator is as follows.
(See cinder.openstack.common.notifier.api.notify_decorator)
- name - name of the function
- function - object of the function
+ :param name: name of the function
+ :param function: object of the function
"""
# If CONF.monkey_patch is not True, this function do nothing.
if not CONF.monkey_patch:
def brick_get_connector_properties(multipath=False, enforce_multipath=False):
- """wrapper for the brick calls to automatically set
- the root_helper needed for cinder.
+ """Wrapper to automatically set root_helper in brick calls.
- :param multipath: A boolean indicating whether the connector can
- support multipath.
+ :param multipath: A boolean indicating whether the connector can
+ support multipath.
:param enforce_multipath: If True, it raises exception when multipath=True
is specified but multipathd is not running.
If False, it falls back to multipath=False
device_scan_attempts=3,
*args, **kwargs):
"""Wrapper to get a brick connector object.
+
This automatically populates the required protocol as well
as the root_helper needed to execute commands.
"""
def _get_disk_of_partition(devpath, st=None):
- """Returns a disk device path from a partition device path, and stat for
+ """Gets a disk device path and status from partition path.
+
+ Returns a disk device path from a partition device path, and stat for
the device. If devpath is not a partition, devpath is returned as it is.
For example, '/dev/sda' is returned for '/dev/sda1', and '/dev/disk1' is
for '/dev/disk1p1' ('p' is prepended to the partition number if the disk
def get_blkdev_major_minor(path, lookup_for_file=True):
- """Get the device's "major:minor" number of a block device to control
+ """Get 'major:minor' number of block device.
+
+ Get the device's 'major:minor' number of a block device to control
I/O ratelimit of the specified path.
If lookup_for_file is True and the path is a regular file, lookup a disk
device which the file lies on and returns the result for the device.
def check_string_length(value, name, min_length=0, max_length=None):
- """Check the length of specified string
+ """Check the length of specified string.
+
:param value: the value of the string
:param name: the name of the string
:param min_length: the min_length of the string
def remove_invalid_filter_options(context, filters,
allowed_search_options):
- """Remove search options that are not valid
- for non-admin API/context.
- """
+ """Remove search options that are not valid for non-admin API/context."""
+
if context.is_admin:
# Allow all options
return
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Handles all requests relating to volumes.
-"""
+"""Handles all requests relating to volumes."""
import collections
raise NotImplementedError()
def set_host_maintenance(self, context, host, mode):
- """Start/Stop host maintenance window. On start, it triggers
- volume evacuation.
+ """Start/Stop host maintenance window.
+
+ On start, it triggers volume evacuation.
"""
raise NotImplementedError()
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Configuration support for all drivers.
+"""Configuration support for all drivers.
This module allows support for setting configurations either from default
or from a particular FLAGS group, to be able to set multiple configurations
class Configuration(object):
def __init__(self, volume_opts, config_group=None):
- """This takes care of grafting the implementation's config
- values into the config group
+ """Initialize configuration.
+
+ This takes care of grafting the implementation's config
+ values into the config group
"""
self.config_group = config_group
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Drivers for volumes.
-"""
+"""Drivers for volumes."""
import abc
import time
@abc.abstractmethod
def create_volume(self, volume):
- """Creates a volume. Can optionally return a Dictionary of
- changes to the volume object to be persisted.
+ """Creates a volume.
+
+ Can optionally return a Dictionary of changes to the volume object to
+ be persisted.
If volume_type extra specs includes
'capabilities:replication <is> True' the driver
return False
def get_volume_stats(self, refresh=False):
- """Return the current state of the volume service. If 'refresh' is
- True, run the update first.
+ """Return the current state of the volume service.
+
+ If 'refresh' is True, run the update first.
- For replication the following state should be reported:
- replication = True (None or false disables replication)
+ For replication the following state should be reported:
+ replication = True (None or false disables replication)
"""
return None
class VolumeDriver(ConsistencyGroupVD, TransferVD, ManageableVD, ExtendVD,
CloneableVD, CloneableImageVD, SnapshotVD, ReplicaVD,
RetypeVD, LocalVD, MigrateVD, BaseVD):
- """This class will be deprecated soon. Please us the abstract classes
- above for new drivers.
+ """This class will be deprecated soon.
+
+ Please use the abstract classes above for new drivers.
"""
def check_for_setup_error(self):
raise NotImplementedError()
class can help marking them and retrieve the actual used driver object.
"""
def _get_driver(self):
- """Returns the actual driver object. Can be overloaded by the proxy.
+ """Returns the actual driver object.
+
+ Can be overloaded by the proxy.
"""
return getattr(self, "driver", None)
pass
def create_export(self, context, volume):
- """Exports the volume. Can optionally return a Dictionary of changes
- to the volume object to be persisted.
+ """Exports the volume.
+
+ Can optionally return a Dictionary of changes to the volume object to
+ be persisted.
"""
pass
return data
def _override_params(self, default_dict, filtered_user_dict):
- """Override the default config values with user provided values.
- """
+ """Override the default config values with user provided values."""
if filtered_user_dict is None:
# Nothing to override
def _bytes_to_gb(self, spacestring):
"""Space is returned in a string like ...
+
7.38197504E8 Bytes
Need to split that apart and convert to GB.
- returns gbs in int form
+ :returns: gbs in int form
"""
try:
n = spacestring.split(' ', 1)
def _assert_enough_space_for_copy(self, volume_size):
"""The DotHill creates a snap pool before trying to copy the volume.
- The pool is 5.27GB or 20% of the volume size, whichever is larger.
+ The pool is 5.27GB or 20% of the volume size, whichever is larger.
Verify that we have enough space for the pool and then copy
"""
pool_size = max(volume_size * 0.2, 5.27)
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-Fibre Channel Driver for EMC VNX array based on CLI.
-
-"""
+"""Fibre Channel Driver for EMC VNX array based on CLI."""
from oslo_log import log as logging
self.cli.manage_existing(volume, existing_ref)
def manage_existing_get_size(self, volume, existing_ref):
- """Return size of volume to be managed by manage_existing.
- """
+ """Return size of volume to be managed by manage_existing."""
return self.cli.manage_existing_get_size(volume, existing_ref)
def create_consistencygroup(self, context, group):
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""
-iSCSI Drivers for EMC VNX array based on CLI.
-
-"""
+"""iSCSI Drivers for EMC VNX array based on CLI."""
from oslo_log import log as logging
self.cli.manage_existing(volume, existing_ref)
def manage_existing_get_size(self, volume, existing_ref):
- """Return size of volume to be managed by manage_existing.
- """
+ """Return size of volume to be managed by manage_existing."""
return self.cli.manage_existing_get_size(volume, existing_ref)
def create_consistencygroup(self, context, group):
self._gather_info()
def _gather_info(self):
- """Gather the relevant information for update_volume_stats.
- """
+ """Gather the relevant information for update_volume_stats."""
if hasattr(self.configuration, 'cinder_emc_config_file'):
self.pool_info['config_file'] = (
self.configuration.cinder_emc_config_file)
return modifiedVolumeDict
def update_volume_stats(self):
- """Retrieve stats info.
- """
+ """Retrieve stats info."""
if self.pool_info['is_v3']:
location_info, total_capacity_gb, free_capacity_gb = (
return self.common.manage_existing_get_size(volume, external_ref)
def unmanage(self, volume):
- """Export VMAX volume from Cinder, leave the volume intact on the
- backend array.
- """
+ """Export VMAX volume and leave volume intact on the backend array."""
return self.common.unmanage(volume)
def update_consistencygroup(self, context, group,
self.protocol = prtcl
def find_storage_configuration_service(self, conn, storageSystemName):
- """Given the storage system name, get the storage configuration
- service.
+ """Get storage configuration service with given storage system name.
:param conn: connection to the ecom server
:param storageSystemName: the storage system name
def get_associated_replication_from_source_volume(
self, conn, storageSystem, sourceDeviceId):
- """Given the source volume device ID, find associated replication
+ """Get associated replication from source volume.
+
+ Given the source volume device ID, find associated replication
storage synchronized instance names.
:param conn: connection to the ecom server
return out, rc
def _toggle_sp(self):
- """This function toggles the storage IP
+ """Toggle the storage IP.
+
Address between primary IP and secondary IP, if no SP IP address has
exchanged, return False, otherwise True will be returned.
"""
return True
def get_enablers_on_array(self, poll=False):
- """The function would get all the enabler installed
- on array.
- """
+ """The function would get all the enablers installed on array."""
enablers = []
cmd_list = ('ndu', '-list')
out, rc = self.command_execute(*cmd_list, poll=poll)
return enablers
def enable_or_disable_compression_on_lun(self, volumename, compression):
- """The function will enable or disable the compression
- on lun
- """
+ """The function will enable or disable the compression on lun."""
lun_data = self.get_lun_by_name(volumename)
command_compression_cmd = ('compression', '-' + compression,
def get_volume_stats(self, refresh=False):
"""Get volume stats.
+
If 'refresh' is True, run update the stats first.
"""
if refresh:
}
def _get_iscsi_properties(self, lunmap):
- """Gets iscsi configuration
+ """Gets iscsi configuration.
+
:target_discovered: boolean indicating whether discovery was used
:target_iqn: the IQN of the iSCSI target
:target_portal: the portal of the iSCSI target
class GlusterfsDriver(remotefs_drv.RemoteFSSnapDriver, driver.CloneableVD,
driver.ExtendVD):
- """Gluster based cinder driver. Creates file on Gluster share for using it
- as block device on hypervisor.
+ """Gluster based cinder driver.
+
+ Creates file on Gluster share for using it as block device on hypervisor.
Operations such as create/delete/extend volume/snapshot use locking on a
per-process basis to prevent multiple threads from modifying qcow2 chains
def _ensure_share_mounted(self, glusterfs_share):
"""Mount GlusterFS share.
+
:param glusterfs_share: string
"""
mount_path = self._get_mount_point_for_share(glusterfs_share)
def _find_share(self, volume_size_for):
"""Choose GlusterFS share among available ones for given volume size.
+
Current implementation looks for greatest capacity.
:param volume_size_for: int size in GB
"""
def set_targetsecret(self, cmd, ip0, user, pw, targetalias, hdp, secret):
"""Sets the chap secret for the specified target.
+
:param ip0: string IP address of controller
:param user: string user authentication for array
:param pw: string password authentication for array
def get_targetsecret(self, cmd, ip0, user, pw, targetalias, hdp):
"""Returns the chap secret for the specified target.
+
:param ip0: string IP address of controller
:param user: string user authentication for array
:param pw: string password authentication for array
return conf
def _get_service(self, volume):
- """Get the available service parameters for a given volume using
- its type.
- :param volume: dictionary volume reference
+ """Get available service parameters.
+
+ Get the available service parameters for a given volume using its type.
+ :param volume: dictionary volume reference
"""
label = utils.extract_host(volume['host'], level='pool')
def _id_to_vol(self, volume_id):
"""Given the volume id, retrieve the volume object from database.
- :param volume_id: volume id string
+
+ :param volume_id: volume id string
"""
vol = self.db.volume_get(self.context, volume_id)
def _update_vol_location(self, volume_id, loc):
"""Update the provider location.
- :param volume_id: volume id string
- :param loc: string provider location value
+
+ :param volume_id: volume id string
+ :param loc: string provider location value
"""
update = {'provider_location': loc}
def create_export(self, context, volume):
"""Create an export. Moved to initialize_connection.
- :param context:
- :param volume: volume reference
+
+ :param context:
+ :param volume: volume reference
"""
name = volume['name']
def remove_export(self, context, volume):
"""Disconnect a volume from an attached instance.
- :param context: context
- :param volume: dictionary volume reference
+
+ :param context: context
+ :param volume: dictionary volume reference
"""
provider = volume['provider_location']
def create_volume(self, volume):
"""Create a LU on HNAS.
- :param volume: ditctionary volume reference
+
+ :param volume: dictionary volume reference
"""
service = self._get_service(volume)
def create_cloned_volume(self, dst, src):
"""Create a clone of a volume.
- :param dst: ditctionary destination volume reference
- :param src: ditctionary source volume reference
+
+        :param dst: dictionary destination volume reference
+        :param src: dictionary source volume reference
"""
if src['size'] != dst['size']:
def delete_volume(self, volume):
"""Delete an LU on HNAS.
+
:param volume: dictionary volume reference
"""
def create_snapshot(self, snapshot):
"""Create a snapshot.
+
:param snapshot: dictionary snapshot reference
"""
class HDSNFSDriver(nfs.NfsDriver):
"""Base class for Hitachi NFS driver.
+
Executes commands relating to Volumes.
Version 1.0.0: Initial driver version
return vol
def _get_service(self, volume):
- """Get the available service parameters for a given volume using
- its type.
+ """Get service parameters.
+
+ Get the available service parameters for a given volume using
+ its type.
:param volume: dictionary volume reference
"""
time.sleep(tries ** 2)
def _get_volume_path(self, nfs_share, volume_name):
- """Get volume path (local fs path) for given volume name on given nfs
- share.
+ """Get volume path (local fs path) for given name on given nfs share.
:param nfs_share string, example 172.18.194.100:/var/nfs
:param volume_name string,
def _get_hdr_dic(self, header, row, delim):
"""Return CLI row data as a dictionary indexed by names from header.
- string. The strings are converted to columns using the delimiter in
- delim.
+
+ The strings are converted to columns using the delimiter in delim.
"""
attributes = header.split(delim)
@fczm_utils.AddFCZone
@utils.synchronized('storwize-host', external=True)
def initialize_connection(self, volume, connector):
- """Perform the necessary work so that an iSCSI/FC connection can
- be made.
+ """Perform necessary work to make an iSCSI/FC connection.
To be able to create an iSCSI/FC connection from a given host to a
volume, we must:
time.sleep(tries ** 2)
def _get_volume_path(self, nfs_share, volume_name):
- """Get volume path (local fs path) for given volume name on given nfs
- share.
+ """Get volume path.
- @param nfs_share string, example 172.18.194.100:/var/nfs
- @param volume_name string,
+ Get volume path (local fs path) for given volume name on given nfs
+ share.
+ :param nfs_share: string, example 172.18.194.100:/var/nfs
+ :param volume_name: string,
example volume-91ee65ec-c473-4391-8c09-162b00c68a8c
"""
class NfsDriver(driver.ExtendVD, remotefs.RemoteFSDriver):
- """NFS based cinder driver. Creates file on NFS share for using it
- as block device on hypervisor.
+ """NFS based cinder driver.
+
+ Creates file on NFS share for using it as block device on hypervisor.
"""
driver_volume_type = 'nfs'
return ''.join(random.sample(char_set, length))
def _clone_volume_from_snapshot(self, volume, snapshot):
- """Clonevolume from snapshot. Extend the volume if the
- size of the volume is more than the snapshot
+ """Clone volume from snapshot.
+
+ Extend the volume if the size of the volume is more than the snapshot.
"""
reserve = not self.configuration.san_thin_provision
self.APIExecutor.clone_vol(volume, snapshot, reserve)
def _response_checker(func):
- """Decorator function to check if the response
- of an API is positive
- """
+ """Decorator function to check if the response of an API is positive."""
@functools.wraps(func)
def inner_response_checker(self, *args, **kwargs):
response = func(self, *args, **kwargs)
def _connection_checker(func):
- """Decorator to re-establish and
- re-run the api if session has expired.
- """
+    """Decorator to re-establish and re-run the API if session has expired."""
@functools.wraps(func)
def inner_connection_checker(self, *args, **kwargs):
for attempts in range(2):
super(DPLFCDriver, self).__init__(*args, **kwargs)
def _get_fc_channel(self):
- """return :
- fcInfos[uuid]
- fcInfo[uuid]['display_name']
- fcInfo[uuid]['display_description']
- fcInfo[uuid]['hardware_address']
- fcInfo[uuid]['type']
- fcInfo[uuid]['speed']
- fcInfo[uuid]['state']
+ """Get FibreChannel info.
+
+ :returns: fcInfos[uuid]
+ fcInfo[uuid]['display_name']
+ fcInfo[uuid]['display_description']
+ fcInfo[uuid]['hardware_address']
+ fcInfo[uuid]['type']
+ fcInfo[uuid]['speed']
+ fcInfo[uuid]['state']
"""
output = None
fcInfos = {}
return fcInfos
def _get_targets(self):
- """return::
- targetInfos[uuid] = targetInfo
- targetInfo['targetUuid']
- targetInfo['targetName']
- targetInfo['targetAddr']
+ """Get targets.
+
+ :returns: targetInfos[uuid] = targetInfo
+ targetInfo['targetUuid']
+ targetInfo['targetName']
+ targetInfo['targetAddr']
"""
output = None
targetInfos = {}
return pools
def _update_volume_stats(self, refresh=False):
- """Return the current state of the volume service. If 'refresh' is
- True, run the update first.
+ """Return the current state of the volume service.
+
+ If 'refresh' is True, run the update first.
"""
data = {}
pools = self._get_pools()
return self._stats
def _get_clone_depth(self, client, volume_name, depth=0):
- """Returns the number of ancestral clones (if any) of the given volume.
- """
+ """Returns the number of ancestral clones of the given volume."""
parent_volume = self.rbd.Image(client.ioctx, volume_name)
try:
_pool, parent, _snap = self._get_clone_info(parent_volume,
self._set_rw_permissions(volume_path)
def _ensure_shares_mounted(self):
- """Look for remote shares in the flags and tries to mount them
- locally.
- """
+ """Look for remote shares in the flags and mount them locally."""
mounted_shares = []
self._load_shares_config(getattr(self.configuration,
pass
def delete_snapshot(self, snapshot):
- """Do nothing for this driver, but allow manager to handle deletion
- of snapshot in error state.
+ """Delete snapshot.
+
+ Do nothing for this driver, but allow manager to handle deletion
+ of snapshot in error state.
"""
pass
run_as_root=self._execute_as_root)
def local_path(self, volume):
- """Get volume path (mounted locally fs path) for given volume
+ """Get volume path (mounted locally fs path) for given volume.
+
:param volume: volume reference
"""
remotefs_share = volume['provider_location']
return output
def _get_hash_str(self, base_str):
- """Return a string that represents hash of base_str
- (in a hex format).
+ """Return a string that represents hash of base_str.
+
+ Returns string in a hex format.
"""
return hashlib.md5(base_str).hexdigest()
def _get_mount_point_for_share(self, share):
"""Return mount point for share.
+
:param share: example 172.18.194.100:/var/fs
"""
return self._remotefsclient.get_mount_point(share)
def _get_available_capacity(self, share):
"""Calculate available space on the share.
+
:param share: example 172.18.194.100:/var/fs
"""
mount_point = self._get_mount_point_for_share(share)
class SmbfsDriver(remotefs_drv.RemoteFSSnapDriver):
- """SMBFS based cinder volume driver.
- """
+ """SMBFS based cinder volume driver."""
driver_volume_type = 'smbfs'
driver_prefix = 'smbfs'
def local_path(self, volume):
"""Get volume path (mounted locally fs path) for given volume.
+
:param volume: volume reference
"""
volume_path_template = self._get_local_volume_path_template(volume)
@staticmethod
def _activate_lv(orig, *args, **kwargs):
- """Use with `patched` to patch `lvm.LVM.activate_lv` to ignore `EEXIST`
+ """Activate lv.
+
+ Use with `patched` to patch `lvm.LVM.activate_lv` to ignore `EEXIST`
"""
try:
orig(*args, **kwargs)
class V6000FCDriver(driver.FibreChannelDriver):
- """Executes commands relating to fibre channel based Violin Memory
- Arrays.
+ """Executes commands relating to fibre channel based Violin Memory Arrays.
Version history:
1.0 - Initial driver
return False
def _update_stats(self):
- """Gathers array stats from the backend and converts them to GB values.
+ """Update array stats.
+
+ Gathers array stats from the backend and converts them to GB values.
"""
data = {}
total_gb = 0
_('Failed to add igroup member: %(code)d, %(message)s') % resp)
def _update_stats(self):
- """Gathers array stats from the backend and converts them to GB values.
+ """Update array stats.
+
+ Gathers array stats from the backend and converts them to GB values.
"""
data = {}
total_gb = 0
return vhd_info
def get_vhd_size(self, vhd_path):
- """Returns a dict containing the virtual size, physical size,
+ """Return vhd size.
+
+ Returns a dict containing the virtual size, physical size,
block size and sector size of the vhd.
"""
size = self.get_vhd_info(vhd_path,
return {'provider_location': target_name}
def remove_export(self, context, volume):
- """Driver entry point to remove an export for a volume.
- """
+ """Driver entry point to remove an export for a volume."""
target_name = "%s%s" % (self.configuration.iscsi_target_prefix,
volume['name'])
def check_for_setup_error(self):
"""Check that the driver is working and can communicate.
+
Invokes the portal and checks that is listening ISCSI traffic.
"""
try:
class RestResult(object):
"""Result from a REST API operation"""
def __init__(self, response=None, err=None):
- """Initialize a RestResult containing the results from a REST call
+ """Initialize a RestResult containing the results from a REST call.
+
:param response: HTTP response
"""
self.response = response
self.rclient.logout()
def _is_pool_owned(self, pdata):
- """returns True if the pool's owner is the
- same as the host.
- """
+ """Returns True if the pool's owner is the same as the host."""
svc = '/api/system/v1/version'
ret = self.rclient.get(svc)
if ret.status != restclient.Status.OK:
self.rclient.login(auth_str)
def get_pool_stats(self, pool):
- """Get space available and total properties of a pool
- returns (avail, total).
+ """Get pool stats.
+
+ Get space available and total properties of a pool
+ returns (avail, total).
"""
svc = '/api/storage/v1/pools/' + pool
ret = self.rclient.get(svc)
return avail, total
def create_project(self, pool, project, compression=None, logbias=None):
- """Create a project on a pool
- Check first whether the pool exists.
+ """Create a project on a pool.
+
+ Check first whether the pool exists.
"""
self.verify_pool(pool)
svc = '/api/storage/v1/pools/' + pool + '/projects/' + project
def create_target(self, alias, interfaces=None, tchapuser=None,
tchapsecret=None):
"""Create an iSCSI target.
- interfaces: an array with network interfaces
- tchapuser, tchapsecret: target's chapuser and chapsecret
- returns target iqn
+
+ :param interfaces: an array with network interfaces
+ :param tchapuser, tchapsecret: target's chapuser and chapsecret
+ :returns: target iqn
"""
svc = '/api/san/v1/iscsi/targets/alias=' + alias
ret = self.rclient.get(svc)
raise exception.VolumeBackendAPIException(data=exception_msg)
def create_lun(self, pool, project, lun, volsize, targetgroup, specs):
-
"""Create a LUN.
- specs - contains volume properties (e.g blocksize, compression).
+
+ specs - contains volume properties (e.g blocksize, compression).
"""
svc = '/api/storage/v1/pools/' + pool + '/projects/' + \
project + '/luns'
class CxtAdm(iscsi.ISCSITarget):
"""Chiscsi target configuration for block storage devices.
+
This includes things like create targets, attach, detach
etc.
"""
@contextlib.contextmanager
def subcommand(self, srcpath, dstpath):
- """Throttle disk I/O bandwidth used by a sub-command, such as 'dd',
+ """Sub-command that reads from srcpath and writes to dstpath.
+
+ Throttle disk I/O bandwidth used by a sub-command, such as 'dd',
that reads from srcpath and writes to dstpath. The sub-command
must be executed with the generated prefix command.
"""
# H105 Don't use author tags
#
-ignore = E251,H405,H105
+ignore = E251,H105
exclude = .git,.venv,.tox,dist,tools,doc,common,*egg,build
max-complexity=30