def bpython(self):
"""Runs a bpython shell.
- Falls back to Ipython/python shell if unavailable"""
+ Falls back to IPython/Python shell if unavailable.
+ """
self.run('bpython')
def ipython(self):
"""Runs an Ipython shell.
- Falls back to Python shell if unavailable"""
+ Falls back to Python shell if unavailable.
+ """
self.run('ipython')
def python(self):
"""Runs a python shell.
- Falls back to Python shell if unavailable"""
+ Falls back to Python shell if unavailable.
+ """
self.run('python')
@args('--shell', dest="shell",
@args('--path', required=True, help='Script path')
def script(self, path):
"""Runs the script from the specifed path with flags set properly.
- arguments: path"""
+ arguments: path
+ """
exec(compile(open(path).read(), path, 'exec'), locals(), globals())
help='Availability Zone (default: %(default)s)')
def list(self, zone=None):
"""Show a list of all physical hosts. Filter by zone.
- args: [zone]"""
+ args: [zone]
+ """
print "%-25s\t%-15s" % (_('host'),
_('zone'))
ctxt = context.get_admin_context()
help='Volume ID to be deleted')
def delete(self, volume_id):
"""Delete a volume, bypassing the check that it
- must be available."""
+ must be available.
+ """
ctxt = context.get_admin_context()
volume = db.volume_get(ctxt, param2id(volume_id))
host = volume['host']
def reattach(self, volume_id):
"""Re-attach a volume that has previously been attached
to an instance. Typically called after a compute host
- has been rebooted."""
+ has been rebooted.
+ """
ctxt = context.get_admin_context()
volume = db.volume_get(ctxt, param2id(volume_id))
if not volume['instance_id']:
def list(self):
"""List all backups (including ones in progress) and the host
- on which the backup operation is running."""
+ on which the backup operation is running.
+ """
ctxt = context.get_admin_context()
backups = db.backup_get_all(ctxt)
def methods_of(obj):
"""Get all callable methods of an object that don't start with underscore
- returns a list of tuples of the form (method_name, method)"""
+ Returns a list of tuples of the form (method_name, method).
+ """
result = []
for i in dir(obj):
if callable(getattr(obj, i)) and not i.startswith('_'):
def init_host(self):
"""Do any initialization that needs to be run if this is a
- standalone service."""
+ standalone service.
+ """
ctxt = context.get_admin_context()
self.driver.do_setup(ctxt)
def snapshot_get_active_by_window(context, begin, end=None, project_id=None):
"""Get all the snapshots inside the window.
- Specifying a project_id will filter for a certain project."""
+ Specifying a project_id will filter for a certain project.
+ """
return IMPL.snapshot_get_active_by_window(context, begin, end, project_id)
def volume_get_active_by_window(context, begin, end=None, project_id=None):
"""Get all the volumes inside the window.
- Specifying a project_id will filter for a certain project."""
+ Specifying a project_id will filter for a certain project.
+ """
return IMPL.volume_get_active_by_window(context, begin, end, project_id)
volume_type_id,
extra_specs):
"""Create or update volume type extra specs. This adds or modifies the
- key/value pairs specified in the extra specs dict argument"""
+ key/value pairs specified in the extra specs dict argument.
+ """
IMPL.volume_type_extra_specs_update_or_create(context,
volume_type_id,
extra_specs)
A service takes a manager and enables rpc by listening to queues based
on topic. It also periodically runs tasks on the manager and reports
- it state to the database services table."""
+ its state to the database services table.
+ """
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
"""
class HTTPConnectionDecorator(object):
"""Wraps the real HTTPConnection class so that when you instantiate
- the class you might instead get a fake instance."""
+ the class you might instead get a fake instance.
+ """
def __init__(self, wrapped):
self.wrapped = wrapped
"""
class HTTPConnectionDecorator(object):
"""Wraps the real HTTPConnection class so that when you instantiate
- the class you might instead get a fake instance."""
+ the class you might instead get a fake instance.
+ """
def __init__(self, wrapped):
self.wrapped = wrapped
'Test requires Cinder installed (try setup.py develop')
def test_create_volume_non_admin(self):
"""Test creating an instance locally using run_instance, passing
- a non-admin context. DB actions should work."""
+ a non-admin context. DB actions should work.
+ """
self.was_admin = False
def fake_get(context, *args, **kwargs):
'Test requires Cinder installed (try setup.py develop')
def test_schedule_happy_day(self):
"""Make sure there's nothing glaringly wrong with _schedule()
- by doing a happy day pass through."""
+ by doing a happy day pass through.
+ """
self.next_weight = 1.0
class SchedulerDriverBaseTestCase(SchedulerTestCase):
"""Test cases for base scheduler driver class methods
- that can't will fail if the driver is changed"""
+ that will fail if the driver is changed.
+ """
def test_unimplemented_schedule(self):
fake_args = (1, 2, 3)
def test_init_host(self):
"""Make sure stuck volumes and backups are reset to correct
- states when backup_manager.init_host() is called"""
+ states when backup_manager.init_host() is called.
+ """
vol1_id = self._create_volume_db_entry(status='backing-up')
vol2_id = self._create_volume_db_entry(status='restoring-backup')
backup1_id = self._create_backup_db_entry(status='creating')
def test_create_backup_with_bad_volume_status(self):
"""Test error handling when creating a backup from a volume
- with a bad status"""
+ with a bad status.
+ """
vol_id = self._create_volume_db_entry(status='available', size=1)
backup_id = self._create_backup_db_entry(volume_id=vol_id)
self.assertRaises(exception.InvalidVolume,
def test_create_backup_with_bad_backup_status(self):
"""Test error handling when creating a backup with a backup
- with a bad status"""
+ with a bad status.
+ """
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='available',
volume_id=vol_id)
def test_restore_backup_with_bad_volume_status(self):
"""Test error handling when restoring a backup to a volume
- with a bad status"""
+ with a bad status.
+ """
vol_id = self._create_volume_db_entry(status='available', size=1)
backup_id = self._create_backup_db_entry(volume_id=vol_id)
self.assertRaises(exception.InvalidVolume,
def test_restore_backup_with_bad_backup_status(self):
"""Test error handling when restoring a backup with a backup
- with a bad status"""
+ with a bad status.
+ """
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
backup_id = self._create_backup_db_entry(status='available',
def test_restore_backup_with_bad_service(self):
"""Test error handling when attempting a restore of a backup
- with a different service to that used to create the backup"""
+ with a different service from the one used to create the backup.
+ """
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
backup_id = self._create_backup_db_entry(status='restoring',
def test_delete_backup_with_bad_backup_status(self):
"""Test error handling when deleting a backup with a backup
- with a bad status"""
+ with a bad status.
+ """
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='available',
volume_id=vol_id)
def test_delete_backup_with_bad_service(self):
"""Test error handling when attempting a delete of a backup
- with a different service to that used to create the backup"""
+ with a different service from the one used to create the backup.
+ """
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='deleting',
volume_id=vol_id)
def test_delete_backup_with_no_service(self):
"""Test error handling when attempting a delete of a backup
- with no service defined for that backup, relates to bug #1162908"""
+ with no service defined for that backup; relates to bug #1162908.
+ """
vol_id = self._create_volume_db_entry(size=1)
backup_id = self._create_backup_db_entry(status='deleting',
volume_id=vol_id)
def test_backup_get_all_by_project_with_deleted(self):
"""Test deleted backups don't show up in backup_get_all_by_project.
- Unless context.read_deleted is 'yes'"""
+ Unless context.read_deleted is 'yes'.
+ """
backups = db.backup_get_all_by_project(self.ctxt, 'fake')
self.assertEqual(len(backups), 0)
def test_backup_get_all_by_host_with_deleted(self):
"""Test deleted backups don't show up in backup_get_all_by_project.
- Unless context.read_deleted is 'yes'"""
+ Unless context.read_deleted is 'yes'.
+ """
backups = db.backup_get_all_by_host(self.ctxt, 'testhost')
self.assertEqual(len(backups), 0)
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
- as models will be far out of sync with the current data."""
+ as models will be far out of sync with the current data.
+ """
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
return sqlalchemy.Table(name, metadata, autoload=True)
def _clone_volume_from_image(self, expected_status,
clone_works=True):
"""Try to clone a volume from an image, and check the status
- afterwards"""
+ afterwards.
+ """
def fake_clone_image(volume, image_location):
return True
def _create_volume_from_image(self, expected_status,
fakeout_copy_image_to_volume=False):
"""Call copy image to volume, Test the status of volume after calling
- copying image to volume."""
+ copy_image_to_volume.
+ """
def fake_local_path(volume):
return dst_path
def test_create_volume_from_image_status_available(self):
"""Verify that before copying image to volume, it is in available
- state."""
+ state.
+ """
self._create_volume_from_image('available')
def test_create_volume_from_image_exception(self):
"""Verify that create volume from image, the volume status is
- 'downloading'."""
+ 'downloading'.
+ """
dst_fd, dst_path = tempfile.mkstemp()
os.close(dst_fd)
def test_create_volume_from_exact_sized_image(self):
"""Verify that an image which is exactly the same size as the
- volume, will work correctly."""
+ volume will work correctly.
+ """
class _FakeImageService:
def __init__(self, db_driver=None, image_service=None):
pass
def test_default_volume_type_missing_in_db(self):
"""Ensures proper exception raised if default volume type
- is not in database."""
+ is not in database.
+ """
session = db_api.get_session()
default_vol_type = volume_types.get_default_volume_type()
self.assertEqual(default_vol_type, {})
returns: 2 tuple of datetimes (begin, end)
The begin timestamp of this audit period is the same as the
- end of the previous."""
+ end of the previous.
+ """
if not unit:
unit = CONF.volume_usage_audit_period
def set_host_maintenance(self, context, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
- volume evacuation."""
+ volume evacuation.
+ """
raise NotImplementedError()
def __init__(self, volume_opts, config_group=None):
"""This takes care of grafting the implementation's config
- values into the config group"""
+ values into the config group.
+ """
self.config_group = config_group
# set the local conf so that __call__'s know what to use
def create_volume(self, volume):
"""Creates a volume. Can optionally return a Dictionary of
- changes to the volume object to be persisted."""
+ changes to the volume object to be persisted.
+ """
raise NotImplementedError()
def create_volume_from_snapshot(self, volume, snapshot):
def create_export(self, context, volume):
"""Exports the volume. Can optionally return a Dictionary of changes
- to the volume object to be persisted."""
+ to the volume object to be persisted.
+ """
raise NotImplementedError()
def remove_export(self, context, volume):
def get_volume_stats(self, refresh=False):
"""Return the current state of the volume service. If 'refresh' is
- True, run the update first."""
+ True, run the update first.
+ """
return None
def do_setup(self, context):
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh:
self._update_volume_status()
class GlusterfsDriver(nfs.RemoteFsDriver):
"""Gluster based cinder driver. Creates file on Gluster share for using it
- as block device on hypervisor."""
+ as a block device on the hypervisor.
+ """
def __init__(self, *args, **kwargs):
super(GlusterfsDriver, self).__init__(*args, **kwargs)
def create_export(self, ctx, volume):
"""Exports the volume. Can optionally return a Dictionary of changes
- to the volume object to be persisted."""
+ to the volume object to be persisted.
+ """
pass
def remove_export(self, ctx, volume):
def _ensure_shares_mounted(self):
"""Look for GlusterFS shares in the flags and try to mount them
- locally."""
+ locally.
+ """
self._mounted_shares = []
self._load_shares_config(self.configuration.glusterfs_shares_config)
def get_volume_stats(self, refresh=False):
"""Get volume stats.
- If 'refresh' is True, update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh or not self._stats:
self._update_volume_stats()
def create_volume(self, volume):
"""Creates a logical volume. Can optionally return a Dictionary of
- changes to the volume object to be persisted."""
+ changes to the volume object to be persisted.
+ """
self._create_volume(volume['name'], self._sizestr(volume['size']))
def create_volume_from_snapshot(self, volume, snapshot):
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh:
self._update_volume_status()
def create_volume(self, volume):
"""Creates a logical volume. Can optionally return a Dictionary of
- changes to the volume object to be persisted."""
+ changes to the volume object to be persisted.
+ """
sizestr = self._sizestr(volume['size'])
vg_name = ("%s/%s-pool" % (self.configuration.volume_group,
self.configuration.volume_group))
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh:
self._update_volume_status()
def add_new_child(self, name, content, convert=False):
"""Add child with tag name and context.
- Convert replaces entity refs to chars."""
+ Convert replaces entity refs to chars.
+ """
child = NaElement(name)
if convert:
content = NaElement._convert_entity_refs(content)
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh:
self._update_volume_status()
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh:
self._update_volume_status()
def _check_dfm_flags(self):
"""Raises error if any required configuration flag for OnCommand proxy
- is missing."""
+ is missing.
+ """
required_flags = ['netapp_wsdl_url',
'netapp_login',
'netapp_password',
def _check_flags(self):
"""Raises error if any required configuration flag for NetApp Cloud
- Webservices is missing."""
+ Webservices is missing.
+ """
required_flags = ['netapp_wsdl_url',
'netapp_login',
'netapp_password',
def _check_flags(self):
"""Raises error if any required configuration flag for NetApp
- filer is missing."""
+ filer is missing.
+ """
required_flags = ['netapp_login',
'netapp_password',
'netapp_server_hostname',
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh:
self._update_volume_status()
def delete_snapshot(self, snapshot):
"""Do nothing for this driver, but allow manager to handle deletion
- of snapshot in error state."""
+ of snapshot in error state.
+ """
pass
def ensure_export(self, ctx, volume):
def _create_regular_file(self, path, size):
"""Creates regular file of given size. Takes a lot of time for large
- files."""
+ files.
+ """
block_size_mb = 1
block_count = size * units.GiB / (block_size_mb * units.MiB)
def _get_hash_str(self, base_str):
"""returns string that represents hash of base_str
- (in a hex format)."""
+ (in a hex format).
+ """
return hashlib.md5(base_str).hexdigest()
def copy_image_to_volume(self, context, volume, image_service, image_id):
class NfsDriver(RemoteFsDriver):
"""NFS based cinder driver. Creates file on NFS share for using it
- as block device on hypervisor."""
+ as a block device on the hypervisor.
+ """
def __init__(self, *args, **kwargs):
super(NfsDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(volume_opts)
def create_export(self, ctx, volume):
"""Exports the volume. Can optionally return a Dictionary of changes
- to the volume object to be persisted."""
+ to the volume object to be persisted.
+ """
pass
def remove_export(self, ctx, volume):
def get_volume_stats(self, refresh=False):
"""Get volume status.
- If 'refresh' is True, run update the stats first."""
+ If 'refresh' is True, update the stats first.
+ """
if refresh or not self._stats:
self._update_volume_status()
def get_volume_stats(self, refresh=False):
"""Return the current state of the volume service. If 'refresh' is
- True, run the update first."""
+ True, run the update first.
+ """
if refresh:
self._update_volume_stats()
return self._stats
"""Get volume status.
If we haven't gotten stats yet or 'refresh' is True,
- run update the stats first."""
+ update the stats first.
+ """
if not self._stats or refresh:
self._update_volume_status()
def init_host(self):
"""Do any initialization that needs to be run if this is a
- standalone service."""
+ standalone service.
+ """
ctxt = context.get_admin_context()
self.driver.do_setup(ctxt)
purposes.
Generates usage for last completed period, unless 'current_period'
- is True."""
+ is True.
+ """
begin, end = utils.last_completed_audit_period()
if current_period:
audit_start = end
commands = {posargs}
[flake8]
-ignore = E711,E712,F401,F403,F811,F841,H302,H303,H304,H401,H402,H403,H404
+ignore = E711,E712,F401,F403,F811,F841,H302,H303,H304,H401,H402,H404
builtins = _
exclude = .venv,.tox,dist,doc,openstack,*egg
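For context: the [flake8] hunk above removes H403 from the ignore list. H403 is the hacking check that requires a multi-line docstring to end with the closing quotes on their own line, which is what every docstring hunk in this patch fixes. A minimal illustrative sketch of the two forms (not part of the patch; the get_volume_stats signature is borrowed from the hunks above):

    # Flagged by H403: the closing quotes share a line with the last text line.
    def get_volume_stats(self, refresh=False):
        """Get volume status.
        If 'refresh' is True, update the stats first."""

    # Compliant: the closing quotes end the docstring on their own line.
    def get_volume_stats(self, refresh=False):
        """Get volume status.
        If 'refresh' is True, update the stats first.
        """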