class VolumeTypeExtraSpecsController(wsgi.Controller):
- """ The volume type extra specs API controller for the OpenStack API """
+ """The volume type extra specs API controller for the OpenStack API."""
def _get_extra_specs(self, context, type_id):
extra_specs = db.volume_type_extra_specs_get(context, type_id)
@wsgi.serializers(xml=VolumeTypeExtraSpecsTemplate)
def index(self, req, type_id):
- """ Returns the list of extra specs for a given volume type """
+ """Returns the list of extra specs for a given volume type."""
context = req.environ['cinder.context']
authorize(context)
self._check_type(context, type_id)
raise webob.exc.HTTPNotFound()
def delete(self, req, type_id, id):
- """ Deletes an existing extra spec """
+ """Deletes an existing extra spec."""
context = req.environ['cinder.context']
self._check_type(context, type_id)
authorize(context)
class VolumeTransferController(wsgi.Controller):
- """ The Volume Transfer API controller for the Openstack API."""
+ """The Volume Transfer API controller for the Openstack API."""
_view_builder_class = transfer_view.ViewBuilder
class Controller(object):
- """ The volume metadata API controller for the OpenStack API """
+ """The volume metadata API controller for the OpenStack API."""
def __init__(self):
self.volume_api = volume.API()
@wsgi.serializers(xml=common.MetadataTemplate)
def index(self, req, snapshot_id):
- """ Returns the list of metadata for a given snapshot"""
+ """Returns the list of metadata for a given snapshot."""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, snapshot_id)}
@wsgi.serializers(xml=common.MetaItemTemplate)
def show(self, req, snapshot_id, id):
- """ Return a single metadata item """
+ """Return a single metadata item."""
context = req.environ['cinder.context']
data = self._get_metadata(context, snapshot_id)
raise exc.HTTPNotFound(explanation=msg)
def delete(self, req, snapshot_id, id):
- """ Deletes an existing metadata """
+ """Deletes an existing metadata."""
context = req.environ['cinder.context']
metadata = self._get_metadata(context, snapshot_id)
class Controller(object):
- """ The volume metadata API controller for the OpenStack API """
+ """The volume metadata API controller for the OpenStack API."""
def __init__(self):
self.volume_api = volume.API()
@wsgi.serializers(xml=common.MetadataTemplate)
def index(self, req, volume_id):
- """ Returns the list of metadata for a given volume"""
+ """Returns the list of metadata for a given volume."""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, volume_id)}
@wsgi.serializers(xml=common.MetaItemTemplate)
def show(self, req, volume_id, id):
- """ Return a single metadata item """
+ """Return a single metadata item."""
context = req.environ['cinder.context']
data = self._get_metadata(context, volume_id)
raise exc.HTTPNotFound(explanation=msg)
def delete(self, req, volume_id, id):
- """ Deletes an existing metadata """
+ """Deletes an existing metadata."""
context = req.environ['cinder.context']
metadata = self._get_metadata(context, volume_id)
class Controller(object):
- """ The volume metadata API controller for the OpenStack API """
+ """The volume metadata API controller for the OpenStack API."""
def __init__(self):
self.volume_api = volume.API()
@wsgi.serializers(xml=common.MetadataTemplate)
def index(self, req, snapshot_id):
- """ Returns the list of metadata for a given snapshot"""
+ """Returns the list of metadata for a given snapshot."""
context = req.environ['cinder.context']
return {'metadata': self._get_metadata(context, snapshot_id)}
@wsgi.serializers(xml=common.MetaItemTemplate)
def show(self, req, snapshot_id, id):
- """ Return a single metadata item """
+ """Return a single metadata item."""
context = req.environ['cinder.context']
data = self._get_metadata(context, snapshot_id)
raise exc.HTTPNotFound(explanation=msg)
def delete(self, req, snapshot_id, id):
- """ Deletes an existing metadata """
+ """Deletes an existing metadata."""
context = req.environ['cinder.context']
metadata = self._get_metadata(context, snapshot_id)
"""
def setUp(self):
- """ Run before each test. """
+ """Run before each test."""
super(LimiterTest, self).setUp()
self.tiny = range(1)
self.small = range(10)
self.large = range(10000)
def test_limiter_offset_zero(self):
- """ Test offset key works with 0. """
+ """Test offset key works with 0."""
req = webob.Request.blank('/?offset=0')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_offset_medium(self):
- """ Test offset key works with a medium sized number. """
+ """Test offset key works with a medium sized number."""
req = webob.Request.blank('/?offset=10')
self.assertEqual(common.limited(self.tiny, req), [])
self.assertEqual(common.limited(self.small, req), self.small[10:])
self.assertEqual(common.limited(self.large, req), self.large[10:1010])
def test_limiter_offset_over_max(self):
- """ Test offset key works with a number over 1000 (max_limit). """
+ """Test offset key works with a number over 1000 (max_limit)."""
req = webob.Request.blank('/?offset=1001')
self.assertEqual(common.limited(self.tiny, req), [])
self.assertEqual(common.limited(self.small, req), [])
common.limited(self.large, req), self.large[1001:2001])
def test_limiter_offset_blank(self):
- """ Test offset key works with a blank offset. """
+ """Test offset key works with a blank offset."""
req = webob.Request.blank('/?offset=')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_offset_bad(self):
- """ Test offset key works with a BAD offset. """
+ """Test offset key works with a BAD offset."""
req = webob.Request.blank(u'/?offset=\u0020aa')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_nothing(self):
- """ Test request with no offset or limit """
+ """Test request with no offset or limit."""
req = webob.Request.blank('/')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_zero(self):
- """ Test limit of zero. """
+ """Test limit of zero."""
req = webob.Request.blank('/?limit=0')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_bad(self):
- """ Test with a bad limit. """
+ """Test with a bad limit."""
req = webob.Request.blank(u'/?limit=hello')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_limit_medium(self):
- """ Test limit of 10. """
+ """Test limit of 10."""
req = webob.Request.blank('/?limit=10')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
self.assertEqual(common.limited(self.large, req), self.large[:10])
def test_limiter_limit_over_max(self):
- """ Test limit of 3000. """
+ """Test limit of 3000."""
req = webob.Request.blank('/?limit=3000')
self.assertEqual(common.limited(self.tiny, req), self.tiny)
self.assertEqual(common.limited(self.small, req), self.small)
self.assertEqual(common.limited(self.large, req), self.large[:1000])
def test_limiter_limit_and_offset(self):
- """ Test request with both limit and offset. """
+ """Test request with both limit and offset."""
items = range(2000)
req = webob.Request.blank('/?offset=1&limit=3')
self.assertEqual(common.limited(items, req), items[1:4])
self.assertEqual(common.limited(items, req), [])
def test_limiter_custom_max_limit(self):
- """ Test a max_limit other than 1000. """
+ """Test a max_limit other than 1000."""
items = range(2000)
req = webob.Request.blank('/?offset=1&limit=3')
self.assertEqual(
self.assertEqual(common.limited(items, req, max_limit=2000), [])
def test_limiter_negative_limit(self):
- """ Test a negative limit. """
+ """Test a negative limit."""
req = webob.Request.blank('/?limit=-3000')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
def test_limiter_negative_offset(self):
- """ Test a negative offset. """
+ """Test a negative offset."""
req = webob.Request.blank('/?offset=-30')
self.assertRaises(
webob.exc.HTTPBadRequest, common.limited, self.tiny, req)
"""
def test_nonnumerical_limit(self):
- """ Test nonnumerical limit param. """
+ """Test nonnumerical limit param."""
req = webob.Request.blank('/?limit=hello')
self.assertRaises(
webob.exc.HTTPBadRequest, common.get_pagination_params, req)
def test_no_params(self):
- """ Test no params. """
+ """Test no params."""
req = webob.Request.blank('/')
self.assertEqual(common.get_pagination_params(req), {})
def test_valid_marker(self):
- """ Test valid marker param. """
+ """Test valid marker param."""
req = webob.Request.blank(
'/?marker=263abb28-1de6-412f-b00b-f0ee0c4333c2')
self.assertEqual(common.get_pagination_params(req),
{'marker': '263abb28-1de6-412f-b00b-f0ee0c4333c2'})
def test_valid_limit(self):
- """ Test valid limit param. """
+ """Test valid limit param."""
req = webob.Request.blank('/?limit=10')
self.assertEqual(common.get_pagination_params(req), {'limit': 10})
def test_invalid_limit(self):
- """ Test invalid limit param. """
+ """Test invalid limit param."""
req = webob.Request.blank('/?limit=-2')
self.assertRaises(
webob.exc.HTTPBadRequest, common.get_pagination_params, req)
def test_valid_limit_and_marker(self):
- """ Test valid limit and marker parameters. """
+ """Test valid limit and marker parameters."""
marker = '263abb28-1de6-412f-b00b-f0ee0c4333c2'
req = webob.Request.blank('/?limit=20&marker=%s' % marker)
self.assertEqual(common.get_pagination_params(req),
return 1
def _attach_volume(self):
- """Attach volumes to an instance. """
+ """Attach volumes to an instance."""
return []
def _detach_volume(self, volume_id_list):
driver_name = "cinder.volume.drivers.lvm.LVMISCSIDriver"
def _attach_volume(self):
- """Attach volumes to an instance. """
+ """Attach volumes to an instance."""
volume_id_list = []
for index in xrange(3):
vol = {}
def is_valid_boolstr(val):
- """Check if the provided string is a valid bool string or not. """
+ """Check if the provided string is a valid bool string or not."""
val = str(val).lower()
return (val == 'true' or val == 'false' or
val == 'yes' or val == 'no' or
def monkey_patch():
- """ If the CONF.monkey_patch set as True,
+ """If the CONF.monkey_patch set as True,
this function patches a decorator
for all functions in specified modules.
+
You can set decorators for each modules
using CONF.monkey_patch_modules.
The format is "Module path:Decorator function".
raise NotImplementedError()
def attach_volume(self, context, volume_id, instance_uuid, mountpoint):
- """ Callback for volume attached to instance."""
+ """Callback for volume attached to instance."""
pass
def detach_volume(self, context, volume_id):
- """ Callback for volume detached."""
+ """Callback for volume detached."""
pass
def get_volume_stats(self, refresh=False):
return capacity
def _cli_run(self, verb, cli_args):
- """ Runs a CLI command over SSH, without doing any result parsing. """
+ """Runs a CLI command over SSH, without doing any result parsing."""
cli_arg_strings = []
if cli_args:
for k, v in cli_args.items():
return showhost.split(',')[1]
def terminate_connection(self, volume, hostname, wwn_iqn):
- """ Driver entry point to unattach a volume from an instance."""
+ """Driver entry point to unattach a volume from an instance."""
try:
# does 3par know this host by a different name?
if hostname in self.hosts_naming_dict:
@utils.synchronized('3par', external=True)
def create_cloned_volume(self, volume, src_vref):
- """ Clone an existing volume. """
+ """Clone an existing volume."""
self.common.client_login()
new_vol = self.common.create_cloned_volume(volume, src_vref)
self.common.client_logout()
pass
def attach_volume(self, context, volume_id, instance_uuid, mountpoint):
- """ Callback for volume attached to instance."""
+ """Callback for volume attached to instance."""
pass
def detach_volume(self, context, volume_id):
- """ Callback for volume detached."""
+ """Callback for volume detached."""
pass
def get_volume_stats(self, refresh=False):
self.driver.ensure_export(context, volume_ref)
def _copy_image_to_volume(self, context, volume, image_service, image_id):
- """Downloads Glance image to the specified volume. """
+ """Downloads Glance image to the specified volume."""
volume_id = volume['id']
try:
self.driver.copy_image_to_volume(context, volume,
self.update_service_capabilities(volume_stats)
def publish_service_capabilities(self, context):
- """ Collect driver status and then publish """
+ """Collect driver status and then publish."""
self._report_driver_status(context)
self._publish_service_capabilities(context)
def notify_usage_exists(context, volume_ref, current_period=False):
- """ Generates 'exists' notification for a volume for usage auditing
- purposes.
+ """Generates 'exists' notification for a volume for usage auditing
+ purposes.
- Generates usage for last completed period, unless 'current_period'
- is True.
+ Generates usage for last completed period, unless 'current_period'
+ is True.
"""
begin, end = utils.last_completed_audit_period()
if current_period:
commands = {posargs}
[flake8]
-ignore = E711,E712,F401,F403,F811,F841,H302,H303,H304,H401,H402,H404
+ignore = E711,E712,F401,F403,F811,F841,H302,H303,H304,H402,H404
builtins = _
exclude = .venv,.tox,dist,doc,openstack,*egg,build