from oslo_utils import strutils
from webob import exc
+from cinder.api import common
from cinder.api.openstack import wsgi
from cinder.api.v2.views import types as views_types
from cinder.api import xmlutil
def _get_volume_types(self, req):
"""Helper function that returns a list of type dicts."""
+ params = req.params.copy()
+ marker, limit, offset = common.get_pagination_params(params)
+ sort_keys, sort_dirs = common.get_sort_params(params)
+ # NOTE(wanghao): Currently, we still only support to filter by
+ # is_public. If we want to filter by more args, we should set params
+ # to filters.
filters = {}
context = req.environ['cinder.context']
if not context.is_admin and self._validate_policy(context):
req.params.get('is_public', None))
else:
filters['is_public'] = True
- limited_types = volume_types.get_all_types(
- context, search_opts=filters).values()
+ utils.remove_invalid_filter_options(context,
+ filters,
+ self._get_vol_type_filter_options()
+ )
+ limited_types = volume_types.get_all_types(context,
+ filters=filters,
+ marker=marker, limit=limit,
+ sort_keys=sort_keys,
+ sort_dirs=sort_dirs,
+ offset=offset,
+ list_result=True)
return limited_types
+ def _get_vol_type_filter_options(self):
+ """Return volume type search options allowed by non-admin."""
+ return ['is_public']
+
def create_resource():
return wsgi.Resource(VolumeTypesController())
"""Index over trimmed volume types."""
volume_types_list = [self.show(request, volume_type, True)
for volume_type in volume_types]
- return dict(volume_types=volume_types_list)
+ volume_type_links = self._get_collection_links(request, volume_types,
+ 'types')
+ volume_types_dict = dict(volume_types=volume_types_list)
+ if volume_type_links:
+ volume_types_dict['volume_type_links'] = volume_type_links
+ return volume_types_dict
return IMPL.volume_type_update(context, volume_type_id, values)
-def volume_type_get_all(context, inactive=False, filters=None):
+def volume_type_get_all(context, inactive=False, filters=None, marker=None,
+ limit=None, sort_keys=None, sort_dirs=None,
+ offset=None, list_result=False):
"""Get all volume types.
:param context: context to query under
:param inactive: Include inactive volume types to the result set
:param filters: Filters for the query in the form of key/value.
+ :param marker: the last item of the previous page, used to determine the
+ next page of results to return
+ :param limit: maximum number of items to return
+ :param sort_keys: list of attributes by which results should be sorted,
+ paired with corresponding item in sort_dirs
+ :param sort_dirs: list of directions in which results should be sorted,
+ paired with corresponding item in sort_keys
+ :param list_result: For compatibility, if list_result = True, return a list
+ instead of dict.
:is_public: Filter volume types based on visibility:
* **False**: List private volume types only
* **None**: List both public and private volume types
- :returns: list of matching volume types
+ :returns: list/dict of matching volume types
"""
- return IMPL.volume_type_get_all(context, inactive, filters)
+ return IMPL.volume_type_get_all(context, inactive, filters, marker=marker,
+ limit=limit, sort_keys=sort_keys,
+ sort_dirs=sort_dirs, offset=offset,
+ list_result=list_result)
def volume_type_get(context, id, inactive=False, expected_fields=None):
return volume_type_ref
-def _volume_type_get_query(context, session=None, read_deleted=None,
+def _volume_type_get_query(context, session=None, read_deleted='no',
expected_fields=None):
expected_fields = expected_fields or []
query = model_query(context,
return query
+def _process_volume_types_filters(query, filters):
+ context = filters.pop('context', None)
+ if 'is_public' in filters and filters['is_public'] is not None:
+ the_filter = [models.VolumeTypes.is_public == filters['is_public']]
+ if filters['is_public'] and context.project_id is not None:
+ projects_attr = getattr(models.VolumeTypes, 'projects')
+ the_filter.extend([
+ projects_attr.any(project_id=context.project_id, deleted=0)
+ ])
+ if len(the_filter) > 1:
+ query = query.filter(or_(*the_filter))
+ else:
+ query = query.filter(the_filter[0])
+ if 'is_public' in filters:
+ del filters['is_public']
+ if filters:
+ # Ensure that filters' keys exist on the model
+ if not is_valid_model_filters(models.VolumeTypes, filters):
+ return
+ if filters.get('extra_specs') is not None:
+ the_filter = []
+ searchdict = filters.get('extra_specs')
+ extra_specs = getattr(models.VolumeTypes, 'extra_specs')
+ for k, v in searchdict.items():
+ the_filter.extend([extra_specs.any(key=k, value=v,
+ deleted=False)])
+ if len(the_filter) > 1:
+ query = query.filter(and_(*the_filter))
+ else:
+ query = query.filter(the_filter[0])
+ del filters['extra_specs']
+ query = query.filter_by(**filters)
+ return query
+
+
@require_admin_context
def volume_type_update(context, volume_type_id, values):
session = get_session()
@require_context
-def volume_type_get_all(context, inactive=False, filters=None):
- """Returns a dict describing all volume_types with name as key."""
- filters = filters or {}
-
- read_deleted = "yes" if inactive else "no"
-
- query = _volume_type_get_query(context, read_deleted=read_deleted)
-
- if 'is_public' in filters and filters['is_public'] is not None:
- the_filter = [models.VolumeTypes.is_public == filters['is_public']]
- if filters['is_public'] and context.project_id is not None:
- projects_attr = getattr(models.VolumeTypes, 'projects')
- the_filter.extend([
- projects_attr.any(project_id=context.project_id, deleted=0)
- ])
- if len(the_filter) > 1:
- query = query.filter(or_(*the_filter))
- else:
- query = query.filter(the_filter[0])
+def volume_type_get_all(context, inactive=False, filters=None, marker=None,
+ limit=None, sort_keys=None, sort_dirs=None,
+ offset=None, list_result=False):
+ """Returns a dict describing all volume_types with name as key.
- rows = query.order_by("name").all()
+ If no sort parameters are specified then the returned volume types are
+ sorted first by the 'created_at' key and then by the 'id' key in descending
+ order.
- result = {}
- for row in rows:
- result[row['name']] = _dict_with_extra_specs_if_authorized(context,
- row)
+ :param context: context to query under
+ :param marker: the last item of the previous page, used to determine the
+ next page of results to return
+ :param limit: maximum number of items to return
+ :param sort_keys: list of attributes by which results should be sorted,
+ paired with corresponding item in sort_dirs
+ :param sort_dirs: list of directions in which results should be sorted,
+ paired with corresponding item in sort_keys
+ :param filters: dictionary of filters; values that are in lists, tuples,
+ or sets cause an 'IN' operation, while exact matching
+ is used for other values, see _process_volume_type_filters
+ function for more information
+ :param list_result: For compatibility, if list_result = True, return a list
+ instead of dict.
+ :returns: list/dict of matching volume types
+ """
+ session = get_session()
+ with session.begin():
+ # Add context for _process_volume_types_filters
+ filters = filters or {}
+ filters['context'] = context
+ # Generate the query
+ query = _generate_paginate_query(context, session, marker, limit,
+ sort_keys, sort_dirs, filters, offset,
+ models.VolumeTypes)
+ # No volume types would match, return empty dict or list
+ if query is None:
+ if list_result:
+ return []
+ return {}
- return result
+ rows = query.all()
+ if list_result:
+ result = [_dict_with_extra_specs_if_authorized(context, row)
+ for row in rows]
+ return result
+ result = {row['name']: _dict_with_extra_specs_if_authorized(context,
+ row)
+ for row in rows}
+ return result
def _volume_type_get_id_from_volume_type_query(context, id, session=None):
return result[0]
-@require_context
-def _volume_type_get(context, id, session=None, inactive=False,
- expected_fields=None):
- expected_fields = expected_fields or []
+def _volume_type_get_db_object(context, id, session=None, inactive=False,
+ expected_fields=None):
read_deleted = "yes" if inactive else "no"
result = _volume_type_get_query(
context, session, read_deleted, expected_fields).\
filter_by(id=id).\
first()
+ return result
+
+@require_context
+def _volume_type_get(context, id, session=None, inactive=False,
+ expected_fields=None):
+ expected_fields = expected_fields or []
+ result = _volume_type_get_db_object(context, id, session, inactive,
+ expected_fields)
if not result:
raise exception.VolumeTypeNotFound(volume_type_id=id)
models.Snapshot: (_snaps_get_query, _process_snaps_filters, _snapshot_get),
models.Backup: (_backups_get_query, _process_backups_filters, _backup_get),
models.QualityOfServiceSpecs: (_qos_specs_get_query,
- _process_qos_specs_filters, _qos_specs_get)
+ _process_qos_specs_filters, _qos_specs_get),
+ models.VolumeTypes: (_volume_type_get_query, _process_volume_types_filters,
+ _volume_type_get_db_object)
}
@base.CinderObjectRegistry.register
class VolumeTypeList(base.ObjectListBase, base.CinderObject):
- VERSION = '1.0'
+ # Version 1.0: Initial version
+ # Version 1.1: Add pagination support to volume type
+ VERSION = '1.1'
fields = {
'objects': fields.ListOfObjectsField('VolumeType'),
child_versions = {
'1.0': '1.0',
+ '1.1': '1.0',
}
@base.remotable_classmethod
- def get_all(cls, context, inactive=0, search_opts=None):
- types = volume_types.get_all_types(context, inactive, search_opts)
+ def get_all(cls, context, inactive=0, filters=None, marker=None,
+ limit=None, sort_keys=None, sort_dirs=None, offset=None):
+ types = volume_types.get_all_types(context, inactive, filters,
+ marker=marker, limit=limit,
+ sort_keys=sort_keys,
+ sort_dirs=sort_dirs, offset=offset)
expected_attrs = ['extra_specs', 'projects']
return base.obj_make_list(context, cls(context),
objects.VolumeType, types.values(),
return False
-def fake_volume_type_get_all(context, inactive=False, filters=None):
+def fake_volume_type_get_all(context, inactive=False, filters=None,
+ marker=None, limit=None, sort_keys=None,
+ sort_dirs=None, offset=None, list_result=False):
if filters is None or filters['is_public'] is None:
+ if list_result:
+ return VOLUME_TYPES.values()
return VOLUME_TYPES
res = {}
for k, v in VOLUME_TYPES.items():
continue
if v['is_public'] == filters['is_public']:
res.update({k: v})
+ if list_result:
+ return res.values()
return res
from cinder.api.v2 import types
from cinder.api.v2.views import types as views_types
+from cinder import context
from cinder import exception
from cinder import test
from cinder.tests.unit.api import fakes
)
-def return_volume_types_get_all_types(context, search_opts=None):
- return dict(
- vol_type_1=stub_volume_type(1),
- vol_type_2=stub_volume_type(2),
- vol_type_3=stub_volume_type(3)
- )
-
-
-def return_empty_volume_types_get_all_types(context, search_opts=None):
+def return_volume_types_get_all_types(context, filters=None, marker=None,
+ limit=None, sort_keys=None,
+ sort_dirs=None, offset=None,
+ list_result=False):
+ result = dict(vol_type_1=stub_volume_type(1),
+ vol_type_2=stub_volume_type(2),
+ vol_type_3=stub_volume_type(3)
+ )
+ if list_result:
+ return result.values()
+ return result
+
+
+def return_empty_volume_types_get_all_types(context, filters=None, marker=None,
+ limit=None, sort_keys=None,
+ sort_dirs=None, offset=None,
+ list_result=False):
+ if list_result:
+ return []
return {}
class VolumeTypesApiTest(test.TestCase):
+
+ def _create_volume_type(self, volume_type_name, extra_specs=None,
+ is_public=True, projects=None):
+ return volume_types.create(self.ctxt, volume_type_name, extra_specs,
+ is_public, projects).get('id')
+
def setUp(self):
super(VolumeTypesApiTest, self).setUp()
self.controller = types.VolumeTypesController()
+ self.ctxt = context.RequestContext(user_id='fake',
+ project_id='fake',
+ is_admin=True)
+ self.type_id1 = self._create_volume_type('volume_type1',
+ {'key1': 'value1'})
+ self.type_id2 = self._create_volume_type('volume_type2',
+ {'key2': 'value2'})
+ self.type_id3 = self._create_volume_type('volume_type3',
+ {'key3': 'value3'}, False,
+ ['fake'])
def test_volume_types_index(self):
self.stubs.Set(volume_types, 'get_all_types',
self.assertEqual(0, len(res_dict['volume_types']))
+ def test_volume_types_index_with_limit(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?limit=1')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+
+ self.assertEqual(1, len(res['volume_types']))
+ self.assertEqual(self.type_id3, res['volume_types'][0]['id'])
+
+ expect_next_link = ('http://localhost/v2/fake/types?limit=1'
+ '&marker=%s') % res['volume_types'][0]['id']
+ self.assertEqual(expect_next_link, res['volume_type_links'][0]['href'])
+
+ def test_volume_types_index_with_offset(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?offset=1')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+
+ self.assertEqual(2, len(res['volume_types']))
+
+ def test_volume_types_index_with_limit_and_offset(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?limit=2&offset=1')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+
+ self.assertEqual(2, len(res['volume_types']))
+ self.assertEqual(self.type_id2, res['volume_types'][0]['id'])
+ self.assertEqual(self.type_id1, res['volume_types'][1]['id'])
+
+ def test_volume_types_index_with_limit_and_marker(self):
+ req = fakes.HTTPRequest.blank(('/v2/fake/types?limit=1'
+ '&marker=%s') % self.type_id2)
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+
+ self.assertEqual(1, len(res['volume_types']))
+ self.assertEqual(self.type_id1, res['volume_types'][0]['id'])
+
+ def test_volume_types_index_with_valid_filter(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?is_public=True')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+
+ self.assertEqual(3, len(res['volume_types']))
+ self.assertEqual(self.type_id3, res['volume_types'][0]['id'])
+ self.assertEqual(self.type_id2, res['volume_types'][1]['id'])
+ self.assertEqual(self.type_id1, res['volume_types'][2]['id'])
+
+ def test_volume_types_index_with_invalid_filter(self):
+ req = fakes.HTTPRequest.blank(('/v2/fake/types?id=%s') % self.type_id1)
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+
+ self.assertEqual(3, len(res['volume_types']))
+
+ def test_volume_types_index_with_sort_keys(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?sort=id')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+ expect_result = [self.type_id1, self.type_id2, self.type_id3]
+ expect_result.sort(reverse=True)
+
+ self.assertEqual(3, len(res['volume_types']))
+ self.assertEqual(expect_result[0], res['volume_types'][0]['id'])
+ self.assertEqual(expect_result[1], res['volume_types'][1]['id'])
+ self.assertEqual(expect_result[2], res['volume_types'][2]['id'])
+
+ def test_volume_types_index_with_sort_and_limit(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?sort=id&limit=2')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+ expect_result = [self.type_id1, self.type_id2, self.type_id3]
+ expect_result.sort(reverse=True)
+
+ self.assertEqual(2, len(res['volume_types']))
+ self.assertEqual(expect_result[0], res['volume_types'][0]['id'])
+ self.assertEqual(expect_result[1], res['volume_types'][1]['id'])
+
+ def test_volume_types_index_with_sort_keys_and_sort_dirs(self):
+ req = fakes.HTTPRequest.blank('/v2/fake/types?sort=id:asc')
+ req.environ['cinder.context'] = self.ctxt
+ res = self.controller.index(req)
+ expect_result = [self.type_id1, self.type_id2, self.type_id3]
+ expect_result.sort()
+
+ self.assertEqual(3, len(res['volume_types']))
+ self.assertEqual(expect_result[0], res['volume_types'][0]['id'])
+ self.assertEqual(expect_result[1], res['volume_types'][1]['id'])
+ self.assertEqual(expect_result[2], res['volume_types'][2]['id'])
+
def test_volume_types_show(self):
self.stubs.Set(volume_types, 'get_volume_type',
return_volume_types_get_volume_type)
'VolumeAttachmentList': '1.0-307d2b6c8dd55ef854f6386898e9e98e',
'VolumeList': '1.1-03ba6cb8c546683e64e15c50042cb1a3',
'VolumeType': '1.0-bf8abbbea2e852ed2e9bac5a9f5f70f2',
- 'VolumeTypeList': '1.0-09b01f4526266c1a58cb206ba509d6d2',
+ 'VolumeTypeList': '1.1-8a1016c03570dc13b9a33fe04a6acb2c',
}
volume_types = objects.VolumeTypeList.get_all(self.context)
self.assertEqual(1, len(volume_types))
TestVolumeType._compare(self, db_volume_type, volume_types[0])
+
+ @mock.patch('cinder.volume.volume_types.get_all_types')
+ def test_get_all_with_pagination(self, get_all_types):
+ db_volume_type = fake_volume.fake_db_volume_type()
+ get_all_types.return_value = {db_volume_type['name']: db_volume_type}
+
+ volume_types = objects.VolumeTypeList.get_all(self.context,
+ filters={'is_public':
+ True},
+ marker=None,
+ limit=1,
+ sort_keys='id',
+ sort_dirs='desc',
+ offset=None)
+ self.assertEqual(1, len(volume_types))
+ TestVolumeType._compare(self, db_volume_type, volume_types[0])
vol_types = volume_types.get_all_types(
self.ctxt,
- search_opts={'extra_specs': {"key1": "val1"}})
+ filters={'extra_specs': {"key1": "val1"}})
self.assertEqual(1, len(vol_types))
self.assertIn("type1", vol_types.keys())
self.assertEqual({"key1": "val1", "key2": "val2"},
vol_types = volume_types.get_all_types(
self.ctxt,
- search_opts={'extra_specs': {"key2": "val2"}})
+ filters={'extra_specs': {"key2": "val2"}})
self.assertEqual(2, len(vol_types))
self.assertIn("type1", vol_types.keys())
self.assertIn("type2", vol_types.keys())
vol_types = volume_types.get_all_types(
self.ctxt,
- search_opts={'extra_specs': {"key3": "val3"}})
+ filters={'extra_specs': {"key3": "val3"}})
self.assertEqual(1, len(vol_types))
self.assertIn("type2", vol_types.keys())
vol_types = volume_types.get_all_types(
self.ctxt,
- search_opts={'extra_specs': {"key1": "val1",
- "key3": "val3"}})
+ filters={'extra_specs': {"key1": "val1", "key3": "val3"}})
self.assertEqual(2, len(vol_types))
self.assertIn("type1", vol_types.keys())
self.assertIn("type3", vol_types.keys())
db.volume_type_destroy(context, id)
-def get_all_types(context, inactive=0, search_opts=None):
+def get_all_types(context, inactive=0, filters=None, marker=None,
+ limit=None, sort_keys=None, sort_dirs=None,
+ offset=None, list_result=False):
"""Get all non-deleted volume_types.
Pass true as argument if you want deleted volume types returned also.
"""
- search_opts = search_opts or {}
- filters = {}
-
- if 'is_public' in search_opts:
- filters['is_public'] = search_opts['is_public']
- del search_opts['is_public']
-
- vol_types = db.volume_type_get_all(context, inactive, filters=filters)
-
- if search_opts:
- LOG.debug("Searching by: %s", search_opts)
-
- def _check_extra_specs_match(vol_type, searchdict):
- for k, v in searchdict.items():
- if (k not in vol_type['extra_specs'].keys()
- or vol_type['extra_specs'][k] != v):
- return False
- return True
-
- # search_option to filter_name mapping.
- filter_mapping = {'extra_specs': _check_extra_specs_match}
-
- result = {}
- for type_name, type_args in vol_types.items():
- # go over all filters in the list
- for opt, values in search_opts.items():
- try:
- filter_func = filter_mapping[opt]
- except KeyError:
- # no such filter - ignore it, go to next filter
- continue
- else:
- if filter_func(type_args, values):
- result[type_name] = type_args
- break
- vol_types = result
+ vol_types = db.volume_type_get_all(context, inactive, filters=filters,
+ marker=marker, limit=limit,
+ sort_keys=sort_keys,
+ sort_dirs=sort_dirs, offset=offset,
+ list_result=list_result)
return vol_types