This patch adds filter options to backup list.
Like cinder list, now cinder backup-list accept
parameters.
This blueprint is complimented with the client part
https://blueprints.launchpad.net/python-cinderclient
/+spec/add-filter-options-to-backup-list
blueprint: add-filter-options-to-backup-list
DocImpact
Change-Id: I9e095ee44a03744eb3f319323d9130a89196614e
"""Returns a detailed list of backups."""
return self._get_backups(req, is_detail=True)
+ @staticmethod
+ def _get_backup_filter_options():
+ """Return volume search options allowed by non-admin."""
+ return ('name', 'status', 'volume_id')
+
def _get_backups(self, req, is_detail):
"""Returns a list of backups, transformed through view builder."""
context = req.environ['cinder.context']
- backups = self.backup_api.get_all(context)
+ filters = req.params.copy()
+
+ utils.remove_invalid_filter_options(context,
+ filters,
+ self._get_backup_filter_options())
+
+ if 'name' in filters:
+ filters['display_name'] = filters['name']
+ del filters['name']
+
+ backups = self.backup_api.get_all(context, search_opts=filters)
limited_list = common.limited(backups, req)
if is_detail:
from cinder.api import common
from cinder.api.openstack import wsgi
-from cinder.api.v1 import volumes
from cinder.api import xmlutil
from cinder import exception
from cinder.openstack.common import log as logging
#filter out invalid option
allowed_search_options = ('status', 'volume_id', 'display_name')
- volumes.remove_invalid_options(context, search_opts,
- allowed_search_options)
+ utils.remove_invalid_filter_options(context, search_opts,
+ allowed_search_options)
snapshots = self.volume_api.get_all_snapshots(context,
search_opts=search_opts)
search_opts['metadata'] = ast.literal_eval(search_opts['metadata'])
context = req.environ['cinder.context']
- remove_invalid_options(context,
- search_opts, self._get_volume_search_options())
+ utils.remove_invalid_filter_options(context,
+ search_opts,
+ self._get_volume_search_options())
volumes = self.volume_api.get_all(context, marker=None, limit=None,
sort_key='created_at',
def create_resource(ext_mgr):
return wsgi.Resource(VolumeController(ext_mgr))
-
-
-def remove_invalid_options(context, search_options, allowed_search_options):
- """Remove search options that are not valid for non-admin API/context."""
- if context.is_admin:
- # Allow all options
- return
- # Otherwise, strip out all unknown options
- unknown_options = [opt for opt in search_options
- if opt not in allowed_search_options]
- bad_options = ", ".join(unknown_options)
- log_msg = _("Removing options '%(bad_options)s'"
- " from query") % {'bad_options': bad_options}
- LOG.debug(log_msg)
- for opt in unknown_options:
- del search_options[opt]
from cinder.api import common
from cinder.api.openstack import wsgi
-from cinder.api.v2 import volumes
from cinder.api import xmlutil
from cinder import exception
from cinder.openstack.common import log as logging
#filter out invalid option
allowed_search_options = ('status', 'volume_id', 'name')
- volumes.remove_invalid_options(context, search_opts,
- allowed_search_options)
+ utils.remove_invalid_filter_options(context, search_opts,
+ allowed_search_options)
# NOTE(thingee): v2 API allows name instead of display_name
if 'name' in search_opts:
params.pop('offset', None)
filters = params
- remove_invalid_options(context,
- filters, self._get_volume_filter_options())
+ utils.remove_invalid_filter_options(context,
+ filters,
+ self._get_volume_filter_options())
# NOTE(thingee): v2 API allows name instead of display_name
if 'name' in filters:
def create_resource(ext_mgr):
return wsgi.Resource(VolumeController(ext_mgr))
-
-
-def remove_invalid_options(context, filters, allowed_search_options):
- """Remove search options that are not valid for non-admin API/context."""
- if context.is_admin:
- # Allow all options
- return
- # Otherwise, strip out all unknown options
- unknown_options = [opt for opt in filters
- if opt not in allowed_search_options]
- bad_options = ", ".join(unknown_options)
- log_msg = _("Removing options '%s' from query") % bad_options
- LOG.debug(log_msg)
- for opt in unknown_options:
- del filters[opt]
backup['host'],
backup['id'])
- # TODO(moorehef): Add support for search_opts, discarded atm
def get_all(self, context, search_opts=None):
if search_opts is None:
search_opts = {}
check_policy(context, 'get_all')
if context.is_admin:
- backups = self.db.backup_get_all(context)
+ backups = self.db.backup_get_all(context, filters=search_opts)
else:
backups = self.db.backup_get_all_by_project(context,
- context.project_id)
+ context.project_id,
+ filters=search_opts)
return backups
return IMPL.backup_get(context, backup_id)
-def backup_get_all(context):
+def backup_get_all(context, filters=None):
"""Get all backups."""
- return IMPL.backup_get_all(context)
+ return IMPL.backup_get_all(context, filters=filters)
def backup_get_all_by_host(context, host):
return IMPL.backup_create(context, values)
-def backup_get_all_by_project(context, project_id):
+def backup_get_all_by_project(context, project_id, filters=None):
"""Get all backups belonging to a project."""
- return IMPL.backup_get_all_by_project(context, project_id)
+ return IMPL.backup_get_all_by_project(context, project_id,
+ filters=filters)
def backup_update(context, backup_id, values):
return result
+def _backup_get_all(context, filters=None):
+ session = get_session()
+ with session.begin():
+ # Generate the query
+ query = model_query(context, models.Backup)
+ if filters:
+ query = query.filter_by(**filters)
+
+ return query.all()
+
+
@require_admin_context
-def backup_get_all(context):
- return model_query(context, models.Backup).all()
+def backup_get_all(context, filters=None):
+ return _backup_get_all(context, filters)
@require_admin_context
@require_context
-def backup_get_all_by_project(context, project_id):
+def backup_get_all_by_project(context, project_id, filters=None):
+
authorize_project_context(context, project_id)
+ if not filters:
+ filters = {}
+ else:
+ filters = filters.copy()
+
+ filters['project_id'] = project_id
- return model_query(context, models.Backup).\
- filter_by(project_id=project_id).all()
+ return _backup_get_all(context, filters)
@require_context
db.backup_destroy(context.get_admin_context(), backup_id2)
db.backup_destroy(context.get_admin_context(), backup_id1)
+ def test_list_backups_detail_using_filters(self):
+ backup_id1 = self._create_backup(display_name='test2')
+ backup_id2 = self._create_backup(status='available')
+ backup_id3 = self._create_backup(volume_id=4321)
+
+ req = webob.Request.blank('/v2/fake/backups/detail?name=test2')
+ req.method = 'GET'
+ req.headers['Content-Type'] = 'application/json'
+ req.headers['Accept'] = 'application/json'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+
+ self.assertEqual(len(res_dict['backups']), 1)
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_dict['backups'][0]['id'], backup_id1)
+
+ req = webob.Request.blank('/v2/fake/backups/detail?status=available')
+ req.method = 'GET'
+ req.headers['Content-Type'] = 'application/json'
+ req.headers['Accept'] = 'application/json'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+
+ self.assertEqual(len(res_dict['backups']), 1)
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_dict['backups'][0]['id'], backup_id2)
+
+ req = webob.Request.blank('/v2/fake/backups/detail?volume_id=4321')
+ req.method = 'GET'
+ req.headers['Content-Type'] = 'application/json'
+ req.headers['Accept'] = 'application/json'
+ res = req.get_response(fakes.wsgi_app())
+ res_dict = json.loads(res.body)
+
+ self.assertEqual(len(res_dict['backups']), 1)
+ self.assertEqual(res.status_int, 200)
+ self.assertEqual(res_dict['backups'][0]['id'], backup_id3)
+
+ db.backup_destroy(context.get_admin_context(), backup_id3)
+ db.backup_destroy(context.get_admin_context(), backup_id2)
+ db.backup_destroy(context.get_admin_context(), backup_id1)
+
def test_list_backups_detail_xml(self):
backup_id1 = self._create_backup()
backup_id2 = self._create_backup()
all_backups = db.backup_get_all(self.ctxt)
self._assertEqualListsOfObjects(self.created, all_backups)
+ def tests_backup_get_all_by_filter(self):
+ filters = {'status': self.created[1]['status']}
+ filtered_backups = db.backup_get_all(self.ctxt, filters=filters)
+ self._assertEqualListsOfObjects([self.created[1]], filtered_backups)
+
+ filters = {'display_name': self.created[1]['display_name']}
+ filtered_backups = db.backup_get_all(self.ctxt, filters=filters)
+ self._assertEqualListsOfObjects([self.created[1]], filtered_backups)
+
+ filters = {'volume_id': self.created[1]['volume_id']}
+ filtered_backups = db.backup_get_all(self.ctxt, filters=filters)
+ self._assertEqualListsOfObjects([self.created[1]], filtered_backups)
+
def test_backup_get_all_by_host(self):
byhost = db.backup_get_all_by_host(self.ctxt,
self.created[1]['host'])
volume['metadata'].update(visible_admin_meta)
else:
volume['metadata'] = visible_admin_meta
+
+
+def remove_invalid_filter_options(context, filters,
+ allowed_search_options):
+ """Remove search options that are not valid
+ for non-admin API/context.
+ """
+ if context.is_admin:
+ # Allow all options
+ return
+ # Otherwise, strip out all unknown options
+ unknown_options = [opt for opt in filters
+ if opt not in allowed_search_options]
+ bad_options = ", ".join(unknown_options)
+ log_msg = "Removing options '%s' from query." % bad_options
+ LOG.debug(log_msg)
+ for opt in unknown_options:
+ del filters[opt]