]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Add support for backing up volume metadata
authorEdward Hope-Morley <edward.hope-morley@canonical.com>
Tue, 15 Oct 2013 18:01:06 +0000 (19:01 +0100)
committerEdward Hope-Morley <edward.hope-morley@canonical.com>
Sat, 15 Feb 2014 12:24:52 +0000 (12:24 +0000)
This commit adds a new versioned api to the backup driver base
class which all backup driver implementations should use in order
to support backing up volume metadata as well as data. The general
rule here is that we backup all metadata from a set of db tables
and can then be selective about what we restore.

As part of this commit, Ceph backup driver implements the new api
to backup volume metadata (including glance metadata) thus
supporting restoring volumes as bootable.

It is expected that the other backup driver implementations will
follow suit and adopt this approach.

Implements: blueprint cinder-backup-volume-metadata-support
Fixes: bug 1137908
Change-Id: Icf2f1202d827d2435e09f5dad0f6f022f35dceee

cinder/backup/driver.py
cinder/backup/drivers/ceph.py
cinder/backup/drivers/swift.py
cinder/backup/drivers/tsm.py
cinder/exception.py
cinder/tests/test_backup_ceph.py
cinder/tests/test_backup_driver_base.py [new file with mode: 0644]
etc/cinder/cinder.conf.sample

index 241ca8348301d798543872f26e7b57eea62358ef..81fc0321316d2b0430a51b580030f9603d4e669e 100644 (file)
 """Base class for all backup drivers."""
 
 from cinder.db import base
+from cinder import exception
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common import log as logging
+from oslo.config import cfg
+
+service_opts = [
+    cfg.IntOpt('backup_metadata_version', default=1,
+               help='Backup metadata version to be used when backing up '
+                    'volume metadata. If this number is bumped, make sure the '
+                    'service doing the restore supports the new version.')
+]
+
+CONF = cfg.CONF
+CONF.register_opts(service_opts)
+
+LOG = logging.getLogger(__name__)
+
+
+class BackupMetadataAPI(base.Base):
+
+    TYPE_TAG_VOL_BASE_META = 'volume-base-metadata'
+    TYPE_TAG_VOL_META = 'volume-metadata'
+    TYPE_TAG_VOL_GLANCE_META = 'volume-glance-metadata'
+
+    def __init__(self, context, db_driver=None):
+        super(BackupMetadataAPI, self).__init__(db_driver)
+        self.context = context
+
+    @staticmethod
+    def _is_serializable(value):
+        """Returns True if value is serializable."""
+        try:
+            jsonutils.dumps(value)
+        except TypeError:
+            LOG.info(_("Value with type=%s is not serializable") %
+                     type(value))
+            return False
+
+        return True
+
+    def _save_vol_base_meta(self, container, volume_id):
+        """Save base volume metadata to container.
+
+        This will fetch all fields from the db Volume object for volume_id and
+        save them in the provided container dictionary.
+        """
+        type_tag = self.TYPE_TAG_VOL_BASE_META
+        LOG.debug(_("Getting metadata type '%s'") % type_tag)
+        meta = self.db.volume_get(self.context, volume_id)
+        if meta:
+            container[type_tag] = {}
+            for key, value in meta:
+                # Exclude fields that are "not JSON serializable"
+                if not self._is_serializable(value):
+                    LOG.info(_("Unable to serialize field '%s' - excluding "
+                               "from backup") % (key))
+                    continue
+                container[type_tag][key] = value
+
+            LOG.debug(_("Completed fetching metadata type '%s'") % type_tag)
+        else:
+            LOG.debug(_("No metadata type '%s' available") % type_tag)
+
+    def _save_vol_meta(self, container, volume_id):
+        """Save volume metadata to container.
+
+        This will fetch all fields from the db VolumeMetadata object for
+        volume_id and save them in the provided container dictionary.
+        """
+        type_tag = self.TYPE_TAG_VOL_META
+        LOG.debug(_("Getting metadata type '%s'") % type_tag)
+        meta = self.db.volume_metadata_get(self.context, volume_id)
+        if meta:
+            container[type_tag] = {}
+            for entry in meta:
+                # Exclude fields that are "not JSON serializable"
+                if not self._is_serializable(meta[entry]):
+                    LOG.info(_("Unable to serialize field '%s' - excluding "
+                               "from backup") % (entry))
+                    continue
+                container[type_tag][entry] = meta[entry]
+
+            LOG.debug(_("Completed fetching metadata type '%s'") % type_tag)
+        else:
+            LOG.debug(_("No metadata type '%s' available") % type_tag)
+
+    def _save_vol_glance_meta(self, container, volume_id):
+        """Save volume Glance metadata to container.
+
+        This will fetch all fields from the db VolumeGlanceMetadata object for
+        volume_id and save them in the provided container dictionary.
+        """
+        type_tag = self.TYPE_TAG_VOL_GLANCE_META
+        LOG.debug(_("Getting metadata type '%s'") % type_tag)
+        try:
+            meta = self.db.volume_glance_metadata_get(self.context, volume_id)
+            if meta:
+                container[type_tag] = {}
+                for entry in meta:
+                    # Exclude fields that are "not JSON serializable"
+                    if not self._is_serializable(entry.value):
+                        LOG.info(_("Unable to serialize field '%s' - "
+                                   "excluding from backup") % (entry))
+                        continue
+                    container[type_tag][entry.key] = entry.value
+
+            LOG.debug(_("Completed fetching metadata type '%s'") % type_tag)
+        except exception.GlanceMetadataNotFound:
+            LOG.debug(_("No metadata type '%s' available") % type_tag)
+
+    @staticmethod
+    def _filter(metadata, fields):
+        """Returns set of metadata restricted to required fields.
+
+        If fields is empty list, the full set is returned.
+        """
+        if fields == []:
+            return metadata
+
+        subset = {}
+        for field in fields:
+            if field in metadata:
+                subset[field] = metadata[field]
+            else:
+                LOG.debug(_("Excluding field '%s'") % (field))
+
+        return subset
+
+    def _restore_vol_base_meta(self, metadata, volume_id, fields):
+        """Restore values to Volume object for provided fields."""
+        LOG.debug(_("Restoring volume base metadata"))
+        # Only set the display_name if it was not None since the
+        # restore action will have set a name which is more useful than
+        # None.
+        key = 'display_name'
+        if key in fields and key in metadata and metadata[key] is None:
+            fields = [f for f in fields if f != key]
+
+        metadata = self._filter(metadata, fields)
+        self.db.volume_update(self.context, volume_id, metadata)
+
+    def _restore_vol_meta(self, metadata, volume_id, fields):
+        """Restore values to VolumeMetadata object for provided fields."""
+        LOG.debug(_("Restoring volume metadata"))
+        metadata = self._filter(metadata, fields)
+        self.db.volume_metadata_update(self.context, volume_id, metadata, True)
+
+    def _restore_vol_glance_meta(self, metadata, volume_id, fields):
+        """Restore values to VolumeGlanceMetadata object for provided fields.
+
+        First delete any existing metadata then save new values.
+        """
+        LOG.debug(_("Restoring volume glance metadata"))
+        metadata = self._filter(metadata, fields)
+        self.db.volume_glance_metadata_delete_by_volume(self.context,
+                                                        volume_id)
+        for key, value in metadata.items():
+            self.db.volume_glance_metadata_create(self.context,
+                                                  volume_id,
+                                                  key, value)
+
+        # Now mark the volume as bootable
+        self.db.volume_update(self.context, volume_id,
+                              {'bootable': True})
+
+    def _v1_restore_factory(self):
+        """All metadata is backed up but we selectively restore.
+
+        Returns a dictionary of the form:
+
+            {<type tag>: (<fields list>, <restore function>)}
+
+        Empty field list indicates that all backed up fields should be
+        restored.
+        """
+        return {self.TYPE_TAG_VOL_BASE_META:
+                (self._restore_vol_base_meta,
+                 ['display_name', 'display_description']),
+                self.TYPE_TAG_VOL_META:
+                (self._restore_vol_meta, []),
+                self.TYPE_TAG_VOL_GLANCE_META:
+                (self._restore_vol_glance_meta, [])}
+
+    def get(self, volume_id):
+        """Get volume metadata.
+
+        Returns a json-encoded dict containing all metadata and the restore
+        version i.e. the version used to decide what actually gets restored
+        from this container when doing a backup restore.
+        """
+        container = {'version': CONF.backup_metadata_version}
+        self._save_vol_base_meta(container, volume_id)
+        self._save_vol_meta(container, volume_id)
+        self._save_vol_glance_meta(container, volume_id)
+
+        if container:
+            return jsonutils.dumps(container)
+        else:
+            return None
+
+    def put(self, volume_id, json_metadata):
+        """Restore volume metadata to a volume.
+
+        The json container should contain a version that is supported here.
+        """
+        meta_container = jsonutils.loads(json_metadata)
+        version = meta_container['version']
+        if version == 1:
+            factory = self._v1_restore_factory()
+        else:
+            msg = (_("Unsupported backup metadata version (%s)") % (version))
+            raise exception.BackupMetadataUnsupportedVersion(msg)
+
+        for type in factory:
+            func = factory[type][0]
+            fields = factory[type][1]
+            if type in meta_container:
+                func(meta_container[type], volume_id, fields)
+            else:
+                msg = _("No metadata of type '%s' to restore") % (type)
+                LOG.debug(msg)
 
 
 class BackupDriver(base.Base):
 
