]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Added incremental backup support to Ceph backup driver
authorEdward Hope-Morley <edward.hope-morley@canonical.com>
Tue, 2 Jul 2013 08:40:44 +0000 (09:40 +0100)
committerEdward Hope-Morley <edward.hope-morley@canonical.com>
Thu, 18 Jul 2013 20:21:32 +0000 (21:21 +0100)
The Ceph backup driver is now capable of doing differential
and incremental backups between or within Ceph clusters.

Implements: blueprint cinder-backup-to-ceph

Change-Id: Id59bf1963c6d35aae4baf6f49be17340982c205c

cinder/backup/drivers/ceph.py
cinder/exception.py
cinder/tests/backup/fake_rados.py
cinder/tests/test_backup_ceph.py
cinder/tests/test_rbd.py
cinder/volume/drivers/rbd.py
etc/cinder/cinder.conf.sample

index 7b56b7a0ebd2aacc08a66aecff6e3983dc945dff..d9c7341946d3cb8eb2344942a53ef7a987ab4ccf 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-"""Ceph Backup Service Implementation"""
+"""Ceph Backup Service Implementation.
 
-import os
-import time
+This driver supports backing up volumes of any type to a Ceph backend store. It
+is also capable of detecting whether the volume to be backed up is a Ceph RBD
+volume and if so, attempts to perform incremental/differential backups.
+
+Support is also included for the following in the case of source volume being a
+Ceph RBD volume:
+
+    * backing up within the same Ceph pool (not recommended)
+    * backing up between different Ceph pools
+    * backing up between different Ceph clusters
+
+At the time of writing, differential backup support in Ceph/librbd was quite
+new so this driver accounts for this by first attempting differential backup
+and falling back to full backup/copy if the former fails.
+
+If incremental backups are used, multiple backups of the same volume are stored
+as snapshots so that minimal space is consumed in the backup store and
+restoring the volume takes a far reduced amount of time compared to a full
+copy.
+
+Note that Cinder supports restoring to a new volume or the original volume the
+backup was taken from. For the latter case, a full copy is enforced since this
+was deemed the safest action to take. It is therefore recommended to always
+restore to a new volume (default).
+"""
 
 import eventlet
-from oslo.config import cfg
+import os
+import re
+import time
 
 from cinder.backup.driver import BackupDriver
 from cinder import exception
 from cinder.openstack.common import log as logging
 from cinder import units
-import cinder.volume.drivers.rbd as rbddriver
+from cinder import utils
+import cinder.volume.drivers as drivers
+from oslo.config import cfg
 
 try:
     import rados
@@ -57,58 +84,93 @@ CONF.register_opts(service_opts)
 
 
 class CephBackupDriver(BackupDriver):
-    """Backup up Cinder volumes to Ceph Object Store"""
+    """Backup up Cinder volumes to Ceph Object Store.
 
-    def __init__(self, context, db_driver=None):
+    This class enables backing up Cinder volumes to a Ceph object store.
+    Backups may be stored in their own pool or even cluster. Store location is
+    defined by the Ceph conf file and Service config options supplied.
+
+    If the source volume is itself an RBD volume, the backup will be performed
+    using incremental differential backups which *should* give a performance
+    gain.
+    """
+
+    def __init__(self, context, db_driver=None, execute=None):
         super(CephBackupDriver, self).__init__(db_driver)
         self.rbd = rbd
         self.rados = rados
         self.context = context
         self.chunk_size = CONF.backup_ceph_chunk_size
-        if self._supports_stripingv2():
+        self._execute = execute or utils.execute
+
+        if self._supports_stripingv2:
             self.rbd_stripe_unit = CONF.backup_ceph_stripe_unit
             self.rbd_stripe_count = CONF.backup_ceph_stripe_count
         else:
-            LOG.info("rbd striping not supported - ignoring conf settings "
-                     "for rbd striping")
+            LOG.info(_("rbd striping not supported - ignoring configuration "
+                       "settings for rbd striping"))
             self.rbd_stripe_count = 0
             self.rbd_stripe_unit = 0
 
-        self._ceph_user = str(CONF.backup_ceph_user)
-        self._ceph_pool = str(CONF.backup_ceph_pool)
-        self._ceph_conf = str(CONF.backup_ceph_conf)
+        self._ceph_backup_user = str(CONF.backup_ceph_user)
+        self._ceph_backup_pool = str(CONF.backup_ceph_pool)
+        self._ceph_backup_conf = str(CONF.backup_ceph_conf)
 
-    def _supports_layering(self):
-        """
-        Determine whether copy-on-write is supported by our version of librbd
+    def _validate_string_args(self, *args):
+        """Ensure all args are non-None and non-empty."""
+        return all(args)
+
+    def _ceph_args(self, user, conf=None, pool=None):
+        """Create default ceph args for executing rbd commands.
+
+        If no --conf is provided, rbd will look in the default locations e.g.
+        /etc/ceph/ceph.conf
         """
+
+        # Make sure user arg is valid since rbd command may not fail if
+        # invalid/no user provided, resulting in unexpected behaviour.
+        if not self._validate_string_args(user):
+            raise exception.BackupInvalidCephArgs(_("invalid user '%s'") %
+                                                  (user))
+
+        args = ['--id', user]
+        if conf:
+            args += ['--conf', conf]
+        if pool:
+            args += '--pool', pool
+
+        return args
+
+    @property
+    def _supports_layering(self):
+        """Determine if copy-on-write is supported by our version of librbd."""
         return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
 
+    @property
     def _supports_stripingv2(self):
-        """
-        Determine whether striping is supported by our version of librbd
-        """
+        """Determine if striping is supported by our version of librbd."""
         return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2')
 
     def _get_rbd_support(self):
+        """Determine RBD features supported by our version of librbd."""
         old_format = True
         features = 0
-        if self._supports_layering():
+        if self._supports_layering:
             old_format = False
             features |= self.rbd.RBD_FEATURE_LAYERING
-        if self._supports_stripingv2():
+        if self._supports_stripingv2:
             old_format = False
             features |= self.rbd.RBD_FEATURE_STRIPINGV2
 
         return (old_format, features)
 
     def _connect_to_rados(self, pool=None):
-        """Establish connection to the Ceph cluster"""
-        client = self.rados.Rados(rados_id=self._ceph_user,
-                                  conffile=self._ceph_conf)
+        """Establish connection to the backup Ceph cluster."""
+        client = self.rados.Rados(rados_id=self._ceph_backup_user,
+                                  conffile=self._ceph_backup_conf)
         try:
             client.connect()
-            pool_to_open = str(pool or self._ceph_pool)
+            pool_to_open = str(pool or self._ceph_backup_pool)
             ioctx = client.open_ioctx(pool_to_open)
             return client, ioctx
         except self.rados.Error:
@@ -117,157 +179,728 @@ class CephBackupDriver(BackupDriver):
             raise
 
     def _disconnect_from_rados(self, client, ioctx):
-        """Terminate connection with the Ceph cluster"""
+        """Terminate connection with the backup Ceph cluster."""
         # closing an ioctx cannot raise an exception
         ioctx.close()
         client.shutdown()
 
-    def _get_backup_base_name(self, volume_id, backup_id):
-        """Return name of base image used for backup."""
-        # Ensure no unicode
-        return str("volume-%s.backup.%s" % (volume_id, backup_id))
+    def _get_backup_base_name(self, volume_id, backup_id=None,
+                              diff_format=False):
+        """Return name of base image used for backup.
 
-    def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False):
-        """
-        Transfer data between file and rbd. If destination is rbd, source is
-        assumed to be file, otherwise source is assumed to be rbd.
+        Incremental backups use a new base name so we support old and new style
+        format.
         """
+        # Ensure no unicode
+        if diff_format:
+            return str("volume-%s.backup.base" % (volume_id))
+        else:
+            if backup_id is None:
+                msg = _("backup_id required")
+                raise exception.InvalidParameterValue(msg)
+            return str("volume-%s.backup.%s" % (volume_id, backup_id))
+
+    def _transfer_data(self, src, src_name, dest, dest_name, length):
+        """Transfer data between files (Python IO objects)."""
+        LOG.debug(_("transferring data between '%(src)s' and '%(dest)s'") %
+                  {'src': src_name, 'dest': dest_name})
+
         chunks = int(length / self.chunk_size)
-        LOG.debug("transferring %s chunks of %s bytes to '%s'" %
-                  (chunks, self.chunk_size, dest_name))
+        LOG.debug(_("%(chunks)s chunks of %(bytes)s bytes to be transferred") %
+                  {'chunks': chunks, 'bytes': self.chunk_size})
+
         for chunk in xrange(0, chunks):
