paired with corresponding item in sort_keys
:returns: list of matching snapshots
"""
+ if filters and not is_valid_model_filters(models.Snapshot, filters):
+ return []
+
session = get_session()
with session.begin():
query = _generate_paginate_query(context, session, marker, limit,
@require_context
def snapshot_get_by_host(context, host, filters=None):
+ if filters and not is_valid_model_filters(models.Snapshot, filters):
+ return []
+
query = model_query(context, models.Snapshot, read_deleted='no',
project_only=True)
if filters:
paired with corresponding item in sort_keys
:returns: list of matching snapshots
"""
+ if filters and not is_valid_model_filters(models.Snapshot, filters):
+ return []
+
authorize_project_context(context, project_id)
# Add project_id to filters
def _backup_get_all(context, filters=None):
+ if filters and not is_valid_model_filters(models.Backup, filters):
+ return []
+
session = get_session()
with session.begin():
# Generate the query
self.ctxt,
'host2', {'status': 'error'}),
ignored_keys='volume')
+ self._assertEqualListsOfObjects([],
+ db.snapshot_get_by_host(
+ self.ctxt,
+ 'host2', {'fake_key': 'fake'}),
+ ignored_keys='volume')
+
+ def test_snapshot_get_all_by_project(self):
+ db.volume_create(self.ctxt, {'id': 1})
+ db.volume_create(self.ctxt, {'id': 2})
+ snapshot1 = db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1,
+ 'project_id': 'project1'})
+ snapshot2 = db.snapshot_create(self.ctxt, {'id': 2, 'volume_id': 2,
+ 'status': 'error',
+ 'project_id': 'project2'})
+
+ self._assertEqualListsOfObjects([snapshot1],
+ db.snapshot_get_all_by_project(
+ self.ctxt,
+ 'project1'),
+ ignored_keys='volume')
+ self._assertEqualListsOfObjects([snapshot2],
+ db.snapshot_get_all_by_project(
+ self.ctxt,
+ 'project2'),
+ ignored_keys='volume')
+ self._assertEqualListsOfObjects([],
+ db.snapshot_get_all_by_project(
+ self.ctxt,
+ 'project2',
+ {'status': 'available'}),
+ ignored_keys='volume')
+ self._assertEqualListsOfObjects([snapshot2],
+ db.snapshot_get_all_by_project(
+ self.ctxt,
+ 'project2',
+ {'status': 'error'}),
+ ignored_keys='volume')
+ self._assertEqualListsOfObjects([],
+ db.snapshot_get_all_by_project(
+ self.ctxt,
+ 'project2',
+ {'fake_key': 'fake'}),
+ ignored_keys='volume')
def test_snapshot_metadata_get(self):
metadata = {'a': 'b', 'c': 'd'}
filtered_backups = db.backup_get_all(self.ctxt, filters=filters)
self._assertEqualListsOfObjects([self.created[1]], filtered_backups)
+ filters = {'fake_key': 'fake'}
+ filtered_backups = db.backup_get_all(self.ctxt, filters=filters)
+ self._assertEqualListsOfObjects([], filtered_backups)
+
def test_backup_get_all_by_host(self):
byhost = db.backup_get_all_by_host(self.ctxt,
self.created[1]['host'])
self.created[1]['project_id'])
self._assertEqualObjects(self.created[1], byproj[0])
+ byproj = db.backup_get_all_by_project(self.ctxt,
+ self.created[1]['project_id'],
+ {'fake_key': 'fake'})
+ self._assertEqualListsOfObjects([], byproj)
+
+ def test_backup_get_all_by_volume(self):
+ byvol = db.backup_get_all_by_volume(self.ctxt,
+ self.created[1]['volume_id'])
+ self._assertEqualObjects(self.created[1], byvol[0])
+
+ byvol = db.backup_get_all_by_volume(self.ctxt,
+ self.created[1]['volume_id'],
+ {'fake_key': 'fake'})
+ self._assertEqualListsOfObjects([], byvol)
+
def test_backup_update_nonexistent(self):
self.assertRaises(exception.BackupNotFound,
db.backup_update,