-    def backup(self, backup, volume_file):
+    def __init__(self, context, db_driver=None):
+        super(BackupDriver, self).__init__(db_driver)
+        self.context = context
+        self.backup_meta_api = BackupMetadataAPI(context, db_driver)
+
+    def get_metadata(self, volume_id):
+        return self.backup_meta_api.get(volume_id)
+
+    def put_metadata(self, volume_id, json_metadata):
+        self.backup_meta_api.put(volume_id, json_metadata)
+
+    def backup(self, backup, volume_file, backup_metadata=False):
         """Start a backup of a specified volume."""
         raise NotImplementedError()
 
index 18268370dbdd25121790ecc425d9d1d9c20911f0..bb26342171a27991ad43ea3a43dd2d342327d552 100644 (file)
@@ -93,6 +93,65 @@ CONF = cfg.CONF
 CONF.register_opts(service_opts)
 
 
+class VolumeMetadataBackup(object):
+
+    def __init__(self, client, backup_id):
+        self._client = client
+        self._backup_id = backup_id
+
+    @property
+    def name(self):
+        return strutils.safe_encode("backup.%s.meta" % (self._backup_id))
+
+    @property
+    def exists(self):
+        meta_obj = rados.Object(self._client.ioctx, self.name)
+        return self._exists(meta_obj)
+
+    def _exists(self, obj):
+        try:
+            obj.stat()
+        except rados.ObjectNotFound:
+            return False
+        else:
+            return True
+
+    def set(self, json_meta):
+        """Write JSON metadata to a new object.
+
+        This should only be called once per backup. Raises
+        VolumeMetadataBackupExists if the object already exists.
+        """
+        meta_obj = rados.Object(self._client.ioctx, self.name)
+        if self._exists(meta_obj):
+            msg = _("Metadata backup object '%s' already exists") % (self.name)
+            raise exception.VolumeMetadataBackupExists(msg)
+
+        meta_obj.write(json_meta)
+
+    def get(self):
+        """Get metadata backup object.
+
+        Returns None if the object does not exist.
+        """
+        meta_obj = rados.Object(self._client.ioctx, self.name)
+        if not self._exists(meta_obj):
+            msg = _("Metadata backup object %s does not exist") % (self.name)
+            LOG.debug(msg)
+            return None
+
+        return meta_obj.read()
+
+    def remove_if_exists(self):
+        meta_obj = rados.Object(self._client.ioctx, self.name)
+        try:
+            meta_obj.remove()
+        except rados.ObjectNotFound:
+            msg = (_("Metadata backup object '%s' not found - ignoring") %
+                   (self.name))
+            LOG.debug(msg)
+
+
 class CephBackupDriver(BackupDriver):
     """Backup Cinder volumes to Ceph Object Store.
 
@@ -106,10 +165,9 @@ class CephBackupDriver(BackupDriver):
     """
 
     def __init__(self, context, db_driver=None, execute=None):
-        super(CephBackupDriver, self).__init__(db_driver)
+        super(CephBackupDriver, self).__init__(context, db_driver)
         self.rbd = rbd
         self.rados = rados
