from cinder.api.openstack import common
from cinder.api.openstack import wsgi
from cinder.api.openstack import xmlutil
+from cinder.api.openstack.volume import volumes
from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging
search_opts = {}
search_opts.update(req.GET)
+ allowed_search_options = ('status', 'volume_id', 'display_name')
+ volumes.remove_invalid_options(context, search_opts,
+ allowed_search_options)
snapshots = self.volume_api.get_all_snapshots(context,
search_opts=search_opts)
def _get_volume_search_options(self):
"""Return volume search options allowed by non-admin."""
- return ('name', 'status')
+ return ('display_name', 'status')
def create_resource(ext_mgr):
log_msg = _("Removing options '%(bad_options)s' from query") % locals()
LOG.debug(log_msg)
for opt in unknown_options:
- search_options.pop(opt, None)
+ del search_options[opt]
resp_snapshot = resp_snapshots.pop()
self.assertEqual(resp_snapshot['id'], UUID)
+ def test_snapshot_list_by_status(self):
+ def stub_snapshot_get_all_by_project(context, project_id):
+ return [
+ fakes.stub_snapshot(1, display_name='backup1',
+ status='available'),
+ fakes.stub_snapshot(2, display_name='backup2',
+ status='available'),
+ fakes.stub_snapshot(3, display_name='backup3',
+ status='creating'),
+ ]
+ self.stubs.Set(db, 'snapshot_get_all_by_project',
+ stub_snapshot_get_all_by_project)
+
+ # no status filter
+ req = fakes.HTTPRequest.blank('/v1/snapshots')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 3)
+ # single match
+ req = fakes.HTTPRequest.blank('/v1/snapshots?status=creating')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 1)
+ self.assertEqual(resp['snapshots'][0]['status'], 'creating')
+ # multiple match
+ req = fakes.HTTPRequest.blank('/v1/snapshots?status=available')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 2)
+ for snapshot in resp['snapshots']:
+ self.assertEquals(snapshot['status'], 'available')
+ # no match
+ req = fakes.HTTPRequest.blank('/v1/snapshots?status=error')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 0)
+
+ def test_snapshot_list_by_volume(self):
+ def stub_snapshot_get_all_by_project(context, project_id):
+ return [
+ fakes.stub_snapshot(1, volume_id='vol1', status='creating'),
+ fakes.stub_snapshot(2, volume_id='vol1', status='available'),
+ fakes.stub_snapshot(3, volume_id='vol2', status='available'),
+ ]
+ self.stubs.Set(db, 'snapshot_get_all_by_project',
+ stub_snapshot_get_all_by_project)
+
+ # single match
+ req = fakes.HTTPRequest.blank('/v1/snapshots?volume_id=vol2')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 1)
+ self.assertEqual(resp['snapshots'][0]['volume_id'], 'vol2')
+ # multiple match
+ req = fakes.HTTPRequest.blank('/v1/snapshots?volume_id=vol1')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 2)
+ for snapshot in resp['snapshots']:
+ self.assertEqual(snapshot['volume_id'], 'vol1')
+ # multiple filters
+ req = fakes.HTTPRequest.blank('/v1/snapshots?volume_id=vol1'
+ '&status=available')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 1)
+ self.assertEqual(resp['snapshots'][0]['volume_id'], 'vol1')
+ self.assertEqual(resp['snapshots'][0]['status'], 'available')
+
+ def test_snapshot_list_by_name(self):
+ def stub_snapshot_get_all_by_project(context, project_id):
+ return [
+ fakes.stub_snapshot(1, display_name='backup1'),
+ fakes.stub_snapshot(2, display_name='backup2'),
+ fakes.stub_snapshot(3, display_name='backup3'),
+ ]
+ self.stubs.Set(db, 'snapshot_get_all_by_project',
+ stub_snapshot_get_all_by_project)
+
+ # no display_name filter
+ req = fakes.HTTPRequest.blank('/v1/snapshots')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 3)
+ # filter by one name
+ req = fakes.HTTPRequest.blank('/v1/snapshots?display_name=backup2')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 1)
+ self.assertEquals(resp['snapshots'][0]['display_name'], 'backup2')
+ # filter no match
+ req = fakes.HTTPRequest.blank('/v1/snapshots?display_name=backup4')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['snapshots']), 0)
+
def test_admin_list_snapshots_limited_to_project(self):
req = fakes.HTTPRequest.blank('/v1/fake/snapshots',
use_admin_context=True)
'size': 1}]}
self.assertEqual(res_dict, expected)
+ def test_volume_list_by_name(self):
+ def stub_volume_get_all_by_project(context, project_id):
+ return [
+ fakes.stub_volume(1, display_name='vol1'),
+ fakes.stub_volume(2, display_name='vol2'),
+ fakes.stub_volume(3, display_name='vol3'),
+ ]
+ self.stubs.Set(db, 'volume_get_all_by_project',
+ stub_volume_get_all_by_project)
+
+ # no display_name filter
+ req = fakes.HTTPRequest.blank('/v1/volumes')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 3)
+ # filter on display_name
+ req = fakes.HTTPRequest.blank('/v1/volumes?display_name=vol2')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 1)
+ self.assertEqual(resp['volumes'][0]['display_name'], 'vol2')
+ # filter no match
+ req = fakes.HTTPRequest.blank('/v1/volumes?display_name=vol4')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 0)
+
+ def test_volume_list_by_status(self):
+ def stub_volume_get_all_by_project(context, project_id):
+ return [
+ fakes.stub_volume(1, display_name='vol1', status='available'),
+ fakes.stub_volume(2, display_name='vol2', status='available'),
+ fakes.stub_volume(3, display_name='vol3', status='in-use'),
+ ]
+ self.stubs.Set(db, 'volume_get_all_by_project',
+ stub_volume_get_all_by_project)
+ # no status filter
+ req = fakes.HTTPRequest.blank('/v1/volumes')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 3)
+ # single match
+ req = fakes.HTTPRequest.blank('/v1/volumes?status=in-use')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 1)
+ self.assertEqual(resp['volumes'][0]['status'], 'in-use')
+ # multiple match
+ req = fakes.HTTPRequest.blank('/v1/volumes?status=available')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 2)
+ for volume in resp['volumes']:
+ self.assertEqual(volume['status'], 'available')
+ # multiple filters
+ req = fakes.HTTPRequest.blank('/v1/volumes?status=available&'
+ 'display_name=vol1')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 1)
+ self.assertEqual(resp['volumes'][0]['display_name'], 'vol1')
+ self.assertEqual(resp['volumes'][0]['status'], 'available')
+ # no match
+ req = fakes.HTTPRequest.blank('/v1/volumes?status=in-use&'
+ 'display_name=vol1')
+ resp = self.controller.index(req)
+ self.assertEqual(len(resp['volumes']), 0)
+
def test_volume_show(self):
req = fakes.HTTPRequest.blank('/v1/volumes/1')
res_dict = self.controller.show(req, '1')
filter_mapping = {'metadata': _check_metadata_match}
result = []
+ not_found = object()
for volume in volumes:
# go over all filters in the list
for opt, values in search_opts.iteritems():
try:
filter_func = filter_mapping[opt]
except KeyError:
- # no such filter - ignore it, go to next filter
- continue
- else:
- if filter_func(volume, values):
- result.append(volume)
- break
+ def filter_func(volume, value):
+ return volume.get(opt, not_found) == value
+ if not filter_func(volume, values):
+ break # volume doesn't match this filter
+ else: # did not break out loop
+ result.append(volume) # volume matches all filters
volumes = result
return volumes
if (context.is_admin and 'all_tenants' in search_opts):
# Need to remove all_tenants to pass the filtering below.
del search_opts['all_tenants']
- return self.db.snapshot_get_all(context)
+ snapshots = self.db.snapshot_get_all(context)
else:
- return self.db.snapshot_get_all_by_project(context,
- context.project_id)
+ snapshots = self.db.snapshot_get_all_by_project(
+ context, context.project_id)
+
+ if search_opts:
+ LOG.debug(_("Searching by: %s") % str(search_opts))
+
+ results = []
+ not_found = object()
+ for snapshot in snapshots:
+ for opt, value in search_opts.iteritems():
+ if snapshot.get(opt, not_found) != value:
+ break
+ else:
+ results.append(snapshot)
+ snapshots = results
+ return snapshots
@wrap_check_policy
def check_attach(self, context, volume):