from cinder import exception
from cinder.i18n import _, _LI, _LW
from cinder import objects
+from cinder.objects import fields
import cinder.policy
from cinder import quota
from cinder import utils
:raises: ServiceNotFound
"""
check_policy(context, 'delete')
- if not force and backup.status not in ['available', 'error']:
+ if not force and backup.status not in [fields.BackupStatus.AVAILABLE,
+ fields.BackupStatus.ERROR]:
msg = _('Backup status must be available or error')
raise exception.InvalidBackup(reason=msg)
if force and not self._check_support_to_force_delete(context,
msg = _('Incremental backups exist for this backup.')
raise exception.InvalidBackup(reason=msg)
- backup.status = 'deleting'
+ backup.status = fields.BackupStatus.DELETING
backup.save()
self.backup_rpcapi.delete_backup(context, backup)
parent_id = None
if latest_backup:
parent_id = latest_backup.id
- if latest_backup['status'] != "available":
+ if latest_backup['status'] != fields.BackupStatus.AVAILABLE:
msg = _('The parent backup must be available for '
'incremental backup.')
raise exception.InvalidBackup(reason=msg)
'display_name': name,
'display_description': description,
'volume_id': volume_id,
- 'status': 'creating',
+ 'status': fields.BackupStatus.CREATING,
'container': container,
'parent_id': parent_id,
'size': volume['size'],
"""Make the RPC call to restore a volume backup."""
check_policy(context, 'restore')
backup = self.get(context, backup_id)
- if backup['status'] != 'available':
+ if backup['status'] != fields.BackupStatus.AVAILABLE:
msg = _('Backup status must be available')
raise exception.InvalidBackup(reason=msg)
# Setting the status here rather than setting at start and unrolling
# for each error condition, it should be a very small window
- backup.status = 'restoring'
+ backup.status = fields.BackupStatus.RESTORING
backup.save()
volume_host = volume_utils.extract_host(volume['host'], 'host')
self.db.volume_update(context, volume_id, {'status':
"""
check_policy(context, 'backup-export')
backup = self.get(context, backup_id)
- if backup['status'] != 'available':
+ if backup['status'] != fields.BackupStatus.AVAILABLE:
msg = (_('Backup status must be available and not %s.') %
backup['status'])
raise exception.InvalidBackup(reason=msg)
'user_id': context.user_id,
'project_id': context.project_id,
'volume_id': '0000-0000-0000-0000',
- 'status': 'creating',
+ 'status': fields.BackupStatus.CREATING,
}
try:
# If record exists and it's not deleted we cannot proceed with the
# import
- if backup.status != 'deleted':
+ if backup.status != fields.BackupStatus.DELETED:
msg = _('Backup already exists in database.')
raise exception.InvalidBackup(reason=msg)
from cinder import exception
from cinder.i18n import _, _LE, _LI, _LW
from cinder import objects
+from cinder.objects import fields
from cinder.volume import utils as volume_utils
LOG = logging.getLogger(__name__)
# has been changed to delete or has been deleted, we cancel the
# backup process to do forcing delete.
backup = objects.Backup.get_by_id(self.context, backup.id)
- if 'deleting' == backup.status or 'deleted' == backup.status:
+ if backup.status in (fields.BackupStatus.DELETING,
+ fields.BackupStatus.DELETED):
is_backup_canceled = True
# To avoid the chunk left when deletion complete, need to
# clean up the object of chunk again.
from cinder.i18n import _, _LE, _LI, _LW
from cinder import manager
from cinder import objects
+from cinder.objects import fields
from cinder import quota
from cinder import rpc
from cinder import utils
driver.set_initialized()
def _update_backup_error(self, backup, context, err):
- backup.status = 'error'
+ backup.status = fields.BackupStatus.ERROR
backup.fail_reason = err
backup.save()
{'status': 'error_restoring'})
def _cleanup_one_backup(self, ctxt, backup):
- if backup['status'] == 'creating':
+ if backup['status'] == fields.BackupStatus.CREATING:
LOG.info(_LI('Resetting backup %s to error (was creating).'),
backup['id'])
err = 'incomplete backup reset on manager restart'
self._update_backup_error(backup, ctxt, err)
- if backup['status'] == 'restoring':
+ if backup['status'] == fields.BackupStatus.RESTORING:
LOG.info(_LI('Resetting backup %s to '
'available (was restoring).'),
backup['id'])
- backup.status = 'available'
+ backup.status = fields.BackupStatus.AVAILABLE
backup.save()
- if backup['status'] == 'deleting':
+ if backup['status'] == fields.BackupStatus.DELETING:
LOG.info(_LI('Resuming delete on backup: %s.'), backup['id'])
if CONF.backup_service_inithost_offload:
# Offload all the pending backup delete operations to the
"backup %s.", backup.id)
return
- if backup.temp_volume_id and backup.status == 'error':
+ if (backup.temp_volume_id
+ and backup.status == fields.BackupStatus.ERROR):
try:
temp_volume = self.db.volume_get(ctxt,
backup.temp_volume_id)
backup.temp_volume_id = None
backup.save()
- if backup.temp_snapshot_id and backup.status == 'error':
+ if (backup.temp_snapshot_id
+ and backup.status == fields.BackupStatus.ERROR):
try:
temp_snapshot = objects.Snapshot.get_by_id(
ctxt, backup.temp_snapshot_id)
self._update_backup_error(backup, context, err)
raise exception.InvalidVolume(reason=err)
- expected_status = 'creating'
+ expected_status = fields.BackupStatus.CREATING
actual_status = backup.status
if actual_status != expected_status:
err = _('Create backup aborted, expected backup status '
self.db.volume_update(context, volume_id,
{'status': previous_status,
'previous_status': 'backing-up'})
- backup.status = 'available'
+ backup.status = fields.BackupStatus.AVAILABLE
backup.size = volume['size']
backup.availability_zone = self.az
backup.save()
'%(expected_status)s but got %(actual_status)s.') %
{'expected_status': expected_status,
'actual_status': actual_status})
- backup.status = 'available'
+ backup.status = fields.BackupStatus.AVAILABLE
backup.save()
raise exception.InvalidVolume(reason=err)
- expected_status = 'restoring'
+ expected_status = fields.BackupStatus.RESTORING
actual_status = backup['status']
if actual_status != expected_status:
err = (_('Restore backup aborted: expected backup status '
'configured_service': configured_service,
'backup_service': backup_service,
}
- backup.status = 'available'
+ backup.status = fields.BackupStatus.AVAILABLE
backup.save()
self.db.volume_update(context, volume_id, {'status': 'error'})
raise exception.InvalidBackup(reason=err)
with excutils.save_and_reraise_exception():
self.db.volume_update(context, volume_id,
{'status': 'error_restoring'})
- backup.status = 'available'
+ backup.status = fields.BackupStatus.AVAILABLE
backup.save()
self.db.volume_update(context, volume_id, {'status': 'available'})
- backup.status = 'available'
+ backup.status = fields.BackupStatus.AVAILABLE
backup.save()
LOG.info(_LI('Restore backup finished, backup %(backup_id)s restored'
' to volume %(volume_id)s.'),
backup.host = self.host
backup.save()
- expected_status = 'deleting'
+ expected_status = fields.BackupStatus.DELETING
actual_status = backup.status
if actual_status != expected_status:
err = _('Delete_backup aborted, expected backup status '
"""
LOG.info(_LI('Export record started, backup: %s.'), backup.id)
- expected_status = 'available'
+ expected_status = fields.BackupStatus.AVAILABLE
actual_status = backup.status
if actual_status != expected_status:
err = (_('Export backup aborted, expected backup status '
raise exception.InvalidBackup(reason=msg)
# Overwrite some fields
- backup_options['status'] = 'available'
+ backup_options['status'] = fields.BackupStatus.AVAILABLE
backup_options['service'] = self.driver_name
backup_options['availability_zone'] = self.az
backup_options['host'] = self.host
# Verify backup
try:
# check whether the backup is ok or not
- if status == 'available' and backup['status'] != 'restoring':
+ if (status == fields.BackupStatus.AVAILABLE
+ and backup['status'] != fields.BackupStatus.RESTORING):
# check whether we could verify the backup is ok or not
if isinstance(backup_service,
driver.BackupDriverWithVerify):
reason=msg)
# reset status to error or from restoring to available
else:
- if (status == 'error' or
- (status == 'available' and
- backup.status == 'restoring')):
+ if (status == fields.BackupStatus.ERROR or
+ (status == fields.BackupStatus.AVAILABLE and
+ backup.status == fields.BackupStatus.RESTORING)):
backup.status = status
backup.save()
except exception.InvalidBackup:
from cinder.db.sqlalchemy import models
from cinder import exception
from cinder.i18n import _, _LW, _LE, _LI
+from cinder.objects import fields
CONF = cfg.CONF
def backup_destroy(context, backup_id):
model_query(context, models.Backup).\
filter_by(id=backup_id).\
- update({'status': 'deleted',
+ update({'status': fields.BackupStatus.DELETED,
'deleted': True,
'deleted_at': timeutils.utcnow(),
'updated_at': literal_column('updated_at')})
from cinder.i18n import _
from cinder import objects
from cinder.objects import base
+from cinder.objects import fields as c_fields
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Version 1.1: Add new field num_dependent_backups and extra fields
# is_incremental and has_dependent_backups.
# Version 1.2: Add new field snapshot_id and data_timestamp.
- VERSION = '1.2'
+ # Version 1.3: Changed 'status' field to use BackupStatusField
+ VERSION = '1.3'
fields = {
'id': fields.UUIDField(),
'availability_zone': fields.StringField(nullable=True),
'container': fields.StringField(nullable=True),
'parent_id': fields.StringField(nullable=True),
- 'status': fields.StringField(nullable=True),
+ 'status': c_fields.BackupStatusField(nullable=True),
'fail_reason': fields.StringField(nullable=True),
'size': fields.IntegerField(),
--- /dev/null
+# Copyright 2015 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Custom fields for Cinder objects."""
+
+from oslo_versionedobjects import fields
+
+
+BaseEnumField = fields.BaseEnumField
+Enum = fields.Enum
+Field = fields.Field
+FieldType = fields.FieldType
+
+
+class BackupStatus(Enum):
+ ERROR = 'error'
+ ERROR_DELETING = 'error_deleting'
+ CREATING = 'creating'
+ AVAILABLE = 'available'
+ DELETING = 'deleting'
+ DELETED = 'deleted'
+ RESTORING = 'restoring'
+
+ ALL = (ERROR, ERROR_DELETING, CREATING, AVAILABLE, DELETING, DELETED,
+ RESTORING)
+
+ def __init__(self):
+ super(BackupStatus, self).__init__(valid_values=BackupStatus.ALL)
+
+
+class BackupStatusField(BaseEnumField):
+ AUTO_TYPE = BackupStatus()
from cinder import db
from cinder import exception
from cinder import objects
+from cinder.objects import fields
from cinder import test
from cinder.tests.unit.api.contrib import test_backups
from cinder.tests.unit.api import fakes
volume = db.volume_create(self.ctx, {'status': 'available',
'user_id': 'user',
'project_id': 'project'})
- backup = db.backup_create(self.ctx, {'status': 'available',
- 'size': 1,
- 'volume_id': volume['id'],
- 'user_id': 'user',
- 'project_id': 'project'})
+ backup = db.backup_create(self.ctx,
+ {'status': fields.BackupStatus.AVAILABLE,
+ 'size': 1,
+ 'volume_id': volume['id'],
+ 'user_id': 'user',
+ 'project_id': 'project'})
resp = self._issue_backup_reset(self.ctx,
backup,
- {'status': 'error'})
+ {'status': fields.BackupStatus.ERROR})
self.assertEqual(202, resp.status_int)
'volume_id': "fakeid"})
resp = self._issue_backup_reset(ctx,
backup,
- {'status': 'error'})
+ {'status': fields.BackupStatus.ERROR})
# request is not authorized
self.assertEqual(403, resp.status_int)
volume = db.volume_create(self.ctx,
{'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
- backup = db.backup_create(self.ctx, {'status': 'available',
- 'volume_id': volume['id'],
- 'user_id': 'user',
- 'project_id': 'project'})
+ backup = db.backup_create(self.ctx,
+ {'status': fields.BackupStatus.AVAILABLE,
+ 'volume_id': volume['id'],
+ 'user_id': 'user',
+ 'project_id': 'project'})
resp = self._issue_backup_reset(self.ctx,
backup,
- {'status': 'error'})
+ {'status': fields.BackupStatus.ERROR})
self.assertEqual(202, resp.status_int)
volume = db.volume_create(self.ctx,
{'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
- backup = db.backup_create(self.ctx, {'status': 'available',
- 'volume_id': volume['id'],
- 'user_id': 'user',
- 'project_id': 'project'})
+ backup = db.backup_create(self.ctx,
+ {'status': fields.BackupStatus.AVAILABLE,
+ 'volume_id': volume['id'],
+ 'user_id': 'user',
+ 'project_id': 'project'})
backup['id'] = 'fake_id'
resp = self._issue_backup_reset(self.ctx,
backup,
- {'status': 'error'})
+ {'status': fields.BackupStatus.ERROR})
# Should raise 404 if backup doesn't exist.
self.assertEqual(404, resp.status_int)
from cinder import exception
from cinder.i18n import _
from cinder import objects
+from cinder.objects import fields
from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import utils
display_name='test_backup',
display_description='this is a test backup',
container='volumebackups',
- status='creating',
+ status=fields.BackupStatus.CREATING,
incremental=False,
parent_id=None,
size=0, object_count=0, host='testhost',
self.assertEqual(backup_id, res_dict['backup']['id'])
self.assertEqual(0, res_dict['backup']['object_count'])
self.assertEqual(0, res_dict['backup']['size'])
- self.assertEqual('creating', res_dict['backup']['status'])
+ self.assertEqual(fields.BackupStatus.CREATING,
+ res_dict['backup']['status'])
self.assertEqual(volume_id, res_dict['backup']['volume_id'])
self.assertFalse(res_dict['backup']['is_incremental'])
self.assertFalse(res_dict['backup']['has_dependent_backups'])
self.assertEqual(backup_id3, res_dict['backups'][0]['id'])
self.assertEqual(0, res_dict['backups'][0]['object_count'])
self.assertEqual(0, res_dict['backups'][0]['size'])
- self.assertEqual('creating', res_dict['backups'][0]['status'])
+ self.assertEqual(fields.BackupStatus.CREATING,
+ res_dict['backups'][0]['status'])
self.assertEqual('1', res_dict['backups'][0]['volume_id'])
self.assertIn('updated_at', res_dict['backups'][0])
self.assertEqual(backup_id2, res_dict['backups'][1]['id'])
self.assertEqual(0, res_dict['backups'][1]['object_count'])
self.assertEqual(0, res_dict['backups'][1]['size'])
- self.assertEqual('creating', res_dict['backups'][1]['status'])
+ self.assertEqual(fields.BackupStatus.CREATING,
+ res_dict['backups'][1]['status'])
self.assertEqual('1', res_dict['backups'][1]['volume_id'])
self.assertIn('updated_at', res_dict['backups'][1])
self.assertEqual(backup_id1, res_dict['backups'][2]['id'])
self.assertEqual(0, res_dict['backups'][2]['object_count'])
self.assertEqual(0, res_dict['backups'][2]['size'])
- self.assertEqual('creating', res_dict['backups'][2]['status'])
+ self.assertEqual(fields.BackupStatus.CREATING,
+ res_dict['backups'][2]['status'])
self.assertEqual('1', res_dict['backups'][2]['volume_id'])
self.assertIn('updated_at', res_dict['backups'][2])
def test_list_backups_detail_using_filters(self):
backup_id1 = self._create_backup(display_name='test2')
- backup_id2 = self._create_backup(status='available')
+ backup_id2 = self._create_backup(status=fields.BackupStatus.AVAILABLE)
backup_id3 = self._create_backup(volume_id=4321)
req = webob.Request.blank('/v2/fake/backups/detail?name=test2')
self.assertEqual(
0, int(backup_detail.item(0).getAttribute('size')))
self.assertEqual(
- 'creating', backup_detail.item(0).getAttribute('status'))
+ fields.BackupStatus.CREATING,
+ backup_detail.item(0).getAttribute('status'))
self.assertEqual(
1, int(backup_detail.item(0).getAttribute('volume_id')))
self.assertEqual(
0, int(backup_detail.item(1).getAttribute('size')))
self.assertEqual(
- 'creating', backup_detail.item(1).getAttribute('status'))
+ fields.BackupStatus.CREATING,
+ backup_detail.item(1).getAttribute('status'))
self.assertEqual(
1, int(backup_detail.item(1).getAttribute('volume_id')))
0, int(backup_detail.item(2).getAttribute('object_count')))
self.assertEqual(
0, int(backup_detail.item(2).getAttribute('size')))
- self.assertEqual(
- 'creating', backup_detail.item(2).getAttribute('status'))
+ self.assertEqual(fields.BackupStatus.CREATING,
+ backup_detail.item(2).getAttribute('status'))
self.assertEqual(
1, int(backup_detail.item(2).getAttribute('volume_id')))
volume_id = utils.create_volume(self.context, size=5,
status='in-use')['id']
- backup_id = self._create_backup(volume_id, status="available")
+ backup_id = self._create_backup(volume_id,
+ status=fields.BackupStatus.AVAILABLE)
body = {"backup": {"display_name": "nightly001",
"display_description":
"Nightly Backup 03-Sep-2012",
volume_id,
status='available')
snapshot_id = snapshot.id
- backup_id = self._create_backup(volume_id, status="available")
+ backup_id = self._create_backup(volume_id,
+ status=fields.BackupStatus.AVAILABLE)
body = {"backup": {"display_name": "nightly001",
"display_description":
"Nightly Backup 03-Sep-2012",
_mock_service_get_all_by_topic.return_value = [
{'availability_zone': "az1", 'host': 'testhost',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
req = webob.Request.blank('/v2/fake/backups/%s' %
backup_id)
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(202, res.status_int)
- self.assertEqual('deleting',
+ self.assertEqual(fields.BackupStatus.DELETING,
self._get_backup_attrib(backup_id, 'status'))
db.backup_destroy(context.get_admin_context(), backup_id)
_mock_service_get_all_by_topic.return_value = [
{'availability_zone': "az1", 'host': 'testhost',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- backup_id = self._create_backup(status='available')
- delta_id = self._create_backup(status='available',
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
+ delta_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
incremental=True)
req = webob.Request.blank('/v2/fake/backups/%s' %
delta_id)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(202, res.status_int)
- self.assertEqual('deleting',
+ self.assertEqual(fields.BackupStatus.DELETING,
self._get_backup_attrib(delta_id, 'status'))
db.backup_destroy(context.get_admin_context(), delta_id)
_mock_service_get_all_by_topic.return_value = [
{'availability_zone': "az1", 'host': 'testhost',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
- backup_id = self._create_backup(status='error')
+ backup_id = self._create_backup(status=fields.BackupStatus.ERROR)
req = webob.Request.blank('/v2/fake/backups/%s' %
backup_id)
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(202, res.status_int)
- self.assertEqual('deleting',
+ self.assertEqual(fields.BackupStatus.DELETING,
self._get_backup_attrib(backup_id, 'status'))
db.backup_destroy(context.get_admin_context(), backup_id)
{'availability_zone': "az1", 'host': 'testhost',
'disabled': 0, 'updated_at': timeutils.utcnow()}]
volume_id = utils.create_volume(self.context, size=5)['id']
- backup_id = self._create_backup(volume_id, status="available")
- delta_backup_id = self._create_backup(status='available',
- incremental=True,
- parent_id=backup_id)
+ backup_id = self._create_backup(volume_id,
+ status=fields.BackupStatus.AVAILABLE)
+ delta_backup_id = self._create_backup(
+ status=fields.BackupStatus.AVAILABLE, incremental=True,
+ parent_id=backup_id)
req = webob.Request.blank('/v2/fake/backups/%s' %
backup_id)
db.backup_destroy(context.get_admin_context(), backup_id)
def test_restore_backup_volume_id_specified_json(self):
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
# need to create the volume referenced below first
volume_name = 'test1'
volume_id = utils.create_volume(self.context,
def test_restore_backup_volume_id_specified_xml(self):
volume_name = 'test1'
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
volume_id = utils.create_volume(self.context,
size=2,
display_name=volume_name)['id']
def test_restore_backup_with_no_body(self):
# omit body from the request
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
backup_id)
def test_restore_backup_with_body_KeyError(self):
# omit restore from body
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
req = webob.Request.blank('/v2/fake/backups/%s/restore' % backup_id)
body = {"": {}}
_mock_volume_api_create.side_effect = fake_volume_api_create
- backup_id = self._create_backup(size=5, status='available')
+ backup_id = self._create_backup(size=5,
+ status=fields.BackupStatus.AVAILABLE)
body = {"restore": {}}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
_mock_volume_api_create.side_effect = fake_volume_api_create
- backup_id = self._create_backup(size=5, status='available')
+ backup_id = self._create_backup(size=5,
+ status=fields.BackupStatus.AVAILABLE)
body = {"restore": {'name': 'vol-01'}}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
def test_restore_backup_name_volume_id_specified(self):
- backup_id = self._create_backup(size=5, status='available')
+ backup_id = self._create_backup(size=5,
+ status=fields.BackupStatus.AVAILABLE)
orig_vol_name = "vol-00"
volume_id = utils.create_volume(self.context, size=5,
display_name=orig_vol_name)['id']
_mock_volume_api_restore.side_effect = \
exception.InvalidInput(reason=msg)
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
# need to create the volume referenced below first
volume_id = utils.create_volume(self.context, size=0)['id']
body = {"restore": {"volume_id": volume_id, }}
res_dict['badRequest']['message'])
def test_restore_backup_with_InvalidVolume(self):
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
# need to create the volume referenced below first
volume_id = utils.create_volume(self.context, size=5,
status='attaching')['id']
db.backup_destroy(context.get_admin_context(), backup_id)
def test_restore_backup_with_InvalidBackup(self):
- backup_id = self._create_backup(status='restoring')
+ backup_id = self._create_backup(status=fields.BackupStatus.RESTORING)
# need to create the volume referenced below first
volume_id = utils.create_volume(self.context, size=5)['id']
db.volume_destroy(context.get_admin_context(), volume_id)
def test_restore_backup_with_VolumeNotFound(self):
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
body = {"restore": {"volume_id": "9999", }}
req = webob.Request.blank('/v2/fake/backups/%s/restore' %
consumed='2',
quota='3')
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
# need to create the volume referenced below first
volume_id = utils.create_volume(self.context, size=5)['id']
_mock_backup_restore.side_effect = \
exception.VolumeLimitExceeded(allowed=1)
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
# need to create the volume referenced below first
volume_id = utils.create_volume(self.context, size=5)['id']
def test_restore_backup_to_undersized_volume(self):
backup_size = 10
- backup_id = self._create_backup(status='available', size=backup_size)
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
+ size=backup_size)
# need to create the volume referenced below first
volume_size = 5
volume_id = utils.create_volume(self.context, size=volume_size)['id']
db.backup_destroy(context.get_admin_context(), backup_id)
def test_restore_backup_to_oversized_volume(self):
- backup_id = self._create_backup(status='available', size=10)
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
+ size=10)
# need to create the volume referenced below first
volume_name = 'test1'
volume_id = utils.create_volume(self.context,
@mock.patch('cinder.backup.rpcapi.BackupAPI.restore_backup')
def test_restore_backup_with_different_host(self, mock_restore_backup):
volume_name = 'test1'
- backup_id = self._create_backup(status='available', size=10,
- host='HostA@BackendB#PoolA')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
+ size=10, host='HostA@BackendB#PoolA')
volume_id = utils.create_volume(self.context, size=10,
host='HostB@BackendB#PoolB',
display_name=volume_name)['id']
db.backup_destroy(context.get_admin_context(), backup_id)
def test_export_record_as_non_admin(self):
- backup_id = self._create_backup(status='available', size=10)
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
+ size=10)
req = webob.Request.blank('/v2/fake/backups/%s/export_record' %
backup_id)
req.method = 'GET'
@mock.patch('cinder.backup.rpcapi.BackupAPI.export_record')
def test_export_backup_record_id_specified_json(self,
_mock_export_record_rpc):
- backup_id = self._create_backup(status='available', size=10)
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
+ size=10)
ctx = context.RequestContext('admin', 'fake', is_admin=True)
backup_service = 'fake'
backup_url = 'fake'
@mock.patch('cinder.backup.rpcapi.BackupAPI.export_record')
def test_export_record_backup_id_specified_xml(self,
_mock_export_record_rpc):
- backup_id = self._create_backup(status='available', size=10)
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE,
+ size=10)
ctx = context.RequestContext('admin', 'fake', is_admin=True)
backup_service = 'fake'
backup_url = 'fake'
def test_export_record_for_unavailable_backup(self):
- backup_id = self._create_backup(status='restoring')
+ backup_id = self._create_backup(status=fields.BackupStatus.RESTORING)
ctx = context.RequestContext('admin', 'fake', is_admin=True)
req = webob.Request.blank('/v2/fake/backups/%s/export_record' %
backup_id)
msg = 'fake unavailable service'
_mock_export_record_rpc.side_effect = \
exception.InvalidBackup(reason=msg)
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
ctx = context.RequestContext('admin', 'fake', is_admin=True)
req = webob.Request.blank('/v2/fake/backups/%s/export_record' %
backup_id)
backup_service = 'fake'
ctx = context.RequestContext('admin', project_id, is_admin=True)
backup = objects.Backup(ctx, id='id', user_id='user_id',
- project_id=project_id, status='available')
+ project_id=project_id,
+ status=fields.BackupStatus.AVAILABLE)
backup_url = backup.encode_record()
_mock_import_record_rpc.return_value = None
_mock_list_services.return_value = [backup_service]
self.assertEqual(ctx.project_id, db_backup.project_id)
self.assertEqual(ctx.user_id, db_backup.user_id)
self.assertEqual('0000-0000-0000-0000', db_backup.volume_id)
- self.assertEqual('creating', db_backup.status)
+ self.assertEqual(fields.BackupStatus.CREATING, db_backup.status)
@mock.patch('cinder.backup.api.API._list_backup_services')
@mock.patch('cinder.backup.rpcapi.BackupAPI.import_record')
# Original backup belonged to a different user_id and project_id
backup = objects.Backup(ctx, id='id', user_id='original_user_id',
project_id='original_project_id',
- status='available')
+ status=fields.BackupStatus.AVAILABLE)
backup_url = backup.encode_record()
# Deleted DB entry has project_id and user_id set to fake
- backup_id = self._create_backup('id', status='deleted')
+ backup_id = self._create_backup('id',
+ status=fields.BackupStatus.DELETED)
backup_service = 'fake'
_mock_import_record_rpc.return_value = None
_mock_list_services.return_value = [backup_service]
self.assertEqual(ctx.project_id, db_backup.project_id)
self.assertEqual(ctx.user_id, db_backup.user_id)
self.assertEqual('0000-0000-0000-0000', db_backup.volume_id)
- self.assertEqual('creating', db_backup.status)
+ self.assertEqual(fields.BackupStatus.CREATING, db_backup.status)
db.backup_destroy(context.get_admin_context(), backup_id)
backup_service = 'fake'
ctx = context.RequestContext('admin', project_id, is_admin=True)
backup = objects.Backup(ctx, id='id', user_id='user_id',
- project_id=project_id, status='available')
+ project_id=project_id,
+ status=fields.BackupStatus.AVAILABLE)
backup_url = backup.encode_record()
_mock_import_record_rpc.return_value = None
_mock_list_services.return_value = [backup_service]
self.assertEqual(ctx.project_id, db_backup.project_id)
self.assertEqual(ctx.user_id, db_backup.user_id)
self.assertEqual('0000-0000-0000-0000', db_backup.volume_id)
- self.assertEqual('creating', db_backup.status)
+ self.assertEqual(fields.BackupStatus.CREATING, db_backup.status)
# Verify the response
dom = minidom.parseString(res.body)
_mock_import_record,
_mock_list_services):
ctx = context.RequestContext('admin', 'fake', is_admin=True)
- backup_id = self._create_backup('1', status='deleted')
+ backup_id = self._create_backup('1',
+ status=fields.BackupStatus.DELETED)
backup_service = 'fake'
backup = objects.Backup.get_by_id(ctx, backup_id)
backup_url = backup.encode_record()
return_value=False)
def test_force_delete_with_not_supported_operation(self,
mock_check_support):
- backup_id = self._create_backup(status='available')
+ backup_id = self._create_backup(status=fields.BackupStatus.AVAILABLE)
backup = self.backup_api.get(self.context, backup_id)
self.assertRaises(exception.NotSupportedOperation,
self.backup_api.delete, self.context, backup, True)
@ddt.data(False, True)
def test_show_incremental_backup(self, backup_from_snapshot):
volume_id = utils.create_volume(self.context, size=5)['id']
- parent_backup_id = self._create_backup(volume_id, status="available",
- num_dependent_backups=1)
- backup_id = self._create_backup(volume_id, status="available",
+ parent_backup_id = self._create_backup(
+ volume_id, status=fields.BackupStatus.AVAILABLE,
+ num_dependent_backups=1)
+ backup_id = self._create_backup(volume_id,
+ status=fields.BackupStatus.AVAILABLE,
incremental=True,
parent_id=parent_backup_id,
num_dependent_backups=1)
snapshot = utils.create_snapshot(self.context,
volume_id)
snapshot_id = snapshot.id
- child_backup_id = self._create_backup(volume_id, status="available",
- incremental=True,
- parent_id=backup_id,
- snapshot_id=snapshot_id)
+ child_backup_id = self._create_backup(
+ volume_id, status=fields.BackupStatus.AVAILABLE, incremental=True,
+ parent_id=backup_id, snapshot_id=snapshot_id)
req = webob.Request.blank('/v2/fake/backups/%s' %
backup_id)
from oslo_versionedobjects import fields
from cinder import objects
+from cinder.objects import fields as c_fields
def fake_db_backup(**updates):
'user_id': 'fake_user',
'project_id': 'fake_project',
'volume_id': 'fake_id',
- 'status': 'creating',
+ 'status': c_fields.BackupStatus.CREATING,
'host': 'fake_host',
'display_name': 'fake_name',
'size': 5,
from cinder.db.sqlalchemy import models
from cinder import exception
from cinder import objects
+from cinder.objects import fields
from cinder.tests.unit import fake_volume
from cinder.tests.unit import objects as test_objects
from cinder.tests.unit import utils
fake_backup = {
'id': '1',
'volume_id': 'fake_id',
- 'status': "creating",
+ 'status': fields.BackupStatus.CREATING,
'size': 1,
'display_name': 'fake_name',
'display_description': 'fake_description',
--- /dev/null
+# Copyright 2015 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from cinder.objects import fields
+from cinder import test
+
+
+class FakeFieldType(fields.FieldType):
+ def coerce(self, obj, attr, value):
+ return '*%s*' % value
+
+ def to_primitive(self, obj, attr, value):
+ return '!%s!' % value
+
+ def from_primitive(self, obj, attr, value):
+ return value[1:-1]
+
+
+class TestField(test.TestCase):
+ def setUp(self):
+ super(TestField, self).setUp()
+ self.field = fields.Field(FakeFieldType())
+ self.coerce_good_values = [('foo', '*foo*')]
+ self.coerce_bad_values = []
+ self.to_primitive_values = [('foo', '!foo!')]
+ self.from_primitive_values = [('!foo!', 'foo')]
+
+ def test_coerce_good_values(self):
+ for in_val, out_val in self.coerce_good_values:
+ self.assertEqual(out_val, self.field.coerce('obj', 'attr', in_val))
+
+ def test_coerce_bad_values(self):
+ for in_val in self.coerce_bad_values:
+ self.assertRaises((TypeError, ValueError),
+ self.field.coerce, 'obj', 'attr', in_val)
+
+ def test_to_primitive(self):
+ for in_val, prim_val in self.to_primitive_values:
+ self.assertEqual(prim_val, self.field.to_primitive('obj', 'attr',
+ in_val))
+
+ def test_from_primitive(self):
+ class ObjectLikeThing(object):
+ _context = 'context'
+
+ for prim_val, out_val in self.from_primitive_values:
+ self.assertEqual(out_val, self.field.from_primitive(
+ ObjectLikeThing, 'attr', prim_val))
+
+ def test_stringify(self):
+ self.assertEqual('123', self.field.stringify(123))
+
+
+class TestBackupStatus(TestField):
+ def setUp(self):
+ super(TestBackupStatus, self).setUp()
+ self.field = fields.BackupStatusField()
+ self.coerce_good_values = [('error', 'error'),
+ ('error_deleting', 'error_deleting'),
+ ('creating', 'creating'),
+ ('available', 'available'),
+ ('deleting', 'deleting'),
+ ('deleted', 'deleted'),
+ ('restoring', 'restoring')]
+ self.coerce_bad_values = ['acme']
+ self.to_primitive_values = self.coerce_good_values[0:1]
+ self.from_primitive_values = self.coerce_good_values[0:1]
+
+ def test_stringify(self):
+ self.assertEqual("'error'", self.field.stringify('error'))
+
+ def test_stringify_invalid(self):
+ self.assertRaises(ValueError, self.field.stringify, 'not_a_status')
# NOTE: The hashes in this list should only be changed if they come with a
# corresponding version bump in the affected objects.
object_data = {
- 'Backup': '1.2-62c3da6df3dccb76796e4da65a45a44f',
- 'BackupImport': '1.2-62c3da6df3dccb76796e4da65a45a44f',
+ 'Backup': '1.3-2e63492190bbbc85c0e5bea328cd38f7',
+ 'BackupImport': '1.3-2e63492190bbbc85c0e5bea328cd38f7',
'BackupList': '1.0-24591dabe26d920ce0756fe64cd5f3aa',
'CGSnapshot': '1.0-190da2a2aa9457edc771d888f7d225c4',
'CGSnapshotList': '1.0-e8c3f4078cd0ee23487b34d173eec776',
from cinder import db
from cinder import exception
from cinder import objects
+from cinder.objects import fields
from cinder import test
from cinder.tests.unit.backup import fake_service_with_verify as fake_service
from cinder.volume.drivers import lvm
def _create_backup_db_entry(self, volume_id=1, display_name='test_backup',
display_description='this is a test backup',
container='volumebackups',
- status='creating',
+ status=fields.BackupStatus.CREATING,
size=1,
object_count=0,
project_id='fake',
"""Create backup metadata export entry."""
vol_id = self._create_volume_db_entry(status='available',
size=vol_size)
- backup = self._create_backup_db_entry(status='available',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.AVAILABLE, volume_id=vol_id)
if exported_id is not None:
backup.id = exported_id
def _create_export_record_db_entry(self,
volume_id='0000',
- status='creating',
+ status=fields.BackupStatus.CREATING,
project_id='fake',
backup_id=None):
"""Create a backup entry in the DB.
vol6_id = self._create_volume_db_entry()
db.volume_update(self.ctxt, vol6_id, {'status': 'restoring-backup'})
- backup1 = self._create_backup_db_entry(status='creating',
- volume_id=vol1_id)
- backup2 = self._create_backup_db_entry(status='restoring',
- volume_id=vol2_id)
- backup3 = self._create_backup_db_entry(status='deleting',
- volume_id=vol3_id)
- self._create_backup_db_entry(status='creating',
+ backup1 = self._create_backup_db_entry(
+ status=fields.BackupStatus.CREATING, volume_id=vol1_id)
+ backup2 = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING, volume_id=vol2_id)
+ backup3 = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol3_id)
+ self._create_backup_db_entry(status=fields.BackupStatus.CREATING,
volume_id=vol4_id,
temp_volume_id=temp_vol_id)
- self._create_backup_db_entry(status='creating',
+ self._create_backup_db_entry(status=fields.BackupStatus.CREATING,
volume_id=vol5_id,
temp_snapshot_id=temp_snap.id)
self.assertEqual('error_restoring', vol6['status'])
backup1 = db.backup_get(self.ctxt, backup1.id)
- self.assertEqual('error', backup1['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup1['status'])
backup2 = db.backup_get(self.ctxt, backup2.id)
- self.assertEqual('available', backup2['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup2['status'])
self.assertRaises(exception.BackupNotFound,
db.backup_get,
self.ctxt,
self.override_config('backup_service_inithost_offload', True)
vol1_id = self._create_volume_db_entry()
db.volume_update(self.ctxt, vol1_id, {'status': 'available'})
- backup1 = self._create_backup_db_entry(status='deleting',
- volume_id=vol1_id)
+ backup1 = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol1_id)
vol2_id = self._create_volume_db_entry()
db.volume_update(self.ctxt, vol2_id, {'status': 'available'})
- backup2 = self._create_backup_db_entry(status='deleting',
- volume_id=vol2_id)
+ backup2 = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol2_id)
mock_get_all_by_host.return_value = [backup1, backup2]
self.backup_mgr.init_host()
calls = [mock.call(self.backup_mgr.delete_backup, mock.ANY, backup1),
def test_cleanup_one_creating_backup(self):
"""Test cleanup_one_backup for volume status 'creating'."""
- backup = self._create_backup_db_entry(status='creating')
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.CREATING)
self.backup_mgr._cleanup_one_backup(self.ctxt, backup)
- self.assertEqual('error', backup.status)
+ self.assertEqual(fields.BackupStatus.ERROR, backup.status)
def test_cleanup_one_restoring_backup(self):
"""Test cleanup_one_backup for volume status 'restoring'."""
- backup = self._create_backup_db_entry(status='restoring')
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING)
self.backup_mgr._cleanup_one_backup(self.ctxt, backup)
- self.assertEqual('available', backup.status)
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup.status)
def test_cleanup_one_deleting_backup(self):
"""Test cleanup_one_backup for volume status 'deleting'."""
- backup = self._create_backup_db_entry(status='deleting')
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING)
self.backup_mgr._cleanup_one_backup(self.ctxt, backup)
mock_volume_get = self.mock_object(db, 'volume_get')
mock_volume_get.side_effect = [err]
- backup = self._create_backup_db_entry(status='creating')
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.CREATING)
self.assertIsNone(
self.backup_mgr._cleanup_temp_volumes_snapshots_for_one_backup(
vol1_id = self._create_volume_db_entry()
self._create_volume_attach(vol1_id)
db.volume_update(self.ctxt, vol1_id, {'status': 'backing-up'})
- backup = self._create_backup_db_entry(status='error',
+ backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
volume_id=vol1_id,
temp_snapshot_id='fake')
vol1_id = self._create_volume_db_entry()
self._create_volume_attach(vol1_id)
db.volume_update(self.ctxt, vol1_id, {'status': 'backing-up'})
- backup = self._create_backup_db_entry(status='error',
+ backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
volume_id=vol1_id,
temp_volume_id='fake')
def test_create_backup_with_bad_backup_status(self):
"""Test creating a backup with a backup with a bad status."""
vol_id = self._create_volume_db_entry(size=1)
- backup = self._create_backup_db_entry(status='available',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.AVAILABLE, volume_id=vol_id)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.create_backup,
self.ctxt,
self.assertEqual('available', vol['status'])
self.assertEqual('error_backing-up', vol['previous_status'])
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
self.assertTrue(_mock_volume_backup.called)
@mock.patch('%s.%s' % (CONF.volume_driver, 'backup_volume'))
self.assertEqual('available', vol['status'])
self.assertEqual('backing-up', vol['previous_status'])
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
self.assertTrue(_mock_volume_backup.called)
backup,
vol_id)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
def test_restore_backup_with_bad_backup_status(self):
"""Test error handling.
"""
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
- backup = self._create_backup_db_entry(status='available',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.AVAILABLE, volume_id=vol_id)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.restore_backup,
self.ctxt,
vol = db.volume_get(self.ctxt, vol_id)
self.assertEqual('error', vol['status'])
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
@mock.patch('%s.%s' % (CONF.volume_driver, 'restore_backup'))
def test_restore_backup_with_driver_error(self, _mock_volume_restore):
"""Test error handling when an error occurs during backup restore."""
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
- backup = self._create_backup_db_entry(status='restoring',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING, volume_id=vol_id)
_mock_volume_restore.side_effect = FakeBackupException('fake')
self.assertRaises(FakeBackupException,
vol = db.volume_get(self.ctxt, vol_id)
self.assertEqual('error_restoring', vol['status'])
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertTrue(_mock_volume_restore.called)
def test_restore_backup_with_bad_service(self):
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=1)
service = 'cinder.tests.backup.bad_service'
- backup = self._create_backup_db_entry(status='restoring',
- volume_id=vol_id,
- service=service)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING, volume_id=vol_id,
+ service=service)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.restore_backup,
vol = db.volume_get(self.ctxt, vol_id)
self.assertEqual('error', vol['status'])
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
@mock.patch('%s.%s' % (CONF.volume_driver, 'restore_backup'))
def test_restore_backup(self, _mock_volume_restore):
vol_size = 1
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=vol_size)
- backup = self._create_backup_db_entry(status='restoring',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING, volume_id=vol_id)
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
vol = db.volume_get(self.ctxt, vol_id)
self.assertEqual('available', vol['status'])
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertTrue(_mock_volume_restore.called)
@mock.patch('cinder.volume.utils.notify_about_backup_usage')
vol_size = 1
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=vol_size)
- backup = self._create_backup_db_entry(status='restoring',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING, volume_id=vol_id)
self.backup_mgr.restore_backup(self.ctxt, backup, vol_id)
self.assertEqual(2, notify.call_count)
vol_size = 1
vol_id = self._create_volume_db_entry(status='restoring-backup',
size=vol_size)
- backup = self._create_backup_db_entry(status='restoring',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING, volume_id=vol_id)
self.backup_mgr._get_driver = mock.MagicMock()
self.backup_mgr._get_volume_backend = mock.MagicMock()
with a bad status.
"""
vol_id = self._create_volume_db_entry(size=1)
- backup = self._create_backup_db_entry(status='available',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.AVAILABLE, volume_id=vol_id)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.delete_backup,
self.ctxt,
backup)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
def test_delete_backup_with_error(self):
"""Test error handling when an error occurs during backup deletion."""
vol_id = self._create_volume_db_entry(size=1)
- backup = self._create_backup_db_entry(status='deleting',
- display_name='fail_on_delete',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING,
+ display_name='fail_on_delete', volume_id=vol_id)
self.assertRaises(IOError,
self.backup_mgr.delete_backup,
self.ctxt,
backup)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
def test_delete_backup_with_bad_service(self):
"""Test error handling.
"""
vol_id = self._create_volume_db_entry(size=1)
service = 'cinder.tests.backup.bad_service'
- backup = self._create_backup_db_entry(status='deleting',
- volume_id=vol_id,
- service=service)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol_id,
+ service=service)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.delete_backup,
self.ctxt,
backup)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
def test_delete_backup_with_no_service(self):
"""Test error handling.
with no service defined for that backup, relates to bug #1162908
"""
vol_id = self._create_volume_db_entry(size=1)
- backup = self._create_backup_db_entry(status='deleting',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol_id)
backup.service = None
backup.save()
self.backup_mgr.delete_backup(self.ctxt, backup)
def test_delete_backup(self):
"""Test normal backup deletion."""
vol_id = self._create_volume_db_entry(size=1)
- backup = self._create_backup_db_entry(status='deleting',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol_id)
self.backup_mgr.delete_backup(self.ctxt, backup)
self.assertRaises(exception.BackupNotFound,
db.backup_get,
backup = db.backup_get(ctxt_read_deleted, backup.id)
self.assertTrue(backup.deleted)
self.assertGreaterEqual(timeutils.utcnow(), backup.deleted_at)
- self.assertEqual('deleted', backup.status)
+ self.assertEqual(fields.BackupStatus.DELETED, backup.status)
@mock.patch('cinder.volume.utils.notify_about_backup_usage')
def test_delete_backup_with_notify(self, notify):
"""Test normal backup deletion with notifications."""
vol_id = self._create_volume_db_entry(size=1)
- backup = self._create_backup_db_entry(status='deleting',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.DELETING, volume_id=vol_id)
self.backup_mgr.delete_backup(self.ctxt, backup)
self.assertEqual(2, notify.call_count)
"""
vol_id = self._create_volume_db_entry(size=1)
service = 'cinder.tests.backup.bad_service'
- backup = self._create_backup_db_entry(status='available',
- volume_id=vol_id,
- service=service)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.AVAILABLE, volume_id=vol_id,
+ service=service)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.export_record,
"""
vol_id = self._create_volume_db_entry(status='available',
size=1)
- backup = self._create_backup_db_entry(status='error',
+ backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
volume_id=vol_id)
self.assertRaises(exception.InvalidBackup,
self.backup_mgr.export_record,
vol_size = 1
vol_id = self._create_volume_db_entry(status='available',
size=vol_size)
- backup = self._create_backup_db_entry(status='available',
- volume_id=vol_id)
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.AVAILABLE, volume_id=vol_id)
export = self.backup_mgr.export_record(self.ctxt, backup)
self.assertEqual(CONF.backup_driver, export['backup_service'])
export['backup_url'],
backup_hosts)
backup = db.backup_get(self.ctxt, imported_record.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
def test_import_record_with_wrong_id(self):
backup_hosts)
self.assertTrue(_mock_record_import.called)
backup = db.backup_get(self.ctxt, imported_record.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
def test_not_supported_driver_to_force_delete(self):
"""Test force delete check method for not supported drivers."""
export['backup_url'],
backup_hosts)
backup = db.backup_get(self.ctxt, imported_record.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
def test_import_record_with_verify_invalid_backup(self):
backup_hosts)
self.assertTrue(_mock_record_verify.called)
backup = db.backup_get(self.ctxt, imported_record.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
def test_backup_reset_status_from_nonrestoring_to_available(
self):
vol_id = self._create_volume_db_entry(status='available',
size=1)
- backup = self._create_backup_db_entry(status='error',
+ backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
volume_id=vol_id)
with mock.patch.object(manager.BackupManager,
'_map_service_to_driver') as \
fake_service.get_backup_driver(self.ctxt)
self.backup_mgr.reset_status(self.ctxt,
backup,
- 'available')
+ fields.BackupStatus.AVAILABLE)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
def test_backup_reset_status_to_available_invalid_backup(self):
volume = db.volume_create(self.ctxt, {'status': 'available',
'host': 'test',
'provider_location': '',
'size': 1})
- backup = self._create_backup_db_entry(status='error',
+ backup = self._create_backup_db_entry(status=fields.BackupStatus.ERROR,
volume_id=volume['id'])
backup_driver = self.backup_mgr.service.get_backup_driver(self.ctxt)
self.backup_mgr.reset_status,
self.ctxt,
backup,
- 'available')
+ fields.BackupStatus.AVAILABLE)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
def test_backup_reset_status_from_restoring_to_available(self):
volume = db.volume_create(self.ctxt,
'host': 'test',
'provider_location': '',
'size': 1})
- backup = self._create_backup_db_entry(status='restoring',
- volume_id=volume['id'])
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.RESTORING,
+ volume_id=volume['id'])
- self.backup_mgr.reset_status(self.ctxt, backup, 'available')
+ self.backup_mgr.reset_status(self.ctxt, backup,
+ fields.BackupStatus.AVAILABLE)
backup = db.backup_get(self.ctxt, backup.id)
- self.assertEqual('available', backup['status'])
+ self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
def test_backup_reset_status_to_error(self):
volume = db.volume_create(self.ctxt,
'host': 'test',
'provider_location': '',
'size': 1})
- backup = self._create_backup_db_entry(status='creating',
- volume_id=volume['id'])
- self.backup_mgr.reset_status(self.ctxt, backup, 'error')
+ backup = self._create_backup_db_entry(
+ status=fields.BackupStatus.CREATING,
+ volume_id=volume['id'])
+ self.backup_mgr.reset_status(self.ctxt, backup,
+ fields.BackupStatus.ERROR)
backup = db.backup_get(self.ctxt, backup['id'])
- self.assertEqual('error', backup['status'])
+ self.assertEqual(fields.BackupStatus.ERROR, backup['status'])
@ddt.ddt
from cinder.cmd import volume_usage_audit
from cinder import context
from cinder import exception
+from cinder.objects import fields
from cinder import test
from cinder.tests.unit import fake_volume
from cinder import version
'host': 'fake-host',
'display_name': 'fake-display-name',
'container': 'fake-container',
- 'status': 'fake-status',
+ 'status': fields.BackupStatus.AVAILABLE,
'size': 123,
'object_count': 1,
'volume_id': 'fake-volume-id',
import mock
from cinder import exception
+from cinder.objects import fields
from cinder import test
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
'name': 'backup-test',
'id': 'id-backup',
'provider_location': '0',
- 'status': 'available'}
+ 'status': fields.BackupStatus.AVAILABLE}
_TEST_SNAPSHOT = {'volume_name': 'test',
'size': 128,
from cinder.db.sqlalchemy import models as sqa_models
from cinder import exception
from cinder import objects
+from cinder.objects import fields
from cinder import quota
from cinder import quota_utils
from cinder import test
backup['project_id'] = self.project_id
backup['volume_id'] = volume['id']
backup['volume_size'] = volume['size']
- backup['status'] = 'available'
+ backup['status'] = fields.BackupStatus.AVAILABLE
return db.backup_create(self.context, backup)
def test_volume_size_limit_exceeds(self):
from cinder.image import image_utils
from cinder import keymgr
from cinder import objects
+from cinder.objects import fields
import cinder.policy
from cinder import quota
from cinder import test
backup['display_name'] = 'test_check_for_setup_error'
backup['display_description'] = 'test_check_for_setup_error'
backup['container'] = 'fake'
- backup['status'] = 'creating'
+ backup['status'] = fields.BackupStatus.CREATING
backup['fail_reason'] = ''
backup['service'] = 'fake'
backup['parent_id'] = None
from cinder import context
from cinder import db
from cinder import objects
+from cinder.objects import fields
def get_test_admin_context():
volume_id,
display_name='test_backup',
display_description='This is a test backup',
- status='creating',
+ status=fields.BackupStatus.CREATING,
parent_id=None,
temp_volume_id=None,
temp_snapshot_id=None,