-        self.context = context
         self.chunk_size = CONF.backup_ceph_chunk_size
         self._execute = execute or utils.execute
 
@@ -737,8 +795,30 @@ class CephBackupDriver(BackupDriver):
 
         return int(volume['size']) * units.GiB
 
-    def backup(self, backup, volume_file):
-        """Backup the given volume to Ceph object store.
+    def _backup_metadata(self, backup):
+        """Backup volume metadata.
+
+        NOTE(dosaboy): the metadata we are backing up is obtained from a
+                       versioned api so we should not alter it in any way here.
+                       We must also be sure that the service that will perform
+                       the restore is compatible with version used.
+        """
+        json_meta = self.get_metadata(backup['volume_id'])
+        if not json_meta:
+            LOG.debug("No volume metadata to backup")
+            return
+
+        LOG.debug("Backing up volume metadata")
+        try:
+            with rbd_driver.RADOSClient(self) as client:
+                vol_meta_backup = VolumeMetadataBackup(client, backup['id'])
+                vol_meta_backup.set(json_meta)
+        except exception.VolumeMetadataBackupExists as e:
+            msg = _("Failed to backup volume metadata - %s") % (str(e))
+            raise exception.BackupOperationError(msg)
+
+    def backup(self, backup, volume_file, backup_metadata=True):
+        """Backup volume and metadata (if available) to Ceph object store.
 
         If the source volume is an RBD we will attempt to do an
         incremental/differential backup, otherwise a full copy is performed.
@@ -774,6 +854,14 @@ class CephBackupDriver(BackupDriver):
         self.db.backup_update(self.context, backup_id,
                               {'container': self._ceph_backup_pool})
 
+        if backup_metadata:
+            try:
+                self._backup_metadata(backup)
+            except exception.BackupOperationError:
+                # Cleanup.
+                self.delete(backup)
+                raise
+
         LOG.debug(_("Backup '%s' finished.") % (backup_id))
 
     def _full_restore(self, backup_id, volume_id, dest_file, dest_name,
@@ -1008,8 +1096,30 @@ class CephBackupDriver(BackupDriver):
             self._full_restore(backup_id, backup_volume_id, volume_file,
                                volume_name, length, src_snap=restore_point)
 
+    def _restore_metadata(self, backup, volume_id):
+        """Restore volume metadata from backup.
+
+        If this backup has associated metadata, save it to the restore target
+        otherwise do nothing.
+        """
+        try:
+            with rbd_driver.RADOSClient(self) as client:
+                meta_bak = VolumeMetadataBackup(client, backup['id'])
+                meta = meta_bak.get()
+                if meta is not None:
+                    self.put_metadata(volume_id, meta)
+                else:
+                    LOG.debug(_("Volume has no backed up metadata"))
+        except exception.BackupMetadataUnsupportedVersion:
+            msg = _("Metadata restore failed due to incompatible version")
+            LOG.error(msg)
+            raise exception.BackupOperationError(msg)
+
     def restore(self, backup, volume_id, volume_file):
-        """Restore the given volume backup from Ceph object store."""
+        """Restore volume from backup in Ceph object store.
+
+        If volume metadata is available this will also be restored.
+        """
         target_volume = self.db.volume_get(self.context, volume_id)
         LOG.debug(_('Starting restore from Ceph backup=%(src)s to '
                     'volume=%(dest)s') %
@@ -1027,6 +1137,8 @@ class CephBackupDriver(BackupDriver):
             else:
                 os.fsync(fileno)
 
+            self._restore_metadata(backup, volume_id)
+
             LOG.debug(_('Restore finished successfully.'))
         except exception.BackupOperationError as e:
             LOG.error(_('Restore finished with error - %s') % (e))
@@ -1037,12 +1149,20 @@ class CephBackupDriver(BackupDriver):
         backup_id = backup['id']
         LOG.debug(_('Delete started for backup=%s') % backup['id'])
 
+        delete_failed = False
         try:
             self._try_delete_base_image(backup['id'], backup['volume_id'])
         except self.rbd.ImageNotFound:
-            msg = _("RBD image not found but continuing anyway so "
-                    "that db entry can be removed")
+            msg = _("RBD image not found but continuing anyway so that we can "
+                    "attempt to delete metadata backup and db entry can be "
+                    "removed")
             LOG.warning(msg)
+            delete_failed = True
+
+        with rbd_driver.RADOSClient(self) as client:
+            VolumeMetadataBackup(client, backup['id']).remove_if_exists()
+
+        if delete_failed:
             LOG.info(_("Delete '%s' finished with warning") % (backup_id))
         else:
             LOG.debug(_("Delete '%s' finished") % (backup_id))
index 0073354b0dc3928d9840177b121949295193336e..9abd39c332cb356484a7212970fb73b733cffd47 100644 (file)
@@ -107,7 +107,7 @@ class SwiftBackupDriver(BackupDriver):
         raise ValueError(unicode(err))
 
     def __init__(self, context, db_driver=None):
-        self.context = context
+        super(SwiftBackupDriver, self).__init__(context, db_driver)
         self.swift_url = '%s%s' % (CONF.backup_swift_url,
                                    self.context.project_id)
         self.az = CONF.storage_availability_zone
@@ -135,8 +135,6 @@ class SwiftBackupDriver(BackupDriver):
                                          preauthtoken=self.context.auth_token,
                                          starting_backoff=self.swift_backoff)
 
-        super(SwiftBackupDriver, self).__init__(db_driver)
-
     def _check_container_exists(self, container):
         LOG.debug(_('_check_container_exists: container: %s') % container)
         try:
@@ -327,8 +325,16 @@ class SwiftBackupDriver(BackupDriver):
                               {'object_count': object_id})
         LOG.debug(_('backup %s finished.') % backup['id'])
 
-    def backup(self, backup, volume_file):
-        """Backup the given volume to swift using the given backup metadata."""
+    def backup(self, backup, volume_file, backup_metadata=False):
+        """Backup the given volume to Swift."""
+
+        # TODO(dosaboy): this needs implementing (see backup.drivers.ceph for
+        #                an example)
+        if backup_metadata:
+            msg = _("Volume metadata backup requested but this driver does "
+                    "not yet support this feature.")
+            raise exception.InvalidBackup(reason=msg)
+
         object_meta, container = self._prepare_backup(backup)
         while True:
             data = volume_file.read(self.data_block_size_bytes)