-            offset = chunk * self.chunk_size
             before = time.time()
-
-            if dest_is_rbd:
-                dest.write(src.read(self.chunk_size), offset)
-                # note(dosaboy): librbd writes are synchronous so flush() will
-                # have not effect. Also, flush only supported in more recent
-                # versions of librbd.
-            else:
-                dest.write(src.read(offset, self.chunk_size))
-                dest.flush()
-
+            data = src.read(self.chunk_size)
+            dest.write(data)
+            dest.flush()
             delta = (time.time() - before)
             rate = (self.chunk_size / delta) / 1024
-            LOG.debug("transferred chunk %s of %s (%dK/s)" %
-                      (chunk, chunks, rate))
+            LOG.debug((_("transferred chunk %(chunk)s of %(chunks)s "
+                         "(%(rate)dK/s)") %
+                       {'chunk': chunk, 'chunks': chunks,
+                        'rate': rate}))
 
             # yield to any other pending backups
             eventlet.sleep(0)
 
         rem = int(length % self.chunk_size)
         if rem:
-            LOG.debug("transferring remaining %s bytes" % (rem))
-            offset = (length - rem)
-            if dest_is_rbd:
-                dest.write(src.read(rem), offset)
-                # note(dosaboy): librbd writes are synchronous so flush() will
-                # have not effect. Also, flush only supported in more recent
-                # versions of librbd.
-            else:
-                dest.write(src.read(offset, rem))
-                dest.flush()
-
+            LOG.debug(_("transferring remaining %s bytes") % (rem))
+            data = src.read(rem)
+            dest.write(data)
+            dest.flush()
             # yield to any other pending backups
             eventlet.sleep(0)
 
-    def _backup_volume_from_file(self, backup_name, backup_size, volume_file):
-        """Backup a volume from file stream"""
-        LOG.debug("performing backup from file")
+    def _create_base_image(self, name, size, rados_client):
+        """Create a base backup image.
 
+        This will be the base image used for storing differential exports.
+        """
+        LOG.debug(_("creating base image '%s'") % (name))
         old_format, features = self._get_rbd_support()
+        self.rbd.RBD().create(ioctx=rados_client.ioctx,
+                              name=name,
+                              size=size,
+                              old_format=old_format,
+                              features=features,
+                              stripe_unit=self.rbd_stripe_unit,
+                              stripe_count=self.rbd_stripe_count)
+
+    def _delete_backup_snapshots(self, rados_client, base_name, backup_id):
+        """Delete any snapshots associated with this backup.
+
+        A backup should have at most *one* associated snapshot.
+
+        This is required before attempting to delete the base image. The
+        snapshots on the original volume can be left as they will be purged
+        when the volume is deleted.
+        """
+        backup_snaps = None
+        base_rbd = self.rbd.Image(rados_client.ioctx, base_name)
+        try:
+            snap_name = self._get_backup_snap_name(base_rbd, base_name,
+                                                   backup_id)
+            if snap_name:
+                LOG.debug(_("deleting backup snapshot='%s'") % (snap_name))
+                base_rbd.remove_snap(snap_name)
+            else:
+                LOG.debug(_("no backup snapshot to delete"))
 
-        with rbddriver.RADOSClient(self, self._ceph_pool) as client:
+            # Now check whether any snapshots remain on the base image
+            backup_snaps = self.get_backup_snaps(base_rbd)
+        finally:
+            base_rbd.close()
+
+        if backup_snaps:
+            return len(backup_snaps)
+        else:
+            return 0
+
+    def _try_delete_base_image(self, backup_id, volume_id, base_name=None):
+        """Try to delete backup RBD image.
+
+        If the rbd image is a base image for incremental backups, it may have
+        snapshots. Delete the snapshot associated with backup_id and if the
+        image has no more snapshots, delete it. Otherwise return.
+
+        If no base name is provided try normal (full) format then diff format
+        image name.
+
+        If a base name is provided but does not exist, ImageNotFound will be
+        raised.
+
+        If the image is busy, a number of retries will be performed if
+        ImageBusy is received, after which the exception will be propagated to
+        the caller.
+        """
+        retries = 3
+        delay = 5
+        try_diff_format = False
+
+        if base_name is None:
+            try_diff_format = True
+
+            base_name = self._get_backup_base_name(volume_id, backup_id)
+            LOG.debug(_("trying diff format name format basename='%s'") %
+                      (base_name))
+
+        with drivers.rbd.RADOSClient(self) as client:
+            rbd_exists, base_name = \
+                self._rbd_image_exists(base_name, volume_id, client,
+                                       try_diff_format=try_diff_format)
+            if not rbd_exists:
+                raise self.rbd.ImageNotFound(_("image %s not found") %
+                                             (base_name))
+
+            while retries >= 0:
+                # First delete associated snapshot (if exists)
+                rem = self._delete_backup_snapshots(client, base_name,
+                                                    backup_id)
+                if rem:
+                    msg = (_("base image still has %s snapshots so not "
+                             "deleting base image") % (rem))
+                    LOG.info(msg)
+                    return
+
+                LOG.info(_("deleting base image='%s'") % (base_name))
+                # Delete base if no more snapshots
+                try:
+                    self.rbd.RBD().remove(client.ioctx, base_name)
+                except self.rbd.ImageBusy as exc:
+                    # Allow a retry if the image is busy
+                    if retries > 0:
+                        LOG.info((_("image busy, retrying %(retries)s "
+                                    "more time(s) in %(delay)ss") %
+                                  {'retries': retries, 'delay': delay}))
+                        eventlet.sleep(delay)
+                    else:
+                        LOG.error(_("max retries reached - raising error"))
+                        raise exc
+                else:
+                    LOG.debug(_("base backup image='%s' deleted)") %
+                              (base_name))
+                    retries = 0
+                finally:
+                    retries -= 1
+
+    def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
+                           src_user, src_conf, dest_user, dest_conf,
+                           src_snap=None, from_snap=None):
+        """Backup only extents changed between two points.
+
+        If no snapshot is provided, the diff extents will be all those changed
+        since the rbd volume/base was created, otherwise it will be those
+        changed since the snapshot was created.
+        """
+        LOG.debug(_("performing differential transfer from '%(src)s' to "
+                    "'%(dest)s'") %
+                  {'src': src_name, 'dest': dest_name})
+
+        # NOTE(dosaboy): Need to be tolerant of clusters/clients that do
+        # not support these operations since at the time of writing they
+        # were very new.
+
+        src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
+        dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)
+
+        try:
+            cmd = ['rbd', 'export-diff'] + src_ceph_args
+            if from_snap is not None:
+                cmd += ['--from-snap', from_snap]
+            if src_snap:
+                path = str("%s/%s@%s" % (src_pool, src_name, src_snap))
+            else:
+                path = str("%s/%s" % (src_pool, src_name))
+            cmd += [path, '-']
+            out, err = self._execute(*cmd)
+        except (exception.ProcessExecutionError, exception.Error) as exc:
+            LOG.info(_("rbd export-diff failed - %s") % (str(exc)))
+            raise exception.BackupRBDOperationFailed("rbd export-diff failed")
+
+        try:
+            cmd = ['rbd', 'import-diff'] + dest_ceph_args
+            cmd += ['-', str("%s/%s" % (dest_pool, dest_name))]
+            out, err = self._execute(*cmd, process_input=out)
+        except (exception.ProcessExecutionError, exception.Error) as exc:
+            LOG.info(_("rbd import-diff failed - %s") % (str(exc)))
+            raise exception.BackupRBDOperationFailed("rbd import-diff failed")
+
+    def _rbd_image_exists(self, name, volume_id, client,
+                          try_diff_format=False):
+        """Return tuple (exists, name)."""
+        rbds = self.rbd.RBD().list(client.ioctx)
+        if name not in rbds:
+            msg = _("image '%s' not found - trying diff format name") % (name)
+            LOG.debug(msg)
+            if try_diff_format:
+                name = self._get_backup_base_name(volume_id, diff_format=True)
+                if name not in rbds:
+                    msg = _("diff format image '%s' not found") % (name)
+                    LOG.debug(msg)
+                    return False, name
+            else:
+                return False, name
+
+        return True, name
+
+    def _snap_exists(self, base_name, snap_name, client):
+        """Return True if snapshot exists in base image."""
+        base_rbd = self.rbd.Image(client.ioctx, base_name)
+        try:
+            snaps = base_rbd.list_snaps()
+        finally:
+            base_rbd.close()
+
+        if snaps is None:
+            return False
+
+        for snap in snaps:
+            if snap['name'] == snap_name:
+                return True
+
+        return False
+
+    def _backup_rbd(self, backup_id, volume_id, volume_file, volume_name,
+                    length):
+        """Create a incremental backup from an RBD image."""
+        rbd_user = volume_file.rbd_user
+        rbd_pool = volume_file.rbd_pool
+        rbd_conf = volume_file.rbd_conf
+        source_rbd_image = volume_file.rbd_image
+
+        # Identify our --from-snap point (if one exists)
+        from_snap = self._get_most_recent_snap(source_rbd_image)
+        LOG.debug(_("using --from-snap '%s'") % from_snap)
+
+        backup_name = self._get_backup_base_name(volume_id, diff_format=True)
+        image_created = False
+        force_full_backup = False
+        with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+            # If from_snap does not exist in the destination, this implies a
+            # previous backup has failed. In this case we will force a full
+            # backup.
+            #
+            # TODO(dosaboy): find a way to repair the broken backup
+            #
+            if backup_name not in self.rbd.RBD().list(ioctx=client.ioctx):
+                # If a from_snap is defined then we cannot proceed (see above)
+                if from_snap is not None:
+                    force_full_backup = True
+
+                # Create new base image
+                self._create_base_image(backup_name, length, client)
+                image_created = True
+            else:
+                # If a from_snap is defined but does not exist in the back base
+                # then we cannot proceed (see above)
+                if not self._snap_exists(backup_name, from_snap, client):
+                    force_full_backup = True
+
+        if force_full_backup:
+            errmsg = (_("snap='%(snap)s' does not exist in base "
+                        "image='%(base)s' - aborting incremental backup") %
+                      {'snap': from_snap, 'base': backup_name})
+            LOG.info(errmsg)
+            # Raise this exception so that caller can try another
+            # approach
+            raise exception.BackupRBDOperationFailed(errmsg)
+
+        # Snapshot source volume so that we have a new point-in-time
+        new_snap = self._get_new_snap_name(backup_id)
+        LOG.debug(_("creating backup snapshot='%s'") % (new_snap))
+        source_rbd_image.create_snap(new_snap)
+
+        # Attempt differential backup. If this fails, perhaps because librbd
+        # or Ceph cluster version does not support it, do a full backup
+        # instead.
+        #
+        # TODO(dosaboy): find a way to determine if the operation is supported
+        #                rather than brute force approach.
+        try:
+            before = time.time()
+            self._rbd_diff_transfer(volume_name, rbd_pool, backup_name,
+                                    self._ceph_backup_pool,
+                                    src_user=rbd_user,
+                                    src_conf=rbd_conf,
+                                    dest_user=self._ceph_backup_user,
+                                    dest_conf=self._ceph_backup_conf,
+                                    src_snap=new_snap,
+                                    from_snap=from_snap)
+
+            LOG.debug(_("differential backup transfer completed in %.4fs") %
+                      (time.time() - before))
+
+            # We don't need the previous snapshot (if there was one) anymore so
+            # delete it.
+            if from_snap:
+                source_rbd_image.remove_snap(from_snap)
+
+        except exception.BackupRBDOperationFailed:
+            LOG.debug(_("differential backup transfer failed"))
+
+            # Clean up if image was created as part of this operation
+            if image_created:
+                self._try_delete_base_image(backup_id, volume_id,
+                                            base_name=backup_name)
+
+            # Delete snapshot
+            LOG.debug(_("deleting backup snapshot='%s'") % (new_snap))
+            source_rbd_image.remove_snap(new_snap)
+
+            # Re-raise the exception so that caller can try another approach
+            raise
+
+    def _file_is_rbd(self, volume_file):
+        """Returns True if the volume_file is actually an RBD image."""
+        return hasattr(volume_file, 'rbd_image')
+
+    def _full_backup(self, backup_id, volume_id, src_volume, src_name, length):
+        """Perform a full backup of src volume.
+
+        First creates a base backup image in our backup location then performs
+        an chunked copy of all data from source volume to a new backup rbd
+        image.
+        """
+        backup_name = self._get_backup_base_name(volume_id, backup_id)
+
+        with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+            # First create base backup image
+            old_format, features = self._get_rbd_support()
+            LOG.debug(_("creating base image='%s'") % (backup_name))
             self.rbd.RBD().create(ioctx=client.ioctx,
                                   name=backup_name,
-                                  size=backup_size,
+                                  size=length,
                                   old_format=old_format,
                                   features=features,
                                   stripe_unit=self.rbd_stripe_unit,
                                   stripe_count=self.rbd_stripe_count)
 
