return query
-def exact_filter(query, model, filters, legal_keys):
- """Applies exact match filtering to a query.
-
- Returns the updated query. Modifies filters argument to remove
- filters consumed.
-
- :param query: query to apply filters to
- :param model: model object the query applies to, for IN-style
- filtering
- :param filters: dictionary of filters; values that are lists,
- tuples, sets, or frozensets cause an 'IN' test to
- be performed, while exact matching ('==' operator)
- is used for other values
- :param legal_keys: list of keys to apply exact filtering to
- """
-
- filter_dict = {}
-
- # Walk through all the keys
- for key in legal_keys:
- # Skip ones we're not filtering on
- if key not in filters:
- continue
-
- # OK, filtering on this key; what value do we search for?
- value = filters.pop(key)
-
- if isinstance(value, (list, tuple, set, frozenset)):
- # Looking for values in a list; apply to query directly
- column_attr = getattr(model, key)
- query = query.filter(column_attr.in_(value))
- else:
- # OK, simple exact match; save for later
- filter_dict[key] = value
-
- # Apply simple exact matches
- if filter_dict:
- query = query.filter_by(**filter_dict)
-
- return query
-
-
def _sync_volumes(context, project_id, session, volume_type_id=None,
volume_type_name=None):
(volumes, gigs) = _volume_data_get_for_project(
return quota_usage_ref
-@require_admin_context
-def quota_usage_create(context, project_id, resource, in_use, reserved,
- until_refresh):
- return _quota_usage_create(context, project_id, resource, in_use, reserved,
- until_refresh)
-
###################
return result
-@require_context
-@require_snapshot_exists
-def snapshot_metadata_get_item(context, snapshot_id, key):
- return _snapshot_metadata_get_item(context, snapshot_id, key)
-
-
@require_context
@require_snapshot_exists
def snapshot_metadata_update(context, snapshot_id, metadata, delete):
return result
-@require_context
-def volume_type_extra_specs_get_item(context, volume_type_id, key):
- return _volume_type_extra_specs_get_item(context, volume_type_id, key)
-
-
@require_context
def volume_type_extra_specs_update_or_create(context, volume_type_id,
specs):
extend qos specs association to other entities, such as volumes,
sometime in future.
"""
+ # Raise QoSSpecsNotFound if no specs found
_qos_specs_get_ref(context, qos_specs_id, None)
-
return volume_type_qos_associations_get(context, qos_specs_id)
obj1 = self._dict_from_object(obj1, ignored_keys)
obj2 = self._dict_from_object(obj2, ignored_keys)
- self.assertEqual(len(obj1), len(obj2))
+ self.assertEqual(
+ len(obj1), len(obj2),
+ "Keys mismatch: %s" % str(set(obj1.keys()) ^ set(obj2.keys())))
for key, value in obj1.iteritems():
self.assertEqual(value, obj2[key])
def _assertEqualListsOfObjects(self, objs1, objs2, ignored_keys=None):
- self.assertEqual(len(objs1), len(objs2))
- objs2 = dict([(o['id'], o) for o in objs2])
- for o1 in objs1:
- self._assertEqualObjects(o1, objs2[o1['id']], ignored_keys)
+ obj_to_dict = lambda o: self._dict_from_object(o, ignored_keys)
+ sort_key = lambda d: [d[k] for k in sorted(d)]
+ conv_and_sort = lambda obj: sorted(map(obj_to_dict, obj), key=sort_key)
+
+ self.assertEqual(conv_and_sort(objs1), conv_and_sort(objs2))
def _assertEqualListsOfPrimitivesAsSets(self, primitives1, primitives2):
self.assertEqual(len(primitives1), len(primitives2))
db.volume_get_all_by_instance_uuid(
self.ctxt, instance_uuids[i]))
+ def test_volume_get_all_by_instance_uuid_empty(self):
+ self.assertEqual([], db.volume_get_all_by_instance_uuid(self.ctxt,
+ 'empty'))
+
def test_volume_get_all_by_project(self):
volumes = []
for i in xrange(3):
def test_volume_update(self):
volume = db.volume_create(self.ctxt, {'host': 'h1'})
- db.volume_update(self.ctxt, volume['id'], {'host': 'h2'})
+ db.volume_update(self.ctxt, volume['id'],
+ {'host': 'h2', 'metadata': {'m1': 'v1'}})
volume = db.volume_get(self.ctxt, volume['id'])
self.assertEqual('h2', volume['host'])
self.assertEqual(should_be, db.volume_metadata_get(self.ctxt, 1))
+ def test_volume_metadata_delete(self):
+ metadata = {'a': 'b', 'c': 'd'}
+ db.volume_create(self.ctxt, {'id': 1, 'metadata': metadata})
+ db.volume_metadata_delete(self.ctxt, 1, 'c')
+ metadata.pop('c')
+ self.assertEquals(metadata, db.volume_metadata_get(self.ctxt, 1))
+
class DBAPISnapshotTestCase(BaseTest):
+
+ """Tests for cinder.db.api.snapshot_*."""
+
+ def test_snapshot_data_get_for_project(self):
+ actual = db.snapshot_data_get_for_project(self.ctxt, 'project1')
+ self.assertEqual(actual, (0, 0))
+ db.volume_create(self.ctxt, {'id': 1,
+ 'project_id': 'project1',
+ 'size': 42})
+ snapshot = db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1,
+ 'project_id': 'project1',
+ 'volume_size': 42})
+ actual = db.snapshot_data_get_for_project(self.ctxt, 'project1')
+ self.assertEqual(actual, (1, 42))
+
+ def test_snapshot_get_all(self):
+ db.volume_create(self.ctxt, {'id': 1})
+ snapshot = db.snapshot_create(self.ctxt, {'id': 1, 'volume_id': 1})
+ self._assertEqualListsOfObjects([snapshot],
+ db.snapshot_get_all(self.ctxt),
+ ignored_keys=['metadata', 'volume'])
+
def test_snapshot_metadata_get(self):
metadata = {'a': 'b', 'c': 'd'}
db.volume_create(self.ctxt, {'id': 1})
self.assertEqual(should_be, db.snapshot_metadata_get(self.ctxt, 1))
+class DBAPIVolumeTypeTestCase(BaseTest):
+
+ """Tests for the db.api.volume_type_* methods."""
+
+ def setUp(self):
+ self.ctxt = context.get_admin_context()
+ super(DBAPIVolumeTypeTestCase, self).setUp()
+
+ def test_volume_type_create_exists(self):
+ vt = db.volume_type_create(self.ctxt, {'name': 'n1'})
+ self.assertRaises(exception.VolumeTypeExists,
+ db.volume_type_create,
+ self.ctxt,
+ {'name': 'n1'})
+ self.assertRaises(exception.VolumeTypeExists,
+ db.volume_type_create,
+ self.ctxt,
+ {'name': 'n2', 'id': vt['id']})
+
+
class DBAPIEncryptionTestCase(BaseTest):
"""Tests for the db.api.volume_type_encryption_* methods."""
self.ctxt,
'project1'))
+ def test_reservation_get_all_by_project(self):
+ reservations = _quota_reserve(self.ctxt, 'project1')
+ r1 = db.reservation_get(self.ctxt, reservations[0])
+ r2 = db.reservation_get(self.ctxt, reservations[1])
+ expected = {'project_id': 'project1',
+ r1['resource']: {r1['uuid']: r1['delta']},
+ r2['resource']: {r2['uuid']: r2['delta']}}
+ self.assertEqual(expected, db.reservation_get_all_by_project(
+ self.ctxt, 'project1'))
+
def test_reservation_expire(self):
self.values['expire'] = datetime.datetime.utcnow() + \
datetime.timedelta(days=1)
self.ctxt,
'project1'))
+ def test_reservation_destroy(self):
+ reservations = _quota_reserve(self.ctxt, 'project1')
+ r1 = db.reservation_get(self.ctxt, reservations[0])
+ db.reservation_destroy(self.ctxt, reservations[1])
+ expected = {'project_id': 'project1',
+ r1['resource']: {r1['uuid']: r1['delta']}}
+ self.assertEqual(expected, db.reservation_get_all_by_project(
+ self.ctxt, 'project1'))
+
+
+class DBAPIQuotaClassTestCase(BaseTest):
+
+ """Tests for db.api.quota_class_* methods."""
+
+ def setUp(self):
+ super(DBAPIQuotaClassTestCase, self).setUp()
+ self.sample_qc = db.quota_class_create(self.ctxt, 'test_qc',
+ 'test_resource', 42)
+
+ def test_quota_class_get(self):
+ qc = db.quota_class_get(self.ctxt, 'test_qc', 'test_resource')
+ self._assertEqualObjects(self.sample_qc, qc)
+
+ def test_quota_class_destroy(self):
+ db.quota_class_destroy(self.ctxt, 'test_qc', 'test_resource')
+ self.assertRaises(exception.QuotaClassNotFound,
+ db.quota_class_get, self.ctxt,
+ 'test_qc', 'test_resource')
+
+ def test_quota_class_get_not_found(self):
+ self.assertRaises(exception.QuotaClassNotFound,
+ db.quota_class_get, self.ctxt, 'nonexistent',
+ 'nonexistent')
+
+ def test_quota_class_get_all_by_name(self):
+ sample1 = db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
+ sample2 = db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
+ self.assertEqual({'class_name': 'test_qc', 'test_resource': 42},
+ db.quota_class_get_all_by_name(self.ctxt, 'test_qc'))
+ self.assertEqual({'class_name': 'test2', 'res1': 43, 'res2': 44},
+ db.quota_class_get_all_by_name(self.ctxt, 'test2'))
+
+ def test_quota_class_update(self):
+ db.quota_class_update(self.ctxt, 'test_qc', 'test_resource', 43)
+ updated = db.quota_class_get(self.ctxt, 'test_qc', 'test_resource')
+ self.assertEqual(43, updated['hard_limit'])
+
+ def test_quota_class_destroy_all_by_name(self):
+ sample1 = db.quota_class_create(self.ctxt, 'test2', 'res1', 43)
+ sample2 = db.quota_class_create(self.ctxt, 'test2', 'res2', 44)
+ db.quota_class_destroy_all_by_name(self.ctxt, 'test2')
+ self.assertEqual({'class_name': 'test2'},
+ db.quota_class_get_all_by_name(self.ctxt, 'test2'))
+
class DBAPIQuotaTestCase(BaseTest):
self.assertIn(reservation.resource, res_names)
res_names.remove(reservation.resource)
+ def test_quota_destroy(self):
+ db.quota_create(self.ctxt, 'project1', 'resource1', 41)
+ self.assertIsNone(db.quota_destroy(self.ctxt, 'project1',
+ 'resource1'))
+ self.assertRaises(exception.ProjectQuotaNotFound, db.quota_get,
+ self.ctxt, 'project1', 'resource1')
+
def test_quota_destroy_all_by_project(self):
reservations = _quota_reserve(self.ctxt, 'project1')
db.quota_destroy_all_by_project(self.ctxt, 'project1')
self.created[1]['project_id'])
self._assertEqualObjects(self.created[1], byproj[0])
+ def test_backup_update_nonexistent(self):
+ self.assertRaises(exception.BackupNotFound,
+ db.backup_update,
+ self.ctxt, 'nonexistent', {})
+
def test_backup_update(self):
updated_values = self._get_values(one=True)
update_id = self.created[1]['id']
{
'id': 1,
'host': 'devstack',
+ 'project_id': 'p1',
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'deleted': True, 'status': 'deleted',
'deleted_at': datetime.datetime(1, 2, 1, 1, 1, 1),
{
'id': 2,
'host': 'devstack',
+ 'project_id': 'p1',
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'deleted': True, 'status': 'deleted',
'deleted_at': datetime.datetime(1, 3, 10, 1, 1, 1),
{
'id': 3,
'host': 'devstack',
+ 'project_id': 'p1',
'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),
'deleted': True, 'status': 'deleted',
'deleted_at': datetime.datetime(1, 5, 1, 1, 1, 1),
{
'id': 4,
'host': 'devstack',
+ 'project_id': 'p1',
'created_at': datetime.datetime(1, 3, 10, 1, 1, 1),
},
{
'id': 5,
'host': 'devstack',
+ 'project_id': 'p1',
'created_at': datetime.datetime(1, 5, 1, 1, 1, 1),
}
]
volumes = db.volume_get_active_by_window(
self.context,
datetime.datetime(1, 3, 1, 1, 1, 1),
- datetime.datetime(1, 4, 1, 1, 1, 1))
+ datetime.datetime(1, 4, 1, 1, 1, 1),
+ project_id='p1')
self.assertEqual(len(volumes), 3)
self.assertEqual(volumes[0].id, u'2')
self.assertEqual(volumes[1].id, u'3')
snapshots = db.snapshot_get_active_by_window(
self.context,
datetime.datetime(1, 3, 1, 1, 1, 1),
- datetime.datetime(1, 4, 1, 1, 1, 1))
+ datetime.datetime(1, 4, 1, 1, 1, 1),
+ project_id='p1')
self.assertEqual(len(snapshots), 3)
self.assertEqual(snapshots[0].id, u'2')
self.assertEqual(snapshots[0].volume.id, u'1')
def setUp(self):
super(VolumeGlanceMetadataTestCase, self).setUp()
- self.context = context.get_admin_context()
-
- def tearDown(self):
- super(VolumeGlanceMetadataTestCase, self).tearDown()
+ self.ctxt = context.get_admin_context()
def test_vol_glance_metadata_bad_vol_id(self):
ctxt = context.get_admin_context()
for (key, value) in expected_meta.items():
self.assertEqual(meta[key], value)
- def test_vol_glance_metadata_copy_to_volume(self):
+ def test_vol_glance_metadata_copy_from_volume_to_volume(self):
ctxt = context.get_admin_context()
db.volume_create(ctxt, {'id': 1})
db.volume_create(ctxt, {'id': 100, 'source_volid': 1})
for meta in db.volume_glance_metadata_get(ctxt, 100):
for (key, value) in expected_meta.items():
self.assertEqual(meta[key], value)
+
+ def test_volume_glance_metadata_copy_to_volume(self):
+ vol1 = db.volume_create(self.ctxt, {})
+ vol2 = db.volume_create(self.ctxt, {})
+ db.volume_glance_metadata_create(self.ctxt, vol1['id'], 'm1', 'v1')
+ snapshot = db.snapshot_create(self.ctxt, {'volume_id': vol1['id']})
+ db.volume_glance_metadata_copy_to_snapshot(self.ctxt, snapshot['id'],
+ vol1['id'])
+ db.volume_glance_metadata_copy_to_volume(self.ctxt, vol2['id'],
+ snapshot['id'])
+ metadata = db.volume_glance_metadata_get(self.ctxt, vol2['id'])
+ metadata = dict([(m['key'], m['value']) for m in metadata])
+ self.assertEqual(metadata, {'m1': 'v1'})
+
+ def test_volume_snapshot_glance_metadata_get_nonexistent(self):
+ vol = db.volume_create(self.ctxt, {})
+ snapshot = db.snapshot_create(self.ctxt, {'volume_id': vol['id']})
+ self.assertRaises(exception.GlanceMetadataNotFound,
+ db.volume_snapshot_glance_metadata_get,
+ self.ctxt, snapshot['id'])