index 3f0806a06d4230bc8bc49453fbd297a33d562208..c871360dfc38cb994c29f879d5d7bba392497334 100644 (file)
@@ -59,10 +59,9 @@ class TSMBackupDriver(BackupDriver):
     DRIVER_VERSION = '1.0.0'
 
     def __init__(self, context, db_driver=None):
-        self.context = context
+        super(TSMBackupDriver, self).__init__(context, db_driver)
         self.tsm_password = CONF.backup_tsm_password
         self.volume_prefix = CONF.backup_tsm_volume_prefix
-        super(TSMBackupDriver, self).__init__(db_driver)
 
     def _make_link(self, volume_path, backup_path, vol_id):
         """Create a hard link for the volume block device.
@@ -267,7 +266,7 @@ class TSMBackupDriver(BackupDriver):
                       'err': e.stderr})
             LOG.error(err)
 
-    def backup(self, backup, volume_file):
+    def backup(self, backup, volume_file, backup_metadata=False):
         """Backup the given volume to TSM.
 
         TSM performs an image backup of a volume. The volume_file is
@@ -276,9 +275,17 @@ class TSMBackupDriver(BackupDriver):
 
         :param backup: backup information for volume
         :param volume_file: file object representing the volume
+        :param backup_metadata: whether or not to backup volume metadata
         :raises InvalidBackup
         """
 
+        # TODO(dosaboy): this needs implementing (see backup.drivers.ceph for
+        #                an example)
+        if backup_metadata:
+            msg = _("Volume metadata backup requested but this driver does "
+                    "not yet support this feature.")
+            raise exception.InvalidBackup(reason=msg)
+
         backup_id = backup['id']
         volume_id = backup['volume_id']
         volume_path = self._get_volume_realpath(volume_file, volume_id)
index 81d1da805283475a780d85a2798af9840ad31823..e2f1f7b97b4f0d1f68cc53133d203c4bf3a9ddaf 100644 (file)
@@ -477,6 +477,14 @@ class BackupOperationError(Invalid):
     message = _("An error has occurred during backup operation")
 
 
+class BackupMetadataUnsupportedVersion(BackupDriverException):
+    message = _("Unsupported backup metadata version requested")
+
+
+class VolumeMetadataBackupExists(BackupDriverException):
+    message = _("Metadata backup already exists for this volume")
+
+
 class BackupRBDOperationFailed(BackupDriverException):
     message = _("Backup RBD operation failed")
 
index 627b58be7b3b1438a7fa35e12d9247c4ee027f64..52816727d898d4d1e78ed46cd8ab45493d98694b 100644 (file)
@@ -20,10 +20,12 @@ import os
 import tempfile
 import uuid
 
+from cinder.backup import driver
 from cinder.backup.drivers import ceph
 from cinder import context
 from cinder import db
 from cinder import exception
+from cinder.openstack.common import jsonutils
 from cinder.openstack.common import log as logging
 from cinder.openstack.common import processutils
 from cinder import test
@@ -51,6 +53,10 @@ class MockImageBusyException(MockException):
     """Used as mock for rbd.ImageBusy."""
 
 
+class MockObjectNotFoundException(MockException):
+    """Used as mock for rados.MockObjectNotFoundException."""
+
+
 def common_mocks(f):
     """Decorator to set mocks common to all tests.
 
@@ -61,12 +67,12 @@ def common_mocks(f):
         # NOTE(dosaboy): mock Popen to, by default, raise Exception in order to
         #                ensure that any test ending up in a subprocess fails
         #                if not properly mocked.
-        @mock.patch('subprocess.Popen')
-        # NOTE(dosaboy): mock out eventlet.sleep() so that it does nothing.
-        @mock.patch('eventlet.sleep')
-        @mock.patch('time.time')
+        @mock.patch('subprocess.Popen', spec=True)
         # NOTE(dosaboy): set spec to empty object so that hasattr calls return
         #                False by default.
+        @mock.patch('eventlet.sleep', spec=True)
+        @mock.patch('time.time', spec=True)
+        # NOTE(dosaboy): mock out eventlet.sleep() so that it does nothing.
         @mock.patch('cinder.backup.drivers.ceph.rbd', spec=object)
         @mock.patch('cinder.backup.drivers.ceph.rados', spec=object)
         def _common_inner_inner2(mock_rados, mock_rbd, mock_time, mock_sleep,
@@ -367,12 +373,13 @@ class BackupCephTestCase(test.TestCase):
         self.service.rbd.Image.write = mock.Mock()
         self.service.rbd.Image.write.side_effect = mock_write_data
 
-        with mock.patch.object(self.service, '_discard_bytes'):
-            with tempfile.NamedTemporaryFile() as test_file:
-                self.service.backup(self.backup, self.volume_file)
+        with mock.patch.object(self.service, '_backup_metadata'):
+            with mock.patch.object(self.service, '_discard_bytes'):
+                with tempfile.NamedTemporaryFile() as test_file:
+                    self.service.backup(self.backup, self.volume_file)
 
-                # Ensure the files are equal
-                self.assertEqual(checksum.digest(), self.checksum.digest())
+                    # Ensure the files are equal
+                    self.assertEqual(checksum.digest(), self.checksum.digest())
 
         self.assertTrue(self.service.rbd.Image.write.called)
 
@@ -391,8 +398,8 @@ class BackupCephTestCase(test.TestCase):
                          "volume-%s.backup.%s" % (self.volume_id, '1234'))
 
     @common_mocks
-    @mock.patch('fcntl.fcntl')
-    @mock.patch('subprocess.Popen')
+    @mock.patch('fcntl.fcntl', spec=True)
+    @mock.patch('subprocess.Popen', spec=True)
     def test_backup_volume_from_rbd(self, mock_popen, mock_fnctl):
         backup_name = self.service._get_backup_base_name(self.backup_id,
                                                          diff_format=True)
@@ -416,34 +423,36 @@ class BackupCephTestCase(test.TestCase):
         self.mock_rbd.RBD.list = mock.Mock()
         self.mock_rbd.RBD.list.return_value = [backup_name]
 