+            LOG.debug(_("copying data"))
             dest_rbd = self.rbd.Image(client.ioctx, backup_name)
             try:
-                self._transfer_data(volume_file, dest_rbd, backup_name,
-                                    backup_size, dest_is_rbd=True)
+                rbd_meta = drivers.rbd.RBDImageMetadata(dest_rbd,
+                                                        self._ceph_backup_pool,
+                                                        self._ceph_backup_user,
+                                                        self._ceph_backup_conf)
+                rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta)
+                self._transfer_data(src_volume, src_name, rbd_fd, backup_name,
+                                    length)
             finally:
                 dest_rbd.close()
 
+    @staticmethod
+    def backup_snapshot_name_pattern():
+        """Returns the pattern used to match backup snapshots.
+
+        It is essential that snapshots created for purposes other than backups
+        do not have this name format.
+        """
+        return r"^backup\.([a-z0-9\-]+?)\.snap\.(.+)$"
+
+    @classmethod
+    def get_backup_snaps(cls, rbd_image, sort=False):
+        """Get all backup snapshots for the given rbd image.
+
+        NOTE: this call is made public since these snapshots must be deleted
+              before the base volume can be deleted.
+        """
+        snaps = rbd_image.list_snaps()
+
+        backup_snaps = []
+        for snap in snaps:
+            search_key = cls.backup_snapshot_name_pattern()
+            result = re.search(search_key, snap['name'])
+            if result:
+                backup_snaps.append({'name': result.group(0),
+                                     'backup_id': result.group(1),
+                                     'timestamp': result.group(2)})
+
+        if sort:
+            # Sort into ascending order of timestamp
+            backup_snaps.sort(key=lambda x: x['timestamp'], reverse=True)
+
+        return backup_snaps
+
+    def _get_new_snap_name(self, backup_id):
+        return str("backup.%s.snap.%s" % (backup_id, time.time()))
+
+    def _get_backup_snap_name(self, rbd_image, name, backup_id):
+        """Return the name of the snapshot associated with backup_id.
+
+        The rbd image provided must be the base image used for an incremental
+        backup.
+
+        A back is only allowed to have one associated snapshot. If more than
+        one is found, exception.BackupOperationError is raised.
+        """
+        snaps = self.get_backup_snaps(rbd_image)
+
+        LOG.debug(_("looking for snapshot of backup base '%s'") % (name))
+
+        if not snaps:
+            LOG.debug(_("backup base '%s' has no snapshots") % (name))
+            return None
+
+        snaps = [snap['name'] for snap in snaps
+                 if snap['backup_id'] == backup_id]
+
+        if not snaps:
+            LOG.debug(_("backup '%s' has no snapshot") % (backup_id))
+            return None
+
+        if len(snaps) > 1:
+            msg = (_("backup should only have one snapshot but instead has %s")
+                   % (len(snaps)))
+            LOG.error(msg)
+            raise exception.BackupOperationError(msg)
+
+        LOG.debug(_("found snapshot '%s'") % (snaps[0]))
+        return snaps[0]
+
+    def _get_most_recent_snap(self, rbd_image):
+        """Get the most recent backup snapshot of the provided image.
+
+        Returns name of most recent backup snapshot or None if there are no
+        backup snapshot.
+        """
+        backup_snaps = self.get_backup_snaps(rbd_image, sort=True)
+        if not backup_snaps:
+            return None
+
+        return backup_snaps[0]['name']
+
+    def _get_volume_size_gb(self, volume):
+        """Return the size in gigabytes of the given volume.
+
+        Raises exception.InvalidParameterValue if voluem size is 0.
+        """
+        if int(volume['size']) == 0:
+            raise exception.InvalidParameterValue("need non-zero volume size")
+
+        return int(volume['size']) * units.GiB
+
     def backup(self, backup, volume_file):
-        """Backup the given volume to Ceph object store"""
+        """Backup the given volume to Ceph object store.
+
+        If the source volume is an RBD we will attempt to do an
+        incremental/differential backup, otherwise a full copy is performed.
+        If this fails we will attempt to fall back to full copy.
+        """
         backup_id = backup['id']
         volume = self.db.volume_get(self.context, backup['volume_id'])
