Summary docstrings should end with punctuation.
Also changed a few of them to command style (imperative mood).
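
For illustration, the convention applied here looks like the following
(a made-up class, not one of the files touched by this change):

    class ExampleDescriptor(object):
        """Describe the example extension."""  # ends with a period

        def create_snapshot(self, snapshot):
            """Create a snapshot."""  # command style, not "Creates"
            pass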
Change-Id: Id94fe995aa05356106ad09899b0ada27d608ff21
def _get_limit_param(request):
- """Extract integer limit from request or fail"""
+ """Extract integer limit from request or fail."""
try:
limit = int(request.GET['limit'])
except ValueError:
def _get_marker_param(request):
- """Extract marker id from request or fail"""
+ """Extract marker id from request or fail."""
return request.GET['marker']
class MetadataXMLDeserializer(wsgi.XMLDeserializer):
def extract_metadata(self, metadata_node):
- """Marshal the metadata attribute of a parsed request"""
+ """Marshal the metadata attribute of a parsed request."""
if metadata_node is None:
return {}
metadata = {}
class Availability_zones(extensions.ExtensionDescriptor):
- """Describe Availability Zones"""
+ """Describe Availability Zones."""
name = 'AvailabilityZones'
alias = 'os-availability-zone'
class Hosts(extensions.ExtensionDescriptor):
- """Admin-only host administration"""
+ """Admin-only host administration."""
name = "Hosts"
alias = "os-hosts"
class Image_create(extensions.ExtensionDescriptor):
- """Allow creating a volume from an image in the Create Volume v1 API"""
+ """Allow creating a volume from an image in the Create Volume v1 API."""
name = "CreateVolumeExtension"
alias = "os-image-create"
class Qos_specs_manage(extensions.ExtensionDescriptor):
- """QoS specs support"""
+ """QoS specs support."""
name = "Qos_specs_manage"
alias = "qos-specs"
class QuotaClassSetsController(wsgi.Controller):
def _format_quota_set(self, quota_class, quota_set):
- """Convert the quota object to a result dict"""
+ """Convert the quota object to a result dict."""
quota_set['id'] = str(quota_class)
class Quota_classes(extensions.ExtensionDescriptor):
- """Quota classes management support"""
+ """Quota classes management support."""
name = "QuotaClasses"
alias = "os-quota-class-sets"
class QuotaSetsController(wsgi.Controller):
def _format_quota_set(self, project_id, quota_set):
- """Convert the quota object to a result dict"""
+ """Convert the quota object to a result dict."""
quota_set['id'] = str(project_id)
class Quotas(extensions.ExtensionDescriptor):
- """Quotas management support"""
+ """Quota management support."""
name = "Quotas"
alias = "os-quota-sets"
@wsgi.serializers(xml=ServicesUpdateTemplate)
def update(self, req, id, body):
- """Enable/Disable scheduling for a service"""
+ """Enable/Disable scheduling for a service."""
context = req.environ['cinder.context']
authorize(context)
class Services(extensions.ExtensionDescriptor):
- """Services support"""
+ """Services support."""
name = "Services"
alias = "os-services"
class Types_extra_specs(extensions.ExtensionDescriptor):
- """Types extra specs support"""
+ """Type extra specs support."""
name = "TypesExtraSpecs"
alias = "os-types-extra-specs"
class VolumeEncryptionMetadataController(wsgi.Controller):
- """The volume encryption metadata API extension"""
+ """The volume encryption metadata API extension."""
def _get_volume_encryption_metadata(self, context, volume_id):
return db.volume_encryption_metadata_get(context, volume_id)
class Volume_image_metadata(extensions.ExtensionDescriptor):
- """Show image metadata associated with the volume"""
+ """Show image metadata associated with the volume."""
name = "VolumeImageMetadata"
alias = "os-vol-image-meta"
@wsgi.serializers(xml=TransfersTemplate)
def index(self, req):
- """Returns a summary list of transfers"""
+ """Returns a summary list of transfers."""
return self._get_transfers(req, is_detail=False)
@wsgi.serializers(xml=TransfersTemplate)
class Volume_transfer(extensions.ExtensionDescriptor):
- """Volume transfer management support"""
+ """Volume transfer management support."""
name = "VolumeTransfer"
alias = "os-volume-transfer"
class CinderKeystoneContext(base_wsgi.Middleware):
- """Make a request context from keystone headers"""
+ """Make a request context from keystone headers."""
@webob.dec.wsgify(RequestClass=base_wsgi.Request)
def __call__(self, req):
@classmethod
def factory(cls, global_config, **local_config):
- """Simple paste factory, :class:`cinder.wsgi.Router` doesn't have"""
+ """Simple paste factory, :class:`cinder.wsgi.Router` doesn't have."""
return cls()
def __init__(self, ext_mgr=None):
class TextDeserializer(ActionDispatcher):
- """Default request body deserialization"""
+ """Default request body deserialization."""
def deserialize(self, datastring, action='default'):
return self.dispatch(datastring, action=action)
return None
def find_first_child_named(self, parent, name):
- """Search a nodes children for the first child with a given name"""
+ """Search a nodes children for the first child with a given name."""
for node in parent.childNodes:
if node.nodeName == name:
return node
return None
def find_children_named(self, parent, name):
- """Return all of a nodes children who have the given name"""
+ """Return all of a nodes children who have the given name."""
for node in parent.childNodes:
if node.nodeName == name:
yield node
def extract_text(self, node):
- """Get the text field contained by the given node"""
+ """Get the text field contained by the given node."""
if len(node.childNodes) == 1:
child = node.childNodes[0]
if child.nodeType == child.TEXT_NODE:
return ""
def find_attribute_or_element(self, parent, name):
- """Get an attribute value; fallback to an element if not found"""
+ """Get an attribute value; fallback to an element if not found."""
if parent.hasAttribute(name):
return parent.getAttribute(name)
class MetadataXMLDeserializer(XMLDeserializer):
def extract_metadata(self, metadata_node):
- """Marshal the metadata attribute of a parsed request"""
+ """Marshal the metadata attribute of a parsed request."""
metadata = {}
if metadata_node is not None:
for meta_node in self.find_children_named(metadata_node, "meta"):
class DictSerializer(ActionDispatcher):
- """Default request body serialization"""
+ """Default request body serialization."""
def serialize(self, data, action='default'):
return self.dispatch(data, action=action)
class JSONDictSerializer(DictSerializer):
- """Default JSON request body serialization"""
+ """Default JSON request body serialization."""
def default(self, data):
return jsonutils.dumps(data)
}
def _get_attachments(self, volume):
- """Retrieves the attachments of the volume object"""
+ """Retrieve the attachments of the volume object."""
attachments = []
if volume['attach_status'] == 'attached':
return attachments
def _get_volume_metadata(self, volume):
- """Retrieves the metadata of the volume object"""
+ """Retrieve the metadata of the volume object."""
if volume.get('volume_metadata'):
metadata = volume.get('volume_metadata')
return dict((item['key'], item['value']) for item in metadata)
return {}
def _get_volume_type(self, volume):
- """Retrieves the type the volume object is"""
+ """Retrieve the type the volume object."""
if volume['volume_type_id'] and volume.get('volume_type'):
return volume['volume_type']['name']
else:
class ViewBuilder(cinder.api.common.ViewBuilder):
- """Map cinder.volumes.api list_availability_zones response into dicts"""
+ """Map cinder.volumes.api list_availability_zones response into dicts."""
def list(self, request, availability_zones):
def fmt(az):
return trimmed if brief else dict(volume_type=trimmed)
def index(self, request, volume_types):
- """Index over trimmed volume types"""
+ """Index over trimmed volume types."""
volume_types_list = [self.show(request, volume_type, True)
for volume_type in volume_types]
return dict(volume_types=volume_types_list)
pass
def getAttrib(self, obj):
- """Get attribute"""
+ """Get attribute."""
tmpattrib = {}
#Now set up all the attributes...
for key, value in self.attrib.items():
class BackupDriver(base.Base):
def backup(self, backup, volume_file):
- """Starts a backup of a specified volume"""
+ """Start a backup of a specified volume."""
raise NotImplementedError()
def restore(self, backup, volume_id, volume_file):
- """Restores a saved backup"""
+ """Restore a saved backup."""
raise NotImplementedError()
def delete(self, backup):
- """Deletes a saved backup"""
+ """Delete a saved backup."""
raise NotImplementedError()
return metadata
def _prepare_backup(self, backup):
- """Prepare the backup process and return the backup metadata"""
+ """Prepare the backup process and return the backup metadata."""
backup_id = backup['id']
volume_id = backup['volume_id']
volume = self.db.volume_get(self.context, volume_id)
return object_meta, container
def _backup_chunk(self, backup, container, data, data_offset, object_meta):
- """Backup data chunk based on the object metadata and offset"""
+ """Backup data chunk based on the object metadata and offset."""
object_prefix = object_meta['prefix']
object_list = object_meta['list']
object_id = object_meta['id']
eventlet.sleep(0)
def _finalize_backup(self, backup, container, object_meta):
- """Finalize the backup by updating its metadata on Swift"""
+ """Finalize the backup by updating its metadata on Swift."""
object_list = object_meta['list']
object_id = object_meta['id']
try:
class NoMoreTargets(exception.CinderException):
- """No more available targets"""
+ """No more available targets."""
pass
@require_context
def volume_type_get(context, id, inactive=False):
- """Returns a dict describing specific volume_type"""
+ """Return a dict describing specific volume_type."""
return _volume_type_get(context, id, None, inactive)
@require_context
def volume_type_get_by_name(context, name):
- """Returns a dict describing specific volume_type"""
+ """Return a dict describing specific volume_type."""
return _volume_type_get_by_name(context, name)
def convert_image(source, dest, out_format):
- """Convert image to other format"""
+ """Convert image to other format."""
cmd = ('qemu-img', 'convert', '-O', out_format, source, dest)
utils.execute(*cmd, run_as_root=True)
self.updated = capability['timestamp']
def consume_from_volume(self, volume):
- """Incrementally update host state from an volume"""
+ """Incrementally update host state from an volume."""
volume_gb = volume['size']
if self.free_capacity_gb == 'infinite':
# There's virtually infinite space on back-end
def get_filtered_hosts(self, hosts, filter_properties,
filter_class_names=None):
- """Filter hosts and return only ones passing all filters"""
+ """Filter hosts and return only ones passing all filters."""
filter_classes = self._choose_host_filters(filter_class_names)
return self.filter_handler.get_filtered_objects(filter_classes,
hosts,
def get_weighed_hosts(self, hosts, weight_properties,
weigher_class_names=None):
- """Weigh the hosts"""
+ """Weigh the hosts."""
weigher_classes = self._choose_host_weighers(weigher_class_names)
return self.weight_handler.get_weighed_objects(weigher_classes,
hosts,
})
def assertGreater(self, first, second, msg=None):
- """Python < v2.7 compatibility. Assert 'first' > 'second'"""
+ """Python < v2.7 compatibility. Assert 'first' > 'second'."""
try:
f = super(TestCase, self).assertGreater
except AttributeError:
f(first, second, msg=msg)
def assertGreaterEqual(self, first, second, msg=None):
- """Python < v2.7 compatibility. Assert 'first' >= 'second'"""
+ """Python < v2.7 compatibility. Assert 'first' >= 'second'."""
try:
f = super(TestCase, self).assertGreaterEqual
except AttributeError:
class Foxinsocks(extensions.ExtensionDescriptor):
- """The Fox In Socks Extension"""
+ """The Fox In Socks Extension."""
name = "Fox In Socks"
alias = "FOXNSOX"
self.assertIn("Entrada invalida: El valor es invalido", resp.body)
def test_fault_has_status_int(self):
- """Ensure the status_int is set correctly on faults"""
+ """Ensure the status_int is set correctly on faults."""
fault = wsgi.Fault(webob.exc.HTTPBadRequest(explanation='what?'))
self.assertEqual(fault.status_int, 400)
def test_xml_serializer(self):
- """Ensure that a v1.1 request responds with a v1 xmlns"""
+ """Ensure that a v1.1 request responds with a v1 xmlns."""
request = webob.Request.blank('/v1',
headers={"Accept": "application/xml"})
fox_ext, {'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
'name': 'Fox In Socks',
'updated': '2011-01-22T13:25:27-06:00',
- 'description': 'The Fox In Socks Extension',
+ 'description': 'The Fox In Socks Extension.',
'alias': 'FOXNSOX',
'links': []}, )
{"namespace": "http://www.fox.in.socks/api/ext/pie/v1.0",
"name": "Fox In Socks",
"updated": "2011-01-22T13:25:27-06:00",
- "description": "The Fox In Socks Extension",
+ "description": "The Fox In Socks Extension.",
"alias": "FOXNSOX",
"links": []})
self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
self.assertEqual(
fox_ext.findtext('{0}description'.format(NS)),
- 'The Fox In Socks Extension')
+ 'The Fox In Socks Extension.')
xmlutil.validate_schema(root, 'extensions')
self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
self.assertEqual(
root.findtext('{0}description'.format(NS)),
- 'The Fox In Socks Extension')
+ 'The Fox In Socks Extension.')
xmlutil.validate_schema(root, 'extension')
self.assertEqual(value, expected)
def test_limited_request_xml(self):
- """Test a rate-limited (413) response as XML"""
+ """Test a rate-limited (413) response as XML."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
self.assertEqual(value, expected)
def test_limited_request_xml(self):
- """Test a rate-limited (413) response as XML"""
+ """Test a rate-limited (413) response as XML."""
request = webob.Request.blank("/")
response = request.get_response(self.app)
self.assertEqual(200, response.status_int)
class FakeSwiftConnection(object):
- """Logging calls instead of executing"""
+ """Logging calls instead of executing."""
def __init__(self, *args, **kwargs):
pass
class HostManagerTestCase(test.TestCase):
- """Test case for HostManager class"""
+ """Test case for HostManager class."""
def setUp(self):
super(HostManagerTestCase, self).setUp()
class HostStateTestCase(test.TestCase):
- """Test case for HostState class"""
+ """Test case for HostState class."""
def test_update_from_volume_capability(self):
fake_host = host_manager.HostState('host1')
backup_id)
def test_create_backup_with_error(self):
- """Test error handling when an error occurs during backup creation"""
+ """Test error handling when error occurs during backup creation."""
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(volume_id=vol_id)
self.assertEqual(backup['status'], 'error')
def test_create_backup(self):
- """Test normal backup creation"""
+ """Test normal backup creation."""
vol_size = 1
vol_id = self._create_volume_db_entry(size=vol_size)
backup_id = self._create_backup_db_entry(volume_id=vol_id)
def test_restore_backup_with_bad_volume_status(self):
"""Test error handling when restoring a backup to a volume
- with a bad status
+ with a bad status.
"""
vol_id = self._create_volume_db_entry(status='available', size=1)
backup_id = self._create_backup_db_entry(volume_id=vol_id)
def test_restore_backup_with_bad_backup_status(self):
"""Test error handling when restoring a backup with a backup
- with a bad status
+ with a bad status.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
self.assertEqual(backup['status'], 'error')
def test_restore_backup_with_driver_error(self):
- """Test error handling when an error occurs during backup restore"""
+ """Test error handling when an error occurs during backup restore."""
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
backup_id = self._create_backup_db_entry(status='restoring',
def test_restore_backup_with_bad_service(self):
"""Test error handling when attempting a restore of a backup
- with a different service to that used to create the backup
+ with a different service to that used to create the backup.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
self.assertEqual(backup['status'], 'available')
def test_restore_backup(self):
- """Test normal backup restoration"""
+ """Test normal backup restoration."""
vol_size = 1
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=vol_size)
def test_delete_backup_with_bad_backup_status(self):
"""Test error handling when deleting a backup with a backup
- with a bad status
+ with a bad status.
"""
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='available',
def test_delete_backup_with_bad_service(self):
"""Test error handling when attempting a delete of a backup
- with a different service to that used to create the backup
+ with a different service to that used to create the backup.
"""
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='deleting',
self.backup_mgr.delete_backup(self.ctxt, backup_id)
def test_delete_backup(self):
- """Test normal backup deletion"""
+ """Test normal backup deletion."""
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='deleting',
volume_id=vol_id)
def test_backup_get_all_by_project_with_deleted(self):
"""Test deleted backups don't show up in backup_get_all_by_project.
- Unless context.read_deleted is 'yes'
+ Unless context.read_deleted is 'yes'.
"""
backups = db.backup_get_all_by_project(self.ctxt, 'fake')
self.assertEqual(len(backups), 0)
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-""" Tests for Ceph backup service """
+""" Tests for Ceph backup service."""
import fcntl
import hashlib
class BackupCephTestCase(test.TestCase):
- """Test Case for backup to Ceph object store"""
+ """Test Case for backup to Ceph object store."""
def _create_volume_db_entry(self, id, size):
vol = {'id': id, 'size': size, 'status': 'available'}
super(GPFSDriverTestCase, self).tearDown()
def test_create_delete_volume_full_backing_file(self):
- """create and delete vol with full creation method"""
+ """Create and delete vol with full creation method."""
CONF.gpfs_sparse_volumes = False
vol = test_utils.create_volume(self.context, host=CONF.host)
volume_id = vol['id']
self.assertFalse(os.path.exists(path))
def test_create_delete_volume_sparse_backing_file(self):
- """create and delete vol with default sparse creation method"""
+ """Create and delete vol with default sparse creation method."""
CONF.gpfs_sparse_volumes = True
vol = test_utils.create_volume(self.context, host=CONF.host)
volume_id = vol['id']
self.assertEqual(0, len(snapshots.c.volume_id.foreign_keys))
def test_migration_008(self):
- """Test that adding and removing the backups table works correctly"""
+ """Test that adding and removing the backups table works correctly."""
for (key, engine) in self.engines.items():
migration_api.version_control(engine,
TestMigrations.REPOSITORY,
class FakeHttplibSocket(object):
- """A fake socket implementation for httplib.HTTPResponse"""
+ """A fake socket implementation for httplib.HTTPResponse."""
def __init__(self, value):
self._rbuffer = StringIO.StringIO(value)
self._wbuffer = StringIO.StringIO('')
class FakeDirectCMODEServerHandler(FakeHTTPRequestHandler):
- """HTTP handler that fakes enough stuff to allow the driver to run"""
+ """HTTP handler that fakes enough stuff to allow the driver to run."""
def do_GET(s):
"""Respond to a GET request."""
@staticmethod
def _get_child_content(self, name):
- """Get the content of the child"""
+ """Get the content of the child."""
for child in self.iterchildren():
if child.tag == name or etree.QName(child.tag).localname == name:
return child.text
class FakeDirect7MODEServerHandler(FakeHTTPRequestHandler):
- """HTTP handler that fakes enough stuff to allow the driver to run"""
+ """HTTP handler that fakes enough stuff to allow the driver to run."""
def do_GET(s):
"""Respond to a GET request."""
mox.VerifyAll()
def test_get_volume_stats(self):
- """get_volume_stats must fill the correct values"""
+ """get_volume_stats must fill the correct values."""
mox = self._mox
drv = self._driver
class FakeManager(manager.Manager):
- """Fake manager for tests"""
+ """Fake manager for tests."""
def __init__(self, host=None,
db_driver=None, service_name=None):
super(FakeManager, self).__init__(host=host,
class ServiceManagerTestCase(test.TestCase):
- """Test cases for Services"""
+ """Test cases for Services."""
def test_message_gets_to_manager(self):
serv = service.Service('test',
class ServiceTestCase(test.TestCase):
- """Test cases for Services"""
+ """Test cases for Services."""
def setUp(self):
super(ServiceTestCase, self).setUp()
self.volume.delete_volume(self.context, volume['id'])
def test_cannot_force_delete_attached_volume(self):
- """Test volume can't be force delete in attached state"""
+ """Test volume can't be force delete in attached state."""
volume = tests_utils.create_volume(self.context, **self.volume_params)
self.volume.create_volume(self.context, volume['id'])
volume['status'] = 'in-use'
def total_seconds(td):
- """Local total_seconds implementation for compatibility with python 2.6"""
+ """Local total_seconds implementation for compatibility with python 2.6."""
if hasattr(td, 'total_seconds'):
return td.total_seconds()
else:
def walk_class_hierarchy(clazz, encountered=None):
- """Walk class hierarchy, yielding most derived classes first"""
+ """Walk class hierarchy, yielding most derived classes first."""
if not encountered:
encountered = []
for subclass in clazz.__subclasses__():
return None
def do_setup(self, context):
- """Any initialization the volume driver does while starting"""
+ """Any initialization the volume driver does while starting."""
pass
def validate_connector(self, connector):
- """Fail if connector doesn't contain all the data needed by driver"""
+ """Fail if connector doesn't contain all the data needed by driver."""
pass
def _copy_volume_data_cleanup(self, context, volume, properties,
pass
def _get_iscsi_initiator(self):
- """Get iscsi initiator name for this machine"""
+ """Get iscsi initiator name for this machine."""
# NOTE openiscsi stores initiator name in a file that
# needs root permission to read.
contents = utils.read_file_as_root('/etc/iscsi/initiatorname.iscsi')
pass
def extend_volume(self, volume, new_size):
- """Extend the size of the volume"""
+ """Extend the size of the volume."""
try:
self._eql_execute('volume', 'select', volume['name'],
'size', "%sG" % new_size)
lunparam['INITIALDISTRIBUTEPOLICY'] = "2"
def _init_lun_parameters(self, name, parameters):
- """Init basic LUN parameters """
+ """Init basic LUN parameters."""
lunparam = {"TYPE": "11",
"NAME": name,
"PARENTTYPE": "216",
raise NotImplementedError()
def _check_share_in_use(self, conn, dir):
- """Checks if share is cinder mounted and returns it. """
+ """Checks if share is cinder mounted and returns it."""
try:
if conn:
host = conn.split(':')[0]
self._remotefsclient.set_execute(execute)
def do_setup(self, context):
- """Any initialization the volume driver does while starting"""
+ """Any initialization the volume driver does while starting."""
super(NfsDriver, self).do_setup(context)
config = self.configuration.nfs_shares_config
self.cluster_vip = None
def _cliq_run(self, verb, cliq_args, check_exit_code=True):
- """Runs a CLIQ command over SSH, without doing any result parsing"""
+ """Runs a CLIQ command over SSH, without doing any result parsing."""
cmd_list = [verb]
for k, v in cliq_args.items():
cmd_list.append("%s=%s" % (k, v))
return self._run_ssh(cmd_list, check_exit_code)
def _cliq_run_xml(self, verb, cliq_args, check_cliq_result=True):
- """Runs a CLIQ command over SSH, parsing and checking the output"""
+ """Runs a CLIQ command over SSH, parsing and checking the output."""
cliq_args['output'] = 'XML'
(out, _err) = self._cliq_run(verb, cliq_args, check_cliq_result)
return result_xml
def _cliq_get_cluster_info(self, cluster_name):
- """Queries for info about the cluster (including IP)"""
+ """Queries for info about the cluster (including IP)."""
cliq_args = {}
cliq_args['clusterName'] = cluster_name
cliq_args['searchDepth'] = '1'
return result_xml
def _cliq_get_cluster_vip(self, cluster_name):
- """Gets the IP on which a cluster shares iSCSI volumes"""
+ """Gets the IP on which a cluster shares iSCSI volumes."""
cluster_xml = self._cliq_get_cluster_info(cluster_name)
vips = []
raise exception.VolumeBackendAPIException(data=msg)
def _cliq_get_volume_info(self, volume_name):
- """Gets the volume info, including IQN"""
+ """Gets the volume info, including IQN."""
cliq_args = {}
cliq_args['volumeName'] = volume_name
result_xml = self._cliq_run_xml("getVolumeInfo", cliq_args)
return volume_attributes
def _cliq_get_snapshot_info(self, snapshot_name):
- """Gets the snapshot info, including IQN"""
+ """Gets the snapshot info, including IQN."""
cliq_args = {}
cliq_args['snapshotName'] = snapshot_name
result_xml = self._cliq_run_xml("getSnapshotInfo", cliq_args)
self._execute('/usr/sbin/sbdadm', 'delete-lu', luid)
def _collect_lines(self, data):
- """Split lines from data into an array, trimming them """
+ """Split lines from data into an array, trimming them."""
matches = []
for line in data.splitlines():
match = line.strip()
return matches
def _get_prefixed_values(self, data, prefix):
- """Collect lines which start with prefix; with trimming"""
+ """Collect lines which start with prefix; with trimming."""
matches = []
for line in data.splitlines():
line = line.strip()
class SheepdogDriver(driver.VolumeDriver):
- """Executes commands relating to Sheepdog Volumes"""
+ """Executes commands relating to Sheepdog Volumes."""
VERSION = "1.0.0"
self._stats = {}
def check_for_setup_error(self):
- """Returns an error if prerequisites aren't met"""
+ """Return error if prerequisites aren't met."""
try:
#NOTE(francois-charlier) Since 0.24 'collie cluster info -r'
# gives short output, but for compatibility reason we won't
raise NotImplementedError()
def create_volume(self, volume):
- """Creates a sheepdog volume"""
+ """Create a sheepdog volume."""
self._try_execute('qemu-img', 'create',
"sheepdog:%s" % volume['name'],
'%sG' % volume['size'])
def create_volume_from_snapshot(self, volume, snapshot):
- """Creates a sheepdog volume from a snapshot."""
+ """Create a sheepdog volume from a snapshot."""
self._try_execute('qemu-img', 'create', '-b',
"sheepdog:%s:%s" % (snapshot['volume_name'],
snapshot['name']),
"sheepdog:%s" % volume['name'])
def delete_volume(self, volume):
- """Deletes a logical volume"""
+ """Delete a logical volume."""
self._delete(volume)
def _ensure_dir_exists(self, tmp_dir):
self._resize(volume)
def create_snapshot(self, snapshot):
- """Creates a sheepdog snapshot"""
+ """Create a sheepdog snapshot."""
self._try_execute('qemu-img', 'snapshot', '-c', snapshot['name'],
"sheepdog:%s" % snapshot['volume_name'])
def delete_snapshot(self, snapshot):
- """Deletes a sheepdog snapshot"""
+ """Delete a sheepdog snapshot."""
self._try_execute('collie', 'vdi', 'delete', snapshot['volume_name'],
'-s', snapshot['name'])
return "sheepdog:%s" % volume['name']
def ensure_export(self, context, volume):
- """Safely and synchronously recreates an export for a logical volume"""
+ """Safely and synchronously recreate an export for a logical volume."""
pass
def create_export(self, context, volume):
- """Exports the volume"""
+ """Export a volume."""
pass
def remove_export(self, context, volume):
- """Removes an export for a logical volume"""
+ """Remove an export for a logical volume."""
pass
def initialize_connection(self, volume, connector):
class CLIResponse(object):
- '''Parse SVC CLI output and generate iterable'''
+ '''Parse SVC CLI output and generate iterable.'''
def __init__(self, raw, delim='!', with_header=True):
super(CLIResponse, self).__init__()
VMwareHTTPFile.__init__(self, conn)
def read(self, chunk_size):
- """Read a chunk from file"""
+ """Read a chunk from file."""
self._progress += READ_CHUNKSIZE
LOG.debug(_("Read %s bytes from vmdk.") % self._progress)
return self.file_handle.read(READ_CHUNKSIZE)
raise exception.VolumeBackendAPIException(data=err_msg)
def create_volume(self, vhd_path, vol_name, vol_size):
- """Creates a volume"""
+ """Creates a volume."""
try:
cl = self._conn_wmi.__getattr__("WT_Disk")
cl.NewWTDisk(DevicePath=vhd_path,
raise exception.VolumeBackendAPIException(data=err_msg)
def add_disk_to_target(self, vol_name, target_name):
- """Adds the disk to the target"""
+ """Adds the disk to the target."""
try:
q = self._conn_wmi.WT_Disk(Description=vol_name)
wt_disk = q[0]
pass
def check_for_setup_error(self):
- """To override superclass' method"""
+ """To override superclass' method."""
def create_volume_from_snapshot(self, volume, snapshot):
return self._copy_volume(
class ExtractVolumeRefTask(base.CinderTask):
- """Extracts volume reference for given volume id. """
+ """Extracts volume reference for given volume id."""
default_provides = 'volume_ref'
def _copy_image_to_volume(self, context, volume_ref,
image_id, image_location, image_service):
- """Downloads Glance image to the specified volume. """
+ """Downloads Glance image to the specified volume."""
copy_image_to_volume = self.driver.copy_image_to_volume
volume_id = volume_ref['id']
LOG.debug(_("Attempting download of %(image_id)s (%(image_location)s)"
@utils.require_driver_initialized
def attach_volume(self, context, volume_id, instance_uuid, host_name,
mountpoint, mode):
- """Updates db to show volume is attached"""
+ """Updates db to show volume is attached."""
@utils.synchronized(volume_id, external=True)
def do_attach():
# check the volume status before attaching
@utils.require_driver_initialized
def detach_volume(self, context, volume_id):
- """Updates db to show volume is detached"""
+ """Updates db to show volume is detached."""
# TODO(vish): refactor this into a more general "unreserve"
# TODO(sleepsonthefloor): Is this 'elevated' appropriate?
commands = {posargs}
[flake8]
-ignore = E711,E712,F401,F403,F841,H302,H303,H304,H402,H803
+ignore = E711,E712,F401,F403,F841,H302,H303,H304,H803
builtins = _
exclude = .git,.venv,.tox,dist,doc,common,*egg,build