-        with mock.patch.object(self.service, 'get_backup_snaps') as \
-                mock_get_backup_snaps:
-            with mock.patch.object(self.service, '_full_backup') as \
-                    mock_full_backup:
-                with mock.patch.object(self.service, '_try_delete_base_image'):
-                    with tempfile.NamedTemporaryFile() as test_file:
-                        checksum = hashlib.sha256()
-                        image = self.service.rbd.Image()
-                        meta = rbddriver.RBDImageMetadata(image,
-                                                          'pool_foo',
-                                                          'user_foo',
-                                                          'conf_foo')
-                        self.service.backup(self.backup,
-                                            rbddriver.RBDImageIOWrapper(meta))
-
-                        self.assertEqual(self.callstack, ['popen_init',
-                                                          'read',
-                                                          'popen_init',
-                                                          'write',
-                                                          'stdout_close',
-                                                          'communicate'])
-
-                        self.assertFalse(mock_full_backup.called)
-                        self.assertTrue(mock_get_backup_snaps.called)
-
-                        # Ensure the files are equal
-                        self.assertEqual(checksum.digest(),
-                                         self.checksum.digest())
+        with mock.patch.object(self.service, '_backup_metadata'):
+            with mock.patch.object(self.service, 'get_backup_snaps') as \
+                    mock_get_backup_snaps:
+                with mock.patch.object(self.service, '_full_backup') as \
+                        mock_full_backup:
+                    with mock.patch.object(self.service,
+                                           '_try_delete_base_image'):
+                        with tempfile.NamedTemporaryFile() as test_file:
+                            checksum = hashlib.sha256()
+                            image = self.service.rbd.Image()
+                            meta = rbddriver.RBDImageMetadata(image,
+                                                              'pool_foo',
+                                                              'user_foo',
+                                                              'conf_foo')
+                            rbdio = rbddriver.RBDImageIOWrapper(meta)
+                            self.service.backup(self.backup, rbdio)
+
+                            self.assertEqual(self.callstack, ['popen_init',
+                                                              'read',
+                                                              'popen_init',
+                                                              'write',
+                                                              'stdout_close',
+                                                              'communicate'])
+
+                            self.assertFalse(mock_full_backup.called)
+                            self.assertTrue(mock_get_backup_snaps.called)
+
+                            # Ensure the files are equal
+                            self.assertEqual(checksum.digest(),
+                                             self.checksum.digest())
 
     @common_mocks
     def test_backup_vol_length_0(self):
@@ -476,22 +485,27 @@ class BackupCephTestCase(test.TestCase):
         self.mock_rbd.Image.size.return_value = \
             self.chunk_size * self.num_chunks
 
-        with mock.patch.object(self.service, '_discard_bytes') as \
-                mock_discard_bytes:
-            with tempfile.NamedTemporaryFile() as test_file:
-                self.volume_file.seek(0)
+        with mock.patch.object(self.service, '_restore_metadata') as \
+                mock_restore_metadata:
+            with mock.patch.object(self.service, '_discard_bytes') as \
+                    mock_discard_bytes:
+                with tempfile.NamedTemporaryFile() as test_file:
+                    self.volume_file.seek(0)
 
-                self.service.restore(self.backup, self.volume_id, test_file)
+                    self.service.restore(self.backup, self.volume_id,
+                                         test_file)
 
-                checksum = hashlib.sha256()
-                test_file.seek(0)
-                for c in xrange(0, self.num_chunks):
-                    checksum.update(test_file.read(self.chunk_size))
+                    checksum = hashlib.sha256()
+                    test_file.seek(0)
+                    for c in xrange(0, self.num_chunks):
+                        checksum.update(test_file.read(self.chunk_size))
 
-                # Ensure the files are equal
-                self.assertEqual(checksum.digest(), self.checksum.digest())
+                    # Ensure the files are equal
+                    self.assertEqual(checksum.digest(), self.checksum.digest())
 
-            self.assertTrue(mock_discard_bytes.called)
+                    self.assertTrue(mock_restore_metadata.called)
+                    self.assertTrue(mock_discard_bytes.called)
+                    self.assertTrue(mock_discard_bytes.called)
 
         self.assertTrue(self.service.rbd.Image.read.called)
 
@@ -560,7 +574,8 @@ class BackupCephTestCase(test.TestCase):
                 self.assertEqual(rem, (snap_name, 0))
 
     @common_mocks
-    def test_try_delete_base_image_diff_format(self):
+    @mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
+    def test_try_delete_base_image_diff_format(self, mock_meta_backup):
         backup_name = self.service._get_backup_base_name(self.volume_id,
                                                          diff_format=True)
 
@@ -581,7 +596,8 @@ class BackupCephTestCase(test.TestCase):
         self.assertTrue(self.mock_rbd.RBD.remove.called)
 
     @common_mocks
-    def test_try_delete_base_image(self):
+    @mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
+    def test_try_delete_base_image(self, mock_meta_backup):
         backup_name = self.service._get_backup_base_name(self.volume_id,
                                                          self.backup_id)
 
@@ -616,13 +632,15 @@ class BackupCephTestCase(test.TestCase):
         self.assertTrue(MockImageBusyException in RAISED_EXCEPTIONS)
 
     @common_mocks
-    def test_delete(self):
+    @mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
+    def test_delete(self, mock_meta_backup):
         with mock.patch.object(self.service, '_try_delete_base_image'):
             self.service.delete(self.backup)
             self.assertEqual(RAISED_EXCEPTIONS, [])
 
     @common_mocks
-    def test_delete_image_not_found(self):
+    @mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
+    def test_delete_image_not_found(self, mock_meta_backup):
         with mock.patch.object(self.service, '_try_delete_base_image') as \
                 mock_del_base:
             mock_del_base.side_effect = self.mock_rbd.ImageNotFound
@@ -760,11 +778,226 @@ class BackupCephTestCase(test.TestCase):
                         self.assertTrue(mock_file_is_rbd.called)
 
     @common_mocks
-    @mock.patch('fcntl.fcntl')
-    @mock.patch('subprocess.Popen')
+    @mock.patch('fcntl.fcntl', spec=True)
+    @mock.patch('subprocess.Popen', spec=True)
     def test_piped_execute(self, mock_popen, mock_fcntl):
         mock_fcntl.return_value = 0
         self._setup_mock_popen(mock_popen, ['out', 'err'])
         self.service._piped_execute(['foo'], ['bar'])
         self.assertEqual(self.callstack, ['popen_init', 'popen_init',
                                           'stdout_close', 'communicate'])
