@wsgi.action('os-force_detach')
def _force_detach(self, req, id, body):
- """
- Roll back a bad detach after the volume been disconnected from
- the hypervisor.
- """
+ """Roll back a bad detach after the volume has been disconnected."""
context = req.environ['cinder.context']
self.authorize(context, 'force_detach')
try:
class ServiceController(object):
@wsgi.serializers(xml=ServicesIndexTemplate)
def index(self, req):
- """
- Return a list of all running services. Filter by host & service name.
+ """Return a list of all running services.
+
+ Filter by host & service name.
"""
context = req.environ['cinder.context']
authorize(context)
class LimitingReader(object):
"""Reader to limit the size of an incoming request."""
def __init__(self, data, limit):
- """
+ """Initialize LimitingReader.
+
:param data: Underlying data object
:param limit: maximum number of bytes the reader should allow
"""
class APIRouter(base_wsgi.Router):
- """
- Routes requests on the OpenStack API to the appropriate controller
- and method.
- """
+ """Routes requests on the API to the appropriate controller and method."""
ExtensionManager = None # override in subclasses
@classmethod
class XMLDeserializer(TextDeserializer):
def __init__(self, metadata=None):
- """
+ """Initialize XMLDeserializer.
+
:param metadata: information needed to deserialize xml into
a dictionary.
"""
class XMLDictSerializer(DictSerializer):
def __init__(self, metadata=None, xmlns=None):
- """
+ """Initialize XMLDictSerializer.
+
:param metadata: information needed to deserialize xml into
a dictionary.
:param xmlns: XML namespace to include with serialized xml
"""
def __init__(self, controller, action_peek=None, **deserializers):
- """
+ """Initialize Resource.
+
:param controller: object that implement methods created by routes lib
:param action_peek: dictionary of routines for peeking into an action
request body to determine the desired action
class OverLimitFault(webob.exc.HTTPException):
- """
- Rate-limited request response.
- """
+ """Rate-limited request response."""
def __init__(self, message, details, retry_time):
- """
- Initialize new `OverLimitFault` with relevant information.
- """
+ """Initialize new `OverLimitFault` with relevant information."""
hdrs = OverLimitFault._retry_after(retry_time)
self.wrapped_exc = webob.exc.HTTPRequestEntityTooLarge(headers=hdrs)
self.content = {
class LimitsController(object):
- """
- Controller for accessing limits in the OpenStack API.
- """
+ """Controller for accessing limits in the OpenStack API."""
@wsgi.serializers(xml=LimitsTemplate)
def index(self, req):
- """
- Return all global and rate limit information.
- """
+ """Return all global and rate limit information."""
context = req.environ['cinder.context']
quotas = QUOTAS.get_project_quotas(context, context.project_id,
usages=False)
class Limit(object):
- """
- Stores information about a limit for HTTP requests.
- """
+ """Stores information about a limit for HTTP requests."""
UNITS = {
1: "SECOND",
UNIT_MAP = dict([(v, k) for k, v in UNITS.items()])
def __init__(self, verb, uri, regex, value, unit):
- """
- Initialize a new `Limit`.
+ """Initialize a new `Limit`.
@param verb: HTTP verb (POST, PUT, etc.)
@param uri: Human-readable URI
self.error_message = msg % self.__dict__
def __call__(self, verb, url):
- """
- Represents a call to this limit from a relevant request.
+ """Represent a call to this limit from a relevant request.
@param verb: string http verb (POST, GET, etc.)
@param url: string URL
class RateLimitingMiddleware(base_wsgi.Middleware):
- """
- Rate-limits requests passing through this middleware. All limit information
- is stored in memory for this implementation.
+ """Rate-limits requests passing through this middleware.
+
+ All limit information is stored in memory for this implementation.
"""
def __init__(self, application, limits=None, limiter=None, **kwargs):
- """
- Initialize new `RateLimitingMiddleware`, which wraps the given WSGI
- application and sets up the given limits.
+ """Initialize new `RateLimitingMiddleware`.
+
+ This wraps the given WSGI application and sets up the given limits.
@param application: WSGI application to wrap
@param limits: String describing limits
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
- """
- Represents a single call through this middleware. We should record the
- request if we have a limit relevant to it. If no limit is relevant to
- the request, ignore it.
+ """Represent a single call through this middleware.
+
+ We should record the request if we have a limit relevant to it.
+ If no limit is relevant to the request, ignore it.
If the request should be rate limited, return a fault telling the user
they are over the limit and need to retry later.
class Limiter(object):
- """
- Rate-limit checking class which handles limits in memory.
- """
+ """Rate-limit checking class which handles limits in memory."""
def __init__(self, limits, **kwargs):
- """
- Initialize the new `Limiter`.
+ """Initialize the new `Limiter`.
@param limits: List of `Limit` objects
"""
self.levels[username] = self.parse_limits(value)
def get_limits(self, username=None):
- """
- Return the limits for a given user.
- """
+ """Return the limits for a given user."""
return [limit.display() for limit in self.levels[username]]
def check_for_delay(self, verb, url, username=None):
- """
- Check the given verb/user/user triplet for limit.
+ """Check the given verb/user/user triplet for limit.
@return: Tuple of delay (in seconds) and error message (or None, None)
"""
# default limit parsing.
@staticmethod
def parse_limits(limits):
- """
- Convert a string into a list of Limit instances. This
- implementation expects a semicolon-separated sequence of
+ """Convert a string into a list of Limit instances.
+
+ This implementation expects a semicolon-separated sequence of
parenthesized groups, where each group contains a
comma-separated sequence consisting of HTTP method,
user-readable URI, a URI reg-exp, an integer number of
class WsgiLimiter(object):
- """
- Rate-limit checking from a WSGI application. Uses an in-memory `Limiter`.
+ """Rate-limit checking from a WSGI application.
+
+ Uses an in-memory `Limiter`.
To use, POST ``/<username>`` with JSON data such as::
"""
def __init__(self, limits=None):
- """
- Initialize the new `WsgiLimiter`.
+ """Initialize the new `WsgiLimiter`.
@param limits: List of `Limit` objects
"""
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, request):
- """
- Handles a call to this application. Returns 204 if the request is
- acceptable to the limiter, else a 403 is returned with a relevant
- header indicating when the request *will* succeed.
+ """Handles a call to this application.
+
+ Returns 204 if the request is acceptable to the limiter, else a 403
+ is returned with a relevant header indicating when the request
+ *will* succeed.
"""
if request.method != "POST":
raise webob.exc.HTTPMethodNotAllowed()
class WsgiLimiterProxy(object):
- """
- Rate-limit requests based on answers from a remote source.
- """
+ """Rate-limit requests based on answers from a remote source."""
def __init__(self, limiter_address):
- """
- Initialize the new `WsgiLimiterProxy`.
+ """Initialize the new `WsgiLimiterProxy`.
@param limiter_address: IP/port combination of where to request limit
"""
# decisions are made by a remote server.
@staticmethod
def parse_limits(limits):
- """
- Ignore a limits string--simply doesn't apply for the limit
- proxy.
+ """Ignore a limits string--simply doesn't apply for the limit proxy.
@return: Empty list.
"""
class APIRouter(cinder.api.openstack.APIRouter):
- """
- Routes requests on the OpenStack API to the appropriate controller
- and method.
- """
+ """Routes requests on the API to the appropriate controller and method."""
ExtensionManager = extensions.ExtensionManager
def _setup_routes(self, mapper, ext_mgr):
class LimitsController(object):
- """
- Controller for accessing limits in the OpenStack API.
- """
+ """Controller for accessing limits in the OpenStack API."""
@wsgi.serializers(xml=LimitsTemplate)
def index(self, req):
- """
- Return all global and rate limit information.
- """
+ """Return all global and rate limit information."""
context = req.environ['cinder.context']
quotas = QUOTAS.get_project_quotas(context, context.project_id,
usages=False)
class Limit(object):
- """
- Stores information about a limit for HTTP requests.
- """
+ """Stores information about a limit for HTTP requests."""
UNITS = {
1: "SECOND",
UNIT_MAP = dict([(v, k) for k, v in UNITS.items()])
def __init__(self, verb, uri, regex, value, unit):
- """
- Initialize a new `Limit`.
+ """Initialize a new `Limit`.
@param verb: HTTP verb (POST, PUT, etc.)
@param uri: Human-readable URI
self.error_message = msg % self.__dict__
def __call__(self, verb, url):
- """
- Represents a call to this limit from a relevant request.
+ """Represent a call to this limit from a relevant request.
@param verb: string http verb (POST, GET, etc.)
@param url: string URL
class RateLimitingMiddleware(base_wsgi.Middleware):
- """
- Rate-limits requests passing through this middleware. All limit information
- is stored in memory for this implementation.
+ """Rate-limits requests passing through this middleware.
+
+ All limit information is stored in memory for this implementation.
"""
def __init__(self, application, limits=None, limiter=None, **kwargs):
- """
- Initialize new `RateLimitingMiddleware`, which wraps the given WSGI
+ """Initialize new `RateLimitingMiddleware`, which wraps the given WSGI
application and sets up the given limits.
@param application: WSGI application to wrap
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
- """
- Represents a single call through this middleware. We should record the
- request if we have a limit relevant to it. If no limit is relevant to
- the request, ignore it.
+ """Represents a single call through this middleware.
- If the request should be rate limited, return a fault telling the user
- they are over the limit and need to retry later.
+ We should record the request if we have a limit relevant to it.
+ If no limit is relevant to the request, ignore it. If the request
+ should be rate limited, return a fault telling the user they are
+ over the limit and need to retry later.
"""
verb = req.method
url = req.url
class Limiter(object):
- """
- Rate-limit checking class which handles limits in memory.
- """
+ """Rate-limit checking class which handles limits in memory."""
def __init__(self, limits, **kwargs):
- """
- Initialize the new `Limiter`.
+ """Initialize the new `Limiter`.
@param limits: List of `Limit` objects
"""
self.levels[username] = self.parse_limits(value)
def get_limits(self, username=None):
- """
- Return the limits for a given user.
- """
+ """Return the limits for a given user."""
return [limit.display() for limit in self.levels[username]]
def check_for_delay(self, verb, url, username=None):
- """
- Check the given verb/user/user triplet for limit.
+ """Check the given verb/user/user triplet for limit.
@return: Tuple of delay (in seconds) and error message (or None, None)
"""
# default limit parsing.
@staticmethod
def parse_limits(limits):
- """
- Convert a string into a list of Limit instances. This
- implementation expects a semicolon-separated sequence of
+ """Convert a string into a list of Limit instances.
+
+ This implementation expects a semicolon-separated sequence of
parenthesized groups, where each group contains a
comma-separated sequence consisting of HTTP method,
user-readable URI, a URI reg-exp, an integer number of
class WsgiLimiter(object):
- """
- Rate-limit checking from a WSGI application. Uses an in-memory `Limiter`.
+ """Rate-limit checking from a WSGI application.
+
+ Uses an in-memory `Limiter`.
To use, POST ``/<username>`` with JSON data such as::
"""
def __init__(self, limits=None):
- """
- Initialize the new `WsgiLimiter`.
+ """Initialize the new `WsgiLimiter`.
@param limits: List of `Limit` objects
"""
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, request):
- """
- Handles a call to this application. Returns 204 if the request is
- acceptable to the limiter, else a 403 is returned with a relevant
- header indicating when the request *will* succeed.
+ """Handles a call to this application.
+
+ Returns 204 if the request is acceptable to the limiter, else a 403
+ is returned with a relevant header indicating when the request
+ *will* succeed.
"""
if request.method != "POST":
raise webob.exc.HTTPMethodNotAllowed()
class WsgiLimiterProxy(object):
- """
- Rate-limit requests based on answers from a remote source.
- """
+ """Rate-limit requests based on answers from a remote source."""
def __init__(self, limiter_address):
- """
- Initialize the new `WsgiLimiterProxy`.
+ """Initialize the new `WsgiLimiterProxy`.
@param limiter_address: IP/port combination of where to request limit
"""
# decisions are made by a remote server.
@staticmethod
def parse_limits(limits):
- """
- Ignore a limits string--simply doesn't apply for the limit
- proxy.
+ """Ignore a limits string--simply doesn't apply for the limit proxy.
@return: Empty list.
"""
class APIRouter(cinder.api.openstack.APIRouter):
- """
- Routes requests on the OpenStack API to the appropriate controller
- and method.
- """
+ """Routes requests on the API to the appropriate controller and method."""
ExtensionManager = extensions.ExtensionManager
def _setup_routes(self, mapper, ext_mgr):
class ViewBuilder(object):
def __init__(self, base_url):
- """
+ """Initialize ViewBuilder.
+
:param base_url: url of the root wsgi application
"""
self.base_url = base_url
def make_links(parent, selector=None):
- """
- Attach an Atom <links> element to the parent.
- """
+ """Attach an Atom <links> element to the parent."""
elem = SubTemplateElement(parent, '{%s}link' % XMLNS_ATOM,
selector=selector)
def make_flat_dict(name, selector=None, subselector=None, ns=None):
- """
- Utility for simple XML templates that traditionally used
- XMLDictSerializer with no metadata. Returns a template element
- where the top-level element has the given tag name, and where
- sub-elements have tag names derived from the object's keys and
- text derived from the object's values. This only works for flat
- dictionary objects, not dictionaries containing nested lists or
- dictionaries.
+ """Utility for simple XML templates.
+
+ Simple templates are templates that traditionally used
+ XMLDictSerializer with no metadata.
+
+ Returns a template element where the top-level element has the
+ given tag name, and where sub-elements have tag names derived
+ from the object's keys and text derived from the object's values.
+
+ This only works for flat dictionary objects, not dictionaries
+ containing nested lists or dictionaries.
"""
# Set up the names we need...
return backups
def _is_backup_service_enabled(self, volume, volume_host):
- """Check if there is an backup service available"""
+ """Check if there is a backup service available."""
topic = CONF.backup_topic
ctxt = context.get_admin_context()
services = self.db.service_get_all_by_topic(ctxt, topic)
return hashlib.md5(base_str).hexdigest()
def get_mount_point(self, device_name):
- """
+ """Get Mount Point.
+
:param device_name: example 172.18.194.100:/var/nfs
"""
return os.path.join(self._mount_base,
timestamp=None, request_id=None, auth_token=None,
overwrite=True, quota_class=None, service_catalog=None,
**kwargs):
- """
+ """Initialize RequestContext.
+
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
indicates deleted records are visible, 'only' indicates that
*only* deleted records are visible.
def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id):
- """
- Update the Glance metadata for a snapshot by copying all of the key:value
- pairs from the originating volume. This is so that a volume created from
- the snapshot will retain the original metadata.
+ """Update the Glance metadata for a snapshot.
+
+ This will copy all of the key:value pairs from the originating volume,
+ to ensure that a volume created from the snapshot will retain the
+ original metadata.
"""
return IMPL.volume_glance_metadata_copy_to_snapshot(context, snapshot_id,
volume_id)
def volume_glance_metadata_copy_to_volume(context, volume_id, snapshot_id):
- """
- Update the Glance metadata from a volume (created from a snapshot) by
- copying all of the key:value pairs from the originating snapshot. This is
- so that the Glance metadata from the original volume is retained.
+ """Update the Glance metadata from a volume (created from a snapshot).
+
+ This will copy all of the key:value pairs from the originating snapshot,
+ to ensure that the Glance metadata from the original volume is retained.
"""
return IMPL.volume_glance_metadata_copy_to_volume(context, volume_id,
snapshot_id)
def volume_glance_metadata_copy_from_volume_to_volume(context,
src_volume_id,
volume_id):
- """
- Update the Glance metadata for a volume by copying all of the key:value
- pairs from the originating volume. This is so that a volume created from
- the volume (clone) will retain the original metadata.
+ """Update the Glance metadata for a volume by copying all of the key:value
+ pairs from the originating volume.
+
+ This is so that a volume created from the volume (clone) will retain the
+ original metadata.
"""
return IMPL.volume_glance_metadata_copy_from_volume_to_volume(
context,
def backup_update(context, backup_id, values):
- """
- Set the given properties on a backup and update it.
+ """Set the given properties on a backup and update it.
Raises NotFound if backup does not exist.
"""
@require_context
def volume_type_get_all(context, inactive=False, filters=None):
- """
- Returns a dict describing all volume_types with name as key.
- """
+ """Returns a dict describing all volume_types with name as key."""
filters = filters or {}
read_deleted = "yes" if inactive else "no"
@require_context
@require_volume_exists
def volume_glance_metadata_create(context, volume_id, key, value):
- """
- Update the Glance metadata for a volume by adding a new key:value pair.
+ """Update the Glance metadata for a volume by adding a new key:value pair.
+
This API does not support changing the value of a key once it has been
created.
"""
@require_context
@require_snapshot_exists
def volume_glance_metadata_copy_to_snapshot(context, snapshot_id, volume_id):
- """
- Update the Glance metadata for a snapshot by copying all of the key:value
- pairs from the originating volume. This is so that a volume created from
- the snapshot will retain the original metadata.
+ """Update the Glance metadata for a snapshot.
+
+ This copies all of the key:value pairs from the originating volume, to
+ ensure that a volume created from the snapshot will retain the
+ original metadata.
"""
session = get_session()
def volume_glance_metadata_copy_from_volume_to_volume(context,
src_volume_id,
volume_id):
- """
- Update the Glance metadata for a volume by copying all of the key:value
- pairs from the originating volume. This is so that a volume created from
- the volume (clone) will retain the original metadata.
+ """Update the Glance metadata for a volume.
+
+ This copies all of the key:value pairs from the originating volume,
+ to ensure that a volume created from the volume (clone) will
+ retain the original metadata.
"""
session = get_session()
@require_context
@require_volume_exists
def volume_glance_metadata_copy_to_volume(context, volume_id, snapshot_id):
- """
- Update the Glance metadata from a volume (created from a snapshot) by
- copying all of the key:value pairs from the originating snapshot. This is
- so that the Glance metadata from the original volume is retained.
+ """Update the Glance metadata from a volume (created from a snapshot) by
+ copying all of the key:value pairs from the originating snapshot.
+
+ This is so that the Glance metadata from the original volume is retained.
"""
session = get_session()
class ConfKeyManager(key_mgr.KeyManager):
- """
+ """Key Manager that supports one key defined by the fixed_key conf option.
+
This key manager implementation supports all the methods specified by the
key manager interface. This implementation creates a single key in response
to all invocations of create_key. Side effects (e.g., raising exceptions)
class SymmetricKey(Key):
- """
- This class represents symmetric keys
- """
+ """This class represents symmetric keys."""
def __init__(self, alg, key):
"""Create a new SymmetricKey object.
class DbQuotaDriver(object):
- """
- Driver to perform necessary checks to enforce quotas and obtain
- quota information. The default driver utilizes the local
- database.
+
+ """Driver to perform checks for enforcement of quotas.
+
+ Also allows to obtain quota information.
+ The default driver utilizes the local database.
"""
def get_by_project(self, context, project_id, resource_name):
def get_class_quotas(self, context, resources, quota_class,
defaults=True):
- """
- Given a list of resources, retrieve the quotas for the given
- quota class.
+ """Given a list of resources, retrieve quotas for the given quota class.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resources.
def get_project_quotas(self, context, resources, project_id,
quota_class=None, defaults=True,
usages=True):
- """
- Given a list of resources, retrieve the quotas for the given
+ """Given a list of resources, retrieve the quotas for the given
project.
:param context: The request context, for access checks.
return quotas
def _get_quotas(self, context, resources, keys, has_sync, project_id=None):
- """
- A helper method which retrieves the quotas for the specific
- resources identified by keys, and which apply to the current
- context.
+ """A helper method which retrieves the quotas for specific resources.
+
+ These resources are identified by keys and apply to the
+ current context.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resources.
db.reservation_rollback(context, reservations, project_id=project_id)
def destroy_all_by_project(self, context, project_id):
- """
- Destroy all quotas, usages, and reservations associated with a
- project.
+ """Destroy all that is associated with a project.
+
+ This includes quotas, usages and reservations.
:param context: The request context, for access checks.
:param project_id: The ID of the project being deleted.
"""Describe a single resource for quota checking."""
def __init__(self, name, flag=None):
- """
- Initializes a Resource.
+ """Initializes a Resource.
:param name: The name of the resource, i.e., "volumes".
:param flag: The name of the flag or configuration option
self.flag = flag
def quota(self, driver, context, **kwargs):
- """
- Given a driver and context, obtain the quota for this
- resource.
+ """Given a driver and context, obtain the quota for this resource.
:param driver: A quota driver.
:param context: The request context.
class CountableResource(AbsoluteResource):
- """
- Describe a resource where the counts aren't based solely on the
- project ID.
- """
+ """Describe a resource where counts aren't based only on the project ID."""
def __init__(self, name, count, flag=None):
"""Initializes a CountableResource.
"""ReservableResource for a specific volume type."""
def __init__(self, part_name, volume_type):
- """
- Initializes a VolumeTypeResource.
+ """Initializes a VolumeTypeResource.
:param part_name: The kind of resource, i.e., "volumes".
:param volume_type: The volume type for this resource.
"%s") % reservations)
def destroy_all_by_project(self, context, project_id):
- """
- Destroy all quotas, usages, and reservations associated with a
+ """Destroy all quotas, usages, and reservations associated with a
project.
:param context: The request context, for access checks.
class SchedulerOptions(object):
- """
- SchedulerOptions monitors a local .json file for changes and loads it
- if needed. This file is converted to a data structure and passed into
- the filtering and weighing functions which can use it for dynamic
- configuration.
+ """SchedulerOptions monitors a local .json file for changes.
+
+ The file is reloaded if needed and converted to a data structure and
+ passed into the filtering and weighing functions which can use it
+ for dynamic configuration.
"""
def __init__(self):
class LimiterTest(test.TestCase):
- """
- Unit tests for the `cinder.api.common.limited` method which takes
- in a list of items and, depending on the 'offset' and 'limit' GET params,
- returns a subset or complete set of the given items.
+ """Unit tests for the `cinder.api.common.limited` method.
+
+ This method takes in a list of items and, depending on the 'offset'
+ and 'limit' GET params, returns a subset or complete set of the given
+ items.
"""
def setUp(self):
class PaginationParamsTest(test.TestCase):
- """
- Unit tests for the `cinder.api.common.get_pagination_params`
- method which takes in a request object and returns 'marker' and 'limit'
+ """Unit tests for `cinder.api.common.get_pagination_params` method.
+
+ This method takes in a request object and returns 'marker' and 'limit'
GET params.
"""
class LimitsControllerTest(BaseLimitTestSuite):
- """
- Tests for `limits.LimitsController` class.
- """
+ """Tests for `limits.LimitsController` class."""
def setUp(self):
"""Run before each test."""
class LimitMiddlewareTest(BaseLimitTestSuite):
- """
- Tests for the `limits.RateLimitingMiddleware` class.
- """
+ """Tests for the `limits.RateLimitingMiddleware` class."""
@webob.dec.wsgify
def _empty_app(self, request):
class LimitTest(BaseLimitTestSuite):
- """
- Tests for the `limits.Limit` class.
- """
+ """Tests for the `limits.Limit` class."""
def test_GET_no_delay(self):
"""Test a limit handles 1 GET per second."""
class ParseLimitsTest(BaseLimitTestSuite):
- """
- Tests for the default limits parser in the in-memory
- `limits.Limiter` class.
- """
+ """Tests for the default limits parser in the `limits.Limiter` class."""
def test_invalid(self):
"""Test that parse_limits() handles invalid input correctly."""
class LimiterTest(BaseLimitTestSuite):
- """
- Tests for the in-memory `limits.Limiter` class.
- """
+ """Tests for the in-memory `limits.Limiter` class."""
def setUp(self):
"""Run before each test."""
return sum(item for item in results if item)
def test_no_delay_GET(self):
- """
- Simple test to ensure no delay on a single call for a limit verb we
- didn"t set.
- """
+ """Ensure no delay on a single call for a limit verb we didn't set."""
delay = self.limiter.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
def test_no_delay_PUT(self):
- """
- Simple test to ensure no delay on a single call for a known limit.
- """
+ """Ensure no delay on a single call for a known limit."""
delay = self.limiter.check_for_delay("PUT", "/anything")
self.assertEqual(delay, (None, None))
def test_delay_PUT(self):
- """
- Ensure the 11th PUT will result in a delay of 6.0 seconds until
+ """Test delay on 11th PUT request.
+
+ The 11th PUT will result in a delay of 6.0 seconds until
the next request will be granced.
"""
expected = [None] * 10 + [6.0]
self.assertEqual(expected, results)
def test_delay_POST(self):
- """
- Ensure the 8th POST will result in a delay of 6.0 seconds until
- the next request will be granced.
+ """Test delay of 8th POST request.
+
+ Ensure that the 8th POST will result in a delay of 6.0 seconds
+ until the next request will be granted.
"""
expected = [None] * 7
results = list(self._check(7, "POST", "/anything"))
self.failUnlessAlmostEqual(expected, results, 8)
def test_delay_GET(self):
- """
- Ensure the 11th GET will result in NO delay.
- """
+ """Ensure the 11th GET will result in NO delay."""
expected = [None] * 11
results = list(self._check(11, "GET", "/anything"))
self.assertEqual(expected, results)
self.assertEqual(expected, results)
def test_delay_PUT_volumes(self):
- """
- Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere is still
- OK after 5 requests...but then after 11 total requests, PUT limiting
- kicks in.
+ """Test limit of PUT on /volumes.
+
+ Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere is
+ still OK after 5 requests...
+ but then after 11 total requests, PUT limiting kicks in.
"""
# First 6 requests on PUT /volumes
expected = [None] * 5 + [12.0]
self.assertEqual(expected, results)
def test_delay_PUT_wait(self):
- """
+ """Test limit on PUT is lifted.
+
Ensure after hitting the limit and then waiting for the correct
amount of time, the limit will be lifted.
"""
self.assertEqual(expected, results)
def test_multiple_delays(self):
- """
- Ensure multiple requests still get a delay.
- """
+ """Ensure multiple requests still get a delay."""
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything"))
self.assertEqual(expected, results)
self.assertEqual(expected, results)
def test_user_limit(self):
- """
- Test user-specific limits.
- """
+ """Test user-specific limits."""
self.assertEqual(self.limiter.levels['user3'], [])
self.assertEqual(len(self.limiter.levels['user0']), 2)
def test_multiple_users(self):
- """
- Tests involving multiple users.
- """
+ """Tests involving multiple users."""
# User0
expected = [None] * 2 + [30.0] * 8
class WsgiLimiterTest(BaseLimitTestSuite):
- """
- Tests for `limits.WsgiLimiter` class.
- """
+ """Tests for `limits.WsgiLimiter` class."""
def setUp(self):
"""Run before each test."""
return jsonutils.dumps({"verb": verb, "path": path})
def _request(self, verb, url, username=None):
- """Make sure that POSTing to the given url causes the given username
- to perform the given action. Make the internal rate limiter return
- delay and make sure that the WSGI app returns the correct response.
+ """Assert that POSTing to given url triggers given action.
+
+ Ensure POSTing to the given url causes the given username
+ to perform the given action.
+
+ Make the internal rate limiter return delay and make sure that the
+ WSGI app returns the correct response.
"""
if username:
request = webob.Request.blank("/%s" % username)
class FakeHttplibSocket(object):
- """
- Fake `httplib.HTTPResponse` replacement.
- """
+ """Fake `httplib.HTTPResponse` replacement."""
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
class FakeHttplibConnection(object):
- """
- Fake `httplib.HTTPConnection`.
- """
+ """Fake `httplib.HTTPConnection`."""
def __init__(self, app, host):
- """
- Initialize `FakeHttplibConnection`.
- """
+ """Initialize `FakeHttplibConnection`."""
self.app = app
self.host = host
def request(self, method, path, body="", headers=None):
- """
- Requests made via this connection actually get translated and routed
- into our WSGI app, we then wait for the response and turn it back into
- an `httplib.HTTPResponse`.
+ """Fake method for request.
+
+ Requests made via this connection actually get translated and
+ routed into our WSGI app, we then wait for the response and turn
+ it back into an `httplib.HTTPResponse`.
"""
if not headers:
headers = {}
class WsgiLimiterProxyTest(BaseLimitTestSuite):
- """
- Tests for the `limits.WsgiLimiterProxy` class.
- """
+ """Tests for the `limits.WsgiLimiterProxy` class."""
def setUp(self):
- """
+ """setUp for test suite.
+
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
directly by something like the `httplib` library.
"""
class SnapshotsUnprocessableEntityTestCase(test.TestCase):
- """
- Tests of places we throw 422 Unprocessable Entity from
- """
+ """Tests of places we throw 422 Unprocessable Entity."""
def setUp(self):
super(SnapshotsUnprocessableEntityTestCase, self).setUp()
class VolumesUnprocessableEntityTestCase(test.TestCase):
- """
- Tests of places we throw 422 Unprocessable Entity from
- """
+ """Tests of places we throw 422 Unprocessable Entity."""
def setUp(self):
super(VolumesUnprocessableEntityTestCase, self).setUp()
class LimitsControllerTest(BaseLimitTestSuite):
- """
- Tests for `limits.LimitsController` class.
- """
+
+ """Tests for `limits.LimitsController` class."""
def setUp(self):
"""Run before each test."""
class LimitMiddlewareTest(BaseLimitTestSuite):
- """
- Tests for the `limits.RateLimitingMiddleware` class.
- """
+
+ """Tests for the `limits.RateLimitingMiddleware` class."""
@webob.dec.wsgify
def _empty_app(self, request):
class LimitTest(BaseLimitTestSuite):
- """
- Tests for the `limits.Limit` class.
- """
+
+ """Tests for the `limits.Limit` class."""
def test_GET_no_delay(self):
"""Test a limit handles 1 GET per second."""
class ParseLimitsTest(BaseLimitTestSuite):
- """
- Tests for the default limits parser in the in-memory
- `limits.Limiter` class.
- """
+
+ """Tests for the default limits parser in the `limits.Limiter` class."""
def test_invalid(self):
"""Test that parse_limits() handles invalid input correctly."""
class LimiterTest(BaseLimitTestSuite):
- """
- Tests for the in-memory `limits.Limiter` class.
- """
+
+ """Tests for the in-memory `limits.Limiter` class."""
def setUp(self):
"""Run before each test."""
return sum(item for item in results if item)
def test_no_delay_GET(self):
- """
- Simple test to ensure no delay on a single call for a limit verb we
- didn"t set.
- """
+ """Ensure no delay on a single call for a limit verb we didn't set."""
delay = self.limiter.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
def test_no_delay_PUT(self):
- """
- Simple test to ensure no delay on a single call for a known limit.
- """
+ """Ensure no delay on a single call for a known limit."""
delay = self.limiter.check_for_delay("PUT", "/anything")
self.assertEqual(delay, (None, None))
def test_delay_PUT(self):
- """
+ """Test delay on 11th PUT request.
+
Ensure the 11th PUT will result in a delay of 6.0 seconds until
the next request will be granced.
"""
self.assertEqual(expected, results)
def test_delay_POST(self):
- """
+ """Test delay on 8th POST request.
+
Ensure the 8th POST will result in a delay of 6.0 seconds until
the next request will be granced.
"""
self.failUnlessAlmostEqual(expected, results, 8)
def test_delay_GET(self):
- """
- Ensure the 11th GET will result in NO delay.
- """
+ """Ensure the 11th GET will result in NO delay."""
expected = [None] * 11
results = list(self._check(11, "GET", "/anything"))
self.assertEqual(expected, results)
self.assertEqual(expected, results)
def test_delay_PUT_volumes(self):
- """
- Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere is still
- OK after 5 requests...but then after 11 total requests, PUT limiting
- kicks in.
+ """Test delay on /volumes.
+
+ Ensure PUT on /volumes limits at 5 requests, and PUT elsewhere
+ is still OK after 5 requests...but then after 11 total requests,
+ PUT limiting kicks in.
"""
# First 6 requests on PUT /volumes
expected = [None] * 5 + [12.0]
self.assertEqual(expected, results)
def test_delay_PUT_wait(self):
- """
- Ensure after hitting the limit and then waiting for the correct
- amount of time, the limit will be lifted.
+ """Test limit is lifted again.
+
+ Ensure after hitting the limit and then waiting for
+ the correct amount of time, the limit will be lifted.
"""
expected = [None] * 10 + [6.0]
results = list(self._check(11, "PUT", "/anything"))
self.assertEqual(expected, results)
def test_multiple_delays(self):
- """
- Ensure multiple requests still get a delay.
- """
+ """Ensure multiple requests still get a delay."""
expected = [None] * 10 + [6.0] * 10
results = list(self._check(20, "PUT", "/anything"))
self.assertEqual(expected, results)
self.assertEqual(expected, results)
def test_user_limit(self):
- """
- Test user-specific limits.
- """
+ """Test user-specific limits."""
self.assertEqual(self.limiter.levels['user3'], [])
self.assertEqual(len(self.limiter.levels['user0']), 2)
def test_multiple_users(self):
- """
- Tests involving multiple users.
- """
+ """Tests involving multiple users."""
# User0
expected = [None] * 2 + [30.0] * 8
class WsgiLimiterTest(BaseLimitTestSuite):
- """
- Tests for `limits.WsgiLimiter` class.
- """
+
+ """Tests for `limits.WsgiLimiter` class."""
def setUp(self):
"""Run before each test."""
class FakeHttplibSocket(object):
- """
- Fake `httplib.HTTPResponse` replacement.
- """
+
+ """Fake `httplib.HTTPResponse` replacement."""
def __init__(self, response_string):
"""Initialize new `FakeHttplibSocket`."""
class FakeHttplibConnection(object):
- """
- Fake `httplib.HTTPConnection`.
- """
+
+ """Fake `httplib.HTTPConnection`."""
def __init__(self, app, host):
- """
- Initialize `FakeHttplibConnection`.
- """
+ """Initialize `FakeHttplibConnection`."""
self.app = app
self.host = host
def request(self, method, path, body="", headers=None):
- """
- Requests made via this connection actually get translated and routed
- into our WSGI app, we then wait for the response and turn it back into
- an `httplib.HTTPResponse`.
+ """Fake request handler.
+
+ Requests made via this connection actually get translated and
+ routed into our WSGI app, we then wait for the response and turn
+ it back into an `httplib.HTTPResponse`.
"""
if not headers:
headers = {}
class WsgiLimiterProxyTest(BaseLimitTestSuite):
- """
- Tests for the `limits.WsgiLimiterProxy` class.
- """
+
+ """Tests for the `limits.WsgiLimiterProxy` class."""
def setUp(self):
- """
+ """setUp() for WsgiLimiterProxyTest.
+
Do some nifty HTTP/WSGI magic which allows for WSGI to be called
directly by something like the `httplib` library.
"""
class MockKeyManager(key_mgr.KeyManager):
- """
+
+ """Mocking manager for integration tests.
+
This mock key manager implementation supports all the methods specified
by the key manager interface. This implementation stores keys within a
dictionary, and as a result, it is not acceptable for use across different
driver_cls = filter_scheduler.FilterScheduler
def test_create_volume_no_hosts(self):
- """
- Ensure empty hosts & child_zones result in NoValidHosts exception.
- """
+ """Ensure empty hosts/child_zones result in NoValidHosts exception."""
def _fake_empty_call_zone_method(*args, **kwargs):
return []
def stub_out_https_backend(stubs):
- """
- Stubs out the httplib.HTTPRequest.getresponse to return
- faked-out data instead of grabbing actual contents of a resource
+ """Stub out the httplib.HTTPRequest.getresponse.
+
+ Return faked-out data instead of grabbing actual contents of a resource.
The stubbed getresponse() returns an iterator over
the data "I am a teapot, short and stout\n"
size=0,
object_count=0,
project_id='fake'):
- """
- Create a backup entry in the DB.
+ """Create a backup entry in the DB.
+
Return the entry ID
"""
backup = {}
display_description='this is a test volume',
status='backing-up',
size=1):
- """
- Create a volume entry in the DB.
+ """Create a volume entry in the DB.
+
Return the entry ID
"""
vol = {}
user="openstack_citest",
passwd="openstack_citest",
database="openstack_citest"):
- """
+ """Return connect string.
+
Try to get a connection with a very specific set of values, if we get
- these then we'll run the tests, otherwise they are skipped
+ these then we'll run the tests, otherwise they are skipped.
"""
if backend == "postgres":
backend = "postgresql+psycopg2"
os.unsetenv('PGUSER')
def test_walk_versions(self):
- """
+ """Test walk versions.
+
Walks all version scripts for each tested database, ensuring
that there are no errors in the version scripts for each engine
"""
self._walk_versions(engine, self.snake_walk)
def test_mysql_connect_fail(self):
- """
+ """Test for mysql connection failure.
+
Test that we can trigger a mysql connection failure and we fail
gracefully to ensure we don't break people without mysql
"""
@testtools.skipUnless(_have_mysql(), "mysql not available")
def test_mysql_innodb(self):
- """
- Test that table creation on mysql only builds InnoDB tables
- """
+ """Test that table creation on mysql only builds InnoDB tables."""
# add this to the global lists to make reset work with it, it's removed
# automaticaly in tearDown so no need to clean it up here.
connect_string = _get_connect_string('mysql')
self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
def test_postgresql_connect_fail(self):
- """
+ """Test connection failure on PostgreSQL.
+
Test that we can trigger a postgres connection failure and we fail
- gracefully to ensure we don't break people without postgres
+ gracefully to ensure we don't break people without postgres.
"""
if _is_backend_avail('postgres', user="openstack_cifail"):
self.fail("Shouldn't have connected")
TestMigrations.REPOSITORY))
def _migrate_up(self, engine, version, with_data=False):
- """migrate up to a new version of the db.
+ """Migrate up to a new version of the db.
We allow for data insertion and post checks at every
migration version with special _prerun_### and
class FakeResponce(object):
def __init__(self, status):
- """
+ """Initialize FakeResponce.
+
:param status: Either 'failed' or 'passed'
"""
self.Status = status
"""Fake IBM XIV and DS8K Proxy Driver."""
def __init__(self, xiv_ds8k_info, logger, expt, driver=None):
- """
- Initialize Proxy
- """
+ """Initialize Proxy."""
self.xiv_ds8k_info = xiv_ds8k_info
self.logger = logger
return dict(rv.iteritems())
def delete(self, context, transfer_id):
- """
- Make the RPC call to delete a volume transfer.
- """
+ """Make the RPC call to delete a volume transfer."""
volume_api.check_policy(context, 'delete_transfer')
transfer = self.db.transfer_get(context, transfer_id)
raise paramiko.SSHException(msg)
def get(self):
- """
- Return an item from the pool, when one is available. This may
- cause the calling greenthread to block. Check if a connection is active
- before returning it. For dead connections create and return a new
- connection.
+ """Return an item from the pool, when one is available.
+
+ This may cause the calling greenthread to block. Check if a
+ connection is active before returning it.
+
+ For dead connections create and return a new connection.
"""
conn = super(SSHPool, self).get()
if conn:
volume['name'], 'volume')
def _get_mount_point_for_share(self, nfs_share):
- """
+ """Get mount point for a share.
+
:param nfs_share: example 172.18.194.100:/var/nfs
"""
return os.path.join(self.configuration.nexenta_mount_point_base,
@utils.synchronized('3par', external=True)
def create_volume_from_snapshot(self, volume, snapshot):
- """
- Creates a volume from a snapshot.
+ """Create a volume from a snapshot.
TODO: support using the size from the user.
"""
@utils.synchronized('3par', external=True)
def create_volume_from_snapshot(self, volume, snapshot):
- """
- Creates a volume from a snapshot.
+ """Create a volume from a snapshot.
TODO: support using the size from the user.
"""
self.configuration.append_config_values(zadara_opts)
def do_setup(self, context):
- """
- Any initialization the volume driver does while starting.
+ """Any initialization the volume driver does while starting.
+
Establishes initial connection with VPSA and retrieves access_key.
"""
self.vpsa = ZadaraVPSAConnection(self.configuration)
def _xml_parse_helper(self, xml_tree, first_level, search_tuple,
first=True):
- """
- Helper for parsing VPSA's XML output.
+ """Helper for parsing VPSA's XML output.
Returns single item if first==True or list for multiple selection.
If second argument in search_tuple is None - returns all items with
size=volume['size'])
def delete_volume(self, volume):
- """
- Delete volume.
+ """Delete volume.
Return ok if doesn't exist. Auto detach from all servers.
"""
pass
def initialize_connection(self, volume, connector):
- """
- Attach volume to initiator/host.
+ """Attach volume to initiator/host.
During this call VPSA exposes volume to particular Initiator. It also
creates a 'server' entity for Initiator (if it was not created before)
'data': properties}
def terminate_connection(self, volume, connector, **kwargs):
- """
- Detach volume from the initiator.
- """
+ """Detach volume from the initiator."""
# Get server name for IQN
initiator_name = connector['initiator']
vpsa_srv = self._get_server_name(initiator_name)
class _AnsiColorizer(object):
- """
- A colorizer is an object that loosely wraps around a stream, allowing
+ """ANSI colorizer that wraps a stream object.
+
+ A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
self.stream = stream
def supported(cls, stream=sys.stdout):
- """
+ """Check if platform is supported.
+
A class method that returns True if the current platform supports
- coloring terminal output using this method. Returns False otherwise.
+ coloring terminal output using this method.
+
+ Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
supported = classmethod(supported)
def write(self, text, color):
- """
- Write the given text to the stream in the given color.
+ """Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
class _Win32Colorizer(object):
- """
- See _AnsiColorizer docstring.
- """
+ """See _AnsiColorizer docstring."""
def __init__(self, stream):
import win32console
red, green, blue, bold = (win32console.FOREGROUND_RED,
class _NullColorizer(object):
- """
- See _AnsiColorizer docstring.
- """
+ """See _AnsiColorizer docstring."""
def __init__(self, stream):
self.stream = stream
commands = {posargs}
[flake8]
-ignore = E711,E712,F401,F403,F841,H302,H303,H304,H402,H404,H803
+ignore = E711,E712,F401,F403,F841,H302,H303,H304,H402,H803
builtins = _
exclude = .git,.venv,.tox,dist,doc,common,*egg,build