-        backup_name = self._get_backup_base_name(volume['id'], backup_id)
+        volume_id = volume['id']
+        volume_name = volume['name']
 
-        LOG.debug("Starting backup of volume='%s' to rbd='%s'" %
-                  (volume['name'], backup_name))
+        LOG.debug(_("Starting backup of volume='%s'") % volume_name)
 
-        if int(volume['size']) == 0:
-            raise exception.InvalidParameterValue("need non-zero volume size")
-        else:
-            backup_size = int(volume['size']) * units.GiB
+        # Ensure we are at the beginning of the volume
+        volume_file.seek(0)
+        length = self._get_volume_size_gb(volume)
 
-        if volume_file:
-            self._backup_volume_from_file(backup_name, backup_size,
-                                          volume_file)
+        do_full_backup = False
+        if self._file_is_rbd(volume_file):
+            # If volume an RBD, attempt incremental backup.
+            try:
+                self._backup_rbd(backup_id, volume_id, volume_file,
+                                 volume_name, length)
+            except exception.BackupRBDOperationFailed:
+                LOG.debug(_("forcing full backup"))
+                do_full_backup = True
         else:
-            errmsg = ("No volume_file was provided so I cannot do requested "
-                      "backup (id=%s)" % (backup_id))
-            raise exception.BackupVolumeInvalidType(errmsg)
+            do_full_backup = True
 
-        self.db.backup_update(self.context, backup['id'],
-                              {'container': self._ceph_pool})
+        if do_full_backup:
+            self._full_backup(backup_id, volume_id, volume_file,
+                              volume_name, length)
 
-        LOG.debug(_("backup '%s' finished.") % (backup_id))
+        self.db.backup_update(self.context, backup_id,
+                              {'container': self._ceph_backup_pool})
 
-    def restore(self, backup, volume_id, volume_file):
-        """Restore the given volume backup from Ceph object store"""
-        volume = self.db.volume_get(self.context, volume_id)
-        backup_name = self._get_backup_base_name(backup['volume_id'],
-                                                 backup['id'])
+        LOG.debug(_("backup '%s' finished.") % (backup_id))
 
-        LOG.debug('starting backup restore from Ceph backup=%s '
-                  'to volume=%s' % (backup['id'], volume['name']))
+    def _full_restore(self, backup_id, volume_id, dest_file, dest_name,
+                      length, src_snap=None):
+        """Restore the given volume file from backup RBD.
 
-        # Ensure we are at the beginning of the volume
-        volume_file.seek(0)
+        This will result in all extents being copied from source to destination
+        """
+        with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
 
-        backup_size = int(volume['size']) * units.GiB
+            if src_snap:
+                # If a source snapshot is provided we assume the base is diff
+                # format.
+                backup_name = self._get_backup_base_name(volume_id,
+                                                         diff_format=True)
+            else:
+                backup_name = self._get_backup_base_name(volume_id, backup_id)
 
-        with rbddriver.RADOSClient(self, self._ceph_pool) as client:
-            src_rbd = self.rbd.Image(client.ioctx, backup_name)
+            # Retrieve backup volume
+            src_rbd = self.rbd.Image(client.ioctx, backup_name,
+                                     snapshot=src_snap)
             try:
-                self._transfer_data(src_rbd, volume_file, volume['name'],
-                                    backup_size)
+                rbd_meta = drivers.rbd.RBDImageMetadata(src_rbd,
+                                                        self._ceph_backup_pool,
+                                                        self._ceph_backup_user,
+                                                        self._ceph_backup_conf)
+                rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta)
+                self._transfer_data(rbd_fd, backup_name, dest_file, dest_name,
+                                    length)
             finally:
                 src_rbd.close()
 
-        # Be tolerant to IO implementations that do not support fileno()
+    def _restore_rbd(self, base_name, volume_file, volume_name, restore_point):
+        """Restore RBD volume from RBD image."""
+        rbd_user = volume_file.rbd_user
+        rbd_pool = volume_file.rbd_pool
+        rbd_conf = volume_file.rbd_conf
+
+        LOG.debug(_("trying incremental restore from base='%(base)s' "
+                    "snap='%(snap)s'") %
+                  {'base': base_name, 'snap': restore_point})
+        before = time.time()
         try:
-            fileno = volume_file.fileno()
-        except IOError:
-            LOG.info("volume_file does not support fileno() so skipping "
-                     "fsync()")
+            self._rbd_diff_transfer(base_name, self._ceph_backup_pool,
+                                    volume_name, rbd_pool,
+                                    src_user=self._ceph_backup_user,
+                                    src_conf=self._ceph_backup_conf,
+                                    dest_user=rbd_user, dest_conf=rbd_conf,
+                                    src_snap=restore_point)
+        except exception.BackupRBDOperationFailed:
+            LOG.exception(_("differential restore failed, trying full "
+                            "restore"))
+            raise
+
+        LOG.debug(_("restore transfer completed in %.4fs") %
+                  (time.time() - before))
+
+    def _num_backup_snaps(self, backup_base_name):
+        """Return the number of snapshots that exist on the base image."""
+        with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+            base_rbd = self.rbd.Image(client.ioctx, backup_base_name)
+            try:
+                snaps = self.get_backup_snaps(base_rbd)
+            finally:
+                base_rbd.close()
+
+        if snaps:
+            return len(snaps)
         else:
-            os.fsync(fileno)
+            return 0
 
-        LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id))
+    def _get_restore_point(self, base_name, backup_id):
+        """Get restore point snapshot name for incremental backup.
 
-    def delete(self, backup):
-        """Delete the given backup from Ceph object store"""
+        If the backup was not incremental None is returned.
+        """
+        with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+            base_rbd = self.rbd.Image(client.ioctx, base_name)
+            try:
+                restore_point = self._get_backup_snap_name(base_rbd, base_name,
+                                                           backup_id)
+            finally:
+                base_rbd.close()
+
+        return restore_point
+
+    def _rbd_has_extents(self, rbd_volume):
+        """Check whether the given rbd volume has extents.
+
+        Return True if has extents, otherwise False.
+        """
+        extents = []
+
+        def iter_cb(offset, length, exists):
+            if exists:
+                extents.append(length)
+
+        rbd_volume.diff_iterate(0, rbd_volume.size(), None, iter_cb)
+
+        if extents:
+            LOG.debug("rbd has %s extents" % (sum(extents)))
+            return True
+
+        return False
+
+    def _diff_restore_allowed(self, base_name, backup, volume, volume_file,
+                              rados_client):
+        """Determine whether a differential restore is possible/allowed.
+
+        In order for a differential restore to be performed we need:
+            * destination volume must be RBD
+            * destination volume must have zero extents
+            * backup base image must exist
+            * backup must have a restore point
+
+        Returns True if differential restore is allowed, False otherwise.
+        """
+        not_allowed = (False, None)
+
+        # If the volume we are restoring to is the volume the backup was made
+        # from, force a full restore since a diff will not work in this case.
+        if volume['id'] == backup['volume_id']:
+            LOG.debug("dest volume is original volume - forcing full copy")
+            return not_allowed
+
+        if self._file_is_rbd(volume_file):
+            rbd_exists, base_name = self._rbd_image_exists(base_name,
+                                                           backup['volume_id'],
+                                                           rados_client)
+
+            if not rbd_exists:
+                return not_allowed
+
+            # Get the restore point. If no restore point is found, we assume
+            # that the backup was not performed using diff/incremental methods
+            # so we enforce full copy.
+            restore_point = self._get_restore_point(base_name, backup['id'])
+            if restore_point:
+                # If the destination volume has extents we cannot allow a diff
+                # restore.
+                if self._rbd_has_extents(volume_file.rbd_image):
+                    # We return the restore point so that a full copy is done
+                    # from snapshot.
+                    LOG.debug("destination has extents - forcing full copy")
+                    return False, restore_point
+
+                return True, restore_point
+            else:
+                LOG.info(_("no restore point found for backup='%s', forcing "
+                           "full copy") % (backup['id']))
+
+        return not_allowed
+
+    def _try_restore(self, backup, volume, volume_file):
+        """Attempt to restore volume from backup."""
+        volume_name = volume['name']
         backup_id = backup['id']
-        backup_name = self._get_backup_base_name(backup['volume_id'],
-                                                 backup_id)
+        backup_volume_id = backup['volume_id']
+        length = int(volume['size']) * units.GiB
 