+
+    @common_mocks
+    def test_restore_metdata(self):
+        version = 1
+
+        def mock_read(*args):
+            base_tag = driver.BackupMetadataAPI.TYPE_TAG_VOL_BASE_META
+            glance_tag = driver.BackupMetadataAPI.TYPE_TAG_VOL_GLANCE_META
+            return jsonutils.dumps({base_tag: {'image_name': 'image.base'},
+                                    glance_tag: {'image_name': 'image.glance'},
+                                    'version': version})
+
+        self.mock_rados.Object = mock.Mock
+        self.mock_rados.Object.read = mock.Mock()
+        self.mock_rados.Object.read.side_effect = mock_read
+        self.mock_rados.Object.stat = mock.Mock()
+
+        self.service._restore_metadata(self.backup, self.volume_id)
+
+        self.assertTrue(self.mock_rados.Object.stat.called)
+        self.assertTrue(self.mock_rados.Object.read.called)
+
+        version = 2
+        try:
+            self.service._restore_metadata(self.backup, self.volume_id)
+        except exception.BackupOperationError as exc:
+            msg = _("Metadata restore failed due to incompatible version")
+            self.assertEqual(str(exc), msg)
+        else:
+            # Force a test failure
+            self.assertFalse(True)
+
+    @common_mocks
+    @mock.patch('cinder.backup.drivers.ceph.VolumeMetadataBackup', spec=True)
+    def test_backup_metata_already_exists(self, mock_meta_backup):
+
+        def mock_set(json_meta):
+            msg = (_("Metadata backup object '%s' already exists") %
+                   ("backup.%s.meta" % (self.backup_id)))
+            raise exception.VolumeMetadataBackupExists(msg)
+
+        mock_meta_backup.return_value.set = mock.Mock()
+        mock_meta_backup.return_value.set.side_effect = mock_set
+
+        with mock.patch.object(self.service, 'get_metadata') as \
+                mock_get_metadata:
+            mock_get_metadata.return_value = "some.json.metadata"
+            try:
+                self.service._backup_metadata(self.backup)
+            except exception.BackupOperationError as e:
+                msg = (_("Failed to backup volume metadata - Metadata backup "
+                         "object 'backup.%s.meta' already exists") %
+                       (self.backup_id))
+                self.assertEqual(str(e), msg)
+            else:
+                # Make the test fail
+                self.assertFalse(True)
+
+        self.assertFalse(mock_meta_backup.set.called)
+
+    @common_mocks
+    def test_backup_metata_error(self):
+        """Ensure that delete() is called if the metadata backup fails.
+
+        Also ensure that the exception is propagated to the caller.
+        """
+        with mock.patch.object(self.service, '_backup_metadata') as \
+                mock_backup_metadata:
+            mock_backup_metadata.side_effect = exception.BackupOperationError
+            with mock.patch.object(self.service, '_get_volume_size_gb'):
+                with mock.patch.object(self.service, '_file_is_rbd',
+                                       return_value=False):
+                    with mock.patch.object(self.service, '_full_backup'):
+                        with mock.patch.object(self.service, 'delete') as \
+                                mock_delete:
+                            self.assertRaises(exception.BackupOperationError,
+                                              self.service.backup, self.backup,
+                                              mock.Mock(),
+                                              backup_metadata=True)
+                            self.assertTrue(mock_delete.called)
+
+    @common_mocks
+    def test_restore_invalid_metadata_version(self):
+
+        def mock_read(*args):
+            base_tag = driver.BackupMetadataAPI.TYPE_TAG_VOL_BASE_META
+            glance_tag = driver.BackupMetadataAPI.TYPE_TAG_VOL_GLANCE_META
+            return jsonutils.dumps({base_tag: {'image_name': 'image.base'},
+                                    glance_tag: {'image_name': 'image.glance'},
+                                    'version': 2})
+
+        self.mock_rados.Object = mock.Mock
+        self.mock_rados.Object.read = mock.Mock()
+        self.mock_rados.Object.read.side_effect = mock_read
+        with mock.patch.object(ceph.VolumeMetadataBackup, '_exists') as \
+                mock_exists:
+            mock_exists.return_value = True
+
+            self.assertRaises(exception.BackupOperationError,
+                              self.service._restore_metadata,
+                              self.backup, self.volume_id)
+
+            self.assertTrue(mock_exists.called)
+
+        self.assertTrue(self.mock_rados.Object.read.called)
+
+
+def common_meta_backup_mocks(f):
+    """Decorator to set mocks common to all metadata backup tests.
+
+    The point of doing these mocks here is so that we don't accidentally set
+    mocks that can't/dont't get unset.
+    """
+    def _common_inner_inner1(inst, *args, **kwargs):
+        @mock.patch('cinder.backup.drivers.ceph.rbd', spec=object)
+        @mock.patch('cinder.backup.drivers.ceph.rados', spec=object)
+        def _common_inner_inner2(mock_rados, mock_rbd):
+            inst.mock_rados = mock_rados
+            inst.mock_rbd = mock_rbd
+            inst.mock_rados.Object = mock.Mock
+            inst.mock_rados.ObjectNotFound = MockObjectNotFoundException
+            return f(inst, *args, **kwargs)
+
+        return _common_inner_inner2()
+    return _common_inner_inner1
+
+
+class VolumeMetadataBackupTestCase(test.TestCase):
+
+    def setUp(self):
+        global RAISED_EXCEPTIONS
+        RAISED_EXCEPTIONS = []
+        super(VolumeMetadataBackupTestCase, self).setUp()
+        self.backup_id = str(uuid.uuid4())
+        self.mb = ceph.VolumeMetadataBackup(mock.Mock(), self.backup_id)
+
+    def tearDown(self):
+        super(VolumeMetadataBackupTestCase, self).tearDown()
+
+    @common_meta_backup_mocks
+    def test_name(self):
+        self.assertEqual(self.mb.name, 'backup.%s.meta' % (self.backup_id))
+
+    @common_meta_backup_mocks
+    def test_exists(self):
+        # True
+        with mock.patch.object(self.mock_rados.Object, 'stat') as mock_stat:
+            self.assertTrue(self.mb.exists)
+            self.assertTrue(mock_stat.called)
+
+        # False
+        with mock.patch.object(self.mock_rados.Object, 'stat') as mock_stat:
+            mock_stat.side_effect = self.mock_rados.ObjectNotFound
+            self.assertFalse(self.mb.exists)
+            self.assertTrue(mock_stat.called)
+            self.assertEqual(RAISED_EXCEPTIONS, [MockObjectNotFoundException])
+
+    @common_meta_backup_mocks
+    def test_set(self):
+        obj_data = []
+        called = []
+
+        def mock_read(*args):
+            called.append('read')
+            self.assertTrue(len(obj_data) == 1)
+            return obj_data[0]
+
+        def _mock_write(data):
+            obj_data.append(data)
+            called.append('write')
+
+        self.mb.get = mock.Mock()
+        self.mb.get.side_effect = mock_read
+
+        with mock.patch.object(ceph.VolumeMetadataBackup, 'set') as mock_write:
+            mock_write.side_effect = _mock_write
+
+            self.mb.set({'foo': 'bar'})
+            self.assertEqual(self.mb.get(), {'foo': 'bar'})
+            self.assertTrue(self.mb.get.called)
+
+            self.mb._exists = mock.Mock()
+            self.mb._exists.return_value = True
+
+        # use the unmocked set() method.
+        self.assertRaises(exception.VolumeMetadataBackupExists, self.mb.set,
+                          {'doo': 'dah'})
+
+        # check the meta obj state has not changed.
+        self.assertEqual(self.mb.get(), {'foo': 'bar'})
+
+        self.assertEqual(called, ['write', 'read', 'read'])
+
+    @common_meta_backup_mocks
+    def test_get(self):
+        with mock.patch.object(self.mock_rados.Object, 'stat') as mock_stat:
+            mock_stat.side_effect = self.mock_rados.ObjectNotFound
+            with mock.patch.object(self.mock_rados.Object, 'read') as \
+                    mock_read:
+                mock_read.return_value = 'meta'
+                self.assertIsNone(self.mb.get())
+                mock_stat.side_effect = None
+                self.assertEqual(self.mb.get(), 'meta')
+
+    @common_meta_backup_mocks
+    def remove_if_exists(self):
+        with mock.patch.object(self.mock_rados.Object, 'remove') as \
+                mock_remove:
+            mock_remove.side_effect = self.mock_rados.ObjectNotFound
+            self.mb.remove_if_exists()
+            self.assertEqual(RAISED_EXCEPTIONS, [MockObjectNotFoundException])
+
+            self.mock_rados.Object.remove.side_effect = None
+            self.mb.remove_if_exists()
+            self.assertEqual(RAISED_EXCEPTIONS, [])
diff --git a/cinder/tests/test_backup_driver_base.py b/cinder/tests/test_backup_driver_base.py
new file mode 100644 (file)
index 0000000..d32e7c3
--- /dev/null
@@ -0,0 +1,224 @@
+# Copyright 2013 Canonical Ltd.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+""" Tests for the backup service base driver. """
+
+import mock
+import uuid
+
+from cinder.backup import driver
+from cinder import context
+from cinder import db
+from cinder import exception
+from cinder.openstack.common import jsonutils
+from cinder import test
+
+
+class BackupBaseDriverTestCase(test.TestCase):
+
+    def _create_volume_db_entry(self, id, size):
+        vol = {'id': id, 'size': size, 'status': 'available'}
+        return db.volume_create(self.ctxt, vol)['id']
+
+    def _create_backup_db_entry(self, backupid, volid, size):
+        backup = {'id': backupid, 'size': size, 'volume_id': volid}
+        return db.backup_create(self.ctxt, backup)['id']
+
+    def setUp(self):
+        super(BackupBaseDriverTestCase, self).setUp()
+        self.ctxt = context.get_admin_context()
+
+        self.volume_id = str(uuid.uuid4())
+        self.backup_id = str(uuid.uuid4())
+
+        self._create_backup_db_entry(self.backup_id, self.volume_id, 1)
+        self._create_volume_db_entry(self.volume_id, 1)
+        self.backup = db.backup_get(self.ctxt, self.backup_id)
+        self.driver = driver.BackupDriver(self.ctxt)
+
+    def test_backup(self):
+        self.assertRaises(NotImplementedError,
+                          self.driver.backup, self.backup, self.volume_id)
+
+    def test_restore(self):
+        self.assertRaises(NotImplementedError,
+                          self.driver.restore, self.backup, self.volume_id,
+                          None)
+
+    def test_delete(self):
+        self.assertRaises(NotImplementedError,
+                          self.driver.delete, self.backup)
+
+    def test_get_metadata(self):
+        json_metadata = self.driver.get_metadata(self.volume_id)
+        metadata = jsonutils.loads(json_metadata)
+        self.assertEqual(metadata['version'], 1)
+
+    def test_put_metadata(self):
+        metadata = {'version': 1}
+        self.driver.put_metadata(self.volume_id, jsonutils.dumps(metadata))
+
+    def test_get_put_metadata(self):
+        json_metadata = self.driver.get_metadata(self.volume_id)
+        self.driver.put_metadata(self.volume_id, json_metadata)
+
+    def tearDown(self):
+        super(BackupBaseDriverTestCase, self).tearDown()
+
+
+class BackupMetadataAPITestCase(test.TestCase):
+
+    def _create_volume_db_entry(self, id, size):
+        vol = {'id': id, 'size': size, 'status': 'available'}
+        return db.volume_create(self.ctxt, vol)['id']
+
+    def setUp(self):
+        super(BackupMetadataAPITestCase, self).setUp()
+        self.ctxt = context.get_admin_context()
+        self.volume_id = str(uuid.uuid4())
+        self._create_volume_db_entry(self.volume_id, 1)
+        self.bak_meta_api = driver.BackupMetadataAPI(self.ctxt)
+
+    def _add_metadata(self, vol_meta=False, vol_glance_meta=False):
+        if vol_meta:
+            # Add some VolumeMetadata
+            db.volume_metadata_update(self.ctxt, self.volume_id,
+                                      {'fee': 'fi'}, False)
+            db.volume_metadata_update(self.ctxt, self.volume_id,
+                                      {'fo': 'fum'}, False)
+
+        if vol_glance_meta:
+            # Add some GlanceMetadata
+            db.volume_glance_metadata_create(self.ctxt, self.volume_id,
+                                             'disk_format', 'bare')
+            db.volume_glance_metadata_create(self.ctxt, self.volume_id,
+                                             'container_type', 'ovf')
+
+    def test_get(self):
+        # Volume won't have anything other than base by default
+        meta = self.bak_meta_api.get(self.volume_id)
+        s1 = set(jsonutils.loads(meta).keys())
+        s2 = ['version', self.bak_meta_api.TYPE_TAG_VOL_BASE_META]
+        self.assertEqual(s1.symmetric_difference(s2), set())
+
+        self._add_metadata(vol_glance_meta=True)
+
+        meta = self.bak_meta_api.get(self.volume_id)
+        s1 = set(jsonutils.loads(meta).keys())
+        s2 = ['version', self.bak_meta_api.TYPE_TAG_VOL_BASE_META,
+              self.bak_meta_api.TYPE_TAG_VOL_GLANCE_META]
+        self.assertEqual(s1.symmetric_difference(s2), set())
+
+        self._add_metadata(vol_meta=True)
+
+        meta = self.bak_meta_api.get(self.volume_id)
+        s1 = set(jsonutils.loads(meta).keys())
+        s2 = ['version', self.bak_meta_api.TYPE_TAG_VOL_BASE_META,
+              self.bak_meta_api.TYPE_TAG_VOL_GLANCE_META,
+              self.bak_meta_api.TYPE_TAG_VOL_META]
+        self.assertEqual(s1.symmetric_difference(s2), set())
+
+    def test_put(self):
+        meta = self.bak_meta_api.get(self.volume_id)
+        self.bak_meta_api.put(self.volume_id, meta)
+
+        self._add_metadata(vol_glance_meta=True)
+        meta = self.bak_meta_api.get(self.volume_id)
+        self.bak_meta_api.put(self.volume_id, meta)
+
+        self._add_metadata(vol_meta=True)
+        meta = self.bak_meta_api.get(self.volume_id)
+        self.bak_meta_api.put(self.volume_id, meta)
+
+    def test_put_invalid_version(self):
+        container = jsonutils.dumps({'version': 2})
+        self.assertRaises(exception.BackupMetadataUnsupportedVersion,
+                          self.bak_meta_api.put, self.volume_id, container)
+
+    def test_v1_restore_factory(self):
+        fact = self.bak_meta_api._v1_restore_factory()
+
+        keys = [self.bak_meta_api.TYPE_TAG_VOL_BASE_META,
+                self.bak_meta_api.TYPE_TAG_VOL_META,
+                self.bak_meta_api.TYPE_TAG_VOL_GLANCE_META]
+
+        self.assertEqual(set(keys).symmetric_difference(set(fact.keys())),
+                         set([]))
+
+        for f in fact:
+            func = fact[f][0]
+            fields = fact[f][1]
+            func({}, self.volume_id, fields)
+
+    def test_restore_vol_glance_meta(self):
+        fields = {}
+        container = {}
+        self.bak_meta_api._save_vol_glance_meta(container, self.volume_id)
+        self.bak_meta_api._restore_vol_glance_meta(container, self.volume_id,
+                                                   fields)
+        self._add_metadata(vol_glance_meta=True)
+        self.bak_meta_api._save_vol_glance_meta(container, self.volume_id)
+        self.bak_meta_api._restore_vol_glance_meta(container, self.volume_id,
+                                                   fields)
+
+    def test_restore_vol_meta(self):
+        fields = {}
+        container = {}
+        self.bak_meta_api._save_vol_meta(container, self.volume_id)
+        self.bak_meta_api._restore_vol_meta(container, self.volume_id, fields)
+        self._add_metadata(vol_meta=True)
+        self.bak_meta_api._save_vol_meta(container, self.volume_id)
+        self.bak_meta_api._restore_vol_meta(container, self.volume_id, fields)
+
+    def test_restore_vol_base_meta(self):
+        fields = {}
+        container = {}
+        self.bak_meta_api._save_vol_base_meta(container, self.volume_id)
+        self.bak_meta_api._restore_vol_base_meta(container, self.volume_id,
+                                                 fields)
+
+    def test_filter(self):
+        metadata = {'a': 1, 'b': 2, 'c': 3}
+        self.assertEqual(metadata, self.bak_meta_api._filter(metadata, []))
+        self.assertEqual({'b': 2}, self.bak_meta_api._filter(metadata, ['b']))
+        self.assertEqual({}, self.bak_meta_api._filter(metadata, ['d']))
+        self.assertEqual({'a': 1, 'b': 2},
+                         self.bak_meta_api._filter(metadata, ['a', 'b']))
+
+    def test_save_vol_glance_meta(self):
+        container = {}
+        self.bak_meta_api._save_vol_glance_meta(container, self.volume_id)
+
+    def test_save_vol_meta(self):
+        container = {}
+        self.bak_meta_api._save_vol_meta(container, self.volume_id)
+
+    def test_save_vol_base_meta(self):
+        container = {}
+        self.bak_meta_api._save_vol_base_meta(container, self.volume_id)
+
+    def test_is_serializable(self):
+        data = {'foo': 'bar'}
+        if self.bak_meta_api._is_serializable(data):
+            jsonutils.dumps(data)
+
+    def test_is_not_serializable(self):
+        data = {'foo': 'bar'}
+        with mock.patch.object(jsonutils, 'dumps') as mock_dumps:
+            mock_dumps.side_effect = TypeError
+            self.assertFalse(self.bak_meta_api._is_serializable(data))
+            mock_dumps.assert_called_once()
+
+    def tearDown(self):
+        super(BackupMetadataAPITestCase, self).tearDown()
index 56f3ec55d6979a46dc058db2b1eb5b61c7dec58b..71ba221a1bb73c30ac3a2d21eb816ea2a6989e81 100644 (file)
 #osapi_max_request_body_size=114688
 
 
+#
+# Options defined in cinder.backup.driver
+#
+
+# Backup metadata version to be used when backing up volume
+# metadata. If this number is bumped, make sure the service
+# doing the restore supports the new version. (integer value)
+#backup_metadata_version=1
+
+
 #
 # Options defined in cinder.backup.drivers.ceph
 #