@require_context
def backup_get(context, backup_id, session=None):
result = model_query(context, models.Backup,
- read_deleted="yes").filter_by(id=backup_id).first()
+ session=session, project_only=True).\
+ filter_by(id=backup_id).\
+ first()
+
if not result:
raise exception.BackupNotFound(backup_id=backup_id)
+
return result
@require_admin_context
def backup_get_all(context):
- return model_query(context, models.Backup, read_deleted="yes").all()
+ return model_query(context, models.Backup).all()
@require_admin_context
def backup_get_all_by_host(context, host):
- return model_query(context, models.Backup,
- read_deleted="yes").filter_by(host=host).all()
+ return model_query(context, models.Backup).filter_by(host=host).all()
@require_context
def backup_get_all_by_project(context, project_id):
authorize_project_context(context, project_id)
- return model_query(context, models.Backup, read_deleted="yes").\
+ return model_query(context, models.Backup).\
filter_by(project_id=project_id).all()
def backup_destroy(context, backup_id):
session = get_session()
with session.begin():
- model_query(context, models.Backup,
- read_deleted="yes").filter_by(id=backup_id).delete()
+ session.query(models.Backup).\
+ filter_by(id=backup_id).\
+ update({'status': 'deleted',
+ 'deleted': True,
+ 'deleted_at': timeutils.utcnow(),
+ 'updated_at': literal_column('updated_at')})
from cinder import flags
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
+from cinder.openstack.common import timeutils
from cinder import test
FLAGS = flags.FLAGS
self.ctxt,
backup_id)
+ ctxt_read_deleted = context.get_admin_context('yes')
+ backup = db.backup_get(ctxt_read_deleted, backup_id)
+ self.assertEqual(backup.deleted, True)
+ self.assertTrue(timeutils.utcnow() > backup.deleted_at)
+ self.assertEqual(backup.status, 'deleted')
+
def test_list_backup(self):
backups = db.backup_get_all_by_project(self.ctxt, 'project1')
self.assertEqual(len(backups), 0)
backups = db.backup_get_all_by_project(self.ctxt, 'project1')
self.assertEqual(len(backups), 1)
self.assertEqual(backups[0].id, b2)
+
+ def test_backup_get_all_by_project_with_deleted(self):
+ """Test deleted backups don't show up in backup_get_all_by_project.
+ Unless context.read_deleted is 'yes'"""
+ backups = db.backup_get_all_by_project(self.ctxt, 'fake')
+ self.assertEqual(len(backups), 0)
+
+ backup_id_keep = self._create_backup_db_entry()
+ backup_id = self._create_backup_db_entry()
+ db.backup_destroy(self.ctxt, backup_id)
+
+ backups = db.backup_get_all_by_project(self.ctxt, 'fake')
+ self.assertEqual(len(backups), 1)
+ self.assertEqual(backups[0].id, backup_id_keep)
+
+ ctxt_read_deleted = context.get_admin_context('yes')
+ backups = db.backup_get_all_by_project(ctxt_read_deleted, 'fake')
+ self.assertEqual(len(backups), 2)
+
+ def test_backup_get_all_by_host_with_deleted(self):
+ """Test deleted backups don't show up in backup_get_all_by_project.
+ Unless context.read_deleted is 'yes'"""
+ backups = db.backup_get_all_by_host(self.ctxt, 'testhost')
+ self.assertEqual(len(backups), 0)
+
+ backup_id_keep = self._create_backup_db_entry()
+ backup_id = self._create_backup_db_entry()
+ db.backup_destroy(self.ctxt, backup_id)
+
+ backups = db.backup_get_all_by_host(self.ctxt, 'testhost')
+ self.assertEqual(len(backups), 1)
+ self.assertEqual(backups[0].id, backup_id_keep)
+
+ ctxt_read_deleted = context.get_admin_context('yes')
+ backups = db.backup_get_all_by_host(ctxt_read_deleted, 'testhost')
+ self.assertEqual(len(backups), 2)