-        LOG.debug('delete started for backup=%s', backup['id'])
+        base_name = self._get_backup_base_name(backup['volume_id'],
+                                               diff_format=True)
+
+        with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
+            diff_restore, restore_point = \
+                self._diff_restore_allowed(base_name, backup, volume,
+                                           volume_file, client)
+
+        if diff_restore:
+            try:
+                do_full_restore = False
+                self._restore_rbd(base_name, volume_file, volume_name,
+                                  restore_point)
+            except exception.BackupRBDOperationFailed:
+                LOG.debug(_("forcing full restore"))
+                do_full_restore = True
+        else:
+            do_full_restore = True
+
+        if do_full_restore:
+            # Otherwise full copy
+            self._full_restore(backup_id, backup_volume_id, volume_file,
+                               volume_name, length, src_snap=restore_point)
+
+    def restore(self, backup, volume_id, volume_file):
+        """Restore the given volume backup from Ceph object store."""
+        target_volume = self.db.volume_get(self.context, volume_id)
+        LOG.debug(_('starting restore from Ceph backup=%(src)s to '
+                    'volume=%(dest)s') %
+                  {'src': backup['id'], 'dest': target_volume['name']})
+
+        # Ensure we are at the beginning of the volume
+        volume_file.seek(0)
+
+        try:
+            self._try_restore(backup, target_volume, volume_file)
+
+            # Be tolerant to IO implementations that do not support fileno()
+            try:
+                fileno = volume_file.fileno()
+            except IOError:
+                LOG.info(_("volume_file does not support fileno() so skipping "
+                           "fsync()"))
+            else:
+                os.fsync(fileno)
+
+            LOG.debug(_('restore finished.'))
+        except exception.BackupOperationError as e:
+            LOG.error(_('restore finished with error - %s') % (e))
+            raise
+
+    def delete(self, backup):
+        """Delete the given backup from Ceph object store."""
+        backup_id = backup['id']
+        LOG.debug(_('delete started for backup=%s') % backup['id'])
 
         try:
-            with rbddriver.RADOSClient(self) as client:
-                self.rbd.RBD().remove(client.ioctx, backup_name)
+            self._try_delete_base_image(backup['id'], backup['volume_id'])
         except self.rbd.ImageNotFound:
-            LOG.warning("rbd image '%s' not found but continuing anyway so "
-                        "that db entry can be removed" % (backup_name))
+            msg = _("rbd image not found but continuing anyway so "
+                    "that db entry can be removed")
+            LOG.warning(msg)
+            LOG.info(_("delete '%s' finished with warning") % (backup_id))
 
         LOG.debug(_("delete '%s' finished") % (backup_id))
 
index ac0cc157a30916598b5a32abde1ef92bd5148c39..da35a30b21216bbc7997304ba9200057c8139483 100644 (file)
@@ -564,6 +564,18 @@ class ImageCopyFailure(Invalid):
     message = _("Failed to copy image to volume: %(reason)s")
 
 
+class BackupInvalidCephArgs(Invalid):
+    message = _("Invalid Ceph args provided for backup rbd operation")
+
+
+class BackupOperationError(Invalid):
+    message = _("An error has occurred during backup operation")
+
+
+class BackupRBDOperationFailed(Invalid):
+    message = _("Backup RBD operation failed")
+
+
 class BackupVolumeInvalidType(Invalid):
     message = _("Backup volume %(volume_id)s type not recognised.")
 
index 2169cc05fd7ad3db12a34f5d448c8ee8d7ec14bd..b9d3cd6a6957099dcb3ef9642759efe3848d5616 100644 (file)
@@ -16,7 +16,7 @@
 
 class mock_rados(object):
 
-    class mock_ioctx(object):
+    class ioctx(object):
         def __init__(self, *args, **kwargs):
             pass
 
@@ -32,7 +32,7 @@ class mock_rados(object):
             pass
 
         def open_ioctx(self, *args, **kwargs):
-            return mock_rados.mock_ioctx()
+            return mock_rados.ioctx()
 
         def shutdown(self, *args, **kwargs):
             pass
@@ -44,23 +44,43 @@ class mock_rados(object):
 
 class mock_rbd(object):
 
+    class ImageBusy(Exception):
+        def __init__(self, *args, **kwargs):
+            pass
+
+    class ImageNotFound(Exception):
+        def __init__(self, *args, **kwargs):
+            pass
+
     class Image(object):
 
         def __init__(self, *args, **kwargs):
             pass
 
-        def read(self, *args, **kwargs):
+        def create_snap(self, *args, **kwargs):
             pass
 
-        def write(self, *args, **kwargs):
+        def remove_snap(self, *args, **kwargs):
             pass
 
+        def read(self, *args, **kwargs):
+            raise NotImplementedError()
+
+        def write(self, *args, **kwargs):
+            raise NotImplementedError()
+
         def resize(self, *args, **kwargs):
-            pass
+            raise NotImplementedError()
 
-        def close(self, *args, **kwargs):
+        def close(self):
             pass
 
+        def list_snaps(self):
+            raise NotImplementedError()
+
+        def size(self):
+            raise NotImplementedError()
+
     class RBD(object):
 
         def __init__(self, *args, **kwargs):
@@ -72,6 +92,5 @@ class mock_rbd(object):
         def remove(self, *args, **kwargs):
             pass
 
-    class ImageNotFound(Exception):
-        def __init__(self, *args, **kwargs):
-            pass
+        def list(self, *args, **kwargs):
+            raise NotImplementedError()
index a25fccde74459adb364d1933c18dfac5f2a5dece..2dd69ace113354ae315284466b3f6badf5326a24 100644 (file)
 #    under the License.
 """ Tests for Ceph backup service """
 
+import eventlet
 import hashlib
 import os
 import tempfile
+import time
 import uuid
 
-from cinder.backup.drivers.ceph import CephBackupDriver
-from cinder.tests.backup.fake_rados import mock_rados
-from cinder.tests.backup.fake_rados import mock_rbd
-
 from cinder.backup.drivers import ceph
 from cinder import context
 from cinder import db
 from cinder import exception
 from cinder.openstack.common import log as logging
 from cinder import test
+from cinder.tests.backup.fake_rados import mock_rados
+from cinder.tests.backup.fake_rados import mock_rbd
+from cinder.volume.drivers import rbd as rbddriver
 
 LOG = logging.getLogger(__name__)
 
@@ -44,18 +45,30 @@ class BackupCephTestCase(test.TestCase):
         backup = {'id': backupid, 'size': size, 'volume_id': volid}
         return db.backup_create(self.ctxt, backup)['id']
 
+    def fake_execute_w_exception(*args, **kwargs):
+        raise exception.ProcessExecutionError()
+
+    def time_inc(self):
+        self.counter += 1
+        return self.counter
+
+    def _get_wrapped_rbd_io(self, rbd_image):
+        rbd_meta = rbddriver.RBDImageMetadata(rbd_image, 'pool_foo',
+                                              'user_foo', 'conf_foo')
+        return rbddriver.RBDImageIOWrapper(rbd_meta)
+
     def setUp(self):
         super(BackupCephTestCase, self).setUp()
         self.ctxt = context.get_admin_context()
 
-        self.vol_id = str(uuid.uuid4())
+        self.volume_id = str(uuid.uuid4())
         self.backup_id = str(uuid.uuid4())
 
         # Setup librbd stubs
         self.stubs.Set(ceph, 'rados', mock_rados)
         self.stubs.Set(ceph, 'rbd', mock_rbd)
 
-        self._create_backup_db_entry(self.backup_id, self.vol_id, 1)
+        self._create_backup_db_entry(self.backup_id, self.volume_id, 1)
 
         self.chunk_size = 1024
         self.num_chunks = 128
@@ -72,30 +85,108 @@ class BackupCephTestCase(test.TestCase):
 
         self.volume_file.seek(0)
 
-    def test_get_rbd_support(self):
-        service = CephBackupDriver(self.ctxt)
+        # Always trigger an exception if a command is executed since it should
+        # always be dealt with gracefully. At time of writing on rbd
+        # export/import-diff is executed and if they fail we expect to find
+        # alternative means of backing up.
+        fake_exec = self.fake_execute_w_exception
+        self.service = ceph.CephBackupDriver(self.ctxt, execute=fake_exec)
+
+        # Ensure that time.time() always returns more than the last time it was
+        # called to avoid div by zero errors.
+        self.counter = float(0)
+        self.stubs.Set(time, 'time', self.time_inc)
+        self.stubs.Set(eventlet, 'sleep', lambda *args: None)
 
-        self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING'))
-        self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2'))
+    def test_get_rbd_support(self):
+        self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
+        self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
 
-        oldformat, features = service._get_rbd_support()
+        oldformat, features = self.service._get_rbd_support()
         self.assertTrue(oldformat)
         self.assertEquals(features, 0)
 
-        service.rbd.RBD_FEATURE_LAYERING = 1
+        self.service.rbd.RBD_FEATURE_LAYERING = 1
 
-        oldformat, features = service._get_rbd_support()
+        oldformat, features = self.service._get_rbd_support()
         self.assertFalse(oldformat)
         self.assertEquals(features, 1)
 
-        service.rbd.RBD_FEATURE_STRIPINGV2 = 2
+        self.service.rbd.RBD_FEATURE_STRIPINGV2 = 2
 
-        oldformat, features = service._get_rbd_support()
+        oldformat, features = self.service._get_rbd_support()
         self.assertFalse(oldformat)
         self.assertEquals(features, 1 | 2)
 
-    def test_tranfer_data_from_rbd(self):
-        service = CephBackupDriver(self.ctxt)
+    def _set_common_backup_stubs(self, service):
+        self.stubs.Set(self.service, '_get_rbd_support', lambda: (True, 3))
+        self.stubs.Set(self.service, 'get_backup_snaps',
+                       lambda *args, **kwargs: None)
+
+        def rbd_size(inst):
+            return self.chunk_size * self.num_chunks
+
+        self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
+
+    def _set_common_restore_stubs(self, service):
+        self._set_common_backup_stubs(self.service)
+
+        def rbd_size(inst):
+            return self.chunk_size * self.num_chunks
+
+        self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
+
+    def test_get_most_recent_snap(self):
+        last = 'backup.%s.snap.9824923.1212' % (uuid.uuid4())
+
+        def list_snaps(inst, *args):
+            return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())},
+                    {'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())},
+                    {'name': last},
+                    {'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}]
+
+        self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
+
+        snap = self.service._get_most_recent_snap(self.service.rbd.Image())
+
+        self.assertEquals(last, snap)
+
+    def test_get_backup_snap_name(self):
+        snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
+
+        def mock_get_backup_snaps(inst, *args):
+            return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4()),
+                     'backup_id': str(uuid.uuid4())},
+                    {'name': snap_name,
+                     'backup_id': self.backup_id}]
+
+        self.stubs.Set(self.service, 'get_backup_snaps', lambda *args: None)
+        name = self.service._get_backup_snap_name(self.service.rbd.Image(),
+                                                  'base_foo',
+                                                  self.backup_id)
+        self.assertIsNone(name)
+
+        self.stubs.Set(self.service, 'get_backup_snaps', mock_get_backup_snaps)
+        name = self.service._get_backup_snap_name(self.service.rbd.Image(),
+                                                  'base_foo',
+                                                  self.backup_id)
+        self.assertEquals(name, snap_name)
+
+    def test_get_backup_snaps(self):
+
+        def list_snaps(inst, *args):
+            return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())},
+                    {'name': 'backup.%s.wambam.6423868.2342' % (uuid.uuid4())},
+                    {'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())},
+                    {'name': 'bbbackup.%s.snap.1321319.3235' % (uuid.uuid4())},
+                    {'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}]
+
+        self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
+        snaps = self.service.get_backup_snaps(self.service.rbd.Image())
+        self.assertTrue(len(snaps) == 3)
+
+    def test_transfer_data_from_rbd_to_file(self):
+        self._set_common_backup_stubs(self.service)
 
         with tempfile.NamedTemporaryFile() as test_file:
             self.volume_file.seek(0)
@@ -103,10 +194,11 @@ class BackupCephTestCase(test.TestCase):
             def read_data(inst, offset, length):
                 return self.volume_file.read(self.length)
 
-            self.stubs.Set(service.rbd.Image, 'read', read_data)
+            self.stubs.Set(self.service.rbd.Image, 'read', read_data)
 
-            service._transfer_data(service.rbd.Image(), test_file, 'foo',
-                                   self.length)
+            rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+            self.service._transfer_data(rbd_io, 'src_foo', test_file,
+                                        'dest_foo', self.length)
 
             checksum = hashlib.sha256()
             test_file.seek(0)
@@ -116,64 +208,118 @@ class BackupCephTestCase(test.TestCase):
             # Ensure the files are equal
             self.assertEquals(checksum.digest(), self.checksum.digest())
 
-    def test_tranfer_data_to_rbd(self):
-        service = CephBackupDriver(self.ctxt)
+    def test_transfer_data_from_rbd_to_rbd(self):
+        def rbd_size(inst):
+            return self.chunk_size * self.num_chunks
 
         with tempfile.NamedTemporaryFile() as test_file:
+            self.volume_file.seek(0)
             checksum = hashlib.sha256()
 
+            def read_data(inst, offset, length):
+                return self.volume_file.read(self.length)
+
             def write_data(inst, data, offset):
                 checksum.update(data)
                 test_file.write(data)
 
-            self.stubs.Set(service.rbd.Image, 'write', write_data)
+            self.stubs.Set(self.service.rbd.Image, 'read', read_data)
+            self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
+            self.stubs.Set(self.service.rbd.Image, 'write', write_data)
 
-            service._transfer_data(self.volume_file, service.rbd.Image(),
-                                   'foo', self.length, dest_is_rbd=True)
+            rbd1 = self.service.rbd.Image()
+            rbd2 = self.service.rbd.Image()
+
+            src_rbd_io = self._get_wrapped_rbd_io(rbd1)
+            dest_rbd_io = self._get_wrapped_rbd_io(rbd2)
+            self.service._transfer_data(src_rbd_io, 'src_foo', dest_rbd_io,
+                                        'dest_foo', self.length)
 
             # Ensure the files are equal
             self.assertEquals(checksum.digest(), self.checksum.digest())
 
-    def test_backup_volume_from_file(self):
-        service = CephBackupDriver(self.ctxt)
+    def test_transfer_data_from_file_to_rbd(self):
+        self._set_common_backup_stubs(self.service)
 
         with tempfile.NamedTemporaryFile() as test_file:
+            self.volume_file.seek(0)
             checksum = hashlib.sha256()
 
             def write_data(inst, data, offset):
                 checksum.update(data)
                 test_file.write(data)
 
-            self.stubs.Set(service.rbd.Image, 'write', write_data)
+            self.stubs.Set(self.service.rbd.Image, 'write', write_data)
 
-            service._backup_volume_from_file('foo', self.length,
-                                             self.volume_file)
+            rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+            self.service._transfer_data(self.volume_file, 'src_foo',
+                                        rbd_io, 'dest_foo', self.length)
 
             # Ensure the files are equal
             self.assertEquals(checksum.digest(), self.checksum.digest())
 
-    def tearDown(self):
-        self.volume_file.close()
-        super(BackupCephTestCase, self).tearDown()
+    def test_transfer_data_from_file_to_file(self):
+        self._set_common_backup_stubs(self.service)
 
-    def test_backup_error1(self):
-        service = CephBackupDriver(self.ctxt)
-        backup = db.backup_get(self.ctxt, self.backup_id)
-        self._create_volume_db_entry(self.vol_id, 0)
-        self.assertRaises(exception.InvalidParameterValue, service.backup,
-                          backup, self.volume_file)
+        with tempfile.NamedTemporaryFile() as test_file:
+            self.volume_file.seek(0)
+            checksum = hashlib.sha256()
+
+            self.service._transfer_data(self.volume_file, 'src_foo', test_file,
+                                        'dest_foo', self.length)
+
+            checksum = hashlib.sha256()
+            test_file.seek(0)
+            for c in xrange(0, self.num_chunks):
+                checksum.update(test_file.read(self.chunk_size))
 
-    def test_backup_error2(self):
-        service = CephBackupDriver(self.ctxt)
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_backup_volume_from_file(self):
+        self._create_volume_db_entry(self.volume_id, 1)
         backup = db.backup_get(self.ctxt, self.backup_id)
-        self._create_volume_db_entry(self.vol_id, 1)
-        self.assertRaises(exception.BackupVolumeInvalidType, service.backup,
-                          backup, None)
 
-    def test_backup_good(self):
-        service = CephBackupDriver(self.ctxt)
+        self._set_common_backup_stubs(self.service)
+
+        with tempfile.NamedTemporaryFile() as test_file:
+            checksum = hashlib.sha256()
+
+            def write_data(inst, data, offset):
+                checksum.update(data)
+                test_file.write(data)
+
+            self.stubs.Set(self.service.rbd.Image, 'write', write_data)
+
+            self.service.backup(backup, self.volume_file)
+
+            # Ensure the files are equal
+            self.assertEquals(checksum.digest(), self.checksum.digest())
+
+    def test_get_backup_base_name(self):
+        name = self.service._get_backup_base_name(self.volume_id,
+                                                  diff_format=True)
+        self.assertEquals(name, "volume-%s.backup.base" % (self.volume_id))
+
+        self.assertRaises(exception.InvalidParameterValue,
+                          self.service._get_backup_base_name,
+                          self.volume_id)
+
+        name = self.service._get_backup_base_name(self.volume_id, '1234')
+        self.assertEquals(name, "volume-%s.backup.%s" %
+                          (self.volume_id, '1234'))
+
+    def test_backup_volume_from_rbd(self):
+        self._create_volume_db_entry(self.volume_id, 1)
         backup = db.backup_get(self.ctxt, self.backup_id)
-        self._create_volume_db_entry(self.vol_id, 1)
+
+        self._set_common_backup_stubs(self.service)
+
+        backup_name = self.service._get_backup_base_name(self.backup_id,
+                                                         diff_format=True)
+
+        self.stubs.Set(self.service, '_try_delete_base_image',
+                       lambda *args, **kwargs: None)
 
         with tempfile.NamedTemporaryFile() as test_file:
             checksum = hashlib.sha256()
@@ -182,27 +328,57 @@ class BackupCephTestCase(test.TestCase):
                 checksum.update(data)
                 test_file.write(data)
 
-            self.stubs.Set(service.rbd.Image, 'write', write_data)
+            def read_data(inst, offset, length):
+                return self.volume_file.read(self.length)
+
+            def rbd_list(inst, ioctx):
+                return [backup_name]
+
+            self.stubs.Set(self.service.rbd.Image, 'read', read_data)
+            self.stubs.Set(self.service.rbd.Image, 'write', write_data)
+            self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
+            meta = rbddriver.RBDImageMetadata(self.service.rbd.Image(),
+                                              'pool_foo', 'user_foo',
+                                              'conf_foo')
+            rbd_io = rbddriver.RBDImageIOWrapper(meta)
 
-            service.backup(backup, self.volume_file)
+            self.service.backup(backup, rbd_io)
 
             # Ensure the files are equal
             self.assertEquals(checksum.digest(), self.checksum.digest())
 
+    def test_backup_vol_length_0(self):
+        self._set_common_backup_stubs(self.service)
+
+        backup = db.backup_get(self.ctxt, self.backup_id)
+        self._create_volume_db_entry(self.volume_id, 0)
+        self.assertRaises(exception.InvalidParameterValue, self.service.backup,
+                          backup, self.volume_file)
+
     def test_restore(self):
-        service = CephBackupDriver(self.ctxt)
-        self._create_volume_db_entry(self.vol_id, 1)
+        self._create_volume_db_entry(self.volume_id, 1)
         backup = db.backup_get(self.ctxt, self.backup_id)
 
+        self._set_common_restore_stubs(self.service)
+
+        backup_name = self.service._get_backup_base_name(self.backup_id,
+                                                         diff_format=True)
+
+        def rbd_list(inst, ioctx):
+            return [backup_name]
+
+        self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
         with tempfile.NamedTemporaryFile() as test_file:
             self.volume_file.seek(0)
 
             def read_data(inst, offset, length):
                 return self.volume_file.read(self.length)
 
-            self.stubs.Set(service.rbd.Image, 'read', read_data)
+            self.stubs.Set(self.service.rbd.Image, 'read', read_data)
 
-            service.restore(backup, self.vol_id, test_file)
+            self.service.restore(backup, self.volume_id, test_file)
 
             checksum = hashlib.sha256()
             test_file.seek(0)
@@ -212,17 +388,183 @@ class BackupCephTestCase(test.TestCase):
             # Ensure the files are equal
             self.assertEquals(checksum.digest(), self.checksum.digest())
 
-    def test_delete(self):
-        service = CephBackupDriver(self.ctxt)
-        self._create_volume_db_entry(self.vol_id, 1)
+    def test_create_base_image_if_not_exists(self):
+        pass
+
+    def test_delete_backup_snapshots(self):
+        snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
+        base_name = self.service._get_backup_base_name(self.volume_id,
+                                                       diff_format=True)
+
+        self.stubs.Set(self.service, '_get_backup_snap_name',
+                       lambda *args: snap_name)
+
+        self.stubs.Set(self.service, 'get_backup_snaps',
+                       lambda *args: None)
+
+        rem = self.service._delete_backup_snapshots(mock_rados(), base_name,
+                                                    self.backup_id)
+
+        self.assertEquals(rem, 0)
+
+    def test_try_delete_base_image_diff_format(self):
+        # don't create volume db entry since it should not be required
         backup = db.backup_get(self.ctxt, self.backup_id)
 
+        backup_name = self.service._get_backup_base_name(self.volume_id,
+                                                         diff_format=True)
+
+        snap_name = self.service._get_new_snap_name(self.backup_id)
+        snaps = [{'name': snap_name}]
+
+        def rbd_list(*args):
+            return [backup_name]
+
+        def list_snaps(*args):
+            return snaps
+
+        def remove_snap(*args):
+            snaps.pop()
+
+        self.stubs.Set(self.service.rbd.Image, 'remove_snap', remove_snap)
+        self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
+        self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
         # Must be something mutable
         remove_called = []
 
         def remove(inst, ioctx, name):
             remove_called.append(True)
 
-        self.stubs.Set(service.rbd.RBD, 'remove', remove)
-        service.delete(backup)
+        self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
+        self.service.delete(backup)
         self.assertTrue(remove_called[0])
+
+    def test_try_delete_base_image(self):
+        # don't create volume db entry since it should not be required
+        self._create_volume_db_entry(self.volume_id, 1)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+
+        backup_name = self.service._get_backup_base_name(self.volume_id,
+                                                         self.backup_id)
+
+        def rbd_list(inst, ioctx):
+            return [backup_name]
+
+        self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
+        # Must be something mutable
+        remove_called = []
+
+        self.stubs.Set(self.service, 'get_backup_snaps',
+                       lambda *args, **kwargs: None)
+
+        def remove(inst, ioctx, name):
+            remove_called.append(True)
+
+        self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
+        self.service.delete(backup)
+        self.assertTrue(remove_called[0])
+
+    def test_try_delete_base_image_busy(self):
+        """This should induce retries then raise rbd.ImageBusy."""
+        # don't create volume db entry since it should not be required
+        self._create_volume_db_entry(self.volume_id, 1)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+
+        backup_name = self.service._get_backup_base_name(self.volume_id,
+                                                         self.backup_id)
+
+        def rbd_list(inst, ioctx):
+            return [backup_name]
+
+        self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
+
+        # Must be something mutable
+        remove_called = []
+
+        self.stubs.Set(self.service, 'get_backup_snaps',
+                       lambda *args, **kwargs: None)
+
+        def remove(inst, ioctx, name):
+            raise self.service.rbd.ImageBusy("image busy")
+
+        self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
+
+        self.assertRaises(self.service.rbd.ImageBusy,
+                          self.service._try_delete_base_image,
+                          backup['id'], backup['volume_id'])
+
+    def test_delete(self):
+        backup = db.backup_get(self.ctxt, self.backup_id)
+
+        def del_base_image(*args):
+            pass
+
+        self.stubs.Set(self.service, '_try_delete_base_image',
+                       lambda *args: None)
+
+        self.service.delete(backup)
+
+    def test_delete_image_not_found(self):
+        backup = db.backup_get(self.ctxt, self.backup_id)
+
+        def del_base_image(*args):
+            raise self.service.rbd.ImageNotFound
+
+        self.stubs.Set(self.service, '_try_delete_base_image',
+                       lambda *args: None)
+
+        # ImageNotFound exception is caught so that db entry can be cleared
+        self.service.delete(backup)
+
+    def test_diff_restore_allowed_true(self):
+        is_allowed = (True, 'restore.foo')
+        backup = db.backup_get(self.ctxt, self.backup_id)
+        alt_volume_id = str(uuid.uuid4())
+        self._create_volume_db_entry(alt_volume_id, 1)
+        alt_volume = db.volume_get(self.ctxt, alt_volume_id)
+        rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+
+        self.stubs.Set(self.service, '_get_restore_point',
+                       lambda *args: 'restore.foo')
+
+        self.stubs.Set(self.service, '_rbd_has_extents',
+                       lambda *args: False)
+
+        self.stubs.Set(self.service, '_rbd_image_exists',
+                       lambda *args: (True, 'foo'))
+
+        self.stubs.Set(self.service, '_file_is_rbd', lambda *args: True)
+
+        resp = self.service._diff_restore_allowed('foo', backup, alt_volume,
+                                                  rbd_io, mock_rados())
+        self.assertEquals(resp, is_allowed)
+
+    def test_diff_restore_allowed_false(self):
+        not_allowed = (False, None)
+        backup = db.backup_get(self.ctxt, self.backup_id)
+        self._create_volume_db_entry(self.volume_id, 1)
+        original_volume = db.volume_get(self.ctxt, self.volume_id)
+        rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
+
+        self.stubs.Set(self.service, '_get_restore_point',
+                       lambda *args: None)
+
+        self.stubs.Set(self.service, '_rbd_has_extents',
+                       lambda *args: True)
+
+        self.stubs.Set(self.service, '_rbd_image_exists',
+                       lambda *args: (False, 'foo'))
+
+        self.stubs.Set(self.service, '_file_is_rbd', lambda *args: False)
+
+        resp = self.service._diff_restore_allowed('foo', backup,
+                                                  original_volume, rbd_io,
+                                                  mock_rados())
+        self.assertEquals(resp, not_allowed)
+
+    def tearDown(self):
+        self.volume_file.close()
+        self.stubs.UnsetAll()
+        super(BackupCephTestCase, self).tearDown()
index 024a046c2302495232605908e08bdefc0b7d8953..473172c51f9b527c917f87023d2089a078ee4725 100644 (file)
@@ -127,9 +127,13 @@ class RBDTestCase(test.TestCase):
         volume = dict(name=name)
         mock_client = self.mox.CreateMockAnything()
         self.mox.StubOutWithMock(driver, 'RADOSClient')
+        self.stubs.Set(self.driver, '_get_backup_snaps', lambda *args: None)
 
         driver.RADOSClient(self.driver).AndReturn(mock_client)
         mock_client.__enter__().AndReturn(mock_client)
+        mock_image = self.mox.CreateMockAnything()
+        self.rbd.Image(mox.IgnoreArg(), str(name)).AndReturn(mock_image)
+        mock_image.close()
         mock_rbd = self.mox.CreateMockAnything()
         self.rbd.RBD().AndReturn(mock_rbd)
         mock_rbd.remove(mox.IgnoreArg(), str(name))
index b5a8b0a71ad555f062d506b98b077b4bfedf8610..5fa180b8c98d47f320a66b0b97f63e501c0a2d0d 100644 (file)
@@ -24,17 +24,15 @@ import os
 import tempfile
 import urllib
 
-from oslo.config import cfg
-
+import cinder.backup.drivers.ceph as ceph_backup
 from cinder import exception
 from cinder.image import image_utils
-from cinder import units
-from cinder import utils
-
 from cinder.openstack.common import fileutils
 from cinder.openstack.common import log as logging
+from cinder import units
+from cinder import utils
 from cinder.volume import driver
-
+from oslo.config import cfg
 
 try:
     import rados
@@ -85,28 +83,53 @@ def ascii_str(string):
     return str(string)
 
 
+class RBDImageMetadata(object):
+    """RBD image metadata to be used with RBDImageIOWrapper"""
+    def __init__(self, image, pool, user, conf):
+        self.image = image
+        self.pool = str(pool)
+        self.user = str(user)
+        self.conf = str(conf)
+
+
 class RBDImageIOWrapper(io.RawIOBase):
     """
     Wrapper to provide standard Python IO interface to RBD images so that they
     can be treated as files.
     """
 
-    def __init__(self, rbd_image):
+    def __init__(self, rbd_meta):
         super(RBDImageIOWrapper, self).__init__()
-        self.rbd_image = rbd_image
+        self._rbd_meta = rbd_meta
         self._offset = 0
 
     def _inc_offset(self, length):
         self._offset += length
 
+    @property
+    def rbd_image(self):
+        return self._rbd_meta.image
+
+    @property
+    def rbd_user(self):
+        return self._rbd_meta.user
+
+    @property
+    def rbd_pool(self):
+        return self._rbd_meta.pool
+
+    @property
+    def rbd_conf(self):
+        return self._rbd_meta.conf
+
     def read(self, length=None):
         offset = self._offset
-        total = self.rbd_image.size()
+        total = self._rbd_meta.image.size()
 
         # (dosaboy): posix files do not barf if you read beyond their length
         # (they just return nothing) but rbd images do so we need to return
         # empty string if we are at the end of the image
-        if (offset == total):
+        if (offset >= total):
             return ''
 
         if length is None:
@@ -116,10 +139,10 @@ class RBDImageIOWrapper(io.RawIOBase):
             length = total - offset
 
         self._inc_offset(length)
-        return self.rbd_image.read(int(offset), int(length))
+        return self._rbd_meta.image.read(int(offset), int(length))
 
     def write(self, data):
-        self.rbd_image.write(data, self._offset)
+        self._rbd_meta.image.write(data, self._offset)
         self._inc_offset(len(data))
 
     def seekable(self):
@@ -147,10 +170,9 @@ class RBDImageIOWrapper(io.RawIOBase):
 
     def flush(self):
         try:
-            self.rbd_image.flush()
-        except AttributeError as exc:
-            LOG.warning("flush() not supported in this version of librbd - "
-                        "%s" % (str(rbd.RBD().version())))
+            self._rbd_meta.image.flush()
+        except AttributeError:
+            LOG.warning(_("flush() not supported in this version of librbd"))
 
     def fileno(self):
         """
@@ -274,6 +296,14 @@ class RBDDriver(driver.VolumeDriver):
         ioctx.close()
         client.shutdown()
 
+    def _get_backup_snaps(self, rbd_image):
+        """Get list of any backup snapshots that exist on this volume.
+
+        There should only ever be one but accept all since they need to be
+        deleted before the volume can be.
+        """
+        return ceph_backup.CephBackupDriver.get_backup_snaps(rbd_image)
+
     def _get_mon_addrs(self):
         args = ['ceph', 'mon', 'dump', '--format=json'] + self._ceph_args()
         out, _ = self._execute(*args)
@@ -386,6 +416,16 @@ class RBDDriver(driver.VolumeDriver):
     def delete_volume(self, volume):
         """Deletes a logical volume."""
         with RADOSClient(self) as client:
+            # Ensure any backup snapshots are deleted
+            rbd_image = self.rbd.Image(client.ioctx, str(volume['name']))
+            try:
+                backup_snaps = self._get_backup_snaps(rbd_image)
+                if backup_snaps:
+                    for snap in backup_snaps:
+                        rbd_image.remove_snap(snap['name'])
+            finally:
+                rbd_image.close()
+
             try:
                 self.rbd.RBD().remove(client.ioctx, str(volume['name']))
             except self.rbd.ImageHasSnapshots:
@@ -540,8 +580,11 @@ class RBDDriver(driver.VolumeDriver):
         pool = self.configuration.rbd_pool
         volname = volume['name']
 
-        with RBDVolumeProxy(self, volname, pool, read_only=True) as rbd_image:
-            rbd_fd = RBDImageIOWrapper(rbd_image)
+        with RBDVolumeProxy(self, volname, pool) as rbd_image:
+            rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
+                                        self.configuration.rbd_user,
+                                        self.configuration.rbd_ceph_conf)
+            rbd_fd = RBDImageIOWrapper(rbd_meta)
             backup_service.backup(backup, rbd_fd)
 
         LOG.debug("volume backup complete.")
@@ -551,7 +594,10 @@ class RBDDriver(driver.VolumeDriver):
         pool = self.configuration.rbd_pool
 
         with RBDVolumeProxy(self, volume['name'], pool) as rbd_image:
-            rbd_fd = RBDImageIOWrapper(rbd_image)
+            rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
+                                        self.configuration.rbd_user,
+                                        self.configuration.rbd_ceph_conf)
+            rbd_fd = RBDImageIOWrapper(rbd_meta)
             backup_service.restore(backup, volume['id'], rbd_fd)
 
         LOG.debug("volume restore complete.")
index 2a80c6071880e5806996583dcb71cad660429956..c605266efdd7e1ebfc2b6de0c18d98b12e9edb9c 100644 (file)
 # Options defined in cinder.volume.drivers.rbd
 #
 
+# path to the ceph configuration file to use (string)
+#rbd_ceph_conf=/etc/ceph/ceph.conf
+
 # the RADOS pool in which rbd volumes are stored (string
 # value)
 #rbd_pool=rbd